提交 2376a5e1 作者: Thomas Mueller

New experimental page store.

上级 9f307295
......@@ -22,6 +22,7 @@ public class FileStoreOutputStream extends OutputStream {
private DataPage page;
private String compressionAlgorithm;
private CompressTool compress;
private byte[] buffer = new byte[1];
public FileStoreOutputStream(FileStore store, DataHandler handler, String compressionAlgorithm) {
this.store = store;
......@@ -32,6 +33,11 @@ public class FileStoreOutputStream extends OutputStream {
page = DataPage.create(handler, Constants.FILE_BLOCK_SIZE);
}
/**
 * Write a single byte. The byte is placed in the reusable one-byte
 * buffer of this stream and then written like any other byte range,
 * so no array is allocated per call.
 *
 * @param b the byte to write (only the low 8 bits are used)
 * @throws IOException if writing the buffer fails
 */
public void write(int b) throws IOException {
    buffer[0] = (byte) b;
    // same call that write(byte[]) would make for the one-byte buffer
    write(buffer, 0, 1);
}
/**
 * Write the complete byte array by delegating to the
 * offset/length variant of this method.
 *
 * @param buff the bytes to write
 * @throws IOException if the delegated write fails
 */
public void write(byte[] buff) throws IOException {
    final int length = buff.length;
    write(buff, 0, length);
}
......@@ -75,8 +81,4 @@ public class FileStoreOutputStream extends OutputStream {
}
}
public void write(int b) throws IOException {
throw new IOException("this method is not implemented");
}
}
......@@ -31,6 +31,7 @@ public class PageInputStream extends InputStream {
private DataPage page;
private boolean endOfFile;
private int remaining;
private byte[] buffer = new byte[1];
public PageInputStream(PageStore store, int parentPage, int headPage, int type) {
this.store = store;
......@@ -41,9 +42,8 @@ public class PageInputStream extends InputStream {
}
public int read() throws IOException {
byte[] b = new byte[1];
int len = read(b);
return len < 0 ? -1 : (b[0] & 255);
int len = read(buffer);
return len < 0 ? -1 : (buffer[0] & 255);
}
public int read(byte[] b) throws IOException {
......@@ -94,6 +94,7 @@ public class PageInputStream extends InputStream {
boolean last = (t & Page.FLAG_LAST) != 0;
t &= ~Page.FLAG_LAST;
if (type != t || p != parentPage) {
int todoNeedBetterWayToDetectEOF;
throw Message.getSQLException(
ErrorCode.FILE_CORRUPTED_1,
"page:" +nextPage+ " type:" + t + " parent:" + p +
......
......@@ -21,7 +21,11 @@ import org.h2.value.Value;
/**
* Transaction log mechanism.
* The data format is:
* The format is:
* <ul><li>0-3: log id
* </li><li>records
* </li></ul>
* The data format for a record is:
* <ul><li>0-0: type (0: undo,...)
* </li><li>1-4: page id
* </li><li>5-: data
......@@ -58,11 +62,14 @@ public class PageLog {
*/
public static final int REMOVE = 4;
private PageStore store;
private final PageStore store;
private int id;
private int pos;
private Trace trace;
private PageOutputStream pageOut;
private DataOutputStream out;
private DataInputStream in;
private int firstPage;
private DataPage data;
private long operation;
......@@ -76,13 +83,38 @@ public class PageLog {
}
/**
* Open the log file for writing. For an existing database, the recovery
* Open the log for writing. For an existing database, the recovery
* must be run first.
*
* @param id the log id
*/
void openForWriting() {
trace.debug("log openForWriting");
// Opens this log for writing under the given id: a new page output stream is
// created starting at firstPage, and the id is written as the first int of
// the stream and flushed immediately.
void openForWriting(int id) throws SQLException {
// remember the id so that getId() and later trace messages report it
this.id = id;
trace.debug("log openForWriting " + id + " firstPage:" + firstPage);
// parent page 0, head page firstPage; the last argument is allocateAtEnd
pageOut = new PageOutputStream(store, 0, firstPage, Page.TYPE_LOG, true);
out = new DataOutputStream(pageOut);
try {
// the log id is the header of the log (bytes 0-3), the records follow
out.writeInt(id);
out.flush();
} catch (IOException e) {
// callers of this class only deal with SQLException
throw Message.convertIOException(e, null);
}
}
/**
 * Open the log for reading, starting at the first page of this log, and
 * read the log id stored in the first four bytes of the stream. The id is
 * remembered in this object.
 *
 * @return the log id, or 0 if reading the id failed with an IOException
 *         (in that case the previously stored id is left unchanged)
 */
int openForReading() throws SQLException {
    PageInputStream pageIn = new PageInputStream(store, 0, firstPage, Page.TYPE_LOG);
    in = new DataInputStream(pageIn);
    try {
        id = in.readInt();
        String message = "log openForReading " + id + " firstPage:" + firstPage + " id:" + id;
        trace.debug(message);
        return id;
    } catch (IOException e) {
        // an unreadable header is reported as log id 0
        return 0;
    }
}
/**
......@@ -93,14 +125,18 @@ public class PageLog {
* @param undo true if the undo step should be run
*/
void recover(boolean undo) throws SQLException {
DataInputStream in = new DataInputStream(new PageInputStream(store, 0, firstPage, Page.TYPE_LOG));
if (trace.isDebugEnabled()) {
trace.debug("log recover " + id + " undo:" + undo);
}
DataPage data = store.createDataPage();
try {
pos = 0;
while (true) {
int x = in.read();
if (x < 0) {
break;
}
pos++;
if (x == NO_OP) {
// nothing to do
} else if (x == UNDO) {
......@@ -118,16 +154,22 @@ public class PageLog {
Row row = readRow(in, data);
if (!undo) {
Database db = store.getDatabase();
if (store.isSessionCommitted(sessionId, id, pos)) {
if (trace.isDebugEnabled()) {
trace.debug("log redo " + (x == ADD ? "+" : "-") + " " + row);
}
db.redo(tableId, row, x == ADD);
}
}
} else if (x == COMMIT) {
in.readInt();
int sessionId = in.readInt();
if (undo) {
store.setLastCommitForSession(sessionId, id, pos);
}
}
}
} catch (Exception e) {
e.printStackTrace();
int todoOnlyIOExceptionAndSQLException;
int todoSomeExceptionAreOkSomeNot;
trace.debug("log recovery stopped: " + e.toString());
......@@ -170,6 +212,9 @@ public class PageLog {
if (undo.get(pageId)) {
return;
}
if (trace.isDebugEnabled()) {
trace.debug("log undo " + pageId);
}
out.write(UNDO);
out.writeInt(pageId);
out.write(page.getBytes(), 0, store.getPageSize());
......@@ -209,7 +254,7 @@ public class PageLog {
try {
if (trace.isDebugEnabled()) {
trace.debug("log " + (add?"+":"-") + " table:" + tableId +
" remaining:" + pageOut.getRemainingBytes() + " row:" + row);
" row:" + row);
}
int todoLogPosShouldBeLong;
session.addLogPos(0, (int) operation);
......@@ -228,14 +273,31 @@ public class PageLog {
}
}
/**
 * Close the log. The output stream is closed if there is one, and the
 * reference to it is cleared afterwards. If closing fails, the
 * IOException is converted and the reference is not cleared.
 *
 * @throws SQLException if closing the output stream fails
 */
void close() throws SQLException {
    try {
        trace.debug("log close " + id);
        DataOutputStream current = out;
        if (current != null) {
            current.close();
        }
        out = null;
    } catch (IOException e) {
        throw Message.convertIOException(e, null);
    }
}
/**
* Close the log, truncate it, and re-open it.
*
* @param id the new log id
*/
void reopen() throws SQLException {
private void reopen(int id) throws SQLException {
try {
trace.debug("log reopen");
out.close();
openForWriting();
openForWriting(id);
flush();
int todoDeleteOrReUsePages;
} catch (IOException e) {
......@@ -246,21 +308,25 @@ public class PageLog {
/**
* Flush the transaction log.
*/
private void flush() throws SQLException {
void flush() throws SQLException {
try {
int todoUseLessSpace;
trace.debug("log flush");
out.flush();
int filler = pageOut.getRemainingBytes();
for (int i = 0; i < filler; i++) {
out.writeByte(NO_OP);
}
out.flush();
} catch (IOException e) {
throw Message.convertIOException(e, null);
}
}
/**
 * Get the id of this log, as set when the log was opened for writing
 * or read when it was opened for reading.
 *
 * @return the log id
 */
int getId() {
    return this.id;
}
/**
* Flush and close the log.
*/
......
......@@ -20,13 +20,15 @@ public class PageOutputStream extends OutputStream {
private final Trace trace;
private PageStore store;
private int parentPage;
private int type;
private int parentPage;
private int pageId;
private int nextPage;
private DataPage page;
private int remaining;
private final boolean allocateAtEnd;
private byte[] buffer = new byte[1];
private boolean needFlush;
/**
* Create a new page output stream.
......@@ -40,7 +42,7 @@ public class PageOutputStream extends OutputStream {
this.trace = store.getTrace();
this.store = store;
this.parentPage = parentPage;
this.nextPage = headPage;
this.pageId = headPage;
this.type = type;
this.allocateAtEnd = allocateAtEnd;
page = store.createDataPage();
......@@ -48,8 +50,8 @@ public class PageOutputStream extends OutputStream {
}
public void write(int b) throws IOException {
int todoOptimizeIfNeeded;
write(new byte[] { (byte) b });
buffer[0] = (byte) b;
write(buffer);
}
public void write(byte[] b) throws IOException {
......@@ -72,18 +74,21 @@ public class PageOutputStream extends OutputStream {
page.write(b, off, remaining);
off += remaining;
len -= remaining;
parentPage = nextPage;
pageId = nextPage;
try {
nextPage = store.allocatePage(allocateAtEnd);
} catch (SQLException e) {
throw Message.convertToIOException(e);
}
page.setInt(5, nextPage);
page.setPos(4);
page.writeByte((byte) type);
page.writeInt(nextPage);
storePage();
parentPage = pageId;
pageId = nextPage;
initPage();
}
page.write(b, off, len);
needFlush = true;
remaining -= len;
}
......@@ -98,26 +103,21 @@ public class PageOutputStream extends OutputStream {
}
}
public void close() throws IOException {
public void flush() throws IOException {
if (needFlush) {
int len = page.length();
page.setPos(4);
page.writeByte((byte) (type | Page.FLAG_LAST));
page.writeInt(store.getPageSize() - remaining - 9);
pageId = nextPage;
page.setPos(len);
storePage();
store = null;
needFlush = false;
}
public void flush() throws IOException {
int todo;
}
/**
* Get the number of remaining bytes that fit in the current page.
*
* @return the number of bytes
*/
public int getRemainingBytes() {
return remaining;
public void close() throws IOException {
flush();
store = null;
}
}
......@@ -9,15 +9,16 @@ package org.h2.store;
import java.io.IOException;
import java.io.OutputStream;
import java.sql.SQLException;
import java.util.HashMap;
import org.h2.constant.ErrorCode;
import org.h2.engine.Database;
import org.h2.engine.Session;
import org.h2.index.Page;
import org.h2.log.SessionState;
import org.h2.message.Message;
import org.h2.message.Trace;
import org.h2.message.TraceSystem;
import org.h2.result.Row;
import org.h2.util.BitField;
import org.h2.util.Cache;
import org.h2.util.Cache2Q;
import org.h2.util.CacheLRU;
......@@ -25,6 +26,7 @@ import org.h2.util.CacheObject;
import org.h2.util.CacheWriter;
import org.h2.util.FileUtils;
import org.h2.util.ObjectArray;
import org.h2.util.ObjectUtils;
/**
* This class represents a file that is organized as a number of pages. The
......@@ -60,6 +62,7 @@ public class PageStore implements CacheWriter {
// at runtime and recovery
// synchronized correctly (on the index?)
// TODO two phase commit: append (not patch) commit & rollback
// TODO remove trace or use isDebugEnabled
/**
* The smallest possible page size.
......@@ -97,6 +100,7 @@ public class PageStore implements CacheWriter {
private int activeLog;
private int[] logRootPageIds = new int[LOG_COUNT];
private boolean recoveryRunning;
private HashMap sessionStates = new HashMap();
/**
* The file size in bytes.
......@@ -144,7 +148,7 @@ public class PageStore implements CacheWriter {
this.database = database;
trace = database.getTrace(Trace.PAGE_STORE);
int test;
// trace.setLevel(TraceSystem.DEBUG);
trace.setLevel(TraceSystem.DEBUG);
this.cacheSize = cacheSizeDefault;
String cacheType = database.getCacheType();
if (Cache2Q.TYPE_NAME.equals(cacheType)) {
......@@ -190,15 +194,6 @@ public class PageStore implements CacheWriter {
pageCount = (int) (fileLength / pageSize);
initLogs();
lastUsedPage = pageCount - 1;
while (true) {
DataPage page = readPage(lastUsedPage);
page.readInt();
int type = page.readByte();
if (type != Page.TYPE_EMPTY) {
break;
}
lastUsedPage--;
}
} else {
isNew = true;
setPageSize(PAGE_SIZE_DEFAULT);
......@@ -210,14 +205,16 @@ public class PageStore implements CacheWriter {
for (int i = 0; i < LOG_COUNT; i++) {
logRootPageIds[i] = 3 + i;
}
lastUsedPage = pageCount;
lastUsedPage = 3 + LOG_COUNT;
int todoShouldBeOneMoreStartWith0;
pageCount = lastUsedPage;
increaseFileSize(INCREMENT_PAGES - pageCount);
writeHeader();
initLogs();
getLog().openForWriting(0);
switchLogIfPossible();
getLog().flush();
}
getLog().openForWriting();
} catch (SQLException e) {
close();
throw e;
......@@ -255,9 +252,14 @@ public class PageStore implements CacheWriter {
file.setLength(pageSize * pageCount);
}
private void switchLogIfPossible() {
int nextLogId = (activeLog + 1) % LOG_COUNT;
PageLog nextLog = logs[nextLogId];
private void switchLogIfPossible() throws SQLException {
trace.debug("switchLogIfPossible");
int id = getLog().getId();
getLog().close();
activeLog = (activeLog + 1) % LOG_COUNT;
int todoCanOnlyReuseAfterLoggedChangesAreWritten;
getLog().openForWriting(id + 1);
// Session[] sessions = database.getSessions(true);
// int firstUncommittedLog = getLog().getId();
......@@ -277,7 +279,7 @@ public class PageStore implements CacheWriter {
// if (nextLog.containsUncommitted())
activeLog = nextLogId;
// activeLog = nextLogId;
// getLog().reopen();
}
......@@ -415,7 +417,7 @@ public class PageStore implements CacheWriter {
record.setChanged(true);
int pos = record.getPos();
cache.update(pos, record);
if (logUndo) {
if (logUndo && !recoveryRunning) {
if (old == null) {
old = readPage(pos);
}
......@@ -604,9 +606,10 @@ public class PageStore implements CacheWriter {
}
/**
* Run the recovery process. There are two recovery stages: first only the
* undo steps are run (restoring the state before the last checkpoint). In
* the second stage the committed operations are re-applied.
* Run the recovery process. There are two recovery stages: first (undo is
* true) only the undo steps are run (restoring the state before the last
* checkpoint). In the second stage (undo is false) the committed operations
* are re-applied.
*
* @param undo true if the undo step should be run
*/
......@@ -614,10 +617,36 @@ public class PageStore implements CacheWriter {
trace.debug("log recover");
try {
recoveryRunning = true;
int todoBothMaybe;
getLog().recover(undo);
int maxId = 0;
for (int i = 0; i < LOG_COUNT; i++) {
int id = logs[i].openForReading();
if (id > maxId) {
maxId = id;
activeLog = i;
}
}
for (int i = 0; i < LOG_COUNT; i++) {
// start with the oldest log file
int j = (activeLog + 1 + i) % LOG_COUNT;
logs[j].recover(undo);
}
if (!undo) {
switchLogIfPossible();
int todoProbablyStillRequiredForTwoPhaseCommit;
sessionStates = new HashMap();
}
} finally {
recoveryRunning = false;
// re-calculate the last used page
while (true) {
DataPage page = readPage(lastUsedPage);
page.readInt();
int type = page.readByte();
if (type != Page.TYPE_EMPTY) {
break;
}
lastUsedPage--;
}
}
trace.debug("log recover done");
}
......@@ -645,4 +674,53 @@ public class PageStore implements CacheWriter {
getLog().commit(session);
}
/**
 * Look up the state object of a session. If the session is not yet known,
 * a new state object is created, tagged with the session id, registered,
 * and returned.
 *
 * @param sessionId the session id
 * @return the existing or newly registered session state object
 */
private SessionState getOrAddSessionState(int sessionId) {
    Integer boxedId = ObjectUtils.getInteger(sessionId);
    Object existing = sessionStates.get(boxedId);
    if (existing != null) {
        return (SessionState) existing;
    }
    SessionState created = new SessionState();
    created.sessionId = sessionId;
    sessionStates.put(boxedId, created);
    return created;
}
/**
 * Record where the most recent commit of a session was found. The session
 * state is created if it does not exist yet, and its in-doubt transaction
 * reference is cleared.
 *
 * @param sessionId the session id
 * @param logId the log file id
 * @param pos the position in the log file
 */
void setLastCommitForSession(int sessionId, int logId, int pos) {
    SessionState sessionState = getOrAddSessionState(sessionId);
    sessionState.inDoubtTransaction = null;
    sessionState.lastCommitPos = pos;
    sessionState.lastCommitLog = logId;
}
/**
 * Check whether a log entry of a session, at the given log id and
 * position, counts as committed. If no state is recorded for the session,
 * the entry is treated as committed; otherwise the decision is delegated
 * to the recorded session state.
 *
 * @param sessionId the session id
 * @param logId the log file id
 * @param pos the position in the log file
 * @return true if there is no state for this session, or if the session
 *         state reports the given position as committed
 */
boolean isSessionCommitted(int sessionId, int logId, int pos) {
    SessionState known = (SessionState) sessionStates.get(ObjectUtils.getInteger(sessionId));
    return known == null || known.isCommitted(logId, pos);
}
}
......@@ -72,7 +72,7 @@ public class CacheLRU implements Cache {
} else {
if (SysProperties.CHECK) {
if (old != rec) {
Message.throwInternalError("old != record old=" + old + " new=" + rec);
Message.throwInternalError("old!=record pos:" + pos + " old:" + old + " new:" + rec);
}
}
removeFromLinkedList(rec);
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论