提交 670d256c authored 作者: Thomas Mueller's avatar Thomas Mueller

A persistent multi-version map: compact based on estimated size, not page count

上级 3a92ea32
......@@ -577,7 +577,7 @@ public class TestMVStore extends TestBase {
m.put(j + i, "Hello " + j);
}
s.store();
s.compact(80);
// s.compact(80);
s.close();
long len = FileUtils.size(fileName);
// System.out.println(" len:" + len);
......
......@@ -6,6 +6,7 @@
*/
package org.h2.dev.store.btree;
import java.nio.ByteBuffer;
import java.util.HashMap;
/**
......@@ -19,6 +20,7 @@ import java.util.HashMap;
* 4 bytes: length
* 4 bytes: chunk id (an incrementing number)
* 8 bytes: metaRootPos
* 8 bytes: maxLengthLive
* [ Page ] *
*/
public class Chunk {
......@@ -34,19 +36,19 @@ public class Chunk {
long start;
/**
* The length in bytes (may be larger than the actual value).
* The length in bytes.
*/
long length;
int length;
/**
* The entry count.
* The sum of the max length of all pages.
*/
int entryCount;
long maxLength;
/**
* The number of life (non-garbage) objects.
* The sum of the max length of all pages that are in use.
*/
int liveCount;
long maxLengthLive;
/**
* The garbage collection priority.
......@@ -67,6 +69,30 @@ public class Chunk {
this.id = id;
}
public static Chunk fromHeader(ByteBuffer buff, long start) {
if (buff.get() != 'c') {
throw new RuntimeException("File corrupt");
}
int length = buff.getInt();
int chunkId = buff.getInt();
long metaRootPos = buff.getLong();
long maxLengthLive = buff.getLong();
Chunk c = new Chunk(chunkId);
c.length = length;
c.start = start;
c.metaRootPos = metaRootPos;
c.maxLengthLive = maxLengthLive;
return c;
}
void writeHeader(ByteBuffer buff) {
buff.put((byte) 'c');
buff.putInt(length);
buff.putInt(id);
buff.putLong(metaRootPos);
buff.putLong(maxLengthLive);
}
/**
* Build a block from the given string.
*
......@@ -78,16 +104,16 @@ public class Chunk {
int id = Integer.parseInt(map.get("id"));
Chunk c = new Chunk(id);
c.start = Long.parseLong(map.get("start"));
c.length = Long.parseLong(map.get("length"));
c.entryCount = Integer.parseInt(map.get("entryCount"));
c.liveCount = Integer.parseInt(map.get("liveCount"));
c.length = Integer.parseInt(map.get("length"));
c.maxLength = Long.parseLong(map.get("maxLength"));
c.maxLengthLive = Long.parseLong(map.get("maxLengthLive"));
c.metaRootPos = Long.parseLong(map.get("metaRoot"));
c.version = Long.parseLong(map.get("version"));
return c;
}
public int getFillRate() {
return entryCount == 0 ? 0 : 100 * liveCount / entryCount;
return (int) (maxLength == 0 ? 0 : 100 * maxLengthLive / maxLength);
}
public int hashCode() {
......@@ -103,8 +129,8 @@ public class Chunk {
"id:" + id + "," +
"start:" + start + "," +
"length:" + length + "," +
"entryCount:" + entryCount + "," +
"liveCount:" + liveCount + "," +
"maxLength:" + maxLength + "," +
"maxLengthLive:" + maxLengthLive + "," +
"metaRoot:" + metaRootPos + "," +
"version:" + version;
}
......
......@@ -80,8 +80,12 @@ public class Dump {
int chunkLength = block.getInt();
int chunkId = block.getInt();
long metaRootPos = block.getLong();
writer.println(" chunk " + chunkId + " at " + pos +
" length " + chunkLength + " root " + metaRootPos);
long maxLengthLive = block.getLong();
writer.println(" chunk " + chunkId +
" at " + pos +
" length " + chunkLength +
" root " + metaRootPos +
" maxLengthLive " + maxLengthLive);
ByteBuffer chunk = ByteBuffer.allocate(chunkLength);
file.position(pos);
FileUtils.readFully(file, chunk);
......
......@@ -35,10 +35,7 @@ header:
H:3,blockSize=4096,...
TODO:
- compact: use total max length instead of page count (liveCount)
- support background writes (store old version)
- support database version / app version
- limited support for writing to old versions (branches)
- atomic test-and-set (when supporting concurrent writes)
- possibly split chunk data into immutable and mutable
- support stores that span multiple files (chunks stored in other files)
......@@ -56,13 +53,17 @@ TODO:
- recovery: ensure data is not overwritten for 1 minute
- pluggable caching (specially for in-memory file systems)
- file locking
- allocate memory Utils.newBytes
- allocate memory with Utils.newBytes and so on
- unified exception handling
- check if locale specific string comparison can make data disappear
- concurrent map; avoid locking during IO (pre-load pages)
- maybe split database into multiple files, to speed up compact operation
- automated 'kill process' and 'power failure' test
- implement table engine for H2
- auto-compact from time to time and on close
- test and possibly improve compact operation (for large dbs)
- support background writes (concurrent modification & store)
- limited support for writing to old versions (branches)
*/
......@@ -384,6 +385,7 @@ public class MVStore {
c.start = header.start;
c.length = header.length;
c.metaRootPos = header.metaRootPos;
c.maxLengthLive = header.maxLengthLive;
}
lastChunkId = Math.max(c.id, lastChunkId);
chunks.put(c.id, c);
......@@ -499,28 +501,26 @@ public class MVStore {
return currentVersion;
}
// the last chunk might have been changed in the last save()
// this needs to be updated now (it's better not to update right after,
// save(), because that would modify the meta map again)
// the last chunk was not completely correct in the last save()
// this needs to be updated now (it's better not to update right after
// storing, because that would modify the meta map again)
Chunk c = chunks.get(lastChunkId);
if (c != null) {
meta.put("chunk." + c.id, c.toString());
}
int chunkId = ++lastChunkId;
c = new Chunk(chunkId);
c.entryCount = Integer.MAX_VALUE;
c.liveCount = Integer.MAX_VALUE;
c = new Chunk(++lastChunkId);
c.maxLength = Long.MAX_VALUE;
c.maxLengthLive = Long.MAX_VALUE;
c.start = Long.MAX_VALUE;
c.length = Long.MAX_VALUE;
c.length = Integer.MAX_VALUE;
c.version = currentVersion;
chunks.put(c.id, c);
meta.put("chunk." + c.id, c.toString());
applyFreedChunks();
ArrayList<Integer> removedChunks = New.arrayList();
for (Chunk x : chunks.values()) {
if (x.liveCount == 0 && (retainChunk == -1 || x.id < retainChunk)) {
if (x.maxLengthLive == 0 && (retainChunk == -1 || x.id < retainChunk)) {
meta.remove("chunk." + x.id);
removedChunks.add(x.id);
} else {
......@@ -528,7 +528,6 @@ public class MVStore {
}
applyFreedChunks();
}
int count = 0;
int maxLength = 1 + 4 + 4 + 8;
for (MVMap<?, ?> m : mapsChanged.values()) {
if (m == meta || !m.hasUnsavedChanges()) {
......@@ -539,36 +538,32 @@ public class MVStore {
meta.put("root." + m.getId(), "0");
} else {
maxLength += p.getMaxLengthTempRecursive();
count += p.countTempRecursive();
meta.put("root." + m.getId(), String.valueOf(Long.MAX_VALUE));
}
}
maxLength += meta.getRoot().getMaxLengthTempRecursive();
count += meta.getRoot().countTempRecursive();
ByteBuffer buff = ByteBuffer.allocate(maxLength);
// need to patch the header later
buff.put((byte) 'c');
buff.putInt(0);
buff.putInt(0);
buff.putLong(0);
c.writeHeader(buff);
c.maxLength = 0;
c.maxLengthLive = 0;
for (MVMap<?, ?> m : mapsChanged.values()) {
if (m == meta || !m.hasUnsavedChanges()) {
continue;
}
Page p = m.getRoot();
if (p.getTotalCount() > 0) {
long root = p.writeTempRecursive(buff, chunkId);
long root = p.writeTempRecursive(c, buff);
meta.put("root." + m.getId(), "" + root);
}
}
// fix metadata
c.entryCount = count;
c.liveCount = count;
meta.put("chunk." + c.id, c.toString());
meta.getRoot().writeTempRecursive(buff, chunkId);
// this will modify maxLengthLive, but
// the correct value is written in the chunk header
meta.getRoot().writeTempRecursive(c, buff);
buff.flip();
int length = buff.limit();
......@@ -576,17 +571,18 @@ public class MVStore {
// need to keep old chunks
// until they are are no longer referenced
// by a old version
// by an old version
// so empty space is not reused too early
for (int x : removedChunks) {
chunks.remove(x);
}
buff.rewind();
buff.put((byte) 'c');
buff.putInt(length);
buff.putInt(chunkId);
buff.putLong(meta.getRoot().getPos());
c.start = filePos;
c.length = length;
c.metaRootPos = meta.getRoot().getPos();
c.writeHeader(buff);
buff.rewind();
try {
writeCount++;
......@@ -597,9 +593,6 @@ public class MVStore {
}
rootChunkStart = filePos;
revertTemp();
// update the start position and length
c.start = filePos;
c.length = length;
long version = incrementVersion();
// write the new version (after the commit)
......@@ -609,11 +602,13 @@ public class MVStore {
}
private void applyFreedChunks() {
// apply liveCount changes
for (HashMap<Integer, Chunk> freed : freedChunks.values()) {
for (Chunk f : freed.values()) {
Chunk c = chunks.get(f.id);
c.liveCount += f.liveCount;
c.maxLengthLive += f.maxLengthLive;
if (c.maxLengthLive < 0) {
throw new RuntimeException("Corrupt max length");
}
}
}
freedChunks.clear();
......@@ -714,17 +709,7 @@ public class MVStore {
ByteBuffer buff = ByteBuffer.wrap(new byte[32]);
DataUtils.readFully(file, buff);
buff.rewind();
if (buff.get() != 'c') {
throw new RuntimeException("File corrupt");
}
int length = buff.getInt();
int chunkId = buff.getInt();
long metaRootPos = buff.getLong();
Chunk c = new Chunk(chunkId);
c.start = start;
c.length = length;
c.metaRootPos = metaRootPos;
return c;
return Chunk.fromHeader(buff, start);
} catch (IOException e) {
throw new RuntimeException(e);
}
......@@ -743,23 +728,22 @@ public class MVStore {
// avoid division by 0
return false;
}
long liveCountTotal = 0, entryCountTotal = 0;
long maxLengthSum = 0, maxLengthLiveSum = 0;
for (Chunk c : chunks.values()) {
entryCountTotal += c.entryCount;
liveCountTotal += c.liveCount;
maxLengthSum += c.maxLength;
maxLengthLiveSum += c.maxLengthLive;
}
if (entryCountTotal <= 0) {
if (maxLengthSum <= 0) {
// avoid division by 0
entryCountTotal = 1;
maxLengthSum = 1;
}
int percentTotal = (int) (100 * liveCountTotal / entryCountTotal);
int percentTotal = (int) (100 * maxLengthLiveSum / maxLengthSum);
if (percentTotal > fillRate) {
return false;
}
// calculate how many entries a chunk has on average
// TODO use the max size instead of the count
int averageEntryCount = (int) (entryCountTotal / chunks.size());
// calculate the average max length
int averageMaxLength = (int) (maxLengthSum / chunks.size());
// the 'old' list contains the chunks we want to free up
ArrayList<Chunk> old = New.arrayList();
......@@ -781,16 +765,16 @@ public class MVStore {
}
});
// find out up to were we need to move
// find out up to were in the old list we need to move
// try to move one (average sized) chunk
int moveCount = 0;
long moved = 0;
Chunk move = null;
for (Chunk c : old) {
if (moveCount + c.liveCount > averageEntryCount) {
if (move != null && moved + c.maxLengthLive > averageMaxLength) {
break;
}
log(" chunk " + c.id + " " + c.getFillRate() + "% full; prio=" + c.collectPriority);
moveCount += c.liveCount;
moved += c.maxLengthLive;
move = c;
}
......@@ -804,77 +788,71 @@ public class MVStore {
it.remove();
}
}
while (!isKnownVersion(move.version)) {
int id = move.id;
while (true) {
Chunk m = chunks.get(++id);
if (id > lastChunkId) {
// no known version
return false;
}
if (m != null) {
move = m;
break;
}
}
// iterate over all the pages in the old pages
for (Chunk c : old) {
copyLive(c, old);
}
// the metaRootPos might not be set
move = readChunkHeader(move.start);
log(" meta:" + move.id + "/" + move.metaRootPos + " start: " + move.start);
store();
return true;
}
// change at least one entry in the map
// to ensure a chunk will be written
// (even if there is nothing to move)
meta.put("chunk." + move.id, move.toString());
MVMap<String, String> oldMeta = new MVMap<String, String>(this, 0, "old-meta", STRING_TYPE, STRING_TYPE, 0);
oldMeta.setRootPos(move.metaRootPos);
Iterator<String> it = oldMeta.keyIterator("map.");
while (it.hasNext()) {
String k = it.next();
if (!k.startsWith("map.")) {
break;
}
String s = oldMeta.get(k);
k = k.substring("map.".length());
private void copyLive(Chunk chunk, ArrayList<Chunk> old) {
ByteBuffer buff = ByteBuffer.allocate(chunk.length);
try {
file.position(chunk.start);
DataUtils.readFully(file, buff);
} catch (IOException e) {
throw new RuntimeException(e);
}
Chunk.fromHeader(buff, chunk.start);
int chunkLength = chunk.length;
while (buff.position() < chunkLength) {
int start = buff.position();
int pageLength = buff.getInt();
buff.getShort();
int mapId = DataUtils.readVarInt(buff);
@SuppressWarnings("unchecked")
MVMap<Object, Object> data = (MVMap<Object, Object>) maps.get(k);
if (data == null) {
MVMap<Object, Object> map = (MVMap<Object, Object>) getMap(mapId);
if (map == null) {
buff.position(start + pageLength);
continue;
}
log(" " + k + " " + s.replace('\n', ' '));
String[] idTypeList = StringUtils.arraySplit(s, '/', false);
int id = Integer.parseInt(idTypeList[0]);
DataType kt = buildDataType(idTypeList[3]);
DataType vt = buildDataType(idTypeList[4]);
long oldDataRoot = Long.parseLong(oldMeta.get("root." + id));
if (oldDataRoot != 0) {
MVMap<?, ?> oldData = new MVMap<Object, Object>(this, id, "old-" + k, kt, vt, 0);
oldData.setRootPos(oldDataRoot);
Iterator<?> dataIt = oldData.keyIterator(null);
while (dataIt.hasNext()) {
Object o = dataIt.next();
Page p = data.getPage(o);
if (p == null) {
// was removed later - ignore
// or the chunk no longer exists
} else if (p.getPos() < 0) {
// temporarily changed - ok
// TODO move old data if there is an uncommitted change?
} else {
Chunk c = getChunk(p.getPos());
if (old.contains(c)) {
log(" move key:" + o + " chunk:" + c.id);
Object value = data.get(o);
data.remove(o);
data.put(o, value);
}
buff.position(start);
Page page = new Page(map, 0);
page.read(buff, chunk.id, buff.position(), chunk.length);
for (int i = 0; i < page.getKeyCount(); i++) {
Object k = page.getKey(i);
Page p = map.getPage(k);
if (p == null) {
// was removed later - ignore
// or the chunk no longer exists
} else if (p.getPos() < 0) {
// temporarily changed - ok
// TODO move old data if there is an uncommitted change?
} else {
Chunk c = getChunk(p.getPos());
if (old.contains(c)) {
log(" move key:" + k + " chunk:" + c.id);
Object value = map.remove(k);
map.put(k, value);
}
}
}
}
store();
return true;
}
private MVMap<?, ?> getMap(int mapId) {
if (mapId == 0) {
return meta;
}
for (MVMap<?, ?> m : maps.values()) {
if (m.getId() == mapId) {
return m;
}
}
return null;
}
/**
......@@ -909,9 +887,6 @@ public class MVStore {
// but we don't optimize for rollback
cache.remove(pos);
Chunk c = getChunk(pos);
if (c.liveCount == 0) {
throw new RuntimeException("Negative live count: " + pos);
}
HashMap<Integer, Chunk>freed = freedChunks.get(currentVersion);
if (freed == null) {
freed = New.hashMap();
......@@ -922,7 +897,7 @@ public class MVStore {
f = new Chunk(c.id);
freed.put(c.id, f);
}
f.liveCount--;
f.maxLengthLive -= DataUtils.getPageMaxLength(pos);
}
}
......@@ -993,6 +968,14 @@ public class MVStore {
return retainVersion;
}
/**
* Check whether all data can be read from this version. This requires that
* all chunks referenced by this version are still available (not
* overwritten).
*
* @param version the version
* @return true if all data can be read
*/
private boolean isKnownVersion(long version) {
if (version > currentVersion || version < 0) {
return false;
......@@ -1006,7 +989,7 @@ public class MVStore {
if (c == null) {
return false;
}
// also, all check referenced by this version
// also, all chunks referenced by this version
// need to be available in the file
MVMap<String, String> oldMeta = getMetaMap(version);
if (oldMeta == null) {
......
......@@ -57,7 +57,7 @@ public class Page {
private Page[] childrenPages;
private long[] counts;
private Page(MVMap<?, ?> map, long version) {
Page(MVMap<?, ?> map, long version) {
this.map = map;
this.version = version;
}
......@@ -596,7 +596,7 @@ public class Page {
}
}
private void read(ByteBuffer buff, int chunkId, int offset, int maxLength) {
void read(ByteBuffer buff, int chunkId, int offset, int maxLength) {
int start = buff.position();
int pageLength = buff.getInt();
if (pageLength > maxLength) {
......@@ -661,10 +661,10 @@ public class Page {
/**
* Store the page and update the position.
*
* @param chunk the chunk
* @param buff the target buffer
* @param chunkId the chunk id
*/
private void write(ByteBuffer buff, int chunkId) {
private void write(Chunk chunk, ByteBuffer buff) {
int start = buff.position();
buff.putInt(0);
buff.putShort((byte) 0);
......@@ -707,11 +707,15 @@ public class Page {
}
int pageLength = buff.position() - start;
buff.putInt(start, pageLength);
int chunkId = chunk.id;
int check = DataUtils.getCheckValue(chunkId)
^ DataUtils.getCheckValue(start)
^ DataUtils.getCheckValue(pageLength);
buff.putShort(start + 4, (short) check);
this.pos = DataUtils.getPagePos(chunkId, start, pageLength, type);
long max = DataUtils.getPageMaxLength(pos);
chunk.maxLength += max;
chunk.maxLengthLive += max;
}
/**
......@@ -748,42 +752,25 @@ public class Page {
* Store this page and all children that are changed, in reverse order, and
* update the position and the children.
*
* @param chunk the chunk
* @param buff the target buffer
* @param chunkId the chunk id
* @return the page id
*/
long writeTempRecursive(ByteBuffer buff, int chunkId) {
long writeTempRecursive(Chunk chunk, ByteBuffer buff) {
if (!isLeaf()) {
int len = children.length;
for (int i = 0; i < len; i++) {
Page p = childrenPages[i];
if (p != null) {
children[i] = p.writeTempRecursive(buff, chunkId);
children[i] = p.writeTempRecursive(chunk, buff);
childrenPages[i] = null;
}
}
}
write(buff, chunkId);
write(chunk, buff);
return pos;
}
/**
* Count the temporary pages recursively.
*
* @return the number of temporary pages
*/
int countTempRecursive() {
int count = 1;
if (!isLeaf()) {
for (Page p : childrenPages) {
if (p != null) {
count += p.countTempRecursive();
}
}
}
return count;
}
long getVersion() {
return version;
}
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论