提交 8ca496eb authored 作者: Andrei Tokar's avatar Andrei Tokar

refactoring, code-deduplication

上级 3b16f12f
...@@ -164,7 +164,8 @@ public class MVStore { ...@@ -164,7 +164,8 @@ public class MVStore {
private volatile boolean closed; private volatile boolean closed;
final FileStore fileStore; private final FileStore fileStore;
private final boolean fileStoreIsProvided; private final boolean fileStoreIsProvided;
private final int pageSplitSize; private final int pageSplitSize;
...@@ -970,31 +971,75 @@ public class MVStore { ...@@ -970,31 +971,75 @@ public class MVStore {
} }
} }
/**
 * Read the serialized bytes of a single page from the file store and
 * validate its header before handing the buffer to the caller.
 * On return the buffer's limit is set to the end of the page and its
 * position is just past the header fields consumed here (page length,
 * check value and map id).
 *
 * @param pos the page position (encodes chunk id, offset within the
 *            chunk, maximum length code and page type)
 * @param expectedMapId the id of the map this page must belong to
 * @return the buffer, limited to the page and positioned after the
 *         validated header
 * @throws IllegalStateException if the chunk is unknown or the stored
 *         data is corrupt (bad position/length, wrong map id, or check
 *         value mismatch)
 */
