提交 8d59495d authored 作者: Thomas Mueller's avatar Thomas Mueller

New auto-reconnect feature

上级 8faf50a8
...@@ -52,7 +52,8 @@ public class ConnectionInfo implements Cloneable { ...@@ -52,7 +52,8 @@ public class ConnectionInfo implements Cloneable {
// TODO document these settings // TODO document these settings
String[] connectionTime = new String[] { "ACCESS_MODE_LOG", "ACCESS_MODE_DATA", "AUTOCOMMIT", "CIPHER", String[] connectionTime = new String[] { "ACCESS_MODE_LOG", "ACCESS_MODE_DATA", "AUTOCOMMIT", "CIPHER",
"CREATE", "CACHE_TYPE", "DB_CLOSE_ON_EXIT", "FILE_LOCK", "IGNORE_UNKNOWN_SETTINGS", "IFEXISTS", "CREATE", "CACHE_TYPE", "DB_CLOSE_ON_EXIT", "FILE_LOCK", "IGNORE_UNKNOWN_SETTINGS", "IFEXISTS",
"PASSWORD", "RECOVER", "STORAGE", "USER", "DATABASE_EVENT_LISTENER_OBJECT", "AUTO_SERVER" }; "PASSWORD", "RECOVER", "STORAGE", "USER", "DATABASE_EVENT_LISTENER_OBJECT", "AUTO_SERVER",
"AUTO_RECONNECT", "OPEN_NEW" };
for (int i = 0; i < connectionTime.length; i++) { for (int i = 0; i < connectionTime.length; i++) {
String key = connectionTime[i]; String key = connectionTime[i];
if (SysProperties.CHECK && KNOWN_SETTINGS.contains(key)) { if (SysProperties.CHECK && KNOWN_SETTINGS.contains(key)) {
...@@ -272,7 +273,7 @@ public class ConnectionInfo implements Cloneable { ...@@ -272,7 +273,7 @@ public class ConnectionInfo implements Cloneable {
* @param defaultValue the default value * @param defaultValue the default value
* @return the value * @return the value
*/ */
boolean removeProperty(String key, boolean defaultValue) { public boolean removeProperty(String key, boolean defaultValue) {
String x = removeProperty(key, null); String x = removeProperty(key, null);
return x == null ? defaultValue : Boolean.valueOf(x).booleanValue(); return x == null ? defaultValue : Boolean.valueOf(x).booleanValue();
} }
...@@ -301,7 +302,7 @@ public class ConnectionInfo implements Cloneable { ...@@ -301,7 +302,7 @@ public class ConnectionInfo implements Cloneable {
if (persistent) { if (persistent) {
String n = FileUtils.normalize(name + Constants.SUFFIX_DATA_FILE); String n = FileUtils.normalize(name + Constants.SUFFIX_DATA_FILE);
String fileName = FileUtils.getFileName(n); String fileName = FileUtils.getFileName(n);
if (fileName.length() < Constants.SUFFIX_DATA_FILE.length() + 2) { if (fileName.length() < Constants.SUFFIX_DATA_FILE.length() + 1) {
throw Message.getSQLException(ErrorCode.INVALID_DATABASE_NAME_1, name); throw Message.getSQLException(ErrorCode.INVALID_DATABASE_NAME_1, name);
} }
n = n.substring(0, n.length() - Constants.SUFFIX_DATA_FILE.length()); n = n.substring(0, n.length() - Constants.SUFFIX_DATA_FILE.length());
......
...@@ -24,6 +24,7 @@ import org.h2.result.ResultInterface; ...@@ -24,6 +24,7 @@ import org.h2.result.ResultInterface;
import org.h2.store.DataHandler; import org.h2.store.DataHandler;
import org.h2.store.FileStore; import org.h2.store.FileStore;
import org.h2.util.ByteUtils; import org.h2.util.ByteUtils;
import org.h2.util.ClassUtils;
import org.h2.util.FileUtils; import org.h2.util.FileUtils;
import org.h2.util.NetUtils; import org.h2.util.NetUtils;
import org.h2.util.ObjectArray; import org.h2.util.ObjectArray;
...@@ -61,7 +62,7 @@ public class SessionRemote implements SessionInterface, DataHandler { ...@@ -61,7 +62,7 @@ public class SessionRemote implements SessionInterface, DataHandler {
private TraceSystem traceSystem; private TraceSystem traceSystem;
private Trace trace; private Trace trace;
private ObjectArray transferList; private ObjectArray transferList = new ObjectArray();
private int nextId; private int nextId;
private boolean autoCommit = true; private boolean autoCommit = true;
private CommandInterface switchOffAutoCommit; private CommandInterface switchOffAutoCommit;
...@@ -72,7 +73,10 @@ public class SessionRemote implements SessionInterface, DataHandler { ...@@ -72,7 +73,10 @@ public class SessionRemote implements SessionInterface, DataHandler {
private byte[] fileEncryptionKey; private byte[] fileEncryptionKey;
private Object lobSyncObject = new Object(); private Object lobSyncObject = new Object();
private String sessionId; private String sessionId;
private int clientVersion = Constants.TCP_PROTOCOL_VERSION_5; private int clientVersion = Constants.TCP_PROTOCOL_VERSION_5;
private boolean autoReconnect;
private int lastReconnect;
private SessionInterface embedded;
public SessionRemote() { public SessionRemote() {
// nothing to do // nothing to do
...@@ -80,7 +84,6 @@ public class SessionRemote implements SessionInterface, DataHandler { ...@@ -80,7 +84,6 @@ public class SessionRemote implements SessionInterface, DataHandler {
private SessionRemote(ConnectionInfo ci) throws SQLException { private SessionRemote(ConnectionInfo ci) throws SQLException {
this.connectionInfo = ci; this.connectionInfo = ci;
connect();
} }
private Transfer initTransfer(ConnectionInfo ci, String db, String server) throws IOException, SQLException { private Transfer initTransfer(ConnectionInfo ci, String db, String server) throws IOException, SQLException {
...@@ -179,7 +182,7 @@ public class SessionRemote implements SessionInterface, DataHandler { ...@@ -179,7 +182,7 @@ public class SessionRemote implements SessionInterface, DataHandler {
transfer.writeInt(SessionRemote.COMMAND_COMMIT); transfer.writeInt(SessionRemote.COMMAND_COMMIT);
done(transfer); done(transfer);
} catch (IOException e) { } catch (IOException e) {
removeServer(i--); removeServer(e, i--);
} }
} }
} }
...@@ -209,11 +212,49 @@ public class SessionRemote implements SessionInterface, DataHandler { ...@@ -209,11 +212,49 @@ public class SessionRemote implements SessionInterface, DataHandler {
} }
public SessionInterface createSession(ConnectionInfo ci) throws SQLException { public SessionInterface createSession(ConnectionInfo ci) throws SQLException {
return new SessionRemote(ci); return new SessionRemote(ci).connectEmbeddedOrServer();
} }
private void connect() throws SQLException { private SessionInterface connectEmbeddedOrServer() throws SQLException {
ConnectionInfo ci = connectionInfo; ConnectionInfo ci = connectionInfo;
if (ci.isRemote()) {
connectServer(ci);
return this;
}
// create the session using reflection,
// so that the JDBC layer can be compiled without it
boolean autoServerMode = Boolean.valueOf(ci.getProperty("AUTO_SERVER", "false")).booleanValue();
ConnectionInfo backup = null;
try {
if (autoServerMode) {
backup = (ConnectionInfo) ci.clone();
connectionInfo = (ConnectionInfo) ci.clone();
}
SessionInterface si = (SessionInterface) ClassUtils.loadSystemClass("org.h2.engine.Session").newInstance();
return si.createSession(ci);
} catch (SQLException e) {
int errorCode = e.getErrorCode();
if (errorCode == ErrorCode.DATABASE_ALREADY_OPEN_1) {
if (autoServerMode) {
String serverKey = (String) ((JdbcSQLException) e).getPayload();
if (serverKey != null) {
backup.setServerKey(serverKey);
// OPEN_NEW must be removed now, otherwise
// opening a session with AUTO_SERVER fails
// if another connection is already open
backup.removeProperty("OPEN_NEW", false);
connectServer(backup);
return this;
}
}
}
throw e;
} catch (Exception e) {
throw Message.convert(e);
}
}
private void connectServer(ConnectionInfo ci) throws SQLException {
String name = ci.getName(); String name = ci.getName();
if (name.startsWith("//")) { if (name.startsWith("//")) {
name = name.substring("//".length()); name = name.substring("//".length());
...@@ -243,11 +284,14 @@ public class SessionRemote implements SessionInterface, DataHandler { ...@@ -243,11 +284,14 @@ public class SessionRemote implements SessionInterface, DataHandler {
throw Message.convert(e); throw Message.convert(e);
} }
trace = traceSystem.getTrace(Trace.JDBC); trace = traceSystem.getTrace(Trace.JDBC);
transferList = new ObjectArray(); String serverList = null;
String serverlist = null;
if (server.indexOf(',') >= 0) { if (server.indexOf(',') >= 0) {
serverlist = StringUtils.quoteStringSQL(server); serverList = StringUtils.quoteStringSQL(server);
ci.setProperty("CLUSTER", serverlist); ci.setProperty("CLUSTER", serverList);
}
autoReconnect = Boolean.valueOf(ci.getProperty("AUTO_RECONNECT", "false")).booleanValue();
if (autoReconnect && serverList != null) {
throw Message.getSQLException(ErrorCode.FEATURE_NOT_SUPPORTED);
} }
cipher = ci.getProperty("CIPHER"); cipher = ci.getProperty("CIPHER");
if (cipher != null) { if (cipher != null) {
...@@ -255,7 +299,7 @@ public class SessionRemote implements SessionInterface, DataHandler { ...@@ -255,7 +299,7 @@ public class SessionRemote implements SessionInterface, DataHandler {
} }
String[] servers = StringUtils.arraySplit(server, ',', true); String[] servers = StringUtils.arraySplit(server, ',', true);
int len = servers.length; int len = servers.length;
transferList = new ObjectArray(); transferList.clear();
// TODO cluster: support at most 2 connections // TODO cluster: support at most 2 connections
boolean switchOffCluster = false; boolean switchOffCluster = false;
try { try {
...@@ -289,8 +333,8 @@ public class SessionRemote implements SessionInterface, DataHandler { ...@@ -289,8 +333,8 @@ public class SessionRemote implements SessionInterface, DataHandler {
ResultInterface result = command.executeQuery(1, false); ResultInterface result = command.executeQuery(1, false);
if (result.next()) { if (result.next()) {
Value[] v = result.currentRow(); Value[] v = result.currentRow();
String version = v[0].getString(); int version = v[0].getInt();
if (version.compareTo("71") > 0) { if (version > 71) {
clientVersion = Constants.TCP_PROTOCOL_VERSION_6; clientVersion = Constants.TCP_PROTOCOL_VERSION_6;
} }
} }
...@@ -327,10 +371,14 @@ public class SessionRemote implements SessionInterface, DataHandler { ...@@ -327,10 +371,14 @@ public class SessionRemote implements SessionInterface, DataHandler {
* Remove a server from the list of cluster nodes and disables the cluster * Remove a server from the list of cluster nodes and disables the cluster
* mode. * mode.
* *
* @param e the exception (used for debugging)
* @param i the index of the server to remove * @param i the index of the server to remove
*/ */
public void removeServer(int i) throws SQLException { public void removeServer(IOException e, int i) throws SQLException {
transferList.remove(i); transferList.remove(i);
if (autoReconnect()) {
return;
}
checkClosed(); checkClosed();
switchOffCluster(); switchOffCluster();
} }
...@@ -341,6 +389,32 @@ public class SessionRemote implements SessionInterface, DataHandler { ...@@ -341,6 +389,32 @@ public class SessionRemote implements SessionInterface, DataHandler {
return new CommandRemote(this, transferList, sql, fetchSize); return new CommandRemote(this, transferList, sql, fetchSize);
} }
} }
/**
* Automatically re-connect if necessary and if configured to do so.
*
* @return true if reconnected
*/
public boolean autoReconnect() throws SQLException {
if (!isClosed()) {
return false;
}
if (!autoReconnect || !autoCommit) {
return false;
}
lastReconnect++;
embedded = connectEmbeddedOrServer();
if (embedded == this) {
// connected to a server somewhere else
embedded = null;
} else {
// opened an embedded connection now -
// must connect to this database in server mode
// unfortunately
connectEmbeddedOrServer();
}
return true;
}
/** /**
* Check if this session is closed and throws an exception if so. * Check if this session is closed and throws an exception if so.
...@@ -349,12 +423,11 @@ public class SessionRemote implements SessionInterface, DataHandler { ...@@ -349,12 +423,11 @@ public class SessionRemote implements SessionInterface, DataHandler {
*/ */
public void checkClosed() throws SQLException { public void checkClosed() throws SQLException {
if (isClosed()) { if (isClosed()) {
// TODO broken connection: try to reconnect automatically
throw Message.getSQLException(ErrorCode.CONNECTION_BROKEN); throw Message.getSQLException(ErrorCode.CONNECTION_BROKEN);
} }
} }
public void close() { public void close() throws SQLException {
if (transferList != null) { if (transferList != null) {
synchronized (this) { synchronized (this) {
for (int i = 0; i < transferList.size(); i++) { for (int i = 0; i < transferList.size(); i++) {
...@@ -372,6 +445,10 @@ public class SessionRemote implements SessionInterface, DataHandler { ...@@ -372,6 +445,10 @@ public class SessionRemote implements SessionInterface, DataHandler {
transferList = null; transferList = null;
} }
traceSystem.close(); traceSystem.close();
if (embedded != null) {
embedded.close();
embedded = null;
}
} }
public Trace getTrace() { public Trace getTrace() {
...@@ -530,4 +607,8 @@ public class SessionRemote implements SessionInterface, DataHandler { ...@@ -530,4 +607,8 @@ public class SessionRemote implements SessionInterface, DataHandler {
return clientVersion; return clientVersion;
} }
public int getLastReconnect() {
return lastReconnect;
}
} }
...@@ -35,7 +35,6 @@ import org.h2.message.Message; ...@@ -35,7 +35,6 @@ import org.h2.message.Message;
import org.h2.message.Trace; import org.h2.message.Trace;
import org.h2.message.TraceObject; import org.h2.message.TraceObject;
import org.h2.result.ResultInterface; import org.h2.result.ResultInterface;
import org.h2.util.ClassUtils;
import org.h2.util.JdbcConnectionListener; import org.h2.util.JdbcConnectionListener;
import org.h2.value.Value; import org.h2.value.Value;
import org.h2.value.ValueInt; import org.h2.value.ValueInt;
...@@ -93,42 +92,15 @@ public class JdbcConnection extends TraceObject implements Connection { ...@@ -93,42 +92,15 @@ public class JdbcConnection extends TraceObject implements Connection {
*/ */
public JdbcConnection(ConnectionInfo ci, boolean useBaseDir) throws SQLException { public JdbcConnection(ConnectionInfo ci, boolean useBaseDir) throws SQLException {
try { try {
checkJavaVersion(); if (useBaseDir) {
if (ci.isRemote()) { String baseDir = SysProperties.getBaseDir();
session = new SessionRemote().createSession(ci); if (baseDir != null) {
} else { ci.setBaseDir(baseDir);
// create the session using reflection,
// so that the JDBC layer can be compiled without it
SessionInterface si = (SessionInterface) ClassUtils.loadSystemClass("org.h2.engine.Session").newInstance();
if (useBaseDir) {
String baseDir = SysProperties.getBaseDir();
if (baseDir != null) {
ci.setBaseDir(baseDir);
}
}
boolean autoServerMode = Boolean.valueOf(ci.getProperty("AUTO_SERVER", "false")).booleanValue();
ConnectionInfo backup = null;
if (autoServerMode) {
backup = (ConnectionInfo) ci.clone();
}
try {
session = si.createSession(ci);
} catch (SQLException e) {
int errorCode = e.getErrorCode();
if (errorCode == ErrorCode.DATABASE_ALREADY_OPEN_1) {
if (autoServerMode) {
String serverKey = (String) ((JdbcSQLException) e).getPayload();
if (serverKey != null) {
backup.setServerKey(serverKey);
session = new SessionRemote().createSession(backup);
}
}
}
if (session == null) {
throw e;
}
} }
} }
checkJavaVersion();
// this will return an embedded or server connection
session = new SessionRemote().createSession(ci);
trace = session.getTrace(); trace = session.getTrace();
int id = getNextId(TraceObject.CONNECTION); int id = getNextId(TraceObject.CONNECTION);
setTrace(trace, TraceObject.CONNECTION, id); setTrace(trace, TraceObject.CONNECTION, id);
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论