提交 bd50328e authored 作者: Thomas Mueller's avatar Thomas Mueller

A persistent multi-version map: support store() in a background thread

上级 0aa0c2be
......@@ -243,10 +243,14 @@ Caching is done on the page level.
The page cache is a concurrent LIRS cache,
which should be resistant against scan operations.
</p><p>
Concurrent modification operations on the maps are currently not supported,
Concurrent modification operations on a map are currently not supported
(the same as <code>HashMap</code> and <code>TreeMap</code>),
however it is planned to support an additional map implementation
that supports concurrent writes
(at the cost of speed if used in a single thread, same as <code>ConcurrentHashMap</code>).
</p><p>
Storing changes can occur concurrently to modifying the data,
as <code>store()</code> operates on a snapshot.
</p>
<h3>Log Structured Storage</h3>
......
......@@ -682,9 +682,11 @@ public class MVMap<K, V> extends AbstractMap<K, V>
* Set the position of the root page.
*
* @param rootPos the position, 0 for empty
* @param version the version of the root
*/
void setRootPos(long rootPos) {
void setRootPos(long rootPos, long version) {
root = rootPos == 0 ? Page.createEmpty(this, -1) : readPage(rootPos);
root.setVersion(version);
}
/**
......@@ -899,7 +901,7 @@ public class MVMap<K, V> extends AbstractMap<K, V>
Page newest = null;
// need to copy because it can change
Page r = root;
if (r.getVersion() == version) {
if (r.getVersion() <= version && r.getVersion() >= 0) {
newest = r;
} else {
// find the newest page that has a getVersion() <= version
......
......@@ -40,10 +40,9 @@ header:
H:3,...
TODO:
- build script
- test concurrent storing in a background thread
- store store creation in file header, and seconds since creation
-- in chunk header (plus a counter)
-- in chunk header (plus a counter) - ensure time never goes backwards
- recovery: keep some old chunks; don't overwritten
-- for 5 minutes (configurable)
- allocate memory with Utils.newBytes and so on
......@@ -91,7 +90,8 @@ TODO:
- and maps without keys (counted b-tree)
- use a small object cache (StringCache)
- dump values
- tool to import / manipulate CSV files
- tool to import / manipulate CSV files (maybe concurrently)
- map split / merge (fast if no overlap)
*/
......@@ -216,7 +216,7 @@ public class MVStore {
String r = oldMeta.get("root." + template.getId());
long rootPos = r == null ? 0 : Long.parseLong(r);
MVMap<?, ?> m = template.openReadOnly();
m.setRootPos(rootPos);
m.setRootPos(rootPos, version);
return (T) m;
}
......@@ -286,7 +286,7 @@ public class MVStore {
root = r == null ? 0 : Long.parseLong(r);
}
m.open(this, c);
m.setRootPos(root);
m.setRootPos(root, -1);
maps.put(name, m);
return (T) m;
}
......@@ -317,7 +317,7 @@ public class MVStore {
}
c = readChunkHeader(c.start);
MVMap<String, String> oldMeta = meta.openReadOnly();
oldMeta.setRootPos(c.metaRootPos);
oldMeta.setRootPos(c.metaRootPos, version);
return oldMeta;
}
......@@ -414,7 +414,7 @@ public class MVStore {
Chunk header = readChunkHeader(rootChunkStart);
lastChunkId = header.id;
chunks.put(header.id, header);
meta.setRootPos(header.metaRootPos);
meta.setRootPos(header.metaRootPos, -1);
Iterator<String> it = meta.keyIterator("chunk.");
while (it.hasNext()) {
String s = it.next();
......@@ -553,6 +553,10 @@ public class MVStore {
return currentVersion;
}
int currentUnsavedPageCount = unsavedPageCount;
long storeVersion = currentVersion;
long version = incrementVersion();
// the last chunk was not completely correct in the last store()
// this needs to be updated now (it's better not to update right after
// storing, because that would modify the meta map again)
......@@ -565,7 +569,7 @@ public class MVStore {
c.maxLengthLive = Long.MAX_VALUE;
c.start = Long.MAX_VALUE;
c.length = Integer.MAX_VALUE;
c.version = currentVersion + 1;
c.version = version;
chunks.put(c.id, c);
meta.put("chunk." + c.id, c.asString());
......@@ -574,7 +578,7 @@ public class MVStore {
if (m == meta || !m.hasUnsavedChanges()) {
continue;
}
Page p = m.getRoot();
Page p = m.openVersion(storeVersion).getRoot();
if (p.getTotalCount() == 0) {
meta.put("root." + m.getId(), "0");
} else {
......@@ -606,7 +610,7 @@ public class MVStore {
if (m == meta || !m.hasUnsavedChanges()) {
continue;
}
Page p = m.getRoot();
Page p = m.openVersion(storeVersion).getRoot();
if (p.getTotalCount() > 0) {
long root = p.writeUnsavedRecursive(c, buff);
meta.put("root." + m.getId(), "" + root);
......@@ -654,11 +658,11 @@ public class MVStore {
rootChunkStart = filePos;
revertTemp();
long version = incrementVersion();
// write the new version (after the commit)
writeFileHeader();
shrinkFileIfPossible(1);
unsavedPageCount = 0;
// some pages might have been changed in the meantime (in the newest version)
unsavedPageCount = Math.max(0, unsavedPageCount - currentUnsavedPageCount);
return version;
}
......@@ -756,7 +760,7 @@ public class MVStore {
return false;
}
for (MVMap<?, ?> m : mapsChanged.values()) {
if (m.hasUnsavedChanges()) {
if (m == meta || m.hasUnsavedChanges()) {
return true;
}
}
......@@ -867,6 +871,9 @@ public class MVStore {
}
Chunk.fromHeader(buff, chunk.start);
int chunkLength = chunk.length;
// mark a change, even if it doesn't look like there was a change
// as changes in the metadata alone are not detected
markChanged(meta);
while (buff.position() < chunkLength) {
int start = buff.position();
int pageLength = buff.getInt();
......@@ -1161,14 +1168,11 @@ public class MVStore {
if (last != null) {
if (last.version >= version) {
revertTemp();
}
if (last.version > version) {
loadFromFile = true;
while (last != null && last.version > version) {
chunks.remove(lastChunkId);
do {
last = chunks.remove(lastChunkId);
lastChunkId--;
last = chunks.get(lastChunkId);
}
} while (last.version > version && chunks.size() > 0);
rootChunkStart = last.start;
writeFileHeader();
readFileHeader();
......@@ -1183,7 +1187,7 @@ public class MVStore {
if (loadFromFile) {
String r = meta.get("root." + m.getId());
long root = r == null ? 0 : Long.parseLong(r);
m.setRootPos(root);
m.setRootPos(root, version);
}
}
}
......
......@@ -37,7 +37,7 @@ public class Page {
private static final Object[] EMPTY_OBJECT_ARRAY = new Object[0];
private final MVMap<?, ?> map;
private final long version;
private long version;
private long pos;
private long totalCount;
private int keyCount;
......@@ -886,4 +886,8 @@ public class Page {
return mem;
}
void setVersion(long version) {
this.version = version;
}
}
......@@ -77,7 +77,7 @@ public class ObjectType implements DataType {
static final int TAG_BYTE_ARRAY_0_15 = 104;
/**
* Contants for floating point synchronization.
* Constants for floating point synchronization.
*/
static final int FLOAT_ZERO_BITS = Float.floatToIntBits(0.0f);
static final int FLOAT_ONE_BITS = Float.floatToIntBits(1.0f);
......
......@@ -111,7 +111,8 @@ public class TestMVStore extends TestBase {
map = s.openMap("test");
for (int i = 0; i < 1024; i += 128) {
for (int j = 0; j < i; j++) {
map.get(j);
String x = map.get(j);
assertEquals(10240, x.length());
}
}
assertEquals(expectedReadsForCacheSize[cacheSize],
......@@ -410,11 +411,14 @@ public class TestMVStore extends TestBase {
// concurrently with further modifications)
// this will print Hello World
// System.out.println(oldMap.get(1));
assertEquals("Hello", oldMap.get(1));
// System.out.println(oldMap.get(2));
assertEquals("World", oldMap.get(2));
oldMap.close();
// print the newest version ("Hi")
// System.out.println(map.get(1));
assertEquals("Hi", map.get(1));
// close the store - this doesn't write to disk
s.close();
......@@ -504,12 +508,16 @@ public class TestMVStore extends TestBase {
assertTrue(mOld.isReadOnly());
s.getCurrentVersion();
s.setRetainChunk(0);
long old2 = s.store();
long old3 = s.store();
// the old version is still available
assertEquals("Hello", mOld.get("1"));
assertEquals("World", mOld.get("2"));
mOld = m.openVersion(old3);
assertEquals("Hallo", mOld.get("1"));
assertEquals("Welt", mOld.get("2"));
m.put("1", "Hi");
assertEquals("Welt", m.remove("2"));
s.store();
......@@ -519,7 +527,8 @@ public class TestMVStore extends TestBase {
m = s.openMap("data", String.class, String.class);
assertEquals("Hi", m.get("1"));
assertEquals(null, m.get("2"));
mOld = m.openVersion(old2);
mOld = m.openVersion(old3);
assertEquals("Hallo", mOld.get("1"));
assertEquals("Welt", mOld.get("2"));
s.close();
......@@ -635,23 +644,25 @@ public class TestMVStore extends TestBase {
assertFalse(m0.isReadOnly());
m.put("1", "Hallo");
s.incrementVersion();
assertEquals(3, s.getCurrentVersion());
long v3 = s.getCurrentVersion();
assertEquals(3, v3);
long v4 = s.store();
assertEquals(4, v4);
assertEquals(4, s.getCurrentVersion());
s.close();
s = openStore(fileName);
assertEquals(4, s.getCurrentVersion());
s.setRetainChunk(0);
m = s.openMap("data", String.class, String.class);
m.put("1", "Hello");
m.put("1", "Hi");
s.store();
s.close();
s = openStore(fileName);
s.setRetainChunk(0);
m = s.openMap("data", String.class, String.class);
assertEquals("Hello", m.get("1"));
assertEquals("Hi", m.get("1"));
s.rollbackTo(v4);
assertEquals("Hallo", m.get("1"));
s.close();
......@@ -715,6 +726,10 @@ public class TestMVStore extends TestBase {
data.put("1", "Hello");
data.put("2", "World");
s.store();
assertEquals(1, s.getCurrentVersion());
assertTrue(m.containsKey("chunk.1"));
assertFalse(m.containsKey("chunk.2"));
assertEquals("id:1,name:data,type:btree,createVersion:0,key:,value:",
m.get("map.data"));
assertTrue(m.containsKey("chunk.1"));
......@@ -725,9 +740,14 @@ public class TestMVStore extends TestBase {
assertTrue(m.get("root.1").length() > 0);
assertTrue(m.containsKey("chunk.1"));
assertTrue(m.containsKey("chunk.2"));
assertEquals(2, s.getCurrentVersion());
s.rollbackTo(1);
assertEquals("Hello", data.get("1"));
assertEquals("World", data.get("2"));
assertTrue(m.containsKey("chunk.1"));
assertFalse(m.containsKey("chunk.2"));
s.close();
}
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论