提交 2ea52348 authored 作者: Thomas Mueller's avatar Thomas Mueller

MVStore: use a separate write version for every map, so that the meta map…

MVStore: use a separate write version for every map, so that the meta map version matches the version of the other maps.
上级 bae98769
......@@ -38,6 +38,11 @@ public class MVMap<K, V> extends AbstractMap<K, V>
* The current root page (may not be null).
*/
protected volatile Page root;
/**
* The version used for writing.
*/
protected long writeVersion;
private int id;
private long createVersion;
......@@ -70,6 +75,7 @@ public class MVMap<K, V> extends AbstractMap<K, V>
this.id = Integer.parseInt(config.get("id"));
String x = config.get("createVersion");
this.createVersion = x == null ? 0 : Long.parseLong(x);
this.writeVersion = store.getCurrentVersion();
}
/**
......@@ -100,10 +106,10 @@ public class MVMap<K, V> extends AbstractMap<K, V>
DataUtils.checkArgument(value != null, "The value may not be null");
beforeWrite();
try {
long writeVersion = store.getCurrentVersion();
Page p = copyOnWrite(root, writeVersion);
p = splitRootIfNeeded(p, writeVersion);
Object result = put(p, writeVersion, key, value);
long v = writeVersion;
Page p = copyOnWrite(root, v);
p = splitRootIfNeeded(p, v);
Object result = put(p, v, key, value);
newRoot(p);
return (V) result;
} finally {
......@@ -496,7 +502,7 @@ public class MVMap<K, V> extends AbstractMap<K, V>
beforeWrite();
try {
root.removeAllRecursive();
newRoot(Page.createEmpty(this, store.getCurrentVersion()));
newRoot(Page.createEmpty(this, writeVersion));
} finally {
afterWrite();
}
......@@ -544,10 +550,10 @@ public class MVMap<K, V> extends AbstractMap<K, V>
public V remove(Object key) {
beforeWrite();
try {
long writeVersion = store.getCurrentVersion();
Page p = copyOnWrite(root, writeVersion);
long v = writeVersion;
Page p = copyOnWrite(root, v);
@SuppressWarnings("unchecked")
V result = (V) remove(p, writeVersion, key);
V result = (V) remove(p, v, key);
newRoot(p);
return result;
} finally {
......@@ -1014,7 +1020,7 @@ public class MVMap<K, V> extends AbstractMap<K, V>
// need to copy because it can change
Page r = root;
if (version >= r.getVersion() &&
(version == store.getCurrentVersion() ||
(version == writeVersion ||
r.getVersion() >= 0 ||
version <= createVersion ||
store.getFile() == null)) {
......@@ -1128,6 +1134,10 @@ public class MVMap<K, V> extends AbstractMap<K, V>
afterWrite();
}
}
void setWriteVersion(long writeVersion) {
this.writeVersion = writeVersion;
}
@Override
public String toString() {
......
......@@ -46,11 +46,11 @@ public class MVMapConcurrent<K, V> extends MVMap<K, V> {
// even if the result is the same, we still update the value
// (otherwise compact doesn't work)
get(key);
long writeVersion = store.getCurrentVersion();
long v = writeVersion;
synchronized (this) {
Page p = copyOnWrite(root, writeVersion);
p = splitRootIfNeeded(p, writeVersion);
V result = (V) put(p, writeVersion, key, value);
Page p = copyOnWrite(root, v);
p = splitRootIfNeeded(p, v);
V result = (V) put(p, v, key, value);
newRoot(p);
return result;
}
......@@ -73,10 +73,10 @@ public class MVMapConcurrent<K, V> extends MVMap<K, V> {
if (result == null) {
return null;
}
long writeVersion = store.getCurrentVersion();
long v = writeVersion;
synchronized (this) {
Page p = copyOnWrite(root, writeVersion);
result = (V) remove(p, writeVersion, key);
Page p = copyOnWrite(root, v);
result = (V) remove(p, v, key);
newRoot(p);
}
return result;
......
......@@ -49,7 +49,7 @@ TODO:
TestMVStoreDataLoss
TransactionStore:
- support reading the undo log
- write to the undo log _before_ a change (WAL style)
MVStore:
- rolling docs review: at "Features"
......@@ -109,6 +109,8 @@ MVStore:
- more consistent null handling (keys/values sometimes may be null)
- autocommit (to avoid having to call commit,
as it could be called too often or it is easily forgotten)
- remove features that are not really needed; simplify the code
- rename "store" to "save", as store collides with storeVersion
*/
......@@ -405,9 +407,9 @@ public class MVStore {
private Chunk getChunkForVersion(long version) {
for (int chunkId = lastChunkId;; chunkId--) {
Chunk x = chunks.get(chunkId);
if (x == null || x.version < version) {
if (x == null) {
return null;
} else if (x.version == version) {
} else if (x.version <= version) {
return x;
}
}
......@@ -619,7 +621,9 @@ public class MVStore {
private void readFileHeader() {
// we don't have a valid header yet
currentVersion = -1;
// read the last block of the file, and then two first blocks
// we don't know which chunk is the newest
long newestChunk = -1;
// read the last block of the file, and then the two first blocks
ByteBuffer buff = ByteBuffer.allocate(3 * BLOCK_SIZE);
buff.limit(BLOCK_SIZE);
fileReadCount++;
......@@ -653,25 +657,28 @@ public class MVStore {
if (check != checksum) {
continue;
}
long version = Long.parseLong(m.get("version"));
if (version > currentVersion) {
long chunk = Long.parseLong(m.get("chunk"));
if (chunk > newestChunk) {
newestChunk = chunk;
fileHeader = m;
rootChunkStart = Long.parseLong(m.get("rootChunk"));
creationTime = Long.parseLong(m.get("creationTime"));
currentVersion = version;
lastMapId = Integer.parseInt(m.get("lastMapId"));
currentVersion = Long.parseLong(m.get("version"));
}
}
if (currentVersion < 0) {
throw DataUtils.newIllegalStateException(
DataUtils.ERROR_FILE_CORRUPT, "File header is corrupt: {0}", fileName);
}
setWriteVersion(currentVersion);
lastStoredVersion = -1;
}
private byte[] getFileHeaderBytes() {
StringBuilder buff = new StringBuilder();
fileHeader.put("lastMapId", "" + lastMapId);
fileHeader.put("chunk", "" + lastChunkId);
fileHeader.put("rootChunk", "" + rootChunkStart);
fileHeader.put("version", "" + currentVersion);
DataUtils.appendMap(buff, fileHeader);
......@@ -709,8 +716,10 @@ public class MVStore {
return;
}
if (!readOnly) {
if (hasUnsavedChanges()) {
stopBackgroundThread();
if (hasUnsavedChanges() || lastCommittedVersion != currentVersion) {
rollbackTo(lastCommittedVersion);
metaChanged = true;
store(false);
}
}
......@@ -718,11 +727,17 @@ public class MVStore {
}
/**
* Close the file and the store, without writing anything.
* This will stop the background thread.
* Close the file and the store, without writing anything. This will stop
* the background thread. This method ignores all errors.
*/
public void closeImmediately() {
closeFile(false);
try {
closeFile(false);
} catch (Exception e) {
if (backgroundExceptionListener != null) {
backgroundExceptionListener.exceptionThrown(e);
}
}
}
private void closeFile(boolean shrinkIfPossible) {
......@@ -782,7 +797,16 @@ public class MVStore {
* @return the new version
*/
public long incrementVersion() {
return ++currentVersion;
long v = ++currentVersion;
setWriteVersion(v);
return v;
}
private void setWriteVersion(long version) {
for (MVMap<?, ?> map : maps.values()) {
map.setWriteVersion(version);
}
meta.setWriteVersion(version);
}
/**
......@@ -796,7 +820,7 @@ public class MVStore {
* @return the new version
*/
public long commit() {
long v = ++currentVersion;
long v = incrementVersion();
lastCommittedVersion = v;
if (writeDelay == 0) {
store(false);
......@@ -842,12 +866,12 @@ public class MVStore {
DataUtils.ERROR_WRITING_FAILED, "This store is read-only");
}
int currentUnsavedPageCount = unsavedPageCount;
long storeVersion = currentStoreVersion = currentVersion;
long version = incrementVersion();
currentStoreVersion = currentVersion;
if (file == null) {
return version;
return incrementVersion();
}
long storeVersion = currentStoreVersion;
long version = ++currentVersion;
long time = getTime();
lastStoreTime = time;
if (temp) {
......@@ -867,6 +891,7 @@ public class MVStore {
meta.remove("rollbackOnOpen");
retainChunk = null;
}
// the last chunk was not completely correct in the last store()
// this needs to be updated now (it's better not to update right after
// storing, because that would modify the meta map again)
......@@ -890,6 +915,7 @@ public class MVStore {
ArrayList<MVMap<?, ?>> changed = New.arrayList();
for (MVMap<?, ?> m : list) {
if (m != meta) {
m.setWriteVersion(version);
long v = m.getVersion();
if (v >= 0 && m.getVersion() >= lastStoredVersion) {
MVMap<?, ?> r = m.openVersion(storeVersion);
......@@ -930,8 +956,9 @@ public class MVStore {
}
meta.put("chunk." + c.id, c.asString());
// this will modify maxLengthLive, but
meta.setWriteVersion(version);
// this will (again) modify maxLengthLive, but
// the correct value is written in the chunk header
buff = meta.getRoot().writeUnsavedRecursive(c, buff);
......@@ -1001,8 +1028,10 @@ public class MVStore {
// some pages might have been changed in the meantime (in the newest version)
unsavedPageCount = Math.max(0, unsavedPageCount - currentUnsavedPageCount);
currentStoreVersion = -1;
metaChanged = false;
lastStoredVersion = storeVersion;
if (!temp) {
metaChanged = false;
lastStoredVersion = storeVersion;
}
return version;
}
......@@ -1138,7 +1167,7 @@ public class MVStore {
for (MVMap<?, ?> m : maps.values()) {
if (!m.isClosed()) {
long v = m.getVersion();
if (v >= 0 && v >= lastStoredVersion) {
if (v >= 0 && v > lastStoredVersion) {
return true;
}
}
......@@ -1564,6 +1593,8 @@ public class MVStore {
freedPages.clear();
}
currentVersion = version;
setWriteVersion(version);
lastCommittedVersion = version;
metaChanged = false;
return;
}
......@@ -1584,31 +1615,46 @@ public class MVStore {
meta.rollbackTo(version);
metaChanged = false;
boolean loadFromFile = false;
Chunk last = chunks.get(lastChunkId);
if (last != null) {
if (last.version >= version) {
revertTemp(version);
loadFromFile = true;
do {
last = chunks.remove(lastChunkId);
int len = MathUtils.roundUpInt(last.length, BLOCK_SIZE) + BLOCK_SIZE;
freeSpace.free(last.start, len);
lastChunkId--;
} while (last.version > version && chunks.size() > 0);
rootChunkStart = last.start;
writeFileHeader();
// need to write the header at the end of the file as well,
// so that the old end header is not used
byte[] bytes = getFileHeaderBytes();
ByteBuffer header = ByteBuffer.allocate(BLOCK_SIZE);
header.put(bytes);
header.rewind();
fileWriteCount++;
DataUtils.writeFully(file, fileSize, header);
fileSize += BLOCK_SIZE;
readFileHeader();
readMeta();
// get the largest chunk with a version
// higher or equal the requested version
int removeChunksNewerThan = -1;
for (int chunkId = lastChunkId;; chunkId--) {
Chunk x = chunks.get(chunkId);
if (x == null) {
break;
} else if (x.version >= version) {
removeChunksNewerThan = x.id;
}
}
if (removeChunksNewerThan >= 0 && lastChunkId > removeChunksNewerThan) {
revertTemp(version);
loadFromFile = true;
Chunk last = null;
while (true) {
last = chunks.get(lastChunkId);
if (last == null) {
break;
} else if (last.id <= removeChunksNewerThan) {
break;
}
chunks.remove(lastChunkId);
int len = MathUtils.roundUpInt(last.length, BLOCK_SIZE) + BLOCK_SIZE;
freeSpace.free(last.start, len);
lastChunkId--;
}
rootChunkStart = last.start;
writeFileHeader();
// need to write the header at the end of the file as well,
// so that the old end header is not used
byte[] bytes = getFileHeaderBytes();
ByteBuffer header = ByteBuffer.allocate(BLOCK_SIZE);
header.put(bytes);
header.rewind();
fileWriteCount++;
DataUtils.writeFully(file, fileSize, header);
fileSize += BLOCK_SIZE;
readFileHeader();
readMeta();
}
for (MVMap<?, ?> m : New.arrayList(maps.values())) {
int id = m.getId();
......@@ -1622,8 +1668,11 @@ public class MVStore {
m.setRootPos(root, -1);
}
}
}
this.currentVersion = version;
currentVersion = version;
setWriteVersion(version);
lastCommittedVersion = version;
}
private void revertTemp(long storeVersion) {
......@@ -1727,16 +1776,15 @@ public class MVStore {
checkOpen();
DataUtils.checkArgument(map != meta,
"Renaming the meta map is not allowed");
if (map.getName().equals(newName)) {
int id = map.getId();
String oldName = getMapName(id);
if (oldName.equals(newName)) {
return;
}
DataUtils.checkArgument(
!meta.containsKey("name." + newName),
"A map named {0} already exists", newName);
int id = map.getId();
String oldName = getMapName(id);
markMetaChanged();
meta.remove("map." + id);
meta.remove("name." + oldName);
meta.put("map." + id, map.asString(newName));
meta.put("name." + newName, Integer.toString(id));
......@@ -1762,13 +1810,16 @@ public class MVStore {
}
// could also store when there are many unsaved pages,
// but according to a test it doesn't really help
if (lastCommittedVersion >= currentVersion) {
if (lastStoredVersion >= lastCommittedVersion) {
return;
}
long time = getTime();
if (time <= lastStoreTime + writeDelay) {
return;
}
if (!hasUnsavedChanges()) {
return;
}
try {
store(true);
} catch (Exception e) {
......
......@@ -913,6 +913,11 @@ public class TransactionStore {
newValue.value = value;
if (current == null) {
// a new value
int todo;
// either write the log before the data (and handle this case in rollback)
// or ensure/document concurrent commits are not allowed
VersionedValue old = map.putIfAbsent(key, newValue);
if (old == null) {
transaction.log(opType, mapId, key, current);
......
......@@ -223,30 +223,30 @@ public class MVRTreeMap<V> extends MVMap<SpatialKey, V> {
private Object putOrAdd(SpatialKey key, V value, boolean alwaysAdd) {
beforeWrite();
try {
long writeVersion = store.getCurrentVersion();
Page p = copyOnWrite(root, writeVersion);
long v = writeVersion;
Page p = copyOnWrite(root, v);
Object result;
if (alwaysAdd || get(key) == null) {
if (p.getMemory() > store.getPageSize() && p.getKeyCount() > 1) {
// only possible if this is the root, else we would have split earlier
// (this requires maxPageSize is fixed)
long totalCount = p.getTotalCount();
Page split = split(p, writeVersion);
Page split = split(p, v);
Object k1 = getBounds(p);
Object k2 = getBounds(split);
Object[] keys = { k1, k2 };
long[] children = { p.getPos(), split.getPos(), 0 };
Page[] childrenPages = { p, split, null };
long[] counts = { p.getTotalCount(), split.getTotalCount(), 0 };
p = Page.create(this, writeVersion, 2,
p = Page.create(this, v, 2,
keys, null, children, childrenPages, counts,
totalCount, 0, 0);
// now p is a node; continues
}
add(p, writeVersion, key, value);
add(p, v, key, value);
result = null;
} else {
result = set(p, writeVersion, key, value);
result = set(p, v, key, value);
}
newRoot(p);
return result;
......
......@@ -123,7 +123,7 @@ public class TestMVStore extends TestBase {
assertTrue(e != null);
assertEquals(DataUtils.ERROR_WRITING_FAILED, DataUtils.getErrorCode(e.getMessage()));
s.close();
s.closeImmediately();
FileUtils.delete(fileName);
}
......@@ -237,7 +237,14 @@ public class TestMVStore extends TestBase {
m.put(2, "World");
Thread.sleep(5);
// must not store, as nothing has been committed yet
assertEquals(v, s.getCurrentVersion());
s.closeImmediately();
s = new MVStore.Builder().
writeDelay(1).
fileName(fileName).
open();
m = s.openMap("data");
assertEquals(null, m.get(2));
m.put(2, "World");
s.commit();
m.put(3, "!");
......@@ -251,20 +258,7 @@ public class TestMVStore extends TestBase {
Thread.sleep(1);
}
s.close();
s = new MVStore.Builder().
fileName(fileName).
open();
m = s.openMap("data");
assertEquals("Hello", m.get(1));
assertEquals("World", m.get(2));
assertFalse(m.containsKey(3));
String data = new String(new char[1000]).replace((char) 0, 'x');
for (int i = 0; i < 1000; i++) {
m.put(i, data);
}
s.close();
s = new MVStore.Builder().
fileName(fileName).
open();
......@@ -472,13 +466,21 @@ public class TestMVStore extends TestBase {
s.close();
}
private void testFileHeaderCorruption() throws IOException {
private void testFileHeaderCorruption() throws Exception {
String fileName = getBaseDir() + "/testFileHeader.h3";
MVStore s = openStore(fileName);
s.setRetentionTime(10);
MVMap<Integer, Integer> map = s.openMap("test");
for (int i = 0; i < 5; i++) {
s.setStoreVersion(i);
s.store();
}
// ensure the oldest chunks can be overwritten
Thread.sleep(11);
s.compact(50);
map.put(10, 100);
FilePath f = FilePath.get(s.getFileName());
s.store();
FilePath f = FilePath.get(s.getFileName());
s.close();
int blockSize = 4 * 1024;
// test corrupt file headers
......@@ -652,13 +654,13 @@ public class TestMVStore extends TestBase {
s.setStoreVersion(1);
s.close();
s = MVStore.open(fileName);
assertEquals(0, s.getCurrentVersion());
assertEquals(1, s.getCurrentVersion());
assertEquals(0, s.getStoreVersion());
s.setStoreVersion(1);
s.store();
s.close();
s = MVStore.open(fileName);
assertEquals(1, s.getCurrentVersion());
assertEquals(2, s.getCurrentVersion());
assertEquals(1, s.getStoreVersion());
s.close();
}
......@@ -698,6 +700,7 @@ public class TestMVStore extends TestBase {
map.put(new Object[1], new Object[]{1, "2"});
s.store();
s.close();
s = new MVStore.Builder().fileName(fileName).open();
map = s.openMap("test");
assertEquals("Hello", map.get(1).toString());
......@@ -938,6 +941,8 @@ public class TestMVStore extends TestBase {
s.rollbackTo(1);
assertEquals(1, s.getCurrentVersion());
assertEquals("Hello", m.get("1"));
// so a new version is created
m.put("1", "Hello");
long v2 = s.store();
assertEquals(2, v2);
......
......@@ -441,7 +441,7 @@ public class TestTransactionStore extends TestBase {
return;
}
statements.get(0).execute(
"drop table if exists test");
"drop table if exists test cascade");
statements.get(0).execute(
"create table test(id int primary key, name varchar(255))");
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论