Unverified 提交 16677754 authored 作者: Andrei Tokar's avatar Andrei Tokar 提交者: GitHub

Merge pull request #1167 from h2database/txcommit-atomic

MVStore: Undo log synchronization removal
...@@ -7,6 +7,8 @@ package org.h2.command; ...@@ -7,6 +7,8 @@ package org.h2.command;
import java.sql.SQLException; import java.sql.SQLException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.concurrent.TimeUnit;
import org.h2.api.ErrorCode; import org.h2.api.ErrorCode;
import org.h2.engine.Constants; import org.h2.engine.Constants;
import org.h2.engine.Database; import org.h2.engine.Database;
...@@ -184,7 +186,7 @@ public abstract class Command implements CommandInterface { ...@@ -184,7 +186,7 @@ public abstract class Command implements CommandInterface {
startTimeNanos = 0; startTimeNanos = 0;
long start = 0; long start = 0;
Database database = session.getDatabase(); Database database = session.getDatabase();
Object sync = database.isMultiThreaded() ? (Object) session : (Object) database; Object sync = database.isMultiThreaded() || database.getMvStore() != null ? session : database;
session.waitIfExclusiveModeEnabled(); session.waitIfExclusiveModeEnabled();
boolean callStop = true; boolean callStop = true;
boolean writing = !isReadOnly(); boolean writing = !isReadOnly();
...@@ -193,7 +195,9 @@ public abstract class Command implements CommandInterface { ...@@ -193,7 +195,9 @@ public abstract class Command implements CommandInterface {
// wait // wait
} }
} }
//noinspection SynchronizationOnLocalVariableOrMethodParameter
synchronized (sync) { synchronized (sync) {
session.startStatementWithinTransaction();
session.setCurrentCommand(this, false); session.setCurrentCommand(this, false);
try { try {
while (true) { while (true) {
...@@ -242,7 +246,7 @@ public abstract class Command implements CommandInterface { ...@@ -242,7 +246,7 @@ public abstract class Command implements CommandInterface {
public ResultWithGeneratedKeys executeUpdate(Object generatedKeysRequest) { public ResultWithGeneratedKeys executeUpdate(Object generatedKeysRequest) {
long start = 0; long start = 0;
Database database = session.getDatabase(); Database database = session.getDatabase();
Object sync = database.isMultiThreaded() ? (Object) session : (Object) database; Object sync = database.isMultiThreaded() || database.getMvStore() != null ? session : database;
session.waitIfExclusiveModeEnabled(); session.waitIfExclusiveModeEnabled();
boolean callStop = true; boolean callStop = true;
boolean writing = !isReadOnly(); boolean writing = !isReadOnly();
...@@ -251,8 +255,10 @@ public abstract class Command implements CommandInterface { ...@@ -251,8 +255,10 @@ public abstract class Command implements CommandInterface {
// wait // wait
} }
} }
//noinspection SynchronizationOnLocalVariableOrMethodParameter
synchronized (sync) { synchronized (sync) {
Session.Savepoint rollback = session.setSavepoint(); Session.Savepoint rollback = session.setSavepoint();
session.startStatementWithinTransaction();
session.setCurrentCommand(this, generatedKeysRequest); session.setCurrentCommand(this, generatedKeysRequest);
try { try {
while (true) { while (true) {
...@@ -311,25 +317,30 @@ public abstract class Command implements CommandInterface { ...@@ -311,25 +317,30 @@ public abstract class Command implements CommandInterface {
errorCode != ErrorCode.ROW_NOT_FOUND_WHEN_DELETING_1) { errorCode != ErrorCode.ROW_NOT_FOUND_WHEN_DELETING_1) {
throw e; throw e;
} }
long now = System.nanoTime() / 1_000_000; long now = System.nanoTime();
if (start != 0 && now - start > session.getLockTimeout()) { if (start != 0 && TimeUnit.NANOSECONDS.toMillis(now - start) > session.getLockTimeout()) {
throw DbException.get(ErrorCode.LOCK_TIMEOUT_1, e.getCause(), ""); throw DbException.get(ErrorCode.LOCK_TIMEOUT_1, e);
} }
// Only in PageStore mode we need to sleep here to avoid buzy wait loop
Database database = session.getDatabase(); Database database = session.getDatabase();
int sleep = 1 + MathUtils.randomInt(10); if (database.getMvStore() == null) {
while (true) { int sleep = 1 + MathUtils.randomInt(10);
try { while (true) {
if (database.isMultiThreaded()) { try {
Thread.sleep(sleep); if (database.isMultiThreaded()) {
} else { Thread.sleep(sleep);
database.wait(sleep); } else {
// although nobody going to notify us
// it is vital to give up lock on a database
database.wait(sleep);
}
} catch (InterruptedException e1) {
// ignore
}
long slept = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - now);
if (slept >= sleep) {
break;
} }
} catch (InterruptedException e1) {
// ignore
}
long slept = System.nanoTime() / 1_000_000 - now;
if (slept >= sleep) {
break;
} }
} }
return start == 0 ? now : start; return start == 0 ? now : start;
......
...@@ -594,7 +594,8 @@ public class Select extends Query { ...@@ -594,7 +594,8 @@ public class Select extends Query {
} }
} }
ArrayList<Row> forUpdateRows = null; ArrayList<Row> forUpdateRows = null;
if (isForUpdateMvcc) { boolean lockRows = this.isForUpdateMvcc;
if (lockRows) {
forUpdateRows = Utils.newSmallArrayList(); forUpdateRows = Utils.newSmallArrayList();
} }
int sampleSize = getSampleSizeValue(session); int sampleSize = getSampleSizeValue(session);
...@@ -604,7 +605,7 @@ public class Select extends Query { ...@@ -604,7 +605,7 @@ public class Select extends Query {
return lazyResult; return lazyResult;
} }
while (lazyResult.next()) { while (lazyResult.next()) {
if (isForUpdateMvcc) { if (lockRows) {
topTableFilter.lockRowAdd(forUpdateRows); topTableFilter.lockRowAdd(forUpdateRows);
} }
result.addRow(lazyResult.currentRow()); result.addRow(lazyResult.currentRow());
...@@ -613,7 +614,7 @@ public class Select extends Query { ...@@ -613,7 +614,7 @@ public class Select extends Query {
break; break;
} }
} }
if (isForUpdateMvcc) { if (lockRows) {
topTableFilter.lockRows(forUpdateRows); topTableFilter.lockRows(forUpdateRows);
} }
return null; return null;
......
...@@ -135,6 +135,7 @@ public class Update extends Prepared { ...@@ -135,6 +135,7 @@ public class Update extends Prepared {
} }
newRow.setValue(i, newValue); newRow.setValue(i, newValue);
} }
newRow.setKey(oldRow.getKey());
if (setOnUpdate || updateToCurrentValuesReturnsZero) { if (setOnUpdate || updateToCurrentValuesReturnsZero) {
setOnUpdate = false; setOnUpdate = false;
for (int i = 0; i < columnCount; i++) { for (int i = 0; i < columnCount; i++) {
......
...@@ -45,7 +45,6 @@ import org.h2.schema.SchemaObject; ...@@ -45,7 +45,6 @@ import org.h2.schema.SchemaObject;
import org.h2.schema.Sequence; import org.h2.schema.Sequence;
import org.h2.schema.TriggerObject; import org.h2.schema.TriggerObject;
import org.h2.security.auth.Authenticator; import org.h2.security.auth.Authenticator;
import org.h2.security.auth.AuthenticatorFactory;
import org.h2.store.DataHandler; import org.h2.store.DataHandler;
import org.h2.store.FileLock; import org.h2.store.FileLock;
import org.h2.store.FileLockMethod; import org.h2.store.FileLockMethod;
...@@ -336,6 +335,12 @@ public class Database implements DataHandler { ...@@ -336,6 +335,12 @@ public class Database implements DataHandler {
} }
} }
public int getLockTimeout() {
Setting setting = findSetting(
SetTypes.getTypeName(SetTypes.DEFAULT_LOCK_TIMEOUT));
return setting == null ? Constants.INITIAL_LOCK_TIMEOUT : setting.getIntValue();
}
/** /**
* Create a new row for a table. * Create a new row for a table.
* *
......
...@@ -23,7 +23,6 @@ import org.h2.command.Parser; ...@@ -23,7 +23,6 @@ import org.h2.command.Parser;
import org.h2.command.Prepared; import org.h2.command.Prepared;
import org.h2.command.ddl.Analyze; import org.h2.command.ddl.Analyze;
import org.h2.command.dml.Query; import org.h2.command.dml.Query;
import org.h2.command.dml.SetTypes;
import org.h2.constraint.Constraint; import org.h2.constraint.Constraint;
import org.h2.index.Index; import org.h2.index.Index;
import org.h2.index.ViewIndex; import org.h2.index.ViewIndex;
...@@ -64,6 +63,8 @@ import org.h2.value.ValueString; ...@@ -64,6 +63,8 @@ import org.h2.value.ValueString;
*/ */
public class Session extends SessionWithState implements TransactionStore.RollbackListener { public class Session extends SessionWithState implements TransactionStore.RollbackListener {
public enum State { INIT, RUNNING, BLOCKED, SLEEP, CLOSED }
/** /**
* This special log position means that the log entry has been written. * This special log position means that the log entry has been written.
*/ */
...@@ -158,6 +159,7 @@ public class Session extends SessionWithState implements TransactionStore.Rollba ...@@ -158,6 +159,7 @@ public class Session extends SessionWithState implements TransactionStore.Rollba
private ArrayList<Value> temporaryLobs; private ArrayList<Value> temporaryLobs;
private Transaction transaction; private Transaction transaction;
private State state = State.INIT;
private long startStatement = -1; private long startStatement = -1;
public Session(Database database, User user, int id) { public Session(Database database, User user, int id) {
...@@ -167,10 +169,7 @@ public class Session extends SessionWithState implements TransactionStore.Rollba ...@@ -167,10 +169,7 @@ public class Session extends SessionWithState implements TransactionStore.Rollba
this.undoLog = new UndoLog(this); this.undoLog = new UndoLog(this);
this.user = user; this.user = user;
this.id = id; this.id = id;
Setting setting = database.findSetting( this.lockTimeout = database.getLockTimeout();
SetTypes.getTypeName(SetTypes.DEFAULT_LOCK_TIMEOUT));
this.lockTimeout = setting == null ?
Constants.INITIAL_LOCK_TIMEOUT : setting.getIntValue();
this.currentSchemaName = Constants.SCHEMA_MAIN; this.currentSchemaName = Constants.SCHEMA_MAIN;
this.columnNamerConfiguration = ColumnNamerConfiguration.getDefault(); this.columnNamerConfiguration = ColumnNamerConfiguration.getDefault();
} }
...@@ -861,6 +860,7 @@ public class Session extends SessionWithState implements TransactionStore.Rollba ...@@ -861,6 +860,7 @@ public class Session extends SessionWithState implements TransactionStore.Rollba
@Override @Override
public void close() { public void close() {
if (!closed) { if (!closed) {
state = State.CLOSED;
try { try {
database.checkPowerOff(); database.checkPowerOff();
...@@ -874,9 +874,9 @@ public class Session extends SessionWithState implements TransactionStore.Rollba ...@@ -874,9 +874,9 @@ public class Session extends SessionWithState implements TransactionStore.Rollba
// and we need to unlock before we call removeSession(), which might // and we need to unlock before we call removeSession(), which might
// want to take the meta lock using the system session. // want to take the meta lock using the system session.
database.unlockMeta(this); database.unlockMeta(this);
database.removeSession(this);
} finally { } finally {
closed = true; closed = true;
database.removeSession(this);
} }
} }
} }
...@@ -1212,11 +1212,15 @@ public class Session extends SessionWithState implements TransactionStore.Rollba ...@@ -1212,11 +1212,15 @@ public class Session extends SessionWithState implements TransactionStore.Rollba
if (lastThrottle + TimeUnit.MILLISECONDS.toNanos(Constants.THROTTLE_DELAY) > time) { if (lastThrottle + TimeUnit.MILLISECONDS.toNanos(Constants.THROTTLE_DELAY) > time) {
return; return;
} }
State prevState = this.state;
lastThrottle = time + throttleNs; lastThrottle = time + throttleNs;
try { try {
this.state = State.SLEEP;
Thread.sleep(TimeUnit.NANOSECONDS.toMillis(throttleNs)); Thread.sleep(TimeUnit.NANOSECONDS.toMillis(throttleNs));
} catch (Exception e) { } catch (Exception e) {
// ignore InterruptedException // ignore InterruptedException
} finally {
this.state = prevState;
} }
} }
...@@ -1244,6 +1248,7 @@ public class Session extends SessionWithState implements TransactionStore.Rollba ...@@ -1244,6 +1248,7 @@ public class Session extends SessionWithState implements TransactionStore.Rollba
long now = System.nanoTime(); long now = System.nanoTime();
cancelAtNs = now + TimeUnit.MILLISECONDS.toNanos(queryTimeout); cancelAtNs = now + TimeUnit.MILLISECONDS.toNanos(queryTimeout);
} }
state = command == null ? State.SLEEP : State.RUNNING;
} }
/** /**
...@@ -1633,7 +1638,7 @@ public class Session extends SessionWithState implements TransactionStore.Rollba ...@@ -1633,7 +1638,7 @@ public class Session extends SessionWithState implements TransactionStore.Rollba
public Value getTransactionId() { public Value getTransactionId() {
if (database.getMvStore() != null) { if (database.getMvStore() != null) {
if (transaction == null) { if (transaction == null || !transaction.hasChanges()) {
return ValueNull.INSTANCE; return ValueNull.INSTANCE;
} }
return ValueString.get(Long.toString(getTransaction().getSequenceNum())); return ValueString.get(Long.toString(getTransaction().getSequenceNum()));
...@@ -1674,14 +1679,14 @@ public class Session extends SessionWithState implements TransactionStore.Rollba ...@@ -1674,14 +1679,14 @@ public class Session extends SessionWithState implements TransactionStore.Rollba
database.shutdownImmediately(); database.shutdownImmediately();
throw DbException.get(ErrorCode.DATABASE_IS_CLOSED); throw DbException.get(ErrorCode.DATABASE_IS_CLOSED);
} }
transaction = store.getTransactionStore().begin(this); transaction = store.getTransactionStore().begin(this, this.lockTimeout, id);
} }
startStatement = -1; startStatement = -1;
} }
return transaction; return transaction;
} }
public long getStatementSavepoint() { private long getStatementSavepoint() {
if (startStatement == -1) { if (startStatement == -1) {
startStatement = getTransaction().setSavepoint(); startStatement = getTransaction().setSavepoint();
} }
...@@ -1754,6 +1759,18 @@ public class Session extends SessionWithState implements TransactionStore.Rollba ...@@ -1754,6 +1759,18 @@ public class Session extends SessionWithState implements TransactionStore.Rollba
tablesToAnalyze.add(table); tablesToAnalyze.add(table);
} }
public State getState() {
return getBlockingSessionId() != 0 ? State.BLOCKED : state;
}
public void setState(State state) {
this.state = state;
}
public int getBlockingSessionId() {
return transaction == null ? 0 : transaction.getBlockerId();
}
@Override @Override
public void onRollback(MVMap<Object, VersionedValue> map, Object key, public void onRollback(MVMap<Object, VersionedValue> map, Object key,
VersionedValue existingValue, VersionedValue existingValue,
......
...@@ -453,10 +453,10 @@ public class JdbcConnection extends TraceObject ...@@ -453,10 +453,10 @@ public class JdbcConnection extends TraceObject
debugCode("setAutoCommit(" + autoCommit + ");"); debugCode("setAutoCommit(" + autoCommit + ");");
} }
checkClosed(); checkClosed();
if (autoCommit && !session.getAutoCommit()) {
commit();
}
synchronized (session) { synchronized (session) {
if (autoCommit && !session.getAutoCommit()) {
commit();
}
session.setAutoCommit(autoCommit); session.setAutoCommit(autoCommit);
} }
} catch (Exception e) { } catch (Exception e) {
......
...@@ -150,7 +150,7 @@ public class Cursor<K, V> implements Iterator<K> { ...@@ -150,7 +150,7 @@ public class Cursor<K, V> implements Iterator<K> {
* @param p the page to start from * @param p the page to start from
* @param key the key to search, null means search for the first key * @param key the key to search, null means search for the first key
*/ */
public static CursorPos traverseDown(Page p, Object key) { private static CursorPos traverseDown(Page p, Object key) {
CursorPos cursorPos = null; CursorPos cursorPos = null;
while (!p.isLeaf()) { while (!p.isLeaf()) {
assert p.getKeyCount() > 0; assert p.getKeyCount() > 0;
......
...@@ -101,6 +101,11 @@ public final class DataUtils { ...@@ -101,6 +101,11 @@ public final class DataUtils {
*/ */
public static final int ERROR_TRANSACTION_TOO_BIG = 104; public static final int ERROR_TRANSACTION_TOO_BIG = 104;
/**
* Deadlock discovered and one of transactions involved chosen as victim and rolled back.
*/
public static final int ERROR_TRANSACTIONS_DEADLOCK = 105;
/** /**
* The type for leaf page. * The type for leaf page.
*/ */
......
...@@ -131,6 +131,7 @@ public class MVMap<K, V> extends AbstractMap<K, V> ...@@ -131,6 +131,7 @@ public class MVMap<K, V> extends AbstractMap<K, V>
*/ */
@Override @Override
public V put(K key, V value) { public V put(K key, V value) {
DataUtils.checkArgument(value != null, "The value may not be null");
return put(key, value, DecisionMaker.PUT); return put(key, value, DecisionMaker.PUT);
} }
...@@ -142,7 +143,6 @@ public class MVMap<K, V> extends AbstractMap<K, V> ...@@ -142,7 +143,6 @@ public class MVMap<K, V> extends AbstractMap<K, V>
* @return the old value if the key existed, or null otherwise * @return the old value if the key existed, or null otherwise
*/ */
public final V put(K key, V value, DecisionMaker<? super V> decisionMaker) { public final V put(K key, V value, DecisionMaker<? super V> decisionMaker) {
DataUtils.checkArgument(value != null, "The value may not be null");
return operate(key, value, decisionMaker); return operate(key, value, decisionMaker);
} }
...@@ -1382,7 +1382,7 @@ public class MVMap<K, V> extends AbstractMap<K, V> ...@@ -1382,7 +1382,7 @@ public class MVMap<K, V> extends AbstractMap<K, V>
} }
} }
public enum Decision { ABORT, REMOVE, PUT } public enum Decision { ABORT, REMOVE, PUT, REPEAT }
/** /**
* Class DecisionMaker provides callback interface (and should become a such in Java 8) * Class DecisionMaker provides callback interface (and should become a such in Java 8)
...@@ -1520,6 +1520,9 @@ public class MVMap<K, V> extends AbstractMap<K, V> ...@@ -1520,6 +1520,9 @@ public class MVMap<K, V> extends AbstractMap<K, V>
boolean needUnlock = false; boolean needUnlock = false;
try { try {
switch (decision) { switch (decision) {
case REPEAT:
decisionMaker.reset();
continue;
case ABORT: case ABORT:
if(rootReference != getRoot()) { if(rootReference != getRoot()) {
decisionMaker.reset(); decisionMaker.reset();
...@@ -1528,6 +1531,10 @@ public class MVMap<K, V> extends AbstractMap<K, V> ...@@ -1528,6 +1531,10 @@ public class MVMap<K, V> extends AbstractMap<K, V>
return result; return result;
case REMOVE: { case REMOVE: {
if (index < 0) { if (index < 0) {
if(rootReference != getRoot()) {
decisionMaker.reset();
continue;
}
return null; return null;
} }
if (attempt > 2 && !(needUnlock = lockRoot(decisionMaker, rootReference, if (attempt > 2 && !(needUnlock = lockRoot(decisionMaker, rootReference,
......
...@@ -115,12 +115,18 @@ public class MVPrimaryIndex extends BaseIndex { ...@@ -115,12 +115,18 @@ public class MVPrimaryIndex extends BaseIndex {
TransactionMap<Value, Value> map = getMap(session); TransactionMap<Value, Value> map = getMap(session);
Value key = ValueLong.get(row.getKey()); Value key = ValueLong.get(row.getKey());
try { try {
if (map.put(key, ValueArray.get(row.getValueList())) != null) { Value oldValue = map.putIfAbsent(key, ValueArray.get(row.getValueList()));
if (oldValue != null) {
String sql = "PRIMARY KEY ON " + table.getSQL(); String sql = "PRIMARY KEY ON " + table.getSQL();
if (mainIndexColumn >= 0 && mainIndexColumn < indexColumns.length) { if (mainIndexColumn >= 0 && mainIndexColumn < indexColumns.length) {
sql += "(" + indexColumns[mainIndexColumn].getSQL() + ")"; sql += "(" + indexColumns[mainIndexColumn].getSQL() + ")";
} }
DbException e = DbException.get(ErrorCode.DUPLICATE_KEY_1, sql); int errorCode = ErrorCode.CONCURRENT_UPDATE_1;
if (map.get(key) != null) {
// committed
errorCode = ErrorCode.DUPLICATE_KEY_1;
}
DbException e = DbException.get(errorCode, sql + " " + oldValue);
e.setSource(this); e.setSource(this);
throw e; throw e;
} }
...@@ -156,6 +162,18 @@ public class MVPrimaryIndex extends BaseIndex { ...@@ -156,6 +162,18 @@ public class MVPrimaryIndex extends BaseIndex {
} }
} }
public void lockRows(Session session, Iterable<Row> rowsForUpdate) {
TransactionMap<Value, Value> map = getMap(session);
for (Row row : rowsForUpdate) {
long key = row.getKey();
try {
map.lock(ValueLong.get(key));
} catch (IllegalStateException ex) {
throw mvTable.convertException(ex);
}
}
}
@Override @Override
public Cursor find(Session session, SearchRow first, SearchRow last) { public Cursor find(Session session, SearchRow first, SearchRow last) {
ValueLong min, max; ValueLong min, max;
...@@ -410,5 +428,4 @@ public class MVPrimaryIndex extends BaseIndex { ...@@ -410,5 +428,4 @@ public class MVPrimaryIndex extends BaseIndex {
} }
} }
} }
...@@ -26,7 +26,6 @@ import org.h2.engine.SysProperties; ...@@ -26,7 +26,6 @@ import org.h2.engine.SysProperties;
import org.h2.index.Cursor; import org.h2.index.Cursor;
import org.h2.index.Index; import org.h2.index.Index;
import org.h2.index.IndexType; import org.h2.index.IndexType;
import org.h2.index.MultiVersionIndex;
import org.h2.message.DbException; import org.h2.message.DbException;
import org.h2.message.Trace; import org.h2.message.Trace;
import org.h2.mvstore.DataUtils; import org.h2.mvstore.DataUtils;
...@@ -708,7 +707,11 @@ public class MVTable extends TableBase { ...@@ -708,7 +707,11 @@ public class MVTable extends TableBase {
index.remove(session, row); index.remove(session, row);
} }
} catch (Throwable e) { } catch (Throwable e) {
t.rollbackToSavepoint(savepoint); try {
t.rollbackToSavepoint(savepoint);
} catch (Throwable nested) {
e.addSuppressed(nested);
}
throw DbException.convert(e); throw DbException.convert(e);
} }
analyzeIfRequired(session); analyzeIfRequired(session);
...@@ -734,26 +737,21 @@ public class MVTable extends TableBase { ...@@ -734,26 +737,21 @@ public class MVTable extends TableBase {
index.add(session, row); index.add(session, row);
} }
} catch (Throwable e) { } catch (Throwable e) {
t.rollbackToSavepoint(savepoint); try {
DbException de = DbException.convert(e); t.rollbackToSavepoint(savepoint);
if (de.getErrorCode() == ErrorCode.DUPLICATE_KEY_1) { } catch (Throwable nested) {
for (Index index : indexes) { e.addSuppressed(nested);
if (index.getIndexType().isUnique() &&
index instanceof MultiVersionIndex) {
MultiVersionIndex mv = (MultiVersionIndex) index;
if (mv.isUncommittedFromOtherSession(session, row)) {
throw DbException.get(
ErrorCode.CONCURRENT_UPDATE_1,
index.getName());
}
}
}
} }
throw de; throw DbException.convert(e);
} }
analyzeIfRequired(session); analyzeIfRequired(session);
} }
@Override
public void lockRows(Session session, Iterable<Row> rowsForUpdate) {
primaryIndex.lockRows(session, rowsForUpdate);
}
private void analyzeIfRequired(Session session) { private void analyzeIfRequired(Session session) {
synchronized (this) { synchronized (this) {
if (nextAnalyze == 0 || nextAnalyze > changesSinceAnalyze++) { if (nextAnalyze == 0 || nextAnalyze > changesSinceAnalyze++) {
...@@ -919,12 +917,15 @@ public class MVTable extends TableBase { ...@@ -919,12 +917,15 @@ public class MVTable extends TableBase {
* @return the database exception * @return the database exception
*/ */
DbException convertException(IllegalStateException e) { DbException convertException(IllegalStateException e) {
if (DataUtils.getErrorCode(e.getMessage()) == int errorCode = DataUtils.getErrorCode(e.getMessage());
DataUtils.ERROR_TRANSACTION_LOCKED) { if (errorCode == DataUtils.ERROR_TRANSACTION_LOCKED) {
throw DbException.get(ErrorCode.CONCURRENT_UPDATE_1, throw DbException.get(ErrorCode.CONCURRENT_UPDATE_1,
e, getName()); e, getName());
} }
if (errorCode == DataUtils.ERROR_TRANSACTIONS_DEADLOCK) {
throw DbException.get(ErrorCode.DEADLOCK_1,
e, getName());
}
return store.convertIllegalStateException(e); return store.convertIllegalStateException(e);
} }
} }
...@@ -165,7 +165,7 @@ public class MVTableEngine implements TableEngine { ...@@ -165,7 +165,7 @@ public class MVTableEngine implements TableEngine {
} }
this.transactionStore = new TransactionStore( this.transactionStore = new TransactionStore(
store, store,
new ValueDataType(db.getCompareMode(), db, null)); new ValueDataType(db.getCompareMode(), db, null), db.getLockTimeout());
// transactionStore.init(); // transactionStore.init();
} catch (IllegalStateException e) { } catch (IllegalStateException e) {
throw convertIllegalStateException(e); throw convertIllegalStateException(e);
......
...@@ -180,6 +180,7 @@ public final class MVRTreeMap<V> extends MVMap<SpatialKey, V> { ...@@ -180,6 +180,7 @@ public final class MVRTreeMap<V> extends MVMap<SpatialKey, V> {
result = index < 0 ? null : (V)p.getValue(index); result = index < 0 ? null : (V)p.getValue(index);
Decision decision = decisionMaker.decide(result, value); Decision decision = decisionMaker.decide(result, value);
switch (decision) { switch (decision) {
case REPEAT: break;
case ABORT: break; case ABORT: break;
case REMOVE: case REMOVE:
if(index >= 0) { if(index >= 0) {
......
/*
* Copyright 2004-2018 H2 Group. Multiple-Licensed under the MPL 2.0,
* and the EPL 1.0 (http://h2database.com/html/license.html).
* Initial Developer: H2 Group
*/
package org.h2.mvstore.tx;
import org.h2.mvstore.MVMap;
/**
* Class CommitDecisionMaker makes a decision during post-commit processing
* about how to transform uncommitted map entry into committed one,
* based on undo log information.
*
* @author <a href='mailto:andrei.tokar@gmail.com'>Andrei Tokar</a>
*/
final class CommitDecisionMaker extends MVMap.DecisionMaker<VersionedValue> {
private long undoKey;
private MVMap.Decision decision;
void setUndoKey(long undoKey) {
this.undoKey = undoKey;
reset();
}
@Override
public MVMap.Decision decide(VersionedValue existingValue, VersionedValue providedValue) {
assert decision == null;
if (existingValue == null ||
// map entry was treated as already committed, and then
// it has been removed by another transaction (commited and closed by now )
existingValue.getOperationId() != undoKey) {
// this is not a final undo log entry for this key,
// or map entry was treated as already committed and then
// overwritten by another transaction
// see TxDecisionMaker.decide()
decision = MVMap.Decision.ABORT;
} else /* this is final undo log entry for this key */ if (existingValue.value == null) {
decision = MVMap.Decision.REMOVE;
} else {
decision = MVMap.Decision.PUT;
}
return decision;
}
@SuppressWarnings("unchecked")
@Override
public VersionedValue selectValue(VersionedValue existingValue, VersionedValue providedValue) {
assert decision == MVMap.Decision.PUT;
assert existingValue != null;
return new VersionedValue(0L, existingValue.value);
}
@Override
public void reset() {
decision = null;
}
@Override
public String toString() {
return "commit " + TransactionStore.getTransactionId(undoKey);
}
}
/*
* Copyright 2004-2018 H2 Group. Multiple-Licensed under the MPL 2.0,
* and the EPL 1.0 (http://h2database.com/html/license.html).
* Initial Developer: H2 Group
*/
package org.h2.mvstore.tx;
import org.h2.mvstore.MVMap;
/**
* Class RollbackDecisionMaker process undo log record during transaction rollback.
*
* @author <a href='mailto:andrei.tokar@gmail.com'>Andrei Tokar</a>
*/
final class RollbackDecisionMaker extends MVMap.DecisionMaker<Object[]> {
private final TransactionStore store;
private final long transactionId;
private final long toLogId;
private final TransactionStore.RollbackListener listener;
private MVMap.Decision decision;
RollbackDecisionMaker(TransactionStore store, long transactionId, long toLogId,
TransactionStore.RollbackListener listener) {
this.store = store;
this.transactionId = transactionId;
this.toLogId = toLogId;
this.listener = listener;
}
@Override
public MVMap.Decision decide(Object[] existingValue, Object[] providedValue) {
assert decision == null;
assert existingValue != null;
VersionedValue valueToRestore = (VersionedValue) existingValue[2];
long operationId;
if (valueToRestore == null ||
(operationId = valueToRestore.getOperationId()) == 0 ||
TransactionStore.getTransactionId(operationId) == transactionId
&& TransactionStore.getLogId(operationId) < toLogId) {
int mapId = (Integer) existingValue[0];
MVMap<Object, VersionedValue> map = store.openMap(mapId);
if (map != null && !map.isClosed()) {
Object key = existingValue[1];
VersionedValue previousValue = map.operate(key, valueToRestore, MVMap.DecisionMaker.DEFAULT);
listener.onRollback(map, key, previousValue, valueToRestore);
}
}
decision = MVMap.Decision.REMOVE;
return decision;
}
@Override
public void reset() {
decision = null;
}
@Override
public String toString() {
return "rollback-" + transactionId;
}
}
...@@ -106,19 +106,60 @@ public class Transaction { ...@@ -106,19 +106,60 @@ public class Transaction {
*/ */
private final AtomicLong statusAndLogId; private final AtomicLong statusAndLogId;
/**
* Reference to a counter for an earliest store version used by this transaction.
* Referenced version and all newer ones can not be discarded
* at least until this transaction ends.
*/
private MVStore.TxCounter txCounter; private MVStore.TxCounter txCounter;
/**
* Transaction name.
*/
private String name; private String name;
/**
* Indicates whether this transaction was stored in preparedTransactions map
*/
boolean wasStored; boolean wasStored;
/**
* How long to wait for blocking transaction to commit or rollback.
*/
final int timeoutMillis;
/**
* Identification of the owner of this transaction,
* usually the owner is a database session.
*/
private final int ownerId;
/**
* Blocking transaction, if any
*/
private volatile Transaction blockingTransaction;
/**
* Map on which this transaction is blocked.
*/
MVMap<?,VersionedValue> blockingMap;
/**
* Key in blockingMap on which this transaction is blocked.
*/
Object blockingKey;
Transaction(TransactionStore store, int transactionId, long sequenceNum, int status, Transaction(TransactionStore store, int transactionId, long sequenceNum, int status,
String name, long logId, TransactionStore.RollbackListener listener) { String name, long logId, int timeoutMillis, int ownerId,
TransactionStore.RollbackListener listener) {
this.store = store; this.store = store;
this.transactionId = transactionId; this.transactionId = transactionId;
this.sequenceNum = sequenceNum; this.sequenceNum = sequenceNum;
this.statusAndLogId = new AtomicLong(composeState(status, logId, false)); this.statusAndLogId = new AtomicLong(composeState(status, logId, false));
this.name = name; this.name = name;
this.timeoutMillis = timeoutMillis;
this.ownerId = ownerId;
this.listener = listener; this.listener = listener;
} }
...@@ -201,6 +242,10 @@ public class Transaction { ...@@ -201,6 +242,10 @@ public class Transaction {
return name; return name;
} }
public int getBlockerId() {
return blockingTransaction == null ? 0 : blockingTransaction.ownerId;
}
/** /**
* Create a new savepoint. * Create a new savepoint.
* *
...@@ -217,8 +262,8 @@ public class Transaction { ...@@ -217,8 +262,8 @@ public class Transaction {
public void markStatementEnd() { public void markStatementEnd() {
MVStore.TxCounter counter = txCounter; MVStore.TxCounter counter = txCounter;
txCounter = null;
if(counter != null) { if(counter != null) {
txCounter = null;
store.store.deregisterVersionUsage(counter); store.store.deregisterVersionUsage(counter);
} }
} }
...@@ -230,7 +275,7 @@ public class Transaction { ...@@ -230,7 +275,7 @@ public class Transaction {
* @param key the key * @param key the key
* @param oldValue the old value * @param oldValue the old value
*/ */
void log(int mapId, Object key, VersionedValue oldValue) { long log(int mapId, Object key, VersionedValue oldValue) {
long currentState = statusAndLogId.getAndIncrement(); long currentState = statusAndLogId.getAndIncrement();
long logId = getLogId(currentState); long logId = getLogId(currentState);
if (logId >= LOG_ID_LIMIT) { if (logId >= LOG_ID_LIMIT) {
...@@ -239,7 +284,10 @@ public class Transaction { ...@@ -239,7 +284,10 @@ public class Transaction {
"Transaction {0} has too many changes", "Transaction {0} has too many changes",
transactionId); transactionId);
} }
store.log(this, logId, mapId, key, oldValue); int currentStatus = getStatus(currentState);
checkOpen(currentStatus);
long undoKey = store.addUndoLogRecord(transactionId, logId, new Object[]{ mapId, key, oldValue });
return undoKey;
} }
/** /**
...@@ -254,7 +302,9 @@ public class Transaction { ...@@ -254,7 +302,9 @@ public class Transaction {
"Transaction {0} has internal error", "Transaction {0} has internal error",
transactionId); transactionId);
} }
store.logUndo(this, logId); int currentStatus = getStatus(currentState);
checkOpen(currentStatus);
store.removeUndoLogRecord(transactionId, logId);
} }
/** /**
...@@ -313,11 +363,29 @@ public class Transaction { ...@@ -313,11 +363,29 @@ public class Transaction {
*/ */
public void commit() { public void commit() {
assert store.openTransactions.get().get(transactionId); assert store.openTransactions.get().get(transactionId);
long state = setStatus(STATUS_COMMITTING); Throwable ex = null;
long logId = Transaction.getLogId(state); boolean hasChanges = false;
boolean hasChanges = hasChanges(state); try {
long state = setStatus(STATUS_COMMITTING);
store.commit(this, logId, hasChanges); hasChanges = hasChanges(state);
if (hasChanges) {
long logId = getLogId(state);
store.commit(this, logId);
}
} catch (Throwable e) {
ex = e;
throw e;
} finally {
try {
store.endTransaction(this, hasChanges);
} catch (Throwable e) {
if (ex == null) {
throw e;
} else {
ex.addSuppressed(e);
}
}
}
} }
/** /**
...@@ -341,6 +409,7 @@ public class Transaction { ...@@ -341,6 +409,7 @@ public class Transaction {
"while rollback to savepoint was in progress", "while rollback to savepoint was in progress",
transactionId); transactionId);
} }
notifyAllWaitingTransactions();
} }
} }
...@@ -376,14 +445,89 @@ public class Transaction { ...@@ -376,14 +445,89 @@ public class Transaction {
return getLogId(statusAndLogId.get()); return getLogId(statusAndLogId.get());
} }
/**
* Check whether this transaction is open.
*/
private void checkOpen(int status) {
if (status != STATUS_OPEN) {
throw DataUtils.newIllegalStateException(
DataUtils.ERROR_TRANSACTION_ILLEGAL_STATE,
"Transaction {0} has status {1}, not open", transactionId, status);
}
}
/** /**
* Check whether this transaction is open or prepared. * Check whether this transaction is open or prepared.
*/ */
void checkNotClosed() { void checkNotClosed() {
if (getStatus() == STATUS_CLOSED) { if (getStatus() == STATUS_CLOSED) {
throw DataUtils.newIllegalStateException( throw DataUtils.newIllegalStateException(
DataUtils.ERROR_CLOSED, "Transaction is closed"); DataUtils.ERROR_CLOSED, "Transaction {0} is closed", transactionId);
}
}
void closeIt() {
long lastState = setStatus(STATUS_CLOSED);
store.store.deregisterVersionUsage(txCounter);
if(hasChanges(lastState) || hasRollback(lastState)) {
notifyAllWaitingTransactions();
}
}
private synchronized void notifyAllWaitingTransactions() {
notifyAll();
}
public boolean waitFor(Transaction toWaitFor) {
if (isDeadlocked(toWaitFor)) {
StringBuilder details = new StringBuilder(String.format("Transaction %d has been chosen as a deadlock victim. Details:%n", transactionId));
for(Transaction tx = toWaitFor, nextTx; (nextTx = tx.blockingTransaction) != null; tx = nextTx) {
details.append(String.format("Transaction %d attempts to update map <%s> entry with key <%s> modified by transaction %s%n",
tx.transactionId, tx.blockingMap.getName(), tx.blockingKey, tx.blockingTransaction));
if (nextTx == this) {
details.append(String.format("Transaction %d attempts to update map <%s> entry with key <%s> modified by transaction %s%n",
transactionId, blockingMap.getName(), blockingKey, toWaitFor));
throw DataUtils.newIllegalStateException(DataUtils.ERROR_TRANSACTIONS_DEADLOCK, details.toString());
}
}
}
blockingTransaction = toWaitFor;
try {
return toWaitFor.waitForThisToEnd(timeoutMillis);
} finally {
blockingMap = null;
blockingKey = null;
blockingTransaction = null;
}
}
private boolean isDeadlocked(Transaction toWaitFor) {
for(Transaction tx = toWaitFor, nextTx;
(nextTx = tx.blockingTransaction) != null && tx.getStatus() == Transaction.STATUS_OPEN;
tx = nextTx) {
if (nextTx == this) {
return true;
}
}
return false;
}
private synchronized boolean waitForThisToEnd(int millis) {
long until = System.currentTimeMillis() + millis;
int status;
while((status = getStatus()) != STATUS_CLOSED && status != STATUS_ROLLING_BACK) {
long dur = until - System.currentTimeMillis();
if(dur <= 0) {
return false;
}
try {
wait(dur);
} catch (InterruptedException ex) {
return false;
}
} }
return true;
} }
/** /**
......
...@@ -10,9 +10,9 @@ import org.h2.mvstore.DataUtils; ...@@ -10,9 +10,9 @@ import org.h2.mvstore.DataUtils;
import org.h2.mvstore.MVMap; import org.h2.mvstore.MVMap;
import org.h2.mvstore.Page; import org.h2.mvstore.Page;
import org.h2.mvstore.type.DataType; import org.h2.mvstore.type.DataType;
import java.util.BitSet;
import java.util.AbstractMap; import java.util.AbstractMap;
import java.util.BitSet;
import java.util.Iterator; import java.util.Iterator;
import java.util.Map; import java.util.Map;
...@@ -99,76 +99,70 @@ public class TransactionMap<K, V> { ...@@ -99,76 +99,70 @@ public class TransactionMap<K, V> {
*/ */
public long sizeAsLong() { public long sizeAsLong() {
TransactionStore store = transaction.store; TransactionStore store = transaction.store;
store.rwLock.readLock().lock();
try {
MVMap<Long, Object[]> undo = transaction.store.undoLog;
BitSet committingTransactions; BitSet committingTransactions;
MVMap.RootReference mapRootReference; MVMap.RootReference mapRootReference;
MVMap.RootReference undoLogRootReference; MVMap.RootReference undoLogRootReference;
do { do {
committingTransactions = store.committingTransactions.get(); committingTransactions = store.committingTransactions.get();
mapRootReference = map.getRoot(); mapRootReference = map.getRoot();
undoLogRootReference = store.undoLog.getRoot(); undoLogRootReference = store.undoLog.getRoot();
} while(committingTransactions != store.committingTransactions.get()); } while(committingTransactions != store.committingTransactions.get() ||
mapRootReference != map.getRoot());
Page undoRootPage = undoLogRootReference.root;
long undoLogSize = undoRootPage.getTotalCount(); Page undoRootPage = undoLogRootReference.root;
Page mapRootPage = mapRootReference.root; long undoLogSize = undoRootPage.getTotalCount();
long sizeRaw = mapRootPage.getTotalCount(); Page mapRootPage = mapRootReference.root;
if (undoLogSize == 0) { long size = mapRootPage.getTotalCount();
return sizeRaw; if (undoLogSize == 0) {
} return size;
if (undoLogSize > sizeRaw) { }
// the undo log is larger than the map - if (undoLogSize > size) {
// count the entries of the map // the undo log is larger than the map -
long size = 0; // count the entries of the map
Cursor<K, VersionedValue> cursor = map.cursor(null); size = 0;
while (cursor.hasNext()) { Cursor<K, VersionedValue> cursor = map.cursor(null);
K key = cursor.next(); while (cursor.hasNext()) {
VersionedValue data = cursor.getValue(); K key = cursor.next();
data = getValue(mapRootPage, undoRootPage, key, readLogId, data, committingTransactions); VersionedValue data = cursor.getValue();
if (data != null && data.value != null) { data = getValue(mapRootPage, undoRootPage, key, readLogId, data, committingTransactions);
size++; if (data != null && data.value != null) {
} size++;
} }
return size;
} }
// the undo log is smaller than the map - return size;
// scan the undo log and subtract invisible entries }
synchronized (undo) { // the undo log is smaller than the map -
// re-fetch in case any transaction was committed now // scan the undo log and subtract invisible entries
long size = map.sizeAsLong(); MVMap<Object, Integer> temp = store.createTempMap();
MVMap<Object, Integer> temp = transaction.store try {
.createTempMap(); Cursor<Long, Object[]> cursor = new Cursor<>(undoRootPage, null);
try { while (cursor.hasNext()) {
for (Map.Entry<Long, Object[]> e : undo.entrySet()) { cursor.next();
Object[] op = e.getValue(); Object[] op = cursor.getValue();
int m = (Integer) op[0]; int m = (int) op[0];
if (m != mapId) { if (m != mapId) {
// a different map - ignore // a different map - ignore
continue; continue;
} }
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
K key = (K) op[1]; K key = (K) op[1];
if (get(key) == null) { VersionedValue data = map.get(mapRootPage, key);
Integer old = temp.put(key, 1); data = getValue(mapRootPage, undoRootPage, key, readLogId, data, committingTransactions);
// count each key only once (there might be if (data == null || data.value == null) {
// multiple Integer old = temp.put(key, 1);
// changes for the same key) // count each key only once (there might be
if (old == null) { // multiple
size--; // changes for the same key)
} if (old == null) {
} size--;
} }
} finally {
transaction.store.store.removeMap(temp);
} }
return size;
} }
} finally { } finally {
transaction.store.rwLock.readLock().unlock(); transaction.store.store.removeMap(temp);
} }
return size;
} }
/** /**
...@@ -181,7 +175,7 @@ public class TransactionMap<K, V> { ...@@ -181,7 +175,7 @@ public class TransactionMap<K, V> {
* @throws IllegalStateException if a lock timeout occurs * @throws IllegalStateException if a lock timeout occurs
*/ */
public V remove(K key) { public V remove(K key) {
return set(key, null); return set(key, (V)null);
} }
/** /**
...@@ -200,6 +194,36 @@ public class TransactionMap<K, V> { ...@@ -200,6 +194,36 @@ public class TransactionMap<K, V> {
return set(key, value); return set(key, value);
} }
/**
* Put the value for the given key if entry for this key does not exist.
* It is atomic equivalent of the following expression:
* contains(key) ? get(k) : put(key, value);
*
* @param key the key
* @param value the new value (not null)
* @return the old value
*/
public V putIfAbsent(K key, V value) {
DataUtils.checkArgument(value != null, "The value may not be null");
TxDecisionMaker decisionMaker = new TxDecisionMaker.PutIfAbsentDecisionMaker(map.getId(), key, value, transaction);
return set(key, decisionMaker);
}
/**
* Lock row for the given key.
* <p>
* If the row is locked, this method will retry until the row could be
* updated or until a lock timeout.
*
* @param key the key
* @return the locked value
* @throws IllegalStateException if a lock timeout occurs
*/
public V lock(K key) {
TxDecisionMaker decisionMaker = new TxDecisionMaker.LockDecisionMaker(map.getId(), key, transaction);
return set(key, decisionMaker);
}
/** /**
* Update the value for the given key, without adding an undo log entry. * Update the value for the given key, without adding an undo log entry.
* *
...@@ -207,23 +231,53 @@ public class TransactionMap<K, V> { ...@@ -207,23 +231,53 @@ public class TransactionMap<K, V> {
* @param value the value * @param value the value
* @return the old value * @return the old value
*/ */
@SuppressWarnings("unchecked")
public V putCommitted(K key, V value) { public V putCommitted(K key, V value) {
DataUtils.checkArgument(value != null, "The value may not be null"); DataUtils.checkArgument(value != null, "The value may not be null");
VersionedValue newValue = new VersionedValue(0L, value); VersionedValue newValue = new VersionedValue(0L, value);
VersionedValue oldValue = map.put(key, newValue); VersionedValue oldValue = map.put(key, newValue);
return (V) (oldValue == null ? null : oldValue.value); @SuppressWarnings("unchecked")
V result = (V) (oldValue == null ? null : oldValue.value);
return result;
} }
private V set(K key, V value) { private V set(K key, V value) {
transaction.checkNotClosed(); TxDecisionMaker decisionMaker = new TxDecisionMaker.PutDecisionMaker(map.getId(), key, value, transaction);
V old = get(key); return set(key, decisionMaker);
boolean ok = trySet(key, value, false); }
if (ok) {
return old; private V set(K key, TxDecisionMaker decisionMaker) {
} TransactionStore store = transaction.store;
throw DataUtils.newIllegalStateException( Transaction blockingTransaction;
DataUtils.ERROR_TRANSACTION_LOCKED, "Entry is locked"); long sequenceNumWhenStarted;
VersionedValue result;
do {
sequenceNumWhenStarted = store.openTransactions.get().getVersion();
assert transaction.getBlockerId() == 0;
// although second parameter (value) is not really used,
// since TxDecisionMaker has it embedded,
// MVRTreeMap has weird traversal logic based on it,
// and any non-null value will do
result = map.put(key, VersionedValue.DUMMY, decisionMaker);
MVMap.Decision decision = decisionMaker.getDecision();
assert decision != null;
assert decision != MVMap.Decision.REPEAT;
blockingTransaction = decisionMaker.getBlockingTransaction();
if (decision != MVMap.Decision.ABORT || blockingTransaction == null) {
transaction.blockingMap = null;
transaction.blockingKey = null;
@SuppressWarnings("unchecked")
V res = result == null ? null : (V) result.value;
return res;
}
decisionMaker.reset();
transaction.blockingMap = map;
transaction.blockingKey = key;
} while (blockingTransaction.sequenceNum > sequenceNumWhenStarted || transaction.waitFor(blockingTransaction));
throw DataUtils.newIllegalStateException(DataUtils.ERROR_TRANSACTION_LOCKED,
"Map entry <{0}> with key <{1}> and value {2} is locked by tx {3} and can not be updated by tx {4} within allocated time interval {5} ms.",
map.getName(), key, result, blockingTransaction.transactionId, transaction.transactionId, transaction.timeoutMillis);
} }
/** /**
...@@ -277,7 +331,8 @@ public class TransactionMap<K, V> { ...@@ -277,7 +331,8 @@ public class TransactionMap<K, V> {
committingTransactions = store.committingTransactions.get(); committingTransactions = store.committingTransactions.get();
mapRootReference = map.getRoot(); mapRootReference = map.getRoot();
undoLogRootReference = store.undoLog.getRoot(); undoLogRootReference = store.undoLog.getRoot();
} while(committingTransactions != store.committingTransactions.get()); } while(committingTransactions != store.committingTransactions.get() ||
mapRootReference != map.getRoot());
Page mapRootPage = mapRootReference.root; Page mapRootPage = mapRootReference.root;
current = map.get(mapRootPage, key); current = map.get(mapRootPage, key);
...@@ -302,54 +357,17 @@ public class TransactionMap<K, V> { ...@@ -302,54 +357,17 @@ public class TransactionMap<K, V> {
return false; return false;
} }
} }
} else {
current = map.get(key);
} }
try {
VersionedValue newValue = new VersionedValue( set(key, value);
TransactionStore.getOperationId(transaction.transactionId, transaction.getLogId()),
value);
if (current == null) {
// a new value
transaction.log(mapId, key, current);
VersionedValue old = map.putIfAbsent(key, newValue);
if (old != null) {
transaction.logUndo();
return false;
}
return true;
}
long id = current.operationId;
if (id == 0) {
// committed
transaction.log(mapId, key, current);
// the transaction is committed:
// overwrite the value
if (!map.replace(key, current, newValue)) {
// somebody else was faster
transaction.logUndo();
return false;
}
return true;
}
int tx = TransactionStore.getTransactionId(current.operationId);
if (tx == transaction.transactionId) {
// added or updated by this transaction
transaction.log(mapId, key, current);
if (!map.replace(key, current, newValue)) {
// strange, somebody overwrote the value
// even though the change was not committed
transaction.logUndo();
return false;
}
return true; return true;
} catch (IllegalStateException e) {
return false;
} }
// the transaction is not yet committed
return false;
} }
/** /**
* Get the value for the given key at the time when this map was opened. * Get the effective value for the given key.
* *
* @param key the key * @param key the key
* @return the value or null * @return the value or null
...@@ -400,24 +418,20 @@ public class TransactionMap<K, V> { ...@@ -400,24 +418,20 @@ public class TransactionMap<K, V> {
private VersionedValue getValue(K key, long maxLog) { private VersionedValue getValue(K key, long maxLog) {
TransactionStore store = transaction.store; TransactionStore store = transaction.store;
store.rwLock.readLock().lock(); BitSet committingTransactions;
try { MVMap.RootReference mapRootReference;
BitSet committingTransactions; MVMap.RootReference undoLogRootReference;
MVMap.RootReference mapRootReference; do {
MVMap.RootReference undoLogRootReference; committingTransactions = store.committingTransactions.get();
do { mapRootReference = map.getRoot();
committingTransactions = store.committingTransactions.get(); undoLogRootReference = store.undoLog.getRoot();
mapRootReference = map.getRoot(); } while(committingTransactions != store.committingTransactions.get() ||
undoLogRootReference = store.undoLog.getRoot(); mapRootReference != map.getRoot());
} while(committingTransactions != store.committingTransactions.get());
Page undoRootPage = undoLogRootReference.root; Page undoRootPage = undoLogRootReference.root;
Page mapRootPage = mapRootReference.root; Page mapRootPage = mapRootReference.root;
VersionedValue data = map.get(mapRootPage, key); VersionedValue data = map.get(mapRootPage, key);
return getValue(mapRootPage, undoRootPage, key, maxLog, data, store.committingTransactions.get()); return getValue(mapRootPage, undoRootPage, key, maxLog, data, committingTransactions);
} finally {
store.rwLock.readLock().unlock();
}
} }
/** /**
...@@ -666,7 +680,7 @@ public class TransactionMap<K, V> { ...@@ -666,7 +680,7 @@ public class TransactionMap<K, V> {
@Override @Override
public void remove() { public void remove() {
throw DataUtils.newUnsupportedOperationException( throw DataUtils.newUnsupportedOperationException(
"Removing is not supported"); "Removal is not supported");
} }
}; };
} }
...@@ -776,7 +790,7 @@ public class TransactionMap<K, V> { ...@@ -776,7 +790,7 @@ public class TransactionMap<K, V> {
@Override @Override
public final void remove() { public final void remove() {
throw DataUtils.newUnsupportedOperationException( throw DataUtils.newUnsupportedOperationException(
"Removing is not supported"); "Removal is not supported");
} }
} }
} }
...@@ -13,7 +13,6 @@ import java.util.Iterator; ...@@ -13,7 +13,6 @@ import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.atomic.AtomicReferenceArray; import java.util.concurrent.atomic.AtomicReferenceArray;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.h2.mvstore.DataUtils; import org.h2.mvstore.DataUtils;
import org.h2.mvstore.MVMap; import org.h2.mvstore.MVMap;
import org.h2.mvstore.MVStore; import org.h2.mvstore.MVStore;
...@@ -31,11 +30,16 @@ public class TransactionStore { ...@@ -31,11 +30,16 @@ public class TransactionStore {
*/ */
final MVStore store; final MVStore store;
/**
* Default blocked transaction timeout
*/
private final int timeoutMillis;
/** /**
* The persisted map of prepared transactions. * The persisted map of prepared transactions.
* Key: transactionId, value: [ status, name ]. * Key: transactionId, value: [ status, name ].
*/ */
final MVMap<Integer, Object[]> preparedTransactions; private final MVMap<Integer, Object[]> preparedTransactions;
/** /**
* The undo log. * The undo log.
...@@ -49,12 +53,6 @@ public class TransactionStore { ...@@ -49,12 +53,6 @@ public class TransactionStore {
*/ */
final MVMap<Long, Object[]> undoLog; final MVMap<Long, Object[]> undoLog;
/**
* the reader/writer lock for the undo-log. Allows us to process multiple
* selects in parallel.
*/
final ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock();
/** /**
* The map of maps. * The map of maps.
*/ */
...@@ -115,7 +113,7 @@ public class TransactionStore { ...@@ -115,7 +113,7 @@ public class TransactionStore {
* @param store the store * @param store the store
*/ */
public TransactionStore(MVStore store) { public TransactionStore(MVStore store) {
this(store, new ObjectDataType()); this(store, new ObjectDataType(), 0);
} }
/** /**
...@@ -123,10 +121,12 @@ public class TransactionStore { ...@@ -123,10 +121,12 @@ public class TransactionStore {
* *
* @param store the store * @param store the store
* @param dataType the data type for map keys and values * @param dataType the data type for map keys and values
* @param timeoutMillis lock aquisition timeout in milliseconds, 0 means no wait
*/ */
public TransactionStore(MVStore store, DataType dataType) { public TransactionStore(MVStore store, DataType dataType, int timeoutMillis) {
this.store = store; this.store = store;
this.dataType = dataType; this.dataType = dataType;
this.timeoutMillis = timeoutMillis;
preparedTransactions = store.openMap("openTransactions", preparedTransactions = store.openMap("openTransactions",
new MVMap.Builder<Integer, Object[]>()); new MVMap.Builder<Integer, Object[]>());
DataType oldValueType = new VersionedValue.Type(dataType); DataType oldValueType = new VersionedValue.Type(dataType);
...@@ -162,39 +162,34 @@ public class TransactionStore { ...@@ -162,39 +162,34 @@ public class TransactionStore {
store.removeMap(temp); store.removeMap(temp);
} }
} }
rwLock.writeLock().lock(); if (!undoLog.isEmpty()) {
try { Long key = undoLog.firstKey();
if (!undoLog.isEmpty()) { while (key != null) {
Long key = undoLog.firstKey(); int transactionId = getTransactionId(key);
while (key != null) { if (!openTransactions.get().get(transactionId)) {
int transactionId = getTransactionId(key); Object[] data = preparedTransactions.get(transactionId);
if (!openTransactions.get().get(transactionId)) { int status;
Object[] data = preparedTransactions.get(transactionId); String name;
int status; if (data == null) {
String name; if (undoLog.containsKey(getOperationId(transactionId, 0))) {
if (data == null) { status = Transaction.STATUS_OPEN;
if (undoLog.containsKey(getOperationId(transactionId, 0))) {
status = Transaction.STATUS_OPEN;
} else {
status = Transaction.STATUS_COMMITTING;
}
name = null;
} else { } else {
status = (Integer) data[0]; status = Transaction.STATUS_COMMITTING;
name = (String) data[1];
} }
long nextTxUndoKey = getOperationId(transactionId + 1, 0); name = null;
Long lastUndoKey = undoLog.lowerKey(nextTxUndoKey); } else {
assert lastUndoKey != null; status = (Integer) data[0];
assert getTransactionId(lastUndoKey) == transactionId; name = (String) data[1];
long logId = getLogId(lastUndoKey) + 1;
registerTransaction(transactionId, status, name, logId, listener);
key = undoLog.ceilingKey(nextTxUndoKey);
} }
long nextTxUndoKey = getOperationId(transactionId + 1, 0);
Long lastUndoKey = undoLog.lowerKey(nextTxUndoKey);
assert lastUndoKey != null;
assert getTransactionId(lastUndoKey) == transactionId;
long logId = getLogId(lastUndoKey) + 1;
registerTransaction(transactionId, status, name, logId, timeoutMillis, 0, listener);
key = undoLog.ceilingKey(nextTxUndoKey);
} }
} }
} finally {
rwLock.writeLock().unlock();
} }
init = true; init = true;
} }
...@@ -270,23 +265,18 @@ public class TransactionStore { ...@@ -270,23 +265,18 @@ public class TransactionStore {
if(!init) { if(!init) {
init(); init();
} }
rwLock.readLock().lock(); ArrayList<Transaction> list = new ArrayList<>();
try { int transactionId = 0;
ArrayList<Transaction> list = new ArrayList<>(); BitSet bitSet = openTransactions.get();
int transactionId = 0; while((transactionId = bitSet.nextSetBit(transactionId + 1)) > 0) {
BitSet bitSet = openTransactions.get(); Transaction transaction = getTransaction(transactionId);
while((transactionId = bitSet.nextSetBit(transactionId + 1)) > 0) { if(transaction != null) {
Transaction transaction = getTransaction(transactionId); if(transaction.getStatus() != Transaction.STATUS_CLOSED) {
if(transaction != null) { list.add(transaction);
if(transaction.getStatus() != Transaction.STATUS_CLOSED) {
list.add(transaction);
}
} }
} }
return list;
} finally {
rwLock.readLock().unlock();
} }
return list;
} }
/** /**
...@@ -302,23 +292,28 @@ public class TransactionStore { ...@@ -302,23 +292,28 @@ public class TransactionStore {
* @return the transaction * @return the transaction
*/ */
public Transaction begin() { public Transaction begin() {
return begin(RollbackListener.NONE); return begin(RollbackListener.NONE, timeoutMillis, 0);
} }
/** /**
* Begin a new transaction. * Begin a new transaction.
* @param listener to be notified in case of a rollback * @param listener to be notified in case of a rollback
* * @param timeoutMillis to wait for a blocking transaction
* @param ownerId of the owner (Session?) to be reported by getBlockerId
* @return the transaction * @return the transaction
*/ */
public Transaction begin(RollbackListener listener) { public Transaction begin(RollbackListener listener, int timeoutMillis, int ownerId) {
Transaction transaction = registerTransaction(0, Transaction.STATUS_OPEN, null, 0, listener);
if(timeoutMillis <= 0) {
timeoutMillis = this.timeoutMillis;
}
Transaction transaction = registerTransaction(0, Transaction.STATUS_OPEN, null, 0,
timeoutMillis, ownerId, listener);
return transaction; return transaction;
} }
private Transaction registerTransaction(int txId, int status, String name, long logId, private Transaction registerTransaction(int txId, int status, String name, long logId,
RollbackListener listener) { int timeoutMillis, int ownerId, RollbackListener listener) {
int transactionId; int transactionId;
long sequenceNo; long sequenceNo;
boolean success; boolean success;
...@@ -343,7 +338,8 @@ public class TransactionStore { ...@@ -343,7 +338,8 @@ public class TransactionStore {
success = openTransactions.compareAndSet(original, clone); success = openTransactions.compareAndSet(original, clone);
} while(!success); } while(!success);
Transaction transaction = new Transaction(this, transactionId, sequenceNo, status, name, logId, listener); Transaction transaction = new Transaction(this, transactionId, sequenceNo, status, name, logId,
timeoutMillis, ownerId, listener);
assert transactions.get(transactionId) == null; assert transactions.get(transactionId) == null;
transactions.set(transactionId, transaction); transactions.set(transactionId, transaction);
...@@ -366,55 +362,41 @@ public class TransactionStore { ...@@ -366,55 +362,41 @@ public class TransactionStore {
} }
/** /**
* Log an entry. * Add an undo log entry.
* *
* @param t the transaction * @param transactionId id of the transaction
* @param logId the log id * @param logId sequential number of the log record within transaction
* @param mapId the map id * @param undoLogRecord Object[mapId, key, previousValue]
* @param key the key */
* @param oldValue the old value long addUndoLogRecord(int transactionId, long logId, Object[] undoLogRecord) {
*/ Long undoKey = getOperationId(transactionId, logId);
long log(Transaction t, long logId, int mapId, if (logId == 0) {
Object key, Object oldValue) { if (undoLog.containsKey(undoKey)) {
Long undoKey = getOperationId(t.getId(), logId); throw DataUtils.newIllegalStateException(
Object[] log = { mapId, key, oldValue }; DataUtils.ERROR_TOO_MANY_OPEN_TRANSACTIONS,
rwLock.writeLock().lock(); "An old transaction with the same id " +
try { "is still open: {0}",
if (logId == 0) { transactionId);
if (undoLog.containsKey(undoKey)) {
throw DataUtils.newIllegalStateException(
DataUtils.ERROR_TOO_MANY_OPEN_TRANSACTIONS,
"An old transaction with the same id " +
"is still open: {0}",
t.getId());
}
} }
undoLog.put(undoKey, log);
} finally {
rwLock.writeLock().unlock();
} }
undoLog.put(undoKey, undoLogRecord);
return undoKey; return undoKey;
} }
/** /**
* Remove a log entry. * Remove a log entry.
* *
* @param t the transaction * @param transactionId id of the transaction
* @param logId the log id * @param logId sequential number of the log record within transaction
*/ */
public void logUndo(Transaction t, long logId) { public void removeUndoLogRecord(int transactionId, long logId) {
Long undoKey = getOperationId(t.getId(), logId); Long undoKey = getOperationId(transactionId, logId);
rwLock.writeLock().lock(); Object[] old = undoLog.remove(undoKey);
try { if (old == null) {
Object[] old = undoLog.remove(undoKey); throw DataUtils.newIllegalStateException(
if (old == null) { DataUtils.ERROR_TRANSACTION_ILLEGAL_STATE,
throw DataUtils.newIllegalStateException( "Transaction {0} was concurrently rolled back",
DataUtils.ERROR_TRANSACTION_ILLEGAL_STATE, transactionId);
"Transaction {0} was concurrently rolled back",
t.getId());
}
} finally {
rwLock.writeLock().unlock();
} }
} }
...@@ -435,11 +417,8 @@ public class TransactionStore { ...@@ -435,11 +417,8 @@ public class TransactionStore {
* *
* @param t the transaction * @param t the transaction
* @param maxLogId the last log id * @param maxLogId the last log id
* @param hasChanges true if there were updates within specified
* transaction (even fully rolled back),
* false if just data access
*/ */
void commit(Transaction t, long maxLogId, boolean hasChanges) { void commit(Transaction t, long maxLogId) {
if (store.isClosed()) { if (store.isClosed()) {
return; return;
} }
...@@ -448,8 +427,7 @@ public class TransactionStore { ...@@ -448,8 +427,7 @@ public class TransactionStore {
// made by this transaction, to be considered as "committed" // made by this transaction, to be considered as "committed"
flipCommittingTransactionsBit(transactionId, true); flipCommittingTransactionsBit(transactionId, true);
// TODO could synchronize on blocks (100 at a time or so) CommitDecisionMaker commitDecisionMaker = new CommitDecisionMaker();
rwLock.writeLock().lock();
try { try {
for (long logId = 0; logId < maxLogId; logId++) { for (long logId = 0; logId < maxLogId; logId++) {
Long undoKey = getOperationId(transactionId, logId); Long undoKey = getOperationId(transactionId, logId);
...@@ -468,26 +446,14 @@ public class TransactionStore { ...@@ -468,26 +446,14 @@ public class TransactionStore {
MVMap<Object, VersionedValue> map = openMap(mapId); MVMap<Object, VersionedValue> map = openMap(mapId);
if (map != null) { // might be null if map was removed later if (map != null) { // might be null if map was removed later
Object key = op[1]; Object key = op[1];
VersionedValue value = map.get(key); commitDecisionMaker.setUndoKey(undoKey);
if (value != null) { map.operate(key, null, commitDecisionMaker);
// only commit (remove/update) value if we've reached
// last undoLog entry for a given key
if (value.operationId == undoKey) {
if (value.value == null) {
map.remove(key);
} else {
map.put(key, new VersionedValue(0L, value.value));
}
}
}
} }
undoLog.remove(undoKey); undoLog.remove(undoKey);
} }
} finally { } finally {
rwLock.writeLock().unlock();
flipCommittingTransactionsBit(transactionId, false); flipCommittingTransactionsBit(transactionId, false);
} }
endTransaction(t, hasChanges);
} }
private void flipCommittingTransactionsBit(int transactionId, boolean flag) { private void flipCommittingTransactionsBit(int transactionId, boolean flag) {
...@@ -584,11 +550,13 @@ public class TransactionStore { ...@@ -584,11 +550,13 @@ public class TransactionStore {
* and amount of unsaved changes is sizable. * and amount of unsaved changes is sizable.
* *
* @param t the transaction * @param t the transaction
* @param hasChanges false for R/O tx * @param hasChanges true if transaction has done any updates
* (even if they are fully rolled back),
* false if it just performed a data access
*/ */
synchronized void endTransaction(Transaction t, boolean hasChanges) { synchronized void endTransaction(Transaction t, boolean hasChanges) {
t.closeIt();
int txId = t.transactionId; int txId = t.transactionId;
t.setStatus(Transaction.STATUS_CLOSED);
assert transactions.get(txId) == t : transactions.get(txId) + " != " + t; assert transactions.get(txId) == t : transactions.get(txId) + " != " + t;
transactions.set(txId, null); transactions.set(txId, null);
...@@ -637,41 +605,12 @@ public class TransactionStore { ...@@ -637,41 +605,12 @@ public class TransactionStore {
* @param toLogId the log id to roll back to * @param toLogId the log id to roll back to
*/ */
void rollbackTo(Transaction t, long maxLogId, long toLogId) { void rollbackTo(Transaction t, long maxLogId, long toLogId) {
// TODO could synchronize on blocks (100 at a time or so) int transactionId = t.getId();
rwLock.writeLock().lock(); RollbackDecisionMaker decisionMaker = new RollbackDecisionMaker(this, transactionId, toLogId, t.listener);
try { for (long logId = maxLogId - 1; logId >= toLogId; logId--) {
for (long logId = maxLogId - 1; logId >= toLogId; logId--) { Long undoKey = getOperationId(transactionId, logId);
Long undoKey = getOperationId(t.getId(), logId); undoLog.operate(undoKey, null, decisionMaker);
Object[] op = undoLog.get(undoKey); decisionMaker.reset();
if (op == null) {
// partially rolled back: load previous
undoKey = undoLog.floorKey(undoKey);
if (undoKey == null ||
getTransactionId(undoKey) != t.getId()) {
break;
}
logId = getLogId(undoKey) + 1;
continue;
}
int mapId = ((Integer) op[0]).intValue();
MVMap<Object, VersionedValue> map = openMap(mapId);
if (map != null) {
Object key = op[1];
VersionedValue oldValue = (VersionedValue) op[2];
VersionedValue currentValue;
if (oldValue == null) {
// this transaction added the value
currentValue = map.remove(key);
} else {
// this transaction updated the value
currentValue = map.put(key, oldValue);
}
t.listener.onRollback(map, key, currentValue, oldValue);
}
undoLog.remove(undoKey);
}
} finally {
rwLock.writeLock().unlock();
} }
} }
...@@ -692,33 +631,28 @@ public class TransactionStore { ...@@ -692,33 +631,28 @@ public class TransactionStore {
private Change current; private Change current;
private void fetchNext() { private void fetchNext() {
rwLock.writeLock().lock(); int transactionId = t.getId();
try { while (logId >= toLogId) {
int transactionId = t.getId(); Long undoKey = getOperationId(transactionId, logId);
while (logId >= toLogId) { Object[] op = undoLog.get(undoKey);
Long undoKey = getOperationId(transactionId, logId); logId--;
Object[] op = undoLog.get(undoKey); if (op == null) {
logId--; // partially rolled back: load previous
if (op == null) { undoKey = undoLog.floorKey(undoKey);
// partially rolled back: load previous if (undoKey == null ||
undoKey = undoLog.floorKey(undoKey); getTransactionId(undoKey) != transactionId) {
if (undoKey == null || break;
getTransactionId(undoKey) != transactionId) {
break;
}
logId = getLogId(undoKey);
continue;
}
int mapId = ((Integer) op[0]).intValue();
MVMap<Object, VersionedValue> m = openMap(mapId);
if (m != null) { // could be null if map was removed later on
VersionedValue oldValue = (VersionedValue) op[2];
current = new Change(m.getName(), op[1], oldValue == null ? null : oldValue.value);
return;
} }
logId = getLogId(undoKey);
continue;
}
int mapId = (int)op[0];
MVMap<Object, VersionedValue> m = openMap(mapId);
if (m != null) { // could be null if map was removed later on
VersionedValue oldValue = (VersionedValue) op[2];
current = new Change(m.getName(), op[1], oldValue == null ? null : oldValue.value);
return;
} }
} finally {
rwLock.writeLock().unlock();
} }
current = null; current = null;
} }
...@@ -894,5 +828,4 @@ public class TransactionStore { ...@@ -894,5 +828,4 @@ public class TransactionStore {
} }
} }
} }
/*
* Copyright 2004-2018 H2 Group. Multiple-Licensed under the MPL 2.0,
* and the EPL 1.0 (http://h2database.com/html/license.html).
* Initial Developer: H2 Group
*/
package org.h2.mvstore.tx;
import org.h2.mvstore.MVMap;
/**
* Class TxDecisionMaker.
*
* @author <a href='mailto:andrei.tokar@gmail.com'>Andrei Tokar</a>
*/
public abstract class TxDecisionMaker extends MVMap.DecisionMaker<VersionedValue> {
private final int mapId;
private final Object key;
final Object value;
private final Transaction transaction;
long undoKey;
private Transaction blockingTransaction;
private MVMap.Decision decision;
TxDecisionMaker(int mapId, Object key, Object value, Transaction transaction) {
this.mapId = mapId;
this.key = key;
this.value = value;
this.transaction = transaction;
}
@Override
public MVMap.Decision decide(VersionedValue existingValue, VersionedValue providedValue) {
assert decision == null;
long id;
int blockingId;
// if map does not have that entry yet
if (existingValue == null ||
// or entry is a committed one
(id = existingValue.getOperationId()) == 0 ||
// or it came from the same transaction
isThisTransaction(blockingId = TransactionStore.getTransactionId(id))) {
logIt(existingValue);
decision = MVMap.Decision.PUT;
} else if (isCommitted(blockingId)) {
// Condition above means that entry belongs to a committing transaction.
// We assume that we are looking at the final value for this transaction,
// and if it's not the case, then it will fail later,
// because a tree root has definitely been changed.
logIt(existingValue.value == null ? null : new VersionedValue(0L, existingValue.value));
decision = MVMap.Decision.PUT;
} else if(fetchTransaction(blockingId) == null) {
// condition above means transaction has been committed/rplled back and closed by now
decision = MVMap.Decision.REPEAT;
} else {
// this entry comes from a different transaction, and this transaction is not committed yet
// should wait on blockingTransaction that was determined earlier
decision = MVMap.Decision.ABORT;
}
return decision;
}
@Override
public final void reset() {
if (decision != null && decision != MVMap.Decision.ABORT && decision != MVMap.Decision.REPEAT) {
// positive decision has been made already and undo record created,
// but map was updated afterwards and undo record deletion required
transaction.logUndo();
}
blockingTransaction = null;
decision = null;
}
public final MVMap.Decision getDecision() {
return decision;
}
final Transaction getBlockingTransaction() {
return blockingTransaction;
}
final void logIt(VersionedValue value) {
undoKey = transaction.log(mapId, key, value);
}
final boolean isThisTransaction(int transactionId) {
return transactionId == transaction.transactionId;
}
final boolean isCommitted(int transactionId) {
return transaction.store.committingTransactions.get().get(transactionId);
}
final Transaction fetchTransaction(int transactionId) {
return (blockingTransaction = transaction.store.getTransaction(transactionId));
}
final MVMap.Decision setDecision(MVMap.Decision d) {
return decision = d;
}
@Override
public final String toString() {
return "txdm " + transaction.transactionId;
}
public static class PutDecisionMaker extends TxDecisionMaker
{
PutDecisionMaker(int mapId, Object key, Object value, Transaction transaction) {
super(mapId, key, value, transaction);
}
@SuppressWarnings("unchecked")
@Override
public final VersionedValue selectValue(VersionedValue existingValue, VersionedValue providedValue) {
return new VersionedValue(undoKey, value);
}
}
public static final class PutIfAbsentDecisionMaker extends PutDecisionMaker
{
PutIfAbsentDecisionMaker(int mapId, Object key, Object value, Transaction transaction) {
super(mapId, key, value, transaction);
}
@Override
public MVMap.Decision decide(VersionedValue existingValue, VersionedValue providedValue) {
assert getDecision() == null;
int blockingId;
// if map does not have that entry yet
if (existingValue == null) {
logIt(null);
return setDecision(MVMap.Decision.PUT);
} else {
long id = existingValue.getOperationId();
if (id == 0 // entry is a committed one
// or it came from the same transaction
|| isThisTransaction(blockingId = TransactionStore.getTransactionId(id))) {
if(existingValue.value != null) {
return setDecision(MVMap.Decision.ABORT);
}
logIt(existingValue);
return setDecision(MVMap.Decision.PUT);
} else if (isCommitted(blockingId) && existingValue.value == null) {
// entry belongs to a committing transaction
// and therefore will be committed soon
logIt(null);
return setDecision(MVMap.Decision.PUT);
} else if(fetchTransaction(blockingId) == null) {
// map already has specified key from uncommitted
// at the time transaction, which is closed by now
// we can retry right away
return setDecision(MVMap.Decision.REPEAT);
} else {
// map already has specified key from uncommitted transaction
// we need to wait for it to close and then try again
return setDecision(MVMap.Decision.ABORT);
}
}
}
}
public static final class LockDecisionMaker extends TxDecisionMaker
{
LockDecisionMaker(int mapId, Object key, Transaction transaction) {
super(mapId, key, null, transaction);
}
@SuppressWarnings("unchecked")
@Override
public VersionedValue selectValue(VersionedValue existingValue, VersionedValue providedValue) {
return new VersionedValue(undoKey, existingValue == null ? null : existingValue.value);
}
}
}
...@@ -16,6 +16,8 @@ import java.nio.ByteBuffer; ...@@ -16,6 +16,8 @@ import java.nio.ByteBuffer;
*/ */
public class VersionedValue { public class VersionedValue {
public static final VersionedValue DUMMY = new VersionedValue(0L, new Object());
/** /**
* The operation id. * The operation id.
*/ */
......
...@@ -506,7 +506,9 @@ public class MetaTable extends Table { ...@@ -506,7 +506,9 @@ public class MetaTable extends Table {
"SESSION_START", "SESSION_START",
"STATEMENT", "STATEMENT",
"STATEMENT_START", "STATEMENT_START",
"CONTAINS_UNCOMMITTED" "CONTAINS_UNCOMMITTED",
"STATE",
"BLOCKER_ID INT"
); );
break; break;
} }
...@@ -1078,6 +1080,8 @@ public class MetaTable extends Table { ...@@ -1078,6 +1080,8 @@ public class MetaTable extends Table {
Long.toString(fs.getWriteCount())); Long.toString(fs.getWriteCount()));
add(rows, "info.FILE_READ", add(rows, "info.FILE_READ",
Long.toString(fs.getReadCount())); Long.toString(fs.getReadCount()));
int updateFailureRatio = (int)(10000 * mvStore.getStore().getUpdateFailureRatio());
add(rows, "info.UPDATE_FAILURE_PERCENT", "" + updateFailureRatio / 100 + "." + updateFailureRatio % 100 + "%");
long size; long size;
try { try {
size = fs.getFile().size(); size = fs.getFile().size();
...@@ -1822,6 +1826,7 @@ public class MetaTable extends Table { ...@@ -1822,6 +1826,7 @@ public class MetaTable extends Table {
if (start == 0) { if (start == 0) {
start = now; start = now;
} }
int blockingSessionId = s.getBlockingSessionId();
add(rows, add(rows,
// ID // ID
Integer.toString(s.getId()), Integer.toString(s.getId()),
...@@ -1834,7 +1839,11 @@ public class MetaTable extends Table { ...@@ -1834,7 +1839,11 @@ public class MetaTable extends Table {
// STATEMENT_START // STATEMENT_START
new Timestamp(start).toString(), new Timestamp(start).toString(),
// CONTAINS_UNCOMMITTED // CONTAINS_UNCOMMITTED
Boolean.toString(s.containsUncommitted()) Boolean.toString(s.containsUncommitted()),
// STATE
String.valueOf(s.getState()),
// BLOCKER_ID INT
blockingSessionId == 0 ? null : String.valueOf(blockingSessionId)
); );
} }
} }
......
...@@ -177,6 +177,22 @@ public abstract class Table extends SchemaObjectBase { ...@@ -177,6 +177,22 @@ public abstract class Table extends SchemaObjectBase {
*/ */
public abstract void removeRow(Session session, Row row); public abstract void removeRow(Session session, Row row);
/**
* Locks rows, preventing any updated to them, except from the session specified.
*
* @param session the session
* @param rowsForUpdate rows to lock
*/
public void lockRows(Session session, Iterable<Row> rowsForUpdate) {
for (Row row : rowsForUpdate) {
Row newRow = row.getCopy();
removeRow(session, row);
session.log(this, UndoLogRecord.DELETE, row);
addRow(session, newRow);
session.log(this, UndoLogRecord.INSERT, newRow);
}
}
/** /**
* Remove all rows from the table and indexes. * Remove all rows from the table and indexes.
* *
...@@ -493,7 +509,7 @@ public abstract class Table extends SchemaObjectBase { ...@@ -493,7 +509,7 @@ public abstract class Table extends SchemaObjectBase {
try { try {
removeRow(session, o); removeRow(session, o);
} catch (DbException e) { } catch (DbException e) {
if (e.getErrorCode() == ErrorCode.CONCURRENT_UPDATE_1) { if (e.getErrorCode() == ErrorCode.CONCURRENT_UPDATE_1 || e.getErrorCode() == ErrorCode.ROW_NOT_FOUND_WHEN_DELETING_1) {
session.rollbackTo(rollback, false); session.rollbackTo(rollback, false);
session.startStatementWithinTransaction(); session.startStatementWithinTransaction();
rollback = session.setSavepoint(); rollback = session.setSavepoint();
......
...@@ -15,7 +15,6 @@ import org.h2.command.dml.Select; ...@@ -15,7 +15,6 @@ import org.h2.command.dml.Select;
import org.h2.engine.Right; import org.h2.engine.Right;
import org.h2.engine.Session; import org.h2.engine.Session;
import org.h2.engine.SysProperties; import org.h2.engine.SysProperties;
import org.h2.engine.UndoLogRecord;
import org.h2.expression.Comparison; import org.h2.expression.Comparison;
import org.h2.expression.ConditionAndOr; import org.h2.expression.ConditionAndOr;
import org.h2.expression.Expression; import org.h2.expression.Expression;
...@@ -1158,14 +1157,8 @@ public class TableFilter implements ColumnResolver { ...@@ -1158,14 +1157,8 @@ public class TableFilter implements ColumnResolver {
* *
* @param forUpdateRows the rows to lock * @param forUpdateRows the rows to lock
*/ */
public void lockRows(ArrayList<Row> forUpdateRows) { public void lockRows(Iterable<Row> forUpdateRows) {
for (Row row : forUpdateRows) { table.lockRows(session, forUpdateRows);
Row newRow = row.getCopy();
table.removeRow(session, row);
session.log(table, UndoLogRecord.DELETE, row);
table.addRow(session, newRow);
session.log(table, UndoLogRecord.INSERT, newRow);
}
} }
public TableFilter getNestedJoin() { public TableFilter getNestedJoin() {
......
...@@ -151,7 +151,7 @@ public abstract class TestBase { ...@@ -151,7 +151,7 @@ public abstract class TestBase {
} }
} catch (Throwable e) { } catch (Throwable e) {
println("FAIL " + e.toString()); println("FAIL " + e.toString());
logError("FAIL " + e.toString(), e); logError("FAIL ("+conf+") " + e.toString(), e);
if (config.stopOnError) { if (config.stopOnError) {
throw new AssertionError("ERROR"); throw new AssertionError("ERROR");
} }
......
...@@ -51,7 +51,13 @@ public class TestIndex extends TestBase { ...@@ -51,7 +51,13 @@ public class TestIndex extends TestBase {
testHashIndexOnMemoryTable(); testHashIndexOnMemoryTable();
testErrorMessage(); testErrorMessage();
testDuplicateKeyException(); testDuplicateKeyException();
testConcurrentUpdate(); int to = config.lockTimeout;
config.lockTimeout = 50000;
try {
testConcurrentUpdate();
} finally {
config.lockTimeout = to;
}
testNonUniqueHashIndex(); testNonUniqueHashIndex();
testRenamePrimaryKey(); testRenamePrimaryKey();
testRandomized(); testRandomized();
......
...@@ -296,7 +296,7 @@ public class TestOptimizations extends TestBase { ...@@ -296,7 +296,7 @@ public class TestOptimizations extends TestBase {
assertEquals(11, rs.getInt(1)); assertEquals(11, rs.getInt(1));
assertEquals("World", rs.getString(2)); assertEquals("World", rs.getString(2));
rs.next(); rs.next();
assertEquals(21, rs.getInt(1)); assertEquals(20, rs.getInt(1));
assertEquals("Hallo", rs.getString(2)); assertEquals("Hallo", rs.getString(2));
assertFalse(rs.next()); assertFalse(rs.next());
stat.execute("drop table test"); stat.execute("drop table test");
......
...@@ -96,7 +96,11 @@ public class TestTriggersConstraints extends TestBase implements Trigger { ...@@ -96,7 +96,11 @@ public class TestTriggersConstraints extends TestBase implements Trigger {
stat.execute("update test2 set id = 3"); stat.execute("update test2 set id = 3");
task.get(); task.get();
} catch (SQLException e) { } catch (SQLException e) {
assertEquals(ErrorCode.LOCK_TIMEOUT_1, e.getErrorCode()); int errorCode = e.getErrorCode();
assertTrue(String.valueOf(errorCode),
ErrorCode.LOCK_TIMEOUT_1 == errorCode ||
ErrorCode.DEADLOCK_1 == errorCode ||
ErrorCode.COMMIT_ROLLBACK_NOT_ALLOWED == errorCode);
} }
conn2.rollback(); conn2.rollback();
conn.rollback(); conn.rollback();
......
...@@ -11,7 +11,6 @@ import java.sql.ResultSet; ...@@ -11,7 +11,6 @@ import java.sql.ResultSet;
import java.sql.SQLException; import java.sql.SQLException;
import java.sql.Statement; import java.sql.Statement;
import java.sql.Timestamp; import java.sql.Timestamp;
import java.util.Map;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import org.h2.test.TestBase; import org.h2.test.TestBase;
...@@ -42,7 +41,8 @@ public class TestMvcc4 extends TestBase { ...@@ -42,7 +41,8 @@ public class TestMvcc4 extends TestBase {
} }
private void testSelectForUpdateAndUpdateConcurrency() throws SQLException { private void testSelectForUpdateAndUpdateConcurrency() throws SQLException {
Connection setup = getConnection("mvcc4"); deleteDb("mvcc4");
Connection setup = getConnection("mvcc4;MULTI_THREADED=TRUE");
setup.setAutoCommit(false); setup.setAutoCommit(false);
{ {
...@@ -82,7 +82,13 @@ public class TestMvcc4 extends TestBase { ...@@ -82,7 +82,13 @@ public class TestMvcc4 extends TestBase {
ps.executeQuery().next(); ps.executeQuery().next();
executedUpdate.countDown(); executedUpdate.countDown();
waitForThreadToBlockOnDB(mainThread); // interrogate new "blocker_id" metatable field instead of
// relying on stacktraces!? to determine when session is blocking
PreparedStatement stmt = c2.prepareStatement("SELECT * FROM INFORMATION_SCHEMA.SESSIONS WHERE BLOCKER_ID = SESSION_ID()");
ResultSet resultSet;
do {
resultSet = stmt.executeQuery();
} while(!resultSet.next());
c2.commit(); c2.commit();
c2.close(); c2.close();
...@@ -103,7 +109,7 @@ public class TestMvcc4 extends TestBase { ...@@ -103,7 +109,7 @@ public class TestMvcc4 extends TestBase {
// for lock case. // for lock case.
PreparedStatement ps = c1.prepareStatement("UPDATE test SET lastUpdated = ?"); PreparedStatement ps = c1.prepareStatement("UPDATE test SET lastUpdated = ?");
ps.setTimestamp(1, new Timestamp(System.currentTimeMillis())); ps.setTimestamp(1, new Timestamp(System.currentTimeMillis()));
ps.executeUpdate(); assertEquals(2, ps.executeUpdate());
c1.commit(); c1.commit();
c1.close(); c1.close();
...@@ -114,44 +120,12 @@ public class TestMvcc4 extends TestBase { ...@@ -114,44 +120,12 @@ public class TestMvcc4 extends TestBase {
ps = verify.prepareStatement("SELECT COUNT(*) FROM test"); ps = verify.prepareStatement("SELECT COUNT(*) FROM test");
ResultSet rs = ps.executeQuery(); ResultSet rs = ps.executeQuery();
assertTrue(rs.next()); assertTrue(rs.next());
assertTrue(rs.getInt(1) == 2); assertEquals(2,rs.getInt(1));
verify.commit(); verify.commit();
verify.close(); verify.close();
setup.close(); setup.close();
} }
/**
* Wait for the given thread to block on synchronizing on the database
* object.
*
* @param t the thread
*/
void waitForThreadToBlockOnDB(Thread t) {
while (true) {
// sleep the first time through the loop so we give the main thread
// a chance
try {
Thread.sleep(20);
} catch (InterruptedException e1) {
// ignore
}
// TODO must not use getAllStackTraces, as the method names are
// implementation details
Map<Thread, StackTraceElement[]> threadMap = Thread.getAllStackTraces();
StackTraceElement[] elements = threadMap.get(t);
if (elements != null
&&
elements.length > 1 &&
(config.multiThreaded ? "sleep".equals(elements[0]
.getMethodName()) : "wait".equals(elements[0]
.getMethodName())) &&
"filterConcurrentUpdate"
.equals(elements[1].getMethodName())) {
return;
}
}
}
} }
......
...@@ -7,10 +7,9 @@ package org.h2.test.mvcc; ...@@ -7,10 +7,9 @@ package org.h2.test.mvcc;
import java.sql.Connection; import java.sql.Connection;
import java.sql.ResultSet; import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement; import java.sql.Statement;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CyclicBarrier;
import org.h2.api.ErrorCode; import org.h2.api.ErrorCode;
import org.h2.test.TestBase; import org.h2.test.TestBase;
import org.h2.util.Task; import org.h2.util.Task;
...@@ -36,11 +35,8 @@ public class TestMvccMultiThreaded extends TestBase { ...@@ -36,11 +35,8 @@ public class TestMvccMultiThreaded extends TestBase {
} }
testConcurrentSelectForUpdate(); testConcurrentSelectForUpdate();
testMergeWithUniqueKeyViolation(); testMergeWithUniqueKeyViolation();
// not supported currently testConcurrentMerge();
if (!config.multiThreaded) { testConcurrentUpdate();
testConcurrentMerge();
testConcurrentUpdate();
}
} }
private void testConcurrentSelectForUpdate() throws Exception { private void testConcurrentSelectForUpdate() throws Exception {
...@@ -55,21 +51,11 @@ public class TestMvccMultiThreaded extends TestBase { ...@@ -55,21 +51,11 @@ public class TestMvccMultiThreaded extends TestBase {
Task task = new Task() { Task task = new Task() {
@Override @Override
public void call() throws Exception { public void call() throws Exception {
Connection conn = getConnection(getTestName()); try (Connection conn = getConnection(getTestName())) {
Statement stat = conn.createStatement(); Statement stat = conn.createStatement();
try {
while (!stop) { while (!stop) {
try { stat.execute("select * from test where id=1 for update");
stat.execute("select * from test where id=1 for update");
} catch (SQLException e) {
int errorCode = e.getErrorCode();
assertTrue(e.getMessage(),
errorCode == ErrorCode.DEADLOCK_1 ||
errorCode == ErrorCode.LOCK_TIMEOUT_1);
}
} }
} finally {
conn.close();
} }
} }
}.execute(); }.execute();
...@@ -113,7 +99,6 @@ public class TestMvccMultiThreaded extends TestBase { ...@@ -113,7 +99,6 @@ public class TestMvccMultiThreaded extends TestBase {
conn.createStatement().execute( conn.createStatement().execute(
"create table test(id int primary key, name varchar)"); "create table test(id int primary key, name varchar)");
Task[] tasks = new Task[len]; Task[] tasks = new Task[len];
final boolean[] stop = { false };
for (int i = 0; i < len; i++) { for (int i = 0; i < len; i++) {
final Connection c = connList[i]; final Connection c = connList[i];
c.setAutoCommit(false); c.setAutoCommit(false);
...@@ -124,14 +109,12 @@ public class TestMvccMultiThreaded extends TestBase { ...@@ -124,14 +109,12 @@ public class TestMvccMultiThreaded extends TestBase {
c.createStatement().execute( c.createStatement().execute(
"merge into test values(1, 'x')"); "merge into test values(1, 'x')");
c.commit(); c.commit();
Thread.sleep(1);
} }
} }
}; };
tasks[i].execute(); tasks[i].execute();
} }
Thread.sleep(1000); Thread.sleep(1000);
stop[0] = true;
for (int i = 0; i < len; i++) { for (int i = 0; i < len; i++) {
tasks[i].get(); tasks[i].get();
} }
...@@ -157,18 +140,24 @@ public class TestMvccMultiThreaded extends TestBase { ...@@ -157,18 +140,24 @@ public class TestMvccMultiThreaded extends TestBase {
final int count = 1000; final int count = 1000;
Task[] tasks = new Task[len]; Task[] tasks = new Task[len];
final CountDownLatch latch = new CountDownLatch(len); final CyclicBarrier barrier = new CyclicBarrier(len);
for (int i = 0; i < len; i++) { for (int i = 0; i < len; i++) {
final int x = i; final int x = i;
// Recent changes exposed a race condition in this test itself.
// Without preliminary record locking, counter will be off.
connList[x].setAutoCommit(false);
tasks[i] = new Task() { tasks[i] = new Task() {
@Override @Override
public void call() throws Exception { public void call() throws Exception {
for (int a = 0; a < count; a++) { for (int a = 0; a < count; a++) {
ResultSet rs = connList[x].createStatement().executeQuery(
"select value from test for update");
assertTrue(rs.next());
connList[x].createStatement().execute( connList[x].createStatement().execute(
"update test set value=value+1"); "update test set value=value+1");
latch.countDown(); connList[x].commit();
latch.await(); barrier.await();
} }
} }
}; };
......
...@@ -74,8 +74,8 @@ public class TestAutoReconnect extends TestBase { ...@@ -74,8 +74,8 @@ public class TestAutoReconnect extends TestBase {
"/" + getTestName() + ";OPEN_NEW=TRUE"); "/" + getTestName() + ";OPEN_NEW=TRUE");
conn.close(); conn.close();
conn = getConnection("jdbc:h2:tcp://localhost/" + getBaseDir() + conn = getConnection("jdbc:h2:tcp://localhost:" + tcp.getPort() +
"/" + getTestName()); "/" + getBaseDir() + "/" + getTestName());
assertThrows(ErrorCode.DATABASE_ALREADY_OPEN_1, this). assertThrows(ErrorCode.DATABASE_ALREADY_OPEN_1, this).
getConnection("jdbc:h2:" + getBaseDir() + getConnection("jdbc:h2:" + getBaseDir() +
"/" + getTestName() + ";AUTO_SERVER=TRUE;OPEN_NEW=TRUE"); "/" + getTestName() + ";AUTO_SERVER=TRUE;OPEN_NEW=TRUE");
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论