提交 362df6d9 authored 作者: Thomas Mueller's avatar Thomas Mueller

MVTableEngine improvements, formatting

上级 33fbe34f
...@@ -74,9 +74,9 @@ public class DataUtils { ...@@ -74,9 +74,9 @@ public class DataUtils {
public static final int ERROR_TRANSACTION_CORRUPT = 100; public static final int ERROR_TRANSACTION_CORRUPT = 100;
/** /**
* A lock timeout occurred. * An entry is still locked by another transaction.
*/ */
public static final int ERROR_TRANSACTION_LOCK_TIMEOUT = 101; public static final int ERROR_TRANSACTION_LOCKED = 101;
/** /**
* A very old transaction is still open. * A very old transaction is still open.
......
...@@ -50,7 +50,6 @@ public class MVTable extends TableBase { ...@@ -50,7 +50,6 @@ public class MVTable extends TableBase {
private MVPrimaryIndex primaryIndex; private MVPrimaryIndex primaryIndex;
private ArrayList<Index> indexes = New.arrayList(); private ArrayList<Index> indexes = New.arrayList();
private long lastModificationId; private long lastModificationId;
private long rowCount;
private volatile Session lockExclusive; private volatile Session lockExclusive;
private HashSet<Session> lockShared = New.hashSet(); private HashSet<Session> lockShared = New.hashSet();
private final Trace traceLock; private final Trace traceLock;
...@@ -92,7 +91,6 @@ public class MVTable extends TableBase { ...@@ -92,7 +91,6 @@ public class MVTable extends TableBase {
IndexColumn.wrap(getColumns()), IndexColumn.wrap(getColumns()),
IndexType.createScan(true) IndexType.createScan(true)
); );
rowCount = primaryIndex.getRowCount(session);
indexes.add(primaryIndex); indexes.add(primaryIndex);
} }
...@@ -406,14 +404,14 @@ public class MVTable extends TableBase { ...@@ -406,14 +404,14 @@ public class MVTable extends TableBase {
this, indexId, this, indexId,
indexName, cols, indexType); indexName, cols, indexType);
} }
if (index.needRebuild() && rowCount > 0) { if (index.needRebuild()) {
try { try {
Index scan = getScanIndex(session); Index scan = getScanIndex(session);
long remaining = scan.getRowCount(session); long remaining = scan.getRowCount(session);
long total = remaining; long total = remaining;
Cursor cursor = scan.find(session, null, null); Cursor cursor = scan.find(session, null, null);
long i = 0; long i = 0;
int bufferSize = (int) Math.min(rowCount, Constants.DEFAULT_MAX_MEMORY_ROWS); int bufferSize = (int) Math.min(total, Constants.DEFAULT_MAX_MEMORY_ROWS);
ArrayList<Row> buffer = New.arrayList(bufferSize); ArrayList<Row> buffer = New.arrayList(bufferSize);
String n = getName() + ":" + index.getName(); String n = getName() + ":" + index.getName();
int t = MathUtils.convertLongToInt(total); int t = MathUtils.convertLongToInt(total);
...@@ -506,7 +504,6 @@ public class MVTable extends TableBase { ...@@ -506,7 +504,6 @@ public class MVTable extends TableBase {
Index index = indexes.get(i); Index index = indexes.get(i);
index.remove(session, row); index.remove(session, row);
} }
rowCount--;
} catch (Throwable e) { } catch (Throwable e) {
t.rollbackToSavepoint(savepoint); t.rollbackToSavepoint(savepoint);
throw DbException.convert(e); throw DbException.convert(e);
...@@ -521,7 +518,6 @@ public class MVTable extends TableBase { ...@@ -521,7 +518,6 @@ public class MVTable extends TableBase {
Index index = indexes.get(i); Index index = indexes.get(i);
index.truncate(session); index.truncate(session);
} }
rowCount = 0;
changesSinceAnalyze = 0; changesSinceAnalyze = 0;
} }
...@@ -535,7 +531,6 @@ public class MVTable extends TableBase { ...@@ -535,7 +531,6 @@ public class MVTable extends TableBase {
Index index = indexes.get(i); Index index = indexes.get(i);
index.add(session, row); index.add(session, row);
} }
rowCount++;
} catch (Throwable e) { } catch (Throwable e) {
t.rollbackToSavepoint(savepoint); t.rollbackToSavepoint(savepoint);
DbException de = DbException.convert(e); DbException de = DbException.convert(e);
......
...@@ -12,6 +12,7 @@ import java.nio.channels.FileChannel; ...@@ -12,6 +12,7 @@ import java.nio.channels.FileChannel;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.HashMap; import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import org.h2.api.TableEngine; import org.h2.api.TableEngine;
...@@ -140,6 +141,8 @@ public class MVTableEngine implements TableEngine { ...@@ -140,6 +141,8 @@ public class MVTableEngine implements TableEngine {
*/ */
private final TransactionStore transactionStore; private final TransactionStore transactionStore;
private long statisticsStart;
public Store(Database db, MVStore store) { public Store(Database db, MVStore store) {
this.db = db; this.db = db;
this.store = store; this.store = store;
...@@ -294,6 +297,22 @@ public class MVTableEngine implements TableEngine { ...@@ -294,6 +297,22 @@ public class MVTableEngine implements TableEngine {
} }
} }
/**
* Start collecting statistics.
*/
public void statisticsStart() {
FileStore fs = store.getFileStore();
statisticsStart = fs == null ? 0 : fs.getReadCount();
}
public Map<String, Integer> statisticsEnd() {
HashMap<String, Integer> map = New.hashMap();
FileStore fs = store.getFileStore();
int reads = fs == null ? 0 : (int) (fs.getReadCount() - statisticsStart);
map.put("reads", reads);
return map;
}
} }
/** /**
......
...@@ -57,11 +57,6 @@ public class TransactionStore { ...@@ -57,11 +57,6 @@ public class TransactionStore {
*/ */
final MVMap<Long, Object[]> undoLog; final MVMap<Long, Object[]> undoLog;
/**
* The lock timeout in milliseconds. 0 means timeout immediately.
*/
long lockTimeout;
/** /**
* The map of maps. * The map of maps.
*/ */
...@@ -100,7 +95,6 @@ public class TransactionStore { ...@@ -100,7 +95,6 @@ public class TransactionStore {
MVMap.Builder<Long, Object[]> builder = MVMap.Builder<Long, Object[]> builder =
new MVMap.Builder<Long, Object[]>(). new MVMap.Builder<Long, Object[]>().
valueType(undoLogValueType); valueType(undoLogValueType);
// TODO escape other map names, to avoid conflicts
undoLog = store.openMap("undoLog", builder); undoLog = store.openMap("undoLog", builder);
init(); init();
} }
...@@ -289,7 +283,7 @@ public class TransactionStore { ...@@ -289,7 +283,7 @@ public class TransactionStore {
if (store.isClosed()) { if (store.isClosed()) {
return; return;
} }
// TODO could synchronize on blocks // TODO could synchronize on blocks (100 at a time or so)
synchronized (undoLog) { synchronized (undoLog) {
t.setStatus(Transaction.STATUS_COMMITTING); t.setStatus(Transaction.STATUS_COMMITTING);
for (long logId = 0; logId < maxLogId; logId++) { for (long logId = 0; logId < maxLogId; logId++) {
...@@ -427,7 +421,7 @@ public class TransactionStore { ...@@ -427,7 +421,7 @@ public class TransactionStore {
* @param toLogId the log id to roll back to * @param toLogId the log id to roll back to
*/ */
void rollbackTo(Transaction t, long maxLogId, long toLogId) { void rollbackTo(Transaction t, long maxLogId, long toLogId) {
// TODO could synchronize on blocks // TODO could synchronize on blocks (100 at a time or so)
synchronized (undoLog) { synchronized (undoLog) {
for (long logId = maxLogId - 1; logId >= toLogId; logId--) { for (long logId = maxLogId - 1; logId >= toLogId; logId--) {
Long undoKey = getOperationId(t.getId(), logId); Long undoKey = getOperationId(t.getId(), logId);
...@@ -901,36 +895,13 @@ public class TransactionStore { ...@@ -901,36 +895,13 @@ public class TransactionStore {
private V set(K key, V value) { private V set(K key, V value) {
transaction.checkNotClosed(); transaction.checkNotClosed();
long start = 0;
while (true) {
V old = get(key); V old = get(key);
boolean ok = trySet(key, value, false); boolean ok = trySet(key, value, false);
if (ok) { if (ok) {
return old; return old;
} }
// an uncommitted transaction:
// wait until it is committed, or until the lock timeout
long timeout = transaction.store.lockTimeout;
if (timeout == 0) {
throw DataUtils.newIllegalStateException(
DataUtils.ERROR_TRANSACTION_LOCK_TIMEOUT, "Lock timeout");
}
if (start == 0) {
start = System.currentTimeMillis();
} else {
long t = System.currentTimeMillis() - start;
if (t > timeout) {
throw DataUtils.newIllegalStateException( throw DataUtils.newIllegalStateException(
DataUtils.ERROR_TRANSACTION_LOCK_TIMEOUT, "Lock timeout"); DataUtils.ERROR_TRANSACTION_LOCKED, "Entry is locked");
}
// TODO use wait/notify instead, or remove the feature
try {
Thread.sleep(1);
} catch (InterruptedException e) {
// ignore
}
}
}
} }
/** /**
...@@ -1176,7 +1147,7 @@ public class TransactionStore { ...@@ -1176,7 +1147,7 @@ public class TransactionStore {
* Clear the map. * Clear the map.
*/ */
public void clear() { public void clear() {
// TODO truncate transactionally // TODO truncate transactionally?
map.clear(); map.clear();
} }
...@@ -1225,24 +1196,6 @@ public class TransactionStore { ...@@ -1225,24 +1196,6 @@ public class TransactionStore {
return null; return null;
} }
/**
* Get the smallest key that is larger or equal to this key.
*
* @param key the key (may not be null)
* @return the result
*/
public K ceilingKey(K key) {
// TODO this method is slow
Iterator<K> cursor = map.keyIterator(key);
while (cursor.hasNext()) {
key = cursor.next();
if (get(key) != null) {
return key;
}
}
return null;
}
/** /**
* Get the smallest key that is larger than the given key, or null if no * Get the smallest key that is larger than the given key, or null if no
* such key exists. * such key exists.
...@@ -1251,8 +1204,13 @@ public class TransactionStore { ...@@ -1251,8 +1204,13 @@ public class TransactionStore {
* @return the result * @return the result
*/ */
public K higherKey(K key) { public K higherKey(K key) {
// TODO transactional higherKey while (true) {
return map.higherKey(key); K k = map.higherKey(key);
if (k == null || get(k) != null) {
return k;
}
key = k;
}
} }
/** /**
...@@ -1263,8 +1221,13 @@ public class TransactionStore { ...@@ -1263,8 +1221,13 @@ public class TransactionStore {
* @return the result * @return the result
*/ */
public K lowerKey(K key) { public K lowerKey(K key) {
// TODO transactional lowerKey while (true) {
return map.lowerKey(key); K k = map.lowerKey(key);
if (k == null || get(k) != null) {
return k;
}
key = k;
}
} }
/** /**
...@@ -1340,56 +1303,6 @@ public class TransactionStore { ...@@ -1340,56 +1303,6 @@ public class TransactionStore {
} }
/**
* Iterate over keys.
*
* @param cursor the cursor to wrap
* @param includeUncommitted whether uncommitted entries should be included
* @return the iterator
*/
public Iterator<K> wrapCursor(final Cursor<K, VersionedValue> cursor, final boolean includeUncommitted) {
return new Iterator<K>() {
private K current;
{
fetchNext();
}
private void fetchNext() {
while (cursor.hasNext()) {
current = cursor.next();
if (includeUncommitted) {
return;
}
VersionedValue data = cursor.getValue();
data = getValue(current, readLogId, data);
if (data != null && data.value != null) {
return;
}
}
current = null;
}
@Override
public boolean hasNext() {
return current != null;
}
@Override
public K next() {
K result = current;
fetchNext();
return result;
}
@Override
public void remove() {
throw DataUtils.newUnsupportedOperationException(
"Removing is not supported");
}
};
}
/** /**
* Iterate over keys. * Iterate over keys.
* *
...@@ -1398,7 +1311,7 @@ public class TransactionStore { ...@@ -1398,7 +1311,7 @@ public class TransactionStore {
* @return the iterator * @return the iterator
*/ */
public Iterator<K> wrapIterator(final Iterator<K> iterator, final boolean includeUncommitted) { public Iterator<K> wrapIterator(final Iterator<K> iterator, final boolean includeUncommitted) {
// TODO duplicate code for wrapCursor and wrapIterator // TODO duplicate code for wrapIterator and entryIterator
return new Iterator<K>() { return new Iterator<K>() {
private K current; private K current;
......
...@@ -235,7 +235,7 @@ java org.h2.test.TestAll timer ...@@ -235,7 +235,7 @@ java org.h2.test.TestAll timer
*/ */
; ;
private static final boolean MV_STORE = false; private static final boolean MV_STORE = true;
/** /**
* If the test should run with many rows. * If the test should run with many rows.
......
...@@ -47,6 +47,10 @@ public class TestMVTableEngine extends TestBase { ...@@ -47,6 +47,10 @@ public class TestMVTableEngine extends TestBase {
@Override @Override
public void test() throws Exception { public void test() throws Exception {
testCount();
testMinMaxWithNull();
testTimeout();
testExplainAnalyze();
testTransactionLogUsuallyNotStored(); testTransactionLogUsuallyNotStored();
testShrinkDatabaseFile(); testShrinkDatabaseFile();
testTwoPhaseCommit(); testTwoPhaseCommit();
...@@ -68,6 +72,88 @@ public class TestMVTableEngine extends TestBase { ...@@ -68,6 +72,88 @@ public class TestMVTableEngine extends TestBase {
testSimple(); testSimple();
} }
private void testCount() throws Exception {
;
// Test that count(*) is fast even if the map is very large
}
private void testMinMaxWithNull() throws Exception {
FileUtils.deleteRecursive(getBaseDir(), true);
Connection conn;
Connection conn2;
Statement stat;
Statement stat2;
String url = "mvstore;MV_STORE=TRUE;MVCC=TRUE";
url = getURL(url, true);
conn = getConnection(url);
stat = conn.createStatement();
stat.execute("create table test(data int)");
stat.execute("create index on test(data)");
stat.execute("insert into test values(null), (2)");
conn2 = getConnection(url);
stat2 = conn2.createStatement();
conn.setAutoCommit(false);
conn2.setAutoCommit(false);
stat.execute("insert into test values(1)");
ResultSet rs;
rs = stat.executeQuery("select min(data) from test");
rs.next();
assertEquals(1, rs.getInt(1));
rs = stat2.executeQuery("select min(data) from test");
rs.next();
// not yet committed
assertEquals(2, rs.getInt(1));
conn2.close();
conn.close();
}
private void testTimeout() throws Exception {
FileUtils.deleteRecursive(getBaseDir(), true);
Connection conn;
Connection conn2;
Statement stat;
Statement stat2;
String url = "mvstore;MV_STORE=TRUE;MVCC=TRUE";
url = getURL(url, true);
conn = getConnection(url);
stat = conn.createStatement();
stat.execute("create table test(id identity, name varchar)");
conn2 = getConnection(url);
stat2 = conn2.createStatement();
conn.setAutoCommit(false);
conn2.setAutoCommit(false);
stat.execute("insert into test values(1, 'Hello')");
assertThrows(ErrorCode.LOCK_TIMEOUT_1, stat2).
execute("insert into test values(1, 'Hello')");
conn2.close();
conn.close();
}
private void testExplainAnalyze() throws Exception {
if (config.memory) {
return;
}
FileUtils.deleteRecursive(getBaseDir(), true);
Connection conn;
Statement stat;
String url = "mvstore;MV_STORE=TRUE";
url = getURL(url, true);
conn = getConnection(url);
stat = conn.createStatement();
stat.execute("create table test(id identity, name varchar) as " +
"select x, space(1000) from system_range(1, 1000)");
ResultSet rs;
conn.close();
conn = getConnection(url);
stat = conn.createStatement();
rs = stat.executeQuery("explain analyze select * from test");
rs.next();
String plan = rs.getString(1);
// expect about 249 reads
assertTrue(plan, plan.indexOf("reads: 2") >= 0);
conn.close();
}
private void testTransactionLogUsuallyNotStored() throws Exception { private void testTransactionLogUsuallyNotStored() throws Exception {
FileUtils.deleteRecursive(getBaseDir(), true); FileUtils.deleteRecursive(getBaseDir(), true);
Connection conn; Connection conn;
......
...@@ -17,6 +17,7 @@ import java.util.List; ...@@ -17,6 +17,7 @@ import java.util.List;
import java.util.Random; import java.util.Random;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import org.h2.mvstore.DataUtils;
import org.h2.mvstore.MVMap; import org.h2.mvstore.MVMap;
import org.h2.mvstore.MVStore; import org.h2.mvstore.MVStore;
import org.h2.mvstore.db.TransactionStore; import org.h2.mvstore.db.TransactionStore;
...@@ -45,6 +46,7 @@ public class TestTransactionStore extends TestBase { ...@@ -45,6 +46,7 @@ public class TestTransactionStore extends TestBase {
@Override @Override
public void test() throws Exception { public void test() throws Exception {
FileUtils.createDirectories(getBaseDir()); FileUtils.createDirectories(getBaseDir());
testConcurrentUpdate();
testRepeatedChange(); testRepeatedChange();
testTransactionAge(); testTransactionAge();
testStopWhileCommitting(); testStopWhileCommitting();
...@@ -58,6 +60,33 @@ public class TestTransactionStore extends TestBase { ...@@ -58,6 +60,33 @@ public class TestTransactionStore extends TestBase {
testCompareWithPostgreSQL(); testCompareWithPostgreSQL();
} }
private void testConcurrentUpdate() {
MVStore s;
TransactionStore ts;
s = MVStore.open(null);
ts = new TransactionStore(s);
Transaction tx1 = ts.begin();
TransactionMap<Integer, Integer> map1 = tx1.openMap("data");
map1.put(1, 10);
Transaction tx2 = ts.begin();
TransactionMap<Integer, Integer> map2 = tx2.openMap("data");
try {
map2.put(1, 20);
fail();
} catch (IllegalStateException e) {
assertEquals(DataUtils.ERROR_TRANSACTION_LOCKED,
DataUtils.getErrorCode(e.getMessage()));
}
assertEquals(10, map1.get(1).intValue());
assertNull(map2.get(1));
tx1.commit();
assertEquals(10, map2.get(1).intValue());
s.close();
}
private void testRepeatedChange() { private void testRepeatedChange() {
MVStore s; MVStore s;
TransactionStore ts; TransactionStore ts;
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论