提交 a016031b authored 作者: andrei's avatar andrei

Fix for TestConcurrent

上级 00eff4a4
...@@ -54,11 +54,6 @@ public class MVMap<K, V> extends AbstractMap<K, V> ...@@ -54,11 +54,6 @@ public class MVMap<K, V> extends AbstractMap<K, V>
private boolean readOnly; private boolean readOnly;
private boolean isVolatile; private boolean isVolatile;
/**
* This is a magic number for a version parameter in setNewRoot() call
* which meens "keep version the same it is now".
*/
private static final long KEEP_CURRENT = -2;
/** /**
* This designates the "last stored" version for a store which was * This designates the "last stored" version for a store which was
* just open for the first time. * just open for the first time.
...@@ -124,7 +119,7 @@ public class MVMap<K, V> extends AbstractMap<K, V> ...@@ -124,7 +119,7 @@ public class MVMap<K, V> extends AbstractMap<K, V>
} }
/** /**
* Open this map. * Initialize this map.
*/ */
protected void init() {} protected void init() {}
...@@ -608,11 +603,9 @@ public class MVMap<K, V> extends AbstractMap<K, V> ...@@ -608,11 +603,9 @@ public class MVMap<K, V> extends AbstractMap<K, V>
* Re-write any pages that belong to one of the chunks in the given set. * Re-write any pages that belong to one of the chunks in the given set.
* *
* @param set the set of chunk ids * @param set the set of chunk ids
* @return whether rewriting was successful
*/ */
final boolean rewrite(Set<Integer> set) { final void rewrite(Set<Integer> set) {
rewrite(getRootPage(), set); rewrite(getRootPage(), set);
return true;
} }
private int rewrite(Page p, Set<Integer> set) { private int rewrite(Page p, Set<Integer> set) {
...@@ -622,7 +615,7 @@ public class MVMap<K, V> extends AbstractMap<K, V> ...@@ -622,7 +615,7 @@ public class MVMap<K, V> extends AbstractMap<K, V>
if (!set.contains(chunkId)) { if (!set.contains(chunkId)) {
return 0; return 0;
} }
if (p.getKeyCount() > 0) { assert p.getKeyCount() > 0;
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
K key = (K) p.getKey(0); K key = (K) p.getKey(0);
V value = get(key); V value = get(key);
...@@ -632,7 +625,6 @@ public class MVMap<K, V> extends AbstractMap<K, V> ...@@ -632,7 +625,6 @@ public class MVMap<K, V> extends AbstractMap<K, V>
} }
replace(key, value, value); replace(key, value, value);
} }
}
return 1; return 1;
} }
int writtenPageCount = 0; int writtenPageCount = 0;
...@@ -793,7 +785,7 @@ public class MVMap<K, V> extends AbstractMap<K, V> ...@@ -793,7 +785,7 @@ public class MVMap<K, V> extends AbstractMap<K, V>
final void setRoot(Page rootPage) { final void setRoot(Page rootPage) {
int attempt = 0; int attempt = 0;
while (setNewRoot(null, rootPage, KEEP_CURRENT, ++attempt, false) == null) {/**/} while (setNewRoot(null, rootPage, ++attempt, false) == null) {/**/}
} }
final void setInitialRoot(Page rootPage, long version) { final void setInitialRoot(Page rootPage, long version) {
...@@ -805,14 +797,13 @@ public class MVMap<K, V> extends AbstractMap<K, V> ...@@ -805,14 +797,13 @@ public class MVMap<K, V> extends AbstractMap<K, V>
* *
* @param oldRoot previous root reference * @param oldRoot previous root reference
* @param newRootPage the new root page * @param newRootPage the new root page
* @param newVersion corresponding new version
* @param attemptUpdateCounter how many attempt (including current) * @param attemptUpdateCounter how many attempt (including current)
* were made to update root * were made to update root
* @param obeyLock false means override root iven if it marked as locked (used to unlock) * @param obeyLock false means override root iven if it marked as locked (used to unlock)
* true will fail to update, if rott is currently locked * true will fail to update, if rott is currently locked
* @return new RootReference or null if update failed * @return new RootReference or null if update failed
*/ */
private RootReference setNewRoot(RootReference oldRoot, Page newRootPage, long newVersion, private RootReference setNewRoot(RootReference oldRoot, Page newRootPage,
int attemptUpdateCounter, boolean obeyLock) { int attemptUpdateCounter, boolean obeyLock) {
RootReference currentRoot = getRoot(); RootReference currentRoot = getRoot();
assert newRootPage != null || currentRoot != null; assert newRootPage != null || currentRoot != null;
...@@ -822,6 +813,7 @@ public class MVMap<K, V> extends AbstractMap<K, V> ...@@ -822,6 +813,7 @@ public class MVMap<K, V> extends AbstractMap<K, V>
RootReference previous = currentRoot; RootReference previous = currentRoot;
long updateCounter = 1; long updateCounter = 1;
long newVersion = INITIAL_VERSION;
if(currentRoot != null) { if(currentRoot != null) {
if (obeyLock && currentRoot.lockedForUpdate) { if (obeyLock && currentRoot.lockedForUpdate) {
return null; return null;
...@@ -831,17 +823,8 @@ public class MVMap<K, V> extends AbstractMap<K, V> ...@@ -831,17 +823,8 @@ public class MVMap<K, V> extends AbstractMap<K, V>
newRootPage = currentRoot.root; newRootPage = currentRoot.root;
} }
if (newVersion == KEEP_CURRENT) {
newVersion = currentRoot.version; newVersion = currentRoot.version;
previous = currentRoot.previous; previous = currentRoot.previous;
} else {
RootReference tmp = previous;
while ((tmp = tmp.previous) != null && tmp.root == newRootPage) {
previous = tmp;
}
}
updateCounter += currentRoot.updateCounter; updateCounter += currentRoot.updateCounter;
attemptUpdateCounter += currentRoot.updateAttemptCounter; attemptUpdateCounter += currentRoot.updateAttemptCounter;
} }
...@@ -883,7 +866,7 @@ public class MVMap<K, V> extends AbstractMap<K, V> ...@@ -883,7 +866,7 @@ public class MVMap<K, V> extends AbstractMap<K, V>
* @param newRoot the new root page * @param newRoot the new root page
*/ */
protected final boolean updateRoot(RootReference oldRoot, Page newRoot, int attemptUpdateCounter) { protected final boolean updateRoot(RootReference oldRoot, Page newRoot, int attemptUpdateCounter) {
return setNewRoot(oldRoot, newRoot, KEEP_CURRENT, attemptUpdateCounter, true) != null; return setNewRoot(oldRoot, newRoot, attemptUpdateCounter, true) != null;
} }
/** /**
...@@ -892,18 +875,15 @@ public class MVMap<K, V> extends AbstractMap<K, V> ...@@ -892,18 +875,15 @@ public class MVMap<K, V> extends AbstractMap<K, V>
*/ */
private void removeUnusedOldVersions(RootReference rootReference) { private void removeUnusedOldVersions(RootReference rootReference) {
long oldest = store.getOldestVersionToKeep(); long oldest = store.getOldestVersionToKeep();
// We are trying to keep at least one previous version (if any) here. // We need to keep at least one previous version (if any) here,
// This is not really necessary, just need to mimic existing // because in order to retain whole history of some version
// behaviour embeded in tests. // we really need last root of the previous version.
boolean head = true; // Root labeled with version "X" is the LAST known root for that version
RootReference previous; // and therefore the FIRST known root for the version "X+1"
while ((previous = rootReference.previous) != null) { for(RootReference rootRef = rootReference; rootRef != null; rootRef = rootRef.previous) {
if (previous.version < oldest && !head) { if (rootRef.version < oldest) {
rootReference.previous = null; rootRef.previous = null;
break;
} }
rootReference = previous;
head = false;
} }
} }
...@@ -1536,6 +1516,7 @@ public class MVMap<K, V> extends AbstractMap<K, V> ...@@ -1536,6 +1516,7 @@ public class MVMap<K, V> extends AbstractMap<K, V>
} }
oldRootReference = rootReference; oldRootReference = rootReference;
++attempt; ++attempt;
Thread.yield();
CursorPos pos = traverseDown(rootReference.root, key); CursorPos pos = traverseDown(rootReference.root, key);
Page p = pos.page; Page p = pos.page;
int index = pos.index; int index = pos.index;
...@@ -1682,7 +1663,7 @@ public class MVMap<K, V> extends AbstractMap<K, V> ...@@ -1682,7 +1663,7 @@ public class MVMap<K, V> extends AbstractMap<K, V>
} while(!success); } while(!success);
} }
public static CursorPos traverseDown(Page p, Object key) { private static CursorPos traverseDown(Page p, Object key) {
CursorPos pos = null; CursorPos pos = null;
while (!p.isLeaf()) { while (!p.isLeaf()) {
assert p.getKeyCount() > 0; assert p.getKeyCount() > 0;
......
...@@ -274,11 +274,6 @@ public final class MVStore { ...@@ -274,11 +274,6 @@ public final class MVStore {
private long lastCommitTime; private long lastCommitTime;
/**
* The earliest chunk to retain, if any.
*/
private Chunk retainChunk;
/** /**
* The version of the current store operation (if any). * The version of the current store operation (if any).
*/ */
...@@ -1102,7 +1097,6 @@ public final class MVStore { ...@@ -1102,7 +1097,6 @@ public final class MVStore {
long storeVersion = currentStoreVersion; long storeVersion = currentStoreVersion;
long version = ++currentVersion; long version = ++currentVersion;
lastCommitTime = time; lastCommitTime = time;
retainChunk = null;
// the metadata of the last chunk was not stored so far, and needs to be // the metadata of the last chunk was not stored so far, and needs to be
// set now (it's better not to update right after storing, because that // set now (it's better not to update right after storing, because that
...@@ -1337,13 +1331,14 @@ public final class MVStore { ...@@ -1337,13 +1331,14 @@ public final class MVStore {
inspectedRoots.add(pos); inspectedRoots.add(pos);
collector.visit(pos); collector.visit(pos);
long oldestVersionToKeep = getOldestVersionToKeep(); long oldestVersionToKeep = getOldestVersionToKeep();
for (MVMap.RootReference rootReference = meta.getRoot(); MVMap.RootReference rootReference = meta.getRoot();
rootReference != null && rootReference.version >= oldestVersionToKeep; do {
rootReference = rootReference.previous) {
Page rootPage = rootReference.root; Page rootPage = rootReference.root;
pos = rootPage.getPos(); pos = rootPage.getPos();
if(rootPage.isSaved() && inspectedRoots.add(pos)) { if (!rootPage.isSaved()) {
collector.setMapId(meta.getId());
collector.visit(rootPage);
} else if(inspectedRoots.add(pos)) {
collector.setMapId(meta.getId()); collector.setMapId(meta.getId());
collector.visit(pos); collector.visit(pos);
} }
...@@ -1362,12 +1357,13 @@ public final class MVStore { ...@@ -1362,12 +1357,13 @@ public final class MVStore {
collector.visit(pos); collector.visit(pos);
} }
} }
} } while(rootReference.version >= oldestVersionToKeep &&
(rootReference = rootReference.previous) != null);
return collector.getReferenced(); return collector.getReferenced();
} }
public final class ChunkIdsCollector { final class ChunkIdsCollector {
private final Set<Integer> referenced = new HashSet<>(); private final Set<Integer> referenced = new HashSet<>();
private final ChunkIdsCollector parent; private final ChunkIdsCollector parent;
...@@ -1399,7 +1395,32 @@ public final class MVStore { ...@@ -1399,7 +1395,32 @@ public final class MVStore {
return referenced; return referenced;
} }
public void visit(Page page) {
long pos = page.getPos();
if (DataUtils.isPageSaved(pos)) {
register(DataUtils.getPageChunkId(pos));
}
int count = page.map.getChildPageCount(page);
if (count > 0) {
ChunkIdsCollector childCollector = getChild();
for (int i = 0; i < count; i++) {
Page childPage = page.getChildPageIfLoaded(i);
if (childPage != null) {
childCollector.visit(childPage);
} else {
childCollector.visit(page.getChildPagePos(i));
}
}
// and cache resulting set of chunk ids
if (DataUtils.isPageSaved(pos) && cacheChunkRef != null) {
int[] chunkIds = childCollector.getChunkIds();
cacheChunkRef.put(pos, chunkIds, Constants.MEMORY_ARRAY + 4 * chunkIds.length);
}
}
}
public void visit(long pos) { public void visit(long pos) {
assert DataUtils.isPageSaved(pos);
register(DataUtils.getPageChunkId(pos)); register(DataUtils.getPageChunkId(pos));
if (DataUtils.getPageType(pos) != DataUtils.PAGE_TYPE_LEAF) { if (DataUtils.getPageType(pos) != DataUtils.PAGE_TYPE_LEAF) {
int chunkIds[]; int chunkIds[];
...@@ -1413,10 +1434,7 @@ public final class MVStore { ...@@ -1413,10 +1434,7 @@ public final class MVStore {
Page page; Page page;
if (cache != null && (page = cache.get(pos)) != null) { if (cache != null && (page = cache.get(pos)) != null) {
// there is a full page in cache, use it // there is a full page in cache, use it
int count = page.getRawChildPageCount(); childCollector.visit(page);
for (int i = 0; i < count; i++) {
childCollector.visit(page.getChildPagePos(i));
}
} else { } else {
// page was not cached: read the data // page was not cached: read the data
Chunk chunk = getChunk(pos); Chunk chunk = getChunk(pos);
...@@ -1502,8 +1520,7 @@ public final class MVStore { ...@@ -1502,8 +1520,7 @@ public final class MVStore {
return false; return false;
} }
} }
Chunk r = retainChunk; return true;
return r == null || c.version <= r.version;
} }
private long getTimeSinceCreation() { private long getTimeSinceCreation() {
...@@ -1975,13 +1992,11 @@ public final class MVStore { ...@@ -1975,13 +1992,11 @@ public final class MVStore {
for (MVMap<?, ?> m : maps.values()) { for (MVMap<?, ?> m : maps.values()) {
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
MVMap<Object, Object> map = (MVMap<Object, Object>) m; MVMap<Object, Object> map = (MVMap<Object, Object>) m;
if (!map.isClosed() && !map.rewrite(set)) { if (!map.isClosed()) {
return; map.rewrite(set);
}
} }
if (!meta.rewrite(set)) {
return;
} }
meta.rewrite(set);
freeUnusedChunks(); freeUnusedChunks();
commit(); commit();
} }
...@@ -2006,7 +2021,7 @@ public final class MVStore { ...@@ -2006,7 +2021,7 @@ public final class MVStore {
if (filePos < 0) { if (filePos < 0) {
throw DataUtils.newIllegalStateException( throw DataUtils.newIllegalStateException(
DataUtils.ERROR_FILE_CORRUPT, DataUtils.ERROR_FILE_CORRUPT,
"Negative position {0}", filePos); "Negative position {0}; p={1}, c={2}", filePos, pos, c.toString());
} }
long maxPos = (c.block + c.len) * BLOCK_SIZE; long maxPos = (c.block + c.len) * BLOCK_SIZE;
p = Page.read(fileStore, pos, map, filePos, maxPos); p = Page.read(fileStore, pos, map, filePos, maxPos);
...@@ -2168,7 +2183,7 @@ public final class MVStore { ...@@ -2168,7 +2183,7 @@ public final class MVStore {
public long getOldestVersionToKeep() { public long getOldestVersionToKeep() {
long v = oldestVersionToKeep.get(); long v = oldestVersionToKeep.get();
if (fileStore == null) { if (fileStore == null) {
v = Math.max(v - versionsToKeep, INITIAL_VERSION); v = Math.max(v - versionsToKeep + 1, INITIAL_VERSION);
return v; return v;
} }
......
...@@ -219,6 +219,7 @@ public abstract class Page implements Cloneable ...@@ -219,6 +219,7 @@ public abstract class Page implements Cloneable
* @param pos the position * @param pos the position
* @param filePos the position in the file * @param filePos the position in the file
* @param maxPos the maximum position (the end of the chunk) * @param maxPos the maximum position (the end of the chunk)
* @param collector to report child pages positions to
*/ */
static void readChildrensPositions(FileStore fileStore, long pos, static void readChildrensPositions(FileStore fileStore, long pos,
long filePos, long maxPos, long filePos, long maxPos,
...@@ -318,6 +319,16 @@ public abstract class Page implements Cloneable ...@@ -318,6 +319,16 @@ public abstract class Page implements Cloneable
*/ */
public abstract Page getChildPage(int index); public abstract Page getChildPage(int index);
/**
* Get the child page at the given index only if is
* already loaded. Does not make any attempt to load
* the page or retrieve it from the cache.
*
* @param index the index
* @return the child page, null if it is not loaded
*/
public abstract Page getChildPageIfLoaded(int index);
/** /**
* Get the position of the child. * Get the position of the child.
* *
...@@ -919,6 +930,11 @@ public abstract class Page implements Cloneable ...@@ -919,6 +930,11 @@ public abstract class Page implements Cloneable
return page; return page;
} }
@Override
public Page getChildPageIfLoaded(int index) {
return children[index].page;
}
@Override @Override
public long getChildPagePos(int index) { public long getChildPagePos(int index) {
return children[index].pos; return children[index].pos;
...@@ -1186,6 +1202,9 @@ public abstract class Page implements Cloneable ...@@ -1186,6 +1202,9 @@ public abstract class Page implements Cloneable
throw new UnsupportedOperationException(); throw new UnsupportedOperationException();
} }
@Override
public Page getChildPageIfLoaded(int index) { throw new UnsupportedOperationException(); }
@Override @Override
public long getChildPagePos(int index) { public long getChildPagePos(int index) {
throw new UnsupportedOperationException(); throw new UnsupportedOperationException();
......
...@@ -117,7 +117,12 @@ public class TestConcurrent extends TestMVStore { ...@@ -117,7 +117,12 @@ public class TestConcurrent extends TestMVStore {
int i = 0; int i = 0;
while (!stop) { while (!stop) {
s.compact(100, 1024 * 1024); s.compact(100, 1024 * 1024);
MVStore.TxCounter token = s.registerVersionUsage();
try {
dataMap.put(i % 1000, i * 10); dataMap.put(i % 1000, i * 10);
} finally {
s.deregisterVersionUsage(token);
}
s.commit(); s.commit();
i++; i++;
} }
...@@ -126,7 +131,12 @@ public class TestConcurrent extends TestMVStore { ...@@ -126,7 +131,12 @@ public class TestConcurrent extends TestMVStore {
task.execute(); task.execute();
for (int i = 0; i < 1000 && !task.isFinished(); i++) { for (int i = 0; i < 1000 && !task.isFinished(); i++) {
s.compact(100, 1024 * 1024); s.compact(100, 1024 * 1024);
MVStore.TxCounter token = s.registerVersionUsage();
try {
dataMap.put(i % 1000, i * 10); dataMap.put(i % 1000, i * 10);
} finally {
s.deregisterVersionUsage(token);
}
s.commit(); s.commit();
} }
task.get(); task.get();
...@@ -721,7 +731,10 @@ public class TestConcurrent extends TestMVStore { ...@@ -721,7 +731,10 @@ public class TestConcurrent extends TestMVStore {
m.get(rand.nextInt(size)); m.get(rand.nextInt(size));
} catch (ConcurrentModificationException e) { } catch (ConcurrentModificationException e) {
detected.incrementAndGet(); detected.incrementAndGet();
} catch (NegativeArraySizeException | ArrayIndexOutOfBoundsException | IllegalArgumentException | NullPointerException e) { } catch ( NegativeArraySizeException
| ArrayIndexOutOfBoundsException
| IllegalArgumentException
| NullPointerException e) {
notDetected.incrementAndGet(); notDetected.incrementAndGet();
} }
} }
...@@ -741,7 +754,10 @@ public class TestConcurrent extends TestMVStore { ...@@ -741,7 +754,10 @@ public class TestConcurrent extends TestMVStore {
m.get(rand.nextInt(size)); m.get(rand.nextInt(size));
} catch (ConcurrentModificationException e) { } catch (ConcurrentModificationException e) {
detected.incrementAndGet(); detected.incrementAndGet();
} catch (NegativeArraySizeException | ArrayIndexOutOfBoundsException | IllegalArgumentException | NullPointerException e) { } catch ( NegativeArraySizeException
| ArrayIndexOutOfBoundsException
| NullPointerException
| IllegalArgumentException e) {
notDetected.incrementAndGet(); notDetected.incrementAndGet();
} }
} }
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论