提交 de64ab1a authored 作者: Thomas Mueller's avatar Thomas Mueller

MVStore table engine (WIP)

上级 237b1a55
...@@ -23,8 +23,8 @@ import org.h2.jdbc.JdbcConnection; ...@@ -23,8 +23,8 @@ import org.h2.jdbc.JdbcConnection;
import org.h2.message.DbException; import org.h2.message.DbException;
import org.h2.message.Trace; import org.h2.message.Trace;
import org.h2.message.TraceSystem; import org.h2.message.TraceSystem;
import org.h2.mvstore.db.TransactionStore2; import org.h2.mvstore.db.TransactionStore;
import org.h2.mvstore.db.TransactionStore2.Transaction; import org.h2.mvstore.db.TransactionStore.Transaction;
import org.h2.result.ResultInterface; import org.h2.result.ResultInterface;
import org.h2.result.Row; import org.h2.result.Row;
import org.h2.schema.Schema; import org.h2.schema.Schema;
...@@ -1257,7 +1257,7 @@ public class Session extends SessionWithState { ...@@ -1257,7 +1257,7 @@ public class Session extends SessionWithState {
* @param store the store * @param store the store
* @return the transaction * @return the transaction
*/ */
public Transaction getTransaction(TransactionStore2 store) { public Transaction getTransaction(TransactionStore store) {
if (transaction == null) { if (transaction == null) {
transaction = store.begin(); transaction = store.begin();
startStatement = -1; startStatement = -1;
......
...@@ -76,6 +76,11 @@ public class DataUtils { ...@@ -76,6 +76,11 @@ public class DataUtils {
*/ */
private static final byte[] EMPTY_BYTES = {}; private static final byte[] EMPTY_BYTES = {};
/**
* The maximum byte to grow a buffer at a time.
*/
private static final int MAX_GROW = 16 * 1024 * 1024;
/** /**
* Get the length of the variable size int. * Get the length of the variable size int.
* *
...@@ -739,8 +744,7 @@ public class DataUtils { ...@@ -739,8 +744,7 @@ public class DataUtils {
private static ByteBuffer grow(ByteBuffer buff, int len) { private static ByteBuffer grow(ByteBuffer buff, int len) {
len = buff.remaining() + len; len = buff.remaining() + len;
int capacity = buff.capacity(); int capacity = buff.capacity();
// grow at most 1 MB at a time len = Math.max(len, Math.min(capacity + MAX_GROW, capacity * 2));
len = Math.max(len, Math.min(capacity + 1024 * 1024, capacity * 2));
ByteBuffer temp = ByteBuffer.allocate(len); ByteBuffer temp = ByteBuffer.allocate(len);
buff.flip(); buff.flip();
temp.put(buff); temp.put(buff);
......
...@@ -35,9 +35,7 @@ public class MVDelegateIndex extends BaseIndex { ...@@ -35,9 +35,7 @@ public class MVDelegateIndex extends BaseIndex {
} }
} }
static int count;
public void add(Session session, Row row) { public void add(Session session, Row row) {
count++;
// nothing to do // nothing to do
} }
......
...@@ -18,8 +18,8 @@ import org.h2.index.Cursor; ...@@ -18,8 +18,8 @@ import org.h2.index.Cursor;
import org.h2.index.IndexType; import org.h2.index.IndexType;
import org.h2.message.DbException; import org.h2.message.DbException;
import org.h2.mvstore.MVMap; import org.h2.mvstore.MVMap;
import org.h2.mvstore.db.TransactionStore2.Transaction; import org.h2.mvstore.db.TransactionStore.Transaction;
import org.h2.mvstore.db.TransactionStore2.TransactionMap; import org.h2.mvstore.db.TransactionStore.TransactionMap;
import org.h2.result.Row; import org.h2.result.Row;
import org.h2.result.SearchRow; import org.h2.result.SearchRow;
import org.h2.result.SortOrder; import org.h2.result.SortOrder;
...@@ -59,7 +59,6 @@ public class MVPrimaryIndex extends BaseIndex { ...@@ -59,7 +59,6 @@ public class MVPrimaryIndex extends BaseIndex {
valueType(valueType); valueType(valueType);
dataMap = mvTable.getTransaction(null).openMap(mapName, mapBuilder); dataMap = mvTable.getTransaction(null).openMap(mapName, mapBuilder);
Value k = dataMap.lastKey(); Value k = dataMap.lastKey();
dataMap.getTransaction().commit();
lastKey = k == null ? 0 : k.getLong(); lastKey = k == null ? 0 : k.getLong();
} }
...@@ -73,7 +72,6 @@ public class MVPrimaryIndex extends BaseIndex { ...@@ -73,7 +72,6 @@ public class MVPrimaryIndex extends BaseIndex {
rename(newName + "_DATA"); rename(newName + "_DATA");
String newMapName = newName + "_DATA_" + getId(); String newMapName = newName + "_DATA_" + getId();
map.renameMap(newMapName); map.renameMap(newMapName);
map.getTransaction().commit();
mapName = newMapName; mapName = newMapName;
} }
...@@ -233,10 +231,7 @@ public class MVPrimaryIndex extends BaseIndex { ...@@ -233,10 +231,7 @@ public class MVPrimaryIndex extends BaseIndex {
@Override @Override
public long getRowCountApproximation() { public long getRowCountApproximation() {
TransactionMap<Value, Value> map = getMap(null); return getMap(null).getSize();
long size = map.getSize();
map.getTransaction().commit();
return size;
} }
public long getDiskSpaceUsed() { public long getDiskSpaceUsed() {
......
...@@ -17,8 +17,8 @@ import org.h2.index.Cursor; ...@@ -17,8 +17,8 @@ import org.h2.index.Cursor;
import org.h2.index.IndexType; import org.h2.index.IndexType;
import org.h2.message.DbException; import org.h2.message.DbException;
import org.h2.mvstore.MVMap; import org.h2.mvstore.MVMap;
import org.h2.mvstore.db.TransactionStore2.Transaction; import org.h2.mvstore.db.TransactionStore.Transaction;
import org.h2.mvstore.db.TransactionStore2.TransactionMap; import org.h2.mvstore.db.TransactionStore.TransactionMap;
import org.h2.result.Row; import org.h2.result.Row;
import org.h2.result.SearchRow; import org.h2.result.SearchRow;
import org.h2.result.SortOrder; import org.h2.result.SortOrder;
...@@ -87,7 +87,6 @@ public class MVSecondaryIndex extends BaseIndex { ...@@ -87,7 +87,6 @@ public class MVSecondaryIndex extends BaseIndex {
TransactionMap<Value, Value> map = getMap(null); TransactionMap<Value, Value> map = getMap(null);
String newMapName = newName + "_" + getId(); String newMapName = newName + "_" + getId();
map.renameMap(newMapName); map.renameMap(newMapName);
map.getTransaction().commit();
mapName = newMapName; mapName = newMapName;
super.rename(newName); super.rename(newName);
} }
...@@ -218,10 +217,7 @@ public class MVSecondaryIndex extends BaseIndex { ...@@ -218,10 +217,7 @@ public class MVSecondaryIndex extends BaseIndex {
@Override @Override
public boolean needRebuild() { public boolean needRebuild() {
TransactionMap<Value, Value> map = getMap(null); return getMap(null).getSize() == 0;
boolean result = map.getSize() == 0;
map.getTransaction().commit();
return result;
} }
@Override @Override
...@@ -232,10 +228,7 @@ public class MVSecondaryIndex extends BaseIndex { ...@@ -232,10 +228,7 @@ public class MVSecondaryIndex extends BaseIndex {
@Override @Override
public long getRowCountApproximation() { public long getRowCountApproximation() {
TransactionMap<Value, Value> map = getMap(null); return getMap(null).getSize();
long size = map.getSize();
map.getTransaction().commit();
return size;
} }
public long getDiskSpaceUsed() { public long getDiskSpaceUsed() {
......
...@@ -28,7 +28,7 @@ import org.h2.index.IndexType; ...@@ -28,7 +28,7 @@ import org.h2.index.IndexType;
import org.h2.index.MultiVersionIndex; import org.h2.index.MultiVersionIndex;
import org.h2.message.DbException; import org.h2.message.DbException;
import org.h2.message.Trace; import org.h2.message.Trace;
import org.h2.mvstore.db.TransactionStore2.Transaction; import org.h2.mvstore.db.TransactionStore.Transaction;
import org.h2.result.Row; import org.h2.result.Row;
import org.h2.result.SortOrder; import org.h2.result.SortOrder;
import org.h2.schema.SchemaObject; import org.h2.schema.SchemaObject;
...@@ -48,7 +48,7 @@ import org.h2.value.Value; ...@@ -48,7 +48,7 @@ import org.h2.value.Value;
public class MVTable extends TableBase { public class MVTable extends TableBase {
private final String storeName; private final String storeName;
private final TransactionStore2 store; private final TransactionStore store;
private MVPrimaryIndex primaryIndex; private MVPrimaryIndex primaryIndex;
private ArrayList<Index> indexes = New.arrayList(); private ArrayList<Index> indexes = New.arrayList();
private long lastModificationId; private long lastModificationId;
...@@ -68,7 +68,7 @@ public class MVTable extends TableBase { ...@@ -68,7 +68,7 @@ public class MVTable extends TableBase {
*/ */
private boolean waitForLock; private boolean waitForLock;
public MVTable(CreateTableData data, String storeName, TransactionStore2 store) { public MVTable(CreateTableData data, String storeName, TransactionStore store) {
super(data); super(data);
nextAnalyze = database.getSettings().analyzeAuto; nextAnalyze = database.getSettings().analyzeAuto;
this.storeName = storeName; this.storeName = storeName;
...@@ -485,26 +485,16 @@ public class MVTable extends TableBase { ...@@ -485,26 +485,16 @@ public class MVTable extends TableBase {
@Override @Override
public void removeRow(Session session, Row row) { public void removeRow(Session session, Row row) {
lastModificationId = database.getNextModificationDataId(); lastModificationId = database.getNextModificationDataId();
int i = indexes.size() - 1; Transaction t = getTransaction(session);
long savepoint = t.setSavepoint();
try { try {
for (; i >= 0; i--) { for (int i = indexes.size() - 1; i >= 0; i--) {
Index index = indexes.get(i); Index index = indexes.get(i);
index.remove(session, row); index.remove(session, row);
} }
rowCount--; rowCount--;
} catch (Throwable e) { } catch (Throwable e) {
try { t.rollbackToSavepoint(savepoint);
while (++i < indexes.size()) {
Index index = indexes.get(i);
index.add(session, row);
}
} catch (DbException e2) {
// this could happen, for example on failure in the storage
// but if that is not the case it means there is something wrong
// with the database
trace.error(e2, "could not undo operation");
throw e2;
}
throw DbException.convert(e); throw DbException.convert(e);
} }
analyzeIfRequired(session); analyzeIfRequired(session);
...@@ -524,26 +514,16 @@ public class MVTable extends TableBase { ...@@ -524,26 +514,16 @@ public class MVTable extends TableBase {
@Override @Override
public void addRow(Session session, Row row) { public void addRow(Session session, Row row) {
lastModificationId = database.getNextModificationDataId(); lastModificationId = database.getNextModificationDataId();
int i = 0; Transaction t = getTransaction(session);
long savepoint = t.setSavepoint();
try { try {
for (int size = indexes.size(); i < size; i++) { for (int i = 0, size = indexes.size(); i < size; i++) {
Index index = indexes.get(i); Index index = indexes.get(i);
index.add(session, row); index.add(session, row);
} }
rowCount++; rowCount++;
} catch (Throwable e) { } catch (Throwable e) {
try { t.rollbackToSavepoint(savepoint);
while (--i >= 0) {
Index index = indexes.get(i);
index.remove(session, row);
}
} catch (DbException e2) {
// this could happen, for example on failure in the storage
// but if that is not the case it means there is something wrong
// with the database
trace.error(e2, "could not undo operation");
throw e2;
}
DbException de = DbException.convert(e); DbException de = DbException.convert(e);
if (de.getErrorCode() == ErrorCode.DUPLICATE_KEY_1) { if (de.getErrorCode() == ErrorCode.DUPLICATE_KEY_1) {
for (int j = 0; j < indexes.size(); j++) { for (int j = 0; j < indexes.size(); j++) {
...@@ -572,7 +552,7 @@ public class MVTable extends TableBase { ...@@ -572,7 +552,7 @@ public class MVTable extends TableBase {
} }
int rows = session.getDatabase().getSettings().analyzeSample; int rows = session.getDatabase().getSettings().analyzeSample;
int test; int test;
// Analyze.analyzeTable(session, this, rows, false); // Analyze.analyzeTable(session, this, rows, false);
} }
@Override @Override
...@@ -687,7 +667,7 @@ int test; ...@@ -687,7 +667,7 @@ int test;
return session.getTransaction(store); return session.getTransaction(store);
} }
public TransactionStore2 getStore() { public TransactionStore getStore() {
return store; return store;
} }
......
...@@ -42,7 +42,6 @@ public class MVTableEngine implements TableEngine { ...@@ -42,7 +42,6 @@ public class MVTableEngine implements TableEngine {
if (store == null) { if (store == null) {
return; return;
} }
// TODO this stores uncommitted transactions as well
store(store.getStore()); store(store.getStore());
} }
} }
...@@ -115,9 +114,12 @@ public class MVTableEngine implements TableEngine { ...@@ -115,9 +114,12 @@ public class MVTableEngine implements TableEngine {
* @param store the store * @param store the store
*/ */
static void store(MVStore store) { static void store(MVStore store) {
if (!store.isReadOnly()) {
store.commit();
store.compact(50); store.compact(50);
store.store(); store.store();
} }
}
/** /**
* A store with open tables. * A store with open tables.
...@@ -142,12 +144,12 @@ public class MVTableEngine implements TableEngine { ...@@ -142,12 +144,12 @@ public class MVTableEngine implements TableEngine {
/** /**
* The transaction store. * The transaction store.
*/ */
private final TransactionStore2 transactionStore; private final TransactionStore transactionStore;
public Store(Database db, MVStore store) { public Store(Database db, MVStore store) {
this.db = db; this.db = db;
this.store = store; this.store = store;
this.transactionStore = new TransactionStore2(store, this.transactionStore = new TransactionStore(store,
new ValueDataType(null, null, null)); new ValueDataType(null, null, null));
} }
...@@ -155,7 +157,7 @@ public class MVTableEngine implements TableEngine { ...@@ -155,7 +157,7 @@ public class MVTableEngine implements TableEngine {
return store; return store;
} }
public TransactionStore2 getTransactionStore() { public TransactionStore getTransactionStore() {
return transactionStore; return transactionStore;
} }
......
...@@ -16,9 +16,8 @@ import java.util.Map; ...@@ -16,9 +16,8 @@ import java.util.Map;
import org.h2.mvstore.Cursor; import org.h2.mvstore.Cursor;
import org.h2.mvstore.DataUtils; import org.h2.mvstore.DataUtils;
import org.h2.mvstore.MVMap; import org.h2.mvstore.MVMap;
import org.h2.mvstore.MVMapConcurrent;
import org.h2.mvstore.MVStore;
import org.h2.mvstore.MVMap.Builder; import org.h2.mvstore.MVMap.Builder;
import org.h2.mvstore.MVStore;
import org.h2.mvstore.type.DataType; import org.h2.mvstore.type.DataType;
import org.h2.mvstore.type.ObjectDataType; import org.h2.mvstore.type.ObjectDataType;
import org.h2.util.New; import org.h2.util.New;
...@@ -30,6 +29,8 @@ public class TransactionStore { ...@@ -30,6 +29,8 @@ public class TransactionStore {
private static final String LAST_TRANSACTION_ID = "lastTransactionId"; private static final String LAST_TRANSACTION_ID = "lastTransactionId";
private static final int MAX_UNSAVED_PAGES = 4 * 1024;
/** /**
* The store. * The store.
*/ */
...@@ -49,7 +50,7 @@ public class TransactionStore { ...@@ -49,7 +50,7 @@ public class TransactionStore {
/** /**
* The undo log. * The undo log.
* Key: [ transactionId, logId ], value: [ baseVersion, mapId, key ]. * Key: [ transactionId, logId ], value: [ opType, mapId, key, oldValue ].
*/ */
final MVMap<long[], Object[]> undoLog; final MVMap<long[], Object[]> undoLog;
...@@ -86,14 +87,15 @@ public class TransactionStore { ...@@ -86,14 +87,15 @@ public class TransactionStore {
this.store = store; this.store = store;
settings = store.openMap("settings"); settings = store.openMap("settings");
openTransactions = store.openMap("openTransactions", openTransactions = store.openMap("openTransactions",
new MVMapConcurrent.Builder<Long, Object[]>()); new MVMap.Builder<Long, Object[]>());
// commit could be faster if we have one undo log per transaction, // commit could be faster if we have one undo log per transaction,
// or a range delete operation for maps // or a range delete operation for maps
ArrayType valueType = new ArrayType(new DataType[]{ ArrayType valueType = new ArrayType(new DataType[]{
new ObjectDataType(), new ObjectDataType(), keyType new ObjectDataType(), new ObjectDataType(), keyType,
new ObjectDataType()
}); });
MVMapConcurrent.Builder<long[], Object[]> builder = MVMap.Builder<long[], Object[]> builder =
new MVMapConcurrent.Builder<long[], Object[]>(). new MVMap.Builder<long[], Object[]>().
valueType(valueType); valueType(valueType);
// TODO escape other map names, to avoid conflicts // TODO escape other map names, to avoid conflicts
undoLog = store.openMap("undoLog", builder); undoLog = store.openMap("undoLog", builder);
...@@ -153,13 +155,15 @@ public class TransactionStore { ...@@ -153,13 +155,15 @@ public class TransactionStore {
* @return the transaction * @return the transaction
*/ */
public synchronized Transaction begin() { public synchronized Transaction begin() {
store.incrementVersion();
long transactionId = lastTransactionId++; long transactionId = lastTransactionId++;
int status = Transaction.STATUS_OPEN; int status = Transaction.STATUS_OPEN;
return new Transaction(this, transactionId, status, null, 0); return new Transaction(this, transactionId, status, null, 0);
} }
private void storeTransaction(Transaction t) { private void storeTransaction(Transaction t) {
if (store.getUnsavedPageCount() > MAX_UNSAVED_PAGES) {
store.commit();
}
long transactionId = t.getId(); long transactionId = t.getId();
if (openTransactions.containsKey(transactionId)) { if (openTransactions.containsKey(transactionId)) {
return; return;
...@@ -194,12 +198,13 @@ public class TransactionStore { ...@@ -194,12 +198,13 @@ public class TransactionStore {
* @param opType the operation type * @param opType the operation type
* @param mapId the map id * @param mapId the map id
* @param key the key * @param key the key
* @param oldValue the old value
*/ */
void log(Transaction t, long logId, int opType, int mapId, void log(Transaction t, long logId, int opType, int mapId,
Object key) { Object key, Object oldValue) {
storeTransaction(t); storeTransaction(t);
long[] undoKey = { t.getId(), logId }; long[] undoKey = { t.getId(), logId };
Object[] log = new Object[] { opType, mapId, key }; Object[] log = new Object[] { opType, mapId, key, oldValue };
undoLog.put(undoKey, log); undoLog.put(undoKey, log);
} }
...@@ -214,7 +219,6 @@ public class TransactionStore { ...@@ -214,7 +219,6 @@ public class TransactionStore {
Object[] old = openTransactions.get(t.getId()); Object[] old = openTransactions.get(t.getId());
Object[] v = { old[0], name }; Object[] v = { old[0], name };
openTransactions.put(t.getId(), v); openTransactions.put(t.getId(), v);
store.commit();
} }
/** /**
...@@ -224,7 +228,6 @@ public class TransactionStore { ...@@ -224,7 +228,6 @@ public class TransactionStore {
* @param maxLogId the last log id * @param maxLogId the last log id
*/ */
void commit(Transaction t, long maxLogId) { void commit(Transaction t, long maxLogId) {
store.incrementVersion();
for (long logId = 0; logId < maxLogId; logId++) { for (long logId = 0; logId < maxLogId; logId++) {
long[] undoKey = new long[] { long[] undoKey = new long[] {
t.getId(), logId }; t.getId(), logId };
...@@ -247,17 +250,7 @@ public class TransactionStore { ...@@ -247,17 +250,7 @@ public class TransactionStore {
} }
undoLog.remove(undoKey); undoLog.remove(undoKey);
} }
openTransactions.remove(t.getId()); endTransaction(t);
openTransactionMap.remove(t.getId());
store.commit();
long oldestVersion = store.getCurrentVersion();
for (Transaction u : openTransactionMap.values()) {
long v = u.startVersion;
if (v < oldestVersion) {
oldestVersion = v;
}
}
store.setRetainVersion(oldestVersion);
} }
/** /**
...@@ -268,9 +261,12 @@ public class TransactionStore { ...@@ -268,9 +261,12 @@ public class TransactionStore {
*/ */
void rollback(Transaction t, long maxLogId) { void rollback(Transaction t, long maxLogId) {
rollbackTo(t, maxLogId, 0); rollbackTo(t, maxLogId, 0);
endTransaction(t);
}
private void endTransaction(Transaction t) {
openTransactions.remove(t.getId()); openTransactions.remove(t.getId());
openTransactionMap.remove(t.getId()); openTransactionMap.remove(t.getId());
store.commit();
} }
/** /**
...@@ -281,7 +277,6 @@ public class TransactionStore { ...@@ -281,7 +277,6 @@ public class TransactionStore {
* @param toLogId the log id to roll back to * @param toLogId the log id to roll back to
*/ */
void rollbackTo(Transaction t, long maxLogId, long toLogId) { void rollbackTo(Transaction t, long maxLogId, long toLogId) {
store.incrementVersion();
for (long logId = maxLogId - 1; logId >= toLogId; logId--) { for (long logId = maxLogId - 1; logId >= toLogId; logId--) {
Object[] op = undoLog.get(new long[] { Object[] op = undoLog.get(new long[] {
t.getId(), logId }); t.getId(), logId });
...@@ -291,30 +286,17 @@ public class TransactionStore { ...@@ -291,30 +286,17 @@ public class TransactionStore {
String mapName = DataUtils.parseMap(m).get("name"); String mapName = DataUtils.parseMap(m).get("name");
MVMap<Object, Object[]> map = store.openMap(mapName); MVMap<Object, Object[]> map = store.openMap(mapName);
Object key = op[2]; Object key = op[2];
Object[] value = map.get(key); Object[] oldValue = (Object[]) op[3];
if (value != null) { if (oldValue == null) {
Long oldVersion = (Long) value[1];
if (oldVersion == null) {
// this transaction added the value // this transaction added the value
map.remove(key); map.remove(key);
} else if (oldVersion < map.getCreateVersion()) {
map.remove(key);
} else { } else {
// this transaction updated the value // this transaction updated the value
MVMap<Object, Object[]> mapOld = map map.put(key, oldValue);
.openVersion(oldVersion);
Object[] old = mapOld.get(key);
if (old == null) {
map.remove(key);
} else {
map.put(key, old);
} }
undoLog.remove(op);
} }
} }
undoLog.remove(logId);
}
store.commit();
}
/** /**
* A transaction. * A transaction.
...@@ -354,14 +336,14 @@ public class TransactionStore { ...@@ -354,14 +336,14 @@ public class TransactionStore {
/** /**
* The transaction id. * The transaction id.
*/ */
final long transactionId; final Long transactionId;
long logId;
private int status; private int status;
private String name; private String name;
private long logId;
Transaction(TransactionStore store, long transactionId, int status, String name, long logId) { Transaction(TransactionStore store, long transactionId, int status, String name, long logId) {
this.store = store; this.store = store;
this.startVersion = store.store.getCurrentVersion(); this.startVersion = store.store.getCurrentVersion();
...@@ -376,7 +358,7 @@ public class TransactionStore { ...@@ -376,7 +358,7 @@ public class TransactionStore {
* *
* @return the transaction id * @return the transaction id
*/ */
public long getId() { public Long getId() {
return transactionId; return transactionId;
} }
...@@ -416,7 +398,6 @@ public class TransactionStore { ...@@ -416,7 +398,6 @@ public class TransactionStore {
*/ */
public long setSavepoint() { public long setSavepoint() {
checkOpen(); checkOpen();
store.store.incrementVersion();
return logId; return logId;
} }
...@@ -426,13 +407,14 @@ public class TransactionStore { ...@@ -426,13 +407,14 @@ public class TransactionStore {
* @param opType the operation type * @param opType the operation type
* @param mapId the map id * @param mapId the map id
* @param key the key * @param key the key
* @param oldValue the old value
*/ */
void log(int opType, int mapId, Object key) { void log(int opType, int mapId, Object key, Object oldValue) {
store.log(this, logId++, opType, mapId, key); store.log(this, logId++, opType, mapId, key, oldValue);
} }
/** /**
* Open a data map where reads are always up to date. * Open a data map.
* *
* @param <K> the key type * @param <K> the key type
* @param <V> the value type * @param <V> the value type
...@@ -440,22 +422,9 @@ public class TransactionStore { ...@@ -440,22 +422,9 @@ public class TransactionStore {
* @return the transaction map * @return the transaction map
*/ */
public <K, V> TransactionMap<K, V> openMap(String name) { public <K, V> TransactionMap<K, V> openMap(String name) {
return openMap(name, -1);
}
/**
* Open a data map where reads are based on the specified version / savepoint.
*
* @param <K> the key type
* @param <V> the value type
* @param name the name of the map
* @param readVersion the version used for reading
* @return the transaction map
*/
public <K, V> TransactionMap<K, V> openMap(String name, long readVersion) {
checkOpen(); checkOpen();
return new TransactionMap<K, V>(this, name, new ObjectDataType(), return new TransactionMap<K, V>(this, name, new ObjectDataType(),
new ObjectDataType(), readVersion); new ObjectDataType());
} }
/** /**
...@@ -464,11 +433,10 @@ public class TransactionStore { ...@@ -464,11 +433,10 @@ public class TransactionStore {
* @param <K> the key type * @param <K> the key type
* @param <V> the value type * @param <V> the value type
* @param name the name of the map * @param name the name of the map
* @param readVersion the version used for reading
* @param builder the builder * @param builder the builder
* @return the transaction map * @return the transaction map
*/ */
public <K, V> TransactionMap<K, V> openMap(String name, long readVersion, Builder<K, V> builder) { public <K, V> TransactionMap<K, V> openMap(String name, Builder<K, V> builder) {
checkOpen(); checkOpen();
DataType keyType = builder.getKeyType(); DataType keyType = builder.getKeyType();
if (keyType == null) { if (keyType == null) {
...@@ -478,7 +446,7 @@ public class TransactionStore { ...@@ -478,7 +446,7 @@ public class TransactionStore {
if (valueType == null) { if (valueType == null) {
valueType = new ObjectDataType(); valueType = new ObjectDataType();
} }
return new TransactionMap<K, V>(this, name, keyType, valueType, readVersion); return new TransactionMap<K, V>(this, name, keyType, valueType);
} }
/** /**
...@@ -556,34 +524,55 @@ public class TransactionStore { ...@@ -556,34 +524,55 @@ public class TransactionStore {
* Key: key the key of the data. * Key: key the key of the data.
* Value: { transactionId, oldVersion, value } * Value: { transactionId, oldVersion, value }
*/ */
private final MVMap<K, Object[]> mapWrite; private final MVMap<K, Object[]> map;
/** /**
* The map used for reading (possibly an older version). Reading is done * If a record was read that was updated by this transaction, and the
* on an older version so that changes are not immediately visible, to * update occurred before this log id, the older version is read. This
* support statement processing (for example * is so that changes are not immediately visible, to support statement
* "update test set id = id + 1"). * processing (for example "update test set id = id + 1").
* <p>
* Key: key the key of the data.
* Value: { transactionId, oldVersion, value }
*/ */
private final MVMap<K, Object[]> mapRead; private long readLogId = Long.MAX_VALUE;
TransactionMap(Transaction transaction, String name, DataType keyType, TransactionMap(Transaction transaction, String name, DataType keyType,
DataType valueType, long readVersion) { DataType valueType) {
this.transaction = transaction; this.transaction = transaction;
ArrayType arrayType = new ArrayType(new DataType[] { ArrayType arrayType = new ArrayType(new DataType[] {
new ObjectDataType(), new ObjectDataType(), valueType new ObjectDataType(), new ObjectDataType(), valueType
}); });
MVMap.Builder<K, Object[]> builder = new MVMap.Builder<K, Object[]>() MVMap.Builder<K, Object[]> builder = new MVMap.Builder<K, Object[]>()
.keyType(keyType).valueType(arrayType); .keyType(keyType).valueType(arrayType);
mapWrite = transaction.store.store.openMap(name, builder); map = transaction.store.store.openMap(name, builder);
mapId = mapWrite.getId(); mapId = map.getId();
if (readVersion >= 0) {
mapRead = mapWrite.openVersion(readVersion);
} else {
mapRead = mapWrite;
} }
private TransactionMap(Transaction transaction, MVMap<K, Object[]> map, int mapId) {
this.transaction = transaction;
this.map = map;
this.mapId = mapId;
}
/**
* Set the savepoint. Afterwards, reads are based on the specified
* savepoint.
*
* @param savepoint the savepoint
*/
public void setSavepoint(long savepoint) {
this.readLogId = savepoint;
}
/**
* Get a clone of this map for the given transaction.
*
* @param transaction the transaction
* @param savepoint the savepoint
* @return the map
*/
public TransactionMap<K, V> getInstance(Transaction transaction, long savepoint) {
TransactionMap<K, V> m = new TransactionMap<K, V>(transaction, map, mapId);
m.setSavepoint(savepoint);
return m;
} }
/** /**
...@@ -594,7 +583,7 @@ public class TransactionStore { ...@@ -594,7 +583,7 @@ public class TransactionStore {
public long getSize() { public long getSize() {
// TODO this method is very slow // TODO this method is very slow
long size = 0; long size = 0;
Cursor<K> cursor = mapRead.keyIterator(null); Cursor<K> cursor = map.keyIterator(null);
while (cursor.hasNext()) { while (cursor.hasNext()) {
K key = cursor.next(); K key = cursor.next();
if (get(key) != null) { if (get(key) != null) {
...@@ -708,11 +697,10 @@ public class TransactionStore { ...@@ -708,11 +697,10 @@ public class TransactionStore {
* @return true if the value was set * @return true if the value was set
*/ */
public boolean trySet(K key, V value, boolean onlyIfUnchanged) { public boolean trySet(K key, V value, boolean onlyIfUnchanged) {
MVMap<K, Object[]> m = mapRead; Object[] current = map.get(key);
Object[] current = mapWrite.get(key);
if (onlyIfUnchanged) { if (onlyIfUnchanged) {
Object[] old = m.get(key); Object[] old = getArray(key, readLogId);
if (!mapWrite.areValuesEqual(old, current)) { if (!map.areValuesEqual(old, current)) {
long tx = (Long) current[0]; long tx = (Long) current[0];
if (tx == transaction.transactionId) { if (tx == transaction.transactionId) {
if (value == null) { if (value == null) {
...@@ -731,7 +719,6 @@ public class TransactionStore { ...@@ -731,7 +719,6 @@ public class TransactionStore {
} }
} }
} }
long oldVersion = transaction.store.store.getCurrentVersion() - 1;
int opType; int opType;
if (current == null || current[2] == null) { if (current == null || current[2] == null) {
if (value == null) { if (value == null) {
...@@ -747,13 +734,14 @@ public class TransactionStore { ...@@ -747,13 +734,14 @@ public class TransactionStore {
opType = Transaction.OP_SET; opType = Transaction.OP_SET;
} }
} }
Object[] newValue = { transaction.transactionId, oldVersion, value }; Object[] newValue = {
transaction.transactionId,
transaction.logId, value };
if (current == null) { if (current == null) {
// a new value // a new value
newValue[1] = null; Object[] old = map.putIfAbsent(key, newValue);
Object[] old = mapWrite.putIfAbsent(key, newValue);
if (old == null) { if (old == null) {
transaction.log(opType, mapId, key); transaction.log(opType, mapId, key, current);
return true; return true;
} }
return false; return false;
...@@ -761,15 +749,8 @@ public class TransactionStore { ...@@ -761,15 +749,8 @@ public class TransactionStore {
long tx = (Long) current[0]; long tx = (Long) current[0];
if (tx == transaction.transactionId) { if (tx == transaction.transactionId) {
// added or updated by this transaction // added or updated by this transaction
if (mapWrite.replace(key, current, newValue)) { if (map.replace(key, current, newValue)) {
if (current[1] == null) { transaction.log(opType, mapId, key, current);
transaction.log(opType, mapId, key);
} else {
long c = (Long) current[1];
if (c != oldVersion) {
transaction.log(opType, mapId, key);
}
}
return true; return true;
} }
// strange, somebody overwrite the value // strange, somebody overwrite the value
...@@ -781,8 +762,8 @@ public class TransactionStore { ...@@ -781,8 +762,8 @@ public class TransactionStore {
if (!open) { if (!open) {
// the transaction is committed: // the transaction is committed:
// overwrite the value // overwrite the value
if (mapWrite.replace(key, current, newValue)) { if (map.replace(key, current, newValue)) {
transaction.log(opType, mapId, key); transaction.log(opType, mapId, key, current);
return true; return true;
} }
// somebody else was faster // somebody else was faster
...@@ -799,7 +780,7 @@ public class TransactionStore { ...@@ -799,7 +780,7 @@ public class TransactionStore {
* @return the value or null * @return the value or null
*/ */
public V get(K key) { public V get(K key) {
return get(key, mapRead); return get(key, readLogId);
} }
/** /**
...@@ -809,7 +790,7 @@ public class TransactionStore { ...@@ -809,7 +790,7 @@ public class TransactionStore {
* @return the value or null * @return the value or null
*/ */
public V getLatest(K key) { public V getLatest(K key) {
return get(key, mapWrite); return get(key, Long.MAX_VALUE);
} }
/** /**
...@@ -826,41 +807,46 @@ public class TransactionStore { ...@@ -826,41 +807,46 @@ public class TransactionStore {
* Get the value for the given key. * Get the value for the given key.
* *
* @param key the key * @param key the key
* @param m the map * @param maxLogId the maximum log id
* @return the value or null * @return the value or null
*/ */
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
public V get(K key, MVMap<K, Object[]> m) { public V get(K key, long maxLogId) {
checkOpen(); checkOpen();
Object[] data = getArray(key, maxLogId);
return data == null ? null : (V) data[2];
}
private Object[] getArray(K key, long maxLog) {
Object[] data = map.get(key);
while (true) { while (true) {
Object[] data = m.get(key);
long tx; long tx;
if (data == null) { if (data == null) {
// doesn't exist or deleted by a committed transaction // doesn't exist or deleted by a committed transaction
return null; return null;
} }
tx = (Long) data[0]; tx = (Long) data[0];
long logId = (Long) data[1];
if (tx == transaction.transactionId) { if (tx == transaction.transactionId) {
// added by this transaction // added by this transaction
return (V) data[2]; if (logId < maxLog) {
return data;
}
} }
// added or updated by another transaction // added or updated by another transaction
boolean open = transaction.store.openTransactions.containsKey(tx); boolean open = transaction.store.openTransactions.containsKey(tx);
if (!open) { if (!open) {
// it is committed // it is committed
return (V) data[2]; return data;
} }
tx = (Long) data[0];
// get the value before the uncommitted transaction // get the value before the uncommitted transaction
if (data[1] == null) { long[] x = new long[] { tx, logId };
// a new entry data = transaction.store.undoLog.get(x);
return null; data = (Object[]) data[3];
}
long oldVersion = (Long) data[1];
m = mapWrite.openVersion(oldVersion);
} }
} }
/** /**
* Rename the map. * Rename the map.
* *
...@@ -868,7 +854,7 @@ public class TransactionStore { ...@@ -868,7 +854,7 @@ public class TransactionStore {
*/ */
public void renameMap(String newMapName) { public void renameMap(String newMapName) {
// TODO rename maps transactionally // TODO rename maps transactionally
mapWrite.renameMap(newMapName); map.renameMap(newMapName);
} }
/** /**
...@@ -877,7 +863,7 @@ public class TransactionStore { ...@@ -877,7 +863,7 @@ public class TransactionStore {
* @return true if closed * @return true if closed
*/ */
public boolean isClosed() { public boolean isClosed() {
return mapWrite.isClosed(); return map.isClosed();
} }
/** /**
...@@ -885,7 +871,7 @@ public class TransactionStore { ...@@ -885,7 +871,7 @@ public class TransactionStore {
*/ */
public void removeMap() { public void removeMap() {
// TODO remove in a transaction // TODO remove in a transaction
mapWrite.removeMap(); map.removeMap();
} }
/** /**
...@@ -893,7 +879,7 @@ public class TransactionStore { ...@@ -893,7 +879,7 @@ public class TransactionStore {
*/ */
public void clear() { public void clear() {
// TODO truncate transactionally // TODO truncate transactionally
mapWrite.clear(); map.clear();
} }
/** /**
...@@ -903,7 +889,7 @@ public class TransactionStore { ...@@ -903,7 +889,7 @@ public class TransactionStore {
*/ */
public K firstKey() { public K firstKey() {
// TODO transactional firstKey // TODO transactional firstKey
return mapRead.firstKey(); return map.firstKey();
} }
/** /**
...@@ -913,7 +899,7 @@ public class TransactionStore { ...@@ -913,7 +899,7 @@ public class TransactionStore {
*/ */
public K lastKey() { public K lastKey() {
// TODO transactional lastKey // TODO transactional lastKey
return mapRead.lastKey(); return map.lastKey();
} }
/** /**
...@@ -924,7 +910,7 @@ public class TransactionStore { ...@@ -924,7 +910,7 @@ public class TransactionStore {
*/ */
public Iterator<K> keyIterator(K from) { public Iterator<K> keyIterator(K from) {
// TODO transactional keyIterator // TODO transactional keyIterator
return mapRead.keyIterator(from); return map.keyIterator(from);
} }
/** /**
...@@ -935,7 +921,7 @@ public class TransactionStore { ...@@ -935,7 +921,7 @@ public class TransactionStore {
*/ */
public K ceilingKey(K key) { public K ceilingKey(K key) {
// TODO transactional ceilingKey // TODO transactional ceilingKey
return mapRead.ceilingKey(key); return map.ceilingKey(key);
} }
/** /**
...@@ -947,7 +933,7 @@ public class TransactionStore { ...@@ -947,7 +933,7 @@ public class TransactionStore {
*/ */
public K higherKey(K key) { public K higherKey(K key) {
// TODO transactional higherKey // TODO transactional higherKey
return mapRead.higherKey(key); return map.higherKey(key);
} }
/** /**
...@@ -959,7 +945,7 @@ public class TransactionStore { ...@@ -959,7 +945,7 @@ public class TransactionStore {
*/ */
public K lowerKey(K key) { public K lowerKey(K key) {
// TODO Auto-generated method stub // TODO Auto-generated method stub
return mapRead.lowerKey(key); return map.lowerKey(key);
} }
public Transaction getTransaction() { public Transaction getTransaction() {
......
/*
* Copyright 2004-2013 H2 Group. Multiple-Licensed under the H2 License,
* Version 1.0, and under the Eclipse Public License, Version 1.0
* (http://h2database.com/html/license.html).
* Initial Developer: H2 Group
*/
package org.h2.mvstore.db;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.h2.mvstore.Cursor;
import org.h2.mvstore.DataUtils;
import org.h2.mvstore.MVMap;
import org.h2.mvstore.MVMapConcurrent;
import org.h2.mvstore.MVStore;
import org.h2.mvstore.MVMap.Builder;
import org.h2.mvstore.type.DataType;
import org.h2.mvstore.type.ObjectDataType;
import org.h2.util.New;
/**
* A store that supports concurrent transactions.
*/
public class TransactionStore2 {
private static final String LAST_TRANSACTION_ID = "lastTransactionId";
/**
* The store.
*/
final MVStore store;
/**
* The persisted map of open transaction.
* Key: transactionId, value: [ status, name ].
*/
final MVMap<Long, Object[]> openTransactions;
/**
* The map of open transaction objects.
* Key: transactionId, value: transaction object.
*/
final HashMap<Long, Transaction> openTransactionMap = New.hashMap();
/**
* The undo log.
* Key: [ transactionId, logId ], value: [ opType, mapId, key, oldValue ].
*/
final MVMap<long[], Object[]> undoLog;
/**
* The lock timeout in milliseconds. 0 means timeout immediately.
*/
long lockTimeout;
/**
* The transaction settings. "lastTransaction" the last transaction id.
*/
private final MVMap<String, String> settings;
private long lastTransactionIdStored;
private long lastTransactionId;
/**
* Create a new transaction store.
*
* @param store the store
*/
public TransactionStore2(MVStore store) {
this(store, new ObjectDataType());
}
/**
* Create a new transaction store.
*
* @param store the store
* @param keyType the data type for map keys
*/
public TransactionStore2(MVStore store, DataType keyType) {
this.store = store;
settings = store.openMap("settings");
openTransactions = store.openMap("openTransactions",
new MVMapConcurrent.Builder<Long, Object[]>());
// commit could be faster if we have one undo log per transaction,
// or a range delete operation for maps
ArrayType valueType = new ArrayType(new DataType[]{
new ObjectDataType(), new ObjectDataType(), keyType,
new ObjectDataType()
});
MVMapConcurrent.Builder<long[], Object[]> builder =
new MVMapConcurrent.Builder<long[], Object[]>().
valueType(valueType);
// TODO escape other map names, to avoid conflicts
undoLog = store.openMap("undoLog", builder);
init();
}
private void init() {
String s = settings.get(LAST_TRANSACTION_ID);
if (s != null) {
lastTransactionId = Long.parseLong(s);
lastTransactionIdStored = lastTransactionId;
}
Long lastKey = openTransactions.lastKey();
if (lastKey != null && lastKey.longValue() > lastTransactionId) {
throw DataUtils.newIllegalStateException("Last transaction not stored");
}
Cursor<Long> cursor = openTransactions.keyIterator(null);
while (cursor.hasNext()) {
long id = cursor.next();
Object[] data = openTransactions.get(id);
int status = (Integer) data[0];
String name = (String) data[1];
long[] next = { id + 1, -1 };
long[] last = undoLog.floorKey(next);
if (last == null) {
// no entry
} else if (last[0] == id) {
Transaction t = new Transaction(this, id, status, name, last[1]);
openTransactionMap.put(id, t);
}
}
}
/**
* Get the list of currently open transactions that have pending writes.
*
* @return the list of transactions
*/
public synchronized List<Transaction> getOpenTransactions() {
ArrayList<Transaction> list = New.arrayList();
list.addAll(openTransactionMap.values());
return list;
}
/**
* Close the transaction store.
*/
public synchronized void close() {
// to avoid losing transaction ids
settings.put(LAST_TRANSACTION_ID, "" + lastTransactionId);
store.commit();
}
/**
* Begin a new transaction.
*
* @return the transaction
*/
public synchronized Transaction begin() {
long transactionId = lastTransactionId++;
int status = Transaction.STATUS_OPEN;
return new Transaction(this, transactionId, status, null, 0);
}
private void storeTransaction(Transaction t) {
long transactionId = t.getId();
if (openTransactions.containsKey(transactionId)) {
return;
}
Object[] v = { t.getStatus(), null };
openTransactions.put(transactionId, v);
openTransactionMap.put(transactionId, t);
if (lastTransactionId > lastTransactionIdStored) {
lastTransactionIdStored += 32;
settings.put(LAST_TRANSACTION_ID, "" + lastTransactionIdStored);
}
}
/**
* Prepare a transaction.
*
* @param transactionId the transaction id
*/
void prepare(Transaction t) {
storeTransaction(t);
Object[] old = openTransactions.get(t.getId());
Object[] v = { Transaction.STATUS_PREPARED, old[1] };
openTransactions.put(t.getId(), v);
store.commit();
}
/**
* Log an entry.
*
* @param t the transaction
* @param logId the log id
* @param opType the operation type
* @param mapId the map id
* @param key the key
* @param oldValue the old value
*/
void log(Transaction t, long logId, int opType, int mapId,
Object key, Object oldValue) {
storeTransaction(t);
long[] undoKey = { t.getId(), logId };
Object[] log = new Object[] { opType, mapId, key, oldValue };
undoLog.put(undoKey, log);
}
/**
* Set the name of a transaction.
*
* @param t the transaction
* @param name the new name
*/
void setTransactionName(Transaction t, String name) {
storeTransaction(t);
Object[] old = openTransactions.get(t.getId());
Object[] v = { old[0], name };
openTransactions.put(t.getId(), v);
}
/**
* Commit a transaction.
*
* @param t the transaction
* @param maxLogId the last log id
*/
void commit(Transaction t, long maxLogId) {
for (long logId = 0; logId < maxLogId; logId++) {
long[] undoKey = new long[] {
t.getId(), logId };
Object[] op = undoLog.get(undoKey);
int opType = (Integer) op[0];
if (opType == Transaction.OP_REMOVE) {
int mapId = (Integer) op[1];
Map<String, String> meta = store.getMetaMap();
String m = meta.get("map." + mapId);
String mapName = DataUtils.parseMap(m).get("name");
MVMap<Object, Object[]> map = store.openMap(mapName);
Object key = op[2];
Object[] value = map.get(key);
// possibly the entry was added later on
// so we have to check
if (value[2] == null) {
// remove the value
map.remove(key);
}
}
undoLog.remove(undoKey);
}
openTransactions.remove(t.getId());
openTransactionMap.remove(t.getId());
}
/**
* Roll a transaction back.
*
* @param t the transaction
* @param maxLogId the last log id
*/
void rollback(Transaction t, long maxLogId) {
rollbackTo(t, maxLogId, 0);
openTransactions.remove(t.getId());
openTransactionMap.remove(t.getId());
}
/**
* Rollback to an old savepoint.
*
* @param t the transaction
* @param maxLogId the last log id
* @param toLogId the log id to roll back to
*/
void rollbackTo(Transaction t, long maxLogId, long toLogId) {
for (long logId = maxLogId - 1; logId >= toLogId; logId--) {
Object[] op = undoLog.get(new long[] {
t.getId(), logId });
int mapId = ((Integer) op[1]).intValue();
Map<String, String> meta = store.getMetaMap();
String m = meta.get("map." + mapId);
String mapName = DataUtils.parseMap(m).get("name");
MVMap<Object, Object[]> map = store.openMap(mapName);
Object key = op[2];
Object[] oldValue = (Object[]) op[3];
if (oldValue == null) {
// this transaction added the value
map.remove(key);
} else {
// this transaction updated the value
map.put(key, oldValue);
}
undoLog.remove(op);
}
}
/**
* A transaction.
*/
public static class Transaction {
/**
* The status of an open transaction.
*/
public static final int STATUS_OPEN = 0;
/**
* The status of a prepared transaction.
*/
public static final int STATUS_PREPARED = 1;
/**
* The status of a closed transaction (committed or rolled back).
*/
public static final int STATUS_CLOSED = 2;
/**
* The operation type for changes in a map.
*/
static final int OP_REMOVE = 0, OP_ADD = 1, OP_SET = 2;
/**
* The transaction store.
*/
final TransactionStore2 store;
/**
* The version of the store at the time the transaction was started.
*/
final long startVersion;
/**
* The transaction id.
*/
final Long transactionId;
long logId;
private int status;
private String name;
Transaction(TransactionStore2 store, long transactionId, int status, String name, long logId) {
this.store = store;
this.startVersion = store.store.getCurrentVersion();
this.transactionId = transactionId;
this.status = status;
this.name = name;
this.logId = logId;
}
/**
* Get the transaction id.
*
* @return the transaction id
*/
public Long getId() {
return transactionId;
}
/**
* Get the transaction status.
*
* @return the status
*/
public int getStatus() {
return status;
}
/**
* Set the name of the transaction.
*
* @param name the new name
*/
public void setName(String name) {
checkOpen();
store.setTransactionName(this, name);
this.name = name;
}
/**
* Get the name of the transaction.
*
* @return name the name
*/
public String getName() {
return name;
}
/**
* Create a new savepoint.
*
* @return the savepoint id
*/
public long setSavepoint() {
checkOpen();
return logId;
}
/**
* Add a log entry.
*
* @param opType the operation type
* @param mapId the map id
* @param key the key
* @param oldValue the old value
*/
void log(int opType, int mapId, Object key, Object oldValue) {
store.log(this, logId++, opType, mapId, key, oldValue);
}
/**
* Open a data map.
*
* @param <K> the key type
* @param <V> the value type
* @param name the name of the map
* @return the transaction map
*/
public <K, V> TransactionMap<K, V> openMap(String name) {
checkOpen();
return new TransactionMap<K, V>(this, name, new ObjectDataType(),
new ObjectDataType());
}
/**
* Open the map to store the data.
*
* @param <K> the key type
* @param <V> the value type
* @param name the name of the map
* @param builder the builder
* @return the transaction map
*/
public <K, V> TransactionMap<K, V> openMap(String name, Builder<K, V> builder) {
checkOpen();
DataType keyType = builder.getKeyType();
if (keyType == null) {
keyType = new ObjectDataType();
}
DataType valueType = builder.getValueType();
if (valueType == null) {
valueType = new ObjectDataType();
}
return new TransactionMap<K, V>(this, name, keyType, valueType);
}
/**
* Roll back to the given savepoint. This is only allowed if the
* transaction is open.
*
* @param savepointId the savepoint id
*/
public void rollbackToSavepoint(long savepointId) {
checkOpen();
store.rollbackTo(this, this.logId, savepointId);
this.logId = savepointId;
}
/**
* Prepare the transaction. Afterwards, the transaction can only be
* committed or rolled back.
*/
public void prepare() {
checkOpen();
store.prepare(this);
status = STATUS_PREPARED;
}
/**
* Commit the transaction. Afterwards, this transaction is closed.
*/
public void commit() {
if (status != STATUS_CLOSED) {
store.commit(this, logId);
status = STATUS_CLOSED;
}
}
/**
* Roll the transaction back. Afterwards, this transaction is closed.
*/
public void rollback() {
if (status != STATUS_CLOSED) {
store.rollback(this, logId);
status = STATUS_CLOSED;
}
}
/**
* Check whether this transaction is still open.
*/
void checkOpen() {
if (status != STATUS_OPEN) {
throw DataUtils.newIllegalStateException("Transaction is closed");
}
}
public long getCurrentVersion() {
return store.store.getCurrentVersion();
}
}
/**
* A map that supports transactions.
*
* @param <K> the key type
* @param <V> the value type
*/
public static class TransactionMap<K, V> {
private Transaction transaction;
private final int mapId;
/**
* The map used for writing (the latest version).
* <p>
* Key: key the key of the data.
* Value: { transactionId, oldVersion, value }
*/
private final MVMap<K, Object[]> map;
/**
* If a record was read that was updated by this transaction, and the
* update occurred before this log id, the older version is read. This
* is so that changes are not immediately visible, to support statement
* processing (for example "update test set id = id + 1").
*/
private long readLogId = Long.MAX_VALUE;
TransactionMap(Transaction transaction, String name, DataType keyType,
DataType valueType) {
this.transaction = transaction;
ArrayType arrayType = new ArrayType(new DataType[] {
new ObjectDataType(), new ObjectDataType(), valueType
});
MVMap.Builder<K, Object[]> builder = new MVMap.Builder<K, Object[]>()
.keyType(keyType).valueType(arrayType);
map = transaction.store.store.openMap(name, builder);
mapId = map.getId();
}
private TransactionMap(Transaction transaction, MVMap<K, Object[]> map, int mapId) {
this.transaction = transaction;
this.map = map;
this.mapId = mapId;
}
/**
* Set the savepoint. Afterwards, reads are based on the specified
* savepoint.
*
* @param savepoint the savepoint
*/
public void setSavepoint(long savepoint) {
this.readLogId = savepoint;
}
/**
* Get a clone of this map for the given transaction.
*
* @param transaction the transaction
* @param savepoint the savepoint
* @return the map
*/
public TransactionMap<K, V> getInstance(Transaction transaction, long savepoint) {
TransactionMap<K, V> m = new TransactionMap<K, V>(transaction, map, mapId);
m.setSavepoint(savepoint);
return m;
}
/**
* Get the size of the map as seen by this transaction.
*
* @return the size
*/
public long getSize() {
// TODO this method is very slow
long size = 0;
Cursor<K> cursor = map.keyIterator(null);
while (cursor.hasNext()) {
K key = cursor.next();
if (get(key) != null) {
size++;
}
}
return size;
}
private void checkOpen() {
transaction.checkOpen();
}
/**
* Remove an entry.
* <p>
* If the row is locked, this method will retry until the row could be
* updated or until a lock timeout.
*
* @param key the key
* @throws IllegalStateException if a lock timeout occurs
*/
public V remove(K key) {
return set(key, null);
}
/**
* Update the value for the given key.
* <p>
* If the row is locked, this method will retry until the row could be
* updated or until a lock timeout.
*
* @param key the key
* @param value the new value (not null)
* @throws IllegalStateException if a lock timeout occurs
*/
public V put(K key, V value) {
DataUtils.checkArgument(value != null, "The value may not be null");
return set(key, value);
}
private V set(K key, V value) {
checkOpen();
long start = 0;
while (true) {
V old = get(key);
boolean ok = trySet(key, value, false);
if (ok) {
return old;
}
// an uncommitted transaction:
// wait until it is committed, or until the lock timeout
long timeout = transaction.store.lockTimeout;
if (timeout == 0) {
throw DataUtils.newIllegalStateException("Lock timeout");
}
if (start == 0) {
start = System.currentTimeMillis();
} else {
long t = System.currentTimeMillis() - start;
if (t > timeout) {
throw DataUtils.newIllegalStateException("Lock timeout");
}
// TODO use wait/notify instead
try {
Thread.sleep(1);
} catch (InterruptedException e) {
// ignore
}
}
}
}
/**
* Try to remove the value for the given key.
* <p>
* This will fail if the row is locked by another transaction (that
* means, if another open transaction changed the row).
*
* @param key the key
* @return whether the entry could be removed
*/
public boolean tryRemove(K key) {
return trySet(key, null, false);
}
/**
* Try to update the value for the given key.
* <p>
* This will fail if the row is locked by another transaction (that
* means, if another open transaction changed the row).
*
* @param key the key
* @param value the new value
* @return whether the entry could be updated
*/
public boolean tryPut(K key, V value) {
DataUtils.checkArgument(value != null, "The value may not be null");
return trySet(key, value, false);
}
/**
* Try to set or remove the value. When updating only unchanged entries,
* then the value is only changed if it was not changed after opening
* the map.
*
* @param key the key
* @param value the new value (null to remove the value)
* @param onlyIfUnchanged only set the value if it was not changed (by
* this or another transaction) since the map was opened
* @return true if the value was set
*/
public boolean trySet(K key, V value, boolean onlyIfUnchanged) {
Object[] current = map.get(key);
if (onlyIfUnchanged) {
Object[] old = getArray(key, readLogId);
if (!map.areValuesEqual(old, current)) {
long tx = (Long) current[0];
if (tx == transaction.transactionId) {
if (value == null) {
// ignore removing an entry
// if it was added or changed
// in the same statement
return true;
} else if (current[2] == null) {
// add an entry that was removed
// in the same statement
} else {
return false;
}
} else {
return false;
}
}
}
int opType;
if (current == null || current[2] == null) {
if (value == null) {
// remove a removed value
opType = Transaction.OP_SET;
} else {
opType = Transaction.OP_ADD;
}
} else {
if (value == null) {
opType = Transaction.OP_REMOVE;
} else {
opType = Transaction.OP_SET;
}
}
Object[] newValue = {
transaction.transactionId,
transaction.logId, value };
if (current == null) {
// a new value
Object[] old = map.putIfAbsent(key, newValue);
if (old == null) {
transaction.log(opType, mapId, key, current);
return true;
}
return false;
}
long tx = (Long) current[0];
if (tx == transaction.transactionId) {
// added or updated by this transaction
if (map.replace(key, current, newValue)) {
transaction.log(opType, mapId, key, current);
return true;
}
// strange, somebody overwrite the value
// even thought the change was not committed
return false;
}
// added or updated by another transaction
boolean open = transaction.store.openTransactions.containsKey(tx);
if (!open) {
// the transaction is committed:
// overwrite the value
if (map.replace(key, current, newValue)) {
transaction.log(opType, mapId, key, current);
return true;
}
// somebody else was faster
return false;
}
// the transaction is not yet committed
return false;
}
/**
* Get the value for the given key at the time when this map was opened.
*
* @param key the key
* @return the value or null
*/
public V get(K key) {
return get(key, readLogId);
}
/**
* Get the most recent value for the given key.
*
* @param key the key
* @return the value or null
*/
public V getLatest(K key) {
return get(key, Long.MAX_VALUE);
}
/**
* Whether the map contains the key.
*
* @param key the key
* @return true if the map contains an entry for this key
*/
public boolean containsKey(K key) {
return get(key) != null;
}
/**
* Get the value for the given key.
*
* @param key the key
* @param maxLogId the maximum log id
* @return the value or null
*/
@SuppressWarnings("unchecked")
public V get(K key, long maxLogId) {
checkOpen();
Object[] data = getArray(key, maxLogId);
return data == null ? null : (V) data[2];
}
private Object[] getArray(K key, long maxLog) {
Object[] data = map.get(key);
while (true) {
long tx;
if (data == null) {
// doesn't exist or deleted by a committed transaction
return null;
}
tx = (Long) data[0];
long logId = (Long) data[1];
if (tx == transaction.transactionId) {
// added by this transaction
if (logId < maxLog) {
return data;
}
}
// added or updated by another transaction
boolean open = transaction.store.openTransactions.containsKey(tx);
if (!open) {
// it is committed
return data;
}
// get the value before the uncommitted transaction
long[] x = new long[] { tx, logId };
data = transaction.store.undoLog.get(x);
data = (Object[]) data[3];
}
}
/**
* Rename the map.
*
* @param newMapName the new map name
*/
public void renameMap(String newMapName) {
// TODO rename maps transactionally
map.renameMap(newMapName);
}
/**
* Check whether this map is closed.
*
* @return true if closed
*/
public boolean isClosed() {
return map.isClosed();
}
/**
* Remove the map.
*/
public void removeMap() {
// TODO remove in a transaction
map.removeMap();
}
/**
* Clear the map.
*/
public void clear() {
// TODO truncate transactionally
map.clear();
}
/**
* Get the first key.
*
* @return the first key, or null if empty
*/
public K firstKey() {
// TODO transactional firstKey
return map.firstKey();
}
/**
* Get the last key.
*
* @return the last key, or null if empty
*/
public K lastKey() {
// TODO transactional lastKey
return map.lastKey();
}
/**
* Iterate over all keys.
*
* @param from the first key to return
* @return the iterator
*/
public Iterator<K> keyIterator(K from) {
// TODO transactional keyIterator
return map.keyIterator(from);
}
/**
* Get the smallest key that is larger or equal to this key.
*
* @param key the key (may not be null)
* @return the result
*/
public K ceilingKey(K key) {
// TODO transactional ceilingKey
return map.ceilingKey(key);
}
/**
* Get the smallest key that is larger than the given key, or null if no
* such key exists.
*
* @param key the key (may not be null)
* @return the result
*/
public K higherKey(K key) {
// TODO transactional higherKey
return map.higherKey(key);
}
/**
* Get the largest key that is smaller than the given key, or null if no
* such key exists.
*
* @param key the key (may not be null)
* @return the result
*/
public K lowerKey(K key) {
// TODO Auto-generated method stub
return map.lowerKey(key);
}
public Transaction getTransaction() {
return transaction;
}
}
/**
* A data type that contains an array of objects with the specified data
* types.
*/
public static class ArrayType implements DataType {
private final int arrayLength;
private final DataType[] elementTypes;
ArrayType(DataType[] elementTypes) {
this.arrayLength = elementTypes.length;
this.elementTypes = elementTypes;
}
@Override
public int getMemory(Object obj) {
Object[] array = (Object[]) obj;
int size = 0;
for (int i = 0; i < arrayLength; i++) {
DataType t = elementTypes[i];
size += t.getMemory(array[i]);
}
return size;
}
@Override
public int compare(Object aObj, Object bObj) {
if (aObj == bObj) {
return 0;
}
Object[] a = (Object[]) aObj;
Object[] b = (Object[]) bObj;
for (int i = 0; i < arrayLength; i++) {
DataType t = elementTypes[i];
int comp = t.compare(a[i], b[i]);
if (comp != 0) {
return comp;
}
}
return 0;
}
@Override
public ByteBuffer write(ByteBuffer buff, Object obj) {
Object[] array = (Object[]) obj;
for (int i = 0; i < arrayLength; i++) {
DataType t = elementTypes[i];
t.write(buff, array[i]);
}
return buff;
}
@Override
public Object read(ByteBuffer buff) {
Object[] array = new Object[arrayLength];
for (int i = 0; i < arrayLength; i++) {
DataType t = elementTypes[i];
array[i] = t.read(buff);
}
return array;
}
}
}
...@@ -39,22 +39,24 @@ public class TestMVTableEngine extends TestBase { ...@@ -39,22 +39,24 @@ public class TestMVTableEngine extends TestBase {
testEncryption(); testEncryption();
testReadOnly(); testReadOnly();
testReuseDiskSpace(); testReuseDiskSpace();
// testDataTypes(); testDataTypes();
testLocking(); testLocking();
// testSimple(); testSimple();
} }
private void testSpeed() throws Exception { private void testSpeed() throws Exception {
String dbName; String dbName;
for (int i = 0; i < 10; i++) { for (int i = 0; i < 10; i++) {
dbName = "mvstore"; dbName = "mvstore";
// dbName += ";LOCK_MODE=0"; dbName += ";LOCK_MODE=0";
// dbName += ";LOG=0";
testSpeed(dbName); testSpeed(dbName);
int tes; int tes;
//Profiler prof = new Profiler().startCollecting(); //Profiler prof = new Profiler().startCollecting();
dbName = "mvstore" + dbName = "mvstore" +
";DEFAULT_TABLE_ENGINE=org.h2.mvstore.db.MVTableEngine"; ";DEFAULT_TABLE_ENGINE=org.h2.mvstore.db.MVTableEngine";
// dbName += ";LOCK_MODE=0"; dbName += ";LOCK_MODE=0";
// dbName += ";LOG=0";
testSpeed(dbName); testSpeed(dbName);
//System.out.println(prof.getTop(10)); //System.out.println(prof.getTop(10));
} }
...@@ -70,17 +72,51 @@ int tes; ...@@ -70,17 +72,51 @@ int tes;
String password = getPassword(); String password = getPassword();
conn = DriverManager.getConnection(url, user, password); conn = DriverManager.getConnection(url, user, password);
stat = conn.createStatement(); stat = conn.createStatement();
stat.execute("create table test(id int primary key, name varchar(255))"); long time = System.currentTimeMillis();
// stat.execute(
// "create table test(id int primary key, name varchar(255))" +
// "as select x, 'Hello World' from system_range(1, 200000)");
stat.execute("create table test(id int primary key, name varchar)");
PreparedStatement prep = conn PreparedStatement prep = conn
.prepareStatement("insert into test values(?, ?)"); .prepareStatement("insert into test values(?, ?)");
prep.setString(2, "Hello World xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx");
long time = System.currentTimeMillis(); // -mx4g
for (int i = 0; i < 200000; i++) {
// 10000 / 8000
// 1229 mvstore;LOCK_MODE=0
// 1455 mvstore;LOCK_MODE=0
// 19 mvstore;DEFAULT_TABLE_ENGINE=org.h2.mvstore.db.MVTableEngine;LOCK_MODE=0
// 659 mvstore;DEFAULT_TABLE_ENGINE=org.h2.mvstore.db.MVTableEngine;LOCK_MODE=0
// 1000 / 80000
// 1383 mvstore;LOCK_MODE=0
// 1618 mvstore;LOCK_MODE=0
// 244 mvstore;DEFAULT_TABLE_ENGINE=org.h2.mvstore.db.MVTableEngine;LOCK_MODE=0
// 965 mvstore;DEFAULT_TABLE_ENGINE=org.h2.mvstore.db.MVTableEngine;LOCK_MODE=0
// 100 / 800000
// 2061 mvstore;LOCK_MODE=0
// 2281 mvstore;LOCK_MODE=0
// 1414 mvstore;DEFAULT_TABLE_ENGINE=org.h2.mvstore.db.MVTableEngine;LOCK_MODE=0
// 2647 mvstore;DEFAULT_TABLE_ENGINE=org.h2.mvstore.db.MVTableEngine;LOCK_MODE=0
// 10 / 8000000
// 11453 mvstore;LOCK_MODE=0
// 11720 mvstore;LOCK_MODE=0
// 13605 mvstore;DEFAULT_TABLE_ENGINE=org.h2.mvstore.db.MVTableEngine;LOCK_MODE=0
// 25172 mvstore;DEFAULT_TABLE_ENGINE=org.h2.mvstore.db.MVTableEngine;LOCK_MODE=0
prep.setString(2, new String(new char[10]).replace((char) 0, 'x'));
for (int i = 0; i < 8000000; i++) {
prep.setInt(1, i); prep.setInt(1, i);
prep.execute(); prep.execute();
} }
System.out.println((System.currentTimeMillis() - time) + " " + dbName); System.out.println((System.currentTimeMillis() - time) + " " + dbName);
//Profiler prof = new Profiler().startCollecting();
conn.close(); conn.close();
//System.out.println(prof.getTop(10));
System.out.println((System.currentTimeMillis() - time) + " " + dbName);
} }
private void testEncryption() throws Exception { private void testEncryption() throws Exception {
......
...@@ -16,9 +16,9 @@ import java.util.List; ...@@ -16,9 +16,9 @@ import java.util.List;
import java.util.Random; import java.util.Random;
import org.h2.mvstore.MVStore; import org.h2.mvstore.MVStore;
import org.h2.mvstore.db.TransactionStore2; import org.h2.mvstore.db.TransactionStore;
import org.h2.mvstore.db.TransactionStore2.Transaction; import org.h2.mvstore.db.TransactionStore.Transaction;
import org.h2.mvstore.db.TransactionStore2.TransactionMap; import org.h2.mvstore.db.TransactionStore.TransactionMap;
import org.h2.store.fs.FileUtils; import org.h2.store.fs.FileUtils;
import org.h2.test.TestBase; import org.h2.test.TestBase;
import org.h2.util.New; import org.h2.util.New;
...@@ -60,7 +60,7 @@ public class TestTransactionStore extends TestBase { ...@@ -60,7 +60,7 @@ public class TestTransactionStore extends TestBase {
*/ */
private void testMultiStatement() { private void testMultiStatement() {
MVStore s = MVStore.open(null); MVStore s = MVStore.open(null);
TransactionStore2 ts = new TransactionStore2(s); TransactionStore ts = new TransactionStore(s);
Transaction tx; Transaction tx;
TransactionMap<String, String> m; TransactionMap<String, String> m;
long startUpdate; long startUpdate;
...@@ -141,14 +141,14 @@ public class TestTransactionStore extends TestBase { ...@@ -141,14 +141,14 @@ public class TestTransactionStore extends TestBase {
FileUtils.delete(fileName); FileUtils.delete(fileName);
MVStore s; MVStore s;
TransactionStore2 ts; TransactionStore ts;
Transaction tx; Transaction tx;
Transaction txOld; Transaction txOld;
TransactionMap<String, String> m; TransactionMap<String, String> m;
List<Transaction> list; List<Transaction> list;
s = MVStore.open(fileName); s = MVStore.open(fileName);
ts = new TransactionStore2(s); ts = new TransactionStore(s);
tx = ts.begin(); tx = ts.begin();
assertEquals(null, tx.getName()); assertEquals(null, tx.getName());
tx.setName("first transaction"); tx.setName("first transaction");
...@@ -166,7 +166,7 @@ public class TestTransactionStore extends TestBase { ...@@ -166,7 +166,7 @@ public class TestTransactionStore extends TestBase {
s.close(); s.close();
s = MVStore.open(fileName); s = MVStore.open(fileName);
ts = new TransactionStore2(s); ts = new TransactionStore(s);
tx = ts.begin(); tx = ts.begin();
assertEquals(1, tx.getId()); assertEquals(1, tx.getId());
m = tx.openMap("test"); m = tx.openMap("test");
...@@ -184,7 +184,7 @@ public class TestTransactionStore extends TestBase { ...@@ -184,7 +184,7 @@ public class TestTransactionStore extends TestBase {
s.close(); s.close();
s = MVStore.open(fileName); s = MVStore.open(fileName);
ts = new TransactionStore2(s); ts = new TransactionStore(s);
tx = ts.begin(); tx = ts.begin();
m = tx.openMap("test"); m = tx.openMap("test");
// TransactionStore was not closed, so we lost some ids // TransactionStore was not closed, so we lost some ids
...@@ -209,7 +209,7 @@ public class TestTransactionStore extends TestBase { ...@@ -209,7 +209,7 @@ public class TestTransactionStore extends TestBase {
private void testSavepoint() throws Exception { private void testSavepoint() throws Exception {
MVStore s = MVStore.open(null); MVStore s = MVStore.open(null);
TransactionStore2 ts = new TransactionStore2(s); TransactionStore ts = new TransactionStore(s);
Transaction tx; Transaction tx;
TransactionMap<String, String> m; TransactionMap<String, String> m;
...@@ -262,7 +262,7 @@ public class TestTransactionStore extends TestBase { ...@@ -262,7 +262,7 @@ public class TestTransactionStore extends TestBase {
"create table test(id int primary key, name varchar(255))"); "create table test(id int primary key, name varchar(255))");
MVStore s = MVStore.open(null); MVStore s = MVStore.open(null);
TransactionStore2 ts = new TransactionStore2(s); TransactionStore ts = new TransactionStore(s);
for (int i = 0; i < connectionCount; i++) { for (int i = 0; i < connectionCount; i++) {
Statement stat = statements.get(i); Statement stat = statements.get(i);
// 100 ms to avoid blocking (the test is single threaded) // 100 ms to avoid blocking (the test is single threaded)
...@@ -395,7 +395,7 @@ public class TestTransactionStore extends TestBase { ...@@ -395,7 +395,7 @@ public class TestTransactionStore extends TestBase {
private void testConcurrentTransactionsReadCommitted() { private void testConcurrentTransactionsReadCommitted() {
MVStore s = MVStore.open(null); MVStore s = MVStore.open(null);
TransactionStore2 ts = new TransactionStore2(s); TransactionStore ts = new TransactionStore(s);
Transaction tx1, tx2; Transaction tx1, tx2;
TransactionMap<String, String> m1, m2; TransactionMap<String, String> m1, m2;
...@@ -467,7 +467,7 @@ public class TestTransactionStore extends TestBase { ...@@ -467,7 +467,7 @@ public class TestTransactionStore extends TestBase {
private void testSingleConnection() { private void testSingleConnection() {
MVStore s = MVStore.open(null); MVStore s = MVStore.open(null);
TransactionStore2 ts = new TransactionStore2(s); TransactionStore ts = new TransactionStore(s);
Transaction tx; Transaction tx;
TransactionMap<String, String> m; TransactionMap<String, String> m;
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论