提交 a71a0e3d authored 作者: Andrei Tokar's avatar Andrei Tokar

replace synchronized(this) in MVStore with ReentrantLock

上级 95a55de4
...@@ -25,7 +25,7 @@ import java.util.Set; ...@@ -25,7 +25,7 @@ import java.util.Set;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.locks.ReentrantLock;
import org.h2.compress.CompressDeflate; import org.h2.compress.CompressDeflate;
import org.h2.compress.CompressLZF; import org.h2.compress.CompressLZF;
import org.h2.compress.Compressor; import org.h2.compress.Compressor;
...@@ -144,6 +144,14 @@ public class MVStore { ...@@ -144,6 +144,14 @@ public class MVStore {
*/ */
private static final int MARKED_FREE = 10_000_000; private static final int MARKED_FREE = 10_000_000;
/**
* Lock which governs access to major store operations: store(), close(), ...
* It should used in a non-reentrant fashion.
* It serves as a replacement for synchronized(this), except it allows for
* non-blocking lock attempts.
*/
private final ReentrantLock storeLock = new ReentrantLock();
/** /**
* The background thread, if any. * The background thread, if any.
*/ */
...@@ -195,8 +203,7 @@ public class MVStore { ...@@ -195,8 +203,7 @@ public class MVStore {
private final Map<Integer, Chunk> freedPageSpace = new HashMap<>(); private final Map<Integer, Chunk> freedPageSpace = new HashMap<>();
/** /**
* The metadata map. Write access to this map needs to be synchronized on * The metadata map. Write access to this map needs to be done under storeLock.
* the store.
*/ */
private final MVMap<String, String> meta; private final MVMap<String, String> meta;
...@@ -274,12 +281,6 @@ public class MVStore { ...@@ -274,12 +281,6 @@ public class MVStore {
*/ */
private volatile long currentStoreVersion = -1; private volatile long currentStoreVersion = -1;
/**
* Holds reference to a thread performing store operation (if any)
* or null if there is none is in progress.
*/
private final AtomicReference<Thread> currentStoreThread = new AtomicReference<>();
private volatile boolean metaChanged; private volatile boolean metaChanged;
/** /**
...@@ -289,7 +290,9 @@ public class MVStore { ...@@ -289,7 +290,9 @@ public class MVStore {
private final int autoCompactFillRate; private final int autoCompactFillRate;
private long autoCompactLastFileOpCount; private long autoCompactLastFileOpCount;
/**
* Simple lock to ensure that no more than one compaction runs at any given time
*/
private final Object compactSync = new Object(); private final Object compactSync = new Object();
private volatile IllegalStateException panicException; private volatile IllegalStateException panicException;
...@@ -474,51 +477,59 @@ public class MVStore { ...@@ -474,51 +477,59 @@ public class MVStore {
* @param builder the map builder * @param builder the map builder
* @return the map * @return the map
*/ */
public synchronized <M extends MVMap<K, V>, K, V> M openMap( public <M extends MVMap<K, V>, K, V> M openMap(String name, MVMap.MapBuilder<M, K, V> builder) {
String name, MVMap.MapBuilder<M, K, V> builder) { storeLock.lock();
int id = getMapId(name); try {
M map; int id = getMapId(name);
if (id >= 0) { M map;
map = openMap(id, builder); if (id >= 0) {
} else { map = openMap(id, builder);
HashMap<String, Object> c = new HashMap<>(); } else {
id = ++lastMapId; HashMap<String, Object> c = new HashMap<>();
c.put("id", id); id = ++lastMapId;
c.put("createVersion", currentVersion); c.put("id", id);
map = builder.create(this, c); c.put("createVersion", currentVersion);
map.init(); map = builder.create(this, c);
String x = Integer.toHexString(id); map.init();
meta.put(MVMap.getMapKey(id), map.asString(name)); String x = Integer.toHexString(id);
meta.put("name." + name, x); meta.put(MVMap.getMapKey(id), map.asString(name));
map.setRootPos(0, lastStoredVersion); meta.put("name." + name, x);
markMetaChanged(); map.setRootPos(0, lastStoredVersion);
@SuppressWarnings("unchecked") markMetaChanged();
M existingMap = (M)maps.putIfAbsent(id, map); @SuppressWarnings("unchecked")
if(existingMap != null) { M existingMap = (M) maps.putIfAbsent(id, map);
map = existingMap; if (existingMap != null) {
map = existingMap;
}
} }
return map;
} finally {
storeLock.unlock();
} }
return map;
} }
public synchronized <M extends MVMap<K, V>, K, V> M openMap(int id, public <M extends MVMap<K, V>, K, V> M openMap(int id, MVMap.MapBuilder<M, K, V> builder) {
MVMap.MapBuilder<M, K, V> builder) { storeLock.lock();
@SuppressWarnings("unchecked") try {
M map = (M) getMap(id); @SuppressWarnings("unchecked")
if (map == null) { M map = (M) getMap(id);
String configAsString = meta.get(MVMap.getMapKey(id)); if (map == null) {
if(configAsString != null) { String configAsString = meta.get(MVMap.getMapKey(id));
HashMap<String, Object> config = if(configAsString != null) {
new HashMap<String, Object>(DataUtils.parseMap(configAsString)); HashMap<String, Object> config =
config.put("id", id); new HashMap<String, Object>(DataUtils.parseMap(configAsString));
map = builder.create(this, config); config.put("id", id);
map.init(); map = builder.create(this, config);
long root = getRootPos(meta, id); map.init();
map.setRootPos(root, lastStoredVersion); long root = getRootPos(meta, id);
maps.put(id, map); map.setRootPos(root, lastStoredVersion);
maps.put(id, map);
}
} }
return map;
} finally {
storeLock.unlock();
} }
return map;
} }
public <K, V> MVMap<K,V> getMap(int id) { public <K, V> MVMap<K,V> getMap(int id) {
...@@ -954,12 +965,10 @@ public class MVStore { ...@@ -954,12 +965,10 @@ public class MVStore {
if (closed) { if (closed) {
return; return;
} }
// can not synchronize on this yet, because
// the thread also synchronized on this, which
// could result in a deadlock
stopBackgroundThread(); stopBackgroundThread();
closed = true; closed = true;
synchronized (this) { storeLock.lock();
try {
if (fileStore != null && shrinkIfPossible) { if (fileStore != null && shrinkIfPossible) {
shrinkFileIfPossible(0); shrinkFileIfPossible(0);
} }
...@@ -979,6 +988,8 @@ public class MVStore { ...@@ -979,6 +988,8 @@ public class MVStore {
if (fileStore != null && !fileStoreIsProvided) { if (fileStore != null && !fileStoreIsProvided) {
fileStore.close(); fileStore.close();
} }
} finally {
storeLock.unlock();
} }
} }
...@@ -1043,11 +1054,15 @@ public class MVStore { ...@@ -1043,11 +1054,15 @@ public class MVStore {
* @return the new version (incremented if there were changes) * @return the new version (incremented if there were changes)
*/ */
public long tryCommit() { public long tryCommit() {
// unlike synchronization, this will also prevent re-entrance, // we need to prevent re-entrance, which may be possible,
// which may be possible, if the meta map have changed // because meta map is modified within storeNow() and that
if (currentStoreThread.compareAndSet(null, Thread.currentThread())) { // causes beforeWrite() call with possibility of going back here
synchronized (this) { if ((!storeLock.isHeldByCurrentThread() || currentStoreVersion < 0) &&
storeLock.tryLock()) {
try {
store(); store();
} finally {
storeLock.unlock();
} }
} }
return currentVersion; return currentVersion;
...@@ -1069,15 +1084,16 @@ public class MVStore { ...@@ -1069,15 +1084,16 @@ public class MVStore {
* *
* @return the new version (incremented if there were changes) * @return the new version (incremented if there were changes)
*/ */
public synchronized long commit() { public long commit() {
Thread currentThread = Thread.currentThread(); // we need to prevent re-entrance, which may be possible,
Thread storeThread = currentStoreThread.get(); // because meta map is modified within storeNow() and that
if (currentThread != storeThread) { // to avoid re-entrance // causes beforeWrite() call with possibility of going back here
currentStoreThread.set(currentThread); if(!storeLock.isHeldByCurrentThread() || currentStoreVersion < 0) {
storeLock.lock();
try { try {
store(); store();
} finally { } finally {
currentStoreThread.set(storeThread); storeLock.unlock();
} }
} }
return currentVersion; return currentVersion;
...@@ -1111,12 +1127,11 @@ public class MVStore { ...@@ -1111,12 +1127,11 @@ public class MVStore {
// in any case reset the current store version, // in any case reset the current store version,
// to allow closing the store // to allow closing the store
currentStoreVersion = -1; currentStoreVersion = -1;
currentStoreThread.set(null);
} }
} }
private void storeNow() { private void storeNow() {
assert Thread.holdsLock(this); assert storeLock.isHeldByCurrentThread();
long time = getTimeSinceCreation(); long time = getTimeSinceCreation();
freeUnusedIfNeeded(time); freeUnusedIfNeeded(time);
int currentUnsavedPageCount = unsavedMemory; int currentUnsavedPageCount = unsavedMemory;
...@@ -1322,7 +1337,8 @@ public class MVStore { ...@@ -1322,7 +1337,8 @@ public class MVStore {
} }
} }
private synchronized void freeUnusedChunks() { private void freeUnusedChunks() {
assert storeLock.isHeldByCurrentThread();
if (lastChunk != null && reuseSpace) { if (lastChunk != null && reuseSpace) {
Set<Integer> referenced = collectReferencedChunks(); Set<Integer> referenced = collectReferencedChunks();
long time = getTimeSinceCreation(); long time = getTimeSinceCreation();
...@@ -1693,31 +1709,37 @@ public class MVStore { ...@@ -1693,31 +1709,37 @@ public class MVStore {
* *
* @return if anything was written * @return if anything was written
*/ */
public synchronized boolean compactRewriteFully() { public boolean compactRewriteFully() {
checkOpen(); storeLock.lock();
if (lastChunk == null) { try {
// nothing to do checkOpen();
return false; if (lastChunk == null) {
} // nothing to do
for (MVMap<?, ?> m : maps.values()) { return false;
@SuppressWarnings("unchecked") }
MVMap<Object, Object> map = (MVMap<Object, Object>) m; for (MVMap<?, ?> m : maps.values()) {
Cursor<Object, Object> cursor = map.cursor(null); @SuppressWarnings("unchecked")
Page lastPage = null; MVMap<Object, Object> map = (MVMap<Object, Object>) m;
while (cursor.hasNext()) { Cursor<Object, Object> cursor = map.cursor(null);
cursor.next(); Page lastPage = null;
Page p = cursor.getPage(); while (cursor.hasNext()) {
if (p == lastPage) { cursor.next();
continue; Page p = cursor.getPage();
if (p == lastPage) {
continue;
}
Object k = p.getKey(0);
Object v = p.getValue(0);
map.put(k, v);
lastPage = p;
} }
Object k = p.getKey(0);
Object v = p.getValue(0);
map.put(k, v);
lastPage = p;
} }
commit();
return true;
} finally {
storeLock.unlock();
} }
commit();
return true;
} }
/** /**
...@@ -1737,23 +1759,28 @@ public class MVStore { ...@@ -1737,23 +1759,28 @@ public class MVStore {
* than this * than this
* @param moveSize the number of bytes to move * @param moveSize the number of bytes to move
*/ */
public synchronized void compactMoveChunks(int targetFillRate, long moveSize) { public void compactMoveChunks(int targetFillRate, long moveSize) {
checkOpen(); storeLock.lock();
if (lastChunk != null && reuseSpace) { try {
int oldRetentionTime = retentionTime; checkOpen();
boolean oldReuse = reuseSpace; if (lastChunk != null && reuseSpace) {
try { int oldRetentionTime = retentionTime;
retentionTime = -1; boolean oldReuse = reuseSpace;
freeUnusedChunks(); try {
if (fileStore.getFillRate() <= targetFillRate) { retentionTime = -1;
long start = fileStore.getFirstFree() / BLOCK_SIZE; freeUnusedChunks();
ArrayList<Chunk> move = findChunksToMove(start, moveSize); if (fileStore.getFillRate() <= targetFillRate) {
compactMoveChunks(move); long start = fileStore.getFirstFree() / BLOCK_SIZE;
ArrayList<Chunk> move = findChunksToMove(start, moveSize);
compactMoveChunks(move);
}
} finally {
reuseSpace = oldReuse;
retentionTime = oldRetentionTime;
} }
} finally {
reuseSpace = oldReuse;
retentionTime = oldRetentionTime;
} }
} finally {
storeLock.unlock();
} }
} }
...@@ -1896,15 +1923,23 @@ public class MVStore { ...@@ -1896,15 +1923,23 @@ public class MVStore {
synchronized (compactSync) { synchronized (compactSync) {
checkOpen(); checkOpen();
ArrayList<Chunk> old; ArrayList<Chunk> old;
synchronized (this) { // We can't wait fo lock here, because if called from the background thread,
old = findOldChunks(targetFillRate, write); // it might go into deadlock with concurrent database closure
} // and attempt to stop this thread.
if (old == null || old.isEmpty()) { if (storeLock.tryLock()) {
return false; try {
old = findOldChunks(targetFillRate, write);
if (old == null || old.isEmpty()) {
return false;
}
compactRewrite(old);
return true;
} finally {
storeLock.unlock();
}
} }
compactRewrite(old);
return true;
} }
return false;
} }
/** /**
...@@ -2336,10 +2371,15 @@ public class MVStore { ...@@ -2336,10 +2371,15 @@ public class MVStore {
* *
* @param version the new store version * @param version the new store version
*/ */
public synchronized void setStoreVersion(int version) { public void setStoreVersion(int version) {
checkOpen(); storeLock.lock();
markMetaChanged(); try {
meta.put("setting.storeVersion", Integer.toHexString(version)); checkOpen();
markMetaChanged();
meta.put("setting.storeVersion", Integer.toHexString(version));
} finally {
storeLock.unlock();
}
} }
/** /**
...@@ -2358,104 +2398,109 @@ public class MVStore { ...@@ -2358,104 +2398,109 @@ public class MVStore {
* *
* @param version the version to revert to * @param version the version to revert to
*/ */
public synchronized void rollbackTo(long version) { public void rollbackTo(long version) {
checkOpen(); storeLock.lock();
if (version == 0) { try {
// special case: remove all data checkOpen();
for (MVMap<?, ?> m : maps.values()) { if (version == 0) {
m.close(); // special case: remove all data
} for (MVMap<?, ?> m : maps.values()) {
meta.setInitialRoot(meta.createEmptyLeaf(), INITIAL_VERSION); m.close();
}
meta.setInitialRoot(meta.createEmptyLeaf(), INITIAL_VERSION);
chunks.clear(); chunks.clear();
if (fileStore != null) { if (fileStore != null) {
fileStore.clear(); fileStore.clear();
}
maps.clear();
lastChunk = null;
synchronized (freedPageSpace) {
freedPageSpace.clear();
}
versions.clear();
currentVersion = version;
setWriteVersion(version);
metaChanged = false;
lastStoredVersion = INITIAL_VERSION;
return;
} }
maps.clear(); DataUtils.checkArgument(
lastChunk = null; isKnownVersion(version),
synchronized (freedPageSpace) { "Unknown version {0}", version);
freedPageSpace.clear(); for (MVMap<?, ?> m : maps.values()) {
m.rollbackTo(version);
} }
versions.clear();
currentVersion = version;
setWriteVersion(version);
metaChanged = false;
lastStoredVersion = INITIAL_VERSION;
return;
}
DataUtils.checkArgument(
isKnownVersion(version),
"Unknown version {0}", version);
for (MVMap<?, ?> m : maps.values()) {
m.rollbackTo(version);
}
TxCounter txCounter; TxCounter txCounter;
while ((txCounter = versions.peekLast()) != null && txCounter.version >= version) { while ((txCounter = versions.peekLast()) != null && txCounter.version >= version) {
versions.removeLast(); versions.removeLast();
} }
currentTxCounter = new TxCounter(version); currentTxCounter = new TxCounter(version);
meta.rollbackTo(version); meta.rollbackTo(version);
metaChanged = false; metaChanged = false;
boolean loadFromFile = false; boolean loadFromFile = false;
// find out which chunks to remove, // find out which chunks to remove,
// and which is the newest chunk to keep // and which is the newest chunk to keep
// (the chunk list can have gaps) // (the chunk list can have gaps)
ArrayList<Integer> remove = new ArrayList<>(); ArrayList<Integer> remove = new ArrayList<>();
Chunk keep = null; Chunk keep = null;
for (Chunk c : chunks.values()) { for (Chunk c : chunks.values()) {
if (c.version > version) { if (c.version > version) {
remove.add(c.id); remove.add(c.id);
} else if (keep == null || keep.id < c.id) { } else if (keep == null || keep.id < c.id) {
keep = c; keep = c;
} }
} }
if (!remove.isEmpty()) { if (!remove.isEmpty()) {
// remove the youngest first, so we don't create gaps // remove the youngest first, so we don't create gaps
// (in case we remove many chunks) // (in case we remove many chunks)
Collections.sort(remove, Collections.reverseOrder()); Collections.sort(remove, Collections.reverseOrder());
loadFromFile = true; loadFromFile = true;
for (int id : remove) { for (int id : remove) {
Chunk c = chunks.remove(id); Chunk c = chunks.remove(id);
long start = c.block * BLOCK_SIZE; long start = c.block * BLOCK_SIZE;
int length = c.len * BLOCK_SIZE; int length = c.len * BLOCK_SIZE;
fileStore.free(start, length); fileStore.free(start, length);
assert fileStore.getFileLengthInUse() == measureFileLengthInUse() : assert fileStore.getFileLengthInUse() == measureFileLengthInUse() :
fileStore.getFileLengthInUse() + " != " + measureFileLengthInUse(); fileStore.getFileLengthInUse() + " != " + measureFileLengthInUse();
// overwrite the chunk, // overwrite the chunk,
// so it is not be used later on // so it is not be used later on
WriteBuffer buff = getWriteBuffer(); WriteBuffer buff = getWriteBuffer();
buff.limit(length); buff.limit(length);
// buff.clear() does not set the data // buff.clear() does not set the data
Arrays.fill(buff.getBuffer().array(), (byte) 0); Arrays.fill(buff.getBuffer().array(), (byte) 0);
write(start, buff.getBuffer()); write(start, buff.getBuffer());
releaseWriteBuffer(buff); releaseWriteBuffer(buff);
// only really needed if we remove many chunks, when writes are // only really needed if we remove many chunks, when writes are
// re-ordered - but we do it always, because rollback is not // re-ordered - but we do it always, because rollback is not
// performance critical // performance critical
sync(); sync();
} }
lastChunk = keep; lastChunk = keep;
writeStoreHeader(); writeStoreHeader();
readStoreHeader(); readStoreHeader();
} }
for (MVMap<?, ?> m : new ArrayList<>(maps.values())) { for (MVMap<?, ?> m : new ArrayList<>(maps.values())) {
int id = m.getId(); int id = m.getId();
if (m.getCreateVersion() >= version) { if (m.getCreateVersion() >= version) {
m.close(); m.close();
maps.remove(id); maps.remove(id);
} else {
if (loadFromFile) {
m.setRootPos(getRootPos(meta, id), version);
} else { } else {
m.rollbackRoot(version); if (loadFromFile) {
m.setRootPos(getRootPos(meta, id), version);
} else {
m.rollbackRoot(version);
}
} }
} }
} currentVersion = version;
currentVersion = version; if (lastStoredVersion == INITIAL_VERSION) {
if (lastStoredVersion == INITIAL_VERSION) { lastStoredVersion = currentVersion - 1;
lastStoredVersion = currentVersion - 1; }
} finally {
storeLock.unlock();
} }
} }
...@@ -2544,18 +2589,23 @@ public class MVStore { ...@@ -2544,18 +2589,23 @@ public class MVStore {
removeMap(map, true); removeMap(map, true);
} }
public synchronized void removeMap(MVMap<?, ?> map, boolean delayed) { public void removeMap(MVMap<?, ?> map, boolean delayed) {
checkOpen(); storeLock.lock();
DataUtils.checkArgument(map != meta, try {
"Removing the meta map is not allowed"); checkOpen();
map.close(); DataUtils.checkArgument(map != meta,
MVMap.RootReference rootReference = map.getRoot(); "Removing the meta map is not allowed");
updateCounter += rootReference.updateCounter; map.close();
updateAttemptCounter += rootReference.updateAttemptCounter; MVMap.RootReference rootReference = map.getRoot();
updateCounter += rootReference.updateCounter;
int id = map.getId(); updateAttemptCounter += rootReference.updateAttemptCounter;
String name = getMapName(id);
removeMap(name, id, delayed); int id = map.getId();
String name = getMapName(id);
removeMap(name, id, delayed);
} finally {
storeLock.unlock();
}
} }
private void removeMap(String name, int id, boolean delayed) { private void removeMap(String name, int id, boolean delayed) {
...@@ -2681,11 +2731,7 @@ public class MVStore { ...@@ -2681,11 +2731,7 @@ public class MVStore {
synchronized (t.sync) { synchronized (t.sync) {
t.sync.notifyAll(); t.sync.notifyAll();
} }
if (Thread.holdsLock(this)) {
// called from storeNow: can not join,
// because that could result in a deadlock
return;
}
try { try {
t.join(); t.join();
} catch (Exception e) { } catch (Exception e) {
...@@ -2855,11 +2901,11 @@ public class MVStore { ...@@ -2855,11 +2901,11 @@ public class MVStore {
public void deregisterVersionUsage(TxCounter txCounter) { public void deregisterVersionUsage(TxCounter txCounter) {
if(txCounter != null) { if(txCounter != null) {
if(txCounter.counter.decrementAndGet() <= 0) { if(txCounter.counter.decrementAndGet() <= 0) {
if (currentStoreThread.compareAndSet(null, Thread.currentThread())) { if (!storeLock.isHeldByCurrentThread() && storeLock.tryLock()) {
try { try {
dropUnusedVersions(); dropUnusedVersions();
} finally { } finally {
currentStoreThread.set(null); storeLock.unlock();
} }
} }
} }
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论