提交 6b6a28fd authored 作者: Thomas Mueller's avatar Thomas Mueller

A concurrent queue, to replace the array list of old roots (work in progress)

上级 9b92121d
...@@ -39,38 +39,39 @@ public class ConcurrentLinkedList<K> { ...@@ -39,38 +39,39 @@ public class ConcurrentLinkedList<K> {
} }
} }
public void removeFirst(K obj) { public boolean removeFirst(K obj) {
Entry<K> x = head; Entry<K> x = head;
if (x == null) { if (x == null || x.obj != obj) {
return; return false;
} }
if (x.obj.equals(obj)) { if (head == tail) {
if (head == tail) { tail = x.next;
tail = x.next;
}
head = x.next;
} }
head = x.next;
return true;
} }
public void removeLast(K obj) { public boolean removeLast(K obj) {
Entry<K> x = head; Entry<K> x = head;
if (x == null) { if (x == null) {
return; return false;
} }
Entry<K> prev = null; Entry<K> prev = null;
while (x.next != null) { while (x.next != null) {
prev = x; prev = x;
x = x.next; x = x.next;
} }
if (x.obj.equals(obj)) { if (x.obj != obj) {
if (prev != null) { return false;
prev.next = null; }
} if (prev != null) {
if (head == tail) { prev.next = null;
head = prev; }
} if (head == tail) {
tail = prev; head = prev;
} }
tail = prev;
return true;
} }
public Iterator<K> iterator() { public Iterator<K> iterator() {
......
/*
* Copyright 2004-2014 H2 Group. Multiple-Licensed under the MPL 2.0,
* and the EPL 1.0 (http://h2database.com/html/license.html).
* Initial Developer: H2 Group
*/
package org.h2.mvstore;
import java.util.Iterator;
/**
* A ring buffer that supports concurrent access.
*
* @param <K> the key type
*/
public class ConcurrentRing<K> {
K[] buffer;
volatile int readPos;
volatile int writePos;
@SuppressWarnings("unchecked")
public ConcurrentRing() {
buffer = (K[]) new Object[4];
}
public K peekFirst() {
return buffer[getIndex(readPos)];
}
public K peekLast() {
return buffer[getIndex(writePos - 1)];
}
public void add(K obj) {
buffer[getIndex(writePos)] = obj;
writePos++;
if (writePos - readPos >= buffer.length) {
// double the capacity
@SuppressWarnings("unchecked")
K[] b2 = (K[]) new Object[buffer.length * 2];
for (int i = readPos; i < writePos; i++) {
K x = buffer[getIndex(i)];
int i2 = i & b2.length - 1;
b2[i2] = x;
}
buffer = b2;
}
}
public boolean removeFirst(K obj) {
int p = readPos;
int idx = getIndex(p);
if (buffer[idx] != obj) {
return false;
}
buffer[idx] = null;
readPos = p + 1;
return true;
}
public boolean removeLast(K obj) {
int p = writePos;
int idx = getIndex(p - 1);
if (buffer[idx] != obj) {
return false;
}
buffer[idx] = null;
writePos = p - 1;
return true;
}
int getIndex(int pos) {
return pos & buffer.length - 1;
}
public Iterator<K> iterator() {
return new Iterator<K>() {
int pos = readPos;
@Override
public boolean hasNext() {
return pos != writePos;
}
@Override
public K next() {
return buffer[getIndex(pos++)];
}
@Override
public void remove() {
throw DataUtils.newUnsupportedOperationException("remove");
}
};
}
}
...@@ -54,8 +54,8 @@ public class MVMap<K, V> extends AbstractMap<K, V> ...@@ -54,8 +54,8 @@ public class MVMap<K, V> extends AbstractMap<K, V>
private final DataType keyType; private final DataType keyType;
private final DataType valueType; private final DataType valueType;
private ConcurrentLinkedList<Page> oldRootsList = private ConcurrentRing<Page> oldRoots =
new ConcurrentLinkedList<Page>(); new ConcurrentRing<Page>();
private boolean closed; private boolean closed;
private boolean readOnly; private boolean readOnly;
...@@ -720,9 +720,9 @@ public class MVMap<K, V> extends AbstractMap<K, V> ...@@ -720,9 +720,9 @@ public class MVMap<K, V> extends AbstractMap<K, V>
if (root != newRoot) { if (root != newRoot) {
removeUnusedOldVersions(); removeUnusedOldVersions();
if (root.getVersion() != newRoot.getVersion()) { if (root.getVersion() != newRoot.getVersion()) {
Page last = oldRootsList.peekLast(); Page last = oldRoots.peekLast();
if (last == null || last.getVersion() != root.getVersion()) { if (last == null || last.getVersion() != root.getVersion()) {
oldRootsList.add(root); oldRoots.add(root);
} }
} }
root = newRoot; root = newRoot;
...@@ -970,12 +970,12 @@ public class MVMap<K, V> extends AbstractMap<K, V> ...@@ -970,12 +970,12 @@ public class MVMap<K, V> extends AbstractMap<K, V>
// the map is removed later // the map is removed later
} else if (root.getVersion() >= version) { } else if (root.getVersion() >= version) {
while (true) { while (true) {
Page last = oldRootsList.peekLast(); Page last = oldRoots.peekLast();
if (last == null) { if (last == null) {
break; break;
} }
// slow, but rollback is not a common operation // slow, but rollback is not a common operation
oldRootsList.removeLast(last); oldRoots.removeLast(last);
root = last; root = last;
if (root.getVersion() < version) { if (root.getVersion() < version) {
break; break;
...@@ -995,16 +995,16 @@ public class MVMap<K, V> extends AbstractMap<K, V> ...@@ -995,16 +995,16 @@ public class MVMap<K, V> extends AbstractMap<K, V>
if (oldest == -1) { if (oldest == -1) {
return; return;
} }
Page last = oldRootsList.peekLast(); Page last = oldRoots.peekLast();
// TODO why is this? // TODO why is this?
; ;
oldest--; oldest--;
while (true) { while (true) {
Page p = oldRootsList.peekFirst(); Page p = oldRoots.peekFirst();
if (p == null || p.getVersion() >= oldest || p == last) { if (p == null || p.getVersion() >= oldest || p == last) {
break; break;
} }
oldRootsList.removeFirst(p); oldRoots.removeFirst(p);
} }
} }
...@@ -1166,12 +1166,12 @@ public class MVMap<K, V> extends AbstractMap<K, V> ...@@ -1166,12 +1166,12 @@ public class MVMap<K, V> extends AbstractMap<K, V>
store.getFileStore() == null)) { store.getFileStore() == null)) {
newest = r; newest = r;
} else { } else {
Page last = oldRootsList.peekFirst(); Page last = oldRoots.peekFirst();
if (last == null || version < last.getVersion()) { if (last == null || version < last.getVersion()) {
// smaller than all in-memory versions // smaller than all in-memory versions
return store.openMapVersion(version, id, this); return store.openMapVersion(version, id, this);
} }
Iterator<Page> it = oldRootsList.iterator(); Iterator<Page> it = oldRoots.iterator();
while (it.hasNext()) { while (it.hasNext()) {
Page p = it.next(); Page p = it.next();
if (p.getVersion() > version) { if (p.getVersion() > version) {
......
...@@ -8,8 +8,9 @@ package org.h2.test.store; ...@@ -8,8 +8,9 @@ package org.h2.test.store;
import java.util.Iterator; import java.util.Iterator;
import java.util.LinkedList; import java.util.LinkedList;
import java.util.Random; import java.util.Random;
import java.util.concurrent.atomic.AtomicInteger;
import org.h2.mvstore.ConcurrentLinkedList; import org.h2.mvstore.ConcurrentRing;
import org.h2.test.TestBase; import org.h2.test.TestBase;
import org.h2.util.Task; import org.h2.util.Task;
...@@ -47,18 +48,26 @@ public class TestConcurrentLinkedList extends TestBase { ...@@ -47,18 +48,26 @@ public class TestConcurrentLinkedList extends TestBase {
private void testPerformance(final boolean stock) { private void testPerformance(final boolean stock) {
System.out.print(stock ? "stock " : "custom "); System.out.print(stock ? "stock " : "custom ");
long start = System.currentTimeMillis(); long start = System.currentTimeMillis();
final ConcurrentLinkedList<Integer> test = new ConcurrentLinkedList<Integer>(); // final ConcurrentLinkedList<Integer> test = new ConcurrentLinkedList<Integer>();
final ConcurrentRing<Integer> test = new ConcurrentRing<Integer>();
final LinkedList<Integer> x = new LinkedList<Integer>(); final LinkedList<Integer> x = new LinkedList<Integer>();
final AtomicInteger counter = new AtomicInteger();
Task task = new Task() { Task task = new Task() {
@Override @Override
public void call() throws Exception { public void call() throws Exception {
while (!stop) { while (!stop) {
if (stock) { if (stock) {
synchronized (x) { synchronized (x) {
x.peekFirst(); Integer y = x.peekFirst();
if (y == null) {
counter.incrementAndGet();
}
} }
} else { } else {
test.peekFirst(); Integer y = test.peekFirst();
if (y == null) {
counter.incrementAndGet();
}
} }
} }
} }
...@@ -66,28 +75,29 @@ public class TestConcurrentLinkedList extends TestBase { ...@@ -66,28 +75,29 @@ public class TestConcurrentLinkedList extends TestBase {
task.execute(); task.execute();
test.add(-1); test.add(-1);
x.add(-1); x.add(-1);
for (int i = 0; i < 10000000; i++) { for (int i = 0; i < 2000000; i++) {
Integer value = i; Integer value = Integer.valueOf(i & 63);
if (stock) { if (stock) {
synchronized (x) { synchronized (x) {
Integer f = x.peekLast(); Integer f = x.peekLast();
if (!f.equals(value)) { if (f != value) {
x.add(i); x.add(i);
} }
} }
Math.sin(i);
synchronized (x) { synchronized (x) {
if (x.peek() != x.peekLast()) { if (x.peekFirst() != x.peekLast()) {
x.peek();
x.removeFirst(); x.removeFirst();
} }
} }
} else { } else {
Integer f = test.peekLast(); Integer f = test.peekLast();
if (!f.equals(value)) { if (f != value) {
test.add(i); test.add(i);
} }
if (test.peekFirst() != test.peekLast()) { Math.sin(i);
f = test.peekFirst(); f = test.peekFirst();
if (f != test.peekLast()) {
test.removeFirst(f); test.removeFirst(f);
} }
} }
...@@ -97,14 +107,40 @@ public class TestConcurrentLinkedList extends TestBase { ...@@ -97,14 +107,40 @@ public class TestConcurrentLinkedList extends TestBase {
} }
private void testConcurrent() { private void testConcurrent() {
// TODO Auto-generated method stub // final ConcurrentLinkedList<Integer> test = new ConcurrentLinkedList<Integer>();
final ConcurrentRing<Integer> test = new ConcurrentRing<Integer>();
final AtomicInteger counter = new AtomicInteger();
final AtomicInteger size = new AtomicInteger();
Task task = new Task() {
@Override
public void call() {
while (!stop) {
if (size.get() < 10) {
test.add(counter.getAndIncrement());
size.getAndIncrement();
}
}
}
};
task.execute();
for (int i = 0; i < 1000000;) {
Integer x = test.peekFirst();
if (x == null) {
continue;
}
assertEquals(i, x.intValue());
if (test.removeFirst(x)) {
size.getAndDecrement();
i++;
}
}
task.get();
} }
private void testRandomized() { private void testRandomized() {
Random r = new Random(0); Random r = new Random(0);
for (int i = 0; i < 100; i++) { for (int i = 0; i < 100; i++) {
ConcurrentLinkedList<Integer> test = new ConcurrentLinkedList<Integer>(); ConcurrentRing<Integer> test = new ConcurrentRing<Integer>();
LinkedList<Integer> x = new LinkedList<Integer>(); LinkedList<Integer> x = new LinkedList<Integer>();
StringBuilder buff = new StringBuilder(); StringBuilder buff = new StringBuilder();
for (int j = 0; j < 10000; j++) { for (int j = 0; j < 10000; j++) {
...@@ -120,7 +156,7 @@ public class TestConcurrentLinkedList extends TestBase { ...@@ -120,7 +156,7 @@ public class TestConcurrentLinkedList extends TestBase {
} }
case 1: { case 1: {
Integer value = x.peek(); Integer value = x.peek();
if (value != null && r.nextBoolean()) { if (value != null && (x.size() > 5 || r.nextBoolean())) {
buff.append("removeFirst\n"); buff.append("removeFirst\n");
x.removeFirst(); x.removeFirst();
test.removeFirst(value); test.removeFirst(value);
...@@ -132,7 +168,7 @@ public class TestConcurrentLinkedList extends TestBase { ...@@ -132,7 +168,7 @@ public class TestConcurrentLinkedList extends TestBase {
} }
case 2: { case 2: {
Integer value = x.peekLast(); Integer value = x.peekLast();
if (value != null && r.nextBoolean()) { if (value != null && (x.size() > 5 || r.nextBoolean())) {
buff.append("removeLast\n"); buff.append("removeLast\n");
x.removeLast(); x.removeLast();
test.removeLast(value); test.removeLast(value);
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论