Unverified 提交 f796e624 authored 作者: Andrei Tokar's avatar Andrei Tokar 提交者: GitHub

Merge pull request #1188 from h2database/undo-log-split

Undo log split to reduce contention
......@@ -77,9 +77,13 @@ public class AlterTableAddConstraint extends SchemaCommand {
try {
return tryUpdate();
} catch (DbException e) {
try {
for (Index index : createdIndexes) {
session.getDatabase().removeSchemaObject(session, index);
}
} catch (Throwable ex) {
e.addSuppressed(ex);
}
throw e;
} finally {
getSchema().freeUniqueName(constraintName);
......
......@@ -159,11 +159,15 @@ public class CreateTable extends CommandWithColumns {
}
}
} catch (DbException e) {
try {
db.checkPowerOff();
db.removeSchemaObject(session, table);
if (!transactional) {
session.commit(true);
}
} catch (Throwable ex) {
e.addSuppressed(ex);
}
throw e;
}
return 0;
......
......@@ -752,6 +752,9 @@ public class Database implements DataHandler {
getPageStore();
}
}
if(mvStore != null) {
mvStore.getTransactionStore().init();
}
systemUser = new User(this, 0, SYSTEM_USER_NAME, true);
mainSchema = new Schema(this, 0, Constants.SCHEMA_MAIN, systemUser, true);
infoSchema = new Schema(this, -1, "INFORMATION_SCHEMA", systemUser, true);
......@@ -762,9 +765,6 @@ public class Database implements DataHandler {
systemUser.setAdmin(true);
systemSession = new Session(this, systemUser, ++nextSessionId);
lobSession = new Session(this, systemUser, ++nextSessionId);
if(mvStore != null) {
mvStore.getTransactionStore().init(systemSession);
}
CreateTableData data = new CreateTableData();
ArrayList<Column> cols = data.columns;
Column columnId = new Column("ID", Value.INT);
......
......@@ -300,10 +300,10 @@ public class Trace {
if (!space) {
buff.append(' ');
}
buff.append("*/").
append(StringUtils.javaEncode(sql)).
append(StringUtils.javaEncode(params)).
append(';');
buff.append("*/");
StringUtils.javaEncode(sql, buff);
StringUtils.javaEncode(params, buff);
buff.append(';');
sql = buff.toString();
traceWriter.write(TraceSystem.INFO, module, sql, null);
}
......
......@@ -128,7 +128,6 @@ public class FileStore {
if (file != null) {
return;
}
if (fileName != null) {
// ensure the Cache file system is registered
FilePathCache.INSTANCE.getScheme();
FilePath p = FilePath.get(fileName);
......@@ -139,7 +138,6 @@ public class FileStore {
FilePathNio.class.getName();
fileName = "nio:" + fileName;
}
}
this.fileName = fileName;
FilePath f = FilePath.get(fileName);
FilePath parent = f.getParent();
......
......@@ -23,9 +23,10 @@ import java.util.PriorityQueue;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReentrantLock;
import org.h2.compress.CompressDeflate;
import org.h2.compress.CompressLZF;
import org.h2.compress.Compressor;
......@@ -33,6 +34,7 @@ import org.h2.engine.Constants;
import org.h2.mvstore.cache.CacheLongKeyLIRS;
import org.h2.util.MathUtils;
import static org.h2.mvstore.MVMap.INITIAL_VERSION;
import org.h2.util.Utils;
/*
......@@ -144,6 +146,14 @@ public class MVStore {
*/
private static final int MARKED_FREE = 10_000_000;
/**
* Lock which governs access to major store operations: store(), close(), ...
* It should used in a non-reentrant fashion.
* It serves as a replacement for synchronized(this), except it allows for
* non-blocking lock attempts.
*/
private final ReentrantLock storeLock = new ReentrantLock(true);
/**
* The background thread, if any.
*/
......@@ -195,8 +205,7 @@ public class MVStore {
private final Map<Integer, Chunk> freedPageSpace = new HashMap<>();
/**
* The metadata map. Write access to this map needs to be synchronized on
* the store.
* The metadata map. Write access to this map needs to be done under storeLock.
*/
private final MVMap<String, String> meta;
......@@ -207,7 +216,7 @@ public class MVStore {
private WriteBuffer writeBuffer;
private int lastMapId;
private final AtomicInteger lastMapId = new AtomicInteger();
private int versionsToKeep = 5;
......@@ -274,12 +283,6 @@ public class MVStore {
*/
private volatile long currentStoreVersion = -1;
/**
* Holds reference to a thread performing store operation (if any)
* or null if there is none is in progress.
*/
private final AtomicReference<Thread> currentStoreThread = new AtomicReference<>();
private volatile boolean metaChanged;
/**
......@@ -289,8 +292,10 @@ public class MVStore {
private final int autoCompactFillRate;
private long autoCompactLastFileOpCount;
private final Object compactSync = new Object();
/**
* Simple lock to ensure that no more than one compaction runs at any given time
*/
private boolean compactInProgress;
private volatile IllegalStateException panicException;
......@@ -352,9 +357,10 @@ public class MVStore {
meta.init();
if (this.fileStore != null) {
retentionTime = this.fileStore.getDefaultRetentionTime();
int kb = DataUtils.getConfigParam(config, "autoCommitBufferSize", 1024);
// 19 KB memory is about 1 KB storage
autoCommitMemory = kb * 1024 * 19;
int kb = Math.max(1, Math.min(19, Utils.scaleForAvailableMemory(64))) * 1024;
kb = DataUtils.getConfigParam(config, "autoCommitBufferSize", kb);
autoCommitMemory = kb * 1024;
autoCompactFillRate = DataUtils.getConfigParam(config, "autoCompactFillRate", 40);
char[] encryptionKey = (char[]) config.get("encryptionKey");
try {
......@@ -472,15 +478,14 @@ public class MVStore {
* @param builder the map builder
* @return the map
*/
public synchronized <M extends MVMap<K, V>, K, V> M openMap(
String name, MVMap.MapBuilder<M, K, V> builder) {
public <M extends MVMap<K, V>, K, V> M openMap(String name, MVMap.MapBuilder<M, K, V> builder) {
int id = getMapId(name);
M map;
if (id >= 0) {
map = openMap(id, builder);
} else {
HashMap<String, Object> c = new HashMap<>();
id = ++lastMapId;
id = lastMapId.incrementAndGet();
c.put("id", id);
c.put("createVersion", currentVersion);
map = builder.create(this, c);
......@@ -491,16 +496,17 @@ public class MVStore {
map.setRootPos(0, lastStoredVersion);
markMetaChanged();
@SuppressWarnings("unchecked")
M existingMap = (M)maps.putIfAbsent(id, map);
if(existingMap != null) {
M existingMap = (M) maps.putIfAbsent(id, map);
if (existingMap != null) {
map = existingMap;
}
}
return map;
}
public synchronized <M extends MVMap<K, V>, K, V> M openMap(int id,
MVMap.MapBuilder<M, K, V> builder) {
public <M extends MVMap<K, V>, K, V> M openMap(int id, MVMap.MapBuilder<M, K, V> builder) {
storeLock.lock();
try {
@SuppressWarnings("unchecked")
M map = (M) getMap(id);
if (map == null) {
......@@ -517,6 +523,9 @@ public class MVStore {
}
}
return map;
} finally {
storeLock.unlock();
}
}
public <K, V> MVMap<K,V> getMap(int id) {
......@@ -764,12 +773,12 @@ public class MVStore {
lastChunk = last;
if (last == null) {
// no valid chunk
lastMapId = 0;
lastMapId.set(0);
currentVersion = 0;
lastStoredVersion = INITIAL_VERSION;
meta.setRootPos(0, INITIAL_VERSION);
} else {
lastMapId = last.mapId;
lastMapId.set(last.mapId);
currentVersion = last.version;
chunks.put(last.id, last);
lastStoredVersion = currentVersion - 1;
......@@ -952,12 +961,10 @@ public class MVStore {
if (closed) {
return;
}
// can not synchronize on this yet, because
// the thread also synchronized on this, which
// could result in a deadlock
stopBackgroundThread();
closed = true;
synchronized (this) {
storeLock.lock();
try {
if (fileStore != null && shrinkIfPossible) {
shrinkFileIfPossible(0);
}
......@@ -977,6 +984,8 @@ public class MVStore {
if (fileStore != null && !fileStoreIsProvided) {
fileStore.close();
}
} finally {
storeLock.unlock();
}
}
......@@ -1041,11 +1050,15 @@ public class MVStore {
* @return the new version (incremented if there were changes)
*/
public long tryCommit() {
// unlike synchronization, this will also prevent re-entrance,
// which may be possible, if the meta map have changed
if (currentStoreThread.compareAndSet(null, Thread.currentThread())) {
synchronized (this) {
// we need to prevent re-entrance, which may be possible,
// because meta map is modified within storeNow() and that
// causes beforeWrite() call with possibility of going back here
if ((!storeLock.isHeldByCurrentThread() || currentStoreVersion < 0) &&
storeLock.tryLock()) {
try {
store();
} finally {
storeLock.unlock();
}
}
return currentVersion;
......@@ -1067,9 +1080,18 @@ public class MVStore {
*
* @return the new version (incremented if there were changes)
*/
public synchronized long commit() {
currentStoreThread.set(Thread.currentThread());
public long commit() {
// we need to prevent re-entrance, which may be possible,
// because meta map is modified within storeNow() and that
// causes beforeWrite() call with possibility of going back here
if(!storeLock.isHeldByCurrentThread() || currentStoreVersion < 0) {
storeLock.lock();
try {
store();
} finally {
storeLock.unlock();
}
}
return currentVersion;
}
......@@ -1101,12 +1123,11 @@ public class MVStore {
// in any case reset the current store version,
// to allow closing the store
currentStoreVersion = -1;
currentStoreThread.set(null);
}
}
private void storeNow() {
assert Thread.holdsLock(this);
assert storeLock.isHeldByCurrentThread();
long time = getTimeSinceCreation();
freeUnusedIfNeeded(time);
int currentUnsavedPageCount = unsavedMemory;
......@@ -1151,7 +1172,7 @@ public class MVStore {
c.len = Integer.MAX_VALUE;
c.time = time;
c.version = version;
c.mapId = lastMapId;
c.mapId = lastMapId.get();
c.next = Long.MAX_VALUE;
chunks.put(c.id, c);
// force a metadata update
......@@ -1312,7 +1333,8 @@ public class MVStore {
}
}
private synchronized void freeUnusedChunks() {
private void freeUnusedChunks() {
assert storeLock.isHeldByCurrentThread();
if (lastChunk != null && reuseSpace) {
Set<Integer> referenced = collectReferencedChunks();
long time = getTimeSinceCreation();
......@@ -1652,7 +1674,6 @@ public class MVStore {
* @return if there are any changes
*/
public boolean hasUnsavedChanges() {
assert !metaChanged || meta.hasChangesSince(lastStoredVersion) : metaChanged;
if (metaChanged) {
return true;
}
......@@ -1684,7 +1705,9 @@ public class MVStore {
*
* @return if anything was written
*/
public synchronized boolean compactRewriteFully() {
public boolean compactRewriteFully() {
storeLock.lock();
try {
checkOpen();
if (lastChunk == null) {
// nothing to do
......@@ -1709,6 +1732,10 @@ public class MVStore {
}
commit();
return true;
} finally {
storeLock.unlock();
}
}
/**
......@@ -1728,7 +1755,9 @@ public class MVStore {
* than this
* @param moveSize the number of bytes to move
*/
public synchronized void compactMoveChunks(int targetFillRate, long moveSize) {
public void compactMoveChunks(int targetFillRate, long moveSize) {
storeLock.lock();
try {
checkOpen();
if (lastChunk != null && reuseSpace) {
int oldRetentionTime = retentionTime;
......@@ -1746,6 +1775,9 @@ public class MVStore {
retentionTime = oldRetentionTime;
}
}
} finally {
storeLock.unlock();
}
}
private ArrayList<Chunk> findChunksToMove(long startBlock, long moveSize) {
......@@ -1884,18 +1916,32 @@ public class MVStore {
if (!reuseSpace) {
return false;
}
synchronized (compactSync) {
checkOpen();
ArrayList<Chunk> old;
synchronized (this) {
old = findOldChunks(targetFillRate, write);
}
// We can't wait forever for the lock here,
// because if called from the background thread,
// it might go into deadlock with concurrent database closure
// and attempt to stop this thread.
try {
if (storeLock.tryLock(10, TimeUnit.MILLISECONDS)) {
try {
if (!compactInProgress) {
compactInProgress = true;
ArrayList<Chunk> old = findOldChunks(targetFillRate, write);
if (old == null || old.isEmpty()) {
return false;
}
compactRewrite(old);
return true;
}
} finally {
compactInProgress = false;
storeLock.unlock();
}
}
return false;
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
/**
......@@ -2293,14 +2339,21 @@ public class MVStore {
* @param map the map
*/
void beforeWrite(MVMap<?, ?> map) {
if (saveNeeded && fileStore != null && !closed && autoCommitDelay > 0) {
if (saveNeeded && fileStore != null && !closed) {
saveNeeded = false;
// check again, because it could have been written by now
if (unsavedMemory > autoCommitMemory && autoCommitMemory > 0) {
// if unsaved memory creation rate is to high,
// some back pressure need to be applied
// to slow things down and avoid OOME
if (3 * unsavedMemory > 4 * autoCommitMemory) {
commit();
} else {
tryCommit();
}
}
}
}
/**
* Get the store version. The store version is usually used to upgrade the
......@@ -2320,10 +2373,15 @@ public class MVStore {
*
* @param version the new store version
*/
public synchronized void setStoreVersion(int version) {
public void setStoreVersion(int version) {
storeLock.lock();
try {
checkOpen();
markMetaChanged();
meta.put("setting.storeVersion", Integer.toHexString(version));
} finally {
storeLock.unlock();
}
}
/**
......@@ -2342,7 +2400,9 @@ public class MVStore {
*
* @param version the version to revert to
*/
public synchronized void rollbackTo(long version) {
public void rollbackTo(long version) {
storeLock.lock();
try {
checkOpen();
if (version == 0) {
// special case: remove all data
......@@ -2441,6 +2501,9 @@ public class MVStore {
if (lastStoredVersion == INITIAL_VERSION) {
lastStoredVersion = currentVersion - 1;
}
} finally {
storeLock.unlock();
}
}
private static long getRootPos(MVMap<String, String> map, int mapId) {
......@@ -2495,19 +2558,25 @@ public class MVStore {
* @param map the map
* @param newName the new name
*/
public synchronized void renameMap(MVMap<?, ?> map, String newName) {
public void renameMap(MVMap<?, ?> map, String newName) {
checkOpen();
DataUtils.checkArgument(map != meta,
"Renaming the meta map is not allowed");
int id = map.getId();
String oldName = getMapName(id);
if (oldName != null && !oldName.equals(newName)) {
String idHexStr = Integer.toHexString(id);
// we need to cope whith the case of previously unfinished rename
String existingIdHexStr = meta.get("name." + newName);
DataUtils.checkArgument(
!meta.containsKey("name." + newName),
existingIdHexStr == null || existingIdHexStr.equals(idHexStr),
"A map named {0} already exists", newName);
meta.remove("name." + oldName);
// at first create a new name as an "alias"
meta.put("name." + newName, idHexStr);
// switch roles of a new and old names - old one is an alias now
meta.put(MVMap.getMapKey(id), map.asString(newName));
meta.put("name." + newName, Integer.toHexString(id));
// get rid of the old name completely
meta.remove("name." + oldName);
markMetaChanged();
}
}
......@@ -2522,7 +2591,9 @@ public class MVStore {
removeMap(map, true);
}
public synchronized void removeMap(MVMap<?, ?> map, boolean delayed) {
public void removeMap(MVMap<?, ?> map, boolean delayed) {
storeLock.lock();
try {
checkOpen();
DataUtils.checkArgument(map != meta,
"Removing the meta map is not allowed");
......@@ -2534,6 +2605,9 @@ public class MVStore {
int id = map.getId();
String name = getMapName(id);
removeMap(name, id, delayed);
} finally {
storeLock.unlock();
}
}
private void removeMap(String name, int id, boolean delayed) {
......@@ -2659,11 +2733,7 @@ public class MVStore {
synchronized (t.sync) {
t.sync.notifyAll();
}
if (Thread.holdsLock(this)) {
// called from storeNow: can not join,
// because that could result in a deadlock
return;
}
try {
t.join();
} catch (Exception e) {
......@@ -2833,11 +2903,11 @@ public class MVStore {
public void deregisterVersionUsage(TxCounter txCounter) {
if(txCounter != null) {
if(txCounter.counter.decrementAndGet() <= 0) {
if (currentStoreThread.compareAndSet(null, Thread.currentThread())) {
if (!storeLock.isHeldByCurrentThread() && storeLock.tryLock()) {
try {
dropUnusedVersions();
} finally {
currentStoreThread.set(null);
storeLock.unlock();
}
}
}
......
......@@ -253,7 +253,7 @@ public class MVTableEngine implements TableEngine {
public void initTransactions() {
List<Transaction> list = transactionStore.getOpenTransactions();
for (Transaction t : list) {
if (t.getStatus() == Transaction.STATUS_COMMITTING) {
if (t.getStatus() == Transaction.STATUS_COMMITTED) {
t.commit();
} else if (t.getStatus() != Transaction.STATUS_PREPARED) {
t.rollback();
......
......@@ -30,7 +30,9 @@ final class RollbackDecisionMaker extends MVMap.DecisionMaker<Object[]> {
@Override
public MVMap.Decision decide(Object[] existingValue, Object[] providedValue) {
assert decision == null;
assert existingValue != null;
// normaly existingValue will always be there except of db initialization
// where some undo log enty was captured on disk but actual map entry was not
if (existingValue != null ) {
VersionedValue valueToRestore = (VersionedValue) existingValue[2];
long operationId;
if (valueToRestore == null ||
......@@ -45,6 +47,7 @@ final class RollbackDecisionMaker extends MVMap.DecisionMaker<Object[]> {
listener.onRollback(map, key, previousValue, valueToRestore);
}
}
}
decision = MVMap.Decision.REMOVE;
return decision;
}
......
......@@ -32,41 +32,36 @@ public class Transaction {
*/
public static final int STATUS_PREPARED = 2;
/**
* The status of a transaction that is being committed, but possibly not
* yet finished. A transactions can go into this state when the store is
* closed while the transaction is committing. When opening a store,
* such transactions should be committed.
*/
public static final int STATUS_COMMITTING = 3;
/**
* The status of a transaction that has been logically committed or rather
* marked as committed, because it might be still listed among prepared,
* if it was prepared for commit, undo log entries might still exists for it
* if it was prepared for commit. Undo log entries might still exists for it
* and not all of it's changes within map's are re-written as committed yet.
* Nevertheless, those changes should be already viewed by other
* transactions as committed.
* This transaction's id can not be re-used until all the above is completed
* This transaction's id can not be re-used until all of the above is completed
* and transaction is closed.
* A transactions can be observed in this state when the store was
* closed while the transaction was not closed yet.
* When opening a store, such transactions will automatically
* be processed and closed as committed.
*/
private static final int STATUS_COMMITTED = 4;
public static final int STATUS_COMMITTED = 3;
/**
* The status of a transaction that currently in a process of rolling back
* to a savepoint.
*/
private static final int STATUS_ROLLING_BACK = 5;
private static final int STATUS_ROLLING_BACK = 4;
/**
* The status of a transaction that has been rolled back completely,
* but undo operations are not finished yet.
*/
private static final int STATUS_ROLLED_BACK = 6;
private static final int STATUS_ROLLED_BACK = 5;
private static final String STATUS_NAMES[] = {
"CLOSED", "OPEN", "PREPARED", "COMMITTING",
"COMMITTED", "ROLLING_BACK", "ROLLED_BACK"
"CLOSED", "OPEN", "PREPARED", "COMMITTED", "ROLLING_BACK", "ROLLED_BACK"
};
static final int LOG_ID_BITS = 40;
private static final int LOG_ID_BITS1 = LOG_ID_BITS + 1;
......@@ -175,6 +170,11 @@ public class Transaction {
return getStatus(statusAndLogId.get());
}
/**
* Changes transaction status to a specified value
* @param status to be set
* @return transaction state as it was before status change
*/
long setStatus(int status) {
while (true) {
long currentState = statusAndLogId.get();
......@@ -192,23 +192,19 @@ public class Transaction {
case STATUS_PREPARED:
valid = currentStatus == STATUS_OPEN;
break;
case STATUS_COMMITTING:
case STATUS_COMMITTED:
valid = currentStatus == STATUS_OPEN ||
currentStatus == STATUS_PREPARED ||
// this case is only possible if called
// from endLeftoverTransactions()
currentStatus == STATUS_COMMITTING;
break;
case STATUS_COMMITTED:
valid = currentStatus == STATUS_COMMITTING;
currentStatus == STATUS_COMMITTED;
break;
case STATUS_ROLLED_BACK:
valid = currentStatus == STATUS_OPEN ||
currentStatus == STATUS_PREPARED;
break;
case STATUS_CLOSED:
valid = currentStatus == STATUS_COMMITTING ||
currentStatus == STATUS_COMMITTED ||
valid = currentStatus == STATUS_COMMITTED ||
currentStatus == STATUS_ROLLED_BACK;
break;
default:
......@@ -365,11 +361,11 @@ public class Transaction {
Throwable ex = null;
boolean hasChanges = false;
try {
long state = setStatus(STATUS_COMMITTING);
long state = setStatus(STATUS_COMMITTED);
hasChanges = hasChanges(state);
int previousStatus = getStatus(state);
if (hasChanges) {
long logId = getLogId(state);
store.commit(this, logId);
store.commit(this, previousStatus == STATUS_COMMITTED);
}
} catch (Throwable e) {
ex = e;
......
......@@ -77,11 +77,22 @@ public class TransactionMap<K, V> {
// when none of the variables concurrently changes it's value.
BitSet committingTransactions;
MVMap.RootReference mapRootReference;
MVMap.RootReference undoLogRootReference;
MVMap.RootReference[] undoLogRootReferences;
long undoLogSize;
do {
committingTransactions = store.committingTransactions.get();
mapRootReference = map.getRoot();
undoLogRootReference = store.undoLog.getRoot();
BitSet opentransactions = store.openTransactions.get();
undoLogRootReferences = new MVMap.RootReference[opentransactions.length()];
undoLogSize = 0;
for (int i = opentransactions.nextSetBit(0); i >= 0; i = opentransactions.nextSetBit(i+1)) {
MVMap<Long, Object[]> undoLog = store.undoLogs[i];
if (undoLog != null) {
MVMap.RootReference rootReference = undoLog.getRoot();
undoLogRootReferences[i] = rootReference;
undoLogSize += rootReference.root.getTotalCount();
}
}
} while(committingTransactions != store.committingTransactions.get() ||
mapRootReference != map.getRoot());
// Now we have a snapshot, where mapRootReference points to state of the map,
......@@ -89,8 +100,6 @@ public class TransactionMap<K, V> {
// and committingTransactions mask tells us which of seemingly uncommitted changes
// should be considered as committed.
// Subsequent processing uses this snapshot info only.
Page undoRootPage = undoLogRootReference.root;
long undoLogSize = undoRootPage.getTotalCount();
Page mapRootPage = mapRootReference.root;
long size = mapRootPage.getTotalCount();
// if we are looking at the map without any uncommitted values
......@@ -112,7 +121,8 @@ public class TransactionMap<K, V> {
long operationId = currentValue.getOperationId();
if (operationId != 0) { // skip committed entries
int txId = TransactionStore.getTransactionId(operationId);
boolean isVisible = txId == transaction.transactionId || committingTransactions.get(txId);
boolean isVisible = txId == transaction.transactionId ||
committingTransactions.get(txId);
Object v = isVisible ? currentValue.value : currentValue.getCommittedValue();
if (v == null) {
--size;
......@@ -120,12 +130,14 @@ public class TransactionMap<K, V> {
}
}
} else {
// The undo log is much smaller than the map - scan the undo log, and then lookup relevant map entry.
Cursor<Long, Object[]> cursor = new Cursor<>(undoRootPage, null);
while(cursor.hasNext()) {
// The undo logs are much smaller than the map - scan all undo logs, and then lookup relevant map entry.
for (MVMap.RootReference undoLogRootReference : undoLogRootReferences) {
if (undoLogRootReference != null) {
Cursor<Long, Object[]> cursor = new Cursor<>(undoLogRootReference.root, null);
while (cursor.hasNext()) {
cursor.next();
Object op[] = cursor.getValue();
if ((int)op[0] == map.getId()) {
if ((int) op[0] == map.getId()) {
VersionedValue currentValue = map.get(mapRootPage, op[1]);
// If map entry is not there, then we never counted it, in the first place, so skip it.
// This is possible when undo entry exists because it belongs
......@@ -136,7 +148,8 @@ public class TransactionMap<K, V> {
long operationId = cursor.getKey();
if (currentValue.getOperationId() == operationId) {
int txId = TransactionStore.getTransactionId(operationId);
boolean isVisible = txId == transaction.transactionId || committingTransactions.get(txId);
boolean isVisible = txId == transaction.transactionId ||
committingTransactions.get(txId);
Object v = isVisible ? currentValue.value : currentValue.getCommittedValue();
if (v == null) {
--size;
......@@ -146,6 +159,8 @@ public class TransactionMap<K, V> {
}
}
}
}
}
return size;
}
......
......@@ -12,6 +12,7 @@ import java.util.Iterator;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.atomic.AtomicReferenceArray;
import org.h2.mvstore.Cursor;
import org.h2.mvstore.DataUtils;
import org.h2.mvstore.MVMap;
import org.h2.mvstore.MVStore;
......@@ -41,7 +42,7 @@ public class TransactionStore {
private final MVMap<Integer, Object[]> preparedTransactions;
/**
* The undo log.
* Undo logs.
* <p>
* If the first entry for a transaction doesn't have a logId
* of 0, then the transaction is partially committed (which means rollback
......@@ -50,7 +51,9 @@ public class TransactionStore {
* <p>
* Key: opId, value: [ mapId, key, oldValue ].
*/
final MVMap<Long, Object[]> undoLog;
@SuppressWarnings("unchecked")
final MVMap<Long,Object[]> undoLogs[] = new MVMap[MAX_OPEN_TRANSACTIONS];
private final MVMap.Builder<Long,Object[]> undoLogBuilder;
private final DataType dataType;
......@@ -91,6 +94,9 @@ public class TransactionStore {
*/
private int nextTempMapId;
private static final String UNDO_LOG_NAME_PEFIX = "undoLog";
private static final char UNDO_LOG_COMMITTED = '-'; // must come before open in lexicographical order
private static final char UNDO_LOG_OPEN = '.';
/**
* Hard limit on the number of concurrently opened transactions
......@@ -99,6 +105,11 @@ public class TransactionStore {
private static final int MAX_OPEN_TRANSACTIONS = 65535;
public static String getUndoLogName(boolean committed, int transactionId) {
return UNDO_LOG_NAME_PEFIX +
(committed ? UNDO_LOG_COMMITTED : UNDO_LOG_OPEN) +
(transactionId > 0 ? String.valueOf(transactionId) : "");
}
/**
* Create a new transaction store.
......@@ -126,15 +137,7 @@ public class TransactionStore {
ArrayType undoLogValueType = new ArrayType(new DataType[]{
new ObjectDataType(), dataType, oldValueType
});
MVMap.Builder<Long, Object[]> builder =
new MVMap.Builder<Long, Object[]>().
valueType(undoLogValueType);
undoLog = store.openMap("undoLog", builder);
if (undoLog.getValueType() != undoLogValueType) {
throw DataUtils.newIllegalStateException(
DataUtils.ERROR_TRANSACTION_CORRUPT,
"Undo map open with a different value type");
}
undoLogBuilder = new MVMap.Builder<Long, Object[]>().valueType(undoLogValueType);
}
/**
......@@ -143,10 +146,6 @@ public class TransactionStore {
* in which case the store can only be used for reading.
*/
public void init() {
init(RollbackListener.NONE);
}
public synchronized void init(RollbackListener listener) {
if (!init) {
// remove all temporary maps
for (String mapName : store.getMapNames()) {
......@@ -155,32 +154,32 @@ public class TransactionStore {
store.removeMap(temp);
}
}
if (!undoLog.isEmpty()) {
Long key = undoLog.firstKey();
while (key != null) {
int transactionId = getTransactionId(key);
if (!openTransactions.get().get(transactionId)) {
for (String mapName : store.getMapNames()) {
if (mapName.startsWith(UNDO_LOG_NAME_PEFIX)) {
if (store.hasData(mapName)) {
int transactionId = Integer.parseInt(mapName.substring(UNDO_LOG_NAME_PEFIX.length() + 1));
VersionedBitSet openTxBitSet = openTransactions.get();
if (!openTxBitSet.get(transactionId)) {
Object[] data = preparedTransactions.get(transactionId);
int status;
String name;
if (data == null) {
if (undoLog.containsKey(getOperationId(transactionId, 0))) {
status = Transaction.STATUS_OPEN;
} else {
status = Transaction.STATUS_COMMITTING;
}
status = mapName.charAt(UNDO_LOG_NAME_PEFIX.length()) == UNDO_LOG_OPEN ?
Transaction.STATUS_OPEN : Transaction.STATUS_COMMITTED;
name = null;
} else {
status = (Integer) data[0];
name = (String) data[1];
}
long nextTxUndoKey = getOperationId(transactionId + 1, 0);
Long lastUndoKey = undoLog.lowerKey(nextTxUndoKey);
MVMap<Long, Object[]> undoLog = store.openMap(mapName, undoLogBuilder);
undoLogs[transactionId] = undoLog;
Long lastUndoKey = undoLog.lastKey();
assert lastUndoKey != null;
assert getTransactionId(lastUndoKey) == transactionId;
long logId = getLogId(lastUndoKey) + 1;
registerTransaction(transactionId, status, name, logId, timeoutMillis, 0, listener);
key = undoLog.ceilingKey(nextTxUndoKey);
registerTransaction(transactionId, status, name, logId, timeoutMillis, 0, RollbackListener.NONE);
}
}
}
}
......@@ -337,6 +336,11 @@ public class TransactionStore {
assert transactions.get(transactionId) == null;
transactions.set(transactionId, transaction);
if (undoLogs[transactionId] == null) {
String undoName = getUndoLogName(status == Transaction.STATUS_COMMITTED, transactionId);
MVMap<Long, Object[]> undoLog = store.openMap(undoName, undoLogBuilder);
undoLogs[transactionId] = undoLog;
}
return transaction;
}
......@@ -345,7 +349,7 @@ public class TransactionStore {
*
* @param t the transaction
*/
synchronized void storeTransaction(Transaction t) {
void storeTransaction(Transaction t) {
if (t.getStatus() == Transaction.STATUS_PREPARED ||
t.getName() != null) {
Object[] v = { t.getStatus(), t.getName() };
......@@ -362,29 +366,27 @@ public class TransactionStore {
* @param undoLogRecord Object[mapId, key, previousValue]
*/
long addUndoLogRecord(int transactionId, long logId, Object[] undoLogRecord) {
MVMap<Long, Object[]> undoLog = undoLogs[transactionId];
Long undoKey = getOperationId(transactionId, logId);
if (logId == 0) {
if (undoLog.containsKey(undoKey)) {
if (logId == 0 && !undoLog.isEmpty()) {
throw DataUtils.newIllegalStateException(
DataUtils.ERROR_TOO_MANY_OPEN_TRANSACTIONS,
"An old transaction with the same id " +
"is still open: {0}",
transactionId);
}
}
undoLog.put(undoKey, undoLogRecord);
return undoKey;
}
/**
* Remove a log entry.
*
* Remove an undo log entry.
* @param transactionId id of the transaction
* @param logId sequential number of the log record within transaction
*/
public void removeUndoLogRecord(int transactionId, long logId) {
Long undoKey = getOperationId(transactionId, logId);
Object[] old = undoLog.remove(undoKey);
Object[] old = undoLogs[transactionId].remove(undoKey);
if (old == null) {
throw DataUtils.newIllegalStateException(
DataUtils.ERROR_TRANSACTION_ILLEGAL_STATE,
......@@ -400,20 +402,18 @@ public class TransactionStore {
* @param <V> the value type
* @param map the map
*/
synchronized <K, V> void removeMap(TransactionMap<K, V> map) {
store.removeMap(map.map);
<K, V> void removeMap(TransactionMap<K, V> map) {
store.removeMap(map.map, true);
}
/**
* Commit a transaction.
*
* @param t the transaction
* @param maxLogId the last log id
* @param t transaction to commit
* @param recovery if called during initial transaction recovery procedure
* therefore undo log is stored under "committed" name already
*/
void commit(Transaction t, long maxLogId) {
if (store.isClosed()) {
return;
}
void commit(Transaction t, boolean recovery) {
if (!store.isClosed()) {
int transactionId = t.transactionId;
// this is an atomic action that causes all changes
// made by this transaction, to be considered as "committed"
......@@ -421,19 +421,15 @@ public class TransactionStore {
CommitDecisionMaker commitDecisionMaker = new CommitDecisionMaker();
try {
for (long logId = 0; logId < maxLogId; logId++) {
Long undoKey = getOperationId(transactionId, logId);
Object[] op = undoLog.get(undoKey);
if (op == null) {
// partially committed: load next
undoKey = undoLog.ceilingKey(undoKey);
if (undoKey == null ||
getTransactionId(undoKey) != transactionId) {
break;
}
logId = getLogId(undoKey) - 1;
continue;
MVMap<Long, Object[]> undoLog = undoLogs[transactionId];
if(!recovery) {
store.renameMap(undoLog, getUndoLogName(true, transactionId));
}
try {
Cursor<Long, Object[]> cursor = undoLog.cursor(null);
while (cursor.hasNext()) {
Long undoKey = cursor.next();
Object[] op = cursor.getValue();
int mapId = (Integer) op[0];
MVMap<Object, VersionedValue> map = openMap(mapId);
if (map != null) { // might be null if map was removed later
......@@ -441,12 +437,16 @@ public class TransactionStore {
commitDecisionMaker.setUndoKey(undoKey);
map.operate(key, null, commitDecisionMaker);
}
undoLog.remove(undoKey);
}
undoLog.clear();
} finally {
store.renameMap(undoLog, getUndoLogName(false, transactionId));
}
} finally {
flipCommittingTransactionsBit(transactionId, false);
}
}
}
private void flipCommittingTransactionsBit(int transactionId, boolean flag) {
boolean success;
......@@ -541,11 +541,9 @@ public class TransactionStore {
* (even if they are fully rolled back),
* false if it just performed a data access
*/
synchronized void endTransaction(Transaction t, boolean hasChanges) {
void endTransaction(Transaction t, boolean hasChanges) {
t.closeIt();
int txId = t.transactionId;
assert transactions.get(txId) == t : transactions.get(txId) + " != " + t;
transactions.set(txId, null);
boolean success;
......@@ -562,13 +560,14 @@ public class TransactionStore {
if (wasStored && !preparedTransactions.isClosed()) {
preparedTransactions.remove(txId);
}
if (wasStored || store.getAutoCommitDelay() == 0) {
store.tryCommit();
} else {
if (isUndoEmpty()) {
// to avoid having to store the transaction log,
// if there is no open transaction,
// and if there have been many changes, store them now
if (undoLog.isEmpty()) {
int unsaved = store.getUnsavedMemory();
int max = store.getAutoCommitMemory();
// save at 3/4 capacity
......@@ -580,6 +579,17 @@ public class TransactionStore {
}
}
private boolean isUndoEmpty() {
BitSet openTrans = openTransactions.get();
for (int i = openTrans.nextSetBit(0); i >= 0; i = openTrans.nextSetBit(i + 1)) {
MVMap<Long, Object[]> undoLog = undoLogs[i];
if (undoLog != null && !undoLog.isEmpty()) {
return false;
}
}
return true;
}
Transaction getTransaction(int transactionId) {
return transactions.get(transactionId);
}
......@@ -593,6 +603,7 @@ public class TransactionStore {
*/
void rollbackTo(Transaction t, long maxLogId, long toLogId) {
int transactionId = t.getId();
MVMap<Long, Object[]> undoLog = undoLogs[transactionId];
RollbackDecisionMaker decisionMaker = new RollbackDecisionMaker(this, transactionId, toLogId, t.listener);
for (long logId = maxLogId - 1; logId >= toLogId; logId--) {
Long undoKey = getOperationId(transactionId, logId);
......@@ -612,6 +623,8 @@ public class TransactionStore {
*/
Iterator<Change> getChanges(final Transaction t, final long maxLogId,
final long toLogId) {
final MVMap<Long, Object[]> undoLog = undoLogs[t.getId()];
return new Iterator<Change>() {
private long logId = maxLogId - 1;
......@@ -626,8 +639,7 @@ public class TransactionStore {
if (op == null) {
// partially rolled back: load previous
undoKey = undoLog.floorKey(undoKey);
if (undoKey == null ||
getTransactionId(undoKey) != transactionId) {
if (undoKey == null || getTransactionId(undoKey) != transactionId) {
break;
}
logId = getLogId(undoKey);
......
......@@ -148,8 +148,13 @@ public class StringUtils {
* @return the Java representation
*/
public static String javaEncode(String s) {
StringBuilder buff = new StringBuilder(s.length());
javaEncode(s, buff);
return buff.toString();
}
public static void javaEncode(String s, StringBuilder buff) {
int length = s.length();
StringBuilder buff = new StringBuilder(length);
for (int i = 0; i < length; i++) {
char c = s.charAt(i);
switch (c) {
......@@ -202,7 +207,6 @@ public class StringUtils {
}
}
}
return buff.toString();
}
/**
......
......@@ -513,8 +513,10 @@ public class TestConcurrent extends TestMVStore {
Thread.sleep(1);
}
Exception e = task.getException();
if (e != null) {
assertEquals(DataUtils.ERROR_CLOSED,
DataUtils.getErrorCode(e.getMessage()));
}
} catch (IllegalStateException e) {
// sometimes storing works, in which case
// closing must fail
......
......@@ -151,7 +151,7 @@ public class TestStreamStore extends TestBase {
long readCount = s.getFileStore().getReadCount();
// the read count should be low because new blocks
// are appended at the end (not between existing blocks)
assertTrue("rc: " + readCount, readCount <= 17);
assertTrue("rc: " + readCount, readCount <= 20);
map = s.openMap("data");
assertTrue("size: " + map.size(), map.sizeAsLong() >= 200);
s.close();
......
......@@ -204,6 +204,7 @@ public class TestTransactionStore extends TestBase {
break;
}
}
task.get();
// we expect at least 10% the operations were successful
assertTrue(failCount.toString() + " >= " + (count * 0.9),
failCount.get() < count * 0.9);
......@@ -395,24 +396,22 @@ public class TestTransactionStore extends TestBase {
store.close();
s = MVStore.open(fileName);
// roll back a bit, until we have some undo log entries
assertTrue(s.hasMap("undoLog"));
for (int back = 0; back < 100; back++) {
int minus = r.nextInt(10);
s.rollbackTo(Math.max(0, s.getCurrentVersion() - minus));
MVMap<?, ?> undo = s.openMap("undoLog");
if (undo.size() > 0) {
if (hasDataUndoLog(s)) {
break;
}
}
// re-open the store, because we have opened
// the undoLog map with the wrong data type
// re-open TransactionStore, because we rolled back
// underlying MVStore without rolling back TranactionStore
s.close();
s = MVStore.open(fileName);
ts = new TransactionStore(s);
List<Transaction> list = ts.getOpenTransactions();
if (list.size() != 0) {
tx = list.get(0);
if (tx.getStatus() == Transaction.STATUS_COMMITTING) {
if (tx.getStatus() == Transaction.STATUS_COMMITTED) {
i++;
}
}
......@@ -422,6 +421,15 @@ public class TestTransactionStore extends TestBase {
}
}
private boolean hasDataUndoLog(MVStore s) {
for (int i = 0; i < 255; i++) {
if(s.hasData(TransactionStore.getUndoLogName(true, 1))) {
return true;
}
}
return false;
}
private void testGetModifiedMaps() {
MVStore s = MVStore.open(null);
TransactionStore ts = new TransactionStore(s);
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论