提交 203f2a1c authored 作者: Thomas Mueller's avatar Thomas Mueller

MVTableEngine

上级 36f3c9ef
...@@ -79,9 +79,12 @@ public class MVSecondaryIndex extends BaseIndex { ...@@ -79,9 +79,12 @@ public class MVSecondaryIndex extends BaseIndex {
public void add(Session session, Row row) { public void add(Session session, Row row) {
TransactionMap<Value, Value> map = getMap(session); TransactionMap<Value, Value> map = getMap(session);
ValueArray array = getKey(row); ValueArray array = getKey(row);
ValueArray unique = null;
if (indexType.isUnique()) { if (indexType.isUnique()) {
array.getList()[keyColumns - 1] = ValueLong.get(Long.MIN_VALUE); // this will detect committed entries only
ValueArray key = (ValueArray) map.getLatestCeilingKey(array); unique = getKey(row);
unique.getList()[keyColumns - 1] = ValueLong.get(Long.MIN_VALUE);
ValueArray key = (ValueArray) map.getLatestCeilingKey(unique);
if (key != null) { if (key != null) {
SearchRow r2 = getRow(key.getList()); SearchRow r2 = getRow(key.getList());
if (compareRows(row, r2) == 0) { if (compareRows(row, r2) == 0) {
...@@ -91,12 +94,35 @@ public class MVSecondaryIndex extends BaseIndex { ...@@ -91,12 +94,35 @@ public class MVSecondaryIndex extends BaseIndex {
} }
} }
} }
array.getList()[keyColumns - 1] = ValueLong.get(row.getKey());
try { try {
map.put(array, ValueLong.get(0)); map.put(array, ValueLong.get(0));
} catch (IllegalStateException e) { } catch (IllegalStateException e) {
throw DbException.get(ErrorCode.CONCURRENT_UPDATE_1, table.getName()); throw DbException.get(ErrorCode.CONCURRENT_UPDATE_1, table.getName());
} }
if (indexType.isUnique()) {
// check if there is another (uncommitted) entry
Iterator<Value> it = map.keyIterator(unique, true);
while (it.hasNext()) {
ValueArray k = (ValueArray) it.next();
SearchRow r2 = getRow(k.getList());
if (compareRows(row, r2) != 0) {
break;
}
if (containsNullAndAllowMultipleNull(r2)) {
// this is allowed
continue;
}
if (map.isSameTransaction(k)) {
continue;
}
map.remove(array);
if (map.get(k) != null) {
// committed
throw getDuplicateKeyException(k.toString());
}
throw DbException.get(ErrorCode.CONCURRENT_UPDATE_1, table.getName());
}
}
} }
@Override @Override
......
...@@ -700,6 +700,10 @@ public class MVTable extends TableBase { ...@@ -700,6 +700,10 @@ public class MVTable extends TableBase {
return true; return true;
} }
/**
* Mark the transaction as committed, so that the modification counter of
* the database is incremented.
*/
public void commit() { public void commit() {
if (database != null) { if (database != null) {
lastModificationId = database.getNextModificationDataId(); lastModificationId = database.getNextModificationDataId();
......
...@@ -8,6 +8,7 @@ package org.h2.mvstore.db; ...@@ -8,6 +8,7 @@ package org.h2.mvstore.db;
import java.io.InputStream; import java.io.InputStream;
import java.lang.Thread.UncaughtExceptionHandler; import java.lang.Thread.UncaughtExceptionHandler;
import java.nio.channels.FileChannel;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
...@@ -233,7 +234,11 @@ public class MVTableEngine implements TableEngine { ...@@ -233,7 +234,11 @@ public class MVTableEngine implements TableEngine {
} }
public InputStream getInputStream() { public InputStream getInputStream() {
return new FileChannelInputStream(store.getFileStore().getFile(), false); FileChannel fc = store.getFileStore().getEncryptedFile();
if (fc == null) {
fc = store.getFileStore().getFile();
}
return new FileChannelInputStream(fc, false);
} }
/** /**
......
...@@ -1032,6 +1032,22 @@ public class TransactionStore { ...@@ -1032,6 +1032,22 @@ public class TransactionStore {
return data == null ? null : (V) data.value; return data == null ? null : (V) data.value;
} }
/**
* Whether the entry for this key was added or removed from this session.
*
* @param key the key
* @return true if yes
*/
public boolean isSameTransaction(K key) {
VersionedValue data = map.get(key);
if (data == null) {
// doesn't exist or deleted by a committed transaction
return false;
}
long tx = data.transactionId;
return tx == transaction.transactionId;
}
private VersionedValue getValue(K key, long maxLog) { private VersionedValue getValue(K key, long maxLog) {
VersionedValue data = map.get(key); VersionedValue data = map.get(key);
while (true) { while (true) {
...@@ -1182,7 +1198,7 @@ public class TransactionStore { ...@@ -1182,7 +1198,7 @@ public class TransactionStore {
* @return the result * @return the result
*/ */
public K lowerKey(K key) { public K lowerKey(K key) {
// TODO Auto-generated method stub // TODO transactional lowerKey
return map.lowerKey(key); return map.lowerKey(key);
} }
...@@ -1193,6 +1209,17 @@ public class TransactionStore { ...@@ -1193,6 +1209,17 @@ public class TransactionStore {
* @return the iterator * @return the iterator
*/ */
public Iterator<K> keyIterator(final K from) { public Iterator<K> keyIterator(final K from) {
return keyIterator(from, false);
}
/**
* Iterate over all keys.
*
* @param from the first key to return
* @param includeUncommitted whether uncommitted entries should be included
* @return the iterator
*/
public Iterator<K> keyIterator(final K from, final boolean includeUncommitted) {
return new Iterator<K>() { return new Iterator<K>() {
private final Cursor<K> cursor = map.keyIterator(from); private final Cursor<K> cursor = map.keyIterator(from);
private K current; private K current;
...@@ -1204,6 +1231,9 @@ public class TransactionStore { ...@@ -1204,6 +1231,9 @@ public class TransactionStore {
private void fetchNext() { private void fetchNext() {
while (cursor.hasNext()) { while (cursor.hasNext()) {
current = cursor.next(); current = cursor.next();
if (includeUncommitted) {
return;
}
if (containsKey(current)) { if (containsKey(current)) {
return; return;
} }
......
...@@ -43,7 +43,8 @@ public class TestStreamStore extends TestBase { ...@@ -43,7 +43,8 @@ public class TestStreamStore extends TestBase {
FileUtils.deleteRecursive(getBaseDir(), true); FileUtils.deleteRecursive(getBaseDir(), true);
FileUtils.createDirectories(getBaseDir()); FileUtils.createDirectories(getBaseDir());
testVeryLarge(); testReadCount();
testLarge();
testDetectIllegalId(); testDetectIllegalId();
testTreeStructure(); testTreeStructure();
testFormat(); testFormat();
...@@ -52,7 +53,53 @@ public class TestStreamStore extends TestBase { ...@@ -52,7 +53,53 @@ public class TestStreamStore extends TestBase {
testLoop(); testLoop();
} }
private void testVeryLarge() throws IOException { private void testReadCount() throws IOException {
String fileName = getBaseDir() + "/testReadCount.h3";
FileUtils.delete(fileName);
MVStore s = new MVStore.Builder().
fileName(fileName).
open();
s.setCacheSize(1);
StreamStore streamStore = getAutoCommitStreamStore(s);
long size = s.getPageSplitSize() * 2;
for (int i = 0; i < 100; i++) {
streamStore.put(new RandomStream(size, i));
}
s.store();
MVMap<Long, byte[]> map = s.openMap("data");
assertTrue("size: " + map.size(), map.sizeAsLong() >= 100);
s.close();
s = new MVStore.Builder().
fileName(fileName).
open();
streamStore = getAutoCommitStreamStore(s);
for (int i = 0; i < 100; i++) {
streamStore.put(new RandomStream(size, -i));
}
s.store();
long readCount = s.getFileStore().getReadCount();
// the read count should be low because new blocks
// are appended at the end (not between existing blocks)
assertTrue("rc: " + readCount, readCount < 10);
map = s.openMap("data");
assertTrue("size: " + map.size(), map.sizeAsLong() >= 200);
s.close();
}
private static StreamStore getAutoCommitStreamStore(final MVStore s) {
MVMap<Long, byte[]> map = s.openMap("data");
return new StreamStore(map) {
@Override
protected void onStore(int len) {
if (s.getUnsavedPageCount() > s.getUnsavedPageCountMax() / 2) {
s.commit();
}
}
};
}
private void testLarge() throws IOException {
String fileName = getBaseDir() + "/testVeryLarge.h3"; String fileName = getBaseDir() + "/testVeryLarge.h3";
FileUtils.delete(fileName); FileUtils.delete(fileName);
final MVStore s = new MVStore.Builder(). final MVStore s = new MVStore.Builder().
...@@ -101,7 +148,7 @@ public class TestStreamStore extends TestBase { ...@@ -101,7 +148,7 @@ public class TestStreamStore extends TestBase {
} }
len = (int) Math.min(size - pos, len); len = (int) Math.min(size - pos, len);
int x = seed, end = off + len; int x = seed, end = off + len;
// a very simple pseudo-random number generator // a fast and very simple pseudo-random number generator
// with a period length of 4 GB // with a period length of 4 GB
// also good: x * 9 + 1, shift 6; x * 11 + 1, shift 7 // also good: x * 9 + 1, shift 6; x * 11 + 1, shift 7
while (off < end) { while (off < end) {
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论