提交 7dca5fdc authored 作者: Thomas Mueller's avatar Thomas Mueller

MVStore: support concurrent transactions (PostgreSQL read-committed)

上级 2ebd3286
......@@ -16,6 +16,7 @@ import java.util.Iterator;
import java.util.Map;
import java.util.Random;
import org.h2.mvstore.Cursor;
import org.h2.mvstore.DataUtils;
import org.h2.mvstore.MVMap;
import org.h2.mvstore.MVMapConcurrent;
......@@ -47,8 +48,7 @@ public class TestTransactionMap extends TestBase {
ArrayList<Statement> statements = New.arrayList();
ArrayList<Transaction> transactions = New.arrayList();
ArrayList<TransactionalMap<Integer, String>> maps = New.arrayList();
// TODO there are still problems with 1000 operations
int connectionCount = 4, opCount = 100, rowCount = 10;
int connectionCount = 3, opCount = 1000, rowCount = 10;
try {
Class.forName("org.postgresql.Driver");
for (int i = 0; i < connectionCount; i++) {
......@@ -69,8 +69,8 @@ public class TestTransactionMap extends TestBase {
TransactionalStore ts = new TransactionalStore(s);
for (int i = 0; i < connectionCount; i++) {
Statement stat = statements.get(i);
// 500 ms to avoid blocking (the test is single threaded)
stat.execute("set statement_timeout to 500");
// 100 ms to avoid blocking (the test is single threaded)
stat.execute("set statement_timeout to 100");
Connection c = stat.getConnection();
c.setTransactionIsolation(Connection.TRANSACTION_READ_COMMITTED);
c.setAutoCommit(false);
......@@ -98,16 +98,25 @@ public class TestTransactionMap extends TestBase {
// read all data, to get a snapshot
ResultSet rs = stat.executeQuery(
"select * from test order by id");
buff.append("[" + connIndex + "]=");
buff.append(i).append(": [" + connIndex + "]=");
int size = 0;
while (rs.next()) {
buff.append(' ');
buff.append(rs.getInt(1)).append(':').append(rs.getString(2));
int k = rs.getInt(1);
String v = rs.getString(2);
buff.append(k).append(':').append(v);
assertEquals(v, map.get(k));
size++;
}
buff.append('\n');
if (size != map.size()) {
assertEquals(size, map.size());
}
}
int x = r.nextInt(rowCount);
int y = r.nextInt(rowCount);
buff.append("[" + connIndex + "]: ");
buff.append(i).append(": [" + connIndex + "]: ");
ResultSet rs = null;
switch (r.nextInt(7)) {
case 0:
buff.append("commit");
......@@ -129,7 +138,9 @@ public class TestTransactionMap extends TestBase {
if (map.tryPut(x, "" + y)) {
stat.execute("insert into test values(" + x + ", '" + y + "')");
} else {
// TODO how to check for locked rows in PostgreSQL?
buff.append(" -> row was locked");
// the statement would time out in PostgreSQL
// TODO test sometimes if timeout occurs
}
} else {
buff.append("update " + x + "=" + y + " (old:" + old + ")");
......@@ -138,7 +149,9 @@ public class TestTransactionMap extends TestBase {
+ "' where id = " + x);
assertEquals(1, c);
} else {
// TODO how to check for locked rows in PostgreSQL?
buff.append(" -> row was locked");
// the statement would time out in PostgreSQL
// TODO test sometimes if timeout occurs
}
}
break;
......@@ -152,22 +165,22 @@ public class TestTransactionMap extends TestBase {
assertNull(map.get(x));
}
} catch (SQLException e) {
// statement timeout
assertTrue(map.get(x) != null);
assertFalse(map.tryPut(x, null));
// PostgreSQL needs to rollback
buff.append(" -> rollback");
stat.getConnection().rollback();
transaction.rollback();
transactions.set(connIndex, null);
}
break;
case 4:
case 5:
case 6:
try {
ResultSet rs = stat.executeQuery("select * from test where id = " + x);
String expected = rs.next() ? rs.getString(2) : null;
buff.append("select " + x + "=" + expected);
assertEquals(expected, map.get(x));
} catch (SQLException e) {
// statement timeout
// strange, I thought PostgreSQL doesn't block
}
rs = stat.executeQuery("select * from test where id = " + x);
String expected = rs.next() ? rs.getString(2) : null;
buff.append("select " + x + "=" + expected);
assertEquals("i:" + i, expected, map.get(x));
break;
}
buff.append('\n');
......@@ -230,7 +243,12 @@ public class TestTransactionMap extends TestBase {
assertNull(m2.get("2"));
assertFalse(m2.tryPut("2", null));
assertFalse(m2.tryPut("2", "Welt"));
tx2 = ts.begin();
m2 = tx2.openMap("test");
m1.put("2", null);
assertNull(m2.get("2"));
s.close();
}
......@@ -333,6 +351,8 @@ public class TestTransactionMap extends TestBase {
settings = store.openMap("settings");
openTransactions = store.openMap("openTransactions",
new MVMapConcurrent.Builder<Long, Long>());
// TODO one undo log per transaction to speed up commit
// (alternative: add a range delete operation for maps)
undoLog = store.openMap("undoLog",
new MVMapConcurrent.Builder<long[], Object[]>());
}
......@@ -387,28 +407,25 @@ public class TestTransactionMap extends TestBase {
String mapName = DataUtils.parseMap(m).get("name");
MVMap<Object, Object[]> map = store.openMap(mapName);
Object key = op[2];
Object[] value = map.get(key);
if (success) {
Object[] value = map.get(key);
if (value[1] == null) {
if (value[2] == null) {
// remove the value
map.remove(key);
}
} else if (value == null) {
// the value was deleted afterwards
// TODO how can this happen?
} else {
long baseVersion = ((Long) op[0]).longValue();
Object[] old;
if (baseVersion <= map.getCreateVersion()) {
// the map didn't exist yet
old = null;
} else {
MVMap<Object, Object[]> mapOld = map
.openVersion(baseVersion - 1);
old = mapOld.get(key);
}
if (old == null) {
Long oldVersion = (Long) value[1];
if (oldVersion == null) {
// this transaction added the value
map.remove(key);
} else {
// this transaction updated the value
MVMap<Object, Object[]> mapOld = map
.openVersion(oldVersion);
Object[] old = mapOld.get(key);
map.put(key, old);
}
}
......@@ -427,13 +444,11 @@ public class TestTransactionMap extends TestBase {
final TransactionalStore store;
final long transactionId;
long logId;
long baseVersion;
private boolean closed;
Transaction(TransactionalStore store, long transactionId) {
this.store = store;
this.transactionId = transactionId;
this.baseVersion = store.store.incrementVersion();
}
void log(long baseVersion, int mapId, Object key) {
......@@ -475,8 +490,9 @@ public class TestTransactionMap extends TestBase {
private Transaction transaction;
/**
* The newest version of the data. Key: key. Value: { transactionId,
* value }
* The newest version of the data.
* Key: key.
* Value: { transactionId, oldVersion, value }
*/
private final MVMap<K, Object[]> map;
private final int mapId;
......@@ -487,6 +503,19 @@ public class TestTransactionMap extends TestBase {
mapId = map.getId();
}
public long size() {
// TODO this method is very slow
long size = 0;
Cursor<K> cursor = map.keyIterator(null);
while (cursor.hasNext()) {
K key = cursor.next();
if (get(key) != null) {
size++;
}
}
return size;
}
private void checkOpen() {
transaction.checkOpen();
}
......@@ -523,12 +552,14 @@ public class TestTransactionMap extends TestBase {
public boolean tryPut(K key, V value) {
Object[] current = map.get(key);
Object[] newValue = { transaction.transactionId, value };
long oldVersion = transaction.store.store.getCurrentVersion() - 1;
Object[] newValue = { transaction.transactionId, oldVersion, value };
if (current == null) {
// a new value
newValue[1] = null;
Object[] old = map.putIfAbsent(key, newValue);
if (old == null) {
transaction.log(transaction.baseVersion, mapId, key);
transaction.log(oldVersion, mapId, key);
return true;
}
return false;
......@@ -536,8 +567,12 @@ public class TestTransactionMap extends TestBase {
long tx = ((Long) current[0]).longValue();
if (tx == transaction.transactionId) {
// added or updated by this transaction
// use the previous oldVersion
newValue[1] = current[1];
if (map.replace(key, current, newValue)) {
transaction.log(transaction.baseVersion, mapId, key);
// we already have a log entry, so don't log
// TODO does not work when using savepoints
// transaction.log(oldVersion, mapId, key);
return true;
}
// strange, somebody overwrite the value
......@@ -550,7 +585,7 @@ public class TestTransactionMap extends TestBase {
// the transaction is committed:
// overwrite the value
if (map.replace(key, current, newValue)) {
transaction.log(transaction.baseVersion, mapId, key);
transaction.log(oldVersion, mapId, key);
return true;
}
// somebody else was faster
......@@ -572,24 +607,29 @@ public class TestTransactionMap extends TestBase {
tx = ((Long) current[0]).longValue();
if (tx == transaction.transactionId) {
// added by this transaction
return (V) current[1];
return (V) current[2];
}
// added or updated by another transaction
Long base = transaction.store.openTransactions.get(tx);
if (base == null) {
// it is committed
return (V) current[1];
return (V) current[2];
}
tx = ((Long) current[0]).longValue();
// get the value before the uncommitted transaction
MVMap<K, Object[]> oldMap = map.openVersion(base.longValue());
if (current[1] == null) {
// a new entry
return null;
}
long oldVersion = (Long) current[1];
MVMap<K, Object[]> oldMap = map.openVersion(oldVersion);
Object[] old = oldMap.get(key);
if (old == null) {
// there was none
return null;
}
// the previous committed value
return (V) old[1];
return (V) old[2];
}
}
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论