提交 06e879b0 authored 作者: Thomas Mueller's avatar Thomas Mueller

MVStore: concurrent compaction and store operations could result in an IllegalStateException.

上级 c4388e03
...@@ -17,7 +17,8 @@ Change Log ...@@ -17,7 +17,8 @@ Change Log
<h1>Change Log</h1> <h1>Change Log</h1>
<h2>Next Version (unreleased)</h2> <h2>Next Version (unreleased)</h2>
<ul><li>Issue 594: Profiler.copyInThread does not work properly. <ul><li>MVStore: concurrent compaction and store operations could result in an IllegalStateException.
</li><li>Issue 594: Profiler.copyInThread does not work properly.
</li></ul> </li></ul>
<h2>Version 1.4.184 Beta (2014-12-19)</h2> <h2>Version 1.4.184 Beta (2014-12-19)</h2>
......
...@@ -1184,7 +1184,7 @@ public class MVStore { ...@@ -1184,7 +1184,7 @@ public class MVStore {
return version; return version;
} }
private void freeUnusedChunks() { private synchronized void freeUnusedChunks() {
if (lastChunk == null) { if (lastChunk == null) {
return; return;
} }
...@@ -1256,25 +1256,35 @@ public class MVStore { ...@@ -1256,25 +1256,35 @@ public class MVStore {
if (DataUtils.getPageType(pos) == DataUtils.PAGE_TYPE_LEAF) { if (DataUtils.getPageType(pos) == DataUtils.PAGE_TYPE_LEAF) {
return null; return null;
} }
PageChildren r = cacheChunkRef.get(pos); PageChildren r;
if (cacheChunkRef != null) {
r = cacheChunkRef.get(pos);
} else {
r = null;
}
if (r == null) { if (r == null) {
Page p = cache.get(pos); if (cache != null) {
if (p == null) { Page p = cache.get(pos);
if (p != null) {
r = new PageChildren(p);
}
}
if (r == null) {
Chunk c = getChunk(pos); Chunk c = getChunk(pos);
long filePos = c.block * BLOCK_SIZE; long filePos = c.block * BLOCK_SIZE;
filePos += DataUtils.getPageOffset(pos); filePos += DataUtils.getPageOffset(pos);
if (filePos < 0) { if (filePos < 0) {
throw DataUtils.newIllegalStateException( throw DataUtils.newIllegalStateException(
DataUtils.ERROR_FILE_CORRUPT, DataUtils.ERROR_FILE_CORRUPT,
"Negative position {0}", filePos); "Negative position {0}; p={1}, c={2}", filePos, pos, c.toString());
} }
long maxPos = (c.block + c.len) * BLOCK_SIZE; long maxPos = (c.block + c.len) * BLOCK_SIZE;
r = PageChildren.read(fileStore, pos, mapId, filePos, maxPos); r = PageChildren.read(fileStore, pos, mapId, filePos, maxPos);
} else {
r = new PageChildren(p);
} }
r.removeDuplicateChunkReferences(); r.removeDuplicateChunkReferences();
cacheChunkRef.put(pos, r); if (cacheChunkRef != null) {
cacheChunkRef.put(pos, r);
}
} }
if (r.children.length == 0) { if (r.children.length == 0) {
int chunk = DataUtils.getPageChunkId(pos); int chunk = DataUtils.getPageChunkId(pos);
......
...@@ -49,7 +49,8 @@ public class TestConcurrent extends TestMVStore { ...@@ -49,7 +49,8 @@ public class TestConcurrent extends TestMVStore {
FileUtils.deleteRecursive(getBaseDir(), true); FileUtils.deleteRecursive(getBaseDir(), true);
FileUtils.createDirectories(getBaseDir()); FileUtils.createDirectories(getBaseDir());
FileUtils.deleteRecursive("memFS:", false); FileUtils.deleteRecursive("memFS:", false);
testConcurrentSaveCompact();
testConcurrentDataType(); testConcurrentDataType();
testConcurrentAutoCommitAndChange(); testConcurrentAutoCommitAndChange();
testConcurrentReplaceAndRead(); testConcurrentReplaceAndRead();
...@@ -64,6 +65,40 @@ public class TestConcurrent extends TestMVStore { ...@@ -64,6 +65,40 @@ public class TestConcurrent extends TestMVStore {
testConcurrentWrite(); testConcurrentWrite();
testConcurrentRead(); testConcurrentRead();
} }
private void testConcurrentSaveCompact() throws Exception {
String fileName = "memFS:testConcurrentSaveCompact";
FileUtils.delete(fileName);
final MVStore s = new MVStore.Builder().
fileName(fileName).
cacheSize(0).
open();
try {
s.setRetentionTime(0);
final MVMap<Integer, Integer> dataMap = s.openMap("data");
Task task = new Task() {
@Override
public void call() throws Exception {
int i = 0;
while (!stop) {
s.compact(100, 1024 * 1024);
dataMap.put(i % 1000, i * 10);
s.commit();
i++;
}
}
};
task.execute();
for (int i = 0; i < 1000 && !task.isFinished(); i++) {
s.compact(100, 1024 * 1024);
dataMap.put(i % 1000, i * 10);
s.commit();
}
task.get();
} finally {
s.close();
}
}
private void testConcurrentDataType() throws InterruptedException { private void testConcurrentDataType() throws InterruptedException {
final ObjectDataType type = new ObjectDataType(); final ObjectDataType type = new ObjectDataType();
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论