Skip to content
项目
群组
代码片段
帮助
正在加载...
帮助
为 GitLab 提交贡献
登录/注册
切换导航
H
h2database
项目
项目
详情
活动
周期分析
仓库
仓库
文件
提交
分支
标签
贡献者
分枝图
比较
统计图
议题
0
议题
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
CI / CD
CI / CD
流水线
作业
计划
统计图
Wiki
Wiki
代码片段
代码片段
成员
成员
折叠边栏
关闭边栏
活动
分枝图
统计图
创建新议题
作业
提交
议题看板
打开侧边栏
Administrator
h2database
Commits
4574944c
提交
4574944c
authored
10月 27, 2015
作者:
Thomas Mueller
浏览文件
操作
浏览文件
下载
差异文件
Merge pull request #189 from svladykin/batchJoin2
Batching support for index lookups and batched joins.
上级
79ea94ee
998283ae
隐藏空白字符变更
内嵌
并排
正在显示
8 个修改的文件
包含
1023 行增加
和
51 行删除
+1023
-51
CommandContainer.java
h2/src/main/org/h2/command/CommandContainer.java
+1
-1
BaseIndex.java
h2/src/main/org/h2/index/BaseIndex.java
+3
-8
Index.java
h2/src/main/org/h2/index/Index.java
+5
-21
IndexLookupBatch.java
h2/src/main/org/h2/index/IndexLookupBatch.java
+50
-0
MultiVersionIndex.java
h2/src/main/org/h2/index/MultiVersionIndex.java
+3
-7
TableFilter.java
h2/src/main/org/h2/table/TableFilter.java
+630
-2
DoneFuture.java
h2/src/main/org/h2/util/DoneFuture.java
+55
-0
TestTableEngines.java
h2/src/test/org/h2/test/db/TestTableEngines.java
+276
-12
没有找到文件。
h2/src/main/org/h2/command/CommandContainer.java
浏览文件 @
4574944c
...
...
@@ -17,7 +17,7 @@ import org.h2.value.ValueNull;
* Represents a single SQL statements.
* It wraps a prepared statement.
*/
class
CommandContainer
extends
Command
{
public
class
CommandContainer
extends
Command
{
private
Prepared
prepared
;
private
boolean
readOnlyKnown
;
...
...
h2/src/main/org/h2/index/BaseIndex.java
浏览文件 @
4574944c
...
...
@@ -427,13 +427,8 @@ public abstract class BaseIndex extends SchemaObjectBase implements Index {
}
@Override
public
int
getPreferedLookupBatchSize
()
{
// No batched lookups supported by default.
return
0
;
}
@Override
public
List
<
Future
<
Cursor
>>
findBatched
(
TableFilter
filter
,
List
<
SearchRow
>
firstLastPairs
)
{
throw
DbException
.
throwInternalError
(
"Must not be called if getPreferedLookupBatchSize() is 0."
);
public
IndexLookupBatch
createLookupBatch
(
TableFilter
filter
)
{
// Lookup batching is not supported.
return
null
;
}
}
h2/src/main/org/h2/index/Index.java
浏览文件 @
4574944c
...
...
@@ -259,27 +259,11 @@ public interface Index extends SchemaObject {
void
setSortedInsertMode
(
boolean
sortedInsertMode
);
/**
*
If this index can do batched lookups, it may return it's preferred batch size,
*
otherwise it must return 0
.
*
Creates new lookup batch. Note that returned {@link IndexLookupBatch} instance
*
can be used multiple times
.
*
* @
return preferred batch size or 0 if lookup batching is not supported
* @
see #findBatched(TableFilter, Collection)
* @
param filter Table filter.
* @
return Created batch or {@code null} if batched lookup is not supported by this index.
*/
int
getPreferedLookupBatchSize
();
/**
* Do batched lookup over the given collection of {@link SearchRow} pairs as in
* {@link #find(TableFilter, SearchRow, SearchRow)}.
* <br/><br/>
* Correct implementation must always return number of future cursors equal to
* {@code firstLastPairs.size() / 2}. Instead of {@link Future} containing empty
* {@link Cursor} it is possible to put {@code null} in result list.
*
* @param filter the table filter
* @param firstLastPairs List of batched search row pairs as in
* {@link #find(TableFilter, SearchRow, SearchRow)}, the collection will be reused by H2,
* thus it makes sense to defensively copy contents if needed.
* @return batched cursors for respective search row pairs in the same order
*/
List
<
Future
<
Cursor
>>
findBatched
(
TableFilter
filter
,
List
<
SearchRow
>
firstLastPairs
);
IndexLookupBatch
createLookupBatch
(
TableFilter
filter
);
}
h2/src/main/org/h2/index/IndexLookupBatch.java
0 → 100644
浏览文件 @
4574944c
/*
* Copyright 2004-2014 H2 Group. Multiple-Licensed under the MPL 2.0,
* and the EPL 1.0 (http://h2database.com/html/license.html).
* Initial Developer: H2 Group
*/
package
org
.
h2
.
index
;
import
java.util.List
;
import
java.util.concurrent.Future
;
import
org.h2.result.SearchRow
;
/**
* Support for asynchronous batched lookups in indexes. The flow is the following:
* H2 engine will be calling {@link #addSearchRows(SearchRow, SearchRow)} until
* method {@link #isBatchFull()}} will return {@code true} or there are no more
* search rows to add. Then method {@link #find()} will be called to execute batched lookup.
* Note that a single instance of {@link IndexLookupBatch} can be reused for multiple
* sequential batched lookups.
*
* @see Index#createLookupBatch(TableFilter)
* @author Sergi Vladykin
*/
public
interface
IndexLookupBatch
{
/**
* Add search row pair to the batch.
*
* @param first the first row, or null for no limit
* @param last the last row, or null for no limit
* @see Index#find(TableFilter, SearchRow, SearchRow)
*/
void
addSearchRows
(
SearchRow
first
,
SearchRow
last
);
/**
* Check if this batch is full.
*
* @return {@code true} If batch is full, will not accept any
* more rows and {@link #find()} can be executed.
*/
boolean
isBatchFull
();
/**
* Execute batched lookup and return future cursor for each provided
* search row pair. Note that this method must return exactly the same number
* of future cursors in result list as number of {@link #addSearchRows(SearchRow, SearchRow)}
* calls has been done before {@link #find()} call exactly in the same order.
*
* @return List of future cursors for collected search rows.
*/
List
<
Future
<
Cursor
>>
find
();
}
h2/src/main/org/h2/index/MultiVersionIndex.java
浏览文件 @
4574944c
...
...
@@ -389,12 +389,8 @@ public class MultiVersionIndex implements Index {
}
@Override
public
int
getPreferedLookupBatchSize
()
{
return
0
;
}
@Override
public
List
<
Future
<
Cursor
>>
findBatched
(
TableFilter
filter
,
List
<
SearchRow
>
firstLastPairs
)
{
throw
DbException
.
throwInternalError
(
"Must never be called."
);
public
IndexLookupBatch
createLookupBatch
(
TableFilter
filter
)
{
// Lookup batching is not supported.
return
null
;
}
}
h2/src/main/org/h2/table/TableFilter.java
浏览文件 @
4574944c
...
...
@@ -5,7 +5,13 @@
*/
package
org
.
h2
.
table
;
import
java.util.AbstractList
;
import
java.util.ArrayList
;
import
java.util.Arrays
;
import
java.util.Collections
;
import
java.util.List
;
import
java.util.ListIterator
;
import
java.util.concurrent.Future
;
import
org.h2.command.Parser
;
import
org.h2.command.dml.Select
;
import
org.h2.engine.Right
;
...
...
@@ -16,13 +22,16 @@ import org.h2.expression.Comparison;
import
org.h2.expression.ConditionAndOr
;
import
org.h2.expression.Expression
;
import
org.h2.expression.ExpressionColumn
;
import
org.h2.index.Cursor
;
import
org.h2.index.Index
;
import
org.h2.index.IndexLookupBatch
;
import
org.h2.index.IndexCondition
;
import
org.h2.index.IndexCursor
;
import
org.h2.message.DbException
;
import
org.h2.result.Row
;
import
org.h2.result.SearchRow
;
import
org.h2.result.SortOrder
;
import
org.h2.util.DoneFuture
;
import
org.h2.util.New
;
import
org.h2.util.StatementBuilder
;
import
org.h2.util.StringUtils
;
...
...
@@ -40,6 +49,33 @@ public class TableFilter implements ColumnResolver {
private
static
final
int
BEFORE_FIRST
=
0
,
FOUND
=
1
,
AFTER_LAST
=
2
,
NULL_ROW
=
3
;
private
static
final
Cursor
EMPTY_CURSOR
=
new
Cursor
()
{
@Override
public
boolean
previous
()
{
return
false
;
}
@Override
public
boolean
next
()
{
return
false
;
}
@Override
public
SearchRow
getSearchRow
()
{
return
null
;
}
@Override
public
Row
get
()
{
return
null
;
}
@Override
public
String
toString
()
{
return
"EMPTY_CURSOR"
;
}
};
/**
* Whether this is a direct or indirect (nested) outer join
*/
...
...
@@ -54,6 +90,12 @@ public class TableFilter implements ColumnResolver {
private
int
scanCount
;
private
boolean
evaluatable
;
/**
* Batched join support.
*/
private
JoinBatch
joinBatch
;
private
JoinFilter
joinFilter
;
/**
* Indicates that this filter is used in the plan.
*/
...
...
@@ -291,16 +333,38 @@ public class TableFilter implements ColumnResolver {
* Start the query. This will reset the scan counts.
*
* @param s the session
* @return join batch if query runs over index which supports batched lookups, null otherwise
*/
public
void
startQuery
(
Session
s
)
{
public
JoinBatch
startQuery
(
Session
s
)
{
joinBatch
=
null
;
joinFilter
=
null
;
this
.
session
=
s
;
scanCount
=
0
;
if
(
nestedJoin
!=
null
)
{
nestedJoin
.
startQuery
(
s
);
}
JoinBatch
batch
=
null
;
if
(
join
!=
null
)
{
join
.
startQuery
(
s
);
batch
=
join
.
startQuery
(
s
);
}
IndexLookupBatch
lookupBatch
=
null
;
if
(
batch
==
null
&&
select
!=
null
&&
select
.
getTopTableFilter
()
!=
this
)
{
lookupBatch
=
index
.
createLookupBatch
(
this
);
if
(
lookupBatch
!=
null
)
{
batch
=
new
JoinBatch
(
join
);
}
}
if
(
batch
!=
null
)
{
if
(
nestedJoin
!=
null
)
{
throw
DbException
.
getUnsupportedException
(
"nested join with batched index"
);
}
if
(
lookupBatch
==
null
)
{
lookupBatch
=
index
.
createLookupBatch
(
this
);
}
joinBatch
=
batch
;
joinFilter
=
batch
.
register
(
this
,
lookupBatch
);
}
return
batch
;
}
/**
...
...
@@ -323,6 +387,10 @@ public class TableFilter implements ColumnResolver {
* @return true if there are
*/
public
boolean
next
()
{
if
(
joinBatch
!=
null
)
{
// will happen only on topTableFilter since jbatch.next does not call join.next()
return
joinBatch
.
next
();
}
if
(
state
==
AFTER_LAST
)
{
return
false
;
}
else
if
(
state
==
BEFORE_FIRST
)
{
...
...
@@ -882,6 +950,9 @@ public class TableFilter implements ColumnResolver {
@Override
public
Value
getValue
(
Column
column
)
{
if
(
joinBatch
!=
null
)
{
return
joinBatch
.
getValue
(
joinFilter
,
column
);
}
if
(
currentSearchRow
==
null
)
{
return
null
;
}
...
...
@@ -1031,4 +1102,561 @@ public class TableFilter implements ColumnResolver {
void
accept
(
TableFilter
f
);
}
/**
* Support for asynchronous batched index lookups on joins.
*
* @see Index#findBatched(TableFilter, java.util.Collection)
* @see Index#getPreferedLookupBatchSize()
*
* @author Sergi Vladykin
*/
private
static
final
class
JoinBatch
{
int
filtersCount
;
JoinFilter
[]
filters
;
JoinFilter
top
;
boolean
started
;
JoinRow
current
;
boolean
found
;
/**
* This filter joined after this batched join and can be used normally.
*/
final
TableFilter
additionalFilter
;
/**
* @param additionalFilter table filter after this batched join.
*/
private
JoinBatch
(
TableFilter
additionalFilter
)
{
this
.
additionalFilter
=
additionalFilter
;
}
/**
* @param filter table filter
* @param lookupBatch lookup batch
*/
private
JoinFilter
register
(
TableFilter
filter
,
IndexLookupBatch
lookupBatch
)
{
assert
filter
!=
null
;
filtersCount
++;
return
top
=
new
JoinFilter
(
lookupBatch
,
filter
,
top
);
}
/**
* @param filterId table filter id
* @param column column
* @return column value for current row
*/
private
Value
getValue
(
JoinFilter
filter
,
Column
column
)
{
Object
x
=
current
.
row
(
filter
.
id
);
assert
x
!=
null
;
Row
row
=
current
.
isRow
(
filter
.
id
)
?
(
Row
)
x
:
((
Cursor
)
x
).
get
();
int
columnId
=
column
.
getColumnId
();
if
(
columnId
==
-
1
)
{
return
ValueLong
.
get
(
row
.
getKey
());
}
Value
value
=
row
.
getValue
(
column
.
getColumnId
());
if
(
value
==
null
)
{
throw
DbException
.
throwInternalError
(
"value is null: "
+
column
+
" "
+
row
);
}
return
value
;
}
private
void
start
()
{
if
(
filtersCount
>
32
)
{
// This is because we store state in a 64 bit field, 2 bits per joined table.
throw
DbException
.
getUnsupportedException
(
"Too many tables in join (at most 32 supported)."
);
}
// fill filters
filters
=
new
JoinFilter
[
filtersCount
];
JoinFilter
jf
=
top
;
for
(
int
i
=
0
;
i
<
filtersCount
;
i
++)
{
filters
[
jf
.
id
=
i
]
=
jf
;
jf
=
jf
.
join
;
}
// initialize current row
current
=
new
JoinRow
(
new
Object
[
filtersCount
]);
current
.
updateRow
(
top
.
id
,
top
.
filter
.
cursor
,
JoinRow
.
S_NULL
,
JoinRow
.
S_CURSOR
);
// initialize top cursor
top
.
filter
.
cursor
.
find
(
top
.
filter
.
session
,
top
.
filter
.
indexConditions
);
// we need fake first row because batchedNext always will move to the next row
JoinRow
fake
=
new
JoinRow
(
null
);
fake
.
next
=
current
;
current
=
fake
;
}
private
boolean
next
()
{
if
(!
started
)
{
start
();
started
=
true
;
}
if
(
additionalFilter
==
null
)
{
if
(
batchedNext
())
{
assert
current
.
isComplete
();
return
true
;
}
return
false
;
}
for
(;;)
{
if
(!
found
)
{
if
(!
batchedNext
())
{
return
false
;
}
assert
current
.
isComplete
();
found
=
true
;
additionalFilter
.
reset
();
}
// we call furtherFilter in usual way outside of this batch because it is more effective
if
(
additionalFilter
.
next
())
{
return
true
;
}
found
=
false
;
}
}
private
static
Cursor
get
(
Future
<
Cursor
>
f
)
{
try
{
return
f
.
get
();
}
catch
(
Exception
e
)
{
throw
DbException
.
convert
(
e
);
}
}
private
boolean
batchedNext
()
{
if
(
current
==
null
)
{
// after last
return
false
;
}
// go next
current
=
current
.
next
;
if
(
current
==
null
)
{
return
false
;
}
current
.
prev
=
null
;
final
int
lastJfId
=
filtersCount
-
1
;
int
jfId
=
lastJfId
;
while
(
current
.
row
(
jfId
)
==
null
)
{
// lookup for the first non fetched filter for the current row
jfId
--;
}
for
(;;)
{
fetchCurrent
(
jfId
);
if
(!
current
.
isDropped
())
{
// if current was not dropped then it must be fetched successfully
if
(
jfId
==
lastJfId
)
{
// the whole join row is ready to be returned
return
true
;
}
JoinFilter
join
=
filters
[
jfId
+
1
];
if
(
join
.
isBatchFull
())
{
// get future cursors for join and go right to fetch them
current
=
join
.
find
(
current
);
}
if
(
current
.
row
(
join
.
id
)
!=
null
)
{
// either find called or outer join with null row
jfId
=
join
.
id
;
continue
;
}
}
// we have to go down and fetch next cursors for jfId if it is possible
if
(
current
.
next
==
null
)
{
// either dropped or null-row
if
(
current
.
isDropped
())
{
current
=
current
.
prev
;
if
(
current
==
null
)
{
return
false
;
}
}
assert
!
current
.
isDropped
();
assert
jfId
!=
lastJfId
;
jfId
=
0
;
while
(
current
.
row
(
jfId
)
!=
null
)
{
jfId
++;
}
// force find on half filled batch (there must be either searchRows
// or Cursor.EMPTY set for null-rows)
current
=
filters
[
jfId
].
find
(
current
);
}
else
{
// here we don't care if the current was dropped
current
=
current
.
next
;
assert
!
current
.
isRow
(
jfId
);
while
(
current
.
row
(
jfId
)
==
null
)
{
assert
jfId
!=
top
.
id
;
// need to go left and fetch more search rows
jfId
--;
assert
!
current
.
isRow
(
jfId
);
}
}
}
}
@SuppressWarnings
(
"unchecked"
)
private
void
fetchCurrent
(
final
int
jfId
)
{
assert
current
.
prev
==
null
||
current
.
prev
.
isRow
(
jfId
)
:
"prev must be already fetched"
;
assert
jfId
==
0
||
current
.
isRow
(
jfId
-
1
)
:
"left must be already fetched"
;
assert
!
current
.
isRow
(
jfId
)
:
"double fetching"
;
Object
x
=
current
.
row
(
jfId
);
assert
x
!=
null
:
"x null"
;
final
JoinFilter
jf
=
filters
[
jfId
];
// in case of outer join we don't have any future around empty cursor
boolean
newCursor
=
x
==
EMPTY_CURSOR
;
if
(!
newCursor
&&
current
.
isFuture
(
jfId
))
{
// get cursor from a future
x
=
get
((
Future
<
Cursor
>)
x
);
current
.
updateRow
(
jfId
,
x
,
JoinRow
.
S_FUTURE
,
JoinRow
.
S_CURSOR
);
newCursor
=
true
;
}
Cursor
c
=
(
Cursor
)
x
;
assert
c
!=
null
;
JoinFilter
join
=
jf
.
join
;
for
(;;)
{
if
(
c
==
null
||
!
c
.
next
())
{
if
(
newCursor
&&
jf
.
isOuterJoin
())
{
// replace cursor with null-row
current
.
updateRow
(
jfId
,
jf
.
getNullRow
(),
JoinRow
.
S_CURSOR
,
JoinRow
.
S_ROW
);
c
=
null
;
newCursor
=
false
;
}
else
{
// cursor is done, drop it
current
.
drop
();
return
;
}
}
if
(!
jf
.
isOk
(
c
==
null
))
{
// try another row from the cursor
continue
;
}
boolean
joinEmpty
=
false
;
if
(
join
!=
null
&&
!
join
.
collectSearchRows
())
{
if
(
join
.
isOuterJoin
())
{
joinEmpty
=
true
;
}
else
{
// join will fail, try next row in the cursor
continue
;
}
}
if
(
c
!=
null
)
{
current
=
current
.
copyBehind
(
jfId
);
// get current row from cursor
current
.
updateRow
(
jfId
,
c
.
get
(),
JoinRow
.
S_CURSOR
,
JoinRow
.
S_ROW
);
}
if
(
joinEmpty
)
{
current
.
updateRow
(
join
.
id
,
EMPTY_CURSOR
,
JoinRow
.
S_NULL
,
JoinRow
.
S_CURSOR
);
}
return
;
}
}
@Override
public
String
toString
()
{
return
"JoinBatch->\nprev->"
+
(
current
==
null
?
null
:
current
.
prev
)
+
"\ncurr->"
+
current
+
"\nnext->"
+
(
current
==
null
?
null
:
current
.
next
);
}
}
/**
* Table filter participating in batched join.
*/
private
static
final
class
JoinFilter
{
final
TableFilter
filter
;
final
JoinFilter
join
;
int
id
;
IndexLookupBatch
lookupBatch
;
private
JoinFilter
(
IndexLookupBatch
lookupBatch
,
TableFilter
filter
,
JoinFilter
join
)
{
this
.
filter
=
filter
;
this
.
join
=
join
;
this
.
lookupBatch
=
lookupBatch
!=
null
?
lookupBatch
:
new
FakeLookupBatch
(
filter
);
}
public
Row
getNullRow
()
{
return
filter
.
table
.
getNullRow
();
}
private
boolean
isOuterJoin
()
{
return
filter
.
joinOuter
;
}
private
boolean
isBatchFull
()
{
return
lookupBatch
.
isBatchFull
();
}
private
boolean
isOk
(
boolean
ignoreJoinCondition
)
{
boolean
filterOk
=
filter
.
isOk
(
filter
.
filterCondition
);
boolean
joinOk
=
filter
.
isOk
(
filter
.
joinCondition
);
return
filterOk
&&
(
ignoreJoinCondition
||
joinOk
);
}
private
boolean
collectSearchRows
()
{
assert
!
isBatchFull
();
IndexCursor
c
=
filter
.
cursor
;
c
.
prepare
(
filter
.
session
,
filter
.
indexConditions
);
if
(
c
.
isAlwaysFalse
())
{
return
false
;
}
lookupBatch
.
addSearchRows
(
c
.
getStart
(),
c
.
getEnd
());
return
true
;
}
private
JoinRow
find
(
JoinRow
current
)
{
assert
current
!=
null
;
// lookupBatch is allowed to be empty when we have some null-rows and forced find call
List
<
Future
<
Cursor
>>
result
=
lookupBatch
.
find
();
// go backwards and assign futures
for
(
int
i
=
result
.
size
();
i
>
0
;)
{
assert
current
.
isRow
(
id
-
1
);
if
(
current
.
row
(
id
)
==
EMPTY_CURSOR
)
{
// outer join support - skip row with existing empty cursor
current
=
current
.
prev
;
continue
;
}
assert
current
.
row
(
id
)
==
null
;
Future
<
Cursor
>
future
=
result
.
get
(--
i
);
if
(
future
==
null
)
{
current
.
updateRow
(
id
,
EMPTY_CURSOR
,
JoinRow
.
S_NULL
,
JoinRow
.
S_CURSOR
);
}
else
{
current
.
updateRow
(
id
,
future
,
JoinRow
.
S_NULL
,
JoinRow
.
S_FUTURE
);
}
if
(
current
.
prev
==
null
||
i
==
0
)
{
break
;
}
current
=
current
.
prev
;
}
// handle empty cursors (because of outer joins) at the beginning
while
(
current
.
prev
!=
null
&&
current
.
prev
.
row
(
id
)
==
EMPTY_CURSOR
)
{
current
=
current
.
prev
;
}
assert
current
.
prev
==
null
||
current
.
prev
.
isRow
(
id
);
assert
current
.
row
(
id
)
!=
null
;
assert
!
current
.
isRow
(
id
);
// the last updated row
return
current
;
}
@Override
public
String
toString
()
{
return
"JoinFilter->"
+
filter
;
}
}
/**
* Linked row in batched join.
*/
private
static
final
class
JoinRow
{
private
static
final
long
S_NULL
=
0
;
private
static
final
long
S_FUTURE
=
1
;
private
static
final
long
S_CURSOR
=
2
;
private
static
final
long
S_ROW
=
3
;
private
static
final
long
S_MASK
=
3
;
/**
* May contain one of the following:
* <br/>- {@code null}: means that we need to get future cursor for this row
* <br/>- {@link Future}: means that we need to get a new {@link Cursor} from the {@link Future}
* <br/>- {@link Cursor}: means that we need to fetch {@link Row}s from the {@link Cursor}
* <br/>- {@link Row}: the {@link Row} is already fetched and is ready to be used
*/
Object
[]
row
;
long
state
;
JoinRow
prev
;
JoinRow
next
;
/**
* @param row Row.
*/
private
JoinRow
(
Object
[]
row
)
{
this
.
row
=
row
;
}
/**
* @param joinFilterId Join filter id.
* @return Row state.
*/
private
long
getState
(
int
joinFilterId
)
{
return
(
state
>>>
(
joinFilterId
<<
1
))
&
S_MASK
;
}
/**
* Allows to do a state transition in the following order:
* 0. Slot contains {@code null} ({@link #S_NULL}).
* 1. Slot contains {@link Future} ({@link #S_FUTURE}).
* 2. Slot contains {@link Cursor} ({@link #S_CURSOR}).
* 3. Slot contains {@link Row} ({@link #S_ROW}).
*
* @param joinFilterId {@link JoinRow} filter id.
* @param i Increment by this number of moves.
*/
private
void
incrementState
(
int
joinFilterId
,
long
i
)
{
assert
i
>
0
:
i
;
state
+=
i
<<
(
joinFilterId
<<
1
);
}
private
void
updateRow
(
int
joinFilterId
,
Object
x
,
long
oldState
,
long
newState
)
{
assert
getState
(
joinFilterId
)
==
oldState
:
"old state: "
+
getState
(
joinFilterId
);
row
[
joinFilterId
]
=
x
;
incrementState
(
joinFilterId
,
newState
-
oldState
);
assert
getState
(
joinFilterId
)
==
newState
:
"new state: "
+
getState
(
joinFilterId
);
}
private
Object
row
(
int
joinFilterId
)
{
return
row
[
joinFilterId
];
}
private
boolean
isRow
(
int
joinFilterId
)
{
return
getState
(
joinFilterId
)
==
S_ROW
;
}
private
boolean
isFuture
(
int
joinFilterId
)
{
return
getState
(
joinFilterId
)
==
S_FUTURE
;
}
private
boolean
isCursor
(
int
joinFilterId
)
{
return
getState
(
joinFilterId
)
==
S_CURSOR
;
}
private
boolean
isComplete
()
{
return
isRow
(
row
.
length
-
1
);
}
private
boolean
isDropped
()
{
return
row
==
null
;
}
private
void
drop
()
{
if
(
prev
!=
null
)
{
prev
.
next
=
next
;
}
if
(
next
!=
null
)
{
next
.
prev
=
prev
;
}
row
=
null
;
}
/**
* Copy this JoinRow behind itself in linked list of all in progress rows.
*
* @param jfId The last fetched filter id.
* @return The copy.
*/
private
JoinRow
copyBehind
(
int
jfId
)
{
assert
isCursor
(
jfId
);
assert
jfId
+
1
==
row
.
length
||
row
[
jfId
+
1
]
==
null
;
Object
[]
r
=
new
Object
[
row
.
length
];
if
(
jfId
!=
0
)
{
System
.
arraycopy
(
row
,
0
,
r
,
0
,
jfId
);
}
JoinRow
copy
=
new
JoinRow
(
r
);
copy
.
state
=
state
;
if
(
prev
!=
null
)
{
copy
.
prev
=
prev
;
prev
.
next
=
copy
;
}
prev
=
copy
;
copy
.
next
=
this
;
return
copy
;
}
@Override
public
String
toString
()
{
return
"JoinRow->"
+
Arrays
.
toString
(
row
);
}
}
/**
* Fake Lookup batch for indexes which do not support batching but have to participate
* in batched joins.
*/
private
static
class
FakeLookupBatch
implements
IndexLookupBatch
{
final
TableFilter
filter
;
SearchRow
first
;
SearchRow
last
;
boolean
full
;
final
List
<
Future
<
Cursor
>>
result
=
new
SingletonList
<
Future
<
Cursor
>>();
/**
* @param index Index.
*/
public
FakeLookupBatch
(
TableFilter
filter
)
{
this
.
filter
=
filter
;
}
@Override
public
void
addSearchRows
(
SearchRow
first
,
SearchRow
last
)
{
assert
!
full
;
this
.
first
=
first
;
this
.
last
=
last
;
full
=
true
;
}
@Override
public
boolean
isBatchFull
()
{
return
full
;
}
@Override
public
List
<
Future
<
Cursor
>>
find
()
{
if
(!
full
)
{
return
Collections
.
emptyList
();
}
Cursor
c
=
filter
.
getIndex
().
find
(
filter
,
first
,
last
);
result
.
set
(
0
,
new
DoneFuture
<
Cursor
>(
c
));
full
=
false
;
first
=
last
=
null
;
return
result
;
}
}
/**
* Simple singleton list.
*/
private
static
class
SingletonList
<
E
>
extends
AbstractList
<
E
>
{
private
E
element
;
@Override
public
E
get
(
int
index
)
{
assert
index
==
0
;
return
element
;
}
@Override
public
E
set
(
int
index
,
E
element
)
{
assert
index
==
0
;
this
.
element
=
element
;
return
null
;
}
@Override
public
int
size
()
{
return
1
;
}
}
}
h2/src/main/org/h2/util/DoneFuture.java
0 → 100644
浏览文件 @
4574944c
/*
* Copyright 2004-2014 H2 Group. Multiple-Licensed under the MPL 2.0,
* and the EPL 1.0 (http://h2database.com/html/license.html).
* Initial Developer: H2 Group
*/
package
org
.
h2
.
util
;
import
java.util.concurrent.ExecutionException
;
import
java.util.concurrent.Future
;
import
java.util.concurrent.TimeUnit
;
import
java.util.concurrent.TimeoutException
;
/**
* Future which is already done.
*
* @param <T> Result value.
* @author Sergi Vladykin
*/
public
class
DoneFuture
<
T
>
implements
Future
<
T
>
{
final
T
x
;
public
DoneFuture
(
T
x
)
{
this
.
x
=
x
;
}
@Override
public
T
get
()
throws
InterruptedException
,
ExecutionException
{
return
x
;
}
@Override
public
T
get
(
long
timeout
,
TimeUnit
unit
)
throws
InterruptedException
,
ExecutionException
,
TimeoutException
{
return
x
;
}
@Override
public
boolean
isDone
()
{
return
true
;
}
@Override
public
boolean
cancel
(
boolean
mayInterruptIfRunning
)
{
return
false
;
}
@Override
public
boolean
isCancelled
()
{
return
false
;
}
@Override
public
String
toString
()
{
return
"DoneFuture->"
+
x
;
}
}
h2/src/test/org/h2/test/db/TestTableEngines.java
浏览文件 @
4574944c
...
...
@@ -16,16 +16,24 @@ import java.util.Collections;
import
java.util.Comparator
;
import
java.util.Iterator
;
import
java.util.List
;
import
java.util.Random
;
import
java.util.Set
;
import
java.util.TreeSet
;
import
java.util.concurrent.Callable
;
import
java.util.concurrent.ExecutorService
;
import
java.util.concurrent.Executors
;
import
java.util.concurrent.Future
;
import
java.util.concurrent.ThreadFactory
;
import
org.h2.api.TableEngine
;
import
org.h2.command.ddl.CreateTableData
;
import
org.h2.command.dml.OptimizerHints
;
import
org.h2.engine.Constants
;
import
org.h2.engine.Session
;
import
org.h2.expression.Expression
;
import
org.h2.index.BaseIndex
;
import
org.h2.index.Cursor
;
import
org.h2.index.Index
;
import
org.h2.index.IndexLookupBatch
;
import
org.h2.index.IndexType
;
import
org.h2.index.SingleRowCursor
;
import
org.h2.message.DbException
;
...
...
@@ -37,6 +45,7 @@ import org.h2.table.Table;
import
org.h2.table.TableBase
;
import
org.h2.table.TableFilter
;
import
org.h2.test.TestBase
;
import
org.h2.util.DoneFuture
;
import
org.h2.util.New
;
import
org.h2.value.Value
;
import
org.h2.value.ValueInt
;
...
...
@@ -65,6 +74,7 @@ public class TestTableEngines extends TestBase {
testEngineParams
();
testSimpleQuery
();
testMultiColumnTreeSetIndex
();
testBatchedJoin
();
}
private
void
testEarlyFilter
()
throws
SQLException
{
...
...
@@ -332,7 +342,181 @@ public class TestTableEngines extends TestBase {
deleteDb
(
"tableEngine"
);
}
private
void
testBatchedJoin
()
throws
SQLException
{
deleteDb
(
"tableEngine"
);
Connection
conn
=
getConnection
(
"tableEngine;OPTIMIZE_REUSE_RESULTS=0"
);
Statement
stat
=
conn
.
createStatement
();
TreeSetIndex
.
exec
=
Executors
.
newFixedThreadPool
(
8
,
new
ThreadFactory
()
{
@Override
public
Thread
newThread
(
Runnable
r
)
{
Thread
t
=
new
Thread
(
r
);
t
.
setDaemon
(
true
);
return
t
;
}
});
enableJoinReordering
(
false
);
try
{
doTestBatchedJoin
(
stat
,
1
,
0
,
0
);
doTestBatchedJoin
(
stat
,
0
,
1
,
0
);
doTestBatchedJoin
(
stat
,
0
,
0
,
1
);
doTestBatchedJoin
(
stat
,
0
,
2
,
0
);
doTestBatchedJoin
(
stat
,
0
,
0
,
2
);
doTestBatchedJoin
(
stat
,
0
,
0
,
3
);
doTestBatchedJoin
(
stat
,
0
,
0
,
4
);
doTestBatchedJoin
(
stat
,
0
,
0
,
5
);
doTestBatchedJoin
(
stat
,
0
,
3
,
1
);
doTestBatchedJoin
(
stat
,
0
,
3
,
3
);
doTestBatchedJoin
(
stat
,
0
,
3
,
7
);
doTestBatchedJoin
(
stat
,
0
,
4
,
1
);
doTestBatchedJoin
(
stat
,
0
,
4
,
6
);
doTestBatchedJoin
(
stat
,
0
,
4
,
20
);
doTestBatchedJoin
(
stat
,
0
,
10
,
0
);
doTestBatchedJoin
(
stat
,
0
,
0
,
10
);
doTestBatchedJoin
(
stat
,
0
,
20
,
0
);
doTestBatchedJoin
(
stat
,
0
,
0
,
20
);
doTestBatchedJoin
(
stat
,
0
,
20
,
20
);
doTestBatchedJoin
(
stat
,
3
,
7
,
0
);
doTestBatchedJoin
(
stat
,
0
,
0
,
5
);
doTestBatchedJoin
(
stat
,
0
,
8
,
1
);
doTestBatchedJoin
(
stat
,
0
,
2
,
1
);
}
finally
{
enableJoinReordering
(
true
);
TreeSetIndex
.
exec
.
shutdownNow
();
}
deleteDb
(
"tableEngine"
);
}
/**
* @param enable Enabled.
*/
private
void
enableJoinReordering
(
boolean
enable
)
{
OptimizerHints
hints
=
null
;
if
(!
enable
)
{
hints
=
new
OptimizerHints
();
hints
.
setJoinReorderEnabled
(
false
);
}
OptimizerHints
.
set
(
hints
);
}
private
void
doTestBatchedJoin
(
Statement
stat
,
int
...
batchSizes
)
throws
SQLException
{
ArrayList
<
TreeSetTable
>
tables
=
New
.
arrayList
(
batchSizes
.
length
);
for
(
int
i
=
0
;
i
<
batchSizes
.
length
;
i
++)
{
stat
.
executeUpdate
(
"DROP TABLE IF EXISTS T"
+
i
);
stat
.
executeUpdate
(
"CREATE TABLE T"
+
i
+
"(A INT, B INT) ENGINE \""
+
TreeSetIndexTableEngine
.
class
.
getName
()
+
"\""
);
tables
.
add
(
TreeSetIndexTableEngine
.
created
);
stat
.
executeUpdate
(
"CREATE INDEX IDX_B ON T"
+
i
+
"(B)"
);
stat
.
executeUpdate
(
"CREATE INDEX IDX_A ON T"
+
i
+
"(A)"
);
PreparedStatement
insert
=
stat
.
getConnection
().
prepareStatement
(
"INSERT INTO T"
+
i
+
" VALUES (?,?)"
);
for
(
int
j
=
i
,
size
=
i
+
10
;
j
<
size
;
j
++)
{
insert
.
setInt
(
1
,
j
);
insert
.
setInt
(
2
,
j
);
insert
.
executeUpdate
();
}
for
(
TreeSetTable
table
:
tables
)
{
assertEquals
(
10
,
table
.
getRowCount
(
null
));
}
}
int
[]
zeroBatchSizes
=
new
int
[
batchSizes
.
length
];
int
tests
=
1
<<
(
batchSizes
.
length
*
4
);
for
(
int
test
=
0
;
test
<
tests
;
test
++)
{
String
query
=
generateQuery
(
test
,
batchSizes
.
length
);
// System.out.println(Arrays.toString(batchSizes) + ": " + test + " -> " + query);
setBatchSize
(
tables
,
batchSizes
);
List
<
List
<
Object
>>
res1
=
query
(
stat
,
query
);
setBatchSize
(
tables
,
zeroBatchSizes
);
List
<
List
<
Object
>>
res2
=
query
(
stat
,
query
);
// System.out.println(res1 + " " + res2);
if
(!
res2
.
equals
(
res1
))
{
System
.
err
.
println
(
Arrays
.
toString
(
batchSizes
)
+
": "
+
res1
+
" "
+
res2
);
System
.
err
.
println
(
"Test "
+
test
);
System
.
err
.
println
(
query
);
for
(
TreeSetTable
table
:
tables
)
{
System
.
err
.
println
(
table
.
getName
()
+
" = "
+
query
(
stat
,
"select * from "
+
table
.
getName
()));
}
fail
();
}
}
}
private
static
void
setBatchSize
(
ArrayList
<
TreeSetTable
>
tables
,
int
...
batchSizes
)
{
for
(
int
i
=
0
;
i
<
batchSizes
.
length
;
i
++)
{
int
batchSize
=
batchSizes
[
i
];
for
(
Index
idx
:
tables
.
get
(
i
).
getIndexes
())
{
((
TreeSetIndex
)
idx
).
preferedBatchSize
=
batchSize
;
}
}
}
private
String
generateQuery
(
int
t
,
int
tables
)
{
final
int
withLeft
=
1
;
final
int
withFalse
=
2
;
final
int
withWhere
=
4
;
final
int
withOnIsNull
=
8
;
StringBuilder
b
=
new
StringBuilder
();
b
.
append
(
"select count(*) from "
);
StringBuilder
where
=
new
StringBuilder
();
for
(
int
i
=
0
;
i
<
tables
;
i
++)
{
if
(
i
!=
0
)
{
if
((
t
&
withLeft
)
!=
0
)
{
b
.
append
(
" left "
);
}
b
.
append
(
" join "
);
}
b
.
append
(
"\nT"
).
append
(
i
).
append
(
' '
);
if
(
i
!=
0
)
{
boolean
even
=
(
i
&
1
)
==
0
;
if
((
t
&
withOnIsNull
)
!=
0
)
{
b
.
append
(
" on T"
).
append
(
i
-
1
).
append
(
even
?
".B"
:
".A"
).
append
(
" is null"
);
}
else
if
((
t
&
withFalse
)
!=
0
)
{
b
.
append
(
" on false "
);
}
else
{
b
.
append
(
" on T"
).
append
(
i
-
1
).
append
(
even
?
".B = "
:
".A = "
);
b
.
append
(
"T"
).
append
(
i
).
append
(
even
?
".B "
:
".A "
);
}
}
if
((
t
&
withWhere
)
!=
0
)
{
if
(
where
.
length
()
!=
0
)
{
where
.
append
(
" and "
);
}
where
.
append
(
" T"
).
append
(
i
).
append
(
".A > 5"
);
}
t
>>>=
4
;
}
if
(
where
.
length
()
!=
0
)
{
b
.
append
(
"\nwhere "
).
append
(
where
);
}
return
b
.
toString
();
}
private
void
checkResultsNoOrder
(
Statement
stat
,
int
size
,
String
query1
,
String
query2
)
throws
SQLException
{
List
<
List
<
Object
>>
res1
=
query
(
stat
,
query1
);
...
...
@@ -710,9 +894,12 @@ public class TestTableEngines extends TestBase {
* A table engine that internally uses a tree set.
*/
public
static
class
TreeSetIndexTableEngine
implements
TableEngine
{
static
TreeSetTable
created
;
@Override
public
Table
createTable
(
CreateTableData
data
)
{
return
new
TreeSetTable
(
data
);
return
created
=
new
TreeSetTable
(
data
);
}
}
...
...
@@ -881,8 +1068,15 @@ public class TestTableEngines extends TestBase {
* An index that internally uses a tree set.
*/
private
static
class
TreeSetIndex
extends
BaseIndex
implements
Comparator
<
SearchRow
>
{
final
TreeSet
<
SearchRow
>
set
=
new
TreeSet
<
SearchRow
>(
this
);
/**
* Executor service to test batched joins.
*/
private
static
ExecutorService
exec
;
private
final
TreeSet
<
SearchRow
>
set
=
new
TreeSet
<
SearchRow
>(
this
);
private
int
preferedBatchSize
;
TreeSetIndex
(
Table
t
,
String
name
,
IndexColumn
[]
cols
,
IndexType
type
)
{
initBaseIndex
(
t
,
0
,
name
,
cols
,
type
);
}
...
...
@@ -890,12 +1084,73 @@ public class TestTableEngines extends TestBase {
@Override
public
int
compare
(
SearchRow
o1
,
SearchRow
o2
)
{
int
res
=
compareRows
(
o1
,
o2
);
if
(
res
==
0
&&
(
o1
.
getKey
()
==
Long
.
MAX_VALUE
||
o2
.
getKey
()
==
Long
.
MAX_VALUE
))
{
res
=
-
1
;
if
(
res
==
0
)
{
if
(
o1
.
getKey
()
==
Long
.
MAX_VALUE
||
o2
.
getKey
()
==
Long
.
MIN_VALUE
)
{
res
=
1
;
}
else
if
(
o1
.
getKey
()
==
Long
.
MIN_VALUE
||
o2
.
getKey
()
==
Long
.
MAX_VALUE
)
{
res
=
-
1
;
}
}
return
res
;
}
@Override
public
IndexLookupBatch
createLookupBatch
(
final
TableFilter
filter
)
{
final
int
preferedSize
=
preferedBatchSize
;
return
preferedSize
==
0
?
null
:
new
IndexLookupBatch
()
{
List
<
SearchRow
>
searchRows
=
New
.
arrayList
();
@Override
public
boolean
isBatchFull
()
{
return
searchRows
.
size
()
>=
preferedSize
*
2
;
}
@Override
public
List
<
Future
<
Cursor
>>
find
()
{
List
<
Future
<
Cursor
>>
res
=
findBatched
(
filter
,
searchRows
);
searchRows
.
clear
();
return
res
;
}
@Override
public
void
addSearchRows
(
SearchRow
first
,
SearchRow
last
)
{
assert
!
isBatchFull
();
searchRows
.
add
(
first
);
searchRows
.
add
(
last
);
}
};
}
public
List
<
Future
<
Cursor
>>
findBatched
(
final
TableFilter
filter
,
List
<
SearchRow
>
firstLastPairs
)
{
ArrayList
<
Future
<
Cursor
>>
result
=
New
.
arrayList
(
firstLastPairs
.
size
());
final
Random
rnd
=
new
Random
();
for
(
int
i
=
0
;
i
<
firstLastPairs
.
size
();
i
+=
2
)
{
final
SearchRow
first
=
firstLastPairs
.
get
(
i
);
final
SearchRow
last
=
firstLastPairs
.
get
(
i
+
1
);
Future
<
Cursor
>
future
;
if
(
rnd
.
nextBoolean
())
{
IteratorCursor
c
=
(
IteratorCursor
)
find
(
filter
,
first
,
last
);
if
(
c
.
it
.
hasNext
())
{
future
=
new
DoneFuture
<
Cursor
>(
c
);
}
else
{
// we can return null instead of future of empty cursor
future
=
null
;
}
}
else
{
future
=
exec
.
submit
(
new
Callable
<
Cursor
>()
{
@Override
public
Cursor
call
()
throws
Exception
{
if
(
rnd
.
nextInt
(
50
)
==
0
)
{
Thread
.
sleep
(
0
,
500
);
}
return
find
(
filter
,
first
,
last
);
}
});
}
result
.
add
(
future
);
}
return
result
;
}
@Override
public
void
close
(
Session
session
)
{
// No-op.
...
...
@@ -911,10 +1166,10 @@ public class TestTableEngines extends TestBase {
set
.
remove
(
row
);
}
private
static
SearchRow
mark
(
SearchRow
row
)
{
private
static
SearchRow
mark
(
SearchRow
row
,
boolean
first
)
{
if
(
row
!=
null
)
{
// Mark this row to be a search row.
row
.
setKey
(
Long
.
MAX_VALUE
);
row
.
setKey
(
first
?
Long
.
MIN_VALUE
:
Long
.
MAX_VALUE
);
}
return
row
;
}
...
...
@@ -926,19 +1181,23 @@ public class TestTableEngines extends TestBase {
subSet
=
Collections
.
emptySet
();
}
else
{
if
(
first
!=
null
)
{
first
=
set
.
floor
(
mark
(
first
));
first
=
set
.
floor
(
mark
(
first
,
true
));
}
if
(
last
!=
null
)
{
last
=
set
.
ceiling
(
mark
(
last
));
last
=
set
.
ceiling
(
mark
(
last
,
false
));
}
if
(
first
==
null
&&
last
==
null
)
{
subSet
=
set
;
}
else
if
(
first
!=
null
)
{
subSet
=
set
.
tailSet
(
first
,
true
);
if
(
last
!=
null
)
{
subSet
=
set
.
subSet
(
first
,
true
,
last
,
true
);
}
else
{
subSet
=
set
.
tailSet
(
first
,
true
);
}
}
else
if
(
last
!=
null
)
{
subSet
=
set
.
headSet
(
last
,
true
);
}
else
{
subSet
=
set
.
subSet
(
first
,
true
,
last
,
true
);
throw
new
IllegalStateException
(
);
}
}
return
new
IteratorCursor
(
subSet
.
iterator
());
...
...
@@ -1031,6 +1290,11 @@ public class TestTableEngines extends TestBase {
public
Row
get
()
{
return
current
;
}
@Override
public
String
toString
()
{
return
"IterCursor->"
+
current
;
}
}
/**
...
...
编写
预览
Markdown
格式
0%
重试
或
添加新文件
添加附件
取消
您添加了
0
人
到此讨论。请谨慎行事。
请先完成此评论的编辑!
取消
请
注册
或者
登录
后发表评论