提交 d7feb4c3 authored 作者: Andrei Tokar's avatar Andrei Tokar

fix broken defrag procedure, which leaks memory

上级 fc0670d5
......@@ -1053,7 +1053,10 @@ public class MVMap<K, V> extends AbstractMap<K, V>
* @return version
*/
public final long getVersion() {
RootReference rootReference = getRoot();
return getVersion(getRoot());
}
private long getVersion(RootReference rootReference) {
RootReference previous = rootReference.previous;
return previous == null || previous.root != rootReference.root ||
previous.appendCounter != rootReference.appendCounter ?
......@@ -1061,7 +1064,8 @@ public class MVMap<K, V> extends AbstractMap<K, V>
}
final boolean hasChangesSince(long version) {
return getVersion() > version;
RootReference rootReference = getRoot();
return !rootReference.root.isSaved() || getVersion(rootReference) > version;
}
public boolean isSingleWriter() {
......@@ -1153,28 +1157,33 @@ public class MVMap<K, V> extends AbstractMap<K, V>
MVStore.TxCounter txCounter = store.registerVersionUsage();
try {
beforeWrite();
setRoot(copy(sourceMap.getRootPage()));
copy(sourceMap.getRootPage(), null, 0);
} finally {
store.deregisterVersionUsage(txCounter);
}
}
private Page copy(Page source) {
private Page copy(Page source, Page parent, int index) {
Page target = source.copy(this);
store.registerUnsavedPage(target.getMemory());
if (parent == null) {
setRoot(target);
} else {
parent.setChild(index, target);
}
if (!source.isLeaf()) {
for (int i = 0; i < getChildPageCount(target); i++) {
if (source.getChildPagePos(i) != 0) {
// position 0 means no child
// (for example the last entry of an r-tree node)
// (the MVMap is also used for r-trees for compacting)
Page child = copy(source.getChildPage(i));
target.setChild(i, child);
copy(source.getChildPage(i), target, i);
}
}
setRoot(target);
beforeWrite();
target.setComplete();
}
store.registerUnsavedPage(target.getMemory());
if (store.isSaveNeeded()) {
store.commit();
}
return target;
}
......
......@@ -130,7 +130,7 @@ MVStore:
/**
* A persistent storage for maps.
*/
public class MVStore {
public class MVStore implements AutoCloseable {
/**
* The block size (physical sector size) of the disk. The store header is
......@@ -1326,9 +1326,7 @@ public class MVStore {
shrinkFileIfPossible(1);
}
for (Page p : changed) {
if (p.getTotalCount() > 0) {
p.writeEnd();
}
p.writeEnd();
}
metaRoot.writeEnd();
......@@ -1346,7 +1344,7 @@ public class MVStore {
*/
private void freeUnusedIfNeeded(long time) {
int freeDelay = retentionTime / 5;
if (time >= lastFreeUnusedChunks + freeDelay) {
if (time - lastFreeUnusedChunks >= freeDelay) {
// set early in case it fails (out of memory or so)
lastFreeUnusedChunks = time;
freeUnusedChunks(true);
......@@ -1391,6 +1389,7 @@ public class MVStore {
* @param fast if true, simplified version is used, which assumes that recent chunks
* are still in-use and do not scan recent versions of the store.
* Also is this case only oldest available version of the store is scanned.
* @return set of chunk ids in-use, or null if all chunks should be considered in-use
*/
private Set<Integer> collectReferencedChunks(boolean fast) {
assert lastChunk != null;
......@@ -1427,6 +1426,15 @@ public class MVStore {
}
}
/**
* Scans all map of a particular store version and marks visited chunks as in-use.
* @param rootReference of the meta map of the version
* @param collector to report visited chunks to
* @param executorService to use for parallel processing
* @param executingThreadCounter counter for threads already in use
* @param inspectedRoots set of page positions for map's roots already inspected
* or null if not to be used
*/
private void inspectVersion(MVMap.RootReference rootReference, ChunkIdsCollector collector,
ThreadPoolExecutor executorService,
AtomicInteger executingThreadCounter,
......@@ -1442,17 +1450,17 @@ public class MVStore {
}
for (Cursor<String, String> c = new Cursor<>(rootPage, "root."); c.hasNext(); ) {
String key = c.next();
assert key != null;
if (!key.startsWith("root.")) {
break;
}
pos = DataUtils.parseHexLong(c.getValue());
assert DataUtils.isPageSaved(pos);
if (inspectedRoots == null || inspectedRoots.add(pos)) {
// to allow for something like "root.tmp.123" to be processed
int mapId = DataUtils.parseHexInt(key.substring(key.lastIndexOf('.') + 1));
collector.setMapId(mapId);
collector.visit(pos, executorService, executingThreadCounter);
if (DataUtils.isPageSaved(pos)) {
if (inspectedRoots == null || inspectedRoots.add(pos)) {
// to allow for something like "root.tmp.123" to be processed
int mapId = DataUtils.parseHexInt(key.substring(key.lastIndexOf('.') + 1));
collector.setMapId(mapId);
collector.visit(pos, executorService, executingThreadCounter);
}
}
}
}
......@@ -1640,12 +1648,12 @@ public class MVStore {
}
freedPageSpace.clear();
}
for (Chunk c : modified) {
meta.put(Chunk.getMetaKey(c.id), c.asString());
}
if (modified.isEmpty()) {
break;
}
for (Chunk c : modified) {
meta.put(Chunk.getMetaKey(c.id), c.asString());
}
markMetaChanged();
}
}
......
......@@ -481,30 +481,22 @@ public class MVStoreTool {
* @param compress whether to compress the data
*/
public static void compact(String sourceFileName, String targetFileName, boolean compress) {
MVStore source = new MVStore.Builder().
fileName(sourceFileName).
readOnly().
open();
// Bugfix - Add double "try-finally" statements to close source and target stores for
//releasing lock and file resources in these stores even if OOM occurs.
// Fix issues such as "Cannot delete file "/h2/data/test.mv.db.tempFile" [90025-197]"
//when client connects to this server and reopens this store database in this process.
// @since 2018-09-13 little-pan
try{
try (MVStore source = new MVStore.Builder().
fileName(sourceFileName).readOnly().open()) {
// Bugfix - Add double "try-finally" statements to close source and target stores for
//releasing lock and file resources in these stores even if OOM occurs.
// Fix issues such as "Cannot delete file "/h2/data/test.mv.db.tempFile" [90025-197]"
//when client connects to this server and reopens this store database in this process.
// @since 2018-09-13 little-pan
FileUtils.delete(targetFileName);
MVStore.Builder b = new MVStore.Builder().
fileName(targetFileName);
if (compress) {
b.compress();
}
MVStore target = b.open();
try{
try (MVStore target = b.open()) {
compact(source, target);
}finally{
target.close();
}
}finally{
source.close();
}
}
......@@ -515,6 +507,10 @@ public class MVStoreTool {
* @param target the target store
*/
public static void compact(MVStore source, MVStore target) {
int autoCommitDelay = target.getAutoCommitDelay();
int retentionTime = target.getRetentionTime();
target.setAutoCommitDelay(0);
target.setRetentionTime(Integer.MAX_VALUE); // disable unused chunks collection
MVMap<String, String> sourceMeta = source.getMetaMap();
MVMap<String, String> targetMeta = target.getMetaMap();
for (Entry<String, String> m : sourceMeta.entrySet()) {
......@@ -540,6 +536,8 @@ public class MVStoreTool {
MVMap<Object, Object> targetMap = target.openMap(mapName, mp);
targetMap.copyFrom(sourceMap);
}
target.setRetentionTime(retentionTime);
target.setAutoCommitDelay(autoCommitDelay);
}
/**
......
......@@ -813,6 +813,12 @@ public abstract class Page implements Cloneable
return mem;
}
public boolean isComplete() {
return true;
}
public void setComplete() {}
/**
* Remove the page.
*/
......@@ -883,14 +889,13 @@ public abstract class Page implements Cloneable
void clearPageReference() {
if (page != null) {
if (!page.isSaved()) {
throw DataUtils.newIllegalStateException(
DataUtils.ERROR_INTERNAL, "Page not written");
}
page.writeEnd();
assert pos == page.getPos();
assert count == page.getTotalCount();
page = null;
assert page.isSaved() || !page.isComplete();
if (page.isSaved()) {
assert pos == page.getPos();
assert count == page.getTotalCount() : count + " != " + page.getTotalCount();
page = null;
}
}
}
......@@ -900,7 +905,7 @@ public abstract class Page implements Cloneable
void resetPos() {
Page p = page;
if (p != null) {
if (p != null && p.isSaved()) {
pos = p.getPos();
assert count == p.getTotalCount();
}
......@@ -910,12 +915,12 @@ public abstract class Page implements Cloneable
public String toString() {
return "Cnt:" + count + ", pos:" + DataUtils.getPageChunkId(pos) +
"-" + DataUtils.getPageOffset(pos) + ":" + DataUtils.getPageMaxLength(pos) +
(DataUtils.getPageType(pos) == 0 ? " leaf" : " node") + ", " + page;
(page == null ? DataUtils.getPageType(pos) == 0 : page.isLeaf() ? " leaf" : " node") + ", " + page;
}
}
private static final class NonLeaf extends Page
private static class NonLeaf extends Page
{
/**
* The child page references.
......@@ -950,10 +955,7 @@ public abstract class Page implements Cloneable
@Override
public Page copy(MVMap<?, ?> map) {
// replace child pages with empty pages
PageReference[] children = new PageReference[this.children.length];
Arrays.fill(children, PageReference.EMPTY);
return new NonLeaf(map, this, children, 0);
return new IncompleteNonLeaf(map, this);
}
@Override
......@@ -1012,7 +1014,7 @@ public abstract class Page implements Cloneable
@Override
public long getTotalCount() {
assert totalCount == calculateTotalCount() :
assert !isComplete() || totalCount == calculateTotalCount() :
"Total count: " + totalCount + " != " + calculateTotalCount();
return totalCount;
}
......@@ -1026,6 +1028,10 @@ public abstract class Page implements Cloneable
return check;
}
protected void recalculateTotalCount() {
totalCount = calculateTotalCount();
}
@Override
long getCounts(int index) {
return children[index].count;
......@@ -1150,15 +1156,7 @@ public abstract class Page implements Cloneable
void writeUnsavedRecursive(Chunk chunk, WriteBuffer buff) {
if (!isSaved()) {
int patch = write(chunk, buff);
int len = getRawChildPageCount();
for (int i = 0; i < len; i++) {
PageReference ref = children[i];
Page p = ref.getPage();
if (p != null) {
p.writeUnsavedRecursive(chunk, buff);
ref.resetPos();
}
}
writeChildrenRecursive(chunk, buff);
int old = buff.position();
buff.position(patch);
writeChildren(buff, false);
......@@ -1166,6 +1164,18 @@ public abstract class Page implements Cloneable
}
}
void writeChildrenRecursive(Chunk chunk, WriteBuffer buff) {
int len = getRawChildPageCount();
for (int i = 0; i < len; i++) {
PageReference ref = children[i];
Page p = ref.getPage();
if (p != null) {
p.writeUnsavedRecursive(chunk, buff);
ref.resetPos();
}
}
}
@Override
void writeEnd() {
int len = getRawChildPageCount();
......@@ -1202,6 +1212,48 @@ public abstract class Page implements Cloneable
}
private static class IncompleteNonLeaf extends NonLeaf {
private boolean complete;
IncompleteNonLeaf(MVMap<?, ?> map, NonLeaf source) {
super(map, source, constructEmptyPageRefs(source.getRawChildPageCount()), source.getTotalCount());
}
private static PageReference[] constructEmptyPageRefs(int size) {
// replace child pages with empty pages
PageReference[] children = new PageReference[size];
Arrays.fill(children, PageReference.EMPTY);
return children;
}
@Override
void writeUnsavedRecursive(Chunk chunk, WriteBuffer buff) {
if (complete) {
super.writeUnsavedRecursive(chunk, buff);
} else if (!isSaved()) {
writeChildrenRecursive(chunk, buff);
}
}
public boolean isComplete() {
return complete;
}
public void setComplete() {
recalculateTotalCount();
complete = true;
}
@Override
public void dump(StringBuilder buff) {
super.dump(buff);
buff.append(", complete:").append(complete);
}
}
private static class Leaf extends Page
{
/**
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论