提交 18db1b22 authored 作者: Thomas Mueller's avatar Thomas Mueller

Page store: reserve index head pages while recovering.

上级 cc064d81
...@@ -100,7 +100,6 @@ class PageDataLeaf extends PageData { ...@@ -100,7 +100,6 @@ class PageDataLeaf extends PageData {
if (entryCount > 1) { if (entryCount > 1) {
return entryCount / 2; return entryCount / 2;
} }
int todoIncorrect; int todoIncorrect;
if (find(row.getPos()) != 1) { if (find(row.getPos()) != 1) {
System.out.println("todo " + find(row.getPos())); System.out.println("todo " + find(row.getPos()));
......
...@@ -60,16 +60,16 @@ public class PageFreeList extends Record { ...@@ -60,16 +60,16 @@ public class PageFreeList extends Record {
/** /**
* Mark a page as used. * Mark a page as used.
* *
* @param pos the page id * @param pageId the page id
* @return the page id, or -1 * @return the page id, or -1
*/ */
int allocate(int pos) throws SQLException { int allocate(int pageId) throws SQLException {
int idx = pos - getPos(); int idx = pageId - getPos();
if (idx >= 0 && !used.get(idx)) { if (idx >= 0 && !used.get(idx)) {
used.set(idx); used.set(idx);
store.updateRecord(this, true, data); store.updateRecord(this, true, data);
} }
return pos; return pageId;
} }
/** /**
......
...@@ -228,12 +228,14 @@ public class PageLog { ...@@ -228,12 +228,14 @@ public class PageLog {
int sessionId = in.readInt(); int sessionId = in.readInt();
int tableId = in.readInt(); int tableId = in.readInt();
Row row = readRow(in, data); Row row = readRow(in, data);
if (stage == RECOVERY_STAGE_REDO) { if (stage == RECOVERY_STAGE_UNDO && x == ADD) {
store.allocateIfHead(pos, tableId, row);
} else if (stage == RECOVERY_STAGE_REDO) {
if (isSessionCommitted(sessionId, logId, pos)) { if (isSessionCommitted(sessionId, logId, pos)) {
if (trace.isDebugEnabled()) { if (trace.isDebugEnabled()) {
trace.debug("log redo " + (x == ADD ? "+" : "-") + " table:" + tableId + " " + row); trace.debug("log redo " + (x == ADD ? "+" : "-") + " table:" + tableId + " " + row);
} }
store.redo(tableId, row, x == ADD); store.redo(pos, tableId, row, x == ADD);
} else { } else {
if (trace.isDebugEnabled()) { if (trace.isDebugEnabled()) {
trace.debug("log ignore s:" + sessionId + " " + (x == ADD ? "+" : "-") + " table:" + tableId + " " + row); trace.debug("log ignore s:" + sessionId + " " + (x == ADD ? "+" : "-") + " table:" + tableId + " " + row);
......
...@@ -184,6 +184,13 @@ public class PageStore implements CacheWriter { ...@@ -184,6 +184,13 @@ public class PageStore implements CacheWriter {
private TableData metaTable; private TableData metaTable;
private PageScanIndex metaIndex; private PageScanIndex metaIndex;
private HashMap<Integer, Index> metaObjects; private HashMap<Integer, Index> metaObjects;
/**
* The map of reserved pages, to ensure index head pages
* are not used for regular data during recovery. The key is the page id,
* and the value the latest transaction position where this page is used.
*/
private HashMap<Integer, Integer> reservedPages;
private int systemTableHeadPos; private int systemTableHeadPos;
// TODO reduce DEFAULT_MAX_LOG_SIZE, and don't divide here // TODO reduce DEFAULT_MAX_LOG_SIZE, and don't divide here
private long maxLogSize = Constants.DEFAULT_MAX_LOG_SIZE / 10; private long maxLogSize = Constants.DEFAULT_MAX_LOG_SIZE / 10;
...@@ -202,6 +209,7 @@ public class PageStore implements CacheWriter { ...@@ -202,6 +209,7 @@ public class PageStore implements CacheWriter {
this.accessMode = accessMode; this.accessMode = accessMode;
this.database = database; this.database = database;
trace = database.getTrace(Trace.PAGE_STORE); trace = database.getTrace(Trace.PAGE_STORE);
// int test;
// trace.setLevel(TraceSystem.DEBUG); // trace.setLevel(TraceSystem.DEBUG);
this.cacheSize = cacheSizeDefault; this.cacheSize = cacheSizeDefault;
String cacheType = database.getCacheType(); String cacheType = database.getCacheType();
...@@ -733,6 +741,11 @@ public class PageStore implements CacheWriter { ...@@ -733,6 +741,11 @@ public class PageStore implements CacheWriter {
trace.debug("log recover"); trace.debug("log recover");
recoveryRunning = true; recoveryRunning = true;
log.recover(PageLog.RECOVERY_STAGE_UNDO); log.recover(PageLog.RECOVERY_STAGE_UNDO);
if (reservedPages != null) {
for (int r : reservedPages.keySet()) {
allocatePage(r);
}
}
log.recover(PageLog.RECOVERY_STAGE_ALLOCATE); log.recover(PageLog.RECOVERY_STAGE_ALLOCATE);
openMetaIndex(); openMetaIndex();
readMetaData(); readMetaData();
...@@ -760,6 +773,7 @@ public class PageStore implements CacheWriter { ...@@ -760,6 +773,7 @@ public class PageStore implements CacheWriter {
openIndex.close(systemSession); openIndex.close(systemSession);
} }
recoveryRunning = false; recoveryRunning = false;
reservedPages = null;
writeBack(); writeBack();
// clear the cache because it contains pages with closed indexes // clear the cache because it contains pages with closed indexes
cache.clear(); cache.clear();
...@@ -821,19 +835,37 @@ public class PageStore implements CacheWriter { ...@@ -821,19 +835,37 @@ public class PageStore implements CacheWriter {
return systemTableHeadPos; return systemTableHeadPos;
} }
/**
* Reserve the page if this is a index head entry.
*
* @param logPos the redo log position
* @param tableId the table id
* @param row the row
*/
void allocateIfHead(int logPos, int tableId, Row row) throws SQLException {
if (tableId == META_TABLE_ID) {
int headPos = row.getValue(3).getInt();
if (reservedPages == null) {
reservedPages = New.hashMap();
}
reservedPages.put(headPos, logPos);
}
}
/** /**
* Redo a change in a table. * Redo a change in a table.
* *
* @param logPos the redo log position
* @param tableId the object id of the table * @param tableId the object id of the table
* @param row the row * @param row the row
* @param add true if the record is added, false if deleted * @param add true if the record is added, false if deleted
*/ */
void redo(int tableId, Row row, boolean add) throws SQLException { void redo(int logPos, int tableId, Row row, boolean add) throws SQLException {
if (tableId == META_TABLE_ID) { if (tableId == META_TABLE_ID) {
if (add) { if (add) {
addMeta(row, systemSession, true); addMeta(row, systemSession, true);
} else { } else {
removeMeta(row); removeMeta(logPos, row);
} }
} }
PageScanIndex index = (PageScanIndex) metaObjects.get(tableId); PageScanIndex index = (PageScanIndex) metaObjects.get(tableId);
...@@ -874,14 +906,22 @@ public class PageStore implements CacheWriter { ...@@ -874,14 +906,22 @@ public class PageStore implements CacheWriter {
} }
} }
private void removeMeta(Row row) throws SQLException { private void removeMeta(int logPos, Row row) throws SQLException {
int id = row.getValue(0).getInt(); int id = row.getValue(0).getInt();
Index index = metaObjects.remove(id); Index index = metaObjects.remove(id);
int headPos = index.getHeadPos();
index.getTable().removeIndex(index); index.getTable().removeIndex(index);
if (index instanceof PageBtreeIndex) { if (index instanceof PageBtreeIndex) {
index.getSchema().remove(index); index.getSchema().remove(index);
} }
index.remove(systemSession); index.remove(systemSession);
if (reservedPages != null && reservedPages.containsKey(headPos)) {
// re-allocate the page if it is used later on again
int latestPos = reservedPages.get(headPos);
if (latestPos > logPos) {
allocatePage(headPos);
}
}
} }
private void addMeta(Row row, Session session, boolean redo) throws SQLException { private void addMeta(Row row, Session session, boolean redo) throws SQLException {
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论