提交 5a84a557 authored 作者: Thomas Mueller's avatar Thomas Mueller

New experimental system property "h2.modifyOnWrite".

上级 d744354c
...@@ -174,7 +174,9 @@ public abstract class Command implements CommandInterface { ...@@ -174,7 +174,9 @@ public abstract class Command implements CommandInterface {
session.waitIfExclusiveModeEnabled(); session.waitIfExclusiveModeEnabled();
boolean writing = !isReadOnly(); boolean writing = !isReadOnly();
if (writing) { if (writing) {
database.beforeWriting(); while (!database.beforeWriting()) {
// wait
}
} }
synchronized (sync) { synchronized (sync) {
session.setCurrentCommand(this); session.setCurrentCommand(this);
...@@ -208,9 +210,14 @@ public abstract class Command implements CommandInterface { ...@@ -208,9 +210,14 @@ public abstract class Command implements CommandInterface {
Object sync = database.isMultiThreaded() ? (Object) session : (Object) database; Object sync = database.isMultiThreaded() ? (Object) session : (Object) database;
session.waitIfExclusiveModeEnabled(); session.waitIfExclusiveModeEnabled();
boolean callStop = true; boolean callStop = true;
database.beforeWriting(); boolean writing = !isReadOnly();
if (writing) {
while (!database.beforeWriting()) {
// wait
}
}
synchronized (sync) { synchronized (sync) {
int rollback = session.getLogId(); int rollback = session.getUndoLogPos();
session.setCurrentCommand(this); session.setCurrentCommand(this);
try { try {
while (true) { while (true) {
...@@ -248,7 +255,9 @@ public abstract class Command implements CommandInterface { ...@@ -248,7 +255,9 @@ public abstract class Command implements CommandInterface {
stop(); stop();
} }
} finally { } finally {
database.afterWriting(); if (writing) {
database.afterWriting();
}
} }
} }
} }
......
...@@ -61,6 +61,11 @@ public class Constants { ...@@ -61,6 +61,11 @@ public class Constants {
*/ */
public static final int TCP_PROTOCOL_VERSION_9 = 9; public static final int TCP_PROTOCOL_VERSION_9 = 9;
/**
* The TCP protocol version number 10.
*/
public static final int TCP_PROTOCOL_VERSION_10 = 10;
/** /**
* The major version of this database. * The major version of this database.
*/ */
......
...@@ -542,7 +542,7 @@ public class Session extends SessionWithState { ...@@ -542,7 +542,7 @@ public class Session extends SessionWithState {
} }
} }
public int getLogId() { public int getUndoLogPos() {
return undoLog.size(); return undoLog.size();
} }
...@@ -755,7 +755,7 @@ public class Session extends SessionWithState { ...@@ -755,7 +755,7 @@ public class Session extends SessionWithState {
if (savepoints == null) { if (savepoints == null) {
savepoints = database.newStringMap(); savepoints = database.newStringMap();
} }
savepoints.put(name, getLogId()); savepoints.put(name, getUndoLogPos());
} }
/** /**
......
...@@ -67,6 +67,13 @@ public interface SessionInterface extends Closeable { ...@@ -67,6 +67,13 @@ public interface SessionInterface extends Closeable {
*/ */
DataHandler getDataHandler(); DataHandler getDataHandler();
/**
* Get the undo log position.
*
* @return the position (0 means no pending transaction)
*/
int getUndoLogPos();
/** /**
* Cancel the current or next command (called when closing a connection). * Cancel the current or next command (called when closing a connection).
*/ */
......
...@@ -55,6 +55,7 @@ public class SessionRemote extends SessionWithState implements DataHandler { ...@@ -55,6 +55,7 @@ public class SessionRemote extends SessionWithState implements DataHandler {
public static final int SESSION_CANCEL_STATEMENT = 13; public static final int SESSION_CANCEL_STATEMENT = 13;
public static final int SESSION_CHECK_KEY = 14; public static final int SESSION_CHECK_KEY = 14;
public static final int SESSION_SET_AUTOCOMMIT = 15; public static final int SESSION_SET_AUTOCOMMIT = 15;
public static final int SESSION_UNDO_LOG_POS = 16;
public static final int STATUS_ERROR = 0; public static final int STATUS_ERROR = 0;
public static final int STATUS_OK = 1; public static final int STATUS_OK = 1;
...@@ -94,7 +95,7 @@ public class SessionRemote extends SessionWithState implements DataHandler { ...@@ -94,7 +95,7 @@ public class SessionRemote extends SessionWithState implements DataHandler {
trans.setSSL(ci.isSSL()); trans.setSSL(ci.isSSL());
trans.init(); trans.init();
trans.writeInt(Constants.TCP_PROTOCOL_VERSION_6); trans.writeInt(Constants.TCP_PROTOCOL_VERSION_6);
trans.writeInt(Constants.TCP_PROTOCOL_VERSION_9); trans.writeInt(Constants.TCP_PROTOCOL_VERSION_10);
trans.writeString(db); trans.writeString(db);
trans.writeString(ci.getOriginalURL()); trans.writeString(ci.getOriginalURL());
trans.writeString(ci.getUserName()); trans.writeString(ci.getUserName());
...@@ -120,6 +121,24 @@ public class SessionRemote extends SessionWithState implements DataHandler { ...@@ -120,6 +121,24 @@ public class SessionRemote extends SessionWithState implements DataHandler {
return trans; return trans;
} }
public int getUndoLogPos() {
if (clientVersion < Constants.TCP_PROTOCOL_VERSION_10) {
return 1;
}
for (int i = 0, count = 0; i < transferList.size(); i++) {
Transfer transfer = transferList.get(i);
try {
traceOperation("SESSION_UNDO_LOG_POS", 0);
transfer.writeInt(SessionRemote.SESSION_UNDO_LOG_POS);
done(transfer);
return transfer.readInt();
} catch (IOException e) {
removeServer(e, i--, ++count);
}
}
return 1;
}
public void cancel() { public void cancel() {
// this method is called when closing the connection // this method is called when closing the connection
// the statement that is currently running is not canceled in this case // the statement that is currently running is not canceled in this case
...@@ -482,6 +501,7 @@ public class SessionRemote extends SessionWithState implements DataHandler { ...@@ -482,6 +501,7 @@ public class SessionRemote extends SessionWithState implements DataHandler {
} }
public void close() { public void close() {
RuntimeException closeError = null;
if (transferList != null) { if (transferList != null) {
synchronized (this) { synchronized (this) {
for (Transfer transfer : transferList) { for (Transfer transfer : transferList) {
...@@ -490,6 +510,9 @@ public class SessionRemote extends SessionWithState implements DataHandler { ...@@ -490,6 +510,9 @@ public class SessionRemote extends SessionWithState implements DataHandler {
transfer.writeInt(SessionRemote.SESSION_CLOSE); transfer.writeInt(SessionRemote.SESSION_CLOSE);
done(transfer); done(transfer);
transfer.close(); transfer.close();
} catch (RuntimeException e) {
trace.error(e, "close");
closeError = e;
} catch (Exception e) { } catch (Exception e) {
trace.error(e, "close"); trace.error(e, "close");
} }
...@@ -502,6 +525,9 @@ public class SessionRemote extends SessionWithState implements DataHandler { ...@@ -502,6 +525,9 @@ public class SessionRemote extends SessionWithState implements DataHandler {
embedded.close(); embedded.close();
embedded = null; embedded = null;
} }
if (closeError != null) {
throw closeError;
}
} }
public Trace getTrace() { public Trace getTrace() {
......
...@@ -345,15 +345,17 @@ public class JdbcConnection extends TraceObject implements Connection { ...@@ -345,15 +345,17 @@ public class JdbcConnection extends TraceObject implements Connection {
try { try {
if (!session.isClosed()) { if (!session.isClosed()) {
try { try {
// roll back unless that would require to re-connect if (session.getUndoLogPos() != 0) {
// (the transaction can't be rolled back after re-connecting) // roll back unless that would require to re-connect
if (!session.isReconnectNeeded(true)) { // (the transaction can't be rolled back after re-connecting)
try { if (!session.isReconnectNeeded(true)) {
rollbackInternal(); try {
} catch (DbException e) { rollbackInternal();
// ignore if the connection is broken right now } catch (DbException e) {
if (e.getErrorCode() != ErrorCode.CONNECTION_BROKEN_1) { // ignore if the connection is broken right now
throw e; if (e.getErrorCode() != ErrorCode.CONNECTION_BROKEN_1) {
throw e;
}
} }
} }
session.afterWriting(); session.afterWriting();
......
...@@ -71,12 +71,12 @@ public class TcpServerThread implements Runnable { ...@@ -71,12 +71,12 @@ public class TcpServerThread implements Runnable {
int minClientVersion = transfer.readInt(); int minClientVersion = transfer.readInt();
if (minClientVersion < Constants.TCP_PROTOCOL_VERSION_6) { if (minClientVersion < Constants.TCP_PROTOCOL_VERSION_6) {
throw DbException.get(ErrorCode.DRIVER_VERSION_ERROR_2, "" + clientVersion, "" + Constants.TCP_PROTOCOL_VERSION_6); throw DbException.get(ErrorCode.DRIVER_VERSION_ERROR_2, "" + clientVersion, "" + Constants.TCP_PROTOCOL_VERSION_6);
} else if (minClientVersion > Constants.TCP_PROTOCOL_VERSION_9) { } else if (minClientVersion > Constants.TCP_PROTOCOL_VERSION_10) {
throw DbException.get(ErrorCode.DRIVER_VERSION_ERROR_2, "" + clientVersion, "" + Constants.TCP_PROTOCOL_VERSION_9); throw DbException.get(ErrorCode.DRIVER_VERSION_ERROR_2, "" + clientVersion, "" + Constants.TCP_PROTOCOL_VERSION_10);
} }
int maxClientVersion = transfer.readInt(); int maxClientVersion = transfer.readInt();
if (maxClientVersion >= Constants.TCP_PROTOCOL_VERSION_9) { if (maxClientVersion >= Constants.TCP_PROTOCOL_VERSION_10) {
clientVersion = Constants.TCP_PROTOCOL_VERSION_9; clientVersion = Constants.TCP_PROTOCOL_VERSION_10;
} else { } else {
clientVersion = minClientVersion; clientVersion = minClientVersion;
} }
...@@ -149,20 +149,32 @@ public class TcpServerThread implements Runnable { ...@@ -149,20 +149,32 @@ public class TcpServerThread implements Runnable {
private void closeSession() { private void closeSession() {
if (session != null) { if (session != null) {
RuntimeException closeError = null;
try { try {
Command rollback = session.prepareLocal("ROLLBACK"); Command rollback = session.prepareLocal("ROLLBACK");
rollback.executeUpdate(); rollback.executeUpdate();
} catch (RuntimeException e) {
closeError = e;
server.traceError(e);
} catch (Exception e) { } catch (Exception e) {
server.traceError(e); server.traceError(e);
} }
try { try {
session.close(); session.close();
server.removeConnection(threadId); server.removeConnection(threadId);
} catch (RuntimeException e) {
if (closeError == null) {
closeError = e;
server.traceError(e);
}
} catch (Exception e) { } catch (Exception e) {
server.traceError(e); server.traceError(e);
} finally { } finally {
session = null; session = null;
} }
if (closeError != null) {
throw closeError;
}
} }
} }
...@@ -173,12 +185,13 @@ public class TcpServerThread implements Runnable { ...@@ -173,12 +185,13 @@ public class TcpServerThread implements Runnable {
try { try {
stop = true; stop = true;
closeSession(); closeSession();
transfer.close();
trace("Close");
} catch (Exception e) { } catch (Exception e) {
server.traceError(e); server.traceError(e);
} finally {
transfer.close();
trace("Close");
server.remove(this);
} }
server.remove(this);
} }
private void sendError(Throwable t) { private void sendError(Throwable t) {
...@@ -241,6 +254,7 @@ public class TcpServerThread implements Runnable { ...@@ -241,6 +254,7 @@ public class TcpServerThread implements Runnable {
break; break;
} }
case SessionRemote.SESSION_CLOSE: { case SessionRemote.SESSION_CLOSE: {
stop = true;
closeSession(); closeSession();
transfer.writeInt(SessionRemote.STATUS_OK).flush(); transfer.writeInt(SessionRemote.STATUS_OK).flush();
close(); close();
...@@ -364,6 +378,11 @@ public class TcpServerThread implements Runnable { ...@@ -364,6 +378,11 @@ public class TcpServerThread implements Runnable {
transfer.writeInt(SessionRemote.STATUS_OK).flush(); transfer.writeInt(SessionRemote.STATUS_OK).flush();
break; break;
} }
case SessionRemote.SESSION_UNDO_LOG_POS: {
transfer.writeInt(SessionRemote.STATUS_OK).
writeInt(session.getUndoLogPos()).flush();
break;
}
default: default:
trace("Unknown operation: " + operation); trace("Unknown operation: " + operation);
closeSession(); closeSession();
......
...@@ -224,7 +224,7 @@ public class FileLock implements Runnable { ...@@ -224,7 +224,7 @@ public class FileLock implements Runnable {
transfer.setSocket(socket); transfer.setSocket(socket);
transfer.init(); transfer.init();
transfer.writeInt(Constants.TCP_PROTOCOL_VERSION_6); transfer.writeInt(Constants.TCP_PROTOCOL_VERSION_6);
transfer.writeInt(Constants.TCP_PROTOCOL_VERSION_9); transfer.writeInt(Constants.TCP_PROTOCOL_VERSION_10);
transfer.writeString(null); transfer.writeString(null);
transfer.writeString(null); transfer.writeString(null);
transfer.writeString(id); transfer.writeString(id);
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论