提交 90793554 authored 作者: Thomas Mueller's avatar Thomas Mueller

MVTableEngine

上级 7255a5a2
......@@ -131,7 +131,6 @@ public class MVMap<K, V> extends AbstractMap<K, V>
long[] counts = { p.getTotalCount(), split.getTotalCount() };
p = Page.create(this, writeVersion, 1,
keys, null, children, childrenPages, counts, totalCount, 0, 0);
store.registerUnsavedPage();
return p;
}
......@@ -654,9 +653,6 @@ public class MVMap<K, V> extends AbstractMap<K, V>
if (index >= 0) {
result = p.getValue(index);
p.remove(index);
if (p.getKeyCount() == 0) {
removePage(p.getPos());
}
}
return result;
}
......
......@@ -1094,9 +1094,11 @@ public class MVStore {
return true;
}
for (MVMap<?, ?> m : maps.values()) {
long v = m.getVersion();
if (v >= 0 && v >= lastStoredVersion) {
return true;
if (!m.isClosed()) {
long v = m.getVersion();
if (v >= 0 && v >= lastStoredVersion) {
return true;
}
}
}
return false;
......@@ -1279,7 +1281,9 @@ public class MVStore {
// we need to keep temporary pages,
// to support reading old versions and rollback
if (pos == 0) {
unsavedPageCount--;
// the value could be smaller than 0 because
// in some cases a page is allocated without a store
unsavedPageCount = Math.max(0, unsavedPageCount - 1);
return;
}
// this could result in a cache miss
......
......@@ -146,6 +146,10 @@ public class Page {
p.totalCount = totalCount;
p.sharedFlags = sharedFlags;
p.memory = memory == 0 ? p.calculateMemory() : memory;
MVStore store = map.store;
if (store != null) {
store.registerUnsavedPage();
}
return p;
}
......@@ -274,7 +278,6 @@ public class Page {
counts, totalCount,
SHARED_KEYS | SHARED_VALUES | SHARED_CHILDREN | SHARED_COUNTS,
memory);
map.getStore().registerUnsavedPage();
newPage.cachedCompare = cachedCompare;
return newPage;
}
......@@ -357,7 +360,6 @@ public class Page {
Page newPage = create(map, version, b,
bKeys, bValues, null, null, null,
bKeys.length, 0, 0);
map.getStore().registerUnsavedPage();
memory = calculateMemory();
newPage.memory = newPage.calculateMemory();
return newPage;
......@@ -404,7 +406,6 @@ public class Page {
Page newPage = create(map, version, b - 1,
bKeys, null, bChildren, bChildrenPages,
bCounts, t, 0, 0);
map.getStore().registerUnsavedPage();
memory = calculateMemory();
newPage.memory = newPage.calculateMemory();
return newPage;
......
......@@ -302,7 +302,11 @@ public class MVTable extends TableBase {
public boolean isLockedExclusively() {
return lockExclusive != null;
}
public boolean isLockedExclusivelyBy(Session session) {
return lockExclusive == session;
}
public void unlock(Session s) {
if (database != null) {
traceLock(s, lockExclusive == s, "unlock");
......@@ -551,8 +555,7 @@ public class MVTable extends TableBase {
nextAnalyze = n;
}
int rows = session.getDatabase().getSettings().analyzeSample;
int test;
// Analyze.analyzeTable(session, this, rows, false);
Analyze.analyzeTable(session, this, rows, false);
}
@Override
......@@ -667,10 +670,6 @@ int test;
return session.getTransaction(store);
}
public TransactionStore getStore() {
return store;
}
public Column getRowIdColumn() {
if (rowIdColumn == null) {
rowIdColumn = new Column(Column.ROWID, Value.LONG);
......
......@@ -90,9 +90,13 @@ public class TransactionStore {
new MVMap.Builder<Long, Object[]>());
// commit could be faster if we have one undo log per transaction,
// or a range delete operation for maps
ArrayType oldValueType = new ArrayType(new DataType[]{
new ObjectDataType(), new ObjectDataType(),
keyType
});
ArrayType valueType = new ArrayType(new DataType[]{
new ObjectDataType(), new ObjectDataType(), keyType,
new ObjectDataType()
oldValueType
});
MVMap.Builder<long[], Object[]> builder =
new MVMap.Builder<long[], Object[]>().
......@@ -514,18 +518,18 @@ public class TransactionStore {
*/
public static class TransactionMap<K, V> {
private Transaction transaction;
private final int mapId;
/**
* The map used for writing (the latest version).
* <p>
* Key: key the key of the data.
* Value: { transactionId, oldVersion, value }
*/
private final MVMap<K, Object[]> map;
final MVMap<K, Object[]> map;
private Transaction transaction;
private final int mapId;
/**
* If a record was read that was updated by this transaction, and the
* update occurred before this log id, the older version is read. This
......@@ -647,7 +651,7 @@ public class TransactionStore {
if (t > timeout) {
throw DataUtils.newIllegalStateException("Lock timeout");
}
// TODO use wait/notify instead
// TODO use wait/notify instead, or remove the feature
try {
Thread.sleep(1);
} catch (InterruptedException e) {
......@@ -908,9 +912,43 @@ public class TransactionStore {
* @param from the first key to return
* @return the iterator
*/
public Iterator<K> keyIterator(K from) {
// TODO transactional keyIterator
return map.keyIterator(from);
public Iterator<K> keyIterator(final K from) {
return new Iterator<K>() {
private final Cursor<K> cursor = map.keyIterator(from);
private K current;
{
fetchNext();
}
private void fetchNext() {
while (cursor.hasNext()) {
current = cursor.next();
if (containsKey(current)) {
return;
}
}
current = null;
}
@Override
public boolean hasNext() {
return current != null;
}
@Override
public K next() {
K result = current;
fetchNext();
return result;
}
@Override
public void remove() {
throw DataUtils.newUnsupportedOperationException(
"Removing is not supported");
}
};
}
/**
......@@ -974,7 +1012,10 @@ public class TransactionStore {
int size = 0;
for (int i = 0; i < arrayLength; i++) {
DataType t = elementTypes[i];
size += t.getMemory(array[i]);
Object o = array[i];
if (o != null) {
size += t.getMemory(o);
}
}
return size;
}
......@@ -1001,7 +1042,13 @@ public class TransactionStore {
Object[] array = (Object[]) obj;
for (int i = 0; i < arrayLength; i++) {
DataType t = elementTypes[i];
t.write(buff, array[i]);
Object o = array[i];
if (o == null) {
buff.put((byte) 0);
} else {
buff.put((byte) 1);
t.write(buff, o);
}
}
return buff;
}
......@@ -1011,7 +1058,9 @@ public class TransactionStore {
Object[] array = new Object[arrayLength];
for (int i = 0; i < arrayLength; i++) {
DataType t = elementTypes[i];
array[i] = t.read(buff);
if (buff.get() == 1) {
array[i] = t.read(buff);
}
}
return array;
}
......
......@@ -352,7 +352,7 @@ public class TestMVStore extends TestBase {
s.store();
s.close();
int[] expectedReadsForCacheSize = {
3407, 2590, 1924, 1440, 1106, 956, 918
3406, 2590, 1924, 1440, 1108, 956, 918
};
for (int cacheSize = 0; cacheSize <= 6; cacheSize += 4) {
s = new MVStore.Builder().
......@@ -829,7 +829,7 @@ public class TestMVStore extends TestBase {
assertEquals(i + 1, m.size());
}
assertEquals(1000, m.size());
assertEquals(284, s.getUnsavedPageCount());
assertEquals(286, s.getUnsavedPageCount());
s.store();
assertEquals(2, s.getFileWriteCount());
s.close();
......
......@@ -36,6 +36,7 @@ public class TestMVTableEngine extends TestBase {
public void test() throws Exception {
// testSpeed();
testExclusiveLock();
testEncryption();
testReadOnly();
testReuseDiskSpace();
......@@ -107,16 +108,16 @@ int tes;
// 25172 mvstore;DEFAULT_TABLE_ENGINE=org.h2.mvstore.db.MVTableEngine;LOCK_MODE=0
prep.setString(2, new String(new char[10]).replace((char) 0, 'x'));
for (int i = 0; i < 8000000; i++) {
for (int i = 0; i < 800000; i++) {
prep.setInt(1, i);
prep.execute();
}
System.out.println((System.currentTimeMillis() - time) + " " + dbName);
System.out.println((System.currentTimeMillis() - time) + " " + dbName + " before");
//Profiler prof = new Profiler().startCollecting();
conn.close();
//System.out.println(prof.getTop(10));
System.out.println((System.currentTimeMillis() - time) + " " + dbName);
System.out.println((System.currentTimeMillis() - time) + " " + dbName + " after");
}
private void testEncryption() throws Exception {
......@@ -138,6 +139,30 @@ int tes;
conn.close();
FileUtils.deleteRecursive(getBaseDir(), true);
}
private void testExclusiveLock() throws Exception {
FileUtils.deleteRecursive(getBaseDir(), true);
String dbName = "mvstore" +
";DEFAULT_TABLE_ENGINE=org.h2.mvstore.db.MVTableEngine";
Connection conn, conn2;
Statement stat, stat2;
conn = getConnection(dbName);
stat = conn.createStatement();
stat.execute("create table test(id int)");
stat.execute("insert into test values(1)");
conn.setAutoCommit(false);
// stat.execute("update test set id = 2");
stat.executeQuery("select * from test for update");
conn2 = getConnection(dbName);
stat2 = conn2.createStatement();
ResultSet rs2 = stat2.executeQuery("select * from information_schema.locks");
assertTrue(rs2.next());
assertEquals("TEST", rs2.getString("table_name"));
assertEquals("WRITE", rs2.getString("lock_type"));
conn2.close();
conn.close();
FileUtils.deleteRecursive(getBaseDir(), true);
}
private void testReadOnly() throws Exception {
FileUtils.deleteRecursive(getBaseDir(), true);
......
......@@ -12,6 +12,7 @@ import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Random;
......@@ -40,6 +41,7 @@ public class TestTransactionStore extends TestBase {
public void test() throws Exception {
FileUtils.createDirectories(getBaseDir());
testKeyIterator();
testMultiStatement();
testTwoPhaseCommit();
testSavepoint();
......@@ -48,6 +50,49 @@ public class TestTransactionStore extends TestBase {
testCompareWithPostgreSQL();
}
private void testKeyIterator() {
MVStore s = MVStore.open(null);
TransactionStore ts = new TransactionStore(s);
Transaction tx, tx2;
TransactionMap<String, String> m, m2;
Iterator<String> it, it2;
tx = ts.begin();
m = tx.openMap("test");
m.put("1", "Hello");
m.put("2", "World");
m.put("3", ".");
tx.commit();
tx2 = ts.begin();
m2 = tx2.openMap("test");
m2.remove("2");
m2.put("3", "!");
m2.put("4", "?");
tx = ts.begin();
m = tx.openMap("test");
it = m.keyIterator(null);
assertTrue(it.hasNext());
assertEquals("1", it.next());
assertTrue(it.hasNext());
assertEquals("2", it.next());
assertTrue(it.hasNext());
assertEquals("3", it.next());
assertFalse(it.hasNext());
it2 = m2.keyIterator(null);
assertTrue(it2.hasNext());
assertEquals("1", it2.next());
assertTrue(it2.hasNext());
assertEquals("3", it2.next());
assertTrue(it2.hasNext());
assertEquals("4", it2.next());
assertFalse(it2.hasNext());
s.close();
}
/**
* Tests behavior when used for a sequence of SQL statements. Each statement
* uses a savepoint. Within a statement, changes by the statement itself are
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论