提交 ccc92707 authored 作者: noelgrandin's avatar noelgrandin

Fix an LOB deadlock between reading and updating LOB columns.

上级 23cd2f75
......@@ -65,6 +65,7 @@ Change Log
It will now throw an exception.
</li><li>Query Statistics: new feature which stores the newest 100 SQL queries executed and their performance data.
Useful for tracking down badly performing queries.
</li><li>Fix an LOB deadlock between reading and updating LOB columns.
</li></ul>
<h2>Version 1.3.173 (2013-07-28)</h2>
......
......@@ -113,6 +113,7 @@ public class Database implements DataHandler {
private int nextTempTableId;
private User systemUser;
private Session systemSession;
private Session lobSession;
private Table meta;
private Index metaIdIndex;
private FileLock lock;
......@@ -628,6 +629,7 @@ public class Database implements DataHandler {
roles.put(Constants.PUBLIC_ROLE_NAME, publicRole);
systemUser.setAdmin(true);
systemSession = new Session(this, systemUser, ++nextSessionId);
lobSession = new Session(this, systemUser, ++nextSessionId);
CreateTableData data = new CreateTableData();
ArrayList<Column> cols = data.columns;
Column columnId = new Column("ID", Value.INT);
......@@ -1056,11 +1058,11 @@ public class Database implements DataHandler {
exclusiveSession = null;
}
userSessions.remove(session);
if (session != systemSession) {
if (session != systemSession && session != lobSession) {
trace.info("disconnecting session #{0}", session.getId());
}
}
if (userSessions.size() == 0 && session != systemSession) {
if (userSessions.size() == 0 && session != systemSession && session != lobSession) {
if (closeDelay == 0) {
close(false);
} else if (closeDelay < 0) {
......@@ -1072,7 +1074,7 @@ public class Database implements DataHandler {
delayedCloser.start();
}
}
if (session != systemSession && session != null) {
if (session != systemSession && session != lobSession && session != null) {
trace.info("disconnected session #{0}", session.getId());
}
}
......@@ -1276,6 +1278,10 @@ public class Database implements DataHandler {
systemSession.close();
systemSession = null;
}
if (lobSession != null) {
lobSession.close();
lobSession = null;
}
if (lock != null) {
if (fileLockMethod == FileLock.LOCK_SERIALIZED) {
// wait before deleting the .lock file,
......@@ -1461,9 +1467,13 @@ public class Database implements DataHandler {
}
// copy, to ensure the reference is stable
Session sys = systemSession;
Session lob = lobSession;
if (includingSystemSession && sys != null) {
list.add(sys);
}
if (includingSystemSession && lob != null) {
list.add(lob);
}
Session[] array = new Session[list.size()];
list.toArray(array);
return array;
......@@ -2475,13 +2485,20 @@ public class Database implements DataHandler {
return lobStorage;
}
public JdbcConnection getLobConnection() {
public JdbcConnection getLobConnectionForInit() {
String url = Constants.CONN_URL_INTERNAL;
JdbcConnection conn = new JdbcConnection(systemSession, systemUser.getName(), url);
conn.setTraceLevel(TraceSystem.OFF);
return conn;
}
public JdbcConnection getLobConnectionForRegularUse() {
String url = Constants.CONN_URL_INTERNAL;
JdbcConnection conn = new JdbcConnection(lobSession, systemUser.getName(), url);
conn.setTraceLevel(TraceSystem.OFF);
return conn;
}
public void setLogMode(int log) {
if (log < 0 || log > 2) {
throw DbException.getInvalidValueException("LOG", log);
......
......@@ -34,9 +34,38 @@ import org.h2.value.ValueLobDb;
/**
* This class stores LOB objects in the database.
* This is the back-end i.e. the server side of the LOB storage.
* <p>
* Using the system session
* <p>
* Why do we use the system session to store the data? Some LOB operations can take a very long time.
* If we did them on a normal session, we would be locking the LOB tables for long periods of time,
* which is extremely detrimental to the rest of the system.
* Perhaps when we shift to the MVStore engine, we can revisit this design decision.
* <p>
* Locking Discussion
* <p>
* Normally, the locking order in H2 is: first lock the Session object, then lock the Database object.
* However, in the case of the LOB data, we are using the system session to store the data.
* If we locked the normal way, we see deadlocks caused by the following pattern:
* <pre>
* Thread 1:
* locks normal session
* locks database
* waiting to lock system session
* Thread 2:
* locks system session
* waiting to lock database.
* </pre>
* So, in this class alone, we do two things: we have our very own dedicated session, the LOB session,
* and we take the locks in this order: first the Database object, and then the LOB session.
* Since we own the LOB session, no-one else can lock on it, and we are safe.
*/
public class LobStorageBackend implements LobStorageInterface {
/**
* Locking Discussion
* --------------------
*/
/**
* The name of the lob data table. If this table exists, then lob storage is
* used.
......@@ -85,14 +114,15 @@ public class LobStorageBackend implements LobStorageInterface {
if (init) {
return;
}
conn = database.getLobConnection();
init = true;
conn = database.getLobConnectionForRegularUse();
JdbcConnection initConn = database.getLobConnectionForInit();
try {
Statement stat = conn.createStatement();
Statement stat = initConn.createStatement();
// stat.execute("SET UNDO_LOG 0");
// stat.execute("SET REDO_LOG_BINARY 0");
boolean create = true;
PreparedStatement prep = conn.prepareStatement(
PreparedStatement prep = initConn.prepareStatement(
"SELECT ZERO() FROM INFORMATION_SCHEMA.COLUMNS WHERE " +
"TABLE_SCHEMA=? AND TABLE_NAME=? AND COLUMN_NAME=?");
prep.setString(1, "INFORMATION_SCHEMA");
......@@ -101,7 +131,7 @@ public class LobStorageBackend implements LobStorageInterface {
ResultSet rs;
rs = prep.executeQuery();
if (rs.next()) {
prep = conn.prepareStatement(
prep = initConn.prepareStatement(
"SELECT ZERO() FROM INFORMATION_SCHEMA.TABLES WHERE " +
"TABLE_SCHEMA=? AND TABLE_NAME=?");
prep.setString(1, "INFORMATION_SCHEMA");
......@@ -190,10 +220,10 @@ public class LobStorageBackend implements LobStorageInterface {
* @return the block (expanded if stored compressed)
*/
byte[] readBlock(long block) throws SQLException {
// we have to take the lock on the session
// before the lock on the database to prevent ABBA deadlocks
synchronized (conn.getSession()) {
synchronized (database) {
// see locking discussion at the top
assertNotHolds(conn.getSession());
synchronized (database) {
synchronized (conn.getSession()) {
String sql = "SELECT COMPRESSED, DATA FROM " + LOB_DATA + " WHERE BLOCK = ?";
PreparedStatement prep = prepare(sql);
prep.setLong(1, block);
......@@ -224,8 +254,10 @@ public class LobStorageBackend implements LobStorageInterface {
* the sequence, and the offset
*/
long[] skipBuffer(long lob, long pos) throws SQLException {
synchronized (conn.getSession()) {
synchronized (database) {
// see locking discussion at the top
assertNotHolds(conn.getSession());
synchronized (database) {
synchronized (conn.getSession()) {
String sql = "SELECT MAX(SEQ), MAX(POS) FROM " + LOB_MAP +
" WHERE LOB = ? AND POS < ?";
PreparedStatement prep = prepare(sql);
......@@ -281,8 +313,10 @@ public class LobStorageBackend implements LobStorageInterface {
@Override
public void removeLob(long lob) {
try {
synchronized (conn.getSession()) {
synchronized (database) {
// see locking discussion at the top
assertNotHolds(conn.getSession());
synchronized (database) {
synchronized (conn.getSession()) {
String sql = "SELECT BLOCK, HASH FROM " + LOB_MAP + " D WHERE D.LOB = ? " +
"AND NOT EXISTS(SELECT 1 FROM " + LOB_MAP + " O " +
"WHERE O.BLOCK = D.BLOCK AND O.LOB <> ?)";
......@@ -328,7 +362,13 @@ public class LobStorageBackend implements LobStorageInterface {
public InputStream getInputStream(long lobId, byte[] hmac, long byteCount) throws IOException {
try {
init();
return new LobInputStream(lobId, byteCount);
assertNotHolds(conn.getSession());
// see locking discussion at the top
synchronized (database) {
synchronized(conn.getSession()) {
return new LobInputStream(lobId, byteCount);
}
}
} catch (SQLException e) {
throw DbException.convertToIOException(e);
}
......@@ -365,8 +405,10 @@ public class LobStorageBackend implements LobStorageInterface {
small = b;
break;
}
synchronized (conn.getSession()) {
synchronized (database) {
assertNotHolds(conn.getSession());
// see locking discussion at the top
synchronized (database) {
synchronized (conn.getSession()) {
if (seq == 0) {
lobId = getNextLobId();
}
......@@ -400,8 +442,10 @@ public class LobStorageBackend implements LobStorageInterface {
}
private ValueLobDb registerLob(int type, long lobId, int tableId, long byteCount, long precision) {
synchronized (conn.getSession()) {
synchronized (database) {
assertNotHolds(conn.getSession());
// see locking discussion at the top
synchronized (database) {
synchronized (conn.getSession()) {
try {
String sql = "INSERT INTO " + LOBS + "(ID, BYTE_COUNT, TABLE) VALUES(?, ?, ?)";
PreparedStatement prep = prepare(sql);
......@@ -421,8 +465,10 @@ public class LobStorageBackend implements LobStorageInterface {
@Override
public ValueLobDb copyLob(int type, long oldLobId, int tableId, long length) {
synchronized (conn.getSession()) {
synchronized (database) {
assertNotHolds(conn.getSession());
// see locking discussion at the top
synchronized (database) {
synchronized (conn.getSession()) {
try {
init();
long lobId = getNextLobId();
......@@ -495,46 +541,44 @@ public class LobStorageBackend implements LobStorageInterface {
b = compress.compress(b, compressAlgorithm);
}
int hash = Arrays.hashCode(b);
synchronized (conn.getSession()) {
synchronized (database) {
block = getHashCacheBlock(hash);
if (block != -1) {
String sql = "SELECT COMPRESSED, DATA FROM " + LOB_DATA +
" WHERE BLOCK = ?";
PreparedStatement prep = prepare(sql);
prep.setLong(1, block);
ResultSet rs = prep.executeQuery();
if (rs.next()) {
boolean compressed = rs.getInt(1) != 0;
byte[] compare = rs.getBytes(2);
if (compressed == (compressAlgorithm != null) && Arrays.equals(b, compare)) {
blockExists = true;
}
}
reuse(sql, prep);
}
if (!blockExists) {
block = nextBlock++;
setHashCacheBlock(hash, block);
String sql = "INSERT INTO " + LOB_DATA + "(BLOCK, COMPRESSED, DATA) VALUES(?, ?, ?)";
PreparedStatement prep = prepare(sql);
prep.setLong(1, block);
prep.setInt(2, compressAlgorithm == null ? 0 : 1);
prep.setBytes(3, b);
prep.execute();
reuse(sql, prep);
assertHolds(conn.getSession());
assertHolds(database);
block = getHashCacheBlock(hash);
if (block != -1) {
String sql = "SELECT COMPRESSED, DATA FROM " + LOB_DATA +
" WHERE BLOCK = ?";
PreparedStatement prep = prepare(sql);
prep.setLong(1, block);
ResultSet rs = prep.executeQuery();
if (rs.next()) {
boolean compressed = rs.getInt(1) != 0;
byte[] compare = rs.getBytes(2);
if (compressed == (compressAlgorithm != null) && Arrays.equals(b, compare)) {
blockExists = true;
}
String sql = "INSERT INTO " + LOB_MAP + "(LOB, SEQ, POS, HASH, BLOCK) VALUES(?, ?, ?, ?, ?)";
PreparedStatement prep = prepare(sql);
prep.setLong(1, lobId);
prep.setInt(2, seq);
prep.setLong(3, pos);
prep.setLong(4, hash);
prep.setLong(5, block);
prep.execute();
reuse(sql, prep);
}
reuse(sql, prep);
}
if (!blockExists) {
block = nextBlock++;
setHashCacheBlock(hash, block);
String sql = "INSERT INTO " + LOB_DATA + "(BLOCK, COMPRESSED, DATA) VALUES(?, ?, ?)";
PreparedStatement prep = prepare(sql);
prep.setLong(1, block);
prep.setInt(2, compressAlgorithm == null ? 0 : 1);
prep.setBytes(3, b);
prep.execute();
reuse(sql, prep);
}
String sql = "INSERT INTO " + LOB_MAP + "(LOB, SEQ, POS, HASH, BLOCK) VALUES(?, ?, ?, ?, ?)";
PreparedStatement prep = prepare(sql);
prep.setLong(1, lobId);
prep.setInt(2, seq);
prep.setLong(3, pos);
prep.setLong(4, hash);
prep.setLong(5, block);
prep.execute();
reuse(sql, prep);
}
@Override
......@@ -560,8 +604,10 @@ public class LobStorageBackend implements LobStorageInterface {
@Override
public void setTable(long lobId, int table) {
synchronized (conn.getSession()) {
synchronized (database) {
assertNotHolds(conn.getSession());
// see locking discussion at the top
synchronized (database) {
synchronized (conn.getSession()) {
try {
init();
String sql = "UPDATE " + LOBS + " SET TABLE = ? WHERE ID = ?";
......@@ -576,7 +622,19 @@ public class LobStorageBackend implements LobStorageInterface {
}
}
}
private static void assertNotHolds(Object lock) {
if (Thread.holdsLock(lock)) {
throw DbException.throwInternalError();
}
}
private static void assertHolds(Object lock) {
if (!Thread.holdsLock(lock)) {
throw DbException.throwInternalError();
}
}
/**
* An input stream that reads from a LOB.
*/
......@@ -613,45 +671,44 @@ public class LobStorageBackend implements LobStorageInterface {
// we have to take the lock on the session
// before the lock on the database to prevent ABBA deadlocks
synchronized (conn.getSession()) {
synchronized (database) {
if (byteCount == -1) {
String sql = "SELECT BYTE_COUNT FROM " + LOBS + " WHERE ID = ?";
PreparedStatement prep = prepare(sql);
prep.setLong(1, lobId);
ResultSet rs = prep.executeQuery();
if (!rs.next()) {
throw DbException.get(ErrorCode.IO_EXCEPTION_1, "Missing lob entry: " + lobId).getSQLException();
}
byteCount = rs.getLong(1);
reuse(sql, prep);
}
this.remainingBytes = byteCount;
assertHolds(conn.getSession());
assertHolds(database);
if (byteCount == -1) {
String sql = "SELECT BYTE_COUNT FROM " + LOBS + " WHERE ID = ?";
PreparedStatement prep = prepare(sql);
prep.setLong(1, lobId);
ResultSet rs = prep.executeQuery();
if (!rs.next()) {
throw DbException.get(ErrorCode.IO_EXCEPTION_1, "Missing lob entry: " + lobId).getSQLException();
}
byteCount = rs.getLong(1);
reuse(sql, prep);
}
this.remainingBytes = byteCount;
String sql = "SELECT COUNT(*) FROM " + LOB_MAP + " WHERE LOB = ?";
PreparedStatement prep = prepare(sql);
prep.setLong(1, lobId);
ResultSet rs = prep.executeQuery();
if (!rs.next()) {
throw DbException.get(ErrorCode.IO_EXCEPTION_1, "Missing lob entry: " + lobId).getSQLException();
}
int lobMapCount = rs.getInt(1);
reuse(sql, prep);
String sql = "SELECT COUNT(*) FROM " + LOB_MAP + " WHERE LOB = ?";
PreparedStatement prep = prepare(sql);
prep.setLong(1, lobId);
ResultSet rs = prep.executeQuery();
if (!rs.next()) {
throw DbException.get(ErrorCode.IO_EXCEPTION_1, "Missing lob entry: " + lobId).getSQLException();
}
int lobMapCount = rs.getInt(1);
reuse(sql, prep);
this.lobMapBlocks = new long[lobMapCount];
this.lobMapBlocks = new long[lobMapCount];
sql = "SELECT BLOCK FROM " + LOB_MAP + " WHERE LOB = ? ORDER BY SEQ";
prep = prepare(sql);
prep.setLong(1, lobId);
rs = prep.executeQuery();
int i = 0;
while (rs.next()) {
this.lobMapBlocks[i] = rs.getLong(1);
i++;
}
reuse(sql, prep);
}
sql = "SELECT BLOCK FROM " + LOB_MAP + " WHERE LOB = ? ORDER BY SEQ";
prep = prepare(sql);
prep.setLong(1, lobId);
rs = prep.executeQuery();
int i = 0;
while (rs.next()) {
this.lobMapBlocks[i] = rs.getLong(1);
i++;
}
reuse(sql, prep);
}
@Override
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论