提交 0d3a6cc7 authored 作者: Thomas Mueller's avatar Thomas Mueller

Page store: shrink the log

上级 94484549
......@@ -451,6 +451,7 @@ See also <a href="build.html#providing_patches">Providing Patches</a>.
</li><li>Issue 107: Prefer using the ORDER BY index if LIMIT is used.
</li><li>Support reading sequences using DatabaseMetaData.getTables(null, null, null, new String[]{"SEQUENCE"}).
See PostgreSQL.
</li><li>Add option to enable TCP_NODELAY using Socket.setTcpNoDelay(true).
</li></ul>
<h2>Not Planned</h2>
......
/*
* Copyright 2004-2009 H2 Group. Multiple-Licensed under the H2 License,
* Version 1.0, and under the Eclipse Public License, Version 1.0
* (http://h2database.com/html/license.html).
* Initial Developer: H2 Group
*/
package org.h2.store;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import org.h2.util.IOUtils;
/**
* This class is backed by an input stream and supports reading values and
* variable size data.
*/
public class DataReader {
private static final EOFException EOF = new EOFException();
private InputStream in;
/**
* Create a new data reader.
*
* @param in the input stream
*/
public DataReader(InputStream in) {
this.in = in;
}
/**
* Read a byte.
*
* @return the byte
*/
public byte read() throws IOException {
int x = in.read();
if (x < 0) {
throw EOF;
}
return (byte) x;
}
/**
* Read a variable size integer.
*
* @return the value
*/
public int readVarInt() throws IOException {
int b = read();
if (b >= 0) {
return b;
}
int x = b & 0x7f;
b = read();
if (b >= 0) {
return x | (b << 7);
}
x |= (b & 0x7f) << 7;
b = read();
if (b >= 0) {
return x | (b << 14);
}
x |= (b & 0x7f) << 14;
b = read();
if (b >= 0) {
return x | b << 21;
}
return x | ((b & 0x7f) << 21) | (read() << 28);
}
/**
* Read a variable size long.
*
* @return the value
*/
public long readVarLong() throws IOException {
long x = read();
if (x >= 0) {
return x;
}
x &= 0x7f;
for (int s = 7;; s += 7) {
long b = read();
x |= (b & 0x7f) << s;
if (b >= 0) {
return x;
}
}
}
/**
* Read an integer.
*
* @return the value
*/
public int readInt() throws IOException {
return (read() << 24) + ((read() & 0xff) << 16) + ((read() & 0xff) << 8) + (read() & 0xff);
}
/**
* Read a long.
*
* @return the value
*/
public long readLong() throws IOException {
return ((long) (readInt()) << 32) + (readInt() & 0xffffffffL);
}
/**
* Read a number of bytes.
*
* @param buff the target buffer
* @param offset the offset within the target buffer
* @param len the number of bytes to read
*/
public void readFully(byte[] buff, int offset, int len) throws IOException {
int got = IOUtils.readFully(in, buff, offset, len);
if (got < len) {
throw EOF;
}
}
/**
* Read a string from the stream.
*
* @return the string
*/
public String readString() throws IOException {
int len = readVarInt();
return readString(len);
}
private String readString(int len) throws IOException {
char[] chars = new char[len];
for (int i = 0; i < len; i++) {
int x = read() & 0xff;
if (x < 0x80) {
chars[i] = (char) x;
} else if (x >= 0xe0) {
chars[i] = (char) (((x & 0xf) << 12) + ((read() & 0x3f) << 6) + (read() & 0x3f));
} else {
chars[i] = (char) (((x & 0x1f) << 6) + (read() & 0x3f));
}
}
return new String(chars);
}
}
......@@ -6,13 +6,11 @@
*/
package org.h2.store;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.sql.SQLException;
import java.util.HashMap;
import org.h2.compress.CompressLZF;
import org.h2.engine.Session;
import org.h2.log.InDoubtTransaction;
import org.h2.log.LogSystem;
......@@ -25,7 +23,6 @@ import org.h2.util.IntArray;
import org.h2.util.IntIntHashMap;
import org.h2.util.New;
import org.h2.util.ObjectArray;
import org.h2.util.StringUtils;
import org.h2.value.Value;
/**
......@@ -48,43 +45,43 @@ public class PageLog {
/**
* An undo log entry.
* Format: page id, page.
* Format: page id: varInt, page.
*/
public static final int UNDO = 1;
/**
* A commit entry of a session.
* Format: session id.
* Format: session id: varInt.
*/
public static final int COMMIT = 2;
/**
* A prepare commit entry for a session.
* Format: session id, transaction name length, transaction name (UTF-8).
* Format: session id: varInt, transaction name: string.
*/
public static final int PREPARE_COMMIT = 3;
/**
* Roll back a prepared transaction.
* Format: session id.
* Format: session id: varInt.
*/
public static final int ROLLBACK = 4;
/**
* Add a record to a table.
* Format: session id, table id, row.
* Format: session id: varInt, table id: varInt, row.
*/
public static final int ADD = 5;
/**
* Remove a record from a table.
* Format: session id, table id, row.
* Format: session id: varInt, table id: varInt, row.
*/
public static final int REMOVE = 6;
/**
* Truncate a table.
* Format: session id, table id.
* Format: session id: varInt, table id: varInt.
*/
public static final int TRUNCATE = 7;
......@@ -96,7 +93,7 @@ public class PageLog {
/**
* Free a log page.
* Format: count, page ids
* Format: count: varInt, page ids: varInt
*/
public static final int FREE_LOG = 9;
......@@ -115,21 +112,25 @@ public class PageLog {
*/
static final int RECOVERY_STAGE_REDO = 2;
private static final boolean COMPRESS_UNDO = true;
private final PageStore store;
private int pos;
private Trace trace;
private DataOutputStream out;
private ByteArrayOutputStream buffer;
private Data outBuffer;
private PageInputStream pageIn;
private PageOutputStream pageOut;
private DataInputStream in;
private DataReader in;
private int firstTrunkPage;
private int firstDataPage;
private Data data;
private int logSectionId, logPos;
private int firstLogId;
private CompressLZF compress;
private byte[] compressBuffer;
/**
* If the bit is set, the given page was written to the current log section.
* The undo entry of these pages doesn't need to be written again.
......@@ -163,6 +164,8 @@ public class PageLog {
this.store = store;
data = store.createData();
trace = store.getTrace();
compress = new CompressLZF();
compressBuffer = new byte[store.getPageSize() * 2];
}
/**
......@@ -178,8 +181,7 @@ public class PageLog {
pageOut = new PageOutputStream(store, firstTrunkPage, undoAll, atEnd);
pageOut.reserve(1);
store.setLogFirstPage(firstTrunkPage, pageOut.getCurrentDataPageId());
buffer = new ByteArrayOutputStream();
out = new DataOutputStream(buffer);
outBuffer = store.createData();
}
/**
......@@ -227,7 +229,7 @@ public class PageLog {
return;
}
pageIn = new PageInputStream(store, firstTrunkPage, firstDataPage);
in = new DataInputStream(pageIn);
in = new DataReader(pageIn);
int logId = 0;
Data data = store.createData();
try {
......@@ -239,8 +241,14 @@ public class PageLog {
}
pos++;
if (x == UNDO) {
int pageId = in.readInt();
in.readFully(data.getBytes(), 0, store.getPageSize());
int pageId = in.readVarInt();
int size = in.readVarInt();
if (size == store.getPageSize()) {
in.readFully(data.getBytes(), 0, size);
} else {
in.readFully(compressBuffer, 0, size);
compress.expand(compressBuffer, 0, size, data.getBytes(), 0, store.getPageSize());
}
if (stage == RECOVERY_STAGE_UNDO) {
if (!undo.get(pageId)) {
if (trace.isDebugEnabled()) {
......@@ -252,8 +260,8 @@ public class PageLog {
}
}
} else if (x == ADD) {
int sessionId = in.readInt();
int tableId = in.readInt();
int sessionId = in.readVarInt();
int tableId = in.readVarInt();
Row row = readRow(in, data);
if (stage == RECOVERY_STAGE_UNDO) {
store.allocateIfIndexRoot(pos, tableId, row);
......@@ -270,9 +278,9 @@ public class PageLog {
}
}
} else if (x == REMOVE) {
int sessionId = in.readInt();
int tableId = in.readInt();
long key = in.readLong();
int sessionId = in.readVarInt();
int tableId = in.readVarInt();
long key = in.readVarLong();
if (stage == RECOVERY_STAGE_REDO) {
if (isSessionCommitted(sessionId, logId, pos)) {
if (trace.isDebugEnabled()) {
......@@ -286,8 +294,8 @@ public class PageLog {
}
}
} else if (x == TRUNCATE) {
int sessionId = in.readInt();
int tableId = in.readInt();
int sessionId = in.readVarInt();
int tableId = in.readVarInt();
if (stage == RECOVERY_STAGE_REDO) {
if (isSessionCommitted(sessionId, logId, pos)) {
if (trace.isDebugEnabled()) {
......@@ -301,11 +309,8 @@ public class PageLog {
}
}
} else if (x == PREPARE_COMMIT) {
int sessionId = in.readInt();
int len = in.readInt();
byte[] t = new byte[len];
in.readFully(t);
String transaction = StringUtils.utf8Decode(t);
int sessionId = in.readVarInt();
String transaction = in.readString();
if (trace.isDebugEnabled()) {
trace.debug("log prepare commit " + sessionId + " " + transaction + " pos:" + pos);
}
......@@ -314,13 +319,13 @@ public class PageLog {
setPrepareCommit(sessionId, page, transaction);
}
} else if (x == ROLLBACK) {
int sessionId = in.readInt();
int sessionId = in.readVarInt();
if (trace.isDebugEnabled()) {
trace.debug("log rollback " + sessionId + " pos:" + pos);
}
// ignore - this entry is just informational
} else if (x == COMMIT) {
int sessionId = in.readInt();
int sessionId = in.readVarInt();
if (trace.isDebugEnabled()) {
trace.debug("log commit " + sessionId + " pos:" + pos);
}
......@@ -332,9 +337,9 @@ public class PageLog {
} else if (x == CHECKPOINT) {
logId++;
} else if (x == FREE_LOG) {
int count = in.readInt();
int count = in.readVarInt();
for (int i = 0; i < count; i++) {
int pageId = in.readInt();
int pageId = in.readVarInt();
if (stage == RECOVERY_STAGE_REDO) {
if (!usedLogPages.get(pageId)) {
store.freePage(pageId, false, null);
......@@ -385,20 +390,20 @@ public class PageLog {
* @param data a temporary buffer
* @return the row
*/
public static Row readRow(DataInputStream in, Data data) throws IOException, SQLException {
int pos = in.readInt();
int len = in.readInt();
public static Row readRow(DataReader in, Data data) throws IOException, SQLException {
long pos = in.readVarLong();
int len = in.readVarInt();
data.reset();
data.checkCapacity(len);
in.readFully(data.getBytes(), 0, len);
int columnCount = data.readInt();
int columnCount = data.readVarInt();
Value[] values = new Value[columnCount];
for (int i = 0; i < columnCount; i++) {
values[i] = data.readValue();
}
// TODO maybe calculate the memory usage
Row row = new Row(values, 0);
row.setPos(pos);
row.setPos((int) pos);
return row;
}
......@@ -419,9 +424,24 @@ public class PageLog {
}
undo.set(pageId);
undoAll.set(pageId);
out.write(UNDO);
out.writeInt(pageId);
out.write(page.getBytes(), 0, store.getPageSize());
outBuffer.writeByte((byte) UNDO);
outBuffer.writeVarInt(pageId);
int pageSize = store.getPageSize();
if (COMPRESS_UNDO) {
int size = compress.compress(page.getBytes(), pageSize, compressBuffer, 0);
if (size < pageSize) {
outBuffer.writeVarInt(size);
outBuffer.checkCapacity(size);
outBuffer.write(compressBuffer, 0, size);
} else {
outBuffer.writeVarInt(pageSize);
outBuffer.checkCapacity(pageSize);
outBuffer.write(page.getBytes(), 0, pageSize);
}
} else {
outBuffer.checkCapacity(pageSize);
outBuffer.write(page.getBytes(), 0, pageSize);
}
flushOut();
} catch (IOException e) {
throw Message.convertIOException(e, null);
......@@ -433,10 +453,10 @@ public class PageLog {
if (trace.isDebugEnabled()) {
trace.debug("log frees " + pages.get(0) + ".." + pages.get(pages.size() - 1));
}
out.write(FREE_LOG);
out.writeInt(pages.size());
outBuffer.writeByte((byte) FREE_LOG);
outBuffer.writeVarInt(pages.size());
for (int i = 0; i < pages.size(); i++) {
out.writeInt(pages.get(i));
outBuffer.writeInt(pages.get(i));
}
flushOut();
} catch (IOException e) {
......@@ -445,9 +465,8 @@ public class PageLog {
}
private void flushOut() throws IOException {
out.flush();
pageOut.write(buffer.toByteArray());
buffer.reset();
pageOut.write(outBuffer.getBytes(), 0, outBuffer.length());
outBuffer.reset();
}
/**
......@@ -465,8 +484,8 @@ public class PageLog {
// database already closed
return;
}
out.write(COMMIT);
out.writeInt(sessionId);
outBuffer.writeByte((byte) COMMIT);
outBuffer.writeVarInt(sessionId);
flushOut();
if (log.getFlushOnEachCommit()) {
flush();
......@@ -494,16 +513,13 @@ public class PageLog {
}
// store it on a separate log page
int pageSize = store.getPageSize();
byte[] t = StringUtils.utf8Encode(transaction);
int len = t.length;
if (1 + DataPage.LENGTH_INT * 2 + len >= PageStreamData.getCapacity(pageSize)) {
throw Message.getInvalidValueException("transaction name too long", transaction);
}
pageOut.fillPage();
out.write(PREPARE_COMMIT);
out.writeInt(session.getId());
out.writeInt(len);
out.write(t);
outBuffer.writeByte((byte) PREPARE_COMMIT);
outBuffer.writeVarInt(session.getId());
outBuffer.writeString(transaction);
if (outBuffer.length() >= PageStreamData.getCapacity(pageSize)) {
throw Message.getInvalidValueException(transaction, "transaction name (too long)");
}
flushOut();
// store it on a separate log page
pageOut.fillPage();
......@@ -532,19 +548,21 @@ public class PageLog {
session.addLogPos(logSectionId, logPos);
row.setLastLog(logSectionId, logPos);
logPos++;
data.reset();
int columns = row.getColumnCount();
data.writeVarInt(columns);
data.checkCapacity(row.getByteCount(data));
row.write(data);
out.write(add ? ADD : REMOVE);
out.writeInt(session.getId());
out.writeInt(tableId);
for (int i = 0; i < columns; i++) {
data.writeValue(row.getValue(i));
}
outBuffer.writeByte((byte) (add ? ADD : REMOVE));
outBuffer.writeVarInt(session.getId());
outBuffer.writeVarInt(tableId);
outBuffer.writeVarLong(row.getPos());
if (add) {
out.writeInt(row.getPos());
out.writeInt(data.length());
out.write(data.getBytes(), 0, data.length());
} else {
out.writeLong(row.getPos());
outBuffer.writeVarInt(data.length());
outBuffer.checkCapacity(data.length());
outBuffer.write(data.getBytes(), 0, data.length());
}
flushOut();
} catch (IOException e) {
......@@ -567,9 +585,9 @@ public class PageLog {
logPos++;
data.reset();
out.write(TRUNCATE);
out.writeInt(session.getId());
out.writeInt(tableId);
outBuffer.writeByte((byte) TRUNCATE);
outBuffer.writeVarInt(session.getId());
outBuffer.writeVarInt(tableId);
flushOut();
} catch (IOException e) {
throw Message.convertIOException(e, null);
......@@ -593,7 +611,7 @@ public class PageLog {
*/
void checkpoint() throws SQLException {
try {
out.write(CHECKPOINT);
outBuffer.writeByte((byte) CHECKPOINT);
flushOut();
} catch (IOException e) {
throw Message.convertIOException(e, null);
......@@ -674,7 +692,7 @@ public class PageLog {
pageOut.close();
pageOut = null;
}
out = null;
outBuffer = null;
}
/**
......@@ -750,16 +768,11 @@ public class PageLog {
void setInDoubtTransactionState(int sessionId, int pageId, boolean commit) throws SQLException {
PageStreamData d = (PageStreamData) store.getPage(pageId);
d.initWrite();
ByteArrayOutputStream buff = new ByteArrayOutputStream();
DataOutputStream o = new DataOutputStream(buff);
try {
o.write(commit ? COMMIT : ROLLBACK);
o.writeInt(sessionId);
} catch (IOException e) {
throw Message.convertIOException(e, "");
}
byte[] bytes = buff.toByteArray();
d.write(buff.toByteArray(), 0, bytes.length);
Data buff = store.createData();
buff.writeByte((byte) (commit ? COMMIT : ROLLBACK));
buff.writeVarInt(sessionId);
byte[] bytes = buff.getBytes();
d.write(bytes, 0, bytes.length);
bytes = new byte[d.getRemaining()];
d.write(bytes, 0, bytes.length);
d.write(null);
......
......@@ -79,16 +79,15 @@ import org.h2.value.ValueString;
*/
public class PageStore implements CacheWriter {
// TODO fix page format of ..
// TODO log: use varInt / varLong
// TODO check commit delay
// TODO record: replace getPos() with long getKey() in page store
// TODO long primary keys don't use delegating index yet (setPos(): int)
// TODO update: only log the key and changed values
// TODO implement checksum; 0 for empty pages
// TODO undo log: (option) fully compress empty pages
// TODO undo log: don't store empty space between head and data
// TODO undo log: lzf compression
// TODO long primary keys don't use delegating index yet (setPos(): int)
// TODO maybe remove parent pointer
// TODO update: only log the key and changed values
// TODO maybe remove some parent pointers
// TODO index creation: use less space (ordered, split at insertion point)
// TODO test running out of disk space (using a special file system)
......
......@@ -36,6 +36,7 @@ import org.h2.security.SHA256;
import org.h2.store.Data;
import org.h2.store.DataHandler;
import org.h2.store.DataPage;
import org.h2.store.DataReader;
import org.h2.store.DiskFile;
import org.h2.store.FileLister;
import org.h2.store.FileStore;
......@@ -56,7 +57,6 @@ import org.h2.util.ObjectArray;
import org.h2.util.RandomUtils;
import org.h2.util.SmallLRUCache;
import org.h2.util.StatementBuilder;
import org.h2.util.StringUtils;
import org.h2.util.TempFileDeleter;
import org.h2.util.Tool;
import org.h2.value.Value;
......@@ -899,7 +899,7 @@ public class Recover extends Tool implements DataHandler {
private void dumpPageLogStream(PrintWriter writer, int logFirstTrunkPage, int logFirstDataPage) throws IOException, SQLException {
Data s = Data.create(this, pageSize);
DataInputStream in = new DataInputStream(
DataReader in = new DataReader(
new PageInputStream(writer, this, store, logFirstTrunkPage, logFirstDataPage, pageSize)
);
while (true) {
......@@ -908,44 +908,41 @@ public class Recover extends Tool implements DataHandler {
break;
}
if (x == PageLog.UNDO) {
int pageId = in.readInt();
in.readFully(new byte[pageSize]);
int pageId = in.readVarInt();
in.readFully(new byte[pageSize], 0, pageSize);
writer.println("-- undo page " + pageId);
} else if (x == PageLog.ADD || x == PageLog.REMOVE) {
int sessionId = in.readInt();
setStorage(in.readInt());
int sessionId = in.readVarInt();
setStorage(in.readVarInt());
Row row = PageLog.readRow(in, s);
writer.println("-- session " + sessionId +
" table " + storageId +
" " + (x == PageLog.ADD ? "add" : "remove") + " " + row.toString());
} else if (x == PageLog.TRUNCATE) {
int sessionId = in.readInt();
setStorage(in.readInt());
int sessionId = in.readVarInt();
setStorage(in.readVarInt());
writer.println("-- session " + sessionId +
" table " + storageId +
" truncate");
} else if (x == PageLog.COMMIT) {
int sessionId = in.readInt();
int sessionId = in.readVarInt();
writer.println("-- commit " + sessionId);
} else if (x == PageLog.ROLLBACK) {
int sessionId = in.readInt();
int sessionId = in.readVarInt();
writer.println("-- rollback " + sessionId);
} else if (x == PageLog.PREPARE_COMMIT) {
int sessionId = in.readInt();
int len = in.readInt();
byte[] t = new byte[len];
in.readFully(t);
String transaction = StringUtils.utf8Decode(t);
int sessionId = in.readVarInt();
String transaction = in.readString();
writer.println("-- prepare commit " + sessionId + " " + transaction);
} else if (x == PageLog.NOOP) {
// nothing to do
} else if (x == PageLog.CHECKPOINT) {
writer.println("-- checkpoint");
} else if (x == PageLog.FREE_LOG) {
int size = in.readInt();
int size = in.readVarInt();
StringBuilder buff = new StringBuilder("-- free");
for (int i = 0; i < size; i++) {
buff.append(' ').append(in.readInt());
buff.append(' ').append(in.readVarInt());
}
writer.println(buff);
} else {
......
......@@ -43,9 +43,32 @@ public class TestTwoPhaseCommit extends TestBase {
prepare();
openWith(false);
test(false);
testLargeTransactionName();
deleteDb("twoPhaseCommit");
}
private void testLargeTransactionName() throws SQLException {
if (!config.pageStore) {
return;
}
Connection conn = getConnection("twoPhaseCommit");
Statement stat = conn.createStatement();
conn.setAutoCommit(false);
stat.execute("CREATE TABLE TEST2(ID INT)");
String name = "tx12345678";
try {
for (int i = 0;; i++) {
stat.execute("INSERT INTO TEST2 VALUES(1)");
name += "x";
stat.execute("PREPARE COMMIT " + name);
}
} catch (SQLException e) {
assertKnownException(e);
}
conn.close();
}
private void test(boolean rolledBack) throws SQLException {
Connection conn = getConnection("twoPhaseCommit");
Statement stat = conn.createStatement();
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论