提交 60ab827b authored 作者: Thomas Mueller's avatar Thomas Mueller

MVStore / TransactionStore: concurrent updates could result in a "Too many open…

MVStore / TransactionStore: concurrent updates could result in a "Too many open transactions" exception.
上级 00101bcf
......@@ -17,7 +17,9 @@ Change Log
<h1>Change Log</h1>
<h2>Next Version (unreleased)</h2>
<ul><li>StringUtils.toUpperEnglish now has a small cache.
<ul><li>MVStore / TransactionStore: concurrent updates could result in a
"Too many open transactions" exception.
</li><li>StringUtils.toUpperEnglish now has a small cache.
This should speed up reading from a ResultSet when using the column name.
</li><li>MVStore: up to 65535 open transactions are now supported.
Previously, the limit was at most 65535 transactions between the oldest open and the
......
......@@ -287,9 +287,15 @@ public class TransactionStore {
* @param logId the log id
*/
public void logUndo(Transaction t, long logId) {
long[] undoKey = { t.getId(), logId };
Long undoKey = getOperationId(t.getId(), logId);
synchronized (undoLog) {
undoLog.remove(undoKey);
Object[] old = undoLog.remove(undoKey);
if (old == null) {
throw DataUtils.newIllegalStateException(
DataUtils.ERROR_TRANSACTION_ILLEGAL_STATE,
"Transaction {0} was concurrently rolled back",
t.getId());
}
}
}
......
......@@ -45,6 +45,7 @@ public class TestTransactionStore extends TestBase {
@Override
public void test() throws Exception {
FileUtils.createDirectories(getBaseDir());
testConcurrentAdd();
testCountWithOpenTransactions();
testConcurrentUpdate();
testRepeatedChange();
......@@ -60,6 +61,62 @@ public class TestTransactionStore extends TestBase {
testCompareWithPostgreSQL();
}
private void testConcurrentAdd() {
MVStore s;
s = MVStore.open(null);
final TransactionStore ts = new TransactionStore(s);
ts.init();
final Random r = new Random(1);
final AtomicInteger key = new AtomicInteger();
final AtomicInteger failCount = new AtomicInteger();
Task task = new Task() {
@Override
public void call() throws Exception {
Transaction tx = null;
TransactionMap<Integer, Integer> map = null;
while (!stop) {
int k = key.get();
tx = ts.begin();
map = tx.openMap("data");
try {
map.put(k, r.nextInt());
} catch (IllegalStateException e) {
failCount.incrementAndGet();
// ignore and retry
}
tx.commit();
}
}
};
task.execute();
Transaction tx = null;
int count = 10000;
TransactionMap<Integer, Integer> map = null;
for (int i = 0; i < count; i++) {
int k = i;
key.set(k);
tx = ts.begin();
map = tx.openMap("data");
try {
map.put(k, r.nextInt());
} catch (IllegalStateException e) {
failCount.incrementAndGet();
// ignore and retry
}
tx.commit();
}
// we expect at least half the operations were successful
assertTrue(failCount.toString(), failCount.get() < count / 2);
// we expect at least a few failures
assertTrue(failCount.toString(), failCount.get() > 0);
s.close();
}
private void testCountWithOpenTransactions() {
MVStore s;
TransactionStore ts;
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论