提交 8faf50a8 authored 作者: Thomas Mueller's avatar Thomas Mueller

New auto-reconnect feature

上级 a133bc7b
...@@ -38,6 +38,7 @@ public class CommandRemote implements CommandInterface { ...@@ -38,6 +38,7 @@ public class CommandRemote implements CommandInterface {
private boolean isQuery; private boolean isQuery;
private boolean readonly; private boolean readonly;
private int paramCount; private int paramCount;
private int created;
public CommandRemote(SessionRemote session, ObjectArray transferList, String sql, int fetchSize) throws SQLException { public CommandRemote(SessionRemote session, ObjectArray transferList, String sql, int fetchSize) throws SQLException {
this.transferList = transferList; this.transferList = transferList;
...@@ -49,6 +50,7 @@ public class CommandRemote implements CommandInterface { ...@@ -49,6 +50,7 @@ public class CommandRemote implements CommandInterface {
// need to close the object // need to close the object
this.session = session; this.session = session;
this.fetchSize = fetchSize; this.fetchSize = fetchSize;
created = session.getLastReconnect();
} }
private void prepare(SessionRemote s, boolean createParams) throws SQLException { private void prepare(SessionRemote s, boolean createParams) throws SQLException {
...@@ -82,7 +84,7 @@ public class CommandRemote implements CommandInterface { ...@@ -82,7 +84,7 @@ public class CommandRemote implements CommandInterface {
} }
} }
} catch (IOException e) { } catch (IOException e) {
s.removeServer(i--); s.removeServer(e, i--);
} }
} }
} }
...@@ -95,19 +97,27 @@ public class CommandRemote implements CommandInterface { ...@@ -95,19 +97,27 @@ public class CommandRemote implements CommandInterface {
return parameters; return parameters;
} }
public ResultInterface getMetaData() throws SQLException { private void prepareIfRequired() throws SQLException {
synchronized (session) { if (session.getLastReconnect() != created) {
session.checkClosed(); // in this case we need to prepare again in every case
if (!isQuery) { id = Integer.MIN_VALUE;
return null;
} }
session.checkClosed();
if (id <= session.getCurrentId() - SysProperties.SERVER_CACHED_OBJECTS) { if (id <= session.getCurrentId() - SysProperties.SERVER_CACHED_OBJECTS) {
// object is too old - we need to prepare again // object is too old - we need to prepare again
prepare(session, false); prepare(session, false);
} }
}
public ResultInterface getMetaData() throws SQLException {
synchronized (session) {
if (!isQuery) {
return null;
}
int objectId = session.getNextId(); int objectId = session.getNextId();
ResultRemote result = null; ResultRemote result = null;
for (int i = 0; i < transferList.size(); i++) { for (int i = 0; i < transferList.size(); i++) {
prepareIfRequired();
Transfer transfer = (Transfer) transferList.get(i); Transfer transfer = (Transfer) transferList.get(i);
try { try {
// TODO cluster: support load balance with values for each server / auto detect // TODO cluster: support load balance with values for each server / auto detect
...@@ -118,7 +128,7 @@ public class CommandRemote implements CommandInterface { ...@@ -118,7 +128,7 @@ public class CommandRemote implements CommandInterface {
result = new ResultRemote(session, transfer, objectId, columnCount, Integer.MAX_VALUE); result = new ResultRemote(session, transfer, objectId, columnCount, Integer.MAX_VALUE);
break; break;
} catch (IOException e) { } catch (IOException e) {
session.removeServer(i--); session.removeServer(e, i--);
} }
} }
session.autoCommitIfCluster(); session.autoCommitIfCluster();
...@@ -129,14 +139,10 @@ public class CommandRemote implements CommandInterface { ...@@ -129,14 +139,10 @@ public class CommandRemote implements CommandInterface {
public ResultInterface executeQuery(int maxRows, boolean scrollable) throws SQLException { public ResultInterface executeQuery(int maxRows, boolean scrollable) throws SQLException {
checkParameters(); checkParameters();
synchronized (session) { synchronized (session) {
session.checkClosed();
if (id <= session.getCurrentId() - SysProperties.SERVER_CACHED_OBJECTS) {
// object is too old - we need to prepare again
prepare(session, false);
}
int objectId = session.getNextId(); int objectId = session.getNextId();
ResultRemote result = null; ResultRemote result = null;
for (int i = 0; i < transferList.size(); i++) { for (int i = 0; i < transferList.size(); i++) {
prepareIfRequired();
Transfer transfer = (Transfer) transferList.get(i); Transfer transfer = (Transfer) transferList.get(i);
try { try {
// TODO cluster: support load balance with values for each // TODO cluster: support load balance with values for each
...@@ -163,7 +169,7 @@ public class CommandRemote implements CommandInterface { ...@@ -163,7 +169,7 @@ public class CommandRemote implements CommandInterface {
break; break;
} }
} catch (IOException e) { } catch (IOException e) {
session.removeServer(i--); session.removeServer(e, i--);
} }
} }
session.autoCommitIfCluster(); session.autoCommitIfCluster();
...@@ -174,16 +180,12 @@ public class CommandRemote implements CommandInterface { ...@@ -174,16 +180,12 @@ public class CommandRemote implements CommandInterface {
public int executeUpdate() throws SQLException { public int executeUpdate() throws SQLException {
checkParameters(); checkParameters();
synchronized (session) { synchronized (session) {
session.checkClosed();
if (id <= session.getCurrentId() - SysProperties.SERVER_CACHED_OBJECTS) {
// object is too old - we need to prepare again
prepare(session, false);
}
int updateCount = 0; int updateCount = 0;
boolean autoCommit = false; boolean autoCommit = false;
for (int i = 0; i < transferList.size(); i++) { for (int i = 0; i < transferList.size(); i++) {
try { prepareIfRequired();
Transfer transfer = (Transfer) transferList.get(i); Transfer transfer = (Transfer) transferList.get(i);
try {
session.traceOperation("COMMAND_EXECUTE_UPDATE", id); session.traceOperation("COMMAND_EXECUTE_UPDATE", id);
transfer.writeInt(SessionRemote.COMMAND_EXECUTE_UPDATE).writeInt(id); transfer.writeInt(SessionRemote.COMMAND_EXECUTE_UPDATE).writeInt(id);
sendParameters(transfer); sendParameters(transfer);
...@@ -191,7 +193,7 @@ public class CommandRemote implements CommandInterface { ...@@ -191,7 +193,7 @@ public class CommandRemote implements CommandInterface {
updateCount = transfer.readInt(); updateCount = transfer.readInt();
autoCommit = transfer.readBoolean(); autoCommit = transfer.readBoolean();
} catch (IOException e) { } catch (IOException e) {
session.removeServer(i--); session.removeServer(e, i--);
} }
} }
session.setAutoCommit(autoCommit); session.setAutoCommit(autoCommit);
......
...@@ -4039,6 +4039,10 @@ public class Parser { ...@@ -4039,6 +4039,10 @@ public class Parser {
readIfEqualOrTo(); readIfEqualOrTo();
read(); read();
return new NoOperation(session); return new NoOperation(session);
} else if (readIf("AUTO_RECONNECT")) {
readIfEqualOrTo();
read();
return new NoOperation(session);
} else if (readIf("ASSERT")) { } else if (readIf("ASSERT")) {
readIfEqualOrTo(); readIfEqualOrTo();
read(); read();
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论