Commit f6fbb356 authored by Thomas Mueller

MVStore: support two compression levels; measure read and written bytes; ascending iteration over entry set
Parent ae3db101
@@ -103,7 +103,7 @@ Example usage:
MVStore s = new MVStore.Builder().
fileName(fileName).
encryptionKey("007".toCharArray()).
compressData().
compress().
open();
</pre>
<p>
@@ -114,7 +114,10 @@ The list of available options is:
</li><li>backgroundExceptionHandler: specify a handler for
exceptions that could occur while writing in the background.
</li><li>cacheSize: the cache size in MB.
</li><li>compressData: compress the data when storing.
</li><li>compress: compress the data when storing
using a fast algorithm (LZF).
</li><li>compressHigh: compress the data when storing
using a slower algorithm (Deflate).
</li><li>encryptionKey: the encryption key for file encryption.
</li><li>fileName: the name of the file, for file based stores.
</li><li>fileStore: the storage implementation to use.
@@ -229,7 +229,7 @@ public class Chunk {
DataUtils.appendMap(buff, "block", block);
DataUtils.appendMap(buff, "version", version);
byte[] bytes = buff.toString().getBytes(DataUtils.LATIN);
int checksum = DataUtils.getFletcher32(bytes, bytes.length / 2 * 2);
int checksum = DataUtils.getFletcher32(bytes, bytes.length);
DataUtils.appendMap(buff, "fletcher", checksum);
while (buff.length() < Chunk.FOOTER_LENGTH - 1) {
buff.append(' ');
@@ -19,7 +19,7 @@ public class Cursor<K, V> implements Iterator<K> {
private final MVMap<K, ?> map;
private final K from;
private CursorPos pos;
private K current;
private K current, last;
private V currentValue, lastValue;
private final Page root;
private boolean initialized;
@@ -44,11 +44,21 @@ public class Cursor<K, V> implements Iterator<K> {
public K next() {
hasNext();
K c = current;
last = current;
lastValue = currentValue;
fetchNext();
return c;
}
/**
* Get the last read key if there was one.
*
* @return the key or null
*/
public K getKey() {
return last;
}
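Together with the existing getValue(), the new getKey() lets a caller re-read the last returned pair without keeping the return value of next(). A minimal sketch, assuming a populated MVMap<Integer, String> named map:

    Cursor<Integer, String> c = map.cursor(null); // null: start at the first key
    while (c.hasNext()) {
        c.next();
        // after next(), getKey() and getValue() return the last read pair
        System.out.println(c.getKey() + " -> " + c.getValue());
    }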
/**
* Get the last read value if there was one.
*
@@ -94,10 +94,15 @@ public class DataUtils {
public static final int PAGE_TYPE_NODE = 1;
/**
* The bit mask for compressed pages.
* The bit mask for compressed pages (compression level fast).
*/
public static final int PAGE_COMPRESSED = 2;
/**
* The bit mask for compressed pages (compression level high).
*/
public static final int PAGE_COMPRESSED_HIGH = 2 + 4;
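Because PAGE_COMPRESSED_HIGH (6) includes the PAGE_COMPRESSED bit (2), a page compressed at the high level still matches the plain compressed mask, and only an equality test against the full mask tells the levels apart; this is why the read path in Page.java below compares with ==. A small self-contained check against the constants above:

    import org.h2.mvstore.DataUtils;

    public class PageFlagCheck {
        public static void main(String[] args) {
            // a leaf page (type 0) marked as high-compressed sets bits 2 and 4
            int type = DataUtils.PAGE_TYPE_LEAF | DataUtils.PAGE_COMPRESSED_HIGH;
            // any compressed page, fast or high, has the PAGE_COMPRESSED bit set
            System.out.println((type & DataUtils.PAGE_COMPRESSED) != 0); // true
            // only high-compressed pages match the full mask
            System.out.println((type & DataUtils.PAGE_COMPRESSED_HIGH)
                    == DataUtils.PAGE_COMPRESSED_HIGH); // true
        }
    }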
/**
* The maximum length of a variable size int.
*/
@@ -394,12 +399,13 @@ public class DataUtils {
}
/**
* Read from a file channel until the buffer is full, or end-of-file
* has been reached. The buffer is rewind after reading.
* Read from a file channel until the buffer is full.
* The buffer is rewound after reading.
*
* @param file the file channel
* @param pos the absolute position within the file
* @param dst the byte buffer
* @throws IllegalStateException if some data could not be read
*/
public static void readFully(FileChannel file, long pos, ByteBuffer dst) {
try {
@@ -662,20 +668,26 @@ public class DataUtils {
* Calculate the Fletcher32 checksum.
*
* @param bytes the bytes
* @param length the message length (must be a multiple of 2)
* @param length the message length (if odd, a 0 byte is appended)
* @return the checksum
*/
public static int getFletcher32(byte[] bytes, int length) {
int s1 = 0xffff, s2 = 0xffff;
for (int i = 0; i < length;) {
int i = 0, evenLength = length / 2 * 2;
while (i < evenLength) {
// reduce after 360 words (each word is two bytes)
for (int end = Math.min(i + 720, length); i < end;) {
for (int end = Math.min(i + 720, evenLength); i < end;) {
int x = ((bytes[i++] & 0xff) << 8) | (bytes[i++] & 0xff);
s2 += s1 += x;
}
s1 = (s1 & 0xffff) + (s1 >>> 16);
s2 = (s2 & 0xffff) + (s2 >>> 16);
}
if (i < length) {
// odd length: append 0
int x = (bytes[i] & 0xff) << 8;
s2 += s1 += x;
}
s1 = (s1 & 0xffff) + (s1 >>> 16);
s2 = (s2 & 0xffff) + (s2 >>> 16);
return (s2 << 16) | s1;
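With the odd-length handling above, the checksum of an odd-length message equals the checksum of the same message with an explicit trailing 0 byte appended, which is the property the new test in TestDataUtils below verifies. A minimal standalone check:

    import org.h2.mvstore.DataUtils;

    public class FletcherOddLength {
        public static void main(String[] args) {
            byte[] data = { 1, 2, 3, 0 };
            // odd length 3: getFletcher32 acts as if a 0 were appended
            int odd = DataUtils.getFletcher32(data, 3);
            // even length 4: the trailing 0 is an explicit part of the message
            int even = DataUtils.getFletcher32(data, 4);
            System.out.println(odd == even); // true
        }
    }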
@@ -30,11 +30,21 @@ public class FileStore {
*/
protected long readCount;
/**
* The number of read bytes.
*/
protected long readBytes;
/**
* The number of write operations.
*/
protected long writeCount;
/**
* The number of written bytes.
*/
protected long writeBytes;
/**
* The free spaces between the chunks. The first block to use is block 2
* (the first two blocks are the store header).
@@ -85,9 +95,10 @@ public class FileStore {
* @return the byte buffer
*/
public ByteBuffer readFully(long pos, int len) {
readCount++;
ByteBuffer dst = ByteBuffer.allocate(len);
DataUtils.readFully(file, pos, dst);
readCount++;
readBytes += len;
return dst;
}
@@ -98,9 +109,11 @@ public class FileStore {
* @param src the source buffer
*/
public void writeFully(long pos, ByteBuffer src) {
writeCount++;
fileSize = Math.max(fileSize, pos + src.remaining());
int len = src.remaining();
fileSize = Math.max(fileSize, pos + len);
DataUtils.writeFully(file, pos, src);
writeCount++;
writeBytes += len;
}
/**
@@ -258,6 +271,15 @@ public class FileStore {
return writeCount;
}
/**
* Get the number of written bytes since this store was opened.
*
* @return the number of written bytes
*/
public long getWriteBytes() {
return writeBytes;
}
/**
* Get the number of read operations since this store was opened.
* For file based stores, this is the number of file read operations.
@@ -268,6 +290,15 @@ public class FileStore {
return readCount;
}
/**
* Get the number of read bytes since this store was opened.
*
* @return the number of read bytes
*/
public long getReadBytes() {
return readBytes;
}
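The byte counters complement the existing operation counters: one write operation may cover many bytes. A sketch of reading them back, assuming the public MVStore.getFileStore() accessor (the file name is made up):

    import org.h2.mvstore.MVMap;
    import org.h2.mvstore.MVStore;

    public class StoreStats {
        public static void main(String[] args) {
            MVStore s = new MVStore.Builder().fileName("stats.h3").open();
            MVMap<Integer, String> map = s.openMap("data");
            for (int i = 0; i < 1000; i++) {
                map.put(i, "row " + i);
            }
            s.commit();
            System.out.println(s.getFileStore().getWriteCount() + " writes, "
                    + s.getFileStore().getWriteBytes() + " bytes written");
            s.close();
        }
    }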
public boolean isReadOnly() {
return readOnly;
}
@@ -16,6 +16,7 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentMap;
import org.h2.mvstore.type.DataType;
import org.h2.mvstore.type.ObjectDataType;
import org.h2.util.New;
@@ -765,11 +766,47 @@ public class MVMap<K, V> extends AbstractMap<K, V>
@Override
public Set<Map.Entry<K, V>> entrySet() {
HashMap<K, V> map = new HashMap<K, V>();
for (Cursor<K, V> cursor = cursor(null); cursor.hasNext();) {
map.put(cursor.next(), cursor.getValue());
final MVMap<K, V> map = this;
final Page root = this.root;
return new AbstractSet<Entry<K, V>>() {
@Override
public Iterator<Entry<K, V>> iterator() {
final Cursor<K, V> cursor = new Cursor<K, V>(map, root, null);
return new Iterator<Entry<K, V>>() {
@Override
public boolean hasNext() {
return cursor.hasNext();
}
@Override
public Entry<K, V> next() {
K k = cursor.next();
return new DataUtils.MapEntry<K, V>(k, cursor.getValue());
}
@Override
public void remove() {
throw DataUtils.newUnsupportedOperationException(
"Removing is not supported");
}
};
}
@Override
public int size() {
return MVMap.this.size();
}
return map.entrySet();
@Override
public boolean contains(Object o) {
return MVMap.this.containsKey(o);
}
};
}
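The entry set is now backed by a cursor over the root page captured at call time rather than by a copy into a HashMap, so iteration runs in ascending key order, needs no extra memory, and sees a stable snapshot. A short usage sketch:

    import java.util.Map;
    import org.h2.mvstore.MVMap;
    import org.h2.mvstore.MVStore;

    public class EntrySetDemo {
        public static void main(String[] args) {
            MVStore s = new MVStore.Builder().open(); // in-memory store
            MVMap<Integer, String> map = s.openMap("data");
            map.put(3, "three");
            map.put(1, "one");
            map.put(2, "two");
            // entries come back in ascending key order: 1, 2, 3
            for (Map.Entry<Integer, String> e : map.entrySet()) {
                System.out.println(e.getKey() + " -> " + e.getValue());
            }
            s.close();
        }
    }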
@Override
@@ -20,6 +20,7 @@ import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import org.h2.compress.CompressDeflate;
import org.h2.compress.CompressLZF;
import org.h2.compress.Compressor;
import org.h2.mvstore.cache.CacheLongKeyLIRS;
@@ -186,12 +187,14 @@ public class MVStore {
private int versionsToKeep = 5;
/**
* Whether to compress new pages. Even if disabled, the store may contain
* (old) compressed pages.
* The compression level for new pages (0 for disabled, 1 for fast, 2 for
* high). Even if disabled, the store may contain (old) compressed pages.
*/
private final boolean compress;
private final int compressionLevel;
private final Compressor compressor = new CompressLZF();
private Compressor compressorFast;
private Compressor compressorHigh;
private final UncaughtExceptionHandler backgroundExceptionHandler;
@@ -247,9 +250,10 @@ public class MVStore {
* @throws IllegalArgumentException if the directory does not exist
*/
MVStore(HashMap<String, Object> config) {
this.compress = config.containsKey("compress");
Object o = config.get("compress");
this.compressionLevel = o == null ? 0 : (Integer) o;
String fileName = (String) config.get("fileName");
Object o = config.get("pageSplitSize");
o = config.get("pageSplitSize");
if (o == null) {
pageSplitSize = fileName == null ? 4 * 1024 : 16 * 1024;
} else {
@@ -525,7 +529,7 @@ public class MVStore {
s = s.substring(0, s.lastIndexOf("fletcher") - 1);
byte[] bytes = s.getBytes(DataUtils.LATIN);
int checksum = DataUtils.getFletcher32(bytes,
bytes.length / 2 * 2);
bytes.length);
if (check != checksum) {
continue;
}
@@ -683,7 +687,7 @@ public class MVStore {
m.remove("fletcher");
s = s.substring(0, s.lastIndexOf("fletcher") - 1);
byte[] bytes = s.getBytes(DataUtils.LATIN);
int checksum = DataUtils.getFletcher32(bytes, bytes.length / 2 * 2);
int checksum = DataUtils.getFletcher32(bytes, bytes.length);
if (check == checksum) {
int chunk = DataUtils.readHexInt(m, "chunk", 0);
Chunk c = new Chunk(chunk);
@@ -706,7 +710,7 @@ public class MVStore {
}
DataUtils.appendMap(buff, fileHeader);
byte[] bytes = buff.toString().getBytes(DataUtils.LATIN);
int checksum = DataUtils.getFletcher32(bytes, bytes.length / 2 * 2);
int checksum = DataUtils.getFletcher32(bytes, bytes.length);
DataUtils.appendMap(buff, "fletcher", checksum);
buff.append("\n");
bytes = buff.toString().getBytes(DataUtils.LATIN);
@@ -1676,12 +1680,22 @@ public class MVStore {
}
}
Compressor getCompressor() {
return compressor;
Compressor getCompressorFast() {
if (compressorFast == null) {
compressorFast = new CompressLZF();
}
return compressorFast;
}
Compressor getCompressorHigh() {
if (compressorHigh == null) {
compressorHigh = new CompressDeflate();
}
return compressorHigh;
}
boolean getCompress() {
return compress;
int getCompressionLevel() {
return compressionLevel;
}
public int getPageSplitSize() {
@@ -2247,6 +2261,15 @@ public class MVStore {
return (int) (cache.getMaxMemory() / 1024 / 1024);
}
/**
* Get the cache.
*
* @return the cache
*/
public CacheLongKeyLIRS<Page> getCache() {
return cache;
}
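The cache accessor is mainly useful for diagnostics. A sketch, assuming CacheLongKeyLIRS also exposes getUsedMemory() next to the getMaxMemory() call used above:

    // s is an open, file-backed MVStore
    CacheLongKeyLIRS<Page> cache = s.getCache();
    System.out.println("page cache: " + cache.getUsedMemory()
            + " of " + cache.getMaxMemory() + " bytes");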
/**
* A background writer thread to automatically store changes from time to
* time.
@@ -2391,10 +2414,25 @@ public class MVStore {
*
* @return this
*/
public Builder compressData() {
public Builder compress() {
return set("compress", 1);
}
/**
* Compress data before writing using the Deflate algorithm. This will
* save more disk space, but will slow down read and write operations
* quite a bit.
* <p>
* This setting only affects writes; it is not necessary to enable
* compression when reading, even if compression was enabled when
* writing.
*
* @return this
*/
public Builder compressHigh() {
return set("compress", 2);
}
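A side-by-side sketch of the two levels, in the chaining style of the documentation example above (file names invented for illustration); compress() trades compression ratio for speed, compressHigh() the reverse, and either file can later be reopened without any compression option set:

    MVStore fast = new MVStore.Builder().
            fileName("data-fast.h3").
            compress().      // LZF, level 1
            open();
    MVStore high = new MVStore.Builder().
            fileName("data-high.h3").
            compressHigh().  // Deflate, level 2
            open();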
/**
* Set the amount of memory a page should contain at most, in bytes,
* before it is split. The default is 16 KB for persistent stores and 4
@@ -39,6 +39,7 @@ public class OffHeapStore extends FileStore {
"Could not read from position {0}", pos);
}
readCount++;
readBytes += len;
ByteBuffer buff = memEntry.getValue();
ByteBuffer read = buff.duplicate();
int offset = (int) (pos - memEntry.getKey());
@@ -81,6 +82,7 @@ public class OffHeapStore extends FileStore {
"partial overwrite is not supported", pos);
}
writeCount++;
writeBytes += length;
buff.rewind();
buff.put(src);
return;
@@ -95,8 +97,9 @@ public class OffHeapStore extends FileStore {
}
private void writeNewEntry(long pos, ByteBuffer src) {
writeCount++;
int length = src.remaining();
writeCount++;
writeBytes += length;
ByteBuffer buff = ByteBuffer.allocateDirect(length);
buff.put(src);
buff.rewind();
@@ -777,7 +777,13 @@ public class Page {
}
boolean compressed = (type & DataUtils.PAGE_COMPRESSED) != 0;
if (compressed) {
Compressor compressor = map.getStore().getCompressor();
Compressor compressor;
if ((type & DataUtils.PAGE_COMPRESSED_HIGH) ==
DataUtils.PAGE_COMPRESSED_HIGH) {
compressor = map.getStore().getCompressorHigh();
} else {
compressor = map.getStore().getCompressorFast();
}
int lenAdd = DataUtils.readVarInt(buff);
int compLen = pageLength + start - buff.position();
byte[] comp = DataUtils.newBytes(compLen);
@@ -827,20 +833,32 @@ public class Page {
}
MVStore store = map.getStore();
int expLen = buff.position() - compressStart;
if (expLen > 16 && store.getCompress()) {
Compressor compressor = map.getStore().getCompressor();
if (expLen > 16) {
int compressionLevel = store.getCompressionLevel();
if (compressionLevel > 0) {
Compressor compressor;
int compressType;
if (compressionLevel == 1) {
compressor = map.getStore().getCompressorFast();
compressType = DataUtils.PAGE_COMPRESSED;
} else {
compressor = map.getStore().getCompressorHigh();
compressType = DataUtils.PAGE_COMPRESSED_HIGH;
}
byte[] exp = new byte[expLen];
buff.position(compressStart).get(exp);
byte[] comp = new byte[expLen * 2];
int compLen = compressor.compress(exp, expLen, comp, 0);
if (compLen + DataUtils.getVarIntLen(compLen - expLen) < expLen) {
int plus = DataUtils.getVarIntLen(compLen - expLen);
if (compLen + plus < expLen) {
buff.position(typePos).
put((byte) (type + DataUtils.PAGE_COMPRESSED));
put((byte) (type + compressType));
buff.position(compressStart).
putVarInt(expLen - compLen).
put(comp, 0, compLen);
}
}
}
int pageLength = buff.position() - start;
int chunkId = chunk.id;
int check = DataUtils.getCheckValue(chunkId)
@@ -301,7 +301,10 @@ public class WriteBuffer {
private void grow(int len) {
ByteBuffer temp = buff;
int needed = len - temp.remaining();
int newCapacity = temp.capacity() + Math.max(needed, MIN_GROW);
int grow = Math.max(needed, MIN_GROW);
// grow at least 50% of the current size
grow = Math.max(temp.capacity() / 2, grow);
int newCapacity = temp.capacity() + grow;
buff = ByteBuffer.allocate(newCapacity);
temp.flip();
buff.put(temp);
@@ -79,7 +79,7 @@ public class MVTableEngine implements TableEngine {
builder.encryptionKey(password);
}
if (db.getSettings().compressData) {
builder.compressData();
builder.compress();
// use a larger page split size to improve the compression ratio
builder.pageSplitSize(64 * 1024);
}
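On the SQL side this is driven by the database setting behind DbSettings.compressData; assuming the COMPRESS setting name documented for H2 1.4, enabling it from JDBC could look like:

    // hypothetical URL; COMPRESS=TRUE enables compressData in DbSettings
    java.sql.Connection conn = java.sql.DriverManager.getConnection(
            "jdbc:h2:./test;COMPRESS=TRUE", "sa", "");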
@@ -61,6 +61,21 @@ public class TestDataUtils extends TestBase {
for (int i = 0; i < 10000; i += 1000) {
assertEquals(-1, DataUtils.getFletcher32(data, i));
}
for (int i = 0; i < 1000; i++) {
for (int j = 0; j < 255; j++) {
Arrays.fill(data, 0, i, (byte) j);
data[i] = 0;
int a = DataUtils.getFletcher32(data, i);
if (i % 2 == 1) {
// odd length: same as appending a 0
int b = DataUtils.getFletcher32(data, i + 1);
assertEquals(a, b);
}
data[i] = 10;
int c = DataUtils.getFletcher32(data, i);
assertEquals(a, c);
}
}
long last = 0;
for (int i = 1; i < 255; i++) {
Arrays.fill(data, (byte) i);
@@ -11,6 +11,7 @@ import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.util.Iterator;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Random;
import java.util.TreeMap;
import java.util.concurrent.atomic.AtomicReference;
@@ -51,6 +52,7 @@ public class TestMVStore extends TestBase {
public void test() throws Exception {
FileUtils.deleteRecursive(getBaseDir(), true);
FileUtils.createDirectories(getBaseDir());
testEntrySet();
testCompressEmptyPage();
testCompressed();
testFileFormatExample();
@@ -108,11 +110,26 @@ public class TestMVStore extends TestBase {
testLargerThan2G();
}
private void testEntrySet() {
MVStore s = new MVStore.Builder().open();
MVMap<Integer, Integer> map;
map = s.openMap("data");
for (int i = 0; i < 20; i++) {
map.put(i, i * 10);
}
int next = 0;
for (Entry<Integer, Integer> e : map.entrySet()) {
assertEquals(next, e.getKey().intValue());
assertEquals(next * 10, e.getValue().intValue());
next++;
}
}
private void testCompressEmptyPage() {
String fileName = getBaseDir() + "/testDeletedMap.h3";
MVStore store = new MVStore.Builder().
cacheSize(100).fileName(fileName).
compressData().
compress().
autoCommitBufferSize(10 * 1024).
open();
MVMap<String, String> map = store.openMap("test");
@@ -120,20 +137,34 @@ public class TestMVStore extends TestBase {
store.commit();
store.close();
store = new MVStore.Builder().
compressData().
compress().
open();
store.close();
}
private void testCompressed() {
String fileName = getBaseDir() + "/testCompressed.h3";
MVStore s = new MVStore.Builder().fileName(fileName).compressData().open();
long lastSize = 0;
for (int level = 0; level <= 2; level++) {
FileUtils.delete(fileName);
MVStore.Builder builder = new MVStore.Builder().fileName(fileName);
if (level == 1) {
builder.compress();
} else if (level == 2) {
builder.compressHigh();
}
MVStore s = builder.open();
MVMap<String, String> map = s.openMap("data");
String data = "xxxxxxxxxx";
String data = new String(new char[1000]).replace((char) 0, 'x');
for (int i = 0; i < 400; i++) {
map.put(data + i, data);
}
s.close();
long size = FileUtils.size(fileName);
if (level > 0) {
assertTrue(size < lastSize);
}
lastSize = size;
s = new MVStore.Builder().fileName(fileName).open();
map = s.openMap("data");
for (int i = 0; i < 400; i++) {
@@ -141,6 +172,7 @@ public class TestMVStore extends TestBase {
}
s.close();
}
}
private void testFileFormatExample() {
String fileName = getBaseDir() + "/testFileFormatExample.h3";
@@ -707,7 +739,7 @@ public class TestMVStore extends TestBase {
s = new MVStore.Builder().
fileName(fileName).
autoCommitDisabled().
compressData().open();
compress().open();
map = s.openMap("test");
// add 10 MB of data
for (int i = 0; i < 1024; i++) {