提交 58425db8 authored 作者: S.Vladykin's avatar S.Vladykin

plan and tests

上级 233ddba8
......@@ -7,6 +7,7 @@ package org.h2.command;
import java.util.ArrayList;
import org.h2.api.DatabaseEventListener;
import org.h2.command.dml.Explain;
import org.h2.command.dml.Query;
import org.h2.expression.Parameter;
import org.h2.expression.ParameterInterface;
......@@ -47,8 +48,18 @@ public class CommandContainer extends Command {
@Override
public void prepareJoinBatch() {
if (session.isJoinBatchEnabled() && prepared.isQuery()) {
if (session.isJoinBatchEnabled()) {
prepareJoinBatch(prepared);
}
}
private static void prepareJoinBatch(Prepared prepared) {
if (prepared.isQuery()) {
if (prepared.getType() == CommandInterface.SELECT) {
((Query) prepared).prepareJoinBatch();
} else if (prepared.getType() == CommandInterface.EXPLAIN) {
prepareJoinBatch(((Explain) prepared).getCommand());
}
}
}
......
......@@ -294,6 +294,9 @@ public class AlterTableAddConstraint extends SchemaCommand {
}
private static Index getUniqueIndex(Table t, IndexColumn[] cols) {
if (t.getIndexes() == null) {
return null;
}
for (Index idx : t.getIndexes()) {
if (canUseUniqueIndex(idx, t, cols)) {
return idx;
......@@ -303,6 +306,9 @@ public class AlterTableAddConstraint extends SchemaCommand {
}
private static Index getIndex(Table t, IndexColumn[] cols, boolean moreColumnOk) {
if (t.getIndexes() == null) {
return null;
}
for (Index idx : t.getIndexes()) {
if (canUseIndex(idx, t, cols, moreColumnOk)) {
return idx;
......
......@@ -132,7 +132,7 @@ public class Delete extends Prepared {
}
PlanItem item = tableFilter.getBestPlanItem(session, new TableFilter[]{tableFilter}, 0);
tableFilter.setPlanItem(item);
tableFilter.prepare();
tableFilter.prepare(false);
}
@Override
......
......@@ -40,6 +40,10 @@ public class Explain extends Prepared {
this.command = command;
}
public Prepared getCommand() {
return command;
}
@Override
public void prepare() {
command.prepare();
......
......@@ -24,11 +24,6 @@ public class NoOperation extends Prepared {
return 0;
}
@Override
public boolean isQuery() {
return false;
}
@Override
public boolean isTransactional() {
return true;
......
......@@ -1010,9 +1010,7 @@ public class Select extends Query {
setEvaluatableRecursive(topTableFilter);
if (!parse) {
topTableFilter.prepare();
}
topTableFilter.prepare(parse);
return planCost;
}
......
......@@ -189,7 +189,7 @@ public class Update extends Prepared {
}
PlanItem item = tableFilter.getBestPlanItem(session, new TableFilter[] {tableFilter}, 0);
tableFilter.setPlanItem(item);
tableFilter.prepare();
tableFilter.prepare(false);
}
@Override
......
......@@ -51,6 +51,13 @@ public interface IndexLookupBatch {
*/
List<Future<Cursor>> find();
/**
* Get plan for EXPLAIN.
*
* @return plan
*/
String getPlanSQL();
/**
* Reset this batch to clear state. This method will be called before each query execution.
*/
......
......@@ -95,16 +95,6 @@ public final class JoinBatch {
this.additionalFilter = additionalFilter;
}
/**
* Check if the index at the given table filter really supports batching in this query.
*
* @param joinFilterId joined table filter id
* @return {@code true} if index really supports batching in this query
*/
public boolean isBatchedIndex(int joinFilterId) {
return filters[joinFilterId].isBatched();
}
/**
* Get the lookup batch for the given table filter.
*
......@@ -413,7 +403,6 @@ public final class JoinBatch {
private final TableFilter filter;
private final JoinFilter join;
private final int id;
private final boolean fakeBatch;
private final IndexLookupBatch lookupBatch;
......@@ -421,17 +410,17 @@ public final class JoinBatch {
this.filter = filter;
this.id = filter.getJoinFilterId();
this.join = join;
fakeBatch = lookupBatch == null;
this.lookupBatch = fakeBatch ? new FakeLookupBatch(filter) : lookupBatch;
if (lookupBatch == null && id != 0) {
lookupBatch = new FakeLookupBatch(filter);
}
private boolean isBatched() {
return !fakeBatch;
this.lookupBatch = lookupBatch;
}
private void reset() {
if (lookupBatch != null) {
lookupBatch.reset();
}
}
private Row getNullRow() {
return filter.getTable().getNullRow();
......@@ -657,6 +646,11 @@ public final class JoinBatch {
this.filter = filter;
}
@Override
public String getPlanSQL() {
return "fake";
}
@Override
public void reset() {
full = false;
......@@ -722,30 +716,37 @@ public final class JoinBatch {
private abstract static class ViewIndexLookupBatchBase<R extends QueryRunnerBase>
implements IndexLookupBatch {
protected final ViewIndex viewIndex;
protected final ArrayList<Future<Cursor>> result = New.arrayList();
protected int resultSize;
private final ArrayList<Future<Cursor>> result = New.arrayList();
private int resultSize;
private boolean findCalled;
protected ViewIndexLookupBatchBase(ViewIndex viewIndex) {
this.viewIndex = viewIndex;
}
@Override
public String getPlanSQL() {
return "view";
}
protected abstract boolean collectSearchRows(R r);
protected abstract R newQueryRunner();
protected abstract void startQueryRunners();
protected abstract void startQueryRunners(int resultSize);
protected final boolean resetAfterFind() {
if (resultSize < 0) {
if (!findCalled) {
return false;
}
findCalled = false;
// method find was called, we need to reset futures to initial state for reuse
for (int i = 0, size = -resultSize; i < size; i++) {
for (int i = 0; i < resultSize; i++) {
queryRunner(i).reset();
}
resultSize = 0;
return true;
}
return false;
}
@SuppressWarnings("unchecked")
protected R queryRunner(int i) {
......@@ -790,12 +791,9 @@ public final class JoinBatch {
if (resultSize == 0) {
return Collections.emptyList();
}
startQueryRunners();
List<Future<Cursor>> list = resultSize == result.size() ?
result : result.subList(0, resultSize);
// mark that method find was called
resultSize = -resultSize;
return list;
findCalled = true;
startQueryRunners(resultSize);
return resultSize == result.size() ? result : result.subList(0, resultSize);
}
}
......@@ -852,17 +850,16 @@ public final class JoinBatch {
@Override
public boolean isBatchFull() {
assert resultSize >= 0;
return top.isBatchFull();
}
@Override
protected void startQueryRunners() {
protected void startQueryRunners(int resultSize) {
// we do batched find only for top table filter and then lazily run the ViewIndex query
// for each received top future cursor
List<Future<Cursor>> topFutureCursors = top.find();
if (topFutureCursors.size() != resultSize) {
throw DbException.throwInternalError("Unexpected result size: " + result.size() +
throw DbException.throwInternalError("Unexpected result size: " + topFutureCursors.size() +
", expected :" + resultSize);
}
for (int i = 0; i < resultSize; i++) {
......@@ -976,15 +973,18 @@ public final class JoinBatch {
}
@Override
protected void startQueryRunners() {
for (int i = 0; i < filters.size(); i++) {
List<Future<Cursor>> topFutureCursors = filters.get(i).find();
for (int j = 0, k = 0; j < resultSize; j++) {
Future<Cursor>[] cs = queryRunner(j).topFutureCursors;
if (cs[j] == null) {
cs[j] = topFutureCursors.get(k++);
}
}
protected void startQueryRunners(int resultSize) {
for (int f = 0; f < filters.size(); f++) {
List<Future<Cursor>> topFutureCursors = filters.get(f).find();
int r = 0, c = 0;
for (; r < resultSize; r++) {
Future<Cursor>[] cs = queryRunner(r).topFutureCursors;
if (cs[f] == null) {
cs[f] = topFutureCursors.get(c++);
}
}
assert r == resultSize;
assert c == topFutureCursors.size();
}
}
}
......
......@@ -269,8 +269,10 @@ public class TableFilter implements ColumnResolver {
/**
* Prepare reading rows. This method will remove all index conditions that
* can not be used, and optimize the conditions.
*
* @param parse if we are parsing sub-query
*/
public void prepare() {
public void prepare(boolean parse) {
// forget all unused index conditions
// the indexConditions list may be modified here
for (int i = 0; i < indexConditions.size(); i++) {
......@@ -289,14 +291,15 @@ public class TableFilter implements ColumnResolver {
if (SysProperties.CHECK && nestedJoin == this) {
DbException.throwInternalError("self join");
}
nestedJoin.prepare();
nestedJoin.prepare(parse);
}
if (join != null) {
if (SysProperties.CHECK && join == this) {
DbException.throwInternalError("self join");
}
join.prepare();
join.prepare(parse);
}
if (!parse) {
if (filterCondition != null) {
filterCondition = filterCondition.optimize(session);
}
......@@ -304,6 +307,7 @@ public class TableFilter implements ColumnResolver {
joinCondition = joinCondition.optimize(session);
}
}
}
/**
* Start the query. This will reset the scan counts.
......@@ -784,11 +788,16 @@ public class TableFilter implements ColumnResolver {
buff.append('\n');
StatementBuilder planBuff = new StatementBuilder();
if (joinBatch != null) {
if (joinBatch.isBatchedIndex(joinFilterId)) {
planBuff.append("batched:true ");
} else if (joinFilterId != 0) {
// top table filter does not need to fake batching, it works as usual in this case
planBuff.append("batched:fake ");
IndexLookupBatch lookupBatch = joinBatch.getLookupBatch(joinFilterId);
if (lookupBatch == null) {
if (joinFilterId != 0) {
throw DbException.throwInternalError();
}
} else {
planBuff.append("batched:");
String batchPlan = lookupBatch.getPlanSQL();
planBuff.append(batchPlan);
planBuff.append(" ");
}
}
planBuff.append(index.getPlanSQL());
......
......@@ -24,6 +24,7 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;
import org.h2.api.TableEngine;
import org.h2.command.ddl.CreateTableData;
import org.h2.command.dml.OptimizerHints;
......@@ -36,6 +37,7 @@ import org.h2.index.Index;
import org.h2.index.IndexLookupBatch;
import org.h2.index.IndexType;
import org.h2.index.SingleRowCursor;
import org.h2.jdbc.JdbcConnection;
import org.h2.message.DbException;
import org.h2.result.Row;
import org.h2.result.SearchRow;
......@@ -383,10 +385,24 @@ public class TestTableEngines extends TestBase {
deleteDb("testSubQueryInfo");
}
private void setBatchingEnabled(Statement stat, boolean enabled) throws SQLException {
stat.execute("SET BATCH_JOINS " + enabled);
if (!config.networked) {
Session s = (Session) ((JdbcConnection) stat.getConnection()).getSession();
assertEquals(enabled, s.isJoinBatchEnabled());
}
}
private void testBatchedJoin() throws SQLException {
deleteDb("tableEngine");
Connection conn = getConnection("tableEngine;OPTIMIZE_REUSE_RESULTS=0");
deleteDb("testBatchedJoin");
Connection conn = getConnection("testBatchedJoin;OPTIMIZE_REUSE_RESULTS=0;BATCH_JOINS=1");
Statement stat = conn.createStatement();
if (!config.networked) {
Session s = (Session) ((JdbcConnection) conn).getSession();
assertTrue(s.isJoinBatchEnabled());
}
setBatchingEnabled(stat, false);
setBatchingEnabled(stat, true);
TreeSetIndex.exec = Executors.newFixedThreadPool(8, new ThreadFactory() {
@Override
......@@ -399,6 +415,9 @@ public class TestTableEngines extends TestBase {
enableJoinReordering(false);
try {
doTestBatchedJoinSubQueryUnion(stat);
TreeSetIndex.lookupBatches.set(0);
doTestBatchedJoin(stat, 1, 0, 0);
doTestBatchedJoin(stat, 0, 1, 0);
doTestBatchedJoin(stat, 0, 0, 1);
......@@ -429,11 +448,13 @@ public class TestTableEngines extends TestBase {
doTestBatchedJoin(stat, 0, 0, 5);
doTestBatchedJoin(stat, 0, 8, 1);
doTestBatchedJoin(stat, 0, 2, 1);
assertTrue(TreeSetIndex.lookupBatches.get() > 0);
} finally {
enableJoinReordering(true);
TreeSetIndex.exec.shutdownNow();
}
deleteDb("tableEngine");
deleteDb("testBatchedJoin");
}
/**
......@@ -448,6 +469,63 @@ public class TestTableEngines extends TestBase {
OptimizerHints.set(hints);
}
private void checkPlan(Statement stat, String sql) throws SQLException {
ResultSet rs = stat.executeQuery("EXPLAIN " + sql);
assertTrue(rs.next());
String plan = rs.getString(1);
assertEquals(normalize(sql), normalize(plan));
}
private static String normalize(String sql) {
sql = sql.replace('\n', ' ');
return sql.replaceAll("\\s+", " ").trim();
}
private void doTestBatchedJoinSubQueryUnion(Statement stat) throws SQLException {
String engine = '"' + TreeSetIndexTableEngine.class.getName() + '"';
stat.execute("CREATE TABLE t (a int, b int) ENGINE " + engine);
TreeSetTable t = TreeSetIndexTableEngine.created;
stat.execute("CREATE INDEX T_IDX_A ON t(a)");
stat.execute("CREATE INDEX T_IDX_B ON t(b)");
setBatchSize(t, 3);
for (int i = 0; i < 20; i++) {
stat.execute("insert into t values (" + i + "," + i + ")");
}
stat.execute("CREATE TABLE u (a int, b int) ENGINE " + engine);
TreeSetTable u = TreeSetIndexTableEngine.created;
stat.execute("CREATE INDEX U_IDX_A ON u(a)");
stat.execute("CREATE INDEX U_IDX_B ON u(b)");
setBatchSize(u, 0);
for (int i = 0; i < 20; i++) {
stat.execute("insert into u values (" + i + "," + i + ")");
}
checkPlan(stat, "SELECT 1 FROM PUBLIC.T T1 /* PUBLIC.\"scan\" */ "
+ "INNER JOIN PUBLIC.T T2 /* batched:test PUBLIC.T_IDX_B: B = T1.A */ "
+ "ON 1=1 WHERE T1.A = T2.B");
checkPlan(stat, "SELECT 1 FROM PUBLIC.T T1 /* PUBLIC.\"scan\" */ "
+ "INNER JOIN PUBLIC.T T2 /* batched:test PUBLIC.T_IDX_B: B = T1.A */ "
+ "ON 1=1 /* WHERE T1.A = T2.B */ "
+ "INNER JOIN PUBLIC.T T3 /* batched:test PUBLIC.T_IDX_B: B = T2.A */ "
+ "ON 1=1 WHERE (T2.A = T3.B) AND (T1.A = T2.B)");
checkPlan(stat, "SELECT 1 FROM PUBLIC.T T1 /* PUBLIC.\"scan\" */ "
+ "INNER JOIN PUBLIC.U /* batched:fake PUBLIC.U_IDX_A: A = T1.A */ "
+ "ON 1=1 /* WHERE T1.A = U.A */ "
+ "INNER JOIN PUBLIC.T T2 /* batched:test PUBLIC.T_IDX_B: B = U.B */ "
+ "ON 1=1 WHERE (T1.A = U.A) AND (U.B = T2.B)");
checkPlan(stat, "SELECT 1 FROM ( SELECT A FROM PUBLIC.T ) Z "
+ "/* SELECT A FROM PUBLIC.T /++ PUBLIC.\"scan\" ++/ */ "
+ "INNER JOIN PUBLIC.T /* batched:test PUBLIC.T_IDX_B: B = Z.A */ "
+ "ON 1=1 WHERE Z.A = T.B");
checkPlan(stat, "SELECT 1 FROM PUBLIC.T /* PUBLIC.\"scan\" */ "
+ "INNER JOIN ( SELECT A FROM PUBLIC.T ) Z "
+ "/* batched:view SELECT A FROM PUBLIC.T /++ batched:test PUBLIC.T_IDX_A: A IS ?1 ++/ "
+ "WHERE A IS ?1: A = T.B */ ON 1=1 WHERE Z.A = T.B");
stat.execute("DROP TABLE T");
stat.execute("DROP TABLE U");
}
private void doTestBatchedJoin(Statement stat, int... batchSizes) throws SQLException {
ArrayList<TreeSetTable> tables = New.arrayList(batchSizes.length);
......@@ -501,6 +579,9 @@ public class TestTableEngines extends TestBase {
fail();
}
}
for (int i = 0; i < batchSizes.length; i++) {
stat.executeUpdate("DROP TABLE IF EXISTS T" + i);
}
}
private static void assert0(boolean condition, String message) {
......@@ -512,7 +593,15 @@ public class TestTableEngines extends TestBase {
private static void setBatchSize(ArrayList<TreeSetTable> tables, int... batchSizes) {
for (int i = 0; i < batchSizes.length; i++) {
int batchSize = batchSizes[i];
for (Index idx : tables.get(i).getIndexes()) {
setBatchSize(tables.get(i), batchSize);
}
}
private static void setBatchSize(TreeSetTable t, int batchSize) {
if (t.getIndexes() == null) {
t.scan.preferedBatchSize = batchSize;
} else {
for (Index idx : t.getIndexes()) {
((TreeSetIndex) idx).preferedBatchSize = batchSize;
}
}
......@@ -1115,6 +1204,8 @@ public class TestTableEngines extends TestBase {
* An index that internally uses a tree set.
*/
private static class TreeSetIndex extends BaseIndex implements Comparator<SearchRow> {
private static AtomicInteger lookupBatches = new AtomicInteger();
/**
* Executor service to test batched joins.
*/
......@@ -1145,9 +1236,18 @@ public class TestTableEngines extends TestBase {
public IndexLookupBatch createLookupBatch(final TableFilter filter) {
assert filter.getMasks() != null || "scan".equals(getName());
final int preferedSize = preferedBatchSize;
return preferedSize == 0 ? null : new IndexLookupBatch() {
if (preferedSize == 0) {
return null;
}
lookupBatches.incrementAndGet();
return new IndexLookupBatch() {
List<SearchRow> searchRows = New.arrayList();
@Override
public String getPlanSQL() {
return "test";
}
@Override public boolean isBatchFull() {
return searchRows.size() >= preferedSize * 2;
}
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论