提交 f5669d6a authored 作者: Thomas Mueller's avatar Thomas Mueller

New experimental page store.

上级 292df4ab
......@@ -18,7 +18,11 @@ Change Log
<h1>Change Log</h1>
<h2>Next Version (unreleased)</h2>
<ul><li>Views with IN(..) that used a view itself did not work.
<ul><li>Linked tables: a workaround for Oracle DATE columns has been implemented.
</li><li>DatabaseMetaData.getPrimaryKeys: The column PK_NAME now contains the
constraint name instead of the index name (compatibility for PostgreSQL and Derby).
</li><li>Using IN(..) inside a IN(SELECT..) did not always work.
</li><li>Views with IN(..) that used a view itself did not work.
</li><li>Union queries with LIMIT or ORDER BY that are used in a view or subquery did not work.
</li><li>The license changed a bit: so far the license was modified to say
'Swiss law'. This is now changed back to the original 'US law'.
......
......@@ -99,6 +99,8 @@ public class Database implements DataHandler {
private final HashMap userDataTypes = new HashMap();
private final HashMap aggregates = new HashMap();
private final HashMap comments = new HashMap();
private IntHashMap tableMap = new IntHashMap();
private final Set userSessions = Collections.synchronizedSet(new HashSet());
private Session exclusiveSession;
private final BitField objectIds = new BitField();
......@@ -452,7 +454,10 @@ public class Database implements DataHandler {
private synchronized void open(int traceLevelFile, int traceLevelSystemOut) throws SQLException {
if (persistent) {
if (SysProperties.PAGE_STORE) {
getPageStore();
PageStore store = getPageStore();
if (!store.isNew()) {
store.getLog().recover(true);
}
}
String dataFileName = databaseName + Constants.SUFFIX_DATA_FILE;
if (FileUtils.exists(dataFileName)) {
......@@ -540,6 +545,7 @@ public class Database implements DataHandler {
headPos = pageStore.getSystemRootPageId();
}
meta = mainSchema.createTable("SYS", 0, cols, persistent, false, headPos);
tableMap.put(0, meta);
IndexColumn[] pkCols = IndexColumn.wrap(new Column[] { columnId });
metaIdIndex = meta.addIndex(systemSession, "SYS_ID", 0, pkCols, IndexType.createPrimaryKey(
false, false), Index.EMPTY_HEAD, null);
......@@ -563,6 +569,12 @@ public class Database implements DataHandler {
MetaRecord rec = (MetaRecord) records.get(i);
rec.execute(this, systemSession, eventListener);
}
if (SysProperties.PAGE_STORE) {
PageStore store = getPageStore();
if (!store.isNew()) {
getPageStore().getLog().recover(false);
}
}
// try to recompile the views that are invalid
recompileInvalidViews(systemSession);
starting = false;
......@@ -793,6 +805,9 @@ public class Database implements DataHandler {
if (id > 0 && !starting) {
addMeta(session, obj);
}
if (obj instanceof TableData) {
tableMap.put(id, obj);
}
}
/**
......@@ -1146,6 +1161,7 @@ public class Database implements DataHandler {
fileIndex = null;
}
if (pageStore != null) {
pageStore.checkpoint();
pageStore.close();
pageStore = null;
}
......@@ -1579,6 +1595,9 @@ public class Database implements DataHandler {
int id = obj.getId();
obj.removeChildrenAndResources(session);
removeMeta(session, id);
if (obj instanceof TableData) {
tableMap.remove(id);
}
}
/**
......@@ -2082,4 +2101,27 @@ public class Database implements DataHandler {
return pageStore;
}
/**
 * Redo a change in a table during the redo phase of log recovery.
 * The change is applied using the system session.
 *
 * @param tableId the object id of the table
 * @param row the row to add or delete
 * @param add true if the record is added, false if deleted
 */
public void redo(int tableId, Row row, boolean add) throws SQLException {
    // NOTE(review): assumes tableId is present in tableMap; a log record
    // for an unknown (e.g. dropped) table would throw a
    // NullPointerException here - TODO confirm this cannot happen
    TableData table = (TableData) tableMap.get(tableId);
    if (add) {
        table.addRow(systemSession, row);
    } else {
        table.removeRow(systemSession, row);
    }
    if (tableId == 0) {
        // table 0 is the SYS meta table: re-execute the DDL statement
        // stored in the row so the in-memory meta objects match
        MetaRecord m = new MetaRecord(row);
        if (add) {
            // reserve the object id so it is not handed out again
            objectIds.set(m.getId());
            m.execute(this, systemSession, eventListener);
        }
        // NOTE(review): the remove case intentionally does nothing here;
        // presumably removal is handled by the executed statements - verify
    }
}
}
......@@ -40,6 +40,8 @@ public class PageScanIndex extends BaseIndex implements RowIndex {
// TODO use an undo log and maybe redo log (for performance)
// TODO file position, content checksums
// TODO completely re-use keys of deleted rows
// TODO remove Database.objectIds
private int lastKey;
private long rowCount;
private long rowCountApproximation;
......@@ -104,8 +106,7 @@ public class PageScanIndex extends BaseIndex implements RowIndex {
root = newRoot;
}
rowCount++;
int todo;
// store.getLog().addRow(headPos, row);
store.getLog().addOrRemoveRow(session, tableData.getId(), row, true);
}
/**
......@@ -184,6 +185,7 @@ public class PageScanIndex extends BaseIndex implements RowIndex {
// lastKey--;
// }
}
store.getLog().addOrRemoveRow(session, tableData.getId(), row, false);
}
public void remove(Session session) throws SQLException {
......@@ -238,7 +240,7 @@ public class PageScanIndex extends BaseIndex implements RowIndex {
}
private void trace(String message) {
int todoSometimeSlow;
int slowEvenIfNotEnabled;
if (headPos != 1) {
// System.out.println(message);
}
......
......@@ -10,10 +10,14 @@ import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.sql.SQLException;
import org.h2.engine.Database;
import org.h2.engine.MetaRecord;
import org.h2.engine.Session;
import org.h2.index.Page;
import org.h2.message.Message;
import org.h2.result.Row;
import org.h2.util.BitField;
import org.h2.value.Value;
/**
* Transaction log mechanism.
......@@ -27,14 +31,20 @@ public class PageLog {
private static final int UNDO = 0;
private static final int COMMIT = 1;
private static final int ADD = 2;
private static final int REMOVE = 3;
private PageStore store;
private BitField undo = new BitField();
private DataOutputStream out;
private int firstPage;
private DataPage data;
private boolean recoveryRunning;
PageLog(PageStore store, int firstPage) {
this.store = store;
this.firstPage = firstPage;
data = store.createDataPage();
}
/**
......@@ -46,12 +56,18 @@ public class PageLog {
}
/**
* Run the recovery process. Uncommitted transactions are rolled back.
* Run the recovery process. There are two recovery stages:
* first only the undo steps are run (restoring the state before the last checkpoint).
* In the second stage the committed operations are re-applied.
*
* @param undo true if the undo step should be run
*/
public void recover() throws SQLException {
public void recover(boolean undo) throws SQLException {
System.out.println("=recover= " + undo);
DataInputStream in = new DataInputStream(new PageInputStream(store, 0, firstPage, Page.TYPE_LOG));
DataPage data = store.createDataPage();
try {
recoveryRunning = true;
while (true) {
int x = in.read();
if (x < 0) {
......@@ -62,16 +78,48 @@ public class PageLog {
int test;
System.out.println("redo " + pageId);
in.read(data.getBytes(), 0, store.getPageSize());
store.writePage(pageId, data);
if (undo) {
store.writePage(pageId, data);
}
} else if (x == ADD || x == REMOVE) {
int sessionId = in.readInt();
int tableId = in.readInt();
Row row = readRow(in);
System.out.println((x == ADD ? " add" : " remove") + (" " + tableId + " " + row));
Database db = store.getDatabase();
if (!undo) {
db.redo(tableId, row, x == ADD);
}
} else if (x == COMMIT) {
}
}
} catch (IOException e) {
int todoSomeExceptionAreOkSomeNot;
e.printStackTrace();
System.out.println("recovery stopped: " + e.toString());
// throw Message.convertIOException(e, "recovering");
} finally {
recoveryRunning = false;
}
int todoDeleteAfterRecovering;
}
/**
 * Read a serialized row from the given input stream. The row bytes are
 * first copied into the temporary data page, then deserialized column
 * by column.
 *
 * @param in the input stream to read from
 * @return the row that was read
 * @throws IOException if reading from the stream fails
 * @throws SQLException if a value can not be deserialized
 */
private Row readRow(DataInputStream in) throws IOException, SQLException {
    int len = in.readInt();
    data.reset();
    data.checkCapacity(len);
    // readFully is required here: InputStream.read(byte[], int, int) may
    // return fewer bytes than requested, which would silently corrupt
    // the row data
    in.readFully(data.getBytes(), 0, len);
    int columnCount = data.readInt();
    Value[] values = new Value[columnCount];
    for (int i = 0; i < columnCount; i++) {
        values[i] = data.readValue();
    }
    // TODO use TableData.readRow so the memory usage of the row is set
    return new Row(values, 0);
}
/**
* Add an undo entry to the log. The page data is only written once until
* the next checkpoint.
......@@ -102,6 +150,8 @@ System.out.println("undo " + pageId);
*/
public void commit(Session session) throws SQLException {
try {
int test;
System.out.println("commit");
out.write(COMMIT);
out.writeInt(session.getId());
} catch (IOException e) {
......@@ -109,4 +159,39 @@ System.out.println("undo " + pageId);
}
}
/**
 * Log that a row was added to, or removed from, a table. Nothing is
 * written while recovery is running, because re-applying logged
 * operations must not log them a second time.
 *
 * @param session the session that made the change
 * @param tableId the object id of the table
 * @param row the row that was added or removed
 * @param add true if the row was added, false if it was removed
 */
public void addOrRemoveRow(Session session, int tableId, Row row, boolean add) throws SQLException {
    try {
        if (recoveryRunning) {
            return;
        }
        out.write(add ? ADD : REMOVE);
        out.writeInt(session.getId());
        out.writeInt(tableId);
        // serialize the row into the temporary data page, then write the
        // length followed by the serialized bytes
        data.reset();
        row.write(data);
        out.writeInt(data.length());
        out.write(data.getBytes(), 0, data.length());
    } catch (IOException e) {
        throw Message.convertIOException(e, null);
    }
}
/**
 * Close the log and re-open it for writing. This is called from
 * PageStore.checkpoint(), after all changed pages have been written
 * back, so the old log entries are no longer needed.
 *
 * @throws SQLException if an I/O error occurs
 */
void reopen() throws SQLException {
    try {
        out.close();
        openForWriting();
        // TODO delete or re-use the pages of the old log
    } catch (IOException e) {
        throw Message.convertIOException(e, null);
    }
}
}
......@@ -121,7 +121,6 @@ public class PageStore implements CacheWriter {
fileLength = file.length();
pageCount = (int) (fileLength / pageSize);
log = new PageLog(this, logRootPageId);
log.recover();
} else {
isNew = true;
setPageSize(PAGE_SIZE_DEFAULT);
......@@ -145,18 +144,19 @@ public class PageStore implements CacheWriter {
}
/**
* Flush all pending changes to disk.
* Flush all pending changes to disk, and re-open the log file.
*/
public void flush() throws SQLException {
public void checkpoint() throws SQLException {
System.out.println("PageStore.checkpoint");
synchronized (database) {
database.checkPowerOff();
writeHeader();
ObjectArray list = cache.getAllChanged();
CacheObject.sort(list);
for (int i = 0; i < list.size(); i++) {
Record rec = (Record) list.get(i);
writeBack(rec);
}
log.reopen();
int todoWriteDeletedPages;
}
}
......@@ -240,12 +240,11 @@ public class PageStore implements CacheWriter {
}
/**
* Close the file.
* Close the file without flushing the cache.
*/
public void close() throws SQLException {
int todoTruncateLog;
try {
flush();
if (file != null) {
file.close();
}
......@@ -265,6 +264,9 @@ public class PageStore implements CacheWriter {
public void writeBack(CacheObject obj) throws SQLException {
synchronized (database) {
Record record = (Record) obj;
int test;
System.out.println("writeBack " + record);
int todoRemoveParameter;
record.write(null);
record.setChanged(false);
}
......@@ -448,4 +450,8 @@ public class PageStore implements CacheWriter {
return log;
}
/**
 * Get the database this page store belongs to. Package-private; used by
 * the transaction log (PageLog) during recovery.
 *
 * @return the database
 */
Database getDatabase() {
    return database;
}
}
......@@ -278,12 +278,16 @@ java org.h2.test.TestAll timer
System.setProperty("h2.check2", "true");
/*
drop table test;
create table test(id int primary key) as select 1;
@LOOP 10 select * from test t where t.id in (select t2.id from test t2 where t2.id in (?, ?));
remove google analytics
JCR: for each node type, create a table; one 'dynamic' table with parameter;
option to cache the results
<link rel="icon" type="image/png" href="/path/image.png">
create a short one page documentation
update roadmap (specially priority 1)
checksum: no need to checksum all data; every 128th byte is enough;
but need position+counter
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论