Unverified 提交 259d09c5 authored 作者: Andrei Tokar's avatar Andrei Tokar 提交者: GitHub

Merge pull request #1376 from h2database/multi-threaded-kernel

TestMultiThreadedKernel is back
......@@ -154,7 +154,6 @@ public class Insert extends Prepared implements ResultTarget {
Mode mode = session.getDatabase().getMode();
int columnLen = columns.length;
for (int x = 0; x < listSize; x++) {
session.startStatementWithinTransaction();
generatedKeys.nextRow();
Row newRow = table.getTemplateRow();
Expression[] expr = list.get(x);
......
......@@ -102,6 +102,7 @@ public class Database implements DataHandler {
private static final ThreadLocal<Session> META_LOCK_DEBUGGING;
private static final ThreadLocal<Database> META_LOCK_DEBUGGING_DB;
private static final ThreadLocal<Throwable> META_LOCK_DEBUGGING_STACK;
private static final Session[] EMPTY_SESSION_ARRAY = new Session[0];
static {
boolean a = false;
......@@ -160,7 +161,7 @@ public class Database implements DataHandler {
private Index metaIdIndex;
private FileLock lock;
private WriterThread writer;
private boolean starting;
private volatile boolean starting;
private TraceSystem traceSystem;
private Trace trace;
private final FileLockMethod fileLockMethod;
......@@ -179,7 +180,7 @@ public class Database implements DataHandler {
private int allowLiterals = Constants.ALLOW_LITERALS_ALL;
private int powerOffCount = initialPowerOffCount;
private int closeDelay;
private volatile int closeDelay;
private DelayedDatabaseCloser delayedCloser;
private volatile boolean closing;
private boolean ignoreCase;
......@@ -786,10 +787,10 @@ public class Database implements DataHandler {
data.create = create;
data.isHidden = true;
data.session = systemSession;
starting = true;
meta = mainSchema.createTable(data);
handleUpgradeIssues();
IndexColumn[] pkCols = IndexColumn.wrap(new Column[] { columnId });
starting = true;
metaIdIndex = meta.addIndex(systemSession, "SYS_ID",
0, pkCols, IndexType.createPrimaryKey(
false, false), true, null);
......@@ -953,12 +954,15 @@ public class Database implements DataHandler {
}
}
private synchronized void addMeta(Session session, DbObject obj) {
private void addMeta(Session session, DbObject obj) {
assert Thread.holdsLock(this);
int id = obj.getId();
if (id > 0 && !starting && !obj.isTemporary()) {
Row r = meta.getTemplateRow();
MetaRecord.populateRowFromDBObject(obj, r);
synchronized (objectIds) {
objectIds.set(id);
}
if (SysProperties.CHECK) {
verifyMetaLocked(session);
}
......@@ -1049,7 +1053,7 @@ public class Database implements DataHandler {
* @param session the session
* @param id the id of the object to remove
*/
public synchronized void removeMeta(Session session, int id) {
public void removeMeta(Session session, int id) {
if (id > 0 && !starting) {
SearchRow r = meta.getTemplateSimpleRow(false);
r.setValue(0, ValueInt.get(id));
......@@ -1075,9 +1079,27 @@ public class Database implements DataHandler {
unlockMeta(session);
}
}
if (isMVStore()) {
// release of the object id has to be postponed until the end of the transaction,
// otherwise it might be re-used prematurely, and it would make
// rollback impossible or lead to MVMaps name collision,
// so until then ids are accumulated within session
session.scheduleDatabaseObjectIdForRelease(id);
} else {
// but PageStore, on the other hand, for reasons unknown to me,
// requires immediate id release
synchronized (this) {
objectIds.clear(id);
}
}
}
}
void releaseDatabaseObjectIds(BitSet idsToRelease) {
synchronized (objectIds) {
objectIds.andNot(idsToRelease);
}
}
@SuppressWarnings("unchecked")
private HashMap<String, DbObject> getMap(int type) {
......@@ -1307,7 +1329,7 @@ public class Database implements DataHandler {
}
private synchronized void closeAllSessionsException(Session except) {
Session[] all = userSessions.toArray(new Session[userSessions.size()]);
Session[] all = userSessions.toArray(EMPTY_SESSION_ARRAY);
for (Session s : all) {
if (s != except) {
try {
......@@ -1573,9 +1595,13 @@ public class Database implements DataHandler {
*
* @return the id
*/
public synchronized int allocateObjectId() {
int i = objectIds.nextClearBit(0);
public int allocateObjectId() {
Object lock = isMVStore() ? objectIds : this;
int i;
synchronized (lock) {
i = objectIds.nextClearBit(0);
objectIds.set(i);
}
return i;
}
......@@ -1763,7 +1789,6 @@ public class Database implements DataHandler {
*/
public void updateMeta(Session session, DbObject obj) {
if (isMVStore()) {
synchronized (this) {
int id = obj.getId();
if (id > 0) {
if (!starting && !obj.isTemporary()) {
......@@ -1775,6 +1800,7 @@ public class Database implements DataHandler {
}
}
// for temporary objects
synchronized (objectIds) {
objectIds.set(id);
}
}
......@@ -2336,7 +2362,7 @@ public class Database implements DataHandler {
return lockMode;
}
public synchronized void setCloseDelay(int value) {
public void setCloseDelay(int value) {
this.closeDelay = value;
}
......
......@@ -7,6 +7,7 @@ package org.h2.engine;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.BitSet;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
......@@ -162,6 +163,12 @@ public class Session extends SessionWithState implements TransactionStore.Rollba
private State state = State.INIT;
private long startStatement = -1;
/**
* Set of database object ids to be released at the end of transaction
*/
private BitSet idsToRelease;
public Session(Database database, User user, int id) {
this.database = database;
this.queryTimeout = database.getSettings().maxQueryTimeout;
......@@ -627,6 +634,18 @@ public class Session extends SessionWithState implements TransactionStore.Rollba
return command;
}
/**
* Arranges for the specified database object id to be released
* at the end of the current transaction.
* @param id to be scheduled
*/
void scheduleDatabaseObjectIdForRelease(int id) {
if (idsToRelease == null) {
idsToRelease = new BitSet();
}
idsToRelease.set(id);
}
public Database getDatabase() {
return database;
}
......@@ -746,6 +765,10 @@ public class Session extends SessionWithState implements TransactionStore.Rollba
removeLobMap = null;
}
unlockAll();
if (idsToRelease != null) {
database.releaseDatabaseObjectIds(idsToRelease);
idsToRelease = null;
}
}
/**
......@@ -762,6 +785,7 @@ public class Session extends SessionWithState implements TransactionStore.Rollba
if (!locks.isEmpty() || needCommit) {
database.commit(this);
}
idsToRelease = null;
cleanTempTables(false);
if (autoCommitAtTransactionEnd) {
autoCommit = true;
......@@ -853,6 +877,7 @@ public class Session extends SessionWithState implements TransactionStore.Rollba
removeTemporaryLobs(false);
cleanTempTables(true);
commit(true); // temp table rempval may have opened new transaction
if (undoLog != null) {
undoLog.clear();
}
......@@ -961,7 +986,17 @@ public class Session extends SessionWithState implements TransactionStore.Rollba
private void cleanTempTables(boolean closeSession) {
if (localTempTables != null && localTempTables.size() > 0) {
if (database.isMVStore()) {
_cleanTempTables(closeSession);
} else {
synchronized (database) {
_cleanTempTables(closeSession);
}
}
}
}
private void _cleanTempTables(boolean closeSession) {
Iterator<Table> it = localTempTables.values().iterator();
while (it.hasNext()) {
Table table = it.next();
......@@ -983,8 +1018,6 @@ public class Session extends SessionWithState implements TransactionStore.Rollba
}
}
}
}
}
public Random getRandom() {
if (random == null) {
......
......@@ -930,7 +930,7 @@ public class MVMap<K, V> extends AbstractMap<K, V>
int id = getId();
String mapName = store.getMapName(id);
throw DataUtils.newIllegalStateException(
DataUtils.ERROR_CLOSED, "Map {0}({1}) is closed", mapName, id, store.getPanicException());
DataUtils.ERROR_CLOSED, "Map {0}({1}) is closed. {2}", mapName, id, store.getPanicException());
}
if (readOnly) {
throw DataUtils.newUnsupportedOperationException(
......
......@@ -54,6 +54,7 @@ public class MVPrimaryIndex extends BaseIndex {
ValueDataType keyType = new ValueDataType();
ValueDataType valueType = new ValueDataType(db, sortTypes);
mapName = "table." + getId();
assert db.isStarting() || !db.getStore().getMvStore().getMetaMap().containsKey("name." + mapName);
Transaction t = mvTable.getTransactionBegin();
dataMap = t.openMap(mapName, keyType, valueType);
dataMap.map.setVolatile(!table.isPersistData() || !indexType.isPersistent());
......
......@@ -58,6 +58,7 @@ public final class MVSecondaryIndex extends BaseIndex implements MVIndex {
// even for unique indexes, as some of the index columns could be null
keyColumns = columns.length + 1;
String mapName = "index." + getId();
assert db.isStarting() || !db.getStore().getMvStore().getMetaMap().containsKey("name." + mapName);
int[] sortTypes = new int[keyColumns];
for (int i = 0; i < columns.length; i++) {
sortTypes[i] = columns[i].sortType;
......
......@@ -809,16 +809,18 @@ public class MVTable extends TableBase {
}
database.getStore().removeTable(this);
super.removeChildrenAndResources(session);
// go backwards because database.removeIndex will
// call table.removeIndex
// remove scan index (at position 0 on the list) last
while (indexes.size() > 1) {
Index index = indexes.get(1);
index.remove(session);
if (index.getName() != null) {
database.removeSchemaObject(session, index);
}
// needed for session temporary indexes
indexes.remove(index);
}
primaryIndex.remove(session);
indexes.clear();
if (SysProperties.CHECK) {
for (SchemaObject obj : database
.getAllSchemaObjects(DbObject.INDEX)) {
......@@ -829,8 +831,6 @@ public class MVTable extends TableBase {
}
}
}
primaryIndex.remove(session);
database.removeMeta(session, getId());
close(session);
invalidate();
}
......
......@@ -415,7 +415,7 @@ public class TransactionStore {
* @param map the map
*/
<K, V> void removeMap(TransactionMap<K, V> map) {
store.removeMap(map.map, true);
store.removeMap(map.map, false);
}
/**
......
......@@ -1153,7 +1153,7 @@ public class MetaTable extends Table {
add(rows, "info.PAGE_COUNT",
Long.toString(pageCount));
add(rows, "info.PAGE_SIZE",
Integer.toString(pageSize));
Integer.toString(mvStore.getPageSplitSize()));
add(rows, "info.CACHE_MAX_SIZE",
Integer.toString(mvStore.getCacheSize()));
add(rows, "info.CACHE_SIZE",
......
......@@ -84,7 +84,7 @@ public class TestScalability implements Database.DatabaseTest {
dbs.add(createDbEntry(id++, "H2", 64, h2Url));
final String mvUrl = "jdbc:h2:./data/mvTest;" +
"LOCK_TIMEOUT=10000;MULTI_THREADED=1;LOCK_MODE=0";
"MULTI_THREADED=1;LOCK_MODE=0";
dbs.add(createDbEntry(id++, "MV", 1, mvUrl));
dbs.add(createDbEntry(id++, "MV", 2, mvUrl));
dbs.add(createDbEntry(id++, "MV", 4, mvUrl));
......
......@@ -57,7 +57,7 @@ public class TestBackup extends TestDb {
return;
}
deleteDb("backup");
String url = getURL("backup;multi_threaded=true", true);
String url = getURL("backup;MULTI_THREADED=TRUE", true);
Connection conn = getConnection(url);
final Statement stat = conn.createStatement();
stat.execute("create table test(id int primary key, name varchar)");
......
......@@ -29,7 +29,7 @@ public class TestLargeBlob extends TestDb {
@Override
public boolean isEnabled() {
if (!config.big || config.memory || config.mvStore || config.networked) {
if (!config.big || config.memory || config.networked) {
return false;
}
return true;
......
......@@ -42,22 +42,13 @@ public class TestMultiThreadedKernel extends TestDb {
TestBase.createCaller().init().test();
}
@Override
public boolean isEnabled() {
if (config.mvStore) { // FIXME can't see why test should not work in MVStore mode
return false;
}
return true;
}
@Override
public void test() throws Exception {
deleteDb("multiThreadedKernel");
testConcurrentRead();
testCache();
deleteDb("multiThreadedKernel");
final String url = getURL("multiThreadedKernel;" +
"DB_CLOSE_DELAY=-1;MULTI_THREADED=1", true);
final String url = getURL("multiThreadedKernel;DB_CLOSE_DELAY=-1", true);
final String user = getUser(), password = getPassword();
int len = 3;
Thread[] threads = new Thread[len];
......@@ -110,8 +101,7 @@ public class TestMultiThreadedKernel extends TestDb {
final int count = 1000;
ArrayList<Task> list = new ArrayList<>(size);
final Connection[] connections = new Connection[count];
String url = getURL("multiThreadedKernel;" +
"MULTI_THREADED=TRUE;CACHE_SIZE=16", true);
String url = getURL("multiThreadedKernel;CACHE_SIZE=16", true);
for (int i = 0; i < size; i++) {
final Connection conn = DriverManager.getConnection(
url, getUser(), getPassword());
......@@ -151,8 +141,7 @@ public class TestMultiThreadedKernel extends TestDb {
final int count = 100;
ArrayList<Task> list = new ArrayList<>(size);
final Connection[] connections = new Connection[count];
String url = getURL("multiThreadedKernel;" +
"MULTI_THREADED=TRUE;CACHE_SIZE=1", true);
String url = getURL("multiThreadedKernel;CACHE_SIZE=1", true);
for (int i = 0; i < size; i++) {
final Connection conn = DriverManager.getConnection(
url, getUser(), getPassword());
......@@ -187,4 +176,8 @@ public class TestMultiThreadedKernel extends TestDb {
}
}
@Override
protected String getURL(String name, boolean admin) {
return super.getURL(name + ";MULTI_THREADED=1;LOCK_TIMEOUT=2000", admin);
}
}
......@@ -58,7 +58,9 @@ public class TestOutOfMemory extends TestDb {
testDatabaseUsingInMemoryFileSystem();
}
System.gc();
if (!config.networked) { // for some unknown reason it fails
testUpdateWhenNearlyOutOfMemory();
}
} finally {
System.gc();
}
......
......@@ -29,7 +29,7 @@ public class RecoverLobTest extends TestDb {
@Override
public boolean isEnabled() {
if (config.mvStore || config.memory) {
if (config.memory) {
return false;
}
return true;
......
......@@ -122,14 +122,6 @@ public class TestMultiThreaded extends TestDb {
}
}
@Override
public boolean isEnabled() {
if (config.mvStore) {
return false;
}
return true;
}
@Override
public void test() throws Exception {
deleteDb("multiThreaded");
......
......@@ -34,7 +34,7 @@ public class TestMultiThreadedKernel extends TestDb implements Runnable {
@Override
public boolean isEnabled() {
if (config.networked || config.mvStore) {
if (config.networked) {
return false;
}
return true;
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论