Skip to content

Commit 64aa75d

Browse files
authored
feat: export connection and query diagnostics_channel (#111)
Drop Node.js < 16.17.0 support
1 parent 586317b commit 64aa75d

File tree

12 files changed

+160
-17
lines changed

12 files changed

+160
-17
lines changed

.eslintrc

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,6 @@
11
{
2-
"extends": "eslint-config-egg/typescript"
2+
"extends": [
3+
"eslint-config-egg/typescript",
4+
"eslint-config-egg/lib/rules/enforce-node-prefix"
5+
]
36
}

.github/workflows/nodejs.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,4 +13,4 @@ jobs:
1313
uses: node-modules/github-actions/.github/workflows/node-test-mysql.yml@master
1414
with:
1515
os: 'ubuntu-latest'
16-
version: '16, 18, 20'
16+
version: '16.17.0, 16, 18, 20'

package.json

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
"version": "6.1.0",
44
"description": "Aliyun RDS client",
55
"main": "lib/client.js",
6+
"types": "lib/client.d.ts",
67
"files": [
78
"lib"
89
],
@@ -43,7 +44,7 @@
4344
"mysql"
4445
],
4546
"engines": {
46-
"node": ">= 14.17.0"
47+
"node": ">= 16.17.0"
4748
},
4849
"author": "fengmk2 <fengmk2@gmail.com> (https://github.com/fengmk2)",
4950
"license": "MIT"

src/channels.ts

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
import diagnosticsChannel from 'node:diagnostics_channel';
2+
import type { PoolConnectionPromisify } from './types';
3+
import type { RDSClient } from './client';
4+
5+
export default {
6+
// pool events https://github.com/mysqljs/mysql#pool-events
7+
connectionNew: diagnosticsChannel.channel('ali-rds:connection:new'),
8+
connectionAcquire: diagnosticsChannel.channel('ali-rds:connection:acquire'),
9+
connectionRelease: diagnosticsChannel.channel('ali-rds:connection:release'),
10+
connectionEnqueue: diagnosticsChannel.channel('ali-rds:connection:enqueue'),
11+
// query
12+
queryStart: diagnosticsChannel.channel('ali-rds:query:start'),
13+
queryEnd: diagnosticsChannel.channel('ali-rds:query:end'),
14+
};
15+
16+
export interface ConnectionMessage {
17+
client: RDSClient;
18+
connection: PoolConnectionPromisify;
19+
}
20+
21+
export interface ConnectionEnqueueMessage {
22+
client: RDSClient;
23+
}
24+
25+
export interface QueryStartMessage {
26+
connection: PoolConnectionPromisify;
27+
sql: string;
28+
}
29+
30+
export interface QueryEndMessage {
31+
connection: PoolConnectionPromisify;
32+
sql: string;
33+
duration: number;
34+
error?: Error;
35+
}

src/client.ts

Lines changed: 36 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,8 @@ import { RDSConnection } from './connection';
88
import { RDSTransaction } from './transaction';
99
import { RDSPoolConfig } from './PoolConfig';
1010
import literals from './literals';
11+
import channels from './channels';
12+
import type { ConnectionMessage, ConnectionEnqueueMessage } from './channels';
1113

