提交 ab37045c authored 作者: Thomas Mueller's avatar Thomas Mueller

Merge pull request #153 from k5mwe/master

Test case for undoLog synchronization race and proposed fix
...@@ -933,9 +933,11 @@ public class TransactionStore { ...@@ -933,9 +933,11 @@ public class TransactionStore {
long size = 0; long size = 0;
Cursor<K, VersionedValue> cursor = map.cursor(null); Cursor<K, VersionedValue> cursor = map.cursor(null);
while (cursor.hasNext()) { while (cursor.hasNext()) {
K key = cursor.next(); VersionedValue data;
VersionedValue data = cursor.getValue(); synchronized (transaction.store.undoLog) {
data = getValue(key, readLogId, data); K key = cursor.next();
data = getValue(key, readLogId, cursor.getValue());
}
if (data != null && data.value != null) { if (data != null && data.value != null) {
size++; size++;
} }
...@@ -1197,8 +1199,10 @@ public class TransactionStore { ...@@ -1197,8 +1199,10 @@ public class TransactionStore {
} }
private VersionedValue getValue(K key, long maxLog) { private VersionedValue getValue(K key, long maxLog) {
VersionedValue data = map.get(key); synchronized (transaction.store.undoLog) {
return getValue(key, maxLog, data); VersionedValue data = map.get(key);
return getValue(key, maxLog, data);
}
} }
/** /**
...@@ -1210,6 +1214,13 @@ public class TransactionStore { ...@@ -1210,6 +1214,13 @@ public class TransactionStore {
* @return the value * @return the value
*/ */
VersionedValue getValue(K key, long maxLog, VersionedValue data) { VersionedValue getValue(K key, long maxLog, VersionedValue data) {
if (MVStore.ASSERT) {
if (!Thread.holdsLock(transaction.store.undoLog)) {
throw DataUtils.newIllegalStateException(
DataUtils.ERROR_INTERNAL,
"getValue not invoked while synchronized on undoLog");
}
}
while (true) { while (true) {
if (data == null) { if (data == null) {
// doesn't exist or deleted by a committed transaction // doesn't exist or deleted by a committed transaction
...@@ -1229,9 +1240,7 @@ public class TransactionStore { ...@@ -1229,9 +1240,7 @@ public class TransactionStore {
} }
// get the value before the uncommitted transaction // get the value before the uncommitted transaction
Object[] d; Object[] d;
synchronized (transaction.store.undoLog) { d = transaction.store.undoLog.get(id);
d = transaction.store.undoLog.get(id);
}
if (d == null) { if (d == null) {
// this entry should be committed or rolled back // this entry should be committed or rolled back
// in the meantime (the transaction might still be open) // in the meantime (the transaction might still be open)
...@@ -1441,36 +1450,38 @@ public class TransactionStore { ...@@ -1441,36 +1450,38 @@ public class TransactionStore {
private void fetchNext() { private void fetchNext() {
while (cursor.hasNext()) { while (cursor.hasNext()) {
K k; K k;
try { synchronized (transaction.store.undoLog) {
k = cursor.next(); try {
} catch (IllegalStateException e) {
// TODO this is a bit ugly
if (DataUtils.getErrorCode(e.getMessage()) ==
DataUtils.ERROR_CHUNK_NOT_FOUND) {
cursor = map.cursor(currentKey);
// we (should) get the current key again,
// we need to ignore that one
if (!cursor.hasNext()) {
break;
}
cursor.next();
if (!cursor.hasNext()) {
break;
}
k = cursor.next(); k = cursor.next();
} else { } catch (IllegalStateException e) {
throw e; // TODO this is a bit ugly
if (DataUtils.getErrorCode(e.getMessage()) ==
DataUtils.ERROR_CHUNK_NOT_FOUND) {
cursor = map.cursor(currentKey);
// we (should) get the current key again,
// we need to ignore that one
if (!cursor.hasNext()) {
break;
}
cursor.next();
if (!cursor.hasNext()) {
break;
}
k = cursor.next();
} else {
throw e;
}
}
final K key = k;
VersionedValue data = cursor.getValue();
data = getValue(key, readLogId, data);
if (data != null && data.value != null) {
@SuppressWarnings("unchecked")
final V value = (V) data.value;
current = new DataUtils.MapEntry<K, V>(key, value);
currentKey = key;
return;
} }
}
final K key = k;
VersionedValue data = cursor.getValue();
data = getValue(key, readLogId, data);
if (data != null && data.value != null) {
@SuppressWarnings("unchecked")
final V value = (V) data.value;
current = new DataUtils.MapEntry<K, V>(key, value);
currentKey = key;
return;
} }
} }
current = null; current = null;
......
...@@ -15,6 +15,7 @@ import java.util.Iterator; ...@@ -15,6 +15,7 @@ import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.Random; import java.util.Random;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;
import org.h2.mvstore.DataUtils; import org.h2.mvstore.DataUtils;
import org.h2.mvstore.MVMap; import org.h2.mvstore.MVMap;
...@@ -60,6 +61,7 @@ public class TestTransactionStore extends TestBase { ...@@ -60,6 +61,7 @@ public class TestTransactionStore extends TestBase {
testConcurrentTransactionsReadCommitted(); testConcurrentTransactionsReadCommitted();
testSingleConnection(); testSingleConnection();
testCompareWithPostgreSQL(); testCompareWithPostgreSQL();
testStoreMultiThreadedReads();
} }
private void testConcurrentAddRemove() throws InterruptedException { private void testConcurrentAddRemove() throws InterruptedException {
...@@ -981,4 +983,152 @@ public class TestTransactionStore extends TestBase { ...@@ -981,4 +983,152 @@ public class TestTransactionStore extends TestBase {
s.close(); s.close();
} }
// Internal helper class
private static class Sync extends AbstractQueuedSynchronizer {
private static final int FREE = 0;
private static final int LOCKED = 1;
private static final long serialVersionUID = 1L;
// Reports whether in locked state by this thread
@Override
protected boolean isHeldExclusively() {
return getState() == LOCKED &&
this.getExclusiveOwnerThread() == Thread.currentThread();
}
@Override
public boolean tryAcquire(int newState) {
assert newState == LOCKED;
if (getFirstQueuedThread() != Thread.currentThread() &&
hasQueuedThreads()) {
// prevent barging
return false;
}
if (compareAndSetState(FREE, LOCKED)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
// someone else won the race
return false;
}
@Override
protected boolean tryRelease(int arg) {
if (getState() != LOCKED) throw new IllegalMonitorStateException();
setExclusiveOwnerThread(null);
setState(FREE);
// generation == state now (free)
return true;
}
}
class ReadUpdateWriteTask extends Task {
final int n;
final TransactionStore store;
final Sync sync;
public ReadUpdateWriteTask(int n, TransactionStore store, Sync sync) {
this.n = n;
this.store = store;
this.sync = sync;
}
/**
* This tests the synchronization of the undoLog with the getting of
* committed values when other threads are committing to a different
* map. The failure became noticeable when the transaction "window" was
* changed to a <code>BitSet</code>. This caused the reuse of recent
* transaction id's so that the lack of proper undoLog synchronization
* cause the stale uncommitted value to be decoded using a changed
* undoLog. This test should (with the fix) run in multiple threads
* without a problem.
*/
@Override
public void call() {
Transaction tx;
int j = 0;
// Fails reliably on i5 3.4 quad core in the first few hundred
// iterations
for (int i = 0; i < 10000; i++) {
tx = store.begin();
TransactionMap<Object, Object> txMapA = tx.openMap("a");
Object o;
// retrieve the control value (key = 1)
Object key = txMapA.higherKey(0);
if (key == null) {
// this is the potential failure point; they key is null
// because the undoLog entry
// examined was for the same transaction id in the B map,
// not this one
result = "null key for control map on try #" + i;
return;
}
// now serialize all threads
sync.acquire(1);
j = ((o = txMapA.get(key)) instanceof Integer) ? ((Integer) o)
: -1;
if (j < 0) {
result = "get of control value key " + key + " not valid";
return;
}
// update the control (map A) value
boolean ok = txMapA.tryPut(key, ++j);
if (!ok) {
result = "transactional put of " + j + " to key " + key
+ " failed";
return;
}
// commit the map A transaction
tx.commit();
// release the other thread(s) [they will immediately attempt to
// read the map A key]
// one of them will get the lock, and another one or more will
// get a map A uncommitted value
// but by the time they try to walk back to the committed value,
// the undoLog has changed out from
// under them
sync.release(0);
// begin a map B transaction
tx = store.begin();
TransactionMap<Object, Object> txMapB = tx.openMap("b");
// put a new value to the map; this will cause a map B undoLog
// entry to be created with a null pre-image value
txMapB.tryPut(j, n);
// This is where the real race condition occurs
// some other thread might get the B log entry
// for this transaction rather than the uncommitted A log entry
// they are expecting
tx.commit();
}
}
}
private void testStoreMultiThreadedReads() throws Exception {
MVStore s = MVStore.open(null);
TransactionStore ts = new TransactionStore(s);
ts.init();
Sync sync = new Sync();
Transaction t = ts.begin();
TransactionMap<Object, Object> txMapA = t.openMap("a");
txMapA.put(1, 0);
t.commit();
final int threadCount = 3;
Task[] tasks = new Task[threadCount];
for (int i = 0; i < tasks.length; i++)
(tasks[i] = new ReadUpdateWriteTask(i, ts, sync)).execute();
for (int i = 0; i < tasks.length; i++) {
Object result = (String) tasks[i].get();
if (result instanceof String) {
fail((String) result);
}
}
ts.close();
}
} }
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论