提交 945c201f authored 作者: Thomas Mueller's avatar Thomas Mueller

New experimental server-less multi-connection mode

上级 c142344e
...@@ -4085,6 +4085,10 @@ public class Parser { ...@@ -4085,6 +4085,10 @@ public class Parser {
readIfEqualOrTo(); readIfEqualOrTo();
read(); read();
return new NoOperation(session); return new NoOperation(session);
} else if (readIf("OPEN_NEW")) {
readIfEqualOrTo();
read();
return new NoOperation(session);
} else if (readIf("RECOVER")) { } else if (readIf("RECOVER")) {
readIfEqualOrTo(); readIfEqualOrTo();
read(); read();
......
...@@ -368,6 +368,12 @@ public abstract class Prepared { ...@@ -368,6 +368,12 @@ public abstract class Prepared {
return sqlStatement; return sqlStatement;
} }
/**
* Get the SQL snippet of the value list.
*
* @param values the value list
* @return the SQL snippet
*/
protected String getSQL(Value[] values) { protected String getSQL(Value[] values) {
StringBuffer buff = new StringBuffer(); StringBuffer buff = new StringBuffer();
for (int i = 0; i < values.length; i++) { for (int i = 0; i < values.length; i++) {
...@@ -382,6 +388,12 @@ public abstract class Prepared { ...@@ -382,6 +388,12 @@ public abstract class Prepared {
return buff.toString(); return buff.toString();
} }
/**
* Get the SQL snippet of the expression list.
*
* @param list the expression list
* @return the SQL snippet
*/
protected String getSQL(Expression[] list) { protected String getSQL(Expression[] list) {
StringBuffer buff = new StringBuffer(); StringBuffer buff = new StringBuffer();
for (int i = 0; i < list.length; i++) { for (int i = 0; i < list.length; i++) {
...@@ -396,7 +408,14 @@ public abstract class Prepared { ...@@ -396,7 +408,14 @@ public abstract class Prepared {
return buff.toString(); return buff.toString();
} }
/**
* Set the SQL statement of the exception to the given row.
*
* @param ex the exception
* @param rowId the row number
* @param values the values of the row
* @return the exception
*/
protected SQLException setRow(SQLException ex, int rowId, String values) { protected SQLException setRow(SQLException ex, int rowId, String values) {
if (ex instanceof JdbcSQLException) { if (ex instanceof JdbcSQLException) {
JdbcSQLException e = (JdbcSQLException) ex; JdbcSQLException e = (JdbcSQLException) ex;
......
...@@ -9,13 +9,11 @@ package org.h2.command.dml; ...@@ -9,13 +9,11 @@ package org.h2.command.dml;
import java.sql.SQLException; import java.sql.SQLException;
import org.h2.command.Prepared; import org.h2.command.Prepared;
import org.h2.constant.SysProperties;
import org.h2.engine.Database; import org.h2.engine.Database;
import org.h2.engine.Session; import org.h2.engine.Session;
import org.h2.log.LogSystem; import org.h2.log.LogSystem;
import org.h2.message.Message; import org.h2.message.Message;
import org.h2.result.LocalResult; import org.h2.result.LocalResult;
import org.h2.store.PageStore;
/** /**
* Represents a transactional statement. * Represents a transactional statement.
...@@ -124,12 +122,7 @@ public class TransactionCommand extends Prepared { ...@@ -124,12 +122,7 @@ public class TransactionCommand extends Prepared {
break; break;
case CHECKPOINT: case CHECKPOINT:
session.getUser().checkAdmin(); session.getUser().checkAdmin();
if (SysProperties.PAGE_STORE) { session.getDatabase().checkpoint();
PageStore store = session.getDatabase().getPageStore();
store.checkpoint();
}
session.getDatabase().getLog().checkpoint();
session.getDatabase().getTempFileDeleter().deleteUnused();
break; break;
case SAVEPOINT: case SAVEPOINT:
session.addSavepoint(savepointName); session.addSavepoint(savepointName);
......
...@@ -446,6 +446,15 @@ public class SysProperties { ...@@ -446,6 +446,15 @@ public class SysProperties {
*/ */
public static final boolean RECOMPILE_ALWAYS = getBooleanSetting("h2.recompileAlways", false); public static final boolean RECOMPILE_ALWAYS = getBooleanSetting("h2.recompileAlways", false);
/**
* System property <code>h2.reconnectCheckDelay</code> (default: 250).<br />
* Check the .lock.db file every this many milliseconds to detect that the
* database was changed. The process writing to the database must first
* notify a change in the .lock.db file, then wait twice this many
* milliseconds before updating the database.
*/
public static final int RECONNECT_CHECK_DELAY = getIntSetting("h2.reconnectCheckDelay", 250);
/** /**
* System property <code>h2.redoBufferSize</code> (default: 262144).<br /> * System property <code>h2.redoBufferSize</code> (default: 262144).<br />
* Size of the redo buffer (used at startup when recovering). * Size of the redo buffer (used at startup when recovering).
...@@ -558,6 +567,11 @@ public class SysProperties { ...@@ -558,6 +567,11 @@ public class SysProperties {
*/ */
public static final int COLLATOR_CACHE_SIZE = getCollatorCacheSize(); public static final int COLLATOR_CACHE_SIZE = getCollatorCacheSize();
/**
* The current time this class was loaded (in milliseconds).
*/
public static final long TIME_START = System.currentTimeMillis();
private static final String H2_BASE_DIR = "h2.baseDir"; private static final String H2_BASE_DIR = "h2.baseDir";
private SysProperties() { private SysProperties() {
......
...@@ -277,6 +277,18 @@ public class ConnectionInfo implements Cloneable { ...@@ -277,6 +277,18 @@ public class ConnectionInfo implements Cloneable {
userPasswordHash = sha.getKeyPasswordHash(user, password); userPasswordHash = sha.getKeyPasswordHash(user, password);
} }
/**
* Get a boolean property if it is set and return the value.
*
* @param key the property name
* @param defaultValue the default value
* @return the value
*/
public boolean getProperty(String key, boolean defaultValue) {
String x = getProperty(key, null);
return x == null ? defaultValue : Boolean.valueOf(x).booleanValue();
}
/** /**
* Remove a boolean property if it is set and return the value. * Remove a boolean property if it is set and return the value.
* *
......
...@@ -13,6 +13,7 @@ import java.util.Collections; ...@@ -13,6 +13,7 @@ import java.util.Collections;
import java.util.HashMap; import java.util.HashMap;
import java.util.HashSet; import java.util.HashSet;
import java.util.Iterator; import java.util.Iterator;
import java.util.Properties;
import java.util.Set; import java.util.Set;
import java.util.StringTokenizer; import java.util.StringTokenizer;
...@@ -165,6 +166,10 @@ public class Database implements DataHandler { ...@@ -165,6 +166,10 @@ public class Database implements DataHandler {
private TempFileDeleter tempFileDeleter = TempFileDeleter.getInstance(); private TempFileDeleter tempFileDeleter = TempFileDeleter.getInstance();
private PageStore pageStore; private PageStore pageStore;
private Properties reconnectLastLock;
private long reconnectCheckNext;
private boolean reconnectChangePending;
public Database(String name, ConnectionInfo ci, String cipher) throws SQLException { public Database(String name, ConnectionInfo ci, String cipher) throws SQLException {
this.compareMode = new CompareMode(null, null, 0); this.compareMode = new CompareMode(null, null, 0);
this.persistent = ci.isPersistent(); this.persistent = ci.isPersistent();
...@@ -172,15 +177,18 @@ public class Database implements DataHandler { ...@@ -172,15 +177,18 @@ public class Database implements DataHandler {
this.databaseName = name; this.databaseName = name;
this.databaseShortName = parseDatabaseShortName(); this.databaseShortName = parseDatabaseShortName();
this.cipher = cipher; this.cipher = cipher;
String lockMethodName = ci.removeProperty("FILE_LOCK", null); String lockMethodName = ci.getProperty("FILE_LOCK", null);
this.accessModeLog = ci.removeProperty("ACCESS_MODE_LOG", "rw").toLowerCase(); this.accessModeLog = ci.getProperty("ACCESS_MODE_LOG", "rw").toLowerCase();
this.accessModeData = ci.removeProperty("ACCESS_MODE_DATA", "rw").toLowerCase(); this.accessModeData = ci.getProperty("ACCESS_MODE_DATA", "rw").toLowerCase();
this.autoServerMode = ci.removeProperty("AUTO_SERVER", false); this.autoServerMode = ci.getProperty("AUTO_SERVER", false);
if ("r".equals(accessModeData)) { if ("r".equals(accessModeData)) {
readOnly = true; readOnly = true;
accessModeLog = "r"; accessModeLog = "r";
} }
this.fileLockMethod = FileLock.getFileLockMethod(lockMethodName); this.fileLockMethod = FileLock.getFileLockMethod(lockMethodName);
if (fileLockMethod == FileLock.LOCK_SERIALIZED) {
writeDelay = SysProperties.MIN_WRITE_DELAY;
}
this.databaseURL = ci.getURL(); this.databaseURL = ci.getURL();
this.eventListener = ci.getDatabaseEventListenerObject(); this.eventListener = ci.getDatabaseEventListenerObject();
ci.removeDatabaseEventListenerObject(); ci.removeDatabaseEventListenerObject();
...@@ -199,12 +207,16 @@ public class Database implements DataHandler { ...@@ -199,12 +207,16 @@ public class Database implements DataHandler {
if (ignoreSummary != null) { if (ignoreSummary != null) {
this.recovery = true; this.recovery = true;
} }
this.multiVersion = ci.removeProperty("MVCC", false); this.multiVersion = ci.getProperty("MVCC", false);
boolean closeAtVmShutdown = ci.removeProperty("DB_CLOSE_ON_EXIT", true); boolean closeAtVmShutdown = ci.getProperty("DB_CLOSE_ON_EXIT", true);
int traceLevelFile = ci.getIntProperty(SetTypes.TRACE_LEVEL_FILE, TraceSystem.DEFAULT_TRACE_LEVEL_FILE); int traceLevelFile = ci.getIntProperty(SetTypes.TRACE_LEVEL_FILE, TraceSystem.DEFAULT_TRACE_LEVEL_FILE);
int traceLevelSystemOut = ci.getIntProperty(SetTypes.TRACE_LEVEL_SYSTEM_OUT, int traceLevelSystemOut = ci.getIntProperty(SetTypes.TRACE_LEVEL_SYSTEM_OUT,
TraceSystem.DEFAULT_TRACE_LEVEL_SYSTEM_OUT); TraceSystem.DEFAULT_TRACE_LEVEL_SYSTEM_OUT);
this.cacheType = StringUtils.toUpperEnglish(ci.removeProperty("CACHE_TYPE", CacheLRU.TYPE_NAME)); this.cacheType = StringUtils.toUpperEnglish(ci.removeProperty("CACHE_TYPE", CacheLRU.TYPE_NAME));
openDatabase(traceLevelFile, traceLevelSystemOut, closeAtVmShutdown);
}
private void openDatabase(int traceLevelFile, int traceLevelSystemOut, boolean closeAtVmShutdown) throws SQLException {
try { try {
open(traceLevelFile, traceLevelSystemOut); open(traceLevelFile, traceLevelSystemOut);
if (closeAtVmShutdown) { if (closeAtVmShutdown) {
...@@ -295,6 +307,25 @@ public class Database implements DataHandler { ...@@ -295,6 +307,25 @@ public class Database implements DataHandler {
return modificationDataId; return modificationDataId;
} }
private void reconnectModified(boolean pending) {
if (readOnly || pending == reconnectChangePending || lock == null) {
return;
}
try {
if (pending) {
getTrace().debug("wait before writing");
Thread.sleep((int) (SysProperties.RECONNECT_CHECK_DELAY * 1.1));
}
lock.setProperty("modificationDataId", Long.toString(modificationDataId));
lock.setProperty("modificationMetaId", Long.toString(modificationMetaId));
lock.setProperty("changePending", pending ? "true" : null);
lock.save();
reconnectChangePending = pending;
} catch (Exception e) {
getTrace().error("pending:"+ pending, e);
}
}
public long getNextModificationDataId() { public long getNextModificationDataId() {
return ++modificationDataId; return ++modificationDataId;
} }
...@@ -478,12 +509,14 @@ public class Database implements DataHandler { ...@@ -478,12 +509,14 @@ public class Database implements DataHandler {
} }
} }
if (!readOnly && fileLockMethod != FileLock.LOCK_NO) { if (!readOnly && fileLockMethod != FileLock.LOCK_NO) {
lock = new FileLock(traceSystem, Constants.LOCK_SLEEP); lock = new FileLock(traceSystem, databaseName + Constants.SUFFIX_LOCK_FILE, Constants.LOCK_SLEEP);
lock.lock(databaseName + Constants.SUFFIX_LOCK_FILE, fileLockMethod == FileLock.LOCK_SOCKET); lock.lock(fileLockMethod);
if (autoServerMode) { if (autoServerMode) {
startServer(lock.getUniqueId()); startServer(lock.getUniqueId());
} }
} }
// wait until pending changes are written
isReconnectNeeded();
if (SysProperties.PAGE_STORE) { if (SysProperties.PAGE_STORE) {
PageStore store = getPageStore(); PageStore store = getPageStore();
if (!store.isNew()) { if (!store.isNew()) {
...@@ -602,7 +635,8 @@ public class Database implements DataHandler { ...@@ -602,7 +635,8 @@ public class Database implements DataHandler {
"-key", key, databaseName}); "-key", key, databaseName});
server.start(); server.start();
String address = NetUtils.getLocalAddress() + ":" + server.getPort(); String address = NetUtils.getLocalAddress() + ":" + server.getPort();
lock.addProperty("server", address); lock.setProperty("server", address);
lock.save();
} }
private void stopServer() { private void stopServer() {
...@@ -2138,4 +2172,83 @@ public class Database implements DataHandler { ...@@ -2138,4 +2172,83 @@ public class Database implements DataHandler {
return null; return null;
} }
public boolean isReconnectNeeded() {
if (fileLockMethod != FileLock.LOCK_SERIALIZED) {
return false;
}
long now = System.currentTimeMillis();
if (now < reconnectCheckNext) {
return false;
}
reconnectCheckNext = now + SysProperties.RECONNECT_CHECK_DELAY;
if (lock == null) {
lock = new FileLock(traceSystem, databaseName + Constants.SUFFIX_LOCK_FILE, Constants.LOCK_SLEEP);
}
Properties prop;
try {
while (true) {
prop = lock.load();
if (prop.equals(reconnectLastLock)) {
return false;
}
if (prop.getProperty("changePending", null) == null) {
break;
}
getTrace().debug("delay (change pending)");
Thread.sleep(SysProperties.RECONNECT_CHECK_DELAY);
}
reconnectLastLock = prop;
} catch (Exception e) {
getTrace().error("readOnly:" + readOnly, e);
// ignore
}
return true;
}
/**
* This method is called after writing to the database.
*/
public void afterWriting() throws SQLException {
if (fileLockMethod != FileLock.LOCK_SERIALIZED || readOnly) {
return;
}
reconnectCheckNext = System.currentTimeMillis() + 1;
}
/**
* Flush all changes when using the serialized mode, and if there are
* pending changes.
*/
public void checkpointIfRequired() throws SQLException {
if (fileLockMethod != FileLock.LOCK_SERIALIZED || readOnly || !reconnectChangePending) {
return;
}
long now = System.currentTimeMillis();
if (now > reconnectCheckNext) {
getTrace().debug("checkpoint");
checkpoint();
reconnectModified(false);
}
}
/**
* Flush all changes and open a new log file.
*/
public void checkpoint() throws SQLException {
if (SysProperties.PAGE_STORE) {
pageStore.checkpoint();
}
getLog().checkpoint();
getTempFileDeleter().deleteUnused();
}
/**
* This method is called before writing to the log file.
*/
public void beforeWriting() {
if (fileLockMethod == FileLock.LOCK_SERIALIZED) {
reconnectModified(true);
}
}
} }
...@@ -16,6 +16,7 @@ import org.h2.constant.ErrorCode; ...@@ -16,6 +16,7 @@ import org.h2.constant.ErrorCode;
import org.h2.constant.SysProperties; import org.h2.constant.SysProperties;
import org.h2.message.Message; import org.h2.message.Message;
import org.h2.message.Trace; import org.h2.message.Trace;
import org.h2.store.FileLock;
import org.h2.util.RandomUtils; import org.h2.util.RandomUtils;
import org.h2.util.StringUtils; import org.h2.util.StringUtils;
...@@ -41,7 +42,7 @@ public class Engine { ...@@ -41,7 +42,7 @@ public class Engine {
private Session openSession(ConnectionInfo ci, boolean ifExists, String cipher) throws SQLException { private Session openSession(ConnectionInfo ci, boolean ifExists, String cipher) throws SQLException {
String name = ci.getName(); String name = ci.getName();
Database database; Database database;
boolean openNew = ci.removeProperty("OPEN_NEW", false); boolean openNew = ci.getProperty("OPEN_NEW", false);
if (openNew || ci.isUnnamedInMemory()) { if (openNew || ci.isUnnamedInMemory()) {
database = null; database = null;
} else { } else {
...@@ -105,8 +106,20 @@ public class Engine { ...@@ -105,8 +106,20 @@ public class Engine {
*/ */
public Session getSession(ConnectionInfo ci) throws SQLException { public Session getSession(ConnectionInfo ci) throws SQLException {
try { try {
ConnectionInfo backup = null;
String lockMethodName = ci.getProperty("FILE_LOCK", null);
int fileLockMethod = FileLock.getFileLockMethod(lockMethodName);
if (fileLockMethod == FileLock.LOCK_SERIALIZED) {
try {
backup = (ConnectionInfo) ci.clone();
} catch (CloneNotSupportedException e) {
}
}
Session session = openSession(ci); Session session = openSession(ci);
validateUserAndPassword(true); validateUserAndPassword(true);
if (backup != null) {
session.setConnectionInfo(backup);
}
return session; return session;
} catch (SQLException e) { } catch (SQLException e) {
if (e.getErrorCode() == ErrorCode.WRONG_USER_OR_PASSWORD) { if (e.getErrorCode() == ErrorCode.WRONG_USER_OR_PASSWORD) {
......
...@@ -46,7 +46,7 @@ import org.h2.value.ValueNull; ...@@ -46,7 +46,7 @@ import org.h2.value.ValueNull;
* mode, this object resides on the server side and communicates with a * mode, this object resides on the server side and communicates with a
* SessionRemote object on the client side. * SessionRemote object on the client side.
*/ */
public class Session implements SessionInterface { public class Session extends SessionWithState {
/** /**
* The prefix of generated identifiers. It may not have letters, because * The prefix of generated identifiers. It may not have letters, because
...@@ -56,9 +56,10 @@ public class Session implements SessionInterface { ...@@ -56,9 +56,10 @@ public class Session implements SessionInterface {
private static int nextSerialId; private static int nextSerialId;
private final int serialId = nextSerialId++; private final int serialId = nextSerialId++;
private Database database;
private ConnectionInfo connectionInfo;
private User user; private User user;
private int id; private int id;
private Database database;
private ObjectArray locks = new ObjectArray(); private ObjectArray locks = new ObjectArray();
private UndoLog undoLog; private UndoLog undoLog;
private boolean autoCommit = true; private boolean autoCommit = true;
...@@ -97,6 +98,7 @@ public class Session implements SessionInterface { ...@@ -97,6 +98,7 @@ public class Session implements SessionInterface {
private boolean commitOrRollbackDisabled; private boolean commitOrRollbackDisabled;
private Table waitForLock; private Table waitForLock;
private int modificationId; private int modificationId;
private int modificationIdState;
Session(Database database, User user, int id) { Session(Database database, User user, int id) {
this.database = database; this.database = database;
...@@ -625,12 +627,13 @@ public class Session implements SessionInterface { ...@@ -625,12 +627,13 @@ public class Session implements SessionInterface {
} }
} }
private void unlockAll() { private void unlockAll() throws SQLException {
if (SysProperties.CHECK) { if (SysProperties.CHECK) {
if (undoLog.size() > 0) { if (undoLog.size() > 0) {
Message.throwInternalError(); Message.throwInternalError();
} }
} }
database.afterWriting();
if (locks.size() > 0) { if (locks.size() > 0) {
synchronized (database) { synchronized (database) {
for (int i = 0; i < locks.size(); i++) { for (int i = 0; i < locks.size(); i++) {
...@@ -641,6 +644,10 @@ public class Session implements SessionInterface { ...@@ -641,6 +644,10 @@ public class Session implements SessionInterface {
} }
} }
savepoints = null; savepoints = null;
if (modificationIdState != modificationId) {
sessionStateChanged = true;
}
} }
private void cleanTempTables(boolean closeSession) throws SQLException { private void cleanTempTables(boolean closeSession) throws SQLException {
...@@ -1104,4 +1111,20 @@ public class Session implements SessionInterface { ...@@ -1104,4 +1111,20 @@ public class Session implements SessionInterface {
return modificationId; return modificationId;
} }
public boolean isReconnectNeeded() {
return database.isReconnectNeeded();
}
public SessionInterface reconnect() throws SQLException {
readSessionState();
Session newSession = Engine.getInstance().getSession(connectionInfo);
newSession.sessionState = sessionState;
newSession.recreateSessionState();
return newSession;
}
public void setConnectionInfo(ConnectionInfo ci) {
connectionInfo = ci;
}
} }
...@@ -72,4 +72,18 @@ public interface SessionInterface { ...@@ -72,4 +72,18 @@ public interface SessionInterface {
* Cancel the current or next command (called when closing a connection). * Cancel the current or next command (called when closing a connection).
*/ */
void cancel(); void cancel();
/**
* Check if the database changed and therefore reconnecting is required.
*
* @return true if reconnecting is required
*/
boolean isReconnectNeeded();
/**
* Close the connection and open a new connection.
*
* @return the new connection
*/
SessionInterface reconnect() throws SQLException;
} }
...@@ -41,7 +41,7 @@ import org.h2.value.ValueString; ...@@ -41,7 +41,7 @@ import org.h2.value.ValueString;
* The client side part of a session when using the server mode. This object * The client side part of a session when using the server mode. This object
* communicates with a Session on the server side. * communicates with a Session on the server side.
*/ */
public class SessionRemote implements SessionInterface, SessionFactory, DataHandler { public class SessionRemote extends SessionWithState implements SessionFactory, DataHandler {
public static final int SESSION_PREPARE = 0; public static final int SESSION_PREPARE = 0;
public static final int SESSION_CLOSE = 1; public static final int SESSION_CLOSE = 1;
...@@ -81,9 +81,6 @@ public class SessionRemote implements SessionInterface, SessionFactory, DataHand ...@@ -81,9 +81,6 @@ public class SessionRemote implements SessionInterface, SessionFactory, DataHand
private boolean autoReconnect; private boolean autoReconnect;
private int lastReconnect; private int lastReconnect;
private SessionInterface embedded; private SessionInterface embedded;
private boolean sessionStateChanged;
private ObjectArray sessionState;
private boolean sessionStateUpdating;
private DatabaseEventListener eventListener; private DatabaseEventListener eventListener;
public SessionRemote() { public SessionRemote() {
...@@ -259,7 +256,7 @@ public class SessionRemote implements SessionInterface, SessionFactory, DataHand ...@@ -259,7 +256,7 @@ public class SessionRemote implements SessionInterface, SessionFactory, DataHand
// OPEN_NEW must be removed now, otherwise // OPEN_NEW must be removed now, otherwise
// opening a session with AUTO_SERVER fails // opening a session with AUTO_SERVER fails
// if another connection is already open // if another connection is already open
backup.removeProperty("OPEN_NEW", false); backup.removeProperty("OPEN_NEW", null);
connectServer(backup); connectServer(backup);
return this; return this;
} }
...@@ -451,19 +448,7 @@ public class SessionRemote implements SessionInterface, SessionFactory, DataHand ...@@ -451,19 +448,7 @@ public class SessionRemote implements SessionInterface, SessionFactory, DataHand
// unfortunately // unfortunately
connectEmbeddedOrServer(true); connectEmbeddedOrServer(true);
} }
if (sessionState != null && sessionState.size() > 0) { recreateSessionState();
sessionStateUpdating = true;
try {
for (int i = 0; i < sessionState.size(); i++) {
String sql = (String) sessionState.get(i);
CommandInterface ci = prepareCommand(sql, Integer.MAX_VALUE);
ci.executeUpdate();
}
} finally {
sessionStateUpdating = false;
sessionStateChanged = false;
}
}
if (eventListener != null) { if (eventListener != null) {
eventListener.setProgress(DatabaseEventListener.STATE_RECONNECTED, databaseName, count, eventListener.setProgress(DatabaseEventListener.STATE_RECONNECTED, databaseName, count,
SysProperties.MAX_RECONNECT); SysProperties.MAX_RECONNECT);
...@@ -663,25 +648,16 @@ public class SessionRemote implements SessionInterface, SessionFactory, DataHand ...@@ -663,25 +648,16 @@ public class SessionRemote implements SessionInterface, SessionFactory, DataHand
return lastReconnect; return lastReconnect;
} }
/**
* Read the session state if necessary.
*/
public void readSessionState() throws SQLException {
if (!sessionStateChanged || sessionStateUpdating) {
return;
}
sessionStateChanged = false;
sessionState = new ObjectArray();
CommandInterface ci = prepareCommand("SELECT * FROM INFORMATION_SCHEMA.SESSION_STATE", Integer.MAX_VALUE);
ResultInterface result = ci.executeQuery(0, false);
while (result.next()) {
Value[] row = result.currentRow();
sessionState.add(row[1].getString());
}
}
public TempFileDeleter getTempFileDeleter() { public TempFileDeleter getTempFileDeleter() {
return TempFileDeleter.getInstance(); return TempFileDeleter.getInstance();
} }
public boolean isReconnectNeeded() {
return false;
}
public SessionInterface reconnect() {
return this;
}
} }
/*
* Copyright 2004-2009 H2 Group. Multiple-Licensed under the H2 License,
* Version 1.0, and under the Eclipse Public License, Version 1.0
* (http://h2database.com/html/license.html).
* Initial Developer: H2 Group
*/
package org.h2.engine;
import java.sql.SQLException;
import org.h2.command.CommandInterface;
import org.h2.result.ResultInterface;
import org.h2.util.ObjectArray;
import org.h2.value.Value;
/**
* The base class for both remote and embedded sessions.
*/
public abstract class SessionWithState implements SessionInterface {
protected ObjectArray sessionState;
protected boolean sessionStateChanged;
private boolean sessionStateUpdating;
/**
* Re-create the session state using the stored sessionState list.
*/
protected void recreateSessionState() throws SQLException {
if (sessionState != null && sessionState.size() > 0) {
sessionStateUpdating = true;
try {
for (int i = 0; i < sessionState.size(); i++) {
String sql = (String) sessionState.get(i);
CommandInterface ci = prepareCommand(sql, Integer.MAX_VALUE);
ci.executeUpdate();
}
} finally {
sessionStateUpdating = false;
sessionStateChanged = false;
}
}
}
/**
* Read the session state if necessary.
*/
public void readSessionState() throws SQLException {
if (!sessionStateChanged || sessionStateUpdating) {
return;
}
sessionStateChanged = false;
sessionState = new ObjectArray();
CommandInterface ci = prepareCommand("SELECT * FROM INFORMATION_SCHEMA.SESSION_STATE", Integer.MAX_VALUE);
ResultInterface result = ci.executeQuery(0, false);
while (result.next()) {
Value[] row = result.currentRow();
sessionState.add(row[1].getString());
}
}
}
...@@ -62,9 +62,6 @@ import java.sql.SQLClientInfoException; ...@@ -62,9 +62,6 @@ import java.sql.SQLClientInfoException;
* </p> * </p>
*/ */
public class JdbcConnection extends TraceObject implements Connection { public class JdbcConnection extends TraceObject implements Connection {
// TODO test: check if enough synchronization on jdbc objects
// TODO feature: auto-reconnect on lost connection
private String url; private String url;
private String user; private String user;
...@@ -1266,6 +1263,12 @@ public class JdbcConnection extends TraceObject implements Connection { ...@@ -1266,6 +1263,12 @@ public class JdbcConnection extends TraceObject implements Connection {
if (session.isClosed()) { if (session.isClosed()) {
throw Message.getSQLException(ErrorCode.DATABASE_CALLED_AT_SHUTDOWN); throw Message.getSQLException(ErrorCode.DATABASE_CALLED_AT_SHUTDOWN);
} }
if (session.isReconnectNeeded()) {
trace.debug("reconnect");
session = session.reconnect();
trace = session.getTrace();
setTrace(trace, TraceObject.CONNECTION, getTraceId());
}
} }
String getURL() throws SQLException { String getURL() throws SQLException {
......
...@@ -509,6 +509,7 @@ public class LogSystem { ...@@ -509,6 +509,7 @@ public class LogSystem {
return; return;
} }
database.checkWritingAllowed(); database.checkWritingAllowed();
database.beforeWriting();
if (!file.isDataFile()) { if (!file.isDataFile()) {
storageId = -storageId; storageId = -storageId;
} }
...@@ -535,6 +536,7 @@ public class LogSystem { ...@@ -535,6 +536,7 @@ public class LogSystem {
return; return;
} }
database.checkWritingAllowed(); database.checkWritingAllowed();
database.beforeWriting();
int storageId = record.getStorageId(); int storageId = record.getStorageId();
if (!file.isDataFile()) { if (!file.isDataFile()) {
storageId = -storageId; storageId = -storageId;
......
...@@ -178,6 +178,10 @@ public class TraceSystem implements TraceWriter { ...@@ -178,6 +178,10 @@ public class TraceSystem implements TraceWriter {
updateLevel(); updateLevel();
} }
public int getLevelSystemOut() {
return levelSystemOut;
}
/** /**
* Set the file trace level. * Set the file trace level.
* *
...@@ -209,6 +213,10 @@ public class TraceSystem implements TraceWriter { ...@@ -209,6 +213,10 @@ public class TraceSystem implements TraceWriter {
updateLevel(); updateLevel();
} }
public int getLevelFile() {
return levelFile;
}
private String format(String module, String s) { private String format(String module, String s) {
synchronized (dateFormat) { synchronized (dateFormat) {
return dateFormat.format(new Date()) + module + ": " + s; return dateFormat.format(new Date()) + module + ": " + s;
......
...@@ -1843,7 +1843,7 @@ ARRAY ...@@ -1843,7 +1843,7 @@ ARRAY
"Functions (Aggregate)","AVG"," "Functions (Aggregate)","AVG","
AVG([DISTINCT] {int | long | decimal | double}): value AVG([DISTINCT] {int | long | decimal | double}): value
"," ","
The average (mean) value. The average (mean) value.
If no rows are selected, the result is NULL. If no rows are selected, the result is NULL.
Aggregates are only allowed in select statements. Aggregates are only allowed in select statements.
"," ","
...@@ -1893,7 +1893,7 @@ GROUP_CONCAT(NAME ORDER BY ID SEPARATOR ', ') ...@@ -1893,7 +1893,7 @@ GROUP_CONCAT(NAME ORDER BY ID SEPARATOR ', ')
"Functions (Aggregate)","MAX"," "Functions (Aggregate)","MAX","
MAX(value): value MAX(value): value
"," ","
The highest value. The highest value.
If no rows are selected, the result is NULL. If no rows are selected, the result is NULL.
Aggregates are only allowed in select statements. Aggregates are only allowed in select statements.
"," ","
...@@ -1903,7 +1903,7 @@ MAX(NAME) ...@@ -1903,7 +1903,7 @@ MAX(NAME)
"Functions (Aggregate)","MIN"," "Functions (Aggregate)","MIN","
MIN(value): value MIN(value): value
"," ","
The lowest value. The lowest value.
If no rows are selected, the result is NULL. If no rows are selected, the result is NULL.
Aggregates are only allowed in select statements. Aggregates are only allowed in select statements.
"," ","
...@@ -1913,7 +1913,7 @@ MIN(NAME) ...@@ -1913,7 +1913,7 @@ MIN(NAME)
"Functions (Aggregate)","SUM"," "Functions (Aggregate)","SUM","
SUM([DISTINCT] {int | long | decimal | double}): value SUM([DISTINCT] {int | long | decimal | double}): value
"," ","
The sum of all values. The sum of all values.
If no rows are selected, the result is NULL. If no rows are selected, the result is NULL.
Aggregates are only allowed in select statements. Aggregates are only allowed in select statements.
"," ","
......
...@@ -54,8 +54,14 @@ public class FileLock { ...@@ -54,8 +54,14 @@ public class FileLock {
*/ */
public static final int LOCK_SOCKET = 2; public static final int LOCK_SOCKET = 2;
/**
* This locking method means multiple writers are allowed, and they
* synchronize themselves.
*/
public static final int LOCK_SERIALIZED = 3;
private static final String MAGIC = "FileLock"; private static final String MAGIC = "FileLock";
private static final String FILE = "file", SOCKET = "socket"; private static final String FILE = "file", SOCKET = "socket", SERIALIZED = "serialized";
private static final int RANDOM_BYTES = 16; private static final int RANDOM_BYTES = 16;
private static final int SLEEP_GAP = 25; private static final int SLEEP_GAP = 25;
private static final int TIME_GRANULARITY = 2000; private static final int TIME_GRANULARITY = 2000;
...@@ -101,30 +107,34 @@ public class FileLock { ...@@ -101,30 +107,34 @@ public class FileLock {
* @param traceSystem the trace system to use * @param traceSystem the trace system to use
* @param sleep the number of milliseconds to sleep * @param sleep the number of milliseconds to sleep
*/ */
public FileLock(TraceSystem traceSystem, int sleep) { public FileLock(TraceSystem traceSystem, String fileName, int sleep) {
this.trace = traceSystem.getTrace(Trace.FILE_LOCK); this.trace = traceSystem.getTrace(Trace.FILE_LOCK);
this.fileName = fileName;
this.sleep = sleep; this.sleep = sleep;
} }
/** /**
* Lock the file if possible. A file may only be locked once. * Lock the file if possible. A file may only be locked once.
* *
* @param fileName the name of the properties file to use * @param fileLockMethod the file locking method to use
* @param allowSocket if the socket locking protocol should be used if
* possible
* @throws SQLException if locking was not successful * @throws SQLException if locking was not successful
*/ */
public synchronized void lock(String fileName, boolean allowSocket) throws SQLException { public synchronized void lock(int fileLockMethod) throws SQLException {
this.fs = FileSystem.getInstance(fileName); this.fs = FileSystem.getInstance(fileName);
this.fileName = fileName;
checkServer(); checkServer();
if (locked) { if (locked) {
Message.throwInternalError("already locked"); Message.throwInternalError("already locked");
} }
if (allowSocket) { switch (fileLockMethod) {
lockSocket(); case LOCK_FILE:
} else {
lockFile(); lockFile();
break;
case LOCK_SOCKET:
lockSocket();
break;
case LOCK_SERIALIZED:
lockSerialized();
break;
} }
locked = true; locked = true;
} }
...@@ -155,14 +165,18 @@ public class FileLock { ...@@ -155,14 +165,18 @@ public class FileLock {
} }
/** /**
* Add a setting to the properties file. * Add or change a setting to the properties. This call does not save the
* file.
* *
* @param key the key * @param key the key
* @param value the value * @param value the value
*/ */
public void addProperty(String key, String value) throws SQLException { public void setProperty(String key, String value) throws SQLException {
properties.put(key, value); if (value == null) {
save(); properties.remove(key);
} else {
properties.put(key, value);
}
} }
/** /**
...@@ -187,11 +201,10 @@ public class FileLock { ...@@ -187,11 +201,10 @@ public class FileLock {
/** /**
* Save the lock file. * Save the lock file.
*/ */
void save() throws SQLException { public void save() throws SQLException {
try { try {
OutputStream out = fs.openFileOutputStream(fileName, false); OutputStream out = fs.openFileOutputStream(fileName, false);
try { try {
properties.setProperty("method", String.valueOf(method));
properties.store(out, MAGIC); properties.store(out, MAGIC);
} finally { } finally {
out.close(); out.close();
...@@ -242,7 +255,12 @@ public class FileLock { ...@@ -242,7 +255,12 @@ public class FileLock {
} }
} }
private Properties load() throws SQLException { /**
* Load the properties file.
*
* @return the properties
*/
public Properties load() throws SQLException {
try { try {
Properties p2 = SortedProperties.loadProperties(fileName); Properties p2 = SortedProperties.loadProperties(fileName);
if (trace.isDebugEnabled()) { if (trace.isDebugEnabled()) {
...@@ -281,9 +299,17 @@ public class FileLock { ...@@ -281,9 +299,17 @@ public class FileLock {
properties.setProperty("id", uniqueId); properties.setProperty("id", uniqueId);
} }
private void lockSerialized() throws SQLException {
method = SERIALIZED;
properties = new SortedProperties();
properties.setProperty("method", String.valueOf(method));
setUniqueId();
}
private void lockFile() throws SQLException { private void lockFile() throws SQLException {
method = FILE; method = FILE;
properties = new SortedProperties(); properties = new SortedProperties();
properties.setProperty("method", String.valueOf(method));
setUniqueId(); setUniqueId();
if (!fs.createNewFile(fileName)) { if (!fs.createNewFile(fileName)) {
waitUntilOld(); waitUntilOld();
...@@ -337,6 +363,7 @@ public class FileLock { ...@@ -337,6 +363,7 @@ public class FileLock {
private void lockSocket() throws SQLException { private void lockSocket() throws SQLException {
method = SOCKET; method = SOCKET;
properties = new SortedProperties(); properties = new SortedProperties();
properties.setProperty("method", String.valueOf(method));
setUniqueId(); setUniqueId();
// if this returns 127.0.0.1, // if this returns 127.0.0.1,
// the computer is probably not networked // the computer is probably not networked
...@@ -458,6 +485,8 @@ public class FileLock { ...@@ -458,6 +485,8 @@ public class FileLock {
return FileLock.LOCK_NO; return FileLock.LOCK_NO;
} else if (method.equalsIgnoreCase("SOCKET")) { } else if (method.equalsIgnoreCase("SOCKET")) {
return FileLock.LOCK_SOCKET; return FileLock.LOCK_SOCKET;
} else if (method.equalsIgnoreCase("SERIALIZED")) {
return FileLock.LOCK_SERIALIZED;
} else { } else {
throw Message.getSQLException(ErrorCode.UNSUPPORTED_LOCK_METHOD_1, method); throw Message.getSQLException(ErrorCode.UNSUPPORTED_LOCK_METHOD_1, method);
} }
......
...@@ -63,6 +63,7 @@ public class PageStore implements CacheWriter { ...@@ -63,6 +63,7 @@ public class PageStore implements CacheWriter {
// synchronized correctly (on the index?) // synchronized correctly (on the index?)
// TODO two phase commit: append (not patch) commit & rollback // TODO two phase commit: append (not patch) commit & rollback
// TODO remove trace or use isDebugEnabled // TODO remove trace or use isDebugEnabled
// TODO recover tool: don't re-do uncommitted operations
/** /**
* The smallest possible page size. * The smallest possible page size.
......
...@@ -142,6 +142,17 @@ public class WriterThread extends Thread { ...@@ -142,6 +142,17 @@ public class WriterThread extends Thread {
if (Constants.FLUSH_INDEX_DELAY != 0) { if (Constants.FLUSH_INDEX_DELAY != 0) {
flushIndexes(database); flushIndexes(database);
} }
// checkpoint if required
try {
database.checkpointIfRequired();
} catch (SQLException e) {
TraceSystem traceSystem = database.getTraceSystem();
if (traceSystem != null) {
traceSystem.getTrace(Trace.LOG).error("reconnectCheckpoint", e);
}
}
LogSystem log = database.getLog(); LogSystem log = database.getLog();
if (log == null) { if (log == null) {
break; break;
...@@ -154,6 +165,7 @@ public class WriterThread extends Thread { ...@@ -154,6 +165,7 @@ public class WriterThread extends Thread {
traceSystem.getTrace(Trace.LOG).error("flush", e); traceSystem.getTrace(Trace.LOG).error("flush", e);
} }
} }
// TODO log writer: could also flush the dirty cache when there is // TODO log writer: could also flush the dirty cache when there is
// low activity // low activity
int wait = writeDelay; int wait = writeDelay;
......
...@@ -283,23 +283,15 @@ java org.h2.test.TestAll timer ...@@ -283,23 +283,15 @@ java org.h2.test.TestAll timer
/* /*
maybe close the database when a final static field is set to null? select 1 from dual a where 1 in(select 1 from dual b
where 1 in(select 1 from dual c where a.x=1));
use 127.0.0.1 if other addresses don't work error message on insert / merge: include SQL statement (at least table name)
isShutdown
PageStore.switchLogIfPossible() use 127.0.0.1 if other addresses don't work
drop table test;
create table test(id int);
select 1 from test where 'a'=1;
Fails: Oracle, PostgreSQL, H2
Works: MySQL, HSQLDB
select for update in mvcc mode: only lock the selected records? select for update in mvcc mode: only lock the selected records?
test case for daylight saving time enabled/move to a timezone (locking,...)
JCR: for each node type, create a table; one 'dynamic' table with parameter; JCR: for each node type, create a table; one 'dynamic' table with parameter;
option to cache the results option to cache the results
<link rel="icon" type="image/png" href="/path/image.png"> <link rel="icon" type="image/png" href="/path/image.png">
......
...@@ -375,8 +375,8 @@ public abstract class TestBase { ...@@ -375,8 +375,8 @@ public abstract class TestBase {
e.printStackTrace(); e.printStackTrace();
try { try {
TraceSystem ts = new TraceSystem(null, false); TraceSystem ts = new TraceSystem(null, false);
FileLock lock = new FileLock(ts, 1000); FileLock lock = new FileLock(ts, "error.lock", 1000);
lock.lock("error.lock", false); lock.lock(FileLock.LOCK_FILE);
FileWriter fw = new FileWriter("ERROR.txt", true); FileWriter fw = new FileWriter("ERROR.txt", true);
PrintWriter pw = new PrintWriter(fw); PrintWriter pw = new PrintWriter(fw);
e.printStackTrace(pw); e.printStackTrace(pw);
......
...@@ -62,9 +62,9 @@ public class TestFileLock extends TestBase implements Runnable { ...@@ -62,9 +62,9 @@ public class TestFileLock extends TestBase implements Runnable {
public void run() { public void run() {
while (!stop) { while (!stop) {
FileLock lock = new FileLock(new TraceSystem(null, false), 100); FileLock lock = new FileLock(new TraceSystem(null, false), FILE, 100);
try { try {
lock.lock(FILE, allowSockets); lock.lock(allowSockets ? FileLock.LOCK_SOCKET : FileLock.LOCK_FILE);
base.trace(lock + " locked"); base.trace(lock + " locked");
locks++; locks++;
if (locks > 1) { if (locks > 1) {
......
/*
* Copyright 2004-2009 H2 Group. Multiple-Licensed under the H2 License,
* Version 1.0, and under the Eclipse Public License, Version 1.0
* (http://h2database.com/html/license.html).
* Initial Developer: H2 Group
*/
package org.h2.test.unit;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import org.h2.test.TestBase;
/**
* Test the serialized (server-less) mode.
*/
public class TestFileLockSerialized extends TestBase {
/**
* Run just this test.
*
* @param a ignored
*/
public static void main(String[] a) throws Exception {
TestBase.createCaller().init().test();
}
public void test() throws Exception {
// TODO support long running queries
deleteDb("fileLockSerialized");
String url = "jdbc:h2:" + baseDir + "/fileLockSerialized";
String writeUrl = url + ";FILE_LOCK=SERIALIZED;OPEN_NEW=TRUE";
// ;TRACE_LEVEL_SYSTEM_OUT=3
// String readUrl = writeUrl + ";ACCESS_MODE_LOG=R;ACCESS_MODE_DATA=R";
trace("create database");
Class.forName("org.h2.Driver");
Connection conn = DriverManager.getConnection(writeUrl, "sa", "sa");
Statement stat = conn.createStatement();
stat.execute("create table test(id int primary key)");
Connection conn3 = DriverManager.getConnection(writeUrl, "sa", "sa");
Statement stat3 = conn3.createStatement();
Connection conn2 = DriverManager.getConnection(writeUrl, "sa", "sa");
Statement stat2 = conn2.createStatement();
printResult(stat2, "select * from test");
stat2.execute("create local temporary table temp(name varchar)");
printResult(stat2, "select * from temp");
trace("insert row 1");
stat.execute("insert into test values(1)");
trace("insert row 2");
stat3.execute("insert into test values(2)");
printResult(stat2, "select * from test");
printResult(stat2, "select * from temp");
conn.close();
conn2.close();
conn3.close();
}
private void printResult(Statement stat, String sql) throws SQLException {
trace("query: " + sql);
ResultSet rs = stat.executeQuery(sql);
int rowCount = 0;
while (rs.next()) {
trace(" " + rs.getString(1));
rowCount++;
}
trace(" " + rowCount + " row(s)");
}
}
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论