Commit 6ab7643f authored by Thomas Mueller

MVStore: a concurrent map implementation

Parent 431505b4
......@@ -235,22 +235,31 @@ for spatial operations (contain and intersection; nearest neighbor is not yet im
<h3>Concurrent Operations and Caching</h3>
<p>
At the moment, concurrent read on old versions of the data is supported.
The default map implementation supports concurrent reads on old versions of the data.
All such read operations can occur in parallel. Concurrent reads from the page cache,
as well as concurrent reads from the file system are supported.
</p><p>
Storing changes can occur concurrently to modifying the data,
as <code>store()</code> operates on a snapshot.
</p><p>
Caching is done on the page level.
The page cache is a concurrent LIRS cache,
which should be resistant against scan operations.
The page cache is a concurrent LIRS cache, which should be resistant against scan operations.
</p><p>
Concurrent modification operations on a map are currently not supported
(the same as <code>HashMap</code> and <code>TreeMap</code>),
however it is planned to support an additional map implementation
that supports concurrent writes
(at the cost of speed if used in a single thread, same as <code>ConcurrentHashMap</code>).
The default map implementation does not support concurrent modification
operations on a map (the same as <code>HashMap</code> and <code>TreeMap</code>).
</p><p>
Storing changes can occur concurrently to modifying the data,
as <code>store()</code> operates on a snapshot.
With the <code>MVMapConcurrent</code> implementation,
read operations even on the newest version can happen concurrently with all other
operations, without risk of corruption.
This comes at the cost of slightly reduced speed in single-threaded mode,
the same trade-off as with <code>ConcurrentHashMap</code>.
Write operations first read the relevant area from disk to memory
(this can happen concurrently), and only then modify the data. The in-memory part of write
operations is synchronized.
</p><p>
For fully scalable concurrent write operations to a map (in-memory and to disk),
the map could be split into multiple maps in different stores ('sharding').
The plan is to add such a mechanism later when needed.
</p>
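<p>
A rough usage sketch (how the <code>MVStore</code> <code>s</code> is opened is
omitted here; see <code>MVStoreBuilder</code>):
</p>
<pre>
// s is an open MVStore
MVMap&lt;Integer, String&gt; map = s.openMap("data",
        new MVMapConcurrent&lt;Integer, String&gt;(
                new ObjectDataType(), new ObjectDataType()));
// put, get and remove may be called from multiple threads
map.put(1, "Hello");
String h = map.get(1);
map.remove(1);
// store() works on a snapshot and can run concurrently with modifications
s.store();
</pre>
<p>
Sharding as described above would simply mean routing each key to one of
several such maps (for example by key hash), each kept in its own store.
</p>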
<h3>Log Structured Storage</h3>
......
......@@ -67,6 +67,21 @@ public class MVMap<K, V> extends AbstractMap<K, V>
this.createVersion = Long.parseLong(config.get("createVersion"));
}
/**
* Create a copy of a page, if the write version is higher than the current
* version.
*
* @param p the page
* @param writeVersion the write version
* @return a page with the given write version
*/
protected Page copyOnWrite(Page p, long writeVersion) {
if (p.getVersion() == writeVersion) {
return p;
}
return p.copy(writeVersion);
}
/**
* Add or replace a key-value pair.
*
......@@ -78,26 +93,31 @@ public class MVMap<K, V> extends AbstractMap<K, V>
public V put(K key, V value) {
checkWrite();
long writeVersion = store.getCurrentVersion();
Page p = root.copyOnWrite(writeVersion);
if (p.getMemory() > store.getPageSize() && p.getKeyCount() > 1) {
int at = p.getKeyCount() / 2;
long totalCount = p.getTotalCount();
Object k = p.getKey(at);
Page split = p.split(at);
Object[] keys = { k };
long[] children = { p.getPos(), split.getPos() };
Page[] childrenPages = { p, split };
long[] counts = { p.getTotalCount(), split.getTotalCount() };
p = Page.create(this, writeVersion, 1,
keys, null, children, childrenPages, counts, totalCount, 0, 0);
store.registerUnsavedPage();
// now p is a node; insert continues
}
Page p = copyOnWrite(root, writeVersion);
p = splitRootIfNeeded(p, writeVersion);
Object result = put(p, writeVersion, key, value);
newRoot(p);
return (V) result;
}
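/**
* Split the root page if needed, that is, if it is larger than the
* configured page size and contains more than one key. The two halves are
* then linked from a newly created root node.
*
* @param p the current root page (already copied for this write version)
* @param writeVersion the write version
* @return the new root page, or the given page if no split was needed
*/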
protected Page splitRootIfNeeded(Page p, long writeVersion) {
if (p.getMemory() <= store.getPageSize() || p.getKeyCount() <= 1) {
return p;
}
int at = p.getKeyCount() / 2;
long totalCount = p.getTotalCount();
Object k = p.getKey(at);
Page split = p.split(at);
Object[] keys = { k };
long[] children = { p.getPos(), split.getPos() };
Page[] childrenPages = { p, split };
long[] counts = { p.getTotalCount(), split.getTotalCount() };
p = Page.create(this, writeVersion, 1,
keys, null, children, childrenPages, counts, totalCount, 0, 0);
store.registerUnsavedPage();
return p;
}
/**
* Add or update a key-value pair.
*
......@@ -108,8 +128,8 @@ public class MVMap<K, V> extends AbstractMap<K, V>
* @return the old value, or null
*/
protected Object put(Page p, long writeVersion, Object key, Object value) {
int index = p.binarySearch(key);
if (p.isLeaf()) {
int index = p.binarySearch(key);
if (index < 0) {
index = -index - 1;
p.insertLeaf(index, key, value);
......@@ -118,25 +138,26 @@ public class MVMap<K, V> extends AbstractMap<K, V>
return p.setValue(index, value);
}
// p is a node
int index = p.binarySearch(key);
if (index < 0) {
index = -index - 1;
} else {
index++;
}
Page c = p.getChildPage(index).copyOnWrite(writeVersion);
Page c = copyOnWrite(p.getChildPage(index), writeVersion);
if (c.getMemory() > store.getPageSize() && c.getKeyCount() > 1) {
// split on the way down
int at = c.getKeyCount() / 2;
Object k = c.getKey(at);
Page split = c.split(at);
p.setChild(index, split);
p.setCounts(index, c);
p.insertNode(index, k, c);
// now we are not sure where to add
return put(p, writeVersion, key, value);
}
Object result = put(c, writeVersion, key, value);
p.setChild(index, c);
Object result = put(c, writeVersion, key, value);
p.setCounts(index, c);
return result;
}
......@@ -243,14 +264,13 @@ public class MVMap<K, V> extends AbstractMap<K, V>
Page p = root;
long offset = 0;
while (true) {
int x = p.binarySearch(key);
if (p.isLeaf()) {
int x = p.binarySearch(key);
if (x < 0) {
return -offset + x;
}
return offset + x;
}
int x = key == null ? -1 : p.binarySearch(key);
if (x < 0) {
x = -x - 1;
} else {
......@@ -264,7 +284,7 @@ public class MVMap<K, V> extends AbstractMap<K, V>
}
@SuppressWarnings("unchecked")
private K getFirstLast(boolean first) {
protected K getFirstLast(boolean first) {
checkOpen();
if (size() == 0) {
return null;
......@@ -320,7 +340,7 @@ public class MVMap<K, V> extends AbstractMap<K, V>
return getMinMax(key, true, true);
}
private K getMinMax(K key, boolean min, boolean excluding) {
protected K getMinMax(K key, boolean min, boolean excluding) {
checkOpen();
if (size() == 0) {
return null;
......@@ -487,7 +507,7 @@ public class MVMap<K, V> extends AbstractMap<K, V>
public V remove(Object key) {
checkWrite();
long writeVersion = store.getCurrentVersion();
Page p = root.copyOnWrite(writeVersion);
Page p = copyOnWrite(root, writeVersion);
@SuppressWarnings("unchecked")
V result = (V) remove(p, writeVersion, key);
newRoot(p);
......@@ -586,22 +606,23 @@ public class MVMap<K, V> extends AbstractMap<K, V>
index++;
}
Page cOld = p.getChildPage(index);
Page c = cOld.copyOnWrite(writeVersion);
long oldCount = c.getTotalCount();
Page c = copyOnWrite(cOld, writeVersion);
result = remove(c, writeVersion, key);
if (oldCount == c.getTotalCount()) {
if (result == null) {
return null;
}
if (c.getTotalCount() == 0) {
// this child was deleted
if (p.getKeyCount() == 0) {
p.setChild(index, c);
p.setCounts(index, c);
removePage(p);
} else {
p.remove(index);
}
} else {
p.setChild(index, c);
p.setCounts(index, c);
}
return result;
}
......
/*
* Copyright 2004-2011 H2 Group. Multiple-Licensed under the H2 License,
* Version 1.0, and under the Eclipse Public License, Version 1.0
* (http://h2database.com/html/license.html).
* Initial Developer: H2 Group
*/
package org.h2.mvstore;
import org.h2.mvstore.type.DataType;
import org.h2.mvstore.type.ObjectDataType;
/**
* A stored map. Read operations can happen concurrently with all other
* operations, without risk of corruption.
* <p>
* Write operations first read the relevant area from disk to memory
* concurrently, and only then modify the data. The in-memory part of write
* operations is synchronized. For scalable concurrent in-memory write
* operations, the map should be split into multiple smaller sub-maps that are
* then synchronized independently.
*
* @param <K> the key class
* @param <V> the value class
*/
public class MVMapConcurrent<K, V> extends MVMap<K, V> {
public MVMapConcurrent(DataType keyType, DataType valueType) {
super(keyType, valueType);
}
public static <K, V> MVMap<K, V> create() {
return new MVMapConcurrent<K, V>(new ObjectDataType(), new ObjectDataType());
}
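/**
* Always create a copy of the page, even if it already has the current
* write version, so that concurrent readers (which are allowed even on the
* newest version, see the class comment) never see in-place modifications.
*/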
protected Page copyOnWrite(Page p, long writeVersion) {
return p.copy(writeVersion);
}
@SuppressWarnings("unchecked")
public V put(K key, V value) {
checkWrite();
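// read the old value outside of the synchronized block; this also loads
// the relevant pages into memory concurrently (see the class comment)
// and allows skipping the write if the value is unchanged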
V result = get(key);
if (value.equals(result)) {
return result;
}
long writeVersion = store.getCurrentVersion();
synchronized (this) {
Page p = copyOnWrite(root, writeVersion);
p = splitRootIfNeeded(p, writeVersion);
result = (V) put(p, writeVersion, key, value);
newRoot(p);
}
return result;
}
@SuppressWarnings("unchecked")
public V remove(Object key) {
checkWrite();
V result = get(key);
if (result == null) {
return null;
}
long writeVersion = store.getCurrentVersion();
synchronized (this) {
Page p = copyOnWrite(root, writeVersion);
result = (V) remove(p, writeVersion, key);
newRoot(p);
}
return result;
}
}
......@@ -91,6 +91,8 @@ TODO:
- unlimited transaction size
- MVStoreTool.shrink to shrink a store (create, copy, rename, delete)
-- and for MVStore on Windows, auto-detect renamed file
- ensure data is overwritten eventually if the system doesn't have a timer
- SSD-friendly write (always in blocks of 128 or 256 KB?)
*/
......
......@@ -39,7 +39,15 @@ public class Page {
private final MVMap<?, ?> map;
private long version;
private long pos;
/**
* The total entry count of this page and all children.
*/
private long totalCount;
/**
* The number of keys.
*/
private int keyCount;
/**
......@@ -57,10 +65,37 @@ public class Page {
*/
private int memory;
/**
* The keys.
* <p>
* The array might be larger than needed, to avoid frequent re-sizing.
*/
private Object[] keys;
/**
* The values.
* <p>
* The array might be larger than needed, to avoid frequent re-sizing.
*/
private Object[] values;
/**
* The child page ids.
* <p>
* The array might be larger than needed, to avoid frequent re-sizing.
*/
private long[] children;
/**
* The child pages.
* <p>
* The array might be larger than needed, to avoid frequent re-sizing.
*/
private Page[] childrenPages;
/**
* The total entry count of each child page (including its descendants).
*/
private long[] counts;
Page(MVMap<?, ?> map, long version) {
......@@ -168,16 +203,6 @@ public class Page {
return p != null ? p : map.readPage(children[index]);
}
/**
* Get the position of the child page at the given index.
*
* @param index the index
* @return the position
*/
long getChildPagePos(int index) {
return children[index];
}
/**
* Get the value at the given index.
*
......@@ -237,18 +262,14 @@ public class Page {
}
/**
* Create a copy of this page, if the write version is higher than the
* current version.
* Create a copy of this page.
*
* @param writeVersion the write version
* @return a page with the given write version
* @param version the new version
* @return a page with the given version
*/
public Page copyOnWrite(long writeVersion) {
if (version == writeVersion) {
return this;
}
public Page copy(long version) {
map.getStore().removePage(pos);
Page newPage = create(map, writeVersion,
Page newPage = create(map, version,
keyCount, keys, values, children, childrenPages,
counts, totalCount,
SHARED_KEYS | SHARED_VALUES | SHARED_CHILDREN | SHARED_COUNTS,
......@@ -438,13 +459,32 @@ public class Page {
children[index] = c.getPos();
childrenPages[index] = c;
}
if (c.getTotalCount() != counts[index]) {
}
/**
* Update the (descendant) count for the given child, if there was a change.
*
* @param index the index
* @param c the new child page
*/
public void setCounts(int index, Page c) {
setCounts(index, c.totalCount);
}
/**
* Update the (descendant) count for the given child, if there was a change.
*
* @param index the index
* @param total the new value
*/
public void setCounts(int index, long total) {
if (total != counts[index]) {
if ((sharedFlags & SHARED_COUNTS) != 0) {
counts = Arrays.copyOf(counts, counts.length);
sharedFlags &= ~SHARED_COUNTS;
}
long oldCount = counts[index];
counts[index] = c.getTotalCount();
counts[index] = total;
totalCount += counts[index] - oldCount;
}
}
......@@ -568,11 +608,11 @@ public class Page {
long[] newCounts = new long[counts.length + 1];
DataUtils.copyWithGap(counts, newCounts, counts.length, index);
newCounts[index] = childPage.getTotalCount();
newCounts[index] = childPage.totalCount;
counts = newCounts;
sharedFlags &= ~(SHARED_KEYS | SHARED_CHILDREN | SHARED_COUNTS);
totalCount += childPage.getTotalCount();
totalCount += childPage.totalCount;
memory += map.getKeyType().getMemory(key);
memory += DataUtils.PAGE_MEMORY_CHILD;
}
......
......@@ -158,7 +158,7 @@ public class MVRTreeMap<V> extends MVMap<SpatialKey, V> {
for (int i = 0; i < p.getKeyCount(); i++) {
if (contains(p, i, key)) {
Page cOld = p.getChildPage(i);
Page c = cOld.copyOnWrite(writeVersion);
Page c = copyOnWrite(cOld, writeVersion);
long oldSize = c.getTotalCount();
result = remove(c, writeVersion, key);
if (oldSize == c.getTotalCount()) {
......@@ -177,6 +177,7 @@ public class MVRTreeMap<V> extends MVMap<SpatialKey, V> {
p.setKey(i, getBounds(c));
}
p.setChild(i, c);
p.setCounts(i, c);
break;
}
}
......@@ -210,7 +211,7 @@ public class MVRTreeMap<V> extends MVMap<SpatialKey, V> {
private Object putOrAdd(SpatialKey key, V value, boolean alwaysAdd) {
checkWrite();
long writeVersion = store.getCurrentVersion();
Page p = root.copyOnWrite(writeVersion);
Page p = copyOnWrite(root, writeVersion);
Object result;
if (alwaysAdd || get(key) == null) {
if (p.getMemory() > store.getPageSize() && p.getKeyCount() > 1) {
......@@ -251,10 +252,11 @@ public class MVRTreeMap<V> extends MVMap<SpatialKey, V> {
if (!p.isLeaf()) {
for (int i = 0; i < p.getKeyCount(); i++) {
if (contains(p, i, key)) {
Page c = p.getChildPage(i).copyOnWrite(writeVersion);
Page c = copyOnWrite(p.getChildPage(i), writeVersion);
Object result = set(c, writeVersion, key, value);
if (result != null) {
p.setChild(i, c);
p.setCounts(i, c);
return result;
}
}
......@@ -294,13 +296,14 @@ public class MVRTreeMap<V> extends MVMap<SpatialKey, V> {
}
}
}
Page c = p.getChildPage(index).copyOnWrite(writeVersion);
Page c = copyOnWrite(p.getChildPage(index), writeVersion);
if (c.getMemory() > store.getPageSize() && c.getKeyCount() > 1) {
// split on the way down
Page split = split(c, writeVersion);
p = p.copyOnWrite(writeVersion);
p = copyOnWrite(p, writeVersion);
p.setKey(index, getBounds(c));
p.setChild(index, c);
p.setCounts(index, c);
p.insertNode(index, getBounds(split), split);
// now we are not sure where to add
add(p, writeVersion, key, value);
......@@ -311,6 +314,7 @@ public class MVRTreeMap<V> extends MVMap<SpatialKey, V> {
keyType.increaseBounds(bounds, key);
p.setKey(index, bounds);
p.setChild(index, c);
p.setCounts(index, c);
}
private Page split(Page p, long writeVersion) {
......
......@@ -15,6 +15,7 @@ import java.util.Iterator;
import java.util.Map;
import java.util.Random;
import org.h2.mvstore.MVMap;
import org.h2.mvstore.MVMapConcurrent;
import org.h2.mvstore.MVStore;
import org.h2.mvstore.MVStoreBuilder;
import org.h2.mvstore.type.ObjectDataTypeFactory;
......@@ -39,11 +40,65 @@ public class TestConcurrent extends TestMVStore {
public void test() throws Exception {
FileUtils.deleteRecursive(getBaseDir(), true);
// testConcurrentOnlineBackup();
testConcurrentMap();
testConcurrentIterate();
testConcurrentWrite();
testConcurrentRead();
}
/**
* Test the concurrent map implementation.
*/
private void testConcurrentMap() throws InterruptedException {
final MVStore s = openStore(null);
final MVMap<Integer, Integer> cm = MVMapConcurrent.create();
final MVMap<Integer, Integer> m = s.openMap("data", cm);
final int size = 20;
final Random rand = new Random(1);
Task task = new Task() {
public void call() throws Exception {
try {
while (!stop) {
if (rand.nextBoolean()) {
m.put(rand.nextInt(size), 1);
} else {
m.remove(rand.nextInt(size));
}
m.get(rand.nextInt(size));
m.firstKey();
m.lastKey();
m.ceilingKey(5);
m.floorKey(5);
m.higherKey(5);
m.lowerKey(5);
for (Iterator<Integer> it = m.keyIterator(null);
it.hasNext();) {
it.next();
}
}
} catch (Exception e) {
e.printStackTrace();
}
}
};
task.execute();
Thread.sleep(1);
for (int j = 0; j < 100; j++) {
for (int i = 0; i < 100; i++) {
if (rand.nextBoolean()) {
m.put(rand.nextInt(size), 2);
} else {
m.remove(rand.nextInt(size));
}
m.get(rand.nextInt(size));
}
s.incrementVersion();
Thread.sleep(1);
}
task.get();
s.close();
}
private void testConcurrentOnlineBackup() throws Exception {
// because absolute and relative reads are mixed, this currently
// only works when using FileChannel directly
......@@ -195,6 +250,8 @@ public class TestConcurrent extends TestMVStore {
// ignore
} catch (ArrayIndexOutOfBoundsException e) {
// ignore
} catch (IllegalArgumentException e) {
// ignore
} catch (NullPointerException e) {
// ignore
}
......@@ -216,6 +273,8 @@ public class TestConcurrent extends TestMVStore {
// ignore
} catch (ArrayIndexOutOfBoundsException e) {
// ignore
} catch (IllegalArgumentException e) {
// ignore
} catch (NullPointerException e) {
// ignore
}
......