提交 555af2d5 authored 作者: Thomas Mueller's avatar Thomas Mueller

Ensure threads stop running if the test fails

上级 0aae334e
...@@ -6,9 +6,10 @@ ...@@ -6,9 +6,10 @@
package org.h2.test.store; package org.h2.test.store;
import java.io.BufferedInputStream; import java.io.BufferedInputStream;
import java.io.ByteArrayOutputStream; import java.io.BufferedOutputStream;
import java.io.FileOutputStream; import java.io.FileOutputStream;
import java.io.InputStream; import java.io.InputStream;
import java.io.OutputStream;
import java.nio.channels.FileChannel; import java.nio.channels.FileChannel;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
...@@ -178,9 +179,12 @@ public class TestConcurrent extends TestMVStore { ...@@ -178,9 +179,12 @@ public class TestConcurrent extends TestMVStore {
}; };
tasks[i].execute(); tasks[i].execute();
} }
Thread.sleep(100); try {
for (Task t : tasks) { Thread.sleep(100);
t.get(); } finally {
for (Task t : tasks) {
t.get();
}
} }
} }
...@@ -256,11 +260,14 @@ public class TestConcurrent extends TestMVStore { ...@@ -256,11 +260,14 @@ public class TestConcurrent extends TestMVStore {
} }
}; };
task.execute(); task.execute();
Thread.sleep(1); try {
for (int i = 0; !task.isFinished() && i < 1000000; i++) { Thread.sleep(1);
assertEquals(i % 100, map.get(i % 100).intValue()); for (int i = 0; !task.isFinished() && i < 1000000; i++) {
assertEquals(i % 100, map.get(i % 100).intValue());
}
} finally {
task.get();
} }
task.get();
s.close(); s.close();
} }
...@@ -272,76 +279,82 @@ public class TestConcurrent extends TestMVStore { ...@@ -272,76 +279,82 @@ public class TestConcurrent extends TestMVStore {
pageSplitSize(10). pageSplitSize(10).
autoCommitDisabled().open(); autoCommitDisabled().open();
s.setRetentionTime(10000); s.setRetentionTime(10000);
Task task = new Task() { try {
@Override Task task = new Task() {
public void call() throws Exception { @Override
while (!stop) { public void call() throws Exception {
s.compact(100, 1024 * 1024); while (!stop) {
s.compact(100, 1024 * 1024);
}
} }
} };
}; task.execute();
task.execute(); Task task2 = new Task() {
Task task2 = new Task() { @Override
@Override public void call() throws Exception {
public void call() throws Exception { while (!stop) {
while (!stop) { s.compact(100, 1024 * 1024);
s.compact(100, 1024 * 1024); }
} }
};
task2.execute();
Thread.sleep(1);
for (int i = 0; !task.isFinished() && !task2.isFinished() && i < 1000; i++) {
MVMap<Integer, Integer> map = s.openMap("d" + (i % 3));
// MVMap<Integer, Integer> map = s.openMap("d" + (i % 3),
// new MVMapConcurrent.Builder<Integer, Integer>());
map.put(0, i);
map.get(0);
s.commit();
} }
}; task.get();
task2.execute(); task2.get();
Thread.sleep(1); } finally {
for (int i = 0; !task.isFinished() && !task2.isFinished() && i < 1000; i++) { s.close();
MVMap<Integer, Integer> map = s.openMap("d" + (i % 3));
// MVMap<Integer, Integer> map = s.openMap("d" + (i % 3),
// new MVMapConcurrent.Builder<Integer, Integer>());
map.put(0, i);
map.get(0);
s.commit();
} }
task.get();
task2.get();
s.close();
} }
private void testConcurrentChangeAndGetVersion() throws InterruptedException { private void testConcurrentChangeAndGetVersion() throws InterruptedException {
for (int test = 0; test < 10; test++) { for (int test = 0; test < 10; test++) {
final MVStore s = new MVStore.Builder(). final MVStore s = new MVStore.Builder().
autoCommitDisabled().open(); autoCommitDisabled().open();
s.setVersionsToKeep(10); try {
final MVMap<Integer, Integer> m = s.openMap("data"); s.setVersionsToKeep(10);
m.put(1, 1); final MVMap<Integer, Integer> m = s.openMap("data");
Task task = new Task() { m.put(1, 1);
@Override Task task = new Task() {
public void call() throws Exception { @Override
while (!stop) { public void call() throws Exception {
while (!stop) {
m.put(1, 1);
s.commit();
}
}
};
task.execute();
Thread.sleep(1);
for (int i = 0; i < 10000; i++) {
if (task.isFinished()) {
break;
}
for (int j = 0; j < 20; j++) {
m.put(1, 1); m.put(1, 1);
s.commit(); s.commit();
} }
s.setVersionsToKeep(15);
long version = s.getCurrentVersion() - 1;
try {
m.openVersion(version);
} catch (IllegalArgumentException e) {
// ignore
}
s.setVersionsToKeep(20);
} }
}; task.get();
task.execute(); s.commit();
Thread.sleep(1); } finally {
for (int i = 0; i < 10000; i++) { s.close();
if (task.isFinished()) {
break;
}
for (int j = 0; j < 20; j++) {
m.put(1, 1);
s.commit();
}
s.setVersionsToKeep(15);
long version = s.getCurrentVersion() - 1;
try {
m.openVersion(version);
} catch (IllegalArgumentException e) {
// ignore
}
s.setVersionsToKeep(20);
} }
task.get();
s.commit();
s.close();
} }
} }
...@@ -363,63 +376,66 @@ public class TestConcurrent extends TestMVStore { ...@@ -363,63 +376,66 @@ public class TestConcurrent extends TestMVStore {
s1.close(); s1.close();
final MVStore s = new MVStore.Builder(). final MVStore s = new MVStore.Builder().
fileName(fileName).autoCommitDisabled().open(); fileName(fileName).autoCommitDisabled().open();
s.setRetentionTime(0); try {
final ArrayList<MVMap<Integer, Integer>> list = New.arrayList(); s.setRetentionTime(0);
for (int i = 0; i < count; i++) { final ArrayList<MVMap<Integer, Integer>> list = New.arrayList();
MVMap<Integer, Integer> m = s.openMap("d" + i); for (int i = 0; i < count; i++) {
list.add(m); MVMap<Integer, Integer> m = s.openMap("d" + i);
} list.add(m);
}
final AtomicInteger counter = new AtomicInteger(); final AtomicInteger counter = new AtomicInteger();
Task task = new Task() { Task task = new Task() {
@Override @Override
public void call() throws Exception { public void call() throws Exception {
while (!stop) { while (!stop) {
int x = counter.getAndIncrement(); int x = counter.getAndIncrement();
if (x >= count) { if (x >= count) {
break; break;
}
MVMap<Integer, Integer> m = list.get(x);
m.clear();
s.removeMap(m);
} }
MVMap<Integer, Integer> m = list.get(x); }
m.clear(); };
s.removeMap(m); task.execute();
Thread.sleep(1);
while (true) {
int x = counter.getAndIncrement();
if (x >= count) {
break;
}
MVMap<Integer, Integer> m = list.get(x);
m.clear();
s.removeMap(m);
if (x % 5 == 0) {
s.commit();
} }
} }
}; task.get();
task.execute(); // this will mark old chunks as unused,
Thread.sleep(1); // but not remove (and overwrite) them yet
while (true) { s.commit();
int x = counter.getAndIncrement(); // this will remove them, so we end up with
if (x >= count) { // one unused one, and one active one
break; MVMap<Integer, Integer> m = s.openMap("dummy");
} m.put(1, 1);
MVMap<Integer, Integer> m = list.get(x); s.commit();
m.clear(); m.put(2, 2);
s.removeMap(m); s.commit();
if (x % 5 == 0) {
s.commit();
}
}
task.get();
// this will mark old chunks as unused,
// but not remove (and overwrite) them yet
s.commit();
// this will remove them, so we end up with
// one unused one, and one active one
MVMap<Integer, Integer> m = s.openMap("dummy");
m.put(1, 1);
s.commit();
m.put(2, 2);
s.commit();
MVMap<String, String> meta = s.getMetaMap(); MVMap<String, String> meta = s.getMetaMap();
int chunkCount = 0; int chunkCount = 0;
for (String k : meta.keyList()) { for (String k : meta.keyList()) {
if (k.startsWith("chunk.")) { if (k.startsWith("chunk.")) {
chunkCount++; chunkCount++;
}
} }
assertTrue("" + chunkCount, chunkCount < 3);
} finally {
s.close();
} }
assertTrue("" + chunkCount, chunkCount < 3);
s.close();
} }
} }
...@@ -427,73 +443,79 @@ public class TestConcurrent extends TestMVStore { ...@@ -427,73 +443,79 @@ public class TestConcurrent extends TestMVStore {
String fileName = "memFS:" + getTestName(); String fileName = "memFS:" + getTestName();
FileUtils.delete(fileName); FileUtils.delete(fileName);
final MVStore s = openStore(fileName); final MVStore s = openStore(fileName);
int count = 200; try {
for (int i = 0; i < count; i++) { int count = 200;
MVMap<Integer, Integer> m = s.openMap("d" + i); for (int i = 0; i < count; i++) {
m.put(1, 1); MVMap<Integer, Integer> m = s.openMap("d" + i);
} m.put(1, 1);
final AtomicInteger counter = new AtomicInteger();
Task task = new Task() {
@Override
public void call() throws Exception {
while (!stop) {
counter.incrementAndGet();
s.commit();
}
}
};
task.execute();
Thread.sleep(1);
for (int i = 0; i < count || counter.get() < count; i++) {
MVMap<Integer, Integer> m = s.openMap("d" + i);
m.put(1, 10);
s.removeMap(m);
if (task.isFinished()) {
break;
} }
}
task.get();
s.close();
}
private void testConcurrentStoreAndClose() throws InterruptedException {
String fileName = "memFS:" + getTestName();
for (int i = 0; i < 10; i++) {
FileUtils.delete(fileName);
final MVStore s = openStore(fileName);
final AtomicInteger counter = new AtomicInteger(); final AtomicInteger counter = new AtomicInteger();
Task task = new Task() { Task task = new Task() {
@Override @Override
public void call() throws Exception { public void call() throws Exception {
while (!stop) { while (!stop) {
s.setStoreVersion(counter.incrementAndGet()); counter.incrementAndGet();
s.commit(); s.commit();
} }
} }
}; };
task.execute(); task.execute();
while (counter.get() < 5) { Thread.sleep(1);
Thread.sleep(1); for (int i = 0; i < count || counter.get() < count; i++) {
MVMap<Integer, Integer> m = s.openMap("d" + i);
m.put(1, 10);
s.removeMap(m);
if (task.isFinished()) {
break;
}
} }
task.get();
} finally {
s.close();
}
}
private void testConcurrentStoreAndClose() throws InterruptedException {
String fileName = "memFS:" + getTestName();
for (int i = 0; i < 10; i++) {
FileUtils.delete(fileName);
final MVStore s = openStore(fileName);
try { try {
s.close(); final AtomicInteger counter = new AtomicInteger();
// sometimes closing works, in which case Task task = new Task() {
// storing must fail at some point (not necessarily @Override
// immediately) public void call() throws Exception {
for (int x = counter.get(), y = x; x <= y + 2; x++) { while (!stop) {
s.setStoreVersion(counter.incrementAndGet());
s.commit();
}
}
};
task.execute();
while (counter.get() < 5) {
Thread.sleep(1); Thread.sleep(1);
} }
Exception e = task.getException(); try {
assertEquals(DataUtils.ERROR_CLOSED, s.close();
DataUtils.getErrorCode(e.getMessage())); // sometimes closing works, in which case
} catch (IllegalStateException e) { // storing must fail at some point (not necessarily
// sometimes storing works, in which case // immediately)
// closing must fail for (int x = counter.get(), y = x; x <= y + 2; x++) {
assertEquals(DataUtils.ERROR_WRITING_FAILED, Thread.sleep(1);
DataUtils.getErrorCode(e.getMessage())); }
task.get(); Exception e = task.getException();
assertEquals(DataUtils.ERROR_CLOSED,
DataUtils.getErrorCode(e.getMessage()));
} catch (IllegalStateException e) {
// sometimes storing works, in which case
// closing must fail
assertEquals(DataUtils.ERROR_WRITING_FAILED,
DataUtils.getErrorCode(e.getMessage()));
task.get();
}
} finally {
s.close();
} }
s.close();
} }
} }
...@@ -503,51 +525,54 @@ public class TestConcurrent extends TestMVStore { ...@@ -503,51 +525,54 @@ public class TestConcurrent extends TestMVStore {
private void testConcurrentMap() throws InterruptedException { private void testConcurrentMap() throws InterruptedException {
final MVStore s = openStore(null); final MVStore s = openStore(null);
final MVMap<Integer, Integer> m = s.openMap("data"); final MVMap<Integer, Integer> m = s.openMap("data");
final int size = 20; try {
final Random rand = new Random(1); final int size = 20;
Task task = new Task() { final Random rand = new Random(1);
@Override Task task = new Task() {
public void call() throws Exception { @Override
try { public void call() throws Exception {
while (!stop) { try {
if (rand.nextBoolean()) { while (!stop) {
m.put(rand.nextInt(size), 1); if (rand.nextBoolean()) {
} else { m.put(rand.nextInt(size), 1);
m.remove(rand.nextInt(size)); } else {
} m.remove(rand.nextInt(size));
m.get(rand.nextInt(size)); }
m.firstKey(); m.get(rand.nextInt(size));
m.lastKey(); m.firstKey();
m.ceilingKey(5); m.lastKey();
m.floorKey(5); m.ceilingKey(5);
m.higherKey(5); m.floorKey(5);
m.lowerKey(5); m.higherKey(5);
for (Iterator<Integer> it = m.keyIterator(null); m.lowerKey(5);
it.hasNext();) { for (Iterator<Integer> it = m.keyIterator(null);
it.next(); it.hasNext();) {
it.next();
}
} }
} catch (Exception e) {
e.printStackTrace();
} }
} catch (Exception e) {
e.printStackTrace();
} }
} };
}; task.execute();
task.execute(); Thread.sleep(1);
Thread.sleep(1); for (int j = 0; j < 100; j++) {
for (int j = 0; j < 100; j++) { for (int i = 0; i < 100; i++) {
for (int i = 0; i < 100; i++) { if (rand.nextBoolean()) {
if (rand.nextBoolean()) { m.put(rand.nextInt(size), 2);
m.put(rand.nextInt(size), 2); } else {
} else { m.remove(rand.nextInt(size));
m.remove(rand.nextInt(size)); }
m.get(rand.nextInt(size));
} }
m.get(rand.nextInt(size)); s.commit();
Thread.sleep(1);
} }
s.commit(); task.get();
Thread.sleep(1); } finally {
s.close();
} }
task.get();
s.close();
} }
private void testConcurrentOnlineBackup() throws Exception { private void testConcurrentOnlineBackup() throws Exception {
...@@ -556,7 +581,7 @@ public class TestConcurrent extends TestMVStore { ...@@ -556,7 +581,7 @@ public class TestConcurrent extends TestMVStore {
final MVStore s = openStore(fileName); final MVStore s = openStore(fileName);
final MVMap<Integer, byte[]> map = s.openMap("test"); final MVMap<Integer, byte[]> map = s.openMap("test");
final Random r = new Random(); final Random r = new Random();
Task t = new Task() { Task task = new Task() {
@Override @Override
public void call() throws Exception { public void call() throws Exception {
while (!stop) { while (!stop) {
...@@ -577,44 +602,46 @@ public class TestConcurrent extends TestMVStore { ...@@ -577,44 +602,46 @@ public class TestConcurrent extends TestMVStore {
} }
} }
}; };
t.execute(); task.execute();
for (int i = 0; i < 10; i++) { try {
// System.out.println("test " + i); for (int i = 0; i < 10; i++) {
s.setReuseSpace(false); // System.out.println("test " + i);
byte[] buff = readFileSlowly(s.getFileStore().getFile(), s.setReuseSpace(false);
s.getFileStore().size()); OutputStream out = new BufferedOutputStream(
s.setReuseSpace(true); new FileOutputStream(fileNameRestore));
FileOutputStream out = new FileOutputStream(fileNameRestore); long len = s.getFileStore().size();
out.write(buff); copyFileSlowly(s.getFileStore().getFile(),
out.close(); len, out);
MVStore s2 = openStore(fileNameRestore); out.close();
MVMap<Integer, byte[]> test = s2.openMap("test"); s.setReuseSpace(true);
for (Integer k : test.keySet()) { MVStore s2 = openStore(fileNameRestore);
test.get(k); MVMap<Integer, byte[]> test = s2.openMap("test");
for (Integer k : test.keySet()) {
test.get(k);
}
s2.close();
// let it compact
Thread.sleep(10);
} }
s2.close(); } finally {
// let it compact task.get();
Thread.sleep(10);
} }
t.get();
s.close(); s.close();
} }
private static byte[] readFileSlowly(FileChannel file, long length) private static void copyFileSlowly(FileChannel file, long length, OutputStream out)
throws Exception { throws Exception {
file.position(0); file.position(0);
InputStream in = new BufferedInputStream(new FileChannelInputStream( InputStream in = new BufferedInputStream(new FileChannelInputStream(
file, false)); file, false));
ByteArrayOutputStream buff = new ByteArrayOutputStream();
for (int j = 0; j < length; j++) { for (int j = 0; j < length; j++) {
int x = in.read(); int x = in.read();
if (x < 0) { if (x < 0) {
break; break;
} }
buff.write(x); out.write(x);
} }
in.close(); in.close();
return buff.toByteArray();
} }
private void testConcurrentIterate() { private void testConcurrentIterate() {
...@@ -623,7 +650,7 @@ public class TestConcurrent extends TestMVStore { ...@@ -623,7 +650,7 @@ public class TestConcurrent extends TestMVStore {
final MVMap<Integer, Integer> map = s.openMap("test"); final MVMap<Integer, Integer> map = s.openMap("test");
final int len = 10; final int len = 10;
final Random r = new Random(); final Random r = new Random();
Task t = new Task() { Task task = new Task() {
@Override @Override
public void call() throws Exception { public void call() throws Exception {
while (!stop) { while (!stop) {
...@@ -636,19 +663,22 @@ public class TestConcurrent extends TestMVStore { ...@@ -636,19 +663,22 @@ public class TestConcurrent extends TestMVStore {
} }
} }
}; };
t.execute(); task.execute();
for (int k = 0; k < 10000; k++) { try {
Iterator<Integer> it = map.keyIterator(r.nextInt(len)); for (int k = 0; k < 10000; k++) {
long old = s.getCurrentVersion(); Iterator<Integer> it = map.keyIterator(r.nextInt(len));
s.commit(); long old = s.getCurrentVersion();
while (map.getVersion() == old) { s.commit();
Thread.yield(); while (map.getVersion() == old) {
} Thread.yield();
while (it.hasNext()) { }
it.next(); while (it.hasNext()) {
it.next();
}
} }
} finally {
task.get();
} }
t.get();
s.close(); s.close();
} }
...@@ -699,32 +729,35 @@ public class TestConcurrent extends TestMVStore { ...@@ -699,32 +729,35 @@ public class TestConcurrent extends TestMVStore {
} }
}; };
task.execute(); task.execute();
Thread.sleep(1); try {
for (int j = 0; j < 10; j++) { Thread.sleep(1);
for (int i = 0; i < 10; i++) { for (int j = 0; j < 10; j++) {
try { for (int i = 0; i < 10; i++) {
if (rand.nextBoolean()) { try {
m.put(rand.nextInt(size), 2); if (rand.nextBoolean()) {
} else { m.put(rand.nextInt(size), 2);
m.remove(rand.nextInt(size)); } else {
m.remove(rand.nextInt(size));
}
m.get(rand.nextInt(size));
} catch (ConcurrentModificationException e) {
detected.incrementAndGet();
} catch (NegativeArraySizeException e) {
notDetected.incrementAndGet();
} catch (ArrayIndexOutOfBoundsException e) {
notDetected.incrementAndGet();
} catch (IllegalArgumentException e) {
notDetected.incrementAndGet();
} catch (NullPointerException e) {
notDetected.incrementAndGet();
} }
m.get(rand.nextInt(size));
} catch (ConcurrentModificationException e) {
detected.incrementAndGet();
} catch (NegativeArraySizeException e) {
notDetected.incrementAndGet();
} catch (ArrayIndexOutOfBoundsException e) {
notDetected.incrementAndGet();
} catch (IllegalArgumentException e) {
notDetected.incrementAndGet();
} catch (NullPointerException e) {
notDetected.incrementAndGet();
} }
s.commit();
Thread.sleep(1);
} }
s.commit(); } finally {
Thread.sleep(1); task.get();
} }
task.get();
s.close(); s.close();
} }
...@@ -754,16 +787,19 @@ public class TestConcurrent extends TestMVStore { ...@@ -754,16 +787,19 @@ public class TestConcurrent extends TestMVStore {
} }
}; };
task.execute(); task.execute();
Thread.sleep(1); try {
for (int j = 0; j < 100; j++) {
x = (int) s.getCurrentVersion();
for (int i = 0; i < size; i++) {
m.put(i, x);
}
s.commit();
Thread.sleep(1); Thread.sleep(1);
for (int j = 0; j < 100; j++) {
x = (int) s.getCurrentVersion();
for (int i = 0; i < size; i++) {
m.put(i, x);
}
s.commit();
Thread.sleep(1);
}
} finally {
task.get();
} }
task.get();
s.close(); s.close();
} }
......
...@@ -597,31 +597,39 @@ public class TestMVTableEngine extends TestBase { ...@@ -597,31 +597,39 @@ public class TestMVTableEngine extends TestBase {
private void testTransactionLogUsuallyNotStored() throws Exception { private void testTransactionLogUsuallyNotStored() throws Exception {
Connection conn; Connection conn;
Statement stat; Statement stat;
deleteDb(getTestName()); // we expect the transaction log is empty in at least some of the cases
String url = getTestName() + ";MV_STORE=TRUE"; for (int test = 0; test < 5; test++) {
url = getURL(url, true); deleteDb(getTestName());
conn = getConnection(url); String url = getTestName() + ";MV_STORE=TRUE";
stat = conn.createStatement(); url = getURL(url, true);
stat.execute("create table test(id identity, name varchar)"); conn = getConnection(url);
conn.setAutoCommit(false); stat = conn.createStatement();
PreparedStatement prep = conn.prepareStatement( stat.execute("create table test(id identity, name varchar)");
"insert into test(name) values(space(10000))"); conn.setAutoCommit(false);
for (int j = 0; j < 100; j++) { PreparedStatement prep = conn.prepareStatement(
for (int i = 0; i < 100; i++) { "insert into test(name) values(space(10000))");
prep.execute(); for (int j = 0; j < 100; j++) {
for (int i = 0; i < 100; i++) {
prep.execute();
}
conn.commit();
}
stat.execute("shutdown immediately");
JdbcUtils.closeSilently(conn);
String file = getBaseDir() + "/" + getTestName() +
Constants.SUFFIX_MV_FILE;
MVStore store = MVStore.open(file);
TransactionStore t = new TransactionStore(store);
t.init();
int openTransactions = t.getOpenTransactions().size();
store.close();
if (openTransactions == 0) {
return;
} }
conn.commit();
} }
stat.execute("shutdown immediately"); fail("transaction log was never empty");
JdbcUtils.closeSilently(conn);
String file = getBaseDir() + "/" + getTestName() + Constants.SUFFIX_MV_FILE;
MVStore store = MVStore.open(file);
TransactionStore t = new TransactionStore(store);
t.init();
assertEquals(0, t.getOpenTransactions().size());
store.close();
} }
private void testShrinkDatabaseFile() throws Exception { private void testShrinkDatabaseFile() throws Exception {
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论