提交 05ea7099 authored 作者: Evgenij Ryazanov's avatar Evgenij Ryazanov

Use optimized implementations with variable frames when they are not modified within partition

上级 aa3c98ba
......@@ -2709,7 +2709,7 @@ ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW EXCLUDE TIES
UNBOUNDED PRECEDING|value PRECEDING|CURRENT ROW
","
A window frame preceding clause.
If value is specified it should be non-negative value or parameter.
If value is specified it should not be negative.
","
UNBOUNDED PRECEDING
1 PRECEDING
......@@ -2721,7 +2721,7 @@ UNBOUNDED PRECEDING|value PRECEDING|CURRENT ROW
|value FOLLOWING|UNBOUNDED FOLLOWING
","
A window frame bound clause.
If value is specified it should be non-negative value or parameter.
If value is specified it should not be negative.
","
UNBOUNDED PRECEDING
UNBOUNDED FOLLOWING
......
......@@ -95,10 +95,15 @@ public abstract class AbstractAggregate extends DataAnalysisOperation {
boolean grouped = frame == null
|| frame.getUnits() != WindowFrameUnits.ROWS && frame.getExclusion().isGroupOrNoOthers();
if (frame == null) {
aggregateFastPartition(session, result, ordered, rowIdColumn, grouped);
aggregateFastPartition(session, result, ordered, -1, rowIdColumn, grouped);
return;
}
if (frame.isVariableBounds()) {
int frameParametersOffset = getWindowFrameParametersOffset();
boolean variableBounds = frame.isVariableBounds();
if (variableBounds) {
variableBounds = checkVariableBounds(frame, ordered, frameParametersOffset);
}
if (variableBounds) {
grouped = false;
} else if (frame.getExclusion() == WindowFrameExclusion.EXCLUDE_NO_OTHERS) {
WindowFrameBound following = frame.getFollowing();
......@@ -108,18 +113,17 @@ public abstract class AbstractAggregate extends DataAnalysisOperation {
if (unboundedFollowing) {
aggregateWholePartition(session, result, ordered, rowIdColumn);
} else {
aggregateFastPartition(session, result, ordered, rowIdColumn, grouped);
aggregateFastPartition(session, result, ordered, frameParametersOffset, rowIdColumn, grouped);
}
return;
}
if (unboundedFollowing) {
aggregateFastPartitionInReverse(session, result, ordered, rowIdColumn, grouped);
aggregateFastPartitionInReverse(session, result, ordered, frameParametersOffset, rowIdColumn, grouped);
return;
}
}
// All other types of frames (slow)
int size = ordered.size();
int frameParametersOffset = getWindowFrameParametersOffset();
for (int i = 0; i < size;) {
Object aggregateData = createAggregateData();
for (Iterator<Value[]> iter = WindowFrame.iterator(over, session, ordered, getOverOrderBySort(),
......@@ -131,14 +135,39 @@ public abstract class AbstractAggregate extends DataAnalysisOperation {
}
}
private static boolean checkVariableBounds(WindowFrame frame, ArrayList<Value[]> ordered,
int frameParametersOffset) {
int size = ordered.size();
int offset = frameParametersOffset;
if (frame.getStarting().isVariable()) {
Value v = ordered.get(0)[offset];
for (int i = 1; i < size; i++) {
if (!v.equals(ordered.get(i)[offset])) {
return true;
}
}
offset++;
}
if (frame.getFollowing() != null && frame.getFollowing().isVariable()) {
Value v = ordered.get(0)[offset];
for (int i = 1; i < size; i++) {
if (!v.equals(ordered.get(i)[offset])) {
return true;
}
}
}
return false;
}
private void aggregateFastPartition(Session session, HashMap<Integer, Value> result, ArrayList<Value[]> ordered,
int rowIdColumn, boolean grouped) {
int frameParametersOffset, int rowIdColumn, boolean grouped) {
Object aggregateData = createAggregateData();
int size = ordered.size();
int lastIncludedRow = -1;
Value r = null;
for (int i = 0; i < size;) {
int newLast = WindowFrame.getEndIndex(over, session, ordered, getOverOrderBySort(), i);
int newLast = WindowFrame.getEndIndex(over, session, ordered, frameParametersOffset, getOverOrderBySort(),
i);
assert newLast >= lastIncludedRow;
if (newLast > lastIncludedRow) {
for (int j = lastIncludedRow + 1; j <= newLast; j++) {
......@@ -154,12 +183,13 @@ public abstract class AbstractAggregate extends DataAnalysisOperation {
}
private void aggregateFastPartitionInReverse(Session session, HashMap<Integer, Value> result,
ArrayList<Value[]> ordered, int rowIdColumn, boolean grouped) {
ArrayList<Value[]> ordered, int frameParametersOffset, int rowIdColumn, boolean grouped) {
Object aggregateData = createAggregateData();
int firstIncludedRow = ordered.size();
Value r = null;
for (int i = firstIncludedRow - 1; i >= 0;) {
int newLast = over.getWindowFrame().getStartIndex(session, ordered, getOverOrderBySort(), i);
int newLast = over.getWindowFrame().getStartIndex(session, ordered, frameParametersOffset,
getOverOrderBySort(), i);
assert newLast <= firstIncludedRow;
if (newLast < firstIncludedRow) {
for (int j = firstIncludedRow - 1; j >= newLast; j--) {
......
......@@ -232,6 +232,8 @@ public final class WindowFrame {
* the session
* @param orderedRows
* ordered rows
* @param frameParametersOffset
* offset of window frame parameters
* @param sortOrder
* sort order
* @param currentRow
......@@ -241,11 +243,11 @@ public final class WindowFrame {
* if over is not null and its exclusion clause is not EXCLUDE
* NO OTHERS
*/
public static int getEndIndex(Window over, Session session, ArrayList<Value[]> orderedRows, SortOrder sortOrder,
int currentRow) {
public static int getEndIndex(Window over, Session session, ArrayList<Value[]> orderedRows,
int frameParametersOffset, SortOrder sortOrder, int currentRow) {
WindowFrame frame = over.getWindowFrame();
if (frame != null) {
return frame.getEndIndex(session, orderedRows, sortOrder, currentRow);
return frame.getEndIndex(session, orderedRows, frameParametersOffset, sortOrder, currentRow);
}
int endIndex = orderedRows.size() - 1;
return over.getOrderBy() == null ? endIndex : toGroupEnd(orderedRows, sortOrder, currentRow, endIndex);
......@@ -505,6 +507,8 @@ public final class WindowFrame {
* the session
* @param orderedRows
* ordered rows
* @param frameParametersOffset
* offset of window frame parameters
* @param sortOrder
* sort order
* @param currentRow
......@@ -513,11 +517,12 @@ public final class WindowFrame {
* @throws UnsupportedOperationException
* if exclusion clause is not EXCLUDE NO OTHERS
*/
public int getStartIndex(Session session, ArrayList<Value[]> orderedRows, SortOrder sortOrder, int currentRow) {
public int getStartIndex(Session session, ArrayList<Value[]> orderedRows, int frameParametersOffset,
SortOrder sortOrder, int currentRow) {
if (exclusion != WindowFrameExclusion.EXCLUDE_NO_OTHERS) {
throw new UnsupportedOperationException();
}
int startIndex = getIndex(session, orderedRows, sortOrder, currentRow, starting, -1, false);
int startIndex = getIndex(session, orderedRows, sortOrder, currentRow, starting, frameParametersOffset, false);
if (startIndex < 0) {
startIndex = 0;
}
......@@ -531,6 +536,8 @@ public final class WindowFrame {
* the session
* @param orderedRows
* ordered rows
* @param frameParametersOffset
* offset of window frame parameters
* @param sortOrder
* sort order
* @param currentRow
......@@ -539,11 +546,17 @@ public final class WindowFrame {
* @throws UnsupportedOperationException
* if exclusion clause is not EXCLUDE NO OTHERS
*/
private int getEndIndex(Session session, ArrayList<Value[]> orderedRows, SortOrder sortOrder, int currentRow) {
private int getEndIndex(Session session, ArrayList<Value[]> orderedRows, int frameParametersOffset,
SortOrder sortOrder, int currentRow) {
if (exclusion != WindowFrameExclusion.EXCLUDE_NO_OTHERS) {
throw new UnsupportedOperationException();
}
int endIndex = following != null ? getIndex(session, orderedRows, sortOrder, currentRow, following, -1, true)
int followingOffset = frameParametersOffset;
if (starting.isVariable()) {
followingOffset++;
}
int endIndex = following != null
? getIndex(session, orderedRows, sortOrder, currentRow, following, followingOffset, true)
: units == WindowFrameUnits.ROWS ? currentRow
: toGroupEnd(orderedRows, sortOrder, currentRow, orderedRows.size() - 1);
int size = orderedRows.size();
......
......@@ -614,5 +614,25 @@ SELECT ID, VALUE,
> 8 4 [1, 2, 3, 4, 5, 6, 7, 8] [4, 5, 6, 7, 8] [1, 2, 3, 4, 5, 6, 7, 8] [4, 5, 6, 7, 8]
> rows: 8
SELECT ID, VALUE,
ARRAY_AGG(ID ORDER BY ID) OVER
(PARTITION BY VALUE ORDER BY ID ROWS BETWEEN VALUE / 3 PRECEDING AND VALUE / 3 FOLLOWING) A,
ARRAY_AGG(ID ORDER BY ID) OVER
(PARTITION BY VALUE ORDER BY ID ROWS BETWEEN UNBOUNDED PRECEDING AND VALUE / 3 FOLLOWING) AP,
ARRAY_AGG(ID ORDER BY ID) OVER
(PARTITION BY VALUE ORDER BY ID ROWS BETWEEN VALUE / 3 PRECEDING AND UNBOUNDED FOLLOWING) AF
FROM TEST;
> ID VALUE A AP AF
> -- ----- ------ ------ ------
> 1 1 [1] [1] [1, 2]
> 2 1 [2] [1, 2] [2]
> 3 2 [3] [3] [3, 4]
> 4 2 [4] [3, 4] [4]
> 5 3 [5, 6] [5, 6] [5, 6]
> 6 3 [5, 6] [5, 6] [5, 6]
> 7 4 [7, 8] [7, 8] [7, 8]
> 8 4 [7, 8] [7, 8] [7, 8]
> rows: 8
DROP TABLE TEST;
> ok
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论