提交 cd8eee34 authored 作者: Thomas Mueller's avatar Thomas Mueller

Page store: when using a very small page size (128 bytes), writing a large row…

Page store: when using a very small page size (128 bytes), writing a large row could result in an endless recursion.
上级 6e0434cd
......@@ -8,10 +8,10 @@ package org.h2.store;
import java.io.EOFException;
import java.io.IOException;
import java.io.OutputStream;
import java.sql.SQLException;
import java.util.HashMap;
import org.h2.compress.CompressLZF;
import org.h2.constant.ErrorCode;
import org.h2.constant.SysProperties;
import org.h2.engine.Session;
import org.h2.log.InDoubtTransaction;
......@@ -119,9 +119,8 @@ public class PageLog {
private int pos;
private Trace trace;
private Data outBuffer;
private Data writeBuffer;
private PageInputStream pageIn;
private OutputStream pageBuffer;
private PageOutputStream pageOut;
private DataReader in;
private int firstTrunkPage;
......@@ -184,19 +183,28 @@ public class PageLog {
logKey++;
pageOut = new PageOutputStream(store, firstTrunkPage, undoAll, logKey, atEnd);
pageOut.reserve(1);
// TODO maybe buffer to improve speed
pageBuffer = pageOut;
// pageBuffer = new BufferedOutputStream(pageOut, 8 * 1024);
store.setLogFirstPage(logKey, firstTrunkPage, pageOut.getCurrentDataPageId());
outBuffer = store.createData();
writeBuffer = store.createData();
}
/**
* Free up all pages allocated by the log.
*/
void free() throws SQLException {
while (firstTrunkPage != 0) {
PageStreamTrunk t = (PageStreamTrunk) store.getPage(firstTrunkPage);
while (firstTrunkPage != 0 && firstTrunkPage < store.getPageCount()) {
PageStreamTrunk t = null;
try {
Page p = store.getPage(firstTrunkPage);
if (p instanceof PageStreamTrunk) {
t = (PageStreamTrunk) p;
}
} catch (SQLException e) {
if (e.getErrorCode() != ErrorCode.FILE_CORRUPTED_1) {
// wrong checksum means end of stream
throw e;
}
}
if (t == null) {
store.free(firstTrunkPage, false);
// EOF
......@@ -437,26 +445,27 @@ public class PageLog {
}
undo.set(pageId);
undoAll.set(pageId);
outBuffer.writeByte((byte) UNDO);
outBuffer.writeVarInt(pageId);
Data buffer = getBuffer();
buffer.writeByte((byte) UNDO);
buffer.writeVarInt(pageId);
int pageSize = store.getPageSize();
if (COMPRESS_UNDO) {
int size = compress.compress(page.getBytes(), pageSize, compressBuffer, 0);
if (size < pageSize) {
outBuffer.writeVarInt(size);
outBuffer.checkCapacity(size);
outBuffer.write(compressBuffer, 0, size);
buffer.writeVarInt(size);
buffer.checkCapacity(size);
buffer.write(compressBuffer, 0, size);
} else {
outBuffer.writeVarInt(0);
outBuffer.checkCapacity(pageSize);
outBuffer.write(page.getBytes(), 0, pageSize);
buffer.writeVarInt(0);
buffer.checkCapacity(pageSize);
buffer.write(page.getBytes(), 0, pageSize);
}
} else {
outBuffer.writeVarInt(0);
outBuffer.checkCapacity(pageSize);
outBuffer.write(page.getBytes(), 0, pageSize);
buffer.writeVarInt(0);
buffer.checkCapacity(pageSize);
buffer.write(page.getBytes(), 0, pageSize);
}
flushOut();
write(buffer);
} catch (IOException e) {
throw Message.convertIOException(e, null);
}
......@@ -467,20 +476,21 @@ public class PageLog {
if (trace.isDebugEnabled()) {
trace.debug("log frees " + pages.get(0) + ".." + pages.get(pages.size() - 1));
}
outBuffer.writeByte((byte) FREE_LOG);
outBuffer.writeVarInt(pages.size());
Data buffer = getBuffer();
buffer.writeByte((byte) FREE_LOG);
buffer.writeVarInt(pages.size());
for (int i = 0; i < pages.size(); i++) {
outBuffer.writeVarInt(pages.get(i));
buffer.writeVarInt(pages.get(i));
}
flushOut();
write(buffer);
} catch (IOException e) {
throw Message.convertIOException(e, null);
}
}
private void flushOut() throws IOException {
pageBuffer.write(outBuffer.getBytes(), 0, outBuffer.length());
outBuffer.reset();
private void write(Data data) throws IOException {
pageOut.write(data.getBytes(), 0, data.length());
data.reset();
}
/**
......@@ -498,9 +508,10 @@ public class PageLog {
// database already closed
return;
}
outBuffer.writeByte((byte) COMMIT);
outBuffer.writeVarInt(sessionId);
flushOut();
Data buffer = getBuffer();
buffer.writeByte((byte) COMMIT);
buffer.writeVarInt(sessionId);
write(buffer);
if (log.getFlushOnEachCommit()) {
flush();
}
......@@ -527,17 +538,18 @@ public class PageLog {
}
// store it on a separate log page
int pageSize = store.getPageSize();
flushBuffer();
flushOut();
pageOut.fillPage();
outBuffer.writeByte((byte) PREPARE_COMMIT);
outBuffer.writeVarInt(session.getId());
outBuffer.writeString(transaction);
if (outBuffer.length() >= PageStreamData.getCapacity(pageSize)) {
Data buffer = getBuffer();
buffer.writeByte((byte) PREPARE_COMMIT);
buffer.writeVarInt(session.getId());
buffer.writeString(transaction);
if (buffer.length() >= PageStreamData.getCapacity(pageSize)) {
throw Message.getInvalidValueException(transaction, "transaction name (too long)");
}
flushOut();
write(buffer);
// store it on a separate log page
flushBuffer();
flushOut();
pageOut.fillPage();
if (log.getFlushOnEachCommit()) {
flush();
......@@ -571,16 +583,17 @@ public class PageLog {
for (int i = 0; i < columns; i++) {
data.writeValue(row.getValue(i));
}
outBuffer.writeByte((byte) (add ? ADD : REMOVE));
outBuffer.writeVarInt(session.getId());
outBuffer.writeVarInt(tableId);
outBuffer.writeVarLong(row.getKey());
Data buffer = getBuffer();
buffer.writeByte((byte) (add ? ADD : REMOVE));
buffer.writeVarInt(session.getId());
buffer.writeVarInt(tableId);
buffer.writeVarLong(row.getKey());
if (add) {
outBuffer.writeVarInt(data.length());
outBuffer.checkCapacity(data.length());
outBuffer.write(data.getBytes(), 0, data.length());
buffer.writeVarInt(data.length());
buffer.checkCapacity(data.length());
buffer.write(data.getBytes(), 0, data.length());
}
flushOut();
write(buffer);
} catch (IOException e) {
throw Message.convertIOException(e, null);
}
......@@ -599,12 +612,11 @@ public class PageLog {
}
session.addLogPos(logSectionId, logPos);
logPos++;
data.reset();
outBuffer.writeByte((byte) TRUNCATE);
outBuffer.writeVarInt(session.getId());
outBuffer.writeVarInt(tableId);
flushOut();
Data buffer = getBuffer();
buffer.writeByte((byte) TRUNCATE);
buffer.writeVarInt(session.getId());
buffer.writeVarInt(tableId);
write(buffer);
} catch (IOException e) {
throw Message.convertIOException(e, null);
}
......@@ -615,13 +627,8 @@ public class PageLog {
* Flush the transaction log.
*/
void flush() throws SQLException {
try {
if (pageOut != null) {
flushBuffer();
pageOut.flush();
}
} catch (IOException e) {
throw Message.convertIOException(e, null);
if (pageOut != null) {
flushOut();
}
}
......@@ -630,15 +637,16 @@ public class PageLog {
*/
void checkpoint() throws SQLException {
try {
outBuffer.writeByte((byte) CHECKPOINT);
flushOut();
Data buffer = getBuffer();
buffer.writeByte((byte) CHECKPOINT);
write(buffer);
} catch (IOException e) {
throw Message.convertIOException(e, null);
}
undo = new BitField();
logSectionId++;
logPos = 0;
flushBuffer();
flushOut();
pageOut.fillPage();
int currentDataPage = pageOut.getCurrentDataPageId();
logSectionPageMap.put(logSectionId, currentDataPage);
......@@ -683,7 +691,8 @@ public class PageLog {
private int removeUntil(int firstTrunkPage, int firstDataPageToKeep) throws SQLException {
trace.debug("log.removeUntil " + firstDataPageToKeep);
while (true) {
PageStreamTrunk t = (PageStreamTrunk) store.getPage(firstTrunkPage);
Page p = store.getPage(firstTrunkPage);
PageStreamTrunk t = (PageStreamTrunk) p;
logKey = t.getLogKey();
t.resetIndex();
if (t.contains(firstDataPageToKeep)) {
......@@ -713,7 +722,7 @@ public class PageLog {
pageOut.close();
pageOut = null;
}
outBuffer = null;
writeBuffer = null;
}
/**
......@@ -806,12 +815,20 @@ public class PageLog {
sessionStates = New.hashMap();
}
private void flushBuffer() throws SQLException {
private void flushOut() throws SQLException {
try {
pageBuffer.flush();
pageOut.flush();
} catch (IOException e) {
throw Message.convertIOException(e, null);
}
}
private Data getBuffer() {
if (writeBuffer.length() == 0) {
return writeBuffer;
}
//new Error("recurse").printStackTrace();
return store.createData();
}
}
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论