提交 fdb4eb02 authored 作者: noelgrandin's avatar noelgrandin

fix deadlock in concurrent LOB update

上级 3c754151
......@@ -18,7 +18,7 @@ Change Log
<h1>Change Log</h1>
<h2>Next Version (unreleased)</h2>
<ul><li>-
<ul><li>Fix deadlock when updating LOB's concurrently. See TestLob#testDeadlock2()
</li></ul>
<h2>Version 1.3.172 (2013-05-25)</h2>
......
......@@ -2356,7 +2356,7 @@ public class Database implements DataHandler {
}
@Override
public Connection getLobConnection() {
public JdbcConnection getLobConnection() {
String url = Constants.CONN_URL_INTERNAL;
JdbcConnection conn = new JdbcConnection(systemSession, systemUser.getName(), url);
conn.setTraceLevel(TraceSystem.OFF);
......
......@@ -9,7 +9,6 @@ package org.h2.store;
import java.io.IOException;
import java.io.InputStream;
import java.io.Reader;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
......@@ -17,11 +16,11 @@ import java.sql.Statement;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import org.h2.constant.ErrorCode;
import org.h2.constant.SysProperties;
import org.h2.engine.Constants;
import org.h2.engine.Database;
import org.h2.jdbc.JdbcConnection;
import org.h2.message.DbException;
import org.h2.tools.CompressTool;
import org.h2.util.IOUtils;
......@@ -60,7 +59,7 @@ public class LobStorageBackend implements LobStorageInterface {
*/
private static final int HASH_CACHE_SIZE = 4 * 1024;
private Connection conn;
private JdbcConnection conn;
private final HashMap<String, PreparedStatement> prepared = New.hashMap();
private long nextBlock;
private final CompressTool compress = CompressTool.getInstance();
......@@ -191,6 +190,8 @@ public class LobStorageBackend implements LobStorageInterface {
* @return the block (expanded if stored compressed)
*/
byte[] readBlock(long lob, int seq) throws SQLException {
// we have to take the lock on the session before the lock on the database to prevent ABBA deadlocks
synchronized (conn.getSession()) {
synchronized (database) {
String sql = "SELECT COMPRESSED, DATA FROM " + LOB_MAP + " M " +
"INNER JOIN " + LOB_DATA + " D ON M.BLOCK = D.BLOCK " +
......@@ -211,6 +212,7 @@ public class LobStorageBackend implements LobStorageInterface {
return buffer;
}
}
}
/**
* Retrieve the sequence id and position that is smaller than the requested
......@@ -223,6 +225,7 @@ public class LobStorageBackend implements LobStorageInterface {
* the sequence, and the offset
*/
long[] skipBuffer(long lob, long pos) throws SQLException {
synchronized (conn.getSession()) {
synchronized (database) {
String sql = "SELECT MAX(SEQ), MAX(POS) FROM " + LOB_MAP +
" WHERE LOB = ? AND POS < ?";
......@@ -237,7 +240,8 @@ public class LobStorageBackend implements LobStorageInterface {
rs.close();
reuse(sql, prep);
// upgraded: offset not set
return wasNull ? null : new long[]{seq, pos};
return wasNull ? null : new long[] { seq, pos };
}
}
}
......@@ -266,6 +270,7 @@ public class LobStorageBackend implements LobStorageInterface {
@Override
public void removeLob(long lob) {
try {
synchronized (conn.getSession()) {
synchronized (database) {
String sql = "SELECT BLOCK, HASH FROM " + LOB_MAP + " D WHERE D.LOB = ? " +
"AND NOT EXISTS(SELECT 1 FROM " + LOB_MAP + " O " +
......@@ -302,6 +307,7 @@ public class LobStorageBackend implements LobStorageInterface {
prep.execute();
reuse(sql, prep);
}
}
} catch (SQLException e) {
throw DbException.convert(e);
}
......@@ -311,6 +317,7 @@ public class LobStorageBackend implements LobStorageInterface {
public InputStream getInputStream(long lobId, byte[] hmac, long byteCount) throws IOException {
init();
if (byteCount == -1) {
synchronized (conn.getSession()) {
synchronized (database) {
try {
String sql = "SELECT BYTE_COUNT FROM " + LOBS + " WHERE ID = ?";
......@@ -318,7 +325,7 @@ public class LobStorageBackend implements LobStorageInterface {
prep.setLong(1, lobId);
ResultSet rs = prep.executeQuery();
if (!rs.next()) {
throw DbException.get(ErrorCode.IO_EXCEPTION_1, "Missing lob: "+ lobId).getSQLException();
throw DbException.get(ErrorCode.IO_EXCEPTION_1, "Missing lob: " + lobId).getSQLException();
}
byteCount = rs.getLong(1);
reuse(sql, prep);
......@@ -327,6 +334,7 @@ public class LobStorageBackend implements LobStorageInterface {
}
}
}
}
return new LobInputStream(lobId, byteCount);
}
......@@ -360,12 +368,14 @@ public class LobStorageBackend implements LobStorageInterface {
small = b;
break;
}
synchronized (conn.getSession()) {
synchronized (database) {
if (seq == 0) {
lobId = getNextLobId();
}
storeBlock(lobId, seq, length, b, compressAlgorithm);
}
}
length += len;
}
if (lobId == -1 && small == null) {
......@@ -390,6 +400,7 @@ public class LobStorageBackend implements LobStorageInterface {
}
private ValueLobDb registerLob(int type, long lobId, int tableId, long byteCount) {
synchronized (conn.getSession()) {
synchronized (database) {
try {
String sql = "INSERT INTO " + LOBS + "(ID, BYTE_COUNT, TABLE) VALUES(?, ?, ?)";
......@@ -406,9 +417,11 @@ public class LobStorageBackend implements LobStorageInterface {
}
}
}
}
@Override
public ValueLobDb copyLob(int type, long oldLobId, int tableId, long length) {
synchronized (conn.getSession()) {
synchronized (database) {
try {
init();
......@@ -437,6 +450,7 @@ public class LobStorageBackend implements LobStorageInterface {
}
}
}
}
private long getHashCacheBlock(int hash) {
if (HASH_CACHE_SIZE > 0) {
......@@ -481,6 +495,7 @@ public class LobStorageBackend implements LobStorageInterface {
b = compress.compress(b, compressAlgorithm);
}
int hash = Arrays.hashCode(b);
synchronized (conn.getSession()) {
synchronized (database) {
block = getHashCacheBlock(hash);
if (block != -1) {
......@@ -520,6 +535,7 @@ public class LobStorageBackend implements LobStorageInterface {
reuse(sql, prep);
}
}
}
@Override
public Value createBlob(InputStream in, long maxLength) {
......@@ -545,6 +561,7 @@ public class LobStorageBackend implements LobStorageInterface {
@Override
public void setTable(long lobId, int table) {
synchronized (conn.getSession()) {
synchronized (database) {
try {
init();
......@@ -559,6 +576,7 @@ public class LobStorageBackend implements LobStorageInterface {
}
}
}
}
/**
* An input stream that reads from a LOB.
......
......@@ -9,6 +9,7 @@ package org.h2.test.db;
import java.io.ByteArrayInputStream;
import java.io.CharArrayReader;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.Reader;
......@@ -64,6 +65,7 @@ public class TestLob extends TestBase {
testBlobInputStreamSeek(true);
testBlobInputStreamSeek(false);
testDeadlock();
testDeadlock2();
testCopyManyLobs();
testCopyLob();
testConcurrentCreate();
......@@ -275,6 +277,103 @@ public class TestLob extends TestBase {
conn2.close();
}
private final class Deadlock2Task1 extends Task {
public final Connection conn;
private Deadlock2Task1() throws SQLException {
this.conn = getDeadlock2Connection();
}
@Override
public void call() throws Exception {
Random random = new Random();
Statement stat = conn.createStatement();
char[] tmp = new char[1024];
while (!stop) {
try {
ResultSet rs = stat.executeQuery("select name from test where id = " + random.nextInt(999));
if (rs.next()) {
Reader r = rs.getClob("name").getCharacterStream();
while ( r.read(tmp) > 0) {}
r.close();
}
rs.close();
} catch (SQLException ex) {
// ignore "LOB gone away", this can happen in the presence of concurrent updates
if (ex.getErrorCode() != ErrorCode.IO_EXCEPTION_2) {
ex.printStackTrace();
}
} catch (IOException ex) {
// ignore "LOB gone away", this can happen in the presence of concurrent updates
if (!(ex.getCause() instanceof SQLException)) {
ex.printStackTrace();
}
SQLException ex2 = (SQLException) ex.getCause();
if (ex2.getErrorCode() != 90028) {
ex.printStackTrace();
}
}
}
}
}
private final class Deadlock2Task2 extends Task {
public final Connection conn;
private Deadlock2Task2() throws SQLException {
this.conn = getDeadlock2Connection();
}
@Override
public void call() throws Exception {
Random random = new Random();
Statement stat = conn.createStatement();
while (!stop) {
stat.execute("update test set counter = " + random.nextInt(10) + " where id = " + random.nextInt(1000));
}
}
}
private void testDeadlock2() throws Exception {
deleteDb("lob");
Connection conn = getDeadlock2Connection();
Statement stat = conn.createStatement();
stat.execute("create cached table test(id int not null identity, name clob, counter int)");
stat.execute("insert into test(id, name) select x, space(100000) from system_range(1, 1000)");
Deadlock2Task1 task1 = new Deadlock2Task1();
Deadlock2Task1 task2 = new Deadlock2Task1();
Deadlock2Task1 task3 = new Deadlock2Task1();
Deadlock2Task1 task4 = new Deadlock2Task1();
Deadlock2Task2 task5 = new Deadlock2Task2();
Deadlock2Task2 task6 = new Deadlock2Task2();
task1.execute();
task2.execute();
task3.execute();
task4.execute();
task5.execute();
task6.execute();
for (int i = 0; i < 1000; i++) {
stat.execute("insert into test values(null, space(10000 + " + i + "), 1)");
}
task1.get();
task1.conn.close();
task2.get();
task2.conn.close();
task3.get();
task3.conn.close();
task4.get();
task4.conn.close();
task5.get();
task5.conn.close();
task6.get();
task6.conn.close();
conn.close();
}
private Connection getDeadlock2Connection() throws SQLException {
return getConnection("lob;MULTI_THREADED=TRUE;LOCK_TIMEOUT=60000");
}
private void testCopyManyLobs() throws Exception {
deleteDb("lob");
Connection conn = getConnection("lob");
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论