提交 4c28636c authored 作者: Thomas Mueller's avatar Thomas Mueller

MULTI_THREADED did not work correctly.

上级 822498a8
......@@ -434,7 +434,9 @@ public class PageStore implements CacheWriter {
maxMove = Integer.MAX_VALUE;
}
for (int x = lastUsed, j = 0; x > MIN_PAGE_COUNT && j < maxMove; x--, j++) {
compact(x);
synchronized (database) {
compact(x);
}
long now = System.currentTimeMillis();
if (now > start + maxCompactTime) {
break;
......@@ -523,74 +525,73 @@ public class PageStore implements CacheWriter {
if (p != null) {
return p;
}
}
Data data = createData();
readPage(pageId, data);
int type = data.readByte();
if (type == Page.TYPE_EMPTY) {
return null;
}
data.readShortInt();
data.readInt();
if (!checksumTest(data.getBytes(), pageId, pageSize)) {
throw DbException.get(ErrorCode.FILE_CORRUPTED_1, "wrong checksum");
}
Page p;
switch (type & ~Page.FLAG_LAST) {
case Page.TYPE_FREE_LIST:
p = PageFreeList.read(this, data, pageId);
break;
case Page.TYPE_DATA_LEAF: {
int indexId = data.readVarInt();
PageDataIndex index = (PageDataIndex) metaObjects.get(indexId);
if (index == null) {
throw DbException.get(ErrorCode.FILE_CORRUPTED_1, "index not found " + indexId);
Data data = createData();
readPage(pageId, data);
int type = data.readByte();
if (type == Page.TYPE_EMPTY) {
return null;
}
p = PageDataLeaf.read(index, data, pageId);
break;
}
case Page.TYPE_DATA_NODE: {
int indexId = data.readVarInt();
PageDataIndex index = (PageDataIndex) metaObjects.get(indexId);
if (index == null) {
throw DbException.get(ErrorCode.FILE_CORRUPTED_1, "index not found " + indexId);
data.readShortInt();
data.readInt();
if (!checksumTest(data.getBytes(), pageId, pageSize)) {
throw DbException.get(ErrorCode.FILE_CORRUPTED_1, "wrong checksum");
}
p = PageDataNode.read(index, data, pageId);
break;
}
case Page.TYPE_DATA_OVERFLOW: {
p = PageDataOverflow.read(this, data, pageId);
break;
}
case Page.TYPE_BTREE_LEAF: {
int indexId = data.readVarInt();
PageBtreeIndex index = (PageBtreeIndex) metaObjects.get(indexId);
if (index == null) {
throw DbException.get(ErrorCode.FILE_CORRUPTED_1, "index not found " + indexId);
switch (type & ~Page.FLAG_LAST) {
case Page.TYPE_FREE_LIST:
p = PageFreeList.read(this, data, pageId);
break;
case Page.TYPE_DATA_LEAF: {
int indexId = data.readVarInt();
PageDataIndex index = (PageDataIndex) metaObjects.get(indexId);
if (index == null) {
throw DbException.get(ErrorCode.FILE_CORRUPTED_1, "index not found " + indexId);
}
p = PageDataLeaf.read(index, data, pageId);
break;
}
p = PageBtreeLeaf.read(index, data, pageId);
break;
}
case Page.TYPE_BTREE_NODE: {
int indexId = data.readVarInt();
PageBtreeIndex index = (PageBtreeIndex) metaObjects.get(indexId);
if (index == null) {
throw DbException.get(ErrorCode.FILE_CORRUPTED_1, "index not found " + indexId);
case Page.TYPE_DATA_NODE: {
int indexId = data.readVarInt();
PageDataIndex index = (PageDataIndex) metaObjects.get(indexId);
if (index == null) {
throw DbException.get(ErrorCode.FILE_CORRUPTED_1, "index not found " + indexId);
}
p = PageDataNode.read(index, data, pageId);
break;
}
p = PageBtreeNode.read(index, data, pageId);
break;
}
case Page.TYPE_STREAM_TRUNK:
p = PageStreamTrunk.read(this, data, pageId);
break;
case Page.TYPE_STREAM_DATA:
p = PageStreamData.read(this, data, pageId);
break;
default:
throw DbException.get(ErrorCode.FILE_CORRUPTED_1, "page=" + pageId + " type=" + type);
case Page.TYPE_DATA_OVERFLOW: {
p = PageDataOverflow.read(this, data, pageId);
break;
}
case Page.TYPE_BTREE_LEAF: {
int indexId = data.readVarInt();
PageBtreeIndex index = (PageBtreeIndex) metaObjects.get(indexId);
if (index == null) {
throw DbException.get(ErrorCode.FILE_CORRUPTED_1, "index not found " + indexId);
}
p = PageBtreeLeaf.read(index, data, pageId);
break;
}
case Page.TYPE_BTREE_NODE: {
int indexId = data.readVarInt();
PageBtreeIndex index = (PageBtreeIndex) metaObjects.get(indexId);
if (index == null) {
throw DbException.get(ErrorCode.FILE_CORRUPTED_1, "index not found " + indexId);
}
p = PageBtreeNode.read(index, data, pageId);
break;
}
case Page.TYPE_STREAM_TRUNK:
p = PageStreamTrunk.read(this, data, pageId);
break;
case Page.TYPE_STREAM_DATA:
p = PageStreamData.read(this, data, pageId);
break;
default:
throw DbException.get(ErrorCode.FILE_CORRUPTED_1, "page=" + pageId + " type=" + type);
}
cache.put(p);
return p;
}
cache.put(p);
return p;
}
private int getFirstUncommittedSection() {
......@@ -729,15 +730,17 @@ public class PageStore implements CacheWriter {
*/
public void close() {
trace.debug("close");
if (log != null) {
log.close();
log = null;
}
if (file != null) {
try {
file.close();
} finally {
file = null;
synchronized (database) {
if (log != null) {
log.close();
log = null;
}
if (file != null) {
try {
file.close();
} finally {
file = null;
}
}
}
}
......@@ -845,22 +848,24 @@ public class PageStore implements CacheWriter {
return list;
}
}
int p = PAGE_ID_FREE_LIST_ROOT + i * freeListPagesPerList;
while (p >= pageCount) {
increaseFileSize(INCREMENT_PAGES);
}
if (p < pageCount) {
list = (PageFreeList) getPage(p);
}
if (list == null) {
list = PageFreeList.create(this, p);
cache.put(list);
}
while (freeLists.size() <= i) {
freeLists.add(null);
synchronized (database) {
int p = PAGE_ID_FREE_LIST_ROOT + i * freeListPagesPerList;
while (p >= pageCount) {
increaseFileSize(INCREMENT_PAGES);
}
if (p < pageCount) {
list = (PageFreeList) getPage(p);
}
if (list == null) {
list = PageFreeList.create(this, p);
cache.put(list);
}
while (freeLists.size() <= i) {
freeLists.add(null);
}
freeLists.set(i, list);
return list;
}
freeLists.set(i, list);
return list;
}
private void freePage(int pageId) {
......
......@@ -8,9 +8,13 @@ package org.h2.test.db;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.Random;
import org.h2.test.TestBase;
import org.h2.util.JdbcUtils;
import org.h2.util.New;
/**
* A multi-threaded test case.
......@@ -41,6 +45,8 @@ public class TestMultiThreadedKernel extends TestBase {
return;
}
deleteDb("multiThreadedKernel");
testCache();
deleteDb("multiThreadedKernel");
final String url = getURL("multiThreadedKernel;DB_CLOSE_DELAY=-1;MULTI_THREADED=1", true);
final String user = getUser(), password = getPassword();
int len = 3;
......@@ -82,4 +88,46 @@ public class TestMultiThreadedKernel extends TestBase {
deleteDb("multiThreadedKernel");
}
private void testCache() throws Exception {
ArrayList<Thread> list = New.arrayList();
int size = 3;
final int count = 100;
final boolean[] stopped = { false };
String url = getURL("multiThreadedKernel;MULTI_THREADED=TRUE;CACHE_SIZE=1", true);
for (int i = 0; i < size; i++) {
final Connection conn = DriverManager.getConnection(url, getUser(), getPassword());
if (i == 0) {
Statement stat = conn.createStatement();
stat.execute("create table test(id int primary key, name varchar) "
+ "as select x, space(3000) from system_range(1, " + count + ")");
}
final Random random = new Random(i);
final int x = i;
Thread t = new Thread() {
public void run() {
try {
PreparedStatement prep = conn.prepareStatement(
"select * from test where id = ?");
Statement stat = conn.createStatement();
while (!stopped[0]) {
prep.setInt(1, random.nextInt(count));
prep.execute();
stat.execute("create table test" + x + "(id int)");
stat.execute("drop table test" + x);
}
} catch (Exception e) {
e.printStackTrace();
}
}
};
t.start();
list.add(t);
}
Thread.sleep(1000);
stopped[0] = true;
for (Thread t : list) {
t.join();
}
}
}
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论