提交 82884531 authored 作者: Thomas Mueller's avatar Thomas Mueller

MVStore: support concurrent transactions (PostgreSQL read-committed)

上级 7dca5fdc
...@@ -12,7 +12,6 @@ import java.sql.ResultSet; ...@@ -12,7 +12,6 @@ import java.sql.ResultSet;
import java.sql.SQLException; import java.sql.SQLException;
import java.sql.Statement; import java.sql.Statement;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Iterator;
import java.util.Map; import java.util.Map;
import java.util.Random; import java.util.Random;
...@@ -39,12 +38,45 @@ public class TestTransactionMap extends TestBase { ...@@ -39,12 +38,45 @@ public class TestTransactionMap extends TestBase {
} }
public void test() throws Exception { public void test() throws Exception {
testSavepoint();
testConcurrentTransactionsReadCommitted(); testConcurrentTransactionsReadCommitted();
testSingleConnection(); testSingleConnection();
testCompareWithPostgreSQL(); testCompareWithPostgreSQL();
} }
public void testCompareWithPostgreSQL() throws Exception { private void testSavepoint() throws Exception {
MVStore s = MVStore.open(null);
TransactionalStore ts = new TransactionalStore(s);
Transaction tx;
TransactionalMap<String, String> m;
tx = ts.begin();
m = tx.openMap("test");
m.put("1", "Hello");
m.put("2", "World");
m.put("1", "Hallo");
m.put("2", null);
m.put("3", "!");
long logId = tx.setSavepoint();
m.put("1", "Hi");
m.put("2", ".");
m.put("3", null);
tx.rollbackTo(logId);
assertEquals("Hallo", m.get("1"));
assertNull(m.get("2"));
assertEquals("!", m.get("3"));
tx.rollback();
tx = ts.begin();
m = tx.openMap("test");
assertNull(m.get("1"));
assertNull(m.get("2"));
assertNull(m.get("3"));
s.close();
}
private void testCompareWithPostgreSQL() throws Exception {
ArrayList<Statement> statements = New.arrayList(); ArrayList<Statement> statements = New.arrayList();
ArrayList<Transaction> transactions = New.arrayList(); ArrayList<Transaction> transactions = New.arrayList();
ArrayList<TransactionalMap<Integer, String>> maps = New.arrayList(); ArrayList<TransactionalMap<Integer, String>> maps = New.arrayList();
...@@ -246,8 +278,21 @@ public class TestTransactionMap extends TestBase { ...@@ -246,8 +278,21 @@ public class TestTransactionMap extends TestBase {
tx2 = ts.begin(); tx2 = ts.begin();
m2 = tx2.openMap("test"); m2 = tx2.openMap("test");
assertNull(m2.get("2"));
m1.put("2", null); m1.put("2", null);
assertNull(m2.get("2")); assertNull(m2.get("2"));
tx1.commit();
tx1 = ts.begin();
m1 = tx1.openMap("test");
assertNull(m1.get("2"));
m1.put("2", "World");
m1.put("2", "Welt");
tx1.rollback();
tx1 = ts.begin();
m1 = tx1.openMap("test");
assertNull(m1.get("2"));
s.close(); s.close();
} }
...@@ -386,21 +431,12 @@ public class TestTransactionMap extends TestBase { ...@@ -386,21 +431,12 @@ public class TestTransactionMap extends TestBase {
return new Transaction(this, transactionId); return new Transaction(this, transactionId);
} }
// TODO rollback in reverse order, public void commit(long transactionId, long maxLogId) {
// to support delete & add of the same key // TODO commit should be much faster
// with different baseVersions
// TODO return the undo operations instead,
// so that index changed can be undone
public void endTransaction(boolean success, long transactionId) {
Iterator<long[]> it = undoLog.keyIterator(new long[] {
transactionId, 0 });
store.incrementVersion(); store.incrementVersion();
while (it.hasNext()) { for (long logId = 0; logId < maxLogId; logId++) {
long[] k = it.next(); Object[] op = undoLog.get(new long[] {
if (k[0] != transactionId) { transactionId, logId });
break;
}
Object[] op = undoLog.get(k);
int mapId = ((Integer) op[1]).intValue(); int mapId = ((Integer) op[1]).intValue();
Map<String, String> meta = store.getMetaMap(); Map<String, String> meta = store.getMetaMap();
String m = meta.get("map." + mapId); String m = meta.get("map." + mapId);
...@@ -408,30 +444,57 @@ public class TestTransactionMap extends TestBase { ...@@ -408,30 +444,57 @@ public class TestTransactionMap extends TestBase {
MVMap<Object, Object[]> map = store.openMap(mapName); MVMap<Object, Object[]> map = store.openMap(mapName);
Object key = op[2]; Object key = op[2];
Object[] value = map.get(key); Object[] value = map.get(key);
if (success) { if (value == null) {
if (value[2] == null) { // already removed
} else if (value[2] == null) {
// remove the value // remove the value
map.remove(key); map.remove(key);
} }
} else if (value == null) { undoLog.remove(logId);
// the value was deleted afterwards }
// TODO how can this happen? openTransactions.remove(transactionId);
} else { store.commit();
}
public void rollback(long transactionId, long maxLogId) {
rollbackTo(transactionId, maxLogId, 0);
openTransactions.remove(transactionId);
store.commit();
}
public void rollbackTo(long transactionId, long maxLogId, long toLogId) {
store.incrementVersion();
for (long logId = maxLogId - 1; logId >= toLogId; logId--) {
Object[] op = undoLog.get(new long[] {
transactionId, logId });
int mapId = ((Integer) op[1]).intValue();
Map<String, String> meta = store.getMetaMap();
String m = meta.get("map." + mapId);
String mapName = DataUtils.parseMap(m).get("name");
MVMap<Object, Object[]> map = store.openMap(mapName);
Object key = op[2];
Object[] value = map.get(key);
if (value != null) {
Long oldVersion = (Long) value[1]; Long oldVersion = (Long) value[1];
if (oldVersion == null) { if (oldVersion == null) {
// this transaction added the value // this transaction added the value
map.remove(key); map.remove(key);
} else if (oldVersion < map.getCreateVersion()) {
map.remove(key);
} else { } else {
// this transaction updated the value // this transaction updated the value
MVMap<Object, Object[]> mapOld = map MVMap<Object, Object[]> mapOld = map
.openVersion(oldVersion); .openVersion(oldVersion);
Object[] old = mapOld.get(key); Object[] old = mapOld.get(key);
if (old == null) {
map.remove(key);
} else {
map.put(key, old); map.put(key, old);
} }
} }
undoLog.remove(k);
} }
openTransactions.remove(transactionId); undoLog.remove(logId);
}
store.commit(); store.commit();
} }
...@@ -451,6 +514,11 @@ public class TestTransactionMap extends TestBase { ...@@ -451,6 +514,11 @@ public class TestTransactionMap extends TestBase {
this.transactionId = transactionId; this.transactionId = transactionId;
} }
public long setSavepoint() {
store.store.incrementVersion();
return logId;
}
void log(long baseVersion, int mapId, Object key) { void log(long baseVersion, int mapId, Object key) {
long[] undoKey = { transactionId, logId++ }; long[] undoKey = { transactionId, logId++ };
Object[] log = new Object[] { baseVersion, mapId, key }; Object[] log = new Object[] { baseVersion, mapId, key };
...@@ -463,12 +531,17 @@ public class TestTransactionMap extends TestBase { ...@@ -463,12 +531,17 @@ public class TestTransactionMap extends TestBase {
void commit() { void commit() {
closed = true; closed = true;
store.endTransaction(true, transactionId); store.commit(transactionId, logId);
} }
void rollback() { void rollback() {
closed = true; closed = true;
store.endTransaction(false, transactionId); store.rollback(transactionId, logId);
}
public void rollbackTo(long logId) {
store.rollbackTo(transactionId, this.logId, logId);
this.logId = logId;
} }
void checkOpen() { void checkOpen() {
...@@ -567,12 +640,15 @@ public class TestTransactionMap extends TestBase { ...@@ -567,12 +640,15 @@ public class TestTransactionMap extends TestBase {
long tx = ((Long) current[0]).longValue(); long tx = ((Long) current[0]).longValue();
if (tx == transaction.transactionId) { if (tx == transaction.transactionId) {
// added or updated by this transaction // added or updated by this transaction
// use the previous oldVersion
newValue[1] = current[1];
if (map.replace(key, current, newValue)) { if (map.replace(key, current, newValue)) {
// we already have a log entry, so don't log if (current[1] == null) {
// TODO does not work when using savepoints transaction.log(oldVersion, mapId, key);
// transaction.log(oldVersion, mapId, key); } else {
long c = (Long) current[1];
if (c != oldVersion) {
transaction.log(oldVersion, mapId, key);
}
}
return true; return true;
} }
// strange, somebody overwrite the value // strange, somebody overwrite the value
...@@ -598,38 +674,34 @@ public class TestTransactionMap extends TestBase { ...@@ -598,38 +674,34 @@ public class TestTransactionMap extends TestBase {
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
V get(K key) { V get(K key) {
checkOpen(); checkOpen();
Object[] current = map.get(key); MVMap<K, Object[]> m = map;
while (true) {
Object[] data = m.get(key);
long tx; long tx;
if (current == null) { if (data == null) {
// doesn't exist or deleted by a committed transaction // doesn't exist or deleted by a committed transaction
return null; return null;
} }
tx = ((Long) current[0]).longValue(); tx = ((Long) data[0]).longValue();
if (tx == transaction.transactionId) { if (tx == transaction.transactionId) {
// added by this transaction // added by this transaction
return (V) current[2]; return (V) data[2];
} }
// added or updated by another transaction // added or updated by another transaction
Long base = transaction.store.openTransactions.get(tx); Long base = transaction.store.openTransactions.get(tx);
if (base == null) { if (base == null) {
// it is committed // it is committed
return (V) current[2]; return (V) data[2];
} }
tx = ((Long) current[0]).longValue(); tx = ((Long) data[0]).longValue();
// get the value before the uncommitted transaction // get the value before the uncommitted transaction
if (current[1] == null) { if (data[1] == null) {
// a new entry // a new entry
return null; return null;
} }
long oldVersion = (Long) current[1]; long oldVersion = (Long) data[1];
MVMap<K, Object[]> oldMap = map.openVersion(oldVersion); m = map.openVersion(oldVersion);
Object[] old = oldMap.get(key);
if (old == null) {
// there was none
return null;
} }
// the previous committed value
return (V) old[2];
} }
} }
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论