提交 0b2f0e40 authored 作者: Thomas Mueller's avatar Thomas Mueller

MVStore: deal with the case that data was moved to a new chunk (work in progress)

上级 14111f79
...@@ -792,45 +792,56 @@ public class MVMap<K, V> extends AbstractMap<K, V> ...@@ -792,45 +792,56 @@ public class MVMap<K, V> extends AbstractMap<K, V>
* Re-write any pages that belong to one of the chunks in the given set. * Re-write any pages that belong to one of the chunks in the given set.
* *
* @param set the set of chunk ids * @param set the set of chunk ids
* @return whether rewriting was successful
*/ */
void rewrite(Set<Integer> set) { boolean rewrite(Set<Integer> set) {
if (root.getVersion() < createVersion) {
// a new map
return true;
}
// read from old version, to avoid concurrent reads
MVMap<K, V> readMap = openVersion(root.getVersion() - 1);
try { try {
rewrite(root, set); rewrite(readMap.root, set);
return true;
} catch (IllegalStateException e) { } catch (IllegalStateException e) {
if (DataUtils.getErrorCode(e.getMessage()) == DataUtils.ERROR_CHUNK_NOT_FOUND) { if (DataUtils.getErrorCode(e.getMessage()) == DataUtils.ERROR_CHUNK_NOT_FOUND) {
// ignore // ignore
} else { return false;
throw e;
} }
throw e;
} }
} }
private int rewrite(Page p, Set<Integer> set) { private int rewrite(Page p, Set<Integer> set) {
if (p.isLeaf()) { if (p.isLeaf()) {
long pos = p.getPos(); long pos = p.getPos();
if (pos == 0) {
return 0;
}
int chunkId = DataUtils.getPageChunkId(pos); int chunkId = DataUtils.getPageChunkId(pos);
if (!set.contains(chunkId)) { if (!set.contains(chunkId)) {
return 0; return 0;
} }
@SuppressWarnings("unchecked") if (p.getKeyCount() > 0) {
K key = (K) p.getKey(0); @SuppressWarnings("unchecked")
V value = get(key); K key = (K) p.getKey(0);
if (value != null) { V value = get(key);
replace(key, value, value); if (value != null) {
// this is to avoid storing while replacing, to avoid a
// deadlock when rewriting the meta map
// TODO there should be no deadlocks possible
store.beforeWrite();
replace(key, value, value);
}
} }
return 1; return 1;
} }
int writtenPageCount = 0; int writtenPageCount = 0;
for (int i = 0; i < p.getChildPageCount(); i++) { for (int i = 0; i < p.getChildPageCount(); i++) {
long pos = p.getChildPagePos(i); long childPos = p.getChildPagePos(i);
if (pos != 0 && DataUtils.getPageType(pos) == DataUtils.PAGE_TYPE_LEAF) { if (childPos != 0 && DataUtils.getPageType(childPos) == DataUtils.PAGE_TYPE_LEAF) {
// we would need to load the page, and it's a leaf: // we would need to load the page, and it's a leaf:
// only do that if it's within the set of chunks we are // only do that if it's within the set of chunks we are
// interested in // interested in
int chunkId = DataUtils.getPageChunkId(pos); int chunkId = DataUtils.getPageChunkId(childPos);
if (!set.contains(chunkId)) { if (!set.contains(chunkId)) {
continue; continue;
} }
...@@ -839,27 +850,25 @@ public class MVMap<K, V> extends AbstractMap<K, V> ...@@ -839,27 +850,25 @@ public class MVMap<K, V> extends AbstractMap<K, V>
} }
if (writtenPageCount == 0) { if (writtenPageCount == 0) {
long pos = p.getPos(); long pos = p.getPos();
if (pos != 0) { int chunkId = DataUtils.getPageChunkId(pos);
int chunkId = DataUtils.getPageChunkId(pos); if (set.contains(chunkId)) {
if (set.contains(chunkId)) { // an inner node page that is in one of the chunks,
// an inner node page that is in one of the chunks, // but only points to chunks that are not in the set:
// but only points to chunks that are not in the set: // if no child was changed, we need to do that now
// if no child was changed, we need to do that now // (this is not needed if anyway one of the children
// (this is not needed if anyway one of the children // was changed, as this would have updated this
// was changed, as this would have updated this // page as well)
// page as well) Page p2 = p;
Page p2 = p; while (!p2.isLeaf()) {
while (!p2.isLeaf()) { p2 = p2.getChildPage(0);
p2 = p2.getChildPage(0); }
} @SuppressWarnings("unchecked")
@SuppressWarnings("unchecked") K key = (K) p2.getKey(0);
K key = (K) p2.getKey(0); V value = get(key);
V value = get(key); if (value != null) {
if (value != null) { replace(key, value, value);
replace(key, value, value);
}
writtenPageCount++;
} }
writtenPageCount++;
} }
} }
return writtenPageCount; return writtenPageCount;
......
...@@ -57,7 +57,7 @@ public class MVMapConcurrent<K, V> extends MVMap<K, V> { ...@@ -57,7 +57,7 @@ public class MVMapConcurrent<K, V> extends MVMap<K, V> {
afterWrite(); afterWrite();
} }
} }
@Override @Override
protected void waitUntilWritten(long version) { protected void waitUntilWritten(long version) {
// no need to wait // no need to wait
......
...@@ -1683,9 +1683,13 @@ public class MVStore { ...@@ -1683,9 +1683,13 @@ public class MVStore {
for (MVMap<?, ?> m : maps.values()) { for (MVMap<?, ?> m : maps.values()) {
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
MVMap<Object, Object> map = (MVMap<Object, Object>) m; MVMap<Object, Object> map = (MVMap<Object, Object>) m;
map.rewrite(set); if (!map.rewrite(set)) {
return;
}
}
if (!meta.rewrite(set)) {
return;
} }
meta.rewrite(set);
commitAndSave(); commitAndSave();
boolean commitAgain = false; boolean commitAgain = false;
for (Chunk c : old) { for (Chunk c : old) {
......
...@@ -64,8 +64,9 @@ public class TestConcurrent extends TestMVStore { ...@@ -64,8 +64,9 @@ public class TestConcurrent extends TestMVStore {
private void testConcurrentAutoCommitAndChange() throws InterruptedException { private void testConcurrentAutoCommitAndChange() throws InterruptedException {
String fileName = "memFS:testConcurrentChangeAndBackgroundCompact"; String fileName = "memFS:testConcurrentChangeAndBackgroundCompact";
FileUtils.delete(fileName); FileUtils.delete(fileName);
final MVStore s = new MVStore.Builder().fileName( final MVStore s = new MVStore.Builder().
fileName).open(); fileName(fileName).pageSplitSize(100).
open();
try { try {
s.setRetentionTime(1000); s.setRetentionTime(1000);
s.setAutoCommitDelay(1); s.setAutoCommitDelay(1);
...@@ -78,6 +79,8 @@ public class TestConcurrent extends TestMVStore { ...@@ -78,6 +79,8 @@ public class TestConcurrent extends TestMVStore {
} }
}; };
final MVMap<Integer, Integer> dataMap = s.openMap("data"); final MVMap<Integer, Integer> dataMap = s.openMap("data");
final MVMap<Integer, Integer> dataSmallMap = s.openMap("dataSmall");
s.openMap("emptyMap");
final AtomicInteger counter = new AtomicInteger(); final AtomicInteger counter = new AtomicInteger();
Task task2 = new Task() { Task task2 = new Task() {
@Override @Override
...@@ -85,6 +88,10 @@ public class TestConcurrent extends TestMVStore { ...@@ -85,6 +88,10 @@ public class TestConcurrent extends TestMVStore {
while (!stop) { while (!stop) {
int i = counter.getAndIncrement(); int i = counter.getAndIncrement();
dataMap.put(i, i * 10); dataMap.put(i, i * 10);
dataSmallMap.put(i % 100, i * 10);
if (i % 100 == 0) {
dataSmallMap.clear();
}
} }
} }
}; };
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论