提交 c9eb1697 authored 作者: Thomas Mueller's avatar Thomas Mueller

Cluster: auto-commit was disabled after opening a connection.

Connection.getAutoCommit() is now much faster, specially when using the server mode.
上级 8c34c321
...@@ -188,7 +188,7 @@ public class CommandRemote implements CommandInterface { ...@@ -188,7 +188,7 @@ public class CommandRemote implements CommandInterface {
session.removeServer(e, i--, ++count); session.removeServer(e, i--, ++count);
} }
} }
session.setAutoCommit(autoCommit); session.setAutoCommitFromServer(autoCommit);
session.autoCommitIfCluster(); session.autoCommitIfCluster();
session.readSessionState(); session.readSessionState();
return updateCount; return updateCount;
......
...@@ -359,11 +359,6 @@ public class Session extends SessionWithState { ...@@ -359,11 +359,6 @@ public class Session extends SessionWithState {
return user; return user;
} }
/**
* Change the autocommit setting for this session.
*
* @param b the new value
*/
public void setAutoCommit(boolean b) { public void setAutoCommit(boolean b) {
autoCommit = b; autoCommit = b;
} }
......
...@@ -93,4 +93,18 @@ public interface SessionInterface { ...@@ -93,4 +93,18 @@ public interface SessionInterface {
*/ */
void afterWriting(); void afterWriting();
/**
* Check if this session is in auto-commit mode.
*
* @return true if the session is in auto-commit mode
*/
boolean getAutoCommit();
/**
* Set the auto-commit mode.
*
* @param autoCommit the new value
*/
void setAutoCommit(boolean autoCommit);
} }
...@@ -54,6 +54,7 @@ public class SessionRemote extends SessionWithState implements DataHandler { ...@@ -54,6 +54,7 @@ public class SessionRemote extends SessionWithState implements DataHandler {
public static final int SESSION_SET_ID = 12; public static final int SESSION_SET_ID = 12;
public static final int SESSION_CANCEL_STATEMENT = 13; public static final int SESSION_CANCEL_STATEMENT = 13;
public static final int SESSION_CHECK_KEY = 14; public static final int SESSION_CHECK_KEY = 14;
public static final int SESSION_SET_AUTOCOMMIT = 15;
public static final int STATUS_ERROR = 0; public static final int STATUS_ERROR = 0;
public static final int STATUS_OK = 1; public static final int STATUS_OK = 1;
...@@ -67,7 +68,7 @@ public class SessionRemote extends SessionWithState implements DataHandler { ...@@ -67,7 +68,7 @@ public class SessionRemote extends SessionWithState implements DataHandler {
private ArrayList<Transfer> transferList = New.arrayList(); private ArrayList<Transfer> transferList = New.arrayList();
private int nextId; private int nextId;
private boolean autoCommit = true; private boolean autoCommit = true;
private CommandInterface switchOffAutoCommit; private CommandInterface autoCommitFalse, autoCommitTrue;
private ConnectionInfo connectionInfo; private ConnectionInfo connectionInfo;
private String databaseName; private String databaseName;
private String cipher; private String cipher;
...@@ -93,7 +94,7 @@ public class SessionRemote extends SessionWithState implements DataHandler { ...@@ -93,7 +94,7 @@ public class SessionRemote extends SessionWithState implements DataHandler {
trans.setSSL(ci.isSSL()); trans.setSSL(ci.isSSL());
trans.init(); trans.init();
trans.writeInt(Constants.TCP_PROTOCOL_VERSION_6); trans.writeInt(Constants.TCP_PROTOCOL_VERSION_6);
trans.writeInt(Constants.TCP_PROTOCOL_VERSION_7); trans.writeInt(Constants.TCP_PROTOCOL_VERSION_8);
trans.writeString(db); trans.writeString(db);
trans.writeString(ci.getOriginalURL()); trans.writeString(ci.getOriginalURL());
trans.writeString(ci.getUserName()); trans.writeString(ci.getUserName());
...@@ -108,6 +109,9 @@ public class SessionRemote extends SessionWithState implements DataHandler { ...@@ -108,6 +109,9 @@ public class SessionRemote extends SessionWithState implements DataHandler {
done(trans); done(trans);
clientVersion = trans.readInt(); clientVersion = trans.readInt();
trans.setVersion(clientVersion); trans.setVersion(clientVersion);
trans.writeInt(SessionRemote.SESSION_SET_ID);
trans.writeString(sessionId);
done(trans);
} catch (DbException e) { } catch (DbException e) {
trans.close(); trans.close();
throw e; throw e;
...@@ -148,23 +152,66 @@ public class SessionRemote extends SessionWithState implements DataHandler { ...@@ -148,23 +152,66 @@ public class SessionRemote extends SessionWithState implements DataHandler {
private void checkClusterDisableAutoCommit(String serverList) { private void checkClusterDisableAutoCommit(String serverList) {
if (autoCommit && transferList.size() > 1) { if (autoCommit && transferList.size() > 1) {
if (switchOffAutoCommit == null) { setAutoCommitSend(false);
switchOffAutoCommit = prepareCommand("SET AUTOCOMMIT FALSE", Integer.MAX_VALUE);
}
// this will call setAutoCommit(false)
switchOffAutoCommit.executeUpdate();
// so we need to switch it on
autoCommit = true;
CommandInterface c = prepareCommand("SET CLUSTER " + serverList, Integer.MAX_VALUE); CommandInterface c = prepareCommand("SET CLUSTER " + serverList, Integer.MAX_VALUE);
// this will set autoCommit to false
c.executeUpdate(); c.executeUpdate();
// so we need to switch it on
autoCommit = true;
cluster = true; cluster = true;
} }
} }
public boolean getAutoCommit() {
return autoCommit;
}
public void setAutoCommit(boolean autoCommit) { public void setAutoCommit(boolean autoCommit) {
if (!cluster) {
setAutoCommitSend(autoCommit);
}
this.autoCommit = autoCommit; this.autoCommit = autoCommit;
} }
public void setAutoCommitFromServer(boolean autoCommit) {
if (cluster) {
if (autoCommit) {
// the user executed SET AUTOCOMMIT TRUE
setAutoCommitSend(false);
this.autoCommit = true;
}
} else {
this.autoCommit = autoCommit;
}
}
private void setAutoCommitSend(boolean autoCommit) {
if (clientVersion >= Constants.TCP_PROTOCOL_VERSION_8) {
for (int i = 0, count = 0; i < transferList.size(); i++) {
Transfer transfer = transferList.get(i);
try {
traceOperation("SESSION_SET_AUTOCOMMIT", autoCommit ? 1 : 0);
transfer.writeInt(SessionRemote.SESSION_SET_AUTOCOMMIT).writeBoolean(autoCommit);
done(transfer);
} catch (IOException e) {
removeServer(e, i--, ++count);
}
}
} else {
if (autoCommit) {
if (autoCommitFalse == null) {
autoCommitFalse = prepareCommand("SET AUTOCOMMIT FALSE", Integer.MAX_VALUE);
}
autoCommitFalse.executeUpdate();
} else {
if (autoCommitTrue == null) {
autoCommitTrue = prepareCommand("SET AUTOCOMMIT TRUE", Integer.MAX_VALUE);
}
autoCommitTrue.executeUpdate();
}
}
}
/** /**
* Calls COMMIT if the session is in cluster mode. * Calls COMMIT if the session is in cluster mode.
*/ */
...@@ -320,6 +367,7 @@ public class SessionRemote extends SessionWithState implements DataHandler { ...@@ -320,6 +367,7 @@ public class SessionRemote extends SessionWithState implements DataHandler {
String[] servers = StringUtils.arraySplit(server, ',', true); String[] servers = StringUtils.arraySplit(server, ',', true);
int len = servers.length; int len = servers.length;
transferList.clear(); transferList.clear();
sessionId = StringUtils.convertBytesToString(MathUtils.secureRandomBytes(32));
// TODO cluster: support more than 2 connections // TODO cluster: support more than 2 connections
boolean switchOffCluster = false; boolean switchOffCluster = false;
try { try {
......
...@@ -74,7 +74,6 @@ public class JdbcConnection extends TraceObject implements Connection { ...@@ -74,7 +74,6 @@ public class JdbcConnection extends TraceObject implements Connection {
private SessionInterface session; private SessionInterface session;
private CommandInterface commit, rollback; private CommandInterface commit, rollback;
private CommandInterface setAutoCommitTrue, setAutoCommitFalse, getAutoCommit;
private CommandInterface getReadOnly, getGeneratedKeys; private CommandInterface getReadOnly, getGeneratedKeys;
private CommandInterface setLockMode, getLockMode; private CommandInterface setLockMode, getLockMode;
private CommandInterface setQueryTimeout, getQueryTimeout; private CommandInterface setQueryTimeout, getQueryTimeout;
...@@ -329,9 +328,6 @@ public class JdbcConnection extends TraceObject implements Connection { ...@@ -329,9 +328,6 @@ public class JdbcConnection extends TraceObject implements Connection {
private void closePreparedCommands() { private void closePreparedCommands() {
commit = closeAndSetNull(commit); commit = closeAndSetNull(commit);
rollback = closeAndSetNull(rollback); rollback = closeAndSetNull(rollback);
setAutoCommitTrue = closeAndSetNull(setAutoCommitTrue);
setAutoCommitFalse = closeAndSetNull(setAutoCommitFalse);
getAutoCommit = closeAndSetNull(getAutoCommit);
getReadOnly = closeAndSetNull(getReadOnly); getReadOnly = closeAndSetNull(getReadOnly);
getGeneratedKeys = closeAndSetNull(getGeneratedKeys); getGeneratedKeys = closeAndSetNull(getGeneratedKeys);
getLockMode = closeAndSetNull(getLockMode); getLockMode = closeAndSetNull(getLockMode);
...@@ -362,13 +358,7 @@ public class JdbcConnection extends TraceObject implements Connection { ...@@ -362,13 +358,7 @@ public class JdbcConnection extends TraceObject implements Connection {
debugCode("setAutoCommit(" + autoCommit + ");"); debugCode("setAutoCommit(" + autoCommit + ");");
} }
checkClosed(); checkClosed();
if (autoCommit) { session.setAutoCommit(autoCommit);
setAutoCommitTrue = prepareCommand("SET AUTOCOMMIT TRUE", setAutoCommitTrue);
setAutoCommitTrue.executeUpdate();
} else {
setAutoCommitFalse = prepareCommand("SET AUTOCOMMIT FALSE", setAutoCommitFalse);
setAutoCommitFalse.executeUpdate();
}
} catch (Exception e) { } catch (Exception e) {
throw logAndConvert(e); throw logAndConvert(e);
} }
...@@ -385,21 +375,12 @@ public class JdbcConnection extends TraceObject implements Connection { ...@@ -385,21 +375,12 @@ public class JdbcConnection extends TraceObject implements Connection {
try { try {
checkClosed(); checkClosed();
debugCodeCall("getAutoCommit"); debugCodeCall("getAutoCommit");
return getInternalAutoCommit(); return session.getAutoCommit();
} catch (Exception e) { } catch (Exception e) {
throw logAndConvert(e); throw logAndConvert(e);
} }
} }
private boolean getInternalAutoCommit() {
getAutoCommit = prepareCommand("CALL AUTOCOMMIT()", getAutoCommit);
ResultInterface result = getAutoCommit.executeQuery(0, false);
result.next();
boolean autoCommit = result.currentRow()[0].getBoolean().booleanValue();
result.close();
return autoCommit;
}
/** /**
* Commits the current transaction. This call has only an effect if * Commits the current transaction. This call has only an effect if
* auto commit is switched off. * auto commit is switched off.
...@@ -1542,7 +1523,7 @@ public class JdbcConnection extends TraceObject implements Connection { ...@@ -1542,7 +1523,7 @@ public class JdbcConnection extends TraceObject implements Connection {
return false; return false;
} }
// force a network round trip (if networked) // force a network round trip (if networked)
getInternalAutoCommit(); getTransactionIsolation();
return true; return true;
} catch (Exception e) { } catch (Exception e) {
// this method doesn't throw an exception, but it logs it // this method doesn't throw an exception, but it logs it
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论