提交 adec62cb authored 作者: Noel Grandin's avatar Noel Grandin

limit the size of the thread-pool

上级 1a79c0ac
...@@ -309,11 +309,6 @@ public class MVStore { ...@@ -309,11 +309,6 @@ public class MVStore {
private long lastFreeUnusedChunks; private long lastFreeUnusedChunks;
/**
* Service for executing multiple reads in parallel when doing garbage collection.
*/
final ExecutorService executorService;
/** /**
* Create and open the store. * Create and open the store.
* *
...@@ -364,8 +359,6 @@ public class MVStore { ...@@ -364,8 +359,6 @@ public class MVStore {
keysPerPage = DataUtils.getConfigParam(config, "keysPerPage", 48); keysPerPage = DataUtils.getConfigParam(config, "keysPerPage", 48);
backgroundExceptionHandler = backgroundExceptionHandler =
(UncaughtExceptionHandler)config.get("backgroundExceptionHandler"); (UncaughtExceptionHandler)config.get("backgroundExceptionHandler");
executorService = new ThreadPoolExecutor(0, 10, 10L, TimeUnit.SECONDS,
new ArrayBlockingQueue<Runnable>(keysPerPage + 1));
meta = new MVMap<>(this); meta = new MVMap<>(this);
meta.init(); meta.init();
if (this.fileStore != null) { if (this.fileStore != null) {
...@@ -952,7 +945,6 @@ public class MVStore { ...@@ -952,7 +945,6 @@ public class MVStore {
return; return;
} }
stopBackgroundThread(); stopBackgroundThread();
executorService.shutdownNow(); // no need to wait for reads
closed = true; closed = true;
storeLock.lock(); storeLock.lock();
try { try {
...@@ -1352,11 +1344,15 @@ public class MVStore { ...@@ -1352,11 +1344,15 @@ public class MVStore {
} }
private Set<Integer> collectReferencedChunks() { private Set<Integer> collectReferencedChunks() {
final ThreadPoolExecutor executorService = new ThreadPoolExecutor(10, 10, 10L, TimeUnit.SECONDS,
new ArrayBlockingQueue<Runnable>(keysPerPage + 1));
final AtomicInteger executingThreadCounter = new AtomicInteger(0);
try {
ChunkIdsCollector collector = new ChunkIdsCollector(meta.getId()); ChunkIdsCollector collector = new ChunkIdsCollector(meta.getId());
Set<Long> inspectedRoots = new HashSet<>(); Set<Long> inspectedRoots = new HashSet<>();
long pos = lastChunk.metaRootPos; long pos = lastChunk.metaRootPos;
inspectedRoots.add(pos); inspectedRoots.add(pos);
collector.visit(pos); collector.visit(pos, executorService, executingThreadCounter);
long oldestVersionToKeep = getOldestVersionToKeep(); long oldestVersionToKeep = getOldestVersionToKeep();
MVMap.RootReference rootReference = meta.getRoot(); MVMap.RootReference rootReference = meta.getRoot();
do { do {
...@@ -1364,13 +1360,13 @@ public class MVStore { ...@@ -1364,13 +1360,13 @@ public class MVStore {
pos = rootPage.getPos(); pos = rootPage.getPos();
if (!rootPage.isSaved()) { if (!rootPage.isSaved()) {
collector.setMapId(meta.getId()); collector.setMapId(meta.getId());
collector.visit(rootPage); collector.visit(rootPage, executorService, executingThreadCounter);
} else if(inspectedRoots.add(pos)) { } else if (inspectedRoots.add(pos)) {
collector.setMapId(meta.getId()); collector.setMapId(meta.getId());
collector.visit(pos); collector.visit(pos, executorService, executingThreadCounter);
} }
for (Cursor<String, String> c = new Cursor<>(rootPage, "root."); c.hasNext(); ) { for (Cursor<String, String> c = new Cursor<>(rootPage, "root."); c.hasNext();) {
String key = c.next(); String key = c.next();
assert key != null; assert key != null;
if (!key.startsWith("root.")) { if (!key.startsWith("root.")) {
...@@ -1378,15 +1374,18 @@ public class MVStore { ...@@ -1378,15 +1374,18 @@ public class MVStore {
} }
pos = DataUtils.parseHexLong(c.getValue()); pos = DataUtils.parseHexLong(c.getValue());
if (DataUtils.isPageSaved(pos) && inspectedRoots.add(pos)) { if (DataUtils.isPageSaved(pos) && inspectedRoots.add(pos)) {
// to allow for something like "root.tmp.123" to be processed // to allow for something like "root.tmp.123" to be
// processed
int mapId = DataUtils.parseHexInt(key.substring(key.lastIndexOf('.') + 1)); int mapId = DataUtils.parseHexInt(key.substring(key.lastIndexOf('.') + 1));
collector.setMapId(mapId); collector.setMapId(mapId);
collector.visit(pos); collector.visit(pos, executorService, executingThreadCounter);
} }
} }
} while(rootReference.version >= oldestVersionToKeep && } while (rootReference.version >= oldestVersionToKeep && (rootReference = rootReference.previous) != null);
(rootReference = rootReference.previous) != null);
return collector.getReferenced(); return collector.getReferenced();
} finally {
executorService.shutdownNow();
}
} }
final class ChunkIdsCollector { final class ChunkIdsCollector {
...@@ -1417,7 +1416,7 @@ public class MVStore { ...@@ -1417,7 +1416,7 @@ public class MVStore {
return referencedChunks; return referencedChunks;
} }
public void visit(Page page) { public void visit(Page page, ThreadPoolExecutor executorService, AtomicInteger executingThreadCounter) {
long pos = page.getPos(); long pos = page.getPos();
if (DataUtils.isPageSaved(pos)) { if (DataUtils.isPageSaved(pos)) {
registerChunk(DataUtils.getPageChunkId(pos)); registerChunk(DataUtils.getPageChunkId(pos));
...@@ -1430,9 +1429,9 @@ public class MVStore { ...@@ -1430,9 +1429,9 @@ public class MVStore {
for (int i = 0; i < count; i++) { for (int i = 0; i < count; i++) {
Page childPage = page.getChildPageIfLoaded(i); Page childPage = page.getChildPageIfLoaded(i);
if (childPage != null) { if (childPage != null) {
childCollector.visit(childPage); childCollector.visit(childPage, executorService, executingThreadCounter);
} else { } else {
childCollector.visit(page.getChildPagePos(i)); childCollector.visit(page.getChildPagePos(i), executorService, executingThreadCounter);
} }
} }
// and cache resulting set of chunk ids // and cache resulting set of chunk ids
...@@ -1442,7 +1441,7 @@ public class MVStore { ...@@ -1442,7 +1441,7 @@ public class MVStore {
} }
} }
public void visit(long pos) { public void visit(long pos, ThreadPoolExecutor executorService, AtomicInteger executingThreadCounter) {
if (!DataUtils.isPageSaved(pos)) { if (!DataUtils.isPageSaved(pos)) {
return; return;
} }
...@@ -1461,7 +1460,7 @@ public class MVStore { ...@@ -1461,7 +1460,7 @@ public class MVStore {
Page page; Page page;
if (cache != null && (page = cache.get(pos)) != null) { if (cache != null && (page = cache.get(pos)) != null) {
// there is a full page in cache, use it // there is a full page in cache, use it
childCollector.visit(page); childCollector.visit(page, executorService, executingThreadCounter);
} else { } else {
// page was not cached: read the data // page was not cached: read the data
Chunk chunk = getChunk(pos); Chunk chunk = getChunk(pos);
...@@ -1472,17 +1471,8 @@ public class MVStore { ...@@ -1472,17 +1471,8 @@ public class MVStore {
"Negative position {0}; p={1}, c={2}", filePos, pos, chunk.toString()); "Negative position {0}; p={1}, c={2}", filePos, pos, chunk.toString());
} }
long maxPos = (chunk.block + chunk.len) * BLOCK_SIZE; long maxPos = (chunk.block + chunk.len) * BLOCK_SIZE;
final List<Future<?>> futures = Page.readChildrenPositions(fileStore, pos, filePos, maxPos, Page.readChildrenPositions(fileStore, pos, filePos, maxPos,
childCollector, executorService); childCollector, executorService, executingThreadCounter);
for (Future<?> f : futures) {
try {
f.get();
} catch (InterruptedException ex) {
throw new RuntimeException(ex);
} catch (ExecutionException ex) {
throw DbException.convert(ex);
}
}
} }
// and cache resulting set of chunk ids // and cache resulting set of chunk ids
if (cacheChunkRef != null) { if (cacheChunkRef != null) {
......
...@@ -13,10 +13,12 @@ import java.nio.ByteBuffer; ...@@ -13,10 +13,12 @@ import java.nio.ByteBuffer;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.List; import java.util.List;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.Future; import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.atomic.AtomicInteger;
import org.h2.compress.Compressor; import org.h2.compress.Compressor;
import org.h2.message.DbException;
import org.h2.mvstore.type.DataType; import org.h2.mvstore.type.DataType;
import org.h2.util.Utils; import org.h2.util.Utils;
...@@ -252,10 +254,9 @@ public abstract class Page implements Cloneable ...@@ -252,10 +254,9 @@ public abstract class Page implements Cloneable
* @param maxPos the maximum position (the end of the chunk) * @param maxPos the maximum position (the end of the chunk)
* @param collector to report child pages positions to * @param collector to report child pages positions to
*/ */
static List<Future<?>> readChildrenPositions(FileStore fileStore, long pos, static void readChildrenPositions(FileStore fileStore, long pos, long filePos, long maxPos,
long filePos, long maxPos, final MVStore.ChunkIdsCollector collector, final ThreadPoolExecutor executorService,
final MVStore.ChunkIdsCollector collector, final AtomicInteger executingThreadCounter) {
ExecutorService executorService) {
ByteBuffer buff; ByteBuffer buff;
int maxLength = DataUtils.getPageMaxLength(pos); int maxLength = DataUtils.getPageMaxLength(pos);
if (maxLength == DataUtils.PAGE_LARGE) { if (maxLength == DataUtils.PAGE_LARGE) {
...@@ -266,10 +267,8 @@ public abstract class Page implements Cloneable ...@@ -266,10 +267,8 @@ public abstract class Page implements Cloneable
maxLength = (int) Math.min(maxPos - filePos, maxLength); maxLength = (int) Math.min(maxPos - filePos, maxLength);
int length = maxLength; int length = maxLength;
if (length < 0) { if (length < 0) {
throw DataUtils.newIllegalStateException( throw DataUtils.newIllegalStateException(DataUtils.ERROR_FILE_CORRUPT,
DataUtils.ERROR_FILE_CORRUPT, "Illegal page length {0} reading at {1}; max pos {2} ", length, filePos, maxPos);
"Illegal page length {0} reading at {1}; max pos {2} ",
length, filePos, maxPos);
} }
buff = fileStore.readFully(filePos, length); buff = fileStore.readFully(filePos, length);
int chunkId = DataUtils.getPageChunkId(pos); int chunkId = DataUtils.getPageChunkId(pos);
...@@ -277,49 +276,65 @@ public abstract class Page implements Cloneable ...@@ -277,49 +276,65 @@ public abstract class Page implements Cloneable
int start = buff.position(); int start = buff.position();
int pageLength = buff.getInt(); int pageLength = buff.getInt();
if (pageLength > maxLength) { if (pageLength > maxLength) {
throw DataUtils.newIllegalStateException( throw DataUtils.newIllegalStateException(DataUtils.ERROR_FILE_CORRUPT,
DataUtils.ERROR_FILE_CORRUPT, "File corrupted in chunk {0}, expected page length =< {1}, got {2}", chunkId, maxLength,
"File corrupted in chunk {0}, expected page length =< {1}, got {2}", pageLength);
chunkId, maxLength, pageLength);
} }
buff.limit(start + pageLength); buff.limit(start + pageLength);
short check = buff.getShort(); short check = buff.getShort();
int m = DataUtils.readVarInt(buff); int m = DataUtils.readVarInt(buff);
int mapId = collector.getMapId(); int mapId = collector.getMapId();
if (m != mapId) { if (m != mapId) {
throw DataUtils.newIllegalStateException( throw DataUtils.newIllegalStateException(DataUtils.ERROR_FILE_CORRUPT,
DataUtils.ERROR_FILE_CORRUPT, "File corrupted in chunk {0}, expected map id {1}, got {2}", chunkId, mapId, m);
"File corrupted in chunk {0}, expected map id {1}, got {2}",
chunkId, mapId, m);
} }
int checkTest = DataUtils.getCheckValue(chunkId) int checkTest = DataUtils.getCheckValue(chunkId) ^ DataUtils.getCheckValue(offset)
^ DataUtils.getCheckValue(offset)
^ DataUtils.getCheckValue(pageLength); ^ DataUtils.getCheckValue(pageLength);
if (check != (short) checkTest) { if (check != (short) checkTest) {
throw DataUtils.newIllegalStateException( throw DataUtils.newIllegalStateException(DataUtils.ERROR_FILE_CORRUPT,
DataUtils.ERROR_FILE_CORRUPT, "File corrupted in chunk {0}, expected check value {1}, got {2}", chunkId, checkTest, check);
"File corrupted in chunk {0}, expected check value {1}, got {2}",
chunkId, checkTest, check);
} }
int len = DataUtils.readVarInt(buff); int len = DataUtils.readVarInt(buff);
int type = buff.get(); int type = buff.get();
if ((type & 1) != DataUtils.PAGE_TYPE_NODE) { if ((type & 1) != DataUtils.PAGE_TYPE_NODE) {
throw DataUtils.newIllegalStateException( throw DataUtils.newIllegalStateException(DataUtils.ERROR_FILE_CORRUPT,
DataUtils.ERROR_FILE_CORRUPT,
"Position {0} expected to be a non-leaf", pos); "Position {0} expected to be a non-leaf", pos);
} }
/**
* The logic here is a little awkward. We want to (a) execute reads in parallel, but (b)
* limit the number of threads we create. This is complicated by (a) the algorithm is
* recursive and needs to wait for children before returning up the call-stack, (b) checking
* the size of the thread-pool is not reliable.
*/
final List<Future<?>> futures = new ArrayList<>(len); final List<Future<?>> futures = new ArrayList<>(len);
for (int i = 0; i <= len; i++) { for (int i = 0; i <= len; i++) {
final long childPagePos = buff.getLong(); final long childPagePos = buff.getLong();
if (executingThreadCounter.get() >= executorService.getMaximumPoolSize()) {
collector.visit(childPagePos, executorService, executingThreadCounter);
} else {
executingThreadCounter.incrementAndGet();
Future<?> f = executorService.submit(new Runnable() { Future<?> f = executorService.submit(new Runnable() {
@Override @Override
public void run() { public void run() {
collector.visit(childPagePos); try {
collector.visit(childPagePos, executorService, executingThreadCounter);
} finally {
executingThreadCounter.decrementAndGet();
}
} }
}); });
futures.add(f); futures.add(f);
} }
return futures; }
for (Future<?> f : futures) {
try {
f.get();
} catch (InterruptedException ex) {
throw new RuntimeException(ex);
} catch (ExecutionException ex) {
throw DbException.convert(ex);
}
}
} }
/** /**
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论