提交 c3170f7d authored 作者: Thomas Mueller's avatar Thomas Mueller

LIRS cache: concurrent and concurrent with long key

上级 9dbf9138
......@@ -8,7 +8,7 @@ package org.h2.test.store;
import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import org.h2.dev.store.btree.CacheConcurrentLIRS;
import org.h2.dev.store.btree.CacheLongKeyLIRS;
import org.h2.test.TestBase;
import org.h2.util.Task;
......@@ -31,12 +31,25 @@ public class TestCacheConcurrentLIRS extends TestBase {
}
private static void testConcurrent() {
final CacheConcurrentLIRS<Integer, Integer> test = CacheConcurrentLIRS.newInstance(100, 1);
final CacheLongKeyLIRS<Integer> test = CacheLongKeyLIRS.newInstance(100, 1);
int threadCount = 8;
final CountDownLatch wait = new CountDownLatch(1);
final AtomicBoolean stopped = new AtomicBoolean();
Task[] tasks = new Task[threadCount];
final int[] getCounts = new int[threadCount];
final int offset = 1000000;
for (int i = 0; i < 100; i++) {
test.put(offset + i, i);
}
final int[] keys = new int[1000];
Random random = new Random(1);
for (int i = 0; i < keys.length; i++) {
int key;
do {
key = (int) Math.abs(random.nextGaussian() * 50);
} while (key > 100);
keys[i] = key;
}
for (int i = 0; i < threadCount; i++) {
final int x = i;
Task t = new Task() {
......@@ -46,13 +59,10 @@ public class TestCacheConcurrentLIRS extends TestBase {
wait.await();
int i = 0;
for (; !stopped.get(); i++) {
int key;
do {
key = (int) Math.abs(random.nextGaussian() * 50);
} while (key > 100);
test.get(key);
int key = keys[random.nextInt(keys.length)];
test.get(offset + key);
if ((i & 127) == 0) {
test.put(random.nextInt(100), random.nextInt());
test.put(offset + random.nextInt(100), random.nextInt());
}
}
getCounts[x] = i;
......
......@@ -82,6 +82,18 @@ public class TestCacheLIRS extends TestBase {
verifyMapSize(769, 2048);
CacheLIRS<Integer, Integer> test;
test = CacheLIRS.newInstance(3, 10);
test.put(0, 0, 9);
test.put(1, 10, 9);
test.put(2, 20, 9);
test.put(3, 30, 9);
test.put(4, 40, 9);
test = CacheLIRS.newInstance(1, 1);
test.put(1, 10);
test.put(0, 0);
test.get(0);
test = CacheLIRS.newInstance(1000, 1);
for (int j = 0; j < 2000; j++) {
test.put(j, j);
......
......@@ -13,6 +13,7 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentMap;
/**
* A scan resistant cache. It is meant to cache objects that are relatively
......@@ -32,16 +33,18 @@ import java.util.Set;
* prevent unbound memory usage. The maximum size of this queue is at most the
* size of the rest of the stack. About 6.25% of the mapped entries are cold.
* <p>
* Internally, the cache is split into 16 segments, and each segment is an
* individual LIRS cache. Accessed entries are only moved to the top of the
* stack if at least 20 other entries have been moved to the front. Write access
* and moving entries to the top of the stack is synchronized per segment.
* Internally, the cache is split into a number of segments, and each segment is
* an individual LIRS cache.
* <p>
* Accessed entries are only moved to the top of the stack if at least a number
* of other entries have been moved to the front. Write access and moving
* entries to the top of the stack is synchronized per segment.
*
* @author Thomas Mueller
* @param <K> the key type
* @param <V> the value type
*/
public class CacheConcurrentLIRS<K, V> extends AbstractMap<K, V> implements Map<K, V> {
public class CacheConcurrentLIRS<K, V> extends AbstractMap<K, V> implements ConcurrentMap<K, V> {
/**
* The maximum memory this cache should use.
......@@ -55,24 +58,29 @@ public class CacheConcurrentLIRS<K, V> extends AbstractMap<K, V> implements Map<
private Segment<K, V>[] segments;
private int segmentCount;
private int segmentShift;
private int segmentMask;
private final int stackMoveDistance;
private CacheConcurrentLIRS(long maxMemory, int averageMemory) {
this.maxMemory = maxMemory;
this.averageMemory = averageMemory;
private CacheConcurrentLIRS(long maxMemory, int averageMemory, int segmentCount, int stackMoveDistance) {
setMaxMemory(maxMemory);
setAverageMemory(averageMemory);
if (Integer.bitCount(segmentCount) != 1) {
throw new IllegalArgumentException("The segment count must be a power of 2, is " + segmentCount);
}
this.segmentCount = segmentCount;
this.stackMoveDistance = stackMoveDistance;
clear();
}
@SuppressWarnings("unchecked")
public void clear() {
// must be a power of 2
int count = 16;
segmentMask = count - 1;
segments = new Segment[count];
for (int i = 0; i < count; i++) {
segmentMask = segmentCount - 1;
segments = new Segment[segmentCount];
for (int i = 0; i < segmentCount; i++) {
segments[i] = new Segment<K, V>(
1 + maxMemory / count, averageMemory);
1 + maxMemory / segmentCount, averageMemory, stackMoveDistance);
}
segmentShift = Integer.numberOfTrailingZeros(segments[0].sizeMapArray());
}
......@@ -132,6 +140,53 @@ public class CacheConcurrentLIRS<K, V> extends AbstractMap<K, V> implements Map<
return put(key, value, averageMemory);
}
public V putIfAbsent(K key, V value) {
int todo;
if (containsKey(key)) {
return get(key);
}
return put(key, value);
}
public boolean remove(Object key, Object value) {
int todo;
Entry<K, V> e = find(key);
if (e != null) {
V x = e.value;
if (x != null && x.equals(value)) {
remove(key);
return true;
}
}
return false;
}
public boolean replace(K key, V oldValue, V newValue) {
int todo;
Entry<K, V> e = find(key);
if (e != null) {
V x = e.value;
if (x != null && x.equals(oldValue)) {
put(key, newValue);
return true;
}
}
return false;
}
public V replace(K key, V value) {
int todo;
Entry<K, V> e = find(key);
if (e != null) {
V x = e.value;
if (x != null) {
put(key, value);
return x;
}
}
return null;
}
/**
* Remove an entry. Both resident and non-resident entries can be removed.
*
......@@ -194,6 +249,24 @@ public class CacheConcurrentLIRS<K, V> extends AbstractMap<K, V> implements Map<
return x;
}
/**
* Set the average memory used per entry. It is used to calculate the length
* of the internal array.
*
* @param averageMemory the average memory used (1 or larger)
*/
public void setAverageMemory(int averageMemory) {
if (averageMemory <= 0) {
throw new IllegalArgumentException("Average memory must be larger than 0");
}
this.averageMemory = averageMemory;
if (segments != null) {
for (Segment<K, V> s : segments) {
s.setAverageMemory(averageMemory);
}
}
}
/**
* Set the maximum memory this cache should use. This will not immediately
* cause entries to get removed however; it will only change the limit. To
......@@ -206,10 +279,12 @@ public class CacheConcurrentLIRS<K, V> extends AbstractMap<K, V> implements Map<
throw new IllegalArgumentException("Max memory must be larger than 0");
}
this.maxMemory = maxMemory;
if (segments != null) {
for (Segment<K, V> s : segments) {
s.setMaxMemory(1 + maxMemory / segments.length);
}
}
}
/**
* Get the average memory used per entry.
......@@ -236,10 +311,14 @@ public class CacheConcurrentLIRS<K, V> extends AbstractMap<K, V> implements Map<
*
* @param maxMemory the maximum memory to use (1 or larger)
* @param averageMemory the average memory (1 or larger)
* @param segmentCount the number of cache segments (must be a power of 2)
* @param stackMoveDistance how many other item are to be moved to the top
* of the stack before the current item is moved
* @return the cache
*/
public static <K, V> CacheConcurrentLIRS<K, V> newInstance(int maxMemory, int averageMemory) {
return new CacheConcurrentLIRS<K, V>(maxMemory, averageMemory);
public static <K, V> CacheConcurrentLIRS<K, V> newInstance(int maxMemory,
int averageMemory, int segmentCount, int stackMoveDistance) {
return new CacheConcurrentLIRS<K, V>(maxMemory, averageMemory, segmentCount, stackMoveDistance);
}
/**
......@@ -321,7 +400,7 @@ public class CacheConcurrentLIRS<K, V> extends AbstractMap<K, V> implements Map<
}
/**
* Get the list of keys. This method allows a view of the internal state of
* Get the list of keys. This method allows to read the internal state of
* the cache.
*
* @param cold if true, only keys for the cold entries are returned
......@@ -344,12 +423,11 @@ public class CacheConcurrentLIRS<K, V> extends AbstractMap<K, V> implements Map<
*/
static class Segment<K, V> {
/**
* How many other item are to be moved to the top of the stack before
* the current item is moved.
*/
private int stackMoveDistance = 20;
private final int stackMoveDistance;
/**
* The maximum memory this cache should use.
......@@ -423,10 +501,13 @@ public class CacheConcurrentLIRS<K, V> extends AbstractMap<K, V> implements Map<
*
* @param maxMemory the maximum memory to use
* @param averageMemory the average memory usage of an object
* @param stackMoveDistance the number of other entries to be moved to
* the top of the stack before moving an entry to the top
*/
Segment(long maxMemory, int averageMemory) {
Segment(long maxMemory, int averageMemory, int stackMoveDistance) {
setMaxMemory(maxMemory);
setAverageMemory(averageMemory);
this.stackMoveDistance = stackMoveDistance;
clear();
}
......@@ -621,7 +702,8 @@ public class CacheConcurrentLIRS<K, V> extends AbstractMap<K, V> implements Map<
/**
* Evict cold entries (resident and non-resident) until the memory limit is
* reached.
* reached. The new entry is added as a cold entry, except if it is the only
* entry.
*
* @param newCold a new cold entry
*/
......@@ -632,10 +714,13 @@ public class CacheConcurrentLIRS<K, V> extends AbstractMap<K, V> implements Map<
while ((queueSize << 5) < mapSize) {
convertOldestHotToCold();
}
if (stackSize > 0) {
// the new cold entry is at the top of the queue
addToQueue(queue, newCold);
}
// the oldest resident cold entries become non-resident
while (usedMemory > maxMemory) {
// but at least one cold entry (the new one) must stay
while (usedMemory > maxMemory && queueSize > 1) {
Entry<K, V> e = queue.queuePrev;
usedMemory -= e.memory;
removeFromQueue(e);
......
......@@ -18,7 +18,7 @@ import java.util.Set;
* A scan resistant cache. It is meant to cache objects that are relatively
* costly to acquire, for example file content.
* <p>
* This implementation is not multi-threading save. Null keys or null values are
* This implementation is not multi-threading safe. Null keys or null values are
* not allowed. The map fill factor is at most 75%.
* <p>
* Each entry is assigned a distinct memory size, and the cache will try to use
......@@ -201,7 +201,7 @@ public class CacheLIRS<K, V> extends AbstractMap<K, V> implements Map<K, V> {
return null;
} else if (e.isHot()) {
if (e != stack.stackNext) {
// move a hot entries to the top of the stack
// move a hot entry to the top of the stack
// unless it is already there
boolean wasEnd = e == stack.stackPrev;
removeFromStack(e);
......@@ -343,21 +343,25 @@ public class CacheLIRS<K, V> extends AbstractMap<K, V> implements Map<K, V> {
/**
* Evict cold entries (resident and non-resident) until the memory limit is
* reached.
* reached. The new entry is added as a cold entry, except if it is the only
* entry.
*
* @param newCold a new cold entry
* @param newEntry a new entry
*/
private void evict(Entry<K, V> newCold) {
private void evict(Entry<K, V> newEntry) {
// ensure there are not too many hot entries:
// left shift of 5 is multiplication by 32, that means if there are less
// than 1/32 (3.125%) cold entries, a new hot entry needs to become cold
while ((queueSize << 5) < mapSize) {
convertOldestHotToCold();
}
if (stackSize > 0) {
// the new cold entry is at the top of the queue
addToQueue(queue, newCold);
addToQueue(queue, newEntry);
}
// the oldest resident cold entries become non-resident
while (usedMemory > maxMemory) {
// but at least one cold entry (the new one) must stay
while (usedMemory > maxMemory && queueSize > 1) {
Entry<K, V> e = queue.queuePrev;
usedMemory -= e.memory;
removeFromQueue(e);
......@@ -459,7 +463,7 @@ public class CacheLIRS<K, V> extends AbstractMap<K, V> implements Map<K, V> {
}
/**
* Get the list of keys. This method allows to view the internal state of
* Get the list of keys. This method allows to read the internal state of
* the cache.
*
* @param cold if true, only keys for the cold entries are returned
......@@ -492,7 +496,7 @@ public class CacheLIRS<K, V> extends AbstractMap<K, V> implements Map<K, V> {
/**
* Check whether there is a resident entry for the given key. This method
* does not adjusts the internal state of the cache.
* does not adjust the internal state of the cache.
*
* @param key the key (may not be null)
* @return true if there is a resident entry
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论