提交 5df4131c authored 作者: Thomas Mueller's avatar Thomas Mueller

New experimental page store.

上级 5785fdf3
...@@ -57,9 +57,14 @@ public class Page { ...@@ -57,9 +57,14 @@ public class Page {
public static final int TYPE_FREE_LIST = 7; public static final int TYPE_FREE_LIST = 7;
/** /**
* A log page. * A stream trunk page.
*/ */
public static final int TYPE_LOG = 8; public static final int TYPE_STREAM_TRUNK = 8;
/**
* A stream data page.
*/
public static final int TYPE_STREAM_DATA = 9;
/** /**
* This is a root page. * This is a root page.
......
...@@ -26,25 +26,15 @@ public class PageFreeList extends Record { ...@@ -26,25 +26,15 @@ public class PageFreeList extends Record {
private final PageStore store; private final PageStore store;
private final BitField used = new BitField(); private final BitField used = new BitField();
private final int firstAddressed;
private final int pageCount; private final int pageCount;
private final int nextPage;
private boolean full; private boolean full;
private DataPage data; private DataPage data;
PageFreeList(PageStore store, int pageId, int firstAddressed) { PageFreeList(PageStore store, int pageId) {
setPos(pageId); setPos(pageId);
this.store = store; this.store = store;
this.firstAddressed = firstAddressed;
pageCount = (store.getPageSize() - DATA_START) * 8; pageCount = (store.getPageSize() - DATA_START) * 8;
for (int i = firstAddressed; i <= pageId; i++) { used.set(pageId);
used.set(getAddress(i));
}
nextPage = firstAddressed + pageCount;
}
private int getAddress(int pageId) {
return pageId - firstAddressed;
} }
/** /**
...@@ -54,20 +44,16 @@ public class PageFreeList extends Record { ...@@ -54,20 +44,16 @@ public class PageFreeList extends Record {
*/ */
int allocate() throws SQLException { int allocate() throws SQLException {
if (full) { if (full) {
PageFreeList next = getNext(); return -1;
if (next == null) {
return -1;
}
return next.allocate();
} }
int free = used.nextClearBit(0); int free = used.nextClearBit(0);
if (free > pageCount) { if (free > pageCount) {
full = true; full = true;
return allocate(); return -1;
} }
used.set(free); used.set(free);
store.updateRecord(this, true, data); store.updateRecord(this, true, data);
return free + firstAddressed; return free + getPos();
} }
/** /**
...@@ -81,25 +67,8 @@ public class PageFreeList extends Record { ...@@ -81,25 +67,8 @@ public class PageFreeList extends Record {
return allocate(pos); return allocate(pos);
} }
public int getLastUsed() throws SQLException { int getLastUsed() {
if (nextPage < store.getPageCount()) { return used.getLastSetBit() + getPos();
PageFreeList next = getNext();
// TODO avoid recursion
return next.getLastUsed();
}
return used.getLastSetBit() + firstAddressed;
}
private PageFreeList getNext() throws SQLException {
PageFreeList next = (PageFreeList) store.getRecord(nextPage);
if (next == null) {
if (nextPage < store.getPageCount()) {
next = new PageFreeList(store, nextPage, nextPage);
next.read();
store.updateRecord(next, false, null);
}
}
return next;
} }
/** /**
...@@ -109,16 +78,9 @@ public class PageFreeList extends Record { ...@@ -109,16 +78,9 @@ public class PageFreeList extends Record {
* @return the page id, or -1 * @return the page id, or -1
*/ */
int allocate(int pos) throws SQLException { int allocate(int pos) throws SQLException {
if (pos - firstAddressed > pageCount) { int idx = pos - getPos();
PageFreeList next = getNext();
if (next == null) {
return -1;
}
return next.allocate(pos);
}
int idx = pos - firstAddressed;
if (idx >= 0 && !used.get(idx)) { if (idx >= 0 && !used.get(idx)) {
used.set(pos - firstAddressed); used.set(idx);
store.updateRecord(this, true, data); store.updateRecord(this, true, data);
} }
return pos; return pos;
...@@ -131,7 +93,7 @@ public class PageFreeList extends Record { ...@@ -131,7 +93,7 @@ public class PageFreeList extends Record {
*/ */
void free(int pageId) throws SQLException { void free(int pageId) throws SQLException {
full = false; full = false;
used.clear(pageId - firstAddressed); used.clear(pageId - getPos());
store.updateRecord(this, true, data); store.updateRecord(this, true, data);
} }
...@@ -153,6 +115,7 @@ public class PageFreeList extends Record { ...@@ -153,6 +115,7 @@ public class PageFreeList extends Record {
for (int i = 0; i < pageCount; i += 8) { for (int i = 0; i < pageCount; i += 8) {
used.setByte(i, data.readByte()); used.setByte(i, data.readByte());
} }
full = used.nextClearBit(0) >= pageCount * 8;
} }
public int getByteCount(DataPage dummy) { public int getByteCount(DataPage dummy) {
...@@ -170,4 +133,18 @@ public class PageFreeList extends Record { ...@@ -170,4 +133,18 @@ public class PageFreeList extends Record {
store.writePage(getPos(), data); store.writePage(getPos(), data);
} }
boolean isFull() {
return full;
}
/**
* Get the number of pages that can fit in a free list.
*
* @param pageSize the page size
* @return the number of pages
*/
static int getPagesAddressed(int pageSize) {
return (pageSize - DATA_START) * 8;
}
} }
...@@ -6,49 +6,30 @@ ...@@ -6,49 +6,30 @@
*/ */
package org.h2.store; package org.h2.store;
import java.io.EOFException;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
import java.sql.SQLException; import java.sql.SQLException;
import org.h2.index.Page;
import org.h2.message.Message; import org.h2.message.Message;
import org.h2.message.Trace; import org.h2.message.Trace;
/** /**
* An output stream that writes into a page store. * An input stream that reads from a page store.
* The format is:
* <ul><li>0-3: parent page id
* </li><li>4-4: page type
* </li><li>5-5: stream id
* </li><li>6-9: the next page (if there is one) or length
* </li><li>10-remainder: data
* </li></ul>
*/ */
public class PageInputStream extends InputStream { public class PageInputStream extends InputStream {
/**
* The number of header bytes per stream page.
*/
public static final int OVERHEAD = 10;
private PageStore store; private PageStore store;
private final Trace trace; private final Trace trace;
private int parentPage; private int trunkNext;
private int type; private PageStreamTrunk trunk;
private int streamId = -1; private PageStreamData data;
private int nextPage;
private DataPage page;
private boolean endOfFile; private boolean endOfFile;
private int remaining; private int remaining;
private byte[] buffer = new byte[1]; private byte[] buffer = new byte[1];
public PageInputStream(PageStore store, int parentPage, int headPage, int type) { public PageInputStream(PageStore store, int trunkPage) {
this.store = store; this.store = store;
this.trace = store.getTrace(); this.trace = store.getTrace();
this.parentPage = parentPage; this.trunkNext = trunkPage;
this.type = type;
nextPage = headPage;
page = store.createDataPage();
} }
public int read() throws IOException { public int read() throws IOException {
...@@ -78,53 +59,47 @@ public class PageInputStream extends InputStream { ...@@ -78,53 +59,47 @@ public class PageInputStream extends InputStream {
} }
private int readBlock(byte[] buff, int off, int len) throws IOException { private int readBlock(byte[] buff, int off, int len) throws IOException {
fillBuffer(); try {
if (endOfFile) { fillBuffer();
return -1; if (endOfFile) {
return -1;
}
int l = Math.min(remaining, len);
data.read(buff, off, l);
remaining -= l;
return l;
} catch (SQLException e) {
throw Message.convertToIOException(e);
} }
int l = Math.min(remaining, len);
page.read(buff, off, l);
remaining -= l;
return l;
} }
private void fillBuffer() throws IOException { private void fillBuffer() throws SQLException {
if (remaining > 0 || endOfFile) { if (remaining > 0 || endOfFile) {
return; return;
} }
if (nextPage == 0) { if (trunkNext == 0) {
endOfFile = true; endOfFile = true;
return; return;
} }
page.reset(); if (trunk == null) {
try { trunk = new PageStreamTrunk(store, trunkNext);
store.readPage(nextPage, page); trunk.read();
} catch (SQLException e) {
throw Message.convertToIOException(e);
}
int p = page.readInt();
int t = page.readByte();
int id = page.readByte();
if (streamId == -1) {
// set the stream id on the first page
streamId = id;
} }
boolean last = (t & Page.FLAG_LAST) != 0; int next;
t &= ~Page.FLAG_LAST; while (true) {
if (type != t || p != parentPage || id != streamId) { next = trunk.getNextPage();
throw new EOFException(); if (next >= 0) {
} break;
parentPage = nextPage; }
if (last) { trunk = new PageStreamTrunk(store, trunkNext);
nextPage = 0; trunk.read();
remaining = page.readInt();
} else {
nextPage = page.readInt();
remaining = store.getPageSize() - page.length();
} }
if (trace.isDebugEnabled()) { if (trace.isDebugEnabled()) {
trace.debug("pageIn.readPage " + parentPage + " next:" + nextPage); trace.debug("pageIn.readPage " + next);
} }
data = new PageStreamData(store, next, 0);
data.read();
remaining = data.getLength();
} }
} }
...@@ -59,7 +59,6 @@ public class PageLog { ...@@ -59,7 +59,6 @@ public class PageLog {
public static final int REMOVE = 4; public static final int REMOVE = 4;
private final PageStore store; private final PageStore store;
private int id;
private int pos; private int pos;
private Trace trace; private Trace trace;
...@@ -70,7 +69,6 @@ public class PageLog { ...@@ -70,7 +69,6 @@ public class PageLog {
private DataPage data; private DataPage data;
private long operation; private long operation;
private BitField undo = new BitField(); private BitField undo = new BitField();
private int[] reservedPages = new int[3];
PageLog(PageStore store, int firstPage) { PageLog(PageStore store, int firstPage) {
this.store = store; this.store = store;
...@@ -82,35 +80,20 @@ public class PageLog { ...@@ -82,35 +80,20 @@ public class PageLog {
/** /**
* Open the log for writing. For an existing database, the recovery * Open the log for writing. For an existing database, the recovery
* must be run first. * must be run first.
*
* @param id the log id
*/ */
void openForWriting(int id) throws SQLException { void openForWriting() {
this.id = id; trace.debug("log openForWriting firstPage:" + firstPage);
trace.debug("log openForWriting " + id + " firstPage:" + firstPage); pageOut = new PageOutputStream(store, firstPage);
pageOut = new PageOutputStream(store, 0, firstPage, Page.TYPE_LOG, id, true);
out = new DataOutputStream(pageOut); out = new DataOutputStream(pageOut);
try {
out.writeInt(id);
out.flush();
} catch (IOException e) {
throw Message.convertIOException(e, null);
}
} }
/** /**
* Open the log for reading. This will also read the log id. * Open the log for reading.
*
* @return the log id
*/ */
int openForReading() { void openForReading() {
in = new DataInputStream(new PageInputStream(store, 0, firstPage, Page.TYPE_LOG)); in = new DataInputStream(new PageInputStream(store, firstPage));
try { if (trace.isDebugEnabled()) {
id = in.readInt(); trace.debug("log openForReading firstPage:" + firstPage);
trace.debug("log openForReading " + id + " firstPage:" + firstPage + " id:" + id);
return id;
} catch (IOException e) {
return 0;
} }
} }
...@@ -123,8 +106,9 @@ public class PageLog { ...@@ -123,8 +106,9 @@ public class PageLog {
*/ */
void recover(boolean undo) throws SQLException { void recover(boolean undo) throws SQLException {
if (trace.isDebugEnabled()) { if (trace.isDebugEnabled()) {
trace.debug("log recover " + id + " undo:" + undo); trace.debug("log recover undo:" + undo);
} }
int logId = 0;
DataPage data = store.createDataPage(); DataPage data = store.createDataPage();
try { try {
pos = 0; pos = 0;
...@@ -148,7 +132,7 @@ public class PageLog { ...@@ -148,7 +132,7 @@ public class PageLog {
int tableId = in.readInt(); int tableId = in.readInt();
Row row = readRow(in, data); Row row = readRow(in, data);
if (!undo) { if (!undo) {
if (store.isSessionCommitted(sessionId, id, pos)) { if (store.isSessionCommitted(sessionId, logId, pos)) {
if (trace.isDebugEnabled()) { if (trace.isDebugEnabled()) {
trace.debug("log redo " + (x == ADD ? "+" : "-") + " table:" + tableId + " " + row); trace.debug("log redo " + (x == ADD ? "+" : "-") + " table:" + tableId + " " + row);
} }
...@@ -162,10 +146,10 @@ public class PageLog { ...@@ -162,10 +146,10 @@ public class PageLog {
} else if (x == COMMIT) { } else if (x == COMMIT) {
int sessionId = in.readInt(); int sessionId = in.readInt();
if (trace.isDebugEnabled()) { if (trace.isDebugEnabled()) {
trace.debug("log commit " + sessionId + " id:" + id + " pos:" + pos); trace.debug("log commit " + sessionId + " pos:" + pos);
} }
if (undo) { if (undo) {
store.setLastCommitForSession(sessionId, id, pos); store.setLastCommitForSession(sessionId, logId, pos);
} }
} else { } else {
if (trace.isDebugEnabled()) { if (trace.isDebugEnabled()) {
...@@ -221,7 +205,7 @@ public class PageLog { ...@@ -221,7 +205,7 @@ public class PageLog {
trace.debug("log undo " + pageId); trace.debug("log undo " + pageId);
} }
undo.set(pageId); undo.set(pageId);
reservePages(3); pageOut.prepareWriting(store.getPageSize() * 3);
out.write(UNDO); out.write(UNDO);
out.writeInt(pageId); out.writeInt(pageId);
out.write(page.getBytes(), 0, store.getPageSize()); out.write(page.getBytes(), 0, store.getPageSize());
...@@ -230,19 +214,6 @@ public class PageLog { ...@@ -230,19 +214,6 @@ public class PageLog {
} }
} }
private void reservePages(int pageCount) throws SQLException {
int todoThisIsSlow;
if (pageCount > reservedPages.length) {
reservedPages = new int[pageCount];
}
for (int i = 0; i < pageCount; i++) {
reservedPages[i] = store.allocatePage(true);
}
for (int i = pageCount - 1; i >= 0; i--) {
store.freePage(reservedPages[i], false, null);
}
}
/** /**
* Mark a committed transaction. * Mark a committed transaction.
* *
...@@ -258,7 +229,7 @@ public class PageLog { ...@@ -258,7 +229,7 @@ public class PageLog {
// database already closed // database already closed
return; return;
} }
reservePages(1); pageOut.prepareWriting(store.getPageSize());
out.write(COMMIT); out.write(COMMIT);
out.writeInt(session.getId()); out.writeInt(session.getId());
if (log.getFlushOnEachCommit()) { if (log.getFlushOnEachCommit()) {
...@@ -291,8 +262,7 @@ public class PageLog { ...@@ -291,8 +262,7 @@ public class PageLog {
int todoWriteIntoOutputDirectly; int todoWriteIntoOutputDirectly;
row.write(data); row.write(data);
reservePages(3 + data.length() / (store.getPageSize() - PageInputStream.OVERHEAD)); pageOut.prepareWriting(data.length() + store.getPageSize());
out.write(add ? ADD : REMOVE); out.write(add ? ADD : REMOVE);
out.writeInt(session.getId()); out.writeInt(session.getId());
out.writeInt(tableId); out.writeInt(tableId);
...@@ -309,7 +279,7 @@ public class PageLog { ...@@ -309,7 +279,7 @@ public class PageLog {
*/ */
void close() throws SQLException { void close() throws SQLException {
try { try {
trace.debug("log close " + id); trace.debug("log close");
if (out != null) { if (out != null) {
out.close(); out.close();
} }
...@@ -324,11 +294,11 @@ public class PageLog { ...@@ -324,11 +294,11 @@ public class PageLog {
* *
* @param id the new log id * @param id the new log id
*/ */
private void reopen(int id) throws SQLException { private void reopen() throws SQLException {
try { try {
trace.debug("log reopen"); trace.debug("log reopen");
out.close(); out.close();
openForWriting(id); openForWriting();
flush(); flush();
int todoDeleteOrReUsePages; int todoDeleteOrReUsePages;
} catch (IOException e) { } catch (IOException e) {
...@@ -347,15 +317,6 @@ public class PageLog { ...@@ -347,15 +317,6 @@ public class PageLog {
} }
} }
/**
* Get the log id.
*
* @return the log id
*/
int getId() {
return id;
}
/** /**
* Flush and close the log. * Flush and close the log.
*/ */
......
...@@ -9,50 +9,69 @@ package org.h2.store; ...@@ -9,50 +9,69 @@ package org.h2.store;
import java.io.IOException; import java.io.IOException;
import java.io.OutputStream; import java.io.OutputStream;
import java.sql.SQLException; import java.sql.SQLException;
import org.h2.index.Page;
import org.h2.message.Message; import org.h2.message.Message;
import org.h2.message.Trace; import org.h2.message.Trace;
import org.h2.util.IntArray;
/** /**
* An output stream that writes into a page store. * An output stream that writes into a page store.
*/ */
public class PageOutputStream extends OutputStream { public class PageOutputStream extends OutputStream {
private final Trace trace;
private PageStore store; private PageStore store;
private int type; private final Trace trace;
private int parentPage; private int trunkPageId;
private int pageId; private int trunkNext;
private int nextPage; private IntArray reservedPages = new IntArray();
private DataPage page; private PageStreamTrunk trunk;
private PageStreamData data;
private int reserved;
private int remaining; private int remaining;
private final boolean allocateAtEnd;
private byte[] buffer = new byte[1]; private byte[] buffer = new byte[1];
private boolean needFlush; private boolean needFlush;
private final int streamId;
private boolean writing; private boolean writing;
/** /**
* Create a new page output stream. * Create a new page output stream.
* *
* @param store the page store * @param store the page store
* @param parentPage the parent page id * @param trunkPage the first trunk page (already allocated)
* @param headPage the first page
* @param type the page type
* @param streamId the stream identifier
* @param allocateAtEnd whether new pages should be allocated at the end of
* the file
*/ */
public PageOutputStream(PageStore store, int parentPage, int headPage, int type, int streamId, boolean allocateAtEnd) { public PageOutputStream(PageStore store, int trunkPage) {
this.trace = store.getTrace(); this.trace = store.getTrace();
this.store = store; this.store = store;
this.parentPage = parentPage; this.trunkPageId = trunkPage;
this.pageId = headPage; }
this.type = type;
this.allocateAtEnd = allocateAtEnd; /**
this.streamId = streamId; * Allocate the required pages so that no pages need to be allocated while
page = store.createDataPage(); * writing.
initPage(); *
* @param minBuffer the number of bytes to allocate
*/
void prepareWriting(int minBuffer) throws SQLException {
if (reserved < minBuffer) {
int pageSize = store.getPageSize();
int capacityPerPage = PageStreamData.getCapacity(pageSize);
int pages = PageStreamTrunk.getPagesAddressed(pageSize);
// allocate x data pages
int pagesToAllocate = pages;
// the first trunk page is already allocated
if (reservedPages.size() == 0) {
reservedPages.add(trunkPageId);
}
int totalCapacity = pages * capacityPerPage;
while (totalCapacity < minBuffer) {
pagesToAllocate += pagesToAllocate;
totalCapacity += totalCapacity;
}
// allocate the next trunk page as well
pagesToAllocate++;
for (int i = 0; i < pagesToAllocate; i++) {
int page = store.allocatePage();
reservedPages.add(page);
}
}
} }
public void write(int b) throws IOException { public void write(int b) throws IOException {
...@@ -64,13 +83,27 @@ public class PageOutputStream extends OutputStream { ...@@ -64,13 +83,27 @@ public class PageOutputStream extends OutputStream {
write(b, 0, b.length); write(b, 0, b.length);
} }
private void initPage() { private void initNextData() {
page.reset(); int nextData = trunk == null ? -1 : trunk.getNextPage();
page.writeInt(parentPage); if (nextData < 0) {
page.writeByte((byte) type); int parent = trunkPageId;
page.writeByte((byte) streamId); if (trunkNext == 0) {
page.writeInt(0); trunkPageId = reservedPages.get(0);
remaining = store.getPageSize() - page.length(); reservedPages.remove(0);
} else {
trunkPageId = trunkNext;
}
int len = PageStreamTrunk.getPagesAddressed(store.getPageSize());
int[] pageIds = new int[len];
for (int i = 0; i < len; i++) {
pageIds[i] = reservedPages.get(i);
}
trunkNext = reservedPages.get(len);
trunk = new PageStreamTrunk(store, parent, trunkPageId, trunkNext, pageIds);
reservedPages.removeRange(0, len + 1);
}
data = new PageStreamData(store, trunk.getNextPage(), trunk.getPos());
data.initWrite();
} }
public void write(byte[] b, int off, int len) throws IOException { public void write(byte[] b, int off, int len) throws IOException {
...@@ -78,31 +111,24 @@ public class PageOutputStream extends OutputStream { ...@@ -78,31 +111,24 @@ public class PageOutputStream extends OutputStream {
return; return;
} }
if (writing) { if (writing) {
throw Message.throwInternalError("writing while still writing"); Message.throwInternalError("writing while still writing");
} }
writing = true; writing = true;
try { try {
while (len >= remaining) { while (len >= 0) {
page.write(b, off, remaining); int l = data.write(b, off, len);
off += remaining; if (l <= len) {
len -= remaining; data.write(null);
try { initNextData();
nextPage = store.allocatePage(allocateAtEnd);
} catch (SQLException e) {
throw Message.convertToIOException(e);
} }
page.setPos(4); reserved -= l;
page.writeByte((byte) type); off += l;
page.writeByte((byte) streamId); len -= l;
page.writeInt(nextPage);
storePage();
parentPage = pageId;
pageId = nextPage;
initPage();
} }
page.write(b, off, len);
needFlush = true; needFlush = true;
remaining -= len; remaining -= len;
} catch (SQLException e) {
throw Message.convertToIOException(e);
} finally { } finally {
writing = false; writing = false;
} }
...@@ -111,9 +137,9 @@ public class PageOutputStream extends OutputStream { ...@@ -111,9 +137,9 @@ public class PageOutputStream extends OutputStream {
private void storePage() throws IOException { private void storePage() throws IOException {
try { try {
if (trace.isDebugEnabled()) { if (trace.isDebugEnabled()) {
trace.debug("pageOut.storePage " + pageId + " next:" + nextPage); trace.debug("pageOut.storePage " + data.getPos());
} }
store.writePage(pageId, page); data.write(null);
} catch (SQLException e) { } catch (SQLException e) {
throw Message.convertToIOException(e); throw Message.convertToIOException(e);
} }
...@@ -121,12 +147,6 @@ public class PageOutputStream extends OutputStream { ...@@ -121,12 +147,6 @@ public class PageOutputStream extends OutputStream {
public void flush() throws IOException { public void flush() throws IOException {
if (needFlush) { if (needFlush) {
int len = page.length();
page.setPos(4);
page.writeByte((byte) (type | Page.FLAG_LAST));
page.writeByte((byte) streamId);
page.writeInt(store.getPageSize() - remaining - 9);
page.setPos(len);
storePage(); storePage();
needFlush = false; needFlush = false;
} }
......
/*
* Copyright 2004-2009 H2 Group. Multiple-Licensed under the H2 License,
* Version 1.0, and under the Eclipse Public License, Version 1.0
* (http://h2database.com/html/license.html).
* Initial Developer: H2 Group
*/
package org.h2.store;
import java.sql.SQLException;
import org.h2.constant.ErrorCode;
import org.h2.index.Page;
import org.h2.message.Message;
/**
* A data page of a stream. The format is:
* <ul>
* <li>0-3: the trunk page id</li>
* <li>4-4: page type</li>
* <li>5-8: the number of bytes used</li>
* <li>9-remainder: data</li>
* </ul>
*/
public class PageStreamData extends Record {
private static final int LENGTH_START = 5;
private static final int DATA_START = 9;
private final PageStore store;
private final int trunk;
private DataPage data;
private int remaining;
private int length;
PageStreamData(PageStore store, int pageId, int trunk) {
setPos(pageId);
this.store = store;
this.trunk = trunk;
}
/**
* Read the page from the disk.
*/
void read() throws SQLException {
data = store.createDataPage();
store.readPage(getPos(), data);
int t = data.readByte();
if (t != Page.TYPE_STREAM_DATA) {
throw Message.getSQLException(ErrorCode.FILE_CORRUPTED_1, "pos:" + getPos() + " type:" + t +
" expected type:" + Page.TYPE_STREAM_DATA);
}
length = data.readInt();
}
public int getByteCount(DataPage dummy) {
return store.getPageSize();
}
/**
* Write the header data.
*/
void initWrite() {
data = store.createDataPage();
data.writeInt(trunk);
data.writeByte((byte) Page.TYPE_STREAM_DATA);
data.writeInt(0);
remaining = store.getPageSize() - data.length();
}
/**
* Write the data to the buffer.
*
* @param buff the source data
* @param off the offset in the source buffer
* @param len the number of bytes to write
* @return the number of bytes written
*/
int write(byte[] buff, int offset, int len) {
int max = Math.min(remaining, len);
data.write(buff, offset, max);
length += max;
remaining -= max;
return max;
}
public void write(DataPage buff) throws SQLException {
data.setInt(LENGTH_START, length);
store.writePage(getPos(), data);
}
/**
* Get the number of bytes that fit in a page.
*
* @param pageSize the page size
* @return the number of bytes
*/
static int getCapacity(int pageSize) {
return pageSize - DATA_START;
}
int getLength() {
return length;
}
/**
* Read the next bytes from the buffer.
*
* @param buff the target buffer
* @param off the offset in the target buffer
* @param len the number of bytes to read
*/
void read(byte[] buff, int off, int len) {
data.read(buff, off, len);
}
}
\ No newline at end of file
/*
* Copyright 2004-2009 H2 Group. Multiple-Licensed under the H2 License,
* Version 1.0, and under the Eclipse Public License, Version 1.0
* (http://h2database.com/html/license.html).
* Initial Developer: H2 Group
*/
package org.h2.store;
import java.sql.SQLException;
import org.h2.constant.ErrorCode;
import org.h2.index.Page;
import org.h2.message.Message;
/**
* A trunk page of a stream. It contains the page numbers of the stream, and
* the page number of the next trunk. The format is:
* <ul>
* <li>0-3: the last trunk page, or 0 if none</li>
* <li>4-4: page type</li>
* <li>5-8: the next trunk page</li>
* <li>9-12: the number of pages</li>
* <li>13-remainder: page ids</li>
* </ul>
*/
public class PageStreamTrunk extends Record {
private static final int DATA_START = 13;
private final PageStore store;
private int parent;
private int nextTrunk;
private int[] pageIds;
private int pageCount;
private DataPage data;
private int index;
PageStreamTrunk(PageStore store, int parent, int pageId, int next, int[] pageIds) {
setPos(pageId);
this.parent = parent;
this.store = store;
this.nextTrunk = next;
this.pageCount = pageIds.length;
this.pageIds = pageIds;
}
public PageStreamTrunk(PageStore store, int pageId) {
setPos(pageId);
this.store = store;
}
/**
* Read the page from the disk.
*/
void read() throws SQLException {
data = store.createDataPage();
store.readPage(getPos(), data);
parent = data.readInt();
int t = data.readByte();
if (t != Page.TYPE_STREAM_TRUNK) {
throw Message.getSQLException(ErrorCode.FILE_CORRUPTED_1, "pos:" + getPos() + " type:" + t + " parent:" + parent
+ " expected type:" + Page.TYPE_STREAM_TRUNK);
}
nextTrunk = data.readInt();
pageCount = data.readInt();
for (int i = 0; i < pageCount; i++) {
pageIds[i] = data.readInt();
}
}
void setNextPage(int page) {
pageIds[index++] = page;
}
int getNextPage() {
if (index >= pageIds.length) {
return -1;
}
return pageIds[index++];
}
int getNextTrunk() {
return nextTrunk;
}
public int getByteCount(DataPage dummy) {
return store.getPageSize();
}
public void write(DataPage buff) throws SQLException {
data = store.createDataPage();
data.writeInt(parent);
data.writeByte((byte) Page.TYPE_STREAM_TRUNK);
data.writeInt(nextTrunk);
data.writeInt(pageCount);
for (int i = 0; i < pageCount; i++) {
data.writeInt(pageIds[i]);
}
store.writePage(getPos(), data);
}
/**
* Get the number of pages that can be addressed in a stream trunk page.
*
* @param pageSize the page size
* @return the number of pages
*/
static int getPagesAddressed(int pageSize) {
return (pageSize - DATA_START) / 4;
}
}
...@@ -790,9 +790,11 @@ public class Recover extends Tool implements DataHandler { ...@@ -790,9 +790,11 @@ public class Recover extends Tool implements DataHandler {
case Page.TYPE_FREE_LIST: case Page.TYPE_FREE_LIST:
writer.println("-- page " + page + ": free list " + (last ? "(last)" : "")); writer.println("-- page " + page + ": free list " + (last ? "(last)" : ""));
break; break;
case Page.TYPE_LOG: case Page.TYPE_STREAM_TRUNK:
writer.println("-- page " + page + ": log " + (last ? "(last)" : "")); writer.println("-- page " + page + ": log trunk");
dumpPageLog(writer, s, last); break;
case Page.TYPE_STREAM_DATA:
writer.println("-- page " + page + ": log data");
break; break;
default: default:
writer.println("-- page " + page + ": ERROR unknown type " + type); writer.println("-- page " + page + ": ERROR unknown type " + type);
...@@ -800,9 +802,9 @@ public class Recover extends Tool implements DataHandler { ...@@ -800,9 +802,9 @@ public class Recover extends Tool implements DataHandler {
} }
} }
writeSchema(writer); writeSchema(writer);
for (int i = 0; i < PageStore.LOG_COUNT; i++) { // for (int i = 0; i < PageStore.LOG_COUNT; i++) {
dumpPageLogStream(writer, store, logHead + i, pageSize); // dumpPageLogStream(writer, store, logHead + i, pageSize);
} // }
writer.close(); writer.close();
} catch (Throwable e) { } catch (Throwable e) {
writeError(writer, e); writeError(writer, e);
...@@ -815,7 +817,8 @@ public class Recover extends Tool implements DataHandler { ...@@ -815,7 +817,8 @@ public class Recover extends Tool implements DataHandler {
private void dumpPageLogStream(PrintWriter writer, FileStore store, int logHead, int pageSize) throws IOException, SQLException { private void dumpPageLogStream(PrintWriter writer, FileStore store, int logHead, int pageSize) throws IOException, SQLException {
DataPage s = DataPage.create(this, pageSize); DataPage s = DataPage.create(this, pageSize);
DataInputStream in = new DataInputStream( DataInputStream in = new DataInputStream(
new PageInputStream(writer, this, store, logHead, pageSize, 0, Page.TYPE_LOG) new PageInputStream(writer, this, store, logHead, pageSize, 0,
Page.TYPE_STREAM_TRUNK)
); );
int logId = in.readInt(); int logId = in.readInt();
writer.println("-- log " + logId); writer.println("-- log " + logId);
...@@ -843,7 +846,6 @@ public class Recover extends Tool implements DataHandler { ...@@ -843,7 +846,6 @@ public class Recover extends Tool implements DataHandler {
break; break;
} }
} }
} }
private void setStorage(int storageId) { private void setStorage(int storageId) {
......
...@@ -310,4 +310,20 @@ public class IntArray { ...@@ -310,4 +310,20 @@ public class IntArray {
return buff.append('}').toString(); return buff.append('}').toString();
} }
/**
* Remove a number of elements.
*
* @param fromIndex the index of the first item to remove
* @param toIndex upper bound (exclusive)
*/
public void removeRange(int fromIndex, int toIndex) {
if (SysProperties.CHECK) {
if (fromIndex > toIndex || toIndex >= size) {
throw new ArrayIndexOutOfBoundsException("from=" + fromIndex + " to=" + toIndex + " size=" + size);
}
}
System.arraycopy(data, toIndex, data, fromIndex, size - toIndex);
size -= toIndex - fromIndex;
}
} }
...@@ -28,6 +28,16 @@ public class TestIntArray extends TestBase { ...@@ -28,6 +28,16 @@ public class TestIntArray extends TestBase {
public void test() { public void test() {
testInit(); testInit();
testRandom(); testRandom();
testRemoveRange();
}
private void testRemoveRange() {
IntArray array = new IntArray(new int[] {1, 2, 3, 4, 5});
array.removeRange(1, 3);
assertEquals(3, array.size());
assertEquals(1, array.get(0));
assertEquals(4, array.get(1));
assertEquals(5, array.get(2));
} }
private void testInit() { private void testInit() {
......
...@@ -313,7 +313,7 @@ public class TestPageStore extends TestBase { ...@@ -313,7 +313,7 @@ public class TestPageStore extends TestBase {
if (file) { if (file) {
out = new BufferedOutputStream(new FileOutputStream(f), 4 * 1024); out = new BufferedOutputStream(new FileOutputStream(f), 4 * 1024);
} else { } else {
out = new PageOutputStream(store, 0, head, Page.TYPE_LOG, 0, false); out = new PageOutputStream(store, 0);
} }
for (int i = 0; i < count; i++) { for (int i = 0; i < count; i++) {
out.write(buff); out.write(buff);
...@@ -322,7 +322,7 @@ public class TestPageStore extends TestBase { ...@@ -322,7 +322,7 @@ public class TestPageStore extends TestBase {
if (file) { if (file) {
in = new BufferedInputStream(new FileInputStream(f), 4 * 1024); in = new BufferedInputStream(new FileInputStream(f), 4 * 1024);
} else { } else {
in = new PageInputStream(store, 0, head, Page.TYPE_LOG); in = new PageInputStream(store, 0);
} }
while (true) { while (true) {
int len = in.read(buff); int len = in.read(buff);
...@@ -353,14 +353,14 @@ public class TestPageStore extends TestBase { ...@@ -353,14 +353,14 @@ public class TestPageStore extends TestBase {
byte[] data = new byte[len]; byte[] data = new byte[len];
random.nextBytes(data); random.nextBytes(data);
int head = store.allocatePage(); int head = store.allocatePage();
PageOutputStream out = new PageOutputStream(store, 0, head, Page.TYPE_LOG, 0, false); PageOutputStream out = new PageOutputStream(store, 0);
for (int p = 0; p < len;) { for (int p = 0; p < len;) {
int l = len == 0 ? 0 : Math.min(len - p, random.nextInt(len / 10)); int l = len == 0 ? 0 : Math.min(len - p, random.nextInt(len / 10));
out.write(data, p, l); out.write(data, p, l);
p += l; p += l;
} }
out.close(); out.close();
PageInputStream in = new PageInputStream(store, 0, head, Page.TYPE_LOG); PageInputStream in = new PageInputStream(store, 0);
byte[] data2 = new byte[len]; byte[] data2 = new byte[len];
for (int off = 0;;) { for (int off = 0;;) {
int l = random.nextInt(1 + len / 10) + 1; int l = random.nextInt(1 + len / 10) + 1;
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论