Skip to content

Commit b0ece5b

Browse files
committed
feat(handler): re-q unstarted agents on disconnect
1 parent cb49d1e commit b0ece5b

File tree

3 files changed

+113
-10
lines changed

3 files changed

+113
-10
lines changed

client/lib/Handler.ts

Lines changed: 44 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -9,12 +9,20 @@ import ConnectionToCore from '../connections/ConnectionToCore';
99
import ConnectionFactory from '../connections/ConnectionFactory';
1010
import DisconnectedFromCoreError from '../connections/DisconnectedFromCoreError';
1111

12-
type SettledDispatchesBySessionId = { [sessionId: string]: { args: any; error?: Error } };
13-
type PendingDispatch = { resolution: Promise<Error | void>; sessionId?: string; args: any };
12+
type SettledDispatchesBySessionId = {
13+
[sessionId: string]: { args: any; error?: Error; retries: number };
14+
};
15+
type PendingDispatch = {
16+
resolution: Promise<Error | void>;
17+
sessionId?: string;
18+
args: any;
19+
retries: number;
20+
};
1421

1522
const { log } = Log(module);
1623

1724
export default class Handler {
25+
public disconnectedDispatchRetries = 3;
1826
public defaultAgentOptions: IAgentCreateOptions = {};
1927
public get coreHosts(): Promise<string[]> {
2028
return Promise.all(this.connections.map(x => x.hostOrError)).then(x => {
@@ -30,6 +38,8 @@ export default class Handler {
3038
private readonly connections: ConnectionToCore[] = [];
3139
private readonly dispatches: PendingDispatch[] = [];
3240

41+
private isClosing = false;
42+
3343
constructor(...connectionOptions: (IConnectionToCoreOptions | ConnectionToCore)[]) {
3444
if (!connectionOptions.length) {
3545
connectionOptions.push({});
@@ -68,14 +78,25 @@ export default class Handler {
6878
runFn: (agent: Agent, args?: T) => Promise<void>,
6979
args?: T,
7080
createAgentOptions?: IAgentCreateOptions,
81+
pendingDispatch?: PendingDispatch,
7182
): void {
7283
const options = {
7384
...this.defaultAgentOptions,
7485
...createAgentOptions,
7586
};
87+
88+
const dispatched: PendingDispatch = pendingDispatch ?? { args, resolution: null, retries: 0 };
89+
90+
// if no available connection, return
7691
const connection = this.getConnection();
92+
if (!connection) {
93+
dispatched.resolution = Promise.resolve(
94+
new Error("There aren't any connections available to dispatch this agent"),
95+
);
96+
this.dispatches.push(dispatched);
97+
return;
98+
}
7799

78-
const dispatched: PendingDispatch = { args, resolution: null };
79100
dispatched.resolution = connection
80101
.useAgent(options, async agent => {
81102
try {
@@ -85,7 +106,16 @@ export default class Handler {
85106
await agent.close();
86107
}
87108
})
88-
.catch((err: Error) => err);
109+
.catch(err => {
110+
const canRetry =
111+
!dispatched.sessionId && dispatched.retries < this.disconnectedDispatchRetries;
112+
if (canRetry && !this.isClosing && this.connections.length) {
113+
dispatched.retries += 1;
114+
return this.dispatchAgent(runFn, args, createAgentOptions, dispatched);
115+
}
116+
117+
return err;
118+
});
89119

90120
this.dispatches.push(dispatched);
91121
}
@@ -144,9 +174,9 @@ export default class Handler {
144174
this.dispatches.length = 0;
145175

146176
await Promise.all(dispatches.map(x => x.resolution));
147-
for (const { sessionId, resolution, args } of dispatches) {
177+
for (const { sessionId, resolution, args, retries } of dispatches) {
148178
const error = <Error>await resolution;
149-
result[sessionId] = { args, error };
179+
result[sessionId] = { args, error, retries };
150180
}
151181

152182
await new Promise(setImmediate);
@@ -156,15 +186,21 @@ export default class Handler {
156186
}
157187

158188
public async close(error?: Error): Promise<void> {
189+
if (this.isClosing) return;
190+
this.isClosing = true;
159191
// eslint-disable-next-line promise/no-promise-in-callback
160192
await Promise.all(this.connections.map(x => x.disconnect(error)));
161193
}
162194

163-
private getConnection(): ConnectionToCore {
195+
private getAvailableConnections(): ConnectionToCore[] {
164196
// prefer a connection that can create a session right now
165197
let connections = this.connections.filter(x => x.canCreateSessionNow());
166198
if (!connections.length) connections = this.connections.filter(x => !x.isDisconnecting);
167-
return pickRandom(connections);
199+
return connections;
200+
}
201+
202+
private getConnection(): ConnectionToCore {
203+
return pickRandom(this.getAvailableConnections());
168204
}
169205

170206
private registerUnhandledExceptionHandlers(): void {

core/server/ConnectionToClient.ts

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ export default class ConnectionToClient extends TypedEventEmitter<{
3434

3535
private autoShutdownTimer: NodeJS.Timer;
3636
private readonly sessionIds = new Set<string>();
37+
private hasActiveCommand = false;
3738

3839
private clientExposedMethods = new Map<string, keyof this & string>([
3940
['Core.connect', 'connect'],
@@ -62,6 +63,7 @@ export default class ConnectionToClient extends TypedEventEmitter<{
6263

6364
let data: any;
6465
try {
66+
this.hasActiveCommand = true;
6567
data = await this.executeCommand(command, args, meta);
6668
} catch (error) {
6769
// if we're closing, don't emit errors
@@ -77,6 +79,8 @@ export default class ConnectionToClient extends TypedEventEmitter<{
7779
}
7880
data = this.serializeError(error);
7981
data.isDisconnecting = this.isClosing || session?.isClosing;
82+
} finally {
83+
this.hasActiveCommand = false;
8084
}
8185

8286
const commandId = session?.sessionState?.lastCommand?.id;
@@ -132,7 +136,7 @@ export default class ConnectionToClient extends TypedEventEmitter<{
132136
}
133137

134138
public isActive(): boolean {
135-
return this.sessionIds.size > 0 || this.isPersistent;
139+
return this.sessionIds.size > 0 || this.isPersistent || this.hasActiveCommand;
136140
}
137141

138142
/////// SESSION /////////////////////////////////////////////////////////////////////////////////////////////////////

full-client/test/handler.test.ts

Lines changed: 64 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
import { Helpers } from '@secret-agent/testing';
22
import { ITestKoaServer } from '@secret-agent/testing/helpers';
3-
import Core, { Session, CoreProcess } from '@secret-agent/core/index';
3+
import Core, { CoreProcess, Session } from '@secret-agent/core/index';
44
import DisconnectedFromCoreError from '@secret-agent/client/connections/DisconnectedFromCoreError';
55
import { Agent, RemoteConnectionToCore } from '@secret-agent/client/index';
66
import { createPromise } from '@secret-agent/commons/utils';
@@ -12,6 +12,7 @@ beforeAll(async () => {
1212
await Core.start();
1313
koaServer = await Helpers.runKoaServer(true);
1414
});
15+
afterEach(Helpers.afterEach);
1516
afterAll(Helpers.afterAll);
1617

1718
describe('Full client Handler', () => {
@@ -68,6 +69,8 @@ describe('Full client Handler', () => {
6869
await expect(agent2.sessionId).resolves.toBeTruthy();
6970
const agent3 = handler.createAgent();
7071

72+
Helpers.needsClosing.push(agent2);
73+
7174
async function isAgent3Available(millis = 100): Promise<boolean> {
7275
const result = await Promise.race([
7376
agent3,
@@ -81,6 +84,7 @@ describe('Full client Handler', () => {
8184
await agent1.close();
8285

8386
await expect(isAgent3Available(5e3)).resolves.toBe(true);
87+
await (await agent3).close();
8488
});
8589
});
8690

@@ -374,4 +378,63 @@ describe('connectionToCore', () => {
374378

375379
await handler.close();
376380
});
381+
382+
it('can re-queue dispatched agents that never started', async () => {
383+
const coreHost = await CoreProcess.spawn({});
384+
Helpers.onClose(() => CoreProcess.kill('SIGINT'));
385+
const connection1 = new RemoteConnectionToCore({
386+
maxConcurrency: 1,
387+
host: coreHost,
388+
});
389+
await connection1.connect();
390+
391+
const handler = new Handler(connection1);
392+
Helpers.needsClosing.push(handler);
393+
394+
const waitForGoto = createPromise();
395+
const dispatchErrorPromise = createPromise<Error>();
396+
handler.dispatchAgent(async agent => {
397+
try {
398+
await agent.goto(koaServer.baseUrl);
399+
// create a command we can disconnect from (don't await yet)
400+
const promise = agent.waitForMillis(5e3);
401+
await new Promise(resolve => setTimeout(resolve, 50));
402+
waitForGoto.resolve();
403+
await promise;
404+
} catch (error) {
405+
dispatchErrorPromise.resolve(error);
406+
throw error;
407+
}
408+
});
409+
410+
let counter = 0;
411+
const incr = async agent => {
412+
await agent.goto(koaServer.baseUrl);
413+
counter += 1;
414+
};
415+
handler.dispatchAgent(incr);
416+
handler.dispatchAgent(incr);
417+
418+
// first 2 will be queued against the first connection
419+
const coreHost2 = await Core.server.address;
420+
await handler.addConnectionToCore({ maxConcurrency: 2, host: coreHost2 });
421+
handler.dispatchAgent(incr);
422+
handler.dispatchAgent(incr);
423+
await waitForGoto.promise;
424+
425+
// disconnect the first connection. the first two handlers should get re-queued
426+
await connection1.disconnect();
427+
await new Promise(setImmediate);
428+
429+
// should have an error thrown if it actually the process. this one should NOT get re-queued
430+
await expect(dispatchErrorPromise).resolves.toBeTruthy();
431+
const dispatchError = await dispatchErrorPromise;
432+
expect(dispatchError).toBeInstanceOf(DisconnectedFromCoreError);
433+
434+
const allDispatches = await handler.waitForAllDispatchesSettled();
435+
436+
expect(counter).toBe(4);
437+
expect(Object.keys(allDispatches)).toHaveLength(5);
438+
expect(Object.values(allDispatches).filter(x => !!x.error)).toHaveLength(1);
439+
});
377440
});

0 commit comments

Comments
 (0)