提交 ea7c55c1 authored 作者: Thomas Mueller's avatar Thomas Mueller

When using the MULTI_THREADED option, concurrently reading from a database could…

When using the MULTI_THREADED option, concurrently reading from a database could throw an exception.
上级 851e0b85
......@@ -323,23 +323,25 @@ public class PageBtreeIndex extends PageIndex {
* @return the row
*/
SearchRow readRow(Data data, int offset, boolean onlyPosition, boolean needData) {
data.setPos(offset);
long key = data.readVarLong();
if (onlyPosition) {
if (needData) {
return tableData.getRow(null, key);
synchronized (data) {
data.setPos(offset);
long key = data.readVarLong();
if (onlyPosition) {
if (needData) {
return tableData.getRow(null, key);
}
SearchRow row = table.getTemplateSimpleRow(true);
row.setKey(key);
return row;
}
SearchRow row = table.getTemplateSimpleRow(true);
SearchRow row = table.getTemplateSimpleRow(columns.length == 1);
row.setKey(key);
for (Column col : columns) {
int idx = col.getColumnId();
row.setValue(idx, data.readValue());
}
return row;
}
SearchRow row = table.getTemplateSimpleRow(columns.length == 1);
row.setKey(key);
for (Column col : columns) {
int idx = col.getColumnId();
row.setValue(idx, data.readValue());
}
return row;
}
/**
......
......@@ -122,12 +122,12 @@ abstract class PageData extends Page {
* Get a cursor.
*
* @param session the session
* @param min the smallest key
* @param max the largest key
* @param minKey the smallest key
* @param maxKey the largest key
* @param multiVersion if the delta should be used
* @return the cursor
*/
abstract Cursor find(Session session, long min, long max, boolean multiVersion);
abstract Cursor find(Session session, long minKey, long maxKey, boolean multiVersion);
/**
* Get the key at this position.
......
......@@ -19,16 +19,16 @@ class PageDataCursor implements Cursor {
private PageDataLeaf current;
private int idx;
private final long max;
private final long maxKey;
private Row row;
private final boolean multiVersion;
private final Session session;
private Iterator<Row> delta;
PageDataCursor(Session session, PageDataLeaf current, int idx, long max, boolean multiVersion) {
PageDataCursor(Session session, PageDataLeaf current, int idx, long maxKey, boolean multiVersion) {
this.current = current;
this.idx = idx;
this.max = max;
this.maxKey = maxKey;
this.multiVersion = multiVersion;
this.session = session;
if (multiVersion) {
......@@ -73,9 +73,9 @@ class PageDataCursor implements Cursor {
private boolean checkMax() {
if (row != null) {
if (max != Long.MAX_VALUE) {
long x = current.index.getLong(row, Long.MAX_VALUE);
if (x > max) {
if (maxKey != Long.MAX_VALUE) {
long x = current.index.getKey(row, Long.MAX_VALUE);
if (x > maxKey) {
row = null;
return false;
}
......
......@@ -239,13 +239,13 @@ public class PageDataIndex extends PageIndex {
}
/**
* Get the key from the row
* Get the key from the row.
*
* @param row the row
* @param ifEmpty the value to use if the row is empty
* @return the key
*/
long getLong(SearchRow row, long ifEmpty) {
long getKey(SearchRow row, long ifEmpty) {
if (row == null) {
return ifEmpty;
}
......@@ -402,13 +402,17 @@ public class PageDataIndex extends PageIndex {
* Read a row from the data page at the given position.
*
* @param data the data page
* @param pos the position to read from
* @param columnCount the number of columns
* @return the row
*/
Row readRow(Data data, int columnCount) {
Row readRow(Data data, int pos, int columnCount) {
Value[] values = new Value[columnCount];
for (int i = 0; i < columnCount; i++) {
values[i] = data.readValue();
synchronized (data) {
data.setPos(pos);
for (int i = 0; i < columnCount; i++) {
values[i] = data.readValue();
}
}
return tableData.createRow(values);
}
......
......@@ -306,9 +306,9 @@ public class PageDataLeaf extends PageData {
rows = remove(rows, entryCount + 1, i);
}
Cursor find(Session session, long min, long max, boolean multiVersion) {
int x = find(min);
return new PageDataCursor(session, this, x, max, multiVersion);
Cursor find(Session session, long minKey, long maxKey, boolean multiVersion) {
int x = find(minKey);
return new PageDataCursor(session, this, x, maxKey, multiVersion);
}
/**
......@@ -321,8 +321,7 @@ public class PageDataLeaf extends PageData {
Row r = rows[at];
if (r == null) {
if (firstOverflowPageId == 0) {
data.setPos(offsets[at]);
r = index.readRow(data, columnCount);
r = index.readRow(data, offsets[at], columnCount);
} else {
if (rowRef != null) {
r = rowRef.get();
......@@ -341,8 +340,7 @@ public class PageDataLeaf extends PageData {
next = page.readInto(buff);
} while (next != 0);
overflowRowSize = pageSize + buff.length();
buff.setPos(0);
r = index.readRow(buff, columnCount);
r = index.readRow(buff, 0, columnCount);
}
r.setKey(keys[at]);
if (firstOverflowPageId != 0) {
......
......@@ -158,10 +158,10 @@ public class PageDataNode extends PageData {
}
}
Cursor find(Session session, long min, long max, boolean multiVersion) {
int x = find(min);
Cursor find(Session session, long minKey, long maxKey, boolean multiVersion) {
int x = find(minKey);
int child = childPageIds[x];
return index.getPage(child, getPos()).find(session, min, max, multiVersion);
return index.getPage(child, getPos()).find(session, minKey, maxKey, multiVersion);
}
PageData split(int splitPoint) {
......
......@@ -53,8 +53,8 @@ public class PageDelegateIndex extends PageIndex {
}
public Cursor find(Session session, SearchRow first, SearchRow last) {
long min = mainIndex.getLong(first, Long.MIN_VALUE);
long max = mainIndex.getLong(last, Long.MAX_VALUE);
long min = mainIndex.getKey(first, Long.MIN_VALUE);
long max = mainIndex.getKey(last, Long.MAX_VALUE);
return mainIndex.find(session, min, max, false);
}
......
......@@ -45,6 +45,7 @@ public class TestMultiThreadedKernel extends TestBase {
return;
}
deleteDb("multiThreadedKernel");
testConcurrentRead();
testCache();
deleteDb("multiThreadedKernel");
final String url = getURL("multiThreadedKernel;DB_CLOSE_DELAY=-1;MULTI_THREADED=1", true);
......@@ -88,6 +89,45 @@ public class TestMultiThreadedKernel extends TestBase {
deleteDb("multiThreadedKernel");
}
private void testConcurrentRead() throws Exception {
ArrayList<Thread> list = New.arrayList();
int size = 2;
final int count = 1000;
final boolean[] stopped = { false };
String url = getURL("multiThreadedKernel;MULTI_THREADED=TRUE;CACHE_SIZE=16", true);
for (int i = 0; i < size; i++) {
final Connection conn = DriverManager.getConnection(url, getUser(), getPassword());
if (i == 0) {
Statement stat = conn.createStatement();
stat.execute("drop table test if exists");
stat.execute("create table test(id int primary key, name varchar) "
+ "as select x, x || space(10) from system_range(1, " + count + ")");
}
final Random random = new Random(i);
Thread t = new Thread() {
public void run() {
try {
PreparedStatement prep = conn.prepareStatement(
"select * from test where id = ?");
while (!stopped[0]) {
prep.setInt(1, random.nextInt(count));
prep.execute();
}
} catch (Exception e) {
e.printStackTrace();
}
}
};
t.start();
list.add(t);
}
Thread.sleep(1000);
stopped[0] = true;
for (Thread t : list) {
t.join();
}
}
private void testCache() throws Exception {
ArrayList<Thread> list = New.arrayList();
int size = 3;
......@@ -98,6 +138,7 @@ public class TestMultiThreadedKernel extends TestBase {
final Connection conn = DriverManager.getConnection(url, getUser(), getPassword());
if (i == 0) {
Statement stat = conn.createStatement();
stat.execute("drop table test if exists");
stat.execute("create table test(id int primary key, name varchar) "
+ "as select x, space(3000) from system_range(1, " + count + ")");
}
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论