Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 30 additions & 0 deletions core/src/drivers/plugins/python/opcua/opcua_config_template.json
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,36 @@
"description": "Example read-only variable",
"index": 5,
"permissions": {"viewer": "r", "operator": "r", "engineer": "r"}
},
{
"node_id": "PLC.Example.cycle_time",
"browse_name": "cycle_time",
"display_name": "Cycle Time",
"datatype": "TIME",
"initial_value": 0,
"description": "PLC scan cycle time (TIME type, represented as milliseconds in OPC-UA)",
"index": 6,
"permissions": {"viewer": "r", "operator": "r", "engineer": "rw"}
},
{
"node_id": "PLC.Example.timer_preset",
"browse_name": "timer_preset",
"display_name": "Timer Preset",
"datatype": "TIME",
"initial_value": 0,
"description": "Timer preset value (TIME type)",
"index": 7,
"permissions": {"viewer": "r", "operator": "rw", "engineer": "rw"}
},
{
"node_id": "PLC.Example.time_of_day",
"browse_name": "time_of_day",
"display_name": "Time of Day",
"datatype": "TOD",
"initial_value": 0,
"description": "Current time of day (TOD type, mapped to OPC-UA DateTime)",
"index": 8,
"permissions": {"viewer": "r", "operator": "r", "engineer": "rw"}
}
],
"structures": [
Expand Down
74 changes: 72 additions & 2 deletions core/src/drivers/plugins/python/opcua/opcua_memory.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,30 @@
STR_LEN_SIZE = 1 # sizeof(__strlen_t) = sizeof(int8_t) = 1
STRING_TOTAL_SIZE = STR_LEN_SIZE + STR_MAX_LEN # 127 bytes

# IEC 61131-3 TIME/DATE constants (must match iec_types.h)
TIMESPEC_SIZE = 8 # sizeof(IEC_TIMESPEC) = 2 * sizeof(int32_t) = 8 bytes

# TIME-related datatypes that use IEC_TIMESPEC structure
TIME_DATATYPES = frozenset(["TIME", "DATE", "TOD", "DT"])


class IEC_TIMESPEC(ctypes.Structure):
"""
ctypes structure matching IEC_TIMESPEC from iec_types.h.

typedef struct {
int32_t tv_sec; // Seconds
int32_t tv_nsec; // Nanoseconds
} IEC_TIMESPEC;

Used for TIME, DATE, TOD, and DT types.
"""

_fields_ = [
("tv_sec", ctypes.c_int32),
("tv_nsec", ctypes.c_int32),
]


class IEC_STRING(ctypes.Structure):
"""
Expand All @@ -40,16 +64,20 @@ class IEC_STRING(ctypes.Structure):
]


def read_memory_direct(address: int, size: int) -> Any:
def read_memory_direct(address: int, size: int, datatype: str = None) -> Any:
"""
Read value directly from memory using cached address.

Args:
address: Memory address to read from
size: Size of the variable in bytes
datatype: Optional datatype hint for ambiguous sizes (e.g., TIME vs LINT)

Returns:
Value read from memory (int for numeric types, str for STRING)
Value read from memory:
- int for numeric types
- str for STRING
- tuple(tv_sec, tv_nsec) for TIME/DATE/TOD/DT

Raises:
RuntimeError: If memory access fails
Expand All @@ -66,6 +94,9 @@ def read_memory_direct(address: int, size: int) -> Any:
ptr = ctypes.cast(address, ctypes.POINTER(ctypes.c_uint32))
return ptr.contents.value
elif size == 8:
# Check if this is a TIME-related type
if datatype and datatype.upper() in TIME_DATATYPES:
return read_timespec_direct(address)
ptr = ctypes.cast(address, ctypes.POINTER(ctypes.c_uint64))
return ptr.contents.value
elif size == STRING_TOTAL_SIZE:
Expand Down Expand Up @@ -141,6 +172,45 @@ def write_string_direct(address: int, value: str) -> bool:
raise RuntimeError(f"String memory write error: {e}")


def read_timespec_direct(address: int) -> tuple:
"""
Read an IEC_TIMESPEC directly from memory.

Args:
address: Memory address of the IEC_TIMESPEC structure

Returns:
Tuple of (tv_sec, tv_nsec)
"""
try:
ptr = ctypes.cast(address, ctypes.POINTER(IEC_TIMESPEC))
timespec = ptr.contents
return (timespec.tv_sec, timespec.tv_nsec)
except Exception as e:
raise RuntimeError(f"Timespec memory access error: {e}")


def write_timespec_direct(address: int, tv_sec: int, tv_nsec: int) -> bool:
"""
Write an IEC_TIMESPEC to memory.

Args:
address: Memory address of the IEC_TIMESPEC structure
tv_sec: Seconds value (int32)
tv_nsec: Nanoseconds value (int32)

Returns:
True if successful
"""
try:
ptr = ctypes.cast(address, ctypes.POINTER(IEC_TIMESPEC))
ptr.contents.tv_sec = ctypes.c_int32(tv_sec).value
ptr.contents.tv_nsec = ctypes.c_int32(tv_nsec).value
return True
except Exception as e:
raise RuntimeError(f"Timespec memory write error: {e}")


def initialize_variable_cache(sba, indices: List[int]) -> Dict[int, VariableMetadata]:
"""Initialize metadata cache for direct memory access."""
try:
Expand Down
175 changes: 171 additions & 4 deletions core/src/drivers/plugins/python/opcua/opcua_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,10 @@
from opcua_logging import log_info, log_warn, log_error


# TIME-related datatypes that use IEC_TIMESPEC structure
TIME_DATATYPES = frozenset(["TIME", "DATE", "TOD", "DT"])


def map_plc_to_opcua_type(plc_type: str) -> ua.VariantType:
"""Map plc datatype to OPC-UA VariantType."""
type_mapping = {
Expand All @@ -31,11 +35,45 @@ def map_plc_to_opcua_type(plc_type: str) -> ua.VariantType:
"FLOAT": ua.VariantType.Float,
"REAL": ua.VariantType.Float, # IEC 61131-3 REAL = 32-bit float
"STRING": ua.VariantType.String,
# TIME-related types
"TIME": ua.VariantType.Int64, # Duration in milliseconds
"TOD": ua.VariantType.DateTime, # Time of day as DateTime (current date + time)
"DATE": ua.VariantType.DateTime, # Date as DateTime (date only, time set to 00:00:00)
"DT": ua.VariantType.DateTime, # Date and Time as OPC-UA DateTime
}
mapped_type = type_mapping.get(plc_type.upper(), ua.VariantType.Variant)
return mapped_type


def timespec_to_milliseconds(tv_sec: int, tv_nsec: int) -> int:
"""
Convert IEC_TIMESPEC (tv_sec, tv_nsec) to milliseconds.

Args:
tv_sec: Seconds component
tv_nsec: Nanoseconds component

Returns:
Total time in milliseconds
"""
return (tv_sec * 1000) + (tv_nsec // 1_000_000)


def milliseconds_to_timespec(ms: int) -> tuple:
"""
Convert milliseconds to IEC_TIMESPEC format (tv_sec, tv_nsec).

Args:
ms: Time in milliseconds

Returns:
Tuple of (tv_sec, tv_nsec)
"""
tv_sec = ms // 1000
tv_nsec = (ms % 1000) * 1_000_000
return (tv_sec, tv_nsec)


def convert_value_for_opcua(datatype: str, value: Any) -> Any:
"""Convert PLC debug variable value to OPC-UA compatible format."""
# The debug utils return raw integer values based on variable size
Expand Down Expand Up @@ -79,10 +117,88 @@ def convert_value_for_opcua(datatype: str, value: Any) -> Any:

elif datatype.upper() in ["STRING", "String"]:
return str(value)


elif datatype.upper() == "TIME":
# TIME values are stored as IEC_TIMESPEC (tv_sec, tv_nsec)
# Convert to milliseconds for OPC-UA Int64 representation
if isinstance(value, tuple) and len(value) == 2:
tv_sec, tv_nsec = value
return timespec_to_milliseconds(tv_sec, tv_nsec)
elif isinstance(value, int):
# If already an integer, assume it's milliseconds
return value
return 0

elif datatype.upper() == "TOD":
# TOD (Time of Day) - use current date + time from timespec
# IEC_TIMESPEC stores seconds since midnight for TOD
from datetime import datetime, timezone

if isinstance(value, tuple) and len(value) == 2:
tv_sec, tv_nsec = value
# tv_sec contains seconds since midnight
hours = tv_sec // 3600
minutes = (tv_sec % 3600) // 60
seconds = tv_sec % 60
microseconds = tv_nsec // 1000

# Use current date (today) + time from timespec
today = datetime.now(timezone.utc).date()
try:
dt = datetime(
today.year, today.month, today.day,
hours, minutes, seconds, microseconds,
tzinfo=timezone.utc
)
return dt
except (ValueError, OverflowError):
# Invalid time, return today at midnight
return datetime(today.year, today.month, today.day, tzinfo=timezone.utc)
elif isinstance(value, datetime):
return value
# Default: today at midnight
today = datetime.now(timezone.utc).date()
return datetime(today.year, today.month, today.day, tzinfo=timezone.utc)

elif datatype.upper() == "DATE":
# DATE - use date from timespec, set time to 00:00:00
# IEC_TIMESPEC stores seconds since epoch (1970-01-01)
from datetime import datetime, timezone

if isinstance(value, tuple) and len(value) == 2:
tv_sec, tv_nsec = value
try:
# Convert to datetime and extract date only
dt = datetime.fromtimestamp(tv_sec, tz=timezone.utc)
# Set time to 00:00:00 (ignore time portion)
dt = dt.replace(hour=0, minute=0, second=0, microsecond=0)
return dt
except (OSError, OverflowError, ValueError):
return datetime(1970, 1, 1, tzinfo=timezone.utc)
elif isinstance(value, datetime):
# Zero out time portion
return value.replace(hour=0, minute=0, second=0, microsecond=0)
return datetime(1970, 1, 1, tzinfo=timezone.utc)

elif datatype.upper() == "DT":
# DT (Date and Time) - full DateTime conversion
from datetime import datetime, timezone

if isinstance(value, tuple) and len(value) == 2:
tv_sec, tv_nsec = value
try:
dt = datetime.fromtimestamp(tv_sec, tz=timezone.utc)
dt = dt.replace(microsecond=tv_nsec // 1000)
return dt
except (OSError, OverflowError, ValueError):
return datetime(1970, 1, 1, tzinfo=timezone.utc)
elif isinstance(value, datetime):
return value
return datetime(1970, 1, 1, tzinfo=timezone.utc)

else:
return value

except (ValueError, TypeError, OverflowError) as e:
# If conversion fails, return a safe default
log_warn(f"Failed to convert value {value} to OPC-UA format for {datatype}: {e}")
Expand All @@ -92,6 +208,8 @@ def convert_value_for_opcua(datatype: str, value: Any) -> Any:
return 0.0
elif datatype.upper() == "STRING":
return ""
elif datatype.upper() in TIME_DATATYPES:
return 0
else:
return 0

Expand Down Expand Up @@ -140,11 +258,58 @@ def convert_value_for_plc(datatype: str, value: Any) -> Any:

elif datatype.upper() in ["STRING", "String"]:
return str(value)


elif datatype.upper() == "TIME":
# Convert OPC-UA milliseconds (Int64) to IEC_TIMESPEC tuple
ms = int(value)
return milliseconds_to_timespec(ms)

elif datatype.upper() == "TOD":
# TOD (Time of Day) - extract time portion only (seconds since midnight)
from datetime import datetime, timezone

if isinstance(value, datetime):
# Calculate seconds since midnight
tv_sec = value.hour * 3600 + value.minute * 60 + value.second
tv_nsec = value.microsecond * 1000
return (tv_sec, tv_nsec)
elif isinstance(value, (int, float)):
# Assume it's seconds since midnight
return (int(value), 0)
return (0, 0)

elif datatype.upper() == "DATE":
# DATE - extract date only, set time to 00:00:00
from datetime import datetime, timezone

if isinstance(value, datetime):
# Create datetime at midnight for the date, then get timestamp
dt_midnight = value.replace(hour=0, minute=0, second=0, microsecond=0)
tv_sec = int(dt_midnight.timestamp())
return (tv_sec, 0)
elif isinstance(value, (int, float)):
# Assume it's a timestamp, zero out time portion
dt = datetime.fromtimestamp(int(value), tz=timezone.utc)
dt_midnight = dt.replace(hour=0, minute=0, second=0, microsecond=0)
return (int(dt_midnight.timestamp()), 0)
return (0, 0)

elif datatype.upper() == "DT":
# DT (Date and Time) - full DateTime conversion
from datetime import datetime, timezone

if isinstance(value, datetime):
tv_sec = int(value.timestamp())
tv_nsec = value.microsecond * 1000
return (tv_sec, tv_nsec)
elif isinstance(value, (int, float)):
return (int(value), 0)
return (0, 0)

else:
# For unknown types, try to preserve the value
return value

except (ValueError, TypeError, OverflowError) as e:
# If conversion fails, log and return a safe default
log_warn(f"Failed to convert value {value} to {datatype}, using default: {e}")
Expand All @@ -154,6 +319,8 @@ def convert_value_for_plc(datatype: str, value: Any) -> Any:
return 0
elif datatype.upper() == "STRING":
return ""
elif datatype.upper() in TIME_DATATYPES:
return (0, 0)
else:
return 0

Expand Down
Loading