提交 a847d39c authored 作者: christian.peter.io's avatar christian.peter.io

More bugs in the server-less multi-connection mode have been fixed:

90097 The database is read only, caches must be cleared on reconnect, etc. .
上级 c58cd967
......@@ -23,6 +23,8 @@ Change Log
</li><li>The documentation is no longer available in Japanese because the
translation was too much out of sync. Please use the Google translation instead.
</li><li>Certain queries were not sorted if subselect queries were involved
</li><li>More bugs in the server-less multi-connection mode have been fixed:
90097 The database is read only, caches must be cleared on reconnect, etc. .
</li></ul>
<h2>Version 1.2.121 (2009-10-11)</h2>
......
......@@ -71,6 +71,7 @@ public class CommandContainer extends Command {
prepared.checkParameters();
int updateCount = prepared.update();
prepared.trace(startTime, updateCount);
session.getDatabase().afterWriting();
return updateCount;
}
......
......@@ -42,6 +42,7 @@ public class CommandList extends Command {
public int update() throws SQLException {
int updateCount = command.executeUpdate();
executeRemaining();
session.getDatabase().afterWriting();
return updateCount;
}
......
......@@ -169,6 +169,9 @@ public class Database implements DataHandler {
private Properties reconnectLastLock;
private volatile long reconnectCheckNext;
private volatile boolean reconnectChangePending;
private volatile boolean allowedToRunCheckpoint;
private volatile boolean isCheckpointRunning;
private int cacheSize;
public Database(String name, ConnectionInfo ci, String cipher) throws SQLException {
......@@ -1801,6 +1804,21 @@ public class Database implements DataHandler {
}
}
/**
* Clears all caches.
*
* @throws SQLException
*/
public synchronized void clearCaches() throws SQLException {
if (fileData != null) {
fileData.getCache().clear();
fileIndex.getCache().clear();
}
if (pageStore != null) {
pageStore.getCache().clear();
}
}
public synchronized void setMasterUser(User user) throws SQLException {
addDatabaseObject(systemSession, user);
systemSession.commit(true);
......@@ -2365,13 +2383,23 @@ public class Database implements DataHandler {
if (fileLockMethod != FileLock.LOCK_SERIALIZED || readOnly || !reconnectChangePending || closing) {
return;
}
// To avoid race conditions, we already set it here
// (overlap with allowedToRunCheckpoint)
isCheckpointRunning = true;
if (!allowedToRunCheckpoint) {
isCheckpointRunning = false;
return;
}
long now = System.currentTimeMillis();
if (now > reconnectCheckNext + SysProperties.RECONNECT_CHECK_DELAY) {
synchronized (this) {
getTrace().debug("checkpoint");
getTrace().debug("checkpoint start");
flushIndexes(0);
checkpoint();
reconnectModified(false);
isCheckpointRunning = false;
getTrace().debug("checkpoint end");
}
}
}
......@@ -2430,12 +2458,34 @@ public class Database implements DataHandler {
*/
public boolean beforeWriting() {
if (fileLockMethod == FileLock.LOCK_SERIALIZED) {
return reconnectModified(true);
// To avoid race conditions, we already set it here (overlap with
// isCheckpointRunning)
allowedToRunCheckpoint = false;
while (isCheckpointRunning) {
try {
Thread.sleep(10+(int) Math.random() * 10);
} catch (Exception e) {
// ignore
}
}
boolean allowedToWrite = reconnectModified(true);
if (!allowedToWrite) {
// make sure the next call to isReconnectNeeded() returns yes
reconnectCheckNext = System.currentTimeMillis() - 1;
reconnectLastLock = null;
}
return allowedToWrite;
}
return true;
}
/**
* This method is called after updates are finished.
*/
public void afterWriting() {
allowedToRunCheckpoint = true;
}
/**
* Switch the database to read-only mode.
*
......
......@@ -1127,12 +1127,15 @@ public class Session extends SessionWithState {
}
}
public SessionInterface reconnect() throws SQLException {
public SessionInterface reconnect(boolean write) throws SQLException {
readSessionState();
close();
Session newSession = Engine.getInstance().getSession(connectionInfo);
newSession.sessionState = sessionState;
newSession.recreateSessionState();
database.clearCaches();
while (write && !database.beforeWriting()) {
}
return newSession;
}
......
......@@ -84,7 +84,8 @@ public interface SessionInterface {
/**
* Close the connection and open a new connection.
*
* @param write if the next operation may be writing
* @return the new connection
*/
SessionInterface reconnect() throws SQLException;
SessionInterface reconnect(boolean write) throws SQLException;
}
......@@ -653,7 +653,7 @@ public class SessionRemote extends SessionWithState implements SessionFactory, D
return false;
}
public SessionInterface reconnect() {
public SessionInterface reconnect(boolean write) {
return this;
}
......
......@@ -1329,7 +1329,7 @@ public class JdbcConnection extends TraceObject implements Connection {
}
if (session.isReconnectNeeded(write)) {
trace.debug("reconnect");
session = session.reconnect();
session = session.reconnect(write);
setTrace(session.getTrace());
}
}
......
......@@ -33,6 +33,16 @@ public class TestFileLockSerialized extends TestBase {
}
public void test() throws Exception {
println("testCache()");
testCache();
println("testBigDatabase(false)");
testBigDatabase(false);
println("testBigDatabase(true)");
testBigDatabase(true);
println("testCheckpointInUpdateRaceCondition");
testCheckpointInUpdateRaceCondition();
println("testConcurrentUpdates");
testConcurrentUpdates();
println("testThreeMostlyReaders true");
testThreeMostlyReaders(true);
println("testThreeMostlyReaders false");
......@@ -50,28 +60,24 @@ public class TestFileLockSerialized extends TestBase {
}
private void testThreeMostlyReaders(final boolean write) throws Exception {
boolean longRun = false;
deleteDb("fileLockSerialized");
String url = "jdbc:h2:" + baseDir + "/fileLockSerialized;FILE_LOCK=SERIALIZED;OPEN_NEW=TRUE";
int len = 3;
final String url = "jdbc:h2:" + baseDir + "/fileLockSerialized;FILE_LOCK=SERIALIZED;OPEN_NEW=TRUE";
Connection c = DriverManager.getConnection(url);
c.createStatement().execute("create table test(id int) as select 1");
c.close();
final int len = 10;
final Exception[] ex = new Exception[1];
final Connection[] connList = new Connection[len];
final boolean[] stop = new boolean[1];
Thread[] threads = new Thread[len];
for (int i = 0; i < len; i++) {
Connection conn;
try {
conn = DriverManager.getConnection(url);
} catch (SQLException e) {
throw new Exception("Can not connect " + len, e);
}
final Connection c = conn;
connList[i] = c;
if (i == 0) {
connList[i].createStatement().execute("create table test(id int) as select 1");
}
Thread t = new Thread(new Runnable() {
public void run() {
try {
Connection c = DriverManager.getConnection(url);
PreparedStatement p = c.prepareStatement("select * from test where id = ?");
while (!stop[0]) {
Thread.sleep(100);
......@@ -93,7 +99,11 @@ public class TestFileLockSerialized extends TestBase {
t.start();
threads[i] = t;
}
Thread.sleep(400);
if (longRun) {
Thread.sleep(40000);
} else {
Thread.sleep(1000);
}
stop[0] = true;
for (int i = 0; i < len; i++) {
threads[i].join();
......@@ -251,4 +261,212 @@ public class TestFileLockSerialized extends TestBase {
}
trace(" " + rowCount + " row(s)");
}
private void testConcurrentUpdates() throws Exception {
boolean longRun = false;
if (longRun) {
for (int waitTime = 100; waitTime < 10000; waitTime += 20) {
for (int howManyThreads = 1; howManyThreads < 10; howManyThreads++) {
testConcurrentUpdates(waitTime, howManyThreads, waitTime * howManyThreads * 10);
}
}
} else {
testConcurrentUpdates(100, 4, 2000);
}
}
private void testConcurrentUpdates(final int waitTime, int howManyThreads, int runTime) throws Exception {
println("testConcurrentUpdates waitTime: " + waitTime + " howManyThreads: " + howManyThreads + " runTime: " + runTime);
deleteDb("fileLockSerialized");
final String url = "jdbc:h2:" + baseDir + "/fileLockSerialized;FILE_LOCK=SERIALIZED;OPEN_NEW=TRUE;" +
"AUTO_RECONNECT=TRUE;MAX_LENGTH_INPLACE_LOB=8192;COMPRESS_LOB=DEFLATE;LOG=2;CACHE_SIZE=65536";
Connection c = DriverManager.getConnection(url);
c.createStatement().execute("create table test(id int)");
c.createStatement().execute("insert into test values(1)");
c.close();
final long endTime = System.currentTimeMillis() + runTime;
final Exception[] ex = new Exception[1];
final Connection[] connList = new Connection[howManyThreads];
final boolean[] stop = new boolean[1];
final int[] lastInt = { 1 };
Thread[] threads = new Thread[howManyThreads];
for (int i = 0; i < howManyThreads; i++) {
final int finalNrOfConnection = i;
Thread t = new Thread(new Runnable() {
public void run() {
try {
Connection c = DriverManager.getConnection(url);
connList[finalNrOfConnection] = c;
while (!stop[0]) {
ResultSet rs = c.createStatement().executeQuery("select * from test");
rs.next();
if (rs.getInt(1) != lastInt[0]) {
throw new Exception(finalNrOfConnection + " Expected: " + lastInt[0] + " got " + rs.getInt(1));
}
Thread.sleep(waitTime);
if (Math.random() > 0.7) {
int newLastInt = (int) (Math.random() * 1000);
c.createStatement().execute("update test set id = " + newLastInt);
lastInt[0] = newLastInt;
}
}
c.close();
} catch (Exception e) {
e.printStackTrace();
ex[0] = e;
}
}
});
t.start();
threads[i] = t;
}
while ((ex[0] == null) && (System.currentTimeMillis() < endTime)) {
Thread.sleep(10);
}
stop[0] = true;
for (int i = 0; i < howManyThreads; i++) {
threads[i].join();
}
if (ex[0] != null) {
throw ex[0];
}
DriverManager.getConnection(url).close();
deleteDb("fileLockSerialized");
}
/**
* If a checkpoint occurs between beforeWriting and checkWritingAllowed
* then the result of checkWritingAllowed is READ_ONLY, which is wrong.
*
* Also, if a checkpoint started before beforeWriting, and ends between
* between beforeWriting and checkWritingAllowed, then the same error occurs.
*
* @throws Exception
*/
private void testCheckpointInUpdateRaceCondition() throws Exception {
boolean longRun = false;
deleteDb("fileLockSerialized");
String url = "jdbc:h2:" + baseDir + "/fileLockSerialized;FILE_LOCK=SERIALIZED;OPEN_NEW=TRUE";
Connection conn = DriverManager.getConnection(url);
conn.createStatement().execute("create table test(id int)");
conn.createStatement().execute("insert into test values(1)");
for (int i = 0; i < (longRun ? 10000 : 5); i++) {
Thread.sleep(402);
conn.createStatement().execute("update test set id = " + i);
}
conn.close();
deleteDb("fileLockSerialized");
}
/**
* Caches must be cleared. Session.reconnect only closes the DiskFile (which is
* associated with the cache) if there is one session
*
* @throws Exception
*/
private void testCache() throws Exception {
deleteDb("fileLockSerialized");
String urlShared = "jdbc:h2:" + baseDir + "/fileLockSerialized;FILE_LOCK=SERIALIZED";
String urlForNew = urlShared + ";OPEN_NEW=TRUE";
Connection connShared1 = DriverManager.getConnection(urlShared);
Connection connShared2 = DriverManager.getConnection(urlShared);
Connection connNew = DriverManager.getConnection(urlForNew);
connShared1.createStatement().execute("create table test(id int)");
connShared1.createStatement().execute("insert into test values(1)");
ResultSet rs = connShared1.createStatement().executeQuery("select id from test");
rs.close();
connNew.createStatement().execute("update test set id=2");
Thread.sleep(500);
rs = connShared1.createStatement().executeQuery("select id from test");
assertTrue(rs.next());
assertEquals(2, rs.getInt(1));
rs.close();
connShared1.close();
connShared2.close();
connNew.close();
deleteDb("fileLockSerialized");
}
private void testBigDatabase(boolean withCache) throws Exception {
boolean longRun = false;
final int howMuchRows = longRun ? 2000000 : 500000;
deleteDb("fileLockSerialized");
int cacheSizeKb = withCache ? 5000 : 0;
final String url = "jdbc:h2:" + baseDir + "/fileLockSerialized;FILE_LOCK=SERIALIZED;OPEN_NEW=TRUE;CACHE_SIZE=" + cacheSizeKb;
final boolean[] importFinished = { false };
final boolean[] updateFinished = { false };
final boolean[] testFinished = { false };
final Exception[] ex = new Exception[1];
new Thread() {
public void run() {
try {
Connection conn = DriverManager.getConnection(url);
Statement stat = conn.createStatement();
stat.execute("create table test(id int, id2 int)");
for (int i = 0; i < howMuchRows; i++) {
stat.execute("insert into test values(" + i + ", " + i + ")");
}
importFinished[0] = true;
Thread.sleep(5000);
stat.execute("update test set id2=999 where id=500");
updateFinished[0] = true;
conn.close();
} catch (Exception e) {
e.printStackTrace();
ex[0] = e;
}
}
}.start();
new Thread() {
public void run() {
try {
Connection conn = DriverManager.getConnection(url);
Statement stat = conn.createStatement();
while (!importFinished[0]) {
Thread.sleep(100);
}
Thread.sleep(1000);
ResultSet rs = stat.executeQuery("select id2 from test where id=500");
assertTrue(rs.next());
assertEquals(500, rs.getInt(1));
rs.close();
while (!updateFinished[0]) {
Thread.sleep(100);
}
Thread.sleep(1000);
rs = stat.executeQuery("select id2 from test where id=500");
assertTrue(rs.next());
assertEquals(999, rs.getInt(1));
rs.close();
conn.close();
testFinished[0] = true;
} catch (Exception e) {
e.printStackTrace();
ex[0] = e;
}
}
}.start();
while ((ex[0] == null) && (!testFinished[0])) {
Thread.sleep(10);
}
if (ex[0] != null) {
throw ex[0];
}
deleteDb("fileLockSerialized");
}
}
......@@ -617,4 +617,4 @@ scrambling distinguish official unofficial distinguishable overwrites lastval
notranslate vince bonfanti alphabetically sysdummy sysibm activation
deactivation concatenating reproducing black railroads railroad radius moz
imageio argb bilinear rendering stroke interpolation flip diagrams draw
delim
\ No newline at end of file
delim overlap subselect
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论