提交 8497c759 authored 作者: Thomas Mueller's avatar Thomas Mueller

MVStore: deal with the case that data was moved to a new chunk (work in progress)

上级 d476faca
...@@ -67,6 +67,12 @@ public class DataUtils { ...@@ -67,6 +67,12 @@ public class DataUtils {
*/ */
public static final int ERROR_SERIALIZATION = 8; public static final int ERROR_SERIALIZATION = 8;
/**
* The application was trying to read data from a chunk that is no longer
* available.
*/
public static final int ERROR_CHUNK_NOT_FOUND = 9;
/** /**
* The transaction store is corrupt. * The transaction store is corrupt.
*/ */
......
...@@ -793,8 +793,16 @@ public class MVMap<K, V> extends AbstractMap<K, V> ...@@ -793,8 +793,16 @@ public class MVMap<K, V> extends AbstractMap<K, V>
* *
* @param set the set of chunk ids * @param set the set of chunk ids
*/ */
public void rewrite(Set<Integer> set) { void rewrite(Set<Integer> set) {
try {
rewrite(root, set); rewrite(root, set);
} catch (IllegalStateException e) {
if (DataUtils.getErrorCode(e.getMessage()) == DataUtils.ERROR_CHUNK_NOT_FOUND) {
// ignore
} else {
throw e;
}
}
} }
private int rewrite(Page p, Set<Integer> set) { private int rewrite(Page p, Set<Integer> set) {
......
...@@ -105,6 +105,13 @@ MVStore: ...@@ -105,6 +105,13 @@ MVStore:
- compact: copy whole pages (without having to open all maps) - compact: copy whole pages (without having to open all maps)
- maybe change the length code to have lower gaps - maybe change the length code to have lower gaps
- test with very low limits (such as: short chunks, small pages) - test with very low limits (such as: short chunks, small pages)
- maybe allow to read beyond the retention time:
when compacting, move live pages in old chunks
to a map (possibly the metadata map) -
this requires a change in the compaction code, plus
a map lookup when reading old data; also, this
old data map needs to be cleaned up somehow;
maybe using an additional timeout
*/ */
...@@ -849,7 +856,7 @@ public class MVStore { ...@@ -849,7 +856,7 @@ public class MVStore {
// it could also be unsynchronized metadata // it could also be unsynchronized metadata
// access (if synchronization on this was forgotten) // access (if synchronization on this was forgotten)
throw DataUtils.newIllegalStateException( throw DataUtils.newIllegalStateException(
DataUtils.ERROR_INTERNAL, DataUtils.ERROR_CHUNK_NOT_FOUND,
"Chunk {0} no longer exists", "Chunk {0} no longer exists",
chunkId); chunkId);
} }
......
...@@ -55,7 +55,7 @@ public class MVTableEngine implements TableEngine { ...@@ -55,7 +55,7 @@ public class MVTableEngine implements TableEngine {
String dbPath = db.getDatabasePath(); String dbPath = db.getDatabasePath();
MVStore.Builder builder = new MVStore.Builder(); MVStore.Builder builder = new MVStore.Builder();
if (dbPath == null) { if (dbPath == null) {
store = new Store(db, builder.open()); store = new Store(db, builder);
} else { } else {
String fileName = dbPath + Constants.SUFFIX_MV_FILE; String fileName = dbPath + Constants.SUFFIX_MV_FILE;
MVStoreTool.compactCleanUp(fileName); MVStoreTool.compactCleanUp(fileName);
...@@ -94,7 +94,7 @@ public class MVTableEngine implements TableEngine { ...@@ -94,7 +94,7 @@ public class MVTableEngine implements TableEngine {
}); });
try { try {
store = new Store(db, builder.open()); store = new Store(db, builder);
} catch (IllegalStateException e) { } catch (IllegalStateException e) {
int errorCode = DataUtils.getErrorCode(e.getMessage()); int errorCode = DataUtils.getErrorCode(e.getMessage());
if (errorCode == DataUtils.ERROR_FILE_CORRUPT) { if (errorCode == DataUtils.ERROR_FILE_CORRUPT) {
...@@ -157,8 +157,8 @@ public class MVTableEngine implements TableEngine { ...@@ -157,8 +157,8 @@ public class MVTableEngine implements TableEngine {
private int temporaryMapId; private int temporaryMapId;
public Store(Database db, MVStore store) { public Store(Database db, MVStore.Builder builder) {
this.store = store; this.store = builder.open();
this.transactionStore = new TransactionStore( this.transactionStore = new TransactionStore(
store, store,
new ValueDataType(null, db, null), new ValueDataType(null, db, null),
......
...@@ -1383,9 +1383,67 @@ public class TransactionStore { ...@@ -1383,9 +1383,67 @@ public class TransactionStore {
* included * included
* @return the iterator * @return the iterator
*/ */
public Iterator<K> keyIterator(K from, boolean includeUncommitted) { public Iterator<K> keyIterator(final K from, final boolean includeUncommitted) {
Iterator<K> it = map.keyIterator(from); return new Iterator<K>() {
return wrapIterator(it, includeUncommitted); private K currentKey = from;
private Cursor<K, VersionedValue> cursor = map.cursor(currentKey);
{
fetchNext();
}
private void fetchNext() {
while (cursor.hasNext()) {
K k;
try {
k = cursor.next();
} catch (IllegalStateException e) {
// TODO this is a bit ugly
if (DataUtils.getErrorCode(e.getMessage()) == DataUtils.ERROR_CHUNK_NOT_FOUND) {
cursor = map.cursor(currentKey);
// we (should) get the current key again,
// we need to ignore that one
if (!cursor.hasNext()) {
break;
}
cursor.next();
if (!cursor.hasNext()) {
break;
}
k = cursor.next();
} else {
throw e;
}
}
currentKey = k;
if (includeUncommitted) {
return;
}
if (containsKey(k)) {
return;
}
}
currentKey = null;
}
@Override
public boolean hasNext() {
return currentKey != null;
}
@Override
public K next() {
K result = currentKey;
fetchNext();
return result;
}
@Override
public void remove() {
throw DataUtils.newUnsupportedOperationException(
"Removing is not supported");
}
};
} }
/** /**
...@@ -1394,10 +1452,11 @@ public class TransactionStore { ...@@ -1394,10 +1452,11 @@ public class TransactionStore {
* @param from the first key to return * @param from the first key to return
* @return the iterator * @return the iterator
*/ */
public Iterator<Entry<K, V>> entryIterator(K from) { public Iterator<Entry<K, V>> entryIterator(final K from) {
final Cursor<K, VersionedValue> cursor = map.cursor(from);
return new Iterator<Entry<K, V>>() { return new Iterator<Entry<K, V>>() {
private Entry<K, V> current; private Entry<K, V> current;
private K currentKey = from;
private Cursor<K, VersionedValue> cursor = map.cursor(currentKey);
{ {
fetchNext(); fetchNext();
...@@ -1405,17 +1464,40 @@ public class TransactionStore { ...@@ -1405,17 +1464,40 @@ public class TransactionStore {
private void fetchNext() { private void fetchNext() {
while (cursor.hasNext()) { while (cursor.hasNext()) {
final K key = cursor.next(); K k;
try {
k = cursor.next();
} catch (IllegalStateException e) {
// TODO this is a bit ugly
if (DataUtils.getErrorCode(e.getMessage()) == DataUtils.ERROR_CHUNK_NOT_FOUND) {
cursor = map.cursor(currentKey);
// we (should) get the current key again,
// we need to ignore that one
if (!cursor.hasNext()) {
break;
}
cursor.next();
if (!cursor.hasNext()) {
break;
}
k = cursor.next();
} else {
throw e;
}
}
final K key = k;
VersionedValue data = cursor.getValue(); VersionedValue data = cursor.getValue();
data = getValue(key, readLogId, data); data = getValue(key, readLogId, data);
if (data != null && data.value != null) { if (data != null && data.value != null) {
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
final V value = (V) data.value; final V value = (V) data.value;
current = new DataUtils.MapEntry<K, V>(key, value); current = new DataUtils.MapEntry<K, V>(key, value);
currentKey = key;
return; return;
} }
} }
current = null; current = null;
currentKey = null;
} }
@Override @Override
......
...@@ -625,7 +625,7 @@ kill -9 `jps -l | grep "org.h2.test." | cut -d " " -f 1` ...@@ -625,7 +625,7 @@ kill -9 `jps -l | grep "org.h2.test." | cut -d " " -f 1`
new TestCompatibility().runTest(this); new TestCompatibility().runTest(this);
new TestCompatibilityOracle().runTest(this); new TestCompatibilityOracle().runTest(this);
new TestCsv().runTest(this); new TestCsv().runTest(this);
new TestDateStorage().runTest(this); // TODO test new TestDateStorage().runTest(this);
new TestDeadlock().runTest(this); new TestDeadlock().runTest(this);
new TestDrop().runTest(this); new TestDrop().runTest(this);
new TestDuplicateKeyUpdate().runTest(this); new TestDuplicateKeyUpdate().runTest(this);
...@@ -721,7 +721,7 @@ kill -9 `jps -l | grep "org.h2.test." | cut -d " " -f 1` ...@@ -721,7 +721,7 @@ kill -9 `jps -l | grep "org.h2.test." | cut -d " " -f 1`
// synth // synth
new TestBtreeIndex().runTest(this); new TestBtreeIndex().runTest(this);
new TestConcurrentUpdate().runTest(this); new TestConcurrentUpdate().runTest(this);
new TestDiskFull().runTest(this); // TODO test new TestDiskFull().runTest(this);
new TestCrashAPI().runTest(this); new TestCrashAPI().runTest(this);
new TestFuzzOptimizations().runTest(this); new TestFuzzOptimizations().runTest(this);
new TestLimit().runTest(this); new TestLimit().runTest(this);
......
...@@ -259,6 +259,11 @@ public abstract class TestBase { ...@@ -259,6 +259,11 @@ public abstract class TestBase {
} }
return name; return name;
} }
if (admin) {
; // TODO testing
// name = addOption(name, "RETENTION_TIME", "10");
// name = addOption(name, "WRITE_DELAY", "10");
}
if (config.memory) { if (config.memory) {
name = "mem:" + name; name = "mem:" + name;
} else { } else {
......
...@@ -48,6 +48,7 @@ public class TestMVTableEngine extends TestBase { ...@@ -48,6 +48,7 @@ public class TestMVTableEngine extends TestBase {
@Override @Override
public void test() throws Exception { public void test() throws Exception {
testLowRetentionTime();
testOldAndNew(); testOldAndNew();
testTemporaryTables(); testTemporaryTables();
testUniqueIndex(); testUniqueIndex();
...@@ -79,6 +80,31 @@ public class TestMVTableEngine extends TestBase { ...@@ -79,6 +80,31 @@ public class TestMVTableEngine extends TestBase {
testSimple(); testSimple();
} }
private void testLowRetentionTime() throws SQLException {
deleteDb("mvstore");
Connection conn = getConnection("mvstore;RETENTION_TIME=10;WRITE_DELAY=10");
Statement stat = conn.createStatement();
Connection conn2 = getConnection("mvstore");
Statement stat2 = conn2.createStatement();
stat.execute("create alias sleep as $$void sleep(int ms) throws Exception { Thread.sleep(ms); }$$");
stat.execute("create table test(id identity, name varchar) as select x, 'Init' from system_range(0, 1999)");
for (int i = 0; i < 10; i++) {
stat.execute("insert into test values(null, 'Hello')");
// create and delete a large table: this will force compaction
stat.execute("create table temp(id identity, name varchar) as " +
"select x, space(1000000) from system_range(0, 10)");
stat.execute("drop table temp");
}
ResultSet rs = stat2
.executeQuery("select *, sleep(1) from test order by id");
for (int i = 0; i < 2000 + 10; i++) {
assertTrue(rs.next());
assertEquals(i, rs.getInt(1));
}
assertFalse(rs.next());
conn.close();
}
private void testOldAndNew() throws SQLException { private void testOldAndNew() throws SQLException {
FileUtils.deleteRecursive(getBaseDir(), true); FileUtils.deleteRecursive(getBaseDir(), true);
Connection conn; Connection conn;
......
...@@ -168,7 +168,7 @@ public class TestRandomMapOps extends TestBase { ...@@ -168,7 +168,7 @@ public class TestRandomMapOps extends TestBase {
private static MVStore openStore(String fileName) { private static MVStore openStore(String fileName) {
MVStore s = new MVStore.Builder().fileName(fileName). MVStore s = new MVStore.Builder().fileName(fileName).
pageSplitSize(50).autoCommitDisabled().open(); pageSplitSize(50).autoCommitDisabled().open();
s.setRetentionTime(0); s.setRetentionTime(1000);
return s; return s;
} }
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论