提交 0b9fb0ed authored 作者: Thomas Mueller's avatar Thomas Mueller

Test power failure with re-ordered write operations (WIP)

上级 441aa453
...@@ -107,6 +107,7 @@ import org.h2.test.mvcc.TestMvcc1; ...@@ -107,6 +107,7 @@ import org.h2.test.mvcc.TestMvcc1;
import org.h2.test.mvcc.TestMvcc2; import org.h2.test.mvcc.TestMvcc2;
import org.h2.test.mvcc.TestMvcc3; import org.h2.test.mvcc.TestMvcc3;
import org.h2.test.mvcc.TestMvccMultiThreaded; import org.h2.test.mvcc.TestMvccMultiThreaded;
import org.h2.test.poweroff.TestReorderWrites;
import org.h2.test.rowlock.TestRowLocks; import org.h2.test.rowlock.TestRowLocks;
import org.h2.test.server.TestAutoServer; import org.h2.test.server.TestAutoServer;
import org.h2.test.server.TestInit; import org.h2.test.server.TestInit;
...@@ -767,6 +768,7 @@ kill -9 `jps -l | grep "org.h2.test." | cut -d " " -f 1` ...@@ -767,6 +768,7 @@ kill -9 `jps -l | grep "org.h2.test." | cut -d " " -f 1`
addTest(new TestMemoryUsage()); addTest(new TestMemoryUsage());
addTest(new TestMultiThread()); addTest(new TestMultiThread());
addTest(new TestPowerOff()); addTest(new TestPowerOff());
addTest(new TestReorderWrites());
addTest(new TestRandomSQL()); addTest(new TestRandomSQL());
addTest(new TestQueryCache()); addTest(new TestQueryCache());
addTest(new TestUrlJavaObjectSerializer()); addTest(new TestUrlJavaObjectSerializer());
......
/*
* Copyright 2004-2014 H2 Group. Multiple-Licensed under the MPL 2.0,
* and the EPL 1.0 (http://h2database.com/html/license.html).
* Initial Developer: H2 Group
*/
package org.h2.test.poweroff;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.util.Arrays;
import java.util.Map;
import java.util.Random;
import org.h2.mvstore.MVStore;
import org.h2.store.fs.FilePath;
import org.h2.store.fs.FileUtils;
import org.h2.test.TestBase;
import org.h2.test.utils.FilePathReorderWrites;
/**
* Tests that the MVStore recovers from a power failure if the file system or
* disk re-ordered the write operations.
*/
public class TestReorderWrites extends TestBase {
/**
* Run just this test.
*
* @param a ignored
*/
public static void main(String... a) throws Exception {
TestBase.createCaller().init().test();
}
@Override
public void test() throws Exception {
testFileSystem();
// testMVStore();
}
private void testMVStore() {
FilePathReorderWrites fs = FilePathReorderWrites.register();
String fileName = "reorder:memFS:test.mv";
Random r = new Random(1);
for (int i = 0; i < 100; i++) {
System.out.println(i + " tst --------------------------------");
fs.setPowerOffCountdown(100, i);
FileUtils.delete(fileName);
MVStore store = new MVStore.Builder().
fileName(fileName).
autoCommitDisabled().open();
// store.setRetentionTime(10);
Map<Integer, byte[]> map = store.openMap("data");
map.put(0, new byte[1]);
store.commit();
// if (r.nextBoolean()) {
store.getFileStore().sync();
//}
fs.setPowerOffCountdown(4 + r.nextInt(20), i);
try {
for (int j = 1; j < 100; j++) {
Map<Integer, Integer> newMap = store.openMap("d" + j);
newMap.put(j, j * 10);
int key = r.nextInt(10);
int len = 10 * r.nextInt(1000);
if (r.nextBoolean()) {
map.remove(key);
} else {
map.put(key, new byte[len]);
}
store.commit();
}
// write has to fail at some point
fail();
} catch (IllegalStateException e) {
// expected
}
store.close();
System.out.println("-------------------------------- test");
fs.setPowerOffCountdown(100, 0);
System.out.println("file size: " + FileUtils.size(fileName));
store = new MVStore.Builder().
fileName(fileName).
autoCommitDisabled().open();
map = store.openMap("data");
assertEquals(1, map.get(0).length);
for (int j = 0; j < 100; j++) {
Map<Integer, Integer> newMap = store.openMap("d" + j);
newMap.get(j);
}
// map.keySet();
store.close();
}
}
private void testFileSystem() throws IOException {
FilePathReorderWrites fs = FilePathReorderWrites.register();
String fileName = "reorder:memFS:test";
ByteBuffer empty = ByteBuffer.allocate(1024);
Random r = new Random(1);
long minSize = Long.MAX_VALUE;
long maxSize = 0;
int minWritten = Integer.MAX_VALUE;
int maxWritten = 0;
for (int i = 0; i < 100; i++) {
fs.setPowerOffCountdown(100, i);
FileUtils.delete(fileName);
FileChannel fc = FilePath.get(fileName).open("rw");
for (int j = 0; j < 20; j++) {
fc.write(empty, j * 1024);
empty.flip();
}
fs.setPowerOffCountdown(4 + r.nextInt(20), i);
int lastWritten = 0;
int lastTruncated = 0;
for (int j = 20; j >= 0; j--) {
try {
byte[] bytes = new byte[1024];
Arrays.fill(bytes, (byte) j);
ByteBuffer data = ByteBuffer.wrap(bytes);
fc.write(data, 0);
lastWritten = j;
} catch (IOException e) {
// expected
break;
}
try {
fc.truncate(j * 1024);
lastTruncated = j * 1024;
} catch (IOException e) {
// expected
break;
}
}
if (lastTruncated <= 0 || lastWritten <= 0) {
fail();
}
fs.setPowerOffCountdown(100, 0);
fc = FilePath.get(fileName).open("rw");
ByteBuffer data = ByteBuffer.allocate(1024);
fc.read(data, 0);
data.flip();
int got = data.get();
long size = fc.size();
minSize = Math.min(minSize, size);
maxSize = Math.max(minSize, size);
minWritten = Math.min(minWritten, got);
maxWritten = Math.max(maxWritten, got);
}
assertTrue(minSize < maxSize);
assertTrue(minWritten < maxWritten);
}
}
/*
* Copyright 2004-2014 H2 Group. Multiple-Licensed under the MPL 2.0,
* and the EPL 1.0 (http://h2database.com/html/license.html).
* Initial Developer: H2 Group
*/
package org.h2.test.utils;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.channels.FileLock;
import java.util.ArrayList;
import java.util.Random;
import org.h2.store.fs.FileBase;
import org.h2.store.fs.FilePath;
import org.h2.store.fs.FilePathWrapper;
import org.h2.util.IOUtils;
/**
* An unstable file system. It is used to simulate file system problems (for
* example out of disk space).
*/
public class FilePathReorderWrites extends FilePathWrapper {
static final boolean TRACE = false;
private static final FilePathReorderWrites INSTANCE = new FilePathReorderWrites();
private static final IOException POWER_FAILURE = new IOException("Power Failure");
private static int powerFailureCountdown;
private static boolean partialWrites;
private static Random random = new Random(1);
/**
* Register the file system.
*
* @return the instance
*/
public static FilePathReorderWrites register() {
FilePath.register(INSTANCE);
return INSTANCE;
}
/**
* Set the number of write operations before a simulated power failure, and the
* random seed (for partial writes).
*
* @param count the number of write operations (0 to never fail,
* Integer.MAX_VALUE to count the operations)
* @param seed the new seed
*/
public void setPowerOffCountdown(int count, int seed) {
powerFailureCountdown = count;
random.setSeed(seed);
}
public int getPowerOffCountdown() {
return powerFailureCountdown;
}
/**
* Whether partial writes are possible (writing only part of the data).
*
* @param partialWrites true to enable
*/
public void setPartialWrites(boolean partialWrites) {
FilePathReorderWrites.partialWrites = partialWrites;
}
boolean getPartialWrites() {
return partialWrites;
}
/**
* Get a buffer with a subset (the head) of the data of the source buffer.
*
* @param src the source buffer
* @return a buffer with a subset of the data
*/
ByteBuffer getRandomSubset(ByteBuffer src) {
int len = src.remaining();
len = Math.min(4096, Math.min(len, 1 + random.nextInt(len)));
ByteBuffer temp = ByteBuffer.allocate(len);
src.get(temp.array());
return temp;
}
Random getRandom() {
return random;
}
/**
* Check if the simulated problem occurred.
* This call will decrement the countdown.
*
* @throws IOException if the simulated power failure occurred
*/
void checkError() throws IOException {
if (powerFailureCountdown == 0) {
return;
}
if (--powerFailureCountdown > 0) {
return;
}
if (powerFailureCountdown >= -1) {
powerFailureCountdown--;
throw POWER_FAILURE;
}
}
@Override
public FileChannel open(String mode) throws IOException {
InputStream in = newInputStream();
FilePath copy = FilePath.get(getBase().toString() + ".copy");
OutputStream out = copy.newOutputStream(false);
IOUtils.copy(in, out);
FileChannel base = getBase().open(mode);
FileChannel readBase = copy.open(mode);
return new FilePowerFailure(this, base, readBase);
}
@Override
public String getScheme() {
return "reorder";
}
public long getMaxAge() {
// TODO implement, configurable
return 45000;
}
}
/**
* An file that checks for errors before each write operation.
*/
class FilePowerFailure extends FileBase {
private final FilePathReorderWrites file;
/**
* The base channel, where not all operations are immediately applied.
*/
private final FileChannel base;
/**
* The base channel that is used for reading, where all operations are immediately applied to get a consistent view before a power failure.
*/
private final FileChannel readBase;
private boolean closed;
/**
* The list of not yet applied to the base channel. It is sorted by time.
*/
private ArrayList<FileOperation> notAppliedList = new ArrayList<FileOperation>();
private int id;
FilePowerFailure(FilePathReorderWrites file, FileChannel base, FileChannel readBase) {
this.file = file;
this.base = base;
this.readBase = readBase;
}
@Override
public void implCloseChannel() throws IOException {
base.close();
readBase.close();
closed = true;
}
@Override
public long position() throws IOException {
return readBase.position();
}
@Override
public long size() throws IOException {
return readBase.size();
}
@Override
public int read(ByteBuffer dst) throws IOException {
return readBase.read(dst);
}
@Override
public int read(ByteBuffer dst, long pos) throws IOException {
return readBase.read(dst, pos);
}
@Override
public FileChannel position(long pos) throws IOException {
readBase.position(pos);
return this;
}
@Override
public FileChannel truncate(long newSize) throws IOException {
long oldSize = readBase.size();
if (oldSize <= newSize) {
return this;
}
addOperation(new FileOperation(id++, newSize, null));
return this;
}
private int addOperation(FileOperation op) throws IOException {
trace("op " + op);
checkError();
notAppliedList.add(op);
long now = op.time;
for (int i = 0; i < notAppliedList.size() - 1; i++) {
FileOperation old = notAppliedList.get(i);
boolean applyOld = false;
// String reason = "";
if (old.time + 45000 < now) {
// reason = "old";
applyOld = true;
} else if (old.overlaps(op)) {
// reason = "overlap";
applyOld = true;
} else if (file.getRandom().nextInt(100) < 10) {
// reason = "random";
applyOld = true;
}
if (applyOld) {
trace("op apply " + op);
old.apply(base);
notAppliedList.remove(i);
i--;
}
}
return op.apply(readBase);
}
private void applyAll() throws IOException {
trace("applyAll");
for (FileOperation op : notAppliedList) {
op.apply(base);
}
notAppliedList.clear();
}
@Override
public void force(boolean metaData) throws IOException {
checkError();
readBase.force(metaData);
applyAll();
}
@Override
public int write(ByteBuffer src) throws IOException {
return addOperation(new FileOperation(id++, readBase.position(), src));
}
@Override
public int write(ByteBuffer src, long position) throws IOException {
return addOperation(new FileOperation(id++, position, src));
}
private void checkError() throws IOException {
if (closed) {
throw new IOException("Closed");
}
file.checkError();
}
@Override
public synchronized FileLock tryLock(long position, long size,
boolean shared) throws IOException {
return readBase.tryLock(position, size, shared);
}
@Override
public String toString() {
return file.getScheme() + ":" + file.toString();
}
private static void trace(String message) {
if (FilePathReorderWrites.TRACE) {
System.out.println(message);
}
}
/**
* A file operation (that might be re-ordered with other operations, or not
* be applied on power failure).
*/
static class FileOperation {
final int id;
final long time;
final ByteBuffer buffer;
final long position;
FileOperation(int id, long position, ByteBuffer src) {
this.id = id;
this.time = System.currentTimeMillis();
if (src == null) {
buffer = null;
} else {
int len = src.limit() - src.position();
this.buffer = ByteBuffer.allocate(len);
buffer.put(src);
buffer.flip();
}
this.position = position;
}
public boolean overlaps(FileOperation other) {
if (isTruncate() && other.isTruncate()) {
// we just keep the latest truncate operation
return true;
}
if (isTruncate()) {
return position < other.getEndPosition();
} else if (other.isTruncate()) {
return getEndPosition() > other.position;
}
return position < other.getEndPosition() &&
getEndPosition() > other.position;
}
private boolean isTruncate() {
return buffer == null;
}
private long getEndPosition() {
return position + getLength();
}
private int getLength() {
return buffer == null ? 0 : buffer.limit() - buffer.position();
}
int apply(FileChannel channel) throws IOException {
if (isTruncate()) {
channel.truncate(position);
return -1;
}
// TODO support the case were part is not written
int len = channel.write(buffer, position);
buffer.flip();
return len;
}
@Override
public String toString() {
String s = "[" + id + "]: @" + position + (
isTruncate() ? "-truncate" : ("+" + getLength()));
return s;
}
}
}
\ No newline at end of file
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论