提交 7648963e authored 作者: Thomas Mueller's avatar Thomas Mueller

MVStore mode: the CLOB and BLOB storage was re-implemented and is now much…

MVStore mode: the CLOB and BLOB storage was re-implemented and is now much faster than with the PageStore (which is still the default storage).
上级 47e6588f
...@@ -18,7 +18,9 @@ Change Log ...@@ -18,7 +18,9 @@ Change Log
<h1>Change Log</h1> <h1>Change Log</h1>
<h2>Next Version (unreleased)</h2> <h2>Next Version (unreleased)</h2>
<ul><li>Various bugs in the MVStore storage and have been fixed. <ul><li>MVStore mode: the CLOB and BLOB storage was re-implemented and is
now much faster than with the PageStore (which is still the default storage).
</li><li>Various bugs in the MVStore storage and have been fixed.
</li><li>The method org.h2.expression.Function.getCost could throw a NullPointException. </li><li>The method org.h2.expression.Function.getCost could throw a NullPointException.
</li><li>Storing LOBs in separate files (outside of the database) is no longer supported for new databases. </li><li>Storing LOBs in separate files (outside of the database) is no longer supported for new databases.
</li><li>Lucene 2 is no longer supported. </li><li>Lucene 2 is no longer supported.
......
...@@ -99,6 +99,7 @@ public class StreamStore { ...@@ -99,6 +99,7 @@ public class StreamStore {
public byte[] put(InputStream in) throws IOException { public byte[] put(InputStream in) throws IOException {
ByteArrayOutputStream id = new ByteArrayOutputStream(); ByteArrayOutputStream id = new ByteArrayOutputStream();
int level = 0; int level = 0;
try {
while (true) { while (true) {
if (put(id, in, level)) { if (put(id, in, level)) {
break; break;
...@@ -108,6 +109,10 @@ public class StreamStore { ...@@ -108,6 +109,10 @@ public class StreamStore {
level++; level++;
} }
} }
} catch (IOException e) {
remove(id.toByteArray());
throw e;
}
if (id.size() > minBlockSize * 2) { if (id.size() > minBlockSize * 2) {
id = putIndirectId(id); id = putIndirectId(id);
} }
...@@ -159,12 +164,16 @@ public class StreamStore { ...@@ -159,12 +164,16 @@ public class StreamStore {
int copied = 0; int copied = 0;
int remaining = target.length; int remaining = target.length;
while (remaining > 0) { while (remaining > 0) {
try {
int len = in.read(target, copied, remaining); int len = in.read(target, copied, remaining);
if (len < 0) { if (len < 0) {
return Arrays.copyOf(target, copied); return Arrays.copyOf(target, copied);
} }
copied += len; copied += len;
remaining -= len; remaining -= len;
} catch (RuntimeException e) {
throw new IOException(e);
}
} }
return target; return target;
} }
......
...@@ -8,6 +8,7 @@ package org.h2.test.store; ...@@ -8,6 +8,7 @@ package org.h2.test.store;
import java.sql.Connection; import java.sql.Connection;
import java.sql.PreparedStatement; import java.sql.PreparedStatement;
import java.sql.Statement; import java.sql.Statement;
import java.util.Random;
import org.h2.store.fs.FileUtils; import org.h2.store.fs.FileUtils;
import org.h2.test.TestBase; import org.h2.test.TestBase;
...@@ -28,6 +29,10 @@ public class TestBenchmark extends TestBase { ...@@ -28,6 +29,10 @@ public class TestBenchmark extends TestBase {
@Override @Override
public void test() throws Exception { public void test() throws Exception {
;
// TODO this test is currently disabled
test(true); test(true);
test(false); test(false);
test(true); test(true);
...@@ -37,9 +42,59 @@ public class TestBenchmark extends TestBase { ...@@ -37,9 +42,59 @@ public class TestBenchmark extends TestBase {
} }
private void test(boolean mvStore) throws Exception { private void test(boolean mvStore) throws Exception {
testBinary(mvStore);
}
; private void testBinary(boolean mvStore) throws Exception {
// TODO this test is currently disabled FileUtils.deleteRecursive(getBaseDir(), true);
Connection conn;
Statement stat;
String url = "mvstore";
if (mvStore) {
url += ";MV_STORE=TRUE;MV_STORE=TRUE";
}
url = getURL(url, true);
conn = getConnection(url);
stat = conn.createStatement();
stat.execute("create table test(id bigint primary key, data blob)");
conn.setAutoCommit(false);
PreparedStatement prep = conn
.prepareStatement("insert into test values(?, ?)");
byte[] data = new byte[1024 * 1024];
int rowCount = 100;
int readCount = 20 * rowCount;
long start = System.currentTimeMillis();
for (int i = 0; i < rowCount; i++) {
prep.setInt(1, i);
randomize(data, i);
prep.setBytes(2, data);
prep.execute();
if (i % 100 == 0) {
conn.commit();
}
}
prep = conn.prepareStatement("select * from test where id = ?");
for (int i = 0; i < readCount; i++) {
prep.setInt(1, i % rowCount);
prep.executeQuery();
}
System.out.println((System.currentTimeMillis() - start) + " "
+ (mvStore ? "mvstore" : "default"));
conn.close();
}
void randomize(byte[] data, int i) {
Random r = new Random(i);
r.nextBytes(data);
}
private void testInsertSelect(boolean mvStore) throws Exception {
FileUtils.deleteRecursive(getBaseDir(), true); FileUtils.deleteRecursive(getBaseDir(), true);
Connection conn; Connection conn;
......
...@@ -54,10 +54,40 @@ public class TestStreamStore extends TestBase { ...@@ -54,10 +54,40 @@ public class TestStreamStore extends TestBase {
testLoop(); testLoop();
} }
private void testExceptionDuringStore() { private void testExceptionDuringStore() throws IOException {
// TODO test that if there is an IOException while storing // test that if there is an IOException while storing
// the data, the entries in the map are rolled back // the data, the entries in the map are "rolled back"
; HashMap<Long, byte[]> map = New.hashMap();
StreamStore s = new StreamStore(map);
s.setMaxBlockSize(1024);
assertThrows(IOException.class, s).
put(failingStream(new IOException()));
assertEquals(0, map.size());
// the runtime exception is converted to an IOException
assertThrows(IOException.class, s).
put(failingStream(new IllegalStateException()));
assertEquals(0, map.size());
}
static void throwUnchecked(Throwable e) {
TestStreamStore.<RuntimeException>throwThis(e);
}
@SuppressWarnings("unchecked")
private static <E extends Throwable> void throwThis(Throwable e) throws E {
throw (E) e;
}
private static ByteArrayInputStream failingStream(final Exception e) {
return new ByteArrayInputStream(new byte[20 * 1024]) {
@Override
public int read(byte[] buffer, int off, int len) {
if (this.pos > 10 * 1024) {
throwUnchecked(e);
}
return super.read(buffer, off, len);
}
};
} }
private void testReadCount() throws IOException { private void testReadCount() throws IOException {
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论