Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 9 additions & 2 deletions src/infuse_iot/rpc_wrappers/coap_download.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,12 @@ def add_parser(cls, parser):
default=cls.INFUSE_COAP_SERVER_PORT,
help="COAP server port",
)
parser.add_argument(
"--block-timeout",
type=int,
default=2000,
help="Timeout in milliseconds for each COAP block query",
)
parser.add_argument(
"--resource",
"-r",
Expand Down Expand Up @@ -104,6 +110,7 @@ def __init__(self, args):
self.server = args.server.encode("utf-8")
self.port = args.port
self.resource = args.resource.encode("utf-8")
self.block_timeout = args.block_timeout
self.action = args.action
self.file_len, self.file_crc = coap_server_file_stats(args.server, args.resource)

Expand All @@ -123,7 +130,7 @@ class request(ctypes.LittleEndianStructure):
return request(
self.server,
self.port,
2000,
self.block_timeout,
self.action,
self.file_len,
self.file_crc,
Expand All @@ -134,7 +141,7 @@ def request_json(self):
return {
"server_address": self.server.decode("utf-8"),
"server_port": str(self.port),
"block_timeout_ms": "2000",
"block_timeout_ms": str(self.block_timeout),
"action": self.action.name,
"resource_len": str(self.file_len),
"resource_crc": str(self.file_crc),
Expand Down
8 changes: 6 additions & 2 deletions src/infuse_iot/rpc_wrappers/kv_write.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,10 @@ def add_parser(cls, parser):
v_parser = parser.add_mutually_exclusive_group(required=True)
v_parser.add_argument("--value", "-v", type=str, help="KV value as hex string")
v_parser.add_argument("--string", "-s", type=str, help="KV string")
v_parser.add_argument("--delete", "-d", action="store_true", help="Delete KV value")

def __init__(self, args):
self.delete = args.delete
self.key = args.key
if args.value is not None:
self.value = bytes.fromhex(args.value)
Expand All @@ -62,6 +64,8 @@ def __init__(self, args):

str_val = args.string.encode("utf-8") + b"\x00"
self.value = len(str_val).to_bytes(1, "little") + str_val
elif args.delete:
self.value = b""
else:
raise NotImplementedError("Unimplmented value parsing")

Expand All @@ -77,9 +81,9 @@ def handle_response(self, return_code, response):

def print_status(name, rc):
if rc < 0:
print(f"{name} failed to write ({os.strerror(-rc)})")
print(f"{name} failed to {'delete' if self.delete else 'write'} ({os.strerror(-rc)})")
elif rc == 0:
print(f"{name} already matched")
print(f"{name} {'deleted' if self.delete else 'already matched'}")
else:
print(f"{name} updated")

Expand Down
6 changes: 4 additions & 2 deletions src/infuse_iot/socket_comms.py
Original file line number Diff line number Diff line change
Expand Up @@ -273,7 +273,8 @@ def __init__(self, multicast_address):
# Multicast output socket
self._output_sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP)
self._output_sock.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_TTL, 2)
self._output_sock.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_IF, socket.inet_aton("127.0.0.1"))
if sys.platform != "win32":
self._output_sock.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_IF, socket.inet_aton("127.0.0.1"))
self._output_addr = multicast_address
# Single input socket
unicast_address = ("localhost", multicast_address[1] + 1)
Expand Down Expand Up @@ -303,9 +304,10 @@ def __init__(self, multicast_address, rx_timeout=0.2):
self._input_sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
if sys.platform == "win32":
self._input_sock.bind(("", multicast_address[1]))
mreq = struct.pack("4sl", socket.inet_aton(multicast_address[0]), socket.INADDR_ANY)
else:
self._input_sock.bind(multicast_address)
mreq = struct.pack("4s4s", socket.inet_aton(multicast_address[0]), socket.inet_aton("127.0.0.1"))
mreq = struct.pack("4s4s", socket.inet_aton(multicast_address[0]), socket.inet_aton("127.0.0.1"))
self._input_sock.setsockopt(socket.IPPROTO_IP, socket.IP_ADD_MEMBERSHIP, mreq)
self._input_sock.settimeout(rx_timeout)
# Unicast output socket
Expand Down
23 changes: 15 additions & 8 deletions src/infuse_iot/tools/rpc_cloud.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

import argparse
import base64
import datetime
import importlib
import json
import pkgutil
Expand Down Expand Up @@ -98,10 +99,22 @@ def query(self, client: Client):
rsp = get_rpc_by_id.sync(client=client, id=UUID(self._args.id))
if isinstance(rsp, Error) or rsp is None:
sys.exit(f"Failed to query RPC state ({rsp})")
now = datetime.datetime.now(tz=datetime.timezone.utc)
downlink = rsp.downlink_message
print(f"RPC State: {downlink.status}")
rpc_req = downlink.rpc_req
try:
command_name = id_type_mapping[rpc_req.command_id].NAME
except KeyError:
command_name = "Unknown"
print(f" RPC ID: {rpc_req.command_id} ({command_name})")
print(f" To: {rsp.device.device_id}")
# Manually detect downlink expiry, as the API doesn't do it
if downlink.status == DownlinkMessageStatus.WAITING and downlink.expires_at and now > downlink.expires_at:
print(" State: expired")
else:
print(f" State: {downlink.status}")
if downlink.status in [DownlinkMessageStatus.SENT, DownlinkMessageStatus.COMPLETED]:
route = downlink.rpc_req.route
route = rpc_req.route
if downlink.sent_at:
print(f" At: {downlink.sent_at}")
if route:
Expand All @@ -110,14 +123,8 @@ def query(self, client: Client):
else:
print(f" Through: Direct ({route.interface.upper()})")
if downlink.status == DownlinkMessageStatus.COMPLETED:
rpc_req = downlink.rpc_req
rpc_rsp = downlink.rpc_rsp
assert isinstance(rpc_rsp, RpcRsp)
try:
command_name = id_type_mapping[rpc_req.command_id].NAME
except KeyError:
command_name = "Unknown"
print(f" RPC ID: {rpc_req.command_id} ({command_name})")
print(f" Result: {rpc_rsp.return_code}")
if rpc_rsp.params:
print(json.dumps(rpc_rsp.params.additional_properties, indent=4))
Expand Down
26 changes: 26 additions & 0 deletions tests/test_socket_comms.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
import os

import infuse_iot.socket_comms as comms

assert "TOXTEMPDIR" in os.environ, "you must run these tests using tox"


def test_socket_comms():
# Ensure notifications can be sent from the server to the client, and requests sent in reverse
mulicast_addr = comms.default_multicast_address()
server = comms.LocalServer(mulicast_addr)
client = comms.LocalClient(mulicast_addr)

# Send request to server
request = comms.GatewayRequestCommsCheck()
client.send(request)

# Server receives request, responds
recv_req = server.receive()
assert isinstance(recv_req, comms.GatewayRequestCommsCheck)
response = comms.ClientNotificationCommsCheck()
server.broadcast(response)

# Client receives the response
recv_rsp = client.receive()
assert isinstance(recv_rsp, comms.ClientNotificationCommsCheck)
Loading