From 9426649ca0803d98d8c37a1af8fdaa4051e5e0d4 Mon Sep 17 00:00:00 2001 From: Marcone Tenorio Date: Thu, 22 Jan 2026 14:51:27 -0300 Subject: [PATCH 1/8] fix: Auto-detect local IPs for OPC-UA certificate SAN entries When running in Docker containers, the auto-generated OPC-UA server certificate only included the container hostname in the Subject Alternative Name (SAN). This caused BadCertificateHostNameInvalid errors when clients connected via IP address. Changes: - Add get_local_ip_addresses() to auto-detect all local IPs - Add generate_certificate_with_sans() for certificates with multiple DNS names and IP addresses in SANs - Update certificate generation to include all detected local IPs Co-Authored-By: Claude Opus 4.5 --- .../plugins/python/opcua/opcua_security.py | 316 +++++++++++++++--- 1 file changed, 278 insertions(+), 38 deletions(-) diff --git a/core/src/drivers/plugins/python/opcua/opcua_security.py b/core/src/drivers/plugins/python/opcua/opcua_security.py index e1af6f3c..a9ceb845 100644 --- a/core/src/drivers/plugins/python/opcua/opcua_security.py +++ b/core/src/drivers/plugins/python/opcua/opcua_security.py @@ -16,8 +16,10 @@ import asyncio import tempfile import shutil +import ipaddress +import datetime from pathlib import Path -from typing import Optional, Tuple, List +from typing import Optional, Tuple, List, Set from urllib.parse import urlparse from asyncua.crypto import uacrypto from asyncua.crypto.cert_gen import setup_self_signed_certificate @@ -27,10 +29,11 @@ from asyncua.crypto.permission_rules import SimpleRoleRuleset, PermissionRuleset from asyncua.server.user_managers import UserRole from asyncua import ua -from cryptography.x509.oid import ExtensionOID, ExtendedKeyUsageOID +from cryptography.x509.oid import ExtensionOID, ExtendedKeyUsageOID, NameOID from cryptography import x509 from cryptography.hazmat.backends import default_backend -from cryptography.hazmat.primitives import serialization +from cryptography.hazmat.primitives import serialization, hashes +from cryptography.hazmat.primitives.asymmetric import rsa # Add directories to path for module access _current_dir = os.path.dirname(os.path.abspath(__file__)) @@ -44,6 +47,230 @@ from opcua_logging import log_info, log_warn, log_error +def get_local_ip_addresses() -> Set[str]: + """ + Get all local IP addresses of the machine. + + Returns: + Set of IP address strings (both IPv4 and IPv6) + """ + ip_addresses = set() + + # Always include localhost addresses + ip_addresses.add("127.0.0.1") + ip_addresses.add("::1") + + try: + # Method 1: Get IPs from all network interfaces + hostname = socket.gethostname() + try: + # Get all addresses associated with hostname + for info in socket.getaddrinfo(hostname, None): + ip = info[4][0] + # Filter out link-local and loopback for external access + if not ip.startswith("fe80:"): # Skip IPv6 link-local + ip_addresses.add(ip) + except socket.gaierror: + pass + + # Method 2: Connect to external address to find default interface IP + try: + with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s: + # Doesn't actually connect, just determines route + s.connect(("8.8.8.8", 80)) + ip_addresses.add(s.getsockname()[0]) + except Exception: + pass + + # Method 3: Try to get all interface IPs using netifaces-like approach + try: + import fcntl + import struct + import array + + # Get list of network interfaces + max_interfaces = 128 + buf_size = max_interfaces * 40 # sizeof(struct ifreq) on 64-bit + buf = array.array("B", b"\0" * buf_size) + + with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s: + # SIOCGIFCONF = 0x8912 + result = fcntl.ioctl( + s.fileno(), + 0x8912, + struct.pack("iL", buf_size, buf.buffer_info()[0]), + ) + out_bytes = struct.unpack("iL", result)[0] + + # Parse the buffer for interface addresses + offset = 0 + while offset < out_bytes: + # Interface name is 16 bytes, then sockaddr + iface_name = buf[offset : offset + 16].tobytes().split(b"\0")[0] + # Skip to IP address (offset 20 from start of entry) + ip_offset = offset + 20 + if ip_offset + 4 <= len(buf): + ip_bytes = buf[ip_offset : ip_offset + 4].tobytes() + ip = socket.inet_ntoa(ip_bytes) + if ip != "0.0.0.0": + ip_addresses.add(ip) + offset += 40 # Move to next interface + except Exception: + pass + + except Exception as e: + log_warn(f"Error getting local IP addresses: {e}") + + return ip_addresses + + +async def generate_certificate_with_sans( + cert_path: Path, + key_path: Path, + app_uri: str, + dns_names: List[str], + ip_addresses: List[str], + common_name: str = "OpenPLC OPC-UA Server", + organization: str = "Autonomy Logic", + country: str = "US", + state: str = "CA", + locality: str = "California", + key_size: int = 2048, + valid_days: int = 365, +) -> bool: + """ + Generate a self-signed certificate with multiple Subject Alternative Names. + + This function creates a certificate suitable for OPC-UA servers with proper + SAN extensions including multiple DNS names, IP addresses, and URIs. + + Args: + cert_path: Path where certificate will be saved (PEM format) + key_path: Path where private key will be saved (PEM format) + app_uri: Application URI for the certificate + dns_names: List of DNS names to include in SAN + ip_addresses: List of IP addresses to include in SAN + common_name: Certificate common name + organization: Organization name + country: Country code + state: State/Province + locality: City/Locality + key_size: RSA key size (default 2048) + valid_days: Certificate validity in days (default 365) + + Returns: + bool: True if certificate generated successfully + """ + try: + # Generate RSA private key + private_key = rsa.generate_private_key( + public_exponent=65537, + key_size=key_size, + backend=default_backend(), + ) + + # Build subject name + subject = issuer = x509.Name( + [ + x509.NameAttribute(NameOID.COUNTRY_NAME, country), + x509.NameAttribute(NameOID.STATE_OR_PROVINCE_NAME, state), + x509.NameAttribute(NameOID.LOCALITY_NAME, locality), + x509.NameAttribute(NameOID.ORGANIZATION_NAME, organization), + x509.NameAttribute(NameOID.COMMON_NAME, common_name), + ] + ) + + # Build Subject Alternative Names + san_entries = [] + + # Add URI (required for OPC-UA) + san_entries.append(x509.UniformResourceIdentifier(app_uri)) + + # Add DNS names + for dns_name in dns_names: + if dns_name: # Skip empty strings + san_entries.append(x509.DNSName(dns_name)) + + # Add IP addresses + for ip_str in ip_addresses: + if ip_str: # Skip empty strings + try: + ip_obj = ipaddress.ip_address(ip_str) + san_entries.append(x509.IPAddress(ip_obj)) + except ValueError as e: + log_warn(f"Invalid IP address '{ip_str}' for SAN: {e}") + + # Build certificate + now = datetime.datetime.now(datetime.timezone.utc) + cert_builder = ( + x509.CertificateBuilder() + .subject_name(subject) + .issuer_name(issuer) + .public_key(private_key.public_key()) + .serial_number(x509.random_serial_number()) + .not_valid_before(now) + .not_valid_after(now + datetime.timedelta(days=valid_days)) + .add_extension( + x509.SubjectAlternativeName(san_entries), + critical=False, + ) + .add_extension( + x509.BasicConstraints(ca=False, path_length=None), + critical=True, + ) + .add_extension( + x509.KeyUsage( + digital_signature=True, + key_encipherment=True, + content_commitment=False, + data_encipherment=True, + key_agreement=False, + key_cert_sign=False, + crl_sign=False, + encipher_only=False, + decipher_only=False, + ), + critical=True, + ) + .add_extension( + x509.ExtendedKeyUsage([ExtendedKeyUsageOID.SERVER_AUTH]), + critical=False, + ) + ) + + # Sign the certificate + certificate = cert_builder.sign( + private_key, hashes.SHA256(), default_backend() + ) + + # Write private key to file + key_path.parent.mkdir(parents=True, exist_ok=True) + with open(key_path, "wb") as f: + f.write( + private_key.private_bytes( + encoding=serialization.Encoding.PEM, + format=serialization.PrivateFormat.TraditionalOpenSSL, + encryption_algorithm=serialization.NoEncryption(), + ) + ) + + # Write certificate to file + cert_path.parent.mkdir(parents=True, exist_ok=True) + with open(cert_path, "wb") as f: + f.write(certificate.public_bytes(serialization.Encoding.PEM)) + + log_info(f"Generated certificate with {len(san_entries)} SAN entries") + log_info(f" DNS names: {dns_names}") + log_info(f" IP addresses: {ip_addresses}") + log_info(f" URI: {app_uri}") + + return True + + except Exception as e: + log_error(f"Failed to generate certificate: {e}") + return False + + class OpenPLCRoleRuleset(PermissionRuleset): """ Custom permission ruleset for OpenPLC OPC-UA server. @@ -478,6 +705,10 @@ async def generate_server_certificate( """ Generate a self-signed certificate for the server with proper SAN extensions. + This method auto-detects local IP addresses and includes them in the + certificate's Subject Alternative Names (SANs) to prevent hostname + validation errors when connecting via IP address. + Args: cert_path: Path where certificate will be saved key_path: Path where private key will be saved @@ -492,7 +723,7 @@ async def generate_server_certificate( try: # Get system hostname for proper certificate validation system_hostname = socket.gethostname() - + # Extract hostname from endpoint if available endpoint_hostname = "localhost" # default if hasattr(self.config, 'endpoint') and self.config.endpoint: @@ -504,11 +735,11 @@ async def generate_server_certificate( endpoint_hostname = parsed.hostname except Exception as e: log_warn(f"Could not parse endpoint hostname: {e}") - + # Use provided app_uri or fallback to default if not app_uri: app_uri = "urn:autonomy-logic:openplc:opcua:server" - + # Collect all possible hostnames for SAN DNS entries dns_names = [] # Add system hostname @@ -520,35 +751,30 @@ async def generate_server_certificate( # Always include localhost if "localhost" not in dns_names: dns_names.append("localhost") - - # IP addresses for SAN - ip_addresses = ["127.0.0.1"] - # Add 0.0.0.0 if endpoint uses it (for bind-all scenarios) - if hasattr(self.config, 'endpoint') and "0.0.0.0" in self.config.endpoint: - ip_addresses.append("0.0.0.0") - + + # Auto-detect all local IP addresses for SAN + local_ips = get_local_ip_addresses() + ip_addresses = list(local_ips) + log_info(f"Generating certificate with DNS SANs: {dns_names}") log_info(f"Generating certificate with IP SANs: {ip_addresses}") log_info(f"Application URI: {app_uri}") - - # Use the setup_self_signed_certificate function from asyncua with supported parameters - await setup_self_signed_certificate( - key_file=Path(key_path), - cert_file=Path(cert_path), + + # Use custom certificate generation with multiple SANs + success = await generate_certificate_with_sans( + cert_path=Path(cert_path), + key_path=Path(key_path), app_uri=app_uri, - host_name=system_hostname, # Use actual system hostname - cert_use=[ExtendedKeyUsageOID.SERVER_AUTH], - subject_attrs={ - "countryName": "US", - "stateOrProvinceName": "CA", - "localityName": "California", - "organizationName": "Autonomy Logic", - "commonName": common_name - }, + dns_names=dns_names, + ip_addresses=ip_addresses, + common_name=common_name, + key_size=key_size, + valid_days=valid_days, ) - log_info(f"Server certificate generated with proper SANs: {cert_path}") - return True + if success: + log_info(f"Server certificate generated with proper SANs: {cert_path}") + return success except Exception as e: log_error(f"Failed to generate server certificate: {e}") @@ -624,20 +850,34 @@ async def _setup_server_certificates_for_asyncua(self, server, app_uri: str = No log_info(f"Generating new self-signed certificate in {cert_dir}") log_info(f"Certificate will be created for app_uri: {app_uri}") log_info(f"Certificate will be created for hostname: {hostname}") - await setup_self_signed_certificate( - key_file=key_file, - cert_file=cert_file, + + # Collect DNS names for SAN + dns_names = [hostname] + if hostname != "localhost": + dns_names.append("localhost") + + # Auto-detect all local IP addresses for SAN + local_ips = get_local_ip_addresses() + ip_addresses = list(local_ips) + + log_info(f"Certificate DNS SANs: {dns_names}") + log_info(f"Certificate IP SANs: {ip_addresses}") + + # Use custom certificate generation with multiple SANs + success = await generate_certificate_with_sans( + cert_path=cert_file, + key_path=key_file, app_uri=app_uri, - host_name=hostname, - cert_use=[ExtendedKeyUsageOID.SERVER_AUTH], - subject_attrs={} + dns_names=dns_names, + ip_addresses=ip_addresses, + common_name="OpenPLC OPC-UA Server", ) - + # Verify files were created - if not cert_file.exists() or not key_file.exists(): + if not success or not cert_file.exists() or not key_file.exists(): log_error(f"Certificate files not created: cert={cert_file.exists()}, key={key_file.exists()}") return - + log_info(f"Certificate files created successfully: {cert_file}, {key_file}") else: log_info(f"Using existing certificate files: {cert_file}, {key_file}") From 0b09098c8cc1e4233157784116d50dd5308b2ba9 Mon Sep 17 00:00:00 2001 From: Marcone Tenorio Date: Thu, 22 Jan 2026 15:22:14 -0300 Subject: [PATCH 2/8] fix: Use PKCS8 format for private key to fix asyncua compatibility The TraditionalOpenSSL format caused parsing errors when asyncua tried to load the private key. PKCS8 format is required by asyncua. Co-Authored-By: Claude Opus 4.5 --- core/src/drivers/plugins/python/opcua/opcua_security.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/core/src/drivers/plugins/python/opcua/opcua_security.py b/core/src/drivers/plugins/python/opcua/opcua_security.py index a9ceb845..682ced25 100644 --- a/core/src/drivers/plugins/python/opcua/opcua_security.py +++ b/core/src/drivers/plugins/python/opcua/opcua_security.py @@ -243,13 +243,13 @@ async def generate_certificate_with_sans( private_key, hashes.SHA256(), default_backend() ) - # Write private key to file + # Write private key to file (PKCS8 format required by asyncua) key_path.parent.mkdir(parents=True, exist_ok=True) with open(key_path, "wb") as f: f.write( private_key.private_bytes( encoding=serialization.Encoding.PEM, - format=serialization.PrivateFormat.TraditionalOpenSSL, + format=serialization.PrivateFormat.PKCS8, encryption_algorithm=serialization.NoEncryption(), ) ) From 7e7bd0fb70b8089da0353f091b324d2cf295cb72 Mon Sep 17 00:00:00 2001 From: Marcone Tenorio Date: Thu, 22 Jan 2026 15:34:36 -0300 Subject: [PATCH 3/8] fix: Convert certificate to DER format for asyncua compatibility asyncua's load_certificate was failing with PEM format. Convert both certificate and private key to DER format before loading into the server. Also improved error messages for better debugging. Co-Authored-By: Claude Opus 4.5 --- .../plugins/python/opcua/opcua_security.py | 44 +++++++++++-------- 1 file changed, 25 insertions(+), 19 deletions(-) diff --git a/core/src/drivers/plugins/python/opcua/opcua_security.py b/core/src/drivers/plugins/python/opcua/opcua_security.py index 682ced25..28ddcf89 100644 --- a/core/src/drivers/plugins/python/opcua/opcua_security.py +++ b/core/src/drivers/plugins/python/opcua/opcua_security.py @@ -882,36 +882,42 @@ async def _setup_server_certificates_for_asyncua(self, server, app_uri: str = No else: log_info(f"Using existing certificate files: {cert_file}, {key_file}") - # Load certificate (PEM format works) + # Load and convert certificate from PEM to DER log_info(f"Loading server certificate from: {cert_file}") with open(cert_file, 'rb') as f: - cert_data = f.read() - log_info(f"Certificate loaded: {len(cert_data)} bytes") - - # Load private key and convert PEM to DER (asyncua requires DER for keys) + cert_pem_data = f.read() + log_info(f"Certificate PEM loaded: {len(cert_pem_data)} bytes") + + # Load private key log_info(f"Loading server private key from: {key_file}") with open(key_file, 'rb') as f: - pem_key_data = f.read() - - # Convert private key from PEM to DER for asyncua compatibility + key_pem_data = f.read() + + # Convert certificate and key from PEM to DER for asyncua compatibility from cryptography.hazmat.primitives.serialization import load_pem_private_key try: - private_key = load_pem_private_key(pem_key_data, password=None) - der_key_data = private_key.private_bytes( + # Convert certificate PEM to DER + cert_obj = x509.load_pem_x509_certificate(cert_pem_data, default_backend()) + cert_der_data = cert_obj.public_bytes(serialization.Encoding.DER) + log_info(f"Certificate converted to DER: {len(cert_der_data)} bytes") + + # Convert private key PEM to DER + private_key = load_pem_private_key(key_pem_data, password=None) + key_der_data = private_key.private_bytes( encoding=serialization.Encoding.DER, format=serialization.PrivateFormat.PKCS8, encryption_algorithm=serialization.NoEncryption() ) - log_info(f"Certificate data loaded and converted: cert={len(cert_data)} bytes, key={len(der_key_data)} bytes DER") - - # Load certificate and converted key into server - log_info(f"Loading certificate into asyncua server: {len(cert_data)} bytes") - await server.load_certificate(cert_data) # PEM cert works - log_info(f"Loading private key into asyncua server: {len(der_key_data)} bytes (DER format)") - await server.load_private_key(der_key_data) # DER key required - + log_info(f"Private key converted to DER: {len(key_der_data)} bytes") + + # Load certificate and key into server (both in DER format) + log_info(f"Loading certificate into asyncua server: {len(cert_der_data)} bytes DER") + await server.load_certificate(cert_der_data) + log_info(f"Loading private key into asyncua server: {len(key_der_data)} bytes DER") + await server.load_private_key(key_der_data) + except Exception as e: - log_error(f"Failed to convert private key from PEM to DER: {e}") + log_error(f"Failed to load certificate/key into asyncua server: {e}") raise log_info("Self-signed server certificate loaded successfully into asyncua server") From f791cc19af2bed7400d500a61b2ecd8bbe816fbb Mon Sep 17 00:00:00 2001 From: Marcone Tenorio Date: Thu, 22 Jan 2026 15:44:58 -0300 Subject: [PATCH 4/8] fix: Add CLIENT_AUTH to certificate Extended Key Usage for OPC-UA OPC-UA certificates require both SERVER_AUTH and CLIENT_AUTH in the Extended Key Usage extension. Missing CLIENT_AUTH caused BadCertificateUseNotAllowed errors when clients connected. Co-Authored-By: Claude Opus 4.5 --- core/src/drivers/plugins/python/opcua/opcua_security.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/core/src/drivers/plugins/python/opcua/opcua_security.py b/core/src/drivers/plugins/python/opcua/opcua_security.py index 28ddcf89..d3bbd3ca 100644 --- a/core/src/drivers/plugins/python/opcua/opcua_security.py +++ b/core/src/drivers/plugins/python/opcua/opcua_security.py @@ -233,7 +233,10 @@ async def generate_certificate_with_sans( critical=True, ) .add_extension( - x509.ExtendedKeyUsage([ExtendedKeyUsageOID.SERVER_AUTH]), + x509.ExtendedKeyUsage([ + ExtendedKeyUsageOID.SERVER_AUTH, + ExtendedKeyUsageOID.CLIENT_AUTH, + ]), critical=False, ) ) From 41454b9b3f90acd151a39beb1973419bf236abbb Mon Sep 17 00:00:00 2001 From: Marcone Tenorio Date: Thu, 22 Jan 2026 16:15:14 -0300 Subject: [PATCH 5/8] fix: Enable nonRepudiation (content_commitment) in certificate Key Usage OPC-UA specification (OPC 10000-6 6.2.2) requires certificates to have keyUsage including: digitalSignature, nonRepudiation, keyEncipherment, and dataEncipherment. The missing nonRepudiation flag caused BadCertificateUseNotAllowed errors. Co-Authored-By: Claude Opus 4.5 --- core/src/drivers/plugins/python/opcua/opcua_security.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/drivers/plugins/python/opcua/opcua_security.py b/core/src/drivers/plugins/python/opcua/opcua_security.py index d3bbd3ca..d64b6142 100644 --- a/core/src/drivers/plugins/python/opcua/opcua_security.py +++ b/core/src/drivers/plugins/python/opcua/opcua_security.py @@ -222,7 +222,7 @@ async def generate_certificate_with_sans( x509.KeyUsage( digital_signature=True, key_encipherment=True, - content_commitment=False, + content_commitment=True, # nonRepudiation - required by OPC-UA data_encipherment=True, key_agreement=False, key_cert_sign=False, From 3ced536281c84a6f86fb6b7a344e160ae8fa3737 Mon Sep 17 00:00:00 2001 From: Marcone Tenorio Date: Thu, 22 Jan 2026 16:51:52 -0300 Subject: [PATCH 6/8] fix: Improve IP address detection code quality - Use ipaddress.is_link_local for proper link-local address filtering instead of string prefix check (handles both IPv4 and IPv6) - Add named constants for ioctl magic numbers (_SIOCGIFCONF, _SIZEOF_IFREQ, _MAX_INTERFACES) for better code readability Co-Authored-By: Claude Opus 4.5 --- .../plugins/python/opcua/opcua_security.py | 24 ++++++++++++------- 1 file changed, 16 insertions(+), 8 deletions(-) diff --git a/core/src/drivers/plugins/python/opcua/opcua_security.py b/core/src/drivers/plugins/python/opcua/opcua_security.py index d64b6142..6b37dbe2 100644 --- a/core/src/drivers/plugins/python/opcua/opcua_security.py +++ b/core/src/drivers/plugins/python/opcua/opcua_security.py @@ -47,6 +47,12 @@ from opcua_logging import log_info, log_warn, log_error +# ioctl constants for network interface enumeration (Linux) +_SIOCGIFCONF = 0x8912 # ioctl request code to get interface configuration +_SIZEOF_IFREQ = 40 # sizeof(struct ifreq) on 64-bit Linux +_MAX_INTERFACES = 128 # Maximum number of network interfaces to query + + def get_local_ip_addresses() -> Set[str]: """ Get all local IP addresses of the machine. @@ -67,9 +73,13 @@ def get_local_ip_addresses() -> Set[str]: # Get all addresses associated with hostname for info in socket.getaddrinfo(hostname, None): ip = info[4][0] - # Filter out link-local and loopback for external access - if not ip.startswith("fe80:"): # Skip IPv6 link-local - ip_addresses.add(ip) + # Filter out link-local addresses using ipaddress module + try: + addr = ipaddress.ip_address(ip) + if not addr.is_link_local: + ip_addresses.add(ip) + except ValueError: + pass except socket.gaierror: pass @@ -89,15 +99,13 @@ def get_local_ip_addresses() -> Set[str]: import array # Get list of network interfaces - max_interfaces = 128 - buf_size = max_interfaces * 40 # sizeof(struct ifreq) on 64-bit + buf_size = _MAX_INTERFACES * _SIZEOF_IFREQ buf = array.array("B", b"\0" * buf_size) with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s: - # SIOCGIFCONF = 0x8912 result = fcntl.ioctl( s.fileno(), - 0x8912, + _SIOCGIFCONF, struct.pack("iL", buf_size, buf.buffer_info()[0]), ) out_bytes = struct.unpack("iL", result)[0] @@ -114,7 +122,7 @@ def get_local_ip_addresses() -> Set[str]: ip = socket.inet_ntoa(ip_bytes) if ip != "0.0.0.0": ip_addresses.add(ip) - offset += 40 # Move to next interface + offset += _SIZEOF_IFREQ except Exception: pass From aa09a60928ac5dcea04a5b85545c3d1c0c896cfd Mon Sep 17 00:00:00 2001 From: Marcone Tenorio Date: Thu, 22 Jan 2026 17:40:17 -0300 Subject: [PATCH 7/8] fix: Extend certificate validity to 10 years and auto-regenerate expired certs - Change default certificate validity from 365 days to 3650 days (10 years) - Add _is_certificate_valid() method to check if certificate is still valid - Add _remove_certificate_files() method to remove expired certificate files - Update _ensure_server_certificates() to check validity and regenerate if expired - Update _setup_server_certificates_for_asyncua() with same validity check logic - Remove unused iface_name variable to fix ruff lint warning Co-Authored-By: Claude Opus 4.5 --- .../plugins/python/opcua/opcua_security.py | 393 ++++++++++++------ 1 file changed, 259 insertions(+), 134 deletions(-) diff --git a/core/src/drivers/plugins/python/opcua/opcua_security.py b/core/src/drivers/plugins/python/opcua/opcua_security.py index 6b37dbe2..cbfbacf3 100644 --- a/core/src/drivers/plugins/python/opcua/opcua_security.py +++ b/core/src/drivers/plugins/python/opcua/opcua_security.py @@ -8,32 +8,34 @@ - Client trust list management """ +import datetime +import hashlib +import ipaddress import os -import sys -import ssl +import shutil import socket -import hashlib -import asyncio +import ssl +import sys import tempfile -import shutil -import ipaddress -import datetime from pathlib import Path -from typing import Optional, Tuple, List, Set +from typing import List, Optional, Set, Tuple from urllib.parse import urlparse -from asyncua.crypto import uacrypto -from asyncua.crypto.cert_gen import setup_self_signed_certificate -from asyncua.crypto.security_policies import SecurityPolicyBasic256Sha256, SecurityPolicyAes128Sha256RsaOaep, SecurityPolicyAes256Sha256RsaPss + +from asyncua import ua +from asyncua.crypto.permission_rules import PermissionRuleset +from asyncua.crypto.security_policies import ( + SecurityPolicyAes128Sha256RsaOaep, + SecurityPolicyAes256Sha256RsaPss, + SecurityPolicyBasic256Sha256, +) from asyncua.crypto.truststore import TrustStore from asyncua.crypto.validator import CertificateValidator -from asyncua.crypto.permission_rules import SimpleRoleRuleset, PermissionRuleset from asyncua.server.user_managers import UserRole -from asyncua import ua -from cryptography.x509.oid import ExtensionOID, ExtendedKeyUsageOID, NameOID from cryptography import x509 from cryptography.hazmat.backends import default_backend -from cryptography.hazmat.primitives import serialization, hashes +from cryptography.hazmat.primitives import hashes, serialization from cryptography.hazmat.primitives.asymmetric import rsa +from cryptography.x509.oid import ExtendedKeyUsageOID, NameOID # Add directories to path for module access _current_dir = os.path.dirname(os.path.abspath(__file__)) @@ -42,9 +44,9 @@ # Import logging (handle both package and direct loading) try: - from .opcua_logging import log_info, log_warn, log_error + from .opcua_logging import log_error, log_info, log_warn except ImportError: - from opcua_logging import log_info, log_warn, log_error + from opcua_logging import log_error, log_info, log_warn # ioctl constants for network interface enumeration (Linux) @@ -94,9 +96,9 @@ def get_local_ip_addresses() -> Set[str]: # Method 3: Try to get all interface IPs using netifaces-like approach try: + import array import fcntl import struct - import array # Get list of network interfaces buf_size = _MAX_INTERFACES * _SIZEOF_IFREQ @@ -113,8 +115,7 @@ def get_local_ip_addresses() -> Set[str]: # Parse the buffer for interface addresses offset = 0 while offset < out_bytes: - # Interface name is 16 bytes, then sockaddr - iface_name = buf[offset : offset + 16].tobytes().split(b"\0")[0] + # Interface name is 16 bytes, then sockaddr (unused, skip it) # Skip to IP address (offset 20 from start of entry) ip_offset = offset + 20 if ip_offset + 4 <= len(buf): @@ -144,7 +145,7 @@ async def generate_certificate_with_sans( state: str = "CA", locality: str = "California", key_size: int = 2048, - valid_days: int = 365, + valid_days: int = 3650, ) -> bool: """ Generate a self-signed certificate with multiple Subject Alternative Names. @@ -164,7 +165,7 @@ async def generate_certificate_with_sans( state: State/Province locality: City/Locality key_size: RSA key size (default 2048) - valid_days: Certificate validity in days (default 365) + valid_days: Certificate validity in days (default 3650 = 10 years) Returns: bool: True if certificate generated successfully @@ -241,18 +242,18 @@ async def generate_certificate_with_sans( critical=True, ) .add_extension( - x509.ExtendedKeyUsage([ - ExtendedKeyUsageOID.SERVER_AUTH, - ExtendedKeyUsageOID.CLIENT_AUTH, - ]), + x509.ExtendedKeyUsage( + [ + ExtendedKeyUsageOID.SERVER_AUTH, + ExtendedKeyUsageOID.CLIENT_AUTH, + ] + ), critical=False, ) ) # Sign the certificate - certificate = cert_builder.sign( - private_key, hashes.SHA256(), default_backend() - ) + certificate = cert_builder.sign(private_key, hashes.SHA256(), default_backend()) # Write private key to file (PKCS8 format required by asyncua) key_path.parent.mkdir(parents=True, exist_ok=True) @@ -368,14 +369,14 @@ class OpcuaSecurityManager: "None": None, "Basic256Sha256": SecurityPolicyBasic256Sha256, "Aes128_Sha256_RsaOaep": SecurityPolicyAes128Sha256RsaOaep, - "Aes256_Sha256_RsaPss": SecurityPolicyAes256Sha256RsaPss + "Aes256_Sha256_RsaPss": SecurityPolicyAes256Sha256RsaPss, } # Mapping from config strings to opcua-asyncio message security modes SECURITY_MODE_MAPPING = { "None": 1, # MessageSecurityMode.None "Sign": 2, # MessageSecurityMode.Sign - "SignAndEncrypt": 3 # MessageSecurityMode.SignAndEncrypt + "SignAndEncrypt": 3, # MessageSecurityMode.SignAndEncrypt } # Mapping from (policy, mode) to SecurityPolicyType for asyncua Server @@ -388,9 +389,15 @@ class OpcuaSecurityManager: ("Basic128Rsa15", "Sign"): ua.SecurityPolicyType.Basic128Rsa15_Sign, ("Basic128Rsa15", "SignAndEncrypt"): ua.SecurityPolicyType.Basic128Rsa15_SignAndEncrypt, ("Aes128_Sha256_RsaOaep", "Sign"): ua.SecurityPolicyType.Aes128Sha256RsaOaep_Sign, - ("Aes128_Sha256_RsaOaep", "SignAndEncrypt"): ua.SecurityPolicyType.Aes128Sha256RsaOaep_SignAndEncrypt, + ( + "Aes128_Sha256_RsaOaep", + "SignAndEncrypt", + ): ua.SecurityPolicyType.Aes128Sha256RsaOaep_SignAndEncrypt, ("Aes256_Sha256_RsaPss", "Sign"): ua.SecurityPolicyType.Aes256Sha256RsaPss_Sign, - ("Aes256_Sha256_RsaPss", "SignAndEncrypt"): ua.SecurityPolicyType.Aes256Sha256RsaPss_SignAndEncrypt, + ( + "Aes256_Sha256_RsaPss", + "SignAndEncrypt", + ): ua.SecurityPolicyType.Aes256Sha256RsaPss_SignAndEncrypt, } CERTS_DIR = "certs" @@ -445,16 +452,84 @@ async def initialize_security(self) -> bool: if not self._load_trusted_certificates(): return False - log_info(f"Security initialized: policy={self.config.security_policy}, mode={self.config.security_mode}") + log_info( + f"Security initialized: policy={self.config.security_policy}, mode={self.config.security_mode}" + ) return True except Exception as e: log_error(f"Failed to initialize security: {e}") return False + def _is_certificate_valid(self, cert_path: str) -> bool: + """ + Check if a certificate file exists and is still valid (not expired). + + Args: + cert_path: Path to the certificate file + + Returns: + bool: True if certificate exists and is valid, False otherwise + """ + if not os.path.exists(cert_path): + return False + + try: + with open(cert_path, "rb") as f: + cert_data = f.read() + + cert = x509.load_pem_x509_certificate(cert_data, default_backend()) + + # Use timezone-aware datetime for comparison + now_utc = datetime.datetime.now(datetime.timezone.utc) + + # Get certificate validity dates (prefer UTC versions if available) + not_valid_after = getattr(cert, "not_valid_after_utc", None) + if not_valid_after is None: + not_valid_after = cert.not_valid_after.replace(tzinfo=datetime.timezone.utc) + + not_valid_before = getattr(cert, "not_valid_before_utc", None) + if not_valid_before is None: + not_valid_before = cert.not_valid_before.replace(tzinfo=datetime.timezone.utc) + + # Check if certificate is not yet valid + if not_valid_before > now_utc: + log_warn(f"Certificate {cert_path} is not yet valid") + return False + + # Check if certificate has expired + if not_valid_after < now_utc: + log_warn(f"Certificate {cert_path} has expired") + return False + + # Certificate is valid + days_until_expiry = (not_valid_after - now_utc).days + log_info(f"Certificate {cert_path} is valid for {days_until_expiry} more days") + return True + + except Exception as e: + log_warn(f"Failed to validate certificate {cert_path}: {e}") + return False + + def _remove_certificate_files(self, cert_path: str, key_path: str) -> None: + """ + Remove existing certificate and key files. + + Args: + cert_path: Path to the certificate file + key_path: Path to the private key file + """ + for file_path in [cert_path, key_path]: + if os.path.exists(file_path): + try: + os.remove(file_path) + log_info(f"Removed expired certificate file: {file_path}") + except Exception as e: + log_warn(f"Failed to remove file {file_path}: {e}") + async def _ensure_server_certificates(self) -> bool: """ - Ensure server certificates exist, generate if missing. + Ensure server certificates exist and are valid, generate if missing or expired. Returns: bool: True if certificates are available @@ -466,9 +541,15 @@ async def _ensure_server_certificates(self) -> bool: cert_path = os.path.join(self.certs_dir, self.SERVER_CERT_FILE) key_path = os.path.join(self.certs_dir, self.SERVER_KEY_FILE) - # Check if certificates already exist + # Check if certificates already exist and are valid if os.path.exists(cert_path) and os.path.exists(key_path): - log_info(f"Found existing server certificates in {self.certs_dir}") + if self._is_certificate_valid(cert_path): + log_info(f"Found valid server certificates in {self.certs_dir}") + else: + log_info("Server certificate is expired or invalid, regenerating") + self._remove_certificate_files(cert_path, key_path) + if not await self.generate_server_certificate(cert_path, key_path): + return False else: log_info(f"Server certificates not found, generating new ones in {self.certs_dir}") if not await self.generate_server_certificate(cert_path, key_path): @@ -490,11 +571,11 @@ def _load_certificates(self, cert_path: str, key_path: str) -> bool: """ try: # Load certificate - with open(cert_path, 'rb') as cert_file: + with open(cert_path, "rb") as cert_file: self.certificate_data = cert_file.read() # Load private key - with open(key_path, 'rb') as key_file: + with open(key_path, "rb") as key_file: self.private_key_data = key_file.read() # Validate certificate format (basic check) @@ -520,25 +601,26 @@ def _validate_certificate_format(self) -> bool: """ try: # Try to load certificate with ssl module for basic validation - ssl.PEM_cert_to_DER_cert(self.certificate_data.decode('utf-8')) - + ssl.PEM_cert_to_DER_cert(self.certificate_data.decode("utf-8")) + # Enhanced validation using cryptography library try: + import datetime + from cryptography import x509 from cryptography.hazmat.backends import default_backend - import datetime - + cert = x509.load_pem_x509_certificate(self.certificate_data, default_backend()) # Use timezone-aware datetime for comparison now_utc = datetime.datetime.now(datetime.timezone.utc) # Get certificate validity dates (prefer UTC versions if available) - not_valid_after = getattr(cert, 'not_valid_after_utc', None) + not_valid_after = getattr(cert, "not_valid_after_utc", None) if not_valid_after is None: not_valid_after = cert.not_valid_after.replace(tzinfo=datetime.timezone.utc) - not_valid_before = getattr(cert, 'not_valid_before_utc', None) + not_valid_before = getattr(cert, "not_valid_before_utc", None) if not_valid_before is None: not_valid_before = cert.not_valid_before.replace(tzinfo=datetime.timezone.utc) @@ -556,51 +638,67 @@ def _validate_certificate_format(self) -> bool: days_until_expiry = (not_valid_after - now_utc).days if days_until_expiry < 30: log_warn(f"Certificate expires in {days_until_expiry} days") - + # Check for Subject Alternative Name extension try: - san_ext = cert.extensions.get_extension_for_oid(x509.ExtensionOID.SUBJECT_ALTERNATIVE_NAME) + san_ext = cert.extensions.get_extension_for_oid( + x509.ExtensionOID.SUBJECT_ALTERNATIVE_NAME + ) san_names = san_ext.value - + # Log SAN entries for debugging dns_names = [name.value for name in san_names if isinstance(name, x509.DNSName)] - ip_addresses = [name.value.compressed for name in san_names if isinstance(name, x509.IPAddress)] - uris = [name.value for name in san_names if isinstance(name, x509.UniformResourceIdentifier)] - + ip_addresses = [ + name.value.compressed + for name in san_names + if isinstance(name, x509.IPAddress) + ] + uris = [ + name.value + for name in san_names + if isinstance(name, x509.UniformResourceIdentifier) + ] + log_info(f"Certificate SAN DNS names: {dns_names}") log_info(f"Certificate SAN IP addresses: {ip_addresses}") log_info(f"Certificate SAN URIs: {uris}") - + # Check if we have expected entries system_hostname = socket.gethostname() if system_hostname not in dns_names and system_hostname != "localhost": - log_warn(f"System hostname '{system_hostname}' not found in certificate DNS SANs") - + log_warn( + f"System hostname '{system_hostname}' not found in certificate DNS SANs" + ) + # Check for application URI expected_uri = "urn:autonomy-logic:openplc:opcua:server" if expected_uri not in uris: - log_warn(f"Expected application URI '{expected_uri}' not found in certificate") - + log_warn( + f"Expected application URI '{expected_uri}' not found in certificate" + ) + except x509.ExtensionNotFound: log_warn("Certificate missing Subject Alternative Name extension") - + # Check key usage extensions try: - key_usage = cert.extensions.get_extension_for_oid(x509.ExtensionOID.KEY_USAGE).value + key_usage = cert.extensions.get_extension_for_oid( + x509.ExtensionOID.KEY_USAGE + ).value if not key_usage.digital_signature: log_warn("Certificate lacks digital signature key usage") if not key_usage.key_encipherment: log_warn("Certificate lacks key encipherment usage") except x509.ExtensionNotFound: log_warn("Certificate missing key usage extension") - + log_info("Certificate format and extensions validated") return True - + except ImportError: log_warn("cryptography library not available for enhanced validation") return True # Fall back to basic validation - + except Exception: try: # Try as DER format @@ -633,16 +731,14 @@ def _load_trusted_certificates(self) -> bool: cert_der = ssl.PEM_cert_to_DER_cert(cert_pem) cert_hash = hashlib.sha256(cert_der).hexdigest()[:16] # Short hash for logging - self.trusted_certificates.append({ - 'pem': cert_pem, - 'der': cert_der, - 'hash': cert_hash - }) + self.trusted_certificates.append( + {"pem": cert_pem, "der": cert_der, "hash": cert_hash} + ) - log_info(f"Loaded trusted certificate {i+1} (SHA256: {cert_hash})") + log_info(f"Loaded trusted certificate {i + 1} (SHA256: {cert_hash})") except Exception as e: - log_error(f"Invalid trusted certificate {i+1}: {e}") + log_error(f"Invalid trusted certificate {i + 1}: {e}") return False log_info(f"Loaded {len(self.trusted_certificates)} trusted client certificates") @@ -679,7 +775,7 @@ def validate_client_certificate(self, client_cert_pem: str) -> bool: # Check if client certificate matches any trusted certificate for trusted_cert in self.trusted_certificates: - if trusted_cert['der'] == client_cert_der: + if trusted_cert["der"] == client_cert_der: log_info(f"Client certificate trusted (SHA256: {client_hash})") return True @@ -690,7 +786,9 @@ def validate_client_certificate(self, client_cert_pem: str) -> bool: log_error(f"Error validating client certificate: {e}") return False - def get_security_settings(self) -> Tuple[Optional[object], int, Optional[bytes], Optional[bytes]]: + def get_security_settings( + self, + ) -> Tuple[Optional[object], int, Optional[bytes], Optional[bytes]]: """ Get security settings for opcua-asyncio server. @@ -701,7 +799,7 @@ def get_security_settings(self) -> Tuple[Optional[object], int, Optional[bytes], self.security_policy, self.security_mode, self.certificate_data, - self.private_key_data + self.private_key_data, ) async def generate_server_certificate( @@ -710,8 +808,8 @@ async def generate_server_certificate( key_path: str, common_name: str = "OpenPLC OPC-UA Server", key_size: int = 2048, - valid_days: int = 365, - app_uri: str = None + valid_days: int = 3650, + app_uri: str = None, ) -> bool: """ Generate a self-signed certificate for the server with proper SAN extensions. @@ -737,7 +835,7 @@ async def generate_server_certificate( # Extract hostname from endpoint if available endpoint_hostname = "localhost" # default - if hasattr(self.config, 'endpoint') and self.config.endpoint: + if hasattr(self.config, "endpoint") and self.config.endpoint: try: # Convert opc.tcp:// to http:// for parsing endpoint_url = self.config.endpoint.replace("opc.tcp://", "http://") @@ -793,7 +891,7 @@ async def generate_server_certificate( async def setup_server_security(self, server, security_profiles, app_uri: str = None) -> None: """Setup security policies and certificates for asyncua Server. - + Args: server: asyncua Server instance security_profiles: List of security profiles from config @@ -801,63 +899,82 @@ async def setup_server_security(self, server, security_profiles, app_uri: str = """ # Setup security policies security_policies = [] - + for profile in security_profiles: if not profile.enabled: continue - + policy_key = (profile.security_policy, profile.security_mode) policy_type = self.POLICY_TYPE_MAPPING.get(policy_key) - + if policy_type is not None: security_policies.append(policy_type) - log_info(f"Added security profile '{profile.name}': {profile.security_policy}/{profile.security_mode} -> {policy_type}") + log_info( + f"Added security profile '{profile.name}': {profile.security_policy}/{profile.security_mode} -> {policy_type}" + ) else: - log_warn(f"Unsupported security policy/mode combination '{profile.security_policy}/{profile.security_mode}' for profile '{profile.name}', skipping") - + log_warn( + f"Unsupported security policy/mode combination '{profile.security_policy}/{profile.security_mode}' for profile '{profile.name}', skipping" + ) + # Create custom permission ruleset that allows ModifySubscription for users permission_ruleset = OpenPLCRoleRuleset() if security_policies: - log_info(f"=== SECURITY MANAGER DEBUG ===") + log_info("=== SECURITY MANAGER DEBUG ===") log_info(f"Setting {len(security_policies)} security policies: {security_policies}") server.set_security_policy(security_policies, permission_ruleset=permission_ruleset) - log_info(f"Security policies applied to server successfully") - log_info(f"Using OpenPLCRoleRuleset for subscription permission support") - log_info(f"=== END SECURITY MANAGER DEBUG ===") + log_info("Security policies applied to server successfully") + log_info("Using OpenPLCRoleRuleset for subscription permission support") + log_info("=== END SECURITY MANAGER DEBUG ===") else: # Default to no security if no profiles enabled log_warn("No security profiles enabled, defaulting to NoSecurity") - server.set_security_policy([ua.SecurityPolicyType.NoSecurity], permission_ruleset=permission_ruleset) - + server.set_security_policy( + [ua.SecurityPolicyType.NoSecurity], permission_ruleset=permission_ruleset + ) + # Setup server certificates if needed log_info("=== CERTIFICATE SETUP DEBUG ===") await self._setup_server_certificates_for_asyncua(server, app_uri) log_info("=== END CERTIFICATE SETUP DEBUG ===") - + async def _setup_server_certificates_for_asyncua(self, server, app_uri: str = None) -> None: """Setup server certificates for asyncua Server. - + Args: server: asyncua Server instance app_uri: Application URI for the certificate (from config) """ - if hasattr(self.config, 'security') and self.config.security.server_certificate_strategy == "auto_self_signed": + if ( + hasattr(self.config, "security") + and self.config.security.server_certificate_strategy == "auto_self_signed" + ): # Generate self-signed certificate in persistent directory cert_dir = Path(self.plugin_dir) / "certs" cert_dir.mkdir(parents=True, exist_ok=True) - + key_file = cert_dir / "server_key.pem" cert_file = cert_dir / "server_cert.pem" - + hostname = socket.gethostname() # Use provided app_uri or fallback to config value if not app_uri: - app_uri = getattr(self.config.server, 'application_uri', - 'urn:autonomy-logic:openplc:opcua:server') - - # Only generate if files don't exist + app_uri = getattr( + self.config.server, "application_uri", "urn:autonomy-logic:openplc:opcua:server" + ) + + # Check if we need to generate new certificates + need_generation = False if not cert_file.exists() or not key_file.exists(): + log_info("Certificate files not found, will generate new ones") + need_generation = True + elif not self._is_certificate_valid(str(cert_file)): + log_info("Certificate is expired or invalid, will regenerate") + self._remove_certificate_files(str(cert_file), str(key_file)) + need_generation = True + + if need_generation: log_info(f"Generating new self-signed certificate in {cert_dir}") log_info(f"Certificate will be created for app_uri: {app_uri}") log_info(f"Certificate will be created for hostname: {hostname}") @@ -886,26 +1003,31 @@ async def _setup_server_certificates_for_asyncua(self, server, app_uri: str = No # Verify files were created if not success or not cert_file.exists() or not key_file.exists(): - log_error(f"Certificate files not created: cert={cert_file.exists()}, key={key_file.exists()}") + log_error( + f"Certificate files not created: cert={cert_file.exists()}, key={key_file.exists()}" + ) return log_info(f"Certificate files created successfully: {cert_file}, {key_file}") else: - log_info(f"Using existing certificate files: {cert_file}, {key_file}") - + log_info(f"Using existing valid certificate files: {cert_file}, {key_file}") + # Load and convert certificate from PEM to DER log_info(f"Loading server certificate from: {cert_file}") - with open(cert_file, 'rb') as f: + with open(cert_file, "rb") as f: cert_pem_data = f.read() log_info(f"Certificate PEM loaded: {len(cert_pem_data)} bytes") # Load private key log_info(f"Loading server private key from: {key_file}") - with open(key_file, 'rb') as f: + with open(key_file, "rb") as f: key_pem_data = f.read() # Convert certificate and key from PEM to DER for asyncua compatibility - from cryptography.hazmat.primitives.serialization import load_pem_private_key + from cryptography.hazmat.primitives.serialization import ( + load_pem_private_key, + ) + try: # Convert certificate PEM to DER cert_obj = x509.load_pem_x509_certificate(cert_pem_data, default_backend()) @@ -917,7 +1039,7 @@ async def _setup_server_certificates_for_asyncua(self, server, app_uri: str = No key_der_data = private_key.private_bytes( encoding=serialization.Encoding.DER, format=serialization.PrivateFormat.PKCS8, - encryption_algorithm=serialization.NoEncryption() + encryption_algorithm=serialization.NoEncryption(), ) log_info(f"Private key converted to DER: {len(key_der_data)} bytes") @@ -930,77 +1052,80 @@ async def _setup_server_certificates_for_asyncua(self, server, app_uri: str = No except Exception as e: log_error(f"Failed to load certificate/key into asyncua server: {e}") raise - + log_info("Self-signed server certificate loaded successfully into asyncua server") - - elif hasattr(self.config, 'security') and self.config.security.server_certificate_custom: + + elif hasattr(self.config, "security") and self.config.security.server_certificate_custom: cert_path = self.config.security.server_certificate_custom key_path = self.config.security.server_private_key_custom if cert_path and key_path: try: # Carregar certificado - with open(cert_path, 'rb') as f: + with open(cert_path, "rb") as f: cert_data = f.read() - + # Carregar e converter chave privada de PEM para DER - with open(key_path, 'rb') as f: + with open(key_path, "rb") as f: pem_key_data = f.read() - - from cryptography.hazmat.primitives.serialization import load_pem_private_key + + from cryptography.hazmat.primitives.serialization import ( + load_pem_private_key, + ) + private_key = load_pem_private_key(pem_key_data, password=None) der_key_data = private_key.private_bytes( encoding=serialization.Encoding.DER, format=serialization.PrivateFormat.PKCS8, - encryption_algorithm=serialization.NoEncryption() + encryption_algorithm=serialization.NoEncryption(), ) - + await server.load_certificate(cert_data) await server.load_private_key(der_key_data) log_info("Custom server certificate loaded (PEM cert + DER key)") except Exception as e: log_error(f"Failed to load custom certificate: {e}") - + elif self.certificate_data and self.private_key_data: await server.load_certificate(self.certificate_data) await server.load_private_key(self.private_key_data) log_info("SecurityManager certificates loaded into server") - + async def create_trust_store(self, trusted_certificates: List[str]) -> Optional[TrustStore]: """Create and configure TrustStore with trusted client certificates. - + Args: trusted_certificates: List of PEM certificate strings - + Returns: TrustStore instance or None if failed """ if not trusted_certificates: return None - + try: # Create temporary directory for certificate files temp_dir = tempfile.mkdtemp(prefix="opcua_trust_") self._trust_store_temp_dir = temp_dir # Store for cleanup cert_files = [] - + for i, cert_pem in enumerate(trusted_certificates): try: # Load and validate certificate using cryptography cert = x509.load_pem_x509_certificate(cert_pem.encode(), default_backend()) - + # Convert to DER format and save to temporary file cert_der = cert.public_bytes(encoding=serialization.Encoding.DER) - + cert_file = os.path.join(temp_dir, f"trusted_cert_{i}.der") - with open(cert_file, 'wb') as f: + with open(cert_file, "wb") as f: f.write(cert_der) - + cert_files.append(cert_file) - log_info(f"Added trusted certificate {i+1} to trust store") - + log_info(f"Added trusted certificate {i + 1} to trust store") + except Exception as e: - log_warn(f"Failed to process trusted certificate {i+1}: {e}") - + log_warn(f"Failed to process trusted certificate {i + 1}: {e}") + if cert_files: # Create TrustStore with certificate files trust_store = TrustStore(cert_files, []) @@ -1010,7 +1135,7 @@ async def create_trust_store(self, trusted_certificates: List[str]) -> Optional[ else: log_warn("No valid trusted certificates processed") return None - + except Exception as e: log_error(f"Failed to create TrustStore: {e}") return None @@ -1030,14 +1155,14 @@ def cleanup(self) -> None: async def setup_certificate_validation(self, server, trusted_certificates) -> None: """Setup certificate validation for asyncua Server. - + Args: server: asyncua Server instance trusted_certificates: List of certificate dictionaries with 'id' and 'pem' keys """ if not trusted_certificates: return - + try: # Handle both List[str] and List[Dict[str, str]] formats cert_pems = [] @@ -1047,19 +1172,19 @@ async def setup_certificate_validation(self, server, trusted_certificates) -> No else: # Already a list of PEM strings cert_pems = trusted_certificates - + # Create trust store trust_store = await self.create_trust_store(cert_pems) if not trust_store: log_error("Could not create trust store") return - + # Create certificate validator cert_validator = CertificateValidator(trust_store=trust_store) - + # Set validator on server server.set_certificate_validator(cert_validator) log_info("Certificate validation configured") - + except Exception as e: log_error(f"Failed to setup certificate validation: {e}") From 47b0de98f49ccbfbf297b17a6604e8b755b94704 Mon Sep 17 00:00:00 2001 From: Marcone Tenorio Date: Thu, 22 Jan 2026 18:31:29 -0300 Subject: [PATCH 8/8] fix: Address Copilot PR review comments for certificate generation - Remove async from generate_certificate_with_sans (no await operations) - Remove deprecated default_backend() from all cryptography calls - Remove redundant imports in _validate_certificate_format - Add restricted permissions (0o600) for private key files - Document 10-year validity rationale in docstring Co-Authored-By: Claude Opus 4.5 --- .../plugins/python/opcua/opcua_security.py | 32 +++++++++---------- 1 file changed, 15 insertions(+), 17 deletions(-) diff --git a/core/src/drivers/plugins/python/opcua/opcua_security.py b/core/src/drivers/plugins/python/opcua/opcua_security.py index cbfbacf3..f92fa73a 100644 --- a/core/src/drivers/plugins/python/opcua/opcua_security.py +++ b/core/src/drivers/plugins/python/opcua/opcua_security.py @@ -32,7 +32,6 @@ from asyncua.crypto.validator import CertificateValidator from asyncua.server.user_managers import UserRole from cryptography import x509 -from cryptography.hazmat.backends import default_backend from cryptography.hazmat.primitives import hashes, serialization from cryptography.hazmat.primitives.asymmetric import rsa from cryptography.x509.oid import ExtendedKeyUsageOID, NameOID @@ -133,7 +132,7 @@ def get_local_ip_addresses() -> Set[str]: return ip_addresses -async def generate_certificate_with_sans( +def generate_certificate_with_sans( cert_path: Path, key_path: Path, app_uri: str, @@ -153,6 +152,10 @@ async def generate_certificate_with_sans( This function creates a certificate suitable for OPC-UA servers with proper SAN extensions including multiple DNS names, IP addresses, and URIs. + The default validity period is 10 years (3650 days) to minimize certificate + renewal overhead in industrial/embedded environments where PLCs may run + for extended periods without maintenance. + Args: cert_path: Path where certificate will be saved (PEM format) key_path: Path where private key will be saved (PEM format) @@ -175,7 +178,6 @@ async def generate_certificate_with_sans( private_key = rsa.generate_private_key( public_exponent=65537, key_size=key_size, - backend=default_backend(), ) # Build subject name @@ -253,11 +255,12 @@ async def generate_certificate_with_sans( ) # Sign the certificate - certificate = cert_builder.sign(private_key, hashes.SHA256(), default_backend()) + certificate = cert_builder.sign(private_key, hashes.SHA256()) - # Write private key to file (PKCS8 format required by asyncua) + # Write private key to file with restricted permissions (PKCS8 format required by asyncua) key_path.parent.mkdir(parents=True, exist_ok=True) - with open(key_path, "wb") as f: + fd = os.open(str(key_path), os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600) + with os.fdopen(fd, "wb") as f: f.write( private_key.private_bytes( encoding=serialization.Encoding.PEM, @@ -478,7 +481,7 @@ def _is_certificate_valid(self, cert_path: str) -> bool: with open(cert_path, "rb") as f: cert_data = f.read() - cert = x509.load_pem_x509_certificate(cert_data, default_backend()) + cert = x509.load_pem_x509_certificate(cert_data) # Use timezone-aware datetime for comparison now_utc = datetime.datetime.now(datetime.timezone.utc) @@ -605,12 +608,7 @@ def _validate_certificate_format(self) -> bool: # Enhanced validation using cryptography library try: - import datetime - - from cryptography import x509 - from cryptography.hazmat.backends import default_backend - - cert = x509.load_pem_x509_certificate(self.certificate_data, default_backend()) + cert = x509.load_pem_x509_certificate(self.certificate_data) # Use timezone-aware datetime for comparison now_utc = datetime.datetime.now(datetime.timezone.utc) @@ -870,7 +868,7 @@ async def generate_server_certificate( log_info(f"Application URI: {app_uri}") # Use custom certificate generation with multiple SANs - success = await generate_certificate_with_sans( + success = generate_certificate_with_sans( cert_path=Path(cert_path), key_path=Path(key_path), app_uri=app_uri, @@ -992,7 +990,7 @@ async def _setup_server_certificates_for_asyncua(self, server, app_uri: str = No log_info(f"Certificate IP SANs: {ip_addresses}") # Use custom certificate generation with multiple SANs - success = await generate_certificate_with_sans( + success = generate_certificate_with_sans( cert_path=cert_file, key_path=key_file, app_uri=app_uri, @@ -1030,7 +1028,7 @@ async def _setup_server_certificates_for_asyncua(self, server, app_uri: str = No try: # Convert certificate PEM to DER - cert_obj = x509.load_pem_x509_certificate(cert_pem_data, default_backend()) + cert_obj = x509.load_pem_x509_certificate(cert_pem_data) cert_der_data = cert_obj.public_bytes(serialization.Encoding.DER) log_info(f"Certificate converted to DER: {len(cert_der_data)} bytes") @@ -1111,7 +1109,7 @@ async def create_trust_store(self, trusted_certificates: List[str]) -> Optional[ for i, cert_pem in enumerate(trusted_certificates): try: # Load and validate certificate using cryptography - cert = x509.load_pem_x509_certificate(cert_pem.encode(), default_backend()) + cert = x509.load_pem_x509_certificate(cert_pem.encode()) # Convert to DER format and save to temporary file cert_der = cert.public_bytes(encoding=serialization.Encoding.DER)