diff --git a/core/src/CMakeLists.txt b/core/src/CMakeLists.txt index 078914de..6d543d5c 100644 --- a/core/src/CMakeLists.txt +++ b/core/src/CMakeLists.txt @@ -32,6 +32,7 @@ add_executable(plc_main ${CMAKE_SOURCE_DIR}/core/src/plc_app/scan_cycle_manager.c ${CMAKE_SOURCE_DIR}/core/src/drivers/plugin_driver.c ${CMAKE_SOURCE_DIR}/core/src/drivers/plugin_config.c + ${CMAKE_SOURCE_DIR}/core/src/drivers/plugin_utils.c ${CMAKE_SOURCE_DIR}/core/src/plc_app/unix_socket.c ${CMAKE_SOURCE_DIR}/core/src/plc_app/debug_handler.c ) diff --git a/core/src/drivers/plugin_driver.c b/core/src/drivers/plugin_driver.c index 37da2d28..106b6d5a 100644 --- a/core/src/drivers/plugin_driver.c +++ b/core/src/drivers/plugin_driver.c @@ -4,6 +4,7 @@ #include "../plc_app/image_tables.h" #include "plugin_config.h" #include "plugin_driver.h" +#include "plugin_utils.h" #include #include #include @@ -469,6 +470,9 @@ void *generate_structured_args_with_driver(plugin_type_t type, plugin_driver_t * // Initialize mutex functions args->mutex_take = plugin_mutex_take; args->mutex_give = plugin_mutex_give; + args->get_var_list = get_var_list; + args->get_var_size = get_var_size; + args->get_var_count = get_var_count; // Set buffer mutex from driver args->buffer_mutex = &driver->buffer_mutex; diff --git a/core/src/drivers/plugin_driver.h b/core/src/drivers/plugin_driver.h index 855dd56f..e28e307b 100644 --- a/core/src/drivers/plugin_driver.h +++ b/core/src/drivers/plugin_driver.h @@ -6,6 +6,7 @@ #include "plugin_config.h" #include "python_plugin_bridge.h" #include +#include // Maximum number of plugins #define MAX_PLUGINS 16 @@ -55,6 +56,9 @@ typedef struct // Mutex functions int (*mutex_take)(pthread_mutex_t *mutex); int (*mutex_give)(pthread_mutex_t *mutex); + void (*get_var_list)(size_t num_vars, size_t *indexes, void **result); + size_t (*get_var_size)(size_t idx); + uint16_t (*get_var_count)(void); pthread_mutex_t *buffer_mutex; char plugin_specific_config_file_path[256]; diff --git a/core/src/drivers/plugin_utils.c b/core/src/drivers/plugin_utils.c new file mode 100644 index 00000000..d515ac95 --- /dev/null +++ b/core/src/drivers/plugin_utils.c @@ -0,0 +1,28 @@ +#include "plugin_utils.h" +#include "../plc_app/image_tables.h" +#include +#include +#include + +// Wrapper function to get list of variable addresses +void get_var_list(size_t num_vars, size_t *indexes, void **result) +{ + for (size_t i = 0; i < num_vars; i++) { + size_t idx = indexes[i]; + if (idx >= ext_get_var_count()) { + result[i] = NULL; + } else { + result[i] = ext_get_var_addr(idx); + } + } +} + +size_t get_var_size(size_t idx) +{ + return ext_get_var_size(idx); +} + +uint16_t get_var_count(void) +{ + return ext_get_var_count(); +} \ No newline at end of file diff --git a/core/src/drivers/plugin_utils.h b/core/src/drivers/plugin_utils.h new file mode 100644 index 00000000..52a6ebfc --- /dev/null +++ b/core/src/drivers/plugin_utils.h @@ -0,0 +1,11 @@ +#ifndef PLUGIN_UTILS_H +#define PLUGIN_UTILS_H + +#include +#include + +void get_var_list(size_t num_vars, size_t *indexes, void **result); +size_t get_var_size(size_t idx); +uint16_t get_var_count(void); + +#endif // PLUGIN_UTILS_H \ No newline at end of file diff --git a/core/src/drivers/plugins/python/shared/API_SPECIFICATION.md b/core/src/drivers/plugins/python/shared/API_SPECIFICATION.md new file mode 100644 index 00000000..7b27d1b8 --- /dev/null +++ b/core/src/drivers/plugins/python/shared/API_SPECIFICATION.md @@ -0,0 +1,154 @@ +# SafeBufferAccess API Specification + +## Overview +This document specifies the complete public API of the `SafeBufferAccess` class that must be maintained for backward compatibility during refactoring. + +## Class Structure + +### Constructor +```python +SafeBufferAccess(runtime_args: PluginRuntimeArgs) +``` + +**Parameters:** +- `runtime_args`: `PluginRuntimeArgs` instance + +**Attributes:** +- `is_valid: bool` - Whether the instance is properly initialized +- `error_msg: str` - Error message if initialization failed + +### Public Methods + +#### Mutex Management +```python +acquire_mutex() -> (bool, str) +release_mutex() -> (bool, str) +``` + +#### Boolean Buffer Operations +```python +read_bool_input(buffer_idx: int, bit_idx: int, thread_safe: bool = True) -> (bool, str) +read_bool_output(buffer_idx: int, bit_idx: int, thread_safe: bool = True) -> (bool, str) +write_bool_output(buffer_idx: int, bit_idx: int, value: bool, thread_safe: bool = True) -> (bool, str) +``` + +#### Byte Buffer Operations +```python +read_byte_input(buffer_idx: int, thread_safe: bool = True) -> (int, str) +read_byte_output(buffer_idx: int, thread_safe: bool = True) -> (int, str) +write_byte_output(buffer_idx: int, value: int, thread_safe: bool = True) -> (bool, str) +``` + +#### Integer Buffer Operations (16-bit) +```python +read_int_input(buffer_idx: int, thread_safe: bool = True) -> (int, str) +read_int_output(buffer_idx: int, thread_safe: bool = True) -> (int, str) +write_int_output(buffer_idx: int, value: int, thread_safe: bool = True) -> (bool, str) +read_int_memory(buffer_idx: int, thread_safe: bool = True) -> (int, str) +write_int_memory(buffer_idx: int, value: int, thread_safe: bool = True) -> (bool, str) +``` + +#### Double Integer Buffer Operations (32-bit) +```python +read_dint_input(buffer_idx: int, thread_safe: bool = True) -> (int, str) +read_dint_output(buffer_idx: int, thread_safe: bool = True) -> (int, str) +write_dint_output(buffer_idx: int, value: int, thread_safe: bool = True) -> (bool, str) +read_dint_memory(buffer_idx: int, thread_safe: bool = True) -> (int, str) +write_dint_memory(buffer_idx: int, value: int, thread_safe: bool = True) -> (bool, str) +``` + +#### Long Integer Buffer Operations (64-bit) +```python +read_lint_input(buffer_idx: int, thread_safe: bool = True) -> (int, str) +read_lint_output(buffer_idx: int, thread_safe: bool = True) -> (int, str) +write_lint_output(buffer_idx: int, value: int, thread_safe: bool = True) -> (bool, str) +read_lint_memory(buffer_idx: int, thread_safe: bool = True) -> (int, str) +write_lint_memory(buffer_idx: int, value: int, thread_safe: bool = True) -> (bool, str) +``` + +#### Batch Operations +```python +batch_read_values(operations: List[Tuple]) -> (List[Tuple], str) +batch_write_values(operations: List[Tuple]) -> (List[Tuple], str) +batch_mixed_operations(read_operations: List[Tuple], write_operations: List[Tuple]) -> (Dict, str) +``` + +#### Debug/Variable Operations +```python +get_var_list(indexes: List[int]) -> (List[int], str) +get_var_size(index: int) -> (int, str) +get_var_value(index: int) -> (Any, str) +set_var_value(index: int, value: Any) -> (bool, str) +get_var_count() -> (int, str) +get_var_info(index: int) -> (Dict, str) +``` + +#### Configuration Operations +```python +get_config_path() -> (str, str) +get_config_file_args_as_map() -> (Dict, str) +``` + +## Parameter Details + +### Common Parameters +- `buffer_idx: int` - Buffer index (0-based) +- `bit_idx: int` - Bit index within buffer (for boolean operations) +- `value: int/bool` - Value to write +- `thread_safe: bool = True` - Whether to use mutex for thread-safe access + +### Value Ranges +- `bool`: `True`/`False` +- `byte`: `0-255` +- `int`: `0-65535` (16-bit unsigned) +- `dint`: `0-4294967295` (32-bit unsigned) +- `lint`: `0-18446744073709551615` (64-bit unsigned) + +### Return Values +- **Read operations**: `(value, error_message: str)` +- **Write operations**: `(success: bool, error_message: str)` +- **Batch operations**: `(results: List/Dict, error_message: str)` + +## Error Handling +- Invalid buffer/bit indices return appropriate error messages +- Out-of-range values return validation errors +- Mutex acquisition failures return error messages +- All operations return consistent `(result, message)` tuples + +## Thread Safety +- Default behavior uses mutex for thread-safe access +- `thread_safe=False` bypasses mutex (for manual control) +- Mutex operations: `acquire_mutex()`/`release_mutex()` + +## Batch Operations Format + +### Read Operations +```python +[ + ('buffer_type', buffer_idx, bit_idx), # for bool operations + ('buffer_type', buffer_idx), # for other types + # ... +] +``` + +### Write Operations +```python +[ + ('buffer_type', buffer_idx, value, bit_idx), # for bool operations + ('buffer_type', buffer_idx, value), # for other types + # ... +] +``` + +### Buffer Types +- `'bool_input'`, `'bool_output'` +- `'byte_input'`, `'byte_output'` +- `'int_input'`, `'int_output'`, `'int_memory'` +- `'dint_input'`, `'dint_output'`, `'dint_memory'` +- `'lint_input'`, `'lint_output'`, `'lint_memory'` + +## Compatibility Requirements +- All existing tests must pass without modification +- All existing plugins must continue to work +- API signatures must remain identical +- Behavior must be preserved exactly diff --git a/core/src/drivers/plugins/python/shared/__init__.py b/core/src/drivers/plugins/python/shared/__init__.py index c6e0cf16..efd5f456 100644 --- a/core/src/drivers/plugins/python/shared/__init__.py +++ b/core/src/drivers/plugins/python/shared/__init__.py @@ -1,18 +1,50 @@ """ -OpenPLC Python Plugin Configuration Package +OpenPLC Python Plugin Shared Components Package + +This package provides shared components for OpenPLC Python plugins, including +buffer access utilities, configuration handling, and type definitions. """ +# Core buffer access functionality (refactored modular architecture) +from .safe_buffer_access_refactored import SafeBufferAccess + +# Legacy compatibility - import from original implementation if needed +from .python_plugin_types import ( + PluginRuntimeArgs, + PluginStructureValidator, + safe_extract_runtime_args_from_capsule +) + +# Configuration models from .plugin_config_decode.plugin_config_contact import PluginConfigContract, PluginConfigError from .plugin_config_decode.modbus_master_config_model import ModbusIoPointConfig, ModbusMasterConfig +# Component interfaces (for advanced users who want to extend the system) +from .component_interfaces import ( + IBufferType, IMutexManager, IBufferValidator, IBufferAccessor, + IBatchProcessor, IDebugUtils, IConfigHandler, ISafeBufferAccess +) + __all__ = [ - # abstract contract for each protocol config model + # Core buffer access (refactored) + 'SafeBufferAccess', + + # Legacy type definitions (maintained for compatibility) + 'PluginRuntimeArgs', + 'PluginStructureValidator', + 'safe_extract_runtime_args_from_capsule', + + # Configuration models 'PluginConfigContract', - # top level config instance - 'PluginConfigError', - # concrete protocol config models + 'PluginConfigError', 'ModbusIoPointConfig', 'ModbusMasterConfig', + + # Component interfaces (for extension) + 'IBufferType', 'IMutexManager', 'IBufferValidator', 'IBufferAccessor', + 'IBatchProcessor', 'IDebugUtils', 'IConfigHandler', 'ISafeBufferAccess', + + # Future extensions # 'EthercatConfig', # 'EthercatIoPointConfig', ] diff --git a/core/src/drivers/plugins/python/shared/batch_processor.py b/core/src/drivers/plugins/python/shared/batch_processor.py new file mode 100644 index 00000000..332acfc7 --- /dev/null +++ b/core/src/drivers/plugins/python/shared/batch_processor.py @@ -0,0 +1,248 @@ +""" +Batch Processor for OpenPLC Python Plugin System + +This module handles batch operations for optimized buffer access. +It processes multiple read/write operations with a single mutex acquisition. +""" + +from typing import List, Tuple, Dict, Any +try: + # Try relative imports first (when used as package) + from .component_interfaces import IBatchProcessor + from .buffer_accessor import GenericBufferAccessor + from .mutex_manager import MutexManager +except ImportError: + # Fall back to absolute imports (when testing standalone) + from component_interfaces import IBatchProcessor + from buffer_accessor import GenericBufferAccessor + from mutex_manager import MutexManager + + +class BatchProcessor(IBatchProcessor): + """ + Processes batch operations for optimized buffer access. + + This class handles multiple buffer operations in a single batch, + acquiring the mutex only once for the entire batch. This provides + better performance for operations that need to access multiple buffers. + """ + + def __init__(self, buffer_accessor: GenericBufferAccessor, mutex_manager: MutexManager): + """ + Initialize the batch processor. + + Args: + buffer_accessor: GenericBufferAccessor instance + mutex_manager: MutexManager instance + """ + self.accessor = buffer_accessor + self.mutex = mutex_manager + + def process_batch_reads(self, operations: List[Tuple]) -> Tuple[List[Tuple], str]: + """ + Process multiple read operations in a batch. + + Args: + operations: List of read operation tuples + Format: [('buffer_type', buffer_idx, bit_idx), ...] + bit_idx is optional for non-boolean operations + + Returns: + Tuple[List[Tuple], str]: (results, error_message) + results format: [(success, value, error_msg), ...] + """ + if not operations: + return [], "No operations provided" + + results = [] + + # Acquire mutex once for all operations + if not self.mutex.acquire(): + return [], "Failed to acquire mutex for batch read" + + try: + for operation in operations: + try: + if len(operation) < 2: + results.append((False, None, "Invalid operation format")) + continue + + buffer_type = operation[0] + buffer_idx = operation[1] + bit_idx = operation[2] if len(operation) > 2 else None + + # Perform read operation without additional mutex + value, msg = self.accessor.read_buffer(buffer_type, buffer_idx, bit_idx, thread_safe=False) + + if msg == "Success": + results.append((True, value, msg)) + else: + results.append((False, None, msg)) + + except (AttributeError, TypeError, ValueError, OSError, MemoryError) as e: + results.append((False, None, f"Exception during batch read operation: {e}")) + + return results, "Batch read completed" + + finally: + # Always release the mutex + self.mutex.release() + + def process_batch_writes(self, operations: List[Tuple]) -> Tuple[List[Tuple], str]: + """ + Process multiple write operations in a batch. + + Args: + operations: List of write operation tuples + Format: [('buffer_type', buffer_idx, value, bit_idx), ...] + bit_idx is optional for non-boolean operations + + Returns: + Tuple[List[Tuple], str]: (results, error_message) + results format: [(success, error_msg), ...] + """ + if not operations: + return [], "No operations provided" + + results = [] + + # Acquire mutex once for all operations + if not self.mutex.acquire(): + return [], "Failed to acquire mutex for batch write" + + try: + for operation in operations: + try: + if len(operation) < 3: + results.append((False, "Invalid operation format")) + continue + + buffer_type = operation[0] + buffer_idx = operation[1] + value = operation[2] + bit_idx = operation[3] if len(operation) > 3 else None + + # Perform write operation without additional mutex + success, msg = self.accessor.write_buffer(buffer_type, buffer_idx, value, bit_idx, thread_safe=False) + + results.append((success, msg)) + + except (AttributeError, TypeError, ValueError, OSError, MemoryError) as e: + results.append((False, f"Exception during batch write operation: {e}")) + + return results, "Batch write completed" + + finally: + # Always release the mutex + self.mutex.release() + + def process_mixed_operations(self, read_operations: List[Tuple], + write_operations: List[Tuple]) -> Tuple[Dict, str]: + """ + Process mixed read and write operations in a batch. + + Args: + read_operations: List of read operation tuples (same format as process_batch_reads) + write_operations: List of write operation tuples (same format as process_batch_writes) + + Returns: + Tuple[Dict, str]: (results_dict, error_message) + results_dict format: {'reads': [(success, value, error_msg), ...], + 'writes': [(success, error_msg), ...]} + """ + if not read_operations and not write_operations: + return {}, "No operations provided" + + read_results = [] + write_results = [] + + # Acquire mutex once for all operations + if not self.mutex.acquire(): + return {}, "Failed to acquire mutex for mixed operations" + + try: + # Process read operations first (typically safer order) + if read_operations: + for operation in read_operations: + try: + if len(operation) < 2: + read_results.append((False, None, "Invalid operation format")) + continue + + buffer_type = operation[0] + buffer_idx = operation[1] + bit_idx = operation[2] if len(operation) > 2 else None + + # Perform read operation without additional mutex + value, msg = self.accessor.read_buffer(buffer_type, buffer_idx, bit_idx, thread_safe=False) + + if msg == "Success": + read_results.append((True, value, msg)) + else: + read_results.append((False, None, msg)) + + except (AttributeError, TypeError, ValueError, OSError, MemoryError) as e: + read_results.append((False, None, f"Exception during mixed read operation: {e}")) + + # Process write operations + if write_operations: + for operation in write_operations: + try: + if len(operation) < 3: + write_results.append((False, "Invalid operation format")) + continue + + buffer_type = operation[0] + buffer_idx = operation[1] + value = operation[2] + bit_idx = operation[3] if len(operation) > 3 else None + + # Perform write operation without additional mutex + success, msg = self.accessor.write_buffer(buffer_type, buffer_idx, value, bit_idx, thread_safe=False) + + write_results.append((success, msg)) + + except (AttributeError, TypeError, ValueError, OSError, MemoryError) as e: + write_results.append((False, f"Exception during mixed write operation: {e}")) + + results = { + 'reads': read_results, + 'writes': write_results + } + + return results, "Mixed batch operations completed" + + finally: + # Always release the mutex + self.mutex.release() + + def validate_batch_operations(self, operations: List[Tuple], is_read: bool = True) -> Tuple[bool, str]: + """ + Validate batch operations before processing. + + Args: + operations: List of operation tuples to validate + is_read: True for read operations, False for write operations + + Returns: + Tuple[bool, str]: (is_valid, error_message) + """ + if not operations: + return True, "Empty batch is valid" + + expected_min_length = 2 if is_read else 3 + + for i, operation in enumerate(operations): + if not isinstance(operation, (list, tuple)): + return False, f"Operation {i} is not a list or tuple" + + if len(operation) < expected_min_length: + op_type = "read" if is_read else "write" + return False, f"Operation {i} has insufficient parameters for {op_type}" + + # Additional validation could be added here + buffer_type = operation[0] + if not isinstance(buffer_type, str): + return False, f"Operation {i}: buffer_type must be a string" + + return True, "Batch operations are valid" diff --git a/core/src/drivers/plugins/python/shared/buffer_accessor.py b/core/src/drivers/plugins/python/shared/buffer_accessor.py new file mode 100644 index 00000000..2ed7c318 --- /dev/null +++ b/core/src/drivers/plugins/python/shared/buffer_accessor.py @@ -0,0 +1,229 @@ +""" +Generic Buffer Accessor for OpenPLC Python Plugin System + +This module provides generic buffer access operations that work with any buffer type. +It encapsulates the low-level ctypes operations and provides a clean interface +for reading and writing buffer values. +""" + +import ctypes +from typing import Any, Optional, Tuple +try: + # Try relative imports first (when used as package) + from .component_interfaces import IBufferAccessor + from .buffer_validator import BufferValidator + from .mutex_manager import MutexManager + from .buffer_types import get_buffer_types +except ImportError: + # Fall back to absolute imports (when testing standalone) + from component_interfaces import IBufferAccessor + from buffer_validator import BufferValidator + from mutex_manager import MutexManager + from buffer_types import get_buffer_types + + +class GenericBufferAccessor(IBufferAccessor): + """ + Generic buffer accessor that handles all buffer types uniformly. + + This class encapsulates the complex ctypes buffer access logic and provides + a clean, type-agnostic interface for buffer operations. It eliminates the + massive code duplication that existed in the original SafeBufferAccess class. + """ + + def __init__(self, runtime_args, validator: BufferValidator, mutex_manager: MutexManager): + """ + Initialize the generic buffer accessor. + + Args: + runtime_args: PluginRuntimeArgs instance + validator: BufferValidator instance + mutex_manager: MutexManager instance + """ + self.args = runtime_args + self.validator = validator + self.mutex = mutex_manager + self.buffer_types = get_buffer_types() + + def read_buffer(self, buffer_type: str, buffer_idx: int, bit_idx: Optional[int] = None, + thread_safe: bool = True) -> Tuple[Any, str]: + """ + Generic buffer read operation. + + Args: + buffer_type: Buffer type name (e.g., 'bool_input', 'int_output') + buffer_idx: Buffer index + bit_idx: Bit index (required for boolean operations) + thread_safe: Whether to use mutex protection + + Returns: + Tuple[Any, str]: (value, error_message) + """ + # Validate parameters + is_valid, msg = self.validator.validate_operation_params(buffer_type, buffer_idx, bit_idx) + if not is_valid: + return None, msg + + # Get buffer type info + buffer_type_obj, direction = self.buffer_types.get_buffer_info(buffer_type) + + # Define the read operation + def do_read(): + return self._perform_read(buffer_type, buffer_type_obj, direction, buffer_idx, bit_idx) + + # Execute with or without mutex + if thread_safe: + return self.mutex.with_mutex(do_read) + else: + return do_read() + + def write_buffer(self, buffer_type: str, buffer_idx: int, value: Any, + bit_idx: Optional[int] = None, thread_safe: bool = True) -> Tuple[bool, str]: + """ + Generic buffer write operation. + + Args: + buffer_type: Buffer type name (e.g., 'bool_output', 'int_output') + buffer_idx: Buffer index + value: Value to write + bit_idx: Bit index (required for boolean operations) + thread_safe: Whether to use mutex protection + + Returns: + Tuple[bool, str]: (success, error_message) + """ + # Validate parameters + is_valid, msg = self.validator.validate_operation_params(buffer_type, buffer_idx, bit_idx, value) + if not is_valid: + return False, msg + + # Get buffer type info + buffer_type_obj, direction = self.buffer_types.get_buffer_info(buffer_type) + + # Define the write operation + def do_write(): + return self._perform_write(buffer_type, buffer_type_obj, direction, buffer_idx, value, bit_idx) + + # Execute with or without mutex + if thread_safe: + result = self.mutex.with_mutex(do_write) + return result if isinstance(result, tuple) else (result, "Success") + else: + return do_write() + + def get_buffer_pointer(self, buffer_type: str) -> Optional[ctypes.POINTER]: + """ + Get the buffer pointer for a given type. + + Args: + buffer_type: Buffer type name + + Returns: + Optional[ctypes.POINTER]: Buffer pointer or None if invalid + """ + try: + buffer_type_obj, direction = self.buffer_types.get_buffer_info(buffer_type) + + # Map buffer type to runtime_args field + field_map = { + ('bool', 'input'): 'bool_input', + ('bool', 'output'): 'bool_output', + ('byte', 'input'): 'byte_input', + ('byte', 'output'): 'byte_output', + ('int', 'input'): 'int_input', + ('int', 'output'): 'int_output', + ('int', 'memory'): 'int_memory', + ('dint', 'input'): 'dint_input', + ('dint', 'output'): 'dint_output', + ('dint', 'memory'): 'dint_memory', + ('lint', 'input'): 'lint_input', + ('lint', 'output'): 'lint_output', + ('lint', 'memory'): 'lint_memory', + } + + field_name = field_map.get((buffer_type_obj.name, direction)) + if field_name: + return getattr(self.args, field_name, None) + + return None + + except (AttributeError, TypeError, ValueError): + return None + + def _perform_read(self, buffer_type: str, buffer_type_obj, direction: str, + buffer_idx: int, bit_idx: Optional[int]) -> Tuple[Any, str]: + """ + Internal method to perform the actual buffer read operation. + """ + try: + # Get the appropriate buffer pointer + buffer_ptr = self.get_buffer_pointer(buffer_type) + if buffer_ptr is None or buffer_ptr.contents is None: + return None, f"Buffer pointer not available for {buffer_type}" + + # Handle boolean operations (require bit indexing) + if buffer_type_obj.name == 'bool': + if bit_idx is None: + return None, "Bit index required for boolean operations" + + # Access the specific bit within the buffer + value = bool(buffer_ptr[buffer_idx][bit_idx].contents.value) + return value, "Success" + + # Handle other buffer types (direct value access) + else: + value = buffer_ptr[buffer_idx].contents.value + return value, "Success" + + except (AttributeError, TypeError, ValueError, OSError, MemoryError) as e: + return None, f"Buffer read error: {e}" + + def _perform_write(self, buffer_type: str, buffer_type_obj, direction: str, + buffer_idx: int, value: Any, bit_idx: Optional[int]) -> Tuple[bool, str]: + """ + Internal method to perform the actual buffer write operation. + """ + try: + # Get the appropriate buffer pointer + buffer_ptr = self.get_buffer_pointer(buffer_type) + if buffer_ptr is None or buffer_ptr.contents is None: + return False, f"Buffer pointer not available for {buffer_type}" + + # Handle boolean operations (require bit indexing) + if buffer_type_obj.name == 'bool': + if bit_idx is None: + return False, "Bit index required for boolean operations" + + # Set the specific bit within the buffer + buffer_ptr[buffer_idx][bit_idx].contents.value = 1 if value else 0 + return True, "Success" + + # Handle other buffer types (direct value assignment) + else: + buffer_ptr[buffer_idx].contents.value = value + return True, "Success" + + except (AttributeError, TypeError, ValueError, OSError, MemoryError) as e: + return False, f"Buffer write error: {e}" + + def _handle_buffer_exception(self, exception, operation_name: str) -> str: + """ + Centralized exception handling for buffer operations. + + Args: + exception: The caught exception + operation_name: Name of the operation that failed + + Returns: + str: Formatted error message + """ + if isinstance(exception, (AttributeError, TypeError)): + return f"Structure access error during {operation_name}: {exception}" + elif isinstance(exception, (ValueError, OverflowError)): + return f"Value validation error during {operation_name}: {exception}" + elif isinstance(exception, OSError): + return f"System error during {operation_name}: {exception}" + elif isinstance(exception, MemoryError): + return f"Memory error during {operation_name}: {exception}" + else: + return f"Unexpected error during {operation_name}: {exception}" diff --git a/core/src/drivers/plugins/python/shared/buffer_types.py b/core/src/drivers/plugins/python/shared/buffer_types.py new file mode 100644 index 00000000..d0b51f82 --- /dev/null +++ b/core/src/drivers/plugins/python/shared/buffer_types.py @@ -0,0 +1,232 @@ +""" +Buffer Type Definitions for OpenPLC Python Plugin System + +This module defines all buffer types and their characteristics in a centralized, +extensible way. Adding a new buffer type requires only adding a new class here. +""" + +import ctypes +from typing import Tuple + +try: + # Try relative imports first (when used as package) + from .component_interfaces import IBufferType +except ImportError: + # Fall back to absolute imports (when testing standalone) + from component_interfaces import IBufferType + + +class BoolBufferType(IBufferType): + """Boolean buffer type (1-bit values accessed via bit indexing)""" + + @property + def name(self) -> str: + return "bool" + + @property + def size_bytes(self) -> int: + return 1 + + @property + def value_range(self) -> Tuple[int, int]: + return (0, 1) + + @property + def requires_bit_index(self) -> bool: + return True + + @property + def ctype_class(self) -> type: + return ctypes.c_uint8 + + +class ByteBufferType(IBufferType): + """Byte buffer type (8-bit unsigned integer)""" + + @property + def name(self) -> str: + return "byte" + + @property + def size_bytes(self) -> int: + return 1 + + @property + def value_range(self) -> Tuple[int, int]: + return (0, 255) + + @property + def requires_bit_index(self) -> bool: + return False + + @property + def ctype_class(self) -> type: + return ctypes.c_uint8 + + +class IntBufferType(IBufferType): + """Integer buffer type (16-bit unsigned integer)""" + + @property + def name(self) -> str: + return "int" + + @property + def size_bytes(self) -> int: + return 2 + + @property + def value_range(self) -> Tuple[int, int]: + return (0, 65535) + + @property + def requires_bit_index(self) -> bool: + return False + + @property + def ctype_class(self) -> type: + return ctypes.c_uint16 + + +class DintBufferType(IBufferType): + """Double integer buffer type (32-bit unsigned integer)""" + + @property + def name(self) -> str: + return "dint" + + @property + def size_bytes(self) -> int: + return 4 + + @property + def value_range(self) -> Tuple[int, int]: + return (0, 4294967295) + + @property + def requires_bit_index(self) -> bool: + return False + + @property + def ctype_class(self) -> type: + return ctypes.c_uint32 + + +class LintBufferType(IBufferType): + """Long integer buffer type (64-bit unsigned integer)""" + + @property + def name(self) -> str: + return "lint" + + @property + def size_bytes(self) -> int: + return 8 + + @property + def value_range(self) -> Tuple[int, int]: + return (0, 18446744073709551615) + + @property + def requires_bit_index(self) -> bool: + return False + + @property + def ctype_class(self) -> type: + return ctypes.c_uint64 + + +# Core buffer types (module-level constants) +_BUFFER_TYPES = { + 'bool': BoolBufferType(), + 'byte': ByteBufferType(), + 'int': IntBufferType(), + 'dint': DintBufferType(), + 'lint': LintBufferType(), +} + +# Buffer type mappings (used by the facade to map method names to types) +_BUFFER_MAPPINGS = { + # Boolean buffers + 'bool_input': ('bool', 'input'), + 'bool_output': ('bool', 'output'), + + # Byte buffers + 'byte_input': ('byte', 'input'), + 'byte_output': ('byte', 'output'), + + # Integer buffers (16-bit) + 'int_input': ('int', 'input'), + 'int_output': ('int', 'output'), + 'int_memory': ('int', 'memory'), + + # Double integer buffers (32-bit) + 'dint_input': ('dint', 'input'), + 'dint_output': ('dint', 'output'), + 'dint_memory': ('dint', 'memory'), + + # Long integer buffers (64-bit) + 'lint_input': ('lint', 'input'), + 'lint_output': ('lint', 'output'), + 'lint_memory': ('lint', 'memory'), +} + + +class BufferTypes: + """ + Utility class for accessing buffer type definitions and metadata. + + This class provides a centralized way to access buffer type definitions + and metadata. It's used by validators and accessors to understand buffer + characteristics. + """ + + @staticmethod + def get_type(type_name: str) -> IBufferType: + """Get buffer type definition by name""" + if type_name not in _BUFFER_TYPES: + raise ValueError(f"Unknown buffer type: {type_name}") + return _BUFFER_TYPES[type_name] + + @staticmethod + def get_buffer_info(buffer_name: str) -> Tuple[IBufferType, str]: + """ + Get buffer type and direction for a buffer name + + Args: + buffer_name: e.g., 'bool_input', 'int_output', 'dint_memory' + + Returns: + Tuple of (IBufferType, direction) where direction is 'input', 'output', or 'memory' + """ + if buffer_name not in _BUFFER_MAPPINGS: + raise ValueError(f"Unknown buffer name: {buffer_name}") + + type_name, direction = _BUFFER_MAPPINGS[buffer_name] + buffer_type = BufferTypes.get_type(type_name) + return buffer_type, direction + + @staticmethod + def get_all_types() -> dict: + """Get all buffer type definitions""" + return _BUFFER_TYPES.copy() + + @staticmethod + def get_all_buffers() -> dict: + """Get all buffer name mappings""" + return _BUFFER_MAPPINGS.copy() + + @staticmethod + def validate_type_exists(type_name: str) -> bool: + """Check if a buffer type exists""" + return type_name in _BUFFER_TYPES + + @staticmethod + def validate_buffer_exists(buffer_name: str) -> bool: + """Check if a buffer name exists""" + return buffer_name in _BUFFER_MAPPINGS + + +def get_buffer_types() -> BufferTypes: + """Get the BufferTypes utility class""" + return BufferTypes diff --git a/core/src/drivers/plugins/python/shared/buffer_validator.py b/core/src/drivers/plugins/python/shared/buffer_validator.py new file mode 100644 index 00000000..103107b9 --- /dev/null +++ b/core/src/drivers/plugins/python/shared/buffer_validator.py @@ -0,0 +1,223 @@ +""" +Buffer Validator for OpenPLC Python Plugin System + +This module provides centralized validation logic for buffer operations. +It validates buffer indices, bit indices, value ranges, and operation parameters. +""" + +from typing import Any, Optional, Tuple + +try: + # Try relative imports first (when used as package) + from .component_interfaces import IBufferValidator + from .buffer_types import get_buffer_types +except ImportError: + # Fall back to absolute imports (when testing standalone) + from component_interfaces import IBufferValidator + from buffer_types import get_buffer_types + + +class BufferValidator(IBufferValidator): + """ + Centralized validation for buffer operations. + + This class consolidates all validation logic that was previously scattered + throughout the SafeBufferAccess class. It provides comprehensive validation + for buffer indices, bit indices, value ranges, and operation parameters. + """ + + def __init__(self, runtime_args): + """ + Initialize the buffer validator. + + Args: + runtime_args: PluginRuntimeArgs instance + """ + self.args = runtime_args + self.buffer_types = get_buffer_types() + + def validate_buffer_index(self, buffer_idx: int, buffer_type: str) -> Tuple[bool, str]: + """ + Validate buffer index for a given buffer type. + + Args: + buffer_idx: Buffer index to validate + buffer_type: Buffer type name (e.g., 'bool_input', 'int_output') + + Returns: + Tuple[bool, str]: (is_valid, error_message) + """ + try: + # Check if buffer type exists + if not self.buffer_types.validate_buffer_exists(buffer_type): + return False, f"Unknown buffer type: {buffer_type}" + + # Validate index range + if buffer_idx < 0: + return False, f"Buffer index cannot be negative: {buffer_idx}" + + if buffer_idx >= self.args.buffer_size: + return False, f"Buffer index out of range: {buffer_idx} >= {self.args.buffer_size}" + + return True, "Success" + + except (AttributeError, TypeError) as e: + return False, f"Validation error: {e}" + + def validate_bit_index(self, bit_idx: int) -> Tuple[bool, str]: + """ + Validate bit index for boolean operations. + + Args: + bit_idx: Bit index to validate (0-63 for 64-bit buffers) + + Returns: + Tuple[bool, str]: (is_valid, error_message) + """ + try: + if bit_idx < 0: + return False, f"Bit index cannot be negative: {bit_idx}" + + if bit_idx >= self.args.bits_per_buffer: + return False, f"Bit index out of range: {bit_idx} >= {self.args.bits_per_buffer}" + + return True, "Success" + + except (AttributeError, TypeError) as e: + return False, f"Bit index validation error: {e}" + + def validate_value_range(self, value: Any, buffer_type: str) -> Tuple[bool, str]: + """ + Validate that a value is within the acceptable range for a buffer type. + + Args: + value: Value to validate + buffer_type: Buffer type name (e.g., 'bool_input', 'int_output') + + Returns: + Tuple[bool, str]: (is_valid, error_message) + """ + try: + # Get buffer type info + buffer_type_obj, _ = self.buffer_types.get_buffer_info(buffer_type) + min_val, max_val = buffer_type_obj.value_range + + # Handle boolean values + if buffer_type_obj.name == 'bool': + if isinstance(value, bool): + return True, "Success" + elif isinstance(value, (int, float)): + if value in (0, 1): + return True, "Success" + else: + return False, f"Boolean value must be 0 or 1, got: {value}" + else: + return False, f"Invalid type for boolean buffer: {type(value)}" + + # Handle numeric values + if not isinstance(value, (int, float)): + return False, f"Value must be numeric, got: {type(value)}" + + # Convert to int for range checking + int_value = int(value) + + if int_value < min_val: + return False, f"Value too small: {int_value} < {min_val}" + + if int_value > max_val: + return False, f"Value too large: {int_value} > {max_val}" + + return True, "Success" + + except (AttributeError, TypeError, ValueError) as e: + return False, f"Value validation error: {e}" + + def validate_operation_params(self, buffer_type: str, buffer_idx: int, + bit_idx: Optional[int] = None, value: Any = None) -> Tuple[bool, str]: + """ + Comprehensive validation of all operation parameters. + + Args: + buffer_type: Buffer type name + buffer_idx: Buffer index + bit_idx: Bit index (required for boolean operations) + value: Value to write (for write operations) + + Returns: + Tuple[bool, str]: (is_valid, error_message) + """ + try: + # Validate buffer index + is_valid, msg = self.validate_buffer_index(buffer_idx, buffer_type) + if not is_valid: + return False, msg + + # Get buffer type info + buffer_type_obj, _ = self.buffer_types.get_buffer_info(buffer_type) + + # Validate bit index for boolean operations + if buffer_type_obj.requires_bit_index: + if bit_idx is None: + return False, f"Bit index required for {buffer_type}" + is_valid, msg = self.validate_bit_index(bit_idx) + if not is_valid: + return False, msg + elif bit_idx is not None: + return False, f"Bit index not allowed for {buffer_type}" + + # Validate value if provided + if value is not None: + is_valid, msg = self.validate_value_range(value, buffer_type) + if not is_valid: + return False, msg + + return True, "All parameters valid" + + except (AttributeError, TypeError, ValueError) as e: + return False, f"Parameter validation error: {e}" + + def get_buffer_constraints(self, buffer_type: str) -> Tuple[Tuple[int, int], bool]: + """ + Get buffer constraints for a given type. + + Args: + buffer_type: Buffer type name + + Returns: + Tuple[Tuple[int, int], bool]: ((min_val, max_val), requires_bit_index) + """ + try: + buffer_type_obj, _ = self.buffer_types.get_buffer_info(buffer_type) + return buffer_type_obj.value_range, buffer_type_obj.requires_bit_index + except (AttributeError, TypeError, ValueError) as e: + # Return safe defaults on error + return ((0, 0), False) + + def is_buffer_type_supported(self, buffer_type: str) -> bool: + """ + Check if a buffer type is supported. + + Args: + buffer_type: Buffer type name to check + + Returns: + bool: True if supported, False otherwise + """ + return self.buffer_types.validate_buffer_exists(buffer_type) + + def get_validation_summary(self) -> dict: + """ + Get a summary of validation configuration. + + Returns: + dict: Validation configuration summary + """ + try: + return { + 'buffer_size': self.args.buffer_size, + 'bits_per_buffer': self.args.bits_per_buffer, + 'supported_buffer_types': list(self.buffer_types.get_all_buffers().keys()), + 'supported_base_types': list(self.buffer_types.get_all_types().keys()) + } + except (AttributeError, TypeError) as e: + return {'error': str(e)} diff --git a/core/src/drivers/plugins/python/shared/component_interfaces.py b/core/src/drivers/plugins/python/shared/component_interfaces.py new file mode 100644 index 00000000..a5cb5471 --- /dev/null +++ b/core/src/drivers/plugins/python/shared/component_interfaces.py @@ -0,0 +1,220 @@ +""" +Component Interfaces for Modular SafeBufferAccess Architecture + +This module defines the abstract interfaces that each component must implement. +These interfaces ensure loose coupling and testability while maintaining API compatibility. +""" + +from abc import ABC, abstractmethod +from typing import List, Dict, Tuple, Any, Optional +import ctypes + + +class IBufferType: + """Interface for buffer type definitions""" + + @property + @abstractmethod + def name(self) -> str: + """Buffer type name (e.g., 'bool', 'byte', 'int')""" + pass + + @property + @abstractmethod + def size_bytes(self) -> int: + """Size in bytes of this buffer type""" + pass + + @property + @abstractmethod + def value_range(self) -> Tuple[int, int]: + """Valid value range (min, max)""" + pass + + @property + @abstractmethod + def requires_bit_index(self) -> bool: + """Whether this type requires bit index for access""" + pass + + @property + @abstractmethod + def ctype_class(self) -> type: + """Corresponding ctypes class""" + pass + + +class IMutexManager: + """Interface for mutex management operations""" + + @abstractmethod + def acquire(self) -> bool: + """Acquire the mutex. Returns True on success.""" + pass + + @abstractmethod + def release(self) -> bool: + """Release the mutex. Returns True on success.""" + pass + + @abstractmethod + def with_mutex(self, operation: callable) -> Any: + """Execute operation within mutex context. Returns operation result.""" + pass + + +class IBufferValidator: + """Interface for buffer validation operations""" + + @abstractmethod + def validate_buffer_index(self, buffer_idx: int, buffer_type: str) -> Tuple[bool, str]: + """Validate buffer index. Returns (is_valid, error_message)""" + pass + + @abstractmethod + def validate_bit_index(self, bit_idx: int) -> Tuple[bool, str]: + """Validate bit index for boolean operations. Returns (is_valid, error_message)""" + pass + + @abstractmethod + def validate_value_range(self, value: Any, buffer_type: str) -> Tuple[bool, str]: + """Validate value is within acceptable range. Returns (is_valid, error_message)""" + pass + + @abstractmethod + def validate_operation_params(self, buffer_type: str, buffer_idx: int, + bit_idx: Optional[int] = None, value: Any = None) -> Tuple[bool, str]: + """Comprehensive parameter validation. Returns (is_valid, error_message)""" + pass + + +class IBufferAccessor: + """Interface for generic buffer access operations""" + + @abstractmethod + def read_buffer(self, buffer_type: str, buffer_idx: int, bit_idx: Optional[int] = None, + thread_safe: bool = True) -> Tuple[Any, str]: + """Generic buffer read operation. Returns (value, error_message)""" + pass + + @abstractmethod + def write_buffer(self, buffer_type: str, buffer_idx: int, value: Any, + bit_idx: Optional[int] = None, thread_safe: bool = True) -> Tuple[bool, str]: + """Generic buffer write operation. Returns (success, error_message)""" + pass + + @abstractmethod + def get_buffer_pointer(self, buffer_type: str) -> Optional[ctypes.POINTER]: + """Get the buffer pointer for a given type. Returns None if invalid.""" + pass + + +class IBatchProcessor: + """Interface for batch operations""" + + @abstractmethod + def process_batch_reads(self, operations: List[Tuple]) -> Tuple[List[Tuple], str]: + """Process multiple read operations. Returns (results, error_message)""" + pass + + @abstractmethod + def process_batch_writes(self, operations: List[Tuple]) -> Tuple[List[Tuple], str]: + """Process multiple write operations. Returns (results, error_message)""" + pass + + @abstractmethod + def process_mixed_operations(self, read_operations: List[Tuple], + write_operations: List[Tuple]) -> Tuple[Dict, str]: + """Process mixed read/write operations. Returns (results_dict, error_message)""" + pass + + +class IDebugUtils: + """Interface for debug and variable operations""" + + @abstractmethod + def get_var_list(self, indexes: List[int]) -> Tuple[List[int], str]: + """Get addresses for variable indexes. Returns (addresses, error_message)""" + pass + + @abstractmethod + def get_var_size(self, index: int) -> Tuple[int, str]: + """Get size of variable at index. Returns (size, error_message)""" + pass + + @abstractmethod + def get_var_value(self, index: int) -> Tuple[Any, str]: + """Read variable value by index. Returns (value, error_message)""" + pass + + @abstractmethod + def set_var_value(self, index: int, value: Any) -> Tuple[bool, str]: + """Write variable value by index. Returns (success, error_message)""" + pass + + @abstractmethod + def get_var_count(self) -> Tuple[int, str]: + """Get total variable count. Returns (count, error_message)""" + pass + + @abstractmethod + def get_var_info(self, index: int) -> Tuple[Dict, str]: + """Get comprehensive variable info. Returns (info_dict, error_message)""" + pass + + +class IConfigHandler: + """Interface for configuration file operations""" + + @abstractmethod + def get_config_path(self) -> Tuple[str, str]: + """Get configuration file path. Returns (path, error_message)""" + pass + + @abstractmethod + def get_config_as_map(self) -> Tuple[Dict, str]: + """Parse config file as key-value map. Returns (config_dict, error_message)""" + pass + + +class ISafeBufferAccess: + """Main interface that maintains API compatibility""" + + @property + @abstractmethod + def is_valid(self) -> bool: + """Whether the instance is properly initialized""" + pass + + @property + @abstractmethod + def error_msg(self) -> str: + """Error message if initialization failed""" + pass + + # All the public methods from the original API must be implemented + # See API_SPECIFICATION.md for complete list + + @abstractmethod + def acquire_mutex(self) -> Tuple[bool, str]: + pass + + @abstractmethod + def release_mutex(self) -> Tuple[bool, str]: + pass + + # Boolean operations + @abstractmethod + def read_bool_input(self, buffer_idx: int, bit_idx: int, thread_safe: bool = True) -> Tuple[bool, str]: + pass + + @abstractmethod + def read_bool_output(self, buffer_idx: int, bit_idx: int, thread_safe: bool = True) -> Tuple[bool, str]: + pass + + @abstractmethod + def write_bool_output(self, buffer_idx: int, bit_idx: int, value: bool, thread_safe: bool = True) -> Tuple[bool, str]: + pass + + # And so on for all other methods... + # (Complete list in API_SPECIFICATION.md) diff --git a/core/src/drivers/plugins/python/shared/config_handler.py b/core/src/drivers/plugins/python/shared/config_handler.py new file mode 100644 index 00000000..b4234bf3 --- /dev/null +++ b/core/src/drivers/plugins/python/shared/config_handler.py @@ -0,0 +1,178 @@ +""" +Configuration Handler for OpenPLC Python Plugin System + +This module handles plugin-specific configuration file operations. +It provides utilities for reading and parsing configuration files. +""" + +import json +import os +from typing import Dict, Tuple +try: + # Try relative imports first (when used as package) + from .component_interfaces import IConfigHandler +except ImportError: + # Fall back to absolute imports (when testing standalone) + from component_interfaces import IConfigHandler + + +class ConfigHandler(IConfigHandler): + """ + Handles plugin-specific configuration file operations. + + This class provides utilities for reading, parsing, and managing + plugin configuration files in JSON format. + """ + + def __init__(self, runtime_args): + """ + Initialize the configuration handler. + + Args: + runtime_args: PluginRuntimeArgs instance containing config path + """ + self.args = runtime_args + + def get_config_path(self) -> Tuple[str, str]: + """ + Retrieve the plugin-specific configuration file path. + + Returns: + Tuple[str, str]: (config_path, error_message) + """ + try: + config_path_bytes = self.args.plugin_specific_config_file_path + + # Handle different types of C char arrays + if isinstance(config_path_bytes, (bytes, bytearray)): + config_path = config_path_bytes.decode('utf-8').rstrip('\x00') + elif hasattr(config_path_bytes, 'value'): + config_path = config_path_bytes.value.decode('utf-8').rstrip('\x00') + elif hasattr(config_path_bytes, 'raw'): + config_path = config_path_bytes.raw.decode('utf-8').rstrip('\x00') + else: + # Try to convert to bytes first + config_path = bytes(config_path_bytes).decode('utf-8').rstrip('\x00') + + # Clean up the path - remove all whitespace and control characters + config_path = config_path.strip() + + return config_path, "Success" + + except (AttributeError, TypeError, ValueError, OSError, MemoryError) as e: + return "", f"Exception retrieving config path: {e}" + + def get_config_as_map(self) -> Tuple[Dict, str]: + """ + Parse the plugin-specific configuration file as a key-value map. + + Supports JSON format for flexibility. Returns an empty dict if + the config file doesn't exist or can't be parsed. + + Returns: + Tuple[Dict, str]: (config_map, error_message) + """ + config_path, err_msg = self.get_config_path() + if not config_path: + return {}, f"Failed to get config path: {err_msg}" + + # Debug information (could be logged if needed) + debug_info = f"Config path: '{config_path}', CWD: '{os.getcwd()}'" + + try: + with open(config_path, 'r', encoding='utf-8') as config_file: + config_data = json.load(config_file) + if not isinstance(config_data, dict): + return {}, "Configuration file must contain a JSON object at the top level" + return config_data, "Success" + + except FileNotFoundError: + return {}, f"Configuration file not found: {config_path}" + + except json.JSONDecodeError as e: + return {}, f"JSON parsing error in config file {config_path}: {e}" + + except (OSError, MemoryError) as e: + return {}, f"Exception reading config file {config_path}: {e}" + + except UnicodeDecodeError as e: + return {}, f"Encoding error reading config file {config_path}: {e}" + + def validate_config_file(self) -> Tuple[bool, str]: + """ + Validate that the configuration file exists and is readable. + + Returns: + Tuple[bool, str]: (is_valid, error_message) + """ + config_path, err_msg = self.get_config_path() + if not config_path: + return False, f"Failed to get config path: {err_msg}" + + if not os.path.exists(config_path): + return False, f"Configuration file does not exist: {config_path}" + + if not os.path.isfile(config_path): + return False, f"Configuration path is not a file: {config_path}" + + try: + # Try to open and read the file + with open(config_path, 'r', encoding='utf-8') as f: + f.read(1) # Just read one character to test readability + return True, "Configuration file is valid and readable" + + except (OSError, UnicodeDecodeError) as e: + return False, f"Configuration file is not readable: {e}" + + def get_config_value(self, key: str, default=None): + """ + Get a specific configuration value by key. + + Args: + key: Configuration key to retrieve + default: Default value if key is not found + + Returns: + Any: Configuration value or default + """ + config_map, err_msg = self.get_config_as_map() + if not config_map: + return default + + return config_map.get(key, default) + + def has_config_key(self, key: str) -> bool: + """ + Check if a configuration key exists. + + Args: + key: Configuration key to check + + Returns: + bool: True if key exists, False otherwise + """ + config_map, _ = self.get_config_as_map() + return key in config_map + + def get_config_summary(self) -> Dict: + """ + Get a summary of configuration status. + + Returns: + Dict: Configuration summary with status and metadata + """ + config_path, path_err = self.get_config_path() + is_valid, valid_err = self.validate_config_file() + config_map, map_err = self.get_config_as_map() + + summary = { + 'config_path': config_path, + 'path_error': path_err if path_err != "Success" else None, + 'is_valid': is_valid, + 'validation_error': valid_err if not is_valid else None, + 'has_config': bool(config_map), + 'config_keys': list(config_map.keys()) if config_map else [], + 'config_error': map_err if map_err != "Success" else None + } + + return summary diff --git a/core/src/drivers/plugins/python/shared/debug_utils.py b/core/src/drivers/plugins/python/shared/debug_utils.py new file mode 100644 index 00000000..c0690ec8 --- /dev/null +++ b/core/src/drivers/plugins/python/shared/debug_utils.py @@ -0,0 +1,290 @@ +""" +Debug Utilities for OpenPLC Python Plugin System + +This module provides debug and variable access utilities. +It handles variable listing, size queries, value reading/writing, and other debug operations. +""" + +from typing import List, Tuple, Dict, Any, Optional +import ctypes +try: + # Try relative imports first (when used as package) + from .component_interfaces import IDebugUtils +except ImportError: + # Fall back to absolute imports (when testing standalone) + from component_interfaces import IDebugUtils + + +class DebugUtils(IDebugUtils): + """ + Provides debug and variable access utilities. + + This class encapsulates all debug-related operations, including variable + discovery, size queries, and direct memory access for debugging purposes. + """ + + def __init__(self, runtime_args): + """ + Initialize the debug utilities. + + Args: + runtime_args: PluginRuntimeArgs instance + """ + self.args = runtime_args + + def get_var_list(self, indexes: List[int]) -> Tuple[List[int], str]: + """ + Get a list of variable addresses for the given indexes. + + Args: + indexes: List of integer indexes to get addresses for + + Returns: + Tuple[List[int], str]: (addresses, error_message) + addresses format: [address1, address2, ...] where each address is an int + """ + if not indexes: + return [], "No indexes provided" + + if not isinstance(indexes, (list, tuple)): + return [], "Indexes must be a list or tuple" + + try: + # Convert Python list to C arrays + num_vars = len(indexes) + indexes_array = (ctypes.c_size_t * num_vars)(*indexes) + result_array = (ctypes.c_void_p * num_vars)() + + # Call the C function + self.args.get_var_list(num_vars, indexes_array, result_array) + + # Convert result back to Python list + addresses = [] + for i in range(num_vars): + addr = result_array[i] + if addr is None: + addresses.append(None) + else: + # Convert void pointer to integer address + addresses.append(ctypes.cast(addr, ctypes.c_void_p).value) + + return addresses, "Success" + + except (AttributeError, TypeError, ValueError, OSError, MemoryError) as e: + return [], f"Exception during get_var_list: {e}" + + def get_var_size(self, index: int) -> Tuple[int, str]: + """ + Get the size of a variable at the given index. + + Args: + index: Integer index of the variable + + Returns: + Tuple[int, str]: (size, error_message) + """ + try: + size = self.args.get_var_size(ctypes.c_size_t(index)) + return size, "Success" + + except (AttributeError, TypeError, ValueError, OSError, MemoryError) as e: + return 0, f"Exception during get_var_size: {e}" + + def get_var_value(self, index: int) -> Tuple[Any, str]: + """ + Read a variable value by index with automatic type handling based on size. + + Args: + index: Integer index of the variable + + Returns: + Tuple[Any, str]: (value, error_message) + """ + try: + # Get variable address and size + addresses, addr_err = self.get_var_list([index]) + if not addresses or addresses[0] is None: + return None, f"Failed to get variable address: {addr_err}" + + size, size_err = self.get_var_size(index) + if size == 0: + return None, f"Failed to get variable size: {size_err}" + + address = addresses[0] + + # Read value based on size (since we can't determine exact type) + if size == 1: + # Could be BOOL, BOOL_O, or SINT - read as unsigned and let user interpret + value_ptr = ctypes.cast(address, ctypes.POINTER(ctypes.c_uint8)) + value = value_ptr.contents.value + return value, "Success" + + elif size == 2: + # 16-bit unsigned integer + value_ptr = ctypes.cast(address, ctypes.POINTER(ctypes.c_uint16)) + value = value_ptr.contents.value + return value, "Success" + + elif size == 4: + # 32-bit unsigned integer (could be TIME) + value_ptr = ctypes.cast(address, ctypes.POINTER(ctypes.c_uint32)) + value = value_ptr.contents.value + return value, "Success" + + elif size == 8: + # 64-bit unsigned integer (could be TIME) + value_ptr = ctypes.cast(address, ctypes.POINTER(ctypes.c_uint64)) + value = value_ptr.contents.value + return value, "Success" + + else: + return None, f"Unsupported variable size: {size}" + + except (AttributeError, TypeError, ValueError, OSError, MemoryError) as e: + return None, f"Exception during get_var_value: {e}" + + def set_var_value(self, index: int, value: Any) -> Tuple[bool, str]: + """ + Write a variable value by index with size-based validation. + + Args: + index: Integer index of the variable + value: Value to write + + Returns: + Tuple[bool, str]: (success, error_message) + """ + try: + # Get variable address and size + addresses, addr_err = self.get_var_list([index]) + if not addresses or addresses[0] is None: + return False, f"Failed to get variable address: {addr_err}" + + size, size_err = self.get_var_size(index) + if size == 0: + return False, f"Failed to get variable size: {size_err}" + + address = addresses[0] + + # Validate value type + if not isinstance(value, (bool, int)): + return False, f"Invalid value type: expected bool or int, got {type(value)}" + + # Convert boolean to integer + if isinstance(value, bool): + value = 1 if value else 0 + + # Validate and write value based on size + if size == 1: + # 8-bit value (BOOL, BOOL_O, or SINT) + if not (0 <= value <= 255): + return False, f"Invalid value: {value} (must be 0-255 for 8-bit)" + value_ptr = ctypes.cast(address, ctypes.POINTER(ctypes.c_uint8)) + value_ptr.contents.value = value + return True, "Success" + + elif size == 2: + # 16-bit unsigned integer + if not (0 <= value <= 65535): + return False, f"Invalid value: {value} (must be 0-65535 for 16-bit)" + value_ptr = ctypes.cast(address, ctypes.POINTER(ctypes.c_uint16)) + value_ptr.contents.value = value + return True, "Success" + + elif size == 4: + # 32-bit unsigned integer + if not (0 <= value <= 4294967295): + return False, f"Invalid value: {value} (must be 0-4294967295 for 32-bit)" + value_ptr = ctypes.cast(address, ctypes.POINTER(ctypes.c_uint32)) + value_ptr.contents.value = value + return True, "Success" + + elif size == 8: + # 64-bit unsigned integer + if not (0 <= value <= 18446744073709551615): + return False, f"Invalid value: {value} (must be 0-18446744073709551615 for 64-bit)" + value_ptr = ctypes.cast(address, ctypes.POINTER(ctypes.c_uint64)) + value_ptr.contents.value = value + return True, "Success" + + else: + return False, f"Unsupported variable size: {size}" + + except (AttributeError, TypeError, ValueError, OSError, MemoryError) as e: + return False, f"Exception during set_var_value: {e}" + + def get_var_count(self) -> Tuple[int, str]: + """ + Get the total number of debug variables available. + + Returns: + Tuple[int, str]: (count, error_message) + """ + try: + count = self.args.get_var_count() + return count, "Success" + + except (AttributeError, TypeError, ValueError, OSError, MemoryError) as e: + return 0, f"Exception during get_var_count: {e}" + + def get_var_info(self, index: int) -> Tuple[Dict, str]: + """ + Get comprehensive information about a variable. + + Args: + index: Integer index of the variable + + Returns: + Tuple[Dict, str]: (info_dict, error_message) + info_dict format: {'address': int, 'size': int, 'inferred_type': str} + """ + try: + # Get variable address + addresses, addr_err = self.get_var_list([index]) + if not addresses or addresses[0] is None: + return {}, f"Failed to get variable address: {addr_err}" + + # Get variable size + size, size_err = self.get_var_size(index) + if size == 0: + return {}, f"Failed to get variable size: {size_err}" + + # Infer type from size + inferred_type = self._infer_var_type_from_size(size) + + info = { + 'address': addresses[0], + 'size': size, + 'inferred_type': inferred_type + } + + return info, "Success" + + except (AttributeError, TypeError, ValueError, OSError, MemoryError) as e: + return {}, f"Exception during get_var_info: {e}" + + def _infer_var_type_from_size(self, size: int) -> str: + """ + Infer variable type based on size. + + Based on debug.c size mappings: + - BOOL/BOOL_O: sizeof(BOOL) = 1 byte + - SINT: sizeof(SINT) = 1 byte + - TIME: sizeof(TIME) = 4 or 8 bytes + + Args: + size: Size in bytes + + Returns: + str: Inferred type name for debugging + """ + if size == 1: + return "BOOL_OR_SINT" # Cannot distinguish between BOOL and SINT by size alone + elif size == 2: + return "UINT16" + elif size == 4: + return "UINT32_OR_TIME" + elif size == 8: + return "UINT64_OR_TIME" + else: + return "UNKNOWN" diff --git a/core/src/drivers/plugins/python/shared/mutex_manager.py b/core/src/drivers/plugins/python/shared/mutex_manager.py new file mode 100644 index 00000000..9e01f333 --- /dev/null +++ b/core/src/drivers/plugins/python/shared/mutex_manager.py @@ -0,0 +1,111 @@ +""" +Mutex Manager for OpenPLC Python Plugin System + +This module provides centralized mutex management for thread-safe buffer operations. +It encapsulates all mutex-related logic and provides a clean interface for acquiring +and releasing mutexes. +""" + +from typing import Any, Callable +try: + # Try relative imports first (when used as package) + from .component_interfaces import IMutexManager +except ImportError: + # Fall back to absolute imports (when testing standalone) + from component_interfaces import IMutexManager + + +class MutexManager(IMutexManager): + """ + Manages mutex operations for thread-safe buffer access. + + This class encapsulates all mutex-related functionality, providing a clean + interface for acquiring, releasing, and using mutexes in a thread-safe manner. + """ + + def __init__(self, runtime_args): + """ + Initialize the mutex manager. + + Args: + runtime_args: PluginRuntimeArgs instance containing mutex pointers + """ + self.args = runtime_args + + def acquire(self) -> bool: + """ + Acquire the buffer mutex. + + Returns: + bool: True if mutex was acquired successfully, False otherwise + """ + if not self.args.buffer_mutex: + return False + + result = self.args.mutex_take(self.args.buffer_mutex) + return result == 0 # 0 typically indicates success + + def release(self) -> bool: + """ + Release the buffer mutex. + + Returns: + bool: True if mutex was released successfully, False otherwise + """ + if not self.args.buffer_mutex: + return False + + result = self.args.mutex_give(self.args.buffer_mutex) + return result == 0 # 0 typically indicates success + + def with_mutex(self, operation: Callable[[], Any]) -> Any: + """ + Execute an operation within a mutex-protected context. + + This method acquires the mutex, executes the operation, and ensures + the mutex is always released, even if the operation raises an exception. + + Args: + operation: Callable that performs the operation to protect + + Returns: + Any: Result of the operation, or (False, error_message) if mutex acquisition fails + + Example: + result = mutex_manager.with_mutex(lambda: self._perform_buffer_read()) + """ + if not self.acquire(): + return False, "Failed to acquire mutex" + + try: + return operation() + finally: + self.release() + + def is_mutex_available(self) -> bool: + """ + Check if the mutex is available for use. + + Returns: + bool: True if mutex pointers are valid, False otherwise + """ + return ( + self.args.buffer_mutex is not None and + self.args.mutex_take is not None and + self.args.mutex_give is not None + ) + + def get_mutex_status(self) -> str: + """ + Get a human-readable status of the mutex configuration. + + Returns: + str: Status description + """ + if not self.args.buffer_mutex: + return "No buffer mutex available" + if not self.args.mutex_take: + return "No mutex_take function available" + if not self.args.mutex_give: + return "No mutex_give function available" + return "Mutex properly configured" diff --git a/core/src/drivers/plugins/python/shared/python_plugin_types.py b/core/src/drivers/plugins/python/shared/python_plugin_types.py index 46a61a2d..62223716 100644 --- a/core/src/drivers/plugins/python/shared/python_plugin_types.py +++ b/core/src/drivers/plugins/python/shared/python_plugin_types.py @@ -44,6 +44,9 @@ class PluginRuntimeArgs(ctypes.Structure): # Mutex function pointers ("mutex_take", ctypes.CFUNCTYPE(ctypes.c_int, ctypes.c_void_p)), ("mutex_give", ctypes.CFUNCTYPE(ctypes.c_int, ctypes.c_void_p)), + ("get_var_list", ctypes.CFUNCTYPE(None, ctypes.c_size_t, ctypes.POINTER(ctypes.c_size_t), ctypes.POINTER(ctypes.c_void_p))), + ("get_var_size", ctypes.CFUNCTYPE(ctypes.c_size_t)), + ("get_var_count", ctypes.CFUNCTYPE(ctypes.c_uint16)), ("buffer_mutex", ctypes.c_void_p), ("plugin_specific_config_file_path", ctypes.c_char * 256), @@ -1063,14 +1066,268 @@ def release_mutex(self): """ if not self.is_valid: return False, f"Invalid runtime args: {self.error_msg}" - + try: if self.args.mutex_give(self.args.buffer_mutex) != 0: return False, "Failed to release mutex" return True, "Mutex released successfully" except (AttributeError, TypeError, ValueError, OverflowError, OSError, MemoryError) as e: return False, f"Exception during mutex release: {e}" + + def get_var_list(self, indexes): + """ + Get a list of variable addresses for the given indexes + Args: + indexes: List of integer indexes to get addresses for + Returns: (list, str) - (addresses, error_message) + addresses format: [address1, address2, ...] where each address is an int + """ + if not self.is_valid: + return [], f"Invalid runtime args: {self.error_msg}" + + if not indexes: + return [], "No indexes provided" + + if not isinstance(indexes, (list, tuple)): + return [], "Indexes must be a list or tuple" + + try: + # Convert Python list to C arrays + num_vars = len(indexes) + indexes_array = (ctypes.c_size_t * num_vars)(*indexes) + result_array = (ctypes.c_void_p * num_vars)() + + # Call the C function + self.args.get_var_list(num_vars, indexes_array, result_array) + + # Convert result back to Python list + addresses = [] + for i in range(num_vars): + addr = result_array[i] + if addr is None: + addresses.append(None) + else: + # Convert void pointer to integer address + addresses.append(ctypes.cast(addr, ctypes.c_void_p).value) + + return addresses, "Success" + + except (AttributeError, TypeError, ValueError, OverflowError, OSError, MemoryError) as e: + return [], f"Exception during get_var_list: {e}" + + def get_var_size(self, index): + """ + Get the size of a variable at the given index + Args: + index: Integer index of the variable + Returns: (int, str) - (size, error_message) + """ + if not self.is_valid: + return 0, f"Invalid runtime args: {self.error_msg}" + + try: + size = self.args.get_var_size(ctypes.c_size_t(index)) + return size, "Success" + + except (AttributeError, TypeError, ValueError, OverflowError, OSError, MemoryError) as e: + return 0, f"Exception during get_var_size: {e}" + + def _infer_var_type_from_size(self, size): + """ + Infer variable type based on size (since get_var_type doesn't exist in the C API) + Based on debug.c size mappings: + - BOOL/BOOL_O: sizeof(BOOL) = 1 byte + - SINT: sizeof(SINT) = 1 byte + - TIME: sizeof(TIME) = 4 or 8 bytes + Args: + size: Size in bytes + Returns: str - Inferred type name for debugging + """ + if size == 1: + return "BOOL_OR_SINT" # Cannot distinguish between BOOL and SINT by size alone + elif size == 2: + return "UINT16" + elif size == 4: + return "UINT32_OR_TIME" + elif size == 8: + return "UINT64_OR_TIME" + else: + return "UNKNOWN" + + def get_var_value(self, index): + """ + Read a variable value by index with automatic type handling based on size + Args: + index: Integer index of the variable + Returns: (value, str) - (value, error_message) + """ + if not self.is_valid: + return None, f"Invalid runtime args: {self.error_msg}" + + try: + # Get variable address and size + addresses, addr_err = self.get_var_list([index]) + if not addresses or addresses[0] is None: + return None, f"Failed to get variable address: {addr_err}" + + size, size_err = self.get_var_size(index) + if size == 0: + return None, f"Failed to get variable size: {size_err}" + + address = addresses[0] + + # Read value based on size (since we can't determine exact type) + if size == 1: + # Could be BOOL, BOOL_O, or SINT - read as unsigned and let user interpret + value_ptr = ctypes.cast(address, ctypes.POINTER(ctypes.c_uint8)) + value = value_ptr.contents.value + return value, "Success" + + elif size == 2: + # 16-bit unsigned integer + value_ptr = ctypes.cast(address, ctypes.POINTER(ctypes.c_uint16)) + value = value_ptr.contents.value + return value, "Success" + + elif size == 4: + # 32-bit unsigned integer (could be TIME) + value_ptr = ctypes.cast(address, ctypes.POINTER(ctypes.c_uint32)) + value = value_ptr.contents.value + return value, "Success" + + elif size == 8: + # 64-bit unsigned integer (could be TIME) + value_ptr = ctypes.cast(address, ctypes.POINTER(ctypes.c_uint64)) + value = value_ptr.contents.value + return value, "Success" + + else: + return None, f"Unsupported variable size: {size}" + + except (AttributeError, TypeError, ValueError, OverflowError, OSError, MemoryError) as e: + return None, f"Exception during get_var_value: {e}" + def set_var_value(self, index, value): + """ + Write a variable value by index with size-based validation + Args: + index: Integer index of the variable + value: Value to write + Returns: (bool, str) - (success, error_message) + """ + if not self.is_valid: + return False, f"Invalid runtime args: {self.error_msg}" + + try: + # Get variable address and size + addresses, addr_err = self.get_var_list([index]) + if not addresses or addresses[0] is None: + return False, f"Failed to get variable address: {addr_err}" + + size, size_err = self.get_var_size(index) + if size == 0: + return False, f"Failed to get variable size: {size_err}" + + address = addresses[0] + + # Validate value type + if not isinstance(value, (bool, int)): + return False, f"Invalid value type: expected bool or int, got {type(value)}" + + # Convert boolean to integer + if isinstance(value, bool): + value = 1 if value else 0 + + # Validate and write value based on size + if size == 1: + # 8-bit value (BOOL, BOOL_O, or SINT) + if not (0 <= value <= 255): + return False, f"Invalid value: {value} (must be 0-255 for 8-bit)" + value_ptr = ctypes.cast(address, ctypes.POINTER(ctypes.c_uint8)) + value_ptr.contents.value = value + return True, "Success" + + elif size == 2: + # 16-bit unsigned integer + if not (0 <= value <= 65535): + return False, f"Invalid value: {value} (must be 0-65535 for 16-bit)" + value_ptr = ctypes.cast(address, ctypes.POINTER(ctypes.c_uint16)) + value_ptr.contents.value = value + return True, "Success" + + elif size == 4: + # 32-bit unsigned integer + if not (0 <= value <= 4294967295): + return False, f"Invalid value: {value} (must be 0-4294967295 for 32-bit)" + value_ptr = ctypes.cast(address, ctypes.POINTER(ctypes.c_uint32)) + value_ptr.contents.value = value + return True, "Success" + + elif size == 8: + # 64-bit unsigned integer + if not (0 <= value <= 18446744073709551615): + return False, f"Invalid value: {value} (must be 0-18446744073709551615 for 64-bit)" + value_ptr = ctypes.cast(address, ctypes.POINTER(ctypes.c_uint64)) + value_ptr.contents.value = value + return True, "Success" + + else: + return False, f"Unsupported variable size: {size}" + + except (AttributeError, TypeError, ValueError, OverflowError, OSError, MemoryError) as e: + return False, f"Exception during set_var_value: {e}" + + def get_var_count(self): + """ + Get the total number of debug variables available + Returns: (int, str) - (count, error_message) + """ + if not self.is_valid: + return 0, f"Invalid runtime args: {self.error_msg}" + + try: + count = self.args.get_var_count() + return count, "Success" + + except (AttributeError, TypeError, ValueError, OverflowError, OSError, MemoryError) as e: + return 0, f"Exception during get_var_count: {e}" + + def get_var_info(self, index): + """ + Get comprehensive information about a variable + Args: + index: Integer index of the variable + Returns: (dict, str) - (info_dict, error_message) + info_dict format: {'address': int, 'size': int, 'inferred_type': str} + """ + if not self.is_valid: + return {}, f"Invalid runtime args: {self.error_msg}" + + try: + # Get variable address + addresses, addr_err = self.get_var_list([index]) + if not addresses or addresses[0] is None: + return {}, f"Failed to get variable address: {addr_err}" + + # Get variable size + size, size_err = self.get_var_size(index) + if size == 0: + return {}, f"Failed to get variable size: {size_err}" + + # Infer type from size + inferred_type = self._infer_var_type_from_size(size) + + info = { + 'address': addresses[0], + 'size': size, + 'inferred_type': inferred_type + } + + return info, "Success" + + except (AttributeError, TypeError, ValueError, OverflowError, OSError, MemoryError) as e: + return {}, f"Exception during get_var_info: {e}" + # Batch operations for optimized mutex usage def batch_read_values(self, operations): """ diff --git a/core/src/drivers/plugins/python/shared/safe_buffer_access_refactored.py b/core/src/drivers/plugins/python/shared/safe_buffer_access_refactored.py new file mode 100644 index 00000000..22f17790 --- /dev/null +++ b/core/src/drivers/plugins/python/shared/safe_buffer_access_refactored.py @@ -0,0 +1,250 @@ +""" +Refactored SafeBufferAccess - Modular Architecture + +This module provides the refactored SafeBufferAccess class that maintains +100% API compatibility while using a modular component architecture internally. +""" + +from typing import List, Tuple, Dict, Any, Optional +try: + # Try relative imports first (when used as package) + from .component_interfaces import ISafeBufferAccess + from .buffer_types import get_buffer_types + from .mutex_manager import MutexManager + from .buffer_validator import BufferValidator + from .buffer_accessor import GenericBufferAccessor + from .batch_processor import BatchProcessor + from .debug_utils import DebugUtils + from .config_handler import ConfigHandler +except ImportError: + # Fall back to absolute imports (when testing standalone) + from component_interfaces import ISafeBufferAccess + from buffer_types import get_buffer_types + from mutex_manager import MutexManager + from buffer_validator import BufferValidator + from buffer_accessor import GenericBufferAccessor + from batch_processor import BatchProcessor + from debug_utils import DebugUtils + from config_handler import ConfigHandler + + +class SafeBufferAccess(ISafeBufferAccess): + """ + Refactored SafeBufferAccess with modular architecture. + + This class maintains 100% API compatibility with the original SafeBufferAccess + while internally using a clean, modular component architecture. All existing + code and tests will continue to work without modification. + + The modular architecture eliminates code duplication and improves maintainability: + - Buffer type definitions are centralized and extensible + - Validation logic is consolidated + - Mutex management is abstracted + - Buffer access is generic and type-agnostic + - Batch operations are optimized + - Debug utilities are separated + - Configuration handling is isolated + """ + + def __init__(self, runtime_args): + """ + Initialize SafeBufferAccess with modular components. + + Args: + runtime_args: PluginRuntimeArgs instance + """ + # Initialize all components + self.buffer_types = get_buffer_types() + self.mutex_manager = MutexManager(runtime_args) + self.validator = BufferValidator(runtime_args) + self.buffer_accessor = GenericBufferAccessor(runtime_args, self.validator, self.mutex_manager) + self.batch_processor = BatchProcessor(self.buffer_accessor, self.mutex_manager) + self.debug_utils = DebugUtils(runtime_args) + self.config_handler = ConfigHandler(runtime_args) + + # Validate initialization (maintains original behavior) + self._is_valid, self._error_msg = runtime_args.validate_pointers() + + @property + def is_valid(self) -> bool: + """Whether the instance is properly initialized.""" + return self._is_valid + + @property + def error_msg(self) -> str: + """Error message if initialization failed.""" + return self._error_msg + + # ============================================================================ + # Mutex Management Methods + # ============================================================================ + + def acquire_mutex(self) -> Tuple[bool, str]: + """Acquire the buffer mutex.""" + success = self.mutex_manager.acquire() + return success, "Success" if success else "Failed to acquire mutex" + + def release_mutex(self) -> Tuple[bool, str]: + """Release the buffer mutex.""" + success = self.mutex_manager.release() + return success, "Success" if success else "Failed to release mutex" + + # ============================================================================ + # Boolean Buffer Operations + # ============================================================================ + + def read_bool_input(self, buffer_idx: int, bit_idx: int, thread_safe: bool = True) -> Tuple[bool, str]: + """Read a boolean input value.""" + return self.buffer_accessor.read_buffer('bool_input', buffer_idx, bit_idx, thread_safe) + + def read_bool_output(self, buffer_idx: int, bit_idx: int, thread_safe: bool = True) -> Tuple[bool, str]: + """Read a boolean output value.""" + return self.buffer_accessor.read_buffer('bool_output', buffer_idx, bit_idx, thread_safe) + + def write_bool_output(self, buffer_idx: int, bit_idx: int, value: bool, thread_safe: bool = True) -> Tuple[bool, str]: + """Write a boolean output value.""" + return self.buffer_accessor.write_buffer('bool_output', buffer_idx, value, bit_idx, thread_safe) + + # ============================================================================ + # Byte Buffer Operations + # ============================================================================ + + def read_byte_input(self, buffer_idx: int, thread_safe: bool = True) -> Tuple[int, str]: + """Read a byte input value.""" + return self.buffer_accessor.read_buffer('byte_input', buffer_idx, None, thread_safe) + + def read_byte_output(self, buffer_idx: int, thread_safe: bool = True) -> Tuple[int, str]: + """Read a byte output value.""" + return self.buffer_accessor.read_buffer('byte_output', buffer_idx, None, thread_safe) + + def write_byte_output(self, buffer_idx: int, value: int, thread_safe: bool = True) -> Tuple[bool, str]: + """Write a byte output value.""" + return self.buffer_accessor.write_buffer('byte_output', buffer_idx, value, None, thread_safe) + + # ============================================================================ + # Integer Buffer Operations (16-bit) + # ============================================================================ + + def read_int_input(self, buffer_idx: int, thread_safe: bool = True) -> Tuple[int, str]: + """Read an integer input value.""" + return self.buffer_accessor.read_buffer('int_input', buffer_idx, None, thread_safe) + + def read_int_output(self, buffer_idx: int, thread_safe: bool = True) -> Tuple[int, str]: + """Read an integer output value.""" + return self.buffer_accessor.read_buffer('int_output', buffer_idx, None, thread_safe) + + def write_int_output(self, buffer_idx: int, value: int, thread_safe: bool = True) -> Tuple[bool, str]: + """Write an integer output value.""" + return self.buffer_accessor.write_buffer('int_output', buffer_idx, value, None, thread_safe) + + def read_int_memory(self, buffer_idx: int, thread_safe: bool = True) -> Tuple[int, str]: + """Read an integer memory value.""" + return self.buffer_accessor.read_buffer('int_memory', buffer_idx, None, thread_safe) + + def write_int_memory(self, buffer_idx: int, value: int, thread_safe: bool = True) -> Tuple[bool, str]: + """Write an integer memory value.""" + return self.buffer_accessor.write_buffer('int_memory', buffer_idx, value, None, thread_safe) + + # ============================================================================ + # Double Integer Buffer Operations (32-bit) + # ============================================================================ + + def read_dint_input(self, buffer_idx: int, thread_safe: bool = True) -> Tuple[int, str]: + """Read a double integer input value.""" + return self.buffer_accessor.read_buffer('dint_input', buffer_idx, None, thread_safe) + + def read_dint_output(self, buffer_idx: int, thread_safe: bool = True) -> Tuple[int, str]: + """Read a double integer output value.""" + return self.buffer_accessor.read_buffer('dint_output', buffer_idx, None, thread_safe) + + def write_dint_output(self, buffer_idx: int, value: int, thread_safe: bool = True) -> Tuple[bool, str]: + """Write a double integer output value.""" + return self.buffer_accessor.write_buffer('dint_output', buffer_idx, value, None, thread_safe) + + def read_dint_memory(self, buffer_idx: int, thread_safe: bool = True) -> Tuple[int, str]: + """Read a double integer memory value.""" + return self.buffer_accessor.read_buffer('dint_memory', buffer_idx, None, thread_safe) + + def write_dint_memory(self, buffer_idx: int, value: int, thread_safe: bool = True) -> Tuple[bool, str]: + """Write a double integer memory value.""" + return self.buffer_accessor.write_buffer('dint_memory', buffer_idx, value, None, thread_safe) + + # ============================================================================ + # Long Integer Buffer Operations (64-bit) + # ============================================================================ + + def read_lint_input(self, buffer_idx: int, thread_safe: bool = True) -> Tuple[int, str]: + """Read a long integer input value.""" + return self.buffer_accessor.read_buffer('lint_input', buffer_idx, None, thread_safe) + + def read_lint_output(self, buffer_idx: int, thread_safe: bool = True) -> Tuple[int, str]: + """Read a long integer output value.""" + return self.buffer_accessor.read_buffer('lint_output', buffer_idx, None, thread_safe) + + def write_lint_output(self, buffer_idx: int, value: int, thread_safe: bool = True) -> Tuple[bool, str]: + """Write a long integer output value.""" + return self.buffer_accessor.write_buffer('lint_output', buffer_idx, value, None, thread_safe) + + def read_lint_memory(self, buffer_idx: int, thread_safe: bool = True) -> Tuple[int, str]: + """Read a long integer memory value.""" + return self.buffer_accessor.read_buffer('lint_memory', buffer_idx, None, thread_safe) + + def write_lint_memory(self, buffer_idx: int, value: int, thread_safe: bool = True) -> Tuple[bool, str]: + """Write a long integer memory value.""" + return self.buffer_accessor.write_buffer('lint_memory', buffer_idx, value, None, thread_safe) + + # ============================================================================ + # Batch Operations + # ============================================================================ + + def batch_read_values(self, operations: List[Tuple]) -> Tuple[List[Tuple], str]: + """Process multiple read operations in batch.""" + return self.batch_processor.process_batch_reads(operations) + + def batch_write_values(self, operations: List[Tuple]) -> Tuple[List[Tuple], str]: + """Process multiple write operations in batch.""" + return self.batch_processor.process_batch_writes(operations) + + def batch_mixed_operations(self, read_operations: List[Tuple], write_operations: List[Tuple]) -> Tuple[Dict, str]: + """Process mixed read and write operations in batch.""" + return self.batch_processor.process_mixed_operations(read_operations, write_operations) + + # ============================================================================ + # Debug/Variable Operations + # ============================================================================ + + def get_var_list(self, indexes: List[int]) -> Tuple[List[int], str]: + """Get variable addresses for indexes.""" + return self.debug_utils.get_var_list(indexes) + + def get_var_size(self, index: int) -> Tuple[int, str]: + """Get variable size.""" + return self.debug_utils.get_var_size(index) + + def get_var_value(self, index: int) -> Tuple[Any, str]: + """Read variable value by index.""" + return self.debug_utils.get_var_value(index) + + def set_var_value(self, index: int, value: Any) -> Tuple[bool, str]: + """Write variable value by index.""" + return self.debug_utils.set_var_value(index, value) + + def get_var_count(self) -> Tuple[int, str]: + """Get total variable count.""" + return self.debug_utils.get_var_count() + + def get_var_info(self, index: int) -> Tuple[Dict, str]: + """Get variable information.""" + return self.debug_utils.get_var_info(index) + + # ============================================================================ + # Configuration Operations + # ============================================================================ + + def get_config_path(self) -> Tuple[str, str]: + """Get configuration file path.""" + return self.config_handler.get_config_path() + + def get_config_file_args_as_map(self) -> Tuple[Dict, str]: + """Parse configuration file as map.""" + return self.config_handler.get_config_as_map()