提交 5270438e authored 作者: Thomas Mueller's avatar Thomas Mueller

New experimental page store.

上级 b10f04ec
......@@ -71,7 +71,7 @@ class PageDataLeaf extends PageData {
offsets = new int[entryCount];
keys = new int[entryCount];
rows = new Row[entryCount];
if (type == (Page.TYPE_DATA_LEAF | Page.FLAG_LAST)) {
if (type == Page.TYPE_DATA_LEAF) {
firstOverflowPageId = data.readInt();
}
for (int i = 0; i < entryCount; i++) {
......@@ -91,9 +91,12 @@ class PageDataLeaf extends PageData {
int addRow(Row row) throws SQLException {
int rowLength = row.getByteCount(data);
int pageSize = index.getPageStore().getPageSize();
// TODO currently the order is important
// TODO and can only add at the end
int last = entryCount == 0 ? pageSize : offsets[entryCount - 1];
if (entryCount > 0 && last - rowLength < start + 6) {
int todoSplitAtLastInsertionPoint;
return (entryCount / 2) + 1;
}
int offset = last - rowLength;
int[] newOffsets = new int[entryCount + 1];
......@@ -112,7 +115,6 @@ class PageDataLeaf extends PageData {
System.arraycopy(keys, x, newKeys, x + 1, entryCount - x);
System.arraycopy(rows, x, newRows, x + 1, entryCount - x);
}
return (entryCount / 2) + 1;
}
entryCount++;
start += 6;
......
......@@ -64,6 +64,7 @@ public class PageFreeList extends Record {
* Read the page from the disk.
*/
void read() throws SQLException {
data.reset();
store.readPage(getPos(), data);
int p = data.readInt();
int t = data.readByte();
......@@ -72,7 +73,7 @@ public class PageFreeList extends Record {
if (t != Page.TYPE_FREE_LIST || p != 0) {
throw Message.getSQLException(
ErrorCode.FILE_CORRUPTED_1,
"type:" + t + " parent:" + p +
"pos:" + getPos() + " type:" + t + " parent:" + p +
" expected type:" + Page.TYPE_FREE_LIST);
}
int size;
......@@ -126,6 +127,7 @@ public class PageFreeList extends Record {
for (int i = 0; i < array.size(); i++) {
data.writeInt(array.get(i));
}
store.writePage(getPos(), data);
}
}
......@@ -43,7 +43,7 @@ public class PageInputStream extends InputStream {
public int read() throws IOException {
byte[] b = new byte[1];
int len = read(b);
return len < 0 ? -1 : b[0];
return len < 0 ? -1 : (b[0] & 255);
}
public int read(byte[] b) throws IOException {
......@@ -64,6 +64,9 @@ public class PageInputStream extends InputStream {
off += r;
len -= r;
}
int test;
if(read==0)
System.out.println("stop");
return read == 0 ? -1 : read;
}
......@@ -96,7 +99,7 @@ public class PageInputStream extends InputStream {
if (type != t || p != parentPage) {
throw Message.getSQLException(
ErrorCode.FILE_CORRUPTED_1,
"type:" + t + " parent:" + p +
"page:" +nextPage+ " type:" + t + " parent:" + p +
" expected type:" + type + " expected parent:" + parentPage);
}
parentPage = nextPage;
......@@ -107,6 +110,8 @@ public class PageInputStream extends InputStream {
nextPage = page.readInt();
remaining = store.getPageSize() - page.length();
}
int test;
System.out.println(" pageIn.read " + page + " next:" + nextPage);
} catch (SQLException e) {
throw Message.convertToIOException(e);
}
......
......@@ -14,6 +14,7 @@ import org.h2.engine.Database;
import org.h2.engine.Session;
import org.h2.index.Page;
import org.h2.message.Message;
import org.h2.message.Trace;
import org.h2.result.Row;
import org.h2.util.BitField;
import org.h2.value.Value;
......@@ -28,13 +29,16 @@ import org.h2.value.Value;
*/
public class PageLog {
private static final int UNDO = 0;
private static final int COMMIT = 1;
private static final int ADD = 2;
private static final int REMOVE = 3;
private static final int NO_OP = 0;
private static final int UNDO = 1;
private static final int COMMIT = 2;
private static final int ADD = 3;
private static final int REMOVE = 4;
private PageStore store;
private Trace trace;
private BitField undo = new BitField();
private PageOutputStream pageOut;
private DataOutputStream out;
private int firstPage;
private DataPage data;
......@@ -44,6 +48,7 @@ public class PageLog {
this.store = store;
this.firstPage = firstPage;
data = store.createDataPage();
trace = store.getTrace();
}
/**
......@@ -51,7 +56,9 @@ public class PageLog {
* must be run first.
*/
void openForWriting() {
out = new DataOutputStream(new PageOutputStream(store, 0, firstPage, Page.TYPE_LOG));
trace.debug("openForWriting");
pageOut = new PageOutputStream(store, 0, firstPage, Page.TYPE_LOG);
out = new DataOutputStream(pageOut);
}
/**
......@@ -62,7 +69,7 @@ public class PageLog {
* @param undo true if the undo step should be run
*/
public void recover(boolean undo) throws SQLException {
System.out.println("=recover= " + undo);
trace.debug("recover");
DataInputStream in = new DataInputStream(new PageInputStream(store, 0, firstPage, Page.TYPE_LOG));
DataPage data = store.createDataPage();
try {
......@@ -72,10 +79,10 @@ System.out.println("=recover= " + undo);
if (x < 0) {
break;
}
if (x == UNDO) {
if (x == NO_OP) {
// nothing to do
} else if (x == UNDO) {
int pageId = in.readInt();
int test;
System.out.println("redo " + pageId);
in.read(data.getBytes(), 0, store.getPageSize());
if (undo) {
store.writePage(pageId, data);
......@@ -84,7 +91,6 @@ System.out.println("redo " + pageId);
int sessionId = in.readInt();
int tableId = in.readInt();
Row row = readRow(in);
System.out.println((x == ADD ? " add" : " remove") + (" " + tableId + " " + row));
Database db = store.getDatabase();
if (!undo) {
db.redo(tableId, row, x == ADD);
......@@ -93,11 +99,11 @@ System.out.println((x == ADD ? " add" : " remove") + (" " + tableId + " " + row)
}
}
} catch (IOException e) {
} catch (Exception e) {
int todoOnlyIOExceptionAndSQLException;
int todoSomeExceptionAreOkSomeNot;
e.printStackTrace();
System.out.println("recovery stopped: " + e.toString());
// throw Message.convertIOException(e, "recovering");
//e.printStackTrace();
trace.debug("recovery stopped: " + e.toString());
} finally {
recoveryRunning = false;
}
......@@ -131,8 +137,6 @@ System.out.println("recovery stopped: " + e.toString());
if (undo.get(pageId)) {
return;
}
int test;
System.out.println("undo " + pageId);
out.write(UNDO);
out.writeInt(pageId);
out.write(page.getBytes(), 0, store.getPageSize());
......@@ -149,8 +153,7 @@ System.out.println("undo " + pageId);
*/
public void commit(Session session) throws SQLException {
try {
int test;
System.out.println("commit");
trace.debug("commit");
out.write(COMMIT);
out.writeInt(session.getId());
} catch (IOException e) {
......@@ -171,8 +174,10 @@ System.out.println("commit");
if (recoveryRunning) {
return;
}
int test;
System.out.println(" " + (add?"+":"-") + " tab:" + tableId + " " + row);
if (trace.isDebugEnabled()) {
trace.debug((add?"+":"-") + " table:" + tableId +
" remaining:" + pageOut.getRemainingBytes() + " row:" + row);
}
out.write(add ? ADD : REMOVE);
out.writeInt(session.getId());
out.writeInt(tableId);
......@@ -198,4 +203,32 @@ System.out.println(" " + (add?"+":"-") + " tab:" + tableId + " " + row);
}
}
/**
* Flush the transaction log.
*/
public void flush() throws SQLException {
try {
trace.debug("flush");
out.flush();
int filler = pageOut.getRemainingBytes();
for (int i = 0; i < filler; i++) {
out.writeByte(NO_OP);
}
} catch (IOException e) {
throw Message.convertIOException(e, null);
}
}
/**
* Flush and close the log.
*/
public void close() throws SQLException {
try {
trace.debug("close");
out.close();
} catch (IOException e) {
throw Message.convertIOException(e, null);
}
}
}
......@@ -83,6 +83,8 @@ public class PageOutputStream extends OutputStream {
private void storePage() throws IOException {
try {
int test;
System.out.println(" pageOut.storePage " + pageId + " next:" + nextPage);
store.writePage(pageId, page);
} catch (SQLException e) {
throw Message.convertToIOException(e);
......@@ -102,6 +104,15 @@ public class PageOutputStream extends OutputStream {
int todo;
}
/**
* Get the number of remaining bytes that fit in the current page.
*
* @return the number of bytes
*/
public int getRemainingBytes() {
return remaining;
}
// public void write(byte[] buff, int off, int len) throws IOException {
// if (len > 0) {
// try {
......
......@@ -246,6 +246,7 @@ System.out.println("PageStore.checkpoint");
int todoTruncateLog;
try {
if (file != null) {
log.close();
file.close();
}
} catch (IOException e) {
......@@ -265,7 +266,7 @@ System.out.println("PageStore.checkpoint");
synchronized (database) {
Record record = (Record) obj;
int test;
System.out.println("writeBack " + record);
System.out.println("writeBack " + record.getPos() + ":" + record);
int todoRemoveParameter;
record.write(null);
record.setChanged(false);
......@@ -380,6 +381,9 @@ System.out.println("writeBack " + record);
* @param page the page
*/
public void readPage(int pos, DataPage page) throws SQLException {
if (pos >= pageCount) {
throw Message.getSQLException(ErrorCode.FILE_CORRUPTED_1, pos + " of " + pageCount);
}
file.seek(pos << pageSizeShift);
file.readFully(page.getBytes(), 0, pageSize);
}
......
......@@ -240,7 +240,11 @@ public class TableData extends Table implements RecordReader {
if (index.getIndexType().getPersistent() && !database.getReadOnly()
&& !database.getLog().containsInDoubtTransactions()) {
// can not save anything in the log file if it contains in-doubt transactions
database.update(session, index);
if (!SysProperties.PAGE_STORE) {
// must not do this when using the page store
// because recovery is not done yet
database.update(session, index);
}
}
}
indexes.add(index);
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论