提交 95140455 authored 作者: Thomas Mueller's avatar Thomas Mueller

MVStore tests and bugfixes

上级 6b03bb19
...@@ -61,8 +61,8 @@ But it can be also directly within an application, without using JDBC or SQL. ...@@ -61,8 +61,8 @@ But it can be also directly within an application, without using JDBC or SQL.
</li><li>Old versions of the data can be read concurrently with all other operations. </li><li>Old versions of the data can be read concurrently with all other operations.
</li><li>Transactions are supported (including concurrent transactions and 2-phase commit). </li><li>Transactions are supported (including concurrent transactions and 2-phase commit).
</li><li>The tool is very modular. It supports pluggable data types / serialization, </li><li>The tool is very modular. It supports pluggable data types / serialization,
pluggable map implementations (B-tree, R-tree, concurrent B-tree currently), BLOB storage, pluggable map implementations (B-tree, R-tree, concurrent B-tree currently), BLOB storage,
and a file system abstraction to support encrypted files and zip files. and a file system abstraction to support encrypted files and zip files.
</li></ul> </li></ul>
<h2 id="example_code">Example Code</h2> <h2 id="example_code">Example Code</h2>
......
...@@ -38,11 +38,16 @@ public class MVMap<K, V> extends AbstractMap<K, V> ...@@ -38,11 +38,16 @@ public class MVMap<K, V> extends AbstractMap<K, V>
* The current root page (may not be null). * The current root page (may not be null).
*/ */
protected volatile Page root; protected volatile Page root;
/** /**
* The version used for writing. * The version used for writing.
*/ */
protected long writeVersion; protected volatile long writeVersion;
/**
* This version is set during a write operation.
*/
protected volatile long currentWriteVersion = -1;
private int id; private int id;
private long createVersion; private long createVersion;
...@@ -53,11 +58,6 @@ public class MVMap<K, V> extends AbstractMap<K, V> ...@@ -53,11 +58,6 @@ public class MVMap<K, V> extends AbstractMap<K, V>
private boolean closed; private boolean closed;
private boolean readOnly; private boolean readOnly;
/**
* This flag is set during a write operation.
*/
private volatile boolean writing;
protected MVMap(DataType keyType, DataType valueType) { protected MVMap(DataType keyType, DataType valueType) {
this.keyType = keyType; this.keyType = keyType;
this.valueType = valueType; this.valueType = valueType;
...@@ -934,14 +934,14 @@ public class MVMap<K, V> extends AbstractMap<K, V> ...@@ -934,14 +934,14 @@ public class MVMap<K, V> extends AbstractMap<K, V>
} }
checkConcurrentWrite(); checkConcurrentWrite();
store.beforeWrite(); store.beforeWrite();
writing = true; currentWriteVersion = writeVersion;
} }
/** /**
* Check that no write operation is in progress. * Check that no write operation is in progress.
*/ */
protected void checkConcurrentWrite() { protected void checkConcurrentWrite() {
if (writing) { if (currentWriteVersion != -1) {
// try to detect concurrent modification // try to detect concurrent modification
// on a best-effort basis // on a best-effort basis
throw DataUtils.newConcurrentModificationException(getName()); throw DataUtils.newConcurrentModificationException(getName());
...@@ -953,17 +953,21 @@ public class MVMap<K, V> extends AbstractMap<K, V> ...@@ -953,17 +953,21 @@ public class MVMap<K, V> extends AbstractMap<K, V>
* operation was successful). * operation was successful).
*/ */
protected void afterWrite() { protected void afterWrite() {
writing = false; currentWriteVersion = -1;
} }
/** /**
* If there is a concurrent update to the given version, wait until it is * If there is a concurrent update to the given version, wait until it is
* finished. * finished.
* *
* @param root the root page * @param version the read version
*/ */
protected void waitUntilWritten(Page root) { protected void waitUntilWritten(long version) {
while (writing && root == this.root) { if (readOnly) {
throw DataUtils.newIllegalStateException(
DataUtils.ERROR_INTERNAL, "Waiting for writes to a read-only map");
}
while (currentWriteVersion == version) {
Thread.yield(); Thread.yield();
} }
} }
...@@ -1136,6 +1140,10 @@ public class MVMap<K, V> extends AbstractMap<K, V> ...@@ -1136,6 +1140,10 @@ public class MVMap<K, V> extends AbstractMap<K, V>
} }
void setWriteVersion(long writeVersion) { void setWriteVersion(long writeVersion) {
if (readOnly) {
throw DataUtils.newIllegalStateException(
DataUtils.ERROR_INTERNAL, "Trying to write to a read-only map");
}
this.writeVersion = writeVersion; this.writeVersion = writeVersion;
} }
......
...@@ -60,7 +60,7 @@ public class MVMapConcurrent<K, V> extends MVMap<K, V> { ...@@ -60,7 +60,7 @@ public class MVMapConcurrent<K, V> extends MVMap<K, V> {
} }
@Override @Override
protected void waitUntilWritten(Page root) { protected void waitUntilWritten(long version) {
// no need to wait // no need to wait
} }
......
...@@ -49,17 +49,15 @@ TODO: ...@@ -49,17 +49,15 @@ TODO:
TestMVStoreDataLoss TestMVStoreDataLoss
TransactionStore: TransactionStore:
- write to the undo log _before_ a change (WAL style)
MVStore: MVStore:
- rolling docs review: at "Features" - rolling docs review: at "Features"
- additional test async write / read algorithm for speed and errors
- move setters to the builder, except for setRetainVersion, setReuseSpace, - move setters to the builder, except for setRetainVersion, setReuseSpace,
and settings that are persistent (setStoreVersion) and settings that are persistent (setStoreVersion)
- test meta rollback: it is changed after save; could rollback break it?
- automated 'kill process' and 'power failure' test - automated 'kill process' and 'power failure' test
- update checkstyle - update checkstyle
- maybe split database into multiple files, to speed up compact - maybe split database into multiple files, to speed up compact
and allow using trim (by truncating / deleting empty files)
- auto-compact from time to time and on close - auto-compact from time to time and on close
- test and possibly improve compact operation (for large dbs) - test and possibly improve compact operation (for large dbs)
- possibly split chunk metadata into immutable and mutable - possibly split chunk metadata into immutable and mutable
...@@ -68,17 +66,16 @@ MVStore: ...@@ -68,17 +66,16 @@ MVStore:
- chunk header: store changed chunk data as row; maybe after the root - chunk header: store changed chunk data as row; maybe after the root
- chunk checksum (header, last page, 2 bytes per page?) - chunk checksum (header, last page, 2 bytes per page?)
- is there a better name for the file header, - is there a better name for the file header,
-- if it's no longer always at the beginning of a file? store header? if it's no longer always at the beginning of a file? store header?
- on insert, if the child page is already full, don't load and modify it - on insert, if the child page is already full, don't load and modify it
-- split directly (specially for leaves with one large entry) split directly (specially for leaves with one large entry)
- maybe let a chunk point to a list of potential next chunks - maybe let a chunk point to a list of potential next chunks
-- (so no fixed location header is needed) (so no fixed location header is needed), similar to a skip list
- support stores that span multiple files (chunks stored in other files) - support stores that span multiple files (chunks stored in other files)
- triggers (can be implemented with a custom map) - triggers (can be implemented with a custom map)
- store number of write operations per page (maybe defragment - store number of write operations per page (maybe defragment
-- if much different than count) if much different than count)
- r-tree: nearest neighbor search - r-tree: nearest neighbor search
- chunk metadata: do not store default values
- support maps without values (just existence of the key) - support maps without values (just existence of the key)
- support maps without keys (counted b-tree features) - support maps without keys (counted b-tree features)
- use a small object cache (StringCache), test on Android - use a small object cache (StringCache), test on Android
...@@ -88,19 +85,19 @@ MVStore: ...@@ -88,19 +85,19 @@ MVStore:
- StreamStore optimization: avoid copying bytes - StreamStore optimization: avoid copying bytes
- unlimited transaction size - unlimited transaction size
- MVStoreTool.shrink to shrink a store (create, copy, rename, delete) - MVStoreTool.shrink to shrink a store (create, copy, rename, delete)
and for MVStore on Windows, auto-detect renamed file
- ensure data is overwritten eventually if the system doesn't have a timer - ensure data is overwritten eventually if the system doesn't have a timer
- SSD-friendly write (always in blocks of 4 MB / 1 second?) - SSD-friendly write (always in blocks of 4 MB / 1 second?)
- close the file on out of memory or disk write error (out of disk space or so) - close the file on out of memory or disk write error (out of disk space or so)
- implement a sharded map (in one store, multiple stores) - implement a sharded map (in one store, multiple stores)
- implement a sharded map (in one store, multiple stores) to support concurrent updates and writes, and very large maps
- implement an off-heap file system - implement an off-heap file system
- remove change cursor, or add support for writing to branches - remove change cursor, or add support for writing to branches
- support pluggable logging or remove log - support pluggable logging or remove log
- maybe add an optional finalizer and exit hook - maybe add an optional finalizer and exit hook
to store committed changes to store committed changes
- to save space when persisting very small transactions, - to save space when persisting very small transactions,
- to save space when persisting very small transactions, use a transaction log where only the deltas are stored
- serialization for lists, sets, sorted sets, maps, sorted maps - serialization for lists, sets, sorted sets, maps, sorted maps
- maybe rename 'rollback' to 'revert' to distinguish from transactions - maybe rename 'rollback' to 'revert' to distinguish from transactions
- support other compression algorithms (deflate, LZ4,...) - support other compression algorithms (deflate, LZ4,...)
...@@ -110,6 +107,7 @@ MVStore: ...@@ -110,6 +107,7 @@ MVStore:
- autocommit (to avoid having to call commit, - autocommit (to avoid having to call commit,
as it could be called too often or it is easily forgotten) as it could be called too often or it is easily forgotten)
- remove features that are not really needed; simplify the code - remove features that are not really needed; simplify the code
possibly using a layering / tool mechanism
- rename "store" to "save", as store collides with storeVersion - rename "store" to "save", as store collides with storeVersion
*/ */
...@@ -914,15 +912,13 @@ public class MVStore { ...@@ -914,15 +912,13 @@ public class MVStore {
ArrayList<MVMap<?, ?>> list = New.arrayList(maps.values()); ArrayList<MVMap<?, ?>> list = New.arrayList(maps.values());
ArrayList<MVMap<?, ?>> changed = New.arrayList(); ArrayList<MVMap<?, ?>> changed = New.arrayList();
for (MVMap<?, ?> m : list) { for (MVMap<?, ?> m : list) {
if (m != meta) { m.setWriteVersion(version);
m.setWriteVersion(version); long v = m.getVersion();
long v = m.getVersion(); if (v >= 0 && v >= lastStoredVersion) {
if (v >= 0 && m.getVersion() >= lastStoredVersion) { m.waitUntilWritten(storeVersion);
MVMap<?, ?> r = m.openVersion(storeVersion); MVMap<?, ?> r = m.openVersion(storeVersion);
r.waitUntilWritten(r.getRoot()); if (r.getRoot().getPos() == 0) {
if (r.getRoot().getPos() == 0) { changed.add(r);
changed.add(r);
}
} }
} }
} }
...@@ -1359,10 +1355,12 @@ public class MVStore { ...@@ -1359,10 +1355,12 @@ public class MVStore {
unsavedPageCount = Math.max(0, unsavedPageCount - 1); unsavedPageCount = Math.max(0, unsavedPageCount - 1);
return; return;
} }
// this could result in a cache miss
// if the operation is rolled back, // This could result in a cache miss if the operation is rolled back,
// but we don't optimize for rollback // but we don't optimize for rollback.
// We could also keep the page in the cache, as somebody could read it.
cache.remove(pos); cache.remove(pos);
Chunk c = getChunk(pos); Chunk c = getChunk(pos);
long version = currentVersion; long version = currentVersion;
if (map == meta && currentStoreVersion >= 0) { if (map == meta && currentStoreVersion >= 0) {
......
...@@ -43,9 +43,13 @@ public class TransactionStore { ...@@ -43,9 +43,13 @@ public class TransactionStore {
final MVMap<Long, Object[]> preparedTransactions; final MVMap<Long, Object[]> preparedTransactions;
/** /**
* The undo log. * The undo log.
* If the first entry for a transaction doesn't have a logId of 0, then * <p>
* the transaction is committing (partially committed). * If the first entry for a transaction doesn't have a logId
* of 0, then the transaction is partially committed (which means rollback
* is not possible). Log entries are written before the data is changed
* (write-ahead).
* <p>
* Key: [ transactionId, logId ], value: [ opType, mapId, key, oldValue ]. * Key: [ transactionId, logId ], value: [ opType, mapId, key, oldValue ].
*/ */
final MVMap<long[], Object[]> undoLog; final MVMap<long[], Object[]> undoLog;
...@@ -228,6 +232,19 @@ public class TransactionStore { ...@@ -228,6 +232,19 @@ public class TransactionStore {
} }
} }
/**
 * Delete a single entry from the undo log.
 * <p>
 * The entry is looked up by the pair [ transactionId, logId ].
 *
 * @param t the transaction whose id forms the first part of the key
 * @param logId the log id that forms the second part of the key
 */
public void logUndo(Transaction t, long logId) {
    final long[] entryKey = new long[] { t.getId(), logId };
    // hold the monitor of the undo log while the entry is removed
    synchronized (undoLog) {
        undoLog.remove(entryKey);
    }
}
/** /**
* Commit a transaction. * Commit a transaction.
* *
...@@ -581,6 +598,13 @@ public class TransactionStore { ...@@ -581,6 +598,13 @@ public class TransactionStore {
void log(int opType, int mapId, Object key, Object oldValue) { void log(int opType, int mapId, Object key, Object oldValue) {
store.log(this, logId++, opType, mapId, key, oldValue); store.log(this, logId++, opType, mapId, key, oldValue);
} }
/**
 * Take back the most recent log entry: step the log id counter back by
 * one, then ask the store to remove the entry with that log id.
 */
void logUndo() {
    logId -= 1;
    store.logUndo(this, logId);
}
/** /**
* Open a data map. * Open a data map.
...@@ -867,7 +891,7 @@ public class TransactionStore { ...@@ -867,7 +891,7 @@ public class TransactionStore {
* @param value the new value (null to remove the value) * @param value the new value (null to remove the value)
* @param onlyIfUnchanged only set the value if it was not changed (by * @param onlyIfUnchanged only set the value if it was not changed (by
* this or another transaction) since the map was opened * this or another transaction) since the map was opened
* @return true if the value was set * @return true if the value was set, false if there was a concurrent update
*/ */
public boolean trySet(K key, V value, boolean onlyIfUnchanged) { public boolean trySet(K key, V value, boolean onlyIfUnchanged) {
VersionedValue current = map.get(key); VersionedValue current = map.get(key);
...@@ -913,40 +937,38 @@ public class TransactionStore { ...@@ -913,40 +937,38 @@ public class TransactionStore {
newValue.value = value; newValue.value = value;
if (current == null) { if (current == null) {
// a new value // a new value
transaction.log(opType, mapId, key, current);
int todo;
// either write the log before the data (and handle this case in rollback)
// or ensure/document concurrent commits are not allowed
VersionedValue old = map.putIfAbsent(key, newValue); VersionedValue old = map.putIfAbsent(key, newValue);
if (old == null) { if (old != null) {
transaction.log(opType, mapId, key, current); transaction.logUndo();
return true; return false;
} }
return false; return true;
} }
long tx = current.transactionId; long tx = current.transactionId;
if (tx == transaction.transactionId) { if (tx == transaction.transactionId) {
// added or updated by this transaction // added or updated by this transaction
if (map.replace(key, current, newValue)) { transaction.log(opType, mapId, key, current);
transaction.log(opType, mapId, key, current); if (!map.replace(key, current, newValue)) {
return true; // strange, somebody overwrite the value
// even thought the change was not committed
transaction.logUndo();
return false;
} }
// strange, somebody overwrote the value return true;
// even though the change was not committed
return false;
} }
// added or updated by another transaction // added or updated by another transaction
boolean open = transaction.store.isTransactionOpen(tx); boolean open = transaction.store.isTransactionOpen(tx);
if (!open) { if (!open) {
transaction.log(opType, mapId, key, current);
// the transaction is committed: // the transaction is committed:
// overwrite the value // overwrite the value
if (map.replace(key, current, newValue)) { if (!map.replace(key, current, newValue)) {
transaction.log(opType, mapId, key, current); // somebody else was faster
return true; transaction.logUndo();
return false;
} }
// somebody else was faster return true;
return false;
} }
// the transaction is not yet committed // the transaction is not yet committed
return false; return false;
......
...@@ -45,7 +45,7 @@ public class TestTransactionStore extends TestBase { ...@@ -45,7 +45,7 @@ public class TestTransactionStore extends TestBase {
@Override @Override
public void test() throws Exception { public void test() throws Exception {
FileUtils.createDirectories(getBaseDir()); FileUtils.createDirectories(getBaseDir());
// testStopWhileCommitting(); testStopWhileCommitting();
testGetModifiedMaps(); testGetModifiedMaps();
testKeyIterator(); testKeyIterator();
testMultiStatement(); testMultiStatement();
...@@ -60,12 +60,7 @@ public class TestTransactionStore extends TestBase { ...@@ -60,12 +60,7 @@ public class TestTransactionStore extends TestBase {
String fileName = getBaseDir() + "/testStopWhileCommitting.h3"; String fileName = getBaseDir() + "/testStopWhileCommitting.h3";
FileUtils.delete(fileName); FileUtils.delete(fileName);
for (int i = 0; i < 100;) { for (int i = 0; i < 10;) {
System.out.println("i:" + i);
// this.printTime("i:" + i);
// for (int i = 0; i < 10;) {
MVStore s; MVStore s;
TransactionStore ts; TransactionStore ts;
Transaction tx; Transaction tx;
...@@ -87,7 +82,7 @@ public class TestTransactionStore extends TestBase { ...@@ -87,7 +82,7 @@ public class TestTransactionStore extends TestBase {
@Override @Override
public void call() throws Exception { public void call() throws Exception {
for (int i = 0; state.get() < Integer.MAX_VALUE; i++) { for (int i = 0; !stop; i++) {
state.set(i); state.set(i);
other.put(i, value); other.put(i, value);
store.store(); store.store();
...@@ -101,8 +96,6 @@ public class TestTransactionStore extends TestBase { ...@@ -101,8 +96,6 @@ public class TestTransactionStore extends TestBase {
} }
// commit while writing in the task // commit while writing in the task
tx.commit(); tx.commit();
// stop writing
state.set(Integer.MAX_VALUE);
// wait for the task to stop // wait for the task to stop
task.get(); task.get();
store.close(); store.close();
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论