提交 ccbbfb3d authored 作者: Thomas Mueller's avatar Thomas Mueller

New experimental page store.

上级 dc563c4c
...@@ -6,10 +6,10 @@ ...@@ -6,10 +6,10 @@
*/ */
package org.h2.store; package org.h2.store;
import java.io.EOFException;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
import java.sql.SQLException; import java.sql.SQLException;
import org.h2.message.Message;
import org.h2.message.Trace; import org.h2.message.Trace;
/** /**
...@@ -71,7 +71,7 @@ public class PageInputStream extends InputStream { ...@@ -71,7 +71,7 @@ public class PageInputStream extends InputStream {
remaining -= l; remaining -= l;
return l; return l;
} catch (SQLException e) { } catch (SQLException e) {
throw Message.convertToIOException(e); throw new EOFException();
} }
} }
......
...@@ -61,9 +61,11 @@ public class PageLog { ...@@ -61,9 +61,11 @@ public class PageLog {
private Trace trace; private Trace trace;
private DataOutputStream out; private DataOutputStream out;
private ByteArrayOutputStream buffer;
private PageOutputStream pageOut; private PageOutputStream pageOut;
private DataInputStream in; private DataInputStream in;
private int firstTrunkPage; private int firstTrunkPage;
private int firstDataPage;
private DataPage data; private DataPage data;
private int logId, logPos; private int logId, logPos;
private int firstLogId; private int firstLogId;
...@@ -82,9 +84,13 @@ public class PageLog { ...@@ -82,9 +84,13 @@ public class PageLog {
* *
* @param firstTrunkPage the first trunk page * @param firstTrunkPage the first trunk page
*/ */
void openForWriting(int firstTrunkPage) { void openForWriting(int firstTrunkPage) throws SQLException {
trace.debug("log openForWriting firstPage:" + firstTrunkPage); trace.debug("log openForWriting firstPage:" + firstTrunkPage);
pageOut = new PageOutputStream(store, firstTrunkPage); pageOut = new PageOutputStream(store, firstTrunkPage);
pageOut.reserve(1);
store.setLogFirstPage(firstTrunkPage, pageOut.getCurrentDataPageId());
buffer = new ByteArrayOutputStream();
out = new DataOutputStream(buffer);
} }
/** /**
...@@ -95,10 +101,7 @@ public class PageLog { ...@@ -95,10 +101,7 @@ public class PageLog {
*/ */
void openForReading(int firstTrunkPage, int firstDataPage) { void openForReading(int firstTrunkPage, int firstDataPage) {
this.firstTrunkPage = firstTrunkPage; this.firstTrunkPage = firstTrunkPage;
in = new DataInputStream(new PageInputStream(store, firstTrunkPage, firstDataPage)); this.firstDataPage = firstDataPage;
if (trace.isDebugEnabled()) {
trace.debug("log openForReading firstPage:" + firstTrunkPage);
}
} }
/** /**
...@@ -112,6 +115,7 @@ public class PageLog { ...@@ -112,6 +115,7 @@ public class PageLog {
if (trace.isDebugEnabled()) { if (trace.isDebugEnabled()) {
trace.debug("log recover undo:" + undo); trace.debug("log recover undo:" + undo);
} }
in = new DataInputStream(new PageInputStream(store, firstTrunkPage, firstDataPage));
int logId = 0; int logId = 0;
DataPage data = store.createDataPage(); DataPage data = store.createDataPage();
try { try {
...@@ -209,15 +213,21 @@ public class PageLog { ...@@ -209,15 +213,21 @@ public class PageLog {
trace.debug("log undo " + pageId); trace.debug("log undo " + pageId);
} }
undo.set(pageId); undo.set(pageId);
pageOut.reserve(store.getPageSize() * 3);
out.write(UNDO); out.write(UNDO);
out.writeInt(pageId); out.writeInt(pageId);
out.write(page.getBytes(), 0, store.getPageSize()); out.write(page.getBytes(), 0, store.getPageSize());
flushOut();
} catch (IOException e) { } catch (IOException e) {
throw Message.convertIOException(e, null); throw Message.convertIOException(e, null);
} }
} }
private void flushOut() throws IOException {
out.flush();
pageOut.write(buffer.toByteArray());
buffer.reset();
}
/** /**
* Mark a committed transaction. * Mark a committed transaction.
* *
...@@ -233,9 +243,9 @@ public class PageLog { ...@@ -233,9 +243,9 @@ public class PageLog {
// database already closed // database already closed
return; return;
} }
pageOut.reserve(store.getPageSize());
out.write(COMMIT); out.write(COMMIT);
out.writeInt(session.getId()); out.writeInt(session.getId());
flushOut();
if (log.getFlushOnEachCommit()) { if (log.getFlushOnEachCommit()) {
flush(); flush();
} }
...@@ -264,14 +274,13 @@ public class PageLog { ...@@ -264,14 +274,13 @@ public class PageLog {
data.reset(); data.reset();
int todoWriteIntoOutputDirectly; int todoWriteIntoOutputDirectly;
row.write(data); row.write(data);
pageOut.reserve(data.length() + store.getPageSize());
out.write(add ? ADD : REMOVE); out.write(add ? ADD : REMOVE);
out.writeInt(session.getId()); out.writeInt(session.getId());
out.writeInt(tableId); out.writeInt(tableId);
out.writeInt(row.getPos()); out.writeInt(row.getPos());
out.writeInt(data.length()); out.writeInt(data.length());
out.write(data.getBytes(), 0, data.length()); out.write(data.getBytes(), 0, data.length());
flushOut();
} catch (IOException e) { } catch (IOException e) {
throw Message.convertIOException(e, null); throw Message.convertIOException(e, null);
} }
...@@ -282,7 +291,7 @@ public class PageLog { ...@@ -282,7 +291,7 @@ public class PageLog {
*/ */
void flush() throws SQLException { void flush() throws SQLException {
try { try {
out.flush(); pageOut.flush();
} catch (IOException e) { } catch (IOException e) {
throw Message.convertIOException(e, null); throw Message.convertIOException(e, null);
} }
......
...@@ -88,8 +88,8 @@ public class PageOutputStream extends OutputStream { ...@@ -88,8 +88,8 @@ public class PageOutputStream extends OutputStream {
write(b, 0, b.length); write(b, 0, b.length);
} }
private void initNextData() { private void initNextData() throws SQLException {
int nextData = trunk == null ? 0 : trunk.getNextDataPage(); int nextData = trunk == null ? -1 : trunk.getNextDataPage();
if (nextData == -1) { if (nextData == -1) {
int parent = trunkPageId; int parent = trunkPageId;
if (trunkNext == 0) { if (trunkNext == 0) {
...@@ -105,9 +105,11 @@ public class PageOutputStream extends OutputStream { ...@@ -105,9 +105,11 @@ public class PageOutputStream extends OutputStream {
} }
trunkNext = reservedPages.get(len); trunkNext = reservedPages.get(len);
trunk = new PageStreamTrunk(store, parent, trunkPageId, trunkNext, pageIds); trunk = new PageStreamTrunk(store, parent, trunkPageId, trunkNext, pageIds);
reservedPages.removeRange(0, len + 1); trunk.write(null);
reservedPages.removeRange(0, len);
nextData = trunk.getNextDataPage();
} }
data = new PageStreamData(store, trunk.getNextDataPage(), trunk.getPos()); data = new PageStreamData(store, nextData, trunk.getPos());
data.initWrite(); data.initWrite();
} }
...@@ -118,12 +120,12 @@ public class PageOutputStream extends OutputStream { ...@@ -118,12 +120,12 @@ public class PageOutputStream extends OutputStream {
if (writing) { if (writing) {
Message.throwInternalError("writing while still writing"); Message.throwInternalError("writing while still writing");
} }
writing = true;
try { try {
reserve(len); reserve(len);
while (len >= 0) { writing = true;
while (len > 0) {
int l = data.write(b, off, len); int l = data.write(b, off, len);
if (l <= len) { if (l < len) {
data.write(null); data.write(null);
initNextData(); initNextData();
} }
......
...@@ -61,9 +61,9 @@ import org.h2.value.ValueString; ...@@ -61,9 +61,9 @@ import org.h2.value.ValueString;
* <li>12-15: log data page (initially 5)</li> * <li>12-15: log data page (initially 5)</li>
* <li>16-23: checksum of bytes 0-15 (CRC32)</li> * <li>16-23: checksum of bytes 0-15 (CRC32)</li>
* </ul> * </ul>
* Page 2 contains the first free list page. * Page 3 contains the first free list page.
* Page 3 contains the meta table root page. * Page 4 contains the meta table root page.
* For a new database, page 4 contains the first log trunk page. * For a new database, page 5 contains the first log trunk page.
*/ */
public class PageStore implements CacheWriter { public class PageStore implements CacheWriter {
...@@ -123,6 +123,7 @@ public class PageStore implements CacheWriter { ...@@ -123,6 +123,7 @@ public class PageStore implements CacheWriter {
private static final int PAGE_ID_FREE_LIST_ROOT = 2; private static final int PAGE_ID_FREE_LIST_ROOT = 2;
private static final int PAGE_ID_META_ROOT = 3; private static final int PAGE_ID_META_ROOT = 3;
private static final int PAGE_ID_LOG_TRUNK = 5;
private static final int INCREMENT_PAGES = 128; private static final int INCREMENT_PAGES = 128;
...@@ -221,27 +222,38 @@ public class PageStore implements CacheWriter { ...@@ -221,27 +222,38 @@ public class PageStore implements CacheWriter {
if (FileUtils.exists(fileName)) { if (FileUtils.exists(fileName)) {
// existing // existing
file = database.openFile(fileName, accessMode, true); file = database.openFile(fileName, accessMode, true);
readHeader(); readStaticHeader();
freeListPagesPerList = PageFreeList.getPagesAddressed(pageSize); freeListPagesPerList = PageFreeList.getPagesAddressed(pageSize);
fileLength = file.length(); fileLength = file.length();
pageCount = (int) (fileLength / pageSize); pageCount = (int) (fileLength / pageSize);
if (pageCount < 6) {
// not enough pages - must be a new database
// that didn't get created correctly
throw Message.getSQLException(ErrorCode.FILE_CORRUPTED_1, fileName);
}
readVariableHeader();
log = new PageLog(this); log = new PageLog(this);
log.openForReading(logFirstTrunkPage, logFirstDataPage);
recover(true); recover(true);
recover(false); recover(false);
recoveryRunning = true;
log.openForWriting(logFirstTrunkPage);
recoveryRunning = false;
checkpoint(); checkpoint();
} else { } else {
// new // new
setPageSize(PAGE_SIZE_DEFAULT); setPageSize(PAGE_SIZE_DEFAULT);
freeListPagesPerList = PageFreeList.getPagesAddressed(pageSize); freeListPagesPerList = PageFreeList.getPagesAddressed(pageSize);
file = database.openFile(fileName, accessMode, false); file = database.openFile(fileName, accessMode, false);
logFirstTrunkPage = 4; recoveryRunning = true;
pageCount = 5; increaseFileSize(INCREMENT_PAGES);
increaseFileSize(INCREMENT_PAGES - pageCount);
writeStaticHeader(); writeStaticHeader();
log = new PageLog(this); log = new PageLog(this);
log.openForWriting(logFirstTrunkPage);
openMetaIndex(); openMetaIndex();
logFirstTrunkPage = allocatePage();
log.openForWriting(logFirstTrunkPage);
systemTableHeadPos = Index.EMPTY_HEAD; systemTableHeadPos = Index.EMPTY_HEAD;
recoveryRunning = false;
} }
// lastUsedPage = getFreeList().getLastUsed() + 1; // lastUsedPage = getFreeList().getLastUsed() + 1;
} catch (SQLException e) { } catch (SQLException e) {
...@@ -293,7 +305,7 @@ public class PageStore implements CacheWriter { ...@@ -293,7 +305,7 @@ public class PageStore implements CacheWriter {
log.removeUntil(firstUncommittedLog); log.removeUntil(firstUncommittedLog);
} }
private void readHeader() throws SQLException { private void readStaticHeader() throws SQLException {
long length = file.length(); long length = file.length();
if (length < PAGE_SIZE_MIN * 2) { if (length < PAGE_SIZE_MIN * 2) {
throw Message.getSQLException(ErrorCode.FILE_CORRUPTED_1, fileName); throw Message.getSQLException(ErrorCode.FILE_CORRUPTED_1, fileName);
...@@ -317,6 +329,10 @@ public class PageStore implements CacheWriter { ...@@ -317,6 +329,10 @@ public class PageStore implements CacheWriter {
accessMode = "r"; accessMode = "r";
file = database.openFile(fileName, accessMode, true); file = database.openFile(fileName, accessMode, true);
} }
}
private void readVariableHeader() throws SQLException {
DataPage page = DataPage.create(database, pageSize);
for (int i = 1;; i++) { for (int i = 1;; i++) {
if (i == 3) { if (i == 3) {
throw Message.getSQLException(ErrorCode.FILE_CORRUPTED_1, fileName); throw Message.getSQLException(ErrorCode.FILE_CORRUPTED_1, fileName);
...@@ -385,16 +401,17 @@ public class PageStore implements CacheWriter { ...@@ -385,16 +401,17 @@ public class PageStore implements CacheWriter {
} }
private void writeVariableHeader() throws SQLException { private void writeVariableHeader() throws SQLException {
DataPage page = DataPage.create(database, new byte[pageSize]); DataPage page = DataPage.create(database, pageSize);
page.writeLong(writeCounter); page.writeLong(writeCounter);
page.writeInt(logFirstTrunkPage); page.writeInt(logFirstTrunkPage);
page.writeInt(logFirstDataPage);
CRC32 crc = new CRC32(); CRC32 crc = new CRC32();
crc.update(page.getBytes(), 0, 12); crc.update(page.getBytes(), 0, page.length());
page.writeLong(crc.getValue()); page.writeLong(crc.getValue());
file.seek(pageSize); file.seek(pageSize);
file.write(page.getBytes(), 0, page.length()); file.write(page.getBytes(), 0, pageSize);
file.seek(pageSize + pageSize); file.seek(pageSize + pageSize);
file.write(page.getBytes(), 0, page.length()); file.write(page.getBytes(), 0, pageSize);
} }
/** /**
...@@ -413,8 +430,10 @@ public class PageStore implements CacheWriter { ...@@ -413,8 +430,10 @@ public class PageStore implements CacheWriter {
} }
public void flushLog() throws SQLException { public void flushLog() throws SQLException {
if (file != null) {
log.flush(); log.flush();
} }
}
public Trace getTrace() { public Trace getTrace() {
return trace; return trace;
...@@ -478,10 +497,15 @@ public class PageStore implements CacheWriter { ...@@ -478,10 +497,15 @@ public class PageStore implements CacheWriter {
} else { } else {
p = i * freeListPagesPerList; p = i * freeListPagesPerList;
} }
while (p >= pageCount) {
increaseFileSize(INCREMENT_PAGES);
}
PageFreeList list = (PageFreeList) getRecord(p); PageFreeList list = (PageFreeList) getRecord(p);
if (list == null) { if (list == null) {
list = new PageFreeList(this, p); list = new PageFreeList(this, p);
if (p < pageCount) {
list.read(); list.read();
}
cache.put(list); cache.put(list);
} }
return list; return list;
...@@ -515,9 +539,6 @@ public class PageStore implements CacheWriter { ...@@ -515,9 +539,6 @@ public class PageStore implements CacheWriter {
} else { } else {
// TODO could remember the first possible free list page // TODO could remember the first possible free list page
for (int i = 0;; i++) { for (int i = 0;; i++) {
if (i * freeListPagesPerList > pageCount) {
increaseFileSize(INCREMENT_PAGES);
}
PageFreeList list = getFreeList(i); PageFreeList list = getFreeList(i);
if (!list.isFull()) { if (!list.isFull()) {
pos = list.allocate(); pos = list.allocate();
...@@ -660,12 +681,13 @@ public class PageStore implements CacheWriter { ...@@ -660,12 +681,13 @@ public class PageStore implements CacheWriter {
*/ */
private void recover(boolean undo) throws SQLException { private void recover(boolean undo) throws SQLException {
trace.debug("log recover #" + undo); trace.debug("log recover #" + undo);
try {
recoveryRunning = true;
if (!undo) { if (!undo) {
openMetaIndex(); openMetaIndex();
readMetaData(); readMetaData();
} }
try {
recoveryRunning = true;
log.recover(undo); log.recover(undo);
if (!undo) { if (!undo) {
switchLog(); switchLog();
......
...@@ -43,6 +43,7 @@ public class PageStreamData extends Record { ...@@ -43,6 +43,7 @@ public class PageStreamData extends Record {
void read() throws SQLException { void read() throws SQLException {
data = store.createDataPage(); data = store.createDataPage();
store.readPage(getPos(), data); store.readPage(getPos(), data);
data.setPos(4);
int t = data.readByte(); int t = data.readByte();
if (t != Page.TYPE_STREAM_DATA) { if (t != Page.TYPE_STREAM_DATA) {
throw Message.getSQLException(ErrorCode.FILE_CORRUPTED_1, "pos:" + getPos() + " type:" + t + throw Message.getSQLException(ErrorCode.FILE_CORRUPTED_1, "pos:" + getPos() + " type:" + t +
......
...@@ -62,6 +62,7 @@ public class PageStreamTrunk extends Record { ...@@ -62,6 +62,7 @@ public class PageStreamTrunk extends Record {
} }
nextTrunk = data.readInt(); nextTrunk = data.readInt();
pageCount = data.readInt(); pageCount = data.readInt();
pageIds = new int[pageCount];
for (int i = 0; i < pageCount; i++) { for (int i = 0; i < pageCount; i++) {
pageIds[i] = data.readInt(); pageIds[i] = data.readInt();
} }
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论