1214
interface PoolPromisify extends Omit<Pool, 'query'> {
1315
query(sql: string): Promise<any>;
@@ -39,8 +41,8 @@ export class RDSClient extends Operator {
3941
// get connection options from getConnectionConfig method every time
4042
if (mysqlOptions.getConnectionConfig) {
4143
// eslint-disable-next-line @typescript-eslint/no-var-requires
42-
const Pool = require('mysql/lib/Pool');
43-
this.#pool = new Pool({
44+
const MySQLPool = require('mysql/lib/Pool');
45+
this.#pool = new MySQLPool({
4446
config: new RDSPoolConfig(mysqlOptions, mysqlOptions.getConnectionConfig),
4547
});
4648
// override _needsChangeUser to return false
@@ -57,11 +59,39 @@ export class RDSClient extends Operator {
5759
});
5860
this.#connectionStorage = connectionStorage || new AsyncLocalStorage();
5961
this.#connectionStorageKey = connectionStorageKey || RDSClient.#DEFAULT_STORAGE_KEY;
62+
// https://github.com/mysqljs/mysql#pool-events
63+
this.#pool.on('connection', (connection: PoolConnectionPromisify) => {
64+
channels.connectionNew.publish({
65+
client: this,
66+
connection,
67+
} as ConnectionMessage);
68+
});
69+
this.#pool.on('enqueue', () => {
70+
channels.connectionEnqueue.publish({
71+
client: this,
72+
} as ConnectionEnqueueMessage);
73+
});
74+
this.#pool.on('acquire', (connection: PoolConnectionPromisify) => {
75+
channels.connectionAcquire.publish({
76+
client: this,
77+
connection,
78+
} as ConnectionMessage);
79+
});
80+
this.#pool.on('release', (connection: PoolConnectionPromisify) => {
81+
channels.connectionRelease.publish({
82+
client: this,
83+
connection,
84+
} as ConnectionMessage);
85+
});
6086
}
6187

