提交 ecaf3d67 authored 作者: Thomas Mueller's avatar Thomas Mueller

MVStore: atomic operations

上级 d3cde13e
...@@ -24,8 +24,8 @@ MVStore ...@@ -24,8 +24,8 @@ MVStore
Store Builder</a><br /> Store Builder</a><br />
<a href="#r_tree"> <a href="#r_tree">
R-Tree</a><br /> R-Tree</a><br />
<a href="#features"> <a href="#features">
Features</a><br /> Features</a><br />
<a href="#differences"> <a href="#differences">
...@@ -283,14 +283,14 @@ The plan is to add such a mechanism later when needed. ...@@ -283,14 +283,14 @@ The plan is to add such a mechanism later when needed.
<h3>Log Structured Storage</h3> <h3>Log Structured Storage</h3>
<p> <p>
Changes are buffered in memory, and once enough changes have accumulated, Changes are buffered in memory, and once enough changes have accumulated,
they are written in one continuous disk write operation. they are written in one continuous disk write operation.
(According to a test, write throughput of a common SSD gets higher the larger the block size, (According to a test, write throughput of a common SSD gets higher the larger the block size,
until a block size of 2 MB, and then does not further increase.) until a block size of 2 MB, and then does not further increase.)
By default, committed changes are automatically written once every second By default, committed changes are automatically written once every second
in a background thread, even if only little data was changed. in a background thread, even if only little data was changed.
Changes can also be written explicitly by calling <code>store()</code>. Changes can also be written explicitly by calling <code>store()</code>.
To avoid out of memory, uncommitted changes are also written when needed, To avoid out of memory, uncommitted changes are also written when needed,
however they are rolled back when closing the store, however they are rolled back when closing the store,
or at the latest (when the store was not correctly closed) when opening the store. or at the latest (when the store was not correctly closed) when opening the store.
</p><p> </p><p>
......
...@@ -605,7 +605,7 @@ public class DataUtils { ...@@ -605,7 +605,7 @@ public class DataUtils {
String message) { String message) {
return new UnsupportedOperationException(message + " " + getVersion()); return new UnsupportedOperationException(message + " " + getVersion());
} }
/** /**
* Create a new ConcurrentModificationException. * Create a new ConcurrentModificationException.
* *
......
...@@ -582,12 +582,21 @@ public class MVMap<K, V> extends AbstractMap<K, V> ...@@ -582,12 +582,21 @@ public class MVMap<K, V> extends AbstractMap<K, V>
*/ */
public synchronized boolean remove(Object key, Object value) { public synchronized boolean remove(Object key, Object value) {
V old = get(key); V old = get(key);
if (old.equals(value)) { if (equalsValue(old, value)) {
remove(key); remove(key);
return true; return true;
} }
return false; return false;
} }
private boolean equalsValue(Object a, Object b) {
if (a == b) {
return true;
} else if (a == null || b == null) {
return false;
}
return valueType.compare(a, b) == 0;
}
/** /**
* Replace a value for an existing key, if the value matches. * Replace a value for an existing key, if the value matches.
...@@ -599,7 +608,7 @@ public class MVMap<K, V> extends AbstractMap<K, V> ...@@ -599,7 +608,7 @@ public class MVMap<K, V> extends AbstractMap<K, V>
*/ */
public synchronized boolean replace(K key, V oldValue, V newValue) { public synchronized boolean replace(K key, V oldValue, V newValue) {
V old = get(key); V old = get(key);
if (old.equals(oldValue)) { if (equalsValue(old, oldValue)) {
put(key, newValue); put(key, newValue);
return true; return true;
} }
...@@ -611,7 +620,7 @@ public class MVMap<K, V> extends AbstractMap<K, V> ...@@ -611,7 +620,7 @@ public class MVMap<K, V> extends AbstractMap<K, V>
* *
* @param key the key (may not be null) * @param key the key (may not be null)
* @param value the new value * @param value the new value
* @return true if the value was replaced * @return the old value, if the value was replaced, or null
*/ */
public synchronized V replace(K key, V value) { public synchronized V replace(K key, V value) {
V old = get(key); V old = get(key);
...@@ -921,7 +930,7 @@ public class MVMap<K, V> extends AbstractMap<K, V> ...@@ -921,7 +930,7 @@ public class MVMap<K, V> extends AbstractMap<K, V>
writing = true; writing = true;
store.beforeWrite(); store.beforeWrite();
} }
/** /**
* Check that no write operation is in progress. * Check that no write operation is in progress.
*/ */
......
...@@ -31,7 +31,7 @@ public class MVMapConcurrent<K, V> extends MVMap<K, V> { ...@@ -31,7 +31,7 @@ public class MVMapConcurrent<K, V> extends MVMap<K, V> {
protected Page copyOnWrite(Page p, long writeVersion) { protected Page copyOnWrite(Page p, long writeVersion) {
return p.copy(writeVersion); return p.copy(writeVersion);
} }
protected void checkConcurrentWrite() { protected void checkConcurrentWrite() {
// ignore (writes are synchronized) // ignore (writes are synchronized)
} }
......
...@@ -49,7 +49,7 @@ TODO: ...@@ -49,7 +49,7 @@ TODO:
- additional test async write / read algorithm for speed and errors - additional test async write / read algorithm for speed and errors
- move setters to the builder, except for setRetainVersion, setReuseSpace, - move setters to the builder, except for setRetainVersion, setReuseSpace,
and settings that are persistent (setStoreVersion) and settings that are persistent (setStoreVersion)
- test & document meta table rollback: it is changed after save; could rollback break it? - test meta rollback: it is changed after save; could rollback break it?
- automated 'kill process' and 'power failure' test - automated 'kill process' and 'power failure' test
- update checkstyle - update checkstyle
- maybe split database into multiple files, to speed up compact - maybe split database into multiple files, to speed up compact
...@@ -92,8 +92,9 @@ TODO: ...@@ -92,8 +92,9 @@ TODO:
- support pluggable logging or remove log - support pluggable logging or remove log
- maybe add an optional finalizer and exit hook - maybe add an optional finalizer and exit hook
to store committed changes to store committed changes
- to save space when persisting very small transactions, - to save space when persisting very small transactions,
-- use a transaction log where only the deltas are stored -- use a transaction log where only the deltas are stored
- serialization for lists, sets, sets, sorted sets, maps, sorted maps
*/ */
...@@ -1680,7 +1681,7 @@ public class MVStore { ...@@ -1680,7 +1681,7 @@ public class MVStore {
if (closed || unsavedPageCount == 0) { if (closed || unsavedPageCount == 0) {
return; return;
} }
// could also store when there are many unstored pages, // could also store when there are many unsaved pages,
// but according to a test it doesn't really help // but according to a test it doesn't really help
if (lastCommittedVersion >= currentVersion) { if (lastCommittedVersion >= currentVersion) {
return; return;
......
...@@ -16,19 +16,23 @@ public class StringDataType implements DataType { ...@@ -16,19 +16,23 @@ public class StringDataType implements DataType {
public static final StringDataType INSTANCE = new StringDataType(); public static final StringDataType INSTANCE = new StringDataType();
@Override
public int compare(Object a, Object b) { public int compare(Object a, Object b) {
return a.toString().compareTo(b.toString()); return a.toString().compareTo(b.toString());
} }
@Override
public int getMemory(Object obj) { public int getMemory(Object obj) {
return 24 + 2 * obj.toString().length(); return 24 + 2 * obj.toString().length();
} }
@Override
public String read(ByteBuffer buff) { public String read(ByteBuffer buff) {
int len = DataUtils.readVarInt(buff); int len = DataUtils.readVarInt(buff);
return DataUtils.readString(buff, len); return DataUtils.readString(buff, len);
} }
@Override
public ByteBuffer write(ByteBuffer buff, Object obj) { public ByteBuffer write(ByteBuffer buff, Object obj) {
String s = obj.toString(); String s = obj.toString();
int len = s.length(); int len = s.length();
......
...@@ -220,8 +220,9 @@ public class TestConcurrent extends TestMVStore { ...@@ -220,8 +220,9 @@ public class TestConcurrent extends TestMVStore {
// in most cases, it should be detected // in most cases, it should be detected
assertTrue(notDetected.get() * 10 <= detected.get()); assertTrue(notDetected.get() * 10 <= detected.get());
} }
private void testConcurrentWrite(final AtomicInteger detected, final AtomicInteger notDetected) throws InterruptedException { private void testConcurrentWrite(final AtomicInteger detected,
final AtomicInteger notDetected) throws InterruptedException {
final MVStore s = openStore(null); final MVStore s = openStore(null);
final MVMap<Integer, Integer> m = s.openMap("data"); final MVMap<Integer, Integer> m = s.openMap("data");
final int size = 20; final int size = 20;
......
...@@ -42,6 +42,7 @@ public class TestMVStore extends TestBase { ...@@ -42,6 +42,7 @@ public class TestMVStore extends TestBase {
FileUtils.deleteRecursive(getBaseDir(), true); FileUtils.deleteRecursive(getBaseDir(), true);
FileUtils.createDirectories(getBaseDir()); FileUtils.createDirectories(getBaseDir());
testAtomicOperations();
testWriteBuffer(); testWriteBuffer();
testWriteDelay(); testWriteDelay();
testEncryptedFile(); testEncryptedFile();
...@@ -78,6 +79,41 @@ public class TestMVStore extends TestBase { ...@@ -78,6 +79,41 @@ public class TestMVStore extends TestBase {
testCloseTwice(); testCloseTwice();
testSimple(); testSimple();
} }
private void testAtomicOperations() {
String fileName = getBaseDir() + "/testAtomicOperations.h3";
FileUtils.delete(fileName);
MVStore s;
MVMap<Integer, byte[]> m;
s = new MVStore.Builder().
fileName(fileName).
open();
m = s.openMap("data");
// putIfAbsent
assertNull(m.putIfAbsent(1, new byte[1]));
assertEquals(1, m.putIfAbsent(1, new byte[2]).length);
assertEquals(1, m.get(1).length);
// replace
assertNull(m.replace(2, new byte[2]));
assertNull(m.get(2));
assertEquals(1, m.replace(1, new byte[2]).length);
assertEquals(2, m.replace(1, new byte[3]).length);
assertEquals(3, m.replace(1, new byte[1]).length);
// replace with oldValue
assertFalse(m.replace(1, new byte[2], new byte[10]));
assertTrue(m.replace(1, new byte[1], new byte[2]));
assertTrue(m.replace(1, new byte[2], new byte[1]));
// remove
assertFalse(m.remove(1, new byte[2]));
assertTrue(m.remove(1, new byte[1]));
s.close();
FileUtils.delete(fileName);
}
private void testWriteBuffer() throws IOException { private void testWriteBuffer() throws IOException {
String fileName = getBaseDir() + "/testAutoStoreBuffer.h3"; String fileName = getBaseDir() + "/testAutoStoreBuffer.h3";
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论