提交 904fe287 authored 作者: andrei's avatar andrei

code review comments addressed

上级 00dbc15d
...@@ -818,6 +818,10 @@ public class Session extends SessionWithState implements TransactionStore.Rollba ...@@ -818,6 +818,10 @@ public class Session extends SessionWithState implements TransactionStore.Rollba
} }
} }
} }
// Because cache may have captured query result (in Query.lastResult),
// which is based on data from uncommitted transaction.,
// It is not valid after rollback, therefore cache has to be cleared.
if(queryCache != null) { if(queryCache != null) {
queryCache.clear(); queryCache.clear();
} }
...@@ -1788,19 +1792,17 @@ public class Session extends SessionWithState implements TransactionStore.Rollba ...@@ -1788,19 +1792,17 @@ public class Session extends SessionWithState implements TransactionStore.Rollba
private static Row getRowFromVersionedValue(MVTable table, long recKey, private static Row getRowFromVersionedValue(MVTable table, long recKey,
VersionedValue versionedValue) { VersionedValue versionedValue) {
Object value = versionedValue == null ? null : versionedValue.value; Object value = versionedValue == null ? null : versionedValue.value;
Row result = null; if (value == null) {
if (value != null) { return null;
Row result11; }
Row result;
if(value instanceof Row) { if(value instanceof Row) {
result11 = (Row) value; result = (Row) value;
assert result11.getKey() == recKey assert result.getKey() == recKey : result.getKey() + " != " + recKey;
: result11.getKey() + " != " + recKey;
} else { } else {
ValueArray array = (ValueArray) value; ValueArray array = (ValueArray) value;
result11 = table.createRow(array.getList(), 0); result = table.createRow(array.getList(), 0);
result11.setKey(recKey); result.setKey(recKey);
}
result = result11;
} }
return result; return result;
} }
......
...@@ -98,7 +98,7 @@ public class Transaction { ...@@ -98,7 +98,7 @@ public class Transaction {
public final long sequenceNum; public final long sequenceNum;
/* /*
* Transation state is an atomic composite field: * Transaction state is an atomic composite field:
* bit 45 : flag whether transaction had rollback(s) * bit 45 : flag whether transaction had rollback(s)
* bits 44-41 : status * bits 44-41 : status
* bits 40 : overflow control bit, 1 indicates overflow * bits 40 : overflow control bit, 1 indicates overflow
......
...@@ -63,8 +63,20 @@ public class TransactionStore { ...@@ -63,8 +63,20 @@ public class TransactionStore {
private final DataType dataType; private final DataType dataType;
/**
* This BitSet is used as vacancy indicator for transaction slots in transactions[].
* It provides easy way to find first unoccupied slot, and also allows for copy-on-write
* non-blocking updates.
*/
final AtomicReference<VersionedBitSet> openTransactions = new AtomicReference<>(new VersionedBitSet()); final AtomicReference<VersionedBitSet> openTransactions = new AtomicReference<>(new VersionedBitSet());
/**
* This is intended to be the source of ultimate truth about transaction being committed.
* Once bit is set, corresponding transaction is logically committed,
* although it might be plenty of "uncommitted" entries in various maps
* and undo record are still around.
* Nevertheless, all of those should be considered by other transactions as committed.
*/
final AtomicReference<BitSet> committingTransactions = new AtomicReference<>(new BitSet()); final AtomicReference<BitSet> committingTransactions = new AtomicReference<>(new BitSet());
private boolean init; private boolean init;
...@@ -78,6 +90,7 @@ public class TransactionStore { ...@@ -78,6 +90,7 @@ public class TransactionStore {
/** /**
* Array holding all open transaction objects. * Array holding all open transaction objects.
* Position in array is "transaction id". * Position in array is "transaction id".
* VolatileReferenceArray would do the job here, but there is no such thing in Java yet
*/ */
private final AtomicReferenceArray<Transaction> transactions = new AtomicReferenceArray<>(MAX_OPEN_TRANSACTIONS); private final AtomicReferenceArray<Transaction> transactions = new AtomicReferenceArray<>(MAX_OPEN_TRANSACTIONS);
...@@ -91,7 +104,7 @@ public class TransactionStore { ...@@ -91,7 +104,7 @@ public class TransactionStore {
* Hard limit on the number of concurrently opened transactions * Hard limit on the number of concurrently opened transactions
*/ */
// TODO: introduce constructor parameter instead of a static field, driven by URL parameter // TODO: introduce constructor parameter instead of a static field, driven by URL parameter
private static final int MAX_OPEN_TRANSACTIONS = 0x100; private static final int MAX_OPEN_TRANSACTIONS = 0x400;
...@@ -195,7 +208,7 @@ public class TransactionStore { ...@@ -195,7 +208,7 @@ public class TransactionStore {
*/ */
public void setMaxTransactionId(int max) { public void setMaxTransactionId(int max) {
DataUtils.checkArgument(max <= MAX_OPEN_TRANSACTIONS, DataUtils.checkArgument(max <= MAX_OPEN_TRANSACTIONS,
"Concurrent transactions limit is too hight: {0}", max); "Concurrent transactions limit is too high: {0}", max);
this.maxTransactionId = max; this.maxTransactionId = max;
} }
...@@ -322,7 +335,7 @@ public class TransactionStore { ...@@ -322,7 +335,7 @@ public class TransactionStore {
"There are {0} open transactions", "There are {0} open transactions",
transactionId - 1); transactionId - 1);
} }
VersionedBitSet clone = original.cloneIt(); VersionedBitSet clone = original.clone();
clone.set(transactionId); clone.set(transactionId);
sequenceNo = clone.getVersion() + 1; sequenceNo = clone.getVersion() + 1;
clone.setVersion(sequenceNo); clone.setVersion(sequenceNo);
...@@ -331,8 +344,8 @@ public class TransactionStore { ...@@ -331,8 +344,8 @@ public class TransactionStore {
Transaction transaction = new Transaction(this, transactionId, sequenceNo, status, name, logId, listener); Transaction transaction = new Transaction(this, transactionId, sequenceNo, status, name, logId, listener);
success = transactions.compareAndSet(transactionId, null, transaction); assert transactions.get(transactionId) == null;
assert success; transactions.set(transactionId, transaction);
return transaction; return transaction;
} }
...@@ -420,7 +433,9 @@ public class TransactionStore { ...@@ -420,7 +433,9 @@ public class TransactionStore {
* *
* @param t the transaction * @param t the transaction
* @param maxLogId the last log id * @param maxLogId the last log id
* @param hasChanges false for R/O tx * @param hasChanges true if there were updates within specified
* transaction (even fully rolled back),
* false if just data access
*/ */
void commit(Transaction t, long maxLogId, boolean hasChanges) { void commit(Transaction t, long maxLogId, boolean hasChanges) {
if (store.isClosed()) { if (store.isClosed()) {
...@@ -571,12 +586,14 @@ public class TransactionStore { ...@@ -571,12 +586,14 @@ public class TransactionStore {
int txId = t.transactionId; int txId = t.transactionId;
t.setStatus(Transaction.STATUS_CLOSED); t.setStatus(Transaction.STATUS_CLOSED);
boolean success = transactions.compareAndSet(txId, t, null); assert transactions.get(txId) == t : transactions.get(txId) + " != " + t;
assert success; transactions.set(txId, null);
boolean success;
do { do {
VersionedBitSet original = openTransactions.get(); VersionedBitSet original = openTransactions.get();
assert original.get(txId); assert original.get(txId);
VersionedBitSet clone = original.cloneIt(); VersionedBitSet clone = original.clone();
clone.clear(txId); clone.clear(txId);
success = openTransactions.compareAndSet(original, clone); success = openTransactions.compareAndSet(original, clone);
} while(!success); } while(!success);
...@@ -755,16 +772,31 @@ public class TransactionStore { ...@@ -755,16 +772,31 @@ public class TransactionStore {
} }
} }
/**
* This listener can be registered with the transaction to be notified of
* every compensating change during transaction rollback.
* Normally this is not required, if no external resources were modified,
* because state of all transactional maps will be restored automatically.
* Only state of external resources, possibly modified by triggers
* need to be restored.
*/
public interface RollbackListener { public interface RollbackListener {
RollbackListener NONE = new RollbackListener() { RollbackListener NONE = new RollbackListener() {
@Override @Override
public void onRollback(MVMap<Object, VersionedValue> map, Object key, public void onRollback(MVMap<Object, VersionedValue> map, Object key,
VersionedValue existingValue, VersionedValue restoredValue) { VersionedValue existingValue, VersionedValue restoredValue) {
// do nothing
} }
}; };
/**
* Notified of a single map change (add/update/remove)
* @param map modified
* @param key of the modified entry
* @param existingValue value in the map (null if delete is rolled back)
* @param restoredValue value to be restored (null if add is rolled back)
*/
void onRollback(MVMap<Object,VersionedValue> map, Object key, void onRollback(MVMap<Object,VersionedValue> map, Object key,
VersionedValue existingValue, VersionedValue restoredValue); VersionedValue existingValue, VersionedValue restoredValue);
} }
......
...@@ -10,8 +10,6 @@ import java.util.BitSet; ...@@ -10,8 +10,6 @@ import java.util.BitSet;
/** /**
* Class VersionedBitSet extends standard BitSet to add a version field. * Class VersionedBitSet extends standard BitSet to add a version field.
* This will allow bit set and version to be changed atomically. * This will allow bit set and version to be changed atomically.
*
* @author <a href='mailto:andrei.tokar@gmail.com'>Andrei Tokar</a>
*/ */
final class VersionedBitSet extends BitSet final class VersionedBitSet extends BitSet
{ {
...@@ -27,15 +25,8 @@ final class VersionedBitSet extends BitSet ...@@ -27,15 +25,8 @@ final class VersionedBitSet extends BitSet
this.version = version; this.version = version;
} }
public VersionedBitSet cloneIt() {
VersionedBitSet res = (VersionedBitSet) super.clone();
res.version = version;
return res;
}
@Override @Override
@SuppressWarnings("MethodDoesntCallSuperMethod") public VersionedBitSet clone() {
public Object clone() { return (VersionedBitSet)super.clone();
return cloneIt();
} }
} }
...@@ -107,7 +107,7 @@ public class TestMVStoreTool extends TestBase { ...@@ -107,7 +107,7 @@ public class TestMVStoreTool extends TestBase {
assertEquals(size2, FileUtils.size(fileNameNew)); assertEquals(size2, FileUtils.size(fileNameNew));
MVStoreTool.compact(fileNameCompressed, true); MVStoreTool.compact(fileNameCompressed, true);
assertEquals(size3, FileUtils.size(fileNameCompressed)); assertEquals(size3, FileUtils.size(fileNameCompressed));
trace("Recompacted in " + (System.currentTimeMillis() - start) + " ms."); trace("Re-compacted in " + (System.currentTimeMillis() - start) + " ms.");
start = System.currentTimeMillis(); start = System.currentTimeMillis();
MVStore s1 = new MVStore.Builder(). MVStore s1 = new MVStore.Builder().
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论