提交 84318143 authored 作者: Thomas Mueller's avatar Thomas Mueller

Issue 354: when using the multi-threaded kernel option, and multiple threads…

Issue 354: when using the multi-threaded kernel option, and multiple threads concurrently prepared SQL statements that use the same view or the same index of a view, then in some cases an infinite loop could occur.
上级 34e74b80
......@@ -19,7 +19,10 @@ Change Log
<h1>Change Log</h1>
<h2>Next Version (unreleased)</h2>
<ul><li>Issue 350: when using instead of triggers, executeUpdate for delete operations always returned 0.
<ul><li>Issue 354: when using the multi-threaded kernel option,
and multiple threads concurrently prepared SQL statements that use the same view
or the same index of a view, then in some cases an infinite loop could occur.
</li><li>Issue 350: when using instead of triggers, executeUpdate for delete operations always returned 0.
</li><li>Some timestamps with timezone were not converted correctly.
For example, in the PST timezone, the timestamp 2011-10-26 08:00:00Z was converted to
2011-10-25 25:00:00 instead of 2011-10-26 01:00:00.
......
......@@ -25,6 +25,7 @@ import org.h2.table.TableView;
import org.h2.util.IntArray;
import org.h2.util.New;
import org.h2.util.SmallLRUCache;
import org.h2.util.SynchronizedVerifier;
import org.h2.util.Utils;
import org.h2.value.Value;
......@@ -105,11 +106,12 @@ public class ViewIndex extends BaseIndex {
double cost;
}
public double getCost(Session session, int[] masks) {
public synchronized double getCost(Session session, int[] masks) {
if (recursive) {
return 1000;
}
IntArray masksArray = new IntArray(masks == null ? Utils.EMPTY_INT_ARRAY : masks);
SynchronizedVerifier.check(costCache);
CostElement cachedCost = costCache.get(masksArray);
if (cachedCost != null) {
long time = System.currentTimeMillis();
......
......@@ -29,6 +29,7 @@ import org.h2.util.New;
import org.h2.util.SmallLRUCache;
import org.h2.util.StatementBuilder;
import org.h2.util.StringUtils;
import org.h2.util.SynchronizedVerifier;
import org.h2.util.Utils;
import org.h2.value.Value;
......@@ -83,12 +84,13 @@ public class TableView extends Table {
}
}
private void init(String querySQL, ArrayList<Parameter> params,
private synchronized void init(String querySQL, ArrayList<Parameter> params,
String[] columnNames, Session session, boolean recursive) {
this.querySQL = querySQL;
this.columnNames = columnNames;
this.recursive = recursive;
index = new ViewIndex(this, querySQL, params, recursive);
SynchronizedVerifier.check(indexCache);
indexCache.clear();
initColumnsAndTables(session);
}
......@@ -109,7 +111,7 @@ public class TableView extends Table {
* @return the exception if re-compiling this or any dependent view failed
* (only when force is disabled)
*/
public DbException recompile(Session session, boolean force) {
public synchronized DbException recompile(Session session, boolean force) {
try {
compileViewQuery(session, querySQL);
} catch (DbException e) {
......@@ -121,6 +123,7 @@ public class TableView extends Table {
if (views != null) {
views = New.arrayList(views);
}
SynchronizedVerifier.check(indexCache);
indexCache.clear();
initColumnsAndTables(session);
if (views != null) {
......@@ -196,10 +199,11 @@ public class TableView extends Table {
return createException != null;
}
public PlanItem getBestPlanItem(Session session, int[] masks) {
public synchronized PlanItem getBestPlanItem(Session session, int[] masks) {
PlanItem item = new PlanItem();
item.cost = index.getCost(session, masks);
IntArray masksArray = new IntArray(masks == null ? Utils.EMPTY_INT_ARRAY : masks);
SynchronizedVerifier.check(indexCache);
ViewIndex i2 = indexCache.get(masksArray);
if (i2 == null || i2.getSession() != session) {
i2 = new ViewIndex(this, index, session, masks);
......
/*
* Copyright 2004-2011 H2 Group. Multiple-Licensed under the H2 License,
* Version 1.0, and under the Eclipse Public License, Version 1.0
* (http://h2database.com/html/license.html).
* Initial Developer: H2 Group
*/
package org.h2.util;
import java.util.Collections;
import java.util.HashMap;
import java.util.IdentityHashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
/**
* A utility class that allows to verify access to a resource is synchronized.
*/
public class SynchronizedVerifier {
private static volatile boolean enabled;
private static final Map<Class<?>, AtomicBoolean> DETECT =
Collections.synchronizedMap(new HashMap<Class<?>, AtomicBoolean>());
private static final Map<Object, Object> CURRENT =
Collections.synchronizedMap(new IdentityHashMap<Object, Object>());
/**
* Enable or disable detection for a given class.
*
* @param clazz the class
* @param value the new value (true means detection is enabled)
*/
public static void setDetect(Class<?> clazz, boolean value) {
if (value) {
DETECT.put(clazz, new AtomicBoolean());
} else {
AtomicBoolean b = DETECT.remove(clazz);
if (b == null) {
throw new AssertionError("Detection was not enabled");
} else if (!b.get()) {
throw new AssertionError("No object of this class was tested");
}
}
enabled = DETECT.size() > 0;
}
/**
* Verify the object is not accessed concurrently.
*
* @param o the object
*/
public static void check(Object o) {
if (enabled) {
detectConcurrentAccess(o);
}
}
private static void detectConcurrentAccess(Object o) {
AtomicBoolean value = DETECT.get(o.getClass());
if (value != null) {
value.set(true);
if (CURRENT.remove(o) != null) {
throw new AssertionError("Concurrent access");
}
CURRENT.put(o, o);
try {
Thread.sleep(1);
} catch (InterruptedException e) {
// ignore
}
Object old = CURRENT.remove(o);
if (old == null) {
throw new AssertionError("Concurrent access");
}
}
}
}
......@@ -14,6 +14,8 @@ import java.sql.Statement;
import java.util.Random;
import org.h2.test.TestAll;
import org.h2.test.TestBase;
import org.h2.util.SmallLRUCache;
import org.h2.util.SynchronizedVerifier;
import org.h2.util.Task;
/**
......@@ -49,11 +51,48 @@ public class TestMultiThread extends TestBase implements Runnable {
}
public void test() throws Exception {
testConcurrentView();
testConcurrentAlter();
testConcurrentAnalyze();
testConcurrentInsertUpdateSelect();
}
private void testConcurrentView() throws Exception {
String db = "concurrentView";
deleteDb(db);
final String url = getURL(db + ";MULTI_THREADED=1", true);
final Random r = new Random();
Connection conn = getConnection(url);
Statement stat = conn.createStatement();
StringBuilder buff = new StringBuilder();
buff.append("create table test(id int");
final int len = 3;
for (int i = 0; i < len; i++) {
buff.append(", x" + i + " int");
}
buff.append(")");
stat.execute(buff.toString());
stat.execute("create view test_view as select * from test");
stat.execute("insert into test(id) select x from system_range(1, 2)");
Task t = new Task() {
public void call() throws Exception {
Connection c2 = getConnection(url);
while (!stop) {
c2.prepareStatement("select * from test_view where x" + r.nextInt(len) + "=1");
}
c2.close();
}
};
t.execute();
SynchronizedVerifier.setDetect(SmallLRUCache.class, true);
for (int i = 0; i < 1000; i++) {
conn.prepareStatement("select * from test_view where x" + r.nextInt(len) + "=1");
}
t.get();
SynchronizedVerifier.setDetect(SmallLRUCache.class, false);
conn.close();
}
private void testConcurrentAlter() throws Exception {
deleteDb("concurrentAlter");
final Connection conn = getConnection("concurrentAlter");
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论