提交 4339e509 authored 作者: Evgenij Ryazanov's avatar Evgenij Ryazanov

Add support for FOR UPDATE with JOINs with MVStore engine

上级 7500ae66
...@@ -39,10 +39,10 @@ Multiple set operators (UNION, INTERSECT, MINUS, EXCEPT) are evaluated ...@@ -39,10 +39,10 @@ Multiple set operators (UNION, INTERSECT, MINUS, EXCEPT) are evaluated
from left to right. For compatibility with other databases and future versions from left to right. For compatibility with other databases and future versions
of H2 please use parentheses. of H2 please use parentheses.
If FOR UPDATE is specified, the tables are locked for writing. When using If FOR UPDATE is specified, the tables or rows are locked for writing.
MVCC, only the selected rows are locked as in an UPDATE statement. This clause is not allowed in DISTINCT queries and in queries with non-window aggregates, GROUP BY, or HAVING clauses.
In this case, aggregate, GROUP BY, DISTINCT queries or joins When using default MVStore engine only the selected rows are locked as in an UPDATE statement.
are not allowed in this case. With PageStore engine the whole tables are locked.
"," ","
SELECT * FROM TEST; SELECT * FROM TEST;
SELECT * FROM TEST ORDER BY NAME; SELECT * FROM TEST ORDER BY NAME;
......
...@@ -442,14 +442,12 @@ public class Select extends Query { ...@@ -442,14 +442,12 @@ public class Select extends Query {
int rowNumber = 0; int rowNumber = 0;
setCurrentRowNumber(0); setCurrentRowNumber(0);
int sampleSize = getSampleSizeValue(session); int sampleSize = getSampleSizeValue(session);
ArrayList<Row> forUpdateRows = this.isForUpdateMvcc ? Utils.<Row>newSmallArrayList() : null; ArrayList<Row>[] forUpdateRows = initForUpdateRows();
while (topTableFilter.next()) { while (topTableFilter.next()) {
setCurrentRowNumber(rowNumber + 1); setCurrentRowNumber(rowNumber + 1);
if (isConditionMet()) { if (isConditionMet()) {
rowNumber++; rowNumber++;
if (forUpdateRows != null) { addForUpdateRow(forUpdateRows);
topTableFilter.lockRowAdd(forUpdateRows);
}
groupData.nextSource(); groupData.nextSource();
updateAgg(columnCount, stage); updateAgg(columnCount, stage);
if (sampleSize > 0 && rowNumber >= sampleSize) { if (sampleSize > 0 && rowNumber >= sampleSize) {
...@@ -457,9 +455,7 @@ public class Select extends Query { ...@@ -457,9 +455,7 @@ public class Select extends Query {
} }
} }
} }
if (forUpdateRows != null) { lockForUpdateRows(forUpdateRows);
topTableFilter.lockRows(forUpdateRows);
}
groupData.done(); groupData.done();
} }
...@@ -634,7 +630,7 @@ public class Select extends Query { ...@@ -634,7 +630,7 @@ public class Select extends Query {
limitRows = Long.MAX_VALUE; limitRows = Long.MAX_VALUE;
} }
} }
ArrayList<Row> forUpdateRows = this.isForUpdateMvcc ? Utils.<Row>newSmallArrayList() : null; ArrayList<Row>[] forUpdateRows = initForUpdateRows();
int sampleSize = getSampleSizeValue(session); int sampleSize = getSampleSizeValue(session);
LazyResultQueryFlat lazyResult = new LazyResultQueryFlat(expressionArray, LazyResultQueryFlat lazyResult = new LazyResultQueryFlat(expressionArray,
sampleSize, columnCount); sampleSize, columnCount);
...@@ -647,9 +643,7 @@ public class Select extends Query { ...@@ -647,9 +643,7 @@ public class Select extends Query {
} }
Value[] row = null; Value[] row = null;
while (result.getRowCount() < limitRows && lazyResult.next()) { while (result.getRowCount() < limitRows && lazyResult.next()) {
if (forUpdateRows != null) { addForUpdateRow(forUpdateRows);
topTableFilter.lockRowAdd(forUpdateRows);
}
row = lazyResult.currentRow(); row = lazyResult.currentRow();
result.addRow(row); result.addRow(row);
} }
...@@ -660,17 +654,44 @@ public class Select extends Query { ...@@ -660,17 +654,44 @@ public class Select extends Query {
if (sort.compare(expected, row) != 0) { if (sort.compare(expected, row) != 0) {
break; break;
} }
if (forUpdateRows != null) { addForUpdateRow(forUpdateRows);
topTableFilter.lockRowAdd(forUpdateRows);
}
result.addRow(row); result.addRow(row);
} }
result.limitsWereApplied(); result.limitsWereApplied();
} }
lockForUpdateRows(forUpdateRows);
return null;
}
private ArrayList<Row>[] initForUpdateRows() {
if (!this.isForUpdateMvcc) {
return null;
}
int count = filters.size();
@SuppressWarnings("unchecked")
ArrayList<Row>[] rows = new ArrayList[count];
for (int i = 0; i < count; i++) {
rows[i] = Utils.<Row>newSmallArrayList();
}
return rows;
}
private void addForUpdateRow(ArrayList<Row>[] forUpdateRows) {
if (forUpdateRows != null) { if (forUpdateRows != null) {
topTableFilter.lockRows(forUpdateRows); int count = filters.size();
for (int i = 0; i < count; i++) {
filters.get(i).lockRowAdd(forUpdateRows[i]);
}
}
}
private void lockForUpdateRows(ArrayList<Row>[] forUpdateRows) {
if (forUpdateRows != null) {
int count = filters.size();
for (int i = 0; i < count; i++) {
filters.get(i).lockRows(forUpdateRows[i]);
}
} }
return null;
} }
private static void skipOffset(LazyResultSelect lazyResult, long offset, boolean quickOffset) { private static void skipOffset(LazyResultSelect lazyResult, long offset, boolean quickOffset) {
...@@ -770,12 +791,6 @@ public class Select extends Query { ...@@ -770,12 +791,6 @@ public class Select extends Query {
topTableFilter.startQuery(session); topTableFilter.startQuery(session);
topTableFilter.reset(); topTableFilter.reset();
boolean exclusive = isForUpdate && !isForUpdateMvcc; boolean exclusive = isForUpdate && !isForUpdateMvcc;
if (isForUpdateMvcc) {
if (topTableFilter.getJoin() != null) {
throw DbException.getUnsupportedException(
"MVCC=TRUE && FOR UPDATE && JOIN");
}
}
topTableFilter.lock(session, exclusive, exclusive); topTableFilter.lock(session, exclusive, exclusive);
ResultTarget to = result != null ? result : target; ResultTarget to = result != null ? result : target;
lazy &= to == null; lazy &= to == null;
...@@ -1453,8 +1468,7 @@ public class Select extends Query { ...@@ -1453,8 +1468,7 @@ public class Select extends Query {
throw DbException.get(ErrorCode.FOR_UPDATE_IS_NOT_ALLOWED_IN_DISTICT_OR_GROUPED_SELECT); throw DbException.get(ErrorCode.FOR_UPDATE_IS_NOT_ALLOWED_IN_DISTICT_OR_GROUPED_SELECT);
} }
this.isForUpdate = b; this.isForUpdate = b;
if (session.getDatabase().getSettings().selectForUpdateMvcc && if (session.getDatabase().isMVStore()) {
session.getDatabase().isMVStore()) {
isForUpdateMvcc = b; isForUpdateMvcc = b;
} }
} }
......
...@@ -296,14 +296,6 @@ public class DbSettings extends SettingsBase { ...@@ -296,14 +296,6 @@ public class DbSettings extends SettingsBase {
*/ */
public final boolean rowId = get("ROWID", true); public final boolean rowId = get("ROWID", true);
/**
* Database setting <code>SELECT_FOR_UPDATE_MVCC</code>
* (default: true).<br />
* If set, SELECT .. FOR UPDATE queries lock only the selected rows when
* using MVCC.
*/
public final boolean selectForUpdateMvcc = get("SELECT_FOR_UPDATE_MVCC", true);
/** /**
* Database setting <code>SHARE_LINKED_CONNECTIONS</code> * Database setting <code>SHARE_LINKED_CONNECTIONS</code>
* (default: true).<br /> * (default: true).<br />
......
...@@ -158,16 +158,31 @@ public class TestTransaction extends TestDb { ...@@ -158,16 +158,31 @@ public class TestTransaction extends TestDb {
conn.setAutoCommit(false); conn.setAutoCommit(false);
Statement stat = conn.createStatement(); Statement stat = conn.createStatement();
stat.execute("create table test(id int primary key, name varchar)"); stat.execute("create table test(id int primary key, name varchar)");
stat.execute("create table test2(id int primary key, name varchar)");
stat.execute("insert into test values(1, 'Hello'), (2, 'World')"); stat.execute("insert into test values(1, 'Hello'), (2, 'World')");
stat.execute("insert into test2 values(1, 'A'), (2, 'B')");
conn.commit(); conn.commit();
PreparedStatement prep = conn.prepareStatement( Connection conn2;
"select * from test for update"); PreparedStatement prep;
prep = conn.prepareStatement("select * from test for update");
prep.execute(); prep.execute();
Connection conn2 = getConnection("transaction"); conn2 = getConnection("transaction");
conn2.setAutoCommit(false); conn2.setAutoCommit(false);
assertThrows(ErrorCode.LOCK_TIMEOUT_1, conn2.createStatement()). assertThrows(ErrorCode.LOCK_TIMEOUT_1, conn2.createStatement()).
execute("select * from test for update"); execute("select * from test for update");
conn2.close(); conn2.close();
conn.commit();
prep = conn.prepareStatement("select * from test join test2 on test.id = test2.id for update");
prep.execute();
conn2 = getConnection("transaction");
conn2.setAutoCommit(false);
assertThrows(ErrorCode.LOCK_TIMEOUT_1, conn2.createStatement()).
execute("select * from test for update");
assertThrows(ErrorCode.LOCK_TIMEOUT_1, conn2.createStatement()).
execute("select * from test2 for update");
conn2.close();
conn.commit();
conn.close(); conn.close();
} }
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论