Commit 36c4613d authored by tledkov-gridgain

Merge branch 'master' into localresult-interface

@@ -21,6 +21,16 @@ Change Log
 <h2>Next Version (unreleased)</h2>
 <ul>
+<li>Issue #1421: Remove old-style outer join
+</li>
+<li>PR #1419: Assorted minor changes
+</li>
+<li>PR #1414: DEFRAG and COMPACT mixup
+</li>
+<li>PR #1413: improvements to MVStore garbage collection
+</li>
+<li>PR #1412: Added org.h2.store.fs package to exported osgi bundles
+</li>
 <li>PR #1409: Map all remaining error codes to custom exception classes
 </li>
 <li>Issue #1407: Add a MODE() aggregate function
...
@@ -141,7 +141,6 @@ import org.h2.engine.Mode.ModeEnum;
 import org.h2.engine.Procedure;
 import org.h2.engine.Right;
 import org.h2.engine.Session;
-import org.h2.engine.SysProperties;
 import org.h2.engine.User;
 import org.h2.engine.UserAggregate;
 import org.h2.engine.UserDataType;
@@ -2809,41 +2808,7 @@ public class Parser {
                 }
                 read(CLOSE_PAREN);
             } else {
-                Expression right = readConcat();
-                if (SysProperties.OLD_STYLE_OUTER_JOIN &&
-                        readIf(OPEN_PAREN) && readIf(PLUS_SIGN) && readIf(CLOSE_PAREN)) {
-                    // support for a subset of old-fashioned Oracle outer
-                    // join with (+)
-                    if (r instanceof ExpressionColumn &&
-                            right instanceof ExpressionColumn) {
-                        ExpressionColumn leftCol = (ExpressionColumn) r;
-                        ExpressionColumn rightCol = (ExpressionColumn) right;
-                        ArrayList<TableFilter> filters = currentSelect
-                                .getTopFilters();
-                        for (TableFilter f : filters) {
-                            while (f != null) {
-                                leftCol.mapColumns(f, 0);
-                                rightCol.mapColumns(f, 0);
-                                f = f.getJoin();
-                            }
-                        }
-                        TableFilter leftFilter = leftCol.getTableFilter();
-                        TableFilter rightFilter = rightCol.getTableFilter();
-                        r = new Comparison(session, compareType, r, right);
-                        if (leftFilter != null && rightFilter != null) {
-                            int idx = filters.indexOf(rightFilter);
-                            if (idx >= 0) {
-                                filters.remove(idx);
-                                leftFilter.addJoin(rightFilter, true, r);
-                            } else {
-                                rightFilter.mapAndAddFilter(r);
-                            }
-                            r = ValueExpression.get(ValueBoolean.TRUE);
-                        }
-                    }
-                } else {
-                    r = new Comparison(session, compareType, r, right);
-                }
+                r = new Comparison(session, compareType, r, readConcat());
             }
         }
         if (not) {
@@ -3313,12 +3278,7 @@
         }
         String name = readColumnIdentifier();
         Schema s = database.findSchema(objectName);
-        if ((!SysProperties.OLD_STYLE_OUTER_JOIN || s != null) && readIf(OPEN_PAREN)) {
-            // only if the token before the dot is a valid schema name,
-            // otherwise the old style Oracle outer join doesn't work:
-            // t.x = t2.x(+)
-            // this additional check is not required
-            // if the old style outer joins are not supported
+        if (readIf(OPEN_PAREN)) {
             return readFunction(s, name);
         } else if (readIf(DOT)) {
             String schema = objectName;
...
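Note on the removal above: the deleted parser branches provided limited support for the old-style Oracle outer-join notation with "(+)", previously gated behind the h2.oldStyleOuterJoin system property (its definition is removed further down). A rough illustration of the two query forms, with made-up table and column names; the exact subset the old code accepted is not restated here:

// Old-style notation the removed branch could parse when h2.oldStyleOuterJoin
// was enabled; "(+)" marks the optional (outer) side of the comparison.
String oldStyle = "SELECT p.id, o.id FROM parent p, orders o WHERE p.id = o.parent_id(+)";
// Standard join syntax, which remains supported after this change.
String standard = "SELECT p.id, o.id FROM parent p LEFT OUTER JOIN orders o ON p.id = o.parent_id";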
@@ -126,7 +126,7 @@ public final class GeneratedKeys {
      * @return local result with generated keys
      */
     public LocalResult getKeys(Session session) {
-        Database db = session == null ? null : session.getDatabase();
+        Database db = session.getDatabase();
         if (Boolean.FALSE.equals(generatedKeysRequest)) {
             clear(null);
             return LocalResultFactory.createRow(session);
...
@@ -326,14 +326,6 @@ public class SysProperties {
         }
     }
 
-    /**
-     * System property <code>h2.oldStyleOuterJoin</code>
-     * (default: false).<br />
-     * Limited support for the old-style Oracle outer join with "(+)".
-     */
-    public static final boolean OLD_STYLE_OUTER_JOIN =
-            Utils.getProperty("h2.oldStyleOuterJoin", false);
-
     /**
      * System property {@code h2.oldResultSetGetObject}, {@code true} by default
      * unless {@code h2.preview} is enabled.
...
@@ -5,15 +5,22 @@
  */
 package org.h2.mvstore;
 
+import static org.h2.engine.Constants.MEMORY_ARRAY;
+import static org.h2.engine.Constants.MEMORY_OBJECT;
+import static org.h2.engine.Constants.MEMORY_POINTER;
+import static org.h2.mvstore.DataUtils.PAGE_TYPE_LEAF;
+
 import java.nio.ByteBuffer;
+import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import org.h2.compress.Compressor;
+import org.h2.message.DbException;
 import org.h2.mvstore.type.DataType;
 import org.h2.util.Utils;
-
-import static org.h2.engine.Constants.MEMORY_ARRAY;
-import static org.h2.engine.Constants.MEMORY_OBJECT;
-import static org.h2.engine.Constants.MEMORY_POINTER;
-import static org.h2.mvstore.DataUtils.PAGE_TYPE_LEAF;
 
 /**
  * A page (a node or a leaf).
@@ -247,9 +254,9 @@ public abstract class Page implements Cloneable
      * @param maxPos the maximum position (the end of the chunk)
      * @param collector to report child pages positions to
      */
-    static void readChildrenPositions(FileStore fileStore, long pos,
-            long filePos, long maxPos,
-            MVStore.ChunkIdsCollector collector) {
+    static void readChildrenPositions(FileStore fileStore, long pos, long filePos, long maxPos,
+            final MVStore.ChunkIdsCollector collector, final ThreadPoolExecutor executorService,
+            final AtomicInteger executingThreadCounter) {
         ByteBuffer buff;
         int maxLength = DataUtils.getPageMaxLength(pos);
         if (maxLength == DataUtils.PAGE_LARGE) {
@@ -260,10 +267,8 @@ public abstract class Page implements Cloneable
         maxLength = (int) Math.min(maxPos - filePos, maxLength);
         int length = maxLength;
         if (length < 0) {
-            throw DataUtils.newIllegalStateException(
-                    DataUtils.ERROR_FILE_CORRUPT,
-                    "Illegal page length {0} reading at {1}; max pos {2} ",
-                    length, filePos, maxPos);
+            throw DataUtils.newIllegalStateException(DataUtils.ERROR_FILE_CORRUPT,
+                    "Illegal page length {0} reading at {1}; max pos {2} ", length, filePos, maxPos);
         }
         buff = fileStore.readFully(filePos, length);
         int chunkId = DataUtils.getPageChunkId(pos);
@@ -271,39 +276,70 @@ public abstract class Page implements Cloneable
         int start = buff.position();
         int pageLength = buff.getInt();
         if (pageLength > maxLength) {
-            throw DataUtils.newIllegalStateException(
-                    DataUtils.ERROR_FILE_CORRUPT,
-                    "File corrupted in chunk {0}, expected page length =< {1}, got {2}",
-                    chunkId, maxLength, pageLength);
+            throw DataUtils.newIllegalStateException(DataUtils.ERROR_FILE_CORRUPT,
+                    "File corrupted in chunk {0}, expected page length =< {1}, got {2}", chunkId, maxLength,
+                    pageLength);
         }
         buff.limit(start + pageLength);
         short check = buff.getShort();
         int m = DataUtils.readVarInt(buff);
         int mapId = collector.getMapId();
         if (m != mapId) {
-            throw DataUtils.newIllegalStateException(
-                    DataUtils.ERROR_FILE_CORRUPT,
-                    "File corrupted in chunk {0}, expected map id {1}, got {2}",
-                    chunkId, mapId, m);
+            throw DataUtils.newIllegalStateException(DataUtils.ERROR_FILE_CORRUPT,
+                    "File corrupted in chunk {0}, expected map id {1}, got {2}", chunkId, mapId, m);
         }
-        int checkTest = DataUtils.getCheckValue(chunkId)
-                ^ DataUtils.getCheckValue(offset)
+        int checkTest = DataUtils.getCheckValue(chunkId) ^ DataUtils.getCheckValue(offset)
                 ^ DataUtils.getCheckValue(pageLength);
         if (check != (short) checkTest) {
-            throw DataUtils.newIllegalStateException(
-                    DataUtils.ERROR_FILE_CORRUPT,
-                    "File corrupted in chunk {0}, expected check value {1}, got {2}",
-                    chunkId, checkTest, check);
+            throw DataUtils.newIllegalStateException(DataUtils.ERROR_FILE_CORRUPT,
+                    "File corrupted in chunk {0}, expected check value {1}, got {2}", chunkId, checkTest, check);
         }
         int len = DataUtils.readVarInt(buff);
         int type = buff.get();
         if ((type & 1) != DataUtils.PAGE_TYPE_NODE) {
-            throw DataUtils.newIllegalStateException(
-                    DataUtils.ERROR_FILE_CORRUPT,
+            throw DataUtils.newIllegalStateException(DataUtils.ERROR_FILE_CORRUPT,
                     "Position {0} expected to be a non-leaf", pos);
         }
+        /**
+         * The logic here is a little awkward. We want to (a) execute reads in parallel, but (b)
+         * limit the number of threads we create. This is complicated by (a) the algorithm is
+         * recursive and needs to wait for children before returning up the call-stack, (b) checking
+         * the size of the thread-pool is not reliable.
+         */
+        final List<Future<?>> futures = new ArrayList<>(len);
         for (int i = 0; i <= len; i++) {
-            collector.visit(buff.getLong());
+            final long childPagePos = buff.getLong();
+            for (;;) {
+                int counter = executingThreadCounter.get();
+                if (counter >= executorService.getMaximumPoolSize()) {
+                    collector.visit(childPagePos, executorService, executingThreadCounter);
+                    break;
+                } else {
+                    if (executingThreadCounter.compareAndSet(counter, counter + 1)) {
+                        Future<?> f = executorService.submit(new Runnable() {
+                            @Override
+                            public void run() {
+                                try {
+                                    collector.visit(childPagePos, executorService, executingThreadCounter);
+                                } finally {
+                                    executingThreadCounter.decrementAndGet();
+                                }
+                            }
+                        });
+                        futures.add(f);
+                        break;
+                    }
+                }
+            }
+        }
+        for (Future<?> f : futures) {
+            try {
+                f.get();
+            } catch (InterruptedException ex) {
+                throw new RuntimeException(ex);
+            } catch (ExecutionException ex) {
+                throw DbException.convert(ex);
+            }
         }
     }
...
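The block comment introduced in the hunk above explains the intent: fan child-page reads out to a thread pool, but cap the number of in-flight tasks with a shared counter and fall back to reading inline once the cap is hit, because the algorithm is recursive and each level must wait for its children before returning. A standalone sketch of that pattern follows (hypothetical class and names, not H2 API; the fixed pool of 2 threads is an arbitrary choice for the demo):

// BoundedRecursiveWalk.java - illustrative sketch, not part of the patch.
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.atomic.AtomicInteger;

public class BoundedRecursiveWalk {

    /** Hypothetical tree node, standing in for a child page reference. */
    static class Node {
        final List<Node> children = new ArrayList<>();
    }

    static void visit(final Node node, final ThreadPoolExecutor pool,
            final AtomicInteger executingThreadCounter) {
        // ...per-node work would happen here...
        List<Future<?>> futures = new ArrayList<>();
        for (final Node child : node.children) {
            for (;;) {
                int counter = executingThreadCounter.get();
                if (counter >= pool.getMaximumPoolSize()) {
                    // Cap reached: process the child inline so the recursion
                    // never blocks waiting for a thread that cannot start.
                    visit(child, pool, executingThreadCounter);
                    break;
                } else if (executingThreadCounter.compareAndSet(counter, counter + 1)) {
                    // Claimed a slot: hand the child to the pool.
                    futures.add(pool.submit(new Runnable() {
                        @Override
                        public void run() {
                            try {
                                visit(child, pool, executingThreadCounter);
                            } finally {
                                executingThreadCounter.decrementAndGet();
                            }
                        }
                    }));
                    break;
                }
                // Lost the CAS race; re-read the counter and try again.
            }
        }
        // Wait for asynchronously processed children, mirroring the recursive
        // algorithm's need to finish a subtree before returning up the stack.
        for (Future<?> f : futures) {
            try {
                f.get();
            } catch (InterruptedException | ExecutionException e) {
                throw new RuntimeException(e);
            }
        }
    }

    public static void main(String[] args) {
        Node root = new Node();
        for (int i = 0; i < 4; i++) {
            Node child = new Node();
            child.children.add(new Node());
            root.children.add(child);
        }
        ThreadPoolExecutor pool = (ThreadPoolExecutor) Executors.newFixedThreadPool(2);
        try {
            visit(root, pool, new AtomicInteger());
        } finally {
            pool.shutdown();
        }
    }
}

The inline fallback is what keeps a saturated pool from deadlocking: a task that cannot claim a slot does the child's work on the calling thread instead of queuing behind its own ancestors.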