提交 76c45fe4 authored 作者: Thomas Mueller's avatar Thomas Mueller

With the MVStore option, when using multiple threads that concurrently create…

With the MVStore option, when using multiple threads that concurrently create indexes or tables, it was relatively easy to get a lock timeout on the "SYS" table.
上级 57863eee
......@@ -208,7 +208,7 @@ public class MVSecondaryIndex extends BaseIndex implements MVIndex {
try {
map.put(array, ValueNull.INSTANCE);
} catch (IllegalStateException e) {
throw DbException.get(ErrorCode.CONCURRENT_UPDATE_1,
throw DbException.get(ErrorCode.CONCURRENT_UPDATE_1,
e, table.getName());
}
if (indexType.isUnique()) {
......@@ -246,7 +246,7 @@ public class MVSecondaryIndex extends BaseIndex implements MVIndex {
getSQL() + ": " + row.getKey());
}
} catch (IllegalStateException e) {
throw DbException.get(ErrorCode.CONCURRENT_UPDATE_1,
throw DbException.get(ErrorCode.CONCURRENT_UPDATE_1,
e, table.getName());
}
}
......
......@@ -137,7 +137,7 @@ public class MVSpatialIndex extends BaseIndex implements SpatialIndex, MVIndex {
try {
map.put(key, ValueLong.get(0));
} catch (IllegalStateException e) {
throw DbException.get(ErrorCode.CONCURRENT_UPDATE_1,
throw DbException.get(ErrorCode.CONCURRENT_UPDATE_1,
e, table.getName());
}
if (indexType.isUnique()) {
......@@ -184,7 +184,7 @@ public class MVSpatialIndex extends BaseIndex implements SpatialIndex, MVIndex {
getSQL() + ": " + row.getKey());
}
} catch (IllegalStateException e) {
throw DbException.get(ErrorCode.CONCURRENT_UPDATE_1,
throw DbException.get(ErrorCode.CONCURRENT_UPDATE_1,
e, table.getName());
}
}
......
......@@ -34,7 +34,6 @@ import org.h2.result.SortOrder;
import org.h2.schema.SchemaObject;
import org.h2.table.Column;
import org.h2.table.IndexColumn;
import org.h2.table.RegularTable;
import org.h2.table.Table;
import org.h2.table.TableBase;
import org.h2.util.MathUtils;
......@@ -81,15 +80,12 @@ public class MVTable extends TableBase {
/**
* Initialize the table.
*
*
* @param session the session
*/
void init(Session session) {
primaryIndex = new MVPrimaryIndex(session.getDatabase(),
this, getId(),
IndexColumn.wrap(getColumns()),
IndexType.createScan(true)
);
primaryIndex = new MVPrimaryIndex(session.getDatabase(), this, getId(),
IndexColumn.wrap(getColumns()), IndexType.createScan(true));
indexes.add(primaryIndex);
}
......@@ -120,10 +116,7 @@ public class MVTable extends TableBase {
if (lockExclusiveSession == session) {
return;
}
synchronized (database) {
if (lockExclusiveSession == session) {
return;
}
synchronized (getLockSyncObject()) {
session.setWaitForLock(this, Thread.currentThread());
waitingSessions.addLast(session);
try {
......@@ -134,6 +127,21 @@ public class MVTable extends TableBase {
}
}
}
/**
* The the object on which to synchronize and wait on. For the
* multi-threaded mode, this is this object, but for non-multi-threaded, it
* is the database, as in this case all operations are synchronized on the
* database object.
*
* @return the lock sync object
*/
private Object getLockSyncObject() {
if (database.isMultiThreaded()) {
return this;
}
return database;
}
private void doLock1(Session session, int lockMode, boolean exclusive) {
traceLock(session, exclusive, "requesting for");
......@@ -150,8 +158,7 @@ public class MVTable extends TableBase {
if (checkDeadlock) {
ArrayList<Session> sessions = checkDeadlock(session, null, null);
if (sessions != null) {
throw DbException.get(
ErrorCode.DEADLOCK_1,
throw DbException.get(ErrorCode.DEADLOCK_1,
getDeadlockDetails(sessions));
}
} else {
......@@ -184,7 +191,7 @@ public class MVTable extends TableBase {
if (sleep == 0) {
sleep = 1;
}
database.wait(sleep);
getLockSyncObject().wait(sleep);
} catch (InterruptedException e) {
// ignore
}
......@@ -209,7 +216,8 @@ public class MVTable extends TableBase {
} else {
if (lockExclusiveSession == null) {
if (lockMode == Constants.LOCK_MODE_READ_COMMITTED) {
if (!database.isMultiThreaded() && !database.isMultiVersion()) {
if (!database.isMultiThreaded() &&
!database.isMultiVersion()) {
// READ_COMMITTED: a read lock is acquired,
// but released immediately after the operation
// is complete.
......@@ -237,20 +245,17 @@ public class MVTable extends TableBase {
for (Session s : sessions) {
Table lock = s.getWaitForLock();
Thread thread = s.getWaitForLockThread();
buff.append("\nSession ").
append(s.toString()).
append(" on thread ").
append(thread.getName()).
append(" is waiting to lock ").
append(lock.toString()).
append(" while locking ");
buff.append("\nSession ").append(s.toString())
.append(" on thread ").append(thread.getName())
.append(" is waiting to lock ").append(lock.toString())
.append(" while locking ");
int i = 0;
for (Table t : s.getLocks()) {
if (i++ > 0) {
buff.append(", ");
}
buff.append(t.toString());
if (t instanceof RegularTable) {
if (t instanceof MVTable) {
if (((MVTable) t).lockExclusiveSession == s) {
buff.append(" (exclusive)");
} else {
......@@ -267,7 +272,7 @@ public class MVTable extends TableBase {
public ArrayList<Session> checkDeadlock(Session session, Session clash,
Set<Session> visited) {
// only one deadlock check at any given time
synchronized (RegularTable.class) {
synchronized (MVTable.class) {
if (clash == null) {
// verification is started
clash = session;
......@@ -300,7 +305,8 @@ public class MVTable extends TableBase {
if (error == null && lockExclusiveSession != null) {
Table t = lockExclusiveSession.getWaitForLock();
if (t != null) {
error = t.checkDeadlock(lockExclusiveSession, clash, visited);
error = t.checkDeadlock(lockExclusiveSession, clash,
visited);
if (error != null) {
error.add(session);
}
......@@ -313,8 +319,8 @@ public class MVTable extends TableBase {
private void traceLock(Session session, boolean exclusive, String s) {
if (traceLock.isDebugEnabled()) {
traceLock.debug("{0} {1} {2} {3}", session.getId(),
exclusive ? "exclusive write lock" : "shared read lock",
s, getName());
exclusive ? "exclusive write lock" : "shared read lock", s,
getName());
}
}
......@@ -335,12 +341,12 @@ public class MVTable extends TableBase {
if (lockExclusiveSession == s) {
lockExclusiveSession = null;
}
if (lockSharedSessions.size() > 0) {
lockSharedSessions.remove(s);
}
synchronized (database) {
synchronized (getLockSyncObject()) {
if (lockSharedSessions.size() > 0) {
lockSharedSessions.remove(s);
}
if (!waitingSessions.isEmpty()) {
database.notifyAll();
getLockSyncObject().notifyAll();
}
}
}
......@@ -348,7 +354,8 @@ public class MVTable extends TableBase {
@Override
public boolean canTruncate() {
if (getCheckForeignKeyConstraints() && database.getReferentialIntegrity()) {
if (getCheckForeignKeyConstraints() &&
database.getReferentialIntegrity()) {
ArrayList<Constraint> constraints = getConstraints();
if (constraints != null) {
for (int i = 0, size = constraints.size(); i < size; i++) {
......@@ -397,7 +404,7 @@ public class MVTable extends TableBase {
}
MVIndex index;
// TODO support in-memory indexes
// if (isPersistIndexes() && indexType.isPersistent()) {
// if (isPersistIndexes() && indexType.isPersistent()) {
int mainIndexColumn;
mainIndexColumn = getMainIndexColumn(indexType, cols);
if (database.isStarting()) {
......@@ -409,15 +416,13 @@ public class MVTable extends TableBase {
}
if (mainIndexColumn != -1) {
primaryIndex.setMainIndexColumn(mainIndexColumn);
index = new MVDelegateIndex(this, indexId,
indexName, primaryIndex, indexType);
index = new MVDelegateIndex(this, indexId, indexName, primaryIndex,
indexType);
} else if (indexType.isSpatial()) {
index = new MVSpatialIndex(session.getDatabase(),
this, indexId,
index = new MVSpatialIndex(session.getDatabase(), this, indexId,
indexName, cols, indexType);
} else {
index = new MVSecondaryIndex(session.getDatabase(),
this, indexId,
index = new MVSecondaryIndex(session.getDatabase(), this, indexId,
indexName, cols, indexType);
}
if (index.needRebuild()) {
......@@ -439,7 +444,8 @@ public class MVTable extends TableBase {
private void rebuildIndex(Session session, MVIndex index, String indexName) {
try {
if (session.getDatabase().getMvStore() == null || index instanceof MVSpatialIndex) {
if (session.getDatabase().getMvStore() == null ||
index instanceof MVSpatialIndex) {
// in-memory
rebuildIndexBuffered(session, index);
} else {
......@@ -509,8 +515,8 @@ public class MVTable extends TableBase {
addRowsToIndex(session, buffer, index);
}
if (SysProperties.CHECK && remaining != 0) {
DbException.throwInternalError("rowcount remaining=" + remaining
+ " " + getName());
DbException.throwInternalError("rowcount remaining=" + remaining +
" " + getName());
}
}
......@@ -536,8 +542,8 @@ public class MVTable extends TableBase {
}
addRowsToIndex(session, buffer, index);
if (SysProperties.CHECK && remaining != 0) {
DbException.throwInternalError("rowcount remaining=" + remaining
+ " " + getName());
DbException.throwInternalError("rowcount remaining=" + remaining +
" " + getName());
}
}
......@@ -552,7 +558,7 @@ public class MVTable extends TableBase {
if (first.sortType != SortOrder.ASCENDING) {
return -1;
}
switch(first.column.getType()) {
switch (first.column.getType()) {
case Value.BYTE:
case Value.SHORT:
case Value.INT:
......@@ -724,11 +730,12 @@ public class MVTable extends TableBase {
indexes.remove(index);
}
if (SysProperties.CHECK) {
for (SchemaObject obj : database.getAllSchemaObjects(DbObject.INDEX)) {
for (SchemaObject obj : database
.getAllSchemaObjects(DbObject.INDEX)) {
Index index = (Index) obj;
if (index.getTable() == this) {
DbException.throwInternalError(
"index not dropped: " + index.getName());
DbException.throwInternalError("index not dropped: " +
index.getName());
}
}
}
......@@ -761,7 +768,7 @@ public class MVTable extends TableBase {
/**
* Get the transaction to use for this session.
*
*
* @param session the session
* @return the transaction
*/
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论