提交 3a92ea32 authored 作者: Thomas Mueller's avatar Thomas Mueller

A persistent multi-version map: a utility to store and read streams

上级 b568eadd
......@@ -21,7 +21,7 @@ import org.h2.util.IOUtils;
import org.h2.util.New;
import org.h2.util.StringUtils;
// import org.junit.Test;
//import static org.junit.Assert.*;
// import static org.junit.Assert.*;
/**
* Test the stream store.
......@@ -39,12 +39,62 @@ public class TestStreamStore extends TestBase {
@Override
public void test() throws IOException {
// Runs every stream store test case of this class, one after the other.
// Checks on how ids are validated and how large streams are structured.
testDetectIllegalId();
testTreeStructure();
// Check on the binary format of the ids.
testFormat();
testWithExistingData();
testWithFullMap();
testLoop();
}
/**
 * Check that ids containing an unsupported type byte are rejected with an
 * IllegalArgumentException by length(), by remove(), and while reading
 * from a stream.
 */
public void testDetectIllegalId() throws IOException {
    Map<Long, byte[]> backing = New.hashMap();
    StreamStore streamStore = new StreamStore(backing);

    // length() must not accept an id that starts with the byte 3
    try {
        streamStore.length(new byte[]{3, 0, 0});
        fail();
    } catch (IllegalArgumentException expected) {
        // expected
    }

    // remove() must not accept it either
    try {
        streamStore.remove(new byte[]{3, 0, 0});
        fail();
    } catch (IllegalArgumentException expected) {
        // expected
    }

    // the entry stored under key 0 starts with the byte 3 as well;
    // the failure is expected on the first read, not when opening the stream
    backing.put(0L, new byte[]{3, 0, 0});
    InputStream stream = streamStore.get(new byte[]{2, 1, 0});
    try {
        stream.read();
        fail();
    } catch (IllegalArgumentException expected) {
        // expected
    }
}
/**
 * Store 10000 bytes with a small block size and verify that reading the
 * first byte needs exactly three lookups in the backing map.
 */
