提交 4abe8639 authored 作者: Thomas Mueller's avatar Thomas Mueller

Issue 117: Multi-version concurrency: concurrent MERGE statements now work.

上级 7322a5e7
...@@ -18,7 +18,7 @@ Change Log ...@@ -18,7 +18,7 @@ Change Log
<h1>Change Log</h1> <h1>Change Log</h1>
<h2>Next Version (unreleased)</h2> <h2>Next Version (unreleased)</h2>
<ul><li>- <ul><li>Issue 117: Multi-version concurrency: concurrent MERGE statements now work.
</li></ul> </li></ul>
<h2>Version 1.2.123 (2009-11-08)</h2> <h2>Version 1.2.123 (2009-11-08)</h2>
......
...@@ -215,9 +215,9 @@ public abstract class Command implements CommandInterface { ...@@ -215,9 +215,9 @@ public abstract class Command implements CommandInterface {
} }
try { try {
if (sync == database) { if (sync == database) {
database.wait(100); database.wait(10);
} else { } else {
Thread.sleep(100); Thread.sleep(10);
} }
} catch (InterruptedException e1) { } catch (InterruptedException e1) {
// ignore // ignore
......
...@@ -173,14 +173,21 @@ public class Merge extends Prepared { ...@@ -173,14 +173,21 @@ public class Merge extends Prepared {
} }
int count = update.update(); int count = update.update();
if (count == 0) { if (count == 0) {
table.fireBefore(session); try {
table.validateConvertUpdateSequence(session, row); table.fireBefore(session);
table.fireBeforeRow(session, null, row); table.validateConvertUpdateSequence(session, row);
table.lock(session, true, false); table.fireBeforeRow(session, null, row);
table.addRow(session, row); table.lock(session, true, false);
session.log(table, UndoLogRecord.INSERT, row); table.addRow(session, row);
table.fireAfter(session); session.log(table, UndoLogRecord.INSERT, row);
table.fireAfterRow(session, null, row); table.fireAfter(session);
table.fireAfterRow(session, null, row);
} catch (SQLException e) {
if (e.getErrorCode() == ErrorCode.DUPLICATE_KEY_1) {
// concurrent merge or insert
throw Message.getSQLException(ErrorCode.CONCURRENT_UPDATE_1, table.getName());
}
}
} else if (count != 1) { } else if (count != 1) {
throw Message.getSQLException(ErrorCode.DUPLICATE_KEY_1, table.getSQL()); throw Message.getSQLException(ErrorCode.DUPLICATE_KEY_1, table.getSQL());
} }
......
...@@ -297,7 +297,6 @@ java org.h2.test.TestAll timer ...@@ -297,7 +297,6 @@ java org.h2.test.TestAll timer
/* /*
mvcc merge problem (testConcurrentMerge)
http://www.apache.org/dev/contrib-email-tips.html http://www.apache.org/dev/contrib-email-tips.html
google app engine google app engine
documentation: rolling review at history.html documentation: rolling review at history.html
......
...@@ -9,8 +9,6 @@ package org.h2.test.mvcc; ...@@ -9,8 +9,6 @@ package org.h2.test.mvcc;
import java.sql.Connection; import java.sql.Connection;
import java.sql.ResultSet; import java.sql.ResultSet;
import java.sql.SQLException; import java.sql.SQLException;
import java.util.Random;
import org.h2.test.TestBase; import org.h2.test.TestBase;
/** /**
...@@ -28,7 +26,7 @@ public class TestMvccMultiThreaded extends TestBase { ...@@ -28,7 +26,7 @@ public class TestMvccMultiThreaded extends TestBase {
} }
public void test() throws Exception { public void test() throws Exception {
// testConcurrentMerge(); testConcurrentMerge();
testConcurrentUpdate(""); testConcurrentUpdate("");
// not supported currently // not supported currently
// testConcurrentUpdate(";MULTI_THREADED=TRUE"); // testConcurrentUpdate(";MULTI_THREADED=TRUE");
...@@ -36,14 +34,14 @@ public class TestMvccMultiThreaded extends TestBase { ...@@ -36,14 +34,14 @@ public class TestMvccMultiThreaded extends TestBase {
private void testConcurrentMerge() throws Exception { private void testConcurrentMerge() throws Exception {
deleteDb("mvccMultiThreaded"); deleteDb("mvccMultiThreaded");
int len = 10; int len = 3;
final Connection[] connList = new Connection[len]; final Connection[] connList = new Connection[len];
for (int i = 0; i < len; i++) { for (int i = 0; i < len; i++) {
Connection conn = getConnection("mvccMultiThreaded;MVCC=TRUE"); Connection conn = getConnection("mvccMultiThreaded;MVCC=TRUE;LOCK_TIMEOUT=1000");
connList[i] = conn; connList[i] = conn;
} }
Connection conn = connList[0]; Connection conn = connList[0];
conn.createStatement().execute("create table test(id int primary key, value int)"); conn.createStatement().execute("create table test(id int primary key, name varchar)");
final SQLException[] ex = new SQLException[1]; final SQLException[] ex = new SQLException[1];
Thread[] threads = new Thread[len]; Thread[] threads = new Thread[len];
final boolean[] stop = new boolean[1]; final boolean[] stop = new boolean[1];
...@@ -52,11 +50,9 @@ public class TestMvccMultiThreaded extends TestBase { ...@@ -52,11 +50,9 @@ public class TestMvccMultiThreaded extends TestBase {
c.setAutoCommit(false); c.setAutoCommit(false);
threads[i] = new Thread() { threads[i] = new Thread() {
public void run() { public void run() {
Random random = new Random();
while (!stop[0]) { while (!stop[0]) {
try { try {
int k = random.nextInt(100); c.createStatement().execute("merge into test values(1, 'x')");
c.createStatement().execute("merge into test values(" + k + ", " + k + ")");
c.commit(); c.commit();
} catch (SQLException e) { } catch (SQLException e) {
ex[0] = e; ex[0] = e;
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论