Commit 2ed0a04b authored by Thomas Mueller

MVStore: the file format was changed slightly.

Parent: b9633b12
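The core of the format change is visible in Chunk.getFooterBytes() below: every chunk now ends with a small key-value footer (chunk, block, and version when it differs from the chunk id), protected by a Fletcher-32 checksum and padded to a fixed length, so the store can locate the newest chunk without a fixed-position header. The following is a minimal, self-contained sketch of that layout; the simplified fletcher32 helper, the FOOTER_LENGTH constant, and the hand-rolled key:value formatting are stand-ins for DataUtils.getFletcher32, MVStore.CHUNK_FOOTER_LENGTH, and DataUtils.appendMap, not the exact production code.

import java.nio.charset.StandardCharsets;

public class ChunkFooterSketch {

    // stand-in for MVStore.CHUNK_FOOTER_LENGTH; the real value is not shown in this diff
    static final int FOOTER_LENGTH = 128;

    // simplified Fletcher-32 over an even number of bytes,
    // in the style of DataUtils.getFletcher32
    static int fletcher32(byte[] b, int length) {
        int s1 = 0xffff, s2 = 0xffff;
        for (int i = 0; i < length;) {
            // consume pairs of bytes as 16-bit words
            int x = ((b[i++] & 0xff) << 8) | (b[i++] & 0xff);
            s2 += s1 += x;
            s1 = (s1 & 0xffff) + (s1 >>> 16);
            s2 = (s2 & 0xffff) + (s2 >>> 16);
        }
        return (s2 << 16) | s1;
    }

    static byte[] footerBytes(int chunkId, long block, long version) {
        StringBuilder buff = new StringBuilder();
        buff.append("chunk:").append(Integer.toHexString(chunkId));
        buff.append(",block:").append(Long.toHexString(block));
        if (version != chunkId) {
            // "version" is only written when it differs from the chunk id
            buff.append(",version:").append(Long.toHexString(version));
        }
        byte[] bytes = buff.toString().getBytes(StandardCharsets.ISO_8859_1);
        int checksum = fletcher32(bytes, bytes.length / 2 * 2);
        buff.append(",fletcher:").append(Integer.toHexString(checksum));
        while (buff.length() < FOOTER_LENGTH - 1) {
            buff.append(' '); // pad to a fixed size so the footer is easy to find
        }
        buff.append('\n');
        return buff.toString().getBytes(StandardCharsets.ISO_8859_1);
    }

    public static void main(String[] args) {
        System.out.print(new String(footerBytes(3, 0x10, 3), StandardCharsets.ISO_8859_1));
    }
}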
......@@ -82,6 +82,12 @@ public class Chunk {
* The last used map id.
*/
public int mapId;
/**
* The predicted position of the next chunk.
*/
public long next;
public long nextSize;
Chunk(int id) {
this.id = id;
......@@ -94,7 +100,7 @@ public class Chunk {
* @param start the start of the chunk in the file
* @return the chunk
*/
static Chunk fromHeader(ByteBuffer buff, long start) {
static Chunk readChunkHeader(ByteBuffer buff, long start) {
int pos = buff.position();
byte[] data = new byte[Math.min(buff.remaining(), MAX_HEADER_LENGTH)];
buff.get(data);
......@@ -120,7 +126,7 @@ public class Chunk {
*
* @param buff the target buffer
*/
void writeHeader(WriteBuffer buff, int minLength) {
void writeChunkHeader(WriteBuffer buff, int minLength) {
long pos = buff.position();
buff.put(asString().getBytes(DataUtils.LATIN));
while (buff.position() - pos < minLength - 1) {
......@@ -152,7 +158,8 @@ public class Chunk {
c.maxLenLive = DataUtils.readHexLong(map, "liveMax", c.maxLength);
c.metaRootPos = DataUtils.readHexLong(map, "root", 0);
c.time = DataUtils.readHexLong(map, "time", 0);
c.version = DataUtils.readHexLong(map, "version", 0);
c.version = DataUtils.readHexLong(map, "version", id);
c.next = DataUtils.readHexLong(map, "next", 0);
return c;
}
......@@ -188,12 +195,34 @@ public class Chunk {
}
DataUtils.appendMap(buff, "map", mapId);
DataUtils.appendMap(buff, "max", maxLength);
if (next != 0) {
DataUtils.appendMap(buff, "next", next);
}
DataUtils.appendMap(buff, "pages", pageCount);
DataUtils.appendMap(buff, "root", metaRootPos);
DataUtils.appendMap(buff, "time", time);
DataUtils.appendMap(buff, "version", version);
if (version != id) {
DataUtils.appendMap(buff, "version", version);
}
return buff.toString();
}
byte[] getFooterBytes() {
StringBuilder buff = new StringBuilder();
DataUtils.appendMap(buff, "chunk", id);
DataUtils.appendMap(buff, "block", block);
if (version != id) {
DataUtils.appendMap(buff, "version", version);
}
byte[] bytes = buff.toString().getBytes(DataUtils.LATIN);
int checksum = DataUtils.getFletcher32(bytes, bytes.length / 2 * 2);
DataUtils.appendMap(buff, "fletcher", checksum);
while (buff.length() < MVStore.CHUNK_FOOTER_LENGTH - 1) {
buff.append(' ');
}
buff.append("\n");
return buff.toString().getBytes(DataUtils.LATIN);
}
@Override
public String toString() {
......
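Note the read side of the same "version equals chunk id" optimization in Chunk.fromString above: c.version = DataUtils.readHexLong(map, "version", id) makes the chunk id double as the version whenever the "version" key was omitted on write. A one-method sketch of that fallback (the versionOf name and the plain java.util.Map are illustration only):

// sketch: resolve a chunk's version from a parsed header/footer map,
// falling back to the chunk id when "version" was omitted on write
static long versionOf(java.util.Map<String, String> map, int chunkId) {
    String v = map.get("version"); // values are hex-encoded by appendMap
    return v == null ? chunkId : Long.parseLong(v, 16);
}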
......@@ -50,6 +50,8 @@ Documentation
- better document that writes are in background thread
- better document how to do non-unique indexes
- document pluggable store and OffHeapStore
- document file format
- the chunk id is normally the version
MVTableEngine:
- verify that tests don't use the PageStore
......@@ -64,10 +66,12 @@ TransactionStore:
MVStore:
- maybe reduce size of store header
- maybe let a chunk point to a list of potential next chunks
(so no fixed location header is needed), similar to a skip list
(options: just after the current chunk, or a list of next positions)
- document and review the file format
- the newest chunk is the one with the newest version
(not necessarily the one with the highest chunk id)
- automated 'kill process' and 'power failure' test
- update checkstyle
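The "newest chunk is the one with the newest version" TODO item above is what readFileHeader below implements: candidate file headers and the trailing chunk footer are compared by version (defaulting to the chunk id when absent), not by chunk id, because after a rollback a higher chunk id can carry an older version. A minimal sketch of that selection, assuming the candidates were already parsed into key-value maps:

// sketch: pick the newest candidate by version; returns null if none qualify
static java.util.Map<String, String> newest(
        java.util.List<java.util.Map<String, String>> candidates) {
    java.util.Map<String, String> best = null;
    long newestVersion = -1;
    for (java.util.Map<String, String> m : candidates) {
        long chunk = Long.parseLong(m.getOrDefault("chunk", "0"), 16);
        // a missing "version" means it equals the chunk id
        long version = m.containsKey("version")
                ? Long.parseLong(m.get("version"), 16) : chunk;
        if (version > newestVersion) {
            newestVersion = version;
            best = m;
        }
    }
    return best;
}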
......@@ -166,8 +170,6 @@ public class MVStore {
private final int pageSplitSize;
private long lastChunkBlock;
/**
* The page cache. The default size is 16 MB, and the average size is 2 KB.
* It is split in 16 segments. The stack move distance is 2% of the expected
......@@ -175,7 +177,10 @@ public class MVStore {
*/
private CacheLongKeyLIRS<Page> cache;
private int lastChunkId;
/**
* The newest chunk. If nothing was stored yet, this field is not set.
*/
private Chunk lastChunk;
/**
* The map of chunks.
......@@ -219,7 +224,7 @@ public class MVStore {
private long currentVersion;
/**
* The version of the last stored chunk.
* The version of the last stored chunk, or -1 if nothing was stored so far.
*/
private long lastStoredVersion;
......@@ -323,24 +328,6 @@ public class MVStore {
writeFileHeader();
} else {
readFileHeader();
long format = DataUtils.readHexLong(fileHeader, "format", 1);
if (format > FORMAT_WRITE && !fileStore.isReadOnly()) {
throw DataUtils.newIllegalStateException(
DataUtils.ERROR_UNSUPPORTED_FORMAT,
"The write format {0} is larger than the supported format {1}, " +
"and the file was not opened in read-only mode",
format, FORMAT_WRITE);
}
format = DataUtils.readHexLong(fileHeader, "formatRead", format);
if (format > FORMAT_READ) {
throw DataUtils.newIllegalStateException(
DataUtils.ERROR_UNSUPPORTED_FORMAT,
"The read format {0} is larger than the supported format {1}",
format, FORMAT_READ);
}
if (lastChunkBlock > 0) {
readMeta();
}
}
} catch (IllegalStateException e) {
try {
......@@ -510,13 +497,12 @@ public class MVStore {
}
private Chunk getChunkForVersion(long version) {
for (int chunkId = lastChunkId;; chunkId--) {
Chunk x = chunks.get(chunkId);
if (x == null) {
return null;
} else if (x.version <= version) {
return x;
Chunk c = lastChunk;
while (true) {
if (c == null || c.version <= version) {
return c;
}
c = chunks.get(c.id - 1);
}
}
......@@ -536,56 +522,11 @@ public class MVStore {
metaChanged = true;
}
private synchronized void readMeta() {
chunks.clear();
Chunk header = readChunkHeader(lastChunkBlock);
if (header.block == Long.MAX_VALUE) {
throw DataUtils.newIllegalStateException(
DataUtils.ERROR_FILE_CORRUPT,
"Chunk {0} is invalid", header.id);
}
lastChunkId = header.id;
lastMapId = header.mapId;
currentVersion = header.version;
setWriteVersion(currentVersion);
chunks.put(header.id, header);
meta.setRootPos(header.metaRootPos, -1);
chunks.put(header.id, header);
// we can load the chunks in any order,
// because loading chunk metadata
// might recursively load another chunk
for (Iterator<String> it = meta.keyIterator("chunk."); it.hasNext();) {
String s = it.next();
if (!s.startsWith("chunk.")) {
break;
}
s = meta.get(s);
Chunk c = Chunk.fromString(s);
if (!chunks.containsKey(c.id)) {
if (c.block == Long.MAX_VALUE) {
throw DataUtils.newIllegalStateException(
DataUtils.ERROR_FILE_CORRUPT,
"Chunk {0} is invalid", c.id);
}
chunks.put(c.id, c);
}
}
// build the free space list
for (Chunk c : chunks.values()) {
if (c.pageCountLive == 0) {
// remove this chunk in the next save operation
registerFreePage(currentVersion, c.id, 0, 0);
}
long start = c.block * BLOCK_SIZE;
int length = c.len * BLOCK_SIZE;
fileStore.markUsed(start, length);
}
}
private void readFileHeader() {
private synchronized void readFileHeader() {
boolean validHeader = false;
// we don't know yet which chunk is the newest
long newestChunk = -1;
// we don't know yet which chunk and version are the newest
long newestVersion = -1;
long newestChunkBlock = -1;
// read the last block of the file, and then the two first blocks
ByteBuffer fileHeaderBlocks = fileStore.readFully(0, 2 * BLOCK_SIZE);
byte[] buff = new byte[BLOCK_SIZE];
......@@ -603,11 +544,12 @@ public class MVStore {
if (check != checksum) {
continue;
}
long chunk = DataUtils.readHexLong(m, "chunk", 0);
if (chunk > newestChunk) {
newestChunk = chunk;
int chunk = DataUtils.readHexInt(m, "chunk", 0);
long version = DataUtils.readHexLong(m, "version", chunk);
if (version > newestVersion) {
newestVersion = version;
fileHeader.putAll(m);
lastChunkBlock = DataUtils.readHexLong(m, "block", 0);
newestChunkBlock = DataUtils.readHexLong(m, "block", 0);
creationTime = DataUtils.readHexLong(m, "created", 0);
validHeader = true;
}
......@@ -620,6 +562,23 @@ public class MVStore {
DataUtils.ERROR_FILE_CORRUPT,
"Store header is corrupt: {0}", fileStore);
}
long format = DataUtils.readHexLong(fileHeader, "format", 1);
if (format > FORMAT_WRITE && !fileStore.isReadOnly()) {
throw DataUtils.newIllegalStateException(
DataUtils.ERROR_UNSUPPORTED_FORMAT,
"The write format {0} is larger than the supported format {1}, " +
"and the file was not opened in read-only mode",
format, FORMAT_WRITE);
}
format = DataUtils.readHexLong(fileHeader, "formatRead", format);
if (format > FORMAT_READ) {
throw DataUtils.newIllegalStateException(
DataUtils.ERROR_UNSUPPORTED_FORMAT,
"The read format {0} is larger than the supported format {1}",
format, FORMAT_READ);
}
lastStoredVersion = -1;
ByteBuffer lastBlock = fileStore.readFully(
fileStore.size() - CHUNK_FOOTER_LENGTH, CHUNK_FOOTER_LENGTH);
buff = new byte[CHUNK_FOOTER_LENGTH];
......@@ -634,23 +593,79 @@ public class MVStore {
byte[] bytes = s.getBytes(DataUtils.LATIN);
int checksum = DataUtils.getFletcher32(bytes, bytes.length / 2 * 2);
if (check == checksum) {
long chunk = DataUtils.readHexLong(m, "chunk", 0);
if (chunk > newestChunk) {
int chunk = DataUtils.readHexInt(m, "chunk", 0);
long version = DataUtils.readHexLong(m, "version", chunk);
if (version > newestVersion) {
fileHeader.putAll(m);
lastChunkBlock = DataUtils.readHexLong(m, "block", 0);
newestVersion = version;
newestChunkBlock = DataUtils.readHexLong(m, "block", 0);
validHeader = true;
}
}
} catch (Exception e) {
// ignore
}
lastStoredVersion = -1;
}
if (newestChunkBlock > 0) {
readMeta(newestChunkBlock);
}
}
private void readMeta(long chunkBlock) {
chunks.clear();
Chunk header = readChunkHeader(chunkBlock);
if (header.block == Long.MAX_VALUE) {
throw DataUtils.newIllegalStateException(
DataUtils.ERROR_FILE_CORRUPT,
"Chunk {0} is invalid", header.id);
}
lastChunk = header;
lastMapId = header.mapId;
currentVersion = header.version;
setWriteVersion(currentVersion);
chunks.put(header.id, header);
meta.setRootPos(header.metaRootPos, -1);
// we can load the chunks in any order,
// because loading chunk metadata
// might recursively load another chunk
for (Iterator<String> it = meta.keyIterator("chunk."); it.hasNext();) {
String s = it.next();
if (!s.startsWith("chunk.")) {
break;
}
s = meta.get(s);
Chunk c = Chunk.fromString(s);
if (!chunks.containsKey(c.id)) {
if (c.block == Long.MAX_VALUE) {
throw DataUtils.newIllegalStateException(
DataUtils.ERROR_FILE_CORRUPT,
"Chunk {0} is invalid", c.id);
}
chunks.put(c.id, c);
}
}
// build the free space list
for (Chunk c : chunks.values()) {
if (c.pageCountLive == 0) {
// remove this chunk in the next save operation
registerFreePage(currentVersion, c.id, 0, 0);
}
long start = c.block * BLOCK_SIZE;
int length = c.len * BLOCK_SIZE;
fileStore.markUsed(start, length);
}
}
private byte[] getFileHeaderBytes() {
StringBuilder buff = new StringBuilder("H:2");
fileHeader.put("block", lastChunkBlock);
fileHeader.put("chunk", lastChunkId);
if (lastChunk != null) {
fileHeader.put("chunk", lastChunk.id);
if (lastChunk.version != lastChunk.id) {
fileHeader.put("version", lastChunk.version);
} else {
fileHeader.remove("version");
}
fileHeader.put("block", lastChunk.block);
}
DataUtils.appendMap(buff, fileHeader);
byte[] bytes = buff.toString().getBytes(DataUtils.LATIN);
int checksum = DataUtils.getFletcher32(bytes, bytes.length / 2 * 2);
......@@ -659,22 +674,6 @@ public class MVStore {
return buff.toString().getBytes(DataUtils.LATIN);
}
private byte[] getChunkFooterBytes() {
StringBuilder buff = new StringBuilder();
fileHeader.put("chunk", lastChunkId);
fileHeader.put("block", lastChunkBlock);
DataUtils.appendMap(buff, "chunk", lastChunkId);
DataUtils.appendMap(buff, "block", lastChunkBlock);
byte[] bytes = buff.toString().getBytes(DataUtils.LATIN);
int checksum = DataUtils.getFletcher32(bytes, bytes.length / 2 * 2);
DataUtils.appendMap(buff, "fletcher", checksum);
while (buff.length() < CHUNK_FOOTER_LENGTH - 1) {
buff.append(' ');
}
buff.append("\n");
return buff.toString().getBytes(DataUtils.LATIN);
}
private void writeFileHeader() {
byte[] bytes = getFileHeaderBytes();
ByteBuffer header = ByteBuffer.allocate(2 * BLOCK_SIZE);
......@@ -872,14 +871,18 @@ public class MVStore {
// the last chunk was not stored before and needs to be set now (it's
// better not to update right after storing, because that would modify
// the meta map again)
Chunk lastChunk = chunks.get(lastChunkId);
if (lastChunk != null) {
meta.put(Chunk.getMetaKey(lastChunk.id), lastChunk.asString());
int lastChunkId;
if (lastChunk == null) {
lastChunkId = 0;
} else {
lastChunkId = lastChunk.id;
meta.put(Chunk.getMetaKey(lastChunkId), lastChunk.asString());
// never go backward in time
time = Math.max(lastChunk.time, time);
}
Chunk c;
c = new Chunk(++lastChunkId);
c = new Chunk(lastChunkId + 1);
c.pageCount = Integer.MAX_VALUE;
c.pageCountLive = Integer.MAX_VALUE;
c.maxLength = Long.MAX_VALUE;
......@@ -890,6 +893,7 @@ public class MVStore {
c.time = time;
c.version = version;
c.mapId = lastMapId;
c.next = Long.MAX_VALUE;
chunks.put(c.id, c);
// force a metadata update
meta.put(Chunk.getMetaKey(c.id), c.asString());
......@@ -923,7 +927,7 @@ public class MVStore {
Set<Chunk> removedChunks = applyFreedSpace(storeVersion, time);
WriteBuffer buff = getWriteBuffer();
// need to patch the header later
c.writeHeader(buff, 0);
c.writeChunkHeader(buff, 0);
int headerLength = buff.position();
c.pageCount = 0;
c.pageCountLive = 0;
......@@ -950,13 +954,6 @@ public class MVStore {
CHUNK_FOOTER_LENGTH, BLOCK_SIZE);
buff.limit(length);
// free up the space of unused chunks now
for (Chunk x : removedChunks) {
long start = x.block * BLOCK_SIZE;
int len = x.len * BLOCK_SIZE;
fileStore.free(start, len);
}
// the length of the file that is still in use
// (not necessarily the end of the file)
long end = getFileLengthInUse();
......@@ -965,29 +962,85 @@ public class MVStore {
filePos = fileStore.allocate(length);
} else {
filePos = end;
fileStore.markUsed(end, length);
}
// end is not necessarily the end of the file
boolean storeAtEndOfFile = filePos + length >= fileStore.size();
// free up the space of unused chunks now
// (after allocating space for this chunk, so that
// no data is lost if writing this chunk fails)
for (Chunk x : removedChunks) {
long start = x.block * BLOCK_SIZE;
int len = x.len * BLOCK_SIZE;
fileStore.free(start, len);
}
if (!reuseSpace) {
// we can not mark it earlier, because it
// might have been allocated by one of the
// removed chunks
fileStore.markUsed(end, length);
}
c.block = filePos / BLOCK_SIZE;
c.len = length / BLOCK_SIZE;
c.metaRootPos = metaRoot.getPos();
// calculate and set the likely next position
if (reuseSpace) {
int predictBlocks = c.len;
c.nextSize = predictBlocks;
long predictedNextStart = fileStore.allocate(predictBlocks * BLOCK_SIZE);
fileStore.free(predictedNextStart, predictBlocks * BLOCK_SIZE);
c.next = predictedNextStart / BLOCK_SIZE;
if (c.next == c.block + c.len) {
// just after this chunk
c.next = 0;
}
} else {
// just after this chunk
c.next = 0;
}
buff.position(0);
c.writeHeader(buff, headerLength);
lastChunkBlock = filePos / BLOCK_SIZE;
c.writeChunkHeader(buff, headerLength);
revertTemp(storeVersion);
buff.position(buff.limit() - CHUNK_FOOTER_LENGTH);
buff.put(getChunkFooterBytes());
buff.put(c.getFooterBytes());
buff.position(0);
write(filePos, buff.getBuffer());
releaseWriteBuffer(buff);
// overwrite the header if required
// whether to overwrite the file header
boolean needHeader = false;
// whether to reset the list of next chunks
boolean resetNextChain = false;
if (!storeAtEndOfFile) {
if (lastChunk == null) {
needHeader = true;
} else if (lastChunk.block + lastChunk.len == c.block) {
// just after the last chunk
} else if (lastChunk.next == c.block) {
// the prediction of the last chunk matched
} else {
needHeader = true;
}
// TODO check the length of the list - if too large
// write anyway (let's assume 11 is too large)
// TODO verify that no chunks between the last known one
// and the current chunk were removed
// TODO remove this once we have implemented
// reading and once we have tests
needHeader = true;
}
lastChunk = c;
if (needHeader) {
writeFileHeader();
}
if (!storeAtEndOfFile) {
// may only shrink after the file header was written
shrinkFileIfPossible(1);
}
......@@ -1187,7 +1240,7 @@ public class MVStore {
private Chunk readChunkHeader(long block) {
long p = block * BLOCK_SIZE;
ByteBuffer buff = fileStore.readFully(p, Chunk.MAX_HEADER_LENGTH);
return Chunk.fromHeader(buff, p);
return Chunk.readChunkHeader(buff, p);
}
/**
......@@ -1200,7 +1253,7 @@ public class MVStore {
*/
public synchronized boolean compactMoveChunks() {
checkOpen();
if (chunks.size() == 0) {
if (lastChunk == null) {
// nothing to do
return false;
}
......@@ -1239,7 +1292,7 @@ public class MVStore {
int length = c.len * BLOCK_SIZE;
buff.limit(length);
ByteBuffer readBuff = fileStore.readFully(start, length);
Chunk.fromHeader(readBuff, start);
Chunk.readChunkHeader(readBuff, start);
int chunkHeaderLen = readBuff.position();
buff.position(chunkHeaderLen);
buff.put(readBuff);
......@@ -1247,10 +1300,11 @@ public class MVStore {
fileStore.markUsed(end, length);
fileStore.free(start, length);
c.block = end / BLOCK_SIZE;
c.next = 0;
buff.position(0);
c.writeHeader(buff, chunkHeaderLen);
c.writeChunkHeader(buff, chunkHeaderLen);
buff.position(length - CHUNK_FOOTER_LENGTH);
buff.put(getChunkFooterBytes());
buff.put(lastChunk.getFooterBytes());
buff.position(0);
write(end, buff.getBuffer());
releaseWriteBuffer(buff);
......@@ -1278,7 +1332,7 @@ public class MVStore {
int length = c.len * BLOCK_SIZE;
buff.limit(length);
ByteBuffer readBuff = fileStore.readFully(start, length);
Chunk.fromHeader(readBuff, 0);
Chunk.readChunkHeader(readBuff, 0);
int chunkHeaderLen = readBuff.position();
buff.position(chunkHeaderLen);
buff.put(readBuff);
......@@ -1286,9 +1340,9 @@ public class MVStore {
fileStore.free(start, length);
buff.position(0);
c.block = pos / BLOCK_SIZE;
c.writeHeader(buff, chunkHeaderLen);
c.writeChunkHeader(buff, chunkHeaderLen);
buff.position(length - CHUNK_FOOTER_LENGTH);
buff.put(getChunkFooterBytes());
buff.put(lastChunk.getFooterBytes());
buff.position(0);
write(pos, buff.getBuffer());
releaseWriteBuffer(buff);
......@@ -1316,50 +1370,57 @@ public class MVStore {
}
/**
* Try to reduce the file size by re-writing partially full chunks. Chunks
* with a low number of live items are re-written. If the current fill rate
* is higher than the target fill rate, nothing is done.
* Try to increase the fill rate by re-writing partially full chunks. Chunks
* with a low number of live items are re-written.
* <p>
* If the current fill rate is higher than the target fill rate, nothing is
* done. If not at least a minimum amount of space can be saved, nothing is
* done.
* <p>
* Please note this method will not necessarily reduce the file size, as
* empty chunks are not overwritten.
* <p>
* Only data of open maps can be moved. For maps that are not open, the old
* chunk is still referenced. Therefore, it is recommended to open all maps
* before calling this method.
*
* @param fillRate the minimum percentage of live entries
* @return if anything was written
*
* @param targetFillRate the minimum percentage of live entries
* @param minSaving the minimum amount of saved space
* @return if a chunk was re-written
*/
public synchronized boolean compact(int fillRate) {
public synchronized boolean compact(int targetFillRate, int minSaving) {
checkOpen();
if (chunks.size() == 0) {
if (lastChunk == null) {
// nothing to do
return false;
}
long maxLengthSum = 0, maxLengthLiveSum = 0;
// calculate the fill rate
long maxLengthSum = 0;
long maxLengthLiveSum = 0;
for (Chunk c : chunks.values()) {
maxLengthSum += c.maxLength;
maxLengthLiveSum += c.maxLenLive;
}
// the fill rate of all chunks combined
if (maxLengthSum <= 0) {
// avoid division by 0
maxLengthSum = 1;
}
// the fill rate of all chunks combined
int totalChunkFillRate = (int) (100 * maxLengthLiveSum / maxLengthSum);
if (totalChunkFillRate > fillRate) {
int fillRate = (int) (100 * maxLengthLiveSum / maxLengthSum);
if (fillRate >= targetFillRate) {
return false;
}
// calculate the average max length
int averageMaxLength = (int) (maxLengthSum / chunks.size());
long time = getTime();
// the 'old' list contains the chunks we want to free up
ArrayList<Chunk> old = New.arrayList();
Chunk last = chunks.get(lastChunk.id);
for (Chunk c : chunks.values()) {
if (canOverwriteChunk(c, time)) {
int age = lastChunkId - c.id + 1;
c.collectPriority = c.getFillRate() / age;
long age = last.version - c.version + 1;
c.collectPriority = (int) (c.getFillRate() / age);
old.add(c);
}
}
......@@ -1377,16 +1438,21 @@ public class MVStore {
});
// find out up to where in the old list we need to move
// try to move one (average sized) chunk
long moved = 0;
long saved = 0;
Chunk move = null;
for (Chunk c : old) {
if (move != null && moved + c.maxLenLive > averageMaxLength) {
break;
long save = c.maxLength - c.maxLenLive;
if (move != null) {
if (saved > minSaving) {
break;
}
}
moved += c.maxLenLive;
saved += save;
move = c;
}
if (saved < minSaving) {
return false;
}
// remove the chunks we want to keep from this list
boolean remove = false;
......@@ -1412,7 +1478,7 @@ public class MVStore {
long start = chunk.block * BLOCK_SIZE;
int length = chunk.len * BLOCK_SIZE;
ByteBuffer buff = fileStore.readFully(start, length);
Chunk.fromHeader(buff, start);
Chunk.readChunkHeader(buff, start);
int pagesRemaining = chunk.pageCount;
markMetaChanged();
while (pagesRemaining-- > 0) {
......@@ -1783,27 +1849,27 @@ public class MVStore {
boolean loadFromFile = false;
// get the largest chunk with a version
// higher than or equal to the requested version
int removeChunksNewerThan = -1;
for (int chunkId = lastChunkId;; chunkId--) {
Chunk x = chunks.get(chunkId);
if (x == null) {
Chunk removeChunksNewerThan = null;
Chunk c = lastChunk;
while (true) {
if (c == null || c.version < version) {
break;
} else if (x.version >= version) {
removeChunksNewerThan = x.id;
}
}
if (removeChunksNewerThan >= 0 && lastChunkId > removeChunksNewerThan) {
removeChunksNewerThan = c;
c = chunks.get(c.id - 1);
}
Chunk last = lastChunk;
if (removeChunksNewerThan != null && last.version > removeChunksNewerThan.version) {
revertTemp(version);
loadFromFile = true;
Chunk last = null;
while (true) {
last = chunks.get(lastChunkId);
last = lastChunk;
if (last == null) {
break;
} else if (last.id <= removeChunksNewerThan) {
} else if (last.version <= removeChunksNewerThan.version) {
break;
}
chunks.remove(lastChunkId);
chunks.remove(lastChunk.id);
long start = last.block * BLOCK_SIZE;
int length = last.len * BLOCK_SIZE;
fileStore.free(start, length);
......@@ -1815,12 +1881,10 @@ public class MVStore {
Arrays.fill(buff.getBuffer().array(), (byte) 0);
write(start, buff.getBuffer());
releaseWriteBuffer(buff);
lastChunkId--;
lastChunk = chunks.get(lastChunk.id - 1);
}
lastChunkBlock = last.block;
writeFileHeader();
readFileHeader();
readMeta();
}
for (MVMap<?, ?> m : New.arrayList(maps.values())) {
int id = m.getId();
......@@ -1835,9 +1899,11 @@ public class MVStore {
}
// rollback might have rolled back the stored chunk metadata as well
Chunk c = chunks.get(lastChunkId - 1);
if (c != null) {
meta.put(Chunk.getMetaKey(c.id), c.asString());
if (lastChunk != null) {
c = chunks.get(lastChunk.id - 1);
if (c != null) {
meta.put(Chunk.getMetaKey(c.id), c.asString());
}
}
currentVersion = version;
setWriteVersion(version);
......
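The "calculate and set the likely next position" block in the store code above deserves a note: it predicts where the next chunk will land by allocating a candidate range from the free-space map and immediately freeing it again, so the prediction has no side effect, and 0 encodes the common case "directly after this chunk". A sketch under an assumed FreeSpace interface (allocate/free mirror FileStore's free-space map; the names are placeholders):

// hypothetical stand-in for FileStore's free-space map
interface FreeSpace {
    long allocate(long length);
    void free(long start, long length);
}

class NextChunkPrediction {
    // predict the start block of the next chunk of roughly the same size;
    // 0 encodes "directly after the current chunk" (the common case)
    static long predictNextBlock(FreeSpace fs, int blockSize,
            long currentBlock, int currentLenBlocks) {
        long length = (long) currentLenBlocks * blockSize;
        long start = fs.allocate(length); // peek at the next free range...
        fs.free(start, length);           // ...and hand it straight back
        long next = start / blockSize;
        return next == currentBlock + currentLenBlocks ? 0 : next;
    }
}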
......@@ -85,7 +85,7 @@ public class MVStoreTool {
continue;
}
block.position(0);
Chunk c = Chunk.fromHeader(block, pos);
Chunk c = Chunk.readChunkHeader(block, pos);
int length = c.len * MVStore.BLOCK_SIZE;
pw.println(" " + c.toString());
ByteBuffer chunk = ByteBuffer.allocate(length);
......
......@@ -50,8 +50,7 @@ public class OffHeapStore extends FileStore {
freeSpace.free(pos, length);
ByteBuffer buff = memory.remove(pos);
if (buff == null) {
throw DataUtils.newIllegalStateException(DataUtils.ERROR_READING_FAILED,
"Could not find entry at position {0}", pos);
// nothing was written (just allocated)
} else if (buff.remaining() != length) {
throw DataUtils.newIllegalStateException(DataUtils.ERROR_READING_FAILED,
"Partial remove is not supported at position {0}", pos);
......@@ -105,6 +104,7 @@ public class OffHeapStore extends FileStore {
memory.clear();
return;
}
fileSize = size;
for (Iterator<Long> it = memory.keySet().iterator(); it.hasNext();) {
long pos = it.next();
if (pos < size) {
......
......@@ -187,7 +187,7 @@ public class Page {
int length = maxLength;
if (length < 0) {
throw DataUtils.newIllegalStateException(DataUtils.ERROR_FILE_CORRUPT,
"Illegal page length {0} reading at {1}; file size {1} ", length, filePos, fileSize);
"Illegal page length {0} reading at {1}; file size {2} ", length, filePos, fileSize);
}
buff = fileStore.readFully(filePos, length);
Page p = new Page(map, 0);
......
......@@ -182,7 +182,7 @@ public class MVTableEngine implements TableEngine {
if (s == null || s.isReadOnly()) {
return;
}
if (!store.compact(50)) {
if (!store.compact(50, 1024 * 1024)) {
store.commit();
}
}
......@@ -288,7 +288,7 @@ public class MVTableEngine implements TableEngine {
public void compactFile(long maxCompactTime) {
store.setRetentionTime(0);
long start = System.currentTimeMillis();
while (store.compact(99)) {
while (store.compact(99, 16 * 1024)) {
store.sync();
long time = System.currentTimeMillis() - start;
if (time > maxCompactTime) {
......@@ -311,7 +311,7 @@ public class MVTableEngine implements TableEngine {
if (!store.getFileStore().isReadOnly()) {
transactionStore.close();
long start = System.currentTimeMillis();
while (store.compact(90)) {
while (store.compact(90, 32 * 1024)) {
long time = System.currentTimeMillis() - start;
if (time > maxCompactTime) {
break;
......
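The MVTableEngine call sites above show the new compact(targetFillRate, minSaving) contract: the first argument is the fill-rate threshold below which compaction runs, the second the minimum number of bytes a pass must save. A usage sketch (the file name is arbitrary; the loop mirrors compactFile above, minus its time limit):

// usage sketch: compact until the store is at least 90% full,
// stopping once a pass would save less than 32 KB
MVStore store = MVStore.open("data.h3");
try {
    store.setRetentionTime(0); // allow old chunks to be collected right away
    while (store.compact(90, 32 * 1024)) {
        store.sync(); // persist each pass before the next one
    }
} finally {
    store.close();
}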
......@@ -69,8 +69,8 @@ public class TestBenchmark extends TestBase {
for (int i = 0; i < rowCount; i++) {
prep.setInt(1, i);
prep.setInt(2, i);
// prep.setInt(2, r.nextInt());
// prep.setInt(2, i);
prep.setInt(2, r.nextInt());
prep.execute();
if (i % 10000 == 0) {
conn.commit();
......
......@@ -109,7 +109,7 @@ public class TestKillProcessWhileWriting extends TestBase {
s.commit();
break;
case 7:
s.compact(80);
s.compact(80, 1024);
break;
case 8:
m.clear();
......
......@@ -192,7 +192,7 @@ public class TestMVStore extends TestBase {
map.put(i, "Hello " + i);
s.commit();
}
assertTrue(1000 < offHeap.getWriteCount());
assertTrue(offHeap.getWriteCount() > 1000);
s.close();
s = new MVStore.Builder().
......@@ -212,6 +212,7 @@ public class TestMVStore extends TestBase {
encryptionKey("007".toCharArray()).
fileName(fileName).
open();
s.setRetentionTime(Integer.MAX_VALUE);
Map<String, Object> header = s.getStoreHeader();
assertEquals("1", header.get("format").toString());
header.put("formatRead", "1");
......@@ -528,19 +529,20 @@ public class TestMVStore extends TestBase {
MVStore s;
MVMap<Integer, Integer> m;
s = openStore(fileName);
s.setRetentionTime(Integer.MAX_VALUE);
m = s.openMap("test");
m.put(1, 1);
Map<String, Object> header = s.getStoreHeader();
int format = Integer.parseInt(header.get("format").toString());
assertEquals(1, format);
header.put("format", Integer.toString(format + 1));
// ensure the file header is overwritten
s.commit();
m.put(1, 10);
s.commit();
m.put(1, 20);
s.setRetentionTime(0);
s.commit();
for (int i = 0; i < 10; i++) {
if (i > 5) {
s.setRetentionTime(0);
}
m.put(10, 100 * i);
s.commit();
}
s.close();
try {
openStore(fileName).close();
......@@ -662,6 +664,7 @@ public class TestMVStore extends TestBase {
private void testFileHeader() {
String fileName = getBaseDir() + "/testFileHeader.h3";
MVStore s = openStore(fileName);
s.setRetentionTime(Integer.MAX_VALUE);
long time = System.currentTimeMillis();
Map<String, Object> m = s.getStoreHeader();
assertEquals("1", m.get("format").toString());
......@@ -671,12 +674,13 @@ public class TestMVStore extends TestBase {
MVMap<Integer, Integer> map = s.openMap("test");
map.put(10, 100);
// ensure the file header is overwritten
s.commit();
map.put(10, 110);
s.commit();
map.put(1, 120);
s.setRetentionTime(0);
s.commit();
for (int i = 0; i < 10; i++) {
if (i > 5) {
s.setRetentionTime(0);
}
map.put(10, 110);
s.commit();
}
s.close();
s = openStore(fileName);
assertEquals("123", s.getStoreHeader().get("test").toString());
......@@ -701,7 +705,7 @@ public class TestMVStore extends TestBase {
map = s.openMap("test" + i);
s.removeMap(map);
s.commit();
s.compact(100);
s.compact(100, 1);
if (fs.getFile().size() <= size) {
break;
}
......@@ -1058,6 +1062,7 @@ public class TestMVStore extends TestBase {
FileUtils.delete(fileName);
MVStore s;
s = openStore(fileName);
s.setRetentionTime(Integer.MAX_VALUE);
MVMap<String, String> m;
m = s.openMap("data");
long first = s.getCurrentVersion();
......@@ -1121,16 +1126,24 @@ public class TestMVStore extends TestBase {
MVMap<Integer, String> m;
s = openStore(fileName);
m = s.openMap("data");
for (int i = 0; i < 1000; i++) {
m.put(i, "Hello World");
String data = new String(new char[10000]).replace((char) 0, 'x');
for (int i = 1; i < 10; i++) {
m.put(i, data);
s.commit();
}
s.close();
long len = FileUtils.size(fileName);
s = openStore(fileName);
s.setRetentionTime(0);
// remove 50%
m = s.openMap("data");
m.clear();
s.compact(100);
for (int i = 0; i < 10; i += 2) {
m.remove(i);
}
s.commit();
assertTrue(s.compact(100, 1));
assertTrue(s.compact(100, 1));
assertTrue(s.compact(100, 1));
s.close();
long len2 = FileUtils.size(fileName);
assertTrue("len2: " + len2 + " len: " + len, len2 < len);
......@@ -1160,7 +1173,7 @@ public class TestMVStore extends TestBase {
s.commit();
// ensure only nodes are read, but not leaves
assertEquals(40, s.getFileStore().getReadCount());
assertEquals(1, s.getFileStore().getWriteCount());
assertTrue(s.getFileStore().getWriteCount() < 5);
s.close();
}
......@@ -1209,6 +1222,7 @@ public class TestMVStore extends TestBase {
s.close();
s = openStore(fileName);
s.setRetentionTime(45000);
assertEquals(2, s.getCurrentVersion());
meta = s.getMetaMap();
m = s.openMap("data");
......@@ -1231,6 +1245,7 @@ public class TestMVStore extends TestBase {
s.close();
s = openStore(fileName);
s.setRetentionTime(45000);
assertEquals(2, s.getCurrentVersion());
meta = s.getMetaMap();
assertTrue(meta.get("name.data") != null);
......@@ -1248,12 +1263,14 @@ public class TestMVStore extends TestBase {
s.close();
s = openStore(fileName);
s.setRetentionTime(45000);
assertEquals(3, s.getCurrentVersion());
m = s.openMap("data");
m.put("1", "Hi");
s.close();
s = openStore(fileName);
s.setRetentionTime(45000);
m = s.openMap("data");
assertEquals("Hi", m.get("1"));
s.rollbackTo(v3);
......@@ -1261,6 +1278,7 @@ public class TestMVStore extends TestBase {
s.close();
s = openStore(fileName);
s.setRetentionTime(45000);
m = s.openMap("data");
assertEquals("Hallo", m.get("1"));
s.close();
......@@ -1310,6 +1328,7 @@ public class TestMVStore extends TestBase {
String fileName = getBaseDir() + "/testMeta.h3";
FileUtils.delete(fileName);
MVStore s = openStore(fileName);
s.setRetentionTime(Integer.MAX_VALUE);
MVMap<String, String> m = s.getMetaMap();
assertEquals("[]", s.getMapNames().toString());
MVMap<String, String> data = s.openMap("data");
......@@ -1480,9 +1499,8 @@ public class TestMVStore extends TestBase {
chunkCount1++;
}
}
assertTrue(s.compact(80));
assertTrue(s.compact(80));
s.compact(80, 1);
s.compact(80, 1);
int chunkCount2 = 0;
for (String k : meta.keySet()) {
......@@ -1493,8 +1511,8 @@ public class TestMVStore extends TestBase {
assertTrue(chunkCount2 >= chunkCount1);
m = s.openMap("data");
assertTrue(s.compact(80));
assertTrue(s.compact(80));
assertTrue(s.compact(80, 16 * 1024));
assertTrue(s.compact(80, 1024));
int chunkCount3 = 0;
for (String k : meta.keySet()) {
......@@ -1522,7 +1540,7 @@ public class TestMVStore extends TestBase {
for (int i = 0; i < 100; i++) {
m.put(j + i, "Hello " + j);
}
s.compact(80);
s.compact(80, 1024);
s.close();
long len = FileUtils.size(fileName);
// System.out.println(" len:" + len);
......@@ -1539,13 +1557,13 @@ public class TestMVStore extends TestBase {
for (int i = 0; i < 100; i++) {
m.remove(i);
}
s.compact(80);
s.compact(80, 1024);
s.close();
// len = FileUtils.size(fileName);
// System.out.println("len1: " + len);
s = openStore(fileName);
m = s.openMap("data");
s.compact(80);
s.compact(80, 1024);
s.close();
// len = FileUtils.size(fileName);
// System.out.println("len2: " + len);
......
......@@ -45,16 +45,17 @@ public class TestRandomMapOps extends TestBase {
testMap("memFS:randomOps.h3");
}
private void testMap(String fileName) {
private void testMap(String fileName) throws Exception {
this.fileName = fileName;
int best = Integer.MAX_VALUE;
int bestSeed = 0;
Throwable failException = null;
int size = getSize(100, 1000);
for (seed = 0; seed < 100; seed++) {
FileUtils.delete(fileName);
Throwable ex = null;
try {
testCase();
testOps(size);
continue;
} catch (Exception e) {
ex = e;
......@@ -65,6 +66,7 @@ public class TestRandomMapOps extends TestBase {
trace(seed);
bestSeed = seed;
best = op;
size = best;
failException = ex;
// System.out.println("seed:" + seed + " op:" + op);
}
......@@ -75,7 +77,7 @@ public class TestRandomMapOps extends TestBase {
}
}
private void testCase() throws Exception {
private void testOps(int size) throws Exception {
FileUtils.delete(fileName);
MVStore s;
s = openStore(fileName);
......@@ -87,7 +89,6 @@ public class TestRandomMapOps extends TestBase {
}
Random r = new Random(seed);
op = 0;
int size = getSize(100, 1000);
TreeMap<Integer, byte[]> map = new TreeMap<Integer, byte[]>();
for (; op < size; op++) {
int k = r.nextInt(100);
......@@ -109,8 +110,8 @@ public class TestRandomMapOps extends TestBase {
map.remove(k);
break;
case 6:
log(op, k, v, "s.compact(90)");
s.compact(90);
log(op, k, v, "s.compact(90, 1024)");
s.compact(90, 1024);
break;
case 7:
log(op, k, v, "m.clear()");
......