Skip to content

Commit aa53e85

Browse files
committed
fix(core): full close down of on premature exit
1 parent 382979d commit aa53e85

31 files changed

+509
-216
lines changed

client/connections/ConnectionToCore.ts

Lines changed: 81 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import IResolvablePromise from '@secret-agent/core-interfaces/IResolvablePromise
66
import Log from '@secret-agent/commons/Logger';
77
import ICreateSessionOptions from '@secret-agent/core-interfaces/ICreateSessionOptions';
88
import ISessionMeta from '@secret-agent/core-interfaces/ISessionMeta';
9+
import { CanceledPromiseError } from '@secret-agent/commons/interfaces/IPendingWaitEvent';
910
import IConnectionToCoreOptions from '../interfaces/IConnectionToCoreOptions';
1011
import CoreCommandQueue from '../lib/CoreCommandQueue';
1112
import CoreSession from '../lib/CoreSession';
@@ -17,11 +18,11 @@ const { log } = Log(module);
1718

1819
export default abstract class ConnectionToCore {
1920
public readonly commandQueue: CoreCommandQueue;
21+
public readonly hostOrError: Promise<string | Error>;
2022
public options: IConnectionToCoreOptions;
2123

22-
public hostOrError: Promise<string | Error>;
23-
2424
private connectPromise: Promise<Error | null>;
25+
private isClosing = false;
2526

2627
private coreSessions: CoreSessions;
2728
private readonly pendingRequestsById = new Map<string, IResolvablePromiseWithId>();
@@ -34,31 +35,57 @@ export default abstract class ConnectionToCore {
3435
this.options.maxConcurrency,
3536
this.options.agentTimeoutMillis,
3637
);
38+
39+
if (this.options.host) {
40+
this.hostOrError = Promise.resolve(this.options.host)
41+
.then(x => {
42+
if (!x.includes('://')) {
43+
return `ws://${x}`;
44+
}
45+
return x;
46+
})
47+
.catch(err => err);
48+
} else {
49+
this.hostOrError = Promise.resolve(new Error('No host provided'));
50+
}
3751
}
3852

39-
protected abstract internalSendRequest(payload: ICoreRequestPayload): void | Promise<void>;
53+
protected abstract internalSendRequest(payload: ICoreRequestPayload): Promise<void>;
54+
protected abstract createConnection(): Promise<Error | null>;
55+
protected abstract destroyConnection(): Promise<any>;
4056

4157
public connect(): Promise<Error | null> {
42-
if (this.connectPromise) return this.connectPromise;
43-
44-
const { promise, id } = this.createPendingResult();
45-
this.connectPromise = promise.then(x => this.onConnected(x.data)).catch(err => err);
46-
this.internalSendRequest({
47-
command: 'connect',
48-
args: [this.options],
49-
messageId: id,
50-
});
58+
this.connectPromise ??= this.createConnection()
59+
.then(err => {
60+
if (err) throw err;
61+
return this.internalSendRequestAndWait({
62+
command: 'connect',
63+
args: [this.options],
64+
});
65+
})
66+
.then(result => this.onConnected(result.data))
67+
.catch(err => err);
5168

5269
return this.connectPromise;
5370
}
5471

