提交 a77eb02a authored 作者: andrei's avatar andrei

remove batch API for now

fix issue #999
上级 45a1bba8
......@@ -21,14 +21,8 @@ import org.h2.mvstore.type.StringDataType;
/**
* A stored map.
* <p>
* Read operations can happen concurrently with all other
* All read and write operations can happen concurrently with all other
* operations, without risk of corruption.
* <p>
* Write operations first read the relevant area from disk to memory
* concurrently, and only then modify the data. The in-memory part of write
* operations is synchronized. For scalable concurrent in-memory write
* operations, the map should be split into multiple smaller sub-maps that are
* then synchronized independently.
*
* @param <K> the key class
* @param <V> the value class
......@@ -887,7 +881,7 @@ public class MVMap<K, V> extends AbstractMap<K, V>
* Forget those old versions that are no longer needed.
* @param rootReference to inspect
*/
void removeUnusedOldVersions(RootReference rootReference) {
private void removeUnusedOldVersions(RootReference rootReference) {
long oldest = store.getOldestVersionToKeep();
// We are trying to keep at least one previous version (if any) here.
// This is not really necessary, just need to mimic existing
......@@ -1129,7 +1123,7 @@ public class MVMap<K, V> extends AbstractMap<K, V>
return Page.createEmptyLeaf(this);
}
public Page createEmptyNode() {
protected Page createEmptyNode() {
return Page.createEmptyNode(this);
}
......@@ -1404,86 +1398,83 @@ public class MVMap<K, V> extends AbstractMap<K, V>
}
}
public interface EntryProcessor<K,V> {
boolean process(K key, V value);
}
public enum Decision { ABORT, REMOVE, PUT }
@SuppressWarnings("unchecked")
public static <K,V> void process(Page root, K from, EntryProcessor<K,V> entryProcessor) {
CursorPos cursorPos = Cursor.traverseDown(root, from);
CursorPos keeper = null;
while (true) {
Page page = cursorPos.page;
int index = cursorPos.index;
if (index >= (page.isLeaf() ? page.getKeyCount() : root.map.getChildPageCount(page))) {
CursorPos tmp = cursorPos;
cursorPos = cursorPos.parent;
tmp.parent = keeper;
keeper = tmp;
if(cursorPos == null) {
return;
public abstract static class DecisionMaker<V>
{
public static final DecisionMaker<Object> DEFAULT = new DecisionMaker<Object>() {
@Override
public Decision decide(Object existingValue, Object providedValue) {
return providedValue == null ? Decision.REMOVE : Decision.PUT;
}
} else {
while (!page.isLeaf()) {
page = page.getChildPage(index);
if (keeper == null) {
cursorPos = new CursorPos(page, 0, cursorPos);
} else {
CursorPos tmp = keeper;
keeper = keeper.parent;
tmp.parent = cursorPos;
tmp.page = page;
tmp.index = 0;
cursorPos = tmp;
@Override
public String toString() {
return "default";
}
index = 0;
};
public static final DecisionMaker<Object> PUT = new DecisionMaker<Object>() {
@Override
public Decision decide(Object existingValue, Object providedValue) {
return Decision.PUT;
}
K key = (K) page.getKey(index);
V value = (V) page.getValue(index);
if(entryProcessor.process(key, value)) {
return;
@Override
public String toString() {
return "put";
}
};
public static final DecisionMaker<Object> REMOVE = new DecisionMaker<Object>() {
@Override
public Decision decide(Object existingValue, Object providedValue) {
return Decision.REMOVE;
}
++cursorPos.index;
@Override
public String toString() {
return "remove";
}
};
private static final DecisionMaker<Object> IF_ABSENT = new DecisionMaker<Object>() {
@Override
public Decision decide(Object existingValue, Object providedValue) {
return existingValue == null ? Decision.PUT : Decision.ABORT;
}
public interface LeafProcessor
{
CursorPos locate(Page rootPage);
Page[] process(CursorPos pos);
CursorPos locateNext(Page rootPage);
void stepBack();
void confirmSuccess();
@Override
public String toString() {
return "if_absent";
}
};
public final void operateBatch(LeafProcessor processor) {
beforeWrite();
int attempt = 0;
RootReference rootReference = getRoot();
RootReference oldRootReference = null;
CursorPos pos;
while(true) {
if (rootReference.semaphore) {
Thread.yield();
rootReference = getRoot();
continue;
private static final DecisionMaker<Object> IF_PRESENT = new DecisionMaker<Object>() {
@Override
public Decision decide(Object existingValue, Object providedValue) {
return existingValue != null ? Decision.PUT : Decision.ABORT;
}
pos = processor.locate(rootReference.root);
if (pos == null) {
return;
@Override
public String toString() {
return "if_present";
}
Page replacement[] = processor.process(pos);
if (replacement == null) {
if(rootReference != getRoot()) {
processor.stepBack();
rootReference = getRoot();
continue;
};
public abstract Decision decide(V existingValue, V providedValue);
public <T extends V> T selectValue(T existingValue, T providedValue) {
return providedValue;
}
return;
public void reset() {}
}
public V operate(K key, V value, DecisionMaker<? super V> decisionMaker) {
beforeWrite();
int attempt = 0;
RootReference oldRootReference = null;
while(true) {
RootReference rootReference = getRoot();
int contention = 0;
if (oldRootReference != null) {
long updateAttemptCounter = rootReference.updateAttemptCounter - oldRootReference.updateAttemptCounter;
......@@ -1495,39 +1486,54 @@ public class MVMap<K, V> extends AbstractMap<K, V>
}
oldRootReference = rootReference;
++attempt;
CursorPos tip = pos;
CursorPos pos = traverseDown(rootReference.root, key);
Page p = pos.page;
int index = pos.index;
CursorPos tip = pos;
pos = pos.parent;
final V result = index < 0 ? null : (V)p.getValue(index);
Decision decision = decisionMaker.decide(result, value);
int unsavedMemory = 0;
boolean needUnlock = false;
try {
if (attempt > 4 && !(needUnlock = lockRoot(processor, rootReference, attempt, contention))) {
processor.stepBack();
rootReference = getRoot();
switch (decision) {
case ABORT:
if(rootReference != getRoot()) {
decisionMaker.reset();
continue;
}
int index;
switch (replacement.length) {
case 0:
if (pos != null) {
return result;
case REMOVE: {
if (index < 0) {
return null;
}
if (attempt > 2 && !(needUnlock = lockRoot(decisionMaker, rootReference, attempt, contention))) {
continue;
}
if (p.getTotalCount() == 1 && pos != null) {
p = pos.page;
index = pos.index;
pos = pos.parent;
assert p.getKeyCount() > 0;
if (p.getKeyCount() == 1) {
assert index <= 1;
p = p.getChildPage(1 - index);
break;
}
assert p.getKeyCount() > 1;
}
p = p.copy();
p.remove(index);
} else {
p = createEmptyLeaf();
}
break;
case 1:
p = replacement[0];
}
case PUT: {
if (attempt > 2 && !(needUnlock = lockRoot(decisionMaker, rootReference, attempt, contention))) {
continue;
}
value = decisionMaker.selectValue(result, value);
p = p.copy();
if (index < 0) {
p.insertLeaf(-index - 1, key, value);
int keyCount;
while ((keyCount = p.getKeyCount()) > store.getKeysPerPage() || p.getMemory() > store.getMaxPageSize()
&& keyCount > (p.isLeaf() ? 1 : 2)) {
......@@ -1554,9 +1560,12 @@ public class MVMap<K, V> extends AbstractMap<K, V>
p.setChild(index, split);
p.insertNode(index, k, c);
}
default:
} else {
p.setValue(index, value);
}
break;
}
}
unsavedMemory += p.getMemory();
while (pos != null) {
Page c = p;
......@@ -1566,15 +1575,11 @@ public class MVMap<K, V> extends AbstractMap<K, V>
unsavedMemory += p.getMemory();
pos = pos.parent;
}
if ((pos = processor.locateNext(p)) != null) {
continue; // Multi-node batch operation
}
if(needUnlock) {
unlockRoot(p, attempt);
needUnlock = false;
} else if(!updateRoot(rootReference, p, attempt)) {
processor.stepBack();
rootReference = getRoot();
decisionMaker.reset();
continue;
}
while (tip != null) {
......@@ -1584,8 +1589,7 @@ public class MVMap<K, V> extends AbstractMap<K, V>
if (store.getFileStore() != null) {
store.registerUnsavedPage(unsavedMemory);
}
processor.confirmSuccess();
break;
return result;
} finally {
if(needUnlock) {
unlockRoot(rootReference.root, attempt);
......@@ -1594,12 +1598,12 @@ public class MVMap<K, V> extends AbstractMap<K, V>
}
}
private boolean lockRoot(LeafProcessor processor, RootReference rootReference,
private boolean lockRoot(DecisionMaker<? super V> decisionMaker, RootReference rootReference,
int attempt, int contention) {
boolean success = root.compareAndSet(rootReference, new RootReference(rootReference));
boolean success = lockRoot(rootReference);
if (!success) {
processor.stepBack();
if(attempt > 8) {
decisionMaker.reset();
if(attempt > 4) {
if (attempt <= 24) {
Thread.yield();
} else {
......@@ -1612,6 +1616,11 @@ public class MVMap<K, V> extends AbstractMap<K, V>
return success;
}
private boolean lockRoot(RootReference rootReference) {
return !rootReference.semaphore
&& root.compareAndSet(rootReference, new RootReference(rootReference));
}
private void unlockRoot(Page newRoot, int attempt) {
boolean success;
do {
......@@ -1635,154 +1644,6 @@ public class MVMap<K, V> extends AbstractMap<K, V>
return new CursorPos(p, p.binarySearch(key), pos);
}
public static final class SingleDecisionMaker<K,V> implements LeafProcessor
{
private final DecisionMaker<? super V> decisionMaker;
private final K key;
private final V value;
private V result;
public SingleDecisionMaker(K key, V value, DecisionMaker<? super V> decisionMaker) {
this.decisionMaker = decisionMaker;
this.key = key;
this.value = value;
}
@Override
public CursorPos locate(Page rootPage) {
return traverseDown(rootPage, key);
}
@Override
public CursorPos locateNext(Page rootPage) {
return null;
}
@Override
public void stepBack() {
decisionMaker.reset();
}
@Override
public void confirmSuccess() {}
@SuppressWarnings("unchecked")
@Override
public Page[] process(CursorPos pos) {
Page leaf = pos.page;
int index = pos.index;
result = index < 0 ? null : (V)leaf.getValue(index);
Decision decision = decisionMaker.decide(result, value);
switch (decision) {
case ABORT:
return null;
case REMOVE: {
if (index < 0) {
return null;
}
if (leaf.getTotalCount() == 1) {
return new Page[0];
}
leaf = leaf.copy();
leaf.remove(index);
return new Page[] { leaf };
}
case PUT: {
V v = decisionMaker.selectValue(result, value);
leaf = leaf.copy();
if (index < 0) {
leaf.insertLeaf(-index - 1, key, v);
} else {
leaf.setValue(index, v);
}
return new Page[] { leaf };
}
default:
return null;
}
}
public V getResult() {
return result;
}
}
public enum Decision { ABORT, REMOVE, PUT }
public abstract static class DecisionMaker<V> {
public static final DecisionMaker<Object> DEFAULT = new DecisionMaker<Object>() {
@Override
public Decision decide(Object existingValue, Object providedValue) {
return providedValue == null ? Decision.REMOVE : Decision.PUT;
}
@Override
public String toString() {
return "default";
}
};
public static final DecisionMaker<Object> PUT = new DecisionMaker<Object>() {
@Override
public Decision decide(Object existingValue, Object providedValue) {
return Decision.PUT;
}
@Override
public String toString() {
return "put";
}
};
public static final DecisionMaker<Object> REMOVE = new DecisionMaker<Object>() {
@Override
public Decision decide(Object existingValue, Object providedValue) {
return Decision.REMOVE;
}
@Override
public String toString() {
return "remove";
}
};
private static final DecisionMaker<Object> IF_ABSENT = new DecisionMaker<Object>() {
@Override
public Decision decide(Object existingValue, Object providedValue) {
return existingValue == null ? Decision.PUT : Decision.ABORT;
}
@Override
public String toString() {
return "if_absent";
}
};
private static final DecisionMaker<Object> IF_PRESENT = new DecisionMaker<Object>() {
@Override
public Decision decide(Object existingValue, Object providedValue) {
return existingValue != null ? Decision.PUT : Decision.ABORT;
}
@Override
public String toString() {
return "if_present";
}
};
public abstract Decision decide(V existingValue, V providedValue);
public <T extends V> T selectValue(T existingValue, T providedValue) {
return providedValue;
}
public void reset() {}
}
public V operate(K key, V value, DecisionMaker<? super V> decisionMaker) {
SingleDecisionMaker<K, V> processor = new SingleDecisionMaker<>(key, value, decisionMaker);
operateBatch(processor);
return processor.getResult();
}
private static final class EqualsDecisionMaker<V> extends DecisionMaker<V> {
private final DataType dataType;
private final V expectedValue;
......
......@@ -114,8 +114,8 @@ public class MVPrimaryIndex extends BaseIndex {
TransactionMap<Value, Value> map = getMap(session);
Value key = ValueLong.get(row.getKey());
Value old = map.getLatest(key);
if (old != null) {
try {
if (map.put(key, ValueArray.get(row.getValueList())) != null) {
String sql = "PRIMARY KEY ON " + table.getSQL();
if (mainIndexColumn >= 0 && mainIndexColumn < indexColumns.length) {
sql += "(" + indexColumns[mainIndexColumn].getSQL() + ")";
......@@ -124,8 +124,6 @@ public class MVPrimaryIndex extends BaseIndex {
e.setSource(this);
throw e;
}
try {
map.put(key, ValueArray.get(row.getValueList()));
} catch (IllegalStateException e) {
throw mvTable.convertException(e);
}
......
......@@ -1198,16 +1198,6 @@ public class TransactionStore {
return get(key, readLogId);
}
/**
* Get the most recent value for the given key.
*
* @param key the key
* @return the value or null
*/
public V getLatest(K key) {
return get(key, Long.MAX_VALUE);
}
/**
* Whether the map contains the key.
*
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论