提交 5b8df111 authored 作者: Andrei Tokar's avatar Andrei Tokar

Introduce last commited value into a VersionedValue

上级 c6c72d96
......@@ -49,7 +49,7 @@ final class CommitDecisionMaker extends MVMap.DecisionMaker<VersionedValue> {
public VersionedValue selectValue(VersionedValue existingValue, VersionedValue providedValue) {
assert decision == MVMap.Decision.PUT;
assert existingValue != null;
return new VersionedValue(0L, existingValue.value);
return VersionedValue.getInstance(existingValue.value);
}
@Override
......
......@@ -233,7 +233,7 @@ public class TransactionMap<K, V> {
*/
public V putCommitted(K key, V value) {
DataUtils.checkArgument(value != null, "The value may not be null");
VersionedValue newValue = new VersionedValue(0L, value);
VersionedValue newValue = VersionedValue.getInstance(value);
VersionedValue oldValue = map.put(key, newValue);
@SuppressWarnings("unchecked")
V result = (V) (oldValue == null ? null : oldValue.value);
......@@ -448,42 +448,43 @@ public class TransactionMap<K, V> {
* @return the value
*/
VersionedValue getValue(Page root, Page undoRoot, K key, long maxLog,
VersionedValue data, BitSet committingTransactions) {
VersionedValue data, BitSet committingTransactions) {
// TODO: This method is overly complicated and has a bunch of extra parameters
// TODO: to support maxLog feature, which is not really used by H2
long id;
int tx;
while (true) {
if (data == null) {
// doesn't exist or deleted by a committed transaction
return null;
}
long id = data.operationId;
if (id == 0) {
// it is committed
return data;
}
int tx = TransactionStore.getTransactionId(id);
if (tx == transaction.transactionId) {
// added by this transaction
if (TransactionStore.getLogId(id) < maxLog) {
return data;
}
} else if (committingTransactions.get(tx)) {
// transaction which made a change is committed by now
return data;
}
// get the value before the uncommitted transaction
Object d[] = transaction.store.undoLog.get(undoRoot, id);
if (d == null) {
if (transaction.store.store.isReadOnly()) {
// uncommitted transaction for a read-only store
return null;
// If value doesn't exist or it was deleted by a committed transaction,
// or if value is a committed one, just return it.
if (data != null &&
(id = data.getOperationId()) != 0) {
if ((tx = TransactionStore.getTransactionId(id)) == transaction.transactionId) {
// current value comes from our transaction
if (TransactionStore.getLogId(id) >= maxLog) {
Object d[] = transaction.store.undoLog.get(undoRoot, id);
if (d == null) {
if (transaction.store.store.isReadOnly()) {
// uncommitted transaction for a read-only store
return null;
}
// this entry should be committed or rolled back
// in the meantime (the transaction might still be open)
// or it might be changed again in a different
// transaction (possibly one with the same id)
data = map.get(root, key);
} else {
data = (VersionedValue) d[2];
}
continue;
}
} else if (!committingTransactions.get(tx)) {
// current value comes from another uncommitted transaction
// take committed value instead
Object committedValue = data.getCommittedValue();
data = committedValue == null ? null : VersionedValue.getInstance(committedValue);
}
// this entry should be committed or rolled back
// in the meantime (the transaction might still be open)
// or it might be changed again in a different
// transaction (possibly one with the same id)
data = map.get(root, key);
} else {
data = (VersionedValue) d[2];
}
return data;
}
}
......
......@@ -46,7 +46,7 @@ public abstract class TxDecisionMaker extends MVMap.DecisionMaker<VersionedValue
// We assume that we are looking at the final value for this transaction,
// and if it's not the case, then it will fail later,
// because a tree root has definitely been changed.
logIt(existingValue.value == null ? null : new VersionedValue(0L, existingValue.value));
logIt(existingValue.value == null ? null : VersionedValue.getInstance(existingValue.value));
decision = MVMap.Decision.PUT;
} else if(fetchTransaction(blockingId) == null) {
// condition above means transaction has been committed/rplled back and closed by now
......@@ -113,7 +113,8 @@ public abstract class TxDecisionMaker extends MVMap.DecisionMaker<VersionedValue
@SuppressWarnings("unchecked")
@Override
public final VersionedValue selectValue(VersionedValue existingValue, VersionedValue providedValue) {
return new VersionedValue(undoKey, value);
return VersionedValue.getInstance(undoKey, value,
existingValue == null ? null : existingValue.getCommittedValue());
}
}
......@@ -171,7 +172,8 @@ public abstract class TxDecisionMaker extends MVMap.DecisionMaker<VersionedValue
@SuppressWarnings("unchecked")
@Override
public VersionedValue selectValue(VersionedValue existingValue, VersionedValue providedValue) {
return new VersionedValue(undoKey, existingValue == null ? null : existingValue.value);
assert existingValue != null; // otherwise, what's there to lock?
return VersionedValue.getInstance(undoKey, existingValue.value, existingValue.getCommittedValue());
}
}
}
......@@ -5,44 +5,90 @@
*/
package org.h2.mvstore.tx;
import org.h2.engine.Constants;
import org.h2.mvstore.DataUtils;
import org.h2.mvstore.WriteBuffer;
import org.h2.mvstore.type.DataType;
import java.nio.ByteBuffer;
/**
* A versioned value (possibly null). It contains a pointer to the old
* value, and the value itself.
* A versioned value (possibly null).
* It contains current value and latest committed value if current one is uncommitted.
* Also for uncommitted values it contains operationId - a combination of
* transactionId and logId.
*/
public class VersionedValue {
public static final VersionedValue DUMMY = new VersionedValue(0L, new Object());
public static final VersionedValue DUMMY = new VersionedValue(new Object());
/**
* The operation id.
*/
final long operationId;
/**
* The value.
* The current value.
*/
public final Object value;
VersionedValue(long operationId, Object value) {
this.operationId = operationId;
static VersionedValue getInstance(Object value) {
assert value != null;
return new VersionedValue(value);
}
public static VersionedValue getInstance(long operationId, Object value, Object committedValue) {
return new Uncommitted(operationId, value, committedValue);
}
private VersionedValue(Object value) {
this.value = value;
}
public boolean isCommitted() {
return true;
}
public long getOperationId() {
return operationId;
return 0L;
}
public Object getCommittedValue() {
return value;
}
@Override
public String toString() {
return value + (operationId == 0 ? "" : (
" " +
TransactionStore.getTransactionId(operationId) + "/" +
TransactionStore.getLogId(operationId)));
return String.valueOf(value);
}
private static class Uncommitted extends VersionedValue
{
private final long operationId;
private final Object committedValue;
Uncommitted(long operationId, Object value, Object committedValue) {
super(value);
assert operationId != 0;
this.operationId = operationId;
this.committedValue = committedValue;
}
@Override
public boolean isCommitted() {
return false;
}
@Override
public long getOperationId() {
return operationId;
}
@Override
public Object getCommittedValue() {
return committedValue;
}
@Override
public String toString() {
return super.toString() +
" " + TransactionStore.getTransactionId(operationId) + "/" +
TransactionStore.getLogId(operationId) + " " + committedValue;
}
}
/**
......@@ -58,8 +104,18 @@ public class VersionedValue {
@Override
public int getMemory(Object obj) {
if(obj == null) return 0;
VersionedValue v = (VersionedValue) obj;
return valueType.getMemory(v.value) + 8;
int res = Constants.MEMORY_OBJECT + 8 + 2 * Constants.MEMORY_POINTER +
getValMemory(v.value);
if (v.getOperationId() != 0) {
res += getValMemory(v.getCommittedValue());
}
return res;
}
private int getValMemory(Object obj) {
return obj == null ? 0 : valueType.getMemory(obj);
}
@Override
......@@ -69,7 +125,7 @@ public class VersionedValue {
}
VersionedValue a = (VersionedValue) aObj;
VersionedValue b = (VersionedValue) bObj;
long comp = a.operationId - b.operationId;
long comp = a.getOperationId() - b.getOperationId();
if (comp == 0) {
return valueType.compare(a.value, b.value);
}
......@@ -81,7 +137,7 @@ public class VersionedValue {
if (buff.get() == 0) {
// fast path (no op ids or null entries)
for (int i = 0; i < len; i++) {
obj[i] = new VersionedValue(0L, valueType.read(buff));
obj[i] = new VersionedValue(valueType.read(buff));
}
} else {
// slow path (some entries may be null)
......@@ -94,13 +150,14 @@ public class VersionedValue {
@Override
public Object read(ByteBuffer buff) {
long operationId = DataUtils.readVarLong(buff);
Object value;
if (buff.get() == 1) {
value = valueType.read(buff);
if (operationId == 0) {
return new VersionedValue(valueType.read(buff));
} else {
value = null;
byte flags = buff.get();
Object value = (flags & 1) != 0 ? valueType.read(buff) : null;
Object committedValue = (flags & 2) != 0 ? valueType.read(buff) : null;
return new Uncommitted(operationId, value, committedValue);
}
return new VersionedValue(operationId, value);
}
@Override
......@@ -108,7 +165,7 @@ public class VersionedValue {
boolean fastPath = true;
for (int i = 0; i < len; i++) {
VersionedValue v = (VersionedValue) obj[i];
if (v.operationId != 0 || v.value == null) {
if (v.getOperationId() != 0 || v.value == null) {
fastPath = false;
}
}
......@@ -131,14 +188,21 @@ public class VersionedValue {
@Override
public void write(WriteBuffer buff, Object obj) {
VersionedValue v = (VersionedValue) obj;
buff.putVarLong(v.operationId);
if (v.value == null) {
buff.put((byte) 0);
} else {
buff.put((byte) 1);
long operationId = v.getOperationId();
buff.putVarLong(operationId);
if (operationId == 0) {
valueType.write(buff, v.value);
} else {
Object committedValue = v.getCommittedValue();
int flags = (v.value == null ? 0 : 1) | (committedValue == null ? 0 : 2);
buff.put((byte) flags);
if (v.value != null) {
valueType.write(buff, v.value);
}
if (committedValue != null) {
valueType.write(buff, committedValue);
}
}
}
}
}
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论