Unverified 提交 d6ad398c authored 作者: Andrei Tokar's avatar Andrei Tokar 提交者: GitHub

Merge pull request #1058 from h2database/txcommit-atomic

Txcommit atomic
......@@ -737,6 +737,9 @@ public class Database implements DataHandler {
systemUser.setAdmin(true);
systemSession = new Session(this, systemUser, ++nextSessionId);
lobSession = new Session(this, systemUser, ++nextSessionId);
if(mvStore != null) {
mvStore.getTransactionStore().init(systemSession);
}
CreateTableData data = new CreateTableData();
ArrayList<Column> cols = data.columns;
Column columnId = new Column("ID", Value.INT);
......@@ -762,6 +765,7 @@ public class Database implements DataHandler {
metaIdIndex = meta.addIndex(systemSession, "SYS_ID",
0, pkCols, IndexType.createPrimaryKey(
false, false), true, null);
systemSession.commit(true);
objectIds.set(0);
starting = true;
Cursor cursor = metaIdIndex.find(systemSession, null, null);
......@@ -777,6 +781,7 @@ public class Database implements DataHandler {
rec.execute(this, systemSession, eventListener);
}
}
systemSession.commit(true);
if (mvStore != null) {
mvStore.initTransactions();
mvStore.removeTemporaryMaps(objectIds);
......
......@@ -30,10 +30,12 @@ import org.h2.jdbc.JdbcConnection;
import org.h2.message.DbException;
import org.h2.message.Trace;
import org.h2.message.TraceSystem;
import org.h2.mvstore.MVMap;
import org.h2.mvstore.db.MVTable;
import org.h2.mvstore.db.MVTableEngine;
import org.h2.mvstore.tx.TransactionStore.Change;
import org.h2.mvstore.tx.TransactionStore;
import org.h2.mvstore.tx.Transaction;
import org.h2.mvstore.tx.VersionedValue;
import org.h2.result.ResultInterface;
import org.h2.result.Row;
import org.h2.result.SortOrder;
......@@ -59,7 +61,7 @@ import org.h2.value.ValueString;
* mode, this object resides on the server side and communicates with a
* SessionRemote object on the client side.
*/
public class Session extends SessionWithState {
public class Session extends SessionWithState implements TransactionStore.RollbackListener {
/**
* This special log position means that the log entry has been written.
......@@ -649,20 +651,22 @@ public class Session extends SessionWithState {
currentTransactionName = null;
transactionStart = 0;
if (transaction != null) {
// increment the data mod count, so that other sessions
// see the changes
// TODO should not rely on locking
if (!locks.isEmpty()) {
for (Table t : locks) {
if (t instanceof MVTable) {
((MVTable) t).commit();
try {
// increment the data mod count, so that other sessions
// see the changes
// TODO should not rely on locking
if (!locks.isEmpty()) {
for (Table t : locks) {
if (t instanceof MVTable) {
((MVTable) t).commit();
}
}
}
transaction.commit();
} finally {
transaction = null;
}
transaction.commit();
transaction = null;
}
if (containsUncommitted()) {
} else if (containsUncommitted()) {
// need to commit even if rollback is not possible
// (create/drop table and so on)
database.commit(this);
......@@ -768,18 +772,9 @@ public class Session extends SessionWithState {
checkCommitRollback();
currentTransactionName = null;
transactionStart = 0;
boolean needCommit = false;
if (undoLog.size() > 0) {
rollbackTo(null, false);
needCommit = true;
}
if (transaction != null) {
boolean needCommit = undoLog.size() > 0 || transaction != null;
if(needCommit) {
rollbackTo(null, false);
needCommit = true;
// rollback stored the undo operations in the transaction
// committing will end the transaction
transaction.commit();
transaction = null;
}
if (!locks.isEmpty() || needCommit) {
database.commit(this);
......@@ -806,29 +801,11 @@ public class Session extends SessionWithState {
undoLog.removeLast(trimToSize);
}
if (transaction != null) {
long savepointId = savepoint == null ? 0 : savepoint.transactionSavepoint;
HashMap<String, MVTable> tableMap =
database.getMvStore().getTables();
Iterator<Change> it = transaction.getChanges(savepointId);
while (it.hasNext()) {
Change c = it.next();
MVTable t = tableMap.get(c.mapName);
if (t != null) {
long key = ((ValueLong) c.key).getLong();
ValueArray value = (ValueArray) c.value;
short op;
Row row;
if (value == null) {
op = UndoLogRecord.INSERT;
row = t.getRow(this, key);
} else {
op = UndoLogRecord.DELETE;
row = createRow(value.getList(), Row.MEMORY_CALCULATE);
}
row.setKey(key);
UndoLogRecord log = new UndoLogRecord(t, op, row);
log.undo(this);
}
if (savepoint == null) {
transaction.rollback();
transaction = null;
} else {
transaction.rollbackToSavepoint(savepoint.transactionSavepoint);
}
}
if (savepoints != null) {
......@@ -841,6 +818,13 @@ public class Session extends SessionWithState {
}
}
}
// Because cache may have captured query result (in Query.lastResult),
// which is based on data from uncommitted transaction.,
// It is not valid after rollback, therefore cache has to be cleared.
if(queryCache != null) {
queryCache.clear();
}
}
@Override
......@@ -1112,7 +1096,7 @@ public class Session extends SessionWithState {
*/
public boolean containsUncommitted() {
if (database.getMvStore() != null) {
return transaction != null;
return transaction != null && transaction.hasChanges();
}
return firstUncommittedLog != Session.LOG_WRITTEN;
}
......@@ -1688,7 +1672,7 @@ public class Session extends SessionWithState {
database.shutdownImmediately();
throw DbException.get(ErrorCode.DATABASE_IS_CLOSED);
}
transaction = store.getTransactionStore().begin();
transaction = store.getTransactionStore().begin(this);
}
startStatement = -1;
}
......@@ -1768,6 +1752,62 @@ public class Session extends SessionWithState {
tablesToAnalyze.add(table);
}
@Override
public void onRollback(MVMap<Object, VersionedValue> map, Object key,
VersionedValue existingValue,
VersionedValue restoredValue) {
// Here we are relying on the fact that map which backs table's primary index
// has the same name as the table itself
MVTableEngine.Store store = database.getMvStore();
if(store != null) {
MVTable table = store.getTable(map.getName());
if (table != null) {
long recKey = ((ValueLong)key).getLong();
Row oldRow = getRowFromVersionedValue(table, recKey, existingValue);
Row newRow = getRowFromVersionedValue(table, recKey, restoredValue);
table.fireAfterRow(this, oldRow, newRow, true);
if (table.getContainsLargeObject()) {
if (oldRow != null) {
for (int i = 0, len = oldRow.getColumnCount(); i < len; i++) {
Value v = oldRow.getValue(i);
if (v.isLinkedToTable()) {
removeAtCommit(v);
}
}
}
if (newRow != null) {
for (int i = 0, len = newRow.getColumnCount(); i < len; i++) {
Value v = newRow.getValue(i);
if (v.isLinkedToTable()) {
removeAtCommitStop(v);
}
}
}
}
}
}
}
private static Row getRowFromVersionedValue(MVTable table, long recKey,
VersionedValue versionedValue) {
Object value = versionedValue == null ? null : versionedValue.value;
if (value == null) {
return null;
}
Row result;
if(value instanceof Row) {
result = (Row) value;
assert result.getKey() == recKey : result.getKey() + " != " + recKey;
} else {
ValueArray array = (ValueArray) value;
result = table.createRow(array.getList(), 0);
result.setKey(recKey);
}
return result;
}
/**
* Represents a savepoint (a position in a transaction to where one can roll
* back to).
......
......@@ -1103,11 +1103,11 @@ public class MVStore {
}
}
} finally {
// in any case reset the current store version,
// to allow closing the store
currentStoreVersion = -1;
currentStoreThread.set(null);
}
// in any case reset the current store version,
// to allow closing the store
currentStoreVersion = -1;
currentStoreThread.set(null);
}
}
private void storeNow() {
......
......@@ -162,7 +162,7 @@ public class MVTableEngine implements TableEngine {
this.transactionStore = new TransactionStore(
store,
new ValueDataType(db.getCompareMode(), db, null));
transactionStore.init();
// transactionStore.init();
} catch (IllegalStateException e) {
throw convertIllegalStateException(e);
}
......@@ -206,8 +206,8 @@ public class MVTableEngine implements TableEngine {
return transactionStore;
}
public HashMap<String, MVTable> getTables() {
return new HashMap<>(tableMap);
public MVTable getTable(String tableName) {
return tableMap.get(tableName);
}
/**
......
......@@ -81,13 +81,24 @@ public class Transaction {
*/
final TransactionStore store;
/**
* Listener for this transaction's rollback changes.
*/
final TransactionStore.RollbackListener listener;
/**
* The transaction id.
* More appropriate name for this field would be "slotId"
*/
final int transactionId;
/**
* This is really a transaction identity, because it's not re-used.
*/
public final long sequenceNum;
/*
* Transation state is an atomic composite field:
* Transaction state is an atomic composite field:
* bit 45 : flag whether transaction had rollback(s)
* bits 44-41 : status
* bits 40 : overflow control bit, 1 indicates overflow
......@@ -99,12 +110,16 @@ public class Transaction {
private String name;
Transaction(TransactionStore store, int transactionId, int status,
String name, long logId) {
boolean wasStored;
Transaction(TransactionStore store, int transactionId, long sequenceNum, int status,
String name, long logId, TransactionStore.RollbackListener listener) {
this.store = store;
this.transactionId = transactionId;
this.sequenceNum = sequenceNum;
this.statusAndLogId = new AtomicLong(composeState(status, logId, false));
this.name = name;
this.listener = listener;
}
public int getId() {
......@@ -151,9 +166,9 @@ public class Transaction {
currentStatus == STATUS_COMMITTED ||
currentStatus == STATUS_ROLLED_BACK;
break;
default:
valid = false;
break;
default:
valid = false;
break;
}
if (!valid) {
throw DataUtils.newIllegalStateException(
......@@ -168,6 +183,10 @@ public class Transaction {
}
}
public boolean hasChanges() {
return hasChanges(statusAndLogId.get());
}
public void setName(String name) {
checkNotClosed();
this.name = name;
......@@ -257,7 +276,7 @@ public class Transaction {
* @return the transaction map
*/
public <K, V> TransactionMap<K, V> openMap(String name,
DataType keyType, DataType valueType) {
DataType keyType, DataType valueType) {
MVMap<K, VersionedValue> map = store.openMap(name, keyType, valueType);
return openMap(map);
}
......@@ -289,10 +308,12 @@ public class Transaction {
* Commit the transaction. Afterwards, this transaction is closed.
*/
public void commit() {
assert store.openTransactions.get().get(transactionId);
long state = setStatus(STATUS_COMMITTING);
long logId = Transaction.getLogId(state);
int oldStatus = Transaction.getStatus(state);
store.commit(this, logId, oldStatus);
boolean hasChanges = hasChanges(state);
store.commit(this, logId, hasChanges);
}
/**
......@@ -330,7 +351,7 @@ public class Transaction {
store.rollbackTo(this, logId, 0);
}
} finally {
store.endTransaction(this, STATUS_ROLLED_BACK);
store.endTransaction(this, true);
}
}
......@@ -373,7 +394,7 @@ public class Transaction {
@Override
public String toString() {
long state = statusAndLogId.get();
return transactionId + " " + STATUS_NAMES[getStatus(state)] + " " + getLogId(state);
return transactionId + "(" + sequenceNum + ") " + STATUS_NAMES[getStatus(state)] + " " + getLogId(state);
}
......
......@@ -47,7 +47,7 @@ public class TransactionMap<K, V> {
final Transaction transaction;
TransactionMap(Transaction transaction, MVMap<K, VersionedValue> map,
int mapId) {
int mapId) {
this.transaction = transaction;
this.map = map;
this.mapId = mapId;
......
/*
* Copyright 2004-2018 H2 Group. Multiple-Licensed under the MPL 2.0,
* and the EPL 1.0 (http://h2database.com/html/license.html).
* Initial Developer: H2 Group
*/
package org.h2.mvstore.tx;
import java.util.BitSet;
/**
* Class VersionedBitSet extends standard BitSet to add a version field.
* This will allow bit set and version to be changed atomically.
*/
final class VersionedBitSet extends BitSet
{
private long version;
public VersionedBitSet() {}
public long getVersion() {
return version;
}
public void setVersion(long version) {
this.version = version;
}
@Override
public VersionedBitSet clone() {
return (VersionedBitSet)super.clone();
}
}
......@@ -24,7 +24,7 @@ public class VersionedValue {
/**
* The value.
*/
final Object value;
public final Object value;
VersionedValue(long operationId, Object value) {
this.operationId = operationId;
......
......@@ -621,8 +621,12 @@ public abstract class Table extends SchemaObjectBase {
}
}
public Row createRow(Value[] data, int memory) {
return database.createRow(data, memory);
}
public Row getTemplateRow() {
return database.createRow(new Value[columns.length], Row.MEMORY_CALCULATE);
return createRow(new Value[columns.length], Row.MEMORY_CALCULATE);
}
/**
......
......@@ -107,7 +107,7 @@ public class TestMVStoreTool extends TestBase {
assertEquals(size2, FileUtils.size(fileNameNew));
MVStoreTool.compact(fileNameCompressed, true);
assertEquals(size3, FileUtils.size(fileNameCompressed));
trace("Recompacted in " + (System.currentTimeMillis() - start) + " ms.");
trace("Re-compacted in " + (System.currentTimeMillis() - start) + " ms.");
start = System.currentTimeMillis();
MVStore s1 = new MVStore.Builder().
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论