提交 c662be41 authored 作者: Thomas Mueller's avatar Thomas Mueller

MVStore: auto-save (every few MBs and every 1-2 seconds)

上级 5c75744e
......@@ -48,6 +48,9 @@ public class MVMap<K, V> extends AbstractMap<K, V>
private boolean closed;
private boolean readOnly;
private volatile boolean writing;
private volatile int writeCount;
protected MVMap(DataType keyType, DataType valueType) {
this.keyType = keyType;
this.valueType = valueType;
......@@ -91,13 +94,17 @@ public class MVMap<K, V> extends AbstractMap<K, V>
*/
@SuppressWarnings("unchecked")
public V put(K key, V value) {
checkWrite();
long writeVersion = store.getCurrentVersion();
Page p = copyOnWrite(root, writeVersion);
p = splitRootIfNeeded(p, writeVersion);
Object result = put(p, writeVersion, key, value);
newRoot(p);
return (V) result;
beforeWrite();
try {
long writeVersion = store.getCurrentVersion();
Page p = copyOnWrite(root, writeVersion);
p = splitRootIfNeeded(p, writeVersion);
Object result = put(p, writeVersion, key, value);
newRoot(p);
return (V) result;
} finally {
afterWrite();
}
}
/**
......@@ -488,9 +495,13 @@ public class MVMap<K, V> extends AbstractMap<K, V>
* Remove all entries.
*/
public void clear() {
checkWrite();
root.removeAllRecursive();
newRoot(Page.createEmpty(this, store.getCurrentVersion()));
beforeWrite();
try {
root.removeAllRecursive();
newRoot(Page.createEmpty(this, store.getCurrentVersion()));
} finally {
afterWrite();
}
}
/**
......@@ -498,11 +509,16 @@ public class MVMap<K, V> extends AbstractMap<K, V>
*/
public void removeMap() {
checkOpen();
if (this != store.getMetaMap()) {
checkWrite();
if (this == store.getMetaMap()) {
return;
}
beforeWrite();
try {
root.removeAllRecursive();
store.removeMap(id);
close();
} finally {
afterWrite();
}
}
......@@ -527,13 +543,17 @@ public class MVMap<K, V> extends AbstractMap<K, V>
* @return the old value if the key existed, or null otherwise
*/
public V remove(Object key) {
checkWrite();
long writeVersion = store.getCurrentVersion();
Page p = copyOnWrite(root, writeVersion);
@SuppressWarnings("unchecked")
V result = (V) remove(p, writeVersion, key);
newRoot(p);
return result;
beforeWrite();
try {
long writeVersion = store.getCurrentVersion();
Page p = copyOnWrite(root, writeVersion);
@SuppressWarnings("unchecked")
V result = (V) remove(p, writeVersion, key);
newRoot(p);
return result;
} finally {
afterWrite();
}
}
/**
......@@ -616,7 +636,7 @@ public class MVMap<K, V> extends AbstractMap<K, V>
result = p.getValue(index);
p.remove(index);
if (p.getKeyCount() == 0) {
removePage(p);
removePage(p.getPos());
}
}
return result;
......@@ -638,7 +658,7 @@ public class MVMap<K, V> extends AbstractMap<K, V>
if (p.getKeyCount() == 0) {
p.setChild(index, c);
p.setCounts(index, c);
removePage(p);
removePage(p.getPos());
} else {
p.remove(index);
}
......@@ -673,15 +693,6 @@ public class MVMap<K, V> extends AbstractMap<K, V>
}
}
/**
* Check whether this map has any unsaved changes.
*
* @return true if there are unsaved changes.
*/
public boolean hasUnsavedChanges() {
return !oldRoots.isEmpty();
}
/**
* Compare two keys.
*
......@@ -819,30 +830,34 @@ public class MVMap<K, V> extends AbstractMap<K, V>
* @param version the version
*/
void rollbackTo(long version) {
checkWrite();
removeUnusedOldVersions();
if (version <= createVersion) {
// the map is removed later
} else if (root.getVersion() >= version) {
// iterating in descending order -
// this is not terribly efficient if there are many versions
ArrayList<Page> list = oldRoots;
while (list.size() > 0) {
int i = list.size() - 1;
Page p = list.get(i);
root = p;
list.remove(i);
if (p.getVersion() < version) {
break;
beforeWrite();
try {
removeUnusedOldVersions();
if (version <= createVersion) {
// the map is removed later
} else if (root.getVersion() >= version) {
// iterating in descending order -
// this is not terribly efficient if there are many versions
ArrayList<Page> list = oldRoots;
while (list.size() > 0) {
int i = list.size() - 1;
Page p = list.get(i);
root = p;
list.remove(i);
if (p.getVersion() < version) {
break;
}
}
}
} finally {
afterWrite();
}
}
/**
* Forget all old versions.
*/
void removeAllOldVersions() {
private void removeAllOldVersions() {
// create a new instance
// because another thread might iterate over it
oldRoots = new ArrayList<Page>();
......@@ -887,16 +902,44 @@ public class MVMap<K, V> extends AbstractMap<K, V>
}
/**
* Check whether writing is allowed.
* This method is called before writing to the map. The default
* implementation checks whether writing is allowed.
*
* @throws IllegalStateException if the map is read-only
* @throws UnsupportedOperationException if the map is read-only
*/
protected void checkWrite() {
protected void beforeWrite() {
if (readOnly) {
checkOpen();
throw DataUtils.newUnsupportedOperationException(
"This map is read-only");
}
writing = true;
store.beforeWrite();
}
/**
* This method is called after writing to the map.
*/
protected void afterWrite() {
writeCount++;
writing = false;
}
void waitUntilWritten(long version) {
if (root.getVersion() < version) {
// a write will create a new version
return;
}
// wait until writing is done,
// but only for the current write operation
// a bit like a spin lock
int w = writeCount;
while (writing) {
if (writeCount > w) {
return;
}
Thread.yield();
}
}
public int hashCode() {
......@@ -926,8 +969,8 @@ public class MVMap<K, V> extends AbstractMap<K, V>
*
* @param p the page
*/
protected void removePage(Page p) {
store.removePage(p.getPos());
protected void removePage(long pos) {
store.removePage(this, pos);
}
/**
......@@ -1051,15 +1094,18 @@ public class MVMap<K, V> extends AbstractMap<K, V>
* @param newMapName the name name
*/
public void renameMap(String newMapName) {
checkWrite();
store.renameMap(this, newMapName);
beforeWrite();
try {
store.renameMap(this, newMapName);
} finally {
afterWrite();
}
}
public String toString() {
return asString(null);
}
/**
* A builder for maps.
*
......
......@@ -34,35 +34,46 @@ public class MVMapConcurrent<K, V> extends MVMap<K, V> {
@SuppressWarnings("unchecked")
public V put(K key, V value) {
checkWrite();
V result = get(key);
if (value.equals(result)) {
return result;
}
long writeVersion = store.getCurrentVersion();
synchronized (this) {
Page p = copyOnWrite(root, writeVersion);
p = splitRootIfNeeded(p, writeVersion);
result = (V) put(p, writeVersion, key, value);
newRoot(p);
beforeWrite();
try {
// even if the result is the same, we still update the value
// (otherwise compact doesn't work)
get(key);
long writeVersion = store.getCurrentVersion();
synchronized (this) {
Page p = copyOnWrite(root, writeVersion);
p = splitRootIfNeeded(p, writeVersion);
V result = (V) put(p, writeVersion, key, value);
newRoot(p);
return result;
}
} finally {
afterWrite();
}
return result;
}
void waitUntilWritten(long version) {
// no need to wait
}
@SuppressWarnings("unchecked")
public V remove(Object key) {
checkWrite();
V result = get(key);
if (result == null) {
return null;
}
long writeVersion = store.getCurrentVersion();
synchronized (this) {
Page p = copyOnWrite(root, writeVersion);
result = (V) remove(p, writeVersion, key);
newRoot(p);
beforeWrite();
try {
V result = get(key);
if (result == null) {
return null;
}
long writeVersion = store.getCurrentVersion();
synchronized (this) {
Page p = copyOnWrite(root, writeVersion);
result = (V) remove(p, writeVersion, key);
newRoot(p);
}
return result;
} finally {
afterWrite();
}
return result;
}
/**
......
......@@ -18,6 +18,7 @@ import java.util.Comparator;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.h2.compress.CompressLZF;
import org.h2.compress.Compressor;
import org.h2.mvstore.cache.CacheLongKeyLIRS;
......@@ -42,6 +43,16 @@ H:3,...
TODO:
- getTime: use milliseconds, not seconds (no division, finer granurality)
- naming: hasUnsavedChanges() versus store(): hasUnstoredChanges?
- test rollback of meta table: it is changed after save; could rollback be a problem?
- async store: write test cases; should fail at freedChunks
- async store of current root is illegal, except with MVMapConcurrent
- auto-store needs to be reverted on startup
- auto-store synchronously if too many unstored pages (8 MB)
- auto-store in background thread after 1-2 second by default
- auto-store in background thread if more than 4 MB of unstored pages
- auto-store: use notify to wake up background thread?
- automated 'kill process' and 'power failure' test
- mvcc with multiple transactions
- update checkstyle
......@@ -64,16 +75,11 @@ TODO:
- store number of write operations per page (maybe defragment
-- if much different than count)
- r-tree: nearest neighbor search
- use FileChannel by default (nio file system), but:
-- an interrupt closes the FileChannel
- auto-save temporary data if it uses too much memory,
-- but revert it on startup if needed.
- chunk metadata: do not store default values
- support maps without values (just existence of the key)
- support maps without keys (counted b-tree features)
- use a small object cache (StringCache), test on Android
- dump values
- tool to import / manipulate CSV files (maybe concurrently)
- map split / merge (fast if no overlap)
- auto-save if there are too many changes (required for StreamStore)
- StreamStore optimization: avoid copying bytes
......@@ -87,8 +93,7 @@ TODO:
-- to support concurrent updates and writes, and very large maps
- implement an off-heap file system
- remove change cursor, or add support for writing to branches
- file encryption: try using multiple threads
- file encryption: add a fast, insecure algorithm
- support pluggable logging or remove log
*/
......@@ -111,6 +116,8 @@ public class MVStore {
private static final int FORMAT_WRITE = 1;
private static final int FORMAT_READ = 1;
volatile boolean closed;
private final String fileName;
private final char[] filePassword;
......@@ -129,22 +136,25 @@ public class MVStore {
private final CacheLongKeyLIRS<Page> cache;
private int lastChunkId;
private final HashMap<Integer, Chunk> chunks = New.hashMap();
/**
* The map of chunks.
*/
private final ConcurrentHashMap<Integer, Chunk> chunks =
new ConcurrentHashMap<Integer, Chunk>();
/**
* The map of temporarily freed entries in the chunks. The key is the
* unsaved version, the value is the map of chunks. The maps of chunks
* contains the number of freed entries per chunk.
* Access is synchronized.
*/
private final HashMap<Long, HashMap<Integer, Chunk>> freedChunks = New.hashMap();
private MVMap<String, String> meta;
private final HashMap<Integer, MVMap<?, ?>> maps = New.hashMap();
private MVMapConcurrent<String, String> meta;
/**
* The set of maps with potentially unsaved changes.
*/
private final HashMap<Integer, MVMap<?, ?>> mapsChanged = New.hashMap();
private final ConcurrentHashMap<Integer, MVMap<?, ?>> maps =
new ConcurrentHashMap<Integer, MVMap<?, ?>>();
private HashMap<String, String> fileHeader = New.hashMap();
......@@ -162,9 +172,11 @@ public class MVStore {
private final boolean compress;
private long currentVersion;
private long lastStoredVersion;
private int fileReadCount;
private int fileWriteCount;
private int unsavedPageCount;
private int maxUnsavedPages;
/**
* The time the store was created, in seconds since 1970.
......@@ -172,24 +184,41 @@ public class MVStore {
private long creationTime;
private int retentionTime = 45;
private boolean closed;
private long lastStoreTime;
private Thread backgroundThread;
/**
* The version of the current store operation (if any).
*/
private long currentStoreVersion = -1;
private volatile boolean metaChanged;
MVStore(HashMap<String, Object> config) {
String f = (String) config.get("fileName");
if (f != null && !f.startsWith("nio:")) {
// nio is used by default
// NIO is used by default
// the following line is to ensure the NIO file system is compiled
FilePathNio.class.getName();
f = "nio:" + f;
}
this.fileName = f;
this.readOnly = "r".equals(config.get("openMode"));
this.compress = "1".equals(config.get("compress"));
this.readOnly = config.containsKey("readOnly");
this.compress = config.containsKey("compress");
if (fileName != null) {
Object s = config.get("cacheSize");
int mb = s == null ? 16 : Integer.parseInt(s.toString());
Object o = config.get("cacheSize");
int mb = o == null ? 16 : (Integer) o;
int maxMemoryBytes = mb * 1024 * 1024;
int averageMemory = pageSize / 2;
int segmentCount = 16;
int stackMoveDistance = maxMemoryBytes / averageMemory * 2 / 100;
cache = new CacheLongKeyLIRS<Page>(
mb * 1024 * 1024, 2048, 16, mb * 1024 * 1024 / 2048 * 2 / 100);
maxMemoryBytes, averageMemory, segmentCount, stackMoveDistance);
filePassword = (char[]) config.get("encrypt");
o = config.get("writeBufferSize");
mb = o == null ? 4 : (Integer) o;
int writeBufferSize = mb * 1024 * 1024;
maxUnsavedPages = writeBufferSize / pageSize;
} else {
cache = null;
filePassword = null;
......@@ -314,7 +343,7 @@ public class MVStore {
private MVMap<String, String> getMetaMap(long version) {
Chunk c = getChunkForVersion(version);
DataUtils.checkArgument(c != null, "Unknown version {}", version);
DataUtils.checkArgument(c != null, "Unknown version {0}", version);
c = readChunkHeader(c.start);
MVMap<String, String> oldMeta = meta.openReadOnly();
oldMeta.setRootPos(c.metaRootPos, version);
......@@ -343,7 +372,6 @@ public class MVStore {
meta.remove("map." + id);
meta.remove("name." + name);
meta.remove("root." + id);
mapsChanged.remove(id);
maps.remove(id);
}
......@@ -353,7 +381,9 @@ public class MVStore {
* @param map the map
*/
void markChanged(MVMap<?, ?> map) {
mapsChanged.put(map.getId(), map);
if (map == meta) {
metaChanged = true;
}
}
private void markMetaChanged() {
......@@ -366,7 +396,7 @@ public class MVStore {
* Open the store.
*/
void open() {
meta = new MVMap<String, String>(StringDataType.INSTANCE, StringDataType.INSTANCE);
meta = new MVMapConcurrent<String, String>(StringDataType.INSTANCE, StringDataType.INSTANCE);
HashMap<String, String> c = New.hashMap();
c.put("id", "0");
c.put("createVersion", Long.toString(currentVersion));
......@@ -390,6 +420,15 @@ public class MVStore {
Arrays.fill(filePassword, (char) 0);
}
}
lastStoreTime = getTime();
// if we use auto-save, also start the background thread
if (maxUnsavedPages > 0) {
Writer w = new Writer(this);
Thread t = new Thread(w, "MVStore writer " + fileName);
t.setDaemon(true);
t.start();
backgroundThread = t;
}
}
/**
......@@ -428,6 +467,7 @@ public class MVStore {
if (fileSize == 0) {
creationTime = 0;
creationTime = getTime();
lastStoreTime = creationTime;
fileHeader.put("H", "3");
fileHeader.put("blockSize", "" + BLOCK_SIZE);
fileHeader.put("format", "" + FORMAT_WRITE);
......@@ -534,6 +574,7 @@ public class MVStore {
if (currentVersion < 0) {
throw DataUtils.newIllegalStateException("File header is corrupt");
}
lastStoredVersion = -1;
}
private byte[] getFileHeaderBytes() {
......@@ -547,7 +588,7 @@ public class MVStore {
DataUtils.appendMap(buff, "fletcher", Integer.toHexString(checksum));
bytes = DataUtils.utf8Encode(buff.toString());
DataUtils.checkArgument(bytes.length <= BLOCK_SIZE,
"File header too large: {}", buff);
"File header too large: {0}", buff);
return bytes;
}
......@@ -570,33 +611,43 @@ public class MVStore {
close(true);
}
private void close(boolean shrinkIfPossible) {
private synchronized void close(boolean shrinkIfPossible) {
closed = true;
if (file != null) {
if (file == null) {
return;
}
if (backgroundThread != null) {
Thread t = backgroundThread;
backgroundThread = null;
t.interrupt();
try {
if (shrinkIfPossible) {
shrinkFileIfPossible(0);
}
log("file close");
if (fileLock != null) {
fileLock.release();
fileLock = null;
}
file.close();
for (MVMap<?, ?> m : New.arrayList(maps.values())) {
m.close();
}
meta = null;
chunks.clear();
cache.clear();
maps.clear();
mapsChanged.clear();
t.join();
} catch (Exception e) {
throw DataUtils.newIllegalStateException(
"Closing failed for file {0}", fileName, e);
} finally {
file = null;
// ignore
}
}
try {
if (shrinkIfPossible) {
shrinkFileIfPossible(0);
}
log("file close");
if (fileLock != null) {
fileLock.release();
fileLock = null;
}
file.close();
for (MVMap<?, ?> m : New.arrayList(maps.values())) {
m.close();
}
meta = null;
chunks.clear();
cache.clear();
maps.clear();
} catch (Exception e) {
throw DataUtils.newIllegalStateException(
"Closing failed for file {0}", fileName, e);
} finally {
file = null;
}
}
......@@ -623,24 +674,41 @@ public class MVStore {
* Commit all changes and persist them to disk. This method does nothing if
* there are no unsaved changes, otherwise it increments the current version
* and stores the data (for file based stores).
* <p>
* One store operation may run at any time.
*
* @return the new version (incremented if there were changes)
*/
public long store() {
return store(false);
}
/**
* Store changes.
*
* @param temp whether the changes should be rolled back after opening
* @return the new version (incremented if there were changes)
*/
private synchronized long store(boolean temp) {
checkOpen();
if (currentStoreVersion >= 0) {
// store is possibly called within store, if the meta map changed
return currentVersion;
}
if (!hasUnsavedChanges()) {
return currentVersion;
}
int currentUnsavedPageCount = unsavedPageCount;
long storeVersion = currentVersion;
long storeVersion = currentStoreVersion = currentVersion;
long version = incrementVersion();
long time = getTime();
if (file == null) {
return version;
}
long time = getTime();
lastStoreTime = time;
// the last chunk was not completely correct in the last store()
// this needs to be updated now (it's better not to update right after
// storing, because that would modify the meta map again)
......@@ -661,21 +729,29 @@ public class MVStore {
c.version = version;
chunks.put(c.id, c);
meta.put("chunk." + c.id, c.asString());
for (MVMap<?, ?> m : mapsChanged.values()) {
if (m == meta || !m.hasUnsavedChanges()) {
continue;
ArrayList<MVMap<?, ?>> list = New.arrayList(maps.values());
ArrayList<MVMap<?, ?>> changed = New.arrayList();
for (MVMap<?, ?> m : list) {
if (m != meta) {
long v = m.getVersion();
if (v >= 0 && m.getVersion() >= lastStoredVersion) {
changed.add(m.openVersion(storeVersion));
}
}
Page p = m.openVersion(storeVersion).getRoot();
}
for (MVMap<?, ?> m : changed) {
Page p = m.getRoot();
if (p.getTotalCount() == 0) {
meta.put("root." + m.getId(), "0");
} else {
meta.put("root." + m.getId(), String.valueOf(Long.MAX_VALUE));
meta.put("root." + m.getId(), String.valueOf(Integer.MAX_VALUE));
}
}
applyFreedChunks();
applyFreedChunks(storeVersion);
ArrayList<Integer> removedChunks = New.arrayList();
do {
// do it twice, because changing the meta table
// could cause a chunk to get empty
for (int i = 0; i < 2; i++) {
for (Chunk x : chunks.values()) {
if (x.maxLengthLive == 0 && canOverwriteChunk(x, time)) {
meta.remove("chunk." + x.id);
......@@ -683,9 +759,9 @@ public class MVStore {
} else {
meta.put("chunk." + x.id, x.asString());
}
applyFreedChunks();
applyFreedChunks(storeVersion);
}
} while (freedChunks.size() > 0);
}
ByteBuffer buff;
if (writeBuffer != null) {
buff = writeBuffer;
......@@ -697,11 +773,8 @@ public class MVStore {
c.writeHeader(buff);
c.maxLength = 0;
c.maxLengthLive = 0;
for (MVMap<?, ?> m : mapsChanged.values()) {
if (m == meta || !m.hasUnsavedChanges()) {
continue;
}
Page p = m.openVersion(storeVersion).getRoot();
for (MVMap<?, ?> m : changed) {
Page p = m.getRoot();
if (p.getTotalCount() > 0) {
buff = p.writeUnsavedRecursive(c, buff);
long root = p.getPos();
......@@ -728,7 +801,7 @@ public class MVStore {
long fileLength = getFileLengthUsed();
long filePos = reuseSpace ? allocateChunk(length) : fileLength;
boolean atEnd = filePos + length >= fileLength;
boolean storeAtEndOfFile = filePos + length >= fileLength;
// need to keep old chunks
// until they are are no longer referenced
......@@ -744,7 +817,7 @@ public class MVStore {
buff.position(0);
c.writeHeader(buff);
rootChunkStart = filePos;
revertTemp();
revertTemp(storeVersion);
buff.position(buff.limit() - BLOCK_SIZE);
byte[] header = getFileHeaderBytes();
......@@ -761,12 +834,16 @@ public class MVStore {
}
// overwrite the header if required
if (!atEnd) {
if (!storeAtEndOfFile) {
writeFileHeader();
shrinkFileIfPossible(1);
}
// some pages might have been changed in the meantime (in the newest version)
unsavedPageCount = Math.max(0, unsavedPageCount - currentUnsavedPageCount);
currentStoreVersion = -1;
metaChanged = false;
lastStoredVersion = storeVersion;
return version;
}
......@@ -778,19 +855,25 @@ public class MVStore {
return (System.currentTimeMillis() / 1000) - creationTime;
}
private void applyFreedChunks() {
// TODO support concurrent operations
for (HashMap<Integer, Chunk> freed : freedChunks.values()) {
for (Chunk f : freed.values()) {
Chunk c = chunks.get(f.id);
c.maxLengthLive += f.maxLengthLive;
if (c.maxLengthLive < 0) {
throw DataUtils.newIllegalStateException(
"Corrupt max length {0}", c.maxLengthLive);
private void applyFreedChunks(long storeVersion) {
synchronized (freedChunks) {
for (Iterator<Long> it = freedChunks.keySet().iterator(); it.hasNext();) {
long v = it.next();
if (v > storeVersion) {
continue;
}
Map<Integer, Chunk> freed = freedChunks.get(v);
for (Chunk f : freed.values()) {
Chunk c = chunks.get(f.id);
c.maxLengthLive += f.maxLengthLive;
if (c.maxLengthLive < 0) {
throw DataUtils.newIllegalStateException(
"Corrupt max length {0}", c.maxLengthLive);
}
}
it.remove();
}
}
freedChunks.clear();
}
/**
......@@ -870,11 +953,12 @@ public class MVStore {
*/
public boolean hasUnsavedChanges() {
checkOpen();
if (mapsChanged.size() == 0) {
return false;
if (metaChanged) {
return true;
}
for (MVMap<?, ?> m : mapsChanged.values()) {
if (m == meta || m.hasUnsavedChanges()) {
for (MVMap<?, ?> m : maps.values()) {
long v = m.getVersion();
if (v >= 0 && v >= lastStoredVersion) {
return true;
}
}
......@@ -1051,9 +1135,10 @@ public class MVStore {
/**
* Remove a page.
*
* @param map the map the page belongs to
* @param pos the position of the page
*/
void removePage(long pos) {
void removePage(MVMap<?, ?> map, long pos) {
// we need to keep temporary pages,
// to support reading old versions and rollback
if (pos == 0) {
......@@ -1065,17 +1150,27 @@ public class MVStore {
// but we don't optimize for rollback
cache.remove(pos);
Chunk c = getChunk(pos);
HashMap<Integer, Chunk>freed = freedChunks.get(currentVersion);
if (freed == null) {
freed = New.hashMap();
freedChunks.put(currentVersion, freed);
}
Chunk f = freed.get(c.id);
if (f == null) {
f = new Chunk(c.id);
freed.put(c.id, f);
long version = currentVersion;
if (map == meta && currentStoreVersion >= 0) {
// if the meta map is modified while storing,
// then this freed page needs to be registered
// with the stored chunk, so that the old chunk
// can be re-used
version = currentStoreVersion;
}
synchronized (freedChunks) {
HashMap<Integer, Chunk>freed = freedChunks.get(version);
if (freed == null) {
freed = New.hashMap();
freedChunks.put(version, freed);
}
Chunk f = freed.get(c.id);
if (f == null) {
f = new Chunk(c.id);
freed.put(c.id, f);
}
f.maxLengthLive -= DataUtils.getPageMaxLength(pos);
}
f.maxLengthLive -= DataUtils.getPageMaxLength(pos);
}
/**
......@@ -1146,10 +1241,10 @@ public class MVStore {
* How long to retain old, persisted chunks, in seconds. Chunks that are
* older than this many seconds may be overwritten once they contain no live
* data. The default is 45 seconds. It is assumed that a file system and
* hard disk will flush all write buffers after this many seconds at the
* latest. Using a lower value might be dangerous, unless the file system
* and hard disk flush the buffers earlier. To manually flush the buffers,
* use <code>MVStore.getFile().force(true)</code>, however please note that
* hard disk will flush all write buffers within this many seconds. Using a
* lower value might be dangerous, unless the file system and hard disk
* flush the buffers earlier. To manually flush the buffers, use
* <code>MVStore.getFile().force(true)</code>, however please note that
* according to various tests this does not always work as expected.
* <p>
* This setting is not persisted.
......@@ -1232,6 +1327,15 @@ public class MVStore {
unsavedPageCount++;
}
/**
* This method is called before writing to a map.
*/
void beforeWrite() {
if (unsavedPageCount > maxUnsavedPages && maxUnsavedPages > 0) {
store();
}
}
/**
* Get the store version. The store version is usually used to upgrade the
* structure of the store after upgrading the application. Initially the
......@@ -1264,12 +1368,12 @@ public class MVStore {
*
* @param version the version to revert to
*/
public void rollbackTo(long version) {
public synchronized void rollbackTo(long version) {
checkOpen();
DataUtils.checkArgument(
isKnownVersion(version),
"Unknown version {0}", version);
for (MVMap<?, ?> m : mapsChanged.values()) {
for (MVMap<?, ?> m : maps.values()) {
m.rollbackTo(version);
}
for (long v = currentVersion; v >= version; v--) {
......@@ -1279,11 +1383,12 @@ public class MVStore {
freedChunks.remove(v);
}
meta.rollbackTo(version);
metaChanged = false;
boolean loadFromFile = false;
Chunk last = chunks.get(lastChunkId);
if (last != null) {
if (last.version >= version) {
revertTemp();
revertTemp(version);
loadFromFile = true;
do {
last = chunks.remove(lastChunkId);
......@@ -1313,19 +1418,27 @@ public class MVStore {
if (loadFromFile) {
String r = meta.get("root." + id);
long root = r == null ? 0 : Long.parseLong(r);
m.setRootPos(root, version);
m.setRootPos(root, -1);
}
}
}
// this.lastStoredVersion = version - 1;
this.currentVersion = version;
}
private void revertTemp() {
freedChunks.clear();
for (MVMap<?, ?> m : mapsChanged.values()) {
m.removeAllOldVersions();
private void revertTemp(long storeVersion) {
synchronized (freedChunks) {
for (Iterator<Long> it = freedChunks.keySet().iterator(); it.hasNext();) {
long v = it.next();
if (v > storeVersion) {
continue;
}
it.remove();
}
}
for (MVMap<?, ?> m : maps.values()) {
m.removeUnusedOldVersions();
}
mapsChanged.clear();
}
/**
......@@ -1430,6 +1543,42 @@ public class MVStore {
return DataUtils.parseMap(m).get("name");
}
void storeIfNeeded() {
if (closed || unsavedPageCount == 0) {
return;
}
long time = getTime();
if (time <= lastStoreTime + 1) {
return;
}
store();
}
/**
* A background writer to automatically store changes every two seconds.
*/
private static class Writer implements Runnable {
private final MVStore store;
Writer(MVStore store) {
this.store = store;
}
@Override
public void run() {
while (!store.closed) {
store.storeIfNeeded();
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
// ignore
}
}
}
}
/**
* A builder for an MVStore.
*/
......@@ -1481,7 +1630,7 @@ public class MVStore {
* @return this
*/
public Builder readOnly() {
return set("openMode", "r");
return set("readOnly", 1);
}
/**
......@@ -1491,7 +1640,21 @@ public class MVStore {
* @return this
*/
public Builder cacheSizeMB(int mb) {
return set("cacheSize", Integer.toString(mb));
return set("cacheSize", mb);
}
/**
* Set the size of the write buffer in MB. The default is 4 MB. Changes
* are automatically stored if the buffer grows larger than this, and
* after 2 seconds (whichever occurs earlier).
* <p>
* To disable automatically storing, set the buffer size to 0.
*
* @param mb the write buffer size
* @return this
*/
public Builder writeBufferSizeMB(int mb) {
return set("writeBufferSize", mb);
}
/**
......@@ -1502,7 +1665,7 @@ public class MVStore {
* @return this
*/
public Builder compressData() {
return set("compress", "1");
return set("compress", 1);
}
/**
......
......@@ -267,7 +267,7 @@ public class Page {
* @return a page with the given version
*/
public Page copy(long version) {
map.getStore().removePage(pos);
map.removePage(pos);
Page newPage = create(map, version,
keyCount, keys, values, children, childrenPages,
counts, totalCount,
......@@ -541,14 +541,14 @@ public class Page {
long c = children[i];
int type = DataUtils.getPageType(c);
if (type == DataUtils.PAGE_TYPE_LEAF) {
map.getStore().removePage(c);
map.removePage(c);
} else {
map.readPage(c).removeAllRecursive();
}
}
}
}
map.getStore().removePage(pos);
map.removePage(pos);
}
/**
......
......@@ -150,7 +150,7 @@ public class MVRTreeMap<V> extends MVMap<SpatialKey, V> {
result = p.getValue(i);
p.remove(i);
if (p.getKeyCount() == 0) {
removePage(p);
removePage(p.getPos());
}
break;
}
......@@ -170,7 +170,7 @@ public class MVRTreeMap<V> extends MVMap<SpatialKey, V> {
// this child was deleted
p.remove(i);
if (p.getKeyCount() == 0) {
removePage(p);
removePage(p.getPos());
}
break;
}
......@@ -211,34 +211,38 @@ public class MVRTreeMap<V> extends MVMap<SpatialKey, V> {
}
private Object putOrAdd(SpatialKey key, V value, boolean alwaysAdd) {
checkWrite();
long writeVersion = store.getCurrentVersion();
Page p = copyOnWrite(root, writeVersion);
Object result;
if (alwaysAdd || get(key) == null) {
if (p.getMemory() > store.getPageSize() && p.getKeyCount() > 1) {
// only possible if this is the root, else we would have split earlier
// (this requires maxPageSize is fixed)
long totalCount = p.getTotalCount();
Page split = split(p, writeVersion);
Object k1 = getBounds(p);
Object k2 = getBounds(split);
Object[] keys = { k1, k2 };
long[] children = { p.getPos(), split.getPos(), 0 };
Page[] childrenPages = { p, split, null };
long[] counts = { p.getTotalCount(), split.getTotalCount(), 0 };
p = Page.create(this, writeVersion, 2,
keys, null, children, childrenPages, counts,
totalCount, 0, 0);
// now p is a node; continues
beforeWrite();
try {
long writeVersion = store.getCurrentVersion();
Page p = copyOnWrite(root, writeVersion);
Object result;
if (alwaysAdd || get(key) == null) {
if (p.getMemory() > store.getPageSize() && p.getKeyCount() > 1) {
// only possible if this is the root, else we would have split earlier
// (this requires maxPageSize is fixed)
long totalCount = p.getTotalCount();
Page split = split(p, writeVersion);
Object k1 = getBounds(p);
Object k2 = getBounds(split);
Object[] keys = { k1, k2 };
long[] children = { p.getPos(), split.getPos(), 0 };
Page[] childrenPages = { p, split, null };
long[] counts = { p.getTotalCount(), split.getTotalCount(), 0 };
p = Page.create(this, writeVersion, 2,
keys, null, children, childrenPages, counts,
totalCount, 0, 0);
// now p is a node; continues
}
add(p, writeVersion, key, value);
result = null;
} else {
result = set(p, writeVersion, key, value);
}
add(p, writeVersion, key, value);
result = null;
} else {
result = set(p, writeVersion, key, value);
newRoot(p);
return result;
} finally {
afterWrite();
}
newRoot(p);
return result;
}
/**
......
......@@ -206,7 +206,7 @@ public class TestMVStore extends TestBase {
s.store();
s.close();
int[] expectedReadsForCacheSize = {
3412, 2590, 1924, 1440, 1102, 956, 918
3407, 2590, 1924, 1440, 1106, 956, 918
};
for (int cacheSize = 0; cacheSize <= 6; cacheSize += 4) {
s = new MVStore.Builder().
......@@ -723,17 +723,22 @@ public class TestMVStore extends TestBase {
m.put("1", "Hello");
assertEquals(1, s.incrementVersion());
s.rollbackTo(1);
assertEquals(1, s.getCurrentVersion());
assertEquals("Hello", m.get("1"));
long v2 = s.store();
assertEquals(2, v2);
assertEquals(2, s.getCurrentVersion());
assertFalse(s.hasUnsavedChanges());
assertEquals("Hello", m.get("1"));
s.close();
s = openStore(fileName);
assertEquals(2, s.getCurrentVersion());
meta = s.getMetaMap();
m = s.openMap("data");
assertFalse(s.hasUnsavedChanges());
assertEquals("Hello", m.get("1"));
m0 = s.openMap("data0");
MVMap<String, String> m1 = s.openMap("data1");
m.put("1", "Hallo");
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论