Skip to content

Commit 9f4d4e3

Browse files
authored
Pool queries should honor connection timeout (#1586) (#1588)
See #1232 If a connection timeout is defined in connect options, the pool uses it when users acquire a connection with pool.getConnection, but not when they execute a query with pool.query Signed-off-by: Thomas Segismont <tsegismont@gmail.com>
1 parent ba112bf commit 9f4d4e3

File tree

3 files changed

+54
-16
lines changed

3 files changed

+54
-16
lines changed

vertx-pg-client/src/test/java/io/vertx/pgclient/PgPoolTest.java

Lines changed: 15 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -34,17 +34,14 @@
3434
import org.junit.Rule;
3535
import org.junit.Test;
3636

37-
import java.util.ArrayList;
38-
import java.util.Collections;
39-
import java.util.HashSet;
40-
import java.util.List;
41-
import java.util.Set;
37+
import java.util.*;
4238
import java.util.concurrent.TimeUnit;
4339
import java.util.concurrent.atomic.AtomicInteger;
4440
import java.util.concurrent.atomic.AtomicLong;
4541
import java.util.concurrent.atomic.AtomicReference;
4642
import java.util.stream.Collector;
4743

44+
import static java.util.concurrent.TimeUnit.SECONDS;
4845
import static java.util.stream.Collectors.mapping;
4946
import static java.util.stream.Collectors.toList;
5047

@@ -592,4 +589,17 @@ private void testConnectionClosedInProvider(TestContext ctx, boolean immediately
592589
}));
593590
}));
594591
}
592+
593+
@Test
594+
public void testPooledQueryTimeout(TestContext ctx) {
595+
Async async = ctx.async();
596+
PoolOptions poolOptions = new PoolOptions().setMaxSize(1).setConnectionTimeout(1).setConnectionTimeoutUnit(SECONDS);
597+
Pool pool = createPool(options, poolOptions);
598+
pool.getConnection().onComplete(ctx.asyncAssertSuccess(conn -> {
599+
pool.query("SELECT 1").execute().onComplete(ctx.asyncAssertFailure(t -> {
600+
conn.close();
601+
async.complete();
602+
}));
603+
}));
604+
}
595605
}

vertx-sql-client/src/main/java/io/vertx/sqlclient/impl/PoolImpl.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,10 @@
2525
import io.vertx.core.impl.future.PromiseInternal;
2626
import io.vertx.core.spi.metrics.PoolMetrics;
2727
import io.vertx.core.spi.metrics.VertxMetrics;
28-
import io.vertx.sqlclient.*;
28+
import io.vertx.sqlclient.Pool;
29+
import io.vertx.sqlclient.PoolOptions;
30+
import io.vertx.sqlclient.SqlConnection;
31+
import io.vertx.sqlclient.TransactionRollbackException;
2932
import io.vertx.sqlclient.impl.command.CommandBase;
3033
import io.vertx.sqlclient.impl.pool.SqlConnectionPool;
3134
import io.vertx.sqlclient.spi.Driver;
@@ -179,7 +182,7 @@ public Future<SqlConnection> getConnection() {
179182

180183
@Override
181184
public <R> Future<R> schedule(ContextInternal context, CommandBase<R> cmd) {
182-
return pool.execute(context, cmd);
185+
return pool.execute(context, cmd, connectionTimeout);
183186
}
184187

185188
private void acquire(ContextInternal context, long timeout, Handler<AsyncResult<SqlConnectionPool.PooledConnection>> completionHandler) {

vertx-sql-client/src/main/java/io/vertx/sqlclient/impl/pool/SqlConnectionPool.java

Lines changed: 34 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -207,28 +207,53 @@ private Object endUse(Object metric) {
207207
return NO_METRICS;
208208
}
209209

210-
public <R> Future<R> execute(ContextInternal context, CommandBase<R> cmd) {
210+
private static final Exception POOL_QUERY_TIMEOUT_EXCEPTION = new VertxException("Timeout waiting for connection", true);
211+
212+
public <R> Future<R> execute(ContextInternal context, CommandBase<R> cmd, long timeout) {
213+
Promise<R> res = context.promise();
214+
long timerId;
215+
if (timeout > 0) {
216+
timerId = context.setTimer(timeout, t -> res.fail(POOL_QUERY_TIMEOUT_EXCEPTION));
217+
} else {
218+
timerId = -1;
219+
}
211220
Promise<Lease<PooledConnection>> p = context.promise();
212221
Object queueMetric = enqueueMetric();
213222
pool.acquire(context, 0, p);
214-
return p.future().compose(lease -> {
215-
Object useMetric = dequeueAndBeginUse(queueMetric);
223+
p.future().compose(lease -> {
216224
PooledConnection pooled = lease.get();
217-
Connection conn = pooled.conn;
225+
Object useMetric;
218226
Future<R> future;
219-
if (afterAcquire != null) {
220-
future = afterAcquire.apply(conn)
221-
.compose(v -> pooled.schedule(context, cmd))
222-
.eventually(v -> beforeRecycle.apply(conn));
227+
if (timerId != -1 && !vertx.cancelTimer(timerId)) {
228+
// We want to make sure the connection is released properly below
229+
// But we don't want to record begin/end pool metrics
230+
dequeueAndReject(queueMetric);
231+
useMetric = NO_METRICS;
232+
future = Future.failedFuture(POOL_QUERY_TIMEOUT_EXCEPTION);
223233
} else {
224-
future = pooled.schedule(context, cmd);
234+
useMetric = dequeueAndBeginUse(queueMetric);
235+
if (afterAcquire != null) {
236+
Connection conn = pooled.conn;
237+
future = afterAcquire.apply(conn)
238+
.compose(v -> pooled.schedule(context, cmd))
239+
.eventually(() -> beforeRecycle.apply(conn));
240+
} else {
241+
future = pooled.schedule(context, cmd);
242+
}
225243
}
226244
return future.andThen(ar -> {
227245
endUse(useMetric);
228246
pooled.refresh();
229247
lease.recycle();
230248
});
249+
}).onComplete(ar -> {
250+
if (ar.succeeded()) {
251+
res.complete(ar.result());
252+
} else if (!POOL_QUERY_TIMEOUT_EXCEPTION.equals(ar.cause())) {
253+
res.fail(ar.cause());
254+
}
231255
});
256+
return res.future();
232257
}
233258

234259
public void acquire(ContextInternal context, long timeout, Handler<AsyncResult<PooledConnection>> handler) {

0 commit comments

Comments
 (0)