Unverified 提交 8c10350d authored 作者: Andrei Tokar's avatar Andrei Tokar 提交者: GitHub

Merge pull request #1250 from h2database/batch-append

Batch append mode for MVMap
......@@ -8,6 +8,7 @@ package org.h2.mvstore;
import java.util.AbstractList;
import java.util.AbstractMap;
import java.util.AbstractSet;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
......@@ -45,6 +46,11 @@ public class MVMap<K, V> extends AbstractMap<K, V>
private final long createVersion;
private final DataType keyType;
private final DataType valueType;
private final int keysPerPage;
private final boolean singleWriter;
private final K keysBuffer[];
private final V valuesBuffer[];
/**
* Whether the map is closed. Volatile so we don't accidentally write to a
......@@ -66,7 +72,9 @@ public class MVMap<K, V> extends AbstractMap<K, V>
(DataType) config.get("val"),
DataUtils.readHexInt(config, "id", 0),
DataUtils.readHexLong(config, "createVersion", 0),
new AtomicReference<RootReference>()
new AtomicReference<RootReference>(),
((MVStore) config.get("store")).getKeysPerPage(),
config.containsKey("singleWriter") && (Boolean) config.get("singleWriter")
);
setInitialRoot(createEmptyLeaf(), store.getCurrentVersion());
}
......@@ -74,23 +82,29 @@ public class MVMap<K, V> extends AbstractMap<K, V>
// constructor for cloneIt()
protected MVMap(MVMap<K, V> source) {
this(source.store, source.keyType, source.valueType, source.id, source.createVersion,
new AtomicReference<>(source.root.get()));
new AtomicReference<>(source.root.get()), source.keysPerPage, source.singleWriter);
}
// meta map constructor
MVMap(MVStore store) {
this(store, StringDataType.INSTANCE,StringDataType.INSTANCE, 0, 0, new AtomicReference<RootReference>());
this(store, StringDataType.INSTANCE,StringDataType.INSTANCE, 0, 0, new AtomicReference<RootReference>(),
store.getKeysPerPage(), false);
setInitialRoot(createEmptyLeaf(), store.getCurrentVersion());
}
@SuppressWarnings("unchecked")
private MVMap(MVStore store, DataType keyType, DataType valueType, int id, long createVersion,
AtomicReference<RootReference> root) {
AtomicReference<RootReference> root, int keysPerPage, boolean singleWriter) {
this.store = store;
this.id = id;
this.createVersion = createVersion;
this.keyType = keyType;
this.valueType = valueType;
this.root = root;
this.keysPerPage = keysPerPage;
this.keysBuffer = singleWriter ? (K[]) new Object[keysPerPage] : null;
this.valuesBuffer = singleWriter ? (V[]) new Object[keysPerPage] : null;
this.singleWriter = singleWriter;
}
protected MVMap<K, V> cloneIt() {
......@@ -947,13 +961,15 @@ public class MVMap<K, V> extends AbstractMap<K, V>
* @return the number of entries
*/
public final long sizeAsLong() {
return getRootPage().getTotalCount();
RootReference rootReference = getRoot();
return rootReference.root.getTotalCount() + rootReference.getAppendCounter();
}
@Override
public boolean isEmpty() {
Page rootPage = getRootPage();
return rootPage.isLeaf() && rootPage.getKeyCount() == 0;
RootReference rootReference = getRoot();
Page rootPage = rootReference.root;
return rootPage.isLeaf() && rootPage.getKeyCount() == 0 && rootReference.getAppendCounter() == 0;
}
public final long getCreateVersion() {
......@@ -1037,6 +1053,10 @@ public class MVMap<K, V> extends AbstractMap<K, V>
return getVersion() > version;
}
public boolean isSingleWriter() {
return singleWriter;
}
/**
* Get the child page count for this page. This is to allow another map
* implementation to override the default, in case the last child is not to
......@@ -1091,6 +1111,7 @@ public class MVMap<K, V> extends AbstractMap<K, V>
}
return rootReference;
}
rootReference = flushAppendBuffer(rootReference);
RootReference updatedRootReference = new RootReference(rootReference, writeVersion, ++attempt);
if(root.compareAndSet(rootReference, updatedRootReference)) {
removeUnusedOldVersions(updatedRootReference);
......@@ -1148,6 +1169,151 @@ public class MVMap<K, V> extends AbstractMap<K, V>
return target;
}
public RootReference flushAppendBuffer() {
return flushAppendBuffer(null);
}
private RootReference flushAppendBuffer(RootReference rootReference) {
int attempt = 0;
while(true) {
if (rootReference == null) {
rootReference = getRoot();
}
int keyCount = rootReference.getAppendCounter();
if (keyCount == 0) {
break;
}
Page page = Page.create(this,
Arrays.copyOf(keysBuffer, keyCount),
Arrays.copyOf(valuesBuffer, keyCount),
null, keyCount, 0);
rootReference = appendLeafPage(rootReference, page, ++attempt);
if (rootReference != null) {
break;
}
}
assert rootReference.getAppendCounter() == 0;
return rootReference;
}
private RootReference appendLeafPage(RootReference rootReference, Page split, int attempt) {
CursorPos pos = rootReference.root.getAppendCursorPos(null);
assert split.map == this;
assert pos != null;
assert split.getKeyCount() > 0;
Object key = split.getKey(0);
assert pos.index < 0 : pos.index;
int index = -pos.index - 1;
assert index == pos.page.getKeyCount() : index + " != " + pos.page.getKeyCount();
Page p = pos.page;
pos = pos.parent;
CursorPos tip = pos;
int unsavedMemory = 0;
while (true) {
if (pos == null) {
if (p.getKeyCount() == 0) {
p = split;
} else {
Object keys[] = new Object[] { key };
Page.PageReference children[] = new Page.PageReference[store.getKeysPerPage() + 1];
children[0] = new Page.PageReference(p);
children[1] = new Page.PageReference(split);
p = Page.create(this, keys, null, children, p.getTotalCount() + split.getTotalCount(), 0);
}
break;
}
Page c = p;
p = pos.page;
index = pos.index;
pos = pos.parent;
p = p.copy();
p.setChild(index, split);
p.insertNode(index, key, c);
int keyCount;
if ((keyCount = p.getKeyCount()) <= store.getKeysPerPage() && (p.getMemory() < store.getMaxPageSize() || keyCount <= (p.isLeaf() ? 1 : 2))) {
break;
}
int at = keyCount - 2;
key = p.getKey(at);
split = p.split(at);
unsavedMemory += p.getMemory() + split.getMemory();
}
unsavedMemory += p.getMemory();
while (pos != null) {
Page c = p;
p = pos.page;
p = p.copy();
p.setChild(pos.index, c);
unsavedMemory += p.getMemory();
pos = pos.parent;
}
RootReference updatedRootReference = new RootReference(rootReference, p, ++attempt);
if(root.compareAndSet(rootReference, updatedRootReference)) {
while (tip != null) {
tip.page.removePage();
tip = tip.parent;
}
if (store.getFileStore() != null) {
store.registerUnsavedPage(unsavedMemory);
}
return updatedRootReference;
}
return null;
}
/**
* Appends entry to this map. this method is NOT thread safe and can not be used
* neither concurrently, nor in combination with any method that updates this map.
* Non-updating method may be used concurrently, but latest appended values
* are not guaranteed to be visible.
* @param key should be higher in map's order than any existing key
* @param value to be appended
*/
public void append(K key, V value) {
int attempt = 0;
boolean success = false;
while(!success) {
RootReference rootReference = getRoot();
int appendCounter = rootReference.getAppendCounter();
if (appendCounter >= keysPerPage) {
rootReference = flushAppendBuffer(rootReference);
appendCounter = rootReference.getAppendCounter();
assert appendCounter < keysPerPage;
}
keysBuffer[appendCounter] = key;
valuesBuffer[appendCounter] = value;
RootReference updatedRootReference = new RootReference(rootReference, appendCounter + 1, ++attempt);
success = root.compareAndSet(rootReference, updatedRootReference);
}
}
/**
* Removes last entry from this map. this method is NOT thread safe and can not be used
* neither concurrently, nor in combination with any method that updates this map.
* Non-updating method may be used concurrently, but latest removal may not be visible.
*/
public void trimLast() {
int attempt = 0;
boolean success;
do {
RootReference rootReference = getRoot();
int appendCounter = rootReference.getAppendCounter();
if (appendCounter > 0) {
RootReference updatedRootReference = new RootReference(rootReference, appendCounter - 1, ++attempt);
success = root.compareAndSet(rootReference, updatedRootReference);
} else {
assert rootReference.root.getKeyCount() > 0;
Page lastLeaf = rootReference.root.getAppendCursorPos(null).page;
assert lastLeaf.isLeaf();
assert lastLeaf.getKeyCount() > 0;
Object key = lastLeaf.getKey(lastLeaf.getKeyCount() - 1);
success = remove(key) != null;
assert success;
}
} while(!success);
}
@Override
public final String toString() {
return asString(null);
......@@ -1179,6 +1345,10 @@ public class MVMap<K, V> extends AbstractMap<K, V>
* Counter for attempted root updates.
*/
public final long updateAttemptCounter;
/**
* Size of the occupied part of the append buffer.
*/
public final byte appendCounter;
RootReference(Page root, long version, RootReference previous,
long updateCounter, long updateAttemptCounter,
......@@ -1189,6 +1359,7 @@ public class MVMap<K, V> extends AbstractMap<K, V>
this.updateCounter = updateCounter;
this.updateAttemptCounter = updateAttemptCounter;
this.lockedForUpdate = lockedForUpdate;
this.appendCounter = 0;
}
// This one is used for locking
......@@ -1199,6 +1370,7 @@ public class MVMap<K, V> extends AbstractMap<K, V>
this.updateCounter = r.updateCounter;
this.updateAttemptCounter = r.updateAttemptCounter;
this.lockedForUpdate = true;
this.appendCounter = 0;
}
// This one is used for unlocking
......@@ -1209,6 +1381,7 @@ public class MVMap<K, V> extends AbstractMap<K, V>
this.updateCounter = r.updateCounter + 1;
this.updateAttemptCounter = r.updateAttemptCounter + attempt;
this.lockedForUpdate = false;
this.appendCounter = 0;
}
// This one is used for version change
......@@ -1224,6 +1397,7 @@ public class MVMap<K, V> extends AbstractMap<K, V>
this.updateCounter = r.updateCounter + 1;
this.updateAttemptCounter = r.updateAttemptCounter + attempt;
this.lockedForUpdate = r.lockedForUpdate;
this.appendCounter = r.appendCounter;
}
// This one is used for r/o snapshots
......@@ -1234,6 +1408,22 @@ public class MVMap<K, V> extends AbstractMap<K, V>
this.updateCounter = 1;
this.updateAttemptCounter = 1;
this.lockedForUpdate = false;
this.appendCounter = 0;
}
// This one is used for append buffer maintance
RootReference(RootReference r, int appendCounter, int attempt) {
this.root = r.root;
this.version = r.version;
this.previous = r.previous;
this.updateCounter = r.updateCounter + 1;
this.updateAttemptCounter = r.updateAttemptCounter + attempt;
this.lockedForUpdate = r.lockedForUpdate;
this.appendCounter = (byte)appendCounter;
}
public int getAppendCounter() {
return appendCounter & 0xff;
}
@Override
......@@ -1357,6 +1547,7 @@ public class MVMap<K, V> extends AbstractMap<K, V>
* @param <V> the value type
*/
public static class Builder<K, V> extends BasicBuilder<MVMap<K, V>, K, V> {
private boolean singleWriter;
public Builder() {}
......@@ -1372,8 +1563,20 @@ public class MVMap<K, V> extends AbstractMap<K, V>
return this;
}
/**
* Set up this Builder to produce MVMap, which can be used in append mode
* by a single thread.
* @see MVMap#append(Object, Object)
* @return this Builder for chained execution
*/
public Builder<K,V> singleWriter() {
singleWriter = true;
return this;
}
@Override
protected MVMap<K, V> create(Map<String, Object> config) {
config.put("singleWriter", singleWriter);
Object type = config.get("type");
if(type == null || type.equals("rtree")) {
return new MVMap<>(config);
......
......@@ -836,6 +836,8 @@ public abstract class Page implements Cloneable
}
}
public abstract CursorPos getAppendCursorPos(CursorPos cursorPos);
public abstract void removeAllRecursive();
private Object[] createKeyStorage(int size)
......@@ -1083,6 +1085,13 @@ public abstract class Page implements Cloneable
removePage();
}
@Override
public CursorPos getAppendCursorPos(CursorPos cursorPos) {
int keyCount = getKeyCount();
Page childPage = getChildPage(keyCount);
return childPage.getAppendCursorPos(new CursorPos(this, keyCount, cursorPos));
}
@Override
protected void readPayLoad(ByteBuffer buff) {
int keyCount = getKeyCount();
......@@ -1322,6 +1331,12 @@ public abstract class Page implements Cloneable
removePage();
}
@Override
public CursorPos getAppendCursorPos(CursorPos cursorPos) {
int keyCount = getKeyCount();
return new CursorPos(this, -keyCount - 1, cursorPos);
}
@Override
protected void readPayLoad(ByteBuffer buff) {
int keyCount = getKeyCount();
......
......@@ -19,6 +19,7 @@ import org.h2.index.Cursor;
import org.h2.index.IndexType;
import org.h2.message.DbException;
import org.h2.mvstore.MVMap;
import org.h2.mvstore.MVStore;
import org.h2.mvstore.tx.Transaction;
import org.h2.mvstore.tx.TransactionMap;
import org.h2.result.Row;
......@@ -79,8 +80,9 @@ public final class MVSecondaryIndex extends BaseIndex implements MVIndex {
MVMap<ValueArray, Value> map = openMap(bufferName);
for (Row row : rows) {
ValueArray key = convertToKey(row);
map.put(key, ValueNull.INSTANCE);
map.append(key, ValueNull.INSTANCE);
}
map.flushAppendBuffer();
}
private static final class Source {
......@@ -154,9 +156,9 @@ public final class MVSecondaryIndex extends BaseIndex implements MVIndex {
}
}
} finally {
MVStore store = database.getMvStore().getStore();
for (String tempMapName : bufferNames) {
MVMap<ValueArray, Value> map = openMap(tempMapName);
map.getStore().removeMap(map);
store.removeMap(tempMapName);
}
}
}
......@@ -171,7 +173,9 @@ public final class MVSecondaryIndex extends BaseIndex implements MVIndex {
database.getCompareMode(), database, sortTypes);
ValueDataType valueType = new ValueDataType(null, null, null);
MVMap.Builder<ValueArray, Value> builder =
new MVMap.Builder<ValueArray, Value>().keyType(keyType).valueType(valueType);
new MVMap.Builder<ValueArray, Value>()
.singleWriter()
.keyType(keyType).valueType(valueType);
MVMap<ValueArray, Value> map = database.getMvStore().
getStore().openMap(mapName, builder);
if (!keyType.equals(map.getKeyType())) {
......
......@@ -300,7 +300,7 @@ public class Transaction {
}
int currentStatus = getStatus(currentState);
checkOpen(currentStatus);
store.removeUndoLogRecord(transactionId, logId);
store.removeUndoLogRecord(transactionId);
}
/**
......
......@@ -88,9 +88,9 @@ public class TransactionMap<K, V> {
for (int i = opentransactions.nextSetBit(0); i >= 0; i = opentransactions.nextSetBit(i+1)) {
MVMap<Long, Object[]> undoLog = store.undoLogs[i];
if (undoLog != null) {
MVMap.RootReference rootReference = undoLog.getRoot();
MVMap.RootReference rootReference = undoLog.flushAppendBuffer();
undoLogRootReferences[i] = rootReference;
undoLogSize += rootReference.root.getTotalCount();
undoLogSize += rootReference.root.getTotalCount() + rootReference.getAppendCounter();
}
}
} while(committingTransactions != store.committingTransactions.get() ||
......
......@@ -16,6 +16,7 @@ import org.h2.mvstore.Cursor;
import org.h2.mvstore.DataUtils;
import org.h2.mvstore.MVMap;
import org.h2.mvstore.MVStore;
import org.h2.mvstore.Page;
import org.h2.mvstore.WriteBuffer;
import org.h2.mvstore.type.DataType;
import org.h2.mvstore.type.ObjectDataType;
......@@ -132,7 +133,9 @@ public class TransactionStore {
ArrayType undoLogValueType = new ArrayType(new DataType[]{
new ObjectDataType(), dataType, oldValueType
});
undoLogBuilder = new MVMap.Builder<Long, Object[]>().valueType(undoLogValueType);
undoLogBuilder = new MVMap.Builder<Long, Object[]>()
.singleWriter()
.valueType(undoLogValueType);
}
/**
......@@ -391,24 +394,16 @@ public class TransactionStore {
"is still open: {0}",
transactionId);
}
undoLog.put(undoKey, undoLogRecord);
undoLog.append(undoKey, undoLogRecord);
return undoKey;
}
/**
* Remove an undo log entry.
* @param transactionId id of the transaction
* @param logId sequential number of the log record within transaction
*/
public void removeUndoLogRecord(int transactionId, long logId) {
Long undoKey = getOperationId(transactionId, logId);
Object[] old = undoLogs[transactionId].remove(undoKey);
if (old == null) {
throw DataUtils.newIllegalStateException(
DataUtils.ERROR_TRANSACTION_ILLEGAL_STATE,
"Transaction {0} was concurrently rolled back",
transactionId);
}
void removeUndoLogRecord(int transactionId) {
undoLogs[transactionId].trimLast();
}
/**
......@@ -442,7 +437,9 @@ public class TransactionStore {
store.renameMap(undoLog, getUndoLogName(true, transactionId));
}
try {
Cursor<Long, Object[]> cursor = undoLog.cursor(null);
MVMap.RootReference rootReference = undoLog.flushAppendBuffer();
Page rootPage = rootReference.root;
Cursor<Long, Object[]> cursor = new Cursor<>(rootPage, null);
while (cursor.hasNext()) {
Long undoKey = cursor.next();
Object[] op = cursor.getValue();
......@@ -597,6 +594,7 @@ public class TransactionStore {
void rollbackTo(Transaction t, long maxLogId, long toLogId) {
int transactionId = t.getId();
MVMap<Long, Object[]> undoLog = undoLogs[transactionId];
undoLog.flushAppendBuffer();
RollbackDecisionMaker decisionMaker = new RollbackDecisionMaker(this, transactionId, toLogId, t.listener);
for (long logId = maxLogId - 1; logId >= toLogId; logId--) {
Long undoKey = getOperationId(transactionId, logId);
......@@ -618,6 +616,7 @@ public class TransactionStore {
final long toLogId) {
final MVMap<Long, Object[]> undoLog = undoLogs[t.getId()];
undoLog.flushAppendBuffer();
return new Iterator<Change>() {
private long logId = maxLogId - 1;
......
......@@ -509,7 +509,7 @@ public class TestMultiThread extends TestDb implements Runnable {
@Override
public void run() {
try {
Connection c = getConnection("concurrentUpdate2");
Connection c = getConnection("concurrentUpdate2;LOCK_TIMEOUT=10000");
PreparedStatement ps = c.prepareStatement("UPDATE TEST SET V = ? WHERE " + column + " = ?");
for (int test = 0; test < 1000; test++) {
for (int i = 0; i < 16; i++) {
......
......@@ -76,7 +76,7 @@ public class TestMVTableEngine extends TestDb {
testMinMaxWithNull();
testTimeout();
testExplainAnalyze();
testTransactionLogUsuallyNotStored();
testTransactionLogEmptyAfterCommit();
testShrinkDatabaseFile();
testTwoPhaseCommit();
testRecover();
......@@ -642,42 +642,39 @@ public class TestMVTableEngine extends TestDb {
conn.close();
}
private void testTransactionLogUsuallyNotStored() throws Exception {
private void testTransactionLogEmptyAfterCommit() throws Exception {
Connection conn;
Statement stat;
// we expect the transaction log is empty in at least some of the cases
for (int test = 0; test < 5; test++) {
deleteDb(getTestName());
String url = getTestName() + ";MV_STORE=TRUE";
url = getURL(url, true);
conn = getConnection(url);
stat = conn.createStatement();
stat.execute("create table test(id identity, name varchar)");
conn.setAutoCommit(false);
PreparedStatement prep = conn.prepareStatement(
"insert into test(name) values(space(10000))");
for (int j = 0; j < 100; j++) {
for (int i = 0; i < 100; i++) {
prep.execute();
}
conn.commit();
}
stat.execute("shutdown immediately");
JdbcUtils.closeSilently(conn);
String file = getBaseDir() + "/" + getTestName() +
Constants.SUFFIX_MV_FILE;
MVStore store = MVStore.open(file);
TransactionStore t = new TransactionStore(store);
t.init();
int openTransactions = t.getOpenTransactions().size();
store.close();
if (openTransactions == 0) {
return;
deleteDb(getTestName());
String url = getTestName() + ";MV_STORE=TRUE";
url = getURL(url, true);
conn = getConnection(url);
stat = conn.createStatement();
stat.execute("create table test(id identity, name varchar)");
stat.execute("set write_delay 0");
conn.setAutoCommit(false);
PreparedStatement prep = conn.prepareStatement(
"insert into test(name) values(space(10000))");
for (int j = 0; j < 100; j++) {
for (int i = 0; i < 100; i++) {
prep.execute();
}
conn.commit();
}
stat.execute("shutdown immediately");
JdbcUtils.closeSilently(conn);
String file = getBaseDir() + "/" + getTestName() +
Constants.SUFFIX_MV_FILE;
MVStore store = MVStore.open(file);
TransactionStore t = new TransactionStore(store);
t.init();
int openTransactions = t.getOpenTransactions().size();
store.close();
if (openTransactions != 0) {
fail("transaction log was not empty");
}
fail("transaction log was never empty");
}
private void testShrinkDatabaseFile() throws Exception {
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论