ByteBuffer readBufferForPage(long pos, int expectedMapId) {
    Chunk c = getChunk(pos);
    long filePos = c.block * BLOCK_SIZE;
    filePos += DataUtils.getPageOffset(pos);
    if (filePos < 0) {
        throw DataUtils.newIllegalStateException(
                DataUtils.ERROR_FILE_CORRUPT,
                "Negative position {0}; p={1}, c={2}", filePos, pos, c.toString());
    }
    // a valid page must lie entirely within its chunk
    long maxPos = (c.block + c.len) * BLOCK_SIZE;
    ByteBuffer buff;
    int maxLength = DataUtils.getPageMaxLength(pos);
    if (maxLength == DataUtils.PAGE_LARGE) {
        // the length code saturated: the real length is stored as the
        // first int of the page itself, so fetch a prefix to read it
        buff = fileStore.readFully(filePos, 128);
        maxLength = buff.getInt();
        // read the first bytes again
    }
    // never read past the end of the chunk
    maxLength = (int) Math.min(maxPos - filePos, maxLength);
    int length = maxLength;
    if (length < 0) {
        throw DataUtils.newIllegalStateException(DataUtils.ERROR_FILE_CORRUPT,
                "Illegal page length {0} reading at {1}; max pos {2} ", length, filePos, maxPos);
    }
    buff = fileStore.readFully(filePos, length);
    int chunkId = DataUtils.getPageChunkId(pos);
    int offset = DataUtils.getPageOffset(pos);
    int start = buff.position();
    int remaining = buff.remaining();
    // first header field: the exact serialized page length
    int pageLength = buff.getInt();
    if (pageLength > remaining || pageLength < 4) {
        throw DataUtils.newIllegalStateException(DataUtils.ERROR_FILE_CORRUPT,
                "File corrupted in chunk {0}, expected page length 4..{1}, got {2}", chunkId, remaining,
                pageLength);
    }
    buff.limit(start + pageLength);
    short check = buff.getShort();
    int mapId = DataUtils.readVarInt(buff);
    if (mapId != expectedMapId) {
        throw DataUtils.newIllegalStateException(DataUtils.ERROR_FILE_CORRUPT,
                "File corrupted in chunk {0}, expected map id {1}, got {2}", chunkId, expectedMapId, mapId);
    }
    // the stored check value is derived from chunk id, offset and length
    int checkTest = DataUtils.getCheckValue(chunkId)
            ^ DataUtils.getCheckValue(offset)
            ^ DataUtils.getCheckValue(pageLength);
    if (check != (short) checkTest) {
        throw DataUtils.newIllegalStateException(DataUtils.ERROR_FILE_CORRUPT,
                "File corrupted in chunk {0}, expected check value {1}, got {2}", chunkId, checkTest, check);
    }
    return buff;
}
/** /**
* Get the chunk for the given position. * Get the chunk for the given position.
* *
* @param pos the position * @param pos the position
* @return the chunk * @return the chunk
*/ */
Chunk getChunk(long pos) { private Chunk getChunk(long pos) {
Chunk c = getChunkIfFound(pos);
if (c == null) {
int chunkId = DataUtils.getPageChunkId(pos);
throw DataUtils.newIllegalStateException(
DataUtils.ERROR_CHUNK_NOT_FOUND,
"Chunk {0} not found", chunkId);
}
return c;
}
private Chunk getChunkIfFound(long pos) {
int chunkId = DataUtils.getPageChunkId(pos); int chunkId = DataUtils.getPageChunkId(pos);
Chunk c = chunks.get(chunkId); Chunk c = chunks.get(chunkId);
if (c == null) { if (c == null) {
checkOpen(); checkOpen();
String s = meta.get(Chunk.getMetaKey(chunkId)); String s = meta.get(Chunk.getMetaKey(chunkId));
if (s == null) { if (s == null) {
return null; throw DataUtils.newIllegalStateException(
DataUtils.ERROR_CHUNK_NOT_FOUND,
"Chunk {0} not found", chunkId);
} }
c = Chunk.fromString(s); c = Chunk.fromString(s);
if (c.block == Long.MAX_VALUE) { if (c.block == Long.MAX_VALUE) {
...@@ -1463,16 +1508,8 @@ public class MVStore { ...@@ -1463,16 +1508,8 @@ public class MVStore {
childCollector.visit(page, executorService, executingThreadCounter); childCollector.visit(page, executorService, executingThreadCounter);
} else { } else {
// page was not cached: read the data // page was not cached: read the data
Chunk chunk = getChunk(pos); ByteBuffer buff = readBufferForPage(pos, getMapId());
long filePos = chunk.block * BLOCK_SIZE; Page.readChildrenPositions(buff, pos, childCollector, executorService, executingThreadCounter);
filePos += DataUtils.getPageOffset(pos);
if (filePos < 0) {
throw DataUtils.newIllegalStateException(DataUtils.ERROR_FILE_CORRUPT,
"Negative position {0}; p={1}, c={2}", filePos, pos, chunk.toString());
}
long maxPos = (chunk.block + chunk.len) * BLOCK_SIZE;
Page.readChildrenPositions(fileStore, pos, filePos, maxPos,
childCollector, executorService, executingThreadCounter);
} }
// and cache resulting set of chunk ids // and cache resulting set of chunk ids
if (cacheChunkRef != null) { if (cacheChunkRef != null) {
...@@ -2051,16 +2088,8 @@ public class MVStore { ...@@ -2051,16 +2088,8 @@ public class MVStore {
} }
Page p = cache == null ? null : cache.get(pos); Page p = cache == null ? null : cache.get(pos);
if (p == null) { if (p == null) {
Chunk c = getChunk(pos); ByteBuffer buff = readBufferForPage(pos, map.getId());
long filePos = c.block * BLOCK_SIZE; p = Page.read(buff, pos, map);
filePos += DataUtils.getPageOffset(pos);
if (filePos < 0) {
throw DataUtils.newIllegalStateException(
DataUtils.ERROR_FILE_CORRUPT,
"Negative position {0}; p={1}, c={2}", filePos, pos, c.toString());
}
long maxPos = (c.block + c.len) * BLOCK_SIZE;
p = Page.read(fileStore, pos, map, filePos, maxPos);
cachePage(p); cachePage(p);
} }
return p; return p;
......
...@@ -219,37 +219,17 @@ public abstract class Page implements Cloneable ...@@ -219,37 +219,17 @@ public abstract class Page implements Cloneable
/** /**
* Read a page. * Read a page.
* *
* @param fileStore the file store * @param buff ByteBuffer containing serialized page info
* @param pos the position * @param pos the position
* @param map the map * @param map the map
* @param filePos the position in the file
* @param maxPos the maximum position (the end of the chunk)
* @return the page * @return the page
*/ */
static Page read(FileStore fileStore, long pos, MVMap<?, ?> map, static Page read(ByteBuffer buff, long pos, MVMap<?, ?> map) {
long filePos, long maxPos) {
ByteBuffer buff;
int maxLength = DataUtils.getPageMaxLength(pos);
if (maxLength == DataUtils.PAGE_LARGE) {
buff = fileStore.readFully(filePos, 128);
maxLength = buff.getInt();
// read the first bytes again
}
maxLength = (int) Math.min(maxPos - filePos, maxLength);
int length = maxLength;
if (length < 0) {
throw DataUtils.newIllegalStateException(
DataUtils.ERROR_FILE_CORRUPT,
"Illegal page length {0} reading at {1}; max pos {2} ",
length, filePos, maxPos);
}
buff = fileStore.readFully(filePos, length);
boolean leaf = (DataUtils.getPageType(pos) & 1) == PAGE_TYPE_LEAF; boolean leaf = (DataUtils.getPageType(pos) & 1) == PAGE_TYPE_LEAF;
Page p = leaf ? new Leaf(map) : new NonLeaf(map); Page p = leaf ? new Leaf(map) : new NonLeaf(map);
p.pos = pos; p.pos = pos;
int chunkId = DataUtils.getPageChunkId(pos); int chunkId = DataUtils.getPageChunkId(pos);
int offset = DataUtils.getPageOffset(pos); p.read(buff, chunkId);
p.read(buff, chunkId, offset, maxLength);
return p; return p;
} }
...@@ -257,59 +237,23 @@ public abstract class Page implements Cloneable ...@@ -257,59 +237,23 @@ public abstract class Page implements Cloneable
* Read an inner node page from the buffer, but ignore the keys and * Read an inner node page from the buffer, but ignore the keys and
* values. * values.
* *
* @param fileStore the file store * @param buff ByteBuffer containing serialized page info
* @param pos the position * @param pos the position
* @param filePos the position in the file
* @param maxPos the maximum position (the end of the chunk)
* @param collector to report child pages positions to * @param collector to report child pages positions to
* @param executorService to use far parallel processing
* @param executingThreadCounter for parallel processing
*/ */
static void readChildrenPositions(FileStore fileStore, long pos, long filePos, long maxPos, static void readChildrenPositions(ByteBuffer buff, long pos,
final MVStore.ChunkIdsCollector collector, final ThreadPoolExecutor executorService, final MVStore.ChunkIdsCollector collector,
final ThreadPoolExecutor executorService,
final AtomicInteger executingThreadCounter) { final AtomicInteger executingThreadCounter) {
ByteBuffer buff;
int maxLength = DataUtils.getPageMaxLength(pos);
if (maxLength == DataUtils.PAGE_LARGE) {
buff = fileStore.readFully(filePos, 128);
maxLength = buff.getInt();
// read the first bytes again
}
maxLength = (int) Math.min(maxPos - filePos, maxLength);
int length = maxLength;
if (length < 0) {
throw DataUtils.newIllegalStateException(DataUtils.ERROR_FILE_CORRUPT,
"Illegal page length {0} reading at {1}; max pos {2} ", length, filePos, maxPos);
}
buff = fileStore.readFully(filePos, length);
int chunkId = DataUtils.getPageChunkId(pos);
int offset = DataUtils.getPageOffset(pos);
int start = buff.position();
int pageLength = buff.getInt();
if (pageLength > maxLength) {
throw DataUtils.newIllegalStateException(DataUtils.ERROR_FILE_CORRUPT,
"File corrupted in chunk {0}, expected page length =< {1}, got {2}", chunkId, maxLength,
pageLength);
}
buff.limit(start + pageLength);
short check = buff.getShort();
int m = DataUtils.readVarInt(buff);
int mapId = collector.getMapId();
if (m != mapId) {
throw DataUtils.newIllegalStateException(DataUtils.ERROR_FILE_CORRUPT,
"File corrupted in chunk {0}, expected map id {1}, got {2}", chunkId, mapId, m);
}
int checkTest = DataUtils.getCheckValue(chunkId) ^ DataUtils.getCheckValue(offset)
^ DataUtils.getCheckValue(pageLength);
if (check != (short) checkTest) {
throw DataUtils.newIllegalStateException(DataUtils.ERROR_FILE_CORRUPT,
"File corrupted in chunk {0}, expected check value {1}, got {2}", chunkId, checkTest, check);
}
int len = DataUtils.readVarInt(buff); int len = DataUtils.readVarInt(buff);
int type = buff.get(); int type = buff.get();
if ((type & 1) != DataUtils.PAGE_TYPE_NODE) { if ((type & 1) != DataUtils.PAGE_TYPE_NODE) {
throw DataUtils.newIllegalStateException(DataUtils.ERROR_FILE_CORRUPT, throw DataUtils.newIllegalStateException(DataUtils.ERROR_FILE_CORRUPT,
"Position {0} expected to be a non-leaf", pos); "Position {0} expected to be a non-leaf", pos);
} }
/** /*
* The logic here is a little awkward. We want to (a) execute reads in parallel, but (b) * The logic here is a little awkward. We want to (a) execute reads in parallel, but (b)
* limit the number of threads we create. This is complicated by (a) the algorithm is * limit the number of threads we create. This is complicated by (a) the algorithm is
* recursive and needs to wait for children before returning up the call-stack, (b) checking * recursive and needs to wait for children before returning up the call-stack, (b) checking
...@@ -505,7 +449,7 @@ public abstract class Page implements Cloneable ...@@ -505,7 +449,7 @@ public abstract class Page implements Cloneable
* @param key the key * @param key the key
* @return the value or null * @return the value or null
*/ */
public int binarySearch(Object key) { int binarySearch(Object key) {
int low = 0, high = keys.length - 1; int low = 0, high = keys.length - 1;
// the cached index minus one, so that // the cached index minus one, so that
// for the first time (when cachedCompare is 0), // for the first time (when cachedCompare is 0),
...@@ -658,36 +602,9 @@ public abstract class Page implements Cloneable ...@@ -658,36 +602,9 @@ public abstract class Page implements Cloneable
* *
* @param buff the buffer * @param buff the buffer
* @param chunkId the chunk id * @param chunkId the chunk id
* @param offset the offset within the chunk
* @param maxLength the maximum length
*/ */
private void read(ByteBuffer buff, int chunkId, int offset, int maxLength) { private void read(ByteBuffer buff, int chunkId) {
int start = buff.position(); int pageLength = buff.remaining() + 4; // size of int, since we've read page length already
int pageLength = buff.getInt();
if (pageLength > maxLength || pageLength < 4) {
throw DataUtils.newIllegalStateException(
DataUtils.ERROR_FILE_CORRUPT,
"File corrupted in chunk {0}, expected page length 4..{1}, got {2}",
chunkId, maxLength, pageLength);
}
buff.limit(start + pageLength);
short check = buff.getShort();
int mapId = DataUtils.readVarInt(buff);
if (mapId != map.getId()) {
throw DataUtils.newIllegalStateException(
DataUtils.ERROR_FILE_CORRUPT,
"File corrupted in chunk {0}, expected map id {1}, got {2}",
chunkId, map.getId(), mapId);
}
int checkTest = DataUtils.getCheckValue(chunkId)
^ DataUtils.getCheckValue(offset)
^ DataUtils.getCheckValue(pageLength);
if (check != (short) checkTest) {
throw DataUtils.newIllegalStateException(
DataUtils.ERROR_FILE_CORRUPT,
"File corrupted in chunk {0}, expected check value {1}, got {2}",
chunkId, checkTest, check);
}
int len = DataUtils.readVarInt(buff); int len = DataUtils.readVarInt(buff);
keys = new Object[len]; keys = new Object[len];
int type = buff.get(); int type = buff.get();
...@@ -710,7 +627,7 @@ public abstract class Page implements Cloneable ...@@ -710,7 +627,7 @@ public abstract class Page implements Cloneable
compressor = map.getStore().getCompressorFast(); compressor = map.getStore().getCompressorFast();
} }
int lenAdd = DataUtils.readVarInt(buff); int lenAdd = DataUtils.readVarInt(buff);
int compLen = pageLength + start - buff.position(); int compLen = buff.remaining();
byte[] comp = Utils.newBytes(compLen); byte[] comp = Utils.newBytes(compLen);
buff.get(comp); buff.get(comp);
int l = compLen + lenAdd; int l = compLen + lenAdd;
...@@ -722,7 +639,7 @@ public abstract class Page implements Cloneable ...@@ -722,7 +639,7 @@ public abstract class Page implements Cloneable
if (isLeaf()) { if (isLeaf()) {
readPayLoad(buff); readPayLoad(buff);
} }
diskSpaceUsed = maxLength; diskSpaceUsed = pageLength;
recalculateMemory(); recalculateMemory();
} }
......
Markdown 格式
0%
您添加了 0 人到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论