Skip to content

Commit 9ae7675

Browse files
committed
feat: impl PoolWaitTimeoutError
Throw error when get connection wait time great than poolWaitTimeout.
1 parent 194062e commit 9ae7675

File tree

5 files changed

+158
-6
lines changed

5 files changed

+158
-6
lines changed

README.md

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,13 @@ const db = new RDSClient({
4848
// connectionStorage: new AsyncLocalStorage(),
4949
// If create multiple RDSClient instances with the same connectionStorage, use this key to distinguish between the instances
5050
// connectionStorageKey: 'datasource',
51+
52+
// The timeout for connecting to the MySQL server. (Default: 500 milliseconds)
53+
// connectTimeout: 500,
54+
55+
// The timeout for waiting for a connection from the connection pool. (Default: 500 milliseconds)
56+
// So max timeout for get a connection is (connectTimeout + poolWaitTimeout)
57+
// poolWaitTimeout: 500,
5158
});
5259
```
5360

src/client.ts

Lines changed: 50 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import { AsyncLocalStorage } from 'node:async_hooks';
22
import { promisify } from 'node:util';
3+
import { setTimeout } from 'node:timers/promises';
34
import mysql, { Pool } from 'mysql2';
45
import type { PoolOptions } from 'mysql2';
56
import type { PoolConnectionPromisify, RDSClientOptions, TransactionContext, TransactionScope } from './types';
@@ -10,12 +11,17 @@ import literals from './literals';
1011
import channels from './channels';
1112
import type { ConnectionMessage, ConnectionEnqueueMessage } from './channels';
1213
import { RDSPoolConfig } from './PoolConfig';
14+
import { PoolWaitTimeoutError } from './util/PoolWaitTimeout';
15+
1316
export * from './types';
1417

1518
interface PoolPromisify extends Omit<Pool, 'query'> {
1619
query(sql: string): Promise<any>;
20+
1721
getConnection(): Promise<PoolConnectionPromisify>;
22+
1823
end(): Promise<void>;
24+
1925
_acquiringConnections: any[];
2026
_allConnections: any[];
2127
_freeConnections: any[];
@@ -30,21 +36,37 @@ export interface QueryOptions {
3036
}
3137

3238
export class RDSClient extends Operator {
33-
static get literals() { return literals; }
34-
static get escape() { return mysql.escape; }
35-
static get escapeId() { return mysql.escapeId; }
36-
static get format() { return mysql.format; }
37-
static get raw() { return mysql.raw; }
39+
static get literals() {
40+
return literals;
41+
}
42+
43+
static get escape() {
44+
return mysql.escape;
45+
}
46+
47+
static get escapeId() {
48+
return mysql.escapeId;
49+
}
50+
51+
static get format() {
52+
return mysql.format;
53+
}
54+
55+
static get raw() {
56+
return mysql.raw;
57+
}
3858

3959
static #DEFAULT_STORAGE_KEY = Symbol('RDSClient#storage#default');
4060
static #TRANSACTION_NEST_COUNT = Symbol('RDSClient#transaction#nestCount');
4161

4262
#pool: PoolPromisify;
4363
#connectionStorage: AsyncLocalStorage<TransactionContext>;
4464
#connectionStorageKey: string | symbol;
65+
#poolWaitTimeout: number;
4566

4667
constructor(options: RDSClientOptions) {
4768
super();
69+
options.connectTimeout = options.connectTimeout ?? 500;
4870
const { connectionStorage, connectionStorageKey, ...mysqlOptions } = options;
4971
// get connection options from getConnectionConfig method every time
5072
if (mysqlOptions.getConnectionConfig) {
@@ -61,6 +83,7 @@ export class RDSClient extends Operator {
6183
});
6284
this.#connectionStorage = connectionStorage || new AsyncLocalStorage();
6385
this.#connectionStorageKey = connectionStorageKey || RDSClient.#DEFAULT_STORAGE_KEY;
86+
this.#poolWaitTimeout = options.poolWaitTimeout || 500;
6487
// https://github.com/mysqljs/mysql#pool-events
6588
this.#pool.on('connection', (connection: PoolConnectionPromisify) => {
6689
channels.connectionNew.publish({
@@ -129,9 +152,30 @@ export class RDSClient extends Operator {
129152
};
130153
}
131154

155+
async waitPoolConnection(abortSignal: AbortSignal) {
156+
const now = performance.now();
157+
await setTimeout(this.#poolWaitTimeout, undefined, { signal: abortSignal });
158+
return performance.now() - now;
159+
}
160+
161+
async getConnectionWithTimeout() {
162+
const connPromise = this.#pool.getConnection();
163+
const timeoutAbortController = new AbortController();
164+
const timeoutPromise = this.waitPoolConnection(timeoutAbortController.signal);
165+
const connOrTimeout = await Promise.race([ connPromise, timeoutPromise ]);
166+
if (typeof connOrTimeout === 'number') {
167+
connPromise.then(conn => {
168+
conn.release();
169+
});
170+
throw new PoolWaitTimeoutError(`get connection timeout after ${connOrTimeout}ms`);
171+
}
172+
timeoutAbortController.abort();
173+
return connPromise;
174+
}
175+
132176
async getConnection() {
133177
try {
134-
const _conn = await this.#pool.getConnection();
178+
const _conn = await this.getConnectionWithTimeout();
135179
const conn = new RDSConnection(_conn);
136180
if (this.beforeQueryHandlers.length > 0) {
137181
for (const handler of this.beforeQueryHandlers) {

src/types.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ export interface RDSClientOptions extends PoolOptions {
88
connectionStorageKey?: string;
99
connectionStorage?: AsyncLocalStorage<Record<PropertyKey, RDSTransaction>>;
1010
getConnectionConfig?: GetConnectionConfig;
11+
poolWaitTimeout?: number;
1112
}
1213

1314
export interface PoolConnectionPromisify extends Omit<PoolConnection, 'query'> {

src/util/PoolWaitTimeout.ts

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
export class PoolWaitTimeoutError extends Error {
2+
constructor(...args) {
3+
super(...args);
4+
this.name = 'PoolWaitTimeoutError';
5+
}
6+
}

test/client.test.ts

Lines changed: 94 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
import { AsyncLocalStorage } from 'node:async_hooks';
22
import { strict as assert } from 'node:assert';
33
import fs from 'node:fs/promises';
4+
import { setTimeout } from 'node:timers/promises';
45
import path from 'node:path';
56
import mm from 'mm';
67
import { RDSTransaction } from '../src/transaction';
@@ -1493,4 +1494,97 @@ describe('test/client.test.ts', () => {
14931494
assert.equal(counter2After, 4);
14941495
});
14951496
});
1497+
1498+
describe('PoolWaitTimeout', () => {
1499+
async function longQuery(timeout?: number) {
1500+
await db.beginTransactionScope(async conn => {
1501+
await setTimeout(timeout ?? 1000);
1502+
await conn.query('SELECT 1+1');
1503+
});
1504+
}
1505+
1506+
it('should throw error if pool wait timeout', async () => {
1507+
const tasks: Array<Promise<void>> = [];
1508+
for (let i = 0; i < 10; i++) {
1509+
tasks.push(longQuery());
1510+
}
1511+
await assert.rejects(async () => {
1512+
await longQuery();
1513+
}, /get connection timeout after/);
1514+
await Promise.all(tasks);
1515+
});
1516+
1517+
it('should release conn to pool', async () => {
1518+
const tasks: Array<Promise<void>> = [];
1519+
const timeoutTasks: Array<Promise<void>> = [];
1520+
// 1. fill the pool
1521+
for (let i = 0; i < 10; i++) {
1522+
tasks.push(longQuery());
1523+
}
1524+
// 2. add more conn and wait for timeout
1525+
for (let i = 0; i < 10; i++) {
1526+
timeoutTasks.push(longQuery());
1527+
}
1528+
const [ succeedTasks, failedTasks ] = await Promise.all([
1529+
Promise.allSettled(tasks),
1530+
Promise.allSettled(timeoutTasks),
1531+
]);
1532+
const succeedCount = succeedTasks.filter(t => t.status === 'fulfilled').length;
1533+
assert.equal(succeedCount, 10);
1534+
1535+
const failedCount = failedTasks.filter(t => t.status === 'rejected').length;
1536+
assert.equal(failedCount, 10);
1537+
1538+
// 3. after pool empty, create new tasks
1539+
const retryTasks: Array<Promise<void>> = [];
1540+
for (let i = 0; i < 10; i++) {
1541+
retryTasks.push(longQuery());
1542+
}
1543+
await Promise.all(retryTasks);
1544+
});
1545+
1546+
it('should not wait too long', async () => {
1547+
const tasks: Array<Promise<void>> = [];
1548+
const timeoutTasks: Array<Promise<void>> = [];
1549+
const fastTasks: Array<Promise<void>> = [];
1550+
const start = performance.now();
1551+
// 1. fill the pool
1552+
for (let i = 0; i < 10; i++) {
1553+
tasks.push(longQuery());
1554+
}
1555+
const tasksPromise = Promise.allSettled(tasks);
1556+
// 2. add more conn and wait for timeout
1557+
for (let i = 0; i < 10; i++) {
1558+
timeoutTasks.push(longQuery());
1559+
}
1560+
const timeoutTasksPromise = Promise.allSettled(timeoutTasks);
1561+
await setTimeout(600);
1562+
// 3. add fast query
1563+
for (let i = 0; i < 10; i++) {
1564+
fastTasks.push(longQuery(1));
1565+
}
1566+
const fastTasksPromise = Promise.allSettled(fastTasks);
1567+
const [ succeedTasks, failedTasks, fastTaskResults ] = await Promise.all([
1568+
tasksPromise,
1569+
timeoutTasksPromise,
1570+
fastTasksPromise,
1571+
]);
1572+
const duration = performance.now() - start;
1573+
const succeedCount = succeedTasks.filter(t => t.status === 'fulfilled').length;
1574+
assert.equal(succeedCount, 10);
1575+
1576+
const failedCount = failedTasks.filter(t => t.status === 'rejected').length;
1577+
assert.equal(failedCount, 10);
1578+
1579+
const faskTaskSucceedCount = fastTaskResults.filter(t => t.status === 'fulfilled').length;
1580+
assert.equal(faskTaskSucceedCount, 10);
1581+
1582+
// - 10 long queries cost 1000ms
1583+
// - 10 timeout queries should be timeout in long query execution so not cost time
1584+
// - 10 fast queries wait long query to finish, cost 1ms
1585+
// 1000ms + 0ms + 1ms < 1100ms
1586+
assert(duration < 1100);
1587+
});
1588+
1589+
});
14961590
});

0 commit comments

Comments
 (0)