提交 69f3867b authored 作者: S.Vladykin's avatar S.Vladykin

ready

上级 cb74fee7
......@@ -64,6 +64,8 @@ public final class JoinBatch {
}
};
private static final Future<Cursor> EMPTY_FUTURE_CURSOR = new DoneFuture<Cursor>(EMPTY_CURSOR);
private boolean batchedSubQuery;
private Future<Cursor> viewTopFutureCursor;
......@@ -164,7 +166,6 @@ public final class JoinBatch {
// initialize top cursor
Cursor cursor;
if (batchedSubQuery) {
// we are at the batched sub-query
assert viewTopFutureCursor != null;
cursor = get(viewTopFutureCursor);
} else {
......@@ -313,7 +314,7 @@ public final class JoinBatch {
if (newCursor) {
if (jfId == 0) {
// the top cursor is new and empty, drop it
// the top cursor is new and empty, then the whole select will not produce any rows
current.drop();
return;
}
......@@ -385,7 +386,8 @@ public final class JoinBatch {
Query query = viewIndex.getQuery();
query.prepareJoinBatch();
if (query.isUnion()) {
return new ViewIndexLookupBatchUnion(viewIndex);
ViewIndexLookupBatchUnion unionBatch = new ViewIndexLookupBatchUnion(viewIndex);
return unionBatch.hasBatchedQueries() ? unionBatch : null;
}
JoinBatch jb = ((Select) query).getJoinBatch();
if (jb == null) {
......@@ -717,7 +719,8 @@ public final class JoinBatch {
/**
* Base class for SELECT and SELECT UNION view index lookup batches.
*/
private abstract static class ViewIndexLookupBatchBase implements IndexLookupBatch {
private abstract static class ViewIndexLookupBatchBase<R extends QueryRunnerBase>
implements IndexLookupBatch {
protected final ViewIndex viewIndex;
protected final ArrayList<Future<Cursor>> result = New.arrayList();
protected int resultSize;
......@@ -726,9 +729,9 @@ public final class JoinBatch {
this.viewIndex = viewIndex;
}
protected abstract boolean collectSearchRows();
protected abstract boolean collectSearchRows(R r);
protected abstract QueryRunnerBase newQueryRunner();
protected abstract R newQueryRunner();
protected abstract void startQueryRunners();
......@@ -736,7 +739,7 @@ public final class JoinBatch {
if (resultSize < 0) {
// method find was called, we need to reset futures to initial state for reuse
for (int i = 0, size = -resultSize; i < size; i++) {
((QueryRunnerBase) result.get(i)).reset();
queryRunner(i).reset();
}
resultSize = 0;
return true;
......@@ -744,23 +747,29 @@ public final class JoinBatch {
return false;
}
@SuppressWarnings("unchecked")
protected R queryRunner(int i) {
return (R) result.get(i);
}
@Override
public final boolean addSearchRows(SearchRow first, SearchRow last) {
resetAfterFind();
viewIndex.setupQueryParameters(viewIndex.getSession(), first, last, null);
if (!collectSearchRows()) {
return false;
}
QueryRunnerBase r;
R r;
if (resultSize < result.size()) {
// get reused runner
r = (QueryRunnerBase) result.get(resultSize);
r = queryRunner(resultSize);
} else {
// create new runner
result.add(r = newQueryRunner());
}
r.first = first;
r.last = last;
if (!collectSearchRows(r)) {
r.clear();
return false;
}
resultSize++;
return true;
}
......@@ -826,23 +835,24 @@ public final class JoinBatch {
/**
* View index lookup batch for a simple SELECT.
*/
private final class ViewIndexLookupBatch extends ViewIndexLookupBatchBase {
private final class ViewIndexLookupBatch extends ViewIndexLookupBatchBase<QueryRunner> {
private ViewIndexLookupBatch(ViewIndex viewIndex) {
super(viewIndex);
}
@Override
protected QueryRunnerBase newQueryRunner() {
protected QueryRunner newQueryRunner() {
return new QueryRunner(viewIndex);
}
@Override
protected boolean collectSearchRows() {
protected boolean collectSearchRows(QueryRunner r) {
return top.collectSearchRows();
}
@Override
public boolean isBatchFull() {
assert resultSize >= 0;
return top.isBatchFull();
}
......@@ -856,78 +866,91 @@ public final class JoinBatch {
", expected :" + resultSize);
}
for (int i = 0; i < resultSize; i++) {
QueryRunner r = (QueryRunner) result.get(i);
QueryRunner r = queryRunner(i);
r.topFutureCursor = topFutureCursors.get(i);
}
}
}
/**
* Query runner.
*/
private final class QueryRunner extends QueryRunnerBase {
private Future<Cursor> topFutureCursor;
/**
* Query runner.
*/
private final class QueryRunner extends QueryRunnerBase {
private Future<Cursor> topFutureCursor;
public QueryRunner(ViewIndex viewIndex) {
super(viewIndex);
}
public QueryRunner(ViewIndex viewIndex) {
super(viewIndex);
}
protected void clear() {
super.clear();
topFutureCursor = null;
}
protected void clear() {
topFutureCursor = null;
super.clear();
@Override
protected Cursor run() throws Exception {
if (topFutureCursor == null) {
// if the top cursor is empty then the whole query will produce empty result
return EMPTY_CURSOR;
}
@Override
protected Cursor run() throws Exception {
if (topFutureCursor == null) {
// if the top cursor is empty then the whole query will produce empty result
return EMPTY_CURSOR;
}
JoinBatch.this.viewTopFutureCursor = topFutureCursor;
LocalResult localResult;
try {
localResult = viewIndex.getQuery().query(0);
} finally {
JoinBatch.this.viewTopFutureCursor = null;
}
return newCursor(localResult);
viewIndex.setupQueryParameters(viewIndex.getSession(), first, last, null);
JoinBatch.this.viewTopFutureCursor = topFutureCursor;
LocalResult localResult;
try {
localResult = viewIndex.getQuery().query(0);
} finally {
JoinBatch.this.viewTopFutureCursor = null;
}
return newCursor(localResult);
}
}
/**
* View index lookup batch for UNION queries.
*/
private static final class ViewIndexLookupBatchUnion extends ViewIndexLookupBatchBase {
private ArrayList<JoinFilter> tops = New.arrayList();
private ArrayList<JoinBatch> joinBatches = New.arrayList();
private static final class ViewIndexLookupBatchUnion
extends ViewIndexLookupBatchBase<QueryRunnerUnion> {
private ArrayList<JoinFilter> filters;
private ArrayList<JoinBatch> joinBatches;
private boolean onlyBatchedQueries = true;
protected ViewIndexLookupBatchUnion(ViewIndex viewIndex) {
super(viewIndex);
collectTopTableFilters(viewIndex.getQuery());
collectJoinBatches(viewIndex.getQuery());
}
private void collectTopTableFilters(Query query) {
private boolean hasBatchedQueries() {
return joinBatches != null;
}
private void collectJoinBatches(Query query) {
if (query.isUnion()) {
SelectUnion union = (SelectUnion) query;
collectTopTableFilters(union.getLeft());
collectTopTableFilters(union.getRight());
collectJoinBatches(union.getLeft());
collectJoinBatches(union.getRight());
} else {
Select select = (Select) query;
JoinBatch jb = select.getJoinBatch();
if (jb == null) {
// TODO need some wrapper for a non-batched query
onlyBatchedQueries = false;
} else {
assert !jb.batchedSubQuery;
jb.batchedSubQuery = true;
if (joinBatches == null) {
joinBatches = New.arrayList();
filters = New.arrayList();
}
filters.add(jb.filters[0]);
joinBatches.add(jb);
}
assert !jb.batchedSubQuery;
jb.batchedSubQuery = true;
tops.add(jb.filters[0]);
}
}
@Override
public boolean isBatchFull() {
// if at least one is full
for (int i = 0; i < tops.size(); i++) {
if (tops.get(i).isBatchFull()) {
for (int i = 0; i < filters.size(); i++) {
if (filters.get(i).isBatchFull()) {
return true;
}
}
......@@ -935,44 +958,76 @@ public final class JoinBatch {
}
@Override
protected boolean collectSearchRows() {
for (int i = 0; i < tops.size(); i++) {
if (tops.get(i).collectSearchRows()) {
// TODO
protected boolean collectSearchRows(QueryRunnerUnion r) {
boolean collected = false;
for (int i = 0; i < filters.size(); i++) {
if (filters.get(i).collectSearchRows()) {
collected = true;
} else {
r.topFutureCursors[i] = EMPTY_FUTURE_CURSOR;
}
}
return true;
return collected || !onlyBatchedQueries;
}
@Override
protected QueryRunnerBase newQueryRunner() {
return new QueryRunnerUnion();
protected QueryRunnerUnion newQueryRunner() {
return new QueryRunnerUnion(this);
}
@Override
protected void startQueryRunners() {
// TODO Auto-generated method stub
for (int i = 0; i < filters.size(); i++) {
List<Future<Cursor>> topFutureCursors = filters.get(i).find();
for (int j = 0, k = 0; j < resultSize; j++) {
Future<Cursor>[] cs = queryRunner(j).topFutureCursors;
if (cs[j] == null) {
cs[j] = topFutureCursors.get(k++);
}
}
}
}
}
/**
* Query runner for UNION.
*/
private class QueryRunnerUnion extends QueryRunnerBase {
public QueryRunnerUnion() {
super(ViewIndexLookupBatchUnion.this.viewIndex);
/**
* Query runner for UNION.
*/
private static class QueryRunnerUnion extends QueryRunnerBase {
private ViewIndexLookupBatchUnion batchUnion;
private Future<Cursor>[] topFutureCursors;
@SuppressWarnings("unchecked")
public QueryRunnerUnion(ViewIndexLookupBatchUnion batchUnion) {
super(batchUnion.viewIndex);
this.batchUnion = batchUnion;
topFutureCursors = new Future[batchUnion.filters.size()];
}
@Override
protected void clear() {
super.clear();
for (int i = 0; i < topFutureCursors.length; i++) {
topFutureCursors[i] = null;
}
}
@Override
protected void clear() {
super.clear();
// TODO
@Override
protected Cursor run() throws Exception {
viewIndex.setupQueryParameters(viewIndex.getSession(), first, last, null);
ArrayList<JoinBatch> joinBatches = batchUnion.joinBatches;
for (int i = 0, size = joinBatches.size(); i < size; i++) {
assert topFutureCursors[i] != null;
joinBatches.get(i).viewTopFutureCursor = topFutureCursors[i];
}
@Override
protected Cursor run() throws Exception {
// TODO Auto-generated method stub
return null;
LocalResult localResult;
try {
localResult = viewIndex.getQuery().query(0);
} finally {
for (int i = 0, size = joinBatches.size(); i < size; i++) {
joinBatches.get(i).viewTopFutureCursor = null;
}
}
return newCursor(localResult);
}
}
}
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论