提交 c0b25204 authored 作者: Thomas Mueller's avatar Thomas Mueller

MVStore transactions: a thread could see a change of a different thread within a…

MVStore transactions: a thread could see a change of a different thread within a different map. Pull request #153.
上级 ab37045c
...@@ -21,6 +21,9 @@ Change Log ...@@ -21,6 +21,9 @@ Change Log
<h2>Next Version (unreleased)</h2> <h2>Next Version (unreleased)</h2>
<ul> <ul>
<li>MVStore transactions: a thread could see a change of a different thread
within a different map. Pull request #153.
</li>
<li>H2 Console: improved IBM DB2 compatibility. <li>H2 Console: improved IBM DB2 compatibility.
</li> </li>
<li>A thread deadlock detector (disabled by default) can help <li>A thread deadlock detector (disabled by default) can help
......
...@@ -1199,12 +1199,16 @@ public class TransactionStore { ...@@ -1199,12 +1199,16 @@ public class TransactionStore {
} }
private VersionedValue getValue(K key, long maxLog) { private VersionedValue getValue(K key, long maxLog) {
synchronized (transaction.store.undoLog) { synchronized (getUndoLog()) {
VersionedValue data = map.get(key); VersionedValue data = map.get(key);
return getValue(key, maxLog, data); return getValue(key, maxLog, data);
} }
} }
Object getUndoLog() {
return transaction.store.undoLog;
}
/** /**
* Get the versioned value for the given key. * Get the versioned value for the given key.
* *
...@@ -1215,10 +1219,10 @@ public class TransactionStore { ...@@ -1215,10 +1219,10 @@ public class TransactionStore {
*/ */
VersionedValue getValue(K key, long maxLog, VersionedValue data) { VersionedValue getValue(K key, long maxLog, VersionedValue data) {
if (MVStore.ASSERT) { if (MVStore.ASSERT) {
if (!Thread.holdsLock(transaction.store.undoLog)) { if (!Thread.holdsLock(getUndoLog())) {
throw DataUtils.newIllegalStateException( throw DataUtils.newIllegalStateException(
DataUtils.ERROR_INTERNAL, DataUtils.ERROR_INTERNAL,
"getValue not invoked while synchronized on undoLog"); "not synchronized on undoLog");
} }
} }
while (true) { while (true) {
...@@ -1449,8 +1453,8 @@ public class TransactionStore { ...@@ -1449,8 +1453,8 @@ public class TransactionStore {
private void fetchNext() { private void fetchNext() {
while (cursor.hasNext()) { while (cursor.hasNext()) {
synchronized (getUndoLog()) {
K k; K k;
synchronized (transaction.store.undoLog) {
try { try {
k = cursor.next(); k = cursor.next();
} catch (IllegalStateException e) { } catch (IllegalStateException e) {
......
...@@ -15,7 +15,6 @@ import java.util.Iterator; ...@@ -15,7 +15,6 @@ import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.Random; import java.util.Random;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;
import org.h2.mvstore.DataUtils; import org.h2.mvstore.DataUtils;
import org.h2.mvstore.MVMap; import org.h2.mvstore.MVMap;
...@@ -129,6 +128,7 @@ public class TestTransactionStore extends TestBase { ...@@ -129,6 +128,7 @@ public class TestTransactionStore extends TestBase {
try { try {
map.put(k, r.nextInt()); map.put(k, r.nextInt());
} catch (IllegalStateException e) { } catch (IllegalStateException e) {
//System.out.println("a: " + e);
failCount.incrementAndGet(); failCount.incrementAndGet();
// ignore and retry // ignore and retry
} }
...@@ -149,13 +149,15 @@ public class TestTransactionStore extends TestBase { ...@@ -149,13 +149,15 @@ public class TestTransactionStore extends TestBase {
try { try {
map.put(k, r.nextInt()); map.put(k, r.nextInt());
} catch (IllegalStateException e) { } catch (IllegalStateException e) {
//System.out.println("b: " + e);
failCount.incrementAndGet(); failCount.incrementAndGet();
// ignore and retry // ignore and retry
} }
tx.commit(); tx.commit();
} }
// we expect at least half the operations were successful // we expect at least 10% the operations were successful
assertTrue(failCount.toString(), failCount.get() < count / 2); assertTrue(failCount.toString() + " >= " + (count * 0.9),
failCount.get() < count * 0.9);
// we expect at least a few failures // we expect at least a few failures
assertTrue(failCount.toString(), failCount.get() > 0); assertTrue(failCount.toString(), failCount.get() > 0);
s.close(); s.close();
...@@ -983,152 +985,58 @@ public class TestTransactionStore extends TestBase { ...@@ -983,152 +985,58 @@ public class TestTransactionStore extends TestBase {
s.close(); s.close();
} }
// Internal helper class private void testStoreMultiThreadedReads() throws Exception {
private static class Sync extends AbstractQueuedSynchronizer { MVStore s = MVStore.open(null);
private static final int FREE = 0; final TransactionStore ts = new TransactionStore(s);
private static final int LOCKED = 1;
private static final long serialVersionUID = 1L;
// Reports whether in locked state by this thread
@Override
protected boolean isHeldExclusively() {
return getState() == LOCKED &&
this.getExclusiveOwnerThread() == Thread.currentThread();
}
@Override
public boolean tryAcquire(int newState) {
assert newState == LOCKED;
if (getFirstQueuedThread() != Thread.currentThread() &&
hasQueuedThreads()) {
// prevent barging
return false;
}
if (compareAndSetState(FREE, LOCKED)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
// someone else won the race
return false;
}
@Override
protected boolean tryRelease(int arg) {
if (getState() != LOCKED) throw new IllegalMonitorStateException();
setExclusiveOwnerThread(null);
setState(FREE);
// generation == state now (free)
return true;
}
}
class ReadUpdateWriteTask extends Task {
final int n;
final TransactionStore store;
final Sync sync;
public ReadUpdateWriteTask(int n, TransactionStore store, Sync sync) { ts.init();
this.n = n; Transaction t = ts.begin();
this.store = store; TransactionMap<Integer, Integer> mapA = t.openMap("a");
this.sync = sync; mapA.put(1, 0);
} t.commit();
/** Task task = new Task() {
* This tests the synchronization of the undoLog with the getting of
* committed values when other threads are committing to a different
* map. The failure became noticeable when the transaction "window" was
* changed to a <code>BitSet</code>. This caused the reuse of recent
* transaction id's so that the lack of proper undoLog synchronization
* cause the stale uncommitted value to be decoded using a changed
* undoLog. This test should (with the fix) run in multiple threads
* without a problem.
*/
@Override @Override
public void call() { public void call() throws Exception {
Transaction tx; for (int i = 0; !stop; i++) {
int j = 0; Transaction tx = ts.begin();
// Fails reliably on i5 3.4 quad core in the first few hundred TransactionMap<Integer, Integer> mapA = tx.openMap("a");
// iterations while (!mapA.tryPut(1, i)) {
for (int i = 0; i < 10000; i++) { // repeat
tx = store.begin();
TransactionMap<Object, Object> txMapA = tx.openMap("a");
Object o;
// retrieve the control value (key = 1)
Object key = txMapA.higherKey(0);
if (key == null) {
// this is the potential failure point; they key is null
// because the undoLog entry
// examined was for the same transaction id in the B map,
// not this one
result = "null key for control map on try #" + i;
return;
}
// now serialize all threads
sync.acquire(1);
j = ((o = txMapA.get(key)) instanceof Integer) ? ((Integer) o)
: -1;
if (j < 0) {
result = "get of control value key " + key + " not valid";
return;
}
// update the control (map A) value
boolean ok = txMapA.tryPut(key, ++j);
if (!ok) {
result = "transactional put of " + j + " to key " + key
+ " failed";
return;
} }
// commit the map A transaction
tx.commit(); tx.commit();
// release the other thread(s) [they will immediately attempt to
// read the map A key] // map B transaction
// one of them will get the lock, and another one or more will // the other thread will get a map A uncommitted value,
// get a map A uncommitted value // but by the time it tries to walk back to the committed
// but by the time they try to walk back to the committed value, // value, the undoLog has changed
// the undoLog has changed out from tx = ts.begin();
// under them TransactionMap<Integer, Integer> mapB = tx.openMap("b");
sync.release(0); // put a new value to the map; this will cause a map B
// begin a map B transaction // undoLog entry to be created with a null pre-image value
tx = store.begin(); mapB.tryPut(i, -i);
TransactionMap<Object, Object> txMapB = tx.openMap("b"); // this is where the real race condition occurs:
// put a new value to the map; this will cause a map B undoLog
// entry to be created with a null pre-image value
txMapB.tryPut(j, n);
// This is where the real race condition occurs
// some other thread might get the B log entry // some other thread might get the B log entry
// for this transaction rather than the uncommitted A log entry // for this transaction rather than the uncommitted A log
// they are expecting // entry it is expecting
tx.commit(); tx.commit();
} }
} }
};
task.execute();
try {
for (int i = 0; i < 10000; i++) {
Transaction tx = ts.begin();
mapA = tx.openMap("a");
if (mapA.get(1) == null) {
throw new AssertionError("key not found");
} }
tx.commit();
private void testStoreMultiThreadedReads() throws Exception {
MVStore s = MVStore.open(null);
TransactionStore ts = new TransactionStore(s);
ts.init();
Sync sync = new Sync();
Transaction t = ts.begin();
TransactionMap<Object, Object> txMapA = t.openMap("a");
txMapA.put(1, 0);
t.commit();
final int threadCount = 3;
Task[] tasks = new Task[threadCount];
for (int i = 0; i < tasks.length; i++)
(tasks[i] = new ReadUpdateWriteTask(i, ts, sync)).execute();
for (int i = 0; i < tasks.length; i++) {
Object result = (String) tasks[i].get();
if (result instanceof String) {
fail((String) result);
} }
} finally {
task.get();
} }
ts.close(); ts.close();
} }
} }
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论