提交 f3ec805a authored 作者: Noel Grandin's avatar Noel Grandin

Issue #324: Deadlock when sending BLOBs over TCP

上级 add3db91
......@@ -21,6 +21,8 @@ Change Log
<h2>Next Version (unreleased)</h2>
<ul>
<li>Issue #324: Deadlock when sending BLOBs over TCP
</li>
<li>Fix for creating and accessing views in MULTITHREADED mode, test-case courtesy of Daniel Rosenbaum
</li>
<li>Issue #266: Spatial index not updating, fixed by merging PR #267
......
......@@ -1602,10 +1602,11 @@ public class Function extends Expression implements FunctionCall {
String fileName = v0.getString();
boolean blob = args.length == 1;
try {
long fileLength = FileUtils.size(fileName);
InputStream in = new AutoCloseInputStream(
FileUtils.newInputStream(fileName));
if (blob) {
result = database.getLobStorage().createBlob(in, -1);
result = database.getLobStorage().createBlob(in, fileLength);
} else {
Reader reader;
if (v1 == ValueNull.INSTANCE) {
......@@ -1613,7 +1614,7 @@ public class Function extends Expression implements FunctionCall {
} else {
reader = new InputStreamReader(in, v1.getString());
}
result = database.getLobStorage().createClob(reader, -1);
result = database.getLobStorage().createClob(reader, fileLength);
}
session.addTemporaryLob(result);
} catch (IOException e) {
......
......@@ -12,11 +12,10 @@ import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.charset.CharsetEncoder;
import java.nio.charset.CodingErrorAction;
import org.h2.engine.Constants;
/**
* An input stream that reads the data from a reader.
* An input stream that reads the data from a reader and limits the number of bytes that can be read.
*/
public class CountingReaderInputStream extends InputStream {
......
......@@ -5,15 +5,12 @@
*/
package org.h2.store;
import java.io.BufferedInputStream;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.Reader;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Map.Entry;
import org.h2.api.ErrorCode;
import org.h2.engine.Constants;
import org.h2.engine.Database;
......@@ -145,26 +142,21 @@ public class LobStorageMap implements LobStorageInterface {
public Value createBlob(InputStream in, long maxLength) {
init();
int type = Value.BLOB;
if (maxLength < 0) {
maxLength = Long.MAX_VALUE;
}
int max = (int) Math.min(maxLength, database.getMaxLengthInplaceLob());
try {
if (max != 0 && max < Integer.MAX_VALUE) {
BufferedInputStream b = new BufferedInputStream(in, max);
b.mark(max);
byte[] small = new byte[max];
int len = IOUtils.readFully(b, small, max);
if (len < max) {
if (len < small.length) {
small = Arrays.copyOf(small, len);
}
return ValueLobDb.createSmallLob(type, small);
if (maxLength != -1
&& maxLength <= database.getMaxLengthInplaceLob()) {
byte[] small = new byte[(int) maxLength];
int len = IOUtils.readFully(in, small, (int) maxLength);
if (len > maxLength) {
throw new IllegalStateException(
"len > blobLength, " + len + " > " + maxLength);
}
if (len < small.length) {
small = Arrays.copyOf(small, len);
}
b.reset();
in = b;
return ValueLobDb.createSmallLob(type, small);
}
if (maxLength != Long.MAX_VALUE) {
if (maxLength != -1) {
in = new LimitInputStream(in, maxLength);
}
return createLob(in, type);
......@@ -179,32 +171,34 @@ public class LobStorageMap implements LobStorageInterface {
public Value createClob(Reader reader, long maxLength) {
init();
int type = Value.CLOB;
if (maxLength < 0) {
maxLength = Long.MAX_VALUE;
}
int max = (int) Math.min(maxLength, database.getMaxLengthInplaceLob());
try {
if (max != 0 && max < Integer.MAX_VALUE) {
BufferedReader b = new BufferedReader(reader, max);
b.mark(max);
char[] small = new char[max];
int len = IOUtils.readFully(b, small, max);
if (len < max) {
if (len < small.length) {
small = Arrays.copyOf(small, len);
}
byte[] utf8 = new String(small, 0, len).getBytes(Constants.UTF8);
return ValueLobDb.createSmallLob(type, utf8);
// we multiple by 3 here to get the worst-case size in bytes
if (maxLength != -1
&& maxLength * 3 <= database.getMaxLengthInplaceLob()) {
char[] small = new char[(int) maxLength];
int len = IOUtils.readFully(reader, small, (int) maxLength);
if (len > maxLength) {
throw new IllegalStateException(
"len > blobLength, " + len + " > " + maxLength);
}
byte[] utf8 = new String(small, 0, len)
.getBytes(Constants.UTF8);
if (utf8.length > database.getMaxLengthInplaceLob()) {
throw new IllegalStateException(
"len > maxinplace, " + utf8.length + " > "
+ database.getMaxLengthInplaceLob());
}
b.reset();
reader = b;
return ValueLobDb.createSmallLob(type, utf8);
}
if (maxLength < 0) {
maxLength = Long.MAX_VALUE;
}
CountingReaderInputStream in =
new CountingReaderInputStream(reader, maxLength);
CountingReaderInputStream in = new CountingReaderInputStream(reader,
maxLength);
ValueLobDb lob = createLob(in, type);
// the length is not correct
lob = ValueLobDb.create(type, database,
lob.getTableId(), lob.getLobId(), null, in.getLength());
lob = ValueLobDb.create(type, database, lob.getTableId(),
lob.getLobId(), null, in.getLength());
return lob;
} catch (IllegalStateException e) {
throw DbException.get(ErrorCode.OBJECT_CLOSED, e);
......
......@@ -1046,10 +1046,10 @@ public class DataType {
createClob(r, -1);
} else if (x instanceof java.sql.Clob) {
try {
Reader r = new BufferedReader(
((java.sql.Clob) x).getCharacterStream());
java.sql.Clob clob = (java.sql.Clob) x;
Reader r = new BufferedReader(clob.getCharacterStream());
return session.getDataHandler().getLobStorage().
createClob(r, -1);
createClob(r, clob.length());
} catch (SQLException e) {
throw DbException.convert(e);
}
......@@ -1058,8 +1058,9 @@ public class DataType {
createBlob((java.io.InputStream) x, -1);
} else if (x instanceof java.sql.Blob) {
try {
java.sql.Blob blob = (java.sql.Blob) x;
return session.getDataHandler().getLobStorage().
createBlob(((java.sql.Blob) x).getBinaryStream(), -1);
createBlob(blob.getBinaryStream(), blob.length());
} catch (SQLException e) {
throw DbException.convert(e);
}
......
......@@ -23,7 +23,6 @@ import java.sql.SQLException;
import java.sql.Savepoint;
import java.sql.Statement;
import java.util.Random;
import org.h2.api.ErrorCode;
import org.h2.engine.SysProperties;
import org.h2.jdbc.JdbcConnection;
......@@ -84,6 +83,7 @@ public class TestLob extends TestBase {
testDelete();
testLobServerMemory();
testUpdatingLobRow();
testBufferedInputStreamBug();
if (config.memory) {
return;
}
......@@ -1505,6 +1505,17 @@ public class TestLob extends TestBase {
conn.close();
}
/** test a bug where the usage of BufferedInputStream in LobStorageMap was causing a deadlock */
private void testBufferedInputStreamBug() throws SQLException {
deleteDb("lob");
JdbcConnection conn = (JdbcConnection) getConnection("lob");
conn.createStatement().execute("CREATE TABLE TEST(test BLOB)");
PreparedStatement ps = conn.prepareStatement("INSERT INTO TEST(test) VALUES(?)");
ps.setBlob(1, new ByteArrayInputStream(new byte[257]));
ps.executeUpdate();
conn.close();
}
private static Reader getRandomReader(int len, int seed) {
return new CharArrayReader(getRandomChars(len, seed));
}
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论