Skip to content
项目
群组
代码片段
帮助
正在加载...
帮助
为 GitLab 提交贡献
登录/注册
切换导航
H
h2database
项目
项目
详情
活动
周期分析
仓库
仓库
文件
提交
分支
标签
贡献者
分枝图
比较
统计图
议题
0
议题
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
CI / CD
CI / CD
流水线
作业
计划
统计图
Wiki
Wiki
代码片段
代码片段
成员
成员
折叠边栏
关闭边栏
活动
分枝图
统计图
创建新议题
作业
提交
议题看板
打开侧边栏
Administrator
h2database
Commits
4b0ce7b8
提交
4b0ce7b8
authored
2月 27, 2013
作者:
Thomas Mueller
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
MVStore: support concurrent transactions (two-phase commit)
上级
bf7e2a7a
显示空白字符变更
内嵌
并排
正在显示
4 个修改的文件
包含
472 行增加
和
99 行删除
+472
-99
mvstore.html
h2/src/docsrc/html/mvstore.html
+2
-1
MVStore.java
h2/src/main/org/h2/mvstore/MVStore.java
+1
-1
TransactionStore.java
h2/src/main/org/h2/mvstore/TransactionStore.java
+324
-85
TestTransactionStore.java
h2/src/test/org/h2/test/store/TestTransactionStore.java
+145
-12
没有找到文件。
h2/src/docsrc/html/mvstore.html
浏览文件 @
4b0ce7b8
...
@@ -56,7 +56,7 @@ But it can be also directly within an application, without using JDBC or SQL.
...
@@ -56,7 +56,7 @@ But it can be also directly within an application, without using JDBC or SQL.
</li><li>
Both file-based persistence and in-memory operation are supported.
</li><li>
Both file-based persistence and in-memory operation are supported.
</li><li>
It is intended to be fast, simple to use, and small.
</li><li>
It is intended to be fast, simple to use, and small.
</li><li>
Old versions of the data can be read concurrently with all other operations.
</li><li>
Old versions of the data can be read concurrently with all other operations.
</li><li>
Transaction are supported.
</li><li>
Transaction are supported
(including concurrent transactions and 2-phase commit)
.
</li><li>
The tool is very modular. It supports pluggable data types / serialization,
</li><li>
The tool is very modular. It supports pluggable data types / serialization,
pluggable map implementations (B-tree, R-tree, concurrent B-tree currently), BLOB storage,
pluggable map implementations (B-tree, R-tree, concurrent B-tree currently), BLOB storage,
and a file system abstraction to support encrypted files and zip files.
and a file system abstraction to support encrypted files and zip files.
...
@@ -227,6 +227,7 @@ and the entries of a transaction are removed when the transaction is committed).
...
@@ -227,6 +227,7 @@ and the entries of a transaction are removed when the transaction is committed).
The storage overhead of this utility is very small compared to the overhead of a regular transaction log.
The storage overhead of this utility is very small compared to the overhead of a regular transaction log.
The tool supports PostgreSQL style "read committed" transaction isolation.
The tool supports PostgreSQL style "read committed" transaction isolation.
There is no limit on the size of a transaction (the log is not kept in memory).
There is no limit on the size of a transaction (the log is not kept in memory).
The tool supports savepoints, two-phase commit, and other features typically available in a database.
</p>
</p>
<h3
id=
"inMemory"
>
In-Memory Performance and Usage
</h3>
<h3
id=
"inMemory"
>
In-Memory Performance and Usage
</h3>
...
...
h2/src/main/org/h2/mvstore/MVStore.java
浏览文件 @
4b0ce7b8
...
@@ -44,8 +44,8 @@ H:3,...
...
@@ -44,8 +44,8 @@ H:3,...
TODO:
TODO:
- rolling docs review: at convert "Features" to top-level (linked) entries
- rolling docs review: at convert "Features" to top-level (linked) entries
- mvcc with multiple transactions
- mvcc with multiple transactions
- test LZ4
- additional test async write / read algorithm for speed and errors
- additional test async write / read algorithm for speed and errors
- move setters to the builder, except for setRetainVersion, setReuseSpace,
- move setters to the builder, except for setRetainVersion, setReuseSpace,
and settings that are persistent (setStoreVersion)
and settings that are persistent (setStoreVersion)
...
...
h2/src/main/org/h2/mvstore/TransactionStore.java
浏览文件 @
4b0ce7b8
...
@@ -6,24 +6,36 @@
...
@@ -6,24 +6,36 @@
*/
*/
package
org
.
h2
.
mvstore
;
package
org
.
h2
.
mvstore
;
import
java.util.ArrayList
;
import
java.util.HashMap
;
import
java.util.List
;
import
java.util.Map
;
import
java.util.Map
;
import
org.h2.util.New
;
/**
/**
* A store that supports concurrent transactions.
* A store that supports concurrent transactions.
*/
*/
public
class
TransactionStore
{
public
class
TransactionStore
{
private
static
final
String
LAST_TRANSACTION_ID
=
"lastTransactionId"
;
/**
/**
* The store.
* The store.
*/
*/
final
MVStore
store
;
final
MVStore
store
;
/**
/**
* The map of open transaction.
* The
persisted
map of open transaction.
* Key: transactionId, value:
baseVersion
.
* Key: transactionId, value:
[ status, nameÊ]
.
*/
*/
final
MVMap
<
Long
,
Long
>
openTransactions
;
final
MVMap
<
Long
,
Object
[]>
openTransactions
;
/**
* The map of open transaction objects.
* Key: transactionId, value: transaction object.
*/
final
HashMap
<
Long
,
Transaction
>
openTransactionMap
=
New
.
hashMap
();
/**
/**
* The undo log.
* The undo log.
...
@@ -41,6 +53,8 @@ public class TransactionStore {
...
@@ -41,6 +53,8 @@ public class TransactionStore {
*/
*/
private
final
MVMap
<
String
,
String
>
settings
;
private
final
MVMap
<
String
,
String
>
settings
;
private
long
lastTransactionIdStored
;
private
long
lastTransactionId
;
private
long
lastTransactionId
;
/**
/**
...
@@ -52,33 +66,59 @@ public class TransactionStore {
...
@@ -52,33 +66,59 @@ public class TransactionStore {
this
.
store
=
store
;
this
.
store
=
store
;
settings
=
store
.
openMap
(
"settings"
);
settings
=
store
.
openMap
(
"settings"
);
openTransactions
=
store
.
openMap
(
"openTransactions"
,
openTransactions
=
store
.
openMap
(
"openTransactions"
,
new
MVMapConcurrent
.
Builder
<
Long
,
Long
>());
new
MVMapConcurrent
.
Builder
<
Long
,
Object
[]
>());
//
TODO one undo log per transaction to speed up commit
//
commit could be faster if we have one undo log per transaction,
//
(alternative: add a range delete operation for maps)
//
or a range delete operation for maps
undoLog
=
store
.
openMap
(
"undoLog"
,
undoLog
=
store
.
openMap
(
"undoLog"
,
new
MVMapConcurrent
.
Builder
<
long
[],
Object
[]>());
new
MVMapConcurrent
.
Builder
<
long
[],
Object
[]>());
init
();
init
();
}
}
private
void
init
()
{
private
void
init
()
{
String
s
=
settings
.
get
(
"lastTransaction"
);
String
s
=
settings
.
get
(
LAST_TRANSACTION_ID
);
if
(
s
!=
null
)
{
if
(
s
!=
null
)
{
lastTransactionId
=
Long
.
parseLong
(
s
);
lastTransactionId
=
Long
.
parseLong
(
s
);
lastTransactionIdStored
=
lastTransactionId
;
}
}
Long
t
=
openTransactions
.
lastKey
();
Long
lastKey
=
openTransactions
.
lastKey
();
if
(
t
!=
null
)
{
if
(
lastKey
!=
null
&&
lastKey
.
longValue
()
>
lastTransactionId
)
{
if
(
t
.
longValue
()
>
lastTransactionId
)
{
throw
DataUtils
.
newIllegalStateException
(
"Last transaction not stored"
);
throw
DataUtils
.
newIllegalStateException
(
"Last transaction not stored"
);
}
}
// TODO rollback all old, stored transactions (if there are any)
Cursor
<
Long
>
cursor
=
openTransactions
.
keyIterator
(
null
);
while
(
cursor
.
hasNext
())
{
long
id
=
cursor
.
next
();
Object
[]
data
=
openTransactions
.
get
(
id
);
int
status
=
(
Integer
)
data
[
0
];
String
name
=
(
String
)
data
[
1
];
long
[]
next
=
{
id
+
1
,
0
};
long
[]
last
=
undoLog
.
floorKey
(
next
);
if
(
last
==
null
)
{
// no entry
}
else
if
(
last
[
0
]
==
id
)
{
Transaction
t
=
new
Transaction
(
this
,
id
,
status
,
name
,
last
[
1
]);
openTransactionMap
.
put
(
id
,
t
);
}
}
}
}
}
/**
* Get the list of currently open transactions.
*
* @return the list of transactions
*/
public
synchronized
List
<
Transaction
>
getOpenTransactions
()
{
ArrayList
<
Transaction
>
list
=
New
.
arrayList
();
list
.
addAll
(
openTransactionMap
.
values
());
return
list
;
}
/**
/**
* Close the transaction store.
* Close the transaction store.
*/
*/
public
synchronized
void
close
()
{
public
synchronized
void
close
()
{
settings
.
put
(
"lastTransaction"
,
""
+
lastTransactionId
);
// to avoid losing transaction ids (so just cosmetic)
settings
.
put
(
LAST_TRANSACTION_ID
,
""
+
lastTransactionId
);
store
.
commit
();
}
}
/**
/**
...
@@ -87,14 +127,43 @@ public class TransactionStore {
...
@@ -87,14 +127,43 @@ public class TransactionStore {
* @return the transaction
* @return the transaction
*/
*/
public
synchronized
Transaction
begin
()
{
public
synchronized
Transaction
begin
()
{
long
baseVersion
=
store
.
getCurrentVersion
();
store
.
incrementVersion
();
store
.
incrementVersion
();
long
transactionId
=
lastTransactionId
++;
long
transactionId
=
lastTransactionId
++;
if
(
lastTransactionId
%
32
==
0
)
{
if
(
lastTransactionId
>
lastTransactionIdStored
)
{
settings
.
put
(
"lastTransaction"
,
""
+
lastTransactionId
+
32
);
lastTransactionIdStored
+=
32
;
settings
.
put
(
LAST_TRANSACTION_ID
,
""
+
lastTransactionIdStored
);
}
int
status
=
Transaction
.
STATUS_OPEN
;
Object
[]
v
=
{
status
,
null
};
openTransactions
.
put
(
transactionId
,
v
);
Transaction
t
=
new
Transaction
(
this
,
transactionId
,
status
,
null
,
0
);
openTransactionMap
.
put
(
transactionId
,
t
);
return
t
;
}
/**
* Prepare a transaction.
*
* @param transactionId the transaction id
*/
void
prepare
(
long
transactionId
)
{
Object
[]
old
=
openTransactions
.
get
(
transactionId
);
Object
[]
v
=
{
Transaction
.
STATUS_PREPARED
,
old
[
1
]
};
openTransactions
.
put
(
transactionId
,
v
);
store
.
commit
();
}
}
openTransactions
.
put
(
transactionId
,
baseVersion
);
return
new
Transaction
(
this
,
transactionId
);
/**
* Set the name of a transaction.
*
* @param transactionId the transaction id
* @param name the new name
*/
void
setTransactionName
(
long
transactionId
,
String
name
)
{
Object
[]
old
=
openTransactions
.
get
(
transactionId
);
Object
[]
v
=
{
old
[
0
],
name
};
openTransactions
.
put
(
transactionId
,
v
);
store
.
commit
();
}
}
/**
/**
...
@@ -104,27 +173,30 @@ public class TransactionStore {
...
@@ -104,27 +173,30 @@ public class TransactionStore {
* @param maxLogId the last log id
* @param maxLogId the last log id
*/
*/
void
commit
(
long
transactionId
,
long
maxLogId
)
{
void
commit
(
long
transactionId
,
long
maxLogId
)
{
// TODO commit should be much faster
store
.
incrementVersion
();
store
.
incrementVersion
();
for
(
long
logId
=
0
;
logId
<
maxLogId
;
logId
++)
{
for
(
long
logId
=
0
;
logId
<
maxLogId
;
logId
++)
{
Object
[]
op
=
undoLog
.
get
(
new
long
[]
{
Object
[]
op
=
undoLog
.
get
(
new
long
[]
{
transactionId
,
logId
});
transactionId
,
logId
});
int
mapId
=
((
Integer
)
op
[
1
]).
intValue
();
int
opType
=
(
Integer
)
op
[
0
];
if
(
opType
==
Transaction
.
OP_REMOVE
)
{
int
mapId
=
(
Integer
)
op
[
1
];
Map
<
String
,
String
>
meta
=
store
.
getMetaMap
();
Map
<
String
,
String
>
meta
=
store
.
getMetaMap
();
String
m
=
meta
.
get
(
"map."
+
mapId
);
String
m
=
meta
.
get
(
"map."
+
mapId
);
String
mapName
=
DataUtils
.
parseMap
(
m
).
get
(
"name"
);
String
mapName
=
DataUtils
.
parseMap
(
m
).
get
(
"name"
);
MVMap
<
Object
,
Object
[]>
map
=
store
.
openMap
(
mapName
);
MVMap
<
Object
,
Object
[]>
map
=
store
.
openMap
(
mapName
);
Object
key
=
op
[
2
];
Object
key
=
op
[
2
];
Object
[]
value
=
map
.
get
(
key
);
Object
[]
value
=
map
.
get
(
key
);
if
(
value
==
null
)
{
// possibly the entry was added later on
//
already removed
//
so we have to check
}
else
if
(
value
[
2
]
==
null
)
{
if
(
value
[
2
]
==
null
)
{
// remove the value
// remove the value
map
.
remove
(
key
);
map
.
remove
(
key
);
}
}
}
undoLog
.
remove
(
logId
);
undoLog
.
remove
(
logId
);
}
}
openTransactions
.
remove
(
transactionId
);
openTransactions
.
remove
(
transactionId
);
openTransactionMap
.
remove
(
transactionId
);
store
.
commit
();
store
.
commit
();
}
}
...
@@ -137,6 +209,7 @@ public class TransactionStore {
...
@@ -137,6 +209,7 @@ public class TransactionStore {
void
rollback
(
long
transactionId
,
long
maxLogId
)
{
void
rollback
(
long
transactionId
,
long
maxLogId
)
{
rollbackTo
(
transactionId
,
maxLogId
,
0
);
rollbackTo
(
transactionId
,
maxLogId
,
0
);
openTransactions
.
remove
(
transactionId
);
openTransactions
.
remove
(
transactionId
);
openTransactionMap
.
remove
(
transactionId
);
store
.
commit
();
store
.
commit
();
}
}
...
@@ -188,6 +261,26 @@ public class TransactionStore {
...
@@ -188,6 +261,26 @@ public class TransactionStore {
*/
*/
public
static
class
Transaction
{
public
static
class
Transaction
{
/**
* The status of an open transaction.
*/
public
static
final
int
STATUS_OPEN
=
0
;
/**
* The status of a prepared transaction.
*/
public
static
final
int
STATUS_PREPARED
=
1
;
/**
* The status of a closed transaction (committed or rolled back).
*/
public
static
final
int
STATUS_CLOSED
=
2
;
/**
* The operation type for changes in a map.
*/
static
final
int
OP_REMOVE
=
0
,
OP_ADD
=
1
,
OP_SET
=
2
;
/**
/**
* The transaction store.
* The transaction store.
*/
*/
...
@@ -198,12 +291,56 @@ public class TransactionStore {
...
@@ -198,12 +291,56 @@ public class TransactionStore {
*/
*/
final
long
transactionId
;
final
long
transactionId
;
private
int
status
;
private
String
name
;
private
long
logId
;
private
long
logId
;
private
boolean
closed
;
Transaction
(
TransactionStore
store
,
long
transactionId
)
{
Transaction
(
TransactionStore
store
,
long
transactionId
,
int
status
,
String
name
,
long
logId
)
{
this
.
store
=
store
;
this
.
store
=
store
;
this
.
transactionId
=
transactionId
;
this
.
transactionId
=
transactionId
;
this
.
status
=
status
;
this
.
name
=
name
;
this
.
logId
=
logId
;
}
/**
* Get the transaction id.
*
* @return the transaction id
*/
public
long
getId
()
{
return
transactionId
;
}
/**
* Get the transaction status.
*
* @return the status
*/
public
int
getStatus
()
{
return
status
;
}
/**
* Set the name of the transaction.
*
* @param name the new name
*/
public
void
setName
(
String
name
)
{
checkOpen
();
store
.
setTransactionName
(
transactionId
,
name
);
this
.
name
=
name
;
}
/**
* Get the name of the transaction.
*
* @return name the name
*/
public
String
getName
()
{
return
name
;
}
}
/**
/**
...
@@ -212,6 +349,7 @@ public class TransactionStore {
...
@@ -212,6 +349,7 @@ public class TransactionStore {
* @return the savepoint id
* @return the savepoint id
*/
*/
public
long
setSavepoint
()
{
public
long
setSavepoint
()
{
checkOpen
();
store
.
store
.
incrementVersion
();
store
.
store
.
incrementVersion
();
return
logId
;
return
logId
;
}
}
...
@@ -219,18 +357,18 @@ public class TransactionStore {
...
@@ -219,18 +357,18 @@ public class TransactionStore {
/**
/**
* Add a log entry.
* Add a log entry.
*
*
* @param
baseVersion the old version
* @param
opType the operation type
* @param mapId the map id
* @param mapId the map id
* @param key the key
* @param key the key
*/
*/
void
log
(
long
baseVersion
,
int
mapId
,
Object
key
)
{
void
log
(
int
opType
,
int
mapId
,
Object
key
)
{
long
[]
undoKey
=
{
transactionId
,
logId
++
};
long
[]
undoKey
=
{
transactionId
,
logId
++
};
Object
[]
log
=
new
Object
[]
{
baseVersion
,
mapId
,
key
};
Object
[]
log
=
new
Object
[]
{
opType
,
mapId
,
key
};
store
.
undoLog
.
put
(
undoKey
,
log
);
store
.
undoLog
.
put
(
undoKey
,
log
);
}
}
/**
/**
* Open a data map.
* Open a data map
where reads are always up to date
.
*
*
* @param <K> the key type
* @param <K> the key type
* @param <V> the value type
* @param <V> the value type
...
@@ -238,40 +376,73 @@ public class TransactionStore {
...
@@ -238,40 +376,73 @@ public class TransactionStore {
* @return the transaction map
* @return the transaction map
*/
*/
public
<
K
,
V
>
TransactionMap
<
K
,
V
>
openMap
(
String
name
)
{
public
<
K
,
V
>
TransactionMap
<
K
,
V
>
openMap
(
String
name
)
{
return
new
TransactionMap
<
K
,
V
>(
this
,
name
);
checkOpen
();
return
new
TransactionMap
<
K
,
V
>(
this
,
name
,
-
1
);
}
/**
* Open a data map where reads are based on the specified version / savepoint.
*
* @param <K> the key type
* @param <V> the value type
* @param name the name of the map
* @param readVersion the version used for reading
* @return the transaction map
*/
public
<
K
,
V
>
TransactionMap
<
K
,
V
>
openMap
(
String
name
,
long
readVersion
)
{
checkOpen
();
// TODO read from a stable version of the data within
// one 'statement'
return
new
TransactionMap
<
K
,
V
>(
this
,
name
,
readVersion
);
}
/**
* Roll back to the given savepoint. This is only allowed if the
* transaction is open.
*
* @param savepointId the savepoint id
*/
public
void
rollbackToSavepoint
(
long
savepointId
)
{
checkOpen
();
store
.
rollbackTo
(
transactionId
,
this
.
logId
,
savepointId
);
this
.
logId
=
savepointId
;
}
/**
* Prepare the transaction. Afterwards, the transaction can only be
* committed or rolled back.
*/
public
void
prepare
()
{
checkOpen
();
store
.
prepare
(
transactionId
);
status
=
STATUS_PREPARED
;
}
}
/**
/**
* Commit the transaction. Afterwards, this transaction is closed.
* Commit the transaction. Afterwards, this transaction is closed.
*/
*/
public
void
commit
()
{
public
void
commit
()
{
closed
=
true
;
if
(
status
!=
STATUS_CLOSED
)
{
store
.
commit
(
transactionId
,
logId
);
store
.
commit
(
transactionId
,
logId
);
status
=
STATUS_CLOSED
;
}
}
}
/**
/**
* Roll the transaction back. Afterwards, this transaction is closed.
* Roll the transaction back. Afterwards, this transaction is closed.
*/
*/
public
void
rollback
()
{
public
void
rollback
()
{
closed
=
true
;
if
(
status
!=
STATUS_CLOSED
)
{
store
.
rollback
(
transactionId
,
logId
);
store
.
rollback
(
transactionId
,
logId
);
status
=
STATUS_CLOSED
;
}
}
/**
* Roll back to the given savepoint.
*
* @param savepointId the savepoint id
*/
public
void
rollbackToSavepoint
(
long
savepointId
)
{
store
.
rollbackTo
(
transactionId
,
this
.
logId
,
savepointId
);
this
.
logId
=
savepointId
;
}
}
/**
/**
* Check whether this transaction is still open.
* Check whether this transaction is still open.
*/
*/
void
checkOpen
()
{
void
checkOpen
()
{
if
(
closed
)
{
if
(
status
!=
STATUS_OPEN
)
{
throw
DataUtils
.
newIllegalStateException
(
"Transaction is closed"
);
throw
DataUtils
.
newIllegalStateException
(
"Transaction is closed"
);
}
}
}
}
...
@@ -288,18 +459,31 @@ public class TransactionStore {
...
@@ -288,18 +459,31 @@ public class TransactionStore {
private
Transaction
transaction
;
private
Transaction
transaction
;
private
final
int
mapId
;
/**
/**
* The
newest version of the data
.
* The
map used for writing (the latest version)
.
* Key: key.
* Key: key
the key of the data
.
* Value: { transactionId, oldVersion, value }
* Value: { transactionId, oldVersion, value }
*/
*/
private
final
MVMap
<
K
,
Object
[]>
map
;
private
final
MVMap
<
K
,
Object
[]>
mapWrite
;
private
final
int
mapId
;
TransactionMap
(
Transaction
transaction
,
String
name
)
{
/**
* The map used for reading (possibly an older version).
* Key: key the key of the data.
* Value: { transactionId, oldVersion, value }
*/
private
final
MVMap
<
K
,
Object
[]>
mapRead
;
TransactionMap
(
Transaction
transaction
,
String
name
,
long
readVersion
)
{
this
.
transaction
=
transaction
;
this
.
transaction
=
transaction
;
map
=
transaction
.
store
.
store
.
openMap
(
name
);
mapWrite
=
transaction
.
store
.
store
.
openMap
(
name
);
mapId
=
map
.
getId
();
mapId
=
mapWrite
.
getId
();
if
(
readVersion
>=
0
)
{
mapRead
=
mapWrite
.
openVersion
(
readVersion
);
}
else
{
mapRead
=
mapWrite
;
}
}
}
/**
/**
...
@@ -307,10 +491,10 @@ public class TransactionStore {
...
@@ -307,10 +491,10 @@ public class TransactionStore {
*
*
* @return the size
* @return the size
*/
*/
public
long
s
ize
()
{
public
long
getS
ize
()
{
// TODO this method is very slow
// TODO this method is very slow
long
size
=
0
;
long
size
=
0
;
Cursor
<
K
>
cursor
=
map
.
keyIterator
(
null
);
Cursor
<
K
>
cursor
=
map
Read
.
keyIterator
(
null
);
while
(
cursor
.
hasNext
())
{
while
(
cursor
.
hasNext
())
{
K
key
=
cursor
.
next
();
K
key
=
cursor
.
next
();
if
(
get
(
key
)
!=
null
)
{
if
(
get
(
key
)
!=
null
)
{
...
@@ -325,20 +509,41 @@ public class TransactionStore {
...
@@ -325,20 +509,41 @@ public class TransactionStore {
}
}
/**
/**
* Update the value for the given key. If the row is locked, this method
* Remove an entry.
* <p>
* If the row is locked, this method
* will retry until the row could be updated or until a lock timeout.
*
* @param key the key
* @throws IllegalStateException if a lock timeout occurs
*/
public
V
remove
(
K
key
)
{
return
set
(
key
,
null
);
}
/**
* Update the value for the given key.
* <p>
* If the row is locked, this method
* will retry until the row could be updated or until a lock timeout.
* will retry until the row could be updated or until a lock timeout.
*
*
* @param key the key
* @param key the key
* @param value the new value (null to remove the row)
* @param value the new value (null to remove the row)
* @throws IllegalStateException if a lock timeout occurs
* @throws IllegalStateException if a lock timeout occurs
*/
*/
public
void
put
(
K
key
,
V
value
)
{
public
V
put
(
K
key
,
V
value
)
{
DataUtils
.
checkArgument
(
value
!=
null
,
"The value may not be null"
);
return
set
(
key
,
value
);
}
public
V
set
(
K
key
,
V
value
)
{
checkOpen
();
checkOpen
();
long
start
=
0
;
long
start
=
0
;
while
(
true
)
{
while
(
true
)
{
boolean
ok
=
tryPut
(
key
,
value
);
V
old
=
get
(
key
);
boolean
ok
=
trySet
(
key
,
value
);
if
(
ok
)
{
if
(
ok
)
{
return
;
return
old
;
}
}
// an uncommitted transaction:
// an uncommitted transaction:
// wait until it is committed, or until the lock timeout
// wait until it is committed, or until the lock timeout
...
@@ -364,24 +569,58 @@ public class TransactionStore {
...
@@ -364,24 +569,58 @@ public class TransactionStore {
}
}
/**
/**
* Try to update the value for the given key. This will fail if the row
* Try to remove the value for the given key.
* is not locked by another transaction (that means, if another open
* <p>
* transaction added or updated the row).
* This will fail if the row is locked by another transaction (that
* means, if another open transaction changed the row).
*
* @param key the key
* @return whether the entry could be removed
*/
public
boolean
tryRemove
(
K
key
)
{
return
trySet
(
key
,
null
);
}
/**
* Try to update the value for the given key.
* <p>
* This will fail if the row is locked by another transaction (that
* means, if another open transaction changed the row).
*
*
* @param key the key
* @param key the key
* @param value the new value
* @param value the new value
* @return whether the
value
could be updated
* @return whether the
entry
could be updated
*/
*/
public
boolean
tryPut
(
K
key
,
V
value
)
{
public
boolean
tryPut
(
K
key
,
V
value
)
{
Object
[]
current
=
map
.
get
(
key
);
DataUtils
.
checkArgument
(
value
!=
null
,
"The value may not be null"
);
return
trySet
(
key
,
value
);
}
private
boolean
trySet
(
K
key
,
V
value
)
{
Object
[]
current
=
mapWrite
.
get
(
key
);
long
oldVersion
=
transaction
.
store
.
store
.
getCurrentVersion
()
-
1
;
long
oldVersion
=
transaction
.
store
.
store
.
getCurrentVersion
()
-
1
;
int
opType
;
if
(
current
==
null
||
current
[
2
]
==
null
)
{
if
(
value
==
null
)
{
// remove a removed value
opType
=
Transaction
.
OP_SET
;
}
else
{
opType
=
Transaction
.
OP_ADD
;
}
}
else
{
if
(
value
==
null
)
{
opType
=
Transaction
.
OP_REMOVE
;
}
else
{
opType
=
Transaction
.
OP_SET
;
}
}
Object
[]
newValue
=
{
transaction
.
transactionId
,
oldVersion
,
value
};
Object
[]
newValue
=
{
transaction
.
transactionId
,
oldVersion
,
value
};
if
(
current
==
null
)
{
if
(
current
==
null
)
{
// a new value
// a new value
newValue
[
1
]
=
null
;
newValue
[
1
]
=
null
;
Object
[]
old
=
map
.
putIfAbsent
(
key
,
newValue
);
Object
[]
old
=
map
Write
.
putIfAbsent
(
key
,
newValue
);
if
(
old
==
null
)
{
if
(
old
==
null
)
{
transaction
.
log
(
o
ldVersion
,
mapId
,
key
);
transaction
.
log
(
o
pType
,
mapId
,
key
);
return
true
;
return
true
;
}
}
return
false
;
return
false
;
...
@@ -389,13 +628,13 @@ public class TransactionStore {
...
@@ -389,13 +628,13 @@ public class TransactionStore {
long
tx
=
((
Long
)
current
[
0
]).
longValue
();
long
tx
=
((
Long
)
current
[
0
]).
longValue
();
if
(
tx
==
transaction
.
transactionId
)
{
if
(
tx
==
transaction
.
transactionId
)
{
// added or updated by this transaction
// added or updated by this transaction
if
(
map
.
replace
(
key
,
current
,
newValue
))
{
if
(
map
Write
.
replace
(
key
,
current
,
newValue
))
{
if
(
current
[
1
]
==
null
)
{
if
(
current
[
1
]
==
null
)
{
transaction
.
log
(
o
ldVersion
,
mapId
,
key
);
transaction
.
log
(
o
pType
,
mapId
,
key
);
}
else
{
}
else
{
long
c
=
(
Long
)
current
[
1
];
long
c
=
(
Long
)
current
[
1
];
if
(
c
!=
oldVersion
)
{
if
(
c
!=
oldVersion
)
{
transaction
.
log
(
o
ldVersion
,
mapId
,
key
);
transaction
.
log
(
o
pType
,
mapId
,
key
);
}
}
}
}
return
true
;
return
true
;
...
@@ -405,12 +644,12 @@ public class TransactionStore {
...
@@ -405,12 +644,12 @@ public class TransactionStore {
return
false
;
return
false
;
}
}
// added or updated by another transaction
// added or updated by another transaction
Long
base
=
transaction
.
store
.
openTransactions
.
get
(
tx
);
boolean
open
=
transaction
.
store
.
openTransactions
.
containsKey
(
tx
);
if
(
base
==
null
)
{
if
(
!
open
)
{
// the transaction is committed:
// the transaction is committed:
// overwrite the value
// overwrite the value
if
(
map
.
replace
(
key
,
current
,
newValue
))
{
if
(
map
Write
.
replace
(
key
,
current
,
newValue
))
{
transaction
.
log
(
o
ldVersion
,
mapId
,
key
);
transaction
.
log
(
o
pType
,
mapId
,
key
);
return
true
;
return
true
;
}
}
// somebody else was faster
// somebody else was faster
...
@@ -430,7 +669,7 @@ public class TransactionStore {
...
@@ -430,7 +669,7 @@ public class TransactionStore {
public
public
V
get
(
K
key
)
{
V
get
(
K
key
)
{
checkOpen
();
checkOpen
();
MVMap
<
K
,
Object
[]>
m
=
map
;
MVMap
<
K
,
Object
[]>
m
=
map
Read
;
while
(
true
)
{
while
(
true
)
{
Object
[]
data
=
m
.
get
(
key
);
Object
[]
data
=
m
.
get
(
key
);
long
tx
;
long
tx
;
...
@@ -444,8 +683,8 @@ public class TransactionStore {
...
@@ -444,8 +683,8 @@ public class TransactionStore {
return
(
V
)
data
[
2
];
return
(
V
)
data
[
2
];
}
}
// added or updated by another transaction
// added or updated by another transaction
Long
base
=
transaction
.
store
.
openTransactions
.
get
(
tx
);
boolean
open
=
transaction
.
store
.
openTransactions
.
containsKey
(
tx
);
if
(
base
==
null
)
{
if
(
!
open
)
{
// it is committed
// it is committed
return
(
V
)
data
[
2
];
return
(
V
)
data
[
2
];
}
}
...
@@ -456,7 +695,7 @@ public class TransactionStore {
...
@@ -456,7 +695,7 @@ public class TransactionStore {
return
null
;
return
null
;
}
}
long
oldVersion
=
(
Long
)
data
[
1
];
long
oldVersion
=
(
Long
)
data
[
1
];
m
=
map
.
openVersion
(
oldVersion
);
m
=
map
Write
.
openVersion
(
oldVersion
);
}
}
}
}
...
...
h2/src/test/org/h2/test/store/TestTransactionStore.java
浏览文件 @
4b0ce7b8
...
@@ -12,12 +12,14 @@ import java.sql.ResultSet;
...
@@ -12,12 +12,14 @@ import java.sql.ResultSet;
import
java.sql.SQLException
;
import
java.sql.SQLException
;
import
java.sql.Statement
;
import
java.sql.Statement
;
import
java.util.ArrayList
;
import
java.util.ArrayList
;
import
java.util.List
;
import
java.util.Random
;
import
java.util.Random
;
import
org.h2.mvstore.MVStore
;
import
org.h2.mvstore.MVStore
;
import
org.h2.mvstore.TransactionStore
;
import
org.h2.mvstore.TransactionStore
;
import
org.h2.mvstore.TransactionStore.Transaction
;
import
org.h2.mvstore.TransactionStore.Transaction
;
import
org.h2.mvstore.TransactionStore.TransactionMap
;
import
org.h2.mvstore.TransactionStore.TransactionMap
;
import
org.h2.store.fs.FileUtils
;
import
org.h2.test.TestBase
;
import
org.h2.test.TestBase
;
import
org.h2.util.New
;
import
org.h2.util.New
;
...
@@ -36,12 +38,143 @@ public class TestTransactionStore extends TestBase {
...
@@ -36,12 +38,143 @@ public class TestTransactionStore extends TestBase {
}
}
public
void
test
()
throws
Exception
{
public
void
test
()
throws
Exception
{
// testMultiStatement();
testTwoPhaseCommit
();
testSavepoint
();
testSavepoint
();
testConcurrentTransactionsReadCommitted
();
testConcurrentTransactionsReadCommitted
();
testSingleConnection
();
testSingleConnection
();
testCompareWithPostgreSQL
();
testCompareWithPostgreSQL
();
}
}
/**
* Tests behavior when used for a sequence of SQL statements. Each statement
* uses a savepoint. Within a statement, a change by the statement itself is
* not seen; the change is only seen when the statement finished.
*/
// private void testMultiStatement() {
// MVStore s = MVStore.open(null);
// TransactionStore ts = new TransactionStore(s);
// Transaction tx;
// TransactionMap<String, String> m;
// long startUpdate;
//
// tx = ts.begin();
//
// // start of statement
// // insert into test(id, name) values(1, 'Hello'), (2, 'World')
// startUpdate = tx.setSavepoint();
// m = tx.openMap("test", startUpdate);
// TODO putIfAbsent
// m.put("1", "Hello");
// m.put("2", "World");
// // not seen yet
// assertNull(m.get("1"));
//
// // start of statement
// startUpdate = tx.setSavepoint();
// // update test set primaryKey = primaryKey + 1
// m = tx.openMap("test", startUpdate);
//
// for (Cursor)
//
// tx.commit();
//
//
//
// m.put("2", "World");
// m.put("1", "Hallo");
// m.remove("2");
// m.put("3", "!");
// long logId = tx.setSavepoint();
// m.put("1", "Hi");
// m.put("2", ".");
// m.remove("3");
// tx.rollbackToSavepoint(logId);
// assertEquals("Hallo", m.get("1"));
// assertNull(m.get("2"));
// assertEquals("!", m.get("3"));
// tx.rollback();
//
// tx = ts.begin();
// m = tx.openMap("test");
// assertNull(m.get("1"));
// assertNull(m.get("2"));
// assertNull(m.get("3"));
//
// ts.close();
// s.close();
// }
private
void
testTwoPhaseCommit
()
throws
Exception
{
String
fileName
=
getBaseDir
()
+
"/testTwoPhaseCommit.h3"
;
FileUtils
.
delete
(
fileName
);
MVStore
s
;
TransactionStore
ts
;
Transaction
tx
;
Transaction
txOld
;
TransactionMap
<
String
,
String
>
m
;
List
<
Transaction
>
list
;
s
=
MVStore
.
open
(
fileName
);
ts
=
new
TransactionStore
(
s
);
tx
=
ts
.
begin
();
assertEquals
(
null
,
tx
.
getName
());
tx
.
setName
(
"first transaction"
);
assertEquals
(
"first transaction"
,
tx
.
getName
());
assertEquals
(
0
,
tx
.
getId
());
assertEquals
(
Transaction
.
STATUS_OPEN
,
tx
.
getStatus
());
m
=
tx
.
openMap
(
"test"
);
m
.
put
(
"1"
,
"Hello"
);
list
=
ts
.
getOpenTransactions
();
assertEquals
(
1
,
list
.
size
());
txOld
=
list
.
get
(
0
);
assertTrue
(
tx
==
txOld
);
s
.
commit
();
ts
.
close
();
s
.
close
();
s
=
MVStore
.
open
(
fileName
);
ts
=
new
TransactionStore
(
s
);
tx
=
ts
.
begin
();
assertEquals
(
1
,
tx
.
getId
());
m
=
tx
.
openMap
(
"test"
);
assertEquals
(
null
,
m
.
get
(
"1"
));
list
=
ts
.
getOpenTransactions
();
assertEquals
(
2
,
list
.
size
());
txOld
=
list
.
get
(
0
);
assertEquals
(
0
,
txOld
.
getId
());
assertEquals
(
Transaction
.
STATUS_OPEN
,
txOld
.
getStatus
());
assertEquals
(
"first transaction"
,
txOld
.
getName
());
txOld
.
prepare
();
assertEquals
(
Transaction
.
STATUS_PREPARED
,
txOld
.
getStatus
());
txOld
=
list
.
get
(
1
);
assertEquals
(
1
,
txOld
.
getId
());
assertNull
(
txOld
.
getName
());
assertEquals
(
Transaction
.
STATUS_OPEN
,
txOld
.
getStatus
());
txOld
.
rollback
();
s
.
commit
();
s
.
close
();
s
=
MVStore
.
open
(
fileName
);
ts
=
new
TransactionStore
(
s
);
tx
=
ts
.
begin
();
m
=
tx
.
openMap
(
"test"
);
// TransactionStore was not closed, so we lost some ids
assertEquals
(
33
,
tx
.
getId
());
list
=
ts
.
getOpenTransactions
();
assertEquals
(
2
,
list
.
size
());
txOld
=
list
.
get
(
0
);
assertEquals
(
0
,
txOld
.
getId
());
assertEquals
(
Transaction
.
STATUS_PREPARED
,
txOld
.
getStatus
());
assertEquals
(
"first transaction"
,
txOld
.
getName
());
txOld
.
commit
();
assertEquals
(
"Hello"
,
m
.
get
(
"1"
));
s
.
close
();
FileUtils
.
delete
(
fileName
);
}
private
void
testSavepoint
()
throws
Exception
{
private
void
testSavepoint
()
throws
Exception
{
MVStore
s
=
MVStore
.
open
(
null
);
MVStore
s
=
MVStore
.
open
(
null
);
TransactionStore
ts
=
new
TransactionStore
(
s
);
TransactionStore
ts
=
new
TransactionStore
(
s
);
...
@@ -53,12 +186,12 @@ public class TestTransactionStore extends TestBase {
...
@@ -53,12 +186,12 @@ public class TestTransactionStore extends TestBase {
m
.
put
(
"1"
,
"Hello"
);
m
.
put
(
"1"
,
"Hello"
);
m
.
put
(
"2"
,
"World"
);
m
.
put
(
"2"
,
"World"
);
m
.
put
(
"1"
,
"Hallo"
);
m
.
put
(
"1"
,
"Hallo"
);
m
.
put
(
"2"
,
null
);
m
.
remove
(
"2"
);
m
.
put
(
"3"
,
"!"
);
m
.
put
(
"3"
,
"!"
);
long
logId
=
tx
.
setSavepoint
();
long
logId
=
tx
.
setSavepoint
();
m
.
put
(
"1"
,
"Hi"
);
m
.
put
(
"1"
,
"Hi"
);
m
.
put
(
"2"
,
"."
);
m
.
put
(
"2"
,
"."
);
m
.
put
(
"3"
,
null
);
m
.
remove
(
"3"
);
tx
.
rollbackToSavepoint
(
logId
);
tx
.
rollbackToSavepoint
(
logId
);
assertEquals
(
"Hallo"
,
m
.
get
(
"1"
));
assertEquals
(
"Hallo"
,
m
.
get
(
"1"
));
assertNull
(
m
.
get
(
"2"
));
assertNull
(
m
.
get
(
"2"
));
...
@@ -140,8 +273,8 @@ public class TestTransactionStore extends TestBase {
...
@@ -140,8 +273,8 @@ public class TestTransactionStore extends TestBase {
size
++;
size
++;
}
}
buff
.
append
(
'\n'
);
buff
.
append
(
'\n'
);
if
(
size
!=
map
.
s
ize
())
{
if
(
size
!=
map
.
getS
ize
())
{
assertEquals
(
size
,
map
.
s
ize
());
assertEquals
(
size
,
map
.
getS
ize
());
}
}
}
}
int
x
=
r
.
nextInt
(
rowCount
);
int
x
=
r
.
nextInt
(
rowCount
);
...
@@ -191,13 +324,13 @@ public class TestTransactionStore extends TestBase {
...
@@ -191,13 +324,13 @@ public class TestTransactionStore extends TestBase {
try
{
try
{
int
c
=
stat
.
executeUpdate
(
"delete from test where id = "
+
x
);
int
c
=
stat
.
executeUpdate
(
"delete from test where id = "
+
x
);
if
(
c
==
1
)
{
if
(
c
==
1
)
{
map
.
put
(
x
,
null
);
map
.
remove
(
x
);
}
else
{
}
else
{
assertNull
(
map
.
get
(
x
));
assertNull
(
map
.
get
(
x
));
}
}
}
catch
(
SQLException
e
)
{
}
catch
(
SQLException
e
)
{
assertTrue
(
map
.
get
(
x
)
!=
null
);
assertTrue
(
map
.
get
(
x
)
!=
null
);
assertFalse
(
map
.
try
Put
(
x
,
null
));
assertFalse
(
map
.
try
Remove
(
x
));
// PostgreSQL needs to rollback
// PostgreSQL needs to rollback
buff
.
append
(
" -> rollback"
);
buff
.
append
(
" -> rollback"
);
stat
.
getConnection
().
rollback
();
stat
.
getConnection
().
rollback
();
...
@@ -245,7 +378,7 @@ public class TestTransactionStore extends TestBase {
...
@@ -245,7 +378,7 @@ public class TestTransactionStore extends TestBase {
m1
=
tx1
.
openMap
(
"test"
);
m1
=
tx1
.
openMap
(
"test"
);
m1
.
put
(
"1"
,
"Hello"
);
m1
.
put
(
"1"
,
"Hello"
);
m1
.
put
(
"2"
,
"World"
);
m1
.
put
(
"2"
,
"World"
);
m1
.
put
(
"3"
,
null
);
m1
.
remove
(
"3"
);
tx1
.
commit
();
tx1
.
commit
();
// start new transaction to read old data
// start new transaction to read old data
...
@@ -256,7 +389,7 @@ public class TestTransactionStore extends TestBase {
...
@@ -256,7 +389,7 @@ public class TestTransactionStore extends TestBase {
tx1
=
ts
.
begin
();
tx1
=
ts
.
begin
();
m1
=
tx1
.
openMap
(
"test"
);
m1
=
tx1
.
openMap
(
"test"
);
m1
.
put
(
"1"
,
"Hallo"
);
m1
.
put
(
"1"
,
"Hallo"
);
m1
.
put
(
"2"
,
null
);
m1
.
remove
(
"2"
);
m1
.
put
(
"3"
,
"!"
);
m1
.
put
(
"3"
,
"!"
);
assertEquals
(
"Hello"
,
m2
.
get
(
"1"
));
assertEquals
(
"Hello"
,
m2
.
get
(
"1"
));
...
@@ -274,13 +407,13 @@ public class TestTransactionStore extends TestBase {
...
@@ -274,13 +407,13 @@ public class TestTransactionStore extends TestBase {
m1
.
put
(
"2"
,
"World"
);
m1
.
put
(
"2"
,
"World"
);
assertNull
(
m2
.
get
(
"2"
));
assertNull
(
m2
.
get
(
"2"
));
assertFalse
(
m2
.
try
Put
(
"2"
,
null
));
assertFalse
(
m2
.
try
Remove
(
"2"
));
assertFalse
(
m2
.
tryPut
(
"2"
,
"Welt"
));
assertFalse
(
m2
.
tryPut
(
"2"
,
"Welt"
));
tx2
=
ts
.
begin
();
tx2
=
ts
.
begin
();
m2
=
tx2
.
openMap
(
"test"
);
m2
=
tx2
.
openMap
(
"test"
);
assertNull
(
m2
.
get
(
"2"
));
assertNull
(
m2
.
get
(
"2"
));
m1
.
put
(
"2"
,
null
);
m1
.
remove
(
"2"
);
assertNull
(
m2
.
get
(
"2"
));
assertNull
(
m2
.
get
(
"2"
));
tx1
.
commit
();
tx1
.
commit
();
...
@@ -337,7 +470,7 @@ public class TestTransactionStore extends TestBase {
...
@@ -337,7 +470,7 @@ public class TestTransactionStore extends TestBase {
tx
=
ts
.
begin
();
tx
=
ts
.
begin
();
m
=
tx
.
openMap
(
"test"
);
m
=
tx
.
openMap
(
"test"
);
m
.
put
(
"1"
,
"Hallo"
);
m
.
put
(
"1"
,
"Hallo"
);
m
.
put
(
"2"
,
null
);
m
.
remove
(
"2"
);
m
.
put
(
"3"
,
"!"
);
m
.
put
(
"3"
,
"!"
);
assertEquals
(
"Hallo"
,
m
.
get
(
"1"
));
assertEquals
(
"Hallo"
,
m
.
get
(
"1"
));
assertNull
(
m
.
get
(
"2"
));
assertNull
(
m
.
get
(
"2"
));
...
@@ -353,7 +486,7 @@ public class TestTransactionStore extends TestBase {
...
@@ -353,7 +486,7 @@ public class TestTransactionStore extends TestBase {
tx
=
ts
.
begin
();
tx
=
ts
.
begin
();
m
=
tx
.
openMap
(
"test"
);
m
=
tx
.
openMap
(
"test"
);
m
.
put
(
"1"
,
"Hallo"
);
m
.
put
(
"1"
,
"Hallo"
);
m
.
put
(
"2"
,
null
);
m
.
remove
(
"2"
);
m
.
put
(
"3"
,
"!"
);
m
.
put
(
"3"
,
"!"
);
assertEquals
(
"Hallo"
,
m
.
get
(
"1"
));
assertEquals
(
"Hallo"
,
m
.
get
(
"1"
));
assertNull
(
m
.
get
(
"2"
));
assertNull
(
m
.
get
(
"2"
));
...
...
编写
预览
Markdown
格式
0%
重试
或
添加新文件
添加附件
取消
您添加了
0
人
到此讨论。请谨慎行事。
请先完成此评论的编辑!
取消
请
注册
或者
登录
后发表评论