提交 9565beff authored 作者: Thomas Mueller's avatar Thomas Mueller

MVStore mode: creating indexes is now much faster.

上级 9d0f56d3
......@@ -688,6 +688,7 @@ public class Database implements DataHandler {
}
if (mvStore != null) {
mvStore.initTransactions();
mvStore.removeTemporaryMaps();
}
recompileInvalidViews(systemSession);
starting = false;
......
......@@ -815,7 +815,7 @@ public class MVMap<K, V> extends AbstractMap<K, V>
return store.getMapName(id);
}
MVStore getStore() {
public MVStore getStore() {
return store;
}
......
......@@ -46,17 +46,16 @@ Documentation
- better document how to do non-unique indexes
- document pluggable store and OffHeapStore
TestMVStoreDataLoss
MVTableEngine:
- verify tests don't use the PageStore
- verify that tests don't use the PageStore
- test and possibly allow MVCC & MULTI_THREADED
- maybe enable MVCC by default (but allow to disable it)
- use StreamStore to avoid deadlocks
- config options for compression and page size (maybe combined)
- test with MVStore.ASSERT enabled
TransactionStore:
- ability to disable the transaction log,
if there is only one connection
MVStore:
- automated 'kill process' and 'power failure' test
......@@ -67,7 +66,7 @@ MVStore:
- compact: avoid processing pages using a counting bloom filter
- defragment (re-creating maps, specially those with small pages)
- chunk header: store changed chunk data as row; maybe after the root
- chunk checksum (header, last page, 2 bytes per page?)
- two chunk checksums (header+last page; the second one 2 bytes per page)
- maybe let a chunk point to a list of potential next chunks
(so no fixed location header is needed), similar to a skip list
- store number of write operations per page (maybe defragment
......@@ -78,7 +77,6 @@ MVStore:
- MVStoreTool.dump: dump values (using a callback)
- ensure data is overwritten eventually if the system doesn't have a
real-time clock (Raspberry Pi) and if there are few writes per startup
- SSD-friendly write (always in blocks of 4 MB / 1 second?)
- close the file on out of memory or disk write error (out of disk space or so)
- implement a sharded map (in one store, multiple stores)
to support concurrent updates and writes, and very large maps
......@@ -88,9 +86,6 @@ MVStore:
- maybe rename 'rollback' to 'revert' to distinguish from transactions
- support other compression algorithms (deflate, LZ4,...)
- support opening (existing) maps by id
- more consistent null handling (keys/values sometimes may be null)
- autocommit (to avoid having to call commit,
as it could be called too often or it is easily forgotten)
- remove features that are not really needed; simplify the code
possibly using a separate layer or tools
(retainVersion?)
......@@ -117,22 +112,15 @@ MVStore:
- support log structured merge style operations (blind writes)
using one map per level plus bloom filter
- have a strict call order MVStore -> MVMap -> Page -> FileStore
- autocommit mode (default) and manual mode
- manual mode: combine commit and store;
rollback only to chunk
- rename writeDelay to commitDelay, default 1 s
- rollback() to rollback to the latest commit; throws exception
in autocommit mode
- fix documentation (including examples)
- autocommit commits, stores, and compacts from time to time;
the background thread should wait at least 90% of the
configured write delay to store changes
- currently, uncommitted changes are stored if there are many transient changes,
and rolled back when opening - is this really needed?
- compact* should also store uncommitted changes (if there are any)
- write a LSM-tree (log structured merge tree) utility on top of the MVStore
- improve memory calculation for transient and cache
specially for large pages (StreamStore)
specially for large pages (when using the StreamStore)
- StreamStore: split blocks similar to rsync crypto, where the split is made
"if the sum of the past 8196 bytes divides by 4096 with zero remainder"
*/
......
......@@ -6,6 +6,8 @@
*/
package org.h2.mvstore.db;
import java.util.List;
import org.h2.engine.Session;
import org.h2.index.BaseIndex;
import org.h2.index.Cursor;
......@@ -22,7 +24,7 @@ import org.h2.value.ValueLong;
/**
* An index that delegates indexing to another index.
*/
public class MVDelegateIndex extends BaseIndex {
public class MVDelegateIndex extends BaseIndex implements MVIndex {
private final MVPrimaryIndex mainIndex;
......@@ -37,6 +39,16 @@ public class MVDelegateIndex extends BaseIndex {
}
}
@Override
public void addRowsToBuffer(List<Row> rows, String bufferName) {
throw DbException.throwInternalError();
}
@Override
public void addBufferedRows(List<String> bufferNames) {
throw DbException.throwInternalError();
}
@Override
public void add(Session session, Row row) {
// nothing to do
......
/*
* Copyright 2004-2013 H2 Group. Multiple-Licensed under the H2 License,
* Version 1.0, and under the Eclipse Public License, Version 1.0
* (http://h2database.com/html/license.html).
* Initial Developer: H2 Group
*/
package org.h2.mvstore.db;
import java.util.List;
import org.h2.index.Index;
import org.h2.result.Row;
/**
* An index that stores the data in an MVStore.
*/
public interface MVIndex extends Index {
/**
* Add the rows to a temporary storage (not to the index yet). The rows are
* sorted by the index columns. This is to more quickly build the index.
*
* @param rows the rows
* @param bufferName the name of the temporary storage
*/
void addRowsToBuffer(List<Row> rows, String bufferName);
/**
* Add all the index data from the buffers to the index. The index will
* typically use merge sort to add the data more quickly in sorted order.
*
* @param bufferNames the names of the temporary storage
*/
void addBufferedRows(List<String> bufferNames);
}
......@@ -7,8 +7,11 @@
package org.h2.mvstore.db;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.TreeSet;
import org.h2.constant.ErrorCode;
import org.h2.engine.Database;
......@@ -17,6 +20,7 @@ import org.h2.index.BaseIndex;
import org.h2.index.Cursor;
import org.h2.index.IndexType;
import org.h2.message.DbException;
import org.h2.mvstore.MVMap;
import org.h2.mvstore.db.TransactionStore.Transaction;
import org.h2.mvstore.db.TransactionStore.TransactionMap;
import org.h2.result.Row;
......@@ -26,6 +30,7 @@ import org.h2.table.Column;
import org.h2.table.IndexColumn;
import org.h2.table.TableFilter;
import org.h2.util.New;
import org.h2.value.CompareMode;
import org.h2.value.Value;
import org.h2.value.ValueArray;
import org.h2.value.ValueLong;
......@@ -34,7 +39,7 @@ import org.h2.value.ValueNull;
/**
* A table stored in a MVStore.
*/
public class MVSecondaryIndex extends BaseIndex {
public class MVSecondaryIndex extends BaseIndex implements MVIndex {
/**
* The multi-value table.
......@@ -55,20 +60,128 @@ public class MVSecondaryIndex extends BaseIndex {
// always store the row key in the map key,
// even for unique indexes, as some of the index columns could be null
keyColumns = columns.length + 1;
mapName = "index." + getId();
int[] sortTypes = new int[keyColumns];
for (int i = 0; i < columns.length; i++) {
sortTypes[i] = columns[i].sortType;
}
sortTypes[keyColumns - 1] = SortOrder.ASCENDING;
mapName = "index." + getId();
ValueDataType keyType = new ValueDataType(
db.getCompareMode(), db, sortTypes);
ValueDataType valueType = new ValueDataType(null, null, null);
dataMap = mvTable.getTransaction(null).openMap(
mapName, keyType, valueType);
if (keyType != dataMap.getKeyType()) {
if (!keyType.equals(dataMap.getKeyType())) {
throw DbException.throwInternalError("Incompatible key type");
}
}
@Override
public void addRowsToBuffer(List<Row> rows, String bufferName) {
MVMap<Value, Value> map = openMap(bufferName);
for (Row row : rows) {
ValueArray key = getKey(row);
map.put(key, ValueNull.INSTANCE);
}
}
@Override
public void addBufferedRows(List<String> bufferNames) {
ArrayList<String> mapNames = New.arrayList(bufferNames);
final CompareMode compareMode = database.getCompareMode();
/**
* A source of values.
*/
class Source implements Comparable<Source> {
Value value;
Iterator<Value> next;
int sourceId;
@Override
public int compareTo(Source o) {
int comp = value.compareTo(o.value, compareMode);
if (comp == 0) {
comp = sourceId - o.sourceId;
}
return comp;
}
}
TreeSet<Source> sources = new TreeSet<Source>();
for (int i = 0; i < bufferNames.size(); i++) {
MVMap<Value, Value> map = openMap(bufferNames.get(i));
Iterator<Value> it = map.keyIterator(null);
if (it.hasNext()) {
Source s = new Source();
s.value = it.next();
s.next = it;
s.sourceId = i;
sources.add(s);
}
}
try {
while (true) {
Source s = sources.first();
Value v = s.value;
if (indexType.isUnique()) {
ValueArray unique = (ValueArray) v;
Value[] array = unique.getList();
array = Arrays.copyOf(array, array.length);
unique = ValueArray.get(unique.getList());
unique.getList()[keyColumns - 1] = ValueLong.get(Long.MIN_VALUE);
ValueArray key = (ValueArray) dataMap.getLatestCeilingKey(unique);
if (key != null) {
SearchRow r2 = getRow(key.getList());
SearchRow row = getRow(((ValueArray) v).getList());
if (compareRows(row, r2) == 0) {
if (!containsNullAndAllowMultipleNull(r2)) {
throw getDuplicateKeyException(key.toString());
}
}
}
}
dataMap.putCommitted(v, ValueNull.INSTANCE);
Iterator<Value> it = s.next;
if (!it.hasNext()) {
sources.remove(s);
if (sources.size() == 0) {
break;
}
} else {
Value nextValue = it.next();
sources.remove(s);
if (sources.size() == 0) {
break;
}
s.value = nextValue;
sources.add(s);
}
}
} finally {
for (String tempMapName : mapNames) {
MVMap<Value, Value> map = openMap(tempMapName);
map.getStore().removeMap(map);
}
}
}
private MVMap<Value, Value> openMap(String mapName) {
int[] sortTypes = new int[keyColumns];
for (int i = 0; i < indexColumns.length; i++) {
sortTypes[i] = indexColumns[i].sortType;
}
sortTypes[keyColumns - 1] = SortOrder.ASCENDING;
ValueDataType keyType = new ValueDataType(
database.getCompareMode(), database, sortTypes);
ValueDataType valueType = new ValueDataType(null, null, null);
MVMap.Builder<Value, Value> builder =
new MVMap.Builder<Value, Value>().keyType(keyType).valueType(valueType);
MVMap<Value, Value> map = database.getMvStore().getStore().openMap(mapName, builder);
if (!keyType.equals(map.getKeyType())) {
throw DbException.throwInternalError("Incompatible key type");
}
return map;
}
@Override
......@@ -101,8 +214,6 @@ public class MVSecondaryIndex extends BaseIndex {
throw DbException.get(ErrorCode.CONCURRENT_UPDATE_1, table.getName());
}
if (indexType.isUnique()) {
// check if there is another (uncommitted) entry
// TODO use entry iterator
Iterator<Value> it = map.keyIterator(unique, true);
while (it.hasNext()) {
ValueArray k = (ValueArray) it.next();
......
......@@ -6,6 +6,7 @@
package org.h2.mvstore.db;
import java.util.Iterator;
import java.util.List;
import org.h2.constant.ErrorCode;
import org.h2.engine.Constants;
......@@ -44,7 +45,7 @@ import com.vividsolutions.jts.geom.Geometry;
* @author Noel Grandin
* @author Nicolas Fortin, Atelier SIG, IRSTV FR CNRS 24888
*/
public class MVSpatialIndex extends BaseIndex implements SpatialIndex {
public class MVSpatialIndex extends BaseIndex implements SpatialIndex, MVIndex {
/**
* The multi-value table.
......@@ -100,6 +101,16 @@ public class MVSpatialIndex extends BaseIndex implements SpatialIndex {
dataMap = mvTable.getTransaction(null).openMap(spatialMap);
}
@Override
public void addRowsToBuffer(List<Row> rows, String bufferName) {
throw DbException.throwInternalError();
}
@Override
public void addBufferedRows(List<String> bufferNames) {
throw DbException.throwInternalError();
}
@Override
public void close(Session session) {
// ok
......
......@@ -28,6 +28,7 @@ import org.h2.index.IndexType;
import org.h2.index.MultiVersionIndex;
import org.h2.message.DbException;
import org.h2.message.Trace;
import org.h2.mvstore.db.MVTableEngine.Store;
import org.h2.mvstore.db.TransactionStore.Transaction;
import org.h2.result.Row;
import org.h2.result.SortOrder;
......@@ -333,7 +334,6 @@ public class MVTable extends TableBase {
@Override
public boolean canTruncate() {
// TODO copy & pasted source code from RegularTable
if (getCheckForeignKeyConstraints() && database.getReferentialIntegrity()) {
ArrayList<Constraint> constraints = getConstraints();
if (constraints != null) {
......@@ -379,7 +379,7 @@ public class MVTable extends TableBase {
if (!isSessionTemporary) {
database.lockMeta(session);
}
Index index;
MVIndex index;
// TODO support in-memory indexes
// if (isPersistIndexes() && indexType.isPersistent()) {
int mainIndexColumn;
......@@ -405,64 +405,122 @@ public class MVTable extends TableBase {
indexName, cols, indexType);
}
if (index.needRebuild()) {
rebuildIndex(session, index, indexName);
}
index.setTemporary(isTemporary());
if (index.getCreateSQL() != null) {
index.setComment(indexComment);
if (isSessionTemporary) {
session.addLocalTempTableIndex(index);
} else {
database.addSchemaObject(session, index);
}
}
indexes.add(index);
setModified();
return index;
}
private void rebuildIndex(Session session, MVIndex index, String indexName) {
try {
// TODO Speed up creating an index, for example as follows: Read
// up to about 1 MB of entries in memory, sort them (detect
// duplicates here), write to a new map (in sorted order);
// repeat (using a new map for every block of 1 MB) until all
// record are read. Merge all maps to the target (using merge
// sort; duplicates are detected in the target). This is similar
// to a LSM tree. For randomly ordered data, this should use
// much less write operations than the current algorithm.
if (session.getDatabase().getMvStore() == null) {
// in-memory
rebuildIndexBuffered(session, index);
} else {
rebuildIndexBlockMerge(session, index);
}
} catch (DbException e) {
getSchema().freeUniqueName(indexName);
try {
index.remove(session);
} catch (DbException e2) {
// this could happen, for example on failure in the storage
// but if that is not the case it means
// there is something wrong with the database
trace.error(e2, "could not remove index");
throw e2;
}
throw e;
}
}
private void rebuildIndexBlockMerge(Session session, MVIndex index) {
if (index instanceof MVSpatialIndex) {
// the spatial index doesn't support multi-way merge sort
rebuildIndexBuffered(session, index);
}
// Read entries in memory, sort them, write to a new map (in sorted
// order); repeat (using a new map for every block of 1 MB) until all
// record are read. Merge all maps to the target (using merge sort;
// duplicates are detected in the target). For randomly ordered data,
// this should use relatively few write operations.
// A possible optimization is: change the buffer size from "row count"
// to "amount of memory", and buffer index keys instead of rows.
Index scan = getScanIndex(session);
long remaining = scan.getRowCount(session);
long total = remaining;
Cursor cursor = scan.find(session, null, null);
long i = 0;
int bufferSize = (int) Math.min(total, Constants.DEFAULT_MAX_MEMORY_ROWS);
Store store = session.getDatabase().getMvStore();
int bufferSize = Constants.DEFAULT_MAX_MEMORY_ROWS / 2;
ArrayList<Row> buffer = New.arrayList(bufferSize);
String n = getName() + ":" + index.getName();
int t = MathUtils.convertLongToInt(total);
ArrayList<String> bufferNames = New.arrayList();
while (cursor.next()) {
Row row = cursor.get();
buffer.add(row);
database.setProgress(DatabaseEventListener.STATE_CREATE_INDEX, n,
MathUtils.convertLongToInt(i++), t);
if (buffer.size() >= bufferSize) {
addRowsToIndex(session, buffer, index);
sortRows(buffer, index);
String mapName = store.nextTemporaryMapName();
index.addRowsToBuffer(buffer, mapName);
bufferNames.add(mapName);
buffer.clear();
}
remaining--;
}
sortRows(buffer, index);
if (bufferNames.size() > 0) {
String mapName = store.nextTemporaryMapName();
index.addRowsToBuffer(buffer, mapName);
bufferNames.add(mapName);
buffer.clear();
index.addBufferedRows(bufferNames);
} else {
addRowsToIndex(session, buffer, index);
}
if (SysProperties.CHECK && remaining != 0) {
DbException.throwInternalError("rowcount remaining=" + remaining + " " + getName());
}
} catch (DbException e) {
getSchema().freeUniqueName(indexName);
try {
index.remove(session);
} catch (DbException e2) {
// this could happen, for example on failure in the storage
// but if that is not the case it means
// there is something wrong with the database
trace.error(e2, "could not remove index");
throw e2;
}
throw e;
}
private void rebuildIndexBuffered(Session session, Index index) {
Index scan = getScanIndex(session);
long remaining = scan.getRowCount(session);
long total = remaining;
Cursor cursor = scan.find(session, null, null);
long i = 0;
int bufferSize = (int) Math.min(total, Constants.DEFAULT_MAX_MEMORY_ROWS);
ArrayList<Row> buffer = New.arrayList(bufferSize);
String n = getName() + ":" + index.getName();
int t = MathUtils.convertLongToInt(total);
while (cursor.next()) {
Row row = cursor.get();
buffer.add(row);
database.setProgress(DatabaseEventListener.STATE_CREATE_INDEX, n,
MathUtils.convertLongToInt(i++), t);
if (buffer.size() >= bufferSize) {
addRowsToIndex(session, buffer, index);
}
index.setTemporary(isTemporary());
if (index.getCreateSQL() != null) {
index.setComment(indexComment);
if (isSessionTemporary) {
session.addLocalTempTableIndex(index);
} else {
database.addSchemaObject(session, index);
remaining--;
}
addRowsToIndex(session, buffer, index);
if (SysProperties.CHECK && remaining != 0) {
DbException.throwInternalError("rowcount remaining=" + remaining + " " + getName());
}
indexes.add(index);
setModified();
return index;
}
private int getMainIndexColumn(IndexType indexType, IndexColumn[] cols) {
......@@ -489,17 +547,20 @@ public class MVTable extends TableBase {
}
private static void addRowsToIndex(Session session, ArrayList<Row> list, Index index) {
final Index idx = index;
sortRows(list, index);
for (Row row : list) {
index.add(session, row);
}
list.clear();
}
private static void sortRows(ArrayList<Row> list, final Index index) {
Collections.sort(list, new Comparator<Row>() {
@Override
public int compare(Row r1, Row r2) {
return idx.compareRows(r1, r2);
return index.compareRows(r1, r2);
}
});
for (Row row : list) {
index.add(session, row);
}
list.clear();
}
@Override
......
......@@ -24,6 +24,7 @@ import org.h2.engine.Session;
import org.h2.message.DbException;
import org.h2.mvstore.DataUtils;
import org.h2.mvstore.FileStore;
import org.h2.mvstore.MVMap;
import org.h2.mvstore.MVStore;
import org.h2.mvstore.db.TransactionStore.Transaction;
import org.h2.store.InDoubtTransaction;
......@@ -143,6 +144,8 @@ public class MVTableEngine implements TableEngine {
private long statisticsStart;
private int temporaryMapId;
public Store(Database db, MVStore store) {
this.db = db;
this.store = store;
......@@ -209,6 +212,27 @@ public class MVTableEngine implements TableEngine {
}
}
/**
* Remove all temporary maps.
*/
public void removeTemporaryMaps() {
for (String mapName : store.getMapNames()) {
if (mapName.startsWith("temp.")) {
MVMap<?, ?> map = store.openMap(mapName);
store.removeMap(map);
}
}
}
/**
* Get the name of the next available temporary map.
*
* @return the map name
*/
public synchronized String nextTemporaryMapName() {
return "temp." + temporaryMapId++;
}
/**
* Prepare a transaction.
*
......
......@@ -967,6 +967,22 @@ public class TransactionStore {
return set(key, value);
}
/**
* Update the value for the given key, without adding an undo log entry.
*
* @param key the key
* @param value the value
* @return the old value
*/
@SuppressWarnings("unchecked")
public V putCommitted(K key, V value) {
DataUtils.checkArgument(value != null, "The value may not be null");
VersionedValue newValue = new VersionedValue();
newValue.value = value;
VersionedValue oldValue = map.put(key, newValue);
return (V) (oldValue == null ? null : oldValue.value);
}
private V set(K key, V value) {
transaction.checkNotClosed();
V old = get(key);
......
......@@ -12,6 +12,7 @@ import java.nio.ByteBuffer;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.util.Arrays;
import org.h2.constant.ErrorCode;
import org.h2.message.DbException;
......@@ -571,4 +572,23 @@ public class ValueDataType implements DataType {
return DataUtils.readString(buff, len);
}
@Override
public int hashCode() {
return compareMode.hashCode() ^ Arrays.hashCode(sortTypes);
}
@Override
public boolean equals(Object obj) {
if (obj == this) {
return true;
} else if (!(obj instanceof ValueDataType)) {
return false;
}
ValueDataType v = (ValueDataType) obj;
if (!compareMode.equals(v.compareMode)) {
return false;
}
return Arrays.equals(sortTypes, v.sortTypes);
}
}
......@@ -42,7 +42,49 @@ public class TestBenchmark extends TestBase {
}
private void test(boolean mvStore) throws Exception {
testBinary(mvStore);
testCreateIndex(mvStore);
}
private void testCreateIndex(boolean mvStore) throws Exception {
FileUtils.deleteRecursive(getBaseDir(), true);
Connection conn;
Statement stat;
String url = "mvstore";
if (mvStore) {
url += ";MV_STORE=TRUE;MV_STORE=TRUE";
}
url = getURL(url, true);
conn = getConnection(url);
stat = conn.createStatement();
stat.execute("create table test(id bigint primary key, data bigint)");
conn.setAutoCommit(false);
PreparedStatement prep = conn
.prepareStatement("insert into test values(?, ?)");
// int rowCount = 10000000;
int rowCount = 1000000;
Random r = new Random(1);
for (int i = 0; i < rowCount; i++) {
prep.setInt(1, i);
prep.setInt(2, i);
// prep.setInt(2, r.nextInt());
prep.execute();
if (i % 10000 == 0) {
conn.commit();
}
}
long start = System.currentTimeMillis();
// Profiler prof = new Profiler().startCollecting();
stat.execute("create index on test(data)");
// System.out.println(prof.getTop(5));
System.out.println((System.currentTimeMillis() - start) + " "
+ (mvStore ? "mvstore" : "default"));
conn.close();
}
private void testBinary(boolean mvStore) throws Exception {
......@@ -89,7 +131,7 @@ public class TestBenchmark extends TestBase {
conn.close();
}
void randomize(byte[] data, int i) {
private void randomize(byte[] data, int i) {
Random r = new Random(i);
r.nextBytes(data);
}
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论