提交 8063302d authored 作者: Thomas Mueller's avatar Thomas Mueller

Cluster: in a two node cluster, if cluster node stopped, and autocommit was…

Cluster: in a two node cluster, if cluster node stopped, and autocommit was enabled, then changes on the remaining cluster node didn't get committed automatically.
上级 16894c19
...@@ -18,7 +18,9 @@ Change Log ...@@ -18,7 +18,9 @@ Change Log
<h1>Change Log</h1> <h1>Change Log</h1>
<h2>Next Version (unreleased)</h2> <h2>Next Version (unreleased)</h2>
<ul><li>Issue 291: Allow org.h2.value.Value as FUNCTION ALIAS params and return value. <ul><li>Cluster: in a two node cluster, if cluster node stopped, and autocommit was enabled,
then changes on the remaining cluster node didn't get committed automatically.
</li><li>Issue 291: Allow org.h2.value.Value as FUNCTION ALIAS params and return value.
</li><li>Issue 265: Linked tables: auto-reconnect if the backside connection is lost </li><li>Issue 265: Linked tables: auto-reconnect if the backside connection is lost
(workaround for the MySQL problem that disconnects after 8 hours of inactivity). (workaround for the MySQL problem that disconnects after 8 hours of inactivity).
</li><li>Linked tables: the index conditions was sometimes not used when querying the remote database. </li><li>Linked tables: the index conditions was sometimes not used when querying the remote database.
......
...@@ -216,7 +216,7 @@ public class SessionRemote extends SessionWithState implements DataHandler { ...@@ -216,7 +216,7 @@ public class SessionRemote extends SessionWithState implements DataHandler {
* Calls COMMIT if the session is in cluster mode. * Calls COMMIT if the session is in cluster mode.
*/ */
public void autoCommitIfCluster() { public void autoCommitIfCluster() {
if (autoCommit && transferList != null && transferList.size() > 1) { if (autoCommit && cluster) {
// server side auto commit is off because of race conditions // server side auto commit is off because of race conditions
// (update set id=1 where id=0, but update set id=2 where id=0 is // (update set id=1 where id=0, but update set id=2 where id=0 is
// faster) // faster)
...@@ -368,12 +368,13 @@ public class SessionRemote extends SessionWithState implements DataHandler { ...@@ -368,12 +368,13 @@ public class SessionRemote extends SessionWithState implements DataHandler {
boolean switchOffCluster = false; boolean switchOffCluster = false;
try { try {
for (int i = 0; i < len; i++) { for (int i = 0; i < len; i++) {
String s = servers[i];
try { try {
Transfer trans = initTransfer(ci, databaseName, servers[i]); Transfer trans = initTransfer(ci, databaseName, s);
transferList.add(trans); transferList.add(trans);
} catch (IOException e) { } catch (IOException e) {
if (len == 1) { if (len == 1) {
throw DbException.get(ErrorCode.CONNECTION_BROKEN_1, e, e.getMessage()); throw DbException.get(ErrorCode.CONNECTION_BROKEN_1, e + ": " + s, e.getMessage());
} }
switchOffCluster = true; switchOffCluster = true;
} }
...@@ -556,12 +557,12 @@ public class SessionRemote extends SessionWithState implements DataHandler { ...@@ -556,12 +557,12 @@ public class SessionRemote extends SessionWithState implements DataHandler {
} }
/** /**
* Returns true if the connection is in cluster mode. * Returns true if the connection was opened in cluster mode.
* *
* @return true if it is * @return true if it is
*/ */
public boolean isClustered() { public boolean isClustered() {
return transferList.size() > 1; return cluster;
} }
public boolean isClosed() { public boolean isClosed() {
......
...@@ -322,7 +322,14 @@ public class JdbcConnection extends TraceObject implements Connection { ...@@ -322,7 +322,14 @@ public class JdbcConnection extends TraceObject implements Connection {
// roll back unless that would require to re-connect // roll back unless that would require to re-connect
// (the transaction can't be rolled back after re-connecting) // (the transaction can't be rolled back after re-connecting)
if (!session.isReconnectNeeded(true)) { if (!session.isReconnectNeeded(true)) {
rollbackInternal(); try {
rollbackInternal();
} catch (DbException e) {
// ignore if the connection is broken right now
if (e.getErrorCode() != ErrorCode.CONNECTION_BROKEN_1) {
throw e;
}
}
session.afterWriting(); session.afterWriting();
} }
closePreparedCommands(); closePreparedCommands();
......
...@@ -116,10 +116,12 @@ public class CreateCluster extends Tool { ...@@ -116,10 +116,12 @@ public class CreateCluster extends Tool {
if (e.getErrorCode() == ErrorCode.DATABASE_NOT_FOUND_1) { if (e.getErrorCode() == ErrorCode.DATABASE_NOT_FOUND_1) {
// database does not exists yet - ok // database does not exists yet - ok
exists = false; exists = false;
} else {
throw e;
} }
} }
if (exists) { if (exists) {
throw new SQLException("Target database must not yet exist. Please delete it first"); throw new SQLException("Target database must not yet exist. Please delete it first: " + urlTarget);
} }
// use cluster='' so connecting is possible // use cluster='' so connecting is possible
......
...@@ -35,12 +35,70 @@ public class TestCluster extends TestBase { ...@@ -35,12 +35,70 @@ public class TestCluster extends TestBase {
} }
public void test() throws Exception { public void test() throws Exception {
testRecover();
testRollback(); testRollback();
testCase(); testCase();
testCreateClusterAtRuntime(); testCreateClusterAtRuntime();
testStartStopCluster(); testStartStopCluster();
} }
private void testRecover() throws SQLException {
if (config.memory || config.networked || config.cipher != null) {
return;
}
int port1 = 9191, port2 = 9192;
String serverList = "localhost:" + port1 + ",localhost:" + port2;
deleteFiles();
org.h2.Driver.load();
String user = getUser(), password = getPassword();
Connection conn;
Statement stat;
ResultSet rs;
String url1 = "jdbc:h2:tcp://localhost:" + port1 + "/test";
String url2 = "jdbc:h2:tcp://localhost:" + port2 + "/test";
String urlCluster = "jdbc:h2:tcp://" + serverList + "/test";
Server n1 = org.h2.tools.Server.createTcpServer("-tcpPort", "" + port1, "-baseDir", getBaseDir() + "/node1").start();
Server n2 = org.h2.tools.Server.createTcpServer("-tcpPort", "" + port2 , "-baseDir", getBaseDir() + "/node2").start();
CreateCluster.main("-urlSource", url1, "-urlTarget", url2, "-user", user, "-password", password, "-serverList",
serverList);
conn = DriverManager.getConnection(urlCluster, user, password);
stat = conn.createStatement();
stat.execute("create table t1(id int, name varchar(30))");
stat.execute("insert into t1 values(1, 'a'), (2, 'b'), (3, 'c')");
rs = stat.executeQuery("select count(*) from t1");
rs.next();
assertEquals(3, rs.getInt(1));
n2.stop();
DeleteDbFiles.main("-dir", getBaseDir() + "/node2", "-quiet");
stat.execute("insert into t1 values(4, 'd'), (5, 'e')");
rs = stat.executeQuery("select count(*) from t1");
rs.next();
assertEquals(5, rs.getInt(1));
n2 = org.h2.tools.Server.createTcpServer("-tcpPort", "" + port2 , "-baseDir", getBaseDir() + "/node2").start();
CreateCluster.main("-urlSource", url1, "-urlTarget", url2, "-user", user, "-password", password, "-serverList",
serverList);
conn.close();
conn = DriverManager.getConnection(urlCluster, user, password);
stat = conn.createStatement();
rs = stat.executeQuery("select count(*) from t1");
rs.next();
assertEquals(5, rs.getInt(1));
n1.stop();
n2.stop();
deleteFiles();
}
private void testRollback() throws SQLException { private void testRollback() throws SQLException {
if (config.memory || config.networked || config.cipher != null) { if (config.memory || config.networked || config.cipher != null) {
return; return;
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论