Skip to content
项目
群组
代码片段
帮助
正在加载...
帮助
为 GitLab 提交贡献
登录/注册
切换导航
H
h2database
项目
项目
详情
活动
周期分析
仓库
仓库
文件
提交
分支
标签
贡献者
分枝图
比较
统计图
议题
0
议题
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
CI / CD
CI / CD
流水线
作业
计划
统计图
Wiki
Wiki
代码片段
代码片段
成员
成员
折叠边栏
关闭边栏
活动
分枝图
统计图
创建新议题
作业
提交
议题看板
打开侧边栏
Administrator
h2database
Commits
8a96db5a
提交
8a96db5a
authored
9 年前
作者:
S.Vladykin
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
Batching support for index lookups and batched joins.
上级
b2a70c1c
显示空白字符变更
内嵌
并排
正在显示
9 个修改的文件
包含
920 行增加
和
32 行删除
+920
-32
CommandContainer.java
h2/src/main/org/h2/command/CommandContainer.java
+1
-1
BaseIndex.java
h2/src/main/org/h2/index/BaseIndex.java
+14
-0
Index.java
h2/src/main/org/h2/index/Index.java
+28
-1
IndexCursor.java
h2/src/main/org/h2/index/IndexCursor.java
+33
-5
MultiVersionIndex.java
h2/src/main/org/h2/index/MultiVersionIndex.java
+13
-1
Table.java
h2/src/main/org/h2/table/Table.java
+10
-8
TableFilter.java
h2/src/main/org/h2/table/TableFilter.java
+511
-4
DoneFuture.java
h2/src/main/org/h2/util/DoneFuture.java
+55
-0
TestTableEngines.java
h2/src/test/org/h2/test/db/TestTableEngines.java
+255
-12
没有找到文件。
h2/src/main/org/h2/command/CommandContainer.java
浏览文件 @
8a96db5a
...
...
@@ -17,7 +17,7 @@ import org.h2.value.ValueNull;
* Represents a single SQL statements.
* It wraps a prepared statement.
*/
class
CommandContainer
extends
Command
{
public
class
CommandContainer
extends
Command
{
private
Prepared
prepared
;
private
boolean
readOnlyKnown
;
...
...
This diff is collapsed.
Click to expand it.
h2/src/main/org/h2/index/BaseIndex.java
浏览文件 @
8a96db5a
...
...
@@ -5,6 +5,10 @@
*/
package
org
.
h2
.
index
;
import
java.util.Collection
;
import
java.util.Iterator
;
import
java.util.List
;
import
java.util.concurrent.Future
;
import
org.h2.api.ErrorCode
;
import
org.h2.engine.Constants
;
import
org.h2.engine.DbObject
;
...
...
@@ -424,4 +428,14 @@ public abstract class BaseIndex extends SchemaObjectBase implements Index {
// ignore
}
@Override
public
int
getPreferedLookupBatchSize
()
{
// No batched lookups supported by default.
return
0
;
}
@Override
public
List
<
Future
<
Cursor
>>
findBatched
(
TableFilter
filter
,
List
<
SearchRow
>
firstLastPairs
)
{
throw
DbException
.
throwInternalError
(
"Must not be called if getPreferedLookupBatchSize() is 0."
);
}
}
This diff is collapsed.
Click to expand it.
h2/src/main/org/h2/index/Index.java
浏览文件 @
8a96db5a
...
...
@@ -5,6 +5,9 @@
*/
package
org
.
h2
.
index
;
import
java.util.Collection
;
import
java.util.List
;
import
java.util.concurrent.Future
;
import
org.h2.engine.Session
;
import
org.h2.result.Row
;
import
org.h2.result.SearchRow
;
...
...
@@ -256,4 +259,28 @@ public interface Index extends SchemaObject {
*/
void
setSortedInsertMode
(
boolean
sortedInsertMode
);
/**
* If this index can do batched lookups, it may return it's preferred batch size,
* otherwise it must return 0.
*
* @return preferred batch size or 0 if lookup batching is not supported
* @see #findBatched(TableFilter, Collection)
*/
int
getPreferedLookupBatchSize
();
/**
* Do batched lookup over the given collection of {@link SearchRow} pairs as in
* {@link #find(TableFilter, SearchRow, SearchRow)}.
* <br/><br/>
* Correct implementation must always return number of future cursors equal to
* {@code firstLastPairs.size() / 2}. Instead of {@link Future} containing empty
* {@link Cursor} it is possible to put {@code null} in result list.
*
* @param filter the table filter
* @param firstLastPairs List of batched search row pairs as in
* {@link #find(TableFilter, SearchRow, SearchRow)}, the collection will be reused by H2,
* thus it makes sense to defensively copy contents if needed.
* @return batched cursors for respective search row pairs in the same order
*/
List
<
Future
<
Cursor
>>
findBatched
(
TableFilter
filter
,
List
<
SearchRow
>
firstLastPairs
);
}
This diff is collapsed.
Click to expand it.
h2/src/main/org/h2/index/IndexCursor.java
浏览文件 @
8a96db5a
...
...
@@ -68,12 +68,12 @@ public class IndexCursor implements Cursor {
}
/**
*
Re-evaluate the start and end values of the index search for rows
.
*
Prepare this index cursor to make a lookup in index
.
*
* @param s
the session
* @param indexConditions
the index conditions
* @param s
Session.
* @param indexConditions
Index conditions.
*/
public
void
find
(
Session
s
,
ArrayList
<
IndexCondition
>
indexConditions
)
{
public
void
prepare
(
Session
s
,
ArrayList
<
IndexCondition
>
indexConditions
)
{
this
.
session
=
s
;
alwaysFalse
=
false
;
start
=
end
=
null
;
...
...
@@ -148,6 +148,16 @@ public class IndexCursor implements Cursor {
}
}
}
}
/**
* Re-evaluate the start and end values of the index search for rows.
*
* @param s the session
* @param indexConditions the index conditions
*/
public
void
find
(
Session
s
,
ArrayList
<
IndexCondition
>
indexConditions
)
{
prepare
(
s
,
indexConditions
);
if
(
inColumn
!=
null
)
{
return
;
}
...
...
@@ -252,6 +262,24 @@ public class IndexCursor implements Cursor {
return
alwaysFalse
;
}
/**
* Get start search row.
*
* @return search row
*/
public
SearchRow
getStart
()
{
return
start
;
}
/**
* Get end search row.
*
* @return search row
*/
public
SearchRow
getEnd
()
{
return
end
;
}
@Override
public
Row
get
()
{
if
(
cursor
==
null
)
{
...
...
This diff is collapsed.
Click to expand it.
h2/src/main/org/h2/index/MultiVersionIndex.java
浏览文件 @
8a96db5a
...
...
@@ -6,7 +6,10 @@
package
org
.
h2
.
index
;
import
java.util.ArrayList
;
import
java.util.Collection
;
import
java.util.Iterator
;
import
java.util.List
;
import
java.util.concurrent.Future
;
import
org.h2.api.ErrorCode
;
import
org.h2.engine.Database
;
import
org.h2.engine.DbObject
;
...
...
@@ -387,4 +390,13 @@ public class MultiVersionIndex implements Index {
delta
.
setSortedInsertMode
(
sortedInsertMode
);
}
@Override
public
int
getPreferedLookupBatchSize
()
{
return
0
;
}
@Override
public
List
<
Future
<
Cursor
>>
findBatched
(
TableFilter
filter
,
List
<
SearchRow
>
firstLastPairs
)
{
throw
DbException
.
throwInternalError
(
"Must never be called."
);
}
}
This diff is collapsed.
Click to expand it.
h2/src/main/org/h2/table/Table.java
浏览文件 @
8a96db5a
...
...
@@ -6,6 +6,7 @@
package
org
.
h2
.
table
;
import
java.util.ArrayList
;
import
java.util.Arrays
;
import
java.util.HashMap
;
import
java.util.HashSet
;
import
java.util.Set
;
...
...
@@ -104,7 +105,7 @@ public abstract class Table extends SchemaObjectBase {
private
ArrayList
<
TableView
>
views
;
private
boolean
checkForeignKeyConstraints
=
true
;
private
boolean
onCommitDrop
,
onCommitTruncate
;
private
Row
nullRow
;
private
volatile
Row
nullRow
;
public
Table
(
Schema
schema
,
int
id
,
String
name
,
boolean
persistIndexes
,
boolean
persistData
)
{
...
...
@@ -616,14 +617,15 @@ public abstract class Table extends SchemaObjectBase {
return
new
SimpleRow
(
new
Value
[
columns
.
length
]);
}
synchronized
Row
getNullRow
()
{
if
(
nullRow
==
null
)
{
nullRow
=
new
Row
(
new
Value
[
columns
.
length
],
1
);
for
(
int
i
=
0
;
i
<
columns
.
length
;
i
++)
{
nullRow
.
setValue
(
i
,
ValueNull
.
INSTANCE
);
}
Row
getNullRow
()
{
Row
row
=
nullRow
;
if
(
row
==
null
)
{
// Here can be concurrently produced more than one row, but it must be ok.
Value
[]
values
=
new
Value
[
columns
.
length
];
Arrays
.
fill
(
values
,
ValueNull
.
INSTANCE
);
nullRow
=
row
=
new
Row
(
values
,
1
);
}
return
nullR
ow
;
return
r
ow
;
}
public
Column
[]
getColumns
()
{
...
...
This diff is collapsed.
Click to expand it.
h2/src/main/org/h2/table/TableFilter.java
浏览文件 @
8a96db5a
...
...
@@ -6,6 +6,11 @@
package
org
.
h2
.
table
;
import
java.util.ArrayList
;
import
java.util.Arrays
;
import
java.util.Collections
;
import
java.util.List
;
import
java.util.ListIterator
;
import
java.util.concurrent.Future
;
import
org.h2.command.Parser
;
import
org.h2.command.dml.Select
;
import
org.h2.engine.Right
;
...
...
@@ -16,6 +21,7 @@ import org.h2.expression.Comparison;
import
org.h2.expression.ConditionAndOr
;
import
org.h2.expression.Expression
;
import
org.h2.expression.ExpressionColumn
;
import
org.h2.index.Cursor
;
import
org.h2.index.Index
;
import
org.h2.index.IndexCondition
;
import
org.h2.index.IndexCursor
;
...
...
@@ -23,6 +29,7 @@ import org.h2.message.DbException;
import
org.h2.result.Row
;
import
org.h2.result.SearchRow
;
import
org.h2.result.SortOrder
;
import
org.h2.util.DoneFuture
;
import
org.h2.util.New
;
import
org.h2.util.StatementBuilder
;
import
org.h2.util.StringUtils
;
...
...
@@ -40,6 +47,33 @@ public class TableFilter implements ColumnResolver {
private
static
final
int
BEFORE_FIRST
=
0
,
FOUND
=
1
,
AFTER_LAST
=
2
,
NULL_ROW
=
3
;
private
static
final
Cursor
EMPTY_CURSOR
=
new
Cursor
()
{
@Override
public
boolean
previous
()
{
return
false
;
}
@Override
public
boolean
next
()
{
return
false
;
}
@Override
public
SearchRow
getSearchRow
()
{
return
null
;
}
@Override
public
Row
get
()
{
return
null
;
}
@Override
public
String
toString
()
{
return
"EMPTY_CURSOR"
;
}
};
/**
* Whether this is a direct or indirect (nested) outer join
*/
...
...
@@ -54,6 +88,12 @@ public class TableFilter implements ColumnResolver {
private
int
scanCount
;
private
boolean
evaluatable
;
/**
* Batched join support.
*/
private
JoinBatch
jbatch
;
private
JoinFilter
jfilter
;
/**
* Indicates that this filter is used in the plan.
*/
...
...
@@ -291,16 +331,32 @@ public class TableFilter implements ColumnResolver {
* Start the query. This will reset the scan counts.
*
* @param s the session
* @return join batch if query runs over index which supports batched lookups, null otherwise
*/
public
void
startQuery
(
Session
s
)
{
public
JoinBatch
startQuery
(
Session
s
)
{
jbatch
=
null
;
jfilter
=
null
;
this
.
session
=
s
;
scanCount
=
0
;
if
(
nestedJoin
!=
null
)
{
nestedJoin
.
startQuery
(
s
);
}
JoinBatch
batch
=
null
;
if
(
join
!=
null
)
{
join
.
startQuery
(
s
);
batch
=
join
.
startQuery
(
s
);
}
if
(
batch
==
null
&&
index
.
getPreferedLookupBatchSize
()
!=
0
&&
select
!=
null
&&
select
.
getTopTableFilter
()
!=
this
)
{
batch
=
new
JoinBatch
(
join
);
}
if
(
batch
!=
null
)
{
if
(
nestedJoin
!=
null
)
{
throw
DbException
.
getUnsupportedException
(
"nested join with batched index"
);
}
jbatch
=
batch
;
jfilter
=
batch
.
register
(
this
);
}
return
batch
;
}
/**
...
...
@@ -323,6 +379,10 @@ public class TableFilter implements ColumnResolver {
* @return true if there are
*/
public
boolean
next
()
{
if
(
jbatch
!=
null
)
{
// will happen only on topTableFilter since jbatch.next does not call join.next()
return
jbatch
.
next
();
}
if
(
state
==
AFTER_LAST
)
{
return
false
;
}
else
if
(
state
==
BEFORE_FIRST
)
{
...
...
@@ -882,6 +942,9 @@ public class TableFilter implements ColumnResolver {
@Override
public
Value
getValue
(
Column
column
)
{
if
(
jbatch
!=
null
)
{
return
jbatch
.
getValue
(
jfilter
,
column
);
}
if
(
currentSearchRow
==
null
)
{
return
null
;
}
...
...
@@ -1018,6 +1081,10 @@ public class TableFilter implements ColumnResolver {
return
session
;
}
private
static
boolean
isRow
(
Object
x
)
{
return
x
instanceof
Row
;
}
/**
* A visitor for table filters.
*/
...
...
@@ -1031,4 +1098,444 @@ public class TableFilter implements ColumnResolver {
void
accept
(
TableFilter
f
);
}
/**
* Support for asynchronous batched index lookups on joins.
*
* @see Index#findBatched(TableFilter, java.util.Collection)
* @see Index#getPreferedLookupBatchSize()
*
* @author Sergi Vladykin
*/
private
static
final
class
JoinBatch
{
int
filtersCount
;
JoinFilter
[]
filters
;
JoinFilter
top
;
boolean
started
;
JoinRow
current
;
boolean
found
;
/**
* This filter joined after this batched join and can be used normally.
*/
final
TableFilter
furtherFilter
;
/**
* @param furtherFilter table filter after this batched join.
*/
private
JoinBatch
(
TableFilter
furtherFilter
)
{
this
.
furtherFilter
=
furtherFilter
;
}
/**
* @param filter table filter
*/
private
JoinFilter
register
(
TableFilter
filter
)
{
assert
filter
!=
null
;
filtersCount
++;
return
top
=
new
JoinFilter
(
filter
,
top
);
}
/**
* @param filterId table filter id
* @param column column
* @return column value for current row
*/
private
Value
getValue
(
JoinFilter
filter
,
Column
column
)
{
Object
x
=
current
.
row
[
filter
.
id
];
assert
x
!=
null
;
Row
row
=
isRow
(
x
)
?
(
Row
)
x
:
((
Cursor
)
x
).
get
();
Value
value
=
row
.
getValue
(
column
.
getColumnId
());
if
(
value
==
null
)
{
throw
DbException
.
throwInternalError
(
"value is null: "
+
column
+
" "
+
row
);
}
return
value
;
}
private
void
start
()
{
// fill filters
filters
=
new
JoinFilter
[
filtersCount
];
JoinFilter
jf
=
top
;
for
(
int
i
=
0
;
i
<
filtersCount
;
i
++)
{
filters
[
jf
.
id
=
i
]
=
jf
;
jf
=
jf
.
join
;
}
// initialize current row
current
=
new
JoinRow
(
new
Object
[
filtersCount
]);
current
.
row
[
top
.
id
]
=
top
.
filter
.
cursor
;
// initialize top cursor
top
.
filter
.
cursor
.
find
(
top
.
filter
.
session
,
top
.
filter
.
indexConditions
);
// we need fake first row because batchedNext always will move to the next row
JoinRow
fake
=
new
JoinRow
(
null
);
fake
.
next
=
current
;
current
=
fake
;
}
private
boolean
next
()
{
if
(!
started
)
{
start
();
started
=
true
;
}
if
(
furtherFilter
==
null
)
{
if
(
batchedNext
())
{
assert
current
.
isComplete
();
return
true
;
}
return
false
;
}
for
(;;)
{
if
(!
found
)
{
if
(!
batchedNext
())
{
return
false
;
}
assert
current
.
isComplete
();
found
=
true
;
furtherFilter
.
reset
();
}
// we call furtherFilter in usual way outside of this batch because it is more effective
if
(
furtherFilter
.
next
())
{
return
true
;
}
found
=
false
;
}
}
private
Cursor
get
(
Future
<
Cursor
>
f
)
{
try
{
return
f
.
get
();
}
catch
(
Exception
e
)
{
throw
DbException
.
convert
(
e
);
}
}
private
boolean
batchedNext
()
{
if
(
current
==
null
)
{
// after last
return
false
;
}
// go next
current
=
current
.
next
;
if
(
current
==
null
)
{
return
false
;
}
current
.
prev
=
null
;
final
int
lastJfId
=
filtersCount
-
1
;
int
jfId
=
lastJfId
;
while
(
current
.
row
[
jfId
]
==
null
)
{
// lookup for the first non fetched filter for the current row
jfId
--;
}
for
(;;)
{
fetchCurrent
(
jfId
);
if
(!
current
.
isDropped
())
{
// if current was not dropped then it must be fetched successfully
if
(
jfId
==
lastJfId
)
{
// the whole join row is ready to be returned
return
true
;
}
JoinFilter
join
=
filters
[
jfId
+
1
];
if
(
join
.
isBatchFull
())
{
// get future cursors for join and go right to fetch them
current
=
join
.
find
(
current
);
}
if
(
current
.
row
[
join
.
id
]
!=
null
)
{
// either find called or outer join with null row
jfId
=
join
.
id
;
continue
;
}
}
// we have to go down and fetch next cursors for jfId if it is possible
if
(
current
.
next
==
null
)
{
// either dropped or null-row
if
(
current
.
isDropped
())
{
current
=
current
.
prev
;
if
(
current
==
null
)
{
return
false
;
}
}
assert
!
current
.
isDropped
();
assert
jfId
!=
lastJfId
;
jfId
=
0
;
while
(
current
.
row
[
jfId
]
!=
null
)
{
jfId
++;
}
// force find on half filled batch (there must be either searchRows
// or Cursor.EMPTY set for null-rows)
current
=
filters
[
jfId
].
find
(
current
);
}
else
{
// here we don't care if the current was dropped
current
=
current
.
next
;
assert
!
isRow
(
current
.
row
[
jfId
]);
while
(
current
.
row
[
jfId
]
==
null
)
{
assert
jfId
!=
top
.
id
;
// need to go left and fetch more search rows
jfId
--;
assert
!
isRow
(
current
.
row
[
jfId
]);
}
}
}
}
@SuppressWarnings
(
"unchecked"
)
private
void
fetchCurrent
(
final
int
jfId
)
{
assert
current
.
prev
==
null
||
isRow
(
current
.
prev
.
row
[
jfId
])
:
"prev must be already fetched"
;
assert
jfId
==
0
||
isRow
(
current
.
row
[
jfId
-
1
])
:
"left must be already fetched"
;
Object
x
=
current
.
row
[
jfId
];
assert
x
!=
null
:
"x null"
;
assert
!
isRow
(
x
)
:
"double fetching"
;
final
JoinFilter
jf
=
filters
[
jfId
];
// in case of outer join we don't have any future around empty cursor
boolean
newCursor
=
x
==
EMPTY_CURSOR
;
if
(!
newCursor
&&
x
instanceof
Future
)
{
// get cursor from a future
current
.
row
[
jfId
]
=
x
=
get
((
Future
<
Cursor
>)
x
);
newCursor
=
true
;
}
Cursor
c
=
(
Cursor
)
x
;
assert
c
!=
null
;
JoinFilter
join
=
jf
.
join
;
for
(;;)
{
if
(
c
==
null
||
!
c
.
next
())
{
if
(
newCursor
&&
jf
.
isOuterJoin
())
{
// replace cursor with null-row
current
.
row
[
jfId
]
=
jf
.
getNullRow
();
c
=
null
;
newCursor
=
false
;
}
else
{
// cursor is done, drop it
current
.
drop
();
return
;
}
}
if
(!
jf
.
isOk
(
c
==
null
))
{
// try another row from the cursor
continue
;
}
boolean
joinEmpty
=
false
;
if
(
join
!=
null
&&
!
join
.
collectSearchRows
())
{
if
(
join
.
isOuterJoin
())
{
joinEmpty
=
true
;
}
else
{
// join will fail, try next row in the cursor
continue
;
}
}
if
(
c
!=
null
)
{
current
=
current
.
copyBehind
(
jf
.
id
);
// get current row from cursor
current
.
row
[
jfId
]
=
c
.
get
();
}
if
(
joinEmpty
)
{
current
.
row
[
join
.
id
]
=
EMPTY_CURSOR
;
}
return
;
}
}
@Override
public
String
toString
()
{
return
"JoinBatch->\nprev->"
+
(
current
==
null
?
null
:
current
.
prev
)
+
"\ncurr->"
+
current
+
"\nnext->"
+
(
current
==
null
?
null
:
current
.
next
);
}
}
/**
* Table filter participating in batched join.
*/
private
static
final
class
JoinFilter
{
final
TableFilter
filter
;
final
int
batchSize
;
final
JoinFilter
join
;
int
id
;
/**
* Search rows batch.
*/
final
ArrayList
<
SearchRow
>
searchRows
;
private
JoinFilter
(
TableFilter
filter
,
JoinFilter
join
)
{
this
.
filter
=
filter
;
this
.
join
=
join
;
batchSize
=
filter
.
getIndex
().
getPreferedLookupBatchSize
();
if
(
batchSize
<
0
)
{
throw
DbException
.
throwInternalError
(
"Index with negative preferred batch size."
);
}
searchRows
=
New
.
arrayList
(
batchSize
==
0
?
2
:
Math
.
min
(
batchSize
*
2
,
32
));
}
public
Row
getNullRow
()
{
return
filter
.
table
.
getNullRow
();
}
private
boolean
isOuterJoin
()
{
return
filter
.
joinOuter
;
}
private
boolean
isBatchFull
()
{
if
(
batchSize
==
0
)
{
return
searchRows
.
size
()
>=
2
;
}
return
searchRows
.
size
()
>=
batchSize
*
2
;
}
private
boolean
isOk
(
boolean
ignireJoinCondition
)
{
boolean
filterOk
=
filter
.
isOk
(
filter
.
filterCondition
);
boolean
joinOk
=
filter
.
isOk
(
filter
.
joinCondition
);
return
filterOk
&&
(
ignireJoinCondition
||
joinOk
);
}
private
boolean
collectSearchRows
()
{
assert
!
isBatchFull
();
filter
.
cursor
.
prepare
(
filter
.
session
,
filter
.
indexConditions
);
if
(
filter
.
cursor
.
isAlwaysFalse
())
{
return
false
;
}
searchRows
.
add
(
filter
.
cursor
.
getStart
());
searchRows
.
add
(
filter
.
cursor
.
getEnd
());
return
true
;
}
private
JoinRow
find
(
JoinRow
current
)
{
assert
current
!=
null
;
assert
(
searchRows
.
size
()
&
1
)
==
0
:
"searchRows & 1, "
+
searchRows
.
size
();
// searchRows are allowed to be empty when we have some null-rows and forced find call
if
(!
searchRows
.
isEmpty
())
{
assert
searchRows
.
size
()
>=
2
:
"searchRows >= 2, "
+
searchRows
.
size
();
List
<
Future
<
Cursor
>>
result
;
if
(
batchSize
==
0
)
{
assert
searchRows
.
size
()
==
2
:
"2"
;
Cursor
c
=
filter
.
index
.
find
(
filter
,
searchRows
.
get
(
0
),
searchRows
.
get
(
1
));
result
=
Collections
.<
Future
<
Cursor
>>
singletonList
(
new
DoneFuture
<
Cursor
>(
c
));
}
else
{
result
=
filter
.
index
.
findBatched
(
filter
,
searchRows
);
if
(
result
==
null
)
{
throw
DbException
.
throwInternalError
(
"Index.findBatched returned null"
);
}
}
if
(
result
.
size
()
!=
searchRows
.
size
()
>>>
1
)
{
throw
DbException
.
throwInternalError
(
"wrong number of futures"
);
}
searchRows
.
clear
();
// go backwards and assign futures
ListIterator
<
Future
<
Cursor
>>
iter
=
result
.
listIterator
(
result
.
size
());
for
(;;)
{
assert
isRow
(
current
.
row
[
id
-
1
]);
if
(
current
.
row
[
id
]
==
EMPTY_CURSOR
)
{
// outer join support - skip row with existing empty cursor
current
=
current
.
prev
;
continue
;
}
assert
current
.
row
[
id
]
==
null
;
Future
<
Cursor
>
future
=
iter
.
previous
();
current
.
row
[
id
]
=
future
==
null
?
EMPTY_CURSOR
:
future
;
if
(
current
.
prev
==
null
||
!
iter
.
hasPrevious
())
{
break
;
}
current
=
current
.
prev
;
}
}
// handle empty cursors (because of outer joins) at the beginning
while
(
current
.
prev
!=
null
&&
current
.
prev
.
row
[
id
]
==
EMPTY_CURSOR
)
{
current
=
current
.
prev
;
}
assert
current
.
prev
==
null
||
isRow
(
current
.
prev
.
row
[
id
]);
assert
current
.
row
[
id
]
!=
null
;
assert
!
isRow
(
current
.
row
[
id
]);
// the last updated row
return
current
;
}
@Override
public
String
toString
()
{
return
"JoinFilter->"
+
filter
;
}
}
/**
* Linked row in batched join.
*/
private
static
final
class
JoinRow
{
/**
* May contain one of the following:
* <br/>- {@code null}: means that we need to get future cursor for this row
* <br/>- {@link Future}: means that we need to get a new {@link Cursor} from the {@link Future}
* <br/>- {@link Cursor}: means that we need to fetch {@link Row}s from the {@link Cursor}
* <br/>- {@link Row}: the {@link Row} is already fetched and is ready to be used
*/
Object
[]
row
;
JoinRow
prev
;
JoinRow
next
;
/**
* @param row Row.
*/
private
JoinRow
(
Object
[]
row
)
{
this
.
row
=
row
;
}
private
boolean
isComplete
()
{
return
isRow
(
row
[
row
.
length
-
1
]);
}
private
boolean
isDropped
()
{
return
row
==
null
;
}
private
void
drop
()
{
if
(
prev
!=
null
)
{
prev
.
next
=
next
;
}
if
(
next
!=
null
)
{
next
.
prev
=
prev
;
}
row
=
null
;
}
private
JoinRow
copyBehind
(
int
jfId
)
{
assert
row
[
jfId
]
instanceof
Cursor
;
assert
jfId
+
1
==
row
.
length
||
row
[
jfId
+
1
]
==
null
;
Object
[]
r
=
new
Object
[
row
.
length
];
if
(
jfId
!=
0
)
{
System
.
arraycopy
(
row
,
0
,
r
,
0
,
jfId
);
}
JoinRow
copy
=
new
JoinRow
(
r
);
if
(
prev
!=
null
)
{
copy
.
prev
=
prev
;
prev
.
next
=
copy
;
}
prev
=
copy
;
copy
.
next
=
this
;
return
copy
;
}
@Override
public
String
toString
()
{
return
"JoinRow->"
+
Arrays
.
toString
(
row
);
}
}
}
This diff is collapsed.
Click to expand it.
h2/src/main/org/h2/util/DoneFuture.java
0 → 100644
浏览文件 @
8a96db5a
/*
* Copyright 2004-2014 H2 Group. Multiple-Licensed under the MPL 2.0,
* and the EPL 1.0 (http://h2database.com/html/license.html).
* Initial Developer: H2 Group
*/
package
org
.
h2
.
util
;
import
java.util.concurrent.ExecutionException
;
import
java.util.concurrent.Future
;
import
java.util.concurrent.TimeUnit
;
import
java.util.concurrent.TimeoutException
;
/**
* Future which is already done.
*
* @param <T> Result value.
* @author Sergi Vladykin
*/
public
class
DoneFuture
<
T
>
implements
Future
<
T
>
{
final
T
x
;
public
DoneFuture
(
T
x
)
{
this
.
x
=
x
;
}
@Override
public
T
get
()
throws
InterruptedException
,
ExecutionException
{
return
x
;
}
@Override
public
T
get
(
long
timeout
,
TimeUnit
unit
)
throws
InterruptedException
,
ExecutionException
,
TimeoutException
{
return
x
;
}
@Override
public
boolean
isDone
()
{
return
true
;
}
@Override
public
boolean
cancel
(
boolean
mayInterruptIfRunning
)
{
return
false
;
}
@Override
public
boolean
isCancelled
()
{
return
false
;
}
@Override
public
String
toString
()
{
return
"DoneFuture->"
+
x
;
}
}
This diff is collapsed.
Click to expand it.
h2/src/test/org/h2/test/db/TestTableEngines.java
浏览文件 @
8a96db5a
...
...
@@ -16,10 +16,17 @@ import java.util.Collections;
import
java.util.Comparator
;
import
java.util.Iterator
;
import
java.util.List
;
import
java.util.Random
;
import
java.util.Set
;
import
java.util.TreeSet
;
import
java.util.concurrent.Callable
;
import
java.util.concurrent.ExecutorService
;
import
java.util.concurrent.Executors
;
import
java.util.concurrent.Future
;
import
java.util.concurrent.ThreadFactory
;
import
org.h2.api.TableEngine
;
import
org.h2.command.ddl.CreateTableData
;
import
org.h2.command.dml.OptimizerHints
;
import
org.h2.engine.Constants
;
import
org.h2.engine.Session
;
import
org.h2.expression.Expression
;
...
...
@@ -37,6 +44,7 @@ import org.h2.table.Table;
import
org.h2.table.TableBase
;
import
org.h2.table.TableFilter
;
import
org.h2.test.TestBase
;
import
org.h2.util.DoneFuture
;
import
org.h2.util.New
;
import
org.h2.value.Value
;
import
org.h2.value.ValueInt
;
...
...
@@ -65,6 +73,7 @@ public class TestTableEngines extends TestBase {
testEngineParams
();
testSimpleQuery
();
testMultiColumnTreeSetIndex
();
testBatchedJoin
();
}
private
void
testEarlyFilter
()
throws
SQLException
{
...
...
@@ -333,6 +342,180 @@ public class TestTableEngines extends TestBase {
deleteDb
(
"tableEngine"
);
}
private
void
testBatchedJoin
()
throws
SQLException
{
deleteDb
(
"tableEngine"
);
Connection
conn
=
getConnection
(
"tableEngine;OPTIMIZE_REUSE_RESULTS=0"
);
Statement
stat
=
conn
.
createStatement
();
TreeSetIndex
.
exec
=
Executors
.
newFixedThreadPool
(
8
,
new
ThreadFactory
()
{
@Override
public
Thread
newThread
(
Runnable
r
)
{
Thread
t
=
new
Thread
(
r
);
t
.
setDaemon
(
true
);
return
t
;
}
});
enableJoinReordering
(
false
);
try
{
doTestBatchedJoin
(
stat
,
1
,
0
,
0
);
doTestBatchedJoin
(
stat
,
0
,
1
,
0
);
doTestBatchedJoin
(
stat
,
0
,
0
,
1
);
doTestBatchedJoin
(
stat
,
0
,
2
,
0
);
doTestBatchedJoin
(
stat
,
0
,
0
,
2
);
doTestBatchedJoin
(
stat
,
0
,
0
,
3
);
doTestBatchedJoin
(
stat
,
0
,
0
,
4
);
doTestBatchedJoin
(
stat
,
0
,
0
,
5
);
doTestBatchedJoin
(
stat
,
0
,
3
,
1
);
doTestBatchedJoin
(
stat
,
0
,
3
,
3
);
doTestBatchedJoin
(
stat
,
0
,
3
,
7
);
doTestBatchedJoin
(
stat
,
0
,
4
,
1
);
doTestBatchedJoin
(
stat
,
0
,
4
,
6
);
doTestBatchedJoin
(
stat
,
0
,
4
,
20
);
doTestBatchedJoin
(
stat
,
0
,
10
,
0
);
doTestBatchedJoin
(
stat
,
0
,
0
,
10
);
doTestBatchedJoin
(
stat
,
0
,
20
,
0
);
doTestBatchedJoin
(
stat
,
0
,
0
,
20
);
doTestBatchedJoin
(
stat
,
0
,
20
,
20
);
doTestBatchedJoin
(
stat
,
3
,
7
,
0
);
doTestBatchedJoin
(
stat
,
0
,
0
,
5
);
doTestBatchedJoin
(
stat
,
0
,
8
,
1
);
doTestBatchedJoin
(
stat
,
0
,
2
,
1
);
}
finally
{
enableJoinReordering
(
true
);
TreeSetIndex
.
exec
.
shutdownNow
();
}
deleteDb
(
"tableEngine"
);
}
/**
* @param enable Enabled.
*/
private
void
enableJoinReordering
(
boolean
enable
)
{
OptimizerHints
hints
=
null
;
if
(!
enable
)
{
hints
=
new
OptimizerHints
();
hints
.
setJoinReorderEnabled
(
false
);
}
OptimizerHints
.
set
(
hints
);
}
private
void
doTestBatchedJoin
(
Statement
stat
,
int
...
batchSizes
)
throws
SQLException
{
ArrayList
<
TreeSetTable
>
tables
=
New
.
arrayList
(
batchSizes
.
length
);
for
(
int
i
=
0
;
i
<
batchSizes
.
length
;
i
++)
{
stat
.
executeUpdate
(
"DROP TABLE IF EXISTS T"
+
i
);
stat
.
executeUpdate
(
"CREATE TABLE T"
+
i
+
"(A INT, B INT) ENGINE \""
+
TreeSetIndexTableEngine
.
class
.
getName
()
+
"\""
);
tables
.
add
(
TreeSetIndexTableEngine
.
created
);
stat
.
executeUpdate
(
"CREATE INDEX IDX_B ON T"
+
i
+
"(B)"
);
stat
.
executeUpdate
(
"CREATE INDEX IDX_A ON T"
+
i
+
"(A)"
);
PreparedStatement
insert
=
stat
.
getConnection
().
prepareStatement
(
"INSERT INTO T"
+
i
+
" VALUES (?,?)"
);
for
(
int
j
=
i
,
size
=
i
+
10
;
j
<
size
;
j
++)
{
insert
.
setInt
(
1
,
j
);
insert
.
setInt
(
2
,
j
);
insert
.
executeUpdate
();
}
for
(
TreeSetTable
table
:
tables
)
{
assertEquals
(
10
,
table
.
getRowCount
(
null
));
}
}
int
[]
zeroBatchSizes
=
new
int
[
batchSizes
.
length
];
int
tests
=
1
<<
(
batchSizes
.
length
*
4
);
for
(
int
test
=
0
;
test
<
tests
;
test
++)
{
String
query
=
generateQuery
(
test
,
batchSizes
.
length
);
// System.out.println(Arrays.toString(batchSizes) + ": " + test + " -> " + query);
setBatchSize
(
tables
,
batchSizes
);
List
<
List
<
Object
>>
res1
=
query
(
stat
,
query
);
setBatchSize
(
tables
,
zeroBatchSizes
);
List
<
List
<
Object
>>
res2
=
query
(
stat
,
query
);
// System.out.println(res1 + " " + res2);
if
(!
res2
.
equals
(
res1
))
{
System
.
err
.
println
(
Arrays
.
toString
(
batchSizes
)
+
": "
+
res1
+
" "
+
res2
);
System
.
err
.
println
(
"Test "
+
test
);
System
.
err
.
println
(
query
);
for
(
TreeSetTable
table
:
tables
)
{
System
.
err
.
println
(
table
.
getName
()
+
" = "
+
query
(
stat
,
"select * from "
+
table
.
getName
()));
}
fail
();
}
}
}
private
static
void
setBatchSize
(
ArrayList
<
TreeSetTable
>
tables
,
int
...
batchSizes
)
{
for
(
int
i
=
0
;
i
<
batchSizes
.
length
;
i
++)
{
int
batchSize
=
batchSizes
[
i
];
for
(
Index
idx
:
tables
.
get
(
i
).
getIndexes
())
{
((
TreeSetIndex
)
idx
).
preferedBatchSize
=
batchSize
;
}
}
}
private
String
generateQuery
(
int
t
,
int
tables
)
{
final
int
withLeft
=
1
;
final
int
withFalse
=
2
;
final
int
withWhere
=
4
;
final
int
withOnIsNull
=
8
;
StringBuilder
b
=
new
StringBuilder
();
b
.
append
(
"select count(*) from "
);
StringBuilder
where
=
new
StringBuilder
();
for
(
int
i
=
0
;
i
<
tables
;
i
++)
{
if
(
i
!=
0
)
{
if
((
t
&
withLeft
)
!=
0
)
{
b
.
append
(
" left "
);
}
b
.
append
(
" join "
);
}
b
.
append
(
"\nT"
).
append
(
i
).
append
(
' '
);
if
(
i
!=
0
)
{
boolean
even
=
(
i
&
1
)
==
0
;
if
((
t
&
withOnIsNull
)
!=
0
)
{
b
.
append
(
" on T"
).
append
(
i
-
1
).
append
(
even
?
".B"
:
".A"
).
append
(
" is null"
);
}
else
if
((
t
&
withFalse
)
!=
0
)
{
b
.
append
(
" on false "
);
}
else
{
b
.
append
(
" on T"
).
append
(
i
-
1
).
append
(
even
?
".B = "
:
".A = "
);
b
.
append
(
"T"
).
append
(
i
).
append
(
even
?
".B "
:
".A "
);
}
}
if
((
t
&
withWhere
)
!=
0
)
{
if
(
where
.
length
()
!=
0
)
{
where
.
append
(
" and "
);
}
where
.
append
(
" T"
).
append
(
i
).
append
(
".A > 5"
);
}
t
>>>=
4
;
}
if
(
where
.
length
()
!=
0
)
{
b
.
append
(
"\nwhere "
).
append
(
where
);
}
return
b
.
toString
();
}
private
void
checkResultsNoOrder
(
Statement
stat
,
int
size
,
String
query1
,
String
query2
)
throws
SQLException
{
List
<
List
<
Object
>>
res1
=
query
(
stat
,
query1
);
...
...
@@ -710,9 +893,12 @@ public class TestTableEngines extends TestBase {
* A table engine that internally uses a tree set.
*/
public
static
class
TreeSetIndexTableEngine
implements
TableEngine
{
static
TreeSetTable
created
;
@Override
public
Table
createTable
(
CreateTableData
data
)
{
return
new
TreeSetTable
(
data
);
return
created
=
new
TreeSetTable
(
data
);
}
}
...
...
@@ -881,7 +1067,14 @@ public class TestTableEngines extends TestBase {
* An index that internally uses a tree set.
*/
private
static
class
TreeSetIndex
extends
BaseIndex
implements
Comparator
<
SearchRow
>
{
final
TreeSet
<
SearchRow
>
set
=
new
TreeSet
<
SearchRow
>(
this
);
/**
* Executor service to test batched joins.
*/
private
static
ExecutorService
exec
;
private
final
TreeSet
<
SearchRow
>
set
=
new
TreeSet
<
SearchRow
>(
this
);
private
int
preferedBatchSize
;
TreeSetIndex
(
Table
t
,
String
name
,
IndexColumn
[]
cols
,
IndexType
type
)
{
initBaseIndex
(
t
,
0
,
name
,
cols
,
type
);
...
...
@@ -890,12 +1083,53 @@ public class TestTableEngines extends TestBase {
@Override
public
int
compare
(
SearchRow
o1
,
SearchRow
o2
)
{
int
res
=
compareRows
(
o1
,
o2
);
if
(
res
==
0
&&
(
o1
.
getKey
()
==
Long
.
MAX_VALUE
||
o2
.
getKey
()
==
Long
.
MAX_VALUE
))
{
if
(
res
==
0
)
{
if
(
o1
.
getKey
()
==
Long
.
MAX_VALUE
||
o2
.
getKey
()
==
Long
.
MIN_VALUE
)
{
res
=
1
;
}
else
if
(
o1
.
getKey
()
==
Long
.
MIN_VALUE
||
o2
.
getKey
()
==
Long
.
MAX_VALUE
)
{
res
=
-
1
;
}
}
return
res
;
}
@Override
public
int
getPreferedLookupBatchSize
()
{
return
preferedBatchSize
;
}
@Override
public
List
<
Future
<
Cursor
>>
findBatched
(
final
TableFilter
filter
,
List
<
SearchRow
>
firstLastPairs
)
{
ArrayList
<
Future
<
Cursor
>>
result
=
New
.
arrayList
(
firstLastPairs
.
size
());
final
Random
rnd
=
new
Random
();
for
(
int
i
=
0
;
i
<
firstLastPairs
.
size
();
i
+=
2
)
{
final
SearchRow
first
=
firstLastPairs
.
get
(
i
);
final
SearchRow
last
=
firstLastPairs
.
get
(
i
+
1
);
Future
<
Cursor
>
future
;
if
(
rnd
.
nextBoolean
())
{
IteratorCursor
c
=
(
IteratorCursor
)
find
(
filter
,
first
,
last
);
if
(
c
.
it
.
hasNext
())
{
future
=
new
DoneFuture
<
Cursor
>(
c
);
}
else
{
// we can return null instead of future of empty cursor
future
=
null
;
}
}
else
{
future
=
exec
.
submit
(
new
Callable
<
Cursor
>()
{
@Override
public
Cursor
call
()
throws
Exception
{
if
(
rnd
.
nextInt
(
50
)
==
0
)
{
Thread
.
sleep
(
0
,
500
);
}
return
find
(
filter
,
first
,
last
);
}
});
}
result
.
add
(
future
);
}
return
result
;
}
@Override
public
void
close
(
Session
session
)
{
// No-op.
...
...
@@ -911,10 +1145,10 @@ public class TestTableEngines extends TestBase {
set
.
remove
(
row
);
}
private
static
SearchRow
mark
(
SearchRow
row
)
{
private
static
SearchRow
mark
(
SearchRow
row
,
boolean
first
)
{
if
(
row
!=
null
)
{
// Mark this row to be a search row.
row
.
setKey
(
Long
.
MAX_VALUE
);
row
.
setKey
(
first
?
Long
.
MIN_VALUE
:
Long
.
MAX_VALUE
);
}
return
row
;
}
...
...
@@ -926,19 +1160,23 @@ public class TestTableEngines extends TestBase {
subSet
=
Collections
.
emptySet
();
}
else
{
if
(
first
!=
null
)
{
first
=
set
.
floor
(
mark
(
first
));
first
=
set
.
floor
(
mark
(
first
,
true
));
}
if
(
last
!=
null
)
{
last
=
set
.
ceiling
(
mark
(
last
));
last
=
set
.
ceiling
(
mark
(
last
,
false
));
}
if
(
first
==
null
&&
last
==
null
)
{
subSet
=
set
;
}
else
if
(
first
!=
null
)
{
if
(
last
!=
null
)
{
subSet
=
set
.
subSet
(
first
,
true
,
last
,
true
);
}
else
{
subSet
=
set
.
tailSet
(
first
,
true
);
}
}
else
if
(
last
!=
null
)
{
subSet
=
set
.
headSet
(
last
,
true
);
}
else
{
subSet
=
set
.
subSet
(
first
,
true
,
last
,
true
);
throw
new
IllegalStateException
(
);
}
}
return
new
IteratorCursor
(
subSet
.
iterator
());
...
...
@@ -1031,6 +1269,11 @@ public class TestTableEngines extends TestBase {
public
Row
get
()
{
return
current
;
}
@Override
public
String
toString
()
{
return
"IterCursor->"
+
current
;
}
}
/**
...
...
This diff is collapsed.
Click to expand it.
编写
预览
Markdown
格式
0%
重试
或
添加新文件
添加附件
取消
您添加了
0
人
到此讨论。请谨慎行事。
请先完成此评论的编辑!
取消
请
注册
或者
登录
后发表评论