提交 6f28f688 authored 作者: Evgenij Ryazanov's avatar Evgenij Ryazanov

Fix SELECT FOR UPDATE in MVStore mode

上级 9db9bca8
...@@ -35,7 +35,6 @@ import org.h2.result.LazyResult; ...@@ -35,7 +35,6 @@ import org.h2.result.LazyResult;
import org.h2.result.LocalResult; import org.h2.result.LocalResult;
import org.h2.result.ResultInterface; import org.h2.result.ResultInterface;
import org.h2.result.ResultTarget; import org.h2.result.ResultTarget;
import org.h2.result.Row;
import org.h2.result.SearchRow; import org.h2.result.SearchRow;
import org.h2.result.SortOrder; import org.h2.result.SortOrder;
import org.h2.table.Column; import org.h2.table.Column;
...@@ -425,6 +424,18 @@ public class Select extends Query { ...@@ -425,6 +424,18 @@ public class Select extends Query {
return count; return count;
} }
boolean isConditionMetForUpdate() {
if (isConditionMet()) {
int count = filters.size();
for (int i = 0; i < count; i++) {
TableFilter tableFilter = filters.get(i);
tableFilter.set(tableFilter.getTable().lockRow(session, tableFilter.get()));
}
return isConditionMet();
}
return false;
}
boolean isConditionMet() { boolean isConditionMet() {
return condition == null || condition.getBooleanValue(session); return condition == null || condition.getBooleanValue(session);
} }
...@@ -498,12 +509,10 @@ public class Select extends Query { ...@@ -498,12 +509,10 @@ public class Select extends Query {
long rowNumber = 0; long rowNumber = 0;
setCurrentRowNumber(0); setCurrentRowNumber(0);
int sampleSize = getSampleSizeValue(session); int sampleSize = getSampleSizeValue(session);
ArrayList<Row>[] forUpdateRows = initForUpdateRows();
while (topTableFilter.next()) { while (topTableFilter.next()) {
setCurrentRowNumber(rowNumber + 1); setCurrentRowNumber(rowNumber + 1);
if (isConditionMet()) { if (isForUpdateMvcc ? isConditionMetForUpdate() : isConditionMet()) {
rowNumber++; rowNumber++;
addForUpdateRow(forUpdateRows);
groupData.nextSource(); groupData.nextSource();
updateAgg(columnCount, stage); updateAgg(columnCount, stage);
if (sampleSize > 0 && rowNumber >= sampleSize) { if (sampleSize > 0 && rowNumber >= sampleSize) {
...@@ -511,7 +520,6 @@ public class Select extends Query { ...@@ -511,7 +520,6 @@ public class Select extends Query {
} }
} }
} }
lockForUpdateRows(forUpdateRows);
groupData.done(); groupData.done();
} }
...@@ -704,9 +712,10 @@ public class Select extends Query { ...@@ -704,9 +712,10 @@ public class Select extends Query {
limitRows = Long.MAX_VALUE; limitRows = Long.MAX_VALUE;
} }
} }
ArrayList<Row>[] forUpdateRows = initForUpdateRows();
int sampleSize = getSampleSizeValue(session); int sampleSize = getSampleSizeValue(session);
LazyResultQueryFlat lazyResult = new LazyResultQueryFlat(expressionArray, sampleSize, columnCount); LazyResultQueryFlat lazyResult = isForUpdateMvcc
? new LazyResultQueryFlatForUpdate(expressionArray, sampleSize, columnCount)
: new LazyResultQueryFlat(expressionArray, sampleSize, columnCount);
skipOffset(lazyResult, offset, quickOffset); skipOffset(lazyResult, offset, quickOffset);
if (result == null) { if (result == null) {
return lazyResult; return lazyResult;
...@@ -716,7 +725,6 @@ public class Select extends Query { ...@@ -716,7 +725,6 @@ public class Select extends Query {
} }
Value[] row = null; Value[] row = null;
while (result.getRowCount() < limitRows && lazyResult.next()) { while (result.getRowCount() < limitRows && lazyResult.next()) {
addForUpdateRow(forUpdateRows);
row = lazyResult.currentRow(); row = lazyResult.currentRow();
result.addRow(row); result.addRow(row);
} }
...@@ -727,46 +735,13 @@ public class Select extends Query { ...@@ -727,46 +735,13 @@ public class Select extends Query {
if (sort.compare(expected, row) != 0) { if (sort.compare(expected, row) != 0) {
break; break;
} }
addForUpdateRow(forUpdateRows);
result.addRow(row); result.addRow(row);
} }
result.limitsWereApplied(); result.limitsWereApplied();
} }
lockForUpdateRows(forUpdateRows);
return null; return null;
} }
private ArrayList<Row>[] initForUpdateRows() {
if (!this.isForUpdateMvcc) {
return null;
}
int count = filters.size();
@SuppressWarnings("unchecked")
ArrayList<Row>[] rows = new ArrayList[count];
for (int i = 0; i < count; i++) {
rows[i] = Utils.<Row>newSmallArrayList();
}
return rows;
}
private void addForUpdateRow(ArrayList<Row>[] forUpdateRows) {
if (forUpdateRows != null) {
int count = filters.size();
for (int i = 0; i < count; i++) {
filters.get(i).lockRowAdd(forUpdateRows[i]);
}
}
}
private void lockForUpdateRows(ArrayList<Row>[] forUpdateRows) {
if (forUpdateRows != null) {
int count = filters.size();
for (int i = 0; i < count; i++) {
filters.get(i).lockRows(forUpdateRows[i]);
}
}
}
private static void skipOffset(LazyResultSelect lazyResult, long offset, boolean quickOffset) { private static void skipOffset(LazyResultSelect lazyResult, long offset, boolean quickOffset) {
if (quickOffset) { if (quickOffset) {
while (offset > 0 && lazyResult.skip()) { while (offset > 0 && lazyResult.skip()) {
...@@ -1871,7 +1846,7 @@ public class Select extends Query { ...@@ -1871,7 +1846,7 @@ public class Select extends Query {
/** /**
* Lazy execution for a flat query. * Lazy execution for a flat query.
*/ */
private final class LazyResultQueryFlat extends LazyResultSelect { private class LazyResultQueryFlat extends LazyResultSelect {
int sampleSize; int sampleSize;
...@@ -1884,7 +1859,7 @@ public class Select extends Query { ...@@ -1884,7 +1859,7 @@ public class Select extends Query {
protected Value[] fetchNextRow() { protected Value[] fetchNextRow() {
while ((sampleSize <= 0 || rowNumber < sampleSize) && topTableFilter.next()) { while ((sampleSize <= 0 || rowNumber < sampleSize) && topTableFilter.next()) {
setCurrentRowNumber(rowNumber + 1); setCurrentRowNumber(rowNumber + 1);
if (isConditionMet()) { if (isSelectConditionMet()) {
++rowNumber; ++rowNumber;
Value[] row = new Value[columnCount]; Value[] row = new Value[columnCount];
for (int i = 0; i < columnCount; i++) { for (int i = 0; i < columnCount; i++) {
...@@ -1901,13 +1876,33 @@ public class Select extends Query { ...@@ -1901,13 +1876,33 @@ public class Select extends Query {
protected boolean skipNextRow() { protected boolean skipNextRow() {
while ((sampleSize <= 0 || rowNumber < sampleSize) && topTableFilter.next()) { while ((sampleSize <= 0 || rowNumber < sampleSize) && topTableFilter.next()) {
setCurrentRowNumber(rowNumber + 1); setCurrentRowNumber(rowNumber + 1);
if (isConditionMet()) { if (isSelectConditionMet()) {
++rowNumber; ++rowNumber;
return true; return true;
} }
} }
return false; return false;
} }
boolean isSelectConditionMet() {
return isConditionMet();
}
}
/**
* Lazy execution for a flat for update query.
*/
private final class LazyResultQueryFlatForUpdate extends LazyResultQueryFlat {
LazyResultQueryFlatForUpdate(Expression[] expressions, int sampleSize, int columnCount) {
super(expressions, sampleSize, columnCount);
}
@Override
boolean isSelectConditionMet() {
return isConditionMetForUpdate();
}
} }
/** /**
......
...@@ -214,20 +214,6 @@ public class MVPrimaryIndex extends BaseIndex { ...@@ -214,20 +214,6 @@ public class MVPrimaryIndex extends BaseIndex {
} }
} }
/**
* Lock a set of rows.
*
* @param session database session
* @param rowsForUpdate rows to lock
*/
void lockRows(Session session, Iterable<Row> rowsForUpdate) {
TransactionMap<Value, Value> map = getMap(session);
for (Row row : rowsForUpdate) {
long key = row.getKey();
lockRow(map, key);
}
}
/** /**
* Lock a single row. * Lock a single row.
* *
......
...@@ -734,11 +734,6 @@ public class MVTable extends TableBase { ...@@ -734,11 +734,6 @@ public class MVTable extends TableBase {
analyzeIfRequired(session); analyzeIfRequired(session);
} }
@Override
public void lockRows(Session session, Iterable<Row> rowsForUpdate) {
primaryIndex.lockRows(session, rowsForUpdate);
}
@Override @Override
public Row lockRow(Session session, Row row) { public Row lockRow(Session session, Row row) {
return primaryIndex.lockRow(session, row); return primaryIndex.lockRow(session, row);
......
...@@ -176,18 +176,6 @@ public abstract class Table extends SchemaObjectBase { ...@@ -176,18 +176,6 @@ public abstract class Table extends SchemaObjectBase {
*/ */
public abstract void removeRow(Session session, Row row); public abstract void removeRow(Session session, Row row);
/**
* Locks rows, preventing any updated to them, except from the session specified.
*
* @param session the session
* @param rowsForUpdate rows to lock
*/
public void lockRows(Session session, Iterable<Row> rowsForUpdate) {
for (Row row : rowsForUpdate) {
lockRow(session, row);
}
}
/** /**
* Locks row, preventing any updated to it, except from the session specified. * Locks row, preventing any updated to it, except from the session specified.
* *
......
...@@ -1177,15 +1177,6 @@ public class TableFilter implements ColumnResolver { ...@@ -1177,15 +1177,6 @@ public class TableFilter implements ColumnResolver {
} }
} }
/**
* Lock the given rows.
*
* @param forUpdateRows the rows to lock
*/
public void lockRows(Iterable<Row> forUpdateRows) {
table.lockRows(session, forUpdateRows);
}
public TableFilter getNestedJoin() { public TableFilter getNestedJoin() {
return nestedJoin; return nestedJoin;
} }
......
...@@ -31,11 +31,13 @@ public class TestTransaction extends TestDb { ...@@ -31,11 +31,13 @@ public class TestTransaction extends TestDb {
* @param a ignored * @param a ignored
*/ */
public static void main(String... a) throws Exception { public static void main(String... a) throws Exception {
TestBase.createCaller().init().test(); TestBase init = TestBase.createCaller().init();
init.config.multiThreaded = true;
init.test();
} }
@Override @Override
public void test() throws SQLException { public void test() throws Exception {
testClosingConnectionWithSessionTempTable(); testClosingConnectionWithSessionTempTable();
testClosingConnectionWithLockedTable(); testClosingConnectionWithLockedTable();
testConstraintCreationRollback(); testConstraintCreationRollback();
...@@ -45,6 +47,7 @@ public class TestTransaction extends TestDb { ...@@ -45,6 +47,7 @@ public class TestTransaction extends TestDb {
testRollback(); testRollback();
testRollback2(); testRollback2();
testForUpdate(); testForUpdate();
testForUpdate2();
testSetTransaction(); testSetTransaction();
testReferential(); testReferential();
testSavepoint(); testSavepoint();
...@@ -218,6 +221,81 @@ public class TestTransaction extends TestDb { ...@@ -218,6 +221,81 @@ public class TestTransaction extends TestDb {
conn.close(); conn.close();
} }
private void testForUpdate2() throws Exception {
// Exclude some configurations to avoid spending too much time in sleep()
if (config.mvStore && !config.multiThreaded || config.networked || config.cipher != null) {
return;
}
deleteDb("transaction");
Connection conn1 = getConnection("transaction");
Connection conn2 = getConnection("transaction");
Statement stat1 = conn1.createStatement();
stat1.execute("CREATE TABLE TEST (ID INT PRIMARY KEY, V INT)");
stat1.execute("INSERT INTO TEST VALUES (1, 0)");
conn1.setAutoCommit(false);
conn2.createStatement().execute("SET LOCK_TIMEOUT 2000");
if (config.mvStore) {
testForUpdate2(conn1, stat1, conn2, false);
}
testForUpdate2(conn1, stat1, conn2, true);
conn1.close();
conn2.close();
}
void testForUpdate2(Connection conn1, Statement stat1, Connection conn2, boolean forUpdate)
throws Exception {
testForUpdate2(conn1, stat1, conn2, forUpdate, false);
testForUpdate2(conn1, stat1, conn2, forUpdate, true);
}
void testForUpdate2(Connection conn1, Statement stat1, Connection conn2, boolean forUpdate,
boolean window) throws Exception {
testForUpdate2(conn1, stat1, conn2, forUpdate, window, false);
testForUpdate2(conn1, stat1, conn2, forUpdate, window, true);
}
void testForUpdate2(Connection conn1, Statement stat1, final Connection conn2, boolean forUpdate,
boolean window, boolean excluded) throws Exception {
stat1.execute("UPDATE TEST SET V = 1 WHERE ID = 1");
conn1.commit();
stat1.execute("UPDATE TEST SET V = 2 WHERE ID = 1");
final int[] res = new int[1];
final Exception[] ex = new Exception[1];
StringBuilder builder = new StringBuilder("SELECT V");
if (window) {
builder.append(", RANK() OVER (ORDER BY ID)");
}
builder.append(" FROM TEST WHERE ID = 1");
if (excluded) {
builder.append(" AND V = 1");
}
if (forUpdate) {
builder.append(" FOR UPDATE");
}
String query = builder.toString();
final PreparedStatement prep2 = conn2.prepareStatement(query);
Thread t = new Thread() {
@Override
public void run() {
try {
ResultSet resultSet = prep2.executeQuery();
res[0] = resultSet.next() ? resultSet.getInt(1) : -1;
conn2.commit();
} catch (SQLException e) {
ex[0] = e;
}
}
};
t.start();
Thread.sleep(500);
conn1.commit();
t.join();
if (ex[0] != null) {
throw ex[0];
}
assertEquals(forUpdate ? excluded ? -1 : 2 : 1, res[0]);
}
private void testRollback() throws SQLException { private void testRollback() throws SQLException {
deleteDb("transaction"); deleteDb("transaction");
Connection conn = getConnection("transaction"); Connection conn = getConnection("transaction");
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论