提交 53549619 authored 作者: Thomas Mueller's avatar Thomas Mueller

A persistent multi-version map: a utility to store and read streams

上级 ada103af
......@@ -111,6 +111,7 @@ import org.h2.test.store.TestDataUtils;
import org.h2.test.store.TestMVStore;
import org.h2.test.store.TestMVRTree;
import org.h2.test.store.TestObjectType;
import org.h2.test.store.TestStreamStore;
import org.h2.test.synth.TestBtreeIndex;
import org.h2.test.synth.TestCrashAPI;
import org.h2.test.synth.TestDiskFull;
......@@ -677,6 +678,7 @@ kill -9 `jps -l | grep "org.h2.test." | cut -d " " -f 1`
new TestMVRTree().runTest(this);
new TestMVStore().runTest(this);
new TestObjectType().runTest(this);
new TestStreamStore().runTest(this);
// unit
new TestAutoReconnect().runTest(this);
......
......@@ -5,6 +5,8 @@
*/
package org.h2.test.store;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.HashMap;
import org.h2.dev.store.btree.DataUtils;
......@@ -96,13 +98,31 @@ public class TestDataUtils extends TestBase {
buff.rewind();
assertEquals(-1, DataUtils.readVarLong(buff));
assertEquals(10, buff.position());
buff.clear();
testVarIntVarLong(buff, DataUtils.COMPRESSED_VAR_INT_MAX);
testVarIntVarLong(buff, DataUtils.COMPRESSED_VAR_INT_MAX + 1);
testVarIntVarLong(buff, DataUtils.COMPRESSED_VAR_LONG_MAX);
testVarIntVarLong(buff, DataUtils.COMPRESSED_VAR_LONG_MAX + 1);
}
private void testVarIntVarLong(ByteBuffer buff, long x) {
int len;
byte[] data;
try {
ByteArrayOutputStream out = new ByteArrayOutputStream();
DataUtils.writeVarLong(out, x);
data = out.toByteArray();
} catch (IOException e) {
throw new RuntimeException(e);
}
DataUtils.writeVarLong(buff, x);
len = buff.position();
assertEquals(data.length, len);
byte[] data2 = new byte[len];
buff.position(0);
buff.get(data2);
assertEquals(data2, data);
buff.flip();
long y = DataUtils.readVarLong(buff);
assertEquals(y, x);
......@@ -111,8 +131,20 @@ public class TestDataUtils extends TestBase {
buff.clear();
int intX = (int) x;
try {
ByteArrayOutputStream out = new ByteArrayOutputStream();
DataUtils.writeVarInt(out, intX);
data = out.toByteArray();
} catch (IOException e) {
throw new RuntimeException(e);
}
DataUtils.writeVarInt(buff, intX);
len = buff.position();
assertEquals(data.length, len);
data2 = new byte[len];
buff.position(0);
buff.get(data2);
assertEquals(data2, data);
buff.flip();
int intY = DataUtils.readVarInt(buff);
assertEquals(intY, intX);
......
/*
* Copyright 2004-2011 H2 Group. Multiple-Licensed under the H2 License,
* Version 1.0, and under the Eclipse Public License, Version 1.0
* (http://h2database.com/html/license.html).
* Initial Developer: H2 Group
*/
package org.h2.test.store;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.Map;
import java.util.Random;
import org.h2.dev.store.btree.StreamStore;
import org.h2.test.TestBase;
import org.h2.util.IOUtils;
import org.h2.util.New;
/**
* Test the stream store.
*/
public class TestStreamStore extends TestBase {
/**
* Run just this test.
*
* @param a ignored
*/
public static void main(String... a) throws Exception {
TestBase.createCaller().init().test();
}
@Override
public void test() throws IOException {
testWithExistingData();
testSmall();
}
private void testWithExistingData() throws IOException {
Map<Long, byte[]> map = New.hashMap();
StreamStore store = new StreamStore(map);
store.setMinBlockSize(10);
store.setMaxBlockSize(20);
store.setNextKey(0);
for (int i = 0; i < 10; i++) {
store.put(new ByteArrayInputStream(new byte[20]));
}
assertEquals(10, map.size());
for (int i = 0; i < 10; i++) {
map.containsKey(i);
}
store = new StreamStore(map);
store.setMinBlockSize(10);
store.setMaxBlockSize(20);
store.setNextKey(0);
for (int i = 0; i < 10; i++) {
store.put(new ByteArrayInputStream(new byte[20]));
}
assertEquals(20, map.size());
for (int i = 0; i < 20; i++) {
map.containsKey(i);
}
}
private void testSmall() throws IOException {
Map<Long, byte[]> map = New.hashMap();
StreamStore store = new StreamStore(map);
assertEquals(256 * 1024, store.getMaxBlockSize());
assertEquals(256, store.getMinBlockSize());
store.setNextKey(0);
for (int i = 0; i < 20; i++) {
test(store, 0, 128, i);
test(store, 10, 128, i);
}
for (int i = 20; i < 200; i += 10) {
test(store, 0, 128, i);
test(store, 10, 128, i);
}
test(store, 10, 20, 1000);
}
private void test(StreamStore store, int minBlockSize, int maxBlockSize, int length) throws IOException {
store.setMinBlockSize(minBlockSize);
assertEquals(minBlockSize, store.getMinBlockSize());
store.setMaxBlockSize(maxBlockSize);
assertEquals(maxBlockSize, store.getMaxBlockSize());
long next = store.getNextKey();
Random r = new Random(length);
byte[] data = new byte[length];
r.nextBytes(data);
byte[] id = store.put(new ByteArrayInputStream(data));
if (length > 0 && length >= minBlockSize) {
assertFalse(store.isInPlace(id));
} else {
assertTrue(store.isInPlace(id));
}
long next2 = store.getNextKey();
if (length > 0 && length >= minBlockSize) {
assertTrue(next2 > next);
} else {
assertEquals(next, next2);
}
if (length == 0) {
assertEquals(0, id.length);
}
assertEquals(length, store.length(id));
InputStream in = store.get(id);
ByteArrayOutputStream out = new ByteArrayOutputStream();
IOUtils.copy(in, out);
assertEquals(data, out.toByteArray());
in = store.get(id);
assertEquals(0, in.skip(0));
if (length > 0) {
assertEquals(1, in.skip(1));
if (length > 1) {
assertEquals(data[1] & 255, in.read());
if (length > 2) {
assertEquals(1, in.skip(1));
if (length > 3) {
assertEquals(data[3] & 255, in.read());
}
} else {
assertEquals(0, in.skip(1));
}
} else {
assertEquals(-1, in.read());
}
} else {
assertEquals(0, in.skip(1));
}
if (length > 12) {
in = store.get(id);
assertEquals(12, in.skip(12));
assertEquals(data[12] & 255, in.read());
}
store.remove(id);
assertEquals(0, store.getMap().size());
}
}
......@@ -7,6 +7,7 @@
package org.h2.dev.store.btree;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.util.HashMap;
......@@ -150,7 +151,21 @@ public class DataUtils {
/**
* Write a variable size int.
*
* @param buff the target buffer
* @param out the output stream
* @param x the value
*/
public static void writeVarInt(OutputStream out, int x) throws IOException {
while ((x & ~0x7f) != 0) {
out.write((byte) (0x80 | (x & 0x7f)));
x >>>= 7;
}
out.write((byte) x);
}
/**
* Write a variable size int.
*
* @param buff the source buffer
* @param x the value
*/
public static void writeVarInt(ByteBuffer buff, int x) {
......@@ -207,7 +222,7 @@ public class DataUtils {
}
/**
* Write a variable size int.
* Write a variable size long.
*
* @param buff the target buffer
* @param x the value
......@@ -220,6 +235,20 @@ public class DataUtils {
buff.put((byte) x);
}
/**
* Write a variable size long.
*
* @param out the output stream
* @param x the value
*/
public static void writeVarLong(OutputStream out, long x) throws IOException {
while ((x & ~0x7f) != 0) {
out.write((byte) (0x80 | (x & 0x7f)));
x >>>= 7;
}
out.write((byte) x);
}
/**
* Copy the elements of an array, with a gap.
*
......
......@@ -35,10 +35,8 @@ header:
H:3,blockSize=4096,...
TODO:
- how to iterate (just) over deleted pages / entries
- compact: use total max length instead of page count (liveCount)
- support background writes (store old version)
- support large binaries
- support database version / app version
- limited support for writing to old versions (branches)
- atomic test-and-set (when supporting concurrent writes)
......@@ -62,6 +60,9 @@ TODO:
- unified exception handling
- check if locale specific string comparison can make data disappear
- concurrent map; avoid locking during IO (pre-load pages)
- maybe split database into multiple files, to speed up compact operation
- automated 'kill process' and 'power failure' test
- implement table engine for H2
*/
......
/*
* Copyright 2004-2011 H2 Group. Multiple-Licensed under the H2 License,
* Version 1.0, and under the Eclipse Public License, Version 1.0
* (http://h2database.com/html/license.html).
* Initial Developer: H2 Group
*/
package org.h2.dev.store.btree;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
import org.h2.util.IOUtils;
/**
* A facility to store streams in a map. Streams are split into blocks, which
* are stored in a map. Very small streams are inlined in the stream id.
* <p>
* The key of the map is a long (incremented for each stored block). The default
* initial value is the current time (milliseconds since 1970 UTC) at the time
* the stream store is created. The next key can also be set explicitly. Before
* storing blocks into the map, the stream store checks if there is already a
* block with the next key, and the key is incremented as necessary (this is a
* log(n) operation similar to binary search).
* <p>
* The format of the binary id is: An empty id represents 0 bytes of data.
* In-place data is encoded as the size (a variable size int), then the data. A
* stored block is encoded as 0, the length of the block (a variable size long),
* the length of the key (a variable size int), then the key. If the key large,
* it is stored itself. This is encoded as -1 (a variable size int), the total
* length (a variable size long), the length of the key (a variable size int),
* and the key that points to the id. Multiple ids can be concatenated to
* concatenate the data.
*/
public class StreamStore {
private final Map<Long, byte[]> map;
private int minBlockSize = 256;
private int maxBlockSize = 256 * 1024;
private final AtomicLong nextKey = new AtomicLong(System.currentTimeMillis());
/**
* Create a stream store instance.
*
* @param map the map to store blocks of data
*/
public StreamStore(Map<Long, byte[]> map) {
this.map = map;
}
public Map<Long, byte[]> getMap() {
return map;
}
public void setNextKey(long nextKey) {
this.nextKey.set(nextKey);
}
public long getNextKey() {
return nextKey.get();
}
public void setMinBlockSize(int minBlockSize) {
this.minBlockSize = minBlockSize;
}
public int getMinBlockSize() {
return minBlockSize;
}
public void setMaxBlockSize(int maxBlockSize) {
this.maxBlockSize = maxBlockSize;
}
public long getMaxBlockSize() {
return maxBlockSize;
}
/**
* Store the stream, and return the id.
*
* @param in the stream
* @return the id (potentially an empty array)
*/
public byte[] put(InputStream in) throws IOException {
ByteArrayOutputStream idStream = new ByteArrayOutputStream();
long length = 0;
while (true) {
ByteArrayOutputStream outBuffer = new ByteArrayOutputStream();
int len = (int) IOUtils.copy(in, outBuffer, maxBlockSize);
if (len == 0) {
break;
}
boolean eof = len < maxBlockSize;
byte[] data = outBuffer.toByteArray();
ByteArrayOutputStream idBlock = new ByteArrayOutputStream();
if (len < minBlockSize) {
DataUtils.writeVarInt(idBlock, len);
idBlock.write(data);
} else {
long key = writeBlock(data);
DataUtils.writeVarInt(idBlock, 0);
DataUtils.writeVarLong(idBlock, len);
int keyLen = DataUtils.getVarLongLen(key);
DataUtils.writeVarInt(idBlock, keyLen);
DataUtils.writeVarLong(idBlock, key);
}
if (idStream.size() > 0) {
int idSize = idStream.size() + idBlock.size();
if (idSize > maxBlockSize || (eof && idSize > minBlockSize)) {
data = idStream.toByteArray();
idStream.reset();
long key = writeBlock(data);
DataUtils.writeVarInt(idStream, -1);
DataUtils.writeVarLong(idStream, length);
int keyLen = DataUtils.getVarLongLen(key);
DataUtils.writeVarInt(idStream, keyLen);
DataUtils.writeVarLong(idStream, key);
}
}
length += len;
idBlock.writeTo(idStream);
if (eof) {
break;
}
}
return idStream.toByteArray();
}
private long writeBlock(byte[] data) {
long key = getAndIncrementNextKey();
map.put(key, data);
return key;
}
private long getAndIncrementNextKey() {
long key = nextKey.getAndIncrement();
if (!map.containsKey(key)) {
return key;
}
// search the next free id
synchronized (this) {
key = nextKey.getAndIncrement();
// skip 1 at the beginning
long increment = 1;
while (true) {
if (map.containsKey(key + increment)) {
// skip double as many numbers
key += increment + 1;
if (increment < Integer.MAX_VALUE) {
increment += increment;
}
} else {
// already past the end:
// skip half as many numbers
if (increment == 1) {
key++;
break;
}
increment /= 2;
}
}
nextKey.set(key + 1);
return key;
}
}
/**
* Remove all stored blocks for the given id.
*
* @param id the id
*/
public void remove(byte[] id) {
ByteBuffer idBuffer = ByteBuffer.wrap(id);
while (idBuffer.hasRemaining()) {
removeBlock(idBuffer);
}
}
private void removeBlock(ByteBuffer idBuffer) {
int lenInPlace = DataUtils.readVarInt(idBuffer);
if (lenInPlace > 0) {
idBuffer.position(idBuffer.position() + lenInPlace);
return;
}
DataUtils.readVarLong(idBuffer);
int lenId = DataUtils.readVarInt(idBuffer);
byte[] key = new byte[lenId];
idBuffer.get(key);
if (lenInPlace < 0) {
// recurse
remove(readBlock(key));
}
removeBlock(key);
}
private void removeBlock(byte[] key) {
map.remove(getKey(key));
}
/**
* Calculate the number of data bytes for the given id. As the length is
* encoded in the id, this operation does not cause any reads in the map.
*
* @param id the id
* @return the length
*/
public long length(byte[] id) {
ByteBuffer idBuffer = ByteBuffer.wrap(id);
long length = 0;
while (idBuffer.hasRemaining()) {
length += readLength(idBuffer);
}
return length;
}
private static long readLength(ByteBuffer idBuffer) {
int lenInPlace = DataUtils.readVarInt(idBuffer);
if (lenInPlace > 0) {
idBuffer.position(idBuffer.position() + lenInPlace);
return lenInPlace;
}
long len = DataUtils.readVarLong(idBuffer);
int lenId = DataUtils.readVarInt(idBuffer);
idBuffer.position(idBuffer.position() + lenId);
return len;
}
/**
* Check whether the id itself contains all the data. This operation does
* not cause any reads in the map.
*
* @param id the id
* @return if the id contains the data
*/
public boolean isInPlace(byte[] id) {
ByteBuffer idBuffer = ByteBuffer.wrap(id);
while (idBuffer.hasRemaining()) {
if (!isInPlace(idBuffer)) {
return false;
}
}
return true;
}
boolean isInPlace(ByteBuffer idBuffer) {
int lenInPlace = DataUtils.readVarInt(idBuffer);
if (lenInPlace > 0) {
idBuffer.position(idBuffer.position() + lenInPlace);
return true;
}
DataUtils.readVarLong(idBuffer);
int lenId = DataUtils.readVarInt(idBuffer);
idBuffer.position(idBuffer.position() + lenId);
return false;
}
byte[] readBlock(byte[] key) {
return map.get(getKey(key));
}
private static Long getKey(byte[] key) {
int todoRemove;
ByteBuffer buff = ByteBuffer.wrap(key);
return DataUtils.readVarLong(buff);
}
/**
* Open an input stream to read data.
*
* @param id the id
* @return the stream
*/
public InputStream get(byte[] id) {
return new Stream(this, id);
}
/**
* A stream backed by a map.
*/
static class Stream extends InputStream {
private StreamStore store;
private byte[] oneByteBuffer;
private final byte[] id;
private ByteBuffer idBuffer;
private ByteArrayInputStream buffer;
private long skip;
private long length;
private long pos;
Stream(StreamStore store, byte[] id) {
this.store = store;
this.id = id;
this.length = store.length(id);
reset();
}
@Override
public void reset() {
this.idBuffer = ByteBuffer.wrap(id);
}
@Override
public int read() throws IOException {
byte[] buffer = oneByteBuffer;
if (buffer == null) {
buffer = oneByteBuffer = new byte[1];
}
int len = read(buffer, 0, 1);
return len < 0 ? len : (buffer[0] & 255);
}
@Override
public long skip(long n) {
n = Math.min(length - pos, n);
skip += n;
pos += n;
if (buffer != null) {
return buffer.skip(n);
}
return n;
}
@Override
public void close() {
buffer = null;
idBuffer = null;
store = null;
}
@Override
public int read(byte[] b, int off, int len) {
while (true) {
if (buffer == null) {
if (store == null) {
return -1;
}
buffer = nextBuffer();
if (buffer == null) {
return -1;
}
}
int result = buffer.read(b, off, len);
if (result > 0) {
pos += result;
if (pos >= length) {
close();
}
return result;
}
buffer = null;
}
}
private ByteArrayInputStream nextBuffer() {
while (idBuffer.hasRemaining()) {
int lenInPlace = DataUtils.readVarInt(idBuffer);
if (lenInPlace > 0) {
if (skip >= lenInPlace) {
skip -= lenInPlace;
idBuffer.position(idBuffer.position() + lenInPlace);
continue;
}
int p = (int) (idBuffer.position() + skip);
int l = (int) (lenInPlace - skip);
idBuffer.position(p + l);
return new ByteArrayInputStream(idBuffer.array(), p, l);
}
long length = DataUtils.readVarLong(idBuffer);
int lenId = DataUtils.readVarInt(idBuffer);
if (skip >= length) {
skip -= length;
idBuffer.position(idBuffer.position() + lenId);
continue;
}
byte[] key = new byte[lenId];
idBuffer.get(key);
if (lenInPlace < 0) {
byte[] k = store.readBlock(key);
ByteBuffer newBuffer = ByteBuffer.allocate(k.length + idBuffer.limit() - idBuffer.position());
newBuffer.put(k);
newBuffer.put(idBuffer);
newBuffer.flip();
idBuffer = newBuffer;
return nextBuffer();
}
byte[] data = store.readBlock(key);
int s = (int) skip;
skip = 0;
return new ByteArrayInputStream(data, s, data.length - s);
}
close();
return null;
}
}
}
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论