From 2363c936a11491a8e1fc2e94adedc5de29de9fe8 Mon Sep 17 00:00:00 2001 From: Jordan Yates Date: Mon, 27 Apr 2026 15:29:44 +1000 Subject: [PATCH 1/6] rpc_wrappers: security_key_update: write network key Add option to write a new network key to a device. Signed-off-by: Jordan Yates --- src/infuse_iot/rpc_wrappers/security_key_update.py | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/src/infuse_iot/rpc_wrappers/security_key_update.py b/src/infuse_iot/rpc_wrappers/security_key_update.py index e6eb612..75a0c6e 100644 --- a/src/infuse_iot/rpc_wrappers/security_key_update.py +++ b/src/infuse_iot/rpc_wrappers/security_key_update.py @@ -1,5 +1,6 @@ #!/usr/bin/env python3 +import yaml from cryptography.hazmat.primitives import serialization from cryptography.hazmat.primitives.asymmetric import x25519 @@ -18,13 +19,20 @@ def add_parser(cls, parser): parser.add_argument("--delete", action="store_true", help="Delete instead of writing key") parser.add_argument("--delay", type=int, default=2, help="Reboot delay (seconds)") key_group = parser.add_mutually_exclusive_group(required=True) + key_group.add_argument("--network", type=ValidFile) key_group.add_argument("--secondary-root", type=ValidFile) def __init__(self, args): self._auth = Auth.NETWORK if args.network_auth else Auth.DEVICE self._key_action = defs.rpc_enum_key_action.KEY_DELETE if args.delete else defs.rpc_enum_key_action.KEY_WRITE self._delay = args.delay - if args.secondary_root: + if args.network: + self._key_id = defs.rpc_enum_key_id.NETWORK_KEY + with args.network.open("r") as f: + key_info = yaml.safe_load(f) + self._global_key_id = key_info["id"] + self._key_bytes = key_info["key"] + elif args.secondary_root: self._key_id = defs.rpc_enum_key_id.SECONDARY_REMOTE_PUBLIC_KEY self._global_key_id = 0 with args.secondary_root.open("r") as f: From 063934d1ebc7bc6ea26586113878c1aac3acab78 Mon Sep 17 00:00:00 2001 From: Jordan Yates Date: Fri, 1 May 2026 13:29:44 +1000 Subject: [PATCH 2/6] rpc_wrappers: sym_read: `--elf` is required The `--elf` argument is not optional. Signed-off-by: Jordan Yates --- src/infuse_iot/rpc_wrappers/sym_read.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/infuse_iot/rpc_wrappers/sym_read.py b/src/infuse_iot/rpc_wrappers/sym_read.py index 36f5983..3b9a4ab 100644 --- a/src/infuse_iot/rpc_wrappers/sym_read.py +++ b/src/infuse_iot/rpc_wrappers/sym_read.py @@ -19,7 +19,7 @@ class sym_read(InfuseRpcCommand, defs.mem_read): @classmethod def add_parser(cls, parser): - parser.add_argument("--elf", type=ValidFile, help="ELF file to read symbol data from") + parser.add_argument("--elf", type=ValidFile, required=True, help="ELF file to read symbol data from") read_type = parser.add_mutually_exclusive_group(required=True) read_type.add_argument("--sym", type=str, help="Symbol name to read") read_type.add_argument("--addr", type=lambda x: int(x, 0), help="Address to read") From 34fd28fbc122a8d2bf889cd1ef315a61bd557af3 Mon Sep 17 00:00:00 2001 From: Jordan Yates Date: Fri, 1 May 2026 13:31:03 +1000 Subject: [PATCH 3/6] rpc_wrappers: sym_read: handle missing symbol info Add handling for the case where symbol DIE information cannot be found. Signed-off-by: Jordan Yates --- src/infuse_iot/rpc_wrappers/sym_read.py | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/src/infuse_iot/rpc_wrappers/sym_read.py b/src/infuse_iot/rpc_wrappers/sym_read.py index 3b9a4ab..56337d9 100644 --- a/src/infuse_iot/rpc_wrappers/sym_read.py +++ b/src/infuse_iot/rpc_wrappers/sym_read.py @@ -28,7 +28,8 @@ def __init__(self, args): # Ignore context-manager warning since ELFFile requires the file to remain opened self.elf_file = open(args.elf, "rb") # noqa: SIM115 self.elf = ELFFile(self.elf_file) - self.symbol_die: DIE | None + self.symbol_die: DIE | None = None + self.symbol_info: elftools.dwarf_field | None = None if args.sym: symbols = elftools.symbols_from_name(self.elf, args.sym) @@ -116,7 +117,8 @@ def handle_response(self, return_code, response): if self.symbol_die is not None: filename, linenum = elftools.dwarf_die_file_info(self.elf, self.symbol_die) print(f" Symbol: {self.symbol.name} ({filename}:{linenum})") - + else: + print(f" Name: {self.symbol.name}") address_base = self.symbol.entry["st_value"] print(f"Address: 0x{address_base:x}") print(f" Size: {symbol_size} bytes") @@ -125,6 +127,9 @@ def handle_response(self, return_code, response): else: print(f" Raw: {self.output[:32].hex()}...") + if self.symbol_info is None: + return + def info_table(info, offset=0): table = [[f"{' ' * offset}{info.name}", f"({info.tag}) ({info.ctype}) {info.offset}", ""]] for child in info.children: From 38594862077975a8bc7f0199156d72ac88f822f8 Mon Sep 17 00:00:00 2001 From: Jordan Yates Date: Fri, 1 May 2026 13:32:05 +1000 Subject: [PATCH 4/6] rpc_wrappers: sym_read: load DIE info for address If provided a hardcoded address, try and load the debugging information associated with it. Signed-off-by: Jordan Yates --- src/infuse_iot/rpc_wrappers/sym_read.py | 1 + 1 file changed, 1 insertion(+) diff --git a/src/infuse_iot/rpc_wrappers/sym_read.py b/src/infuse_iot/rpc_wrappers/sym_read.py index 56337d9..ea7f03c 100644 --- a/src/infuse_iot/rpc_wrappers/sym_read.py +++ b/src/infuse_iot/rpc_wrappers/sym_read.py @@ -62,6 +62,7 @@ def __init__(self, args): if symbol is None: sys.exit(f"Could not find symbol for address 0x{args.addr:08x} in '{args.elf}' symbol table") self.symbol = symbol + self.symbol_die = elftools.dwarf_die_from_symbol(self.elf, symbol) else: raise NotImplementedError("Unexpected symbol refrence") From d6f769e9a6878437628f1ca1c5023e145ec000a2 Mon Sep 17 00:00:00 2001 From: Jordan Yates Date: Fri, 1 May 2026 13:32:42 +1000 Subject: [PATCH 5/6] util: elftools: use declaration type info fallback If we can't find the DIE directly by address, attempt to use the declarations type information. Signed-off-by: Jordan Yates --- src/infuse_iot/util/elftools.py | 32 ++++++++++++++++++++++++++------ 1 file changed, 26 insertions(+), 6 deletions(-) diff --git a/src/infuse_iot/util/elftools.py b/src/infuse_iot/util/elftools.py index 508ad77..ede9b06 100644 --- a/src/infuse_iot/util/elftools.py +++ b/src/infuse_iot/util/elftools.py @@ -83,15 +83,21 @@ def symbol_from_address(elf: ELFFile, address: int) -> Symbol | None: def dwarf_die_from_symbol(elf: ELFFile, symbol: Symbol) -> DIE | None: """Get a Debug Information Entry associated with a symbol (Global variables only)""" dwarfinfo = elf.get_dwarf_info() + candidate_cu = None + candidate_die = None for CU in dwarfinfo.iter_CUs(): for die in CU.iter_DIEs(): - if die.tag == "DW_TAG_variable" and "DW_AT_name" in die.attributes and "DW_AT_location" in die.attributes: - die_name = die.attributes["DW_AT_name"].value.decode("utf-8") + if die.tag != "DW_TAG_variable" or "DW_AT_name" not in die.attributes: + continue + die_name = die.attributes["DW_AT_name"].value.decode("utf-8") + if die_name != symbol.name: + continue + # Store as fallback in case we can't find a matching address + candidate_cu = CU + candidate_die = die + if "DW_AT_location" in die.attributes: die_location = die.attributes.get("DW_AT_location").value - # Not our symbol - if die_name != symbol.name: - continue # Constant addresses are in a list of form [0x03, addr_bytes] if not isinstance(die_location, list): continue @@ -100,10 +106,24 @@ def dwarf_die_from_symbol(elf: ELFFile, symbol: Symbol) -> DIE | None: address = int.from_bytes(die_location[1:], "little") if address == symbol.entry["st_value"]: return die - return None + + if candidate_cu is None or candidate_die is None: + # No symbols with matching names found + return None + + type_attr = candidate_die.attributes["DW_AT_type"] + # The offset may be relative to the CU or absolute + if type_attr.form == "DW_FORM_ref_addr": + type_offset = type_attr.value # absolute offset in .debug_info + else: + type_offset = candidate_cu.cu_offset + type_attr.value # relative to CU + return dwarfinfo.get_DIE_from_refaddr(type_offset) def dwarf_die_file_info(elf: ELFFile, die: DIE) -> tuple[str | None, int]: + if "DW_AT_decl_file" not in die.attributes or "DW_AT_decl_line" not in die.attributes: + return (None, 0) + file_attr = die.attributes["DW_AT_decl_file"] line_attr = die.attributes["DW_AT_decl_line"] From 73453acbf0b3779837d3eea76e713dc75d9c9e93 Mon Sep 17 00:00:00 2001 From: Jordan Yates Date: Fri, 1 May 2026 13:33:52 +1000 Subject: [PATCH 6/6] tests: socket_comms: use alternate port Don't use the default port for testing, as this will fail if there is a normal users of the libraries running in parallel. Signed-off-by: Jordan Yates --- tests/test_socket_comms.py | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/tests/test_socket_comms.py b/tests/test_socket_comms.py index c633591..6a3f539 100644 --- a/tests/test_socket_comms.py +++ b/tests/test_socket_comms.py @@ -7,9 +7,11 @@ def test_socket_comms(): # Ensure notifications can be sent from the server to the client, and requests sent in reverse - mulicast_addr = comms.default_multicast_address() - server = comms.LocalServer(mulicast_addr) - client = comms.LocalClient(mulicast_addr) + multicast_addr = comms.default_multicast_address() + # Increment port by 1 so we can run the tests in parallel with a real instance + test_addr = (multicast_addr[0], multicast_addr[1] + 1) + server = comms.LocalServer(test_addr) + client = comms.LocalClient(test_addr) # Send request to server request = comms.GatewayRequestCommsCheck()