提交 d476faca authored 作者: Thomas Mueller's avatar Thomas Mueller

MVStore compaction

上级 35b32304
......@@ -72,7 +72,7 @@ public class MVMap<K, V> extends AbstractMap<K, V>
* @param mapId the map id
* @return the metadata key
*/
public static String getMapRootKey(int mapId) {
static String getMapRootKey(int mapId) {
    // metadata key is the hex map id prefixed with "root."
    String hexId = Integer.toHexString(mapId);
    return "root." + hexId;
}
......@@ -82,7 +82,7 @@ public class MVMap<K, V> extends AbstractMap<K, V>
* @param mapId the map id
* @return the metadata key
*/
public static String getMapKey(int mapId) {
static String getMapKey(int mapId) {
    // metadata key is the hex map id prefixed with "map."
    return new StringBuilder("map.")
            .append(Integer.toHexString(mapId))
            .toString();
}
......@@ -837,6 +837,9 @@ public class MVMap<K, V> extends AbstractMap<K, V>
// an inner node page that is in one of the chunks,
// but only points to chunks that are not in the set:
// if no child was changed, we need to do that now
// (this is not needed if anyway one of the children
// was changed, as this would have updated this
// page as well)
Page p2 = p;
while (!p2.isLeaf()) {
p2 = p2.getChildPage(0);
......@@ -955,6 +958,12 @@ public class MVMap<K, V> extends AbstractMap<K, V>
return store;
}
/**
 * Get the id of this map. Note that compacting a store may assign a
 * different id to a map, so the id is not stable across compaction.
 *
 * @return the map id
 */
public int getId() {
    return this.id;
}
......
......@@ -243,6 +243,8 @@ public class MVStore {
private int autoCompactFillRate;
private long autoCompactLastFileOpCount;
private Object compactSync = new Object();
/**
* Create and open the store.
*
......@@ -834,14 +836,22 @@ public class MVStore {
return c;
}
/**
 * Whether the chunk that contains the page at the given position is
 * present in the in-memory chunk map.
 *
 * @param pos the page position
 * @return true if the chunk is known
 */
boolean isChunkKnown(long pos) {
    return chunks.containsKey(DataUtils.getPageChunkId(pos));
}
private Chunk getChunkIfFound(long pos) {
int chunkId = DataUtils.getPageChunkId(pos);
Chunk c = chunks.get(chunkId);
if (c == null) {
if (!Thread.holdsLock(this)) {
// it could also be unsynchronized metadata
// access (if synchronization on this was forgotten)
throw DataUtils.newIllegalStateException(
DataUtils.ERROR_INTERNAL,
"Unsynchronized metadata read");
"Chunk {0} no longer exists",
chunkId);
}
String s = meta.get(Chunk.getMetaKey(chunkId));
if (s == null) {
......@@ -1551,6 +1561,7 @@ public class MVStore {
* @return if a chunk was re-written
*/
public boolean compact(int targetFillRate, int write) {
synchronized (compactSync) {
checkOpen();
ArrayList<Chunk> old;
synchronized (this) {
......@@ -1562,6 +1573,7 @@ public class MVStore {
compactRewrite(old);
return true;
}
}
private ArrayList<Chunk> compactGetOldChunks(int targetFillRate, int write) {
if (lastChunk == null) {
......@@ -1655,22 +1667,26 @@ public class MVStore {
MVMap<Object, Object> map = (MVMap<Object, Object>) m;
map.rewrite(set);
}
meta.rewrite(set);
commitAndSave();
boolean again = false;
boolean commitAgain = false;
for (Chunk c : old) {
if (c.maxLenLive > 0) {
// not cleared - that means bookkeeping of live pages
// is broken; copyLive will fix this
again = true;
copyLive(c);
// a concurrent commit could free up the chunk
// so we wait for any commits to finish
if (c.maxLenLive == 0) {
continue;
}
// not cleared - that means bookkeeping of live pages
// may be broken; copyLive will fix this
compactFixLive(c);
commitAgain = true;
}
if (again) {
if (commitAgain) {
commitAndSave();
}
}
private void copyLive(Chunk chunk) {
private void compactFixLive(Chunk chunk) {
long start = chunk.block * BLOCK_SIZE;
int length = chunk.len * BLOCK_SIZE;
ByteBuffer buff = fileStore.readFully(start, length);
......@@ -1683,7 +1699,6 @@ public class MVStore {
int pagesRemaining = chunk.pageCount;
markMetaChanged();
boolean mapNotOpen = false;
int changeCount = 0;
while (pagesRemaining-- > 0) {
int offset = buff.position();
int pageLength = buff.getInt();
......@@ -1704,43 +1719,27 @@ public class MVStore {
// chunk is not removed
mapNotOpen = true;
}
}
buff.position(offset + pageLength);
continue;
}
buff.position(offset);
Page page = new Page(map, 0);
int limit = buff.limit();
page.read(buff, chunk.id, buff.position(), length);
buff.limit(limit);
int type = page.isLeaf() ? 0 : 1;
long pos = DataUtils.getPagePos(chunk.id, offset, pageLength, type);
page.setPos(pos);
Object k = map.getLiveKey(page);
if (k != null) {
Object value = map.get(k);
if (value != null) {
map.replace(k, value, value);
changeCount++;
}
}
}
if (!mapNotOpen && changeCount == 0) {
// if all maps are open, but no changes were made,
if (!mapNotOpen) {
// if all maps are open
// then live bookkeeping is wrong, and we anyway
// remove the chunk
// (but we first need to check that there are no
// pending changes)
boolean pendingChanges = false;
for (HashMap<Integer, Chunk> e : freedPageSpace.values()) {
synchronized (e) {
for (int x : e.keySet()) {
if (x == chunk.id) {
changeCount++;
pendingChanges = true;
break;
}
}
}
}
if (changeCount == 0) {
if (!pendingChanges) {
// bookkeeping is broken for this chunk:
// fix it
registerFreePage(currentVersion, chunk.id,
......@@ -1797,7 +1796,7 @@ public class MVStore {
if (pos == 0) {
// the value could be smaller than 0 because
// in some cases a page is allocated,
// but never stored
// but never stored, so we need to use max
unsavedMemory = Math.max(0, unsavedMemory - memory);
return;
}
......@@ -1907,6 +1906,9 @@ public class MVStore {
* according to various tests this does not always work as expected
* depending on the operating system and hardware.
* <p>
* The retention time needs to be long enough to allow reading old chunks
* while traversing over the entries of a map.
* <p>
* This setting is not persisted.
*
* @param ms how many milliseconds to retain old chunks (0 to overwrite them
......@@ -2307,12 +2309,6 @@ public class MVStore {
// use a lower fill rate if there were any file operations
int fillRate = fileOps ? autoCompactFillRate / 3 : autoCompactFillRate;
compact(fillRate, autoCommitMemory);
; // TODO find out why this doesn't work
// if (!fileOps) {
// // if there were no file operations at all,
// // compact the file by moving chunks
// compactMoveChunks(autoCompactFillRate, autoCommitMemory);
// }
autoCompactLastFileOpCount = fileStore.getWriteCount() + fileStore.getReadCount();
} catch (Exception e) {
if (backgroundExceptionHandler != null) {
......
......@@ -62,9 +62,12 @@ public class TestConcurrent extends TestMVStore {
}
private void testConcurrentAutoCommitAndChange() throws InterruptedException {
String fileName = "memFS:testConcurrentChangeAndBackgroundCompact";
FileUtils.delete(fileName);
final MVStore s = new MVStore.Builder().fileName(
"memFS:testConcurrentChangeAndBackgroundCompact").open();
s.setRetentionTime(0);
fileName).open();
s.setRetentionTime(10000);
s.setAutoCommitDelay(1);
Task task = new Task() {
@Override
public void call() throws Exception {
......@@ -125,9 +128,13 @@ public class TestConcurrent extends TestMVStore {
}
private void testConcurrentChangeAndCompact() throws InterruptedException {
String fileName = "memFS:testConcurrentChangeAndBackgroundCompact";
FileUtils.delete(fileName);
final MVStore s = new MVStore.Builder().fileName(
"memFS:testConcurrentChangeAndBackgroundCompact").autoCommitDisabled().open();
s.setRetentionTime(0);
fileName).
pageSplitSize(10).
autoCommitDisabled().open();
s.setRetentionTime(10000);
Task task = new Task() {
@Override
public void call() throws Exception {
......@@ -137,15 +144,26 @@ public class TestConcurrent extends TestMVStore {
}
};
task.execute();
Task task2 = new Task() {
@Override
public void call() throws Exception {
while (!stop) {
s.compact(100, 1024 * 1024);
}
}
};
task2.execute();
Thread.sleep(1);
for (int i = 0; !task.isFinished() && i < 1000; i++) {
for (int i = 0; !task.isFinished() && !task2.isFinished() && i < 1000; i++) {
MVMap<Integer, Integer> map = s.openMap("d" + (i % 3));
// MVMap<Integer, Integer> map = s.openMap("d" + (i % 3),
// new MVMapConcurrent.Builder<Integer, Integer>());
map.put(0, i);
map.get(0);
s.commit();
}
task.get();
task2.get();
s.close();
}
......
Markdown 格式
0%
您将 0 人添加到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 登录 后发表评论