提交 571bda0c authored 作者: S.Vladykin's avatar S.Vladykin

Moved JoinBatch to the top level

上级 84fd6dae
......@@ -5,12 +5,7 @@
*/
package org.h2.table;
import java.util.AbstractList;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.Future;
import org.h2.command.Parser;
import org.h2.command.dml.Select;
import org.h2.engine.Right;
......@@ -21,7 +16,6 @@ import org.h2.expression.Comparison;
import org.h2.expression.ConditionAndOr;
import org.h2.expression.Expression;
import org.h2.expression.ExpressionColumn;
import org.h2.index.Cursor;
import org.h2.index.Index;
import org.h2.index.IndexLookupBatch;
import org.h2.index.IndexCondition;
......@@ -30,7 +24,6 @@ import org.h2.message.DbException;
import org.h2.result.Row;
import org.h2.result.SearchRow;
import org.h2.result.SortOrder;
import org.h2.util.DoneFuture;
import org.h2.util.New;
import org.h2.util.StatementBuilder;
import org.h2.util.StringUtils;
......@@ -48,33 +41,6 @@ public class TableFilter implements ColumnResolver {
private static final int BEFORE_FIRST = 0, FOUND = 1, AFTER_LAST = 2,
NULL_ROW = 3;
private static final Cursor EMPTY_CURSOR = new Cursor() {
@Override
public boolean previous() {
return false;
}
@Override
public boolean next() {
return false;
}
@Override
public SearchRow getSearchRow() {
return null;
}
@Override
public Row get() {
return null;
}
@Override
public String toString() {
return "EMPTY_CURSOR";
}
};
/**
* Whether this is a direct or indirect (nested) outer join
*/
......@@ -94,8 +60,8 @@ public class TableFilter implements ColumnResolver {
* Batched join support.
*/
private JoinBatch joinBatch;
private JoinFilter joinFilter;
int joinFilterId = -1;
/**
* Indicates that this filter is used in the plan.
*/
......@@ -168,6 +134,10 @@ public class TableFilter implements ColumnResolver {
hashCode = session.nextObjectId();
}
public IndexCursor getIndexCursor() {
return cursor;
}
@Override
public Select getSelect() {
return select;
......@@ -342,7 +312,7 @@ public class TableFilter implements ColumnResolver {
*/
public JoinBatch startQuery(Session s) {
joinBatch = null;
joinFilter = null;
joinFilterId = -1;
this.session = s;
scanCount = 0;
if (nestedJoin != null) {
......@@ -367,7 +337,7 @@ public class TableFilter implements ColumnResolver {
lookupBatch = index.createLookupBatch(this);
}
joinBatch = batch;
joinFilter = batch.register(this, lookupBatch);
batch.register(this, lookupBatch);
}
return batch;
}
......@@ -505,7 +475,7 @@ public class TableFilter implements ColumnResolver {
// scanCount);
}
private boolean isOk(Expression condition) {
boolean isOk(Expression condition) {
if (condition == null) {
return true;
}
......@@ -964,7 +934,7 @@ public class TableFilter implements ColumnResolver {
@Override
public Value getValue(Column column) {
if (joinBatch != null) {
return joinBatch.getValue(joinFilter, column);
return joinBatch.getValue(joinFilterId, column);
}
if (currentSearchRow == null) {
return null;
......@@ -1114,562 +1084,4 @@ public class TableFilter implements ColumnResolver {
*/
void accept(TableFilter f);
}
/**
* Support for asynchronous batched index lookups on joins.
*
* @see Index#findBatched(TableFilter, java.util.Collection)
* @see Index#getPreferedLookupBatchSize()
*
* @author Sergi Vladykin
*/
private static final class JoinBatch {
int filtersCount;
JoinFilter[] filters;
JoinFilter top;
boolean started;
JoinRow current;
boolean found;
/**
* This filter joined after this batched join and can be used normally.
*/
final TableFilter additionalFilter;
/**
* @param additionalFilter table filter after this batched join.
*/
private JoinBatch(TableFilter additionalFilter) {
this.additionalFilter = additionalFilter;
}
/**
* @param filter table filter
* @param lookupBatch lookup batch
*/
private JoinFilter register(TableFilter filter, IndexLookupBatch lookupBatch) {
assert filter != null;
filtersCount++;
return top = new JoinFilter(lookupBatch, filter, top);
}
/**
* @param filterId table filter id
* @param column column
* @return column value for current row
*/
private Value getValue(JoinFilter filter, Column column) {
Object x = current.row(filter.id);
assert x != null;
Row row = current.isRow(filter.id) ? (Row) x : ((Cursor) x).get();
int columnId = column.getColumnId();
if (columnId == -1) {
return ValueLong.get(row.getKey());
}
Value value = row.getValue(column.getColumnId());
if (value == null) {
throw DbException.throwInternalError("value is null: " + column + " " + row);
}
return value;
}
private void start() {
if (filtersCount > 32) {
// This is because we store state in a 64 bit field, 2 bits per joined table.
throw DbException.getUnsupportedException("Too many tables in join (at most 32 supported).");
}
// fill filters
filters = new JoinFilter[filtersCount];
JoinFilter jf = top;
for (int i = 0; i < filtersCount; i++) {
filters[jf.id = i] = jf;
jf = jf.join;
}
// initialize current row
current = new JoinRow(new Object[filtersCount]);
current.updateRow(top.id, top.filter.cursor, JoinRow.S_NULL, JoinRow.S_CURSOR);
// initialize top cursor
top.filter.cursor.find(top.filter.session, top.filter.indexConditions);
// we need fake first row because batchedNext always will move to the next row
JoinRow fake = new JoinRow(null);
fake.next = current;
current = fake;
}
private boolean next() {
if (!started) {
start();
started = true;
}
if (additionalFilter == null) {
if (batchedNext()) {
assert current.isComplete();
return true;
}
return false;
}
for (;;) {
if (!found) {
if (!batchedNext()) {
return false;
}
assert current.isComplete();
found = true;
additionalFilter.reset();
}
// we call furtherFilter in usual way outside of this batch because it is more effective
if (additionalFilter.next()) {
return true;
}
found = false;
}
}
private static Cursor get(Future<Cursor> f) {
try {
return f.get();
} catch (Exception e) {
throw DbException.convert(e);
}
}
private boolean batchedNext() {
if (current == null) {
// after last
return false;
}
// go next
current = current.next;
if (current == null) {
return false;
}
current.prev = null;
final int lastJfId = filtersCount - 1;
int jfId = lastJfId;
while (current.row(jfId) == null) {
// lookup for the first non fetched filter for the current row
jfId--;
}
for (;;) {
fetchCurrent(jfId);
if (!current.isDropped()) {
// if current was not dropped then it must be fetched successfully
if (jfId == lastJfId) {
// the whole join row is ready to be returned
return true;
}
JoinFilter join = filters[jfId + 1];
if (join.isBatchFull()) {
// get future cursors for join and go right to fetch them
current = join.find(current);
}
if (current.row(join.id) != null) {
// either find called or outer join with null row
jfId = join.id;
continue;
}
}
// we have to go down and fetch next cursors for jfId if it is possible
if (current.next == null) {
// either dropped or null-row
if (current.isDropped()) {
current = current.prev;
if (current == null) {
return false;
}
}
assert !current.isDropped();
assert jfId != lastJfId;
jfId = 0;
while (current.row(jfId) != null) {
jfId++;
}
// force find on half filled batch (there must be either searchRows
// or Cursor.EMPTY set for null-rows)
current = filters[jfId].find(current);
} else {
// here we don't care if the current was dropped
current = current.next;
assert !current.isRow(jfId);
while (current.row(jfId) == null) {
assert jfId != top.id;
// need to go left and fetch more search rows
jfId--;
assert !current.isRow(jfId);
}
}
}
}
@SuppressWarnings("unchecked")
private void fetchCurrent(final int jfId) {
assert current.prev == null || current.prev.isRow(jfId) : "prev must be already fetched";
assert jfId == 0 || current.isRow(jfId - 1) : "left must be already fetched";
assert !current.isRow(jfId) : "double fetching";
Object x = current.row(jfId);
assert x != null : "x null";
final JoinFilter jf = filters[jfId];
// in case of outer join we don't have any future around empty cursor
boolean newCursor = x == EMPTY_CURSOR;
if (!newCursor && current.isFuture(jfId)) {
// get cursor from a future
x = get((Future<Cursor>) x);
current.updateRow(jfId, x, JoinRow.S_FUTURE, JoinRow.S_CURSOR);
newCursor = true;
}
Cursor c = (Cursor) x;
assert c != null;
JoinFilter join = jf.join;
for (;;) {
if (c == null || !c.next()) {
if (newCursor && jf.isOuterJoin()) {
// replace cursor with null-row
current.updateRow(jfId, jf.getNullRow(), JoinRow.S_CURSOR, JoinRow.S_ROW);
c = null;
newCursor = false;
} else {
// cursor is done, drop it
current.drop();
return;
}
}
if (!jf.isOk(c == null)) {
// try another row from the cursor
continue;
}
boolean joinEmpty = false;
if (join != null && !join.collectSearchRows()) {
if (join.isOuterJoin()) {
joinEmpty = true;
} else {
// join will fail, try next row in the cursor
continue;
}
}
if (c != null) {
current = current.copyBehind(jfId);
// get current row from cursor
current.updateRow(jfId, c.get(), JoinRow.S_CURSOR, JoinRow.S_ROW);
}
if (joinEmpty) {
current.updateRow(join.id, EMPTY_CURSOR, JoinRow.S_NULL, JoinRow.S_CURSOR);
}
return;
}
}
@Override
public String toString() {
return "JoinBatch->\nprev->" + (current == null ? null : current.prev) +
"\ncurr->" + current +
"\nnext->" + (current == null ? null : current.next);
}
}
/**
* Table filter participating in batched join.
*/
private static final class JoinFilter {
final TableFilter filter;
final JoinFilter join;
int id;
IndexLookupBatch lookupBatch;
private JoinFilter(IndexLookupBatch lookupBatch, TableFilter filter, JoinFilter join) {
this.filter = filter;
this.join = join;
this.lookupBatch = lookupBatch != null ? lookupBatch : new FakeLookupBatch(filter);
}
public Row getNullRow() {
return filter.table.getNullRow();
}
private boolean isOuterJoin() {
return filter.joinOuter;
}
private boolean isBatchFull() {
return lookupBatch.isBatchFull();
}
private boolean isOk(boolean ignoreJoinCondition) {
boolean filterOk = filter.isOk(filter.filterCondition);
boolean joinOk = filter.isOk(filter.joinCondition);
return filterOk && (ignoreJoinCondition || joinOk);
}
private boolean collectSearchRows() {
assert !isBatchFull();
IndexCursor c = filter.cursor;
c.prepare(filter.session, filter.indexConditions);
if (c.isAlwaysFalse()) {
return false;
}
lookupBatch.addSearchRows(c.getStart(), c.getEnd());
return true;
}
private JoinRow find(JoinRow current) {
assert current != null;
// lookupBatch is allowed to be empty when we have some null-rows and forced find call
List<Future<Cursor>> result = lookupBatch.find();
// go backwards and assign futures
for (int i = result.size(); i > 0;) {
assert current.isRow(id - 1);
if (current.row(id) == EMPTY_CURSOR) {
// outer join support - skip row with existing empty cursor
current = current.prev;
continue;
}
assert current.row(id) == null;
Future<Cursor> future = result.get(--i);
if (future == null) {
current.updateRow(id, EMPTY_CURSOR, JoinRow.S_NULL, JoinRow.S_CURSOR);
} else {
current.updateRow(id, future, JoinRow.S_NULL, JoinRow.S_FUTURE);
}
if (current.prev == null || i == 0) {
break;
}
current = current.prev;
}
// handle empty cursors (because of outer joins) at the beginning
while (current.prev != null && current.prev.row(id) == EMPTY_CURSOR) {
current = current.prev;
}
assert current.prev == null || current.prev.isRow(id);
assert current.row(id) != null;
assert !current.isRow(id);
// the last updated row
return current;
}
@Override
public String toString() {
return "JoinFilter->" + filter;
}
}
/**
* Linked row in batched join.
*/
private static final class JoinRow {
private static final long S_NULL = 0;
private static final long S_FUTURE = 1;
private static final long S_CURSOR = 2;
private static final long S_ROW = 3;
private static final long S_MASK = 3;
/**
* May contain one of the following:
* <br/>- {@code null}: means that we need to get future cursor for this row
* <br/>- {@link Future}: means that we need to get a new {@link Cursor} from the {@link Future}
* <br/>- {@link Cursor}: means that we need to fetch {@link Row}s from the {@link Cursor}
* <br/>- {@link Row}: the {@link Row} is already fetched and is ready to be used
*/
Object[] row;
long state;
JoinRow prev;
JoinRow next;
/**
* @param row Row.
*/
private JoinRow(Object[] row) {
this.row = row;
}
/**
* @param joinFilterId Join filter id.
* @return Row state.
*/
private long getState(int joinFilterId) {
return (state >>> (joinFilterId << 1)) & S_MASK;
}
/**
* Allows to do a state transition in the following order:
* 0. Slot contains {@code null} ({@link #S_NULL}).
* 1. Slot contains {@link Future} ({@link #S_FUTURE}).
* 2. Slot contains {@link Cursor} ({@link #S_CURSOR}).
* 3. Slot contains {@link Row} ({@link #S_ROW}).
*
* @param joinFilterId {@link JoinRow} filter id.
* @param i Increment by this number of moves.
*/
private void incrementState(int joinFilterId, long i) {
assert i > 0 : i;
state += i << (joinFilterId << 1);
}
private void updateRow(int joinFilterId, Object x, long oldState, long newState) {
assert getState(joinFilterId) == oldState : "old state: " + getState(joinFilterId);
row[joinFilterId] = x;
incrementState(joinFilterId, newState - oldState);
assert getState(joinFilterId) == newState : "new state: " + getState(joinFilterId);
}
private Object row(int joinFilterId) {
return row[joinFilterId];
}
private boolean isRow(int joinFilterId) {
return getState(joinFilterId) == S_ROW;
}
private boolean isFuture(int joinFilterId) {
return getState(joinFilterId) == S_FUTURE;
}
private boolean isCursor(int joinFilterId) {
return getState(joinFilterId) == S_CURSOR;
}
private boolean isComplete() {
return isRow(row.length - 1);
}
private boolean isDropped() {
return row == null;
}
private void drop() {
if (prev != null) {
prev.next = next;
}
if (next != null) {
next.prev = prev;
}
row = null;
}
/**
* Copy this JoinRow behind itself in linked list of all in progress rows.
*
* @param jfId The last fetched filter id.
* @return The copy.
*/
private JoinRow copyBehind(int jfId) {
assert isCursor(jfId);
assert jfId + 1 == row.length || row[jfId + 1] == null;
Object[] r = new Object[row.length];
if (jfId != 0) {
System.arraycopy(row, 0, r, 0, jfId);
}
JoinRow copy = new JoinRow(r);
copy.state = state;
if (prev != null) {
copy.prev = prev;
prev.next = copy;
}
prev = copy;
copy.next = this;
return copy;
}
@Override
public String toString() {
return "JoinRow->" + Arrays.toString(row);
}
}
/**
* Fake Lookup batch for indexes which do not support batching but have to participate
* in batched joins.
*/
private static class FakeLookupBatch implements IndexLookupBatch {
final TableFilter filter;
SearchRow first;
SearchRow last;
boolean full;
final List<Future<Cursor>> result = new SingletonList<Future<Cursor>>();
/**
* @param index Index.
*/
public FakeLookupBatch(TableFilter filter) {
this.filter = filter;
}
@Override
public void addSearchRows(SearchRow first, SearchRow last) {
assert !full;
this.first = first;
this.last = last;
full = true;
}
@Override
public boolean isBatchFull() {
return full;
}
@Override
public List<Future<Cursor>> find() {
if (!full) {
return Collections.emptyList();
}
Cursor c = filter.getIndex().find(filter, first, last);
result.set(0, new DoneFuture<Cursor>(c));
full = false;
first = last = null;
return result;
}
}
/**
* Simple singleton list.
*/
private static class SingletonList<E> extends AbstractList<E> {
private E element;
@Override
public E get(int index) {
assert index == 0;
return element;
}
@Override
public E set(int index, E element) {
assert index == 0;
this.element = element;
return null;
}
@Override
public int size() {
return 1;
}
}
}
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论