提交 e833c48d authored 作者: Andrei Tokar's avatar Andrei Tokar

speedup unused chunks collection

上级 b824b78d
......@@ -305,6 +305,8 @@ public class MVStore {
private long lastFreeUnusedChunks;
private final ThreadPoolExecutor executorService;
/**
* Create and open the store.
*
......@@ -353,6 +355,8 @@ public class MVStore {
}
pageSplitSize = pgSplitSize;
keysPerPage = DataUtils.getConfigParam(config, "keysPerPage", 48);
executorService = new ThreadPoolExecutor(10, 10, 10L, TimeUnit.SECONDS,
new ArrayBlockingQueue<Runnable>(keysPerPage + 1));
backgroundExceptionHandler =
(UncaughtExceptionHandler)config.get("backgroundExceptionHandler");
meta = new MVMap<>(this);
......@@ -941,6 +945,11 @@ public class MVStore {
return;
}
stopBackgroundThread();
if (shrinkIfPossible) {
executorService.shutdown();
} else {
executorService.shutdownNow();
}
closed = true;
storeLock.lock();
try {
......@@ -1349,16 +1358,14 @@ public class MVStore {
if (time >= lastFreeUnusedChunks + freeDelay) {
// set early in case it fails (out of memory or so)
lastFreeUnusedChunks = time;
freeUnusedChunks();
// set it here as well, to avoid calling it often if it was slow
lastFreeUnusedChunks = getTimeSinceCreation();
freeUnusedChunks(true);
}
}
private void freeUnusedChunks() {
private void freeUnusedChunks(boolean fast) {
assert storeLock.isHeldByCurrentThread();
if (lastChunk != null && reuseSpace) {
Set<Integer> referenced = collectReferencedChunks();
Set<Integer> referenced = collectReferencedChunks(fast);
long time = getTimeSinceCreation();
for (Iterator<Chunk> iterator = chunks.values().iterator(); iterator.hasNext(); ) {
......@@ -1383,52 +1390,67 @@ public class MVStore {
}
}
}
// set it here, to avoid calling it often if it was slow
lastFreeUnusedChunks = getTimeSinceCreation();
}
}
private Set<Integer> collectReferencedChunks() {
final ThreadPoolExecutor executorService = new ThreadPoolExecutor(10, 10, 10L, TimeUnit.SECONDS,
new ArrayBlockingQueue<Runnable>(keysPerPage + 1));
private Set<Integer> collectReferencedChunks(boolean fast) {
assert lastChunk != null;
final AtomicInteger executingThreadCounter = new AtomicInteger(0);
try {
ChunkIdsCollector collector = new ChunkIdsCollector(meta.getId());
Set<Long> inspectedRoots = new HashSet<>();
long pos = lastChunk.metaRootPos;
inspectedRoots.add(pos);
collector.visit(pos, executorService, executingThreadCounter);
long oldestVersionToKeep = getOldestVersionToKeep();
MVMap.RootReference rootReference = meta.getRoot();
if (fast) {
MVMap.RootReference previous;
while (rootReference.version >= oldestVersionToKeep && (previous = rootReference.previous) != null) {
rootReference = previous;
}
inspectVersion(rootReference, collector, executingThreadCounter, null);
Page rootPage = rootReference.root;
long pos = rootPage.getPos();
assert rootPage.isSaved();
int chunkId = DataUtils.getPageChunkId(pos);
while (++chunkId <= lastChunk.id) {
collector.registerChunk(chunkId);
}
} else {
Set<Long> inspectedRoots = new HashSet<>();
do {
inspectVersion(rootReference, collector, executingThreadCounter, inspectedRoots);
} while (rootReference.version >= oldestVersionToKeep && (rootReference = rootReference.previous) != null);
}
return collector.getReferenced();
}
private void inspectVersion(MVMap.RootReference rootReference, ChunkIdsCollector collector,
AtomicInteger executingThreadCounter,
Set<Long> inspectedRoots) {
Page rootPage = rootReference.root;
pos = rootPage.getPos();
if (!rootPage.isSaved()) {
collector.setMapId(meta.getId());
collector.visit(rootPage, executorService, executingThreadCounter);
} else if (inspectedRoots.add(pos)) {
long pos = rootPage.getPos();
if (rootPage.isSaved()) {
if (inspectedRoots != null && !inspectedRoots.add(pos)) {
return;
}
collector.setMapId(meta.getId());
collector.visit(pos, executorService, executingThreadCounter);
}
for (Cursor<String, String> c = new Cursor<>(rootPage, "root."); c.hasNext();) {
for (Cursor<String, String> c = new Cursor<>(rootPage, "root."); c.hasNext(); ) {
String key = c.next();
assert key != null;
if (!key.startsWith("root.")) {
break;
}
pos = DataUtils.parseHexLong(c.getValue());
if (DataUtils.isPageSaved(pos) && inspectedRoots.add(pos)) {
// to allow for something like "root.tmp.123" to be
// processed
assert DataUtils.isPageSaved(pos);
if (inspectedRoots == null || inspectedRoots.add(pos)) {
// to allow for something like "root.tmp.123" to be processed
int mapId = DataUtils.parseHexInt(key.substring(key.lastIndexOf('.') + 1));
collector.setMapId(mapId);
collector.visit(pos, executorService, executingThreadCounter);
}
}
} while (rootReference.version >= oldestVersionToKeep && (rootReference = rootReference.previous) != null);
return collector.getReferenced();
} finally {
executorService.shutdownNow();
}
}
final class ChunkIdsCollector {
......@@ -1769,7 +1791,7 @@ public class MVStore {
boolean oldReuse = reuseSpace;
try {
retentionTime = -1;
freeUnusedChunks();
freeUnusedChunks(false);
if (fileStore.getFillRate() <= targetFillRate) {
long start = fileStore.getFirstFree() / BLOCK_SIZE;
ArrayList<Chunk> move = findChunksToMove(start, moveSize);
......@@ -2064,7 +2086,7 @@ public class MVStore {
}
}
meta.rewrite(set);
freeUnusedChunks();
freeUnusedChunks(false);
commit();
}
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论