62-
// impl Operator._query
63-
protected async _query(sql: string) {
64-
return await this.#pool.query(sql);
88+
async query<T = any>(sql: string, values?: object | any[]): Promise<T> {
89+
const conn = await this.getConnection();
90+
try {
91+
return await conn.query(sql, values);
92+
} finally {
93+
conn.release();
94+
}
6595
}
6696

6797
get pool() {
@@ -197,7 +227,7 @@ export class RDSClient extends Operator {
197227
* @param scope - scope with code
198228
* @return {Object} - scope return result
199229
*/
200-
async beginTransactionScope(scope: TransactionScope) {
230+
async beginTransactionScope(scope: TransactionScope): Promise<any> {
201231
let ctx = this.#connectionStorage.getStore();
202232
if (ctx) {
203233
return await this.#beginTransactionScope(scope, ctx);

src/connection.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ const kWrapToRDS = Symbol('kWrapToRDS');
77
export class RDSConnection extends Operator {
88
conn: PoolConnectionPromisify;
99
constructor(conn: PoolConnectionPromisify) {
10-
super();
10+
super(conn);
1111
this.conn = conn;
1212
if (!this.conn[kWrapToRDS]) {
1313
[

src/operator.ts

Lines changed: 27 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -8,19 +8,33 @@ import {
88
LockResult, LockTableOption,
99
SelectOption,
1010
UpdateOption, UpdateResult, UpdateRow,
11+
PoolConnectionPromisify,
1112
} from './types';
13+
import channels from './channels';
14+
import type { QueryStartMessage, QueryEndMessage } from './channels';
1215

1316
const debug = debuglog('ali-rds:operator');
1417

1518
/**
1619
* Operator Interface
1720
*/
1821
export abstract class Operator {
22+
#connection: PoolConnectionPromisify;
23+
constructor(connection?: PoolConnectionPromisify) {
24+
if (connection) {
25+
this.#connection = connection;
26+
}
27+
}
28+
1929
protected beforeQueryHandlers: BeforeQueryHandler[] = [];
2030
protected afterQueryHandlers: AfterQueryHandler[] = [];
2131

2232
get literals() { return literals; }
2333

34+
get threadId() {
35+
return this.#connection?.threadId;
36+
}
37+
2438
beforeQuery(beforeQueryHandler: BeforeQueryHandler) {
2539
this.beforeQueryHandlers.push(beforeQueryHandler);
2640
}
@@ -66,9 +80,13 @@ export abstract class Operator {
6680
}
6781
}
6882
debug('query %o', sql);
69-
const queryStart = Date.now();
83+
const queryStart = performance.now();
7084
let rows: any;
7185
let lastError: Error | undefined;
86+
channels.queryStart.publish({
87+
sql,
88+
connection: this.#connection,
89+
} as QueryStartMessage);
7290
try {
7391
rows = await this._query(sql);
7492
if (Array.isArray(rows)) {
@@ -83,10 +101,16 @@ export abstract class Operator {
83101
debug('query error: %o', err);
84102
throw err;
85103
} finally {
104+
const duration = Math.floor((performance.now() - queryStart) * 1000) / 1000;
105+
channels.queryEnd.publish({
106+
sql,
107+
connection: this.#connection,
108+
duration,
109+
error: lastError,
110+
} as QueryEndMessage);
86111
if (this.afterQueryHandlers.length > 0) {
87-
const execDuration = Date.now() - queryStart;
88112
for (const afterQueryHandler of this.afterQueryHandlers) {
89-
afterQueryHandler(sql, rows, execDuration, lastError);
113+
afterQueryHandler(sql, rows, duration, lastError);
90114
}
91115
}
92116
}

src/transaction.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ export class RDSTransaction extends Operator {
66
isRollback = false;
77
conn: RDSConnection | null;
88
constructor(conn: RDSConnection) {
9-
super();
9+
super(conn.conn);
1010
this.conn = conn;
1111
}
1212

src/types.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
import { AsyncLocalStorage } from 'async_hooks';
1+
import { AsyncLocalStorage } from 'node:async_hooks';
22
import type { PoolConnection, PoolConfig, ConnectionConfig } from 'mysql';
33
import { RDSTransaction } from './transaction';
44

test/PoolConfig.test.ts

Lines changed: 39 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,41 @@
11
import { strict as assert } from 'node:assert';
22
import fs from 'node:fs/promises';
33
import path from 'node:path';
4+
import diagnosticsChannel from 'node:diagnostics_channel';
45
import mm from 'mm';
56
import config from './config';
67
import { RDSClient } from '../src/client';
8+
import type { ConnectionMessage, QueryEndMessage } from '../src/channels';
79

810
describe('test/PoolConfig.test.ts', () => {
911
const prefix = 'prefix-PoolConfig' + process.version + '-';
1012
const table = 'ali-sdk-test-user';
1113
let db: RDSClient;
1214
let index = 0;
1315
let newConnectionCount = 0;
16+
let newConnectionCountByDiagnosticsChannel = 0;
17+
let queryCount = 0;
18+
let queryErrorCount = 0;
19+
let end = false;
20+
1421
before(async () => {
22+
diagnosticsChannel.subscribe('ali-rds:connection:new', message => {
23+
if (end) return;
24+
const { connection } = message as ConnectionMessage;
25+
console.log('[diagnosticsChannel] connection threadId %o created', connection.threadId);
26+
newConnectionCountByDiagnosticsChannel++;
27+
});
28+
diagnosticsChannel.subscribe('ali-rds:query:end', message => {
29+
if (end) return;
30+
const { connection, sql, duration, error } = message as QueryEndMessage;
31+
console.log('[diagnosticsChannel] connection threadId %o query %o, duration: %oms, error: %o',
32+
connection.threadId, sql, duration, error);
33+
queryCount++;
34+
if (error) {
35+
queryErrorCount++;
36+
}
37+
});
38+
1539
db = new RDSClient({
1640
// test getConnectionConfig
1741
connectionLimit: 2,
@@ -44,7 +68,9 @@ describe('test/PoolConfig.test.ts', () => {
4468
});
4569

4670
after(async () => {
47-
return await db.end();
71+
await db.end();
72+
assert.equal(queryCount, 7);
73+
end = true;
4874
});
4975

5076
afterEach(() => {
@@ -79,6 +105,18 @@ describe('test/PoolConfig.test.ts', () => {
79105
assert(Array.isArray(results[2]));
80106
assert.equal(index, 3);
81107
assert.equal(newConnectionCount, 2);
108+
assert.equal(newConnectionCountByDiagnosticsChannel, 2);
109+
});
110+
111+
it('should query error', async () => {
112+
assert.equal(queryErrorCount, 0);
113+
await assert.rejects(async () => {
114+
await db.query('show tables wrong sql');
115+
}, (err: Error) => {
116+
assert.match(err.message, /You have an error in your SQL syntax/);
117+
return true;
118+
});
119+
assert.equal(queryErrorCount, 1);
82120
});
83121
});
84122
});

0 commit comments

Comments
 (0)