Unverified 提交 68f46168 authored 作者: Andrei Tokar's avatar Andrei Tokar 提交者: GitHub

Merge pull request #1496 from h2database/issue_1479

Fix for Issue 1479
......@@ -97,9 +97,14 @@ public class Delete extends Prepared {
if (table.fireRow()) {
done = table.fireBeforeRow(session, row, null);
}
if (!done) {
if (table.isMVStore()) {
done = table.lockRow(session, row) == null;
}
if (!done) {
rows.add(row);
}
}
count++;
if (limitRows >= 0 && count >= limitRows) {
break;
......
......@@ -344,6 +344,10 @@ public abstract class Query extends Prepared {
if (!cacheableChecked) {
long max = getMaxDataModificationId();
noCache = max == Long.MAX_VALUE;
if (!isEverything(ExpressionVisitor.DETERMINISTIC_VISITOR) ||
!isEverything(ExpressionVisitor.INDEPENDENT_VISITOR)) {
noCache = true;
}
cacheableChecked = true;
}
if (noCache) {
......@@ -356,18 +360,10 @@ public abstract class Query extends Prepared {
return false;
}
}
if (!isEverything(ExpressionVisitor.DETERMINISTIC_VISITOR) ||
!isEverything(ExpressionVisitor.INDEPENDENT_VISITOR)) {
return false;
}
if (db.getModificationDataId() > lastEval &&
getMaxDataModificationId() > lastEval) {
return false;
}
return true;
return getMaxDataModificationId() <= lastEval;
}
public final Value[] getParameterValues() {
private Value[] getParameterValues() {
ArrayList<Parameter> list = getParameters();
if (list == null) {
return new Value[0];
......
......@@ -176,6 +176,10 @@ public class Update extends Prepared {
if (table.fireRow()) {
done = table.fireBeforeRow(session, oldRow, newRow);
}
if (!done) {
if (table.isMVStore()) {
done = table.lockRow(session, oldRow) == null;
}
if (!done) {
rows.add(oldRow);
rows.add(newRow);
......@@ -183,6 +187,7 @@ public class Update extends Prepared {
updatedKeysCollector.add(key);
}
}
}
count++;
}
}
......
......@@ -1539,11 +1539,15 @@ public class Database implements DataHandler {
compactMode == CommandInterface.SHUTDOWN_DEFRAG ||
getSettings().defragAlways;
if (!compactFully && !mvStore.isReadOnly()) {
if (dbSettings.maxCompactTime > 0) {
try {
store.compactFile(dbSettings.maxCompactTime);
} catch (Throwable t) {
trace.error(t, "compactFile");
}
} else {
mvStore.commit();
}
}
store.close(compactFully);
}
......
......@@ -209,17 +209,28 @@ public class MVPrimaryIndex extends BaseIndex {
}
}
public void lockRows(Session session, Iterable<Row> rowsForUpdate) {
void lockRows(Session session, Iterable<Row> rowsForUpdate) {
TransactionMap<Value, Value> map = getMap(session);
for (Row row : rowsForUpdate) {
long key = row.getKey();
lockRow(map, key);
}
}
Row lockRow(Session session, Row row) {
TransactionMap<Value, Value> map = getMap(session);
long key = row.getKey();
ValueArray array = (ValueArray) lockRow(map, key);
return array == null ? null : getRow(session, key, array);
}
private Value lockRow(TransactionMap<Value, Value> map, long key) {
try {
map.lock(ValueLong.get(key));
return map.lock(ValueLong.get(key));
} catch (IllegalStateException ex) {
throw mvTable.convertException(ex);
}
}
}
@Override
public Cursor find(Session session, SearchRow first, SearchRow last) {
......@@ -259,7 +270,10 @@ public class MVPrimaryIndex extends BaseIndex {
throw DbException.get(ErrorCode.ROW_NOT_FOUND_IN_PRIMARY_INDEX,
getSQL(), String.valueOf(key));
}
ValueArray array = (ValueArray) v;
return getRow(session, key, (ValueArray) v);
}
public Row getRow(Session session, long key, ValueArray array) {
Row row = session.createRow(array.getList(), 0);
row.setKey(key);
return row;
......
......@@ -14,6 +14,7 @@ import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.h2.api.DatabaseEventListener;
import org.h2.api.ErrorCode;
......@@ -105,7 +106,7 @@ public class MVTable extends TableBase {
private MVPrimaryIndex primaryIndex;
private final ArrayList<Index> indexes = Utils.newSmallArrayList();
private volatile long lastModificationId;
private final AtomicLong lastModificationId = new AtomicLong();
private volatile Session lockExclusiveSession;
// using a ConcurrentHashMap as a set
......@@ -661,7 +662,7 @@ public class MVTable extends TableBase {
@Override
public void removeRow(Session session, Row row) {
lastModificationId = database.getNextModificationDataId();
syncLastModificationIdWithDatabase();
Transaction t = session.getTransaction();
long savepoint = t.setSavepoint();
try {
......@@ -682,7 +683,7 @@ public class MVTable extends TableBase {
@Override
public void truncate(Session session) {
lastModificationId = database.getNextModificationDataId();
syncLastModificationIdWithDatabase();
for (int i = indexes.size() - 1; i >= 0; i--) {
Index index = indexes.get(i);
index.truncate(session);
......@@ -694,7 +695,7 @@ public class MVTable extends TableBase {
@Override
public void addRow(Session session, Row row) {
lastModificationId = database.getNextModificationDataId();
syncLastModificationIdWithDatabase();
Transaction t = session.getTransaction();
long savepoint = t.setSavepoint();
try {
......@@ -715,7 +716,7 @@ public class MVTable extends TableBase {
@Override
public void updateRow(Session session, Row oldRow, Row newRow) {
newRow.setKey(oldRow.getKey());
lastModificationId = database.getNextModificationDataId();
syncLastModificationIdWithDatabase();
Transaction t = session.getTransaction();
long savepoint = t.setSavepoint();
try {
......@@ -738,6 +739,11 @@ public class MVTable extends TableBase {
primaryIndex.lockRows(session, rowsForUpdate);
}
@Override
public Row lockRow(Session session, Row row) {
return primaryIndex.lockRow(session, row);
}
private void analyzeIfRequired(Session session) {
if (changesUntilAnalyze != null) {
if (changesUntilAnalyze.decrementAndGet() == 0) {
......@@ -777,7 +783,7 @@ public class MVTable extends TableBase {
@Override
public long getMaxDataModificationId() {
return lastModificationId;
return lastModificationId.get();
}
public boolean getContainsLargeObject() {
......@@ -890,8 +896,23 @@ public class MVTable extends TableBase {
*/
public void commit() {
if (database != null) {
lastModificationId = database.getNextModificationDataId();
}
syncLastModificationIdWithDatabase();
}
}
// Field lastModificationId can not be just a volatile, because window of opportunity
// between reading database's modification id and storing this value in the field
// could be exploited by another thread.
// Second thread may do the same with possibly bigger (already advanced) modification id,
// and when first thread finally updates the field, it will result in lastModificationId jumping back.
// This is, of course, unacceptable.
private void syncLastModificationIdWithDatabase() {
long nextModificationDataId = database.getNextModificationDataId();
long currentId;
do {
currentId = lastModificationId.get();
} while (nextModificationDataId > currentId &&
!lastModificationId.compareAndSet(currentId, nextModificationDataId));
}
/**
......
......@@ -175,7 +175,7 @@ public class Transaction {
* @param status to be set
* @return transaction state as it was before status change
*/
long setStatus(int status) {
private long setStatus(int status) {
while (true) {
long currentState = statusAndLogId.get();
long logId = getLogId(currentState);
......@@ -486,11 +486,13 @@ public class Transaction {
"Transaction %d attempts to update map <%s> entry with key <%s>"
+ " modified by transaction %s%n",
transactionId, blockingMap.getName(), blockingKey, toWaitFor));
if (isDeadlocked(toWaitFor)) {
throw DataUtils.newIllegalStateException(DataUtils.ERROR_TRANSACTIONS_DEADLOCK,
details.toString());
}
}
}
}
blockingTransaction = toWaitFor;
try {
......@@ -515,8 +517,10 @@ public class Transaction {
private synchronized boolean waitForThisToEnd(int millis) {
long until = System.currentTimeMillis() + millis;
long state;
int status;
while((status = getStatus()) != STATUS_CLOSED && status != STATUS_ROLLING_BACK) {
while((status = getStatus(state = statusAndLogId.get())) != STATUS_CLOSED
&& status != STATUS_ROLLED_BACK && !hasRollback(state)) {
long dur = until - System.currentTimeMillis();
if(dur <= 0) {
return false;
......
......@@ -49,7 +49,7 @@ abstract class TxDecisionMaker extends MVMap.DecisionMaker<VersionedValue> {
// because a tree root has definitely been changed.
logIt(existingValue.value == null ? null : VersionedValue.getInstance(existingValue.value));
decision = MVMap.Decision.PUT;
} else if (fetchTransaction(blockingId) != null) {
} else if (getBlockingTransaction() != null) {
// this entry comes from a different transaction, and this
// transaction is not committed yet
// should wait on blockingTransaction that was determined earlier
......@@ -106,11 +106,17 @@ abstract class TxDecisionMaker extends MVMap.DecisionMaker<VersionedValue> {
}
final boolean isCommitted(int transactionId) {
return transaction.store.committingTransactions.get().get(transactionId);
}
Transaction blockingTx;
boolean result;
do {
blockingTx = transaction.store.getTransaction(transactionId);
result = transaction.store.committingTransactions.get().get(transactionId);
} while (blockingTx != transaction.store.getTransaction(transactionId));
final Transaction fetchTransaction(int transactionId) {
return (blockingTransaction = transaction.store.getTransaction(transactionId));
if (!result) {
blockingTransaction = blockingTx;
}
return result;
}
final MVMap.Decision setDecision(MVMap.Decision d) {
......@@ -167,7 +173,7 @@ abstract class TxDecisionMaker extends MVMap.DecisionMaker<VersionedValue> {
// and therefore will be committed soon
logIt(null);
return setDecision(MVMap.Decision.PUT);
} else if (fetchTransaction(blockingId) != null) {
} else if (getBlockingTransaction() != null) {
// this entry comes from a different transaction, and this
// transaction is not committed yet
// should wait on blockingTransaction that was determined
......@@ -206,11 +212,22 @@ abstract class TxDecisionMaker extends MVMap.DecisionMaker<VersionedValue> {
super(mapId, key, null, transaction);
}
@Override
public MVMap.Decision decide(VersionedValue existingValue, VersionedValue providedValue) {
MVMap.Decision decision = super.decide(existingValue, providedValue);
if (existingValue == null) {
assert decision == MVMap.Decision.PUT;
decision = setDecision(MVMap.Decision.REMOVE);
}
return decision;
}
@SuppressWarnings("unchecked")
@Override
public VersionedValue selectValue(VersionedValue existingValue, VersionedValue providedValue) {
assert existingValue != null; // otherwise, what's there to lock?
return VersionedValue.getInstance(undoKey, existingValue.value, existingValue.getCommittedValue());
return VersionedValue.getInstance(undoKey,
existingValue == null ? null : existingValue.value,
existingValue == null ? null : existingValue.getCommittedValue());
}
}
}
......@@ -184,12 +184,24 @@ public abstract class Table extends SchemaObjectBase {
*/
public void lockRows(Session session, Iterable<Row> rowsForUpdate) {
for (Row row : rowsForUpdate) {
lockRow(session, row);
}
}
/**
* Locks row, preventing any updated to it, except from the session specified.
*
* @param session the session
* @param row to lock
* @return locked row, or null if row does not exist anymore
*/
public Row lockRow(Session session, Row row) {
Row newRow = row.getCopy();
removeRow(session, row);
session.log(this, UndoLogRecord.DELETE, row);
addRow(session, newRow);
session.log(this, UndoLogRecord.INSERT, newRow);
}
return row;
}
/**
......
......@@ -97,7 +97,7 @@ public abstract class TestDb extends TestBase {
}
if (config.mvStore) {
url = addOption(url, "MV_STORE", "true");
// url = addOption(url, "MVCC", "true");
url = addOption(url, "MAX_COMPACT_TIME", "0"); // to speed up tests
} else {
url = addOption(url, "MV_STORE", "false");
}
......
......@@ -41,7 +41,7 @@ public class TestDriver extends TestDb {
prop.put("password", getPassword());
prop.put("max_compact_time", "1234");
prop.put("unknown", "1234");
String url = getURL("driver", true);
String url = getURL("jdbc:h2:mem:driver", true);
Connection conn = DriverManager.getConnection(url, prop);
ResultSet rs;
rs = conn.createStatement().executeQuery(
......
......@@ -19,8 +19,8 @@ import org.h2.util.Task;
*/
public class TestConcurrentUpdate extends TestDb {
private static final int THREADS = 3;
private static final int ROW_COUNT = 10;
private static final int THREADS = 10;
private static final int ROW_COUNT = 3;
/**
* Run just this test.
......@@ -100,18 +100,17 @@ public class TestConcurrentUpdate extends TestDb {
t.execute();
}
// test 2 seconds
for (int i = 0; i < 200; i++) {
Thread.sleep(10);
Thread.sleep(2000);
boolean success = true;
for (Task t : tasks) {
if (t.isFinished()) {
i = 1000;
break;
}
t.join();
Throwable exception = t.getException();
if (exception != null) {
logError("", exception);
success = false;
}
}
for (Task t : tasks) {
t.get();
}
assert success;
}
}
}
......@@ -25,7 +25,17 @@ public class TestMultiThreaded extends TestDb {
* @param a ignored
*/
public static void main(String... a) throws Exception {
TestBase.createCaller().init().test();
org.h2.test.TestAll config = new org.h2.test.TestAll();
config.memory = true;
config.big = true;
System.out.println(config);
TestBase test = createCaller().init(config);
for (int i = 0; i < 100; i++) {
System.out.println("Pass #" + i);
test.config.beforeTest();
test.test();
test.config.afterTest();
}
}
/**
......@@ -125,11 +135,10 @@ public class TestMultiThreaded extends TestDb {
@Override
public void test() throws Exception {
deleteDb("multiThreaded");
int size = getSize(2, 4);
int size = getSize(2, 20);
Connection[] connList = new Connection[size];
for (int i = 0; i < size; i++) {
connList[i] = getConnection("multiThreaded;MULTI_THREADED=1;" +
"TRACE_LEVEL_SYSTEM_OUT=1");
connList[i] = getConnection("multiThreaded;MULTI_THREADED=1");
}
Connection conn = connList[0];
Statement stat = conn.createStatement();
......@@ -148,15 +157,9 @@ public class TestMultiThreaded extends TestDb {
trace("started " + i);
Thread.sleep(100);
}
for (int t = 0; t < 2; t++) {
Thread.sleep(1000);
for (int i = 0; i < size; i++) {
Processor p = processors[i];
if (p.getException() != null) {
throw new Exception("" + i, p.getException());
}
}
}
try {
Thread.sleep(2000);
} finally {
trace("stopping");
for (int i = 0; i < size; i++) {
Processor p = processors[i];
......@@ -164,10 +167,7 @@ public class TestMultiThreaded extends TestDb {
}
for (int i = 0; i < size; i++) {
Processor p = processors[i];
p.join(100);
if (p.getException() != null) {
throw new Exception(p.getException());
}
p.join(1000);
}
trace("close");
for (int i = 0; i < size; i++) {
......@@ -176,4 +176,16 @@ public class TestMultiThreaded extends TestDb {
deleteDb("multiThreaded");
}
boolean success = true;
for (int i = 0; i < size; i++) {
Processor p = processors[i];
p.join(10000);
Throwable exception = p.getException();
if (exception != null) {
logError("", exception);
success = false;
}
}
assert success;
}
}
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论