提交 b9e88d26 authored 作者: Thomas Mueller's avatar Thomas Mueller

Page store: log truncate

上级 18db1b22
......@@ -294,6 +294,7 @@ public class PageScanIndex extends BaseIndex implements RowIndex {
if (trace.isDebugEnabled()) {
trace.debug("truncate");
}
store.logTruncate(session, tableData.getId());
removeAllRows();
if (tableData.getContainsLargeObject() && tableData.isPersistData()) {
ValueLob.removeAllForTable(database, table.getId());
......
......@@ -80,17 +80,23 @@ public class PageLog {
*/
public static final int REMOVE = 6;
/**
* Truncate a table.
* Format: session id, table id.
*/
public static final int TRUNCATE = 7;
/**
* Perform a checkpoint. The log id is incremented.
* Format: -
*/
public static final int CHECKPOINT = 7;
public static final int CHECKPOINT = 8;
/**
* Free a log page.
* Format: count, page ids
*/
public static final int FREE_LOG = 8;
public static final int FREE_LOG = 9;
/**
* The recovery stage to undo changes (re-apply the backup).
......@@ -152,11 +158,7 @@ public class PageLog {
*/
void free() throws SQLException {
while (this.firstTrunkPage != 0) {
// first remove all old log pages
if (store.getRecord(firstTrunkPage) != null) {
// if the page is in use, don't free it
// TODO cleanup - this is a hack
// break;
throw Message.throwInternalError("" + store.getRecord(firstTrunkPage));
}
PageStreamTrunk t = new PageStreamTrunk(store, this.firstTrunkPage);
......@@ -242,6 +244,21 @@ public class PageLog {
}
}
}
} else if (x == TRUNCATE) {
int sessionId = in.readInt();
int tableId = in.readInt();
if (stage == RECOVERY_STAGE_REDO) {
if (isSessionCommitted(sessionId, logId, pos)) {
if (trace.isDebugEnabled()) {
trace.debug("log redo truncate table:" + tableId);
}
store.redoTruncate(tableId);
} else {
if (trace.isDebugEnabled()) {
trace.debug("log ignore s:" + sessionId + " truncate table:" + tableId);
}
}
}
} else if (x == PREPARE_COMMIT) {
int sessionId = in.readInt();
int len = in.readInt();
......@@ -482,6 +499,29 @@ public class PageLog {
throw Message.convertIOException(e, null);
}
}
/**
* A table is truncated.
*
* @param session the session
* @param tableId the table id
*/
void logTruncate(Session session, int tableId) throws SQLException {
try {
if (trace.isDebugEnabled()) {
trace.debug("log truncate s:" + session.getId() + " table:" + tableId);
}
session.addLogPos(logId, logPos);
data.reset();
out.write(TRUNCATE);
out.writeInt(session.getId());
out.writeInt(tableId);
flushOut();
} catch (IOException e) {
throw Message.convertIOException(e, null);
}
}
/**
* Flush the transaction log.
......
......@@ -69,6 +69,11 @@ import org.h2.value.ValueString;
*/
public class PageStore implements CacheWriter {
// TODO what if the log contains undo page for a later log page
// TODO what if the log contains a head page for a later log page
// TODO allocate log: must not use page if undo is in an active log
// TODO or don't redo if page is now a log page
// TODO var int: see google protocol buffers
// TODO don't save parent (only root); remove setPageId
// TODO implement checksum - 0 for empty
......@@ -77,6 +82,7 @@ public class PageStore implements CacheWriter {
// TODO replace CRC32
// TODO optimization: try to avoid allocating a byte array per page
// TODO PageBtreeNode: 4 bytes offset - others use only 2
// TODO block compression: don't store the middle zeroes
// TODO block compression: maybe http://en.wikipedia.org/wiki/LZJB
// with RLE, specially for 0s.
// TODO order pages so that searching for a key only seeks forward
......@@ -879,6 +885,17 @@ public class PageStore implements CacheWriter {
table.removeRow(systemSession, row);
}
}
/**
* Redo a truncate.
*
* @param tableId the object id of the table
*/
void redoTruncate(int tableId) throws SQLException {
PageScanIndex index = (PageScanIndex) metaObjects.get(tableId);
Table table = index.getTable();
table.truncate(systemSession);
}
private void openMetaIndex() throws SQLException {
ObjectArray<Column> cols = ObjectArray.newInstance();
......@@ -1108,6 +1125,20 @@ public class PageStore implements CacheWriter {
return writeCount;
}
/**
* A table is truncated.
*
* @param session the session
* @param tableId the table id
*/
public void logTruncate(Session session, int tableId) throws SQLException {
synchronized (database) {
if (!recoveryRunning) {
log.logTruncate(session, tableId);
}
}
}
// TODO implement checksum
// private void updateChecksum(byte[] d, int pos) {
// int ps = pageSize;
......
......@@ -881,6 +881,12 @@ public class Recover extends Tool implements DataHandler {
writer.println("-- session " + sessionId +
" table " + storageId +
" " + (x == PageLog.ADD ? "add" : "remove") + " " + row.toString());
} else if (x == PageLog.TRUNCATE) {
int sessionId = in.readInt();
setStorage(in.readInt());
writer.println("-- session " + sessionId +
" table " + storageId +
" truncate");
} else if (x == PageLog.COMMIT) {
int sessionId = in.readInt();
writer.println("-- commit " + sessionId);
......
......@@ -32,11 +32,33 @@ public class TestPageStore extends TestBase {
}
public void test() throws Exception {
testTruncate();
testLargeIndex();
testUniqueIndex();
testCreateIndexLater();
testFuzzOperations();
}
private void testTruncate() throws SQLException {
if (config.memory) {
return;
}
deleteDb("pageStore");
Connection conn = getConnection("pageStore");
Statement stat = conn.createStatement();
stat.execute("set write_delay 0");
stat.execute("create table test(id int) as select 1");
stat.execute("truncate table test");
stat.execute("insert into test values(1)");
stat.execute("shutdown immediately");
try {
conn.close();
} catch (SQLException e) {
// ignore
}
conn = getConnection("pageStore");
conn.close();
}
private void testLargeIndex() throws SQLException {
if (config.memory) {
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论