提交 9b92121d authored 作者: noelgrandin@gmail.com's avatar noelgrandin@gmail.com

Issue 567: h2 hangs for a long time then (sometimes) recovers.

Introduce a queue when doing table locking to prevent session starvation.
上级 d20f5855
...@@ -29,6 +29,8 @@ Change Log ...@@ -29,6 +29,8 @@ Change Log
and which servers that are available. (patch from Nikolaj Fogh) and which servers that are available. (patch from Nikolaj Fogh)
</li><li>Fix bug in changing encrypted DB password that kept the file handle </li><li>Fix bug in changing encrypted DB password that kept the file handle
open when the wrong password was supplied. (test case from Jens Hohmuth). open when the wrong password was supplied. (test case from Jens Hohmuth).
</li><li>Issue 567: h2 hangs for a long time then (sometimes) recovers.
Introduce a queue when doing table locking to prevent session starvation.
</li></ul> </li></ul>
<h2>Version 1.4.179 Beta (2014-06-23)</h2> <h2>Version 1.4.179 Beta (2014-06-23)</h2>
......
...@@ -5,12 +5,12 @@ ...@@ -5,12 +5,12 @@
*/ */
package org.h2.mvstore.db; package org.h2.mvstore.db;
import java.util.ArrayDeque;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collections; import java.util.Collections;
import java.util.Comparator; import java.util.Comparator;
import java.util.HashSet; import java.util.HashSet;
import java.util.Set; import java.util.Set;
import org.h2.api.DatabaseEventListener; import org.h2.api.DatabaseEventListener;
import org.h2.api.ErrorCode; import org.h2.api.ErrorCode;
import org.h2.command.ddl.Analyze; import org.h2.command.ddl.Analyze;
...@@ -48,10 +48,14 @@ import org.h2.value.Value; ...@@ -48,10 +48,14 @@ import org.h2.value.Value;
public class MVTable extends TableBase { public class MVTable extends TableBase {
private MVPrimaryIndex primaryIndex; private MVPrimaryIndex primaryIndex;
private ArrayList<Index> indexes = New.arrayList(); private final ArrayList<Index> indexes = New.arrayList();
private long lastModificationId; private long lastModificationId;
private volatile Session lockExclusiveSession; private volatile Session lockExclusiveSession;
private HashSet<Session> lockSharedSessions = New.hashSet(); private final HashSet<Session> lockSharedSessions = New.hashSet();
/**
* FIFO queue to prevent starvation, since Java's synchronized locking is biased.
*/
private final ArrayDeque<Session> waitingSessions = new ArrayDeque<Session>();
private final Trace traceLock; private final Trace traceLock;
private int changesSinceAnalyze; private int changesSinceAnalyze;
private int nextAnalyze; private int nextAnalyze;
...@@ -60,13 +64,6 @@ public class MVTable extends TableBase { ...@@ -60,13 +64,6 @@ public class MVTable extends TableBase {
private final TransactionStore store; private final TransactionStore store;
/**
* True if one thread ever was waiting to lock this table. This is to avoid
* calling notifyAll if no session was ever waiting to lock this table. If
* set, the flag stays. In theory, it could be reset, however not sure when.
*/
private boolean waitForLock;
public MVTable(CreateTableData data, MVTableEngine.Store store) { public MVTable(CreateTableData data, MVTableEngine.Store store) {
super(data); super(data);
nextAnalyze = database.getSettings().analyzeAuto; nextAnalyze = database.getSettings().analyzeAuto;
...@@ -125,51 +122,25 @@ public class MVTable extends TableBase { ...@@ -125,51 +122,25 @@ public class MVTable extends TableBase {
return; return;
} }
session.setWaitForLock(this, Thread.currentThread()); session.setWaitForLock(this, Thread.currentThread());
waitingSessions.addLast(session);
try { try {
doLock(session, lockMode, exclusive); doLock1(session, lockMode, exclusive);
} finally { } finally {
session.setWaitForLock(null, null); session.setWaitForLock(null, null);
waitingSessions.remove(session);
} }
} }
} }
private void doLock(Session session, int lockMode, boolean exclusive) { private void doLock1(Session session, int lockMode, boolean exclusive) {
traceLock(session, exclusive, "requesting for"); traceLock(session, exclusive, "requesting for");
// don't get the current time unless necessary // don't get the current time unless necessary
long max = 0; long max = 0;
boolean checkDeadlock = false; boolean checkDeadlock = false;
while (true) { while (true) {
if (exclusive) { // if I'm the next one in the queue
if (lockExclusiveSession == null) { if (waitingSessions.getFirst() == session) {
if (lockSharedSessions.isEmpty()) { if (doLock2(session, lockMode, exclusive)) {
traceLock(session, exclusive, "added for");
session.addLock(this);
lockExclusiveSession = session;
return;
} else if (lockSharedSessions.size() == 1 && lockSharedSessions.contains(session)) {
traceLock(session, exclusive, "add (upgraded) for ");
lockExclusiveSession = session;
return;
}
}
} else {
if (lockExclusiveSession == null) {
if (lockMode == Constants.LOCK_MODE_READ_COMMITTED) {
if (!database.isMultiThreaded() && !database.isMultiVersion()) {
// READ_COMMITTED: a read lock is acquired,
// but released immediately after the operation
// is complete.
// When allowing only one thread, no lock is
// required.
// Row level locks work like read committed.
return;
}
}
if (!lockSharedSessions.contains(session)) {
traceLock(session, exclusive, "ok");
session.addLock(this);
lockSharedSessions.add(session);
}
return; return;
} }
} }
...@@ -210,13 +181,50 @@ public class MVTable extends TableBase { ...@@ -210,13 +181,50 @@ public class MVTable extends TableBase {
if (sleep == 0) { if (sleep == 0) {
sleep = 1; sleep = 1;
} }
waitForLock = true;
database.wait(sleep); database.wait(sleep);
} catch (InterruptedException e) { } catch (InterruptedException e) {
// ignore // ignore
} }
} }
} }
private boolean doLock2(Session session, int lockMode, boolean exclusive) {
if (exclusive) {
if (lockExclusiveSession == null) {
if (lockSharedSessions.isEmpty()) {
traceLock(session, exclusive, "added for");
session.addLock(this);
lockExclusiveSession = session;
return true;
} else if (lockSharedSessions.size() == 1 && lockSharedSessions.contains(session)) {
traceLock(session, exclusive, "add (upgraded) for ");
lockExclusiveSession = session;
return true;
}
}
} else {
if (lockExclusiveSession == null) {
if (lockMode == Constants.LOCK_MODE_READ_COMMITTED) {
if (!database.isMultiThreaded() && !database.isMultiVersion()) {
// READ_COMMITTED: a read lock is acquired,
// but released immediately after the operation
// is complete.
// When allowing only one thread, no lock is
// required.
// Row level locks work like read committed.
return true;
}
}
if (!lockSharedSessions.contains(session)) {
traceLock(session, exclusive, "ok");
session.addLock(this);
lockSharedSessions.add(session);
}
return true;
}
}
return false;
}
private static String getDeadlockDetails(ArrayList<Session> sessions) { private static String getDeadlockDetails(ArrayList<Session> sessions) {
// We add the thread details here to make it easier for customers to // We add the thread details here to make it easier for customers to
...@@ -326,10 +334,8 @@ public class MVTable extends TableBase { ...@@ -326,10 +334,8 @@ public class MVTable extends TableBase {
if (lockSharedSessions.size() > 0) { if (lockSharedSessions.size() > 0) {
lockSharedSessions.remove(s); lockSharedSessions.remove(s);
} }
// TODO lock: maybe we need we fifo-queue to make sure nobody
// starves. check what other databases do
synchronized (database) { synchronized (database) {
if (database.getSessionCount() > 1 && waitForLock) { if (!waitingSessions.isEmpty()) {
database.notifyAll(); database.notifyAll();
} }
} }
......
...@@ -5,6 +5,7 @@ ...@@ -5,6 +5,7 @@
*/ */
package org.h2.table; package org.h2.table;
import java.util.ArrayDeque;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collections; import java.util.Collections;
import java.util.Comparator; import java.util.Comparator;
...@@ -54,6 +55,10 @@ public class RegularTable extends TableBase { ...@@ -54,6 +55,10 @@ public class RegularTable extends TableBase {
private long rowCount; private long rowCount;
private volatile Session lockExclusiveSession; private volatile Session lockExclusiveSession;
private HashSet<Session> lockSharedSessions = New.hashSet(); private HashSet<Session> lockSharedSessions = New.hashSet();
/**
* FIFO queue to prevent starvation, since Java's synchronized locking is biased.
*/
private final ArrayDeque<Session> waitingSessions = new ArrayDeque<Session>();
private final Trace traceLock; private final Trace traceLock;
private final ArrayList<Index> indexes = New.arrayList(); private final ArrayList<Index> indexes = New.arrayList();
private long lastModificationId; private long lastModificationId;
...@@ -63,13 +68,6 @@ public class RegularTable extends TableBase { ...@@ -63,13 +68,6 @@ public class RegularTable extends TableBase {
private int nextAnalyze; private int nextAnalyze;
private Column rowIdColumn; private Column rowIdColumn;
/**
* True if one thread ever was waiting to lock this table. This is to avoid
* calling notifyAll if no session was ever waiting to lock this table. If
* set, the flag stays. In theory, it could be reset, however not sure when.
*/
private boolean waitForLock;
public RegularTable(CreateTableData data) { public RegularTable(CreateTableData data) {
super(data); super(data);
nextAnalyze = database.getSettings().analyzeAuto; nextAnalyze = database.getSettings().analyzeAuto;
...@@ -463,51 +461,25 @@ public class RegularTable extends TableBase { ...@@ -463,51 +461,25 @@ public class RegularTable extends TableBase {
return; return;
} }
session.setWaitForLock(this, Thread.currentThread()); session.setWaitForLock(this, Thread.currentThread());
waitingSessions.addLast(session);
try { try {
doLock(session, lockMode, exclusive); doLock1(session, lockMode, exclusive);
} finally { } finally {
session.setWaitForLock(null, null); session.setWaitForLock(null, null);
waitingSessions.remove(session);
} }
} }
} }
private void doLock(Session session, int lockMode, boolean exclusive) { private void doLock1(Session session, int lockMode, boolean exclusive) {
traceLock(session, exclusive, "requesting for"); traceLock(session, exclusive, "requesting for");
// don't get the current time unless necessary // don't get the current time unless necessary
long max = 0; long max = 0;
boolean checkDeadlock = false; boolean checkDeadlock = false;
while (true) { while (true) {
if (exclusive) { // if I'm the next one in the queue
if (lockExclusiveSession == null) { if (waitingSessions.getFirst() == session) {
if (lockSharedSessions.isEmpty()) { if (doLock2(session, lockMode, exclusive)) {
traceLock(session, exclusive, "added for");
session.addLock(this);
lockExclusiveSession = session;
return;
} else if (lockSharedSessions.size() == 1 && lockSharedSessions.contains(session)) {
traceLock(session, exclusive, "add (upgraded) for ");
lockExclusiveSession = session;
return;
}
}
} else {
if (lockExclusiveSession == null) {
if (lockMode == Constants.LOCK_MODE_READ_COMMITTED) {
if (!database.isMultiThreaded() && !database.isMultiVersion()) {
// READ_COMMITTED: a read lock is acquired,
// but released immediately after the operation
// is complete.
// When allowing only one thread, no lock is
// required.
// Row level locks work like read committed.
return;
}
}
if (!lockSharedSessions.contains(session)) {
traceLock(session, exclusive, "ok");
session.addLock(this);
lockSharedSessions.add(session);
}
return; return;
} }
} }
...@@ -545,7 +517,6 @@ public class RegularTable extends TableBase { ...@@ -545,7 +517,6 @@ public class RegularTable extends TableBase {
if (sleep == 0) { if (sleep == 0) {
sleep = 1; sleep = 1;
} }
waitForLock = true;
database.wait(sleep); database.wait(sleep);
} catch (InterruptedException e) { } catch (InterruptedException e) {
// ignore // ignore
...@@ -553,6 +524,43 @@ public class RegularTable extends TableBase { ...@@ -553,6 +524,43 @@ public class RegularTable extends TableBase {
} }
} }
private boolean doLock2(Session session, int lockMode, boolean exclusive) {
if (exclusive) {
if (lockExclusiveSession == null) {
if (lockSharedSessions.isEmpty()) {
traceLock(session, exclusive, "added for");
session.addLock(this);
lockExclusiveSession = session;
return true;
} else if (lockSharedSessions.size() == 1 && lockSharedSessions.contains(session)) {
traceLock(session, exclusive, "add (upgraded) for ");
lockExclusiveSession = session;
return true;
}
}
} else {
if (lockExclusiveSession == null) {
if (lockMode == Constants.LOCK_MODE_READ_COMMITTED) {
if (!database.isMultiThreaded() && !database.isMultiVersion()) {
// READ_COMMITTED: a read lock is acquired,
// but released immediately after the operation
// is complete.
// When allowing only one thread, no lock is
// required.
// Row level locks work like read committed.
return true;
}
}
if (!lockSharedSessions.contains(session)) {
traceLock(session, exclusive, "ok");
session.addLock(this);
lockSharedSessions.add(session);
}
return true;
}
}
return false;
}
private static String getDeadlockDetails(ArrayList<Session> sessions) { private static String getDeadlockDetails(ArrayList<Session> sessions) {
// We add the thread details here to make it easier for customers to // We add the thread details here to make it easier for customers to
// match up these error messages with their own logs. // match up these error messages with their own logs.
...@@ -655,10 +663,8 @@ public class RegularTable extends TableBase { ...@@ -655,10 +663,8 @@ public class RegularTable extends TableBase {
if (lockSharedSessions.size() > 0) { if (lockSharedSessions.size() > 0) {
lockSharedSessions.remove(s); lockSharedSessions.remove(s);
} }
// TODO lock: maybe we need we fifo-queue to make sure nobody
// starves. check what other databases do
synchronized (database) { synchronized (database) {
if (database.getSessionCount() > 1 && waitForLock) { if (!waitingSessions.isEmpty()) {
database.notifyAll(); database.notifyAll();
} }
} }
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论