Skip to content

Commit 46c5987

Browse files
committed
Implemented proper waiting for ws responses.
1 parent 41aaf9f commit 46c5987

File tree

2 files changed

+100
-69
lines changed

2 files changed

+100
-69
lines changed

src/platform.ts

Lines changed: 10 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -71,37 +71,9 @@ export class HomebridgePrincessHeaterPlatform implements DynamicPlatformPlugin {
7171

7272
const auth = await httpAPIClient.login();
7373

74-
const wsAPIClient = new WsAPIClient(this.log);
75-
76-
const helloMessage = await wsAPIClient.send<HelloWsOutgoingMessage>({
77-
type: MessageType.Hello,
78-
version: '2.4.0',
79-
os: 'ios',
80-
source: 'climate',
81-
compatibility: 3,
82-
token: auth.token,
83-
});
84-
85-
wsAPIClient.on('message', (message: WsIncomingMessage) => {
86-
if (
87-
helloMessage &&
88-
message.type === 'response' &&
89-
message.message_id === helloMessage.message_id &&
90-
message.status === 200
91-
) {
92-
this.onHelloMessageResponse(
93-
message as ResponseWsIncomingMessage,
94-
wsAPIClient,
95-
httpAPIClient,
96-
);
97-
}
98-
});
99-
}
100-
101-
async onHelloMessageResponse(response: ResponseWsIncomingMessage, wsClient: WsAPIClient, httpClient: HttpAPIClient) {
102-
this.log.debug('Received a response to Hello message. Going to get list of devices...', response);
74+
const wsAPIClient = new WsAPIClient(this.log, authorizationHeaderValue);
10375

104-
const devices = await httpClient.getDevices();
76+
const devices = await httpAPIClient.getDevices();
10577

10678
this.log.debug('Received a list of devices:', devices.map(d => d.name));
10779

@@ -124,7 +96,7 @@ export class HomebridgePrincessHeaterPlatform implements DynamicPlatformPlugin {
12496

12597
if (existingAccessory) {
12698
this.log.info('Restoring existing accessory from cache:', existingAccessory.displayName);
127-
new HomewizardPrincessHeaterAccessory(this, existingAccessory, wsClient);
99+
new HomewizardPrincessHeaterAccessory(this, existingAccessory, wsAPIClient);
128100
this.api.updatePlatformAccessories([existingAccessory]);
129101
} else {
130102
this.log.info('Adding new accessory:', device.name);
@@ -136,7 +108,7 @@ export class HomebridgePrincessHeaterPlatform implements DynamicPlatformPlugin {
136108
new HomewizardPrincessHeaterAccessory(
137109
this,
138110
accessory as PlatformAccessory<PrincessHeaterAccessoryContext>,
139-
wsClient,
111+
wsAPIClient,
140112
);
141113

142114
this.api.registerPlatformAccessories(PLUGIN_NAME, PLATFORM_NAME, [accessory]);
@@ -146,4 +118,10 @@ export class HomebridgePrincessHeaterPlatform implements DynamicPlatformPlugin {
146118
}
147119
});
148120
}
121+
122+
async onHelloMessageResponse(response: ResponseWsIncomingMessage, wsClient: WsAPIClient, httpClient: HttpAPIClient) {
123+
this.log.debug('Received a response to Hello message. Going to get list of devices...', response);
124+
125+
126+
}
149127
}

src/ws/index.ts

Lines changed: 90 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,10 @@
11
import WebSocket from 'ws';
2-
import {WsOutgoingMessage} from './types';
3-
import {WS_URL} from './const';
2+
import {HelloWsOutgoingMessage, WsIncomingMessage, WsOutgoingMessage} from './types';
3+
import {MessageType, WS_URL} from './const';
44
import {EventEmitter} from 'events';
55
import {Logger} from 'homebridge';
66

7-
const OPEN_TIMEOUT = 60 * 1000; // 1 minute
7+
const TIMEOUT = 60 * 1000; // 1 minute
88

