Skip to content

Commit 43af64e

Browse files
committed
feat(puppet): convert to websocket
1 parent 3c03c3a commit 43af64e

File tree

4 files changed

+86
-96
lines changed

4 files changed

+86
-96
lines changed

puppet-interfaces/IConnectionTransport.ts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,11 @@ import ITypedEventEmitter from '@secret-agent/core-interfaces/ITypedEventEmitter
1717

1818
export default interface IConnectionTransport
1919
extends ITypedEventEmitter<IConnectionTransportEvents> {
20+
url?: string;
2021
send(body: string);
2122
close();
23+
clone(): IConnectionTransport;
24+
waitForOpen: Promise<void>;
2225
}
2326

2427
export interface IConnectionTransportEvents {

puppet/lib/PipeTransport.ts

Lines changed: 0 additions & 87 deletions
This file was deleted.

puppet/lib/WebSocketTransport.ts

Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
import * as EventUtils from '@secret-agent/commons/eventUtils';
2+
import { addEventListeners, TypedEventEmitter } from '@secret-agent/commons/eventUtils';
3+
import Log from '@secret-agent/commons/Logger';
4+
import IRegisteredEventListener from '@secret-agent/core-interfaces/IRegisteredEventListener';
5+
import IConnectionTransport, {
6+
IConnectionTransportEvents,
7+
} from '@secret-agent/puppet-interfaces/IConnectionTransport';
8+
import * as WebSocket from 'ws';
9+
import Resolvable from '@secret-agent/commons/Resolvable';
10+
import * as http from 'http';
11+
import { URL } from 'url';
12+
13+
const { log } = Log(module);
14+
15+
export class WebSocketTransport
16+
extends TypedEventEmitter<IConnectionTransportEvents>
17+
implements IConnectionTransport {
18+
eventListeners: IRegisteredEventListener[];
19+
20+
public get waitForOpen(): Promise<void> {
21+
return this.connectResolvable.promise;
22+
}
23+
24+
public get url(): string {
25+
return this.webSocket.url;
26+
}
27+
28+
private readonly connectResolvable = new Resolvable<void>();
29+
private readonly webSocket: WebSocket;
30+
31+
constructor(url: string, readonly label = 'root') {
32+
super();
33+
this.webSocket = new WebSocket(url);
34+
this.webSocket.on('open', this.connectResolvable.resolve);
35+
this.webSocket.on('error', this.connectResolvable.reject);
36+
this.eventListeners = addEventListeners(this.webSocket, [
37+
['message', this.onMessage.bind(this)],
38+
['close', this.onClosed.bind(this)],
39+
['error', error => log.info('WebSocketTransport.Error', { error, sessionId: null })],
40+
]);
41+
this.emit = this.emit.bind(this);
42+
}
43+
44+
send(message: string) {
45+
this.webSocket.send(message);
46+
}
47+
48+
close() {
49+
EventUtils.removeEventListeners(this.eventListeners);
50+
this.webSocket.close();
51+
}
52+
53+
clone(): WebSocketTransport {
54+
return new WebSocketTransport(this.webSocket.url, 'clone');
55+
}
56+
57+
private onClosed() {
58+
log.stats('WebSocketTransport.Closed');
59+
this.emit('close');
60+
}
61+
62+
private onMessage(event: string): void {
63+
this.emit('message', event);
64+
}
65+
}

puppet/lib/launchProcess.ts

Lines changed: 18 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -16,23 +16,23 @@
1616
*/
1717
import * as childProcess from 'child_process';
1818
import { StdioOptions } from 'child_process';
19-
import { Readable, Writable } from 'stream';
2019
import * as readline from 'readline';
2120
import * as Path from 'path';
2221
import Log from '@secret-agent/commons/Logger';
2322
import ILaunchedProcess from '@secret-agent/puppet-interfaces/ILaunchedProcess';
24-
import { PipeTransport } from './PipeTransport';
23+
import Resolvable from '@secret-agent/commons/Resolvable';
24+
import { WebSocketTransport } from './WebSocketTransport';
2525

2626
const { log } = Log(module);
2727

2828
const logProcessExit = process.env.NODE_ENV !== 'test';
2929

30-
export default function launchProcess(
30+
export default async function launchProcess(
3131
executablePath: string,
3232
processArguments: string[],
3333
env: NodeJS.ProcessEnv,
3434
): Promise<ILaunchedProcess> {
35-
const stdio: StdioOptions = ['ignore', 'pipe', 'pipe', 'pipe', 'pipe'];
35+
const stdio: StdioOptions = ['ignore', 'pipe', 'pipe'];
3636

3737
log.info(`Puppet.LaunchProcess`, { sessionId: null, executablePath, processArguments });
3838
const launchedProcess = childProcess.spawn(executablePath, processArguments, {
@@ -60,11 +60,16 @@ export default function launchProcess(
6060

6161
const stdout = readline.createInterface({ input: launchedProcess.stdout });
6262
stdout.on('line', line => {
63-
log.stats(`${exe}.stdout`, { message: line, sessionId: null });
63+
if (line) log.stats(`${exe}.stdout`, { message: line, sessionId: null });
6464
});
6565

66+
const websocketEndpointResolvable = new Resolvable<string>();
6667
const stderr = readline.createInterface({ input: launchedProcess.stderr });
6768
stderr.on('line', line => {
69+
if (!line) return;
70+
const match = line.match(/DevTools listening on (.*)/);
71+
if (match) websocketEndpointResolvable.resolve(match[1].trim());
72+
6873
log.warn(`${exe}.stderr`, { message: line, sessionId: null });
6974
});
7075

@@ -76,10 +81,9 @@ export default function launchProcess(
7681
}
7782
});
7883

79-
const transport = new PipeTransport(
80-
launchedProcess.stdio[3] as Writable,
81-
launchedProcess.stdio[4] as Readable,
82-
);
84+
const wsEndpoint = await websocketEndpointResolvable.promise;
85+
const transport = new WebSocketTransport(wsEndpoint);
86+
await transport.waitForOpen;
8387

8488
return Promise.resolve(<ILaunchedProcess>{
8589
transport,
@@ -89,6 +93,11 @@ export default function launchProcess(
8993
function close(): Promise<void> {
9094
if (launchedProcess.killed || processKilled) return;
9195

96+
try {
97+
transport.close();
98+
} catch (error) {
99+
// drown
100+
}
92101
try {
93102
const closed = new Promise<void>(resolve => launchedProcess.once('exit', resolve));
94103
if (process.platform === 'win32') {

0 commit comments

Comments
 (0)