提交 1107e4e5 authored 作者: James Pan's avatar James Pan

Improve concurrency of connection pool with wait-free implement

上级 0c46e47e
...@@ -22,8 +22,11 @@ package org.h2.jdbcx; ...@@ -22,8 +22,11 @@ package org.h2.jdbcx;
import java.io.PrintWriter; import java.io.PrintWriter;
import java.sql.Connection; import java.sql.Connection;
import java.sql.SQLException; import java.sql.SQLException;
import java.util.ArrayList; import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Logger; import java.util.logging.Logger;
import javax.sql.ConnectionEvent; import javax.sql.ConnectionEvent;
...@@ -33,7 +36,6 @@ import javax.sql.DataSource; ...@@ -33,7 +36,6 @@ import javax.sql.DataSource;
import javax.sql.PooledConnection; import javax.sql.PooledConnection;
import org.h2.message.DbException; import org.h2.message.DbException;
import org.h2.util.Utils;
/** /**
* A simple standalone JDBC connection pool. * A simple standalone JDBC connection pool.
...@@ -69,12 +71,12 @@ public class JdbcConnectionPool implements DataSource, ConnectionEventListener, ...@@ -69,12 +71,12 @@ public class JdbcConnectionPool implements DataSource, ConnectionEventListener,
private static final int DEFAULT_MAX_CONNECTIONS = 10; private static final int DEFAULT_MAX_CONNECTIONS = 10;
private final ConnectionPoolDataSource dataSource; private final ConnectionPoolDataSource dataSource;
private final ArrayList<PooledConnection> recycledConnections = Utils.newSmallArrayList(); private final Queue<PooledConnection> recycledConnections = new ConcurrentLinkedQueue<>();
private PrintWriter logWriter; private PrintWriter logWriter;
private int maxConnections = DEFAULT_MAX_CONNECTIONS; private volatile int maxConnections = DEFAULT_MAX_CONNECTIONS;
private int timeout = DEFAULT_TIMEOUT; private volatile int timeout = DEFAULT_TIMEOUT;
private int activeConnections; private AtomicInteger activeConnections = new AtomicInteger(0);
private boolean isDisposed; private AtomicBoolean isDisposed = new AtomicBoolean(false);
protected JdbcConnectionPool(ConnectionPoolDataSource dataSource) { protected JdbcConnectionPool(ConnectionPoolDataSource dataSource) {
this.dataSource = dataSource; this.dataSource = dataSource;
...@@ -120,13 +122,11 @@ public class JdbcConnectionPool implements DataSource, ConnectionEventListener, ...@@ -120,13 +122,11 @@ public class JdbcConnectionPool implements DataSource, ConnectionEventListener,
* *
* @param max the maximum number of connections * @param max the maximum number of connections
*/ */
public synchronized void setMaxConnections(int max) { public void setMaxConnections(int max) {
if (max < 1) { if (max < 1) {
throw new IllegalArgumentException("Invalid maxConnections value: " + max); throw new IllegalArgumentException("Invalid maxConnections value: " + max);
} }
this.maxConnections = max; this.maxConnections = max;
// notify waiting threads if the value was increased
notifyAll();
} }
/** /**
...@@ -134,7 +134,7 @@ public class JdbcConnectionPool implements DataSource, ConnectionEventListener, ...@@ -134,7 +134,7 @@ public class JdbcConnectionPool implements DataSource, ConnectionEventListener,
* *
* @return the max the maximum number of connections * @return the max the maximum number of connections
*/ */
public synchronized int getMaxConnections() { public int getMaxConnections() {
return maxConnections; return maxConnections;
} }
...@@ -144,7 +144,7 @@ public class JdbcConnectionPool implements DataSource, ConnectionEventListener, ...@@ -144,7 +144,7 @@ public class JdbcConnectionPool implements DataSource, ConnectionEventListener,
* @return the timeout in seconds * @return the timeout in seconds
*/ */
@Override @Override
public synchronized int getLoginTimeout() { public int getLoginTimeout() {
return timeout; return timeout;
} }
...@@ -156,7 +156,7 @@ public class JdbcConnectionPool implements DataSource, ConnectionEventListener, ...@@ -156,7 +156,7 @@ public class JdbcConnectionPool implements DataSource, ConnectionEventListener,
* @param seconds the timeout, 0 meaning the default * @param seconds the timeout, 0 meaning the default
*/ */
@Override @Override
public synchronized void setLoginTimeout(int seconds) { public void setLoginTimeout(int seconds) {
if (seconds == 0) { if (seconds == 0) {
seconds = DEFAULT_TIMEOUT; seconds = DEFAULT_TIMEOUT;
} }
...@@ -167,13 +167,12 @@ public class JdbcConnectionPool implements DataSource, ConnectionEventListener, ...@@ -167,13 +167,12 @@ public class JdbcConnectionPool implements DataSource, ConnectionEventListener,
* Closes all unused pooled connections. * Closes all unused pooled connections.
* Exceptions while closing are written to the log stream (if set). * Exceptions while closing are written to the log stream (if set).
*/ */
public synchronized void dispose() { public void dispose() {
if (isDisposed) { isDisposed.set(true);
return;
} PooledConnection pc;
isDisposed = true; while ((pc = recycledConnections.poll()) != null) {
for (PooledConnection aList : recycledConnections) { closeConnection(pc);
closeConnection(aList);
} }
} }
...@@ -193,18 +192,28 @@ public class JdbcConnectionPool implements DataSource, ConnectionEventListener, ...@@ -193,18 +192,28 @@ public class JdbcConnectionPool implements DataSource, ConnectionEventListener,
@Override @Override
public Connection getConnection() throws SQLException { public Connection getConnection() throws SQLException {
long max = System.nanoTime() + TimeUnit.SECONDS.toNanos(timeout); long max = System.nanoTime() + TimeUnit.SECONDS.toNanos(timeout);
int spin = 0;
do { do {
synchronized (this) { if (activeConnections.incrementAndGet() <= maxConnections) {
if (activeConnections < maxConnections) {
return getConnectionNow();
}
try { try {
wait(1000); return getConnectionNow();
} catch (InterruptedException e) { } catch (Throwable t) {
// ignore activeConnections.decrementAndGet();
throw t;
} }
} else {
activeConnections.decrementAndGet();
} }
} while (System.nanoTime() <= max); if (--spin >= 0) {
continue;
}
try {
spin = 3;
Thread.sleep(1);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
} while (System.nanoTime() - max <= 0);
throw new SQLException("Login timeout", "08001", 8001); throw new SQLException("Login timeout", "08001", 8001);
} }
...@@ -217,17 +226,14 @@ public class JdbcConnectionPool implements DataSource, ConnectionEventListener, ...@@ -217,17 +226,14 @@ public class JdbcConnectionPool implements DataSource, ConnectionEventListener,
} }
private Connection getConnectionNow() throws SQLException { private Connection getConnectionNow() throws SQLException {
if (isDisposed) { if (isDisposed.get()) {
throw new IllegalStateException("Connection pool has been disposed."); throw new IllegalStateException("Connection pool has been disposed.");
} }
PooledConnection pc; PooledConnection pc = recycledConnections.poll();
if (!recycledConnections.isEmpty()) { if (pc == null) {
pc = recycledConnections.remove(recycledConnections.size() - 1);
} else {
pc = dataSource.getPooledConnection(); pc = dataSource.getPooledConnection();
} }
Connection conn = pc.getConnection(); Connection conn = pc.getConnection();
activeConnections++;
pc.addConnectionEventListener(this); pc.addConnectionEventListener(this);
return conn; return conn;
} }
...@@ -239,19 +245,20 @@ public class JdbcConnectionPool implements DataSource, ConnectionEventListener, ...@@ -239,19 +245,20 @@ public class JdbcConnectionPool implements DataSource, ConnectionEventListener,
* *
* @param pc the pooled connection * @param pc the pooled connection
*/ */
synchronized void recycleConnection(PooledConnection pc) { private void recycleConnection(PooledConnection pc) {
if (activeConnections <= 0) { int active = activeConnections.decrementAndGet();
if (active < 0) {
activeConnections.incrementAndGet();
throw new AssertionError(); throw new AssertionError();
} }
activeConnections--; if (!isDisposed.get() && active < maxConnections) {
if (!isDisposed && activeConnections < maxConnections) {
recycledConnections.add(pc); recycledConnections.add(pc);
if (isDisposed.get()) {
dispose();
}
} else { } else {
closeConnection(pc); closeConnection(pc);
} }
if (activeConnections >= maxConnections - 1) {
notifyAll();
}
} }
private void closeConnection(PooledConnection pc) { private void closeConnection(PooledConnection pc) {
...@@ -290,8 +297,8 @@ public class JdbcConnectionPool implements DataSource, ConnectionEventListener, ...@@ -290,8 +297,8 @@ public class JdbcConnectionPool implements DataSource, ConnectionEventListener,
* *
* @return the number of active connections. * @return the number of active connections.
*/ */
public synchronized int getActiveConnections() { public int getActiveConnections() {
return activeConnections; return activeConnections.get();
} }
/** /**
......
Markdown 格式
0%
您添加了 0 到此讨论。请谨慎行事。
请先完成此评论的编辑!
注册 或者 后发表评论