提交 3c5b4596 authored 作者: Thomas Mueller's avatar Thomas Mueller

Store CLOBs and BLOBs in maps (StreamStore) to improve performance and avoid…

Store CLOBs and BLOBs in maps (StreamStore) to improve performance and avoid using a second connection. (WIP)
上级 b5ac069e
......@@ -453,7 +453,7 @@ public class ScriptCommand extends ScriptBase {
for (int i = 0;; i++) {
StringBuilder buff = new StringBuilder(lobBlockSize * 2);
buff.append("INSERT INTO SYSTEM_LOB_STREAM VALUES(" + id + ", " + i + ", NULL, '");
int len = IOUtils.readFully(input, bytes, 0, lobBlockSize);
int len = IOUtils.readFully(input, bytes, lobBlockSize);
if (len <= 0) {
break;
}
......@@ -474,7 +474,7 @@ public class ScriptCommand extends ScriptBase {
StringBuilder buff = new StringBuilder(lobBlockSize * 2);
buff.append("INSERT INTO SYSTEM_LOB_STREAM VALUES(" + id + ", " + i + ", ");
int len = IOUtils.readFully(reader, chars, lobBlockSize);
if (len < 0) {
if (len == 0) {
break;
}
buff.append(StringUtils.quoteStringSQL(new String(chars, 0, len))).
......
......@@ -43,6 +43,8 @@ import org.h2.store.FileStore;
import org.h2.store.InDoubtTransaction;
import org.h2.store.LobStorageBackend;
import org.h2.store.LobStorageFrontend;
import org.h2.store.LobStorageInterface;
import org.h2.store.LobStorageMap;
import org.h2.store.PageStore;
import org.h2.store.WriterThread;
import org.h2.store.fs.FileUtils;
......@@ -171,7 +173,7 @@ public class Database implements DataHandler {
private SourceCompiler compiler;
private volatile boolean metaTablesInitialized;
private boolean flushOnEachCommit;
private LobStorageBackend lobStorage;
private LobStorageInterface lobStorage;
private final int pageSize;
private int defaultTableType = Table.TYPE_CACHED;
private final DbSettings dbSettings;
......@@ -2501,10 +2503,14 @@ public class Database implements DataHandler {
}
@Override
public LobStorageBackend getLobStorage() {
public LobStorageInterface getLobStorage() {
if (lobStorage == null) {
if (dbSettings.mvStore) {
lobStorage = new LobStorageMap(this);
} else {
lobStorage = new LobStorageBackend(this);
}
}
return lobStorage;
}
......
......@@ -33,7 +33,6 @@ import org.h2.result.Row;
import org.h2.schema.Schema;
import org.h2.store.DataHandler;
import org.h2.store.InDoubtTransaction;
import org.h2.store.LobStorageBackend;
import org.h2.store.LobStorageFrontend;
import org.h2.table.Table;
import org.h2.util.New;
......@@ -1072,10 +1071,6 @@ public class Session extends SessionWithState {
return database;
}
public LobStorageBackend getLobStorageBackend() {
return database.getLobStorage();
}
/**
* Remember that the given LOB value must be un-linked (disconnected from
* the table) at commit.
......
......@@ -53,6 +53,7 @@ MVTableEngine:
- test and possibly allow MVCC & MULTI_THREADED
- maybe enable MVCC by default (but allow to disable it)
- use StreamStore to avoid deadlocks
- config options for compression and page size (maybe combined)
TransactionStore:
......
......@@ -195,7 +195,12 @@ public class StreamStore {
// do nothing by default
}
private long getAndIncrementNextKey() {
/**
* Generate a new key.
*
* @return the new key
*/
public long getAndIncrementNextKey() {
long key = nextKey.getAndIncrement();
if (!map.containsKey(key)) {
return key;
......
......@@ -22,7 +22,6 @@ import org.h2.mvstore.rtree.SpatialKey;
import org.h2.mvstore.type.DataType;
import org.h2.result.SortOrder;
import org.h2.store.DataHandler;
import org.h2.store.LobStorageFrontend;
import org.h2.tools.SimpleResultSet;
import org.h2.value.CompareMode;
import org.h2.value.Value;
......@@ -493,7 +492,7 @@ public class ValueDataType implements DataType {
if (smallLen >= 0) {
byte[] small = DataUtils.newBytes(smallLen);
buff.get(small, 0, smallLen);
return LobStorageFrontend.createSmallLob(type, small);
return ValueLobDb.createSmallLob(type, small);
} else if (smallLen == -3) {
int tableId = readVarInt(buff);
long lobId = readVarLong(buff);
......
......@@ -441,7 +441,9 @@ public class TcpServerThread implements Runnable {
}
if (in.getPos() != offset) {
LobStorageInterface lobStorage = session.getDataHandler().getLobStorage();
InputStream lobIn = lobStorage.getInputStream(lobId, hmac, -1);
// only the lob id is used
ValueLobDb lob = ValueLobDb.create(Value.BLOB, null, -1, lobId, hmac, -1);
InputStream lobIn = lobStorage.getInputStream(lob, hmac, -1);
in = new CachedInputStream(lobIn);
lobs.put(lobId, in);
lobIn.skip(offset);
......@@ -449,7 +451,7 @@ public class TcpServerThread implements Runnable {
// limit the buffer size
length = Math.min(16 * Constants.IO_BUFFER_SIZE, length);
byte[] buff = new byte[length];
length = IOUtils.readFully(in, buff, 0, length);
length = IOUtils.readFully(in, buff, length);
transfer.writeInt(SessionRemote.STATUS_OK);
transfer.writeInt(length);
transfer.writeBytes(buff, 0, length);
......
/*
* Copyright 2004-2013 H2 Group. Multiple-Licensed under the H2 License,
* Version 1.0, and under the Eclipse Public License, Version 1.0
* (http://h2database.com/html/license.html).
* Initial Developer: H2 Group
*/
package org.h2.store;
import java.io.IOException;
import java.io.InputStream;
import java.io.Reader;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.charset.CharsetEncoder;
import java.nio.charset.CodingErrorAction;
import org.h2.engine.Constants;
/**
* An input stream that reads the data from a reader.
*/
public class CountingReaderInputStream extends InputStream {
private final Reader reader;
private final CharBuffer charBuffer = CharBuffer.allocate(Constants.IO_BUFFER_SIZE);
private final CharsetEncoder encoder = Constants.UTF8.newEncoder().
onMalformedInput(CodingErrorAction.REPLACE).
onUnmappableCharacter(CodingErrorAction.REPLACE);
private ByteBuffer byteBuffer = ByteBuffer.allocate(0);
private long length;
private long remaining;
CountingReaderInputStream(Reader reader, long maxLength) {
this.reader = reader;
this.remaining = maxLength;
}
@Override
public int read(byte[] buff, int offset, int len) throws IOException {
if (!fetch()) {
return -1;
}
len = Math.min(len, byteBuffer.remaining());
byteBuffer.get(buff, offset, len);
return len;
}
@Override
public int read() throws IOException {
if (!fetch()) {
return -1;
}
return byteBuffer.get() & 255;
}
private boolean fetch() throws IOException {
if (byteBuffer != null && byteBuffer.remaining() == 0) {
fillBuffer();
}
return byteBuffer != null;
}
private void fillBuffer() throws IOException {
int len = (int) Math.min(charBuffer.capacity() - charBuffer.position(), remaining);
if (len > 0) {
len = reader.read(charBuffer.array(), charBuffer.position(), len);
}
if (len > 0) {
remaining -= len;
} else {
len = 0;
remaining = 0;
}
length += len;
charBuffer.limit(charBuffer.position() + len);
charBuffer.rewind();
byteBuffer = ByteBuffer.allocate(Constants.IO_BUFFER_SIZE);
boolean end = remaining == 0;
encoder.encode(charBuffer, byteBuffer, end);
if (end && byteBuffer.position() == 0) {
// EOF
byteBuffer = null;
return;
}
byteBuffer.flip();
charBuffer.compact();
charBuffer.flip();
charBuffer.position(charBuffer.limit());
}
/**
* The number of characters read so far (but there might still be some bytes
* in the buffer).
*
* @return the number of characters
*/
public long getLength() {
return length;
}
@Override
public void close() throws IOException {
reader.close();
}
}
\ No newline at end of file
......@@ -800,7 +800,7 @@ public class Data {
if (smallLen >= 0) {
byte[] small = DataUtils.newBytes(smallLen);
read(small, 0, smallLen);
return LobStorageFrontend.createSmallLob(type, small);
return ValueLobDb.createSmallLob(type, small);
} else if (smallLen == -3) {
int tableId = readVarInt();
long lobId = readVarLong();
......
......@@ -95,7 +95,7 @@ public interface DataHandler {
/**
* Read from a lob.
*
* @param lobId the lob
* @param lobId the lob id
* @param hmac the message authentication code
* @param offset the offset within the lob
* @param buff the target buffer
......
......@@ -113,11 +113,10 @@ public class DataReader extends Reader {
* Read a number of bytes.
*
* @param buff the target buffer
* @param offset the offset within the target buffer
* @param len the number of bytes to read
*/
public void readFully(byte[] buff, int offset, int len) throws IOException {
int got = IOUtils.readFully(in, buff, offset, len);
public void readFully(byte[] buff, int len) throws IOException {
int got = IOUtils.readFully(in, buff, len);
if (got < len) {
throw new FastEOFException();
}
......
......@@ -9,10 +9,6 @@ package org.h2.store;
import java.io.IOException;
import java.io.InputStream;
import java.io.Reader;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.charset.CharsetEncoder;
import java.nio.charset.CodingErrorAction;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
......@@ -23,7 +19,6 @@ import java.util.HashMap;
import org.h2.constant.ErrorCode;
import org.h2.constant.SysProperties;
import org.h2.engine.Constants;
import org.h2.engine.Database;
import org.h2.jdbc.JdbcConnection;
import org.h2.message.DbException;
......@@ -35,8 +30,8 @@ import org.h2.value.Value;
import org.h2.value.ValueLobDb;
/**
* This class stores LOB objects in the database. This is the back-end i.e. the
* server side of the LOB storage.
* This class stores LOB objects in the database, in tables. This is the
* back-end i.e. the server side of the LOB storage.
* <p>
* Using the system session
* <p>
......@@ -44,8 +39,8 @@ import org.h2.value.ValueLobDb;
* take a very long time. If we did them on a normal session, we would be
* locking the LOB tables for long periods of time, which is extremely
* detrimental to the rest of the system. Perhaps when we shift to the MVStore
* engine, we can revisit this design decision
* (using the StreamStore, that is, no connection at all).
* engine, we can revisit this design decision (using the StreamStore, that is,
* no connection at all).
* <p>
* Locking
* <p>
......@@ -107,9 +102,7 @@ public class LobStorageBackend implements LobStorageInterface {
this.database = database;
}
/**
* Initialize the lob storage.
*/
@Override
public void init() {
if (init) {
return;
......@@ -189,11 +182,7 @@ public class LobStorageBackend implements LobStorageInterface {
return x;
}
/**
* Remove all LOBs for this table.
*
* @param tableId the table id
*/
@Override
public void removeAllForTable(int tableId) {
init();
try {
......@@ -278,7 +267,11 @@ public class LobStorageBackend implements LobStorageInterface {
}
@Override
public void removeLob(long lob) {
public void removeLob(ValueLobDb lob) {
removeLob(lob.getLobId());
}
private void removeLob(long lobId) {
try {
// see locking discussion at the top
assertNotHolds(conn.getSession());
......@@ -288,8 +281,8 @@ public class LobStorageBackend implements LobStorageInterface {
"AND NOT EXISTS(SELECT 1 FROM " + LOB_MAP + " O " +
"WHERE O.BLOCK = D.BLOCK AND O.LOB <> ?)";
PreparedStatement prep = prepare(sql);
prep.setLong(1, lob);
prep.setLong(2, lob);
prep.setLong(1, lobId);
prep.setLong(2, lobId);
ResultSet rs = prep.executeQuery();
ArrayList<Long> blocks = New.arrayList();
while (rs.next()) {
......@@ -301,7 +294,7 @@ public class LobStorageBackend implements LobStorageInterface {
sql = "DELETE FROM " + LOB_MAP + " WHERE LOB = ?";
prep = prepare(sql);
prep.setLong(1, lob);
prep.setLong(1, lobId);
prep.execute();
reuse(sql, prep);
......@@ -315,7 +308,7 @@ public class LobStorageBackend implements LobStorageInterface {
sql = "DELETE FROM " + LOBS + " WHERE ID = ?";
prep = prepare(sql);
prep.setLong(1, lob);
prep.setLong(1, lobId);
prep.execute();
reuse(sql, prep);
}
......@@ -326,13 +319,14 @@ public class LobStorageBackend implements LobStorageInterface {
}
@Override
public InputStream getInputStream(long lobId, byte[] hmac, long byteCount) throws IOException {
public InputStream getInputStream(ValueLobDb lob, byte[] hmac, long byteCount) throws IOException {
try {
init();
assertNotHolds(conn.getSession());
// see locking discussion at the top
synchronized (database) {
synchronized (conn.getSession()) {
long lobId = lob.getLobId();
return new LobInputStream(lobId, byteCount);
}
}
......@@ -355,7 +349,7 @@ public class LobStorageBackend implements LobStorageInterface {
byte[] small = null;
for (int seq = 0; maxLength > 0; seq++) {
int len = (int) Math.min(BLOCK_LENGTH, maxLength);
len = IOUtils.readFully(in, buff, 0, len);
len = IOUtils.readFully(in, buff, len);
if (len <= 0) {
break;
}
......@@ -429,7 +423,9 @@ public class LobStorageBackend implements LobStorageInterface {
}
@Override
public ValueLobDb copyLob(int type, long oldLobId, int tableId, long length) {
public ValueLobDb copyLob(ValueLobDb old, int tableId, long length) {
int type = old.getType();
long oldLobId = old.getLobId();
assertNotHolds(conn.getSession());
// see locking discussion at the top
synchronized (database) {
......@@ -562,7 +558,8 @@ public class LobStorageBackend implements LobStorageInterface {
}
@Override
public void setTable(long lobId, int table) {
public void setTable(ValueLobDb lob, int table) {
long lobId = lob.getLobId();
assertNotHolds(conn.getSession());
// see locking discussion at the top
synchronized (database) {
......@@ -762,6 +759,10 @@ public class LobStorageBackend implements LobStorageInterface {
return;
}
try {
;
if(lobMapIndex >= lobMapBlocks.length) {
System.out.println("??");
}
buffer = readBlock(lobMapBlocks[lobMapIndex]);
lobMapIndex++;
bufferPos = 0;
......@@ -772,96 +773,4 @@ public class LobStorageBackend implements LobStorageInterface {
}
/**
* An input stream that reads the data from a reader.
*/
public static class CountingReaderInputStream extends InputStream {
private final Reader reader;
private final CharBuffer charBuffer = CharBuffer.allocate(Constants.IO_BUFFER_SIZE);
private final CharsetEncoder encoder = Constants.UTF8.newEncoder().
onMalformedInput(CodingErrorAction.REPLACE).
onUnmappableCharacter(CodingErrorAction.REPLACE);
private ByteBuffer byteBuffer = ByteBuffer.allocate(0);
private long length;
private long remaining;
CountingReaderInputStream(Reader reader, long maxLength) {
this.reader = reader;
this.remaining = maxLength;
}
@Override
public int read(byte[] buff, int offset, int len) throws IOException {
if (!fetch()) {
return -1;
}
len = Math.min(len, byteBuffer.remaining());
byteBuffer.get(buff, offset, len);
return len;
}
@Override
public int read() throws IOException {
if (!fetch()) {
return -1;
}
return byteBuffer.get() & 255;
}
private boolean fetch() throws IOException {
if (byteBuffer != null && byteBuffer.remaining() == 0) {
fillBuffer();
}
return byteBuffer != null;
}
private void fillBuffer() throws IOException {
int len = (int) Math.min(charBuffer.capacity() - charBuffer.position(), remaining);
if (len > 0) {
len = reader.read(charBuffer.array(), charBuffer.position(), len);
}
if (len > 0) {
remaining -= len;
} else {
len = 0;
remaining = 0;
}
length += len;
charBuffer.limit(charBuffer.position() + len);
charBuffer.rewind();
byteBuffer = ByteBuffer.allocate(Constants.IO_BUFFER_SIZE);
boolean end = remaining == 0;
encoder.encode(charBuffer, byteBuffer, end);
if (end && byteBuffer.position() == 0) {
// EOF
byteBuffer = null;
return;
}
byteBuffer.flip();
charBuffer.compact();
charBuffer.flip();
charBuffer.position(charBuffer.limit());
}
/**
* The number of characters read so far (but there might still be some bytes
* in the buffer).
*
* @return the number of characters
*/
public long getLength() {
return length;
}
@Override
public void close() throws IOException {
reader.close();
}
}
}
......@@ -10,13 +10,12 @@ import java.io.BufferedInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.Reader;
import org.h2.engine.Constants;
import org.h2.value.Value;
import org.h2.value.ValueLobDb;
/**
* This class stores LOB objects in the database.
* This is the front-end i.e. the client side of the LOB storage.
* This factory creates in-memory objects and temporary files. It is used on the
* client side.
*/
public class LobStorageFrontend implements LobStorageInterface {
......@@ -36,40 +35,39 @@ public class LobStorageFrontend implements LobStorageInterface {
this.handler = handler;
}
/**
* Delete a LOB from the database.
*
* @param lob the lob id
*/
@Override
public void removeLob(long lob) {
// TODO this should not be called at all,
// but that's a refactoring for another day
public void removeLob(ValueLobDb lob) {
// not stored in the database
}
/**
* Get the input stream for the given lob.
*
* @param lobId the lob id
* @param lob the lob
* @param hmac the message authentication code (for remote input streams)
* @param byteCount the number of bytes to read, or -1 if not known
* @return the stream
*/
@Override
public InputStream getInputStream(long lobId, byte[] hmac, long byteCount) throws IOException {
public InputStream getInputStream(ValueLobDb lob, byte[] hmac, long byteCount) throws IOException {
if (byteCount < 0) {
byteCount = Long.MAX_VALUE;
}
return new BufferedInputStream(new LobStorageRemoteInputStream(handler, lobId, hmac, byteCount));
return new BufferedInputStream(new LobStorageRemoteInputStream(handler, lob, hmac, byteCount));
}
@Override
public ValueLobDb copyLob(ValueLobDb old, int tableId, long length) {
throw new UnsupportedOperationException();
}
@Override
public ValueLobDb copyLob(int type, long oldLobId, int tableId, long length) {
public void setTable(ValueLobDb lob, int tableIdSessionVariable) {
throw new UnsupportedOperationException();
}
@Override
public void setTable(long lobId, int tableIdSessionVariable) {
public void removeAllForTable(int tableId) {
throw new UnsupportedOperationException();
}
......@@ -77,7 +75,7 @@ public class LobStorageFrontend implements LobStorageInterface {
public Value createBlob(InputStream in, long maxLength) {
// need to use a temp file, because the input stream could come from
// the same database, which would create a weird situation (trying
// to read a block while write something)
// to read a block while writing something)
return ValueLobDb.createTempBlob(in, maxLength, handler);
}
......@@ -92,26 +90,13 @@ public class LobStorageFrontend implements LobStorageInterface {
public Value createClob(Reader reader, long maxLength) {
// need to use a temp file, because the input stream could come from
// the same database, which would create a weird situation (trying
// to read a block while write something)
// to read a block while writing something)
return ValueLobDb.createTempClob(reader, maxLength, handler);
}
/**
* Create a LOB object that fits in memory.
*
* @param type the value type
* @param small the byte array
* @return the LOB
*/
public static Value createSmallLob(int type, byte[] small) {
int precision;
if (type == Value.CLOB) {
precision = new String(small, Constants.UTF8).length();
} else {
precision = small.length;
}
return ValueLobDb.createSmallLob(type, small, precision);
@Override
public void init() {
// nothing to do
}
}
......@@ -38,38 +38,49 @@ public interface LobStorageInterface {
/**
* Copy a lob.
*
* @param type the type
* @param oldLobId the old lob id
* @param old the old lob
* @param tableId the new table id
* @param length the length
* @return the new lob
*/
ValueLobDb copyLob(int type, long oldLobId, int tableId, long length);
ValueLobDb copyLob(ValueLobDb old, int tableId, long length);
/**
* Get the input stream for the given lob.
*
* @param lobId the lob id
* @param lob the lob id
* @param hmac the message authentication code (for remote input streams)
* @param byteCount the number of bytes to read, or -1 if not known
* @return the stream
*/
InputStream getInputStream(long lobId, byte[] hmac, long byteCount)
InputStream getInputStream(ValueLobDb lob, byte[] hmac, long byteCount)
throws IOException;
/**
* Set the table reference of this lob.
*
* @param lobId the lob
* @param lob the lob
* @param table the table
*/
void setTable(long lobId, int table);
void setTable(ValueLobDb lob, int table);
/**
* Delete a LOB from the database.
* Delete a LOB (from the database, if it is stored there).
*
* @param lob the lob id
* @param lob the lob
*/
void removeLob(ValueLobDb lob);
/**
* Remove all LOBs for this table.
*
* @param tableId the table id
*/
void removeAllForTable(int tableId);
/**
* Initialize the lob storage.
*/
void removeLob(long lob);
void init();
}
/*
* Copyright 2004-2013 H2 Group. Multiple-Licensed under the H2 License,
* Version 1.0, and under the Eclipse Public License, Version 1.0
* (http://h2database.com/html/license.html).
* Initial Developer: H2 Group
*/
package org.h2.store;
import java.io.BufferedInputStream;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.Reader;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Map.Entry;
import org.h2.engine.Constants;
import org.h2.engine.Database;
import org.h2.message.DbException;
import org.h2.mvstore.MVMap;
import org.h2.mvstore.MVMapConcurrent;
import org.h2.mvstore.MVStore;
import org.h2.mvstore.StreamStore;
import org.h2.mvstore.db.MVTableEngine.Store;
import org.h2.util.IOUtils;
import org.h2.util.New;
import org.h2.value.Value;
import org.h2.value.ValueLobDb;
/**
* This class stores LOB objects in the database, in maps. This is the back-end
* i.e. the server side of the LOB storage.
*/
public class LobStorageMap implements LobStorageInterface {
private final Database database;
private boolean init;
/**
* The lob metadata map. It contains the mapping from the lob id
* (which is a long) to the stream store id (which is a byte array).
*
* Key: lobId (long)
* Value: { streamStoreId (byte[]), tableId (int), byteCount (long), hashCode (long) }.
*/
private MVMap<Long, Object[]> lobMap;
/**
* The reference map. It is used to remove data from the stream store: if no
* more entries for the given streamStoreId exist, the data is removed from
* the stream store.
*
* Key: { streamStoreId (byte[]), lobId (long) }.
* Value: true (boolean).
*/
private MVMap<Object[], Boolean> refMap;
/**
* The stream store data map.
*
* Key: stream store block id (long).
* Value: data (byte[]).
*/
private MVMap<Long, byte[]> dataMap;
private StreamStore streamStore;
public LobStorageMap(Database database) {
this.database = database;
}
@Override
public void init() {
if (init) {
return;
}
init = true;
Store s = database.getMvStore();
MVStore mvStore;
if (s == null) {
// in-memory database
mvStore = MVStore.open(null);
} else {
mvStore = s.getStore();
}
lobMap = mvStore.openMap("lobMap",
new MVMapConcurrent.Builder<Long, Object[]>());
refMap = mvStore.openMap("lobRef",
new MVMapConcurrent.Builder<Object[], Boolean>());
dataMap = mvStore.openMap("lobData",
new MVMapConcurrent.Builder<Long, byte[]>());
streamStore = new StreamStore(dataMap);
}
@Override
public Value createBlob(InputStream in, long maxLength) {
init();
int type = Value.BLOB;
if (maxLength < 0) {
maxLength = Long.MAX_VALUE;
}
int max = (int) Math.min(maxLength, database.getMaxLengthInplaceLob());
try {
if (max != 0 && max < Integer.MAX_VALUE) {
BufferedInputStream b = new BufferedInputStream(in, max);
b.mark(max);
byte[] small = new byte[max];
int len = IOUtils.readFully(b, small, max);
if (len < max) {
if (len < small.length) {
small = Arrays.copyOf(small, len);
}
return ValueLobDb.createSmallLob(type, small);
}
b.reset();
in = b;
}
return createLob(in, type);
} catch (IOException e) {
throw DbException.convertIOException(e, null);
}
}
@Override
public Value createClob(Reader reader, long maxLength) {
init();
int type = Value.CLOB;
if (maxLength < 0) {
maxLength = Long.MAX_VALUE;
}
int max = (int) Math.min(maxLength, database.getMaxLengthInplaceLob());
try {
if (max != 0 && max < Integer.MAX_VALUE) {
BufferedReader b = new BufferedReader(reader, max);
b.mark(max);
char[] small = new char[max];
int len = IOUtils.readFully(b, small, max);
if (len < max) {
if (len < small.length) {
small = Arrays.copyOf(small, len);
}
byte[] utf8 = new String(small, 0, len).getBytes(Constants.UTF8);
return ValueLobDb.createSmallLob(type, utf8);
}
b.reset();
reader = b;
}
CountingReaderInputStream in = new CountingReaderInputStream(reader, maxLength);
ValueLobDb lob = createLob(in, type);
// the length is not correct
lob = ValueLobDb.create(type, database, lob.getTableId(), lob.getLobId(), null, in.getLength());
return lob;
} catch (IOException e) {
throw DbException.convertIOException(e, null);
}
}
private ValueLobDb createLob(InputStream in, int type) throws IOException {
byte[] streamStoreId;
try {
streamStoreId = streamStore.put(in);
} catch (Exception e) {
throw DbException.convertToIOException(e);
}
long lobId = streamStore.getAndIncrementNextKey();
long length = streamStore.length(streamStoreId);
int tableId = LobStorageFrontend.TABLE_TEMP;
Object[] value = new Object[] { streamStoreId, tableId, length, 0 };
lobMap.put(lobId, value);
Object[] key = new Object[] { streamStoreId, lobId };
refMap.put(key, Boolean.TRUE);
ValueLobDb lob = ValueLobDb.create(type, database, tableId, lobId, null, length);
return lob;
}
@Override
public ValueLobDb copyLob(ValueLobDb old, int tableId, long length) {
init();
int type = old.getType();
long oldLobId = old.getLobId();
long oldLength = old.getPrecision();
if (oldLength != length) {
throw DbException.throwInternalError("Length is different");
}
Object[] value = lobMap.get(oldLobId);
byte[] streamStoreId = (byte[]) value[0];
long lobId = streamStore.getAndIncrementNextKey();
value[1] = tableId;
lobMap.put(lobId, value);
Object[] key = new Object[] { streamStoreId, lobId };
refMap.put(key, Boolean.TRUE);
ValueLobDb lob = ValueLobDb.create(type, database, tableId, lobId, null, length);
return lob;
}
@Override
public InputStream getInputStream(ValueLobDb lob, byte[] hmac, long byteCount)
throws IOException {
init();
Object[] value = lobMap.get(lob.getLobId());
byte[] streamStoreId = (byte[]) value[0];
return streamStore.get(streamStoreId);
}
@Override
public void setTable(ValueLobDb lob, int tableId) {
init();
long lobId = lob.getLobId();
Object[] value = lobMap.remove(lobId);
value[1] = tableId;
lobMap.put(lobId, value);
}
@Override
public void removeAllForTable(int tableId) {
init();
// this might not be very efficient - to speed it up, we would need yet another map
ArrayList<Long> list = New.arrayList();
for (Entry<Long, Object[]> e : lobMap.entrySet()) {
Object[] value = e.getValue();
int t = (Integer) value[1];
if (t == tableId) {
list.add(e.getKey());
}
}
for (long lobId : list) {
removeLob(tableId, lobId);
}
}
@Override
public void removeLob(ValueLobDb lob) {
init();
int tableId = lob.getTableId();
long lobId = lob.getLobId();
removeLob(tableId, lobId);
}
private void removeLob(int tableId, long lobId) {
Object[] value = lobMap.remove(lobId);
byte[] streamStoreId = (byte[]) value[0];
// check if there are more entries for this streamStoreId
Object[] key = new Object[] {streamStoreId, 0 };
value = refMap.ceilingKey(key);
boolean hasMoreEntries = false;
if (value != null) {
byte[] s2 = (byte[]) value[0];
if (Arrays.equals(streamStoreId, s2)) {
hasMoreEntries = true;
}
}
if (!hasMoreEntries) {
streamStore.remove(streamStoreId);
}
}
}
......@@ -10,6 +10,7 @@ import java.io.IOException;
import java.io.InputStream;
import org.h2.message.DbException;
import org.h2.value.ValueLobDb;
/**
* An input stream that reads from a remote LOB.
......@@ -38,9 +39,9 @@ class LobStorageRemoteInputStream extends InputStream {
*/
private long remainingBytes;
public LobStorageRemoteInputStream(DataHandler handler, long lob, byte[] hmac, long byteCount) {
public LobStorageRemoteInputStream(DataHandler handler, ValueLobDb lob, byte[] hmac, long byteCount) {
this.handler = handler;
this.lob = lob;
this.lob = lob.getLobId();
this.hmac = hmac;
remainingBytes = byteCount;
}
......
......@@ -278,12 +278,12 @@ public class PageLog {
int pageId = in.readVarInt();
int size = in.readVarInt();
if (size == 0) {
in.readFully(data.getBytes(), 0, store.getPageSize());
in.readFully(data.getBytes(), store.getPageSize());
} else if (size == 1) {
// empty
Arrays.fill(data.getBytes(), 0, store.getPageSize(), (byte) 0);
} else {
in.readFully(compressBuffer, 0, size);
in.readFully(compressBuffer, size);
try {
compress.expand(compressBuffer, 0, size, data.getBytes(), 0, store.getPageSize());
} catch (ArrayIndexOutOfBoundsException e) {
......@@ -445,7 +445,7 @@ public class PageLog {
int len = in.readVarInt();
data.reset();
data.checkCapacity(len);
in.readFully(data.getBytes(), 0, len);
in.readFully(data.getBytes(), len);
int columnCount = data.readVarInt();
Value[] values = new Value[columnCount];
for (int i = 0; i < columnCount; i++) {
......
......@@ -8,20 +8,26 @@ package org.h2.tools;
import java.io.BufferedInputStream;
import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.io.PrintWriter;
import java.io.Reader;
import java.io.SequenceInputStream;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Enumeration;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Map.Entry;
import java.util.zip.CRC32;
import org.h2.api.JavaObjectSerializer;
import org.h2.compress.CompressLZF;
......@@ -33,6 +39,7 @@ import org.h2.message.DbException;
import org.h2.mvstore.MVMap;
import org.h2.mvstore.MVStore;
import org.h2.mvstore.MVStoreTool;
import org.h2.mvstore.StreamStore;
import org.h2.mvstore.db.TransactionStore;
import org.h2.mvstore.db.TransactionStore.TransactionMap;
import org.h2.mvstore.db.ValueDataType;
......@@ -93,6 +100,7 @@ public class Recover extends Tool implements DataHandler {
private int[] parents;
private Stats stat;
private boolean lobMaps;
/**
* Statistic data
......@@ -215,6 +223,59 @@ public class Recover extends Tool implements DataHandler {
return ValueLobDb.create(Value.CLOB, h, LobStorageFrontend.TABLE_TEMP, lobId, null, precision);
}
/**
* INTERNAL
*/
public static InputStream readBlobMap(Connection conn, long lobId, long precision) throws SQLException {
final PreparedStatement prep = conn.prepareStatement(
"SELECT DATA FROM INFORMATION_SCHEMA.LOB_BLOCKS " +
"WHERE LOB_ID = ? AND SEQ = ? AND ? > 0");
prep.setLong(1, lobId);
// precision is currently not really used,
// it is just to improve readability of the script
prep.setLong(3, precision);
return new SequenceInputStream(
new Enumeration<InputStream>() {
private int seq;
private byte[] data = fetch();
private byte[] fetch() {
try {
prep.setInt(2, seq++);
ResultSet rs = prep.executeQuery();
if (rs.next()) {
return rs.getBytes(1);
}
return null;
} catch (SQLException e) {
throw DbException.convert(e);
}
}
@Override
public boolean hasMoreElements() {
return data != null;
}
@Override
public InputStream nextElement() {
ByteArrayInputStream in = new ByteArrayInputStream(data);
data = fetch();
return in;
}
}
);
}
/**
* INTERNAL
*/
public static Reader readClobMap(Connection conn, long lobId, long precision) throws Exception {
InputStream in = readBlobMap(conn, lobId, precision);
return new BufferedReader(new InputStreamReader(in, "UTF-8"));
}
private void trace(String message) {
if (trace) {
out.println(message);
......@@ -353,10 +414,15 @@ public class Recover extends Tool implements DataHandler {
String columnType;
if (type == Value.BLOB) {
columnType = "BLOB";
m = "READ_BLOB_DB";
m = "READ_BLOB";
} else {
columnType = "CLOB";
m = "READ_CLOB_DB";
m = "READ_CLOB";
}
if (lobMaps) {
m += "_MAP";
} else {
m += "_DB";
}
columnTypeMap.put(column, columnType);
return m + "(" + id + ", " + precision + ")";
......@@ -482,24 +548,24 @@ public class Recover extends Tool implements DataHandler {
}
private void dumpMVStoreFile(PrintWriter writer, String fileName) {
writer.println("-- mvstore");
writer.println("-- MVStore");
writer.println("CREATE ALIAS IF NOT EXISTS READ_BLOB FOR \"" + this.getClass().getName() + ".readBlob\";");
writer.println("CREATE ALIAS IF NOT EXISTS READ_CLOB FOR \"" + this.getClass().getName() + ".readClob\";");
writer.println("CREATE ALIAS IF NOT EXISTS READ_BLOB_DB FOR \"" + this.getClass().getName() + ".readBlobDb\";");
writer.println("CREATE ALIAS IF NOT EXISTS READ_CLOB_DB FOR \"" + this.getClass().getName() + ".readClobDb\";");
writer.println("CREATE ALIAS IF NOT EXISTS READ_BLOB_MAP FOR \"" + this.getClass().getName() + ".readBlobMap\";");
writer.println("CREATE ALIAS IF NOT EXISTS READ_CLOB_MAP FOR \"" + this.getClass().getName() + ".readClobMap\";");
resetSchema();
setDatabaseName(fileName.substring(0, fileName.length() - Constants.SUFFIX_MV_FILE.length()));
MVStore mv = new MVStore.Builder().fileName(fileName).readOnly().open();
dumpLobMaps(writer, mv);
writer.println("-- Tables");
TransactionStore store = new TransactionStore(mv);
try {
MVMap<String, String> metaMap = mv.getMetaMap();
Iterator<String> it = metaMap.keyIterator(null);
while (it.hasNext()) {
String key = it.next();
if (!key.startsWith("name.table.")) {
for (String mapName : mv.getMapNames()) {
if (!mapName.startsWith("table.")) {
continue;
}
String mapName = key.substring("name.".length());
String tableId = mapName.substring("table.".length());
ValueDataType keyType = new ValueDataType(
null, this, null);
......@@ -552,6 +618,9 @@ public class Recover extends Tool implements DataHandler {
}
}
writeSchema(writer);
writer.println("DROP ALIAS READ_BLOB_MAP;");
writer.println("DROP ALIAS READ_CLOB_MAP;");
writer.println("DROP TABLE IF EXISTS INFORMATION_SCHEMA.LOB_BLOCKS;");
} catch (Throwable e) {
writeError(writer, e);
} finally {
......@@ -559,6 +628,43 @@ public class Recover extends Tool implements DataHandler {
}
}
private void dumpLobMaps(PrintWriter writer, MVStore mv) {
lobMaps = mv.hasMap("lobData");
if (!lobMaps) {
return;
}
MVMap<Long, byte[]> lobData = mv.openMap("lobData");
StreamStore streamStore = new StreamStore(lobData);
MVMap<Long, Object[]> lobMap = mv.openMap("lobMap");
writer.println("-- LOB");
writer.println("CREATE TABLE IF NOT EXISTS " +
"INFORMATION_SCHEMA.LOB_BLOCKS(" +
"LOB_ID BIGINT, SEQ INT, DATA BINARY);");
for (Entry<Long, Object[]> e : lobMap.entrySet()) {
long lobId = e.getKey();
Object[] value = e.getValue();
byte[] streamStoreId = (byte[]) value[0];
InputStream in = streamStore.get(streamStoreId);
int len = 8 * 1024;
byte[] block = new byte[len];
try {
for (int seq = 0;; seq++) {
int l = IOUtils.readFully(in, block, block.length);
String x = StringUtils.convertBytesToHex(block, l);
if (l > 0) {
writer.println("INSERT INTO INFORMATION_SCHEMA.LOB_BLOCKS " +
"VALUES(" + lobId + ", " + seq + ", '" + x + "');");
}
if (l != len) {
break;
}
}
} catch (IOException ex) {
writeError(writer, ex);
}
}
}
private static String getPageType(int type) {
switch (type) {
case 0:
......@@ -704,12 +810,12 @@ public class Recover extends Tool implements DataHandler {
int size = in.readVarInt();
byte[] data = new byte[pageSize];
if (size == 0) {
in.readFully(data, 0, pageSize);
in.readFully(data, pageSize);
} else if (size == 1) {
// empty
} else {
byte[] compressBuffer = new byte[size];
in.readFully(compressBuffer, 0, size);
in.readFully(compressBuffer, size);
try {
compress.expand(compressBuffer, 0, size, data, 0, pageSize);
} catch (ArrayIndexOutOfBoundsException e) {
......
......@@ -312,21 +312,18 @@ public class IOUtils {
*
* @param in the input stream
* @param buffer the output buffer
* @param off the offset in the buffer
* @param max the number of bytes to read at most
* @return the number of bytes read, 0 meaning EOF
*/
public static int readFully(InputStream in, byte[] buffer, int off, int max) throws IOException {
public static int readFully(InputStream in, byte[] buffer, int max) throws IOException {
try {
int len = Math.min(max, buffer.length);
int result = 0;
int result = 0, len = Math.min(max, buffer.length);
while (len > 0) {
int l = in.read(buffer, off, len);
int l = in.read(buffer, result, len);
if (l < 0) {
break;
}
result += l;
off += l;
len -= l;
}
return result;
......@@ -343,26 +340,20 @@ public class IOUtils {
* @param in the reader
* @param buffer the output buffer
* @param max the number of characters to read at most
* @return the number of characters read
* @return the number of characters read, 0 meaning EOF
*/
public static int readFully(Reader in, char[] buffer, int max) throws IOException {
try {
int off = 0, len = Math.min(max, buffer.length);
if (len == 0) {
return 0;
}
while (true) {
int l = len - off;
if (l <= 0) {
break;
}
l = in.read(buffer, off, l);
int result = 0, len = Math.min(max, buffer.length);
while (len > 0) {
int l = in.read(buffer, result, len);
if (l < 0) {
break;
}
off += l;
result += l;
len -= l;
}
return off <= 0 ? -1 : off;
return result;
} catch (Exception e) {
throw DbException.convertToIOException(e);
}
......
......@@ -31,7 +31,6 @@ import org.h2.jdbc.JdbcBlob;
import org.h2.jdbc.JdbcClob;
import org.h2.jdbc.JdbcConnection;
import org.h2.message.DbException;
import org.h2.store.LobStorageFrontend;
import org.h2.tools.SimpleResultSet;
import org.h2.util.New;
import org.h2.util.Utils;
......@@ -564,7 +563,7 @@ public class DataType {
}
case Value.CLOB: {
if (session == null) {
v = LobStorageFrontend.createSmallLob(Value.CLOB, rs.getString(columnIndex).getBytes(Constants.UTF8));
v = ValueLobDb.createSmallLob(Value.CLOB, rs.getString(columnIndex).getBytes(Constants.UTF8));
} else {
Reader in = rs.getCharacterStream(columnIndex);
if (in == null) {
......@@ -577,7 +576,7 @@ public class DataType {
}
case Value.BLOB: {
if (session == null) {
v = LobStorageFrontend.createSmallLob(Value.BLOB, rs.getBytes(columnIndex));
v = ValueLobDb.createSmallLob(Value.BLOB, rs.getBytes(columnIndex));
} else {
InputStream in = rs.getBinaryStream(columnIndex);
v = (in == null) ? (Value) ValueNull.INSTANCE : session.getDataHandler().getLobStorage().createBlob(in, -1);
......
......@@ -596,7 +596,7 @@ public class Transfer {
}
int len = (int) length;
byte[] small = new byte[len];
IOUtils.readFully(in, small, 0, len);
IOUtils.readFully(in, small, len);
int magic = readInt();
if (magic != LOB_MAGIC) {
throw DbException.get(ErrorCode.CONNECTION_BROKEN_1, "magic=" + magic);
......
......@@ -24,7 +24,6 @@ import org.h2.constant.SysProperties;
import org.h2.engine.Constants;
import org.h2.message.DbException;
import org.h2.store.DataHandler;
import org.h2.store.LobStorageFrontend;
import org.h2.tools.SimpleResultSet;
import org.h2.util.DateTimeUtils;
import org.h2.util.MathUtils;
......@@ -783,7 +782,7 @@ public abstract class Value {
case BLOB: {
switch(getType()) {
case BYTES:
return LobStorageFrontend.createSmallLob(Value.BLOB, getBytesNoCopy());
return ValueLobDb.createSmallLob(Value.BLOB, getBytesNoCopy());
}
break;
}
......@@ -856,9 +855,9 @@ public abstract class Value {
case FLOAT:
return ValueFloat.get(Float.parseFloat(s.trim()));
case CLOB:
return LobStorageFrontend.createSmallLob(CLOB, s.getBytes(Constants.UTF8));
return ValueLobDb.createSmallLob(CLOB, s.getBytes(Constants.UTF8));
case BLOB:
return LobStorageFrontend.createSmallLob(BLOB, StringUtils.convertHexToBytes(s.trim()));
return ValueLobDb.createSmallLob(BLOB, StringUtils.convertHexToBytes(s.trim()));
case ARRAY:
return ValueArray.get(new Value[]{ValueString.get(s)});
case RESULT_SET: {
......
......@@ -178,7 +178,6 @@ public class ValueLob extends Value {
} else {
buff = new char[len];
len = IOUtils.readFully(in, buff, len);
len = len < 0 ? 0 : len;
}
if (len <= handler.getMaxLengthInplaceLob()) {
byte[] small = new String(buff, 0, len).getBytes(Constants.UTF8);
......@@ -227,7 +226,7 @@ public class ValueLob extends Value {
}
len = getBufferSize(h, compress, remaining);
len = IOUtils.readFully(in, buff, len);
if (len <= 0) {
if (len == 0) {
break;
}
}
......@@ -366,7 +365,7 @@ public class ValueLob extends Value {
len = buff.length;
} else {
buff = DataUtils.newBytes(len);
len = IOUtils.readFully(in, buff, 0, len);
len = IOUtils.readFully(in, buff, len);
}
if (len <= handler.getMaxLengthInplaceLob()) {
byte[] small = DataUtils.newBytes(len);
......@@ -416,7 +415,7 @@ public class ValueLob extends Value {
break;
}
len = getBufferSize(h, compress, remaining);
len = IOUtils.readFully(in, buff, 0, len);
len = IOUtils.readFully(in, buff, len);
if (len <= 0) {
break;
}
......
......@@ -31,8 +31,10 @@ import org.h2.util.StringUtils;
import org.h2.util.Utils;
/**
* An alternate LOB implementation, where LOB data is stored inside the
* database, instead of in external files.
* A implementation of the BLOB and CLOB data types.
*
* Small objects are kept in memory and stored in the record.
* Large objects are either stored in the database, or in temporary files.
*/
public class ValueLobDb extends Value implements Value.ValueClob, Value.ValueBlob {
......@@ -77,7 +79,7 @@ public class ValueLobDb extends Value implements Value.ValueClob, Value.ValueBlo
}
/**
* Create temporary CLOB from Reader.
* Create a CLOB in a temporary file.
*/
private ValueLobDb(DataHandler handler, Reader in, long remaining) throws IOException {
this.type = Value.CLOB;
......@@ -95,7 +97,7 @@ public class ValueLobDb extends Value implements Value.ValueClob, Value.ValueBlo
while (true) {
int len = getBufferSize(this.handler, false, remaining);
len = IOUtils.readFully(in, buff, len);
if (len <= 0) {
if (len == 0) {
break;
}
}
......@@ -106,7 +108,7 @@ public class ValueLobDb extends Value implements Value.ValueClob, Value.ValueBlo
}
/**
* Create temporary BLOB from InputStream.
* Create a BLOB in a temporary file.
*/
private ValueLobDb(DataHandler handler, byte[] buff, int len, InputStream in,
long remaining) throws IOException {
......@@ -130,7 +132,7 @@ public class ValueLobDb extends Value implements Value.ValueClob, Value.ValueBlo
break;
}
len = getBufferSize(this.handler, compress, remaining);
len = IOUtils.readFully(in, buff, 0, len);
len = IOUtils.readFully(in, buff, len);
if (len <= 0) {
break;
}
......@@ -165,18 +167,6 @@ public class ValueLobDb extends Value implements Value.ValueClob, Value.ValueBlo
return new ValueLobDb(type, handler, tableId, id, hmac, precision);
}
/**
* Create a small lob using the given byte array.
*
* @param type the type (Value.BLOB or CLOB)
* @param small the byte array
* @param precision the precision
* @return the lob value
*/
public static ValueLobDb createSmallLob(int type, byte[] small, long precision) {
return new ValueLobDb(type, small, precision);
}
/**
* Convert a lob to another data type. The data is fully read in memory
* except when converting to BLOB or CLOB.
......@@ -193,14 +183,14 @@ public class ValueLobDb extends Value implements Value.ValueClob, Value.ValueBlo
Value copy = handler.getLobStorage().createClob(getReader(), -1);
return copy;
} else if (small != null) {
return LobStorageFrontend.createSmallLob(t, small);
return ValueLobDb.createSmallLob(t, small);
}
} else if (t == Value.BLOB) {
if (handler != null) {
Value copy = handler.getLobStorage().createBlob(getInputStream(), -1);
return copy;
} else if (small != null) {
return LobStorageFrontend.createSmallLob(t, small);
return ValueLobDb.createSmallLob(t, small);
}
}
return super.convertTo(t);
......@@ -228,14 +218,14 @@ public class ValueLobDb extends Value implements Value.ValueClob, Value.ValueBlo
}
}
if (handler != null) {
handler.getLobStorage().removeLob(lobId);
handler.getLobStorage().removeLob(this);
}
}
@Override
public void unlink(DataHandler database) {
if (small == null && tableId != LobStorageFrontend.TABLE_ID_SESSION_VARIABLE) {
database.getLobStorage().setTable(lobId, LobStorageFrontend.TABLE_ID_SESSION_VARIABLE);
database.getLobStorage().setTable(this, LobStorageFrontend.TABLE_ID_SESSION_VARIABLE);
tableId = LobStorageFrontend.TABLE_ID_SESSION_VARIABLE;
}
}
......@@ -244,10 +234,10 @@ public class ValueLobDb extends Value implements Value.ValueClob, Value.ValueBlo
public Value link(DataHandler database, int tabId) {
if (small == null) {
if (tableId == LobStorageFrontend.TABLE_TEMP) {
database.getLobStorage().setTable(lobId, tabId);
database.getLobStorage().setTable(this, tabId);
this.tableId = tabId;
} else {
return handler.getLobStorage().copyLob(type, lobId, tabId, getPrecision());
return handler.getLobStorage().copyLob(this, tabId, getPrecision());
}
} else if (small.length > database.getMaxLengthInplaceLob()) {
LobStorageInterface s = database.getLobStorage();
......@@ -390,7 +380,7 @@ public class ValueLobDb extends Value implements Value.ValueClob, Value.ValueBlo
}
long byteCount = (type == Value.BLOB) ? precision : -1;
try {
return handler.getLobStorage().getInputStream(lobId, hmac, byteCount);
return handler.getLobStorage().getInputStream(this, hmac, byteCount);
} catch (IOException e) {
throw DbException.convertIOException(e, toString());
}
......@@ -515,7 +505,6 @@ public class ValueLobDb extends Value implements Value.ValueClob, Value.ValueBlo
buff = new char[len];
reader.mark(len);
len = IOUtils.readFully(reader, buff, len);
len = len < 0 ? 0 : len;
}
if (len <= handler.getMaxLengthInplaceLob()) {
byte[] small = new String(buff, 0, len).getBytes(Constants.UTF8);
......@@ -551,7 +540,7 @@ public class ValueLobDb extends Value implements Value.ValueClob, Value.ValueBlo
len = buff.length;
} else {
buff = DataUtils.newBytes(len);
len = IOUtils.readFully(in, buff, 0, len);
len = IOUtils.readFully(in, buff, len);
}
if (len <= handler.getMaxLengthInplaceLob()) {
byte[] small = DataUtils.newBytes(len);
......@@ -572,7 +561,7 @@ public class ValueLobDb extends Value implements Value.ValueClob, Value.ValueBlo
int inplace = handler.getMaxLengthInplaceLob();
long m = compress ? Constants.IO_BUFFER_SIZE_COMPRESS : Constants.IO_BUFFER_SIZE;
if (m < remaining && m <= inplace) {
// using "1L" to force long arithmetic
// using "1L" to force long arithmetic because inplace could be Integer.MAX_VALUE
m = Math.min(remaining, inplace + 1L);
// the buffer size must be bigger than the inplace lob, otherwise we can't
// know if it must be stored in-place or not
......@@ -621,4 +610,33 @@ public class ValueLobDb extends Value implements Value.ValueClob, Value.ValueBlo
return lob;
}
/**
* Create a LOB object that fits in memory.
*
* @param type the type (Value.BLOB or CLOB)
* @param small the byte array
* @return the LOB
*/
public static Value createSmallLob(int type, byte[] small) {
int precision;
if (type == Value.CLOB) {
precision = new String(small, Constants.UTF8).length();
} else {
precision = small.length;
}
return createSmallLob(type, small, precision);
}
/**
* Create a LOB object that fits in memory.
*
* @param type the type (Value.BLOB or CLOB)
* @param small the byte array
* @param precision the precision
* @return the LOB
*/
public static ValueLobDb createSmallLob(int type, byte[] small, long precision) {
return new ValueLobDb(type, small, precision);
}
}
......@@ -235,7 +235,7 @@ java org.h2.test.TestAll timer
*/
;
private static final boolean MV_STORE = false;
private static final boolean MV_STORE = true;
/**
* If the test should run with many rows.
......
......@@ -219,6 +219,7 @@ public class TestLob extends TestBase {
prep.execute();
}
if (upgraded) {
if (!config.mvStore) {
if (config.memory) {
stat.execute("update information_schema.lob_map set pos=null");
} else {
......@@ -227,6 +228,7 @@ public class TestLob extends TestBase {
conn = getConnection("lob");
}
}
}
prep = conn.prepareStatement("select * from test where id = ?");
for (int i = 0; i < 1; i++) {
random.setSeed(i);
......@@ -527,7 +529,7 @@ public class TestLob extends TestBase {
}
private void testDelete() throws Exception {
if (config.memory) {
if (config.memory || config.mvStore) {
return;
}
deleteDb("lob");
......
......@@ -134,7 +134,9 @@ public class TestLobApi extends TestBase {
Clob c2 = rs.getClob(2);
Blob b2 = rs.getBlob(3);
assertFalse(rs.next());
// now close
rs.close();
// but the LOBs must stay open
assertEquals(0, c1.length());
assertEquals(0, b1.length());
assertEquals(chars.length, c2.length());
......
......@@ -43,6 +43,7 @@ public class TestStreamStore extends TestBase {
FileUtils.deleteRecursive(getBaseDir(), true);
FileUtils.createDirectories(getBaseDir());
testExceptionDuringStore();
testReadCount();
testLarge();
testDetectIllegalId();
......@@ -53,6 +54,12 @@ public class TestStreamStore extends TestBase {
testLoop();
}
private void testExceptionDuringStore() {
// TODO test that if there is an IOException while storing
// the data, the entries in the map are rolled back
;
}
private void testReadCount() throws IOException {
String fileName = getBaseDir() + "/testReadCount.h3";
FileUtils.delete(fileName);
......
......@@ -6,7 +6,12 @@
*/
package org.h2.test.unit;
import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.Reader;
import java.io.StringReader;
import java.math.BigInteger;
import java.sql.Timestamp;
import java.util.ArrayList;
......@@ -15,6 +20,7 @@ import java.util.Comparator;
import java.util.Date;
import java.util.Random;
import org.h2.test.TestBase;
import org.h2.util.IOUtils;
import org.h2.util.Utils;
/**
......@@ -38,6 +44,7 @@ public class TestUtils extends TestBase {
@Override
public void test() throws Exception {
testIOUtils();
testSortTopN();
testSortTopNRandom();
testWriteReadLong();
......@@ -47,6 +54,45 @@ public class TestUtils extends TestBase {
testReflectionUtils();
}
private void testIOUtils() throws IOException {
for (int i = 0; i < 20; i++) {
byte[] data = new byte[i];
InputStream in = new ByteArrayInputStream(data);
byte[] buffer = new byte[i];
assertEquals(0, IOUtils.readFully(in, buffer, -2));
assertEquals(0, IOUtils.readFully(in, buffer, -1));
assertEquals(0, IOUtils.readFully(in, buffer, 0));
for (int j = 1, off = 0;; j += 1) {
int read = Math.max(0, Math.min(i - off, j));
int l = IOUtils.readFully(in, buffer, j);
assertEquals(read, l);
off += l;
if (l == 0) {
break;
}
}
assertEquals(0, IOUtils.readFully(in, buffer, 1));
}
for (int i = 0; i < 10; i++) {
char[] data = new char[i];
Reader in = new StringReader(new String(data));
char[] buffer = new char[i];
assertEquals(0, IOUtils.readFully(in, buffer, -2));
assertEquals(0, IOUtils.readFully(in, buffer, -1));
assertEquals(0, IOUtils.readFully(in, buffer, 0));
for (int j = 1, off = 0;; j += 1) {
int read = Math.max(0, Math.min(i - off, j));
int l = IOUtils.readFully(in, buffer, j);
assertEquals(read, l);
off += l;
if (l == 0) {
break;
}
}
assertEquals(0, IOUtils.readFully(in, buffer, 1));
}
}
private void testWriteReadLong() {
byte[] buff = new byte[8];
for (long x : new long[]{Long.MIN_VALUE, Long.MAX_VALUE, 0, 1, -1,
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论