提交 a263a7d0 authored 作者: Thomas Mueller's avatar Thomas Mueller

More bugs in the server-less multi-connection mode have been fixed.

上级 b4ee1336
......@@ -18,7 +18,11 @@ Change Log
<h1>Change Log</h1>
<h2>Next Version (unreleased)</h2>
<ul><li>User defined functions: the source code is now available using
<ul><li>More bugs in the server-less multi-connection mode have been fixed.
</li><li>When running against an old database, the SCRIPT statement could generate a
SQL script that contained duplicate indexes (PRIMARY_KEY_E).
</li><li>JdbcConnectionPool.getConnection() could throw a NullPointerException.
</li><li>User defined functions: the source code is now available using
SELECT SOURCE FROM INFORMATION_SCHEMA.FUNCTION_ALIASES.
</li><li>User defined functions with source code didn't work after re-opening the database.
</li><li>The newsfeeds are now Atom 1.0 standard compliant.
......
......@@ -169,8 +169,9 @@ public class Database implements DataHandler {
private Properties reconnectLastLock;
private volatile long reconnectCheckNext;
private volatile boolean reconnectChangePending;
private volatile boolean checkpointAllowed;
private volatile int checkpointAllowed;
private volatile boolean checkpointRunning;
private final Object reconnectSync = new Object();
private int cacheSize;
private boolean compactFully;
private SourceCompiler compiler;
......@@ -467,7 +468,7 @@ public class Database implements DataHandler {
}
}
Engine.getInstance().close(databaseName);
throw Message.getSQLException(ErrorCode.SIMULATED_POWER_OFF);
throw Message.getSQLException(ErrorCode.DATABASE_IS_CLOSED);
}
/**
......@@ -719,6 +720,7 @@ public class Database implements DataHandler {
}
systemSession.commit(true);
traceSystem.getTrace(Trace.DATABASE).info("opened " + databaseName);
afterWriting();
}
public Schema getMainSchema() {
......@@ -1289,7 +1291,7 @@ public class Database implements DataHandler {
pageStore.compact(compactFully);
}
} catch (SQLException e) {
if (e.getErrorCode() != ErrorCode.SIMULATED_POWER_OFF){
if (e.getErrorCode() != ErrorCode.DATABASE_IS_CLOSED){
// e.printStackTrace();
// TODO don't ignore exceptions
}
......@@ -2388,24 +2390,27 @@ public class Database implements DataHandler {
if (fileLockMethod != FileLock.LOCK_SERIALIZED || readOnly || !reconnectChangePending || closing) {
return;
}
// to avoid race conditions, we already set it here
// (overlap with checkpointAllowed)
checkpointRunning = true;
if (!checkpointAllowed) {
checkpointRunning = false;
return;
}
long now = System.currentTimeMillis();
if (now > reconnectCheckNext + SysProperties.RECONNECT_CHECK_DELAY) {
if (SysProperties.CHECK && checkpointAllowed < 0) {
Message.throwInternalError();
}
synchronized (reconnectSync) {
if (checkpointAllowed > 0) {
return;
}
checkpointRunning = true;
}
synchronized (this) {
getTrace().debug("checkpoint start");
flushIndexes(0);
checkpoint();
reconnectModified(false);
checkpointRunning = false;
getTrace().debug("checkpoint end");
}
synchronized (reconnectSync) {
checkpointRunning = false;
}
}
}
......@@ -2462,33 +2467,44 @@ public class Database implements DataHandler {
* false if another connection was faster
*/
public boolean beforeWriting() {
if (fileLockMethod == FileLock.LOCK_SERIALIZED) {
// to avoid race conditions, we already set it here
// (overlap with checkpointRunning)
checkpointAllowed = false;
while (checkpointRunning) {
try {
Thread.sleep(10 + (int) (Math.random() * 10));
} catch (Exception e) {
// ignore
}
if (fileLockMethod != FileLock.LOCK_SERIALIZED) {
return true;
}
while (checkpointRunning) {
try {
Thread.sleep(10 + (int) (Math.random() * 10));
} catch (Exception e) {
// ignore
}
boolean writingAllowed = reconnectModified(true);
if (!writingAllowed) {
// make sure the next call to isReconnectNeeded() returns yes
reconnectCheckNext = System.currentTimeMillis() - 1;
reconnectLastLock = null;
}
synchronized (reconnectSync) {
if (reconnectModified(true)) {
checkpointAllowed++;
if (SysProperties.CHECK && checkpointAllowed > 20) {
throw Message.throwInternalError();
}
return true;
}
return writingAllowed;
}
return true;
// make sure the next call to isReconnectNeeded() returns true
reconnectCheckNext = System.currentTimeMillis() - 1;
reconnectLastLock = null;
return false;
}
/**
* This method is called after updates are finished.
*/
public void afterWriting() {
checkpointAllowed = true;
if (fileLockMethod != FileLock.LOCK_SERIALIZED) {
return;
}
synchronized (reconnectSync) {
checkpointAllowed--;
}
if (SysProperties.CHECK && checkpointAllowed < 0) {
throw Message.throwInternalError();
}
}
/**
......
......@@ -28,7 +28,7 @@ import org.h2.log.UndoLogRecord;
import org.h2.message.Message;
import org.h2.message.Trace;
import org.h2.message.TraceSystem;
import org.h2.result.LocalResult;
import org.h2.result.ResultInterface;
import org.h2.result.Row;
import org.h2.schema.Schema;
import org.h2.store.DataHandler;
......@@ -93,7 +93,7 @@ public class Session extends SessionWithState {
private long sessionStart = System.currentTimeMillis();
private long currentCommandStart;
private HashMap<String, Value> variables;
private HashSet<LocalResult> temporaryResults;
private HashSet<ResultInterface> temporaryResults;
private int queryTimeout = SysProperties.getMaxQueryTimeout();
private int lastUncommittedDelete;
private boolean commitOrRollbackDisabled;
......@@ -1059,7 +1059,7 @@ public class Session extends SessionWithState {
*
* @param result the temporary result set
*/
public void addTemporaryResult(LocalResult result) {
public void addTemporaryResult(ResultInterface result) {
if (!result.needToClose()) {
return;
}
......@@ -1078,7 +1078,7 @@ public class Session extends SessionWithState {
*/
public void closeTemporaryResults() {
if (temporaryResults != null) {
for (LocalResult result : temporaryResults) {
for (ResultInterface result : temporaryResults) {
result.close();
}
temporaryResults = null;
......@@ -1129,6 +1129,10 @@ public class Session extends SessionWithState {
}
}
public void afterWriting() {
database.afterWriting();
}
public SessionInterface reconnect(boolean write) throws SQLException {
readSessionState();
close();
......
......@@ -89,4 +89,10 @@ public interface SessionInterface {
*/
SessionInterface reconnect(boolean write) throws SQLException;
/**
* Called after writing has ended. It needs to be called after
* isReconnectNeeded(true) returned false.
*/
void afterWriting();
}
......@@ -661,4 +661,8 @@ public class SessionRemote extends SessionWithState implements SessionFactory, D
return this;
}
public void afterWriting() {
// nothing to do
}
}
......@@ -306,9 +306,10 @@ public class JdbcConnection extends TraceObject implements Connection {
if (!session.isClosed()) {
try {
// roll back unless that would require to re-connect
// (the transaction can't be rolled back when re-connecting)
// (the transaction can't be rolled back after re-connecting)
if (!session.isReconnectNeeded(true)) {
rollbackInternal();
session.afterWriting();
}
commit = closeAndSetNull(commit);
rollback = closeAndSetNull(rollback);
......@@ -405,8 +406,12 @@ public class JdbcConnection extends TraceObject implements Connection {
try {
debugCodeCall("commit");
checkClosedForWrite();
commit = prepareCommand("COMMIT", commit);
commit.executeUpdate();
try {
commit = prepareCommand("COMMIT", commit);
commit.executeUpdate();
} finally {
afterWriting();
}
} catch (Exception e) {
throw logAndConvert(e);
}
......@@ -423,7 +428,11 @@ public class JdbcConnection extends TraceObject implements Connection {
try {
debugCodeCall("rollback");
checkClosedForWrite();
rollbackInternal();
try {
rollbackInternal();
} finally {
afterWriting();
}
} catch (Exception e) {
throw logAndConvert(e);
}
......@@ -914,7 +923,11 @@ public class JdbcConnection extends TraceObject implements Connection {
JdbcSavepoint sp = convertSavepoint(savepoint);
debugCode("rollback(" + sp.getTraceObjectName() + ");");
checkClosedForWrite();
sp.rollback();
try {
sp.rollback();
} finally {
afterWriting();
}
} catch (Exception e) {
throw logAndConvert(e);
}
......@@ -1338,6 +1351,10 @@ public class JdbcConnection extends TraceObject implements Connection {
}
}
protected void afterWriting() {
session.afterWriting();
}
String getURL() throws SQLException {
checkClosed();
return url;
......@@ -1408,8 +1425,12 @@ public class JdbcConnection extends TraceObject implements Connection {
int id = getNextId(TraceObject.CLOB);
debugCodeAssign("Clob", TraceObject.CLOB, id, "createClob()");
checkClosedForWrite();
ValueLob v = ValueLob.createSmallLob(Value.CLOB, MemoryUtils.EMPTY_BYTES);
return new JdbcClob(this, v, id);
try {
ValueLob v = ValueLob.createSmallLob(Value.CLOB, MemoryUtils.EMPTY_BYTES);
return new JdbcClob(this, v, id);
} finally {
afterWriting();
}
} catch (Exception e) {
throw logAndConvert(e);
}
......@@ -1425,8 +1446,12 @@ public class JdbcConnection extends TraceObject implements Connection {
int id = getNextId(TraceObject.BLOB);
debugCodeAssign("Blob", TraceObject.BLOB, id, "createClob()");
checkClosedForWrite();
ValueLob v = ValueLob.createSmallLob(Value.BLOB, MemoryUtils.EMPTY_BYTES);
return new JdbcBlob(this, v, id);
try {
ValueLob v = ValueLob.createSmallLob(Value.BLOB, MemoryUtils.EMPTY_BYTES);
return new JdbcBlob(this, v, id);
} finally {
afterWriting();
}
} catch (Exception e) {
throw logAndConvert(e);
}
......
......@@ -114,19 +114,23 @@ public class JdbcStatement extends TraceObject implements Statement {
private int executeUpdateInternal(String sql) throws SQLException {
checkClosedForWrite();
closeOldResultSet();
sql = conn.translateSQL(sql, escapeProcessing);
CommandInterface command = conn.prepareCommand(sql, fetchSize);
synchronized (session) {
setExecutingStatement(command);
try {
updateCount = command.executeUpdate();
} finally {
setExecutingStatement(null);
try {
closeOldResultSet();
sql = conn.translateSQL(sql, escapeProcessing);
CommandInterface command = conn.prepareCommand(sql, fetchSize);
synchronized (session) {
setExecutingStatement(command);
try {
updateCount = command.executeUpdate();
} finally {
setExecutingStatement(null);
}
}
command.close();
return updateCount;
} finally {
afterWriting();
}
command.close();
return updateCount;
}
/**
......@@ -153,29 +157,33 @@ public class JdbcStatement extends TraceObject implements Statement {
private boolean executeInternal(String sql) throws SQLException {
int id = getNextId(TraceObject.RESULT_SET);
checkClosedForWrite();
closeOldResultSet();
sql = conn.translateSQL(sql, escapeProcessing);
CommandInterface command = conn.prepareCommand(sql, fetchSize);
boolean returnsResultSet;
synchronized (session) {
setExecutingStatement(command);
try {
if (command.isQuery()) {
returnsResultSet = true;
boolean scrollable = resultSetType != ResultSet.TYPE_FORWARD_ONLY;
boolean updatable = resultSetConcurrency == ResultSet.CONCUR_UPDATABLE;
ResultInterface result = command.executeQuery(maxRows, scrollable);
resultSet = new JdbcResultSet(conn, this, result, id, closedByResultSet, scrollable, updatable);
} else {
returnsResultSet = false;
updateCount = command.executeUpdate();
try {
closeOldResultSet();
sql = conn.translateSQL(sql, escapeProcessing);
CommandInterface command = conn.prepareCommand(sql, fetchSize);
boolean returnsResultSet;
synchronized (session) {
setExecutingStatement(command);
try {
if (command.isQuery()) {
returnsResultSet = true;
boolean scrollable = resultSetType != ResultSet.TYPE_FORWARD_ONLY;
boolean updatable = resultSetConcurrency == ResultSet.CONCUR_UPDATABLE;
ResultInterface result = command.executeQuery(maxRows, scrollable);
resultSet = new JdbcResultSet(conn, this, result, id, closedByResultSet, scrollable, updatable);
} else {
returnsResultSet = false;
updateCount = command.executeUpdate();
}
} finally {
setExecutingStatement(null);
}
} finally {
setExecutingStatement(null);
}
command.close();
return returnsResultSet;
} finally {
afterWriting();
}
command.close();
return returnsResultSet;
}
/**
......@@ -607,29 +615,33 @@ public class JdbcStatement extends TraceObject implements Statement {
try {
debugCodeCall("executeBatch");
checkClosedForWrite();
if (batchCommands == null) {
// TODO batch: check what other database do if no commands are set
batchCommands = ObjectArray.newInstance();
}
int[] result = new int[batchCommands.size()];
boolean error = false;
for (int i = 0; i < batchCommands.size(); i++) {
String sql = batchCommands.get(i);
try {
result[i] = executeUpdateInternal(sql);
} catch (SQLException e) {
logAndConvert(e);
//## Java 1.4 begin ##
result[i] = Statement.EXECUTE_FAILED;
//## Java 1.4 end ##
error = true;
try {
if (batchCommands == null) {
// TODO batch: check what other database do if no commands are set
batchCommands = ObjectArray.newInstance();
}
int[] result = new int[batchCommands.size()];
boolean error = false;
for (int i = 0; i < batchCommands.size(); i++) {
String sql = batchCommands.get(i);
try {
result[i] = executeUpdateInternal(sql);
} catch (SQLException e) {
logAndConvert(e);
//## Java 1.4 begin ##
result[i] = Statement.EXECUTE_FAILED;
//## Java 1.4 end ##
error = true;
}
}
batchCommands = null;
if (error) {
throw new BatchUpdateException(result);
}
return result;
} finally {
afterWriting();
}
batchCommands = null;
if (error) {
throw new BatchUpdateException(result);
}
return result;
} catch (Exception e) {
throw logAndConvert(e);
}
......@@ -883,6 +895,15 @@ public class JdbcStatement extends TraceObject implements Statement {
return false;
}
/**
* Called after each write operation.
*/
void afterWriting() {
if (conn != null) {
conn.afterWriting();
}
}
/**
* INTERNAL.
* Close and old result set if there is still one open.
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论