提交 9447abe0 authored 作者: andrei's avatar andrei

Look Ma, no more locks!

上级 05ad3781
...@@ -10,9 +10,9 @@ import org.h2.mvstore.DataUtils; ...@@ -10,9 +10,9 @@ import org.h2.mvstore.DataUtils;
import org.h2.mvstore.MVMap; import org.h2.mvstore.MVMap;
import org.h2.mvstore.Page; import org.h2.mvstore.Page;
import org.h2.mvstore.type.DataType; import org.h2.mvstore.type.DataType;
import java.util.BitSet;
import java.util.AbstractMap; import java.util.AbstractMap;
import java.util.BitSet;
import java.util.Iterator; import java.util.Iterator;
import java.util.Map; import java.util.Map;
...@@ -99,77 +99,72 @@ public class TransactionMap<K, V> { ...@@ -99,77 +99,72 @@ public class TransactionMap<K, V> {
*/ */
public long sizeAsLong() { public long sizeAsLong() {
TransactionStore store = transaction.store; TransactionStore store = transaction.store;
store.rwLock.readLock().lock(); MVMap<Long, Object[]> undo = transaction.store.undoLog;
try {
MVMap<Long, Object[]> undo = transaction.store.undoLog;
BitSet committingTransactions;
MVMap.RootReference mapRootReference;
MVMap.RootReference undoLogRootReference;
do {
committingTransactions = store.committingTransactions.get();
mapRootReference = map.getRoot();
undoLogRootReference = store.undoLog.getRoot();
} while(committingTransactions != store.committingTransactions.get() ||
mapRootReference != map.getRoot());
Page undoRootPage = undoLogRootReference.root; BitSet committingTransactions;
long undoLogSize = undoRootPage.getTotalCount(); MVMap.RootReference mapRootReference;
Page mapRootPage = mapRootReference.root; MVMap.RootReference undoLogRootReference;
long size = mapRootPage.getTotalCount(); do {
if (undoLogSize == 0) { committingTransactions = store.committingTransactions.get();
return size; mapRootReference = map.getRoot();
} undoLogRootReference = store.undoLog.getRoot();
if (undoLogSize > size) { } while(committingTransactions != store.committingTransactions.get() ||
// the undo log is larger than the map - mapRootReference != map.getRoot());
// count the entries of the map
size = 0; Page undoRootPage = undoLogRootReference.root;
Cursor<K, VersionedValue> cursor = map.cursor(null); long undoLogSize = undoRootPage.getTotalCount();
while (cursor.hasNext()) { Page mapRootPage = mapRootReference.root;
K key = cursor.next(); long size = mapRootPage.getTotalCount();
VersionedValue data = cursor.getValue(); if (undoLogSize == 0) {
data = getValue(mapRootPage, undoRootPage, key, readLogId, data, committingTransactions); return size;
if (data != null && data.value != null) { }
size++; if (undoLogSize > size) {
} // the undo log is larger than the map -
// count the entries of the map
size = 0;
Cursor<K, VersionedValue> cursor = map.cursor(null);
while (cursor.hasNext()) {
K key = cursor.next();
VersionedValue data = cursor.getValue();
data = getValue(mapRootPage, undoRootPage, key, readLogId, data, committingTransactions);
if (data != null && data.value != null) {
size++;
} }
return size;
} }
// the undo log is smaller than the map - return size;
// scan the undo log and subtract invisible entries }
MVMap<Object, Integer> temp = transaction.store // the undo log is smaller than the map -
.createTempMap(); // scan the undo log and subtract invisible entries
try { MVMap<Object, Integer> temp = transaction.store
Cursor cursor = new Cursor<Long, Object[]>(undoRootPage, null); .createTempMap();
while (cursor.hasNext()) { try {
cursor.next(); Cursor cursor = new Cursor<Long, Object[]>(undoRootPage, null);
Object[] op = (Object[]) cursor.getValue(); while (cursor.hasNext()) {
int m = (Integer) op[0]; cursor.next();
if (m != mapId) { Object[] op = (Object[]) cursor.getValue();
// a different map - ignore int m = (Integer) op[0];
continue; if (m != mapId) {
} // a different map - ignore
@SuppressWarnings("unchecked") continue;
K key = (K) op[1]; }
VersionedValue data = map.get(mapRootPage, key); @SuppressWarnings("unchecked")
data = getValue(mapRootPage, undoRootPage, key, readLogId, data, committingTransactions); K key = (K) op[1];
if (data == null || data.value == null) { VersionedValue data = map.get(mapRootPage, key);
Integer old = temp.put(key, 1); data = getValue(mapRootPage, undoRootPage, key, readLogId, data, committingTransactions);
// count each key only once (there might be if (data == null || data.value == null) {
// multiple Integer old = temp.put(key, 1);
// changes for the same key) // count each key only once (there might be
if (old == null) { // multiple
size--; // changes for the same key)
} if (old == null) {
size--;
} }
} }
} finally {
transaction.store.store.removeMap(temp);
} }
return size;
} finally { } finally {
transaction.store.rwLock.readLock().unlock(); transaction.store.store.removeMap(temp);
} }
return size;
} }
/** /**
...@@ -420,25 +415,20 @@ public class TransactionMap<K, V> { ...@@ -420,25 +415,20 @@ public class TransactionMap<K, V> {
private VersionedValue getValue(K key, long maxLog) { private VersionedValue getValue(K key, long maxLog) {
TransactionStore store = transaction.store; TransactionStore store = transaction.store;
store.rwLock.readLock().lock(); BitSet committingTransactions;
try { MVMap.RootReference mapRootReference;
BitSet committingTransactions; MVMap.RootReference undoLogRootReference;
MVMap.RootReference mapRootReference; do {
MVMap.RootReference undoLogRootReference; committingTransactions = store.committingTransactions.get();
do { mapRootReference = map.getRoot();
committingTransactions = store.committingTransactions.get(); undoLogRootReference = store.undoLog.getRoot();
mapRootReference = map.getRoot(); } while(committingTransactions != store.committingTransactions.get() ||
undoLogRootReference = store.undoLog.getRoot(); mapRootReference != map.getRoot());
} while(committingTransactions != store.committingTransactions.get() ||
mapRootReference != map.getRoot()); Page undoRootPage = undoLogRootReference.root;
Page mapRootPage = mapRootReference.root;
Page undoRootPage = undoLogRootReference.root; VersionedValue data = map.get(mapRootPage, key);
Page mapRootPage = mapRootReference.root; return getValue(mapRootPage, undoRootPage, key, maxLog, data, store.committingTransactions.get());
VersionedValue data = map.get(mapRootPage, key);
return getValue(mapRootPage, undoRootPage, key, maxLog, data, store.committingTransactions.get());
} finally {
store.rwLock.readLock().unlock();
}
} }
/** /**
......
...@@ -13,7 +13,6 @@ import java.util.Iterator; ...@@ -13,7 +13,6 @@ import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.atomic.AtomicReferenceArray; import java.util.concurrent.atomic.AtomicReferenceArray;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.h2.mvstore.DataUtils; import org.h2.mvstore.DataUtils;
import org.h2.mvstore.MVMap; import org.h2.mvstore.MVMap;
import org.h2.mvstore.MVStore; import org.h2.mvstore.MVStore;
...@@ -54,12 +53,6 @@ public class TransactionStore { ...@@ -54,12 +53,6 @@ public class TransactionStore {
*/ */
final MVMap<Long, Object[]> undoLog; final MVMap<Long, Object[]> undoLog;
/**
* the reader/writer lock for the undo-log. Allows us to process multiple
* selects in parallel.
*/
final ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock();
/** /**
* The map of maps. * The map of maps.
*/ */
...@@ -169,39 +162,34 @@ public class TransactionStore { ...@@ -169,39 +162,34 @@ public class TransactionStore {
store.removeMap(temp); store.removeMap(temp);
} }
} }
rwLock.writeLock().lock(); if (!undoLog.isEmpty()) {
try { Long key = undoLog.firstKey();
if (!undoLog.isEmpty()) { while (key != null) {
Long key = undoLog.firstKey(); int transactionId = getTransactionId(key);
while (key != null) { if (!openTransactions.get().get(transactionId)) {
int transactionId = getTransactionId(key); Object[] data = preparedTransactions.get(transactionId);
if (!openTransactions.get().get(transactionId)) { int status;
Object[] data = preparedTransactions.get(transactionId); String name;
int status; if (data == null) {
String name; if (undoLog.containsKey(getOperationId(transactionId, 0))) {
if (data == null) { status = Transaction.STATUS_OPEN;
if (undoLog.containsKey(getOperationId(transactionId, 0))) {
status = Transaction.STATUS_OPEN;
} else {
status = Transaction.STATUS_COMMITTING;
}
name = null;
} else { } else {
status = (Integer) data[0]; status = Transaction.STATUS_COMMITTING;
name = (String) data[1];
} }
long nextTxUndoKey = getOperationId(transactionId + 1, 0); name = null;
Long lastUndoKey = undoLog.lowerKey(nextTxUndoKey); } else {
assert lastUndoKey != null; status = (Integer) data[0];
assert getTransactionId(lastUndoKey) == transactionId; name = (String) data[1];
long logId = getLogId(lastUndoKey) + 1;
registerTransaction(transactionId, status, name, logId, timeoutMillis, 0, listener);
key = undoLog.ceilingKey(nextTxUndoKey);
} }
long nextTxUndoKey = getOperationId(transactionId + 1, 0);
Long lastUndoKey = undoLog.lowerKey(nextTxUndoKey);
assert lastUndoKey != null;
assert getTransactionId(lastUndoKey) == transactionId;
long logId = getLogId(lastUndoKey) + 1;
registerTransaction(transactionId, status, name, logId, timeoutMillis, 0, listener);
key = undoLog.ceilingKey(nextTxUndoKey);
} }
} }
} finally {
rwLock.writeLock().unlock();
} }
init = true; init = true;
} }
...@@ -277,23 +265,18 @@ public class TransactionStore { ...@@ -277,23 +265,18 @@ public class TransactionStore {
if(!init) { if(!init) {
init(); init();
} }
rwLock.readLock().lock(); ArrayList<Transaction> list = new ArrayList<>();
try { int transactionId = 0;
ArrayList<Transaction> list = new ArrayList<>(); BitSet bitSet = openTransactions.get();
int transactionId = 0; while((transactionId = bitSet.nextSetBit(transactionId + 1)) > 0) {
BitSet bitSet = openTransactions.get(); Transaction transaction = getTransaction(transactionId);
while((transactionId = bitSet.nextSetBit(transactionId + 1)) > 0) { if(transaction != null) {
Transaction transaction = getTransaction(transactionId); if(transaction.getStatus() != Transaction.STATUS_CLOSED) {
if(transaction != null) { list.add(transaction);
if(transaction.getStatus() != Transaction.STATUS_CLOSED) {
list.add(transaction);
}
} }
} }
return list;
} finally {
rwLock.readLock().unlock();
} }
return list;
} }
/** /**
...@@ -387,21 +370,16 @@ public class TransactionStore { ...@@ -387,21 +370,16 @@ public class TransactionStore {
*/ */
long addUndoLogRecord(int transactionId, long logId, Object[] undoLogRecord) { long addUndoLogRecord(int transactionId, long logId, Object[] undoLogRecord) {
Long undoKey = getOperationId(transactionId, logId); Long undoKey = getOperationId(transactionId, logId);
rwLock.writeLock().lock(); if (logId == 0) {
try { if (undoLog.containsKey(undoKey)) {
if (logId == 0) { throw DataUtils.newIllegalStateException(
if (undoLog.containsKey(undoKey)) { DataUtils.ERROR_TOO_MANY_OPEN_TRANSACTIONS,
throw DataUtils.newIllegalStateException( "An old transaction with the same id " +
DataUtils.ERROR_TOO_MANY_OPEN_TRANSACTIONS, "is still open: {0}",
"An old transaction with the same id " + transactionId);
"is still open: {0}",
transactionId);
}
} }
undoLog.put(undoKey, undoLogRecord);
} finally {
rwLock.writeLock().unlock();
} }
undoLog.put(undoKey, undoLogRecord);
return undoKey; return undoKey;
} }
...@@ -413,17 +391,12 @@ public class TransactionStore { ...@@ -413,17 +391,12 @@ public class TransactionStore {
*/ */
public void removeUndoLogRecord(int transactionId, long logId) { public void removeUndoLogRecord(int transactionId, long logId) {
Long undoKey = getOperationId(transactionId, logId); Long undoKey = getOperationId(transactionId, logId);
rwLock.writeLock().lock(); Object[] old = undoLog.remove(undoKey);
try { if (old == null) {
Object[] old = undoLog.remove(undoKey); throw DataUtils.newIllegalStateException(
if (old == null) { DataUtils.ERROR_TRANSACTION_ILLEGAL_STATE,
throw DataUtils.newIllegalStateException( "Transaction {0} was concurrently rolled back",
DataUtils.ERROR_TRANSACTION_ILLEGAL_STATE, transactionId);
"Transaction {0} was concurrently rolled back",
transactionId);
}
} finally {
rwLock.writeLock().unlock();
} }
} }
...@@ -455,8 +428,6 @@ public class TransactionStore { ...@@ -455,8 +428,6 @@ public class TransactionStore {
flipCommittingTransactionsBit(transactionId, true); flipCommittingTransactionsBit(transactionId, true);
CommitDecisionMaker commitDecisionMaker = new CommitDecisionMaker(); CommitDecisionMaker commitDecisionMaker = new CommitDecisionMaker();
// TODO could synchronize on blocks (100 at a time or so)
rwLock.writeLock().lock();
try { try {
for (long logId = 0; logId < maxLogId; logId++) { for (long logId = 0; logId < maxLogId; logId++) {
Long undoKey = getOperationId(transactionId, logId); Long undoKey = getOperationId(transactionId, logId);
...@@ -481,7 +452,6 @@ public class TransactionStore { ...@@ -481,7 +452,6 @@ public class TransactionStore {
undoLog.remove(undoKey); undoLog.remove(undoKey);
} }
} finally { } finally {
rwLock.writeLock().unlock();
flipCommittingTransactionsBit(transactionId, false); flipCommittingTransactionsBit(transactionId, false);
} }
} }
...@@ -635,18 +605,12 @@ public class TransactionStore { ...@@ -635,18 +605,12 @@ public class TransactionStore {
* @param toLogId the log id to roll back to * @param toLogId the log id to roll back to
*/ */
void rollbackTo(Transaction t, long maxLogId, long toLogId) { void rollbackTo(Transaction t, long maxLogId, long toLogId) {
// TODO could synchronize on blocks (100 at a time or so) int transactionId = t.getId();
rwLock.writeLock().lock(); RollbackDecisionMaker decisionMaker = new RollbackDecisionMaker(this, transactionId, toLogId, t.listener);
try { for (long logId = maxLogId - 1; logId >= toLogId; logId--) {
int transactionId = t.getId(); Long undoKey = getOperationId(transactionId, logId);
RollbackDecisionMaker decisionMaker = new RollbackDecisionMaker(this, transactionId, toLogId, t.listener); undoLog.operate(undoKey, null, decisionMaker);
for (long logId = maxLogId - 1; logId >= toLogId; logId--) { decisionMaker.reset();
Long undoKey = getOperationId(transactionId, logId);
undoLog.operate(undoKey, null, decisionMaker);
decisionMaker.reset();
}
} finally {
rwLock.writeLock().unlock();
} }
} }
...@@ -667,33 +631,28 @@ public class TransactionStore { ...@@ -667,33 +631,28 @@ public class TransactionStore {
private Change current; private Change current;
private void fetchNext() { private void fetchNext() {
rwLock.writeLock().lock(); int transactionId = t.getId();
try { while (logId >= toLogId) {
int transactionId = t.getId(); Long undoKey = getOperationId(transactionId, logId);
while (logId >= toLogId) { Object[] op = undoLog.get(undoKey);
Long undoKey = getOperationId(transactionId, logId); logId--;
Object[] op = undoLog.get(undoKey); if (op == null) {
logId--; // partially rolled back: load previous
if (op == null) { undoKey = undoLog.floorKey(undoKey);
// partially rolled back: load previous if (undoKey == null ||
undoKey = undoLog.floorKey(undoKey); getTransactionId(undoKey) != transactionId) {
if (undoKey == null || break;
getTransactionId(undoKey) != transactionId) {
break;
}
logId = getLogId(undoKey);
continue;
}
int mapId = ((Integer) op[0]).intValue();
MVMap<Object, VersionedValue> m = openMap(mapId);
if (m != null) { // could be null if map was removed later on
VersionedValue oldValue = (VersionedValue) op[2];
current = new Change(m.getName(), op[1], oldValue == null ? null : oldValue.value);
return;
} }
logId = getLogId(undoKey);
continue;
}
int mapId = ((Integer) op[0]).intValue();
MVMap<Object, VersionedValue> m = openMap(mapId);
if (m != null) { // could be null if map was removed later on
VersionedValue oldValue = (VersionedValue) op[2];
current = new Change(m.getName(), op[1], oldValue == null ? null : oldValue.value);
return;
} }
} finally {
rwLock.writeLock().unlock();
} }
current = null; current = null;
} }
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论