提交 2184b359 authored 作者: Thomas Mueller's avatar Thomas Mueller

MVTableEngine: concurrency

上级 1d417475
...@@ -103,7 +103,7 @@ public class TransactionStore { ...@@ -103,7 +103,7 @@ public class TransactionStore {
init(); init();
} }
private void init() { private synchronized void init() {
String s = settings.get(LAST_TRANSACTION_ID); String s = settings.get(LAST_TRANSACTION_ID);
if (s != null) { if (s != null) {
lastTransactionId = Long.parseLong(s); lastTransactionId = Long.parseLong(s);
...@@ -115,9 +115,11 @@ public class TransactionStore { ...@@ -115,9 +115,11 @@ public class TransactionStore {
DataUtils.ERROR_TRANSACTION_CORRUPT, DataUtils.ERROR_TRANSACTION_CORRUPT,
"Last transaction not stored"); "Last transaction not stored");
} }
if (undoLog.size() > 0) { synchronized (undoLog) {
long[] key = undoLog.firstKey(); if (undoLog.size() > 0) {
firstOpenTransaction = key[0]; long[] key = undoLog.firstKey();
firstOpenTransaction = key[0];
}
} }
} }
...@@ -126,29 +128,31 @@ public class TransactionStore { ...@@ -126,29 +128,31 @@ public class TransactionStore {
* *
* @return the list of transactions (sorted by id) * @return the list of transactions (sorted by id)
*/ */
public synchronized List<Transaction> getOpenTransactions() { public List<Transaction> getOpenTransactions() {
ArrayList<Transaction> list = New.arrayList(); synchronized (undoLog) {
long[] key = undoLog.firstKey(); ArrayList<Transaction> list = New.arrayList();
while (key != null) { long[] key = undoLog.firstKey();
long transactionId = key[0]; while (key != null) {
long[] end = { transactionId, Long.MAX_VALUE }; long transactionId = key[0];
key = undoLog.floorKey(end); long[] end = { transactionId, Long.MAX_VALUE };
long logId = key[1] + 1; key = undoLog.floorKey(end);
Object[] data = preparedTransactions.get(transactionId); long logId = key[1] + 1;
int status; Object[] data = preparedTransactions.get(transactionId);
String name; int status;
if (data == null) { String name;
status = Transaction.STATUS_OPEN; if (data == null) {
name = null; status = Transaction.STATUS_OPEN;
} else { name = null;
status = (Integer) data[0]; } else {
name = (String) data[1]; status = (Integer) data[0];
name = (String) data[1];
}
Transaction t = new Transaction(this, transactionId, status, name, logId);
list.add(t);
key = undoLog.higherKey(end);
} }
Transaction t = new Transaction(this, transactionId, status, name, logId); return list;
list.add(t);
key = undoLog.higherKey(end);
} }
return list;
} }
/** /**
...@@ -187,7 +191,7 @@ public class TransactionStore { ...@@ -187,7 +191,7 @@ public class TransactionStore {
* *
* @param t the transaction * @param t the transaction
*/ */
void storeTransaction(Transaction t) { synchronized void storeTransaction(Transaction t) {
if (t.getStatus() == Transaction.STATUS_PREPARED || t.getName() != null) { if (t.getStatus() == Transaction.STATUS_PREPARED || t.getName() != null) {
Object[] v = { t.getStatus(), t.getName() }; Object[] v = { t.getStatus(), t.getName() };
preparedTransactions.put(t.getId(), v); preparedTransactions.put(t.getId(), v);
...@@ -209,7 +213,9 @@ public class TransactionStore { ...@@ -209,7 +213,9 @@ public class TransactionStore {
commitIfNeeded(); commitIfNeeded();
long[] undoKey = { t.getId(), logId }; long[] undoKey = { t.getId(), logId };
Object[] log = new Object[] { opType, mapId, key, oldValue }; Object[] log = new Object[] { opType, mapId, key, oldValue };
undoLog.put(undoKey, log); synchronized (undoLog) {
undoLog.put(undoKey, log);
}
if (firstOpenTransaction == -1 || t.getId() < firstOpenTransaction) { if (firstOpenTransaction == -1 || t.getId() < firstOpenTransaction) {
firstOpenTransaction = t.getId(); firstOpenTransaction = t.getId();
} }
...@@ -225,31 +231,33 @@ public class TransactionStore { ...@@ -225,31 +231,33 @@ public class TransactionStore {
if (store.isClosed()) { if (store.isClosed()) {
return; return;
} }
for (long logId = 0; logId < maxLogId; logId++) { synchronized (undoLog) {
commitIfNeeded(); for (long logId = 0; logId < maxLogId; logId++) {
long[] undoKey = new long[] { t.getId(), logId }; commitIfNeeded();
Object[] op = undoLog.get(undoKey); long[] undoKey = new long[] { t.getId(), logId };
int opType = (Integer) op[0]; Object[] op = undoLog.get(undoKey);
if (opType == Transaction.OP_REMOVE) { int opType = (Integer) op[0];
int mapId = (Integer) op[1]; if (opType == Transaction.OP_REMOVE) {
MVMap<Object, VersionedValue> map = openMap(mapId); int mapId = (Integer) op[1];
Object key = op[2]; MVMap<Object, VersionedValue> map = openMap(mapId);
VersionedValue value = map.get(key); Object key = op[2];
// possibly the entry was added later on VersionedValue value = map.get(key);
// so we have to check // possibly the entry was added later on
if (value == null) { // so we have to check
// nothing to do if (value == null) {
} else if (value.value == null) { // nothing to do
// remove the value } else if (value.value == null) {
map.remove(key); // remove the value
map.remove(key);
}
} }
undoLog.remove(undoKey);
} }
undoLog.remove(undoKey);
} }
endTransaction(t); endTransaction(t);
} }
private MVMap<Object, VersionedValue> openMap(int mapId) { private synchronized MVMap<Object, VersionedValue> openMap(int mapId) {
// TODO open map by id if possible // TODO open map by id if possible
Map<String, String> meta = store.getMetaMap(); Map<String, String> meta = store.getMetaMap();
String m = meta.get("map." + mapId); String m = meta.get("map." + mapId);
...@@ -277,23 +285,25 @@ public class TransactionStore { ...@@ -277,23 +285,25 @@ public class TransactionStore {
if (transactionId < firstOpenTransaction) { if (transactionId < firstOpenTransaction) {
return false; return false;
} }
if (firstOpenTransaction == -1) { synchronized (undoLog) {
if (undoLog.size() == 0) { if (firstOpenTransaction == -1) {
return false; if (undoLog.size() == 0) {
return false;
}
long[] key = undoLog.firstKey();
if (key == null) {
// unusual, but can happen
return false;
}
firstOpenTransaction = key[0];
} }
long[] key = undoLog.firstKey(); if (firstOpenTransaction == transactionId) {
if (key == null) { return true;
// unusual, but can happen
return false;
} }
firstOpenTransaction = key[0]; long[] key = { transactionId, -1 };
} key = undoLog.higherKey(key);
if (firstOpenTransaction == transactionId) { return key != null && key[0] == transactionId;
return true;
} }
long[] key = { transactionId, -1 };
key = undoLog.higherKey(key);
return key != null && key[0] == transactionId;
} }
/** /**
...@@ -301,7 +311,7 @@ public class TransactionStore { ...@@ -301,7 +311,7 @@ public class TransactionStore {
* *
* @param t the transaction * @param t the transaction
*/ */
void endTransaction(Transaction t) { synchronized void endTransaction(Transaction t) {
if (t.getStatus() == Transaction.STATUS_PREPARED) { if (t.getStatus() == Transaction.STATUS_PREPARED) {
preparedTransactions.remove(t.getId()); preparedTransactions.remove(t.getId());
} }
...@@ -322,24 +332,26 @@ public class TransactionStore { ...@@ -322,24 +332,26 @@ public class TransactionStore {
* @param toLogId the log id to roll back to * @param toLogId the log id to roll back to
*/ */
void rollbackTo(Transaction t, long maxLogId, long toLogId) { void rollbackTo(Transaction t, long maxLogId, long toLogId) {
for (long logId = maxLogId - 1; logId >= toLogId; logId--) { synchronized (undoLog) {
commitIfNeeded(); for (long logId = maxLogId - 1; logId >= toLogId; logId--) {
long[] undoKey = new long[] { t.getId(), logId }; commitIfNeeded();
Object[] op = undoLog.get(undoKey); long[] undoKey = new long[] { t.getId(), logId };
int mapId = ((Integer) op[1]).intValue(); Object[] op = undoLog.get(undoKey);
MVMap<Object, VersionedValue> map = openMap(mapId); int mapId = ((Integer) op[1]).intValue();
if (map != null) { MVMap<Object, VersionedValue> map = openMap(mapId);
Object key = op[2]; if (map != null) {
VersionedValue oldValue = (VersionedValue) op[3]; Object key = op[2];
if (oldValue == null) { VersionedValue oldValue = (VersionedValue) op[3];
// this transaction added the value if (oldValue == null) {
map.remove(key); // this transaction added the value
} else { map.remove(key);
// this transaction updated the value } else {
map.put(key, oldValue); // this transaction updated the value
map.put(key, oldValue);
}
} }
undoLog.remove(undoKey);
} }
undoLog.remove(undoKey);
} }
} }
...@@ -363,23 +375,25 @@ public class TransactionStore { ...@@ -363,23 +375,25 @@ public class TransactionStore {
} }
private void fetchNext() { private void fetchNext() {
while (logId >= toLogId) { synchronized (undoLog) {
Object[] op = undoLog.get(new long[] { while (logId >= toLogId) {
t.getId(), logId }); Object[] op = undoLog.get(new long[] {
int mapId = ((Integer) op[1]).intValue(); t.getId(), logId });
// TODO open map by id if possible int mapId = ((Integer) op[1]).intValue();
Map<String, String> meta = store.getMetaMap(); // TODO open map by id if possible
String m = meta.get("map." + mapId); Map<String, String> meta = store.getMetaMap();
logId--; String m = meta.get("map." + mapId);
if (m == null) { logId--;
// map was removed later on if (m == null) {
} else { // map was removed later on
current = new Change(); } else {
current.mapName = DataUtils.parseMap(m).get("name"); current = new Change();
current.key = op[2]; current.mapName = DataUtils.parseMap(m).get("name");
VersionedValue oldValue = (VersionedValue) op[3]; current.key = op[2];
current.value = oldValue == null ? null : oldValue.value; VersionedValue oldValue = (VersionedValue) op[3];
return; current.value = oldValue == null ? null : oldValue.value;
return;
}
} }
} }
current = null; current = null;
...@@ -958,8 +972,10 @@ public class TransactionStore { ...@@ -958,8 +972,10 @@ public class TransactionStore {
} }
// get the value before the uncommitted transaction // get the value before the uncommitted transaction
long[] x = new long[] { tx, logId }; long[] x = new long[] { tx, logId };
Object[] d = transaction.store.undoLog.get(x); synchronized (transaction.store.undoLog) {
data = (VersionedValue) d[3]; Object[] d = transaction.store.undoLog.get(x);
data = (VersionedValue) d[3];
}
} }
} }
......
...@@ -680,10 +680,7 @@ kill -9 `jps -l | grep "org.h2.test." | cut -d " " -f 1` ...@@ -680,10 +680,7 @@ kill -9 `jps -l | grep "org.h2.test." | cut -d " " -f 1`
new TestRandomCompare().runTest(this); new TestRandomCompare().runTest(this);
new TestKillRestart().runTest(this); new TestKillRestart().runTest(this);
new TestKillRestartMulti().runTest(this); new TestKillRestartMulti().runTest(this);
if (!mvStore) { new TestMultiThreaded().runTest(this);
// concurrent modification of the undoLog
new TestMultiThreaded().runTest(this);
}
new TestOuterJoins().runTest(this); new TestOuterJoins().runTest(this);
new TestNestedJoins().runTest(this); new TestNestedJoins().runTest(this);
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论