提交 7740acbb authored 作者: Thomas Mueller's avatar Thomas Mueller

MVStore: support transactions (WIP)

上级 b6487f6d
......@@ -23,6 +23,8 @@ import org.h2.jdbc.JdbcConnection;
import org.h2.message.DbException;
import org.h2.message.Trace;
import org.h2.message.TraceSystem;
import org.h2.mvstore.TransactionStore;
import org.h2.mvstore.TransactionStore.Transaction;
import org.h2.result.ResultInterface;
import org.h2.result.Row;
import org.h2.schema.Schema;
......@@ -102,6 +104,7 @@ public class Session extends SessionWithState {
private int objectId;
private final int queryCacheSize;
private SmallLRUCache<String, Command> queryCache;
private Transaction transaction;
public Session(Database database, User user, int id) {
this.database = database;
......@@ -1231,4 +1234,11 @@ public class Session extends SessionWithState {
return redoLogBinary;
}
public Transaction getTransaction(TransactionStore store) {
if (transaction == null) {
transaction = store.begin();
}
return transaction;
}
}
......@@ -96,6 +96,7 @@ TODO:
-- use a transaction log where only the deltas are stored
- serialization for lists, sets, sets, sorted sets, maps, sorted maps
- maybe rename 'rollback' to 'revert'
- support other compression algorithms (deflate,...)
*/
......
......@@ -11,6 +11,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.h2.mvstore.MVMap.Builder;
import org.h2.util.New;
/**
......@@ -616,8 +617,23 @@ public class TransactionStore {
if (onlyIfUnchanged) {
Object[] old = m.get(key);
if (!mapWrite.areValuesEqual(old, current)) {
long tx = (Long) current[0];
if (tx == transaction.transactionId) {
if (value == null) {
// ignore removing an entry
// if it was added or changed
// in the same statement
return true;
} else if (current[2] == null) {
// add an entry that was removed
// in the same statement
} else {
return false;
}
} else {
return false;
}
}
}
long oldVersion = transaction.store.store.getCurrentVersion() - 1;
int opType;
......@@ -739,5 +755,10 @@ public class TransactionStore {
}
}
public <A, B> MVMap<A, B> openMap(String name, Builder<A, B> builder) {
int todo;
return store.openMap(name, builder);
}
}
......@@ -58,6 +58,7 @@ public class MVPrimaryIndex extends BaseIndex {
* @param newName the new name
*/
public void renameTable(String newName) {
MVMap<Long, Value[]> map = getMap(null);
rename(newName + "_DATA");
map.renameMap(newName + "_DATA_" + getId());
}
......@@ -97,6 +98,7 @@ public class MVPrimaryIndex extends BaseIndex {
for (int i = 0; i < array.length; i++) {
array[i] = row.getValue(i);
}
MVMap<Long, Value[]> map = getMap(session);
if (map.containsKey(row.getKey())) {
String sql = "PRIMARY KEY ON " + table.getSQL();
if (mainIndexColumn >= 0 && mainIndexColumn < indexColumns.length) {
......@@ -112,6 +114,7 @@ public class MVPrimaryIndex extends BaseIndex {
@Override
public void remove(Session session, Row row) {
MVMap<Long, Value[]> map = getMap(session);
Value[] old = map.remove(row.getKey());
if (old == null) {
throw DbException.get(ErrorCode.ROW_NOT_FOUND_WHEN_DELETING_1,
......@@ -142,6 +145,7 @@ public class MVPrimaryIndex extends BaseIndex {
max = v.getLong();
}
}
MVMap<Long, Value[]> map = getMap(session);
return new MVStoreCursor(session, map.keyIterator(min), max);
}
......@@ -150,6 +154,7 @@ public class MVPrimaryIndex extends BaseIndex {
}
public Row getRow(Session session, long key) {
MVMap<Long, Value[]> map = getMap(session);
Value[] array = map.get(key);
Row row = new Row(array, 0);
row.setKey(key);
......@@ -158,6 +163,7 @@ public class MVPrimaryIndex extends BaseIndex {
@Override
public double getCost(Session session, int[] masks) {
MVMap<Long, Value[]> map = getMap(session);
long cost = 10 * (map.getSize() + Constants.COST_ROW_OFFSET);
return cost;
}
......@@ -170,6 +176,7 @@ public class MVPrimaryIndex extends BaseIndex {
@Override
public void remove(Session session) {
MVMap<Long, Value[]> map = getMap(session);
if (!map.isClosed()) {
map.removeMap();
}
......@@ -177,6 +184,7 @@ public class MVPrimaryIndex extends BaseIndex {
@Override
public void truncate(Session session) {
MVMap<Long, Value[]> map = getMap(session);
if (mvTable.getContainsLargeObject()) {
database.getLobStorage().removeAllForTable(table.getId());
}
......@@ -190,6 +198,7 @@ public class MVPrimaryIndex extends BaseIndex {
@Override
public Cursor findFirstOrLast(Session session, boolean first) {
MVMap<Long, Value[]> map = getMap(session);
if (map.getSize() == 0) {
return new MVStoreCursor(session, Collections.<Long>emptyList().iterator(), 0);
}
......@@ -206,11 +215,13 @@ public class MVPrimaryIndex extends BaseIndex {
@Override
public long getRowCount(Session session) {
MVMap<Long, Value[]> map = getMap(session);
return map.getSize();
}
@Override
public long getRowCountApproximation() {
MVMap<Long, Value[]> map = getMap(null);
return map.getSize();
}
......@@ -254,6 +265,7 @@ public class MVPrimaryIndex extends BaseIndex {
* @return the cursor
*/
Cursor find(Session session, long first, long last) {
MVMap<Long, Value[]> map = getMap(session);
return new MVStoreCursor(session, map.keyIterator(first), last);
}
......@@ -311,4 +323,9 @@ public class MVPrimaryIndex extends BaseIndex {
return true;
}
MVMap<Long, Value[]> getMap(Session session) {
// return mvTable.getTransaction(session).openMap(name)
return map;
}
}
......@@ -38,7 +38,7 @@ public class MVSecondaryIndex extends BaseIndex {
final MVTable mvTable;
private final int keyColumns;
private MVMap<Value[], Long> map;
private MVMap<Value[], Long> map2;
public MVSecondaryIndex(Database db, MVTable table, int id, String indexName,
IndexColumn[] columns, IndexType indexType) {
......@@ -58,7 +58,7 @@ public class MVSecondaryIndex extends BaseIndex {
String name = getName() + "_" + getId();
ValueArrayDataType t = new ValueArrayDataType(
db.getCompareMode(), db, sortTypes);
map = table.getStore().openMap(name,
map2 = table.getStore().openMap(name,
new MVMap.Builder<Value[], Long>().keyType(t));
}
......@@ -78,12 +78,14 @@ public class MVSecondaryIndex extends BaseIndex {
}
public void rename(String newName) {
MVMap<Value[], Long> map = getMap(null);
map.renameMap(newName + "_" + getId());
super.rename(newName);
}
@Override
public void add(Session session, Row row) {
MVMap<Value[], Long> map = getMap(session);
Value[] array = getKey(row);
if (indexType.isUnique()) {
array[keyColumns - 1] = ValueLong.get(0);
......@@ -104,6 +106,7 @@ public class MVSecondaryIndex extends BaseIndex {
@Override
public void remove(Session session, Row row) {
Value[] array = getKey(row);
MVMap<Value[], Long> map = getMap(session);
Long old = map.remove(array);
if (old == null) {
if (old == null) {
......@@ -116,6 +119,7 @@ public class MVSecondaryIndex extends BaseIndex {
@Override
public Cursor find(Session session, SearchRow first, SearchRow last) {
Value[] min = getKey(first);
MVMap<Value[], Long> map = getMap(session);
return new MVStoreCursor(session, map.keyIterator(min), last);
}
......@@ -160,11 +164,13 @@ public class MVSecondaryIndex extends BaseIndex {
@Override
public double getCost(Session session, int[] masks) {
MVMap<Value[], Long> map = getMap(session);
return 10 * getCostRangeIndex(masks, map.getSize());
}
@Override
public void remove(Session session) {
MVMap<Value[], Long> map = getMap(session);
if (!map.isClosed()) {
map.removeMap();
}
......@@ -172,6 +178,7 @@ public class MVSecondaryIndex extends BaseIndex {
@Override
public void truncate(Session session) {
MVMap<Value[], Long> map = getMap(session);
map.clear();
}
......@@ -182,6 +189,7 @@ public class MVSecondaryIndex extends BaseIndex {
@Override
public Cursor findFirstOrLast(Session session, boolean first) {
MVMap<Value[], Long> map = getMap(session);
Value[] key = first ? map.firstKey() : map.lastKey();
while (true) {
if (key == null) {
......@@ -201,16 +209,19 @@ public class MVSecondaryIndex extends BaseIndex {
@Override
public boolean needRebuild() {
MVMap<Value[], Long> map = getMap(null);
return map.getSize() == 0;
}
@Override
public long getRowCount(Session session) {
MVMap<Value[], Long> map = getMap(session);
return map.getSize();
}
@Override
public long getRowCountApproximation() {
MVMap<Value[], Long> map = getMap(null);
return map.getSize();
}
......@@ -285,4 +296,8 @@ public class MVSecondaryIndex extends BaseIndex {
}
MVMap<Value[], Long> getMap(Session session) {
return map2;
}
}
......@@ -11,6 +11,7 @@ import java.util.Collections;
import java.util.Comparator;
import java.util.HashSet;
import java.util.Set;
import org.h2.api.DatabaseEventListener;
import org.h2.command.ddl.Analyze;
import org.h2.command.ddl.CreateTableData;
......@@ -27,7 +28,8 @@ import org.h2.index.IndexType;
import org.h2.index.MultiVersionIndex;
import org.h2.message.DbException;
import org.h2.message.Trace;
import org.h2.mvstore.MVStore;
import org.h2.mvstore.TransactionStore;
import org.h2.mvstore.TransactionStore.Transaction;
import org.h2.result.Row;
import org.h2.result.SortOrder;
import org.h2.schema.SchemaObject;
......@@ -47,7 +49,7 @@ import org.h2.value.Value;
public class MVTable extends TableBase {
private final String storeName;
private final MVStore store;
private final TransactionStore store;
private MVPrimaryIndex primaryIndex;
private ArrayList<Index> indexes = New.arrayList();
private long lastModificationId;
......@@ -67,7 +69,7 @@ public class MVTable extends TableBase {
*/
private boolean waitForLock;
public MVTable(CreateTableData data, String storeName, MVStore store) {
public MVTable(CreateTableData data, String storeName, TransactionStore store) {
super(data);
nextAnalyze = database.getSettings().analyzeAuto;
this.storeName = storeName;
......@@ -468,7 +470,7 @@ public class MVTable extends TableBase {
return first.column.getColumnId();
}
private void addRowsToIndex(Session session, ArrayList<Row> list, Index index) {
private static void addRowsToIndex(Session session, ArrayList<Row> list, Index index) {
final Index idx = index;
Collections.sort(list, new Comparator<Row>() {
public int compare(Row r1, Row r2) {
......@@ -479,7 +481,6 @@ public class MVTable extends TableBase {
index.add(session, row);
}
list.clear();
storeIfRequired();
}
@Override
......@@ -508,7 +509,6 @@ public class MVTable extends TableBase {
throw DbException.convert(e);
}
analyzeIfRequired(session);
storeIfRequired();
}
@Override
......@@ -520,7 +520,6 @@ public class MVTable extends TableBase {
}
rowCount = 0;
changesSinceAnalyze = 0;
storeIfRequired();
}
@Override
......@@ -561,7 +560,6 @@ public class MVTable extends TableBase {
throw de;
}
analyzeIfRequired(session);
storeIfRequired();
}
private void analyzeIfRequired(Session session) {
......@@ -675,13 +673,11 @@ public class MVTable extends TableBase {
// ok
}
private void storeIfRequired() {
if (store.getUnsavedPageCount() > 1000) {
MVTableEngine.store(store);
}
Transaction getTransaction(Session session) {
return session.getTransaction(store);
}
public MVStore getStore() {
public TransactionStore getStore() {
return store;
}
......
......@@ -17,6 +17,7 @@ import org.h2.engine.Constants;
import org.h2.engine.Database;
import org.h2.message.DbException;
import org.h2.mvstore.MVStore;
import org.h2.mvstore.TransactionStore;
import org.h2.table.TableBase;
import org.h2.util.New;
......@@ -82,7 +83,7 @@ public class MVTableEngine implements TableEngine {
}
}
}
MVTable table = new MVTable(data, storeName, store.getStore());
MVTable table = new MVTable(data, storeName, store.getTransactionStore());
store.openTables.add(table);
table.init(data.session);
return table;
......@@ -139,15 +140,25 @@ public class MVTableEngine implements TableEngine {
*/
private final MVStore store;
/**
* The transaction store.
*/
private final TransactionStore transactionStore;
public Store(Database db, MVStore store) {
this.db = db;
this.store = store;
this.transactionStore = new TransactionStore(store);
}
public MVStore getStore() {
return store;
}
public TransactionStore getTransactionStore() {
return transactionStore;
}
}
}
......@@ -48,8 +48,13 @@ public class TestTransactionStore extends TestBase {
/**
* Tests behavior when used for a sequence of SQL statements. Each statement
* uses a savepoint. Within a statement, a change by the statement itself is
* uses a savepoint. Within a statement, changes by the statement itself are
* not seen; the change is only seen when the statement finished.
* <p>
* Update statements that change the key of multiple rows may use delete/add
* pairs to do so (they don't need to first delete all entries and then
* re-add them). Trying to add multiple values for the same key is not
* allowed (an update statement that would result in a duplicate key).
*/
private void testMultiStatement() {
MVStore s = MVStore.open(null);
......@@ -61,8 +66,6 @@ public class TestTransactionStore extends TestBase {
tx = ts.begin();
// TODO support and test rollback of table creation / removal
// start of statement
// create table test
startUpdate = tx.setSavepoint();
......@@ -94,9 +97,7 @@ public class TestTransactionStore extends TestBase {
assertEquals("World", m.get("2"));
// already updated by this statement, so it has no effect
// but still returns true because it was changed by this transaction
int TODO;
// assertTrue(m.trySet("2", null, true));
assertTrue(m.trySet("2", null, true));
assertTrue(m.trySet("3", "World", true));
// not seen within this statement
......@@ -118,7 +119,18 @@ public class TestTransactionStore extends TestBase {
version = s.getCurrentVersion();
m = tx.openMap("test", version);
// update test set id = 1
// TODO should fail: duplicate key
// should fail: duplicate key
assertTrue(m.trySet("2", null, true));
assertTrue(m.trySet("1", "Hello", true));
assertTrue(m.trySet("3", null, true));
assertFalse(m.trySet("1", "World", true));
tx.rollbackToSavepoint(startUpdate);
version = s.getCurrentVersion();
m = tx.openMap("test", version);
assertNull(m.get("1"));
assertEquals("Hello", m.get("2"));
assertEquals("World", m.get("3"));
tx.commit();
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论