Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 9 additions & 1 deletion src/infuse_iot/rpc_wrappers/security_key_update.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#!/usr/bin/env python3

import yaml
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives.asymmetric import x25519

Expand All @@ -18,13 +19,20 @@ def add_parser(cls, parser):
parser.add_argument("--delete", action="store_true", help="Delete instead of writing key")
parser.add_argument("--delay", type=int, default=2, help="Reboot delay (seconds)")
key_group = parser.add_mutually_exclusive_group(required=True)
key_group.add_argument("--network", type=ValidFile)
key_group.add_argument("--secondary-root", type=ValidFile)

def __init__(self, args):
self._auth = Auth.NETWORK if args.network_auth else Auth.DEVICE
self._key_action = defs.rpc_enum_key_action.KEY_DELETE if args.delete else defs.rpc_enum_key_action.KEY_WRITE
self._delay = args.delay
if args.secondary_root:
if args.network:
self._key_id = defs.rpc_enum_key_id.NETWORK_KEY
with args.network.open("r") as f:
key_info = yaml.safe_load(f)
self._global_key_id = key_info["id"]
self._key_bytes = key_info["key"]
elif args.secondary_root:
self._key_id = defs.rpc_enum_key_id.SECONDARY_REMOTE_PUBLIC_KEY
self._global_key_id = 0
with args.secondary_root.open("r") as f:
Expand Down
12 changes: 9 additions & 3 deletions src/infuse_iot/rpc_wrappers/sym_read.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ class sym_read(InfuseRpcCommand, defs.mem_read):

@classmethod
def add_parser(cls, parser):
parser.add_argument("--elf", type=ValidFile, help="ELF file to read symbol data from")
parser.add_argument("--elf", type=ValidFile, required=True, help="ELF file to read symbol data from")
read_type = parser.add_mutually_exclusive_group(required=True)
read_type.add_argument("--sym", type=str, help="Symbol name to read")
read_type.add_argument("--addr", type=lambda x: int(x, 0), help="Address to read")
Expand All @@ -28,7 +28,8 @@ def __init__(self, args):
# Ignore context-manager warning since ELFFile requires the file to remain opened
self.elf_file = open(args.elf, "rb") # noqa: SIM115
self.elf = ELFFile(self.elf_file)
self.symbol_die: DIE | None
self.symbol_die: DIE | None = None
self.symbol_info: elftools.dwarf_field | None = None

if args.sym:
symbols = elftools.symbols_from_name(self.elf, args.sym)
Expand Down Expand Up @@ -61,6 +62,7 @@ def __init__(self, args):
if symbol is None:
sys.exit(f"Could not find symbol for address 0x{args.addr:08x} in '{args.elf}' symbol table")
self.symbol = symbol
self.symbol_die = elftools.dwarf_die_from_symbol(self.elf, symbol)
else:
raise NotImplementedError("Unexpected symbol refrence")

Expand Down Expand Up @@ -116,7 +118,8 @@ def handle_response(self, return_code, response):
if self.symbol_die is not None:
filename, linenum = elftools.dwarf_die_file_info(self.elf, self.symbol_die)
print(f" Symbol: {self.symbol.name} ({filename}:{linenum})")

else:
print(f" Name: {self.symbol.name}")
address_base = self.symbol.entry["st_value"]
print(f"Address: 0x{address_base:x}")
print(f" Size: {symbol_size} bytes")
Expand All @@ -125,6 +128,9 @@ def handle_response(self, return_code, response):
else:
print(f" Raw: {self.output[:32].hex()}...")

if self.symbol_info is None:
return

def info_table(info, offset=0):
table = [[f"{' ' * offset}{info.name}", f"({info.tag}) ({info.ctype}) {info.offset}", ""]]
for child in info.children:
Expand Down
32 changes: 26 additions & 6 deletions src/infuse_iot/util/elftools.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,15 +83,21 @@ def symbol_from_address(elf: ELFFile, address: int) -> Symbol | None:
def dwarf_die_from_symbol(elf: ELFFile, symbol: Symbol) -> DIE | None:
"""Get a Debug Information Entry associated with a symbol (Global variables only)"""
dwarfinfo = elf.get_dwarf_info()
candidate_cu = None
candidate_die = None

for CU in dwarfinfo.iter_CUs():
for die in CU.iter_DIEs():
if die.tag == "DW_TAG_variable" and "DW_AT_name" in die.attributes and "DW_AT_location" in die.attributes:
die_name = die.attributes["DW_AT_name"].value.decode("utf-8")
if die.tag != "DW_TAG_variable" or "DW_AT_name" not in die.attributes:
continue
die_name = die.attributes["DW_AT_name"].value.decode("utf-8")
if die_name != symbol.name:
continue
# Store as fallback in case we can't find a matching address
candidate_cu = CU
candidate_die = die
if "DW_AT_location" in die.attributes:
die_location = die.attributes.get("DW_AT_location").value
# Not our symbol
if die_name != symbol.name:
continue
# Constant addresses are in a list of form [0x03, addr_bytes]
if not isinstance(die_location, list):
continue
Expand All @@ -100,10 +106,24 @@ def dwarf_die_from_symbol(elf: ELFFile, symbol: Symbol) -> DIE | None:
address = int.from_bytes(die_location[1:], "little")
if address == symbol.entry["st_value"]:
return die
return None

if candidate_cu is None or candidate_die is None:
# No symbols with matching names found
return None

type_attr = candidate_die.attributes["DW_AT_type"]
# The offset may be relative to the CU or absolute
if type_attr.form == "DW_FORM_ref_addr":
type_offset = type_attr.value # absolute offset in .debug_info
else:
type_offset = candidate_cu.cu_offset + type_attr.value # relative to CU
return dwarfinfo.get_DIE_from_refaddr(type_offset)


def dwarf_die_file_info(elf: ELFFile, die: DIE) -> tuple[str | None, int]:
if "DW_AT_decl_file" not in die.attributes or "DW_AT_decl_line" not in die.attributes:
return (None, 0)

file_attr = die.attributes["DW_AT_decl_file"]
line_attr = die.attributes["DW_AT_decl_line"]

Expand Down
8 changes: 5 additions & 3 deletions tests/test_socket_comms.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,11 @@

def test_socket_comms():
# Ensure notifications can be sent from the server to the client, and requests sent in reverse
mulicast_addr = comms.default_multicast_address()
server = comms.LocalServer(mulicast_addr)
client = comms.LocalClient(mulicast_addr)
multicast_addr = comms.default_multicast_address()
# Increment port by 1 so we can run the tests in parallel with a real instance
test_addr = (multicast_addr[0], multicast_addr[1] + 1)
server = comms.LocalServer(test_addr)
client = comms.LocalClient(test_addr)

# Send request to server
request = comms.GatewayRequestCommsCheck()
Expand Down
Loading