Skip to content

Commit f49ba4b

Browse files
authored
feat: implement mysql execute (#15)
<!-- This is an auto-generated comment: release notes by coderabbit.ai --> ## Summary by CodeRabbit * **New Features** * Added an execute capability for running SQL statements with parameter binding across client, connection, transaction, and operator interfaces. * Added queryOne helper to return the first row or null from a query. * **Tests** * Expanded test coverage for execute workflows: standalone execution, transaction scenarios, multi-connection ops, and DML result verification. <sub>✏️ Tip: You can customize this high-level summary in your review settings.</sub> <!-- end of auto-generated comment: release notes by coderabbit.ai -->
1 parent a208cf2 commit f49ba4b

File tree

6 files changed

+172
-29
lines changed

6 files changed

+172
-29
lines changed

src/client.ts

Lines changed: 32 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,7 @@ export class RDSClient extends Operator {
7979
'query',
8080
'getConnection',
8181
'end',
82+
'execute',
8283
].forEach(method => {
8384
this.#pool[method] = promisify(this.#pool[method]);
8485
});
@@ -113,30 +114,45 @@ export class RDSClient extends Operator {
113114
}
114115

115116
async query<T = any>(sql: string, values?: object | any[], options?: QueryOptions): Promise<T> {
116-
let conn: RDSConnection | RDSTransaction;
117-
let shouldReleaseConn = false;
118-
if (options?.conn) {
119-
conn = options.conn;
120-
} else {
121-
const ctx = this.#connectionStorage.getStore();
122-
const ctxConn = ctx?.[this.#connectionStorageKey];
123-
if (ctxConn) {
124-
conn = ctxConn;
125-
} else {
126-
conn = await this.getConnection();
127-
shouldReleaseConn = true;
128-
}
129-
}
117+
return this.#executeWithConnection('query', sql, values, options);
118+
}
119+
120+
async execute<T = any>(sql: string, values?: object | any[], options?: QueryOptions): Promise<T> {
121+
return this.#executeWithConnection('execute', sql, values, options);
122+
}
123+
124+
async #executeWithConnection<T = any>(
125+
method: 'query' | 'execute',
126+
sql: string,
127+
values?: object | any[],
128+
options?: QueryOptions,
129+
): Promise<T> {
130+
const { conn, shouldRelease } = await this.#getConnection(options);
130131

131132
try {
132-
return await conn.query(sql, values);
133+
return await conn[method](sql, values);
133134
} finally {
134-
if (shouldReleaseConn) {
135+
if (shouldRelease) {
135136
(conn as RDSConnection).release();
136137
}
137138
}
138139
}
139140

141+
async #getConnection(options?: QueryOptions): Promise<{ conn: RDSConnection | RDSTransaction; shouldRelease: boolean }> {
142+
if (options?.conn) {
143+
return { conn: options.conn, shouldRelease: false };
144+
}
145+
146+
const ctx = this.#connectionStorage.getStore();
147+
const ctxConn = ctx?.[this.#connectionStorageKey];
148+
if (ctxConn) {
149+
return { conn: ctxConn, shouldRelease: false };
150+
}
151+
152+
const conn = await this.getConnection();
153+
return { conn, shouldRelease: true };
154+
}
155+
140156
get pool() {
141157
return this.#pool;
142158
}

src/connection.ts

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ export class RDSConnection extends Operator {
1616
if (!this.conn[kWrapToRDS]) {
1717
[
1818
'query',
19+
'execute',
1920
'beginTransaction',
2021
'commit',
2122
'rollback',
@@ -36,6 +37,10 @@ export class RDSConnection extends Operator {
3637
return await this.conn.query(sql, values);
3738
}
3839

40+
async _execute(sql: string, values?: object | any[]) {
41+
return await this.conn.execute(sql, values);
42+
}
43+
3944
async beginTransaction() {
4045
return await this.conn.beginTransaction();
4146
}

src/operator.ts

Lines changed: 36 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,30 @@ export abstract class Operator {
7474
}
7575

7676
async query<T = any>(sql: string, values?: object | any[]): Promise<T> {
77-
// query(sql, values)
77+
return this.#executeWithHooks(sql, values, 'query', this._query.bind(this));
78+
}
79+
80+
async queryOne(sql: string, values?: object | any[]) {
81+
const rows = await this.query(sql, values);
82+
return rows && rows[0] || null;
83+
}
84+
85+
// eslint-disable-next-line @typescript-eslint/no-unused-vars
86+
protected async _query(_sql: string, _values?: object | any[]): Promise<any> {
87+
throw new Error('SubClass must impl this');
88+
}
89+
90+
async execute<T = any>(sql: string, values?: object | any[]): Promise<T> {
91+
return this.#executeWithHooks(sql, values, 'execute', this._execute.bind(this));
92+
}
93+
94+
async #executeWithHooks<T = any>(
95+
sql: string,
96+
values: object | any[] | undefined,
97+
operation: string,
98+
executor: (sql: string, values?: object | any[]) => Promise<any>,
99+
): Promise<T> {
100+
// 处理前置钩子
78101
if (this.beforeQueryHandlers.length > 0) {
79102
for (const beforeQueryHandler of this.beforeQueryHandlers) {
80103
const newSql = beforeQueryHandler(sql, values);
@@ -83,30 +106,35 @@ export abstract class Operator {
83106
}
84107
}
85108
}
86-
debug('[connection#%s] query %o', this.threadId, sql);
109+
110+
debug('[connection#%s] %s %o', this.threadId, operation, sql);
87111
if (typeof this.logging === 'function') {
88112
this.logging(sql, { threadId: this.threadId });
89113
}
114+
90115
const queryStart = performance.now();
91116
let rows: any;
92117
let lastError: Error | undefined;
118+
93119
channels.queryStart.publish({
94120
sql,
95121
values,
96122
connection: this.#connection,
97123
} as QueryStartMessage);
124+
98125
try {
99-
rows = await this._query(sql, values);
126+
rows = await executor(sql, values);
127+
100128
if (Array.isArray(rows)) {
101-
debug('[connection#%s] query get %o rows', this.threadId, rows.length);
129+
debug('[connection#%s] %s get %o rows', this.threadId, operation, rows.length);
102130
} else {
103-
debug('[connection#%s] query result: %o', this.threadId, rows);
131+
debug('[connection#%s] %s result: %o', this.threadId, operation, rows);
104132
}
105133
return rows;
106134
} catch (err) {
107135
lastError = err;
108136
err.stack = `${err.stack}\n sql: ${sql}`;
109-
debug('[connection#%s] query error: %o', this.threadId, err);
137+
debug('[connection#%s] %s error: %o', this.threadId, operation, err);
110138
throw err;
111139
} finally {
112140
const duration = Math.floor((performance.now() - queryStart) * 1000) / 1000;
@@ -117,6 +145,7 @@ export abstract class Operator {
117145
duration,
118146
error: lastError,
119147
} as QueryEndMessage);
148+
120149
if (this.afterQueryHandlers.length > 0) {
121150
for (const afterQueryHandler of this.afterQueryHandlers) {
122151
afterQueryHandler(sql, rows, duration, lastError, values);
@@ -125,13 +154,8 @@ export abstract class Operator {
125154
}
126155
}
127156

128-
async queryOne(sql: string, values?: object | any[]) {
129-
const rows = await this.query(sql, values);
130-
return rows && rows[0] || null;
131-
}
132-
133157
// eslint-disable-next-line @typescript-eslint/no-unused-vars
134-
protected async _query(_sql: string, _values?: object | any[]): Promise<any> {
158+
protected async _execute(_sql: string, _values?: object | any[]): Promise<any> {
135159
throw new Error('SubClass must impl this');
136160
}
137161

src/transaction.ts

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,11 @@ export class RDSTransaction extends Operator {
4141
return await this.conn!._query(sql, values);
4242
}
4343

44+
async _execute(sql: string, values?: object | any[]) {
45+
this.#check();
46+
return await this.conn!._execute(sql, values);
47+
}
48+
4449
#check() {
4550
if (!this.conn) {
4651
throw new Error('transaction was commit or rollback');

src/types.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,8 +12,9 @@ export interface RDSClientOptions extends PoolOptions {
1212
logging?: Logging;
1313
}
1414

15-
export interface PoolConnectionPromisify extends Omit<PoolConnection, 'query'> {
15+
export interface PoolConnectionPromisify extends Omit<PoolConnection, 'query' | 'execute'> {
1616
query(sql: string, values?: any | any[] | { [param: string]: any }): Promise<any>;
17+
execute(sql: string, values?: any | any[] | { [param: string]: any }): Promise<any>;
1718
beginTransaction(): Promise<void>;
1819
commit(): Promise<void>;
1920
rollback(): Promise<void>;

test/client.test.ts

Lines changed: 92 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1617,4 +1617,96 @@ describe('test/client.test.ts', () => {
16171617
});
16181618

16191619
});
1620+
1621+
describe('execute()', () => {
1622+
it('should execute sql with parameters', async () => {
1623+
const result = await db.execute('INSERT INTO `myrds-test-user` (name, email, gmt_create, gmt_modified) VALUES(?, ?, now(), now())',
1624+
[ prefix + 'execute-test', prefix + 'm@execute-test.com' ]);
1625+
assert.equal(result.affectedRows, 1);
1626+
assert(result.insertId > 0);
1627+
});
1628+
1629+
it('should execute select query', async () => {
1630+
await db.execute('INSERT INTO `myrds-test-user` (name, email, gmt_create, gmt_modified) VALUES(?, ?, now(), now())',
1631+
[ prefix + 'execute-select-test', prefix + 'm@execute-select-test.com' ]);
1632+
1633+
const rows = await db.execute('SELECT * FROM `myrds-test-user` WHERE email = ?',
1634+
[ prefix + 'm@execute-select-test.com' ]);
1635+
assert(Array.isArray(rows));
1636+
assert.equal(rows.length, 1);
1637+
assert.equal(rows[0].name, prefix + 'execute-select-test');
1638+
});
1639+
1640+
it('should execute in transaction', async () => {
1641+
const tran = await db.beginTransaction();
1642+
try {
1643+
const result = await db.execute('INSERT INTO `myrds-test-user` (name, email, gmt_create, gmt_modified) VALUES(?, ?, now(), now())',
1644+
[ prefix + 'execute-transaction-test', prefix + 'm@execute-transaction-test.com' ],
1645+
{ conn: tran });
1646+
assert.equal(result.affectedRows, 1);
1647+
await tran.commit();
1648+
} catch (err) {
1649+
await tran.rollback();
1650+
throw err;
1651+
}
1652+
1653+
const rows = await db.execute('SELECT * FROM `myrds-test-user` WHERE email = ?',
1654+
[ prefix + 'm@execute-transaction-test.com' ]);
1655+
assert.equal(rows.length, 1);
1656+
});
1657+
1658+
it('should execute in transaction scope', async () => {
1659+
await db.beginTransactionScope(async tran => {
1660+
const result = await db.execute('INSERT INTO `myrds-test-user` (name, email, gmt_create, gmt_modified) VALUES(?, ?, now(), now())',
1661+
[ prefix + 'execute-scope-test', prefix + 'm@execute-scope-test.com' ],
1662+
{ conn: tran });
1663+
assert.equal(result.affectedRows, 1);
1664+
});
1665+
1666+
const rows = await db.execute('SELECT * FROM `myrds-test-user` WHERE email = ?',
1667+
[ prefix + 'm@execute-scope-test.com' ]);
1668+
assert.equal(rows.length, 1);
1669+
});
1670+
1671+
it('should execute with connection', async () => {
1672+
const conn = await db.getConnection();
1673+
try {
1674+
const result = await conn.execute('INSERT INTO `myrds-test-user` (name, email, gmt_create, gmt_modified) VALUES(?, ?, now(), now())',
1675+
[ prefix + 'execute-conn-test', prefix + 'm@execute-conn-test.com' ]);
1676+
assert.equal(result.affectedRows, 1);
1677+
} finally {
1678+
conn.release();
1679+
}
1680+
1681+
const rows = await db.execute('SELECT * FROM `myrds-test-user` WHERE email = ?',
1682+
[ prefix + 'm@execute-conn-test.com' ]);
1683+
assert.equal(rows.length, 1);
1684+
});
1685+
1686+
it('should execute update query', async () => {
1687+
await db.execute('INSERT INTO `myrds-test-user` (name, email, gmt_create, gmt_modified) VALUES(?, ?, now(), now())',
1688+
[ prefix + 'execute-update-test', prefix + 'm@execute-update-test.com' ]);
1689+
1690+
const result = await db.execute('UPDATE `myrds-test-user` SET email = ? WHERE name = ?',
1691+
[ prefix + 'm@execute-updated.com', prefix + 'execute-update-test' ]);
1692+
assert.equal(result.affectedRows, 1);
1693+
1694+
const rows = await db.execute('SELECT * FROM `myrds-test-user` WHERE email = ?',
1695+
[ prefix + 'm@execute-updated.com' ]);
1696+
assert.equal(rows.length, 1);
1697+
});
1698+
1699+
it('should execute delete query', async () => {
1700+
await db.execute('INSERT INTO `myrds-test-user` (name, email, gmt_create, gmt_modified) VALUES(?, ?, now(), now())',
1701+
[ prefix + 'execute-delete-test', prefix + 'm@execute-delete-test.com' ]);
1702+
1703+
const result = await db.execute('DELETE FROM `myrds-test-user` WHERE name = ?',
1704+
[ prefix + 'execute-delete-test' ]);
1705+
assert.equal(result.affectedRows, 1);
1706+
1707+
const rows = await db.execute('SELECT * FROM `myrds-test-user` WHERE name = ?',
1708+
[ prefix + 'execute-delete-test' ]);
1709+
assert.equal(rows.length, 0);
1710+
});
1711+
});
16201712
});

0 commit comments

Comments
 (0)