提交 9b8db89c authored 作者: Thomas Mueller's avatar Thomas Mueller

New experimental page store.

上级 a782a8cd
...@@ -451,6 +451,9 @@ public class Database implements DataHandler { ...@@ -451,6 +451,9 @@ public class Database implements DataHandler {
* @return true if one exists * @return true if one exists
*/ */
public static boolean exists(String name) { public static boolean exists(String name) {
if (SysProperties.PAGE_STORE) {
return FileUtils.exists(name + Constants.SUFFIX_PAGE_FILE);
}
return FileUtils.exists(name + Constants.SUFFIX_DATA_FILE); return FileUtils.exists(name + Constants.SUFFIX_DATA_FILE);
} }
......
...@@ -147,6 +147,10 @@ abstract class PageBtree extends Record { ...@@ -147,6 +147,10 @@ abstract class PageBtree extends Record {
* @return the row * @return the row
*/ */
SearchRow getRow(int at) throws SQLException { SearchRow getRow(int at) throws SQLException {
int test;
if (at < 0) {
System.out.println("stop");
}
SearchRow row = rows[at]; SearchRow row = rows[at];
if (row == null) { if (row == null) {
row = index.readRow(data, offsets[at]); row = index.readRow(data, offsets[at]);
......
...@@ -60,8 +60,11 @@ public class PageBtreeIndex extends BaseIndex { ...@@ -60,8 +60,11 @@ public class PageBtreeIndex extends BaseIndex {
this.headPos = headPos; this.headPos = headPos;
PageBtree root = getPage(headPos); PageBtree root = getPage(headPos);
rowCount = root.getRowCount(); rowCount = root.getRowCount();
if (!database.isReadOnly()) {
// could have been created before, but never committed // could have been created before, but never committed
// TODO test if really required
store.updateRecord(root, false, null); store.updateRecord(root, false, null);
}
int reuseKeysIfManyDeleted; int reuseKeysIfManyDeleted;
} }
if (trace.isDebugEnabled()) { if (trace.isDebugEnabled()) {
...@@ -307,6 +310,10 @@ public class PageBtreeIndex extends BaseIndex { ...@@ -307,6 +310,10 @@ public class PageBtreeIndex extends BaseIndex {
* @param row the row to write * @param row the row to write
*/ */
void writeRow(DataPage data, int offset, SearchRow row) throws SQLException { void writeRow(DataPage data, int offset, SearchRow row) throws SQLException {
if (offset < 0) {
int test;
System.out.println("stop");
}
data.setPos(offset); data.setPos(offset);
data.writeInt(row.getPos()); data.writeInt(row.getPos());
for (Column col : columns) { for (Column col : columns) {
......
...@@ -7,6 +7,7 @@ ...@@ -7,6 +7,7 @@
package org.h2.index; package org.h2.index;
import java.sql.SQLException; import java.sql.SQLException;
import org.h2.constant.ErrorCode;
import org.h2.message.Message; import org.h2.message.Message;
import org.h2.result.SearchRow; import org.h2.result.SearchRow;
import org.h2.store.DataPage; import org.h2.store.DataPage;
...@@ -73,6 +74,9 @@ class PageBtreeNode extends PageBtree { ...@@ -73,6 +74,9 @@ class PageBtreeNode extends PageBtree {
return (entryCount / 2) + 1; return (entryCount / 2) + 1;
} }
int offset = last - rowLength; int offset = last - rowLength;
if(offset < 0) {
throw Message.getSQLException(ErrorCode.FEATURE_NOT_SUPPORTED_1, "Wide indexes");
}
int[] newOffsets = new int[entryCount + 1]; int[] newOffsets = new int[entryCount + 1];
SearchRow[] newRows = new SearchRow[entryCount + 1]; SearchRow[] newRows = new SearchRow[entryCount + 1];
int[] newChildPageIds = new int[entryCount + 2]; int[] newChildPageIds = new int[entryCount + 2];
...@@ -105,7 +109,7 @@ class PageBtreeNode extends PageBtree { ...@@ -105,7 +109,7 @@ class PageBtreeNode extends PageBtree {
int addRow(SearchRow row) throws SQLException { int addRow(SearchRow row) throws SQLException {
while (true) { while (true) {
int x = find(row, false, true); int x = find(row, false, false);
PageBtree page = index.getPage(childPageIds[x]); PageBtree page = index.getPage(childPageIds[x]);
int splitPoint = page.addRow(row); int splitPoint = page.addRow(row);
if (splitPoint == 0) { if (splitPoint == 0) {
...@@ -323,7 +327,8 @@ class PageBtreeNode extends PageBtree { ...@@ -323,7 +327,8 @@ class PageBtreeNode extends PageBtree {
return; return;
} }
PageBtreeNode next = (PageBtreeNode) index.getPage(parentPageId); PageBtreeNode next = (PageBtreeNode) index.getPage(parentPageId);
next.nextPage(cursor, getRow(entryCount - 1)); SearchRow r = entryCount == 0 ? row : getRow(entryCount - 1);
next.nextPage(cursor, r);
return; return;
} }
PageBtree page = index.getPage(childPageIds[i]); PageBtree page = index.getPage(childPageIds[i]);
......
...@@ -69,7 +69,10 @@ public class PageScanIndex extends BaseIndex implements RowIndex { ...@@ -69,7 +69,10 @@ public class PageScanIndex extends BaseIndex implements RowIndex {
lastKey = root.getLastKey(); lastKey = root.getLastKey();
rowCount = root.getRowCount(); rowCount = root.getRowCount();
// could have been created before, but never committed // could have been created before, but never committed
if (!database.isReadOnly()) {
// TODO check if really required
store.updateRecord(root, false, null); store.updateRecord(root, false, null);
}
int reuseKeysIfManyDeleted; int reuseKeysIfManyDeleted;
} }
if (trace.isDebugEnabled()) { if (trace.isDebugEnabled()) {
......
...@@ -107,4 +107,24 @@ public class PageInputStream extends InputStream { ...@@ -107,4 +107,24 @@ public class PageInputStream extends InputStream {
remaining = data.getLength(); remaining = data.getLength();
} }
public void allocateAllPages() throws SQLException {
int trunkPage = trunkNext;
while (trunkPage != 0) {
store.allocatePage(trunkPage);
PageStreamTrunk t = new PageStreamTrunk(store, trunkPage);
t.read();
while (true) {
int n = t.getNextDataPage();
if (n == -1) {
break;
}
store.allocatePage(n);
}
trunkPage = t.getNextTrunk();
if (trunkPage != 0) {
break;
}
}
}
} }
...@@ -35,6 +35,21 @@ import org.h2.value.Value; ...@@ -35,6 +35,21 @@ import org.h2.value.Value;
*/ */
public class PageLog { public class PageLog {
/**
* The recovery stage to undo changes (re-apply the backup).
*/
static final int RECOVERY_STAGE_UNDO = 0;
/**
* The recovery stage to allocate pages used by the transaction log.
*/
static final int RECOVERY_STAGE_ALLOCATE = 1;
/**
* The recovery stage to redo operations.
*/
static final int RECOVERY_STAGE_REDO = 2;
/** /**
* No operation. * No operation.
*/ */
...@@ -133,15 +148,21 @@ public class PageLog { ...@@ -133,15 +148,21 @@ public class PageLog {
} }
/** /**
* Run the recovery process. There are two recovery stages: first only the * Run one recovery stage. There are three recovery stages: 0: only the undo
* undo steps are run (restoring the state before the last checkpoint). In * steps are run (restoring the state before the last checkpoint). 1: the
* the second stage the committed operations are re-applied. * pages that are used by the transaction log are allocated. 2: the
* committed operations are re-applied.
* *
* @param undo true if the undo step should be run * @param stage the recovery stage
*/ */
void recover(boolean undo) throws SQLException { void recover(int stage) throws SQLException {
if (trace.isDebugEnabled()) { if (trace.isDebugEnabled()) {
trace.debug("log recover undo:" + undo); trace.debug("log recover stage:" + stage);
}
if (stage == RECOVERY_STAGE_ALLOCATE) {
PageInputStream in = new PageInputStream(store, firstTrunkPage, firstDataPage);
in.allocateAllPages();
return;
} }
in = new DataInputStream(new PageInputStream(store, firstTrunkPage, firstDataPage)); in = new DataInputStream(new PageInputStream(store, firstTrunkPage, firstDataPage));
int logId = 0; int logId = 0;
...@@ -157,7 +178,7 @@ public class PageLog { ...@@ -157,7 +178,7 @@ public class PageLog {
if (x == UNDO) { if (x == UNDO) {
int pageId = in.readInt(); int pageId = in.readInt();
in.readFully(data.getBytes(), 0, store.getPageSize()); in.readFully(data.getBytes(), 0, store.getPageSize());
if (undo) { if (stage == RECOVERY_STAGE_UNDO) {
if (trace.isDebugEnabled()) { if (trace.isDebugEnabled()) {
trace.debug("log undo " + pageId); trace.debug("log undo " + pageId);
} }
...@@ -167,7 +188,7 @@ public class PageLog { ...@@ -167,7 +188,7 @@ public class PageLog {
int sessionId = in.readInt(); int sessionId = in.readInt();
int tableId = in.readInt(); int tableId = in.readInt();
Row row = readRow(in, data); Row row = readRow(in, data);
if (!undo) { if (stage == RECOVERY_STAGE_REDO) {
if (isSessionCommitted(sessionId, logId, pos)) { if (isSessionCommitted(sessionId, logId, pos)) {
if (trace.isDebugEnabled()) { if (trace.isDebugEnabled()) {
trace.debug("log redo " + (x == ADD ? "+" : "-") + " table:" + tableId + " " + row); trace.debug("log redo " + (x == ADD ? "+" : "-") + " table:" + tableId + " " + row);
...@@ -184,7 +205,7 @@ public class PageLog { ...@@ -184,7 +205,7 @@ public class PageLog {
if (trace.isDebugEnabled()) { if (trace.isDebugEnabled()) {
trace.debug("log commit " + sessionId + " pos:" + pos); trace.debug("log commit " + sessionId + " pos:" + pos);
} }
if (undo) { if (stage == RECOVERY_STAGE_UNDO) {
setLastCommitForSession(sessionId, logId, pos); setLastCommitForSession(sessionId, logId, pos);
} }
} else if (x == NOOP) { } else if (x == NOOP) {
...@@ -198,7 +219,7 @@ public class PageLog { ...@@ -198,7 +219,7 @@ public class PageLog {
} }
} }
} }
if (!undo) { if (stage == RECOVERY_STAGE_REDO) {
// TODO probably still required for 2 phase commit // TODO probably still required for 2 phase commit
sessionStates = New.hashMap(); sessionStates = New.hashMap();
} }
......
...@@ -169,6 +169,7 @@ public class PageOutputStream extends OutputStream { ...@@ -169,6 +169,7 @@ public class PageOutputStream extends OutputStream {
if (trace.isDebugEnabled()) { if (trace.isDebugEnabled()) {
trace.debug("pageOut.storePage fill " + data.getPos()); trace.debug("pageOut.storePage fill " + data.getPos());
} }
reserved -= data.getRemaining();
data.write(null); data.write(null);
initNextData(); initNextData();
} }
......
...@@ -20,7 +20,6 @@ import org.h2.index.IndexType; ...@@ -20,7 +20,6 @@ import org.h2.index.IndexType;
import org.h2.index.PageBtreeIndex; import org.h2.index.PageBtreeIndex;
import org.h2.index.PageScanIndex; import org.h2.index.PageScanIndex;
import org.h2.log.LogSystem; import org.h2.log.LogSystem;
import org.h2.log.SessionState;
import org.h2.message.Message; import org.h2.message.Message;
import org.h2.message.Trace; import org.h2.message.Trace;
import org.h2.message.TraceSystem; import org.h2.message.TraceSystem;
...@@ -67,9 +66,8 @@ import org.h2.value.ValueString; ...@@ -67,9 +66,8 @@ import org.h2.value.ValueString;
*/ */
public class PageStore implements CacheWriter { public class PageStore implements CacheWriter {
// TODO currently working on PageLog.removeUntil // TODO currently working on PageBtreeNode Wide indexes
// TODO unlimited number of log streams (TestPageStoreDb) // TODO implement redo log in Recover tool
// TODO check if PageLog.reservePages is required - yes it is - change it
// TODO PageStore.openMetaIndex (desc and nulls first / last) // TODO PageStore.openMetaIndex (desc and nulls first / last)
// TODO btree index with fixed size values doesn't need offset and so on // TODO btree index with fixed size values doesn't need offset and so on
...@@ -233,14 +231,15 @@ public class PageStore implements CacheWriter { ...@@ -233,14 +231,15 @@ public class PageStore implements CacheWriter {
readVariableHeader(); readVariableHeader();
log = new PageLog(this); log = new PageLog(this);
log.openForReading(logFirstTrunkPage, logFirstDataPage); log.openForReading(logFirstTrunkPage, logFirstDataPage);
recover(true); recover();
recover(false); if (!database.isReadOnly()) {
recoveryRunning = true; recoveryRunning = true;
log.free(); log.free();
logFirstTrunkPage = allocatePage(); logFirstTrunkPage = allocatePage();
log.openForWriting(logFirstTrunkPage); log.openForWriting(logFirstTrunkPage);
recoveryRunning = false; recoveryRunning = false;
checkpoint(); checkpoint();
}
} else { } else {
// new // new
setPageSize(PAGE_SIZE_DEFAULT); setPageSize(PAGE_SIZE_DEFAULT);
...@@ -476,7 +475,6 @@ public class PageStore implements CacheWriter { ...@@ -476,7 +475,6 @@ public class PageStore implements CacheWriter {
record.setChanged(true); record.setChanged(true);
int pos = record.getPos(); int pos = record.getPos();
allocatePage(pos); allocatePage(pos);
// getFreeList().allocate(pos);
cache.update(pos, record); cache.update(pos, record);
if (logUndo && !recoveryRunning) { if (logUndo && !recoveryRunning) {
if (old == null) { if (old == null) {
...@@ -514,7 +512,7 @@ public class PageStore implements CacheWriter { ...@@ -514,7 +512,7 @@ public class PageStore implements CacheWriter {
list.free(pageId); list.free(pageId);
} }
private void allocatePage(int pageId) throws SQLException { void allocatePage(int pageId) throws SQLException {
PageFreeList list = getFreeList(pageId / freeListPagesPerList); PageFreeList list = getFreeList(pageId / freeListPagesPerList);
list.allocate(pageId); list.allocate(pageId);
} }
...@@ -642,9 +640,6 @@ public class PageStore implements CacheWriter { ...@@ -642,9 +640,6 @@ public class PageStore implements CacheWriter {
* @param data the data * @param data the data
*/ */
public void writePage(int pageId, DataPage data) throws SQLException { public void writePage(int pageId, DataPage data) throws SQLException {
if ((pageId << pageSizeShift) <= 0) {
System.out.println("stop");
}
file.seek(((long) pageId) << pageSizeShift); file.seek(((long) pageId) << pageSizeShift);
file.write(data.getBytes(), 0, pageSize); file.write(data.getBytes(), 0, pageSize);
} }
...@@ -663,26 +658,21 @@ public class PageStore implements CacheWriter { ...@@ -663,26 +658,21 @@ public class PageStore implements CacheWriter {
} }
/** /**
* Run one recovery stage. There are two recovery stages: first (undo is * Run one recovery stage. There are three recovery stages: 0: only the undo
* true) only the undo steps are run (restoring the state before the last * steps are run (restoring the state before the last checkpoint). 1: the
* checkpoint). In the second stage (undo is false) the committed operations * pages that are used by the transaction log are allocated. 2: the
* are re-applied. * committed operations are re-applied.
*
* @param undo true if the undo step should be run
*/ */
private void recover(boolean undo) throws SQLException { private void recover() throws SQLException {
trace.debug("log recover #" + undo); trace.debug("log recover");
try { try {
recoveryRunning = true; recoveryRunning = true;
if (!undo) { log.recover(PageLog.RECOVERY_STAGE_UNDO);
log.recover(PageLog.RECOVERY_STAGE_ALLOCATE);
openMetaIndex(); openMetaIndex();
readMetaData(); readMetaData();
} log.recover(PageLog.RECOVERY_STAGE_REDO);
log.recover(undo);
if (!undo) {
switchLog(); switchLog();
}
} catch (SQLException e) { } catch (SQLException e) {
int test; int test;
e.printStackTrace(); e.printStackTrace();
...@@ -694,7 +684,6 @@ public class PageStore implements CacheWriter { ...@@ -694,7 +684,6 @@ public class PageStore implements CacheWriter {
} finally { } finally {
recoveryRunning = false; recoveryRunning = false;
} }
if (!undo) {
PageScanIndex index = (PageScanIndex) metaObjects.get(0); PageScanIndex index = (PageScanIndex) metaObjects.get(0);
if (index == null) { if (index == null) {
systemTableHeadPos = Index.EMPTY_HEAD; systemTableHeadPos = Index.EMPTY_HEAD;
...@@ -707,7 +696,6 @@ public class PageStore implements CacheWriter { ...@@ -707,7 +696,6 @@ public class PageStore implements CacheWriter {
metaObjects = null; metaObjects = null;
trace.debug("log recover done"); trace.debug("log recover done");
} }
}
/** /**
* A record is added to a table, or removed from a table. * A record is added to a table, or removed from a table.
......
...@@ -35,10 +35,6 @@ public class PageStreamData extends Record { ...@@ -35,10 +35,6 @@ public class PageStreamData extends Record {
setPos(pageId); setPos(pageId);
this.store = store; this.store = store;
this.trunk = trunk; this.trunk = trunk;
int test;
if(pageId==5) {
System.out.println("stop!");
}
} }
/** /**
...@@ -117,4 +113,13 @@ public class PageStreamData extends Record { ...@@ -117,4 +113,13 @@ public class PageStreamData extends Record {
data.read(buff, off, len); data.read(buff, off, len);
} }
/**
* Get the number of remaining data bytes of this page.
*
* @return the remaining byte count
*/
int getRemaining() {
return remaining;
}
} }
\ No newline at end of file
...@@ -852,9 +852,10 @@ public class Recover extends Tool implements DataHandler { ...@@ -852,9 +852,10 @@ public class Recover extends Tool implements DataHandler {
} }
} }
private void setStorage(int storageId) { private String setStorage(int storageId) {
this.storageId = storageId; this.storageId = storageId;
this.storageName = String.valueOf(storageId).replace('-', 'M'); this.storageName = "O_" + String.valueOf(storageId).replace('-', 'M');
return storageName;
} }
/** /**
...@@ -1070,7 +1071,7 @@ public class Recover extends Tool implements DataHandler { ...@@ -1070,7 +1071,7 @@ public class Recover extends Tool implements DataHandler {
private void writeRow(PrintWriter writer, DataPage s, Value[] data) { private void writeRow(PrintWriter writer, DataPage s, Value[] data) {
StringBuilder sb = new StringBuilder(); StringBuilder sb = new StringBuilder();
sb.append("INSERT INTO O_" + storageName + " VALUES("); sb.append("INSERT INTO " + storageName + " VALUES(");
for (valueId = 0; valueId < recordLength; valueId++) { for (valueId = 0; valueId < recordLength; valueId++) {
try { try {
Value v = s.readValue(); Value v = s.readValue();
...@@ -1224,11 +1225,13 @@ public class Recover extends Tool implements DataHandler { ...@@ -1224,11 +1225,13 @@ public class Recover extends Tool implements DataHandler {
Integer objectId = entry.getKey(); Integer objectId = entry.getKey();
String name = entry.getValue(); String name = entry.getValue();
if (objectIdSet.contains(objectId)) { if (objectIdSet.contains(objectId)) {
writer.println("INSERT INTO " + name + " SELECT * FROM O_" + objectId + ";"); setStorage(objectId);
writer.println("INSERT INTO " + name + " SELECT * FROM " + storageName + ";");
} }
} }
for (Integer objectId : objectIdSet) { for (Integer objectId : objectIdSet) {
writer.println("DROP TABLE O_" + objectId + ";"); setStorage(objectId);
writer.println("DROP TABLE " + storageName + ";");
} }
writer.println("DROP ALIAS READ_CLOB;"); writer.println("DROP ALIAS READ_CLOB;");
writer.println("DROP ALIAS READ_BLOB;"); writer.println("DROP ALIAS READ_BLOB;");
...@@ -1244,7 +1247,7 @@ public class Recover extends Tool implements DataHandler { ...@@ -1244,7 +1247,7 @@ public class Recover extends Tool implements DataHandler {
private void createTemporaryTable(PrintWriter writer) { private void createTemporaryTable(PrintWriter writer) {
if (!objectIdSet.contains(storageId)) { if (!objectIdSet.contains(storageId)) {
objectIdSet.add(storageId); objectIdSet.add(storageId);
StatementBuilder buff = new StatementBuilder("CREATE TABLE O_"); StatementBuilder buff = new StatementBuilder("CREATE TABLE ");
buff.append(storageName).append('('); buff.append(storageName).append('(');
for (int i = 0; i < recordLength; i++) { for (int i = 0; i < recordLength; i++) {
buff.appendExceptFirst(", "); buff.appendExceptFirst(", ");
......
...@@ -289,6 +289,7 @@ java org.h2.test.TestAll timer ...@@ -289,6 +289,7 @@ java org.h2.test.TestAll timer
// 2009-05-15: 25 tests fail with page store (first loop) // 2009-05-15: 25 tests fail with page store (first loop)
// 2009-05-18: 18 tests fail with page store (first loop) // 2009-05-18: 18 tests fail with page store (first loop)
// 2009-05-30: 15 tests fail with page store (first loop) // 2009-05-30: 15 tests fail with page store (first loop)
// 2009-06-16: 13 tests fail with page store (first loop)
// System.setProperty("h2.pageStore", "true"); // System.setProperty("h2.pageStore", "true");
/* /*
......
...@@ -445,12 +445,20 @@ public class TestTools extends TestBase { ...@@ -445,12 +445,20 @@ public class TestTools extends TestBase {
private void testServer() throws SQLException { private void testServer() throws SQLException {
Connection conn; Connection conn;
deleteDb("test"); deleteDb("test");
Server server = Server.createTcpServer(new String[] { "-baseDir", baseDir, "-tcpPort", "9192", "-tcpAllowOthers" }).start(); Server server = Server.createTcpServer(
new String[] {
"-baseDir", baseDir,
"-tcpPort", "9192",
"-tcpAllowOthers" }).start();
conn = DriverManager.getConnection("jdbc:h2:tcp://localhost:9192/test", "sa", ""); conn = DriverManager.getConnection("jdbc:h2:tcp://localhost:9192/test", "sa", "");
conn.close(); conn.close();
server.stop(); server.stop();
Server.createTcpServer( Server.createTcpServer(
new String[] { "-ifExists", "-tcpPassword", "abc", "-baseDir", baseDir, "-tcpPort", "9192" }).start(); new String[] {
"-ifExists",
"-tcpPassword", "abc",
"-baseDir", baseDir,
"-tcpPort", "9192" }).start();
try { try {
conn = DriverManager.getConnection("jdbc:h2:tcp://localhost:9192/test2", "sa", ""); conn = DriverManager.getConnection("jdbc:h2:tcp://localhost:9192/test2", "sa", "");
fail("should not be able to create new db"); fail("should not be able to create new db");
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论