提交 6c5d1ec8 authored 作者: Thomas Mueller's avatar Thomas Mueller

When using temporary table, the database didn't shrink sometimes when closing.

上级 dadfe1d7
...@@ -55,7 +55,7 @@ public class InDoubtTransaction { ...@@ -55,7 +55,7 @@ public class InDoubtTransaction {
/** /**
* Change the state of this transaction. * Change the state of this transaction.
* This will also update the log file. * This will also update the transaction log.
* *
* @param state the new state * @param state the new state
*/ */
......
...@@ -24,7 +24,9 @@ public class PageInputStream extends InputStream { ...@@ -24,7 +24,9 @@ public class PageInputStream extends InputStream {
private PageStreamTrunk.Iterator trunkIterator; private PageStreamTrunk.Iterator trunkIterator;
private int dataPage; private int dataPage;
private PageStreamTrunk trunk; private PageStreamTrunk trunk;
private int trunkIndex;
private PageStreamData data; private PageStreamData data;
private int dataPos;
private boolean endOfFile; private boolean endOfFile;
private int remaining; private int remaining;
private byte[] buffer = new byte[1]; private byte[] buffer = new byte[1];
...@@ -73,8 +75,9 @@ public class PageInputStream extends InputStream { ...@@ -73,8 +75,9 @@ public class PageInputStream extends InputStream {
return -1; return -1;
} }
int l = Math.min(remaining, len); int l = Math.min(remaining, len);
data.read(buff, off, l); data.read(dataPos, buff, off, l);
remaining -= l; remaining -= l;
dataPos += l;
return l; return l;
} catch (DbException e) { } catch (DbException e) {
throw new EOFException(); throw new EOFException();
...@@ -89,15 +92,15 @@ public class PageInputStream extends InputStream { ...@@ -89,15 +92,15 @@ public class PageInputStream extends InputStream {
while (true) { while (true) {
if (trunk == null) { if (trunk == null) {
trunk = trunkIterator.next(); trunk = trunkIterator.next();
trunkIndex = 0;
logKey++; logKey++;
if (trunk == null || trunk.getLogKey() != logKey) { if (trunk == null || trunk.getLogKey() != logKey) {
endOfFile = true; endOfFile = true;
return; return;
} }
trunk.resetIndex();
} }
if (trunk != null) { if (trunk != null) {
next = trunk.getNextPageData(); next = trunk.getPageData(trunkIndex++);
if (next == -1) { if (next == -1) {
trunk = null; trunk = null;
} else if (dataPage == -1 || dataPage == next) { } else if (dataPage == -1 || dataPage == next) {
...@@ -118,8 +121,8 @@ public class PageInputStream extends InputStream { ...@@ -118,8 +121,8 @@ public class PageInputStream extends InputStream {
endOfFile = true; endOfFile = true;
return; return;
} }
data.initRead(); dataPos = data.getReadStart();
remaining = data.getRemaining(); remaining = store.getPageSize() - dataPos;
} }
/** /**
...@@ -141,9 +144,8 @@ public class PageInputStream extends InputStream { ...@@ -141,9 +144,8 @@ public class PageInputStream extends InputStream {
break; break;
} }
pages.set(t.getPos()); pages.set(t.getPos());
t.resetIndex(); for (int i = 0;; i++) {
while (true) { int n = t.getPageData(i);
int n = t.getNextPageData();
if (n == -1) { if (n == -1) {
break; break;
} }
......
...@@ -31,7 +31,7 @@ import org.h2.value.ValueNull; ...@@ -31,7 +31,7 @@ import org.h2.value.ValueNull;
* <li>type (0: no-op, 1: undo, 2: commit, ...)</li> * <li>type (0: no-op, 1: undo, 2: commit, ...)</li>
* <li>data</li> * <li>data</li>
* </ul> * </ul>
* The log file is split into sections. * The transaction log is split into sections.
* A checkpoint starts a new section. * A checkpoint starts a new section.
*/ */
public class PageLog { public class PageLog {
...@@ -533,7 +533,7 @@ public class PageLog { ...@@ -533,7 +533,7 @@ public class PageLog {
} }
// store it on a separate log page // store it on a separate log page
int pageSize = store.getPageSize(); int pageSize = store.getPageSize();
flushOut(); pageOut.flush();
pageOut.fillPage(); pageOut.fillPage();
Data buffer = getBuffer(); Data buffer = getBuffer();
buffer.writeByte((byte) PREPARE_COMMIT); buffer.writeByte((byte) PREPARE_COMMIT);
...@@ -636,7 +636,7 @@ public class PageLog { ...@@ -636,7 +636,7 @@ public class PageLog {
undo = new BitSet(); undo = new BitSet();
logSectionId++; logSectionId++;
logPos = 0; logPos = 0;
flushOut(); pageOut.flush();
pageOut.fillPage(); pageOut.fillPage();
int currentDataPage = pageOut.getCurrentDataPageId(); int currentDataPage = pageOut.getCurrentDataPageId();
logSectionPageMap.put(logSectionId, currentDataPage); logSectionPageMap.put(logSectionId, currentDataPage);
...@@ -684,15 +684,14 @@ public class PageLog { ...@@ -684,15 +684,14 @@ public class PageLog {
Page p = store.getPage(trunkPage); Page p = store.getPage(trunkPage);
PageStreamTrunk t = (PageStreamTrunk) p; PageStreamTrunk t = (PageStreamTrunk) p;
logKey = t.getLogKey(); logKey = t.getLogKey();
t.resetIndex();
if (t.contains(firstDataPageToKeep)) { if (t.contains(firstDataPageToKeep)) {
return t.getPos(); return t.getPos();
} }
trunkPage = t.getNextTrunk(); trunkPage = t.getNextTrunk();
IntArray list = new IntArray(); IntArray list = new IntArray();
list.add(t.getPos()); list.add(t.getPos());
while (true) { for (int i = 0;; i++) {
int next = t.getNextPageData(); int next = t.getPageData(i);
if (next == -1) { if (next == -1) {
break; break;
} }
...@@ -719,8 +718,8 @@ public class PageLog { ...@@ -719,8 +718,8 @@ public class PageLog {
* Check if the session committed after than the given position. * Check if the session committed after than the given position.
* *
* @param sessionId the session id * @param sessionId the session id
* @param logId the log file id * @param logId the log id
* @param pos the position in the log file * @param pos the position in the log
* @return true if it is committed * @return true if it is committed
*/ */
private boolean isSessionCommitted(int sessionId, int logId, int pos) { private boolean isSessionCommitted(int sessionId, int logId, int pos) {
...@@ -735,8 +734,8 @@ public class PageLog { ...@@ -735,8 +734,8 @@ public class PageLog {
* Set the last commit record for a session. * Set the last commit record for a session.
* *
* @param sessionId the session id * @param sessionId the session id
* @param logId the log file id * @param logId the log id
* @param pos the position in the log file * @param pos the position in the log
*/ */
private void setLastCommitForSession(int sessionId, int logId, int pos) { private void setLastCommitForSession(int sessionId, int logId, int pos) {
SessionState state = getOrAddSessionState(sessionId); SessionState state = getOrAddSessionState(sessionId);
......
...@@ -24,6 +24,7 @@ public class PageOutputStream { ...@@ -24,6 +24,7 @@ public class PageOutputStream {
private int trunkNext; private int trunkNext;
private IntArray reservedPages = new IntArray(); private IntArray reservedPages = new IntArray();
private PageStreamTrunk trunk; private PageStreamTrunk trunk;
private int trunkIndex;
private PageStreamData data; private PageStreamData data;
private int reserved; private int reserved;
private int remaining; private int remaining;
...@@ -79,7 +80,7 @@ public class PageOutputStream { ...@@ -79,7 +80,7 @@ public class PageOutputStream {
} }
private void initNextData() { private void initNextData() {
int nextData = trunk == null ? -1 : trunk.getNextPageData(); int nextData = trunk == null ? -1 : trunk.getPageData(trunkIndex++);
if (nextData == -1) { if (nextData == -1) {
int parent = trunkPageId; int parent = trunkPageId;
if (trunkNext != 0) { if (trunkNext != 0) {
...@@ -93,10 +94,11 @@ public class PageOutputStream { ...@@ -93,10 +94,11 @@ public class PageOutputStream {
trunkNext = reservedPages.get(len); trunkNext = reservedPages.get(len);
logKey++; logKey++;
trunk = PageStreamTrunk.create(store, parent, trunkPageId, trunkNext, logKey, pageIds); trunk = PageStreamTrunk.create(store, parent, trunkPageId, trunkNext, logKey, pageIds);
trunkIndex = 0;
pageCount++; pageCount++;
trunk.write(); trunk.write();
reservedPages.removeRange(0, len + 1); reservedPages.removeRange(0, len + 1);
nextData = trunk.getNextPageData(); nextData = trunk.getPageData(trunkIndex++);
} }
data = PageStreamData.create(store, nextData, trunk.getPos(), logKey); data = PageStreamData.create(store, nextData, trunk.getPos(), logKey);
pageCount++; pageCount++;
......
...@@ -12,6 +12,7 @@ import java.util.ArrayList; ...@@ -12,6 +12,7 @@ import java.util.ArrayList;
import java.util.BitSet; import java.util.BitSet;
import java.util.Collections; import java.util.Collections;
import java.util.HashMap; import java.util.HashMap;
import java.util.Iterator;
import java.util.zip.CRC32; import java.util.zip.CRC32;
import org.h2.command.ddl.CreateTableData; import org.h2.command.ddl.CreateTableData;
import org.h2.constant.ErrorCode; import org.h2.constant.ErrorCode;
...@@ -323,7 +324,7 @@ public class PageStore implements CacheWriter { ...@@ -323,7 +324,7 @@ public class PageStore implements CacheWriter {
} }
/** /**
* Flush all pending changes to disk, and re-open the log file. * Flush all pending changes to disk, and switch the new transaction log.
*/ */
public void checkpoint() { public void checkpoint() {
trace.debug("checkpoint"); trace.debug("checkpoint");
...@@ -564,8 +565,6 @@ public class PageStore implements CacheWriter { ...@@ -564,8 +565,6 @@ public class PageStore implements CacheWriter {
} }
private void readStaticHeader() { private void readStaticHeader() {
long length = file.length();
database.notifyFileSize(length);
file.seek(FileStore.HEADER_LENGTH); file.seek(FileStore.HEADER_LENGTH);
Data page = Data.create(database, new byte[PAGE_SIZE_MIN - FileStore.HEADER_LENGTH]); Data page = Data.create(database, new byte[PAGE_SIZE_MIN - FileStore.HEADER_LENGTH]);
file.readFully(page.getBytes(), 0, PAGE_SIZE_MIN - FileStore.HEADER_LENGTH); file.readFully(page.getBytes(), 0, PAGE_SIZE_MIN - FileStore.HEADER_LENGTH);
...@@ -1056,11 +1055,13 @@ public class PageStore implements CacheWriter { ...@@ -1056,11 +1055,13 @@ public class PageStore implements CacheWriter {
} }
PageDataIndex systemTable = (PageDataIndex) metaObjects.get(0); PageDataIndex systemTable = (PageDataIndex) metaObjects.get(0);
isNew = systemTable == null; isNew = systemTable == null;
for (Index openIndex : metaObjects.values()) { for (Iterator<PageIndex> it = metaObjects.values().iterator(); it.hasNext();) {
Index openIndex = it.next();
if (openIndex.getTable().isTemporary()) { if (openIndex.getTable().isTemporary()) {
openIndex.truncate(systemSession); openIndex.truncate(systemSession);
openIndex.remove(systemSession); openIndex.remove(systemSession);
removeMetaIndex(openIndex, systemSession); removeMetaIndex(openIndex, systemSession);
it.remove();
} else { } else {
openIndex.close(systemSession); openIndex.close(systemSession);
} }
...@@ -1414,9 +1415,9 @@ public class PageStore implements CacheWriter { ...@@ -1414,9 +1415,9 @@ public class PageStore implements CacheWriter {
} }
/** /**
* Set the maximum log file size in megabytes. * Set the maximum transaction log size in megabytes.
* *
* @param maxSize the new maximum log file size * @param maxSize the new maximum log size
*/ */
public void setMaxLogSize(long maxSize) { public void setMaxLogSize(long maxSize) {
this.maxLogSize = maxSize; this.maxLogSize = maxSize;
...@@ -1586,20 +1587,4 @@ public class PageStore implements CacheWriter { ...@@ -1586,20 +1587,4 @@ public class PageStore implements CacheWriter {
return changeCount; return changeCount;
} }
int getLogFirstTrunkPage() {
return logFirstTrunkPage;
}
int getLogKey() {
return logKey;
}
public PageLog getLog() {
return log;
}
int getLogFirstDataPage() {
return logFirstDataPage;
}
} }
...@@ -118,12 +118,13 @@ public class PageStreamData extends Page { ...@@ -118,12 +118,13 @@ public class PageStreamData extends Page {
/** /**
* Read the next bytes from the buffer. * Read the next bytes from the buffer.
* *
* @param startPos the position in the data page
* @param buff the target buffer * @param buff the target buffer
* @param off the offset in the target buffer * @param off the offset in the target buffer
* @param len the number of bytes to read * @param len the number of bytes to read
*/ */
void read(byte[] buff, int off, int len) { void read(int startPos, byte[] buff, int off, int len) {
data.read(buff, off, len); System.arraycopy(data.getBytes(), startPos, buff, off, len);
} }
/** /**
...@@ -144,14 +145,6 @@ public class PageStreamData extends Page { ...@@ -144,14 +145,6 @@ public class PageStreamData extends Page {
return store.getPageSize() >> 2; return store.getPageSize() >> 2;
} }
/**
* Reset the index.
*/
void initRead() {
data.setPos(DATA_START);
remaining = store.getPageSize() - DATA_START;
}
public void moveTo(Session session, int newPos) { public void moveTo(Session session, int newPos) {
// not required // not required
} }
...@@ -168,4 +161,8 @@ public class PageStreamData extends Page { ...@@ -168,4 +161,8 @@ public class PageStreamData extends Page {
return true; return true;
} }
public int getReadStart() {
return DATA_START;
}
} }
\ No newline at end of file
...@@ -42,7 +42,6 @@ public class PageStreamTrunk extends Page { ...@@ -42,7 +42,6 @@ public class PageStreamTrunk extends Page {
private int[] pageIds; private int[] pageIds;
private int pageCount; private int pageCount;
private Data data; private Data data;
private int index;
private PageStreamTrunk(PageStore store, int parent, int pageId, int next, int logKey, int[] pageIds) { private PageStreamTrunk(PageStore store, int parent, int pageId, int next, int logKey, int[] pageIds) {
setPos(pageId); setPos(pageId);
...@@ -107,13 +106,12 @@ public class PageStreamTrunk extends Page { ...@@ -107,13 +106,12 @@ public class PageStreamTrunk extends Page {
} }
/** /**
* Reset the read/write index. * Get the data page id at the given position.
*
* @param index the index (0, 1, ...)
* @return the value, or -1 if the index is too large
*/ */
void resetIndex() { int getPageData(int index) {
index = 0;
}
int getNextPageData() {
if (index >= pageIds.length) { if (index >= pageIds.length) {
return -1; return -1;
} }
......
...@@ -19,7 +19,7 @@ class SessionState { ...@@ -19,7 +19,7 @@ class SessionState {
public int sessionId; public int sessionId;
/** /**
* The last log file id where a commit for this session is found. * The last log id where a commit for this session is found.
*/ */
public int lastCommitLog; public int lastCommitLog;
...@@ -36,8 +36,8 @@ class SessionState { ...@@ -36,8 +36,8 @@ class SessionState {
/** /**
* Check if this session state is already committed at this point. * Check if this session state is already committed at this point.
* *
* @param logId the log file id * @param logId the log id
* @param pos the position in the log file * @param pos the position in the log
* @return true if it is committed * @return true if it is committed
*/ */
public boolean isCommitted(int logId, int pos) { public boolean isCommitted(int logId, int pos) {
......
...@@ -14,8 +14,8 @@ import org.h2.message.Trace; ...@@ -14,8 +14,8 @@ import org.h2.message.Trace;
import org.h2.message.TraceSystem; import org.h2.message.TraceSystem;
/** /**
* The writer thread is responsible to flush the transaction log file from time * The writer thread is responsible to flush the transaction transaction log
* to time. * from time to time.
*/ */
public class WriterThread implements Runnable { public class WriterThread implements Runnable {
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论