提交 44047777 authored 作者: Andrei Tokar's avatar Andrei Tokar

few changes related to starvation

上级 85b71772
...@@ -23,6 +23,7 @@ import java.util.PriorityQueue; ...@@ -23,6 +23,7 @@ import java.util.PriorityQueue;
import java.util.Queue; import java.util.Queue;
import java.util.Set; import java.util.Set;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantLock; import java.util.concurrent.locks.ReentrantLock;
...@@ -33,6 +34,7 @@ import org.h2.engine.Constants; ...@@ -33,6 +34,7 @@ import org.h2.engine.Constants;
import org.h2.mvstore.cache.CacheLongKeyLIRS; import org.h2.mvstore.cache.CacheLongKeyLIRS;
import org.h2.util.MathUtils; import org.h2.util.MathUtils;
import static org.h2.mvstore.MVMap.INITIAL_VERSION; import static org.h2.mvstore.MVMap.INITIAL_VERSION;
import org.h2.util.Utils;
/* /*
...@@ -150,7 +152,7 @@ public class MVStore { ...@@ -150,7 +152,7 @@ public class MVStore {
* It serves as a replacement for synchronized(this), except it allows for * It serves as a replacement for synchronized(this), except it allows for
* non-blocking lock attempts. * non-blocking lock attempts.
*/ */
private final ReentrantLock storeLock = new ReentrantLock(); private final ReentrantLock storeLock = new ReentrantLock(true);
/** /**
* The background thread, if any. * The background thread, if any.
...@@ -214,7 +216,7 @@ public class MVStore { ...@@ -214,7 +216,7 @@ public class MVStore {
private WriteBuffer writeBuffer; private WriteBuffer writeBuffer;
private int lastMapId; private final AtomicInteger lastMapId = new AtomicInteger();
private int versionsToKeep = 5; private int versionsToKeep = 5;
...@@ -355,11 +357,10 @@ public class MVStore { ...@@ -355,11 +357,10 @@ public class MVStore {
meta.init(); meta.init();
if (this.fileStore != null) { if (this.fileStore != null) {
retentionTime = this.fileStore.getDefaultRetentionTime(); retentionTime = this.fileStore.getDefaultRetentionTime();
int kb = DataUtils.getConfigParam(config, "autoCommitBufferSize", 1024);
// 19 KB memory is about 1 KB storage // 19 KB memory is about 1 KB storage
// TODO: maybe keep 19 MB as an upper bound, int kb = Math.max(1, Math.min(19, Utils.scaleForAvailableMemory(64))) * 1024;
// TODO: but derive actual value from the amount of RAM available kb = DataUtils.getConfigParam(config, "autoCommitBufferSize", kb);
autoCommitMemory = kb * 1024 * 19; autoCommitMemory = kb * 1024;
autoCompactFillRate = DataUtils.getConfigParam(config, "autoCompactFillRate", 40); autoCompactFillRate = DataUtils.getConfigParam(config, "autoCompactFillRate", 40);
char[] encryptionKey = (char[]) config.get("encryptionKey"); char[] encryptionKey = (char[]) config.get("encryptionKey");
try { try {
...@@ -478,34 +479,29 @@ public class MVStore { ...@@ -478,34 +479,29 @@ public class MVStore {
* @return the map * @return the map
*/ */
public <M extends MVMap<K, V>, K, V> M openMap(String name, MVMap.MapBuilder<M, K, V> builder) { public <M extends MVMap<K, V>, K, V> M openMap(String name, MVMap.MapBuilder<M, K, V> builder) {
storeLock.lock(); int id = getMapId(name);
try { M map;
int id = getMapId(name); if (id >= 0) {
M map; map = openMap(id, builder);
if (id >= 0) { } else {
map = openMap(id, builder); HashMap<String, Object> c = new HashMap<>();
} else { id = lastMapId.incrementAndGet();
HashMap<String, Object> c = new HashMap<>(); c.put("id", id);
id = ++lastMapId; c.put("createVersion", currentVersion);
c.put("id", id); map = builder.create(this, c);
c.put("createVersion", currentVersion); map.init();
map = builder.create(this, c); String x = Integer.toHexString(id);
map.init(); meta.put(MVMap.getMapKey(id), map.asString(name));
String x = Integer.toHexString(id); meta.put("name." + name, x);
meta.put(MVMap.getMapKey(id), map.asString(name)); map.setRootPos(0, lastStoredVersion);
meta.put("name." + name, x); markMetaChanged();
map.setRootPos(0, lastStoredVersion); @SuppressWarnings("unchecked")
markMetaChanged(); M existingMap = (M) maps.putIfAbsent(id, map);
@SuppressWarnings("unchecked") if (existingMap != null) {
M existingMap = (M) maps.putIfAbsent(id, map); map = existingMap;
if (existingMap != null) {
map = existingMap;
}
} }
return map;
} finally {
storeLock.unlock();
} }
return map;
} }
public <M extends MVMap<K, V>, K, V> M openMap(int id, MVMap.MapBuilder<M, K, V> builder) { public <M extends MVMap<K, V>, K, V> M openMap(int id, MVMap.MapBuilder<M, K, V> builder) {
...@@ -777,12 +773,12 @@ public class MVStore { ...@@ -777,12 +773,12 @@ public class MVStore {
lastChunk = last; lastChunk = last;
if (last == null) { if (last == null) {
// no valid chunk // no valid chunk
lastMapId = 0; lastMapId.set(0);
currentVersion = 0; currentVersion = 0;
lastStoredVersion = INITIAL_VERSION; lastStoredVersion = INITIAL_VERSION;
meta.setRootPos(0, INITIAL_VERSION); meta.setRootPos(0, INITIAL_VERSION);
} else { } else {
lastMapId = last.mapId; lastMapId.set(last.mapId);
currentVersion = last.version; currentVersion = last.version;
chunks.put(last.id, last); chunks.put(last.id, last);
lastStoredVersion = currentVersion - 1; lastStoredVersion = currentVersion - 1;
...@@ -1176,7 +1172,7 @@ public class MVStore { ...@@ -1176,7 +1172,7 @@ public class MVStore {
c.len = Integer.MAX_VALUE; c.len = Integer.MAX_VALUE;
c.time = time; c.time = time;
c.version = version; c.version = version;
c.mapId = lastMapId; c.mapId = lastMapId.get();
c.next = Long.MAX_VALUE; c.next = Long.MAX_VALUE;
chunks.put(c.id, c); chunks.put(c.id, c);
// force a metadata update // force a metadata update
...@@ -1921,26 +1917,31 @@ public class MVStore { ...@@ -1921,26 +1917,31 @@ public class MVStore {
return false; return false;
} }
checkOpen(); checkOpen();
// We can't wait fo lock here, because if called from the background thread, // We can't wait forever for the lock here,
// because if called from the background thread,
// it might go into deadlock with concurrent database closure // it might go into deadlock with concurrent database closure
// and attempt to stop this thread. // and attempt to stop this thread.
if (storeLock.tryLock()) { try {
try { if (storeLock.tryLock(10, TimeUnit.MILLISECONDS)) {
if (!compactInProgress) { try {
compactInProgress = true; if (!compactInProgress) {
ArrayList<Chunk> old = findOldChunks(targetFillRate, write); compactInProgress = true;
if (old == null || old.isEmpty()) { ArrayList<Chunk> old = findOldChunks(targetFillRate, write);
return false; if (old == null || old.isEmpty()) {
return false;
}
compactRewrite(old);
return true;
} }
compactRewrite(old); } finally {
return true; compactInProgress = false;
storeLock.unlock();
} }
} finally {
compactInProgress = false;
storeLock.unlock();
} }
return false;
} catch (InterruptedException e) {
throw new RuntimeException(e);
} }
return false;
} }
/** /**
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论