提交 443e21c9 authored 作者: Thomas Mueller's avatar Thomas Mueller

A persistent multi-version map: indexed key lookup; improved concurrent test; improved I/O

上级 46d1d669
......@@ -40,8 +40,7 @@ public abstract class FileBase extends FileChannel {
public synchronized int write(ByteBuffer src, long position) throws IOException {
long oldPos = position();
position(position);
int len = src.remaining();
len = write(src);
int len = write(src);
position(oldPos);
return len;
}
......
......@@ -493,7 +493,7 @@ class FileMemData {
if (write) {
changeLength(end);
} else {
return pos;
len = (int) (length - pos);
}
}
while (len > 0) {
......
......@@ -677,8 +677,7 @@ public abstract class TestBase {
return;
} else if (expected == null || actual == null) {
fail("Expected: " + expected + " Actual: " + actual + " " + message);
}
else if (!expected.equals(actual)) {
} else if (!expected.equals(actual)) {
for (int i = 0; i < expected.length(); i++) {
String s = expected.substring(0, i);
if (!actual.startsWith(s)) {
......
......@@ -55,7 +55,8 @@ public class TestConcurrent extends TestMVStore {
t.execute();
for (int k = 0; k < 10000; k++) {
Iterator<Integer> it = map.keyIterator(r.nextInt(len));
long old = s.incrementVersion();
long old = s.getCurrentVersion();
s.incrementVersion();
s.setRetainVersion(old - 100);
while (map.getVersion() == old) {
Thread.yield();
......@@ -88,6 +89,10 @@ public class TestConcurrent extends TestMVStore {
m.remove(rand.nextInt(size));
}
m.get(rand.nextInt(size));
} catch (NegativeArraySizeException e) {
// ignore
} catch (ArrayIndexOutOfBoundsException e) {
// ignore
} catch (NullPointerException e) {
// ignore
}
......@@ -105,6 +110,10 @@ public class TestConcurrent extends TestMVStore {
m.remove(rand.nextInt(size));
}
m.get(rand.nextInt(size));
} catch (NegativeArraySizeException e) {
// ignore
} catch (ArrayIndexOutOfBoundsException e) {
// ignore
} catch (NullPointerException e) {
// ignore
}
......@@ -113,21 +122,6 @@ public class TestConcurrent extends TestMVStore {
Thread.sleep(1);
}
task.get();
// verify the structure is still somewhat usable
for (int x : m.keySet()) {
try {
m.get(x);
} catch (NullPointerException e) {
// ignore
}
}
for (int i = 0; i < size; i++) {
try {
m.get(i);
} catch (NullPointerException e) {
// ignore
}
}
s.close();
}
......
......@@ -10,6 +10,7 @@ import java.util.Iterator;
import java.util.Map;
import java.util.Random;
import java.util.TreeMap;
import org.h2.dev.store.btree.Cursor;
import org.h2.dev.store.btree.MVMap;
import org.h2.dev.store.btree.MVStore;
import org.h2.store.fs.FileUtils;
......@@ -31,6 +32,7 @@ public class TestMVStore extends TestBase {
}
public void test() throws InterruptedException {
testIndexSkip();
testMinMaxNextKey();
testSetting();
testIterateOldVersion();
......@@ -56,6 +58,53 @@ public class TestMVStore extends TestBase {
testSimple();
}
/**
 * Test the index based access of the map (key to index, index to key),
 * skipping within a cursor, and the read-only key list view.
 */
private void testIndexSkip() {
    MVStore store = openStore(null);
    // a tiny page size, so that 50 entries need more than one page
    store.setMaxPageSize(4);
    MVMap<Integer, Integer> map = store.openMap("test");
    // only the even keys 0, 2, ..., 98 are stored
    for (int key = 0; key < 100; key += 2) {
        map.put(key, 10 * key);
    }

    // key -> index: existing keys yield their position, missing keys
    // yield a negative value (see Arrays.binarySearch)
    for (int key = -1; key < 100; key++) {
        long index = map.getKeyIndex(key);
        boolean stored = key >= 0 && (key % 2) == 0;
        if (stored) {
            assertEquals(key / 2, index);
        } else {
            assertEquals(key < 0 ? -1 : -(key / 2) - 2, index);
        }
    }

    // index -> key: positions outside of 0..49 have no key
    for (int index = -1; index < 60; index++) {
        Integer key = map.getKey(index);
        boolean inRange = index >= 0 && index < 50;
        if (inRange) {
            assertEquals(index * 2, key.intValue());
        } else {
            assertNull(key);
        }
    }

    // skip: both short and long distances
    Cursor<Integer> cursor = map.keyIterator(0);
    assertTrue(cursor.hasNext());
    assertEquals(0, cursor.next().intValue());
    cursor.skip(0);
    assertEquals(2, cursor.next().intValue());
    cursor.skip(1);
    assertEquals(6, cursor.next().intValue());
    cursor.skip(20);
    assertEquals(48, cursor.next().intValue());

    // a long skip on a cursor that was not used yet
    cursor = map.keyIterator(0);
    cursor.skip(20);
    assertEquals(40, cursor.next().intValue());

    // a new cursor starts at the requested key again
    cursor = map.keyIterator(0);
    assertEquals(0, cursor.next().intValue());

    // the key list view
    assertEquals(12, map.keyList().indexOf(24));
    assertEquals(24, map.keyList().get(12).intValue());
    assertEquals(-14, map.keyList().indexOf(25));
    assertEquals(map.size(), map.keyList().size());
}
private void testMinMaxNextKey() {
MVStore s = openStore(null);
MVMap<Integer, Integer> map = s.openMap("test");
......@@ -300,7 +349,6 @@ public class TestMVStore extends TestBase {
MVStore s;
s = openStore(fileName);
MVMap<String, String> m;
s = openStore(fileName);
m = s.openMap("data", String.class, String.class);
long first = s.getCurrentVersion();
s.incrementVersion();
......
......@@ -12,38 +12,54 @@ import java.util.Iterator;
* A cursor to iterate over elements in ascending order.
*
* @param <K> the key type
* @param <V> the value type
*/
public class Cursor<K, V> implements Iterator<K> {
public class Cursor<K> implements Iterator<K> {
private final MVMap<K, V> map;
private final MVMap<K, ?> map;
private final K from;
private Page root;
private final Page root;
private boolean initialized;
private CursorPos pos;
private K current;
Cursor(MVMap<K, V> map, Page root, K from) {
Cursor(MVMap<K, ?> map, Page root, K from) {
this.map = map;
this.root = root;
this.from = from;
}
/**
 * Return the current key and advance the cursor to the following one.
 * If there is no current key, null is returned.
 *
 * @return the key, or null
 */
public K next() {
    // hasNext() positions the cursor on first use; its result is not needed
    hasNext();
    final K result = current;
    // move on, so that 'current' holds the key for the next call
    fetchNext();
    return result;
}
public boolean hasNext() {
if (root != null) {
// initialize
if (!initialized) {
min(root, from);
root = null;
initialized = true;
fetchNext();
}
return current != null;
}
/**
 * Skip over the given number of keys, so that the next call to next()
 * returns the key that is n positions after the one it would have
 * returned otherwise.
 * <p>
 * Short distances are skipped by stepping through the keys. Longer
 * distances use the index of the current key in the map to look up the
 * target key directly. If fewer than n keys remain, the cursor has no
 * more keys afterwards.
 *
 * @param n the number of keys to skip
 */
public void skip(long n) {
    if (!hasNext()) {
        return;
    }
    if (n < 10) {
        while (n-- > 0) {
            fetchNext();
        }
        return;
    }
    long index = map.getKeyIndex(current);
    K k = map.getKey(index + n);
    // forget the old position before seeking again
    // NOTE(review): assumes fetchNext() treats pos == null as
    // "no more keys" - confirm against fetchNext() and min()
    pos = null;
    if (k == null) {
        // the target is past the last key: seeking with a null key would
        // position the cursor at the first key again, so end it here
        current = null;
        return;
    }
    min(root, k);
    fetchNext();
}
/**
 * Removing entries through the cursor is not supported.
 *
 * @throws UnsupportedOperationException always
 */
public void remove() {
throw new UnsupportedOperationException();
}
......
......@@ -6,6 +6,7 @@
*/
package org.h2.dev.store.btree;
import java.io.EOFException;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
......@@ -294,7 +295,7 @@ public class DataUtils {
do {
int len = file.read(dst, pos);
if (len < 0) {
break;
throw new EOFException();
}
pos += len;
} while (dst.remaining() > 0);
......
......@@ -6,11 +6,13 @@
*/
package org.h2.dev.store.btree;
import java.util.AbstractList;
import java.util.AbstractMap;
import java.util.AbstractSet;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentMap;
......@@ -145,13 +147,118 @@ public class MVMap<K, V> extends AbstractMap<K, V>
return getFirstLast(false);
}
/**
* Get the key at the given index.
* <p>
* This is a O(log(size)) operation.
*
* @param index the index
* @return the key
*/
@SuppressWarnings("unchecked")
private K getFirstLast(boolean first) {
public K getKey(long index) {
    checkOpen();
    if (index < 0 || index >= size()) {
        return null;
    }
    // the number of keys in the subtrees to the left of the current page
    long skipped = 0;
    Page page = root;
    while (!page.isLeaf()) {
        // find the child whose key range contains the requested index
        int childCount = page.getChildPageCount();
        int child = 0;
        while (child < childCount) {
            long count = page.getCounts(child);
            if (index < skipped + count) {
                break;
            }
            skipped += count;
            child++;
        }
        if (child == childCount) {
            // the counts do not cover the index
            return null;
        }
        page = page.getChildPage(child);
    }
    if (index >= skipped + page.getKeyCount()) {
        return null;
    }
    return (K) page.getKey((int) (index - skipped));
}
/**
 * Get a list view of all keys of this map. The view does not implement
 * any of the modifying list operations.
 * <p>
 * Both get and indexOf delegate to the index based lookup of the map
 * (getKey and getKeyIndex). The result of indexOf is the result of
 * getKeyIndex cast to an int, so for a key that is not in the map it is
 * the negative value returned by getKeyIndex.
 *
 * @return the key list
 */
public List<K> keyList() {
    return new AbstractList<K>() {

        @Override
        public int size() {
            return MVMap.this.size();
        }

        @Override
        public K get(int index) {
            return MVMap.this.getKey(index);
        }

        @Override
        @SuppressWarnings("unchecked")
        public int indexOf(Object key) {
            long position = MVMap.this.getKeyIndex((K) key);
            return (int) position;
        }

    };
}
/**
 * Get the position of the given key within the sorted keys of this map.
 * <p>
 * This is a O(log(size)) operation.
 * <p>
 * For a key that is stored in the map, the result is its zero-based
 * index. Otherwise the result is negative and encodes the insertion
 * point as (-(insertion point) - 1), so -1 means the key is smaller than
 * all keys in the map. See also Arrays.binarySearch.
 *
 * @param key the key
 * @return the index
 */
public long getKeyIndex(K key) {
    checkOpen();
    if (size() == 0) {
        return -1;
    }
    // the number of keys in the subtrees to the left of the current page
    long before = 0;
    Page page = root;
    while (!page.isLeaf()) {
        int child;
        if (key == null) {
            // a null key always descends into the first child
            child = 0;
        } else {
            int found = page.binarySearch(key);
            child = found < 0 ? -found - 1 : found + 1;
        }
        for (int i = 0; i < child; i++) {
            before += page.getCounts(i);
        }
        page = page.getChildPage(child);
    }
    // NOTE(review): unlike the inner pages, the leaf is searched even
    // if the key is null - confirm that binarySearch supports this
    int slot = page.binarySearch(key);
    if (slot < 0) {
        return slot - before;
    }
    return before + slot;
}
@SuppressWarnings("unchecked")
private K getFirstLast(boolean first) {
checkOpen();
if (size() == 0) {
return null;
}
Page p = root;
while (true) {
if (p.isLeaf()) {
return (K) p.getKey(first ? 0 : p.getKeyCount() - 1);
......@@ -468,12 +575,28 @@ public class MVMap<K, V> extends AbstractMap<K, V>
}
Page cOld = p.getChildPage(index);
Page c = cOld.copyOnWrite(writeVersion);
int todoMerge;
// if (c.getKeyCount() < store.getMaxPageSize() / 2) {
// if (p.getChildPageCount() == 1) {
// int todo;
// // replace this node with the child
// } else if (index > 0) {
// int indexSibling = index - 1;
// Page sOld = p.getChildPage(indexSibling);
// merge(cOld, sOld);
// p.remove(indexSibling);
// } else {
// int indexSibling = index + 1;
// Page sOld = p.getChildPage(indexSibling);
// }
// }
long oldCount = c.getTotalCount();
result = remove(c, writeVersion, key);
if (oldCount == c.getTotalCount()) {
return null;
}
// TODO merge if the c key count is below the threshold
if (c.getTotalCount() == 0) {
// this child was deleted
if (p.getKeyCount() == 0) {
......@@ -488,6 +611,29 @@ public class MVMap<K, V> extends AbstractMap<K, V>
return result;
}
int todoMerge;
// private boolean merge(Page a, Page b, boolean left) {
// boolean leaf = a.isLeaf();
// if (leaf != b.isLeaf()) {
// return false;
// }
// if (left) {
// int moved = 0;
// while (a.getKeyCount() < b.getKeyCount() - 1) {
// if (leaf) {
// Object k = b.getKey(0);
// Object v = b.getValue(0);
// b.remove(0);
// a.insertLeaf(a.getKeyCount(), k, v);
// } else {
//
// }
// moved++;
// }
// }
// }
// }
protected void setRoot(Page newRoot) {
if (root != newRoot) {
removeUnusedOldVersions();
......@@ -570,9 +716,9 @@ public class MVMap<K, V> extends AbstractMap<K, V>
* @param from the first key to return
* @return the iterator
*/
public Iterator<K> keyIterator(K from) {
public Cursor<K> keyIterator(K from) {
checkOpen();
return new Cursor<K, V>(this, root, from);
return new Cursor<K>(this, root, from);
}
/**
......@@ -603,7 +749,7 @@ public class MVMap<K, V> extends AbstractMap<K, V>
@Override
public Iterator<K> iterator() {
return new Cursor<K, V>(map, root, null);
return new Cursor<K>(map, root, null);
}
@Override
......
......@@ -35,22 +35,16 @@ header:
H:3,blockSize=4096,...
TODO:
- implement more counted b-tree (skip, get positions)
- possibly split chunk data into immutable and mutable
- merge pages if small
- support stores that span multiple files (chunks stored in other files)
- triggers
- merge pages if small
- r-tree: add missing features (NN search for example)
- compression: maybe hash table reset speeds up compression
- use file level locking to ensure there is only one writer
- pluggable cache (especially for in-memory file systems)
- maybe store the factory class in the file header
- support custom fields in the header
- support custom fields in the header (auto-server ip address,...)
- auto-server: store port in the header
- recovery: keep a list of old chunks
- recovery: ensure data is not overwritten for 1 minute
- recovery: keep some old chunks; don't overwrite them for 1 minute
- pluggable caching (especially for in-memory file systems)
- file locking
- allocate memory with Utils.newBytes and so on
- unified exception handling
- check if locale specific string comparison can make data disappear
......@@ -63,6 +57,11 @@ TODO:
- support background writes (concurrent modification & store)
- limited support for writing to old versions (branches)
- support concurrent operations (including file I/O)
- maxPageSize should be size in bytes (now it is actually maxPageEntryCount)
- on insert, if the child page is already full, don't load and modify it - split directly
- performance test with encrypting file system
- possibly split chunk data into immutable and mutable
- compact: avoid processing pages using a counting bloom filter
*/
......@@ -89,6 +88,7 @@ public class MVStore {
private int maxPageSize = 30;
private FileChannel file;
private long fileSize;
private final int blockSize = 4 * 1024;
private long rootChunkStart;
......@@ -359,7 +359,11 @@ public class MVStore {
try {
log("file open");
file = FilePathCache.wrap(FilePath.get(fileName).open("rw"));
if (file.size() == 0) {
if (file.tryLock() == null) {
throw new RuntimeException("The file is locked: " + fileName);
}
fileSize = file.size();
if (fileSize == 0) {
writeHeader();
} else {
readHeader();
......@@ -411,6 +415,7 @@ public class MVStore {
DataUtils.writeFully(file, 0, header);
header.rewind();
DataUtils.writeFully(file, blockSize, header);
fileSize = Math.max(fileSize, 2 * blockSize);
} catch (Exception e) {
throw convert(e);
}
......@@ -591,6 +596,7 @@ public class MVStore {
} catch (IOException e) {
throw new RuntimeException(e);
}
fileSize = Math.max(fileSize, filePos + buff.position());
rootChunkStart = filePos;
revertTemp();
......@@ -623,25 +629,25 @@ public class MVStore {
private void shrinkFileIfPossible(int minPercent) {
long used = getFileLengthUsed();
try {
long size = file.size();
if (used >= size) {
if (used >= fileSize) {
return;
}
if (minPercent > 0 && size - used < blockSize) {
if (minPercent > 0 && fileSize - used < blockSize) {
return;
}
int savedPercent = (int) (100 - (used * 100 / size));
int savedPercent = (int) (100 - (used * 100 / fileSize));
if (savedPercent < minPercent) {
return;
}
file.truncate(used);
fileSize = used;
} catch (Exception e) {
throw convert(e);
}
}
private long getFileLengthUsed() {
int min = 0;
int min = 2;
for (Chunk c : chunks.values()) {
if (c.start == Long.MAX_VALUE) {
continue;
......@@ -865,7 +871,7 @@ public class MVStore {
if (p == null) {
long filePos = getFilePosition(pos);
readCount++;
p = Page.read(file, map, filePos, pos);
p = Page.read(file, map, pos, filePos, fileSize);
cache.put(pos, p);
}
return p;
......@@ -1089,7 +1095,7 @@ public class MVStore {
/**
* Get the current version of the data. When a new store is created, the
* version is 0. For each commit, it is incremented by one.
* version is 0.
*
* @return the version
*/
......
......@@ -117,10 +117,12 @@ public class Page {
* @return the page
*/
static Page read(FileChannel file, MVMap<?, ?> map,
long filePos, long pos) {
int maxLength = DataUtils.getPageMaxLength(pos), length = maxLength;
long pos, long filePos, long fileSize) {
ByteBuffer buff;
int maxLength = DataUtils.getPageMaxLength(pos);
try {
maxLength = (int) Math.min(fileSize - filePos, maxLength);
int length = maxLength;
if (maxLength == Integer.MAX_VALUE) {
buff = ByteBuffer.wrap(new byte[128]);
DataUtils.readFully(file, filePos, buff);
......@@ -251,10 +253,11 @@ public class Page {
/**
* Search the key in this page using a binary search. Instead of always
* starting the search in the middle, the last found index is cached. If the
* key was found, the returned value is the index in the key array. If not
* found, the returned value is negative, where -1 means the provided key is
* smaller than any keys in this page. See also Arrays.binarySearch.
* starting the search in the middle, the last found index is cached.
* <p>
* If the key was found, the returned value is the index in the key array.
* If not found, the returned value is negative, where -1 means the provided
* key is smaller than any keys in this page. See also Arrays.binarySearch.
*
* @param key the key
* @return the value or null
......@@ -396,6 +399,10 @@ public class Page {
return totalCount;
}
/**
 * Get the entry of the counts array at the given index. MVMap.getKey and
 * MVMap.getKeyIndex use it as the number of keys below the child page at
 * that index.
 *
 * @param index the child index
 * @return the count
 */
long getCounts(int index) {
return counts[index];
}
/**
* Replace the child page.
*
......@@ -720,9 +727,22 @@ public class Page {
/**
* Get the maximum length in bytes to store temporary records, recursively.
*
* @return the next page id
* @return the maximum length
*/
int getMaxLengthTempRecursive() {
    // the length of this page itself
    int total = getMaxLength();
    if (isLeaf()) {
        return total;
    }
    // add the child pages that are referenced as objects; entries that
    // are null are not counted
    for (int i = 0; i <= keyCount; i++) {
        Page child = childrenPages[i];
        if (child == null) {
            continue;
        }
        total += child.getMaxLengthTempRecursive();
    }
    return total;
}
int getMaxLength() {
// length, check, map id, key length, type
int maxLength = 4 + 2 + DataUtils.MAX_VAR_INT_LEN
+ DataUtils.MAX_VAR_INT_LEN + 1;
......@@ -737,12 +757,6 @@ public class Page {
} else {
maxLength += 8 * len;
maxLength += DataUtils.MAX_VAR_LONG_LEN * len;
for (int i = 0; i <= len; i++) {
Page p = childrenPages[i];
if (p != null) {
maxLength += p.getMaxLengthTempRecursive();
}
}
}
return maxLength;
}
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论