提交 7b675bd8 authored 作者: Thomas Mueller's avatar Thomas Mueller

BLOB: InputStream.skip is now more efficient in embedded mode.

上级 fd3ad0ae
......@@ -23,6 +23,7 @@ import org.h2.engine.Constants;
import org.h2.message.DbException;
import org.h2.tools.CompressTool;
import org.h2.util.IOUtils;
import org.h2.util.MathUtils;
import org.h2.util.New;
import org.h2.util.StringUtils;
import org.h2.util.Utils;
......@@ -96,7 +97,8 @@ public class LobStorage {
// stat.execute("SET REDO_LOG_BINARY 0");
stat.execute("CREATE TABLE IF NOT EXISTS " + LOBS + "(ID BIGINT PRIMARY KEY, BYTE_COUNT BIGINT, TABLE INT) HIDDEN");
stat.execute("CREATE INDEX IF NOT EXISTS INFORMATION_SCHEMA.INDEX_LOB_TABLE ON " + LOBS + "(TABLE)");
stat.execute("CREATE TABLE IF NOT EXISTS " + LOB_MAP + "(LOB BIGINT, SEQ INT, HASH INT, BLOCK BIGINT, PRIMARY KEY(LOB, SEQ)) HIDDEN");
stat.execute("CREATE TABLE IF NOT EXISTS " + LOB_MAP + "(LOB BIGINT, SEQ INT, OFFSET BIGINT, HASH INT, BLOCK BIGINT, PRIMARY KEY(LOB, SEQ)) HIDDEN");
stat.execute("ALTER TABLE " + LOB_MAP + " ADD IF NOT EXISTS OFFSET BIGINT BEFORE HASH");
stat.execute("CREATE INDEX IF NOT EXISTS INFORMATION_SCHEMA.INDEX_LOB_MAP_DATA_LOB ON " + LOB_MAP + "(BLOCK, LOB)");
stat.execute("CREATE TABLE IF NOT EXISTS " + LOB_DATA + "(BLOCK BIGINT PRIMARY KEY, COMPRESSED INT, DATA BINARY) HIDDEN");
ResultSet rs;
......@@ -204,12 +206,42 @@ public class LobStorage {
}
}
/**
* Retrieve the sequence id and offset that is smaller than the requested
* offset. Those values can be used to quickly skip to a given position
* without having to read all data.
*
* @param lob the lob
* @param offset the required offset
* @return null if the data is not available, or an array of two elements:
* the sequence, and the offset
*/
long[] skipBuffer(long lob, long offset) throws SQLException {
synchronized (handler) {
String sql = "SELECT MAX(SEQ), MAX(OFFSET) FROM " + LOB_MAP +
" WHERE LOB = ? AND OFFSET < ?";
PreparedStatement prep = prepare(sql);
prep.setLong(1, lob);
prep.setLong(2, offset);
ResultSet rs = prep.executeQuery();
rs.next();
int seq = rs.getInt(1);
offset = rs.getLong(2);
boolean wasNull = rs.wasNull();
rs.close();
reuse(sql, prep);
// upgraded: offset not set
return wasNull ? null : new long[]{seq, offset};
}
}
/**
* An input stream that reads from a LOB.
*/
public class LobInputStream extends InputStream {
private byte[] buffer;
private long length;
private int pos;
private long remainingBytes;
private long lob;
......@@ -218,6 +250,7 @@ public class LobStorage {
public LobInputStream(long lob, long byteCount) {
this.lob = lob;
remainingBytes = byteCount;
this.length = byteCount;
}
public int read() throws IOException {
......@@ -229,6 +262,55 @@ public class LobStorage {
return buffer[pos++] & 255;
}
public long skip(long n) throws IOException {
n -= skipSmall(n);
if (n > BLOCK_LENGTH) {
long toPos = length - remainingBytes + n;
try {
long[] seqOffset = skipBuffer(lob, toPos);
if (seqOffset == null) {
return super.skip(n);
}
seq = (int) seqOffset[0];
n = toPos - seqOffset[1];
} catch (SQLException e) {
throw DbException.convertToIOException(e);
}
pos = 0;
buffer = null;
}
fillBuffer();
n -= skipSmall(n);
return super.skip(n);
}
// public long skip(long n) throws IOException {
// n -= skipSmall(n);
// while (n > BLOCK_LENGTH) {
// try {
// n -= skipBuffer(lob, seq++);
// pos = 0;
// buffer = null;
// } catch (SQLException e) {
// throw DbException.convertToIOException(e);
// }
// }
// fillBuffer();
// n -= skipSmall(n);
// return super.skip(n);
// }
//
private int skipSmall(long n) {
if (n > 0 && buffer != null && pos < buffer.length) {
int x = MathUtils.convertLongToInt(Math.min(n, buffer.length - pos));
n -= x;
pos += x;
remainingBytes -= x;
return x;
}
return 0;
}
public int read(byte[] buff) throws IOException {
return readFully(buff, 0, buff.length);
}
......@@ -396,7 +478,6 @@ public class LobStorage {
if (len <= 0) {
break;
}
length += len;
maxLength -= len;
byte[] b;
if (len != buff.length) {
......@@ -413,8 +494,9 @@ public class LobStorage {
if (seq == 0) {
lobId = getNextLobId();
}
storeBlock(lobId, seq, b, compressAlgorithm);
storeBlock(lobId, seq, length, b, compressAlgorithm);
}
length += len;
}
if (lobId == -1 && small == null) {
// zero length
......@@ -469,8 +551,10 @@ public class LobStorage {
try {
init();
long lobId = getNextLobId();
String sql = "INSERT INTO " + LOB_MAP + "(LOB, SEQ, HASH, BLOCK) " +
"SELECT ?, SEQ, HASH, BLOCK FROM " + LOB_MAP + " WHERE LOB = ?";
String sql = "INSERT INTO " + LOB_MAP + "(LOB, SEQ, OFFSET, HASH, BLOCK) " +
"SELECT ?, SEQ, OFFSET, HASH, BLOCK FROM " + LOB_MAP + " WHERE LOB = ?";
// String sql = "INSERT INTO " + LOB_MAP + "(LOB, SEQ, LEN, HASH, BLOCK) " +
// "SELECT ?, SEQ, LEN, HASH, BLOCK FROM " + LOB_MAP + " WHERE LOB = ?";
PreparedStatement prep = prepare(sql);
prep.setLong(1, lobId);
prep.setLong(2, oldLobId);
......@@ -518,11 +602,13 @@ public class LobStorage {
*
* @param lobId the lob id
* @param seq the sequence number
* @param offset the offset within the lob
* @param b the data
* @param compressAlgorithm the compression algorithm (may be null)
*/
void storeBlock(long lobId, int seq, byte[] b, String compressAlgorithm) throws SQLException {
void storeBlock(long lobId, int seq, long offset, byte[] b, String compressAlgorithm) throws SQLException {
long block;
// int len = b.length;
boolean blockExists = false;
if (compressAlgorithm != null) {
b = compress.compress(b, compressAlgorithm);
......@@ -556,12 +642,15 @@ public class LobStorage {
prep.execute();
reuse(sql, prep);
}
String sql = "INSERT INTO " + LOB_MAP + "(LOB, SEQ, HASH, BLOCK) VALUES(?, ?, ?, ?)";
String sql = "INSERT INTO " + LOB_MAP + "(LOB, SEQ, OFFSET, HASH, BLOCK) VALUES(?, ?, ?, ?, ?)";
// String sql = "INSERT INTO " + LOB_MAP + "(LOB, SEQ, LEN, HASH, BLOCK) VALUES(?, ?, ?, ?, ?)";
PreparedStatement prep = prepare(sql);
prep.setLong(1, lobId);
prep.setInt(2, seq);
prep.setLong(3, hash);
prep.setLong(4, block);
// prep.setInt(3, len);
prep.setLong(3, offset);
prep.setLong(4, hash);
prep.setLong(5, block);
prep.execute();
reuse(sql, prep);
}
......
......@@ -47,13 +47,14 @@ public class TestLob extends TestBase {
* @param a ignored
*/
public static void main(String... a) throws Exception {
System.setProperty("h2.lobInDatabase", "true");
TestBase test = TestBase.createCaller().init();
// test.config.big = true;
test.test();
}
public void test() throws Exception {
testBlobInputStreamSeek(true);
testBlobInputStreamSeek(false);
testDeadlock();
testCopyManyLobs();
testCopyLob();
......@@ -96,6 +97,46 @@ public class TestLob extends TestBase {
IOUtils.deleteRecursive(TEMP_DIR, true);
}
private void testBlobInputStreamSeek(boolean upgraded) throws Exception {
deleteDb("lob");
Connection conn;
conn = getConnection("lob");
Statement stat = conn.createStatement();
stat.execute("create table test(id int primary key, data blob)");
PreparedStatement prep;
Random random = new Random();
byte[] buff = new byte[3000000];
for (int i = 0; i < 10; i++) {
prep = conn.prepareStatement("insert into test values(?, ?)");
prep.setInt(1, i);
random.setSeed(i);
random.nextBytes(buff);
prep.setBinaryStream(2, new ByteArrayInputStream(buff));
prep.execute();
}
if (upgraded) {
stat.execute("alter table information_schema.lob_map drop column offset");
conn.close();
conn = getConnection("lob");
}
prep = conn.prepareStatement("select * from test where id = ?");
for (int i = 0; i < 1; i++) {
random.setSeed(i);
random.nextBytes(buff);
for (int j = 0; j < buff.length; j += 4096) {
prep.setInt(1, i);
ResultSet rs = prep.executeQuery();
rs.next();
InputStream in = rs.getBinaryStream(2);
in.skip(j);
int t = in.read();
assertEquals(t, buff[j] & 0xff);
}
}
conn.close();
conn.close();
}
/**
* Test for issue 315: Java Level Deadlock on Database & Session Objects
*/
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论