Unverified 提交 80050699 authored 作者: Andrei Tokar's avatar Andrei Tokar 提交者: GitHub

Merge pull request #1030 from h2database/mv_nb

Make MVMap to operate in non-blocking fashion
......@@ -331,6 +331,11 @@ public class Constants {
// Java 6, 32 bit: 12
public static final int MEMORY_OBJECT = 24;
/**
* The memory needed by an array.
*/
public static final int MEMORY_ARRAY = 24;
/**
* The memory needed by an object of class PageBtree.
*/
......
......@@ -31,6 +31,7 @@ import org.h2.message.DbException;
import org.h2.message.Trace;
import org.h2.message.TraceSystem;
import org.h2.mvstore.db.MVTable;
import org.h2.mvstore.db.MVTableEngine;
import org.h2.mvstore.db.TransactionStore.Change;
import org.h2.mvstore.db.TransactionStore.Transaction;
import org.h2.result.ResultInterface;
......@@ -1681,11 +1682,14 @@ public class Session extends SessionWithState {
*/
public Transaction getTransaction() {
if (transaction == null) {
if (database.getMvStore().getStore().isClosed()) {
database.shutdownImmediately();
throw DbException.get(ErrorCode.DATABASE_IS_CLOSED);
MVTableEngine.Store store = database.getMvStore();
if (store != null) {
if (store.getStore().isClosed()) {
database.shutdownImmediately();
throw DbException.get(ErrorCode.DATABASE_IS_CLOSED);
}
transaction = store.getTransactionStore().begin();
}
transaction = database.getMvStore().getTransactionStore().begin();
startStatement = -1;
}
return transaction;
......@@ -1702,6 +1706,10 @@ public class Session extends SessionWithState {
* Start a new statement within a transaction.
*/
public void startStatementWithinTransaction() {
Transaction transaction = getTransaction();
if(transaction != null) {
transaction.markStatementStart();
}
startStatement = -1;
}
......@@ -1710,6 +1718,9 @@ public class Session extends SessionWithState {
* set, and deletes all temporary files held by the result sets.
*/
public void endStatement() {
if(transaction != null) {
transaction.markStatementEnd();
}
startStatement = -1;
closeTemporaryResults();
}
......
/*
* Copyright 2004-2018 H2 Group. Multiple-Licensed under the MPL 2.0,
* and the EPL 1.0 (http://h2database.com/html/license.html).
* Initial Developer: H2 Group
*/
package org.h2.mvstore;
import java.util.Arrays;
import java.util.Iterator;
/**
* A very simple array list that supports concurrent access.
* Internally, it uses immutable objects.
*
* @param <K> the key type
*/
public class ConcurrentArrayList<K> {
/**
* The array.
*/
@SuppressWarnings("unchecked")
K[] array = (K[]) new Object[0];
/**
* Get the first element, or null if none.
*
* @return the first element
*/
public K peekFirst() {
K[] a = array;
return a.length == 0 ? null : a[0];
}
/**
* Get the last element, or null if none.
*
* @return the last element
*/
public K peekLast() {
K[] a = array;
int len = a.length;
return len == 0 ? null : a[len - 1];
}
/**
* Add an element at the end.
*
* @param obj the element
*/
public synchronized void add(K obj) {
if (obj == null) {
throw DataUtils.newIllegalStateException(
DataUtils.ERROR_INTERNAL, "adding null value to list");
}
int len = array.length;
array = Arrays.copyOf(array, len + 1);
array[len] = obj;
}
/**
* Remove the first element, if it matches.
*
* @param obj the element to remove
* @return true if the element matched and was removed
*/
public synchronized boolean removeFirst(K obj) {
if (peekFirst() != obj) {
return false;
}
int len = array.length;
@SuppressWarnings("unchecked")
K[] a = (K[]) new Object[len - 1];
System.arraycopy(array, 1, a, 0, len - 1);
array = a;
return true;
}
/**
* Remove the last element, if it matches.
*
* @param obj the element to remove
* @return true if the element matched and was removed
*/
public synchronized boolean removeLast(K obj) {
if (peekLast() != obj) {
return false;
}
array = Arrays.copyOf(array, array.length - 1);
return true;
}
/**
* Get an iterator over all entries.
*
* @return the iterator
*/
public Iterator<K> iterator() {
return new Iterator<K>() {
K[] a = array;
int index;
@Override
public boolean hasNext() {
return index < a.length;
}
@Override
public K next() {
return a[index++];
}
@Override
public void remove() {
throw DataUtils.newUnsupportedOperationException("remove");
}
};
}
}
......@@ -14,41 +14,75 @@ import java.util.Iterator;
* @param <V> the value type
*/
public class Cursor<K, V> implements Iterator<K> {
private final MVMap<K, ?> map;
private final K from;
private CursorPos pos;
private K current, last;
private V currentValue, lastValue;
private final K to;
private CursorPos cursorPos;
private CursorPos keeper;
private K current;
private K last;
private V lastValue;
private Page lastPage;
private final Page root;
private boolean initialized;
Cursor(MVMap<K, ?> map, Page root, K from) {
this.map = map;
this.root = root;
this.from = from;
public Cursor(Page root, K from) {
this(root, from, null);
}
public Cursor(Page root, K from, K to) {
this.cursorPos = traverseDown(root, from);
this.to = to;
}
@Override
@SuppressWarnings("unchecked")
public boolean hasNext() {
if (!initialized) {
min(root, from);
initialized = true;
fetchNext();
if (cursorPos != null) {
while (current == null) {
Page page = cursorPos.page;
int index = cursorPos.index;
if (index >= (page.isLeaf() ? page.getKeyCount() : page.map.getChildPageCount(page))) {
CursorPos tmp = cursorPos;
cursorPos = cursorPos.parent;
tmp.parent = keeper;
keeper = tmp;
if(cursorPos == null)
{
return false;
}
} else {
while (!page.isLeaf()) {
page = page.getChildPage(index);
if (keeper == null) {
cursorPos = new CursorPos(page, 0, cursorPos);
} else {
CursorPos tmp = keeper;
keeper = keeper.parent;
tmp.parent = cursorPos;
tmp.page = page;
tmp.index = 0;
cursorPos = tmp;
}
index = 0;
}
K key = (K) page.getKey(index);
if (to != null && page.map.getKeyType().compare(key, to) > 0) {
return false;
}
current = last = key;
lastValue = (V) page.getValue(index);
lastPage = page;
}
++cursorPos.index;
}
}
return current != null;
}
@Override
public K next() {
hasNext();
K c = current;
last = current;
lastValue = currentValue;
lastPage = pos == null ? null : pos.page;
fetchNext();
return c;
if(!hasNext()) {
return null;
}
current = null;
return last;
}
/**
......@@ -69,6 +103,11 @@ public class Cursor<K, V> implements Iterator<K> {
return lastValue;
}
/**
* Get the page where last retrieved key is located.
*
* @return the page
*/
Page getPage() {
return lastPage;
}
......@@ -80,77 +119,58 @@ public class Cursor<K, V> implements Iterator<K> {
* @param n the number of entries to skip
*/
public void skip(long n) {
if (!hasNext()) {
return;
}
if (n < 10) {
while (n-- > 0) {
fetchNext();
while (n-- > 0 && hasNext()) {
next();
}
return;
} else if(hasNext()) {
assert cursorPos != null;
CursorPos cp = cursorPos;
CursorPos parent;
while ((parent = cp.parent) != null) cp = parent;
Page root = cp.page;
@SuppressWarnings("unchecked")
MVMap<K, ?> map = (MVMap<K, ?>) root.map;
long index = map.getKeyIndex(next());
last = map.getKey(index + n);
this.cursorPos = traverseDown(root, last);
}
long index = map.getKeyIndex(current);
K k = map.getKey(index + n);
pos = null;
min(root, k);
fetchNext();
}
@Override
public void remove() {
throw DataUtils.newUnsupportedOperationException(
"Removing is not supported");
"Removal is not supported");
}
/**
* Fetch the next entry that is equal or larger than the given key, starting
* from the given page. This method retains the stack.
*
* @param p the page to start
* @param from the key to search
* @param p the page to start from
* @param key the key to search, null means search for the first key
*/
private void min(Page p, K from) {
while (true) {
if (p.isLeaf()) {
int x = from == null ? 0 : p.binarySearch(from);
if (x < 0) {
x = -x - 1;
public static CursorPos traverseDown(Page p, Object key) {
CursorPos cursorPos = null;
while (!p.isLeaf()) {
assert p.getKeyCount() > 0;
int index = 0;
if(key != null) {
index = p.binarySearch(key) + 1;
if (index < 0) {
index = -index;
}
pos = new CursorPos(p, x, pos);
break;
}
int x = from == null ? -1 : p.binarySearch(from);
if (x < 0) {
x = -x - 1;
} else {
x++;
}
pos = new CursorPos(p, x + 1, pos);
p = p.getChildPage(x);
cursorPos = new CursorPos(p, index, cursorPos);
p = p.getChildPage(index);
}
}
/**
* Fetch the next entry if there is one.
*/
@SuppressWarnings("unchecked")
private void fetchNext() {
while (pos != null) {
if (pos.index < pos.page.getKeyCount()) {
int index = pos.index++;
current = (K) pos.page.getKey(index);
currentValue = (V) pos.page.getValue(index);
return;
}
pos = pos.parent;
if (pos == null) {
break;
}
if (pos.index < map.getChildPageCount(pos.page)) {
min(pos.page.getChildPage(pos.index++), null);
int index = 0;
if(key != null) {
index = p.binarySearch(key);
if (index < 0) {
index = -index - 1;
}
}
current = null;
return new CursorPos(p, index, cursorPos);
}
}
......@@ -23,7 +23,7 @@ public class CursorPos {
/**
* The position in the parent page, if any.
*/
public final CursorPos parent;
public CursorPos parent;
public CursorPos(Page page, int index, CursorPos parent) {
this.page = page;
......
......@@ -138,16 +138,6 @@ public final class DataUtils {
*/
public static final long COMPRESSED_VAR_LONG_MAX = 0x1ffffffffffffL;
/**
* The estimated number of bytes used per page object.
*/
public static final int PAGE_MEMORY = 128;
/**
* The estimated number of bytes used per child entry.
*/
public static final int PAGE_MEMORY_CHILD = 16;
/**
* The marker size of a very large page.
*/
......@@ -522,6 +512,16 @@ public final class DataUtils {
return ((int) pos) & 1;
}
/**
* Find out if page was saved.
*
* @param pos the position
* @return true if page has been saved
*/
public static boolean isPageSaved(long pos) {
return pos != 0;
}
/**
* Get the position of this page. The following information is encoded in
* the position: the chunk id, the offset, the maximum length, and the type
......@@ -1005,7 +1005,7 @@ public final class DataUtils {
* @return the parsed value
* @throws IllegalStateException if parsing fails
*/
public static int readHexInt(HashMap<String, ?> map, String key, int defaultValue) {
public static int readHexInt(Map<String, ?> map, String key, int defaultValue) {
Object v = map.get(key);
if (v == null) {
return defaultValue;
......
/*
* Copyright 2004-2018 H2 Group. Multiple-Licensed under the MPL 2.0,
* and the EPL 1.0 (http://h2database.com/html/license.html).
* Initial Developer: H2 Group
*/
package org.h2.mvstore;
import org.h2.mvstore.type.DataType;
import org.h2.mvstore.type.ObjectDataType;
/**
* A class used for backward compatibility.
*
* @param <K> the key type
* @param <V> the value type
*/
public class MVMapConcurrent<K, V> extends MVMap<K, V> {
public MVMapConcurrent(DataType keyType, DataType valueType) {
super(keyType, valueType);
}
/**
* A builder for this class.
*
* @param <K> the key type
* @param <V> the value type
*/
public static class Builder<K, V> implements
MapBuilder<MVMapConcurrent<K, V>, K, V> {
protected DataType keyType;
protected DataType valueType;
/**
* Create a new builder with the default key and value data types.
*/
public Builder() {
// ignore
}
/**
* Set the key data type.
*
* @param keyType the key type
* @return this
*/
public Builder<K, V> keyType(DataType keyType) {
this.keyType = keyType;
return this;
}
/**
* Set the key data type.
*
* @param valueType the key type
* @return this
*/
public Builder<K, V> valueType(DataType valueType) {
this.valueType = valueType;
return this;
}
@Override
public MVMapConcurrent<K, V> create() {
if (keyType == null) {
keyType = new ObjectDataType();
}
if (valueType == null) {
valueType = new ObjectDataType();
}
return new MVMapConcurrent<>(keyType, valueType);
}
}
}
......@@ -114,18 +114,16 @@ public class MVPrimaryIndex extends BaseIndex {
TransactionMap<Value, Value> map = getMap(session);
Value key = ValueLong.get(row.getKey());
Value old = map.getLatest(key);
if (old != null) {
String sql = "PRIMARY KEY ON " + table.getSQL();
if (mainIndexColumn >= 0 && mainIndexColumn < indexColumns.length) {
sql += "(" + indexColumns[mainIndexColumn].getSQL() + ")";
}
DbException e = DbException.get(ErrorCode.DUPLICATE_KEY_1, sql);
e.setSource(this);
throw e;
}
try {
map.put(key, ValueArray.get(row.getValueList()));
if (map.put(key, ValueArray.get(row.getValueList())) != null) {
String sql = "PRIMARY KEY ON " + table.getSQL();
if (mainIndexColumn >= 0 && mainIndexColumn < indexColumns.length) {
sql += "(" + indexColumns[mainIndexColumn].getSQL() + ")";
}
DbException e = DbException.get(ErrorCode.DUPLICATE_KEY_1, sql);
e.setSource(this);
throw e;
}
} catch (IllegalStateException e) {
throw mvTable.convertException(e);
}
......
......@@ -475,7 +475,7 @@ public class TransactionStore {
t.setStatus(Transaction.STATUS_CLOSED);
openTransactions.clear(t.transactionId);
if (oldStatus == Transaction.STATUS_PREPARED || store.getAutoCommitDelay() == 0) {
store.commit();
store.tryCommit();
return;
}
// to avoid having to store the transaction log,
......@@ -486,7 +486,7 @@ public class TransactionStore {
int max = store.getAutoCommitMemory();
// save at 3/4 capacity
if (unsaved * 4 > max * 3) {
store.commit();
store.tryCommit();
}
}
}
......@@ -681,6 +681,8 @@ public class TransactionStore {
private int status;
private MVStore.TxCounter txCounter;
private String name;
Transaction(TransactionStore store, int transactionId, int status,
......@@ -723,6 +725,19 @@ public class TransactionStore {
return logId;
}
public void markStatementStart() {
markStatementEnd();
txCounter = store.store.registerVersionUsage();
}
public void markStatementEnd() {
MVStore.TxCounter counter = txCounter;
txCounter = null;
if(counter != null) {
store.store.deregisterVersionUsage(counter);
}
}
/**
* Add a log entry.
*
......@@ -1183,16 +1198,6 @@ public class TransactionStore {
return get(key, readLogId);
}
/**
* Get the most recent value for the given key.
*
* @param key the key
* @return the value or null
*/
public V getLatest(K key) {
return get(key, Long.MAX_VALUE);
}
/**
* Whether the map contains the key.
*
......
......@@ -135,7 +135,6 @@ import org.h2.test.store.TestCacheConcurrentLIRS;
import org.h2.test.store.TestCacheLIRS;
import org.h2.test.store.TestCacheLongKeyLIRS;
import org.h2.test.store.TestConcurrent;
import org.h2.test.store.TestConcurrentLinkedList;
import org.h2.test.store.TestDataUtils;
import org.h2.test.store.TestFreeSpace;
import org.h2.test.store.TestKillProcessWhileWriting;
......@@ -882,7 +881,6 @@ kill -9 `jps -l | grep "org.h2.test." | cut -d " " -f 1`
addTest(new TestCacheConcurrentLIRS());
addTest(new TestCacheLIRS());
addTest(new TestCacheLongKeyLIRS());
addTest(new TestConcurrentLinkedList());
addTest(new TestDataUtils());
addTest(new TestFreeSpace());
addTest(new TestKillProcessWhileWriting());
......
......@@ -77,8 +77,6 @@ public class TestQueryCache extends TestBase {
prep = conn.prepareStatement(query);
} else if (i == 1001) {
first = time;
// try to avoid pauses in subsequent iterations
System.gc();
} else if (i > 1001) {
if (first > time) {
firstGreater++;
......
......@@ -9,7 +9,6 @@ import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.concurrent.atomic.AtomicBoolean;
import org.h2.api.ErrorCode;
import org.h2.test.TestBase;
import org.h2.util.Task;
......@@ -65,33 +64,26 @@ public class TestMvcc2 extends TestBase {
stat2.execute("set lock_timeout 1000");
stat.execute("create table test(id int primary key, name varchar)");
conn.setAutoCommit(false);
final AtomicBoolean committed = new AtomicBoolean(false);
Task t = new Task() {
@Override
public void call() throws SQLException {
public void call() {
try {
//System.out.println("insert2 hallo");
stat2.execute("insert into test values(0, 'Hallo')");
//System.out.println("insert2 hallo done");
fail();
} catch (SQLException e) {
//System.out.println("insert2 hallo e " + e);
if (!committed.get()) {
throw e;
}
assertTrue(e.toString(),
e.getErrorCode() == ErrorCode.DUPLICATE_KEY_1 ||
e.getErrorCode() == ErrorCode.CONCURRENT_UPDATE_1);
}
}
};
//System.out.println("insert hello");
stat.execute("insert into test values(0, 'Hello')");
t.execute();
Thread.sleep(500);
//System.out.println("insert hello commit");
committed.set(true);
conn.commit();
t.get();
ResultSet rs;
rs = stat.executeQuery("select name from test");
rs.next();
assertTrue(rs.next());
assertEquals("Hello", rs.getString(1));
stat.execute("drop table test");
conn2.close();
......@@ -111,16 +103,17 @@ public class TestMvcc2 extends TestBase {
@Override
public void call() throws SQLException {
stat2.execute("update test set name = 'Hallo'");
assertEquals(1, stat2.getUpdateCount());
}
};
stat.execute("update test set name = 'Hi'");
assertEquals(1, stat.getUpdateCount());
t.execute();
Thread.sleep(500);
conn.commit();
t.get();
ResultSet rs;
rs = stat.executeQuery("select name from test");
rs.next();
assertTrue(rs.next());
assertEquals("Hallo", rs.getString(1));
stat.execute("drop table test");
conn2.close();
......
......@@ -7,6 +7,7 @@ package org.h2.test.store;
import java.util.AbstractSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import org.h2.mvstore.MVMap;
......@@ -25,8 +26,8 @@ public class SequenceMap extends MVMap<Long, Long> {
*/
int max = 10;
public SequenceMap() {
super(null, null);
public SequenceMap(Map<String, Object> config) {
super(config);
}
@Override
......@@ -67,20 +68,11 @@ public class SequenceMap extends MVMap<Long, Long> {
/**
* A builder for this class.
*/
public static class Builder implements MapBuilder<SequenceMap, Long, Long> {
/**
* Create a new builder.
*/
public Builder() {
// ignore
}
public static class Builder extends MVMap.Builder<Long, Long> {
@Override
public SequenceMap create() {
return new SequenceMap();
public SequenceMap create(Map<String, Object> config) {
return new SequenceMap(config);
}
}
}
......@@ -296,7 +296,7 @@ public class TestMVRTree extends TestMVStore {
}
g2d.setColor(Color.red);
ArrayList<SpatialKey> list = New.arrayList();
r.addNodeKeys(list, r.getRoot());
r.addNodeKeys(list, r.getRootPage());
for (SpatialKey x : list) {
int[] rect = scale(b, x, width, height);
g2d.drawRect(rect[0], rect[1], rect[2] - rect[0], rect[3] - rect[1]);
......
......@@ -151,7 +151,7 @@ public class TestStreamStore extends TestBase {
long readCount = s.getFileStore().getReadCount();
// the read count should be low because new blocks
// are appended at the end (not between existing blocks)
assertTrue("rc: " + readCount, readCount < 15);
assertTrue("rc: " + readCount, readCount <= 17);
map = s.openMap("data");
assertTrue("size: " + map.size(), map.sizeAsLong() >= 200);
s.close();
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论