提交 799303c9 authored 作者: Thomas Mueller's avatar Thomas Mueller

Issue 315: Access to LOBs could cause a Java level deadlock.

上级 2bddbdf2
...@@ -17,7 +17,8 @@ Change Log ...@@ -17,7 +17,8 @@ Change Log
<h1>Change Log</h1> <h1>Change Log</h1>
<h2>Next Version (unreleased)</h2> <h2>Next Version (unreleased)</h2>
<ul><li>Support for the ICU4J collator. <ul><li>Issue 315: Access to LOBs could cause a Java level deadlock.
</li><li>Support for the ICU4J collator.
</li><li>Improved Oracle compatibility: support for NVL2. Thanks to litailang for the patch! </li><li>Improved Oracle compatibility: support for NVL2. Thanks to litailang for the patch!
</li><li>Improved PostgreSQL compatibility: support for RANDOM() in addition to RAND(). </li><li>Improved PostgreSQL compatibility: support for RANDOM() in addition to RAND().
</li><li>There was a classloader memory leak problem because a class contained a static </li><li>There was a classloader memory leak problem because a class contained a static
......
...@@ -80,31 +80,33 @@ public class LobStorage { ...@@ -80,31 +80,33 @@ public class LobStorage {
/** /**
* Initialize the lob storage. * Initialize the lob storage.
*/ */
public synchronized void init() { public void init() {
if (init) { if (init) {
return; return;
} }
conn = handler.getLobConnection(); synchronized (handler) {
init = true; conn = handler.getLobConnection();
if (conn == null) { init = true;
return; if (conn == null) {
} return;
try { }
Statement stat = conn.createStatement(); try {
// stat.execute("SET UNDO_LOG 0"); Statement stat = conn.createStatement();
// stat.execute("SET REDO_LOG_BINARY 0"); // stat.execute("SET UNDO_LOG 0");
stat.execute("CREATE TABLE IF NOT EXISTS " + LOBS + "(ID BIGINT PRIMARY KEY, BYTE_COUNT BIGINT, TABLE INT) HIDDEN"); // stat.execute("SET REDO_LOG_BINARY 0");
stat.execute("CREATE INDEX IF NOT EXISTS INFORMATION_SCHEMA.INDEX_LOB_TABLE ON " + LOBS + "(TABLE)"); stat.execute("CREATE TABLE IF NOT EXISTS " + LOBS + "(ID BIGINT PRIMARY KEY, BYTE_COUNT BIGINT, TABLE INT) HIDDEN");
stat.execute("CREATE TABLE IF NOT EXISTS " + LOB_MAP + "(LOB BIGINT, SEQ INT, HASH INT, BLOCK BIGINT, PRIMARY KEY(LOB, SEQ)) HIDDEN"); stat.execute("CREATE INDEX IF NOT EXISTS INFORMATION_SCHEMA.INDEX_LOB_TABLE ON " + LOBS + "(TABLE)");
stat.execute("CREATE INDEX IF NOT EXISTS INFORMATION_SCHEMA.INDEX_LOB_MAP_DATA_LOB ON " + LOB_MAP + "(BLOCK, LOB)"); stat.execute("CREATE TABLE IF NOT EXISTS " + LOB_MAP + "(LOB BIGINT, SEQ INT, HASH INT, BLOCK BIGINT, PRIMARY KEY(LOB, SEQ)) HIDDEN");
stat.execute("CREATE TABLE IF NOT EXISTS " + LOB_DATA + "(BLOCK BIGINT PRIMARY KEY, COMPRESSED INT, DATA BINARY) HIDDEN"); stat.execute("CREATE INDEX IF NOT EXISTS INFORMATION_SCHEMA.INDEX_LOB_MAP_DATA_LOB ON " + LOB_MAP + "(BLOCK, LOB)");
ResultSet rs; stat.execute("CREATE TABLE IF NOT EXISTS " + LOB_DATA + "(BLOCK BIGINT PRIMARY KEY, COMPRESSED INT, DATA BINARY) HIDDEN");
rs = stat.executeQuery("SELECT MAX(BLOCK) FROM " + LOB_DATA); ResultSet rs;
rs.next(); rs = stat.executeQuery("SELECT MAX(BLOCK) FROM " + LOB_DATA);
nextBlock = rs.getLong(1) + 1; rs.next();
stat.close(); nextBlock = rs.getLong(1) + 1;
} catch (SQLException e) { stat.close();
throw DbException.convert(e); } catch (SQLException e) {
throw DbException.convert(e);
}
} }
} }
...@@ -173,41 +175,42 @@ public class LobStorage { ...@@ -173,41 +175,42 @@ public class LobStorage {
return ValueLob.createSmallLob(type, small); return ValueLob.createSmallLob(type, small);
} }
byte[] readBlock(long lob, int seq) throws SQLException {
synchronized (handler) {
String sql = "SELECT COMPRESSED, DATA FROM " + LOB_MAP + " M " +
"INNER JOIN " + LOB_DATA + " D ON M.BLOCK = D.BLOCK " +
"WHERE M.LOB = ? AND M.SEQ = ?";
PreparedStatement prep = prepare(sql);
prep.setLong(1, lob);
prep.setInt(2, seq);
ResultSet rs = prep.executeQuery();
if (!rs.next()) {
throw DbException.get(ErrorCode.IO_EXCEPTION_1, "Missing lob entry: "+ lob + "/" + seq).getSQLException();
}
int compressed = rs.getInt(1);
byte[] buffer = rs.getBytes(2);
if (compressed != 0) {
buffer = compress.expand(buffer);
}
reuse(sql, prep);
return buffer;
}
}
/** /**
* An input stream that reads from a LOB. * An input stream that reads from a LOB.
*/ */
public static class LobInputStream extends InputStream { public class LobInputStream extends InputStream {
private final Connection conn;
private PreparedStatement prepSelect;
private byte[] buffer; private byte[] buffer;
private int pos; private int pos;
private long remainingBytes; private long remainingBytes;
private long lob; private long lob;
private int seq; private int seq;
private CompressTool compress;
public LobInputStream(Connection conn, long lob, long byteCount) throws IOException { public LobInputStream(long lob, long byteCount) {
this.conn = conn;
this.lob = lob; this.lob = lob;
if (byteCount == -1) { remainingBytes = byteCount;
try {
this.lob = lob;
PreparedStatement prep = conn.prepareStatement(
"SELECT BYTE_COUNT FROM " + LOBS + " WHERE ID = ?");
prep.setLong(1, lob);
ResultSet rs = prep.executeQuery();
if (!rs.next()) {
throw DbException.get(ErrorCode.IO_EXCEPTION_1, "Missing lob: "+ lob).getSQLException();
}
remainingBytes = rs.getLong(1);
rs.close();
} catch (SQLException e) {
throw DbException.convertToIOException(e);
}
} else {
remainingBytes = byteCount;
}
} }
public int read() throws IOException { public int read() throws IOException {
...@@ -257,27 +260,7 @@ public class LobStorage { ...@@ -257,27 +260,7 @@ public class LobStorage {
return; return;
} }
try { try {
if (prepSelect == null) { buffer = readBlock(lob, seq++);
prepSelect = conn.prepareStatement(
"SELECT COMPRESSED, DATA FROM " + LOB_MAP + " M " +
"INNER JOIN " + LOB_DATA + " D ON M.BLOCK = D.BLOCK " +
"WHERE M.LOB = ? AND M.SEQ = ?");
}
prepSelect.setLong(1, lob);
prepSelect.setInt(2, seq);
ResultSet rs = prepSelect.executeQuery();
if (!rs.next()) {
throw DbException.get(ErrorCode.IO_EXCEPTION_1, "Missing lob entry: "+ lob + "/" + seq).getSQLException();
}
seq++;
int compressed = rs.getInt(1);
buffer = rs.getBytes(2);
if (compressed != 0) {
if (compress == null) {
compress = CompressTool.getInstance();
}
buffer = compress.expand(buffer);
}
pos = 0; pos = 0;
} catch (SQLException e) { } catch (SQLException e) {
throw DbException.convertToIOException(e); throw DbException.convertToIOException(e);
...@@ -286,7 +269,12 @@ public class LobStorage { ...@@ -286,7 +269,12 @@ public class LobStorage {
} }
private synchronized PreparedStatement prepare(String sql) throws SQLException { private PreparedStatement prepare(String sql) throws SQLException {
if (SysProperties.CHECK2) {
if (!Thread.holdsLock(handler)) {
throw DbException.throwInternalError();
}
}
PreparedStatement prep = prepared.remove(sql); PreparedStatement prep = prepared.remove(sql);
if (prep == null) { if (prep == null) {
prep = conn.prepareStatement(sql); prep = conn.prepareStatement(sql);
...@@ -294,45 +282,52 @@ public class LobStorage { ...@@ -294,45 +282,52 @@ public class LobStorage {
return prep; return prep;
} }
private synchronized void reuse(String sql, PreparedStatement prep) { private void reuse(String sql, PreparedStatement prep) {
if (SysProperties.CHECK2) {
if (!Thread.holdsLock(handler)) {
throw DbException.throwInternalError();
}
}
prepared.put(sql, prep); prepared.put(sql, prep);
} }
private synchronized void deleteLob(long lob) throws SQLException { private void deleteLob(long lob) throws SQLException {
String sql = "SELECT BLOCK, HASH FROM " + LOB_MAP + " D WHERE D.LOB = ? " + synchronized (handler) {
"AND NOT EXISTS(SELECT 1 FROM " + LOB_MAP + " O " + String sql = "SELECT BLOCK, HASH FROM " + LOB_MAP + " D WHERE D.LOB = ? " +
"WHERE O.BLOCK = D.BLOCK AND O.LOB <> ?)"; "AND NOT EXISTS(SELECT 1 FROM " + LOB_MAP + " O " +
PreparedStatement prep = prepare(sql); "WHERE O.BLOCK = D.BLOCK AND O.LOB <> ?)";
prep.setLong(1, lob); PreparedStatement prep = prepare(sql);
prep.setLong(2, lob); prep.setLong(1, lob);
ResultSet rs = prep.executeQuery(); prep.setLong(2, lob);
ArrayList<Long> blocks = New.arrayList(); ResultSet rs = prep.executeQuery();
while (rs.next()) { ArrayList<Long> blocks = New.arrayList();
blocks.add(rs.getLong(1)); while (rs.next()) {
int hash = rs.getInt(2); blocks.add(rs.getLong(1));
setHashCacheBlock(hash, -1); int hash = rs.getInt(2);
} setHashCacheBlock(hash, -1);
reuse(sql, prep); }
reuse(sql, prep);
sql = "DELETE FROM " + LOB_MAP + " WHERE LOB = ?"; sql = "DELETE FROM " + LOB_MAP + " WHERE LOB = ?";
prep = prepare(sql); prep = prepare(sql);
prep.setLong(1, lob); prep.setLong(1, lob);
prep.execute(); prep.execute();
reuse(sql, prep); reuse(sql, prep);
sql = "DELETE FROM " + LOB_DATA + " WHERE BLOCK = ?"; sql = "DELETE FROM " + LOB_DATA + " WHERE BLOCK = ?";
prep = prepare(sql); prep = prepare(sql);
for (long block : blocks) { for (long block : blocks) {
prep.setLong(1, block); prep.setLong(1, block);
prep.execute();
}
reuse(sql, prep);
sql = "DELETE FROM " + LOBS + " WHERE ID = ?";
prep = prepare(sql);
prep.setLong(1, lob);
prep.execute(); prep.execute();
reuse(sql, prep);
} }
reuse(sql, prep);
sql = "DELETE FROM " + LOBS + " WHERE ID = ?";
prep = prepare(sql);
prep.setLong(1, lob);
prep.execute();
reuse(sql, prep);
} }
/** /**
...@@ -344,7 +339,24 @@ public class LobStorage { ...@@ -344,7 +339,24 @@ public class LobStorage {
*/ */
public InputStream getInputStream(long lobId, long byteCount) throws IOException { public InputStream getInputStream(long lobId, long byteCount) throws IOException {
init(); init();
return new LobInputStream(conn, lobId, byteCount); if (byteCount == -1) {
synchronized (handler) {
try {
String sql = "SELECT BYTE_COUNT FROM " + LOBS + " WHERE ID = ?";
PreparedStatement prep = prepare(sql);
prep.setLong(1, lobId);
ResultSet rs = prep.executeQuery();
if (!rs.next()) {
throw DbException.get(ErrorCode.IO_EXCEPTION_1, "Missing lob: "+ lobId).getSQLException();
}
byteCount = rs.getLong(1);
reuse(sql, prep);
} catch (SQLException e) {
throw DbException.convertToIOException(e);
}
}
}
return new LobInputStream(lobId, byteCount);
} }
private ValueLobDb addLob(InputStream in, long maxLength, int type) { private ValueLobDb addLob(InputStream in, long maxLength, int type) {
...@@ -378,7 +390,7 @@ public class LobStorage { ...@@ -378,7 +390,7 @@ public class LobStorage {
small = b; small = b;
break; break;
} }
synchronized (this) { synchronized (handler) {
if (seq == 0) { if (seq == 0) {
lobId = getNextLobId(); lobId = getNextLobId();
} }
...@@ -407,18 +419,20 @@ public class LobStorage { ...@@ -407,18 +419,20 @@ public class LobStorage {
} }
private ValueLobDb registerLob(int type, long lobId, int tableId, long byteCount) { private ValueLobDb registerLob(int type, long lobId, int tableId, long byteCount) {
try { synchronized (handler) {
String sql = "INSERT INTO " + LOBS + "(ID, BYTE_COUNT, TABLE) VALUES(?, ?, ?)"; try {
PreparedStatement prep = prepare(sql); String sql = "INSERT INTO " + LOBS + "(ID, BYTE_COUNT, TABLE) VALUES(?, ?, ?)";
prep.setLong(1, lobId); PreparedStatement prep = prepare(sql);
prep.setLong(2, byteCount); prep.setLong(1, lobId);
prep.setInt(3, tableId); prep.setLong(2, byteCount);
prep.execute(); prep.setInt(3, tableId);
reuse(sql, prep); prep.execute();
ValueLobDb v = ValueLobDb.create(type, this, null, tableId, lobId, byteCount); reuse(sql, prep);
return v; ValueLobDb v = ValueLobDb.create(type, this, null, tableId, lobId, byteCount);
} catch (SQLException e) { return v;
throw DbException.convert(e); } catch (SQLException e) {
throw DbException.convert(e);
}
} }
} }
...@@ -431,31 +445,33 @@ public class LobStorage { ...@@ -431,31 +445,33 @@ public class LobStorage {
* @param length the length * @param length the length
* @return the new lob * @return the new lob
*/ */
public synchronized ValueLobDb copyLob(int type, long oldLobId, int tableId, long length) { public ValueLobDb copyLob(int type, long oldLobId, int tableId, long length) {
try { synchronized (handler) {
init(); try {
long lobId = getNextLobId(); init();
String sql = "INSERT INTO " + LOB_MAP + "(LOB, SEQ, HASH, BLOCK) " + long lobId = getNextLobId();
"SELECT ?, SEQ, HASH, BLOCK FROM " + LOB_MAP + " WHERE LOB = ?"; String sql = "INSERT INTO " + LOB_MAP + "(LOB, SEQ, HASH, BLOCK) " +
PreparedStatement prep = prepare(sql); "SELECT ?, SEQ, HASH, BLOCK FROM " + LOB_MAP + " WHERE LOB = ?";
prep.setLong(1, lobId); PreparedStatement prep = prepare(sql);
prep.setLong(2, oldLobId); prep.setLong(1, lobId);
prep.executeUpdate(); prep.setLong(2, oldLobId);
reuse(sql, prep); prep.executeUpdate();
reuse(sql, prep);
sql = "INSERT INTO " + LOBS + "(ID, BYTE_COUNT, TABLE) " + sql = "INSERT INTO " + LOBS + "(ID, BYTE_COUNT, TABLE) " +
"SELECT ?, BYTE_COUNT, ? FROM " + LOBS + " WHERE ID = ?"; "SELECT ?, BYTE_COUNT, ? FROM " + LOBS + " WHERE ID = ?";
prep = prepare(sql); prep = prepare(sql);
prep.setLong(1, lobId); prep.setLong(1, lobId);
prep.setLong(2, tableId); prep.setLong(2, tableId);
prep.setLong(3, oldLobId); prep.setLong(3, oldLobId);
prep.executeUpdate(); prep.executeUpdate();
reuse(sql, prep); reuse(sql, prep);
ValueLobDb v = ValueLobDb.create(type, this, null, tableId, lobId, length); ValueLobDb v = ValueLobDb.create(type, this, null, tableId, lobId, length);
return v; return v;
} catch (SQLException e) { } catch (SQLException e) {
throw DbException.convert(e); throw DbException.convert(e);
}
} }
} }
...@@ -486,48 +502,50 @@ public class LobStorage { ...@@ -486,48 +502,50 @@ public class LobStorage {
* @param b the data * @param b the data
* @param compressAlgorithm the compression algorithm (may be null) * @param compressAlgorithm the compression algorithm (may be null)
*/ */
synchronized void storeBlock(long lobId, int seq, byte[] b, String compressAlgorithm) throws SQLException { void storeBlock(long lobId, int seq, byte[] b, String compressAlgorithm) throws SQLException {
long block; long block;
boolean blockExists = false; boolean blockExists = false;
if (compressAlgorithm != null) { if (compressAlgorithm != null) {
b = compress.compress(b, compressAlgorithm); b = compress.compress(b, compressAlgorithm);
} }
int hash = Arrays.hashCode(b); int hash = Arrays.hashCode(b);
block = getHashCacheBlock(hash); synchronized (handler) {
if (block != -1) { block = getHashCacheBlock(hash);
String sql = "SELECT COMPRESSED, DATA FROM " + LOB_DATA + if (block != -1) {
" WHERE BLOCK = ?"; String sql = "SELECT COMPRESSED, DATA FROM " + LOB_DATA +
PreparedStatement prep = prepare(sql); " WHERE BLOCK = ?";
prep.setLong(1, block); PreparedStatement prep = prepare(sql);
ResultSet rs = prep.executeQuery(); prep.setLong(1, block);
if (rs.next()) { ResultSet rs = prep.executeQuery();
boolean compressed = rs.getInt(1) != 0; if (rs.next()) {
byte[] compare = rs.getBytes(2); boolean compressed = rs.getInt(1) != 0;
if (compressed == (compressAlgorithm != null) && Arrays.equals(b, compare)) { byte[] compare = rs.getBytes(2);
blockExists = true; if (compressed == (compressAlgorithm != null) && Arrays.equals(b, compare)) {
blockExists = true;
}
} }
reuse(sql, prep);
} }
reuse(sql, prep); if (!blockExists) {
} block = nextBlock++;
if (!blockExists) { setHashCacheBlock(hash, block);
block = nextBlock++; String sql = "INSERT INTO " + LOB_DATA + "(BLOCK, COMPRESSED, DATA) VALUES(?, ?, ?)";
setHashCacheBlock(hash, block); PreparedStatement prep = prepare(sql);
String sql = "INSERT INTO " + LOB_DATA + "(BLOCK, COMPRESSED, DATA) VALUES(?, ?, ?)"; prep.setLong(1, block);
prep.setInt(2, compressAlgorithm == null ? 0 : 1);
prep.setBytes(3, b);
prep.execute();
reuse(sql, prep);
}
String sql = "INSERT INTO " + LOB_MAP + "(LOB, SEQ, HASH, BLOCK) VALUES(?, ?, ?, ?)";
PreparedStatement prep = prepare(sql); PreparedStatement prep = prepare(sql);
prep.setLong(1, block); prep.setLong(1, lobId);
prep.setInt(2, compressAlgorithm == null ? 0 : 1); prep.setInt(2, seq);
prep.setBytes(3, b); prep.setLong(3, hash);
prep.setLong(4, block);
prep.execute(); prep.execute();
reuse(sql, prep); reuse(sql, prep);
} }
String sql = "INSERT INTO " + LOB_MAP + "(LOB, SEQ, HASH, BLOCK) VALUES(?, ?, ?, ?)";
PreparedStatement prep = prepare(sql);
prep.setLong(1, lobId);
prep.setInt(2, seq);
prep.setLong(3, hash);
prep.setLong(4, block);
prep.execute();
reuse(sql, prep);
} }
/** /**
...@@ -651,16 +669,18 @@ public class LobStorage { ...@@ -651,16 +669,18 @@ public class LobStorage {
* @param table the table * @param table the table
*/ */
public void setTable(long lobId, int table) { public void setTable(long lobId, int table) {
try { synchronized (handler) {
init(); try {
String sql = "UPDATE " + LOBS + " SET TABLE = ? WHERE ID = ?"; init();
PreparedStatement prep = prepare(sql); String sql = "UPDATE " + LOBS + " SET TABLE = ? WHERE ID = ?";
prep.setInt(1, table); PreparedStatement prep = prepare(sql);
prep.setLong(2, lobId); prep.setInt(1, table);
prep.executeUpdate(); prep.setLong(2, lobId);
reuse(sql, prep); prep.executeUpdate();
} catch (SQLException e) { reuse(sql, prep);
throw DbException.convert(e); } catch (SQLException e) {
throw DbException.convert(e);
}
} }
} }
......
...@@ -54,6 +54,7 @@ public class TestLob extends TestBase { ...@@ -54,6 +54,7 @@ public class TestLob extends TestBase {
} }
public void test() throws Exception { public void test() throws Exception {
testDeadlock();
testCopyManyLobs(); testCopyManyLobs();
testCopyLob(); testCopyLob();
testConcurrentCreate(); testConcurrentCreate();
...@@ -95,6 +96,41 @@ public class TestLob extends TestBase { ...@@ -95,6 +96,41 @@ public class TestLob extends TestBase {
IOUtils.deleteRecursive(TEMP_DIR, true); IOUtils.deleteRecursive(TEMP_DIR, true);
} }
/**
* Test for issue 315: Java Level Deadlock on Database & Session Objects
*/
private void testDeadlock() throws Exception {
deleteDb("lob");
Connection conn = getConnection("lob");
Statement stat = conn.createStatement();
stat.execute("create table test(id int primary key, name clob)");
stat.execute("insert into test select x, space(10000) from system_range(1, 3)");
final Connection conn2 = getConnection("lob");
Task task = new Task() {
public void call() throws Exception {
Statement stat = conn2.createStatement();
stat.setFetchSize(1);
for (int i = 0; !stop; i++) {
ResultSet rs = stat.executeQuery("select * from test where id > -" + i);
while (rs.next()) {
// ignore
}
}
}
};
task.execute();
stat.execute("create table test2(id int primary key, name clob)");
for (int i = 0; i < 1000; i++) {
stat.execute("delete from test2");
stat.execute("insert into test2 values(1, space(10000 + " + i + "))");
}
task.get();
conn.close();
conn2.close();
}
private void testCopyManyLobs() throws Exception { private void testCopyManyLobs() throws Exception {
deleteDb("lob"); deleteDb("lob");
Connection conn = getConnection("lob"); Connection conn = getConnection("lob");
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论