提交 240ec323 authored 作者: Thomas Mueller's avatar Thomas Mueller

Version 1.3: when re-running the same query so that the previous result was…

Version 1.3: when re-running the same query so that the previous result was re-used, and if the result set was large so that it was stored externally (in a temp file or in a temp table), then reading rows from both result sets resulted in an exception or wrong behavior.
上级 7beff649
...@@ -37,7 +37,7 @@ public class LocalResult implements ResultInterface, ResultTarget { ...@@ -37,7 +37,7 @@ public class LocalResult implements ResultInterface, ResultTarget {
private ValueHashMap<Value[]> distinctRows; private ValueHashMap<Value[]> distinctRows;
private Value[] currentRow; private Value[] currentRow;
private int offset, limit; private int offset, limit;
private ResultExternal disk; private ResultExternal external;
private int diskOffset; private int diskOffset;
private boolean distinct; private boolean distinct;
private boolean closed; private boolean closed;
...@@ -105,9 +105,16 @@ public class LocalResult implements ResultInterface, ResultTarget { ...@@ -105,9 +105,16 @@ public class LocalResult implements ResultInterface, ResultTarget {
* @return the copy * @return the copy
*/ */
public LocalResult createShallowCopy(Session targetSession) { public LocalResult createShallowCopy(Session targetSession) {
if (disk == null && (rows == null || rows.size() < rowCount)) { if (external == null && (rows == null || rows.size() < rowCount)) {
return null; return null;
} }
ResultExternal e2 = null;
if (external != null) {
e2 = external.createShallowCopy();
if (e2 == null) {
return null;
}
}
LocalResult copy = new LocalResult(); LocalResult copy = new LocalResult();
copy.maxMemoryRows = this.maxMemoryRows; copy.maxMemoryRows = this.maxMemoryRows;
copy.session = targetSession; copy.session = targetSession;
...@@ -122,7 +129,7 @@ public class LocalResult implements ResultInterface, ResultTarget { ...@@ -122,7 +129,7 @@ public class LocalResult implements ResultInterface, ResultTarget {
copy.currentRow = null; copy.currentRow = null;
copy.offset = 0; copy.offset = 0;
copy.limit = 0; copy.limit = 0;
copy.disk = this.disk; copy.external = e2;
copy.diskOffset = this.diskOffset; copy.diskOffset = this.diskOffset;
return copy; return copy;
} }
...@@ -158,7 +165,7 @@ public class LocalResult implements ResultInterface, ResultTarget { ...@@ -158,7 +165,7 @@ public class LocalResult implements ResultInterface, ResultTarget {
distinctRows.remove(array); distinctRows.remove(array);
rowCount = distinctRows.size(); rowCount = distinctRows.size();
} else { } else {
rowCount = disk.removeRow(values); rowCount = external.removeRow(values);
} }
} }
...@@ -176,16 +183,16 @@ public class LocalResult implements ResultInterface, ResultTarget { ...@@ -176,16 +183,16 @@ public class LocalResult implements ResultInterface, ResultTarget {
ValueArray array = ValueArray.get(values); ValueArray array = ValueArray.get(values);
return distinctRows.get(array) != null; return distinctRows.get(array) != null;
} }
return disk.contains(values); return external.contains(values);
} }
public void reset() { public void reset() {
rowId = -1; rowId = -1;
if (disk != null) { if (external != null) {
disk.reset(); external.reset();
if (diskOffset > 0) { if (diskOffset > 0) {
for (int i = 0; i < diskOffset; i++) { for (int i = 0; i < diskOffset; i++) {
disk.next(); external.next();
} }
} }
} }
...@@ -199,8 +206,8 @@ public class LocalResult implements ResultInterface, ResultTarget { ...@@ -199,8 +206,8 @@ public class LocalResult implements ResultInterface, ResultTarget {
if (rowId < rowCount) { if (rowId < rowCount) {
rowId++; rowId++;
if (rowId < rowCount) { if (rowId < rowCount) {
if (disk != null) { if (external != null) {
currentRow = disk.next(); currentRow = external.next();
} else { } else {
currentRow = rows.get(rowId); currentRow = rows.get(rowId);
} }
...@@ -228,27 +235,27 @@ public class LocalResult implements ResultInterface, ResultTarget { ...@@ -228,27 +235,27 @@ public class LocalResult implements ResultInterface, ResultTarget {
rowCount = distinctRows.size(); rowCount = distinctRows.size();
Database db = session.getDatabase(); Database db = session.getDatabase();
if (rowCount > db.getSettings().maxMemoryRowsDistinct && db.isPersistent()) { if (rowCount > db.getSettings().maxMemoryRowsDistinct && db.isPersistent()) {
disk = new ResultTempTable(session, sort); external = new ResultTempTable(session, sort);
disk.addRows(distinctRows.values()); external.addRows(distinctRows.values());
distinctRows = null; distinctRows = null;
} }
} else { } else {
rowCount = disk.addRow(values); rowCount = external.addRow(values);
} }
return; return;
} }
rows.add(values); rows.add(values);
rowCount++; rowCount++;
if (rows.size() > maxMemoryRows && session.getDatabase().isPersistent()) { if (rows.size() > maxMemoryRows && session.getDatabase().isPersistent()) {
if (disk == null) { if (external == null) {
disk = new ResultDiskBuffer(session, sort, values.length); external = new ResultDiskBuffer(session, sort, values.length);
} }
addRowsToDisk(); addRowsToDisk();
} }
} }
private void addRowsToDisk() { private void addRowsToDisk() {
disk.addRows(rows); external.addRows(rows);
rows.clear(); rows.clear();
} }
...@@ -265,10 +272,10 @@ public class LocalResult implements ResultInterface, ResultTarget { ...@@ -265,10 +272,10 @@ public class LocalResult implements ResultInterface, ResultTarget {
rows = distinctRows.values(); rows = distinctRows.values();
distinctRows = null; distinctRows = null;
} else { } else {
if (disk != null && sort != null) { if (external != null && sort != null) {
// external sort // external sort
ResultExternal temp = disk; ResultExternal temp = external;
disk = null; external = null;
temp.reset(); temp.reset();
rows = New.arrayList(); rows = New.arrayList();
// TODO use offset directly if possible // TODO use offset directly if possible
...@@ -277,12 +284,12 @@ public class LocalResult implements ResultInterface, ResultTarget { ...@@ -277,12 +284,12 @@ public class LocalResult implements ResultInterface, ResultTarget {
if (list == null) { if (list == null) {
break; break;
} }
if (disk == null) { if (external == null) {
disk = new ResultDiskBuffer(session, sort, list.length); external = new ResultDiskBuffer(session, sort, list.length);
} }
rows.add(list); rows.add(list);
if (rows.size() > maxMemoryRows) { if (rows.size() > maxMemoryRows) {
disk.addRows(rows); external.addRows(rows);
rows.clear(); rows.clear();
} }
} }
...@@ -291,9 +298,9 @@ public class LocalResult implements ResultInterface, ResultTarget { ...@@ -291,9 +298,9 @@ public class LocalResult implements ResultInterface, ResultTarget {
} }
} }
} }
if (disk != null) { if (external != null) {
addRowsToDisk(); addRowsToDisk();
disk.done(); external.done();
} else { } else {
if (sort != null) { if (sort != null) {
sort.sort(rows); sort.sort(rows);
...@@ -321,7 +328,7 @@ public class LocalResult implements ResultInterface, ResultTarget { ...@@ -321,7 +328,7 @@ public class LocalResult implements ResultInterface, ResultTarget {
if (limit <= 0) { if (limit <= 0) {
return; return;
} }
if (disk == null) { if (external == null) {
if (rows.size() > limit) { if (rows.size() > limit) {
rows = New.arrayList(rows.subList(0, limit)); rows = New.arrayList(rows.subList(0, limit));
rowCount = limit; rowCount = limit;
...@@ -334,13 +341,13 @@ public class LocalResult implements ResultInterface, ResultTarget { ...@@ -334,13 +341,13 @@ public class LocalResult implements ResultInterface, ResultTarget {
} }
public boolean needToClose() { public boolean needToClose() {
return disk != null; return external != null;
} }
public void close() { public void close() {
if (disk != null) { if (external != null) {
disk.close(); external.close();
disk = null; external = null;
closed = true; closed = true;
} }
} }
...@@ -398,7 +405,7 @@ public class LocalResult implements ResultInterface, ResultTarget { ...@@ -398,7 +405,7 @@ public class LocalResult implements ResultInterface, ResultTarget {
if (offset <= 0) { if (offset <= 0) {
return; return;
} }
if (disk == null) { if (external == null) {
if (offset >= rows.size()) { if (offset >= rows.size()) {
rows.clear(); rows.clear();
rowCount = 0; rowCount = 0;
......
...@@ -25,14 +25,19 @@ class ResultDiskBuffer implements ResultExternal { ...@@ -25,14 +25,19 @@ class ResultDiskBuffer implements ResultExternal {
private static final int READ_AHEAD = 128; private static final int READ_AHEAD = 128;
private Data rowBuff; private final Data rowBuff;
private FileStore file; private final ArrayList<ResultDiskTape> tapes;
private ArrayList<ResultDiskTape> tapes; private final ResultDiskTape mainTape;
private ResultDiskTape mainTape; private final SortOrder sort;
private SortOrder sort; private final int columnCount;
private int columnCount;
private final int maxBufferSize; private final int maxBufferSize;
private FileStore file;
private final ResultDiskBuffer parent;
private boolean closed;
private int childCount;
/** /**
* Represents a virtual disk tape for the merge sort algorithm. * Represents a virtual disk tape for the merge sort algorithm.
* Each virtual disk tape is a region of the temp file. * Each virtual disk tape is a region of the temp file.
...@@ -61,23 +66,62 @@ class ResultDiskBuffer implements ResultExternal { ...@@ -61,23 +66,62 @@ class ResultDiskBuffer implements ResultExternal {
} }
ResultDiskBuffer(Session session, SortOrder sort, int columnCount) { ResultDiskBuffer(Session session, SortOrder sort, int columnCount) {
this.parent = null;
this.sort = sort; this.sort = sort;
this.columnCount = columnCount; this.columnCount = columnCount;
Database db = session.getDatabase(); Database db = session.getDatabase();
rowBuff = Data.create(db, Constants.DEFAULT_PAGE_SIZE); rowBuff = Data.create(null, Constants.DEFAULT_PAGE_SIZE);
String fileName = session.getDatabase().createTempFile(); String fileName = session.getDatabase().createTempFile();
file = session.getDatabase().openFile(fileName, "rw", false); file = session.getDatabase().openFile(fileName, "rw", false);
file.setCheckedWriting(false); file.setCheckedWriting(false);
file.seek(FileStore.HEADER_LENGTH); file.seek(FileStore.HEADER_LENGTH);
if (sort != null) { if (sort != null) {
tapes = New.arrayList(); tapes = New.arrayList();
mainTape = null;
} else { } else {
tapes = null;
mainTape = new ResultDiskTape(); mainTape = new ResultDiskTape();
mainTape.pos = FileStore.HEADER_LENGTH; mainTape.pos = FileStore.HEADER_LENGTH;
} }
this.maxBufferSize = db.getSettings().largeResultBufferSize; this.maxBufferSize = db.getSettings().largeResultBufferSize;
} }
private ResultDiskBuffer(ResultDiskBuffer parent) {
this.parent = parent;
rowBuff = Data.create(null, Constants.DEFAULT_PAGE_SIZE);
file = parent.file;
if (parent.tapes != null) {
tapes = New.arrayList();
for (ResultDiskTape t : parent.tapes) {
ResultDiskTape t2 = new ResultDiskTape();
t2.pos = t2.start = t.start;
t2.end = t.end;
tapes.add(t2);
}
} else {
tapes = null;
}
if (parent.mainTape != null) {
mainTape = new ResultDiskTape();
mainTape.pos = FileStore.HEADER_LENGTH;
mainTape.start = parent.mainTape.start;
mainTape.end = parent.mainTape.end;
} else {
mainTape = null;
}
sort = parent.sort;
columnCount = parent.columnCount;
maxBufferSize = parent.maxBufferSize;
}
public synchronized ResultDiskBuffer createShallowCopy() {
if (closed || parent != null) {
return null;
}
childCount++;
return new ResultDiskBuffer(this);
}
public void addRows(ArrayList<Value[]> rows) { public void addRows(ArrayList<Value[]> rows) {
if (sort != null) { if (sort != null) {
sort.sort(rows); sort.sort(rows);
...@@ -212,12 +256,27 @@ class ResultDiskBuffer implements ResultExternal { ...@@ -212,12 +256,27 @@ class ResultDiskBuffer implements ResultExternal {
close(); close();
} }
public void close() { private synchronized void closeChild() {
if (file != null) { if (--childCount == 0 && closed) {
file.closeAndDeleteSilently();
file = null;
}
}
public synchronized void close() {
if (closed) {
return;
}
closed = true;
if (parent != null) {
parent.closeChild();
} else if (file != null) {
if (childCount == 0) {
file.closeAndDeleteSilently(); file.closeAndDeleteSilently();
file = null; file = null;
} }
} }
}
public int removeRow(Value[] values) { public int removeRow(Value[] values) {
throw DbException.throwInternalError(); throw DbException.throwInternalError();
......
...@@ -69,4 +69,11 @@ public interface ResultExternal { ...@@ -69,4 +69,11 @@ public interface ResultExternal {
*/ */
int addRow(Value[] values); int addRow(Value[] values);
/**
* Create a shallow copy of this object if possible.
*
* @return the shallow copy, or null
*/
ResultExternal createShallowCopy();
} }
...@@ -28,13 +28,17 @@ import org.h2.value.ValueArray; ...@@ -28,13 +28,17 @@ import org.h2.value.ValueArray;
public class ResultTempTable implements ResultExternal { public class ResultTempTable implements ResultExternal {
private static final String COLUMN_NAME = "DATA"; private static final String COLUMN_NAME = "DATA";
private Session session; private final SortOrder sort;
private final Session session;
private final Index index;
private RegularTable table; private RegularTable table;
private SortOrder sort;
private Index index;
private Cursor resultCursor; private Cursor resultCursor;
public ResultTempTable(Session session, SortOrder sort) { private final ResultTempTable parent;
private boolean closed;
private int childCount;
ResultTempTable(Session session, SortOrder sort) {
this.session = session; this.session = session;
this.sort = sort; this.sort = sort;
Schema schema = session.getDatabase().getSchema(Constants.SCHEMA_MAIN); Schema schema = session.getDatabase().getSchema(Constants.SCHEMA_MAIN);
...@@ -60,6 +64,25 @@ public class ResultTempTable implements ResultExternal { ...@@ -60,6 +64,25 @@ public class ResultTempTable implements ResultExternal {
index = new PageBtreeIndex(table, indexId, data.tableName, indexCols, indexType, true, session); index = new PageBtreeIndex(table, indexId, data.tableName, indexCols, indexType, true, session);
index.setTemporary(true); index.setTemporary(true);
table.getIndexes().add(index); table.getIndexes().add(index);
parent = null;
}
private ResultTempTable(ResultTempTable parent) {
this.parent = parent;
this.session = parent.session;
this.table = parent.table;
this.index = parent.index;
// sort is only used when adding rows
this.sort = null;
reset();
}
public synchronized ResultExternal createShallowCopy() {
if (closed || parent != null) {
return null;
}
childCount++;
return new ResultTempTable(this);
} }
public int removeRow(Value[] values) { public int removeRow(Value[] values) {
...@@ -94,7 +117,27 @@ public class ResultTempTable implements ResultExternal { ...@@ -94,7 +117,27 @@ public class ResultTempTable implements ResultExternal {
} }
} }
public void close() { private synchronized void closeChild() {
if (--childCount == 0 && closed) {
dropTable();
}
}
public synchronized void close() {
if (closed) {
return;
}
closed = true;
if (parent != null) {
parent.closeChild();
} else {
if (childCount == 0) {
dropTable();
}
}
}
private void dropTable() {
if (table == null) { if (table == null) {
return; return;
} }
......
...@@ -30,6 +30,8 @@ public class TestTempTables extends TestBase { ...@@ -30,6 +30,8 @@ public class TestTempTables extends TestBase {
public void test() throws SQLException { public void test() throws SQLException {
deleteDb("tempTables"); deleteDb("tempTables");
testTempFileResultSet();
testTempTableResultSet();
testTransactionalTemp(); testTransactionalTemp();
testDeleteGlobalTempTableWhenClosing(); testDeleteGlobalTempTableWhenClosing();
Connection c1 = getConnection("tempTables"); Connection c1 = getConnection("tempTables");
...@@ -43,6 +45,93 @@ public class TestTempTables extends TestBase { ...@@ -43,6 +45,93 @@ public class TestTempTables extends TestBase {
deleteDb("tempTables"); deleteDb("tempTables");
} }
private void testTempFileResultSet() throws SQLException {
deleteDb("tempTables");
Connection conn = getConnection("tempTables;MAX_MEMORY_ROWS=10");
ResultSet rs1, rs2;
Statement stat1 = conn.createStatement(ResultSet.TYPE_SCROLL_INSENSITIVE, ResultSet.CONCUR_READ_ONLY);
Statement stat2 = conn.createStatement(ResultSet.TYPE_SCROLL_INSENSITIVE, ResultSet.CONCUR_READ_ONLY);
rs1 = stat1.executeQuery("select * from system_range(1, 20)");
rs2 = stat2.executeQuery("select * from system_range(1, 20)");
for (int i = 0; i < 20; i++) {
rs1.next();
rs2.next();
rs1.getInt(1);
rs2.getInt(1);
}
rs2.close();
// verify the temp table is not deleted yet
rs1.beforeFirst();
for (int i = 0; i < 20; i++) {
rs1.next();
rs1.getInt(1);
}
rs1.close();
rs1 = stat1.executeQuery("select * from system_range(1, 20) order by x desc");
rs2 = stat2.executeQuery("select * from system_range(1, 20) order by x desc");
for (int i = 0; i < 20; i++) {
rs1.next();
rs2.next();
rs1.getInt(1);
rs2.getInt(1);
}
rs1.close();
// verify the temp table is not deleted yet
rs2.beforeFirst();
for (int i = 0; i < 20; i++) {
rs2.next();
rs2.getInt(1);
}
rs2.close();
conn.close();
}
private void testTempTableResultSet() throws SQLException {
deleteDb("tempTables");
Connection conn = getConnection("tempTables;MAX_MEMORY_ROWS_DISTINCT=10");
Statement stat1 = conn.createStatement(ResultSet.TYPE_SCROLL_INSENSITIVE, ResultSet.CONCUR_READ_ONLY);
Statement stat2 = conn.createStatement(ResultSet.TYPE_SCROLL_INSENSITIVE, ResultSet.CONCUR_READ_ONLY);
ResultSet rs1, rs2;
rs1 = stat1.executeQuery("select distinct * from system_range(1, 20)");
// this will re-use the same temp table
rs2 = stat2.executeQuery("select distinct * from system_range(1, 20)");
for (int i = 0; i < 20; i++) {
rs1.next();
rs2.next();
rs1.getInt(1);
rs2.getInt(1);
}
rs2.close();
// verify the temp table is not deleted yet
rs1.beforeFirst();
for (int i = 0; i < 20; i++) {
rs1.next();
rs1.getInt(1);
}
rs1.close();
rs1 = stat1.executeQuery("select distinct * from system_range(1, 20)");
rs2 = stat2.executeQuery("select distinct * from system_range(1, 20)");
for (int i = 0; i < 20; i++) {
rs1.next();
rs2.next();
rs1.getInt(1);
rs2.getInt(1);
}
rs1.close();
// verify the temp table is not deleted yet
rs2.beforeFirst();
for (int i = 0; i < 20; i++) {
rs2.next();
rs2.getInt(1);
}
rs2.close();
conn.close();
}
private void testTransactionalTemp() throws SQLException { private void testTransactionalTemp() throws SQLException {
deleteDb("tempTables"); deleteDb("tempTables");
Connection conn = getConnection("tempTables"); Connection conn = getConnection("tempTables");
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论