Unverified 提交 c0a838a1 authored 作者: Noel Grandin's avatar Noel Grandin 提交者: GitHub

Merge pull request #1413 from grandinj/gc_gather_io2

improvements to MVStore garbage collection
......@@ -5,6 +5,7 @@
*/
package org.h2.mvstore;
import static org.h2.mvstore.MVMap.INITIAL_VERSION;
import java.lang.Thread.UncaughtExceptionHandler;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
......@@ -17,11 +18,17 @@ import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
......@@ -30,9 +37,9 @@ import org.h2.compress.CompressDeflate;
import org.h2.compress.CompressLZF;
import org.h2.compress.Compressor;
import org.h2.engine.Constants;
import org.h2.message.DbException;
import org.h2.mvstore.cache.CacheLongKeyLIRS;
import org.h2.util.MathUtils;
import static org.h2.mvstore.MVMap.INITIAL_VERSION;
import org.h2.util.Utils;
/*
......@@ -1337,11 +1344,15 @@ public class MVStore {
}
private Set<Integer> collectReferencedChunks() {
final ThreadPoolExecutor executorService = new ThreadPoolExecutor(10, 10, 10L, TimeUnit.SECONDS,
new ArrayBlockingQueue<Runnable>(keysPerPage + 1));
final AtomicInteger executingThreadCounter = new AtomicInteger(0);
try {
ChunkIdsCollector collector = new ChunkIdsCollector(meta.getId());
Set<Long> inspectedRoots = new HashSet<>();
long pos = lastChunk.metaRootPos;
inspectedRoots.add(pos);
collector.visit(pos);
collector.visit(pos, executorService, executingThreadCounter);
long oldestVersionToKeep = getOldestVersionToKeep();
MVMap.RootReference rootReference = meta.getRoot();
do {
......@@ -1349,13 +1360,13 @@ public class MVStore {
pos = rootPage.getPos();
if (!rootPage.isSaved()) {
collector.setMapId(meta.getId());
collector.visit(rootPage);
} else if(inspectedRoots.add(pos)) {
collector.visit(rootPage, executorService, executingThreadCounter);
} else if (inspectedRoots.add(pos)) {
collector.setMapId(meta.getId());
collector.visit(pos);
collector.visit(pos, executorService, executingThreadCounter);
}
for (Cursor<String, String> c = new Cursor<>(rootPage, "root."); c.hasNext(); ) {
for (Cursor<String, String> c = new Cursor<>(rootPage, "root."); c.hasNext();) {
String key = c.next();
assert key != null;
if (!key.startsWith("root.")) {
......@@ -1363,23 +1374,25 @@ public class MVStore {
}
pos = DataUtils.parseHexLong(c.getValue());
if (DataUtils.isPageSaved(pos) && inspectedRoots.add(pos)) {
// to allow for something like "root.tmp.123" to be processed
// to allow for something like "root.tmp.123" to be
// processed
int mapId = DataUtils.parseHexInt(key.substring(key.lastIndexOf('.') + 1));
collector.setMapId(mapId);
collector.visit(pos);
collector.visit(pos, executorService, executingThreadCounter);
}
}
} while(rootReference.version >= oldestVersionToKeep &&
(rootReference = rootReference.previous) != null);
} while (rootReference.version >= oldestVersionToKeep && (rootReference = rootReference.previous) != null);
return collector.getReferenced();
} finally {
executorService.shutdownNow();
}
}
final class ChunkIdsCollector {
private final Set<Integer> referenced = new HashSet<>();
/** really a set */
private final ConcurrentHashMap<Integer, Integer> referencedChunks = new ConcurrentHashMap<>();
private final ChunkIdsCollector parent;
private ChunkIdsCollector child;
private int mapId;
ChunkIdsCollector(int mapId) {
......@@ -1398,29 +1411,30 @@ public class MVStore {
public void setMapId(int mapId) {
this.mapId = mapId;
if (child != null) {
child.setMapId(mapId);
}
}
public Set<Integer> getReferenced() {
return referenced;
Set<Integer> set = new HashSet<>();
set.addAll(referencedChunks.keySet());
return set;
}
public void visit(Page page) {
public void visit(Page page, ThreadPoolExecutor executorService, AtomicInteger executingThreadCounter) {
long pos = page.getPos();
if (DataUtils.isPageSaved(pos)) {
register(DataUtils.getPageChunkId(pos));
registerChunk(DataUtils.getPageChunkId(pos));
}
int count = page.map.getChildPageCount(page);
if (count > 0) {
ChunkIdsCollector childCollector = getChild();
if (count == 0) {
return;
}
final ChunkIdsCollector childCollector = new ChunkIdsCollector(this);
for (int i = 0; i < count; i++) {
Page childPage = page.getChildPageIfLoaded(i);
if (childPage != null) {
childCollector.visit(childPage);
childCollector.visit(childPage, executorService, executingThreadCounter);
} else {
childCollector.visit(page.getChildPagePos(i));
childCollector.visit(page.getChildPagePos(i), executorService, executingThreadCounter);
}
}
// and cache resulting set of chunk ids
......@@ -1429,38 +1443,39 @@ public class MVStore {
cacheChunkRef.put(pos, chunkIds, Constants.MEMORY_ARRAY + 4 * chunkIds.length);
}
}
}
public void visit(long pos) {
public void visit(long pos, ThreadPoolExecutor executorService, AtomicInteger executingThreadCounter) {
if (!DataUtils.isPageSaved(pos)) {
return;
}
register(DataUtils.getPageChunkId(pos));
if (DataUtils.getPageType(pos) != DataUtils.PAGE_TYPE_LEAF) {
registerChunk(DataUtils.getPageChunkId(pos));
if (DataUtils.getPageType(pos) == DataUtils.PAGE_TYPE_LEAF) {
return;
}
int chunkIds[];
if (cacheChunkRef != null && (chunkIds = cacheChunkRef.get(pos)) != null) {
// there is a cached set of chunk ids for this position
for (int chunkId : chunkIds) {
register(chunkId);
registerChunk(chunkId);
}
} else {
ChunkIdsCollector childCollector = getChild();
final ChunkIdsCollector childCollector = new ChunkIdsCollector(this);
Page page;
if (cache != null && (page = cache.get(pos)) != null) {
// there is a full page in cache, use it
childCollector.visit(page);
childCollector.visit(page, executorService, executingThreadCounter);
} else {
// page was not cached: read the data
Chunk chunk = getChunk(pos);
long filePos = chunk.block * BLOCK_SIZE;
filePos += DataUtils.getPageOffset(pos);
if (filePos < 0) {
throw DataUtils.newIllegalStateException(
DataUtils.ERROR_FILE_CORRUPT,
throw DataUtils.newIllegalStateException(DataUtils.ERROR_FILE_CORRUPT,
"Negative position {0}; p={1}, c={2}", filePos, pos, chunk.toString());
}
long maxPos = (chunk.block + chunk.len) * BLOCK_SIZE;
Page.readChildrenPositions(fileStore, pos, filePos, maxPos, childCollector);
Page.readChildrenPositions(fileStore, pos, filePos, maxPos,
childCollector, executorService, executingThreadCounter);
}
// and cache resulting set of chunk ids
if (cacheChunkRef != null) {
......@@ -1469,27 +1484,17 @@ public class MVStore {
}
}
}
}
private ChunkIdsCollector getChild() {
if (child == null) {
child = new ChunkIdsCollector(this);
} else {
child.referenced.clear();
}
return child;
}
private void register(int chunkId) {
if (referenced.add(chunkId) && parent != null) {
parent.register(chunkId);
private void registerChunk(int chunkId) {
if (referencedChunks.put(chunkId, 1) == null && parent != null) {
parent.registerChunk(chunkId);
}
}
private int[] getChunkIds() {
int chunkIds[] = new int[referenced.size()];
int chunkIds[] = new int[referencedChunks.size()];
int index = 0;
for (int chunkId : referenced) {
for (Integer chunkId : referencedChunks.keySet()) {
chunkIds[index++] = chunkId;
}
return chunkIds;
......
......@@ -5,15 +5,22 @@
*/
package org.h2.mvstore;
import static org.h2.engine.Constants.MEMORY_ARRAY;
import static org.h2.engine.Constants.MEMORY_OBJECT;
import static org.h2.engine.Constants.MEMORY_POINTER;
import static org.h2.mvstore.DataUtils.PAGE_TYPE_LEAF;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.atomic.AtomicInteger;
import org.h2.compress.Compressor;
import org.h2.message.DbException;
import org.h2.mvstore.type.DataType;
import org.h2.util.Utils;
import static org.h2.engine.Constants.MEMORY_ARRAY;
import static org.h2.engine.Constants.MEMORY_OBJECT;
import static org.h2.engine.Constants.MEMORY_POINTER;
import static org.h2.mvstore.DataUtils.PAGE_TYPE_LEAF;
/**
* A page (a node or a leaf).
......@@ -247,9 +254,9 @@ public abstract class Page implements Cloneable
* @param maxPos the maximum position (the end of the chunk)
* @param collector to report child pages positions to
*/
static void readChildrenPositions(FileStore fileStore, long pos,
long filePos, long maxPos,
MVStore.ChunkIdsCollector collector) {
static void readChildrenPositions(FileStore fileStore, long pos, long filePos, long maxPos,
final MVStore.ChunkIdsCollector collector, final ThreadPoolExecutor executorService,
final AtomicInteger executingThreadCounter) {
ByteBuffer buff;
int maxLength = DataUtils.getPageMaxLength(pos);
if (maxLength == DataUtils.PAGE_LARGE) {
......@@ -260,10 +267,8 @@ public abstract class Page implements Cloneable
maxLength = (int) Math.min(maxPos - filePos, maxLength);
int length = maxLength;
if (length < 0) {
throw DataUtils.newIllegalStateException(
DataUtils.ERROR_FILE_CORRUPT,
"Illegal page length {0} reading at {1}; max pos {2} ",
length, filePos, maxPos);
throw DataUtils.newIllegalStateException(DataUtils.ERROR_FILE_CORRUPT,
"Illegal page length {0} reading at {1}; max pos {2} ", length, filePos, maxPos);
}
buff = fileStore.readFully(filePos, length);
int chunkId = DataUtils.getPageChunkId(pos);
......@@ -271,39 +276,70 @@ public abstract class Page implements Cloneable
int start = buff.position();
int pageLength = buff.getInt();
if (pageLength > maxLength) {
throw DataUtils.newIllegalStateException(
DataUtils.ERROR_FILE_CORRUPT,
"File corrupted in chunk {0}, expected page length =< {1}, got {2}",
chunkId, maxLength, pageLength);
throw DataUtils.newIllegalStateException(DataUtils.ERROR_FILE_CORRUPT,
"File corrupted in chunk {0}, expected page length =< {1}, got {2}", chunkId, maxLength,
pageLength);
}
buff.limit(start + pageLength);
short check = buff.getShort();
int m = DataUtils.readVarInt(buff);
int mapId = collector.getMapId();
if (m != mapId) {
throw DataUtils.newIllegalStateException(
DataUtils.ERROR_FILE_CORRUPT,
"File corrupted in chunk {0}, expected map id {1}, got {2}",
chunkId, mapId, m);
throw DataUtils.newIllegalStateException(DataUtils.ERROR_FILE_CORRUPT,
"File corrupted in chunk {0}, expected map id {1}, got {2}", chunkId, mapId, m);
}
int checkTest = DataUtils.getCheckValue(chunkId)
^ DataUtils.getCheckValue(offset)
int checkTest = DataUtils.getCheckValue(chunkId) ^ DataUtils.getCheckValue(offset)
^ DataUtils.getCheckValue(pageLength);
if (check != (short) checkTest) {
throw DataUtils.newIllegalStateException(
DataUtils.ERROR_FILE_CORRUPT,
"File corrupted in chunk {0}, expected check value {1}, got {2}",
chunkId, checkTest, check);
throw DataUtils.newIllegalStateException(DataUtils.ERROR_FILE_CORRUPT,
"File corrupted in chunk {0}, expected check value {1}, got {2}", chunkId, checkTest, check);
}
int len = DataUtils.readVarInt(buff);
int type = buff.get();
if ((type & 1) != DataUtils.PAGE_TYPE_NODE) {
throw DataUtils.newIllegalStateException(
DataUtils.ERROR_FILE_CORRUPT,
throw DataUtils.newIllegalStateException(DataUtils.ERROR_FILE_CORRUPT,
"Position {0} expected to be a non-leaf", pos);
}
/**
* The logic here is a little awkward. We want to (a) execute reads in parallel, but (b)
* limit the number of threads we create. This is complicated by (a) the algorithm is
* recursive and needs to wait for children before returning up the call-stack, (b) checking
* the size of the thread-pool is not reliable.
*/
final List<Future<?>> futures = new ArrayList<>(len);
for (int i = 0; i <= len; i++) {
collector.visit(buff.getLong());
final long childPagePos = buff.getLong();
for (;;) {
int counter = executingThreadCounter.get();
if (counter >= executorService.getMaximumPoolSize()) {
collector.visit(childPagePos, executorService, executingThreadCounter);
break;
} else {
if (executingThreadCounter.compareAndSet(counter, counter + 1)) {
Future<?> f = executorService.submit(new Runnable() {
@Override
public void run() {
try {
collector.visit(childPagePos, executorService, executingThreadCounter);
} finally {
executingThreadCounter.decrementAndGet();
}
}
});
futures.add(f);
break;
}
}
}
}
for (Future<?> f : futures) {
try {
f.get();
} catch (InterruptedException ex) {
throw new RuntimeException(ex);
} catch (ExecutionException ex) {
throw DbException.convert(ex);
}
}
}
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论