提交 8a5415bb authored 作者: Thomas Mueller's avatar Thomas Mueller

MVStore: concurrency problems have been fixed.

上级 8022f479
...@@ -19,6 +19,7 @@ Change Log ...@@ -19,6 +19,7 @@ Change Log
<h2>Next Version (unreleased)</h2> <h2>Next Version (unreleased)</h2>
<ul><li>Issue 73: MySQL compatibility: support REPLACE, patch by Cemo Koc. <ul><li>Issue 73: MySQL compatibility: support REPLACE, patch by Cemo Koc.
</li><li>MVStore: concurrency problems have been fixed.
</li></ul> </li></ul>
<h2>Version 1.3.174 (2013-10-19)</h2> <h2>Version 1.3.174 (2013-10-19)</h2>
......
...@@ -28,4 +28,5 @@ Add to http://freecode.com/ ...@@ -28,4 +28,5 @@ Add to http://freecode.com/
Add to http://twitter.com Add to http://twitter.com
- tweet: add @geospatialnews for the new geometry type and disk spatial index - tweet: add @geospatialnews for the new geometry type and disk spatial index
Close bugs: http://code.google.com/p/h2database/issues/list Close bugs: http://code.google.com/p/h2database/issues/list
Update statistics
...@@ -396,7 +396,7 @@ public class DataUtils { ...@@ -396,7 +396,7 @@ public class DataUtils {
} }
throw newIllegalStateException( throw newIllegalStateException(
ERROR_READING_FAILED, ERROR_READING_FAILED,
"Reading from {0} failed; file length {1} read length {1} at {2}", "Reading from {0} failed; file length {1} read length {2} at {3}",
file, size, dst.remaining(), pos, e); file, size, dst.remaining(), pos, e);
} }
} }
......
...@@ -214,7 +214,6 @@ public class MVMap<K, V> extends AbstractMap<K, V> ...@@ -214,7 +214,6 @@ public class MVMap<K, V> extends AbstractMap<K, V>
*/ */
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
public K getKey(long index) { public K getKey(long index) {
checkOpen();
if (index < 0 || index >= size()) { if (index < 0 || index >= size()) {
return null; return null;
} }
...@@ -285,7 +284,6 @@ public class MVMap<K, V> extends AbstractMap<K, V> ...@@ -285,7 +284,6 @@ public class MVMap<K, V> extends AbstractMap<K, V>
* @return the index * @return the index
*/ */
public long getKeyIndex(K key) { public long getKeyIndex(K key) {
checkOpen();
if (size() == 0) { if (size() == 0) {
return -1; return -1;
} }
...@@ -319,7 +317,6 @@ public class MVMap<K, V> extends AbstractMap<K, V> ...@@ -319,7 +317,6 @@ public class MVMap<K, V> extends AbstractMap<K, V>
*/ */
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
protected K getFirstLast(boolean first) { protected K getFirstLast(boolean first) {
checkOpen();
if (size() == 0) { if (size() == 0) {
return null; return null;
} }
...@@ -383,7 +380,6 @@ public class MVMap<K, V> extends AbstractMap<K, V> ...@@ -383,7 +380,6 @@ public class MVMap<K, V> extends AbstractMap<K, V>
* @return the key, or null if no such key exists * @return the key, or null if no such key exists
*/ */
protected K getMinMax(K key, boolean min, boolean excluding) { protected K getMinMax(K key, boolean min, boolean excluding) {
checkOpen();
return getMinMax(root, key, min, excluding); return getMinMax(root, key, min, excluding);
} }
...@@ -429,7 +425,6 @@ public class MVMap<K, V> extends AbstractMap<K, V> ...@@ -429,7 +425,6 @@ public class MVMap<K, V> extends AbstractMap<K, V>
@Override @Override
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
public V get(Object key) { public V get(Object key) {
checkOpen();
return (V) binarySearch(root, key); return (V) binarySearch(root, key);
} }
...@@ -514,7 +509,7 @@ public class MVMap<K, V> extends AbstractMap<K, V> ...@@ -514,7 +509,7 @@ public class MVMap<K, V> extends AbstractMap<K, V>
* Remove all entries, and close the map. * Remove all entries, and close the map.
*/ */
public void removeMap() { public void removeMap() {
checkOpen(); int todoMoveToMVStore;
if (this == store.getMetaMap()) { if (this == store.getMetaMap()) {
return; return;
} }
...@@ -712,7 +707,6 @@ public class MVMap<K, V> extends AbstractMap<K, V> ...@@ -712,7 +707,6 @@ public class MVMap<K, V> extends AbstractMap<K, V>
} else { } else {
list.add(root); list.add(root);
} }
store.markChanged(this);
} }
root = newRoot; root = newRoot;
} }
...@@ -775,7 +769,6 @@ public class MVMap<K, V> extends AbstractMap<K, V> ...@@ -775,7 +769,6 @@ public class MVMap<K, V> extends AbstractMap<K, V>
* @return the iterator * @return the iterator
*/ */
public Cursor<K> keyIterator(K from) { public Cursor<K> keyIterator(K from) {
checkOpen();
return new Cursor<K>(this, root, from); return new Cursor<K>(this, root, from);
} }
...@@ -790,7 +783,6 @@ public class MVMap<K, V> extends AbstractMap<K, V> ...@@ -790,7 +783,6 @@ public class MVMap<K, V> extends AbstractMap<K, V>
@Override @Override
public Set<K> keySet() { public Set<K> keySet() {
checkOpen();
final MVMap<K, V> map = this; final MVMap<K, V> map = this;
final Page root = this.root; final Page root = this.root;
return new AbstractSet<K>() { return new AbstractSet<K>() {
...@@ -831,7 +823,7 @@ public class MVMap<K, V> extends AbstractMap<K, V> ...@@ -831,7 +823,7 @@ public class MVMap<K, V> extends AbstractMap<K, V>
return store.getMapName(id); return store.getMapName(id);
} }
public MVStore getStore() { MVStore getStore() {
return store; return store;
} }
...@@ -897,18 +889,6 @@ public class MVMap<K, V> extends AbstractMap<K, V> ...@@ -897,18 +889,6 @@ public class MVMap<K, V> extends AbstractMap<K, V>
return readOnly; return readOnly;
} }
/**
* Check whether the map is open.
*
* @throws IllegalStateException if the map is closed
*/
protected void checkOpen() {
if (closed) {
throw DataUtils.newIllegalStateException(
DataUtils.ERROR_CLOSED, "This map is closed");
}
}
/** /**
* This method is called before writing to the map. The default * This method is called before writing to the map. The default
* implementation checks whether writing is allowed, and tries * implementation checks whether writing is allowed, and tries
...@@ -918,8 +898,11 @@ public class MVMap<K, V> extends AbstractMap<K, V> ...@@ -918,8 +898,11 @@ public class MVMap<K, V> extends AbstractMap<K, V>
* or if another thread is concurrently writing * or if another thread is concurrently writing
*/ */
protected void beforeWrite() { protected void beforeWrite() {
if (closed) {
throw DataUtils.newIllegalStateException(
DataUtils.ERROR_CLOSED, "This map is closed");
}
if (readOnly) { if (readOnly) {
checkOpen();
throw DataUtils.newUnsupportedOperationException( throw DataUtils.newUnsupportedOperationException(
"This map is read-only"); "This map is read-only");
} }
...@@ -991,14 +974,12 @@ public class MVMap<K, V> extends AbstractMap<K, V> ...@@ -991,14 +974,12 @@ public class MVMap<K, V> extends AbstractMap<K, V>
* @return the number of entries * @return the number of entries
*/ */
public long sizeAsLong() { public long sizeAsLong() {
checkOpen();
return root.getTotalCount(); return root.getTotalCount();
} }
@Override @Override
public boolean isEmpty() { public boolean isEmpty() {
// could also use (sizeAsLong() == 0) // could also use (sizeAsLong() == 0)
checkOpen();
return root.isLeaf() && root.getKeyCount() == 0; return root.isLeaf() && root.getKeyCount() == 0;
} }
...@@ -1061,7 +1042,7 @@ public class MVMap<K, V> extends AbstractMap<K, V> ...@@ -1061,7 +1042,7 @@ public class MVMap<K, V> extends AbstractMap<K, V>
* *
* @return the opened map * @return the opened map
*/ */
protected MVMap<K, V> openReadOnly() { MVMap<K, V> openReadOnly() {
MVMap<K, V> m = new MVMap<K, V>(keyType, valueType); MVMap<K, V> m = new MVMap<K, V>(keyType, valueType);
m.readOnly = true; m.readOnly = true;
HashMap<String, String> config = New.hashMap(); HashMap<String, String> config = New.hashMap();
...@@ -1140,6 +1121,7 @@ public class MVMap<K, V> extends AbstractMap<K, V> ...@@ -1140,6 +1121,7 @@ public class MVMap<K, V> extends AbstractMap<K, V>
* @param newMapName the name name * @param newMapName the name name
*/ */
public void renameMap(String newMapName) { public void renameMap(String newMapName) {
int todoMoveToMVStore;
beforeWrite(); beforeWrite();
try { try {
store.renameMap(this, newMapName); store.renameMap(this, newMapName);
......
...@@ -47,7 +47,7 @@ Documentation ...@@ -47,7 +47,7 @@ Documentation
TestMVStoreDataLoss TestMVStoreDataLoss
MVTableEngine: MVTableEngine:
- use StreamStore - use StreamStore to avoid deadlocks
- when the MVStore was enabled before, use it again - when the MVStore was enabled before, use it again
(probably by checking existence of the mvstore file) (probably by checking existence of the mvstore file)
- not use the .h2.db file - not use the .h2.db file
...@@ -56,7 +56,6 @@ MVTableEngine: ...@@ -56,7 +56,6 @@ MVTableEngine:
TransactionStore: TransactionStore:
MVStore: MVStore:
- rename FilePathCrypt to FilePathCipher
- automated 'kill process' and 'power failure' test - automated 'kill process' and 'power failure' test
- update checkstyle - update checkstyle
- auto-compact from time to time and on close - auto-compact from time to time and on close
...@@ -107,11 +106,14 @@ MVStore: ...@@ -107,11 +106,14 @@ MVStore:
- storage that splits database into multiple files, - storage that splits database into multiple files,
to speed up compact and allow using trim to speed up compact and allow using trim
(by truncating / deleting empty files) (by truncating / deleting empty files)
- add new feature to file systems that avoid copying data - add new feature to the file system API to avoid copying data
(reads should return a ByteBuffer, not write into one) (reads that returns a ByteBuffer instead of writing into one)
for memory mapped files and off-heap storage
- do we need to store a dummy chunk entry in the chunk itself? - do we need to store a dummy chunk entry in the chunk itself?
currently yes, as some fields are not set in the chunk header currently yes, as some fields are not set in the chunk header
- off-heap LIRS cache (the LIRS cache should use a map factory) - support log structured merge style operations (blind writes)
using one map per level plus bloom filter
- have a strict call order MVStore -> MVMap -> Page -> FileStore
*/ */
...@@ -207,6 +209,7 @@ public class MVStore { ...@@ -207,6 +209,7 @@ public class MVStore {
private int unsavedPageCount; private int unsavedPageCount;
private int unsavedPageCountMax; private int unsavedPageCountMax;
private boolean storeNeeded;
/** /**
* The time the store was created, in milliseconds since 1970. * The time the store was created, in milliseconds since 1970.
...@@ -427,9 +430,9 @@ public class MVStore { ...@@ -427,9 +430,9 @@ public class MVStore {
c.put("createVersion", Long.toString(currentVersion)); c.put("createVersion", Long.toString(currentVersion));
map = builder.create(); map = builder.create();
map.init(this, c); map.init(this, c);
markMetaChanged();
meta.put("map." + id, map.asString(name)); meta.put("map." + id, map.asString(name));
meta.put("name." + name, Integer.toString(id)); meta.put("name." + name, Integer.toString(id));
markMetaChanged();
root = 0; root = 0;
} }
map.setRootPos(root, -1); map.setRootPos(root, -1);
...@@ -439,8 +442,11 @@ public class MVStore { ...@@ -439,8 +442,11 @@ public class MVStore {
/** /**
* Get the metadata map. This data is for informational purposes only. The * Get the metadata map. This data is for informational purposes only. The
* data is subject to change in future versions. The data should not be * data is subject to change in future versions.
* modified (doing so may corrupt the store). * <p>
* The data should not be modified (doing so may corrupt the store). Writing
* to it is not always detected as a modification, so that changes to it
* might not be stored.
* <p> * <p>
* It contains the following entries: * It contains the following entries:
* *
...@@ -502,21 +508,10 @@ public class MVStore { ...@@ -502,21 +508,10 @@ public class MVStore {
return meta.containsKey("name." + name); return meta.containsKey("name." + name);
} }
/**
* Mark a map as changed (containing unsaved changes).
*
* @param map the map
*/
void markChanged(MVMap<?, ?> map) {
if (map == meta) {
metaChanged = true;
}
}
private void markMetaChanged() { private void markMetaChanged() {
// changes in the metadata alone are usually not detected, as the meta // changes in the metadata alone are usually not detected, as the meta
// map is changed after storing // map is changed after storing
markChanged(meta); metaChanged = true;
} }
private void readMeta() { private void readMeta() {
...@@ -892,6 +887,10 @@ public class MVStore { ...@@ -892,6 +887,10 @@ public class MVStore {
for (MVMap<?, ?> m : list) { for (MVMap<?, ?> m : list) {
m.setWriteVersion(version); m.setWriteVersion(version);
long v = m.getVersion(); long v = m.getVersion();
if (m.getCreateVersion() > storeVersion) {
// the map was created after storing started
continue;
}
if (v >= 0 && v >= lastStoredVersion) { if (v >= 0 && v >= lastStoredVersion) {
m.waitUntilWritten(storeVersion); m.waitUntilWritten(storeVersion);
MVMap<?, ?> r = m.openVersion(storeVersion); MVMap<?, ?> r = m.openVersion(storeVersion);
...@@ -1203,6 +1202,7 @@ public class MVStore { ...@@ -1203,6 +1202,7 @@ public class MVStore {
} }
for (Chunk c : free) { for (Chunk c : free) {
chunks.remove(c.id); chunks.remove(c.id);
markMetaChanged();
meta.remove("chunk." + c.id); meta.remove("chunk." + c.id);
int length = MathUtils.roundUpInt(c.length, BLOCK_SIZE) + BLOCK_SIZE; int length = MathUtils.roundUpInt(c.length, BLOCK_SIZE) + BLOCK_SIZE;
fileStore.free(c.start, length); fileStore.free(c.start, length);
...@@ -1237,6 +1237,7 @@ public class MVStore { ...@@ -1237,6 +1237,7 @@ public class MVStore {
buff.position(0); buff.position(0);
fileStore.writeFully(end, buff.getBuffer()); fileStore.writeFully(end, buff.getBuffer());
releaseWriteBuffer(buff); releaseWriteBuffer(buff);
markMetaChanged();
meta.put("chunk." + c.id, c.asString()); meta.put("chunk." + c.id, c.asString());
} }
boolean oldReuse = reuseSpace; boolean oldReuse = reuseSpace;
...@@ -1273,6 +1274,7 @@ public class MVStore { ...@@ -1273,6 +1274,7 @@ public class MVStore {
buff.position(0); buff.position(0);
fileStore.writeFully(pos, buff.getBuffer()); fileStore.writeFully(pos, buff.getBuffer());
releaseWriteBuffer(buff); releaseWriteBuffer(buff);
markMetaChanged();
meta.put("chunk." + c.id, c.asString()); meta.put("chunk." + c.id, c.asString());
} }
...@@ -1501,11 +1503,15 @@ public class MVStore { ...@@ -1501,11 +1503,15 @@ public class MVStore {
} }
private void registerFreePage(long version, int chunkId, long maxLengthLive, int pageCount) { private void registerFreePage(long version, int chunkId, long maxLengthLive, int pageCount) {
HashMap<Integer, Chunk>freed = freedPageSpace.get(version); HashMap<Integer, Chunk> freed = freedPageSpace.get(version);
if (freed == null) { if (freed == null) {
freed = New.hashMap(); freed = New.hashMap();
freedPageSpace.put(version, freed); HashMap<Integer, Chunk> f2 = freedPageSpace.putIfAbsent(version, freed);
if (f2 != null) {
freed = f2;
} }
}
synchronized (freed) {
Chunk f = freed.get(chunkId); Chunk f = freed.get(chunkId);
if (f == null) { if (f == null) {
f = new Chunk(chunkId); f = new Chunk(chunkId);
...@@ -1514,6 +1520,7 @@ public class MVStore { ...@@ -1514,6 +1520,7 @@ public class MVStore {
f.maxLengthLive -= maxLengthLive; f.maxLengthLive -= maxLengthLive;
f.pageCountLive -= pageCount; f.pageCountLive -= pageCount;
} }
}
Compressor getCompressor() { Compressor getCompressor() {
return compressor; return compressor;
...@@ -1679,17 +1686,17 @@ public class MVStore { ...@@ -1679,17 +1686,17 @@ public class MVStore {
*/ */
void registerUnsavedPage() { void registerUnsavedPage() {
unsavedPageCount++; unsavedPageCount++;
if (unsavedPageCount > unsavedPageCountMax && unsavedPageCountMax > 0) {
storeNeeded = true;
}
} }
/** /**
* This method is called before writing to a map. * This method is called before writing to a map.
*/ */
void beforeWrite() { void beforeWrite() {
if (currentStoreVersion >= 0) { if (storeNeeded) {
// store is possibly called within store, if the meta map changed storeNeeded = false;
return;
}
if (unsavedPageCount > unsavedPageCountMax && unsavedPageCountMax > 0) {
store(true); store(true);
} }
} }
...@@ -1819,6 +1826,7 @@ public class MVStore { ...@@ -1819,6 +1826,7 @@ public class MVStore {
// rollback might have rolled back the stored chunk metadata as well // rollback might have rolled back the stored chunk metadata as well
Chunk c = chunks.get(lastChunkId - 1); Chunk c = chunks.get(lastChunkId - 1);
if (c != null) { if (c != null) {
markMetaChanged();
meta.put("chunk." + c.id, c.asString()); meta.put("chunk." + c.id, c.asString());
} }
currentVersion = version; currentVersion = version;
......
...@@ -52,7 +52,6 @@ public class MVRTreeMap<V> extends MVMap<SpatialKey, V> { ...@@ -52,7 +52,6 @@ public class MVRTreeMap<V> extends MVMap<SpatialKey, V> {
@Override @Override
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
public V get(Object key) { public V get(Object key) {
checkOpen();
return (V) get(root, key); return (V) get(root, key);
} }
...@@ -63,7 +62,6 @@ public class MVRTreeMap<V> extends MVMap<SpatialKey, V> { ...@@ -63,7 +62,6 @@ public class MVRTreeMap<V> extends MVMap<SpatialKey, V> {
* @return the iterator * @return the iterator
*/ */
public RTreeCursor findIntersectingKeys(SpatialKey x) { public RTreeCursor findIntersectingKeys(SpatialKey x) {
checkOpen();
return new RTreeCursor(root, x) { return new RTreeCursor(root, x) {
@Override @Override
protected boolean check(boolean leaf, SpatialKey key, SpatialKey test) { protected boolean check(boolean leaf, SpatialKey key, SpatialKey test) {
...@@ -79,7 +77,6 @@ public class MVRTreeMap<V> extends MVMap<SpatialKey, V> { ...@@ -79,7 +77,6 @@ public class MVRTreeMap<V> extends MVMap<SpatialKey, V> {
* @return the iterator * @return the iterator
*/ */
public RTreeCursor findContainedKeys(SpatialKey x) { public RTreeCursor findContainedKeys(SpatialKey x) {
checkOpen();
return new RTreeCursor(root, x) { return new RTreeCursor(root, x) {
@Override @Override
protected boolean check(boolean leaf, SpatialKey key, SpatialKey test) { protected boolean check(boolean leaf, SpatialKey key, SpatialKey test) {
......
...@@ -233,7 +233,7 @@ java org.h2.test.TestAll timer ...@@ -233,7 +233,7 @@ java org.h2.test.TestAll timer
*/ */
; ;
private static final boolean MV_STORE = false; private static final boolean MV_STORE = true;
/** /**
* If the test should run with many rows. * If the test should run with many rows.
...@@ -470,7 +470,7 @@ kill -9 `jps -l | grep "org.h2.test." | cut -d " " -f 1` ...@@ -470,7 +470,7 @@ kill -9 `jps -l | grep "org.h2.test." | cut -d " " -f 1`
prof.interval = 1; prof.interval = 1;
prof.startCollecting(); prof.startCollecting();
if (test.mvStore) { if (test.mvStore) {
TestPerformance.main("-init", "-db", "9", "-size", "1000"); TestPerformance.main("-init", "-db", "9", "-size", "10000");
} else { } else {
TestPerformance.main("-init", "-db", "1"); TestPerformance.main("-init", "-db", "1");
} }
...@@ -481,7 +481,7 @@ kill -9 `jps -l | grep "org.h2.test." | cut -d " " -f 1` ...@@ -481,7 +481,7 @@ kill -9 `jps -l | grep "org.h2.test." | cut -d " " -f 1`
prof.depth = 16; prof.depth = 16;
prof.interval = 1; prof.interval = 1;
prof.startCollecting(); prof.startCollecting();
TestPerformance.main("-init", "-db", "1", "-size", "1000"); TestPerformance.main("-init", "-db", "1", "-size", "10000");
prof.stopCollecting(); prof.stopCollecting();
System.out.println(prof.getTop(3)); System.out.println(prof.getTop(3));
} }
......
...@@ -10,6 +10,7 @@ import java.io.ByteArrayOutputStream; ...@@ -10,6 +10,7 @@ import java.io.ByteArrayOutputStream;
import java.io.FileOutputStream; import java.io.FileOutputStream;
import java.io.InputStream; import java.io.InputStream;
import java.nio.channels.FileChannel; import java.nio.channels.FileChannel;
import java.util.ArrayList;
import java.util.ConcurrentModificationException; import java.util.ConcurrentModificationException;
import java.util.Iterator; import java.util.Iterator;
import java.util.Map; import java.util.Map;
...@@ -23,6 +24,7 @@ import org.h2.mvstore.MVStore; ...@@ -23,6 +24,7 @@ import org.h2.mvstore.MVStore;
import org.h2.store.fs.FileChannelInputStream; import org.h2.store.fs.FileChannelInputStream;
import org.h2.store.fs.FileUtils; import org.h2.store.fs.FileUtils;
import org.h2.test.TestBase; import org.h2.test.TestBase;
import org.h2.upgrade.v1_1.util.New;
import org.h2.util.Task; import org.h2.util.Task;
/** /**
...@@ -41,9 +43,12 @@ public class TestConcurrent extends TestMVStore { ...@@ -41,9 +43,12 @@ public class TestConcurrent extends TestMVStore {
@Override @Override
public void test() throws Exception { public void test() throws Exception {
FileUtils.deleteRecursive(getBaseDir(), true); FileUtils.deleteRecursive(getBaseDir(), true);
FileUtils.createDirectories(getBaseDir()); FileUtils.createDirectories(getBaseDir());
FileUtils.deleteRecursive("memFS:", false);
testConcurrentFree();
testConcurrentStoreAndRemoveMap(); testConcurrentStoreAndRemoveMap();
testConcurrentStoreAndClose(); testConcurrentStoreAndClose();
testConcurrentOnlineBackup(); testConcurrentOnlineBackup();
...@@ -53,65 +58,149 @@ public class TestConcurrent extends TestMVStore { ...@@ -53,65 +58,149 @@ public class TestConcurrent extends TestMVStore {
testConcurrentRead(); testConcurrentRead();
} }
private void testConcurrentFree() throws InterruptedException {
String fileName = "memFS:testConcurrentFree.h3";
for (int test = 0; test < 10; test++) {
FileUtils.delete(fileName);
final MVStore s1 = new MVStore.Builder().
fileName(fileName).writeDelay(-1).open();
s1.setRetentionTime(0);
final int count = 200;
for (int i = 0; i < count; i++) {
MVMap<Integer, Integer> m = s1.openMap("d" + i);
m.put(1, 1);
if (i % 2 == 0) {
s1.store();
}
}
s1.store();
s1.close();
final MVStore s = new MVStore.Builder().
fileName(fileName).writeDelay(-1).open();
s.setRetentionTime(0);
final ArrayList<MVMap<Integer, Integer>> list = New.arrayList();
for (int i = 0; i < count; i++) {
MVMap<Integer, Integer> m = s.openMap("d" + i);
list.add(m);
}
final AtomicInteger counter = new AtomicInteger();
Task task = new Task() {
@Override
public void call() throws Exception {
while (!stop) {
int x = counter.getAndIncrement();
if (x >= count) {
break;
}
MVMap<Integer, Integer> m = list.get(x);
m.clear();
m.removeMap();
}
}
};
task.execute();
Thread.sleep(1);
while (true) {
int x = counter.getAndIncrement();
if (x >= count) {
break;
}
MVMap<Integer, Integer> m = list.get(x);
m.clear();
m.removeMap();
if (x % 5 == 0) {
s.incrementVersion();
}
}
task.get();
s.store();
MVMap<String, String> meta = s.getMetaMap();
int chunkCount = 0;
for (String k : meta.keyList()) {
if (k.startsWith("chunk.")) {
chunkCount++;
}
}
assertEquals(1, chunkCount);
s.close();
}
}
private void testConcurrentStoreAndRemoveMap() throws InterruptedException { private void testConcurrentStoreAndRemoveMap() throws InterruptedException {
String fileName = getBaseDir() + "/testConcurrentStoreAndRemoveMap.h3"; String fileName = "memFS:testConcurrentStoreAndRemoveMap.h3";
final MVStore s = openStore(fileName); final MVStore s = openStore(fileName);
int count = 100; int count = 200;
for (int i = 0; i < count; i++) { for (int i = 0; i < count; i++) {
MVMap<Integer, Integer> m = s.openMap("d" + i); MVMap<Integer, Integer> m = s.openMap("d" + i);
m.put(1, 1); m.put(1, 1);
} }
final AtomicInteger counter = new AtomicInteger();
Task task = new Task() { Task task = new Task() {
@Override @Override
public void call() throws Exception { public void call() throws Exception {
while (!stop) { while (!stop) {
counter.incrementAndGet();
s.store(); s.store();
} }
} }
}; };
task.execute(); task.execute();
Thread.sleep(1); Thread.sleep(1);
for (int i = 0; i < count; i++) { for (int i = 0; i < count || counter.get() < count; i++) {
MVMap<Integer, Integer> m = s.openMap("d" + i); MVMap<Integer, Integer> m = s.openMap("d" + i);
m.put(1, 10); m.put(1, 10);
m.removeMap(); m.removeMap();
if (task.isFinished()) {
break;
}
} }
task.get(); task.get();
s.close(); s.close();
} }
private void testConcurrentStoreAndClose() throws InterruptedException { private void testConcurrentStoreAndClose() throws InterruptedException {
String fileName = getBaseDir() + "/testConcurrentStoreAndClose.h3"; String fileName = "memFS:testConcurrentStoreAndClose";
for (int i = 0; i < 10; i++) {
FileUtils.delete(fileName);
final MVStore s = openStore(fileName); final MVStore s = openStore(fileName);
final AtomicInteger counter = new AtomicInteger();
Task task = new Task() { Task task = new Task() {
@Override @Override
public void call() throws Exception { public void call() throws Exception {
int x = 0;
while (!stop) { while (!stop) {
s.setStoreVersion(x++); s.setStoreVersion(counter.incrementAndGet());
s.store(); s.store();
} }
} }
}; };
task.execute(); task.execute();
while (counter.get() < 5) {
Thread.sleep(1); Thread.sleep(1);
}
try { try {
s.close(); s.close();
// sometimes closing works, in which case // sometimes closing works, in which case
// storing fails at some point // storing must fail at some point (not necessarily
Thread.sleep(100); // immediately)
for (int x = counter.get(), y = x; x <= y + 2; x++) {
Thread.sleep(1);
}
Exception e = task.getException(); Exception e = task.getException();
assertEquals(DataUtils.ERROR_CLOSED, assertEquals(DataUtils.ERROR_CLOSED,
DataUtils.getErrorCode(e.getMessage())); DataUtils.getErrorCode(e.getMessage()));
} catch (IllegalStateException e) { } catch (IllegalStateException e) {
// sometimes storing works, in which case // sometimes storing works, in which case
// closing fails // closing must fail
assertEquals(DataUtils.ERROR_WRITING_FAILED, assertEquals(DataUtils.ERROR_WRITING_FAILED,
DataUtils.getErrorCode(e.getMessage())); DataUtils.getErrorCode(e.getMessage()));
task.get(); task.get();
} }
s.close(); s.close();
} }
}
/** /**
* Test the concurrent map implementation. * Test the concurrent map implementation.
...@@ -177,7 +266,7 @@ public class TestConcurrent extends TestMVStore { ...@@ -177,7 +266,7 @@ public class TestConcurrent extends TestMVStore {
@Override @Override
public void call() throws Exception { public void call() throws Exception {
while (!stop) { while (!stop) {
for (int i = 0; i < 20; i++) { for (int i = 0; i < 10; i++) {
map.put(i, new byte[100 * r.nextInt(100)]); map.put(i, new byte[100 * r.nextInt(100)]);
} }
s.store(); s.store();
...@@ -187,7 +276,7 @@ public class TestConcurrent extends TestMVStore { ...@@ -187,7 +276,7 @@ public class TestConcurrent extends TestMVStore {
if (len > 1024 * 1024) { if (len > 1024 * 1024) {
// slow down writing a lot // slow down writing a lot
Thread.sleep(200); Thread.sleep(200);
} else if (len > 1024 * 100) { } else if (len > 20 * 1024) {
// slow down writing // slow down writing
Thread.sleep(20); Thread.sleep(20);
} }
......
...@@ -547,6 +547,7 @@ public class TestMVStore extends TestBase { ...@@ -547,6 +547,7 @@ public class TestMVStore extends TestBase {
MVMap<Integer, String> map; MVMap<Integer, String> map;
s = new MVStore.Builder(). s = new MVStore.Builder().
fileName(fileName). fileName(fileName).
writeDelay(-1).
compressData().open(); compressData().open();
map = s.openMap("test"); map = s.openMap("test");
// add 10 MB of data // add 10 MB of data
...@@ -556,7 +557,7 @@ public class TestMVStore extends TestBase { ...@@ -556,7 +557,7 @@ public class TestMVStore extends TestBase {
s.store(); s.store();
s.close(); s.close();
int[] expectedReadsForCacheSize = { int[] expectedReadsForCacheSize = {
3405, 2590, 1924, 1440, 1108, 956, 918 3405, 2590, 1924, 1440, 1103, 956, 918
}; };
for (int cacheSize = 0; cacheSize <= 6; cacheSize += 4) { for (int cacheSize = 0; cacheSize <= 6; cacheSize += 4) {
s = new MVStore.Builder(). s = new MVStore.Builder().
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论