From 5db18463d359f19a497a13d2ec5fe871dfb9074e Mon Sep 17 00:00:00 2001 From: Marcone Tenorio Date: Mon, 19 Jan 2026 10:08:17 -0300 Subject: [PATCH 1/5] feat: Add TIME type support for OPC-UA plugin Add support for IEC 61131-3 TIME, DATE, TOD, and DT types in the OPC-UA plugin: - Add IEC_TIMESPEC ctypes structure matching C definition (tv_sec, tv_nsec) - Implement TIME type mapping to OPC-UA Int64 (milliseconds) - Implement DATE/DT type mapping to OPC-UA DateTime - Add timespec_to_milliseconds and milliseconds_to_timespec conversion functions - Update convert_value_for_opcua/plc functions to handle TIME types - Add read_timespec_direct and write_timespec_direct memory access functions - Update synchronization to pass datatype hint for TIME handling - Add datatype validation in config model with VALID_DATATYPES constant - Add TIME variable examples to config template TIME values are represented as Int64 milliseconds in OPC-UA, which provides good compatibility with standard OPC-UA clients while maintaining reasonable precision for PLC applications. Co-Authored-By: Claude Opus 4.5 --- .../python/opcua/opcua_config_template.json | 30 + .../plugins/python/opcua/opcua_memory.py | 74 ++- .../plugins/python/opcua/opcua_utils.py | 114 +++- .../plugins/python/opcua/synchronization.py | 92 ++- .../opcua_config_model.py | 33 ++ docs/plans/TIME_TYPE_SUPPORT_PLAN.md | 533 ++++++++++++++++++ 6 files changed, 856 insertions(+), 20 deletions(-) create mode 100644 docs/plans/TIME_TYPE_SUPPORT_PLAN.md diff --git a/core/src/drivers/plugins/python/opcua/opcua_config_template.json b/core/src/drivers/plugins/python/opcua/opcua_config_template.json index a3b2b5fb..b9e31e5b 100644 --- a/core/src/drivers/plugins/python/opcua/opcua_config_template.json +++ b/core/src/drivers/plugins/python/opcua/opcua_config_template.json @@ -139,6 +139,36 @@ "description": "Example read-only variable", "index": 5, "permissions": {"viewer": "r", "operator": "r", "engineer": "r"} + }, + { + "node_id": "PLC.Example.cycle_time", + "browse_name": "cycle_time", + "display_name": "Cycle Time", + "datatype": "TIME", + "initial_value": 0, + "description": "PLC scan cycle time (TIME type, represented as milliseconds in OPC-UA)", + "index": 6, + "permissions": {"viewer": "r", "operator": "r", "engineer": "rw"} + }, + { + "node_id": "PLC.Example.timer_preset", + "browse_name": "timer_preset", + "display_name": "Timer Preset", + "datatype": "TIME", + "initial_value": 0, + "description": "Timer preset value (TIME type)", + "index": 7, + "permissions": {"viewer": "r", "operator": "rw", "engineer": "rw"} + }, + { + "node_id": "PLC.Example.time_of_day", + "browse_name": "time_of_day", + "display_name": "Time of Day", + "datatype": "TOD", + "initial_value": 0, + "description": "Current time of day (TOD type, milliseconds since midnight)", + "index": 8, + "permissions": {"viewer": "r", "operator": "r", "engineer": "rw"} } ], "structures": [ diff --git a/core/src/drivers/plugins/python/opcua/opcua_memory.py b/core/src/drivers/plugins/python/opcua/opcua_memory.py index 25b1fa8d..c1539e55 100644 --- a/core/src/drivers/plugins/python/opcua/opcua_memory.py +++ b/core/src/drivers/plugins/python/opcua/opcua_memory.py @@ -24,6 +24,30 @@ STR_LEN_SIZE = 1 # sizeof(__strlen_t) = sizeof(int8_t) = 1 STRING_TOTAL_SIZE = STR_LEN_SIZE + STR_MAX_LEN # 127 bytes +# IEC 61131-3 TIME/DATE constants (must match iec_types.h) +TIMESPEC_SIZE = 8 # sizeof(IEC_TIMESPEC) = 2 * sizeof(int32_t) = 8 bytes + +# TIME-related datatypes that use IEC_TIMESPEC structure +TIME_DATATYPES = frozenset(["TIME", "DATE", "TOD", "DT"]) + + +class IEC_TIMESPEC(ctypes.Structure): + """ + ctypes structure matching IEC_TIMESPEC from iec_types.h. + + typedef struct { + int32_t tv_sec; // Seconds + int32_t tv_nsec; // Nanoseconds + } IEC_TIMESPEC; + + Used for TIME, DATE, TOD, and DT types. + """ + + _fields_ = [ + ("tv_sec", ctypes.c_int32), + ("tv_nsec", ctypes.c_int32), + ] + class IEC_STRING(ctypes.Structure): """ @@ -40,16 +64,20 @@ class IEC_STRING(ctypes.Structure): ] -def read_memory_direct(address: int, size: int) -> Any: +def read_memory_direct(address: int, size: int, datatype: str = None) -> Any: """ Read value directly from memory using cached address. Args: address: Memory address to read from size: Size of the variable in bytes + datatype: Optional datatype hint for ambiguous sizes (e.g., TIME vs LINT) Returns: - Value read from memory (int for numeric types, str for STRING) + Value read from memory: + - int for numeric types + - str for STRING + - tuple(tv_sec, tv_nsec) for TIME/DATE/TOD/DT Raises: RuntimeError: If memory access fails @@ -66,6 +94,9 @@ def read_memory_direct(address: int, size: int) -> Any: ptr = ctypes.cast(address, ctypes.POINTER(ctypes.c_uint32)) return ptr.contents.value elif size == 8: + # Check if this is a TIME-related type + if datatype and datatype.upper() in TIME_DATATYPES: + return read_timespec_direct(address) ptr = ctypes.cast(address, ctypes.POINTER(ctypes.c_uint64)) return ptr.contents.value elif size == STRING_TOTAL_SIZE: @@ -141,6 +172,45 @@ def write_string_direct(address: int, value: str) -> bool: raise RuntimeError(f"String memory write error: {e}") +def read_timespec_direct(address: int) -> tuple: + """ + Read an IEC_TIMESPEC directly from memory. + + Args: + address: Memory address of the IEC_TIMESPEC structure + + Returns: + Tuple of (tv_sec, tv_nsec) + """ + try: + ptr = ctypes.cast(address, ctypes.POINTER(IEC_TIMESPEC)) + timespec = ptr.contents + return (timespec.tv_sec, timespec.tv_nsec) + except Exception as e: + raise RuntimeError(f"Timespec memory access error: {e}") + + +def write_timespec_direct(address: int, tv_sec: int, tv_nsec: int) -> bool: + """ + Write an IEC_TIMESPEC to memory. + + Args: + address: Memory address of the IEC_TIMESPEC structure + tv_sec: Seconds value (int32) + tv_nsec: Nanoseconds value (int32) + + Returns: + True if successful + """ + try: + ptr = ctypes.cast(address, ctypes.POINTER(IEC_TIMESPEC)) + ptr.contents.tv_sec = ctypes.c_int32(tv_sec).value + ptr.contents.tv_nsec = ctypes.c_int32(tv_nsec).value + return True + except Exception as e: + raise RuntimeError(f"Timespec memory write error: {e}") + + def initialize_variable_cache(sba, indices: List[int]) -> Dict[int, VariableMetadata]: """Initialize metadata cache for direct memory access.""" try: diff --git a/core/src/drivers/plugins/python/opcua/opcua_utils.py b/core/src/drivers/plugins/python/opcua/opcua_utils.py index defc7843..010dbe9b 100644 --- a/core/src/drivers/plugins/python/opcua/opcua_utils.py +++ b/core/src/drivers/plugins/python/opcua/opcua_utils.py @@ -19,6 +19,10 @@ from opcua_logging import log_info, log_warn, log_error +# TIME-related datatypes that use IEC_TIMESPEC structure +TIME_DATATYPES = frozenset(["TIME", "DATE", "TOD", "DT"]) + + def map_plc_to_opcua_type(plc_type: str) -> ua.VariantType: """Map plc datatype to OPC-UA VariantType.""" type_mapping = { @@ -31,11 +35,45 @@ def map_plc_to_opcua_type(plc_type: str) -> ua.VariantType: "FLOAT": ua.VariantType.Float, "REAL": ua.VariantType.Float, # IEC 61131-3 REAL = 32-bit float "STRING": ua.VariantType.String, + # TIME-related types - represented as Int64 (milliseconds for duration types) + "TIME": ua.VariantType.Int64, # Duration in milliseconds + "TOD": ua.VariantType.Int64, # Time of day in milliseconds since midnight + "DATE": ua.VariantType.DateTime, # Date as OPC-UA DateTime + "DT": ua.VariantType.DateTime, # Date and Time as OPC-UA DateTime } mapped_type = type_mapping.get(plc_type.upper(), ua.VariantType.Variant) return mapped_type +def timespec_to_milliseconds(tv_sec: int, tv_nsec: int) -> int: + """ + Convert IEC_TIMESPEC (tv_sec, tv_nsec) to milliseconds. + + Args: + tv_sec: Seconds component + tv_nsec: Nanoseconds component + + Returns: + Total time in milliseconds + """ + return (tv_sec * 1000) + (tv_nsec // 1_000_000) + + +def milliseconds_to_timespec(ms: int) -> tuple: + """ + Convert milliseconds to IEC_TIMESPEC format (tv_sec, tv_nsec). + + Args: + ms: Time in milliseconds + + Returns: + Tuple of (tv_sec, tv_nsec) + """ + tv_sec = ms // 1000 + tv_nsec = (ms % 1000) * 1_000_000 + return (tv_sec, tv_nsec) + + def convert_value_for_opcua(datatype: str, value: Any) -> Any: """Convert PLC debug variable value to OPC-UA compatible format.""" # The debug utils return raw integer values based on variable size @@ -79,10 +117,50 @@ def convert_value_for_opcua(datatype: str, value: Any) -> Any: elif datatype.upper() in ["STRING", "String"]: return str(value) - + + elif datatype.upper() == "TIME": + # TIME values are stored as IEC_TIMESPEC (tv_sec, tv_nsec) + # Convert to milliseconds for OPC-UA Int64 representation + if isinstance(value, tuple) and len(value) == 2: + tv_sec, tv_nsec = value + return timespec_to_milliseconds(tv_sec, tv_nsec) + elif isinstance(value, int): + # If already an integer, assume it's milliseconds + return value + return 0 + + elif datatype.upper() == "TOD": + # TOD (Time of Day) - milliseconds since midnight + if isinstance(value, tuple) and len(value) == 2: + tv_sec, tv_nsec = value + return timespec_to_milliseconds(tv_sec, tv_nsec) + elif isinstance(value, int): + return value + return 0 + + elif datatype.upper() in ["DATE", "DT"]: + # DATE and DT map to OPC-UA DateTime + # IEC_TIMESPEC stores seconds since epoch (1970-01-01) + from datetime import datetime, timezone + + if isinstance(value, tuple) and len(value) == 2: + tv_sec, tv_nsec = value + # Convert to datetime object + try: + dt = datetime.fromtimestamp(tv_sec, tz=timezone.utc) + # Add microseconds (nsec / 1000) + dt = dt.replace(microsecond=tv_nsec // 1000) + return dt + except (OSError, OverflowError, ValueError): + # Invalid timestamp, return epoch + return datetime(1970, 1, 1, tzinfo=timezone.utc) + elif isinstance(value, datetime): + return value + return datetime(1970, 1, 1, tzinfo=timezone.utc) + else: return value - + except (ValueError, TypeError, OverflowError) as e: # If conversion fails, return a safe default log_warn(f"Failed to convert value {value} to OPC-UA format for {datatype}: {e}") @@ -92,6 +170,8 @@ def convert_value_for_opcua(datatype: str, value: Any) -> Any: return 0.0 elif datatype.upper() == "STRING": return "" + elif datatype.upper() in TIME_DATATYPES: + return 0 else: return 0 @@ -140,11 +220,35 @@ def convert_value_for_plc(datatype: str, value: Any) -> Any: elif datatype.upper() in ["STRING", "String"]: return str(value) - + + elif datatype.upper() == "TIME": + # Convert OPC-UA milliseconds (Int64) to IEC_TIMESPEC tuple + ms = int(value) + return milliseconds_to_timespec(ms) + + elif datatype.upper() == "TOD": + # TOD (Time of Day) - convert milliseconds to timespec + ms = int(value) + return milliseconds_to_timespec(ms) + + elif datatype.upper() in ["DATE", "DT"]: + # Convert OPC-UA DateTime to IEC_TIMESPEC tuple + from datetime import datetime, timezone + + if isinstance(value, datetime): + # Convert datetime to seconds since epoch + tv_sec = int(value.timestamp()) + tv_nsec = value.microsecond * 1000 + return (tv_sec, tv_nsec) + elif isinstance(value, (int, float)): + # Assume it's a timestamp + return (int(value), 0) + return (0, 0) + else: # For unknown types, try to preserve the value return value - + except (ValueError, TypeError, OverflowError) as e: # If conversion fails, log and return a safe default log_warn(f"Failed to convert value {value} to {datatype}, using default: {e}") @@ -154,6 +258,8 @@ def convert_value_for_plc(datatype: str, value: Any) -> Any: return 0 elif datatype.upper() == "STRING": return "" + elif datatype.upper() in TIME_DATATYPES: + return (0, 0) else: return 0 diff --git a/core/src/drivers/plugins/python/opcua/synchronization.py b/core/src/drivers/plugins/python/opcua/synchronization.py index c3576e64..32cff5fb 100644 --- a/core/src/drivers/plugins/python/opcua/synchronization.py +++ b/core/src/drivers/plugins/python/opcua/synchronization.py @@ -34,13 +34,33 @@ try: from .opcua_logging import log_info, log_warn, log_error, log_debug from .opcua_types import VariableNode, VariableMetadata - from .opcua_utils import map_plc_to_opcua_type, convert_value_for_opcua, convert_value_for_plc - from .opcua_memory import read_memory_direct, initialize_variable_cache + from .opcua_utils import ( + map_plc_to_opcua_type, + convert_value_for_opcua, + convert_value_for_plc, + TIME_DATATYPES, + ) + from .opcua_memory import ( + read_memory_direct, + initialize_variable_cache, + write_timespec_direct, + TIME_DATATYPES as MEM_TIME_DATATYPES, + ) except ImportError: from opcua_logging import log_info, log_warn, log_error, log_debug from opcua_types import VariableNode, VariableMetadata - from opcua_utils import map_plc_to_opcua_type, convert_value_for_opcua, convert_value_for_plc - from opcua_memory import read_memory_direct, initialize_variable_cache + from opcua_utils import ( + map_plc_to_opcua_type, + convert_value_for_opcua, + convert_value_for_plc, + TIME_DATATYPES, + ) + from opcua_memory import ( + read_memory_direct, + initialize_variable_cache, + write_timespec_direct, + TIME_DATATYPES as MEM_TIME_DATATYPES, + ) from shared import SafeBufferAccess @@ -247,14 +267,17 @@ async def sync_opcua_to_runtime(self) -> None: Synchronize values from OPC-UA readwrite nodes to PLC runtime. Only syncs changed values to minimize PLC writes. + TIME values are written via direct memory access. """ try: if not self._readwrite_nodes: return # Collect values to write (only changed values) + # Separate TIME values (need direct memory access) from regular values values_to_write = [] indices_to_write = [] + time_writes = [] # List of (var_index, tv_sec, tv_nsec) tuples for var_index, var_node in self._readwrite_nodes.items(): try: @@ -266,6 +289,8 @@ async def sync_opcua_to_runtime(self) -> None: if actual_value is None: continue + is_time_type = var_node.datatype.upper() in TIME_DATATYPES + # Check if this is an array node if var_node.array_length and var_node.array_length > 0: # Handle array: value should be a list @@ -276,8 +301,12 @@ async def sync_opcua_to_runtime(self) -> None: # Check if element has changed if self._has_value_changed(elem_index, plc_value): - values_to_write.append(plc_value) - indices_to_write.append(elem_index) + if is_time_type and isinstance(plc_value, tuple): + tv_sec, tv_nsec = plc_value + time_writes.append((elem_index, tv_sec, tv_nsec)) + else: + values_to_write.append(plc_value) + indices_to_write.append(elem_index) self.opcua_value_cache[elem_index] = plc_value log_debug(f"Array element {elem_index} changed: {plc_value}") continue @@ -287,8 +316,13 @@ async def sync_opcua_to_runtime(self) -> None: # Check if value has changed if self._has_value_changed(var_index, plc_value): - values_to_write.append(plc_value) - indices_to_write.append(var_index) + if is_time_type and isinstance(plc_value, tuple): + # TIME values need direct memory access + tv_sec, tv_nsec = plc_value + time_writes.append((var_index, tv_sec, tv_nsec)) + else: + values_to_write.append(plc_value) + indices_to_write.append(var_index) # Update cache self.opcua_value_cache[var_index] = plc_value @@ -302,6 +336,19 @@ async def sync_opcua_to_runtime(self) -> None: if values_to_write: await self._write_to_plc_batch(indices_to_write, values_to_write) + # Write TIME values via direct memory access + if time_writes and self._direct_memory_access_enabled: + for var_index, tv_sec, tv_nsec in time_writes: + try: + metadata = self.variable_metadata.get(var_index) + if metadata: + write_timespec_direct(metadata.address, tv_sec, tv_nsec) + log_debug(f"TIME variable {var_index} written: ({tv_sec}, {tv_nsec})") + else: + log_warn(f"No metadata for TIME variable {var_index}, skipping write") + except Exception as e: + log_error(f"Failed to write TIME variable {var_index}: {e}") + except Exception as e: log_error(f"Error in OPC-UA to runtime sync: {e}") @@ -331,12 +378,18 @@ async def _update_via_direct_memory_access(self) -> None: """ for var_index, metadata in self.variable_metadata.items(): try: - # Direct memory read - value = read_memory_direct(metadata.address, metadata.size) - var_node = self.variable_nodes.get(var_index) - if var_node: - await self._update_opcua_node(var_node, value) + if not var_node: + continue + + # Direct memory read - pass datatype for TIME handling + value = read_memory_direct( + metadata.address, + metadata.size, + datatype=var_node.datatype + ) + + await self._update_opcua_node(var_node, value) except Exception as e: log_error(f"Direct memory access failed for var {var_index}: {e}") @@ -447,7 +500,12 @@ async def _update_array_node(self, var_node: VariableNode) -> None: for idx in element_indices: metadata = self.variable_metadata.get(idx) if metadata: - raw_value = read_memory_direct(metadata.address, metadata.size) + # Pass datatype for TIME handling + raw_value = read_memory_direct( + metadata.address, + metadata.size, + datatype=var_node.datatype + ) opcua_value = convert_value_for_opcua(var_node.datatype, raw_value) array_values.append(opcua_value) else: @@ -505,6 +563,8 @@ def _get_default_value(self, datatype: str) -> Any: return 0.0 elif dtype == "STRING": return "" + elif dtype in TIME_DATATYPES: + return 0 # TIME is represented as milliseconds (Int64) in OPC-UA else: return 0 @@ -559,6 +619,10 @@ def _has_value_changed(self, var_index: int, new_value: Any) -> bool: if isinstance(new_value, float) and isinstance(cached_value, float): return abs(new_value - cached_value) > 1e-6 + # Tuple comparison for TIME types (tv_sec, tv_nsec) + if isinstance(new_value, tuple) and isinstance(cached_value, tuple): + return new_value != cached_value + # Exact comparison for other types return new_value != cached_value diff --git a/core/src/drivers/plugins/python/shared/plugin_config_decode/opcua_config_model.py b/core/src/drivers/plugins/python/shared/plugin_config_decode/opcua_config_model.py index 4ee17b6c..706d4645 100644 --- a/core/src/drivers/plugins/python/shared/plugin_config_decode/opcua_config_model.py +++ b/core/src/drivers/plugins/python/shared/plugin_config_decode/opcua_config_model.py @@ -12,6 +12,17 @@ # Permission types for variables PermissionType = Literal["r", "w", "rw"] +# Valid datatypes for OPC-UA variables +VALID_DATATYPES = frozenset([ + "BOOL", "BYTE", + "INT", "DINT", "LINT", "INT32", + "FLOAT", "REAL", + "STRING", + # TIME-related types (IEC 61131-3) + "TIME", "DATE", "TOD", "DT", +]) + + @dataclass class SecurityProfile: """Configuration for a security profile/endpoint.""" @@ -431,6 +442,28 @@ def validate(self) -> None: if len(all_indices) != len(set(all_indices)): raise ValueError(f"Duplicate indices found in plugin '{plugin.name}'") + # Validate datatypes + for var in address_space.variables: + if var.datatype.upper() not in VALID_DATATYPES: + raise ValueError( + f"Invalid datatype '{var.datatype}' for variable '{var.node_id}' " + f"in plugin '{plugin.name}'. Valid types: {sorted(VALID_DATATYPES)}" + ) + for struct in address_space.structures: + for field in struct.fields: + if field.datatype.upper() not in VALID_DATATYPES: + raise ValueError( + f"Invalid datatype '{field.datatype}' for field '{field.name}' " + f"in struct '{struct.node_id}' in plugin '{plugin.name}'. " + f"Valid types: {sorted(VALID_DATATYPES)}" + ) + for arr in address_space.arrays: + if arr.datatype.upper() not in VALID_DATATYPES: + raise ValueError( + f"Invalid datatype '{arr.datatype}' for array '{arr.node_id}' " + f"in plugin '{plugin.name}'. Valid types: {sorted(VALID_DATATYPES)}" + ) + # Check for duplicate plugin names plugin_names = [plugin.name for plugin in self.plugins] if len(plugin_names) != len(set(plugin_names)): diff --git a/docs/plans/TIME_TYPE_SUPPORT_PLAN.md b/docs/plans/TIME_TYPE_SUPPORT_PLAN.md new file mode 100644 index 00000000..20c1c708 --- /dev/null +++ b/docs/plans/TIME_TYPE_SUPPORT_PLAN.md @@ -0,0 +1,533 @@ +# Development Plan: OPC-UA TIME Type Support + +## Executive Summary + +The current OPC-UA plugin implementation does not support IEC 61131-3 TIME type variables. This document outlines the development and test plan to introduce TIME type support. + +## Current State Analysis + +### IEC 61131-3 TIME Structure (from `core/src/lib/iec_types.h`) + +```c +typedef struct { + int32_t tv_sec; // Seconds + int32_t tv_nsec; // Nanoseconds +} IEC_TIMESPEC; + +typedef IEC_TIMESPEC IEC_TIME; // Duration type +typedef IEC_TIMESPEC IEC_DATE; // Date type +typedef IEC_TIMESPEC IEC_DT; // Date and Time type +typedef IEC_TIMESPEC IEC_TOD; // Time of Day type +``` + +**Key characteristics:** +- Total size: 8 bytes +- Represents duration/time as seconds + nanoseconds +- Same underlying structure for TIME, DATE, DT, and TOD + +### Current Type Support (from `opcua_utils.py`) + +| PLC Type | OPC-UA Type | Size | +|----------|-------------|------| +| BOOL | Boolean | 1 byte | +| BYTE | Byte | 1 byte | +| INT | Int16 | 2 bytes | +| DINT/INT32 | Int32 | 4 bytes | +| LINT | Int64 | 8 bytes | +| FLOAT/REAL | Float | 4 bytes | +| STRING | String | 127 bytes | + +**Missing types:** TIME, DATE, TOD, DT, LREAL, WORD, DWORD, LWORD, UINT, UDINT, ULINT, SINT, USINT + +### Gap Analysis + +1. **Type Mapping**: `map_plc_to_opcua_type()` has no TIME mapping +2. **Memory Access**: `opcua_memory.py` reads 8-byte values as `c_uint64`, not as TIME struct +3. **Value Conversion**: `convert_value_for_opcua()` and `convert_value_for_plc()` have no TIME handling +4. **Configuration**: No TIME examples in config templates or documentation + +--- + +## Development Plan + +### Phase 1: Core Type Support + +#### Task 1.1: Define IEC_TIMESPEC ctypes Structure + +**File:** `core/src/drivers/plugins/python/opcua/opcua_memory.py` + +Add a ctypes structure matching the C definition: + +```python +class IEC_TIMESPEC(ctypes.Structure): + """ + ctypes structure matching IEC_TIMESPEC from iec_types.h. + + typedef struct { + int32_t tv_sec; // Seconds + int32_t tv_nsec; // Nanoseconds + } IEC_TIMESPEC; + """ + _fields_ = [ + ("tv_sec", ctypes.c_int32), + ("tv_nsec", ctypes.c_int32), + ] + +TIMESPEC_SIZE = 8 # sizeof(IEC_TIMESPEC) +``` + +#### Task 1.2: Add TIME Type Mapping + +**File:** `core/src/drivers/plugins/python/opcua/opcua_utils.py` + +Update `map_plc_to_opcua_type()`: + +```python +def map_plc_to_opcua_type(plc_type: str) -> ua.VariantType: + """Map plc datatype to OPC-UA VariantType.""" + type_mapping = { + # Existing types... + "BOOL": ua.VariantType.Boolean, + "BYTE": ua.VariantType.Byte, + "INT": ua.VariantType.Int16, + "INT32": ua.VariantType.Int32, + "DINT": ua.VariantType.Int32, + "LINT": ua.VariantType.Int64, + "FLOAT": ua.VariantType.Float, + "REAL": ua.VariantType.Float, + "STRING": ua.VariantType.String, + # New TIME types - represented as Int64 (milliseconds) + "TIME": ua.VariantType.Int64, + "DATE": ua.VariantType.DateTime, + "TOD": ua.VariantType.Int64, # Milliseconds since midnight + "DT": ua.VariantType.DateTime, + } + return type_mapping.get(plc_type.upper(), ua.VariantType.Variant) +``` + +**Design Decision: TIME Representation in OPC-UA** + +| Option | OPC-UA Type | Pros | Cons | +|--------|-------------|------|------| +| A. Int64 (ms) | Int64 | Simple, standard duration format | Loss of nanosecond precision | +| B. Double (seconds) | Double | Good precision, human readable | Floating point quirks | +| C. Custom Struct | ExtensionObject | Full precision preserved | Complex, non-standard | + +**Recommendation:** Option A (Int64 milliseconds) for TIME/TOD types, and DateTime for DATE/DT types. + +#### Task 1.3: Implement TIME Conversion Functions + +**File:** `core/src/drivers/plugins/python/opcua/opcua_utils.py` + +```python +def timespec_to_milliseconds(tv_sec: int, tv_nsec: int) -> int: + """Convert IEC_TIMESPEC to milliseconds.""" + return (tv_sec * 1000) + (tv_nsec // 1_000_000) + +def milliseconds_to_timespec(ms: int) -> tuple[int, int]: + """Convert milliseconds to (tv_sec, tv_nsec) tuple.""" + tv_sec = ms // 1000 + tv_nsec = (ms % 1000) * 1_000_000 + return (tv_sec, tv_nsec) +``` + +Update `convert_value_for_opcua()`: + +```python +elif datatype.upper() == "TIME": + # TIME values are stored as IEC_TIMESPEC (tv_sec, tv_nsec) + # Convert to milliseconds for OPC-UA Int64 representation + if isinstance(value, tuple) and len(value) == 2: + tv_sec, tv_nsec = value + return timespec_to_milliseconds(tv_sec, tv_nsec) + elif isinstance(value, int): + # Already in raw format, interpret as packed 64-bit value + tv_sec = value & 0xFFFFFFFF + tv_nsec = (value >> 32) & 0xFFFFFFFF + return timespec_to_milliseconds(tv_sec, tv_nsec) + return 0 +``` + +Update `convert_value_for_plc()`: + +```python +elif datatype.upper() == "TIME": + # Convert OPC-UA milliseconds to IEC_TIMESPEC format + ms = int(value) + tv_sec, tv_nsec = milliseconds_to_timespec(ms) + # Return as tuple for memory writing + return (tv_sec, tv_nsec) +``` + +#### Task 1.4: Implement TIME Memory Read/Write + +**File:** `core/src/drivers/plugins/python/opcua/opcua_memory.py` + +```python +def read_timespec_direct(address: int) -> tuple[int, int]: + """ + Read an IEC_TIMESPEC directly from memory. + + Returns: + Tuple of (tv_sec, tv_nsec) + """ + ptr = ctypes.cast(address, ctypes.POINTER(IEC_TIMESPEC)) + timespec = ptr.contents + return (timespec.tv_sec, timespec.tv_nsec) + +def write_timespec_direct(address: int, tv_sec: int, tv_nsec: int) -> bool: + """ + Write an IEC_TIMESPEC to memory. + """ + ptr = ctypes.cast(address, ctypes.POINTER(IEC_TIMESPEC)) + ptr.contents.tv_sec = tv_sec + ptr.contents.tv_nsec = tv_nsec + return True +``` + +Update `read_memory_direct()` to handle TIME size: + +```python +def read_memory_direct(address: int, size: int, datatype: str = None) -> Any: + """Read value from memory with optional datatype hint.""" + # ... existing code ... + elif size == 8: + if datatype and datatype.upper() in ["TIME", "DATE", "TOD", "DT"]: + return read_timespec_direct(address) + else: + ptr = ctypes.cast(address, ctypes.POINTER(ctypes.c_uint64)) + return ptr.contents.value +``` + +### Phase 2: Synchronization Integration + +#### Task 2.1: Update Address Space Creation + +**File:** `core/src/drivers/plugins/python/opcua/address_space.py` + +Ensure TIME variables are created with proper OPC-UA type and initial value conversion. + +#### Task 2.2: Update Synchronization Logic + +**File:** `core/src/drivers/plugins/python/opcua/synchronization.py` + +Modify the sync functions to pass datatype information for proper TIME handling: + +- `_sync_single_var_from_runtime()`: Pass datatype to memory read +- `_sync_single_var_to_runtime()`: Handle TIME tuple values for memory write + +### Phase 3: Configuration and Validation + +#### Task 3.1: Update Configuration Model + +**File:** `core/src/drivers/plugins/python/shared/plugin_config_decode/opcua_config_model.py` + +Add validation for TIME datatype: + +```python +VALID_DATATYPES = ["BOOL", "BYTE", "INT", "DINT", "LINT", "FLOAT", "REAL", + "STRING", "TIME", "DATE", "TOD", "DT"] +``` + +#### Task 3.2: Update Type Inference + +**File:** `core/src/drivers/plugins/python/opcua/opcua_utils.py` + +Update `infer_var_type()` to better handle ambiguous sizes when datatype is known: + +```python +def infer_var_type(size: int, configured_type: str = None) -> str: + """Infer variable type from size and optional configured type.""" + if configured_type: + return configured_type.upper() + # ... existing inference logic ... +``` + +#### Task 3.3: Update Configuration Templates + +**File:** `core/src/drivers/plugins/python/opcua/opcua_config_template.json` + +Add TIME variable examples: + +```json +{ + "node_id": "ns=2;s=CycleTime", + "browse_name": "CycleTime", + "display_name": "Cycle Time", + "datatype": "TIME", + "initial_value": 0, + "description": "PLC scan cycle time", + "index": 10, + "permissions": { + "viewer": "r", + "operator": "r", + "engineer": "rw" + } +} +``` + +--- + +## Test Plan + +### Unit Tests + +#### Test Suite 1: Type Conversion (test_time_conversion.py) + +```python +class TestTimeConversion: + def test_timespec_to_milliseconds_basic(self): + """Test basic conversion: 1 second = 1000 ms""" + assert timespec_to_milliseconds(1, 0) == 1000 + + def test_timespec_to_milliseconds_with_nanoseconds(self): + """Test conversion with nanoseconds: 1.5 sec = 1500 ms""" + assert timespec_to_milliseconds(1, 500_000_000) == 1500 + + def test_milliseconds_to_timespec_basic(self): + """Test reverse conversion""" + assert milliseconds_to_timespec(1500) == (1, 500_000_000) + + def test_roundtrip_conversion(self): + """Test roundtrip preserves value""" + original = (5, 250_000_000) + ms = timespec_to_milliseconds(*original) + result = milliseconds_to_timespec(ms) + assert result == original + + def test_zero_time(self): + """Test zero value handling""" + assert timespec_to_milliseconds(0, 0) == 0 + assert milliseconds_to_timespec(0) == (0, 0) + + def test_large_time_values(self): + """Test large values (hours/days)""" + # 24 hours in seconds = 86400 + ms = timespec_to_milliseconds(86400, 0) + assert ms == 86_400_000 +``` + +#### Test Suite 2: Type Mapping (test_time_mapping.py) + +```python +class TestTimeTypeMapping: + def test_time_maps_to_int64(self): + """TIME should map to Int64""" + assert map_plc_to_opcua_type("TIME") == ua.VariantType.Int64 + + def test_time_case_insensitive(self): + """Mapping should be case-insensitive""" + assert map_plc_to_opcua_type("time") == ua.VariantType.Int64 + assert map_plc_to_opcua_type("Time") == ua.VariantType.Int64 + + def test_date_maps_to_datetime(self): + """DATE should map to DateTime""" + assert map_plc_to_opcua_type("DATE") == ua.VariantType.DateTime +``` + +#### Test Suite 3: Memory Access (test_time_memory.py) + +```python +class TestTimeMemoryAccess: + def test_read_timespec_structure(self): + """Test reading IEC_TIMESPEC from memory""" + # Create test memory with known values + test_struct = IEC_TIMESPEC() + test_struct.tv_sec = 10 + test_struct.tv_nsec = 500_000_000 + + address = ctypes.addressof(test_struct) + result = read_timespec_direct(address) + assert result == (10, 500_000_000) + + def test_write_timespec_structure(self): + """Test writing IEC_TIMESPEC to memory""" + test_struct = IEC_TIMESPEC() + address = ctypes.addressof(test_struct) + + write_timespec_direct(address, 5, 250_000_000) + + assert test_struct.tv_sec == 5 + assert test_struct.tv_nsec == 250_000_000 +``` + +### Integration Tests + +#### Test Suite 4: End-to-End TIME Variable Sync (test_time_sync_integration.py) + +```python +class TestTimeVariableSync: + @pytest.fixture + def time_variable_config(self): + """Configuration with TIME variable""" + return { + "node_id": "ns=2;s=TestTime", + "browse_name": "TestTime", + "display_name": "Test Time Variable", + "datatype": "TIME", + "initial_value": 0, + "description": "Test TIME variable", + "index": 100, + "permissions": {"viewer": "r", "operator": "rw", "engineer": "rw"} + } + + async def test_time_variable_created_in_address_space(self, server, config): + """Verify TIME variable is created with correct OPC-UA type""" + # Create variable + # Check node exists and has Int64 data type + pass + + async def test_time_value_sync_plc_to_opcua(self, server, config): + """Test syncing TIME value from PLC to OPC-UA""" + # Set PLC memory to specific TIME value + # Trigger sync + # Verify OPC-UA node has correct milliseconds value + pass + + async def test_time_value_sync_opcua_to_plc(self, server, config): + """Test syncing TIME value from OPC-UA to PLC""" + # Write milliseconds value to OPC-UA node + # Trigger sync + # Verify PLC memory has correct tv_sec/tv_nsec + pass +``` + +#### Test Suite 5: Configuration Validation (test_time_config_validation.py) + +```python +class TestTimeConfigValidation: + def test_time_datatype_accepted(self): + """TIME datatype should be valid in config""" + config = {"datatype": "TIME", ...} + # Should not raise + SimpleVariable.from_dict(config) + + def test_time_initial_value_formats(self): + """Various initial value formats for TIME""" + # Integer milliseconds + config1 = {"datatype": "TIME", "initial_value": 5000, ...} + # String format "T#5s" + config2 = {"datatype": "TIME", "initial_value": "T#5s", ...} +``` + +### System Tests + +#### Test Suite 6: OPC-UA Client Interaction (test_time_client.py) + +```python +class TestTimeWithOpcuaClient: + async def test_read_time_value_with_uaexpert(self): + """Verify TIME value can be read by standard OPC-UA client""" + # Start server with TIME variable + # Connect with asyncua client + # Read value, verify it's Int64 type with correct value + pass + + async def test_write_time_value_with_uaexpert(self): + """Verify TIME value can be written by standard OPC-UA client""" + # Write Int64 value representing milliseconds + # Verify PLC memory updated correctly + pass + + async def test_time_subscription_updates(self): + """Verify TIME variable changes trigger subscriptions""" + # Subscribe to TIME node + # Change PLC value + # Verify subscription callback received + pass +``` + +### Performance Tests + +#### Test Suite 7: TIME Sync Performance (test_time_performance.py) + +```python +class TestTimePerformance: + def test_time_conversion_performance(self): + """Conversion should be fast""" + import timeit + time_taken = timeit.timeit( + lambda: timespec_to_milliseconds(12345, 678_000_000), + number=100_000 + ) + assert time_taken < 1.0 # 100k conversions under 1 second + + def test_time_sync_latency(self): + """Measure sync latency for TIME variables""" + # Time the complete sync cycle + pass +``` + +--- + +## Files to Modify + +| File | Changes | +|------|---------| +| `core/src/drivers/plugins/python/opcua/opcua_utils.py` | Add TIME mapping, conversion functions | +| `core/src/drivers/plugins/python/opcua/opcua_memory.py` | Add IEC_TIMESPEC struct, read/write functions | +| `core/src/drivers/plugins/python/opcua/address_space.py` | Handle TIME in variable creation | +| `core/src/drivers/plugins/python/opcua/synchronization.py` | Pass datatype for TIME handling | +| `core/src/drivers/plugins/python/shared/plugin_config_decode/opcua_config_model.py` | Add TIME validation | +| `core/src/drivers/plugins/python/opcua/opcua_config_template.json` | Add TIME examples | +| `core/src/drivers/plugins/python/opcua/docs/` | Update documentation | + +## New Files to Create + +| File | Purpose | +|------|---------| +| `tests/plugins/opcua/test_time_conversion.py` | Unit tests for conversion | +| `tests/plugins/opcua/test_time_mapping.py` | Unit tests for type mapping | +| `tests/plugins/opcua/test_time_memory.py` | Unit tests for memory access | +| `tests/plugins/opcua/test_time_sync_integration.py` | Integration tests | + +--- + +## Implementation Priority + +1. **High Priority (Core Functionality)** + - Task 1.1: IEC_TIMESPEC ctypes structure + - Task 1.2: Type mapping + - Task 1.3: Conversion functions + - Task 1.4: Memory read/write + +2. **Medium Priority (Integration)** + - Task 2.1: Address space creation + - Task 2.2: Synchronization logic + +3. **Lower Priority (Polish)** + - Task 3.1: Configuration validation + - Task 3.2: Type inference update + - Task 3.3: Template and documentation updates + +--- + +## Risk Assessment + +| Risk | Impact | Mitigation | +|------|--------|------------| +| Precision loss (ns to ms) | Low | Document limitation; sufficient for most PLC applications | +| Breaking existing configs | Medium | TIME is opt-in via explicit datatype | +| Memory alignment issues | High | Use ctypes Structure with matching C layout | +| OPC-UA client compatibility | Medium | Use standard Int64 type; test with multiple clients | + +--- + +## Acceptance Criteria + +1. TIME variables can be configured in opcua.json +2. TIME values sync correctly PLC -> OPC-UA (ms representation) +3. TIME values sync correctly OPC-UA -> PLC (timespec structure) +4. Standard OPC-UA clients can read/write TIME values +5. All unit and integration tests pass +6. No regression in existing type support +7. Documentation updated with TIME examples + +--- + +## Future Considerations + +- **LTIME support**: IEC 61131-3 LTIME (64-bit time) may use different structure +- **DATE/DT/TOD types**: Can use same IEC_TIMESPEC structure with DateTime mapping +- **LREAL support**: Similar pattern (8-byte, needs struct unpacking) +- **Array of TIME**: Extend array support to handle TIME arrays From 49bfbbb8bbe769fb57bdb9896e108134b16ced77 Mon Sep 17 00:00:00 2001 From: Marcone Tenorio Date: Tue, 20 Jan 2026 08:58:34 -0300 Subject: [PATCH 2/5] test: Add unit tests for TIME type support Add comprehensive unit tests for the new TIME type support: Type conversion tests (test_type_conversions.py): - TIME/TOD/DATE/DT type mappings to OPC-UA types - TIME conversion from tuple to milliseconds - TIME conversion from milliseconds to tuple - TIME roundtrip conversion tests - timespec_to_milliseconds helper function tests - milliseconds_to_timespec helper function tests - TIME_DATATYPES constant tests Memory access tests (test_memory.py): - IEC_TIMESPEC structure tests (size, fields, initialization) - read_timespec_direct function tests - write_timespec_direct function tests - read_memory_direct with TIME datatype hint tests - Roundtrip read/write tests for TIME values All 127 tests pass. Co-Authored-By: Claude Opus 4.5 --- tests/pytest/plugins/opcua/test_memory.py | 209 ++++++++++++++++++ .../plugins/opcua/test_type_conversions.py | 181 +++++++++++++++ 2 files changed, 390 insertions(+) diff --git a/tests/pytest/plugins/opcua/test_memory.py b/tests/pytest/plugins/opcua/test_memory.py index d59ed143..f53ad602 100644 --- a/tests/pytest/plugins/opcua/test_memory.py +++ b/tests/pytest/plugins/opcua/test_memory.py @@ -19,11 +19,16 @@ from opcua_memory import ( IEC_STRING, + IEC_TIMESPEC, STR_MAX_LEN, STRING_TOTAL_SIZE, + TIMESPEC_SIZE, + TIME_DATATYPES, read_memory_direct, read_string_direct, write_string_direct, + read_timespec_direct, + write_timespec_direct, ) @@ -258,3 +263,207 @@ def test_unsupported_size_raises(self): with pytest.raises(RuntimeError) as exc_info: read_memory_direct(address, 16) assert "Unsupported variable size" in str(exc_info.value) + + +class TestIECTimespecStructure: + """Tests for the IEC_TIMESPEC ctypes structure.""" + + def test_structure_size(self): + """IEC_TIMESPEC should be 8 bytes (2 x int32).""" + assert ctypes.sizeof(IEC_TIMESPEC) == TIMESPEC_SIZE + assert ctypes.sizeof(IEC_TIMESPEC) == 8 + + def test_timespec_size_constant(self): + """TIMESPEC_SIZE should be 8.""" + assert TIMESPEC_SIZE == 8 + + def test_structure_fields(self): + """IEC_TIMESPEC should have tv_sec and tv_nsec fields.""" + timespec = IEC_TIMESPEC() + assert hasattr(timespec, 'tv_sec') + assert hasattr(timespec, 'tv_nsec') + + def test_structure_initialization(self): + """IEC_TIMESPEC should initialize with zeros.""" + timespec = IEC_TIMESPEC() + assert timespec.tv_sec == 0 + assert timespec.tv_nsec == 0 + + def test_structure_tv_sec_field(self): + """tv_sec field should accept int32 values.""" + timespec = IEC_TIMESPEC() + timespec.tv_sec = 3600 + assert timespec.tv_sec == 3600 + + timespec.tv_sec = -100 + assert timespec.tv_sec == -100 + + def test_structure_tv_nsec_field(self): + """tv_nsec field should accept int32 values.""" + timespec = IEC_TIMESPEC() + timespec.tv_nsec = 500_000_000 + assert timespec.tv_nsec == 500_000_000 + + +class TestReadTimespecDirect: + """Tests for read_timespec_direct function.""" + + def _create_timespec_in_memory(self, tv_sec: int, tv_nsec: int) -> tuple: + """ + Create an IEC_TIMESPEC in memory and return (address, struct). + """ + timespec = IEC_TIMESPEC() + timespec.tv_sec = tv_sec + timespec.tv_nsec = tv_nsec + address = ctypes.addressof(timespec) + return address, timespec + + def test_read_zero_time(self): + """Should read zero time correctly.""" + address, timespec = self._create_timespec_in_memory(0, 0) + result = read_timespec_direct(address) + assert result == (0, 0) + + def test_read_seconds_only(self): + """Should read time with only seconds.""" + address, timespec = self._create_timespec_in_memory(100, 0) + result = read_timespec_direct(address) + assert result == (100, 0) + + def test_read_with_nanoseconds(self): + """Should read time with nanoseconds.""" + address, timespec = self._create_timespec_in_memory(1, 500_000_000) + result = read_timespec_direct(address) + assert result == (1, 500_000_000) + + def test_read_large_time(self): + """Should read large time values (hours/days).""" + # 24 hours + address, timespec = self._create_timespec_in_memory(86400, 0) + result = read_timespec_direct(address) + assert result == (86400, 0) + + def test_read_negative_seconds(self): + """Should handle negative seconds (for negative time intervals).""" + address, timespec = self._create_timespec_in_memory(-10, 0) + result = read_timespec_direct(address) + assert result == (-10, 0) + + +class TestWriteTimespecDirect: + """Tests for write_timespec_direct function.""" + + def _create_empty_timespec(self) -> tuple: + """Create an empty IEC_TIMESPEC and return (address, struct).""" + timespec = IEC_TIMESPEC() + address = ctypes.addressof(timespec) + return address, timespec + + def test_write_zero_time(self): + """Should write zero time correctly.""" + address, timespec = self._create_empty_timespec() + result = write_timespec_direct(address, 0, 0) + assert result is True + assert timespec.tv_sec == 0 + assert timespec.tv_nsec == 0 + + def test_write_seconds_only(self): + """Should write time with only seconds.""" + address, timespec = self._create_empty_timespec() + result = write_timespec_direct(address, 100, 0) + assert result is True + assert timespec.tv_sec == 100 + assert timespec.tv_nsec == 0 + + def test_write_with_nanoseconds(self): + """Should write time with nanoseconds.""" + address, timespec = self._create_empty_timespec() + result = write_timespec_direct(address, 1, 500_000_000) + assert result is True + assert timespec.tv_sec == 1 + assert timespec.tv_nsec == 500_000_000 + + def test_write_large_time(self): + """Should write large time values.""" + address, timespec = self._create_empty_timespec() + result = write_timespec_direct(address, 86400, 999_000_000) + assert result is True + assert timespec.tv_sec == 86400 + assert timespec.tv_nsec == 999_000_000 + + def test_write_then_read_roundtrip(self): + """Should support write then read roundtrip.""" + address, timespec = self._create_empty_timespec() + + write_timespec_direct(address, 3600, 250_000_000) + result = read_timespec_direct(address) + + assert result == (3600, 250_000_000) + + +class TestReadMemoryDirectWithTimeDatatype: + """Tests for read_memory_direct with TIME datatype hint.""" + + def _create_timespec_in_memory(self, tv_sec: int, tv_nsec: int) -> tuple: + """Create an IEC_TIMESPEC in memory.""" + timespec = IEC_TIMESPEC() + timespec.tv_sec = tv_sec + timespec.tv_nsec = tv_nsec + address = ctypes.addressof(timespec) + return address, timespec + + def test_read_memory_direct_time_with_datatype(self): + """read_memory_direct should return tuple for TIME datatype.""" + address, timespec = self._create_timespec_in_memory(10, 500_000_000) + result = read_memory_direct(address, 8, datatype="TIME") + assert result == (10, 500_000_000) + + def test_read_memory_direct_tod_with_datatype(self): + """read_memory_direct should return tuple for TOD datatype.""" + address, timespec = self._create_timespec_in_memory(3600, 0) + result = read_memory_direct(address, 8, datatype="TOD") + assert result == (3600, 0) + + def test_read_memory_direct_date_with_datatype(self): + """read_memory_direct should return tuple for DATE datatype.""" + address, timespec = self._create_timespec_in_memory(86400, 0) + result = read_memory_direct(address, 8, datatype="DATE") + assert result == (86400, 0) + + def test_read_memory_direct_dt_with_datatype(self): + """read_memory_direct should return tuple for DT datatype.""" + address, timespec = self._create_timespec_in_memory(1000000, 123_000_000) + result = read_memory_direct(address, 8, datatype="DT") + assert result == (1000000, 123_000_000) + + def test_read_memory_direct_8bytes_without_datatype(self): + """read_memory_direct should return uint64 for 8 bytes without datatype hint.""" + value = ctypes.c_uint64(1000000000) + address = ctypes.addressof(value) + result = read_memory_direct(address, 8) + assert result == 1000000000 + assert isinstance(result, int) + + def test_read_memory_direct_time_case_insensitive(self): + """read_memory_direct should handle case-insensitive datatype.""" + address, timespec = self._create_timespec_in_memory(5, 100_000_000) + result = read_memory_direct(address, 8, datatype="time") + assert result == (5, 100_000_000) + + result = read_memory_direct(address, 8, datatype="Time") + assert result == (5, 100_000_000) + + +class TestTimeDatatypesConstantMemory: + """Tests for TIME_DATATYPES constant in memory module.""" + + def test_time_datatypes_contains_all_time_types(self): + """TIME_DATATYPES should contain all time-related types.""" + assert "TIME" in TIME_DATATYPES + assert "DATE" in TIME_DATATYPES + assert "TOD" in TIME_DATATYPES + assert "DT" in TIME_DATATYPES + + def test_time_datatypes_is_frozen(self): + """TIME_DATATYPES should be immutable.""" + assert isinstance(TIME_DATATYPES, frozenset) diff --git a/tests/pytest/plugins/opcua/test_type_conversions.py b/tests/pytest/plugins/opcua/test_type_conversions.py index eb094e15..71746f96 100644 --- a/tests/pytest/plugins/opcua/test_type_conversions.py +++ b/tests/pytest/plugins/opcua/test_type_conversions.py @@ -22,6 +22,9 @@ convert_value_for_opcua, convert_value_for_plc, infer_var_type, + timespec_to_milliseconds, + milliseconds_to_timespec, + TIME_DATATYPES, ) from asyncua import ua @@ -80,6 +83,28 @@ def test_unknown_type_mapping(self): assert map_plc_to_opcua_type("UNKNOWN") == ua.VariantType.Variant assert map_plc_to_opcua_type("CUSTOM") == ua.VariantType.Variant + # TIME type mappings + def test_time_mapping(self): + """TIME should map to Int64 (milliseconds).""" + assert map_plc_to_opcua_type("TIME") == ua.VariantType.Int64 + assert map_plc_to_opcua_type("time") == ua.VariantType.Int64 + assert map_plc_to_opcua_type("Time") == ua.VariantType.Int64 + + def test_tod_mapping(self): + """TOD (Time of Day) should map to Int64 (milliseconds since midnight).""" + assert map_plc_to_opcua_type("TOD") == ua.VariantType.Int64 + assert map_plc_to_opcua_type("tod") == ua.VariantType.Int64 + + def test_date_mapping(self): + """DATE should map to DateTime.""" + assert map_plc_to_opcua_type("DATE") == ua.VariantType.DateTime + assert map_plc_to_opcua_type("date") == ua.VariantType.DateTime + + def test_dt_mapping(self): + """DT (Date and Time) should map to DateTime.""" + assert map_plc_to_opcua_type("DT") == ua.VariantType.DateTime + assert map_plc_to_opcua_type("dt") == ua.VariantType.DateTime + class TestConvertValueForOpcua: """Tests for convert_value_for_opcua function.""" @@ -200,6 +225,36 @@ def test_string_from_other_types(self): """Non-string values should be converted to string.""" assert convert_value_for_opcua("STRING", 123) == "123" + # TIME conversions + def test_time_from_tuple(self): + """TIME from tuple (tv_sec, tv_nsec) should convert to milliseconds.""" + # 1.5 seconds = 1500 ms + assert convert_value_for_opcua("TIME", (1, 500_000_000)) == 1500 + # 0 seconds + assert convert_value_for_opcua("TIME", (0, 0)) == 0 + # 10.25 seconds = 10250 ms + assert convert_value_for_opcua("TIME", (10, 250_000_000)) == 10250 + + def test_time_from_int(self): + """TIME from int should be treated as already milliseconds.""" + assert convert_value_for_opcua("TIME", 1500) == 1500 + assert convert_value_for_opcua("TIME", 0) == 0 + + def test_tod_from_tuple(self): + """TOD from tuple should convert to milliseconds since midnight.""" + # 1 hour = 3600 seconds = 3600000 ms + assert convert_value_for_opcua("TOD", (3600, 0)) == 3600000 + # 1 hour + 500ms + assert convert_value_for_opcua("TOD", (3600, 500_000_000)) == 3600500 + + def test_time_large_values(self): + """TIME should handle large values (hours/days).""" + # 24 hours = 86400 seconds = 86400000 ms + assert convert_value_for_opcua("TIME", (86400, 0)) == 86400000 + # 1 day + 1 hour + 1 minute + 1.5 seconds + tv_sec = 86400 + 3600 + 60 + 1 + assert convert_value_for_opcua("TIME", (tv_sec, 500_000_000)) == (tv_sec * 1000 + 500) + class TestConvertValueForPlc: """Tests for convert_value_for_plc function.""" @@ -288,6 +343,28 @@ def test_string_normal(self): assert convert_value_for_plc("STRING", "Hello") == "Hello" assert convert_value_for_plc("STRING", "") == "" + # TIME conversions (OPC-UA milliseconds -> PLC timespec tuple) + def test_time_to_tuple(self): + """TIME milliseconds should convert to (tv_sec, tv_nsec) tuple.""" + # 1500 ms = 1.5 seconds + assert convert_value_for_plc("TIME", 1500) == (1, 500_000_000) + # 0 ms + assert convert_value_for_plc("TIME", 0) == (0, 0) + # 10250 ms = 10.25 seconds + assert convert_value_for_plc("TIME", 10250) == (10, 250_000_000) + + def test_tod_to_tuple(self): + """TOD milliseconds should convert to (tv_sec, tv_nsec) tuple.""" + # 3600000 ms = 1 hour + assert convert_value_for_plc("TOD", 3600000) == (3600, 0) + # 3600500 ms = 1 hour + 500ms + assert convert_value_for_plc("TOD", 3600500) == (3600, 500_000_000) + + def test_time_large_values_to_tuple(self): + """TIME should handle large milliseconds values.""" + # 86400000 ms = 24 hours + assert convert_value_for_plc("TIME", 86400000) == (86400, 0) + class TestInferVarType: """Tests for infer_var_type function.""" @@ -392,3 +469,107 @@ def test_string_roundtrip(self): opcua_val = convert_value_for_opcua("STRING", val) plc_val = convert_value_for_plc("STRING", opcua_val) assert plc_val == val + + def test_time_roundtrip(self): + """TIME values should survive round-trip conversion (PLC tuple -> OPC-UA ms -> PLC tuple).""" + test_values = [ + (0, 0), # Zero + (1, 0), # 1 second + (1, 500_000_000), # 1.5 seconds + (10, 250_000_000), # 10.25 seconds + (3600, 0), # 1 hour + (86400, 0), # 24 hours + ] + for tv_sec, tv_nsec in test_values: + # Convert PLC tuple to OPC-UA milliseconds + opcua_val = convert_value_for_opcua("TIME", (tv_sec, tv_nsec)) + # Convert back to PLC tuple + plc_val = convert_value_for_plc("TIME", opcua_val) + # Compare (note: nanosecond precision is truncated to milliseconds) + expected_sec = tv_sec + expected_nsec = (tv_nsec // 1_000_000) * 1_000_000 # Truncate to ms precision + assert plc_val == (expected_sec, expected_nsec) + + def test_tod_roundtrip(self): + """TOD values should survive round-trip conversion.""" + test_values = [ + (0, 0), # Midnight + (3600, 0), # 1:00 AM + (43200, 0), # Noon + (43200, 500_000_000), # Noon + 500ms + ] + for tv_sec, tv_nsec in test_values: + opcua_val = convert_value_for_opcua("TOD", (tv_sec, tv_nsec)) + plc_val = convert_value_for_plc("TOD", opcua_val) + expected_sec = tv_sec + expected_nsec = (tv_nsec // 1_000_000) * 1_000_000 + assert plc_val == (expected_sec, expected_nsec) + + +class TestTimespecConversionHelpers: + """Tests for TIME conversion helper functions.""" + + def test_timespec_to_milliseconds_basic(self): + """Basic conversion: 1 second = 1000 ms.""" + assert timespec_to_milliseconds(1, 0) == 1000 + assert timespec_to_milliseconds(0, 0) == 0 + assert timespec_to_milliseconds(10, 0) == 10000 + + def test_timespec_to_milliseconds_with_nanoseconds(self): + """Conversion with nanoseconds: 1.5 sec = 1500 ms.""" + assert timespec_to_milliseconds(1, 500_000_000) == 1500 + assert timespec_to_milliseconds(0, 100_000_000) == 100 + assert timespec_to_milliseconds(2, 750_000_000) == 2750 + + def test_timespec_to_milliseconds_truncates_submillisecond(self): + """Sub-millisecond nanoseconds should be truncated.""" + # 999999 ns = 0.999999 ms, should truncate to 0 ms + assert timespec_to_milliseconds(0, 999_999) == 0 + # 1000000 ns = 1 ms + assert timespec_to_milliseconds(0, 1_000_000) == 1 + + def test_milliseconds_to_timespec_basic(self): + """Basic reverse conversion.""" + assert milliseconds_to_timespec(1000) == (1, 0) + assert milliseconds_to_timespec(0) == (0, 0) + assert milliseconds_to_timespec(10000) == (10, 0) + + def test_milliseconds_to_timespec_with_remainder(self): + """Conversion with fractional seconds.""" + assert milliseconds_to_timespec(1500) == (1, 500_000_000) + assert milliseconds_to_timespec(100) == (0, 100_000_000) + assert milliseconds_to_timespec(2750) == (2, 750_000_000) + + def test_roundtrip_conversion(self): + """Roundtrip conversion should preserve millisecond precision.""" + for ms in [0, 1, 100, 999, 1000, 1500, 10000, 86400000]: + tv_sec, tv_nsec = milliseconds_to_timespec(ms) + result = timespec_to_milliseconds(tv_sec, tv_nsec) + assert result == ms + + def test_large_time_values(self): + """Large values should work correctly.""" + # 24 hours in seconds = 86400 + assert timespec_to_milliseconds(86400, 0) == 86_400_000 + assert milliseconds_to_timespec(86_400_000) == (86400, 0) + + # 1 week in milliseconds + week_ms = 7 * 24 * 60 * 60 * 1000 + tv_sec, tv_nsec = milliseconds_to_timespec(week_ms) + assert tv_sec == 7 * 24 * 60 * 60 + assert tv_nsec == 0 + + +class TestTimeDatatypesConstant: + """Tests for TIME_DATATYPES constant.""" + + def test_time_datatypes_contains_all_time_types(self): + """TIME_DATATYPES should contain all time-related types.""" + assert "TIME" in TIME_DATATYPES + assert "DATE" in TIME_DATATYPES + assert "TOD" in TIME_DATATYPES + assert "DT" in TIME_DATATYPES + + def test_time_datatypes_is_frozen(self): + """TIME_DATATYPES should be immutable.""" + assert isinstance(TIME_DATATYPES, frozenset) From b48c16ce122bb72370d4aeaf3bdcd5b43fa61db4 Mon Sep 17 00:00:00 2001 From: Marcone Tenorio Date: Wed, 21 Jan 2026 09:07:16 -0300 Subject: [PATCH 3/5] fix: Improve DATE and TOD type conversions for OPC-UA - DATE: Now extracts only the date portion, setting time to 00:00:00 (ignores HH:MM:SS from the IEC_TIMESPEC value) - TOD: Now uses current date (today) + time from IEC_TIMESPEC (ignores YYYY-MM-DD, only uses HH:MM:SS) Changed mapping from Int64 to DateTime for better OPC-UA client compatibility - DT: Unchanged - full DateTime conversion (both date and time) - TIME: Unchanged - Int64 milliseconds representation Updated tests to reflect the new behavior. Co-Authored-By: Claude Opus 4.5 --- .../plugins/python/opcua/opcua_utils.py | 99 +++++++++++++++---- .../plugins/opcua/test_type_conversions.py | 51 +++++++--- 2 files changed, 118 insertions(+), 32 deletions(-) diff --git a/core/src/drivers/plugins/python/opcua/opcua_utils.py b/core/src/drivers/plugins/python/opcua/opcua_utils.py index 010dbe9b..b6ef665d 100644 --- a/core/src/drivers/plugins/python/opcua/opcua_utils.py +++ b/core/src/drivers/plugins/python/opcua/opcua_utils.py @@ -35,10 +35,10 @@ def map_plc_to_opcua_type(plc_type: str) -> ua.VariantType: "FLOAT": ua.VariantType.Float, "REAL": ua.VariantType.Float, # IEC 61131-3 REAL = 32-bit float "STRING": ua.VariantType.String, - # TIME-related types - represented as Int64 (milliseconds for duration types) + # TIME-related types "TIME": ua.VariantType.Int64, # Duration in milliseconds - "TOD": ua.VariantType.Int64, # Time of day in milliseconds since midnight - "DATE": ua.VariantType.DateTime, # Date as OPC-UA DateTime + "TOD": ua.VariantType.DateTime, # Time of day as DateTime (current date + time) + "DATE": ua.VariantType.DateTime, # Date as DateTime (date only, time set to 00:00:00) "DT": ua.VariantType.DateTime, # Date and Time as OPC-UA DateTime } mapped_type = type_mapping.get(plc_type.upper(), ua.VariantType.Variant) @@ -130,29 +130,67 @@ def convert_value_for_opcua(datatype: str, value: Any) -> Any: return 0 elif datatype.upper() == "TOD": - # TOD (Time of Day) - milliseconds since midnight + # TOD (Time of Day) - use current date + time from timespec + # IEC_TIMESPEC stores seconds since midnight for TOD + from datetime import datetime, timezone + if isinstance(value, tuple) and len(value) == 2: tv_sec, tv_nsec = value - return timespec_to_milliseconds(tv_sec, tv_nsec) - elif isinstance(value, int): + # tv_sec contains seconds since midnight + hours = tv_sec // 3600 + minutes = (tv_sec % 3600) // 60 + seconds = tv_sec % 60 + microseconds = tv_nsec // 1000 + + # Use current date (today) + time from timespec + today = datetime.now(timezone.utc).date() + try: + dt = datetime( + today.year, today.month, today.day, + hours, minutes, seconds, microseconds, + tzinfo=timezone.utc + ) + return dt + except (ValueError, OverflowError): + # Invalid time, return today at midnight + return datetime(today.year, today.month, today.day, tzinfo=timezone.utc) + elif isinstance(value, datetime): return value - return 0 + # Default: today at midnight + today = datetime.now(timezone.utc).date() + return datetime(today.year, today.month, today.day, tzinfo=timezone.utc) - elif datatype.upper() in ["DATE", "DT"]: - # DATE and DT map to OPC-UA DateTime + elif datatype.upper() == "DATE": + # DATE - use date from timespec, set time to 00:00:00 # IEC_TIMESPEC stores seconds since epoch (1970-01-01) from datetime import datetime, timezone if isinstance(value, tuple) and len(value) == 2: tv_sec, tv_nsec = value - # Convert to datetime object + try: + # Convert to datetime and extract date only + dt = datetime.fromtimestamp(tv_sec, tz=timezone.utc) + # Set time to 00:00:00 (ignore time portion) + dt = dt.replace(hour=0, minute=0, second=0, microsecond=0) + return dt + except (OSError, OverflowError, ValueError): + return datetime(1970, 1, 1, tzinfo=timezone.utc) + elif isinstance(value, datetime): + # Zero out time portion + return value.replace(hour=0, minute=0, second=0, microsecond=0) + return datetime(1970, 1, 1, tzinfo=timezone.utc) + + elif datatype.upper() == "DT": + # DT (Date and Time) - full DateTime conversion + from datetime import datetime, timezone + + if isinstance(value, tuple) and len(value) == 2: + tv_sec, tv_nsec = value try: dt = datetime.fromtimestamp(tv_sec, tz=timezone.utc) - # Add microseconds (nsec / 1000) dt = dt.replace(microsecond=tv_nsec // 1000) return dt except (OSError, OverflowError, ValueError): - # Invalid timestamp, return epoch return datetime(1970, 1, 1, tzinfo=timezone.utc) elif isinstance(value, datetime): return value @@ -227,21 +265,44 @@ def convert_value_for_plc(datatype: str, value: Any) -> Any: return milliseconds_to_timespec(ms) elif datatype.upper() == "TOD": - # TOD (Time of Day) - convert milliseconds to timespec - ms = int(value) - return milliseconds_to_timespec(ms) + # TOD (Time of Day) - extract time portion only (seconds since midnight) + from datetime import datetime, timezone + + if isinstance(value, datetime): + # Calculate seconds since midnight + tv_sec = value.hour * 3600 + value.minute * 60 + value.second + tv_nsec = value.microsecond * 1000 + return (tv_sec, tv_nsec) + elif isinstance(value, (int, float)): + # Assume it's seconds since midnight + return (int(value), 0) + return (0, 0) + + elif datatype.upper() == "DATE": + # DATE - extract date only, set time to 00:00:00 + from datetime import datetime, timezone + + if isinstance(value, datetime): + # Create datetime at midnight for the date, then get timestamp + dt_midnight = value.replace(hour=0, minute=0, second=0, microsecond=0) + tv_sec = int(dt_midnight.timestamp()) + return (tv_sec, 0) + elif isinstance(value, (int, float)): + # Assume it's a timestamp, zero out time portion + dt = datetime.fromtimestamp(int(value), tz=timezone.utc) + dt_midnight = dt.replace(hour=0, minute=0, second=0, microsecond=0) + return (int(dt_midnight.timestamp()), 0) + return (0, 0) - elif datatype.upper() in ["DATE", "DT"]: - # Convert OPC-UA DateTime to IEC_TIMESPEC tuple + elif datatype.upper() == "DT": + # DT (Date and Time) - full DateTime conversion from datetime import datetime, timezone if isinstance(value, datetime): - # Convert datetime to seconds since epoch tv_sec = int(value.timestamp()) tv_nsec = value.microsecond * 1000 return (tv_sec, tv_nsec) elif isinstance(value, (int, float)): - # Assume it's a timestamp return (int(value), 0) return (0, 0) diff --git a/tests/pytest/plugins/opcua/test_type_conversions.py b/tests/pytest/plugins/opcua/test_type_conversions.py index 71746f96..e6b30267 100644 --- a/tests/pytest/plugins/opcua/test_type_conversions.py +++ b/tests/pytest/plugins/opcua/test_type_conversions.py @@ -91,9 +91,9 @@ def test_time_mapping(self): assert map_plc_to_opcua_type("Time") == ua.VariantType.Int64 def test_tod_mapping(self): - """TOD (Time of Day) should map to Int64 (milliseconds since midnight).""" - assert map_plc_to_opcua_type("TOD") == ua.VariantType.Int64 - assert map_plc_to_opcua_type("tod") == ua.VariantType.Int64 + """TOD (Time of Day) should map to DateTime (current date + time).""" + assert map_plc_to_opcua_type("TOD") == ua.VariantType.DateTime + assert map_plc_to_opcua_type("tod") == ua.VariantType.DateTime def test_date_mapping(self): """DATE should map to DateTime.""" @@ -241,11 +241,25 @@ def test_time_from_int(self): assert convert_value_for_opcua("TIME", 0) == 0 def test_tod_from_tuple(self): - """TOD from tuple should convert to milliseconds since midnight.""" - # 1 hour = 3600 seconds = 3600000 ms - assert convert_value_for_opcua("TOD", (3600, 0)) == 3600000 - # 1 hour + 500ms - assert convert_value_for_opcua("TOD", (3600, 500_000_000)) == 3600500 + """TOD from tuple should convert to DateTime with current date + time.""" + from datetime import datetime, timezone + + # 1 hour = 3600 seconds since midnight -> 01:00:00 + result = convert_value_for_opcua("TOD", (3600, 0)) + assert isinstance(result, datetime) + assert result.hour == 1 + assert result.minute == 0 + assert result.second == 0 + # Date should be today + today = datetime.now(timezone.utc).date() + assert result.date() == today + + # 1 hour + 30 minutes + 45 seconds = 5445 seconds + result2 = convert_value_for_opcua("TOD", (5445, 500_000_000)) + assert result2.hour == 1 + assert result2.minute == 30 + assert result2.second == 45 + assert result2.microsecond == 500000 # 500ms = 500000 microseconds def test_time_large_values(self): """TIME should handle large values (hours/days).""" @@ -354,11 +368,22 @@ def test_time_to_tuple(self): assert convert_value_for_plc("TIME", 10250) == (10, 250_000_000) def test_tod_to_tuple(self): - """TOD milliseconds should convert to (tv_sec, tv_nsec) tuple.""" - # 3600000 ms = 1 hour - assert convert_value_for_plc("TOD", 3600000) == (3600, 0) - # 3600500 ms = 1 hour + 500ms - assert convert_value_for_plc("TOD", 3600500) == (3600, 500_000_000) + """TOD DateTime should convert to (tv_sec, tv_nsec) tuple (seconds since midnight).""" + from datetime import datetime, timezone + + # 01:00:00 = 3600 seconds since midnight + dt1 = datetime(2025, 6, 15, 1, 0, 0, tzinfo=timezone.utc) + assert convert_value_for_plc("TOD", dt1) == (3600, 0) + + # 01:30:45.500000 = 5445 seconds + 500000 microseconds + dt2 = datetime(2025, 6, 15, 1, 30, 45, 500000, tzinfo=timezone.utc) + result = convert_value_for_plc("TOD", dt2) + assert result[0] == 5445 # seconds since midnight + assert result[1] == 500_000_000 # nanoseconds + + # Midnight = 0 seconds + dt3 = datetime(2025, 6, 15, 0, 0, 0, tzinfo=timezone.utc) + assert convert_value_for_plc("TOD", dt3) == (0, 0) def test_time_large_values_to_tuple(self): """TIME should handle large milliseconds values.""" From fb20d2e7b5cf3944a7330623a8548bd2668a1203 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Marcone=20Ten=C3=B3rio=20da=20Silva=20Filho?= Date: Wed, 21 Jan 2026 14:20:09 -0300 Subject: [PATCH 4/5] Update core/src/drivers/plugins/python/opcua/opcua_config_template.json Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- .../src/drivers/plugins/python/opcua/opcua_config_template.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/drivers/plugins/python/opcua/opcua_config_template.json b/core/src/drivers/plugins/python/opcua/opcua_config_template.json index b9e31e5b..65a610a0 100644 --- a/core/src/drivers/plugins/python/opcua/opcua_config_template.json +++ b/core/src/drivers/plugins/python/opcua/opcua_config_template.json @@ -166,7 +166,7 @@ "display_name": "Time of Day", "datatype": "TOD", "initial_value": 0, - "description": "Current time of day (TOD type, milliseconds since midnight)", + "description": "Current time of day (TOD type, mapped to OPC-UA DateTime)", "index": 8, "permissions": {"viewer": "r", "operator": "r", "engineer": "rw"} } From 90acd953654d142d64ba690f8c68a9ea81f0078d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Marcone=20Ten=C3=B3rio=20da=20Silva=20Filho?= Date: Wed, 21 Jan 2026 14:23:14 -0300 Subject: [PATCH 5/5] Update tests/pytest/plugins/opcua/test_type_conversions.py Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- tests/pytest/plugins/opcua/test_type_conversions.py | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/tests/pytest/plugins/opcua/test_type_conversions.py b/tests/pytest/plugins/opcua/test_type_conversions.py index e6b30267..1e2a730c 100644 --- a/tests/pytest/plugins/opcua/test_type_conversions.py +++ b/tests/pytest/plugins/opcua/test_type_conversions.py @@ -244,15 +244,19 @@ def test_tod_from_tuple(self): """TOD from tuple should convert to DateTime with current date + time.""" from datetime import datetime, timezone + # Capture the date before conversion to avoid flakiness across midnight + today_before = datetime.now(timezone.utc).date() + # 1 hour = 3600 seconds since midnight -> 01:00:00 result = convert_value_for_opcua("TOD", (3600, 0)) assert isinstance(result, datetime) assert result.hour == 1 assert result.minute == 0 assert result.second == 0 - # Date should be today - today = datetime.now(timezone.utc).date() - assert result.date() == today + # Date should correspond to the current date at the time of conversion, + # allowing for the possibility that midnight passes during the test. + today_after = datetime.now(timezone.utc).date() + assert today_before <= result.date() <= today_after # 1 hour + 30 minutes + 45 seconds = 5445 seconds result2 = convert_value_for_opcua("TOD", (5445, 500_000_000))