提交 6762b765 authored 作者: S.Vladykin's avatar S.Vladykin

Batched join for SELECT sub-queries.

上级 508ed18e
......@@ -948,7 +948,7 @@ public class Select extends Query {
expressionArray = new Expression[expressions.size()];
expressions.toArray(expressionArray);
if (!session.isParsingView()) {
topTableFilter.prepareBatch(0);
topTableFilter.prepareJoinBatch(0);
}
isPrepared = true;
}
......
......@@ -27,9 +27,11 @@ public interface IndexLookupBatch {
*
* @param first the first row, or null for no limit
* @param last the last row, or null for no limit
* @return {@code false} if this search row pair is known to produce no results
* and thus the given row pair was not added
* @see Index#find(TableFilter, SearchRow, SearchRow)
*/
void addSearchRows(SearchRow first, SearchRow last);
boolean addSearchRows(SearchRow first, SearchRow last);
/**
* Check if this batch is full.
......
......@@ -24,7 +24,7 @@ public class ViewCursor implements Cursor {
private final SearchRow first, last;
private Row current;
ViewCursor(ViewIndex index, LocalResult result, SearchRow first,
public ViewCursor(ViewIndex index, LocalResult result, SearchRow first,
SearchRow last) {
this.table = index.getTable();
this.index = index;
......
......@@ -347,6 +347,10 @@ public class ViewIndex extends BaseIndex implements SpatialIndex {
param.setValue(v);
}
public Query getQuery() {
return query;
}
private Query getQuery(Session session, int[] masks,
TableFilter[] filters, int filter, SortOrder sortOrder) {
Query q = prepareSubQuery(querySQL, session, masks, filters, filter, sortOrder, true);
......
......@@ -14,12 +14,14 @@ import java.util.concurrent.Future;
import org.h2.index.Cursor;
import org.h2.index.IndexCursor;
import org.h2.index.IndexLookupBatch;
import org.h2.index.ViewCursor;
import org.h2.index.ViewIndex;
import org.h2.message.DbException;
import org.h2.result.LocalResult;
import org.h2.result.Row;
import org.h2.result.SearchRow;
import org.h2.util.DoneFuture;
import org.h2.util.IntArray;
import org.h2.util.LazyFuture;
import org.h2.util.New;
import org.h2.value.Value;
import org.h2.value.ValueLong;
......@@ -59,9 +61,8 @@ public final class JoinBatch {
}
};
private static final Future<Cursor> PLACEHOLDER = new DoneFuture<Cursor>(null);
private ViewIndexLookupBatch viewIndexLookupBatch;
private Future<Cursor> viewTopFutureCursor;
private JoinFilter[] filters;
private JoinFilter top;
......@@ -157,12 +158,20 @@ public final class JoinBatch {
private void start() {
// initialize current row
current = new JoinRow(new Object[filters.length]);
// initialize top cursor
Cursor cursor;
if (viewIndexLookupBatch == null) {
// initialize top cursor
IndexCursor indexCursor = top.filter.getIndexCursor();
current.updateRow(top.id, indexCursor, JoinRow.S_NULL, JoinRow.S_CURSOR);
indexCursor.find(top.filter.getSession(), top.filter.getIndexConditions());
}
// it is a top level batched query
TableFilter f = top.filter;
IndexCursor indexCursor = f.getIndexCursor();
indexCursor.find(f.getSession(), f.getIndexConditions());
cursor = indexCursor;
} else {
// we are at the batched sub-query
assert viewTopFutureCursor != null;
cursor = get(viewTopFutureCursor);
}
current.updateRow(top.id, cursor, JoinRow.S_NULL, JoinRow.S_CURSOR);
// we need fake first row because batchedNext always will move to the next row
JoinRow fake = new JoinRow(null);
fake.next = current;
......@@ -202,15 +211,17 @@ public final class JoinBatch {
found = false;
}
}
private static Cursor get(Future<Cursor> f) {
Cursor c;
try {
return f.get();
c = f.get();
} catch (Exception e) {
throw DbException.convert(e);
}
return c == null ? EMPTY_CURSOR : c;
}
private boolean batchedNext() {
if (current == null) {
// after last
......@@ -294,17 +305,23 @@ public final class JoinBatch {
Object x = current.row(jfId);
assert x != null : "x null";
final JoinFilter jf = filters[jfId];
// in case of outer join we don't have any future around empty cursor
boolean newCursor = x == EMPTY_CURSOR;
if (!newCursor && current.isFuture(jfId)) {
if (newCursor) {
if (jfId == 0) {
// the top cursor is new and empty, drop it
current.drop();
return;
}
} else if (current.isFuture(jfId)) {
// get cursor from a future
x = get((Future<Cursor>) x);
current.updateRow(jfId, x, JoinRow.S_FUTURE, JoinRow.S_CURSOR);
newCursor = true;
}
final JoinFilter jf = filters[jfId];
Cursor c = (Cursor) x;
assert c != null;
JoinFilter join = jf.join;
......@@ -337,10 +354,11 @@ public final class JoinBatch {
}
if (c != null) {
current = current.copyBehind(jfId);
// get current row from cursor
// update jf, set current row from cursor
current.updateRow(jfId, c.get(), JoinRow.S_CURSOR, JoinRow.S_ROW);
}
if (joinEmpty) {
// update jf.join, set an empty cursor
current.updateRow(join.id, EMPTY_CURSOR, JoinRow.S_NULL, JoinRow.S_CURSOR);
}
return;
......@@ -348,13 +366,11 @@ public final class JoinBatch {
}
/**
* @return Adapter to allow joining to this batch in sub-queries.
* @return Adapter to allow joining to this batch in sub-queries and views.
*/
public IndexLookupBatch asViewIndexLookupBatch(ViewIndex viewIndex) {
if (viewIndexLookupBatch == null) {
viewIndexLookupBatch = new ViewIndexLookupBatch(viewIndex);
}
return viewIndexLookupBatch;
assert viewIndexLookupBatch == null;
return viewIndexLookupBatch = new ViewIndexLookupBatch(viewIndex);
}
@Override
......@@ -417,8 +433,7 @@ public final class JoinBatch {
if (c.isAlwaysFalse()) {
return false;
}
lookupBatch.addSearchRows(c.getStart(), c.getEnd());
return true;
return lookupBatch.addSearchRows(c.getStart(), c.getEnd());
}
private List<Future<Cursor>> find() {
......@@ -627,11 +642,12 @@ public final class JoinBatch {
}
@Override
public void addSearchRows(SearchRow first, SearchRow last) {
public boolean addSearchRows(SearchRow first, SearchRow last) {
assert !full;
this.first = first;
this.last = last;
full = true;
return true;
}
@Override
......@@ -678,58 +694,131 @@ public final class JoinBatch {
}
/**
* Lookup batch over this join batch for a sub-query or view.
* Index lookup batch over this join batch for a sub-query or view.
*/
private final class ViewIndexLookupBatch implements IndexLookupBatch {
private final ViewIndex viewIndex;
private final IntArray counts = new IntArray();
private final ArrayList<Future<Cursor>> result = New.arrayList();
private int resultSize;
private ViewIndexLookupBatch(ViewIndex viewIndex) {
this.viewIndex = viewIndex;
}
@Override
public void addSearchRows(SearchRow first, SearchRow last) {
viewIndex.setupQueryParameters(viewIndex.getSession(), first, last, null);
if (top.collectSearchRows()) {
if (top.isBatchFull()) {
fetch();
private boolean resetAfterFind() {
if (resultSize < 0) {
// method find was called, we need to reset futures to initial state for reuse
for (int i = 0, size = -resultSize; i < size; i++) {
((QueryRunner) result.get(i)).reset();
}
resultSize = 0;
return true;
}
return false;
}
private void fetch() {
// TODO
@Override
public boolean addSearchRows(SearchRow first, SearchRow last) {
resetAfterFind();
viewIndex.setupQueryParameters(viewIndex.getSession(), first, last, null);
if (!top.collectSearchRows()) {
return false;
}
QueryRunner r;
if (resultSize < result.size()) {
// get reused runner
r = (QueryRunner) result.get(resultSize);
} else {
// create new runner
result.add(r = new QueryRunner());
}
r.first = first;
r.last = last;
resultSize++;
return true;
}
private void onNextCursorStart() {
}
@Override
public boolean isBatchFull() {
return top.isBatchFull();
}
@Override
public List<Future<Cursor>> find() {
state = State.FIND_CALLED;
return result;
if (resultSize == 0) {
return Collections.emptyList();
}
// we do batched find only for top table filter and then lazily run the ViewIndex query
// for each received top future cursor
List<Future<Cursor>> topFutureCursors = top.find();
if (topFutureCursors.size() != resultSize) {
throw DbException.throwInternalError("Unexpected result size: " + result.size() +
", expected :" + resultSize);
}
for (int i = 0; i < resultSize; i++) {
QueryRunner r = (QueryRunner) result.get(i);
r.topFutureCursor = topFutureCursors.get(i);
}
List<Future<Cursor>> list = resultSize == result.size() ?
result : result.subList(0, resultSize);
// mark that method find was called
resultSize = -resultSize;
return list;
}
@Override
public void reset() {
state = State.COLLECTING;
result.clear();
if (resultSize != 0 && !resetAfterFind()) {
// find was not called, need to just clear runners
for (int i = 0; i < resultSize; i++) {
((QueryRunner) result.get(i)).clear();
}
resultSize = 0;
}
JoinBatch.this.reset();
}
}
/**
* State of the ViewIndexLookupBatch
*/
enum State {
COLLECTING, FULL, FIND_CALLED
/**
* Lazy query runner.
*/
private class QueryRunner extends LazyFuture<Cursor> {
private Future<Cursor> topFutureCursor;
private SearchRow first;
private SearchRow last;
private void clear() {
topFutureCursor = null;
first = last = null;
}
@Override
public boolean reset() {
if (super.reset()) {
return true;
}
// this query runner was never executed, need to clear manually
clear();
return false;
}
@Override
protected Cursor run() throws Exception {
if (topFutureCursor == null) {
// if the top cursor is empty then the whole query will produce empty result
return EMPTY_CURSOR;
}
JoinBatch.this.viewTopFutureCursor = topFutureCursor;
LocalResult localResult;
try {
localResult = viewIndex.getQuery().query(0);
} finally {
JoinBatch.this.viewTopFutureCursor = null;
}
ViewCursor cursor = new ViewCursor(viewIndex, localResult, first, last);
clear();
return cursor;
}
}
}
}
......@@ -363,18 +363,21 @@ public class TableFilter implements ColumnResolver {
* @param id join filter id (index of this table filter in join list)
* @return join batch if query runs over index which supports batched lookups, {@code null} otherwise
*/
public JoinBatch prepareBatch(int id) {
public JoinBatch prepareJoinBatch(int id) {
joinBatch = null;
joinFilterId = -1;
JoinBatch jb = null;
if (join != null) {
jb = join.prepareBatch(id + 1);
jb = join.prepareJoinBatch(id + 1);
}
IndexLookupBatch lookupBatch = null;
// the globally top table filter does not need batching, if isAlwaysTopTableFilter is false
// then we either not a top table filter or top table filter in a sub-query which is not
// top in outer query, thus we need to enable batching here to allow outer query run batched
// join against this sub-query
// For globally top table filter we don't need to create lookup batch, because
// currently it will not be used (this will be shown in ViewIndex.getPlanSQL()). Probably
// later on it will make sense to create it to better support X IN (...) conditions,
// but this needs to be implemented separately. If isAlwaysTopTableFilter is false
// then we either not a top table filter or top table filter in a sub-query, which
// in turn is not top in outer query, thus we need to enable batching here to allow
// outer query run batched join against this sub-query.
if (jb == null && select != null && !isAlwaysTopTableFilter(id)) {
lookupBatch = index.createLookupBatch(this);
if (lookupBatch != null) {
......@@ -387,13 +390,9 @@ public class TableFilter implements ColumnResolver {
}
joinBatch = jb;
joinFilterId = id;
// for globally top table filter we don't need to create lookup batch
// currently it will not be used, probably later on it will make sense
// to create it to better support X IN (...) conditions, but this needs
// to be implemented separately
if (lookupBatch == null && !isAlwaysTopTableFilter(id)) {
// index.createLookupBatch will be called only once because jb can be created only if
// lookupBatch is not null from the first call above
// index.createLookupBatch will be called at most once because jb can be
// created only if lookupBatch is already not null from the call above.
lookupBatch = index.createLookupBatch(this);
}
jb.register(this, lookupBatch);
......
/*
* Copyright 2004-2014 H2 Group. Multiple-Licensed under the MPL 2.0,
* and the EPL 1.0 (http://h2database.com/html/license.html).
* Initial Developer: H2 Group
*/
package org.h2.util;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import org.h2.message.DbException;
/**
* Single threaded lazy future.
*
* @param <T>
* @author Sergi Vladykin
*/
public abstract class LazyFuture<T> implements Future<T> {
private static final int S_READY = 0;
private static final int S_DONE = 1;
private static final int S_ERROR = 2;
private static final int S_CANCELED = 3;
private int state = S_READY;
private T result;
private Exception error;
/**
* Reset this future to the initial state.
*
* @return {@code false} if it was already in initial state
*/
public boolean reset() {
if (state == S_READY) {
return false;
}
state = S_READY;
result = null;
error = null;
return true;
}
/**
* Run computation and produce the result.
*
* @return the result of computation
*/
protected abstract T run() throws Exception;
@Override
public boolean cancel(boolean mayInterruptIfRunning) {
if (state != S_READY) {
return false;
}
state = S_CANCELED;
return true;
}
@Override
public T get() throws InterruptedException, ExecutionException {
switch (state) {
case S_READY:
try {
result = run();
state = S_DONE;
} catch (Exception e) {
error = e;
if (e instanceof InterruptedException) {
throw (InterruptedException) e;
}
throw new ExecutionException(e);
} finally {
if (state != S_DONE) {
state = S_ERROR;
}
}
return result;
case S_DONE:
return result;
case S_ERROR:
throw new ExecutionException(error);
case S_CANCELED:
throw new CancellationException();
default:
throw DbException.throwInternalError();
}
}
@Override
public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException {
return get();
}
@Override
public boolean isCancelled() {
return state == S_CANCELED;
}
@Override
public boolean isDone() {
return state == S_DONE;
}
}
......@@ -1160,10 +1160,11 @@ public class TestTableEngines extends TestBase {
}
@Override
public void addSearchRows(SearchRow first, SearchRow last) {
public boolean addSearchRows(SearchRow first, SearchRow last) {
assert !isBatchFull();
searchRows.add(first);
searchRows.add(last);
return true;
}
@Override
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论