Unverified 提交 a4fe2e9d authored 作者: Evgenij Ryazanov's avatar Evgenij Ryazanov 提交者: GitHub

Merge pull request #954 from katzyn/undo_log

Avoid incorrect reads in iterators of TransactionMap
...@@ -118,6 +118,7 @@ public class Merge extends Prepared { ...@@ -118,6 +118,7 @@ public class Merge extends Prepared {
} }
} else { } else {
// process select data for list // process select data for list
query.setNeverLazy(true);
ResultInterface rows = query.query(0); ResultInterface rows = query.query(0);
count = 0; count = 0;
targetTable.fire(session, Trigger.UPDATE | Trigger.INSERT, true); targetTable.fire(session, Trigger.UPDATE | Trigger.INSERT, true);
......
...@@ -344,7 +344,7 @@ public class TransactionStore { ...@@ -344,7 +344,7 @@ public class TransactionStore {
} }
// TODO could synchronize on blocks (100 at a time or so) // TODO could synchronize on blocks (100 at a time or so)
rwLock.writeLock().lock(); rwLock.writeLock().lock();
int oldStatus = t.status; int oldStatus = t.getStatus();
try { try {
t.setStatus(Transaction.STATUS_COMMITTING); t.setStatus(Transaction.STATUS_COMMITTING);
for (long logId = 0; logId < maxLogId; logId++) { for (long logId = 0; logId < maxLogId; logId++) {
...@@ -372,9 +372,7 @@ public class TransactionStore { ...@@ -372,9 +372,7 @@ public class TransactionStore {
if (value.value == null) { if (value.value == null) {
map.remove(key); map.remove(key);
} else { } else {
VersionedValue v2 = new VersionedValue(); map.put(key, new VersionedValue(0L, value.value));
v2.value = value.value;
map.put(key, v2);
} }
} }
} }
...@@ -968,9 +966,10 @@ public class TransactionStore { ...@@ -968,9 +966,10 @@ public class TransactionStore {
long size = 0; long size = 0;
Cursor<K, VersionedValue> cursor = map.cursor(null); Cursor<K, VersionedValue> cursor = map.cursor(null);
while (cursor.hasNext()) { while (cursor.hasNext()) {
VersionedValue data;
K key = cursor.next(); K key = cursor.next();
data = getValue(key, readLogId, cursor.getValue()); // cursor.getValue() returns outdated value
VersionedValue data = map.get(key);
data = getValue(key, readLogId, data);
if (data != null && data.value != null) { if (data != null && data.value != null) {
size++; size++;
} }
...@@ -1053,8 +1052,7 @@ public class TransactionStore { ...@@ -1053,8 +1052,7 @@ public class TransactionStore {
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
public V putCommitted(K key, V value) { public V putCommitted(K key, V value) {
DataUtils.checkArgument(value != null, "The value may not be null"); DataUtils.checkArgument(value != null, "The value may not be null");
VersionedValue newValue = new VersionedValue(); VersionedValue newValue = new VersionedValue(0L, value);
newValue.value = value;
VersionedValue oldValue = map.put(key, newValue); VersionedValue oldValue = map.put(key, newValue);
return (V) (oldValue == null ? null : oldValue.value); return (V) (oldValue == null ? null : oldValue.value);
} }
...@@ -1133,10 +1131,9 @@ public class TransactionStore { ...@@ -1133,10 +1131,9 @@ public class TransactionStore {
} }
} }
} }
VersionedValue newValue = new VersionedValue(); VersionedValue newValue = new VersionedValue(
newValue.operationId = getOperationId( getOperationId(transaction.transactionId, transaction.logId),
transaction.transactionId, transaction.logId); value);
newValue.value = value;
if (current == null) { if (current == null) {
// a new value // a new value
transaction.log(mapId, key, current); transaction.log(mapId, key, current);
...@@ -1533,7 +1530,8 @@ public class TransactionStore { ...@@ -1533,7 +1530,8 @@ public class TransactionStore {
if (to != null && map.getKeyType().compare(k, to) > 0) { if (to != null && map.getKeyType().compare(k, to) > 0) {
break; break;
} }
VersionedValue data = cursor.getValue(); // cursor.getValue() returns outdated value
VersionedValue data = map.get(key);
data = getValue(key, readLogId, data); data = getValue(key, readLogId, data);
if (data != null && data.value != null) { if (data != null && data.value != null) {
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
...@@ -1641,12 +1639,17 @@ public class TransactionStore { ...@@ -1641,12 +1639,17 @@ public class TransactionStore {
/** /**
* The operation id. * The operation id.
*/ */
public long operationId; final long operationId;
/** /**
* The value. * The value.
*/ */
public Object value; final Object value;
VersionedValue(long operationId, Object value) {
this.operationId = operationId;
this.value = value;
}
@Override @Override
public String toString() { public String toString() {
...@@ -1694,9 +1697,7 @@ public class TransactionStore { ...@@ -1694,9 +1697,7 @@ public class TransactionStore {
if (buff.get() == 0) { if (buff.get() == 0) {
// fast path (no op ids or null entries) // fast path (no op ids or null entries)
for (int i = 0; i < len; i++) { for (int i = 0; i < len; i++) {
VersionedValue v = new VersionedValue(); obj[i] = new VersionedValue(0L, valueType.read(buff));
v.value = valueType.read(buff);
obj[i] = v;
} }
} else { } else {
// slow path (some entries may be null) // slow path (some entries may be null)
...@@ -1708,12 +1709,14 @@ public class TransactionStore { ...@@ -1708,12 +1709,14 @@ public class TransactionStore {
@Override @Override
public Object read(ByteBuffer buff) { public Object read(ByteBuffer buff) {
VersionedValue v = new VersionedValue(); long operationId = DataUtils.readVarLong(buff);
v.operationId = DataUtils.readVarLong(buff); Object value;
if (buff.get() == 1) { if (buff.get() == 1) {
v.value = valueType.read(buff); value = valueType.read(buff);
} else {
value = null;
} }
return v; return new VersionedValue(operationId, value);
} }
@Override @Override
......
...@@ -12,6 +12,7 @@ import java.sql.SQLException; ...@@ -12,6 +12,7 @@ import java.sql.SQLException;
import java.sql.Statement; import java.sql.Statement;
import java.util.ArrayList; import java.util.ArrayList;
import org.h2.jdbc.JdbcSQLException; import org.h2.jdbc.JdbcSQLException;
import org.h2.message.DbException;
import org.h2.test.TestBase; import org.h2.test.TestBase;
import org.h2.util.IOUtils; import org.h2.util.IOUtils;
...@@ -63,6 +64,9 @@ public class TestMvccMultiThreaded2 extends TestBase { ...@@ -63,6 +64,9 @@ public class TestMvccMultiThreaded2 extends TestBase {
ps.setInt(1, 1); ps.setInt(1, 1);
ps.setInt(2, 100); ps.setInt(2, 100);
ps.executeUpdate(); ps.executeUpdate();
ps.setInt(1, 2);
ps.setInt(2, 200);
ps.executeUpdate();
conn.commit(); conn.commit();
ArrayList<SelectForUpdate> threads = new ArrayList<>(); ArrayList<SelectForUpdate> threads = new ArrayList<>();
...@@ -80,9 +84,11 @@ public class TestMvccMultiThreaded2 extends TestBase { ...@@ -80,9 +84,11 @@ public class TestMvccMultiThreaded2 extends TestBase {
@SuppressWarnings("unused") @SuppressWarnings("unused")
int minProcessed = Integer.MAX_VALUE, maxProcessed = 0, totalProcessed = 0; int minProcessed = Integer.MAX_VALUE, maxProcessed = 0, totalProcessed = 0;
boolean allOk = true;
for (SelectForUpdate sfu : threads) { for (SelectForUpdate sfu : threads) {
// make sure all threads have stopped by joining with them // make sure all threads have stopped by joining with them
sfu.join(); sfu.join();
allOk &= sfu.ok;
totalProcessed += sfu.iterationsProcessed; totalProcessed += sfu.iterationsProcessed;
if (sfu.iterationsProcessed > maxProcessed) { if (sfu.iterationsProcessed > maxProcessed) {
maxProcessed = sfu.iterationsProcessed; maxProcessed = sfu.iterationsProcessed;
...@@ -102,6 +108,8 @@ public class TestMvccMultiThreaded2 extends TestBase { ...@@ -102,6 +108,8 @@ public class TestMvccMultiThreaded2 extends TestBase {
IOUtils.closeSilently(conn); IOUtils.closeSilently(conn);
deleteDb(getTestName()); deleteDb(getTestName());
assertTrue(allOk);
} }
/** /**
...@@ -111,6 +119,8 @@ public class TestMvccMultiThreaded2 extends TestBase { ...@@ -111,6 +119,8 @@ public class TestMvccMultiThreaded2 extends TestBase {
public int iterationsProcessed; public int iterationsProcessed;
public boolean ok;
SelectForUpdate() { SelectForUpdate() {
} }
...@@ -130,11 +140,20 @@ public class TestMvccMultiThreaded2 extends TestBase { ...@@ -130,11 +140,20 @@ public class TestMvccMultiThreaded2 extends TestBase {
try { try {
PreparedStatement ps = conn.prepareStatement( PreparedStatement ps = conn.prepareStatement(
"SELECT * FROM test WHERE entity_id = ? FOR UPDATE"); "SELECT * FROM test WHERE entity_id = ? FOR UPDATE");
ps.setString(1, "1"); String id;
int value;
if ((iterationsProcessed & 1) == 0) {
id = "1";
value = 100;
} else {
id = "2";
value = 200;
}
ps.setString(1, id);
ResultSet rs = ps.executeQuery(); ResultSet rs = ps.executeQuery();
assertTrue(rs.next()); assertTrue(rs.next());
assertTrue(rs.getInt(2) == 100); assertTrue(rs.getInt(2) == value);
conn.commit(); conn.commit();
iterationsProcessed++; iterationsProcessed++;
...@@ -149,11 +168,13 @@ public class TestMvccMultiThreaded2 extends TestBase { ...@@ -149,11 +168,13 @@ public class TestMvccMultiThreaded2 extends TestBase {
} }
} catch (SQLException e) { } catch (SQLException e) {
TestBase.logError("SQL error from thread "+getName(), e); TestBase.logError("SQL error from thread "+getName(), e);
throw DbException.convert(e);
} catch (Exception e) { } catch (Exception e) {
TestBase.logError("General error from thread "+getName(), e); TestBase.logError("General error from thread "+getName(), e);
throw e; throw e;
} }
IOUtils.closeSilently(conn); IOUtils.closeSilently(conn);
ok = true;
} }
} }
} }
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论