提交 376edeb3 authored 作者: Thomas Mueller's avatar Thomas Mueller

TransactionStore: new persisted transaction state "committing".

上级 be3e14bf
...@@ -648,7 +648,7 @@ public class Database implements DataHandler { ...@@ -648,7 +648,7 @@ public class Database implements DataHandler {
rec.execute(this, systemSession, eventListener); rec.execute(this, systemSession, eventListener);
} }
if (mvStore != null) { if (mvStore != null) {
mvStore.rollback(); mvStore.initTransactions();
} }
recompileInvalidViews(systemSession); recompileInvalidViews(systemSession);
starting = false; starting = false;
......
...@@ -51,7 +51,7 @@ TransactionStore: ...@@ -51,7 +51,7 @@ TransactionStore:
- support reading the undo log - support reading the undo log
MVStore: MVStore:
- rolling docs review: at convert "Features" to top-level (linked) entries - rolling docs review: at "Features"
- additional test async write / read algorithm for speed and errors - additional test async write / read algorithm for speed and errors
- move setters to the builder, except for setRetainVersion, setReuseSpace, - move setters to the builder, except for setRetainVersion, setReuseSpace,
and settings that are persistent (setStoreVersion) and settings that are persistent (setStoreVersion)
...@@ -61,7 +61,7 @@ MVStore: ...@@ -61,7 +61,7 @@ MVStore:
- maybe split database into multiple files, to speed up compact - maybe split database into multiple files, to speed up compact
- auto-compact from time to time and on close - auto-compact from time to time and on close
- test and possibly improve compact operation (for large dbs) - test and possibly improve compact operation (for large dbs)
- possibly split chunk data into immutable and mutable - possibly split chunk metadata into immutable and mutable
- compact: avoid processing pages using a counting bloom filter - compact: avoid processing pages using a counting bloom filter
- defragment (re-creating maps, specially those with small pages) - defragment (re-creating maps, specially those with small pages)
- chunk header: store changed chunk data as row; maybe after the root - chunk header: store changed chunk data as row; maybe after the root
...@@ -69,8 +69,8 @@ MVStore: ...@@ -69,8 +69,8 @@ MVStore:
- is there a better name for the file header, - is there a better name for the file header,
-- if it's no longer always at the beginning of a file? store header? -- if it's no longer always at the beginning of a file? store header?
- on insert, if the child page is already full, don't load and modify it - on insert, if the child page is already full, don't load and modify it
-- split directly (for leaves with 1 entry) -- split directly (specially for leaves with one large entry)
- maybe let a chunk point to possible next chunks - maybe let a chunk point to a list of potential next chunks
-- (so no fixed location header is needed) -- (so no fixed location header is needed)
- support stores that span multiple files (chunks stored in other files) - support stores that span multiple files (chunks stored in other files)
- triggers (can be implemented with a custom map) - triggers (can be implemented with a custom map)
...@@ -107,6 +107,7 @@ MVStore: ...@@ -107,6 +107,7 @@ MVStore:
- unit test for the FreeSpaceList; maybe find a simpler implementation - unit test for the FreeSpaceList; maybe find a simpler implementation
- support opening (existing) maps by id - support opening (existing) maps by id
- more consistent null handling (keys/values sometimes may be null) - more consistent null handling (keys/values sometimes may be null)
- logging mechanism, specially for operations in a background thread
*/ */
...@@ -959,6 +960,7 @@ public class MVStore { ...@@ -959,6 +960,7 @@ public class MVStore {
buff.position(0); buff.position(0);
fileWriteCount++; fileWriteCount++;
DataUtils.writeFully(file, filePos, buff); DataUtils.writeFully(file, filePos, buff);
fileSize = Math.max(fileSize, filePos + buff.position()); fileSize = Math.max(fileSize, filePos + buff.position());
if (buff.capacity() <= 4 * 1024 * 1024) { if (buff.capacity() <= 4 * 1024 * 1024) {
writeBuffer = buff; writeBuffer = buff;
...@@ -1846,7 +1848,13 @@ public class MVStore { ...@@ -1846,7 +1848,13 @@ public class MVStore {
// ignore // ignore
} }
} }
store.storeInBackground(); try {
store.storeInBackground();
} catch (Exception e) {
int todo;
// TODO throw the exception in the main thread
// at some point, or log the problem
}
} }
} }
......
...@@ -191,12 +191,15 @@ public class MVTableEngine implements TableEngine { ...@@ -191,12 +191,15 @@ public class MVTableEngine implements TableEngine {
} }
/** /**
* Rollback all open transactions. * Commit all transactions that are in the committing state, and
* rollback all open transactions.
*/ */
public void rollback() { public void initTransactions() {
List<Transaction> list = transactionStore.getOpenTransactions(); List<Transaction> list = transactionStore.getOpenTransactions();
for (Transaction t : list) { for (Transaction t : list) {
if (t.getStatus() != Transaction.STATUS_PREPARED) { if (t.getStatus() == Transaction.STATUS_COMITTING) {
t.commit();
} else if (t.getStatus() != Transaction.STATUS_PREPARED) {
t.rollback(); t.rollback();
} }
} }
......
...@@ -44,6 +44,8 @@ public class TransactionStore { ...@@ -44,6 +44,8 @@ public class TransactionStore {
/** /**
* The undo log. * The undo log.
* If the first entry for a transaction doesn't have a logId of 0, then
* the transaction is committing (partially committed).
* Key: [ transactionId, logId ], value: [ opType, mapId, key, oldValue ]. * Key: [ transactionId, logId ], value: [ opType, mapId, key, oldValue ].
*/ */
final MVMap<long[], Object[]> undoLog; final MVMap<long[], Object[]> undoLog;
...@@ -141,7 +143,12 @@ public class TransactionStore { ...@@ -141,7 +143,12 @@ public class TransactionStore {
int status; int status;
String name; String name;
if (data == null) { if (data == null) {
status = Transaction.STATUS_OPEN; key[1] = 0;
if (undoLog.containsKey(key)) {
status = Transaction.STATUS_OPEN;
} else {
status = Transaction.STATUS_COMITTING;
}
name = null; name = null;
} else { } else {
status = (Integer) data[0]; status = (Integer) data[0];
...@@ -232,12 +239,18 @@ public class TransactionStore { ...@@ -232,12 +239,18 @@ public class TransactionStore {
return; return;
} }
synchronized (undoLog) { synchronized (undoLog) {
t.setStatus(Transaction.STATUS_COMITTING);
for (long logId = 0; logId < maxLogId; logId++) { for (long logId = 0; logId < maxLogId; logId++) {
commitIfNeeded(); commitIfNeeded();
long[] undoKey = new long[] { t.getId(), logId }; long[] undoKey = new long[] { t.getId(), logId };
Object[] op = undoLog.get(undoKey); Object[] op = undoLog.get(undoKey);
if (op == null) { if (op == null) {
int todoImprove; // partially committed: load next
undoKey = undoLog.ceilingKey(undoKey);
if (undoKey == null || undoKey[0] != t.getId()) {
break;
}
logId = undoKey[1] - 1;
continue; continue;
} }
int opType = (Integer) op[0]; int opType = (Integer) op[0];
...@@ -342,7 +355,12 @@ public class TransactionStore { ...@@ -342,7 +355,12 @@ public class TransactionStore {
long[] undoKey = new long[] { t.getId(), logId }; long[] undoKey = new long[] { t.getId(), logId };
Object[] op = undoLog.get(undoKey); Object[] op = undoLog.get(undoKey);
if (op == null) { if (op == null) {
int todoImprove; // partially rolled back: load previous
undoKey = undoLog.floorKey(undoKey);
if (undoKey == null || undoKey[0] != t.getId()) {
break;
}
logId = undoKey[1] + 1;
continue; continue;
} }
int mapId = ((Integer) op[1]).intValue(); int mapId = ((Integer) op[1]).intValue();
...@@ -385,11 +403,16 @@ public class TransactionStore { ...@@ -385,11 +403,16 @@ public class TransactionStore {
private void fetchNext() { private void fetchNext() {
synchronized (undoLog) { synchronized (undoLog) {
while (logId >= toLogId) { while (logId >= toLogId) {
Object[] op = undoLog.get(new long[] { long[] undoKey = new long[] { t.getId(), logId };
t.getId(), logId }); Object[] op = undoLog.get(undoKey);
logId--; logId--;
if (op == null) { if (op == null) {
int todoImprove; // partially rolled back: load previous
undoKey = undoLog.floorKey(undoKey);
if (undoKey == null || undoKey[0] != t.getId()) {
break;
}
logId = undoKey[1];
continue; continue;
} }
int mapId = ((Integer) op[1]).intValue(); int mapId = ((Integer) op[1]).intValue();
...@@ -460,20 +483,28 @@ public class TransactionStore { ...@@ -460,20 +483,28 @@ public class TransactionStore {
*/ */
public static class Transaction { public static class Transaction {
/**
* The status of a closed transaction (committed or rolled back).
*/
public static final int STATUS_CLOSED = 0;
/** /**
* The status of an open transaction. * The status of an open transaction.
*/ */
public static final int STATUS_OPEN = 0; public static final int STATUS_OPEN = 1;
/** /**
* The status of a prepared transaction. * The status of a prepared transaction.
*/ */
public static final int STATUS_PREPARED = 1; public static final int STATUS_PREPARED = 2;
/** /**
* The status of a closed transaction (committed or rolled back). * The status of a transaction that is being committed, but possibly not
* yet finished. A transactions can go into this state when the store is
* closed while the transaction is committing. When opening a store,
* such transactions should be committed.
*/ */
public static final int STATUS_CLOSED = 2; public static final int STATUS_COMITTING = 3;
/** /**
* The operation type for changes in a map. * The operation type for changes in a map.
......
...@@ -15,7 +15,9 @@ import java.util.ArrayList; ...@@ -15,7 +15,9 @@ import java.util.ArrayList;
import java.util.Iterator; import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.Random; import java.util.Random;
import java.util.concurrent.atomic.AtomicInteger;
import org.h2.mvstore.MVMap;
import org.h2.mvstore.MVStore; import org.h2.mvstore.MVStore;
import org.h2.mvstore.db.TransactionStore; import org.h2.mvstore.db.TransactionStore;
import org.h2.mvstore.db.TransactionStore.Change; import org.h2.mvstore.db.TransactionStore.Change;
...@@ -24,6 +26,7 @@ import org.h2.mvstore.db.TransactionStore.TransactionMap; ...@@ -24,6 +26,7 @@ import org.h2.mvstore.db.TransactionStore.TransactionMap;
import org.h2.store.fs.FileUtils; import org.h2.store.fs.FileUtils;
import org.h2.test.TestBase; import org.h2.test.TestBase;
import org.h2.util.New; import org.h2.util.New;
import org.h2.util.Task;
/** /**
* Test concurrent transactions. * Test concurrent transactions.
...@@ -42,6 +45,7 @@ public class TestTransactionStore extends TestBase { ...@@ -42,6 +45,7 @@ public class TestTransactionStore extends TestBase {
@Override @Override
public void test() throws Exception { public void test() throws Exception {
FileUtils.createDirectories(getBaseDir()); FileUtils.createDirectories(getBaseDir());
testStopWhileCommitting();
testGetModifiedMaps(); testGetModifiedMaps();
testKeyIterator(); testKeyIterator();
testMultiStatement(); testMultiStatement();
...@@ -51,6 +55,70 @@ public class TestTransactionStore extends TestBase { ...@@ -51,6 +55,70 @@ public class TestTransactionStore extends TestBase {
testSingleConnection(); testSingleConnection();
testCompareWithPostgreSQL(); testCompareWithPostgreSQL();
} }
private void testStopWhileCommitting() throws Exception {
String fileName = getBaseDir() + "/testStopWhileCommitting.h3";
FileUtils.delete(fileName);
for (int i = 0; i < 10;) {
MVStore s;
TransactionStore ts;
Transaction tx;
TransactionMap<Integer, String> m;
s = MVStore.open(fileName);
ts = new TransactionStore(s);
tx = ts.begin();
s.setReuseSpace(false);
m = tx.openMap("test");
final String value = "x" + i;
for (int j = 0; j < 1000; j++) {
m.put(j, value);
}
final AtomicInteger state = new AtomicInteger();
final MVStore store = s;
final MVMap<Integer, String> other = s.openMap("other");
Task task = new Task() {
@Override
public void call() throws Exception {
for (int i = 0; state.get() < Integer.MAX_VALUE; i++) {
state.set(i);
other.put(i, value);
store.store();
}
}
};
task.execute();
// wait for the task to start
while (state.get() < 1) {
Thread.yield();
}
// commit while writing in the task
tx.commit();
// stop writing
state.set(Integer.MAX_VALUE);
// wait for the task to stop
task.get();
store.close();
s = MVStore.open(fileName);
ts = new TransactionStore(s);
List<Transaction> list = ts.getOpenTransactions();
if (list.size() != 0) {
tx = list.get(0);
if (tx.getStatus() == Transaction.STATUS_COMITTING) {
i++;
}
}
s.close();
FileUtils.delete(fileName);
assertFalse(FileUtils.exists(fileName));
FileUtils.delete(fileName);
assertFalse(FileUtils.exists(fileName));
s.close();
FileUtils.delete(fileName);
}
}
private void testGetModifiedMaps() { private void testGetModifiedMaps() {
MVStore s = MVStore.open(null); MVStore s = MVStore.open(null);
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论