提交 a67837cb authored 作者: Thomas Mueller's avatar Thomas Mueller

New experimental page store.

上级 9d9ca196
...@@ -2366,4 +2366,13 @@ public class Database implements DataHandler { ...@@ -2366,4 +2366,13 @@ public class Database implements DataHandler {
return true; return true;
} }
/**
* Switch the database to read-only mode.
*
* @param readOnly the new value
*/
public void setReadOnly(boolean readOnly) {
this.readOnly = readOnly;
}
} }
...@@ -370,4 +370,8 @@ public class PageBtreeIndex extends BaseIndex { ...@@ -370,4 +370,8 @@ public class PageBtreeIndex extends BaseIndex {
return rowsize; return rowsize;
} }
public boolean canFindNext() {
return true;
}
} }
...@@ -8,6 +8,7 @@ package org.h2.index; ...@@ -8,6 +8,7 @@ package org.h2.index;
import java.sql.SQLException; import java.sql.SQLException;
import org.h2.engine.Session;
import org.h2.result.Row; import org.h2.result.Row;
import org.h2.store.DataPage; import org.h2.store.DataPage;
import org.h2.store.Record; import org.h2.store.Record;
...@@ -107,9 +108,10 @@ abstract class PageData extends Record { ...@@ -107,9 +108,10 @@ abstract class PageData extends Record {
/** /**
* Get a cursor. * Get a cursor.
* *
* @param session the session
* @return the cursor * @return the cursor
*/ */
abstract Cursor find() throws SQLException; abstract Cursor find(Session session) throws SQLException;
/** /**
* Get the key at this position. * Get the key at this position.
......
...@@ -8,6 +8,7 @@ package org.h2.index; ...@@ -8,6 +8,7 @@ package org.h2.index;
import java.sql.SQLException; import java.sql.SQLException;
import org.h2.constant.ErrorCode; import org.h2.constant.ErrorCode;
import org.h2.engine.Session;
import org.h2.message.Message; import org.h2.message.Message;
import org.h2.result.Row; import org.h2.result.Row;
import org.h2.store.DataPage; import org.h2.store.DataPage;
...@@ -193,8 +194,8 @@ class PageDataLeaf extends PageData { ...@@ -193,8 +194,8 @@ class PageDataLeaf extends PageData {
rows = newRows; rows = newRows;
} }
Cursor find() { Cursor find(Session session) {
return new PageScanCursor(this, 0); return new PageScanCursor(session, this, 0, index.isMultiVersion);
} }
/** /**
......
...@@ -8,6 +8,7 @@ package org.h2.index; ...@@ -8,6 +8,7 @@ package org.h2.index;
import java.sql.SQLException; import java.sql.SQLException;
import org.h2.engine.Session;
import org.h2.message.Message; import org.h2.message.Message;
import org.h2.result.Row; import org.h2.result.Row;
import org.h2.store.DataPage; import org.h2.store.DataPage;
...@@ -110,9 +111,9 @@ class PageDataNode extends PageData { ...@@ -110,9 +111,9 @@ class PageDataNode extends PageData {
} }
} }
Cursor find() throws SQLException { Cursor find(Session session) throws SQLException {
int child = childPageIds[0]; int child = childPageIds[0];
return index.getPage(child).find(); return index.getPage(child).find(session);
} }
PageData split(int splitPoint) throws SQLException { PageData split(int splitPoint) throws SQLException {
......
...@@ -7,7 +7,10 @@ ...@@ -7,7 +7,10 @@
package org.h2.index; package org.h2.index;
import java.sql.SQLException; import java.sql.SQLException;
import java.util.Iterator;
import org.h2.engine.Session;
import org.h2.message.Message;
import org.h2.result.Row; import org.h2.result.Row;
import org.h2.result.SearchRow; import org.h2.result.SearchRow;
...@@ -17,12 +20,20 @@ import org.h2.result.SearchRow; ...@@ -17,12 +20,20 @@ import org.h2.result.SearchRow;
class PageScanCursor implements Cursor { class PageScanCursor implements Cursor {
private PageDataLeaf current; private PageDataLeaf current;
private int index; private int idx;
private Row row; private Row row;
private final boolean multiVersion;
private final Session session;
private Iterator<Row> delta;
PageScanCursor(PageDataLeaf current, int index) { PageScanCursor(Session session, PageDataLeaf current, int idx, boolean multiVersion) {
this.current = current; this.current = current;
this.index = index; this.idx = idx;
this.multiVersion = multiVersion;
this.session = session;
if (multiVersion) {
delta = current.index.getDelta();
}
} }
public Row get() { public Row get() {
...@@ -38,22 +49,47 @@ class PageScanCursor implements Cursor { ...@@ -38,22 +49,47 @@ class PageScanCursor implements Cursor {
} }
public boolean next() throws SQLException { public boolean next() throws SQLException {
if (index >= current.getEntryCount()) { if (!multiVersion) {
return nextRow();
}
while (true) {
if (delta != null) {
if (!delta.hasNext()) {
delta = null;
row = null;
continue;
}
row = delta.next();
if (!row.getDeleted() || row.getSessionId() == session.getId()) {
continue;
}
} else {
nextRow();
if (row != null && row.getSessionId() != 0 && row.getSessionId() != session.getId()) {
continue;
}
}
break;
}
return row != null;
}
private boolean nextRow() throws SQLException {
if (idx >= current.getEntryCount()) {
current = current.getNextPage(); current = current.getNextPage();
index = 0; idx = 0;
if (current == null) { if (current == null) {
row = null;
return false; return false;
} }
} }
row = current.getRowAt(index); row = current.getRowAt(idx);
index++; idx++;
return true; return true;
} }
public boolean previous() { public boolean previous() {
index--; throw Message.throwInternalError();
int todo;
return true;
} }
} }
...@@ -7,10 +7,16 @@ ...@@ -7,10 +7,16 @@
package org.h2.index; package org.h2.index;
import java.sql.SQLException; import java.sql.SQLException;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import org.h2.constant.ErrorCode; import org.h2.constant.ErrorCode;
import org.h2.engine.Constants; import org.h2.engine.Constants;
import org.h2.engine.Session; import org.h2.engine.Session;
import org.h2.log.UndoLogRecord;
import org.h2.message.Message; import org.h2.message.Message;
import org.h2.message.TraceSystem; import org.h2.message.TraceSystem;
import org.h2.result.Row; import org.h2.result.Row;
...@@ -21,6 +27,7 @@ import org.h2.store.Record; ...@@ -21,6 +27,7 @@ import org.h2.store.Record;
import org.h2.table.Column; import org.h2.table.Column;
import org.h2.table.IndexColumn; import org.h2.table.IndexColumn;
import org.h2.table.TableData; import org.h2.table.TableData;
import org.h2.util.New;
import org.h2.value.Value; import org.h2.value.Value;
import org.h2.value.ValueLob; import org.h2.value.ValueLob;
...@@ -36,13 +43,17 @@ public class PageScanIndex extends BaseIndex implements RowIndex { ...@@ -36,13 +43,17 @@ public class PageScanIndex extends BaseIndex implements RowIndex {
private final int headPos; private final int headPos;
private int lastKey; private int lastKey;
private long rowCount; private long rowCount;
private HashSet<Row> delta;
private int rowCountDiff;
private HashMap<Integer, Integer> sessionRowCount;
public PageScanIndex(TableData table, int id, IndexColumn[] columns, IndexType indexType, int headPos, Session session) throws SQLException { public PageScanIndex(TableData table, int id, IndexColumn[] columns, IndexType indexType, int headPos, Session session) throws SQLException {
initBaseIndex(table, id, table.getName() + "_TABLE_SCAN", columns, indexType); initBaseIndex(table, id, table.getName() + "_TABLE_SCAN", columns, indexType);
int test; int test;
// trace.setLevel(TraceSystem.DEBUG); // trace.setLevel(TraceSystem.DEBUG);
if (database.isMultiVersion()) { if (database.isMultiVersion()) {
int todoMvcc; sessionRowCount = New.hashMap();
isMultiVersion = true;
} }
tableData = table; tableData = table;
this.store = database.getPageStore(); this.store = database.getPageStore();
...@@ -130,6 +141,16 @@ public class PageScanIndex extends BaseIndex implements RowIndex { ...@@ -130,6 +141,16 @@ public class PageScanIndex extends BaseIndex implements RowIndex {
store.updateRecord(newRoot, true, null); store.updateRecord(newRoot, true, null);
root = newRoot; root = newRoot;
} }
if (database.isMultiVersion()) {
if (delta == null) {
delta = New.hashSet();
}
boolean wasDeleted = delta.remove(row);
if (!wasDeleted) {
delta.add(row);
}
incrementRowCount(session.getId(), 1);
}
rowCount++; rowCount++;
store.logAddOrRemoveRow(session, tableData.getId(), row, true); store.logAddOrRemoveRow(session, tableData.getId(), row, true);
} }
...@@ -143,10 +164,6 @@ public class PageScanIndex extends BaseIndex implements RowIndex { ...@@ -143,10 +164,6 @@ public class PageScanIndex extends BaseIndex implements RowIndex {
PageData getPage(int id) throws SQLException { PageData getPage(int id) throws SQLException {
Record rec = store.getRecord(id); Record rec = store.getRecord(id);
if (rec != null) { if (rec != null) {
if (rec instanceof PageDataLeafOverflow) {
int test;
System.out.println("stop");
}
return (PageData) rec; return (PageData) rec;
} }
DataPage data = store.readPage(id); DataPage data = store.readPage(id);
...@@ -177,7 +194,7 @@ public class PageScanIndex extends BaseIndex implements RowIndex { ...@@ -177,7 +194,7 @@ public class PageScanIndex extends BaseIndex implements RowIndex {
public Cursor find(Session session, SearchRow first, SearchRow last) throws SQLException { public Cursor find(Session session, SearchRow first, SearchRow last) throws SQLException {
PageData root = getPage(headPos); PageData root = getPage(headPos);
return root.find(); return root.find(session);
} }
public Cursor findFirstOrLast(Session session, boolean first) throws SQLException { public Cursor findFirstOrLast(Session session, boolean first) throws SQLException {
...@@ -220,6 +237,18 @@ public class PageScanIndex extends BaseIndex implements RowIndex { ...@@ -220,6 +237,18 @@ public class PageScanIndex extends BaseIndex implements RowIndex {
// lastKey--; // lastKey--;
// } // }
} }
if (database.isMultiVersion()) {
// if storage is null, the delete flag is not yet set
row.setDeleted(true);
if (delta == null) {
delta = New.hashSet();
}
boolean wasAdded = delta.remove(row);
if (!wasAdded) {
delta.add(row);
}
incrementRowCount(session.getId(), -1);
}
store.logAddOrRemoveRow(session, tableData.getId(), row, false); store.logAddOrRemoveRow(session, tableData.getId(), row, false);
} }
...@@ -238,6 +267,9 @@ public class PageScanIndex extends BaseIndex implements RowIndex { ...@@ -238,6 +267,9 @@ public class PageScanIndex extends BaseIndex implements RowIndex {
if (tableData.getContainsLargeObject() && tableData.isPersistData()) { if (tableData.getContainsLargeObject() && tableData.isPersistData()) {
ValueLob.removeAllForTable(database, table.getId()); ValueLob.removeAllForTable(database, table.getId());
} }
if (database.isMultiVersion()) {
sessionRowCount.clear();
}
tableData.setRowCount(0); tableData.setRowCount(0);
} }
...@@ -279,6 +311,13 @@ public class PageScanIndex extends BaseIndex implements RowIndex { ...@@ -279,6 +311,13 @@ public class PageScanIndex extends BaseIndex implements RowIndex {
} }
public long getRowCount(Session session) { public long getRowCount(Session session) {
if (database.isMultiVersion()) {
Integer i = sessionRowCount.get(session.getId());
long count = i == null ? 0 : i.intValue();
count += rowCount;
count -= rowCountDiff;
return count;
}
return rowCount; return rowCount;
} }
...@@ -301,4 +340,31 @@ public class PageScanIndex extends BaseIndex implements RowIndex { ...@@ -301,4 +340,31 @@ public class PageScanIndex extends BaseIndex implements RowIndex {
int writeRowCount; int writeRowCount;
} }
Iterator<Row> getDelta() {
if (delta == null) {
List<Row> e = Collections.emptyList();
return e.iterator();
}
return delta.iterator();
}
private void incrementRowCount(int sessionId, int count) {
if (database.isMultiVersion()) {
Integer id = sessionId;
Integer c = sessionRowCount.get(id);
int current = c == null ? 0 : c.intValue();
sessionRowCount.put(id, current + count);
rowCountDiff += count;
}
}
public void commit(int operation, Row row) {
if (database.isMultiVersion()) {
if (delta != null) {
delta.remove(row);
}
incrementRowCount(row.getSessionId(), operation == UndoLogRecord.DELETE ? 1 : -1);
}
}
} }
...@@ -280,7 +280,7 @@ public class ScanIndex extends BaseIndex implements RowIndex { ...@@ -280,7 +280,7 @@ public class ScanIndex extends BaseIndex implements RowIndex {
throw Message.getUnsupportedException("SCAN"); throw Message.getUnsupportedException("SCAN");
} }
public Iterator<Row> getDelta() { Iterator<Row> getDelta() {
if (delta == null) { if (delta == null) {
List<Row> e = Collections.emptyList(); List<Row> e = Collections.emptyList();
return e.iterator(); return e.iterator();
......
...@@ -9,6 +9,7 @@ package org.h2.log; ...@@ -9,6 +9,7 @@ package org.h2.log;
import java.sql.SQLException; import java.sql.SQLException;
import org.h2.message.Message; import org.h2.message.Message;
import org.h2.store.PageStore;
/** /**
* Represents an in-doubt transaction (a transaction in the prepare phase). * Represents an in-doubt transaction (a transaction in the prepare phase).
...@@ -33,14 +34,26 @@ public class InDoubtTransaction { ...@@ -33,14 +34,26 @@ public class InDoubtTransaction {
// TODO 2-phase-commit: document sql statements and metadata table // TODO 2-phase-commit: document sql statements and metadata table
private LogFile log; private final PageStore store;
private int sessionId; private final LogFile log;
private int pos; private final int sessionId;
private String transaction; private final int pos;
private int blocks; private final String transaction;
private final int blocks;
private int state; private int state;
InDoubtTransaction(LogFile log, int sessionId, int pos, String transaction, int blocks) { /**
* Create a new in-doubt transaction info object.
*
* @param store the page store
* @param log the log file
* @param sessionId the session id
* @param pos the position
* @param transaction the transaction name
* @param blocks the number of blocks the 'prepare commit' entry occupies
*/
public InDoubtTransaction(PageStore store, LogFile log, int sessionId, int pos, String transaction, int blocks) {
this.store = store;
this.log = log; this.log = log;
this.sessionId = sessionId; this.sessionId = sessionId;
this.pos = pos; this.pos = pos;
...@@ -58,10 +71,18 @@ public class InDoubtTransaction { ...@@ -58,10 +71,18 @@ public class InDoubtTransaction {
public void setState(int state) throws SQLException { public void setState(int state) throws SQLException {
switch(state) { switch(state) {
case COMMIT: case COMMIT:
log.updatePreparedCommit(true, pos, sessionId, blocks); if (store != null) {
store.setInDoubtTransactionState(sessionId, pos, true);
} else {
log.updatePreparedCommit(true, pos, sessionId, blocks);
}
break; break;
case ROLLBACK: case ROLLBACK:
log.updatePreparedCommit(false, pos, sessionId, blocks); if (store != null) {
store.setInDoubtTransactionState(sessionId, pos, false);
} else {
log.updatePreparedCommit(false, pos, sessionId, blocks);
}
break; break;
default: default:
Message.throwInternalError("state="+state); Message.throwInternalError("state="+state);
......
...@@ -410,7 +410,7 @@ public class LogSystem { ...@@ -410,7 +410,7 @@ public class LogSystem {
// this is potentially a commit, so // this is potentially a commit, so
// don't roll back the action before it (currently) // don't roll back the action before it (currently)
setLastCommitForSession(sessionId, log.getId(), pos); setLastCommitForSession(sessionId, log.getId(), pos);
state.inDoubtTransaction = new InDoubtTransaction(log, sessionId, pos, transaction, blocks); state.inDoubtTransaction = new InDoubtTransaction(null, log, sessionId, pos, transaction, blocks);
} }
/** /**
...@@ -419,6 +419,9 @@ public class LogSystem { ...@@ -419,6 +419,9 @@ public class LogSystem {
* @return the list * @return the list
*/ */
public ObjectArray<InDoubtTransaction> getInDoubtTransactions() { public ObjectArray<InDoubtTransaction> getInDoubtTransactions() {
if (pageStore != null) {
return pageStore.getInDoubtTransactions();
}
return inDoubtTransactions; return inDoubtTransactions;
} }
...@@ -444,6 +447,9 @@ public class LogSystem { ...@@ -444,6 +447,9 @@ public class LogSystem {
return; return;
} }
synchronized (database) { synchronized (database) {
if (pageStore != null) {
pageStore.prepareCommit(session, transaction);
}
if (closed) { if (closed) {
return; return;
} }
......
...@@ -83,6 +83,10 @@ public class PageFreeList extends Record { ...@@ -83,6 +83,10 @@ public class PageFreeList extends Record {
*/ */
void free(int pageId) throws SQLException { void free(int pageId) throws SQLException {
full = false; full = false;
int test;
if (pageId - getPos() <= 0) {
System.out.println("stop!");
}
used.clear(pageId - getPos()); used.clear(pageId - getPos());
store.updateRecord(this, true, data); store.updateRecord(this, true, data);
} }
......
...@@ -89,7 +89,7 @@ public class PageInputStream extends InputStream { ...@@ -89,7 +89,7 @@ public class PageInputStream extends InputStream {
} }
int next; int next;
while (true) { while (true) {
next = trunk.getNextDataPage(); next = trunk.getNextPageData();
if (dataPage == -1 || dataPage == next) { if (dataPage == -1 || dataPage == next) {
if (next != 0) { if (next != 0) {
break; break;
...@@ -117,7 +117,7 @@ public class PageInputStream extends InputStream { ...@@ -117,7 +117,7 @@ public class PageInputStream extends InputStream {
PageStreamTrunk t = new PageStreamTrunk(store, trunkPage); PageStreamTrunk t = new PageStreamTrunk(store, trunkPage);
t.read(); t.read();
while (true) { while (true) {
int n = t.getNextDataPage(); int n = t.getNextPageData();
if (n == -1) { if (n == -1) {
break; break;
} }
...@@ -130,4 +130,8 @@ public class PageInputStream extends InputStream { ...@@ -130,4 +130,8 @@ public class PageInputStream extends InputStream {
} }
} }
int getDataPage() {
return data.getPos();
}
} }
...@@ -14,6 +14,7 @@ import java.io.IOException; ...@@ -14,6 +14,7 @@ import java.io.IOException;
import java.sql.SQLException; import java.sql.SQLException;
import java.util.HashMap; import java.util.HashMap;
import org.h2.engine.Session; import org.h2.engine.Session;
import org.h2.log.InDoubtTransaction;
import org.h2.log.LogSystem; import org.h2.log.LogSystem;
import org.h2.log.SessionState; import org.h2.log.SessionState;
import org.h2.message.Message; import org.h2.message.Message;
...@@ -22,6 +23,8 @@ import org.h2.result.Row; ...@@ -22,6 +23,8 @@ import org.h2.result.Row;
import org.h2.util.BitField; import org.h2.util.BitField;
import org.h2.util.IntIntHashMap; import org.h2.util.IntIntHashMap;
import org.h2.util.New; import org.h2.util.New;
import org.h2.util.ObjectArray;
import org.h2.util.StringUtils;
import org.h2.value.Value; import org.h2.value.Value;
/** /**
...@@ -52,23 +55,35 @@ public class PageLog { ...@@ -52,23 +55,35 @@ public class PageLog {
*/ */
public static final int COMMIT = 2; public static final int COMMIT = 2;
/**
* A prepare commit entry for a session.
* Format: session id, transaction name length, transaction name (UTF-8).
*/
public static final int PREPARE_COMMIT = 3;
/**
* Roll back a prepared transaction.
* Format: session id.
*/
public static final int ROLLBACK = 4;
/** /**
* Add a record to a table. * Add a record to a table.
* Format: session id, table id, row. * Format: session id, table id, row.
*/ */
public static final int ADD = 3; public static final int ADD = 5;
/** /**
* Remove a record from a table. * Remove a record from a table.
* Format: session id, table id, row. * Format: session id, table id, row.
*/ */
public static final int REMOVE = 4; public static final int REMOVE = 6;
/** /**
* Perform a checkpoint. The log id is incremented. * Perform a checkpoint. The log id is incremented.
* Format: - * Format: -
*/ */
public static final int CHECKPOINT = 5; public static final int CHECKPOINT = 7;
/** /**
* The recovery stage to undo changes (re-apply the backup). * The recovery stage to undo changes (re-apply the backup).
...@@ -91,6 +106,7 @@ public class PageLog { ...@@ -91,6 +106,7 @@ public class PageLog {
private DataOutputStream out; private DataOutputStream out;
private ByteArrayOutputStream buffer; private ByteArrayOutputStream buffer;
private PageInputStream pageIn;
private PageOutputStream pageOut; private PageOutputStream pageOut;
private DataInputStream in; private DataInputStream in;
private int firstTrunkPage; private int firstTrunkPage;
...@@ -164,7 +180,8 @@ public class PageLog { ...@@ -164,7 +180,8 @@ public class PageLog {
in.allocateAllPages(); in.allocateAllPages();
return; return;
} }
in = new DataInputStream(new PageInputStream(store, firstTrunkPage, firstDataPage)); pageIn = new PageInputStream(store, firstTrunkPage, firstDataPage);
in = new DataInputStream(pageIn);
int logId = 0; int logId = 0;
DataPage data = store.createDataPage(); DataPage data = store.createDataPage();
try { try {
...@@ -200,6 +217,25 @@ public class PageLog { ...@@ -200,6 +217,25 @@ public class PageLog {
} }
} }
} }
} else if (x == PREPARE_COMMIT) {
int sessionId = in.readInt();
int len = in.readInt();
byte[] t = new byte[len];
in.readFully(t);
String transaction = StringUtils.utf8Decode(t);
if (trace.isDebugEnabled()) {
trace.debug("log prepare commit " + sessionId + " " + transaction + " pos:" + pos);
}
if (stage == RECOVERY_STAGE_UNDO) {
int page = pageIn.getDataPage();
setPrepareCommit(sessionId, page, transaction);
}
} else if (x == ROLLBACK) {
int sessionId = in.readInt();
if (trace.isDebugEnabled()) {
trace.debug("log rollback " + sessionId + " pos:" + pos);
}
// ignore - this entry is just informational
} else if (x == COMMIT) { } else if (x == COMMIT) {
int sessionId = in.readInt(); int sessionId = in.readInt();
if (trace.isDebugEnabled()) { if (trace.isDebugEnabled()) {
...@@ -220,7 +256,6 @@ public class PageLog { ...@@ -220,7 +256,6 @@ public class PageLog {
} }
} }
if (stage == RECOVERY_STAGE_REDO) { if (stage == RECOVERY_STAGE_REDO) {
// TODO probably still required for 2 phase commit
sessionStates = New.hashMap(); sessionStates = New.hashMap();
} }
} catch (EOFException e) { } catch (EOFException e) {
...@@ -230,6 +265,25 @@ public class PageLog { ...@@ -230,6 +265,25 @@ public class PageLog {
} }
} }
/**
* This method is called when a 'prepare commit' log entry is read when
* opening the database.
*
* @param sessionId the session id
* @param the data page with the prepare entry
* @param transaction the transaction name, or null to rollback
*/
private void setPrepareCommit(int sessionId, int pageId, String transaction) {
SessionState state = getOrAddSessionState(sessionId);
InDoubtTransaction doubt;
if (transaction == null) {
doubt = null;
} else {
doubt = new InDoubtTransaction(store, null, sessionId, pageId, transaction, 0);
}
state.inDoubtTransaction = doubt;
}
/** /**
* Read a row from an input stream. * Read a row from an input stream.
* *
...@@ -286,14 +340,14 @@ public class PageLog { ...@@ -286,14 +340,14 @@ public class PageLog {
} }
/** /**
* Mark a committed transaction. * Mark a transaction as committed.
* *
* @param session the session * @param session the session
*/ */
void commit(Session session) throws SQLException { void commit(int sessionId) throws SQLException {
try { try {
if (trace.isDebugEnabled()) { if (trace.isDebugEnabled()) {
trace.debug("log commit s:" + session.getId()); trace.debug("log commit s:" + sessionId);
} }
LogSystem log = store.getDatabase().getLog(); LogSystem log = store.getDatabase().getLog();
if (log == null) { if (log == null) {
...@@ -301,7 +355,72 @@ public class PageLog { ...@@ -301,7 +355,72 @@ public class PageLog {
return; return;
} }
out.write(COMMIT); out.write(COMMIT);
out.writeInt(sessionId);
flushOut();
if (log.getFlushOnEachCommit()) {
flush();
}
} catch (IOException e) {
throw Message.convertIOException(e, null);
}
}
/**
* Prepare a transaction.
*
* @param session the session
* @param transaction the name of the transaction
*/
void prepareCommit(Session session, String transaction) throws SQLException {
try {
if (trace.isDebugEnabled()) {
trace.debug("log prepare commit s:" + session.getId() + " " + transaction);
}
LogSystem log = store.getDatabase().getLog();
if (log == null) {
// database already closed
return;
}
// store it on a separate log page
int pageSize = store.getPageSize();
byte[] t = StringUtils.utf8Encode(transaction);
int len = t.length;
if (1 + DataPage.LENGTH_INT * 2 + len >= PageStreamData.getCapacity(pageSize)) {
throw Message.getInvalidValueException("transaction name too long", transaction);
}
pageOut.fillDataPage();
out.write(PREPARE_COMMIT);
out.writeInt(session.getId()); out.writeInt(session.getId());
out.writeInt(len);
out.write(t);
flushOut();
// store it on a separate log page
pageOut.fillDataPage();
if (log.getFlushOnEachCommit()) {
flush();
}
} catch (IOException e) {
throw Message.convertIOException(e, null);
}
}
/**
* Rollback a prepared transaction.
*
* @param session the session
*/
void rollbackPrepared(int sessionId) throws SQLException {
try {
if (trace.isDebugEnabled()) {
trace.debug("log rollback prepared s:" + sessionId);
}
LogSystem log = store.getDatabase().getLog();
if (log == null) {
// database already closed
return;
}
out.write(ROLLBACK);
out.writeInt(sessionId);
flushOut(); flushOut();
if (log.getFlushOnEachCommit()) { if (log.getFlushOnEachCommit()) {
flush(); flush();
...@@ -452,7 +571,7 @@ public class PageLog { ...@@ -452,7 +571,7 @@ public class PageLog {
* @param sessionId the session id * @param sessionId the session id
* @return the session state object * @return the session state object
*/ */
private SessionState getOrAddSessionState(int sessionId) { SessionState getOrAddSessionState(int sessionId) {
Integer key = sessionId; Integer key = sessionId;
SessionState state = sessionStates.get(key); SessionState state = sessionStates.get(key);
if (state == null) { if (state == null) {
...@@ -463,8 +582,45 @@ public class PageLog { ...@@ -463,8 +582,45 @@ public class PageLog {
return state; return state;
} }
public long getSize() { long getSize() {
return pageOut.getSize(); return pageOut.getSize();
} }
ObjectArray<InDoubtTransaction> getInDoubtTransactions() {
ObjectArray<InDoubtTransaction> list = ObjectArray.newInstance();
for (SessionState state : sessionStates.values()) {
InDoubtTransaction in = state.inDoubtTransaction;
if (in != null) {
list.add(in);
}
}
return list;
}
/**
* Set the state of an in-doubt transaction.
*
* @param sessionId the session
* @param pageId the page where the commit was prepared
* @param commit whether the transaction should be committed
*/
void setInDoubtTransactionState(int sessionId, int pageId, boolean commit) throws SQLException {
PageStreamData d = new PageStreamData(store, pageId, 0);
d.read();
d.initWrite();
ByteArrayOutputStream buff = new ByteArrayOutputStream();
DataOutputStream o = new DataOutputStream(buff);
try {
o.write(commit ? COMMIT : ROLLBACK);
o.writeInt(sessionId);
} catch (IOException e) {
throw Message.convertIOException(e, "");
}
byte[] bytes = buff.toByteArray();
d.write(buff.toByteArray(), 0, bytes.length);
bytes = new byte[d.getRemaining()];
d.write(bytes, 0, bytes.length);
d.write(null);
}
} }
...@@ -85,7 +85,7 @@ public class PageOutputStream extends OutputStream { ...@@ -85,7 +85,7 @@ public class PageOutputStream extends OutputStream {
} }
private void initNextData() throws SQLException { private void initNextData() throws SQLException {
int nextData = trunk == null ? -1 : trunk.getNextDataPage(); int nextData = trunk == null ? -1 : trunk.getNextPageData();
if (nextData == -1) { if (nextData == -1) {
int parent = trunkPageId; int parent = trunkPageId;
if (trunkNext != 0) { if (trunkNext != 0) {
...@@ -101,7 +101,7 @@ public class PageOutputStream extends OutputStream { ...@@ -101,7 +101,7 @@ public class PageOutputStream extends OutputStream {
pages++; pages++;
trunk.write(null); trunk.write(null);
reservedPages.removeRange(0, len + 1); reservedPages.removeRange(0, len + 1);
nextData = trunk.getNextDataPage(); nextData = trunk.getNextPageData();
} }
data = new PageStreamData(store, nextData, trunk.getPos()); data = new PageStreamData(store, nextData, trunk.getPos());
pages++; pages++;
......
...@@ -20,7 +20,9 @@ import org.h2.index.Index; ...@@ -20,7 +20,9 @@ import org.h2.index.Index;
import org.h2.index.IndexType; import org.h2.index.IndexType;
import org.h2.index.PageBtreeIndex; import org.h2.index.PageBtreeIndex;
import org.h2.index.PageScanIndex; import org.h2.index.PageScanIndex;
import org.h2.log.InDoubtTransaction;
import org.h2.log.LogSystem; import org.h2.log.LogSystem;
import org.h2.log.SessionState;
import org.h2.message.Message; import org.h2.message.Message;
import org.h2.message.Trace; import org.h2.message.Trace;
import org.h2.message.TraceSystem; import org.h2.message.TraceSystem;
...@@ -69,7 +71,7 @@ import org.h2.value.ValueString; ...@@ -69,7 +71,7 @@ import org.h2.value.ValueString;
*/ */
public class PageStore implements CacheWriter { public class PageStore implements CacheWriter {
// TODO TestSampleApps // TODO TestTwoPhaseCommit
// TODO TestIndex.wideIndex: btree nodes should be full // TODO TestIndex.wideIndex: btree nodes should be full
// TODO check memory usage // TODO check memory usage
// TODO PageStore.openMetaIndex (desc and nulls first / last) // TODO PageStore.openMetaIndex (desc and nulls first / last)
...@@ -115,6 +117,7 @@ public class PageStore implements CacheWriter { ...@@ -115,6 +117,7 @@ public class PageStore implements CacheWriter {
// TODO when removing DiskFile: // TODO when removing DiskFile:
// remove CacheObject.blockCount // remove CacheObject.blockCount
// remove Record.getMemorySize // remove Record.getMemorySize
// simplify InDoubtTransaction
/** /**
* The smallest possible page size. * The smallest possible page size.
...@@ -348,6 +351,7 @@ public class PageStore implements CacheWriter { ...@@ -348,6 +351,7 @@ public class PageStore implements CacheWriter {
} }
if (writeVersion != 0) { if (writeVersion != 0) {
close(); close();
database.setReadOnly(true);
accessMode = "r"; accessMode = "r";
file = database.openFile(fileName, accessMode, true); file = database.openFile(fileName, accessMode, true);
} }
...@@ -520,7 +524,8 @@ public class PageStore implements CacheWriter { ...@@ -520,7 +524,8 @@ public class PageStore implements CacheWriter {
} }
private void freePage(int pageId) throws SQLException { private void freePage(int pageId) throws SQLException {
PageFreeList list = getFreeList(pageId / freeListPagesPerList); int i = (pageId - PAGE_ID_FREE_LIST_ROOT) / freeListPagesPerList;
PageFreeList list = getFreeList(i);
list.free(pageId); list.free(pageId);
} }
...@@ -686,32 +691,22 @@ public class PageStore implements CacheWriter { ...@@ -686,32 +691,22 @@ public class PageStore implements CacheWriter {
} }
/** /**
* Run one recovery stage. There are three recovery stages: 0: only the undo * Run recovery.
* steps are run (restoring the state before the last checkpoint). 1: the
* pages that are used by the transaction log are allocated. 2: the
* committed operations are re-applied.
*/ */
private void recover() throws SQLException { private void recover() throws SQLException {
trace.debug("log recover"); trace.debug("log recover");
try { recoveryRunning = true;
recoveryRunning = true; log.recover(PageLog.RECOVERY_STAGE_UNDO);
log.recover(PageLog.RECOVERY_STAGE_UNDO); log.recover(PageLog.RECOVERY_STAGE_ALLOCATE);
log.recover(PageLog.RECOVERY_STAGE_ALLOCATE); openMetaIndex();
openMetaIndex(); readMetaData();
readMetaData(); log.recover(PageLog.RECOVERY_STAGE_REDO);
log.recover(PageLog.RECOVERY_STAGE_REDO); if (log.getInDoubtTransactions().size() == 0) {
switchLog(); switchLog();
} catch (SQLException e) { } else {
int test; database.setReadOnly(true);
e.printStackTrace();
throw e;
} catch (RuntimeException e) {
int test;
e.printStackTrace();
throw e;
} finally {
recoveryRunning = false;
} }
recoveryRunning = false;
PageScanIndex index = (PageScanIndex) metaObjects.get(0); PageScanIndex index = (PageScanIndex) metaObjects.get(0);
if (index == null) { if (index == null) {
systemTableHeadPos = Index.EMPTY_HEAD; systemTableHeadPos = Index.EMPTY_HEAD;
...@@ -721,7 +716,6 @@ public class PageStore implements CacheWriter { ...@@ -721,7 +716,6 @@ public class PageStore implements CacheWriter {
for (Index openIndex : metaObjects.values()) { for (Index openIndex : metaObjects.values()) {
openIndex.close(database.getSystemSession()); openIndex.close(database.getSystemSession());
} }
metaObjects = null;
trace.debug("log recover done"); trace.debug("log recover done");
} }
...@@ -748,13 +742,25 @@ public class PageStore implements CacheWriter { ...@@ -748,13 +742,25 @@ public class PageStore implements CacheWriter {
*/ */
public void commit(Session session) throws SQLException { public void commit(Session session) throws SQLException {
synchronized (database) { synchronized (database) {
log.commit(session); log.commit(session.getId());
if (log.getSize() > maxLogSize) { if (log.getSize() > maxLogSize) {
checkpoint(); checkpoint();
} }
} }
} }
/**
* Prepare a transaction.
*
* @param session the session
* @param transaction the name of the transaction
*/
public void prepareCommit(Session session, String transaction) throws SQLException {
synchronized (database) {
log.prepareCommit(session, transaction);
}
}
/** /**
* Get the position of the system table head. * Get the position of the system table head.
* *
...@@ -961,4 +967,31 @@ public class PageStore implements CacheWriter { ...@@ -961,4 +967,31 @@ public class PageStore implements CacheWriter {
this.maxLogSize = maxSize; this.maxLogSize = maxSize;
} }
/**
* Commit or rollback a prepared transaction after opening a database with in-doubt
* transactions.
*
* @param sessionId the session id
* @param pageId the page where the transaction was prepared
* @param commit if the transaction should be committed
*/
public void setInDoubtTransactionState(int sessionId, int pageId, boolean commit) throws SQLException {
boolean old = database.isReadOnly();
try {
database.setReadOnly(false);
log.setInDoubtTransactionState(sessionId, pageId, commit);
} finally {
database.setReadOnly(old);
}
}
/**
* Get the list of in-doubt transaction.
*
* @return the list
*/
public ObjectArray<InDoubtTransaction> getInDoubtTransactions() {
return log.getInDoubtTransactions();
}
} }
...@@ -26,7 +26,7 @@ public class PageStreamData extends Record { ...@@ -26,7 +26,7 @@ public class PageStreamData extends Record {
private static final int DATA_START = 9; private static final int DATA_START = 9;
private final PageStore store; private final PageStore store;
private final int trunk; private int trunk;
private DataPage data; private DataPage data;
private int remaining; private int remaining;
private int length; private int length;
...@@ -43,6 +43,7 @@ public class PageStreamData extends Record { ...@@ -43,6 +43,7 @@ public class PageStreamData extends Record {
void read() throws SQLException { void read() throws SQLException {
data = store.createDataPage(); data = store.createDataPage();
store.readPage(getPos(), data); store.readPage(getPos(), data);
trunk = data.readInt();
data.setPos(4); data.setPos(4);
int t = data.readByte(); int t = data.readByte();
if (t != Page.TYPE_STREAM_DATA) { if (t != Page.TYPE_STREAM_DATA) {
...@@ -65,6 +66,7 @@ public class PageStreamData extends Record { ...@@ -65,6 +66,7 @@ public class PageStreamData extends Record {
data.writeByte((byte) Page.TYPE_STREAM_DATA); data.writeByte((byte) Page.TYPE_STREAM_DATA);
data.writeInt(0); data.writeInt(0);
remaining = store.getPageSize() - data.length(); remaining = store.getPageSize() - data.length();
length = 0;
} }
/** /**
......
...@@ -72,7 +72,7 @@ public class PageStreamTrunk extends Record { ...@@ -72,7 +72,7 @@ public class PageStreamTrunk extends Record {
pageIds[index++] = page; pageIds[index++] = page;
} }
int getNextDataPage() { int getNextPageData() {
if (index >= pageIds.length) { if (index >= pageIds.length) {
return -1; return -1;
} }
......
...@@ -52,6 +52,7 @@ import org.h2.util.ObjectArray; ...@@ -52,6 +52,7 @@ import org.h2.util.ObjectArray;
import org.h2.util.RandomUtils; import org.h2.util.RandomUtils;
import org.h2.util.SmallLRUCache; import org.h2.util.SmallLRUCache;
import org.h2.util.StatementBuilder; import org.h2.util.StatementBuilder;
import org.h2.util.StringUtils;
import org.h2.util.TempFileDeleter; import org.h2.util.TempFileDeleter;
import org.h2.util.Tool; import org.h2.util.Tool;
import org.h2.value.Value; import org.h2.value.Value;
...@@ -864,6 +865,16 @@ public class Recover extends Tool implements DataHandler { ...@@ -864,6 +865,16 @@ public class Recover extends Tool implements DataHandler {
} else if (x == PageLog.COMMIT) { } else if (x == PageLog.COMMIT) {
int sessionId = in.readInt(); int sessionId = in.readInt();
writer.println("-- commit " + sessionId); writer.println("-- commit " + sessionId);
} else if (x == PageLog.ROLLBACK) {
int sessionId = in.readInt();
writer.println("-- rollback " + sessionId);
} else if (x == PageLog.PREPARE_COMMIT) {
int sessionId = in.readInt();
int len = in.readInt();
byte[] t = new byte[len];
in.readFully(t);
String transaction = StringUtils.utf8Decode(t);
writer.println("-- prepare commit " + sessionId + " " + transaction);
} else if (x == PageLog.NOOP) { } else if (x == PageLog.NOOP) {
// nothing to do // nothing to do
} else if (x == PageLog.CHECKPOINT) { } else if (x == PageLog.CHECKPOINT) {
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论