提交 9164f30d authored 作者: Thomas Mueller's avatar Thomas Mueller

MVStore: avoid deadlocks

上级 ef857032
...@@ -169,7 +169,8 @@ public class MVStore { ...@@ -169,7 +169,8 @@ public class MVStore {
* is the unsaved version, the value is the map of chunks. The maps contains * is the unsaved version, the value is the map of chunks. The maps contains
* the number of freed entries per chunk. Access is synchronized. * the number of freed entries per chunk. Access is synchronized.
*/ */
private final HashMap<Long, HashMap<Integer, Chunk>> freedPageSpace = New.hashMap(); private final ConcurrentHashMap<Long, HashMap<Integer, Chunk>> freedPageSpace =
new ConcurrentHashMap<Long, HashMap<Integer, Chunk>>();
/** /**
* The metadata map. * The metadata map.
...@@ -1056,59 +1057,58 @@ public class MVStore { ...@@ -1056,59 +1057,58 @@ public class MVStore {
*/ */
private Set<Chunk> applyFreedSpace(long storeVersion, long time) { private Set<Chunk> applyFreedSpace(long storeVersion, long time) {
Set<Chunk> removedChunks = New.hashSet(); Set<Chunk> removedChunks = New.hashSet();
synchronized (freedPageSpace) { while (true) {
while (true) { ArrayList<Chunk> modified = New.arrayList();
ArrayList<Chunk> modified = New.arrayList(); ArrayList<Long> keys = new ArrayList<Long>(freedPageSpace.keySet());
for (Iterator<Long> it = freedPageSpace.keySet().iterator(); it.hasNext();) { for (Iterator<Long> it = keys.iterator(); it.hasNext();) {
long v = it.next(); long v = it.next();
if (v > storeVersion) { if (v > storeVersion) {
continue;
}
Map<Integer, Chunk> freed = freedPageSpace.get(v);
for (Chunk f : freed.values()) {
Chunk c = chunks.get(f.id);
if (c == null) {
// already removed
continue; continue;
} }
Map<Integer, Chunk> freed = freedPageSpace.get(v); c.maxLengthLive += f.maxLengthLive;
for (Chunk f : freed.values()) { c.pageCountLive += f.pageCountLive;
Chunk c = chunks.get(f.id); if (c.pageCountLive < 0) {
if (c == null) { throw DataUtils.newIllegalStateException(
// already removed DataUtils.ERROR_INTERNAL,
continue; "Corrupt page count {0}", c.pageCountLive);
} }
c.maxLengthLive += f.maxLengthLive; if (c.maxLengthLive < 0) {
c.pageCountLive += f.pageCountLive; throw DataUtils.newIllegalStateException(
if (c.pageCountLive < 0) { DataUtils.ERROR_INTERNAL,
throw DataUtils.newIllegalStateException( "Corrupt max length {0}", c.maxLengthLive);
DataUtils.ERROR_INTERNAL,
"Corrupt page count {0}", c.pageCountLive);
}
if (c.maxLengthLive < 0) {
throw DataUtils.newIllegalStateException(
DataUtils.ERROR_INTERNAL,
"Corrupt max length {0}", c.maxLengthLive);
}
if (c.pageCount == 0 && c.maxLengthLive > 0) {
throw DataUtils.newIllegalStateException(
DataUtils.ERROR_INTERNAL,
"Corrupt max length {0}", c.maxLengthLive);
}
modified.add(c);
} }
it.remove(); if (c.pageCount == 0 && c.maxLengthLive > 0) {
throw DataUtils.newIllegalStateException(
DataUtils.ERROR_INTERNAL,
"Corrupt max length {0}", c.maxLengthLive);
}
modified.add(c);
} }
for (Chunk c : modified) { freedPageSpace.remove(v);
if (c.maxLengthLive == 0) { }
if (canOverwriteChunk(c, time)) { for (Chunk c : modified) {
removedChunks.add(c); if (c.maxLengthLive == 0) {
chunks.remove(c.id); if (canOverwriteChunk(c, time)) {
meta.remove("chunk." + c.id); removedChunks.add(c);
} else { chunks.remove(c.id);
// remove this chunk in the next save operation meta.remove("chunk." + c.id);
registerFreePage(storeVersion + 1, c.id, 0, 0);
}
} else { } else {
meta.put("chunk." + c.id, c.asString()); // remove this chunk in the next save operation
registerFreePage(storeVersion + 1, c.id, 0, 0);
} }
} else {
meta.put("chunk." + c.id, c.asString());
} }
if (modified.size() == 0) { }
break; if (modified.size() == 0) {
} break;
} }
} }
return removedChunks; return removedChunks;
...@@ -1501,20 +1501,18 @@ public class MVStore { ...@@ -1501,20 +1501,18 @@ public class MVStore {
} }
private void registerFreePage(long version, int chunkId, long maxLengthLive, int pageCount) { private void registerFreePage(long version, int chunkId, long maxLengthLive, int pageCount) {
synchronized (freedPageSpace) { HashMap<Integer, Chunk>freed = freedPageSpace.get(version);
HashMap<Integer, Chunk>freed = freedPageSpace.get(version); if (freed == null) {
if (freed == null) { freed = New.hashMap();
freed = New.hashMap(); freedPageSpace.put(version, freed);
freedPageSpace.put(version, freed); }
} Chunk f = freed.get(chunkId);
Chunk f = freed.get(chunkId); if (f == null) {
if (f == null) { f = new Chunk(chunkId);
f = new Chunk(chunkId); freed.put(chunkId, f);
freed.put(chunkId, f);
}
f.maxLengthLive -= maxLengthLive;
f.pageCountLive -= pageCount;
} }
f.maxLengthLive -= maxLengthLive;
f.pageCountLive -= pageCount;
} }
Compressor getCompressor() { Compressor getCompressor() {
...@@ -1741,9 +1739,7 @@ public class MVStore { ...@@ -1741,9 +1739,7 @@ public class MVStore {
fileStore.clear(); fileStore.clear();
} }
maps.clear(); maps.clear();
synchronized (freedPageSpace) { freedPageSpace.clear();
freedPageSpace.clear();
}
currentVersion = version; currentVersion = version;
setWriteVersion(version); setWriteVersion(version);
lastCommittedVersion = version; lastCommittedVersion = version;
...@@ -1831,14 +1827,12 @@ public class MVStore { ...@@ -1831,14 +1827,12 @@ public class MVStore {
} }
private void revertTemp(long storeVersion) { private void revertTemp(long storeVersion) {
synchronized (freedPageSpace) { for (Iterator<Long> it = freedPageSpace.keySet().iterator(); it.hasNext();) {
for (Iterator<Long> it = freedPageSpace.keySet().iterator(); it.hasNext();) { long v = it.next();
long v = it.next(); if (v > storeVersion) {
if (v > storeVersion) { continue;
continue;
}
it.remove();
} }
it.remove();
} }
for (MVMap<?, ?> m : maps.values()) { for (MVMap<?, ?> m : maps.values()) {
m.removeUnusedOldVersions(); m.removeUnusedOldVersions();
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论