提交 26f915bd authored 作者: Thomas Mueller's avatar Thomas Mueller

Fix for issue #143, deadlock between two sessions hitting the same sequence on a column.

上级 ebf435d5
......@@ -983,14 +983,16 @@ public class Database implements DataHandler {
* @param session the session
* @param obj the object to add
*/
public synchronized void addSchemaObject(Session session, SchemaObject obj) {
public void addSchemaObject(Session session, SchemaObject obj) {
int id = obj.getId();
if (id > 0 && !starting) {
checkWritingAllowed();
}
lockMeta(session);
obj.getSchema().add(obj);
addMeta(session, obj);
synchronized (this) {
obj.getSchema().add(obj);
addMeta(session, obj);
}
}
/**
......@@ -1791,7 +1793,7 @@ public class Database implements DataHandler {
* @param session the session
* @param obj the object to be removed
*/
public synchronized void removeSchemaObject(Session session,
public void removeSchemaObject(Session session,
SchemaObject obj) {
int type = obj.getType();
if (type == DbObject.TABLE_OR_VIEW) {
......@@ -1817,22 +1819,24 @@ public class Database implements DataHandler {
}
checkWritingAllowed();
lockMeta(session);
Comment comment = findComment(obj);
if (comment != null) {
removeDatabaseObject(session, comment);
}
obj.getSchema().remove(obj);
int id = obj.getId();
if (!starting) {
Table t = getDependentTable(obj, null);
if (t != null) {
obj.getSchema().add(obj);
throw DbException.get(ErrorCode.CANNOT_DROP_2, obj.getSQL(),
t.getSQL());
synchronized (this) {
Comment comment = findComment(obj);
if (comment != null) {
removeDatabaseObject(session, comment);
}
obj.getSchema().remove(obj);
int id = obj.getId();
if (!starting) {
Table t = getDependentTable(obj, null);
if (t != null) {
obj.getSchema().add(obj);
throw DbException.get(ErrorCode.CANNOT_DROP_2, obj.getSQL(),
t.getSQL());
}
obj.removeChildrenAndResources(session);
}
obj.removeChildrenAndResources(session);
removeMeta(session, id);
}
removeMeta(session, id);
}
/**
......
......@@ -33,12 +33,8 @@ public class Sequence extends SchemaObjectBase {
private long maxValue;
private boolean cycle;
private boolean belongsToTable;
/**
* The last valueWithMargin we flushed. We do a little dance with this to
* avoid an ABBA deadlock.
*/
private long lastFlushValueWithMargin;
private Object flushSync = new Object();
private boolean writeWithMargin;
/**
* Creates a new sequence for an auto-increment column.
......@@ -217,15 +213,16 @@ public class Sequence extends SchemaObjectBase {
@Override
public synchronized String getCreateSQL() {
long v = writeWithMargin ? valueWithMargin : value;
StringBuilder buff = new StringBuilder("CREATE SEQUENCE ");
buff.append(getSQL()).append(" START WITH ").append(value);
buff.append(getSQL()).append(" START WITH ").append(v);
if (increment != 1) {
buff.append(" INCREMENT BY ").append(increment);
}
if (minValue != getDefaultMinValue(value, increment)) {
if (minValue != getDefaultMinValue(v, increment)) {
buff.append(" MINVALUE ").append(minValue);
}
if (maxValue != getDefaultMaxValue(value, increment)) {
if (maxValue != getDefaultMaxValue(v, increment)) {
buff.append(" MAXVALUE ").append(maxValue);
}
if (cycle) {
......@@ -248,13 +245,11 @@ public class Sequence extends SchemaObjectBase {
*/
public long getNext(Session session) {
boolean needsFlush = false;
long retVal;
long flushValueWithMargin = -1;
long result;
synchronized (this) {
if ((increment > 0 && value >= valueWithMargin) ||
(increment < 0 && value <= valueWithMargin)) {
valueWithMargin += increment * cacheSize;
flushValueWithMargin = valueWithMargin;
needsFlush = true;
}
if ((increment > 0 && value > maxValue) ||
......@@ -262,19 +257,18 @@ public class Sequence extends SchemaObjectBase {
if (cycle) {
value = increment > 0 ? minValue : maxValue;
valueWithMargin = value + (increment * cacheSize);
flushValueWithMargin = valueWithMargin;
needsFlush = true;
} else {
throw DbException.get(ErrorCode.SEQUENCE_EXHAUSTED, getName());
}
}
retVal = value;
result = value;
value += increment;
}
if (needsFlush) {
flush(session, flushValueWithMargin);
flush(session);
}
return retVal;
return result;
}
/**
......@@ -283,7 +277,7 @@ public class Sequence extends SchemaObjectBase {
public void flushWithoutMargin() {
if (valueWithMargin != value) {
valueWithMargin = value;
flush(null, valueWithMargin);
flush(null);
}
}
......@@ -291,47 +285,39 @@ public class Sequence extends SchemaObjectBase {
* Flush the current value, including the margin, to disk.
*
* @param session the session
* @param flushValueWithMargin whether to reserve more entries
*/
public void flush(Session session, long flushValueWithMargin) {
public void flush(Session session) {
if (isTemporary()) {
return;
}
if (session == null || !database.isSysTableLockedBy(session)) {
// This session may not lock the sys table (except if it already has
// locked it) because it must be committed immediately, otherwise
// other threads can not access the sys table.
Session sysSession = database.getSystemSession();
synchronized (sysSession) {
flushInternal(sysSession, flushValueWithMargin);
synchronized (flushSync) {
flushInternal(sysSession);
}
sysSession.commit(false);
}
} else {
synchronized (session) {
flushInternal(session, flushValueWithMargin);
synchronized (flushSync) {
flushInternal(session);
}
}
}
}
private void flushInternal(Session session, long flushValueWithMargin) {
private void flushInternal(Session session) {
final boolean metaWasLocked = database.lockMeta(session);
synchronized (this) {
if (flushValueWithMargin == lastFlushValueWithMargin) {
if (!metaWasLocked) {
database.unlockMeta(session);
}
return;
}
}
// just for this case, use the value with the margin for the script
long realValue = value;
// just for this case, use the value with the margin
try {
value = valueWithMargin;
if (!isTemporary()) {
database.updateMeta(session, this);
}
writeWithMargin = true;
database.updateMeta(session, this);
} finally {
value = realValue;
}
synchronized (this) {
lastFlushValueWithMargin = flushValueWithMargin;
writeWithMargin = false;
}
if (!metaWasLocked) {
database.unlockMeta(session);
......
......@@ -348,7 +348,7 @@ public class Column {
if (update) {
sequence.modify(now + inc, null, null, null);
session.setLastIdentity(ValueLong.get(now));
sequence.flush(session, 0);
sequence.flush(session);
}
}
}
......
......@@ -13,6 +13,8 @@ import java.sql.Statement;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import org.h2.api.Trigger;
import org.h2.test.TestBase;
import org.h2.util.Task;
......@@ -33,7 +35,6 @@ public class TestSequence extends TestBase {
@Override
public void test() throws Exception {
testConcurrentCreate();
;if(true)return;
testSchemaSearchPath();
testAlterSequenceColumn();
testAlterSequence();
......@@ -49,24 +50,17 @@ public class TestSequence extends TestBase {
}
private void testConcurrentCreate() throws Exception {
while(true)
try {
testConcurrentCreate2();
} catch(Exception e) {
System.out.println(e);
}
}
private void testConcurrentCreate2() throws Exception {
deleteDb("sequence");
final String url = getURL("sequence;MULTI_THREADED=1", true);
final String url = getURL("sequence;MULTI_THREADED=1;LOCK_TIMEOUT=2000", true);
Connection conn = getConnection(url);
Task[] tasks = new Task[2];
try {
Statement stat = conn.createStatement();
stat.execute("create table dummy(id bigint primary key)");
stat.execute("create table test(id bigint primary key)");
stat.execute("create sequence test_seq cache 2");
for (int i = 0; i < tasks.length; i++) {
final int x = i;
tasks[i] = new Task() {
@Override
public void call() throws Exception {
......@@ -81,18 +75,30 @@ public class TestSequence extends TestBase {
if (Math.random() < 0.01) {
prep2.execute();
}
if (Math.random() < 0.01) {
createDropTrigger(conn);
}
}
} finally {
conn.close();
}
}
private void createDropTrigger(Connection conn) throws Exception {
String triggerName = "t_" + x;
Statement stat = conn.createStatement();
stat.execute("create trigger " + triggerName +
" before insert on dummy call \"" +
TriggerTest.class.getName() + "\"");
stat.execute("drop trigger " + triggerName);
}
}.execute();
}
Thread.sleep(100);
Thread.sleep(1000);
for (Task t : tasks) {
t.get();
}
stat.execute("shutdown immediately");
} finally {
for (Task t : tasks) {
t.join();
......@@ -413,4 +419,35 @@ public class TestSequence extends TestBase {
long value = rs.getLong(1);
return value;
}
/**
* A test trigger.
*/
public static class TriggerTest implements Trigger {
@Override
public void init(Connection conn, String schemaName,
String triggerName, String tableName, boolean before, int type)
throws SQLException {
conn.createStatement().executeQuery("call next value for test_seq");
}
@Override
public void fire(Connection conn, Object[] oldRow, Object[] newRow)
throws SQLException {
// ignore
}
@Override
public void close() throws SQLException {
// ignore
}
@Override
public void remove() throws SQLException {
// ignore
}
}
}
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论