提交 1cdec2b8 authored 作者: Thomas Mueller's avatar Thomas Mueller

--no commit message

--no commit message
上级 c77b7023
......@@ -249,6 +249,11 @@ Phase-6</a><br />
A computer based learning software.
</p>
<p><a href="http://code.google.com/p/pickle4j">
Pickle</a><br />
Pickle is a Java library containing classes for persistence, concurrency, and logging.
</p>
<p><a href="http://www.polepos.org">
PolePosition</a><br />
Open source database benchmark.
......
......@@ -238,10 +238,9 @@ public class CommandRemote implements CommandInterface {
/**
* Cancel this current statement.
* This method is not yet implemented for this class.
*/
public void cancel() {
// TODO server: support cancel
session.cancelStatement(id);
}
public String toString() {
......
......@@ -23,6 +23,7 @@ import org.h2.message.TraceSystem;
import org.h2.result.ResultInterface;
import org.h2.store.DataHandler;
import org.h2.store.FileStore;
import org.h2.util.ByteUtils;
import org.h2.util.FileUtils;
import org.h2.util.NetUtils;
import org.h2.util.ObjectArray;
......@@ -51,6 +52,8 @@ public class SessionRemote implements SessionInterface, DataHandler {
public static final int CHANGE_ID = 9;
public static final int COMMAND_GET_META_DATA = 10;
public static final int SESSION_PREPARE_READ_PARAMS = 11;
public static final int SESSION_SET_ID = 12;
public static final int SESSION_CANCEL_STATEMENT = 13;
public static final int STATUS_ERROR = 0;
public static final int STATUS_OK = 1;
......@@ -68,12 +71,14 @@ public class SessionRemote implements SessionInterface, DataHandler {
private String cipher;
private byte[] fileEncryptionKey;
private Object lobSyncObject = new Object();
private String sessionId;
private int clientVersion = Constants.TCP_DRIVER_VERSION_5;
private Transfer initTransfer(ConnectionInfo ci, String db, String server) throws IOException, SQLException {
Socket socket = NetUtils.createSocket(server, Constants.DEFAULT_SERVER_PORT, ci.isSSL());
Transfer trans = new Transfer(this);
trans.setSocket(socket);
trans.setSSL(ci.isSSL());
trans.init();
trans.writeInt(clientVersion);
trans.writeString(db);
......@@ -96,6 +101,34 @@ public class SessionRemote implements SessionInterface, DataHandler {
autoCommit = true;
return trans;
}
public void cancel() {
// this method is called when closing the connection
// the statement that is currently running is not cancelled in this case
// however Statement.cancel is supported
}
public void cancelStatement(int id) {
for (int i = 0; i < transferList.size(); i++) {
Transfer transfer = (Transfer) transferList.get(i);
try {
Transfer trans = transfer.openNewConnection();
trans.init();
trans.writeInt(clientVersion);
if (clientVersion >= Constants.TCP_DRIVER_VERSION_6) {
trans.writeInt(clientVersion);
}
trans.writeString(null);
trans.writeString(null);
trans.writeString(sessionId);
trans.writeInt(SessionRemote.SESSION_CANCEL_STATEMENT);
trans.writeInt(id);
trans.close();
} catch (IOException e) {
trace.debug("Could not cancel statement", e);
}
}
}
private void switchOffAutoCommitIfCluster() throws SQLException {
if (autoCommit && transferList.size() > 1) {
......@@ -256,6 +289,23 @@ public class SessionRemote implements SessionInterface, DataHandler {
trace.error("Error trying to upgrade client version", e);
// ignore
}
if (clientVersion >= Constants.TCP_DRIVER_VERSION_6) {
sessionId = ByteUtils.convertBytesToString(RandomUtils.getSecureBytes(32));
synchronized (this) {
for (int i = 0; i < transferList.size(); i++) {
Transfer transfer = (Transfer) transferList.get(i);
try {
traceOperation("SESSION_SET_ID", 0);
transfer.writeInt(SessionRemote.SESSION_SET_ID);
transfer.writeString(sessionId);
done(transfer);
} catch (Exception e) {
trace.error("sessionSetId", e);
}
}
}
}
}
private void switchOffCluster() throws SQLException {
......@@ -454,11 +504,6 @@ public class SessionRemote implements SessionInterface, DataHandler {
return lobSyncObject;
}
public void cancel() {
// TODO open another remote connection and cancel this session
// using a unique id (like PostgreSQL)
}
public boolean getLobFilesInDirectories() {
return false;
}
......
......@@ -493,10 +493,9 @@ public class JdbcStatement extends TraceObject implements Statement {
}
/**
* [Partially supported] Cancels a currently running statement.
* Cancels a currently running statement.
* This method must be called from within another
* thread than the execute method.
* This method is not supported in the server mode.
*
* @throws SQLException if this object is closed
*/
......
......@@ -385,4 +385,14 @@ public class TcpServer implements Service {
}
}
public void cancelStatement(String sessionId, int statementId) throws SQLException {
ArrayList list = new ArrayList(running);
for (int i = 0; i < list.size(); i++) {
TcpServerThread c = (TcpServerThread) list.get(i);
if (c != null) {
c.cancelStatement(sessionId, statementId);
}
}
}
}
......@@ -28,6 +28,7 @@ import org.h2.result.LocalResult;
import org.h2.result.ResultColumn;
import org.h2.util.ObjectArray;
import org.h2.util.SmallMap;
import org.h2.util.StringUtils;
import org.h2.value.Transfer;
import org.h2.value.Value;
......@@ -44,6 +45,7 @@ public class TcpServerThread implements Runnable {
private SmallMap cache = new SmallMap(SysProperties.SERVER_CACHED_OBJECTS);
private int id;
private int clientVersion;
private String sessionId;
public TcpServerThread(Socket socket, TcpServer server, int id) {
this.server = server;
......@@ -76,6 +78,15 @@ public class TcpServerThread implements Runnable {
}
String db = transfer.readString();
String originalURL = transfer.readString();
if (db == null && originalURL == null) {
String sessionId = transfer.readString();
int command = transfer.readInt();
stop = true;
if (command == SessionRemote.SESSION_CANCEL_STATEMENT) {
int statementId = transfer.readInt();
server.cancelStatement(sessionId, statementId);
}
}
String baseDir = server.getBaseDir();
if (baseDir == null) {
baseDir = SysProperties.getBaseDir();
......@@ -317,6 +328,11 @@ public class TcpServerThread implements Runnable {
cache.addObject(newId, obj);
break;
}
case SessionRemote.SESSION_SET_ID: {
sessionId = transfer.readString();
transfer.writeInt(SessionRemote.STATUS_OK).flush();
break;
}
default:
trace("Unknown operation: " + operation);
closeSession();
......@@ -344,4 +360,11 @@ public class TcpServerThread implements Runnable {
return thread;
}
public void cancelStatement(String sessionId, int statementId) throws SQLException {
if (StringUtils.equals(sessionId, this.sessionId)) {
Command cmd = (Command) cache.getObject(statementId, false);
cmd.cancel();
}
}
}
......@@ -17,6 +17,7 @@ import java.io.OutputStreamWriter;
import java.io.Reader;
import java.io.Writer;
import java.math.BigDecimal;
import java.net.InetAddress;
import java.net.Socket;
import java.sql.Date;
import java.sql.ResultSet;
......@@ -34,6 +35,7 @@ import org.h2.message.TraceSystem;
import org.h2.tools.SimpleResultSet;
import org.h2.util.ExactUTF8InputStreamReader;
import org.h2.util.IOUtils;
import org.h2.util.NetUtils;
import org.h2.util.StringCache;
/**
......@@ -49,6 +51,7 @@ public class Transfer {
protected Socket socket;
protected DataInputStream in;
protected DataOutputStream out;
private boolean ssl;
public Transfer(SessionInterface session) {
this.session = session;
......@@ -413,4 +416,18 @@ public class Transfer {
this.session = session;
}
public void setSSL(boolean ssl) {
this.ssl = ssl;
}
public Transfer openNewConnection() throws IOException {
InetAddress address = socket.getInetAddress();
int port = socket.getPort();
Socket socket = NetUtils.createSocket(address, port, ssl);
Transfer trans = new Transfer(null);
trans.setSocket(socket);
trans.setSSL(ssl);
return trans;
}
}
......@@ -162,6 +162,10 @@ java org.h2.test.TestAll timer
improve javadocs
Shell / JDK 1.6: use java.io.Console
</li><li>Implement Statement.cancel for server connections
upload jazoon
test case for out of memory (try to corrupt the database using out of memory)
......@@ -211,6 +215,8 @@ Add where required // TODO: change in version 1.1
http://www.w3schools.com/sql/
History:
Statements can now be cancelled remotely
(when using remote connections).
Roadmap:
......@@ -412,7 +418,6 @@ Roadmap:
* Run all tests with the current settings.
*/
private void test() throws Exception {
System.out.println();
System.out.println("Test big:"+big+" net:"+networked+" cipher:"+cipher+" memory:"+memory+" log:"+logMode+" diskResult:"+diskResult + " mvcc:" + mvcc + " deleteIndex:" + deleteIndex);
beforeTest();
......
......@@ -16,30 +16,38 @@ import org.h2.constant.ErrorCode;
import org.h2.constant.SysProperties;
import org.h2.test.TestBase;
/**
* Tests Statement.cancel
*/
public class TestCancel extends TestBase {
private static int lastVisited;
class CancelThread extends Thread {
private Statement cancel;
private int wait;
private volatile boolean stop;
CancelThread(Statement cancel, int wait) {
this.cancel = cancel;
this.wait = wait;
}
public void stopNow() {
this.stop = true;
}
public void run() {
try {
Thread.sleep(wait);
cancel.cancel();
Thread.yield();
} catch (SQLException e) {
// ignore errors on closed statements
} catch (Exception e) {
TestBase.logError("sleep", e);
while (!stop) {
try {
Thread.sleep(wait);
cancel.cancel();
Thread.yield();
} catch (SQLException e) {
// ignore errors on closed statements
} catch (Exception e) {
TestBase.logError("sleep", e);
}
}
}
}
......@@ -124,16 +132,22 @@ public class TestCancel extends TestBase {
System.setProperty("h2.maxQueryTimeout", "" + oldMax);
}
}
public static int visit(int x) {
lastVisited = x;
return x;
}
private void testCancelStatement() throws Exception {
deleteDb("cancel");
Connection conn = getConnection("cancel");
Statement stat = conn.createStatement();
stat.execute("DROP TABLE IF EXISTS TEST");
stat.execute("CREATE ALIAS VISIT FOR \"" + getClass().getName() + ".visit\"");
stat.execute("CREATE MEMORY TABLE TEST(ID INT PRIMARY KEY, NAME VARCHAR(255))");
PreparedStatement prep = conn.prepareStatement("INSERT INTO TEST VALUES(?, ?)");
trace("insert");
int len = getSize(1, 1000);
int len = getSize(10, 1000);
for (int i = 0; i < len; i++) {
prep.setInt(1, i);
// prep.setString(2, "Test Value "+i);
......@@ -145,23 +159,21 @@ public class TestCancel extends TestBase {
for (int i = 1;;) {
Statement query = conn.createStatement();
CancelThread cancel = new CancelThread(query, i);
visit(0);
cancel.start();
Thread.yield();
int j = 0;
try {
ResultSet rs = query.executeQuery("SELECT * FROM TEST");
while (rs.next()) {
j++;
}
trace("record count: " + j);
query.executeQuery(
"SELECT VISIT(ID), (SELECT SUM(X) FROM SYSTEM_RANGE(1, 10000) WHERE X<>ID) FROM TEST ORDER BY ID");
} catch (SQLException e) {
checkNotGeneralException(e);
// ignore cancelled statements
trace("record count: " + j);
}
if (j == 0) {
cancel.stopNow();
cancel.join();
if (lastVisited == 0) {
i += 10;
} else if (j == len) {
} else {
break;
}
}
......
......@@ -28,6 +28,7 @@ To use the same default settings as H2, use:
jdbc:hsqldb:data/test;hsqldb.default_table_type=cached;sql.enforce_size=true
Also, you need to execute the following statement:
SET WRITE_DELAY 1
No optimization for COUNT(*)
Derby
......@@ -38,3 +39,4 @@ See
http://db.apache.org/derby/javadoc/engine/org/apache/derby/iapi/reference/Property.html#FILESYNC_TRANSACTION_LOG
Missing features:
LIMIT OFFSET is not supported.
No optimization for COUNT(*)
......@@ -508,4 +508,5 @@ informs negotiations collectively omissions trial nor qualify steward neither
worldwide everyone additions expense lawsuit checksums jazoon flashback
dieguez dfile mvn dversion dgroup dpackaging dartifact durl dpom pom
subpackages slowed deactivate throttled noindex expired arizona export
intentional knowing jcl plug facade deployment logback confusion
intentional knowing jcl plug facade deployment logback confusion visited
pickle
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论