提交 b628409b authored 作者: Evgenij Ryazanov's avatar Evgenij Ryazanov

Add TestIndex.testConcurrentUpdate()

上级 6b67928a
......@@ -11,7 +11,9 @@ import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Random;
import java.util.concurrent.atomic.AtomicInteger;
import org.h2.api.ErrorCode;
import org.h2.result.SortOrder;
......@@ -44,6 +46,7 @@ public class TestIndex extends TestBase {
testHashIndexOnMemoryTable();
testErrorMessage();
testDuplicateKeyException();
testConcurrentUpdate();
testNonUniqueHashIndex();
testRenamePrimaryKey();
testRandomized();
......@@ -187,6 +190,103 @@ public class TestIndex extends TestBase {
stat.execute("drop table test");
}
private class ConcurrentUpdateThread extends Thread {
private final AtomicInteger concurrentUpdateId, concurrentUpdateValue;
private final PreparedStatement psInsert, psDelete;
boolean haveDuplicateKeyException;
ConcurrentUpdateThread(Connection c, AtomicInteger concurrentUpdateId,
AtomicInteger concurrentUpdateValue) throws SQLException {
this.concurrentUpdateId = concurrentUpdateId;
this.concurrentUpdateValue = concurrentUpdateValue;
psInsert = c.prepareStatement("insert into test(id, value) values (?, ?)");
psDelete = c.prepareStatement("delete from test where value = ?");
}
@Override
public void run() {
for (int i = 0; i < 10000; i++) {
try {
if (Math.random() > 0.05) {
psInsert.setInt(1, concurrentUpdateId.incrementAndGet());
psInsert.setInt(2, concurrentUpdateValue.get());
psInsert.executeUpdate();
} else {
psDelete.setInt(1, concurrentUpdateValue.get());
psDelete.executeUpdate();
}
} catch (SQLException ex) {
switch (ex.getErrorCode()) {
case 23505:
haveDuplicateKeyException = true;
break;
case 90131:
// Unlikely but possible
break;
default:
ex.printStackTrace();
}
}
if (Math.random() > 0.95)
concurrentUpdateValue.incrementAndGet();
}
}
}
private void testConcurrentUpdate() throws SQLException {
Connection c = getConnection("index");
Statement stat = c.createStatement();
stat.execute("create table test(id int primary key, value int)");
stat.execute("create unique index idx_value_name on test(value)");
PreparedStatement check = c.prepareStatement("select value from test");
ConcurrentUpdateThread[] threads = new ConcurrentUpdateThread[4];
AtomicInteger concurrentUpdateId = new AtomicInteger(), concurrentUpdateValue = new AtomicInteger();
// The same connection
for (int i = 0; i < threads.length; i++) {
threads[i] = new ConcurrentUpdateThread(c, concurrentUpdateId, concurrentUpdateValue);
}
testConcurrentUpdateRun(threads, check);
// Different connections
Connection[] connections = new Connection[threads.length];
for (int i = 0; i < threads.length; i++) {
Connection c2 = getConnection("index");
connections[i] = c2;
threads[i] = new ConcurrentUpdateThread(c2, concurrentUpdateId, concurrentUpdateValue);
}
testConcurrentUpdateRun(threads, check);
for (Connection c2 : connections) {
c2.close();
}
stat.execute("drop table test");
c.close();
}
void testConcurrentUpdateRun(ConcurrentUpdateThread[] threads, PreparedStatement check) throws SQLException {
for (ConcurrentUpdateThread t : threads) {
t.start();
}
boolean haveDuplicateKeyException = false;
for (ConcurrentUpdateThread t : threads) {
try {
t.join();
haveDuplicateKeyException |= t.haveDuplicateKeyException;
} catch (InterruptedException e) {
}
}
assertTrue("haveDuplicateKeys", haveDuplicateKeyException);
HashSet<Integer> set = new HashSet<>();
try (ResultSet rs = check.executeQuery()) {
while (rs.next()) {
if (!set.add(rs.getInt(1))) {
fail("unique index violation");
}
}
}
}
private void testNonUniqueHashIndex() throws SQLException {
reconnect();
stat.execute("create memory table test(id bigint, data bigint)");
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论