Unverified 提交 81bd7b8f authored 作者: Evgenij Ryazanov's avatar Evgenij Ryazanov 提交者: GitHub

Merge pull request #1656 from katzyn/window

Optimize window aggregates with ORDER BY + UNBOUNDED PRECEDING + no exclusions
......@@ -21,6 +21,8 @@ Change Log
<h2>Next Version (unreleased)</h2>
<ul>
<li>PR #1656: Optimize window aggregates with ORDER BY + UNBOUNDED PRECEDING + no exclusions
</li>
<li>Issue #1654: OOM in TestMemoryUsage, in big mode
</li>
<li>Issue #1651: TIMESTAMP values near DST may be changed in MVStore database due to UTC-based PageStore format in some
......
......@@ -44,7 +44,9 @@ public interface Aggregate {
void add(Object value) throws SQLException;
/**
* This method returns the computed aggregate value.
* This method returns the computed aggregate value. This method must
* preserve previously added values and must be able to reevaluate result if
* more values were added since its previous invocation.
*
* @return the aggregated value
*/
......
......@@ -47,7 +47,9 @@ public interface AggregateFunction {
void add(Object value) throws SQLException;
/**
* This method returns the computed aggregate value.
* This method returns the computed aggregate value. This method must
* preserve previously added values and must be able to reevaluate result if
* more values were added since its previous invocation.
*
* @return the aggregated value
*/
......
......@@ -16,6 +16,10 @@ import org.h2.engine.Session;
import org.h2.expression.Expression;
import org.h2.expression.analysis.DataAnalysisOperation;
import org.h2.expression.analysis.WindowFrame;
import org.h2.expression.analysis.WindowFrameBound;
import org.h2.expression.analysis.WindowFrameBoundType;
import org.h2.expression.analysis.WindowFrameExclusion;
import org.h2.expression.analysis.WindowFrameUnits;
import org.h2.table.ColumnResolver;
import org.h2.table.TableFilter;
import org.h2.value.Value;
......@@ -83,27 +87,69 @@ public abstract class AbstractAggregate extends DataAnalysisOperation {
protected void getOrderedResultLoop(Session session, HashMap<Integer, Value> result, ArrayList<Value[]> ordered,
int rowIdColumn) {
WindowFrame frame = over.getWindowFrame();
/*
* With RANGE (default) or GROUPS units and EXCLUDE GROUP or EXCLUDE NO
* OTHERS (default) exclusion all rows in the group have the same value
* of window aggregate function.
*/
boolean grouped = frame == null
|| frame.getUnits() != WindowFrameUnits.ROWS && frame.getExclusion().isGroupOrNoOthers();
if (frame == null) {
if (over.getOrderBy() == null) {
assert over.getOrderBy() != null;
aggregateFastPartition(session, result, ordered, rowIdColumn, grouped);
return;
}
if (frame.getStarting().getType() == WindowFrameBoundType.UNBOUNDED_PRECEDING
&& frame.getExclusion() == WindowFrameExclusion.EXCLUDE_NO_OTHERS) {
WindowFrameBound following = frame.getFollowing();
if (following != null && following.getType() == WindowFrameBoundType.UNBOUNDED_FOLLOWING) {
aggregateWholePartition(session, result, ordered, rowIdColumn);
return;
} else {
aggregateFastPartition(session, result, ordered, rowIdColumn, grouped);
}
} else if (frame.isFullPartition()) {
aggregateWholePartition(session, result, ordered, rowIdColumn);
return;
}
// All other types of frames (slow)
int size = ordered.size();
for (int i = 0; i < size; i++) {
for (int i = 0; i < size;) {
Object aggregateData = createAggregateData();
for (Iterator<Value[]> iter = WindowFrame.iterator(over, session, ordered, getOverOrderBySort(), i,
false); iter.hasNext();) {
updateFromExpressions(session, aggregateData, iter.next());
}
result.put(ordered.get(i)[rowIdColumn].getInt(), getAggregatedValue(session, aggregateData));
i = processGroup(session, result, ordered, rowIdColumn, i, size, aggregateData, grouped);
}
}
private void aggregateFastPartition(Session session, HashMap<Integer, Value> result, ArrayList<Value[]> ordered,
int rowIdColumn, boolean grouped) {
Object aggregateData = createAggregateData();
int size = ordered.size();
int lastIncludedRow = -1;
for (int i = 0; i < size;) {
int newLast = WindowFrame.getEndIndex(over, session, ordered, getOverOrderBySort(), i);
assert newLast >= lastIncludedRow;
if (newLast > lastIncludedRow) {
for (int j = lastIncludedRow + 1; j <= newLast; j++) {
updateFromExpressions(session, aggregateData, ordered.get(j));
}
lastIncludedRow = newLast;
}
i = processGroup(session, result, ordered, rowIdColumn, i, size, aggregateData, grouped);
}
}
private int processGroup(Session session, HashMap<Integer, Value> result, ArrayList<Value[]> ordered,
int rowIdColumn, int i, int size, Object aggregateData, boolean grouped) {
Value[] firstRowInGroup = ordered.get(i), currentRowInGroup = firstRowInGroup;
Value r = getAggregatedValue(session, aggregateData);
do {
result.put(currentRowInGroup[rowIdColumn].getInt(), r);
} while (++i < size && grouped
&& overOrderBySort.compare(firstRowInGroup, currentRowInGroup = ordered.get(i)) == 0);
return i;
}
private void aggregateWholePartition(Session session, HashMap<Integer, Value> result, ArrayList<Value[]> ordered,
int rowIdColumn) {
// Aggregate values from the whole partition
......
......@@ -219,6 +219,35 @@ public final class WindowFrame {
reverse);
}
/**
* Returns end index for the specified frame, or default end index if frame
* is null.
*
* @param over
* window
* @param session
* the session
* @param orderedRows
* ordered rows
* @param sortOrder
* sort order
* @param currentRow
* index of the current row
* @return end index
* @throws UnsupportedOperationException
* if over is not null and its exclusion clause is not EXCLUDE
* NO OTHERS
*/
public static int getEndIndex(Window over, Session session, ArrayList<Value[]> orderedRows, SortOrder sortOrder,
int currentRow) {
WindowFrame frame = over.getWindowFrame();
if (frame != null) {
return frame.getEndIndex(session, orderedRows, sortOrder, currentRow);
}
int endIndex = orderedRows.size() - 1;
return over.getOrderBy() == null ? endIndex : toGroupEnd(orderedRows, sortOrder, currentRow, endIndex);
}
private static Iterator<Value[]> plainIterator(ArrayList<Value[]> orderedRows, int startIndex, int endIndex,
boolean reverse) {
if (endIndex < startIndex) {
......@@ -308,6 +337,42 @@ public final class WindowFrame {
this.exclusion = exclusion;
}
/**
* Returns the units.
*
* @return the units
*/
public WindowFrameUnits getUnits() {
return units;
}
/**
* Returns the starting clause.
*
* @return the starting clause
*/
public WindowFrameBound getStarting() {
return starting;
}
/**
* Returns the following clause.
*
* @return the following clause, or null
*/
public WindowFrameBound getFollowing() {
return following;
}
/**
* Returns the exclusion clause.
*
* @return the exclusion clause
*/
public WindowFrameExclusion getExclusion() {
return exclusion;
}
/**
* Checks validity of this frame.
*
......@@ -320,18 +385,6 @@ public final class WindowFrame {
&& s.compareTo(f) <= 0;
}
/**
* Returns whether window frame specification contains all rows in
* partition.
*
* @return whether window frame specification contains all rows in partition
*/
public boolean isFullPartition() {
return starting.getType() == WindowFrameBoundType.UNBOUNDED_PRECEDING && following != null
&& following.getType() == WindowFrameBoundType.UNBOUNDED_FOLLOWING
&& exclusion == WindowFrameExclusion.EXCLUDE_NO_OTHERS;
}
/**
* Returns iterator.
*
......@@ -345,7 +398,6 @@ public final class WindowFrame {
* index of the current row
* @param reverse
* whether iterator should iterate in reverse order
*
* @return iterator
*/
public Iterator<Value[]> iterator(Session session, ArrayList<Value[]> orderedRows, SortOrder sortOrder,
......@@ -372,6 +424,35 @@ public final class WindowFrame {
: plainIterator(orderedRows, startIndex, endIndex, reverse);
}
/**
* Returns end index of this frame,
*
* @param session
* the session
* @param orderedRows
* ordered rows
* @param sortOrder
* sort order
* @param currentRow
* index of the current row
* @return end index
* @throws UnsupportedOperationException
* if exclusion clause is not EXCLUDE NO OTHERS
*/
private int getEndIndex(Session session, ArrayList<Value[]> orderedRows, SortOrder sortOrder, int currentRow) {
if (exclusion != WindowFrameExclusion.EXCLUDE_NO_OTHERS) {
throw new UnsupportedOperationException();
}
int endIndex = following != null ? getIndex(session, orderedRows, sortOrder, currentRow, following, true)
: units == WindowFrameUnits.ROWS ? currentRow
: toGroupEnd(orderedRows, sortOrder, currentRow, orderedRows.size() - 1);
int size = orderedRows.size();
if (endIndex >= size) {
endIndex = size - 1;
}
return endIndex;
}
private int getIndex(Session session, ArrayList<Value[]> orderedRows, SortOrder sortOrder, int currentRow,
WindowFrameBound bound, boolean forFollowing) {
int size = orderedRows.size();
......
......@@ -38,6 +38,17 @@ public enum WindowFrameExclusion {
this.sql = sql;
}
/**
* Returns true if this exclusion clause excludes or includes the whole
* group.
*
* @return true if this exclusion clause is {@link #EXCLUDE_GROUP} or
* {@link #EXCLUDE_NO_OTHERS}
*/
public boolean isGroupOrNoOthers() {
return this == WindowFrameExclusion.EXCLUDE_GROUP || this == EXCLUDE_NO_OTHERS;
}
/**
* Returns SQL representation.
*
......
......@@ -805,4 +805,4 @@ queryparser tokenized freeze factorings recompilation unenclosed rfe dsync
econd irst bcef ordinality nord unnest
analyst occupation distributive josaph aor engineer sajeewa isuru randil kevin doctor businessman artist ashan
corrupts splitted disruption unintentional octets preconditions predicates subq objectweb insn opcodes
preserves masking holder unboxing avert iae transformed subtle
preserves masking holder unboxing avert iae transformed subtle reevaluate exclusions
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论