提交 03402e74 authored 作者: Thomas Mueller's avatar Thomas Mueller

Transaction store: re-use transaction ids, and they are not integer and no…

Transaction store: re-use transaction ids, and they are not integer and no longer long (work in progress)
上级 71888e43
...@@ -76,6 +76,11 @@ public class DataUtils { ...@@ -76,6 +76,11 @@ public class DataUtils {
*/ */
public static final int ERROR_TRANSACTION_LOCK_TIMEOUT = 101; public static final int ERROR_TRANSACTION_LOCK_TIMEOUT = 101;
/**
* A very old transaction is still open.
*/
public static final int ERROR_TRANSACTION_STILL_OPEN = 102;
/** /**
* The type for leaf page. * The type for leaf page.
*/ */
......
...@@ -27,8 +27,6 @@ import org.h2.util.New; ...@@ -27,8 +27,6 @@ import org.h2.util.New;
*/ */
public class TransactionStore { public class TransactionStore {
private static final String LAST_TRANSACTION_ID = "lastTransactionId";
// TODO should not be hard-coded // TODO should not be hard-coded
private static final int MAX_UNSAVED_PAGES = 4 * 1024; private static final int MAX_UNSAVED_PAGES = 4 * 1024;
...@@ -41,7 +39,7 @@ public class TransactionStore { ...@@ -41,7 +39,7 @@ public class TransactionStore {
* The persisted map of prepared transactions. * The persisted map of prepared transactions.
* Key: transactionId, value: [ status, name ]. * Key: transactionId, value: [ status, name ].
*/ */
final MVMap<Long, Object[]> preparedTransactions; final MVMap<Integer, Object[]> preparedTransactions;
/** /**
* The undo log. * The undo log.
...@@ -55,26 +53,19 @@ public class TransactionStore { ...@@ -55,26 +53,19 @@ public class TransactionStore {
*/ */
final MVMap<long[], Object[]> undoLog; final MVMap<long[], Object[]> undoLog;
; ;
// TODO should be <long, Object[]> // TODO should be <long, Object[]> (operationId, oldValue)
// TODO probably opType is not needed
/** /**
* The lock timeout in milliseconds. 0 means timeout immediately. * The lock timeout in milliseconds. 0 means timeout immediately.
*/ */
long lockTimeout; long lockTimeout;
/**
* The transaction settings. The entry "lastTransaction" contains the last
* transaction id.
*/
private final MVMap<String, String> settings;
private final DataType dataType; private final DataType dataType;
private long lastTransactionIdStored; private int lastTransactionId;
private long lastTransactionId; private int maxTransactionId = 0xffff;
private long firstOpenTransaction = -1;
private HashMap<Integer, MVMap<Object, VersionedValue>> maps = New.hashMap(); private HashMap<Integer, MVMap<Object, VersionedValue>> maps = New.hashMap();
...@@ -96,9 +87,8 @@ public class TransactionStore { ...@@ -96,9 +87,8 @@ public class TransactionStore {
public TransactionStore(MVStore store, DataType dataType) { public TransactionStore(MVStore store, DataType dataType) {
this.store = store; this.store = store;
this.dataType = dataType; this.dataType = dataType;
settings = store.openMap("settings");
preparedTransactions = store.openMap("openTransactions", preparedTransactions = store.openMap("openTransactions",
new MVMap.Builder<Long, Object[]>()); new MVMap.Builder<Integer, Object[]>());
// TODO commit of larger transaction could be faster if we have one undo // TODO commit of larger transaction could be faster if we have one undo
// log per transaction, or a range delete operation for maps // log per transaction, or a range delete operation for maps
VersionedValueType oldValueType = new VersionedValueType(dataType); VersionedValueType oldValueType = new VersionedValueType(dataType);
...@@ -113,21 +103,39 @@ public class TransactionStore { ...@@ -113,21 +103,39 @@ public class TransactionStore {
undoLog = store.openMap("undoLog", builder); undoLog = store.openMap("undoLog", builder);
init(); init();
} }
/**
* Set the maximum transaction id, after which ids are re-used. If the old
* transaction is still in use when re-using an old id, the new transaction
* fails.
*
* @param max the maximum id
*/
public void setMaxTransactionId(int max) {
this.maxTransactionId = max;
}
private static long getOperationId(int transactionId, long logId) {
DataUtils.checkArgument(transactionId >= 0 && transactionId < (1 << 24),
"Transaction id out of range: {0}", transactionId);
DataUtils.checkArgument(logId >= 0 && logId < (1L << 40),
"Transaction log id out of range: {0}", logId);
return ((long) transactionId << 40) | logId;
}
private static int getTransactionId(long operationId) {
return (int) (operationId >>> 40);
}
private static long getLogId(long operationId) {
return operationId & ((1L << 40) - 1);
}
private synchronized void init() { private synchronized void init() {
String s = settings.get(LAST_TRANSACTION_ID);
lastTransactionId = DataUtils.parseLong(s, 0);
lastTransactionIdStored = lastTransactionId;
Long lastKey = preparedTransactions.lastKey();
if (lastKey != null && lastKey.longValue() > lastTransactionId) {
throw DataUtils.newIllegalStateException(
DataUtils.ERROR_TRANSACTION_CORRUPT,
"Last transaction not stored");
}
synchronized (undoLog) { synchronized (undoLog) {
if (undoLog.size() > 0) { if (undoLog.size() > 0) {
long[] key = undoLog.firstKey(); long[] key = undoLog.firstKey();
firstOpenTransaction = key[0]; lastTransactionId = (int) key[0];
} }
} }
} }
...@@ -142,7 +150,7 @@ public class TransactionStore { ...@@ -142,7 +150,7 @@ public class TransactionStore {
ArrayList<Transaction> list = New.arrayList(); ArrayList<Transaction> list = New.arrayList();
long[] key = undoLog.firstKey(); long[] key = undoLog.firstKey();
while (key != null) { while (key != null) {
long transactionId = key[0]; int transactionId = (int) key[0];
long[] end = { transactionId, Long.MAX_VALUE }; long[] end = { transactionId, Long.MAX_VALUE };
key = undoLog.floorKey(end); key = undoLog.floorKey(end);
long logId = key[1] + 1; long logId = key[1] + 1;
...@@ -173,8 +181,6 @@ public class TransactionStore { ...@@ -173,8 +181,6 @@ public class TransactionStore {
* Close the transaction store. * Close the transaction store.
*/ */
public synchronized void close() { public synchronized void close() {
// to avoid losing transaction ids
settings.put(LAST_TRANSACTION_ID, "" + lastTransactionId);
store.commit(); store.commit();
} }
...@@ -184,10 +190,9 @@ public class TransactionStore { ...@@ -184,10 +190,9 @@ public class TransactionStore {
* @return the transaction * @return the transaction
*/ */
public synchronized Transaction begin() { public synchronized Transaction begin() {
long transactionId = lastTransactionId++; int transactionId = ++lastTransactionId;
if (lastTransactionId > lastTransactionIdStored) { if (lastTransactionId >= maxTransactionId) {
lastTransactionIdStored += 64; lastTransactionId = 0;
settings.put(LAST_TRANSACTION_ID, "" + lastTransactionIdStored);
} }
int status = Transaction.STATUS_OPEN; int status = Transaction.STATUS_OPEN;
return new Transaction(this, transactionId, status, null, 0); return new Transaction(this, transactionId, status, null, 0);
...@@ -227,11 +232,16 @@ public class TransactionStore { ...@@ -227,11 +232,16 @@ public class TransactionStore {
long[] undoKey = { t.getId(), logId }; long[] undoKey = { t.getId(), logId };
Object[] log = new Object[] { opType, mapId, key, oldValue }; Object[] log = new Object[] { opType, mapId, key, oldValue };
synchronized (undoLog) { synchronized (undoLog) {
if (logId == 0) {
if (undoLog.containsKey(undoKey)) {
throw DataUtils.newIllegalStateException(
DataUtils.ERROR_TRANSACTION_STILL_OPEN,
"An old transaction with the same id is still open: {0}",
t.getId());
}
}
undoLog.put(undoKey, log); undoLog.put(undoKey, log);
} }
if (firstOpenTransaction == -1 || t.getId() < firstOpenTransaction) {
firstOpenTransaction = t.getId();
}
} }
/** /**
...@@ -277,20 +287,23 @@ public class TransactionStore { ...@@ -277,20 +287,23 @@ public class TransactionStore {
logId = undoKey[1] - 1; logId = undoKey[1] - 1;
continue; continue;
} }
int opType = (Integer) op[0];
if (opType == Transaction.OP_REMOVE) { ;
int mapId = (Integer) op[1]; // TODO undoLog: do we need the opType?
MVMap<Object, VersionedValue> map = openMap(mapId);
Object key = op[2]; int mapId = (Integer) op[1];
VersionedValue value = map.get(key); MVMap<Object, VersionedValue> map = openMap(mapId);
// possibly the entry was added later on Object key = op[2];
// so we have to check VersionedValue value = map.get(key);
if (value == null) { if (value == null) {
// nothing to do // nothing to do
} else if (value.value == null) { } else if (value.value == null) {
// remove the value // remove the value
map.remove(key); map.remove(key);
} } else {
VersionedValue v2 = new VersionedValue();
v2.value = value.value;
map.put(key, v2);
} }
undoLog.remove(undoKey); undoLog.remove(undoKey);
} }
...@@ -328,26 +341,11 @@ public class TransactionStore { ...@@ -328,26 +341,11 @@ public class TransactionStore {
* @return true if it is open * @return true if it is open
*/ */
boolean isTransactionOpen(long transactionId) { boolean isTransactionOpen(long transactionId) {
if (transactionId < firstOpenTransaction) { ;
return false; // TODO probably not needed at all
}
synchronized (undoLog) { synchronized (undoLog) {
if (firstOpenTransaction == -1) { long[] key = { transactionId, 0 };
if (undoLog.size() == 0) { key = undoLog.ceilingKey(key);
return false;
}
long[] key = undoLog.firstKey();
if (key == null) {
// unusual, but can happen
return false;
}
firstOpenTransaction = key[0];
}
if (firstOpenTransaction == transactionId) {
return true;
}
long[] key = { transactionId, -1 };
key = undoLog.higherKey(key);
return key != null && key[0] == transactionId; return key != null && key[0] == transactionId;
} }
} }
...@@ -362,9 +360,6 @@ public class TransactionStore { ...@@ -362,9 +360,6 @@ public class TransactionStore {
preparedTransactions.remove(t.getId()); preparedTransactions.remove(t.getId());
} }
t.setStatus(Transaction.STATUS_CLOSED); t.setStatus(Transaction.STATUS_CLOSED);
if (t.getId() == firstOpenTransaction) {
firstOpenTransaction = -1;
}
if (store.getAutoCommitDelay() == 0) { if (store.getAutoCommitDelay() == 0) {
store.commit(); store.commit();
return; return;
...@@ -560,7 +555,7 @@ public class TransactionStore { ...@@ -560,7 +555,7 @@ public class TransactionStore {
/** /**
* The transaction id. * The transaction id.
*/ */
final long transactionId; final int transactionId;
/** /**
* The log id of the last entry in the undo log map. * The log id of the last entry in the undo log map.
...@@ -571,7 +566,7 @@ public class TransactionStore { ...@@ -571,7 +566,7 @@ public class TransactionStore {
private String name; private String name;
Transaction(TransactionStore store, long transactionId, int status, String name, long logId) { Transaction(TransactionStore store, int transactionId, int status, String name, long logId) {
this.store = store; this.store = store;
this.transactionId = transactionId; this.transactionId = transactionId;
this.status = status; this.status = status;
...@@ -579,7 +574,7 @@ public class TransactionStore { ...@@ -579,7 +574,7 @@ public class TransactionStore {
this.logId = logId; this.logId = logId;
} }
public long getId() { public int getId() {
return transactionId; return transactionId;
} }
...@@ -620,7 +615,9 @@ public class TransactionStore { ...@@ -620,7 +615,9 @@ public class TransactionStore {
* @param oldValue the old value * @param oldValue the old value
*/ */
void log(int opType, int mapId, Object key, Object oldValue) { void log(int opType, int mapId, Object key, Object oldValue) {
store.log(this, logId++, opType, mapId, key, oldValue); store.log(this, logId, opType, mapId, key, oldValue);
// only increment the log id if logging was successful
logId++;
} }
/** /**
......
...@@ -45,6 +45,7 @@ public class TestTransactionStore extends TestBase { ...@@ -45,6 +45,7 @@ public class TestTransactionStore extends TestBase {
@Override @Override
public void test() throws Exception { public void test() throws Exception {
FileUtils.createDirectories(getBaseDir()); FileUtils.createDirectories(getBaseDir());
testTransactionAge();
testStopWhileCommitting(); testStopWhileCommitting();
testGetModifiedMaps(); testGetModifiedMaps();
testKeyIterator(); testKeyIterator();
...@@ -55,6 +56,46 @@ public class TestTransactionStore extends TestBase { ...@@ -55,6 +56,46 @@ public class TestTransactionStore extends TestBase {
testSingleConnection(); testSingleConnection();
testCompareWithPostgreSQL(); testCompareWithPostgreSQL();
} }
private void testTransactionAge() throws Exception {
MVStore s;
TransactionStore ts;
s = MVStore.open(null);
ts = new TransactionStore(s);
ts.setMaxTransactionId(16);
for (int i = 0, j = 1; i < 64; i++) {
Transaction t = ts.begin();
assertEquals(j, t.getId());
t.commit();
j++;
if (j > 16) {
j = 1;
}
}
s = MVStore.open(null);
ts = new TransactionStore(s);
ts.setMaxTransactionId(16);
ArrayList<Transaction> fifo = New.arrayList();
int open = 0;
for (int i = 0; i < 64; i++) {
Transaction t = ts.begin();
if (open >= 16) {
try {
t.openMap("data").put(i, i);
fail();
} catch (IllegalStateException e) {
// expected - too many open
}
Transaction first = fifo.remove(0);
first.commit();
open--;
}
fifo.add(t);
open++;
t.openMap("data").put(i, i);
}
s.close();
}
private void testStopWhileCommitting() throws Exception { private void testStopWhileCommitting() throws Exception {
String fileName = getBaseDir() + "/testStopWhileCommitting.h3"; String fileName = getBaseDir() + "/testStopWhileCommitting.h3";
...@@ -342,7 +383,7 @@ public class TestTransactionStore extends TestBase { ...@@ -342,7 +383,7 @@ public class TestTransactionStore extends TestBase {
assertEquals(null, tx.getName()); assertEquals(null, tx.getName());
tx.setName("first transaction"); tx.setName("first transaction");
assertEquals("first transaction", tx.getName()); assertEquals("first transaction", tx.getName());
assertEquals(0, tx.getId()); assertEquals(1, tx.getId());
assertEquals(Transaction.STATUS_OPEN, tx.getStatus()); assertEquals(Transaction.STATUS_OPEN, tx.getStatus());
m = tx.openMap("test"); m = tx.openMap("test");
m.put("1", "Hello"); m.put("1", "Hello");
...@@ -350,6 +391,7 @@ public class TestTransactionStore extends TestBase { ...@@ -350,6 +391,7 @@ public class TestTransactionStore extends TestBase {
assertEquals(1, list.size()); assertEquals(1, list.size());
txOld = list.get(0); txOld = list.get(0);
assertTrue(tx.getId() == txOld.getId()); assertTrue(tx.getId() == txOld.getId());
assertEquals("first transaction", txOld.getName());
s.commit(); s.commit();
ts.close(); ts.close();
s.close(); s.close();
...@@ -357,14 +399,14 @@ public class TestTransactionStore extends TestBase { ...@@ -357,14 +399,14 @@ public class TestTransactionStore extends TestBase {
s = MVStore.open(fileName); s = MVStore.open(fileName);
ts = new TransactionStore(s); ts = new TransactionStore(s);
tx = ts.begin(); tx = ts.begin();
assertEquals(1, tx.getId()); assertEquals(2, tx.getId());
m = tx.openMap("test"); m = tx.openMap("test");
assertEquals(null, m.get("1")); assertEquals(null, m.get("1"));
m.put("2", "Hello"); m.put("2", "Hello");
list = ts.getOpenTransactions(); list = ts.getOpenTransactions();
assertEquals(2, list.size()); assertEquals(2, list.size());
txOld = list.get(0); txOld = list.get(0);
assertEquals(0, txOld.getId()); assertEquals(1, txOld.getId());
assertEquals(Transaction.STATUS_OPEN, txOld.getStatus()); assertEquals(Transaction.STATUS_OPEN, txOld.getStatus());
assertEquals("first transaction", txOld.getName()); assertEquals("first transaction", txOld.getName());
txOld.prepare(); txOld.prepare();
...@@ -376,17 +418,16 @@ public class TestTransactionStore extends TestBase { ...@@ -376,17 +418,16 @@ public class TestTransactionStore extends TestBase {
ts = new TransactionStore(s); ts = new TransactionStore(s);
tx = ts.begin(); tx = ts.begin();
m = tx.openMap("test"); m = tx.openMap("test");
// TransactionStore was not closed, so we lost some ids assertEquals(2, tx.getId());
assertEquals(65, tx.getId());
list = ts.getOpenTransactions(); list = ts.getOpenTransactions();
assertEquals(2, list.size()); assertEquals(2, list.size());
txOld = list.get(1); txOld = list.get(1);
assertEquals(1, txOld.getId()); assertEquals(2, txOld.getId());
assertEquals(Transaction.STATUS_OPEN, txOld.getStatus()); assertEquals(Transaction.STATUS_OPEN, txOld.getStatus());
assertEquals(null, txOld.getName()); assertEquals(null, txOld.getName());
txOld.rollback(); txOld.rollback();
txOld = list.get(0); txOld = list.get(0);
assertEquals(0, txOld.getId()); assertEquals(1, txOld.getId());
assertEquals(Transaction.STATUS_PREPARED, txOld.getStatus()); assertEquals(Transaction.STATUS_PREPARED, txOld.getStatus());
assertEquals("first transaction", txOld.getName()); assertEquals("first transaction", txOld.getName());
txOld.commit(); txOld.commit();
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论