提交 47598ca2 authored 作者: Thomas Mueller Graf's avatar Thomas Mueller Graf

MVStore: add feature to set the cache concurrency (benchmark)

上级 4ee7e5fe
......@@ -9,10 +9,17 @@ import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.Statement;
import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.h2.mvstore.MVMap;
import org.h2.mvstore.MVStore;
import org.h2.store.FileLister;
import org.h2.store.fs.FileUtils;
import org.h2.test.TestBase;
import org.h2.util.Profiler;
import org.h2.util.Task;
/**
* Tests performance and helps analyze bottlenecks.
......@@ -30,6 +37,7 @@ public class TestBenchmark extends TestBase {
@Override
public void test() throws Exception {
testConcurrency();
// TODO this test is currently disabled
......@@ -41,6 +49,75 @@ public class TestBenchmark extends TestBase {
test(false);
}
private void testConcurrency() throws Exception {
// String fileName = getBaseDir() + "/" + getTestName();
String fileName = "nioMemFS:/" + getTestName();
FileUtils.delete(fileName);
MVStore store = new MVStore.Builder().cacheSize(16).
fileName(fileName).open();
MVMap<Integer, byte[]> map = store.openMap("test");
byte[] data = new byte[1024];
int count = 1000000;
for (int i = 0; i < count; i++) {
map.put(i, data);
}
store.close();
for (int concurrency = 1024; concurrency > 0; concurrency /= 2) {
testConcurrency(fileName, concurrency, count);
testConcurrency(fileName, concurrency, count);
testConcurrency(fileName, concurrency, count);
}
FileUtils.delete(fileName);
}
private void testConcurrency(String fileName, int concurrency, final int count) throws Exception {
Thread.sleep(1000);
final MVStore store = new MVStore.Builder().cacheSize(256).
cacheConcurrency(concurrency).
fileName(fileName).open();
int threadCount = 128;
final CountDownLatch wait = new CountDownLatch(1);
final AtomicInteger counter = new AtomicInteger();
final AtomicBoolean stopped = new AtomicBoolean();
Task[] tasks = new Task[threadCount];
// Profiler prof = new Profiler().startCollecting();
for (int i = 0; i < threadCount; i++) {
final int x = i;
Task t = new Task() {
@Override
public void call() throws Exception {
MVMap<Integer, byte[]> map = store.openMap("test");
Random random = new Random(x);
wait.await();
while (!stopped.get()) {
int key = random.nextInt(count);
byte[] data = map.get(key);
if (data.length > 1) {
counter.incrementAndGet();
}
}
}
};
t.execute("t" + i);
tasks[i] = t;
}
wait.countDown();
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
stopped.set(true);
for (Task t : tasks) {
t.get();
}
// System.out.println(prof.getTop(5));
String msg = "concurrency " + concurrency + " threads " + threadCount + " requests: " + counter;
System.out.println(msg);
trace(msg);
store.close();
}
private void test(boolean mvStore) throws Exception {
// testInsertSelect(mvStore);
// testBinary(mvStore);
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论