提交 c08eb661 authored 作者: Thomas Mueller's avatar Thomas Mueller

MVStore: support statements (updates should not need to first remove, and then re-add all entries)

上级 6ff5798f
......@@ -583,14 +583,21 @@ public class MVMap<K, V> extends AbstractMap<K, V>
*/
public synchronized boolean remove(Object key, Object value) {
V old = get(key);
if (equalsValue(old, value)) {
if (areValuesEqual(old, value)) {
remove(key);
return true;
}
return false;
}
private boolean equalsValue(Object a, Object b) {
/**
* Check whether the two values are equal.
*
* @param a the first value
* @param b the second value
* @return true if they are equal
*/
public boolean areValuesEqual(Object a, Object b) {
if (a == b) {
return true;
} else if (a == null || b == null) {
......@@ -609,7 +616,7 @@ public class MVMap<K, V> extends AbstractMap<K, V>
*/
public synchronized boolean replace(K key, V oldValue, V newValue) {
V old = get(key);
if (equalsValue(old, oldValue)) {
if (areValuesEqual(old, oldValue)) {
put(key, newValue);
return true;
}
......
......@@ -391,8 +391,6 @@ public class TransactionStore {
*/
public <K, V> TransactionMap<K, V> openMap(String name, long readVersion) {
checkOpen();
// TODO read from a stable version of the data within
// one 'statement'
return new TransactionMap<K, V>(this, name, readVersion);
}
......@@ -463,13 +461,18 @@ public class TransactionStore {
/**
* The map used for writing (the latest version).
* <p>
* Key: key the key of the data.
* Value: { transactionId, oldVersion, value }
*/
private final MVMap<K, Object[]> mapWrite;
/**
* The map used for reading (possibly an older version).
* The map used for reading (possibly an older version). Reading is done
* on an older version so that changes are not immediately visible, to
* support statement processing (for example
* "update test set id = id + 1").
* <p>
* Key: key the key of the data.
* Value: { transactionId, oldVersion, value }
*/
......@@ -511,8 +514,8 @@ public class TransactionStore {
/**
* Remove an entry.
* <p>
* If the row is locked, this method
* will retry until the row could be updated or until a lock timeout.
* If the row is locked, this method will retry until the row could be
* updated or until a lock timeout.
*
* @param key the key
* @throws IllegalStateException if a lock timeout occurs
......@@ -524,11 +527,11 @@ public class TransactionStore {
/**
* Update the value for the given key.
* <p>
* If the row is locked, this method
* will retry until the row could be updated or until a lock timeout.
* If the row is locked, this method will retry until the row could be
* updated or until a lock timeout.
*
* @param key the key
* @param value the new value (null to remove the row)
* @param value the new value (not null)
* @throws IllegalStateException if a lock timeout occurs
*/
public V put(K key, V value) {
......@@ -536,12 +539,12 @@ public class TransactionStore {
return set(key, value);
}
public V set(K key, V value) {
private V set(K key, V value) {
checkOpen();
long start = 0;
while (true) {
V old = get(key);
boolean ok = trySet(key, value);
boolean ok = trySet(key, value, false);
if (ok) {
return old;
}
......@@ -578,7 +581,7 @@ public class TransactionStore {
* @return whether the entry could be removed
*/
public boolean tryRemove(K key) {
return trySet(key, null);
return trySet(key, null, false);
}
/**
......@@ -593,11 +596,29 @@ public class TransactionStore {
*/
public boolean tryPut(K key, V value) {
DataUtils.checkArgument(value != null, "The value may not be null");
return trySet(key, value);
return trySet(key, value, false);
}
private boolean trySet(K key, V value) {
/**
* Try to set or remove the value. When updating only unchanged entries,
* then the value is only changed if it was not changed after opening
* the map.
*
* @param key the key
* @param value the new value (null to remove the value)
* @param onlyIfUnchanged only set the value if it was not changed (by
* this or another transaction) since the map was opened
* @return true if the value was set
*/
public boolean trySet(K key, V value, boolean onlyIfUnchanged) {
MVMap<K, Object[]> m = mapRead;
Object[] current = mapWrite.get(key);
if (onlyIfUnchanged) {
Object[] old = m.get(key);
if (!mapWrite.areValuesEqual(old, current)) {
return false;
}
}
long oldVersion = transaction.store.store.getCurrentVersion() - 1;
int opType;
if (current == null || current[2] == null) {
......@@ -625,7 +646,7 @@ public class TransactionStore {
}
return false;
}
long tx = ((Long) current[0]).longValue();
long tx = (Long) current[0];
if (tx == transaction.transactionId) {
// added or updated by this transaction
if (mapWrite.replace(key, current, newValue)) {
......@@ -659,6 +680,26 @@ public class TransactionStore {
return false;
}
/**
* Get the value for the given key at the time when this map was opened.
*
* @param key the key
* @return the value or null
*/
public V get(K key) {
return get(key, mapRead);
}
/**
* Get the most recent value for the given key.
*
* @param key the key
* @return the value or null
*/
public V getLatest(K key) {
return get(key, mapWrite);
}
/**
* Get the value for the given key.
*
......@@ -666,10 +707,8 @@ public class TransactionStore {
* @return the value or null
*/
@SuppressWarnings("unchecked")
public
V get(K key) {
public V get(K key, MVMap<K, Object[]> m) {
checkOpen();
MVMap<K, Object[]> m = mapRead;
while (true) {
Object[] data = m.get(key);
long tx;
......@@ -677,7 +716,7 @@ public class TransactionStore {
// doesn't exist or deleted by a committed transaction
return null;
}
tx = ((Long) data[0]).longValue();
tx = (Long) data[0];
if (tx == transaction.transactionId) {
// added by this transaction
return (V) data[2];
......@@ -688,7 +727,7 @@ public class TransactionStore {
// it is committed
return (V) data[2];
}
tx = ((Long) data[0]).longValue();
tx = (Long) data[0];
// get the value before the uncommitted transaction
if (data[1] == null) {
// a new entry
......@@ -698,7 +737,6 @@ public class TransactionStore {
m = mapWrite.openVersion(oldVersion);
}
}
}
}
......
......@@ -38,7 +38,7 @@ public class TestTransactionStore extends TestBase {
}
public void test() throws Exception {
// testMultiStatement();
testMultiStatement();
testTwoPhaseCommit();
testSavepoint();
testConcurrentTransactionsReadCommitted();
......@@ -51,59 +51,77 @@ public class TestTransactionStore extends TestBase {
* uses a savepoint. Within a statement, a change by the statement itself is
* not seen; the change is only seen when the statement finished.
*/
// private void testMultiStatement() {
// MVStore s = MVStore.open(null);
// TransactionStore ts = new TransactionStore(s);
// Transaction tx;
// TransactionMap<String, String> m;
// long startUpdate;
//
// tx = ts.begin();
//
// // start of statement
// // insert into test(id, name) values(1, 'Hello'), (2, 'World')
// startUpdate = tx.setSavepoint();
// m = tx.openMap("test", startUpdate);
// TODO putIfAbsent
// m.put("1", "Hello");
// m.put("2", "World");
// // not seen yet
// assertNull(m.get("1"));
//
// // start of statement
// startUpdate = tx.setSavepoint();
// // update test set primaryKey = primaryKey + 1
// m = tx.openMap("test", startUpdate);
//
// for (Cursor)
//
// tx.commit();
//
//
//
// m.put("2", "World");
// m.put("1", "Hallo");
// m.remove("2");
// m.put("3", "!");
// long logId = tx.setSavepoint();
// m.put("1", "Hi");
// m.put("2", ".");
// m.remove("3");
// tx.rollbackToSavepoint(logId);
// assertEquals("Hallo", m.get("1"));
// assertNull(m.get("2"));
// assertEquals("!", m.get("3"));
// tx.rollback();
//
// tx = ts.begin();
// m = tx.openMap("test");
// assertNull(m.get("1"));
// assertNull(m.get("2"));
// assertNull(m.get("3"));
//
// ts.close();
// s.close();
// }
private void testMultiStatement() {
MVStore s = MVStore.open(null);
TransactionStore ts = new TransactionStore(s);
Transaction tx;
TransactionMap<String, String> m;
long startUpdate;
long version;
tx = ts.begin();
// TODO support and test rollback of table creation / removal
// start of statement
// create table test
startUpdate = tx.setSavepoint();
tx.openMap("test");
// start of statement
// insert into test(id, name) values(1, 'Hello'), (2, 'World')
startUpdate = tx.setSavepoint();
version = s.getCurrentVersion();
m = tx.openMap("test", version);
assertTrue(m.trySet("1", "Hello", true));
assertTrue(m.trySet("2", "World", true));
// not seen yet (within the same statement)
assertNull(m.get("1"));
assertNull(m.get("2"));
// start of statement
startUpdate = tx.setSavepoint();
version = s.getCurrentVersion();
// now we see the newest version
m = tx.openMap("test", version);
assertEquals("Hello", m.get("1"));
assertEquals("World", m.get("2"));
// update test set primaryKey = primaryKey + 1
// (this is usually a tricky cases)
assertEquals("Hello", m.get("1"));
assertTrue(m.trySet("1", null, true));
assertTrue(m.trySet("2", "Hello", true));
// already updated by this statement, so it has no effect
// but still returns true because it was changed by this transaction
assertEquals("World", m.get("2"));
assertTrue(m.trySet("2", null, true));
assertTrue(m.trySet("3", "World", true));
// not seen within this statement
assertEquals("Hello", m.get("1"));
assertEquals("World", m.get("2"));
assertNull(m.get("3"));
// start of statement
startUpdate = tx.setSavepoint();
version = s.getCurrentVersion();
m = tx.openMap("test", version);
// select * from test
assertNull(m.get("1"));
assertEquals("Hello", m.get("2"));
assertEquals("World", m.get("3"));
// start of statement
startUpdate = tx.setSavepoint();
version = s.getCurrentVersion();
m = tx.openMap("test", version);
// update test set id = 1
// TODO should fail: duplicate key
tx.commit();
ts.close();
s.close();
}
private void testTwoPhaseCommit() throws Exception {
String fileName = getBaseDir() + "/testTwoPhaseCommit.h3";
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论