提交 9632595d authored 作者: Thomas Mueller's avatar Thomas Mueller

MVStore: use a write buffer instead of a ByteBuffer

上级 ba877c18
......@@ -120,7 +120,7 @@ public class Chunk {
*
* @param buff the target buffer
*/
void writeHeader(ByteBuffer buff) {
void writeHeader(WriteBuffer buff) {
buff.put((byte) 'c');
buff.putInt(length);
buff.putInt(id);
......
......@@ -181,7 +181,7 @@ public class MVStore {
private HashMap<String, String> storeHeader = New.hashMap();
private ByteBuffer writeBuffer;
private WriteBuffer writeBuffer;
private int lastMapId;
......@@ -269,12 +269,14 @@ public class MVStore {
boolean readOnly = config.containsKey("readOnly");
o = config.get("cacheSize");
int mb = o == null ? 16 : (Integer) o;
int maxMemoryBytes = mb * 1024 * 1024;
int averageMemory = Math.max(10, pageSplitSize / 2);
int segmentCount = 16;
int stackMoveDistance = maxMemoryBytes / averageMemory * 2 / 100;
cache = new CacheLongKeyLIRS<Page>(
maxMemoryBytes, averageMemory, segmentCount, stackMoveDistance);
if (mb > 0) {
int maxMemoryBytes = mb * 1024 * 1024;
int averageMemory = Math.max(10, pageSplitSize / 2);
int segmentCount = 16;
int stackMoveDistance = maxMemoryBytes / averageMemory * 2 / 100;
cache = new CacheLongKeyLIRS<Page>(
maxMemoryBytes, averageMemory, segmentCount, stackMoveDistance);
}
o = config.get("writeBufferSize");
mb = o == null ? 4 : (Integer) o;
int writeBufferSize = mb * 1024 * 1024;
......@@ -519,6 +521,11 @@ public class MVStore {
private void readMeta() {
chunks.clear();
Chunk header = readChunkHeader(rootChunkStart);
if (header.start == Long.MAX_VALUE) {
throw DataUtils.newIllegalStateException(
DataUtils.ERROR_FILE_CORRUPT,
"Chunk {0} is invalid", header.id);
}
lastChunkId = header.id;
chunks.put(header.id, header);
meta.setRootPos(header.metaRootPos, -1);
......@@ -543,6 +550,11 @@ public class MVStore {
s = meta.get(s);
Chunk c = Chunk.fromString(s);
if (!chunks.containsKey(c.id)) {
if (c.start == Long.MAX_VALUE) {
throw DataUtils.newIllegalStateException(
DataUtils.ERROR_FILE_CORRUPT,
"Chunk {0} is invalid", c.id);
}
chunks.put(c.id, c);
}
}
......@@ -731,10 +743,14 @@ public class MVStore {
if (s == null) {
throw DataUtils.newIllegalStateException(
DataUtils.ERROR_FILE_CORRUPT,
"Chunk {0} not found",
DataUtils.getPageChunkId(pos));
"Chunk {0} not found", chunkId);
}
c = Chunk.fromString(s);
if (c.start == Long.MAX_VALUE) {
throw DataUtils.newIllegalStateException(
DataUtils.ERROR_FILE_CORRUPT,
"Chunk {0} is invalid", chunkId);
}
chunks.put(c.id, c);
}
return c;
......@@ -892,7 +908,7 @@ public class MVStore {
}
}
Set<Chunk> removedChunks = applyFreedSpace(storeVersion, time);
ByteBuffer buff = getWriteBuffer();
WriteBuffer buff = getWriteBuffer();
// need to patch the header later
c.writeHeader(buff);
c.maxLength = 0;
......@@ -900,28 +916,24 @@ public class MVStore {
for (MVMap<?, ?> m : changed) {
Page p = m.getRoot();
if (p.getTotalCount() > 0) {
buff = p.writeUnsavedRecursive(c, buff);
p.writeUnsavedRecursive(c, buff);
long root = p.getPos();
meta.put("root." + m.getId(), "" + root);
}
}
meta.put("chunk." + c.id, c.asString());
meta.setWriteVersion(version);
// this will (again) modify maxLengthLive, but
// the correct value is written in the chunk header
Page metaRoot = meta.getRoot();
buff = metaRoot.writeUnsavedRecursive(c, buff);
metaRoot.writeUnsavedRecursive(c, buff);
int chunkLength = buff.position();
// round to the next block,
// and one additional block for the store header
int length = MathUtils.roundUpInt(chunkLength, BLOCK_SIZE) + BLOCK_SIZE;
if (length > buff.capacity()) {
buff = DataUtils.ensureCapacity(buff, length - buff.capacity());
}
buff.limit(length);
// free up the space of unused chunks now
......@@ -939,6 +951,14 @@ public class MVStore {
fileStore.markUsed(end, length);
}
boolean storeAtEndOfFile = filePos + length >= end;
// try {
// Thread.sleep(10);
// } catch (InterruptedException e) {
// ; int todo;
// // TODO Auto-generated catch block
// e.printStackTrace();
// }
c.start = filePos;
c.length = chunkLength;
......@@ -955,7 +975,7 @@ public class MVStore {
buff.put(new byte[BLOCK_SIZE - header.length]);
buff.position(0);
fileStore.writeFully(filePos, buff);
fileStore.writeFully(filePos, buff.getBuffer());
releaseWriteBuffer(buff);
......@@ -975,6 +995,7 @@ public class MVStore {
// some pages might have been changed in the meantime (in the newest version)
unsavedPageCount = Math.max(0, unsavedPageCount - currentUnsavedPageCount);
if (!temp) {
metaChanged = false;
lastStoredVersion = storeVersion;
......@@ -988,13 +1009,13 @@ public class MVStore {
*
* @return the buffer
*/
private ByteBuffer getWriteBuffer() {
ByteBuffer buff;
private WriteBuffer getWriteBuffer() {
WriteBuffer buff;
if (writeBuffer != null) {
buff = writeBuffer;
buff.clear();
} else {
buff = ByteBuffer.allocate(1024 * 1024);
buff = new WriteBuffer();
}
return buff;
}
......@@ -1005,7 +1026,7 @@ public class MVStore {
*
* @param buff the buffer than can be re-used
*/
private void releaseWriteBuffer(ByteBuffer buff) {
private void releaseWriteBuffer(WriteBuffer buff) {
if (buff.capacity() <= 4 * 1024 * 1024) {
writeBuffer = buff;
}
......@@ -1123,9 +1144,6 @@ public class MVStore {
private long getEndPosition() {
long size = 2 * BLOCK_SIZE;
for (Chunk c : chunks.values()) {
if (c.start == Long.MAX_VALUE) {
continue;
}
long x = c.start + c.length;
size = Math.max(size, MathUtils.roundUpLong(x, BLOCK_SIZE) + BLOCK_SIZE);
}
......@@ -1200,9 +1218,8 @@ public class MVStore {
}
}
for (Chunk c : move) {
ByteBuffer buff = getWriteBuffer();
WriteBuffer buff = getWriteBuffer();
int length = MathUtils.roundUpInt(c.length, BLOCK_SIZE) + BLOCK_SIZE;
buff = DataUtils.ensureCapacity(buff, length);
buff.limit(length);
ByteBuffer buff2 = fileStore.readFully(c.start, length);
buff.put(buff2);
......@@ -1218,7 +1235,7 @@ public class MVStore {
// fill the header with zeroes
buff.put(new byte[BLOCK_SIZE - header.length]);
buff.position(0);
fileStore.writeFully(end, buff);
fileStore.writeFully(end, buff.getBuffer());
releaseWriteBuffer(buff);
meta.put("chunk." + c.id, c.asString());
}
......@@ -1238,9 +1255,8 @@ public class MVStore {
// previous store operation
continue;
}
ByteBuffer buff = getWriteBuffer();
WriteBuffer buff = getWriteBuffer();
int length = MathUtils.roundUpInt(c.length, BLOCK_SIZE) + BLOCK_SIZE;
buff = DataUtils.ensureCapacity(buff, length);
buff.limit(length);
ByteBuffer buff2 = fileStore.readFully(c.start, length);
buff.put(buff2);
......@@ -1255,7 +1271,7 @@ public class MVStore {
// fill the header with zeroes
buff.put(new byte[BLOCK_SIZE - header.length]);
buff.position(0);
fileStore.writeFully(pos, buff);
fileStore.writeFully(pos, buff.getBuffer());
releaseWriteBuffer(buff);
meta.put("chunk." + c.id, c.asString());
}
......@@ -1432,7 +1448,7 @@ public class MVStore {
throw DataUtils.newIllegalStateException(
DataUtils.ERROR_FILE_CORRUPT, "Position 0");
}
Page p = cache.get(pos);
Page p = cache == null ? null : cache.get(pos);
if (p == null) {
Chunk c = getChunk(pos);
long filePos = c.start;
......@@ -1442,7 +1458,9 @@ public class MVStore {
DataUtils.ERROR_FILE_CORRUPT, "Negative position {0}", filePos);
}
p = Page.read(fileStore, map, pos, filePos, fileStore.size());
cache.put(pos, p, p.getMemory());
if (cache != null) {
cache.put(pos, p, p.getMemory());
}
}
return p;
}
......@@ -1466,7 +1484,9 @@ public class MVStore {
// This could result in a cache miss if the operation is rolled back,
// but we don't optimize for rollback.
// We could also keep the page in the cache, as somebody could read it.
cache.remove(pos);
if (cache != null) {
cache.remove(pos);
}
Chunk c = getChunk(pos);
long version = currentVersion;
......@@ -1800,6 +1820,11 @@ public class MVStore {
}
}
// rollback might have rolled back the stored chunk metadata as well
Chunk c = chunks.get(lastChunkId - 1);
if (c != null) {
meta.put("chunk." + c.id, c.asString());
}
currentVersion = version;
setWriteVersion(version);
lastCommittedVersion = version;
......
......@@ -770,35 +770,33 @@ public class Page {
*
* @param chunk the chunk
* @param buff the target buffer
* @return the target buffer
*/
private ByteBuffer write(Chunk chunk, ByteBuffer buff) {
buff = DataUtils.ensureCapacity(buff, 1024);
private void write(Chunk chunk, WriteBuffer buff) {
int start = buff.position();
buff.putInt(0);
buff.putShort((byte) 0);
DataUtils.writeVarInt(buff, map.getId());
buff.writeVarInt(map.getId());
int len = keyCount;
DataUtils.writeVarInt(buff, len);
buff.writeVarInt(len);
int type = children != null ? DataUtils.PAGE_TYPE_NODE
: DataUtils.PAGE_TYPE_LEAF;
buff.put((byte) type);
int compressStart = buff.position();
DataType keyType = map.getKeyType();
for (int i = 0; i < len; i++) {
buff = keyType.write(buff, keys[i]);
keyType.write(buff, keys[i]);
}
if (type == DataUtils.PAGE_TYPE_NODE) {
for (int i = 0; i <= len; i++) {
buff.putLong(children[i]);
}
for (int i = 0; i <= len; i++) {
DataUtils.writeVarLong(buff, counts[i]);
buff.writeVarLong(counts[i]);
}
} else {
DataType valueType = map.getValueType();
for (int i = 0; i < len; i++) {
buff = valueType.write(buff, values[i]);
valueType.write(buff, values[i]);
}
}
if (map.getStore().getCompress()) {
......@@ -812,7 +810,7 @@ public class Page {
if (compLen + DataUtils.getVarIntLen(compLen - expLen) < expLen) {
buff.position(compressStart - 1);
buff.put((byte) (type + DataUtils.PAGE_COMPRESSED));
DataUtils.writeVarInt(buff, expLen - compLen);
buff.writeVarInt(expLen - compLen);
buff.put(comp, 0, compLen);
}
}
......@@ -833,7 +831,6 @@ public class Page {
chunk.maxLengthLive += max;
chunk.pageCount++;
chunk.pageCountLive++;
return buff;
}
/**
......@@ -842,24 +839,23 @@ public class Page {
*
* @param chunk the chunk
* @param buff the target buffer
* @return the target buffer
*/
ByteBuffer writeUnsavedRecursive(Chunk chunk, ByteBuffer buff) {
void writeUnsavedRecursive(Chunk chunk, WriteBuffer buff) {
if (pos != 0) {
// already stored before
return buff;
return;
}
if (!isLeaf()) {
int len = children.length;
for (int i = 0; i < len; i++) {
Page p = childrenPages[i];
if (p != null) {
buff = p.writeUnsavedRecursive(chunk, buff);
p.writeUnsavedRecursive(chunk, buff);
children[i] = p.getPos();
}
}
}
return write(chunk, buff);
write(chunk, buff);
}
/**
......
/*
* Copyright 2004-2013 H2 Group. Multiple-Licensed under the H2 License,
* Version 1.0, and under the Eclipse Public License, Version 1.0
* (http://h2database.com/html/license.html).
* Initial Developer: H2 Group
*/
package org.h2.mvstore;
import java.nio.ByteBuffer;
/**
* An auto-resize buffer to write data into a ByteBuffer.
*/
public class WriteBuffer {
private static final int MAX_REUSE_LIMIT = 4 * 1024 * 1024;
/**
* The maximum byte to grow a buffer at a time.
*/
private static final int MAX_GROW = 4 * 1024 * 1024;
private ByteBuffer reuse = ByteBuffer.allocate(512 * 1024);
private ByteBuffer buff = reuse;
public void writeVarInt(int x) {
DataUtils.writeVarInt(ensureCapacity(5), x);
}
public void writeVarLong(long x) {
DataUtils.writeVarLong(ensureCapacity(10), x);
}
public void writeStringData(String s, int len) {
ByteBuffer b = ensureCapacity(3 * len);
for (int i = 0; i < len; i++) {
int c = s.charAt(i);
if (c < 0x80) {
b.put((byte) c);
} else if (c >= 0x800) {
b.put((byte) (0xe0 | (c >> 12)));
b.put((byte) (((c >> 6) & 0x3f)));
b.put((byte) (c & 0x3f));
} else {
b.put((byte) (0xc0 | (c >> 6)));
b.put((byte) (c & 0x3f));
}
}
}
public void put(byte x) {
ensureCapacity(1).put(x);
}
public void putChar(char x) {
ensureCapacity(2).putChar(x);
}
public void putShort(short x) {
ensureCapacity(2).putShort(x);
}
public void putInt(int x) {
ensureCapacity(4).putInt(x);
}
public void putLong(long x) {
ensureCapacity(8).putLong(x);
}
public void putFloat(float x) {
ensureCapacity(4).putFloat(x);
}
public void putDouble(double x) {
ensureCapacity(8).putDouble(x);
}
public void put(byte[] bytes) {
ensureCapacity(bytes.length).put(bytes);
}
public void put(byte[] bytes, int offset, int length) {
ensureCapacity(length).put(bytes, offset, length);
}
public void position(int newPosition) {
buff.position(newPosition);
}
public int position() {
return buff.position();
}
public void get(byte[] dst) {
buff.get(dst);
}
public void putInt(int index, int value) {
buff.putInt(index, value);
}
public void putShort(int index, short value) {
buff.putShort(index, value);
}
public void put(ByteBuffer src) {
ensureCapacity(buff.remaining()).put(src);
}
public void limit(int newLimit) {
ensureCapacity(newLimit - buff.position()).limit(newLimit);
}
public int limit() {
return buff.limit();
}
public int capacity() {
return buff.capacity();
}
public ByteBuffer getBuffer() {
return buff;
}
/**
* Clear the buffer after use.
*/
void clear() {
if (buff.limit() > MAX_REUSE_LIMIT) {
buff = reuse;
}
buff.clear();
}
private ByteBuffer ensureCapacity(int len) {
if (buff.remaining() < len) {
grow(len);
}
return buff;
}
private void grow(int len) {
ByteBuffer temp = buff;
len = temp.remaining() + len;
int capacity = temp.capacity();
len = Math.max(len, Math.min(capacity + MAX_GROW, capacity * 2));
buff = ByteBuffer.allocate(len);
temp.flip();
buff.put(temp);
if (len <= MAX_REUSE_LIMIT) {
reuse = buff;
}
}
}
......@@ -17,6 +17,7 @@ import org.h2.mvstore.DataUtils;
import org.h2.mvstore.MVMap;
import org.h2.mvstore.MVMap.Builder;
import org.h2.mvstore.MVStore;
import org.h2.mvstore.WriteBuffer;
import org.h2.mvstore.type.DataType;
import org.h2.mvstore.type.ObjectDataType;
import org.h2.util.New;
......@@ -1324,17 +1325,16 @@ public class TransactionStore {
}
@Override
public ByteBuffer write(ByteBuffer buff, Object obj) {
public void write(WriteBuffer buff, Object obj) {
VersionedValue v = (VersionedValue) obj;
DataUtils.writeVarLong(buff, v.transactionId);
DataUtils.writeVarLong(buff, v.logId);
buff.writeVarLong(v.transactionId);
buff.writeVarLong(v.logId);
if (v.value == null) {
buff.put((byte) 0);
} else {
buff.put((byte) 1);
buff = valueType.write(buff, v.value);
valueType.write(buff, v.value);
}
return buff;
}
@Override
......@@ -1396,7 +1396,7 @@ public class TransactionStore {
}
@Override
public ByteBuffer write(ByteBuffer buff, Object obj) {
public void write(WriteBuffer buff, Object obj) {
Object[] array = (Object[]) obj;
for (int i = 0; i < arrayLength; i++) {
DataType t = elementTypes[i];
......@@ -1405,10 +1405,9 @@ public class TransactionStore {
buff.put((byte) 0);
} else {
buff.put((byte) 1);
buff = t.write(buff, o);
t.write(buff, o);
}
}
return buff;
}
@Override
......
......@@ -9,6 +9,7 @@ package org.h2.mvstore.rtree;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import org.h2.mvstore.DataUtils;
import org.h2.mvstore.WriteBuffer;
import org.h2.mvstore.type.DataType;
/**
......@@ -53,7 +54,7 @@ public class SpatialDataType implements DataType {
}
@Override
public ByteBuffer write(ByteBuffer buff, Object obj) {
public void write(WriteBuffer buff, Object obj) {
SpatialKey k = (SpatialKey) obj;
int flags = 0;
for (int i = 0; i < dimensions; i++) {
......@@ -61,15 +62,14 @@ public class SpatialDataType implements DataType {
flags |= 1 << i;
}
}
DataUtils.writeVarInt(buff, flags);
buff.writeVarInt(flags);
for (int i = 0; i < dimensions; i++) {
buff.putFloat(k.min(i));
if ((flags & (1 << i)) == 0) {
buff.putFloat(k.max(i));
}
}
DataUtils.writeVarLong(buff, k.getId());
return buff;
buff.writeVarLong(k.getId());
}
@Override
......
......@@ -8,6 +8,8 @@ package org.h2.mvstore.type;
import java.nio.ByteBuffer;
import org.h2.mvstore.WriteBuffer;
/**
* A data type.
*/
......@@ -36,9 +38,8 @@ public interface DataType {
*
* @param buff the target buffer
* @param obj the value
* @return the byte buffer
*/
ByteBuffer write(ByteBuffer buff, Object obj);
void write(WriteBuffer buff, Object obj);
/**
* Read an object.
......
......@@ -8,6 +8,7 @@ package org.h2.mvstore.type;
import java.nio.ByteBuffer;
import org.h2.mvstore.DataUtils;
import org.h2.mvstore.WriteBuffer;
/**
* A string type.
......@@ -33,11 +34,11 @@ public class StringDataType implements DataType {
}
@Override
public ByteBuffer write(ByteBuffer buff, Object obj) {
public void write(WriteBuffer buff, Object obj) {
String s = obj.toString();
int len = s.length();
DataUtils.writeVarInt(buff, len);
return DataUtils.writeStringData(buff, s, len);
buff.writeVarInt(len);
buff.writeStringData(s, len);
}
}
......
......@@ -8,6 +8,7 @@ package org.h2.test.store;
import java.nio.ByteBuffer;
import org.h2.mvstore.DataUtils;
import org.h2.mvstore.WriteBuffer;
import org.h2.mvstore.type.DataType;
/**
......@@ -69,15 +70,13 @@ public class RowDataType implements DataType {
}
@Override
public ByteBuffer write(ByteBuffer buff, Object obj) {
public void write(WriteBuffer buff, Object obj) {
Object[] x = (Object[]) obj;
int len = x.length;
DataUtils.writeVarInt(buff, len);
buff.writeVarInt(len);
for (int i = 0; i < len; i++) {
buff = DataUtils.ensureCapacity(buff, 0);
buff = types[i].write(buff, x[i]);
types[i].write(buff, x[i]);
}
return buff;
}
}
......@@ -13,6 +13,8 @@ import java.sql.Timestamp;
import java.util.Arrays;
import java.util.Random;
import java.util.UUID;
import org.h2.mvstore.WriteBuffer;
import org.h2.mvstore.type.ObjectDataType;
import org.h2.test.TestBase;
......@@ -131,17 +133,18 @@ public class TestObjectDataType extends TestBase {
ot.getMemory(last);
assertEquals(0, ot.compare(x, x));
ByteBuffer buff = ByteBuffer.allocate(1024);
WriteBuffer buff = new WriteBuffer();
ot.getMemory(last);
buff = ot.write(buff, x);
ot.write(buff, x);
buff.put((byte) 123);
buff.flip();
ByteBuffer bb = buff.getBuffer();
bb.flip();
ot.getMemory(last);
Object y = ot.read(buff);
assertEquals(123, buff.get());
assertEquals(0, buff.remaining());
Object y = ot.read(bb);
assertEquals(123, bb.get());
assertEquals(0, bb.remaining());
assertEquals(x.getClass().getName(), y.getClass().getName());
ot.getMemory(last);
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论