diff --git a/core/src/drivers/plugins/python/modbus_master/modbus_master_connection.py b/core/src/drivers/plugins/python/modbus_master/modbus_master_connection.py index e9c81a5d..6a87210f 100644 --- a/core/src/drivers/plugins/python/modbus_master/modbus_master_connection.py +++ b/core/src/drivers/plugins/python/modbus_master/modbus_master_connection.py @@ -1,18 +1,47 @@ """Modbus Master plugin connection management utilities.""" import time -from typing import Optional +from typing import Literal, Optional, Union -from pymodbus.client import ModbusTcpClient +from pymodbus.client import ModbusTcpClient, ModbusSerialClient + +TransportType = Literal["tcp", "rtu"] +ParityType = Literal["N", "E", "O"] class ModbusConnectionManager: # pylint: disable=too-many-instance-attributes - """Manages Modbus TCP connections with retry logic.""" + """Manages Modbus TCP and RTU connections with retry logic.""" + + def __init__( + self, + transport: TransportType = "tcp", + # TCP parameters + host: str = "127.0.0.1", + port: int = 502, + # RTU parameters + serial_port: str = "", + baud_rate: int = 9600, + parity: ParityType = "N", + stop_bits: int = 1, + data_bits: int = 8, + # Common parameters + timeout_ms: int = 1000, + slave_id: int = 1, + ): + self.transport = transport + self.timeout = timeout_ms / 1000.0 # Convert to seconds + self.slave_id = slave_id # Unit/Slave ID for Modbus operations - def __init__(self, host: str, port: int, timeout_ms: int): + # TCP configuration self.host = host self.port = port - self.timeout = timeout_ms / 1000.0 # Convert to seconds + + # RTU configuration + self.serial_port = serial_port + self.baud_rate = baud_rate + self.parity = parity + self.stop_bits = stop_bits + self.data_bits = data_bits # Retry configuration self.retry_delay_base = 2.0 # initial delay between attempts (seconds) @@ -21,9 +50,34 @@ def __init__(self, host: str, port: int, timeout_ms: int): # Connection state - is_connected is the authoritative flag for connection health # It is set to False when any error occurs, forcing reconnection on next cycle - self.client: Optional[ModbusTcpClient] = None + self.client: Optional[Union[ModbusTcpClient, ModbusSerialClient]] = None self.is_connected = False + def _create_client(self) -> Union[ModbusTcpClient, ModbusSerialClient]: + """Create the appropriate Modbus client based on transport type.""" + if self.transport == "tcp": + return ModbusTcpClient( + host=self.host, + port=self.port, + timeout=self.timeout + ) + else: # RTU + return ModbusSerialClient( + port=self.serial_port, + baudrate=self.baud_rate, + parity=self.parity, + stopbits=self.stop_bits, + bytesize=self.data_bits, + timeout=self.timeout + ) + + def get_connection_info(self) -> str: + """Return human-readable connection information.""" + if self.transport == "tcp": + return f"TCP {self.host}:{self.port}" + else: + return f"RTU {self.serial_port}@{self.baud_rate}" + def connect_with_retry(self, stop_event=None) -> bool: """ Attempts to connect to Modbus device with infinite retry. @@ -35,6 +89,7 @@ def connect_with_retry(self, stop_event=None) -> bool: True if connected successfully, False if interrupted """ retry_count = 0 + conn_info = self.get_connection_info() while stop_event is None or not stop_event.is_set(): try: @@ -45,30 +100,28 @@ def connect_with_retry(self, stop_event=None) -> bool: self.client.close() except Exception: pass - self.client = ModbusTcpClient( - host=self.host, port=self.port, timeout=self.timeout - ) + self.client = self._create_client() # Attempt to connect if self.client.connect(): print( - f"(PASS) Connected to {self.host}:{self.port} (attempt {retry_count + 1})" + f"(PASS) Connected to {conn_info} (attempt {retry_count + 1})" ) self.is_connected = True self.retry_delay_current = self.retry_delay_base # Reset delay return True except Exception as e: - print(f"(FAIL) Connection attempt {retry_count + 1} failed: {e}") + print(f"(FAIL) Connection attempt {retry_count + 1} to {conn_info} failed: {e}") # Increment counter and calculate delay retry_count += 1 # Attempt logging if retry_count == 1: - print(f"Failed to connect to {self.host}:{self.port}, starting retry attempts...") + print(f"Failed to connect to {conn_info}, starting retry attempts...") elif retry_count % 10 == 0: # Log every 10 attempts - print(f"Connection attempt {retry_count} failed, continuing retries...") + print(f"Connection attempt {retry_count} to {conn_info} failed, continuing retries...") # Wait with increasing delay (limited exponential backoff) delay = min(self.retry_delay_current, self.retry_delay_max) @@ -127,21 +180,23 @@ def mark_disconnected(self): ModbusIOException, etc.) to ensure the connection is properly re-established. """ self.is_connected = False + conn_info = self.get_connection_info() print( - f"Connection to {self.host}:{self.port} marked as disconnected, " + f"Connection to {conn_info} marked as disconnected, " "will reconnect on next cycle" ) def disconnect(self): """Close the connection and clean up resources.""" + conn_info = self.get_connection_info() try: if self.client: self.client.close() self.client = None self.is_connected = False - print(f"Disconnected from {self.host}:{self.port}") + print(f"Disconnected from {conn_info}") except Exception as e: - print(f"(FAIL) Error disconnecting from {self.host}:{self.port}: {e}") + print(f"(FAIL) Error disconnecting from {conn_info}: {e}") def is_healthy(self) -> bool: """ diff --git a/core/src/drivers/plugins/python/modbus_master/modbus_master_plugin.py b/core/src/drivers/plugins/python/modbus_master/modbus_master_plugin.py index b494b70e..9b4300b9 100644 --- a/core/src/drivers/plugins/python/modbus_master/modbus_master_plugin.py +++ b/core/src/drivers/plugins/python/modbus_master/modbus_master_plugin.py @@ -67,16 +67,26 @@ class ModbusSlaveDevice(threading.Thread): + """ + Handles a single Modbus TCP device with its own connection. + For RTU devices, use ModbusRtuBusHandler instead. + """ def __init__(self, device_config: Any, sba: SafeBufferAccess, plugin_logger: PluginLogger): super().__init__(daemon=True) self.device_config = device_config self.sba = sba self.logger = plugin_logger self._stop_event = threading.Event() + + # Create connection manager for TCP device self.connection_manager = ModbusConnectionManager( - device_config.host, device_config.port, device_config.timeout_ms + transport="tcp", + host=device_config.host, + port=device_config.port, + timeout_ms=device_config.timeout_ms, + slave_id=device_config.slave_id ) - self.name = f"ModbusSlave-{device_config.name}-{device_config.host}:{device_config.port}" + self.name = f"ModbusSlave-{device_config.name}-TCP-{device_config.host}:{device_config.port}" # Calculate GCD of all I/O point cycle times for this device self.gcd_cycle_time_ms = calculate_gcd_of_cycle_times(device_config.io_points) @@ -149,22 +159,23 @@ def run(self): # pylint: disable=too-many-locals count = point.length # 1:1 mapping for boolean operations # Perform Modbus read based on function code - # Note: pymodbus 3.x requires count as keyword argument + # Note: pymodbus 3.10+ uses device_id parameter (formerly slave) + # device_id is used for Modbus TCP gateways that forward to RS485 devices if point.fc == 1: # Read Coils response = self.connection_manager.client.read_coils( - address, count=count + address, count=count, device_id=self.connection_manager.slave_id ) elif point.fc == 2: # Read Discrete Inputs response = self.connection_manager.client.read_discrete_inputs( - address, count=count + address, count=count, device_id=self.connection_manager.slave_id ) elif point.fc == 3: # Read Holding Registers response = self.connection_manager.client.read_holding_registers( - address, count=count + address, count=count, device_id=self.connection_manager.slave_id ) elif point.fc == 4: # Read Input Registers response = self.connection_manager.client.read_input_registers( - address, count=count + address, count=count, device_id=self.connection_manager.slave_id ) else: self.logger.warn(f"[{self.name}] Unsupported read FC: {point.fc}") @@ -327,10 +338,12 @@ def run(self): # pylint: disable=too-many-locals continue # Perform Modbus write operation + # Note: pymodbus 3.10+ uses device_id parameter (formerly slave) + # device_id is used for Modbus TCP gateways that forward to RS485 devices if point.fc == 5: # Write Single Coil if len(values_to_write) > 0: response = self.connection_manager.client.write_coil( - address, values_to_write[0] + address, values_to_write[0], device_id=self.connection_manager.slave_id ) else: self.logger.error( @@ -341,7 +354,7 @@ def run(self): # pylint: disable=too-many-locals elif point.fc == 6: # Write Single Register if len(values_to_write) > 0: response = self.connection_manager.client.write_register( - address, values_to_write[0] + address, values_to_write[0], device_id=self.connection_manager.slave_id ) else: self.logger.error( @@ -351,11 +364,11 @@ def run(self): # pylint: disable=too-many-locals continue elif point.fc == 15: # Write Multiple Coils response = self.connection_manager.client.write_coils( - address, values_to_write + address, values_to_write, device_id=self.connection_manager.slave_id ) elif point.fc == 16: # Write Multiple Registers response = self.connection_manager.client.write_registers( - address, values_to_write + address, values_to_write, device_id=self.connection_manager.slave_id ) else: self.logger.warn(f"[{self.name}] Unsupported write FC: {point.fc}") @@ -420,6 +433,434 @@ def stop(self): self._stop_event.set() +class ModbusRtuBusHandler(threading.Thread): + """ + Handles multiple Modbus RTU devices on a single serial port (multi-drop). + One thread per serial bus, managing multiple slave IDs. + + This differs from ModbusSlaveDevice in that: + - A single serial connection is shared by multiple devices + - Each device has a unique slave_id + - IO operations include the slave_id to route to the correct device + """ + + def __init__( + self, + serial_config: dict, # {serial_port, baud_rate, parity, stop_bits, data_bits, timeout_ms} + devices: List[Any], # List of ModbusDeviceConfig on this bus + sba: SafeBufferAccess, + plugin_logger: PluginLogger, + ): + super().__init__(daemon=True) + self.serial_config = serial_config + self.devices = devices + self.sba = sba + self.logger = plugin_logger + self._stop_event = threading.Event() + + # Create single connection for the entire bus + self.connection_manager = ModbusConnectionManager( + transport="rtu", + serial_port=serial_config["serial_port"], + baud_rate=serial_config["baud_rate"], + parity=serial_config["parity"], + stop_bits=serial_config["stop_bits"], + data_bits=serial_config["data_bits"], + timeout_ms=serial_config["timeout_ms"], + slave_id=1, # Default, will use device-specific slave_id per operation + ) + + # Build consolidated IO point list with slave_id + # Each entry: {"point": io_point, "slave_id": device.slave_id, "device_name": device.name} + self.all_io_points = [] + for device in devices: + for point in device.io_points: + self.all_io_points.append({ + "point": point, + "slave_id": device.slave_id, + "device_name": device.name, + }) + + # Calculate GCD of all IO point cycle times across all devices on this bus + all_cycle_times = [p.cycle_time_ms for d in devices for p in d.io_points] + self.gcd_cycle_time_ms = calculate_gcd_of_cycle_times( + [type('obj', (object,), {'cycle_time_ms': ct})() for ct in all_cycle_times] + ) if all_cycle_times else 1000 + + device_names = ", ".join([d.name for d in devices]) + self.name = f"ModbusRtuBus-{serial_config['serial_port']}" + self.logger.info( + f"[{self.name}] RTU bus handler created for devices: {device_names} " + f"({len(self.all_io_points)} total IO points, GCD cycle: {self.gcd_cycle_time_ms}ms)" + ) + + def _ensure_connection(self) -> bool: + """Ensures there is a valid connection, reconnecting if necessary.""" + return self.connection_manager.ensure_connection(self._stop_event) + + def run(self): # pylint: disable=too-many-locals,too-many-branches,too-many-statements + self.logger.info(f"[{self.name}] Thread started for {len(self.devices)} device(s).") + + gcd_cycle_time_seconds = self.gcd_cycle_time_ms / 1000.0 + + if not self.all_io_points: + self.logger.warn(f"[{self.name}] No I/O points defined. Stopping thread.") + return + + # Connect with infinite retry + if not self.connection_manager.connect_with_retry(self._stop_event): + self.logger.info(f"[{self.name}] Thread stopped before connection could be established.") + return + + # Initialize cycle counter + cycle_counter = 0 + + try: + while not self._stop_event.is_set(): + cycle_start_time = time.monotonic() + + # Ensure connection exists before cycle + if not self._ensure_connection(): + break # Thread was interrupted + + # 1. READ OPERATIONS - Process only I/O points that are due for polling this cycle + read_results_to_update = [] # Store tuples: (iec_addr, modbus_data, length) + + for io_entry in self.all_io_points: + if self._stop_event.is_set(): + break + + point = io_entry["point"] + slave_id = io_entry["slave_id"] + + # Skip if not a read operation + if point.fc not in [1, 2, 3, 4]: + continue + + # Check if point should be polled this cycle + point_cycle_multiple = point.cycle_time_ms // self.gcd_cycle_time_ms + if (cycle_counter % point_cycle_multiple) != 0: + continue + + try: + # Parse Modbus offset + address = parse_modbus_offset(point.offset) + + # Calculate the correct number of Modbus registers/coils needed + if point.fc in [3, 4]: # Register-based operations + iec_size = point.iec_location.size + registers_per_iec_element = get_modbus_registers_count_for_iec_size( + iec_size + ) + count = point.length * registers_per_iec_element + else: # Coil/Discrete Input operations + count = point.length + + # Perform Modbus read with device-specific slave_id + if point.fc == 1: # Read Coils + response = self.connection_manager.client.read_coils( + address, count=count, device_id=slave_id + ) + elif point.fc == 2: # Read Discrete Inputs + response = self.connection_manager.client.read_discrete_inputs( + address, count=count, device_id=slave_id + ) + elif point.fc == 3: # Read Holding Registers + response = self.connection_manager.client.read_holding_registers( + address, count=count, device_id=slave_id + ) + elif point.fc == 4: # Read Input Registers + response = self.connection_manager.client.read_input_registers( + address, count=count, device_id=slave_id + ) + else: + self.logger.warn(f"[{self.name}] Unsupported read FC: {point.fc}") + continue + + # Check if response is valid + if isinstance(response, (ModbusIOException, ExceptionResponse)): + self.logger.error( + f"[{self.name}] Modbus read error " + f"(slave {slave_id}, FC {point.fc}, addr {address}): {response}" + ) + self.connection_manager.mark_disconnected() + continue + if response.isError(): + self.logger.error( + f"[{self.name}] Modbus read failed " + f"(slave {slave_id}, FC {point.fc}, addr {address}): {response}" + ) + self.connection_manager.mark_disconnected() + continue + + # Extract data from response + if point.fc in [1, 2]: + modbus_data = response.bits + else: + modbus_data = response.registers + + # Store for batch update + read_results_to_update.append( + (point.iec_location, modbus_data, point.length) + ) + + except ValueError as ve: + self.logger.error( + f"[{self.name}] Invalid offset " + f"'{point.offset}' for slave {slave_id}, FC {point.fc}: {ve}" + ) + except ConnectionException as ce: + self.logger.error( + f"[{self.name}] Connection error reading " + f"slave {slave_id}, FC {point.fc}, offset {point.offset}: {ce}" + ) + self.connection_manager.mark_disconnected() + except Exception as e: + self.logger.error( + f"[{self.name}] Error reading " + f"slave {slave_id}, FC {point.fc}, offset {point.offset}: {e}" + ) + self.connection_manager.mark_disconnected() + + # Batch update IEC buffers with single mutex acquisition + if read_results_to_update: + preconverted_updates = [] + for iec_addr, modbus_data, length in read_results_to_update: + converted_values, details = convert_modbus_data_to_iec_values( + iec_addr, modbus_data, length + ) + if converted_values is not None and details is not None: + preconverted_updates.append((converted_values, details)) + else: + self.logger.error( + f"[{self.name}] Data conversion failed " + f"for IEC address {iec_addr}, length={length}" + ) + + if preconverted_updates: + lock_acquired, lock_msg = self.sba.acquire_mutex() + if lock_acquired: + try: + for converted_values, details in preconverted_updates: + write_preconverted_iec_values( + self.sba, converted_values, details + ) + finally: + self.sba.release_mutex() + else: + self.logger.error( + f"[{self.name}] Failed to acquire mutex " + f"for read updates: {lock_msg}" + ) + + # 2. WRITE OPERATIONS + write_points_due = [] + for io_entry in self.all_io_points: + if self._stop_event.is_set(): + break + + point = io_entry["point"] + slave_id = io_entry["slave_id"] + + if point.fc not in [5, 6, 15, 16]: # Write functions + continue + + point_cycle_multiple = point.cycle_time_ms // self.gcd_cycle_time_ms + if (cycle_counter % point_cycle_multiple) != 0: + continue + + try: + address = parse_modbus_offset(point.offset) + write_points_due.append((point, address, slave_id)) + except ValueError as ve: + self.logger.error( + f"[{self.name}] Invalid offset " + f"'{point.offset}' for slave {slave_id}, FC {point.fc}: {ve}" + ) + + # Phase 2: Read all raw IEC values under a single mutex acquisition + raw_iec_data = [] + if write_points_due: + lock_acquired, lock_msg = self.sba.acquire_mutex() + if lock_acquired: + try: + for point, address, slave_id in write_points_due: + raw_values, details, iec_size = read_raw_iec_values( + self.sba, point.iec_location, point.length + ) + if raw_values is not None: + raw_iec_data.append( + (point, address, slave_id, raw_values, details, iec_size) + ) + else: + self.logger.error( + f"[{self.name}] Failed to read raw IEC data " + f"for write (slave {slave_id}, FC {point.fc}, offset {point.offset})" + ) + finally: + self.sba.release_mutex() + else: + self.logger.error( + f"[{self.name}] Failed to acquire mutex " + f"for batch write prep: {lock_msg}" + ) + + # Phase 3: Convert and perform Modbus writes + for point, address, slave_id, raw_values, details, iec_size in raw_iec_data: + if self._stop_event.is_set(): + break + + try: + values_to_write = convert_raw_iec_to_modbus(raw_values, details, iec_size) + + if values_to_write is None: + self.logger.error( + f"[{self.name}] Failed to convert data " + f"for Modbus write (slave {slave_id}, FC {point.fc}, " + f"offset {point.offset})" + ) + continue + + # Perform Modbus write with device-specific slave_id + if point.fc == 5: # Write Single Coil + if len(values_to_write) > 0: + response = self.connection_manager.client.write_coil( + address, values_to_write[0], device_id=slave_id + ) + else: + self.logger.error( + f"[{self.name}] No data to write " + f"for slave {slave_id}, FC 5, offset {address}" + ) + continue + elif point.fc == 6: # Write Single Register + if len(values_to_write) > 0: + response = self.connection_manager.client.write_register( + address, values_to_write[0], device_id=slave_id + ) + else: + self.logger.error( + f"[{self.name}] No data to write " + f"for slave {slave_id}, FC 6, offset {address}" + ) + continue + elif point.fc == 15: # Write Multiple Coils + response = self.connection_manager.client.write_coils( + address, values_to_write, device_id=slave_id + ) + elif point.fc == 16: # Write Multiple Registers + response = self.connection_manager.client.write_registers( + address, values_to_write, device_id=slave_id + ) + else: + self.logger.warn(f"[{self.name}] Unsupported write FC: {point.fc}") + continue + + # Check write response + if isinstance(response, (ModbusIOException, ExceptionResponse)): + self.logger.error( + f"[{self.name}] Modbus write error " + f"(slave {slave_id}, FC {point.fc}, addr {address}): {response}" + ) + self.connection_manager.mark_disconnected() + elif response.isError(): + self.logger.error( + f"[{self.name}] Modbus write failed " + f"(slave {slave_id}, FC {point.fc}, addr {address}): {response}" + ) + self.connection_manager.mark_disconnected() + + except ConnectionException as ce: + self.logger.error( + f"[{self.name}] Connection error writing " + f"slave {slave_id}, FC {point.fc}, offset {point.offset}: {ce}" + ) + self.connection_manager.mark_disconnected() + except Exception as e: + self.logger.error( + f"[{self.name}] Error writing " + f"slave {slave_id}, FC {point.fc}, offset {point.offset}: {e}" + ) + self.connection_manager.mark_disconnected() + + # 3. CYCLE TIMING + cycle_elapsed = time.monotonic() - cycle_start_time + sleep_duration = max(0, gcd_cycle_time_seconds - cycle_elapsed) + if sleep_duration > 0: + sleep_increment = 0.1 + remaining_sleep = sleep_duration + + while remaining_sleep > 0 and not self._stop_event.is_set(): + actual_sleep = min(sleep_increment, remaining_sleep) + time.sleep(actual_sleep) + remaining_sleep -= actual_sleep + + cycle_counter += 1 + + except ConnectionException as ce: + self.logger.error(f"[{self.name}] Connection failed: {ce}") + self.connection_manager.mark_disconnected() + except Exception as e: + self.logger.error(f"[{self.name}] Unexpected error in thread: {e}") + traceback.print_exc() + finally: + self.connection_manager.disconnect() + self.logger.info(f"[{self.name}] Thread finished and connection closed.") + + def stop(self): + self.logger.info(f"[{self.name}] Stop signal received.") + self._stop_event.set() + + +def group_rtu_devices_by_bus(devices: List[Any]) -> dict: + """ + Group RTU devices by serial port configuration (forming unique "buses"). + + Devices on the same serial port with the same settings share a connection. + Each bus has one thread handling all devices (different slave IDs). + + Args: + devices: List of ModbusDeviceConfig with transport="rtu" + + Returns: + Dictionary keyed by bus identifier: + { + "serial_port:baud_rate:parity:stop_bits:data_bits": { + "config": {serial_port, baud_rate, parity, stop_bits, data_bits, timeout_ms}, + "devices": [list of devices on this bus] + } + } + """ + buses = {} + + for device in devices: + bus_key = ( + f"{device.serial_port}:{device.baud_rate}:" + f"{device.parity}:{device.stop_bits}:{device.data_bits}" + ) + + if bus_key not in buses: + buses[bus_key] = { + "config": { + "serial_port": device.serial_port, + "baud_rate": device.baud_rate, + "parity": device.parity, + "stop_bits": device.stop_bits, + "data_bits": device.data_bits, + "timeout_ms": device.timeout_ms, + }, + "devices": [], + } + + buses[bus_key]["devices"].append(device) + + # Use minimum timeout across all devices on the bus + if device.timeout_ms < buses[bus_key]["config"]["timeout_ms"]: + buses[bus_key]["config"]["timeout_ms"] = device.timeout_ms + + return buses + + def init(args_capsule): """ Initialize the Modbus Master plugin. @@ -477,6 +918,9 @@ def start_loop(): """ Start the main loop for all configured Modbus devices. This function is called after successful initialization. + + TCP devices: One thread per device (ModbusSlaveDevice) + RTU devices: One thread per serial bus (ModbusRtuBusHandler), supporting multi-drop """ # pylint: disable=global-variable-not-assigned global slave_threads, modbus_master_config, safe_buffer_accessor, logger @@ -489,21 +933,53 @@ def start_loop(): logger.error("Plugin not properly initialized") return False - # Start a thread for each configured device - for device_config in modbus_master_config.devices: + # Separate TCP and RTU devices + tcp_devices = [d for d in modbus_master_config.devices if d.transport == "tcp"] + rtu_devices = [d for d in modbus_master_config.devices if d.transport == "rtu"] + + logger.info(f"Found {len(tcp_devices)} TCP device(s) and {len(rtu_devices)} RTU device(s)") + + # Start TCP devices (one thread per device - existing behavior) + for device_config in tcp_devices: try: device_thread = ModbusSlaveDevice(device_config, safe_buffer_accessor, logger) device_thread.start() slave_threads.append(device_thread) logger.info( - f"Started thread for device: {device_config.name} " - f"({device_config.host}:{device_config.port})" + f"Started TCP thread for device: {device_config.name} " + f"({device_config.host}:{device_config.port}, slave_id={device_config.slave_id})" ) except Exception as e: - logger.error(f"Failed to start thread for device {device_config.name}: {e}") + logger.error(f"Failed to start TCP thread for device {device_config.name}: {e}") + + # Group RTU devices by serial port configuration + if rtu_devices: + rtu_buses = group_rtu_devices_by_bus(rtu_devices) + logger.info(f"RTU devices grouped into {len(rtu_buses)} serial bus(es)") + + # Start RTU bus handlers (one thread per serial port) + for bus_key, bus_info in rtu_buses.items(): + try: + bus_thread = ModbusRtuBusHandler( + serial_config=bus_info["config"], + devices=bus_info["devices"], + sba=safe_buffer_accessor, + plugin_logger=logger, + ) + bus_thread.start() + slave_threads.append(bus_thread) + + device_names = ", ".join([d.name for d in bus_info["devices"]]) + slave_ids = ", ".join([str(d.slave_id) for d in bus_info["devices"]]) + logger.info( + f"Started RTU bus thread: {bus_info['config']['serial_port']} " + f"(devices: {device_names}, slave_ids: {slave_ids})" + ) + except Exception as e: + logger.error(f"Failed to start RTU bus thread for {bus_key}: {e}") if slave_threads: - logger.info(f"Successfully started {len(slave_threads)} device thread(s)") + logger.info(f"Successfully started {len(slave_threads)} device/bus thread(s)") return True else: logger.error("No device threads started") diff --git a/core/src/drivers/plugins/python/modbus_master/requirements.txt b/core/src/drivers/plugins/python/modbus_master/requirements.txt index 7b30613c..2c133a10 100644 --- a/core/src/drivers/plugins/python/modbus_master/requirements.txt +++ b/core/src/drivers/plugins/python/modbus_master/requirements.txt @@ -1,3 +1,4 @@ pymodbus==3.11.2 +pyserial>=3.5 asyncio-mqtt==0.16.2 pytest \ No newline at end of file diff --git a/core/src/drivers/plugins/python/shared/plugin_config_decode/modbus_master_config_model.py b/core/src/drivers/plugins/python/shared/plugin_config_decode/modbus_master_config_model.py index e848b1a6..ff0a8942 100644 --- a/core/src/drivers/plugins/python/shared/plugin_config_decode/modbus_master_config_model.py +++ b/core/src/drivers/plugins/python/shared/plugin_config_decode/modbus_master_config_model.py @@ -12,6 +12,8 @@ Area = Literal["I", "Q", "M"] Size = Literal["X", "B", "W", "D", "L"] +TransportType = Literal["tcp", "rtu"] +ParityType = Literal["N", "E", "O"] ADDR_RE = re.compile(r"^%([IQM])([XBWDL])(\d+)(?:\.(\d+))?$", re.IGNORECASE) @@ -65,14 +67,30 @@ def parse_iec_address(s: str) -> IECAddress: class ModbusDeviceConfig: """ Model for a single Modbus device configuration. + Supports both TCP and RTU transport types. """ def __init__(self): self.name: str = "UNDEFINED" self.protocol: str = "MODBUS" self.type: str = "SLAVE" + + # Transport type - "tcp" or "rtu" (defaults to "tcp" for backward compatibility) + self.transport: TransportType = "tcp" + + # TCP-specific fields self.host: str = "127.0.0.1" self.port: int = 502 + + # RTU-specific fields + self.serial_port: str = "" + self.baud_rate: int = 9600 + self.parity: ParityType = "N" + self.stop_bits: int = 1 + self.data_bits: int = 8 + + # Common fields self.timeout_ms: int = 1000 + self.slave_id: int = 1 # Unit/Slave ID (0-255 for TCP gateways, 1-247 for RTU) self.io_points: List['ModbusIoPointConfig'] = [] @classmethod @@ -86,9 +104,24 @@ def from_dict(cls, data: Dict[str, Any]) -> 'ModbusDeviceConfig': config = data.get("config", {}) device.type = config.get("type", "SLAVE") + + # Transport type - defaults to "tcp" for backward compatibility + device.transport = config.get("transport", "tcp") + + # TCP fields device.host = config.get("host", "127.0.0.1") device.port = config.get("port", 502) + + # RTU fields + device.serial_port = config.get("serial_port", "") + device.baud_rate = config.get("baud_rate", 9600) + device.parity = config.get("parity", "N") + device.stop_bits = config.get("stop_bits", 1) + device.data_bits = config.get("data_bits", 8) + + # Common fields device.timeout_ms = config.get("timeout_ms", 1000) + device.slave_id = config.get("slave_id", 1) # Parse I/O points io_points_data = config.get("io_points", []) @@ -103,14 +136,35 @@ def from_dict(cls, data: Dict[str, Any]) -> 'ModbusDeviceConfig': def validate(self) -> None: """Validates the device configuration.""" if self.name == "UNDEFINED": - raise ValueError(f"Device name is undefined for device {self.host}:{self.port}.") + raise ValueError(f"Device name is undefined.") if self.protocol != "MODBUS": raise ValueError(f"Invalid protocol: {self.protocol}. Expected 'MODBUS' for device {self.name}.") - if not isinstance(self.port, int) or self.port <= 0: - raise ValueError(f"Invalid port: {self.port}. Must be a positive integer for device {self.name}.") if not isinstance(self.timeout_ms, int) or self.timeout_ms <= 0: raise ValueError(f"Invalid timeout_ms: {self.timeout_ms}. Must be a positive integer for device {self.name}.") + # Transport-specific validation + if self.transport == "rtu": + if not self.serial_port: + raise ValueError(f"Serial port is required for RTU device '{self.name}'") + if not isinstance(self.slave_id, int) or not (1 <= self.slave_id <= 247): + raise ValueError(f"Slave ID must be 1-247 for RTU device '{self.name}', got {self.slave_id}") + if self.parity not in ("N", "E", "O"): + raise ValueError(f"Invalid parity '{self.parity}' for RTU device '{self.name}'. Must be 'N', 'E', or 'O'.") + if self.stop_bits not in (1, 2): + raise ValueError(f"Stop bits must be 1 or 2 for RTU device '{self.name}', got {self.stop_bits}") + if self.data_bits not in (7, 8): + raise ValueError(f"Data bits must be 7 or 8 for RTU device '{self.name}', got {self.data_bits}") + elif self.transport == "tcp": + if not self.host: + raise ValueError(f"Host is required for TCP device '{self.name}'") + if not isinstance(self.port, int) or not (1 <= self.port <= 65535): + raise ValueError(f"Port must be 1-65535 for TCP device '{self.name}', got {self.port}") + if not isinstance(self.slave_id, int) or not (0 <= self.slave_id <= 255): + raise ValueError(f"Slave ID must be 0-255 for TCP device '{self.name}', got {self.slave_id}") + else: + raise ValueError(f"Invalid transport type '{self.transport}' for device '{self.name}'. Must be 'tcp' or 'rtu'.") + + # Validate IO points for i, point in enumerate(self.io_points): if not isinstance(point, ModbusIoPointConfig): raise ValueError(f"Invalid I/O point {i}: {point}. Must be an instance of ModbusIoPointConfig for device {self.name}.") @@ -126,7 +180,10 @@ def validate(self) -> None: raise ValueError(f"Invalid cycle_time_ms: {point.cycle_time_ms}. Must be a positive integer for device {self.name}, point {i}.") def __repr__(self) -> str: - return f"ModbusDeviceConfig(name='{self.name}', host='{self.host}', port={self.port}, io_points={len(self.io_points)})" + if self.transport == "tcp": + return f"ModbusDeviceConfig(name='{self.name}', transport='tcp', host='{self.host}', port={self.port}, slave_id={self.slave_id}, io_points={len(self.io_points)})" + else: + return f"ModbusDeviceConfig(name='{self.name}', transport='rtu', serial_port='{self.serial_port}', baud_rate={self.baud_rate}, slave_id={self.slave_id}, io_points={len(self.io_points)})" class ModbusMasterConfig(PluginConfigContract): """ @@ -163,23 +220,41 @@ def validate(self) -> None: """Validates the configuration.""" if not self.devices: raise ValueError("No devices configured. At least one Modbus device must be defined.") - + # Validate each device for i, device in enumerate(self.devices): try: device.validate() except Exception as e: raise ValueError(f"Device #{i+1} validation failed: {e}") - + # Check for duplicate device names device_names = [device.name for device in self.devices] if len(device_names) != len(set(device_names)): raise ValueError("Duplicate device names found. Each device must have a unique name.") - - # Check for duplicate host:port combinations - host_port_combinations = [(device.host, device.port) for device in self.devices] + + # Separate TCP and RTU devices for different validation rules + tcp_devices = [d for d in self.devices if d.transport == "tcp"] + rtu_devices = [d for d in self.devices if d.transport == "rtu"] + + # Check for duplicate host:port combinations for TCP devices + host_port_combinations = [(device.host, device.port) for device in tcp_devices] if len(host_port_combinations) != len(set(host_port_combinations)): - raise ValueError("Duplicate host:port combinations found. Each device must have a unique host:port combination.") + raise ValueError("Duplicate host:port combinations found for TCP devices. Each TCP device must have a unique host:port combination.") + + # Check for duplicate slave IDs on the same serial bus for RTU devices + # Group RTU devices by serial bus (serial_port + baud_rate + parity + stop_bits + data_bits) + rtu_buses: Dict[str, List[int]] = {} + for device in rtu_devices: + bus_key = f"{device.serial_port}:{device.baud_rate}:{device.parity}:{device.stop_bits}:{device.data_bits}" + if bus_key not in rtu_buses: + rtu_buses[bus_key] = [] + rtu_buses[bus_key].append(device.slave_id) + + # Check for duplicate slave IDs within each bus + for bus_key, slave_ids in rtu_buses.items(): + if len(slave_ids) != len(set(slave_ids)): + raise ValueError(f"Duplicate slave IDs found on RTU bus '{bus_key}'. Each device on the same serial bus must have a unique slave ID.") def __repr__(self) -> str: return f"{self.__class__.__name__}(devices={len(self.devices)})" diff --git a/requirements.txt b/requirements.txt index 1dd17d45..aae4fd93 100644 --- a/requirements.txt +++ b/requirements.txt @@ -9,3 +9,4 @@ pytest pytest-flask pre-commit psutil; sys_platform != "cygwin" +pyserial>=3.5 diff --git a/webserver/app.py b/webserver/app.py index 59e54424..99a59846 100644 --- a/webserver/app.py +++ b/webserver/app.py @@ -130,6 +130,37 @@ def handle_ping(data: dict) -> dict: return {"status": response} +def handle_list_serial_ports(data: dict) -> dict: + """ + List available serial ports on the system. + + Returns: + { + "ports": [ + {"device": "/dev/ttyUSB0", "description": "USB-Serial Controller"}, + {"device": "/dev/ttyACM0", "description": "Arduino Uno"}, + ... + ] + } + """ + try: + import serial.tools.list_ports + + ports = serial.tools.list_ports.comports() + port_list = [ + { + "device": port.device, + "description": port.description or port.device, + } + for port in ports + ] + return {"ports": port_list} + except ImportError: + return {"error": "pyserial not installed", "ports": []} + except Exception as e: + return {"error": str(e), "ports": []} + + GET_HANDLERS: dict[str, Callable[[dict], dict]] = { "start-plc": handle_start_plc, "stop-plc": handle_stop_plc, @@ -137,6 +168,7 @@ def handle_ping(data: dict) -> dict: "compilation-status": handle_compilation_status, "status": handle_status, "ping": handle_ping, + "serial-ports": handle_list_serial_ports, }