提交 e67b6759 authored 作者: Thomas Mueller's avatar Thomas Mueller

Improved update performance, reduced memory and disk footprint

上级 051d3708
...@@ -27,9 +27,6 @@ import org.h2.util.New; ...@@ -27,9 +27,6 @@ import org.h2.util.New;
*/ */
public class TransactionStore { public class TransactionStore {
// TODO should not be hard-coded
private static final int MAX_UNSAVED_PAGES = 4 * 1024;
/** /**
* The store. * The store.
*/ */
...@@ -49,26 +46,26 @@ public class TransactionStore { ...@@ -49,26 +46,26 @@ public class TransactionStore {
* is not possible). Log entries are written before the data is changed * is not possible). Log entries are written before the data is changed
* (write-ahead). * (write-ahead).
* <p> * <p>
* Key: [ transactionId, logId ], value: [ opType, mapId, key, oldValue ]. * Key: [ opId ], value: [ mapId, key, oldValue ].
*/ */
final MVMap<long[], Object[]> undoLog; final MVMap<Long, Object[]> undoLog;
;
// TODO should be <long, Object[]> (operationId, oldValue)
// TODO probably opType is not needed
/** /**
* The lock timeout in milliseconds. 0 means timeout immediately. * The lock timeout in milliseconds. 0 means timeout immediately.
*/ */
long lockTimeout; long lockTimeout;
/**
* The map of maps.
*/
private HashMap<Integer, MVMap<Object, VersionedValue>> maps = New.hashMap();
private final DataType dataType; private final DataType dataType;
private int lastTransactionId; private int lastTransactionId;
private int maxTransactionId = 0xffff; private int maxTransactionId = 0xffff;
private HashMap<Integer, MVMap<Object, VersionedValue>> maps = New.hashMap();
/** /**
* Create a new transaction store. * Create a new transaction store.
* *
...@@ -89,15 +86,12 @@ public class TransactionStore { ...@@ -89,15 +86,12 @@ public class TransactionStore {
this.dataType = dataType; this.dataType = dataType;
preparedTransactions = store.openMap("openTransactions", preparedTransactions = store.openMap("openTransactions",
new MVMap.Builder<Integer, Object[]>()); new MVMap.Builder<Integer, Object[]>());
// TODO commit of larger transaction could be faster if we have one undo
// log per transaction, or a range delete operation for maps
VersionedValueType oldValueType = new VersionedValueType(dataType); VersionedValueType oldValueType = new VersionedValueType(dataType);
ArrayType undoLogValueType = new ArrayType(new DataType[]{ ArrayType undoLogValueType = new ArrayType(new DataType[]{
new ObjectDataType(), new ObjectDataType(), dataType, new ObjectDataType(), dataType, oldValueType
oldValueType
}); });
MVMap.Builder<long[], Object[]> builder = MVMap.Builder<Long, Object[]> builder =
new MVMap.Builder<long[], Object[]>(). new MVMap.Builder<Long, Object[]>().
valueType(undoLogValueType); valueType(undoLogValueType);
// TODO escape other map names, to avoid conflicts // TODO escape other map names, to avoid conflicts
undoLog = store.openMap("undoLog", builder); undoLog = store.openMap("undoLog", builder);
...@@ -115,7 +109,7 @@ public class TransactionStore { ...@@ -115,7 +109,7 @@ public class TransactionStore {
this.maxTransactionId = max; this.maxTransactionId = max;
} }
private static long getOperationId(int transactionId, long logId) { static long getOperationId(int transactionId, long logId) {
DataUtils.checkArgument(transactionId >= 0 && transactionId < (1 << 24), DataUtils.checkArgument(transactionId >= 0 && transactionId < (1 << 24),
"Transaction id out of range: {0}", transactionId); "Transaction id out of range: {0}", transactionId);
DataUtils.checkArgument(logId >= 0 && logId < (1L << 40), DataUtils.checkArgument(logId >= 0 && logId < (1L << 40),
...@@ -123,19 +117,19 @@ public class TransactionStore { ...@@ -123,19 +117,19 @@ public class TransactionStore {
return ((long) transactionId << 40) | logId; return ((long) transactionId << 40) | logId;
} }
private static int getTransactionId(long operationId) { static int getTransactionId(long operationId) {
return (int) (operationId >>> 40); return (int) (operationId >>> 40);
} }
private static long getLogId(long operationId) { static long getLogId(long operationId) {
return operationId & ((1L << 40) - 1); return operationId & ((1L << 40) - 1);
} }
private synchronized void init() { private synchronized void init() {
synchronized (undoLog) { synchronized (undoLog) {
if (undoLog.size() > 0) { if (undoLog.size() > 0) {
long[] key = undoLog.firstKey(); Long key = undoLog.firstKey();
lastTransactionId = (int) key[0]; lastTransactionId = getTransactionId(key);
} }
} }
} }
...@@ -148,18 +142,16 @@ public class TransactionStore { ...@@ -148,18 +142,16 @@ public class TransactionStore {
public List<Transaction> getOpenTransactions() { public List<Transaction> getOpenTransactions() {
synchronized (undoLog) { synchronized (undoLog) {
ArrayList<Transaction> list = New.arrayList(); ArrayList<Transaction> list = New.arrayList();
long[] key = undoLog.firstKey(); Long key = undoLog.firstKey();
while (key != null) { while (key != null) {
int transactionId = (int) key[0]; int transactionId = getTransactionId(key);
long[] end = { transactionId, Long.MAX_VALUE }; key = undoLog.lowerKey(getOperationId(transactionId + 1, 0));
key = undoLog.floorKey(end); long logId = getLogId(key) + 1;
long logId = key[1] + 1;
Object[] data = preparedTransactions.get(transactionId); Object[] data = preparedTransactions.get(transactionId);
int status; int status;
String name; String name;
if (data == null) { if (data == null) {
key = new long[] { key[0], 0 }; if (undoLog.containsKey(getOperationId(transactionId, 0))) {
if (undoLog.containsKey(key)) {
status = Transaction.STATUS_OPEN; status = Transaction.STATUS_OPEN;
} else { } else {
status = Transaction.STATUS_COMMITTING; status = Transaction.STATUS_COMMITTING;
...@@ -171,7 +163,7 @@ public class TransactionStore { ...@@ -171,7 +163,7 @@ public class TransactionStore {
} }
Transaction t = new Transaction(this, transactionId, status, name, logId); Transaction t = new Transaction(this, transactionId, status, name, logId);
list.add(t); list.add(t);
key = undoLog.higherKey(end); key = undoLog.ceilingKey(getOperationId(transactionId + 1, 0));
} }
return list; return list;
} }
...@@ -198,12 +190,6 @@ public class TransactionStore { ...@@ -198,12 +190,6 @@ public class TransactionStore {
return new Transaction(this, transactionId, status, null, 0); return new Transaction(this, transactionId, status, null, 0);
} }
private void commitIfNeeded() {
if (store.getUnsavedPageCount() > MAX_UNSAVED_PAGES) {
store.commit();
}
}
/** /**
* Store a transaction. * Store a transaction.
* *
...@@ -221,16 +207,14 @@ public class TransactionStore { ...@@ -221,16 +207,14 @@ public class TransactionStore {
* *
* @param t the transaction * @param t the transaction
* @param logId the log id * @param logId the log id
* @param opType the operation type
* @param mapId the map id * @param mapId the map id
* @param key the key * @param key the key
* @param oldValue the old value * @param oldValue the old value
*/ */
void log(Transaction t, long logId, int opType, int mapId, void log(Transaction t, long logId, int mapId,
Object key, Object oldValue) { Object key, Object oldValue) {
commitIfNeeded(); Long undoKey = getOperationId(t.getId(), logId);
long[] undoKey = { t.getId(), logId }; Object[] log = new Object[] { mapId, key, oldValue };
Object[] log = new Object[] { opType, mapId, key, oldValue };
synchronized (undoLog) { synchronized (undoLog) {
if (logId == 0) { if (logId == 0) {
if (undoLog.containsKey(undoKey)) { if (undoLog.containsKey(undoKey)) {
...@@ -275,25 +259,23 @@ public class TransactionStore { ...@@ -275,25 +259,23 @@ public class TransactionStore {
synchronized (undoLog) { synchronized (undoLog) {
t.setStatus(Transaction.STATUS_COMMITTING); t.setStatus(Transaction.STATUS_COMMITTING);
for (long logId = 0; logId < maxLogId; logId++) { for (long logId = 0; logId < maxLogId; logId++) {
commitIfNeeded(); Long undoKey = getOperationId(t.getId(), logId);
long[] undoKey = new long[] { t.getId(), logId };
Object[] op = undoLog.get(undoKey); Object[] op = undoLog.get(undoKey);
if (op == null) { if (op == null) {
// partially committed: load next // partially committed: load next
undoKey = undoLog.ceilingKey(undoKey); undoKey = undoLog.ceilingKey(undoKey);
if (undoKey == null || undoKey[0] != t.getId()) { if (undoKey == null || getTransactionId(undoKey) != t.getId()) {
break; break;
} }
logId = undoKey[1] - 1; logId = getLogId(undoKey) - 1;
continue; continue;
} }
int mapId = (Integer) op[0];
;
// TODO undoLog: do we need the opType?
int mapId = (Integer) op[1];
MVMap<Object, VersionedValue> map = openMap(mapId); MVMap<Object, VersionedValue> map = openMap(mapId);
Object key = op[2]; if (map == null) {
// map was later removed
} else {
Object key = op[1];
VersionedValue value = map.get(key); VersionedValue value = map.get(key);
if (value == null) { if (value == null) {
// nothing to do // nothing to do
...@@ -305,13 +287,14 @@ public class TransactionStore { ...@@ -305,13 +287,14 @@ public class TransactionStore {
v2.value = value.value; v2.value = value.value;
map.put(key, v2); map.put(key, v2);
} }
}
undoLog.remove(undoKey); undoLog.remove(undoKey);
} }
} }
endTransaction(t); endTransaction(t);
} }
private synchronized MVMap<Object, VersionedValue> openMap(int mapId) { synchronized MVMap<Object, VersionedValue> openMap(int mapId) {
MVMap<Object, VersionedValue> map = maps.get(mapId); MVMap<Object, VersionedValue> map = maps.get(mapId);
if (map != null) { if (map != null) {
return map; return map;
...@@ -340,14 +323,8 @@ public class TransactionStore { ...@@ -340,14 +323,8 @@ public class TransactionStore {
* @param transactionId the transaction id * @param transactionId the transaction id
* @return true if it is open * @return true if it is open
*/ */
boolean isTransactionOpen(long transactionId) { boolean isTransactionOpen(int transactionId) {
; return transactionId != 0;
// TODO probably not needed at all
synchronized (undoLog) {
long[] key = { transactionId, 0 };
key = undoLog.ceilingKey(key);
return key != null && key[0] == transactionId;
}
} }
/** /**
...@@ -387,23 +364,22 @@ public class TransactionStore { ...@@ -387,23 +364,22 @@ public class TransactionStore {
void rollbackTo(Transaction t, long maxLogId, long toLogId) { void rollbackTo(Transaction t, long maxLogId, long toLogId) {
synchronized (undoLog) { synchronized (undoLog) {
for (long logId = maxLogId - 1; logId >= toLogId; logId--) { for (long logId = maxLogId - 1; logId >= toLogId; logId--) {
commitIfNeeded(); Long undoKey = getOperationId(t.getId(), logId);
long[] undoKey = new long[] { t.getId(), logId };
Object[] op = undoLog.get(undoKey); Object[] op = undoLog.get(undoKey);
if (op == null) { if (op == null) {
// partially rolled back: load previous // partially rolled back: load previous
undoKey = undoLog.floorKey(undoKey); undoKey = undoLog.floorKey(undoKey);
if (undoKey == null || undoKey[0] != t.getId()) { if (undoKey == null || getTransactionId(undoKey) != t.getId()) {
break; break;
} }
logId = undoKey[1] + 1; logId = getLogId(undoKey) + 1;
continue; continue;
} }
int mapId = ((Integer) op[1]).intValue(); int mapId = ((Integer) op[0]).intValue();
MVMap<Object, VersionedValue> map = openMap(mapId); MVMap<Object, VersionedValue> map = openMap(mapId);
if (map != null) { if (map != null) {
Object key = op[2]; Object key = op[1];
VersionedValue oldValue = (VersionedValue) op[3]; VersionedValue oldValue = (VersionedValue) op[2];
if (oldValue == null) { if (oldValue == null) {
// this transaction added the value // this transaction added the value
map.remove(key); map.remove(key);
...@@ -439,29 +415,27 @@ public class TransactionStore { ...@@ -439,29 +415,27 @@ public class TransactionStore {
private void fetchNext() { private void fetchNext() {
synchronized (undoLog) { synchronized (undoLog) {
while (logId >= toLogId) { while (logId >= toLogId) {
long[] undoKey = new long[] { t.getId(), logId }; Long undoKey = getOperationId(t.getId(), logId);
Object[] op = undoLog.get(undoKey); Object[] op = undoLog.get(undoKey);
logId--; logId--;
if (op == null) { if (op == null) {
// partially rolled back: load previous // partially rolled back: load previous
undoKey = undoLog.floorKey(undoKey); undoKey = undoLog.floorKey(undoKey);
if (undoKey == null || undoKey[0] != t.getId()) { if (undoKey == null || getTransactionId(undoKey) != t.getId()) {
break; break;
} }
logId = undoKey[1]; logId = getLogId(undoKey);
continue; continue;
} }
int mapId = ((Integer) op[1]).intValue(); int mapId = ((Integer) op[0]).intValue();
// TODO open map by id if possible MVMap<Object, VersionedValue> m = openMap(mapId);
Map<String, String> meta = store.getMetaMap();
String m = meta.get("map." + mapId);
if (m == null) { if (m == null) {
// map was removed later on // map was removed later on
} else { } else {
current = new Change(); current = new Change();
current.mapName = DataUtils.parseMap(m).get("name"); current.mapName = m.getName();
current.key = op[2]; current.key = op[1];
VersionedValue oldValue = (VersionedValue) op[3]; VersionedValue oldValue = (VersionedValue) op[2];
current.value = oldValue == null ? null : oldValue.value; current.value = oldValue == null ? null : oldValue.value;
return; return;
} }
...@@ -542,11 +516,6 @@ public class TransactionStore { ...@@ -542,11 +516,6 @@ public class TransactionStore {
*/ */
public static final int STATUS_COMMITTING = 3; public static final int STATUS_COMMITTING = 3;
/**
* The operation type for changes in a map.
*/
static final int OP_REMOVE = 0, OP_ADD = 1, OP_SET = 2;
/** /**
* The transaction store. * The transaction store.
*/ */
...@@ -609,13 +578,12 @@ public class TransactionStore { ...@@ -609,13 +578,12 @@ public class TransactionStore {
/** /**
* Add a log entry. * Add a log entry.
* *
* @param opType the operation type
* @param mapId the map id * @param mapId the map id
* @param key the key * @param key the key
* @param oldValue the old value * @param oldValue the old value
*/ */
void log(int opType, int mapId, Object key, Object oldValue) { void log(int mapId, Object key, Object oldValue) {
store.log(this, logId, opType, mapId, key, oldValue); store.log(this, logId, mapId, key, oldValue);
// only increment the log id if logging was successful // only increment the log id if logging was successful
logId++; logId++;
} }
...@@ -942,7 +910,7 @@ public class TransactionStore { ...@@ -942,7 +910,7 @@ public class TransactionStore {
if (onlyIfUnchanged) { if (onlyIfUnchanged) {
VersionedValue old = getValue(key, readLogId); VersionedValue old = getValue(key, readLogId);
if (!map.areValuesEqual(old, current)) { if (!map.areValuesEqual(old, current)) {
long tx = current.transactionId; long tx = getTransactionId(current.operationId);
if (tx == transaction.transactionId) { if (tx == transaction.transactionId) {
if (value == null) { if (value == null) {
// ignore removing an entry // ignore removing an entry
...@@ -960,28 +928,13 @@ public class TransactionStore { ...@@ -960,28 +928,13 @@ public class TransactionStore {
} }
} }
} }
int opType;
if (current == null || current.value == null) {
if (value == null) {
// remove a removed value
opType = Transaction.OP_SET;
} else {
opType = Transaction.OP_ADD;
}
} else {
if (value == null) {
opType = Transaction.OP_REMOVE;
} else {
opType = Transaction.OP_SET;
}
}
VersionedValue newValue = new VersionedValue(); VersionedValue newValue = new VersionedValue();
newValue.transactionId = transaction.transactionId; newValue.operationId = getOperationId(
newValue.logId = transaction.logId; transaction.transactionId, transaction.logId);
newValue.value = value; newValue.value = value;
if (current == null) { if (current == null) {
// a new value // a new value
transaction.log(opType, mapId, key, current); transaction.log(mapId, key, current);
VersionedValue old = map.putIfAbsent(key, newValue); VersionedValue old = map.putIfAbsent(key, newValue);
if (old != null) { if (old != null) {
transaction.logUndo(); transaction.logUndo();
...@@ -989,10 +942,10 @@ public class TransactionStore { ...@@ -989,10 +942,10 @@ public class TransactionStore {
} }
return true; return true;
} }
long tx = current.transactionId; int tx = getTransactionId(current.operationId);
if (tx == transaction.transactionId) { if (tx == transaction.transactionId) {
// added or updated by this transaction // added or updated by this transaction
transaction.log(opType, mapId, key, current); transaction.log(mapId, key, current);
if (!map.replace(key, current, newValue)) { if (!map.replace(key, current, newValue)) {
// strange, somebody overwrite the value // strange, somebody overwrite the value
// even thought the change was not committed // even thought the change was not committed
...@@ -1004,7 +957,7 @@ public class TransactionStore { ...@@ -1004,7 +957,7 @@ public class TransactionStore {
// added or updated by another transaction // added or updated by another transaction
boolean open = transaction.store.isTransactionOpen(tx); boolean open = transaction.store.isTransactionOpen(tx);
if (!open) { if (!open) {
transaction.log(opType, mapId, key, current); transaction.log(mapId, key, current);
// the transaction is committed: // the transaction is committed:
// overwrite the value // overwrite the value
if (!map.replace(key, current, newValue)) { if (!map.replace(key, current, newValue)) {
...@@ -1074,20 +1027,21 @@ public class TransactionStore { ...@@ -1074,20 +1027,21 @@ public class TransactionStore {
// doesn't exist or deleted by a committed transaction // doesn't exist or deleted by a committed transaction
return false; return false;
} }
long tx = data.transactionId; int tx = getTransactionId(data.operationId);
return tx == transaction.transactionId; return tx == transaction.transactionId;
} }
private VersionedValue getValue(K key, long maxLog) { private VersionedValue getValue(K key, long maxLog) {
VersionedValue data = map.get(key); VersionedValue data = map.get(key);
for (int i = 0; i < 10; i++) { for (int i = 0; i < 10; i++) {
long tx; int tx;
if (data == null) { if (data == null) {
// doesn't exist or deleted by a committed transaction // doesn't exist or deleted by a committed transaction
return null; return null;
} }
tx = data.transactionId; long id = data.operationId;
long logId = data.logId; tx = getTransactionId(id);
long logId = getLogId(id);
if (tx == transaction.transactionId) { if (tx == transaction.transactionId) {
// added by this transaction // added by this transaction
if (logId < maxLog) { if (logId < maxLog) {
...@@ -1101,7 +1055,7 @@ public class TransactionStore { ...@@ -1101,7 +1055,7 @@ public class TransactionStore {
return data; return data;
} }
// get the value before the uncommitted transaction // get the value before the uncommitted transaction
long[] x = new long[] { tx, logId }; Long x = getOperationId(tx, logId);
Object[] d; Object[] d;
synchronized (transaction.store.undoLog) { synchronized (transaction.store.undoLog) {
d = transaction.store.undoLog.get(x); d = transaction.store.undoLog.get(x);
...@@ -1111,7 +1065,7 @@ public class TransactionStore { ...@@ -1111,7 +1065,7 @@ public class TransactionStore {
// in the meantime (the transaction might still be open) // in the meantime (the transaction might still be open)
data = map.get(key); data = map.get(key);
} else { } else {
data = (VersionedValue) d[3]; data = (VersionedValue) d[2];
} }
} }
throw DataUtils.newIllegalStateException( throw DataUtils.newIllegalStateException(
...@@ -1306,14 +1260,9 @@ public class TransactionStore { ...@@ -1306,14 +1260,9 @@ public class TransactionStore {
static class VersionedValue { static class VersionedValue {
/** /**
* The transaction id. * The operation id.
*/ */
public long transactionId; public long operationId;
/**
* The log id.
*/
public long logId;
/** /**
* The value. * The value.
...@@ -1322,7 +1271,10 @@ public class TransactionStore { ...@@ -1322,7 +1271,10 @@ public class TransactionStore {
@Override @Override
public String toString() { public String toString() {
return "{" + transactionId + "/" + logId + "}: " + value; return value + (operationId == 0 ? "" : (
" " +
getTransactionId(operationId) + "/" +
getLogId(operationId)));
} }
} }
...@@ -1341,7 +1293,7 @@ public class TransactionStore { ...@@ -1341,7 +1293,7 @@ public class TransactionStore {
@Override @Override
public int getMemory(Object obj) { public int getMemory(Object obj) {
VersionedValue v = (VersionedValue) obj; VersionedValue v = (VersionedValue) obj;
return valueType.getMemory(v.value) + 16; return valueType.getMemory(v.value) + 8;
} }
@Override @Override
...@@ -1351,21 +1303,17 @@ public class TransactionStore { ...@@ -1351,21 +1303,17 @@ public class TransactionStore {
} }
VersionedValue a = (VersionedValue) aObj; VersionedValue a = (VersionedValue) aObj;
VersionedValue b = (VersionedValue) bObj; VersionedValue b = (VersionedValue) bObj;
long comp = a.transactionId - b.transactionId; long comp = a.operationId - b.operationId;
if (comp == 0) {
comp = a.logId - b.logId;
if (comp == 0) { if (comp == 0) {
return valueType.compare(a.value, b.value); return valueType.compare(a.value, b.value);
} }
}
return Long.signum(comp); return Long.signum(comp);
} }
@Override @Override
public void write(WriteBuffer buff, Object obj) { public void write(WriteBuffer buff, Object obj) {
VersionedValue v = (VersionedValue) obj; VersionedValue v = (VersionedValue) obj;
buff.putVarLong(v.transactionId); buff.putVarLong(v.operationId);
buff.putVarLong(v.logId);
if (v.value == null) { if (v.value == null) {
buff.put((byte) 0); buff.put((byte) 0);
} else { } else {
...@@ -1377,8 +1325,7 @@ public class TransactionStore { ...@@ -1377,8 +1325,7 @@ public class TransactionStore {
@Override @Override
public Object read(ByteBuffer buff) { public Object read(ByteBuffer buff) {
VersionedValue v = new VersionedValue(); VersionedValue v = new VersionedValue();
v.transactionId = DataUtils.readVarLong(buff); v.operationId = DataUtils.readVarLong(buff);
v.logId = DataUtils.readVarLong(buff);
if (buff.get() == 1) { if (buff.get() == 1) {
v.value = valueType.read(buff); v.value = valueType.read(buff);
} }
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论