提交 e3695afe authored 作者: Thomas Mueller's avatar Thomas Mueller

MVStore table engine (reduced memory usage and improved performance)

上级 47da2c44
......@@ -18,7 +18,8 @@ Change Log
<h1>Change Log</h1>
<h2>Next Version (unreleased)</h2>
<ul><li>The optimization for IN(...) queries combined with OR could result
<ul><li>The auto-analyze feature now only reads 1000 rows per table instead of 10000.
</li><li>The optimization for IN(...) queries combined with OR could result
in a strange exception of the type "column x must be included in the group by list".
</li><li>Issue 454: Use Charset for type-safety.
</li><li>Queries with both LIMIT and OFFSET could throw an IllegalArgumentException.
......
......@@ -103,7 +103,7 @@ public class MVPrimaryIndex extends BaseIndex {
row.setKey(++lastKey);
}
} else {
Long c = row.getValue(mainIndexColumn).getLong();
long c = row.getValue(mainIndexColumn).getLong();
row.setKey(c);
}
......
......@@ -554,7 +554,7 @@ public class MVTable extends TableBase {
if (n > 0) {
nextAnalyze = n;
}
int rows = session.getDatabase().getSettings().analyzeSample;
int rows = session.getDatabase().getSettings().analyzeSample / 10;
Analyze.analyzeTable(session, this, rows, false);
}
......
......@@ -8,7 +8,6 @@ package org.h2.mvstore.db;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
......@@ -29,6 +28,7 @@ public class TransactionStore {
private static final String LAST_TRANSACTION_ID = "lastTransactionId";
// TODO should not be hardcoded
private static final int MAX_UNSAVED_PAGES = 4 * 1024;
/**
......@@ -37,16 +37,10 @@ public class TransactionStore {
final MVStore store;
/**
* The persisted map of open transaction.
* The persisted map of prepared transactions.
* Key: transactionId, value: [ status, name ].
*/
final MVMap<Long, Object[]> openTransactions;
/**
* The map of open transaction objects.
* Key: transactionId, value: transaction object.
*/
final HashMap<Long, Transaction> openTransactionMap = New.hashMap();
final MVMap<Long, Object[]> preparedTransactions;
/**
* The undo log.
......@@ -60,7 +54,8 @@ public class TransactionStore {
long lockTimeout;
/**
* The transaction settings. "lastTransaction" the last transaction id.
* The transaction settings. The entry "lastTransaction" contains the last
* transaction id.
*/
private final MVMap<String, String> settings;
......@@ -86,14 +81,11 @@ public class TransactionStore {
public TransactionStore(MVStore store, DataType keyType) {
this.store = store;
settings = store.openMap("settings");
openTransactions = store.openMap("openTransactions",
preparedTransactions = store.openMap("openTransactions",
new MVMap.Builder<Long, Object[]>());
// commit could be faster if we have one undo log per transaction,
// or a range delete operation for maps
ArrayType oldValueType = new ArrayType(new DataType[]{
new ObjectDataType(), new ObjectDataType(),
keyType
});
// TODO commit of larger transaction could be faster if we have one undo
// log per transaction, or a range delete operation for maps
VersionedValueType oldValueType = new VersionedValueType(keyType);
ArrayType valueType = new ArrayType(new DataType[]{
new ObjectDataType(), new ObjectDataType(), keyType,
oldValueType
......@@ -112,36 +104,39 @@ public class TransactionStore {
lastTransactionId = Long.parseLong(s);
lastTransactionIdStored = lastTransactionId;
}
Long lastKey = openTransactions.lastKey();
Long lastKey = preparedTransactions.lastKey();
if (lastKey != null && lastKey.longValue() > lastTransactionId) {
throw DataUtils.newIllegalStateException("Last transaction not stored");
}
Cursor<Long> cursor = openTransactions.keyIterator(null);
while (cursor.hasNext()) {
long id = cursor.next();
Object[] data = openTransactions.get(id);
int status = (Integer) data[0];
String name = (String) data[1];
long[] next = { id + 1, -1 };
long[] last = undoLog.floorKey(next);
if (last == null) {
// no entry
} else if (last[0] == id) {
Transaction t = new Transaction(this, id, status, name, last[1]);
t.setStored(true);
openTransactionMap.put(id, t);
}
}
}
/**
* Get the list of currently open transactions that have pending writes.
* Get the list of unclosed transactions that have pending writes.
*
* @return the list of transactions
* @return the list of transactions (sorted by id)
*/
public synchronized List<Transaction> getOpenTransactions() {
ArrayList<Transaction> list = New.arrayList();
list.addAll(openTransactionMap.values());
long[] key = undoLog.firstKey();
while (key != null) {
long transactionId = key[0];
long[] end = { transactionId, Long.MAX_VALUE };
key = undoLog.floorKey(end);
long logId = key[1] + 1;
Object[] data = preparedTransactions.get(transactionId);
int status;
String name;
if (data == null) {
status = Transaction.STATUS_OPEN;
name = null;
} else {
status = (Integer) data[0];
name = (String) data[1];
}
Transaction t = new Transaction(this, transactionId, status, name, logId);
list.add(t);
key = undoLog.higherKey(end);
}
return list;
}
......@@ -161,39 +156,31 @@ public class TransactionStore {
*/
public synchronized Transaction begin() {
long transactionId = lastTransactionId++;
if (lastTransactionId > lastTransactionIdStored) {
lastTransactionIdStored += 64;
settings.put(LAST_TRANSACTION_ID, "" + lastTransactionIdStored);
}
int status = Transaction.STATUS_OPEN;
return new Transaction(this, transactionId, status, null, 0);
}
private void storeTransaction(Transaction t) {
private void commitIfNeeded() {
if (store.getUnsavedPageCount() > MAX_UNSAVED_PAGES) {
store.commit();
}
if (t.isStored()) {
return;
}
t.setStored(true);
long transactionId = t.getId();
Object[] v = { t.getStatus(), null };
openTransactions.put(transactionId, v);
openTransactionMap.put(transactionId, t);
if (lastTransactionId > lastTransactionIdStored) {
lastTransactionIdStored += 32;
settings.put(LAST_TRANSACTION_ID, "" + lastTransactionIdStored);
store.store();
}
}
/**
* Prepare a transaction.
* Store a transaction.
*
* @param transactionId the transaction id
* @param t the transaction
*/
void prepare(Transaction t) {
storeTransaction(t);
Object[] old = openTransactions.get(t.getId());
Object[] v = { Transaction.STATUS_PREPARED, old[1] };
openTransactions.put(t.getId(), v);
store.commit();
void storeTransaction(Transaction t) {
if (t.getStatus() == Transaction.STATUS_PREPARED || t.getName() != null) {
Object[] v = { t.getStatus(), t.getName() };
preparedTransactions.put(t.getId(), v);
}
}
/**
......@@ -208,25 +195,12 @@ public class TransactionStore {
*/
void log(Transaction t, long logId, int opType, int mapId,
Object key, Object oldValue) {
storeTransaction(t);
commitIfNeeded();
long[] undoKey = { t.getId(), logId };
Object[] log = new Object[] { opType, mapId, key, oldValue };
undoLog.put(undoKey, log);
}
/**
* Set the name of a transaction.
*
* @param t the transaction
* @param name the new name
*/
void setTransactionName(Transaction t, String name) {
storeTransaction(t);
Object[] old = openTransactions.get(t.getId());
Object[] v = { old[0], name };
openTransactions.put(t.getId(), v);
}
/**
* Commit a transaction.
*
......@@ -240,6 +214,7 @@ public class TransactionStore {
for (long logId = 0; logId < maxLogId; logId++) {
long[] undoKey = new long[] {
t.getId(), logId };
commitIfNeeded();
Object[] op = undoLog.get(undoKey);
int opType = (Integer) op[0];
if (opType == Transaction.OP_REMOVE) {
......@@ -247,12 +222,12 @@ public class TransactionStore {
Map<String, String> meta = store.getMetaMap();
String m = meta.get("map." + mapId);
String mapName = DataUtils.parseMap(m).get("name");
MVMap<Object, Object[]> map = store.openMap(mapName);
MVMap<Object, VersionedValue> map = store.openMap(mapName);
Object key = op[2];
Object[] value = map.get(key);
VersionedValue value = map.get(key);
// possibly the entry was added later on
// so we have to check
if (value[2] == null) {
if (value.value == null) {
// remove the value
map.remove(key);
}
......@@ -273,9 +248,21 @@ public class TransactionStore {
endTransaction(t);
}
boolean isTransactionOpen(long transactionId) {
int todoSlow;
// if (transactionId < firstOpenTransaction) {
// return false;
// }
long[] key = { transactionId, -1 };
key = undoLog.higherKey(key);
return key != null && key[0] == transactionId;
}
private void endTransaction(Transaction t) {
openTransactions.remove(t.getId());
openTransactionMap.remove(t.getId());
if (t.getStatus() == Transaction.STATUS_PREPARED) {
preparedTransactions.remove(t.getId());
}
t.setStatus(Transaction.STATUS_CLOSED);
}
/**
......@@ -287,15 +274,16 @@ public class TransactionStore {
*/
void rollbackTo(Transaction t, long maxLogId, long toLogId) {
for (long logId = maxLogId - 1; logId >= toLogId; logId--) {
commitIfNeeded();
Object[] op = undoLog.get(new long[] {
t.getId(), logId });
int mapId = ((Integer) op[1]).intValue();
Map<String, String> meta = store.getMetaMap();
String m = meta.get("map." + mapId);
String mapName = DataUtils.parseMap(m).get("name");
MVMap<Object, Object[]> map = store.openMap(mapName);
MVMap<Object, VersionedValue> map = store.openMap(mapName);
Object key = op[2];
Object[] oldValue = (Object[]) op[3];
VersionedValue oldValue = (VersionedValue) op[3];
if (oldValue == null) {
// this transaction added the value
map.remove(key);
......@@ -345,7 +333,7 @@ public class TransactionStore {
/**
* The transaction id.
*/
final Long transactionId;
final long transactionId;
long logId;
......@@ -353,8 +341,6 @@ public class TransactionStore {
private String name;
private boolean stored;
Transaction(TransactionStore store, long transactionId, int status, String name, long logId) {
this.store = store;
this.startVersion = store.store.getCurrentVersion();
......@@ -364,48 +350,24 @@ public class TransactionStore {
this.logId = logId;
}
boolean isStored() {
return stored;
}
void setStored(boolean stored) {
this.stored = stored;
}
/**
* Get the transaction id.
*
* @return the transaction id
*/
public Long getId() {
public long getId() {
return transactionId;
}
/**
* Get the transaction status.
*
* @return the status
*/
public int getStatus() {
return status;
}
/**
* Set the name of the transaction.
*
* @param name the new name
*/
void setStatus(int status) {
this.status = status;
}
public void setName(String name) {
checkOpen();
store.setTransactionName(this, name);
this.name = name;
store.storeTransaction(this);
}
/**
* Get the name of the transaction.
*
* @return name the name
*/
public String getName() {
return name;
}
......@@ -486,8 +448,8 @@ public class TransactionStore {
*/
public void prepare() {
checkOpen();
store.prepare(this);
status = STATUS_PREPARED;
store.storeTransaction(this);
}
/**
......@@ -496,7 +458,6 @@ public class TransactionStore {
public void commit() {
if (status != STATUS_CLOSED) {
store.commit(this, logId);
status = STATUS_CLOSED;
}
}
......@@ -506,7 +467,6 @@ public class TransactionStore {
public void rollback() {
if (status != STATUS_CLOSED) {
store.rollback(this, logId);
status = STATUS_CLOSED;
}
}
......@@ -539,7 +499,7 @@ public class TransactionStore {
* Key: key the key of the data.
* Value: { transactionId, oldVersion, value }
*/
final MVMap<K, Object[]> map;
final MVMap<K, VersionedValue> map;
private Transaction transaction;
......@@ -556,16 +516,14 @@ public class TransactionStore {
TransactionMap(Transaction transaction, String name, DataType keyType,
DataType valueType) {
this.transaction = transaction;
ArrayType arrayType = new ArrayType(new DataType[] {
new ObjectDataType(), new ObjectDataType(), valueType
});
MVMap.Builder<K, Object[]> builder = new MVMap.Builder<K, Object[]>()
.keyType(keyType).valueType(arrayType);
VersionedValueType vt = new VersionedValueType(valueType);
MVMap.Builder<K, VersionedValue> builder = new MVMap.Builder<K, VersionedValue>()
.keyType(keyType).valueType(vt);
map = transaction.store.store.openMap(name, builder);
mapId = map.getId();
}
private TransactionMap(Transaction transaction, MVMap<K, Object[]> map, int mapId) {
private TransactionMap(Transaction transaction, MVMap<K, VersionedValue> map, int mapId) {
this.transaction = transaction;
this.map = map;
this.mapId = mapId;
......@@ -612,10 +570,6 @@ public class TransactionStore {
return size;
}
private void checkOpen() {
transaction.checkOpen();
}
/**
* Remove an entry.
* <p>
......@@ -646,7 +600,7 @@ public class TransactionStore {
}
private V set(K key, V value) {
checkOpen();
transaction.checkOpen();
long start = 0;
while (true) {
V old = get(key);
......@@ -717,18 +671,18 @@ public class TransactionStore {
* @return true if the value was set
*/
public boolean trySet(K key, V value, boolean onlyIfUnchanged) {
Object[] current = map.get(key);
VersionedValue current = map.get(key);
if (onlyIfUnchanged) {
Object[] old = getArray(key, readLogId);
VersionedValue old = getValue(key, readLogId);
if (!map.areValuesEqual(old, current)) {
long tx = (Long) current[0];
long tx = current.transactionId;
if (tx == transaction.transactionId) {
if (value == null) {
// ignore removing an entry
// if it was added or changed
// in the same statement
return true;
} else if (current[2] == null) {
} else if (current.value == null) {
// add an entry that was removed
// in the same statement
} else {
......@@ -740,7 +694,7 @@ public class TransactionStore {
}
}
int opType;
if (current == null || current[2] == null) {
if (current == null || current.value == null) {
if (value == null) {
// remove a removed value
opType = Transaction.OP_SET;
......@@ -754,19 +708,20 @@ public class TransactionStore {
opType = Transaction.OP_SET;
}
}
Object[] newValue = {
transaction.transactionId,
transaction.logId, value };
VersionedValue newValue = new VersionedValue();
newValue.transactionId = transaction.transactionId;
newValue.logId = transaction.logId;
newValue.value = value;
if (current == null) {
// a new value
Object[] old = map.putIfAbsent(key, newValue);
VersionedValue old = map.putIfAbsent(key, newValue);
if (old == null) {
transaction.log(opType, mapId, key, current);
return true;
}
return false;
}
long tx = (Long) current[0];
long tx = current.transactionId;
if (tx == transaction.transactionId) {
// added or updated by this transaction
if (map.replace(key, current, newValue)) {
......@@ -778,7 +733,7 @@ public class TransactionStore {
return false;
}
// added or updated by another transaction
boolean open = transaction.store.openTransactions.containsKey(tx);
boolean open = transaction.store.isTransactionOpen(tx);
if (!open) {
// the transaction is committed:
// overwrite the value
......@@ -832,21 +787,21 @@ public class TransactionStore {
*/
@SuppressWarnings("unchecked")
public V get(K key, long maxLogId) {
checkOpen();
Object[] data = getArray(key, maxLogId);
return data == null ? null : (V) data[2];
transaction.checkOpen();
VersionedValue data = getValue(key, maxLogId);
return data == null ? null : (V) data.value;
}
private Object[] getArray(K key, long maxLog) {
Object[] data = map.get(key);
private VersionedValue getValue(K key, long maxLog) {
VersionedValue data = map.get(key);
while (true) {
long tx;
if (data == null) {
// doesn't exist or deleted by a committed transaction
return null;
}
tx = (Long) data[0];
long logId = (Long) data[1];
tx = data.transactionId;
long logId = data.logId;
if (tx == transaction.transactionId) {
// added by this transaction
if (logId < maxLog) {
......@@ -854,15 +809,15 @@ public class TransactionStore {
}
}
// added or updated by another transaction
boolean open = transaction.store.openTransactions.containsKey(tx);
boolean open = transaction.store.isTransactionOpen(tx);
if (!open) {
// it is committed
return data;
}
// get the value before the uncommitted transaction
long[] x = new long[] { tx, logId };
data = transaction.store.undoLog.get(x);
data = (Object[]) data[3];
Object[] d = transaction.store.undoLog.get(x);
data = (VersionedValue) d[3];
}
}
......@@ -1008,6 +963,68 @@ public class TransactionStore {
}
/**
* A versioned value (possibly null). It contains a pointer to the old
* value, and the value itself.
*/
static class VersionedValue {
public long transactionId, logId;
public Object value;
}
/**
* The value type for a versioned value.
*/
public static class VersionedValueType implements DataType {
private final DataType valueType;
VersionedValueType(DataType valueType) {
this.valueType = valueType;
}
@Override
public int getMemory(Object obj) {
VersionedValue v = (VersionedValue) obj;
return valueType.getMemory(v.value) + 16;
}
@Override
public int compare(Object aObj, Object bObj) {
if (aObj == bObj) {
return 0;
}
VersionedValue a = (VersionedValue) aObj;
VersionedValue b = (VersionedValue) bObj;
long comp = a.transactionId - b.transactionId;
if (comp == 0) {
comp = a.logId - b.logId;
if (comp == 0) {
return valueType.compare(a.value, b.value);
}
}
return Long.signum(comp);
}
@Override
public ByteBuffer write(ByteBuffer buff, Object obj) {
VersionedValue v = (VersionedValue) obj;
DataUtils.writeVarLong(buff, v.transactionId);
DataUtils.writeVarLong(buff, v.logId);
return valueType.write(buff, v.value);
}
@Override
public Object read(ByteBuffer buff) {
VersionedValue v = new VersionedValue();
v.transactionId = DataUtils.readVarLong(buff);
v.logId = DataUtils.readVarLong(buff);
v.value = valueType.read(buff);
return v;
}
}
/**
* A data type that contains an array of objects with the specified data
* types.
......
......@@ -19,6 +19,7 @@ public interface DataType {
* @param a the first key
* @param b the second key
* @return -1 if the first key is smaller, 1 if larger, and 0 if equal
* @throws UnsupportedOperationException if the type is not orderable
*/
int compare(Object a, Object b);
......
......@@ -401,7 +401,7 @@ public class RegularTable extends TableBase {
if (n > 0) {
nextAnalyze = n;
}
int rows = session.getDatabase().getSettings().analyzeSample;
int rows = session.getDatabase().getSettings().analyzeSample / 10;
Analyze.analyzeTable(session, this, rows, false);
}
......
......@@ -8,7 +8,6 @@ package org.h2.test.store;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.PrintWriter;
import java.math.BigDecimal;
import java.sql.Connection;
import java.sql.DriverManager;
......@@ -16,12 +15,12 @@ import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import org.h2.constant.ErrorCode;
import org.h2.engine.Constants;
import org.h2.mvstore.db.MVTableEngine;
import org.h2.store.fs.FileUtils;
import org.h2.test.TestBase;
import org.h2.util.Profiler;
import org.h2.util.Task;
/**
......@@ -39,7 +38,7 @@ public class TestMVTableEngine extends TestBase {
}
public void test() throws Exception {
// testSpeed();
// testSpeed();
testReopen();
testBlob();
testExclusiveLock();
......@@ -95,32 +94,21 @@ int tes;
// -mx4g
// fast size
// 10000 / 8000
// 1278 mvstore;LOCK_MODE=0 before
// 1524 mvstore;LOCK_MODE=0 after
// 790 mvstore;DEFAULT_TABLE_ENGINE=org.h2.mvstore.db.MVTableEngine;LOCK_MODE=0 before
// 834 mvstore;DEFAULT_TABLE_ENGINE=org.h2.mvstore.db.MVTableEngine;LOCK_MODE=0 after
// 1000 / 80000
// 1753 mvstore;LOCK_MODE=0 before
// 1998 mvstore;LOCK_MODE=0 after
// 810 mvstore;DEFAULT_TABLE_ENGINE=org.h2.mvstore.db.MVTableEngine;LOCK_MODE=0 before
// 818 mvstore;DEFAULT_TABLE_ENGINE=org.h2.mvstore.db.MVTableEngine;LOCK_MODE=0 after
// 100 / 800000
// 2270 mvstore;LOCK_MODE=0 before
// 2841 mvstore;LOCK_MODE=0 after
// 2107 mvstore;DEFAULT_TABLE_ENGINE=org.h2.mvstore.db.MVTableEngine;LOCK_MODE=0 before
// 2116 mvstore;DEFAULT_TABLE_ENGINE=org.h2.mvstore.db.MVTableEngine;LOCK_MODE=0 after
// 10 / 800000
// 1312 mvstore;LOCK_MODE=0 before
// 1500 mvstore;LOCK_MODE=0 after
// 1541 mvstore;DEFAULT_TABLE_ENGINE=org.h2.mvstore.db.MVTableEngine;LOCK_MODE=0 before
// 1551 mvstore;DEFAULT_TABLE_ENGINE=org.h2.mvstore.db.MVTableEngine;LOCK_MODE=0 after
// 1265 mvstore;LOCK_MODE=0 before
// 1434 mvstore;LOCK_MODE=0 after
// 1126 mvstore;DEFAULT_TABLE_ENGINE=org.h2.mvstore.db.MVTableEngine;LOCK_MODE=0;LOG=0 before
// 1136 mvstore;DEFAULT_TABLE_ENGINE=org.h2.mvstore.db.MVTableEngine;LOCK_MODE=0;LOG=0 after
prep.setString(2, new String(new char[100]).replace((char) 0, 'x'));
for (int i = 0; i < 200000; i++) {
// 100 / 800000
// 2010 mvstore;LOCK_MODE=0 before
// 2261 mvstore;LOCK_MODE=0 after
// 1536 mvstore;DEFAULT_TABLE_ENGINE=org.h2.mvstore.db.MVTableEngine;LOCK_MODE=0;LOG=0 before
// 1546 mvstore;DEFAULT_TABLE_ENGINE=org.h2.mvstore.db.MVTableEngine;LOCK_MODE=0;LOG=0 after
prep.setString(2, new String(new char[10]).replace((char) 0, 'x'));
// for (int i = 0; i < 20000; i++) {
for (int i = 0; i < 800000; i++) {
prep.setInt(1, i);
prep.execute();
......
......@@ -40,7 +40,6 @@ public class TestTransactionStore extends TestBase {
public void test() throws Exception {
FileUtils.createDirectories(getBaseDir());
testKeyIterator();
testMultiStatement();
testTwoPhaseCommit();
......@@ -205,7 +204,7 @@ public class TestTransactionStore extends TestBase {
list = ts.getOpenTransactions();
assertEquals(1, list.size());
txOld = list.get(0);
assertTrue(tx == txOld);
assertTrue(tx.getId() == txOld.getId());
s.commit();
ts.close();
s.close();
......@@ -233,7 +232,7 @@ public class TestTransactionStore extends TestBase {
tx = ts.begin();
m = tx.openMap("test");
// TransactionStore was not closed, so we lost some ids
assertEquals(33, tx.getId());
assertEquals(65, tx.getId());
list = ts.getOpenTransactions();
assertEquals(2, list.size());
txOld = list.get(1);
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论