提交 42f3c30e authored 作者: Thomas Mueller's avatar Thomas Mueller

MVStore: in the multi-threaded mode, NullPointerException and other exceptions could occur.

上级 e598ac9c
...@@ -19,6 +19,7 @@ Change Log ...@@ -19,6 +19,7 @@ Change Log
<h2>Next Version (unreleased)</h2> <h2>Next Version (unreleased)</h2>
<ul><li>Improve error message when the user specifies an unsupported combination of database settings. <ul><li>Improve error message when the user specifies an unsupported combination of database settings.
</li><li>MVStore: in the multi-threaded mode, NullPointerException and other exceptions could occur.
</li><li>MVStore: some database file could not be compacted due to a bug in </li><li>MVStore: some database file could not be compacted due to a bug in
the bookkeeping of the fill rate. Also, database file were compacted quite slowly. the bookkeeping of the fill rate. Also, database file were compacted quite slowly.
This has been improved; but more changes in this area are expected. This has been improved; but more changes in this area are expected.
...@@ -31,7 +32,7 @@ Change Log ...@@ -31,7 +32,7 @@ Change Log
at session commit (LobStorageMap.removeLob). at session commit (LobStorageMap.removeLob).
</li><li>Remove the "h2.MAX_MEMORY_ROWS_DISTINCT" system property to reduce confusion. </li><li>Remove the "h2.MAX_MEMORY_ROWS_DISTINCT" system property to reduce confusion.
We already have the MAX_MEMORY_ROWS setting which does a very similar thing, and is better documented. We already have the MAX_MEMORY_ROWS setting which does a very similar thing, and is better documented.
</li><li>Issue 554: Web Console in an IFrame not fully supported </li><li>Issue 554: Web Console in an IFrame was not fully supported.
</li></ul> </li></ul>
<h2>Version 1.4.177 Beta (2014-04-12)</h2> <h2>Version 1.4.177 Beta (2014-04-12)</h2>
......
...@@ -240,6 +240,8 @@ public class Database implements DataHandler { ...@@ -240,6 +240,8 @@ public class Database implements DataHandler {
ci.getProperty("LOG", PageStore.LOG_MODE_SYNC); ci.getProperty("LOG", PageStore.LOG_MODE_SYNC);
this.javaObjectSerializerName = this.javaObjectSerializerName =
ci.getProperty("JAVA_OBJECT_SERIALIZER", null); ci.getProperty("JAVA_OBJECT_SERIALIZER", null);
this.multiThreaded =
ci.getProperty("MULTI_THREADED", false);
boolean closeAtVmShutdown = boolean closeAtVmShutdown =
dbSettings.dbCloseOnExit; dbSettings.dbCloseOnExit;
......
...@@ -78,7 +78,7 @@ MVStore: ...@@ -78,7 +78,7 @@ MVStore:
possibly using a callback for serialization possibly using a callback for serialization
- optional pluggable checksum mechanism (per page), which - optional pluggable checksum mechanism (per page), which
requires that everything is a page (including headers) requires that everything is a page (including headers)
- rename setStoreVersion to setDataVersion or similar - rename setStoreVersion to setDataVersion, setSchemaVersion or similar
- temporary file storage - temporary file storage
- simple rollback method (rollback to last committed version) - simple rollback method (rollback to last committed version)
- MVMap to implement SortedMap, then NavigableMap - MVMap to implement SortedMap, then NavigableMap
...@@ -111,6 +111,8 @@ MVStore: ...@@ -111,6 +111,8 @@ MVStore:
- Page: to save memory, combine keys & values into one array - Page: to save memory, combine keys & values into one array
(also children & counts). Maybe remove some other (also children & counts). Maybe remove some other
fields (childrenCount for example) fields (childrenCount for example)
- Support SortedMap for MVMap
- compact: copy whole pages (without having to open all maps)
*/ */
......
...@@ -158,8 +158,11 @@ public class MVTableEngine implements TableEngine { ...@@ -158,8 +158,11 @@ public class MVTableEngine implements TableEngine {
public Store(Database db, MVStore store) { public Store(Database db, MVStore store) {
this.db = db; this.db = db;
this.store = store; this.store = store;
this.transactionStore = new TransactionStore(store, this.transactionStore = new TransactionStore(
new ValueDataType(null, db, null)); store,
new ValueDataType(null, db, null),
db.isMultiThreaded()
);
} }
public MVStore getStore() { public MVStore getStore() {
......
...@@ -28,11 +28,6 @@ import org.h2.util.New; ...@@ -28,11 +28,6 @@ import org.h2.util.New;
*/ */
public class TransactionStore { public class TransactionStore {
/**
* Whether the concurrent maps should be used.
*/
private static final boolean CONCURRENT = false;
/** /**
* The store. * The store.
*/ */
...@@ -56,6 +51,11 @@ public class TransactionStore { ...@@ -56,6 +51,11 @@ public class TransactionStore {
*/ */
final MVMap<Long, Object[]> undoLog; final MVMap<Long, Object[]> undoLog;
/**
* Whether concurrent maps should be used.
*/
private final boolean concurrent;
/** /**
* The map of maps. * The map of maps.
*/ */
...@@ -79,7 +79,7 @@ public class TransactionStore { ...@@ -79,7 +79,7 @@ public class TransactionStore {
* @param store the store * @param store the store
*/ */
public TransactionStore(MVStore store) { public TransactionStore(MVStore store) {
this(store, new ObjectDataType()); this(store, new ObjectDataType(), false);
} }
/** /**
...@@ -87,10 +87,12 @@ public class TransactionStore { ...@@ -87,10 +87,12 @@ public class TransactionStore {
* *
* @param store the store * @param store the store
* @param dataType the data type for map keys and values * @param dataType the data type for map keys and values
* @param concurrent whether concurrent maps should be used
*/ */
public TransactionStore(MVStore store, DataType dataType) { public TransactionStore(MVStore store, DataType dataType, boolean concurrent) {
this.store = store; this.store = store;
this.dataType = dataType; this.dataType = dataType;
this.concurrent = concurrent;
preparedTransactions = store.openMap("openTransactions", preparedTransactions = store.openMap("openTransactions",
new MVMap.Builder<Integer, Object[]>()); new MVMap.Builder<Integer, Object[]>());
VersionedValueType oldValueType = new VersionedValueType(dataType); VersionedValueType oldValueType = new VersionedValueType(dataType);
...@@ -362,7 +364,7 @@ public class TransactionStore { ...@@ -362,7 +364,7 @@ public class TransactionStore {
} }
VersionedValueType vt = new VersionedValueType(valueType); VersionedValueType vt = new VersionedValueType(valueType);
MVMap<K, VersionedValue> map; MVMap<K, VersionedValue> map;
if (CONCURRENT) { if (concurrent) {
MVMapConcurrent.Builder<K, VersionedValue> builder = MVMapConcurrent.Builder<K, VersionedValue> builder =
new MVMapConcurrent.Builder<K, VersionedValue>(). new MVMapConcurrent.Builder<K, VersionedValue>().
keyType(keyType).valueType(vt); keyType(keyType).valueType(vt);
...@@ -1219,9 +1221,16 @@ public class TransactionStore { ...@@ -1219,9 +1221,16 @@ public class TransactionStore {
d = transaction.store.undoLog.get(id); d = transaction.store.undoLog.get(id);
} }
if (d == null) { if (d == null) {
// this entry was committed or rolled back // this entry should be committed or rolled back
// in the meantime (the transaction might still be open) // in the meantime (the transaction might still be open)
data = map.get(key); data = map.get(key);
if (data != null && data.operationId == id) {
// the transaction was not committed correctly
throw DataUtils.newIllegalStateException(
DataUtils.ERROR_TRANSACTION_CORRUPT,
"The transaction log might be corrupt for key {0}",
key);
}
} else { } else {
data = (VersionedValue) d[2]; data = (VersionedValue) d[2];
} }
...@@ -1232,10 +1241,8 @@ public class TransactionStore { ...@@ -1232,10 +1241,8 @@ public class TransactionStore {
if (id2 != 0) { if (id2 != 0) {
int tx2 = getTransactionId(id2); int tx2 = getTransactionId(id2);
if (tx2 != tx) { if (tx2 != tx) {
// a different transaction // a different transaction - ok
break; } else if (getLogId(id2) > getLogId(id)) {
}
if (getLogId(id2) > getLogId(id)) {
// newer than before // newer than before
break; break;
} }
......
...@@ -127,6 +127,7 @@ import org.h2.test.store.TestSpinLock; ...@@ -127,6 +127,7 @@ import org.h2.test.store.TestSpinLock;
import org.h2.test.store.TestStreamStore; import org.h2.test.store.TestStreamStore;
import org.h2.test.store.TestTransactionStore; import org.h2.test.store.TestTransactionStore;
import org.h2.test.synth.TestBtreeIndex; import org.h2.test.synth.TestBtreeIndex;
import org.h2.test.synth.TestConcurrentUpdate;
import org.h2.test.synth.TestCrashAPI; import org.h2.test.synth.TestCrashAPI;
import org.h2.test.synth.TestDiskFull; import org.h2.test.synth.TestDiskFull;
import org.h2.test.synth.TestFuzzOptimizations; import org.h2.test.synth.TestFuzzOptimizations;
...@@ -704,6 +705,7 @@ kill -9 `jps -l | grep "org.h2.test." | cut -d " " -f 1` ...@@ -704,6 +705,7 @@ kill -9 `jps -l | grep "org.h2.test." | cut -d " " -f 1`
// synth // synth
new TestBtreeIndex().runTest(this); new TestBtreeIndex().runTest(this);
new TestConcurrentUpdate().runTest(this);
new TestDiskFull().runTest(this); new TestDiskFull().runTest(this);
new TestCrashAPI().runTest(this); new TestCrashAPI().runTest(this);
new TestFuzzOptimizations().runTest(this); new TestFuzzOptimizations().runTest(this);
......
/*
* Copyright 2004-2013 H2 Group. Multiple-Licensed under the H2 License,
* Version 1.0, and under the Eclipse Public License, Version 1.0
* (http://h2database.com/html/license.html).
* Initial Developer: H2 Group
*/
package org.h2.test.synth;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.Random;
import org.h2.api.ErrorCode;
import org.h2.test.TestBase;
import org.h2.util.Task;
/**
* A concurrent test.
*/
public class TestConcurrentUpdate extends TestBase {
private static final int THREADS = 3;
private static final int ROW_COUNT = 10;
/**
* Run just this test.
*
* @param a ignored
*/
public static void main(String... a) throws Exception {
TestBase t = TestBase.createCaller().init();
t.config.memory = true;
t.test();
}
@Override
public void test() throws Exception {
final String url = getURL("concurrent;MULTI_THREADED=TRUE", true);
Connection conn = getConnection(url);
Statement stat = conn.createStatement();
stat.execute("create table test(id int primary key, name varchar)");
Task[] tasks = new Task[THREADS];
for (int i = 0; i < THREADS; i++) {
final int threadId = i;
Task t = new Task() {
@Override
public void call() throws Exception {
Random r = new Random(threadId);
Connection conn = getConnection(url);
PreparedStatement insert = conn.prepareStatement(
"insert into test values(?, ?)");
PreparedStatement update = conn.prepareStatement(
"update test set name = ? where id = ?");
PreparedStatement delete = conn.prepareStatement(
"delete from test where id = ?");
PreparedStatement select = conn.prepareStatement(
"select * from test where id = ?");
while (!stop) {
try {
int x = r.nextInt(ROW_COUNT);
String data = "x" + r.nextInt(ROW_COUNT);
switch (r.nextInt(3)) {
case 0:
insert.setInt(1, x);
insert.setString(2, data);
insert.execute();
break;
case 1:
update.setString(1, data);
update.setInt(2, x);
update.execute();
break;
case 2:
delete.setInt(1, x);
delete.execute();
break;
case 4:
select.setInt(1, x);
ResultSet rs = select.executeQuery();
while (rs.next()) {
rs.getString(2);
}
break;
}
} catch (SQLException e) {
handleException(e);
}
}
conn.close();
}
};
tasks[i] = t;
t.execute();
}
// test 2 seconds
for (int i = 0; i < 200; i++) {
Thread.sleep(10);
for (Task t : tasks) {
if (t.isFinished()) {
i = 1000;
break;
}
}
}
for (Task t : tasks) {
t.get();
}
conn.close();
}
void handleException(SQLException e) throws SQLException {
switch (e.getErrorCode()) {
case ErrorCode.CONCURRENT_UPDATE_1:
case ErrorCode.DUPLICATE_KEY_1:
case ErrorCode.ROW_NOT_FOUND_WHEN_DELETING_1:
case ErrorCode.LOCK_TIMEOUT_1:
break;
default:
throw e;
}
}
}
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论