提交 337f0a4f authored 作者: Thomas Mueller's avatar Thomas Mueller

MVStore: support for concurrent reads and writes is now enabled by default.

上级 a17b66d1
...@@ -8,7 +8,6 @@ package org.h2.mvstore; ...@@ -8,7 +8,6 @@ package org.h2.mvstore;
import java.util.Arrays; import java.util.Arrays;
import java.util.Iterator; import java.util.Iterator;
/** /**
* A very simple array list that supports concurrent access. * A very simple array list that supports concurrent access.
* Internally, it uses immutable objects. * Internally, it uses immutable objects.
......
...@@ -21,6 +21,15 @@ import org.h2.util.New; ...@@ -21,6 +21,15 @@ import org.h2.util.New;
/** /**
* A stored map. * A stored map.
* <p>
* Read operations can happen concurrently with all other
* operations, without risk of corruption.
* <p>
* Write operations first read the relevant area from disk to memory
* concurrently, and only then modify the data. The in-memory part of write
* operations is synchronized. For scalable concurrent in-memory write
* operations, the map should be split into multiple smaller sub-maps that are
* then synchronized independently.
* *
* @param <K> the key class * @param <K> the key class
* @param <V> the value class * @param <V> the value class
...@@ -43,11 +52,6 @@ public class MVMap<K, V> extends AbstractMap<K, V> ...@@ -43,11 +52,6 @@ public class MVMap<K, V> extends AbstractMap<K, V>
*/ */
protected volatile long writeVersion; protected volatile long writeVersion;
/**
* This version is set during a write operation.
*/
protected volatile long currentWriteVersion = -1;
private int id; private int id;
private long createVersion; private long createVersion;
private final DataType keyType; private final DataType keyType;
...@@ -99,21 +103,6 @@ public class MVMap<K, V> extends AbstractMap<K, V> ...@@ -99,21 +103,6 @@ public class MVMap<K, V> extends AbstractMap<K, V>
this.writeVersion = store.getCurrentVersion(); this.writeVersion = store.getCurrentVersion();
} }
/**
* Create a copy of a page, if the write version is higher than the current
* version. If a copy is created, the old page is marked as deleted.
*
* @param p the page
* @param writeVersion the write version
* @return a page with the given write version
*/
protected Page copyOnWrite(Page p, long writeVersion) {
if (p.getVersion() == writeVersion) {
return p;
}
return p.copy(writeVersion);
}
/** /**
* Add or replace a key-value pair. * Add or replace a key-value pair.
* *
...@@ -126,16 +115,12 @@ public class MVMap<K, V> extends AbstractMap<K, V> ...@@ -126,16 +115,12 @@ public class MVMap<K, V> extends AbstractMap<K, V>
public synchronized V put(K key, V value) { public synchronized V put(K key, V value) {
DataUtils.checkArgument(value != null, "The value may not be null"); DataUtils.checkArgument(value != null, "The value may not be null");
beforeWrite(); beforeWrite();
try {
long v = writeVersion; long v = writeVersion;
Page p = copyOnWrite(root, v); Page p = root.copy(v);
p = splitRootIfNeeded(p, v); p = splitRootIfNeeded(p, v);
Object result = put(p, v, key, value); Object result = put(p, v, key, value);
newRoot(p); newRoot(p);
return (V) result; return (V) result;
} finally {
afterWrite();
}
} }
/** /**
...@@ -155,14 +140,13 @@ public class MVMap<K, V> extends AbstractMap<K, V> ...@@ -155,14 +140,13 @@ public class MVMap<K, V> extends AbstractMap<K, V>
Page split = p.split(at); Page split = p.split(at);
Object[] keys = { k }; Object[] keys = { k };
Page.PageReference[] children = { Page.PageReference[] children = {
new Page.PageReference(p, p.getPos()), new Page.PageReference(p, p.getPos(), p.getTotalCount()),
new Page.PageReference(split, split.getPos()), new Page.PageReference(split, split.getPos(), split.getTotalCount()),
}; };
long[] counts = { p.getTotalCount(), split.getTotalCount() };
p = Page.create(this, writeVersion, p = Page.create(this, writeVersion,
1, keys, null, keys, null,
2, children, counts, children,
totalCount, 0, 0); totalCount, 0);
return p; return p;
} }
...@@ -191,21 +175,19 @@ public class MVMap<K, V> extends AbstractMap<K, V> ...@@ -191,21 +175,19 @@ public class MVMap<K, V> extends AbstractMap<K, V>
} else { } else {
index++; index++;
} }
Page c = copyOnWrite(p.getChildPage(index), writeVersion); Page c = p.getChildPage(index).copy(writeVersion);
if (c.getMemory() > store.getPageSplitSize() && c.getKeyCount() > 1) { if (c.getMemory() > store.getPageSplitSize() && c.getKeyCount() > 1) {
// split on the way down // split on the way down
int at = c.getKeyCount() / 2; int at = c.getKeyCount() / 2;
Object k = c.getKey(at); Object k = c.getKey(at);
Page split = c.split(at); Page split = c.split(at);
p.setChild(index, split); p.setChild(index, split);
p.setCounts(index, split);
p.insertNode(index, k, c); p.insertNode(index, k, c);
// now we are not sure where to add // now we are not sure where to add
return put(p, writeVersion, key, value); return put(p, writeVersion, key, value);
} }
p.setChild(index, c);
Object result = put(c, writeVersion, key, value); Object result = put(c, writeVersion, key, value);
p.setCounts(index, c); p.setChild(index, c);
return result; return result;
} }
...@@ -510,12 +492,8 @@ public class MVMap<K, V> extends AbstractMap<K, V> ...@@ -510,12 +492,8 @@ public class MVMap<K, V> extends AbstractMap<K, V>
@Override @Override
public synchronized void clear() { public synchronized void clear() {
beforeWrite(); beforeWrite();
try {
root.removeAllRecursive(); root.removeAllRecursive();
newRoot(Page.createEmpty(this, writeVersion)); newRoot(Page.createEmpty(this, writeVersion));
} finally {
afterWrite();
}
} }
/** /**
...@@ -537,22 +515,24 @@ public class MVMap<K, V> extends AbstractMap<K, V> ...@@ -537,22 +515,24 @@ public class MVMap<K, V> extends AbstractMap<K, V>
* @return the old value if the key existed, or null otherwise * @return the old value if the key existed, or null otherwise
*/ */
@Override @Override
public synchronized V remove(Object key) { @SuppressWarnings("unchecked")
public V remove(Object key) {
beforeWrite(); beforeWrite();
try { V result = get(key);
if (result == null) {
return null;
}
long v = writeVersion; long v = writeVersion;
Page p = copyOnWrite(root, v); synchronized (this) {
@SuppressWarnings("unchecked") Page p = root.copy(v);
V result = (V) remove(p, v, key); result = (V) remove(p, v, key);
if (!p.isLeaf() && p.getTotalCount() == 0) { if (!p.isLeaf() && p.getTotalCount() == 0) {
p.removePage(); p.removePage();
p = Page.createEmpty(this, p.getVersion()); p = Page.createEmpty(this, p.getVersion());
} }
newRoot(p); newRoot(p);
return result;
} finally {
afterWrite();
} }
return result;
} }
/** /**
...@@ -664,18 +644,16 @@ public class MVMap<K, V> extends AbstractMap<K, V> ...@@ -664,18 +644,16 @@ public class MVMap<K, V> extends AbstractMap<K, V>
index++; index++;
} }
Page cOld = p.getChildPage(index); Page cOld = p.getChildPage(index);
Page c = copyOnWrite(cOld, writeVersion); Page c = cOld.copy(writeVersion);
result = remove(c, writeVersion, key); result = remove(c, writeVersion, key);
if (result == null || c.getTotalCount() != 0) { if (result == null || c.getTotalCount() != 0) {
// no change, or // no change, or
// there are more nodes // there are more nodes
p.setChild(index, c); p.setChild(index, c);
p.setCounts(index, c);
} else { } else {
// this child was deleted // this child was deleted
if (p.getKeyCount() == 0) { if (p.getKeyCount() == 0) {
p.setChild(index, c); p.setChild(index, c);
p.setCounts(index, c);
c.removePage(); c.removePage();
} else { } else {
p.remove(index); p.remove(index);
...@@ -975,7 +953,6 @@ public class MVMap<K, V> extends AbstractMap<K, V> ...@@ -975,7 +953,6 @@ public class MVMap<K, V> extends AbstractMap<K, V>
*/ */
void rollbackTo(long version) { void rollbackTo(long version) {
beforeWrite(); beforeWrite();
try {
if (version <= createVersion) { if (version <= createVersion) {
// the map is removed later // the map is removed later
} else if (root.getVersion() >= version) { } else if (root.getVersion() >= version) {
...@@ -992,9 +969,6 @@ public class MVMap<K, V> extends AbstractMap<K, V> ...@@ -992,9 +969,6 @@ public class MVMap<K, V> extends AbstractMap<K, V>
} }
} }
} }
} finally {
afterWrite();
}
} }
/** /**
...@@ -1056,45 +1030,7 @@ public class MVMap<K, V> extends AbstractMap<K, V> ...@@ -1056,45 +1030,7 @@ public class MVMap<K, V> extends AbstractMap<K, V>
throw DataUtils.newUnsupportedOperationException( throw DataUtils.newUnsupportedOperationException(
"This map is read-only"); "This map is read-only");
} }
checkConcurrentWrite();
store.beforeWrite(); store.beforeWrite();
currentWriteVersion = writeVersion;
}
/**
* Check that no write operation is in progress.
*/
protected void checkConcurrentWrite() {
if (currentWriteVersion != -1) {
// try to detect concurrent modification
// on a best-effort basis
throw DataUtils.newConcurrentModificationException(getName());
}
}
/**
* This method is called after writing to the map (whether or not the write
* operation was successful).
*/
protected void afterWrite() {
currentWriteVersion = -1;
}
/**
* If there is a concurrent update to the given version, wait until it is
* finished.
*
* @param version the read version
*/
protected void waitUntilWritten(long version) {
if (readOnly) {
throw DataUtils.newIllegalStateException(
DataUtils.ERROR_INTERNAL,
"Waiting for writes to a read-only map");
}
while (currentWriteVersion == version) {
Thread.yield();
}
} }
@Override @Override
...@@ -1267,7 +1203,6 @@ public class MVMap<K, V> extends AbstractMap<K, V> ...@@ -1267,7 +1203,6 @@ public class MVMap<K, V> extends AbstractMap<K, V>
void copyFrom(MVMap<K, V> sourceMap) { void copyFrom(MVMap<K, V> sourceMap) {
beforeWrite(); beforeWrite();
newRoot(copy(sourceMap.root, null)); newRoot(copy(sourceMap.root, null));
afterWrite();
} }
private Page copy(Page source, CursorPos parent) { private Page copy(Page source, CursorPos parent) {
...@@ -1276,11 +1211,10 @@ public class MVMap<K, V> extends AbstractMap<K, V> ...@@ -1276,11 +1211,10 @@ public class MVMap<K, V> extends AbstractMap<K, V>
Page child = target; Page child = target;
for (CursorPos p = parent; p != null; p = p.parent) { for (CursorPos p = parent; p != null; p = p.parent) {
p.page.setChild(p.index, child); p.page.setChild(p.index, child);
p.page = copyOnWrite(p.page, writeVersion); p.page = p.page.copy(writeVersion);
child = p.page; child = p.page;
if (p.parent == null) { if (p.parent == null) {
newRoot(p.page); newRoot(p.page);
afterWrite();
beforeWrite(); beforeWrite();
} }
} }
......
...@@ -9,17 +9,10 @@ import org.h2.mvstore.type.DataType; ...@@ -9,17 +9,10 @@ import org.h2.mvstore.type.DataType;
import org.h2.mvstore.type.ObjectDataType; import org.h2.mvstore.type.ObjectDataType;
/** /**
* A stored map. Read operations can happen concurrently with all other * A class used for backward compatibility.
* operations, without risk of corruption.
* <p>
* Write operations first read the relevant area from disk to memory
* concurrently, and only then modify the data. The in-memory part of write
* operations is synchronized. For scalable concurrent in-memory write
* operations, the map should be split into multiple smaller sub-maps that are
* then synchronized independently.
* *
* @param <K> the key class * @param <K> the key type
* @param <V> the value class * @param <V> the value type
*/ */
public class MVMapConcurrent<K, V> extends MVMap<K, V> { public class MVMapConcurrent<K, V> extends MVMap<K, V> {
...@@ -27,67 +20,6 @@ public class MVMapConcurrent<K, V> extends MVMap<K, V> { ...@@ -27,67 +20,6 @@ public class MVMapConcurrent<K, V> extends MVMap<K, V> {
super(keyType, valueType); super(keyType, valueType);
} }
@Override
protected Page copyOnWrite(Page p, long writeVersion) {
return p.copy(writeVersion);
}
@Override
protected void checkConcurrentWrite() {
// ignore (writes are synchronized)
}
@Override
@SuppressWarnings("unchecked")
public V put(K key, V value) {
beforeWrite();
try {
// even if the result is the same, we still update the value
// (otherwise compact doesn't work)
get(key);
long v = writeVersion;
synchronized (this) {
Page p = copyOnWrite(root, v);
p = splitRootIfNeeded(p, v);
V result = (V) put(p, v, key, value);
newRoot(p);
return result;
}
} finally {
afterWrite();
}
}
@Override
protected void waitUntilWritten(long version) {
// no need to wait
}
@Override
@SuppressWarnings("unchecked")
public V remove(Object key) {
beforeWrite();
try {
V result = get(key);
if (result == null) {
return null;
}
long v = writeVersion;
synchronized (this) {
Page p = copyOnWrite(root, v);
result = (V) remove(p, v, key);
if (!p.isLeaf() && p.getTotalCount() == 0) {
p.removePage();
p = Page.createEmpty(this, p.getVersion());
}
newRoot(p);
}
return result;
} finally {
afterWrite();
}
}
/** /**
* A builder for this class. * A builder for this class.
* *
......
...@@ -190,7 +190,7 @@ public class MVStore { ...@@ -190,7 +190,7 @@ public class MVStore {
* The metadata map. Write access to this map needs to be synchronized on * The metadata map. Write access to this map needs to be synchronized on
* the store. * the store.
*/ */
private MVMapConcurrent<String, String> meta; private MVMap<String, String> meta;
private final ConcurrentHashMap<Integer, MVMap<?, ?>> maps = private final ConcurrentHashMap<Integer, MVMap<?, ?>> maps =
new ConcurrentHashMap<Integer, MVMap<?, ?>>(); new ConcurrentHashMap<Integer, MVMap<?, ?>>();
...@@ -283,7 +283,7 @@ public class MVStore { ...@@ -283,7 +283,7 @@ public class MVStore {
} }
o = config.get("backgroundExceptionHandler"); o = config.get("backgroundExceptionHandler");
this.backgroundExceptionHandler = (UncaughtExceptionHandler) o; this.backgroundExceptionHandler = (UncaughtExceptionHandler) o;
meta = new MVMapConcurrent<String, String>(StringDataType.INSTANCE, meta = new MVMap<String, String>(StringDataType.INSTANCE,
StringDataType.INSTANCE); StringDataType.INSTANCE);
HashMap<String, Object> c = New.hashMap(); HashMap<String, Object> c = New.hashMap();
c.put("id", 0); c.put("id", 0);
...@@ -1010,7 +1010,6 @@ public class MVStore { ...@@ -1010,7 +1010,6 @@ public class MVStore {
continue; continue;
} }
if (v >= 0 && v >= lastStoredVersion) { if (v >= 0 && v >= lastStoredVersion) {
m.waitUntilWritten(storeVersion);
MVMap<?, ?> r = m.openVersion(storeVersion); MVMap<?, ?> r = m.openVersion(storeVersion);
if (r.getRoot().getPos() == 0) { if (r.getRoot().getPos() == 0) {
changed.add(r); changed.add(r);
......
...@@ -121,7 +121,7 @@ public class MVRTreeMap<V> extends MVMap<SpatialKey, V> { ...@@ -121,7 +121,7 @@ public class MVRTreeMap<V> extends MVMap<SpatialKey, V> {
} }
@Override @Override
protected Object remove(Page p, long writeVersion, Object key) { protected synchronized Object remove(Page p, long writeVersion, Object key) {
Object result = null; Object result = null;
if (p.isLeaf()) { if (p.isLeaf()) {
for (int i = 0; i < p.getKeyCount(); i++) { for (int i = 0; i < p.getKeyCount(); i++) {
...@@ -139,11 +139,10 @@ public class MVRTreeMap<V> extends MVMap<SpatialKey, V> { ...@@ -139,11 +139,10 @@ public class MVRTreeMap<V> extends MVMap<SpatialKey, V> {
// this will mark the old page as deleted // this will mark the old page as deleted
// so we need to update the parent in any case // so we need to update the parent in any case
// (otherwise the old page might be deleted again) // (otherwise the old page might be deleted again)
Page c = copyOnWrite(cOld, writeVersion); Page c = cOld.copy(writeVersion);
long oldSize = c.getTotalCount(); long oldSize = c.getTotalCount();
result = remove(c, writeVersion, key); result = remove(c, writeVersion, key);
p.setChild(i, c); p.setChild(i, c);
p.setCounts(i, c);
if (oldSize == c.getTotalCount()) { if (oldSize == c.getTotalCount()) {
continue; continue;
} }
...@@ -190,15 +189,14 @@ public class MVRTreeMap<V> extends MVMap<SpatialKey, V> { ...@@ -190,15 +189,14 @@ public class MVRTreeMap<V> extends MVMap<SpatialKey, V> {
putOrAdd(key, value, true); putOrAdd(key, value, true);
} }
private Object putOrAdd(SpatialKey key, V value, boolean alwaysAdd) { private synchronized Object putOrAdd(SpatialKey key, V value, boolean alwaysAdd) {
beforeWrite(); beforeWrite();
try {
long v = writeVersion; long v = writeVersion;
Page p = copyOnWrite(root, v); Page p = root.copy(v);
Object result; Object result;
if (alwaysAdd || get(key) == null) { if (alwaysAdd || get(key) == null) {
if (p.getMemory() > store.getPageSplitSize() && if (p.getMemory() > store.getPageSplitSize() &&
p.getKeyCount() > 1) { p.getKeyCount() > 3) {
// only possible if this is the root, else we would have // only possible if this is the root, else we would have
// split earlier (this requires pageSplitSize is fixed) // split earlier (this requires pageSplitSize is fixed)
long totalCount = p.getTotalCount(); long totalCount = p.getTotalCount();
...@@ -207,16 +205,14 @@ public class MVRTreeMap<V> extends MVMap<SpatialKey, V> { ...@@ -207,16 +205,14 @@ public class MVRTreeMap<V> extends MVMap<SpatialKey, V> {
Object k2 = getBounds(split); Object k2 = getBounds(split);
Object[] keys = { k1, k2 }; Object[] keys = { k1, k2 };
Page.PageReference[] children = { Page.PageReference[] children = {
new Page.PageReference(p, p.getPos()), new Page.PageReference(p, p.getPos(), p.getTotalCount()),
new Page.PageReference(split, split.getPos()), new Page.PageReference(split, split.getPos(), split.getTotalCount()),
new Page.PageReference(null, 0) new Page.PageReference(null, 0, 0)
}; };
long[] counts = { p.getTotalCount(),
split.getTotalCount(), 0 };
p = Page.create(this, v, p = Page.create(this, v,
2, keys, null, keys, null,
3, children, counts, children,
totalCount, 0, 0); totalCount, 0);
// now p is a node; continues // now p is a node; continues
} }
add(p, v, key, value); add(p, v, key, value);
...@@ -226,9 +222,6 @@ public class MVRTreeMap<V> extends MVMap<SpatialKey, V> { ...@@ -226,9 +222,6 @@ public class MVRTreeMap<V> extends MVMap<SpatialKey, V> {
} }
newRoot(p); newRoot(p);
return result; return result;
} finally {
afterWrite();
}
} }
/** /**
...@@ -252,10 +245,9 @@ public class MVRTreeMap<V> extends MVMap<SpatialKey, V> { ...@@ -252,10 +245,9 @@ public class MVRTreeMap<V> extends MVMap<SpatialKey, V> {
if (contains(p, i, key)) { if (contains(p, i, key)) {
Page c = p.getChildPage(i); Page c = p.getChildPage(i);
if (get(c, key) != null) { if (get(c, key) != null) {
c = copyOnWrite(c, writeVersion); c = c.copy(writeVersion);
Object result = set(c, writeVersion, key, value); Object result = set(c, writeVersion, key, value);
p.setChild(i, c); p.setChild(i, c);
p.setCounts(i, c);
return result; return result;
} }
} }
...@@ -290,14 +282,12 @@ public class MVRTreeMap<V> extends MVMap<SpatialKey, V> { ...@@ -290,14 +282,12 @@ public class MVRTreeMap<V> extends MVMap<SpatialKey, V> {
} }
} }
} }
Page c = copyOnWrite(p.getChildPage(index), writeVersion); Page c = p.getChildPage(index).copy(writeVersion);
if (c.getMemory() > store.getPageSplitSize() && c.getKeyCount() > 1) { if (c.getMemory() > store.getPageSplitSize() && c.getKeyCount() > 4) {
// split on the way down // split on the way down
Page split = split(c, writeVersion); Page split = split(c, writeVersion);
p = copyOnWrite(p, writeVersion);
p.setKey(index, getBounds(c)); p.setKey(index, getBounds(c));
p.setChild(index, c); p.setChild(index, c);
p.setCounts(index, c);
p.insertNode(index, getBounds(split), split); p.insertNode(index, getBounds(split), split);
// now we are not sure where to add // now we are not sure where to add
add(p, writeVersion, key, value); add(p, writeVersion, key, value);
...@@ -308,7 +298,6 @@ public class MVRTreeMap<V> extends MVMap<SpatialKey, V> { ...@@ -308,7 +298,6 @@ public class MVRTreeMap<V> extends MVMap<SpatialKey, V> {
keyType.increaseBounds(bounds, key); keyType.increaseBounds(bounds, key);
p.setKey(index, bounds); p.setKey(index, bounds);
p.setChild(index, c); p.setChild(index, c);
p.setCounts(index, c);
} }
private Page split(Page p, long writeVersion) { private Page split(Page p, long writeVersion) {
...@@ -412,20 +401,17 @@ public class MVRTreeMap<V> extends MVMap<SpatialKey, V> { ...@@ -412,20 +401,17 @@ public class MVRTreeMap<V> extends MVMap<SpatialKey, V> {
private Page newPage(boolean leaf, long writeVersion) { private Page newPage(boolean leaf, long writeVersion) {
Object[] values; Object[] values;
Page.PageReference[] refs; Page.PageReference[] refs;
long[] c;
if (leaf) { if (leaf) {
values = new Object[4]; values = Page.EMPTY_OBJECT_ARRAY;
refs = null; refs = null;
c = null;
} else { } else {
values = null; values = null;
refs = new Page.PageReference[] { refs = new Page.PageReference[] {
new Page.PageReference(null, 0)}; new Page.PageReference(null, 0, 0)};
c = new long[1];
} }
return Page.create(this, writeVersion, return Page.create(this, writeVersion,
0, new Object[4], values, Page.EMPTY_OBJECT_ARRAY, values,
leaf ? 0 : 1, refs, c, 0, 0, 0); refs, 0, 0);
} }
private static void move(Page source, Page target, int sourceIndex) { private static void move(Page source, Page target, int sourceIndex) {
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论