Skip to content

Commit f413ea8

Browse files
committed
fix(client): fix close handling
feat(client): distribute handler agents to available connections
1 parent f1fbe4d commit f413ea8

File tree

11 files changed

+229
-122
lines changed

11 files changed

+229
-122
lines changed

client/connections/ConnectionToCore.ts

Lines changed: 128 additions & 76 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
import ICoreRequestPayload from '@secret-agent/core-interfaces/ICoreRequestPayload';
22
import ICoreEventPayload from '@secret-agent/core-interfaces/ICoreEventPayload';
33
import ICoreResponsePayload from '@secret-agent/core-interfaces/ICoreResponsePayload';
4-
import { createPromise } from '@secret-agent/commons/utils';
4+
import { bindFunctions, createPromise } from '@secret-agent/commons/utils';
55
import IResolvablePromise from '@secret-agent/core-interfaces/IResolvablePromise';
66
import Log from '@secret-agent/commons/Logger';
77
import ICreateSessionOptions from '@secret-agent/core-interfaces/ICreateSessionOptions';
@@ -10,6 +10,7 @@ import { CanceledPromiseError } from '@secret-agent/commons/interfaces/IPendingW
1010
import ICoreConfigureOptions from '@secret-agent/core-interfaces/ICoreConfigureOptions';
1111
import { TypedEventEmitter } from '@secret-agent/commons/eventUtils';
1212
import SessionClosedOrMissingError from '@secret-agent/commons/SessionClosedOrMissingError';
13+
import Resolvable from '@secret-agent/commons/Resolvable';
1314
import IConnectionToCoreOptions from '../interfaces/IConnectionToCoreOptions';
1415
import CoreCommandQueue from '../lib/CoreCommandQueue';
1516
import CoreSession from '../lib/CoreSession';
@@ -27,11 +28,23 @@ export default abstract class ConnectionToCore extends TypedEventEmitter<{
2728
public readonly commandQueue: CoreCommandQueue;
2829
public readonly hostOrError: Promise<string | Error>;
2930
public options: IConnectionToCoreOptions;
31+
public isDisconnecting = false;
3032

31-
private connectPromise: Promise<Error | null>;
32-
private isClosing = false;
33-
private resolvedHost: string;
33+
protected resolvedHost: string;
3434

35+
private connectPromise: IResolvablePromise<Error | null>;
36+
private get connectOptions(): ICoreConfigureOptions & { isPersistent: boolean } {
37+
return {
38+
coreServerPort: this.options.coreServerPort,
39+
browserEmulatorIds: this.options.browserEmulatorIds,
40+
localProxyPortStart: this.options.localProxyPortStart,
41+
sessionsDir: this.options.sessionsDir,
42+
isPersistent: this.options.isPersistent,
43+
};
44+
}
45+
46+
private connectRequestId: string;
47+
private disconnectRequestId: string;
3548
private coreSessions: CoreSessions;
3649
private readonly pendingRequestsById = new Map<string, IResolvablePromiseWithId>();
3750
private lastId = 0;
@@ -61,61 +74,68 @@ export default abstract class ConnectionToCore extends TypedEventEmitter<{
6174
} else {
6275
this.hostOrError = Promise.resolve(new Error('No host provided'));
6376
}
64-
this.disconnect = this.disconnect.bind(this);
77+
bindFunctions(this);
6578
}
6679

6780
protected abstract internalSendRequest(payload: ICoreRequestPayload): Promise<void>;
6881
protected abstract createConnection(): Promise<Error | null>;
6982
protected abstract destroyConnection(): Promise<any>;
7083

