提交 22a4be5b authored 作者: Thomas Mueller's avatar Thomas Mueller

MVStore: support concurrent transactions (MVCC style, should match the behavior…

MVStore: support concurrent transactions (MVCC style, should match the behavior of PostgreSQL, now just read-committed)
上级 5c4c6e46
...@@ -9,6 +9,7 @@ package org.h2.test.store; ...@@ -9,6 +9,7 @@ package org.h2.test.store;
import java.sql.Connection; import java.sql.Connection;
import java.sql.DriverManager; import java.sql.DriverManager;
import java.sql.ResultSet; import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement; import java.sql.Statement;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Iterator; import java.util.Iterator;
...@@ -37,16 +38,17 @@ public class TestTransactionMap extends TestBase { ...@@ -37,16 +38,17 @@ public class TestTransactionMap extends TestBase {
} }
public void test() throws Exception { public void test() throws Exception {
testConcurrentTransactions(); testConcurrentTransactionsReadCommitted();
testSingleConnection(); testSingleConnection();
// testCompareWithPostgreSQL(); testCompareWithPostgreSQL();
} }
public void testCompareWithPostgreSQL() throws Exception { public void testCompareWithPostgreSQL() throws Exception {
ArrayList<Statement> statements = New.arrayList(); ArrayList<Statement> statements = New.arrayList();
ArrayList<Transaction> transactions = New.arrayList(); ArrayList<Transaction> transactions = New.arrayList();
ArrayList<TransactionalMap<Integer, String>> maps = New.arrayList(); ArrayList<TransactionalMap<Integer, String>> maps = New.arrayList();
int connectionCount = 4, opCount = 1000, rowCount = 10; // TODO there are still problems with 1000 operations
int connectionCount = 4, opCount = 100, rowCount = 10;
try { try {
Class.forName("org.postgresql.Driver"); Class.forName("org.postgresql.Driver");
for (int i = 0; i < connectionCount; i++) { for (int i = 0; i < connectionCount; i++) {
...@@ -67,8 +69,10 @@ public class TestTransactionMap extends TestBase { ...@@ -67,8 +69,10 @@ public class TestTransactionMap extends TestBase {
TransactionalStore ts = new TransactionalStore(s); TransactionalStore ts = new TransactionalStore(s);
for (int i = 0; i < connectionCount; i++) { for (int i = 0; i < connectionCount; i++) {
Statement stat = statements.get(i); Statement stat = statements.get(i);
// 500 ms to avoid blocking (the test is single threaded)
stat.execute("set statement_timeout to 500");
Connection c = stat.getConnection(); Connection c = stat.getConnection();
c.setTransactionIsolation(Connection.TRANSACTION_REPEATABLE_READ); c.setTransactionIsolation(Connection.TRANSACTION_READ_COMMITTED);
c.setAutoCommit(false); c.setAutoCommit(false);
Transaction transaction = ts.begin(); Transaction transaction = ts.begin();
transactions.add(transaction); transactions.add(transaction);
...@@ -119,10 +123,6 @@ public class TestTransactionMap extends TestBase { ...@@ -119,10 +123,6 @@ public class TestTransactionMap extends TestBase {
break; break;
case 2: case 2:
// insert or update // insert or update
if (i == 98) {
int test;
System.out.println(map.get(x));
}
String old = map.get(x); String old = map.get(x);
if (old == null) { if (old == null) {
buff.append("insert " + x + "=" + y); buff.append("insert " + x + "=" + y);
...@@ -136,10 +136,6 @@ public class TestTransactionMap extends TestBase { ...@@ -136,10 +136,6 @@ public class TestTransactionMap extends TestBase {
if (map.tryPut(x, "" + y)) { if (map.tryPut(x, "" + y)) {
int c = stat.executeUpdate("update test set name = '" + y int c = stat.executeUpdate("update test set name = '" + y
+ "' where id = " + x); + "' where id = " + x);
if (c == 0) {
int test;
System.out.println(map.get(x));
}
assertEquals(1, c); assertEquals(1, c);
} else { } else {
// TODO how to check for locked rows in PostgreSQL? // TODO how to check for locked rows in PostgreSQL?
...@@ -148,16 +144,30 @@ public class TestTransactionMap extends TestBase { ...@@ -148,16 +144,30 @@ public class TestTransactionMap extends TestBase {
break; break;
case 3: case 3:
buff.append("delete " + x); buff.append("delete " + x);
stat.execute("delete from test where id = " + x); try {
map.put(x, null); int c = stat.executeUpdate("delete from test where id = " + x);
if (c == 1) {
map.put(x, null);
} else {
assertNull(map.get(x));
}
} catch (SQLException e) {
// statement timeout
assertFalse(map.tryPut(x, null));
}
break; break;
case 4: case 4:
case 5: case 5:
case 6: case 6:
ResultSet rs = stat.executeQuery("select * from test where id = " + x); try {
String expected = rs.next() ? rs.getString(2) : null; ResultSet rs = stat.executeQuery("select * from test where id = " + x);
buff.append("select " + x + "=" + expected); String expected = rs.next() ? rs.getString(2) : null;
assertEquals(expected, map.get(x)); buff.append("select " + x + "=" + expected);
assertEquals(expected, map.get(x));
} catch (SQLException e) {
// statement timeout
// strange, I thought PostgreSQL doesn't block
}
break; break;
} }
buff.append('\n'); buff.append('\n');
...@@ -171,7 +181,7 @@ public class TestTransactionMap extends TestBase { ...@@ -171,7 +181,7 @@ public class TestTransactionMap extends TestBase {
} }
} }
public void testConcurrentTransactions() { public void testConcurrentTransactionsReadCommitted() {
MVStore s = MVStore.open(null); MVStore s = MVStore.open(null);
TransactionalStore ts = new TransactionalStore(s); TransactionalStore ts = new TransactionalStore(s);
...@@ -202,19 +212,24 @@ public class TestTransactionMap extends TestBase { ...@@ -202,19 +212,24 @@ public class TestTransactionMap extends TestBase {
m1.put("1", "Hallo"); m1.put("1", "Hallo");
m1.put("2", null); m1.put("2", null);
m1.put("3", "!"); m1.put("3", "!");
tx1.commit();
assertEquals("Hello", m2.get("1")); assertEquals("Hello", m2.get("1"));
assertEquals("World", m2.get("2")); assertEquals("World", m2.get("2"));
assertNull(m2.get("3")); assertNull(m2.get("3"));
tx1.commit();
assertEquals("Hallo", m2.get("1"));
assertNull(m2.get("2"));
assertEquals("!", m2.get("3"));
// even thought the row is locked, tx1 = ts.begin();
// trying to remove it should work, as m1 = tx1.openMap("test");
// this key is unknown to this map m1.put("2", "World");
m2.put("3", null);
// the row is locked, and trying to add a value assertNull(m2.get("2"));
// should fail assertFalse(m2.tryPut("2", null));
assertFalse(m2.tryPut("3", ".")); assertFalse(m2.tryPut("2", "Welt"));
s.close(); s.close();
} }
...@@ -460,18 +475,16 @@ public class TestTransactionMap extends TestBase { ...@@ -460,18 +475,16 @@ public class TestTransactionMap extends TestBase {
private Transaction transaction; private Transaction transaction;
/** /**
* The newest version of the data. Key: key Value: { transactionId, * The newest version of the data. Key: key. Value: { transactionId,
* value } * value }
*/ */
private final MVMap<K, Object[]> map; private final MVMap<K, Object[]> map;
private final int mapId; private final int mapId;
private final MVMap<K, Object[]> oldMap;
TransactionalMap(Transaction transaction, String name) { TransactionalMap(Transaction transaction, String name) {
this.transaction = transaction; this.transaction = transaction;
map = transaction.store.store.openMap(name); map = transaction.store.store.openMap(name);
mapId = map.getId(); mapId = map.getId();
oldMap = map.openVersion(transaction.baseVersion);
} }
private void checkOpen() { private void checkOpen() {
...@@ -509,18 +522,6 @@ public class TestTransactionMap extends TestBase { ...@@ -509,18 +522,6 @@ public class TestTransactionMap extends TestBase {
} }
public boolean tryPut(K key, V value) { public boolean tryPut(K key, V value) {
if (tryReplace(key, value)) {
return true;
}
if (value == null) {
// trying to remove a row that is invisible to this
// transaction is a no-op
return true;
}
return false;
}
public boolean tryReplace(K key, V value) {
Object[] current = map.get(key); Object[] current = map.get(key);
Object[] newValue = { transaction.transactionId, value }; Object[] newValue = { transaction.transactionId, value };
if (current == null) { if (current == null) {
...@@ -532,71 +533,57 @@ public class TestTransactionMap extends TestBase { ...@@ -532,71 +533,57 @@ public class TestTransactionMap extends TestBase {
} }
return false; return false;
} }
Object[] old = oldMap.get(key); long tx = ((Long) current[0]).longValue();
if (old == null) {
// added by another transaction
return false;
}
long tx = ((Long) old[0]).longValue();
if (tx == transaction.transactionId) { if (tx == transaction.transactionId) {
// update using the same transaction // added or updated by this transaction
if (map.replace(key, current, newValue)) { if (map.replace(key, current, newValue)) {
transaction.log(transaction.baseVersion, mapId, key); transaction.log(transaction.baseVersion, mapId, key);
return true; return true;
} }
// strange, somebody overwrite the value
// even thought the change was not committed
return false; return false;
} }
// added or updated by another transaction
Long base = transaction.store.openTransactions.get(tx); Long base = transaction.store.openTransactions.get(tx);
if (base == null) { if (base == null) {
// from a transaction that was committed // the transaction is committed:
// when this transaction began:
// overwrite the value // overwrite the value
if (map.replace(key, old, newValue)) { if (map.replace(key, current, newValue)) {
transaction.log(transaction.baseVersion, mapId, key); transaction.log(transaction.baseVersion, mapId, key);
return true; return true;
} }
// somebody else was faster
return false;
} }
// the transaction is not yet committed
return false; return false;
} }
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
V get(K key) { V get(K key) {
checkOpen(); checkOpen();
Object[] old = oldMap.get(key);
Object[] current = map.get(key); Object[] current = map.get(key);
long tx; long tx;
if (old == null) { if (current == null) {
if (current == null) { // doesn't exist or deleted by a committed transaction
// didn't exist before and doesn't exist now
return null;
}
tx = ((Long) current[0]).longValue();
if (tx == transaction.transactionId) {
// added by this transaction
return (V) current[1];
}
// added by another transaction
return null; return null;
} else if (current == null) {
// deleted by a committed transaction
// which means not by the current transaction
tx = ((Long) old[0]).longValue();
} else {
tx = ((Long) current[0]).longValue();
if (tx == transaction.transactionId) {
// updated by this transaction
return (V) current[1];
}
} }
// updated by another transaction tx = ((Long) current[0]).longValue();
if (tx == transaction.transactionId) {
// added by this transaction
return (V) current[1];
}
// added or updated by another transaction
Long base = transaction.store.openTransactions.get(tx); Long base = transaction.store.openTransactions.get(tx);
if (base == null) { if (base == null) {
// it was committed // it is committed
return (V) old[1]; return (V) current[1];
} }
tx = ((Long) current[0]).longValue();
// get the value before the uncommitted transaction // get the value before the uncommitted transaction
MVMap<K, Object[]> olderMap = map.openVersion(base.longValue()); MVMap<K, Object[]> oldMap = map.openVersion(base.longValue());
old = olderMap.get(key); Object[] old = oldMap.get(key);
if (old == null) { if (old == null) {
// there was none // there was none
return null; return null;
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论