提交 cd42fd90 authored 作者: Thomas Mueller's avatar Thomas Mueller

Page store: use a log key, so that pages don't need to be set to zero on checkpoint.

上级 00df2d41
...@@ -69,6 +69,11 @@ public abstract class Page extends Record { ...@@ -69,6 +69,11 @@ public abstract class Page extends Record {
*/ */
public static final int TYPE_STREAM_DATA = 8; public static final int TYPE_STREAM_DATA = 8;
/**
* A header page.
*/
public static final int TYPE_HEADER = 9;
/** /**
* Copy the data to a new location, change the parent to point to the new * Copy the data to a new location, change the parent to point to the new
* location, and free up the current page. * location, and free up the current page.
......
...@@ -27,10 +27,13 @@ public class PageInputStream extends InputStream { ...@@ -27,10 +27,13 @@ public class PageInputStream extends InputStream {
private boolean endOfFile; private boolean endOfFile;
private int remaining; private int remaining;
private byte[] buffer = new byte[1]; private byte[] buffer = new byte[1];
private int logKey;
PageInputStream(PageStore store, int trunkPage, int dataPage) { PageInputStream(PageStore store, int logKey, int trunkPage, int dataPage) {
this.store = store; this.store = store;
this.trace = store.getTrace(); this.trace = store.getTrace();
// minus one, because we increment before reading the trunk page
this.logKey = logKey - 1;
this.trunkNext = trunkPage; this.trunkNext = trunkPage;
this.dataPage = dataPage; this.dataPage = dataPage;
} }
...@@ -88,8 +91,11 @@ public class PageInputStream extends InputStream { ...@@ -88,8 +91,11 @@ public class PageInputStream extends InputStream {
while (true) { while (true) {
if (trunk == null) { if (trunk == null) {
trunk = (PageStreamTrunk) store.getPage(trunkNext); trunk = (PageStreamTrunk) store.getPage(trunkNext);
logKey++;
if (trunk == null) { if (trunk == null) {
throw new EOFException(); throw new EOFException();
} else if (trunk.getLogKey() != logKey) {
throw new EOFException();
} }
trunk.resetIndex(); trunk.resetIndex();
trunkNext = trunk.getNextTrunk(); trunkNext = trunk.getNextTrunk();
...@@ -110,9 +116,11 @@ public class PageInputStream extends InputStream { ...@@ -110,9 +116,11 @@ public class PageInputStream extends InputStream {
data = (PageStreamData) store.getPage(next); data = (PageStreamData) store.getPage(next);
if (data == null) { if (data == null) {
throw new EOFException(); throw new EOFException();
} else if (data.getLogKey() != logKey) {
throw new EOFException();
} }
data.initRead(); data.initRead();
remaining = data.getLength(); remaining = data.getRemaining();
} }
/** /**
......
...@@ -30,11 +30,10 @@ import org.h2.value.Value; ...@@ -30,11 +30,10 @@ import org.h2.value.Value;
* Transaction log mechanism. The stream contains a list of records. The data * Transaction log mechanism. The stream contains a list of records. The data
* format for a record is: * format for a record is:
* <ul> * <ul>
* <li>0-0: type (0: undo,...)</li> * <li>type (0: no-op, 1: undo, 2: commit, ...)</li>
* <li>1-4: page id</li> * <li>data</li>
* <li>5-: data</li>
* </ul> * </ul>
* The log file is split into sections, each section starts with a new log id. * The log file is split into sections.
* A checkpoint starts a new section. * A checkpoint starts a new section.
*/ */
public class PageLog { public class PageLog {
...@@ -87,7 +86,7 @@ public class PageLog { ...@@ -87,7 +86,7 @@ public class PageLog {
public static final int TRUNCATE = 7; public static final int TRUNCATE = 7;
/** /**
* Perform a checkpoint. The log id is incremented. * Perform a checkpoint. The log section id is incremented.
* Format: - * Format: -
*/ */
public static final int CHECKPOINT = 8; public static final int CHECKPOINT = 8;
...@@ -127,8 +126,9 @@ public class PageLog { ...@@ -127,8 +126,9 @@ public class PageLog {
private int firstTrunkPage; private int firstTrunkPage;
private int firstDataPage; private int firstDataPage;
private Data data; private Data data;
private int logKey;
private int logSectionId, logPos; private int logSectionId, logPos;
private int firstLogId; private int firstSectionId;
private CompressLZF compress; private CompressLZF compress;
private byte[] compressBuffer; private byte[] compressBuffer;
...@@ -180,12 +180,13 @@ public class PageLog { ...@@ -180,12 +180,13 @@ public class PageLog {
void openForWriting(int firstTrunkPage, boolean atEnd) throws SQLException { void openForWriting(int firstTrunkPage, boolean atEnd) throws SQLException {
trace.debug("log openForWriting firstPage:" + firstTrunkPage); trace.debug("log openForWriting firstPage:" + firstTrunkPage);
this.firstTrunkPage = firstTrunkPage; this.firstTrunkPage = firstTrunkPage;
pageOut = new PageOutputStream(store, firstTrunkPage, undoAll, atEnd); logKey++;
pageOut = new PageOutputStream(store, firstTrunkPage, undoAll, logKey, atEnd);
pageOut.reserve(1); pageOut.reserve(1);
// TODO maybe buffer to improve speed // TODO maybe buffer to improve speed
pageBuffer = pageOut; pageBuffer = pageOut;
// pageBuffer = new BufferedOutputStream(pageOut, 8 * 1024); // pageBuffer = new BufferedOutputStream(pageOut, 8 * 1024);
store.setLogFirstPage(firstTrunkPage, pageOut.getCurrentDataPageId()); store.setLogFirstPage(logKey, firstTrunkPage, pageOut.getCurrentDataPageId());
outBuffer = store.createData(); outBuffer = store.createData();
} }
...@@ -208,10 +209,12 @@ public class PageLog { ...@@ -208,10 +209,12 @@ public class PageLog {
/** /**
* Open the log for reading. * Open the log for reading.
* *
* @param logKey the first expected log key
* @param firstTrunkPage the first trunk page * @param firstTrunkPage the first trunk page
* @param firstDataPage the index of the first data page * @param firstDataPage the index of the first data page
*/ */
void openForReading(int firstTrunkPage, int firstDataPage) { void openForReading(int logKey, int firstTrunkPage, int firstDataPage) {
this.logKey = logKey;
this.firstTrunkPage = firstTrunkPage; this.firstTrunkPage = firstTrunkPage;
this.firstDataPage = firstDataPage; this.firstDataPage = firstDataPage;
} }
...@@ -229,12 +232,12 @@ public class PageLog { ...@@ -229,12 +232,12 @@ public class PageLog {
trace.debug("log recover stage:" + stage); trace.debug("log recover stage:" + stage);
} }
if (stage == RECOVERY_STAGE_ALLOCATE) { if (stage == RECOVERY_STAGE_ALLOCATE) {
PageInputStream in = new PageInputStream(store, firstTrunkPage, firstDataPage); PageInputStream in = new PageInputStream(store, logKey, firstTrunkPage, firstDataPage);
usedLogPages = in.allocateAllPages(); usedLogPages = in.allocateAllPages();
in.close(); in.close();
return; return;
} }
pageIn = new PageInputStream(store, firstTrunkPage, firstDataPage); pageIn = new PageInputStream(store, logKey, firstTrunkPage, firstDataPage);
in = new DataReader(pageIn); in = new DataReader(pageIn);
int logId = 0; int logId = 0;
Data data = store.createData(); Data data = store.createData();
...@@ -249,8 +252,8 @@ public class PageLog { ...@@ -249,8 +252,8 @@ public class PageLog {
if (x == UNDO) { if (x == UNDO) {
int pageId = in.readVarInt(); int pageId = in.readVarInt();
int size = in.readVarInt(); int size = in.readVarInt();
if (size == store.getPageSize()) { if (size == 0) {
in.readFully(data.getBytes(), 0, size); in.readFully(data.getBytes(), 0, store.getPageSize());
} else { } else {
in.readFully(compressBuffer, 0, size); in.readFully(compressBuffer, 0, size);
compress.expand(compressBuffer, 0, size, data.getBytes(), 0, store.getPageSize()); compress.expand(compressBuffer, 0, size, data.getBytes(), 0, store.getPageSize());
...@@ -440,11 +443,12 @@ public class PageLog { ...@@ -440,11 +443,12 @@ public class PageLog {
outBuffer.checkCapacity(size); outBuffer.checkCapacity(size);
outBuffer.write(compressBuffer, 0, size); outBuffer.write(compressBuffer, 0, size);
} else { } else {
outBuffer.writeVarInt(pageSize); outBuffer.writeVarInt(0);
outBuffer.checkCapacity(pageSize); outBuffer.checkCapacity(pageSize);
outBuffer.write(page.getBytes(), 0, pageSize); outBuffer.write(page.getBytes(), 0, pageSize);
} }
} else { } else {
outBuffer.writeVarInt(0);
outBuffer.checkCapacity(pageSize); outBuffer.checkCapacity(pageSize);
outBuffer.write(page.getBytes(), 0, pageSize); outBuffer.write(page.getBytes(), 0, pageSize);
} }
...@@ -616,7 +620,7 @@ public class PageLog { ...@@ -616,7 +620,7 @@ public class PageLog {
} }
/** /**
* Switch to a new log id. * Switch to a new log section.
*/ */
void checkpoint() throws SQLException { void checkpoint() throws SQLException {
try { try {
...@@ -645,21 +649,21 @@ public class PageLog { ...@@ -645,21 +649,21 @@ public class PageLog {
/** /**
* Remove all pages until the given log (excluding). * Remove all pages until the given log (excluding).
* *
* @param firstUncommittedLog the first log id to keep * @param firstUncommittedSection the first log section to keep
*/ */
void removeUntil(int firstUncommittedLog) throws SQLException { void removeUntil(int firstUncommittedSection) throws SQLException {
if (firstUncommittedLog == 0) { if (firstUncommittedSection == 0) {
return; return;
} }
int firstDataPageToKeep = logSectionPageMap.get(firstUncommittedLog); int firstDataPageToKeep = logSectionPageMap.get(firstUncommittedSection);
firstTrunkPage = removeUntil(firstTrunkPage, firstDataPageToKeep); firstTrunkPage = removeUntil(firstTrunkPage, firstDataPageToKeep);
store.setLogFirstPage(firstTrunkPage, firstDataPageToKeep); store.setLogFirstPage(logKey, firstTrunkPage, firstDataPageToKeep);
while (firstLogId < firstUncommittedLog) { while (firstSectionId < firstUncommittedSection) {
if (firstLogId > 0) { if (firstSectionId > 0) {
// there is no entry for log 0 // there is no entry for log 0
logSectionPageMap.remove(firstLogId); logSectionPageMap.remove(firstSectionId);
} }
firstLogId++; firstSectionId++;
} }
} }
...@@ -674,6 +678,7 @@ public class PageLog { ...@@ -674,6 +678,7 @@ public class PageLog {
trace.debug("log.removeUntil " + firstDataPageToKeep); trace.debug("log.removeUntil " + firstDataPageToKeep);
while (true) { while (true) {
PageStreamTrunk t = (PageStreamTrunk) store.getPage(firstTrunkPage); PageStreamTrunk t = (PageStreamTrunk) store.getPage(firstTrunkPage);
logKey = t.getLogKey();
t.resetIndex(); t.resetIndex();
if (t.contains(firstDataPageToKeep)) { if (t.contains(firstDataPageToKeep)) {
return t.getPos(); return t.getPos();
......
...@@ -35,19 +35,24 @@ public class PageOutputStream extends OutputStream { ...@@ -35,19 +35,24 @@ public class PageOutputStream extends OutputStream {
private boolean writing; private boolean writing;
private int pages; private int pages;
private boolean atEnd; private boolean atEnd;
private int logKey;
/** /**
* Create a new page output stream. * Create a new page output stream.
* *
* @param store the page store * @param store the page store
* @param trunkPage the first trunk page (already allocated) * @param trunkPage the first trunk page (already allocated)
* @param exclude the pages not to use
* @param logKey the log key of the first trunk page
* @param atEnd whether only pages at the end of the file should be used * @param atEnd whether only pages at the end of the file should be used
*/ */
public PageOutputStream(PageStore store, int trunkPage, BitField exclude, boolean atEnd) { public PageOutputStream(PageStore store, int trunkPage, BitField exclude, int logKey, boolean atEnd) {
this.trace = store.getTrace(); this.trace = store.getTrace();
this.store = store; this.store = store;
this.trunkPageId = trunkPage; this.trunkPageId = trunkPage;
this.exclude = exclude; this.exclude = exclude;
// minus one, because we increment before creating a trunk page
this.logKey = logKey - 1;
this.atEnd = atEnd; this.atEnd = atEnd;
} }
...@@ -102,13 +107,14 @@ public class PageOutputStream extends OutputStream { ...@@ -102,13 +107,14 @@ public class PageOutputStream extends OutputStream {
pageIds[i] = reservedPages.get(i); pageIds[i] = reservedPages.get(i);
} }
trunkNext = reservedPages.get(len); trunkNext = reservedPages.get(len);
trunk = PageStreamTrunk.create(store, parent, trunkPageId, trunkNext, pageIds); logKey++;
trunk = PageStreamTrunk.create(store, parent, trunkPageId, trunkNext, logKey, pageIds);
pages++; pages++;
trunk.write(null); trunk.write(null);
reservedPages.removeRange(0, len + 1); reservedPages.removeRange(0, len + 1);
nextData = trunk.getNextPageData(); nextData = trunk.getNextPageData();
} }
data = PageStreamData.create(store, nextData, trunk.getPos()); data = PageStreamData.create(store, nextData, trunk.getPos(), logKey);
pages++; pages++;
data.initWrite(); data.initWrite();
} }
...@@ -145,7 +151,7 @@ public class PageOutputStream extends OutputStream { ...@@ -145,7 +151,7 @@ public class PageOutputStream extends OutputStream {
private void storePage() throws IOException { private void storePage() throws IOException {
try { try {
if (trace.isDebugEnabled()) { if (trace.isDebugEnabled()) {
trace.debug("pageOut.storePage " + data.getPos()); trace.debug("pageOut.storePage " + data);
} }
data.write(null); data.write(null);
} catch (SQLException e) { } catch (SQLException e) {
...@@ -195,4 +201,8 @@ public class PageOutputStream extends OutputStream { ...@@ -195,4 +201,8 @@ public class PageOutputStream extends OutputStream {
pages -= t.free(); pages -= t.free();
} }
int getCurrentLogKey() {
return trunk.getLogKey();
}
} }
...@@ -22,12 +22,12 @@ import org.h2.index.IndexType; ...@@ -22,12 +22,12 @@ import org.h2.index.IndexType;
import org.h2.index.PageBtreeIndex; import org.h2.index.PageBtreeIndex;
import org.h2.index.PageBtreeLeaf; import org.h2.index.PageBtreeLeaf;
import org.h2.index.PageBtreeNode; import org.h2.index.PageBtreeNode;
import org.h2.index.PageDataIndex;
import org.h2.index.PageDataLeaf; import org.h2.index.PageDataLeaf;
import org.h2.index.PageDataNode; import org.h2.index.PageDataNode;
import org.h2.index.PageDataOverflow; import org.h2.index.PageDataOverflow;
import org.h2.index.PageDelegateIndex; import org.h2.index.PageDelegateIndex;
import org.h2.index.PageIndex; import org.h2.index.PageIndex;
import org.h2.index.PageDataIndex;
import org.h2.log.InDoubtTransaction; import org.h2.log.InDoubtTransaction;
import org.h2.log.LogSystem; import org.h2.log.LogSystem;
import org.h2.message.Message; import org.h2.message.Message;
...@@ -69,19 +69,24 @@ import org.h2.value.ValueString; ...@@ -69,19 +69,24 @@ import org.h2.value.ValueString;
* </ul> * </ul>
* The format of page 1 and 2 is: * The format of page 1 and 2 is:
* <ul> * <ul>
* <li>0-7: write counter (incremented each time the header changes)</li> * <li>page type: byte (0)</li>
* <li>8-11: log trunk page (0 for none)</li> * <li>write counter (incremented each time the header changes): long (1-8)</li>
* <li>12-15: log data page (0 for none)</li> * <li>log trunk key: int (9-12)</li>
* <li>16-23: checksum of bytes 0-15 (CRC32)</li> * <li>log trunk page (0 for none): int (13-16)</li>
* <li>log data page (0 for none): int (17-20)</li>
* <li>CRC32 checksum of the page: int (20-23)</li>
* </ul> * </ul>
* Page 3 contains the first free list page. * Page 3 contains the first free list page.
* Page 4 contains the meta table root page. * Page 4 contains the meta table root page.
*/ */
public class PageStore implements CacheWriter { public class PageStore implements CacheWriter {
// TODO freed: truncate when decreasing size
// TODO check commit delay // TODO check commit delay
// TODO do not trim large databases fully, only up to x seconds // TODO do not trim large databases fully, only up to x seconds
// TODO use regular page type for page 1 and 2
// TODO implement checksum; 0 for empty pages // TODO implement checksum; 0 for empty pages
// TODO undo log: don't store empty space between head and data // TODO undo log: don't store empty space between head and data
// TODO update: only log the key and changed values // TODO update: only log the key and changed values
...@@ -164,7 +169,7 @@ public class PageStore implements CacheWriter { ...@@ -164,7 +169,7 @@ public class PageStore implements CacheWriter {
private int pageSize; private int pageSize;
private int pageSizeShift; private int pageSizeShift;
private long writeCount; private long writeCount;
private int logFirstTrunkPage, logFirstDataPage; private int logKey, logFirstTrunkPage, logFirstDataPage;
private Cache cache; private Cache cache;
...@@ -264,7 +269,6 @@ public class PageStore implements CacheWriter { ...@@ -264,7 +269,6 @@ public class PageStore implements CacheWriter {
} else { } else {
openNew(); openNew();
} }
// lastUsedPage = getFreeList().getLastUsed() + 1;
} catch (SQLException e) { } catch (SQLException e) {
close(); close();
throw e; throw e;
...@@ -301,7 +305,7 @@ public class PageStore implements CacheWriter { ...@@ -301,7 +305,7 @@ public class PageStore implements CacheWriter {
} }
readVariableHeader(); readVariableHeader();
log = new PageLog(this); log = new PageLog(this);
log.openForReading(logFirstTrunkPage, logFirstDataPage); log.openForReading(logKey, logFirstTrunkPage, logFirstDataPage);
recover(); recover();
if (!database.isReadOnly()) { if (!database.isReadOnly()) {
recoveryRunning = true; recoveryRunning = true;
...@@ -384,7 +388,7 @@ public class PageStore implements CacheWriter { ...@@ -384,7 +388,7 @@ public class PageStore implements CacheWriter {
recoveryRunning = true; recoveryRunning = true;
try { try {
log.free(); log.free();
setLogFirstPage(0, 0); setLogFirstPage(0, 0, 0);
} finally { } finally {
recoveryRunning = false; recoveryRunning = false;
} }
...@@ -443,7 +447,7 @@ public class PageStore implements CacheWriter { ...@@ -443,7 +447,7 @@ public class PageStore implements CacheWriter {
if (rec != null) { if (rec != null) {
return (Page) rec; return (Page) rec;
} }
Data data = Data.create(database, pageSize); Data data = createData();
readPage(pageId, data); readPage(pageId, data);
data.readInt(); data.readInt();
int type = data.readByte(); int type = data.readByte();
...@@ -510,17 +514,17 @@ public class PageStore implements CacheWriter { ...@@ -510,17 +514,17 @@ public class PageStore implements CacheWriter {
private void switchLog() throws SQLException { private void switchLog() throws SQLException {
trace.debug("switchLog"); trace.debug("switchLog");
Session[] sessions = database.getSessions(true); Session[] sessions = database.getSessions(true);
int firstUncommittedLog = log.getLogSectionId(); int firstUncommittedSection = log.getLogSectionId();
for (int i = 0; i < sessions.length; i++) { for (int i = 0; i < sessions.length; i++) {
Session session = sessions[i]; Session session = sessions[i];
int log = session.getFirstUncommittedLog(); int log = session.getFirstUncommittedLog();
if (log != LogSystem.LOG_WRITTEN) { if (log != LogSystem.LOG_WRITTEN) {
if (log < firstUncommittedLog) { if (log < firstUncommittedSection) {
firstUncommittedLog = log; firstUncommittedSection = log;
} }
} }
} }
log.removeUntil(firstUncommittedLog); log.removeUntil(firstUncommittedSection);
} }
private void readStaticHeader() throws SQLException { private void readStaticHeader() throws SQLException {
...@@ -544,22 +548,30 @@ public class PageStore implements CacheWriter { ...@@ -544,22 +548,30 @@ public class PageStore implements CacheWriter {
} }
private void readVariableHeader() throws SQLException { private void readVariableHeader() throws SQLException {
Data page = Data.create(database, pageSize); Data page = createData();
for (int i = 1;; i++) { for (int i = 1;; i++) {
if (i == 3) { if (i == 3) {
throw Message.getSQLException(ErrorCode.FILE_CORRUPTED_1, fileName); throw Message.getSQLException(ErrorCode.FILE_CORRUPTED_1, fileName);
} }
page.reset(); page.reset();
readPage(i, page); readPage(i, page);
writeCount = page.readLong(); int type = page.readByte();
logFirstTrunkPage = page.readInt(); if (type == Page.TYPE_HEADER) {
logFirstDataPage = page.readInt(); writeCount = page.readLong();
CRC32 crc = new CRC32(); logKey = page.readInt();
crc.update(page.getBytes(), 0, page.length()); logFirstTrunkPage = page.readInt();
long expected = crc.getValue(); logFirstDataPage = page.readInt();
long got = page.readLong(); // read the CRC, then reset it to 0
if (expected == got) { int start = page.length();
break; int got = page.readInt();
page.setPos(start);
page.writeInt(0);
CRC32 crc = new CRC32();
crc.update(page.getBytes(), 0, pageSize);
int expected = (int) crc.getValue();
if (expected == got) {
break;
}
} }
} }
} }
...@@ -603,23 +615,30 @@ public class PageStore implements CacheWriter { ...@@ -603,23 +615,30 @@ public class PageStore implements CacheWriter {
/** /**
* Set the trunk page and data page id of the log. * Set the trunk page and data page id of the log.
* *
* @param logKey the log key of the trunk page
* @param trunkPageId the trunk page id * @param trunkPageId the trunk page id
* @param dataPageId the data page id * @param dataPageId the data page id
*/ */
void setLogFirstPage(int trunkPageId, int dataPageId) throws SQLException { void setLogFirstPage(int logKey, int trunkPageId, int dataPageId) throws SQLException {
if (trace.isDebugEnabled()) {
trace.debug("setLogFirstPage key: " + logKey + " trunk: " + trunkPageId + " data: " + dataPageId);
}
this.logKey = logKey;
this.logFirstTrunkPage = trunkPageId; this.logFirstTrunkPage = trunkPageId;
this.logFirstDataPage = dataPageId; this.logFirstDataPage = dataPageId;
writeVariableHeader(); writeVariableHeader();
} }
private void writeVariableHeader() throws SQLException { private void writeVariableHeader() throws SQLException {
Data page = Data.create(database, pageSize); Data page = createData();
page.writeByte((byte) Page.TYPE_HEADER);
page.writeLong(writeCount); page.writeLong(writeCount);
page.writeInt(logKey);
page.writeInt(logFirstTrunkPage); page.writeInt(logFirstTrunkPage);
page.writeInt(logFirstDataPage); page.writeInt(logFirstDataPage);
CRC32 crc = new CRC32(); CRC32 crc = new CRC32();
crc.update(page.getBytes(), 0, page.length()); crc.update(page.getBytes(), 0, pageSize);
page.writeLong(crc.getValue()); page.writeInt((int) crc.getValue());
file.seek(pageSize); file.seek(pageSize);
file.write(page.getBytes(), 0, pageSize); file.write(page.getBytes(), 0, pageSize);
file.seek(pageSize + pageSize); file.seek(pageSize + pageSize);
...@@ -709,12 +728,12 @@ public class PageStore implements CacheWriter { ...@@ -709,12 +728,12 @@ public class PageStore implements CacheWriter {
private PageFreeList getFreeList(int i) throws SQLException { private PageFreeList getFreeList(int i) throws SQLException {
PageFreeList list = null; PageFreeList list = null;
// if (i < freeLists.size()) { if (i < freeLists.size()) {
// list = freeLists.get(i); list = freeLists.get(i);
// if (list != null) { if (list != null) {
// return list; return list;
// } }
// } }
int p = PAGE_ID_FREE_LIST_ROOT + i * freeListPagesPerList; int p = PAGE_ID_FREE_LIST_ROOT + i * freeListPagesPerList;
while (p >= pageCount) { while (p >= pageCount) {
increaseFileSize(INCREMENT_PAGES); increaseFileSize(INCREMENT_PAGES);
...@@ -799,6 +818,9 @@ public class PageStore implements CacheWriter { ...@@ -799,6 +818,9 @@ public class PageStore implements CacheWriter {
} }
private void increaseFileSize(int increment) throws SQLException { private void increaseFileSize(int increment) throws SQLException {
for (int i = pageCount; i < pageCount + increment; i++) {
freed.set(i);
}
pageCount += increment; pageCount += increment;
long newLength = (long) pageCount << pageSizeShift; long newLength = (long) pageCount << pageSizeShift;
file.setLength(newLength); file.setLength(newLength);
......
...@@ -14,25 +14,25 @@ import org.h2.engine.Session; ...@@ -14,25 +14,25 @@ import org.h2.engine.Session;
* <ul> * <ul>
* <li>the trunk page id: int (0-3)</li> * <li>the trunk page id: int (0-3)</li>
* <li>page type: byte (4)</li> * <li>page type: byte (4)</li>
* <li>the number of bytes used: short (5-6)</li> * <li>log key: int (5-8)</li>
* <li>data (9-)</li> * <li>data (9-)</li>
* </ul> * </ul>
*/ */
public class PageStreamData extends Page { public class PageStreamData extends Page {
private static final int LENGTH_START = 5;
private static final int DATA_START = 9; private static final int DATA_START = 9;
private final PageStore store; private final PageStore store;
private int trunk; private int trunk;
private int logKey;
private Data data; private Data data;
private int remaining; private int remaining;
private int length;
private PageStreamData(PageStore store, int pageId, int trunk) { private PageStreamData(PageStore store, int pageId, int trunk, int logKey) {
setPos(pageId); setPos(pageId);
this.store = store; this.store = store;
this.trunk = trunk; this.trunk = trunk;
this.logKey = logKey;
} }
/** /**
...@@ -44,7 +44,7 @@ public class PageStreamData extends Page { ...@@ -44,7 +44,7 @@ public class PageStreamData extends Page {
* @return the page * @return the page
*/ */
static PageStreamData read(PageStore store, Data data, int pageId) { static PageStreamData read(PageStore store, Data data, int pageId) {
PageStreamData p = new PageStreamData(store, pageId, 0); PageStreamData p = new PageStreamData(store, pageId, 0, 0);
p.data = data; p.data = data;
p.read(); p.read();
return p; return p;
...@@ -56,10 +56,11 @@ public class PageStreamData extends Page { ...@@ -56,10 +56,11 @@ public class PageStreamData extends Page {
* @param store the page store * @param store the page store
* @param pageId the page id * @param pageId the page id
* @param trunk the trunk page * @param trunk the trunk page
* @param logKey the log key
* @return the page * @return the page
*/ */
static PageStreamData create(PageStore store, int pageId, int trunk) { static PageStreamData create(PageStore store, int pageId, int trunk, int logKey) {
return new PageStreamData(store, pageId, trunk); return new PageStreamData(store, pageId, trunk, logKey);
} }
/** /**
...@@ -68,9 +69,8 @@ public class PageStreamData extends Page { ...@@ -68,9 +69,8 @@ public class PageStreamData extends Page {
private void read() { private void read() {
data.reset(); data.reset();
trunk = data.readInt(); trunk = data.readInt();
data.setPos(4);
data.readByte(); data.readByte();
length = data.readInt(); logKey = data.readInt();
} }
public int getByteCount(DataPage dummy) { public int getByteCount(DataPage dummy) {
...@@ -84,9 +84,8 @@ public class PageStreamData extends Page { ...@@ -84,9 +84,8 @@ public class PageStreamData extends Page {
data = store.createData(); data = store.createData();
data.writeInt(trunk); data.writeInt(trunk);
data.writeByte((byte) Page.TYPE_STREAM_DATA); data.writeByte((byte) Page.TYPE_STREAM_DATA);
data.writeInt(0); data.writeInt(logKey);
remaining = store.getPageSize() - data.length(); remaining = store.getPageSize() - data.length();
length = 0;
} }
/** /**
...@@ -100,13 +99,11 @@ public class PageStreamData extends Page { ...@@ -100,13 +99,11 @@ public class PageStreamData extends Page {
int write(byte[] buff, int offset, int len) { int write(byte[] buff, int offset, int len) {
int max = Math.min(remaining, len); int max = Math.min(remaining, len);
data.write(buff, offset, max); data.write(buff, offset, max);
length += max;
remaining -= max; remaining -= max;
return max; return max;
} }
public void write(DataPage buff) throws SQLException { public void write(DataPage buff) throws SQLException {
data.setInt(LENGTH_START, length);
store.writePage(getPos(), data); store.writePage(getPos(), data);
} }
...@@ -120,10 +117,6 @@ public class PageStreamData extends Page { ...@@ -120,10 +117,6 @@ public class PageStreamData extends Page {
return pageSize - DATA_START; return pageSize - DATA_START;
} }
int getLength() {
return length;
}
/** /**
* Read the next bytes from the buffer. * Read the next bytes from the buffer.
* *
...@@ -158,11 +151,19 @@ public class PageStreamData extends Page { ...@@ -158,11 +151,19 @@ public class PageStreamData extends Page {
*/ */
void initRead() { void initRead() {
data.setPos(DATA_START); data.setPos(DATA_START);
remaining = length; remaining = store.getPageSize() - DATA_START;
} }
public void moveTo(Session session, int newPos) { public void moveTo(Session session, int newPos) {
// not required // not required
} }
int getLogKey() {
return logKey;
}
public String toString() {
return "[" + getPos() + "] stream data pos:" + data.length() + " remaining:" + remaining;
}
} }
\ No newline at end of file
...@@ -15,28 +15,31 @@ import org.h2.engine.Session; ...@@ -15,28 +15,31 @@ import org.h2.engine.Session;
* <ul> * <ul>
* <li>previous trunk page, or 0 if none: int (0-3)</li> * <li>previous trunk page, or 0 if none: int (0-3)</li>
* <li>page type: byte (4)</li> * <li>page type: byte (4)</li>
* <li>next trunk page: int (5-8)</li> * <li>log key: int (5-8)</li>
* <li>number of pages (9-10)</li> * <li>next trunk page: int (9-12)</li>
* <li>remainder: page ids (11-)</li> * <li>number of pages: short (13-14)</li>
* <li>page ids (15-)</li>
* </ul> * </ul>
*/ */
public class PageStreamTrunk extends Page { public class PageStreamTrunk extends Page {
private static final int DATA_START = 11; private static final int DATA_START = 15;
private final PageStore store; private final PageStore store;
private int parent; private int parent;
private int logKey;
private int nextTrunk; private int nextTrunk;
private int[] pageIds; private int[] pageIds;
private int pageCount; private int pageCount;
private Data data; private Data data;
private int index; private int index;
private PageStreamTrunk(PageStore store, int parent, int pageId, int next, int[] pageIds) { private PageStreamTrunk(PageStore store, int parent, int pageId, int next, int logKey, int[] pageIds) {
setPos(pageId); setPos(pageId);
this.parent = parent; this.parent = parent;
this.store = store; this.store = store;
this.nextTrunk = next; this.nextTrunk = next;
this.logKey = logKey;
this.pageCount = pageIds.length; this.pageCount = pageIds.length;
this.pageIds = pageIds; this.pageIds = pageIds;
} }
...@@ -68,11 +71,12 @@ public class PageStreamTrunk extends Page { ...@@ -68,11 +71,12 @@ public class PageStreamTrunk extends Page {
* @param parent the parent page * @param parent the parent page
* @param pageId the page id * @param pageId the page id
* @param next the next trunk page * @param next the next trunk page
* @param logKey the log key
* @param pageIds the stream data page ids * @param pageIds the stream data page ids
* @return the page * @return the page
*/ */
static PageStreamTrunk create(PageStore store, int parent, int pageId, int next, int[] pageIds) { static PageStreamTrunk create(PageStore store, int parent, int pageId, int next, int logKey, int[] pageIds) {
return new PageStreamTrunk(store, parent, pageId, next, pageIds); return new PageStreamTrunk(store, parent, pageId, next, logKey, pageIds);
} }
/** /**
...@@ -82,6 +86,7 @@ public class PageStreamTrunk extends Page { ...@@ -82,6 +86,7 @@ public class PageStreamTrunk extends Page {
data.reset(); data.reset();
parent = data.readInt(); parent = data.readInt();
data.readByte(); data.readByte();
logKey = data.readInt();
nextTrunk = data.readInt(); nextTrunk = data.readInt();
pageCount = data.readShortInt(); pageCount = data.readShortInt();
pageIds = new int[pageCount]; pageIds = new int[pageCount];
...@@ -120,6 +125,7 @@ public class PageStreamTrunk extends Page { ...@@ -120,6 +125,7 @@ public class PageStreamTrunk extends Page {
data = store.createData(); data = store.createData();
data.writeInt(parent); data.writeInt(parent);
data.writeByte((byte) Page.TYPE_STREAM_TRUNK); data.writeByte((byte) Page.TYPE_STREAM_TRUNK);
data.writeInt(logKey);
data.writeInt(nextTrunk); data.writeInt(nextTrunk);
data.writeShortInt(pageCount); data.writeShortInt(pageCount);
for (int i = 0; i < pageCount; i++) { for (int i = 0; i < pageCount; i++) {
...@@ -159,16 +165,13 @@ public class PageStreamTrunk extends Page { ...@@ -159,16 +165,13 @@ public class PageStreamTrunk extends Page {
* @return the number of pages freed * @return the number of pages freed
*/ */
int free() throws SQLException { int free() throws SQLException {
Data empty = store.createData();
store.freePage(getPos(), false, null); store.freePage(getPos(), false, null);
int freed = 1; int freed = 1;
for (int i = 0; i < pageCount; i++) { for (int i = 0; i < pageCount; i++) {
int page = pageIds[i]; int page = pageIds[i];
store.freePage(page, false, null); store.freePage(page, false, null);
freed++; freed++;
store.writePage(page, empty);
} }
store.writePage(getPos(), empty);
return freed; return freed;
} }
...@@ -201,4 +204,8 @@ public class PageStreamTrunk extends Page { ...@@ -201,4 +204,8 @@ public class PageStreamTrunk extends Page {
// not required // not required
} }
int getLogKey() {
return logKey;
}
} }
...@@ -817,7 +817,7 @@ public class Recover extends Tool implements DataHandler { ...@@ -817,7 +817,7 @@ public class Recover extends Tool implements DataHandler {
} }
int pageCount = (int) (length / pageSize); int pageCount = (int) (length / pageSize);
s = Data.create(this, pageSize); s = Data.create(this, pageSize);
int logFirstTrunkPage = 0, logFirstDataPage = 0; int logKey = 0, logFirstTrunkPage = 0, logFirstDataPage = 0;
for (int i = 1;; i++) { for (int i = 1;; i++) {
if (i == 3) { if (i == 3) {
break; break;
...@@ -825,22 +825,28 @@ public class Recover extends Tool implements DataHandler { ...@@ -825,22 +825,28 @@ public class Recover extends Tool implements DataHandler {
s.reset(); s.reset();
store.seek(i * pageSize); store.seek(i * pageSize);
store.readFully(s.getBytes(), 0, pageSize); store.readFully(s.getBytes(), 0, pageSize);
int type = s.readByte();
long writeCounter = s.readLong(); long writeCounter = s.readLong();
int key = s.readInt();
int firstTrunkPage = s.readInt(); int firstTrunkPage = s.readInt();
int firstDataPage = s.readInt(); int firstDataPage = s.readInt();
int start = s.length();
int got = s.readInt();
s.setPos(start);
s.writeInt(0);
CRC32 crc = new CRC32(); CRC32 crc = new CRC32();
crc.update(s.getBytes(), 0, s.length()); crc.update(s.getBytes(), 0, pageSize);
long expected = crc.getValue(); int expected = (int) crc.getValue();
long got = s.readLong();
if (expected == got) { if (expected == got) {
if (logFirstTrunkPage == 0) { if (logFirstTrunkPage == 0) {
logKey = key;
logFirstTrunkPage = firstTrunkPage; logFirstTrunkPage = firstTrunkPage;
logFirstDataPage = firstDataPage; logFirstDataPage = firstDataPage;
} }
} }
writer.println("-- head " + i + writer.println("-- head " + i +
": writeCounter: " + writeCounter + ": type: " + type + " writeCounter: " + writeCounter +
" trunk: " + firstTrunkPage + "/" + firstDataPage + " log key: " + key + " trunk: " + firstTrunkPage + "/" + firstDataPage +
" crc expected " + expected + " crc expected " + expected +
" got " + got + " (" + (expected == got ? "ok" : "different") + ")"); " got " + got + " (" + (expected == got ? "ok" : "different") + ")");
} }
...@@ -934,7 +940,7 @@ public class Recover extends Tool implements DataHandler { ...@@ -934,7 +940,7 @@ public class Recover extends Tool implements DataHandler {
} }
writeSchema(writer); writeSchema(writer);
try { try {
dumpPageLogStream(writer, logFirstTrunkPage, logFirstDataPage); dumpPageLogStream(writer, logKey, logFirstTrunkPage, logFirstDataPage);
} catch (EOFException e) { } catch (EOFException e) {
// ignore // ignore
} }
...@@ -955,10 +961,10 @@ public class Recover extends Tool implements DataHandler { ...@@ -955,10 +961,10 @@ public class Recover extends Tool implements DataHandler {
} }
} }
private void dumpPageLogStream(PrintWriter writer, int logFirstTrunkPage, int logFirstDataPage) throws IOException, SQLException { private void dumpPageLogStream(PrintWriter writer, int logKey, int logFirstTrunkPage, int logFirstDataPage) throws IOException, SQLException {
Data s = Data.create(this, pageSize); Data s = Data.create(this, pageSize);
DataReader in = new DataReader( DataReader in = new DataReader(
new PageInputStream(writer, this, store, logFirstTrunkPage, logFirstDataPage, pageSize) new PageInputStream(writer, this, store, logKey, logFirstTrunkPage, logFirstDataPage, pageSize)
); );
while (true) { while (true) {
int x = in.read(); int x = in.read();
...@@ -1037,16 +1043,17 @@ public class Recover extends Tool implements DataHandler { ...@@ -1037,16 +1043,17 @@ public class Recover extends Tool implements DataHandler {
private IntArray dataPages = new IntArray(); private IntArray dataPages = new IntArray();
private boolean endOfFile; private boolean endOfFile;
private int remaining; private int remaining;
private int logKey;
public PageInputStream(PrintWriter writer, DataHandler handler, public PageInputStream(PrintWriter writer, DataHandler handler,
FileStore store, int firstTrunkPage, int firstDataPage, int pageSize) { FileStore store, int logKey, int firstTrunkPage, int firstDataPage, int pageSize) {
this.writer = writer; this.writer = writer;
this.store = store; this.store = store;
this.pageSize = pageSize; this.pageSize = pageSize;
this.logKey = logKey - 1;
this.trunkPage = firstTrunkPage; this.trunkPage = firstTrunkPage;
this.dataPage = firstDataPage; this.dataPage = firstDataPage;
page = DataPage.create(handler, pageSize); page = DataPage.create(handler, pageSize);
} }
public int read() throws IOException { public int read() throws IOException {
...@@ -1108,7 +1115,12 @@ public class Recover extends Tool implements DataHandler { ...@@ -1108,7 +1115,12 @@ public class Recover extends Tool implements DataHandler {
return; return;
} }
trunkPage = page.readInt(); trunkPage = page.readInt();
int pageCount = page.readInt(); int key = page.readInt();
logKey++;
if (key != logKey) {
writer.println("-- eof page: " +trunkPage + " type: " + t + " expected key: " + logKey + " got: " + key);
}
int pageCount = page.readShortInt();
for (int i = 0; i < pageCount; i++) { for (int i = 0; i < pageCount; i++) {
int d = page.readInt(); int d = page.readInt();
if (dataPage != 0) { if (dataPage != 0) {
...@@ -1130,11 +1142,17 @@ public class Recover extends Tool implements DataHandler { ...@@ -1130,11 +1142,17 @@ public class Recover extends Tool implements DataHandler {
page.reset(); page.reset();
int p = page.readInt(); int p = page.readInt();
int t = page.readByte(); int t = page.readByte();
int k = page.readInt();
if (t != Page.TYPE_STREAM_DATA) { if (t != Page.TYPE_STREAM_DATA) {
writer.println("-- eof page: " +nextPage+ " type: " + t + " parent: " + p + writer.println("-- eof page: " +nextPage+ " type: " + t + " parent: " + p +
" expected type: " + Page.TYPE_STREAM_DATA); " expected type: " + Page.TYPE_STREAM_DATA);
endOfFile = true; endOfFile = true;
return; return;
} else if (k != logKey) {
writer.println("-- eof page: " +nextPage+ " type: " + t + " parent: " + p +
" expected key: " + logKey + " got: " + k);
endOfFile = true;
return;
} }
remaining = page.readInt(); remaining = page.readInt();
} catch (SQLException e) { } catch (SQLException e) {
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论