5572
public async disconnect(fatalError?: Error): Promise<void> {
56-
this.commandQueue.clearPending();
57-
this.coreSessions.close();
58-
if (this.connectPromise || fatalError) {
59-
await this.commandQueue.run('disconnect', [fatalError]);
60-
this.connectPromise = null;
73+
if (this.isClosing) return;
74+
this.isClosing = true;
75+
const logid = log.stats('ConnectionToCore.Disconnecting');
76+
77+
this.cancelPendingRequests();
78+
if (this.connectPromise) {
79+
await this.internalSendRequestAndWait({
80+
command: 'disconnect',
81+
args: [fatalError],
82+
});
6183
}
84+
await this.destroyConnection();
85+
log.stats('RemoteConnectionToCore.Disconnected', {
86+
parentLogId: logid,
87+
sessionId: null,
88+
});
6289
}
6390

6491
/////// PIPE FUNCTIONS /////////////////////////////////////////////////////////////////////////////////////////////
@@ -69,12 +96,7 @@ export default abstract class ConnectionToCore {
6996
const result = await this.connect();
7097
if (result) throw result;
7198

72-
const { promise, id } = this.createPendingResult();
73-
await this.internalSendRequest({
74-
messageId: id,
75-
...payload,
76-
});
77-
return promise;
99+
return this.internalSendRequestAndWait(payload);
78100
}
79101

80102
public onMessage(payload: ICoreResponsePayload | ICoreEventPayload): void {
@@ -122,6 +144,25 @@ export default abstract class ConnectionToCore {
122144
await this.commandQueue.run('logUnhandledError', error);
123145
}
124146

147+
protected async internalSendRequestAndWait(
148+
payload: Omit<ICoreRequestPayload, 'messageId'>,
149+
): Promise<ICoreResponsePayload> {
150+
const { promise, id } = this.createPendingResult();
151+
try {
152+
await this.internalSendRequest({
153+
messageId: id,
154+
...payload,
155+
});
156+
} catch (error) {
157+
if (error instanceof CanceledPromiseError) {
158+
this.pendingRequestsById.delete(id);
159+
return;
160+
}
161+
throw error;
162+
}
163+
return promise;
164+
}
165+
125166
protected onEvent(payload: ICoreEventPayload): void {
126167
const { meta, listenerId, eventArgs } = payload as ICoreEventPayload;
127168
const session = this.getSession(meta.sessionId);
@@ -136,13 +177,22 @@ export default abstract class ConnectionToCore {
136177
if (message.isError) {
137178
const error = new Error(message.data?.message);
138179
Object.assign(error, message.data);
139-
error.stack += `\n${'------CONNECTION'.padEnd(50, '-')}\n${pending.stack}`;
140-
pending.reject(error);
180+
this.rejectPendingRequest(pending, error);
141181
} else {
142182
pending.resolve({ data: message.data, commandId: message.commandId });
143183
}
144184
}
145185

186+
protected cancelPendingRequests(): void {
187+
this.commandQueue.clearPending();
188+
this.coreSessions.close();
189+
const pending = [...this.pendingRequestsById.values()];
190+
this.pendingRequestsById.clear();
191+
for (const entry of pending) {
192+
this.rejectPendingRequest(entry, new CanceledPromiseError('Disconnecting from Core'));
193+
}
194+
}
195+
146196
private createPendingResult(): IResolvablePromiseWithId {
147197
const resolvablePromise = createPromise<ICoreResponsePayload>() as IResolvablePromiseWithId;
148198
this.lastId += 1;
@@ -152,9 +202,15 @@ export default abstract class ConnectionToCore {
152202
return this.pendingRequestsById.get(id);
153203
}
154204

205+
private rejectPendingRequest(pending: IResolvablePromiseWithId, error: Error): void {
206+
error.stack += `\n${'------CONNECTION'.padEnd(50, '-')}\n${pending.stack}`;
207+
pending.reject(error);
208+
}
209+
155210
private onConnected(
156211
connectionParams: { maxConcurrency?: number; browserEmulatorIds?: string[] } = {},
157212
): void {
213+
this.isClosing = false;
158214
const { maxConcurrency, browserEmulatorIds } = connectionParams;
159215
if (!this.options.maxConcurrency || maxConcurrency < this.options.maxConcurrency) {
160216
log.info('Overriding max concurrency with Core value', {
Lines changed: 60 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -1,76 +1,93 @@
11
import ICoreRequestPayload from '@secret-agent/core-interfaces/ICoreRequestPayload';
22
import WebSocket from 'ws';
33
import TypeSerializer from '@secret-agent/commons/TypeSerializer';
4+
import { createPromise } from '@secret-agent/commons/utils';
5+
import IResolvablePromise from '@secret-agent/core-interfaces/IResolvablePromise';
6+
import { CanceledPromiseError } from '@secret-agent/commons/interfaces/IPendingWaitEvent';
47
import ConnectionToCore from './ConnectionToCore';
58
import IConnectionToCoreOptions from '../interfaces/IConnectionToCoreOptions';
69

710
export default class RemoteConnectionToCore extends ConnectionToCore {
8-
private wsConnectPromise: Promise<any>;
9-
private webSocket: WebSocket;
11+
private webSocketOrError: IResolvablePromise<WebSocket | Error>;
1012

1113
constructor(options: IConnectionToCoreOptions) {
14+
if (!options.host) throw new Error('A remote connection to core needs a host parameter!');
1215
super(options);
13-
const host = options.host;
14-
if (!host) throw new Error('A remote connection to core needs a host parameter!');
15-
16-
this.hostOrError = Promise.resolve(host)
17-
.then(x => {
18-
if (!x.includes('://')) {
19-
return `ws://${x}`;
20-
}
21-
return x;
22-
})
23-
.catch(err => err);
16+
this.disconnect = this.disconnect.bind(this);
2417
}
2518

26-
public internalSendRequest(payload: ICoreRequestPayload): Promise<void> {
19+
public async internalSendRequest(payload: ICoreRequestPayload): Promise<void> {
20+
if (!this.webSocketOrError) return;
2721
const message = TypeSerializer.stringify(payload);
22+
23+
const webSocket = await this.getWebsocket();
24+
25+
if (webSocket?.readyState !== WebSocket.OPEN) {
26+
throw new CanceledPromiseError('Websocket was not open');
27+
}
28+
2829
return new Promise((resolve, reject) =>
29-
this.webSocket.send(message, err => {
30+
webSocket.send(message, err => {
3031
if (err) reject(err);
3132
else resolve();
3233
}),
3334
);
3435
}
3536

36-
public async disconnect(): Promise<void> {
37-
if (this.wsConnectPromise && this.webSocket && this.webSocket.readyState === WebSocket.OPEN) {
38-
this.wsConnectPromise = null;
39-
await super.disconnect();
37+
protected async destroyConnection(): Promise<any> {
38+
const webSocket = await this.getWebsocket(false);
39+
if (webSocket?.readyState === WebSocket.OPEN) {
4040
try {
41-
this.webSocket.terminate();
41+
webSocket.off('close', this.disconnect);
42+
webSocket.terminate();
4243
} catch (_) {
4344
// ignore errors terminating
4445
}
4546
}
4647
}
4748

48-
public connect(): Promise<Error | null> {
49-
if (!this.wsConnectPromise) {
50-
this.wsConnectPromise = this.wsConnect().catch(err => err);
51-
}
49+
protected async createConnection(): Promise<Error | null> {
50+
// do this first to see if we can resolve the host
51+
const hostOrError = await this.hostOrError;
52+
if (hostOrError instanceof Error) return hostOrError;
5253

53-
return this.wsConnectPromise;
54+
if (!this.webSocketOrError) {
55+
this.webSocketOrError = connectToWebsocketHost(hostOrError);
56+
try {
57+
const webSocket = await this.getWebsocket();
58+
webSocket.once('close', this.disconnect);
59+
webSocket.on('message', message => {
60+
const payload = TypeSerializer.parse(message.toString());
61+
this.onMessage(payload);
62+
});
63+
} catch (error) {
64+
return error;
65+
}
66+
}
5467
}
5568

56-
private async wsConnect(): Promise<void> {
57-
const hostOrError = await this.hostOrError;
58-
if (hostOrError instanceof Error) throw hostOrError;
59-
60-
this.webSocket = new WebSocket(hostOrError);
61-
await new Promise<void>((resolve, reject) => {
62-
this.webSocket.on('error', reject);
63-
this.webSocket.once('open', () => {
64-
this.webSocket.off('error', reject);
65-
resolve();
66-
});
67-
});
68-
this.webSocket.once('close', this.disconnect.bind(this));
69-
this.webSocket.on('message', message => {
70-
const payload = TypeSerializer.parse(message.toString());
71-
this.onMessage(payload);
72-
});
69+
private async getWebsocket(throwIfError = true): Promise<WebSocket> {
70+
if (!this.webSocketOrError) return null;
71+
const webSocketOrError = await this.webSocketOrError.promise;
72+
if (webSocketOrError instanceof Error) {
73+
if (throwIfError) throw webSocketOrError;
74+
return null;
75+
}
76+
return webSocketOrError;
77+
}
78+
}
7379

74-
await super.connect();
80+
function connectToWebsocketHost(host: string): IResolvablePromise<WebSocket | Error> {
81+
const resolvable = createPromise<WebSocket | Error>(30e3);
82+
const webSocket = new WebSocket(host);
83+
function onError(error: Error): void {
84+
if (error instanceof Error) resolvable.resolve(error);
85+
else resolvable.resolve(new Error(`Error connecting to Websocket host -> ${error}`));
7586
}
87+
webSocket.once('error', onError);
88+
webSocket.once('open', () => {
89+
webSocket.off('error', onError);
90+
resolvable.resolve(webSocket);
91+
});
92+
return resolvable;
7693
}

client/lib/Agent.ts

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@ import Request from 'awaited-dom/impl/official-klasses/Request';
1717
import IWaitForOptions from '@secret-agent/core-interfaces/IWaitForOptions';
1818
import { IElementIsolate } from 'awaited-dom/base/interfaces/isolate';
1919
import CSSStyleDeclaration from 'awaited-dom/impl/official-klasses/CSSStyleDeclaration';
20-
import { CanceledPromiseError } from '@secret-agent/commons/interfaces/IPendingWaitEvent';
2120
import IAgentMeta from '@secret-agent/core-interfaces/IAgentMeta';
2221
import IScreenshotOptions from '@secret-agent/core-interfaces/IScreenshotOptions';
2322
import WebsocketResource from './WebsocketResource';
@@ -380,10 +379,7 @@ class SessionConnection {
380379
connectionToCore ?? { isPersistent: false },
381380
);
382381

383-
this._coreSession = connection.createSession(options).catch(err => {
384-
if (err instanceof CanceledPromiseError) return null;
385-
return err;
386-
});
382+
this._coreSession = connection.createSession(options).catch(err => err);
387383

388384
const defaultShowReplay = Boolean(JSON.parse(process.env.SA_SHOW_REPLAY ?? 'true'));
389385

client/lib/CoreCommandQueue.ts

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
import { createPromise } from '@secret-agent/commons/utils';
22
import ISessionMeta from '@secret-agent/core-interfaces/ISessionMeta';
33
import Log from '@secret-agent/commons/Logger';
4+
import { CanceledPromiseError } from '@secret-agent/commons/interfaces/IPendingWaitEvent';
45
import ConnectionToCore from '../connections/ConnectionToCore';
56

67
const { log } = Log(module);
@@ -49,7 +50,12 @@ export default class CoreCommandQueue {
4950
}
5051

5152
public clearPending(): void {
52-
this.items.length = 0;
53+
while (this.items.length) {
54+
const next = this.items.shift();
55+
const cancel = new CanceledPromiseError(`Canceling pending ${next.command} command`);
56+
cancel.stack = next.stack;
57+
next.reject();
58+
}
5359
}
5460

5561
// PRIVATE

client/lib/Handler.ts

Lines changed: 3 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,10 @@
11
import { createPromise, pickRandom } from '@secret-agent/commons/utils';
2+
import ShutdownHandler from '@secret-agent/commons/ShutdownHandler';
23
import IAgentCreateOptions from '../interfaces/IAgentCreateOptions';
34
import IConnectionToCoreOptions from '../interfaces/IConnectionToCoreOptions';
45
import Agent from './Agent';
56
import ConnectionToCore from '../connections/ConnectionToCore';
67
import ConnectionFactory from '../connections/ConnectionFactory';
7-
import Signals = NodeJS.Signals;
88

99
export default class Handler {
1010
public defaultAgentOptions: IAgentCreateOptions = {};
@@ -21,7 +21,7 @@ export default class Handler {
2121
this.connections.push(connection);
2222
}
2323

24-
this.registerShutdownHandlers();
24+
ShutdownHandler.register(() => this.close());
2525
this.registerUnhandledExceptionHandlers();
2626
}
2727

@@ -93,13 +93,7 @@ export default class Handler {
9393
await Promise.all(this.connections.map(x => x.disconnect(error)));
9494
}
9595

96-
public registerShutdownHandlers(): void {
97-
for (const signal of ['exit', 'SIGTERM', 'SIGINT', 'SIGQUIT']) {
98-
process.once(signal as Signals, this.close.bind(this));
99-
}
100-
}
101-
102-
public registerUnhandledExceptionHandlers(): void {
96+
private registerUnhandledExceptionHandlers(): void {
10397
process.on('uncaughtExceptionMonitor', this.close.bind(this));
10498
process.on('unhandledRejection', this.logUnhandledError.bind(this));
10599
}

0 commit comments

Comments
 (0)