提交 88c6bd3a authored 作者: Thomas Mueller Graf's avatar Thomas Mueller Graf

File systems cache, memLZF, nioMemLZF did not support concurrent writes

上级 159fe23a
...@@ -74,6 +74,9 @@ changed from Types.OTHER (1111) to Types.TIMESTAMP_WITH_TIMEZONE (2014) ...@@ -74,6 +74,9 @@ changed from Types.OTHER (1111) to Types.TIMESTAMP_WITH_TIMEZONE (2014)
</li> </li>
<li>ResultSet.getObject(..., Class) threw a ClassNotFoundException if the JTS suite was not in the classpath. <li>ResultSet.getObject(..., Class) threw a ClassNotFoundException if the JTS suite was not in the classpath.
</li> </li>
<li>File systems: the "cache:" file system, and the
compressed in-memory file systems memLZF and nioMemLZF did not
correctly support concurrent reading and writing.
</ul> </ul>
<h2>Version 1.4.193 Beta (2016-10-31)</h2> <h2>Version 1.4.193 Beta (2016-10-31)</h2>
......
...@@ -87,7 +87,7 @@ public class FilePathCache extends FilePathWrapper { ...@@ -87,7 +87,7 @@ public class FilePathCache extends FilePathWrapper {
} }
@Override @Override
public int read(ByteBuffer dst, long position) throws IOException { public synchronized int read(ByteBuffer dst, long position) throws IOException {
long cachePos = getCachePos(position); long cachePos = getCachePos(position);
int off = (int) (position - cachePos); int off = (int) (position - cachePos);
int len = CACHE_BLOCK_SIZE - off; int len = CACHE_BLOCK_SIZE - off;
...@@ -130,20 +130,20 @@ public class FilePathCache extends FilePathWrapper { ...@@ -130,20 +130,20 @@ public class FilePathCache extends FilePathWrapper {
} }
@Override @Override
public FileChannel truncate(long newSize) throws IOException { public synchronized FileChannel truncate(long newSize) throws IOException {
cache.clear(); cache.clear();
base.truncate(newSize); base.truncate(newSize);
return this; return this;
} }
@Override @Override
public int write(ByteBuffer src, long position) throws IOException { public synchronized int write(ByteBuffer src, long position) throws IOException {
clearCache(src, position); clearCache(src, position);
return base.write(src, position); return base.write(src, position);
} }
@Override @Override
public int write(ByteBuffer src) throws IOException { public synchronized int write(ByteBuffer src) throws IOException {
clearCache(src, position()); clearCache(src, position());
return base.write(src); return base.write(src);
} }
......
...@@ -13,10 +13,13 @@ import java.nio.channels.FileChannel; ...@@ -13,10 +13,13 @@ import java.nio.channels.FileChannel;
import java.nio.channels.FileLock; import java.nio.channels.FileLock;
import java.nio.channels.NonWritableChannelException; import java.nio.channels.NonWritableChannelException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap; import java.util.LinkedHashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.TreeMap; import java.util.TreeMap;
import java.util.concurrent.atomic.AtomicReference;
import org.h2.api.ErrorCode; import org.h2.api.ErrorCode;
import org.h2.compress.CompressLZF; import org.h2.compress.CompressLZF;
import org.h2.message.DbException; import org.h2.message.DbException;
...@@ -433,7 +436,7 @@ class FileMemData { ...@@ -433,7 +436,7 @@ class FileMemData {
private final int id; private final int id;
private final boolean compress; private final boolean compress;
private long length; private long length;
private byte[][] data; private AtomicReference<byte[]>[] buffers;
private long lastModified; private long lastModified;
private boolean isReadOnly; private boolean isReadOnly;
private boolean isLockedExclusive; private boolean isLockedExclusive;
...@@ -446,14 +449,35 @@ class FileMemData { ...@@ -446,14 +449,35 @@ class FileMemData {
System.arraycopy(BUFFER, 0, COMPRESSED_EMPTY_BLOCK, 0, len); System.arraycopy(BUFFER, 0, COMPRESSED_EMPTY_BLOCK, 0, len);
} }
@SuppressWarnings("unchecked")
FileMemData(String name, boolean compress) { FileMemData(String name, boolean compress) {
this.name = name; this.name = name;
this.id = name.hashCode(); this.id = name.hashCode();
this.compress = compress; this.compress = compress;
data = new byte[0][]; this.buffers = new AtomicReference[0];
lastModified = System.currentTimeMillis(); lastModified = System.currentTimeMillis();
} }
byte[] getPage(int page) {
AtomicReference<byte[]>[] b = buffers;
if (page >= b.length) {
return null;
}
return b[page].get();
}
void setPage(int page, byte[] oldData, byte[] newData, boolean force) {
AtomicReference<byte[]>[] b = buffers;
if (page >= b.length) {
return;
}
if (force) {
b[page].set(newData);
} else {
b[page].compareAndSet(oldData, newData);
}
}
int getId() { int getId() {
return id; return id;
} }
...@@ -508,13 +532,18 @@ class FileMemData { ...@@ -508,13 +532,18 @@ class FileMemData {
this.size = size; this.size = size;
} }
@Override
public synchronized V put(K key, V value) {
return super.put(key, value);
}
@Override @Override
protected boolean removeEldestEntry(Map.Entry<K, V> eldest) { protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
if (size() < size) { if (size() < size) {
return false; return false;
} }
CompressItem c = (CompressItem) eldest.getKey(); CompressItem c = (CompressItem) eldest.getKey();
c.file.compress(c.page, c.data); c.file.compress(c.page);
return true; return true;
} }
} }
...@@ -529,11 +558,6 @@ class FileMemData { ...@@ -529,11 +558,6 @@ class FileMemData {
*/ */
FileMemData file; FileMemData file;
/**
* The data array.
*/
byte[] data;
/** /**
* The page to compress. * The page to compress.
*/ */
...@@ -559,16 +583,15 @@ class FileMemData { ...@@ -559,16 +583,15 @@ class FileMemData {
CompressItem c = new CompressItem(); CompressItem c = new CompressItem();
c.file = this; c.file = this;
c.page = page; c.page = page;
c.data = data[page];
synchronized (LZF) { synchronized (LZF) {
COMPRESS_LATER.put(c, c); COMPRESS_LATER.put(c, c);
} }
} }
private void expand(int page) { private byte[] expand(int page) {
byte[] d = data[page]; byte[] d = getPage(page);
if (d.length == BLOCK_SIZE) { if (d.length == BLOCK_SIZE) {
return; return d;
} }
byte[] out = new byte[BLOCK_SIZE]; byte[] out = new byte[BLOCK_SIZE];
if (d != COMPRESSED_EMPTY_BLOCK) { if (d != COMPRESSED_EMPTY_BLOCK) {
...@@ -576,7 +599,8 @@ class FileMemData { ...@@ -576,7 +599,8 @@ class FileMemData {
LZF.expand(d, 0, d.length, out, 0, BLOCK_SIZE); LZF.expand(d, 0, d.length, out, 0, BLOCK_SIZE);
} }
} }
data[page] = out; setPage(page, d, out, false);
return out;
} }
/** /**
...@@ -585,26 +609,19 @@ class FileMemData { ...@@ -585,26 +609,19 @@ class FileMemData {
* @param page which page to compress * @param page which page to compress
* @param old the page array * @param old the page array
*/ */
void compress(int page, byte[] old) { void compress(int page) {
byte[][] array = data; byte[] old = getPage(page);
if (page >= array.length) { if (old == null || old.length != BLOCK_SIZE) {
return; // not yet initialized or already compressed
}
byte[] d = array[page];
if (d != old) {
return; return;
} }
synchronized (LZF) { synchronized (LZF) {
int len = LZF.compress(d, BLOCK_SIZE, BUFFER, 0); int len = LZF.compress(old, BLOCK_SIZE, BUFFER, 0);
if (len <= BLOCK_SIZE) { if (len <= BLOCK_SIZE) {
d = new byte[len]; byte[] d = new byte[len];
System.arraycopy(BUFFER, 0, d, 0, len); System.arraycopy(BUFFER, 0, d, 0, len);
// maybe data was changed in the meantime // maybe data was changed in the meantime
byte[] o = array[page]; setPage(page, old, d, false);
if (o != old) {
return;
}
array[page] = d;
} }
} }
} }
...@@ -640,11 +657,12 @@ class FileMemData { ...@@ -640,11 +657,12 @@ class FileMemData {
long end = MathUtils.roundUpLong(newLength, BLOCK_SIZE); long end = MathUtils.roundUpLong(newLength, BLOCK_SIZE);
if (end != newLength) { if (end != newLength) {
int lastPage = (int) (newLength >>> BLOCK_SIZE_SHIFT); int lastPage = (int) (newLength >>> BLOCK_SIZE_SHIFT);
expand(lastPage); byte[] d = expand(lastPage);
byte[] d = data[lastPage]; byte[] d2 = Arrays.copyOf(d, d.length);
for (int i = (int) (newLength & BLOCK_SIZE_MASK); i < BLOCK_SIZE; i++) { for (int i = (int) (newLength & BLOCK_SIZE_MASK); i < BLOCK_SIZE; i++) {
d[i] = 0; d2[i] = 0;
} }
setPage(lastPage, d, d2, true);
if (compress) { if (compress) {
compressLater(lastPage); compressLater(lastPage);
} }
...@@ -655,13 +673,12 @@ class FileMemData { ...@@ -655,13 +673,12 @@ class FileMemData {
length = len; length = len;
len = MathUtils.roundUpLong(len, BLOCK_SIZE); len = MathUtils.roundUpLong(len, BLOCK_SIZE);
int blocks = (int) (len >>> BLOCK_SIZE_SHIFT); int blocks = (int) (len >>> BLOCK_SIZE_SHIFT);
if (blocks != data.length) { if (blocks != buffers.length) {
byte[][] n = new byte[blocks][]; AtomicReference<byte[]>[] n = Arrays.copyOf(buffers, blocks);
System.arraycopy(data, 0, n, 0, Math.min(data.length, n.length)); for (int i = buffers.length; i < blocks; i++) {
for (int i = data.length; i < blocks; i++) { n[i] = new AtomicReference<byte[]>(COMPRESSED_EMPTY_BLOCK);
n[i] = COMPRESSED_EMPTY_BLOCK;
} }
data = n; buffers = n;
} }
} }
...@@ -687,11 +704,12 @@ class FileMemData { ...@@ -687,11 +704,12 @@ class FileMemData {
while (len > 0) { while (len > 0) {
int l = (int) Math.min(len, BLOCK_SIZE - (pos & BLOCK_SIZE_MASK)); int l = (int) Math.min(len, BLOCK_SIZE - (pos & BLOCK_SIZE_MASK));
int page = (int) (pos >>> BLOCK_SIZE_SHIFT); int page = (int) (pos >>> BLOCK_SIZE_SHIFT);
expand(page); byte[] block = expand(page);
byte[] block = data[page];
int blockOffset = (int) (pos & BLOCK_SIZE_MASK); int blockOffset = (int) (pos & BLOCK_SIZE_MASK);
if (write) { if (write) {
System.arraycopy(b, off, block, blockOffset, l); byte[] p2 = Arrays.copyOf(block, block.length);
System.arraycopy(b, off, p2, blockOffset, l);
setPage(page, block, p2, true);
} else { } else {
System.arraycopy(block, blockOffset, b, off, l); System.arraycopy(block, blockOffset, b, off, l);
} }
......
...@@ -432,11 +432,12 @@ class FileNioMemData { ...@@ -432,11 +432,12 @@ class FileNioMemData {
} }
}; };
final int nameHashCode;
private final CompressLaterCache<CompressItem, CompressItem> compressLaterCache = private final CompressLaterCache<CompressItem, CompressItem> compressLaterCache =
new CompressLaterCache<CompressItem, CompressItem>(CACHE_MIN_SIZE); new CompressLaterCache<CompressItem, CompressItem>(CACHE_MIN_SIZE);
private String name; private String name;
private final int nameHashCode;
private final boolean compress; private final boolean compress;
private final float compressLaterCachePercent; private final float compressLaterCachePercent;
private long length; private long length;
...@@ -515,6 +516,11 @@ class FileNioMemData { ...@@ -515,6 +516,11 @@ class FileNioMemData {
this.size = size; this.size = size;
} }
@Override
public synchronized V put(K key, V value) {
return super.put(key, value);
}
@Override @Override
protected boolean removeEldestEntry(Map.Entry<K, V> eldest) { protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
if (size() < size) { if (size() < size) {
...@@ -577,8 +583,7 @@ class FileNioMemData { ...@@ -577,8 +583,7 @@ class FileNioMemData {
// already expanded, or not compressed // already expanded, or not compressed
return d; return d;
} }
synchronized (d) synchronized (d) {
{
if (d.capacity() == BLOCK_SIZE) { if (d.capacity() == BLOCK_SIZE) {
return d; return d;
} }
...@@ -599,10 +604,10 @@ class FileNioMemData { ...@@ -599,10 +604,10 @@ class FileNioMemData {
*/ */
void compressPage(int page) { void compressPage(int page) {
final ByteBuffer d = buffers[page].get(); final ByteBuffer d = buffers[page].get();
synchronized (d) synchronized (d) {
{
if (d.capacity() != BLOCK_SIZE) { if (d.capacity() != BLOCK_SIZE) {
return; // already compressed // already compressed
return;
} }
final byte[] compressOutputBuffer = COMPRESS_OUT_BUF_THREAD_LOCAL.get(); final byte[] compressOutputBuffer = COMPRESS_OUT_BUF_THREAD_LOCAL.get();
int len = LZF_THREAD_LOCAL.get().compress(d, 0, compressOutputBuffer, 0); int len = LZF_THREAD_LOCAL.get().compress(d, 0, compressOutputBuffer, 0);
...@@ -671,7 +676,8 @@ class FileNioMemData { ...@@ -671,7 +676,8 @@ class FileNioMemData {
} }
buffers = newBuffers; buffers = newBuffers;
} }
compressLaterCache.setCacheSize(Math.max(CACHE_MIN_SIZE, (int)(blocks * compressLaterCachePercent / 100))); compressLaterCache.setCacheSize(Math.max(CACHE_MIN_SIZE, (int) (blocks *
compressLaterCachePercent / 100)));
} }
/** /**
......
...@@ -37,6 +37,7 @@ import org.h2.test.utils.FilePathDebug; ...@@ -37,6 +37,7 @@ import org.h2.test.utils.FilePathDebug;
import org.h2.tools.Backup; import org.h2.tools.Backup;
import org.h2.tools.DeleteDbFiles; import org.h2.tools.DeleteDbFiles;
import org.h2.util.IOUtils; import org.h2.util.IOUtils;
import org.h2.util.Task;
/** /**
* Tests various file system. * Tests various file system.
...@@ -84,8 +85,9 @@ public class TestFileSystem extends TestBase { ...@@ -84,8 +85,9 @@ public class TestFileSystem extends TestBase {
testFileSystem("memFS:"); testFileSystem("memFS:");
testFileSystem("memLZF:"); testFileSystem("memLZF:");
testFileSystem("nioMemFS:"); testFileSystem("nioMemFS:");
testFileSystem("nioMemLZF:"); testFileSystem("nioMemLZF:1:");
testFileSystem("nioMemLZF:12:"); // 12% compressLaterCache // 12% compressLaterCache
testFileSystem("nioMemLZF:12:");
testFileSystem("rec:memFS:"); testFileSystem("rec:memFS:");
testUserHome(); testUserHome();
try { try {
...@@ -393,6 +395,7 @@ public class TestFileSystem extends TestBase { ...@@ -393,6 +395,7 @@ public class TestFileSystem extends TestBase {
} }
private void testFileSystem(String fsBase) throws Exception { private void testFileSystem(String fsBase) throws Exception {
testConcurrent(fsBase);
testRootExists(fsBase); testRootExists(fsBase);
testPositionedReadWrite(fsBase); testPositionedReadWrite(fsBase);
testSetReadOnly(fsBase); testSetReadOnly(fsBase);
...@@ -800,4 +803,67 @@ public class TestFileSystem extends TestBase { ...@@ -800,4 +803,67 @@ public class TestFileSystem extends TestBase {
FileUtils.delete(s); FileUtils.delete(s);
} }
private void testConcurrent(String fsBase) throws Exception {
String s = FileUtils.createTempFile(fsBase + "/tmp", ".tmp", false, false);
File file = new File(TestBase.BASE_TEST_DIR + "/tmp");
file.getParentFile().mkdirs();
file.delete();
RandomAccessFile ra = new RandomAccessFile(file, "rw");
FileUtils.delete(s);
final FileChannel f = FileUtils.open(s, "rw");
final int size = getSize(100, 500);
f.write(ByteBuffer.allocate(size * 64 * 1024));
Random random = new Random(1);
Task task = new Task() {
@Override
public void call() throws Exception {
ByteBuffer byteBuff = ByteBuffer.allocate(16);
while (!stop) {
for (int pos = 0; pos < size; pos++) {
byteBuff.clear();
f.read(byteBuff, pos * 64 * 1024);
byteBuff.position(0);
int x = byteBuff.getInt();
int y = byteBuff.getInt();
assertEquals(x, y);
Thread.yield();
}
}
}
};
task.execute();
int[] data = new int[size];
try {
ByteBuffer byteBuff = ByteBuffer.allocate(16);
int operations = 10000;
for (int i = 0; i < operations; i++) {
byteBuff.position(0);
byteBuff.putInt(i);
byteBuff.putInt(i);
byteBuff.flip();
int pos = random.nextInt(size);
f.write(byteBuff, pos * 64 * 1024);
data[pos] = i;
pos = random.nextInt(size);
byteBuff.clear();
f.read(byteBuff, pos * 64 * 1024);
byteBuff.limit(16);
byteBuff.position(0);
int x = byteBuff.getInt();
int y = byteBuff.getInt();
assertEquals(x, y);
assertEquals(data[pos], x);
}
} catch (Throwable e) {
e.printStackTrace();
fail("Exception: " + e);
} finally {
task.get();
f.close();
ra.close();
file.delete();
FileUtils.delete(s);
}
}
} }
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论