提交 67fa44ed authored 作者: Thomas Mueller's avatar Thomas Mueller

Synchronize write access (needed for background compact operations), but avoid deadlocks.

上级 15eef24e
...@@ -997,9 +997,6 @@ public class MVMap<K, V> extends AbstractMap<K, V> ...@@ -997,9 +997,6 @@ public class MVMap<K, V> extends AbstractMap<K, V>
return; return;
} }
Page last = oldRoots.peekLast(); Page last = oldRoots.peekLast();
// TODO why is this? (maybe not needed)
;
oldest--;
while (true) { while (true) {
Page p = oldRoots.peekFirst(); Page p = oldRoots.peekFirst();
if (p == null || p.getVersion() >= oldest || p == last) { if (p == null || p.getVersion() >= oldest || p == last) {
......
...@@ -1550,11 +1550,23 @@ public class MVStore { ...@@ -1550,11 +1550,23 @@ public class MVStore {
* @param write the minimum number of bytes to write * @param write the minimum number of bytes to write
* @return if a chunk was re-written * @return if a chunk was re-written
*/ */
public synchronized boolean compact(int targetFillRate, int write) { public boolean compact(int targetFillRate, int write) {
checkOpen(); checkOpen();
ArrayList<Chunk> old;
synchronized (this) {
old = compactGetOldChunks(targetFillRate, write);
}
if (old == null || old.size() == 0) {
return false;
}
compactRewrite(old);
return true;
}
private ArrayList<Chunk> compactGetOldChunks(int targetFillRate, int write) {
if (lastChunk == null) { if (lastChunk == null) {
// nothing to do // nothing to do
return false; return null;
} }
// calculate the fill rate // calculate the fill rate
...@@ -1571,7 +1583,7 @@ public class MVStore { ...@@ -1571,7 +1583,7 @@ public class MVStore {
} }
int fillRate = (int) (100 * maxLengthLiveSum / maxLengthSum); int fillRate = (int) (100 * maxLengthLiveSum / maxLengthSum);
if (fillRate >= targetFillRate) { if (fillRate >= targetFillRate) {
return false; return null;
} }
long time = getTime(); long time = getTime();
...@@ -1587,7 +1599,7 @@ public class MVStore { ...@@ -1587,7 +1599,7 @@ public class MVStore {
} }
} }
if (old.size() == 0) { if (old.size() == 0) {
return false; return null;
} }
// sort the list, so the first entry should be collected first // sort the list, so the first entry should be collected first
...@@ -1618,7 +1630,7 @@ public class MVStore { ...@@ -1618,7 +1630,7 @@ public class MVStore {
move = c; move = c;
} }
if (chunkCount < 1) { if (chunkCount < 1) {
return false; return null;
} }
// remove the chunks we want to keep from this list // remove the chunks we want to keep from this list
boolean remove = false; boolean remove = false;
...@@ -1630,6 +1642,10 @@ public class MVStore { ...@@ -1630,6 +1642,10 @@ public class MVStore {
it.remove(); it.remove();
} }
} }
return old;
}
private void compactRewrite(ArrayList<Chunk> old) {
HashSet<Integer> set = New.hashSet(); HashSet<Integer> set = New.hashSet();
for (Chunk c : old) { for (Chunk c : old) {
set.add(c.id); set.add(c.id);
...@@ -1652,15 +1668,9 @@ public class MVStore { ...@@ -1652,15 +1668,9 @@ public class MVStore {
if (again) { if (again) {
commitAndSave(); commitAndSave();
} }
return true;
} }
private void copyLive(Chunk chunk) { private void copyLive(Chunk chunk) {
if (chunk.pageCountLive == 0) {
// remove this chunk in the next save operation
registerFreePage(currentVersion, chunk.id, 0, 0);
return;
}
long start = chunk.block * BLOCK_SIZE; long start = chunk.block * BLOCK_SIZE;
int length = chunk.len * BLOCK_SIZE; int length = chunk.len * BLOCK_SIZE;
ByteBuffer buff = fileStore.readFully(start, length); ByteBuffer buff = fileStore.readFully(start, length);
...@@ -1721,10 +1731,12 @@ public class MVStore { ...@@ -1721,10 +1731,12 @@ public class MVStore {
// (but we first need to check that there are no // (but we first need to check that there are no
// pending changes) // pending changes)
for (HashMap<Integer, Chunk> e : freedPageSpace.values()) { for (HashMap<Integer, Chunk> e : freedPageSpace.values()) {
for (int x : e.keySet()) { synchronized (e) {
if (x == chunk.id) { for (int x : e.keySet()) {
changeCount++; if (x == chunk.id) {
break; changeCount++;
break;
}
} }
} }
} }
...@@ -1932,7 +1944,7 @@ public class MVStore { ...@@ -1932,7 +1944,7 @@ public class MVStore {
long getOldestVersionToKeep() { long getOldestVersionToKeep() {
long v = currentVersion; long v = currentVersion;
if (fileStore == null) { if (fileStore == null) {
return v - versionsToKeep + 1; return v - versionsToKeep;
} }
long storeVersion = currentStoreVersion; long storeVersion = currentStoreVersion;
if (storeVersion > -1) { if (storeVersion > -1) {
......
...@@ -47,6 +47,7 @@ public class TestConcurrent extends TestMVStore { ...@@ -47,6 +47,7 @@ public class TestConcurrent extends TestMVStore {
FileUtils.createDirectories(getBaseDir()); FileUtils.createDirectories(getBaseDir());
FileUtils.deleteRecursive("memFS:", false); FileUtils.deleteRecursive("memFS:", false);
testConcurrentAutoCommitAndChange();
testConcurrentReplaceAndRead(); testConcurrentReplaceAndRead();
testConcurrentChangeAndCompact(); testConcurrentChangeAndCompact();
testConcurrentChangeAndGetVersion(); testConcurrentChangeAndGetVersion();
...@@ -60,6 +61,41 @@ public class TestConcurrent extends TestMVStore { ...@@ -60,6 +61,41 @@ public class TestConcurrent extends TestMVStore {
testConcurrentRead(); testConcurrentRead();
} }
private void testConcurrentAutoCommitAndChange() throws InterruptedException {
final MVStore s = new MVStore.Builder().fileName(
"memFS:testConcurrentChangeAndBackgroundCompact").open();
s.setRetentionTime(0);
Task task = new Task() {
@Override
public void call() throws Exception {
while (!stop) {
s.compact(100, 1024 * 1024);
}
}
};
final MVMap<Integer, Integer> dataMap = s.openMap("data");
Task task2 = new Task() {
@Override
public void call() throws Exception {
int i = 0;
while (!stop) {
dataMap.put(i++, i);
}
}
};
task.execute();
task2.execute();
Thread.sleep(1);
for (int i = 0; !task.isFinished() && !task2.isFinished() && i < 1000; i++) {
MVMap<Integer, Integer> map = s.openMap("d" + (i % 3));
map.put(0, i);
s.commit();
}
task.get();
task2.get();
s.close();
}
private void testConcurrentReplaceAndRead() throws InterruptedException { private void testConcurrentReplaceAndRead() throws InterruptedException {
final MVStore s = new MVStore.Builder().open(); final MVStore s = new MVStore.Builder().open();
final MVMap<Integer, Integer> map = s.openMap("data"); final MVMap<Integer, Integer> map = s.openMap("data");
...@@ -97,8 +133,6 @@ public class TestConcurrent extends TestMVStore { ...@@ -97,8 +133,6 @@ public class TestConcurrent extends TestMVStore {
public void call() throws Exception { public void call() throws Exception {
while (!stop) { while (!stop) {
s.compact(100, 1024 * 1024); s.compact(100, 1024 * 1024);
// s.compactMoveChunks(100, 1024 * 1024);
// s.compact(100, 1024 * 1024);
} }
} }
}; };
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论