提交 d61decb7 authored 作者: Thomas Mueller's avatar Thomas Mueller

A concurrent linked list, to replace the array list of old roots (work in progress)

上级 82b73a05
...@@ -53,7 +53,9 @@ public class MVMap<K, V> extends AbstractMap<K, V> ...@@ -53,7 +53,9 @@ public class MVMap<K, V> extends AbstractMap<K, V>
private long createVersion; private long createVersion;
private final DataType keyType; private final DataType keyType;
private final DataType valueType; private final DataType valueType;
private ArrayList<Page> oldRoots = new ArrayList<Page>();
private ConcurrentLinkedList<Page> oldRootsList =
new ConcurrentLinkedList<Page>();
private boolean closed; private boolean closed;
private boolean readOnly; private boolean readOnly;
...@@ -718,22 +720,9 @@ public class MVMap<K, V> extends AbstractMap<K, V> ...@@ -718,22 +720,9 @@ public class MVMap<K, V> extends AbstractMap<K, V>
if (root != newRoot) { if (root != newRoot) {
removeUnusedOldVersions(); removeUnusedOldVersions();
if (root.getVersion() != newRoot.getVersion()) { if (root.getVersion() != newRoot.getVersion()) {
// directly append to the list, modifying the list Page last = oldRootsList.peekLast();
// (if somebody concurrently replaces the oldRoots if (last == null || last.getVersion() != root.getVersion()) {
// field, the change is lost, but in many cases this is oldRootsList.add(root);
// detected at the end of this method)
ArrayList<Page> roots = oldRoots;
if (roots.size() > 0) {
Page last = roots.get(roots.size() - 1);
if (last.getVersion() != root.getVersion()) {
roots.add(root);
}
} else {
roots.add(root);
}
if (roots != oldRoots) {
throw DataUtils.newConcurrentModificationException(
getName());
} }
} }
root = newRoot; root = newRoot;
...@@ -977,23 +966,21 @@ public class MVMap<K, V> extends AbstractMap<K, V> ...@@ -977,23 +966,21 @@ public class MVMap<K, V> extends AbstractMap<K, V>
void rollbackTo(long version) { void rollbackTo(long version) {
beforeWrite(); beforeWrite();
try { try {
removeUnusedOldVersions();
if (version <= createVersion) { if (version <= createVersion) {
// the map is removed later // the map is removed later
} else if (root.getVersion() >= version) { } else if (root.getVersion() >= version) {
// iterating in descending order - while (true) {
// this is not terribly efficient if there are many versions Page last = oldRootsList.peekLast();
ArrayList<Page> roots = new ArrayList<Page>(oldRoots); if (last == null) {
while (roots.size() > 0) { break;
int i = roots.size() - 1; }
Page p = roots.get(i); // slow, but rollback is not a common operation
root = p; oldRootsList.removeLast(last);
roots.remove(i); root = last;
if (p.getVersion() < version) { if (root.getVersion() < version) {
break; break;
} }
} }
oldRoots = roots;
} }
} finally { } finally {
afterWrite(); afterWrite();
...@@ -1008,35 +995,20 @@ public class MVMap<K, V> extends AbstractMap<K, V> ...@@ -1008,35 +995,20 @@ public class MVMap<K, V> extends AbstractMap<K, V>
if (oldest == -1) { if (oldest == -1) {
return; return;
} }
// operate on a stable array if (oldRootsList.peekFirst() == oldRootsList.peekLast()) {
// (items might be appended concurrently, // do nothing if there is no or only one entry
// but not removed)
ArrayList<Page> roots = oldRoots;
int i = searchRoot(roots, oldest);
if (i < 0) {
i = -i - 1;
}
i--;
if (i <= 0) {
return; return;
} }
// create a new instance
// because another thread might iterate over it
int size = roots.size() - i;
ArrayList<Page> list = new ArrayList<Page>(size);
list.addAll(roots.subList(i, roots.size()));
if (roots != oldRoots) {
throw DataUtils.newConcurrentModificationException(
getName());
}
; ;
// TODO work in progress // TODO why is this?
if(oldRoots.size() - list.size() > 1) { oldest--;
// System.out.println("reduced! from " + while (true) {
// oldRoots.size() + " to " + list.size() +" " + getClass()); Page p = oldRootsList.peekFirst();
if (p == null || p.getVersion() >= oldest) {
break;
}
oldRootsList.removeFirst(p);
} }
oldRoots = list;
} }
public boolean isReadOnly() { public boolean isReadOnly() {
...@@ -1197,20 +1169,20 @@ public class MVMap<K, V> extends AbstractMap<K, V> ...@@ -1197,20 +1169,20 @@ public class MVMap<K, V> extends AbstractMap<K, V>
store.getFileStore() == null)) { store.getFileStore() == null)) {
newest = r; newest = r;
} else { } else {
// operate on a stable array Page last = oldRootsList.peekFirst();
// (items might be appended concurrently) if (last == null || version < last.getVersion()) {
ArrayList<Page> roots = oldRoots; // smaller than all in-memory versions
// find the newest page that has a getVersion() <= version return store.openMapVersion(version, id, this);
int i = searchRoot(roots, version); }
if (i < 0) { Iterator<Page> it = oldRootsList.iterator();
// not found while (it.hasNext()) {
if (i == -1) { Page p = it.next();
// smaller than all in-memory versions if (p.getVersion() > version) {
return store.openMapVersion(version, id, this); break;
} }
i = -i - 2; last = p;
} }
newest = roots.get(i); newest = last;
} }
MVMap<K, V> m = openReadOnly(); MVMap<K, V> m = openReadOnly();
m.root = newest; m.root = newest;
...@@ -1233,22 +1205,6 @@ public class MVMap<K, V> extends AbstractMap<K, V> ...@@ -1233,22 +1205,6 @@ public class MVMap<K, V> extends AbstractMap<K, V>
return m; return m;
} }
private static int searchRoot(ArrayList<Page> roots, long version) {
int low = 0, high = roots.size() - 1;
while (low <= high) {
int x = (low + high) >>> 1;
long v = roots.get(x).getVersion();
if (v < version) {
low = x + 1;
} else if (version < v) {
high = x - 1;
} else {
return x;
}
}
return -(low + 1);
}
public long getVersion() { public long getVersion() {
return root.getVersion(); return root.getVersion();
} }
......
...@@ -313,6 +313,7 @@ public class Page { ...@@ -313,6 +313,7 @@ public class Page {
public String toString() { public String toString() {
StringBuilder buff = new StringBuilder(); StringBuilder buff = new StringBuilder();
buff.append("id: ").append(System.identityHashCode(this)).append('\n'); buff.append("id: ").append(System.identityHashCode(this)).append('\n');
buff.append("version: ").append(Long.toHexString(version)).append("\n");
buff.append("pos: ").append(Long.toHexString(pos)).append("\n"); buff.append("pos: ").append(Long.toHexString(pos)).append("\n");
if (pos != 0) { if (pos != 0) {
int chunkId = DataUtils.getPageChunkId(pos); int chunkId = DataUtils.getPageChunkId(pos);
......
...@@ -113,6 +113,7 @@ import org.h2.test.store.TestCacheConcurrentLIRS; ...@@ -113,6 +113,7 @@ import org.h2.test.store.TestCacheConcurrentLIRS;
import org.h2.test.store.TestCacheLIRS; import org.h2.test.store.TestCacheLIRS;
import org.h2.test.store.TestCacheLongKeyLIRS; import org.h2.test.store.TestCacheLongKeyLIRS;
import org.h2.test.store.TestConcurrent; import org.h2.test.store.TestConcurrent;
import org.h2.test.store.TestConcurrentLinkedList;
import org.h2.test.store.TestDataUtils; import org.h2.test.store.TestDataUtils;
import org.h2.test.store.TestFreeSpace; import org.h2.test.store.TestFreeSpace;
import org.h2.test.store.TestKillProcessWhileWriting; import org.h2.test.store.TestKillProcessWhileWriting;
...@@ -741,6 +742,7 @@ kill -9 `jps -l | grep "org.h2.test." | cut -d " " -f 1` ...@@ -741,6 +742,7 @@ kill -9 `jps -l | grep "org.h2.test." | cut -d " " -f 1`
new TestCacheLIRS().runTest(this); new TestCacheLIRS().runTest(this);
new TestCacheLongKeyLIRS().runTest(this); new TestCacheLongKeyLIRS().runTest(this);
new TestConcurrent().runTest(this); new TestConcurrent().runTest(this);
new TestConcurrentLinkedList().runTest(this);
new TestDataUtils().runTest(this); new TestDataUtils().runTest(this);
new TestFreeSpace().runTest(this); new TestFreeSpace().runTest(this);
new TestKillProcessWhileWriting().runTest(this); new TestKillProcessWhileWriting().runTest(this);
......
...@@ -1444,6 +1444,7 @@ public class TestMVStore extends TestBase { ...@@ -1444,6 +1444,7 @@ public class TestMVStore extends TestBase {
String fileName = getBaseDir() + "/testRollback.h3"; String fileName = getBaseDir() + "/testRollback.h3";
FileUtils.delete(fileName); FileUtils.delete(fileName);
MVStore s = openStore(fileName, 5); MVStore s = openStore(fileName, 5);
s.setAutoCommitDelay(0);
assertEquals(0, s.getCurrentVersion()); assertEquals(0, s.getCurrentVersion());
MVMap<String, String> m = s.openMap("data"); MVMap<String, String> m = s.openMap("data");
s.rollbackTo(0); s.rollbackTo(0);
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论