提交 a782a8cd 作者: Thomas Mueller

New experimental page store.

上级 02e83540
......@@ -1213,7 +1213,11 @@ public class Database implements DataHandler {
}
if (pageStore != null) {
if (checkpoint) {
pageStore.checkpoint();
try {
pageStore.checkpoint();
} catch (Throwable e) {
traceSystem.getTrace(Trace.DATABASE).error("close", e);
}
}
}
reconnectModified(false);
......
......@@ -469,6 +469,7 @@ public class LogSystem {
synchronized (database) {
if (pageStore != null) {
pageStore.commit(session);
session.setAllCommitted();
}
if (closed) {
return;
......
......@@ -56,17 +56,6 @@ public class PageFreeList extends Record {
return free + getPos();
}
/**
 * Allocate a page at the end of the file. The position handed to
 * {@code allocate(int)} is the larger of {@code min} and one past the
 * value returned by {@code getLastUsed()}.
 *
 * @param min the minimum page number
 * @return the page id
 */
int allocateAtEnd(int min) throws SQLException {
    int afterLastUsed = getLastUsed() + 1;
    int candidate = afterLastUsed > min ? afterLastUsed : min;
    return allocate(candidate);
}
/**
 * Get the last used position: the last set bit of the {@code used} bit
 * field plus the position of this page.
 *
 * @return the last set bit plus {@code getPos()}
 */
int getLastUsed() {
    int lastSetBit = used.getLastSetBit();
    return lastSetBit + getPos();
}
......@@ -133,10 +122,6 @@ public class PageFreeList extends Record {
store.writePage(getPos(), data);
}
/**
 * Report the current value of the {@code full} flag.
 *
 * @return the value of {@code full}
 */
boolean isFull() {
    return this.full;
}
/**
* Get the number of pages that can fit in a free list.
*
......
......@@ -12,13 +12,16 @@ import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.sql.SQLException;
import java.util.HashMap;
import org.h2.engine.Session;
import org.h2.log.LogSystem;
import org.h2.log.SessionState;
import org.h2.message.Message;
import org.h2.message.Trace;
import org.h2.result.Row;
import org.h2.util.BitField;
import org.h2.util.IntIntHashMap;
import org.h2.util.New;
import org.h2.value.Value;
/**
......@@ -32,6 +35,11 @@ import org.h2.value.Value;
*/
public class PageLog {
/**
* No operation.
*/
public static final int NOOP = 0;
/**
* An undo log entry.
* Format: page id, page.
......@@ -56,6 +64,12 @@ public class PageLog {
*/
public static final int REMOVE = 4;
/**
* Perform a checkpoint. The log id is incremented.
* Format: -
*/
public static final int CHECKPOINT = 5;
private final PageStore store;
private int pos;
private Trace trace;
......@@ -71,6 +85,7 @@ public class PageLog {
private int firstLogId;
private BitField undo = new BitField();
private IntIntHashMap logIdPageMap = new IntIntHashMap();
private HashMap<Integer, SessionState> sessionStates = New.hashMap();
PageLog(PageStore store) {
this.store = store;
......@@ -86,6 +101,7 @@ public class PageLog {
*/
void openForWriting(int firstTrunkPage) throws SQLException {
trace.debug("log openForWriting firstPage:" + firstTrunkPage);
this.firstTrunkPage = firstTrunkPage;
pageOut = new PageOutputStream(store, firstTrunkPage);
pageOut.reserve(1);
store.setLogFirstPage(firstTrunkPage, pageOut.getCurrentDataPageId());
......@@ -93,6 +109,18 @@ public class PageLog {
out = new DataOutputStream(buffer);
}
/**
 * Free up pages allocated by the log: the first trunk page is read and
 * then freed. Nothing happens when no first trunk page is set (value 0).
 * NOTE(review): only the first trunk is visited here; whether following
 * trunks are released elsewhere is not visible from this method.
 */
void free() throws SQLException {
    int trunkPage = this.firstTrunkPage;
    if (trunkPage == 0) {
        return;
    }
    // load the trunk from the store before releasing it
    PageStreamTrunk trunk = new PageStreamTrunk(store, trunkPage);
    trunk.read();
    trunk.free();
}
/**
* Open the log for reading.
*
......@@ -140,7 +168,7 @@ public class PageLog {
int tableId = in.readInt();
Row row = readRow(in, data);
if (!undo) {
if (store.isSessionCommitted(sessionId, logId, pos)) {
if (isSessionCommitted(sessionId, logId, pos)) {
if (trace.isDebugEnabled()) {
trace.debug("log redo " + (x == ADD ? "+" : "-") + " table:" + tableId + " " + row);
}
......@@ -157,8 +185,12 @@ public class PageLog {
trace.debug("log commit " + sessionId + " pos:" + pos);
}
if (undo) {
store.setLastCommitForSession(sessionId, logId, pos);
setLastCommitForSession(sessionId, logId, pos);
}
} else if (x == NOOP) {
// nothing to do
} else if (x == CHECKPOINT) {
logId++;
} else {
if (trace.isDebugEnabled()) {
trace.debug("log end");
......@@ -166,6 +198,10 @@ public class PageLog {
}
}
}
if (!undo) {
// TODO probably still required for 2 phase commit
sessionStates = New.hashMap();
}
} catch (EOFException e) {
trace.debug("log recovery stopped: " + e.toString());
} catch (IOException e) {
......@@ -299,13 +335,19 @@ public class PageLog {
/**
* Switch to a new log id.
*
* @throws SQLException
*/
void checkpoint() {
void checkpoint() throws SQLException {
try {
out.write(CHECKPOINT);
flushOut();
} catch (IOException e) {
throw Message.convertIOException(e, null);
}
undo = new BitField();
logId++;
pageOut.fillDataPage();
int currentDataPage = pageOut.getCurrentDataPageId();
logIdPageMap.put(logId, currentDataPage);
logId++;
}
int getLogId() {
......@@ -322,11 +364,11 @@ public class PageLog {
* @param firstUncommittedLog the first log id to keep
*/
void removeUntil(int firstUncommittedLog) throws SQLException {
if (firstUncommittedLog == logId) {
if (firstUncommittedLog == 0) {
return;
}
int firstDataPageToKeep = logIdPageMap.get(firstUncommittedLog);
trace.debug("log.removeUntil " + firstDataPageToKeep);
while (true) {
// TODO keep trunk page in the cache
PageStreamTrunk t = new PageStreamTrunk(store, firstTrunkPage);
......@@ -335,10 +377,14 @@ public class PageLog {
store.setLogFirstPage(t.getPos(), firstDataPageToKeep);
break;
}
firstTrunkPage = t.getNextTrunk();
t.free();
}
while (firstLogId < firstUncommittedLog) {
logIdPageMap.remove(firstLogId);
if (firstLogId > 0) {
// there is no entry for log 0
logIdPageMap.remove(firstLogId);
}
firstLogId++;
}
}
......@@ -358,4 +404,52 @@ public class PageLog {
}
}
/**
 * Check whether the given log position is committed for a session,
 * according to the session state recorded in this log.
 *
 * @param sessionId the session id
 * @param logId the log file id
 * @param pos the position in the log file
 * @return false if no state is recorded for the session; otherwise the
 *         result of {@code SessionState.isCommitted(logId, pos)}
 */
private boolean isSessionCommitted(int sessionId, int logId, int pos) {
    SessionState sessionState = sessionStates.get(sessionId);
    return sessionState != null && sessionState.isCommitted(logId, pos);
}
/**
 * Record the last commit for a session. The session state is created if
 * it does not exist yet.
 *
 * @param sessionId the session id
 * @param logId the log file id of the commit record
 * @param pos the position of the commit record in the log file
 */
private void setLastCommitForSession(int sessionId, int logId, int pos) {
    SessionState sessionState = getOrAddSessionState(sessionId);
    // clear the in-doubt transaction reference and remember the commit
    sessionState.inDoubtTransaction = null;
    sessionState.lastCommitPos = pos;
    sessionState.lastCommitLog = logId;
}
/**
 * Look up the state object of a session. If none is registered yet, a
 * new one is created, stored in the map and tagged with the session id.
 *
 * @param sessionId the session id
 * @return the session state object (never null)
 */
private SessionState getOrAddSessionState(int sessionId) {
    Integer key = sessionId;
    SessionState existing = sessionStates.get(key);
    if (existing != null) {
        return existing;
    }
    SessionState created = new SessionState();
    sessionStates.put(key, created);
    created.sessionId = sessionId;
    return created;
}
}
......@@ -20,7 +20,6 @@ public class PageOutputStream extends OutputStream {
private PageStore store;
private final Trace trace;
private int firstTrunkPageId;
private int trunkPageId;
private int trunkNext;
private IntArray reservedPages = new IntArray();
......@@ -42,7 +41,6 @@ public class PageOutputStream extends OutputStream {
this.trace = store.getTrace();
this.store = store;
this.trunkPageId = trunkPage;
firstTrunkPageId = trunkPage;
}
/**
......@@ -58,10 +56,6 @@ public class PageOutputStream extends OutputStream {
int pages = PageStreamTrunk.getPagesAddressed(pageSize);
// allocate x data pages
int pagesToAllocate = pages;
// the first trunk page is already allocated
if (reservedPages.size() == 0) {
reservedPages.add(trunkPageId);
}
int totalCapacity = pages * capacityPerPage;
while (totalCapacity < minBuffer) {
pagesToAllocate += pagesToAllocate;
......@@ -73,6 +67,7 @@ public class PageOutputStream extends OutputStream {
int page = store.allocatePage();
reservedPages.add(page);
}
reserved += totalCapacity;
if (data == null) {
initNextData();
}
......@@ -92,10 +87,7 @@ public class PageOutputStream extends OutputStream {
int nextData = trunk == null ? -1 : trunk.getNextDataPage();
if (nextData == -1) {
int parent = trunkPageId;
if (trunkNext == 0) {
trunkPageId = reservedPages.get(0);
reservedPages.remove(0);
} else {
if (trunkNext != 0) {
trunkPageId = trunkNext;
}
int len = PageStreamTrunk.getPagesAddressed(store.getPageSize());
......@@ -106,7 +98,7 @@ public class PageOutputStream extends OutputStream {
trunkNext = reservedPages.get(len);
trunk = new PageStreamTrunk(store, parent, trunkPageId, trunkNext, pageIds);
trunk.write(null);
reservedPages.removeRange(0, len);
reservedPages.removeRange(0, len + 1);
nextData = trunk.getNextDataPage();
}
data = new PageStreamData(store, nextData, trunk.getPos());
......@@ -126,7 +118,7 @@ public class PageOutputStream extends OutputStream {
while (len > 0) {
int l = data.write(b, off, len);
if (l < len) {
data.write(null);
storePage();
initNextData();
}
reserved -= l;
......@@ -165,8 +157,20 @@ public class PageOutputStream extends OutputStream {
store = null;
}
public int getCurrentDataPageId() {
int getCurrentDataPageId() {
return data.getPos();
}
/**
 * Write the current data page and switch to the next one.
 * This is required for a checkpoint.
 * NOTE(review): the original description says the page is filled with
 * zeros; that is not visible in this method - confirm in PageStreamData.
 */
void fillDataPage() throws SQLException {
    boolean debug = trace.isDebugEnabled();
    if (debug) {
        String message = "pageOut.storePage fill " + data.getPos();
        trace.debug(message);
    }
    data.write(null);
    initNextData();
}
}
......@@ -121,8 +121,8 @@ public class PageStore implements CacheWriter {
*/
public static final int PAGE_SIZE_DEFAULT = 1024;
private static final int PAGE_ID_FREE_LIST_ROOT = 2;
private static final int PAGE_ID_META_ROOT = 3;
private static final int PAGE_ID_FREE_LIST_ROOT = 3;
private static final int PAGE_ID_META_ROOT = 4;
private static final int PAGE_ID_LOG_TRUNK = 5;
private static final int INCREMENT_PAGES = 128;
......@@ -150,7 +150,6 @@ public class PageStore implements CacheWriter {
private int freeListPagesPerList;
private boolean recoveryRunning;
private HashMap<Integer, SessionState> sessionStates = New.hashMap();
/**
* The file size in bytes.
......@@ -237,6 +236,8 @@ public class PageStore implements CacheWriter {
recover(true);
recover(false);
recoveryRunning = true;
log.free();
logFirstTrunkPage = allocatePage();
log.openForWriting(logFirstTrunkPage);
recoveryRunning = false;
checkpoint();
......@@ -278,6 +279,7 @@ public class PageStore implements CacheWriter {
for (CacheObject rec : list) {
writeBack(rec);
}
log.checkpoint();
switchLog();
// TODO shrink file if required here
// int pageCount = getFreeList().getLastUsed() + 1;
......@@ -287,7 +289,7 @@ public class PageStore implements CacheWriter {
}
private void switchLog() throws SQLException {
trace.debug("switchLogIfPossible");
trace.debug("switchLog");
if (database.isReadOnly()) {
return;
}
......@@ -302,7 +304,12 @@ public class PageStore implements CacheWriter {
}
}
}
log.removeUntil(firstUncommittedLog);
try {
log.removeUntil(firstUncommittedLog);
} catch (SQLException e) {
int test;
e.printStackTrace();
}
}
private void readStaticHeader() throws SQLException {
......@@ -480,15 +487,6 @@ public class PageStore implements CacheWriter {
}
}
/**
 * Allocate a page. Delegates to {@code allocatePage(boolean)} without
 * requesting allocation at the end of the file.
 *
 * @return the page id
 */
public int allocatePage() throws SQLException {
    boolean atEnd = false;
    return allocatePage(atEnd);
}
private PageFreeList getFreeList(int i) throws SQLException {
int p;
if (i == 0) {
......@@ -512,41 +510,31 @@ public class PageStore implements CacheWriter {
}
/**
 * Mark a page as free in the free list selected by
 * {@code pageId / freeListPagesPerList}.
 *
 * @param pageId the page id
 */
private void freePage(int pageId) throws SQLException {
    boolean negative = pageId < 0;
    if (negative) {
        // NOTE(review): looks like a debugging aid; prints to stdout and continues
        System.out.println("stop");
    }
    int listIndex = pageId / freeListPagesPerList;
    getFreeList(listIndex).free(pageId);
}
private void allocatePage(int pageId) throws SQLException {
PageFreeList list = getFreeList(pageId / freeListPagesPerList);
list.allocate(pageId % freeListPagesPerList);
list.allocate(pageId);
}
/**
* Allocate a new page.
* Allocate a page.
*
* @param atEnd allocate at the end of the file
* @return the page id
*/
int allocatePage(boolean atEnd) throws SQLException {
public int allocatePage() throws SQLException {
int pos;
if (atEnd) {
PageFreeList list = getFreeList(pageCount / freeListPagesPerList);
pos = list.getLastUsed() + 1;
list.allocate(pos);
} else {
// TODO could remember the first possible free list page
for (int i = 0;; i++) {
PageFreeList list = getFreeList(i);
if (!list.isFull()) {
pos = list.allocate();
break;
}
// TODO could remember the first possible free list page
for (int i = 0;; i++) {
PageFreeList list = getFreeList(i);
pos = list.allocate();
if (pos >= 0) {
break;
}
}
if (pos > pageCount) {
if (pos >= pageCount) {
increaseFileSize(INCREMENT_PAGES);
}
return pos;
......@@ -654,7 +642,10 @@ public class PageStore implements CacheWriter {
* @param data the data
*/
public void writePage(int pageId, DataPage data) throws SQLException {
file.seek(pageId << pageSizeShift);
if ((pageId << pageSizeShift) <= 0) {
System.out.println("stop");
}
file.seek(((long) pageId) << pageSizeShift);
file.write(data.getBytes(), 0, pageSize);
}
......@@ -691,8 +682,6 @@ public class PageStore implements CacheWriter {
log.recover(undo);
if (!undo) {
switchLog();
int todoProbablyStillRequiredForTwoPhaseCommit;
sessionStates = New.hashMap();
}
} catch (SQLException e) {
int test;
......@@ -743,54 +732,6 @@ public class PageStore implements CacheWriter {
log.commit(session);
}
/**
 * Look up the state object of a session. If none is registered yet, a
 * new one is created, stored in the map and tagged with the session id.
 *
 * @param sessionId the session id
 * @return the session state object (never null)
 */
private SessionState getOrAddSessionState(int sessionId) {
    Integer key = sessionId;
    SessionState existing = sessionStates.get(key);
    if (existing != null) {
        return existing;
    }
    SessionState created = new SessionState();
    sessionStates.put(key, created);
    created.sessionId = sessionId;
    return created;
}
/**
 * Record the last commit for a session. The session state is created if
 * it does not exist yet.
 *
 * @param sessionId the session id
 * @param logId the log file id of the commit record
 * @param pos the position of the commit record in the log file
 */
void setLastCommitForSession(int sessionId, int logId, int pos) {
    SessionState sessionState = getOrAddSessionState(sessionId);
    // clear the in-doubt transaction reference and remember the commit
    sessionState.inDoubtTransaction = null;
    sessionState.lastCommitPos = pos;
    sessionState.lastCommitLog = logId;
}
/**
 * Check whether the given log position is committed for a session,
 * according to the recorded session state.
 *
 * @param sessionId the session id
 * @param logId the log file id
 * @param pos the position in the log file
 * @return true if no state is recorded for the session; otherwise the
 *         result of {@code SessionState.isCommitted(logId, pos)}
 */
boolean isSessionCommitted(int sessionId, int logId, int pos) {
    SessionState sessionState = sessionStates.get(sessionId);
    return sessionState == null || sessionState.isCommitted(logId, pos);
}
/**
* Get the position of the system table head.
*
......
......@@ -35,6 +35,10 @@ public class PageStreamData extends Record {
setPos(pageId);
this.store = store;
this.trunk = trunk;
int test;
if(pageId==5) {
System.out.println("stop!");
}
}
/**
......
......@@ -128,10 +128,14 @@ public class PageStreamTrunk extends Record {
* Free this page and all data pages.
*/
void free() throws SQLException {
DataPage empty = store.createDataPage();
store.freePage(getPos(), false, null);
for (int i = 0; i < pageCount; i++) {
store.freePage(pageIds[i], false, null);
int page = pageIds[i];
store.freePage(page, false, null);
store.writePage(page, empty);
}
store.writePage(getPos(), empty);
}
}
......@@ -841,6 +841,10 @@ public class Recover extends Tool implements DataHandler {
} else if (x == PageLog.COMMIT) {
int sessionId = in.readInt();
writer.println("-- commit " + sessionId);
} else if (x == PageLog.NOOP) {
// nothing to do
} else if (x == PageLog.CHECKPOINT) {
writer.println("-- checkpoint");
} else {
writer.println("-- end " + x);
break;
......
......@@ -318,7 +318,7 @@ public class IntArray {
*/
public void removeRange(int fromIndex, int toIndex) {
if (SysProperties.CHECK) {
if (fromIndex > toIndex || toIndex >= size) {
if (fromIndex > toIndex || toIndex > size) {
throw new ArrayIndexOutOfBoundsException("from=" + fromIndex + " to=" + toIndex + " size=" + size);
}
}
......
......@@ -295,8 +295,6 @@ java org.h2.test.TestAll timer
test what is wrong with -Djava.net.preferIPv6Addresses=true
download checksums (auto-verify every day)
test case for running out of disk space (using a special file system)
auto-build: prepare release
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 登录 后发表评论