Commit bfa2d6dd authored by Thomas Mueller

Experimental page store.

Parent 89611112
......@@ -3781,7 +3781,7 @@ public class Parser {
columns.add(new Column(cols[i], Value.STRING));
}
int id = database.allocateObjectId(true, true);
recursiveTable = schema.createTable(tempViewName, id, columns, false, true, false, Index.EMPTY_HEAD);
recursiveTable = schema.createTable(tempViewName, id, columns, false, true, false, Index.EMPTY_HEAD, session);
recursiveTable.setTemporary(true);
session.addLocalTempTable(recursiveTable);
String querySQL = StringCache.getNew(sqlCommand.substring(parseIndex));
......
......@@ -240,7 +240,7 @@ public class AlterTableAlterColumn extends SchemaCommand {
// still need a new id because using 0 would mean: the new table tries
// to use the rows of the table 0 (the meta table)
int id = db.allocateObjectId(true, true);
TableData newTable = getSchema().createTable(tempName, id, newColumns, table.isPersistIndexes(), table.isPersistData(), false, Index.EMPTY_HEAD);
TableData newTable = getSchema().createTable(tempName, id, newColumns, table.isPersistIndexes(), table.isPersistData(), false, Index.EMPTY_HEAD, session);
newTable.setComment(table.getComment());
StringBuffer buff = new StringBuffer();
buff.append(newTable.getCreateSQL());
......
......@@ -144,7 +144,7 @@ public class CreateTable extends SchemaCommand {
}
}
int id = getObjectId(true, true);
TableData table = getSchema().createTable(tableName, id, columns, persistIndexes, persistData, clustered, headPos);
TableData table = getSchema().createTable(tableName, id, columns, persistIndexes, persistData, clustered, headPos, session);
table.setComment(comment);
table.setTemporary(temporary);
table.setGlobalTemporary(globalTemporary);
......
......@@ -626,7 +626,7 @@ public class Database implements DataHandler {
if (pageStore != null) {
headPos = pageStore.getSystemTableHeadPos();
}
meta = mainSchema.createTable("SYS", 0, cols, persistent, persistent, false, headPos);
meta = mainSchema.createTable("SYS", 0, cols, persistent, persistent, false, headPos, systemSession);
IndexColumn[] pkCols = IndexColumn.wrap(new Column[] { columnId });
metaIdIndex = meta.addIndex(systemSession, "SYS_ID", 0, pkCols, IndexType.createPrimaryKey(
false, false), Index.EMPTY_HEAD, null);
......
......@@ -35,7 +35,7 @@ public class PageBtreeIndex extends BaseIndex {
private boolean needRebuild;
public PageBtreeIndex(TableData table, int id, String indexName, IndexColumn[] columns,
IndexType indexType, int headPos) throws SQLException {
IndexType indexType, int headPos, Session session) throws SQLException {
initBaseIndex(table, id, indexName, columns, indexType);
int test;
// trace.setLevel(TraceSystem.DEBUG);
......@@ -54,7 +54,7 @@ public class PageBtreeIndex extends BaseIndex {
this.headPos = headPos = store.allocatePage();
PageBtreeLeaf root = new PageBtreeLeaf(this, headPos, Page.ROOT, store.createDataPage());
store.updateRecord(root, true, root.data);
store.addMeta(this);
store.addMeta(this, session);
} else {
this.headPos = headPos;
PageBtree root = getPage(headPos);
......@@ -211,7 +211,7 @@ public class PageBtreeIndex extends BaseIndex {
if (trace.isDebugEnabled()) {
trace.debug("remove");
}
store.removeMeta(this);
store.removeMeta(this, session);
}
public void truncate(Session session) throws SQLException {
......
......@@ -37,7 +37,7 @@ public class PageScanIndex extends BaseIndex implements RowIndex {
private int lastKey;
private long rowCount;
public PageScanIndex(TableData table, int id, IndexColumn[] columns, IndexType indexType, int headPos) throws SQLException {
public PageScanIndex(TableData table, int id, IndexColumn[] columns, IndexType indexType, int headPos, Session session) throws SQLException {
initBaseIndex(table, id, table.getName() + "_TABLE_SCAN", columns, indexType);
int test;
// trace.setLevel(TraceSystem.DEBUG);
......@@ -54,7 +54,7 @@ public class PageScanIndex extends BaseIndex implements RowIndex {
if (headPos == Index.EMPTY_HEAD) {
// new table
this.headPos = headPos = store.allocatePage();
store.addMeta(this);
store.addMeta(this, session);
PageDataLeaf root = new PageDataLeaf(this, headPos, Page.ROOT, store.createDataPage());
store.updateRecord(root, true, root.data);
......@@ -227,7 +227,7 @@ public class PageScanIndex extends BaseIndex implements RowIndex {
if (trace.isDebugEnabled()) {
trace.debug("remove");
}
store.removeMeta(this);
store.removeMeta(this, session);
}
public void truncate(Session session) throws SQLException {
......
......@@ -471,12 +471,12 @@ public class LogSystem {
return;
}
synchronized (database) {
if (closed) {
return;
}
if (pageStore != null) {
pageStore.commit(session);
}
if (closed) {
return;
}
currentLog.commit(session);
session.setAllCommitted();
}
......@@ -490,6 +490,9 @@ public class LogSystem {
return;
}
synchronized (database) {
if (pageStore != null) {
pageStore.flushLog();
}
if (closed) {
return;
}
......
......@@ -46,7 +46,7 @@ public class ResultTempTable implements ResultExternal {
columns.add(column);
int tableId = session.getDatabase().allocateObjectId(true, true);
String tableName = "TEMP_RESULT_SET_" + tableId;
table = schema.createTable(tableName, tableId, columns, false, true, false, Index.EMPTY_HEAD);
table = schema.createTable(tableName, tableId, columns, false, true, false, Index.EMPTY_HEAD, session);
int indexId = session.getDatabase().allocateObjectId(true, false);
IndexColumn indexColumn = new IndexColumn();
indexColumn.column = column;
......@@ -55,7 +55,7 @@ public class ResultTempTable implements ResultExternal {
indexType = IndexType.createPrimaryKey(true, false);
IndexColumn[] indexCols = new IndexColumn[]{indexColumn};
if (SysProperties.PAGE_STORE) {
index = new PageBtreeIndex(table, indexId, tableName, indexCols, indexType, Index.EMPTY_HEAD);
index = new PageBtreeIndex(table, indexId, tableName, indexCols, indexType, Index.EMPTY_HEAD, session);
} else {
index = new BtreeIndex(session, table, indexId, tableName, indexCols, indexType, Index.EMPTY_HEAD);
}
......
......@@ -476,9 +476,9 @@ public class Schema extends DbObjectBase {
* @param headPos the position (page number) of the head
* @return the created {@link TableData} object
*/
public TableData createTable(String tableName, int id, ObjectArray columns, boolean persistIndexes, boolean persistData, boolean clustered, int headPos)
public TableData createTable(String tableName, int id, ObjectArray columns, boolean persistIndexes, boolean persistData, boolean clustered, int headPos, Session session)
throws SQLException {
return new TableData(this, tableName, id, columns, persistIndexes, persistData, clustered, headPos);
return new TableData(this, tableName, id, columns, persistIndexes, persistData, clustered, headPos, session);
}
/**
......
......@@ -6,10 +6,10 @@
*/
package org.h2.store;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.sql.SQLException;
import org.h2.constant.ErrorCode;
import org.h2.index.Page;
import org.h2.message.Message;
import org.h2.message.Trace;
......@@ -19,8 +19,9 @@ import org.h2.message.Trace;
* The format is:
* <ul><li>0-3: parent page id
* </li><li>4-4: page type
* </li><li>5-8: the next page (if there is one) or length
* </li><li>9-remainder: data
* </li><li>5-5: stream id
* </li><li>6-9: the next page (if there is one) or length
* </li><li>10-remainder: data
* </li></ul>
*/
public class PageInputStream extends InputStream {
......@@ -29,6 +30,7 @@ public class PageInputStream extends InputStream {
private final Trace trace;
private int parentPage;
private int type;
private int streamId = -1;
private int nextPage;
private DataPage page;
private boolean endOfFile;
......@@ -92,31 +94,32 @@ public class PageInputStream extends InputStream {
page.reset();
try {
store.readPage(nextPage, page);
int p = page.readInt();
int t = page.readByte();
boolean last = (t & Page.FLAG_LAST) != 0;
t &= ~Page.FLAG_LAST;
if (type != t || p != parentPage) {
int todoNeedBetterWayToDetectEOF;
throw Message.getSQLException(
ErrorCode.FILE_CORRUPTED_1,
"page:" +nextPage+ " type:" + t + " parent:" + p +
" expected type:" + type + " expected parent:" + parentPage);
}
parentPage = nextPage;
if (last) {
nextPage = 0;
remaining = page.readInt();
} else {
nextPage = page.readInt();
remaining = store.getPageSize() - page.length();
}
if (trace.isDebugEnabled()) {
// trace.debug("pageIn.readPage " + parentPage + " next:" + nextPage);
}
} catch (SQLException e) {
throw Message.convertToIOException(e);
}
int p = page.readInt();
int t = page.readByte();
int id = page.readByte();
if (streamId == -1) {
// set the stream id on the first page
streamId = id;
}
boolean last = (t & Page.FLAG_LAST) != 0;
t &= ~Page.FLAG_LAST;
if (type != t || p != parentPage || id != streamId) {
throw new EOFException();
}
parentPage = nextPage;
if (last) {
nextPage = 0;
remaining = page.readInt();
} else {
nextPage = page.readInt();
remaining = store.getPageSize() - page.length();
}
if (trace.isDebugEnabled()) {
// trace.debug("pageIn.readPage " + parentPage + " next:" + nextPage);
}
}
}
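
The readPage changes above follow the stream-page layout documented in the class comment (bytes 0-3 parent page id, byte 4 page type, byte 5 stream id, bytes 6-9 next page or remaining length, 10+ data). A header whose type, parent, or stream id no longer matches is now treated as the end of the stream (EOFException) rather than as a corrupted file. The following is a minimal, self-contained sketch of that check; PageSource and its Page are illustrative stand-ins, not H2's real classes, and FLAG_LAST uses an assumed value:

import java.io.EOFException;
import java.io.IOException;

class StreamPageWalker {

    // Illustrative flag value; H2 defines the real constant in Page.
    static final int FLAG_LAST = 16;

    private final PageSource store;
    private final int expectedType;
    private int parentPage;
    private int nextPage;
    private int streamId = -1; // learned from the first page of the stream

    StreamPageWalker(PageSource store, int expectedType, int headPage) {
        this.store = store;
        this.expectedType = expectedType;
        this.nextPage = headPage;
    }

    // Reads the next page header. A header that does not match the expected
    // type, parent, or stream id means the page was reused by another stream,
    // so it marks the end of this stream instead of file corruption.
    // Returns the number of payload bytes remaining on this page.
    int readNextHeader() throws IOException {
        PageSource.Page page = store.readPage(nextPage);
        int parent = page.readInt();       // bytes 0-3: parent page id
        int type = page.readByte();        // byte 4: page type (+ FLAG_LAST)
        int id = page.readByte();          // byte 5: stream id (new in this commit)
        if (streamId == -1) {
            streamId = id;                 // remember the id from the head page
        }
        boolean last = (type & FLAG_LAST) != 0;
        type &= ~FLAG_LAST;
        if (type != expectedType || parent != parentPage || id != streamId) {
            throw new EOFException();      // end of stream, stop reading
        }
        parentPage = nextPage;
        int nextOrLength = page.readInt(); // bytes 6-9: next page, or length if last
        nextPage = last ? 0 : nextOrLength;
        return last ? nextOrLength : page.remaining();
    }

    // Hypothetical page source used by this sketch only.
    interface PageSource {
        Page readPage(int pageId) throws IOException;

        interface Page {
            int readInt();
            int readByte();
            int remaining();
        }
    }
}
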
......@@ -8,11 +8,12 @@ package org.h2.store;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.sql.SQLException;
import org.h2.engine.Database;
import org.h2.engine.Session;
import org.h2.index.Page;
import org.h2.log.LogSystem;
import org.h2.message.Message;
import org.h2.message.Trace;
import org.h2.result.Row;
......@@ -92,7 +93,7 @@ public class PageLog {
void openForWriting(int id) throws SQLException {
this.id = id;
trace.debug("log openForWriting " + id + " firstPage:" + firstPage);
pageOut = new PageOutputStream(store, 0, firstPage, Page.TYPE_LOG, true);
pageOut = new PageOutputStream(store, 0, firstPage, Page.TYPE_LOG, id, true);
out = new DataOutputStream(pageOut);
try {
out.writeInt(id);
......@@ -159,20 +160,26 @@ public class PageLog {
trace.debug("log redo " + (x == ADD ? "+" : "-") + " table:" + tableId + " " + row);
}
store.redo(tableId, row, x == ADD);
} else {
if (trace.isDebugEnabled()) {
trace.debug("log ignore s:" + sessionId + " " + (x == ADD ? "+" : "-") + " table:" + tableId + " " + row);
}
}
}
} else if (x == COMMIT) {
int sessionId = in.readInt();
if (trace.isDebugEnabled()) {
trace.debug("log commit " + sessionId + " id:" + id + " pos:" + pos);
}
if (undo) {
store.setLastCommitForSession(sessionId, id, pos);
}
}
}
} catch (Exception e) {
e.printStackTrace();
int todoOnlyIOExceptionAndSQLException;
int todoSomeExceptionAreOkSomeNot;
} catch (EOFException e) {
trace.debug("log recovery stopped: " + e.toString());
} catch (IOException e) {
throw Message.convertIOException(e, "recover");
}
}
......@@ -227,15 +234,15 @@ e.printStackTrace();
private void reservePages(int pageCount) throws SQLException {
int testIfRequired;
if (pageCount > reservedPages.length) {
reservedPages = new int[pageCount];
}
for (int i = 0; i < pageCount; i++) {
reservedPages[i] = store.allocatePage();
}
for (int i = 0; i < pageCount; i++) {
store.freePage(reservedPages[i], false, null);
}
// if (pageCount > reservedPages.length) {
// reservedPages = new int[pageCount];
// }
// for (int i = 0; i < pageCount; i++) {
// reservedPages[i] = store.allocatePage();
// }
// for (int i = 0; i < pageCount; i++) {
// store.freePage(reservedPages[i], false, null);
// }
}
/**
......@@ -245,11 +252,18 @@ e.printStackTrace();
*/
void commit(Session session) throws SQLException {
try {
trace.debug("log commit");
if (trace.isDebugEnabled()) {
trace.debug("log commit s:" + session.getId());
}
LogSystem log = store.getDatabase().getLog();
if (log == null) {
// database already closed
return;
}
reservePages(1);
out.write(COMMIT);
out.writeInt(session.getId());
if (store.getDatabase().getLog().getFlushOnEachCommit()) {
if (log.getFlushOnEachCommit()) {
flush();
}
} catch (IOException e) {
......@@ -268,7 +282,7 @@ e.printStackTrace();
void logAddOrRemoveRow(Session session, int tableId, Row row, boolean add) throws SQLException {
try {
if (trace.isDebugEnabled()) {
trace.debug("log " + (add?"+":"-") + " table:" + tableId +
trace.debug("log " + (add?"+":"-") + " s:" + session.getId() + " table:" + tableId +
" row:" + row);
}
int todoLogPosShouldBeLong;
......@@ -329,8 +343,6 @@ e.printStackTrace();
*/
void flush() throws SQLException {
try {
int todoUseLessSpace;
trace.debug("log flush");
out.flush();
} catch (IOException e) {
throw Message.convertIOException(e, null);
......
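
With this commit the log commit record carries the session id (a COMMIT marker byte followed by a 4-byte session id), so recovery can remember the last committed position per session. A rough sketch of that record layout, assuming plain DataOutputStream/DataInputStream framing and an illustrative COMMIT constant rather than H2's actual PageLog encoding:

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

class CommitRecordSketch {

    // Illustrative marker value; H2 defines the real constant in PageLog.
    static final int COMMIT = 1;

    // Writes a commit record: marker byte, then the id of the committing session.
    static byte[] writeCommit(int sessionId) throws IOException {
        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(buf);
        out.write(COMMIT);
        out.writeInt(sessionId);
        out.flush();
        return buf.toByteArray();
    }

    // Reads a commit record back; during recovery this is where the store
    // would note the last committed log position for that session.
    static void readCommit(byte[] record) throws IOException {
        DataInputStream in = new DataInputStream(new ByteArrayInputStream(record));
        if (in.read() == COMMIT) {
            int sessionId = in.readInt();
            System.out.println("commit by session " + sessionId);
        }
    }

    public static void main(String[] args) throws IOException {
        readCommit(writeCommit(42));
    }
}
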
......@@ -29,6 +29,7 @@ public class PageOutputStream extends OutputStream {
private final boolean allocateAtEnd;
private byte[] buffer = new byte[1];
private boolean needFlush;
private final int streamId;
/**
* Create a new page output stream.
......@@ -37,14 +38,18 @@ public class PageOutputStream extends OutputStream {
* @param parentPage the parent page id
* @param headPage the first page
* @param type the page type
* @param streamId the stream identifier
* @param allocateAtEnd whether new pages should be allocated at the end of
* the file
*/
public PageOutputStream(PageStore store, int parentPage, int headPage, int type, boolean allocateAtEnd) {
public PageOutputStream(PageStore store, int parentPage, int headPage, int type, int streamId, boolean allocateAtEnd) {
this.trace = store.getTrace();
this.store = store;
this.parentPage = parentPage;
this.pageId = headPage;
this.type = type;
this.allocateAtEnd = allocateAtEnd;
this.streamId = streamId;
page = store.createDataPage();
initPage();
}
......@@ -62,6 +67,7 @@ public class PageOutputStream extends OutputStream {
page.reset();
page.writeInt(parentPage);
page.writeByte((byte) type);
page.writeByte((byte) streamId);
page.writeInt(0);
remaining = store.getPageSize() - page.length();
}
......@@ -81,6 +87,7 @@ public class PageOutputStream extends OutputStream {
}
page.setPos(4);
page.writeByte((byte) type);
page.writeByte((byte) streamId);
page.writeInt(nextPage);
storePage();
parentPage = pageId;
......@@ -108,6 +115,7 @@ public class PageOutputStream extends OutputStream {
int len = page.length();
page.setPos(4);
page.writeByte((byte) (type | Page.FLAG_LAST));
page.writeByte((byte) streamId);
page.writeInt(store.getPageSize() - remaining - 9);
page.setPos(len);
storePage();
......
......@@ -37,7 +37,6 @@ import org.h2.util.FileUtils;
import org.h2.util.ObjectArray;
import org.h2.util.ObjectUtils;
import org.h2.util.StringUtils;
import org.h2.value.CompareMode;
import org.h2.value.Value;
import org.h2.value.ValueInt;
import org.h2.value.ValueString;
......@@ -60,11 +59,10 @@ import org.h2.value.ValueString;
*/
public class PageStore implements CacheWriter {
// TODO check if PageLog.reservePages is required
// TODO PageDataLeaf and Node: support random delete/add
// TODO PageStore.openMetaIndex (add collation for indexes,
// desc columns support)
// TODO btree index with fixed size values doesn't need offset and so on
// TODO log block allocation
// TODO block compression: maybe http://en.wikipedia.org/wiki/LZJB
......@@ -406,8 +404,7 @@ public class PageStore implements CacheWriter {
}
public void flushLog() throws SQLException {
// TODO write log entries to increase Record.lastLog / lastPos
int todo;
getLog().flush();
}
public Trace getTrace() {
......@@ -794,7 +791,7 @@ public class PageStore implements CacheWriter {
void redo(int tableId, Row row, boolean add) throws SQLException {
if (tableId == META_TABLE_ID) {
if (add) {
addMeta(row);
addMeta(row, database.getSystemSession());
} else {
removeMeta(row);
}
......@@ -822,7 +819,7 @@ public class PageStore implements CacheWriter {
metaSchema = new Schema(database, 0, "", null, true);
int headPos = metaTableRootPageId;
metaTable = new TableData(metaSchema, "PAGE_INDEX",
META_TABLE_ID, cols, true, true, false, headPos);
META_TABLE_ID, cols, true, true, false, headPos, database.getSystemSession());
metaIndex = (PageScanIndex) metaTable.getScanIndex(
database.getSystemSession());
metaObjects = new HashMap();
......@@ -833,7 +830,7 @@ public class PageStore implements CacheWriter {
Cursor cursor = metaIndex.find(database.getSystemSession(), null, null);
while (cursor.next()) {
Row row = cursor.get();
addMeta(row);
addMeta(row, database.getSystemSession());
}
}
......@@ -846,7 +843,7 @@ public class PageStore implements CacheWriter {
}
}
private void addMeta(Row row) throws SQLException {
private void addMeta(Row row, Session session) throws SQLException {
int id = row.getValue(0).getInt();
int type = row.getValue(1).getInt();
int parent = row.getValue(2).getInt();
......@@ -864,8 +861,8 @@ public class PageStore implements CacheWriter {
Column col = new Column("C" + i, Value.INT);
columnArray.add(col);
}
TableData table = new TableData(metaSchema, "T" + id, id, columnArray, true, true, false, headPos);
meta = table.getScanIndex(database.getSystemSession());
TableData table = new TableData(metaSchema, "T" + id, id, columnArray, true, true, false, headPos, session);
meta = table.getScanIndex(session);
} else {
PageScanIndex p = (PageScanIndex) metaObjects.get(ObjectUtils.getInteger(parent));
if (p == null) {
......@@ -878,7 +875,7 @@ public class PageStore implements CacheWriter {
cols[i] = tableCols[Integer.parseInt(columns[i])];
}
IndexColumn[] indexColumns = IndexColumn.wrap(cols);
meta = table.addIndex(database.getSystemSession(), "I" + id, id, indexColumns, indexType, headPos, null);
meta = table.addIndex(session, "I" + id, id, indexColumns, indexType, headPos, null);
}
metaObjects.put(ObjectUtils.getInteger(id), meta);
}
......@@ -888,7 +885,7 @@ public class PageStore implements CacheWriter {
*
* @param index the index to add
*/
public void addMeta(Index index) throws SQLException {
public void addMeta(Index index, Session session) throws SQLException {
int type = index instanceof PageScanIndex ? META_TYPE_SCAN_INDEX : META_TYPE_BTREE_INDEX;
Column[] columns = index.getColumns();
String[] columnIndexes = new String[columns.length];
......@@ -896,10 +893,10 @@ public class PageStore implements CacheWriter {
columnIndexes[i] = String.valueOf(columns[i].getColumnId());
}
String columnList = StringUtils.arrayCombine(columnIndexes, ',');
addMeta(index.getId(), type, index.getTable().getId(), index.getHeadPos(), columnList);
addMeta(index.getId(), type, index.getTable().getId(), index.getHeadPos(), columnList, session);
}
private void addMeta(int id, int type, int parent, int headPos, String columnList) throws SQLException {
private void addMeta(int id, int type, int parent, int headPos, String columnList, Session session) throws SQLException {
Row row = metaTable.getTemplateRow();
row.setValue(0, ValueInt.get(id));
row.setValue(1, ValueInt.get(type));
......@@ -907,7 +904,7 @@ public class PageStore implements CacheWriter {
row.setValue(3, ValueInt.get(headPos));
row.setValue(4, ValueString.get(columnList));
row.setPos(id + 1);
metaIndex.add(database.getSystemSession(), row);
metaIndex.add(session, row);
int assertion;
metaIndex.getRow(database.getSystemSession(), row.getPos());
......@@ -918,8 +915,7 @@ metaIndex.getRow(database.getSystemSession(), row.getPos());
*
* @param index the index to remove
*/
public void removeMeta(Index index) throws SQLException {
Session session = database.getSystemSession();
public void removeMeta(Index index, Session session) throws SQLException {
Row row = metaIndex.getRow(session, index.getId() + 1);
metaIndex.remove(session, row);
}
......
......@@ -62,7 +62,7 @@ public class TableData extends Table implements RecordReader {
private boolean containsLargeObject;
public TableData(Schema schema, String tableName, int id, ObjectArray columns,
boolean persistIndexes, boolean persistData, boolean clustered, int headPos) throws SQLException {
boolean persistIndexes, boolean persistData, boolean clustered, int headPos, Session session) throws SQLException {
super(schema, id, tableName, persistIndexes, persistData);
Column[] cols = new Column[columns.size()];
columns.toArray(cols);
......@@ -70,7 +70,7 @@ public class TableData extends Table implements RecordReader {
this.clustered = clustered;
if (!clustered) {
if (SysProperties.PAGE_STORE && persistData && database.isPersistent()) {
scanIndex = new PageScanIndex(this, id, IndexColumn.wrap(cols), IndexType.createScan(persistData), headPos);
scanIndex = new PageScanIndex(this, id, IndexColumn.wrap(cols), IndexType.createScan(persistData), headPos, session);
} else {
scanIndex = new ScanIndex(this, id, IndexColumn.wrap(cols), IndexType.createScan(persistData));
}
......@@ -179,7 +179,7 @@ public class TableData extends Table implements RecordReader {
Index index;
if (isPersistIndexes() && indexType.getPersistent()) {
if (SysProperties.PAGE_STORE) {
index = new PageBtreeIndex(this, indexId, indexName, cols, indexType, headPos);
index = new PageBtreeIndex(this, indexId, indexName, cols, indexType, headPos, session);
} else {
index = new BtreeIndex(session, this, indexId, indexName, cols, indexType, headPos);
}
......
......@@ -1023,7 +1023,7 @@ public abstract class TestBase {
for (int i = 0; i < list1.size(); i++) {
String s = (String) list1.get(i);
if (!list2.remove(s)) {
fail("not found: " + s);
fail("only found in first: " + s);
}
}
assertEquals(list2.size(), 0);
......
......@@ -247,7 +247,7 @@ public class TestPageStore extends TestBase {
ObjectArray cols = new ObjectArray();
cols.add(new Column("ID", Value.INT));
schema = new Schema(db, 0, "", null, true);
table = new TableData(schema, "PAGE_INDEX", 1, cols, true, true, false, 100);
table = new TableData(schema, "PAGE_INDEX", 1, cols, true, true, false, 100, null);
index = (PageScanIndex) table.getScanIndex(
db.getSystemSession());
}
......@@ -257,7 +257,7 @@ public class TestPageStore extends TestBase {
cols.add(new Column("ID", Value.INT));
schema = new Schema(db, 0, "", null, true);
int id = db.allocateObjectId(true, true);
table = new TableData(schema, "BTREE_INDEX", id, cols, true, true, false, 100);
table = new TableData(schema, "BTREE_INDEX", id, cols, true, true, false, 100, null);
id = db.allocateObjectId(true, true);
table.addIndex(db.getSystemSession(), "BTREE", id,
IndexColumn.wrap(table.getColumns()),
......@@ -314,7 +314,7 @@ public class TestPageStore extends TestBase {
if (file) {
out = new BufferedOutputStream(new FileOutputStream(f), 4 * 1024);
} else {
out = new PageOutputStream(store, 0, head, Page.TYPE_LOG, false);
out = new PageOutputStream(store, 0, head, Page.TYPE_LOG, 0, false);
}
for (int i = 0; i < count; i++) {
out.write(buff);
......@@ -354,7 +354,7 @@ public class TestPageStore extends TestBase {
byte[] data = new byte[len];
random.nextBytes(data);
int head = store.allocatePage();
PageOutputStream out = new PageOutputStream(store, 0, head, Page.TYPE_LOG, false);
PageOutputStream out = new PageOutputStream(store, 0, head, Page.TYPE_LOG, 0, false);
for (int p = 0; p < len;) {
int l = len == 0 ? 0 : Math.min(len - p, random.nextInt(len / 10));
out.write(data, p, l);
......