提交 312d476d authored 作者: noelgrandin's avatar noelgrandin

Issue 475: PgServer: add support for CancelRequest, patch from Andrew Franklin

上级 14182750
...@@ -35,6 +35,7 @@ Change Log ...@@ -35,6 +35,7 @@ Change Log
</li><li>Improve error message when check constraint is broken, test case from Gili (cowwoc) </li><li>Improve error message when check constraint is broken, test case from Gili (cowwoc)
</li><li>Improve error message when we have a unique constraint violation, displays the offending key in the error message </li><li>Improve error message when we have a unique constraint violation, displays the offending key in the error message
</li><li>Issue 478: Support for "SHOW TRANSACTION ISOLATION LEVEL", patch from Andrew Franklin </li><li>Issue 478: Support for "SHOW TRANSACTION ISOLATION LEVEL", patch from Andrew Franklin
</li><li>Issue 475: PgServer: add support for CancelRequest, patch from Andrew Franklin
</li></ul> </li></ul>
<h2>Version 1.3.172 (2013-05-25)</h2> <h2>Version 1.3.172 (2013-05-25)</h2>
......
...@@ -39,6 +39,7 @@ public class JdbcStatement extends TraceObject implements Statement { ...@@ -39,6 +39,7 @@ public class JdbcStatement extends TraceObject implements Statement {
private int lastExecutedCommandType; private int lastExecutedCommandType;
private ArrayList<String> batchCommands; private ArrayList<String> batchCommands;
private boolean escapeProcessing = true; private boolean escapeProcessing = true;
private boolean cancelled = false;
JdbcStatement(JdbcConnection conn, int id, int resultSetType, int resultSetConcurrency, boolean closeWithResultSet) { JdbcStatement(JdbcConnection conn, int id, int resultSetType, int resultSetConcurrency, boolean closeWithResultSet) {
this.conn = conn; this.conn = conn;
...@@ -530,6 +531,7 @@ public class JdbcStatement extends TraceObject implements Statement { ...@@ -530,6 +531,7 @@ public class JdbcStatement extends TraceObject implements Statement {
try { try {
if (c != null) { if (c != null) {
c.cancel(); c.cancel();
cancelled = true;
} }
} finally { } finally {
setExecutingStatement(null); setExecutingStatement(null);
...@@ -539,6 +541,13 @@ public class JdbcStatement extends TraceObject implements Statement { ...@@ -539,6 +541,13 @@ public class JdbcStatement extends TraceObject implements Statement {
} }
} }
/**
* @return true if this statement has been cancelled.
*/
public boolean isCancelled() {
return cancelled;
}
/** /**
* Gets the current query timeout in seconds. * Gets the current query timeout in seconds.
* This method will return 0 if no query timeout is set. * This method will return 0 if no query timeout is set.
......
...@@ -19,6 +19,7 @@ import java.sql.Types; ...@@ -19,6 +19,7 @@ import java.sql.Types;
import java.util.Collections; import java.util.Collections;
import java.util.HashSet; import java.util.HashSet;
import java.util.Set; import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import org.h2.engine.Constants; import org.h2.engine.Constants;
import org.h2.message.DbException; import org.h2.message.DbException;
import org.h2.server.Service; import org.h2.server.Service;
...@@ -78,6 +79,7 @@ public class PgServer implements Service { ...@@ -78,6 +79,7 @@ public class PgServer implements Service {
private boolean trace; private boolean trace;
private ServerSocket serverSocket; private ServerSocket serverSocket;
private final Set<PgServerThread> running = Collections.synchronizedSet(new HashSet<PgServerThread>()); private final Set<PgServerThread> running = Collections.synchronizedSet(new HashSet<PgServerThread>());
private final AtomicInteger pid= new AtomicInteger(0);
private String baseDir; private String baseDir;
private boolean allowOthers; private boolean allowOthers;
private boolean isDaemon; private boolean isDaemon;
...@@ -191,7 +193,7 @@ public class PgServer implements Service { ...@@ -191,7 +193,7 @@ public class PgServer implements Service {
} else { } else {
PgServerThread c = new PgServerThread(s, this); PgServerThread c = new PgServerThread(s, this);
running.add(c); running.add(c);
c.setProcessId(running.size()); c.setProcessId(pid.incrementAndGet());
Thread thread = new Thread(c, threadName+" thread"); Thread thread = new Thread(c, threadName+" thread");
thread.setDaemon(isDaemon); thread.setDaemon(isDaemon);
c.setThread(thread); c.setThread(thread);
...@@ -252,6 +254,18 @@ public class PgServer implements Service { ...@@ -252,6 +254,18 @@ public class PgServer implements Service {
} }
} }
/**
* @return the thread with the given process-id
*/
PgServerThread getThread(int processId) {
for (PgServerThread c : New.arrayList(running)) {
if (c.getProcessId()==processId) {
return c;
}
}
return null;
}
String getBaseDir() { String getBaseDir() {
return baseDir; return baseDir;
} }
......
...@@ -28,6 +28,7 @@ import java.sql.Statement; ...@@ -28,6 +28,7 @@ import java.sql.Statement;
import java.util.HashMap; import java.util.HashMap;
import java.util.HashSet; import java.util.HashSet;
import java.util.Properties; import java.util.Properties;
import java.util.Random;
import org.h2.command.CommandInterface; import org.h2.command.CommandInterface;
import org.h2.constant.SysProperties; import org.h2.constant.SysProperties;
import org.h2.engine.ConnectionInfo; import org.h2.engine.ConnectionInfo;
...@@ -62,6 +63,8 @@ public class PgServerThread implements Runnable { ...@@ -62,6 +63,8 @@ public class PgServerThread implements Runnable {
private String userName; private String userName;
private String databaseName; private String databaseName;
private int processId; private int processId;
private int secret;
private JdbcStatement activeRequest;
private String clientEncoding = SysProperties.PG_DEFAULT_CLIENT_ENCODING; private String clientEncoding = SysProperties.PG_DEFAULT_CLIENT_ENCODING;
private String dateStyle = "ISO"; private String dateStyle = "ISO";
private final HashMap<String, Prepared> prepared = new CaseInsensitiveMap<Prepared>(); private final HashMap<String, Prepared> prepared = new CaseInsensitiveMap<Prepared>();
...@@ -70,6 +73,7 @@ public class PgServerThread implements Runnable { ...@@ -70,6 +73,7 @@ public class PgServerThread implements Runnable {
PgServerThread(Socket socket, PgServer server) { PgServerThread(Socket socket, PgServer server) {
this.server = server; this.server = server;
this.socket = socket; this.socket = socket;
this.secret = new Random().nextInt();
} }
@Override @Override
...@@ -142,9 +146,16 @@ public class PgServerThread implements Runnable { ...@@ -142,9 +146,16 @@ public class PgServerThread implements Runnable {
server.trace("Init"); server.trace("Init");
int version = readInt(); int version = readInt();
if (version == 80877102) { if (version == 80877102) {
server.trace("CancelRequest (not supported)"); server.trace("CancelRequest");
server.trace(" pid: " + readInt()); int pid = readInt();
server.trace(" key: " + readInt()); int key = readInt();
PgServerThread c = server.getThread(pid);
if (c!=null) {
c.cancelRequest(key);
} else {
sendErrorResponse("unknown process: "+pid);
}
close();
} else if (version == 80877103) { } else if (version == 80877103) {
server.trace("SSLRequest"); server.trace("SSLRequest");
out.write('N'); out.write('N');
...@@ -328,6 +339,7 @@ public class PgServerThread implements Runnable { ...@@ -328,6 +339,7 @@ public class PgServerThread implements Runnable {
server.trace(prepared.sql); server.trace(prepared.sql);
try { try {
prep.setMaxRows(maxRows); prep.setMaxRows(maxRows);
setActiveRequest(prep);
boolean result = prep.execute(); boolean result = prep.execute();
if (result) { if (result) {
try { try {
...@@ -345,8 +357,14 @@ public class PgServerThread implements Runnable { ...@@ -345,8 +357,14 @@ public class PgServerThread implements Runnable {
sendCommandComplete(prep, prep.getUpdateCount()); sendCommandComplete(prep, prep.getUpdateCount());
} }
} catch (Exception e) { } catch (Exception e) {
if (prep.isCancelled()) {
sendCancelQueryResponse();
} else {
sendErrorResponse(e); sendErrorResponse(e);
} }
} finally {
setActiveRequest(null);
}
break; break;
} }
case 'S': { case 'S': {
...@@ -367,6 +385,7 @@ public class PgServerThread implements Runnable { ...@@ -367,6 +385,7 @@ public class PgServerThread implements Runnable {
} }
s = getSQL(s); s = getSQL(s);
stat = (JdbcStatement) conn.createStatement(); stat = (JdbcStatement) conn.createStatement();
setActiveRequest(stat);
boolean result = stat.execute(s); boolean result = stat.execute(s);
if (result) { if (result) {
ResultSet rs = stat.getResultSet(); ResultSet rs = stat.getResultSet();
...@@ -385,10 +404,15 @@ public class PgServerThread implements Runnable { ...@@ -385,10 +404,15 @@ public class PgServerThread implements Runnable {
sendCommandComplete(stat, stat.getUpdateCount()); sendCommandComplete(stat, stat.getUpdateCount());
} }
} catch (SQLException e) { } catch (SQLException e) {
if (stat!=null && stat.isCancelled()) {
sendCancelQueryResponse();
} else {
sendErrorResponse(e); sendErrorResponse(e);
}
break; break;
} finally { } finally {
JdbcUtils.closeSilently(stat); JdbcUtils.closeSilently(stat);
setActiveRequest(null);
} }
} }
sendReadyForQuery(); sendReadyForQuery();
...@@ -513,6 +537,19 @@ public class PgServerThread implements Runnable { ...@@ -513,6 +537,19 @@ public class PgServerThread implements Runnable {
sendMessage(); sendMessage();
} }
private void sendCancelQueryResponse() throws IOException {
server.trace("CancelSuccessResponse");
startMessage('E');
write('S');
writeString("ERROR");
write('C');
writeString("57014");
write('M');
writeString("canceling statement due to user request");
write(0);
sendMessage();
}
private void sendParameterDescription(Prepared p) throws IOException { private void sendParameterDescription(Prepared p) throws IOException {
try { try {
PreparedStatement prep = p.prep; PreparedStatement prep = p.prep;
...@@ -746,7 +783,7 @@ public class PgServerThread implements Runnable { ...@@ -746,7 +783,7 @@ public class PgServerThread implements Runnable {
private void sendBackendKeyData() throws IOException { private void sendBackendKeyData() throws IOException {
startMessage('K'); startMessage('K');
writeInt(processId); writeInt(processId);
writeInt(processId); writeInt(secret);
sendMessage(); sendMessage();
} }
...@@ -811,6 +848,34 @@ public class PgServerThread implements Runnable { ...@@ -811,6 +848,34 @@ public class PgServerThread implements Runnable {
this.processId = id; this.processId = id;
} }
int getProcessId() {
return this.processId;
}
synchronized void setActiveRequest(JdbcStatement statement) {
activeRequest = statement;
}
/**
* Kill a currently running query on this thread.
* @param secret the private key of the command
* @return true if the command was successfully killed
*/
synchronized boolean cancelRequest(int secret) throws IOException {
if (activeRequest != null)
{
if (secret != this.secret) throw new IOException("invalid cancel secret");
try {
activeRequest.cancel();
activeRequest = null;
} catch (SQLException e) {
throw DbException.convert(e);
}
return true;
}
return false;
}
/** /**
* Represents a PostgreSQL Prepared object. * Represents a PostgreSQL Prepared object.
*/ */
...@@ -857,5 +922,4 @@ public class PgServerThread implements Runnable { ...@@ -857,5 +922,4 @@ public class PgServerThread implements Runnable {
*/ */
Prepared prep; Prepared prep;
} }
} }
...@@ -16,6 +16,7 @@ import java.sql.ResultSetMetaData; ...@@ -16,6 +16,7 @@ import java.sql.ResultSetMetaData;
import java.sql.SQLException; import java.sql.SQLException;
import java.sql.Statement; import java.sql.Statement;
import java.sql.Types; import java.sql.Types;
import java.util.concurrent.*;
import org.h2.test.TestBase; import org.h2.test.TestBase;
import org.h2.tools.Server; import org.h2.tools.Server;
...@@ -50,6 +51,50 @@ public class TestPgServer extends TestBase { ...@@ -50,6 +51,50 @@ public class TestPgServer extends TestBase {
} finally { } finally {
server.stop(); server.stop();
} }
}
private void testCancelQuery() throws Exception {
Server server = Server.createPgServer("-pgPort", "5535", "-pgDaemon", "-key", "test", "mem:test");
server.start();
ExecutorService executor = Executors.newSingleThreadExecutor();
try {
Class.forName("org.postgresql.Driver");
Connection conn = DriverManager.getConnection("jdbc:postgresql://localhost:5535/test", "sa", "sa");
final Statement stat = conn.createStatement();
stat.execute("create alias sleep for \"java.lang.Thread.sleep\"");
// create a table with 200 rows (cancel interval is 127)
stat.execute("create table test(id int)");
for (int i=0; i<200; i++) {
stat.execute("insert into test (id) values (rand())");
}
Future<Boolean> future = executor.submit(new Callable<Boolean>() {
public Boolean call() throws Exception {
return stat.execute("select id, sleep(5) from test");
}
});
// give it a little time to start and then cancel it
Thread.sleep(100);
stat.cancel();
try {
future.get();
throw new IllegalStateException();
} catch (ExecutionException e) {
assertStartsWith(e.getCause().getMessage(), "ERROR: Statement was canceled");
} finally {
conn.close();
}
} catch (ClassNotFoundException e) {
println("PostgreSQL JDBC driver not found - PgServer not tested");
} finally {
server.stop();
executor.shutdown();
}
deleteDb("test"); deleteDb("test");
} }
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论