提交 5f87d67d authored 作者: Thomas Mueller's avatar Thomas Mueller

A persistent multi-version map: key navigation; absolute file I/O

上级 bc7a26f6
......@@ -29,6 +29,23 @@ public abstract class FileBase extends FileChannel {
public abstract int write(ByteBuffer src) throws IOException;
public synchronized int read(ByteBuffer dst, long position) throws IOException {
long oldPos = position();
position(position);
int len = read(dst);
position(oldPos);
return len;
}
public synchronized int write(ByteBuffer src, long position) throws IOException {
long oldPos = position();
position(position);
int len = src.remaining();
len = write(src);
position(oldPos);
return len;
}
public abstract FileChannel truncate(long size) throws IOException;
public void force(boolean metaData) throws IOException {
......@@ -47,10 +64,6 @@ public abstract class FileBase extends FileChannel {
throw new UnsupportedOperationException();
}
public int read(ByteBuffer dst, long position) throws IOException {
throw new UnsupportedOperationException();
}
public long read(ByteBuffer[] dsts, int offset, int length) throws IOException {
throw new UnsupportedOperationException();
}
......@@ -68,10 +81,6 @@ public abstract class FileBase extends FileChannel {
throw new UnsupportedOperationException();
}
public int write(ByteBuffer src, long position) throws IOException {
throw new UnsupportedOperationException();
}
public long write(ByteBuffer[] srcs, int offset, int length) throws IOException {
throw new UnsupportedOperationException();
}
......
......@@ -63,6 +63,14 @@ class FileNio extends FileBase {
return this;
}
public int read(ByteBuffer dst, long position) throws IOException {
return channel.read(dst, position);
}
public int write(ByteBuffer src, long position) throws IOException {
return channel.write(src, position);
}
public FileChannel truncate(long newLength) throws IOException {
try {
channel.truncate(newLength);
......
......@@ -31,6 +31,7 @@ public class TestMVStore extends TestBase {
}
public void test() throws InterruptedException {
testMinMaxNextKey();
testSetting();
testIterateOldVersion();
testObjects();
......@@ -55,6 +56,68 @@ public class TestMVStore extends TestBase {
testSimple();
}
private void testMinMaxNextKey() {
MVStore s = openStore(null);
MVMap<Integer, Integer> map = s.openMap("test");
map.put(10, 100);
map.put(20, 200);
assertEquals(10, map.firstKey().intValue());
assertEquals(20, map.lastKey().intValue());
assertEquals(20, map.ceilingKey(15).intValue());
assertEquals(20, map.ceilingKey(20).intValue());
assertEquals(10, map.floorKey(15).intValue());
assertEquals(10, map.floorKey(10).intValue());
assertEquals(20, map.higherKey(10).intValue());
assertEquals(10, map.lowerKey(20).intValue());
for (int i = 3; i < 20; i++) {
s = openStore(null);
s.setMaxPageSize(4);
map = s.openMap("test");
for (int j = 3; j < i; j++) {
map.put(j * 2, j * 20);
}
if (i == 3) {
assertNull(map.firstKey());
assertNull(map.lastKey());
} else {
assertEquals(6, map.firstKey().intValue());
int max = (i - 1) * 2;
assertEquals(max, map.lastKey().intValue());
for (int j = 0; j < i * 2 + 2; j++) {
if (j > max) {
assertNull(map.ceilingKey(j));
} else {
int ceiling = Math.max((j + 1) / 2 * 2, 6);
assertEquals(ceiling, map.ceilingKey(j).intValue());
}
int floor = Math.min(max, Math.max(j / 2 * 2, 4));
if (floor < 6) {
assertNull(map.floorKey(j));
} else {
map.floorKey(j);
}
int lower = Math.min(max, Math.max((j - 1) / 2 * 2, 4));
if (lower < 6) {
assertNull(map.lowerKey(j));
} else {
assertEquals(lower, map.lowerKey(j).intValue());
}
int higher = Math.max((j + 2) / 2 * 2, 6);
if (higher > max) {
assertNull(map.higherKey(j));
} else {
assertEquals(higher, map.higherKey(j).intValue());
}
}
}
}
}
private void testSetting() {
String fileName = getBaseDir() + "/testSetting.h3";
FileUtils.delete(fileName);
......
......@@ -10,17 +10,16 @@ import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.channels.FileLock;
import java.util.Map;
import org.h2.dev.store.cache.CacheLongKeyLIRS;
import org.h2.store.fs.FileBase;
import org.h2.store.fs.FilePathWrapper;
import org.h2.util.SmallLRUCache;
/**
* A file with a read cache.
*/
public class FilePathCache extends FilePathWrapper {
public static FileChannel wrap(FileChannel f) throws IOException {
public static FileChannel wrap(FileChannel f) {
return new FileCache(f);
}
......@@ -39,12 +38,12 @@ public class FilePathCache extends FilePathWrapper {
private static final int CACHE_BLOCK_SIZE = 4 * 1024;
private final FileChannel base;
private long pos, posBase, size;
private final Map<Long, ByteBuffer> cache = SmallLRUCache.newInstance(16);
// 1 MB (256 * 4 * 1024)
private final CacheLongKeyLIRS<ByteBuffer> cache =
CacheLongKeyLIRS.newInstance(256);
FileCache(FileChannel base) throws IOException {
FileCache(FileChannel base) {
this.base = base;
this.size = base.size();
}
protected void implCloseChannel() throws IOException {
......@@ -52,31 +51,29 @@ public class FilePathCache extends FilePathWrapper {
}
public FileChannel position(long newPosition) throws IOException {
this.pos = newPosition;
return this;
throw new UnsupportedOperationException();
// base.position(newPosition);
// return this;
}
public long position() throws IOException {
return pos;
throw new UnsupportedOperationException();
// return base.position();
}
private void positionBase(long pos) throws IOException {
if (posBase != pos) {
base.position(pos);
posBase = pos;
}
public int read(ByteBuffer dst) throws IOException {
throw new UnsupportedOperationException();
// return base.read(dst);
}
public int read(ByteBuffer dst) throws IOException {
long cachePos = getCachePos(pos);
int off = (int) (pos - cachePos);
public synchronized int read(ByteBuffer dst, long position) throws IOException {
long cachePos = getCachePos(position);
int off = (int) (position - cachePos);
int len = CACHE_BLOCK_SIZE - off;
ByteBuffer buff = cache.get(cachePos);
if (buff == null) {
buff = ByteBuffer.allocate(CACHE_BLOCK_SIZE);
positionBase(cachePos);
int read = base.read(buff);
posBase += read;
int read = base.read(buff, cachePos);
if (read == CACHE_BLOCK_SIZE) {
cache.put(cachePos, buff);
} else {
......@@ -89,7 +86,6 @@ public class FilePathCache extends FilePathWrapper {
len = Math.min(len, dst.remaining());
System.arraycopy(buff.array(), off, dst.array(), dst.position(), len);
dst.position(dst.position() + len);
pos += len;
return len;
}
......@@ -98,30 +94,31 @@ public class FilePathCache extends FilePathWrapper {
}
public long size() throws IOException {
return size;
return base.size();
}
public FileChannel truncate(long newSize) throws IOException {
cache.clear();
base.truncate(newSize);
size = Math.min(size, newSize);
pos = Math.min(pos, newSize);
posBase = pos;
return this;
}
public int write(ByteBuffer src) throws IOException {
public int write(ByteBuffer src, long position) throws IOException {
if (cache.size() > 0) {
for (long p = getCachePos(pos), len = src.remaining(); len > 0; p += CACHE_BLOCK_SIZE, len -= CACHE_BLOCK_SIZE) {
int len = src.remaining();
long p = getCachePos(position);
while (len > 0) {
cache.remove(p);
p += CACHE_BLOCK_SIZE;
len -= CACHE_BLOCK_SIZE;
}
}
positionBase(pos);
int len = base.write(src);
posBase += len;
pos += len;
size = Math.max(size, pos);
return len;
return base.write(src, position);
}
public int write(ByteBuffer src) throws IOException {
throw new UnsupportedOperationException();
// return base.write(src);
}
public void force(boolean metaData) throws IOException {
......
......@@ -284,22 +284,38 @@ public class DataUtils {
}
/**
* Read from a file channel until the target buffer is full, or end-of-file
* has been reached.
* Read from a file channel until the buffer is full, or end-of-file
* has been reached. The buffer is rewind at the end.
*
* @param file the file channel
* @param buff the target buffer
* @param dst the byte buffer
*/
public static void readFully(FileChannel file, ByteBuffer buff) throws IOException {
public static void readFully(FileChannel file, long pos, ByteBuffer dst) throws IOException {
do {
int len = file.read(buff);
int len = file.read(dst, pos);
if (len < 0) {
break;
}
} while (buff.remaining() > 0);
buff.rewind();
pos += len;
} while (dst.remaining() > 0);
dst.rewind();
}
/**
* Write to a file channel.
*
* @param file the file channel
* @param src the source buffer
*/
public static void writeFully(FileChannel file, long pos, ByteBuffer src) throws IOException {
int off = 0;
do {
int len = file.write(src, pos + off);
off += len;
} while (src.remaining() > 0);
}
/**
* Convert the length to a length code 0..31. 31 means more than 1 MB.
*
......
......@@ -127,6 +127,130 @@ public class MVMap<K, V> extends AbstractMap<K, V>
return result;
}
/**
* Get the first key, or null if the map is empty.
*
* @return the first key, or null
*/
public K firstKey() {
return getFirstLast(true);
}
/**
* Get the last key, or null if the map is empty.
*
* @return the last key, or null
*/
public K lastKey() {
return getFirstLast(false);
}
@SuppressWarnings("unchecked")
private K getFirstLast(boolean first) {
checkOpen();
Page p = root;
if (size() == 0) {
return null;
}
while (true) {
if (p.isLeaf()) {
return (K) p.getKey(first ? 0 : p.getKeyCount() - 1);
}
p = p.getChildPage(first ? 0 : p.getChildPageCount() - 1);
}
}
/**
* Get the smallest key that is larger than the given key, or null if no
* such key exists.
*
* @param key the key (may not be null)
* @return the result
*/
public K higherKey(K key) {
return getMinMax(key, false, true);
}
/**
* Get the smallest key that is larger or equal to this key.
*
* @param key the key (may not be null)
* @return the result
*/
public K ceilingKey(K key) {
return getMinMax(key, false, false);
}
/**
* Get the largest key that is smaller or equal to this key.
*
* @param key the key (may not be null)
* @return the result
*/
public K floorKey(K key) {
return getMinMax(key, true, false);
}
/**
* Get the largest key that is smaller than the given key, or null if no
* such key exists.
*
* @param key the key (may not be null)
* @return the result
*/
public K lowerKey(K key) {
return getMinMax(key, true, true);
}
private K getMinMax(K key, boolean min, boolean excluding) {
checkOpen();
if (size() == 0) {
return null;
}
return getMinMax(root, key, min, excluding);
}
@SuppressWarnings("unchecked")
private K getMinMax(Page p, K key, boolean min, boolean excluding) {
if (p.isLeaf()) {
if (key == null) {
return (K) p.getKey(min ? 0 : p.getKeyCount() - 1);
}
int x = p.binarySearch(key);
if (x < 0) {
x = -x - (min ? 2 : 1);
} else if (excluding) {
x += min ? -1 : 1;
}
if (x < 0 || x >= p.getKeyCount()) {
return null;
}
return (K) p.getKey(x);
}
int x;
if (key == null) {
x = min ? 0 : p.getKeyCount() - 1;
} else {
x = p.binarySearch(key);
if (x < 0) {
x = -x - 1;
} else {
x++;
}
}
while (true) {
if (x < 0 || x >= p.getChildPageCount()) {
return null;
}
K k = getMinMax(p.getChildPage(x), key, min, excluding);
if (k != null) {
return k;
}
x += min ? -1 : 1;
}
}
/**
* Get a value.
*
......
......@@ -35,10 +35,10 @@ header:
H:3,blockSize=4096,...
TODO:
- implement more counted b-tree (skip, get positions)
- possibly split chunk data into immutable and mutable
- support stores that span multiple files (chunks stored in other files)
- triggers
- implement more counted b-tree (skip, get positions)
- merge pages if small
- r-tree: add missing features (NN search for example)
- compression: maybe hash table reset speeds up compression
......@@ -408,10 +408,9 @@ public class MVStore {
header.put(h.getBytes("UTF-8"));
header.rewind();
writeCount++;
file.position(0);
file.write(header);
file.position(blockSize);
file.write(header);
DataUtils.writeFully(file, 0, header);
header.rewind();
DataUtils.writeFully(file, blockSize, header);
} catch (Exception e) {
throw convert(e);
}
......@@ -421,8 +420,7 @@ public class MVStore {
try {
byte[] headers = new byte[blockSize * 2];
readCount++;
file.position(0);
file.read(ByteBuffer.wrap(headers));
file.read(ByteBuffer.wrap(headers), 0);
String s = new String(headers, 0, blockSize, "UTF-8").trim();
HashMap<String, String> map = DataUtils.parseMap(s);
rootChunkStart = Long.parseLong(map.get("rootChunk"));
......@@ -589,8 +587,7 @@ public class MVStore {
buff.rewind();
try {
writeCount++;
file.position(filePos);
file.write(buff);
DataUtils.writeFully(file, filePos, buff);
} catch (IOException e) {
throw new RuntimeException(e);
}
......@@ -708,9 +705,8 @@ public class MVStore {
private Chunk readChunkHeader(long start) {
try {
readCount++;
file.position(start);
ByteBuffer buff = ByteBuffer.wrap(new byte[32]);
DataUtils.readFully(file, buff);
DataUtils.readFully(file, start, buff);
buff.rewind();
return Chunk.fromHeader(buff, start);
} catch (IOException e) {
......@@ -804,8 +800,7 @@ public class MVStore {
private void copyLive(Chunk chunk, ArrayList<Chunk> old) {
ByteBuffer buff = ByteBuffer.allocate(chunk.length);
try {
file.position(chunk.start);
DataUtils.readFully(file, buff);
DataUtils.readFully(file, chunk.start, buff);
} catch (IOException e) {
throw new RuntimeException(e);
}
......
......@@ -121,15 +121,14 @@ public class Page {
int maxLength = DataUtils.getPageMaxLength(pos), length = maxLength;
ByteBuffer buff;
try {
file.position(filePos);
if (maxLength == Integer.MAX_VALUE) {
buff = ByteBuffer.wrap(new byte[128]);
DataUtils.readFully(file, buff);
DataUtils.readFully(file, filePos, buff);
maxLength = buff.getInt();
file.position(filePos);
//read the first bytes again
}
buff = ByteBuffer.wrap(new byte[length]);
DataUtils.readFully(file, buff);
DataUtils.readFully(file, filePos, buff);
} catch (IOException e) {
throw new RuntimeException(e);
}
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论