Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions docs/examples/push_server/gateway_alarm_trigger.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,13 @@ def alarm_callback(source_device, action, params):
trigger_token=gateway.token,
)

await loop.run_in_executor(None, push_server.subscribe_event, gateway, event_info)
await push_server.subscribe_event(gateway, event_info)

_LOGGER.info("Listening")

await asyncio.sleep(30)

push_server.stop()
await push_server.stop()


if __name__ == "__main__":
Expand Down
4 changes: 2 additions & 2 deletions docs/examples/push_server/gateway_button_press.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,13 +36,13 @@ def subdevice_callback(source_device, action, params):
source_model=button.zigbee_model,
)

await loop.run_in_executor(None, push_server.subscribe_event, gateway, event_info)
await push_server.subscribe_event(gateway, event_info)

_LOGGER.info("Listening")

await asyncio.sleep(30)

push_server.stop()
await push_server.stop()


if __name__ == "__main__":
Expand Down
4 changes: 2 additions & 2 deletions docs/push_server.rst
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ we assume that a device class has already been initialized to which the events b

::

push_server.subscribe_event(miio_device, event_info)
await push_server.subscribe_event(miio_device, event_info)

7. The callback function should now be called whenever a matching event occurs.

Expand All @@ -114,7 +114,7 @@ we assume that a device class has already been initialized to which the events b

::

push_server.stop()
await push_server.stop()


.. _obtain_event_info:
Expand Down
10 changes: 6 additions & 4 deletions miio/gateway/alarm.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ def last_status_change_time(self) -> datetime:
"""Return the last time the alarm changed status."""
return datetime.fromtimestamp(self._gateway.send("get_arming_time").pop())

def subscribe_events(self):
async def subscribe_events(self):
"""subscribe to the alarm events using the push server."""
if self._gateway._push_server is None:
raise DeviceException(
Expand All @@ -80,15 +80,17 @@ def subscribe_events(self):
trigger_token=self._gateway.token,
)

event_id = self._gateway._push_server.subscribe_event(self._gateway, event_info)
event_id = await self._gateway._push_server.subscribe_event(
self._gateway, event_info
)
if event_id is None:
return False

self._event_ids.append(event_id)
return True

def unsubscribe_events(self):
async def unsubscribe_events(self):
"""Unsubscibe from events registered in the gateway memory."""
for event_id in self._event_ids:
self._gateway._push_server.unsubscribe_event(self._gateway, event_id)
await self._gateway._push_server.unsubscribe_event(self._gateway, event_id)
self._event_ids.remove(event_id)
8 changes: 4 additions & 4 deletions miio/gateway/devices/subdevice.py
Original file line number Diff line number Diff line change
Expand Up @@ -304,7 +304,7 @@ def push_callback(self, action: str, params: str):
for callback in self._registered_callbacks.values():
callback(action, params)

def subscribe_events(self):
async def subscribe_events(self):
"""subscribe to all subdevice events using the push server."""
if self._gw._push_server is None:
raise DeviceException(
Expand All @@ -323,7 +323,7 @@ def subscribe_events(self):
trigger_value=self.push_events[action].get("trigger_value"),
)

event_id = self._gw._push_server.subscribe_event(self._gw, event_info)
event_id = await self._gw._push_server.subscribe_event(self._gw, event_info)
if event_id is None:
result = False
continue
Expand All @@ -332,8 +332,8 @@ def subscribe_events(self):

return result

def unsubscribe_events(self):
async def unsubscribe_events(self):
"""Unsubscibe from events registered in the gateway memory."""
for event_id in self._event_ids:
self._gw._push_server.unsubscribe_event(self._gw, event_id)
await self._gw._push_server.unsubscribe_event(self._gw, event_id)
self._event_ids.remove(event_id)
4 changes: 2 additions & 2 deletions miio/gateway/gateway.py
Original file line number Diff line number Diff line change
Expand Up @@ -446,7 +446,7 @@ def push_callback(self, source_device: str, action: str, params: str):
device = self.devices[source_device]
device.push_callback(action, params)

