提交 9cc5ebee authored 作者: Andrei Tokar's avatar Andrei Tokar

Refactor code around append buffer locking

上级 5de4e30b
...@@ -427,7 +427,7 @@ public class MVMap<K, V> extends AbstractMap<K, V> ...@@ -427,7 +427,7 @@ public class MVMap<K, V> extends AbstractMap<K, V>
Page emptyRootPage = createEmptyLeaf(); Page emptyRootPage = createEmptyLeaf();
int attempt = 0; int attempt = 0;
do { do {
rootReference = getRootInternal(); rootReference = getRoot();
} while (!updateRoot(rootReference, emptyRootPage, ++attempt)); } while (!updateRoot(rootReference, emptyRootPage, ++attempt));
rootReference.root.removeAllRecursive(); rootReference.root.removeAllRecursive();
} }
...@@ -778,25 +778,21 @@ public class MVMap<K, V> extends AbstractMap<K, V> ...@@ -778,25 +778,21 @@ public class MVMap<K, V> extends AbstractMap<K, V>
* @return the root page * @return the root page
*/ */
public final Page getRootPage() { public final Page getRootPage() {
return getRoot().root; return flushAndGetRoot().root;
} }
public final RootReference getRoot() { public RootReference getRoot() {
return flushAndGetRoot(); return root.get();
} }
private RootReference flushAndGetRoot() { public RootReference flushAndGetRoot() {
RootReference rootReference = getRootInternal(); RootReference rootReference = getRoot();
if (singleWriter && rootReference.getAppendCounter() > 0) { if (singleWriter && rootReference.getAppendCounter() > 0) {
return flushAppendBuffer(rootReference, false); return flushAppendBuffer(rootReference, false);
} }
return rootReference; return rootReference;
} }
private RootReference getRootInternal() {
return root.get();
}
final void setRoot(Page rootPage) { final void setRoot(Page rootPage) {
int attempt = 0; int attempt = 0;
while (setNewRoot(null, rootPage, ++attempt, false) == null) {/**/} while (setNewRoot(null, rootPage, ++attempt, false) == null) {/**/}
...@@ -819,7 +815,7 @@ public class MVMap<K, V> extends AbstractMap<K, V> ...@@ -819,7 +815,7 @@ public class MVMap<K, V> extends AbstractMap<K, V>
*/ */
private RootReference setNewRoot(RootReference oldRoot, Page newRootPage, private RootReference setNewRoot(RootReference oldRoot, Page newRootPage,
int attemptUpdateCounter, boolean obeyLock) { int attemptUpdateCounter, boolean obeyLock) {
RootReference currentRoot = getRoot(); RootReference currentRoot = flushAndGetRoot();
assert newRootPage != null || currentRoot != null; assert newRootPage != null || currentRoot != null;
if (currentRoot != oldRoot && oldRoot != null) { if (currentRoot != oldRoot && oldRoot != null) {
return null; return null;
...@@ -863,7 +859,7 @@ public class MVMap<K, V> extends AbstractMap<K, V> ...@@ -863,7 +859,7 @@ public class MVMap<K, V> extends AbstractMap<K, V>
void rollbackRoot(long version) void rollbackRoot(long version)
{ {
RootReference rootReference = getRoot(); RootReference rootReference = flushAndGetRoot();
RootReference previous; RootReference previous;
while (rootReference.version >= version && (previous = rootReference.previous) != null) { while (rootReference.version >= version && (previous = rootReference.previous) != null) {
if (root.compareAndSet(rootReference, previous)) { if (root.compareAndSet(rootReference, previous)) {
...@@ -977,13 +973,13 @@ public class MVMap<K, V> extends AbstractMap<K, V> ...@@ -977,13 +973,13 @@ public class MVMap<K, V> extends AbstractMap<K, V>
* @return the number of entries * @return the number of entries
*/ */
public final long sizeAsLong() { public final long sizeAsLong() {
RootReference rootReference = getRootInternal(); RootReference rootReference = getRoot();
return rootReference.root.getTotalCount() + rootReference.getAppendCounter(); return rootReference.root.getTotalCount() + rootReference.getAppendCounter();
} }
@Override @Override
public boolean isEmpty() { public boolean isEmpty() {
RootReference rootReference = getRootInternal(); RootReference rootReference = getRoot();
Page rootPage = rootReference.root; Page rootPage = rootReference.root;
return rootPage.isLeaf() && rootPage.getKeyCount() == 0 && rootReference.getAppendCounter() == 0; return rootPage.isLeaf() && rootPage.getKeyCount() == 0 && rootReference.getAppendCounter() == 0;
} }
...@@ -1072,7 +1068,7 @@ public class MVMap<K, V> extends AbstractMap<K, V> ...@@ -1072,7 +1068,7 @@ public class MVMap<K, V> extends AbstractMap<K, V>
final boolean hasChangesSince(long version) { final boolean hasChangesSince(long version) {
RootReference rootReference = getRoot(); RootReference rootReference = getRoot();
Page root = rootReference.root; Page root = rootReference.root;
return !root.isSaved() && root.getTotalCount() > 0 || return !root.isSaved() && rootReference.getTotalCount() > 0 ||
getVersion(rootReference) > version; getVersion(rootReference) > version;
} }
...@@ -1125,7 +1121,7 @@ public class MVMap<K, V> extends AbstractMap<K, V> ...@@ -1125,7 +1121,7 @@ public class MVMap<K, V> extends AbstractMap<K, V>
final RootReference setWriteVersion(long writeVersion) { final RootReference setWriteVersion(long writeVersion) {
int attempt = 0; int attempt = 0;
while(true) { while(true) {
RootReference rootReference = getRoot(); RootReference rootReference = flushAndGetRoot();
if(rootReference.version >= writeVersion) { if(rootReference.version >= writeVersion) {
return rootReference; return rootReference;
} else if (isClosed()) { } else if (isClosed()) {
...@@ -1213,7 +1209,7 @@ public class MVMap<K, V> extends AbstractMap<K, V> ...@@ -1213,7 +1209,7 @@ public class MVMap<K, V> extends AbstractMap<K, V>
while ((keyCount = rootReference.getAppendCounter()) > 0) { while ((keyCount = rootReference.getAppendCounter()) > 0) {
if (lockedRootReference == null) { if (lockedRootReference == null) {
lockedRootReference = tryLock(rootReference, ++attempt); lockedRootReference = tryLock(rootReference, ++attempt);
rootReference = lockedRootReference == null ? getRootInternal() : lockedRootReference; rootReference = lockedRootReference == null ? getRoot() : lockedRootReference;
continue; continue;
} }
...@@ -1308,12 +1304,11 @@ public class MVMap<K, V> extends AbstractMap<K, V> ...@@ -1308,12 +1304,11 @@ public class MVMap<K, V> extends AbstractMap<K, V>
assert lockedForUpdate || updatedRootReference.getAppendCounter() == 0; assert lockedForUpdate || updatedRootReference.getAppendCounter() == 0;
return updatedRootReference; return updatedRootReference;
} }
rootReference = getRootInternal(); rootReference = getRoot();
} }
} finally { } finally {
if (lockedRootReference != null && !lockedForUpdate) { if (lockedRootReference != null && !lockedForUpdate) {
assert rootReference.root == lockedRootReference.root; assert rootReference.root == lockedRootReference.root;
// assert rootReference.appendCounter == lockedRootReference.appendCounter : rootReference.appendCounter + " != " + lockedRootReference.appendCounter;
rootReference = unlockRoot(lockedRootReference.root, lockedRootReference.appendCounter); rootReference = unlockRoot(lockedRootReference.root, lockedRootReference.appendCounter);
} }
} }
...@@ -1342,29 +1337,20 @@ public class MVMap<K, V> extends AbstractMap<K, V> ...@@ -1342,29 +1337,20 @@ public class MVMap<K, V> extends AbstractMap<K, V>
* @param value to be appended * @param value to be appended
*/ */
public void append(K key, V value) { public void append(K key, V value) {
int attempt = 0; RootReference rootReference = lockRoot(getRoot(), 1);
RootReference lockedRootReference = null; int appendCounter = rootReference.getAppendCounter();
boolean success; try {
do {
RootReference rootReference = getRootInternal();
if (lockedRootReference == null) {
lockedRootReference = lockRoot(rootReference, ++attempt);
rootReference = lockedRootReference;
}
int appendCounter = rootReference.getAppendCounter();
if (appendCounter >= keysPerPage) { if (appendCounter >= keysPerPage) {
// beforeWrite();
rootReference = flushAppendBuffer(rootReference, true); rootReference = flushAppendBuffer(rootReference, true);
appendCounter = rootReference.getAppendCounter(); appendCounter = rootReference.getAppendCounter();
assert appendCounter < keysPerPage; assert appendCounter < keysPerPage;
} }
keysBuffer[appendCounter] = key; keysBuffer[appendCounter] = key;
valuesBuffer[appendCounter] = value; valuesBuffer[appendCounter] = value;
++appendCounter;
RootReference updatedRootReference = new RootReference(rootReference, appendCounter + 1, ++attempt, false); } finally {
success = root.compareAndSet(rootReference, updatedRootReference); unlockRoot(rootReference.root, appendCounter);
} while(!success); }
} }
/** /**
...@@ -1373,34 +1359,24 @@ public class MVMap<K, V> extends AbstractMap<K, V> ...@@ -1373,34 +1359,24 @@ public class MVMap<K, V> extends AbstractMap<K, V>
* Non-updating method may be used concurrently, but latest removal may not be visible. * Non-updating method may be used concurrently, but latest removal may not be visible.
*/ */
public void trimLast() { public void trimLast() {
int attempt = 0; RootReference rootReference = getRoot();
RootReference lockedRootReference = null; int appendCounter = rootReference.getAppendCounter();
RootReference rootReference = getRootInternal(); boolean fallback = appendCounter == 0;
boolean success = false; if (!fallback) {
while(!success) { rootReference = lockRoot(rootReference, 1);
int appendCounter = rootReference.getAppendCounter(); appendCounter = rootReference.getAppendCounter();
if (appendCounter > 0) { fallback = appendCounter == 0;
if (lockedRootReference == null) { if (!fallback) {
lockedRootReference = lockRoot(rootReference, ++attempt); --appendCounter;
rootReference = lockedRootReference;
continue;
}
// RootReference updatedRootReference = new RootReference(rootReference, appendCounter - 1, ++attempt, rootReference.lockedForUpdate);
RootReference updatedRootReference = new RootReference(rootReference, appendCounter - 1, ++attempt, false);
success = root.compareAndSet(rootReference, updatedRootReference);
rootReference = getRootInternal();
} else {
if (lockedRootReference != null) {
rootReference = unlockRoot(lockedRootReference.root, lockedRootReference.appendCounter);
}
assert rootReference.root.getKeyCount() > 0;
Page lastLeaf = rootReference.root.getAppendCursorPos(null).page;
assert lastLeaf.isLeaf();
assert lastLeaf.getKeyCount() > 0;
Object key = lastLeaf.getKey(lastLeaf.getKeyCount() - 1);
success = remove(key) != null;
assert success;
} }
unlockRoot(rootReference.root, appendCounter);
}
if (fallback) {
Page lastLeaf = rootReference.root.getAppendCursorPos(null).page;
assert lastLeaf.isLeaf();
assert lastLeaf.getKeyCount() > 0;
Object key = lastLeaf.getKey(lastLeaf.getKeyCount() - 1);
remove(key);
} }
} }
...@@ -1462,16 +1438,6 @@ public class MVMap<K, V> extends AbstractMap<K, V> ...@@ -1462,16 +1438,6 @@ public class MVMap<K, V> extends AbstractMap<K, V>
} }
// This one is used for unlocking // This one is used for unlocking
RootReference(RootReference r, Page root, int appendCounter) {
this.root = root;
this.version = r.version;
this.previous = r.previous;
this.updateCounter = r.updateCounter;
this.updateAttemptCounter = r.updateAttemptCounter;
this.lockedForUpdate = false;
this.appendCounter = (byte)appendCounter;
}
RootReference(RootReference r, Page root, int appendCounter, boolean lockedForUpdate) { RootReference(RootReference r, Page root, int appendCounter, boolean lockedForUpdate) {
this.root = root; this.root = root;
this.version = r.version; this.version = r.version;
...@@ -1509,21 +1475,14 @@ public class MVMap<K, V> extends AbstractMap<K, V> ...@@ -1509,21 +1475,14 @@ public class MVMap<K, V> extends AbstractMap<K, V>
this.appendCounter = 0; this.appendCounter = 0;
} }
// This one is used for append buffer maintenance
RootReference(RootReference r, int appendCounter, int attempt, boolean lockedForUpdate) {
this.root = r.root;
this.version = r.version;
this.previous = r.previous;
this.updateCounter = r.updateCounter + 1;
this.updateAttemptCounter = r.updateAttemptCounter + attempt;
this.lockedForUpdate = lockedForUpdate;
this.appendCounter = (byte)appendCounter;
}
public int getAppendCounter() { public int getAppendCounter() {
return appendCounter & 0xff; return appendCounter & 0xff;
} }
public long getTotalCount() {
return root.getTotalCount() + getAppendCounter();
}
@Override @Override
public String toString() { public String toString() {
return "RootReference("+ System.identityHashCode(root)+","+version+","+ lockedForUpdate +")"; return "RootReference("+ System.identityHashCode(root)+","+version+","+ lockedForUpdate +")";
...@@ -1794,21 +1753,8 @@ public class MVMap<K, V> extends AbstractMap<K, V> ...@@ -1794,21 +1753,8 @@ public class MVMap<K, V> extends AbstractMap<K, V>
beforeWrite(); beforeWrite();
IntValueHolder unsavedMemoryHolder = new IntValueHolder(); IntValueHolder unsavedMemoryHolder = new IntValueHolder();
int attempt = 0; int attempt = 0;
// RootReference oldRootReference = null;
while(true) { while(true) {
RootReference rootReference = getRoot(); RootReference rootReference = flushAndGetRoot();
// int contention = 1;
// if (oldRootReference != null) {
// long updateAttemptCounter = rootReference.updateAttemptCounter -
// oldRootReference.updateAttemptCounter;
// assert updateAttemptCounter >= 0 : updateAttemptCounter;
// long updateCounter = rootReference.updateCounter - oldRootReference.updateCounter;
// assert updateCounter >= 0 : updateCounter;
// assert updateAttemptCounter >= updateCounter : updateAttemptCounter + " >= " + updateCounter;
// contention = (int)((updateAttemptCounter+1) / (updateCounter+1));
// }
// oldRootReference = rootReference;
RootReference lockedRootReference = null; RootReference lockedRootReference = null;
if ((++attempt > 3 || rootReference.lockedForUpdate)) { if ((++attempt > 3 || rootReference.lockedForUpdate)) {
lockedRootReference = lockRoot(rootReference, attempt); lockedRootReference = lockRoot(rootReference, attempt);
...@@ -1901,17 +1847,6 @@ public class MVMap<K, V> extends AbstractMap<K, V> ...@@ -1901,17 +1847,6 @@ public class MVMap<K, V> extends AbstractMap<K, V>
} }
} }
p = replacePage(pos, p, unsavedMemoryHolder); p = replacePage(pos, p, unsavedMemoryHolder);
/*
unsavedMemory += p.getMemory();
while (pos != null) {
Page c = p;
p = pos.page;
p = p.copy();
p.setChild(pos.index, c);
unsavedMemory += p.getMemory();
pos = pos.parent;
}
*/
rootPage = p; rootPage = p;
if(lockedRootReference == null && !updateRoot(rootReference, p, attempt)) { if(lockedRootReference == null && !updateRoot(rootReference, p, attempt)) {
decisionMaker.reset(); decisionMaker.reset();
...@@ -1939,7 +1874,7 @@ public class MVMap<K, V> extends AbstractMap<K, V> ...@@ -1939,7 +1874,7 @@ public class MVMap<K, V> extends AbstractMap<K, V>
if (lockedRootReference != null) { if (lockedRootReference != null) {
return lockedRootReference; return lockedRootReference;
} }
rootReference = getRootInternal(); rootReference = getRoot();
} }
} }
...@@ -1966,7 +1901,7 @@ public class MVMap<K, V> extends AbstractMap<K, V> ...@@ -1966,7 +1901,7 @@ public class MVMap<K, V> extends AbstractMap<K, V>
if(attempt > 4) { if(attempt > 4) {
if (attempt <= 12) { if (attempt <= 12) {
Thread.yield(); Thread.yield();
} else if (attempt <= 64) { } else if (attempt <= 24) {
try { try {
Thread.sleep(0, 10 * contention + 5); Thread.sleep(0, 10 * contention + 5);
} catch (InterruptedException ex) { } catch (InterruptedException ex) {
...@@ -1976,7 +1911,7 @@ public class MVMap<K, V> extends AbstractMap<K, V> ...@@ -1976,7 +1911,7 @@ public class MVMap<K, V> extends AbstractMap<K, V>
synchronized (lock) { synchronized (lock) {
notificationRequested = true; notificationRequested = true;
try { try {
lock.wait(0, 100); lock.wait(100);
} catch (InterruptedException ignore) { } catch (InterruptedException ignore) {
} }
} }
...@@ -1989,14 +1924,12 @@ public class MVMap<K, V> extends AbstractMap<K, V> ...@@ -1989,14 +1924,12 @@ public class MVMap<K, V> extends AbstractMap<K, V>
RootReference updatedRootReference; RootReference updatedRootReference;
boolean success; boolean success;
do { do {
RootReference rootReference = getRootInternal(); RootReference rootReference = getRoot();
// assert rootReference.lockedForUpdate; assert rootReference.lockedForUpdate;
if (!rootReference.lockedForUpdate) { updatedRootReference = new RootReference(rootReference, newRoot, appendCounter, false);
return rootReference;
}
updatedRootReference = new RootReference(rootReference, newRoot, appendCounter);
success = root.compareAndSet(rootReference, updatedRootReference); success = root.compareAndSet(rootReference, updatedRootReference);
} while(!success); } while(!success);
if (notificationRequested) { if (notificationRequested) {
synchronized (lock) { synchronized (lock) {
notificationRequested = false; notificationRequested = false;
......
...@@ -1400,7 +1400,7 @@ public class MVStore implements AutoCloseable { ...@@ -1400,7 +1400,7 @@ public class MVStore implements AutoCloseable {
try { try {
ChunkIdsCollector collector = new ChunkIdsCollector(meta.getId()); ChunkIdsCollector collector = new ChunkIdsCollector(meta.getId());
long oldestVersionToKeep = getOldestVersionToKeep(); long oldestVersionToKeep = getOldestVersionToKeep();
MVMap.RootReference rootReference = meta.getRoot(); MVMap.RootReference rootReference = meta.flushAndGetRoot();
if (fast) { if (fast) {
MVMap.RootReference previous; MVMap.RootReference previous;
while (rootReference.version >= oldestVersionToKeep && (previous = rootReference.previous) != null) { while (rootReference.version >= oldestVersionToKeep && (previous = rootReference.previous) != null) {
......
...@@ -134,7 +134,7 @@ public final class MVRTreeMap<V> extends MVMap<SpatialKey, V> { ...@@ -134,7 +134,7 @@ public final class MVRTreeMap<V> extends MVMap<SpatialKey, V> {
int attempt = 0; int attempt = 0;
while(true) { while(true) {
++attempt; ++attempt;
RootReference rootReference = getRoot(); RootReference rootReference = flushAndGetRoot();
Page p = rootReference.root.copy(true); Page p = rootReference.root.copy(true);
V result = operate(p, key, value, decisionMaker); V result = operate(p, key, value, decisionMaker);
if (!p.isLeaf() && p.getTotalCount() == 0) { if (!p.isLeaf() && p.getTotalCount() == 0) {
......
...@@ -104,7 +104,7 @@ public class TransactionMap<K, V> extends AbstractMap<K, V> { ...@@ -104,7 +104,7 @@ public class TransactionMap<K, V> extends AbstractMap<K, V> {
long undoLogSize; long undoLogSize;
do { do {
committingTransactions = store.committingTransactions.get(); committingTransactions = store.committingTransactions.get();
mapRootReference = map.getRoot(); mapRootReference = map.flushAndGetRoot();
BitSet opentransactions = store.openTransactions.get(); BitSet opentransactions = store.openTransactions.get();
undoLogRootReferences = new MVMap.RootReference[opentransactions.length()]; undoLogRootReferences = new MVMap.RootReference[opentransactions.length()];
undoLogSize = 0; undoLogSize = 0;
...@@ -113,7 +113,7 @@ public class TransactionMap<K, V> extends AbstractMap<K, V> { ...@@ -113,7 +113,7 @@ public class TransactionMap<K, V> extends AbstractMap<K, V> {
if (undoLog != null) { if (undoLog != null) {
MVMap.RootReference rootReference = undoLog.getRoot(); MVMap.RootReference rootReference = undoLog.getRoot();
undoLogRootReferences[i] = rootReference; undoLogRootReferences[i] = rootReference;
undoLogSize += rootReference.root.getTotalCount() + rootReference.getAppendCounter(); undoLogSize += rootReference.getTotalCount();
} }
} }
} while(committingTransactions != store.committingTransactions.get() || } while(committingTransactions != store.committingTransactions.get() ||
...@@ -124,7 +124,7 @@ public class TransactionMap<K, V> extends AbstractMap<K, V> { ...@@ -124,7 +124,7 @@ public class TransactionMap<K, V> extends AbstractMap<K, V> {
// should be considered as committed. // should be considered as committed.
// Subsequent processing uses this snapshot info only. // Subsequent processing uses this snapshot info only.
Page mapRootPage = mapRootReference.root; Page mapRootPage = mapRootReference.root;
long size = mapRootPage.getTotalCount(); long size = mapRootReference.getTotalCount();
// if we are looking at the map without any uncommitted values // if we are looking at the map without any uncommitted values
if (undoLogSize == 0) { if (undoLogSize == 0) {
return size; return size;
...@@ -240,6 +240,16 @@ public class TransactionMap<K, V> extends AbstractMap<K, V> { ...@@ -240,6 +240,16 @@ public class TransactionMap<K, V> extends AbstractMap<K, V> {
return set(key, decisionMaker); return set(key, decisionMaker);
} }
/**
* Appends entry to uderlying map. This method may be used concurrently,
* but latest appended values are not guaranteed to be visible.
* @param key should be higher in map's order than any existing key
* @param value to be appended
*/
public void append(K key, V value) {
map.append(key, VersionedValue.getInstance(transaction.log(map.getId(), key, null), value, null));
}
/** /**
* Lock row for the given key. * Lock row for the given key.
* <p> * <p>
...@@ -678,7 +688,7 @@ public class TransactionMap<K, V> extends AbstractMap<K, V> { ...@@ -678,7 +688,7 @@ public class TransactionMap<K, V> extends AbstractMap<K, V> {
MVMap.RootReference mapRootReference; MVMap.RootReference mapRootReference;
do { do {
committingTransactions = store.committingTransactions.get(); committingTransactions = store.committingTransactions.get();
mapRootReference = map.getRoot(); mapRootReference = map.flushAndGetRoot();
} while (committingTransactions != store.committingTransactions.get()); } while (committingTransactions != store.committingTransactions.get());
// Now we have a snapshot, where mapRootReference points to state of the map // Now we have a snapshot, where mapRootReference points to state of the map
// and committingTransactions mask tells us which of seemingly uncommitted changes // and committingTransactions mask tells us which of seemingly uncommitted changes
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论