提交 a3c06157 authored 作者: S.Vladykin's avatar S.Vladykin

refactoring after review

上级 cd874de3
......@@ -1081,10 +1081,6 @@ public class TableFilter implements ColumnResolver {
return session;
}
private static boolean isRow(Object x) {
return x instanceof Row;
}
/**
* A visitor for table filters.
*/
......@@ -1145,7 +1141,11 @@ public class TableFilter implements ColumnResolver {
private Value getValue(JoinFilter filter, Column column) {
Object x = current.row[filter.id];
assert x != null;
Row row = isRow(x) ? (Row) x : ((Cursor) x).get();
Row row = current.isRow(filter.id) ? (Row) x : ((Cursor) x).get();
int columnId = column.getColumnId();
if (columnId == -1) {
return ValueLong.get(row.getKey());
}
Value value = row.getValue(column.getColumnId());
if (value == null) {
throw DbException.throwInternalError("value is null: " + column + " " + row);
......@@ -1154,6 +1154,11 @@ public class TableFilter implements ColumnResolver {
}
private void start() {
if (filtersCount > 32) {
// This is because we store state in a 64 bit field, 2 bits per joined table.
throw DbException.getUnsupportedException("To many tables in join (at most 32 supported).");
}
// fill filters
filters = new JoinFilter[filtersCount];
JoinFilter jf = top;
......@@ -1164,9 +1169,12 @@ public class TableFilter implements ColumnResolver {
// initialize current row
current = new JoinRow(new Object[filtersCount]);
current.row[top.id] = top.filter.cursor;
current.incrementState(top.id, 2);
assert current.isCursor(top.id);
// initialize top cursor
top.filter.cursor.find(top.filter.session, top.filter.indexConditions);
// we need fake first row because batchedNext always will move to the next row
JoinRow fake = new JoinRow(null);
fake.next = current;
......@@ -1202,7 +1210,7 @@ public class TableFilter implements ColumnResolver {
}
}
private Cursor get(Future<Cursor> f) {
private static Cursor get(Future<Cursor> f) {
try {
return f.get();
} catch (Exception e) {
......@@ -1272,12 +1280,12 @@ public class TableFilter implements ColumnResolver {
} else {
// here we don't care if the current was dropped
current = current.next;
assert !isRow(current.row[jfId]);
assert !current.isRow(jfId);
while (current.row[jfId] == null) {
assert jfId != top.id;
// need to go left and fetch more search rows
jfId--;
assert !isRow(current.row[jfId]);
assert !current.isRow(jfId);
}
}
}
......@@ -1285,32 +1293,37 @@ public class TableFilter implements ColumnResolver {
@SuppressWarnings("unchecked")
private void fetchCurrent(final int jfId) {
assert current.prev == null || isRow(current.prev.row[jfId]) : "prev must be already fetched";
assert jfId == 0 || isRow(current.row[jfId - 1]) : "left must be already fetched";
assert current.prev == null || current.prev.isRow(jfId) : "prev must be already fetched";
assert jfId == 0 || current.isRow(jfId - 1) : "left must be already fetched";
assert !current.isRow(jfId) : "double fetching";
Object x = current.row[jfId];
assert x != null : "x null";
assert !isRow(x) : "double fetching";
final JoinFilter jf = filters[jfId];
// in case of outer join we don't have any future around empty cursor
boolean newCursor = x == EMPTY_CURSOR;
if (!newCursor && x instanceof Future) {
if (!newCursor && current.isFuture(jfId)) {
// get cursor from a future
current.row[jfId] = x = get((Future<Cursor>) x);
current.incrementState(jfId, 1);
assert current.isCursor(jfId);
newCursor = true;
}
Cursor c = (Cursor) x;
assert c != null;
JoinFilter join = jf.join;
for (;;) {
if (c == null || !c.next()) {
if (newCursor && jf.isOuterJoin()) {
// replace cursor with null-row
current.row[jfId] = jf.getNullRow();
current.incrementState(jfId, 1);
assert current.isRow(jfId);
c = null;
newCursor = false;
} else {
......@@ -1336,14 +1349,19 @@ public class TableFilter implements ColumnResolver {
current = current.copyBehind(jf.id);
// get current row from cursor
current.row[jfId] = c.get();
current.incrementState(jfId, 1);
assert current.isRow(jfId);
}
if (joinEmpty) {
assert current.row[join.id] == null;
current.row[join.id] = EMPTY_CURSOR;
current.incrementState(join.id, 2);
assert current.isCursor(join.id);
}
return;
}
}
@Override
public String toString() {
return "JoinBatch->\nprev->" + (current == null ? null : current.prev) +
......@@ -1438,7 +1456,7 @@ public class TableFilter implements ColumnResolver {
// go backwards and assign futures
ListIterator<Future<Cursor>> iter = result.listIterator(result.size());
for (;;) {
assert isRow(current.row[id - 1]);
assert current.isRow(id - 1);
if (current.row[id] == EMPTY_CURSOR) {
// outer join support - skip row with existing empty cursor
current = current.prev;
......@@ -1446,7 +1464,15 @@ public class TableFilter implements ColumnResolver {
}
assert current.row[id] == null;
Future<Cursor> future = iter.previous();
current.row[id] = future == null ? EMPTY_CURSOR : future;
if (future == null) {
current.row[id] = EMPTY_CURSOR;
current.incrementState(id, 2);
assert current.isCursor(id);
} else {
current.row[id] = future;
current.incrementState(id, 1);
assert current.isFuture(id);
}
if (current.prev == null || !iter.hasPrevious()) {
break;
}
......@@ -1458,9 +1484,9 @@ public class TableFilter implements ColumnResolver {
while (current.prev != null && current.prev.row[id] == EMPTY_CURSOR) {
current = current.prev;
}
assert current.prev == null || isRow(current.prev.row[id]);
assert current.prev == null || current.prev.isRow(id);
assert current.row[id] != null;
assert !isRow(current.row[id]);
assert !current.isRow(id);
// the last updated row
return current;
......@@ -1476,6 +1502,12 @@ public class TableFilter implements ColumnResolver {
* Linked row in batched join.
*/
private static final class JoinRow {
private static final long S_FUTURE = 1;
private static final long S_CURSOR = 2;
private static final long S_ROW = 3;
private static final long S_MASK = 3;
/**
* May contain one of the following:
* <br/>- {@code null}: means that we need to get future cursor for this row
......@@ -1484,19 +1516,54 @@ public class TableFilter implements ColumnResolver {
* <br/>- {@link Row}: the {@link Row} is already fetched and is ready to be used
*/
Object[] row;
long state;
JoinRow prev;
JoinRow next;
/**
* @param row Row.
*/
private JoinRow(Object[] row) {
this.row = row;
}
/**
* @param joinFilterId Join filter id.
* @return Row state.
*/
private long getState(int joinFilterId) {
return (state >>> (joinFilterId << 1)) & S_MASK;
}
/**
* Allows to do a state transition in the following order:
* 0. Slot contains {@code null} (no constant because simple null check is sufficient).
* 1. Slot contains {@link Future} ({@link #S_FUTURE}).
* 2. Slot contains {@link Cursor} ({@link #S_CURSOR}).
* 3. Slot contains {@link Row} ({@link #S_ROW}).
*
* @param joinFilterId {@link JoinRow} filter id.
* @param i Increment by this number of moves.
*/
private void incrementState(int joinFilterId, long i) {
state += i << (joinFilterId << 1);
}
private boolean isRow(int joinFilterId) {
return getState(joinFilterId) == S_ROW;
}
private boolean isFuture(int joinFilterId) {
return getState(joinFilterId) == S_FUTURE;
}
private boolean isCursor(int joinFilterId) {
return getState(joinFilterId) == S_CURSOR;
}
private boolean isComplete() {
return isRow(row[row.length - 1]);
return isRow(row.length - 1);
}
private boolean isDropped() {
......@@ -1513,8 +1580,14 @@ public class TableFilter implements ColumnResolver {
row = null;
}
/**
* Copy this JoinRow behind itself in linked list of all in progress rows.
*
* @param jfId The last fetched filter id.
* @return The copy.
*/
private JoinRow copyBehind(int jfId) {
assert row[jfId] instanceof Cursor;
assert isCursor(jfId);
assert jfId + 1 == row.length || row[jfId + 1] == null;
Object[] r = new Object[row.length];
......@@ -1522,6 +1595,7 @@ public class TableFilter implements ColumnResolver {
System.arraycopy(row, 0, r, 0, jfId);
}
JoinRow copy = new JoinRow(r);
copy.state = state;
if (prev != null) {
copy.prev = prev;
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论