提交 5e4ad762 authored 作者: Thomas Mueller's avatar Thomas Mueller

A persistent multi-version map: a utility to store and read streams

上级 c2961aff
......@@ -194,8 +194,13 @@ public class TestAll {
/*
Random test:
PIT test:
java org.pitest.mutationtest.MutationCoverageReport
--reportDir data --targetClasses org.h2.dev.store.btree.StreamStore*
--targetTests org.h2.test.store.TestStreamStore
--sourceDirs src/test,src/tools
Random test:
java15
cd h2database/h2/bin
del *.db
......
......@@ -6,16 +6,22 @@
*/
package org.h2.test.store;
// import static org.junit.Assert.*;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.atomic.AtomicInteger;
import org.h2.dev.store.btree.StreamStore;
import org.h2.test.TestBase;
import org.h2.util.IOUtils;
import org.h2.util.New;
import org.h2.util.StringUtils;
import org.junit.Test;
/**
* Test the stream store.
......@@ -33,42 +39,118 @@ public class TestStreamStore extends TestBase {
@Override
public void test() throws IOException {
testFormat();
testWithExistingData();
testSmall();
testWithFullMap();
testLoop();
}
private void testWithExistingData() throws IOException {
@Test
public void testFormat() throws IOException {
Map<Long, byte[]> map = New.hashMap();
StreamStore store = new StreamStore(map);
store.setMinBlockSize(10);
store.setMaxBlockSize(20);
store.setNextKey(123);
byte[] id = store.put(new ByteArrayInputStream(new byte[0]));
assertEquals("", StringUtils.convertBytesToHex(id));
id = store.put(new ByteArrayInputStream(new byte[1]));
assertEquals("0100", StringUtils.convertBytesToHex(id));
id = store.put(new ByteArrayInputStream(new byte[3]));
assertEquals("03000000", StringUtils.convertBytesToHex(id));
id = store.put(new ByteArrayInputStream(new byte[10]));
assertEquals("000a017b", StringUtils.convertBytesToHex(id));
id = store.put(new ByteArrayInputStream(new byte[100]));
int todoInefficient;
assertEquals("ffffffff0f500281010014028001", StringUtils.convertBytesToHex(id));
byte[] combined = StringUtils.convertHexToBytes("010a020b0c");
assertEquals(3, store.length(combined));
InputStream in = store.get(combined);
assertEquals(1, in.skip(1));
assertEquals(0x0b, in.read());
assertEquals(1, in.skip(1));
}
@Test
public void testWithExistingData() throws IOException {
final AtomicInteger tests = new AtomicInteger();
Map<Long, byte[]> map = new HashMap<Long, byte[]>() {
private static final long serialVersionUID = 1L;
public boolean containsKey(Object k) {
tests.incrementAndGet();
return super.containsKey(k);
}
};
StreamStore store = new StreamStore(map);
store.setMinBlockSize(10);
store.setMaxBlockSize(20);
store.setNextKey(0);
for (int i = 0; i < 10; i++) {
store.put(new ByteArrayInputStream(new byte[20]));
}
assertEquals(10, map.size());
assertEquals(10, tests.get());
for (int i = 0; i < 10; i++) {
map.containsKey(i);
}
assertEquals(20, tests.get());
store = new StreamStore(map);
store.setMinBlockSize(10);
store.setMaxBlockSize(20);
store.setNextKey(0);
for (int i = 0; i < 10; i++) {
assertEquals(0, store.getNextKey());
for (int i = 0; i < 5; i++) {
store.put(new ByteArrayInputStream(new byte[20]));
}
assertEquals(20, map.size());
for (int i = 0; i < 20; i++) {
assertEquals(88, tests.get());
assertEquals(15, store.getNextKey());
assertEquals(15, map.size());
for (int i = 0; i < 15; i++) {
map.containsKey(i);
}
}
private void testSmall() throws IOException {
@Test
public void testWithFullMap() throws IOException {
final AtomicInteger tests = new AtomicInteger();
Map<Long, byte[]> map = new HashMap<Long, byte[]>() {
private static final long serialVersionUID = 1L;
public boolean containsKey(Object k) {
tests.incrementAndGet();
if (((Long) k) < Long.MAX_VALUE / 2) {
// simulate a *very* full map
return true;
}
return super.containsKey(k);
}
};
StreamStore store = new StreamStore(map);
store.setMinBlockSize(10);
store.setMaxBlockSize(20);
store.setNextKey(0);
store.put(new ByteArrayInputStream(new byte[20]));
assertEquals(1, map.size());
assertEquals(64, tests.get());
assertEquals(Long.MAX_VALUE / 2 + 1, store.getNextKey());
}
@Test
public void testLoop() throws IOException {
Map<Long, byte[]> map = New.hashMap();
StreamStore store = new StreamStore(map);
assertEquals(256 * 1024, store.getMaxBlockSize());
assertEquals(256, store.getMinBlockSize());
store.setNextKey(0);
assertEquals(0, store.getNextKey());
for (int i = 0; i < 20; i++) {
test(store, 0, 128, i);
test(store, 10, 128, i);
......@@ -110,7 +192,11 @@ public class TestStreamStore extends TestBase {
InputStream in = store.get(id);
ByteArrayOutputStream out = new ByteArrayOutputStream();
IOUtils.copy(in, out);
assertEquals(data, out.toByteArray());
assertTrue(Arrays.equals(data, out.toByteArray()));
in = store.get(id);
in.close();
assertEquals(-1, in.read());
in = store.get(id);
assertEquals(0, in.skip(0));
......@@ -137,6 +223,16 @@ public class TestStreamStore extends TestBase {
in = store.get(id);
assertEquals(12, in.skip(12));
assertEquals(data[12] & 255, in.read());
long skipped = 0;
while (true) {
long s = in.skip(Integer.MAX_VALUE);
if (s == 0) {
break;
}
skipped += s;
}
assertEquals(length - 13, skipped);
assertEquals(-1, in.read());
}
store.remove(id);
......
......@@ -20,11 +20,9 @@ import org.h2.util.IOUtils;
* are stored in a map. Very small streams are inlined in the stream id.
* <p>
* The key of the map is a long (incremented for each stored block). The default
* initial value is the current time (milliseconds since 1970 UTC) at the time
* the stream store is created. The next key can also be set explicitly. Before
* storing blocks into the map, the stream store checks if there is already a
* block with the next key, and the key is incremented as necessary (this is a
* log(n) operation similar to binary search).
* initial value is 0. Before storing blocks into the map, the stream store
* checks if there is already a block with the next key, and if necessary
* searches the next free entry using a binary search (0 to Long.MAX_VALUE).
* <p>
* The format of the binary id is: An empty id represents 0 bytes of data.
* In-place data is encoded as the size (a variable size int), then the data. A
......@@ -40,7 +38,7 @@ public class StreamStore {
private final Map<Long, byte[]> map;
private int minBlockSize = 256;
private int maxBlockSize = 256 * 1024;
private final AtomicLong nextKey = new AtomicLong(System.currentTimeMillis());
private final AtomicLong nextKey = new AtomicLong();
/**
* Create a stream store instance.
......@@ -141,28 +139,18 @@ public class StreamStore {
if (!map.containsKey(key)) {
return key;
}
// search the next free id
// search the next free id using binary search
synchronized (this) {
key = nextKey.getAndIncrement();
// skip 1 at the beginning
long increment = 1;
while (true) {
if (map.containsKey(key + increment)) {
// skip double as many numbers
key += increment + 1;
if (increment < Integer.MAX_VALUE) {
increment += increment;
}
long low = key, high = Long.MAX_VALUE;
while (low < high) {
long x = (low + high) >>> 1;
if (map.containsKey(x)) {
low = x + 1;
} else {
// already past the end:
// skip half as many numbers
if (increment == 1) {
key++;
break;
}
increment /= 2;
high = x;
}
}
key = low;
nextKey.set(key + 1);
return key;
}
......@@ -246,15 +234,12 @@ public class StreamStore {
return true;
}
boolean isInPlace(ByteBuffer idBuffer) {
private static boolean isInPlace(ByteBuffer idBuffer) {
int lenInPlace = DataUtils.readVarInt(idBuffer);
if (lenInPlace > 0) {
idBuffer.position(idBuffer.position() + lenInPlace);
return true;
}
DataUtils.readVarLong(idBuffer);
int lenId = DataUtils.readVarInt(idBuffer);
idBuffer.position(idBuffer.position() + lenId);
return false;
}
......@@ -285,7 +270,6 @@ public class StreamStore {
private StreamStore store;
private byte[] oneByteBuffer;
private final byte[] id;
private ByteBuffer idBuffer;
private ByteArrayInputStream buffer;
private long skip;
......@@ -294,51 +278,52 @@ public class StreamStore {
Stream(StreamStore store, byte[] id) {
this.store = store;
this.id = id;
this.length = store.length(id);
reset();
}
@Override
public void reset() {
this.idBuffer = ByteBuffer.wrap(id);
}
@Override
public int read() throws IOException {
public int read() {
byte[] buffer = oneByteBuffer;
if (buffer == null) {
buffer = oneByteBuffer = new byte[1];
}
int len = read(buffer, 0, 1);
return len < 0 ? len : (buffer[0] & 255);
return len == -1 ? -1 : (buffer[0] & 255);
}
@Override
public long skip(long n) {
n = Math.min(length - pos, n);
skip += n;
pos += n;
if (n == 0) {
return 0;
}
if (buffer != null) {
return buffer.skip(n);
long s = buffer.skip(n);
if (s > 0) {
n = s;
} else {
buffer = null;
skip += n;
}
} else {
skip += n;
}
pos += n;
return n;
}
@Override
public void close() {
buffer = null;
idBuffer = null;
store = null;
idBuffer.position(idBuffer.limit());
pos = length;
}
@Override
public int read(byte[] b, int off, int len) {
while (true) {
if (buffer == null) {
if (store == null) {
return -1;
}
buffer = nextBuffer();
if (buffer == null) {
return -1;
......@@ -347,9 +332,6 @@ public class StreamStore {
int result = buffer.read(b, off, len);
if (result > 0) {
pos += result;
if (pos >= length) {
close();
}
return result;
}
buffer = null;
......@@ -393,7 +375,6 @@ public class StreamStore {
skip = 0;
return new ByteArrayInputStream(data, s, data.length - s);
}
close();
return null;
}
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论