提交 b01cdcaf authored 作者: andrei's avatar andrei

Committing transactions BitSet, Open Transactions array

atomic handling of open/committing tx bitsets and array
TxRollbackListener
上级 73e45928
...@@ -737,6 +737,9 @@ public class Database implements DataHandler { ...@@ -737,6 +737,9 @@ public class Database implements DataHandler {
systemUser.setAdmin(true); systemUser.setAdmin(true);
systemSession = new Session(this, systemUser, ++nextSessionId); systemSession = new Session(this, systemUser, ++nextSessionId);
lobSession = new Session(this, systemUser, ++nextSessionId); lobSession = new Session(this, systemUser, ++nextSessionId);
if(mvStore != null) {
mvStore.getTransactionStore().init(systemSession);
}
CreateTableData data = new CreateTableData(); CreateTableData data = new CreateTableData();
ArrayList<Column> cols = data.columns; ArrayList<Column> cols = data.columns;
Column columnId = new Column("ID", Value.INT); Column columnId = new Column("ID", Value.INT);
...@@ -762,6 +765,7 @@ public class Database implements DataHandler { ...@@ -762,6 +765,7 @@ public class Database implements DataHandler {
metaIdIndex = meta.addIndex(systemSession, "SYS_ID", metaIdIndex = meta.addIndex(systemSession, "SYS_ID",
0, pkCols, IndexType.createPrimaryKey( 0, pkCols, IndexType.createPrimaryKey(
false, false), true, null); false, false), true, null);
systemSession.commit(true);
objectIds.set(0); objectIds.set(0);
starting = true; starting = true;
Cursor cursor = metaIdIndex.find(systemSession, null, null); Cursor cursor = metaIdIndex.find(systemSession, null, null);
...@@ -777,6 +781,7 @@ public class Database implements DataHandler { ...@@ -777,6 +781,7 @@ public class Database implements DataHandler {
rec.execute(this, systemSession, eventListener); rec.execute(this, systemSession, eventListener);
} }
} }
systemSession.commit(true);
if (mvStore != null) { if (mvStore != null) {
mvStore.initTransactions(); mvStore.initTransactions();
mvStore.removeTemporaryMaps(objectIds); mvStore.removeTemporaryMaps(objectIds);
......
...@@ -30,10 +30,12 @@ import org.h2.jdbc.JdbcConnection; ...@@ -30,10 +30,12 @@ import org.h2.jdbc.JdbcConnection;
import org.h2.message.DbException; import org.h2.message.DbException;
import org.h2.message.Trace; import org.h2.message.Trace;
import org.h2.message.TraceSystem; import org.h2.message.TraceSystem;
import org.h2.mvstore.MVMap;
import org.h2.mvstore.db.MVTable; import org.h2.mvstore.db.MVTable;
import org.h2.mvstore.db.MVTableEngine; import org.h2.mvstore.db.MVTableEngine;
import org.h2.mvstore.tx.TransactionStore.Change; import org.h2.mvstore.tx.TransactionStore;
import org.h2.mvstore.tx.Transaction; import org.h2.mvstore.tx.Transaction;
import org.h2.mvstore.tx.VersionedValue;
import org.h2.result.ResultInterface; import org.h2.result.ResultInterface;
import org.h2.result.Row; import org.h2.result.Row;
import org.h2.result.SortOrder; import org.h2.result.SortOrder;
...@@ -59,7 +61,7 @@ import org.h2.value.ValueString; ...@@ -59,7 +61,7 @@ import org.h2.value.ValueString;
* mode, this object resides on the server side and communicates with a * mode, this object resides on the server side and communicates with a
* SessionRemote object on the client side. * SessionRemote object on the client side.
*/ */
public class Session extends SessionWithState { public class Session extends SessionWithState implements TransactionStore.RollbackListener {
/** /**
* This special log position means that the log entry has been written. * This special log position means that the log entry has been written.
...@@ -649,20 +651,22 @@ public class Session extends SessionWithState { ...@@ -649,20 +651,22 @@ public class Session extends SessionWithState {
currentTransactionName = null; currentTransactionName = null;
transactionStart = 0; transactionStart = 0;
if (transaction != null) { if (transaction != null) {
// increment the data mod count, so that other sessions try {
// see the changes // increment the data mod count, so that other sessions
// TODO should not rely on locking // see the changes
if (!locks.isEmpty()) { // TODO should not rely on locking
for (Table t : locks) { if (!locks.isEmpty()) {
if (t instanceof MVTable) { for (Table t : locks) {
((MVTable) t).commit(); if (t instanceof MVTable) {
((MVTable) t).commit();
}
} }
} }
transaction.commit();
} finally {
transaction = null;
} }
transaction.commit(); } else if (containsUncommitted()) {
transaction = null;
}
if (containsUncommitted()) {
// need to commit even if rollback is not possible // need to commit even if rollback is not possible
// (create/drop table and so on) // (create/drop table and so on)
database.commit(this); database.commit(this);
...@@ -768,18 +772,9 @@ public class Session extends SessionWithState { ...@@ -768,18 +772,9 @@ public class Session extends SessionWithState {
checkCommitRollback(); checkCommitRollback();
currentTransactionName = null; currentTransactionName = null;
transactionStart = 0; transactionStart = 0;
boolean needCommit = false; boolean needCommit = undoLog.size() > 0 || transaction != null;
if (undoLog.size() > 0) { if(needCommit) {
rollbackTo(null, false); rollbackTo(null, false);
needCommit = true;
}
if (transaction != null) {
rollbackTo(null, false);
needCommit = true;
// rollback stored the undo operations in the transaction
// committing will end the transaction
transaction.commit();
transaction = null;
} }
if (!locks.isEmpty() || needCommit) { if (!locks.isEmpty() || needCommit) {
database.commit(this); database.commit(this);
...@@ -806,29 +801,11 @@ public class Session extends SessionWithState { ...@@ -806,29 +801,11 @@ public class Session extends SessionWithState {
undoLog.removeLast(trimToSize); undoLog.removeLast(trimToSize);
} }
if (transaction != null) { if (transaction != null) {
long savepointId = savepoint == null ? 0 : savepoint.transactionSavepoint; if (savepoint == null) {
HashMap<String, MVTable> tableMap = transaction.rollback();
database.getMvStore().getTables(); transaction = null;
Iterator<Change> it = transaction.getChanges(savepointId); } else {
while (it.hasNext()) { transaction.rollbackToSavepoint(savepoint.transactionSavepoint);
Change c = it.next();
MVTable t = tableMap.get(c.mapName);
if (t != null) {
long key = ((ValueLong) c.key).getLong();
ValueArray value = (ValueArray) c.value;
short op;
Row row;
if (value == null) {
op = UndoLogRecord.INSERT;
row = t.getRow(this, key);
} else {
op = UndoLogRecord.DELETE;
row = createRow(value.getList(), Row.MEMORY_CALCULATE);
}
row.setKey(key);
UndoLogRecord log = new UndoLogRecord(t, op, row);
log.undo(this);
}
} }
} }
if (savepoints != null) { if (savepoints != null) {
...@@ -841,6 +818,9 @@ public class Session extends SessionWithState { ...@@ -841,6 +818,9 @@ public class Session extends SessionWithState {
} }
} }
} }
if(queryCache != null) {
queryCache.clear();
}
} }
@Override @Override
...@@ -1112,7 +1092,7 @@ public class Session extends SessionWithState { ...@@ -1112,7 +1092,7 @@ public class Session extends SessionWithState {
*/ */
public boolean containsUncommitted() { public boolean containsUncommitted() {
if (database.getMvStore() != null) { if (database.getMvStore() != null) {
return transaction != null; return transaction != null && transaction.hasChanges();
} }
return firstUncommittedLog != Session.LOG_WRITTEN; return firstUncommittedLog != Session.LOG_WRITTEN;
} }
...@@ -1688,7 +1668,7 @@ public class Session extends SessionWithState { ...@@ -1688,7 +1668,7 @@ public class Session extends SessionWithState {
database.shutdownImmediately(); database.shutdownImmediately();
throw DbException.get(ErrorCode.DATABASE_IS_CLOSED); throw DbException.get(ErrorCode.DATABASE_IS_CLOSED);
} }
transaction = store.getTransactionStore().begin(); transaction = store.getTransactionStore().begin(this);
} }
startStatement = -1; startStatement = -1;
} }
...@@ -1768,6 +1748,64 @@ public class Session extends SessionWithState { ...@@ -1768,6 +1748,64 @@ public class Session extends SessionWithState {
tablesToAnalyze.add(table); tablesToAnalyze.add(table);
} }
@Override
public void onRollback(MVMap<Object, VersionedValue> map, Object key,
VersionedValue existingValue,
VersionedValue restoredValue) {
// Here we are relying on the fact that map which backs table's primary index
// has the same name as the table itself
MVTableEngine.Store store = database.getMvStore();
if(store != null) {
MVTable table = store.getTable(map.getName());
if (table != null) {
long recKey = ((ValueLong)key).getLong();
Row oldRow = getRowFromVersionedValue(table, recKey, existingValue);
Row newRow = getRowFromVersionedValue(table, recKey, restoredValue);
table.fireAfterRow(this, oldRow, newRow, true);
if (table.getContainsLargeObject()) {
if (oldRow != null) {
for (int i = 0, len = oldRow.getColumnCount(); i < len; i++) {
Value v = oldRow.getValue(i);
if (v.isLinkedToTable()) {
removeAtCommit(v);
}
}
}
if (newRow != null) {
for (int i = 0, len = newRow.getColumnCount(); i < len; i++) {
Value v = newRow.getValue(i);
if (v.isLinkedToTable()) {
removeAtCommitStop(v);
}
}
}
}
}
}
}
private static Row getRowFromVersionedValue(MVTable table, long recKey,
VersionedValue versionedValue) {
Object value = versionedValue == null ? null : versionedValue.value;
Row result = null;
if (value != null) {
Row result11;
if(value instanceof Row) {
result11 = (Row) value;
assert result11.getKey() == recKey
: result11.getKey() + " != " + recKey;
} else {
ValueArray array = (ValueArray) value;
result11 = table.createRow(array.getList(), 0);
result11.setKey(recKey);
}
result = result11;
}
return result;
}
/** /**
* Represents a savepoint (a position in a transaction to where one can roll * Represents a savepoint (a position in a transaction to where one can roll
* back to). * back to).
......
...@@ -162,7 +162,7 @@ public class MVTableEngine implements TableEngine { ...@@ -162,7 +162,7 @@ public class MVTableEngine implements TableEngine {
this.transactionStore = new TransactionStore( this.transactionStore = new TransactionStore(
store, store,
new ValueDataType(db.getCompareMode(), db, null)); new ValueDataType(db.getCompareMode(), db, null));
transactionStore.init(); // transactionStore.init();
} catch (IllegalStateException e) { } catch (IllegalStateException e) {
throw convertIllegalStateException(e); throw convertIllegalStateException(e);
} }
...@@ -206,8 +206,8 @@ public class MVTableEngine implements TableEngine { ...@@ -206,8 +206,8 @@ public class MVTableEngine implements TableEngine {
return transactionStore; return transactionStore;
} }
public HashMap<String, MVTable> getTables() { public MVTable getTable(String tableName) {
return new HashMap<>(tableMap); return tableMap.get(tableName);
} }
/** /**
......
...@@ -81,11 +81,22 @@ public class Transaction { ...@@ -81,11 +81,22 @@ public class Transaction {
*/ */
final TransactionStore store; final TransactionStore store;
/**
* Listener for this transaction's rollback changes.
*/
final TransactionStore.RollbackListener listener;
/** /**
* The transaction id. * The transaction id.
* More appropriate name for this field would be "slotId"
*/ */
final int transactionId; final int transactionId;
/**
* This is really a transaction identity, because it's not re-used.
*/
public final long sequenceNum;
/* /*
* Transation state is an atomic composite field: * Transation state is an atomic composite field:
* bit 45 : flag whether transaction had rollback(s) * bit 45 : flag whether transaction had rollback(s)
...@@ -99,12 +110,16 @@ public class Transaction { ...@@ -99,12 +110,16 @@ public class Transaction {
private String name; private String name;
Transaction(TransactionStore store, int transactionId, int status, boolean wasStored;
String name, long logId) {
Transaction(TransactionStore store, int transactionId, long sequenceNum, int status,
String name, long logId, TransactionStore.RollbackListener listener) {
this.store = store; this.store = store;
this.transactionId = transactionId; this.transactionId = transactionId;
this.sequenceNum = sequenceNum;
this.statusAndLogId = new AtomicLong(composeState(status, logId, false)); this.statusAndLogId = new AtomicLong(composeState(status, logId, false));
this.name = name; this.name = name;
this.listener = listener;
} }
public int getId() { public int getId() {
...@@ -168,6 +183,10 @@ public class Transaction { ...@@ -168,6 +183,10 @@ public class Transaction {
} }
} }
public boolean hasChanges() {
return hasChanges(statusAndLogId.get());
}
public void setName(String name) { public void setName(String name) {
checkNotClosed(); checkNotClosed();
this.name = name; this.name = name;
...@@ -289,10 +308,12 @@ public class Transaction { ...@@ -289,10 +308,12 @@ public class Transaction {
* Commit the transaction. Afterwards, this transaction is closed. * Commit the transaction. Afterwards, this transaction is closed.
*/ */
public void commit() { public void commit() {
assert store.openTransactions.get().get(transactionId);
long state = setStatus(STATUS_COMMITTING); long state = setStatus(STATUS_COMMITTING);
long logId = Transaction.getLogId(state); long logId = Transaction.getLogId(state);
int oldStatus = Transaction.getStatus(state); boolean hasChanges = hasChanges(state);
store.commit(this, logId, oldStatus);
store.commit(this, logId, hasChanges);
} }
/** /**
...@@ -330,7 +351,7 @@ public class Transaction { ...@@ -330,7 +351,7 @@ public class Transaction {
store.rollbackTo(this, logId, 0); store.rollbackTo(this, logId, 0);
} }
} finally { } finally {
store.endTransaction(this, STATUS_ROLLED_BACK); store.endTransaction(this, true);
} }
} }
...@@ -373,7 +394,7 @@ public class Transaction { ...@@ -373,7 +394,7 @@ public class Transaction {
@Override @Override
public String toString() { public String toString() {
long state = statusAndLogId.get(); long state = statusAndLogId.get();
return transactionId + " " + STATUS_NAMES[getStatus(state)] + " " + getLogId(state); return transactionId + "(" + sequenceNum + ") " + STATUS_NAMES[getStatus(state)] + " " + getLogId(state);
} }
......
...@@ -11,6 +11,8 @@ import java.util.BitSet; ...@@ -11,6 +11,8 @@ import java.util.BitSet;
import java.util.HashMap; import java.util.HashMap;
import java.util.Iterator; import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.atomic.AtomicReferenceArray;
import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.h2.mvstore.DataUtils; import org.h2.mvstore.DataUtils;
import org.h2.mvstore.MVMap; import org.h2.mvstore.MVMap;
...@@ -18,7 +20,6 @@ import org.h2.mvstore.MVStore; ...@@ -18,7 +20,6 @@ import org.h2.mvstore.MVStore;
import org.h2.mvstore.WriteBuffer; import org.h2.mvstore.WriteBuffer;
import org.h2.mvstore.type.DataType; import org.h2.mvstore.type.DataType;
import org.h2.mvstore.type.ObjectDataType; import org.h2.mvstore.type.ObjectDataType;
import org.h2.util.New;
/** /**
* A store that supports concurrent MVCC read-committed transactions. * A store that supports concurrent MVCC read-committed transactions.
...@@ -62,17 +63,38 @@ public class TransactionStore { ...@@ -62,17 +63,38 @@ public class TransactionStore {
private final DataType dataType; private final DataType dataType;
private final BitSet openTransactions = new BitSet(); final AtomicReference<VersionedBitSet> openTransactions = new AtomicReference<>(new VersionedBitSet());
final AtomicReference<BitSet> committingTransactions = new AtomicReference<>(new BitSet());
private boolean init; private boolean init;
private int maxTransactionId = 0xffff; /**
* Soft limit on the number of concurrently opened transactions.
* Not really needed but used by some test.
*/
private int maxTransactionId = MAX_OPEN_TRANSACTIONS;
/**
* Array holding all open transaction objects.
* Position in array is "transaction id".
*/
private final AtomicReferenceArray<Transaction> transactions = new AtomicReferenceArray<>(MAX_OPEN_TRANSACTIONS);
/** /**
* The next id of a temporary map. * The next id of a temporary map.
*/ */
private int nextTempMapId; private int nextTempMapId;
/**
* Hard limit on the number of concurrently opened transactions
*/
// TODO: introduce constructor parameter instead of a static field, driven by URL parameter
private static final int MAX_OPEN_TRANSACTIONS = 0x100;
/** /**
* Create a new transaction store. * Create a new transaction store.
* *
...@@ -113,25 +135,50 @@ public class TransactionStore { ...@@ -113,25 +135,50 @@ public class TransactionStore {
* If the transaction store is corrupt, this method can throw an exception, * If the transaction store is corrupt, this method can throw an exception,
* in which case the store can only be used for reading. * in which case the store can only be used for reading.
*/ */
public synchronized void init() { public void init() {
init = true; init(RollbackListener.NONE);
// remove all temporary maps }
for (String mapName : store.getMapNames()) {
if (mapName.startsWith("temp.")) { public synchronized void init(RollbackListener listener) {
MVMap<Object, Integer> temp = openTempMap(mapName); if (!init) {
store.removeMap(temp); // remove all temporary maps
for (String mapName : store.getMapNames()) {
if (mapName.startsWith("temp.")) {
MVMap<Object, Integer> temp = openTempMap(mapName);
store.removeMap(temp);
}
} }
} rwLock.writeLock().lock();
rwLock.writeLock().lock(); try {
try { if (!undoLog.isEmpty()) {
if (undoLog.size() > 0) { Long key = undoLog.firstKey();
for (Long key : undoLog.keySet()) { while (key != null) {
int transactionId = getTransactionId(key); int transactionId = getTransactionId(key);
openTransactions.set(transactionId); if (!openTransactions.get().get(transactionId)) {
Object[] data = preparedTransactions.get(transactionId);
int status;
String name;
if (data == null) {
status = Transaction.STATUS_OPEN;
name = null;
} else {
status = (Integer) data[0];
name = (String) data[1];
}
long nextTxUndoKey = getOperationId(transactionId + 1, 0);
Long lastUndoKey = undoLog.lowerKey(nextTxUndoKey);
assert lastUndoKey != null;
assert getTransactionId(lastUndoKey) == transactionId;
long logId = getLogId(lastUndoKey) + 1;
registerTransaction(transactionId, status, name, logId, listener);
key = undoLog.ceilingKey(nextTxUndoKey);
}
}
} }
} finally {
rwLock.writeLock().unlock();
} }
} finally { init = true;
rwLock.writeLock().unlock();
} }
} }
...@@ -143,6 +190,8 @@ public class TransactionStore { ...@@ -143,6 +190,8 @@ public class TransactionStore {
* @param max the maximum id * @param max the maximum id
*/ */
public void setMaxTransactionId(int max) { public void setMaxTransactionId(int max) {
DataUtils.checkArgument(max <= MAX_OPEN_TRANSACTIONS,
"Concurrent transactions limit is too hight: {0}", max);
this.maxTransactionId = max; this.maxTransactionId = max;
} }
...@@ -200,32 +249,21 @@ public class TransactionStore { ...@@ -200,32 +249,21 @@ public class TransactionStore {
* @return the list of transactions (sorted by id) * @return the list of transactions (sorted by id)
*/ */
public List<Transaction> getOpenTransactions() { public List<Transaction> getOpenTransactions() {
if(!init) {
init();
}
rwLock.readLock().lock(); rwLock.readLock().lock();
try { try {
ArrayList<Transaction> list = New.arrayList(); ArrayList<Transaction> list = new ArrayList<>();
Long key = undoLog.firstKey(); int transactionId = 0;
while (key != null) { BitSet bitSet = openTransactions.get();
int transactionId = getTransactionId(key); while((transactionId = bitSet.nextSetBit(transactionId + 1)) > 0) {
key = undoLog.lowerKey(getOperationId(transactionId + 1, 0)); Transaction transaction = getTransaction(transactionId);
long logId = getLogId(key) + 1; if(transaction != null) {
Object[] data = preparedTransactions.get(transactionId); if(transaction.getStatus() != Transaction.STATUS_CLOSED) {
int status; list.add(transaction);
String name;
if (data == null) {
if (undoLog.containsKey(getOperationId(transactionId, 0))) {
status = Transaction.STATUS_OPEN;
} else {
status = Transaction.STATUS_COMMITTING;
} }
name = null;
} else {
status = (Integer) data[0];
name = (String) data[1];
} }
Transaction t = new Transaction(this, transactionId, status,
name, logId);
list.add(t);
key = undoLog.ceilingKey(getOperationId(transactionId + 1, 0));
} }
return list; return list;
} finally { } finally {
...@@ -245,25 +283,54 @@ public class TransactionStore { ...@@ -245,25 +283,54 @@ public class TransactionStore {
* *
* @return the transaction * @return the transaction
*/ */
public synchronized Transaction begin() { public Transaction begin() {
return begin(RollbackListener.NONE);
}
/**
* Begin a new transaction.
* @param listener to be notified in case of a rollback
*
* @return the transaction
*/
public Transaction begin(RollbackListener listener) {
Transaction transaction = registerTransaction(0, Transaction.STATUS_OPEN, null, 0, listener);
return transaction;
}
private Transaction registerTransaction(int txId, int status, String name, long logId,
RollbackListener listener) {
int transactionId; int transactionId;
int status; long sequenceNo;
if (!init) { boolean success;
throw DataUtils.newIllegalStateException( do {
DataUtils.ERROR_TRANSACTION_ILLEGAL_STATE, VersionedBitSet original = openTransactions.get();
"Not initialized"); if (txId == 0) {
} transactionId = original.nextClearBit(1);
transactionId = openTransactions.nextClearBit(1); } else {
if (transactionId > maxTransactionId) { transactionId = txId;
throw DataUtils.newIllegalStateException( assert !original.get(transactionId);
DataUtils.ERROR_TOO_MANY_OPEN_TRANSACTIONS, }
"There are {0} open transactions", if (transactionId > maxTransactionId) {
transactionId - 1); throw DataUtils.newIllegalStateException(
} DataUtils.ERROR_TOO_MANY_OPEN_TRANSACTIONS,
openTransactions.set(transactionId); "There are {0} open transactions",
status = Transaction.STATUS_OPEN; transactionId - 1);
return new Transaction(this, transactionId, status, null, 0); }
VersionedBitSet clone = original.cloneIt();
clone.set(transactionId);
sequenceNo = clone.getVersion() + 1;
clone.setVersion(sequenceNo);
success = openTransactions.compareAndSet(original, clone);
} while(!success);
Transaction transaction = new Transaction(this, transactionId, sequenceNo, status, name, logId, listener);
success = transactions.compareAndSet(transactionId, null, transaction);
assert success;
return transaction;
} }
/** /**
...@@ -276,6 +343,7 @@ public class TransactionStore { ...@@ -276,6 +343,7 @@ public class TransactionStore {
t.getName() != null) { t.getName() != null) {
Object[] v = { t.getStatus(), t.getName() }; Object[] v = { t.getStatus(), t.getName() };
preparedTransactions.put(t.getId(), v); preparedTransactions.put(t.getId(), v);
t.wasStored = true;
} }
} }
...@@ -348,23 +416,28 @@ public class TransactionStore { ...@@ -348,23 +416,28 @@ public class TransactionStore {
* *
* @param t the transaction * @param t the transaction
* @param maxLogId the last log id * @param maxLogId the last log id
* @param oldStatus last status * @param hasChanges false for R/O tx
*/ */
void commit(Transaction t, long maxLogId, int oldStatus) { void commit(Transaction t, long maxLogId, boolean hasChanges) {
if (store.isClosed()) { if (store.isClosed()) {
return; return;
} }
int transactionId = t.transactionId;
// this is an atomic action that causes all changes
// made by this transaction, to be considered as "committed"
flipCommittingTransactionsBit(transactionId, true);
// TODO could synchronize on blocks (100 at a time or so) // TODO could synchronize on blocks (100 at a time or so)
rwLock.writeLock().lock(); rwLock.writeLock().lock();
try { try {
for (long logId = 0; logId < maxLogId; logId++) { for (long logId = 0; logId < maxLogId; logId++) {
Long undoKey = getOperationId(t.getId(), logId); Long undoKey = getOperationId(transactionId, logId);
Object[] op = undoLog.get(undoKey); Object[] op = undoLog.get(undoKey);
if (op == null) { if (op == null) {
// partially committed: load next // partially committed: load next
undoKey = undoLog.ceilingKey(undoKey); undoKey = undoLog.ceilingKey(undoKey);
if (undoKey == null || if (undoKey == null ||
getTransactionId(undoKey) != t.getId()) { getTransactionId(undoKey) != transactionId) {
break; break;
} }
logId = getLogId(undoKey) - 1; logId = getLogId(undoKey) - 1;
...@@ -391,8 +464,20 @@ public class TransactionStore { ...@@ -391,8 +464,20 @@ public class TransactionStore {
} }
} finally { } finally {
rwLock.writeLock().unlock(); rwLock.writeLock().unlock();
flipCommittingTransactionsBit(transactionId, false);
} }
endTransaction(t, oldStatus); endTransaction(t, hasChanges);
}
private void flipCommittingTransactionsBit(int transactionId, boolean flag) {
boolean success;
do {
BitSet original = committingTransactions.get();
assert original.get(transactionId) != flag : flag ? "Double commit" : "Mysterious bit's disappearance";
BitSet clone = (BitSet) original.clone();
clone.set(transactionId, flag);
success = committingTransactions.compareAndSet(original, clone);
} while(!success);
} }
/** /**
...@@ -465,7 +550,7 @@ public class TransactionStore { ...@@ -465,7 +550,7 @@ public class TransactionStore {
* @param mapName the map name * @param mapName the map name
* @return the map * @return the map
*/ */
MVMap<Object, Integer> openTempMap(String mapName) { private MVMap<Object, Integer> openTempMap(String mapName) {
MVMap.Builder<Object, Integer> mapBuilder = MVMap.Builder<Object, Integer> mapBuilder =
new MVMap.Builder<Object, Integer>(). new MVMap.Builder<Object, Integer>().
keyType(dataType); keyType(dataType);
...@@ -476,31 +561,49 @@ public class TransactionStore { ...@@ -476,31 +561,49 @@ public class TransactionStore {
* End this transaction * End this transaction
* *
* @param t the transaction * @param t the transaction
* @param oldStatus status of this transaction * @param hasChanges false for R/O tx
*/ */
synchronized void endTransaction(Transaction t, int oldStatus) { synchronized void endTransaction(Transaction t, boolean hasChanges) {
if (oldStatus == Transaction.STATUS_PREPARED) { int txId = t.transactionId;
preparedTransactions.remove(t.getId());
}
t.setStatus(Transaction.STATUS_CLOSED); t.setStatus(Transaction.STATUS_CLOSED);
openTransactions.clear(t.transactionId);
if (oldStatus == Transaction.STATUS_PREPARED || store.getAutoCommitDelay() == 0) { boolean success = transactions.compareAndSet(txId, t, null);
store.tryCommit(); assert success;
return; do {
} VersionedBitSet original = openTransactions.get();
// to avoid having to store the transaction log, assert original.get(txId);
// if there is no open transaction, VersionedBitSet clone = original.cloneIt();
// and if there have been many changes, store them now clone.clear(txId);
if (undoLog.isEmpty()) { success = openTransactions.compareAndSet(original, clone);
int unsaved = store.getUnsavedMemory(); } while(!success);
int max = store.getAutoCommitMemory();
// save at 3/4 capacity if (hasChanges) {
if (unsaved * 4 > max * 3) { boolean wasStored = t.wasStored;
if (wasStored && !preparedTransactions.isClosed()) {
preparedTransactions.remove(txId);
}
if (wasStored || store.getAutoCommitDelay() == 0) {
store.tryCommit(); store.tryCommit();
} else {
// to avoid having to store the transaction log,
// if there is no open transaction,
// and if there have been many changes, store them now
if (undoLog.isEmpty()) {
int unsaved = store.getUnsavedMemory();
int max = store.getAutoCommitMemory();
// save at 3/4 capacity
if (unsaved * 4 > max * 3) {
store.tryCommit();
}
}
} }
} }
} }
Transaction getTransaction(int transactionId) {
return transactions.get(transactionId);
}
/** /**
* Rollback to an old savepoint. * Rollback to an old savepoint.
* *
...@@ -530,13 +633,15 @@ public class TransactionStore { ...@@ -530,13 +633,15 @@ public class TransactionStore {
if (map != null) { if (map != null) {
Object key = op[1]; Object key = op[1];
VersionedValue oldValue = (VersionedValue) op[2]; VersionedValue oldValue = (VersionedValue) op[2];
VersionedValue currentValue;
if (oldValue == null) { if (oldValue == null) {
// this transaction added the value // this transaction added the value
map.remove(key); currentValue = map.remove(key);
} else { } else {
// this transaction updated the value // this transaction updated the value
map.put(key, oldValue); currentValue = map.put(key, oldValue);
} }
t.listener.onRollback(map, key, currentValue, oldValue);
} }
undoLog.remove(undoKey); undoLog.remove(undoKey);
} }
...@@ -561,22 +666,19 @@ public class TransactionStore { ...@@ -561,22 +666,19 @@ public class TransactionStore {
private long logId = maxLogId - 1; private long logId = maxLogId - 1;
private Change current; private Change current;
{
fetchNext();
}
private void fetchNext() { private void fetchNext() {
rwLock.writeLock().lock(); rwLock.writeLock().lock();
try { try {
int transactionId = t.getId();
while (logId >= toLogId) { while (logId >= toLogId) {
Long undoKey = getOperationId(t.getId(), logId); Long undoKey = getOperationId(transactionId, logId);
Object[] op = undoLog.get(undoKey); Object[] op = undoLog.get(undoKey);
logId--; logId--;
if (op == null) { if (op == null) {
// partially rolled back: load previous // partially rolled back: load previous
undoKey = undoLog.floorKey(undoKey); undoKey = undoLog.floorKey(undoKey);
if (undoKey == null || if (undoKey == null ||
getTransactionId(undoKey) != t.getId()) { getTransactionId(undoKey) != transactionId) {
break; break;
} }
logId = getLogId(undoKey); logId = getLogId(undoKey);
...@@ -584,15 +686,9 @@ public class TransactionStore { ...@@ -584,15 +686,9 @@ public class TransactionStore {
} }
int mapId = ((Integer) op[0]).intValue(); int mapId = ((Integer) op[0]).intValue();
MVMap<Object, VersionedValue> m = openMap(mapId); MVMap<Object, VersionedValue> m = openMap(mapId);
if (m == null) { if (m != null) { // could be null if map was removed later on
// map was removed later on
} else {
current = new Change();
current.mapName = m.getName();
current.key = op[1];
VersionedValue oldValue = (VersionedValue) op[2]; VersionedValue oldValue = (VersionedValue) op[2];
current.value = oldValue == null ? current = new Change(m.getName(), op[1], oldValue == null ? null : oldValue.value);
null : oldValue.value;
return; return;
} }
} }
...@@ -604,16 +700,19 @@ public class TransactionStore { ...@@ -604,16 +700,19 @@ public class TransactionStore {
@Override @Override
public boolean hasNext() { public boolean hasNext() {
if(current == null) {
fetchNext();
}
return current != null; return current != null;
} }
@Override @Override
public Change next() { public Change next() {
if (current == null) { if(!hasNext()) {
throw DataUtils.newUnsupportedOperationException("no data"); throw DataUtils.newUnsupportedOperationException("no data");
} }
Change result = current; Change result = current;
fetchNext(); current = null;
return result; return result;
} }
...@@ -633,19 +732,38 @@ public class TransactionStore { ...@@ -633,19 +732,38 @@ public class TransactionStore {
/** /**
* The name of the map where the change occurred. * The name of the map where the change occurred.
*/ */
public String mapName; public final String mapName;
/** /**
* The key. * The key.
*/ */
public Object key; public final Object key;
/** /**
* The value. * The value.
*/ */
public Object value; public final Object value;
public Change(String mapName, Object key, Object value) {
this.mapName = mapName;
this.key = key;
this.value = value;
}
} }
public interface RollbackListener {
RollbackListener NONE = new RollbackListener() {
@Override
public void onRollback(MVMap<Object, VersionedValue> map, Object key,
VersionedValue existingValue, VersionedValue restoredValue) {
}
};
void onRollback(MVMap<Object,VersionedValue> map, Object key,
VersionedValue existingValue, VersionedValue restoredValue);
}
/** /**
* A data type that contains an array of objects with the specified data * A data type that contains an array of objects with the specified data
......
/*
* Copyright 2004-2018 H2 Group. Multiple-Licensed under the MPL 2.0,
* and the EPL 1.0 (http://h2database.com/html/license.html).
* Initial Developer: H2 Group
*/
package org.h2.mvstore.tx;
import java.util.BitSet;
/**
* Class VersionedBitSet extends standard BitSet to add a version field.
* This will allow bit set and version to be changed atomically.
*
* @author <a href='mailto:andrei.tokar@gmail.com'>Andrei Tokar</a>
*/
final class VersionedBitSet extends BitSet
{
private long version;
public VersionedBitSet() {}
public long getVersion() {
return version;
}
public void setVersion(long version) {
this.version = version;
}
public VersionedBitSet cloneIt() {
VersionedBitSet res = (VersionedBitSet) super.clone();
res.version = version;
return res;
}
@Override
@SuppressWarnings("MethodDoesntCallSuperMethod")
public Object clone() {
return cloneIt();
}
}
...@@ -24,7 +24,7 @@ public class VersionedValue { ...@@ -24,7 +24,7 @@ public class VersionedValue {
/** /**
* The value. * The value.
*/ */
final Object value; public final Object value;
VersionedValue(long operationId, Object value) { VersionedValue(long operationId, Object value) {
this.operationId = operationId; this.operationId = operationId;
......
...@@ -621,8 +621,12 @@ public abstract class Table extends SchemaObjectBase { ...@@ -621,8 +621,12 @@ public abstract class Table extends SchemaObjectBase {
} }
} }
public Row createRow(Value[] data, int memory) {
return database.createRow(data, memory);
}
public Row getTemplateRow() { public Row getTemplateRow() {
return database.createRow(new Value[columns.length], Row.MEMORY_CALCULATE); return createRow(new Value[columns.length], Row.MEMORY_CALCULATE);
} }
/** /**
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论