提交 b56c16ef authored 作者: andrei's avatar andrei

minor refactoring between Transaction and TransactionStore

上级 d5478abd
...@@ -230,7 +230,7 @@ public class Transaction { ...@@ -230,7 +230,7 @@ public class Transaction {
* @param key the key * @param key the key
* @param oldValue the old value * @param oldValue the old value
*/ */
void log(int mapId, Object key, VersionedValue oldValue) { long log(int mapId, Object key, VersionedValue oldValue) {
long currentState = statusAndLogId.getAndIncrement(); long currentState = statusAndLogId.getAndIncrement();
long logId = getLogId(currentState); long logId = getLogId(currentState);
if (logId >= LOG_ID_LIMIT) { if (logId >= LOG_ID_LIMIT) {
...@@ -239,7 +239,10 @@ public class Transaction { ...@@ -239,7 +239,10 @@ public class Transaction {
"Transaction {0} has too many changes", "Transaction {0} has too many changes",
transactionId); transactionId);
} }
store.log(this, logId, mapId, key, oldValue); int currentStatus = getStatus(currentState);
checkOpen(currentStatus);
long undoKey = store.addUndoLogRecord(transactionId, logId, new Object[]{ mapId, key, oldValue });
return undoKey;
} }
/** /**
...@@ -254,7 +257,9 @@ public class Transaction { ...@@ -254,7 +257,9 @@ public class Transaction {
"Transaction {0} has internal error", "Transaction {0} has internal error",
transactionId); transactionId);
} }
store.logUndo(this, logId); int currentStatus = getStatus(currentState);
checkOpen(currentStatus);
store.removeUndoLogRecord(transactionId, logId);
} }
/** /**
...@@ -313,11 +318,29 @@ public class Transaction { ...@@ -313,11 +318,29 @@ public class Transaction {
*/ */
public void commit() { public void commit() {
assert store.openTransactions.get().get(transactionId); assert store.openTransactions.get().get(transactionId);
Throwable ex = null;
boolean hasChanges = false;
try {
long state = setStatus(STATUS_COMMITTING); long state = setStatus(STATUS_COMMITTING);
long logId = Transaction.getLogId(state); long logId = getLogId(state);
boolean hasChanges = hasChanges(state); hasChanges = hasChanges(state);
if (hasChanges) {
store.commit(this, logId, hasChanges); store.commit(this, logId);
}
} catch (Throwable e) {
ex = e;
throw e;
} finally {
try {
store.endTransaction(this, hasChanges);
} catch (Throwable e) {
if (ex == null) {
throw e;
} else {
ex.addSuppressed(e);
}
}
}
} }
/** /**
...@@ -376,13 +399,24 @@ public class Transaction { ...@@ -376,13 +399,24 @@ public class Transaction {
return getLogId(statusAndLogId.get()); return getLogId(statusAndLogId.get());
} }
/**
* Check whether this transaction is open.
*/
private void checkOpen(int status) {
if (status != STATUS_OPEN) {
throw DataUtils.newIllegalStateException(
DataUtils.ERROR_TRANSACTION_ILLEGAL_STATE,
"Transaction {0} has status {1}, not open", transactionId, status);
}
}
/** /**
* Check whether this transaction is open or prepared. * Check whether this transaction is open or prepared.
*/ */
void checkNotClosed() { void checkNotClosed() {
if (getStatus() == STATUS_CLOSED) { if (getStatus() == STATUS_CLOSED) {
throw DataUtils.newIllegalStateException( throw DataUtils.newIllegalStateException(
DataUtils.ERROR_CLOSED, "Transaction is closed"); DataUtils.ERROR_CLOSED, "Transaction {0} is closed", transactionId);
} }
} }
......
...@@ -366,18 +366,14 @@ public class TransactionStore { ...@@ -366,18 +366,14 @@ public class TransactionStore {
} }
/** /**
* Log an entry. * Add an undoLog entry.
* *
* @param t the transaction * @param transactionId id of the transaction
* @param logId the log id * @param logId sequential number of the log record within transaction
* @param mapId the map id * @param undoLogRecord Object[]
* @param key the key
* @param oldValue the old value
*/ */
long log(Transaction t, long logId, int mapId, long addUndoLogRecord(int transactionId, long logId, Object[] undoLogRecord) {
Object key, Object oldValue) { Long undoKey = getOperationId(transactionId, logId);
Long undoKey = getOperationId(t.getId(), logId);
Object[] log = { mapId, key, oldValue };
rwLock.writeLock().lock(); rwLock.writeLock().lock();
try { try {
if (logId == 0) { if (logId == 0) {
...@@ -386,10 +382,10 @@ public class TransactionStore { ...@@ -386,10 +382,10 @@ public class TransactionStore {
DataUtils.ERROR_TOO_MANY_OPEN_TRANSACTIONS, DataUtils.ERROR_TOO_MANY_OPEN_TRANSACTIONS,
"An old transaction with the same id " + "An old transaction with the same id " +
"is still open: {0}", "is still open: {0}",
t.getId()); transactionId);
} }
} }
undoLog.put(undoKey, log); undoLog.put(undoKey, undoLogRecord);
} finally { } finally {
rwLock.writeLock().unlock(); rwLock.writeLock().unlock();
} }
...@@ -399,11 +395,11 @@ public class TransactionStore { ...@@ -399,11 +395,11 @@ public class TransactionStore {
/** /**
* Remove a log entry. * Remove a log entry.
* *
* @param t the transaction * @param transactionId id of the transaction
* @param logId the log id * @param logId sequential number of the log record within transaction
*/ */
public void logUndo(Transaction t, long logId) { public void removeUndoLogRecord(int transactionId, long logId) {
Long undoKey = getOperationId(t.getId(), logId); Long undoKey = getOperationId(transactionId, logId);
rwLock.writeLock().lock(); rwLock.writeLock().lock();
try { try {
Object[] old = undoLog.remove(undoKey); Object[] old = undoLog.remove(undoKey);
...@@ -411,7 +407,7 @@ public class TransactionStore { ...@@ -411,7 +407,7 @@ public class TransactionStore {
throw DataUtils.newIllegalStateException( throw DataUtils.newIllegalStateException(
DataUtils.ERROR_TRANSACTION_ILLEGAL_STATE, DataUtils.ERROR_TRANSACTION_ILLEGAL_STATE,
"Transaction {0} was concurrently rolled back", "Transaction {0} was concurrently rolled back",
t.getId()); transactionId);
} }
} finally { } finally {
rwLock.writeLock().unlock(); rwLock.writeLock().unlock();
...@@ -435,11 +431,8 @@ public class TransactionStore { ...@@ -435,11 +431,8 @@ public class TransactionStore {
* *
* @param t the transaction * @param t the transaction
* @param maxLogId the last log id * @param maxLogId the last log id
* @param hasChanges true if there were updates within specified
* transaction (even fully rolled back),
* false if just data access
*/ */
void commit(Transaction t, long maxLogId, boolean hasChanges) { void commit(Transaction t, long maxLogId) {
if (store.isClosed()) { if (store.isClosed()) {
return; return;
} }
...@@ -487,7 +480,6 @@ public class TransactionStore { ...@@ -487,7 +480,6 @@ public class TransactionStore {
rwLock.writeLock().unlock(); rwLock.writeLock().unlock();
flipCommittingTransactionsBit(transactionId, false); flipCommittingTransactionsBit(transactionId, false);
} }
endTransaction(t, hasChanges);
} }
private void flipCommittingTransactionsBit(int transactionId, boolean flag) { private void flipCommittingTransactionsBit(int transactionId, boolean flag) {
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论