提交 57c231c1 authored 作者: Thomas Mueller's avatar Thomas Mueller

Issue 265: Linked tables: auto-reconnect if the backside connection is lost…

Issue 265: Linked tables: auto-reconnect if the backside connection is lost (workaround for the MySQL problem that disconnects after 8 hours of inactivity).
上级 c0fc2fd5
......@@ -8,6 +8,7 @@ package org.h2.index;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.util.ArrayList;
import org.h2.engine.Constants;
import org.h2.engine.Session;
import org.h2.message.DbException;
......@@ -16,6 +17,7 @@ import org.h2.result.SearchRow;
import org.h2.table.Column;
import org.h2.table.IndexColumn;
import org.h2.table.TableLink;
import org.h2.util.New;
import org.h2.util.StatementBuilder;
import org.h2.value.Value;
import org.h2.value.ValueNull;
......@@ -49,6 +51,7 @@ public class LinkedIndex extends BaseIndex {
}
public void add(Session session, Row row) {
ArrayList<Value> params = New.arrayList();
StatementBuilder buff = new StatementBuilder("INSERT INTO ");
buff.append(targetTableName).append(" VALUES(");
for (int i = 0; i < row.getColumnCount(); i++) {
......@@ -60,29 +63,21 @@ public class LinkedIndex extends BaseIndex {
buff.append("NULL");
} else {
buff.append('?');
params.add(v);
}
}
buff.append(')');
String sql = buff.toString();
synchronized (link.getConnection()) {
try {
PreparedStatement prep = link.getPreparedStatement(sql, false);
for (int i = 0, j = 0; i < row.getColumnCount(); i++) {
Value v = row.getValue(i);
if (v != null && v != ValueNull.INSTANCE) {
v.set(prep, j + 1);
j++;
}
}
prep.executeUpdate();
rowCount++;
} catch (Exception e) {
throw TableLink.wrapException(sql, e);
}
try {
link.execute(sql, params, true);
rowCount++;
} catch (Exception e) {
throw TableLink.wrapException(sql, e);
}
}
public Cursor find(Session session, SearchRow first, SearchRow last) {
ArrayList<Value> params = New.arrayList();
StatementBuilder buff = new StatementBuilder("SELECT * FROM ");
buff.append(targetTableName).append(" T");
for (int i = 0; first != null && i < first.getColumnCount(); i++) {
......@@ -97,6 +92,7 @@ public class LinkedIndex extends BaseIndex {
} else {
buff.append(">=");
addParameter(buff, col);
params.add(v);
}
}
}
......@@ -112,33 +108,17 @@ public class LinkedIndex extends BaseIndex {
} else {
buff.append("<=");
addParameter(buff, col);
params.add(v);
}
}
}
String sql = buff.toString();
synchronized (link.getConnection()) {
try {
PreparedStatement prep = link.getPreparedStatement(sql, true);
int j = 0;
for (int i = 0; first != null && i < first.getColumnCount(); i++) {
Value v = first.getValue(i);
if (v != null && v != ValueNull.INSTANCE) {
v.set(prep, j + 1);
j++;
}
}
for (int i = 0; last != null && i < last.getColumnCount(); i++) {
Value v = last.getValue(i);
if (v != null && v != ValueNull.INSTANCE) {
v.set(prep, j + 1);
j++;
}
}
ResultSet rs = prep.executeQuery();
return new LinkedCursor(link, rs, session, sql, prep);
} catch (Exception e) {
throw TableLink.wrapException(sql, e);
}
try {
PreparedStatement prep = link.execute(sql, params, false);
ResultSet rs = prep.getResultSet();
return new LinkedCursor(link, rs, session, sql, prep);
} catch (Exception e) {
throw TableLink.wrapException(sql, e);
}
}
......@@ -185,6 +165,7 @@ public class LinkedIndex extends BaseIndex {
}
public void remove(Session session, Row row) {
ArrayList<Value> params = New.arrayList();
StatementBuilder buff = new StatementBuilder("DELETE FROM ");
buff.append(targetTableName).append(" WHERE ");
for (int i = 0; i < row.getColumnCount(); i++) {
......@@ -197,25 +178,18 @@ public class LinkedIndex extends BaseIndex {
} else {
buff.append('=');
addParameter(buff, col);
params.add(v);
buff.append(' ');
}
}
String sql = buff.toString();
synchronized (link.getConnection()) {
try {
PreparedStatement prep = link.getPreparedStatement(sql, false);
for (int i = 0, j = 0; i < row.getColumnCount(); i++) {
Value v = row.getValue(i);
if (!isNull(v)) {
v.set(prep, j + 1);
j++;
}
}
int count = prep.executeUpdate();
rowCount -= count;
} catch (Exception e) {
throw TableLink.wrapException(sql, e);
}
try {
PreparedStatement prep = link.execute(sql, params, false);
int count = prep.executeUpdate();
link.reusePreparedStatement(prep, sql);
rowCount -= count;
} catch (Exception e) {
throw TableLink.wrapException(sql, e);
}
}
......@@ -227,6 +201,7 @@ public class LinkedIndex extends BaseIndex {
* @param newRow the new data
*/
public void update(Row oldRow, Row newRow) {
ArrayList<Value> params = New.arrayList();
StatementBuilder buff = new StatementBuilder("UPDATE ");
buff.append(targetTableName).append(" SET ");
for (int i = 0; i < newRow.getColumnCount(); i++) {
......@@ -237,6 +212,7 @@ public class LinkedIndex extends BaseIndex {
buff.append("DEFAULT");
} else {
buff.append('?');
params.add(v);
}
}
buff.append(" WHERE ");
......@@ -250,34 +226,15 @@ public class LinkedIndex extends BaseIndex {
buff.append(" IS NULL");
} else {
buff.append('=');
params.add(v);
addParameter(buff, col);
}
}
String sql = buff.toString();
synchronized (link.getConnection()) {
try {
int j = 1;
PreparedStatement prep = link.getPreparedStatement(sql, false);
for (int i = 0; i < newRow.getColumnCount(); i++) {
Value v = newRow.getValue(i);
if (v != null) {
v.set(prep, j);
j++;
}
}
for (int i = 0; i < oldRow.getColumnCount(); i++) {
Value v = oldRow.getValue(i);
if (!isNull(v)) {
v.set(prep, j);
j++;
}
}
int count = prep.executeUpdate();
// this has no effect but at least it allows to debug the update count
rowCount = rowCount + count - count;
} catch (Exception e) {
throw TableLink.wrapException(sql, e);
}
try {
link.execute(sql, params, true);
} catch (Exception e) {
throw TableLink.wrapException(sql, e);
}
}
......
......@@ -30,6 +30,7 @@ import org.h2.schema.Schema;
import org.h2.util.JdbcUtils;
import org.h2.util.MathUtils;
import org.h2.util.New;
import org.h2.util.StatementBuilder;
import org.h2.util.StringUtils;
import org.h2.value.DataType;
import org.h2.value.Value;
......@@ -43,6 +44,8 @@ import org.h2.value.ValueTimestamp;
*/
public class TableLink extends Table {
private static final int MAX_RETRY = 2;
private static final long ROW_COUNT_APPROXIMATION = 100000;
private String driver, url, user, password, originalSchema, originalTable, qualifiedTableName;
......@@ -72,7 +75,6 @@ public class TableLink extends Table {
try {
connect();
} catch (DbException e) {
connectException = e;
if (!force) {
throw e;
}
......@@ -84,15 +86,26 @@ public class TableLink extends Table {
}
private void connect() {
conn = database.getLinkConnection(driver, url, user, password);
synchronized (conn) {
connectException = null;
for (int retry = 0;; retry++) {
try {
readMetaData();
} catch (Exception e) {
// could be SQLException or RuntimeException
conn.close();
conn = null;
throw DbException.convert(e);
conn = database.getLinkConnection(driver, url, user, password);
synchronized (conn) {
try {
readMetaData();
return;
} catch (Exception e) {
// could be SQLException or RuntimeException
conn.close(true);
conn = null;
throw DbException.convert(e);
}
}
} catch (DbException e) {
if (retry >= MAX_RETRY) {
connectException = e;
throw e;
}
}
}
}
......@@ -395,7 +408,7 @@ public class TableLink extends Table {
public void close(Session session) {
if (conn != null) {
try {
conn.close();
conn.close(false);
} finally {
conn = null;
}
......@@ -405,11 +418,12 @@ public class TableLink extends Table {
public synchronized long getRowCount(Session session) {
String sql = "SELECT COUNT(*) FROM " + qualifiedTableName;
try {
PreparedStatement prep = getPreparedStatement(sql, false);
ResultSet rs = prep.executeQuery();
PreparedStatement prep = execute(sql, null, false);
ResultSet rs = prep.getResultSet();
rs.next();
long count = rs.getLong(1);
rs.close();
reusePreparedStatement(prep, sql);
return count;
} catch (Exception e) {
throw wrapException(sql, e);
......@@ -433,33 +447,60 @@ public class TableLink extends Table {
}
/**
* Get a prepared statement object for the given statement. Prepared
* Execute a SQL statement using the given parameters. Prepared
* statements are kept in a hash map to avoid re-creating them.
*
* @param sql the SQL statement
* @param exclusive if the prepared statement must be removed from the map
* until reusePreparedStatement is called (only required for queries)
* @return the prepared statement
* @param params the parameters or null
* @param reusePrepared if the prepared statement can be re-used immediately
* @return the prepared statement, or null if it is re-used
*/
public PreparedStatement getPreparedStatement(String sql, boolean exclusive) {
if (trace.isDebugEnabled()) {
trace.debug("{0} :\n{1}", getName(), sql);
public PreparedStatement execute(String sql, ArrayList<Value> params, boolean reusePrepared) {
if (conn == null) {
throw connectException;
}
try {
if (conn == null) {
throw connectException;
}
PreparedStatement prep = preparedMap.get(sql);
if (prep == null) {
prep = conn.getConnection().prepareStatement(sql);
preparedMap.put(sql, prep);
}
if (exclusive) {
preparedMap.remove(sql);
for (int retry = 0;; retry++) {
try {
synchronized (conn) {
PreparedStatement prep = preparedMap.remove(sql);
if (prep == null) {
prep = conn.getConnection().prepareStatement(sql);
}
if (trace.isDebugEnabled()) {
StatementBuilder buff = new StatementBuilder();
buff.append(getName()).append(":\n").append(sql);
if (params != null && params.size() > 0) {
buff.append(" {");
int i = 1;
for (Value v : params) {
buff.appendExceptFirst(", ");
buff.append(i++).append(": ").append(v.getSQL());
}
buff.append('}');
}
buff.append(';');
trace.debug(buff.toString());
}
if (params != null) {
for (int i = 0, size = params.size(); i < size; i++) {
Value v = params.get(i);
v.set(prep, i + 1);
}
}
prep.execute();
if (reusePrepared) {
reusePreparedStatement(prep, sql);
return null;
}
return prep;
}
} catch (SQLException e) {
if (retry >= MAX_RETRY) {
throw DbException.convert(e);
}
conn.close(true);
connect();
}
return prep;
} catch (SQLException e) {
throw DbException.convert(e);
}
}
......@@ -552,10 +593,6 @@ public class TableLink extends Table {
this.readOnly = readOnly;
}
public TableLinkConnection getConnection() {
return conn;
}
public long getRowCountApproximation() {
return ROW_COUNT_APPROXIMATION;
}
......@@ -567,7 +604,9 @@ public class TableLink extends Table {
* @param sql the SQL statement
*/
public void reusePreparedStatement(PreparedStatement prep, String sql) {
preparedMap.put(sql, prep);
synchronized (conn) {
preparedMap.put(sql, prep);
}
}
public boolean isDeterministic() {
......
......@@ -124,11 +124,13 @@ public class TableLinkConnection {
/**
* Closes the connection if this is the last link to it.
*
* @param force if the connection needs to be closed even if it is still
* used elsewhere (for example, because the connection is broken)
*/
synchronized void close() {
if (--useCounter <= 0) {
synchronized void close(boolean force) {
if (--useCounter <= 0 || force) {
JdbcUtils.closeSilently(conn);
conn = null;
synchronized (map) {
map.remove(this);
}
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论