提交 c0dcb0df authored 作者: Andrei Tokar's avatar Andrei Tokar

remove syncronized from MVStore.renameMap()

make beforeWrite() to call commit() instead of tryCommit() in case of unsaved memory churn
上级 c11e2576
...@@ -128,17 +128,15 @@ public class FileStore { ...@@ -128,17 +128,15 @@ public class FileStore {
if (file != null) { if (file != null) {
return; return;
} }
if (fileName != null) { // ensure the Cache file system is registered
// ensure the Cache file system is registered FilePathCache.INSTANCE.getScheme();
FilePathCache.INSTANCE.getScheme(); FilePath p = FilePath.get(fileName);
FilePath p = FilePath.get(fileName); // if no explicit scheme was specified, NIO is used
// if no explicit scheme was specified, NIO is used if (p instanceof FilePathDisk &&
if (p instanceof FilePathDisk && !fileName.startsWith(p.getScheme() + ":")) {
!fileName.startsWith(p.getScheme() + ":")) { // ensure the NIO file system is registered
// ensure the NIO file system is registered FilePathNio.class.getName();
FilePathNio.class.getName(); fileName = "nio:" + fileName;
fileName = "nio:" + fileName;
}
} }
this.fileName = fileName; this.fileName = fileName;
FilePath f = FilePath.get(fileName); FilePath f = FilePath.get(fileName);
......
...@@ -354,6 +354,8 @@ public class MVStore { ...@@ -354,6 +354,8 @@ public class MVStore {
retentionTime = this.fileStore.getDefaultRetentionTime(); retentionTime = this.fileStore.getDefaultRetentionTime();
int kb = DataUtils.getConfigParam(config, "autoCommitBufferSize", 1024); int kb = DataUtils.getConfigParam(config, "autoCommitBufferSize", 1024);
// 19 KB memory is about 1 KB storage // 19 KB memory is about 1 KB storage
// TODO: maybe keep 19 MB as an upper bound,
// TODO: but derive actual value from the amount of RAM available
autoCommitMemory = kb * 1024 * 19; autoCommitMemory = kb * 1024 * 19;
autoCompactFillRate = DataUtils.getConfigParam(config, "autoCompactFillRate", 40); autoCompactFillRate = DataUtils.getConfigParam(config, "autoCompactFillRate", 40);
char[] encryptionKey = (char[]) config.get("encryptionKey"); char[] encryptionKey = (char[]) config.get("encryptionKey");
...@@ -1068,8 +1070,16 @@ public class MVStore { ...@@ -1068,8 +1070,16 @@ public class MVStore {
* @return the new version (incremented if there were changes) * @return the new version (incremented if there were changes)
*/ */
public synchronized long commit() { public synchronized long commit() {
currentStoreThread.set(Thread.currentThread()); Thread currentThread = Thread.currentThread();
store(); Thread storeThread = currentStoreThread.get();
if (currentThread != storeThread) { // to avoid re-entrance
currentStoreThread.set(currentThread);
try {
store();
} finally {
currentStoreThread.set(storeThread);
}
}
return currentVersion; return currentVersion;
} }
...@@ -1652,7 +1662,6 @@ public class MVStore { ...@@ -1652,7 +1662,6 @@ public class MVStore {
* @return if there are any changes * @return if there are any changes
*/ */
public boolean hasUnsavedChanges() { public boolean hasUnsavedChanges() {
assert !metaChanged || meta.hasChangesSince(lastStoredVersion) : metaChanged;
if (metaChanged) { if (metaChanged) {
return true; return true;
} }
...@@ -2293,11 +2302,18 @@ public class MVStore { ...@@ -2293,11 +2302,18 @@ public class MVStore {
* @param map the map * @param map the map
*/ */
void beforeWrite(MVMap<?, ?> map) { void beforeWrite(MVMap<?, ?> map) {
if (saveNeeded && fileStore != null && !closed && autoCommitDelay > 0) { if (saveNeeded && fileStore != null && !closed) {
saveNeeded = false; saveNeeded = false;
// check again, because it could have been written by now // check again, because it could have been written by now
if (unsavedMemory > autoCommitMemory && autoCommitMemory > 0) { if (unsavedMemory > autoCommitMemory && autoCommitMemory > 0) {
tryCommit(); // if unsaved memory creation rate is to high,
// some back pressure need to be applied
// to slow things down and avoid OOME
if (3 * unsavedMemory > 4 * autoCommitMemory) {
commit();
} else {
tryCommit();
}
} }
} }
} }
...@@ -2495,7 +2511,7 @@ public class MVStore { ...@@ -2495,7 +2511,7 @@ public class MVStore {
* @param map the map * @param map the map
* @param newName the new name * @param newName the new name
*/ */
public synchronized void renameMap(MVMap<?, ?> map, String newName) { public void renameMap(MVMap<?, ?> map, String newName) {
checkOpen(); checkOpen();
DataUtils.checkArgument(map != meta, DataUtils.checkArgument(map != meta,
"Renaming the meta map is not allowed"); "Renaming the meta map is not allowed");
...@@ -2505,9 +2521,12 @@ public class MVStore { ...@@ -2505,9 +2521,12 @@ public class MVStore {
DataUtils.checkArgument( DataUtils.checkArgument(
!meta.containsKey("name." + newName), !meta.containsKey("name." + newName),
"A map named {0} already exists", newName); "A map named {0} already exists", newName);
meta.remove("name." + oldName); // at first create a new name as an "alias"
meta.put(MVMap.getMapKey(id), map.asString(newName));
meta.put("name." + newName, Integer.toHexString(id)); meta.put("name." + newName, Integer.toHexString(id));
// switch roles of a new and old names - old one is an alias now
meta.put(MVMap.getMapKey(id), map.asString(newName));
// get rid of the old name completely
meta.remove("name." + oldName);
markMetaChanged(); markMetaChanged();
} }
} }
......
...@@ -95,8 +95,8 @@ public class TransactionStore { ...@@ -95,8 +95,8 @@ public class TransactionStore {
private int nextTempMapId; private int nextTempMapId;
private static final String UNDO_LOG_NAME_PEFIX = "undoLog"; private static final String UNDO_LOG_NAME_PEFIX = "undoLog";
private static final char UNDO_LOG_OPEN = '-'; private static final char UNDO_LOG_COMMITTED = '-'; // must come before open in lexicographical order
private static final char UNDO_LOG_COMMITTED = '.'; private static final char UNDO_LOG_OPEN = '.';
/** /**
* Hard limit on the number of concurrently opened transactions * Hard limit on the number of concurrently opened transactions
...@@ -105,6 +105,11 @@ public class TransactionStore { ...@@ -105,6 +105,11 @@ public class TransactionStore {
private static final int MAX_OPEN_TRANSACTIONS = 65535; private static final int MAX_OPEN_TRANSACTIONS = 65535;
public static String getUndoLogName(boolean committed, int transactionId) {
return UNDO_LOG_NAME_PEFIX +
(committed ? UNDO_LOG_COMMITTED : UNDO_LOG_OPEN) +
(transactionId > 0 ? String.valueOf(transactionId) : "");
}
/** /**
* Create a new transaction store. * Create a new transaction store.
...@@ -158,24 +163,27 @@ public class TransactionStore { ...@@ -158,24 +163,27 @@ public class TransactionStore {
if (mapName.startsWith(UNDO_LOG_NAME_PEFIX)) { if (mapName.startsWith(UNDO_LOG_NAME_PEFIX)) {
if (store.hasData(mapName)) { if (store.hasData(mapName)) {
int transactionId = Integer.parseInt(mapName.substring(UNDO_LOG_NAME_PEFIX.length() + 1)); int transactionId = Integer.parseInt(mapName.substring(UNDO_LOG_NAME_PEFIX.length() + 1));
Object[] data = preparedTransactions.get(transactionId); VersionedBitSet openTxBitSet = openTransactions.get();
int status; if (!openTxBitSet.get(transactionId)) {
String name; Object[] data = preparedTransactions.get(transactionId);
if (data == null) { int status;
status = mapName.charAt(UNDO_LOG_NAME_PEFIX.length()) == UNDO_LOG_OPEN ? String name;
Transaction.STATUS_OPEN : Transaction.STATUS_COMMITTING; if (data == null) {
name = null; status = mapName.charAt(UNDO_LOG_NAME_PEFIX.length()) == UNDO_LOG_OPEN ?
} else { Transaction.STATUS_OPEN : Transaction.STATUS_COMMITTING;
status = (Integer) data[0]; name = null;
name = (String) data[1]; } else {
status = (Integer) data[0];
name = (String) data[1];
}
MVMap<Long, Object[]> undoLog = store.openMap(mapName, undoLogBuilder);
undoLogs[transactionId] = undoLog;
Long lastUndoKey = undoLog.lastKey();
assert lastUndoKey != null;
assert getTransactionId(lastUndoKey) == transactionId;
long logId = getLogId(lastUndoKey) + 1;
registerTransaction(transactionId, status, name, logId, timeoutMillis, 0, listener);
} }
MVMap<Long,Object[]> undoLog = store.openMap(mapName, undoLogBuilder);
undoLogs[transactionId] = undoLog;
Long lastUndoKey = undoLog.lastKey();
assert lastUndoKey != null;
assert getTransactionId(lastUndoKey) == transactionId;
long logId = getLogId(lastUndoKey) + 1;
registerTransaction(transactionId, status, name, logId, timeoutMillis, 0, listener);
} }
} }
} }
...@@ -333,7 +341,7 @@ public class TransactionStore { ...@@ -333,7 +341,7 @@ public class TransactionStore {
transactions.set(transactionId, transaction); transactions.set(transactionId, transaction);
if (undoLogs[transactionId] == null) { if (undoLogs[transactionId] == null) {
String undoName = UNDO_LOG_NAME_PEFIX + UNDO_LOG_OPEN + transactionId; String undoName = getUndoLogName(false, transactionId);
undoLogs[transactionId] = store.openMap(undoName, undoLogBuilder); undoLogs[transactionId] = store.openMap(undoName, undoLogBuilder);
} }
return transaction; return transaction;
...@@ -416,7 +424,7 @@ public class TransactionStore { ...@@ -416,7 +424,7 @@ public class TransactionStore {
try { try {
t.setStatus(Transaction.STATUS_COMMITTED); t.setStatus(Transaction.STATUS_COMMITTED);
MVMap<Long, Object[]> undoLog = undoLogs[transactionId]; MVMap<Long, Object[]> undoLog = undoLogs[transactionId];
store.renameMap(undoLog, UNDO_LOG_NAME_PEFIX + UNDO_LOG_COMMITTED + transactionId); store.renameMap(undoLog, getUndoLogName(true, transactionId));
try { try {
Cursor<Long, Object[]> cursor = undoLog.cursor(null); Cursor<Long, Object[]> cursor = undoLog.cursor(null);
while (cursor.hasNext()) { while (cursor.hasNext()) {
...@@ -432,7 +440,7 @@ public class TransactionStore { ...@@ -432,7 +440,7 @@ public class TransactionStore {
} }
undoLog.clear(); undoLog.clear();
} finally { } finally {
store.renameMap(undoLog, UNDO_LOG_NAME_PEFIX + UNDO_LOG_OPEN + transactionId); store.renameMap(undoLog, getUndoLogName(false, transactionId));
} }
} finally { } finally {
flipCommittingTransactionsBit(transactionId, false); flipCommittingTransactionsBit(transactionId, false);
......
...@@ -396,7 +396,8 @@ public class TestTransactionStore extends TestBase { ...@@ -396,7 +396,8 @@ public class TestTransactionStore extends TestBase {
store.close(); store.close();
s = MVStore.open(fileName); s = MVStore.open(fileName);
// roll back a bit, until we have some undo log entries // roll back a bit, until we have some undo log entries
assertTrue(s.hasMap("undoLog-1") || s.hasMap("undoLog.1")); assertTrue(s.hasMap(TransactionStore.getUndoLogName(false, 1)) ||
s.hasMap(TransactionStore.getUndoLogName(true, 1)));
for (int back = 0; back < 100; back++) { for (int back = 0; back < 100; back++) {
int minus = r.nextInt(10); int minus = r.nextInt(10);
s.rollbackTo(Math.max(0, s.getCurrentVersion() - minus)); s.rollbackTo(Math.max(0, s.getCurrentVersion() - minus));
...@@ -424,7 +425,7 @@ public class TestTransactionStore extends TestBase { ...@@ -424,7 +425,7 @@ public class TestTransactionStore extends TestBase {
private boolean hasDataUndoLog(MVStore s) { private boolean hasDataUndoLog(MVStore s) {
for (int i = 0; i < 255; i++) { for (int i = 0; i < 255; i++) {
if(s.hasData("undoLog."+i)) { if(s.hasData(TransactionStore.getUndoLogName(true, 1))) {
return true; return true;
} }
} }
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论