提交 59b691be authored 作者: Thomas Mueller's avatar Thomas Mueller

Server mode: if there was an error while reading from a LOB, the session was closed in some cases.

上级 0324fd77
...@@ -18,7 +18,8 @@ Change Log ...@@ -18,7 +18,8 @@ Change Log
<h1>Change Log</h1> <h1>Change Log</h1>
<h2>Next Version (unreleased)</h2> <h2>Next Version (unreleased)</h2>
<ul><li>Issue 463: Driver name and version are now the same in OsgiDataSourceFactory and JdbcDatabaseMetaData. <ul><li>Server mode: if there was an error while reading from a LOB, the session was closed in some cases.
</li><li>Issue 463: Driver name and version are now the same in OsgiDataSourceFactory and JdbcDatabaseMetaData.
</li><li>JaQu: The data type VARCHAR is now (again) used for Strings </li><li>JaQu: The data type VARCHAR is now (again) used for Strings
(no longer TEXT, except when explicitly set). (no longer TEXT, except when explicitly set).
</li><li>For in-memory databases, creating an index on a CLOB or BLOB column </li><li>For in-memory databases, creating an index on a CLOB or BLOB column
......
...@@ -407,26 +407,33 @@ public class TcpServerThread implements Runnable { ...@@ -407,26 +407,33 @@ public class TcpServerThread implements Runnable {
long lobId = transfer.readLong(); long lobId = transfer.readLong();
byte[] hmac; byte[] hmac;
CachedInputStream in; CachedInputStream in;
boolean verifyMac;
if (clientVersion >= Constants.TCP_PROTOCOL_VERSION_11) { if (clientVersion >= Constants.TCP_PROTOCOL_VERSION_11) {
if (clientVersion >= Constants.TCP_PROTOCOL_VERSION_12) { if (clientVersion >= Constants.TCP_PROTOCOL_VERSION_12) {
hmac = transfer.readBytes(); hmac = transfer.readBytes();
transfer.verifyLobMac(hmac, lobId); verifyMac = true;
} else { } else {
hmac = null; hmac = null;
verifyMac = false;
} }
in = lobs.get(lobId); in = lobs.get(lobId);
if (in == null) { if (in == null && verifyMac) {
in = new CachedInputStream(null); in = new CachedInputStream(null);
lobs.put(lobId, in); lobs.put(lobId, in);
} }
} else { } else {
verifyMac = false;
hmac = null; hmac = null;
in = lobs.get(lobId); in = lobs.get(lobId);
if (in == null) {
throw DbException.get(ErrorCode.OBJECT_CLOSED);
}
} }
long offset = transfer.readLong(); long offset = transfer.readLong();
int length = transfer.readInt();
if (verifyMac) {
transfer.verifyLobMac(hmac, lobId);
}
if (in == null) {
throw DbException.get(ErrorCode.OBJECT_CLOSED);
}
if (in.getPos() != offset) { if (in.getPos() != offset) {
LobStorageInterface lobStorage = session.getDataHandler().getLobStorage(); LobStorageInterface lobStorage = session.getDataHandler().getLobStorage();
InputStream lobIn = lobStorage.getInputStream(lobId, hmac, -1); InputStream lobIn = lobStorage.getInputStream(lobId, hmac, -1);
...@@ -434,12 +441,11 @@ public class TcpServerThread implements Runnable { ...@@ -434,12 +441,11 @@ public class TcpServerThread implements Runnable {
lobs.put(lobId, in); lobs.put(lobId, in);
lobIn.skip(offset); lobIn.skip(offset);
} }
int length = transfer.readInt();
// limit the buffer size // limit the buffer size
length = Math.min(16 * Constants.IO_BUFFER_SIZE, length); length = Math.min(16 * Constants.IO_BUFFER_SIZE, length);
transfer.writeInt(SessionRemote.STATUS_OK);
byte[] buff = new byte[length]; byte[] buff = new byte[length];
length = IOUtils.readFully(in, buff, 0, length); length = IOUtils.readFully(in, buff, 0, length);
transfer.writeInt(SessionRemote.STATUS_OK);
transfer.writeInt(length); transfer.writeInt(length);
transfer.writeBytes(buff, 0, length); transfer.writeBytes(buff, 0, length);
transfer.flush(); transfer.flush();
......
...@@ -9,6 +9,8 @@ package org.h2.store; ...@@ -9,6 +9,8 @@ package org.h2.store;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
import org.h2.message.DbException;
/** /**
* An input stream that reads from a remote LOB. * An input stream that reads from a remote LOB.
*/ */
...@@ -64,7 +66,11 @@ class LobStorageRemoteInputStream extends InputStream { ...@@ -64,7 +66,11 @@ class LobStorageRemoteInputStream extends InputStream {
if (length == 0) { if (length == 0) {
return -1; return -1;
} }
length = handler.readLob(lob, hmac, pos, buff, off, length); try {
length = handler.readLob(lob, hmac, pos, buff, off, length);
} catch (DbException e) {
throw DbException.convertToIOException(e);
}
remainingBytes -= length; remainingBytes -= length;
if (length == 0) { if (length == 0) {
return -1; return -1;
......
...@@ -29,6 +29,7 @@ import java.util.Random; ...@@ -29,6 +29,7 @@ import java.util.Random;
import org.h2.constant.ErrorCode; import org.h2.constant.ErrorCode;
import org.h2.constant.SysProperties; import org.h2.constant.SysProperties;
import org.h2.jdbc.JdbcConnection; import org.h2.jdbc.JdbcConnection;
import org.h2.message.DbException;
import org.h2.store.FileLister; import org.h2.store.FileLister;
import org.h2.store.fs.FileUtils; import org.h2.store.fs.FileUtils;
import org.h2.test.TestBase; import org.h2.test.TestBase;
...@@ -298,7 +299,7 @@ public class TestLob extends TestBase { ...@@ -298,7 +299,7 @@ public class TestLob extends TestBase {
ResultSet rs = stat.executeQuery("select name from test where id = " + random.nextInt(999)); ResultSet rs = stat.executeQuery("select name from test where id = " + random.nextInt(999));
if (rs.next()) { if (rs.next()) {
Reader r = rs.getClob("name").getCharacterStream(); Reader r = rs.getClob("name").getCharacterStream();
while (r.read(tmp) > 0) { while (r.read(tmp) >= 0) {
// ignore // ignore
} }
r.close(); r.close();
...@@ -307,17 +308,24 @@ public class TestLob extends TestBase { ...@@ -307,17 +308,24 @@ public class TestLob extends TestBase {
} catch (SQLException ex) { } catch (SQLException ex) {
// ignore "LOB gone away", this can happen in the presence of concurrent updates // ignore "LOB gone away", this can happen in the presence of concurrent updates
if (ex.getErrorCode() != ErrorCode.IO_EXCEPTION_2) { if (ex.getErrorCode() != ErrorCode.IO_EXCEPTION_2) {
ex.printStackTrace(); throw ex;
} }
} catch (IOException ex) { } catch (IOException ex) {
// ignore "LOB gone away", this can happen in the presence of concurrent updates // ignore "LOB gone away", this can happen in the presence of concurrent updates
if (!(ex.getCause() instanceof SQLException)) { Exception e = ex;
ex.printStackTrace(); if (e.getCause() instanceof DbException) {
e = (Exception) e.getCause();
} }
SQLException ex2 = (SQLException) ex.getCause(); if (!(e.getCause() instanceof SQLException)) {
if (ex2.getErrorCode() != 90028) { throw ex;
ex.printStackTrace();
} }
SQLException e2 = (SQLException) e.getCause();
if (e2.getErrorCode() != ErrorCode.IO_EXCEPTION_1) {
throw ex;
}
} catch (Exception e) {
e.printStackTrace(System.out);
throw e;
} }
} }
} }
...@@ -358,12 +366,12 @@ public class TestLob extends TestBase { ...@@ -358,12 +366,12 @@ public class TestLob extends TestBase {
Deadlock2Task1 task4 = new Deadlock2Task1(); Deadlock2Task1 task4 = new Deadlock2Task1();
Deadlock2Task2 task5 = new Deadlock2Task2(); Deadlock2Task2 task5 = new Deadlock2Task2();
Deadlock2Task2 task6 = new Deadlock2Task2(); Deadlock2Task2 task6 = new Deadlock2Task2();
task1.execute(); task1.execute("task1");
task2.execute(); task2.execute("task2");
task3.execute(); task3.execute("task3");
task4.execute(); task4.execute("task4");
task5.execute(); task5.execute("task5");
task6.execute(); task6.execute("task6");
for (int i = 0; i < 1000; i++) { for (int i = 0; i < 1000; i++) {
stat.execute("insert into test values(null, space(10000 + " + i + "), 1)"); stat.execute("insert into test values(null, space(10000 + " + i + "), 1)");
} }
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论