提交 842d1042 authored 作者: Thomas Mueller's avatar Thomas Mueller

MVStore: concurrent changes to the same row could result in the exception "The…

MVStore: concurrent changes to the same row could result in the exception "The transaction log might be corrupt for key ...". This could only be reproduced with 3 or more threads.
上级 6a4ffe98
...@@ -46,7 +46,7 @@ public class TransactionStore { ...@@ -46,7 +46,7 @@ public class TransactionStore {
* is not possible). Log entries are written before the data is changed * is not possible). Log entries are written before the data is changed
* (write-ahead). * (write-ahead).
* <p> * <p>
* Key: [ opId ], value: [ mapId, key, oldValue ]. * Key: opId, value: [ mapId, key, oldValue ].
*/ */
final MVMap<Long, Object[]> undoLog; final MVMap<Long, Object[]> undoLog;
...@@ -223,12 +223,15 @@ public class TransactionStore { ...@@ -223,12 +223,15 @@ public class TransactionStore {
* @return the transaction * @return the transaction
*/ */
public synchronized Transaction begin() { public synchronized Transaction begin() {
int transactionId;
int status;
if (!init) { if (!init) {
throw DataUtils.newIllegalStateException( throw DataUtils.newIllegalStateException(
DataUtils.ERROR_TRANSACTION_ILLEGAL_STATE, DataUtils.ERROR_TRANSACTION_ILLEGAL_STATE,
"Not initialized"); "Not initialized");
} }
int transactionId = openTransactions.nextClearBit(1); transactionId = openTransactions.nextClearBit(1);
if (transactionId > maxTransactionId) { if (transactionId > maxTransactionId) {
throw DataUtils.newIllegalStateException( throw DataUtils.newIllegalStateException(
DataUtils.ERROR_TOO_MANY_OPEN_TRANSACTIONS, DataUtils.ERROR_TOO_MANY_OPEN_TRANSACTIONS,
...@@ -236,7 +239,7 @@ public class TransactionStore { ...@@ -236,7 +239,7 @@ public class TransactionStore {
transactionId - 1); transactionId - 1);
} }
openTransactions.set(transactionId); openTransactions.set(transactionId);
int status = Transaction.STATUS_OPEN; status = Transaction.STATUS_OPEN;
return new Transaction(this, transactionId, status, null, 0); return new Transaction(this, transactionId, status, null, 0);
} }
...@@ -1232,35 +1235,13 @@ public class TransactionStore { ...@@ -1232,35 +1235,13 @@ public class TransactionStore {
if (d == null) { if (d == null) {
// this entry should be committed or rolled back // this entry should be committed or rolled back
// in the meantime (the transaction might still be open) // in the meantime (the transaction might still be open)
// or it might be changed again in a different
// transaction (possibly one with the same id)
data = map.get(key); data = map.get(key);
if (data != null && data.operationId == id) {
// the transaction was not committed correctly
throw DataUtils.newIllegalStateException(
DataUtils.ERROR_TRANSACTION_CORRUPT,
"The transaction log might be corrupt for key {0}",
key);
}
} else { } else {
data = (VersionedValue) d[2]; data = (VersionedValue) d[2];
} }
// verify this is either committed,
// or the same transaction and earlier
if (data != null) {
long id2 = data.operationId;
if (id2 != 0) {
int tx2 = getTransactionId(id2);
if (tx2 != tx) {
// a different transaction - ok
} else if (getLogId(id2) > getLogId(id)) {
// newer than before
break;
}
}
}
} }
throw DataUtils.newIllegalStateException(
DataUtils.ERROR_TRANSACTION_CORRUPT,
"The transaction log might be corrupt for key {0}", key);
} }
/** /**
......
...@@ -45,6 +45,7 @@ public class TestTransactionStore extends TestBase { ...@@ -45,6 +45,7 @@ public class TestTransactionStore extends TestBase {
@Override @Override
public void test() throws Exception { public void test() throws Exception {
FileUtils.createDirectories(getBaseDir()); FileUtils.createDirectories(getBaseDir());
testConcurrentAddRemove();
testConcurrentAdd(); testConcurrentAdd();
testCountWithOpenTransactions(); testCountWithOpenTransactions();
testConcurrentUpdate(); testConcurrentUpdate();
...@@ -61,6 +62,47 @@ public class TestTransactionStore extends TestBase { ...@@ -61,6 +62,47 @@ public class TestTransactionStore extends TestBase {
testCompareWithPostgreSQL(); testCompareWithPostgreSQL();
} }
private void testConcurrentAddRemove() throws InterruptedException {
MVStore s = MVStore.open(null);
int threadCount = 3;
final int keyCount = 2;
final TransactionStore ts = new TransactionStore(s);
ts.init();
final Random r = new Random(1);
Task[] tasks = new Task[threadCount];
for (int i = 0; i < threadCount; i++) {
Task task = new Task() {
@Override
public void call() throws Exception {
TransactionMap<Integer, Integer> map = null;
while (!stop) {
Transaction tx = ts.begin();
map = tx.openMap("data");
int k = r.nextInt(keyCount);
try {
map.remove(k);
map.put(k, r.nextInt());
} catch (IllegalStateException e) {
// ignore and retry
}
tx.commit();
}
}
};
task.execute();
tasks[i] = task;
}
Thread.sleep(1000);
for (Task t : tasks) {
t.get();
}
s.close();
}
private void testConcurrentAdd() { private void testConcurrentAdd() {
MVStore s; MVStore s;
s = MVStore.open(null); s = MVStore.open(null);
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论