提交 8e29ff1f authored 作者: andrei's avatar andrei

Atomic change of transaction state

上级 313e60db
......@@ -10,6 +10,7 @@ import org.h2.mvstore.MVMap;
import org.h2.mvstore.MVStore;
import org.h2.mvstore.type.DataType;
import java.util.Iterator;
import java.util.concurrent.atomic.AtomicLong;
/**
* A transaction.
......@@ -39,6 +40,35 @@ public class Transaction {
*/
public static final int STATUS_COMMITTING = 3;
/**
* The status of a transaction that has been logically committed or rather
* marked as committed, because it might be still listed among prepared,
* if it was prepared for commit, undo log entries might still exists for it
* and not all of it's changes within map's are re-written as committed yet.
* Nevertheless, those changes should be already viewed by other
* transactions as committed.
* This transaction's id can not be re-used until all the above is completed
* and transaction is closed.
*/
public static final int STATUS_COMMITTED = 4;
/**
* The status of a transaction that currently in a process of rolling back
* to a savepoint.
*/
private static final int STATUS_ROLLING_BACK = 5;
/**
* The status of a transaction that has been rolled back completely,
* but undo operations are not finished yet.
*/
private static final int STATUS_ROLLED_BACK = 6;
private static final String STATUS_NAMES[] = {
"CLOSED", "OPEN", "PREPARED", "COMMITTING",
"COMMITTED", "ROLLING_BACK", "ROLLED_BACK"
};
/**
* The transaction store.
*/
......@@ -49,12 +79,13 @@ public class Transaction {
*/
final int transactionId;
/**
* The log id of the last entry in the undo log map.
/*
* Transation state is an atomic composite field:
* bit 44 : flag whether transaction had rollback(s)
* bits 42-40 : status
* bits 39-0 : log id of the last entry in the undo log map
*/
long logId;
private int status;
private final AtomicLong statusAndLogId;
private MVStore.TxCounter txCounter;
......@@ -64,9 +95,8 @@ public class Transaction {
String name, long logId) {
this.store = store;
this.transactionId = transactionId;
this.status = status;
this.statusAndLogId = new AtomicLong(composeState(status, logId, false));
this.name = name;
this.logId = logId;
}
public int getId() {
......@@ -74,11 +104,60 @@ public class Transaction {
}
public int getStatus() {
return status;
return getStatus(statusAndLogId.get());
}
void setStatus(int status) {
this.status = status;
long setStatus(int status) {
while (true) {
long currentState = statusAndLogId.get();
long logId = getLogId(currentState);
int currentStatus = getStatus(currentState);
boolean valid;
switch (status) {
case STATUS_OPEN:
valid = currentStatus == STATUS_CLOSED ||
currentStatus == STATUS_ROLLING_BACK;
break;
case STATUS_ROLLING_BACK:
valid = currentStatus == STATUS_OPEN;
break;
case STATUS_PREPARED:
valid = currentStatus == STATUS_OPEN;
break;
case STATUS_COMMITTING:
valid = currentStatus == STATUS_OPEN ||
currentStatus == STATUS_PREPARED ||
// this case is only possible if called
// from endLeftoverTransactions()
currentStatus == STATUS_COMMITTING;
break;
case STATUS_COMMITTED:
valid = currentStatus == STATUS_COMMITTING;
break;
case STATUS_ROLLED_BACK:
valid = currentStatus == STATUS_OPEN ||
currentStatus == STATUS_PREPARED;
break;
case STATUS_CLOSED:
valid = currentStatus == STATUS_COMMITTING ||
currentStatus == STATUS_COMMITTED ||
currentStatus == STATUS_ROLLED_BACK;
break;
default:
valid = false;
break;
}
if (!valid) {
throw DataUtils.newIllegalStateException(
DataUtils.ERROR_TRANSACTION_ILLEGAL_STATE,
"Transaction was illegally transitioned from {0} to {1}",
STATUS_NAMES[currentStatus], STATUS_NAMES[status]);
}
long newState = composeState(status, logId, hasRollback(currentState));
if (statusAndLogId.compareAndSet(currentState, newState)) {
return currentState;
}
}
}
public void setName(String name) {
......@@ -97,7 +176,7 @@ public class Transaction {
* @return the savepoint id
*/
public long setSavepoint() {
return logId;
return getLogId();
}
public void markStatementStart() {
......@@ -120,17 +199,19 @@ public class Transaction {
* @param key the key
* @param oldValue the old value
*/
void log(int mapId, Object key, Object oldValue) {
void log(int mapId, Object key, VersionedValue oldValue) {
long currentState = statusAndLogId.getAndIncrement();
long logId = getLogId(currentState);
store.log(this, logId, mapId, key, oldValue);
// only increment the log id if logging was successful
logId++;
}
/**
* Remove the last log entry.
*/
void logUndo() {
store.logUndo(this, --logId);
long currentState = statusAndLogId.decrementAndGet();
long logId = getLogId(currentState);
store.logUndo(this, logId);
}
/**
......@@ -157,11 +238,8 @@ public class Transaction {
*/
public <K, V> TransactionMap<K, V> openMap(String name,
DataType keyType, DataType valueType) {
checkNotClosed();
MVMap<K, VersionedValue> map = store.openMap(name, keyType,
valueType);
int mapId = map.getId();
return new TransactionMap<>(this, map, mapId);
MVMap<K, VersionedValue> map = store.openMap(name, keyType, valueType);
return openMap(map);
}
/**
......@@ -172,8 +250,7 @@ public class Transaction {
* @param map the base map
* @return the transactional map
*/
public <K, V> TransactionMap<K, V> openMap(
MVMap<K, VersionedValue> map) {
public <K, V> TransactionMap<K, V> openMap(MVMap<K, VersionedValue> map) {
checkNotClosed();
int mapId = map.getId();
return new TransactionMap<>(this, map, mapId);
......@@ -181,11 +258,10 @@ public class Transaction {
/**
* Prepare the transaction. Afterwards, the transaction can only be
* committed or rolled back.
* committed or completely rolled back.
*/
public void prepare() {
checkNotClosed();
status = STATUS_PREPARED;
setStatus(STATUS_PREPARED);
store.storeTransaction(this);
}
......@@ -193,8 +269,10 @@ public class Transaction {
* Commit the transaction. Afterwards, this transaction is closed.
*/
public void commit() {
checkNotClosed();
store.commit(this, logId);
long state = setStatus(Transaction.STATUS_COMMITTING);
long logId = Transaction.getLogId(state);
int oldStatus = Transaction.getStatus(state);
store.commit(this, logId, oldStatus);
}
/**
......@@ -204,18 +282,36 @@ public class Transaction {
* @param savepointId the savepoint id
*/
public void rollbackToSavepoint(long savepointId) {
checkNotClosed();
long lastState = setStatus(STATUS_ROLLING_BACK);
long logId = getLogId(lastState);
try {
store.rollbackTo(this, logId, savepointId);
logId = savepointId;
} finally {
long expectedState = composeState(STATUS_ROLLING_BACK, logId, hasRollback(lastState));
long newState = composeState(STATUS_OPEN, savepointId, true);
if (!statusAndLogId.compareAndSet(expectedState, newState)) {
throw DataUtils.newIllegalStateException(
DataUtils.ERROR_TRANSACTION_ILLEGAL_STATE,
"Transaction {0} concurrently modified " +
"while rollback to savepoint was in progress",
transactionId);
}
}
}
/**
* Roll the transaction back. Afterwards, this transaction is closed.
*/
public void rollback() {
checkNotClosed();
try {
long lastState = setStatus(STATUS_ROLLED_BACK);
long logId = getLogId(lastState);
if (logId > 0) {
store.rollbackTo(this, logId, 0);
store.endTransaction(this, status);
}
} finally {
store.endTransaction(this, STATUS_ROLLED_BACK);
}
}
/**
......@@ -228,14 +324,18 @@ public class Transaction {
* @return the changes
*/
public Iterator<TransactionStore.Change> getChanges(long savepointId) {
return store.getChanges(this, logId, savepointId);
return store.getChanges(this, getLogId(), savepointId);
}
long getLogId() {
return getLogId(statusAndLogId.get());
}
/**
* Check whether this transaction is open or prepared.
*/
void checkNotClosed() {
if (status == STATUS_CLOSED) {
if (getStatus() == STATUS_CLOSED) {
throw DataUtils.newIllegalStateException(
DataUtils.ERROR_CLOSED, "Transaction is closed");
}
......@@ -252,7 +352,31 @@ public class Transaction {
@Override
public String toString() {
return "" + transactionId;
long state = statusAndLogId.get();
return transactionId + " " + STATUS_NAMES[getStatus(state)] + " " + getLogId(state);
}
public static int getStatus(long state) {
return (int)(state >>> 40) & 15;
}
public static long getLogId(long state) {
return state & ((1L << 40) - 1);
}
private static boolean hasRollback(long state) {
return (state & (1L << 44)) != 0;
}
private static boolean hasChanges(long state) {
return getLogId(state) != 0;
}
private static long composeState(int status, long logId, boolean hasRollback) {
if (hasRollback) {
status |= 16;
}
return ((long)status << 40) | logId;
}
}
......@@ -277,7 +277,7 @@ public class TransactionMap<K, V> {
}
}
VersionedValue newValue = new VersionedValue(
TransactionStore.getOperationId(transaction.transactionId, transaction.logId),
TransactionStore.getOperationId(transaction.transactionId, transaction.getLogId()),
value);
if (current == null) {
// a new value
......
......@@ -345,16 +345,15 @@ public class TransactionStore {
*
* @param t the transaction
* @param maxLogId the last log id
* @param oldStatus last status
*/
void commit(Transaction t, long maxLogId) {
void commit(Transaction t, long maxLogId, int oldStatus) {
if (store.isClosed()) {
return;
}
// TODO could synchronize on blocks (100 at a time or so)
rwLock.writeLock().lock();
int oldStatus = t.getStatus();
try {
t.setStatus(Transaction.STATUS_COMMITTING);
for (long logId = 0; logId < maxLogId; logId++) {
Long undoKey = getOperationId(t.getId(), logId);
Object[] op = undoLog.get(undoKey);
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论