提交 d2b7419c authored 作者: Thomas Mueller's avatar Thomas Mueller

In the server mode, BLOB and CLOB objects are no longer closed when the result…

In the server mode, BLOB and CLOB objects are no longer closed when the result set is closed (as required by the JDBC spec).
上级 6472e58b
...@@ -18,7 +18,8 @@ Change Log ...@@ -18,7 +18,8 @@ Change Log
<h1>Change Log</h1> <h1>Change Log</h1>
<h2>Next Version (unreleased)</h2> <h2>Next Version (unreleased)</h2>
<ul><li>- <ul><li>In the server mode, BLOB and CLOB objects are no longer closed when the result set is closed
(as required by the JDBC spec).
</li></ul> </li></ul>
<h2>Version 1.3.162 (2011-11-26)</h2> <h2>Version 1.3.162 (2011-11-26)</h2>
......
...@@ -233,4 +233,8 @@ abstract class ScriptBase extends Prepared implements DataHandler { ...@@ -233,4 +233,8 @@ abstract class ScriptBase extends Prepared implements DataHandler {
return null; return null;
} }
public int readLob(long lobId, long offset, byte[] buff, int off, int length) {
throw DbException.throwInternalError();
}
} }
...@@ -190,11 +190,11 @@ public class SysProperties { ...@@ -190,11 +190,11 @@ public class SysProperties {
public static final boolean LOB_IN_DATABASE = Utils.getProperty("h2.lobInDatabase", Constants.VERSION_MINOR >= 3); public static final boolean LOB_IN_DATABASE = Utils.getProperty("h2.lobInDatabase", Constants.VERSION_MINOR >= 3);
/** /**
* System property <code>h2.lobClientMaxSizeMemory</code> (default: 65536).<br /> * System property <code>h2.lobClientMaxSizeMemory</code> (default: 1048576).<br />
* The maximum size of a LOB object to keep in memory on the client side * The maximum size of a LOB object to keep in memory on the client side
* when using the server mode. * when using the server mode.
*/ */
public static final int LOB_CLIENT_MAX_SIZE_MEMORY = Utils.getProperty("h2.lobClientMaxSizeMemory", 65536); public static final int LOB_CLIENT_MAX_SIZE_MEMORY = Utils.getProperty("h2.lobClientMaxSizeMemory", 1024 * 1024);
/** /**
* System property <code>h2.maxFileRetry</code> (default: 16).<br /> * System property <code>h2.maxFileRetry</code> (default: 16).<br />
......
...@@ -66,6 +66,11 @@ public class Constants { ...@@ -66,6 +66,11 @@ public class Constants {
*/ */
public static final int TCP_PROTOCOL_VERSION_10 = 10; public static final int TCP_PROTOCOL_VERSION_10 = 10;
/**
* The TCP protocol version number 11.
*/
public static final int TCP_PROTOCOL_VERSION_11 = 11;
/** /**
* The major version of this database. * The major version of this database.
*/ */
......
...@@ -2405,4 +2405,8 @@ public class Database implements DataHandler { ...@@ -2405,4 +2405,8 @@ public class Database implements DataHandler {
return false; return false;
} }
public int readLob(long lobId, long offset, byte[] buff, int off, int length) {
throw DbException.throwInternalError();
}
} }
...@@ -56,6 +56,7 @@ public class SessionRemote extends SessionWithState implements DataHandler { ...@@ -56,6 +56,7 @@ public class SessionRemote extends SessionWithState implements DataHandler {
public static final int SESSION_CHECK_KEY = 14; public static final int SESSION_CHECK_KEY = 14;
public static final int SESSION_SET_AUTOCOMMIT = 15; public static final int SESSION_SET_AUTOCOMMIT = 15;
public static final int SESSION_UNDO_LOG_POS = 16; public static final int SESSION_UNDO_LOG_POS = 16;
public static final int LOB_READ = 17;
public static final int STATUS_ERROR = 0; public static final int STATUS_ERROR = 0;
public static final int STATUS_OK = 1; public static final int STATUS_OK = 1;
...@@ -95,7 +96,7 @@ public class SessionRemote extends SessionWithState implements DataHandler { ...@@ -95,7 +96,7 @@ public class SessionRemote extends SessionWithState implements DataHandler {
trans.setSSL(ci.isSSL()); trans.setSSL(ci.isSSL());
trans.init(); trans.init();
trans.writeInt(Constants.TCP_PROTOCOL_VERSION_6); trans.writeInt(Constants.TCP_PROTOCOL_VERSION_6);
trans.writeInt(Constants.TCP_PROTOCOL_VERSION_10); trans.writeInt(Constants.TCP_PROTOCOL_VERSION_11);
trans.writeString(db); trans.writeString(db);
trans.writeString(ci.getOriginalURL()); trans.writeString(ci.getOriginalURL());
trans.writeString(ci.getUserName()); trans.writeString(ci.getUserName());
...@@ -692,4 +693,27 @@ public class SessionRemote extends SessionWithState implements DataHandler { ...@@ -692,4 +693,27 @@ public class SessionRemote extends SessionWithState implements DataHandler {
return null; return null;
} }
public synchronized int readLob(long lobId, long offset, byte[] buff, int off, int length) {
for (int i = 0, count = 0; i < transferList.size(); i++) {
Transfer transfer = transferList.get(i);
try {
traceOperation("LOB_READ", (int) lobId);
transfer.writeInt(SessionRemote.LOB_READ);
transfer.writeLong(lobId);
transfer.writeLong(offset);
transfer.writeInt(length);
done(transfer);
length = transfer.readInt();
if (length <= 0) {
return length;
}
transfer.readBytes(buff, off, length);
return length;
} catch (IOException e) {
removeServer(e, i--, ++count);
}
}
return 1;
}
} }
...@@ -31,7 +31,6 @@ public class ResultRemote implements ResultInterface { ...@@ -31,7 +31,6 @@ public class ResultRemote implements ResultInterface {
private Value[] currentRow; private Value[] currentRow;
private int rowId, rowCount, rowOffset; private int rowId, rowCount, rowOffset;
private ArrayList<Value[]> result; private ArrayList<Value[]> result;
private ArrayList<Value> lobValues;
private final Trace trace; private final Trace trace;
public ResultRemote(SessionRemote session, Transfer transfer, int id, int columnCount, int fetchSize) public ResultRemote(SessionRemote session, Transfer transfer, int id, int columnCount, int fetchSize)
...@@ -159,16 +158,6 @@ public class ResultRemote implements ResultInterface { ...@@ -159,16 +158,6 @@ public class ResultRemote implements ResultInterface {
} }
public void close() { public void close() {
if (lobValues != null) {
for (Value v : lobValues) {
try {
v.close();
} catch (DbException e) {
trace.error(e, "delete lob {0}", v.getTraceSQL());
}
}
lobValues = null;
}
result = null; result = null;
sendClose(); sendClose();
} }
...@@ -215,12 +204,6 @@ public class ResultRemote implements ResultInterface { ...@@ -215,12 +204,6 @@ public class ResultRemote implements ResultInterface {
for (int i = 0; i < len; i++) { for (int i = 0; i < len; i++) {
Value v = transfer.readValue(); Value v = transfer.readValue();
values[i] = v; values[i] = v;
if (v.isFileBased()) {
if (lobValues == null) {
lobValues = New.arrayList();
}
lobValues.add(v);
}
} }
result.add(values); result.add(values);
} }
......
...@@ -6,7 +6,10 @@ ...@@ -6,7 +6,10 @@
*/ */
package org.h2.server; package org.h2.server;
import java.io.ByteArrayInputStream;
import java.io.FilterInputStream;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream;
import java.io.PrintWriter; import java.io.PrintWriter;
import java.io.StringWriter; import java.io.StringWriter;
import java.net.Socket; import java.net.Socket;
...@@ -27,22 +30,28 @@ import org.h2.jdbc.JdbcSQLException; ...@@ -27,22 +30,28 @@ import org.h2.jdbc.JdbcSQLException;
import org.h2.message.DbException; import org.h2.message.DbException;
import org.h2.result.ResultColumn; import org.h2.result.ResultColumn;
import org.h2.result.ResultInterface; import org.h2.result.ResultInterface;
import org.h2.store.LobStorage;
import org.h2.util.IOUtils;
import org.h2.util.SmallLRUCache;
import org.h2.util.SmallMap; import org.h2.util.SmallMap;
import org.h2.util.StringUtils; import org.h2.util.StringUtils;
import org.h2.value.Transfer; import org.h2.value.Transfer;
import org.h2.value.Value; import org.h2.value.Value;
import org.h2.value.ValueLobDb;
/** /**
* One server thread is opened per client connection. * One server thread is opened per client connection.
*/ */
public class TcpServerThread implements Runnable { public class TcpServerThread implements Runnable {
protected Transfer transfer;
private TcpServer server; private TcpServer server;
private Session session; private Session session;
private boolean stop; private boolean stop;
private Thread thread; private Thread thread;
private Transfer transfer;
private Command commit; private Command commit;
private SmallMap cache = new SmallMap(SysProperties.SERVER_CACHED_OBJECTS); private SmallMap cache = new SmallMap(SysProperties.SERVER_CACHED_OBJECTS);
private SmallLRUCache<Long, CachedInputStream> lobs = SmallLRUCache.newInstance(SysProperties.SERVER_CACHED_OBJECTS);
private int threadId; private int threadId;
private int clientVersion; private int clientVersion;
private String sessionId; private String sessionId;
...@@ -71,12 +80,12 @@ public class TcpServerThread implements Runnable { ...@@ -71,12 +80,12 @@ public class TcpServerThread implements Runnable {
int minClientVersion = transfer.readInt(); int minClientVersion = transfer.readInt();
if (minClientVersion < Constants.TCP_PROTOCOL_VERSION_6) { if (minClientVersion < Constants.TCP_PROTOCOL_VERSION_6) {
throw DbException.get(ErrorCode.DRIVER_VERSION_ERROR_2, "" + clientVersion, "" + Constants.TCP_PROTOCOL_VERSION_6); throw DbException.get(ErrorCode.DRIVER_VERSION_ERROR_2, "" + clientVersion, "" + Constants.TCP_PROTOCOL_VERSION_6);
} else if (minClientVersion > Constants.TCP_PROTOCOL_VERSION_10) { } else if (minClientVersion > Constants.TCP_PROTOCOL_VERSION_11) {
throw DbException.get(ErrorCode.DRIVER_VERSION_ERROR_2, "" + clientVersion, "" + Constants.TCP_PROTOCOL_VERSION_10); throw DbException.get(ErrorCode.DRIVER_VERSION_ERROR_2, "" + clientVersion, "" + Constants.TCP_PROTOCOL_VERSION_11);
} }
int maxClientVersion = transfer.readInt(); int maxClientVersion = transfer.readInt();
if (maxClientVersion >= Constants.TCP_PROTOCOL_VERSION_10) { if (maxClientVersion >= Constants.TCP_PROTOCOL_VERSION_11) {
clientVersion = Constants.TCP_PROTOCOL_VERSION_10; clientVersion = Constants.TCP_PROTOCOL_VERSION_11;
} else { } else {
clientVersion = minClientVersion; clientVersion = minClientVersion;
} }
...@@ -383,6 +392,31 @@ public class TcpServerThread implements Runnable { ...@@ -383,6 +392,31 @@ public class TcpServerThread implements Runnable {
writeInt(session.getUndoLogPos()).flush(); writeInt(session.getUndoLogPos()).flush();
break; break;
} }
case SessionRemote.LOB_READ: {
long lobId = transfer.readLong();
CachedInputStream cin = lobs.get(lobId);
if (cin == null) {
throw DbException.get(ErrorCode.OBJECT_CLOSED);
}
long offset = transfer.readLong();
if (cin.getPos() != offset) {
LobStorage lobStorage = session.getDataHandler().getLobStorage();
InputStream in = lobStorage.getInputStream(lobId, -1);
cin = new CachedInputStream(in);
lobs.put(lobId, cin);
in.skip(offset);
}
int length = transfer.readInt();
// limit the buffer size
length = Math.min(16 * Constants.IO_BUFFER_SIZE, length);
transfer.writeInt(SessionRemote.STATUS_OK);
byte[] buff = new byte[length];
length = IOUtils.readFully(cin, buff, 0, length);
transfer.writeInt(length);
transfer.writeBytes(buff, 0, length);
transfer.flush();
break;
}
default: default:
trace("Unknown operation: " + operation); trace("Unknown operation: " + operation);
closeSession(); closeSession();
...@@ -402,13 +436,26 @@ public class TcpServerThread implements Runnable { ...@@ -402,13 +436,26 @@ public class TcpServerThread implements Runnable {
transfer.writeBoolean(true); transfer.writeBoolean(true);
Value[] v = result.currentRow(); Value[] v = result.currentRow();
for (int i = 0; i < result.getVisibleColumnCount(); i++) { for (int i = 0; i < result.getVisibleColumnCount(); i++) {
transfer.writeValue(v[i]); writeValue(v[i]);
} }
} else { } else {
transfer.writeBoolean(false); transfer.writeBoolean(false);
} }
} }
private void writeValue(Value v) throws IOException {
if (v.getType() == Value.CLOB || v.getType() == Value.BLOB) {
if (v instanceof ValueLobDb) {
ValueLobDb lob = (ValueLobDb) v;
if (lob.isStored()) {
long id = lob.getLobId();
lobs.put(id, new CachedInputStream(null));
}
}
}
transfer.writeValue(v);
}
void setThread(Thread thread) { void setThread(Thread thread) {
this.thread = thread; this.thread = thread;
} }
...@@ -430,4 +477,49 @@ public class TcpServerThread implements Runnable { ...@@ -430,4 +477,49 @@ public class TcpServerThread implements Runnable {
} }
} }
/**
* An input stream with a position.
*/
static class CachedInputStream extends FilterInputStream {
private static final ByteArrayInputStream DUMMY = new ByteArrayInputStream(new byte[0]);
private long pos;
CachedInputStream(InputStream in) {
super(in == null ? DUMMY : in);
if (in == null) {
pos = -1;
}
}
public int read(byte[] buff, int off, int len) throws IOException {
len = super.read(buff, off, len);
if (len > 0) {
pos += len;
}
return len;
}
public int read() throws IOException {
int x = in.read();
if (x >= 0) {
pos++;
}
return x;
}
public long skip(long n) throws IOException {
n = super.skip(n);
if (n > 0) {
pos += n;
}
return n;
}
public long getPos() {
return pos;
}
}
} }
...@@ -793,7 +793,7 @@ public class Data { ...@@ -793,7 +793,7 @@ public class Data {
long lobId = readVarLong(); long lobId = readVarLong();
long precision = readVarLong(); long precision = readVarLong();
LobStorage lobStorage = handler.getLobStorage(); LobStorage lobStorage = handler.getLobStorage();
ValueLobDb lob = ValueLobDb.create(type, lobStorage, null, tableId, lobId, precision); ValueLobDb lob = ValueLobDb.create(type, lobStorage, tableId, lobId, precision);
return lob; return lob;
} else { } else {
int tableId = readVarInt(); int tableId = readVarInt();
......
...@@ -107,4 +107,16 @@ public interface DataHandler { ...@@ -107,4 +107,16 @@ public interface DataHandler {
*/ */
Connection getLobConnection(); Connection getLobConnection();
/**
* Read from a lob.
*
* @param lobId the lob
* @param offset the offset within the lob
* @param buff the target buffer
* @param off the offset within the target buffer
* @param length the number of bytes to read
* @return the number of bytes read
*/
int readLob(long lobId, long offset, byte[] buff, int off, int length);
} }
...@@ -224,7 +224,7 @@ public class FileLock implements Runnable { ...@@ -224,7 +224,7 @@ public class FileLock implements Runnable {
transfer.setSocket(socket); transfer.setSocket(socket);
transfer.init(); transfer.init();
transfer.writeInt(Constants.TCP_PROTOCOL_VERSION_6); transfer.writeInt(Constants.TCP_PROTOCOL_VERSION_6);
transfer.writeInt(Constants.TCP_PROTOCOL_VERSION_10); transfer.writeInt(Constants.TCP_PROTOCOL_VERSION_11);
transfer.writeString(null); transfer.writeString(null);
transfer.writeString(null); transfer.writeString(null);
transfer.writeString(id); transfer.writeString(id);
......
...@@ -6,6 +6,7 @@ ...@@ -6,6 +6,7 @@
*/ */
package org.h2.store; package org.h2.store;
import java.io.BufferedInputStream;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
import java.io.Reader; import java.io.Reader;
...@@ -265,18 +266,84 @@ public class LobStorage { ...@@ -265,18 +266,84 @@ public class LobStorage {
} }
} }
/**
* An input stream that reads from a remote LOB.
*/
public class RemoteInputStream extends InputStream {
/**
* The data handler.
*/
private final DataHandler handler;
/**
* The lob id.
*/
private final long lob;
/**
* The position.
*/
private long pos;
/**
* The remaining bytes in the lob.
*/
private long remainingBytes;
public RemoteInputStream(DataHandler handler, long lob, long byteCount) {
this.handler = handler;
this.lob = lob;
remainingBytes = byteCount;
}
public int read() throws IOException {
byte[] buff = new byte[1];
int len = read(buff, 0, 1);
return len < 0 ? len : (buff[0] & 255);
}
public int read(byte[] buff) throws IOException {
return read(buff, 0, buff.length);
}
public int read(byte[] buff, int off, int length) throws IOException {
if (length == 0) {
return 0;
}
length = (int) Math.min(length, remainingBytes);
if (length == 0) {
return -1;
}
length = handler.readLob(lob, pos, buff, off, length);
remainingBytes -= length;
if (length == 0) {
return -1;
}
pos += length;
return length;
}
public long skip(long n) {
remainingBytes -= n;
pos += n;
return n;
}
}
/** /**
* An input stream that reads from a LOB. * An input stream that reads from a LOB.
*/ */
public class LobInputStream extends InputStream { public class LobInputStream extends InputStream {
/** /**
* The size of the blob. * The size of the lob.
*/ */
private long length; private long length;
/** /**
* The remaining bytes in the blob. * The remaining bytes in the lob.
*/ */
private long remainingBytes; private long remainingBytes;
...@@ -291,12 +358,12 @@ public class LobStorage { ...@@ -291,12 +358,12 @@ public class LobStorage {
private int pos; private int pos;
/** /**
* The blob id. * The lob id.
*/ */
private long lob; private long lob;
/** /**
* The blob sequence id. * The lob sequence id.
*/ */
private int seq; private int seq;
...@@ -482,6 +549,12 @@ public class LobStorage { ...@@ -482,6 +549,12 @@ public class LobStorage {
*/ */
public InputStream getInputStream(long lobId, long byteCount) throws IOException { public InputStream getInputStream(long lobId, long byteCount) throws IOException {
init(); init();
if (conn == null) {
if (byteCount < 0) {
byteCount = Long.MAX_VALUE;
}
return new BufferedInputStream(new RemoteInputStream(handler, lobId, byteCount));
}
if (byteCount == -1) { if (byteCount == -1) {
synchronized (handler) { synchronized (handler) {
try { try {
...@@ -571,7 +644,7 @@ public class LobStorage { ...@@ -571,7 +644,7 @@ public class LobStorage {
prep.setInt(3, tableId); prep.setInt(3, tableId);
prep.execute(); prep.execute();
reuse(sql, prep); reuse(sql, prep);
ValueLobDb v = ValueLobDb.create(type, this, null, tableId, lobId, byteCount); ValueLobDb v = ValueLobDb.create(type, this, tableId, lobId, byteCount);
return v; return v;
} catch (SQLException e) { } catch (SQLException e) {
throw DbException.convert(e); throw DbException.convert(e);
...@@ -610,7 +683,7 @@ public class LobStorage { ...@@ -610,7 +683,7 @@ public class LobStorage {
prep.executeUpdate(); prep.executeUpdate();
reuse(sql, prep); reuse(sql, prep);
ValueLobDb v = ValueLobDb.create(type, this, null, tableId, lobId, length); ValueLobDb v = ValueLobDb.create(type, this, tableId, lobId, length);
return v; return v;
} catch (SQLException e) { } catch (SQLException e) {
throw DbException.convert(e); throw DbException.convert(e);
...@@ -786,6 +859,10 @@ public class LobStorage { ...@@ -786,6 +859,10 @@ public class LobStorage {
if (SysProperties.LOB_IN_DATABASE) { if (SysProperties.LOB_IN_DATABASE) {
init(); init();
if (conn == null) { if (conn == null) {
// remote connections:
// need to use a temp file, because the input stream could come from
// the same database, which would create a weird situation (trying
// to read a block while write something)
return ValueLobDb.createTempBlob(in, maxLength, handler); return ValueLobDb.createTempBlob(in, maxLength, handler);
} }
return addLob(in, maxLength, Value.BLOB); return addLob(in, maxLength, Value.BLOB);
...@@ -804,6 +881,10 @@ public class LobStorage { ...@@ -804,6 +881,10 @@ public class LobStorage {
if (SysProperties.LOB_IN_DATABASE) { if (SysProperties.LOB_IN_DATABASE) {
init(); init();
if (conn == null) { if (conn == null) {
// remote connections:
// need to use a temp file, because the input stream could come from
// the same database, which would create a weird situation (trying
// to read a block while write something)
return ValueLobDb.createTempClob(reader, maxLength, handler); return ValueLobDb.createTempClob(reader, maxLength, handler);
} }
long max = maxLength == -1 ? Long.MAX_VALUE : maxLength; long max = maxLength == -1 ? Long.MAX_VALUE : maxLength;
......
...@@ -193,7 +193,7 @@ public class Recover extends Tool implements DataHandler { ...@@ -193,7 +193,7 @@ public class Recover extends Tool implements DataHandler {
public static Value.ValueBlob readBlobDb(Connection conn, long lobId, long precision) { public static Value.ValueBlob readBlobDb(Connection conn, long lobId, long precision) {
DataHandler h = ((JdbcConnection) conn).getSession().getDataHandler(); DataHandler h = ((JdbcConnection) conn).getSession().getDataHandler();
LobStorage lobStorage = h.getLobStorage(); LobStorage lobStorage = h.getLobStorage();
return ValueLobDb.create(Value.BLOB, lobStorage, null, LobStorage.TABLE_TEMP, lobId, precision); return ValueLobDb.create(Value.BLOB, lobStorage, LobStorage.TABLE_TEMP, lobId, precision);
} }
/** /**
...@@ -202,7 +202,7 @@ public class Recover extends Tool implements DataHandler { ...@@ -202,7 +202,7 @@ public class Recover extends Tool implements DataHandler {
public static Value.ValueClob readClobDb(Connection conn, long lobId, long precision) { public static Value.ValueClob readClobDb(Connection conn, long lobId, long precision) {
DataHandler h = ((JdbcConnection) conn).getSession().getDataHandler(); DataHandler h = ((JdbcConnection) conn).getSession().getDataHandler();
LobStorage lobStorage = h.getLobStorage(); LobStorage lobStorage = h.getLobStorage();
return ValueLobDb.create(Value.CLOB, lobStorage, null, LobStorage.TABLE_TEMP, lobId, precision); return ValueLobDb.create(Value.CLOB, lobStorage, LobStorage.TABLE_TEMP, lobId, precision);
} }
private void trace(String message) { private void trace(String message) {
...@@ -1419,4 +1419,11 @@ public class Recover extends Tool implements DataHandler { ...@@ -1419,4 +1419,11 @@ public class Recover extends Tool implements DataHandler {
return null; return null;
} }
/**
* INTERNAL
*/
public int readLob(long lobId, long offset, byte[] buff, int off, int length) {
throw DbException.throwInternalError();
}
} }
...@@ -74,7 +74,7 @@ public class Script extends Tool { ...@@ -74,7 +74,7 @@ public class Script extends Tool {
for (; i < args.length; i++) { for (; i < args.length; i++) {
String a = args[i]; String a = args[i];
String upper = StringUtils.toUpperEnglish(a); String upper = StringUtils.toUpperEnglish(a);
if (upper.startsWith("NO") || "DROP".equals(upper)) { if ("SIMPLE".equals(upper) || upper.startsWith("NO") || "DROP".equals(upper)) {
buff1.append(' '); buff1.append(' ');
buff1.append(args[i]); buff1.append(args[i]);
} else { } else {
......
...@@ -261,6 +261,19 @@ public class Transfer { ...@@ -261,6 +261,19 @@ public class Transfer {
return this; return this;
} }
/**
* Write a number of bytes.
*
* @param buff the value
* @param off the offset
* @param len the length
* @return itself
*/
public Transfer writeBytes(byte[] buff, int off, int len) throws IOException {
out.write(buff, off, len);
return this;
}
/** /**
* Read a byte array. * Read a byte array.
* *
...@@ -276,6 +289,17 @@ public class Transfer { ...@@ -276,6 +289,17 @@ public class Transfer {
return b; return b;
} }
/**
* Read a number of bytes.
*
* @param buff the target buffer
* @param off the offset
* @param len the number of bytes to read
*/
public void readBytes(byte[] buff, int off, int len) throws IOException {
in.readFully(buff, off, len);
}
/** /**
* Close the transfer object and the socket. * Close the transfer object and the socket.
*/ */
...@@ -381,6 +405,18 @@ public class Transfer { ...@@ -381,6 +405,18 @@ public class Transfer {
writeString(v.getString()); writeString(v.getString());
break; break;
case Value.BLOB: { case Value.BLOB: {
if (version >= Constants.TCP_PROTOCOL_VERSION_11) {
if (v instanceof ValueLobDb) {
ValueLobDb lob = (ValueLobDb) v;
if (lob.isStored()) {
writeLong(-1);
writeInt(lob.getTableId());
writeLong(lob.getLobId());
writeLong(lob.getPrecision());
break;
}
}
}
long length = v.getPrecision(); long length = v.getPrecision();
if (length < 0) { if (length < 0) {
throw DbException.get(ErrorCode.CONNECTION_BROKEN_1, "length=" + length); throw DbException.get(ErrorCode.CONNECTION_BROKEN_1, "length=" + length);
...@@ -394,6 +430,18 @@ public class Transfer { ...@@ -394,6 +430,18 @@ public class Transfer {
break; break;
} }
case Value.CLOB: { case Value.CLOB: {
if (version >= Constants.TCP_PROTOCOL_VERSION_11) {
if (v instanceof ValueLobDb) {
ValueLobDb lob = (ValueLobDb) v;
if (lob.isStored()) {
writeLong(-1);
writeInt(lob.getTableId());
writeLong(lob.getLobId());
writeLong(lob.getPrecision());
break;
}
}
}
long length = v.getPrecision(); long length = v.getPrecision();
if (length < 0) { if (length < 0) {
throw DbException.get(ErrorCode.CONNECTION_BROKEN_1, "length=" + length); throw DbException.get(ErrorCode.CONNECTION_BROKEN_1, "length=" + length);
...@@ -519,6 +567,22 @@ public class Transfer { ...@@ -519,6 +567,22 @@ public class Transfer {
return ValueStringFixed.get(readString()); return ValueStringFixed.get(readString());
case Value.BLOB: { case Value.BLOB: {
long length = readLong(); long length = readLong();
if (version >= Constants.TCP_PROTOCOL_VERSION_11) {
if (length == -1) {
int tableId = readInt();
long id = readLong();
long precision = readLong();
return ValueLobDb.create(Value.BLOB, session.getDataHandler().getLobStorage(), tableId, id, precision);
}
int len = (int) length;
byte[] small = new byte[len];
IOUtils.readFully(in, small, 0, len);
int magic = readInt();
if (magic != LOB_MAGIC) {
throw DbException.get(ErrorCode.CONNECTION_BROKEN_1, "magic=" + magic);
}
return ValueLobDb.createSmallLob(Value.BLOB, small, length);
}
Value v = session.getDataHandler().getLobStorage().createBlob(in, length); Value v = session.getDataHandler().getLobStorage().createBlob(in, length);
int magic = readInt(); int magic = readInt();
if (magic != LOB_MAGIC) { if (magic != LOB_MAGIC) {
...@@ -528,6 +592,24 @@ public class Transfer { ...@@ -528,6 +592,24 @@ public class Transfer {
} }
case Value.CLOB: { case Value.CLOB: {
long length = readLong(); long length = readLong();
if (version >= Constants.TCP_PROTOCOL_VERSION_11) {
if (length == -1) {
int tableId = readInt();
long id = readLong();
long precision = readLong();
return ValueLobDb.create(Value.CLOB, session.getDataHandler().getLobStorage(), tableId, id, precision);
}
DataReader reader = new DataReader(in);
int len = (int) length;
char[] buff = new char[len];
IOUtils.readFully(reader, buff, len);
int magic = readInt();
if (magic != LOB_MAGIC) {
throw DbException.get(ErrorCode.CONNECTION_BROKEN_1, "magic=" + magic);
}
byte[] small = new String(buff).getBytes("UTF-8");
return ValueLobDb.createSmallLob(Value.CLOB, small, length);
}
Value v = session.getDataHandler().getLobStorage().createClob(new DataReader(in), length); Value v = session.getDataHandler().getLobStorage().createClob(new DataReader(in), length);
int magic = readInt(); int magic = readInt();
if (magic != LOB_MAGIC) { if (magic != LOB_MAGIC) {
......
...@@ -992,16 +992,6 @@ public abstract class Value { ...@@ -992,16 +992,6 @@ public abstract class Value {
// nothing to do // nothing to do
} }
/**
* Check if this value is stored in it's own file. For values that are
* kept fully in memory, this method returns false.
*
* @return true if it is
*/
public boolean isFileBased() {
return false;
}
/** /**
* Close the underlying resource, if any. For values that are kept fully in * Close the underlying resource, if any. For values that are kept fully in
* memory this method has no effect. * memory this method has no effect.
......
...@@ -739,10 +739,6 @@ public class ValueLob extends Value { ...@@ -739,10 +739,6 @@ public class ValueLob extends Value {
return compression; return compression;
} }
public boolean isFileBased() {
return fileName != null;
}
private static synchronized void deleteFile(DataHandler handler, String fileName) { private static synchronized void deleteFile(DataHandler handler, String fileName) {
// synchronize on the database, to avoid concurrent temp file creation / // synchronize on the database, to avoid concurrent temp file creation /
// deletion / backup // deletion / backup
......
...@@ -46,10 +46,9 @@ public class ValueLobDb extends Value implements Value.ValueClob, Value.ValueBlo ...@@ -46,10 +46,9 @@ public class ValueLobDb extends Value implements Value.ValueClob, Value.ValueBlo
private FileStore tempFile; private FileStore tempFile;
private String fileName; private String fileName;
private ValueLobDb(int type, LobStorage lobStorage, String fileName, int tableId, long lobId, long precision) { private ValueLobDb(int type, LobStorage lobStorage, int tableId, long lobId, long precision) {
this.type = type; this.type = type;
this.lobStorage = lobStorage; this.lobStorage = lobStorage;
this.fileName = fileName;
this.tableId = tableId; this.tableId = tableId;
this.lobId = lobId; this.lobId = lobId;
this.precision = precision; this.precision = precision;
...@@ -66,15 +65,14 @@ public class ValueLobDb extends Value implements Value.ValueClob, Value.ValueBlo ...@@ -66,15 +65,14 @@ public class ValueLobDb extends Value implements Value.ValueClob, Value.ValueBlo
* *
* @param type the type * @param type the type
* @param lobStorage the storage * @param lobStorage the storage
* @param fileName the file name (may be null)
* @param tableId the table id * @param tableId the table id
* @param id the lob id * @param id the lob id
* @param precision the precision (number of bytes / characters) * @param precision the precision (number of bytes / characters)
* @return the value * @return the value
*/ */
public static ValueLobDb create(int type, LobStorage lobStorage, public static ValueLobDb create(int type, LobStorage lobStorage,
String fileName, int tableId, long id, long precision) { int tableId, long id, long precision) {
return new ValueLobDb(type, lobStorage, fileName, tableId, id, precision); return new ValueLobDb(type, lobStorage, tableId, id, precision);
} }
/** /**
...@@ -103,18 +101,26 @@ public class ValueLobDb extends Value implements Value.ValueClob, Value.ValueBlo ...@@ -103,18 +101,26 @@ public class ValueLobDb extends Value implements Value.ValueClob, Value.ValueBlo
if (lobStorage != null) { if (lobStorage != null) {
Value copy = lobStorage.createClob(getReader(), -1); Value copy = lobStorage.createClob(getReader(), -1);
return copy; return copy;
} else if (small != null) {
return LobStorage.createSmallLob(t, small);
} }
} else if (t == Value.BLOB) { } else if (t == Value.BLOB) {
if (lobStorage != null) { if (lobStorage != null) {
Value copy = lobStorage.createBlob(getInputStream(), -1); Value copy = lobStorage.createBlob(getInputStream(), -1);
return copy; return copy;
} else if (small != null) {
return LobStorage.createSmallLob(t, small);
} }
} }
return super.convertTo(t); return super.convertTo(t);
} }
public boolean isLinked() { public boolean isLinked() {
return tableId != LobStorage.TABLE_ID_SESSION_VARIABLE; return tableId != LobStorage.TABLE_ID_SESSION_VARIABLE && small == null;
}
public boolean isStored() {
return small == null && fileName == null;
} }
public void close() { public void close() {
...@@ -339,10 +345,6 @@ public class ValueLobDb extends Value implements Value.ValueClob, Value.ValueBlo ...@@ -339,10 +345,6 @@ public class ValueLobDb extends Value implements Value.ValueClob, Value.ValueBlo
return other instanceof ValueLobDb && compareSecure((Value) other, null) == 0; return other instanceof ValueLobDb && compareSecure((Value) other, null) == 0;
} }
public boolean isFileBased() {
return small == null;
}
public int getMemory() { public int getMemory() {
if (small != null) { if (small != null) {
return small.length + 104; return small.length + 104;
......
...@@ -345,6 +345,8 @@ java org.h2.test.TestAll timer ...@@ -345,6 +345,8 @@ java org.h2.test.TestAll timer
int testing; int testing;
// System.setProperty("h2.modifyOnWrite", "true"); // System.setProperty("h2.modifyOnWrite", "true");
int testWith_TCP_PROTOCOL_VERSION_10;
// System.setProperty("h2.storeLocalTime", "true"); // System.setProperty("h2.storeLocalTime", "true");
// speedup // speedup
......
...@@ -649,7 +649,9 @@ public abstract class TestBase { ...@@ -649,7 +649,9 @@ public abstract class TestBase {
int ca = actual.read(); int ca = actual.read();
actual.read(new byte[0]); actual.read(new byte[0]);
int ce = expected.read(); int ce = expected.read();
assertEquals(ce, ca); if (ca != ce) {
assertEquals("Error at index " + i, ce, ca);
}
if (ca == -1) { if (ca == -1) {
break; break;
} }
......
...@@ -70,7 +70,8 @@ public class TestLob extends TestBase { ...@@ -70,7 +70,8 @@ public class TestLob extends TestBase {
testCreateAsSelect(); testCreateAsSelect();
testDropAllObjects(); testDropAllObjects();
testDelete(); testDelete();
testTempFilesDeleted(); testTempFilesDeleted(true);
testTempFilesDeleted(false);
testAddLobRestart(); testAddLobRestart();
testLobServerMemory(); testLobServerMemory();
if (config.memory) { if (config.memory) {
...@@ -454,7 +455,7 @@ public class TestLob extends TestBase { ...@@ -454,7 +455,7 @@ public class TestLob extends TestBase {
conn.close(); conn.close();
} }
private void testTempFilesDeleted() throws Exception { private void testTempFilesDeleted(boolean stream) throws Exception {
FileUtils.deleteRecursive(TEMP_DIR, true); FileUtils.deleteRecursive(TEMP_DIR, true);
FileUtils.createDirectories(TEMP_DIR); FileUtils.createDirectories(TEMP_DIR);
List<String> list = FileUtils.newDirectoryStream(TEMP_DIR); List<String> list = FileUtils.newDirectoryStream(TEMP_DIR);
...@@ -464,12 +465,25 @@ public class TestLob extends TestBase { ...@@ -464,12 +465,25 @@ public class TestLob extends TestBase {
Statement stat; Statement stat;
stat = conn.createStatement(); stat = conn.createStatement();
stat.execute("create table test(id int primary key, name text)"); stat.execute("create table test(id int primary key, name text)");
stat.execute("insert into test values(1, space(100000))"); PreparedStatement prep = conn.prepareStatement("insert into test values(2, ?)");
if (stream) {
String large = new String(new char[1024 * 1024 * 2]).replace((char) 0, 'x');
prep.setCharacterStream(1, new StringReader(large));
large = null;
prep.execute();
} else {
stat.execute("insert into test values(1, space(100000))");
}
/*
list = FileUtils.newDirectoryStream(TEMP_DIR);
assertEquals("Unexpected temp file: " + list, 0, list.size());
*/
ResultSet rs; ResultSet rs;
rs = stat.executeQuery("select * from test"); rs = stat.executeQuery("select * from test");
rs.next(); while (rs.next()) {
rs.getCharacterStream("name").close(); rs.getCharacterStream("name").close();
rs.close(); }
prep.close();
conn.close(); conn.close();
list = FileUtils.newDirectoryStream(TEMP_DIR); list = FileUtils.newDirectoryStream(TEMP_DIR);
assertEquals("Unexpected temp file: " + list, 0, list.size()); assertEquals("Unexpected temp file: " + list, 0, list.size());
...@@ -1123,7 +1137,8 @@ public class TestLob extends TestBase { ...@@ -1123,7 +1137,8 @@ public class TestLob extends TestBase {
stat = conn.createStatement(); stat = conn.createStatement();
ResultSet rs = stat.executeQuery("SELECT * FROM TEST WHERE ID=1"); ResultSet rs = stat.executeQuery("SELECT * FROM TEST WHERE ID=1");
rs.next(); rs.next();
assertEqualStreams(rs.getBinaryStream("TEXT"), new ByteArrayInputStream(data), -1); InputStream in = new ByteArrayInputStream(data);
assertEqualStreams(in, rs.getBinaryStream("TEXT"), -1);
prep = conn.prepareStatement("UPDATE TEST SET TEXT = ?"); prep = conn.prepareStatement("UPDATE TEST SET TEXT = ?");
prep.setBinaryStream(1, new ByteArrayInputStream(data), 0); prep.setBinaryStream(1, new ByteArrayInputStream(data), 0);
......
...@@ -6,10 +6,12 @@ ...@@ -6,10 +6,12 @@
*/ */
package org.h2.test.jdbc; package org.h2.test.jdbc;
import java.io.ByteArrayInputStream;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
import java.io.OutputStream; import java.io.OutputStream;
import java.io.Reader; import java.io.Reader;
import java.io.StringReader;
import java.io.Writer; import java.io.Writer;
import java.sql.Blob; import java.sql.Blob;
import java.sql.Clob; import java.sql.Clob;
...@@ -42,6 +44,7 @@ public class TestLobApi extends TestBase { ...@@ -42,6 +44,7 @@ public class TestLobApi extends TestBase {
public void test() throws Exception { public void test() throws Exception {
deleteDb("lob"); deleteDb("lob");
testLobStaysOpenUntilCommitted();
testInputStreamThrowsException(true); testInputStreamThrowsException(true);
testInputStreamThrowsException(false); testInputStreamThrowsException(false);
conn = (JdbcConnection) getConnection("lob"); conn = (JdbcConnection) getConnection("lob");
...@@ -61,6 +64,51 @@ public class TestLobApi extends TestBase { ...@@ -61,6 +64,51 @@ public class TestLobApi extends TestBase {
conn.close(); conn.close();
} }
/**
* According to the JDBC spec, BLOB and CLOB objects must stay open even if
* the result set is closed (see ResultSet.close).
*/
private void testLobStaysOpenUntilCommitted() throws Exception {
Connection conn = getConnection("lob");
stat = conn.createStatement();
stat.execute("create table test(id identity, c clob, b blob)");
PreparedStatement prep = conn.prepareStatement("insert into test values(null, ?, ?)");
prep.setString(1, "");
prep.setBytes(2, new byte[0]);
prep.execute();
Random r = new Random(1);
char[] chars = new char[100000];
for (int i = 0; i < chars.length; i++) {
chars[i] = (char) r.nextInt(10000);
}
String d = new String(chars);
prep.setCharacterStream(1, new StringReader(d));
byte[] bytes = new byte[100000];
r.nextBytes(bytes);
prep.setBinaryStream(2, new ByteArrayInputStream(bytes));
prep.execute();
conn.setAutoCommit(false);
ResultSet rs = stat.executeQuery("select * from test order by id");
rs.next();
Clob c1 = rs.getClob(2);
Blob b1 = rs.getBlob(3);
rs.next();
Clob c2 = rs.getClob(2);
Blob b2 = rs.getBlob(3);
assertFalse(rs.next());
rs.close();
assertEquals(0, c1.length());
assertEquals(0, b1.length());
assertEquals(chars.length, c2.length());
assertEquals(bytes.length, b2.length());
assertEquals("", c1.getSubString(1, 0));
assertEquals(new byte[0], b1.getBytes(1, 0));
assertEquals(d, c2.getSubString(1, (int) c2.length()));
assertEquals(bytes, b2.getBytes(1, (int) b2.length()));
stat.execute("drop table test");
conn.close();
}
private void testInputStreamThrowsException(final boolean ioException) throws Exception { private void testInputStreamThrowsException(final boolean ioException) throws Exception {
Connection conn = getConnection("lob"); Connection conn = getConnection("lob");
stat = conn.createStatement(); stat = conn.createStatement();
......
...@@ -327,4 +327,8 @@ public class TestDataPage extends TestBase implements DataHandler { ...@@ -327,4 +327,8 @@ public class TestDataPage extends TestBase implements DataHandler {
return null; return null;
} }
public int readLob(long lobId, long offset, byte[] buff, int off, int length) {
return -1;
}
} }
...@@ -179,4 +179,8 @@ public class TestFile extends TestBase implements DataHandler { ...@@ -179,4 +179,8 @@ public class TestFile extends TestBase implements DataHandler {
return null; return null;
} }
public int readLob(long lobId, long offset, byte[] buff, int off, int length) {
return -1;
}
} }
...@@ -157,4 +157,8 @@ public class TestValueHashMap extends TestBase implements DataHandler { ...@@ -157,4 +157,8 @@ public class TestValueHashMap extends TestBase implements DataHandler {
return null; return null;
} }
public int readLob(long lobId, long offset, byte[] buff, int off, int length) {
return -1;
}
} }
...@@ -261,4 +261,8 @@ public class TestValueMemory extends TestBase implements DataHandler { ...@@ -261,4 +261,8 @@ public class TestValueMemory extends TestBase implements DataHandler {
return null; return null;
} }
public int readLob(long lobId, long offset, byte[] buff, int off, int length) {
return -1;
}
} }
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论