99
export class WsAPIClient extends EventEmitter {
1010

@@ -14,11 +14,41 @@ export class WsAPIClient extends EventEmitter {
1414

1515
constructor(
1616
private readonly log: Logger,
17+
private readonly authorization: string,
1718
) {
1819
super();
1920
}
2021

21-
private open(): Promise<WebSocket> {
22+
public async send<M extends WsOutgoingMessage>(
23+
message: Omit<M, 'message_id'>,
24+
): Promise<M> {
25+
26+
const ws = await this._getWebSocket();
27+
return this._send<M>(message, ws);
28+
}
29+
30+
private _getWebSocket(): Promise<WebSocket> {
31+
return new Promise((res, rej) => {
32+
if (
33+
!this.ws ||
34+
this.ws.readyState === WebSocket.CLOSING ||
35+
this.ws.readyState === WebSocket.CLOSED
36+
) {
37+
38+
this.log.warn('WS connection is not initialized or closed. Attempting to reopen');
39+
40+
this.
41+
_open()
42+
.then(this._handshake)
43+
.then(ws => this.ws = ws)
44+
.catch(err => rej(err));
45+
} else {
46+
return res(this.ws);
47+
}
48+
});
49+
}
50+
51+
private _open(): Promise<WebSocket> {
2252
return new Promise((res, rej) => {
2353
this.log.debug('Opening WS connection to ->', WS_URL);
2454
const ws = new WebSocket(WS_URL);
@@ -38,8 +68,8 @@ export class WsAPIClient extends EventEmitter {
3868
});
3969

4070
const openTimeout = setTimeout(() => {
41-
rej(new Error(`WS connection was not ready in ${OPEN_TIMEOUT}ms`));
42-
}, OPEN_TIMEOUT);
71+
rej(new Error(`WS connection was not ready in ${TIMEOUT}ms`));
72+
}, TIMEOUT);
4373

4474
ws.on('open', () => {
4575
clearTimeout(openTimeout);
@@ -49,40 +79,29 @@ export class WsAPIClient extends EventEmitter {
4979
});
5080
}
5181

52-
public async send<M extends WsOutgoingMessage>(
82+
private async _handshake(ws: WebSocket): Promise<WebSocket> {
83+
return await this._send<HelloWsOutgoingMessage>({
84+
type: MessageType.Hello,
85+
version: '2.4.0',
86+
os: 'ios',
87+
source: 'climate',
88+
compatibility: 3,
89+
token: this.authorization,
90+
}, ws).then(() => ws);
91+
}
92+
93+
private async _send<M extends WsOutgoingMessage>(
5394
message: Omit<M, 'message_id'>,
95+
ws: WebSocket,
5496
): Promise<M> {
5597

56-
const wsPromise: Promise<WebSocket> = new Promise((res, rej) => {
57-
if (
58-
!this.ws ||
59-
this.ws.readyState === WebSocket.CLOSING ||
60-
this.ws.readyState === WebSocket.CLOSED
61-
) {
62-
63-
this.log.warn('WS connection is not initialized or closed. Attempting to reopen');
64-
65-
this.open()
66-
.then((ws) => {
67-
this.ws = ws;
68-
res(ws);
69-
})
70-
.catch(err => rej(err));
71-
} else {
72-
return res(this.ws);
73-
}
74-
});
75-
76-
const ws = await wsPromise;
77-
78-
const messageId = ++this.lastMessageId;
79-
const fullMessage = {
80-
...message,
81-
message_id: messageId,
82-
} as M;
83-
this.log.debug('Sending WS message -> ', fullMessage);
84-
85-
return new Promise((res, rej) => {
98+
const sentMessage = await new Promise((res, rej) => {
99+
const messageId = ++this.lastMessageId;
100+
const fullMessage = {
101+
...message,
102+
message_id: messageId,
103+
} as M;
104+
this.log.debug('Sending WS message -> ', fullMessage);
86105
ws.send(
87106
JSON.stringify(fullMessage),
88107
(err) => {
@@ -96,5 +115,39 @@ export class WsAPIClient extends EventEmitter {
96115
},
97116
);
98117
});
118+
119+
return await this._waitForResponse(sentMessage as M);
120+
}
121+
122+
private _waitForResponse<M extends WsOutgoingMessage>(
123+
outgoingMessage: M,
124+
): Promise<M> {
125+
return new Promise((res, rej) => {
126+
127+
this.log.debug('Waiting for message response -> ', outgoingMessage);
128+
129+
const timeout = setTimeout(() => {
130+
rej(new Error(`Didn't receive response in ${TIMEOUT}!`));
131+
}, TIMEOUT);
132+
133+
const onMessage = (incomingMessage: WsIncomingMessage) => {
134+
if (
135+
incomingMessage.type === 'response' &&
136+
incomingMessage.message_id === outgoingMessage.message_id
137+
) {
138+
this.log.debug('Received response for message -> ', outgoingMessage, incomingMessage);
139+
this.off('message', onMessage);
140+
clearTimeout(timeout);
141+
if (incomingMessage.status === 200) {
142+
res(outgoingMessage);
143+
} else {
144+
rej(incomingMessage);
145+
}
146+
}
147+
148+
};
149+
150+
this.on('message', onMessage);
151+
});
99152
}
100153
}

0 commit comments

Comments
 (0)