提交 3e764ba6 authored 作者: Andrei Tokar's avatar Andrei Tokar

Postpone session.endStatement() until after commit.

Exception handling: preserve original exception.
Minor re-factoring
上级 2867ae7b
...@@ -151,20 +151,15 @@ public abstract class Command implements CommandInterface { ...@@ -151,20 +151,15 @@ public abstract class Command implements CommandInterface {
@Override @Override
public void stop() { public void stop() {
session.endStatement();
session.setCurrentCommand(null, false); session.setCurrentCommand(null, false);
if (!isTransactional()) { if (!isTransactional()) {
session.commit(true); session.commit(true);
} else if (session.getAutoCommit()) { } else if (session.getAutoCommit()) {
session.commit(false); session.commit(false);
} else if (session.getDatabase().isMultiThreaded()) { } else {
Database db = session.getDatabase(); session.unlockReadLocks();
if (db != null) {
if (db.getLockMode() == Constants.LOCK_MODE_READ_COMMITTED) {
session.unlockReadLocks();
}
}
} }
session.endStatement();
if (trace.isInfoEnabled() && startTimeNanos > 0) { if (trace.isInfoEnabled() && startTimeNanos > 0) {
long timeMillis = (System.nanoTime() - startTimeNanos) / 1000 / 1000; long timeMillis = (System.nanoTime() - startTimeNanos) / 1000 / 1000;
if (timeMillis > Constants.SLOW_QUERY_LIMIT_MS) { if (timeMillis > Constants.SLOW_QUERY_LIMIT_MS) {
...@@ -260,6 +255,7 @@ public abstract class Command implements CommandInterface { ...@@ -260,6 +255,7 @@ public abstract class Command implements CommandInterface {
Session.Savepoint rollback = session.setSavepoint(); Session.Savepoint rollback = session.setSavepoint();
session.startStatementWithinTransaction(); session.startStatementWithinTransaction();
session.setCurrentCommand(this, generatedKeysRequest); session.setCurrentCommand(this, generatedKeysRequest);
DbException ex = null;
try { try {
while (true) { while (true) {
database.checkPowerOff(); database.checkPowerOff();
...@@ -289,18 +285,29 @@ public abstract class Command implements CommandInterface { ...@@ -289,18 +285,29 @@ public abstract class Command implements CommandInterface {
database.shutdownImmediately(); database.shutdownImmediately();
throw e; throw e;
} }
database.checkPowerOff(); try {
if (s.getErrorCode() == ErrorCode.DEADLOCK_1) { database.checkPowerOff();
session.rollback(); if (s.getErrorCode() == ErrorCode.DEADLOCK_1) {
} else { session.rollback();
session.rollbackTo(rollback, false); } else {
session.rollbackTo(rollback, false);
}
} catch (Throwable nested) {
e.addSuppressed(nested);
} }
ex = e;
throw e; throw e;
} finally { } finally {
try { try {
if (callStop) { if (callStop) {
stop(); stop();
} }
} catch (Throwable nested) {
if (ex == null) {
throw nested;
} else {
ex.addSuppressed(nested);
}
} finally { } finally {
if (writing) { if (writing) {
database.afterWriting(); database.afterWriting();
......
...@@ -943,19 +943,14 @@ public class Session extends SessionWithState implements TransactionStore.Rollba ...@@ -943,19 +943,14 @@ public class Session extends SessionWithState implements TransactionStore.Rollba
* READ_COMMITTED. * READ_COMMITTED.
*/ */
public void unlockReadLocks() { public void unlockReadLocks() {
if (database.isMVStore()) { if (!database.isMVStore() && database.isMultiThreaded() &&
// MVCC: keep shared locks (insert / update / delete) database.getLockMode() == Constants.LOCK_MODE_READ_COMMITTED) {
return; for (Iterator<Table> iter = locks.iterator(); iter.hasNext(); ) {
} Table t = iter.next();
// locks is modified in the loop if (!t.isLockedExclusively()) {
for (int i = 0; i < locks.size(); i++) {
Table t = locks.get(i);
if (!t.isLockedExclusively()) {
synchronized (database) {
t.unlock(this); t.unlock(this);
locks.remove(i); iter.remove();
} }
i--;
} }
} }
} }
......
...@@ -33,6 +33,10 @@ public class DbException extends RuntimeException { ...@@ -33,6 +33,10 @@ public class DbException extends RuntimeException {
private static final Properties MESSAGES = new Properties(); private static final Properties MESSAGES = new Properties();
public static final SQLException SQL_OOME =
new SQLException("OutOfMemoryError", "HY000", ErrorCode.OUT_OF_MEMORY, new OutOfMemoryError());
private static final DbException OOME = new DbException(SQL_OOME);
private Object source; private Object source;
static { static {
...@@ -288,22 +292,32 @@ public class DbException extends RuntimeException { ...@@ -288,22 +292,32 @@ public class DbException extends RuntimeException {
* @return the exception object * @return the exception object
*/ */
public static DbException convert(Throwable e) { public static DbException convert(Throwable e) {
if (e instanceof DbException) { try {
return (DbException) e; if (e instanceof DbException) {
} else if (e instanceof SQLException) { return (DbException) e;
return new DbException((SQLException) e); } else if (e instanceof SQLException) {
} else if (e instanceof InvocationTargetException) { return new DbException((SQLException) e);
return convertInvocation((InvocationTargetException) e, null); } else if (e instanceof InvocationTargetException) {
} else if (e instanceof IOException) { return convertInvocation((InvocationTargetException) e, null);
return get(ErrorCode.IO_EXCEPTION_1, e, e.toString()); } else if (e instanceof IOException) {
} else if (e instanceof OutOfMemoryError) { return get(ErrorCode.IO_EXCEPTION_1, e, e.toString());
return get(ErrorCode.OUT_OF_MEMORY, e); } else if (e instanceof OutOfMemoryError) {
} else if (e instanceof StackOverflowError || e instanceof LinkageError) { return get(ErrorCode.OUT_OF_MEMORY, e);
} else if (e instanceof StackOverflowError || e instanceof LinkageError) {
return get(ErrorCode.GENERAL_ERROR_1, e, e.toString());
} else if (e instanceof Error) {
throw (Error) e;
}
return get(ErrorCode.GENERAL_ERROR_1, e, e.toString()); return get(ErrorCode.GENERAL_ERROR_1, e, e.toString());
} else if (e instanceof Error) { } catch (Throwable ex) {
throw (Error) e; try {
DbException dbException = new DbException(new SQLException("GeneralError", "HY000", ErrorCode.GENERAL_ERROR_1, e));
dbException.addSuppressed(ex);
return dbException;
} catch (OutOfMemoryError ignore) {
return OOME;
}
} }
return get(ErrorCode.GENERAL_ERROR_1, e, e.toString());
} }
/** /**
......
...@@ -10,6 +10,7 @@ import java.sql.SQLException; ...@@ -10,6 +10,7 @@ import java.sql.SQLException;
import java.util.Map; import java.util.Map;
import java.util.concurrent.atomic.AtomicIntegerArray; import java.util.concurrent.atomic.AtomicIntegerArray;
import org.h2.api.ErrorCode;
import org.h2.util.StringUtils; import org.h2.util.StringUtils;
/** /**
...@@ -367,7 +368,11 @@ public class TraceObject { ...@@ -367,7 +368,11 @@ public class TraceObject {
} }
} catch(Throwable ignore) { } catch(Throwable ignore) {
if (e == null) { if (e == null) {
e = new SQLException("", "HY000", ex); try {
e = new SQLException("GeneralError", "HY000", ErrorCode.GENERAL_ERROR_1, ex);
} catch (OutOfMemoryError ignored) {
return DbException.SQL_OOME;
}
} }
e.addSuppressed(ignore); e.addSuppressed(ignore);
} }
......
...@@ -58,7 +58,7 @@ public class MVPrimaryIndex extends BaseIndex { ...@@ -58,7 +58,7 @@ public class MVPrimaryIndex extends BaseIndex {
Transaction t = mvTable.getTransactionBegin(); Transaction t = mvTable.getTransactionBegin();
dataMap = t.openMap(mapName, keyType, valueType); dataMap = t.openMap(mapName, keyType, valueType);
t.commit(); t.commit();
if (!table.isPersistData()) { if (!table.isPersistData() || !indexType.isPersistent()) {
dataMap.map.setVolatile(true); dataMap.map.setVolatile(true);
} }
Value k = dataMap.map.lastKey(); // include uncommitted keys as well Value k = dataMap.map.lastKey(); // include uncommitted keys as well
...@@ -113,7 +113,8 @@ public class MVPrimaryIndex extends BaseIndex { ...@@ -113,7 +113,8 @@ public class MVPrimaryIndex extends BaseIndex {
} }
TransactionMap<Value, Value> map = getMap(session); TransactionMap<Value, Value> map = getMap(session);
Value key = ValueLong.get(row.getKey()); long rowKey = row.getKey();
Value key = ValueLong.get(rowKey);
try { try {
Value oldValue = map.putIfAbsent(key, ValueArray.get(row.getValueList())); Value oldValue = map.putIfAbsent(key, ValueArray.get(row.getValueList()));
if (oldValue != null) { if (oldValue != null) {
...@@ -135,8 +136,9 @@ public class MVPrimaryIndex extends BaseIndex { ...@@ -135,8 +136,9 @@ public class MVPrimaryIndex extends BaseIndex {
} }
// because it's possible to directly update the key using the _rowid_ // because it's possible to directly update the key using the _rowid_
// syntax // syntax
if (row.getKey() > lastKey.get()) { long last;
lastKey.set(row.getKey()); while (rowKey > (last = lastKey.get())) {
if(lastKey.compareAndSet(last, rowKey)) break;
} }
} }
...@@ -223,33 +225,27 @@ public class MVPrimaryIndex extends BaseIndex { ...@@ -223,33 +225,27 @@ public class MVPrimaryIndex extends BaseIndex {
@Override @Override
public Cursor find(Session session, SearchRow first, SearchRow last) { public Cursor find(Session session, SearchRow first, SearchRow last) {
ValueLong min, max; ValueLong min = extractPKFromRow(first, ValueLong.MIN);
if (first == null) { ValueLong max = extractPKFromRow(last, ValueLong.MAX);
min = ValueLong.MIN; TransactionMap<Value, Value> map = getMap(session);
} else if (mainIndexColumn < 0) { return new MVStoreCursor(session, map.entryIterator(min, max));
min = ValueLong.get(first.getKey()); }
} else {
ValueLong v = (ValueLong) first.getValue(mainIndexColumn); private ValueLong extractPKFromRow(SearchRow row, ValueLong defaultValue) {
if (v == null) { ValueLong result;
min = ValueLong.get(first.getKey()); if (row == null) {
} else { result = defaultValue;
min = v; } else if (mainIndexColumn == SearchRow.ROWID_INDEX) {
} result = ValueLong.get(row.getKey());
}
if (last == null) {
max = ValueLong.MAX;
} else if (mainIndexColumn < 0) {
max = ValueLong.get(last.getKey());
} else { } else {
ValueLong v = (ValueLong) last.getValue(mainIndexColumn); ValueLong v = (ValueLong) row.getValue(mainIndexColumn);
if (v == null) { if (v == null) {
max = ValueLong.get(last.getKey()); result = ValueLong.get(row.getKey());
} else { } else {
max = v; result = v;
} }
} }
TransactionMap<Value, Value> map = getMap(session); return result;
return new MVStoreCursor(session, map.entryIterator(min, max));
} }
@Override @Override
......
...@@ -421,9 +421,8 @@ public final class MVSecondaryIndex extends BaseIndex implements MVIndex { ...@@ -421,9 +421,8 @@ public final class MVSecondaryIndex extends BaseIndex implements MVIndex {
} }
key = first ? map.higherKey(key) : map.lowerKey(key); key = first ? map.higherKey(key) : map.lowerKey(key);
} }
ArrayList<Value> list = new ArrayList<>(1); MVStoreCursor cursor = new MVStoreCursor(session,
list.add(key); Collections.singletonList(key).iterator(), null);
MVStoreCursor cursor = new MVStoreCursor(session, list.iterator(), null);
cursor.next(); cursor.next();
return cursor; return cursor;
} }
...@@ -544,7 +543,6 @@ public final class MVSecondaryIndex extends BaseIndex implements MVIndex { ...@@ -544,7 +543,6 @@ public final class MVSecondaryIndex extends BaseIndex implements MVIndex {
public boolean previous() { public boolean previous() {
throw DbException.getUnsupportedException("previous"); throw DbException.getUnsupportedException("previous");
} }
} }
} }
...@@ -427,8 +427,9 @@ public class MVTable extends TableBase { ...@@ -427,8 +427,9 @@ public class MVTable extends TableBase {
@Override @Override
public void unlock(Session s) { public void unlock(Session s) {
if (database != null) { if (database != null) {
traceLock(s, lockExclusiveSession == s, TraceLockEvent.TRACE_LOCK_UNLOCK, NO_EXTRA_INFO); boolean wasLocked = lockExclusiveSession == s;
if (lockExclusiveSession == s) { traceLock(s, wasLocked, TraceLockEvent.TRACE_LOCK_UNLOCK, NO_EXTRA_INFO);
if (wasLocked) {
lockSharedSessions.remove(s); lockSharedSessions.remove(s);
lockExclusiveSession = null; lockExclusiveSession = null;
if (SysProperties.THREAD_DEADLOCK_DETECTOR) { if (SysProperties.THREAD_DEADLOCK_DETECTOR) {
...@@ -436,18 +437,18 @@ public class MVTable extends TableBase { ...@@ -436,18 +437,18 @@ public class MVTable extends TableBase {
EXCLUSIVE_LOCKS.get().remove(getName()); EXCLUSIVE_LOCKS.get().remove(getName());
} }
} }
} } else {
synchronized (getLockSyncObject()) { wasLocked = lockSharedSessions.remove(s) != null;
if (!lockSharedSessions.isEmpty()) { if (SysProperties.THREAD_DEADLOCK_DETECTOR) {
lockSharedSessions.remove(s); if (SHARED_LOCKS.get() != null) {
if (SysProperties.THREAD_DEADLOCK_DETECTOR) { SHARED_LOCKS.get().remove(getName());
if (SHARED_LOCKS.get() != null) {
SHARED_LOCKS.get().remove(getName());
}
} }
} }
if (!waitingSessions.isEmpty()) { }
getLockSyncObject().notifyAll(); if (wasLocked && !waitingSessions.isEmpty()) {
Object lockSyncObject = getLockSyncObject();
synchronized (lockSyncObject) {
lockSyncObject.notifyAll();
} }
} }
} }
......
...@@ -62,14 +62,17 @@ public class TestMemoryUsage extends TestBase { ...@@ -62,14 +62,17 @@ public class TestMemoryUsage extends TestBase {
} }
deleteDb("memoryUsage"); deleteDb("memoryUsage");
conn = getConnection("memoryUsage"); conn = getConnection("memoryUsage");
eatMemory(4000); try {
for (int i = 0; i < 4000; i++) { eatMemory(4000);
Connection c2 = getConnection("memoryUsage"); for (int i = 0; i < 4000; i++) {
c2.createStatement(); Connection c2 = getConnection("memoryUsage");
c2.close(); c2.createStatement();
c2.close();
}
} finally {
freeMemory();
conn.close();
} }
freeMemory();
conn.close();
} }
private void testCreateDropLoop() throws SQLException { private void testCreateDropLoop() throws SQLException {
...@@ -140,8 +143,8 @@ public class TestMemoryUsage extends TestBase { ...@@ -140,8 +143,8 @@ public class TestMemoryUsage extends TestBase {
} }
} }
} finally { } finally {
conn.close();
freeMemory(); freeMemory();
conn.close();
} }
} }
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论