提交 e873f830 authored 作者: Noel Grandin's avatar Noel Grandin

Fix multi-threaded mode insert exception "Unique index or primary key…

Fix multi-threaded mode insert exception "Unique index or primary key violation", test case by Anatolii K
上级 33120d89
......@@ -21,6 +21,8 @@ Change Log
<h2>Next Version (unreleased)</h2>
<ul>
<li>Fix multi-threaded mode insert exception "Unique index or primary key violation", test case by Anatolii K
</li>
<li>Implement ILIKE operator for case-insensitive matching
</li>
<li>Optimise LIKE queries for the common cases of '%Foo' and '%Foo%'
......
......@@ -57,7 +57,7 @@ public class MultiVersionIndex implements Index {
synchronized (sync) {
base.add(session, row);
if (removeIfExists(session, row)) {
// for example rolling back an delete operation
// for example rolling back a delete operation
} else if (row.getSessionId() != 0) {
// don't insert rows that are added when creating an index
delta.add(session, row);
......
......@@ -11,6 +11,7 @@ import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map.Entry;
import java.util.concurrent.atomic.AtomicLong;
import org.h2.api.ErrorCode;
import org.h2.engine.Database;
import org.h2.engine.Session;
......@@ -55,7 +56,7 @@ public class MVPrimaryIndex extends BaseIndex {
private final MVTable mvTable;
private final String mapName;
private TransactionMap<Value, Value> dataMap;
private long lastKey;
private final AtomicLong lastKey = new AtomicLong(0);
private int mainIndexColumn = -1;
public MVPrimaryIndex(Database db, MVTable table, int id,
......@@ -77,7 +78,7 @@ public class MVPrimaryIndex extends BaseIndex {
dataMap.map.setVolatile(true);
}
Value k = dataMap.lastKey();
lastKey = k == null ? 0 : k.getLong();
lastKey.set(k == null ? 0 : k.getLong());
}
@Override
......@@ -107,7 +108,7 @@ public class MVPrimaryIndex extends BaseIndex {
public void add(Session session, Row row) {
if (mainIndexColumn == -1) {
if (row.getKey() == 0) {
row.setKey(++lastKey);
row.setKey(lastKey.incrementAndGet());
}
} else {
long c = row.getValue(mainIndexColumn).getLong();
......@@ -144,7 +145,10 @@ public class MVPrimaryIndex extends BaseIndex {
} catch (IllegalStateException e) {
throw mvTable.convertException(e);
}
lastKey = Math.max(lastKey, row.getKey());
// because it's possible to directly update the key using the _rowid_ syntax
if (row.getKey() > lastKey.get()) {
lastKey.set(row.getKey());
}
}
@Override
......
......@@ -71,6 +71,7 @@ public class TestMultiThread extends TestBase implements Runnable {
testConcurrentInsertUpdateSelect();
testLockModeWithMultiThreaded();
testViews();
testConcurrentInsert();
}
private void testConcurrentSchemaChange() throws Exception {
......@@ -316,7 +317,7 @@ public class TestMultiThread extends TestBase implements Runnable {
"CREATE VIEW INVOICE_DETAIL_VIEW as SELECT * FROM INVOICE_DETAIL");
stat.close();
// create views that reference the common views in different threads
final ExecutorService executor = Executors.newFixedThreadPool(8);
try {
......@@ -328,10 +329,10 @@ public class TestMultiThread extends TestBase implements Runnable {
public Void call() throws Exception {
final Connection conn2 = getConnection(url);
Statement stat2 = conn2.createStatement();
stat2.execute("CREATE VIEW INVOICE_VIEW" + j
+ " as SELECT * FROM INVOICE_VIEW");
// the following query intermittently results in a
// NullPointerException
stat2.execute("CREATE VIEW INVOICE_DETAIL_VIEW" + j
......@@ -339,17 +340,17 @@ public class TestMultiThread extends TestBase implements Runnable {
+ " INV JOIN INVOICE_DETAIL_VIEW DTL "
+ "ON INV.INVOICE_ID = DTL.INVOICE_ID"
+ " WHERE DESCRIPTION='TEST'");
ResultSet rs = stat2
.executeQuery("SELECT * FROM INVOICE_VIEW" + j);
rs.next();
rs.close();
rs = stat2.executeQuery(
"SELECT * FROM INVOICE_DETAIL_VIEW" + j);
rs.next();
rs.close();
stat.close();
conn.close();
return null;
......@@ -378,4 +379,55 @@ public class TestMultiThread extends TestBase implements Runnable {
deleteDb("lockMode");
}
private void testConcurrentInsert() throws Exception {
deleteDb("lockMode");
final String url = getURL("lockMode;MULTI_THREADED=1", true);
final Connection conn = getConnection(url);
conn.createStatement().execute(
"CREATE TABLE IF NOT EXISTS TRAN (ID NUMBER(18,0) not null PRIMARY KEY)");
final int threadCount = 100;
final ArrayList<Callable<Integer>> callables = new ArrayList<Callable<Integer>>();
for (int i = 0; i < threadCount; i++) {
final Connection taskConn = getConnection(url);
taskConn.setAutoCommit(false);
final PreparedStatement insertTranStmt = taskConn
.prepareStatement("INSERT INTO tran (id) values(?)");
// to guarantee uniqueness
final long initialTransactionId = i * 1000000L;
callables.add(new Callable<Integer>() {
@Override
public Integer call() throws Exception {
long tranId = initialTransactionId;
for (int j = 0; j < 10000; j++) {
insertTranStmt.setLong(1, tranId++);
insertTranStmt.execute();
taskConn.commit();
}
return 1;
}
});
}
final ExecutorService executor = Executors
.newFixedThreadPool(threadCount);
try {
final ArrayList<Future<Integer>> jobs = new ArrayList<Future<Integer>>();
for (int i = 0; i < threadCount; i++) {
jobs.add(executor.submit(callables.get(i)));
}
// check for exceptions
for (Future<Integer> job : jobs) {
if (job.get(180, TimeUnit.SECONDS) == null) {
throw new RuntimeException("timed out");
}
}
} finally {
conn.close();
executor.shutdown();
executor.awaitTermination(20, TimeUnit.SECONDS);
}
}
}
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论