提交 844f4f49 authored 作者: Noel Grandin's avatar Noel Grandin

read multiple child pages in parallel

上级 0a5b618f
...@@ -5,6 +5,7 @@ ...@@ -5,6 +5,7 @@
*/ */
package org.h2.mvstore; package org.h2.mvstore;
import static org.h2.mvstore.MVMap.INITIAL_VERSION;
import java.lang.Thread.UncaughtExceptionHandler; import java.lang.Thread.UncaughtExceptionHandler;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets; import java.nio.charset.StandardCharsets;
...@@ -17,11 +18,17 @@ import java.util.HashMap; ...@@ -17,11 +18,17 @@ import java.util.HashMap;
import java.util.HashSet; import java.util.HashSet;
import java.util.Iterator; import java.util.Iterator;
import java.util.LinkedList; import java.util.LinkedList;
import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.PriorityQueue; import java.util.PriorityQueue;
import java.util.Queue; import java.util.Queue;
import java.util.Set; import java.util.Set;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
...@@ -30,9 +37,9 @@ import org.h2.compress.CompressDeflate; ...@@ -30,9 +37,9 @@ import org.h2.compress.CompressDeflate;
import org.h2.compress.CompressLZF; import org.h2.compress.CompressLZF;
import org.h2.compress.Compressor; import org.h2.compress.Compressor;
import org.h2.engine.Constants; import org.h2.engine.Constants;
import org.h2.message.DbException;
import org.h2.mvstore.cache.CacheLongKeyLIRS; import org.h2.mvstore.cache.CacheLongKeyLIRS;
import org.h2.util.MathUtils; import org.h2.util.MathUtils;
import static org.h2.mvstore.MVMap.INITIAL_VERSION;
import org.h2.util.Utils; import org.h2.util.Utils;
/* /*
...@@ -1374,10 +1381,11 @@ public class MVStore { ...@@ -1374,10 +1381,11 @@ public class MVStore {
return collector.getReferenced(); return collector.getReferenced();
} }
final ExecutorService executorService = Executors.newCachedThreadPool();
final class ChunkIdsCollector { final class ChunkIdsCollector {
private final Set<Integer> referencedChunks = new HashSet<>(); private final Set<Integer> referencedChunks = ConcurrentHashMap.newKeySet();
private final ChunkIdsCollector parent; private final ChunkIdsCollector parent;
private int mapId; private int mapId;
...@@ -1454,12 +1462,21 @@ public class MVStore { ...@@ -1454,12 +1462,21 @@ public class MVStore {
long filePos = chunk.block * BLOCK_SIZE; long filePos = chunk.block * BLOCK_SIZE;
filePos += DataUtils.getPageOffset(pos); filePos += DataUtils.getPageOffset(pos);
if (filePos < 0) { if (filePos < 0) {
throw DataUtils.newIllegalStateException( throw DataUtils.newIllegalStateException(DataUtils.ERROR_FILE_CORRUPT,
DataUtils.ERROR_FILE_CORRUPT,
"Negative position {0}; p={1}, c={2}", filePos, pos, chunk.toString()); "Negative position {0}; p={1}, c={2}", filePos, pos, chunk.toString());
} }
long maxPos = (chunk.block + chunk.len) * BLOCK_SIZE; long maxPos = (chunk.block + chunk.len) * BLOCK_SIZE;
Page.readChildrenPositions(fileStore, pos, filePos, maxPos, childCollector); final List<Future<?>> futures = Page.readChildrenPositions(fileStore, pos, filePos, maxPos,
childCollector, executorService);
for (Future<?> f : futures) {
try {
f.get();
} catch (InterruptedException ex) {
throw new RuntimeException(ex);
} catch (ExecutionException ex) {
throw DbException.convert(ex);
}
}
} }
// and cache resulting set of chunk ids // and cache resulting set of chunk ids
if (cacheChunkRef != null) { if (cacheChunkRef != null) {
......
...@@ -5,15 +5,20 @@ ...@@ -5,15 +5,20 @@
*/ */
package org.h2.mvstore; package org.h2.mvstore;
import static org.h2.engine.Constants.MEMORY_ARRAY;
import static org.h2.engine.Constants.MEMORY_OBJECT;
import static org.h2.engine.Constants.MEMORY_POINTER;
import static org.h2.mvstore.DataUtils.PAGE_TYPE_LEAF;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.Future;
import org.h2.compress.Compressor; import org.h2.compress.Compressor;
import org.h2.mvstore.type.DataType; import org.h2.mvstore.type.DataType;
import org.h2.util.Utils; import org.h2.util.Utils;
import static org.h2.engine.Constants.MEMORY_ARRAY;
import static org.h2.engine.Constants.MEMORY_OBJECT;
import static org.h2.engine.Constants.MEMORY_POINTER;
import static org.h2.mvstore.DataUtils.PAGE_TYPE_LEAF;
/** /**
* A page (a node or a leaf). * A page (a node or a leaf).
...@@ -247,9 +252,10 @@ public abstract class Page implements Cloneable ...@@ -247,9 +252,10 @@ public abstract class Page implements Cloneable
* @param maxPos the maximum position (the end of the chunk) * @param maxPos the maximum position (the end of the chunk)
* @param collector to report child pages positions to * @param collector to report child pages positions to
*/ */
static void readChildrenPositions(FileStore fileStore, long pos, static List<Future<?>> readChildrenPositions(FileStore fileStore, long pos,
long filePos, long maxPos, long filePos, long maxPos,
MVStore.ChunkIdsCollector collector) { MVStore.ChunkIdsCollector collector,
ExecutorService executorService) {
ByteBuffer buff; ByteBuffer buff;
int maxLength = DataUtils.getPageMaxLength(pos); int maxLength = DataUtils.getPageMaxLength(pos);
if (maxLength == DataUtils.PAGE_LARGE) { if (maxLength == DataUtils.PAGE_LARGE) {
...@@ -302,9 +308,18 @@ public abstract class Page implements Cloneable ...@@ -302,9 +308,18 @@ public abstract class Page implements Cloneable
DataUtils.ERROR_FILE_CORRUPT, DataUtils.ERROR_FILE_CORRUPT,
"Position {0} expected to be a non-leaf", pos); "Position {0} expected to be a non-leaf", pos);
} }
final List<Future<?>> futures = new ArrayList<>(len);
for (int i = 0; i <= len; i++) { for (int i = 0; i <= len; i++) {
collector.visit(buff.getLong()); final long childPagePos = buff.getLong();
Future<?> f = executorService.submit(new Runnable() {
@Override
public void run() {
collector.visit(childPagePos);
}
});
futures.add(f);
} }
return futures;
} }
/** /**
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论