提交 aa3c5565 authored 作者: Thomas Mueller's avatar Thomas Mueller

New experimental page store.

上级 75403d55
...@@ -57,10 +57,6 @@ public class PageFreeList extends Record { ...@@ -57,10 +57,6 @@ public class PageFreeList extends Record {
return free + getPos(); return free + getPos();
} }
int getLastUsed() {
return used.getLastSetBit() + getPos();
}
/** /**
* Mark a page as used. * Mark a page as used.
* *
......
...@@ -27,7 +27,7 @@ public class PageInputStream extends InputStream { ...@@ -27,7 +27,7 @@ public class PageInputStream extends InputStream {
private int remaining; private int remaining;
private byte[] buffer = new byte[1]; private byte[] buffer = new byte[1];
public PageInputStream(PageStore store, int trunkPage, int dataPage) { PageInputStream(PageStore store, int trunkPage, int dataPage) {
this.store = store; this.store = store;
this.trace = store.getTrace(); this.trace = store.getTrace();
this.trunkNext = trunkPage; this.trunkNext = trunkPage;
...@@ -110,7 +110,7 @@ public class PageInputStream extends InputStream { ...@@ -110,7 +110,7 @@ public class PageInputStream extends InputStream {
/** /**
* Set all pages as 'allocated' in the page store. * Set all pages as 'allocated' in the page store.
*/ */
public void allocateAllPages() throws SQLException { void allocateAllPages() throws SQLException {
int trunkPage = trunkNext; int trunkPage = trunkNext;
while (trunkPage != 0) { while (trunkPage != 0) {
store.allocatePage(trunkPage); store.allocatePage(trunkPage);
......
...@@ -21,6 +21,7 @@ import org.h2.message.Message; ...@@ -21,6 +21,7 @@ import org.h2.message.Message;
import org.h2.message.Trace; import org.h2.message.Trace;
import org.h2.result.Row; import org.h2.result.Row;
import org.h2.util.BitField; import org.h2.util.BitField;
import org.h2.util.IntArray;
import org.h2.util.IntIntHashMap; import org.h2.util.IntIntHashMap;
import org.h2.util.New; import org.h2.util.New;
import org.h2.util.ObjectArray; import org.h2.util.ObjectArray;
...@@ -85,6 +86,12 @@ public class PageLog { ...@@ -85,6 +86,12 @@ public class PageLog {
*/ */
public static final int CHECKPOINT = 7; public static final int CHECKPOINT = 7;
/**
* Free a log page.
* Format: count, page ids
*/
public static final int FREE_LOG = 8;
/** /**
* The recovery stage to undo changes (re-apply the backup). * The recovery stage to undo changes (re-apply the backup).
*/ */
...@@ -261,6 +268,14 @@ public class PageLog { ...@@ -261,6 +268,14 @@ public class PageLog {
// nothing to do // nothing to do
} else if (x == CHECKPOINT) { } else if (x == CHECKPOINT) {
logId++; logId++;
} else if (x == FREE_LOG) {
int count = in.readInt();
for (int i = 0; i < count; i++) {
int pageId = in.readInt();
if (stage == RECOVERY_STAGE_REDO) {
store.freePage(pageId, false, null);
}
}
} else { } else {
if (trace.isDebugEnabled()) { if (trace.isDebugEnabled()) {
trace.debug("log end"); trace.debug("log end");
...@@ -273,9 +288,6 @@ public class PageLog { ...@@ -273,9 +288,6 @@ public class PageLog {
} catch (IOException e) { } catch (IOException e) {
throw Message.convertIOException(e, "recover"); throw Message.convertIOException(e, "recover");
} }
if (stage == RECOVERY_STAGE_REDO) {
sessionStates = New.hashMap();
}
} }
/** /**
...@@ -346,6 +358,22 @@ public class PageLog { ...@@ -346,6 +358,22 @@ public class PageLog {
} }
} }
private void freeLogPages(IntArray pages) throws SQLException {
try {
if (trace.isDebugEnabled()) {
trace.debug("log frees " + pages.get(0) + ".." + pages.get(pages.size() - 1));
}
out.write(FREE_LOG);
out.writeInt(pages.size());
for (int i = 0; i < pages.size(); i++) {
out.writeInt(pages.get(i));
}
flushOut();
} catch (IOException e) {
throw Message.convertIOException(e, null);
}
}
private void flushOut() throws IOException { private void flushOut() throws IOException {
out.flush(); out.flush();
pageOut.write(buffer.toByteArray()); pageOut.write(buffer.toByteArray());
...@@ -417,32 +445,6 @@ public class PageLog { ...@@ -417,32 +445,6 @@ public class PageLog {
} }
} }
/**
* Rollback a prepared transaction.
*
* @param session the session
*/
void rollbackPrepared(int sessionId) throws SQLException {
try {
if (trace.isDebugEnabled()) {
trace.debug("log rollback prepared s:" + sessionId);
}
LogSystem log = store.getDatabase().getLog();
if (log == null) {
// database already closed
return;
}
out.write(ROLLBACK);
out.writeInt(sessionId);
flushOut();
if (log.getFlushOnEachCommit()) {
flush();
}
} catch (IOException e) {
throw Message.convertIOException(e, null);
}
}
/** /**
* A record is added to a table, or removed from a table. * A record is added to a table, or removed from a table.
* *
...@@ -507,10 +509,6 @@ public class PageLog { ...@@ -507,10 +509,6 @@ public class PageLog {
return logId; return logId;
} }
int getLogPos() {
return logPos;
}
/** /**
* Remove all pages until the given log (excluding). * Remove all pages until the given log (excluding).
* *
...@@ -521,7 +519,7 @@ public class PageLog { ...@@ -521,7 +519,7 @@ public class PageLog {
return; return;
} }
int firstDataPageToKeep = logIdPageMap.get(firstUncommittedLog); int firstDataPageToKeep = logIdPageMap.get(firstUncommittedLog);
firstTrunkPage = pageOut.removeUntil(firstTrunkPage, firstDataPageToKeep); firstTrunkPage = removeUntil(firstTrunkPage, firstDataPageToKeep);
store.setLogFirstPage(firstTrunkPage, firstDataPageToKeep); store.setLogFirstPage(firstTrunkPage, firstDataPageToKeep);
while (firstLogId < firstUncommittedLog) { while (firstLogId < firstUncommittedLog) {
if (firstLogId > 0) { if (firstLogId > 0) {
...@@ -532,6 +530,37 @@ public class PageLog { ...@@ -532,6 +530,37 @@ public class PageLog {
} }
} }
/**
* Remove all pages until the given data page.
*
* @param firstTrunkPage the first trunk page
* @param firstDataPageToKeep the first data page to keep
* @return the trunk page of the data page to keep
*/
private int removeUntil(int firstTrunkPage, int firstDataPageToKeep) throws SQLException {
trace.debug("log.removeUntil " + firstDataPageToKeep);
while (true) {
// TODO keep trunk page in the cache
PageStreamTrunk t = new PageStreamTrunk(store, firstTrunkPage);
t.read();
if (t.contains(firstDataPageToKeep)) {
return t.getPos();
}
firstTrunkPage = t.getNextTrunk();
IntArray list = new IntArray();
list.add(t.getPos());
while (true) {
int next = t.getNextPageData();
if (next == -1) {
break;
}
list.add(next);
}
freeLogPages(list);
pageOut.free(t);
}
}
/** /**
* Close the log. * Close the log.
*/ */
...@@ -585,7 +614,7 @@ public class PageLog { ...@@ -585,7 +614,7 @@ public class PageLog {
* @param sessionId the session id * @param sessionId the session id
* @return the session state object * @return the session state object
*/ */
SessionState getOrAddSessionState(int sessionId) { private SessionState getOrAddSessionState(int sessionId) {
Integer key = sessionId; Integer key = sessionId;
SessionState state = sessionStates.get(key); SessionState state = sessionStates.get(key);
if (state == null) { if (state == null) {
...@@ -637,14 +666,11 @@ public class PageLog { ...@@ -637,14 +666,11 @@ public class PageLog {
d.write(null); d.write(null);
} }
void truncate() throws SQLException { /**
do { * Called after the recvovery has been completed.
// TODO keep trunk page in the cache */
PageStreamTrunk t = new PageStreamTrunk(store, firstTrunkPage); void recoverEnd() {
t.read(); sessionStates = New.hashMap();
firstTrunkPage = t.getNextTrunk();
t.free();
} while (firstTrunkPage != 0);
} }
} }
...@@ -183,27 +183,6 @@ public class PageOutputStream extends OutputStream { ...@@ -183,27 +183,6 @@ public class PageOutputStream extends OutputStream {
initNextData(); initNextData();
} }
/**
* Remove all pages until the given data page.
*
* @param firstTrunkPage the first trunk page
* @param firstDataPageToKeep the first data page to keep
* @return the trunk page of the data page to keep
*/
int removeUntil(int firstTrunkPage, int firstDataPageToKeep) throws SQLException {
trace.debug("log.removeUntil " + firstDataPageToKeep);
while (true) {
// TODO keep trunk page in the cache
PageStreamTrunk t = new PageStreamTrunk(store, firstTrunkPage);
t.read();
if (t.contains(firstDataPageToKeep)) {
return t.getPos();
}
firstTrunkPage = t.getNextTrunk();
pages -= t.free();
}
}
long getSize() { long getSize() {
return pages * store.getPageSize(); return pages * store.getPageSize();
} }
...@@ -217,4 +196,13 @@ public class PageOutputStream extends OutputStream { ...@@ -217,4 +196,13 @@ public class PageOutputStream extends OutputStream {
remaining = 0; remaining = 0;
} }
/**
* Remove a trunk page from the stream.
*
* @param t the trunk page
*/
void free(PageStreamTrunk t) throws SQLException {
pages -= t.free();
}
} }
...@@ -87,7 +87,6 @@ public class PageStore implements CacheWriter { ...@@ -87,7 +87,6 @@ public class PageStore implements CacheWriter {
// (input stream, free list, extend pages...) // (input stream, free list, extend pages...)
// at runtime and recovery // at runtime and recovery
// synchronized correctly (on the index?) // synchronized correctly (on the index?)
// TODO two phase commit: append (not patch) commit & rollback
// TODO remove trace or use isDebugEnabled // TODO remove trace or use isDebugEnabled
// TODO recover tool: don't re-do uncommitted operations // TODO recover tool: don't re-do uncommitted operations
// TODO no need to log old page if it was always empty // TODO no need to log old page if it was always empty
...@@ -98,7 +97,6 @@ public class PageStore implements CacheWriter { ...@@ -98,7 +97,6 @@ public class PageStore implements CacheWriter {
// and delay on each commit // and delay on each commit
// TODO var int: see google protocol buffers // TODO var int: see google protocol buffers
// TODO PageData and PageBtree addRowTry: try to simplify // TODO PageData and PageBtree addRowTry: try to simplify
// TODO space re-use: run TestPerformance multiple times, size should stay
// TODO test running out of disk space (using a special file system) // TODO test running out of disk space (using a special file system)
// TODO check for file size (exception if not exact size expected) // TODO check for file size (exception if not exact size expected)
...@@ -110,7 +108,6 @@ public class PageStore implements CacheWriter { ...@@ -110,7 +108,6 @@ public class PageStore implements CacheWriter {
// remove Record.getByteCount // remove Record.getByteCount
// remove Database.objectIds // remove Database.objectIds
/** /**
* The smallest possible page size. * The smallest possible page size.
*/ */
...@@ -329,9 +326,6 @@ public class PageStore implements CacheWriter { ...@@ -329,9 +326,6 @@ public class PageStore implements CacheWriter {
private void switchLog() throws SQLException { private void switchLog() throws SQLException {
trace.debug("switchLog"); trace.debug("switchLog");
if (database.isReadOnly()) {
return;
}
Session[] sessions = database.getSessions(true); Session[] sessions = database.getSessions(true);
int firstUncommittedLog = log.getLogId(); int firstUncommittedLog = log.getLogId();
for (int i = 0; i < sessions.length; i++) { for (int i = 0; i < sessions.length; i++) {
...@@ -665,7 +659,7 @@ public class PageStore implements CacheWriter { ...@@ -665,7 +659,7 @@ public class PageStore implements CacheWriter {
* @param pos the page id * @param pos the page id
* @param page the page * @param page the page
*/ */
public void readPage(int pos, Data page) throws SQLException { void readPage(int pos, Data page) throws SQLException {
synchronized (database) { synchronized (database) {
if (pos >= pageCount) { if (pos >= pageCount) {
throw Message.getSQLException(ErrorCode.FILE_CORRUPTED_1, pos + " of " + pageCount); throw Message.getSQLException(ErrorCode.FILE_CORRUPTED_1, pos + " of " + pageCount);
...@@ -732,11 +726,13 @@ public class PageStore implements CacheWriter { ...@@ -732,11 +726,13 @@ public class PageStore implements CacheWriter {
openMetaIndex(); openMetaIndex();
readMetaData(); readMetaData();
log.recover(PageLog.RECOVERY_STAGE_REDO); log.recover(PageLog.RECOVERY_STAGE_REDO);
if (log.getInDoubtTransactions().size() == 0) { if (!database.isReadOnly()) {
log.truncate(); if (log.getInDoubtTransactions().size() == 0) {
switchLog(); log.recoverEnd();
} else { switchLog();
database.setReadOnly(true); } else {
database.setReadOnly(true);
}
} }
recoveryRunning = false; recoveryRunning = false;
PageScanIndex index = (PageScanIndex) metaObjects.get(0); PageScanIndex index = (PageScanIndex) metaObjects.get(0);
......
...@@ -44,7 +44,7 @@ public class PageStreamTrunk extends Record { ...@@ -44,7 +44,7 @@ public class PageStreamTrunk extends Record {
this.pageIds = pageIds; this.pageIds = pageIds;
} }
public PageStreamTrunk(PageStore store, int pageId) { PageStreamTrunk(PageStore store, int pageId) {
setPos(pageId); setPos(pageId);
this.store = store; this.store = store;
} }
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论