提交 27252c09 authored 作者: Thomas Mueller's avatar Thomas Mueller

More bugs in the server-less multi-connection mode have been fixed.

上级 1d4b9c7a
......@@ -463,13 +463,13 @@ public class SysProperties {
public static final boolean RECOMPILE_ALWAYS = getBooleanSetting("h2.recompileAlways", false);
/**
* System property <code>h2.reconnectCheckDelay</code> (default: 250).<br />
* System property <code>h2.reconnectCheckDelay</code> (default: 100).<br />
* Check the .lock.db file every this many milliseconds to detect that the
* database was changed. The process writing to the database must first
* notify a change in the .lock.db file, then wait twice this many
* milliseconds before updating the database.
*/
public static final int RECONNECT_CHECK_DELAY = getIntSetting("h2.reconnectCheckDelay", 250);
public static final int RECONNECT_CHECK_DELAY = getIntSetting("h2.reconnectCheckDelay", 100);
/**
* System property <code>h2.redoBufferSize</code> (default: 262144).<br />
......
......@@ -634,7 +634,6 @@ public class Session extends SessionWithState {
Message.throwInternalError();
}
}
database.afterWriting();
if (locks.size() > 0) {
synchronized (database) {
for (int i = 0; i < locks.size(); i++) {
......@@ -1112,8 +1111,20 @@ public class Session extends SessionWithState {
return modificationId;
}
public boolean isReconnectNeeded() {
return database.isReconnectNeeded();
public boolean isReconnectNeeded(boolean write) {
while (true) {
boolean reconnect = database.isReconnectNeeded();
if (reconnect) {
return true;
}
if (write) {
if (database.beforeWriting()) {
return false;
}
} else {
return false;
}
}
}
public SessionInterface reconnect() throws SQLException {
......
......@@ -76,9 +76,10 @@ public interface SessionInterface {
/**
* Check if the database changed and therefore reconnecting is required.
*
* @param write if the next operation may be writing
* @return true if reconnecting is required
*/
boolean isReconnectNeeded();
boolean isReconnectNeeded(boolean write);
/**
* Close the connection and open a new connection.
......
......@@ -652,7 +652,7 @@ public class SessionRemote extends SessionWithState implements SessionFactory, D
return TempFileDeleter.getInstance();
}
public boolean isReconnectNeeded() {
public boolean isReconnectNeeded(boolean write) {
return false;
}
......
......@@ -400,7 +400,7 @@ public class JdbcConnection extends TraceObject implements Connection {
public synchronized void commit() throws SQLException {
try {
debugCodeCall("commit");
checkClosed();
checkClosedForWrite();
commit = prepareCommand("COMMIT", commit);
commit.executeUpdate();
} catch (Exception e) {
......@@ -418,7 +418,7 @@ public class JdbcConnection extends TraceObject implements Connection {
public synchronized void rollback() throws SQLException {
try {
debugCodeCall("rollback");
checkClosed();
checkClosedForWrite();
rollbackInternal();
} catch (Exception e) {
throw logAndConvert(e);
......@@ -641,6 +641,8 @@ public class JdbcConnection extends TraceObject implements Connection {
*/
public void setQueryTimeout(int seconds) throws SQLException {
try {
debugCodeCall("setQueryTimeout", seconds);
checkClosed();
setQueryTimeout = prepareCommand("SET QUERY_TIMEOUT ?", setQueryTimeout);
((ParameterInterface) setQueryTimeout.getParameters().get(0)).setValue(ValueInt.get(seconds * 1000), false);
setQueryTimeout.executeUpdate();
......@@ -654,6 +656,8 @@ public class JdbcConnection extends TraceObject implements Connection {
*/
public int getQueryTimeout() throws SQLException {
try {
debugCodeCall("getQueryTimeout");
checkClosed();
getQueryTimeout = prepareCommand("SELECT VALUE FROM INFORMATION_SCHEMA.SETTINGS WHERE NAME=?", getQueryTimeout);
((ParameterInterface) getQueryTimeout.getParameters().get(0)).setValue(ValueString.get("QUERY_TIMEOUT"), false);
ResultInterface result = getQueryTimeout.executeQuery(0, false);
......@@ -668,7 +672,6 @@ public class JdbcConnection extends TraceObject implements Connection {
} catch (Exception e) {
throw logAndConvert(e);
}
}
/**
......@@ -903,7 +906,7 @@ public class JdbcConnection extends TraceObject implements Connection {
try {
JdbcSavepoint sp = convertSavepoint(savepoint);
debugCode("rollback(" + sp.getTraceObjectName() + ");");
checkClosed();
checkClosedForWrite();
sp.rollback();
} catch (Exception e) {
throw logAndConvert(e);
......@@ -1249,20 +1252,43 @@ public class JdbcConnection extends TraceObject implements Connection {
}
/**
* INTERNAL
* Check if this connection is closed.
* The next operation is a read request.
*
* @return true if the session was re-connected
* @throws SQLException if the connection or session is closed
*/
protected boolean checkClosed() throws SQLException {
return checkClosed(false);
}
/**
* Check if this connection is closed.
* The next operation may be a write request.
*
* @return true if the session was re-connected
* @throws SQLException if the connection or session is closed
*/
private boolean checkClosedForWrite() throws SQLException {
return checkClosed(true);
}
/**
* INTERNAL
* Check if this connection is closed.
*
* @param write if the next operation is possibly writing
* @return true if the session was re-connected
* @throws SQLException if the connection or session is closed
*/
protected boolean checkClosed(boolean write) throws SQLException {
if (session == null) {
throw Message.getSQLException(ErrorCode.OBJECT_CLOSED);
}
if (session.isClosed()) {
throw Message.getSQLException(ErrorCode.DATABASE_CALLED_AT_SHUTDOWN);
}
if (session.isReconnectNeeded()) {
if (session.isReconnectNeeded(write)) {
trace.debug("reconnect");
session = session.reconnect();
setTrace(session.getTrace());
......@@ -1340,7 +1366,7 @@ public class JdbcConnection extends TraceObject implements Connection {
try {
int id = getNextId(TraceObject.CLOB);
debugCodeAssign("Clob", TraceObject.CLOB, id, "createClob()");
checkClosed();
checkClosedForWrite();
ValueLob v = ValueLob.createSmallLob(Value.CLOB, new byte[0]);
return new JdbcClob(this, v, id);
} catch (Exception e) {
......@@ -1357,7 +1383,7 @@ public class JdbcConnection extends TraceObject implements Connection {
try {
int id = getNextId(TraceObject.BLOB);
debugCodeAssign("Blob", TraceObject.BLOB, id, "createClob()");
checkClosed();
checkClosedForWrite();
ValueLob v = ValueLob.createSmallLob(Value.BLOB, new byte[0]);
return new JdbcBlob(this, v, id);
} catch (Exception e) {
......@@ -1375,7 +1401,7 @@ public class JdbcConnection extends TraceObject implements Connection {
try {
int id = getNextId(TraceObject.CLOB);
debugCodeAssign("NClob", TraceObject.CLOB, id, "createNClob()");
checkClosed();
checkClosedForWrite();
ValueLob v = ValueLob.createSmallLob(Value.CLOB, new byte[0]);
return new JdbcClob(this, v, id);
} catch (Exception e) {
......
......@@ -124,7 +124,7 @@ public class JdbcPreparedStatement extends JdbcStatement implements PreparedStat
public int executeUpdate() throws SQLException {
try {
debugCodeCall("executeUpdate");
checkClosed();
checkClosedForWrite();
return executeUpdateInternal();
} catch (Exception e) {
throw logAndConvert(e);
......@@ -159,7 +159,7 @@ public class JdbcPreparedStatement extends JdbcStatement implements PreparedStat
if (isDebugEnabled()) {
debugCodeCall("execute");
}
checkClosed();
checkClosedForWrite();
closeOldResultSet();
boolean returnsResultSet;
synchronized (conn.getSession()) {
......@@ -703,7 +703,7 @@ public class JdbcPreparedStatement extends JdbcStatement implements PreparedStat
if (isDebugEnabled()) {
debugCode("setBlob("+parameterIndex+", x);");
}
checkClosed();
checkClosedForWrite();
Value v;
if (x == null) {
v = ValueNull.INSTANCE;
......@@ -728,7 +728,7 @@ public class JdbcPreparedStatement extends JdbcStatement implements PreparedStat
if (isDebugEnabled()) {
debugCode("setBlob("+parameterIndex+", x);");
}
checkClosed();
checkClosedForWrite();
Value v = conn.createBlob(x, -1);
setParameter(parameterIndex, v);
} catch (Exception e) {
......@@ -748,7 +748,7 @@ public class JdbcPreparedStatement extends JdbcStatement implements PreparedStat
if (isDebugEnabled()) {
debugCode("setClob("+parameterIndex+", x);");
}
checkClosed();
checkClosedForWrite();
Value v;
if (x == null) {
v = ValueNull.INSTANCE;
......@@ -773,7 +773,7 @@ public class JdbcPreparedStatement extends JdbcStatement implements PreparedStat
if (isDebugEnabled()) {
debugCode("setClob("+parameterIndex+", x);");
}
checkClosed();
checkClosedForWrite();
Value v;
if (x == null) {
v = ValueNull.INSTANCE;
......@@ -832,7 +832,7 @@ public class JdbcPreparedStatement extends JdbcStatement implements PreparedStat
if (isDebugEnabled()) {
debugCode("setBinaryStream("+parameterIndex+", x, "+length+"L);");
}
checkClosed();
checkClosedForWrite();
Value v = conn.createBlob(x, length);
setParameter(parameterIndex, v);
} catch (Exception e) {
......@@ -888,7 +888,7 @@ public class JdbcPreparedStatement extends JdbcStatement implements PreparedStat
if (isDebugEnabled()) {
debugCode("setAsciiStream("+parameterIndex+", x, "+length+"L);");
}
checkClosed();
checkClosedForWrite();
Value v = conn.createClob(IOUtils.getAsciiReader(x), length);
setParameter(parameterIndex, v);
} catch (Exception e) {
......@@ -943,7 +943,7 @@ public class JdbcPreparedStatement extends JdbcStatement implements PreparedStat
if (isDebugEnabled()) {
debugCode("setCharacterStream("+parameterIndex+", x, "+length+"L);");
}
checkClosed();
checkClosedForWrite();
Value v = conn.createClob(x, length);
setParameter(parameterIndex, v);
} catch (Exception e) {
......@@ -1031,7 +1031,7 @@ public class JdbcPreparedStatement extends JdbcStatement implements PreparedStat
public int[] executeBatch() throws SQLException {
try {
debugCodeCall("executeBatch");
checkClosed();
checkClosedForWrite();
if (batchParameters == null) {
// TODO batch: check what other database do if no parameters are set
batchParameters = new ObjectArray();
......@@ -1081,7 +1081,7 @@ public class JdbcPreparedStatement extends JdbcStatement implements PreparedStat
public void addBatch() throws SQLException {
try {
debugCodeCall("addBatch");
checkClosed();
checkClosedForWrite();
ObjectArray parameters = command.getParameters();
Value[] set = new Value[parameters.size()];
for (int i = 0; i < parameters.size(); i++) {
......@@ -1271,7 +1271,7 @@ public class JdbcPreparedStatement extends JdbcStatement implements PreparedStat
if (isDebugEnabled()) {
debugCode("setNCharacterStream("+parameterIndex+", x, "+length+"L);");
}
checkClosed();
checkClosedForWrite();
Value v = conn.createClob(x, length);
setParameter(parameterIndex, v);
} catch (Exception e) {
......@@ -1303,7 +1303,7 @@ public class JdbcPreparedStatement extends JdbcStatement implements PreparedStat
if (isDebugEnabled()) {
debugCode("setNClob("+parameterIndex+", x);");
}
checkClosed();
checkClosedForWrite();
Value v;
if (x == null) {
v = ValueNull.INSTANCE;
......@@ -1329,7 +1329,7 @@ public class JdbcPreparedStatement extends JdbcStatement implements PreparedStat
if (isDebugEnabled()) {
debugCode("setNClob("+parameterIndex+", x);");
}
checkClosed();
checkClosedForWrite();
Value v = conn.createClob(x, -1);
setParameter(parameterIndex, v);
} catch (Exception e) {
......@@ -1349,7 +1349,7 @@ public class JdbcPreparedStatement extends JdbcStatement implements PreparedStat
if (isDebugEnabled()) {
debugCode("setClob("+parameterIndex+", x, "+length+"L);");
}
checkClosed();
checkClosedForWrite();
Value v = conn.createClob(x, length);
setParameter(parameterIndex, v);
} catch (Exception e) {
......@@ -1369,7 +1369,7 @@ public class JdbcPreparedStatement extends JdbcStatement implements PreparedStat
if (isDebugEnabled()) {
debugCode("setBlob("+parameterIndex+", x, "+length+"L);");
}
checkClosed();
checkClosedForWrite();
Value v = conn.createBlob(x, length);
setParameter(parameterIndex, v);
} catch (Exception e) {
......@@ -1389,7 +1389,7 @@ public class JdbcPreparedStatement extends JdbcStatement implements PreparedStat
if (isDebugEnabled()) {
debugCode("setNClob("+parameterIndex+", x, "+length+"L);");
}
checkClosed();
checkClosedForWrite();
Value v = conn.createClob(x, length);
setParameter(parameterIndex, v);
} catch (Exception e) {
......@@ -1413,10 +1413,20 @@ public class JdbcPreparedStatement extends JdbcStatement implements PreparedStat
return getTraceObjectName() + ": " + command;
}
boolean checkClosed() throws SQLException {
if (super.checkClosed()) {
protected boolean checkClosed(boolean write) throws SQLException {
if (super.checkClosed(write)) {
// if the session was re-connected, re-prepare the statement
ObjectArray oldParams = command.getParameters();
command = conn.prepareCommand(sql, fetchSize);
ObjectArray newParams = command.getParameters();
for (int i = 0; i < oldParams.size(); i++) {
ParameterInterface old = (ParameterInterface) oldParams.get(i);
Value value = old.getParamValue();
if (value != null) {
ParameterInterface n = (ParameterInterface) newParams.get(i);
n.setValue(value, false);
}
}
return true;
}
return false;
......
......@@ -105,7 +105,7 @@ public class JdbcStatement extends TraceObject implements Statement {
public int executeUpdate(String sql) throws SQLException {
try {
debugCodeCall("executeUpdate", sql);
checkClosed();
checkClosedForWrite();
closeOldResultSet();
if (escapeProcessing) {
sql = conn.translateSQL(sql);
......@@ -144,7 +144,7 @@ public class JdbcStatement extends TraceObject implements Statement {
if (isDebugEnabled()) {
debugCodeCall("execute", sql);
}
checkClosed();
checkClosedForWrite();
closeOldResultSet();
if (escapeProcessing) {
sql = conn.translateSQL(sql);
......@@ -610,7 +610,7 @@ public class JdbcStatement extends TraceObject implements Statement {
public int[] executeBatch() throws SQLException {
try {
debugCodeCall("executeBatch");
checkClosed();
checkClosedForWrite();
if (batchCommands == null) {
// TODO batch: check what other database do if no commands are set
batchCommands = new ObjectArray();
......@@ -843,23 +843,48 @@ public class JdbcStatement extends TraceObject implements Statement {
// =============================================================
/**
* Check if this connection is closed.
* The next operation is a read request.
*
* @return true if the session was re-connected
* @throws SQLException if the connection or session is closed
*/
boolean checkClosed() throws SQLException {
return checkClosed(false);
}
/**
* Check if this connection is closed.
* The next operation may be a write request.
*
* @return true if the session was re-connected
* @throws SQLException if the connection or session is closed
*/
boolean checkClosedForWrite() throws SQLException {
return checkClosed(true);
}
/**
* Check if the statement is closed.
*
* @param write if the next operation is possibly writing
* @return true if a reconnect was required
* @throws SQLException if it is closed
*/
boolean checkClosed() throws SQLException {
protected boolean checkClosed(boolean write) throws SQLException {
if (conn == null) {
throw Message.getSQLException(ErrorCode.OBJECT_CLOSED);
}
if (conn.checkClosed()) {
if (!conn.checkClosed(write)) {
return false;
}
do {
session = conn.getSession();
setTrace(session.getTrace());
} while (conn.checkClosed(write));
return true;
}
return false;
}
/**
* Close and old result set if there is still one open.
......
......@@ -494,11 +494,11 @@ implements XAConnection, XAResource
return isClosed || super.isClosed();
}
protected synchronized boolean checkClosed() throws SQLException {
protected synchronized boolean checkClosed(boolean write) throws SQLException {
if (isClosed) {
throw Message.getSQLException(ErrorCode.OBJECT_CLOSED);
}
return super.checkClosed();
return super.checkClosed(write);
}
}
......
......@@ -240,14 +240,16 @@ public class LogSystem {
/**
* Roll back any uncommitted transactions if required, and apply committed
* changed to the data files.
*
* @return if recovery was needed
*/
public void recover() throws SQLException {
public boolean recover() throws SQLException {
if (database == null) {
return;
return false;
}
synchronized (database) {
if (closed) {
return;
return false;
}
undo = new ObjectArray();
for (int i = 0; i < activeLogs.size(); i++) {
......@@ -282,7 +284,7 @@ public class LogSystem {
if (!readOnly && fileChanged && !containsInDoubtTransactions()) {
checkpoint();
}
return;
return fileChanged;
}
}
......@@ -514,7 +516,6 @@ public class LogSystem {
return;
}
database.checkWritingAllowed();
database.beforeWriting();
if (!file.isDataFile()) {
storageId = -storageId;
}
......@@ -541,7 +542,6 @@ public class LogSystem {
return;
}
database.checkWritingAllowed();
database.beforeWriting();
int storageId = record.getStorageId();
if (!file.isDataFile()) {
storageId = -storageId;
......@@ -569,6 +569,7 @@ public class LogSystem {
if (closed || disabled) {
return;
}
database.checkWritingAllowed();
flushAndCloseUnused();
currentLog = new LogFile(this, currentLog.getId() + 1, fileNamePrefix);
activeLogs.add(currentLog);
......@@ -668,7 +669,7 @@ public class LogSystem {
*
* @param readOnly the new value
*/
void setReadOnly(boolean readOnly) {
public void setReadOnly(boolean readOnly) {
this.readOnly = readOnly;
}
......@@ -706,4 +707,13 @@ public class LogSystem {
return accessMode;
}
/**
* Get the write position.
*
* @return the write position
*/
public String getWritePos() {
return currentLog.getId() + "/" + currentLog.getPos();
}
}
......@@ -200,8 +200,10 @@ public class FileLock {
/**
* Save the lock file.
*
* @return the saved properties
*/
public void save() throws SQLException {
public Properties save() throws SQLException {
try {
OutputStream out = fs.openFileOutputStream(fileName, false);
try {
......@@ -213,6 +215,7 @@ public class FileLock {
if (trace.isDebugEnabled()) {
trace.debug("save " + properties);
}
return properties;
} catch (IOException e) {
throw getExceptionFatal("Could not save properties " + fileName, e);
}
......@@ -301,9 +304,21 @@ public class FileLock {
private void lockSerialized() throws SQLException {
method = SERIALIZED;
if (fs.createNewFile(fileName)) {
properties = new SortedProperties();
properties.setProperty("method", String.valueOf(method));
setUniqueId();
save();
} else {
while (true) {
try {
properties = load();
} catch (SQLException e) {
// ignore
}
return;
}
}
}
private void lockFile() throws SQLException {
......
......@@ -8,18 +8,13 @@ package org.h2.store;
import java.lang.ref.WeakReference;
import java.sql.SQLException;
import org.h2.constant.SysProperties;
import org.h2.engine.Constants;
import org.h2.engine.Database;
import org.h2.engine.DbObject;
import org.h2.index.BtreeIndex;
import org.h2.log.LogSystem;
import org.h2.message.Trace;
import org.h2.message.TraceSystem;
import org.h2.table.Table;
import org.h2.util.FileUtils;
import org.h2.util.ObjectArray;
/**
* The writer thread is responsible to flush the transaction log file from time
......@@ -91,35 +86,12 @@ public class WriterThread implements Runnable {
return log;
}
private void flushIndexes(Database database) {
private void flushIndexesIfRequired(Database database) {
long time = System.currentTimeMillis();
if (lastIndexFlush + Constants.FLUSH_INDEX_DELAY > time) {
return;
}
synchronized (database) {
ObjectArray array = database.getAllSchemaObjects(DbObject.INDEX);
for (int i = 0; i < array.size(); i++) {
DbObject obj = (DbObject) array.get(i);
if (obj instanceof BtreeIndex) {
BtreeIndex idx = (BtreeIndex) obj;
if (idx.getLastChange() == 0) {
continue;
}
Table tab = idx.getTable();
if (tab.isLockedExclusively()) {
continue;
}
if (idx.getLastChange() + Constants.FLUSH_INDEX_DELAY > time) {
continue;
}
try {
idx.flush(database.getSystemSession());
} catch (SQLException e) {
database.getTrace(Trace.DATABASE).error("flush index " + idx.getName(), e);
}
}
}
}
database.flushIndexes(time - Constants.FLUSH_INDEX_DELAY);
lastIndexFlush = time;
}
......@@ -141,7 +113,7 @@ public class WriterThread implements Runnable {
break;
}
if (Constants.FLUSH_INDEX_DELAY != 0) {
flushIndexes(database);
flushIndexesIfRequired(database);
}
// checkpoint if required
......
......@@ -34,6 +34,8 @@ public class TestFileLockSerialized extends TestBase {
public void test() throws Exception {
Class.forName("org.h2.Driver");
testThreeMostlyReaders(true);
testThreeMostlyReaders(false);
testTwoReaders();
testTwoWriters();
testPendingWrite();
......@@ -41,6 +43,55 @@ public class TestFileLockSerialized extends TestBase {
testConcurrentReadWrite();
}
private void testThreeMostlyReaders(final boolean write) throws Exception {
deleteDb("fileLockSerialized");
String url = "jdbc:h2:" + baseDir + "/fileLockSerialized;FILE_LOCK=SERIALIZED;OPEN_NEW=TRUE";
int len = 3;
final Exception[] ex = new Exception[1];
final Connection[] conn = new Connection[len];
final boolean[] stop = new boolean[1];
Thread[] threads = new Thread[len];
for (int i = 0; i < len; i++) {
final Connection c = DriverManager.getConnection(url);
conn[i] = c;
if (i == 0) {
conn[i].createStatement().execute("create table test(id int) as select 1");
}
Thread t = new Thread(new Runnable() {
public void run() {
try {
PreparedStatement p = c.prepareStatement("select * from test where id = ?");
while (!stop[0]) {
if (write) {
if (Math.random() > 0.9) {
c.createStatement().execute("update test set id = id");
}
}
p.setInt(1, 1);
Thread.sleep(10);
p.executeQuery();
p.clearParameters();
}
c.close();
} catch (Exception e) {
ex[0] = e;
}
}
});
t.start();
threads[i] = t;
}
Thread.sleep(1000);
stop[0] = true;
for (int i = 0; i < len; i++) {
threads[i].join();
}
if (ex[0] != null) {
throw ex[0];
}
DriverManager.getConnection(url).close();
}
private void testTwoReaders() throws Exception {
deleteDb("fileLockSerialized");
String url = "jdbc:h2:" + baseDir + "/fileLockSerialized;FILE_LOCK=SERIALIZED;OPEN_NEW=TRUE";
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论