提交 fbe949a4 authored 作者: Thomas Mueller's avatar Thomas Mueller

Improved performance.

上级 6086bebb
......@@ -243,18 +243,16 @@ public class PageStore implements CacheWriter {
* @param out the output stream
* @return the new position, or -1 if there is no more data to copy
*/
public int copyDirect(int pageId, OutputStream out) throws IOException {
synchronized (database) {
byte[] buffer = new byte[pageSize];
if (pageId >= pageCount) {
return -1;
}
file.seek((long) pageId << pageSizeShift);
file.readFullyDirect(buffer, 0, pageSize);
readCount++;
out.write(buffer, 0, pageSize);
return pageId + 1;
public synchronized int copyDirect(int pageId, OutputStream out) throws IOException {
byte[] buffer = new byte[pageSize];
if (pageId >= pageCount) {
return -1;
}
file.seek((long) pageId << pageSizeShift);
file.readFullyDirect(buffer, 0, pageSize);
readCount++;
out.write(buffer, 0, pageSize);
return pageId + 1;
}
/**
......@@ -395,50 +393,48 @@ public class PageStore implements CacheWriter {
/**
* Flush all pending changes to disk, and switch the new transaction log.
*/
public void checkpoint() {
public synchronized void checkpoint() {
trace.debug("checkpoint");
if (log == null || database.isReadOnly()) {
// the file was never fully opened
return;
}
synchronized (database) {
database.checkPowerOff();
writeIndexRowCounts();
database.checkPowerOff();
writeIndexRowCounts();
log.checkpoint();
writeBack();
log.checkpoint();
writeBack();
int firstUncommittedSection = getFirstUncommittedSection();
int firstUncommittedSection = getFirstUncommittedSection();
log.removeUntil(firstUncommittedSection);
log.removeUntil(firstUncommittedSection);
// write back the free list
writeBack();
// write back the free list
writeBack();
// ensure the free list is backed up again
log.checkpoint();
// ensure the free list is backed up again
log.checkpoint();
if (trace.isDebugEnabled()) {
trace.debug("writeFree");
}
byte[] test = new byte[16];
byte[] empty = new byte[pageSize];
for (int i = PAGE_ID_FREE_LIST_ROOT; i < pageCount; i++) {
if (isUsed(i)) {
freed.clear(i);
} else if (!freed.get(i)) {
if (trace.isDebugEnabled()) {
trace.debug("free {0}", i);
}
if (trace.isDebugEnabled()) {
trace.debug("writeFree");
}
byte[] test = new byte[16];
byte[] empty = new byte[pageSize];
for (int i = PAGE_ID_FREE_LIST_ROOT; i < pageCount; i++) {
if (isUsed(i)) {
freed.clear(i);
} else if (!freed.get(i)) {
if (trace.isDebugEnabled()) {
trace.debug("free {0}", i);
}
file.seek((long) i << pageSizeShift);
file.readFully(test, 0, 16);
if (test[0] != 0) {
file.seek((long) i << pageSizeShift);
file.readFully(test, 0, 16);
if (test[0] != 0) {
file.seek((long) i << pageSizeShift);
file.write(empty, 0, pageSize);
writeCount++;
}
freed.set(i);
file.write(empty, 0, pageSize);
writeCount++;
}
freed.set(i);
}
}
}
......@@ -495,7 +491,7 @@ public class PageStore implements CacheWriter {
for (int x = lastUsed, j = 0; x > MIN_PAGE_COUNT && j < maxMove; x -= blockSize) {
for (int full = x - blockSize + 1; full <= x; full++) {
if (full > MIN_PAGE_COUNT && isUsed(full)) {
synchronized (database) {
synchronized (this) {
firstFree = getFirstFree(firstFree);
if (firstFree == -1 || firstFree >= full) {
j = maxMove;
......@@ -695,111 +691,109 @@ public class PageStore implements CacheWriter {
* @param pageId the page id
* @return the page
*/
public Page getPage(int pageId) {
synchronized (database) {
Page p = (Page) cache.get(pageId);
if (p != null) {
return p;
}
public synchronized Page getPage(int pageId) {
Page p = (Page) cache.get(pageId);
if (p != null) {
return p;
}
Data data = createData();
readPage(pageId, data);
int type = data.readByte();
if (type == Page.TYPE_EMPTY) {
return null;
Data data = createData();
readPage(pageId, data);
int type = data.readByte();
if (type == Page.TYPE_EMPTY) {
return null;
}
data.readShortInt();
data.readInt();
if (!checksumTest(data.getBytes(), pageId, pageSize)) {
throw DbException.get(ErrorCode.FILE_CORRUPTED_1, "wrong checksum");
}
switch (type & ~Page.FLAG_LAST) {
case Page.TYPE_FREE_LIST:
p = PageFreeList.read(this, data, pageId);
break;
case Page.TYPE_DATA_LEAF: {
int indexId = data.readVarInt();
PageIndex idx = metaObjects.get(indexId);
if (idx == null) {
throw DbException.get(ErrorCode.FILE_CORRUPTED_1, "index not found " + indexId);
}
data.readShortInt();
data.readInt();
if (!checksumTest(data.getBytes(), pageId, pageSize)) {
throw DbException.get(ErrorCode.FILE_CORRUPTED_1, "wrong checksum");
if (!(idx instanceof PageDataIndex)) {
throw DbException.get(ErrorCode.FILE_CORRUPTED_1, "not a data index " + indexId + " " + idx);
}
switch (type & ~Page.FLAG_LAST) {
case Page.TYPE_FREE_LIST:
p = PageFreeList.read(this, data, pageId);
break;
case Page.TYPE_DATA_LEAF: {
int indexId = data.readVarInt();
PageIndex idx = metaObjects.get(indexId);
if (idx == null) {
throw DbException.get(ErrorCode.FILE_CORRUPTED_1, "index not found " + indexId);
}
if (!(idx instanceof PageDataIndex)) {
throw DbException.get(ErrorCode.FILE_CORRUPTED_1, "not a data index " + indexId + " " + idx);
}
PageDataIndex index = (PageDataIndex) idx;
if (statistics != null) {
statisticsIncrement(index.getTable().getName() + "." + index.getName() + " read");
}
p = PageDataLeaf.read(index, data, pageId);
break;
PageDataIndex index = (PageDataIndex) idx;
if (statistics != null) {
statisticsIncrement(index.getTable().getName() + "." + index.getName() + " read");
}
case Page.TYPE_DATA_NODE: {
int indexId = data.readVarInt();
PageIndex idx = metaObjects.get(indexId);
if (idx == null) {
throw DbException.get(ErrorCode.FILE_CORRUPTED_1, "index not found " + indexId);
}
if (!(idx instanceof PageDataIndex)) {
throw DbException.get(ErrorCode.FILE_CORRUPTED_1, "not a data index " + indexId + " " + idx);
}
PageDataIndex index = (PageDataIndex) idx;
if (statistics != null) {
statisticsIncrement(index.getTable().getName() + "." + index.getName() + " read");
}
p = PageDataNode.read(index, data, pageId);
break;
p = PageDataLeaf.read(index, data, pageId);
break;
}
case Page.TYPE_DATA_NODE: {
int indexId = data.readVarInt();
PageIndex idx = metaObjects.get(indexId);
if (idx == null) {
throw DbException.get(ErrorCode.FILE_CORRUPTED_1, "index not found " + indexId);
}
case Page.TYPE_DATA_OVERFLOW: {
p = PageDataOverflow.read(this, data, pageId);
if (statistics != null) {
statisticsIncrement("overflow read");
}
break;
if (!(idx instanceof PageDataIndex)) {
throw DbException.get(ErrorCode.FILE_CORRUPTED_1, "not a data index " + indexId + " " + idx);
}
case Page.TYPE_BTREE_LEAF: {
int indexId = data.readVarInt();
PageIndex idx = metaObjects.get(indexId);
if (idx == null) {
throw DbException.get(ErrorCode.FILE_CORRUPTED_1, "index not found " + indexId);
}
if (!(idx instanceof PageBtreeIndex)) {
throw DbException.get(ErrorCode.FILE_CORRUPTED_1, "not a btree index " + indexId + " " + idx);
}
PageBtreeIndex index = (PageBtreeIndex) idx;
if (statistics != null) {
statisticsIncrement(index.getTable().getName() + "." + index.getName() + " read");
}
p = PageBtreeLeaf.read(index, data, pageId);
break;
PageDataIndex index = (PageDataIndex) idx;
if (statistics != null) {
statisticsIncrement(index.getTable().getName() + "." + index.getName() + " read");
}
case Page.TYPE_BTREE_NODE: {
int indexId = data.readVarInt();
PageIndex idx = metaObjects.get(indexId);
if (idx == null) {
throw DbException.get(ErrorCode.FILE_CORRUPTED_1, "index not found " + indexId);
}
if (!(idx instanceof PageBtreeIndex)) {
throw DbException.get(ErrorCode.FILE_CORRUPTED_1, "not a btree index " + indexId + " " + idx);
}
PageBtreeIndex index = (PageBtreeIndex) idx;
if (statistics != null) {
statisticsIncrement(index.getTable().getName() + "." + index.getName() + " read");
}
p = PageBtreeNode.read(index, data, pageId);
break;
p = PageDataNode.read(index, data, pageId);
break;
}
case Page.TYPE_DATA_OVERFLOW: {
p = PageDataOverflow.read(this, data, pageId);
if (statistics != null) {
statisticsIncrement("overflow read");
}
case Page.TYPE_STREAM_TRUNK:
p = PageStreamTrunk.read(this, data, pageId);
break;
case Page.TYPE_STREAM_DATA:
p = PageStreamData.read(this, data, pageId);
break;
default:
throw DbException.get(ErrorCode.FILE_CORRUPTED_1, "page=" + pageId + " type=" + type);
break;
}
case Page.TYPE_BTREE_LEAF: {
int indexId = data.readVarInt();
PageIndex idx = metaObjects.get(indexId);
if (idx == null) {
throw DbException.get(ErrorCode.FILE_CORRUPTED_1, "index not found " + indexId);
}
cache.put(p);
return p;
if (!(idx instanceof PageBtreeIndex)) {
throw DbException.get(ErrorCode.FILE_CORRUPTED_1, "not a btree index " + indexId + " " + idx);
}
PageBtreeIndex index = (PageBtreeIndex) idx;
if (statistics != null) {
statisticsIncrement(index.getTable().getName() + "." + index.getName() + " read");
}
p = PageBtreeLeaf.read(index, data, pageId);
break;
}
case Page.TYPE_BTREE_NODE: {
int indexId = data.readVarInt();
PageIndex idx = metaObjects.get(indexId);
if (idx == null) {
throw DbException.get(ErrorCode.FILE_CORRUPTED_1, "index not found " + indexId);
}
if (!(idx instanceof PageBtreeIndex)) {
throw DbException.get(ErrorCode.FILE_CORRUPTED_1, "not a btree index " + indexId + " " + idx);
}
PageBtreeIndex index = (PageBtreeIndex) idx;
if (statistics != null) {
statisticsIncrement(index.getTable().getName() + "." + index.getName() + " read");
}
p = PageBtreeNode.read(index, data, pageId);
break;
}
case Page.TYPE_STREAM_TRUNK:
p = PageStreamTrunk.read(this, data, pageId);
break;
case Page.TYPE_STREAM_DATA:
p = PageStreamData.read(this, data, pageId);
break;
default:
throw DbException.get(ErrorCode.FILE_CORRUPTED_1, "page=" + pageId + " type=" + type);
}
cache.put(p);
return p;
}
private int getFirstUncommittedSection() {
......@@ -937,41 +931,35 @@ public class PageStore implements CacheWriter {
/**
* Close the file without further writing.
*/
public void close() {
public synchronized void close() {
trace.debug("close");
synchronized (database) {
if (log != null) {
log.close();
log = null;
}
if (file != null) {
try {
file.releaseLock();
file.close();
} finally {
file = null;
}
if (log != null) {
log.close();
log = null;
}
if (file != null) {
try {
file.releaseLock();
file.close();
} finally {
file = null;
}
}
}
public void flushLog() {
public synchronized void flushLog() {
if (file != null) {
synchronized (database) {
log.flush();
}
log.flush();
}
}
/**
* Flush the transaction log and sync the file.
*/
public void sync() {
public synchronized void sync() {
if (file != null) {
synchronized (database) {
log.flush();
file.sync();
}
log.flush();
file.sync();
}
}
......@@ -979,15 +967,13 @@ public class PageStore implements CacheWriter {
return trace;
}
public void writeBack(CacheObject obj) {
public synchronized void writeBack(CacheObject obj) {
Page record = (Page) obj;
if (trace.isDebugEnabled()) {
trace.debug("writeBack {0}", record);
}
synchronized (database) {
record.write();
record.setChanged(false);
}
record.write();
record.setChanged(false);
}
/**
......@@ -996,21 +982,19 @@ public class PageStore implements CacheWriter {
* @param page the page
* @param old the old data (if known) or null
*/
public void logUndo(Page page, Data old) {
public synchronized void logUndo(Page page, Data old) {
if (logMode == LOG_MODE_OFF) {
return;
}
synchronized (database) {
checkOpen();
database.checkWritingAllowed();
if (!recoveryRunning) {
int pos = page.getPos();
if (!log.getUndo(pos)) {
if (old == null) {
old = readPage(pos);
}
log.addUndo(pos, old);
checkOpen();
database.checkWritingAllowed();
if (!recoveryRunning) {
int pos = page.getPos();
if (!log.getUndo(pos)) {
if (old == null) {
old = readPage(pos);
}
log.addUndo(pos, old);
}
}
}
......@@ -1020,26 +1004,24 @@ public class PageStore implements CacheWriter {
*
* @param page the page
*/
public void update(Page page) {
synchronized (database) {
if (trace.isDebugEnabled()) {
if (!page.isChanged()) {
trace.debug("updateRecord {0}", page.toString());
}
public synchronized void update(Page page) {
if (trace.isDebugEnabled()) {
if (!page.isChanged()) {
trace.debug("updateRecord {0}", page.toString());
}
checkOpen();
database.checkWritingAllowed();
page.setChanged(true);
int pos = page.getPos();
if (SysProperties.CHECK && !recoveryRunning) {
// ensure the undo entry is already written
if (logMode != LOG_MODE_OFF) {
log.addUndo(pos, null);
}
}
checkOpen();
database.checkWritingAllowed();
page.setChanged(true);
int pos = page.getPos();
if (SysProperties.CHECK && !recoveryRunning) {
// ensure the undo entry is already written
if (logMode != LOG_MODE_OFF) {
log.addUndo(pos, null);
}
allocatePage(pos);
cache.update(pos, page);
}
allocatePage(pos);
cache.update(pos, page);
}
private int getFreeListId(int pageId) {
......@@ -1050,7 +1032,7 @@ public class PageStore implements CacheWriter {
return getFreeList(getFreeListId(pageId));
}
private PageFreeList getFreeList(int i) {
private synchronized PageFreeList getFreeList(int i) {
PageFreeList list = null;
if (i < freeLists.size()) {
list = freeLists.get(i);
......@@ -1058,24 +1040,22 @@ public class PageStore implements CacheWriter {
return list;
}
}
synchronized (database) {
int p = PAGE_ID_FREE_LIST_ROOT + i * freeListPagesPerList;
while (p >= pageCount) {
increaseFileSize();
}
if (p < pageCount) {
list = (PageFreeList) getPage(p);
}
if (list == null) {
list = PageFreeList.create(this, p);
cache.put(list);
}
while (freeLists.size() <= i) {
freeLists.add(null);
}
freeLists.set(i, list);
return list;
int p = PAGE_ID_FREE_LIST_ROOT + i * freeListPagesPerList;
while (p >= pageCount) {
increaseFileSize();
}
if (p < pageCount) {
list = (PageFreeList) getPage(p);
}
if (list == null) {
list = PageFreeList.create(this, p);
cache.put(list);
}
while (freeLists.size() <= i) {
freeLists.add(null);
}
freeLists.set(i, list);
return list;
}
private void freePage(int pageId) {
......@@ -1129,25 +1109,23 @@ public class PageStore implements CacheWriter {
return pos;
}
private int allocatePage(BitField exclude, int first) {
private synchronized int allocatePage(BitField exclude, int first) {
int page;
synchronized (database) {
// TODO could remember the first possible free list page
for (int i = 0;; i++) {
PageFreeList list = getFreeList(i);
page = list.allocate(exclude, first);
if (page >= 0) {
break;
}
}
while (page >= pageCount) {
increaseFileSize();
}
if (trace.isDebugEnabled()) {
// trace.debug("allocatePage " + pos);
// TODO could remember the first possible free list page
for (int i = 0;; i++) {
PageFreeList list = getFreeList(i);
page = list.allocate(exclude, first);
if (page >= 0) {
break;
}
return page;
}
while (page >= pageCount) {
increaseFileSize();
}
if (trace.isDebugEnabled()) {
// trace.debug("allocatePage " + pos);
}
return page;
}
private void increaseFileSize() {
......@@ -1185,27 +1163,25 @@ public class PageStore implements CacheWriter {
* @param pageId the page id
* @param undo if the undo record must have been written
*/
void free(int pageId, boolean undo) {
synchronized void free(int pageId, boolean undo) {
if (trace.isDebugEnabled()) {
// trace.debug("free " + pageId + " " + undo);
}
synchronized (database) {
cache.remove(pageId);
if (SysProperties.CHECK && !recoveryRunning && undo) {
// ensure the undo entry is already written
if (logMode != LOG_MODE_OFF) {
log.addUndo(pageId, null);
}
cache.remove(pageId);
if (SysProperties.CHECK && !recoveryRunning && undo) {
// ensure the undo entry is already written
if (logMode != LOG_MODE_OFF) {
log.addUndo(pageId, null);
}
freePage(pageId);
if (recoveryRunning) {
writePage(pageId, createData());
if (reservedPages != null && reservedPages.containsKey(pageId)) {
// re-allocate the page if it is used later on again
int latestPos = reservedPages.get(pageId);
if (latestPos > log.getLogPos()) {
allocatePage(pageId);
}
}
freePage(pageId);
if (recoveryRunning) {
writePage(pageId, createData());
if (reservedPages != null && reservedPages.containsKey(pageId)) {
// re-allocate the page if it is used later on again
int latestPos = reservedPages.get(pageId);
if (latestPos > log.getLogPos()) {
allocatePage(pageId);
}
}
}
......@@ -1217,15 +1193,13 @@ public class PageStore implements CacheWriter {
*
* @param pageId the page id
*/
void freeUnused(int pageId) {
synchronized void freeUnused(int pageId) {
if (trace.isDebugEnabled()) {
trace.debug("freeUnused {0}", pageId);
}
synchronized (database) {
cache.remove(pageId);
freePage(pageId);
freed.set(pageId);
}
cache.remove(pageId);
freePage(pageId);
freed.set(pageId);
}
/**
......@@ -1255,21 +1229,19 @@ public class PageStore implements CacheWriter {
* @param pos the page id
* @param page the page
*/
void readPage(int pos, Data page) {
synchronized void readPage(int pos, Data page) {
if (recordPageReads) {
if (pos >= MIN_PAGE_COUNT && recordedPagesIndex.get(pos) == IntIntHashMap.NOT_FOUND) {
recordedPagesIndex.put(pos, recordedPagesList.size());
recordedPagesList.add(pos);
}
}
synchronized (database) {
if (pos < 0 || pos >= pageCount) {
throw DbException.get(ErrorCode.FILE_CORRUPTED_1, pos + " of " + pageCount);
}
file.seek((long) pos << pageSizeShift);
file.readFully(page.getBytes(), 0, pageSize);
readCount++;
if (pos < 0 || pos >= pageCount) {
throw DbException.get(ErrorCode.FILE_CORRUPTED_1, pos + " of " + pageCount);
}
file.seek((long) pos << pageSizeShift);
file.readFully(page.getBytes(), 0, pageSize);
readCount++;
}
/**
......@@ -1296,7 +1268,7 @@ public class PageStore implements CacheWriter {
* @param pageId the page id
* @param data the data
*/
public void writePage(int pageId, Data data) {
public synchronized void writePage(int pageId, Data data) {
if (pageId <= 0) {
DbException.throwInternalError("write to page " + pageId);
}
......@@ -1309,11 +1281,9 @@ public class PageStore implements CacheWriter {
}
}
checksumSet(bytes, pageId);
synchronized (database) {
file.seek((long) pageId << pageSizeShift);
file.write(bytes, 0, pageSize);
writeCount++;
}
file.seek((long) pageId << pageSizeShift);
file.write(bytes, 0, pageSize);
writeCount++;
}
/**
......@@ -1321,10 +1291,8 @@ public class PageStore implements CacheWriter {
*
* @param pageId the page id
*/
public void removeRecord(int pageId) {
synchronized (database) {
cache.remove(pageId);
}
public synchronized void removeRecord(int pageId) {
cache.remove(pageId);
}
Database getDatabase() {
......@@ -1401,12 +1369,10 @@ public class PageStore implements CacheWriter {
* @param row the row to add
* @param add true if the row is added, false if it is removed
*/
public void logAddOrRemoveRow(Session session, int tableId, Row row, boolean add) {
public synchronized void logAddOrRemoveRow(Session session, int tableId, Row row, boolean add) {
if (logMode != LOG_MODE_OFF) {
if (!recoveryRunning) {
synchronized (database) {
log.logAddOrRemoveRow(session, tableId, row, add);
}
log.logAddOrRemoveRow(session, tableId, row, add);
}
}
}
......@@ -1416,14 +1382,12 @@ public class PageStore implements CacheWriter {
*
* @param session the session
*/
public void commit(Session session) {
synchronized (database) {
checkOpen();
log.commit(session.getId());
if (log.getSize() - logSizeBase > maxLogSize) {
checkpoint();
logSizeBase = log.getSize();
}
public synchronized void commit(Session session) {
checkOpen();
log.commit(session.getId());
if (log.getSize() - logSizeBase > maxLogSize) {
checkpoint();
logSizeBase = log.getSize();
}
}
......@@ -1433,10 +1397,8 @@ public class PageStore implements CacheWriter {
* @param session the session
* @param transaction the name of the transaction
*/
public void prepareCommit(Session session, String transaction) {
synchronized (database) {
log.prepareCommit(session, transaction);
}
public synchronized void prepareCommit(Session session, String transaction) {
log.prepareCommit(session, transaction);
}
/**
......@@ -1822,11 +1784,9 @@ public class PageStore implements CacheWriter {
* @param session the session
* @param tableId the table id
*/
public void logTruncate(Session session, int tableId) {
synchronized (database) {
if (!recoveryRunning) {
log.logTruncate(session, tableId);
}
public synchronized void logTruncate(Session session, int tableId) {
if (!recoveryRunning) {
log.logTruncate(session, tableId);
}
}
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论