提交 12500cfb authored 作者: Thomas Mueller's avatar Thomas Mueller

MVStore: deal with the case that data was moved to a new chunk (work in progress)

上级 a90648e1
...@@ -826,10 +826,10 @@ public class MVMap<K, V> extends AbstractMap<K, V> ...@@ -826,10 +826,10 @@ public class MVMap<K, V> extends AbstractMap<K, V>
int writtenPageCount = 0; int writtenPageCount = 0;
for (int i = 0; i < p.getChildPageCount(); i++) { for (int i = 0; i < p.getChildPageCount(); i++) {
long pos = p.getChildPagePos(i); long pos = p.getChildPagePos(i);
if (pos == 0) { if (pos != 0 && DataUtils.getPageType(pos) == DataUtils.PAGE_TYPE_LEAF) {
continue; // we would need to load the page, and it's a leaf:
} // only do that if it's within the set of chunks we are
if (DataUtils.getPageType(pos) == DataUtils.PAGE_TYPE_LEAF) { // interested in
int chunkId = DataUtils.getPageChunkId(pos); int chunkId = DataUtils.getPageChunkId(pos);
if (!set.contains(chunkId)) { if (!set.contains(chunkId)) {
continue; continue;
......
...@@ -134,6 +134,12 @@ public class MVStore { ...@@ -134,6 +134,12 @@ public class MVStore {
private static final int FORMAT_WRITE = 1; private static final int FORMAT_WRITE = 1;
private static final int FORMAT_READ = 1; private static final int FORMAT_READ = 1;
/**
* Used to mark a chunk as free, when it was detected that live bookkeeping
* is incorrect.
*/
private static final int MARKED_FREE = 10000000;
/** /**
* The background thread, if any. * The background thread, if any.
*/ */
...@@ -1238,17 +1244,18 @@ public class MVStore { ...@@ -1238,17 +1244,18 @@ public class MVStore {
// are not concurrently modified // are not concurrently modified
c.maxLenLive += f.maxLenLive; c.maxLenLive += f.maxLenLive;
c.pageCountLive += f.pageCountLive; c.pageCountLive += f.pageCountLive;
if (c.pageCountLive < 0 && c.pageCountLive > -Integer.MAX_VALUE / 2) { if (c.pageCountLive < 0 && c.pageCountLive > -MARKED_FREE) {
throw DataUtils.newIllegalStateException( throw DataUtils.newIllegalStateException(
DataUtils.ERROR_INTERNAL, DataUtils.ERROR_INTERNAL,
"Corrupt page count {0}", c.pageCountLive); "Corrupt page count {0}", c.pageCountLive);
} }
if (c.maxLenLive < 0 && c.maxLenLive > -Long.MAX_VALUE / 2) { if (c.maxLenLive < 0 && c.maxLenLive > -MARKED_FREE) {
throw DataUtils.newIllegalStateException( throw DataUtils.newIllegalStateException(
DataUtils.ERROR_INTERNAL, DataUtils.ERROR_INTERNAL,
"Corrupt max length {0}", c.maxLenLive); "Corrupt max length {0}", c.maxLenLive);
} }
if (c.pageCount == 0 && c.maxLenLive > 0) { if (c.pageCountLive <= 0 && c.maxLenLive > 0 ||
c.maxLenLive <= 0 && c.pageCountLive > 0) {
throw DataUtils.newIllegalStateException( throw DataUtils.newIllegalStateException(
DataUtils.ERROR_INTERNAL, DataUtils.ERROR_INTERNAL,
"Corrupt max length {0}", c.maxLenLive); "Corrupt max length {0}", c.maxLenLive);
...@@ -1706,11 +1713,11 @@ public class MVStore { ...@@ -1706,11 +1713,11 @@ public class MVStore {
long start = chunk.block * BLOCK_SIZE; long start = chunk.block * BLOCK_SIZE;
int length = chunk.len * BLOCK_SIZE; int length = chunk.len * BLOCK_SIZE;
ByteBuffer buff = fileStore.readFully(start, length); ByteBuffer buff = fileStore.readFully(start, length);
Chunk c = Chunk.readChunkHeader(buff, start); Chunk verify = Chunk.readChunkHeader(buff, start);
if (c.id != chunk.id) { if (verify.id != chunk.id) {
throw DataUtils.newIllegalStateException( throw DataUtils.newIllegalStateException(
DataUtils.ERROR_FILE_CORRUPT, DataUtils.ERROR_FILE_CORRUPT,
"Expected chunk {0}, got {1}", chunk.id, c.id); "Expected chunk {0}, got {1}", chunk.id, verify.id);
} }
int pagesRemaining = chunk.pageCount; int pagesRemaining = chunk.pageCount;
markMetaChanged(); markMetaChanged();
...@@ -1754,11 +1761,14 @@ public class MVStore { ...@@ -1754,11 +1761,14 @@ public class MVStore {
} }
} }
if (!pendingChanges) { if (!pendingChanges) {
;
new Exception(fileStore.getFileName() + " chunk " + chunk.id + " fix live! " + chunk).printStackTrace(System.out);
// bookkeeping is broken for this chunk: // bookkeeping is broken for this chunk:
// fix it // fix it
registerFreePage(currentVersion, chunk.id, registerFreePage(currentVersion, chunk.id,
c.maxLenLive + Long.MAX_VALUE / 2, chunk.maxLenLive + MARKED_FREE,
c.pageCountLive + Integer.MAX_VALUE / 2); chunk.pageCountLive + MARKED_FREE);
} }
} }
} }
...@@ -1809,9 +1819,10 @@ public class MVStore { ...@@ -1809,9 +1819,10 @@ public class MVStore {
// we need to keep temporary pages, // we need to keep temporary pages,
// to support reading old versions and rollback // to support reading old versions and rollback
if (pos == 0) { if (pos == 0) {
// the value could be smaller than 0 because // the page was not yet stored:
// in some cases a page is allocated, // just using "unsavedMemory -= memory" could result in negative
// but never stored, so we need to use max // values, because in some cases a page is allocated, but never
// stored, so we need to use max
unsavedMemory = Math.max(0, unsavedMemory - memory); unsavedMemory = Math.max(0, unsavedMemory - memory);
return; return;
} }
......
...@@ -103,6 +103,14 @@ public class Page { ...@@ -103,6 +103,14 @@ public class Page {
*/ */
private Page[] childrenPages; private Page[] childrenPages;
/**
* Whether the page is an in-memory (not stored, or not yet stored) page,
* and it is removed. This is to keep track of pages that concurrently
* changed while they are being stored, in which case the live bookkeeping
* needs to be aware of such cases.
*/
private volatile boolean removedInMemory;
Page(MVMap<?, ?> map, long version) { Page(MVMap<?, ?> map, long version) {
this.map = map; this.map = map;
this.version = version; this.version = version;
...@@ -940,9 +948,13 @@ public class Page { ...@@ -940,9 +948,13 @@ public class Page {
store.cachePage(pos, this, getMemory()); store.cachePage(pos, this, getMemory());
long max = DataUtils.getPageMaxLength(pos); long max = DataUtils.getPageMaxLength(pos);
chunk.maxLen += max; chunk.maxLen += max;
chunk.maxLenLive += max;
chunk.pageCount++; chunk.pageCount++;
if (!removedInMemory) {
// if the page was removed _before_ the position was assigned, we
// must not increase the live fields.
chunk.maxLenLive += max;
chunk.pageCountLive++; chunk.pageCountLive++;
}
return typePos + 1; return typePos + 1;
} }
...@@ -1070,11 +1082,11 @@ public class Page { ...@@ -1070,11 +1082,11 @@ public class Page {
* Remove the page. * Remove the page.
*/ */
public void removePage() { public void removePage() {
map.removePage(pos, memory); long p = pos;
if (p == 0) {
removedInMemory = true;
} }
map.removePage(p, memory);
public void setPos(long pos) {
this.pos = pos;
} }
} }
...@@ -78,12 +78,13 @@ public class TestConcurrent extends TestMVStore { ...@@ -78,12 +78,13 @@ public class TestConcurrent extends TestMVStore {
} }
}; };
final MVMap<Integer, Integer> dataMap = s.openMap("data"); final MVMap<Integer, Integer> dataMap = s.openMap("data");
final AtomicInteger counter = new AtomicInteger();
Task task2 = new Task() { Task task2 = new Task() {
@Override @Override
public void call() throws Exception { public void call() throws Exception {
int i = 0;
while (!stop) { while (!stop) {
dataMap.put(i++, i); int i = counter.getAndIncrement();
dataMap.put(i, i * 10);
} }
} }
}; };
...@@ -97,6 +98,9 @@ public class TestConcurrent extends TestMVStore { ...@@ -97,6 +98,9 @@ public class TestConcurrent extends TestMVStore {
} }
task.get(); task.get();
task2.get(); task2.get();
for (int i = 0; i < counter.get(); i++) {
assertEquals(10 * i, dataMap.get(i).intValue());
}
} finally { } finally {
s.close(); s.close();
} }
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论