提交 8d5524fb authored 作者: andrei's avatar andrei

step toward atomic transaction commit

上级 95769d9d
...@@ -379,9 +379,19 @@ public class MVMap<K, V> extends AbstractMap<K, V> ...@@ -379,9 +379,19 @@ public class MVMap<K, V> extends AbstractMap<K, V>
* @return the value, or null if not found * @return the value, or null if not found
*/ */
@Override @Override
public final V get(Object key) {
return get(getRootPage(), key);
}
/**
* Get the value for the given key from a snapshot, or null if not found.
*
* @param p the root of a snapshot
* @param key the key
* @return the value, or null if not found
*/
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
public V get(Object key) { public V get(Page p, Object key) {
Page p = getRootPage();
return (V) Page.get(p, key); return (V) Page.get(p, key);
} }
......
...@@ -183,7 +183,7 @@ public abstract class Page implements Cloneable ...@@ -183,7 +183,7 @@ public abstract class Page implements Cloneable
* @param p the root page * @param p the root page
* @return the value, or null if not found * @return the value, or null if not found
*/ */
public static Object get(Page p, Object key) { static Object get(Page p, Object key) {
while (true) { while (true) {
int index = p.binarySearch(key); int index = p.binarySearch(key);
if (p.isLeaf()) { if (p.isLeaf()) {
......
...@@ -210,7 +210,7 @@ public final class MVSecondaryIndex extends BaseIndex implements MVIndex { ...@@ -210,7 +210,7 @@ public final class MVSecondaryIndex extends BaseIndex implements MVIndex {
} }
if (unique != null) { if (unique != null) {
// This code expects that mayHaveDuplicates(row) == false // This code expects that mayHaveDuplicates(row) == false
Iterator<Value> it = map.keyIterator(unique, true); Iterator<Value> it = map.keyIterator(unique, null, true);
while (it.hasNext()) { while (it.hasNext()) {
ValueArray k = (ValueArray) it.next(); ValueArray k = (ValueArray) it.next();
if (compareRows(row, convertToSearchRow(k)) != 0) { if (compareRows(row, convertToSearchRow(k)) != 0) {
......
...@@ -48,12 +48,6 @@ public final class MVRTreeMap<V> extends MVMap<SpatialKey, V> { ...@@ -48,12 +48,6 @@ public final class MVRTreeMap<V> extends MVMap<SpatialKey, V> {
return new MVRTreeMap<>(this); return new MVRTreeMap<>(this);
} }
@Override
public V get(Object key) {
V result = get(getRootPage(), key);
return result;
}
/** /**
* Iterate over all keys that have an intersection with the given rectangle. * Iterate over all keys that have an intersection with the given rectangle.
* *
...@@ -102,6 +96,7 @@ public final class MVRTreeMap<V> extends MVMap<SpatialKey, V> { ...@@ -102,6 +96,7 @@ public final class MVRTreeMap<V> extends MVMap<SpatialKey, V> {
* @return the value, or null if not found * @return the value, or null if not found
*/ */
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
@Override
public V get(Page p, Object key) { public V get(Page p, Object key) {
int keyCount = p.getKeyCount(); int keyCount = p.getKeyCount();
if (!p.isLeaf()) { if (!p.isLeaf()) {
......
...@@ -8,7 +8,9 @@ package org.h2.mvstore.tx; ...@@ -8,7 +8,9 @@ package org.h2.mvstore.tx;
import org.h2.mvstore.Cursor; import org.h2.mvstore.Cursor;
import org.h2.mvstore.DataUtils; import org.h2.mvstore.DataUtils;
import org.h2.mvstore.MVMap; import org.h2.mvstore.MVMap;
import org.h2.mvstore.Page;
import org.h2.mvstore.type.DataType; import org.h2.mvstore.type.DataType;
import java.util.BitSet;
import java.util.AbstractMap; import java.util.AbstractMap;
import java.util.Iterator; import java.util.Iterator;
...@@ -96,14 +98,24 @@ public class TransactionMap<K, V> { ...@@ -96,14 +98,24 @@ public class TransactionMap<K, V> {
* @return the size * @return the size
*/ */
public long sizeAsLong() { public long sizeAsLong() {
transaction.store.rwLock.readLock().lock(); TransactionStore store = transaction.store;
store.rwLock.readLock().lock();
try { try {
long sizeRaw = map.sizeAsLong();
MVMap<Long, Object[]> undo = transaction.store.undoLog; MVMap<Long, Object[]> undo = transaction.store.undoLog;
long undoLogSize;
synchronized (undo) { BitSet committingTransactions;
undoLogSize = undo.sizeAsLong(); MVMap.RootReference mapRootReference;
} MVMap.RootReference undoLogRootReference;
do {
committingTransactions = store.committingTransactions.get();
mapRootReference = map.getRoot();
undoLogRootReference = store.undoLog.getRoot();
} while(committingTransactions != store.committingTransactions.get());
Page undoRootPage = undoLogRootReference.root;
long undoLogSize = undoRootPage.getTotalCount();
Page mapRootPage = mapRootReference.root;
long sizeRaw = mapRootPage.getTotalCount();
if (undoLogSize == 0) { if (undoLogSize == 0) {
return sizeRaw; return sizeRaw;
} }
...@@ -116,7 +128,7 @@ public class TransactionMap<K, V> { ...@@ -116,7 +128,7 @@ public class TransactionMap<K, V> {
K key = cursor.next(); K key = cursor.next();
// cursor.getValue() returns outdated value // cursor.getValue() returns outdated value
VersionedValue data = map.get(key); VersionedValue data = map.get(key);
data = getValue(key, readLogId, data); data = getValue(mapRootPage, undoRootPage, key, readLogId, data, committingTransactions);
if (data != null && data.value != null) { if (data != null && data.value != null) {
size++; size++;
} }
...@@ -256,11 +268,24 @@ public class TransactionMap<K, V> { ...@@ -256,11 +268,24 @@ public class TransactionMap<K, V> {
* update * update
*/ */
public boolean trySet(K key, V value, boolean onlyIfUnchanged) { public boolean trySet(K key, V value, boolean onlyIfUnchanged) {
VersionedValue current = map.get(key); VersionedValue current;
if (onlyIfUnchanged) { if (onlyIfUnchanged) {
VersionedValue old = getValue(key, readLogId); TransactionStore store = transaction.store;
BitSet committingTransactions;
MVMap.RootReference mapRootReference;
MVMap.RootReference undoLogRootReference;
do {
committingTransactions = store.committingTransactions.get();
mapRootReference = map.getRoot();
undoLogRootReference = store.undoLog.getRoot();
} while(committingTransactions != store.committingTransactions.get());
Page mapRootPage = mapRootReference.root;
current = map.get(mapRootPage, key);
VersionedValue old = getValue(mapRootPage, undoLogRootReference.root, key, readLogId, current, committingTransactions);
if (!map.areValuesEqual(old, current)) { if (!map.areValuesEqual(old, current)) {
long tx = TransactionStore.getTransactionId(current.operationId); assert current != null;
long tx = TransactionStore.getTransactionId(current.getOperationId());
if (tx == transaction.transactionId) { if (tx == transaction.transactionId) {
if (value == null) { if (value == null) {
// ignore removing an entry // ignore removing an entry
...@@ -277,7 +302,10 @@ public class TransactionMap<K, V> { ...@@ -277,7 +302,10 @@ public class TransactionMap<K, V> {
return false; return false;
} }
} }
} else {
current = map.get(key);
} }
VersionedValue newValue = new VersionedValue( VersionedValue newValue = new VersionedValue(
TransactionStore.getOperationId(transaction.transactionId, transaction.getLogId()), TransactionStore.getOperationId(transaction.transactionId, transaction.getLogId()),
value); value);
...@@ -341,7 +369,7 @@ public class TransactionMap<K, V> { ...@@ -341,7 +369,7 @@ public class TransactionMap<K, V> {
} }
/** /**
* Get the value for the given key. * Get the effective value for the given key.
* *
* @param key the key * @param key the key
* @param maxLogId the maximum log id * @param maxLogId the maximum log id
...@@ -366,29 +394,47 @@ public class TransactionMap<K, V> { ...@@ -366,29 +394,47 @@ public class TransactionMap<K, V> {
// doesn't exist or deleted by a committed transaction // doesn't exist or deleted by a committed transaction
return false; return false;
} }
int tx = TransactionStore.getTransactionId(data.operationId); int tx = TransactionStore.getTransactionId(data.getOperationId());
return tx == transaction.transactionId; return tx == transaction.transactionId;
} }
private VersionedValue getValue(K key, long maxLog) { private VersionedValue getValue(K key, long maxLog) {
transaction.store.rwLock.readLock().lock(); TransactionStore store = transaction.store;
store.rwLock.readLock().lock();
try { try {
VersionedValue data = map.get(key); BitSet committingTransactions;
return getValue(key, maxLog, data); MVMap.RootReference mapRootReference;
MVMap.RootReference undoLogRootReference;
do {
committingTransactions = store.committingTransactions.get();
mapRootReference = map.getRoot();
undoLogRootReference = store.undoLog.getRoot();
} while(committingTransactions != store.committingTransactions.get());
Page undoRootPage = undoLogRootReference.root;
Page mapRootPage = mapRootReference.root;
VersionedValue data = map.get(mapRootPage, key);
return getValue(mapRootPage, undoRootPage, key, maxLog, data, store.committingTransactions.get());
} finally { } finally {
transaction.store.rwLock.readLock().unlock(); store.rwLock.readLock().unlock();
} }
} }
/** /**
* Get the versioned value for the given key. * Get the versioned value from the raw versioned value (possibly uncommitted),
* as visible by the current transaction.
* *
* @param root Page of the map to get value from at the time when snapshot was taken
* @param undoRoot Page of the undoLog map at the time when snapshot was taken
* @param key the key * @param key the key
* @param maxLog the maximum log id of the entry * @param maxLog the maximum log id of the entry
* @param data the value stored in the main map * @param data the value stored in the main map
* @param committingTransactions set of transactions being committed
* at the time when snapshot was taken
* @return the value * @return the value
*/ */
VersionedValue getValue(K key, long maxLog, VersionedValue data) { private VersionedValue getValue(Page root, Page undoRoot, K key, long maxLog,
VersionedValue data, BitSet committingTransactions) {
while (true) { while (true) {
if (data == null) { if (data == null) {
// doesn't exist or deleted by a committed transaction // doesn't exist or deleted by a committed transaction
...@@ -405,10 +451,12 @@ public class TransactionMap<K, V> { ...@@ -405,10 +451,12 @@ public class TransactionMap<K, V> {
if (TransactionStore.getLogId(id) < maxLog) { if (TransactionStore.getLogId(id) < maxLog) {
return data; return data;
} }
} else if (committingTransactions.get(tx)) {
// transaction which made a change is committed by now
return data;
} }
// get the value before the uncommitted transaction // get the value before the uncommitted transaction
Object[] d; Object d[] = transaction.store.undoLog.get(undoRoot, id);
d = transaction.store.undoLog.get(id);
if (d == null) { if (d == null) {
if (transaction.store.store.isReadOnly()) { if (transaction.store.store.isReadOnly()) {
// uncommitted transaction for a read-only store // uncommitted transaction for a read-only store
...@@ -418,7 +466,7 @@ public class TransactionMap<K, V> { ...@@ -418,7 +466,7 @@ public class TransactionMap<K, V> {
// in the meantime (the transaction might still be open) // in the meantime (the transaction might still be open)
// or it might be changed again in a different // or it might be changed again in a different
// transaction (possibly one with the same id) // transaction (possibly one with the same id)
data = map.get(key); data = map.get(root, key);
} else { } else {
data = (VersionedValue) d[2]; data = (VersionedValue) d[2];
} }
...@@ -545,79 +593,20 @@ public class TransactionMap<K, V> { ...@@ -545,79 +593,20 @@ public class TransactionMap<K, V> {
* @return the iterator * @return the iterator
*/ */
public Iterator<K> keyIterator(K from) { public Iterator<K> keyIterator(K from) {
return keyIterator(from, false); return keyIterator(from, null, false);
} }
/** /**
* Iterate over keys. * Iterate over keys.
* *
* @param from the first key to return * @param from the first key to return
* @param to the last key to return or null if there is no limit
* @param includeUncommitted whether uncommitted entries should be * @param includeUncommitted whether uncommitted entries should be
* included * included
* @return the iterator * @return the iterator
*/ */
public Iterator<K> keyIterator(final K from, final boolean includeUncommitted) { public Iterator<K> keyIterator(K from, K to, boolean includeUncommitted) {
return new Iterator<K>() { return new KeyIterator<>(this, from, to, includeUncommitted);
private K currentKey = from;
private Cursor<K, VersionedValue> cursor = map.cursor(currentKey);
{
fetchNext();
}
private void fetchNext() {
while (cursor.hasNext()) {
K k;
try {
k = cursor.next();
} catch (IllegalStateException e) {
// TODO this is a bit ugly
if (DataUtils.getErrorCode(e.getMessage()) ==
DataUtils.ERROR_CHUNK_NOT_FOUND) {
cursor = map.cursor(currentKey);
// we (should) get the current key again,
// we need to ignore that one
if (!cursor.hasNext()) {
break;
}
cursor.next();
if (!cursor.hasNext()) {
break;
}
k = cursor.next();
} else {
throw e;
}
}
currentKey = k;
if (includeUncommitted) {
return;
}
if (containsKey(k)) {
return;
}
}
currentKey = null;
}
@Override
public boolean hasNext() {
return currentKey != null;
}
@Override
public K next() {
K result = currentKey;
fetchNext();
return result;
}
@Override
public void remove() {
throw DataUtils.newUnsupportedOperationException(
"Removing is not supported");
}
};
} }
/** /**
...@@ -628,82 +617,7 @@ public class TransactionMap<K, V> { ...@@ -628,82 +617,7 @@ public class TransactionMap<K, V> {
* @return the iterator * @return the iterator
*/ */
public Iterator<Map.Entry<K, V>> entryIterator(final K from, final K to) { public Iterator<Map.Entry<K, V>> entryIterator(final K from, final K to) {
return new Iterator<Map.Entry<K, V>>() { return new EntryIterator<>(this, from, to);
private Map.Entry<K, V> current;
private K currentKey = from;
private Cursor<K, VersionedValue> cursor = map.cursor(currentKey);
{
fetchNext();
}
private void fetchNext() {
while (cursor.hasNext()) {
transaction.store.rwLock.readLock().lock();
try {
K k;
try {
k = cursor.next();
} catch (IllegalStateException e) {
// TODO this is a bit ugly
if (DataUtils.getErrorCode(e.getMessage()) ==
DataUtils.ERROR_CHUNK_NOT_FOUND) {
cursor = map.cursor(currentKey);
// we (should) get the current key again,
// we need to ignore that one
if (!cursor.hasNext()) {
break;
}
cursor.next();
if (!cursor.hasNext()) {
break;
}
k = cursor.next();
} else {
throw e;
}
}
final K key = k;
if (to != null && map.getKeyType().compare(k, to) > 0) {
break;
}
// cursor.getValue() returns outdated value
VersionedValue data = map.get(key);
data = getValue(key, readLogId, data);
if (data != null && data.value != null) {
@SuppressWarnings("unchecked")
final V value = (V) data.value;
current = new AbstractMap.SimpleImmutableEntry<>(key, value);
currentKey = key;
return;
}
} finally {
transaction.store.rwLock.readLock().unlock();
}
}
current = null;
currentKey = null;
}
@Override
public boolean hasNext() {
return current != null;
}
@Override
public Map.Entry<K, V> next() {
Map.Entry<K, V> result = current;
fetchNext();
return result;
}
@Override
public void remove() {
throw DataUtils.newUnsupportedOperationException(
"Removing is not supported");
}
};
} }
/** /**
...@@ -765,4 +679,105 @@ public class TransactionMap<K, V> { ...@@ -765,4 +679,105 @@ public class TransactionMap<K, V> {
return map.getKeyType(); return map.getKeyType();
} }
private static final class KeyIterator<K> extends TMIterator<K,K> {
public KeyIterator(TransactionMap<K, ?> transactionMap,
K from, K to, boolean includeUncommitted) {
super(transactionMap, from, to, includeUncommitted);
}
@Override
protected K registerCurrent(K key, VersionedValue data) {
return key;
}
}
private static final class EntryIterator<K,V> extends TMIterator<K,Map.Entry<K,V>> {
public EntryIterator(TransactionMap<K, ?> transactionMap, K from, K to) {
super(transactionMap, from, to, false);
}
@Override
@SuppressWarnings("unchecked")
protected Map.Entry<K, V> registerCurrent(K key, VersionedValue data) {
return new AbstractMap.SimpleImmutableEntry<>(key, (V) data.value);
}
}
private abstract static class TMIterator<K,X> implements Iterator<X> {
private final TransactionMap<K,?> transactionMap;
private final BitSet committingTransactions;
private final Cursor<K,VersionedValue> cursor;
private final Page root;
private final Page undoRoot;
private final boolean includeAllUncommitted;
private X current;
protected TMIterator(TransactionMap<K,?> transactionMap, K from, K to, boolean includeAllUncommitted)
{
this.transactionMap = transactionMap;
TransactionStore store = transactionMap.transaction.store;
MVMap<K, VersionedValue> map = transactionMap.map;
BitSet committingTransactions;
MVMap.RootReference mapRootReference;
Page undoRoot;
do {
committingTransactions = store.committingTransactions.get();
undoRoot = store.undoLog.getRootPage();
mapRootReference = map.getRoot();
} while(committingTransactions != store.committingTransactions.get() || undoRoot != store.undoLog.getRootPage());
this.root = mapRootReference.root;
this.undoRoot = undoRoot;
this.cursor = new Cursor<>(mapRootReference.root, from, to);
this.includeAllUncommitted = includeAllUncommitted;
this.committingTransactions = committingTransactions;
}
protected abstract X registerCurrent(K key, VersionedValue data);
private void fetchNext() {
while (cursor.hasNext()) {
K key = cursor.next();
VersionedValue data = cursor.getValue();
if (!includeAllUncommitted) {
data = transactionMap.getValue(root, undoRoot, key, transactionMap.readLogId,
data, committingTransactions);
}
if (data != null && (data.value != null ||
includeAllUncommitted && transactionMap.transaction.transactionId !=
TransactionStore.getTransactionId(data.getOperationId()))) {
current = registerCurrent(key, data);
return;
}
}
current = null;
}
@Override
public final boolean hasNext() {
if(current == null) {
fetchNext();
}
return current != null;
}
@Override
public final X next() {
if(!hasNext()) {
return null;
}
X result = current;
current = null;
return result;
}
@Override
public final void remove() {
throw DataUtils.newUnsupportedOperationException(
"Removing is not supported");
}
}
} }
...@@ -374,7 +374,7 @@ public class TransactionStore { ...@@ -374,7 +374,7 @@ public class TransactionStore {
* @param key the key * @param key the key
* @param oldValue the old value * @param oldValue the old value
*/ */
void log(Transaction t, long logId, int mapId, long log(Transaction t, long logId, int mapId,
Object key, Object oldValue) { Object key, Object oldValue) {
Long undoKey = getOperationId(t.getId(), logId); Long undoKey = getOperationId(t.getId(), logId);
Object[] log = { mapId, key, oldValue }; Object[] log = { mapId, key, oldValue };
...@@ -393,6 +393,7 @@ public class TransactionStore { ...@@ -393,6 +393,7 @@ public class TransactionStore {
} finally { } finally {
rwLock.writeLock().unlock(); rwLock.writeLock().unlock();
} }
return undoKey;
} }
/** /**
......
...@@ -31,6 +31,10 @@ public class VersionedValue { ...@@ -31,6 +31,10 @@ public class VersionedValue {
this.value = value; this.value = value;
} }
public long getOperationId() {
return operationId;
}
@Override @Override
public String toString() { public String toString() {
return value + (operationId == 0 ? "" : ( return value + (operationId == 0 ? "" : (
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论