提交 5de4e30b authored 作者: Andrei Tokar's avatar Andrei Tokar

locking of the append buffer

上级 d6b7a8d9
......@@ -51,6 +51,8 @@ public class MVMap<K, V> extends AbstractMap<K, V>
private final K[] keysBuffer;
private final V[] valuesBuffer;
private final Object lock = new Object();
private volatile boolean notificationRequested;
/**
* Whether the map is closed. Volatile so we don't accidentally write to a
......@@ -425,7 +427,7 @@ public class MVMap<K, V> extends AbstractMap<K, V>
Page emptyRootPage = createEmptyLeaf();
int attempt = 0;
do {
rootReference = getRoot();
rootReference = getRootInternal();
} while (!updateRoot(rootReference, emptyRootPage, ++attempt));
rootReference.root.removeAllRecursive();
}
......@@ -780,9 +782,15 @@ public class MVMap<K, V> extends AbstractMap<K, V>
}
public final RootReference getRoot() {
return flushAndGetRoot();
}
private RootReference flushAndGetRoot() {
RootReference rootReference = getRootInternal();
return singleWriter && rootReference.getAppendCounter() > 0 ?
flushAppendBuffer(rootReference) : rootReference;
if (singleWriter && rootReference.getAppendCounter() > 0) {
return flushAppendBuffer(rootReference, false);
}
return rootReference;
}
private RootReference getRootInternal() {
......@@ -836,7 +844,7 @@ public class MVMap<K, V> extends AbstractMap<K, V>
}
RootReference updatedRootReference = new RootReference(newRootPage, newVersion, previous, updateCounter,
attemptUpdateCounter, false);
attemptUpdateCounter);
boolean success = root.compareAndSet(currentRoot, updatedRootReference);
return success ? updatedRootReference : null;
}
......@@ -969,13 +977,13 @@ public class MVMap<K, V> extends AbstractMap<K, V>
* @return the number of entries
*/
public final long sizeAsLong() {
RootReference rootReference = getRoot();
RootReference rootReference = getRootInternal();
return rootReference.root.getTotalCount() + rootReference.getAppendCounter();
}
@Override
public boolean isEmpty() {
RootReference rootReference = getRoot();
RootReference rootReference = getRootInternal();
Page rootPage = rootReference.root;
return rootPage.isLeaf() && rootPage.getKeyCount() == 0 && rootReference.getAppendCounter() == 0;
}
......@@ -1192,35 +1200,76 @@ public class MVMap<K, V> extends AbstractMap<K, V>
* If map was used in append mode, this method will ensure that append buffer
* is flushed - emptied with all entries inserted into map as a new leaf.
* @param rootReference current RootReference
* @param lockedForUpdate whether rootReference is pre-locked already and
* should stay locked upon return
* @return potentially updated RootReference
*/
private RootReference flushAppendBuffer(RootReference rootReference) {
private RootReference flushAppendBuffer(RootReference rootReference, boolean lockedForUpdate) {
IntValueHolder unsavedMemoryHolder = new IntValueHolder();
RootReference lockedRootReference = lockedForUpdate ? rootReference : null;
try {
int attempt = 0;
int keyCount;
while((keyCount = rootReference.getAppendCounter()) > 0) {
Page page = Page.createLeaf(this,
Arrays.copyOf(keysBuffer, keyCount),
Arrays.copyOf(valuesBuffer, keyCount),
0);
CursorPos pos = rootReference.root.getAppendCursorPos(null);
assert page.map == this;
while ((keyCount = rootReference.getAppendCounter()) > 0) {
if (lockedRootReference == null) {
lockedRootReference = tryLock(rootReference, ++attempt);
rootReference = lockedRootReference == null ? getRootInternal() : lockedRootReference;
continue;
}
Page rootPage = rootReference.root;
CursorPos pos = rootPage.getAppendCursorPos(null);
assert pos != null;
assert page.getKeyCount() > 0;
Object key = page.getKey(0);
assert pos.index < 0 : pos.index;
int index = -pos.index - 1;
assert index == pos.page.getKeyCount() : index + " != " + pos.page.getKeyCount();
Page p = pos.page;
pos = pos.parent;
CursorPos tip = pos;
int unsavedMemory = page.getMemory();
pos = pos.parent;
int remainingBuffer = 0;
Page page = null;
int available = store.getKeysPerPage() - p.getKeyCount();
if (available > 0) {
p = p.copy();
if (keyCount <= available) {
p.expand(keyCount, keysBuffer, valuesBuffer);
} else {
p.expand(available, keysBuffer, valuesBuffer);
keyCount -= available;
if (lockedForUpdate) {
System.arraycopy(keysBuffer, available, keysBuffer, 0, keyCount);
System.arraycopy(valuesBuffer, available, valuesBuffer, 0, keyCount);
remainingBuffer = keyCount;
} else {
Object[] keys = new Object[keyCount];
Object[] values = new Object[keyCount];
System.arraycopy(keysBuffer, available, keys, 0, keyCount);
System.arraycopy(valuesBuffer, available, values, 0, keyCount);
page = Page.createLeaf(this, keys, values, 0);
}
}
} else {
page = Page.createLeaf(this,
Arrays.copyOf(keysBuffer, keyCount),
Arrays.copyOf(valuesBuffer, keyCount),
0);
}
unsavedMemoryHolder.value = 0;
if (page != null) {
assert page.map == this;
assert page.getKeyCount() > 0;
Object key = page.getKey(0);
unsavedMemoryHolder.value += page.getMemory();
while (true) {
if (pos == null) {
if (p.getKeyCount() == 0) {
p = page;
} else {
Object[] keys = new Object[] { key };
Page.PageReference[] children = new Page.PageReference[] {
Object[] keys = new Object[]{key};
Page.PageReference[] children = new Page.PageReference[]{
new Page.PageReference(p),
new Page.PageReference(page)};
p = Page.createNode(this, keys, children, p.getTotalCount() + page.getTotalCount(), 0);
......@@ -1241,34 +1290,49 @@ public class MVMap<K, V> extends AbstractMap<K, V>
int at = keyCount - 2;
key = p.getKey(at);
page = p.split(at);
unsavedMemory += p.getMemory() + page.getMemory();
unsavedMemoryHolder.value += p.getMemory() + page.getMemory();
}
unsavedMemory += p.getMemory();
while (pos != null) {
Page c = p;
p = pos.page;
p = p.copy();
p.setChild(pos.index, c);
unsavedMemory += p.getMemory();
pos = pos.parent;
}
RootReference updatedRootReference = new RootReference(rootReference, p, ++attempt);
if(root.compareAndSet(rootReference, updatedRootReference)) {
p = replacePage(pos, p, unsavedMemoryHolder);
RootReference updatedRootReference = new RootReference(rootReference, p, remainingBuffer, lockedForUpdate);
if (root.compareAndSet(rootReference, updatedRootReference)) {
lockedRootReference = null;
while (tip != null) {
tip.page.removePage();
tip = tip.parent;
}
if (store.getFileStore() != null) {
store.registerUnsavedPage(unsavedMemory);
store.registerUnsavedPage(unsavedMemoryHolder.value);
}
assert updatedRootReference.getAppendCounter() == 0;
assert lockedForUpdate || updatedRootReference.getAppendCounter() == 0;
return updatedRootReference;
}
rootReference = getRootInternal();
}
} finally {
if (lockedRootReference != null && !lockedForUpdate) {
assert rootReference.root == lockedRootReference.root;
// assert rootReference.appendCounter == lockedRootReference.appendCounter : rootReference.appendCounter + " != " + lockedRootReference.appendCounter;
rootReference = unlockRoot(lockedRootReference.root, lockedRootReference.appendCounter);
}
}
return rootReference;
}
private Page replacePage(CursorPos path, Page replacement, IntValueHolder unsavedMemoryHolder) {
int unsavedMemory = replacement.getMemory();
while (path != null) {
Page child = replacement;
replacement = path.page.copy();
replacement.setChild(path.index, child);
unsavedMemory += replacement.getMemory();
path = path.parent;
}
unsavedMemoryHolder.value += unsavedMemory;
return replacement;
}
/**
* Appends entry to this map. this method is NOT thread safe and can not be used
* neither concurrently, nor in combination with any method that updates this map.
......@@ -1279,22 +1343,28 @@ public class MVMap<K, V> extends AbstractMap<K, V>
*/
public void append(K key, V value) {
int attempt = 0;
boolean success = false;
while(!success) {
RootReference lockedRootReference = null;
boolean success;
do {
RootReference rootReference = getRootInternal();
if (lockedRootReference == null) {
lockedRootReference = lockRoot(rootReference, ++attempt);
rootReference = lockedRootReference;
}
int appendCounter = rootReference.getAppendCounter();
if (appendCounter >= keysPerPage) {
beforeWrite();
rootReference = flushAppendBuffer(rootReference);
// beforeWrite();
rootReference = flushAppendBuffer(rootReference, true);
appendCounter = rootReference.getAppendCounter();
assert appendCounter < keysPerPage;
}
keysBuffer[appendCounter] = key;
valuesBuffer[appendCounter] = value;
RootReference updatedRootReference = new RootReference(rootReference, appendCounter + 1, ++attempt);
RootReference updatedRootReference = new RootReference(rootReference, appendCounter + 1, ++attempt, false);
success = root.compareAndSet(rootReference, updatedRootReference);
}
} while(!success);
}
/**
......@@ -1304,14 +1374,25 @@ public class MVMap<K, V> extends AbstractMap<K, V>
*/
public void trimLast() {
int attempt = 0;
boolean success;
do {
RootReference lockedRootReference = null;
RootReference rootReference = getRootInternal();
boolean success = false;
while(!success) {
int appendCounter = rootReference.getAppendCounter();
if (appendCounter > 0) {
RootReference updatedRootReference = new RootReference(rootReference, appendCounter - 1, ++attempt);
if (lockedRootReference == null) {
lockedRootReference = lockRoot(rootReference, ++attempt);
rootReference = lockedRootReference;
continue;
}
// RootReference updatedRootReference = new RootReference(rootReference, appendCounter - 1, ++attempt, rootReference.lockedForUpdate);
RootReference updatedRootReference = new RootReference(rootReference, appendCounter - 1, ++attempt, false);
success = root.compareAndSet(rootReference, updatedRootReference);
rootReference = getRootInternal();
} else {
if (lockedRootReference != null) {
rootReference = unlockRoot(lockedRootReference.root, lockedRootReference.appendCounter);
}
assert rootReference.root.getKeyCount() > 0;
Page lastLeaf = rootReference.root.getAppendCursorPos(null).page;
assert lastLeaf.isLeaf();
......@@ -1320,7 +1401,7 @@ public class MVMap<K, V> extends AbstractMap<K, V>
success = remove(key) != null;
assert success;
}
} while(!success);
}
}
@Override
......@@ -1359,38 +1440,46 @@ public class MVMap<K, V> extends AbstractMap<K, V>
*/
public final byte appendCounter;
RootReference(Page root, long version, RootReference previous,
long updateCounter, long updateAttemptCounter,
boolean lockedForUpdate) {
RootReference(Page root, long version, RootReference previous, long updateCounter, long updateAttemptCounter) {
this.root = root;
this.version = version;
this.previous = previous;
this.updateCounter = updateCounter;
this.updateAttemptCounter = updateAttemptCounter;
this.lockedForUpdate = lockedForUpdate;
this.lockedForUpdate = false;
this.appendCounter = 0;
}
// This one is used for locking
RootReference(RootReference r) {
RootReference(RootReference r, int attempt) {
this.root = r.root;
this.version = r.version;
this.previous = r.previous;
this.updateCounter = r.updateCounter;
this.updateAttemptCounter = r.updateAttemptCounter;
this.updateCounter = r.updateCounter + 1;
this.updateAttemptCounter = r.updateAttemptCounter + attempt;
this.lockedForUpdate = true;
this.appendCounter = 0;
this.appendCounter = r.appendCounter;
}
// This one is used for unlocking
RootReference(RootReference r, Page root, int attempt) {
RootReference(RootReference r, Page root, int appendCounter) {
this.root = root;
this.version = r.version;
this.previous = r.previous;
this.updateCounter = r.updateCounter + 1;
this.updateAttemptCounter = r.updateAttemptCounter + attempt;
this.updateCounter = r.updateCounter;
this.updateAttemptCounter = r.updateAttemptCounter;
this.lockedForUpdate = false;
this.appendCounter = 0;
this.appendCounter = (byte)appendCounter;
}
RootReference(RootReference r, Page root, int appendCounter, boolean lockedForUpdate) {
this.root = root;
this.version = r.version;
this.previous = r.previous;
this.updateCounter = r.updateCounter;
this.updateAttemptCounter = r.updateAttemptCounter;
this.lockedForUpdate = lockedForUpdate;
this.appendCounter = (byte)appendCounter;
}
// This one is used for version change
......@@ -1421,13 +1510,13 @@ public class MVMap<K, V> extends AbstractMap<K, V>
}
// This one is used for append buffer maintenance
RootReference(RootReference r, int appendCounter, int attempt) {
RootReference(RootReference r, int appendCounter, int attempt, boolean lockedForUpdate) {
this.root = r.root;
this.version = r.version;
this.previous = r.previous;
this.updateCounter = r.updateCounter + 1;
this.updateAttemptCounter = r.updateAttemptCounter + attempt;
this.lockedForUpdate = r.lockedForUpdate;
this.lockedForUpdate = lockedForUpdate;
this.appendCounter = (byte)appendCounter;
}
......@@ -1703,34 +1792,43 @@ public class MVMap<K, V> extends AbstractMap<K, V>
public V operate(K key, V value, DecisionMaker<? super V> decisionMaker) {
beforeWrite();
IntValueHolder unsavedMemoryHolder = new IntValueHolder();
int attempt = 0;
RootReference oldRootReference = null;
// RootReference oldRootReference = null;
while(true) {
RootReference rootReference = getRoot();
int contention = 0;
if (oldRootReference != null) {
long updateAttemptCounter = rootReference.updateAttemptCounter -
oldRootReference.updateAttemptCounter;
assert updateAttemptCounter >= 0 : updateAttemptCounter;
long updateCounter = rootReference.updateCounter - oldRootReference.updateCounter;
assert updateCounter >= 0 : updateCounter;
assert updateAttemptCounter >= updateCounter : updateAttemptCounter + " >= " + updateCounter;
contention = (int)((updateAttemptCounter+1) / (updateCounter+1));
// int contention = 1;
// if (oldRootReference != null) {
// long updateAttemptCounter = rootReference.updateAttemptCounter -
// oldRootReference.updateAttemptCounter;
// assert updateAttemptCounter >= 0 : updateAttemptCounter;
// long updateCounter = rootReference.updateCounter - oldRootReference.updateCounter;
// assert updateCounter >= 0 : updateCounter;
// assert updateAttemptCounter >= updateCounter : updateAttemptCounter + " >= " + updateCounter;
// contention = (int)((updateAttemptCounter+1) / (updateCounter+1));
// }
// oldRootReference = rootReference;
RootReference lockedRootReference = null;
if ((++attempt > 3 || rootReference.lockedForUpdate)) {
lockedRootReference = lockRoot(rootReference, attempt);
rootReference = lockedRootReference;
}
oldRootReference = rootReference;
++attempt;
CursorPos pos = traverseDown(rootReference.root, key);
Page rootPage = rootReference.root;
int appendCounter = rootReference.getAppendCounter();
CursorPos tip;
V result;
unsavedMemoryHolder.value = 0;
try {
CursorPos pos = traverseDown(rootPage, key);
Page p = pos.page;
int index = pos.index;
CursorPos tip = pos;
tip = pos;
pos = pos.parent;
@SuppressWarnings("unchecked")
V result = index < 0 ? null : (V)p.getValue(index);
//noinspection unchecked
result = index < 0 ? null : (V)p.getValue(index);
Decision decision = decisionMaker.decide(result, value);
int unsavedMemory = 0;
boolean needUnlock = false;
try {
switch (decision) {
case REPEAT:
decisionMaker.reset();
......@@ -1749,10 +1847,7 @@ public class MVMap<K, V> extends AbstractMap<K, V>
}
return null;
}
if (attempt > 2 && !(needUnlock = lockRoot(decisionMaker, rootReference,
attempt, contention))) {
continue;
}
if (p.getTotalCount() == 1 && pos != null) {
p = pos.page;
index = pos.index;
......@@ -1769,10 +1864,6 @@ public class MVMap<K, V> extends AbstractMap<K, V>
break;
}
case PUT: {
if (attempt > 2 && !(needUnlock = lockRoot(decisionMaker, rootReference,
attempt, contention))) {
continue;
}
value = decisionMaker.selectValue(result, value);
p = p.copy();
if (index < 0) {
......@@ -1785,11 +1876,10 @@ public class MVMap<K, V> extends AbstractMap<K, V>
int at = keyCount >> 1;
Object k = p.getKey(at);
Page split = p.split(at);
unsavedMemory += p.getMemory();
unsavedMemory += split.getMemory();
unsavedMemoryHolder.value += p.getMemory() + split.getMemory();
if (pos == null) {
Object keys[] = { k };
Page.PageReference children[] = {
Object[] keys = { k };
Page.PageReference[] children = {
new Page.PageReference(p),
new Page.PageReference(split)
};
......@@ -1810,6 +1900,8 @@ public class MVMap<K, V> extends AbstractMap<K, V>
break;
}
}
p = replacePage(pos, p, unsavedMemoryHolder);
/*
unsavedMemory += p.getMemory();
while (pos != null) {
Page c = p;
......@@ -1819,61 +1911,99 @@ public class MVMap<K, V> extends AbstractMap<K, V>
unsavedMemory += p.getMemory();
pos = pos.parent;
}
if(needUnlock) {
unlockRoot(p, attempt);
needUnlock = false;
} else if(!updateRoot(rootReference, p, attempt)) {
*/
rootPage = p;
if(lockedRootReference == null && !updateRoot(rootReference, p, attempt)) {
decisionMaker.reset();
continue;
}
} finally {
if(lockedRootReference != null) {
unlockRoot(rootPage, appendCounter);
}
}
while (tip != null) {
tip.page.removePage();
tip = tip.parent;
}
if (store.getFileStore() != null) {
store.registerUnsavedPage(unsavedMemory);
store.registerUnsavedPage(unsavedMemoryHolder.value);
}
return result;
} finally {
if(needUnlock) {
unlockRoot(rootReference.root, attempt);
}
}
private RootReference lockRoot(RootReference rootReference, int attempt) {
while(true) {
RootReference lockedRootReference = tryLock(rootReference, attempt++);
if (lockedRootReference != null) {
return lockedRootReference;
}
rootReference = getRootInternal();
}
}
private boolean lockRoot(DecisionMaker<? super V> decisionMaker, RootReference rootReference,
int attempt, int contention) {
boolean success = lockRoot(rootReference);
if (!success) {
decisionMaker.reset();
private RootReference tryLock(RootReference rootReference, int attempt) {
if (!rootReference.lockedForUpdate) {
RootReference lockedRootReference = new RootReference(rootReference, attempt);
if (root.compareAndSet(rootReference, lockedRootReference)) {
return lockedRootReference;
}
}
RootReference oldRootReference = rootReference.previous;
int contention = 1;
if (oldRootReference != null) {
long updateAttemptCounter = rootReference.updateAttemptCounter -
oldRootReference.updateAttemptCounter;
assert updateAttemptCounter >= 0 : updateAttemptCounter;
long updateCounter = rootReference.updateCounter - oldRootReference.updateCounter;
assert updateCounter >= 0 : updateCounter;
assert updateAttemptCounter >= updateCounter : updateAttemptCounter + " >= " + updateCounter;
contention += (int)((updateAttemptCounter+1) / (updateCounter+1));
}
if(attempt > 4) {
if (attempt <= 24) {
if (attempt <= 12) {
Thread.yield();
} else {
} else if (attempt <= 64) {
try {
Thread.sleep(0, 100 / contention + 50);
Thread.sleep(0, 10 * contention + 5);
} catch (InterruptedException ex) {
throw new RuntimeException(ex);
}
} else {
synchronized (lock) {
notificationRequested = true;
try {
lock.wait(0, 100);
} catch (InterruptedException ignore) {
}
}
}
return success;
}
private boolean lockRoot(RootReference rootReference) {
return !rootReference.lockedForUpdate
&& root.compareAndSet(rootReference, new RootReference(rootReference));
return null;
}
private void unlockRoot(Page newRoot, int attempt) {
private RootReference unlockRoot(Page newRoot, int appendCounter) {
RootReference updatedRootReference;
boolean success;
do {
RootReference rootReference = getRoot();
RootReference updatedRootReference = new RootReference(rootReference, newRoot, attempt);
RootReference rootReference = getRootInternal();
// assert rootReference.lockedForUpdate;
if (!rootReference.lockedForUpdate) {
return rootReference;
}
updatedRootReference = new RootReference(rootReference, newRoot, appendCounter);
success = root.compareAndSet(rootReference, updatedRootReference);
} while(!success);
if (notificationRequested) {
synchronized (lock) {
notificationRequested = false;
lock.notifyAll();;
}
}
return updatedRootReference;
}
private static CursorPos traverseDown(Page p, Object key) {
......@@ -1923,4 +2053,7 @@ public class MVMap<K, V> extends AbstractMap<K, V>
}
}
private static final class IntValueHolder {
int value;
}
}
......@@ -493,6 +493,16 @@ public abstract class Page implements Cloneable
return bKeys;
}
abstract void expand(int extraKeyCount, Object[] extraKeys, Object[] extraValues);
final void expandKeys(int extraKeyCount, Object[] extraKeys) {
int keyCount = getKeyCount();
Object[] newKeys = createKeyStorage(keyCount + extraKeyCount);
System.arraycopy(keys, 0, newKeys, 0, keyCount);
System.arraycopy(extraKeys, 0, newKeys, keyCount, extraKeyCount);
keys = newKeys;
}
/**
* Get the total number of key-value pairs, including child pages.
*
......@@ -1012,6 +1022,11 @@ public abstract class Page implements Cloneable
return newPage;
}
@Override
public void expand(int keyCount, Object[] extraKys, Object[] extraValues) {
throw new UnsupportedOperationException();
}
@Override
public long getTotalCount() {
assert !isComplete() || totalCount == calculateTotalCount() :
......@@ -1324,6 +1339,21 @@ public abstract class Page implements Cloneable
return newPage;
}
@Override
public void expand(int extraKeyCount, Object[] extraKeys, Object[] extraValues) {
int keyCount = getKeyCount();
expandKeys(extraKeyCount, extraKeys);
if(values != null) {
Object[] newValues = createValueStorage(keyCount + extraKeyCount);
System.arraycopy(values, 0, newValues, 0, keyCount);
System.arraycopy(extraValues, 0, newValues, keyCount, extraKeyCount);
values = newValues;
}
if(isPersistent()) {
recalculateMemory();
}
}
@Override
public long getTotalCount() {
return getKeyCount();
......
......@@ -577,7 +577,7 @@ public class Transaction {
}
private static String stateToString(long state) {
return STATUS_NAMES[getStatus(state)] + (hasRollback(state) ? "" : "!") + " " + getLogId(state);
return STATUS_NAMES[getStatus(state)] + (hasRollback(state) ? "<" : "") + " " + getLogId(state);
}
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论