提交 fb4be198 authored 作者: Thomas Mueller's avatar Thomas Mueller

Synchronize write access (needed for background compact operations)

上级 bd147171
......@@ -123,7 +123,7 @@ public class MVMap<K, V> extends AbstractMap<K, V>
*/
@Override
@SuppressWarnings("unchecked")
public V put(K key, V value) {
public synchronized V put(K key, V value) {
DataUtils.checkArgument(value != null, "The value may not be null");
beforeWrite();
try {
......@@ -534,7 +534,7 @@ public class MVMap<K, V> extends AbstractMap<K, V>
* Remove all entries.
*/
@Override
public void clear() {
public synchronized void clear() {
beforeWrite();
try {
root.removeAllRecursive();
......@@ -563,7 +563,7 @@ public class MVMap<K, V> extends AbstractMap<K, V>
* @return the old value if the key existed, or null otherwise
*/
@Override
public V remove(Object key) {
public synchronized V remove(Object key) {
beforeWrite();
try {
long v = writeVersion;
......@@ -809,9 +809,10 @@ public class MVMap<K, V> extends AbstractMap<K, V>
}
@SuppressWarnings("unchecked")
K key = (K) p.getKey(0);
@SuppressWarnings("unchecked")
V value = (V) p.getValue(0);
put(key, value);
V value = get(key);
if (value != null) {
replace(key, value, value);
}
return 1;
}
int writtenPageCount = 0;
......@@ -842,9 +843,10 @@ public class MVMap<K, V> extends AbstractMap<K, V>
}
@SuppressWarnings("unchecked")
K key = (K) p2.getKey(0);
@SuppressWarnings("unchecked")
V value = (V) p2.getValue(0);
put(key, value);
V value = get(key);
if (value != null) {
replace(key, value, value);
}
writtenPageCount++;
}
}
......
......@@ -1707,9 +1707,9 @@ public class MVStore {
page.setPos(pos);
Object k = map.getLiveKey(page);
if (k != null) {
Object value = map.remove(k);
Object value = map.get(k);
if (value != null) {
map.put(k, value);
map.replace(k, value, value);
changeCount++;
}
}
......@@ -2293,13 +2293,14 @@ public class MVStore {
fileOps = false;
}
// use a lower fill rate if there were any file operations
int fillRate = fileOps ? autoCompactFillRate / 4 : autoCompactFillRate;
int fillRate = fileOps ? autoCompactFillRate / 3 : autoCompactFillRate;
compact(fillRate, autoCommitMemory);
if (!fileOps) {
// if there were no file operations at all,
// compact the file by moving chunks
compactMoveChunks(autoCompactFillRate, autoCommitMemory);
}
; // TODO find out why this doesn't work
// if (!fileOps) {
// // if there were no file operations at all,
// // compact the file by moving chunks
// compactMoveChunks(autoCompactFillRate, autoCommitMemory);
// }
autoCompactLastFileOpCount = fileStore.getWriteCount() + fileStore.getReadCount();
} catch (Exception e) {
if (backgroundExceptionHandler != null) {
......
......@@ -625,7 +625,7 @@ kill -9 `jps -l | grep "org.h2.test." | cut -d " " -f 1`
new TestCompatibility().runTest(this);
new TestCompatibilityOracle().runTest(this);
new TestCsv().runTest(this);
new TestDateStorage().runTest(this);
new TestDateStorage().runTest(this); // TODO test
new TestDeadlock().runTest(this);
new TestDrop().runTest(this);
new TestDuplicateKeyUpdate().runTest(this);
......@@ -721,7 +721,7 @@ kill -9 `jps -l | grep "org.h2.test." | cut -d " " -f 1`
// synth
new TestBtreeIndex().runTest(this);
new TestConcurrentUpdate().runTest(this);
new TestDiskFull().runTest(this);
new TestDiskFull().runTest(this); // TODO test
new TestCrashAPI().runTest(this);
new TestFuzzOptimizations().runTest(this);
new TestLimit().runTest(this);
......
......@@ -47,6 +47,8 @@ public class TestConcurrent extends TestMVStore {
FileUtils.createDirectories(getBaseDir());
FileUtils.deleteRecursive("memFS:", false);
testConcurrentReplaceAndRead();
testConcurrentChangeAndCompact();
testConcurrentChangeAndGetVersion();
testConcurrentFree();
testConcurrentStoreAndRemoveMap();
......@@ -58,6 +60,61 @@ public class TestConcurrent extends TestMVStore {
testConcurrentRead();
}
private void testConcurrentReplaceAndRead() throws InterruptedException {
final MVStore s = new MVStore.Builder().open();
final MVMap<Integer, Integer> map = s.openMap("data");
for (int i = 0; i < 100; i++) {
map.put(i, i % 100);
}
Task task = new Task() {
@Override
public void call() throws Exception {
int i = 0;
while (!stop) {
map.put(i % 100, i % 100);
i++;
if (i % 1000 == 0) {
s.commit();
}
}
}
};
task.execute();
Thread.sleep(1);
for (int i = 0; !task.isFinished() && i < 1000000; i++) {
assertEquals(i % 100, map.get(i % 100).intValue());
}
task.get();
s.close();
}
private void testConcurrentChangeAndCompact() throws InterruptedException {
final MVStore s = new MVStore.Builder().fileName(
"memFS:testConcurrentChangeAndBackgroundCompact").autoCommitDisabled().open();
s.setRetentionTime(0);
Task task = new Task() {
@Override
public void call() throws Exception {
while (!stop) {
s.compact(100, 1024 * 1024);
// s.compactMoveChunks(100, 1024 * 1024);
// s.compact(100, 1024 * 1024);
}
}
};
task.execute();
Thread.sleep(1);
for (int i = 0; !task.isFinished() && i < 1000; i++) {
MVMap<Integer, Integer> map = s.openMap("d" + (i % 3));
// MVMap<Integer, Integer> map = s.openMap("d" + (i % 3),
// new MVMapConcurrent.Builder<Integer, Integer>());
map.put(0, i);
s.commit();
}
task.get();
s.close();
}
private void testConcurrentChangeAndGetVersion() throws InterruptedException {
for (int test = 0; test < 10; test++) {
final MVStore s = new MVStore.Builder().
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论