提交 529db29a authored 作者: Thomas Mueller's avatar Thomas Mueller

A persistent multi-version map - test concurrent iterate and write

上级 e949b40f
...@@ -5,6 +5,7 @@ ...@@ -5,6 +5,7 @@
*/ */
package org.h2.test.store; package org.h2.test.store;
import java.util.Iterator;
import java.util.Map; import java.util.Map;
import java.util.Random; import java.util.Random;
import org.h2.dev.store.btree.MVMap; import org.h2.dev.store.btree.MVMap;
...@@ -27,10 +28,47 @@ public class TestConcurrent extends TestMVStore { ...@@ -27,10 +28,47 @@ public class TestConcurrent extends TestMVStore {
} }
public void test() throws InterruptedException { public void test() throws InterruptedException {
testConcurrentIterate();
testConcurrentWrite(); testConcurrentWrite();
testConcurrentRead(); testConcurrentRead();
} }
private void testConcurrentIterate() {
MVStore s = MVStore.open(null, new TestMapFactory());
s.setMaxPageSize(3);
final MVMap<Integer, Integer> map = s.openMap("test");
final int len = 10;
final Random r = new Random();
Task t = new Task() {
@Override
public void call() throws Exception {
while (!stop) {
int x = r.nextInt(len);
if (r.nextBoolean()) {
map.remove(x);
} else {
map.put(x, r.nextInt(100));
}
}
}
};
t.execute();
for (int k = 0; k < 10000; k++) {
Iterator<Integer> it = map.keyIterator(r.nextInt(len));
long old = s.incrementVersion();
s.setRetainVersion(old - 100);
while (map.getVersion() == old) {
Thread.yield();
}
while (it.hasNext()) {
it.next();
}
}
t.get();
s.close();
}
/** /**
* Test what happens on concurrent write. Concurrent write may corrupt the * Test what happens on concurrent write. Concurrent write may corrupt the
* map, so that keys and values may become null. * map, so that keys and values may become null.
......
...@@ -31,6 +31,7 @@ public class TestMVStore extends TestBase { ...@@ -31,6 +31,7 @@ public class TestMVStore extends TestBase {
} }
public void test() throws InterruptedException { public void test() throws InterruptedException {
testIterateOldVersion();
testObjects(); testObjects();
testExample(); testExample();
testIterateOverChanges(); testIterateOverChanges();
...@@ -52,6 +53,29 @@ public class TestMVStore extends TestBase { ...@@ -52,6 +53,29 @@ public class TestMVStore extends TestBase {
testSimple(); testSimple();
} }
private void testIterateOldVersion() {
MVStore s;
Map<Integer, Integer> map;
s = MVStore.open(null, new TestMapFactory());
map = s.openMap("test");
int len = 100;
for (int i = 0; i < len; i++) {
map.put(i, 10 * i);
}
Iterator<Integer> it = map.keySet().iterator();
s.incrementVersion();
for (int i = 0; i < len; i += 2) {
map.remove(i);
}
int count = 0;
while (it.hasNext()) {
it.next();
count++;
}
assertEquals(len, count);
s.close();
}
private void testObjects() { private void testObjects() {
String fileName = getBaseDir() + "/testObjects.h3"; String fileName = getBaseDir() + "/testObjects.h3";
FileUtils.delete(fileName); FileUtils.delete(fileName);
......
...@@ -16,8 +16,8 @@ public class ChangeCursor<K, V> extends Cursor<K, V> { ...@@ -16,8 +16,8 @@ public class ChangeCursor<K, V> extends Cursor<K, V> {
private final long minVersion; private final long minVersion;
ChangeCursor(MVMap<K, V> map, long minVersion) { ChangeCursor(MVMap<K, V> map, Page root, K from, long minVersion) {
super(map); super(map, root, from);
this.minVersion = minVersion; this.minVersion = minVersion;
} }
......
...@@ -18,28 +18,22 @@ import java.util.Iterator; ...@@ -18,28 +18,22 @@ import java.util.Iterator;
public class Cursor<K, V> implements Iterator<K> { public class Cursor<K, V> implements Iterator<K> {
protected final MVMap<K, V> map; protected final MVMap<K, V> map;
protected final ArrayList<CursorPos> parents = new ArrayList<CursorPos>(); protected final Page root;
protected final K from;
protected ArrayList<CursorPos> parents;
protected CursorPos currentPos; protected CursorPos currentPos;
protected K current; protected K current;
Cursor(MVMap<K, V> map) { Cursor(MVMap<K, V> map, Page root, K from) {
this.map = map; this.map = map;
} this.root = root;
this.from = from;
/**
* Fetch the first key.
*
* @param root the root page
* @param from the key, or null
*/
void start(Page root, K from) {
currentPos = min(root, from);
if (currentPos != null) {
fetchNext();
}
} }
public K next() { public K next() {
if (!hasNext()) {
return null;
}
K c = current; K c = current;
if (c != null) { if (c != null) {
fetchNext(); fetchNext();
...@@ -56,6 +50,14 @@ public class Cursor<K, V> implements Iterator<K> { ...@@ -56,6 +50,14 @@ public class Cursor<K, V> implements Iterator<K> {
} }
public boolean hasNext() { public boolean hasNext() {
if (parents == null) {
// not initialized yet: fetch the first key
parents = new ArrayList<CursorPos>();
currentPos = min(root, from);
if (currentPos != null) {
fetchNext();
}
}
return current != null; return current != null;
} }
......
...@@ -28,9 +28,9 @@ public class MVMap<K, V> extends AbstractMap<K, V> { ...@@ -28,9 +28,9 @@ public class MVMap<K, V> extends AbstractMap<K, V> {
protected final MVStore store; protected final MVStore store;
/** /**
* The root page (may not be null). * The current root page (may not be null).
*/ */
protected Page root; protected volatile Page root;
private final int id; private final int id;
private final String name; private final String name;
...@@ -291,7 +291,7 @@ public class MVMap<K, V> extends AbstractMap<K, V> { ...@@ -291,7 +291,7 @@ public class MVMap<K, V> extends AbstractMap<K, V> {
public void close() { public void close() {
closed = true; closed = true;
readOnly = true; readOnly = true;
clearOldVersions(); removeAllOldVersions();
root = null; root = null;
} }
...@@ -366,6 +366,7 @@ public class MVMap<K, V> extends AbstractMap<K, V> { ...@@ -366,6 +366,7 @@ public class MVMap<K, V> extends AbstractMap<K, V> {
protected void setRoot(Page newRoot) { protected void setRoot(Page newRoot) {
if (root != newRoot) { if (root != newRoot) {
removeUnusedOldVersions();
if (root.getVersion() != newRoot.getVersion()) { if (root.getVersion() != newRoot.getVersion()) {
ArrayList<Page> list = oldRoots; ArrayList<Page> list = oldRoots;
if (list.size() > 0) { if (list.size() > 0) {
...@@ -447,9 +448,7 @@ public class MVMap<K, V> extends AbstractMap<K, V> { ...@@ -447,9 +448,7 @@ public class MVMap<K, V> extends AbstractMap<K, V> {
*/ */
public Iterator<K> keyIterator(K from) { public Iterator<K> keyIterator(K from) {
checkOpen(); checkOpen();
Cursor<K, V> c = new Cursor<K, V>(this); return new Cursor<K, V>(this, root, from);
c.start(root, from);
return c;
} }
/** /**
...@@ -461,9 +460,7 @@ public class MVMap<K, V> extends AbstractMap<K, V> { ...@@ -461,9 +460,7 @@ public class MVMap<K, V> extends AbstractMap<K, V> {
*/ */
public Iterator<K> changeIterator(long minVersion) { public Iterator<K> changeIterator(long minVersion) {
checkOpen(); checkOpen();
Cursor<K, V> c = new ChangeCursor<K, V>(this, minVersion); return new ChangeCursor<K, V>(this, root, null, minVersion);
c.start(root, null);
return c;
} }
public Set<Map.Entry<K, V>> entrySet() { public Set<Map.Entry<K, V>> entrySet() {
...@@ -481,9 +478,7 @@ public class MVMap<K, V> extends AbstractMap<K, V> { ...@@ -481,9 +478,7 @@ public class MVMap<K, V> extends AbstractMap<K, V> {
@Override @Override
public Iterator<K> iterator() { public Iterator<K> iterator() {
Cursor<K, V> c = new Cursor<K, V>(MVMap.this); return new Cursor<K, V>(MVMap.this, root, null);
c.start(root, null);
return c;
} }
@Override @Override
...@@ -532,6 +527,7 @@ public class MVMap<K, V> extends AbstractMap<K, V> { ...@@ -532,6 +527,7 @@ public class MVMap<K, V> extends AbstractMap<K, V> {
*/ */
void rollbackTo(long version) { void rollbackTo(long version) {
checkWrite(); checkWrite();
removeUnusedOldVersions();
if (version < createVersion) { if (version < createVersion) {
removeMap(); removeMap();
} else if (root.getVersion() != version) { } else if (root.getVersion() != version) {
...@@ -553,12 +549,39 @@ public class MVMap<K, V> extends AbstractMap<K, V> { ...@@ -553,12 +549,39 @@ public class MVMap<K, V> extends AbstractMap<K, V> {
/** /**
* Forget all old versions. * Forget all old versions.
*/ */
void clearOldVersions() { void removeAllOldVersions() {
// create a new instance // create a new instance
// because another thread might iterate over it // because another thread might iterate over it
oldRoots = new ArrayList<Page>(); oldRoots = new ArrayList<Page>();
} }
/**
* Forget those old versions that are no longer needed.
*/
void removeUnusedOldVersions() {
long oldest = store.getRetainVersion();
if (oldest == -1) {
return;
}
ArrayList<Page> list = oldRoots;
int i = 0;
// TODO iterate over the list is inefficient
for (; i < list.size(); i++) {
Page p = list.get(i);
if (p.getVersion() > oldest) {
break;
}
}
if (i == 0) {
return;
}
// create a new instance
// because another thread might iterate over it
list = new ArrayList<Page>();
list.addAll(oldRoots.subList(i, oldRoots.size()));
oldRoots = list;
}
public void setReadOnly(boolean readOnly) { public void setReadOnly(boolean readOnly) {
this.readOnly = readOnly; this.readOnly = readOnly;
} }
...@@ -671,4 +694,8 @@ public class MVMap<K, V> extends AbstractMap<K, V> { ...@@ -671,4 +694,8 @@ public class MVMap<K, V> extends AbstractMap<K, V> {
return m; return m;
} }
public long getVersion() {
return root.getVersion();
}
} }
...@@ -35,7 +35,6 @@ header: ...@@ -35,7 +35,6 @@ header:
H:3,blockSize=4096,... H:3,blockSize=4096,...
TODO: TODO:
- concurrent iterator (when to increment version; read on first hasNext())
- how to iterate (just) over deleted pages / entries - how to iterate (just) over deleted pages / entries
- compact: use total max length instead of page count (liveCount) - compact: use total max length instead of page count (liveCount)
- support background writes (store old version) - support background writes (store old version)
...@@ -62,6 +61,7 @@ TODO: ...@@ -62,6 +61,7 @@ TODO:
- allocate memory Utils.newBytes - allocate memory Utils.newBytes
- unified exception handling - unified exception handling
- check if locale specific string comparison can make data disappear - check if locale specific string comparison can make data disappear
- concurrent map; avoid locking during IO (pre-load pages)
*/ */
...@@ -113,6 +113,7 @@ public class MVStore { ...@@ -113,6 +113,7 @@ public class MVStore {
private int lastMapId; private int lastMapId;
private boolean reuseSpace = true; private boolean reuseSpace = true;
private long retainVersion = -1;
private int retainChunk = -1; private int retainChunk = -1;
private Compressor compressor = new CompressLZF(); private Compressor compressor = new CompressLZF();
...@@ -976,6 +977,19 @@ public class MVStore { ...@@ -976,6 +977,19 @@ public class MVStore {
this.retainChunk = retainChunk; this.retainChunk = retainChunk;
} }
/**
* Which version to retain. If not set, all versions up to the last stored version are retained.
*
* @param retainVersion the oldest version to retain
*/
public void setRetainVersion(long retainVersion) {
this.retainVersion = retainVersion;
}
public long getRetainVersion() {
return retainVersion;
}
private boolean isKnownVersion(long version) { private boolean isKnownVersion(long version) {
if (version > currentVersion || version < 0) { if (version > currentVersion || version < 0) {
return false; return false;
...@@ -1067,7 +1081,7 @@ public class MVStore { ...@@ -1067,7 +1081,7 @@ public class MVStore {
private void revertTemp() { private void revertTemp() {
freedChunks.clear(); freedChunks.clear();
for (MVMap<?, ?> m : mapsChanged.values()) { for (MVMap<?, ?> m : mapsChanged.values()) {
m.clearOldVersions(); m.removeAllOldVersions();
} }
mapsChanged.clear(); mapsChanged.clear();
} }
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论