提交 8ac34844 authored 作者: Thomas Mueller's avatar Thomas Mueller

Rename CallThread to Task.

上级 71d41cb0
......@@ -21,7 +21,7 @@ import org.h2.constant.ErrorCode;
import org.h2.engine.Constants;
import org.h2.message.DbException;
import org.h2.message.TraceObject;
import org.h2.util.CallThread;
import org.h2.util.Task;
import org.h2.util.IOUtils;
import org.h2.value.Value;
......@@ -192,22 +192,22 @@ public class JdbcBlob extends TraceObject implements Blob {
}
final JdbcConnection c = conn;
final PipedInputStream in = new PipedInputStream();
final CallThread<Value> call = new CallThread<Value>() {
public Value call() {
return c.createBlob(in, -1);
final Task task = new Task() {
public void call() {
value = c.createBlob(in, -1);
}
};
PipedOutputStream out = new PipedOutputStream(in) {
public void close() throws IOException {
super.close();
try {
value = call.get();
task.get();
} catch (Exception e) {
throw DbException.convertToIOException(e);
}
}
};
call.execute();
task.execute();
return new BufferedOutputStream(out);
} catch (Exception e) {
throw logAndConvert(e);
......
......@@ -15,13 +15,15 @@ import java.io.Reader;
import java.io.StringReader;
import java.io.Writer;
import java.sql.Clob;
//## Java 1.6 begin ##
import java.sql.NClob;
//## Java 1.6 end ##
import java.sql.SQLException;
import org.h2.constant.ErrorCode;
import org.h2.engine.Constants;
import org.h2.message.DbException;
import org.h2.message.TraceObject;
import org.h2.util.CallThread;
import org.h2.util.Task;
import org.h2.util.IOUtils;
import org.h2.value.Value;
......@@ -149,22 +151,22 @@ public class JdbcClob extends TraceObject implements Clob
// than PipedInputStream / PipedOutputStream
// (Sun/Oracle Java 1.6.0_20)
final PipedInputStream in = new PipedInputStream();
final CallThread<Value> call = new CallThread<Value>() {
public Value call() {
return c.createClob(IOUtils.getReader(in), -1);
final Task task = new Task() {
public void call() {
value = c.createClob(IOUtils.getReader(in), -1);
}
};
PipedOutputStream out = new PipedOutputStream(in) {
public void close() throws IOException {
super.close();
try {
value = call.get();
task.get();
} catch (Exception e) {
throw DbException.convertToIOException(e);
}
}
};
call.execute();
task.execute();
return IOUtils.getBufferedWriter(out);
} catch (Exception e) {
throw logAndConvert(e);
......
......@@ -216,23 +216,19 @@ public class SourceCompiler {
}
private void copyInThread(final InputStream in, final OutputStream out) {
new Thread() {
public void run() {
try {
while (true) {
int x = in.read();
if (x < 0) {
return;
}
if (out != null) {
out.write(x);
}
new Task() {
public void call() throws IOException {
while (true) {
int x = in.read();
if (x < 0) {
return;
}
if (out != null) {
out.write(x);
}
} catch (Exception e) {
throw DbException.convert(e);
}
}
} .start();
}.execute();
}
private void javacSun(File javaFile) {
......
......@@ -9,18 +9,22 @@ package org.h2.util;
/**
* A method call that is executed in a separate thread. If the method throws an
* exception, it is wrapped in a RuntimeException.
*
* @param <R> the return value
*/
public abstract class CallThread<R> extends Thread {
public abstract class Task implements Runnable {
/**
* A flag indicating the get() method has been called.
*/
protected volatile boolean stop;
/**
* The result, if any.
*/
protected Object result;
private Thread thread;
private Exception ex;
private R result;
/**
* The method to be implemented.
......@@ -28,11 +32,11 @@ public abstract class CallThread<R> extends Thread {
* @return the value, or null
* @throws Exception any exception is wrapped in a RuntimeException
*/
public abstract R call() throws Exception;
public abstract void call() throws Exception;
public void run() {
try {
result = call();
call();
} catch (Exception e) {
this.ex = e;
}
......@@ -43,30 +47,44 @@ public abstract class CallThread<R> extends Thread {
*
* @return this
*/
public CallThread<R> execute() {
setDaemon(true);
setName(getClass().getName());
start();
public Task execute() {
thread = new Thread(this);
thread.setDaemon(true);
thread.setName(getClass().getName());
thread.start();
return this;
}
/**
* Calling this method will set the stop flag and wait until the thread is stopped.
*
* @return the return value, or null
* @return the result, or null
* @throws RuntimeException if an exception in the method call occurs
*/
public R get() {
public Object get() {
Exception e = getException();
if (e != null) {
throw new RuntimeException(e);
}
return result;
}
/**
* Get the exception that was thrown in the call (if any).
*
* @return the exception or null
*/
public Exception getException() {
stop = true;
try {
join();
thread.join();
} catch (InterruptedException e) {
// ignore
}
if (ex != null) {
throw new RuntimeException(ex);
return ex;
}
return result;
return null;
}
}
......@@ -14,7 +14,7 @@ import org.h2.test.utils.SelfDestructor;
/**
* A task that can be run as a separate process.
*/
public abstract class Task {
public abstract class TaskDef {
/**
* Run the class. This method is called by the task framework, and should
......@@ -24,10 +24,10 @@ public abstract class Task {
*/
public static void main(String... args) {
SelfDestructor.startCountdown(60);
Task task;
TaskDef task;
try {
String className = args[0];
task = (Task) Class.forName(className).newInstance();
task = (TaskDef) Class.forName(className).newInstance();
System.out.println("running");
} catch (Throwable t) {
System.out.println("init error: " + t);
......
......@@ -17,6 +17,7 @@ import java.io.OutputStreamWriter;
import java.util.ArrayList;
import java.util.Arrays;
import org.h2.test.utils.SelfDestructor;
import org.h2.util.Task;
import org.h2.util.StringUtils;
/**
......@@ -25,7 +26,7 @@ import org.h2.util.StringUtils;
* process is directly send to the standard error stream of this process.
*/
public class TaskProcess {
private final Task taskDef;
private final TaskDef taskDef;
private Process process;
private BufferedReader reader;
private BufferedWriter writer;
......@@ -35,7 +36,7 @@ public class TaskProcess {
*
* @param taskDef the task
*/
public TaskProcess(Task taskDef) {
public TaskProcess(TaskDef taskDef) {
this.taskDef = taskDef;
}
......@@ -52,7 +53,7 @@ public class TaskProcess {
list.add(selfDestruct);
list.add("-cp");
list.add("bin" + File.pathSeparator + ".");
list.add(Task.class.getName());
list.add(TaskDef.class.getName());
list.add(taskDef.getClass().getName());
if (args != null && args.length > 0) {
list.addAll(Arrays.asList(args));
......@@ -78,23 +79,19 @@ public class TaskProcess {
}
private void copyInThread(final InputStream in, final OutputStream out) {
new Thread() {
public void run() {
try {
while (true) {
int x = in.read();
if (x < 0) {
return;
}
if (out != null) {
out.write(x);
}
new Task() {
public void call() throws IOException {
while (true) {
int x = in.read();
if (x < 0) {
return;
}
if (out != null) {
out.write(x);
}
} catch (Exception e) {
throw new RuntimeException(e);
}
}
} .start();
}.execute();
}
/**
......
......@@ -9,8 +9,10 @@ package org.h2.test.db;
import java.sql.Connection;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.concurrent.atomic.AtomicInteger;
import org.h2.test.TestBase;
import org.h2.util.Task;
/**
* Test for the exclusive mode.
......@@ -43,30 +45,19 @@ public class TestExclusive extends TestBase {
Connection conn2 = getConnection("exclusive");
final Statement stat2 = conn2.createStatement();
stat.execute("set exclusive true");
final int[] state = { 0 };
Thread t = new Thread() {
public void run() {
try {
stat2.execute("select * from dual");
if (state[0] != 1) {
new Error("unexpected state: " + state[0]).printStackTrace();
}
state[0] = 2;
} catch (Exception e) {
e.printStackTrace();
final AtomicInteger state = new AtomicInteger(0);
Task task = new Task() {
public void call() throws SQLException {
stat2.execute("select * from dual");
if (state.get() != 1) {
new Error("unexpected state: " + state.get()).printStackTrace();
}
}
};
t.start();
state[0] = 1;
task.execute();
state.set(1);
stat.execute("set exclusive false");
for (int i = 0; i < 20; i++) {
Thread.sleep(100);
if (state[0] == 2) {
break;
}
}
assertEquals(2, state[0]);
task.get();
stat.execute("set exclusive true");
conn.close();
......
......@@ -17,6 +17,7 @@ import java.util.StringTokenizer;
import org.h2.fulltext.FullText;
import org.h2.store.fs.FileSystem;
import org.h2.test.TestBase;
import org.h2.util.Task;
/**
* Fulltext search tests.
......@@ -109,10 +110,8 @@ public class TestFullText extends TestBase {
final String prefix = lucene ? "FTL" : "FT";
trace("Testing multithreaded " + prefix);
deleteDb("fullText");
final boolean[] stop = { false };
final Exception[] exception = { null };
int len = 2;
Thread[] threads = new Thread[len];
Task[] task = new Task[len];
for (int i = 0; i < len; i++) {
// final Connection conn =
// getConnection("fullText;MULTI_THREADED=1;LOCK_TIMEOUT=10000");
......@@ -126,59 +125,49 @@ public class TestFullText extends TestBase {
final String tableName = "TEST" + i;
stat.execute("CREATE TABLE " + tableName + "(ID INT PRIMARY KEY, DATA VARCHAR)");
stat.execute("CALL " + prefix + "_CREATE_INDEX('PUBLIC', '" + tableName + "', NULL)");
threads[i] = new Thread() {
public void run() {
task[i] = new Task() {
public void call() throws SQLException {
trace("starting thread " + Thread.currentThread());
try {
PreparedStatement prep = conn.prepareStatement("INSERT INTO " + tableName + " VALUES(?, ?)");
Statement stat = conn.createStatement();
Random random = new Random();
int x = 0;
while (!stop[0]) {
trace("stop[0] = " + stop[0] + " for " + Thread.currentThread());
StringBuilder buff = new StringBuilder();
for (int j = 0; j < 1000; j++) {
buff.append(" ").append(random.nextInt(10000));
buff.append(" x").append(j);
buff.append(" ").append(KNOWN_WORDS[j % KNOWN_WORDS.length]);
}
prep.setInt(1, x);
prep.setString(2, buff.toString());
prep.execute();
x++;
for (String knownWord : KNOWN_WORDS) {
trace("searching for " + knownWord + " with " + Thread.currentThread());
ResultSet rs = stat.executeQuery("SELECT * FROM " + prefix + "_SEARCH('" + knownWord + "', 0, 0)");
assertTrue(rs.next());
}
PreparedStatement prep = conn.prepareStatement("INSERT INTO " + tableName + " VALUES(?, ?)");
Statement stat = conn.createStatement();
Random random = new Random();
int x = 0;
while (!stop) {
trace("stop = " + stop + " for " + Thread.currentThread());
StringBuilder buff = new StringBuilder();
for (int j = 0; j < 1000; j++) {
buff.append(" ").append(random.nextInt(10000));
buff.append(" x").append(j);
buff.append(" ").append(KNOWN_WORDS[j % KNOWN_WORDS.length]);
}
prep.setInt(1, x);
prep.setString(2, buff.toString());
prep.execute();
x++;
for (String knownWord : KNOWN_WORDS) {
trace("searching for " + knownWord + " with " + Thread.currentThread());
ResultSet rs = stat.executeQuery("SELECT * FROM " + prefix + "_SEARCH('" + knownWord + "', 0, 0)");
assertTrue(rs.next());
}
trace("closing connection");
conn.close();
} catch (SQLException e) {
exception[0] = e;
} finally {
trace("completed thread " + Thread.currentThread());
}
trace("closing connection");
conn.close();
trace("completed thread " + Thread.currentThread());
}
};
}
for (Thread t : threads) {
t.setDaemon(true);
t.start();
for (Task t : task) {
t.execute();
}
trace("sleeping");
Thread.sleep(1000);
trace("setting stop to true");
stop[0] = true;
for (Thread t : threads) {
for (Task t : task) {
trace("joining " + t);
t.join();
t.get();
trace("done joining " + t);
}
if (exception[0] != null) {
throw exception[0];
}
}
private void testStreamLob() throws SQLException {
......
......@@ -13,6 +13,7 @@ import java.sql.Statement;
import org.h2.api.DatabaseEventListener;
import org.h2.test.TestBase;
import org.h2.util.Task;
/**
* Multi-connection tests.
......@@ -45,21 +46,14 @@ public class TestMultiConn extends TestBase implements DatabaseEventListener {
stat1.execute("CREATE ALIAS SLEEP FOR \"java.lang.Thread.sleep(long)\"");
final Statement stat2 = conn2.createStatement();
stat1.execute("SET THROTTLE 100");
new Thread() {
public void run() {
try {
stat2.executeQuery("CALL SLEEP(100)");
try {
Thread.sleep(10);
} catch (Exception e) {
// ignore
}
stat2.executeQuery("CALL SLEEP(100)");
} catch (SQLException e) {
// ignore
}
Task t = new Task() {
public void call() throws Exception {
stat2.executeQuery("CALL SLEEP(100)");
Thread.sleep(10);
stat2.executeQuery("CALL SLEEP(100)");
}
} .start();
};
t.execute();
Thread.sleep(50);
stat1.execute("SHUTDOWN");
conn1.close();
......@@ -68,6 +62,7 @@ public class TestMultiConn extends TestBase implements DatabaseEventListener {
} catch (SQLException e) {
// ignore
}
t.get();
}
private void testThreeThreads() throws Exception {
......
......@@ -14,6 +14,7 @@ import java.sql.Statement;
import java.util.Random;
import org.h2.test.TestAll;
import org.h2.test.TestBase;
import org.h2.util.Task;
/**
* Multi-threaded tests.
......@@ -61,30 +62,22 @@ public class TestMultiThread extends TestBase implements Runnable {
Connection conn = getConnection(url);
Statement stat = conn.createStatement();
stat.execute("create table test(id bigint primary key) as select x from system_range(1, 1000)");
final Exception[] ex = new Exception[1];
Thread t = new Thread() {
public void run() {
try {
Connection conn2;
conn2 = getConnection(url);
for (int i = 0; i < 1000; i++) {
conn2.createStatement().execute("analyze");
}
conn2.close();
} catch (Exception e) {
ex[0] = e;
Task t = new Task() {
public void call() throws SQLException {
Connection conn2;
conn2 = getConnection(url);
for (int i = 0; i < 1000; i++) {
conn2.createStatement().execute("analyze");
}
conn2.close();
}
};
t.start();
t.execute();
Thread.yield();
for (int i = 0; i < 1000; i++) {
conn.createStatement().execute("analyze");
}
t.join();
if (ex[0] != null) {
throw ex[0];
}
t.get();
stat.execute("drop table test");
conn.close();
deleteDb("concurrentAnalyze");
......
......@@ -9,12 +9,14 @@ package org.h2.test.db;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.Random;
import org.h2.test.TestBase;
import org.h2.util.JdbcUtils;
import org.h2.util.New;
import org.h2.util.Task;
/**
* A multi-threaded test case.
......@@ -90,10 +92,9 @@ public class TestMultiThreadedKernel extends TestBase {
}
private void testConcurrentRead() throws Exception {
ArrayList<Thread> list = New.arrayList();
ArrayList<Task> list = New.arrayList();
int size = 2;
final int count = 1000;
final boolean[] stopped = { false };
String url = getURL("multiThreadedKernel;MULTI_THREADED=TRUE;CACHE_SIZE=16", true);
for (int i = 0; i < size; i++) {
final Connection conn = DriverManager.getConnection(url, getUser(), getPassword());
......@@ -104,35 +105,29 @@ public class TestMultiThreadedKernel extends TestBase {
+ "as select x, x || space(10) from system_range(1, " + count + ")");
}
final Random random = new Random(i);
Thread t = new Thread() {
public void run() {
try {
PreparedStatement prep = conn.prepareStatement(
"select * from test where id = ?");
while (!stopped[0]) {
prep.setInt(1, random.nextInt(count));
prep.execute();
}
} catch (Exception e) {
e.printStackTrace();
Task t = new Task() {
public void call() throws Exception {
PreparedStatement prep = conn.prepareStatement(
"select * from test where id = ?");
while (!stop) {
prep.setInt(1, random.nextInt(count));
prep.execute();
}
}
};
t.start();
t.execute();
list.add(t);
}
Thread.sleep(1000);
stopped[0] = true;
for (Thread t : list) {
t.join();
for (Task t : list) {
t.get();
}
}
private void testCache() throws Exception {
ArrayList<Thread> list = New.arrayList();
ArrayList<Task> list = New.arrayList();
int size = 3;
final int count = 100;
final boolean[] stopped = { false };
String url = getURL("multiThreadedKernel;MULTI_THREADED=TRUE;CACHE_SIZE=1", true);
for (int i = 0; i < size; i++) {
final Connection conn = DriverManager.getConnection(url, getUser(), getPassword());
......@@ -143,27 +138,22 @@ public class TestMultiThreadedKernel extends TestBase {
+ "as select x, space(3000) from system_range(1, " + count + ")");
}
final Random random = new Random(i);
Thread t = new Thread() {
public void run() {
try {
PreparedStatement prep = conn.prepareStatement(
"select * from test where id = ?");
while (!stopped[0]) {
prep.setInt(1, random.nextInt(count));
prep.execute();
}
} catch (Exception e) {
e.printStackTrace();
Task t = new Task() {
public void call() throws SQLException {
PreparedStatement prep = conn.prepareStatement(
"select * from test where id = ?");
while (!stop) {
prep.setInt(1, random.nextInt(count));
prep.execute();
}
}
};
t.start();
t.execute();
list.add(t);
}
Thread.sleep(1000);
stopped[0] = true;
for (Thread t : list) {
t.join();
for (Task t : list) {
t.get();
}
}
......
......@@ -16,6 +16,7 @@ import java.sql.Statement;
import org.h2.api.DatabaseEventListener;
import org.h2.test.TestBase;
import org.h2.tools.Restore;
import org.h2.util.Task;
import org.h2.util.IOUtils;
/**
......@@ -139,30 +140,26 @@ public class TestOpenClose extends TestBase implements DatabaseEventListener {
// but for Ubuntu, the default ulimit is 1024,
// which breaks the test
int len = getSize(10, 50);
Thread[] threads = new Thread[len];
Task[] tasks = new Task[len];
for (int i = 0; i < len; i++) {
threads[i] = new Thread() {
public void run() {
try {
Connection c = DriverManager.getConnection(url, user, password);
PreparedStatement prep = c.prepareStatement("insert into employee values(?, ?, 0)");
int id = getNextId();
prep.setInt(1, id);
prep.setString(2, "employee " + id);
prep.execute();
c.close();
} catch (Throwable e) {
TestBase.logError("insert", e);
}
tasks[i] = new Task() {
public void call() throws SQLException {
Connection c = DriverManager.getConnection(url, user, password);
PreparedStatement prep = c.prepareStatement("insert into employee values(?, ?, 0)");
int id = getNextId();
prep.setInt(1, id);
prep.setString(2, "employee " + id);
prep.execute();
c.close();
}
};
threads[i].start();
tasks[i].execute();
}
// for(int i=0; i<len; i++) {
// threads[i].start();
// }
for (int i = 0; i < len; i++) {
threads[i].join();
tasks[i].get();
}
conn = DriverManager.getConnection(url, user, password);
ResultSet rs = conn.createStatement().executeQuery("select count(*) from employee");
......
......@@ -16,6 +16,7 @@ import org.h2.constant.ErrorCode;
import org.h2.test.TestBase;
import org.h2.tools.ChangeFileEncryption;
import org.h2.tools.Recover;
import org.h2.util.Task;
import org.h2.util.IOUtils;
/**
......@@ -52,46 +53,32 @@ public class TestRunscript extends TestBase implements Trigger {
// need to wait a bit (throttle is only used every 50 ms)
Thread.sleep(200);
final String dir = getBaseDir();
final SQLException[] ex = new SQLException[1];
Thread thread;
SQLException e;
thread = new Thread() {
public void run() {
try {
stat.execute("script simple drop to '"+dir+"/backup2.sql'");
} catch (SQLException e) {
ex[0] = e;
}
Task task;
task = new Task() {
public void call() throws SQLException {
stat.execute("script simple drop to '"+dir+"/backup2.sql'");
}
};
thread.start();
task.execute();
Thread.sleep(200);
stat.cancel();
thread.join();
e = ex[0];
SQLException e = (SQLException) task.getException();
assertTrue(e != null);
assertEquals(ErrorCode.STATEMENT_WAS_CANCELED, e.getErrorCode());
ex[0] = null;
stat.execute("set throttle 1000");
// need to wait a bit (throttle is only used every 50 ms)
Thread.sleep(100);
thread = new Thread() {
public void run() {
try {
stat.execute("runscript from '"+dir+"/backup.sql'");
} catch (SQLException e) {
ex[0] = e;
}
task = new Task() {
public void call() throws SQLException {
stat.execute("runscript from '"+dir+"/backup.sql'");
}
};
thread.start();
task.execute();
Thread.sleep(100);
stat.cancel();
thread.join();
e = ex[0];
assertTrue(e != null);
e = (SQLException) task.getException();
assertEquals(ErrorCode.STATEMENT_WAS_CANCELED, e.getErrorCode());
conn.close();
......
......@@ -23,6 +23,7 @@ import java.util.UUID;
import org.h2.api.Trigger;
import org.h2.constant.ErrorCode;
import org.h2.test.TestBase;
import org.h2.util.Task;
/**
* Tests for the PreparedStatement implementation.
......@@ -272,21 +273,15 @@ public class TestPreparedStatement extends TestBase {
final PreparedStatement prep = conn.prepareStatement("SELECT SLEEP(?) FROM SYSTEM_RANGE(1, 10000) LIMIT ?");
prep.setInt(1, 1);
prep.setInt(2, 10000);
final SQLException[] ex = new SQLException[1];
Thread t = new Thread() {
public void run() {
try {
prep.execute();
} catch (SQLException e) {
ex[0] = e;
}
Task t = new Task() {
public void call() throws SQLException {
prep.execute();
}
};
t.start();
t.execute();
Thread.sleep(100);
prep.cancel();
t.join();
SQLException e = ex[0];
SQLException e = (SQLException) t.getException();
assertTrue(e != null);
assertEquals(ErrorCode.STATEMENT_WAS_CANCELED, e.getErrorCode());
prep.setInt(1, 1);
......
......@@ -16,6 +16,7 @@ import java.sql.Statement;
import org.h2.jdbcx.JdbcConnectionPool;
import org.h2.jdbcx.JdbcDataSource;
import org.h2.test.TestBase;
import org.h2.util.Task;
/**
* This class tests the JdbcConnectionPool.
......@@ -48,17 +49,16 @@ public class TestConnectionPool extends TestBase {
man.setLoginTimeout(1);
man.setMaxConnections(2);
Connection conn = man.getConnection();
final boolean[] stop = { false };
Thread t = new Thread() {
public void run() {
while (!stop[0]) {
Task t = new Task() {
public void call() {
while (!stop) {
// this calls notifyAll
man.setMaxConnections(1);
man.setMaxConnections(2);
}
}
};
t.start();
t.execute();
long time = System.currentTimeMillis();
try {
man.getConnection();
......@@ -69,8 +69,7 @@ public class TestConnectionPool extends TestBase {
assertTrue("timeout after " + time + " ms", time > 1000);
} finally {
conn.close();
stop[0] = true;
t.join();
t.get();
}
man.dispose();
......
......@@ -10,8 +10,10 @@ import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.concurrent.atomic.AtomicBoolean;
import org.h2.constant.ErrorCode;
import org.h2.test.TestBase;
import org.h2.util.Task;
/**
* Additional MVCC (multi version concurrency) test cases.
......@@ -59,33 +61,29 @@ public class TestMvcc2 extends TestBase {
stat2.execute("set lock_timeout 1000");
stat.execute("create table test(id int primary key, name varchar)");
conn.setAutoCommit(false);
final boolean[] committed = { false };
final SQLException[] ex = { null };
Thread t = new Thread() {
public void run() {
final AtomicBoolean committed = new AtomicBoolean(false);
Task t = new Task() {
public void call() throws SQLException {
try {
//System.out.println("insert2 hallo");
stat2.execute("insert into test values(0, 'Hallo')");
//System.out.println("insert2 hallo done");
} catch (SQLException e) {
//System.out.println("insert2 hallo e " + e);
if (!committed[0]) {
ex[0] = e;
if (!committed.get()) {
throw e;
}
}
}
};
//System.out.println("insert hello");
stat.execute("insert into test values(0, 'Hello')");
t.start();
t.execute();
Thread.sleep(500);
//System.out.println("insert hello commit");
committed[0] = true;
committed.set(true);
conn.commit();
t.join();
if (ex[0] != null) {
throw ex[0];
}
t.get();
ResultSet rs;
rs = stat.executeQuery("select name from test");
rs.next();
......@@ -104,24 +102,16 @@ public class TestMvcc2 extends TestBase {
stat.execute("create table test(id int primary key, name varchar)");
stat.execute("insert into test values(0, 'Hello')");
conn.setAutoCommit(false);
final SQLException[] ex = { null };
Thread t = new Thread() {
public void run() {
try {
stat2.execute("update test set name = 'Hallo'");
} catch (SQLException e) {
ex[0] = e;
}
Task t = new Task() {
public void call() throws SQLException {
stat2.execute("update test set name = 'Hallo'");
}
};
stat.execute("update test set name = 'Hi'");
t.start();
t.execute();
Thread.sleep(500);
conn.commit();
t.join();
if (ex[0] != null) {
throw ex[0];
}
t.get();
ResultSet rs;
rs = stat.executeQuery("select name from test");
rs.next();
......
......@@ -12,6 +12,7 @@ import java.sql.SQLException;
import java.util.concurrent.CountDownLatch;
import org.h2.test.TestBase;
import org.h2.util.Task;
/**
* Multi-threaded MVCC (multi version concurrency) test cases.
......@@ -44,33 +45,25 @@ public class TestMvccMultiThreaded extends TestBase {
}
Connection conn = connList[0];
conn.createStatement().execute("create table test(id int primary key, name varchar)");
final SQLException[] ex = { null };
Thread[] threads = new Thread[len];
Task[] tasks = new Task[len];
final boolean[] stop = { false };
for (int i = 0; i < len; i++) {
final Connection c = connList[i];
c.setAutoCommit(false);
threads[i] = new Thread() {
public void run() {
while (!stop[0]) {
try {
c.createStatement().execute("merge into test values(1, 'x')");
c.commit();
} catch (SQLException e) {
ex[0] = e;
}
tasks[i] = new Task() {
public void call() throws SQLException {
while (!stop) {
c.createStatement().execute("merge into test values(1, 'x')");
c.commit();
}
}
};
threads[i].start();
tasks[i].execute();
}
Thread.sleep(1000);
stop[0] = true;
for (int i = 0; i < len; i++) {
threads[i].join();
}
if (ex[0] != null) {
throw ex[0];
tasks[i].get();
}
for (int i = 0; i < len; i++) {
connList[i].close();
......@@ -88,35 +81,26 @@ public class TestMvccMultiThreaded extends TestBase {
Connection conn = connList[0];
conn.createStatement().execute("create table test(id int primary key, value int)");
conn.createStatement().execute("insert into test values(0, 0)");
final Exception[] ex = { null };
final int count = 1000;
Thread[] threads = new Thread[len];
Task[] tasks = new Task[len];
final CountDownLatch latch = new CountDownLatch(len);
for (int i = 0; i < len; i++) {
final int x = i;
threads[i] = new Thread() {
public void run() {
tasks[i] = new Task() {
public void call() throws Exception {
for (int a = 0; a < count; a++) {
try {
connList[x].createStatement().execute("update test set value=value+1");
latch.countDown();
latch.await();
} catch (Exception e) {
ex[0] = e;
break;
}
connList[x].createStatement().execute("update test set value=value+1");
latch.countDown();
latch.await();
}
}
};
threads[i].start();
tasks[i].execute();
}
for (int i = 0; i < len; i++) {
threads[i].join();
}
if (ex[0] != null) {
throw ex[0];
tasks[i].get();
}
ResultSet rs = conn.createStatement().executeQuery("select value from test");
rs.next();
......
......@@ -12,6 +12,7 @@ import java.sql.SQLException;
import java.sql.Statement;
import org.h2.test.TestBase;
import org.h2.util.Task;
/**
* Row level locking tests.
......@@ -82,19 +83,15 @@ public class TestRowLocks extends TestBase {
assertEquals("Hallo", getSingleValue(s2, "SELECT NAME FROM TEST WHERE ID=1"));
s2.execute("UPDATE TEST SET NAME='H1' WHERE ID=1");
Thread thread = new Thread() {
public void run() {
try {
s1.execute("UPDATE TEST SET NAME='H2' WHERE ID=1");
} catch (SQLException e) {
e.printStackTrace();
}
Task task = new Task() {
public void call() throws SQLException {
s1.execute("UPDATE TEST SET NAME='H2' WHERE ID=1");
}
};
thread.start();
task.execute();
Thread.sleep(100);
c2.commit();
thread.join();
task.get();
c1.commit();
assertEquals("H2", getSingleValue(s1, "SELECT NAME FROM TEST WHERE ID=1"));
assertEquals("H2", getSingleValue(s2, "SELECT NAME FROM TEST WHERE ID=1"));
......
......@@ -19,6 +19,7 @@ import org.h2.store.fs.FileSystem;
import org.h2.test.TestBase;
import org.h2.tools.CompressTool;
import org.h2.util.New;
import org.h2.util.Task;
/**
* Data compression tests.
......@@ -62,40 +63,26 @@ public class TestCompress extends TestBase {
}
private void testMultiThreaded() throws Exception {
Thread[] threads = new Thread[3];
final boolean[] stop = { false };
final Exception[] ex = { null };
for (int i = 0; i < threads.length; i++) {
Thread t = new Thread() {
public void run() {
Task[] tasks = new Task[3];
for (int i = 0; i < tasks.length; i++) {
Task t = new Task() {
public void call() {
CompressTool tool = CompressTool.getInstance();
byte[] buff = new byte[1024];
Random r = new Random();
while (!stop[0]) {
while (!stop) {
r.nextBytes(buff);
try {
byte[] test = tool.expand(tool.compress(buff, "LZF"));
assertEquals(buff, test);
} catch (Exception e) {
ex[0] = e;
}
byte[] test = tool.expand(tool.compress(buff, "LZF"));
assertEquals(buff, test);
}
}
};
threads[i] = t;
t.start();
tasks[i] = t;
t.execute();
}
try {
Thread.sleep(1000);
stop[0] = true;
for (Thread t : threads) {
t.join();
}
} catch (InterruptedException e) {
// ignore
}
if (ex[0] != null) {
throw ex[0];
Thread.sleep(1000);
for (Task t : tasks) {
t.get();
}
}
......
......@@ -13,6 +13,7 @@ import java.sql.SQLException;
import java.sql.Statement;
import org.h2.constant.ErrorCode;
import org.h2.test.TestBase;
import org.h2.util.Task;
/**
* Test concurrent access to JDBC objects.
......@@ -46,42 +47,36 @@ public class TestConcurrent extends TestBase {
break;
}
final PreparedStatement prep = conn.prepareStatement(sql);
final SQLException[] ex = new SQLException[1];
Thread t = new Thread() {
public void run() {
try {
while (!conn.isClosed()) {
switch (x % 6) {
case 0:
prep.executeQuery();
break;
case 1:
prep.execute();
break;
case 2:
prep.executeUpdate();
break;
case 3:
stat.executeQuery("select 1");
break;
case 4:
stat.execute("select 1");
break;
case 5:
stat.execute("delete from test");
break;
}
Task t = new Task() {
public void call() throws SQLException {
while (!conn.isClosed()) {
switch (x % 6) {
case 0:
prep.executeQuery();
break;
case 1:
prep.execute();
break;
case 2:
prep.executeUpdate();
break;
case 3:
stat.executeQuery("select 1");
break;
case 4:
stat.execute("select 1");
break;
case 5:
stat.execute("delete from test");
break;
}
} catch (SQLException e) {
ex[0] = e;
}
}
};
t.start();
t.execute();
Thread.sleep(100);
conn.close();
t.join();
SQLException e = ex[0];
SQLException e = (SQLException) t.getException();
if (e != null) {
if (ErrorCode.OBJECT_CLOSED != e.getErrorCode() &&
ErrorCode.STATEMENT_WAS_CANCELED != e.getErrorCode()) {
......
......@@ -20,6 +20,7 @@ import org.h2.store.fs.FileSystem;
import org.h2.test.TestBase;
import org.h2.util.IOUtils;
import org.h2.util.SortedProperties;
import org.h2.util.Task;
/**
* Test the serialized (server-less) mode.
......@@ -162,24 +163,19 @@ public class TestFileLockSerialized extends TestBase {
deleteDb("fileLockSerialized");
String url = "jdbc:h2:" + getBaseDir() + "/fileLockSerialized";
final String writeUrl = url + ";FILE_LOCK=SERIALIZED;OPEN_NEW=TRUE";
final boolean[] stop = { false };
Connection conn = DriverManager.getConnection(writeUrl, "sa", "sa");
conn.createStatement().execute("create table test(id identity) as select x from system_range(1, 100)");
conn.close();
new Thread() {
public void run() {
while (!stop[0]) {
try {
Thread.sleep(10);
Connection c = DriverManager.getConnection(writeUrl, "sa", "sa");
c.createStatement().execute("select * from test");
c.close();
} catch (Exception e) {
// ignore
}
Task task = new Task() {
public void call() throws Exception {
while (!stop) {
Thread.sleep(10);
Connection c = DriverManager.getConnection(writeUrl, "sa", "sa");
c.createStatement().execute("select * from test");
c.close();
}
}
}.start();
}.execute();
Thread.sleep(20);
for (int i = 0; i < 2; i++) {
conn = DriverManager.getConnection(writeUrl, "sa", "sa");
......@@ -189,11 +185,11 @@ public class TestFileLockSerialized extends TestBase {
conn.createStatement().execute("select * from test");
conn.close();
}
stop[0] = true;
Thread.sleep(100);
conn = DriverManager.getConnection(writeUrl, "sa", "sa");
conn.createStatement().execute("select * from test");
conn.close();
task.get();
}
private void testPendingWrite() throws Exception {
......@@ -538,60 +534,47 @@ public class TestFileLockSerialized extends TestBase {
final String url = "jdbc:h2:" + getBaseDir() + "/fileLockSerialized;FILE_LOCK=SERIALIZED;OPEN_NEW=TRUE;CACHE_SIZE=" + cacheSizeKb;
final boolean[] importFinished = { false };
final Exception[] ex = { null };
final Thread importUpdate = new Thread() {
public void run() {
try {
Connection conn = DriverManager.getConnection(url);
Statement stat = conn.createStatement();
stat.execute("create table test(id int, id2 int)");
for (int i = 0; i < howMuchRows; i++) {
stat.execute("insert into test values(" + i + ", " + i + ")");
}
importFinished[0] = true;
Thread.sleep(5000);
stat.execute("update test set id2=999 where id=500");
conn.close();
} catch (Exception e) {
ex[0] = e;
} finally {
importFinished[0] = true;
final Task importUpdate = new Task() {
public void call() throws Exception {
Connection conn = DriverManager.getConnection(url);
Statement stat = conn.createStatement();
stat.execute("create table test(id int, id2 int)");
for (int i = 0; i < howMuchRows; i++) {
stat.execute("insert into test values(" + i + ", " + i + ")");
}
importFinished[0] = true;
Thread.sleep(5000);
stat.execute("update test set id2=999 where id=500");
conn.close();
importFinished[0] = true;
}
};
importUpdate.start();
Thread select = new Thread() {
public void run() {
try {
Connection conn = DriverManager.getConnection(url);
Statement stat = conn.createStatement();
while (!importFinished[0]) {
Thread.sleep(100);
}
Thread.sleep(1000);
ResultSet rs = stat.executeQuery("select id2 from test where id=500");
assertTrue(rs.next());
assertEquals(500, rs.getInt(1));
rs.close();
importUpdate.join();
Thread.sleep(1000);
rs = stat.executeQuery("select id2 from test where id=500");
assertTrue(rs.next());
assertEquals(999, rs.getInt(1));
rs.close();
conn.close();
} catch (Exception e) {
ex[0] = e;
importUpdate.execute();
Task select = new Task() {
public void call() throws Exception {
Connection conn = DriverManager.getConnection(url);
Statement stat = conn.createStatement();
while (!importFinished[0]) {
Thread.sleep(100);
}
Thread.sleep(1000);
ResultSet rs = stat.executeQuery("select id2 from test where id=500");
assertTrue(rs.next());
assertEquals(500, rs.getInt(1));
rs.close();
importUpdate.get();
Thread.sleep(1000);
rs = stat.executeQuery("select id2 from test where id=500");
assertTrue(rs.next());
assertEquals(999, rs.getInt(1));
rs.close();
conn.close();
}
};
select.start();
importUpdate.join();
select.join();
if (ex[0] != null) {
throw ex[0];
}
select.execute();
importUpdate.get();
select.get();
deleteDb("fileLockSerialized");
}
......
......@@ -14,6 +14,7 @@ import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import org.h2.test.TestBase;
import org.h2.util.NetUtils;
import org.h2.util.Task;
/**
* Test the network utilities.
......@@ -42,9 +43,9 @@ public class TestNetUtils extends TestBase {
private void testFrequentConnections(boolean ssl, int count) throws Exception {
final ServerSocket serverSocket = NetUtils.createServerSocket(PORT, ssl);
final AtomicInteger counter = new AtomicInteger(count);
Thread serverThread = new Thread() {
public void run() {
while (!isInterrupted()) {
Task serverThread = new Task() {
public void call() {
while (!stop) {
try {
Socket socket = serverSocket.accept();
// System.out.println("opened " + counter);
......@@ -57,7 +58,7 @@ public class TestNetUtils extends TestBase {
}
};
serverThread.start();
serverThread.execute();
try {
Set<ConnectWorker> workers = new HashSet<ConnectWorker>();
for (int i = 0; i < WORKER_COUNT; i++) {
......@@ -81,8 +82,7 @@ public class TestNetUtils extends TestBase {
} catch (Exception e) {
// ignore
}
serverThread.interrupt();
serverThread.join();
serverThread.get();
}
}
......
......@@ -40,6 +40,7 @@ import org.h2.tools.RunScript;
import org.h2.tools.Script;
import org.h2.tools.Server;
import org.h2.tools.SimpleResultSet;
import org.h2.util.Task;
import org.h2.util.IOUtils;
import org.h2.util.JdbcUtils;
......@@ -221,20 +222,16 @@ public class TestTools extends TestBase {
assertEquals(ErrorCode.CONNECTION_BROKEN_1, e.getErrorCode());
}
final ServerSocket serverSocket = new ServerSocket(9001);
Thread thread = new Thread() {
public void run() {
try {
Socket socket = serverSocket.accept();
byte[] data = new byte[1024];
data[0] = 'x';
socket.getOutputStream().write(data);
socket.close();
} catch (Exception e) {
// ignore
}
Task task = new Task() {
public void call() throws Exception {
Socket socket = serverSocket.accept();
byte[] data = new byte[1024];
data[0] = 'x';
socket.getOutputStream().write(data);
socket.close();
}
};
thread.start();
task.execute();
Thread.sleep(100);
try {
Connection conn = getConnection("jdbc:h2:tcp://localhost:9001/test");
......@@ -244,7 +241,7 @@ public class TestTools extends TestBase {
assertEquals(ErrorCode.CONNECTION_BROKEN_1, e.getErrorCode());
}
serverSocket.close();
thread.join();
task.get();
}
private void testDeleteFiles() throws SQLException {
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论