提交 08218b9f authored 作者: Thomas Mueller's avatar Thomas Mueller

MVCC: when trying to insert two rows with the same key from two connections, the…

MVCC: when trying to insert two rows with the same key from two connections, the second connection immediately threw the exception "Unique index or primary key violation".
上级 8a43aca5
...@@ -16,8 +16,8 @@ import org.h2.result.SearchRow; ...@@ -16,8 +16,8 @@ import org.h2.result.SearchRow;
import org.h2.schema.Schema; import org.h2.schema.Schema;
import org.h2.table.Column; import org.h2.table.Column;
import org.h2.table.IndexColumn; import org.h2.table.IndexColumn;
import org.h2.table.Table;
import org.h2.table.RegularTable; import org.h2.table.RegularTable;
import org.h2.table.Table;
import org.h2.value.Value; import org.h2.value.Value;
import org.h2.value.ValueNull; import org.h2.value.ValueNull;
...@@ -126,6 +126,23 @@ public class MultiVersionIndex implements Index { ...@@ -126,6 +126,23 @@ public class MultiVersionIndex implements Index {
return base.needRebuild(); return base.needRebuild();
} }
/**
* Check if there is an uncommitted row with the given key
* within a different session.
*
* @param session the original session
* @param row the row (only the key is checked)
* @return true if there is an uncommitted row
*/
public boolean isUncommittedFromOtherSession(Session session, Row row) {
Cursor c = delta.find(session, row, row);
while (c.next()) {
Row r = c.get();
return r.getSessionId() != session.getId();
}
return false;
}
private boolean removeIfExists(Session session, Row row) { private boolean removeIfExists(Session session, Row row) {
// maybe it was inserted by the same session just before // maybe it was inserted by the same session just before
Cursor c = delta.find(session, row, row); Cursor c = delta.find(session, row, row);
......
...@@ -131,7 +131,19 @@ public class RegularTable extends TableBase { ...@@ -131,7 +131,19 @@ public class RegularTable extends TableBase {
trace.error("Could not undo operation", e); trace.error("Could not undo operation", e);
throw e2; throw e2;
} }
throw DbException.convert(e); DbException dbe = DbException.convert(e);
if (dbe.getErrorCode() == ErrorCode.DUPLICATE_KEY_1) {
for (int j = 0; j < indexes.size(); j++) {
Index index = indexes.get(j);
if (index.getIndexType().isUnique() && index instanceof MultiVersionIndex) {
MultiVersionIndex mv = (MultiVersionIndex) index;
if (mv.isUncommittedFromOtherSession(session, row)) {
throw DbException.get(ErrorCode.CONCURRENT_UPDATE_1, index.getName());
}
}
}
}
throw dbe;
} }
analyzeIfRequired(session); analyzeIfRequired(session);
} }
......
...@@ -7,6 +7,7 @@ ...@@ -7,6 +7,7 @@
package org.h2.test.mvcc; package org.h2.test.mvcc;
import java.sql.Connection; import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException; import java.sql.SQLException;
import java.sql.Statement; import java.sql.Statement;
import org.h2.constant.ErrorCode; import org.h2.constant.ErrorCode;
...@@ -35,11 +36,13 @@ public class TestMvcc2 extends TestBase { ...@@ -35,11 +36,13 @@ public class TestMvcc2 extends TestBase {
test.test(); test.test();
} }
public void test() throws SQLException { public void test() throws Exception {
if (!config.mvcc) { if (!config.mvcc) {
return; return;
} }
deleteDb("mvcc2"); deleteDb("mvcc2");
testConcurrentInsert();
testConcurrentUpdate();
testSelectForUpdate(); testSelectForUpdate();
testInsertUpdateRollback(); testInsertUpdateRollback();
testInsertRollback(); testInsertRollback();
...@@ -50,6 +53,86 @@ public class TestMvcc2 extends TestBase { ...@@ -50,6 +53,86 @@ public class TestMvcc2 extends TestBase {
return getConnection("mvcc2"); return getConnection("mvcc2");
} }
private void testConcurrentInsert() throws Exception {
Connection conn = getConnection();
final Connection conn2 = getConnection();
Statement stat = conn.createStatement();
final Statement stat2 = conn2.createStatement();
stat2.execute("set lock_timeout 1000");
stat.execute("create table test(id int primary key, name varchar)");
conn.setAutoCommit(false);
final boolean[] committed = { false };
final SQLException[] ex = { null };
Thread t = new Thread() {
public void run() {
try {
//System.out.println("insert2 hallo");
stat2.execute("insert into test values(0, 'Hallo')");
//System.out.println("insert2 hallo done");
} catch (SQLException e) {
//System.out.println("insert2 hallo e " + e);
if (!committed[0]) {
ex[0] = e;
}
}
}
};
//System.out.println("insert hello");
stat.execute("insert into test values(0, 'Hello')");
t.start();
Thread.sleep(500);
//System.out.println("insert hello commit");
committed[0] = true;
conn.commit();
t.join();
if (ex[0] != null) {
throw ex[0];
}
ResultSet rs;
rs = stat.executeQuery("select name from test");
rs.next();
assertEquals("Hello", rs.getString(1));
stat.execute("drop table test");
conn2.close();
conn.close();
}
private void testConcurrentUpdate() throws Exception {
Connection conn = getConnection();
final Connection conn2 = getConnection();
Statement stat = conn.createStatement();
final Statement stat2 = conn2.createStatement();
stat2.execute("set lock_timeout 1000");
stat.execute("create table test(id int primary key, name varchar)");
stat.execute("insert into test values(0, 'Hello')");
conn.setAutoCommit(false);
final SQLException[] ex = { null };
Thread t = new Thread() {
public void run() {
try {
stat2.execute("update test set name = 'Hallo'");
} catch (SQLException e) {
ex[0] = e;
}
}
};
stat.execute("update test set name = 'Hi'");
t.start();
Thread.sleep(500);
conn.commit();
t.join();
if (ex[0] != null) {
throw ex[0];
}
ResultSet rs;
rs = stat.executeQuery("select name from test");
rs.next();
assertEquals("Hallo", rs.getString(1));
stat.execute("drop table test");
conn2.close();
conn.close();
}
private void testSelectForUpdate() throws SQLException { private void testSelectForUpdate() throws SQLException {
if (!SysProperties.SELECT_FOR_UPDATE_MVCC) { if (!SysProperties.SELECT_FOR_UPDATE_MVCC) {
return; return;
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论