提交 423a9972 authored 作者: Thomas Mueller's avatar Thomas Mueller

Lob in database: storing lob objects was not correctly synchronized. This was…

Lob in database: storing lob objects was not correctly synchronized. This was specially a problem when using Connection.createBlob() / createClob().
上级 b336aba3
...@@ -14,6 +14,7 @@ import java.sql.PreparedStatement; ...@@ -14,6 +14,7 @@ import java.sql.PreparedStatement;
import java.sql.ResultSet; import java.sql.ResultSet;
import java.sql.SQLException; import java.sql.SQLException;
import java.sql.Statement; import java.sql.Statement;
import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.HashMap; import java.util.HashMap;
import org.h2.constant.ErrorCode; import org.h2.constant.ErrorCode;
...@@ -79,7 +80,7 @@ public class LobStorage { ...@@ -79,7 +80,7 @@ public class LobStorage {
/** /**
* Initialize the lob storage. * Initialize the lob storage.
*/ */
public void init() { public synchronized void init() {
if (init) { if (init) {
return; return;
} }
...@@ -107,14 +108,18 @@ public class LobStorage { ...@@ -107,14 +108,18 @@ public class LobStorage {
} }
private long getNextLobId() throws SQLException { private long getNextLobId() throws SQLException {
PreparedStatement prep = prepare("SELECT MAX(LOB) FROM " + LOB_MAP); String sql = "SELECT MAX(LOB) FROM " + LOB_MAP;
PreparedStatement prep = prepare(sql);
ResultSet rs = prep.executeQuery(); ResultSet rs = prep.executeQuery();
rs.next(); rs.next();
long x = rs.getLong(1) + 1; long x = rs.getLong(1) + 1;
prep = prepare("SELECT MAX(ID) FROM " + LOBS); reuse(sql, prep);
sql = "SELECT MAX(ID) FROM " + LOBS;
prep = prepare(sql);
rs = prep.executeQuery(); rs = prep.executeQuery();
rs.next(); rs.next();
x = Math.max(x, rs.getLong(1) + 1); x = Math.max(x, rs.getLong(1) + 1);
reuse(sql, prep);
return x; return x;
} }
...@@ -127,12 +132,14 @@ public class LobStorage { ...@@ -127,12 +132,14 @@ public class LobStorage {
if (SysProperties.LOB_IN_DATABASE) { if (SysProperties.LOB_IN_DATABASE) {
init(); init();
try { try {
PreparedStatement prep = prepare("SELECT ID FROM " + LOBS + " WHERE TABLE = ?"); String sql = "SELECT ID FROM " + LOBS + " WHERE TABLE = ?";
PreparedStatement prep = prepare(sql);
prep.setInt(1, tableId); prep.setInt(1, tableId);
ResultSet rs = prep.executeQuery(); ResultSet rs = prep.executeQuery();
while (rs.next()) { while (rs.next()) {
deleteLob(rs.getLong(1)); deleteLob(rs.getLong(1));
} }
reuse(sql, prep);
} catch (SQLException e) { } catch (SQLException e) {
throw DbException.convert(e); throw DbException.convert(e);
} }
...@@ -279,41 +286,52 @@ public class LobStorage { ...@@ -279,41 +286,52 @@ public class LobStorage {
} }
private synchronized PreparedStatement prepare(String sql) throws SQLException { private synchronized PreparedStatement prepare(String sql) throws SQLException {
PreparedStatement prep = prepared.get(sql); PreparedStatement prep = prepared.remove(sql);
if (prep == null) { if (prep == null) {
prep = conn.prepareStatement(sql); prep = conn.prepareStatement(sql);
prepared.put(sql, prep);
} }
return prep; return prep;
} }
private void deleteLob(long lob) throws SQLException { private synchronized void reuse(String sql, PreparedStatement prep) {
PreparedStatement prep; prepared.put(sql, prep);
prep = prepare( }
"SELECT BLOCK, HASH FROM " + LOB_MAP + " D WHERE D.LOB = ? " +
private synchronized void deleteLob(long lob) throws SQLException {
String sql = "SELECT BLOCK, HASH FROM " + LOB_MAP + " D WHERE D.LOB = ? " +
"AND NOT EXISTS(SELECT 1 FROM " + LOB_MAP + " O " + "AND NOT EXISTS(SELECT 1 FROM " + LOB_MAP + " O " +
"WHERE O.BLOCK = D.BLOCK AND O.LOB <> ?)"); "WHERE O.BLOCK = D.BLOCK AND O.LOB <> ?)";
PreparedStatement prep = prepare(sql);
prep.setLong(1, lob); prep.setLong(1, lob);
prep.setLong(2, lob); prep.setLong(2, lob);
ResultSet rs = prep.executeQuery(); ResultSet rs = prep.executeQuery();
prep = prepare( ArrayList<Long> blocks = New.arrayList();
"DELETE FROM " + LOB_MAP + " " +
"WHERE LOB = ?");
prep.setLong(1, lob);
prep.execute();
while (rs.next()) { while (rs.next()) {
long block = rs.getLong(1); blocks.add(rs.getLong(1));
int hash = rs.getInt(2); int hash = rs.getInt(2);
setHashCacheBlock(hash, -1); setHashCacheBlock(hash, -1);
prep = prepare("DELETE FROM " + LOB_DATA + " WHERE BLOCK = ?"); }
reuse(sql, prep);
sql = "DELETE FROM " + LOB_MAP + " WHERE LOB = ?";
prep = prepare(sql);
prep.setLong(1, lob);
prep.execute();
reuse(sql, prep);
sql = "DELETE FROM " + LOB_DATA + " WHERE BLOCK = ?";
prep = prepare(sql);
for (long block : blocks) {
prep.setLong(1, block); prep.setLong(1, block);
prep.execute(); prep.execute();
} }
prep = prepare( reuse(sql, prep);
"DELETE FROM " + LOBS + " " +
"WHERE ID = ?"); sql = "DELETE FROM " + LOBS + " WHERE ID = ?";
prep = prepare(sql);
prep.setLong(1, lob); prep.setLong(1, lob);
prep.execute(); prep.execute();
reuse(sql, prep);
} }
/** /**
...@@ -329,17 +347,17 @@ public class LobStorage { ...@@ -329,17 +347,17 @@ public class LobStorage {
} }
private ValueLobDb addLob(InputStream in, long maxLength, int type) { private ValueLobDb addLob(InputStream in, long maxLength, int type) {
byte[] buff = new byte[BLOCK_LENGTH];
if (maxLength < 0) {
maxLength = Long.MAX_VALUE;
}
long length = 0;
long lobId;
int maxLengthInPlaceLob = handler.getMaxLengthInplaceLob();
String compressAlgorithm = handler.getLobCompressionAlgorithm(type);
try { try {
lobId = getNextLobId(); byte[] buff = new byte[BLOCK_LENGTH];
if (maxLength < 0) {
maxLength = Long.MAX_VALUE;
}
long length = 0;
long lobId = -1;
int maxLengthInPlaceLob = handler.getMaxLengthInplaceLob();
String compressAlgorithm = handler.getLobCompressionAlgorithm(type);
try { try {
byte[] small = null;
for (int seq = 0; maxLength > 0; seq++) { for (int seq = 0; maxLength > 0; seq++) {
int len = (int) Math.min(BLOCK_LENGTH, maxLength); int len = (int) Math.min(BLOCK_LENGTH, maxLength);
len = IOUtils.readFully(in, buff, 0, len); len = IOUtils.readFully(in, buff, 0, len);
...@@ -356,15 +374,30 @@ public class LobStorage { ...@@ -356,15 +374,30 @@ public class LobStorage {
b = buff; b = buff;
} }
if (seq == 0 && b.length < BLOCK_LENGTH && b.length <= maxLengthInPlaceLob) { if (seq == 0 && b.length < BLOCK_LENGTH && b.length <= maxLengthInPlaceLob) {
// CLOB: the precision will be fixed later small = b;
ValueLobDb v = ValueLobDb.createSmallLob(type, b, b.length); break;
return v;
} }
storeBlock(lobId, seq, b, compressAlgorithm); synchronized (this) {
if (seq == 0) {
lobId = getNextLobId();
}
storeBlock(lobId, seq, b, compressAlgorithm);
}
}
if (lobId == -1 && small == null) {
// zero length
small = new byte[0];
}
if (small != null) {
// CLOB: the precision will be fixed later
ValueLobDb v = ValueLobDb.createSmallLob(type, small, small.length);
return v;
} }
return registerLob(type, lobId, TABLE_TEMP, length); return registerLob(type, lobId, TABLE_TEMP, length);
} catch (IOException e) { } catch (IOException e) {
deleteLob(lobId); if (lobId != -1) {
deleteLob(lobId);
}
throw DbException.convertIOException(e, "adding blob"); throw DbException.convertIOException(e, "adding blob");
} }
} catch (SQLException e) { } catch (SQLException e) {
...@@ -374,12 +407,13 @@ public class LobStorage { ...@@ -374,12 +407,13 @@ public class LobStorage {
private ValueLobDb registerLob(int type, long lobId, int tableId, long byteCount) { private ValueLobDb registerLob(int type, long lobId, int tableId, long byteCount) {
try { try {
PreparedStatement prep = prepare( String sql = "INSERT INTO " + LOBS + "(ID, BYTE_COUNT, TABLE) VALUES(?, ?, ?)";
"INSERT INTO " + LOBS + "(ID, BYTE_COUNT, TABLE) VALUES(?, ?, ?)"); PreparedStatement prep = prepare(sql);
prep.setLong(1, lobId); prep.setLong(1, lobId);
prep.setLong(2, byteCount); prep.setLong(2, byteCount);
prep.setInt(3, tableId); prep.setInt(3, tableId);
prep.execute(); prep.execute();
reuse(sql, prep);
ValueLobDb v = ValueLobDb.create(type, this, null, tableId, lobId, byteCount); ValueLobDb v = ValueLobDb.create(type, this, null, tableId, lobId, byteCount);
return v; return v;
} catch (SQLException e) { } catch (SQLException e) {
...@@ -396,23 +430,27 @@ public class LobStorage { ...@@ -396,23 +430,27 @@ public class LobStorage {
* @param length the length * @param length the length
* @return the new lob * @return the new lob
*/ */
public ValueLobDb copyLob(int type, long oldLobId, int tableId, long length) { public synchronized ValueLobDb copyLob(int type, long oldLobId, int tableId, long length) {
try { try {
init(); init();
long lobId = getNextLobId(); long lobId = getNextLobId();
PreparedStatement prep = prepare( String sql = "INSERT INTO " + LOB_MAP + "(LOB, SEQ, HASH, BLOCK) " +
"INSERT INTO " + LOB_MAP + "(LOB, SEQ, HASH, BLOCK) " + "SELECT ?, SEQ, HASH, BLOCK FROM " + LOB_MAP + " WHERE LOB = ?";
"SELECT ?, SEQ, HASH, BLOCK FROM " + LOB_MAP + " WHERE LOB = ?"); PreparedStatement prep = prepare(sql);
prep.setLong(1, lobId); prep.setLong(1, lobId);
prep.setLong(2, oldLobId); prep.setLong(2, oldLobId);
prep.executeUpdate(); prep.executeUpdate();
prep = prepare( reuse(sql, prep);
"INSERT INTO " + LOBS + "(ID, BYTE_COUNT, TABLE) " +
"SELECT ?, BYTE_COUNT, ? FROM " + LOBS + " WHERE ID = ?"); sql = "INSERT INTO " + LOBS + "(ID, BYTE_COUNT, TABLE) " +
"SELECT ?, BYTE_COUNT, ? FROM " + LOBS + " WHERE ID = ?";
prep = prepare(sql);
prep.setLong(1, lobId); prep.setLong(1, lobId);
prep.setLong(2, tableId); prep.setLong(2, tableId);
prep.setLong(3, oldLobId); prep.setLong(3, oldLobId);
prep.executeUpdate(); prep.executeUpdate();
reuse(sql, prep);
ValueLobDb v = ValueLobDb.create(type, this, null, tableId, lobId, length); ValueLobDb v = ValueLobDb.create(type, this, null, tableId, lobId, length);
return v; return v;
} catch (SQLException e) { } catch (SQLException e) {
...@@ -456,9 +494,9 @@ public class LobStorage { ...@@ -456,9 +494,9 @@ public class LobStorage {
int hash = Arrays.hashCode(b); int hash = Arrays.hashCode(b);
block = getHashCacheBlock(hash); block = getHashCacheBlock(hash);
if (block != -1) { if (block != -1) {
PreparedStatement prep = prepare( String sql = "SELECT COMPRESSED, DATA FROM " + LOB_DATA +
"SELECT COMPRESSED, DATA FROM " + LOB_DATA + " WHERE BLOCK = ?";
" WHERE BLOCK = ?"); PreparedStatement prep = prepare(sql);
prep.setLong(1, block); prep.setLong(1, block);
ResultSet rs = prep.executeQuery(); ResultSet rs = prep.executeQuery();
if (rs.next()) { if (rs.next()) {
...@@ -468,24 +506,27 @@ public class LobStorage { ...@@ -468,24 +506,27 @@ public class LobStorage {
blockExists = true; blockExists = true;
} }
} }
reuse(sql, prep);
} }
if (!blockExists) { if (!blockExists) {
block = nextBlock++; block = nextBlock++;
setHashCacheBlock(hash, block); setHashCacheBlock(hash, block);
PreparedStatement prep = prepare( String sql = "INSERT INTO " + LOB_DATA + "(BLOCK, COMPRESSED, DATA) VALUES(?, ?, ?)";
"INSERT INTO " + LOB_DATA + "(BLOCK, COMPRESSED, DATA) VALUES(?, ?, ?)"); PreparedStatement prep = prepare(sql);
prep.setLong(1, block); prep.setLong(1, block);
prep.setInt(2, compressAlgorithm == null ? 0 : 1); prep.setInt(2, compressAlgorithm == null ? 0 : 1);
prep.setBytes(3, b); prep.setBytes(3, b);
prep.execute(); prep.execute();
reuse(sql, prep);
} }
PreparedStatement prep = prepare( String sql = "INSERT INTO " + LOB_MAP + "(LOB, SEQ, HASH, BLOCK) VALUES(?, ?, ?, ?)";
"INSERT INTO " + LOB_MAP + "(LOB, SEQ, HASH, BLOCK) VALUES(?, ?, ?, ?)"); PreparedStatement prep = prepare(sql);
prep.setLong(1, lobId); prep.setLong(1, lobId);
prep.setInt(2, seq); prep.setInt(2, seq);
prep.setLong(3, hash); prep.setLong(3, hash);
prep.setLong(4, block); prep.setLong(4, block);
prep.execute(); prep.execute();
reuse(sql, prep);
} }
/** /**
...@@ -611,10 +652,12 @@ public class LobStorage { ...@@ -611,10 +652,12 @@ public class LobStorage {
public void setTable(long lobId, int table) { public void setTable(long lobId, int table) {
try { try {
init(); init();
PreparedStatement prep = prepare("UPDATE " + LOBS + " SET TABLE = ? WHERE ID = ?"); String sql = "UPDATE " + LOBS + " SET TABLE = ? WHERE ID = ?";
PreparedStatement prep = prepare(sql);
prep.setInt(1, table); prep.setInt(1, table);
prep.setLong(2, lobId); prep.setLong(2, lobId);
prep.executeUpdate(); prep.executeUpdate();
reuse(sql, prep);
} catch (SQLException e) { } catch (SQLException e) {
throw DbException.convert(e); throw DbException.convert(e);
} }
......
...@@ -10,6 +10,7 @@ import java.io.ByteArrayInputStream; ...@@ -10,6 +10,7 @@ import java.io.ByteArrayInputStream;
import java.io.CharArrayReader; import java.io.CharArrayReader;
import java.io.File; import java.io.File;
import java.io.InputStream; import java.io.InputStream;
import java.io.OutputStream;
import java.io.Reader; import java.io.Reader;
import java.io.StringReader; import java.io.StringReader;
import java.sql.Blob; import java.sql.Blob;
...@@ -28,6 +29,7 @@ import org.h2.constant.SysProperties; ...@@ -28,6 +29,7 @@ import org.h2.constant.SysProperties;
import org.h2.store.FileLister; import org.h2.store.FileLister;
import org.h2.test.TestBase; import org.h2.test.TestBase;
import org.h2.tools.DeleteDbFiles; import org.h2.tools.DeleteDbFiles;
import org.h2.util.Task;
import org.h2.util.Utils; import org.h2.util.Utils;
import org.h2.util.IOUtils; import org.h2.util.IOUtils;
import org.h2.util.StringUtils; import org.h2.util.StringUtils;
...@@ -51,6 +53,7 @@ public class TestLob extends TestBase { ...@@ -51,6 +53,7 @@ public class TestLob extends TestBase {
} }
public void test() throws Exception { public void test() throws Exception {
testConcurrentCreate();
testLobInLargeResult(); testLobInLargeResult();
testUniqueIndex(); testUniqueIndex();
testConvert(); testConvert();
...@@ -89,6 +92,44 @@ public class TestLob extends TestBase { ...@@ -89,6 +92,44 @@ public class TestLob extends TestBase {
IOUtils.deleteRecursive(TEMP_DIR, true); IOUtils.deleteRecursive(TEMP_DIR, true);
} }
public void testConcurrentCreate() throws Exception {
deleteDb("lob");
final Connection conn1 = getConnection("lob");
final Connection conn2 = getConnection("lob");
conn1.setAutoCommit(false);
conn2.setAutoCommit(false);
final byte[] buffer = new byte[10000];
Task task1 = new Task() {
public void call() throws Exception {
while (!stop) {
Blob b = conn1.createBlob();
OutputStream out = b.setBinaryStream(1);
out.write(buffer);
out.close();
}
}
};
Task task2 = new Task() {
public void call() throws Exception {
while (!stop) {
Blob b = conn2.createBlob();
OutputStream out = b.setBinaryStream(1);
out.write(buffer);
out.close();
}
}
};
task1.execute();
task2.execute();
Thread.sleep(1000);
task1.get();
task2.get();
conn1.close();
conn2.close();
}
private void testLobInLargeResult() throws Exception { private void testLobInLargeResult() throws Exception {
deleteDb("lob"); deleteDb("lob");
Connection conn; Connection conn;
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论