提交 71d0a91b authored 作者: Thomas Mueller's avatar Thomas Mueller

MVStore: an IndexOutOfBoundsException could sometimes occur MVMap.openVersion…

MVStore: an IndexOutOfBoundsException could sometimes occur MVMap.openVersion when concurrently accessing the store. 
上级 4b204d0b
......@@ -64,6 +64,26 @@ public class MVMap<K, V> extends AbstractMap<K, V>
this.valueType = valueType;
this.root = Page.createEmpty(this, -1);
}
/**
* Get the metadata key for the root of the given map id.
*
* @param mapId the map id
* @return the metadata key
*/
public static String getMapRootKey(int mapId) {
return "root." + Integer.toHexString(mapId);
}
/**
* Get the metadata key for the given map id.
*
* @param mapId the map id
* @return the metadata key
*/
public static String getMapKey(int mapId) {
return "map." + Integer.toHexString(mapId);
}
/**
* Open this map with the given store and configuration.
......@@ -698,14 +718,22 @@ public class MVMap<K, V> extends AbstractMap<K, V>
if (root != newRoot) {
removeUnusedOldVersions();
if (root.getVersion() != newRoot.getVersion()) {
ArrayList<Page> list = oldRoots;
if (list.size() > 0) {
Page last = list.get(list.size() - 1);
// directly append to the list, modifying the list
// (if somebody concurrently replaces the oldRoots
// field, the change is lost, but in many cases this is
// detected at the end of this method)
ArrayList<Page> roots = oldRoots;
if (roots.size() > 0) {
Page last = roots.get(roots.size() - 1);
if (last.getVersion() != root.getVersion()) {
list.add(root);
roots.add(root);
}
} else {
list.add(root);
roots.add(root);
}
if (roots != oldRoots) {
throw DataUtils.newConcurrentModificationException(
getName());
}
}
root = newRoot;
......@@ -955,16 +983,17 @@ public class MVMap<K, V> extends AbstractMap<K, V>
} else if (root.getVersion() >= version) {
// iterating in descending order -
// this is not terribly efficient if there are many versions
ArrayList<Page> list = oldRoots;
while (list.size() > 0) {
int i = list.size() - 1;
Page p = list.get(i);
ArrayList<Page> roots = new ArrayList<Page>(oldRoots);
while (roots.size() > 0) {
int i = roots.size() - 1;
Page p = roots.get(i);
root = p;
list.remove(i);
roots.remove(i);
if (p.getVersion() < version) {
break;
}
}
oldRoots = roots;
}
} finally {
afterWrite();
......@@ -979,7 +1008,11 @@ public class MVMap<K, V> extends AbstractMap<K, V>
if (oldest == -1) {
return;
}
int i = searchRoot(oldest);
// operate on a stable array
// (items might be appended concurrently,
// but not removed)
ArrayList<Page> roots = oldRoots;
int i = searchRoot(roots, oldest);
if (i < 0) {
i = -i - 1;
}
......@@ -989,9 +1022,13 @@ public class MVMap<K, V> extends AbstractMap<K, V>
}
// create a new instance
// because another thread might iterate over it
int size = oldRoots.size() - i;
int size = roots.size() - i;
ArrayList<Page> list = new ArrayList<Page>(size);
list.addAll(oldRoots.subList(i, oldRoots.size()));
list.addAll(roots.subList(i, roots.size()));
if (roots != oldRoots) {
throw DataUtils.newConcurrentModificationException(
getName());
}
oldRoots = list;
}
......@@ -1153,8 +1190,11 @@ public class MVMap<K, V> extends AbstractMap<K, V>
store.getFileStore() == null)) {
newest = r;
} else {
// operate on a stable array
// (items might be appended concurrently)
ArrayList<Page> roots = oldRoots;
// find the newest page that has a getVersion() <= version
int i = searchRoot(version);
int i = searchRoot(roots, version);
if (i < 0) {
// not found
if (i == -1) {
......@@ -1163,7 +1203,7 @@ public class MVMap<K, V> extends AbstractMap<K, V>
}
i = -i - 2;
}
newest = oldRoots.get(i);
newest = roots.get(i);
}
MVMap<K, V> m = openReadOnly();
m.root = newest;
......@@ -1186,11 +1226,11 @@ public class MVMap<K, V> extends AbstractMap<K, V>
return m;
}
private int searchRoot(long version) {
int low = 0, high = oldRoots.size() - 1;
private static int searchRoot(ArrayList<Page> roots, long version) {
int low = 0, high = roots.size() - 1;
while (low <= high) {
int x = (low + high) >>> 1;
long v = oldRoots.get(x).getVersion();
long v = roots.get(x).getVersion();
if (v < version) {
low = x + 1;
} else if (version < v) {
......@@ -1251,6 +1291,25 @@ public class MVMap<K, V> extends AbstractMap<K, V>
void setWriteVersion(long writeVersion) {
this.writeVersion = writeVersion;
}
public void copyFrom(MVMap<K, V> sourceMap) {
Page sourceRoot = sourceMap.root;
root = Page.create(this, writeVersion, sourceRoot);
root = copy(root, sourceRoot);
}
private Page copy(Page target, Page source) {
target = copyOnWrite(target, writeVersion);
if (!target.isLeaf()) {
for (int i = 0; i < target.getChildPageCount(); i++) {
Page sourceChild = source.getChildPage(i);
Page targetChild = Page.create(this, writeVersion, sourceChild);
targetChild = copy(targetChild, sourceChild);
target.setChild(i, targetChild);
}
}
return target;
}
@Override
public String toString() {
......
......@@ -395,7 +395,7 @@ public class MVStore {
return old;
}
map = builder.create();
String config = meta.get("map." + x);
String config = meta.get(MVMap.getMapKey(id));
c = New.hashMap();
c.putAll(DataUtils.parseMap(config));
c.put("id", id);
......@@ -410,7 +410,7 @@ public class MVStore {
map.init(this, c);
markMetaChanged();
x = Integer.toHexString(id);
meta.put("map." + x, map.asString(name));
meta.put(MVMap.getMapKey(id), map.asString(name));
meta.put("name." + name, x);
root = 0;
}
......@@ -977,7 +977,7 @@ public class MVStore {
}
for (MVMap<?, ?> m : changed) {
Page p = m.getRoot();
String key = "root." + Long.toHexString(m.getId());
String key = MVMap.getMapRootKey(m.getId());
if (p.getTotalCount() == 0) {
meta.put(key, "0");
} else {
......@@ -998,7 +998,7 @@ public class MVStore {
if (p.getTotalCount() > 0) {
p.writeUnsavedRecursive(c, buff);
long root = p.getPos();
String key = "root." + Long.toHexString(m.getId());
String key = MVMap.getMapRootKey(m.getId());
meta.put(key, Long.toHexString(root));
}
}
......@@ -1784,7 +1784,7 @@ public class MVStore {
@SuppressWarnings("unchecked")
MVMap<Object, Object> map = (MVMap<Object, Object>) getMap(mapId);
if (map == null) {
boolean mapExists = meta.containsKey("root." + Integer.toHexString(mapId));
boolean mapExists = meta.containsKey(MVMap.getMapRootKey(mapId));
if (mapExists) {
// pages of maps that were removed: the live count was
// already decremented, but maps that are not open, the
......@@ -2242,7 +2242,7 @@ public class MVStore {
}
private static long getRootPos(MVMap<String, String> map, int mapId) {
String root = map.get("root." + Integer.toHexString(mapId));
String root = map.get(MVMap.getMapRootKey(mapId));
return root == null ? 0 : DataUtils.parseHexLong(root);
}
......@@ -2318,7 +2318,7 @@ public class MVStore {
markMetaChanged();
String x = Integer.toHexString(id);
meta.remove("name." + oldName);
meta.put("map." + x, map.asString(newName));
meta.put(MVMap.getMapKey(id), map.asString(newName));
meta.put("name." + newName, x);
}
......@@ -2335,10 +2335,9 @@ public class MVStore {
int id = map.getId();
String name = getMapName(id);
markMetaChanged();
String x = Integer.toHexString(id);
meta.remove("map." + x);
meta.remove(MVMap.getMapKey(id));
meta.remove("name." + name);
meta.remove("root." + x);
meta.remove(MVMap.getMapRootKey(id));
maps.remove(id);
}
......@@ -2349,7 +2348,7 @@ public class MVStore {
* @return the name, or null if not found
*/
public synchronized String getMapName(int id) {
String m = meta.get("map." + Integer.toHexString(id));
String m = meta.get(MVMap.getMapKey(id));
return m == null ? null : DataUtils.parseMap(m).get("name");
}
......@@ -2388,6 +2387,7 @@ public class MVStore {
public void setCacheSize(int mb) {
if (cache != null) {
cache.setMaxMemory((long) mb * 1024 * 1024);
cache.clear();
}
}
......
......@@ -15,7 +15,6 @@ import java.util.Map;
import java.util.Map.Entry;
import java.util.TreeMap;
import org.h2.mvstore.type.DataType;
import org.h2.mvstore.type.StringDataType;
import org.h2.store.fs.FilePath;
import org.h2.store.fs.FileUtils;
......@@ -80,6 +79,7 @@ public class MVStoreTool {
pw.println("File not found: " + fileName);
return;
}
pw.printf("File %s, length %d\n", fileName, FileUtils.size(fileName));
FileChannel file = null;
int blockSize = MVStore.BLOCK_SIZE;
try {
......@@ -292,54 +292,4 @@ public class MVStoreTool {
return x.substring(0, 19);
}
/**
* A data type that can read any data that is persisted, and converts it to
* a byte array.
*/
static class GenericDataType implements DataType {
@Override
public int compare(Object a, Object b) {
throw DataUtils.newUnsupportedOperationException("Can not compare");
}
@Override
public int getMemory(Object obj) {
return obj == null ? 0 : ((byte[]) obj).length;
}
@Override
public void write(WriteBuffer buff, Object obj) {
if (obj != null) {
buff.put((byte[]) obj);
}
}
@Override
public void write(WriteBuffer buff, Object[] obj, int len, boolean key) {
for (Object o : obj) {
write(buff, o);
}
}
@Override
public Object read(ByteBuffer buff) {
int len = buff.remaining();
if (len == 0) {
return null;
}
byte[] data = new byte[len];
buff.get(data);
return data;
}
@Override
public void read(ByteBuffer buff, Object[] obj, int len, boolean key) {
for (int i = 0; i < obj.length; i++) {
obj[i] = read(buff);
}
}
}
}
......@@ -163,6 +163,29 @@ public class Page {
}
return p;
}
public static Page create(MVMap<?, ?> map, long version, Page source) {
Page p = new Page(map, version);
// the position is 0
p.keyCount = source.keyCount;
p.keys = source.keys;
if (source.isLeaf()) {
p.values = source.values;
} else {
p.childCount = source.childCount;
p.children = new long[source.children.length];
p.childrenPages = new Page[source.childrenPages.length];
p.counts = source.counts;
}
p.totalCount = source.totalCount;
p.sharedFlags = source.sharedFlags;
p.memory = source.memory;
MVStore store = map.store;
if (store != null) {
store.registerUnsavedPage(p.memory);
}
return p;
}
/**
* Read a page.
......
......@@ -47,6 +47,7 @@ public class TestConcurrent extends TestMVStore {
FileUtils.createDirectories(getBaseDir());
FileUtils.deleteRecursive("memFS:", false);
testConcurrentChangeAndGetVersion();
testConcurrentFree();
testConcurrentStoreAndRemoveMap();
testConcurrentStoreAndClose();
......@@ -56,7 +57,49 @@ public class TestConcurrent extends TestMVStore {
testConcurrentWrite();
testConcurrentRead();
}
private void testConcurrentChangeAndGetVersion() throws InterruptedException {
for (int test = 0; test < 10; test++) {
final MVStore s = new MVStore.Builder().
autoCommitDisabled().open();
s.setVersionsToKeep(10);
final MVMapConcurrent<Integer, Integer> m = s.openMap("data",
new MVMapConcurrent.Builder<Integer, Integer>());
m.put(1, 1);
Task task = new Task() {
@Override
public void call() throws Exception {
while (!stop) {
m.put(1, 1);
s.commit();
}
}
};
task.execute();
Thread.sleep(1);
for (int i = 0; i < 10000; i++) {
if (task.isFinished()) {
break;
}
for (int j = 0; j < 20; j++) {
m.put(1, 1);
s.commit();
}
s.setVersionsToKeep(15);
long version = s.getCurrentVersion() - 1;
try {
m.openVersion(version);
} catch (IllegalArgumentException e) {
// ignore
}
s.setVersionsToKeep(20);
}
task.get();
s.commit();
s.close();
}
FileUtils.deleteRecursive("memFS:", false);
}
private void testConcurrentFree() throws InterruptedException {
String fileName = "memFS:testConcurrentFree.h3";
for (int test = 0; test < 10; test++) {
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论