提交 f4ee562c authored 作者: Thomas Mueller Graf's avatar Thomas Mueller Graf

Merge branch 'master' of https://github.com/h2database/h2database

......@@ -17,7 +17,7 @@ import org.h2.value.ValueNull;
* Represents a single SQL statements.
* It wraps a prepared statement.
*/
class CommandContainer extends Command {
public class CommandContainer extends Command {
private Prepared prepared;
private boolean readOnlyKnown;
......
......@@ -130,7 +130,7 @@ public class Delete extends Prepared {
condition = condition.optimize(session);
condition.createIndexConditions(session, tableFilter);
}
PlanItem item = tableFilter.getBestPlanItem(session, 1);
PlanItem item = tableFilter.getBestPlanItem(session, new TableFilter[]{tableFilter}, 0);
tableFilter.setPlanItem(item);
tableFilter.prepare();
}
......
......@@ -389,7 +389,7 @@ public class ScriptCommand extends ScriptBase {
}
private int generateInsertValues(int count, Table table) throws IOException {
PlanItem plan = table.getBestPlanItem(session, null, null, null);
PlanItem plan = table.getBestPlanItem(session, null, null, -1, null);
Index index = plan.getIndex();
Cursor cursor = index.find(session, null, null);
Column[] columns = table.getColumns();
......
......@@ -187,7 +187,7 @@ public class Update extends Prepared {
e.mapColumns(tableFilter, 0);
expressionMap.put(c, e.optimize(session));
}
PlanItem item = tableFilter.getBestPlanItem(session, 1);
PlanItem item = tableFilter.getBestPlanItem(session, new TableFilter[] {tableFilter}, 0);
tableFilter.setPlanItem(item);
tableFilter.prepare();
}
......
......@@ -5,8 +5,6 @@
*/
package org.h2.index;
import java.util.List;
import java.util.concurrent.Future;
import org.h2.api.ErrorCode;
import org.h2.engine.Constants;
import org.h2.engine.DbObject;
......@@ -152,12 +150,13 @@ public abstract class BaseIndex extends SchemaObjectBase implements Index {
*
* @param masks the search mask
* @param rowCount the number of rows in the index
* @param filter the table filter
* @param filters all joined table filters
* @param filter the current table filter index
* @param sortOrder the sort order
* @return the estimated cost
*/
protected long getCostRangeIndex(int[] masks, long rowCount,
TableFilter filter, SortOrder sortOrder) {
TableFilter[] filters, int filter, SortOrder sortOrder) {
rowCount += Constants.COST_ROW_OFFSET;
long cost = rowCount;
long rows = rowCount;
......@@ -201,6 +200,7 @@ public abstract class BaseIndex extends SchemaObjectBase implements Index {
boolean sortOrderMatches = true;
int coveringCount = 0;
int[] sortTypes = sortOrder.getSortTypes();
TableFilter tableFilter = filters == null ? null : filters[filter];
for (int i = 0, len = sortTypes.length; i < len; i++) {
if (i >= indexColumns.length) {
// we can still use this index if we are sorting by more
......@@ -209,7 +209,7 @@ public abstract class BaseIndex extends SchemaObjectBase implements Index {
// more of the order by columns
break;
}
Column col = sortOrder.getColumn(i, filter);
Column col = sortOrder.getColumn(i, tableFilter);
if (col == null) {
sortOrderMatches = false;
break;
......@@ -427,13 +427,8 @@ public abstract class BaseIndex extends SchemaObjectBase implements Index {
}
@Override
public int getPreferedLookupBatchSize() {
// No batched lookups supported by default.
return 0;
}
@Override
public List<Future<Cursor>> findBatched(TableFilter filter, List<SearchRow> firstLastPairs) {
throw DbException.throwInternalError("Must not be called if getPreferedLookupBatchSize() is 0.");
public IndexLookupBatch createLookupBatch(TableFilter filter) {
// Lookup batching is not supported.
return null;
}
}
......@@ -52,8 +52,8 @@ public class FunctionIndex extends BaseIndex {
}
@Override
public double getCost(Session session, int[] masks, TableFilter filter,
SortOrder sortOrder) {
public double getCost(Session session, int[] masks,
TableFilter[] filters, int filter, SortOrder sortOrder) {
if (masks != null) {
throw DbException.getUnsupportedException("ALIAS");
}
......
......@@ -113,8 +113,8 @@ public class HashIndex extends BaseIndex {
}
@Override
public double getCost(Session session, int[] masks, TableFilter filter,
SortOrder sortOrder) {
public double getCost(Session session, int[] masks,
TableFilter[] filters, int filter, SortOrder sortOrder) {
for (Column column : columns) {
int index = column.getColumnId();
int mask = masks[index];
......
......@@ -83,11 +83,12 @@ public interface Index extends SchemaObject {
* @param session the session
* @param masks per-column comparison bit masks, null means 'always false',
* see constants in IndexCondition
* @param filter the table filter
* @param filters all joined table filters
* @param filter the current table filter index
* @param sortOrder the sort order
* @return the estimated cost
*/
double getCost(Session session, int[] masks, TableFilter filter,
double getCost(Session session, int[] masks, TableFilter[] filters, int filter,
SortOrder sortOrder);
/**
......@@ -259,27 +260,11 @@ public interface Index extends SchemaObject {
void setSortedInsertMode(boolean sortedInsertMode);
/**
* If this index can do batched lookups, it may return it's preferred batch size,
* otherwise it must return 0.
* Creates new lookup batch. Note that returned {@link IndexLookupBatch} instance
* can be used multiple times.
*
* @return preferred batch size or 0 if lookup batching is not supported
* @see #findBatched(TableFilter, Collection)
* @param filter Table filter.
* @return Created batch or {@code null} if batched lookup is not supported by this index.
*/
int getPreferedLookupBatchSize();
/**
* Do batched lookup over the given collection of {@link SearchRow} pairs as in
* {@link #find(TableFilter, SearchRow, SearchRow)}.
* <br/><br/>
* Correct implementation must always return number of future cursors equal to
* {@code firstLastPairs.size() / 2}. Instead of {@link Future} containing empty
* {@link Cursor} it is possible to put {@code null} in result list.
*
* @param filter the table filter
* @param firstLastPairs List of batched search row pairs as in
* {@link #find(TableFilter, SearchRow, SearchRow)}, the collection will be reused by H2,
* thus it makes sense to defensively copy contents if needed.
* @return batched cursors for respective search row pairs in the same order
*/
List<Future<Cursor>> findBatched(TableFilter filter, List<SearchRow> firstLastPairs);
IndexLookupBatch createLookupBatch(TableFilter filter);
}
......@@ -347,6 +347,33 @@ public class IndexCondition {
return column;
}
/**
* Get expression.
*
* @return Expression.
*/
public Expression getExpression() {
return expression;
}
/**
* Get expression list.
*
* @return Expression list.
*/
public List<Expression> getExpressionList() {
return expressionList;
}
/**
* Get expression query.
*
* @return Expression query.
*/
public Query getExpressionQuery() {
return expressionQuery;
}
/**
* Check if the expression can be evaluated.
*
......
/*
* Copyright 2004-2014 H2 Group. Multiple-Licensed under the MPL 2.0,
* and the EPL 1.0 (http://h2database.com/html/license.html).
* Initial Developer: H2 Group
*/
package org.h2.index;
import java.util.List;
import java.util.concurrent.Future;
import org.h2.result.SearchRow;
/**
* Support for asynchronous batched lookups in indexes. The flow is the following:
* H2 engine will be calling {@link #addSearchRows(SearchRow, SearchRow)} until
* method {@link #isBatchFull()}} will return {@code true} or there are no more
* search rows to add. Then method {@link #find()} will be called to execute batched lookup.
* Note that a single instance of {@link IndexLookupBatch} can be reused for multiple
* sequential batched lookups.
*
* @see Index#createLookupBatch(TableFilter)
* @author Sergi Vladykin
*/
public interface IndexLookupBatch {
/**
* Add search row pair to the batch.
*
* @param first the first row, or null for no limit
* @param last the last row, or null for no limit
* @see Index#find(TableFilter, SearchRow, SearchRow)
*/
void addSearchRows(SearchRow first, SearchRow last);
/**
* Check if this batch is full.
*
* @return {@code true} If batch is full, will not accept any
* more rows and {@link #find()} can be executed.
*/
boolean isBatchFull();
/**
* Execute batched lookup and return future cursor for each provided
* search row pair. Note that this method must return exactly the same number
* of future cursors in result list as number of {@link #addSearchRows(SearchRow, SearchRow)}
* calls has been done before {@link #find()} call exactly in the same order.
*
* @return List of future cursors for collected search rows.
*/
List<Future<Cursor>> find();
}
......@@ -141,10 +141,10 @@ public class LinkedIndex extends BaseIndex {
}
@Override
public double getCost(Session session, int[] masks, TableFilter filter,
SortOrder sortOrder) {
public double getCost(Session session, int[] masks,
TableFilter[] filters, int filter, SortOrder sortOrder) {
return 100 + getCostRangeIndex(masks, rowCount +
Constants.COST_ROW_OFFSET, filter, sortOrder);
Constants.COST_ROW_OFFSET, filters, filter, sortOrder);
}
@Override
......
......@@ -52,13 +52,13 @@ public class MetaIndex extends BaseIndex {
}
@Override
public double getCost(Session session, int[] masks, TableFilter filter,
SortOrder sortOrder) {
public double getCost(Session session, int[] masks,
TableFilter[] filters, int filter, SortOrder sortOrder) {
if (scan) {
return 10 * MetaTable.ROW_COUNT_APPROXIMATION;
}
return getCostRangeIndex(masks, MetaTable.ROW_COUNT_APPROXIMATION,
filter, sortOrder);
filters, filter, sortOrder);
}
@Override
......
......@@ -142,9 +142,9 @@ public class MultiVersionIndex implements Index {
}
@Override
public double getCost(Session session, int[] masks, TableFilter filter,
SortOrder sortOrder) {
return base.getCost(session, masks, filter, sortOrder);
public double getCost(Session session, int[] masks,
TableFilter[] filters, int filter, SortOrder sortOrder) {
return base.getCost(session, masks, filters, filter, sortOrder);
}
@Override
......@@ -389,12 +389,8 @@ public class MultiVersionIndex implements Index {
}
@Override
public int getPreferedLookupBatchSize() {
return 0;
}
@Override
public List<Future<Cursor>> findBatched(TableFilter filter, List<SearchRow> firstLastPairs) {
throw DbException.throwInternalError("Must never be called.");
public IndexLookupBatch createLookupBatch(TableFilter filter) {
// Lookup batching is not supported.
return null;
}
}
......@@ -130,8 +130,8 @@ public class NonUniqueHashIndex extends BaseIndex {
}
@Override
public double getCost(Session session, int[] masks, TableFilter filter,
SortOrder sortOrder) {
public double getCost(Session session, int[] masks,
TableFilter[] filters, int filter, SortOrder sortOrder) {
for (Column column : columns) {
int index = column.getColumnId();
int mask = masks[index];
......
......@@ -217,10 +217,10 @@ public class PageBtreeIndex extends PageIndex {
}
@Override
public double getCost(Session session, int[] masks, TableFilter filter,
SortOrder sortOrder) {
public double getCost(Session session, int[] masks,
TableFilter[] filters, int filter, SortOrder sortOrder) {
return 10 * getCostRangeIndex(masks, tableData.getRowCount(session),
filter, sortOrder);
filters, filter, sortOrder);
}
@Override
......
......@@ -10,7 +10,6 @@ import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import org.h2.api.ErrorCode;
import org.h2.engine.Constants;
import org.h2.engine.Session;
......@@ -310,8 +309,8 @@ public class PageDataIndex extends PageIndex {
}
@Override
public double getCost(Session session, int[] masks, TableFilter filter,
SortOrder sortOrder) {
public double getCost(Session session, int[] masks,
TableFilter[] filters, int filter, SortOrder sortOrder) {
long cost = 10 * (tableData.getRowCountApproximation() +
Constants.COST_ROW_OFFSET);
return cost;
......
......@@ -96,10 +96,10 @@ public class PageDelegateIndex extends PageIndex {
}
@Override
public double getCost(Session session, int[] masks, TableFilter filter,
SortOrder sortOrder) {
public double getCost(Session session, int[] masks,
TableFilter[] filters, int filter, SortOrder sortOrder) {
return 10 * getCostRangeIndex(masks, mainIndex.getRowCount(session),
filter, sortOrder);
filters, filter, sortOrder);
}
@Override
......
......@@ -62,8 +62,8 @@ public class RangeIndex extends BaseIndex {
}
@Override
public double getCost(Session session, int[] masks, TableFilter filter,
SortOrder sortOrder) {
public double getCost(Session session, int[] masks,
TableFilter[] filters, int filter, SortOrder sortOrder) {
return 1;
}
......
......@@ -174,8 +174,8 @@ public class ScanIndex extends BaseIndex {
}
@Override
public double getCost(Session session, int[] masks, TableFilter filter,
SortOrder sortOrder) {
public double getCost(Session session, int[] masks,
TableFilter[] filters, int filter, SortOrder sortOrder) {
return tableData.getRowCountApproximation() + Constants.COST_ROW_OFFSET;
}
......
......@@ -180,7 +180,7 @@ public class SpatialTreeIndex extends BaseIndex implements SpatialIndex {
@Override
protected long getCostRangeIndex(int[] masks, long rowCount,
TableFilter filter, SortOrder sortOrder) {
TableFilter[] filters, int filter, SortOrder sortOrder) {
return getCostRangeIndex(masks, rowCount, columns);
}
......@@ -208,10 +208,10 @@ public class SpatialTreeIndex extends BaseIndex implements SpatialIndex {
}
@Override
public double getCost(Session session, int[] masks, TableFilter filter,
SortOrder sortOrder) {
public double getCost(Session session, int[] masks,
TableFilter[] filters, int filter, SortOrder sortOrder) {
return getCostRangeIndex(masks, table.getRowCountApproximation(),
filter, sortOrder);
filters, filter, sortOrder);
}
@Override
......
......@@ -318,10 +318,10 @@ public class TreeIndex extends BaseIndex {
}
@Override
public double getCost(Session session, int[] masks, TableFilter filter,
public double getCost(Session session, int[] masks, TableFilter[] filters, int filter,
SortOrder sortOrder) {
return getCostRangeIndex(masks, tableData.getRowCountApproximation(),
filter, sortOrder);
filters, filter, sortOrder);
}
@Override
......
......@@ -115,7 +115,7 @@ public class ViewIndex extends BaseIndex implements SpatialIndex {
@Override
public synchronized double getCost(Session session, int[] masks,
TableFilter filter, SortOrder sortOrder) {
TableFilter[] filters, int filter, SortOrder sortOrder) {
if (recursive) {
return 1000;
}
......
......@@ -6,7 +6,6 @@
package org.h2.mvstore.db;
import java.util.List;
import org.h2.engine.Session;
import org.h2.index.BaseIndex;
import org.h2.index.Cursor;
......@@ -89,10 +88,10 @@ public class MVDelegateIndex extends BaseIndex implements MVIndex {
}
@Override
public double getCost(Session session, int[] masks, TableFilter filter,
SortOrder sortOrder) {
return 10 * getCostRangeIndex(masks,
mainIndex.getRowCountApproximation(), filter, sortOrder);
public double getCost(Session session, int[] masks,
TableFilter[] filters, int filter, SortOrder sortOrder) {
return 10 * getCostRangeIndex(masks, mainIndex.getRowCountApproximation(),
filters, filter, sortOrder);
}
@Override
......
......@@ -10,7 +10,6 @@ import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map.Entry;
import org.h2.api.ErrorCode;
import org.h2.engine.Constants;
import org.h2.engine.Database;
......@@ -217,8 +216,8 @@ public class MVPrimaryIndex extends BaseIndex {
}
@Override
public double getCost(Session session, int[] masks, TableFilter filter,
SortOrder sortOrder) {
public double getCost(Session session, int[] masks,
TableFilter[] filters, int filter, SortOrder sortOrder) {
try {
long cost = 10 * (dataMap.sizeAsLongMax() + Constants.COST_ROW_OFFSET);
return cost;
......
......@@ -10,7 +10,6 @@ import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.TreeSet;
import org.h2.api.ErrorCode;
import org.h2.engine.Database;
import org.h2.engine.Session;
......@@ -352,11 +351,11 @@ public class MVSecondaryIndex extends BaseIndex implements MVIndex {
}
@Override
public double getCost(Session session, int[] masks, TableFilter filter,
SortOrder sortOrder) {
public double getCost(Session session, int[] masks,
TableFilter[] filters, int filter, SortOrder sortOrder) {
try {
return 10 * getCostRangeIndex(masks,
dataMap.sizeAsLongMax(), filter, sortOrder);
return 10 * getCostRangeIndex(masks, dataMap.sizeAsLongMax(),
filters, filter, sortOrder);
} catch (IllegalStateException e) {
throw DbException.get(ErrorCode.OBJECT_CLOSED, e);
}
......
......@@ -7,7 +7,6 @@ package org.h2.mvstore.db;
import java.util.Iterator;
import java.util.List;
import org.h2.api.ErrorCode;
import org.h2.engine.Database;
import org.h2.engine.Session;
......@@ -33,7 +32,6 @@ import org.h2.value.Value;
import org.h2.value.ValueGeometry;
import org.h2.value.ValueLong;
import org.h2.value.ValueNull;
import com.vividsolutions.jts.geom.Envelope;
import com.vividsolutions.jts.geom.Geometry;
......@@ -239,15 +237,15 @@ public class MVSpatialIndex extends BaseIndex implements SpatialIndex, MVIndex {
}
@Override
public double getCost(Session session, int[] masks, TableFilter filter,
SortOrder sortOrder) {
public double getCost(Session session, int[] masks,
TableFilter[] filters, int filter, SortOrder sortOrder) {
return getCostRangeIndex(masks, table.getRowCountApproximation(),
filter, sortOrder);
filters, filter, sortOrder);
}
@Override
protected long getCostRangeIndex(int[] masks, long rowCount,
TableFilter filter, SortOrder sortOrder) {
TableFilter[] filters, int filter, SortOrder sortOrder) {
return SpatialTreeIndex.getCostRangeIndex(masks, rowCount, columns);
}
......
......@@ -106,9 +106,9 @@ public class Plan {
public double calculateCost(Session session) {
double cost = 1;
boolean invalidPlan = false;
int level = 1;
for (TableFilter tableFilter : allFilters) {
PlanItem item = tableFilter.getBestPlanItem(session, level++);
for (int i = 0; i < allFilters.length; i++) {
TableFilter tableFilter = allFilters[i];
PlanItem item = tableFilter.getBestPlanItem(session, allFilters, i);
planItems.put(tableFilter, item);
cost += cost * item.cost;
setEvaluatable(tableFilter, true);
......
......@@ -18,10 +18,19 @@ public class PlanItem {
*/
double cost;
private int[] masks;
private Index index;
private PlanItem joinPlan;
private PlanItem nestedJoinPlan;
void setMasks(int[] masks) {
this.masks = masks;
}
int[] getMasks() {
return masks;
}
void setIndex(Index index) {
this.index = index;
}
......
......@@ -678,20 +678,21 @@ public abstract class Table extends SchemaObjectBase {
* @param session the session
* @param masks per-column comparison bit masks, null means 'always false',
* see constants in IndexCondition
* @param filter the table filter
* @param filters all joined table filters
* @param filter the current table filter index
* @param sortOrder the sort order
* @return the plan item
*/
public PlanItem getBestPlanItem(Session session, int[] masks,
TableFilter filter, SortOrder sortOrder) {
TableFilter[] filters, int filter, SortOrder sortOrder) {
PlanItem item = new PlanItem();
item.setIndex(getScanIndex(session));
item.cost = item.getIndex().getCost(session, null, null, null);
item.cost = item.getIndex().getCost(session, null, filters, filter, null);
ArrayList<Index> indexes = getIndexes();
if (indexes != null && masks != null) {
for (int i = 1, size = indexes.size(); i < size; i++) {
Index index = indexes.get(i);
double cost = index.getCost(session, masks, filter, sortOrder);
double cost = index.getCost(session, masks, filters, filter, sortOrder);
if (cost < item.cost) {
item.cost = cost;
item.setIndex(index);
......
......@@ -5,7 +5,12 @@
*/
package org.h2.table;
import java.util.AbstractList;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.Future;
import org.h2.command.Parser;
import org.h2.command.dml.Select;
import org.h2.engine.Right;
......@@ -16,13 +21,16 @@ import org.h2.expression.Comparison;
import org.h2.expression.ConditionAndOr;
import org.h2.expression.Expression;
import org.h2.expression.ExpressionColumn;
import org.h2.index.Cursor;
import org.h2.index.Index;
import org.h2.index.IndexLookupBatch;
import org.h2.index.IndexCondition;
import org.h2.index.IndexCursor;
import org.h2.message.DbException;
import org.h2.result.Row;
import org.h2.result.SearchRow;
import org.h2.result.SortOrder;
import org.h2.util.DoneFuture;
import org.h2.util.New;
import org.h2.util.StatementBuilder;
import org.h2.util.StringUtils;
......@@ -40,6 +48,33 @@ public class TableFilter implements ColumnResolver {
private static final int BEFORE_FIRST = 0, FOUND = 1, AFTER_LAST = 2,
NULL_ROW = 3;
private static final Cursor EMPTY_CURSOR = new Cursor() {
@Override
public boolean previous() {
return false;
}
@Override
public boolean next() {
return false;
}
@Override
public SearchRow getSearchRow() {
return null;
}
@Override
public Row get() {
return null;
}
@Override
public String toString() {
return "EMPTY_CURSOR";
}
};
/**
* Whether this is a direct or indirect (nested) outer join
*/
......@@ -51,9 +86,16 @@ public class TableFilter implements ColumnResolver {
private final Select select;
private String alias;
private Index index;
private int[] masks;
private int scanCount;
private boolean evaluatable;
/**
* Batched join support.
*/
private JoinBatch joinBatch;
private JoinFilter joinFilter;
/**
* Indicates that this filter is used in the plan.
*/
......@@ -154,15 +196,16 @@ public class TableFilter implements ColumnResolver {
* order.
*
* @param s the session
* @param level 1 for the first table in a join, 2 for the second, and so on
* @param filters all joined table filters
* @param filter the current table filter index
* @return the best plan item
*/
public PlanItem getBestPlanItem(Session s, int level) {
public PlanItem getBestPlanItem(Session s, TableFilter[] filters, int filter) {
PlanItem item;
if (indexConditions.size() == 0) {
item = new PlanItem();
item.setIndex(table.getScanIndex(s));
item.cost = item.getIndex().getCost(s, null, null, null);
item.cost = item.getIndex().getCost(s, null, filters, filter, null);
} else {
int len = table.getColumns().length;
int[] masks = new int[len];
......@@ -182,22 +225,25 @@ public class TableFilter implements ColumnResolver {
if (select != null) {
sortOrder = select.getSortOrder();
}
item = table.getBestPlanItem(s, masks, this, sortOrder);
item = table.getBestPlanItem(s, masks, filters, filter, sortOrder);
item.setMasks(masks);
// The more index conditions, the earlier the table.
// This is to ensure joins without indexes run quickly:
// x (x.a=10); y (x.b=y.b) - see issue 113
item.cost -= item.cost * indexConditions.size() / 100 / level;
item.cost -= item.cost * indexConditions.size() / 100 / (filter + 1);
}
if (nestedJoin != null) {
setEvaluatable(nestedJoin);
item.setNestedJoinPlan(nestedJoin.getBestPlanItem(s, level));
item.setNestedJoinPlan(nestedJoin.getBestPlanItem(s, filters, filter));
// TODO optimizer: calculate cost of a join: should use separate
// expected row number and lookup cost
item.cost += item.cost * item.getNestedJoinPlan().cost;
}
if (join != null) {
setEvaluatable(join);
item.setJoinPlan(join.getBestPlanItem(s, level));
filter += nestedJoin == null ? 1 : 2;
assert filters[filter] == join;
item.setJoinPlan(join.getBestPlanItem(s, filters, filter));
// TODO optimizer: calculate cost of a join: should use separate
// expected row number and lookup cost
item.cost += item.cost * item.getJoinPlan().cost;
......@@ -225,7 +271,7 @@ public class TableFilter implements ColumnResolver {
}
/**
* Set what plan item (index, cost) to use use.
* Set what plan item (index, cost, masks) to use.
*
* @param item the plan item
*/
......@@ -236,6 +282,7 @@ public class TableFilter implements ColumnResolver {
return;
}
setIndex(item.getIndex());
masks = item.getMasks();
if (nestedJoin != null) {
if (item.getNestedJoinPlan() != null) {
nestedJoin.setPlanItem(item.getNestedJoinPlan());
......@@ -291,16 +338,38 @@ public class TableFilter implements ColumnResolver {
* Start the query. This will reset the scan counts.
*
* @param s the session
* @return join batch if query runs over index which supports batched lookups, null otherwise
*/
public void startQuery(Session s) {
public JoinBatch startQuery(Session s) {
joinBatch = null;
joinFilter = null;
this.session = s;
scanCount = 0;
if (nestedJoin != null) {
nestedJoin.startQuery(s);
}
JoinBatch batch = null;
if (join != null) {
join.startQuery(s);
batch = join.startQuery(s);
}
IndexLookupBatch lookupBatch = null;
if (batch == null && select != null && select.getTopTableFilter() != this) {
lookupBatch = index.createLookupBatch(this);
if (lookupBatch != null) {
batch = new JoinBatch(join);
}
}
if (batch != null) {
if (nestedJoin != null) {
throw DbException.getUnsupportedException("nested join with batched index");
}
if (lookupBatch == null) {
lookupBatch = index.createLookupBatch(this);
}
joinBatch = batch;
joinFilter = batch.register(this, lookupBatch);
}
return batch;
}
/**
......@@ -323,6 +392,10 @@ public class TableFilter implements ColumnResolver {
* @return true if there are
*/
public boolean next() {
if (joinBatch != null) {
// will happen only on topTableFilter since jbatch.next does not call join.next()
return joinBatch.next();
}
if (state == AFTER_LAST) {
return false;
} else if (state == BEFORE_FIRST) {
......@@ -728,6 +801,14 @@ public class TableFilter implements ColumnResolver {
}
}
public int[] getMasks() {
return masks;
}
public ArrayList<IndexCondition> getIndexConditions() {
return indexConditions;
}
public Index getIndex() {
return index;
}
......@@ -882,6 +963,9 @@ public class TableFilter implements ColumnResolver {
@Override
public Value getValue(Column column) {
if (joinBatch != null) {
return joinBatch.getValue(joinFilter, column);
}
if (currentSearchRow == null) {
return null;
}
......@@ -1031,4 +1115,561 @@ public class TableFilter implements ColumnResolver {
void accept(TableFilter f);
}
/**
* Support for asynchronous batched index lookups on joins.
*
* @see Index#findBatched(TableFilter, java.util.Collection)
* @see Index#getPreferedLookupBatchSize()
*
* @author Sergi Vladykin
*/
private static final class JoinBatch {
int filtersCount;
JoinFilter[] filters;
JoinFilter top;
boolean started;
JoinRow current;
boolean found;
/**
* This filter joined after this batched join and can be used normally.
*/
final TableFilter additionalFilter;
/**
* @param additionalFilter table filter after this batched join.
*/
private JoinBatch(TableFilter additionalFilter) {
this.additionalFilter = additionalFilter;
}
/**
* @param filter table filter
* @param lookupBatch lookup batch
*/
private JoinFilter register(TableFilter filter, IndexLookupBatch lookupBatch) {
assert filter != null;
filtersCount++;
return top = new JoinFilter(lookupBatch, filter, top);
}
/**
* @param filterId table filter id
* @param column column
* @return column value for current row
*/
private Value getValue(JoinFilter filter, Column column) {
Object x = current.row(filter.id);
assert x != null;
Row row = current.isRow(filter.id) ? (Row) x : ((Cursor) x).get();
int columnId = column.getColumnId();
if (columnId == -1) {
return ValueLong.get(row.getKey());
}
Value value = row.getValue(column.getColumnId());
if (value == null) {
throw DbException.throwInternalError("value is null: " + column + " " + row);
}
return value;
}
private void start() {
if (filtersCount > 32) {
// This is because we store state in a 64 bit field, 2 bits per joined table.
throw DbException.getUnsupportedException("Too many tables in join (at most 32 supported).");
}
// fill filters
filters = new JoinFilter[filtersCount];
JoinFilter jf = top;
for (int i = 0; i < filtersCount; i++) {
filters[jf.id = i] = jf;
jf = jf.join;
}
// initialize current row
current = new JoinRow(new Object[filtersCount]);
current.updateRow(top.id, top.filter.cursor, JoinRow.S_NULL, JoinRow.S_CURSOR);
// initialize top cursor
top.filter.cursor.find(top.filter.session, top.filter.indexConditions);
// we need fake first row because batchedNext always will move to the next row
JoinRow fake = new JoinRow(null);
fake.next = current;
current = fake;
}
private boolean next() {
if (!started) {
start();
started = true;
}
if (additionalFilter == null) {
if (batchedNext()) {
assert current.isComplete();
return true;
}
return false;
}
for (;;) {
if (!found) {
if (!batchedNext()) {
return false;
}
assert current.isComplete();
found = true;
additionalFilter.reset();
}
// we call furtherFilter in usual way outside of this batch because it is more effective
if (additionalFilter.next()) {
return true;
}
found = false;
}
}
private static Cursor get(Future<Cursor> f) {
try {
return f.get();
} catch (Exception e) {
throw DbException.convert(e);
}
}
private boolean batchedNext() {
if (current == null) {
// after last
return false;
}
// go next
current = current.next;
if (current == null) {
return false;
}
current.prev = null;
final int lastJfId = filtersCount - 1;
int jfId = lastJfId;
while (current.row(jfId) == null) {
// lookup for the first non fetched filter for the current row
jfId--;
}
for (;;) {
fetchCurrent(jfId);
if (!current.isDropped()) {
// if current was not dropped then it must be fetched successfully
if (jfId == lastJfId) {
// the whole join row is ready to be returned
return true;
}
JoinFilter join = filters[jfId + 1];
if (join.isBatchFull()) {
// get future cursors for join and go right to fetch them
current = join.find(current);
}
if (current.row(join.id) != null) {
// either find called or outer join with null row
jfId = join.id;
continue;
}
}
// we have to go down and fetch next cursors for jfId if it is possible
if (current.next == null) {
// either dropped or null-row
if (current.isDropped()) {
current = current.prev;
if (current == null) {
return false;
}
}
assert !current.isDropped();
assert jfId != lastJfId;
jfId = 0;
while (current.row(jfId) != null) {
jfId++;
}
// force find on half filled batch (there must be either searchRows
// or Cursor.EMPTY set for null-rows)
current = filters[jfId].find(current);
} else {
// here we don't care if the current was dropped
current = current.next;
assert !current.isRow(jfId);
while (current.row(jfId) == null) {
assert jfId != top.id;
// need to go left and fetch more search rows
jfId--;
assert !current.isRow(jfId);
}
}
}
}
@SuppressWarnings("unchecked")
private void fetchCurrent(final int jfId) {
assert current.prev == null || current.prev.isRow(jfId) : "prev must be already fetched";
assert jfId == 0 || current.isRow(jfId - 1) : "left must be already fetched";
assert !current.isRow(jfId) : "double fetching";
Object x = current.row(jfId);
assert x != null : "x null";
final JoinFilter jf = filters[jfId];
// in case of outer join we don't have any future around empty cursor
boolean newCursor = x == EMPTY_CURSOR;
if (!newCursor && current.isFuture(jfId)) {
// get cursor from a future
x = get((Future<Cursor>) x);
current.updateRow(jfId, x, JoinRow.S_FUTURE, JoinRow.S_CURSOR);
newCursor = true;
}
Cursor c = (Cursor) x;
assert c != null;
JoinFilter join = jf.join;
for (;;) {
if (c == null || !c.next()) {
if (newCursor && jf.isOuterJoin()) {
// replace cursor with null-row
current.updateRow(jfId, jf.getNullRow(), JoinRow.S_CURSOR, JoinRow.S_ROW);
c = null;
newCursor = false;
} else {
// cursor is done, drop it
current.drop();
return;
}
}
if (!jf.isOk(c == null)) {
// try another row from the cursor
continue;
}
boolean joinEmpty = false;
if (join != null && !join.collectSearchRows()) {
if (join.isOuterJoin()) {
joinEmpty = true;
} else {
// join will fail, try next row in the cursor
continue;
}
}
if (c != null) {
current = current.copyBehind(jfId);
// get current row from cursor
current.updateRow(jfId, c.get(), JoinRow.S_CURSOR, JoinRow.S_ROW);
}
if (joinEmpty) {
current.updateRow(join.id, EMPTY_CURSOR, JoinRow.S_NULL, JoinRow.S_CURSOR);
}
return;
}
}
@Override
public String toString() {
return "JoinBatch->\nprev->" + (current == null ? null : current.prev) +
"\ncurr->" + current +
"\nnext->" + (current == null ? null : current.next);
}
}
/**
* Table filter participating in batched join.
*/
private static final class JoinFilter {
final TableFilter filter;
final JoinFilter join;
int id;
IndexLookupBatch lookupBatch;
private JoinFilter(IndexLookupBatch lookupBatch, TableFilter filter, JoinFilter join) {
this.filter = filter;
this.join = join;
this.lookupBatch = lookupBatch != null ? lookupBatch : new FakeLookupBatch(filter);
}
public Row getNullRow() {
return filter.table.getNullRow();
}
private boolean isOuterJoin() {
return filter.joinOuter;
}
private boolean isBatchFull() {
return lookupBatch.isBatchFull();
}
private boolean isOk(boolean ignoreJoinCondition) {
boolean filterOk = filter.isOk(filter.filterCondition);
boolean joinOk = filter.isOk(filter.joinCondition);
return filterOk && (ignoreJoinCondition || joinOk);
}
private boolean collectSearchRows() {
assert !isBatchFull();
IndexCursor c = filter.cursor;
c.prepare(filter.session, filter.indexConditions);
if (c.isAlwaysFalse()) {
return false;
}
lookupBatch.addSearchRows(c.getStart(), c.getEnd());
return true;
}
private JoinRow find(JoinRow current) {
assert current != null;
// lookupBatch is allowed to be empty when we have some null-rows and forced find call
List<Future<Cursor>> result = lookupBatch.find();
// go backwards and assign futures
for (int i = result.size(); i > 0;) {
assert current.isRow(id - 1);
if (current.row(id) == EMPTY_CURSOR) {
// outer join support - skip row with existing empty cursor
current = current.prev;
continue;
}
assert current.row(id) == null;
Future<Cursor> future = result.get(--i);
if (future == null) {
current.updateRow(id, EMPTY_CURSOR, JoinRow.S_NULL, JoinRow.S_CURSOR);
} else {
current.updateRow(id, future, JoinRow.S_NULL, JoinRow.S_FUTURE);
}
if (current.prev == null || i == 0) {
break;
}
current = current.prev;
}
// handle empty cursors (because of outer joins) at the beginning
while (current.prev != null && current.prev.row(id) == EMPTY_CURSOR) {
current = current.prev;
}
assert current.prev == null || current.prev.isRow(id);
assert current.row(id) != null;
assert !current.isRow(id);
// the last updated row
return current;
}
@Override
public String toString() {
return "JoinFilter->" + filter;
}
}
/**
* Linked row in batched join.
*/
private static final class JoinRow {
private static final long S_NULL = 0;
private static final long S_FUTURE = 1;
private static final long S_CURSOR = 2;
private static final long S_ROW = 3;
private static final long S_MASK = 3;
/**
* May contain one of the following:
* <br/>- {@code null}: means that we need to get future cursor for this row
* <br/>- {@link Future}: means that we need to get a new {@link Cursor} from the {@link Future}
* <br/>- {@link Cursor}: means that we need to fetch {@link Row}s from the {@link Cursor}
* <br/>- {@link Row}: the {@link Row} is already fetched and is ready to be used
*/
Object[] row;
long state;
JoinRow prev;
JoinRow next;
/**
* @param row Row.
*/
private JoinRow(Object[] row) {
this.row = row;
}
/**
* @param joinFilterId Join filter id.
* @return Row state.
*/
private long getState(int joinFilterId) {
return (state >>> (joinFilterId << 1)) & S_MASK;
}
/**
* Allows to do a state transition in the following order:
* 0. Slot contains {@code null} ({@link #S_NULL}).
* 1. Slot contains {@link Future} ({@link #S_FUTURE}).
* 2. Slot contains {@link Cursor} ({@link #S_CURSOR}).
* 3. Slot contains {@link Row} ({@link #S_ROW}).
*
* @param joinFilterId {@link JoinRow} filter id.
* @param i Increment by this number of moves.
*/
private void incrementState(int joinFilterId, long i) {
assert i > 0 : i;
state += i << (joinFilterId << 1);
}
private void updateRow(int joinFilterId, Object x, long oldState, long newState) {
assert getState(joinFilterId) == oldState : "old state: " + getState(joinFilterId);
row[joinFilterId] = x;
incrementState(joinFilterId, newState - oldState);
assert getState(joinFilterId) == newState : "new state: " + getState(joinFilterId);
}
private Object row(int joinFilterId) {
return row[joinFilterId];
}
private boolean isRow(int joinFilterId) {
return getState(joinFilterId) == S_ROW;
}
private boolean isFuture(int joinFilterId) {
return getState(joinFilterId) == S_FUTURE;
}
private boolean isCursor(int joinFilterId) {
return getState(joinFilterId) == S_CURSOR;
}
private boolean isComplete() {
return isRow(row.length - 1);
}
private boolean isDropped() {
return row == null;
}
private void drop() {
if (prev != null) {
prev.next = next;
}
if (next != null) {
next.prev = prev;
}
row = null;
}
/**
* Copy this JoinRow behind itself in linked list of all in progress rows.
*
* @param jfId The last fetched filter id.
* @return The copy.
*/
private JoinRow copyBehind(int jfId) {
assert isCursor(jfId);
assert jfId + 1 == row.length || row[jfId + 1] == null;
Object[] r = new Object[row.length];
if (jfId != 0) {
System.arraycopy(row, 0, r, 0, jfId);
}
JoinRow copy = new JoinRow(r);
copy.state = state;
if (prev != null) {
copy.prev = prev;
prev.next = copy;
}
prev = copy;
copy.next = this;
return copy;
}
@Override
public String toString() {
return "JoinRow->" + Arrays.toString(row);
}
}
/**
* Fake Lookup batch for indexes which do not support batching but have to participate
* in batched joins.
*/
private static class FakeLookupBatch implements IndexLookupBatch {
final TableFilter filter;
SearchRow first;
SearchRow last;
boolean full;
final List<Future<Cursor>> result = new SingletonList<Future<Cursor>>();
/**
* @param index Index.
*/
public FakeLookupBatch(TableFilter filter) {
this.filter = filter;
}
@Override
public void addSearchRows(SearchRow first, SearchRow last) {
assert !full;
this.first = first;
this.last = last;
full = true;
}
@Override
public boolean isBatchFull() {
return full;
}
@Override
public List<Future<Cursor>> find() {
if (!full) {
return Collections.emptyList();
}
Cursor c = filter.getIndex().find(filter, first, last);
result.set(0, new DoneFuture<Cursor>(c));
full = false;
first = last = null;
return result;
}
}
/**
* Simple singleton list.
*/
private static class SingletonList<E> extends AbstractList<E> {
private E element;
@Override
public E get(int index) {
assert index == 0;
return element;
}
@Override
public E set(int index, E element) {
assert index == 0;
this.element = element;
return null;
}
@Override
public int size() {
return 1;
}
}
}
......@@ -225,9 +225,9 @@ public class TableView extends Table {
@Override
public PlanItem getBestPlanItem(Session session, int[] masks,
TableFilter filter, SortOrder sortOrder) {
TableFilter[] filters, int filter, SortOrder sortOrder) {
PlanItem item = new PlanItem();
item.cost = index.getCost(session, masks, filter, sortOrder);
item.cost = index.getCost(session, masks, filters, filter, sortOrder);
final CacheKey cacheKey = new CacheKey(masks, session);
synchronized (this) {
......@@ -434,7 +434,7 @@ public class TableView extends Table {
throw DbException.get(ErrorCode.VIEW_IS_INVALID_2,
createException, getSQL(), msg);
}
PlanItem item = getBestPlanItem(session, null, null, null);
PlanItem item = getBestPlanItem(session, null, null, -1, null);
return item.getIndex();
}
......
/*
* Copyright 2004-2014 H2 Group. Multiple-Licensed under the MPL 2.0,
* and the EPL 1.0 (http://h2database.com/html/license.html).
* Initial Developer: H2 Group
*/
package org.h2.util;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
/**
* Future which is already done.
*
* @param <T> Result value.
* @author Sergi Vladykin
*/
public class DoneFuture<T> implements Future<T> {
final T x;
public DoneFuture(T x) {
this.x = x;
}
@Override
public T get() throws InterruptedException, ExecutionException {
return x;
}
@Override
public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
return x;
}
@Override
public boolean isDone() {
return true;
}
@Override
public boolean cancel(boolean mayInterruptIfRunning) {
return false;
}
@Override
public boolean isCancelled() {
return false;
}
@Override
public String toString() {
return "DoneFuture->" + x;
}
}
......@@ -5,6 +5,7 @@
*/
package org.h2.util;
import java.io.BufferedReader;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.File;
......@@ -13,6 +14,7 @@ import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.PrintStream;
import java.io.StringReader;
import java.io.StringWriter;
import java.io.Writer;
import java.lang.reflect.Array;
......@@ -361,9 +363,22 @@ public class SourceCompiler {
}
private static void handleSyntaxError(String output) {
if (output.startsWith("Note:") || output.startsWith("warning:")) {
// just a warning (e.g. unchecked or unsafe operations)
} else if (output.length() > 0) {
boolean syntaxError = false;
final BufferedReader reader = new BufferedReader(new StringReader(output));
try {
for (String line; (line = reader.readLine()) != null; ) {
if (line.startsWith("Note:") || line.startsWith("warning:")) {
// just a warning (e.g. unchecked or unsafe operations)
} else {
syntaxError = true;
break;
}
}
} catch (IOException ignored) {
// exception ignored
}
if (syntaxError) {
output = StringUtils.replaceAll(output, COMPILE_DIR, "");
throw DbException.get(ErrorCode.SYNTAX_ERROR_1, output);
}
......
org.h2.test.ap.TestAnnotationProcessor
\ No newline at end of file
package org.h2.test.ap;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import javax.annotation.processing.AbstractProcessor;
import javax.annotation.processing.RoundEnvironment;
import javax.lang.model.SourceVersion;
import javax.lang.model.element.TypeElement;
import javax.tools.Diagnostic;
public class TestAnnotationProcessor extends AbstractProcessor {
public static final String MESSAGES_KEY = TestAnnotationProcessor.class.getName() + "-messages";
public Set<String> getSupportedAnnotationTypes() {
for (OutputMessage outputMessage : findMessages()) {
processingEnv.getMessager().printMessage(outputMessage.kind, outputMessage.message);
}
return Collections.emptySet();
}
private List<OutputMessage> findMessages() {
final String messagesStr = System.getProperty(MESSAGES_KEY);
if (messagesStr == null || messagesStr.isEmpty()) {
return Collections.emptyList();
} else {
final List<OutputMessage> outputMessages = new ArrayList<OutputMessage>();
for (String msg : messagesStr.split("\\|")) {
final String[] split = msg.split(",");
if (split.length == 2) {
outputMessages.add(new OutputMessage(Diagnostic.Kind.valueOf(split[0]), split[1]));
} else {
throw new IllegalStateException("Unable to parse messages definition for: '" + messagesStr + "'");
}
}
return outputMessages;
}
}
public SourceVersion getSupportedSourceVersion() {
return SourceVersion.RELEASE_6;
}
@Override
public boolean process(Set<? extends TypeElement> annotations, RoundEnvironment roundEnv) {
return false;
}
private static class OutputMessage {
public final Diagnostic.Kind kind;
public final String message;
private OutputMessage(Diagnostic.Kind kind, String message) {
this.kind = kind;
this.message = message;
}
}
}
......@@ -41,8 +41,10 @@ import org.h2.api.Aggregate;
import org.h2.api.AggregateFunction;
import org.h2.api.ErrorCode;
import org.h2.engine.Constants;
import org.h2.jdbc.JdbcSQLException;
import org.h2.store.fs.FileUtils;
import org.h2.test.TestBase;
import org.h2.test.ap.TestAnnotationProcessor;
import org.h2.tools.SimpleResultSet;
import org.h2.util.IOUtils;
import org.h2.util.New;
......@@ -103,6 +105,7 @@ public class TestFunctions extends TestBase implements AggregateFunction {
testTranslate();
testGenerateSeries();
testFileWrite();
testAnnotationProcessorsOutput();
deleteDb("functions");
}
......@@ -1163,7 +1166,7 @@ public class TestFunctions extends TestBase implements AggregateFunction {
call.execute();
assertEquals(Integer[].class.getName(), call.getArray(1).getArray()
.getClass().getName());
assertEquals(new Integer[] { 2, 1 }, (Integer[]) call.getObject(1));
assertEquals(new Integer[]{2, 1}, (Integer[]) call.getObject(1));
stat.execute("drop alias array_test");
......@@ -1778,6 +1781,83 @@ public class TestFunctions extends TestBase implements AggregateFunction {
conn.close();
}
private void testAnnotationProcessorsOutput() throws SQLException {
testAnnotationProcessorsOutput_emptyKey();
testAnnotationProcessorsOutput_invalidKey();
testAnnotationProcessorsOutput_oneInvalidKey();
testAnnotationProcessorsOutput_warnAndError();
}
private void testAnnotationProcessorsOutput_emptyKey() throws SQLException {
try {
System.setProperty(TestAnnotationProcessor.MESSAGES_KEY, "");
callCompiledFunction("test_atp_empty_key");
} finally {
System.clearProperty(TestAnnotationProcessor.MESSAGES_KEY);
}
}
private void testAnnotationProcessorsOutput_invalidKey() throws SQLException {
try {
System.setProperty(TestAnnotationProcessor.MESSAGES_KEY, "invalid");
callCompiledFunction("test_atp_invalid_key");
fail();
} catch (JdbcSQLException e) {
assertEquals(ErrorCode.SYNTAX_ERROR_1, e.getErrorCode());
assertContains(e.getMessage(), "'invalid'");
} finally {
System.clearProperty(TestAnnotationProcessor.MESSAGES_KEY);
}
}
private void testAnnotationProcessorsOutput_oneInvalidKey() throws SQLException {
try {
System.setProperty(TestAnnotationProcessor.MESSAGES_KEY, "invalid,foo");
callCompiledFunction("test_atp_one_invalid_key");
fail();
} catch (JdbcSQLException e) {
assertEquals(ErrorCode.SYNTAX_ERROR_1, e.getErrorCode());
assertContains(e.getMessage(), "enum");
assertContains(e.getMessage(), "Kind.invalid");
} finally {
System.clearProperty(TestAnnotationProcessor.MESSAGES_KEY);
}
}
private void testAnnotationProcessorsOutput_warnAndError() throws SQLException {
try {
System.setProperty(TestAnnotationProcessor.MESSAGES_KEY, "WARNING,foo1|ERROR,foo2");
callCompiledFunction("test_atp_warn_and_error");
fail();
} catch (JdbcSQLException e) {
assertEquals(ErrorCode.SYNTAX_ERROR_1, e.getErrorCode());
assertContains(e.getMessage(), "foo1");
assertContains(e.getMessage(), "foo2");
} finally {
System.clearProperty(TestAnnotationProcessor.MESSAGES_KEY);
}
}
private void callCompiledFunction(String functionName) throws SQLException {
deleteDb("functions");
Connection conn = getConnection("functions");
Statement stat = conn.createStatement();
ResultSet rs;
stat.execute("create alias " + functionName + " AS "
+ "$$ boolean " + functionName + "() "
+ "{ return true; } $$;");
PreparedStatement stmt = conn.prepareStatement(
"select " + functionName + "() from dual");
rs = stmt.executeQuery();
rs.next();
assertEquals(Boolean.class.getName(), rs.getObject(1).getClass().getName());
stat.execute("drop alias " + functionName + "");
conn.close();
}
private void assertCallResult(String expected, Statement stat, String sql)
throws SQLException {
ResultSet rs = stat.executeQuery("CALL " + sql);
......
......@@ -16,16 +16,24 @@ import java.util.Collections;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.Random;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadFactory;
import org.h2.api.TableEngine;
import org.h2.command.ddl.CreateTableData;
import org.h2.command.dml.OptimizerHints;
import org.h2.engine.Constants;
import org.h2.engine.Session;
import org.h2.expression.Expression;
import org.h2.index.BaseIndex;
import org.h2.index.Cursor;
import org.h2.index.Index;
import org.h2.index.IndexLookupBatch;
import org.h2.index.IndexType;
import org.h2.index.SingleRowCursor;
import org.h2.message.DbException;
......@@ -37,6 +45,7 @@ import org.h2.table.Table;
import org.h2.table.TableBase;
import org.h2.table.TableFilter;
import org.h2.test.TestBase;
import org.h2.util.DoneFuture;
import org.h2.util.New;
import org.h2.value.Value;
import org.h2.value.ValueInt;
......@@ -65,6 +74,7 @@ public class TestTableEngines extends TestBase {
testEngineParams();
testSimpleQuery();
testMultiColumnTreeSetIndex();
testBatchedJoin();
}
private void testEarlyFilter() throws SQLException {
......@@ -332,7 +342,181 @@ public class TestTableEngines extends TestBase {
deleteDb("tableEngine");
}
private void testBatchedJoin() throws SQLException {
deleteDb("tableEngine");
Connection conn = getConnection("tableEngine;OPTIMIZE_REUSE_RESULTS=0");
Statement stat = conn.createStatement();
TreeSetIndex.exec = Executors.newFixedThreadPool(8, new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r);
t.setDaemon(true);
return t;
}
});
enableJoinReordering(false);
try {
doTestBatchedJoin(stat, 1, 0, 0);
doTestBatchedJoin(stat, 0, 1, 0);
doTestBatchedJoin(stat, 0, 0, 1);
doTestBatchedJoin(stat, 0, 2, 0);
doTestBatchedJoin(stat, 0, 0, 2);
doTestBatchedJoin(stat, 0, 0, 3);
doTestBatchedJoin(stat, 0, 0, 4);
doTestBatchedJoin(stat, 0, 0, 5);
doTestBatchedJoin(stat, 0, 3, 1);
doTestBatchedJoin(stat, 0, 3, 3);
doTestBatchedJoin(stat, 0, 3, 7);
doTestBatchedJoin(stat, 0, 4, 1);
doTestBatchedJoin(stat, 0, 4, 6);
doTestBatchedJoin(stat, 0, 4, 20);
doTestBatchedJoin(stat, 0, 10, 0);
doTestBatchedJoin(stat, 0, 0, 10);
doTestBatchedJoin(stat, 0, 20, 0);
doTestBatchedJoin(stat, 0, 0, 20);
doTestBatchedJoin(stat, 0, 20, 20);
doTestBatchedJoin(stat, 3, 7, 0);
doTestBatchedJoin(stat, 0, 0, 5);
doTestBatchedJoin(stat, 0, 8, 1);
doTestBatchedJoin(stat, 0, 2, 1);
} finally {
enableJoinReordering(true);
TreeSetIndex.exec.shutdownNow();
}
deleteDb("tableEngine");
}
/**
* @param enable Enabled.
*/
private void enableJoinReordering(boolean enable) {
OptimizerHints hints = null;
if (!enable) {
hints = new OptimizerHints();
hints.setJoinReorderEnabled(false);
}
OptimizerHints.set(hints);
}
private void doTestBatchedJoin(Statement stat, int... batchSizes) throws SQLException {
ArrayList<TreeSetTable> tables = New.arrayList(batchSizes.length);
for (int i = 0; i < batchSizes.length; i++) {
stat.executeUpdate("DROP TABLE IF EXISTS T" + i);
stat.executeUpdate("CREATE TABLE T" + i + "(A INT, B INT) ENGINE \"" +
TreeSetIndexTableEngine.class.getName() + "\"");
tables.add(TreeSetIndexTableEngine.created);
stat.executeUpdate("CREATE INDEX IDX_B ON T" + i + "(B)");
stat.executeUpdate("CREATE INDEX IDX_A ON T" + i + "(A)");
PreparedStatement insert = stat.getConnection().prepareStatement(
"INSERT INTO T"+ i + " VALUES (?,?)");
for (int j = i, size = i + 10; j < size; j++) {
insert.setInt(1, j);
insert.setInt(2, j);
insert.executeUpdate();
}
for (TreeSetTable table : tables) {
assertEquals(10, table.getRowCount(null));
}
}
int[] zeroBatchSizes = new int[batchSizes.length];
int tests = 1 << (batchSizes.length * 4);
for (int test = 0; test < tests; test++) {
String query = generateQuery(test, batchSizes.length);
// System.out.println(Arrays.toString(batchSizes) + ": " + test + " -> " + query);
setBatchSize(tables, batchSizes);
List<List<Object>> res1 = query(stat, query);
setBatchSize(tables, zeroBatchSizes);
List<List<Object>> res2 = query(stat, query);
// System.out.println(res1 + " " + res2);
if (!res2.equals(res1)) {
System.err.println(Arrays.toString(batchSizes) + ": " + res1 + " " + res2);
System.err.println("Test " + test);
System.err.println(query);
for (TreeSetTable table : tables) {
System.err.println(table.getName() + " = " +
query(stat, "select * from " + table.getName()));
}
fail();
}
}
}
private static void setBatchSize(ArrayList<TreeSetTable> tables, int... batchSizes) {
for (int i = 0; i < batchSizes.length; i++) {
int batchSize = batchSizes[i];
for (Index idx : tables.get(i).getIndexes()) {
((TreeSetIndex) idx).preferedBatchSize = batchSize;
}
}
}
private String generateQuery(int t, int tables) {
final int withLeft = 1;
final int withFalse = 2;
final int withWhere = 4;
final int withOnIsNull = 8;
StringBuilder b = new StringBuilder();
b.append("select count(*) from ");
StringBuilder where = new StringBuilder();
for (int i = 0; i < tables; i++) {
if (i != 0) {
if ((t & withLeft) != 0) {
b.append(" left ");
}
b.append(" join ");
}
b.append("\nT").append(i).append(' ');
if (i != 0) {
boolean even = (i & 1) == 0;
if ((t & withOnIsNull) != 0) {
b.append(" on T").append(i - 1).append(even ? ".B" : ".A").append(" is null");
} else if ((t & withFalse) != 0) {
b.append(" on false ");
} else {
b.append(" on T").append(i - 1).append(even ? ".B = " : ".A = ");
b.append("T").append(i).append(even ? ".B " : ".A ");
}
}
if ((t & withWhere) != 0) {
if (where.length() != 0) {
where.append(" and ");
}
where.append(" T").append(i).append(".A > 5");
}
t >>>= 4;
}
if (where.length() != 0) {
b.append("\nwhere ").append(where);
}
return b.toString();
}
private void checkResultsNoOrder(Statement stat, int size, String query1, String query2)
throws SQLException {
List<List<Object>> res1 = query(stat, query1);
......@@ -476,7 +660,7 @@ public class TestTableEngines extends TestBase {
@Override
public double getCost(Session session, int[] masks,
TableFilter filter, SortOrder sortOrder) {
TableFilter[] filters, int filter, SortOrder sortOrder) {
return 0;
}
......@@ -710,9 +894,12 @@ public class TestTableEngines extends TestBase {
* A table engine that internally uses a tree set.
*/
public static class TreeSetIndexTableEngine implements TableEngine {
static TreeSetTable created;
@Override
public Table createTable(CreateTableData data) {
return new TreeSetTable(data);
return created = new TreeSetTable(data);
}
}
......@@ -727,8 +914,8 @@ public class TestTableEngines extends TestBase {
TreeSetIndex scan = new TreeSetIndex(this, "scan",
IndexColumn.wrap(getColumns()), IndexType.createScan(false)) {
@Override
public double getCost(Session session, int[] masks, TableFilter filter,
SortOrder sortOrder) {
public double getCost(Session session, int[] masks,
TableFilter[] filters, int filter, SortOrder sortOrder) {
return getRowCount(session) + Constants.COST_ROW_OFFSET;
}
};
......@@ -881,8 +1068,15 @@ public class TestTableEngines extends TestBase {
* An index that internally uses a tree set.
*/
private static class TreeSetIndex extends BaseIndex implements Comparator<SearchRow> {
final TreeSet<SearchRow> set = new TreeSet<SearchRow>(this);
/**
* Executor service to test batched joins.
*/
private static ExecutorService exec;
private final TreeSet<SearchRow> set = new TreeSet<SearchRow>(this);
private int preferedBatchSize;
TreeSetIndex(Table t, String name, IndexColumn[] cols, IndexType type) {
initBaseIndex(t, 0, name, cols, type);
}
......@@ -890,12 +1084,74 @@ public class TestTableEngines extends TestBase {
@Override
public int compare(SearchRow o1, SearchRow o2) {
int res = compareRows(o1, o2);
if (res == 0 && (o1.getKey() == Long.MAX_VALUE || o2.getKey() == Long.MAX_VALUE)) {
res = -1;
if (res == 0) {
if (o1.getKey() == Long.MAX_VALUE || o2.getKey() == Long.MIN_VALUE) {
res = 1;
} else if (o1.getKey() == Long.MIN_VALUE || o2.getKey() == Long.MAX_VALUE) {
res = -1;
}
}
return res;
}
@Override
public IndexLookupBatch createLookupBatch(final TableFilter filter) {
assert filter.getMasks() != null || "scan".equals(getName());
final int preferedSize = preferedBatchSize;
return preferedSize == 0 ? null : new IndexLookupBatch() {
List<SearchRow> searchRows = New.arrayList();
@Override public boolean isBatchFull() {
return searchRows.size() >= preferedSize * 2;
}
@Override
public List<Future<Cursor>> find() {
List<Future<Cursor>> res = findBatched(filter, searchRows);
searchRows.clear();
return res;
}
@Override
public void addSearchRows(SearchRow first, SearchRow last) {
assert !isBatchFull();
searchRows.add(first);
searchRows.add(last);
}
};
}
public List<Future<Cursor>> findBatched(final TableFilter filter, List<SearchRow> firstLastPairs) {
ArrayList<Future<Cursor>> result = New.arrayList(firstLastPairs.size());
final Random rnd = new Random();
for (int i = 0; i < firstLastPairs.size(); i += 2) {
final SearchRow first = firstLastPairs.get(i);
final SearchRow last = firstLastPairs.get(i + 1);
Future<Cursor> future;
if (rnd.nextBoolean()) {
IteratorCursor c = (IteratorCursor) find(filter, first, last);
if (c.it.hasNext()) {
future = new DoneFuture<Cursor>(c);
} else {
// we can return null instead of future of empty cursor
future = null;
}
} else {
future = exec.submit(new Callable<Cursor>() {
@Override
public Cursor call() throws Exception {
if (rnd.nextInt(50) == 0) {
Thread.sleep(0, 500);
}
return find(filter, first, last);
}
});
}
result.add(future);
}
return result;
}
@Override
public void close(Session session) {
// No-op.
......@@ -911,10 +1167,10 @@ public class TestTableEngines extends TestBase {
set.remove(row);
}
private static SearchRow mark(SearchRow row) {
private static SearchRow mark(SearchRow row, boolean first) {
if (row != null) {
// Mark this row to be a search row.
row.setKey(Long.MAX_VALUE);
row.setKey(first ? Long.MIN_VALUE : Long.MAX_VALUE);
}
return row;
}
......@@ -926,28 +1182,32 @@ public class TestTableEngines extends TestBase {
subSet = Collections.emptySet();
} else {
if (first != null) {
first = set.floor(mark(first));
first = set.floor(mark(first, true));
}
if (last != null) {
last = set.ceiling(mark(last));
last = set.ceiling(mark(last, false));
}
if (first == null && last == null) {
subSet = set;
} else if (first != null) {
subSet = set.tailSet(first, true);
if (last != null) {
subSet = set.subSet(first, true, last, true);
} else {
subSet = set.tailSet(first, true);
}
} else if (last != null) {
subSet = set.headSet(last, true);
} else {
subSet = set.subSet(first, true, last, true);
throw new IllegalStateException();
}
}
return new IteratorCursor(subSet.iterator());
}
@Override
public double getCost(Session session, int[] masks, TableFilter filter,
SortOrder sortOrder) {
return getCostRangeIndex(masks, set.size(), filter, sortOrder);
public double getCost(Session session, int[] masks,
TableFilter[] filters, int filter, SortOrder sortOrder) {
return getCostRangeIndex(masks, set.size(), filters, filter, sortOrder);
}
@Override
......@@ -1031,6 +1291,11 @@ public class TestTableEngines extends TestBase {
public Row get() {
return current;
}
@Override
public String toString() {
return "IterCursor->" + current;
}
}
/**
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论