提交 b3443d1e authored 作者: Thomas Mueller's avatar Thomas Mueller

New experimental page store.

上级 fa0ba065
......@@ -20,16 +20,18 @@ public class PageInputStream extends InputStream {
private PageStore store;
private final Trace trace;
private int trunkNext;
private int dataPage;
private PageStreamTrunk trunk;
private PageStreamData data;
private boolean endOfFile;
private int remaining;
private byte[] buffer = new byte[1];
public PageInputStream(PageStore store, int trunkPage) {
/**
 * Create an input stream that reads back a page stream from the store.
 *
 * @param store the page store to read from
 * @param trunkPage the page id of the first trunk page
 * @param dataPage the page id of the data page to start reading at, or
 *            -1 to start with the first data page listed in the trunk
 */
public PageInputStream(PageStore store, int trunkPage, int dataPage) {
this.store = store;
this.trace = store.getTrace();
// the trunk page itself is read lazily, on the first read() call
this.trunkNext = trunkPage;
this.dataPage = dataPage;
}
public int read() throws IOException {
......@@ -87,16 +89,19 @@ public class PageInputStream extends InputStream {
}
int next;
while (true) {
next = trunk.getNextPage();
if (next >= 0) {
break;
next = trunk.getNextDataPage();
if (dataPage == -1 || dataPage == next) {
if (next != 0) {
break;
}
trunk = new PageStreamTrunk(store, trunkNext);
trunk.read();
}
trunk = new PageStreamTrunk(store, trunkNext);
trunk.read();
}
if (trace.isDebugEnabled()) {
trace.debug("pageIn.readPage " + next);
}
dataPage = -1;
data = new PageStreamData(store, next, 0);
data.read();
remaining = data.getLength();
......
......@@ -6,31 +6,29 @@
*/
package org.h2.store;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.sql.SQLException;
import org.h2.engine.Session;
import org.h2.index.Page;
import org.h2.log.LogSystem;
import org.h2.message.Message;
import org.h2.message.Trace;
import org.h2.result.Row;
import org.h2.util.BitField;
import org.h2.util.IntIntHashMap;
import org.h2.value.Value;
/**
* Transaction log mechanism.
* The format is:
* <ul><li>0-3: log id
* </li><li>records
* </li></ul>
* The data format for a record is:
* <ul><li>0-0: type (0: undo,...)
* </li><li>1-4: page id
* </li><li>5-: data
* </li></ul>
* Transaction log mechanism. The stream contains a list of records. The data
* format for a record is:
* <ul>
* <li>0-0: type (0: undo,...)</li>
* <li>1-4: page id</li>
* <li>5-: data</li>
* </ul>
*/
public class PageLog {
......@@ -62,17 +60,18 @@ public class PageLog {
private int pos;
private Trace trace;
private PageOutputStream pageOut;
private DataOutputStream out;
private PageOutputStream pageOut;
private DataInputStream in;
private int firstPage;
private int firstTrunkPage;
private DataPage data;
private long operation;
private int logId, logPos;
private int firstLogId;
private BitField undo = new BitField();
private IntIntHashMap logIdPageMap = new IntIntHashMap();
PageLog(PageStore store, int firstPage) {
PageLog(PageStore store) {
this.store = store;
this.firstPage = firstPage;
data = store.createDataPage();
trace = store.getTrace();
}
......@@ -80,20 +79,25 @@ public class PageLog {
/**
* Open the log for writing. For an existing database, the recovery
* must be run first.
*
* @param firstTrunkPage the first trunk page
*/
void openForWriting() {
trace.debug("log openForWriting firstPage:" + firstPage);
pageOut = new PageOutputStream(store, firstPage);
out = new DataOutputStream(pageOut);
void openForWriting(int firstTrunkPage) {
// NOTE(review): debug label says "firstPage" but the value passed is
// the first trunk page id - consider renaming the label for clarity
trace.debug("log openForWriting firstPage:" + firstTrunkPage);
// all log records are appended to a stream of linked trunk/data pages
pageOut = new PageOutputStream(store, firstTrunkPage);
}
/**
* Open the log for reading.
*
* @param firstTrunkPage the first trunk page
* @param firstDataPage the index of the first data page
*/
void openForReading() {
in = new DataInputStream(new PageInputStream(store, firstPage));
void openForReading(int firstTrunkPage, int firstDataPage) {
this.firstTrunkPage = firstTrunkPage;
in = new DataInputStream(new PageInputStream(store, firstTrunkPage, firstDataPage));
if (trace.isDebugEnabled()) {
trace.debug("log openForReading firstPage:" + firstPage);
trace.debug("log openForReading firstPage:" + firstTrunkPage);
}
}
......@@ -205,7 +209,7 @@ public class PageLog {
trace.debug("log undo " + pageId);
}
undo.set(pageId);
pageOut.prepareWriting(store.getPageSize() * 3);
pageOut.reserve(store.getPageSize() * 3);
out.write(UNDO);
out.writeInt(pageId);
out.write(page.getBytes(), 0, store.getPageSize());
......@@ -229,7 +233,7 @@ public class PageLog {
// database already closed
return;
}
pageOut.prepareWriting(store.getPageSize());
pageOut.reserve(store.getPageSize());
out.write(COMMIT);
out.writeInt(session.getId());
if (log.getFlushOnEachCommit()) {
......@@ -254,15 +258,14 @@ public class PageLog {
trace.debug("log " + (add?"+":"-") + " s:" + session.getId() + " table:" + tableId +
" row:" + row);
}
int todoLogPosShouldBeLong;
session.addLogPos(0, (int) operation);
row.setLastLog(0, (int) operation);
session.addLogPos(logId, logPos);
row.setLastLog(logId, logPos);
data.reset();
int todoWriteIntoOutputDirectly;
row.write(data);
pageOut.prepareWriting(data.length() + store.getPageSize());
pageOut.reserve(data.length() + store.getPageSize());
out.write(add ? ADD : REMOVE);
out.writeInt(session.getId());
out.writeInt(tableId);
......@@ -275,58 +278,75 @@ public class PageLog {
}
/**
* Close the log.
* Flush the transaction log.
*/
void close() throws SQLException {
void flush() throws SQLException {
try {
trace.debug("log close");
if (out != null) {
out.close();
}
out = null;
out.flush();
} catch (IOException e) {
throw Message.convertIOException(e, null);
}
}
/**
* Close the log, truncate it, and re-open it.
* Switch to a new log id.
*
* @param id the new log id
* @throws SQLException
*/
private void reopen() throws SQLException {
try {
trace.debug("log reopen");
out.close();
openForWriting();
flush();
int todoDeleteOrReUsePages;
} catch (IOException e) {
throw Message.convertIOException(e, null);
/**
 * Switch to a new log id: remember which data page the current log id
 * ends on (this mapping is later used by removeUntil to free obsolete
 * pages), then increment the log id.
 */
void checkpoint() {
// map the current log id to the data page the stream is writing to
int currentDataPage = pageOut.getCurrentDataPageId();
logIdPageMap.put(logId, currentDataPage);
logId++;
}
/**
 * Get the current log section id.
 *
 * @return the log id
 */
int getLogId() {
return logId;
}
/**
 * Get the position within the current log section.
 *
 * @return the log position
 */
int getLogPos() {
return logPos;
}
/**
 * Remove all pages until the given log (excluding): walk the trunk page
 * chain from the start of the log stream, freeing trunk pages and their
 * data pages, until the trunk page containing the first data page to
 * keep is found, then persist the new log start in the file header.
 *
 * @param firstUncommittedLog the first log id to keep
 */
void removeUntil(int firstUncommittedLog) throws SQLException {
// nothing to remove if the oldest uncommitted log is the current one
if (firstUncommittedLog == logId) {
return;
}
// the data page recorded by checkpoint() for this log id
// NOTE(review): behavior when the id is missing from the map is not
// visible here - confirm IntIntHashMap.get returns a sentinel value
int firstDataPageToKeep = logIdPageMap.get(firstUncommittedLog);
while (true) {
// TODO keep trunk page in the cache
PageStreamTrunk t = new PageStreamTrunk(store, firstTrunkPage);
t.read();
if (t.contains(firstDataPageToKeep)) {
store.setLogFirstPage(t.getPos(), firstDataPageToKeep);
break;
}
t.free();
// NOTE(review): firstTrunkPage is never advanced in this loop, so
// the next iteration re-reads the same (already freed) trunk page -
// this looks like an infinite loop; the loop presumably should move
// to the next trunk page in the chain. Confirm and fix.
}
// drop the now-obsolete log id -> data page mappings
while (firstLogId < firstUncommittedLog) {
logIdPageMap.remove(firstLogId);
firstLogId++;
}
}
/**
* Flush the transaction log.
* Close the log.
*/
void flush() throws SQLException {
void close() throws SQLException {
try {
out.flush();
trace.debug("log close");
if (out != null) {
out.close();
}
out = null;
} catch (IOException e) {
throw Message.convertIOException(e, null);
}
}
/**
* Flush and close the log.
*/
// public void close() throws SQLException {
// try {
// trace.debug("log close");
// out.close();
// } catch (IOException e) {
// throw Message.convertIOException(e, null);
// }
// }
}
......@@ -20,6 +20,7 @@ public class PageOutputStream extends OutputStream {
private PageStore store;
private final Trace trace;
private int firstTrunkPageId;
private int trunkPageId;
private int trunkNext;
private IntArray reservedPages = new IntArray();
......@@ -41,6 +42,7 @@ public class PageOutputStream extends OutputStream {
this.trace = store.getTrace();
this.store = store;
this.trunkPageId = trunkPage;
firstTrunkPageId = trunkPage;
}
/**
......@@ -49,7 +51,7 @@ public class PageOutputStream extends OutputStream {
*
* @param minBuffer the number of bytes to allocate
*/
void prepareWriting(int minBuffer) throws SQLException {
void reserve(int minBuffer) throws SQLException {
if (reserved < minBuffer) {
int pageSize = store.getPageSize();
int capacityPerPage = PageStreamData.getCapacity(pageSize);
......@@ -71,6 +73,9 @@ public class PageOutputStream extends OutputStream {
int page = store.allocatePage();
reservedPages.add(page);
}
if (data == null) {
initNextData();
}
}
}
......@@ -84,8 +89,8 @@ public class PageOutputStream extends OutputStream {
}
private void initNextData() {
int nextData = trunk == null ? -1 : trunk.getNextPage();
if (nextData < 0) {
int nextData = trunk == null ? 0 : trunk.getNextDataPage();
if (nextData == -1) {
int parent = trunkPageId;
if (trunkNext == 0) {
trunkPageId = reservedPages.get(0);
......@@ -102,7 +107,7 @@ public class PageOutputStream extends OutputStream {
trunk = new PageStreamTrunk(store, parent, trunkPageId, trunkNext, pageIds);
reservedPages.removeRange(0, len + 1);
}
data = new PageStreamData(store, trunk.getNextPage(), trunk.getPos());
data = new PageStreamData(store, trunk.getNextDataPage(), trunk.getPos());
data.initWrite();
}
......@@ -115,6 +120,7 @@ public class PageOutputStream extends OutputStream {
}
writing = true;
try {
reserve(len);
while (len >= 0) {
int l = data.write(b, off, len);
if (l <= len) {
......@@ -157,4 +163,8 @@ public class PageOutputStream extends OutputStream {
store = null;
}
/**
 * Get the page id of the data page that is currently being written to.
 *
 * @return the current data page id
 */
public int getCurrentDataPageId() {
// NOTE(review): data is null until the first reserve()/write() call,
// so calling this on a fresh stream throws a NullPointerException -
// confirm callers only invoke it after writing has started
return data.getPos();
}
}
......@@ -19,6 +19,7 @@ import org.h2.index.Index;
import org.h2.index.IndexType;
import org.h2.index.PageBtreeIndex;
import org.h2.index.PageScanIndex;
import org.h2.log.LogSystem;
import org.h2.log.SessionState;
import org.h2.message.Message;
import org.h2.message.Trace;
......@@ -56,8 +57,9 @@ import org.h2.value.ValueString;
* The format of page 1 and 2 is:
* <ul>
* <li>0-7: write counter (incremented each time the header changes)</li>
* <li>8-11: log head page (initially 4)</li>
* <li>12-19: checksum of bytes 0-16 (CRC32)</li>
* <li>8-11: log trunk page (initially 4)</li>
* <li>12-15: log data page (initially 5)</li>
* <li>16-23: checksum of bytes 0-15 (CRC32)</li>
* </ul>
* Page 2 contains the first free list page.
* Page 3 contains the meta table root page.
......@@ -65,12 +67,14 @@ import org.h2.value.ValueString;
*/
public class PageStore implements CacheWriter {
// TODO currently working on PageLog.removeUntil
// TODO unlimited number of log streams (TestPageStoreDb)
// TODO check if PageLog.reservePages is required - yes it is - change it
// TODO PageStore.openMetaIndex (desc and nulls first / last)
// TODO btree index with fixed size values doesn't need offset and so on
// TODO better checksums (for example, multiple fletcher)
// TODO replace CRC32
// TODO log block allocation
// TODO block compression: maybe http://en.wikipedia.org/wiki/LZJB
// with RLE, specially for 0s.
......@@ -137,7 +141,7 @@ public class PageStore implements CacheWriter {
private int pageSize;
private int pageSizeShift;
private long writeCounter;
private int logFirstTrunkPage;
private int logFirstTrunkPage, logFirstDataPage;
private int cacheSize;
private Cache cache;
......@@ -221,7 +225,7 @@ public class PageStore implements CacheWriter {
freeListPagesPerList = PageFreeList.getPagesAddressed(pageSize);
fileLength = file.length();
pageCount = (int) (fileLength / pageSize);
initLog();
log = new PageLog(this);
recover(true);
recover(false);
checkpoint();
......@@ -234,12 +238,9 @@ public class PageStore implements CacheWriter {
pageCount = 5;
increaseFileSize(INCREMENT_PAGES - pageCount);
writeStaticHeader();
writeVariableHeader();
initLog();
log = new PageLog(this);
log.openForWriting(logFirstTrunkPage);
openMetaIndex();
log.openForWriting();
switchLog();
log.flush();
systemTableHeadPos = Index.EMPTY_HEAD;
}
// lastUsedPage = getFreeList().getLastUsed() + 1;
......@@ -265,18 +266,12 @@ public class PageStore implements CacheWriter {
for (CacheObject rec : list) {
writeBack(rec);
}
int todoFlushBeforeReopen;
switchLog();
int todoWriteDeletedPages;
// TODO shrink file if required here
// int pageCount = getFreeList().getLastUsed() + 1;
// trace.debug("pageCount:" + pageCount);
// file.setLength(pageSize * pageCount);
}
// TODO shrink file if required here
// int pageCount = getFreeList().getLastUsed() + 1;
// trace.debug("pageCount:" + pageCount);
// file.setLength(pageSize * pageCount);
}
private void initLog() {
log = new PageLog(this, logFirstTrunkPage);
}
private void switchLog() throws SQLException {
......@@ -284,30 +279,18 @@ public class PageStore implements CacheWriter {
if (database.isReadOnly()) {
return;
}
log.close();
int todoCanOnlyReuseAfterLoggedChangesAreWritten;
log.openForWriting();
// Session[] sessions = database.getSessions(true);
// int firstUncommittedLog = getLog().getId();
// int firstUncommittedPos = getLog().getPos();
// for (int i = 0; i < sessions.length; i++) {
// Session session = sessions[i];
// int log = session.getFirstUncommittedLog();
// int pos = session.getFirstUncommittedPos();
// if (pos != LOG_WRITTEN) {
// if (log < firstUncommittedLog ||
// (log == firstUncommittedLog && pos < firstUncommittedPos)) {
// firstUncommittedLog = log;
// firstUncommittedPos = pos;
// }
// }
// }
// if (nextLog.containsUncommitted())
// activeLog = nextLogId;
// getLog().reopen();
Session[] sessions = database.getSessions(true);
int firstUncommittedLog = log.getLogId();
for (int i = 0; i < sessions.length; i++) {
Session session = sessions[i];
int log = session.getFirstUncommittedLog();
if (log != LogSystem.LOG_WRITTEN) {
if (log < firstUncommittedLog) {
firstUncommittedLog = log;
}
}
}
log.removeUntil(firstUncommittedLog);
}
private void readHeader() throws SQLException {
......@@ -334,7 +317,6 @@ public class PageStore implements CacheWriter {
accessMode = "r";
file = database.openFile(fileName, accessMode, true);
}
CRC32 crc = new CRC32();
for (int i = 1;; i++) {
if (i == 3) {
throw Message.getSQLException(ErrorCode.FILE_CORRUPTED_1, fileName);
......@@ -343,13 +325,14 @@ public class PageStore implements CacheWriter {
readPage(i, page);
writeCounter = page.readLong();
logFirstTrunkPage = page.readInt();
crc.update(page.getBytes(), 0, 12);
logFirstDataPage = page.readInt();
CRC32 crc = new CRC32();
crc.update(page.getBytes(), 0, page.length());
long expected = crc.getValue();
long got = page.readLong();
if (expected == got) {
break;
}
crc.reset();
}
}
......@@ -389,6 +372,18 @@ public class PageStore implements CacheWriter {
file.write(page.getBytes(), 0, pageSize - FileStore.HEADER_LENGTH);
}
/**
 * Set the trunk page and data page id of the log, and persist them in
 * the variable file header so the log start survives a restart.
 *
 * @param trunkPageId the trunk page id
 * @param dataPageId the data page id
 * @throws SQLException if writing the header fails
 */
void setLogFirstPage(int trunkPageId, int dataPageId) throws SQLException {
this.logFirstTrunkPage = trunkPageId;
this.logFirstDataPage = dataPageId;
// write the updated pointers to the file header immediately
writeVariableHeader();
}
private void writeVariableHeader() throws SQLException {
DataPage page = DataPage.create(database, new byte[pageSize]);
page.writeLong(writeCounter);
......@@ -671,25 +666,7 @@ public class PageStore implements CacheWriter {
}
try {
recoveryRunning = true;
int maxId = 0;
// for (int i = 0; i < LOG_COUNT; i++) {
// int id = logs[i].openForReading();
// if (id > maxId) {
// maxId = id;
// activeLog = i;
// }
// }
// for (int i = 0; i < LOG_COUNT; i++) {
// int j;
// if (undo) {
// // undo: start with the newest file and go backward
// j = Math.abs(activeLog - i) % LOG_COUNT;
// } else {
// // redo: start with the oldest log file
// j = (activeLog + 1 + i) % LOG_COUNT;
// }
// logs[j].recover(undo);
// }
log.recover(undo);
if (!undo) {
switchLog();
int todoProbablyStillRequiredForTwoPhaseCommit;
......
......@@ -67,11 +67,11 @@ public class PageStreamTrunk extends Record {
}
}
void setNextPage(int page) {
void setNextDataPage(int page) {
pageIds[index++] = page;
}
int getNextPage() {
int getNextDataPage() {
if (index >= pageIds.length) {
return -1;
}
......@@ -108,4 +108,29 @@ public class PageStreamTrunk extends Record {
return (pageSize - DATA_START) / 4;
}
/**
 * Check whether this trunk page references the given data page.
 *
 * @param dataPageId the data page id to search for
 * @return true if the page id is listed in this trunk page
 */
boolean contains(int dataPageId) {
int idx = 0;
while (idx < pageCount) {
if (pageIds[idx] == dataPageId) {
return true;
}
idx++;
}
return false;
}
/**
 * Free this trunk page and all data pages it references.
 */
void free() throws SQLException {
// free the trunk page itself first, then every referenced data page
store.freePage(getPos(), false, null);
for (int i = 0; i < pageCount; i++) {
// NOTE(review): assumes all pageCount entries hold valid page ids;
// confirm a partially filled trunk page cannot reach this point
store.freePage(pageIds[i], false, null);
}
}
}
......@@ -17,7 +17,6 @@ import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.Iterator;
import java.util.Random;
import java.util.Set;
import java.util.TreeSet;
......@@ -27,7 +26,6 @@ import org.h2.engine.Database;
import org.h2.index.Cursor;
import org.h2.index.Index;
import org.h2.index.IndexType;
import org.h2.index.Page;
import org.h2.index.PageScanIndex;
import org.h2.result.Row;
import org.h2.schema.Schema;
......@@ -322,7 +320,7 @@ public class TestPageStore extends TestBase {
if (file) {
in = new BufferedInputStream(new FileInputStream(f), 4 * 1024);
} else {
in = new PageInputStream(store, 0);
in = new PageInputStream(store, 0, 0);
}
while (true) {
int len = in.read(buff);
......@@ -360,7 +358,7 @@ public class TestPageStore extends TestBase {
p += l;
}
out.close();
PageInputStream in = new PageInputStream(store, 0);
PageInputStream in = new PageInputStream(store, 0, 0);
byte[] data2 = new byte[len];
for (int off = 0;;) {
int l = random.nextInt(1 + len / 10) + 1;
......
Markdown 格式
0%
您添加了 0 人到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 登录 后发表评论