Unverified 提交 7a60c09e authored 作者: Evgenij Ryazanov's avatar Evgenij Ryazanov 提交者: GitHub

Merge pull request #1692 from katzyn/external

Use MVTempResult unconditionally
...@@ -62,14 +62,14 @@ class MVPlainTempResult extends MVTempResult { ...@@ -62,14 +62,14 @@ class MVPlainTempResult extends MVTempResult {
super(database, expressions.length, visibleColumnCount); super(database, expressions.length, visibleColumnCount);
ValueDataType valueType = new ValueDataType(database, new int[columnCount]); ValueDataType valueType = new ValueDataType(database, new int[columnCount]);
Builder<Long, ValueRow> builder = new MVMap.Builder<Long, ValueRow>() Builder<Long, ValueRow> builder = new MVMap.Builder<Long, ValueRow>()
.valueType(valueType).singleWriter(); .valueType(valueType);
map = store.openMap("tmp", builder); map = store.openMap("tmp", builder);
} }
@Override @Override
public int addRow(Value[] values) { public int addRow(Value[] values) {
assert parent == null; assert parent == null;
map.append(counter++, ValueRow.get(values)); map.put(counter++, ValueRow.get(values));
return ++rowCount; return ++rowCount;
} }
......
...@@ -301,12 +301,8 @@ public class LocalResultImpl implements LocalResult { ...@@ -301,12 +301,8 @@ public class LocalResultImpl implements LocalResult {
} }
private void createExternalResult() { private void createExternalResult() {
Database database = session.getDatabase(); external = MVTempResult.of(session.getDatabase(), expressions, distinct, distinctIndexes, visibleColumnCount,
external = database.isMVStore() sort);
|| /* not supported by ResultTempTable */ distinct && expressions.length != visibleColumnCount
|| distinctIndexes != null
? MVTempResult.of(database, expressions, distinct, distinctIndexes, visibleColumnCount, sort)
: new ResultTempTable(session, expressions, distinct, sort);
} }
/** /**
......
/*
* Copyright 2004-2019 H2 Group. Multiple-Licensed under the MPL 2.0,
* and the EPL 1.0 (http://h2database.com/html/license.html).
* Initial Developer: H2 Group
*/
package org.h2.result;
import java.lang.ref.Reference;
import java.util.Arrays;
import java.util.BitSet;
import java.util.Collection;
import org.h2.command.ddl.CreateTableData;
import org.h2.engine.Constants;
import org.h2.engine.Database;
import org.h2.engine.Session;
import org.h2.expression.Expression;
import org.h2.index.Cursor;
import org.h2.index.Index;
import org.h2.index.IndexType;
import org.h2.schema.Schema;
import org.h2.table.Column;
import org.h2.table.IndexColumn;
import org.h2.table.Table;
import org.h2.util.TempFileDeleter;
import org.h2.value.DataType;
import org.h2.value.TypeInfo;
import org.h2.value.Value;
import org.h2.value.ValueNull;
/**
* This class implements the temp table buffer for the LocalResult class.
*/
public class ResultTempTable implements ResultExternal {
private static final class CloseImpl implements AutoCloseable {
private final Session session;
private final Table table;
Index index;
CloseImpl(Session session, Table table) {
this.session = session;
this.table = table;
}
@Override
public void close() throws Exception {
Database database = session.getDatabase();
// Need to lock because not all of the code-paths
// that reach here have already taken this lock,
// notably via the close() paths.
synchronized (session) {
synchronized (database) {
table.truncate(session);
}
}
// This session may not lock the sys table (except if it already has
// locked it) because it must be committed immediately, otherwise
// other threads can not access the sys table. If the table is not
// removed now, it will be when the database is opened the next
// time. (the table is truncated, so this is just one record)
if (!database.isSysTableLocked()) {
Session sysSession = database.getSystemSession();
table.removeChildrenAndResources(sysSession);
if (index != null) {
// need to explicitly do this,
// as it's not registered in the system session
session.removeLocalTempTableIndex(index);
}
// the transaction must be committed immediately
// TODO this synchronization cascade is very ugly
synchronized (session) {
synchronized (sysSession) {
synchronized (database) {
sysSession.commit(false);
}
}
}
}
}
}
private static final String COLUMN_NAME = "DATA";
private final boolean distinct;
private final SortOrder sort;
private Index index;
private final Session session;
private Table table;
private Cursor resultCursor;
private int rowCount;
private final int columnCount;
private final ResultTempTable parent;
private boolean closed;
private int childCount;
/**
* Temporary file deleter.
*/
private final TempFileDeleter tempFileDeleter;
/**
* Closeable to close the storage.
*/
private final CloseImpl closeable;
/**
* Reference to the record in the temporary file deleter.
*/
private final Reference<?> fileRef;
ResultTempTable(Session session, Expression[] expressions, boolean distinct, SortOrder sort) {
this.session = session;
this.distinct = distinct;
this.sort = sort;
this.columnCount = expressions.length;
Schema schema = session.getDatabase().getSchema(Constants.SCHEMA_MAIN);
CreateTableData data = new CreateTableData();
boolean containsLob = false;
for (int i = 0; i < expressions.length; i++) {
TypeInfo type = expressions[i].getType();
Column col = new Column(COLUMN_NAME + i, type);
if (DataType.isLargeObject(type.getValueType())) {
containsLob = true;
}
data.columns.add(col);
}
data.id = session.getDatabase().allocateObjectId();
data.tableName = "TEMP_RESULT_SET_" + data.id;
data.temporary = true;
data.persistIndexes = false;
data.persistData = true;
data.create = true;
data.session = session;
table = schema.createTable(data);
parent = null;
if (containsLob) {
// contains BLOB or CLOB: cannot truncate on close,
// otherwise the BLOB and CLOB entries are removed
tempFileDeleter = null;
closeable = null;
fileRef = null;
} else {
tempFileDeleter = session.getDatabase().getTempFileDeleter();
closeable = new CloseImpl(session, table);
fileRef = tempFileDeleter.addFile(closeable, this);
}
if (sort != null || distinct) {
IndexColumn[] indexCols;
if (sort != null) {
int[] colIndex = sort.getQueryColumnIndexes();
int len = colIndex.length;
if (distinct) {
BitSet used = new BitSet();
indexCols = new IndexColumn[columnCount];
for (int i = 0; i < len; i++) {
int idx = colIndex[i];
used.set(idx);
IndexColumn indexColumn = createIndexColumn(idx);
indexColumn.sortType = sort.getSortTypes()[i];
indexCols[i] = indexColumn;
}
int idx = 0;
for (int i = len; i < columnCount; i++) {
idx = used.nextClearBit(idx);
indexCols[i] = createIndexColumn(idx);
idx++;
}
} else {
indexCols = new IndexColumn[len];
for (int i = 0; i < len; i++) {
IndexColumn indexColumn = createIndexColumn(colIndex[i]);
indexColumn.sortType = sort.getSortTypes()[i];
indexCols[i] = indexColumn;
}
}
} else {
indexCols = new IndexColumn[columnCount];
for (int i = 0; i < columnCount; i++) {
indexCols[i] = createIndexColumn(i);
}
}
String indexName = table.getSchema().getUniqueIndexName(session, table, Constants.PREFIX_INDEX);
int indexId = session.getDatabase().allocateObjectId();
IndexType indexType = IndexType.createNonUnique(true);
index = table.addIndex(session, indexName, indexId, indexCols, indexType, true, null);
if (closeable != null) {
closeable.index = index;
}
}
}
private ResultTempTable(ResultTempTable parent) {
this.parent = parent;
this.columnCount = parent.columnCount;
this.distinct = parent.distinct;
this.session = parent.session;
this.table = parent.table;
this.rowCount = parent.rowCount;
this.sort = parent.sort;
this.tempFileDeleter = null;
this.closeable = null;
this.fileRef = null;
}
private Index getIndex() {
if (parent != null) {
return parent.getIndex();
}
return index;
}
private IndexColumn createIndexColumn(int index) {
IndexColumn indexColumn = new IndexColumn();
indexColumn.column = table.getColumn(index);
indexColumn.columnName = COLUMN_NAME + index;
return indexColumn;
}
@Override
public synchronized ResultExternal createShallowCopy() {
if (parent != null) {
return parent.createShallowCopy();
}
if (closed) {
return null;
}
childCount++;
return new ResultTempTable(this);
}
@Override
public int removeRow(Value[] values) {
Row row = convertToRow(values);
Cursor cursor = find(row);
if (cursor != null) {
row = cursor.get();
table.removeRow(session, row);
rowCount--;
}
return rowCount;
}
@Override
public boolean contains(Value[] values) {
return find(convertToRow(values)) != null;
}
@Override
public int addRow(Value[] values) {
Row row = convertToRow(values);
if (distinct) {
Cursor cursor = find(row);
if (cursor == null) {
table.addRow(session, row);
rowCount++;
}
} else {
table.addRow(session, row);
rowCount++;
}
return rowCount;
}
@Override
public int addRows(Collection<Value[]> rows) {
for (Value[] values : rows) {
addRow(values);
}
return rowCount;
}
private synchronized void closeChild() {
if (--childCount == 0 && closed) {
delete();
}
}
@Override
public synchronized void close() {
if (closed) {
return;
}
closed = true;
if (parent != null) {
parent.closeChild();
} else {
if (childCount == 0) {
delete();
}
}
}
private void delete() {
if (tempFileDeleter != null) {
tempFileDeleter.deleteFile(fileRef, closeable);
}
}
@Override
public Value[] next() {
if (resultCursor == null) {
Index idx;
if (distinct || sort != null) {
idx = getIndex();
} else {
idx = table.getScanIndex(session);
}
resultCursor = idx.find(session, null, null);
}
if (!resultCursor.next()) {
return null;
}
Row row = resultCursor.get();
return row.getValueList();
}
@Override
public void reset() {
resultCursor = null;
}
private Row convertToRow(Value[] values) {
if (values.length < columnCount) {
Value[] v2 = Arrays.copyOf(values, columnCount);
for (int i = values.length; i < columnCount; i++) {
v2[i] = ValueNull.INSTANCE;
}
values = v2;
}
return session.createRow(values, Row.MEMORY_CALCULATE);
}
private Cursor find(Row row) {
Index index = getIndex();
Cursor cursor = index.find(session, row, row);
while (cursor.next()) {
SearchRow found = cursor.getSearchRow();
boolean ok = true;
Database db = session.getDatabase();
for (int i = 0; i < row.getColumnCount(); i++) {
if (!db.areEqual(row.getValue(i), found.getValue(i))) {
ok = false;
break;
}
}
if (ok) {
return cursor;
}
}
return null;
}
}
...@@ -200,6 +200,7 @@ import org.h2.test.unit.TestJmx; ...@@ -200,6 +200,7 @@ import org.h2.test.unit.TestJmx;
import org.h2.test.unit.TestKeywords; import org.h2.test.unit.TestKeywords;
import org.h2.test.unit.TestLocalResultFactory; import org.h2.test.unit.TestLocalResultFactory;
import org.h2.test.unit.TestLocale; import org.h2.test.unit.TestLocale;
import org.h2.test.unit.TestMVTempResult;
import org.h2.test.unit.TestMathUtils; import org.h2.test.unit.TestMathUtils;
import org.h2.test.unit.TestMemoryUnmapper; import org.h2.test.unit.TestMemoryUnmapper;
import org.h2.test.unit.TestMode; import org.h2.test.unit.TestMode;
...@@ -960,6 +961,7 @@ kill -9 `jps -l | grep "org.h2.test." | cut -d " " -f 1` ...@@ -960,6 +961,7 @@ kill -9 `jps -l | grep "org.h2.test." | cut -d " " -f 1`
addTest(new TestSpinLock()); addTest(new TestSpinLock());
addTest(new TestStreamStore()); addTest(new TestStreamStore());
addTest(new TestTransactionStore()); addTest(new TestTransactionStore());
addTest(new TestMVTempResult());
// unit // unit
addTest(new TestAnsCompression()); addTest(new TestAnsCompression());
......
/*
* Copyright 2004-2019 H2 Group. Multiple-Licensed under the MPL 2.0,
* and the EPL 1.0 (http://h2database.com/html/license.html).
* Initial Developer: H2 Group
*/
package org.h2.test.unit;
import java.lang.ProcessBuilder.Redirect;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import org.h2.test.TestBase;
import org.h2.tools.DeleteDbFiles;
/**
* Tests that MVTempResult implementations do not produce OOME.
*/
public class TestMVTempResult extends TestBase {
private static final int MEMORY = 128;
private static final int ROWS = 1_000_000;
/**
* May be used to run only this test and may be launched by this test in a
* subprocess.
*
* @param a
* if empty run this test, if not empty run the subprocess
*/
public static void main(String... a) throws Exception {
TestMVTempResult test = (TestMVTempResult) TestBase.createCaller().init();
if (a.length == 0) {
test.test();
} else {
test.runTest();
}
}
@Override
public void test() throws Exception {
ProcessBuilder pb = new ProcessBuilder().redirectError(Redirect.INHERIT);
pb.command(getJVM(), "-Xmx" + MEMORY + "M", "-cp", getClassPath(), "-ea", getClass().getName(), "dummy");
assertEquals(0, pb.start().waitFor());
}
private void runTest() throws SQLException {
String dir = getBaseDir();
String name = "testResultExternal";
DeleteDbFiles.execute(dir, name, true);
try (Connection c = DriverManager.getConnection("jdbc:h2:" + dir + '/' + name)) {
Statement s = c.createStatement();
try (ResultSet rs = s.executeQuery("SELECT X, RAND() R FROM SYSTEM_RANGE(1, " + ROWS + ") ORDER BY R")) {
for (int i = 1; i <= ROWS; i++) {
assertTrue(rs.next());
}
}
try (ResultSet rs = s.executeQuery("SELECT X, RAND() FROM SYSTEM_RANGE(1, " + ROWS + ')')) {
for (int i = 1; i <= ROWS; i++) {
assertTrue(rs.next());
assertEquals(i, rs.getLong(1));
}
}
}
DeleteDbFiles.execute(dir, name, true);
}
}
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论