提交 181401fb authored 作者: Thomas Mueller's avatar Thomas Mueller

New LOB storage.

上级 dd4594d2
......@@ -18,11 +18,15 @@ import java.util.Arrays;
import java.util.HashMap;
import org.h2.constant.ErrorCode;
import org.h2.constant.SysProperties;
import org.h2.engine.Constants;
import org.h2.message.DbException;
import org.h2.util.IOUtils;
import org.h2.util.New;
import org.h2.value.ValueLob;
/**
* This class stores LOB objects in the database.
*/
public class LobStorage {
/**
......@@ -30,8 +34,6 @@ public class LobStorage {
*/
public static final int TABLE_ID_SESSION_VARIABLE = -1;
// TODO test recovery
private static final String LOBS = "INFORMATION_SCHEMA.LOBS";
private static final String LOB_MAP = "INFORMATION_SCHEMA.LOB_MAP";
private static final String LOB_DATA = "INFORMATION_SCHEMA.LOB_DATA";
......@@ -44,6 +46,37 @@ public class LobStorage {
private long nextLob;
private long nextBlock;
public LobStorage(Connection newConn) {
try {
this.conn = newConn;
Statement stat = conn.createStatement();
// stat.execute("SET UNDO_LOG 0");
// stat.execute("SET REDO_LOG_BINARY 0");
stat.execute("CREATE TABLE IF NOT EXISTS " + LOBS + "(ID BIGINT PRIMARY KEY, LENGTH BIGINT, TABLE INT)");
stat.execute("CREATE TABLE IF NOT EXISTS " + LOB_MAP + "(LOB BIGINT, SEQ INT, BLOCK BIGINT, PRIMARY KEY(LOB, SEQ))");
stat.execute("CREATE INDEX INFORMATION_SCHEMA.INDEX_LOB_MAP_DATA_LOB ON " + LOB_MAP + "(BLOCK, LOB)");
stat.execute("CREATE TABLE IF NOT EXISTS " + LOB_DATA + "(BLOCK BIGINT PRIMARY KEY, DATA BINARY)");
ResultSet rs;
rs = stat.executeQuery("SELECT MAX(BLOCK) FROM " + LOB_DATA);
rs.next();
nextBlock = rs.getLong(1) + 1;
if (HASH) {
nextBlock = Math.max(UNIQUE + 1, nextLob);
}
rs = stat.executeQuery("SELECT MAX(ID) FROM " + LOBS);
rs.next();
nextLob = rs.getLong(1) + 1;
} catch (SQLException e) {
throw DbException.convert(e);
}
}
/**
* Remove all LOBs for this table.
*
* @param handler the data handler
* @param tableId the table id
*/
public static void removeAllForTable(DataHandler handler, int tableId) {
if (SysProperties.LOB_IN_DATABASE) {
// remove both lobs in the database as well as in the file system
......@@ -51,6 +84,13 @@ public class LobStorage {
ValueLob.removeAllForTable(handler, tableId);
}
/**
* Create a LOB object that fits in memory.
*
* @param type the value type
* @param small the byte array
* @return the LOB
*/
public static ValueLob createSmallLob(int type, byte[] small) {
if (SysProperties.LOB_IN_DATABASE) {
return null;
......@@ -58,18 +98,35 @@ public class LobStorage {
return ValueLob.createSmallLob(type, small);
}
public static ValueLob createBlob(InputStream in, long length, DataHandler handler) {
/**
* Create a BLOB object.
*
* @param in the input stream
* @param maxLength the maximum length (-1 if not known)
* @param handler the data handler
* @return the LOB
*/
public static ValueLob createBlob(InputStream in, long maxLength, DataHandler handler) {
if (SysProperties.LOB_IN_DATABASE) {
return null;
handler.getLobStorage().addLob(in, maxLength, LobStorage.TABLE_ID_SESSION_VARIABLE);
}
return ValueLob.createBlob(in, length, handler);
return ValueLob.createBlob(in, maxLength, handler);
}
public static ValueLob createClob(Reader in, long length, DataHandler handler) {
/**
* Create a CLOB object.
*
* @param reader the reader
* @param maxLength the maximum length (-1 if not known)
* @param handler the data handler
* @return the LOB
*/
public static ValueLob createClob(Reader reader, long maxLength, DataHandler handler) {
if (SysProperties.LOB_IN_DATABASE) {
return null;
CountingReaderInputStream in = new CountingReaderInputStream(reader);
handler.getLobStorage().addLob(in, maxLength, LobStorage.TABLE_ID_SESSION_VARIABLE);
}
return ValueLob.createClob(in, length, handler);
return ValueLob.createClob(reader, maxLength, handler);
}
/**
......@@ -169,32 +226,7 @@ public class LobStorage {
}
public LobStorage(Connection newConn) {
try {
this.conn = newConn;
Statement stat = conn.createStatement();
// stat.execute("SET UNDO_LOG 0");
// stat.execute("SET REDO_LOG_BINARY 0");
stat.execute("CREATE TABLE IF NOT EXISTS " + LOBS + "(ID BIGINT PRIMARY KEY, LENGTH BIGINT, TABLE INT)");
stat.execute("CREATE TABLE IF NOT EXISTS " + LOB_MAP + "(LOB BIGINT, SEQ INT, BLOCK BIGINT, PRIMARY KEY(LOB, SEQ))");
stat.execute("CREATE INDEX INFORMATION_SCHEMA.INDEX_LOB_MAP_DATA_LOB ON " + LOB_MAP + "(BLOCK, LOB)");
stat.execute("CREATE TABLE IF NOT EXISTS " + LOB_DATA + "(BLOCK BIGINT PRIMARY KEY, DATA BINARY)");
ResultSet rs;
rs = stat.executeQuery("SELECT MAX(BLOCK) FROM " + LOB_DATA);
rs.next();
nextBlock = rs.getLong(1) + 1;
if (HASH) {
nextBlock = Math.max(UNIQUE + 1, nextLob);
}
rs = stat.executeQuery("SELECT MAX(ID) FROM " + LOBS);
rs.next();
nextLob = rs.getLong(1) + 1;
} catch (SQLException e) {
throw DbException.convert(e);
}
}
protected synchronized PreparedStatement prepare(String sql) throws SQLException {
private synchronized PreparedStatement prepare(String sql) throws SQLException {
PreparedStatement prep = prepared.get(sql);
if (prep == null) {
prep = conn.prepareStatement(sql);
......@@ -225,7 +257,15 @@ public class LobStorage {
prep.execute();
}
public long addLob(InputStream in, long maxLength, int table) throws SQLException {
/**
* Store the LOB in the database.
*
* @param in the input stream
* @param maxLength the maximum length (-1 for unknown)
* @param table the table
* @return the LOB id
*/
public long addLob(InputStream in, long maxLength, int table) {
byte[] buff = new byte[BLOCK_LENGTH];
if (maxLength < 0) {
maxLength = Long.MAX_VALUE;
......@@ -233,71 +273,90 @@ public class LobStorage {
long length = 0;
long lob = nextLob++;
try {
for (int seq = 0; maxLength > 0; seq++) {
int len = IOUtils.readFully(in, buff, 0, BLOCK_LENGTH);
if (len <= 0) {
break;
}
length += len;
maxLength -= len;
try {
for (int seq = 0; maxLength > 0; seq++) {
int len = IOUtils.readFully(in, buff, 0, BLOCK_LENGTH);
if (len <= 0) {
break;
}
length += len;
maxLength -= len;
byte[] b;
if (len != buff.length) {
b = new byte[len];
System.arraycopy(buff, 0, b, 0, len);
} else {
b = buff;
}
long block;
boolean blockExists = false;
if (HASH) {
block = Arrays.hashCode(b) & UNIQUE;
int todoSynchronize;
PreparedStatement prep = prepare(
"SELECT DATA FROM " + LOB_DATA +
" WHERE BLOCK = ?");
prep.setLong(1, block);
ResultSet rs = prep.executeQuery();
if (rs.next()) {
byte[] compare = rs.getBytes(1);
if (Arrays.equals(b, compare)) {
blockExists = true;
} else {
block = nextBlock++;
byte[] b;
if (len != buff.length) {
b = new byte[len];
System.arraycopy(buff, 0, b, 0, len);
} else {
b = buff;
}
long block;
boolean blockExists = false;
if (HASH) {
block = Arrays.hashCode(b) & UNIQUE;
int todoSynchronize;
PreparedStatement prep = prepare(
"SELECT DATA FROM " + LOB_DATA +
" WHERE BLOCK = ?");
prep.setLong(1, block);
ResultSet rs = prep.executeQuery();
if (rs.next()) {
byte[] compare = rs.getBytes(1);
if (Arrays.equals(b, compare)) {
blockExists = true;
} else {
block = nextBlock++;
}
}
} else {
block = nextBlock++;
}
if (!blockExists) {
PreparedStatement prep = prepare(
"INSERT INTO " + LOB_DATA + "(BLOCK, DATA) VALUES(?, ?)");
prep.setLong(1, block);
prep.setBytes(2, b);
prep.execute();
}
} else {
block = nextBlock++;
}
if (!blockExists) {
PreparedStatement prep = prepare(
"INSERT INTO " + LOB_DATA + "(BLOCK, DATA) VALUES(?, ?)");
prep.setLong(1, block);
prep.setBytes(2, b);
"INSERT INTO " + LOB_MAP + "(LOB, SEQ, BLOCK) VALUES(?, ?, ?)");
prep.setLong(1, lob);
prep.setInt(2, seq);
prep.setLong(3, block);
prep.execute();
}
PreparedStatement prep = prepare(
"INSERT INTO " + LOB_MAP + "(LOB, SEQ, BLOCK) VALUES(?, ?, ?)");
"INSERT INTO " + LOBS + "(ID, LENGTH, TABLE) VALUES(?, ?, ?)");
prep.setLong(1, lob);
prep.setInt(2, seq);
prep.setLong(3, block);
prep.setLong(2, length);
prep.setInt(3, table);
prep.execute();
return lob;
} catch (IOException e) {
deleteLob(lob);
throw DbException.convertIOException(e, "adding blob");
}
PreparedStatement prep = prepare(
"INSERT INTO " + LOBS + "(ID, LENGTH, TABLE) VALUES(?, ?, ?)");
prep.setLong(1, lob);
prep.setLong(2, length);
prep.setInt(3, table);
prep.execute();
return lob;
} catch (IOException e) {
deleteLob(lob);
throw DbException.convertIOException(e, "adding blob");
} catch (SQLException e) {
throw DbException.convert(e);
}
}
public InputStream getInputStream(long id) throws IOException {
return new LobInputStream(conn, id);
/**
* An input stream that reads the data from a reader.
*/
static class CountingReaderInputStream extends InputStream {
private final Reader reader;
private long length;
private char[] buffer = new char[Constants.IO_BUFFER_SIZE];
CountingReaderInputStream(Reader reader) {
this.reader = reader;
}
public int read() throws IOException {
return 0;
}
}
}
......@@ -1077,6 +1077,8 @@ public class Recover extends Tool implements DataHandler {
}
writer.println("DROP ALIAS READ_CLOB;");
writer.println("DROP ALIAS READ_BLOB;");
writer.println("DROP ALIAS READ_CLOB_DB;");
writer.println("DROP ALIAS READ_BLOB_DB;");
for (MetaRecord m : schema) {
String sql = m.getSQL();
// everything except create
......
......@@ -94,6 +94,7 @@ public class Build extends BuildBase {
* Run the Emma code coverage.
*/
public void coverage() {
downloadTest();
download("ext/emma-2.0.5312.jar",
"http://repo2.maven.org/maven2/emma/emma/2.0.5312/emma-2.0.5312.jar",
"30a40933caf67d88d9e75957950ccf353b181ab7");
......@@ -225,6 +226,13 @@ public class Build extends BuildBase {
"3006beb1ca6a83449def6127dad3c060148a0209");
}
private void downloadTest() {
// for TestOldVersion
download("ext/h2-1.2.127.jar",
"http://repo1.maven.org/maven2/com/h2database/h2/1.2.127/h2-1.2.127.jar",
"056e784c7cf009483366ab9cd8d21d02fe47031a");
}
private String getVersion() {
return getStaticValue("org.h2.engine.Constants", "getVersion");
}
......@@ -532,10 +540,7 @@ public class Build extends BuildBase {
* Compile and run all tests.
*/
public void test() {
// for TestOldVersion
download("ext/h2-1.2.127.jar",
"http://repo1.maven.org/maven2/com/h2database/h2/1.2.127/h2-1.2.127.jar",
"056e784c7cf009483366ab9cd8d21d02fe47031a");
downloadTest();
compile();
java("org.h2.test.TestAll", null);
}
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论