提交 c1dd5a5c authored 作者: Evgenij Ryazanov's avatar Evgenij Ryazanov

Handle interrupts in FilePathAsync and add it to tests

上级 57843714
...@@ -15,6 +15,7 @@ import java.nio.file.OpenOption; ...@@ -15,6 +15,7 @@ import java.nio.file.OpenOption;
import java.nio.file.Paths; import java.nio.file.Paths;
import java.nio.file.StandardOpenOption; import java.nio.file.StandardOpenOption;
import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
/** /**
* This file system stores files on disk and uses * This file system stores files on disk and uses
...@@ -81,6 +82,23 @@ class FileAsync extends FileBase { ...@@ -81,6 +82,23 @@ class FileAsync extends FileBase {
private long position; private long position;
private static <T> T complete(Future<T> future) throws IOException {
boolean interrupted = false;
for (;;) {
try {
T result = future.get();
if (interrupted) {
Thread.currentThread().interrupt();
}
return result;
} catch (InterruptedException e) {
interrupted = true;
} catch (ExecutionException e) {
throw new IOException(e.getCause());
}
}
}
FileAsync(String fileName, String mode) throws IOException { FileAsync(String fileName, String mode) throws IOException {
this.name = fileName; this.name = fileName;
OpenOption[] options; OpenOption[] options;
...@@ -120,14 +138,9 @@ class FileAsync extends FileBase { ...@@ -120,14 +138,9 @@ class FileAsync extends FileBase {
@Override @Override
public int read(ByteBuffer dst) throws IOException { public int read(ByteBuffer dst) throws IOException {
int read; int read = complete(channel.read(dst, position));
try { if (read > 0) {
read = channel.read(dst, position).get(); position += read;
if (read > 0) {
position += read;
}
} catch (InterruptedException | ExecutionException e) {
throw new IOException(e);
} }
return read; return read;
} }
...@@ -140,21 +153,15 @@ class FileAsync extends FileBase { ...@@ -140,21 +153,15 @@ class FileAsync extends FileBase {
@Override @Override
public int read(ByteBuffer dst, long position) throws IOException { public int read(ByteBuffer dst, long position) throws IOException {
try { return complete(channel.read(dst, position));
return channel.read(dst, position).get();
} catch (InterruptedException | ExecutionException e) {
throw new IOException(e);
}
} }
@Override @Override
public int write(ByteBuffer src, long position) throws IOException { public int write(ByteBuffer src, long position) throws IOException {
try { try {
return channel.write(src, position).get(); return complete(channel.write(src, position));
} catch (NonWritableChannelException e) { } catch (NonWritableChannelException e) {
throw new IOException("read only"); throw new IOException("read only");
} catch (InterruptedException | ExecutionException e) {
throw new IOException(e);
} }
} }
...@@ -174,16 +181,14 @@ class FileAsync extends FileBase { ...@@ -174,16 +181,14 @@ class FileAsync extends FileBase {
@Override @Override
public int write(ByteBuffer src) throws IOException { public int write(ByteBuffer src) throws IOException {
int read; int written;
try { try {
read = channel.write(src, position).get(); written = complete(channel.write(src, position));
position += read; position += written;
} catch (NonWritableChannelException e) { } catch (NonWritableChannelException e) {
throw new IOException("read only"); throw new IOException("read only");
} catch (InterruptedException | ExecutionException e) {
throw new IOException(e);
} }
return read; return written;
} }
@Override @Override
......
...@@ -47,7 +47,8 @@ public class TestConcurrent extends TestMVStore { ...@@ -47,7 +47,8 @@ public class TestConcurrent extends TestMVStore {
@Override @Override
public void test() throws Exception { public void test() throws Exception {
FileUtils.createDirectories(getBaseDir()); FileUtils.createDirectories(getBaseDir());
testInterruptReopen(); testInterruptReopenAsync();
testInterruptReopenRetryNIO();
testConcurrentSaveCompact(); testConcurrentSaveCompact();
testConcurrentDataType(); testConcurrentDataType();
testConcurrentAutoCommitAndChange(); testConcurrentAutoCommitAndChange();
...@@ -64,8 +65,16 @@ public class TestConcurrent extends TestMVStore { ...@@ -64,8 +65,16 @@ public class TestConcurrent extends TestMVStore {
testConcurrentRead(); testConcurrentRead();
} }
private void testInterruptReopen() { private void testInterruptReopenAsync() {
String fileName = "retry:nio:" + getBaseDir() + "/" + getTestName(); testInterruptReopen("async:");
}
private void testInterruptReopenRetryNIO() {
testInterruptReopen("retry:nio:");
}
private void testInterruptReopen(String prefix) {
String fileName = prefix + getBaseDir() + "/" + getTestName();
FileUtils.delete(fileName); FileUtils.delete(fileName);
final MVStore s = new MVStore.Builder(). final MVStore s = new MVStore.Builder().
fileName(fileName). fileName(fileName).
......
...@@ -83,6 +83,7 @@ public class TestFileSystem extends TestDb { ...@@ -83,6 +83,7 @@ public class TestFileSystem extends TestDb {
String f = "split:10:" + getBaseDir() + "/fs"; String f = "split:10:" + getBaseDir() + "/fs";
FileUtils.toRealPath(f); FileUtils.toRealPath(f);
testFileSystem(getBaseDir() + "/fs"); testFileSystem(getBaseDir() + "/fs");
testFileSystem("async:" + getBaseDir() + "/fs");
testFileSystem("memFS:"); testFileSystem("memFS:");
testFileSystem("memLZF:"); testFileSystem("memLZF:");
testFileSystem("nioMemFS:"); testFileSystem("nioMemFS:");
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论