提交 3c8e6bb4 authored 作者: Thomas Mueller's avatar Thomas Mueller

MVStore: try to detect concurrent modification

上级 3610b832
...@@ -265,6 +265,7 @@ The page cache is a concurrent LIRS cache, which should be resistant against sca ...@@ -265,6 +265,7 @@ The page cache is a concurrent LIRS cache, which should be resistant against sca
</p><p> </p><p>
The default map implementation does not support concurrent modification The default map implementation does not support concurrent modification
operations on a map (the same as <code>HashMap</code> and <code>TreeMap</code>). operations on a map (the same as <code>HashMap</code> and <code>TreeMap</code>).
Similar to those classes, the map tries to detect concurrent modification.
</p><p> </p><p>
With the <code>MVMapConcurrent</code> implementation, With the <code>MVMapConcurrent</code> implementation,
read operations even on the newest version can happen concurrently with all other read operations even on the newest version can happen concurrently with all other
...@@ -396,6 +397,7 @@ The following exceptions can occur: ...@@ -396,6 +397,7 @@ The following exceptions can occur:
</li><li><code>IllegalArgumentException</code> if a method was called with an illegal argument. </li><li><code>IllegalArgumentException</code> if a method was called with an illegal argument.
</li><li><code>UnsupportedOperationException</code> if a method was called that is not supported, </li><li><code>UnsupportedOperationException</code> if a method was called that is not supported,
for example trying to modify a read-only map or view. for example trying to modify a read-only map or view.
</li><li><code>ConcurrentModificationException</code> if the object is modified concurrently.
</li></ul> </li></ul>
<h2 id="differences">Similar Projects and Differences to Other Storage Engines</h2> <h2 id="differences">Similar Projects and Differences to Other Storage Engines</h2>
......
...@@ -14,6 +14,7 @@ import java.nio.channels.FileChannel; ...@@ -14,6 +14,7 @@ import java.nio.channels.FileChannel;
import java.text.MessageFormat; import java.text.MessageFormat;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collections; import java.util.Collections;
import java.util.ConcurrentModificationException;
import java.util.HashMap; import java.util.HashMap;
import org.h2.engine.Constants; import org.h2.engine.Constants;
import org.h2.util.New; import org.h2.util.New;
...@@ -590,7 +591,7 @@ public class DataUtils { ...@@ -590,7 +591,7 @@ public class DataUtils {
public static IllegalArgumentException newIllegalArgumentException( public static IllegalArgumentException newIllegalArgumentException(
String message, Object... arguments) { String message, Object... arguments) {
return initCause(new IllegalArgumentException( return initCause(new IllegalArgumentException(
MessageFormat.format(message, arguments) + getVersion()), MessageFormat.format(message, arguments) + " " + getVersion()),
arguments); arguments);
} }
...@@ -602,7 +603,16 @@ public class DataUtils { ...@@ -602,7 +603,16 @@ public class DataUtils {
*/ */
public static UnsupportedOperationException newUnsupportedOperationException( public static UnsupportedOperationException newUnsupportedOperationException(
String message) { String message) {
return new UnsupportedOperationException(message + getVersion()); return new UnsupportedOperationException(message + " " + getVersion());
}
/**
* Create a new ConcurrentModificationException.
*
* @return the exception
*/
public static ConcurrentModificationException newConcurrentModificationException() {
return new ConcurrentModificationException(getVersion());
} }
/** /**
...@@ -615,7 +625,7 @@ public class DataUtils { ...@@ -615,7 +625,7 @@ public class DataUtils {
public static IllegalStateException newIllegalStateException( public static IllegalStateException newIllegalStateException(
String message, Object... arguments) { String message, Object... arguments) {
return initCause(new IllegalStateException( return initCause(new IllegalStateException(
MessageFormat.format(message, arguments) + getVersion()), MessageFormat.format(message, arguments) + " " + getVersion()),
arguments); arguments);
} }
...@@ -631,7 +641,7 @@ public class DataUtils { ...@@ -631,7 +641,7 @@ public class DataUtils {
} }
private static String getVersion() { private static String getVersion() {
return " [" + Constants.VERSION_MAJOR + "." + return "[" + Constants.VERSION_MAJOR + "." +
Constants.VERSION_MINOR + "." + Constants.BUILD_ID + "]"; Constants.VERSION_MINOR + "." + Constants.BUILD_ID + "]";
} }
......
...@@ -49,7 +49,7 @@ public class MVMap<K, V> extends AbstractMap<K, V> ...@@ -49,7 +49,7 @@ public class MVMap<K, V> extends AbstractMap<K, V>
private boolean readOnly; private boolean readOnly;
/** /**
* This flag is set during a write operation to the tree. * This flag is set during a write operation.
*/ */
private volatile boolean writing; private volatile boolean writing;
...@@ -905,9 +905,11 @@ public class MVMap<K, V> extends AbstractMap<K, V> ...@@ -905,9 +905,11 @@ public class MVMap<K, V> extends AbstractMap<K, V>
/** /**
* This method is called before writing to the map. The default * This method is called before writing to the map. The default
* implementation checks whether writing is allowed. * implementation checks whether writing is allowed, and tries
* to detect concurrent modification.
* *
* @throws UnsupportedOperationException if the map is read-only * @throws UnsupportedOperationException if the map is read-only,
* or if another thread is concurrently writing
*/ */
protected void beforeWrite() { protected void beforeWrite() {
if (readOnly) { if (readOnly) {
...@@ -915,9 +917,21 @@ public class MVMap<K, V> extends AbstractMap<K, V> ...@@ -915,9 +917,21 @@ public class MVMap<K, V> extends AbstractMap<K, V>
throw DataUtils.newUnsupportedOperationException( throw DataUtils.newUnsupportedOperationException(
"This map is read-only"); "This map is read-only");
} }
checkConcurrentWrite();
writing = true; writing = true;
store.beforeWrite(); store.beforeWrite();
} }
/**
* Check that no write operation is in progress.
*/
protected void checkConcurrentWrite() {
if (writing) {
// try to detect concurrent modification
// on a best-effort basis
throw DataUtils.newConcurrentModificationException();
}
}
/** /**
* This method is called after writing to the map (whether or not the write * This method is called after writing to the map (whether or not the write
......
...@@ -31,6 +31,10 @@ public class MVMapConcurrent<K, V> extends MVMap<K, V> { ...@@ -31,6 +31,10 @@ public class MVMapConcurrent<K, V> extends MVMap<K, V> {
protected Page copyOnWrite(Page p, long writeVersion) { protected Page copyOnWrite(Page p, long writeVersion) {
return p.copy(writeVersion); return p.copy(writeVersion);
} }
protected void checkConcurrentWrite() {
// ignore (writes are synchronized)
}
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
public V put(K key, V value) { public V put(K key, V value) {
......
...@@ -44,18 +44,13 @@ H:3,... ...@@ -44,18 +44,13 @@ H:3,...
TODO: TODO:
- rolling docs review: at convert "Features" to top-level (linked) entries - rolling docs review: at convert "Features" to top-level (linked) entries
- background thread: async store when the write buffer is almost full
- test new write / read algorithm for speed and errors - mvcc with multiple transactions
- detect concurrent writes / reads in the MVMap - additional test async write / read algorithm for speed and errors
- maybe rename store to write
- document how committing, storing, and closing is coupled
- document temporary writes (to avoid out-of-memory)
- store() should probably be store(false), and maybe rename to write
- move setters to the builder, except for setRetainVersion, setReuseSpace, - move setters to the builder, except for setRetainVersion, setReuseSpace,
and settings that are persistent (setStoreVersion) and settings that are persistent (setStoreVersion)
- test meta table rollback: it is changed after save; could rollback break it? - test & document meta table rollback: it is changed after save; could rollback break it?
- automated 'kill process' and 'power failure' test - automated 'kill process' and 'power failure' test
- mvcc with multiple transactions
- update checkstyle - update checkstyle
- maybe split database into multiple files, to speed up compact - maybe split database into multiple files, to speed up compact
- auto-compact from time to time and on close - auto-compact from time to time and on close
...@@ -786,7 +781,6 @@ public class MVStore { ...@@ -786,7 +781,6 @@ public class MVStore {
} }
long time = getTime(); long time = getTime();
lastStoreTime = time; lastStoreTime = time;
if (temp) { if (temp) {
meta.put("rollbackOnOpen", Long.toString(lastCommittedVersion)); meta.put("rollbackOnOpen", Long.toString(lastCommittedVersion));
// find the oldest chunk to retain // find the oldest chunk to retain
...@@ -896,6 +890,9 @@ public class MVStore { ...@@ -896,6 +890,9 @@ public class MVStore {
int chunkLength = buff.position(); int chunkLength = buff.position();
int length = MathUtils.roundUpInt(chunkLength, BLOCK_SIZE) + BLOCK_SIZE; int length = MathUtils.roundUpInt(chunkLength, BLOCK_SIZE) + BLOCK_SIZE;
if (length > buff.capacity()) {
buff = DataUtils.ensureCapacity(buff, length - buff.capacity());
}
buff.limit(length); buff.limit(length);
long fileLength = getFileLengthUsed(); long fileLength = getFileLengthUsed();
...@@ -1437,6 +1434,10 @@ public class MVStore { ...@@ -1437,6 +1434,10 @@ public class MVStore {
* This method is called before writing to a map. * This method is called before writing to a map.
*/ */
void beforeWrite() { void beforeWrite() {
if (currentStoreVersion >= 0) {
// store is possibly called within store, if the meta map changed
return;
}
if (unsavedPageCount > maxUnsavedPages && maxUnsavedPages > 0) { if (unsavedPageCount > maxUnsavedPages && maxUnsavedPages > 0) {
store(true); store(true);
} }
...@@ -1675,10 +1676,12 @@ public class MVStore { ...@@ -1675,10 +1676,12 @@ public class MVStore {
/** /**
* Store all unsaved changes, if there are any that are committed. * Store all unsaved changes, if there are any that are committed.
*/ */
void storeUnsaved() { void storeInBackground() {
if (closed || unsavedPageCount == 0) { if (closed || unsavedPageCount == 0) {
return; return;
} }
// could also store when there are many unstored pages,
// but according to a test it doesn't really help
if (lastCommittedVersion >= currentVersion) { if (lastCommittedVersion >= currentVersion) {
return; return;
} }
...@@ -1712,7 +1715,7 @@ public class MVStore { ...@@ -1712,7 +1715,7 @@ public class MVStore {
// ignore // ignore
} }
} }
store.storeUnsaved(); store.storeInBackground();
} }
} }
......
...@@ -764,6 +764,7 @@ public class Page { ...@@ -764,6 +764,7 @@ public class Page {
* @return the target buffer * @return the target buffer
*/ */
private ByteBuffer write(Chunk chunk, ByteBuffer buff) { private ByteBuffer write(Chunk chunk, ByteBuffer buff) {
buff = DataUtils.ensureCapacity(buff, 1024);
int start = buff.position(); int start = buff.position();
buff.putInt(0); buff.putInt(0);
buff.putShort((byte) 0); buff.putShort((byte) 0);
......
...@@ -11,9 +11,12 @@ import java.io.FileOutputStream; ...@@ -11,9 +11,12 @@ import java.io.FileOutputStream;
import java.io.InputStream; import java.io.InputStream;
import java.nio.channels.Channels; import java.nio.channels.Channels;
import java.nio.channels.FileChannel; import java.nio.channels.FileChannel;
import java.util.ConcurrentModificationException;
import java.util.Iterator; import java.util.Iterator;
import java.util.Map; import java.util.Map;
import java.util.Random; import java.util.Random;
import java.util.concurrent.atomic.AtomicInteger;
import org.h2.mvstore.MVMap; import org.h2.mvstore.MVMap;
import org.h2.mvstore.MVMapConcurrent; import org.h2.mvstore.MVMapConcurrent;
import org.h2.mvstore.MVStore; import org.h2.mvstore.MVStore;
...@@ -209,6 +212,16 @@ public class TestConcurrent extends TestMVStore { ...@@ -209,6 +212,16 @@ public class TestConcurrent extends TestMVStore {
* map, so that keys and values may become null. * map, so that keys and values may become null.
*/ */
private void testConcurrentWrite() throws InterruptedException { private void testConcurrentWrite() throws InterruptedException {
final AtomicInteger detected = new AtomicInteger();
final AtomicInteger notDetected = new AtomicInteger();
for (int i = 0; i < 10; i++) {
testConcurrentWrite(detected, notDetected);
}
// in most cases, it should be detected
assertTrue(notDetected.get() * 10 <= detected.get());
}
private void testConcurrentWrite(final AtomicInteger detected, final AtomicInteger notDetected) throws InterruptedException {
final MVStore s = openStore(null); final MVStore s = openStore(null);
final MVMap<Integer, Integer> m = s.openMap("data"); final MVMap<Integer, Integer> m = s.openMap("data");
final int size = 20; final int size = 20;
...@@ -223,14 +236,16 @@ public class TestConcurrent extends TestMVStore { ...@@ -223,14 +236,16 @@ public class TestConcurrent extends TestMVStore {
m.remove(rand.nextInt(size)); m.remove(rand.nextInt(size));
} }
m.get(rand.nextInt(size)); m.get(rand.nextInt(size));
} catch (ConcurrentModificationException e) {
detected.incrementAndGet();
} catch (NegativeArraySizeException e) { } catch (NegativeArraySizeException e) {
// ignore notDetected.incrementAndGet();
} catch (ArrayIndexOutOfBoundsException e) { } catch (ArrayIndexOutOfBoundsException e) {
// ignore notDetected.incrementAndGet();
} catch (IllegalArgumentException e) { } catch (IllegalArgumentException e) {
// ignore notDetected.incrementAndGet();
} catch (NullPointerException e) { } catch (NullPointerException e) {
// ignore notDetected.incrementAndGet();
} }
} }
} }
...@@ -246,14 +261,16 @@ public class TestConcurrent extends TestMVStore { ...@@ -246,14 +261,16 @@ public class TestConcurrent extends TestMVStore {
m.remove(rand.nextInt(size)); m.remove(rand.nextInt(size));
} }
m.get(rand.nextInt(size)); m.get(rand.nextInt(size));
} catch (ConcurrentModificationException e) {
detected.incrementAndGet();
} catch (NegativeArraySizeException e) { } catch (NegativeArraySizeException e) {
// ignore notDetected.incrementAndGet();
} catch (ArrayIndexOutOfBoundsException e) { } catch (ArrayIndexOutOfBoundsException e) {
// ignore notDetected.incrementAndGet();
} catch (IllegalArgumentException e) { } catch (IllegalArgumentException e) {
// ignore notDetected.incrementAndGet();
} catch (NullPointerException e) { } catch (NullPointerException e) {
// ignore notDetected.incrementAndGet();
} }
} }
s.incrementVersion(); s.incrementVersion();
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论