Skip to content

Commit e3afedd

Browse files
committed
fix(client): properly handle unhandled disconnect
# Conflicts: # commons/Queue.ts
1 parent 98da380 commit e3afedd

File tree

6 files changed

+88
-17
lines changed

6 files changed

+88
-17
lines changed

client/connections/ConnectionToCore.ts

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -182,10 +182,15 @@ export default abstract class ConnectionToCore extends TypedEventEmitter<{
182182
}
183183

184184
public async createSession(options: ICreateSessionOptions): Promise<CoreSession> {
185-
const sessionMeta = await this.commandQueue.run<ISessionMeta>('Session.create', options);
186-
const session = new CoreSession({ ...sessionMeta, sessionName: options.sessionName }, this);
187-
this.coreSessions.track(session);
188-
return session;
185+
try {
186+
const sessionMeta = await this.commandQueue.run<ISessionMeta>('Session.create', options);
187+
const session = new CoreSession({ ...sessionMeta, sessionName: options.sessionName }, this);
188+
this.coreSessions.track(session);
189+
return session;
190+
} catch (error) {
191+
if (error instanceof DisconnectedFromCoreError && this.isDisconnecting) return null;
192+
throw error;
193+
}
189194
}
190195

191196
public getSession(sessionId: string): CoreSession {
@@ -294,8 +299,6 @@ export default abstract class ConnectionToCore extends TypedEventEmitter<{
294299

295300
protected cancelPendingRequests(): void {
296301
const host = String(this.resolvedHost);
297-
this.coreSessions.stop(new DisconnectedFromCoreError(host));
298-
this.commandQueue.stop(new DisconnectedFromCoreError(host));
299302
for (const entry of this.pendingRequestsById.values()) {
300303
const id = entry.id;
301304
if (this.connectRequestId === id || this.disconnectRequestId === id) {
@@ -304,6 +307,8 @@ export default abstract class ConnectionToCore extends TypedEventEmitter<{
304307
this.pendingRequestsById.delete(id);
305308
this.rejectPendingRequest(entry, new DisconnectedFromCoreError(host));
306309
}
310+
this.coreSessions.stop(new DisconnectedFromCoreError(host));
311+
this.commandQueue.stop(new DisconnectedFromCoreError(host));
307312
}
308313

309314
private createPendingResult(): IResolvablePromiseWithId {

client/lib/CoreCommandQueue.ts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,9 @@ export default class CoreCommandQueue {
2929
}
3030

3131
public run<T>(command: string, ...args: any[]): Promise<T> {
32+
if (this.connection.isDisconnecting) {
33+
return Promise.resolve(null);
34+
}
3235
return this.internalQueue.run<T>(this.runRequest.bind(this, command, args)).catch(error => {
3336
error.stack += `${this.sessionMarker}`;
3437
throw error;

client/lib/Handler.ts

Lines changed: 25 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,13 @@
11
import { createPromise, pickRandom } from '@secret-agent/commons/utils';
22
import ShutdownHandler from '@secret-agent/commons/ShutdownHandler';
3-
import Log from '@secret-agent/commons/Logger';
3+
import Log, { hasBeenLoggedSymbol } from '@secret-agent/commons/Logger';
4+
import { CanceledPromiseError } from '@secret-agent/commons/interfaces/IPendingWaitEvent';
45
import IAgentCreateOptions from '../interfaces/IAgentCreateOptions';
56
import IConnectionToCoreOptions from '../interfaces/IConnectionToCoreOptions';
67
import Agent from './Agent';
78
import ConnectionToCore from '../connections/ConnectionToCore';
89
import ConnectionFactory from '../connections/ConnectionFactory';
10+
import DisconnectedFromCoreError from '../connections/DisconnectedFromCoreError';
911

1012
type SettledDispatchesBySessionId = { [sessionId: string]: { args: any; error?: Error } };
1113
type PendingDispatch = { resolution: Promise<Error | void>; sessionId?: string; args: any };
@@ -118,10 +120,15 @@ export default class Handler {
118120
const dispatches = [...this.dispatches];
119121
// clear out dispatches everytime you check it
120122
this.dispatches.length = 0;
123+
const startStack = new Error('').stack.split(/\r?\n/).slice(1).join('\n');
121124
await Promise.all(
122125
dispatches.map(async dispatch => {
123126
const err = await dispatch.resolution;
124-
if (err) throw err;
127+
if (err) {
128+
const marker = `------WAIT FOR ALL DISPATCHES`.padEnd(50, '-');
129+
err.stack += `\n${marker}\n${startStack}`;
130+
throw err;
131+
}
125132
}),
126133
);
127134
// keep going if there are new things queued
@@ -166,12 +173,25 @@ export default class Handler {
166173
}
167174

168175
private async logUnhandledError(error: Error): Promise<void> {
176+
if (error instanceof DisconnectedFromCoreError) return;
177+
if (!error || error[hasBeenLoggedSymbol]) return;
169178
// if error and there are remote connections, log error here
170-
if (error && this.connections.some(x => !!x.options.host)) {
171-
log.error('UnhandledRejection', { error, sessionId: null });
179+
if (this.connections.some(x => !!x.options.host)) {
180+
log.error('UnhandledRejection (Client)', { error, sessionId: null });
172181
}
173182
// eslint-disable-next-line promise/no-promise-in-callback
174-
await Promise.all(this.connections.map(x => x.logUnhandledError(error)));
183+
await Promise.all(
184+
this.connections.map(x => {
185+
return x.logUnhandledError(error).catch(logError => {
186+
if (logError instanceof CanceledPromiseError) return;
187+
log.error('UnhandledRejection.CouldNotSendToCore', {
188+
error: logError,
189+
connectionHost: x.hostOrError,
190+
sessionId: null,
191+
});
192+
});
193+
}),
194+
);
175195
}
176196

177197
private onDisconnected(connection: ConnectionToCore): void {

commons/Queue.ts

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ import IResolvablePromise from '@secret-agent/core-interfaces/IResolvablePromise
22
import { createPromise } from './utils';
33
import { CanceledPromiseError } from './interfaces/IPendingWaitEvent';
44
import Resolvable from './Resolvable';
5+
import getPrototypeOf = Reflect.getPrototypeOf;
56

67
type AsyncCallback<T> = (value?: any) => Promise<T>;
78

@@ -24,7 +25,7 @@ export default class Queue {
2425
this.queue.push({
2526
promise,
2627
cb,
27-
startStack: new Error('').stack.split('\n').slice(1).join('\n'),
28+
startStack: new Error('').stack.split(/\r?\n/).slice(1).join('\n'),
2829
});
2930

3031
this.next().catch(() => null);
@@ -68,7 +69,9 @@ export default class Queue {
6869
this.activeCount += 1;
6970
try {
7071
const res = await Promise.race([next.cb(), this.abortPromise.promise]);
71-
if (this.abortPromise.isResolved) throw await this.abortPromise.promise;
72+
if (this.abortPromise.isResolved) {
73+
return this.reject(next, await this.abortPromise.promise);
74+
}
7275

7376
next.promise.resolve(res);
7477
} catch (error) {
@@ -77,13 +80,16 @@ export default class Queue {
7780
this.activeCount -= 1;
7881
}
7982

80-
setImmediate(() => this.next());
83+
setImmediate(() => this.next().catch(() => null));
8184
}
8285

83-
private reject(entry: IQueueEntry, error: Error): void {
84-
const marker = `------${this.stacktraceMarker}`.padEnd(50, '-');
86+
private reject(entry: IQueueEntry, sourceError: Error): void {
87+
const error = <Error>Object.create(getPrototypeOf(sourceError));
88+
error.message = sourceError.message;
89+
Object.assign(error, sourceError);
8590

86-
error.stack = `${error.stack}\n${marker}\n${entry.startStack}`;
91+
const marker = `------${this.stacktraceMarker}`.padEnd(50, '-');
92+
error.stack = `${sourceError.stack}\n${marker}\n${entry.startStack}`;
8793
entry.promise.reject(error);
8894
}
8995
}

core/index.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -124,6 +124,7 @@ export default class Core {
124124
}
125125

126126
public static logUnhandledError(clientError: Error, fatalError = false): void {
127+
if (!clientError || clientError[hasBeenLoggedSymbol]) return;
127128
if (fatalError) {
128129
log.error('UnhandledError(fatal)', { clientError, sessionId: null });
129130
} else if (!clientError[hasBeenLoggedSymbol]) {

full-client/test/handler.test.ts

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -224,6 +224,42 @@ describe('connectionToCore', () => {
224224
await expect(handler.waitForAllDispatches()).rejects.toThrowError(DisconnectedFromCoreError);
225225
});
226226

227+
it('handles disconnects from client', async () => {
228+
const coreHost = await CoreProcess.spawn({});
229+
Helpers.onClose(() => CoreProcess.kill('SIGINT'));
230+
const connection = new RemoteConnectionToCore({
231+
maxConcurrency: 2,
232+
host: coreHost,
233+
});
234+
await connection.connect();
235+
236+
const handler = new Handler(connection);
237+
Helpers.needsClosing.push(handler);
238+
239+
const waitForGoto = createPromise();
240+
const dispatchErrorPromise = createPromise<Error>();
241+
handler.dispatchAgent(async agent => {
242+
try {
243+
await agent.goto(koaServer.baseUrl);
244+
const promise = agent.waitForMillis(10e3);
245+
await new Promise(resolve => setTimeout(resolve, 50));
246+
waitForGoto.resolve();
247+
await promise;
248+
} catch (error) {
249+
dispatchErrorPromise.resolve(error);
250+
throw error;
251+
}
252+
});
253+
await waitForGoto.promise;
254+
await connection.disconnect();
255+
await new Promise(setImmediate);
256+
await expect(dispatchErrorPromise).resolves.toBeTruthy();
257+
const dispatchError = await dispatchErrorPromise;
258+
expect(dispatchError).toBeInstanceOf(DisconnectedFromCoreError);
259+
expect((dispatchError as DisconnectedFromCoreError).coreHost).toBe(coreHost);
260+
await expect(handler.waitForAllDispatches()).rejects.toThrowError(DisconnectedFromCoreError);
261+
});
262+
227263
it('handles core server ending websocket (econnreset)', async () => {
228264
const coreHost = await Core.server.address;
229265
// @ts-ignore

0 commit comments

Comments
 (0)