提交 5367d1a3 authored 作者: Andrei Tokar's avatar Andrei Tokar

Fix TestTransactionStore failure - Persist fact of transaction's commit ASAP

Address code review comments.
上级 8753a6b0
......@@ -77,7 +77,7 @@ public class TransactionMap<K, V> {
// when none of the variables concurrently changes it's value.
BitSet committingTransactions;
MVMap.RootReference mapRootReference;
MVMap.RootReference undoLogRootReferences[];
MVMap.RootReference[] undoLogRootReferences;
long undoLogSize;
do {
committingTransactions = store.committingTransactions.get();
......
......@@ -52,7 +52,7 @@ public class TransactionStore {
* Key: opId, value: [ mapId, key, oldValue ].
*/
@SuppressWarnings("unchecked")
final MVMap<Long,Object[]> undoLogs[] = (MVMap<Long,Object[]>[])new MVMap[MAX_OPEN_TRANSACTIONS];
final MVMap<Long,Object[]> undoLogs[] = new MVMap[MAX_OPEN_TRANSACTIONS];
private final MVMap.Builder<Long,Object[]> undoLogBuilder;
private final DataType dataType;
......@@ -94,7 +94,9 @@ public class TransactionStore {
*/
private int nextTempMapId;
private static final String UNDO_LOG_NAME_PEFIX = "undoLog-";
private static final String UNDO_LOG_NAME_PEFIX = "undoLog";
private static final char UNDO_LOG_OPEN = '-';
private static final char UNDO_LOG_COMMITTED = '.';
/**
* Hard limit on the number of concurrently opened transactions
......@@ -155,12 +157,13 @@ public class TransactionStore {
for (String mapName : store.getMapNames()) {
if (mapName.startsWith(UNDO_LOG_NAME_PEFIX)) {
if (store.hasData(mapName)) {
int transactionId = Integer.parseInt(mapName.substring(UNDO_LOG_NAME_PEFIX.length()));
int transactionId = Integer.parseInt(mapName.substring(UNDO_LOG_NAME_PEFIX.length() + 1));
Object[] data = preparedTransactions.get(transactionId);
int status;
String name;
if (data == null) {
status = Transaction.STATUS_OPEN;
status = mapName.charAt(UNDO_LOG_NAME_PEFIX.length()) == UNDO_LOG_OPEN ?
Transaction.STATUS_OPEN : Transaction.STATUS_COMMITTING;
name = null;
} else {
status = (Integer) data[0];
......@@ -330,7 +333,7 @@ public class TransactionStore {
transactions.set(transactionId, transaction);
if (undoLogs[transactionId] == null) {
String undoName = UNDO_LOG_NAME_PEFIX + transactionId;
String undoName = UNDO_LOG_NAME_PEFIX + UNDO_LOG_OPEN + transactionId;
undoLogs[transactionId] = store.openMap(undoName, undoLogBuilder);
}
return transaction;
......@@ -413,19 +416,24 @@ public class TransactionStore {
try {
t.setStatus(Transaction.STATUS_COMMITTED);
MVMap<Long, Object[]> undoLog = undoLogs[transactionId];
Cursor<Long, Object[]> cursor = undoLog.cursor(null);
while (cursor.hasNext()) {
Long undoKey = cursor.next();
Object[] op = cursor.getValue();
int mapId = (Integer) op[0];
MVMap<Object, VersionedValue> map = openMap(mapId);
if (map != null) { // might be null if map was removed later
Object key = op[1];
commitDecisionMaker.setUndoKey(undoKey);
map.operate(key, null, commitDecisionMaker);
store.renameMap(undoLog, UNDO_LOG_NAME_PEFIX + UNDO_LOG_COMMITTED + transactionId);
try {
Cursor<Long, Object[]> cursor = undoLog.cursor(null);
while (cursor.hasNext()) {
Long undoKey = cursor.next();
Object[] op = cursor.getValue();
int mapId = (Integer) op[0];
MVMap<Object, VersionedValue> map = openMap(mapId);
if (map != null) { // might be null if map was removed later
Object key = op[1];
commitDecisionMaker.setUndoKey(undoKey);
map.operate(key, null, commitDecisionMaker);
}
}
undoLog.clear();
} finally {
store.renameMap(undoLog, UNDO_LOG_NAME_PEFIX + UNDO_LOG_OPEN + transactionId);
}
undoLog.clear();
} finally {
flipCommittingTransactionsBit(transactionId, false);
}
......@@ -548,15 +556,7 @@ public class TransactionStore {
if (wasStored || store.getAutoCommitDelay() == 0) {
store.tryCommit();
} else {
boolean empty = true;
BitSet openTrans = openTransactions.get();
for (int i = openTrans.nextSetBit(0); empty && i >= 0; i = openTrans.nextSetBit(i + 1)) {
MVMap<Long, Object[]> undoLog = undoLogs[i];
if (undoLog != null) {
empty = undoLog.isEmpty();
}
}
if (empty) {
if (isUndoEmpty()) {
// to avoid having to store the transaction log,
// if there is no open transaction,
// and if there have been many changes, store them now
......@@ -571,6 +571,17 @@ public class TransactionStore {
}
}
private boolean isUndoEmpty() {
BitSet openTrans = openTransactions.get();
for (int i = openTrans.nextSetBit(0); i >= 0; i = openTrans.nextSetBit(i + 1)) {
MVMap<Long, Object[]> undoLog = undoLogs[i];
if (undoLog != null && !undoLog.isEmpty()) {
return false;
}
}
return true;
}
Transaction getTransaction(int transactionId) {
return transactions.get(transactionId);
}
......
......@@ -133,7 +133,7 @@ public class TestMultiThread extends TestBase implements Runnable {
tasks[i] = t;
t.execute();
}
Thread.sleep(500);
Thread.sleep(10000);
for (Task t : tasks) {
t.get();
}
......
......@@ -52,8 +52,7 @@ public class TestTransactionStore extends TestBase {
testConcurrentUpdate();
testRepeatedChange();
testTransactionAge();
// TODO: figure out why it hangs
// testStopWhileCommitting();
testStopWhileCommitting();
testGetModifiedMaps();
testKeyIterator();
testTwoPhaseCommit();
......@@ -397,7 +396,7 @@ public class TestTransactionStore extends TestBase {
store.close();
s = MVStore.open(fileName);
// roll back a bit, until we have some undo log entries
assertTrue(s.hasMap("undoLog-1"));
assertTrue(s.hasMap("undoLog-1") || s.hasMap("undoLog.1"));
for (int back = 0; back < 100; back++) {
int minus = r.nextInt(10);
s.rollbackTo(Math.max(0, s.getCurrentVersion() - minus));
......@@ -425,7 +424,7 @@ public class TestTransactionStore extends TestBase {
private boolean hasDataUndoLog(MVStore s) {
for (int i = 0; i < 255; i++) {
if(s.hasData("undoLog-"+i)) {
if(s.hasData("undoLog."+i)) {
return true;
}
}
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论