提交 47c6a50f authored 作者: Thomas Mueller's avatar Thomas Mueller

New experimental page store.

上级 80fcd4a5
......@@ -17,6 +17,7 @@ import java.util.zip.ZipOutputStream;
import org.h2.api.DatabaseEventListener;
import org.h2.command.Prepared;
import org.h2.constant.ErrorCode;
import org.h2.constant.SysProperties;
import org.h2.engine.Constants;
import org.h2.engine.Database;
import org.h2.engine.Session;
......@@ -27,6 +28,7 @@ import org.h2.message.Message;
import org.h2.result.LocalResult;
import org.h2.store.DiskFile;
import org.h2.store.FileLister;
import org.h2.store.PageStore;
import org.h2.util.FileUtils;
import org.h2.util.IOUtils;
import org.h2.util.ObjectArray;
......@@ -71,12 +73,16 @@ public class BackupCommand extends Prepared {
String fn = db.getName() + Constants.SUFFIX_DATA_FILE;
backupDiskFile(out, fn, db.getDataFile());
fn = db.getName() + Constants.SUFFIX_INDEX_FILE;
String base = FileUtils.getParent(fn);
backupDiskFile(out, fn, db.getIndexFile());
if (SysProperties.PAGE_STORE) {
fn = db.getName() + Constants.SUFFIX_PAGE_FILE;
backupPageStore(out, fn, db.getPageStore());
}
ObjectArray list = log.getActiveLogFiles();
int max = list.size();
// synchronize on the database, to avoid concurrent temp file
// creation / deletion / backup
String base = FileUtils.getParent(fn);
synchronized (db.getLobSyncObject()) {
for (int i = 0; i < list.size(); i++) {
LogFile lf = (LogFile) list.get(i);
......@@ -104,6 +110,22 @@ public class BackupCommand extends Prepared {
}
}
private void backupPageStore(ZipOutputStream out, String fileName, PageStore store) throws SQLException, IOException {
Database db = session.getDatabase();
fileName = FileUtils.getFileName(fileName);
out.putNextEntry(new ZipEntry(fileName));
int max = store.getPageCount();
int pos = 0;
while (true) {
pos = store.copyDirect(pos, out);
if (pos < 0) {
break;
}
db.setProgress(DatabaseEventListener.STATE_BACKUP_FILE, fileName, pos, max);
}
out.closeEntry();
}
private void backupDiskFile(ZipOutputStream out, String fileName, DiskFile file) throws SQLException, IOException {
Database db = session.getDatabase();
fileName = FileUtils.getFileName(fileName);
......
......@@ -182,6 +182,10 @@ class PageDataLeaf extends PageData {
* @return the row
*/
Row getRowAt(int at) throws SQLException {
int test;
if(rows == null) {
System.out.println("stop");
}
Row r = rows[at];
if (r == null) {
if (firstOverflowPageId != 0) {
......@@ -262,7 +266,7 @@ class PageDataLeaf extends PageData {
boolean remove(int key) throws SQLException {
int i = find(key);
if (keys[i] != key) {
throw Message.getSQLException(ErrorCode.ROW_NOT_FOUND_WHEN_DELETING_1, index.getSQL());
throw Message.getSQLException(ErrorCode.ROW_NOT_FOUND_WHEN_DELETING_1, index.getSQL() + ": " + key);
}
if (entryCount == 1) {
return true;
......
......@@ -137,20 +137,6 @@ class PageDataNode extends PageData {
}
}
private void removeChild(int x) {
int[] newKeys = new int[entryCount - 1];
int[] newChildPageIds = new int[entryCount];
System.arraycopy(keys, 0, newKeys, 0, x);
System.arraycopy(childPageIds, 0, newChildPageIds, 0, x + 1);
if (x < entryCount) {
System.arraycopy(keys, x + 1, newKeys, x, entryCount - x - 1);
System.arraycopy(childPageIds, x + 1, newChildPageIds, x, entryCount - x);
}
keys = newKeys;
childPageIds = newChildPageIds;
entryCount--;
}
/**
* Initialize the page.
*
......@@ -194,23 +180,6 @@ class PageDataNode extends PageData {
return index.getPage(child).getFirstLeaf();
}
private void removeRow(int i) throws SQLException {
entryCount--;
if (entryCount <= 0) {
Message.throwInternalError();
}
int[] newKeys = new int[entryCount];
int[] newChildPageIds = new int[entryCount + 1];
System.arraycopy(keys, 0, newKeys, 0, Math.min(entryCount, i));
System.arraycopy(childPageIds, 0, newChildPageIds, 0, i);
if (entryCount > i) {
System.arraycopy(keys, i + 1, newKeys, i, entryCount - i);
}
System.arraycopy(childPageIds, i + 1, newChildPageIds, i, entryCount - i + 1);
keys = newKeys;
childPageIds = newChildPageIds;
}
boolean remove(int key) throws SQLException {
int at = find(key);
// merge is not implemented to allow concurrent usage of btrees
......@@ -224,13 +193,11 @@ class PageDataNode extends PageData {
}
// this child is now empty
index.getPageStore().freePage(page.getPageId());
if (entryCount == 0) {
if (entryCount < 1) {
// no more children - this page is empty as well
// it can't be the root otherwise the index would have been
// truncated
return true;
}
removeRow(at);
removeChild(at);
index.getPageStore().updateRecord(this, data);
return false;
}
......@@ -288,4 +255,21 @@ class PageDataNode extends PageData {
index.getPageStore().writePage(getPos(), data);
}
private void removeChild(int i) throws SQLException {
entryCount--;
if (entryCount < 0) {
Message.throwInternalError();
}
int[] newKeys = new int[entryCount];
int[] newChildPageIds = new int[entryCount + 1];
System.arraycopy(keys, 0, newKeys, 0, Math.min(entryCount, i));
System.arraycopy(childPageIds, 0, newChildPageIds, 0, i);
if (entryCount > i) {
System.arraycopy(keys, i + 1, newKeys, i, entryCount - i);
}
System.arraycopy(childPageIds, i + 1, newChildPageIds, i, entryCount - i + 1);
keys = newKeys;
childPageIds = newChildPageIds;
}
}
......@@ -41,6 +41,8 @@ public class PageScanIndex extends BaseIndex implements RowIndex {
// TODO file position, content checksums
// TODO completely re-use keys of deleted rows
// TODO remove Database.objectIds
// TODO detect circles in linked lists
// (input stream, free list, extend pages...)
private int lastKey;
private long rowCount;
......@@ -48,6 +50,7 @@ public class PageScanIndex extends BaseIndex implements RowIndex {
public PageScanIndex(TableData table, int id, IndexColumn[] columns, IndexType indexType, int headPos) throws SQLException {
initBaseIndex(table, id, table.getName() + "_TABLE_SCAN", columns, indexType);
// trace.setLevel(TraceSystem.DEBUG);
if (database.isMultiVersion()) {
int todoMvcc;
}
......@@ -72,7 +75,9 @@ public class PageScanIndex extends BaseIndex implements RowIndex {
int reuseKeysIfManyDeleted;
}
this.headPos = headPos;
trace("open " + rowCount);
if (trace.isDebugEnabled()) {
trace.debug("open " + rowCount);
}
table.setRowCount(rowCount);
}
......@@ -82,14 +87,18 @@ public class PageScanIndex extends BaseIndex implements RowIndex {
public void add(Session session, Row row) throws SQLException {
row.setPos(++lastKey);
trace("add " + row.getPos());
if (trace.isDebugEnabled()) {
trace.debug("add " + row.getPos());
}
while (true) {
PageData root = getPage(headPos);
int splitPoint = root.addRow(row);
if (splitPoint == 0) {
break;
}
trace("split " + splitPoint);
if (trace.isDebugEnabled()) {
trace.debug("split " + splitPoint);
}
int pivot = root.getKey(splitPoint - 1);
PageData page1 = root;
PageData page2 = root.split(splitPoint);
......@@ -147,7 +156,9 @@ public class PageScanIndex extends BaseIndex implements RowIndex {
}
public void close(Session session) throws SQLException {
trace("close");
if (trace.isDebugEnabled()) {
trace.debug("close");
}
int writeRowCount;
}
......@@ -170,10 +181,13 @@ public class PageScanIndex extends BaseIndex implements RowIndex {
}
public void remove(Session session, Row row) throws SQLException {
trace("remove " + row.getPos());
if (trace.isDebugEnabled()) {
trace.debug("remove " + row.getPos());
}
int invalidateRowCount;
// setChanged(session);
if (rowCount == 1) {
int todoMaybeImprove;
truncate(session);
} else {
int key = row.getPos();
......@@ -189,12 +203,16 @@ public class PageScanIndex extends BaseIndex implements RowIndex {
}
public void remove(Session session) throws SQLException {
trace("remove");
if (trace.isDebugEnabled()) {
trace.debug("remove");
}
int todo;
}
public void truncate(Session session) throws SQLException {
trace("truncate");
if (trace.isDebugEnabled()) {
trace.debug("truncate");
}
store.removeRecord(headPos);
int todoLogOldData;
int freePages;
......@@ -239,13 +257,6 @@ public class PageScanIndex extends BaseIndex implements RowIndex {
return null;
}
private void trace(String message) {
int slowEvenIfNotEnabled;
if (headPos != 1) {
// System.out.println(message);
}
}
public int getColumnIndex(Column col) {
// the scan index cannot use any columns
// TODO it can if there is an INT primary key
......
......@@ -94,6 +94,11 @@ public class Trace {
*/
public static final String USER = "user";
/**
* The trace module name for the page store.
*/
public static final String PAGE_STORE = "pageStore";
private TraceWriter traceWriter;
private String module;
private String lineSeparator;
......
......@@ -216,7 +216,9 @@ public class TraceSystem implements TraceWriter {
}
public void write(int level, String module, String s, Throwable t) {
if (level <= levelSystemOut) {
if (level <= levelSystemOut || level > this.level) {
// level <= levelSystemOut: the system out level is set higher
// level > this.level: the level for this module is set higher
System.out.println(format(module, s));
if (t != null && levelSystemOut == DEBUG) {
t.printStackTrace();
......
......@@ -9,6 +9,7 @@ package org.h2.store;
import java.sql.SQLException;
import java.util.ArrayList;
import org.h2.constant.SysProperties;
import org.h2.engine.Constants;
import org.h2.util.FileUtils;
......@@ -29,8 +30,14 @@ public class FileLister {
* @return the database name or null
*/
public static String getDatabaseNameFromFileName(String fileName) {
if (fileName.endsWith(Constants.SUFFIX_DATA_FILE)) {
return fileName.substring(0, fileName.length() - Constants.SUFFIX_DATA_FILE.length());
if (SysProperties.PAGE_STORE) {
if (fileName.endsWith(Constants.SUFFIX_PAGE_FILE)) {
return fileName.substring(0, fileName.length() - Constants.SUFFIX_PAGE_FILE.length());
}
} else {
if (fileName.endsWith(Constants.SUFFIX_DATA_FILE)) {
return fileName.substring(0, fileName.length() - Constants.SUFFIX_DATA_FILE.length());
}
}
return null;
}
......
......@@ -64,9 +64,6 @@ public class PageInputStream extends InputStream {
off += r;
len -= r;
}
int test;
if(read==0)
System.out.println("stop");
return read == 0 ? -1 : read;
}
......@@ -110,8 +107,6 @@ System.out.println("stop");
nextPage = page.readInt();
remaining = store.getPageSize() - page.length();
}
int test;
System.out.println(" pageIn.read " + page + " next:" + nextPage);
} catch (SQLException e) {
throw Message.convertToIOException(e);
}
......
......@@ -56,7 +56,7 @@ public class PageLog {
* must be run first.
*/
void openForWriting() {
trace.debug("openForWriting");
trace.debug("log openForWriting");
pageOut = new PageOutputStream(store, 0, firstPage, Page.TYPE_LOG);
out = new DataOutputStream(pageOut);
}
......@@ -69,7 +69,7 @@ public class PageLog {
* @param undo true if the undo step should be run
*/
public void recover(boolean undo) throws SQLException {
trace.debug("recover");
trace.debug("log recover");
DataInputStream in = new DataInputStream(new PageInputStream(store, 0, firstPage, Page.TYPE_LOG));
DataPage data = store.createDataPage();
try {
......@@ -85,6 +85,9 @@ public class PageLog {
int pageId = in.readInt();
in.read(data.getBytes(), 0, store.getPageSize());
if (undo) {
if (trace.isDebugEnabled()) {
trace.debug("log write " + pageId);
}
store.writePage(pageId, data);
}
} else if (x == ADD || x == REMOVE) {
......@@ -93,6 +96,9 @@ public class PageLog {
Row row = readRow(in);
Database db = store.getDatabase();
if (!undo) {
if (trace.isDebugEnabled()) {
trace.debug("log redo " + (x == ADD ? "+" : "-") + " " + row);
}
db.redo(tableId, row, x == ADD);
}
} else if (x == COMMIT) {
......@@ -102,15 +108,16 @@ public class PageLog {
} catch (Exception e) {
int todoOnlyIOExceptionAndSQLException;
int todoSomeExceptionAreOkSomeNot;
//e.printStackTrace();
trace.debug("recovery stopped: " + e.toString());
trace.debug("log recovery stopped: " + e.toString());
} finally {
recoveryRunning = false;
}
trace.debug("log recover done");
int todoDeleteAfterRecovering;
}
private Row readRow(DataInputStream in) throws IOException, SQLException {
int pos = in.readInt();
int len = in.readInt();
data.reset();
data.checkCapacity(len);
......@@ -122,6 +129,7 @@ public class PageLog {
}
int todoTableDataReadRowWithMemory;
Row row = new Row(values, 0);
row.setPos(pos);
return row;
}
......@@ -153,7 +161,7 @@ public class PageLog {
*/
public void commit(Session session) throws SQLException {
try {
trace.debug("commit");
trace.debug("log commit");
out.write(COMMIT);
out.writeInt(session.getId());
} catch (IOException e) {
......@@ -175,13 +183,15 @@ public class PageLog {
return;
}
if (trace.isDebugEnabled()) {
trace.debug((add?"+":"-") + " table:" + tableId +
trace.debug("log " + (add?"+":"-") + " table:" + tableId +
" remaining:" + pageOut.getRemainingBytes() + " row:" + row);
}
out.write(add ? ADD : REMOVE);
out.writeInt(session.getId());
out.writeInt(tableId);
out.writeInt(row.getPos());
data.reset();
int todoWriteIntoOutputDirectly;
row.write(data);
out.writeInt(data.length());
out.write(data.getBytes(), 0, data.length());
......@@ -195,8 +205,10 @@ public class PageLog {
*/
void reopen() throws SQLException {
try {
trace.debug("log reopen");
out.close();
openForWriting();
flush();
int todoDeleteOrReUsePages;
} catch (IOException e) {
throw Message.convertIOException(e, null);
......@@ -208,12 +220,13 @@ public class PageLog {
*/
public void flush() throws SQLException {
try {
trace.debug("flush");
trace.debug("log flush");
out.flush();
int filler = pageOut.getRemainingBytes();
for (int i = 0; i < filler; i++) {
out.writeByte(NO_OP);
}
out.flush();
} catch (IOException e) {
throw Message.convertIOException(e, null);
}
......@@ -222,13 +235,13 @@ public class PageLog {
/**
* Flush and close the log.
*/
public void close() throws SQLException {
try {
trace.debug("close");
out.close();
} catch (IOException e) {
throw Message.convertIOException(e, null);
}
}
// public void close() throws SQLException {
// try {
// trace.debug("log close");
// out.close();
// } catch (IOException e) {
// throw Message.convertIOException(e, null);
// }
// }
}
......@@ -11,12 +11,14 @@ import java.io.OutputStream;
import java.sql.SQLException;
import org.h2.index.Page;
import org.h2.message.Message;
import org.h2.message.Trace;
/**
* An output stream that writes into a page store.
*/
public class PageOutputStream extends OutputStream {
private final Trace trace;
private PageStore store;
private int parentPage;
private int type;
......@@ -34,6 +36,7 @@ public class PageOutputStream extends OutputStream {
* @param type the page type
*/
public PageOutputStream(PageStore store, int parentPage, int headPage, int type) {
this.trace = store.getTrace();
this.store = store;
this.parentPage = parentPage;
this.nextPage = headPage;
......@@ -43,6 +46,7 @@ public class PageOutputStream extends OutputStream {
}
public void write(int b) throws IOException {
int todoOptimizeIfNeeded;
write(new byte[] { (byte) b });
}
......@@ -62,7 +66,7 @@ public class PageOutputStream extends OutputStream {
if (len <= 0) {
return;
}
while (len > remaining) {
while (len >= remaining) {
page.write(b, off, remaining);
off += remaining;
len -= remaining;
......@@ -83,8 +87,9 @@ public class PageOutputStream extends OutputStream {
private void storePage() throws IOException {
try {
int test;
System.out.println(" pageOut.storePage " + pageId + " next:" + nextPage);
if (trace.isDebugEnabled()) {
trace.debug("pageOut.storePage " + pageId + " next:" + nextPage);
}
store.writePage(pageId, page);
} catch (SQLException e) {
throw Message.convertToIOException(e);
......@@ -113,33 +118,4 @@ System.out.println(" pageOut.storePage " + pageId + " next:" + nextPage);
return remaining;
}
// public void write(byte[] buff, int off, int len) throws IOException {
// if (len > 0) {
// try {
// page.reset();
// if (compress != null) {
// if (off != 0 || len != buff.length) {
// byte[] b2 = new byte[len];
// System.arraycopy(buff, off, b2, 0, len);
// buff = b2;
// off = 0;
// }
// int uncompressed = len;
// buff = compress.compress(buff, compressionAlgorithm);
// len = buff.length;
// page.writeInt(len);
// page.writeInt(uncompressed);
// page.write(buff, off, len);
// } else {
// page.writeInt(len);
// page.write(buff, off, len);
// }
// page.fillAligned();
// store.write(page.getBytes(), 0, page.length());
// } catch (SQLException e) {
// throw Message.convertToIOException(e);
// }
// }
// }
}
......@@ -7,11 +7,14 @@
package org.h2.store;
import java.io.IOException;
import java.io.OutputStream;
import java.sql.SQLException;
import org.h2.constant.ErrorCode;
import org.h2.engine.Database;
import org.h2.index.Page;
import org.h2.message.Message;
import org.h2.message.Trace;
import org.h2.message.TraceSystem;
import org.h2.util.Cache;
import org.h2.util.Cache2Q;
import org.h2.util.CacheLRU;
......@@ -45,6 +48,7 @@ public class PageStore implements CacheWriter {
private static final int WRITE_VERSION = 0;
private Database database;
private final Trace trace;
private String fileName;
private FileStore file;
private String accessMode;
......@@ -99,6 +103,9 @@ public class PageStore implements CacheWriter {
*/
public PageStore(Database database, String fileName, String accessMode, int cacheSizeDefault) {
this.database = database;
trace = database.getTrace(Trace.PAGE_STORE);
int test;
trace.setLevel(TraceSystem.DEBUG);
this.fileName = fileName;
this.accessMode = accessMode;
this.cacheSize = cacheSizeDefault;
......@@ -110,6 +117,30 @@ public class PageStore implements CacheWriter {
}
}
/**
* Copy the next page to the output stream.
*
* @param pageId the page to copy
* @param out the output stream
* @return the new position, or -1 if there is no more data to copy
*/
public int copyDirect(int pageId, OutputStream out) throws SQLException {
synchronized (database) {
byte[] buffer = new byte[pageSize];
try {
if (pageId >= pageCount) {
return -1;
}
file.seek(pageId * pageSize);
file.readFullyDirect(buffer, 0, pageSize);
out.write(buffer, 0, pageSize);
return pageId + 1;
} catch (IOException e) {
throw Message.convertIOException(e, fileName);
}
}
}
/**
* Open the file and read the header.
*/
......@@ -121,6 +152,16 @@ public class PageStore implements CacheWriter {
fileLength = file.length();
pageCount = (int) (fileLength / pageSize);
log = new PageLog(this, logRootPageId);
lastUsedPage = pageCount - 1;
while (true) {
DataPage page = readPage(lastUsedPage);
page.readInt();
int type = page.readByte();
if (type != Page.TYPE_EMPTY) {
break;
}
lastUsedPage--;
}
} else {
isNew = true;
setPageSize(PAGE_SIZE_DEFAULT);
......@@ -147,7 +188,7 @@ public class PageStore implements CacheWriter {
* Flush all pending changes to disk, and re-open the log file.
*/
public void checkpoint() throws SQLException {
System.out.println("PageStore.checkpoint");
trace.debug("checkpoint");
synchronized (database) {
database.checkPowerOff();
ObjectArray list = cache.getAllChanged();
......@@ -156,9 +197,12 @@ System.out.println("PageStore.checkpoint");
Record rec = (Record) list.get(i);
writeBack(rec);
}
int todoFlushBeforeReopen;
log.reopen();
int todoWriteDeletedPages;
}
pageCount = lastUsedPage + 1;
file.setLength(pageSize * pageCount);
}
private void readHeader() throws SQLException {
......@@ -240,15 +284,15 @@ System.out.println("PageStore.checkpoint");
}
/**
* Close the file without flushing the cache.
* Close the file without writing anything.
*/
public void close() throws SQLException {
int todoTruncateLog;
try {
trace.debug("close");
if (file != null) {
log.close();
file.close();
}
file = null;
} catch (IOException e) {
throw Message.convertIOException(e, "close");
}
......@@ -259,14 +303,15 @@ System.out.println("PageStore.checkpoint");
}
public Trace getTrace() {
return database.getTrace(Trace.DATABASE);
return trace;
}
public void writeBack(CacheObject obj) throws SQLException {
synchronized (database) {
Record record = (Record) obj;
int test;
System.out.println("writeBack " + record.getPos() + ":" + record);
if (trace.isDebugEnabled()) {
trace.debug("writeBack " + record.getPos() + ":" + record);
}
int todoRemoveParameter;
record.write(null);
record.setChanged(false);
......@@ -281,6 +326,9 @@ System.out.println("writeBack " + record.getPos() + ":" + record);
*/
public void updateRecord(Record record, DataPage old) throws SQLException {
int todoLogHeaderPageAsWell;
if (trace.isDebugEnabled()) {
trace.debug("updateRecord " + record.getPos() + " " + record.toString());
}
synchronized (database) {
record.setChanged(true);
int pos = record.getPos();
......@@ -331,6 +379,9 @@ System.out.println("writeBack " + record.getPos() + ":" + record);
* @param pageId the page id
*/
public void freePage(int pageId) throws SQLException {
if (trace.isDebugEnabled()) {
trace.debug("freePage " + pageId);
}
freePageCount++;
PageFreeList free;
cache.remove(pageId);
......
......@@ -173,7 +173,7 @@ public class Restore extends Tool {
boolean copy = false;
if (db == null) {
copy = true;
} else if (fileName.startsWith(originalDbName)) {
} else if (fileName.startsWith(originalDbName + ".")) {
fileName = db + fileName.substring(originalDbName.length());
copy = true;
}
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论