提交 6b374192 authored 作者: Sergi Vladykin's avatar Sergi Vladykin

Merge pull request #211 from svladykin/batchview2

Batched index lookups for sub-query
......@@ -68,6 +68,11 @@ public abstract class Command implements CommandInterface {
@Override
public abstract boolean isQuery();
/**
* Prepare join batching.
*/
public abstract void prepareJoinBatch();
/**
* Get the list of parameters.
*
......
......@@ -7,6 +7,8 @@ package org.h2.command;
import java.util.ArrayList;
import org.h2.api.DatabaseEventListener;
import org.h2.command.dml.Explain;
import org.h2.command.dml.Query;
import org.h2.expression.Parameter;
import org.h2.expression.ParameterInterface;
import org.h2.result.ResultInterface;
......@@ -44,6 +46,23 @@ public class CommandContainer extends Command {
return prepared.isQuery();
}
@Override
public void prepareJoinBatch() {
if (session.isJoinBatchEnabled()) {
prepareJoinBatch(prepared);
}
}
private static void prepareJoinBatch(Prepared prepared) {
if (prepared.isQuery()) {
if (prepared.getType() == CommandInterface.SELECT) {
((Query) prepared).prepareJoinBatch();
} else if (prepared.getType() == CommandInterface.EXPLAIN) {
prepareJoinBatch(((Explain) prepared).getCommand());
}
}
}
private void recompileIfRequired() {
if (prepared.needRecompile()) {
// TODO test with 'always recompile'
......@@ -65,6 +84,7 @@ public class CommandContainer extends Command {
}
prepared.prepare();
prepared.setModificationMetaId(mod);
prepareJoinBatch();
}
}
......
......@@ -44,6 +44,11 @@ class CommandList extends Command {
return updateCount;
}
@Override
public void prepareJoinBatch() {
command.prepareJoinBatch();
}
@Override
public ResultInterface query(int maxrows) {
ResultInterface result = command.query(maxrows);
......
......@@ -294,6 +294,9 @@ public class AlterTableAddConstraint extends SchemaCommand {
}
private static Index getUniqueIndex(Table t, IndexColumn[] cols) {
if (t.getIndexes() == null) {
return null;
}
for (Index idx : t.getIndexes()) {
if (canUseUniqueIndex(idx, t, cols)) {
return idx;
......@@ -303,6 +306,9 @@ public class AlterTableAddConstraint extends SchemaCommand {
}
private static Index getIndex(Table t, IndexColumn[] cols, boolean moreColumnOk) {
if (t.getIndexes() == null) {
return null;
}
for (Index idx : t.getIndexes()) {
if (canUseIndex(idx, t, cols, moreColumnOk)) {
return idx;
......
......@@ -40,6 +40,10 @@ public class Explain extends Prepared {
this.command = command;
}
public Prepared getCommand() {
return command;
}
@Override
public void prepare() {
command.prepare();
......
......@@ -24,11 +24,6 @@ public class NoOperation extends Prepared {
return 0;
}
@Override
public boolean isQuery() {
return false;
}
@Override
public boolean isTransactional() {
return true;
......
......@@ -71,6 +71,18 @@ public abstract class Query extends Prepared {
super(session);
}
/**
* Check if this is a UNION query.
*
* @return {@code true} if this is a UNION query
*/
public abstract boolean isUnion();
/**
* Prepare join batching.
*/
public abstract void prepareJoinBatch();
/**
* Execute the query without checking the cache. If a target is specified,
* the results are written to it, and the method returns null. If no target
......
......@@ -9,7 +9,6 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import org.h2.api.ErrorCode;
import org.h2.api.Trigger;
import org.h2.command.CommandInterface;
......@@ -36,6 +35,7 @@ import org.h2.result.SortOrder;
import org.h2.table.Column;
import org.h2.table.ColumnResolver;
import org.h2.table.IndexColumn;
import org.h2.table.JoinBatch;
import org.h2.table.Table;
import org.h2.table.TableFilter;
import org.h2.util.New;
......@@ -87,6 +87,11 @@ public class Select extends Query {
super(session);
}
@Override
public boolean isUnion() {
return false;
}
/**
* Add a table to the query.
*
......@@ -945,6 +950,30 @@ public class Select extends Query {
isPrepared = true;
}
@Override
public void prepareJoinBatch() {
ArrayList<TableFilter> list = New.arrayList();
TableFilter f = getTopTableFilter();
do {
if (f.getNestedJoin() != null) {
// we do not support batching with nested joins
return;
}
list.add(f);
f = f.getJoin();
} while (f != null);
TableFilter[] fs = list.toArray(new TableFilter[list.size()]);
// prepare join batch
JoinBatch jb = null;
for (int i = fs.length - 1; i >= 0; i--) {
jb = fs[i].prepareJoinBatch(jb, fs, i);
}
}
public JoinBatch getJoinBatch() {
return getTopTableFilter().getJoinBatch();
}
@Override
public double getCost() {
return cost;
......
......@@ -7,7 +7,6 @@ package org.h2.command.dml;
import java.util.ArrayList;
import java.util.HashSet;
import org.h2.api.ErrorCode;
import org.h2.command.CommandInterface;
import org.h2.engine.Session;
......@@ -72,6 +71,17 @@ public class SelectUnion extends Query {
this.left = query;
}
@Override
public boolean isUnion() {
return true;
}
@Override
public void prepareJoinBatch() {
left.prepareJoinBatch();
right.prepareJoinBatch();
}
public void setUnionType(int type) {
this.unionType = type;
}
......
......@@ -497,6 +497,15 @@ public class Set extends Prepared {
database.setRowFactory(rowFactory);
break;
}
case SetTypes.BATCH_JOINS: {
int value = getIntValue();
if (value != 0 && value != 1) {
throw DbException.getInvalidValueException("BATCH_JOINS",
getIntValue());
}
session.setJoinBatchEnabled(value == 1);
break;
}
default:
DbException.throwInternalError("type="+type);
}
......
......@@ -228,6 +228,11 @@ public class SetTypes {
*/
public static final int ROW_FACTORY = 43;
/**
* The type of SET BATCH_JOINS statement.
*/
public static final int BATCH_JOINS = 44;
private static final ArrayList<String> TYPES = New.arrayList();
private SetTypes() {
......@@ -280,6 +285,7 @@ public class SetTypes {
list.add(QUERY_STATISTICS, "QUERY_STATISTICS");
list.add(QUERY_STATISTICS_MAX_ENTRIES, "QUERY_STATISTICS_MAX_ENTRIES");
list.add(ROW_FACTORY, "ROW_FACTORY");
list.add(BATCH_JOINS, "BATCH_JOINS");
}
/**
......
......@@ -30,12 +30,14 @@ import org.h2.mvstore.db.TransactionStore.Change;
import org.h2.mvstore.db.TransactionStore.Transaction;
import org.h2.result.LocalResult;
import org.h2.result.Row;
import org.h2.result.SortOrder;
import org.h2.schema.Schema;
import org.h2.store.DataHandler;
import org.h2.store.InDoubtTransaction;
import org.h2.store.LobStorageFrontend;
import org.h2.table.SubQueryInfo;
import org.h2.table.Table;
import org.h2.table.TableFilter;
import org.h2.util.New;
import org.h2.util.SmallLRUCache;
import org.h2.value.Value;
......@@ -115,6 +117,7 @@ public class Session extends SessionWithState {
private int parsingView;
private volatile SmallLRUCache<Object, ViewIndex> viewIndexCache;
private HashMap<Object, ViewIndex> subQueryIndexCache;
private boolean joinBatchEnabled;
/**
* Temporary LOBs from result sets. Those are kept for some time. The
......@@ -148,12 +151,25 @@ public class Session extends SessionWithState {
this.currentSchemaName = Constants.SCHEMA_MAIN;
}
public void setJoinBatchEnabled(boolean joinBatchEnabled) {
this.joinBatchEnabled = joinBatchEnabled;
}
public boolean isJoinBatchEnabled() {
return joinBatchEnabled;
}
public Row createRow(Value[] data, int memory) {
return database.createRow(data, memory);
}
public void setSubQueryInfo(SubQueryInfo subQueryInfo) {
this.subQueryInfo = subQueryInfo;
public void pushSubQueryInfo(int[] masks, TableFilter[] filters, int filter,
SortOrder sortOrder) {
subQueryInfo = new SubQueryInfo(subQueryInfo, masks, filters, filter, sortOrder);
}
public void popSubQueryInfo() {
subQueryInfo = subQueryInfo.getUpper();
}
public SubQueryInfo getSubQueryInfo() {
......@@ -492,6 +508,7 @@ public class Session extends SessionWithState {
// we can't reuse sub-query indexes, so just drop the whole cache
subQueryIndexCache = null;
}
command.prepareJoinBatch();
if (queryCache != null) {
if (command.isCacheable()) {
queryCache.put(sql, command);
......
......@@ -15,7 +15,8 @@ import org.h2.result.SearchRow;
* method {@link #isBatchFull()}} will return {@code true} or there are no more
* search rows to add. Then method {@link #find()} will be called to execute batched lookup.
* Note that a single instance of {@link IndexLookupBatch} can be reused for multiple
* sequential batched lookups.
* sequential batched lookups, moreover it can be reused for multiple queries for
* the same prepared statement.
*
* @see Index#createLookupBatch(org.h2.table.TableFilter)
* @author Sergi Vladykin
......@@ -26,9 +27,11 @@ public interface IndexLookupBatch {
*
* @param first the first row, or null for no limit
* @param last the last row, or null for no limit
* @return {@code false} if this search row pair is known to produce no results
* and thus the given row pair was not added
* @see Index#find(TableFilter, SearchRow, SearchRow)
*/
void addSearchRows(SearchRow first, SearchRow last);
boolean addSearchRows(SearchRow first, SearchRow last);
/**
* Check if this batch is full.
......@@ -47,4 +50,16 @@ public interface IndexLookupBatch {
* @return List of future cursors for collected search rows.
*/
List<Future<Cursor>> find();
/**
* Get plan for EXPLAIN.
*
* @return plan
*/
String getPlanSQL();
/**
* Reset this batch to clear state. This method will be called before each query execution.
*/
void reset();
}
......@@ -6,8 +6,6 @@
package org.h2.index;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Future;
import org.h2.api.ErrorCode;
import org.h2.engine.Database;
import org.h2.engine.DbObject;
......
......@@ -24,7 +24,7 @@ public class ViewCursor implements Cursor {
private final SearchRow first, last;
private Row current;
ViewCursor(ViewIndex index, LocalResult result, SearchRow first,
public ViewCursor(ViewIndex index, LocalResult result, SearchRow first,
SearchRow last) {
this.table = index.getTable();
this.index = index;
......
......@@ -21,7 +21,7 @@ import org.h2.result.SearchRow;
import org.h2.result.SortOrder;
import org.h2.table.Column;
import org.h2.table.IndexColumn;
import org.h2.table.SubQueryInfo;
import org.h2.table.JoinBatch;
import org.h2.table.TableFilter;
import org.h2.table.TableView;
import org.h2.util.IntArray;
......@@ -93,6 +93,15 @@ public class ViewIndex extends BaseIndex implements SpatialIndex {
}
}
@Override
public IndexLookupBatch createLookupBatch(TableFilter filter) {
if (recursive) {
// we do not support batching for recursive queries
return null;
}
return JoinBatch.createViewIndexLookupBatch(this);
}
public Session getSession() {
return createSession;
}
......@@ -199,21 +208,18 @@ public class ViewIndex extends BaseIndex implements SpatialIndex {
TableFilter[] filters, int filter, SortOrder sortOrder, boolean preliminary) {
assert filters != null;
Prepared p;
SubQueryInfo upper = session.getSubQueryInfo();
SubQueryInfo info = new SubQueryInfo(upper,
masks, filters, filter, sortOrder, preliminary);
session.setSubQueryInfo(info);
session.pushSubQueryInfo(masks, filters, filter, sortOrder);
try {
p = session.prepare(sql, true);
} finally {
session.setSubQueryInfo(upper);
session.popSubQueryInfo();
}
return (Query) p;
}
private Cursor find(Session session, SearchRow first, SearchRow last,
private Cursor findRecursive(Session session, SearchRow first, SearchRow last,
SearchRow intersection) {
if (recursive) {
assert recursive;
LocalResult recResult = view.getRecursiveResult();
if (recResult != null) {
recResult.reset();
......@@ -222,7 +228,7 @@ public class ViewIndex extends BaseIndex implements SpatialIndex {
if (query == null) {
query = (Query) createSession.prepare(querySQL, true);
}
if (!(query instanceof SelectUnion)) {
if (!query.isUnion()) {
throw DbException.get(ErrorCode.SYNTAX_ERROR_2,
"recursive queries without UNION ALL");
}
......@@ -262,6 +268,9 @@ public class ViewIndex extends BaseIndex implements SpatialIndex {
result.done();
return new ViewCursor(this, result, first, last);
}
public void setupQueryParameters(Session session, SearchRow first, SearchRow last,
SearchRow intersection) {
ArrayList<Parameter> paramList = query.getParameters();
if (originalParameters != null) {
for (int i = 0, size = originalParameters.size(); i < size; i++) {
......@@ -298,6 +307,14 @@ public class ViewIndex extends BaseIndex implements SpatialIndex {
setParameter(paramList, idx++, intersection.getValue(i));
}
}
}
private Cursor find(Session session, SearchRow first, SearchRow last,
SearchRow intersection) {
if (recursive) {
return findRecursive(session, first, last, intersection);
}
setupQueryParameters(session, first, last, intersection);
LocalResult result = query.query(0);
return new ViewCursor(this, result, first, last);
}
......@@ -313,6 +330,10 @@ public class ViewIndex extends BaseIndex implements SpatialIndex {
param.setValue(v);
}
public Query getQuery() {
return query;
}
private Query getQuery(Session session, int[] masks,
TableFilter[] filters, int filter, SortOrder sortOrder) {
Query q = prepareSubQuery(querySQL, session, masks, filters, filter, sortOrder, true);
......@@ -454,5 +475,4 @@ public class ViewIndex extends BaseIndex implements SpatialIndex {
public boolean isRecursive() {
return recursive;
}
}
/*
* Copyright 2004-2014 H2 Group. Multiple-Licensed under the MPL 2.0,
* and the EPL 1.0 (http://h2database.com/html/license.html).
* Initial Developer: H2 Group
*/
package org.h2.table;
import java.util.AbstractList;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.Future;
import org.h2.command.dml.Query;
import org.h2.command.dml.Select;
import org.h2.command.dml.SelectUnion;
import org.h2.index.Cursor;
import org.h2.index.IndexCursor;
import org.h2.index.IndexLookupBatch;
import org.h2.index.ViewCursor;
import org.h2.index.ViewIndex;
import org.h2.message.DbException;
import org.h2.result.LocalResult;
import org.h2.result.Row;
import org.h2.result.SearchRow;
import org.h2.util.DoneFuture;
import org.h2.util.LazyFuture;
import org.h2.util.New;
import org.h2.value.Value;
import org.h2.value.ValueLong;
/**
* Support for asynchronous batched index lookups on joins.
*
* @see org.h2.index.Index#createLookupBatch(TableFilter)
* @see IndexLookupBatch
* @author Sergi Vladykin
*/
public final class JoinBatch {
private static final Cursor EMPTY_CURSOR = new Cursor() {
@Override
public boolean previous() {
return false;
}
@Override
public boolean next() {
return false;
}
@Override
public SearchRow getSearchRow() {
return null;
}
@Override
public Row get() {
return null;
}
@Override
public String toString() {
return "EMPTY_CURSOR";
}
};
private static final Future<Cursor> EMPTY_FUTURE_CURSOR = new DoneFuture<Cursor>(EMPTY_CURSOR);
private boolean batchedSubQuery;
private Future<Cursor> viewTopFutureCursor;
private JoinFilter[] filters;
private JoinFilter top;
private boolean started;
private JoinRow current;
private boolean found;
/**
* This filter joined after this batched join and can be used normally.
*/
private final TableFilter additionalFilter;
/**
* @param filtersCount number of filters participating in this batched join
* @param additionalFilter table filter after this batched join.
*/
public JoinBatch(int filtersCount, TableFilter additionalFilter) {
if (filtersCount > 32) {
// This is because we store state in a 64 bit field, 2 bits per joined table.
throw DbException.getUnsupportedException("Too many tables in join (at most 32 supported).");
}
filters = new JoinFilter[filtersCount];
this.additionalFilter = additionalFilter;
}
/**
* Get the lookup batch for the given table filter.
*
* @param joinFilterId joined table filter id
* @return lookup batch
*/
public IndexLookupBatch getLookupBatch(int joinFilterId) {
return filters[joinFilterId].lookupBatch;
}
/**
* Reset state of this batch.
*/
public void reset() {
current = null;
started = false;
found = false;
for (JoinFilter jf : filters) {
jf.reset();
}
if (additionalFilter != null) {
additionalFilter.reset();
}
}
/**
* @param filter table filter
* @param lookupBatch lookup batch
*/
public void register(TableFilter filter, IndexLookupBatch lookupBatch) {
assert filter != null;
top = new JoinFilter(lookupBatch, filter, top);
filters[top.id] = top;
}
/**
* @param filterId table filter id
* @param column column
* @return column value for current row
*/
public Value getValue(int filterId, Column column) {
Object x = current.row(filterId);
assert x != null;
Row row = current.isRow(filterId) ? (Row) x : ((Cursor) x).get();
int columnId = column.getColumnId();
if (columnId == -1) {
return ValueLong.get(row.getKey());
}
Value value = row.getValue(column.getColumnId());
if (value == null) {
throw DbException.throwInternalError("value is null: " + column + " " + row);
}
return value;
}
private void start() {
// initialize current row
current = new JoinRow(new Object[filters.length]);
// initialize top cursor
Cursor cursor;
if (batchedSubQuery) {
assert viewTopFutureCursor != null;
cursor = get(viewTopFutureCursor);
} else {
// setup usual index cursor
TableFilter f = top.filter;
IndexCursor indexCursor = f.getIndexCursor();
indexCursor.find(f.getSession(), f.getIndexConditions());
cursor = indexCursor;
}
current.updateRow(top.id, cursor, JoinRow.S_NULL, JoinRow.S_CURSOR);
// we need fake first row because batchedNext always will move to the next row
JoinRow fake = new JoinRow(null);
fake.next = current;
current = fake;
}
/**
* Get next row from the join batch.
*
* @return
*/
public boolean next() {
if (!started) {
start();
started = true;
}
if (additionalFilter == null) {
if (batchedNext()) {
assert current.isComplete();
return true;
}
return false;
}
for (;;) {
if (!found) {
if (!batchedNext()) {
return false;
}
assert current.isComplete();
found = true;
additionalFilter.reset();
}
// we call furtherFilter in usual way outside of this batch because it is more effective
if (additionalFilter.next()) {
return true;
}
found = false;
}
}
private static Cursor get(Future<Cursor> f) {
Cursor c;
try {
c = f.get();
} catch (Exception e) {
throw DbException.convert(e);
}
return c == null ? EMPTY_CURSOR : c;
}
private boolean batchedNext() {
if (current == null) {
// after last
return false;
}
// go next
current = current.next;
if (current == null) {
return false;
}
current.prev = null;
final int lastJfId = filters.length - 1;
int jfId = lastJfId;
while (current.row(jfId) == null) {
// lookup for the first non fetched filter for the current row
jfId--;
}
for (;;) {
fetchCurrent(jfId);
if (!current.isDropped()) {
// if current was not dropped then it must be fetched successfully
if (jfId == lastJfId) {
// the whole join row is ready to be returned
return true;
}
JoinFilter join = filters[jfId + 1];
if (join.isBatchFull()) {
// get future cursors for join and go right to fetch them
current = join.find(current);
}
if (current.row(join.id) != null) {
// either find called or outer join with null-row
jfId = join.id;
continue;
}
}
// we have to go down and fetch next cursors for jfId if it is possible
if (current.next == null) {
// either dropped or null-row
if (current.isDropped()) {
current = current.prev;
if (current == null) {
return false;
}
}
assert !current.isDropped();
assert jfId != lastJfId;
jfId = 0;
while (current.row(jfId) != null) {
jfId++;
}
// force find on half filled batch (there must be either searchRows
// or Cursor.EMPTY set for null-rows)
current = filters[jfId].find(current);
} else {
// here we don't care if the current was dropped
current = current.next;
assert !current.isRow(jfId);
while (current.row(jfId) == null) {
assert jfId != top.id;
// need to go left and fetch more search rows
jfId--;
assert !current.isRow(jfId);
}
}
}
}
@SuppressWarnings("unchecked")
private void fetchCurrent(final int jfId) {
assert current.prev == null || current.prev.isRow(jfId) : "prev must be already fetched";
assert jfId == 0 || current.isRow(jfId - 1) : "left must be already fetched";
assert !current.isRow(jfId) : "double fetching";
Object x = current.row(jfId);
assert x != null : "x null";
// in case of outer join we don't have any future around empty cursor
boolean newCursor = x == EMPTY_CURSOR;
if (newCursor) {
if (jfId == 0) {
// the top cursor is new and empty, then the whole select will not produce any rows
current.drop();
return;
}
} else if (current.isFuture(jfId)) {
// get cursor from a future
x = get((Future<Cursor>) x);
current.updateRow(jfId, x, JoinRow.S_FUTURE, JoinRow.S_CURSOR);
newCursor = true;
}
final JoinFilter jf = filters[jfId];
Cursor c = (Cursor) x;
assert c != null;
JoinFilter join = jf.join;
for (;;) {
if (c == null || !c.next()) {
if (newCursor && jf.isOuterJoin()) {
// replace cursor with null-row
current.updateRow(jfId, jf.getNullRow(), JoinRow.S_CURSOR, JoinRow.S_ROW);
c = null;
newCursor = false;
} else {
// cursor is done, drop it
current.drop();
return;
}
}
if (!jf.isOk(c == null)) {
// try another row from the cursor
continue;
}
boolean joinEmpty = false;
if (join != null && !join.collectSearchRows()) {
if (join.isOuterJoin()) {
joinEmpty = true;
} else {
// join will fail, try next row in the cursor
continue;
}
}
if (c != null) {
current = current.copyBehind(jfId);
// update jf, set current row from cursor
current.updateRow(jfId, c.get(), JoinRow.S_CURSOR, JoinRow.S_ROW);
}
if (joinEmpty) {
// update jf.join, set an empty cursor
current.updateRow(join.id, EMPTY_CURSOR, JoinRow.S_NULL, JoinRow.S_CURSOR);
}
return;
}
}
/**
* @return Adapter to allow joining to this batch in sub-queries and views.
*/
private IndexLookupBatch viewIndexLookupBatch(ViewIndex viewIndex) {
return new ViewIndexLookupBatch(viewIndex);
}
/**
* Create index lookup batch for a view index.
*
* @param viewIndex view index
* @return index lookup batch or {@code null} if batching is not supported for this query
*/
public static IndexLookupBatch createViewIndexLookupBatch(ViewIndex viewIndex) {
Query query = viewIndex.getQuery();
if (query.isUnion()) {
ViewIndexLookupBatchUnion unionBatch = new ViewIndexLookupBatchUnion(viewIndex);
return unionBatch.initialize() ? unionBatch : null;
}
JoinBatch jb = ((Select) query).getJoinBatch();
if (jb == null || jb.getLookupBatch(0) == null) {
// our sub-query is not batched or is top batched sub-query
return null;
}
assert !jb.batchedSubQuery;
jb.batchedSubQuery = true;
return jb.viewIndexLookupBatch(viewIndex);
}
/**
* Create fake index lookup batch for non-batched table filter.
*
* @param filter the table filter
* @return fake index lookup batch
*/
public static IndexLookupBatch createFakeIndexLookupBatch(TableFilter filter) {
return new FakeLookupBatch(filter);
}
@Override
public String toString() {
return "JoinBatch->\nprev->" + (current == null ? null : current.prev) +
"\ncurr->" + current +
"\nnext->" + (current == null ? null : current.next);
}
/**
* Table filter participating in batched join.
*/
private static final class JoinFilter {
private final TableFilter filter;
private final JoinFilter join;
private final int id;
private final IndexLookupBatch lookupBatch;
private JoinFilter(IndexLookupBatch lookupBatch, TableFilter filter, JoinFilter join) {
this.filter = filter;
this.id = filter.getJoinFilterId();
this.join = join;
this.lookupBatch = lookupBatch;
assert lookupBatch != null || id == 0;
}
private void reset() {
if (lookupBatch != null) {
lookupBatch.reset();
}
}
private Row getNullRow() {
return filter.getTable().getNullRow();
}
private boolean isOuterJoin() {
return filter.isJoinOuter();
}
private boolean isBatchFull() {
return lookupBatch.isBatchFull();
}
private boolean isOk(boolean ignoreJoinCondition) {
boolean filterOk = filter.isOk(filter.getFilterCondition());
boolean joinOk = filter.isOk(filter.getJoinCondition());
return filterOk && (ignoreJoinCondition || joinOk);
}
private boolean collectSearchRows() {
assert !isBatchFull();
IndexCursor c = filter.getIndexCursor();
c.prepare(filter.getSession(), filter.getIndexConditions());
if (c.isAlwaysFalse()) {
return false;
}
return lookupBatch.addSearchRows(c.getStart(), c.getEnd());
}
private List<Future<Cursor>> find() {
return lookupBatch.find();
}
private JoinRow find(JoinRow current) {
assert current != null;
// lookupBatch is allowed to be empty when we have some null-rows and forced find call
List<Future<Cursor>> result = lookupBatch.find();
// go backwards and assign futures
for (int i = result.size(); i > 0;) {
assert current.isRow(id - 1);
if (current.row(id) == EMPTY_CURSOR) {
// outer join support - skip row with existing empty cursor
current = current.prev;
continue;
}
assert current.row(id) == null;
Future<Cursor> future = result.get(--i);
if (future == null) {
current.updateRow(id, EMPTY_CURSOR, JoinRow.S_NULL, JoinRow.S_CURSOR);
} else {
current.updateRow(id, future, JoinRow.S_NULL, JoinRow.S_FUTURE);
}
if (current.prev == null || i == 0) {
break;
}
current = current.prev;
}
// handle empty cursors (because of outer joins) at the beginning
while (current.prev != null && current.prev.row(id) == EMPTY_CURSOR) {
current = current.prev;
}
assert current.prev == null || current.prev.isRow(id);
assert current.row(id) != null;
assert !current.isRow(id);
// the last updated row
return current;
}
@Override
public String toString() {
return "JoinFilter->" + filter;
}
}
/**
* Linked row in batched join.
*/
private static final class JoinRow {
private static final long S_NULL = 0;
private static final long S_FUTURE = 1;
private static final long S_CURSOR = 2;
private static final long S_ROW = 3;
private static final long S_MASK = 3;
/**
* May contain one of the following:
* <br/>- {@code null}: means that we need to get future cursor for this row
* <br/>- {@link Future}: means that we need to get a new {@link Cursor} from the {@link Future}
* <br/>- {@link Cursor}: means that we need to fetch {@link Row}s from the {@link Cursor}
* <br/>- {@link Row}: the {@link Row} is already fetched and is ready to be used
*/
private Object[] row;
private long state;
private JoinRow prev;
private JoinRow next;
/**
* @param row Row.
*/
private JoinRow(Object[] row) {
this.row = row;
}
/**
* @param joinFilterId Join filter id.
* @return Row state.
*/
private long getState(int joinFilterId) {
return (state >>> (joinFilterId << 1)) & S_MASK;
}
/**
* Allows to do a state transition in the following order:
* 0. Slot contains {@code null} ({@link #S_NULL}).
* 1. Slot contains {@link Future} ({@link #S_FUTURE}).
* 2. Slot contains {@link Cursor} ({@link #S_CURSOR}).
* 3. Slot contains {@link Row} ({@link #S_ROW}).
*
* @param joinFilterId {@link JoinRow} filter id.
* @param i Increment by this number of moves.
*/
private void incrementState(int joinFilterId, long i) {
assert i > 0 : i;
state += i << (joinFilterId << 1);
}
private void updateRow(int joinFilterId, Object x, long oldState, long newState) {
assert getState(joinFilterId) == oldState : "old state: " + getState(joinFilterId);
row[joinFilterId] = x;
incrementState(joinFilterId, newState - oldState);
assert getState(joinFilterId) == newState : "new state: " + getState(joinFilterId);
}
private Object row(int joinFilterId) {
return row[joinFilterId];
}
private boolean isRow(int joinFilterId) {
return getState(joinFilterId) == S_ROW;
}
private boolean isFuture(int joinFilterId) {
return getState(joinFilterId) == S_FUTURE;
}
private boolean isCursor(int joinFilterId) {
return getState(joinFilterId) == S_CURSOR;
}
private boolean isComplete() {
return isRow(row.length - 1);
}
private boolean isDropped() {
return row == null;
}
private void drop() {
if (prev != null) {
prev.next = next;
}
if (next != null) {
next.prev = prev;
}
row = null;
}
/**
* Copy this JoinRow behind itself in linked list of all in progress rows.
*
* @param jfId The last fetched filter id.
* @return The copy.
*/
private JoinRow copyBehind(int jfId) {
assert isCursor(jfId);
assert jfId + 1 == row.length || row[jfId + 1] == null;
Object[] r = new Object[row.length];
if (jfId != 0) {
System.arraycopy(row, 0, r, 0, jfId);
}
JoinRow copy = new JoinRow(r);
copy.state = state;
if (prev != null) {
copy.prev = prev;
prev.next = copy;
}
prev = copy;
copy.next = this;
return copy;
}
@Override
public String toString() {
return "JoinRow->" + Arrays.toString(row);
}
}
/**
* Fake Lookup batch for indexes which do not support batching but have to participate
* in batched joins.
*/
private static final class FakeLookupBatch implements IndexLookupBatch {
private final TableFilter filter;
private SearchRow first;
private SearchRow last;
private boolean full;
private final List<Future<Cursor>> result = new SingletonList<Future<Cursor>>();
private FakeLookupBatch(TableFilter filter) {
this.filter = filter;
}
@Override
public String getPlanSQL() {
return "fake";
}
@Override
public void reset() {
full = false;
first = last = null;
result.set(0, null);
}
@Override
public boolean addSearchRows(SearchRow first, SearchRow last) {
assert !full;
this.first = first;
this.last = last;
full = true;
return true;
}
@Override
public boolean isBatchFull() {
return full;
}
@Override
public List<Future<Cursor>> find() {
if (!full) {
return Collections.emptyList();
}
Cursor c = filter.getIndex().find(filter, first, last);
result.set(0, new DoneFuture<Cursor>(c));
full = false;
first = last = null;
return result;
}
}
/**
* Simple singleton list.
*/
private static final class SingletonList<E> extends AbstractList<E> {
private E element;
@Override
public E get(int index) {
assert index == 0;
return element;
}
@Override
public E set(int index, E element) {
assert index == 0;
this.element = element;
return null;
}
@Override
public int size() {
return 1;
}
}
/**
* Base class for SELECT and SELECT UNION view index lookup batches.
*/
private abstract static class ViewIndexLookupBatchBase<R extends QueryRunnerBase>
implements IndexLookupBatch {
protected final ViewIndex viewIndex;
private final ArrayList<Future<Cursor>> result = New.arrayList();
private int resultSize;
private boolean findCalled;
protected ViewIndexLookupBatchBase(ViewIndex viewIndex) {
this.viewIndex = viewIndex;
}
@Override
public String getPlanSQL() {
return "view";
}
protected abstract boolean collectSearchRows(R r);
protected abstract R newQueryRunner();
protected abstract void startQueryRunners(int resultSize);
protected final boolean resetAfterFind() {
if (!findCalled) {
return false;
}
findCalled = false;
// method find was called, we need to reset futures to initial state for reuse
for (int i = 0; i < resultSize; i++) {
queryRunner(i).reset();
}
resultSize = 0;
return true;
}
@SuppressWarnings("unchecked")
protected R queryRunner(int i) {
return (R) result.get(i);
}
@Override
public final boolean addSearchRows(SearchRow first, SearchRow last) {
resetAfterFind();
viewIndex.setupQueryParameters(viewIndex.getSession(), first, last, null);
R r;
if (resultSize < result.size()) {
// get reused runner
r = queryRunner(resultSize);
} else {
// create new runner
result.add(r = newQueryRunner());
}
r.first = first;
r.last = last;
if (!collectSearchRows(r)) {
r.clear();
return false;
}
resultSize++;
return true;
}
@Override
public void reset() {
if (resultSize != 0 && !resetAfterFind()) {
// find was not called, need to just clear runners
for (int i = 0; i < resultSize; i++) {
((QueryRunnerBase) result.get(i)).clear();
}
resultSize = 0;
}
}
@Override
public final List<Future<Cursor>> find() {
if (resultSize == 0) {
return Collections.emptyList();
}
findCalled = true;
startQueryRunners(resultSize);
return resultSize == result.size() ? result : result.subList(0, resultSize);
}
}
/**
* Lazy query runner base.
*/
private abstract static class QueryRunnerBase extends LazyFuture<Cursor> {
protected final ViewIndex viewIndex;
protected SearchRow first;
protected SearchRow last;
public QueryRunnerBase(ViewIndex viewIndex) {
this.viewIndex = viewIndex;
}
protected void clear() {
first = last = null;
}
@Override
public final boolean reset() {
if (super.reset()) {
return true;
}
// this query runner was never executed, need to clear manually
clear();
return false;
}
protected final ViewCursor newCursor(LocalResult localResult) {
ViewCursor cursor = new ViewCursor(viewIndex, localResult, first, last);
clear();
return cursor;
}
}
/**
* View index lookup batch for a simple SELECT.
*/
private final class ViewIndexLookupBatch extends ViewIndexLookupBatchBase<QueryRunner> {
private ViewIndexLookupBatch(ViewIndex viewIndex) {
super(viewIndex);
}
@Override
protected QueryRunner newQueryRunner() {
return new QueryRunner(viewIndex);
}
@Override
protected boolean collectSearchRows(QueryRunner r) {
return top.collectSearchRows();
}
@Override
public boolean isBatchFull() {
return top.isBatchFull();
}
@Override
protected void startQueryRunners(int resultSize) {
// we do batched find only for top table filter and then lazily run the ViewIndex query
// for each received top future cursor
List<Future<Cursor>> topFutureCursors = top.find();
if (topFutureCursors.size() != resultSize) {
throw DbException.throwInternalError("Unexpected result size: " + topFutureCursors.size() +
", expected :" + resultSize);
}
for (int i = 0; i < resultSize; i++) {
QueryRunner r = queryRunner(i);
r.topFutureCursor = topFutureCursors.get(i);
}
}
}
/**
* Query runner.
*/
private final class QueryRunner extends QueryRunnerBase {
private Future<Cursor> topFutureCursor;
public QueryRunner(ViewIndex viewIndex) {
super(viewIndex);
}
protected void clear() {
super.clear();
topFutureCursor = null;
}
@Override
protected Cursor run() throws Exception {
if (topFutureCursor == null) {
// if the top cursor is empty then the whole query will produce empty result
return EMPTY_CURSOR;
}
viewIndex.setupQueryParameters(viewIndex.getSession(), first, last, null);
JoinBatch.this.viewTopFutureCursor = topFutureCursor;
LocalResult localResult;
try {
localResult = viewIndex.getQuery().query(0);
} finally {
JoinBatch.this.viewTopFutureCursor = null;
}
return newCursor(localResult);
}
}
/**
* View index lookup batch for UNION queries.
*/
private static final class ViewIndexLookupBatchUnion
extends ViewIndexLookupBatchBase<QueryRunnerUnion> {
private ArrayList<JoinFilter> filters;
private ArrayList<JoinBatch> joinBatches;
private boolean onlyBatchedQueries = true;
protected ViewIndexLookupBatchUnion(ViewIndex viewIndex) {
super(viewIndex);
}
private boolean initialize() {
return collectJoinBatches(viewIndex.getQuery()) && joinBatches != null;
}
private boolean collectJoinBatches(Query query) {
if (query.isUnion()) {
SelectUnion union = (SelectUnion) query;
return collectJoinBatches(union.getLeft()) &&
collectJoinBatches(union.getRight());
}
Select select = (Select) query;
JoinBatch jb = select.getJoinBatch();
if (jb == null) {
onlyBatchedQueries = false;
} else {
if (jb.getLookupBatch(0) == null) {
// we are top sub-query
return false;
}
assert !jb.batchedSubQuery;
jb.batchedSubQuery = true;
if (joinBatches == null) {
joinBatches = New.arrayList();
filters = New.arrayList();
}
filters.add(jb.filters[0]);
joinBatches.add(jb);
}
return true;
}
@Override
public boolean isBatchFull() {
// if at least one is full
for (int i = 0; i < filters.size(); i++) {
if (filters.get(i).isBatchFull()) {
return true;
}
}
return false;
}
@Override
protected boolean collectSearchRows(QueryRunnerUnion r) {
boolean collected = false;
for (int i = 0; i < filters.size(); i++) {
if (filters.get(i).collectSearchRows()) {
collected = true;
} else {
r.topFutureCursors[i] = EMPTY_FUTURE_CURSOR;
}
}
return collected || !onlyBatchedQueries;
}
@Override
protected QueryRunnerUnion newQueryRunner() {
return new QueryRunnerUnion(this);
}
@Override
protected void startQueryRunners(int resultSize) {
for (int f = 0; f < filters.size(); f++) {
List<Future<Cursor>> topFutureCursors = filters.get(f).find();
int r = 0, c = 0;
for (; r < resultSize; r++) {
Future<Cursor>[] cs = queryRunner(r).topFutureCursors;
if (cs[f] == null) {
cs[f] = topFutureCursors.get(c++);
}
}
assert r == resultSize;
assert c == topFutureCursors.size();
}
}
}
/**
* Query runner for UNION.
*/
private static class QueryRunnerUnion extends QueryRunnerBase {
private ViewIndexLookupBatchUnion batchUnion;
private Future<Cursor>[] topFutureCursors;
@SuppressWarnings("unchecked")
public QueryRunnerUnion(ViewIndexLookupBatchUnion batchUnion) {
super(batchUnion.viewIndex);
this.batchUnion = batchUnion;
topFutureCursors = new Future[batchUnion.filters.size()];
}
@Override
protected void clear() {
super.clear();
for (int i = 0; i < topFutureCursors.length; i++) {
topFutureCursors[i] = null;
}
}
@Override
protected Cursor run() throws Exception {
viewIndex.setupQueryParameters(viewIndex.getSession(), first, last, null);
ArrayList<JoinBatch> joinBatches = batchUnion.joinBatches;
for (int i = 0, size = joinBatches.size(); i < size; i++) {
assert topFutureCursors[i] != null;
joinBatches.get(i).viewTopFutureCursor = topFutureCursors[i];
}
LocalResult localResult;
try {
localResult = viewIndex.getQuery().query(0);
} finally {
for (int i = 0, size = joinBatches.size(); i < size; i++) {
joinBatches.get(i).viewTopFutureCursor = null;
}
}
return newCursor(localResult);
}
}
}
......@@ -19,7 +19,6 @@ public class SubQueryInfo {
private TableFilter[] filters;
private int filter;
private SortOrder sortOrder;
private boolean preliminary;
private SubQueryInfo upper;
/**
......@@ -28,17 +27,14 @@ public class SubQueryInfo {
* @param filters table filters
* @param filter current filter
* @param sortOrder sort order
* @param preliminary if this is a preliminary query optimization
* without global conditions
*/
public SubQueryInfo(SubQueryInfo upper, int[] masks, TableFilter[] filters, int filter,
SortOrder sortOrder, boolean preliminary) {
SortOrder sortOrder) {
this.upper = upper;
this.masks = masks;
this.filters = filters;
this.filter = filter;
this.sortOrder = sortOrder;
this.preliminary = preliminary;
}
public SubQueryInfo getUpper() {
......@@ -60,8 +56,4 @@ public class SubQueryInfo {
public SortOrder getSortOrder() {
return sortOrder;
}
public boolean isPreliminary() {
return preliminary;
}
}
......@@ -127,6 +127,10 @@ public abstract class Table extends SchemaObjectBase {
}
}
public boolean isView() {
return false;
}
/**
* Lock the table for the given session.
* This method waits until the lock is granted.
......
......@@ -5,12 +5,7 @@
*/
package org.h2.table;
import java.util.AbstractList;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.Future;
import org.h2.command.Parser;
import org.h2.command.dml.Select;
import org.h2.engine.Right;
......@@ -21,16 +16,15 @@ import org.h2.expression.Comparison;
import org.h2.expression.ConditionAndOr;
import org.h2.expression.Expression;
import org.h2.expression.ExpressionColumn;
import org.h2.index.Cursor;
import org.h2.index.Index;
import org.h2.index.IndexLookupBatch;
import org.h2.index.IndexCondition;
import org.h2.index.IndexCursor;
import org.h2.index.ViewIndex;
import org.h2.message.DbException;
import org.h2.result.Row;
import org.h2.result.SearchRow;
import org.h2.result.SortOrder;
import org.h2.util.DoneFuture;
import org.h2.util.New;
import org.h2.util.StatementBuilder;
import org.h2.util.StringUtils;
......@@ -48,33 +42,6 @@ public class TableFilter implements ColumnResolver {
private static final int BEFORE_FIRST = 0, FOUND = 1, AFTER_LAST = 2,
NULL_ROW = 3;
private static final Cursor EMPTY_CURSOR = new Cursor() {
@Override
public boolean previous() {
return false;
}
@Override
public boolean next() {
return false;
}
@Override
public SearchRow getSearchRow() {
return null;
}
@Override
public Row get() {
return null;
}
@Override
public String toString() {
return "EMPTY_CURSOR";
}
};
/**
* Whether this is a direct or indirect (nested) outer join
*/
......@@ -94,7 +61,7 @@ public class TableFilter implements ColumnResolver {
* Batched join support.
*/
private JoinBatch joinBatch;
private JoinFilter joinFilter;
private int joinFilterId = -1;
/**
* Indicates that this filter is used in the plan.
......@@ -168,6 +135,10 @@ public class TableFilter implements ColumnResolver {
hashCode = session.nextObjectId();
}
public IndexCursor getIndexCursor() {
return cursor;
}
@Override
public Select getSelect() {
return select;
......@@ -339,44 +310,27 @@ public class TableFilter implements ColumnResolver {
* Start the query. This will reset the scan counts.
*
* @param s the session
* @return join batch if query runs over index which supports batched lookups, null otherwise
*/
public JoinBatch startQuery(Session s) {
joinBatch = null;
joinFilter = null;
public void startQuery(Session s) {
this.session = s;
scanCount = 0;
if (nestedJoin != null) {
nestedJoin.startQuery(s);
}
JoinBatch batch = null;
if (join != null) {
batch = join.startQuery(s);
}
IndexLookupBatch lookupBatch = null;
if (batch == null && select != null && select.getTopTableFilter() != this) {
lookupBatch = index.createLookupBatch(this);
if (lookupBatch != null) {
batch = new JoinBatch(join);
}
join.startQuery(s);
}
if (batch != null) {
if (nestedJoin != null) {
throw DbException.getUnsupportedException("nested join with batched index");
}
if (lookupBatch == null) {
lookupBatch = index.createLookupBatch(this);
}
joinBatch = batch;
joinFilter = batch.register(this, lookupBatch);
}
return batch;
}
/**
* Reset to the current position.
*/
public void reset() {
if (joinBatch != null && joinFilterId == 0) {
// reset join batch only on top table filter
joinBatch.reset();
return;
}
if (nestedJoin != null) {
nestedJoin.reset();
}
......@@ -387,6 +341,83 @@ public class TableFilter implements ColumnResolver {
foundOne = false;
}
private boolean isAlwaysTopTableFilter(int filter) {
if (filter != 0) {
return false;
}
// check if we are at the top table filters all the way up
SubQueryInfo info = session.getSubQueryInfo();
for (;;) {
if (info == null) {
return true;
}
if (info.getFilter() != 0) {
return false;
}
info = info.getUpper();
}
}
/**
* Attempt to initialize batched join.
*
* @param id join filter id (index of this table filter in join list)
* @param jb join batch if it is already created
* @return join batch if query runs over index which supports batched lookups, {@code null} otherwise
*/
public JoinBatch prepareJoinBatch(JoinBatch jb, TableFilter[] filters, int filter) {
joinBatch = null;
joinFilterId = -1;
if (getTable().isView()) {
session.pushSubQueryInfo(masks, filters, filter, select.getSortOrder());
try {
((ViewIndex) index).getQuery().prepareJoinBatch();
} finally {
session.popSubQueryInfo();
}
}
// For globally top table filter we don't need to create lookup batch, because
// currently it will not be used (this will be shown in ViewIndex.getPlanSQL()). Probably
// later on it will make sense to create it to better support X IN (...) conditions,
// but this needs to be implemented separately. If isAlwaysTopTableFilter is false
// then we either not a top table filter or top table filter in a sub-query, which
// in turn is not top in outer query, thus we need to enable batching here to allow
// outer query run batched join against this sub-query.
IndexLookupBatch lookupBatch = null;
if (jb == null && select != null && !isAlwaysTopTableFilter(filter)) {
lookupBatch = index.createLookupBatch(this);
if (lookupBatch != null) {
jb = new JoinBatch(filter + 1, join);
}
}
if (jb != null) {
if (nestedJoin != null) {
throw DbException.throwInternalError();
}
joinBatch = jb;
joinFilterId = filter;
if (lookupBatch == null && !isAlwaysTopTableFilter(filter)) {
// createLookupBatch will be called at most once because jb can be
// created only if lookupBatch is already not null from the call above.
lookupBatch = index.createLookupBatch(this);
if (lookupBatch == null) {
// the index does not support lookup batching, need to fake it because we are not top
lookupBatch = JoinBatch.createFakeIndexLookupBatch(this);
}
}
jb.register(this, lookupBatch);
}
return jb;
}
public int getJoinFilterId() {
return joinFilterId;
}
public JoinBatch getJoinBatch() {
return joinBatch;
}
/**
* Check if there are more rows to read.
*
......@@ -394,7 +425,7 @@ public class TableFilter implements ColumnResolver {
*/
public boolean next() {
if (joinBatch != null) {
// will happen only on topTableFilter since jbatch.next does not call join.next()
// will happen only on topTableFilter since joinBatch.next() does not call join.next()
return joinBatch.next();
}
if (state == AFTER_LAST) {
......@@ -506,7 +537,7 @@ public class TableFilter implements ColumnResolver {
// scanCount);
}
private boolean isOk(Expression condition) {
boolean isOk(Expression condition) {
if (condition == null) {
return true;
}
......@@ -753,6 +784,19 @@ public class TableFilter implements ColumnResolver {
if (index != null) {
buff.append('\n');
StatementBuilder planBuff = new StatementBuilder();
if (joinBatch != null) {
IndexLookupBatch lookupBatch = joinBatch.getLookupBatch(joinFilterId);
if (lookupBatch == null) {
if (joinFilterId != 0) {
throw DbException.throwInternalError();
}
} else {
planBuff.append("batched:");
String batchPlan = lookupBatch.getPlanSQL();
planBuff.append(batchPlan);
planBuff.append(" ");
}
}
planBuff.append(index.getPlanSQL());
if (indexConditions.size() > 0) {
planBuff.append(": ");
......@@ -965,7 +1009,7 @@ public class TableFilter implements ColumnResolver {
@Override
public Value getValue(Column column) {
if (joinBatch != null) {
return joinBatch.getValue(joinFilter, column);
return joinBatch.getValue(joinFilterId, column);
}
if (currentSearchRow == null) {
return null;
......@@ -1115,562 +1159,4 @@ public class TableFilter implements ColumnResolver {
*/
void accept(TableFilter f);
}
/**
* Support for asynchronous batched index lookups on joins.
*
* @see Index#findBatched(TableFilter, java.util.Collection)
* @see Index#getPreferedLookupBatchSize()
*
* @author Sergi Vladykin
*/
private static final class JoinBatch {
int filtersCount;
JoinFilter[] filters;
JoinFilter top;
boolean started;
JoinRow current;
boolean found;
/**
* This filter joined after this batched join and can be used normally.
*/
final TableFilter additionalFilter;
/**
* @param additionalFilter table filter after this batched join.
*/
private JoinBatch(TableFilter additionalFilter) {
this.additionalFilter = additionalFilter;
}
/**
* @param filter table filter
* @param lookupBatch lookup batch
*/
private JoinFilter register(TableFilter filter, IndexLookupBatch lookupBatch) {
assert filter != null;
filtersCount++;
return top = new JoinFilter(lookupBatch, filter, top);
}
/**
* @param filterId table filter id
* @param column column
* @return column value for current row
*/
private Value getValue(JoinFilter filter, Column column) {
Object x = current.row(filter.id);
assert x != null;
Row row = current.isRow(filter.id) ? (Row) x : ((Cursor) x).get();
int columnId = column.getColumnId();
if (columnId == -1) {
return ValueLong.get(row.getKey());
}
Value value = row.getValue(column.getColumnId());
if (value == null) {
throw DbException.throwInternalError("value is null: " + column + " " + row);
}
return value;
}
private void start() {
if (filtersCount > 32) {
// This is because we store state in a 64 bit field, 2 bits per joined table.
throw DbException.getUnsupportedException("Too many tables in join (at most 32 supported).");
}
// fill filters
filters = new JoinFilter[filtersCount];
JoinFilter jf = top;
for (int i = 0; i < filtersCount; i++) {
filters[jf.id = i] = jf;
jf = jf.join;
}
// initialize current row
current = new JoinRow(new Object[filtersCount]);
current.updateRow(top.id, top.filter.cursor, JoinRow.S_NULL, JoinRow.S_CURSOR);
// initialize top cursor
top.filter.cursor.find(top.filter.session, top.filter.indexConditions);
// we need fake first row because batchedNext always will move to the next row
JoinRow fake = new JoinRow(null);
fake.next = current;
current = fake;
}
private boolean next() {
if (!started) {
start();
started = true;
}
if (additionalFilter == null) {
if (batchedNext()) {
assert current.isComplete();
return true;
}
return false;
}
for (;;) {
if (!found) {
if (!batchedNext()) {
return false;
}
assert current.isComplete();
found = true;
additionalFilter.reset();
}
// we call furtherFilter in usual way outside of this batch because it is more effective
if (additionalFilter.next()) {
return true;
}
found = false;
}
}
private static Cursor get(Future<Cursor> f) {
try {
return f.get();
} catch (Exception e) {
throw DbException.convert(e);
}
}
private boolean batchedNext() {
if (current == null) {
// after last
return false;
}
// go next
current = current.next;
if (current == null) {
return false;
}
current.prev = null;
final int lastJfId = filtersCount - 1;
int jfId = lastJfId;
while (current.row(jfId) == null) {
// lookup for the first non fetched filter for the current row
jfId--;
}
for (;;) {
fetchCurrent(jfId);
if (!current.isDropped()) {
// if current was not dropped then it must be fetched successfully
if (jfId == lastJfId) {
// the whole join row is ready to be returned
return true;
}
JoinFilter join = filters[jfId + 1];
if (join.isBatchFull()) {
// get future cursors for join and go right to fetch them
current = join.find(current);
}
if (current.row(join.id) != null) {
// either find called or outer join with null row
jfId = join.id;
continue;
}
}
// we have to go down and fetch next cursors for jfId if it is possible
if (current.next == null) {
// either dropped or null-row
if (current.isDropped()) {
current = current.prev;
if (current == null) {
return false;
}
}
assert !current.isDropped();
assert jfId != lastJfId;
jfId = 0;
while (current.row(jfId) != null) {
jfId++;
}
// force find on half filled batch (there must be either searchRows
// or Cursor.EMPTY set for null-rows)
current = filters[jfId].find(current);
} else {
// here we don't care if the current was dropped
current = current.next;
assert !current.isRow(jfId);
while (current.row(jfId) == null) {
assert jfId != top.id;
// need to go left and fetch more search rows
jfId--;
assert !current.isRow(jfId);
}
}
}
}
@SuppressWarnings("unchecked")
private void fetchCurrent(final int jfId) {
assert current.prev == null || current.prev.isRow(jfId) : "prev must be already fetched";
assert jfId == 0 || current.isRow(jfId - 1) : "left must be already fetched";
assert !current.isRow(jfId) : "double fetching";
Object x = current.row(jfId);
assert x != null : "x null";
final JoinFilter jf = filters[jfId];
// in case of outer join we don't have any future around empty cursor
boolean newCursor = x == EMPTY_CURSOR;
if (!newCursor && current.isFuture(jfId)) {
// get cursor from a future
x = get((Future<Cursor>) x);
current.updateRow(jfId, x, JoinRow.S_FUTURE, JoinRow.S_CURSOR);
newCursor = true;
}
Cursor c = (Cursor) x;
assert c != null;
JoinFilter join = jf.join;
for (;;) {
if (c == null || !c.next()) {
if (newCursor && jf.isOuterJoin()) {
// replace cursor with null-row
current.updateRow(jfId, jf.getNullRow(), JoinRow.S_CURSOR, JoinRow.S_ROW);
c = null;
newCursor = false;
} else {
// cursor is done, drop it
current.drop();
return;
}
}
if (!jf.isOk(c == null)) {
// try another row from the cursor
continue;
}
boolean joinEmpty = false;
if (join != null && !join.collectSearchRows()) {
if (join.isOuterJoin()) {
joinEmpty = true;
} else {
// join will fail, try next row in the cursor
continue;
}
}
if (c != null) {
current = current.copyBehind(jfId);
// get current row from cursor
current.updateRow(jfId, c.get(), JoinRow.S_CURSOR, JoinRow.S_ROW);
}
if (joinEmpty) {
current.updateRow(join.id, EMPTY_CURSOR, JoinRow.S_NULL, JoinRow.S_CURSOR);
}
return;
}
}
@Override
public String toString() {
return "JoinBatch->\nprev->" + (current == null ? null : current.prev) +
"\ncurr->" + current +
"\nnext->" + (current == null ? null : current.next);
}
}
/**
* Table filter participating in batched join.
*/
private static final class JoinFilter {
final TableFilter filter;
final JoinFilter join;
int id;
IndexLookupBatch lookupBatch;
private JoinFilter(IndexLookupBatch lookupBatch, TableFilter filter, JoinFilter join) {
this.filter = filter;
this.join = join;
this.lookupBatch = lookupBatch != null ? lookupBatch : new FakeLookupBatch(filter);
}
public Row getNullRow() {
return filter.table.getNullRow();
}
private boolean isOuterJoin() {
return filter.joinOuter;
}
private boolean isBatchFull() {
return lookupBatch.isBatchFull();
}
private boolean isOk(boolean ignoreJoinCondition) {
boolean filterOk = filter.isOk(filter.filterCondition);
boolean joinOk = filter.isOk(filter.joinCondition);
return filterOk && (ignoreJoinCondition || joinOk);
}
private boolean collectSearchRows() {
assert !isBatchFull();
IndexCursor c = filter.cursor;
c.prepare(filter.session, filter.indexConditions);
if (c.isAlwaysFalse()) {
return false;
}
lookupBatch.addSearchRows(c.getStart(), c.getEnd());
return true;
}
private JoinRow find(JoinRow current) {
assert current != null;
// lookupBatch is allowed to be empty when we have some null-rows and forced find call
List<Future<Cursor>> result = lookupBatch.find();
// go backwards and assign futures
for (int i = result.size(); i > 0;) {
assert current.isRow(id - 1);
if (current.row(id) == EMPTY_CURSOR) {
// outer join support - skip row with existing empty cursor
current = current.prev;
continue;
}
assert current.row(id) == null;
Future<Cursor> future = result.get(--i);
if (future == null) {
current.updateRow(id, EMPTY_CURSOR, JoinRow.S_NULL, JoinRow.S_CURSOR);
} else {
current.updateRow(id, future, JoinRow.S_NULL, JoinRow.S_FUTURE);
}
if (current.prev == null || i == 0) {
break;
}
current = current.prev;
}
// handle empty cursors (because of outer joins) at the beginning
while (current.prev != null && current.prev.row(id) == EMPTY_CURSOR) {
current = current.prev;
}
assert current.prev == null || current.prev.isRow(id);
assert current.row(id) != null;
assert !current.isRow(id);
// the last updated row
return current;
}
@Override
public String toString() {
return "JoinFilter->" + filter;
}
}
/**
* Linked row in batched join.
*/
private static final class JoinRow {
private static final long S_NULL = 0;
private static final long S_FUTURE = 1;
private static final long S_CURSOR = 2;
private static final long S_ROW = 3;
private static final long S_MASK = 3;
/**
* May contain one of the following:
* <br/>- {@code null}: means that we need to get future cursor for this row
* <br/>- {@link Future}: means that we need to get a new {@link Cursor} from the {@link Future}
* <br/>- {@link Cursor}: means that we need to fetch {@link Row}s from the {@link Cursor}
* <br/>- {@link Row}: the {@link Row} is already fetched and is ready to be used
*/
Object[] row;
long state;
JoinRow prev;
JoinRow next;
/**
* @param row Row.
*/
private JoinRow(Object[] row) {
this.row = row;
}
/**
* @param joinFilterId Join filter id.
* @return Row state.
*/
private long getState(int joinFilterId) {
return (state >>> (joinFilterId << 1)) & S_MASK;
}
/**
* Allows to do a state transition in the following order:
* 0. Slot contains {@code null} ({@link #S_NULL}).
* 1. Slot contains {@link Future} ({@link #S_FUTURE}).
* 2. Slot contains {@link Cursor} ({@link #S_CURSOR}).
* 3. Slot contains {@link Row} ({@link #S_ROW}).
*
* @param joinFilterId {@link JoinRow} filter id.
* @param i Increment by this number of moves.
*/
private void incrementState(int joinFilterId, long i) {
assert i > 0 : i;
state += i << (joinFilterId << 1);
}
private void updateRow(int joinFilterId, Object x, long oldState, long newState) {
assert getState(joinFilterId) == oldState : "old state: " + getState(joinFilterId);
row[joinFilterId] = x;
incrementState(joinFilterId, newState - oldState);
assert getState(joinFilterId) == newState : "new state: " + getState(joinFilterId);
}
private Object row(int joinFilterId) {
return row[joinFilterId];
}
private boolean isRow(int joinFilterId) {
return getState(joinFilterId) == S_ROW;
}
private boolean isFuture(int joinFilterId) {
return getState(joinFilterId) == S_FUTURE;
}
private boolean isCursor(int joinFilterId) {
return getState(joinFilterId) == S_CURSOR;
}
private boolean isComplete() {
return isRow(row.length - 1);
}
private boolean isDropped() {
return row == null;
}
private void drop() {
if (prev != null) {
prev.next = next;
}
if (next != null) {
next.prev = prev;
}
row = null;
}
/**
* Copy this JoinRow behind itself in linked list of all in progress rows.
*
* @param jfId The last fetched filter id.
* @return The copy.
*/
private JoinRow copyBehind(int jfId) {
assert isCursor(jfId);
assert jfId + 1 == row.length || row[jfId + 1] == null;
Object[] r = new Object[row.length];
if (jfId != 0) {
System.arraycopy(row, 0, r, 0, jfId);
}
JoinRow copy = new JoinRow(r);
copy.state = state;
if (prev != null) {
copy.prev = prev;
prev.next = copy;
}
prev = copy;
copy.next = this;
return copy;
}
@Override
public String toString() {
return "JoinRow->" + Arrays.toString(row);
}
}
/**
* Fake Lookup batch for indexes which do not support batching but have to participate
* in batched joins.
*/
private static class FakeLookupBatch implements IndexLookupBatch {
final TableFilter filter;
SearchRow first;
SearchRow last;
boolean full;
final List<Future<Cursor>> result = new SingletonList<Future<Cursor>>();
/**
* @param index Index.
*/
public FakeLookupBatch(TableFilter filter) {
this.filter = filter;
}
@Override
public void addSearchRows(SearchRow first, SearchRow last) {
assert !full;
this.first = first;
this.last = last;
full = true;
}
@Override
public boolean isBatchFull() {
return full;
}
@Override
public List<Future<Cursor>> find() {
if (!full) {
return Collections.emptyList();
}
Cursor c = filter.getIndex().find(filter, first, last);
result.set(0, new DoneFuture<Cursor>(c));
full = false;
first = last = null;
return result;
}
}
/**
* Simple singleton list.
*/
private static class SingletonList<E> extends AbstractList<E> {
private E element;
@Override
public E get(int index) {
assert index == 0;
return element;
}
@Override
public E set(int index, E element) {
assert index == 0;
this.element = element;
return null;
}
@Override
public int size() {
return 1;
}
}
}
......@@ -218,6 +218,11 @@ public class TableView extends Table {
}
}
@Override
public boolean isView() {
return true;
}
/**
* Check if this view is currently invalid.
*
......
/*
* Copyright 2004-2014 H2 Group. Multiple-Licensed under the MPL 2.0,
* and the EPL 1.0 (http://h2database.com/html/license.html).
* Initial Developer: H2 Group
*/
package org.h2.util;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import org.h2.message.DbException;
/**
* Single threaded lazy future.
*
* @param <T>
* @author Sergi Vladykin
*/
public abstract class LazyFuture<T> implements Future<T> {
private static final int S_READY = 0;
private static final int S_DONE = 1;
private static final int S_ERROR = 2;
private static final int S_CANCELED = 3;
private int state = S_READY;
private T result;
private Exception error;
/**
* Reset this future to the initial state.
*
* @return {@code false} if it was already in initial state
*/
public boolean reset() {
if (state == S_READY) {
return false;
}
state = S_READY;
result = null;
error = null;
return true;
}
/**
* Run computation and produce the result.
*
* @return the result of computation
*/
protected abstract T run() throws Exception;
@Override
public boolean cancel(boolean mayInterruptIfRunning) {
if (state != S_READY) {
return false;
}
state = S_CANCELED;
return true;
}
@Override
public T get() throws InterruptedException, ExecutionException {
switch (state) {
case S_READY:
try {
result = run();
state = S_DONE;
} catch (Exception e) {
error = e;
if (e instanceof InterruptedException) {
throw (InterruptedException) e;
}
throw new ExecutionException(e);
} finally {
if (state != S_DONE) {
state = S_ERROR;
}
}
return result;
case S_DONE:
return result;
case S_ERROR:
throw new ExecutionException(error);
case S_CANCELED:
throw new CancellationException();
default:
throw DbException.throwInternalError();
}
}
@Override
public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException {
return get();
}
@Override
public boolean isCancelled() {
return state == S_CANCELED;
}
@Override
public boolean isDone() {
return state != S_READY;
}
}
......@@ -24,6 +24,7 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;
import org.h2.api.TableEngine;
import org.h2.command.ddl.CreateTableData;
import org.h2.command.dml.OptimizerHints;
......@@ -36,6 +37,7 @@ import org.h2.index.Index;
import org.h2.index.IndexLookupBatch;
import org.h2.index.IndexType;
import org.h2.index.SingleRowCursor;
import org.h2.jdbc.JdbcConnection;
import org.h2.message.DbException;
import org.h2.result.Row;
import org.h2.result.SearchRow;
......@@ -383,10 +385,24 @@ public class TestTableEngines extends TestBase {
deleteDb("testSubQueryInfo");
}
private void setBatchingEnabled(Statement stat, boolean enabled) throws SQLException {
stat.execute("SET BATCH_JOINS " + enabled);
if (!config.networked) {
Session s = (Session) ((JdbcConnection) stat.getConnection()).getSession();
assertEquals(enabled, s.isJoinBatchEnabled());
}
}
private void testBatchedJoin() throws SQLException {
deleteDb("tableEngine");
Connection conn = getConnection("tableEngine;OPTIMIZE_REUSE_RESULTS=0");
deleteDb("testBatchedJoin");
Connection conn = getConnection("testBatchedJoin;OPTIMIZE_REUSE_RESULTS=0;BATCH_JOINS=1");
Statement stat = conn.createStatement();
if (!config.networked) {
Session s = (Session) ((JdbcConnection) conn).getSession();
assertTrue(s.isJoinBatchEnabled());
}
setBatchingEnabled(stat, false);
setBatchingEnabled(stat, true);
TreeSetIndex.exec = Executors.newFixedThreadPool(8, new ThreadFactory() {
@Override
......@@ -399,6 +415,9 @@ public class TestTableEngines extends TestBase {
enableJoinReordering(false);
try {
doTestBatchedJoinSubQueryUnion(stat);
TreeSetIndex.lookupBatches.set(0);
doTestBatchedJoin(stat, 1, 0, 0);
doTestBatchedJoin(stat, 0, 1, 0);
doTestBatchedJoin(stat, 0, 0, 1);
......@@ -429,11 +448,13 @@ public class TestTableEngines extends TestBase {
doTestBatchedJoin(stat, 0, 0, 5);
doTestBatchedJoin(stat, 0, 8, 1);
doTestBatchedJoin(stat, 0, 2, 1);
assertTrue(TreeSetIndex.lookupBatches.get() > 0);
} finally {
enableJoinReordering(true);
TreeSetIndex.exec.shutdownNow();
}
deleteDb("tableEngine");
deleteDb("testBatchedJoin");
}
/**
......@@ -448,6 +469,139 @@ public class TestTableEngines extends TestBase {
OptimizerHints.set(hints);
}
private void checkPlan(Statement stat, String sql) throws SQLException {
ResultSet rs = stat.executeQuery("EXPLAIN " + sql);
assertTrue(rs.next());
String plan = rs.getString(1);
assertEquals(normalize(sql), normalize(plan));
}
private static String normalize(String sql) {
sql = sql.replace('\n', ' ');
return sql.replaceAll("\\s+", " ").trim();
}
private void doTestBatchedJoinSubQueryUnion(Statement stat) throws SQLException {
String engine = '"' + TreeSetIndexTableEngine.class.getName() + '"';
stat.execute("CREATE TABLE t (a int, b int) ENGINE " + engine);
TreeSetTable t = TreeSetIndexTableEngine.created;
stat.execute("CREATE INDEX T_IDX_A ON t(a)");
stat.execute("CREATE INDEX T_IDX_B ON t(b)");
setBatchSize(t, 3);
for (int i = 0; i < 20; i++) {
stat.execute("insert into t values (" + i + "," + (i + 10) + ")");
}
stat.execute("CREATE TABLE u (a int, b int) ENGINE " + engine);
TreeSetTable u = TreeSetIndexTableEngine.created;
stat.execute("CREATE INDEX U_IDX_A ON u(a)");
stat.execute("CREATE INDEX U_IDX_B ON u(b)");
setBatchSize(u, 0);
for (int i = 10; i < 25; i++) {
stat.execute("insert into u values (" + i + "," + (i - 15)+ ")");
}
checkPlan(stat, "SELECT 1 FROM PUBLIC.T T1 /* PUBLIC.\"scan\" */ "
+ "INNER JOIN PUBLIC.T T2 /* batched:test PUBLIC.T_IDX_B: B = T1.A */ "
+ "ON 1=1 WHERE T1.A = T2.B");
checkPlan(stat, "SELECT 1 FROM PUBLIC.T T1 /* PUBLIC.\"scan\" */ "
+ "INNER JOIN PUBLIC.T T2 /* batched:test PUBLIC.T_IDX_B: B = T1.A */ "
+ "ON 1=1 /* WHERE T1.A = T2.B */ "
+ "INNER JOIN PUBLIC.T T3 /* batched:test PUBLIC.T_IDX_B: B = T2.A */ "
+ "ON 1=1 WHERE (T2.A = T3.B) AND (T1.A = T2.B)");
checkPlan(stat, "SELECT 1 FROM PUBLIC.T T1 /* PUBLIC.\"scan\" */ "
+ "INNER JOIN PUBLIC.U /* batched:fake PUBLIC.U_IDX_A: A = T1.A */ "
+ "ON 1=1 /* WHERE T1.A = U.A */ "
+ "INNER JOIN PUBLIC.T T2 /* batched:test PUBLIC.T_IDX_B: B = U.B */ "
+ "ON 1=1 WHERE (T1.A = U.A) AND (U.B = T2.B)");
checkPlan(stat, "SELECT 1 FROM ( SELECT A FROM PUBLIC.T ) Z "
+ "/* SELECT A FROM PUBLIC.T /++ PUBLIC.\"scan\" ++/ */ "
+ "INNER JOIN PUBLIC.T /* batched:test PUBLIC.T_IDX_B: B = Z.A */ "
+ "ON 1=1 WHERE Z.A = T.B");
checkPlan(stat, "SELECT 1 FROM PUBLIC.T /* PUBLIC.\"scan\" */ "
+ "INNER JOIN ( SELECT A FROM PUBLIC.T ) Z "
+ "/* batched:view SELECT A FROM PUBLIC.T /++ batched:test PUBLIC.T_IDX_A: A IS ?1 ++/ "
+ "WHERE A IS ?1: A = T.B */ ON 1=1 WHERE Z.A = T.B");
checkPlan(stat, "SELECT 1 FROM PUBLIC.T /* PUBLIC.\"scan\" */ "
+ "INNER JOIN ( ((SELECT A FROM PUBLIC.T) UNION ALL (SELECT B FROM PUBLIC.U)) "
+ "UNION ALL (SELECT B FROM PUBLIC.T) ) Z /* batched:view "
+ "((SELECT A FROM PUBLIC.T /++ batched:test PUBLIC.T_IDX_A: A IS ?1 ++/ WHERE A IS ?1) "
+ "UNION ALL "
+ "(SELECT B FROM PUBLIC.U /++ PUBLIC.U_IDX_B: B IS ?1 ++/ WHERE B IS ?1)) "
+ "UNION ALL "
+ "(SELECT B FROM PUBLIC.T /++ batched:test PUBLIC.T_IDX_B: B IS ?1 ++/ WHERE B IS ?1)"
+ ": A = T.A */ ON 1=1 WHERE Z.A = T.A");
checkPlan(stat, "SELECT 1 FROM PUBLIC.T /* PUBLIC.\"scan\" */ "
+ "INNER JOIN ( SELECT U.A FROM PUBLIC.U INNER JOIN PUBLIC.T ON 1=1 WHERE U.B = T.B ) Z "
+ "/* batched:view SELECT U.A FROM PUBLIC.U /++ batched:fake PUBLIC.U_IDX_A: A IS ?1 ++/ "
+ "/++ WHERE U.A IS ?1 ++/ INNER JOIN PUBLIC.T /++ batched:test PUBLIC.T_IDX_B: B = U.B ++/ "
+ "ON 1=1 WHERE (U.A IS ?1) AND (U.B = T.B): A = T.A */ ON 1=1 WHERE Z.A = T.A");
checkPlan(stat, "SELECT 1 FROM PUBLIC.T /* PUBLIC.\"scan\" */ "
+ "INNER JOIN ( SELECT A FROM PUBLIC.U ) Z /* SELECT A FROM PUBLIC.U "
+ "/++ PUBLIC.U_IDX_A: A IS ?1 ++/ WHERE A IS ?1: A = T.A */ ON 1=1 WHERE T.A = Z.A");
checkPlan(stat, "SELECT 1 FROM "
+ "( SELECT U.A FROM PUBLIC.U INNER JOIN PUBLIC.T ON 1=1 WHERE U.B = T.B ) Z "
+ "/* SELECT U.A FROM PUBLIC.U /++ PUBLIC.\"scan\" ++/ "
+ "INNER JOIN PUBLIC.T /++ batched:test PUBLIC.T_IDX_B: B = U.B ++/ "
+ "ON 1=1 WHERE U.B = T.B */ "
+ "INNER JOIN PUBLIC.T /* batched:test PUBLIC.T_IDX_A: A = Z.A */ ON 1=1 WHERE T.A = Z.A");
checkPlan(stat, "SELECT 1 FROM "
+ "( SELECT U.A FROM PUBLIC.T INNER JOIN PUBLIC.U ON 1=1 WHERE T.B = U.B ) Z "
+ "/* SELECT U.A FROM PUBLIC.T /++ PUBLIC.\"scan\" ++/ "
+ "INNER JOIN PUBLIC.U /++ PUBLIC.U_IDX_B: B = T.B ++/ "
+ "ON 1=1 WHERE T.B = U.B */ INNER JOIN PUBLIC.T /* batched:test PUBLIC.T_IDX_A: A = Z.A */ "
+ "ON 1=1 WHERE Z.A = T.A");
checkPlan(stat, "SELECT 1 FROM ( (SELECT A FROM PUBLIC.T) UNION (SELECT A FROM PUBLIC.U) ) Z "
+ "/* (SELECT A FROM PUBLIC.T /++ PUBLIC.\"scan\" ++/) "
+ "UNION "
+ "(SELECT A FROM PUBLIC.U /++ PUBLIC.\"scan\" ++/) */ "
+ "INNER JOIN PUBLIC.T /* batched:test PUBLIC.T_IDX_A: A = Z.A */ ON 1=1 WHERE Z.A = T.A");
checkPlan(stat, "SELECT 1 FROM PUBLIC.U /* PUBLIC.\"scan\" */ "
+ "INNER JOIN ( (SELECT A, B FROM PUBLIC.T) UNION (SELECT B, A FROM PUBLIC.U) ) Z "
+ "/* batched:view (SELECT A, B FROM PUBLIC.T /++ batched:test PUBLIC.T_IDX_B: B IS ?1 ++/ "
+ "WHERE B IS ?1) UNION (SELECT B, A FROM PUBLIC.U /++ PUBLIC.U_IDX_A: A IS ?1 ++/ "
+ "WHERE A IS ?1): B = U.B */ ON 1=1 /* WHERE U.B = Z.B */ "
+ "INNER JOIN PUBLIC.T /* batched:test PUBLIC.T_IDX_A: A = Z.A */ ON 1=1 "
+ "WHERE (U.B = Z.B) AND (Z.A = T.A)");
checkPlan(stat, "SELECT 1 FROM PUBLIC.U /* PUBLIC.\"scan\" */ "
+ "INNER JOIN ( SELECT A, B FROM PUBLIC.U ) Z "
+ "/* batched:fake SELECT A, B FROM PUBLIC.U /++ PUBLIC.U_IDX_A: A IS ?1 ++/ "
+ "WHERE A IS ?1: A = U.A */ ON 1=1 /* WHERE U.A = Z.A */ "
+ "INNER JOIN PUBLIC.T /* batched:test PUBLIC.T_IDX_B: B = Z.B */ "
+ "ON 1=1 WHERE (U.A = Z.A) AND (Z.B = T.B)");
// t: a = [ 0..20), b = [10..30)
// u: a = [10..25), b = [-5..10)
checkBatchedQueryResult(stat, 10,
"select t.a from t, (select t.b from u, t where u.a = t.a) z where t.b = z.b");
checkBatchedQueryResult(stat, 5,
"select t.a from (select t1.b from t t1, t t2 where t1.a = t2.b) z, t where t.b = z.b + 5");
checkBatchedQueryResult(stat, 1,
"select t.a from (select u.b from u, t t2 where u.a = t2.b) z, t where t.b = z.b + 1");
checkBatchedQueryResult(stat, 15,
"select t.a from (select u.b from u, t t2 where u.a = t2.b) z left join t on t.b = z.b");
checkBatchedQueryResult(stat, 15,
"select t.a from (select t1.b from t t1 left join t t2 on t1.a = t2.b) z, t "
+ "where t.b = z.b + 5");
checkBatchedQueryResult(stat, 1, "select t.a from t,(select 5 as b from t union select 10 from u) z "
+ "where t.b = z.b");
checkBatchedQueryResult(stat, 15, "select t.a from u,(select 5 as b, a from t "
+ "union select 10, a from u) z, t where t.b = z.b and z.a = u.a");
stat.execute("DROP TABLE T");
stat.execute("DROP TABLE U");
}
private void checkBatchedQueryResult(Statement stat, int size, String sql) throws SQLException {
setBatchingEnabled(stat, false);
List<List<Object>> expected = query(stat, sql);
assertEquals(size, expected.size());
setBatchingEnabled(stat, true);
List<List<Object>> actual = query(stat, sql);
if (!expected.equals(actual)) {
fail("\nexpected: " + expected + "\nactual: " + actual);
}
}
private void doTestBatchedJoin(Statement stat, int... batchSizes) throws SQLException {
ArrayList<TreeSetTable> tables = New.arrayList(batchSizes.length);
......@@ -501,6 +655,9 @@ public class TestTableEngines extends TestBase {
fail();
}
}
for (int i = 0; i < batchSizes.length; i++) {
stat.executeUpdate("DROP TABLE IF EXISTS T" + i);
}
}
private static void assert0(boolean condition, String message) {
......@@ -512,7 +669,15 @@ public class TestTableEngines extends TestBase {
private static void setBatchSize(ArrayList<TreeSetTable> tables, int... batchSizes) {
for (int i = 0; i < batchSizes.length; i++) {
int batchSize = batchSizes[i];
for (Index idx : tables.get(i).getIndexes()) {
setBatchSize(tables.get(i), batchSize);
}
}
private static void setBatchSize(TreeSetTable t, int batchSize) {
if (t.getIndexes() == null) {
t.scan.preferedBatchSize = batchSize;
} else {
for (Index idx : t.getIndexes()) {
((TreeSetIndex) idx).preferedBatchSize = batchSize;
}
}
......@@ -1115,6 +1280,8 @@ public class TestTableEngines extends TestBase {
* An index that internally uses a tree set.
*/
private static class TreeSetIndex extends BaseIndex implements Comparator<SearchRow> {
private static AtomicInteger lookupBatches = new AtomicInteger();
/**
* Executor service to test batched joins.
*/
......@@ -1145,9 +1312,18 @@ public class TestTableEngines extends TestBase {
public IndexLookupBatch createLookupBatch(final TableFilter filter) {
assert filter.getMasks() != null || "scan".equals(getName());
final int preferedSize = preferedBatchSize;
return preferedSize == 0 ? null : new IndexLookupBatch() {
if (preferedSize == 0) {
return null;
}
lookupBatches.incrementAndGet();
return new IndexLookupBatch() {
List<SearchRow> searchRows = New.arrayList();
@Override
public String getPlanSQL() {
return "test";
}
@Override public boolean isBatchFull() {
return searchRows.size() >= preferedSize * 2;
}
......@@ -1160,10 +1336,16 @@ public class TestTableEngines extends TestBase {
}
@Override
public void addSearchRows(SearchRow first, SearchRow last) {
public boolean addSearchRows(SearchRow first, SearchRow last) {
assert !isBatchFull();
searchRows.add(first);
searchRows.add(last);
return true;
}
@Override
public void reset() {
searchRows.clear();
}
};
}
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论