提交 c04541fe authored 作者: noelgrandin's avatar noelgrandin

Fix the combination of updating a table which contains an LOB, and reading from…

Fix the combination of updating a table which contains an LOB, and reading from the LOB at the same time.
Previously it would throw an exception, now it works.
上级 473fcb41
...@@ -53,6 +53,8 @@ Change Log ...@@ -53,6 +53,8 @@ Change Log
</li><li>Add sufficient ClientInfo support to our javax.sql.Connection implementation to make WebSphere happy. </li><li>Add sufficient ClientInfo support to our javax.sql.Connection implementation to make WebSphere happy.
</li><li>Issue 482: class LobStorageBackend$LobInputStream does not override the method InputStream.available(). </li><li>Issue 482: class LobStorageBackend$LobInputStream does not override the method InputStream.available().
</li><li>Fix corruption resulting from a mix of the "WRITE_DELAY=0" option and "SELECT DISTINCT" queries </li><li>Fix corruption resulting from a mix of the "WRITE_DELAY=0" option and "SELECT DISTINCT" queries
</li><li>Fix the combination of updating a table which contains an LOB, and reading from the LOB at the same time.
Previously it would throw an exception, now it works.
</li></ul> </li></ul>
<h2>Version 1.3.172 (2013-05-25)</h2> <h2>Version 1.3.172 (2013-05-25)</h2>
......
...@@ -185,24 +185,21 @@ public class LobStorageBackend implements LobStorageInterface { ...@@ -185,24 +185,21 @@ public class LobStorageBackend implements LobStorageInterface {
/** /**
* Read a block of data from the given LOB. * Read a block of data from the given LOB.
* *
* @param lob the lob id * @param block the block number
* @param seq the block sequence number
* @return the block (expanded if stored compressed) * @return the block (expanded if stored compressed)
*/ */
byte[] readBlock(long lob, int seq) throws SQLException { byte[] readBlock(long block) throws SQLException {
// we have to take the lock on the session // we have to take the lock on the session
// before the lock on the database to prevent ABBA deadlocks // before the lock on the database to prevent ABBA deadlocks
synchronized (conn.getSession()) { synchronized (conn.getSession()) {
synchronized (database) { synchronized (database) {
String sql = "SELECT COMPRESSED, DATA FROM " + LOB_MAP + " M " + String sql = "SELECT COMPRESSED, DATA FROM " + LOB_DATA + " WHERE BLOCK = ?";
"INNER JOIN " + LOB_DATA + " D ON M.BLOCK = D.BLOCK " +
"WHERE M.LOB = ? AND M.SEQ = ?";
PreparedStatement prep = prepare(sql); PreparedStatement prep = prepare(sql);
prep.setLong(1, lob); prep.setLong(1, block);
prep.setInt(2, seq);
ResultSet rs = prep.executeQuery(); ResultSet rs = prep.executeQuery();
if (!rs.next()) { if (!rs.next()) {
throw DbException.get(ErrorCode.IO_EXCEPTION_1, "Missing lob entry: "+ lob + "/" + seq).getSQLException(); throw DbException.get(ErrorCode.IO_EXCEPTION_1, "Missing lob entry, block: " + block)
.getSQLException();
} }
int compressed = rs.getInt(1); int compressed = rs.getInt(1);
byte[] buffer = rs.getBytes(2); byte[] buffer = rs.getBytes(2);
...@@ -316,28 +313,13 @@ public class LobStorageBackend implements LobStorageInterface { ...@@ -316,28 +313,13 @@ public class LobStorageBackend implements LobStorageInterface {
@Override @Override
public InputStream getInputStream(long lobId, byte[] hmac, long byteCount) throws IOException { public InputStream getInputStream(long lobId, byte[] hmac, long byteCount) throws IOException {
init();
if (byteCount == -1) {
synchronized (conn.getSession()) {
synchronized (database) {
try { try {
String sql = "SELECT BYTE_COUNT FROM " + LOBS + " WHERE ID = ?"; init();
PreparedStatement prep = prepare(sql); return new LobInputStream(lobId, byteCount);
prep.setLong(1, lobId);
ResultSet rs = prep.executeQuery();
if (!rs.next()) {
throw DbException.get(ErrorCode.IO_EXCEPTION_1, "Missing lob: " + lobId).getSQLException();
}
byteCount = rs.getLong(1);
reuse(sql, prep);
} catch (SQLException e) { } catch (SQLException e) {
throw DbException.convertToIOException(e); throw DbException.convertToIOException(e);
} }
} }
}
}
return new LobInputStream(lobId, byteCount);
}
private ValueLobDb addLob(InputStream in, long maxLength, int type) { private ValueLobDb addLob(InputStream in, long maxLength, int type) {
try { try {
...@@ -585,9 +567,15 @@ public class LobStorageBackend implements LobStorageInterface { ...@@ -585,9 +567,15 @@ public class LobStorageBackend implements LobStorageInterface {
public class LobInputStream extends InputStream { public class LobInputStream extends InputStream {
/** /**
* The size of the lob. * Data from the LOB_MAP table. We cache this to prevent other updates
* to the table that contains the LOB column from changing the data under us.
*/ */
private final long length; private final long[] lobMapBlocks;
/**
* index into the lobMapBlocks array.
*/
private int lobMapIndex = 0;
/** /**
* The remaining bytes in the lob. * The remaining bytes in the lob.
...@@ -602,22 +590,52 @@ public class LobStorageBackend implements LobStorageInterface { ...@@ -602,22 +590,52 @@ public class LobStorageBackend implements LobStorageInterface {
/** /**
* The position within the buffer. * The position within the buffer.
*/ */
private int pos; private int bufferPos;
/**
* The lob id.
*/
private final long lob;
/** public LobInputStream(long lobId, long byteCount) throws SQLException {
* The lob sequence id.
*/
private int seq;
public LobInputStream(long lob, long byteCount) { // we have to take the lock on the session
this.lob = lob; // before the lock on the database to prevent ABBA deadlocks
synchronized (conn.getSession()) {
synchronized (database) {
if (byteCount == -1) {
String sql = "SELECT BYTE_COUNT FROM " + LOBS + " WHERE ID = ?";
PreparedStatement prep = prepare(sql);
prep.setLong(1, lobId);
ResultSet rs = prep.executeQuery();
if (!rs.next()) {
throw DbException.get(ErrorCode.IO_EXCEPTION_1, "Missing lob entry: " + lobId).getSQLException();
}
byteCount = rs.getLong(1);
reuse(sql, prep);
}
this.remainingBytes = byteCount; this.remainingBytes = byteCount;
this.length = byteCount;
String sql = "SELECT COUNT(*) FROM " + LOB_MAP + " WHERE LOB = ?";
PreparedStatement prep = prepare(sql);
prep.setLong(1, lobId);
ResultSet rs = prep.executeQuery();
if (!rs.next()) {
throw DbException.get(ErrorCode.IO_EXCEPTION_1, "Missing lob entry: " + lobId).getSQLException();
}
int lobMapCount = rs.getInt(1);
reuse(sql, prep);
this.lobMapBlocks = new long[lobMapCount];
sql = "SELECT BLOCK FROM " + LOB_MAP + " WHERE LOB = ? ORDER BY SEQ";
prep = prepare(sql);
prep.setLong(1, lobId);
rs = prep.executeQuery();
int i = 0;
while (rs.next()) {
this.lobMapBlocks[i] = rs.getLong(1);
i++;
}
reuse(sql, prep);
}
}
} }
@Override @Override
...@@ -627,29 +645,23 @@ public class LobStorageBackend implements LobStorageInterface { ...@@ -627,29 +645,23 @@ public class LobStorageBackend implements LobStorageInterface {
return -1; return -1;
} }
remainingBytes--; remainingBytes--;
return buffer[pos++] & 255; return buffer[bufferPos++] & 255;
} }
@Override @Override
public long skip(long n) throws IOException { public long skip(long n) throws IOException {
if (n <= 0) {
return 0;
}
long remaining = n; long remaining = n;
remaining -= skipSmall(remaining); remaining -= skipSmall(remaining);
if (remaining > BLOCK_LENGTH) { if (remaining > BLOCK_LENGTH) {
long toPos = length - remainingBytes + remaining; while (remaining > BLOCK_LENGTH) {
try { remaining -= BLOCK_LENGTH;
long[] seqPos = skipBuffer(lob, toPos); remainingBytes -= BLOCK_LENGTH;
if (seqPos == null) { lobMapIndex++;
remaining -= super.skip(remaining);
return n - remaining;
}
seq = (int) seqPos[0];
long p = seqPos[1];
remainingBytes = length - p;
remaining = toPos - p;
} catch (SQLException e) {
throw DbException.convertToIOException(e);
} }
pos = 0; bufferPos = 0;
buffer = null; buffer = null;
} }
fillBuffer(); fillBuffer();
...@@ -659,9 +671,9 @@ public class LobStorageBackend implements LobStorageInterface { ...@@ -659,9 +671,9 @@ public class LobStorageBackend implements LobStorageInterface {
} }
private int skipSmall(long n) { private int skipSmall(long n) {
if (n > 0 && buffer != null && pos < buffer.length) { if (buffer != null && bufferPos < buffer.length) {
int x = MathUtils.convertLongToInt(Math.min(n, buffer.length - pos)); int x = MathUtils.convertLongToInt(Math.min(n, buffer.length - bufferPos));
pos += x; bufferPos += x;
remainingBytes -= x; remainingBytes -= x;
return x; return x;
} }
...@@ -694,9 +706,9 @@ public class LobStorageBackend implements LobStorageInterface { ...@@ -694,9 +706,9 @@ public class LobStorageBackend implements LobStorageInterface {
break; break;
} }
int len = (int) Math.min(length, remainingBytes); int len = (int) Math.min(length, remainingBytes);
len = Math.min(len, buffer.length - pos); len = Math.min(len, buffer.length - bufferPos);
System.arraycopy(buffer, pos, buff, off, len); System.arraycopy(buffer, bufferPos, buff, off, len);
pos += len; bufferPos += len;
read += len; read += len;
remainingBytes -= len; remainingBytes -= len;
off += len; off += len;
...@@ -706,15 +718,16 @@ public class LobStorageBackend implements LobStorageInterface { ...@@ -706,15 +718,16 @@ public class LobStorageBackend implements LobStorageInterface {
} }
private void fillBuffer() throws IOException { private void fillBuffer() throws IOException {
if (buffer != null && pos < buffer.length) { if (buffer != null && bufferPos < buffer.length) {
return; return;
} }
if (remainingBytes <= 0) { if (remainingBytes <= 0) {
return; return;
} }
try { try {
buffer = readBlock(lob, seq++); buffer = readBlock(lobMapBlocks[lobMapIndex]);
pos = 0; lobMapIndex++;
bufferPos = 0;
} catch (SQLException e) { } catch (SQLException e) {
throw DbException.convertToIOException(e); throw DbException.convertToIOException(e);
} }
......
...@@ -80,6 +80,7 @@ public class TestLob extends TestBase { ...@@ -80,6 +80,7 @@ public class TestLob extends TestBase {
testTempFilesDeleted(false); testTempFilesDeleted(false);
testAddLobRestart(); testAddLobRestart();
testLobServerMemory(); testLobServerMemory();
testUpdatingLobRow();
if (config.memory) { if (config.memory) {
return; return;
} }
...@@ -1447,4 +1448,24 @@ public class TestLob extends TestBase { ...@@ -1447,4 +1448,24 @@ public class TestLob extends TestBase {
return new ByteArrayInputStream(buff); return new ByteArrayInputStream(buff);
} }
} /** test the combination of updating a table which contains an LOB, and reading from the LOB at the same time */
private void testUpdatingLobRow() throws Exception {
deleteDb("lob");
Connection conn = getConnection("lob");
Statement stat = conn.createStatement();
stat.execute("create table test(id int primary key, name clob, counter int)");
stat.execute("insert into test(id, name) select x, space(100000) from system_range(1, 3)");
ResultSet rs = stat.executeQuery("select name from test where id = 1");
rs.next();
Reader r = rs.getClob("name").getCharacterStream();
Random random = new Random();
char[] tmp = new char[256];
while ( r.read(tmp) > 0) {
stat.execute("update test set counter = " + random.nextInt(1000) + " where id = 1");
}
r.close();
conn.close();
}
}
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论