public void testTreeStructure() throws IOException {
    final AtomicInteger reads = new AtomicInteger();
    // a map that counts how often get() is called
    Map<Long, byte[]> map = new HashMap<Long, byte[]>() {
        private static final long serialVersionUID = 1L;

        @Override
        public byte[] get(Object k) {
            reads.incrementAndGet();
            return super.get(k);
        }
    };
    StreamStore store = new StreamStore(map);
    store.setMinBlockSize(10);
    store.setMaxBlockSize(100);
    byte[] id = store.put(new ByteArrayInputStream(new byte[10000]));
    InputStream in = store.get(id);
    // the stored data is all zeroes
    assertEquals(0, in.read());
    // number of map reads needed to reach the first data byte
    assertEquals(3, reads.get());
}
public void testFormat() throws IOException {
Map<Long, byte[]> map = New.hashMap();
StreamStore store = new StreamStore(map);
......@@ -55,26 +105,26 @@ public class TestStreamStore extends TestBase {
byte[] id;
id = store.put(new ByteArrayInputStream(new byte[200]));
int todoInefficient;
assertEquals("ffffffff0fb4010287010014028601", StringUtils.convertBytesToHex(id));
assertEquals(200, store.length(id));
assertEquals("02c8018801", StringUtils.convertBytesToHex(id));
id = store.put(new ByteArrayInputStream(new byte[0]));
assertEquals("", StringUtils.convertBytesToHex(id));
id = store.put(new ByteArrayInputStream(new byte[1]));
assertEquals("0100", StringUtils.convertBytesToHex(id));
assertEquals("000100", StringUtils.convertBytesToHex(id));
id = store.put(new ByteArrayInputStream(new byte[3]));
assertEquals("03000000", StringUtils.convertBytesToHex(id));
assertEquals("0003000000", StringUtils.convertBytesToHex(id));
id = store.put(new ByteArrayInputStream(new byte[10]));
assertEquals("000a028801", StringUtils.convertBytesToHex(id));
assertEquals("010a8901", StringUtils.convertBytesToHex(id));
byte[] combined = StringUtils.convertHexToBytes("010a020b0c");
byte[] combined = StringUtils.convertHexToBytes("0001aa0002bbcc");
assertEquals(3, store.length(combined));
InputStream in = store.get(combined);
assertEquals(1, in.skip(1));
assertEquals(0x0b, in.read());
assertEquals(0xbb, in.read());
assertEquals(1, in.skip(1));
}
......@@ -137,10 +187,10 @@ public class TestStreamStore extends TestBase {
};
StreamStore store = new StreamStore(map);
store.setMinBlockSize(10);
store.setMaxBlockSize(20);
store.setMinBlockSize(20);
store.setMaxBlockSize(100);
store.setNextKey(0);
store.put(new ByteArrayInputStream(new byte[20]));
store.put(new ByteArrayInputStream(new byte[100]));
assertEquals(1, map.size());
assertEquals(64, tests.get());
assertEquals(Long.MAX_VALUE / 2 + 1, store.getNextKey());
......@@ -153,6 +203,7 @@ public class TestStreamStore extends TestBase {
assertEquals(256, store.getMinBlockSize());
store.setNextKey(0);
assertEquals(0, store.getNextKey());
test(store, 10, 20, 1000);
for (int i = 0; i < 20; i++) {
test(store, 0, 128, i);
test(store, 10, 128, i);
......@@ -161,7 +212,6 @@ public class TestStreamStore extends TestBase {
test(store, 0, 128, i);
test(store, 10, 128, i);
}
test(store, 10, 20, 1000);
}
private void test(StreamStore store, int minBlockSize, int maxBlockSize, int length) throws IOException {
......
......@@ -25,13 +25,12 @@ import org.h2.util.IOUtils;
* searches the next free entry using a binary search (0 to Long.MAX_VALUE).
* <p>
* The format of the binary id is: An empty id represents 0 bytes of data.
* In-place data is encoded as the size (a variable size int), then the data. A
* stored block is encoded as 0, the length of the block (a variable size long),
* the length of the key (a variable size int), then the key. If the key is large,
* it is stored itself. This is encoded as -1 (a variable size int), the total
* length (a variable size long), the length of the key (a variable size int),
* and the key that points to the id. Multiple ids can be concatenated to
* concatenate the data.
* In-place data is encoded as 0, the size (a variable size int), then the data.
* A stored block is encoded as 1, the length of the block (a variable size
* int), then the key (a variable size long). Multiple ids can be concatenated
* to concatenate the data. If the id is large, it is stored itself, which is
* encoded as 2, the total length (a variable size long), and the key of the
* block that contains the id (a variable size long).
*/
public class StreamStore {
......@@ -84,48 +83,64 @@ public class StreamStore {
* @return the id (potentially an empty array)
*/
public byte[] put(InputStream in) throws IOException {
ByteArrayOutputStream idStream = new ByteArrayOutputStream();
long length = 0;
ByteArrayOutputStream id = new ByteArrayOutputStream();
int level = 0;
while (true) {
ByteArrayOutputStream outBuffer = new ByteArrayOutputStream();
int len = (int) IOUtils.copy(in, outBuffer, maxBlockSize);
if (len == 0) {
if (put(id, in, level)) {
break;
}
if (id.size() > maxBlockSize / 2) {
id = putIndirectId(id);
level++;
}
}
if (id.size() > minBlockSize * 2) {
id = putIndirectId(id);
}
return id.toByteArray();
}
private boolean put(ByteArrayOutputStream id, InputStream in, int level) throws IOException {
if (level > 0) {
ByteArrayOutputStream id2 = new ByteArrayOutputStream();
while (true) {
boolean eof = put(id2, in, level - 1);
if (id2.size() > maxBlockSize / 2) {
id2 = putIndirectId(id2);
id2.writeTo(id);
return eof;
} else if (eof) {
id2.writeTo(id);
return true;
}
}
}
ByteArrayOutputStream buffer = new ByteArrayOutputStream();
int len = (int) IOUtils.copy(in, buffer, maxBlockSize);
if (len == 0) {
return true;
}
boolean eof = len < maxBlockSize;
byte[] data = outBuffer.toByteArray();
ByteArrayOutputStream idBlock = new ByteArrayOutputStream();
byte[] data = buffer.toByteArray();
if (len < minBlockSize) {
DataUtils.writeVarInt(idBlock, len);
idBlock.write(data);
id.write(0);
DataUtils.writeVarInt(id, len);
id.write(data);
} else {
long key = writeBlock(data);
DataUtils.writeVarInt(idBlock, 0);
DataUtils.writeVarLong(idBlock, len);
int keyLen = DataUtils.getVarLongLen(key);
DataUtils.writeVarInt(idBlock, keyLen);
DataUtils.writeVarLong(idBlock, key);
}
if (idStream.size() > 0) {
int idSize = idStream.size() + idBlock.size();
if (idSize > maxBlockSize || (eof && idSize > minBlockSize)) {
data = idStream.toByteArray();
idStream.reset();
long key = writeBlock(data);
DataUtils.writeVarInt(idStream, -1);
DataUtils.writeVarLong(idStream, length);
int keyLen = DataUtils.getVarLongLen(key);
DataUtils.writeVarInt(idStream, keyLen);
DataUtils.writeVarLong(idStream, key);
id.write(1);
DataUtils.writeVarInt(id, len);
DataUtils.writeVarLong(id, writeBlock(data));
}
return eof;
}
length += len;
idBlock.writeTo(idStream);
if (eof) {
break;
}
}
return idStream.toByteArray();
/**
 * Store the given id as a block and build a new id that refers to it.
 * The new id consists of the byte 2, the total data length of the given id
 * (a variable size long), and the key of the block that now contains the
 * given id (a variable size long).
 *
 * @param id the id to store
 * @return a new stream containing the indirect id
 */
private ByteArrayOutputStream putIndirectId(ByteArrayOutputStream id) throws IOException {
    byte[] idBytes = id.toByteArray();
    ByteArrayOutputStream indirect = new ByteArrayOutputStream();
    indirect.write(2);
    // the length is computed before the block is written
    long totalLength = length(idBytes);
    DataUtils.writeVarLong(indirect, totalLength);
    long blockKey = writeBlock(idBytes);
    DataUtils.writeVarLong(indirect, blockKey);
    return indirect;
}
private long writeBlock(byte[] data) {
......@@ -164,29 +179,27 @@ public class StreamStore {
public void remove(byte[] id) {
ByteBuffer idBuffer = ByteBuffer.wrap(id);
while (idBuffer.hasRemaining()) {
removeBlock(idBuffer);
}
}
private void removeBlock(ByteBuffer idBuffer) {
int lenInPlace = DataUtils.readVarInt(idBuffer);
if (lenInPlace > 0) {
idBuffer.position(idBuffer.position() + lenInPlace);
return;
}
switch (idBuffer.get()) {
case 0:
int len = DataUtils.readVarInt(idBuffer);
idBuffer.position(idBuffer.position() + len);
break;
case 1:
DataUtils.readVarInt(idBuffer);
long k = DataUtils.readVarLong(idBuffer);
map.remove(k);
break;
case 2:
DataUtils.readVarLong(idBuffer);
int lenId = DataUtils.readVarInt(idBuffer);
byte[] key = new byte[lenId];
idBuffer.get(key);
if (lenInPlace < 0) {
long k2 = DataUtils.readVarLong(idBuffer);
// recurse
remove(readBlock(key));
remove(map.get(k2));
map.remove(k2);
break;
default:
throw new IllegalArgumentException("Unsupported id");
}
removeBlock(key);
}
/**
 * Remove the map entry that belongs to the given serialized key.
 *
 * @param key the serialized key, decoded using getKey
 */
private void removeBlock(byte[] key) {
    Long mapKey = getKey(key);
    map.remove(mapKey);
}
/**
......@@ -200,21 +213,25 @@ public class StreamStore {
ByteBuffer idBuffer = ByteBuffer.wrap(id);
long length = 0;
while (idBuffer.hasRemaining()) {
length += readLength(idBuffer);
}
return length;
switch (idBuffer.get()) {
case 0:
int len = DataUtils.readVarInt(idBuffer);
idBuffer.position(idBuffer.position() + len);
length += len;
break;
case 1:
length += DataUtils.readVarInt(idBuffer);
DataUtils.readVarLong(idBuffer);
break;
case 2:
length += DataUtils.readVarLong(idBuffer);
DataUtils.readVarLong(idBuffer);
break;
default:
throw new IllegalArgumentException("Unsupported id");
}
private static long readLength(ByteBuffer idBuffer) {
int lenInPlace = DataUtils.readVarInt(idBuffer);
if (lenInPlace > 0) {
idBuffer.position(idBuffer.position() + lenInPlace);
return lenInPlace;
}
long len = DataUtils.readVarLong(idBuffer);
int lenId = DataUtils.readVarInt(idBuffer);
idBuffer.position(idBuffer.position() + lenId);
return len;
return length;
}
/**
......@@ -227,32 +244,15 @@ public class StreamStore {
public boolean isInPlace(byte[] id) {
ByteBuffer idBuffer = ByteBuffer.wrap(id);
while (idBuffer.hasRemaining()) {
if (!isInPlace(idBuffer)) {
if (idBuffer.get() != 0) {
return false;
}
int len = DataUtils.readVarInt(idBuffer);
idBuffer.position(idBuffer.position() + len);
}
return true;
}
/**
 * Read one variable size int from the buffer and, if it is positive, skip
 * that many bytes.
 *
 * @param idBuffer the buffer, positioned at the start of an id entry
 * @return true if the value read was positive (the bytes were skipped),
 *         false otherwise (the position is left just after the value read)
 */
private static boolean isInPlace(ByteBuffer idBuffer) {
    int size = DataUtils.readVarInt(idBuffer);
    boolean inPlace = size > 0;
    if (inPlace) {
        idBuffer.position(idBuffer.position() + size);
    }
    return inPlace;
}
/**
 * Look up the map value that belongs to the given serialized key.
 *
 * @param key the serialized key, decoded using getKey
 * @return the value returned by the map for the decoded key
 */
byte[] readBlock(byte[] key) {
    Long mapKey = getKey(key);
    return map.get(mapKey);
}
/**
 * Decode a map key from its serialized form by reading one variable size
 * long from the start of the array.
 *
 * @param key the serialized key
 * @return the decoded key
 */
private static Long getKey(byte[] key) {
    // TODO remove (this note replaces the unused local "todoRemove")
    ByteBuffer buff = ByteBuffer.wrap(key);
    return DataUtils.readVarLong(buff);
}
/**
* Open an input stream to read data.
*
......@@ -263,6 +263,10 @@ public class StreamStore {
return new Stream(this, id);
}
/**
 * Look up the value stored in the map under the given key.
 *
 * @param key the key
 * @return the value returned by the map for this key
 */
byte[] getBlock(long key) {
    byte[] block = map.get(key);
    return block;
}
/**
* A stream backed by a map.
*/
......@@ -340,29 +344,39 @@ public class StreamStore {
private ByteArrayInputStream nextBuffer() {
while (idBuffer.hasRemaining()) {
int lenInPlace = DataUtils.readVarInt(idBuffer);
if (lenInPlace > 0) {
if (skip >= lenInPlace) {
skip -= lenInPlace;
idBuffer.position(idBuffer.position() + lenInPlace);
switch (idBuffer.get()) {
case 0: {
int len = DataUtils.readVarInt(idBuffer);
if (skip >= len) {
skip -= len;
idBuffer.position(idBuffer.position() + len);
continue;
}
int p = (int) (idBuffer.position() + skip);
int l = (int) (lenInPlace - skip);
int l = (int) (len - skip);
idBuffer.position(p + l);
return new ByteArrayInputStream(idBuffer.array(), p, l);
}
long length = DataUtils.readVarLong(idBuffer);
int lenId = DataUtils.readVarInt(idBuffer);
if (skip >= length) {
skip -= length;
idBuffer.position(idBuffer.position() + lenId);
case 1: {
int len = DataUtils.readVarInt(idBuffer);
long key = DataUtils.readVarLong(idBuffer);
if (skip >= len) {
skip -= len;
continue;
}
byte[] data = store.getBlock(key);
int s = (int) skip;
skip = 0;
return new ByteArrayInputStream(data, s, data.length - s);
}
case 2: {
long len = DataUtils.readVarInt(idBuffer);
long key = DataUtils.readVarLong(idBuffer);
if (skip >= len) {
skip -= len;
continue;
}
byte[] key = new byte[lenId];
idBuffer.get(key);
if (lenInPlace < 0) {
byte[] k = store.readBlock(key);
byte[] k = store.getBlock(key);
ByteBuffer newBuffer = ByteBuffer.allocate(k.length + idBuffer.limit() - idBuffer.position());
newBuffer.put(k);
newBuffer.put(idBuffer);
......@@ -370,10 +384,9 @@ public class StreamStore {
idBuffer = newBuffer;
return nextBuffer();
}
byte[] data = store.readBlock(key);
int s = (int) skip;
skip = 0;
return new ByteArrayInputStream(data, s, data.length - s);
default:
throw new IllegalArgumentException("Unsupported id");
}
}
return null;
}
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论