Skip to content
项目
群组
代码片段
帮助
正在加载...
帮助
为 GitLab 提交贡献
登录/注册
切换导航
H
h2database
项目
项目
详情
活动
周期分析
仓库
仓库
文件
提交
分支
标签
贡献者
分枝图
比较
统计图
议题
0
议题
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
CI / CD
CI / CD
流水线
作业
计划
统计图
Wiki
Wiki
代码片段
代码片段
成员
成员
折叠边栏
关闭边栏
活动
分枝图
统计图
创建新议题
作业
提交
议题看板
打开侧边栏
Administrator
h2database
Commits
5ac45917
提交
5ac45917
authored
5月 26, 2018
作者:
andrei
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
Use DecisionMakers for transactional update/lock/commit/rollback
上级
dccf853b
隐藏空白字符变更
内嵌
并排
正在显示
20 个修改的文件
包含
758 行增加
和
306 行删除
+758
-306
Command.java
h2/src/main/org/h2/command/Command.java
+98
-99
Database.java
h2/src/main/org/h2/engine/Database.java
+6
-0
Session.java
h2/src/main/org/h2/engine/Session.java
+26
-8
DataUtils.java
h2/src/main/org/h2/mvstore/DataUtils.java
+5
-0
MVMap.java
h2/src/main/org/h2/mvstore/MVMap.java
+8
-1
MVPrimaryIndex.java
h2/src/main/org/h2/mvstore/db/MVPrimaryIndex.java
+21
-3
MVTable.java
h2/src/main/org/h2/mvstore/db/MVTable.java
+22
-19
MVTableEngine.java
h2/src/main/org/h2/mvstore/db/MVTableEngine.java
+1
-1
MVRTreeMap.java
h2/src/main/org/h2/mvstore/rtree/MVRTreeMap.java
+1
-0
CommitDecisionMaker.java
h2/src/main/org/h2/mvstore/tx/CommitDecisionMaker.java
+69
-0
RollbackDecisionMaker.java
h2/src/main/org/h2/mvstore/tx/RollbackDecisionMaker.java
+70
-0
Transaction.java
h2/src/main/org/h2/mvstore/tx/Transaction.java
+113
-3
TransactionMap.java
h2/src/main/org/h2/mvstore/tx/TransactionMap.java
+71
-53
TransactionStore.java
h2/src/main/org/h2/mvstore/tx/TransactionStore.java
+36
-54
TxDecisionMaker.java
h2/src/main/org/h2/mvstore/tx/TxDecisionMaker.java
+180
-0
VersionedValue.java
h2/src/main/org/h2/mvstore/tx/VersionedValue.java
+2
-0
Table.java
h2/src/main/org/h2/table/Table.java
+1
-1
TestBase.java
h2/src/test/org/h2/test/TestBase.java
+1
-1
TestMvcc4.java
h2/src/test/org/h2/test/mvcc/TestMvcc4.java
+11
-37
TestMvccMultiThreaded.java
h2/src/test/org/h2/test/mvcc/TestMvccMultiThreaded.java
+16
-26
没有找到文件。
h2/src/main/org/h2/command/Command.java
浏览文件 @
5ac45917
...
...
@@ -7,6 +7,8 @@ package org.h2.command;
import
java.sql.SQLException
;
import
java.util.ArrayList
;
import
java.util.concurrent.TimeUnit
;
import
org.h2.api.ErrorCode
;
import
org.h2.engine.Constants
;
import
org.h2.engine.Database
;
...
...
@@ -184,7 +186,7 @@ public abstract class Command implements CommandInterface {
startTimeNanos
=
0
;
long
start
=
0
;
Database
database
=
session
.
getDatabase
();
Object
sync
=
database
.
isMultiThreaded
()
?
(
Object
)
session
:
(
Object
)
database
;
Object
sync
=
database
.
getMvStore
()
!=
null
?
null
:
database
.
isMultiThreaded
()
?
session
:
database
;
session
.
waitIfExclusiveModeEnabled
();
boolean
callStop
=
true
;
boolean
writing
=
!
isReadOnly
();
...
...
@@ -193,47 +195,54 @@ public abstract class Command implements CommandInterface {
// wait
}
}
synchronized
(
sync
)
{
session
.
setCurrentCommand
(
this
,
false
);
try
{
while
(
true
)
{
database
.
checkPowerOff
();
try
{
ResultInterface
result
=
query
(
maxrows
);
callStop
=
!
result
.
isLazy
();
return
result
;
}
catch
(
DbException
e
)
{
start
=
filterConcurrentUpdate
(
e
,
start
);
}
catch
(
OutOfMemoryError
e
)
{
callStop
=
false
;
// there is a serious problem:
// the transaction may be applied partially
// in this case we need to panic:
// close the database
database
.
shutdownImmediately
();
throw
DbException
.
convert
(
e
);
}
catch
(
Throwable
e
)
{
throw
DbException
.
convert
(
e
);
session
.
startStatementWithinTransaction
();
session
.
setCurrentCommand
(
this
,
false
);
try
{
while
(
true
)
{
database
.
checkPowerOff
();
try
{
ResultInterface
result
;
if
(
sync
!=
null
)
{
//noinspection SynchronizationOnLocalVariableOrMethodParameter
synchronized
(
sync
)
{
result
=
query
(
maxrows
);
}
}
else
{
result
=
query
(
maxrows
);
}
}
}
catch
(
DbException
e
)
{
e
=
e
.
addSQL
(
sql
);
SQLException
s
=
e
.
getSQLException
();
database
.
exceptionThrown
(
s
,
sql
);
if
(
s
.
getErrorCode
()
==
ErrorCode
.
OUT_OF_MEMORY
)
{
callStop
=
!
result
.
isLazy
();
return
result
;
}
catch
(
DbException
e
)
{
start
=
filterConcurrentUpdate
(
e
,
start
);
}
catch
(
OutOfMemoryError
e
)
{
callStop
=
false
;
// there is a serious problem:
// the transaction may be applied partially
// in this case we need to panic:
// close the database
database
.
shutdownImmediately
();
throw
e
;
throw
DbException
.
convert
(
e
);
}
catch
(
Throwable
e
)
{
throw
DbException
.
convert
(
e
);
}
database
.
checkPowerOff
();
}
}
catch
(
DbException
e
)
{
e
=
e
.
addSQL
(
sql
);
SQLException
s
=
e
.
getSQLException
();
database
.
exceptionThrown
(
s
,
sql
);
if
(
s
.
getErrorCode
()
==
ErrorCode
.
OUT_OF_MEMORY
)
{
callStop
=
false
;
database
.
shutdownImmediately
();
throw
e
;
}
finally
{
if
(
callStop
)
{
stop
();
}
if
(
writing
)
{
database
.
afterWriting
();
}
}
database
.
checkPowerOff
();
throw
e
;
}
finally
{
if
(
callStop
)
{
stop
();
}
if
(
writing
)
{
database
.
afterWriting
();
}
}
}
...
...
@@ -242,7 +251,7 @@ public abstract class Command implements CommandInterface {
public
ResultWithGeneratedKeys
executeUpdate
(
Object
generatedKeysRequest
)
{
long
start
=
0
;
Database
database
=
session
.
getDatabase
();
Object
sync
=
database
.
isMultiThreaded
()
?
(
Object
)
session
:
(
Object
)
database
;
Object
sync
=
database
.
getMvStore
()
!=
null
?
null
:
database
.
isMultiThreaded
()
?
session
:
database
;
session
.
waitIfExclusiveModeEnabled
();
boolean
callStop
=
true
;
boolean
writing
=
!
isReadOnly
();
...
...
@@ -251,54 +260,61 @@ public abstract class Command implements CommandInterface {
// wait
}
}
synchronized
(
sync
)
{
Session
.
Savepoint
rollback
=
session
.
setSavepoint
();
session
.
setCurrentCommand
(
this
,
generatedKeysRequest
);
try
{
while
(
true
)
{
database
.
checkPowerOff
();
try
{
int
updateCount
=
update
();
if
(!
Boolean
.
FALSE
.
equals
(
generatedKeysRequest
))
{
return
new
ResultWithGeneratedKeys
.
WithKeys
(
updateCount
,
session
.
getGeneratedKeys
().
getKeys
(
session
));
Session
.
Savepoint
rollback
=
session
.
setSavepoint
();
session
.
startStatementWithinTransaction
();
session
.
setCurrentCommand
(
this
,
generatedKeysRequest
);
try
{
while
(
true
)
{
database
.
checkPowerOff
();
try
{
int
updateCount
;
if
(
sync
!=
null
)
{
//noinspection SynchronizationOnLocalVariableOrMethodParameter
synchronized
(
sync
)
{
updateCount
=
update
();
}
return
ResultWithGeneratedKeys
.
of
(
updateCount
);
}
catch
(
DbException
e
)
{
start
=
filterConcurrentUpdate
(
e
,
start
);
}
catch
(
OutOfMemoryError
e
)
{
callStop
=
false
;
database
.
shutdownImmediately
();
throw
DbException
.
convert
(
e
);
}
catch
(
Throwable
e
)
{
throw
DbException
.
convert
(
e
);
}
else
{
updateCount
=
update
();
}
}
}
catch
(
DbException
e
)
{
e
=
e
.
addSQL
(
sql
);
SQLException
s
=
e
.
getSQLException
();
database
.
exceptionThrown
(
s
,
sql
);
if
(
s
.
getErrorCode
()
==
ErrorCode
.
OUT_OF_MEMORY
)
{
if
(!
Boolean
.
FALSE
.
equals
(
generatedKeysRequest
))
{
return
new
ResultWithGeneratedKeys
.
WithKeys
(
updateCount
,
session
.
getGeneratedKeys
().
getKeys
(
session
));
}
return
ResultWithGeneratedKeys
.
of
(
updateCount
);
}
catch
(
DbException
e
)
{
start
=
filterConcurrentUpdate
(
e
,
start
);
}
catch
(
OutOfMemoryError
e
)
{
callStop
=
false
;
database
.
shutdownImmediately
();
throw
e
;
}
database
.
checkPowerOff
();
if
(
s
.
getErrorCode
()
==
ErrorCode
.
DEADLOCK_1
)
{
session
.
rollback
();
}
else
{
session
.
rollbackTo
(
rollback
,
false
);
throw
DbException
.
convert
(
e
);
}
catch
(
Throwable
e
)
{
throw
DbException
.
convert
(
e
);
}
}
}
catch
(
DbException
e
)
{
e
=
e
.
addSQL
(
sql
);
SQLException
s
=
e
.
getSQLException
();
database
.
exceptionThrown
(
s
,
sql
);
if
(
s
.
getErrorCode
()
==
ErrorCode
.
OUT_OF_MEMORY
)
{
callStop
=
false
;
database
.
shutdownImmediately
();
throw
e
;
}
database
.
checkPowerOff
();
if
(
s
.
getErrorCode
()
==
ErrorCode
.
DEADLOCK_1
)
{
session
.
rollback
();
}
else
{
session
.
rollbackTo
(
rollback
,
false
);
}
throw
e
;
}
finally
{
try
{
if
(
callStop
)
{
stop
();
}
}
finally
{
try
{
if
(
callStop
)
{
stop
();
}
}
finally
{
if
(
writing
)
{
database
.
afterWriting
();
}
if
(
writing
)
{
database
.
afterWriting
();
}
}
}
...
...
@@ -311,26 +327,9 @@ public abstract class Command implements CommandInterface {
errorCode
!=
ErrorCode
.
ROW_NOT_FOUND_WHEN_DELETING_1
)
{
throw
e
;
}
long
now
=
System
.
nanoTime
()
/
1_000_000
;
if
(
start
!=
0
&&
now
-
start
>
session
.
getLockTimeout
())
{
throw
DbException
.
get
(
ErrorCode
.
LOCK_TIMEOUT_1
,
e
.
getCause
(),
""
);
}
Database
database
=
session
.
getDatabase
();
int
sleep
=
1
+
MathUtils
.
randomInt
(
10
);
while
(
true
)
{
try
{
if
(
database
.
isMultiThreaded
())
{
Thread
.
sleep
(
sleep
);
}
else
{
database
.
wait
(
sleep
);
}
}
catch
(
InterruptedException
e1
)
{
// ignore
}
long
slept
=
System
.
nanoTime
()
/
1_000_000
-
now
;
if
(
slept
>=
sleep
)
{
break
;
}
long
now
=
System
.
nanoTime
();
if
(
start
!=
0
&&
now
-
start
>
TimeUnit
.
MILLISECONDS
.
toNanos
(
session
.
getLockTimeout
()))
{
throw
DbException
.
get
(
ErrorCode
.
LOCK_TIMEOUT_1
,
e
);
}
return
start
==
0
?
now
:
start
;
}
...
...
h2/src/main/org/h2/engine/Database.java
浏览文件 @
5ac45917
...
...
@@ -331,6 +331,12 @@ public class Database implements DataHandler {
}
}
public
long
getLockTimeout
()
{
Setting
setting
=
findSetting
(
SetTypes
.
getTypeName
(
SetTypes
.
DEFAULT_LOCK_TIMEOUT
));
return
setting
==
null
?
Constants
.
INITIAL_LOCK_TIMEOUT
:
setting
.
getIntValue
();
}
/**
* Create a new row for a table.
*
...
...
h2/src/main/org/h2/engine/Session.java
浏览文件 @
5ac45917
...
...
@@ -64,6 +64,8 @@ import org.h2.value.ValueString;
*/
public
class
Session
extends
SessionWithState
implements
TransactionStore
.
RollbackListener
{
public
enum
State
{
INIT
,
RUNNING
,
BLOCKED
,
SLEEP
,
CLOSED
}
/**
* This special log position means that the log entry has been written.
*/
...
...
@@ -158,6 +160,7 @@ public class Session extends SessionWithState implements TransactionStore.Rollba
private
ArrayList
<
Value
>
temporaryLobs
;
private
Transaction
transaction
;
private
State
state
=
State
.
INIT
;
private
long
startStatement
=
-
1
;
public
Session
(
Database
database
,
User
user
,
int
id
)
{
...
...
@@ -167,10 +170,7 @@ public class Session extends SessionWithState implements TransactionStore.Rollba
this
.
undoLog
=
new
UndoLog
(
this
);
this
.
user
=
user
;
this
.
id
=
id
;
Setting
setting
=
database
.
findSetting
(
SetTypes
.
getTypeName
(
SetTypes
.
DEFAULT_LOCK_TIMEOUT
));
this
.
lockTimeout
=
setting
==
null
?
Constants
.
INITIAL_LOCK_TIMEOUT
:
setting
.
getIntValue
();
this
.
lockTimeout
=
(
int
)
database
.
getLockTimeout
();
this
.
currentSchemaName
=
Constants
.
SCHEMA_MAIN
;
this
.
columnNamerConfiguration
=
ColumnNamerConfiguration
.
getDefault
();
}
...
...
@@ -861,6 +861,7 @@ public class Session extends SessionWithState implements TransactionStore.Rollba
@Override
public
void
close
()
{
if
(!
closed
)
{
state
=
State
.
CLOSED
;
try
{
database
.
checkPowerOff
();
...
...
@@ -874,9 +875,9 @@ public class Session extends SessionWithState implements TransactionStore.Rollba
// and we need to unlock before we call removeSession(), which might
// want to take the meta lock using the system session.
database
.
unlockMeta
(
this
);
database
.
removeSession
(
this
);
}
finally
{
closed
=
true
;
database
.
removeSession
(
this
);
}
}
}
...
...
@@ -1212,11 +1213,15 @@ public class Session extends SessionWithState implements TransactionStore.Rollba
if
(
lastThrottle
+
TimeUnit
.
MILLISECONDS
.
toNanos
(
Constants
.
THROTTLE_DELAY
)
>
time
)
{
return
;
}
State
prevState
=
this
.
state
;
lastThrottle
=
time
+
throttleNs
;
try
{
this
.
state
=
State
.
SLEEP
;
Thread
.
sleep
(
TimeUnit
.
NANOSECONDS
.
toMillis
(
throttleNs
));
}
catch
(
Exception
e
)
{
// ignore InterruptedException
}
finally
{
this
.
state
=
prevState
;
}
}
...
...
@@ -1244,6 +1249,7 @@ public class Session extends SessionWithState implements TransactionStore.Rollba
long
now
=
System
.
nanoTime
();
cancelAtNs
=
now
+
TimeUnit
.
MILLISECONDS
.
toNanos
(
queryTimeout
);
}
state
=
command
==
null
?
State
.
SLEEP
:
State
.
RUNNING
;
}
/**
...
...
@@ -1633,7 +1639,7 @@ public class Session extends SessionWithState implements TransactionStore.Rollba
public
Value
getTransactionId
()
{
if
(
database
.
getMvStore
()
!=
null
)
{
if
(
transaction
==
null
)
{
if
(
transaction
==
null
||
!
transaction
.
hasChanges
()
)
{
return
ValueNull
.
INSTANCE
;
}
return
ValueString
.
get
(
Long
.
toString
(
getTransaction
().
getSequenceNum
()));
...
...
@@ -1674,14 +1680,14 @@ public class Session extends SessionWithState implements TransactionStore.Rollba
database
.
shutdownImmediately
();
throw
DbException
.
get
(
ErrorCode
.
DATABASE_IS_CLOSED
);
}
transaction
=
store
.
getTransactionStore
().
begin
(
this
);
transaction
=
store
.
getTransactionStore
().
begin
(
this
,
this
.
lockTimeout
,
id
);
}
startStatement
=
-
1
;
}
return
transaction
;
}
p
ublic
long
getStatementSavepoint
()
{
p
rivate
long
getStatementSavepoint
()
{
if
(
startStatement
==
-
1
)
{
startStatement
=
getTransaction
().
setSavepoint
();
}
...
...
@@ -1754,6 +1760,18 @@ public class Session extends SessionWithState implements TransactionStore.Rollba
tablesToAnalyze
.
add
(
table
);
}
public
State
getState
()
{
return
getBlockingSessionId
()
!=
0
?
State
.
BLOCKED
:
state
;
}
public
void
setState
(
State
state
)
{
this
.
state
=
state
;
}
public
int
getBlockingSessionId
()
{
return
transaction
==
null
?
0
:
transaction
.
getBlockerId
();
}
@Override
public
void
onRollback
(
MVMap
<
Object
,
VersionedValue
>
map
,
Object
key
,
VersionedValue
existingValue
,
...
...
h2/src/main/org/h2/mvstore/DataUtils.java
浏览文件 @
5ac45917
...
...
@@ -101,6 +101,11 @@ public final class DataUtils {
*/
public
static
final
int
ERROR_TRANSACTION_TOO_BIG
=
104
;
/**
* Deadlock discovered and one of transactions involved chosen as victim and rolled back.
*/
public
static
final
int
ERROR_TRANSACTIONS_DEADLOCK
=
105
;
/**
* The type for leaf page.
*/
...
...
h2/src/main/org/h2/mvstore/MVMap.java
浏览文件 @
5ac45917
...
...
@@ -1382,7 +1382,7 @@ public class MVMap<K, V> extends AbstractMap<K, V>
}
}
public
enum
Decision
{
ABORT
,
REMOVE
,
PUT
}
public
enum
Decision
{
ABORT
,
REMOVE
,
PUT
,
REPEAT
}
/**
* Class DecisionMaker provides callback interface (and should become a such in Java 8)
...
...
@@ -1520,6 +1520,9 @@ public class MVMap<K, V> extends AbstractMap<K, V>
boolean
needUnlock
=
false
;
try
{
switch
(
decision
)
{
case
REPEAT:
decisionMaker
.
reset
();
continue
;
case
ABORT:
if
(
rootReference
!=
getRoot
())
{
decisionMaker
.
reset
();
...
...
@@ -1528,6 +1531,10 @@ public class MVMap<K, V> extends AbstractMap<K, V>
return
result
;
case
REMOVE:
{
if
(
index
<
0
)
{
if
(
rootReference
!=
getRoot
())
{
decisionMaker
.
reset
();
continue
;
}
return
null
;
}
if
(
attempt
>
2
&&
!(
needUnlock
=
lockRoot
(
decisionMaker
,
rootReference
,
...
...
h2/src/main/org/h2/mvstore/db/MVPrimaryIndex.java
浏览文件 @
5ac45917
...
...
@@ -115,12 +115,18 @@ public class MVPrimaryIndex extends BaseIndex {
TransactionMap
<
Value
,
Value
>
map
=
getMap
(
session
);
Value
key
=
ValueLong
.
get
(
row
.
getKey
());
try
{
if
(
map
.
put
(
key
,
ValueArray
.
get
(
row
.
getValueList
()))
!=
null
)
{
Value
oldValue
=
map
.
putIfAbsent
(
key
,
ValueArray
.
get
(
row
.
getValueList
()));
if
(
oldValue
!=
null
)
{
String
sql
=
"PRIMARY KEY ON "
+
table
.
getSQL
();
if
(
mainIndexColumn
>=
0
&&
mainIndexColumn
<
indexColumns
.
length
)
{
sql
+=
"("
+
indexColumns
[
mainIndexColumn
].
getSQL
()
+
")"
;
}
DbException
e
=
DbException
.
get
(
ErrorCode
.
DUPLICATE_KEY_1
,
sql
);
int
errorCode
=
ErrorCode
.
CONCURRENT_UPDATE_1
;
if
(
map
.
get
(
key
)
!=
null
)
{
// committed
errorCode
=
ErrorCode
.
DUPLICATE_KEY_1
;
}
DbException
e
=
DbException
.
get
(
errorCode
,
sql
+
" "
+
oldValue
);
e
.
setSource
(
this
);
throw
e
;
}
...
...
@@ -156,6 +162,19 @@ public class MVPrimaryIndex extends BaseIndex {
}
}
public
void
lockRows
(
Session
session
,
Iterator
<
Row
>
rowsForUpdate
)
{
TransactionMap
<
Value
,
Value
>
map
=
getMap
(
session
);
while
(
rowsForUpdate
.
hasNext
())
{
Row
row
=
rowsForUpdate
.
next
();
long
key
=
row
.
getKey
();
try
{
map
.
lock
(
ValueLong
.
get
(
key
));
}
catch
(
IllegalStateException
ex
)
{
throw
mvTable
.
convertException
(
ex
);
}
}
}
@Override
public
Cursor
find
(
Session
session
,
SearchRow
first
,
SearchRow
last
)
{
ValueLong
min
,
max
;
...
...
@@ -410,5 +429,4 @@ public class MVPrimaryIndex extends BaseIndex {
}
}
}
h2/src/main/org/h2/mvstore/db/MVTable.java
浏览文件 @
5ac45917
...
...
@@ -10,6 +10,7 @@ import java.util.ArrayList;
import
java.util.Collections
;
import
java.util.Comparator
;
import
java.util.HashSet
;
import
java.util.Iterator
;
import
java.util.Set
;
import
java.util.concurrent.ConcurrentHashMap
;
import
java.util.concurrent.TimeUnit
;
...
...
@@ -708,7 +709,11 @@ public class MVTable extends TableBase {
index
.
remove
(
session
,
row
);
}
}
catch
(
Throwable
e
)
{
t
.
rollbackToSavepoint
(
savepoint
);
try
{
t
.
rollbackToSavepoint
(
savepoint
);
}
catch
(
Throwable
nested
)
{
e
.
addSuppressed
(
nested
);
}
throw
DbException
.
convert
(
e
);
}
analyzeIfRequired
(
session
);
...
...
@@ -734,26 +739,21 @@ public class MVTable extends TableBase {
index
.
add
(
session
,
row
);
}
}
catch
(
Throwable
e
)
{
t
.
rollbackToSavepoint
(
savepoint
);
DbException
de
=
DbException
.
convert
(
e
);
if
(
de
.
getErrorCode
()
==
ErrorCode
.
DUPLICATE_KEY_1
)
{
for
(
Index
index
:
indexes
)
{
if
(
index
.
getIndexType
().
isUnique
()
&&
index
instanceof
MultiVersionIndex
)
{
MultiVersionIndex
mv
=
(
MultiVersionIndex
)
index
;
if
(
mv
.
isUncommittedFromOtherSession
(
session
,
row
))
{
throw
DbException
.
get
(
ErrorCode
.
CONCURRENT_UPDATE_1
,
index
.
getName
());
}
}
}
try
{
t
.
rollbackToSavepoint
(
savepoint
);
}
catch
(
Throwable
nested
)
{
e
.
addSuppressed
(
nested
);
}
throw
de
;
throw
DbException
.
convert
(
e
)
;
}
analyzeIfRequired
(
session
);
}
@Override
public
void
lockRows
(
Session
session
,
Iterator
<
Row
>
rowsForUpdate
)
{
primaryIndex
.
lockRows
(
session
,
rowsForUpdate
);
}
private
void
analyzeIfRequired
(
Session
session
)
{
synchronized
(
this
)
{
if
(
nextAnalyze
==
0
||
nextAnalyze
>
changesSinceAnalyze
++)
{
...
...
@@ -919,12 +919,15 @@ public class MVTable extends TableBase {
* @return the database exception
*/
DbException
convertException
(
IllegalStateException
e
)
{
i
f
(
DataUtils
.
getErrorCode
(
e
.
getMessage
())
==
DataUtils
.
ERROR_TRANSACTION_LOCKED
)
{
i
nt
errorCode
=
DataUtils
.
getErrorCode
(
e
.
getMessage
());
if
(
errorCode
==
DataUtils
.
ERROR_TRANSACTION_LOCKED
)
{
throw
DbException
.
get
(
ErrorCode
.
CONCURRENT_UPDATE_1
,
e
,
getName
());
}
if
(
errorCode
==
DataUtils
.
ERROR_TRANSACTIONS_DEADLOCK
)
{
throw
DbException
.
get
(
ErrorCode
.
DEADLOCK_1
,
e
,
getName
());
}
return
store
.
convertIllegalStateException
(
e
);
}
}
h2/src/main/org/h2/mvstore/db/MVTableEngine.java
浏览文件 @
5ac45917
...
...
@@ -165,7 +165,7 @@ public class MVTableEngine implements TableEngine {
}
this
.
transactionStore
=
new
TransactionStore
(
store
,
new
ValueDataType
(
db
.
getCompareMode
(),
db
,
null
));
new
ValueDataType
(
db
.
getCompareMode
(),
db
,
null
)
,
db
.
getLockTimeout
()
);
// transactionStore.init();
}
catch
(
IllegalStateException
e
)
{
throw
convertIllegalStateException
(
e
);
...
...
h2/src/main/org/h2/mvstore/rtree/MVRTreeMap.java
浏览文件 @
5ac45917
...
...
@@ -180,6 +180,7 @@ public final class MVRTreeMap<V> extends MVMap<SpatialKey, V> {
result
=
index
<
0
?
null
:
(
V
)
p
.
getValue
(
index
);
Decision
decision
=
decisionMaker
.
decide
(
result
,
value
);
switch
(
decision
)
{
case
REPEAT:
break
;
case
ABORT:
break
;
case
REMOVE:
if
(
index
>=
0
)
{
...
...
h2/src/main/org/h2/mvstore/tx/CommitDecisionMaker.java
0 → 100644
浏览文件 @
5ac45917
/*
* Copyright 2004-2018 H2 Group. Multiple-Licensed under the MPL 2.0,
* and the EPL 1.0 (http://h2database.com/html/license.html).
* Initial Developer: H2 Group
*/
package
org
.
h2
.
mvstore
.
tx
;
import
org.h2.mvstore.MVMap
;
/**
* Class CommitDecisionMaker makes a decision during post-commit processing
* about how to transform uncommitted map entry into committed one,
* based on undo log information.
*
* @author <a href='mailto:andrei.tokar@gmail.com'>Andrei Tokar</a>
*/
final
class
CommitDecisionMaker
extends
MVMap
.
DecisionMaker
<
VersionedValue
>
{
private
long
undoKey
;
private
MVMap
.
Decision
decision
;
public
void
setUndoKey
(
long
undoKey
)
{
this
.
undoKey
=
undoKey
;
reset
();
}
@Override
public
MVMap
.
Decision
decide
(
VersionedValue
existingValue
,
VersionedValue
providedValue
)
{
assert
decision
==
null
;
if
(
existingValue
==
null
)
{
// map entry was treated as committed already and
// removed already by another commited by now transaction
decision
=
MVMap
.
Decision
.
ABORT
;
}
else
{
if
(
existingValue
.
getOperationId
()
==
undoKey
)
{
// this is final undo log entry for this key
if
(
existingValue
.
value
==
null
)
{
decision
=
MVMap
.
Decision
.
REMOVE
;
}
else
{
decision
=
MVMap
.
Decision
.
PUT
;
}
}
else
{
// this is not a final undo log entry for this key,
// or map entry was treated as committed already and
// overwritten already by another transaction
// see TxDecisionMaker.decide()
decision
=
MVMap
.
Decision
.
ABORT
;
}
}
return
decision
;
}
@SuppressWarnings
(
"unchecked"
)
@Override
public
VersionedValue
selectValue
(
VersionedValue
existingValue
,
VersionedValue
providedValue
)
{
assert
decision
==
MVMap
.
Decision
.
PUT
;
assert
existingValue
!=
null
;
return
new
VersionedValue
(
0L
,
existingValue
.
value
);
}
@Override
public
void
reset
()
{
decision
=
null
;
}
@Override
public
String
toString
()
{
return
"commit "
+
TransactionStore
.
getTransactionId
(
undoKey
);
}
}
h2/src/main/org/h2/mvstore/tx/RollbackDecisionMaker.java
0 → 100644
浏览文件 @
5ac45917
/*
* Copyright 2004-2018 H2 Group. Multiple-Licensed under the MPL 2.0,
* and the EPL 1.0 (http://h2database.com/html/license.html).
* Initial Developer: H2 Group
*/
package
org
.
h2
.
mvstore
.
tx
;
import
org.h2.mvstore.MVMap
;
/**
* Class RollbackDecisionMaker process undo log record during transaction rollback.
*
* @author <a href='mailto:andrei.tokar@gmail.com'>Andrei Tokar</a>
*/
final
class
RollbackDecisionMaker
extends
MVMap
.
DecisionMaker
<
Object
[]>
{
private
final
TransactionStore
store
;
private
final
long
transactionId
;
private
final
long
toLogId
;
private
final
TransactionStore
.
RollbackListener
listener
;
private
MVMap
.
Decision
decision
;
RollbackDecisionMaker
(
TransactionStore
store
,
long
transactionId
,
long
toLogId
,
TransactionStore
.
RollbackListener
listener
)
{
this
.
store
=
store
;
this
.
transactionId
=
transactionId
;
this
.
toLogId
=
toLogId
;
this
.
listener
=
listener
;
}
@Override
public
MVMap
.
Decision
decide
(
Object
[]
existingValue
,
Object
[]
providedValue
)
{
assert
decision
==
null
;
assert
existingValue
!=
null
;
/*
if (existingValue == null) {
// this should only be possible during initialization
// when previously database was abruptly killed
// assert !store.isInitialized();
decision = MVMap.Decision.ABORT;
} else {
*/
VersionedValue
valueToRestore
=
(
VersionedValue
)
existingValue
[
2
];
long
operationId
;
if
(
valueToRestore
==
null
||
(
operationId
=
valueToRestore
.
getOperationId
())
==
0
||
TransactionStore
.
getTransactionId
(
operationId
)
==
transactionId
&&
TransactionStore
.
getLogId
(
operationId
)
<
toLogId
)
{
int
mapId
=
(
Integer
)
existingValue
[
0
];
MVMap
<
Object
,
VersionedValue
>
map
=
store
.
openMap
(
mapId
);
if
(
map
!=
null
&&
!
map
.
isClosed
())
{
Object
key
=
existingValue
[
1
];
VersionedValue
previousValue
=
map
.
operate
(
key
,
valueToRestore
,
MVMap
.
DecisionMaker
.
DEFAULT
);
listener
.
onRollback
(
map
,
key
,
previousValue
,
valueToRestore
);
}
}
decision
=
MVMap
.
Decision
.
REMOVE
;
// }
return
decision
;
}
@Override
public
void
reset
()
{
decision
=
null
;
}
@Override
public
String
toString
()
{
return
"rollback-"
+
transactionId
;
}
}
h2/src/main/org/h2/mvstore/tx/Transaction.java
浏览文件 @
5ac45917
...
...
@@ -106,19 +106,60 @@ public class Transaction {
*/
private
final
AtomicLong
statusAndLogId
;
/**
* Reference to a counter for an earliest store version used by this transaction.
* Referenced version and all newer ones can not be discarded
* at least until this transaction ends.
*/
private
MVStore
.
TxCounter
txCounter
;
/**
* Transaction name.
*/
private
String
name
;
/**
* Indicates whether this transaction was stored in preparedTransactions map
*/
boolean
wasStored
;
/**
* How long to wait for blocking transaction to commit or rollback.
*/
final
long
timeoutMillis
;
/**
* Identification of the owner of this transaction,
* usually the owner is a database session.
*/
private
final
int
ownerId
;
/**
* Blocking transaction, if any
*/
private
volatile
Transaction
blockingTransaction
;
/**
* Map on which this transaction is blocked.
*/
MVMap
blockingMap
;
/**
* Key in blockingMap on which this transaction is blocked.
*/
Object
blockingKey
;
Transaction
(
TransactionStore
store
,
int
transactionId
,
long
sequenceNum
,
int
status
,
String
name
,
long
logId
,
TransactionStore
.
RollbackListener
listener
)
{
String
name
,
long
logId
,
long
timeoutMillis
,
int
ownerId
,
TransactionStore
.
RollbackListener
listener
)
{
this
.
store
=
store
;
this
.
transactionId
=
transactionId
;
this
.
sequenceNum
=
sequenceNum
;
this
.
statusAndLogId
=
new
AtomicLong
(
composeState
(
status
,
logId
,
false
));
this
.
name
=
name
;
this
.
timeoutMillis
=
timeoutMillis
;
this
.
ownerId
=
ownerId
;
this
.
listener
=
listener
;
}
...
...
@@ -201,6 +242,10 @@ public class Transaction {
return
name
;
}
public
int
getBlockerId
()
{
return
blockingTransaction
==
null
?
0
:
blockingTransaction
.
ownerId
;
}
/**
* Create a new savepoint.
*
...
...
@@ -217,8 +262,8 @@ public class Transaction {
public
void
markStatementEnd
()
{
MVStore
.
TxCounter
counter
=
txCounter
;
txCounter
=
null
;
if
(
counter
!=
null
)
{
txCounter
=
null
;
store
.
store
.
deregisterVersionUsage
(
counter
);
}
}
...
...
@@ -322,9 +367,9 @@ public class Transaction {
boolean
hasChanges
=
false
;
try
{
long
state
=
setStatus
(
STATUS_COMMITTING
);
long
logId
=
getLogId
(
state
);
hasChanges
=
hasChanges
(
state
);
if
(
hasChanges
)
{
long
logId
=
getLogId
(
state
);
store
.
commit
(
this
,
logId
);
}
}
catch
(
Throwable
e
)
{
...
...
@@ -364,6 +409,7 @@ public class Transaction {
"while rollback to savepoint was in progress"
,
transactionId
);
}
notifyAllWaitingTransactions
();
}
}
...
...
@@ -420,6 +466,70 @@ public class Transaction {
}
}
void
closeIt
()
{
long
lastState
=
setStatus
(
STATUS_CLOSED
);
store
.
store
.
deregisterVersionUsage
(
txCounter
);
if
(
hasChanges
(
lastState
)
||
hasRollback
(
lastState
))
{
notifyAllWaitingTransactions
();
}
}
private
synchronized
void
notifyAllWaitingTransactions
()
{
notifyAll
();
}
public
boolean
waitFor
(
Transaction
toWaitFor
)
{
if
(
isDeadlocked
(
toWaitFor
))
{
StringBuilder
details
=
new
StringBuilder
(
String
.
format
(
"Transaction %d has been chosen as a deadlock victim. Details:%n"
,
transactionId
));
for
(
Transaction
tx
=
toWaitFor
,
nextTx
;
(
nextTx
=
tx
.
blockingTransaction
)
!=
null
;
tx
=
nextTx
)
{
details
.
append
(
String
.
format
(
"Transaction %d attempts to update map <%s> entry with key <%s> modified by transaction %s%n"
,
tx
.
transactionId
,
tx
.
blockingMap
.
getName
(),
tx
.
blockingKey
,
tx
.
blockingTransaction
));
if
(
nextTx
==
this
)
{
details
.
append
(
String
.
format
(
"Transaction %d attempts to update map <%s> entry with key <%s> modified by transaction %s%n"
,
transactionId
,
blockingMap
.
getName
(),
blockingKey
,
toWaitFor
));
throw
DataUtils
.
newIllegalStateException
(
DataUtils
.
ERROR_TRANSACTIONS_DEADLOCK
,
details
.
toString
());
}
}
}
blockingTransaction
=
toWaitFor
;
try
{
return
toWaitFor
.
waitForThisToEnd
(
timeoutMillis
);
}
finally
{
blockingMap
=
null
;
blockingKey
=
null
;
blockingTransaction
=
null
;
}
}
private
boolean
isDeadlocked
(
Transaction
toWaitFor
)
{
for
(
Transaction
tx
=
toWaitFor
,
nextTx
;
(
nextTx
=
tx
.
blockingTransaction
)
!=
null
&&
tx
.
getStatus
()
==
Transaction
.
STATUS_OPEN
;
tx
=
nextTx
)
{
if
(
nextTx
==
this
)
{
return
true
;
}
}
return
false
;
}
private
synchronized
boolean
waitForThisToEnd
(
long
millis
)
{
long
until
=
System
.
currentTimeMillis
()
+
millis
;
int
status
;
while
((
status
=
getStatus
())
!=
STATUS_CLOSED
&&
status
!=
STATUS_ROLLING_BACK
)
{
long
dur
=
until
-
System
.
currentTimeMillis
();
if
(
dur
<=
0
)
{
return
false
;
}
try
{
wait
(
dur
);
}
catch
(
InterruptedException
ex
)
{
return
false
;
}
}
return
true
;
}
/**
* Remove the map.
*
...
...
h2/src/main/org/h2/mvstore/tx/TransactionMap.java
浏览文件 @
5ac45917
...
...
@@ -181,7 +181,7 @@ public class TransactionMap<K, V> {
* @throws IllegalStateException if a lock timeout occurs
*/
public
V
remove
(
K
key
)
{
return
set
(
key
,
null
);
return
set
(
key
,
(
V
)
null
);
}
/**
...
...
@@ -200,6 +200,36 @@ public class TransactionMap<K, V> {
return
set
(
key
,
value
);
}
/**
* Put the value for the given key if entry for this key does not exist.
* It is atomic equivalent of the following expression:
* contains(key) ? get(k) : put(key, value);
*
* @param key the key
* @param value the new value (not null)
* @return the old value
*/
public
V
putIfAbsent
(
K
key
,
V
value
)
{
DataUtils
.
checkArgument
(
value
!=
null
,
"The value may not be null"
);
TxDecisionMaker
decisionMaker
=
new
TxDecisionMaker
.
PutIfAbsentDecisionMaker
(
map
.
getId
(),
key
,
value
,
transaction
);
return
set
(
key
,
decisionMaker
);
}
/**
* Lock row for the given key.
* <p>
* If the row is locked, this method will retry until the row could be
* updated or until a lock timeout.
*
* @param key the key
* @return the locked value
* @throws IllegalStateException if a lock timeout occurs
*/
public
V
lock
(
K
key
)
{
TxDecisionMaker
decisionMaker
=
new
TxDecisionMaker
.
LockDecisionMaker
(
map
.
getId
(),
key
,
transaction
);
return
set
(
key
,
decisionMaker
);
}
/**
* Update the value for the given key, without adding an undo log entry.
*
...
...
@@ -216,14 +246,39 @@ public class TransactionMap<K, V> {
}
private
V
set
(
K
key
,
V
value
)
{
transaction
.
checkNotClosed
();
V
old
=
get
(
key
);
boolean
ok
=
trySet
(
key
,
value
,
false
);
if
(
ok
)
{
return
old
;
}
throw
DataUtils
.
newIllegalStateException
(
DataUtils
.
ERROR_TRANSACTION_LOCKED
,
"Entry is locked"
);
TxDecisionMaker
decisionMaker
=
new
TxDecisionMaker
.
PutDecisionMaker
(
map
.
getId
(),
key
,
value
,
transaction
);
return
set
(
key
,
decisionMaker
);
}
private
V
set
(
K
key
,
TxDecisionMaker
decisionMaker
)
{
TransactionStore
store
=
transaction
.
store
;
Transaction
blockingTransaction
;
long
sequenceNumWhenStarted
;
VersionedValue
result
;
do
{
sequenceNumWhenStarted
=
store
.
openTransactions
.
get
().
getVersion
();
assert
transaction
.
getBlockerId
()
==
0
;
result
=
map
.
put
(
key
,
VersionedValue
.
DUMMY
,
decisionMaker
);
MVMap
.
Decision
decision
=
decisionMaker
.
getDecision
();
assert
decision
!=
null
;
assert
decision
!=
MVMap
.
Decision
.
REPEAT
;
blockingTransaction
=
decisionMaker
.
getBlockingTransaction
();
if
(
decision
!=
MVMap
.
Decision
.
ABORT
||
blockingTransaction
==
null
)
{
transaction
.
blockingMap
=
null
;
transaction
.
blockingKey
=
null
;
//noinspection unchecked
return
result
==
null
?
null
:
(
V
)
result
.
value
;
}
decisionMaker
.
reset
();
transaction
.
blockingMap
=
map
;
transaction
.
blockingKey
=
key
;
}
while
(
blockingTransaction
.
sequenceNum
>
sequenceNumWhenStarted
||
transaction
.
waitFor
(
blockingTransaction
));
throw
DataUtils
.
newIllegalStateException
(
DataUtils
.
ERROR_TRANSACTION_LOCKED
,
"Map entry <{0}> with key <{1}> and value {2} is locked by tx {3} and can not be updated by tx {4} within allocated time interval {5} ms."
,
map
.
getName
(),
key
,
result
,
blockingTransaction
.
transactionId
,
transaction
.
transactionId
,
transaction
.
timeoutMillis
);
}
/**
...
...
@@ -302,54 +357,17 @@ public class TransactionMap<K, V> {
return
false
;
}
}
}
else
{
current
=
map
.
get
(
key
);
}
VersionedValue
newValue
=
new
VersionedValue
(
TransactionStore
.
getOperationId
(
transaction
.
transactionId
,
transaction
.
getLogId
()),
value
);
if
(
current
==
null
)
{
// a new value
transaction
.
log
(
mapId
,
key
,
current
);
VersionedValue
old
=
map
.
putIfAbsent
(
key
,
newValue
);
if
(
old
!=
null
)
{
transaction
.
logUndo
();
return
false
;
}
return
true
;
}
long
id
=
current
.
operationId
;
if
(
id
==
0
)
{
// committed
transaction
.
log
(
mapId
,
key
,
current
);
// the transaction is committed:
// overwrite the value
if
(!
map
.
replace
(
key
,
current
,
newValue
))
{
// somebody else was faster
transaction
.
logUndo
();
return
false
;
}
return
true
;
}
int
tx
=
TransactionStore
.
getTransactionId
(
current
.
operationId
);
if
(
tx
==
transaction
.
transactionId
)
{
// added or updated by this transaction
transaction
.
log
(
mapId
,
key
,
current
);
if
(!
map
.
replace
(
key
,
current
,
newValue
))
{
// strange, somebody overwrote the value
// even though the change was not committed
transaction
.
logUndo
();
return
false
;
}
try
{
set
(
key
,
value
);
return
true
;
}
catch
(
IllegalStateException
e
)
{
return
false
;
}
// the transaction is not yet committed
return
false
;
}
/**
* Get the
value for the given key at the time when this map was opened
.
* Get the
effective value for the given key
.
*
* @param key the key
* @return the value or null
...
...
@@ -666,7 +684,7 @@ public class TransactionMap<K, V> {
@Override
public
void
remove
()
{
throw
DataUtils
.
newUnsupportedOperationException
(
"Remov
ing
is not supported"
);
"Remov
al
is not supported"
);
}
};
}
...
...
@@ -776,7 +794,7 @@ public class TransactionMap<K, V> {
@Override
public
final
void
remove
()
{
throw
DataUtils
.
newUnsupportedOperationException
(
"Remov
ing
is not supported"
);
"Remov
al
is not supported"
);
}
}
}
h2/src/main/org/h2/mvstore/tx/TransactionStore.java
浏览文件 @
5ac45917
...
...
@@ -31,6 +31,11 @@ public class TransactionStore {
*/
final
MVStore
store
;
/**
* Default blocked transaction timeout
*/
private
final
long
timeoutMillis
;
/**
* The persisted map of prepared transactions.
* Key: transactionId, value: [ status, name ].
...
...
@@ -115,7 +120,7 @@ public class TransactionStore {
* @param store the store
*/
public
TransactionStore
(
MVStore
store
)
{
this
(
store
,
new
ObjectDataType
());
this
(
store
,
new
ObjectDataType
()
,
0
);
}
/**
...
...
@@ -123,10 +128,12 @@ public class TransactionStore {
*
* @param store the store
* @param dataType the data type for map keys and values
* @param timeoutMillis lock aquisition timeout in milliseconds, 0 means no wait
*/
public
TransactionStore
(
MVStore
store
,
DataType
dataType
)
{
public
TransactionStore
(
MVStore
store
,
DataType
dataType
,
long
timeoutMillis
)
{
this
.
store
=
store
;
this
.
dataType
=
dataType
;
this
.
timeoutMillis
=
timeoutMillis
;
preparedTransactions
=
store
.
openMap
(
"openTransactions"
,
new
MVMap
.
Builder
<
Integer
,
Object
[]>());
DataType
oldValueType
=
new
VersionedValue
.
Type
(
dataType
);
...
...
@@ -188,7 +195,7 @@ public class TransactionStore {
assert
lastUndoKey
!=
null
;
assert
getTransactionId
(
lastUndoKey
)
==
transactionId
;
long
logId
=
getLogId
(
lastUndoKey
)
+
1
;
registerTransaction
(
transactionId
,
status
,
name
,
logId
,
listener
);
registerTransaction
(
transactionId
,
status
,
name
,
logId
,
timeoutMillis
,
0
,
listener
);
key
=
undoLog
.
ceilingKey
(
nextTxUndoKey
);
}
}
...
...
@@ -247,7 +254,7 @@ public class TransactionStore {
* @param operationId the operation id
* @return the transaction id
*/
static
int
getTransactionId
(
long
operationId
)
{
public
static
int
getTransactionId
(
long
operationId
)
{
return
(
int
)
(
operationId
>>>
LOG_ID_BITS
);
}
...
...
@@ -302,23 +309,28 @@ public class TransactionStore {
* @return the transaction
*/
public
Transaction
begin
()
{
return
begin
(
RollbackListener
.
NONE
);
return
begin
(
RollbackListener
.
NONE
,
timeoutMillis
,
0
);
}
/**
* Begin a new transaction.
* @param listener to be notified in case of a rollback
*
* @param timeoutMillis to wait for a blocking transaction
* @param ownerId of the owner (Session?) to be reported by getBlockerId
* @return the transaction
*/
public
Transaction
begin
(
RollbackListener
listener
)
{
Transaction
transaction
=
registerTransaction
(
0
,
Transaction
.
STATUS_OPEN
,
null
,
0
,
listener
);
public
Transaction
begin
(
RollbackListener
listener
,
long
timeoutMillis
,
int
ownerId
)
{
if
(
timeoutMillis
<=
0
)
{
timeoutMillis
=
this
.
timeoutMillis
;
}
Transaction
transaction
=
registerTransaction
(
0
,
Transaction
.
STATUS_OPEN
,
null
,
0
,
timeoutMillis
,
ownerId
,
listener
);
return
transaction
;
}
private
Transaction
registerTransaction
(
int
txId
,
int
status
,
String
name
,
long
logId
,
RollbackListener
listener
)
{
long
timeoutMillis
,
int
ownerId
,
RollbackListener
listener
)
{
int
transactionId
;
long
sequenceNo
;
boolean
success
;
...
...
@@ -343,7 +355,8 @@ public class TransactionStore {
success
=
openTransactions
.
compareAndSet
(
original
,
clone
);
}
while
(!
success
);
Transaction
transaction
=
new
Transaction
(
this
,
transactionId
,
sequenceNo
,
status
,
name
,
logId
,
listener
);
Transaction
transaction
=
new
Transaction
(
this
,
transactionId
,
sequenceNo
,
status
,
name
,
logId
,
timeoutMillis
,
ownerId
,
listener
);
assert
transactions
.
get
(
transactionId
)
==
null
;
transactions
.
set
(
transactionId
,
transaction
);
...
...
@@ -441,6 +454,7 @@ public class TransactionStore {
// made by this transaction, to be considered as "committed"
flipCommittingTransactionsBit
(
transactionId
,
true
);
CommitDecisionMaker
commitDecisionMaker
=
new
CommitDecisionMaker
();
// TODO could synchronize on blocks (100 at a time or so)
rwLock
.
writeLock
().
lock
();
try
{
...
...
@@ -461,18 +475,8 @@ public class TransactionStore {
MVMap
<
Object
,
VersionedValue
>
map
=
openMap
(
mapId
);
if
(
map
!=
null
)
{
// might be null if map was removed later
Object
key
=
op
[
1
];
VersionedValue
value
=
map
.
get
(
key
);
if
(
value
!=
null
)
{
// only commit (remove/update) value if we've reached
// last undoLog entry for a given key
if
(
value
.
operationId
==
undoKey
)
{
if
(
value
.
value
==
null
)
{
map
.
remove
(
key
);
}
else
{
map
.
put
(
key
,
new
VersionedValue
(
0L
,
value
.
value
));
}
}
}
commitDecisionMaker
.
setUndoKey
(
undoKey
);
map
.
operate
(
key
,
null
,
commitDecisionMaker
);
}
undoLog
.
remove
(
undoKey
);
}
...
...
@@ -576,11 +580,13 @@ public class TransactionStore {
* and amount of unsaved changes is sizable.
*
* @param t the transaction
* @param hasChanges false for R/O tx
* @param hasChanges true if transaction has done any updated
* (even if fully rolled back),
* false if just data access
*/
synchronized
void
endTransaction
(
Transaction
t
,
boolean
hasChanges
)
{
t
.
closeIt
();
int
txId
=
t
.
transactionId
;
t
.
setStatus
(
Transaction
.
STATUS_CLOSED
);
assert
transactions
.
get
(
txId
)
==
t
:
transactions
.
get
(
txId
)
+
" != "
+
t
;
transactions
.
set
(
txId
,
null
);
...
...
@@ -632,35 +638,12 @@ public class TransactionStore {
// TODO could synchronize on blocks (100 at a time or so)
rwLock
.
writeLock
().
lock
();
try
{
int
transactionId
=
t
.
getId
();
RollbackDecisionMaker
decisionMaker
=
new
RollbackDecisionMaker
(
this
,
transactionId
,
toLogId
,
t
.
listener
);
for
(
long
logId
=
maxLogId
-
1
;
logId
>=
toLogId
;
logId
--)
{
Long
undoKey
=
getOperationId
(
t
.
getId
(),
logId
);
Object
[]
op
=
undoLog
.
get
(
undoKey
);
if
(
op
==
null
)
{
// partially rolled back: load previous
undoKey
=
undoLog
.
floorKey
(
undoKey
);
if
(
undoKey
==
null
||
getTransactionId
(
undoKey
)
!=
t
.
getId
())
{
break
;
}
logId
=
getLogId
(
undoKey
)
+
1
;
continue
;
}
int
mapId
=
((
Integer
)
op
[
0
]).
intValue
();
MVMap
<
Object
,
VersionedValue
>
map
=
openMap
(
mapId
);
if
(
map
!=
null
)
{
Object
key
=
op
[
1
];
VersionedValue
oldValue
=
(
VersionedValue
)
op
[
2
];
VersionedValue
currentValue
;
if
(
oldValue
==
null
)
{
// this transaction added the value
currentValue
=
map
.
remove
(
key
);
}
else
{
// this transaction updated the value
currentValue
=
map
.
put
(
key
,
oldValue
);
}
t
.
listener
.
onRollback
(
map
,
key
,
currentValue
,
oldValue
);
}
undoLog
.
remove
(
undoKey
);
Long
undoKey
=
getOperationId
(
transactionId
,
logId
);
undoLog
.
operate
(
undoKey
,
null
,
decisionMaker
);
decisionMaker
.
reset
();
}
}
finally
{
rwLock
.
writeLock
().
unlock
();
...
...
@@ -886,5 +869,4 @@ public class TransactionStore {
}
}
}
h2/src/main/org/h2/mvstore/tx/TxDecisionMaker.java
0 → 100644
浏览文件 @
5ac45917
/*
* Copyright 2004-2018 H2 Group. Multiple-Licensed under the MPL 2.0,
* and the EPL 1.0 (http://h2database.com/html/license.html).
* Initial Developer: H2 Group
*/
package
org
.
h2
.
mvstore
.
tx
;
import
org.h2.mvstore.MVMap
;
import
static
org
.
h2
.
mvstore
.
tx
.
TransactionStore
.
getTransactionId
;
/**
* Class TxDecisionMaker.
*
* @author <a href='mailto:andrei.tokar@gmail.com'>Andrei Tokar</a>
*/
public
abstract
class
TxDecisionMaker
extends
MVMap
.
DecisionMaker
<
VersionedValue
>
{
private
final
int
mapId
;
private
final
Object
key
;
final
Object
value
;
private
final
Transaction
transaction
;
long
undoKey
;
private
Transaction
blockingTransaction
;
private
MVMap
.
Decision
decision
;
TxDecisionMaker
(
int
mapId
,
Object
key
,
Object
value
,
Transaction
transaction
)
{
this
.
mapId
=
mapId
;
this
.
key
=
key
;
this
.
value
=
value
;
this
.
transaction
=
transaction
;
}
@Override
public
MVMap
.
Decision
decide
(
VersionedValue
existingValue
,
VersionedValue
providedValue
)
{
assert
decision
==
null
;
assert
providedValue
!=
null
;
long
id
;
int
blockingId
;
// if map does not have that entry yet
if
(
existingValue
==
null
||
// or entry is a committed one
(
id
=
existingValue
.
getOperationId
())
==
0
||
// or it came from the same transaction
isThisTransaction
(
blockingId
=
getTransactionId
(
id
)))
{
logIt
(
existingValue
);
decision
=
MVMap
.
Decision
.
PUT
;
}
else
if
(
isCommitted
(
blockingId
))
{
// Condition above means that entry belongs to a committing transaction.
// We assume that we are looking at the final value for this transaction,
// and if it's not the case, then it will fail later,
// because a tree root has definitely been changed.
logIt
(
existingValue
.
value
==
null
?
null
:
new
VersionedValue
(
0L
,
existingValue
.
value
));
decision
=
MVMap
.
Decision
.
PUT
;
}
else
if
(
fetchTransaction
(
blockingId
)
==
null
)
{
// condition above means transaction has been committed/rplled back and closed by now
return
setDecision
(
MVMap
.
Decision
.
REPEAT
);
}
else
{
// this entry comes from a different transaction, and this transaction is not committed yet
// should wait on blockingTransaction that was determined earlier
decision
=
MVMap
.
Decision
.
ABORT
;
}
return
decision
;
}
@Override
public
final
void
reset
()
{
if
(
decision
!=
null
&&
decision
!=
MVMap
.
Decision
.
ABORT
&&
decision
!=
MVMap
.
Decision
.
REPEAT
)
{
// positive decision has been made already and undo record created,
// but map was updated afterwards and undo record deletion required
transaction
.
logUndo
();
}
blockingTransaction
=
null
;
decision
=
null
;
}
public
final
MVMap
.
Decision
getDecision
()
{
return
decision
;
}
final
Transaction
getBlockingTransaction
()
{
return
blockingTransaction
;
}
final
void
logIt
(
VersionedValue
value
)
{
undoKey
=
transaction
.
log
(
mapId
,
key
,
value
);
}
final
boolean
isThisTransaction
(
int
transactionId
)
{
return
transactionId
==
transaction
.
transactionId
;
}
final
boolean
isCommitted
(
int
transactionId
)
{
return
transaction
.
store
.
committingTransactions
.
get
().
get
(
transactionId
);
}
final
Transaction
fetchTransaction
(
int
transactionId
)
{
return
(
blockingTransaction
=
transaction
.
store
.
getTransaction
(
transactionId
));
}
final
MVMap
.
Decision
setDecision
(
MVMap
.
Decision
d
)
{
return
decision
=
d
;
}
@Override
public
final
String
toString
()
{
return
"txdm "
+
transaction
.
transactionId
;
}
public
static
class
PutDecisionMaker
extends
TxDecisionMaker
{
PutDecisionMaker
(
int
mapId
,
Object
key
,
Object
value
,
Transaction
transaction
)
{
super
(
mapId
,
key
,
value
,
transaction
);
}
@SuppressWarnings
(
"unchecked"
)
@Override
public
final
VersionedValue
selectValue
(
VersionedValue
existingValue
,
VersionedValue
providedValue
)
{
return
new
VersionedValue
(
undoKey
,
value
);
}
}
public
static
final
class
PutIfAbsentDecisionMaker
extends
PutDecisionMaker
{
PutIfAbsentDecisionMaker
(
int
mapId
,
Object
key
,
Object
value
,
Transaction
transaction
)
{
super
(
mapId
,
key
,
value
,
transaction
);
}
@Override
public
MVMap
.
Decision
decide
(
VersionedValue
existingValue
,
VersionedValue
providedValue
)
{
assert
getDecision
()
==
null
;
assert
providedValue
!=
null
;
int
blockingId
;
// if map does not have that entry yet
if
(
existingValue
==
null
)
{
logIt
(
null
);
return
setDecision
(
MVMap
.
Decision
.
PUT
);
}
else
{
long
id
=
existingValue
.
getOperationId
();
if
(
id
==
0
// entry is a committed one
// or it came from the same transaction
||
isThisTransaction
(
blockingId
=
getTransactionId
(
id
)))
{
if
(
existingValue
.
value
!=
null
)
{
return
setDecision
(
MVMap
.
Decision
.
ABORT
);
}
logIt
(
existingValue
);
return
setDecision
(
MVMap
.
Decision
.
PUT
);
}
else
if
(
isCommitted
(
blockingId
)
&&
existingValue
.
value
==
null
)
{
// entry belongs to a committing transaction
// and therefore will be committed soon
logIt
(
null
);
return
setDecision
(
MVMap
.
Decision
.
PUT
);
}
else
if
(
fetchTransaction
(
blockingId
)
==
null
)
{
// map already has specified key from uncommitted
// at the time transaction, which is closed by now
// we can retry right away
return
setDecision
(
MVMap
.
Decision
.
REPEAT
);
}
else
{
// map already has specified key from uncommitted transaction
// we need to wait for it to close and then try again
return
setDecision
(
MVMap
.
Decision
.
ABORT
);
}
}
}
}
public
static
final
class
LockDecisionMaker
extends
TxDecisionMaker
{
LockDecisionMaker
(
int
mapId
,
Object
key
,
Transaction
transaction
)
{
super
(
mapId
,
key
,
null
,
transaction
);
}
@SuppressWarnings
(
"unchecked"
)
@Override
public
VersionedValue
selectValue
(
VersionedValue
existingValue
,
VersionedValue
providedValue
)
{
return
new
VersionedValue
(
undoKey
,
existingValue
==
null
?
null
:
existingValue
.
value
);
}
}
}
h2/src/main/org/h2/mvstore/tx/VersionedValue.java
浏览文件 @
5ac45917
...
...
@@ -16,6 +16,8 @@ import java.nio.ByteBuffer;
*/
public
class
VersionedValue
{
public
static
final
VersionedValue
DUMMY
=
new
VersionedValue
(
0L
,
new
Object
());
/**
* The operation id.
*/
...
...
h2/src/main/org/h2/table/Table.java
浏览文件 @
5ac45917
...
...
@@ -511,7 +511,7 @@ public abstract class Table extends SchemaObjectBase {
try
{
removeRow
(
session
,
o
);
}
catch
(
DbException
e
)
{
if
(
e
.
getErrorCode
()
==
ErrorCode
.
CONCURRENT_UPDATE_1
)
{
if
(
e
.
getErrorCode
()
==
ErrorCode
.
CONCURRENT_UPDATE_1
||
e
.
getErrorCode
()
==
ErrorCode
.
ROW_NOT_FOUND_WHEN_DELETING_1
)
{
session
.
rollbackTo
(
rollback
,
false
);
session
.
startStatementWithinTransaction
();
rollback
=
session
.
setSavepoint
();
...
...
h2/src/test/org/h2/test/TestBase.java
浏览文件 @
5ac45917
...
...
@@ -151,7 +151,7 @@ public abstract class TestBase {
}
}
catch
(
Throwable
e
)
{
println
(
"FAIL "
+
e
.
toString
());
logError
(
"FAIL "
+
e
.
toString
(),
e
);
logError
(
"FAIL
("
+
conf
+
")
"
+
e
.
toString
(),
e
);
if
(
config
.
stopOnError
)
{
throw
new
AssertionError
(
"ERROR"
);
}
...
...
h2/src/test/org/h2/test/mvcc/TestMvcc4.java
浏览文件 @
5ac45917
...
...
@@ -11,7 +11,6 @@ import java.sql.ResultSet;
import
java.sql.SQLException
;
import
java.sql.Statement
;
import
java.sql.Timestamp
;
import
java.util.Map
;
import
java.util.concurrent.CountDownLatch
;
import
org.h2.test.TestBase
;
...
...
@@ -42,7 +41,8 @@ public class TestMvcc4 extends TestBase {
}
private
void
testSelectForUpdateAndUpdateConcurrency
()
throws
SQLException
{
Connection
setup
=
getConnection
(
"mvcc4"
);
deleteDb
(
"mvcc4"
);
Connection
setup
=
getConnection
(
"mvcc4;MULTI_THREADED=TRUE"
);
setup
.
setAutoCommit
(
false
);
{
...
...
@@ -82,7 +82,13 @@ public class TestMvcc4 extends TestBase {
ps
.
executeQuery
().
next
();
executedUpdate
.
countDown
();
waitForThreadToBlockOnDB
(
mainThread
);
// interrogate new "blocker_id" metatable field instead of
// relying on stacktraces!? to determine when session is blocking
PreparedStatement
stmt
=
c2
.
prepareStatement
(
"SELECT * FROM INFORMATION_SCHEMA.SESSIONS WHERE BLOCKER_ID = SESSION_ID()"
);
ResultSet
resultSet
;
do
{
resultSet
=
stmt
.
executeQuery
();
}
while
(!
resultSet
.
next
());
c2
.
commit
();
c2
.
close
();
...
...
@@ -103,7 +109,7 @@ public class TestMvcc4 extends TestBase {
// for lock case.
PreparedStatement
ps
=
c1
.
prepareStatement
(
"UPDATE test SET lastUpdated = ?"
);
ps
.
setTimestamp
(
1
,
new
Timestamp
(
System
.
currentTimeMillis
()));
ps
.
executeUpdate
(
);
assertEquals
(
2
,
ps
.
executeUpdate
()
);
c1
.
commit
();
c1
.
close
();
...
...
@@ -114,44 +120,12 @@ public class TestMvcc4 extends TestBase {
ps
=
verify
.
prepareStatement
(
"SELECT COUNT(*) FROM test"
);
ResultSet
rs
=
ps
.
executeQuery
();
assertTrue
(
rs
.
next
());
assert
True
(
rs
.
getInt
(
1
)
==
2
);
assert
Equals
(
2
,
rs
.
getInt
(
1
)
);
verify
.
commit
();
verify
.
close
();
setup
.
close
();
}
/**
* Wait for the given thread to block on synchronizing on the database
* object.
*
* @param t the thread
*/
void
waitForThreadToBlockOnDB
(
Thread
t
)
{
while
(
true
)
{
// sleep the first time through the loop so we give the main thread
// a chance
try
{
Thread
.
sleep
(
20
);
}
catch
(
InterruptedException
e1
)
{
// ignore
}
// TODO must not use getAllStackTraces, as the method names are
// implementation details
Map
<
Thread
,
StackTraceElement
[]>
threadMap
=
Thread
.
getAllStackTraces
();
StackTraceElement
[]
elements
=
threadMap
.
get
(
t
);
if
(
elements
!=
null
&&
elements
.
length
>
1
&&
(
config
.
multiThreaded
?
"sleep"
.
equals
(
elements
[
0
]
.
getMethodName
())
:
"wait"
.
equals
(
elements
[
0
]
.
getMethodName
()))
&&
"filterConcurrentUpdate"
.
equals
(
elements
[
1
].
getMethodName
()))
{
return
;
}
}
}
}
...
...
h2/src/test/org/h2/test/mvcc/TestMvccMultiThreaded.java
浏览文件 @
5ac45917
...
...
@@ -7,11 +7,11 @@ package org.h2.test.mvcc;
import
java.sql.Connection
;
import
java.sql.ResultSet
;
import
java.sql.SQLException
;
import
java.sql.Statement
;
import
java.util.ArrayList
;
import
java.util.concurrent.C
ountDownLatch
;
import
java.util.concurrent.C
yclicBarrier
;
import
org.h2.api.ErrorCode
;
import
org.h2.test.TestAll
;
import
org.h2.test.TestBase
;
import
org.h2.util.Task
;
...
...
@@ -36,11 +36,8 @@ public class TestMvccMultiThreaded extends TestBase {
}
testConcurrentSelectForUpdate
();
testMergeWithUniqueKeyViolation
();
// not supported currently
if
(!
config
.
multiThreaded
)
{
testConcurrentMerge
();
testConcurrentUpdate
();
}
testConcurrentMerge
();
testConcurrentUpdate
();
}
private
void
testConcurrentSelectForUpdate
()
throws
Exception
{
...
...
@@ -55,21 +52,11 @@ public class TestMvccMultiThreaded extends TestBase {
Task
task
=
new
Task
()
{
@Override
public
void
call
()
throws
Exception
{
Connection
conn
=
getConnection
(
getTestName
());
Statement
stat
=
conn
.
createStatement
();
try
{
try
(
Connection
conn
=
getConnection
(
getTestName
()))
{
Statement
stat
=
conn
.
createStatement
();
while
(!
stop
)
{
try
{
stat
.
execute
(
"select * from test where id=1 for update"
);
}
catch
(
SQLException
e
)
{
int
errorCode
=
e
.
getErrorCode
();
assertTrue
(
e
.
getMessage
(),
errorCode
==
ErrorCode
.
DEADLOCK_1
||
errorCode
==
ErrorCode
.
LOCK_TIMEOUT_1
);
}
stat
.
execute
(
"select * from test where id=1 for update"
);
}
}
finally
{
conn
.
close
();
}
}
}.
execute
();
...
...
@@ -113,7 +100,6 @@ public class TestMvccMultiThreaded extends TestBase {
conn
.
createStatement
().
execute
(
"create table test(id int primary key, name varchar)"
);
Task
[]
tasks
=
new
Task
[
len
];
final
boolean
[]
stop
=
{
false
};
for
(
int
i
=
0
;
i
<
len
;
i
++)
{
final
Connection
c
=
connList
[
i
];
c
.
setAutoCommit
(
false
);
...
...
@@ -124,14 +110,12 @@ public class TestMvccMultiThreaded extends TestBase {
c
.
createStatement
().
execute
(
"merge into test values(1, 'x')"
);
c
.
commit
();
Thread
.
sleep
(
1
);
}
}
};
tasks
[
i
].
execute
();
}
Thread
.
sleep
(
1000
);
stop
[
0
]
=
true
;
for
(
int
i
=
0
;
i
<
len
;
i
++)
{
tasks
[
i
].
get
();
}
...
...
@@ -157,18 +141,24 @@ public class TestMvccMultiThreaded extends TestBase {
final
int
count
=
1000
;
Task
[]
tasks
=
new
Task
[
len
];
final
C
ountDownLatch
latch
=
new
CountDownLatch
(
len
);
final
C
yclicBarrier
barrier
=
new
CyclicBarrier
(
len
);
for
(
int
i
=
0
;
i
<
len
;
i
++)
{
final
int
x
=
i
;
// Recent changes exposed a race condition in this test itself.
// Without preliminary record locking, counter will be off.
connList
[
x
].
setAutoCommit
(
false
);
tasks
[
i
]
=
new
Task
()
{
@Override
public
void
call
()
throws
Exception
{
for
(
int
a
=
0
;
a
<
count
;
a
++)
{
ResultSet
rs
=
connList
[
x
].
createStatement
().
executeQuery
(
"select value from test for update"
);
assertTrue
(
rs
.
next
());
connList
[
x
].
createStatement
().
execute
(
"update test set value=value+1"
);
latch
.
countDown
();
latch
.
await
();
connList
[
x
].
commit
();
barrier
.
await
();
}
}
};
...
...
编写
预览
Markdown
格式
0%
重试
或
添加新文件
添加附件
取消
您添加了
0
人
到此讨论。请谨慎行事。
请先完成此评论的编辑!
取消
请
注册
或者
登录
后发表评论