提交 e8f0a660 authored 作者: Thomas Mueller's avatar Thomas Mueller

MVStore: various changes (store temporary, store committed and rollback on close)

上级 cfb047bd
......@@ -72,7 +72,7 @@ public class Chunk {
long version;
/**
* When this chunk was created, in seconds after the store was created.
* When this chunk was created, in milliseconds after the store was created.
*/
long time;
......
......@@ -48,8 +48,10 @@ public class MVMap<K, V> extends AbstractMap<K, V>
private boolean closed;
private boolean readOnly;
/**
* This flag is set during a write operation to the tree.
*/
private volatile boolean writing;
private volatile int writeCount;
protected MVMap(DataType keyType, DataType valueType) {
this.keyType = keyType;
......@@ -918,26 +920,21 @@ public class MVMap<K, V> extends AbstractMap<K, V>
}
/**
* This method is called after writing to the map.
* This method is called after writing to the map (whether or not the write
* operation was successful).
*/
protected void afterWrite() {
writeCount++;
writing = false;
}
void waitUntilWritten(long version) {
if (root.getVersion() < version) {
// a write will create a new version
return;
}
// wait until writing is done,
// but only for the current write operation
// a bit like a spin lock
int w = writeCount;
while (writing) {
if (writeCount > w) {
return;
}
/**
* If there is a concurrent update to the given version, wait until it is
* finished.
*
* @param root the root page
*/
protected void waitUntilWritten(Page root) {
while (writing && root == this.root) {
Thread.yield();
}
}
......@@ -967,7 +964,7 @@ public class MVMap<K, V> extends AbstractMap<K, V>
/**
* Remove the given page (make the space available).
*
* @param p the page
* @param pos the position of the page to remove
*/
protected void removePage(long pos) {
store.removePage(this, pos);
......
......@@ -52,7 +52,7 @@ public class MVMapConcurrent<K, V> extends MVMap<K, V> {
}
}
void waitUntilWritten(long version) {
protected void waitUntilWritten(Page root) {
// no need to wait
}
......
......@@ -43,16 +43,16 @@ H:3,...
TODO:
- getTime: use milliseconds, not seconds (no division, finer granurality)
- naming: hasUnsavedChanges() versus store(): hasUnstoredChanges?
- test rollback of meta table: it is changed after save; could rollback be a problem?
- async store: write test cases; should fail at freedChunks
- async store of current root is illegal, except with MVMapConcurrent
- auto-store needs to be reverted on startup
- auto-store synchronously if too many unstored pages (8 MB)
- auto-store in background thread after 1-2 second by default
- auto-store in background thread if more than 4 MB of unstored pages
- auto-store: use notify to wake up background thread?
- test new write / read algorithm for speed and errors
- detect concurrent writes / reads in the MVMap
- maybe rename store to write
- document how committing, storing, and closing is coupled
- document temporary writes (to avoid out-of-memory)
- store() should probably be store(false), and maybe rename to write
- move setters to the builder, except for setRetainVersion, setReuseSpace,
and settings that are persistent (setStoreVersion)
- update copyright
- test meta table rollback: it is changed after save; could rollback break it?
- automated 'kill process' and 'power failure' test
- mvcc with multiple transactions
- update checkstyle
......@@ -94,6 +94,8 @@ TODO:
- implement an off-heap file system
- remove change cursor, or add support for writing to branches
- support pluggable logging or remove log
- maybe add an optional finalizer and exit hook
to store committed changes
*/
......@@ -116,6 +118,9 @@ public class MVStore {
private static final int FORMAT_WRITE = 1;
private static final int FORMAT_READ = 1;
/**
* Whether the store is closed.
*/
volatile boolean closed;
private final String fileName;
......@@ -172,6 +177,10 @@ public class MVStore {
private final boolean compress;
private long currentVersion;
/**
* The version of the last stored chunk.
*/
private long lastStoredVersion;
private int fileReadCount;
private int fileWriteCount;
......@@ -179,12 +188,23 @@ public class MVStore {
private int maxUnsavedPages;
/**
* The time the store was created, in seconds since 1970.
* The time the store was created, in milliseconds since 1970.
*/
private long creationTime;
private int retentionTime = 45;
private int retentionTime = 45000;
private long lastStoreTime;
/**
* To which version to roll back when opening the store after a crash.
*/
private long lastCommittedVersion;
/**
* The earliest chunk to retain, if any.
*/
private Chunk retainChunk;
private Thread backgroundThread;
/**
......@@ -194,6 +214,11 @@ public class MVStore {
private volatile boolean metaChanged;
/**
* The delay in milliseconds to automatically store changes.
*/
private int writeDelay = 1000;
MVStore(HashMap<String, Object> config) {
String f = (String) config.get("fileName");
if (f != null && !f.startsWith("nio:")) {
......@@ -219,6 +244,8 @@ public class MVStore {
mb = o == null ? 4 : (Integer) o;
int writeBufferSize = mb * 1024 * 1024;
maxUnsavedPages = writeBufferSize / pageSize;
o = config.get("writeDelay");
writeDelay = o == null ? 1000 : (Integer) o;
} else {
cache = null;
filePassword = null;
......@@ -421,9 +448,16 @@ public class MVStore {
}
}
lastStoreTime = getTime();
// if we use auto-save, also start the background thread
if (maxUnsavedPages > 0) {
Writer w = new Writer(this);
String r = meta.get("rollbackOnOpen");
if (r != null) {
long rollback = Long.parseLong(r);
rollbackTo(rollback);
}
this.lastCommittedVersion = currentVersion;
// start the background thread if needed
if (writeDelay > 0) {
int sleep = Math.max(1, writeDelay / 10);
Writer w = new Writer(this, sleep);
Thread t = new Thread(w, "MVStore writer " + fileName);
t.setDaemon(true);
t.start();
......@@ -605,27 +639,44 @@ public class MVStore {
}
/**
* Close the file. Uncommitted changes are ignored, and all open maps are closed.
* Close the file and the store. If there are any committed but unsaved
* changes, they are written to disk first. If any temporary data was
* written but not committed, this is rolled back. All open maps are closed.
*/
public void close() {
close(true);
}
private synchronized void close(boolean shrinkIfPossible) {
private void close(boolean shrinkIfPossible) {
if (closed) {
return;
}
if (!readOnly) {
if (hasUnsavedChanges()) {
rollbackTo(lastCommittedVersion);
store(false);
}
}
closed = true;
if (file == null) {
return;
}
// can not synchronize on this yet, because
// the thread also synchronized on this, which
// could result in a deadlock
if (backgroundThread != null) {
Thread t = backgroundThread;
backgroundThread = null;
t.interrupt();
synchronized (this) {
notify();
}
try {
t.join();
} catch (Exception e) {
// ignore
}
}
synchronized (this) {
try {
if (shrinkIfPossible) {
shrinkFileIfPossible(0);
......@@ -650,6 +701,7 @@ public class MVStore {
file = null;
}
}
}
/**
* Get the chunk for the given position.
......@@ -662,7 +714,7 @@ public class MVStore {
}
/**
* Increment the current version.
* Increment the current version, without committing the changes.
*
* @return the new version
*/
......@@ -670,6 +722,25 @@ public class MVStore {
return ++currentVersion;
}
/**
* Commit the changes. This method marks the changes as committed and
* increments the version.
* <p>
* Unless the write delay is disabled, this method does not write to the
* file. Instead, data is written after the delay, manually by calling the
* store method, when the write buffer is full, or when closing the store.
*
* @return the new version
*/
public long commit() {
if (writeDelay == 0) {
return store(true);
}
long v = ++currentVersion;
lastCommittedVersion = v;
return v;
}
/**
* Commit all changes and persist them to disk. This method does nothing if
* there are no unsaved changes, otherwise it increments the current version
......@@ -680,17 +751,22 @@ public class MVStore {
* @return the new version (incremented if there were changes)
*/
public long store() {
checkOpen();
return store(false);
}
/**
* Store changes.
* Store changes. Changes that are marked as temporary are rolled back after
* a restart.
*
* @param temp whether the changes should be rolled back after opening
* @param temp whether the changes are only temporary (not committed), and
* should be rolled back after a crash
* @return the new version (incremented if there were changes)
*/
private synchronized long store(boolean temp) {
checkOpen();
if (closed) {
return currentVersion;
}
if (currentStoreVersion >= 0) {
// store is possibly called within store, if the meta map changed
return currentVersion;
......@@ -705,10 +781,27 @@ public class MVStore {
if (file == null) {
return version;
}
long time = getTime();
lastStoreTime = time;
if (temp) {
meta.put("rollbackOnOpen", Long.toString(lastCommittedVersion));
// find the oldest chunk to retain
long minVersion = Long.MAX_VALUE;
Chunk minChunk = null;
for (Chunk c : chunks.values()) {
if (c.version < minVersion) {
minVersion = c.version;
minChunk = c;
}
}
retainChunk = minChunk;
} else {
lastCommittedVersion = version;
meta.remove("rollbackOnOpen");
retainChunk = null;
}
// the last chunk was not completely correct in the last store()
// this needs to be updated now (it's better not to update right after
// storing, because that would modify the meta map again)
......@@ -735,7 +828,9 @@ public class MVStore {
if (m != meta) {
long v = m.getVersion();
if (v >= 0 && m.getVersion() >= lastStoredVersion) {
changed.add(m.openVersion(storeVersion));
MVMap<?, ?> r = m.openVersion(storeVersion);
r.waitUntilWritten(r.getRoot());
changed.add(r);
}
}
}
......@@ -747,10 +842,11 @@ public class MVStore {
meta.put("root." + m.getId(), String.valueOf(Integer.MAX_VALUE));
}
}
applyFreedChunks(storeVersion);
ArrayList<Integer> removedChunks = New.arrayList();
// do it twice, because changing the meta table
// could cause a chunk to get empty
// could cause a chunk to become empty
for (int i = 0; i < 2; i++) {
for (Chunk x : chunks.values()) {
if (x.maxLengthLive == 0 && canOverwriteChunk(x, time)) {
......@@ -848,11 +944,18 @@ public class MVStore {
}
private boolean canOverwriteChunk(Chunk c, long time) {
return c.time + retentionTime <= time;
if (c.time + retentionTime > time) {
return false;
}
Chunk r = retainChunk;
if (r != null && c.version > r.version) {
return false;
}
return true;
}
private long getTime() {
return (System.currentTimeMillis() / 1000) - creationTime;
return System.currentTimeMillis() - creationTime;
}
private void applyFreedChunks(long storeVersion) {
......@@ -1238,22 +1341,22 @@ public class MVStore {
}
/**
* How long to retain old, persisted chunks, in seconds. Chunks that are
* older than this many seconds may be overwritten once they contain no live
* data. The default is 45 seconds. It is assumed that a file system and
* hard disk will flush all write buffers within this many seconds. Using a
* lower value might be dangerous, unless the file system and hard disk
* flush the buffers earlier. To manually flush the buffers, use
* How long to retain old, persisted chunks, in milliseconds. Chunks that
* are older may be overwritten once they contain no live data. The default
* is 45000 (45 seconds). It is assumed that a file system and hard disk
* will flush all write buffers within this time. Using a lower value might
* be dangerous, unless the file system and hard disk flush the buffers
* earlier. To manually flush the buffers, use
* <code>MVStore.getFile().force(true)</code>, however please note that
* according to various tests this does not always work as expected.
* <p>
* This setting is not persisted.
*
* @param seconds how many seconds to retain old chunks (0 to overwrite them
* @param ms how many milliseconds to retain old chunks (0 to overwrite them
* as early as possible)
*/
public void setRetentionTime(int seconds) {
this.retentionTime = seconds;
public void setRetentionTime(int ms) {
this.retentionTime = ms;
}
/**
......@@ -1332,7 +1435,7 @@ public class MVStore {
*/
void beforeWrite() {
if (unsavedPageCount > maxUnsavedPages && maxUnsavedPages > 0) {
store();
store(true);
}
}
......@@ -1364,12 +1467,27 @@ public class MVStore {
* Revert to the beginning of the given version. All later changes (stored
* or not) are forgotten. All maps that were created later are closed. A
* rollback to a version before the last stored version is immediately
* persisted.
* persisted. Rollback to version 0 means all data is removed.
*
* @param version the version to revert to
*/
public synchronized void rollbackTo(long version) {
checkOpen();
if (version == 0) {
// special case: remove all data
for (MVMap<?, ?> m : maps.values()) {
m.close();
}
meta.clear();
chunks.clear();
maps.clear();
synchronized (freedChunks) {
freedChunks.clear();
}
currentVersion = version;
metaChanged = false;
return;
}
DataUtils.checkArgument(
isKnownVersion(version),
"Unknown version {0}", version);
......@@ -1422,7 +1540,6 @@ public class MVStore {
}
}
}
// this.lastStoredVersion = version - 1;
this.currentVersion = version;
}
......@@ -1451,6 +1568,15 @@ public class MVStore {
return currentVersion;
}
/**
* Get the last committed version.
*
* @return the version
*/
public long getCommittedVersion() {
return lastCommittedVersion;
}
/**
* Get the number of file write operations since this store was opened.
*
......@@ -1543,38 +1669,48 @@ public class MVStore {
return DataUtils.parseMap(m).get("name");
}
void storeIfNeeded() {
/**
* Store all unsaved changes, if there are any that are committed.
*/
void storeUnsaved() {
if (closed || unsavedPageCount == 0) {
return;
}
if (lastCommittedVersion >= currentVersion) {
return;
}
long time = getTime();
if (time <= lastStoreTime + 1) {
if (time <= lastStoreTime + writeDelay) {
return;
}
store();
store(true);
}
/**
* A background writer to automatically store changes every two seconds.
* A background writer to automatically store changes from time to time.
*/
private static class Writer implements Runnable {
private final MVStore store;
private final int sleep;
Writer(MVStore store) {
Writer(MVStore store, int sleep) {
this.store = store;
this.sleep = sleep;
}
@Override
public void run() {
while (!store.closed) {
store.storeIfNeeded();
synchronized (store) {
try {
Thread.sleep(1000);
store.wait(sleep);
} catch (InterruptedException e) {
// ignore
}
}
store.storeUnsaved();
}
}
}
......@@ -1636,36 +1772,60 @@ public class MVStore {
/**
* Set the read cache size in MB. The default is 16 MB.
*
* @param mb the cache size
* @param mb the cache size in megabytes
* @return this
*/
public Builder cacheSizeMB(int mb) {
public Builder cacheSize(int mb) {
return set("cacheSize", mb);
}
/**
* Set the size of the write buffer in MB. The default is 4 MB. Changes
* are automatically stored if the buffer grows larger than this, and
* after 2 seconds (whichever occurs earlier).
* Compress data before writing using the LZF algorithm. This setting only
* affects writes; it is not necessary to enable compression when reading,
* even if compression was enabled when writing.
*
* @return this
*/
public Builder compressData() {
return set("compress", 1);
}
/**
* Set the size of the write buffer, in MB (for file-based stores).
* Changes are automatically stored if the buffer grows larger than
* this. However, unless the changes are committed later on, they are
* rolled back when opening the store.
* <p>
* To disable automatically storing, set the buffer size to 0.
* The default is 4 MB.
* <p>
* When the value is set to 0 or lower, data is never automatically
* stored.
*
* @param mb the write buffer size
* @param mb the write buffer size, in megabytes
* @return this
*/
public Builder writeBufferSizeMB(int mb) {
public Builder writeBufferSize(int mb) {
return set("writeBufferSize", mb);
}
/**
* Compress data before writing using the LZF algorithm. This setting only
* affects writes; it is not necessary to enable compression when reading,
* even if compression was enabled when writing.
* Set the maximum delay in milliseconds to store committed changes (for
* file-based stores).
* <p>
* The default is 1000, meaning committed changes are stored after at
* most one second.
* <p>
* When the value is set to -1, committed changes are only written when
* calling the store method. When the value is set to 0, committed
* changes are immediately written on a commit, but please note this
* decreases performance and does still not guarantee the disk will
* actually write the data.
*
* @param millis the maximum delay
* @return this
*/
public Builder compressData() {
return set("compress", 1);
public Builder writeDelay(int millis) {
return set("writeDelay", millis);
}
/**
......
......@@ -42,6 +42,8 @@ public class TestMVStore extends TestBase {
FileUtils.deleteRecursive(getBaseDir(), true);
FileUtils.createDirectories(getBaseDir());
testWriteBuffer();
testWriteDelay();
testEncryptedFile();
testFileFormatChange();
testRecreateMap();
......@@ -77,6 +79,107 @@ public class TestMVStore extends TestBase {
testSimple();
}
private void testWriteBuffer() throws IOException {
String fileName = getBaseDir() + "/testAutoStoreBuffer.h3";
FileUtils.delete(fileName);
MVStore s;
MVMap<Integer, byte[]> m;
byte[] data = new byte[1000];
long lastSize = 0;
int len = 1000;
for (int bs = 0; bs <= 1; bs++) {
s = new MVStore.Builder().
fileName(fileName).
writeBufferSize(bs).
open();
m = s.openMap("data");
for (int i = 0; i < len; i++) {
m.put(i, data);
}
long size = s.getFile().size();
assertTrue("last:" + lastSize + " now: " + size, size > lastSize);
lastSize = size;
s.close();
}
s = new MVStore.Builder().
fileName(fileName).
open();
m = s.openMap("data");
assertFalse(m.containsKey(1));
m.put(1, data);
s.commit();
m.put(2, data);
s.close();
s = new MVStore.Builder().
fileName(fileName).
open();
m = s.openMap("data");
assertTrue(m.containsKey(1));
assertFalse(m.containsKey(2));
s.close();
FileUtils.delete(fileName);
}
private void testWriteDelay() throws InterruptedException {
String fileName = getBaseDir() + "/testUndoTempStore.h3";
FileUtils.delete(fileName);
MVStore s;
MVMap<Integer, String> m;
s = new MVStore.Builder().
writeDelay(1).
fileName(fileName).
open();
m = s.openMap("data");
m.put(1, "Hello");
s.store();
long v = s.getCurrentVersion();
m.put(2, "World");
Thread.sleep(5);
// must not store, as nothing has been committed yet
assertEquals(v, s.getCurrentVersion());
s.commit();
m.put(3, "!");
for (int i = 100; i > 0; i--) {
if (s.getCurrentVersion() > v) {
break;
}
if (i < 10) {
fail();
}
Thread.sleep(1);
}
s.close();
s = new MVStore.Builder().
fileName(fileName).
open();
m = s.openMap("data");
assertEquals("Hello", m.get(1));
assertEquals("World", m.get(2));
assertFalse(m.containsKey(3));
String data = new String(new char[1000]).replace((char) 0, 'x');
for (int i = 0; i < 1000; i++) {
m.put(i, data);
}
s.close();
s = new MVStore.Builder().
fileName(fileName).
open();
m = s.openMap("data");
assertEquals("Hello", m.get(1));
assertEquals("World", m.get(2));
assertFalse(m.containsKey(3));
s.close();
FileUtils.delete(fileName);
}
private void testEncryptedFile() {
String fileName = getBaseDir() + "/testEncryptedFile.h3";
FileUtils.delete(fileName);
......@@ -175,6 +278,8 @@ public class TestMVStore extends TestBase {
assertEquals("world", map.getName());
s.rollbackTo(old);
assertEquals("hello", map.getName());
s.rollbackTo(0);
assertTrue(map.isClosed());
s.close();
}
......@@ -211,7 +316,7 @@ public class TestMVStore extends TestBase {
for (int cacheSize = 0; cacheSize <= 6; cacheSize += 4) {
s = new MVStore.Builder().
fileName(fileName).
cacheSizeMB(1 + 3 * cacheSize).open();
cacheSize(1 + 3 * cacheSize).open();
map = s.openMap("test");
for (int i = 0; i < 1024; i += 128) {
for (int j = 0; j < i; j++) {
......@@ -253,11 +358,11 @@ public class TestMVStore extends TestBase {
private void testFileHeader() {
String fileName = getBaseDir() + "/testFileHeader.h3";
MVStore s = openStore(fileName);
long time = System.currentTimeMillis() / 1000;
long time = System.currentTimeMillis();
assertEquals("3", s.getFileHeader().get("H"));
long creationTime = Long.parseLong(s.getFileHeader()
.get("creationTime"));
assertTrue(Math.abs(time - creationTime) < 5);
assertTrue(Math.abs(time - creationTime) < 100);
s.getFileHeader().put("test", "123");
MVMap<Integer, Integer> map = s.openMap("test");
map.put(10, 100);
......@@ -274,6 +379,7 @@ public class TestMVStore extends TestBase {
MVMap<Integer, Integer> map = s.openMap("test");
map.put(10, 100);
FilePath f = FilePath.get(s.getFileName());
s.store();
s.close();
int blockSize = 4 * 1024;
// test corrupt file headers
......@@ -299,6 +405,7 @@ public class TestMVStore extends TestBase {
// header should be used
s = openStore(fileName);
map = s.openMap("test");
assertEquals(100, map.get(10).intValue());
s.close();
} else {
// both headers are corrupt
......@@ -710,11 +817,11 @@ public class TestMVStore extends TestBase {
FileUtils.delete(fileName);
MVMap<String, String> meta;
MVStore s = openStore(fileName);
assertEquals(45, s.getRetentionTime());
assertEquals(45000, s.getRetentionTime());
s.setRetentionTime(0);
assertEquals(0, s.getRetentionTime());
s.setRetentionTime(45);
assertEquals(45, s.getRetentionTime());
s.setRetentionTime(45000);
assertEquals(45000, s.getRetentionTime());
assertEquals(0, s.getCurrentVersion());
assertFalse(s.hasUnsavedChanges());
MVMap<String, String> m = s.openMap("data");
......
/*
* Copyright 2004-2011 H2 Group. Multiple-Licensed under the H2 License,
* Version 1.0, and under the Eclipse Public License, Version 1.0
* (http://h2database.com/html/license.html).
* Initial Developer: H2 Group
*/
package org.h2.test.store;
import org.h2.test.TestBase;
/**
* Test using volatile fields to ensure we don't read from a version that is
* concurrently written to.
*/
public class TestSpinLock extends TestBase {
/**
* The version to use for writing.
*/
volatile int writeVersion;
/**
* The current data object.
*/
volatile Data data = new Data(0, null);
/**
* Run just this test.
*
* @param a ignored
*/
public static void main(String... a) throws Exception {
TestBase.createCaller().init().test();
}
@Override
public void test() throws Exception {
final TestSpinLock obj = new TestSpinLock();
Thread t = new Thread() {
public void run() {
while (!isInterrupted()) {
for (int i = 0; i < 10000; i++) {
Data d = obj.copyOnWrite();
obj.data = d;
d.write(i);
d.writing = false;
}
}
}
};
t.start();
try {
for (int i = 0; i < 100000; i++) {
Data d = obj.getImmutable();
int z = d.x + d.y;
if (z != 0) {
String error = i + " result: " + z + " now: " + d.x + " "
+ d.y;
System.out.println(error);
throw new Exception(error);
}
}
} finally {
t.interrupt();
t.join();
}
}
/**
* Clone the data object if necessary (if the write version is newer than
* the current version).
*
* @return the data object
*/
Data copyOnWrite() {
Data d = data;
d.writing = true;
int w = writeVersion;
if (w <= data.version) {
return d;
}
Data d2 = new Data(w, data);
d2.writing = true;
d.writing = false;
return d2;
}
/**
* Get an immutable copy of the data object.
*
* @return the immutable object
*/
private Data getImmutable() {
Data d = data;
++writeVersion;
// wait until writing is done,
// but only for the current write operation:
// a bit like a spin lock
while (d.writing) {
// Thread.yield() is not required, specially
// if there are multiple cores
// but getImmutable() doesn't
// need to be that fast actually
Thread.yield();
}
return d;
}
/**
* The data class - represents the root page.
*/
static class Data {
/**
* The version.
*/
final int version;
/**
* The values.
*/
int x, y;
/**
* Whether a write operation is in progress.
*/
volatile boolean writing;
/**
* Create a copy of the data.
*
* @param version the new version
* @param old the old data or null
*/
Data(int version, Data old) {
this.version = version;
if (old != null) {
this.x = old.x;
this.y = old.y;
}
}
/**
* Write to the fields in an unsynchronized way.
*
* @param value the new value
*/
void write(int value) {
this.x = value;
this.y = -value;
}
}
}
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论