def close(self):
async def close(self):
"""Cleanup all subscribed events and registered callbacks."""
if self._push_server is not None:
self._push_server.unregister_miio_device(self)
await self._push_server.unregister_miio_device(self)
58 changes: 37 additions & 21 deletions miio/push_server/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,11 +46,11 @@ class PushServer:
trigger_token=miio_device.token,
)
# Send a message to the miio_device to subscribe for the event to receive messages on the push_server
await loop.run_in_executor(None, push_server.subscribe_event, miio_device, event_info)
await push_server.subscribe_event(miio_device, event_info)
# Now you will see the callback function beeing called whenever the event occurs
await asyncio.sleep(30)
# When done stop the push_server, this will send messages to all subscribed miio_devices to unsubscribe all events
push_server.stop()
await push_server.stop()
"""

def __init__(self, device_ip):
Expand All @@ -62,6 +62,7 @@ def __init__(self, device_ip):
self._server_id = int(FAKE_DEVICE_ID)
self._server_model = FAKE_DEVICE_MODEL

self._loop = None
self._listen_couroutine = None
self._registered_devices = {}

Expand All @@ -73,19 +74,21 @@ async def start(self):
_LOGGER.error("Miio push server already started, not starting another one.")
return

listen_task = self._create_udp_server()
_, self._listen_couroutine = await listen_task
self._loop = asyncio.get_event_loop()

def stop(self):
_, self._listen_couroutine = await self._create_udp_server()

async def stop(self):
"""Stop Miio push server."""
if self._listen_couroutine is None:
return

for ip in list(self._registered_devices):
self.unregister_miio_device(self._registered_devices[ip]["device"])
await self.unregister_miio_device(self._registered_devices[ip]["device"])

self._listen_couroutine.close()
self._listen_couroutine = None
self._loop = None

def register_miio_device(self, device: Device, callback: PushServerCallback):
"""Register a miio device to this push server."""
Expand Down Expand Up @@ -115,19 +118,21 @@ def register_miio_device(self, device: Device, callback: PushServerCallback):
"device": device,
}

def unregister_miio_device(self, device: Device):
async def unregister_miio_device(self, device: Device):
"""Unregister a miio device from this push server."""
device_info = self._registered_devices.get(device.ip)
if device_info is None:
_LOGGER.debug("Device with ip %s not registered, bailing out", device.ip)
return

for event_id in device_info["event_ids"]:
self.unsubscribe_event(device, event_id)
await self.unsubscribe_event(device, event_id)
self._registered_devices.pop(device.ip)
_LOGGER.debug("push server: unregistered miio device with ip %s", device.ip)

def subscribe_event(self, device: Device, event_info: EventInfo) -> Optional[str]:
async def subscribe_event(
self, device: Device, event_info: EventInfo
) -> Optional[str]:
"""Subscribe to a event such that the device will start pushing data for that
event."""
if device.ip not in self._registered_devices:
Expand All @@ -141,9 +146,18 @@ def subscribe_event(self, device: Device, event_info: EventInfo) -> Optional[str
self._event_id = self._event_id + 1
event_id = f"x.scene.{self._event_id}"

event_payload = self._construct_event(event_id, event_info, device)
# device.device_id and device.model may do IO if device info is not cached, so run in executor.
event_payload = await self._loop.run_in_executor(
None,
self._construct_event,
event_id,
event_info,
device,
)

response = device.send(
response = await self._loop.run_in_executor(
None,
device.send,
"send_data_frame",
{
"cur": 0,
Expand All @@ -167,9 +181,11 @@ def subscribe_event(self, device: Device, event_info: EventInfo) -> Optional[str

return event_id

def unsubscribe_event(self, device: Device, event_id: str):
async def unsubscribe_event(self, device: Device, event_id: str):
"""Unsubscribe from a event by id."""
result = device.send("miIO.xdel", [event_id])
result = await self._loop.run_in_executor(
None, device.send, "miIO.xdel", [event_id]
)
if result == ["ok"]:
event_ids = self._registered_devices[device.ip]["event_ids"]
if event_id in event_ids:
Expand All @@ -179,28 +195,28 @@ def unsubscribe_event(self, device: Device, event_id: str):

return result

def _get_server_ip(self):
async def _get_server_ip(self):
"""Connect to the miio device to get server_ip using a one time use socket."""
get_ip_socket = socket.socket(family=socket.AF_INET, type=socket.SOCK_DGRAM)
get_ip_socket.bind((self._address, SERVER_PORT))
get_ip_socket.connect((self._device_ip, SERVER_PORT))
get_ip_socket.setblocking(False)
await self._loop.sock_connect(get_ip_socket, (self._device_ip, SERVER_PORT))
server_ip = get_ip_socket.getsockname()[0]
get_ip_socket.close()
_LOGGER.debug("Miio push server device ip=%s", server_ip)
return server_ip

def _create_udp_server(self):
async def _create_udp_server(self):
"""Create the UDP socket and protocol."""
self._server_ip = self._get_server_ip()
self._server_ip = await self._get_server_ip()

# Create a fresh socket that will be used for the push server
udp_socket = socket.socket(family=socket.AF_INET, type=socket.SOCK_DGRAM)
udp_socket.bind((self._address, SERVER_PORT))
udp_socket.setblocking(False)

loop = asyncio.get_event_loop()

return loop.create_datagram_endpoint(
lambda: ServerProtocol(loop, udp_socket, self),
return await self._loop.create_datagram_endpoint(
lambda: ServerProtocol(self._loop, udp_socket, self),
sock=udp_socket,
)

Expand Down
6 changes: 3 additions & 3 deletions miio/push_server/serverprotocol.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,15 +95,15 @@ def datagram_received(self, data, addr):
msg_id = msg_value["id"]
_LOGGER.debug("<< %s:%s: %s", host, port, msg_value)

# Send OK
self.send_msg_OK(host, port, msg_id, token)

# Parse message
action, device_call_id = msg_value["method"].rsplit(":", 1)
source_device_id = device_call_id.replace("_", ".")

callback(source_device_id, action, msg_value.get("params"))

# Send OK
self.send_msg_OK(host, port, msg_id, token)

except Exception:
_LOGGER.exception(
"Cannot process Miio push server packet: '%s' from %s:%s",
Expand Down