Unverified 提交 422f2826 authored 作者: Andrei Tokar's avatar Andrei Tokar 提交者: GitHub

Merge pull request #1627 from h2database/append-single

Use lock to protect append buffer
......@@ -48,9 +48,11 @@ public class MVMap<K, V> extends AbstractMap<K, V>
private final DataType valueType;
private final int keysPerPage;
private final boolean singleWriter;
private final K keysBuffer[];
private final V valuesBuffer[];
private final K[] keysBuffer;
private final V[] valuesBuffer;
private final Object lock = new Object();
private volatile boolean notificationRequested;
/**
* Whether the map is closed. Volatile so we don't accidentally write to a
......@@ -619,16 +621,7 @@ public class MVMap<K, V> extends AbstractMap<K, V>
return 0;
}
assert p.getKeyCount() > 0;
@SuppressWarnings("unchecked")
K key = (K) p.getKey(0);
V value = get(key);
if (value != null) {
if (isClosed()) {
return 0;
}
replace(key, value, value);
}
return 1;
return rewritePage(p) ? 0 : 1;
}
int writtenPageCount = 0;
for (int i = 0; i < getChildPageCount(p); i++) {
......@@ -658,19 +651,26 @@ public class MVMap<K, V> extends AbstractMap<K, V>
while (!p2.isLeaf()) {
p2 = p2.getChildPage(0);
}
if (rewritePage(p2)) {
return 0;
}
writtenPageCount++;
}
}
return writtenPageCount;
}
private boolean rewritePage(Page p) {
@SuppressWarnings("unchecked")
K key = (K) p2.getKey(0);
K key = (K) p.getKey(0);
V value = get(key);
if (value != null) {
if (isClosed()) {
return 0;
return true;
}
replace(key, value, value);
}
writtenPageCount++;
}
}
return writtenPageCount;
return false;
}
/**
......@@ -779,17 +779,19 @@ public class MVMap<K, V> extends AbstractMap<K, V>
* @return the root page
*/
public final Page getRootPage() {
return getRoot().root;
return flushAndGetRoot().root;
}
public final RootReference getRoot() {
RootReference rootReference = getRootInternal();
return singleWriter && rootReference.getAppendCounter() > 0 ?
flushAppendBuffer(rootReference) : rootReference;
public RootReference getRoot() {
return root.get();
}
private RootReference getRootInternal() {
return root.get();
public RootReference flushAndGetRoot() {
RootReference rootReference = getRoot();
if (singleWriter && rootReference.getAppendCounter() > 0) {
return flushAppendBuffer(rootReference, false);
}
return rootReference;
}
final void setRoot(Page rootPage) {
......@@ -814,13 +816,14 @@ public class MVMap<K, V> extends AbstractMap<K, V>
*/
private RootReference setNewRoot(RootReference oldRoot, Page newRootPage,
int attemptUpdateCounter, boolean obeyLock) {
RootReference currentRoot = getRoot();
RootReference currentRoot = flushAndGetRoot();
assert newRootPage != null || currentRoot != null;
if (currentRoot != oldRoot && oldRoot != null) {
return null;
}
RootReference previous = currentRoot;
int appendCounter = 0;
long updateCounter = 1;
long newVersion = INITIAL_VERSION;
if(currentRoot != null) {
......@@ -834,12 +837,13 @@ public class MVMap<K, V> extends AbstractMap<K, V>
newVersion = currentRoot.version;
previous = currentRoot.previous;
appendCounter = currentRoot.getAppendCounter();
updateCounter += currentRoot.updateCounter;
attemptUpdateCounter += currentRoot.updateAttemptCounter;
}
RootReference updatedRootReference = new RootReference(newRootPage, newVersion, previous, updateCounter,
attemptUpdateCounter, false);
RootReference updatedRootReference = new RootReference(newRootPage, newVersion, previous, appendCounter,
updateCounter, attemptUpdateCounter);
boolean success = root.compareAndSet(currentRoot, updatedRootReference);
return success ? updatedRootReference : null;
}
......@@ -858,7 +862,7 @@ public class MVMap<K, V> extends AbstractMap<K, V>
void rollbackRoot(long version)
{
RootReference rootReference = getRoot();
RootReference rootReference = flushAndGetRoot();
RootReference previous;
while (rootReference.version >= version && (previous = rootReference.previous) != null) {
if (root.compareAndSet(rootReference, previous)) {
......@@ -1070,7 +1074,7 @@ public class MVMap<K, V> extends AbstractMap<K, V>
final boolean hasChangesSince(long version) {
RootReference rootReference = getRoot();
Page root = rootReference.root;
return !root.isSaved() && root.getTotalCount() > 0 ||
return !root.isSaved() && rootReference.getTotalCount() > 0 ||
getVersion(rootReference) > version;
}
......@@ -1123,7 +1127,7 @@ public class MVMap<K, V> extends AbstractMap<K, V>
final RootReference setWriteVersion(long writeVersion) {
int attempt = 0;
while(true) {
RootReference rootReference = getRoot();
RootReference rootReference = flushAndGetRoot();
if(rootReference.version >= writeVersion) {
return rootReference;
} else if (isClosed()) {
......@@ -1198,35 +1202,76 @@ public class MVMap<K, V> extends AbstractMap<K, V>
* If map was used in append mode, this method will ensure that append buffer
* is flushed - emptied with all entries inserted into map as a new leaf.
* @param rootReference current RootReference
* @param lockedForUpdate whether rootReference is pre-locked already and
* should stay locked upon return
* @return potentially updated RootReference
*/
private RootReference flushAppendBuffer(RootReference rootReference) {
private RootReference flushAppendBuffer(RootReference rootReference, boolean lockedForUpdate) {
IntValueHolder unsavedMemoryHolder = new IntValueHolder();
RootReference lockedRootReference = lockedForUpdate ? rootReference : null;
try {
int attempt = 0;
int keyCount;
while((keyCount = rootReference.getAppendCounter()) > 0) {
Page page = Page.createLeaf(this,
Arrays.copyOf(keysBuffer, keyCount),
Arrays.copyOf(valuesBuffer, keyCount),
0);
CursorPos pos = rootReference.root.getAppendCursorPos(null);
assert page.map == this;
while ((keyCount = rootReference.getAppendCounter()) > 0) {
if (lockedRootReference == null) {
lockedRootReference = tryLock(rootReference, ++attempt);
rootReference = lockedRootReference == null ? getRoot() : lockedRootReference;
continue;
}
Page rootPage = rootReference.root;
CursorPos pos = rootPage.getAppendCursorPos(null);
assert pos != null;
assert page.getKeyCount() > 0;
Object key = page.getKey(0);
assert pos.index < 0 : pos.index;
int index = -pos.index - 1;
assert index == pos.page.getKeyCount() : index + " != " + pos.page.getKeyCount();
Page p = pos.page;
pos = pos.parent;
CursorPos tip = pos;
int unsavedMemory = page.getMemory();
pos = pos.parent;
int remainingBuffer = 0;
Page page = null;
int available = store.getKeysPerPage() - p.getKeyCount();
if (available > 0) {
p = p.copy();
if (keyCount <= available) {
p.expand(keyCount, keysBuffer, valuesBuffer);
} else {
p.expand(available, keysBuffer, valuesBuffer);
keyCount -= available;
if (lockedForUpdate) {
System.arraycopy(keysBuffer, available, keysBuffer, 0, keyCount);
System.arraycopy(valuesBuffer, available, valuesBuffer, 0, keyCount);
remainingBuffer = keyCount;
} else {
Object[] keys = new Object[keyCount];
Object[] values = new Object[keyCount];
System.arraycopy(keysBuffer, available, keys, 0, keyCount);
System.arraycopy(valuesBuffer, available, values, 0, keyCount);
page = Page.createLeaf(this, keys, values, 0);
}
}
} else {
page = Page.createLeaf(this,
Arrays.copyOf(keysBuffer, keyCount),
Arrays.copyOf(valuesBuffer, keyCount),
0);
}
unsavedMemoryHolder.value = 0;
if (page != null) {
assert page.map == this;
assert page.getKeyCount() > 0;
Object key = page.getKey(0);
unsavedMemoryHolder.value += page.getMemory();
while (true) {
if (pos == null) {
if (p.getKeyCount() == 0) {
p = page;
} else {
Object keys[] = new Object[] { key };
Page.PageReference children[] = new Page.PageReference[] {
Object[] keys = new Object[]{key};
Page.PageReference[] children = new Page.PageReference[]{
new Page.PageReference(p),
new Page.PageReference(page)};
p = Page.createNode(this, keys, children, p.getTotalCount() + page.getTotalCount(), 0);
......@@ -1247,34 +1292,48 @@ public class MVMap<K, V> extends AbstractMap<K, V>
int at = keyCount - 2;
key = p.getKey(at);
page = p.split(at);
unsavedMemory += p.getMemory() + page.getMemory();
unsavedMemoryHolder.value += p.getMemory() + page.getMemory();
}
unsavedMemory += p.getMemory();
while (pos != null) {
Page c = p;
p = pos.page;
p = p.copy();
p.setChild(pos.index, c);
unsavedMemory += p.getMemory();
pos = pos.parent;
}
RootReference updatedRootReference = new RootReference(rootReference, p, ++attempt);
if(root.compareAndSet(rootReference, updatedRootReference)) {
p = replacePage(pos, p, unsavedMemoryHolder);
RootReference updatedRootReference = new RootReference(rootReference, p, remainingBuffer, lockedForUpdate);
if (root.compareAndSet(rootReference, updatedRootReference)) {
lockedRootReference = null;
while (tip != null) {
tip.page.removePage();
tip = tip.parent;
}
if (store.getFileStore() != null) {
store.registerUnsavedPage(unsavedMemory);
store.registerUnsavedPage(unsavedMemoryHolder.value);
}
assert updatedRootReference.getAppendCounter() == 0;
assert lockedForUpdate || updatedRootReference.getAppendCounter() == 0;
return updatedRootReference;
}
rootReference = getRootInternal();
rootReference = getRoot();
}
} finally {
if (lockedRootReference != null && !lockedForUpdate) {
assert rootReference.root == lockedRootReference.root;
rootReference = unlockRoot(lockedRootReference.root, lockedRootReference.appendCounter);
}
}
return rootReference;
}
private static Page replacePage(CursorPos path, Page replacement, IntValueHolder unsavedMemoryHolder) {
int unsavedMemory = replacement.getMemory();
while (path != null) {
Page child = replacement;
replacement = path.page.copy();
replacement.setChild(path.index, child);
unsavedMemory += replacement.getMemory();
path = path.parent;
}
unsavedMemoryHolder.value += unsavedMemory;
return replacement;
}
/**
* Appends entry to this map. this method is NOT thread safe and can not be used
* neither concurrently, nor in combination with any method that updates this map.
......@@ -1284,22 +1343,19 @@ public class MVMap<K, V> extends AbstractMap<K, V>
* @param value to be appended
*/
public void append(K key, V value) {
int attempt = 0;
boolean success = false;
while(!success) {
RootReference rootReference = getRootInternal();
RootReference rootReference = lockRoot(getRoot(), 1);
int appendCounter = rootReference.getAppendCounter();
try {
if (appendCounter >= keysPerPage) {
beforeWrite();
rootReference = flushAppendBuffer(rootReference);
rootReference = flushAppendBuffer(rootReference, true);
appendCounter = rootReference.getAppendCounter();
assert appendCounter < keysPerPage;
}
keysBuffer[appendCounter] = key;
valuesBuffer[appendCounter] = value;
RootReference updatedRootReference = new RootReference(rootReference, appendCounter + 1, ++attempt);
success = root.compareAndSet(rootReference, updatedRootReference);
++appendCounter;
} finally {
unlockRoot(rootReference.root, appendCounter);
}
}
......@@ -1309,24 +1365,25 @@ public class MVMap<K, V> extends AbstractMap<K, V>
* Non-updating method may be used concurrently, but latest removal may not be visible.
*/
public void trimLast() {
int attempt = 0;
boolean success;
do {
RootReference rootReference = getRootInternal();
RootReference rootReference = getRoot();
int appendCounter = rootReference.getAppendCounter();
if (appendCounter > 0) {
RootReference updatedRootReference = new RootReference(rootReference, appendCounter - 1, ++attempt);
success = root.compareAndSet(rootReference, updatedRootReference);
} else {
assert rootReference.root.getKeyCount() > 0;
boolean useRegularRemove = appendCounter == 0;
if (!useRegularRemove) {
rootReference = lockRoot(rootReference, 1);
appendCounter = rootReference.getAppendCounter();
useRegularRemove = appendCounter == 0;
if (!useRegularRemove) {
--appendCounter;
}
unlockRoot(rootReference.root, appendCounter);
}
if (useRegularRemove) {
Page lastLeaf = rootReference.root.getAppendCursorPos(null).page;
assert lastLeaf.isLeaf();
assert lastLeaf.getKeyCount() > 0;
Object key = lastLeaf.getKey(lastLeaf.getKeyCount() - 1);
success = remove(key) != null;
assert success;
remove(key);
}
} while(!success);
}
@Override
......@@ -1365,38 +1422,36 @@ public class MVMap<K, V> extends AbstractMap<K, V>
*/
public final byte appendCounter;
RootReference(Page root, long version, RootReference previous,
long updateCounter, long updateAttemptCounter,
boolean lockedForUpdate) {
RootReference(Page root, long version, RootReference previous, int appendCounter, long updateCounter, long updateAttemptCounter) {
this.root = root;
this.version = version;
this.previous = previous;
this.updateCounter = updateCounter;
this.updateAttemptCounter = updateAttemptCounter;
this.lockedForUpdate = lockedForUpdate;
this.appendCounter = 0;
this.lockedForUpdate = false;
this.appendCounter = (byte)appendCounter;
}
// This one is used for locking
RootReference(RootReference r) {
RootReference(RootReference r, int attempt) {
this.root = r.root;
this.version = r.version;
this.previous = r.previous;
this.updateCounter = r.updateCounter;
this.updateAttemptCounter = r.updateAttemptCounter;
this.updateCounter = r.updateCounter + 1;
this.updateAttemptCounter = r.updateAttemptCounter + attempt;
this.lockedForUpdate = true;
this.appendCounter = 0;
this.appendCounter = r.appendCounter;
}
// This one is used for unlocking
RootReference(RootReference r, Page root, int attempt) {
RootReference(RootReference r, Page root, int appendCounter, boolean lockedForUpdate) {
this.root = root;
this.version = r.version;
this.previous = r.previous;
this.updateCounter = r.updateCounter + 1;
this.updateAttemptCounter = r.updateAttemptCounter + attempt;
this.lockedForUpdate = false;
this.appendCounter = 0;
this.updateCounter = r.updateCounter;
this.updateAttemptCounter = r.updateAttemptCounter;
this.lockedForUpdate = lockedForUpdate;
this.appendCounter = (byte)appendCounter;
}
// This one is used for version change
......@@ -1426,21 +1481,14 @@ public class MVMap<K, V> extends AbstractMap<K, V>
this.appendCounter = 0;
}
// This one is used for append buffer maintenance
RootReference(RootReference r, int appendCounter, int attempt) {
this.root = r.root;
this.version = r.version;
this.previous = r.previous;
this.updateCounter = r.updateCounter + 1;
this.updateAttemptCounter = r.updateAttemptCounter + attempt;
this.lockedForUpdate = r.lockedForUpdate;
this.appendCounter = (byte)appendCounter;
}
public int getAppendCounter() {
return appendCounter & 0xff;
}
public long getTotalCount() {
return root.getTotalCount() + getAppendCounter();
}
@Override
public String toString() {
return "RootReference("+ System.identityHashCode(root)+","+version+","+ lockedForUpdate +")";
......@@ -1707,36 +1755,32 @@ public class MVMap<K, V> extends AbstractMap<K, V>
public void reset() {}
}
@SuppressWarnings("unchecked")
public V operate(K key, V value, DecisionMaker<? super V> decisionMaker) {
beforeWrite();
IntValueHolder unsavedMemoryHolder = new IntValueHolder();
int attempt = 0;
RootReference oldRootReference = null;
while(true) {
RootReference rootReference = getRoot();
int contention = 0;
if (oldRootReference != null) {
long updateAttemptCounter = rootReference.updateAttemptCounter -
oldRootReference.updateAttemptCounter;
assert updateAttemptCounter >= 0 : updateAttemptCounter;
long updateCounter = rootReference.updateCounter - oldRootReference.updateCounter;
assert updateCounter >= 0 : updateCounter;
assert updateAttemptCounter >= updateCounter : updateAttemptCounter + " >= " + updateCounter;
contention = (int)((updateAttemptCounter+1) / (updateCounter+1));
RootReference rootReference = flushAndGetRoot();
RootReference lockedRootReference = null;
if ((++attempt > 3 || rootReference.lockedForUpdate)) {
lockedRootReference = lockRoot(rootReference, attempt);
rootReference = lockedRootReference;
}
oldRootReference = rootReference;
++attempt;
CursorPos pos = traverseDown(rootReference.root, key);
Page rootPage = rootReference.root;
int appendCounter = rootReference.getAppendCounter();
CursorPos tip;
V result;
unsavedMemoryHolder.value = 0;
try {
CursorPos pos = traverseDown(rootPage, key);
Page p = pos.page;
int index = pos.index;
CursorPos tip = pos;
tip = pos;
pos = pos.parent;
@SuppressWarnings("unchecked")
V result = index < 0 ? null : (V)p.getValue(index);
result = index < 0 ? null : (V)p.getValue(index);
Decision decision = decisionMaker.decide(result, value);
int unsavedMemory = 0;
boolean needUnlock = false;
try {
switch (decision) {
case REPEAT:
decisionMaker.reset();
......@@ -1755,10 +1799,7 @@ public class MVMap<K, V> extends AbstractMap<K, V>
}
return null;
}
if (attempt > 2 && !(needUnlock = lockRoot(decisionMaker, rootReference,
attempt, contention))) {
continue;
}
if (p.getTotalCount() == 1 && pos != null) {
p = pos.page;
index = pos.index;
......@@ -1775,10 +1816,6 @@ public class MVMap<K, V> extends AbstractMap<K, V>
break;
}
case PUT: {
if (attempt > 2 && !(needUnlock = lockRoot(decisionMaker, rootReference,
attempt, contention))) {
continue;
}
value = decisionMaker.selectValue(result, value);
p = p.copy();
if (index < 0) {
......@@ -1791,11 +1828,10 @@ public class MVMap<K, V> extends AbstractMap<K, V>
int at = keyCount >> 1;
Object k = p.getKey(at);
Page split = p.split(at);
unsavedMemory += p.getMemory();
unsavedMemory += split.getMemory();
unsavedMemoryHolder.value += p.getMemory() + split.getMemory();
if (pos == null) {
Object keys[] = { k };
Page.PageReference children[] = {
Object[] keys = { k };
Page.PageReference[] children = {
new Page.PageReference(p),
new Page.PageReference(split)
};
......@@ -1816,70 +1852,97 @@ public class MVMap<K, V> extends AbstractMap<K, V>
break;
}
}
unsavedMemory += p.getMemory();
while (pos != null) {
Page c = p;
p = pos.page;
p = p.copy();
p.setChild(pos.index, c);
unsavedMemory += p.getMemory();
pos = pos.parent;
}
if(needUnlock) {
unlockRoot(p, attempt);
needUnlock = false;
} else if(!updateRoot(rootReference, p, attempt)) {
p = replacePage(pos, p, unsavedMemoryHolder);
rootPage = p;
if(lockedRootReference == null && !updateRoot(rootReference, p, attempt)) {
decisionMaker.reset();
continue;
}
} finally {
if(lockedRootReference != null) {
unlockRoot(rootPage, appendCounter);
}
}
while (tip != null) {
tip.page.removePage();
tip = tip.parent;
}
if (store.getFileStore() != null) {
store.registerUnsavedPage(unsavedMemory);
store.registerUnsavedPage(unsavedMemoryHolder.value);
}
return result;
} finally {
if(needUnlock) {
unlockRoot(rootReference.root, attempt);
}
}
private RootReference lockRoot(RootReference rootReference, int attempt) {
while(true) {
RootReference lockedRootReference = tryLock(rootReference, attempt++);
if (lockedRootReference != null) {
return lockedRootReference;
}
rootReference = getRoot();
}
}
private boolean lockRoot(DecisionMaker<? super V> decisionMaker, RootReference rootReference,
int attempt, int contention) {
boolean success = lockRoot(rootReference);
if (!success) {
decisionMaker.reset();
private RootReference tryLock(RootReference rootReference, int attempt) {
if (!rootReference.lockedForUpdate) {
RootReference lockedRootReference = new RootReference(rootReference, attempt);
if (root.compareAndSet(rootReference, lockedRootReference)) {
return lockedRootReference;
}
}
RootReference oldRootReference = rootReference.previous;
int contention = 1;
if (oldRootReference != null) {
long updateAttemptCounter = rootReference.updateAttemptCounter -
oldRootReference.updateAttemptCounter;
assert updateAttemptCounter >= 0 : updateAttemptCounter;
long updateCounter = rootReference.updateCounter - oldRootReference.updateCounter;
assert updateCounter >= 0 : updateCounter;
assert updateAttemptCounter >= updateCounter : updateAttemptCounter + " >= " + updateCounter;
contention += (int)((updateAttemptCounter+1) / (updateCounter+1));
}
if(attempt > 4) {
if (attempt <= 24) {
if (attempt <= 12) {
Thread.yield();
} else {
} else if (attempt <= 24) {
try {
Thread.sleep(0, 100 / contention + 50);
Thread.sleep(0, 10 * contention + 5);
} catch (InterruptedException ex) {
throw new RuntimeException(ex);
}
} else {
synchronized (lock) {
notificationRequested = true;
try {
lock.wait(100);
} catch (InterruptedException ignore) {
}
}
}
return success;
}
private boolean lockRoot(RootReference rootReference) {
return !rootReference.lockedForUpdate
&& root.compareAndSet(rootReference, new RootReference(rootReference));
return null;
}
private void unlockRoot(Page newRoot, int attempt) {
private RootReference unlockRoot(Page newRoot, int appendCounter) {
RootReference updatedRootReference;
boolean success;
do {
RootReference rootReference = getRoot();
RootReference updatedRootReference = new RootReference(rootReference, newRoot, attempt);
assert rootReference.lockedForUpdate;
updatedRootReference = new RootReference(rootReference, newRoot, appendCounter, false);
success = root.compareAndSet(rootReference, updatedRootReference);
} while(!success);
if (notificationRequested) {
synchronized (lock) {
notificationRequested = false;
lock.notifyAll();;
}
}
return updatedRootReference;
}
private static CursorPos traverseDown(Page p, Object key) {
......@@ -1929,4 +1992,9 @@ public class MVMap<K, V> extends AbstractMap<K, V>
}
}
private static final class IntValueHolder {
int value;
IntValueHolder() {}
}
}
......@@ -1401,7 +1401,7 @@ public class MVStore implements AutoCloseable {
try {
ChunkIdsCollector collector = new ChunkIdsCollector(meta.getId());
long oldestVersionToKeep = getOldestVersionToKeep();
MVMap.RootReference rootReference = meta.getRoot();
MVMap.RootReference rootReference = meta.flushAndGetRoot();
if (fast) {
MVMap.RootReference previous;
while (rootReference.version >= oldestVersionToKeep && (previous = rootReference.previous) != null) {
......
......@@ -493,6 +493,16 @@ public abstract class Page implements Cloneable
return bKeys;
}
abstract void expand(int extraKeyCount, Object[] extraKeys, Object[] extraValues);
final void expandKeys(int extraKeyCount, Object[] extraKeys) {
int keyCount = getKeyCount();
Object[] newKeys = createKeyStorage(keyCount + extraKeyCount);
System.arraycopy(keys, 0, newKeys, 0, keyCount);
System.arraycopy(extraKeys, 0, newKeys, keyCount, extraKeyCount);
keys = newKeys;
}
/**
* Get the total number of key-value pairs, including child pages.
*
......@@ -1012,6 +1022,11 @@ public abstract class Page implements Cloneable
return newPage;
}
@Override
public void expand(int keyCount, Object[] extraKys, Object[] extraValues) {
throw new UnsupportedOperationException();
}
@Override
public long getTotalCount() {
assert !isComplete() || totalCount == calculateTotalCount() :
......@@ -1324,6 +1339,21 @@ public abstract class Page implements Cloneable
return newPage;
}
@Override
public void expand(int extraKeyCount, Object[] extraKeys, Object[] extraValues) {
int keyCount = getKeyCount();
expandKeys(extraKeyCount, extraKeys);
if(values != null) {
Object[] newValues = createValueStorage(keyCount + extraKeyCount);
System.arraycopy(values, 0, newValues, 0, keyCount);
System.arraycopy(extraValues, 0, newValues, keyCount, extraKeyCount);
values = newValues;
}
if(isPersistent()) {
recalculateMemory();
}
}
@Override
public long getTotalCount() {
return getKeyCount();
......
......@@ -134,7 +134,7 @@ public final class MVRTreeMap<V> extends MVMap<SpatialKey, V> {
int attempt = 0;
while(true) {
++attempt;
RootReference rootReference = getRoot();
RootReference rootReference = flushAndGetRoot();
Page p = rootReference.root.copy(true);
V result = operate(p, key, value, decisionMaker);
if (!p.isLeaf() && p.getTotalCount() == 0) {
......
......@@ -61,7 +61,7 @@ public class Transaction {
*/
private static final int STATUS_ROLLED_BACK = 5;
private static final String STATUS_NAMES[] = {
private static final String[] STATUS_NAMES = {
"CLOSED", "OPEN", "PREPARED", "COMMITTED", "ROLLING_BACK", "ROLLED_BACK"
};
static final int LOG_ID_BITS = 40;
......@@ -91,7 +91,7 @@ public class Transaction {
/**
* This is really a transaction identity, because it's not re-used.
*/
public final long sequenceNum;
final long sequenceNum;
/*
* Transaction state is an atomic composite field:
......@@ -138,12 +138,17 @@ public class Transaction {
/**
* Map on which this transaction is blocked.
*/
MVMap<?,VersionedValue> blockingMap;
private MVMap<?,VersionedValue> blockingMap;
/**
* Key in blockingMap on which this transaction is blocked.
*/
Object blockingKey;
private Object blockingKey;
/**
* Whether other transaction(s) are waiting for this to close.
*/
private volatile boolean notificationRequested;
Transaction(TransactionStore store, int transactionId, long sequenceNum, int status,
......@@ -237,7 +242,8 @@ public class Transaction {
}
public int getBlockerId() {
return blockingTransaction == null ? 0 : blockingTransaction.ownerId;
Transaction blocker = this.blockingTransaction;
return blocker == null ? 0 : blocker.ownerId;
}
/**
......@@ -390,21 +396,27 @@ public class Transaction {
public void rollbackToSavepoint(long savepointId) {
long lastState = setStatus(STATUS_ROLLING_BACK);
long logId = getLogId(lastState);
boolean success;
try {
store.rollbackTo(this, logId, savepointId);
} finally {
if (notificationRequested) {
notifyAllWaitingTransactions();
}
long expectedState = composeState(STATUS_ROLLING_BACK, logId, hasRollback(lastState));
long newState = composeState(STATUS_OPEN, savepointId, true);
if (!statusAndLogId.compareAndSet(expectedState, newState)) {
do {
success = statusAndLogId.compareAndSet(expectedState, newState);
} while (!success && statusAndLogId.get() == expectedState);
}
// this is moved outside of finally block to avert masking original exception, if any
if (!success) {
throw DataUtils.newIllegalStateException(
DataUtils.ERROR_TRANSACTION_ILLEGAL_STATE,
"Transaction {0} concurrently modified " +
"while rollback to savepoint was in progress",
"Transaction {0} concurrently modified while rollback to savepoint was in progress",
transactionId);
}
}
}
/**
* Roll the transaction back. Afterwards, this transaction is closed.
......@@ -474,7 +486,7 @@ public class Transaction {
void closeIt() {
long lastState = setStatus(STATUS_CLOSED);
store.store.deregisterVersionUsage(txCounter);
if(hasChanges(lastState) || hasRollback(lastState)) {
if((hasChanges(lastState) || hasRollback(lastState)) && notificationRequested) {
notifyAllWaitingTransactions();
}
}
......@@ -483,7 +495,10 @@ public class Transaction {
notifyAll();
}
public boolean waitFor(Transaction toWaitFor) {
public boolean waitFor(Transaction toWaitFor, MVMap<?,VersionedValue> map, Object key) {
blockingTransaction = toWaitFor;
blockingMap = map;
blockingKey = key;
if (isDeadlocked(toWaitFor)) {
StringBuilder details = new StringBuilder(
String.format("Transaction %d has been chosen as a deadlock victim. Details:%n", transactionId));
......@@ -504,7 +519,6 @@ public class Transaction {
}
}
blockingTransaction = toWaitFor;
try {
return toWaitFor.waitForThisToEnd(timeoutMillis);
} finally {
......@@ -527,6 +541,7 @@ public class Transaction {
private synchronized boolean waitForThisToEnd(int millis) {
long until = System.currentTimeMillis() + millis;
notificationRequested = true;
long state;
int status;
while((status = getStatus(state = statusAndLogId.get())) != STATUS_CLOSED
......@@ -557,8 +572,15 @@ public class Transaction {
@Override
public String toString() {
long state = statusAndLogId.get();
return transactionId + "(" + sequenceNum + ") " + STATUS_NAMES[getStatus(state)] + " " + getLogId(state);
return transactionId + "(" + sequenceNum + ") " + stateToString();
}
private String stateToString() {
return stateToString(statusAndLogId.get());
}
private static String stateToString(long state) {
return STATUS_NAMES[getStatus(state)] + (hasRollback(state) ? "<" : "") + " " + getLogId(state);
}
......
......@@ -46,7 +46,7 @@ public class TransactionMap<K, V> extends AbstractMap<K, V> {
/**
* The transaction which is used for this map.
*/
final Transaction transaction;
private final Transaction transaction;
TransactionMap(Transaction transaction, MVMap<K, VersionedValue> map) {
this.transaction = transaction;
......@@ -105,16 +105,16 @@ public class TransactionMap<K, V> extends AbstractMap<K, V> {
long undoLogSize;
do {
committingTransactions = store.committingTransactions.get();
mapRootReference = map.getRoot();
mapRootReference = map.flushAndGetRoot();
BitSet opentransactions = store.openTransactions.get();
undoLogRootReferences = new MVMap.RootReference[opentransactions.length()];
undoLogSize = 0;
for (int i = opentransactions.nextSetBit(0); i >= 0; i = opentransactions.nextSetBit(i+1)) {
MVMap<Long, Object[]> undoLog = store.undoLogs[i];
if (undoLog != null) {
MVMap.RootReference rootReference = undoLog.getRoot();
MVMap.RootReference rootReference = undoLog.flushAndGetRoot();
undoLogRootReferences[i] = rootReference;
undoLogSize += rootReference.root.getTotalCount() + rootReference.getAppendCounter();
undoLogSize += rootReference.getTotalCount();
}
}
} while(committingTransactions != store.committingTransactions.get() ||
......@@ -125,7 +125,7 @@ public class TransactionMap<K, V> extends AbstractMap<K, V> {
// should be considered as committed.
// Subsequent processing uses this snapshot info only.
Page mapRootPage = mapRootReference.root;
long size = mapRootPage.getTotalCount();
long size = mapRootReference.getTotalCount();
// if we are looking at the map without any uncommitted values
if (undoLogSize == 0) {
return size;
......@@ -241,6 +241,16 @@ public class TransactionMap<K, V> extends AbstractMap<K, V> {
return set(key, decisionMaker);
}
/**
* Appends entry to uderlying map. This method may be used concurrently,
* but latest appended values are not guaranteed to be visible.
* @param key should be higher in map's order than any existing key
* @param value to be appended
*/
public void append(K key, V value) {
map.append(key, VersionedValueUncommitted.getInstance(transaction.log(map.getId(), key, null), value, null));
}
/**
* Lock row for the given key.
* <p>
......@@ -298,16 +308,12 @@ public class TransactionMap<K, V> extends AbstractMap<K, V> {
assert decision != MVMap.Decision.REPEAT;
blockingTransaction = decisionMaker.getBlockingTransaction();
if (decision != MVMap.Decision.ABORT || blockingTransaction == null) {
transaction.blockingMap = null;
transaction.blockingKey = null;
@SuppressWarnings("unchecked")
V res = result == null ? null : (V) result.getCurrentValue();
return res;
}
decisionMaker.reset();
transaction.blockingMap = map;
transaction.blockingKey = key;
} while (blockingTransaction.sequenceNum > sequenceNumWhenStarted || transaction.waitFor(blockingTransaction));
} while (blockingTransaction.sequenceNum > sequenceNumWhenStarted || transaction.waitFor(blockingTransaction, map, key));
throw DataUtils.newIllegalStateException(DataUtils.ERROR_TRANSACTION_LOCKED,
"Map entry <{0}> with key <{1}> and value {2} is locked by tx {3} and can not be updated by tx {4}"
......@@ -669,8 +675,8 @@ public class TransactionMap<K, V> extends AbstractMap<K, V> {
private final boolean includeAllUncommitted;
private X current;
protected TMIterator(TransactionMap<K,?> transactionMap, K from, K to, boolean includeAllUncommitted) {
Transaction transaction = transactionMap.transaction;
TMIterator(TransactionMap<K,?> transactionMap, K from, K to, boolean includeAllUncommitted) {
Transaction transaction = transactionMap.getTransaction();
this.transactionId = transaction.transactionId;
TransactionStore store = transaction.store;
MVMap<K, VersionedValue> map = transactionMap.map;
......@@ -683,7 +689,7 @@ public class TransactionMap<K, V> extends AbstractMap<K, V> {
MVMap.RootReference mapRootReference;
do {
committingTransactions = store.committingTransactions.get();
mapRootReference = map.getRoot();
mapRootReference = map.flushAndGetRoot();
} while (committingTransactions != store.committingTransactions.get());
// Now we have a snapshot, where mapRootReference points to state of the map
// and committingTransactions mask tells us which of seemingly uncommitted changes
......
......@@ -390,7 +390,7 @@ public class TransactionStore {
*/
long addUndoLogRecord(int transactionId, long logId, Object[] undoLogRecord) {
MVMap<Long, Object[]> undoLog = undoLogs[transactionId];
Long undoKey = getOperationId(transactionId, logId);
long undoKey = getOperationId(transactionId, logId);
if (logId == 0 && !undoLog.isEmpty()) {
throw DataUtils.newIllegalStateException(
DataUtils.ERROR_TOO_MANY_OPEN_TRANSACTIONS,
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论