提交 16b41b06 authored 作者: Thomas Mueller's avatar Thomas Mueller

MVStore: use a mark & sweep GC algorithm instead of reference counting, to…

MVStore: use a mark & sweep GC algorithm instead of reference counting, to ensure used chunks are never overwrite, even if the reference counting algorithm does not work properly.
上级 75e7ff82
...@@ -17,7 +17,10 @@ Change Log ...@@ -17,7 +17,10 @@ Change Log
<h1>Change Log</h1> <h1>Change Log</h1>
<h2>Next Version (unreleased)</h2> <h2>Next Version (unreleased)</h2>
<ul><li>In the multi-threaded mode, updating the column selectivity ("analyze") <ul><li>MVStore: use a mark & sweep GC algorithm instead of reference counting,
to ensure used chunks are never overwrite, even if the reference counting
algorithm does not work properly.
</li><li>In the multi-threaded mode, updating the column selectivity ("analyze")
in the background sometimes did not work. in the background sometimes did not work.
</li><li>In the multi-threaded mode, database metadata operations </li><li>In the multi-threaded mode, database metadata operations
did sometimes not work if the schema was changed at the same time did sometimes not work if the schema was changed at the same time
......
...@@ -123,6 +123,23 @@ public class MVMap<K, V> extends AbstractMap<K, V> ...@@ -123,6 +123,23 @@ public class MVMap<K, V> extends AbstractMap<K, V>
return (V) result; return (V) result;
} }
/**
* Add or replace a key-value pair in a branch.
*
* @param root the root page
* @param key the key (may not be null)
* @param value the value (may not be null)
* @return the new root page
*/
synchronized Page putBranch(Page root, K key, V value) {
DataUtils.checkArgument(value != null, "The value may not be null");
long v = writeVersion;
Page p = root.copy(v);
p = splitRootIfNeeded(p, v);
put(p, v, key, value);
return p;
}
/** /**
* Split the root page if necessary. * Split the root page if necessary.
* *
......
...@@ -24,6 +24,7 @@ import org.h2.compress.CompressLZF; ...@@ -24,6 +24,7 @@ import org.h2.compress.CompressLZF;
import org.h2.compress.Compressor; import org.h2.compress.Compressor;
import org.h2.mvstore.cache.CacheLongKeyLIRS; import org.h2.mvstore.cache.CacheLongKeyLIRS;
import org.h2.mvstore.type.StringDataType; import org.h2.mvstore.type.StringDataType;
import org.h2.mvstore.Page.PageChildren;
import org.h2.util.MathUtils; import org.h2.util.MathUtils;
import org.h2.util.New; import org.h2.util.New;
...@@ -166,6 +167,13 @@ public class MVStore { ...@@ -166,6 +167,13 @@ public class MVStore {
*/ */
private CacheLongKeyLIRS<Page> cache; private CacheLongKeyLIRS<Page> cache;
/**
* The page chunk references cache. The default size is 4 MB, and the average size is 2 KB.
* It is split in 16 segments. The stack move distance is 2% of the expected
* number of entries.
*/
private CacheLongKeyLIRS<PageChildren> cacheChunkRef;
/** /**
* The newest chunk. If nothing was stored yet, this field is not set. * The newest chunk. If nothing was stored yet, this field is not set.
*/ */
...@@ -292,6 +300,7 @@ public class MVStore { ...@@ -292,6 +300,7 @@ public class MVStore {
fileStore = (FileStore) config.get("fileStore"); fileStore = (FileStore) config.get("fileStore");
if (fileName == null && fileStore == null) { if (fileName == null && fileStore == null) {
cache = null; cache = null;
cacheChunkRef = null;
return; return;
} }
if (fileStore == null) { if (fileStore == null) {
...@@ -309,7 +318,11 @@ public class MVStore { ...@@ -309,7 +318,11 @@ public class MVStore {
int averageMemory = Math.max(10, pageSplitSize / 2); int averageMemory = Math.max(10, pageSplitSize / 2);
int segmentCount = 16; int segmentCount = 16;
int stackMoveDistance = maxMemoryBytes / averageMemory * 2 / 100; int stackMoveDistance = maxMemoryBytes / averageMemory * 2 / 100;
cache = new CacheLongKeyLIRS<Page>(maxMemoryBytes, averageMemory, cache = new CacheLongKeyLIRS<Page>(
maxMemoryBytes, averageMemory,
segmentCount, stackMoveDistance);
cacheChunkRef = new CacheLongKeyLIRS<PageChildren>(
maxMemoryBytes / 4, 20,
segmentCount, stackMoveDistance); segmentCount, stackMoveDistance);
} }
o = config.get("autoCommitBufferSize"); o = config.get("autoCommitBufferSize");
...@@ -318,7 +331,7 @@ public class MVStore { ...@@ -318,7 +331,7 @@ public class MVStore {
autoCommitMemory = kb * 1024 * 19; autoCommitMemory = kb * 1024 * 19;
o = config.get("autoCompactFillRate"); o = config.get("autoCompactFillRate");
autoCompactFillRate = o == null ? 80 : (Integer) o; autoCompactFillRate = o == null ? 50 : (Integer) o;
char[] encryptionKey = (char[]) config.get("encryptionKey"); char[] encryptionKey = (char[]) config.get("encryptionKey");
try { try {
...@@ -813,6 +826,7 @@ public class MVStore { ...@@ -813,6 +826,7 @@ public class MVStore {
// release memory early - this is important when called // release memory early - this is important when called
// because of out of memory // because of out of memory
cache = null; cache = null;
cacheChunkRef = null;
for (MVMap<?, ?> m : New.arrayList(maps.values())) { for (MVMap<?, ?> m : New.arrayList(maps.values())) {
m.close(); m.close();
} }
...@@ -956,6 +970,7 @@ public class MVStore { ...@@ -956,6 +970,7 @@ public class MVStore {
} }
private long storeNow() { private long storeNow() {
freeUnusedChunks();
int currentUnsavedPageCount = unsavedMemory; int currentUnsavedPageCount = unsavedMemory;
long storeVersion = currentStoreVersion; long storeVersion = currentStoreVersion;
long version = ++currentVersion; long version = ++currentVersion;
...@@ -1016,16 +1031,7 @@ public class MVStore { ...@@ -1016,16 +1031,7 @@ public class MVStore {
} }
} }
} }
for (MVMap<?, ?> m : changed) { applyFreedSpace(storeVersion);
Page p = m.getRoot();
String key = MVMap.getMapRootKey(m.getId());
if (p.getTotalCount() == 0) {
meta.put(key, "0");
} else {
meta.put(key, Long.toHexString(Long.MAX_VALUE));
}
}
Set<Chunk> removedChunks = applyFreedSpace(storeVersion, time);
WriteBuffer buff = getWriteBuffer(); WriteBuffer buff = getWriteBuffer();
// need to patch the header later // need to patch the header later
c.writeChunkHeader(buff, 0); c.writeChunkHeader(buff, 0);
...@@ -1036,10 +1042,12 @@ public class MVStore { ...@@ -1036,10 +1042,12 @@ public class MVStore {
c.maxLenLive = 0; c.maxLenLive = 0;
for (MVMap<?, ?> m : changed) { for (MVMap<?, ?> m : changed) {
Page p = m.getRoot(); Page p = m.getRoot();
if (p.getTotalCount() > 0) { String key = MVMap.getMapRootKey(m.getId());
if (p.getTotalCount() == 0) {
meta.put(key, "0");
} else {
p.writeUnsavedRecursive(c, buff); p.writeUnsavedRecursive(c, buff);
long root = p.getPos(); long root = p.getPos();
String key = MVMap.getMapRootKey(m.getId());
meta.put(key, Long.toHexString(root)); meta.put(key, Long.toHexString(root));
} }
} }
...@@ -1067,15 +1075,6 @@ public class MVStore { ...@@ -1067,15 +1075,6 @@ public class MVStore {
// end is not necessarily the end of the file // end is not necessarily the end of the file
boolean storeAtEndOfFile = filePos + length >= fileStore.size(); boolean storeAtEndOfFile = filePos + length >= fileStore.size();
// free up the space of unused chunks now
// (after allocating space for this chunk, so that
// no data is lost if writing this chunk fails)
for (Chunk x : removedChunks) {
long start = x.block * BLOCK_SIZE;
int len = x.len * BLOCK_SIZE;
fileStore.free(start, len);
}
if (!reuseSpace) { if (!reuseSpace) {
// we can not mark it earlier, because it // we can not mark it earlier, because it
// might have been allocated by one of the // might have been allocated by one of the
...@@ -1173,6 +1172,106 @@ public class MVStore { ...@@ -1173,6 +1172,106 @@ public class MVStore {
return version; return version;
} }
private void freeUnusedChunks() {
if (lastChunk == null) {
return;
}
Set<Integer> referenced = collectReferencedChunks();
ArrayList<Chunk> free = New.arrayList();
long time = getTime();
for (Chunk c : chunks.values()) {
if (!referenced.contains(c.id)) {
free.add(c);
}
}
for (Chunk c : free) {
if (canOverwriteChunk(c, time)) {
chunks.remove(c.id);
markMetaChanged();
meta.remove(Chunk.getMetaKey(c.id));
long start = c.block * BLOCK_SIZE;
int length = c.len * BLOCK_SIZE;
fileStore.free(start, length);
} else {
if (c.unused == 0) {
c.unused = time;
meta.put(Chunk.getMetaKey(c.id), c.asString());
markMetaChanged();
}
}
}
}
private Set<Integer> collectReferencedChunks() {
long testVersion = lastChunk.version;
DataUtils.checkArgument(testVersion > 0, "Collect references on version 0");
long readCount = getFileStore().readCount;
Set<Integer> referenced = New.hashSet();
for (Cursor<String, String> c = meta.cursor("root."); c.hasNext();) {
String key = c.next();
if (!key.startsWith("root.")) {
break;
}
long pos = DataUtils.parseHexLong(c.getValue());
if (pos == 0) {
continue;
}
int mapId = DataUtils.parseHexInt(key.substring("root.".length()));
collectReferencedChunks(referenced, mapId, pos);
}
long pos = lastChunk.metaRootPos;
collectReferencedChunks(referenced, 0, pos);
readCount = fileStore.readCount - readCount;
return referenced;
}
private int collectReferencedChunks(Set<Integer> targetChunkSet, int mapId, long pos) {
targetChunkSet.add(DataUtils.getPageChunkId(pos));
if (DataUtils.getPageType(pos) == DataUtils.PAGE_TYPE_LEAF) {
return 1;
}
PageChildren refs = readPageChunkReferences(mapId, pos, -1);
int count = 0;
if (refs != null) {
for (long p : refs.children) {
count += collectReferencedChunks(targetChunkSet, mapId, p);
}
}
return count;
}
PageChildren readPageChunkReferences(int mapId, long pos, int parentChunk) {
if (DataUtils.getPageType(pos) == DataUtils.PAGE_TYPE_LEAF) {
return null;
}
PageChildren r = cacheChunkRef.get(pos);
if (r == null) {
Page p = cache.get(pos);
if (p == null) {
Chunk c = getChunk(pos);
long filePos = c.block * BLOCK_SIZE;
filePos += DataUtils.getPageOffset(pos);
if (filePos < 0) {
throw DataUtils.newIllegalStateException(
DataUtils.ERROR_FILE_CORRUPT,
"Negative position {0}", filePos);
}
r = PageChildren.read(fileStore, filePos, mapId, pos);
} else {
r = new PageChildren(p);
}
r.removeDuplicateChunkReferences();
cacheChunkRef.put(pos, r);
}
if (r.children.length == 0) {
int chunk = DataUtils.getPageChunkId(pos);
if (chunk == parentChunk) {
return null;
}
}
return r;
}
/** /**
* Get a buffer for writing. This caller must synchronize on the store * Get a buffer for writing. This caller must synchronize on the store
* before calling the method and until after using the buffer. * before calling the method and until after using the buffer.
...@@ -1206,7 +1305,7 @@ public class MVStore { ...@@ -1206,7 +1305,7 @@ public class MVStore {
if (c.time + retentionTime > time) { if (c.time + retentionTime > time) {
return false; return false;
} }
if (c.unused != 0 && c.unused + retentionTime / 2 > time) { if (c.unused == 0 || c.unused + retentionTime / 2 > time) {
return false; return false;
} }
Chunk r = retainChunk; Chunk r = retainChunk;
...@@ -1222,12 +1321,12 @@ public class MVStore { ...@@ -1222,12 +1321,12 @@ public class MVStore {
/** /**
* Apply the freed space to the chunk metadata. The metadata is updated, but * Apply the freed space to the chunk metadata. The metadata is updated, but
* freed chunks are not removed yet. * completely free chunks are not removed from the set of chunks, and the
* disk space is not yet marked as free.
* *
* @param storeVersion apply up to the given version * @param storeVersion apply up to the given version
* @return the set of completely freed chunks (might be empty)
*/ */
private Set<Chunk> applyFreedSpace(long storeVersion, long time) { private Set<Chunk> applyFreedSpace(long storeVersion) {
Set<Chunk> removedChunks = New.hashSet(); Set<Chunk> removedChunks = New.hashSet();
while (true) { while (true) {
ArrayList<Chunk> modified = New.arrayList(); ArrayList<Chunk> modified = New.arrayList();
...@@ -1271,22 +1370,7 @@ public class MVStore { ...@@ -1271,22 +1370,7 @@ public class MVStore {
it.remove(); it.remove();
} }
for (Chunk c : modified) { for (Chunk c : modified) {
if (c.maxLenLive <= 0) {
if (c.unused == 0) {
c.unused = time;
}
if (canOverwriteChunk(c, time)) {
removedChunks.add(c);
chunks.remove(c.id);
meta.remove(Chunk.getMetaKey(c.id));
} else {
meta.put(Chunk.getMetaKey(c.id), c.asString()); meta.put(Chunk.getMetaKey(c.id), c.asString());
// possibly remove this chunk in the next save operation
registerFreePage(storeVersion + 1, c.id, 0, 0);
}
} else {
meta.put(Chunk.getMetaKey(c.id), c.asString());
}
} }
if (modified.size() == 0) { if (modified.size() == 0) {
break; break;
...@@ -1422,13 +1506,15 @@ public class MVStore { ...@@ -1422,13 +1506,15 @@ public class MVStore {
boolean oldReuse = reuseSpace; boolean oldReuse = reuseSpace;
try { try {
retentionTime = 0; retentionTime = 0;
compactFreeUnusedChunks(); freeUnusedChunks();
if (fileStore.getFillRate() > targetFillRate) { if (fileStore.getFillRate() > targetFillRate) {
return false; return false;
} }
long start = fileStore.getFirstFree() / BLOCK_SIZE; long start = fileStore.getFirstFree() / BLOCK_SIZE;
ArrayList<Chunk> move = compactGetMoveBlocks(start, moveSize); ArrayList<Chunk> move = compactGetMoveBlocks(start, moveSize);
compactMoveChunks(move); compactMoveChunks(move);
freeUnusedChunks();
storeNow();
} finally { } finally {
reuseSpace = oldReuse; reuseSpace = oldReuse;
retentionTime = oldRetentionTime; retentionTime = oldRetentionTime;
...@@ -1470,26 +1556,6 @@ public class MVStore { ...@@ -1470,26 +1556,6 @@ public class MVStore {
return move; return move;
} }
private void compactFreeUnusedChunks() {
long time = getTime();
ArrayList<Chunk> free = New.arrayList();
for (Chunk c : chunks.values()) {
if (c.maxLenLive <= 0) {
if (canOverwriteChunk(c, time)) {
free.add(c);
}
}
}
for (Chunk c : free) {
chunks.remove(c.id);
markMetaChanged();
meta.remove(Chunk.getMetaKey(c.id));
long start = c.block * BLOCK_SIZE;
int length = c.len * BLOCK_SIZE;
fileStore.free(start, length);
}
}
private void compactMoveChunks(ArrayList<Chunk> move) { private void compactMoveChunks(ArrayList<Chunk> move) {
for (Chunk c : move) { for (Chunk c : move) {
WriteBuffer buff = getWriteBuffer(); WriteBuffer buff = getWriteBuffer();
...@@ -1633,7 +1699,7 @@ public class MVStore { ...@@ -1633,7 +1699,7 @@ public class MVStore {
// only look at chunk older than the retention time // only look at chunk older than the retention time
// (it's possible to compact chunks earlier, but right // (it's possible to compact chunks earlier, but right
// now we don't do that) // now we don't do that)
if (canOverwriteChunk(c, time)) { if (c.time + retentionTime <= time) {
long age = last.version - c.version + 1; long age = last.version - c.version + 1;
c.collectPriority = (int) (c.getFillRate() * 1000 / age); c.collectPriority = (int) (c.getFillRate() * 1000 / age);
old.add(c); old.add(c);
...@@ -1701,90 +1767,8 @@ public class MVStore { ...@@ -1701,90 +1767,8 @@ public class MVStore {
if (!meta.rewrite(set)) { if (!meta.rewrite(set)) {
return; return;
} }
freeUnusedChunks();
commitAndSave(); commitAndSave();
boolean commitAgain = false;
for (Chunk c : old) {
// a concurrent commit could free up the chunk
// so we wait for any commits to finish
if (c.maxLenLive <= 0) {
continue;
}
// not cleared - that means bookkeeping of live pages
// may be broken; copyLive will fix this
compactFixLive(c);
commitAgain = true;
}
if (commitAgain) {
commitAndSave();
}
}
private synchronized void compactFixLive(Chunk chunk) {
long start = chunk.block * BLOCK_SIZE;
int length = chunk.len * BLOCK_SIZE;
ByteBuffer buff = fileStore.readFully(start, length);
Chunk verify = Chunk.readChunkHeader(buff, start);
if (verify.id != chunk.id) {
throw DataUtils.newIllegalStateException(
DataUtils.ERROR_FILE_CORRUPT,
"Expected chunk {0}, got {1}", chunk.id, verify.id);
}
int pagesRemaining = chunk.pageCount;
markMetaChanged();
boolean mapNotOpen = false;
while (pagesRemaining-- > 0) {
int offset = buff.position();
int pageLength = buff.getInt();
if (pageLength <= 0) {
throw DataUtils.newIllegalStateException(
DataUtils.ERROR_FILE_CORRUPT,
"Page length {0}", pageLength);
}
buff.getShort();
int mapId = DataUtils.readVarInt(buff);
@SuppressWarnings("unchecked")
MVMap<Object, Object> map = (MVMap<Object, Object>) getMap(mapId);
if (map == null) {
boolean mapExists = meta.containsKey(MVMap.getMapRootKey(mapId));
if (mapExists) {
// pages of maps that were removed: the live count was
// already decremented, but maps that are not open, the
// chunk is not removed
mapNotOpen = true;
}
}
buff.position(offset + pageLength);
}
if (!mapNotOpen) {
// if all maps are open
// then live bookkeeping is wrong, and we anyway
// remove the chunk
// (but we first need to check that there are no
// pending changes)
boolean pendingChanges = false;
for (HashMap<Integer, Chunk> e : freedPageSpace.values()) {
synchronized (e) {
if (e.containsKey(chunk.id)) {
pendingChanges = true;
break;
}
}
}
if (!pendingChanges) {
// bookkeeping is broken for this chunk:
// fix it
registerFreePage(currentVersion, chunk.id,
chunk.maxLenLive + MARKED_FREE,
chunk.pageCountLive + MARKED_FREE);
}
}
}
private MVMap<?, ?> getMap(int mapId) {
if (mapId == 0) {
return meta;
}
return maps.get(mapId);
} }
/** /**
...@@ -1839,8 +1823,12 @@ public class MVStore { ...@@ -1839,8 +1823,12 @@ public class MVStore {
// We could also keep the page in the cache, as somebody // We could also keep the page in the cache, as somebody
// could still read it (reading the old version). // could still read it (reading the old version).
if (cache != null) { if (cache != null) {
if (DataUtils.getPageType(pos) == DataUtils.PAGE_TYPE_LEAF) {
// keep nodes in the cache, because they are still used for
// garbage collection
cache.remove(pos); cache.remove(pos);
} }
}
Chunk c = getChunk(pos); Chunk c = getChunk(pos);
long version = currentVersion; long version = currentVersion;
...@@ -2343,6 +2331,8 @@ public class MVStore { ...@@ -2343,6 +2331,8 @@ public class MVStore {
} }
// use a lower fill rate if there were any file operations // use a lower fill rate if there were any file operations
int fillRate = fileOps ? autoCompactFillRate / 3 : autoCompactFillRate; int fillRate = fileOps ? autoCompactFillRate / 3 : autoCompactFillRate;
// TODO how to avoid endless compaction if there is a bug
// in the bookkeeping?
compact(fillRate, autoCommitMemory); compact(fillRate, autoCommitMemory);
autoCompactLastFileOpCount = fileStore.getWriteCount() + fileStore.getReadCount(); autoCompactLastFileOpCount = fileStore.getWriteCount() + fileStore.getReadCount();
} catch (Exception e) { } catch (Exception e) {
...@@ -2591,7 +2581,7 @@ public class MVStore { ...@@ -2591,7 +2581,7 @@ public class MVStore {
* this value, then chunks at the end of the file are moved. Compaction * this value, then chunks at the end of the file are moved. Compaction
* stops if the target fill rate is reached. * stops if the target fill rate is reached.
* <p> * <p>
* The default value is 80 (80%). The value 0 disables auto-compacting. * The default value is 50 (50%). The value 0 disables auto-compacting.
* <p> * <p>
* *
* @param percent the target fill rate * @param percent the target fill rate
......
...@@ -153,13 +153,15 @@ public class MVStoreTool { ...@@ -153,13 +153,15 @@ public class MVStoreTool {
boolean node = (type & 1) != 0; boolean node = (type & 1) != 0;
pw.printf( pw.printf(
"+%0" + len + "+%0" + len +
"x %s, map %x, %d entries, %d bytes%n", "x %s, map %x, %d entries, %d bytes, maxLen %x%n",
p, p,
(node ? "node" : "leaf") + (node ? "node" : "leaf") +
(compressed ? " compressed" : ""), (compressed ? " compressed" : ""),
mapId, mapId,
node ? entries + 1 : entries, node ? entries + 1 : entries,
pageSize); pageSize,
DataUtils.getPageMaxLength(DataUtils.getPagePos(0, 0, pageSize, 0))
);
p += pageSize; p += pageSize;
Integer mapSize = mapSizes.get(mapId); Integer mapSize = mapSizes.get(mapId);
if (mapSize == null) { if (mapSize == null) {
......
...@@ -7,8 +7,10 @@ package org.h2.mvstore; ...@@ -7,8 +7,10 @@ package org.h2.mvstore;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.util.Arrays; import java.util.Arrays;
import java.util.HashSet;
import org.h2.compress.Compressor; import org.h2.compress.Compressor;
import org.h2.mvstore.type.DataType; import org.h2.mvstore.type.DataType;
import org.h2.util.New;
/** /**
* A page (a node or a leaf). * A page (a node or a leaf).
...@@ -766,6 +768,11 @@ public class Page { ...@@ -766,6 +768,11 @@ public class Page {
} }
pos = DataUtils.getPagePos(chunkId, start, pageLength, type); pos = DataUtils.getPagePos(chunkId, start, pageLength, type);
store.cachePage(pos, this, getMemory()); store.cachePage(pos, this, getMemory());
if (type == DataUtils.PAGE_TYPE_NODE) {
// cache again - this will make sure nodes stays in the cache
// for a longer time
store.cachePage(pos, this, getMemory());
}
long max = DataUtils.getPageMaxLength(pos); long max = DataUtils.getPageMaxLength(pos);
chunk.maxLen += max; chunk.maxLen += max;
chunk.maxLenLive += max; chunk.maxLenLive += max;
...@@ -820,7 +827,9 @@ public class Page { ...@@ -820,7 +827,9 @@ public class Page {
* Unlink the children recursively after all data is written. * Unlink the children recursively after all data is written.
*/ */
void writeEnd() { void writeEnd() {
if (!isLeaf()) { if (isLeaf()) {
return;
}
int len = children.length; int len = children.length;
for (int i = 0; i < len; i++) { for (int i = 0; i < len; i++) {
PageReference ref = children[i]; PageReference ref = children[i];
...@@ -834,7 +843,6 @@ public class Page { ...@@ -834,7 +843,6 @@ public class Page {
} }
} }
} }
}
long getVersion() { long getVersion() {
return version; return version;
...@@ -939,4 +947,142 @@ public class Page { ...@@ -939,4 +947,142 @@ public class Page {
} }
/**
* Contains information about which other pages are referenced (directly or
* indirectly) by the given page. This is a subset of the page data, for
* pages of type node. This information is used for garbage collection (to
* quickly find out which chunks are still in use).
*/
public static class PageChildren {
/**
* An empty array of type long.
*/
public static final long[] EMPTY_ARRAY = new long[0];
/**
* The position of the page.
*/
final long pos;
/**
* The page positions of (direct or indirect) children. Depending on the
* use case, this can be the complete list, or only a subset of all
* children, for example only only one reference to a child in another
* chunk.
*/
long[] children;
private PageChildren(long pos, long[] children) {
this.pos = pos;
this.children = children;
}
PageChildren(Page p) {
this.pos = p.getPos();
int count = p.getRawChildPageCount();
this.children = new long[count];
for (int i = 0; i < count; i++) {
children[i] = p.getChildPagePos(i);
}
}
int getMemory() {
return 64 + 8 * children.length;
}
/**
* Read the page from the buffer.
*
* @param pos the position
* @param buff the buffer
* @param chunkId the chunk id
* @param mapId the map id
* @param offset the offset within the chunk
* @param maxLength the maximum length
*/
static PageChildren read(FileStore fileStore, long filePos, int mapId, long pos) {
ByteBuffer buff;
int maxLength = DataUtils.getPageMaxLength(pos);
if (maxLength == DataUtils.PAGE_LARGE) {
buff = fileStore.readFully(filePos, 128);
maxLength = buff.getInt();
// read the first bytes again
}
long fileSize = fileStore.fileSize;
maxLength = (int) Math.min(fileSize - filePos, maxLength);
int length = maxLength;
if (length < 0) {
throw DataUtils.newIllegalStateException(
DataUtils.ERROR_FILE_CORRUPT,
"Illegal page length {0} reading at {1}; file size {2} ",
length, filePos, fileSize);
}
buff = fileStore.readFully(filePos, length);
int chunkId = DataUtils.getPageChunkId(pos);
int offset = DataUtils.getPageOffset(pos);
int start = buff.position();
int pageLength = buff.getInt();
if (pageLength > maxLength) {
throw DataUtils.newIllegalStateException(
DataUtils.ERROR_FILE_CORRUPT,
"File corrupted in chunk {0}, expected page length =< {1}, got {2}",
chunkId, maxLength, pageLength);
}
buff.limit(start + pageLength);
short check = buff.getShort();
int m = DataUtils.readVarInt(buff);
if (m != mapId) {
throw DataUtils.newIllegalStateException(
DataUtils.ERROR_FILE_CORRUPT,
"File corrupted in chunk {0}, expected map id {1}, got {2}",
chunkId, mapId, m);
}
int checkTest = DataUtils.getCheckValue(chunkId)
^ DataUtils.getCheckValue(offset)
^ DataUtils.getCheckValue(pageLength);
if (check != (short) checkTest) {
throw DataUtils.newIllegalStateException(
DataUtils.ERROR_FILE_CORRUPT,
"File corrupted in chunk {0}, expected check value {1}, got {2}",
chunkId, checkTest, check);
}
int len = DataUtils.readVarInt(buff);
int type = buff.get();
boolean node = (type & 1) == DataUtils.PAGE_TYPE_NODE;
if (!node) {
return null;
}
long[] children = new long[len + 1];
for (int i = 0; i <= len; i++) {
children[i] = buff.getLong();
}
return new PageChildren(pos, children);
}
void removeDuplicateChunkReferences() {
HashSet<Integer> chunks = New.hashSet();
// we don't need references to leaves in the same chunk
chunks.add(DataUtils.getPageChunkId(pos));
for (int i = 0; i < children.length; i++) {
long p = children[i];
if (DataUtils.getPageType(p) == DataUtils.PAGE_TYPE_NODE) {
continue;
}
int chunkId = DataUtils.getPageChunkId(p);
if (chunks.add(chunkId)) {
continue;
}
long[] c2 = new long[children.length - 1];
DataUtils.copyExcept(children, c2, children.length, i);
children = c2;
i--;
}
if (children.length == 0) {
children = EMPTY_ARRAY;
}
}
}
} }
...@@ -221,6 +221,7 @@ public class TestConcurrent extends TestMVStore { ...@@ -221,6 +221,7 @@ public class TestConcurrent extends TestMVStore {
} }
FileUtils.deleteRecursive("memFS:", false); FileUtils.deleteRecursive("memFS:", false);
} }
private void testConcurrentFree() throws InterruptedException { private void testConcurrentFree() throws InterruptedException {
String fileName = "memFS:testConcurrentFree.h3"; String fileName = "memFS:testConcurrentFree.h3";
for (int test = 0; test < 10; test++) { for (int test = 0; test < 10; test++) {
...@@ -276,6 +277,13 @@ public class TestConcurrent extends TestMVStore { ...@@ -276,6 +277,13 @@ public class TestConcurrent extends TestMVStore {
} }
} }
task.get(); task.get();
// this will mark old chunks as unused,
// but not remove (and overwrite) them yet
s.commit();
// this will remove them, so we end up with
// one unused one, and one active one
MVMap<Integer, Integer> m = s.openMap("dummy");
m.put(1, 1);
s.commit(); s.commit();
MVMap<String, String> meta = s.getMetaMap(); MVMap<String, String> meta = s.getMetaMap();
...@@ -285,8 +293,7 @@ public class TestConcurrent extends TestMVStore { ...@@ -285,8 +293,7 @@ public class TestConcurrent extends TestMVStore {
chunkCount++; chunkCount++;
} }
} }
// the chunk metadata is not yet written assertTrue("" + chunkCount, chunkCount < 3);
assertEquals(0, chunkCount);
s.close(); s.close();
} }
FileUtils.deleteRecursive("memFS:", false); FileUtils.deleteRecursive("memFS:", false);
......
...@@ -398,19 +398,20 @@ public class TestMVStore extends TestBase { ...@@ -398,19 +398,20 @@ public class TestMVStore extends TestBase {
MVStore s = new MVStore.Builder(). MVStore s = new MVStore.Builder().
fileStore(offHeap). fileStore(offHeap).
open(); open();
int count = 1000;
Map<Integer, String> map = s.openMap("data"); Map<Integer, String> map = s.openMap("data");
for (int i = 0; i < 1000; i++) { for (int i = 0; i < count; i++) {
map.put(i, "Hello " + i); map.put(i, "Hello " + i);
s.commit(); s.commit();
} }
assertTrue(offHeap.getWriteCount() > 1000); assertTrue(offHeap.getWriteCount() > count);
s.close(); s.close();
s = new MVStore.Builder(). s = new MVStore.Builder().
fileStore(offHeap). fileStore(offHeap).
open(); open();
map = s.openMap("data"); map = s.openMap("data");
for (int i = 0; i < 1000; i++) { for (int i = 0; i < count; i++) {
assertEquals("Hello " + i, map.get(i)); assertEquals("Hello " + i, map.get(i));
} }
s.close(); s.close();
...@@ -1734,8 +1735,13 @@ public class TestMVStore extends TestBase { ...@@ -1734,8 +1735,13 @@ public class TestMVStore extends TestBase {
assertTrue(chunkCount2 >= chunkCount1); assertTrue(chunkCount2 >= chunkCount1);
m = s.openMap("data"); m = s.openMap("data");
assertTrue(s.compact(80, 50 * 1024)); for (int i = 0; i < 10; i++) {
assertFalse(s.compact(80, 1024)); boolean result = s.compact(50, 50 * 1024);
if (!result) {
break;
}
}
assertFalse(s.compact(50, 1024));
int chunkCount3 = 0; int chunkCount3 = 0;
for (String k : meta.keySet()) { for (String k : meta.keySet()) {
...@@ -1744,7 +1750,8 @@ public class TestMVStore extends TestBase { ...@@ -1744,7 +1750,8 @@ public class TestMVStore extends TestBase {
} }
} }
assertTrue(chunkCount3 < chunkCount1); assertTrue(chunkCount1 + ">" + chunkCount2 + ">" + chunkCount3,
chunkCount3 < chunkCount1);
for (int i = 0; i < 10 * factor; i++) { for (int i = 0; i < 10 * factor; i++) {
assertEquals("x" + i, "Hello" + (i / factor), m.get(i)); assertEquals("x" + i, "Hello" + (i / factor), m.get(i));
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论