提交 ba2e2de7 authored 作者: Thomas Mueller's avatar Thomas Mueller

MVTableEngine (WIP)

上级 a7fbd0b9
......@@ -9,8 +9,9 @@ package org.h2.engine;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Random;
import java.util.Set;
import org.h2.command.Command;
import org.h2.command.CommandInterface;
......@@ -26,6 +27,7 @@ import org.h2.message.DbException;
import org.h2.message.Trace;
import org.h2.message.TraceSystem;
import org.h2.mvstore.db.MVTable;
import org.h2.mvstore.db.TransactionStore.Change;
import org.h2.mvstore.db.TransactionStore.Transaction;
import org.h2.result.ResultInterface;
import org.h2.result.Row;
......@@ -38,6 +40,7 @@ import org.h2.table.Table;
import org.h2.util.New;
import org.h2.util.SmallLRUCache;
import org.h2.value.Value;
import org.h2.value.ValueArray;
import org.h2.value.ValueLong;
import org.h2.value.ValueNull;
import org.h2.value.ValueString;
......@@ -517,22 +520,20 @@ public class Session extends SessionWithState {
*/
public void rollback() {
checkCommitRollback();
if (transaction != null) {
Set<String> changed = transaction.getChangedMaps(0);
for (MVTable t : database.getMvStore().getTables()) {
if (changed.contains(t.getMapName())) {
t.setModified();
}
}
transaction.rollback();
transaction = null;
}
currentTransactionName = null;
boolean needCommit = false;
if (undoLog.size() > 0) {
rollbackTo(null, false);
needCommit = true;
}
if (transaction != null) {
rollbackTo(null, false);
needCommit = true;
// rollback stored the undo operations in the transaction
// committing will end the transaction
transaction.commit();
transaction = null;
}
if (locks.size() > 0 || needCommit) {
database.commit(this);
}
......@@ -558,13 +559,33 @@ public class Session extends SessionWithState {
undoLog.removeLast(trimToSize);
}
if (transaction != null) {
Set<String> changed = transaction.getChangedMaps(savepoint.transactionSavepoint);
for (MVTable t : database.getMvStore().getTables()) {
if (changed.contains(t.getMapName())) {
t.setModified();
long savepointId = savepoint == null ? 0 : savepoint.transactionSavepoint;
List<MVTable> tables = database.getMvStore().getTables();
HashMap<String, MVTable> tableMap = New.hashMap();
for (MVTable t : tables) {
tableMap.put(t.getMapName(), t);
}
Iterator<Change> it = transaction.getChanges(savepointId);
while (it.hasNext()) {
Change c = it.next();
MVTable t = tableMap.get(c.mapName);
if (t != null) {
long key = ((ValueLong) c.key).getLong();
ValueArray value = (ValueArray) c.value;
short op;
Row row;
if (value == null) {
op = UndoLogRecord.INSERT;
row = t.getRow(this, key);
} else {
op = UndoLogRecord.DELETE;
row = new Row(value.getList(), Row.MEMORY_CALCULATE);
}
row.setKey(key);
UndoLogRecord log = new UndoLogRecord(t, op, row);
log.undo(this);
}
}
transaction.rollbackToSavepoint(savepoint.transactionSavepoint);
}
if (savepoints != null) {
String[] names = new String[savepoints.size()];
......
......@@ -679,14 +679,14 @@ public class MVStore {
}
closeFile(true);
}
/**
* Close the file and the store, without writing anything.
*/
public void closeImmediately() {
closeFile(false);
}
private void closeFile(boolean shrinkIfPossible) {
if (closed) {
return;
......
......@@ -74,7 +74,7 @@ public class MVDelegateIndex extends BaseIndex {
@Override
public double getCost(Session session, int[] masks, SortOrder sortOrder) {
return 10 * getCostRangeIndex(masks, mainIndex.getRowCount(session), sortOrder);
return 10 * getCostRangeIndex(masks, mainIndex.getRowCountApproximation(), sortOrder);
}
@Override
......
......@@ -36,7 +36,7 @@ import org.h2.value.ValueNull;
public class MVPrimaryIndex extends BaseIndex {
private final MVTable mvTable;
private String mapName;
private final String mapName;
private TransactionMap<Value, Value> dataMap;
private long lastKey;
private int mainIndexColumn = -1;
......@@ -53,7 +53,7 @@ public class MVPrimaryIndex extends BaseIndex {
null, null, null);
ValueDataType valueType = new ValueDataType(
db.getCompareMode(), db, sortTypes);
mapName = getName() + "_" + getId();
mapName = "table." + getId();
MVMap.Builder<Value, Value> mapBuilder = new MVMap.Builder<Value, Value>().
keyType(keyType).
valueType(valueType);
......@@ -62,19 +62,6 @@ public class MVPrimaryIndex extends BaseIndex {
lastKey = k == null ? 0 : k.getLong();
}
/**
* Rename the index.
*
* @param newName the new name
*/
public void renameTable(String newName) {
TransactionMap<Value, Value> map = getMap(null);
rename(newName + "_DATA");
String newMapName = newName + "_DATA_" + getId();
map.renameMap(newMapName);
mapName = newMapName;
}
@Override
public String getCreateSQL() {
return null;
......@@ -134,7 +121,11 @@ public class MVPrimaryIndex extends BaseIndex {
e.setSource(this);
throw e;
}
map.put(key, ValueArray.get(row.getValueList()));
try {
map.put(key, ValueArray.get(row.getValueList()));
} catch (IllegalStateException e) {
throw DbException.get(ErrorCode.CONCURRENT_UPDATE_1, table.getName());
}
lastKey = Math.max(lastKey, row.getKey());
}
......@@ -149,10 +140,14 @@ public class MVPrimaryIndex extends BaseIndex {
}
}
TransactionMap<Value, Value> map = getMap(session);
Value old = map.remove(ValueLong.get(row.getKey()));
if (old == null) {
throw DbException.get(ErrorCode.ROW_NOT_FOUND_WHEN_DELETING_1,
getSQL() + ": " + row.getKey());
try {
Value old = map.remove(ValueLong.get(row.getKey()));
if (old == null) {
throw DbException.get(ErrorCode.ROW_NOT_FOUND_WHEN_DELETING_1,
getSQL() + ": " + row.getKey());
}
} catch (IllegalStateException e) {
throw DbException.get(ErrorCode.CONCURRENT_UPDATE_1, table.getName());
}
}
......@@ -201,8 +196,12 @@ public class MVPrimaryIndex extends BaseIndex {
@Override
public double getCost(Session session, int[] masks, SortOrder sortOrder) {
long cost = 10 * (dataMap.map.getSize() + Constants.COST_ROW_OFFSET);
return cost;
try {
long cost = 10 * (dataMap.map.getSize() + Constants.COST_ROW_OFFSET);
return cost;
} catch (IllegalStateException e) {
throw DbException.get(ErrorCode.OBJECT_CLOSED);
}
}
@Override
......@@ -261,7 +260,11 @@ public class MVPrimaryIndex extends BaseIndex {
@Override
public long getRowCountApproximation() {
return dataMap.map.getSize();
try {
return dataMap.map.getSize();
} catch (IllegalStateException e) {
throw DbException.get(ErrorCode.OBJECT_CLOSED);
}
}
@Override
......
......@@ -41,7 +41,7 @@ public class MVSecondaryIndex extends BaseIndex {
final MVTable mvTable;
private final int keyColumns;
private String mapName;
private final String mapName;
private TransactionMap<Value, Value> dataMap;
public MVSecondaryIndex(Database db, MVTable table, int id, String indexName,
......@@ -59,7 +59,7 @@ public class MVSecondaryIndex extends BaseIndex {
sortTypes[i] = columns[i].sortType;
}
sortTypes[keyColumns - 1] = SortOrder.ASCENDING;
mapName = getName() + "_" + getId();
mapName = "index." + getId();
ValueDataType keyType = new ValueDataType(
db.getCompareMode(), db, sortTypes);
ValueDataType valueType = new ValueDataType(null, null, null);
......@@ -74,15 +74,6 @@ public class MVSecondaryIndex extends BaseIndex {
// ok
}
@Override
public void rename(String newName) {
TransactionMap<Value, Value> map = getMap(null);
String newMapName = newName + "_" + getId();
map.renameMap(newMapName);
mapName = newMapName;
super.rename(newName);
}
@Override
public void add(Session session, Row row) {
TransactionMap<Value, Value> map = getMap(session);
......@@ -100,17 +91,25 @@ public class MVSecondaryIndex extends BaseIndex {
}
}
array.getList()[keyColumns - 1] = ValueLong.get(row.getKey());
map.put(array, ValueLong.get(0));
try {
map.put(array, ValueLong.get(0));
} catch (IllegalStateException e) {
throw DbException.get(ErrorCode.CONCURRENT_UPDATE_1, table.getName());
}
}
@Override
public void remove(Session session, Row row) {
ValueArray array = getKey(row);
TransactionMap<Value, Value> map = getMap(session);
Value old = map.remove(array);
if (old == null) {
throw DbException.get(ErrorCode.ROW_NOT_FOUND_WHEN_DELETING_1,
getSQL() + ": " + row.getKey());
try {
Value old = map.remove(array);
if (old == null) {
throw DbException.get(ErrorCode.ROW_NOT_FOUND_WHEN_DELETING_1,
getSQL() + ": " + row.getKey());
}
} catch (IllegalStateException e) {
throw DbException.get(ErrorCode.CONCURRENT_UPDATE_1, table.getName());
}
}
......@@ -164,7 +163,11 @@ public class MVSecondaryIndex extends BaseIndex {
@Override
public double getCost(Session session, int[] masks, SortOrder sortOrder) {
return 10 * getCostRangeIndex(masks, dataMap.map.getSize(), sortOrder);
try {
return 10 * getCostRangeIndex(masks, dataMap.map.getSize(), sortOrder);
} catch (IllegalStateException e) {
throw DbException.get(ErrorCode.OBJECT_CLOSED);
}
}
@Override
......@@ -208,7 +211,11 @@ public class MVSecondaryIndex extends BaseIndex {
@Override
public boolean needRebuild() {
return dataMap.map.getSize() == 0;
try {
return dataMap.map.getSize() == 0;
} catch (IllegalStateException e) {
throw DbException.get(ErrorCode.OBJECT_CLOSED);
}
}
@Override
......@@ -219,7 +226,11 @@ public class MVSecondaryIndex extends BaseIndex {
@Override
public long getRowCountApproximation() {
return dataMap.map.getSize();
try {
return dataMap.map.getSize();
} catch (IllegalStateException e) {
throw DbException.get(ErrorCode.OBJECT_CLOSED);
}
}
@Override
......
......@@ -131,12 +131,6 @@ public class MVTable extends TableBase {
}
}
@Override
public void rename(String newName) {
super.rename(newName);
primaryIndex.renameTable(newName);
}
private void doLock(Session session, int lockMode, boolean exclusive) {
traceLock(session, exclusive, "requesting for");
// don't get the current time unless necessary
......@@ -367,7 +361,7 @@ public class MVTable extends TableBase {
* @param key the primary key
* @return the row
*/
Row getRow(Session session, long key) {
public Row getRow(Session session, long key) {
return primaryIndex.getRow(session, key);
}
......@@ -393,7 +387,14 @@ public class MVTable extends TableBase {
// if (isPersistIndexes() && indexType.isPersistent()) {
int mainIndexColumn;
mainIndexColumn = getMainIndexColumn(indexType, cols);
if (!database.isStarting() && primaryIndex.getRowCount(session) != 0) {
if (database.isStarting()) {
index = new MVSecondaryIndex(session.getDatabase(),
this, indexId,
indexName, cols, indexType);
if (index.getRowCountApproximation() != 0) {
mainIndexColumn = -1;
}
} else if (primaryIndex.getRowCount(session) != 0) {
mainIndexColumn = -1;
}
if (mainIndexColumn != -1) {
......@@ -417,10 +418,10 @@ public class MVTable extends TableBase {
String n = getName() + ":" + index.getName();
int t = MathUtils.convertLongToInt(total);
while (cursor.next()) {
database.setProgress(DatabaseEventListener.STATE_CREATE_INDEX, n,
MathUtils.convertLongToInt(i++), t);
Row row = cursor.get();
buffer.add(row);
database.setProgress(DatabaseEventListener.STATE_CREATE_INDEX, n,
MathUtils.convertLongToInt(i++), t);
if (buffer.size() >= bufferSize) {
addRowsToIndex(session, buffer, index);
}
......
......@@ -149,6 +149,9 @@ public class MVTableEngine implements TableEngine {
store.setWriteDelay(value);
}
/**
* Rollback all open transactions.
*/
public void rollback() {
List<Transaction> list = transactionStore.getOpenTransactions();
for (Transaction t : list) {
......
......@@ -8,11 +8,9 @@ package org.h2.mvstore.db;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.h2.mvstore.Cursor;
import org.h2.mvstore.DataUtils;
......@@ -60,7 +58,7 @@ public class TransactionStore {
* transaction id.
*/
private final MVMap<String, String> settings;
private final DataType dataType;
private long lastTransactionIdStored;
......@@ -237,7 +235,9 @@ public class TransactionStore {
VersionedValue value = map.get(key);
// possibly the entry was added later on
// so we have to check
if (value.value == null) {
if (value == null) {
// nothing to do
} else if (value.value == null) {
// remove the value
map.remove(key);
}
......@@ -246,7 +246,7 @@ public class TransactionStore {
}
endTransaction(t);
}
private MVMap<Object, VersionedValue> openMap(int mapId) {
// TODO open map by id if possible
Map<String, String> meta = store.getMetaMap();
......@@ -257,7 +257,7 @@ public class TransactionStore {
}
String mapName = DataUtils.parseMap(m).get("name");
VersionedValueType vt = new VersionedValueType(dataType);
MVMap.Builder<Object, VersionedValue> mapBuilder =
MVMap.Builder<Object, VersionedValue> mapBuilder =
new MVMap.Builder<Object, VersionedValue>().
keyType(dataType).valueType(vt);
MVMap<Object, VersionedValue> map = store.openMap(mapName, mapBuilder);
......@@ -280,6 +280,10 @@ public class TransactionStore {
return false;
}
long[] key = undoLog.firstKey();
if (key == null) {
// unusual, but can happen
return false;
}
firstOpenTransaction = key[0];
}
if (firstOpenTransaction == transactionId) {
......@@ -338,30 +342,89 @@ public class TransactionStore {
}
/**
* Get the set of changed maps.
* Get the changes of the given transaction, starting from the latest log id
* back to the given log id.
*
* @param t the transaction
* @param maxLogId the maximum log id
* @param toLogId the minimum log id
* @return the set of changed maps
* @return the changes
*/
HashSet<String> getChangedMaps(Transaction t, long maxLogId, long toLogId) {
HashSet<String> set = New.hashSet();
for (long logId = maxLogId - 1; logId >= toLogId; logId--) {
Object[] op = undoLog.get(new long[] {
t.getId(), logId });
int mapId = ((Integer) op[1]).intValue();
// TODO open map by id if possible
Map<String, String> meta = store.getMetaMap();
String m = meta.get("map." + mapId);
if (m == null) {
// map was removed later on
} else {
String mapName = DataUtils.parseMap(m).get("name");
set.add(mapName);
Iterator<Change> getChanges(final Transaction t, final long maxLogId, final long toLogId) {
return new Iterator<Change>() {
private long logId = maxLogId - 1;
private Change current;
{
fetchNext();
}
}
return set;
private void fetchNext() {
while (logId >= toLogId) {
Object[] op = undoLog.get(new long[] {
t.getId(), logId });
int mapId = ((Integer) op[1]).intValue();
// TODO open map by id if possible
Map<String, String> meta = store.getMetaMap();
String m = meta.get("map." + mapId);
logId--;
if (m == null) {
// map was removed later on
} else {
current = new Change();
current.mapName = DataUtils.parseMap(m).get("name");
current.key = op[2];
VersionedValue oldValue = (VersionedValue) op[3];
current.value = oldValue == null ? null : oldValue.value;
return;
}
}
current = null;
}
@Override
public boolean hasNext() {
return current != null;
}
@Override
public Change next() {
if (current == null) {
throw DataUtils.newUnsupportedOperationException("no data");
}
Change result = current;
fetchNext();
return result;
}
@Override
public void remove() {
throw DataUtils.newUnsupportedOperationException("remove");
}
};
}
/**
* A change in a map.
*/
public static class Change {
/**
* The name of the map where the change occurred.
*/
public String mapName;
/**
* The key.
*/
public Object key;
/**
* The value.
*/
public Object value;
}
/**
......@@ -536,15 +599,16 @@ public class TransactionStore {
}
/**
* Get the set of changed maps starting at the given savepoint up to
* now.
* Get the list of changes, starting with the latest change, up to the
* given savepoint (in reverse order than they occurred). The value of
* the change is the value before the change was applied.
*
* @param savepointId the savepoint id, 0 meaning the beginning of the
* transaction
* @return the set of changed maps
* @return the changes
*/
public Set<String> getChangedMaps(long savepointId) {
return store.getChangedMaps(this, logId, savepointId);
public Iterator<Change> getChanges(long savepointId) {
return store.getChanges(this, logId, savepointId);
}
/**
......@@ -945,8 +1009,8 @@ public class TransactionStore {
* @return the first key, or null if empty
*/
public K firstKey() {
// TODO transactional firstKey
return map.firstKey();
Iterator<K> it = keyIterator(null);
return it.hasNext() ? it.next() : null;
}
/**
......@@ -955,10 +1019,18 @@ public class TransactionStore {
* @return the last key, or null if empty
*/
public K lastKey() {
// TODO transactional lastKey
return map.lastKey();
K k = map.lastKey();
while (true) {
if (k == null) {
return null;
}
if (get(k) != null) {
return k;
}
k = map.lowerKey(k);
}
}
/**
* Get the most recent smallest key that is larger or equal to this key.
*
......@@ -975,7 +1047,7 @@ public class TransactionStore {
}
return null;
}
/**
* Get the smallest key that is larger or equal to this key.
*
......@@ -983,7 +1055,6 @@ public class TransactionStore {
* @return the result
*/
public K ceilingKey(K key) {
int test;
// TODO this method is slow
Cursor<K> cursor = map.keyIterator(key);
while (cursor.hasNext()) {
......@@ -993,8 +1064,6 @@ public class TransactionStore {
}
}
return null;
// TODO transactional ceilingKey
// return map.ceilingKey(key);
}
/**
......
......@@ -83,7 +83,7 @@ public class ValueDataType implements DataType {
final CompareMode compareMode;
final int[] sortTypes;
ValueDataType(CompareMode compareMode, DataHandler handler, int[] sortTypes) {
public ValueDataType(CompareMode compareMode, DataHandler handler, int[] sortTypes) {
this.compareMode = compareMode;
this.handler = handler;
this.sortTypes = sortTypes;
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论