提交 a90648e1 authored 作者: Thomas Mueller's avatar Thomas Mueller

MVStore: deal with the case that data was moved to a new chunk (work in progress)

上级 5175b741
......@@ -198,7 +198,7 @@ public class Chunk {
* @return the fill rate
*/
public int getFillRate() {
if (maxLenLive == 0) {
if (maxLenLive <= 0) {
return 0;
} else if (maxLenLive == maxLen) {
return 100;
......
......@@ -1238,12 +1238,12 @@ public class MVStore {
// are not concurrently modified
c.maxLenLive += f.maxLenLive;
c.pageCountLive += f.pageCountLive;
if (c.pageCountLive < 0) {
if (c.pageCountLive < 0 && c.pageCountLive > -Integer.MAX_VALUE / 2) {
throw DataUtils.newIllegalStateException(
DataUtils.ERROR_INTERNAL,
"Corrupt page count {0}", c.pageCountLive);
}
if (c.maxLenLive < 0) {
if (c.maxLenLive < 0 && c.maxLenLive > -Long.MAX_VALUE / 2) {
throw DataUtils.newIllegalStateException(
DataUtils.ERROR_INTERNAL,
"Corrupt max length {0}", c.maxLenLive);
......@@ -1258,9 +1258,9 @@ public class MVStore {
it.remove();
}
for (Chunk c : modified) {
if (c.maxLenLive == 0) {
if (c.maxLenLive <= 0) {
if (c.unused == 0) {
c.unused = getTime();
c.unused = time;
}
if (canOverwriteChunk(c, time)) {
removedChunks.add(c);
......@@ -1461,7 +1461,7 @@ public class MVStore {
long time = getTime();
ArrayList<Chunk> free = New.arrayList();
for (Chunk c : chunks.values()) {
if (c.maxLenLive == 0) {
if (c.maxLenLive <= 0) {
if (canOverwriteChunk(c, time)) {
free.add(c);
}
......@@ -1689,7 +1689,7 @@ public class MVStore {
for (Chunk c : old) {
// a concurrent commit could free up the chunk
// so we wait for any commits to finish
if (c.maxLenLive == 0) {
if (c.maxLenLive <= 0) {
continue;
}
// not cleared - that means bookkeeping of live pages
......@@ -1702,7 +1702,7 @@ public class MVStore {
}
}
private void compactFixLive(Chunk chunk) {
private synchronized void compactFixLive(Chunk chunk) {
long start = chunk.block * BLOCK_SIZE;
int length = chunk.len * BLOCK_SIZE;
ByteBuffer buff = fileStore.readFully(start, length);
......@@ -1747,11 +1747,9 @@ public class MVStore {
boolean pendingChanges = false;
for (HashMap<Integer, Chunk> e : freedPageSpace.values()) {
synchronized (e) {
for (int x : e.keySet()) {
if (x == chunk.id) {
pendingChanges = true;
break;
}
if (e.containsKey(chunk.id)) {
pendingChanges = true;
break;
}
}
}
......@@ -1759,7 +1757,8 @@ public class MVStore {
// bookkeeping is broken for this chunk:
// fix it
registerFreePage(currentVersion, chunk.id,
chunk.maxLenLive, chunk.pageCountLive);
c.maxLenLive + Long.MAX_VALUE / 2,
c.pageCountLive + Integer.MAX_VALUE / 2);
}
}
}
......
......@@ -66,37 +66,40 @@ public class TestConcurrent extends TestMVStore {
FileUtils.delete(fileName);
final MVStore s = new MVStore.Builder().fileName(
fileName).open();
s.setRetentionTime(10000);
s.setAutoCommitDelay(1);
Task task = new Task() {
@Override
public void call() throws Exception {
while (!stop) {
s.compact(100, 1024 * 1024);
try {
s.setRetentionTime(1000);
s.setAutoCommitDelay(1);
Task task = new Task() {
@Override
public void call() throws Exception {
while (!stop) {
s.compact(100, 1024 * 1024);
}
}
}
};
final MVMap<Integer, Integer> dataMap = s.openMap("data");
Task task2 = new Task() {
@Override
public void call() throws Exception {
int i = 0;
while (!stop) {
dataMap.put(i++, i);
};
final MVMap<Integer, Integer> dataMap = s.openMap("data");
Task task2 = new Task() {
@Override
public void call() throws Exception {
int i = 0;
while (!stop) {
dataMap.put(i++, i);
}
}
};
task.execute();
task2.execute();
Thread.sleep(1);
for (int i = 0; !task.isFinished() && !task2.isFinished() && i < 1000; i++) {
MVMap<Integer, Integer> map = s.openMap("d" + (i % 3));
map.put(0, i);
s.commit();
}
};
task.execute();
task2.execute();
Thread.sleep(1);
for (int i = 0; !task.isFinished() && !task2.isFinished() && i < 1000; i++) {
MVMap<Integer, Integer> map = s.openMap("d" + (i % 3));
map.put(0, i);
s.commit();
task.get();
task2.get();
} finally {
s.close();
}
task.get();
task2.get();
s.close();
}
private void testConcurrentReplaceAndRead() throws InterruptedException {
......
......@@ -444,6 +444,7 @@ public class TestMVStore extends TestBase {
FileUtils.delete(fileName);
MVStore s = new MVStore.Builder().
fileName(fileName).
autoCommitDisabled().
open();
MVMap<Integer, String> m;
for (int i = 0; i < 10; i++) {
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论