提交 5f5eb72d authored 作者: Thomas Mueller's avatar Thomas Mueller

MVStore: simpler API

上级 7cb0c93e
......@@ -847,7 +847,7 @@ public class MVMap<K, V> extends AbstractMap<K, V>
* Forget those old versions that are no longer needed.
*/
void removeUnusedOldVersions() {
long oldest = store.getRetainOrStoreVersion();
long oldest = store.getOldestVersionToKeep();
if (oldest == -1) {
return;
}
......
......@@ -96,7 +96,7 @@ public class MVSecondaryIndex extends BaseIndex {
}
}
try {
map.put(array, ValueLong.get(0));
map.put(array, ValueNull.INSTANCE);
} catch (IllegalStateException e) {
throw DbException.get(ErrorCode.CONCURRENT_UPDATE_1, table.getName());
}
......
......@@ -54,10 +54,11 @@ public class MVSpatialIndex extends BaseIndex implements SpatialIndex {
private final String mapName;
private TransactionMap<SpatialKey, Value> dataMap;
private MVRTreeMap<VersionedValue> spatialMap;
/**
* Constructor.
*
* @param db the database
* @param table the table instance
* @param id the index id
* @param indexName the index name
......@@ -66,7 +67,7 @@ public class MVSpatialIndex extends BaseIndex implements SpatialIndex {
*/
public MVSpatialIndex(
Database db, MVTable table, int id, String indexName,
IndexColumn[] columns, IndexType indexType) {
IndexColumn[] columns, IndexType indexType) {
if (columns.length != 1) {
throw DbException.getUnsupportedException("Can only index one column");
}
......@@ -92,7 +93,7 @@ public class MVSpatialIndex extends BaseIndex implements SpatialIndex {
mapName = "index." + getId();
ValueDataType vt = new ValueDataType(null, null, null);
VersionedValueType valueType = new VersionedValueType(vt);
MVRTreeMap.Builder<VersionedValue> mapBuilder =
MVRTreeMap.Builder<VersionedValue> mapBuilder =
new MVRTreeMap.Builder<VersionedValue>().
valueType(valueType);
spatialMap = db.getMvStore().getStore().openMap(mapName, mapBuilder);
......@@ -144,7 +145,7 @@ public class MVSpatialIndex extends BaseIndex implements SpatialIndex {
}
}
}
private SpatialKey getKey(SearchRow r) {
if (r == null) {
return null;
......@@ -171,7 +172,7 @@ public class MVSpatialIndex extends BaseIndex implements SpatialIndex {
throw DbException.get(ErrorCode.CONCURRENT_UPDATE_1, table.getName());
}
}
@Override
public Cursor find(TableFilter filter, SearchRow first, SearchRow last) {
return find(filter.getSession());
......@@ -200,7 +201,7 @@ public class MVSpatialIndex extends BaseIndex implements SpatialIndex {
Iterator<SpatialKey> it = map.wrapIterator(cursor, false);
return new MVStoreCursor(session, it);
}
private SpatialKey getEnvelope(SearchRow row) {
Value v = row.getValue(columnIds[0]);
Geometry g = ((ValueGeometry) v.convertTo(Value.GEOMETRY)).getGeometry();
......@@ -232,7 +233,7 @@ public class MVSpatialIndex extends BaseIndex implements SpatialIndex {
public double getCost(Session session, int[] masks, TableFilter filter, SortOrder sortOrder) {
return getCostRangeIndex(masks, table.getRowCountApproximation(), filter, sortOrder);
}
@Override
protected long getCostRangeIndex(int[] masks, long rowCount, TableFilter filter, SortOrder sortOrder) {
rowCount += Constants.COST_ROW_OFFSET;
......
......@@ -399,7 +399,7 @@ public class MVTable extends TableBase {
indexName, primaryIndex, indexType);
} else if (indexType.isSpatial()) {
index = new MVSpatialIndex(session.getDatabase(),
this, indexId,
this, indexId,
indexName, cols, indexType);
} else {
index = new MVSecondaryIndex(session.getDatabase(),
......
......@@ -168,13 +168,13 @@ public class MVTableEngine implements TableEngine {
/**
* Store all pending changes.
*/
public void store() {
public void flush() {
FileStore s = store.getFileStore();
if (s == null || s.isReadOnly()) {
return;
}
if (!store.compact(50)) {
store.store();
store.commit();
}
}
......@@ -213,9 +213,7 @@ public class MVTableEngine implements TableEngine {
Transaction t = session.getTransaction();
t.setName(transactionName);
t.prepare();
if (store.getFileStore() != null) {
store.store();
}
store.commit();
}
public ArrayList<InDoubtTransaction> getInDoubtTransactions() {
......@@ -245,7 +243,7 @@ public class MVTableEngine implements TableEngine {
* Force the changes to disk.
*/
public void sync() {
store();
flush();
store.sync();
}
......@@ -316,9 +314,7 @@ public class MVTableEngine implements TableEngine {
} else {
transaction.rollback();
}
if (store.getFileStore() != null) {
store.store();
}
store.commit();
this.state = state;
}
......
......@@ -170,9 +170,7 @@ public class TransactionStore {
public synchronized void close() {
// to avoid losing transaction ids
settings.put(LAST_TRANSACTION_ID, "" + lastTransactionId);
if (store.getFileStore() != null) {
store.store();
}
store.commit();
}
/**
......@@ -192,9 +190,7 @@ public class TransactionStore {
private void commitIfNeeded() {
if (store.getUnsavedPageCount() > MAX_UNSAVED_PAGES) {
if (store.getFileStore() != null) {
store.store();
}
store.commit();
}
}
......@@ -354,10 +350,8 @@ public class TransactionStore {
if (t.getId() == firstOpenTransaction) {
firstOpenTransaction = -1;
}
if (store.getWriteDelay() == 0) {
if (store.getFileStore() != null) {
store.store();
}
if (store.getAutoCommitDelay() == 0) {
store.commit();
return;
}
// to avoid having to store the transaction log,
......@@ -365,10 +359,10 @@ public class TransactionStore {
// and if there have been many changes, store them now
if (undoLog.isEmpty()) {
int unsaved = store.getUnsavedPageCount();
int max = store.getUnsavedPageCountMax();
int max = store.getAutoCommitPageCount();
// save at 3/4 capacity
if (unsaved * 4 > max * 3) {
store.store();
store.commit();
}
}
}
......@@ -658,7 +652,15 @@ public class TransactionStore {
int mapId = map.getId();
return new TransactionMap<K, V>(this, map, mapId);
}
/**
* Open the transactional version of the given map.
*
* @param <K> the key type
* @param <V> the value type
* @param map the base map
* @return the transactional map
*/
public <K, V> TransactionMap<K, V> openMap(MVMap<K, VersionedValue> map) {
checkNotClosed();
int mapId = map.getId();
......@@ -726,9 +728,11 @@ public class TransactionStore {
DataUtils.ERROR_CLOSED, "Transaction is closed");
}
}
/**
* Remove the map.
*
* @param map the map
*/
public <K, V> void removeMap(TransactionMap<K, V> map) {
store.store.removeMap(map.map);
......@@ -1258,7 +1262,7 @@ public class TransactionStore {
}
};
}
public Transaction getTransaction() {
return transaction;
}
......
......@@ -56,33 +56,32 @@ public class TestConcurrent extends TestMVStore {
testConcurrentWrite();
testConcurrentRead();
}
private void testConcurrentFree() throws InterruptedException {
String fileName = "memFS:testConcurrentFree.h3";
for (int test = 0; test < 10; test++) {
FileUtils.delete(fileName);
final MVStore s1 = new MVStore.Builder().
fileName(fileName).writeDelay(-1).open();
fileName(fileName).autoCommitDisabled().open();
s1.setRetentionTime(0);
final int count = 200;
for (int i = 0; i < count; i++) {
MVMap<Integer, Integer> m = s1.openMap("d" + i);
m.put(1, 1);
if (i % 2 == 0) {
s1.store();
s1.commit();
}
}
s1.store();
s1.close();
final MVStore s = new MVStore.Builder().
fileName(fileName).writeDelay(-1).open();
fileName(fileName).autoCommitDisabled().open();
s.setRetentionTime(0);
final ArrayList<MVMap<Integer, Integer>> list = New.arrayList();
for (int i = 0; i < count; i++) {
MVMap<Integer, Integer> m = s.openMap("d" + i);
list.add(m);
}
final AtomicInteger counter = new AtomicInteger();
Task task = new Task() {
@Override
......@@ -113,8 +112,8 @@ public class TestConcurrent extends TestMVStore {
}
}
task.get();
s.store();
s.commit();
MVMap<String, String> meta = s.getMetaMap();
int chunkCount = 0;
for (String k : meta.keyList()) {
......@@ -142,7 +141,7 @@ public class TestConcurrent extends TestMVStore {
public void call() throws Exception {
while (!stop) {
counter.incrementAndGet();
s.store();
s.commit();
}
}
};
......@@ -172,7 +171,7 @@ public class TestConcurrent extends TestMVStore {
public void call() throws Exception {
while (!stop) {
s.setStoreVersion(counter.incrementAndGet());
s.store();
s.commit();
}
}
};
......@@ -270,9 +269,9 @@ public class TestConcurrent extends TestMVStore {
for (int i = 0; i < 10; i++) {
map.put(i, new byte[100 * r.nextInt(100)]);
}
s.store();
s.commit();
map.clear();
s.store();
s.commit();
long len = s.getFileStore().size();
if (len > 1024 * 1024) {
// slow down writing a lot
......@@ -322,6 +321,7 @@ public class TestConcurrent extends TestMVStore {
private void testConcurrentIterate() {
MVStore s = new MVStore.Builder().pageSplitSize(3).open();
s.setVersionsToKeep(100);
final MVMap<Integer, Integer> map = s.openMap("test");
final int len = 10;
final Random r = new Random();
......@@ -343,7 +343,6 @@ public class TestConcurrent extends TestMVStore {
Iterator<Integer> it = map.keyIterator(r.nextInt(len));
long old = s.getCurrentVersion();
s.commit();
s.setRetainVersion(old - 100);
while (map.getVersion() == old) {
Thread.yield();
}
......
......@@ -84,9 +84,8 @@ public class TestKillProcessWhileWriting extends TestBase {
s = new MVStore.Builder().
fileName(fileName).
pageSplitSize(50).
writeDelay(0).
autoCommitDisabled().
open();
s.setWriteDelay(0);
m = s.openMap("data");
Random r = new Random(seed);
int op = 0;
......@@ -107,7 +106,7 @@ public class TestKillProcessWhileWriting extends TestBase {
m.remove(k);
break;
case 6:
s.store();
s.commit();
break;
case 7:
s.compact(80);
......@@ -120,12 +119,12 @@ public class TestKillProcessWhileWriting extends TestBase {
s = new MVStore.Builder().
fileName(fileName).
pageSplitSize(50).
writeDelay(0).open();
autoCommitDisabled().
open();
m = s.openMap("data");
break;
}
}
s.store();
s.close();
return 0;
} catch (Exception e) {
......
......@@ -54,7 +54,7 @@ public class TestMVRTree extends TestMVStore {
testRandom();
testRandomFind();
}
private void testSpatialKey() {
SpatialKey a0 = new SpatialKey(0, 1, 2, 3, 4);
SpatialKey a1 = new SpatialKey(0, 1, 2, 3, 4);
......@@ -119,7 +119,7 @@ public class TestMVRTree extends TestMVStore {
SpatialKey k = new SpatialKey(i, x - p, x + p, y - p, y + p);
r.add(k, "" + i);
if (i > 0 && (i % len / 10) == 0) {
s.store();
s.commit();
}
if (i > 0 && (i % 10000) == 0) {
render(r, getBaseDir() + "/test.png");
......@@ -127,7 +127,6 @@ public class TestMVRTree extends TestMVStore {
}
// System.out.println(prof.getTop(5));
// System.out.println("add: " + (System.currentTimeMillis() - t));
s.store();
s.close();
s = openStore(fileName);
r = s.openMap("data",
......
......@@ -92,7 +92,7 @@ public class TestRandomMapOps extends TestBase {
for (; op < size; op++) {
int k = r.nextInt(100);
byte[] v = new byte[r.nextInt(10) * 10];
int type = r.nextInt(13);
int type = r.nextInt(12);
switch (type) {
case 0:
case 1:
......@@ -109,23 +109,19 @@ public class TestRandomMapOps extends TestBase {
map.remove(k);
break;
case 6:
log(op, k, v, "s.store()");
s.store();
break;
case 7:
log(op, k, v, "s.compact(90)");
s.compact(90);
break;
case 8:
case 7:
log(op, k, v, "m.clear()");
m.clear();
map.clear();
break;
case 9:
case 8:
log(op, k, v, "s.commit()");
s.commit();
break;
case 10:
case 9:
log(op, k, v, "s.commit()");
s.commit();
log(op, k, v, "s.close()");
......@@ -135,13 +131,13 @@ public class TestRandomMapOps extends TestBase {
log(op, k, v, "m = s.openMap(\"data\")");
m = s.openMap("data");
break;
case 11:
case 10:
log(op, k, v, "s.commit()");
s.commit();
log(op, k, v, "s.compactMoveChunks()");
s.compactMoveChunks();
break;
case 12:
case 11:
log(op, k, v, "m.getKeyIndex({0})");
ArrayList<Integer> keyList = new ArrayList<Integer>(map.keySet());
int index = Collections.binarySearch(keyList, k, null);
......@@ -165,13 +161,12 @@ public class TestRandomMapOps extends TestBase {
assertEquals(map.lastKey(), m.lastKey());
}
}
s.store();
s.close();
}
private static MVStore openStore(String fileName) {
MVStore s = new MVStore.Builder().fileName(fileName).
pageSplitSize(50).writeDelay(0).open();
pageSplitSize(50).autoCommitDisabled().open();
s.setRetentionTime(0);
return s;
}
......
......@@ -65,7 +65,7 @@ public class TestStreamStore extends TestBase {
for (int i = 0; i < 100; i++) {
streamStore.put(new RandomStream(size, i));
}
s.store();
s.commit();
MVMap<Long, byte[]> map = s.openMap("data");
assertTrue("size: " + map.size(), map.sizeAsLong() >= 100);
s.close();
......@@ -77,7 +77,7 @@ public class TestStreamStore extends TestBase {
for (int i = 0; i < 100; i++) {
streamStore.put(new RandomStream(size, -i));
}
s.store();
s.commit();
long readCount = s.getFileStore().getReadCount();
// the read count should be low because new blocks
// are appended at the end (not between existing blocks)
......@@ -92,7 +92,7 @@ public class TestStreamStore extends TestBase {
return new StreamStore(map) {
@Override
protected void onStore(int len) {
if (s.getUnsavedPageCount() > s.getUnsavedPageCountMax() / 2) {
if (s.getUnsavedPageCount() > s.getAutoCommitPageCount() / 2) {
s.commit();
}
}
......@@ -111,12 +111,11 @@ public class TestStreamStore extends TestBase {
@Override
protected void onStore(int len) {
count.incrementAndGet();
s.store();
s.commit();
}
};
long size = 1 * 1024 * 1024;
streamStore.put(new RandomStream(size, 0));
s.store();
s.close();
assertEquals(4, count.get());
}
......
......@@ -86,7 +86,7 @@ public class TestTransactionStore extends TestBase {
for (int i = 0; !stop; i++) {
state.set(i);
other.put(i, value);
store.store();
store.commit();
}
}
};
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论