71-
public connect(): Promise<Error | null> {
72-
this.connectPromise ??= this.createConnection()
73-
.then(err => {
74-
if (err) throw err;
75-
return this.internalSendRequestAndWait({
84+
public async connect(): Promise<Error | null> {
85+
if (!this.connectPromise) {
86+
this.connectPromise = new Resolvable();
87+
try {
88+
const connectError = await this.createConnection();
89+
if (connectError) throw connectError;
90+
if (this.isDisconnecting) throw new DisconnectedFromCoreError(this.resolvedHost);
91+
// can be resolved if canceled by a disconnect
92+
if (this.connectPromise.isResolved) return;
93+
94+
const connectResult = await this.internalSendRequestAndWait({
7695
command: 'connect',
77-
args: [
78-
<ICoreConfigureOptions & { isPersistent: boolean }>{
79-
coreServerPort: this.options.coreServerPort,
80-
browserEmulatorIds: this.options.browserEmulatorIds,
81-
localProxyPortStart: this.options.localProxyPortStart,
82-
sessionsDir: this.options.sessionsDir,
83-
isPersistent: this.options.isPersistent,
84-
},
85-
],
96+
args: [this.connectOptions],
8697
});
87-
})
88-
.then(result => this.onConnected(result.data))
89-
.catch(err => err);
98+
if (connectResult?.data) {
99+
const { maxConcurrency, browserEmulatorIds } = connectResult.data;
100+
if (
101+
maxConcurrency &&
102+
(!this.options.maxConcurrency || maxConcurrency < this.options.maxConcurrency)
103+
) {
104+
log.info('Overriding max concurrency with Core value', {
105+
maxConcurrency,
106+
sessionId: null,
107+
});
108+
this.coreSessions.concurrency = maxConcurrency;
109+
this.options.maxConcurrency = maxConcurrency;
110+
}
111+
this.options.browserEmulatorIds ??= browserEmulatorIds ?? [];
112+
}
113+
this.emit('connected');
114+
} catch (err) {
115+
this.connectPromise.resolve(err);
116+
} finally {
117+
if (!this.connectPromise.isResolved) this.connectPromise.resolve();
118+
}
119+
}
90120

91-
return this.connectPromise;
121+
return this.connectPromise.promise;
92122
}
93123

94124
public async disconnect(fatalError?: Error): Promise<void> {
95-
if (this.isClosing) return;
96-
this.isClosing = true;
97-
const logid = log.stats('ConnectionToCore.Disconnecting', {
98-
host: this.hostOrError,
99-
sessionId: null,
100-
});
101-
102-
await this.cancelPendingRequests();
103-
if (this.connectPromise) {
104-
await this.internalSendRequestAndWait(
105-
{
106-
command: 'disconnect',
107-
args: [fatalError],
108-
},
109-
2e3,
110-
);
111-
}
112-
await this.destroyConnection();
113-
log.stats('ConnectionToCore.Disconnected', {
114-
parentLogId: logid,
115-
host: this.hostOrError,
116-
sessionId: null,
125+
// user triggered disconnect sends a disconnect to Core
126+
await this.internalDisconnect(fatalError, async () => {
127+
try {
128+
await this.internalSendRequestAndWait(
129+
{
130+
command: 'disconnect',
131+
args: [fatalError],
132+
},
133+
2e3,
134+
);
135+
} catch (error) {
136+
// don't do anything
137+
}
117138
});
118-
this.emit('disconnected');
119139
}
120140

121141
/////// PIPE FUNCTIONS /////////////////////////////////////////////////////////////////////////////////////////////
@@ -141,20 +161,26 @@ export default abstract class ConnectionToCore extends TypedEventEmitter<{
141161
}
142162
/////// SESSION FUNCTIONS //////////////////////////////////////////////////////////////////////////////////////////
143163

144-
public async useAgent(
164+
public useAgent(
145165
options: IAgentCreateOptions,
146166
callbackFn: (agent: Agent) => Promise<any>,
147167
): Promise<void> {
148-
await this.connect();
149-
await this.coreSessions.waitForAvailable(() => {
168+
// just kick off
169+
this.connect().catch(() => null);
170+
return this.coreSessions.waitForAvailable(() => {
150171
const agent = new Agent({
151172
...options,
152173
connectionToCore: this,
153174
});
175+
154176
return callbackFn(agent);
155177
});
156178
}
157179

180+
public canCreateSessionNow(): boolean {
181+
return this.isDisconnecting === false && this.coreSessions.hasAvailability();
182+
}
183+
158184
public async createSession(options: ICreateSessionOptions): Promise<CoreSession> {
159185
const sessionMeta = await this.commandQueue.run<ISessionMeta>('createSession', options);
160186
const session = new CoreSession({ ...sessionMeta, sessionName: options.sessionName }, this);
@@ -174,11 +200,45 @@ export default abstract class ConnectionToCore extends TypedEventEmitter<{
174200
await this.commandQueue.run('logUnhandledError', error);
175201
}
176202

203+
protected async internalDisconnect(
204+
fatalError?: Error,
205+
beforeClose?: () => Promise<any>,
206+
): Promise<void> {
207+
if (this.isDisconnecting) return;
208+
this.isDisconnecting = true;
209+
const logid = log.stats('ConnectionToCore.Disconnecting', {
210+
host: this.hostOrError,
211+
sessionId: null,
212+
});
213+
214+
this.cancelPendingRequests();
215+
216+
if (this.connectPromise) {
217+
if (!this.connectPromise.isResolved) {
218+
this.connectPromise.resolve(new DisconnectedFromCoreError(this.resolvedHost));
219+
} else if (beforeClose) {
220+
await beforeClose();
221+
}
222+
}
223+
await this.destroyConnection();
224+
log.stats('ConnectionToCore.Disconnected', {
225+
parentLogId: logid,
226+
host: this.hostOrError,
227+
sessionId: null,
228+
});
229+
230+
this.emit('disconnected');
231+
}
232+
177233
protected async internalSendRequestAndWait(
178234
payload: Omit<ICoreRequestPayload, 'messageId'>,
179235
timeoutMs?: number,
180236
): Promise<ICoreResponsePayload> {
181237
const { promise, id, resolve } = this.createPendingResult();
238+
const { command } = payload;
239+
240+
if (command === 'connect') this.connectRequestId = id;
241+
if (command === 'disconnect') this.disconnectRequestId = id;
182242

183243
let timeout: NodeJS.Timeout;
184244
if (timeoutMs) timeout = setTimeout(() => resolve(null), timeoutMs).unref();
@@ -214,25 +274,34 @@ export default abstract class ConnectionToCore extends TypedEventEmitter<{
214274
const pending = this.pendingRequestsById.get(id);
215275
if (!pending) return;
216276
this.pendingRequestsById.delete(id);
277+
const isInternalRequest = this.connectRequestId === id || this.disconnectRequestId === id;
217278

218279
if (message.data instanceof Error) {
219-
let error = message.data;
220-
if (this.isClosing || error.name === SessionClosedOrMissingError.name) {
221-
error = new DisconnectedFromCoreError(this.resolvedHost);
280+
let responseError = message.data;
281+
const isDisconnected =
282+
this.isDisconnecting ||
283+
responseError.name === SessionClosedOrMissingError.name ||
284+
(responseError as any).isDisconnecting === true;
285+
286+
if (!isInternalRequest && isDisconnected) {
287+
responseError = new DisconnectedFromCoreError(this.resolvedHost);
222288
}
223-
this.rejectPendingRequest(pending, error);
289+
this.rejectPendingRequest(pending, responseError);
224290
} else {
225291
pending.resolve({ data: message.data, commandId: message.commandId });
226292
}
227293
}
228294

229-
protected async cancelPendingRequests(): Promise<void> {
230-
this.commandQueue.clearPending();
231-
const host = String(await this.hostOrError);
295+
protected cancelPendingRequests(): void {
296+
const host = String(this.resolvedHost);
232297
this.coreSessions.close(new DisconnectedFromCoreError(host));
233-
const pending = [...this.pendingRequestsById.values()];
234-
this.pendingRequestsById.clear();
235-
for (const entry of pending) {
298+
this.commandQueue.clearPending(new DisconnectedFromCoreError(host));
299+
for (const entry of this.pendingRequestsById.values()) {
300+
const id = entry.id;
301+
if (this.connectRequestId === id || this.disconnectRequestId === id) {
302+
continue;
303+
}
304+
this.pendingRequestsById.delete(id);
236305
this.rejectPendingRequest(entry, new DisconnectedFromCoreError(host));
237306
}
238307
}
@@ -250,23 +319,6 @@ export default abstract class ConnectionToCore extends TypedEventEmitter<{
250319
error.stack += `\n${'------CONNECTION'.padEnd(50, '-')}\n${pending.stack}`;
251320
pending.reject(error);
252321
}
253-
254-
private onConnected(
255-
connectionParams: { maxConcurrency?: number; browserEmulatorIds?: string[] } = {},
256-
): void {
257-
this.isClosing = false;
258-
const { maxConcurrency, browserEmulatorIds } = connectionParams;
259-
if (!this.options.maxConcurrency || maxConcurrency < this.options.maxConcurrency) {
260-
log.info('Overriding max concurrency with Core value', {
261-
maxConcurrency,
262-
sessionId: null,
263-
});
264-
this.coreSessions.concurrency = maxConcurrency;
265-
this.options.maxConcurrency = maxConcurrency;
266-
}
267-
this.options.browserEmulatorIds ??= browserEmulatorIds ?? [];
268-
this.emit('connected');
269-
}
270322
}
271323

272324
interface IResolvablePromiseWithId extends IResolvablePromise<ICoreResponsePayload> {

client/connections/DisconnectedFromCoreError.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,5 +4,6 @@ export default class DisconnectedFromCoreError extends CanceledPromiseError {
44
public code = 'DisconnectedFromCore';
55
constructor(readonly coreHost: string) {
66
super(`This Agent has been disconnected from Core (coreHost: ${coreHost})`);
7+
this.name = 'DisconnectedFromCore';
78
}
89
}

client/connections/RemoteConnectionToCore.ts

Lines changed: 13 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import IResolvablePromise from '@secret-agent/core-interfaces/IResolvablePromise
66
import { CanceledPromiseError } from '@secret-agent/commons/interfaces/IPendingWaitEvent';
77
import ConnectionToCore from './ConnectionToCore';
88
import IConnectionToCoreOptions from '../interfaces/IConnectionToCoreOptions';
9+
import DisconnectedFromCoreError from './DisconnectedFromCoreError';
910

1011
export default class RemoteConnectionToCore extends ConnectionToCore {
1112
private webSocketOrError: IResolvablePromise<WebSocket | Error>;
@@ -15,7 +16,7 @@ export default class RemoteConnectionToCore extends ConnectionToCore {
1516
super(options);
1617
}
1718

18-
public async internalSendRequest(payload: ICoreRequestPayload): Promise<void> {
19+
protected async internalSendRequest(payload: ICoreRequestPayload): Promise<void> {
1920
if (!this.webSocketOrError) throw new CanceledPromiseError('No websocket connection');
2021
const message = TypeSerializer.stringify(payload);
2122

@@ -27,8 +28,13 @@ export default class RemoteConnectionToCore extends ConnectionToCore {
2728

2829
return new Promise((resolve, reject) =>
2930
webSocket.send(message, err => {
30-
if (err) reject(err);
31-
else resolve();
31+
if (err) {
32+
const { code } = err as any;
33+
if (code === 'EPIPE' && super.isDisconnecting) {
34+
return reject(new DisconnectedFromCoreError(this.resolvedHost));
35+
}
36+
reject(err);
37+
} else resolve();
3238
}),
3339
);
3440
}
@@ -37,8 +43,8 @@ export default class RemoteConnectionToCore extends ConnectionToCore {
3743
const webSocket = await this.getWebsocket(false);
3844
if (webSocket?.readyState === WebSocket.OPEN) {
3945
try {
40-
webSocket.off('close', this.disconnect);
41-
webSocket.off('error', this.disconnect);
46+
webSocket.off('close', this.internalDisconnect);
47+
webSocket.off('error', this.internalDisconnect);
4248
webSocket.terminate();
4349
} catch (_) {
4450
// ignore errors terminating
@@ -55,8 +61,8 @@ export default class RemoteConnectionToCore extends ConnectionToCore {
5561
this.webSocketOrError = connectToWebsocketHost(hostOrError);
5662
try {
5763
const webSocket = await this.getWebsocket();
58-
webSocket.once('close', this.disconnect);
59-
webSocket.once('error', this.disconnect);
64+
webSocket.once('close', this.internalDisconnect);
65+
webSocket.once('error', this.internalDisconnect);
6066
webSocket.on('message', message => {
6167
const payload = TypeSerializer.parse(message.toString());
6268
this.onMessage(payload);

0 commit comments

Comments
 (0)