提交 70660686 authored 作者: Thomas Mueller's avatar Thomas Mueller

After an automatic re-connect, part of the session state stays.

上级 b514ec22
...@@ -173,6 +173,7 @@ public class CommandRemote implements CommandInterface { ...@@ -173,6 +173,7 @@ public class CommandRemote implements CommandInterface {
} }
} }
session.autoCommitIfCluster(); session.autoCommitIfCluster();
session.readSessionState();
return result; return result;
} }
} }
...@@ -198,6 +199,7 @@ public class CommandRemote implements CommandInterface { ...@@ -198,6 +199,7 @@ public class CommandRemote implements CommandInterface {
} }
session.setAutoCommit(autoCommit); session.setAutoCommit(autoCommit);
session.autoCommitIfCluster(); session.autoCommitIfCluster();
session.readSessionState();
return updateCount; return updateCount;
} }
} }
......
...@@ -88,6 +88,7 @@ public class Session implements SessionInterface { ...@@ -88,6 +88,7 @@ public class Session implements SessionInterface {
private int lastUncommittedDelete; private int lastUncommittedDelete;
private boolean commitOrRollbackDisabled; private boolean commitOrRollbackDisabled;
private Table waitForLock; private Table waitForLock;
private int modificationId;
public Session() { public Session() {
// nothing to do // nothing to do
...@@ -124,6 +125,7 @@ public class Session implements SessionInterface { ...@@ -124,6 +125,7 @@ public class Session implements SessionInterface {
*/ */
public void setVariable(String name, Value value) throws SQLException { public void setVariable(String name, Value value) throws SQLException {
initVariables(); initVariables();
modificationId++;
Value old; Value old;
if (value == ValueNull.INSTANCE) { if (value == ValueNull.INSTANCE) {
old = (Value) variables.remove(name); old = (Value) variables.remove(name);
...@@ -200,6 +202,7 @@ public class Session implements SessionInterface { ...@@ -200,6 +202,7 @@ public class Session implements SessionInterface {
if (localTempTables.get(table.getName()) != null) { if (localTempTables.get(table.getName()) != null) {
throw Message.getSQLException(ErrorCode.TABLE_OR_VIEW_ALREADY_EXISTS_1, table.getSQL()); throw Message.getSQLException(ErrorCode.TABLE_OR_VIEW_ALREADY_EXISTS_1, table.getSQL());
} }
modificationId++;
localTempTables.put(table.getName(), table); localTempTables.put(table.getName(), table);
} }
...@@ -209,6 +212,7 @@ public class Session implements SessionInterface { ...@@ -209,6 +212,7 @@ public class Session implements SessionInterface {
* @param table the table * @param table the table
*/ */
public void removeLocalTempTable(Table table) throws SQLException { public void removeLocalTempTable(Table table) throws SQLException {
modificationId++;
localTempTables.remove(table.getName()); localTempTables.remove(table.getName());
table.removeChildrenAndResources(this); table.removeChildrenAndResources(this);
} }
...@@ -534,6 +538,7 @@ public class Session implements SessionInterface { ...@@ -534,6 +538,7 @@ public class Session implements SessionInterface {
for (int i = 0; i < list.size(); i++) { for (int i = 0; i < list.size(); i++) {
Table table = (Table) list.get(i); Table table = (Table) list.get(i);
if (closeSession || table.getOnCommitDrop()) { if (closeSession || table.getOnCommitDrop()) {
modificationId++;
table.setModified(); table.setModified();
localTempTables.remove(table.getName()); localTempTables.remove(table.getName());
table.removeChildrenAndResources(this); table.removeChildrenAndResources(this);
...@@ -755,6 +760,7 @@ public class Session implements SessionInterface { ...@@ -755,6 +760,7 @@ public class Session implements SessionInterface {
} }
public void setCurrentSchema(Schema schema) { public void setCurrentSchema(Schema schema) {
modificationId++;
this.currentSchemaName = schema.getName(); this.currentSchemaName = schema.getName();
} }
...@@ -852,6 +858,7 @@ public class Session implements SessionInterface { ...@@ -852,6 +858,7 @@ public class Session implements SessionInterface {
} }
public void setSchemaSearchPath(String[] schemas) { public void setSchemaSearchPath(String[] schemas) {
modificationId++;
this.schemaSearchPath = schemas; this.schemaSearchPath = schemas;
} }
...@@ -971,4 +978,8 @@ public class Session implements SessionInterface { ...@@ -971,4 +978,8 @@ public class Session implements SessionInterface {
return waitForLock; return waitForLock;
} }
public int getModificationId() {
return modificationId;
}
} }
...@@ -59,6 +59,7 @@ public class SessionRemote implements SessionInterface, DataHandler { ...@@ -59,6 +59,7 @@ public class SessionRemote implements SessionInterface, DataHandler {
public static final int STATUS_ERROR = 0; public static final int STATUS_ERROR = 0;
public static final int STATUS_OK = 1; public static final int STATUS_OK = 1;
public static final int STATUS_CLOSED = 2; public static final int STATUS_CLOSED = 2;
public static final int STATUS_OK_STATE_CHANGED = 3;
private TraceSystem traceSystem; private TraceSystem traceSystem;
private Trace trace; private Trace trace;
...@@ -77,6 +78,9 @@ public class SessionRemote implements SessionInterface, DataHandler { ...@@ -77,6 +78,9 @@ public class SessionRemote implements SessionInterface, DataHandler {
private boolean autoReconnect; private boolean autoReconnect;
private int lastReconnect; private int lastReconnect;
private SessionInterface embedded; private SessionInterface embedded;
private boolean sessionStateChanged;
private ObjectArray sessionState;
private boolean sessionStateUpdating;
public SessionRemote() { public SessionRemote() {
// nothing to do // nothing to do
...@@ -424,6 +428,19 @@ public class SessionRemote implements SessionInterface, DataHandler { ...@@ -424,6 +428,19 @@ public class SessionRemote implements SessionInterface, DataHandler {
// unfortunately // unfortunately
connectEmbeddedOrServer(); connectEmbeddedOrServer();
} }
if (sessionState != null && sessionState.size() > 0) {
sessionStateUpdating = true;
try {
for (int i = 0; i < sessionState.size(); i++) {
String sql = (String) sessionState.get(i);
CommandInterface ci = prepareCommand(sql, Integer.MAX_VALUE);
ci.executeUpdate();
}
} finally {
sessionStateUpdating = false;
sessionStateChanged = false;
}
}
return true; return true;
} }
...@@ -496,6 +513,8 @@ public class SessionRemote implements SessionInterface, DataHandler { ...@@ -496,6 +513,8 @@ public class SessionRemote implements SessionInterface, DataHandler {
throw new JdbcSQLException(message, sql, sqlstate, errorCode, null, stackTrace); throw new JdbcSQLException(message, sql, sqlstate, errorCode, null, stackTrace);
} else if (status == STATUS_CLOSED) { } else if (status == STATUS_CLOSED) {
transferList = null; transferList = null;
} else if (status == STATUS_OK_STATE_CHANGED) {
sessionStateChanged = true;
} }
} }
...@@ -622,4 +641,21 @@ public class SessionRemote implements SessionInterface, DataHandler { ...@@ -622,4 +641,21 @@ public class SessionRemote implements SessionInterface, DataHandler {
return lastReconnect; return lastReconnect;
} }
/**
* Read the session state if necessary.
*/
public void readSessionState() throws SQLException {
if (!sessionStateChanged || sessionStateUpdating) {
return;
}
sessionStateChanged = false;
sessionState = new ObjectArray();
CommandInterface ci = prepareCommand("SELECT * FROM INFORMATION_SCHEMA.SESSION_STATE", Integer.MAX_VALUE);
ResultInterface result = ci.executeQuery(0, false);
while (result.next()) {
Value[] row = result.currentRow();
sessionState.add(row[1].getString());
}
}
} }
...@@ -208,13 +208,14 @@ public class TcpServerThread implements Runnable { ...@@ -208,13 +208,14 @@ public class TcpServerThread implements Runnable {
case SessionRemote.SESSION_PREPARE: { case SessionRemote.SESSION_PREPARE: {
int id = transfer.readInt(); int id = transfer.readInt();
String sql = transfer.readString(); String sql = transfer.readString();
int old = session.getModificationId();
Command command = session.prepareLocal(sql); Command command = session.prepareLocal(sql);
boolean readonly = command.isReadOnly(); boolean readonly = command.isReadOnly();
cache.addObject(id, command); cache.addObject(id, command);
boolean isQuery = command.isQuery(); boolean isQuery = command.isQuery();
ObjectArray params = command.getParameters(); ObjectArray params = command.getParameters();
int paramCount = params.size(); int paramCount = params.size();
transfer.writeInt(SessionRemote.STATUS_OK).writeBoolean(isQuery).writeBoolean(readonly) transfer.writeInt(getState(old)).writeBoolean(isQuery).writeBoolean(readonly)
.writeInt(paramCount); .writeInt(paramCount);
if (operation == SessionRemote.SESSION_PREPARE_READ_PARAMS) { if (operation == SessionRemote.SESSION_PREPARE_READ_PARAMS) {
for (int i = 0; i < paramCount; i++) { for (int i = 0; i < paramCount; i++) {
...@@ -235,8 +236,9 @@ public class TcpServerThread implements Runnable { ...@@ -235,8 +236,9 @@ public class TcpServerThread implements Runnable {
if (commit == null) { if (commit == null) {
commit = session.prepareLocal("COMMIT"); commit = session.prepareLocal("COMMIT");
} }
int old = session.getModificationId();
commit.executeUpdate(); commit.executeUpdate();
transfer.writeInt(SessionRemote.STATUS_OK).flush(); transfer.writeInt(getState(old)).flush();
break; break;
} }
case SessionRemote.COMMAND_GET_META_DATA: { case SessionRemote.COMMAND_GET_META_DATA: {
...@@ -260,10 +262,12 @@ public class TcpServerThread implements Runnable { ...@@ -260,10 +262,12 @@ public class TcpServerThread implements Runnable {
int fetchSize = transfer.readInt(); int fetchSize = transfer.readInt();
Command command = (Command) cache.getObject(id, false); Command command = (Command) cache.getObject(id, false);
setParameters(command); setParameters(command);
int old = session.getModificationId();
LocalResult result = command.executeQueryLocal(maxRows); LocalResult result = command.executeQueryLocal(maxRows);
cache.addObject(objectId, result); cache.addObject(objectId, result);
int columnCount = result.getVisibleColumnCount(); int columnCount = result.getVisibleColumnCount();
transfer.writeInt(SessionRemote.STATUS_OK).writeInt(columnCount); int state = getState(old);
transfer.writeInt(state).writeInt(columnCount);
int rowCount = result.getRowCount(); int rowCount = result.getRowCount();
transfer.writeInt(rowCount); transfer.writeInt(rowCount);
for (int i = 0; i < columnCount; i++) { for (int i = 0; i < columnCount; i++) {
...@@ -280,10 +284,13 @@ public class TcpServerThread implements Runnable { ...@@ -280,10 +284,13 @@ public class TcpServerThread implements Runnable {
int id = transfer.readInt(); int id = transfer.readInt();
Command command = (Command) cache.getObject(id, false); Command command = (Command) cache.getObject(id, false);
setParameters(command); setParameters(command);
int old = session.getModificationId();
int updateCount = command.executeUpdate(); int updateCount = command.executeUpdate();
int status = SessionRemote.STATUS_OK; int status;
if (session.isClosed()) { if (session.isClosed()) {
status = SessionRemote.STATUS_CLOSED; status = SessionRemote.STATUS_CLOSED;
} else {
status = getState(old);
} }
transfer.writeInt(status).writeInt(updateCount).writeBoolean(session.getAutoCommit()); transfer.writeInt(status).writeInt(updateCount).writeBoolean(session.getAutoCommit());
transfer.flush(); transfer.flush();
...@@ -344,6 +351,13 @@ public class TcpServerThread implements Runnable { ...@@ -344,6 +351,13 @@ public class TcpServerThread implements Runnable {
} }
} }
private int getState(int oldModificationId) {
if (session.getModificationId() == oldModificationId) {
return SessionRemote.STATUS_OK;
}
return SessionRemote.STATUS_OK_STATE_CHANGED;
}
private void sendRow(LocalResult result) throws IOException, SQLException { private void sendRow(LocalResult result) throws IOException, SQLException {
if (result.next()) { if (result.next()) {
transfer.writeBoolean(true); transfer.writeBoolean(true);
......
...@@ -32,7 +32,7 @@ public class TestAutoReconnect extends TestBase { ...@@ -32,7 +32,7 @@ public class TestAutoReconnect extends TestBase {
* @param a ignored * @param a ignored
*/ */
public static void main(String[] a) throws Exception { public static void main(String[] a) throws Exception {
new TestAutoReconnect().init().test(); TestBase.createCaller().init().test();
} }
private void restart() throws SQLException { private void restart() throws SQLException {
...@@ -98,6 +98,11 @@ public class TestAutoReconnect extends TestBase { ...@@ -98,6 +98,11 @@ public class TestAutoReconnect extends TestBase {
restart(); restart();
assertFalse(rs.next()); assertFalse(rs.next());
restart(); restart();
stat.execute("SET @TEST 10");
restart();
rs = stat.executeQuery("CALL @TEST");
rs.next();
assertEquals(10, rs.getInt(1));
stat.setFetchSize(10); stat.setFetchSize(10);
restart(); restart();
rs = stat.executeQuery("select * from system_range(1, 20)"); rs = stat.executeQuery("select * from system_range(1, 20)");
......
...@@ -30,7 +30,7 @@ public class TestAutoServer extends TestBase { ...@@ -30,7 +30,7 @@ public class TestAutoServer extends TestBase {
* @param a ignored * @param a ignored
*/ */
public static void main(String[] a) throws Exception { public static void main(String[] a) throws Exception {
new TestAutoServer().init().test(); TestBase.createCaller().init().test();
} }
public void test() throws Exception { public void test() throws Exception {
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论