From 189cdb8eb17319ebe8f3721cee8d7e12af9b31b2 Mon Sep 17 00:00:00 2001 From: Thiago Alves Date: Mon, 19 Jan 2026 12:05:41 -0500 Subject: [PATCH 1/4] feat: Add Slave ID support for Modbus TCP master plugin Add Slave ID (Unit ID) support to the Modbus master plugin to enable communication through Modbus TCP gateways that forward messages to RS485 serial networks. The Slave ID is passed to all pymodbus read/write operations via the 'slave' parameter. Changes: - Add slave_id field to ModbusDeviceConfig with validation (0-255) - Parse slave_id from JSON configuration (default 1) - Store slave_id in ModbusConnectionManager - Pass slave parameter to all Modbus operations (FC 1-6, 15, 16) Co-Authored-By: Claude Opus 4.5 --- .../modbus_master/modbus_master_connection.py | 3 ++- .../modbus_master/modbus_master_plugin.py | 20 ++++++++++--------- .../modbus_master_config_model.py | 6 +++++- 3 files changed, 18 insertions(+), 11 deletions(-) diff --git a/core/src/drivers/plugins/python/modbus_master/modbus_master_connection.py b/core/src/drivers/plugins/python/modbus_master/modbus_master_connection.py index e9c81a5d..f1ba0ccb 100644 --- a/core/src/drivers/plugins/python/modbus_master/modbus_master_connection.py +++ b/core/src/drivers/plugins/python/modbus_master/modbus_master_connection.py @@ -9,10 +9,11 @@ class ModbusConnectionManager: # pylint: disable=too-many-instance-attributes """Manages Modbus TCP connections with retry logic.""" - def __init__(self, host: str, port: int, timeout_ms: int): + def __init__(self, host: str, port: int, timeout_ms: int, slave_id: int = 1): self.host = host self.port = port self.timeout = timeout_ms / 1000.0 # Convert to seconds + self.slave_id = slave_id # Unit/Slave ID for Modbus TCP gateways # Retry configuration self.retry_delay_base = 2.0 # initial delay between attempts (seconds) diff --git a/core/src/drivers/plugins/python/modbus_master/modbus_master_plugin.py b/core/src/drivers/plugins/python/modbus_master/modbus_master_plugin.py index b494b70e..32805830 100644 --- a/core/src/drivers/plugins/python/modbus_master/modbus_master_plugin.py +++ b/core/src/drivers/plugins/python/modbus_master/modbus_master_plugin.py @@ -74,7 +74,7 @@ def __init__(self, device_config: Any, sba: SafeBufferAccess, plugin_logger: Plu self.logger = plugin_logger self._stop_event = threading.Event() self.connection_manager = ModbusConnectionManager( - device_config.host, device_config.port, device_config.timeout_ms + device_config.host, device_config.port, device_config.timeout_ms, device_config.slave_id ) self.name = f"ModbusSlave-{device_config.name}-{device_config.host}:{device_config.port}" @@ -150,21 +150,22 @@ def run(self): # pylint: disable=too-many-locals # Perform Modbus read based on function code # Note: pymodbus 3.x requires count as keyword argument + # slave parameter is used for Modbus TCP gateways that forward to RS485 devices if point.fc == 1: # Read Coils response = self.connection_manager.client.read_coils( - address, count=count + address, count=count, slave=self.connection_manager.slave_id ) elif point.fc == 2: # Read Discrete Inputs response = self.connection_manager.client.read_discrete_inputs( - address, count=count + address, count=count, slave=self.connection_manager.slave_id ) elif point.fc == 3: # Read Holding Registers response = self.connection_manager.client.read_holding_registers( - address, count=count + address, count=count, slave=self.connection_manager.slave_id ) elif point.fc == 4: # Read Input Registers response = self.connection_manager.client.read_input_registers( - address, count=count + address, count=count, slave=self.connection_manager.slave_id ) else: self.logger.warn(f"[{self.name}] Unsupported read FC: {point.fc}") @@ -327,10 +328,11 @@ def run(self): # pylint: disable=too-many-locals continue # Perform Modbus write operation + # slave parameter is used for Modbus TCP gateways that forward to RS485 devices if point.fc == 5: # Write Single Coil if len(values_to_write) > 0: response = self.connection_manager.client.write_coil( - address, values_to_write[0] + address, values_to_write[0], slave=self.connection_manager.slave_id ) else: self.logger.error( @@ -341,7 +343,7 @@ def run(self): # pylint: disable=too-many-locals elif point.fc == 6: # Write Single Register if len(values_to_write) > 0: response = self.connection_manager.client.write_register( - address, values_to_write[0] + address, values_to_write[0], slave=self.connection_manager.slave_id ) else: self.logger.error( @@ -351,11 +353,11 @@ def run(self): # pylint: disable=too-many-locals continue elif point.fc == 15: # Write Multiple Coils response = self.connection_manager.client.write_coils( - address, values_to_write + address, values_to_write, slave=self.connection_manager.slave_id ) elif point.fc == 16: # Write Multiple Registers response = self.connection_manager.client.write_registers( - address, values_to_write + address, values_to_write, slave=self.connection_manager.slave_id ) else: self.logger.warn(f"[{self.name}] Unsupported write FC: {point.fc}") diff --git a/core/src/drivers/plugins/python/shared/plugin_config_decode/modbus_master_config_model.py b/core/src/drivers/plugins/python/shared/plugin_config_decode/modbus_master_config_model.py index e848b1a6..b8c63587 100644 --- a/core/src/drivers/plugins/python/shared/plugin_config_decode/modbus_master_config_model.py +++ b/core/src/drivers/plugins/python/shared/plugin_config_decode/modbus_master_config_model.py @@ -73,6 +73,7 @@ def __init__(self): self.host: str = "127.0.0.1" self.port: int = 502 self.timeout_ms: int = 1000 + self.slave_id: int = 1 # Unit/Slave ID for Modbus TCP gateways (0-255) self.io_points: List['ModbusIoPointConfig'] = [] @classmethod @@ -89,6 +90,7 @@ def from_dict(cls, data: Dict[str, Any]) -> 'ModbusDeviceConfig': device.host = config.get("host", "127.0.0.1") device.port = config.get("port", 502) device.timeout_ms = config.get("timeout_ms", 1000) + device.slave_id = config.get("slave_id", 1) # Parse I/O points io_points_data = config.get("io_points", []) @@ -110,6 +112,8 @@ def validate(self) -> None: raise ValueError(f"Invalid port: {self.port}. Must be a positive integer for device {self.name}.") if not isinstance(self.timeout_ms, int) or self.timeout_ms <= 0: raise ValueError(f"Invalid timeout_ms: {self.timeout_ms}. Must be a positive integer for device {self.name}.") + if not isinstance(self.slave_id, int) or not (0 <= self.slave_id <= 255): + raise ValueError(f"Invalid slave_id: {self.slave_id}. Must be an integer between 0 and 255 for device {self.name}.") for i, point in enumerate(self.io_points): if not isinstance(point, ModbusIoPointConfig): @@ -126,7 +130,7 @@ def validate(self) -> None: raise ValueError(f"Invalid cycle_time_ms: {point.cycle_time_ms}. Must be a positive integer for device {self.name}, point {i}.") def __repr__(self) -> str: - return f"ModbusDeviceConfig(name='{self.name}', host='{self.host}', port={self.port}, io_points={len(self.io_points)})" + return f"ModbusDeviceConfig(name='{self.name}', host='{self.host}', port={self.port}, slave_id={self.slave_id}, io_points={len(self.io_points)})" class ModbusMasterConfig(PluginConfigContract): """ From afede961dbe39dd84ed1b8456ca5625023efa076 Mon Sep 17 00:00:00 2001 From: Thiago Alves Date: Mon, 19 Jan 2026 12:28:29 -0500 Subject: [PATCH 2/4] fix: Use device_id parameter for pymodbus 3.10+ pymodbus 3.10+ renamed the 'slave' parameter to 'device_id' for all client read/write methods. Update all Modbus operations to use the correct parameter name. Co-Authored-By: Claude Opus 4.5 --- .../modbus_master/modbus_master_plugin.py | 23 ++++++++++--------- 1 file changed, 12 insertions(+), 11 deletions(-) diff --git a/core/src/drivers/plugins/python/modbus_master/modbus_master_plugin.py b/core/src/drivers/plugins/python/modbus_master/modbus_master_plugin.py index 32805830..ab66a566 100644 --- a/core/src/drivers/plugins/python/modbus_master/modbus_master_plugin.py +++ b/core/src/drivers/plugins/python/modbus_master/modbus_master_plugin.py @@ -149,23 +149,23 @@ def run(self): # pylint: disable=too-many-locals count = point.length # 1:1 mapping for boolean operations # Perform Modbus read based on function code - # Note: pymodbus 3.x requires count as keyword argument - # slave parameter is used for Modbus TCP gateways that forward to RS485 devices + # Note: pymodbus 3.10+ uses device_id parameter (formerly slave) + # device_id is used for Modbus TCP gateways that forward to RS485 devices if point.fc == 1: # Read Coils response = self.connection_manager.client.read_coils( - address, count=count, slave=self.connection_manager.slave_id + address, count=count, device_id=self.connection_manager.slave_id ) elif point.fc == 2: # Read Discrete Inputs response = self.connection_manager.client.read_discrete_inputs( - address, count=count, slave=self.connection_manager.slave_id + address, count=count, device_id=self.connection_manager.slave_id ) elif point.fc == 3: # Read Holding Registers response = self.connection_manager.client.read_holding_registers( - address, count=count, slave=self.connection_manager.slave_id + address, count=count, device_id=self.connection_manager.slave_id ) elif point.fc == 4: # Read Input Registers response = self.connection_manager.client.read_input_registers( - address, count=count, slave=self.connection_manager.slave_id + address, count=count, device_id=self.connection_manager.slave_id ) else: self.logger.warn(f"[{self.name}] Unsupported read FC: {point.fc}") @@ -328,11 +328,12 @@ def run(self): # pylint: disable=too-many-locals continue # Perform Modbus write operation - # slave parameter is used for Modbus TCP gateways that forward to RS485 devices + # Note: pymodbus 3.10+ uses device_id parameter (formerly slave) + # device_id is used for Modbus TCP gateways that forward to RS485 devices if point.fc == 5: # Write Single Coil if len(values_to_write) > 0: response = self.connection_manager.client.write_coil( - address, values_to_write[0], slave=self.connection_manager.slave_id + address, values_to_write[0], device_id=self.connection_manager.slave_id ) else: self.logger.error( @@ -343,7 +344,7 @@ def run(self): # pylint: disable=too-many-locals elif point.fc == 6: # Write Single Register if len(values_to_write) > 0: response = self.connection_manager.client.write_register( - address, values_to_write[0], slave=self.connection_manager.slave_id + address, values_to_write[0], device_id=self.connection_manager.slave_id ) else: self.logger.error( @@ -353,11 +354,11 @@ def run(self): # pylint: disable=too-many-locals continue elif point.fc == 15: # Write Multiple Coils response = self.connection_manager.client.write_coils( - address, values_to_write, slave=self.connection_manager.slave_id + address, values_to_write, device_id=self.connection_manager.slave_id ) elif point.fc == 16: # Write Multiple Registers response = self.connection_manager.client.write_registers( - address, values_to_write, slave=self.connection_manager.slave_id + address, values_to_write, device_id=self.connection_manager.slave_id ) else: self.logger.warn(f"[{self.name}] Unsupported write FC: {point.fc}") From dd21f58bc54f5fed025f7ffcf8d8df533daa9ff2 Mon Sep 17 00:00:00 2001 From: Thiago Alves Date: Tue, 20 Jan 2026 22:18:43 -0500 Subject: [PATCH 3/4] feat: Add Modbus RTU support with multi-drop architecture This commit adds Modbus RTU (serial) communication support to the modbus master plugin, enabling serial communication with Modbus slave devices alongside existing TCP functionality. Key changes: 1. Serial ports REST endpoint (webserver/app.py): - New GET endpoint at /api/serial-ports - Lists available serial ports using pyserial 2. Connection manager refactoring (modbus_master_connection.py): - Support for both TCP and RTU transport types - Factory method _create_client() for transport-specific clients - RTU parameters: serial_port, baud_rate, parity, stop_bits, data_bits 3. Config model updates (modbus_master_config_model.py): - TransportType and ParityType type literals - RTU-specific fields in ModbusDeviceConfig - Transport-specific validation rules - Multi-drop validation (duplicate slave IDs on same bus) 4. Plugin updates (modbus_master_plugin.py): - New ModbusRtuBusHandler class for multi-drop RTU support - One thread per serial bus, handling multiple slave IDs - group_rtu_devices_by_bus() helper function - Updated start_loop() to separate TCP and RTU device handling 5. Dependencies (requirements.txt): - Added pyserial>=3.5 Multi-drop RTU architecture: - Devices on same serial port share a single connection - Each device has unique slave_id (1-247) - IO operations include slave_id to route to correct device - Bus grouping key: serial_port:baud_rate:parity:stop_bits:data_bits Backward compatibility: - Existing TCP configurations work without changes - Missing transport field defaults to "tcp" Co-Authored-By: Claude Opus 4.5 --- .../modbus_master/modbus_master_connection.py | 88 +++- .../modbus_master/modbus_master_plugin.py | 489 +++++++++++++++++- .../python/modbus_master/requirements.txt | 1 + .../modbus_master_config_model.py | 97 +++- webserver/app.py | 32 ++ 5 files changed, 669 insertions(+), 38 deletions(-) diff --git a/core/src/drivers/plugins/python/modbus_master/modbus_master_connection.py b/core/src/drivers/plugins/python/modbus_master/modbus_master_connection.py index f1ba0ccb..6a87210f 100644 --- a/core/src/drivers/plugins/python/modbus_master/modbus_master_connection.py +++ b/core/src/drivers/plugins/python/modbus_master/modbus_master_connection.py @@ -1,19 +1,47 @@ """Modbus Master plugin connection management utilities.""" import time -from typing import Optional +from typing import Literal, Optional, Union -from pymodbus.client import ModbusTcpClient +from pymodbus.client import ModbusTcpClient, ModbusSerialClient + +TransportType = Literal["tcp", "rtu"] +ParityType = Literal["N", "E", "O"] class ModbusConnectionManager: # pylint: disable=too-many-instance-attributes - """Manages Modbus TCP connections with retry logic.""" + """Manages Modbus TCP and RTU connections with retry logic.""" + + def __init__( + self, + transport: TransportType = "tcp", + # TCP parameters + host: str = "127.0.0.1", + port: int = 502, + # RTU parameters + serial_port: str = "", + baud_rate: int = 9600, + parity: ParityType = "N", + stop_bits: int = 1, + data_bits: int = 8, + # Common parameters + timeout_ms: int = 1000, + slave_id: int = 1, + ): + self.transport = transport + self.timeout = timeout_ms / 1000.0 # Convert to seconds + self.slave_id = slave_id # Unit/Slave ID for Modbus operations - def __init__(self, host: str, port: int, timeout_ms: int, slave_id: int = 1): + # TCP configuration self.host = host self.port = port - self.timeout = timeout_ms / 1000.0 # Convert to seconds - self.slave_id = slave_id # Unit/Slave ID for Modbus TCP gateways + + # RTU configuration + self.serial_port = serial_port + self.baud_rate = baud_rate + self.parity = parity + self.stop_bits = stop_bits + self.data_bits = data_bits # Retry configuration self.retry_delay_base = 2.0 # initial delay between attempts (seconds) @@ -22,9 +50,34 @@ def __init__(self, host: str, port: int, timeout_ms: int, slave_id: int = 1): # Connection state - is_connected is the authoritative flag for connection health # It is set to False when any error occurs, forcing reconnection on next cycle - self.client: Optional[ModbusTcpClient] = None + self.client: Optional[Union[ModbusTcpClient, ModbusSerialClient]] = None self.is_connected = False + def _create_client(self) -> Union[ModbusTcpClient, ModbusSerialClient]: + """Create the appropriate Modbus client based on transport type.""" + if self.transport == "tcp": + return ModbusTcpClient( + host=self.host, + port=self.port, + timeout=self.timeout + ) + else: # RTU + return ModbusSerialClient( + port=self.serial_port, + baudrate=self.baud_rate, + parity=self.parity, + stopbits=self.stop_bits, + bytesize=self.data_bits, + timeout=self.timeout + ) + + def get_connection_info(self) -> str: + """Return human-readable connection information.""" + if self.transport == "tcp": + return f"TCP {self.host}:{self.port}" + else: + return f"RTU {self.serial_port}@{self.baud_rate}" + def connect_with_retry(self, stop_event=None) -> bool: """ Attempts to connect to Modbus device with infinite retry. @@ -36,6 +89,7 @@ def connect_with_retry(self, stop_event=None) -> bool: True if connected successfully, False if interrupted """ retry_count = 0 + conn_info = self.get_connection_info() while stop_event is None or not stop_event.is_set(): try: @@ -46,30 +100,28 @@ def connect_with_retry(self, stop_event=None) -> bool: self.client.close() except Exception: pass - self.client = ModbusTcpClient( - host=self.host, port=self.port, timeout=self.timeout - ) + self.client = self._create_client() # Attempt to connect if self.client.connect(): print( - f"(PASS) Connected to {self.host}:{self.port} (attempt {retry_count + 1})" + f"(PASS) Connected to {conn_info} (attempt {retry_count + 1})" ) self.is_connected = True self.retry_delay_current = self.retry_delay_base # Reset delay return True except Exception as e: - print(f"(FAIL) Connection attempt {retry_count + 1} failed: {e}") + print(f"(FAIL) Connection attempt {retry_count + 1} to {conn_info} failed: {e}") # Increment counter and calculate delay retry_count += 1 # Attempt logging if retry_count == 1: - print(f"Failed to connect to {self.host}:{self.port}, starting retry attempts...") + print(f"Failed to connect to {conn_info}, starting retry attempts...") elif retry_count % 10 == 0: # Log every 10 attempts - print(f"Connection attempt {retry_count} failed, continuing retries...") + print(f"Connection attempt {retry_count} to {conn_info} failed, continuing retries...") # Wait with increasing delay (limited exponential backoff) delay = min(self.retry_delay_current, self.retry_delay_max) @@ -128,21 +180,23 @@ def mark_disconnected(self): ModbusIOException, etc.) to ensure the connection is properly re-established. """ self.is_connected = False + conn_info = self.get_connection_info() print( - f"Connection to {self.host}:{self.port} marked as disconnected, " + f"Connection to {conn_info} marked as disconnected, " "will reconnect on next cycle" ) def disconnect(self): """Close the connection and clean up resources.""" + conn_info = self.get_connection_info() try: if self.client: self.client.close() self.client = None self.is_connected = False - print(f"Disconnected from {self.host}:{self.port}") + print(f"Disconnected from {conn_info}") except Exception as e: - print(f"(FAIL) Error disconnecting from {self.host}:{self.port}: {e}") + print(f"(FAIL) Error disconnecting from {conn_info}: {e}") def is_healthy(self) -> bool: """ diff --git a/core/src/drivers/plugins/python/modbus_master/modbus_master_plugin.py b/core/src/drivers/plugins/python/modbus_master/modbus_master_plugin.py index ab66a566..9b4300b9 100644 --- a/core/src/drivers/plugins/python/modbus_master/modbus_master_plugin.py +++ b/core/src/drivers/plugins/python/modbus_master/modbus_master_plugin.py @@ -67,16 +67,26 @@ class ModbusSlaveDevice(threading.Thread): + """ + Handles a single Modbus TCP device with its own connection. + For RTU devices, use ModbusRtuBusHandler instead. + """ def __init__(self, device_config: Any, sba: SafeBufferAccess, plugin_logger: PluginLogger): super().__init__(daemon=True) self.device_config = device_config self.sba = sba self.logger = plugin_logger self._stop_event = threading.Event() + + # Create connection manager for TCP device self.connection_manager = ModbusConnectionManager( - device_config.host, device_config.port, device_config.timeout_ms, device_config.slave_id + transport="tcp", + host=device_config.host, + port=device_config.port, + timeout_ms=device_config.timeout_ms, + slave_id=device_config.slave_id ) - self.name = f"ModbusSlave-{device_config.name}-{device_config.host}:{device_config.port}" + self.name = f"ModbusSlave-{device_config.name}-TCP-{device_config.host}:{device_config.port}" # Calculate GCD of all I/O point cycle times for this device self.gcd_cycle_time_ms = calculate_gcd_of_cycle_times(device_config.io_points) @@ -423,6 +433,434 @@ def stop(self): self._stop_event.set() +class ModbusRtuBusHandler(threading.Thread): + """ + Handles multiple Modbus RTU devices on a single serial port (multi-drop). + One thread per serial bus, managing multiple slave IDs. + + This differs from ModbusSlaveDevice in that: + - A single serial connection is shared by multiple devices + - Each device has a unique slave_id + - IO operations include the slave_id to route to the correct device + """ + + def __init__( + self, + serial_config: dict, # {serial_port, baud_rate, parity, stop_bits, data_bits, timeout_ms} + devices: List[Any], # List of ModbusDeviceConfig on this bus + sba: SafeBufferAccess, + plugin_logger: PluginLogger, + ): + super().__init__(daemon=True) + self.serial_config = serial_config + self.devices = devices + self.sba = sba + self.logger = plugin_logger + self._stop_event = threading.Event() + + # Create single connection for the entire bus + self.connection_manager = ModbusConnectionManager( + transport="rtu", + serial_port=serial_config["serial_port"], + baud_rate=serial_config["baud_rate"], + parity=serial_config["parity"], + stop_bits=serial_config["stop_bits"], + data_bits=serial_config["data_bits"], + timeout_ms=serial_config["timeout_ms"], + slave_id=1, # Default, will use device-specific slave_id per operation + ) + + # Build consolidated IO point list with slave_id + # Each entry: {"point": io_point, "slave_id": device.slave_id, "device_name": device.name} + self.all_io_points = [] + for device in devices: + for point in device.io_points: + self.all_io_points.append({ + "point": point, + "slave_id": device.slave_id, + "device_name": device.name, + }) + + # Calculate GCD of all IO point cycle times across all devices on this bus + all_cycle_times = [p.cycle_time_ms for d in devices for p in d.io_points] + self.gcd_cycle_time_ms = calculate_gcd_of_cycle_times( + [type('obj', (object,), {'cycle_time_ms': ct})() for ct in all_cycle_times] + ) if all_cycle_times else 1000 + + device_names = ", ".join([d.name for d in devices]) + self.name = f"ModbusRtuBus-{serial_config['serial_port']}" + self.logger.info( + f"[{self.name}] RTU bus handler created for devices: {device_names} " + f"({len(self.all_io_points)} total IO points, GCD cycle: {self.gcd_cycle_time_ms}ms)" + ) + + def _ensure_connection(self) -> bool: + """Ensures there is a valid connection, reconnecting if necessary.""" + return self.connection_manager.ensure_connection(self._stop_event) + + def run(self): # pylint: disable=too-many-locals,too-many-branches,too-many-statements + self.logger.info(f"[{self.name}] Thread started for {len(self.devices)} device(s).") + + gcd_cycle_time_seconds = self.gcd_cycle_time_ms / 1000.0 + + if not self.all_io_points: + self.logger.warn(f"[{self.name}] No I/O points defined. Stopping thread.") + return + + # Connect with infinite retry + if not self.connection_manager.connect_with_retry(self._stop_event): + self.logger.info(f"[{self.name}] Thread stopped before connection could be established.") + return + + # Initialize cycle counter + cycle_counter = 0 + + try: + while not self._stop_event.is_set(): + cycle_start_time = time.monotonic() + + # Ensure connection exists before cycle + if not self._ensure_connection(): + break # Thread was interrupted + + # 1. READ OPERATIONS - Process only I/O points that are due for polling this cycle + read_results_to_update = [] # Store tuples: (iec_addr, modbus_data, length) + + for io_entry in self.all_io_points: + if self._stop_event.is_set(): + break + + point = io_entry["point"] + slave_id = io_entry["slave_id"] + + # Skip if not a read operation + if point.fc not in [1, 2, 3, 4]: + continue + + # Check if point should be polled this cycle + point_cycle_multiple = point.cycle_time_ms // self.gcd_cycle_time_ms + if (cycle_counter % point_cycle_multiple) != 0: + continue + + try: + # Parse Modbus offset + address = parse_modbus_offset(point.offset) + + # Calculate the correct number of Modbus registers/coils needed + if point.fc in [3, 4]: # Register-based operations + iec_size = point.iec_location.size + registers_per_iec_element = get_modbus_registers_count_for_iec_size( + iec_size + ) + count = point.length * registers_per_iec_element + else: # Coil/Discrete Input operations + count = point.length + + # Perform Modbus read with device-specific slave_id + if point.fc == 1: # Read Coils + response = self.connection_manager.client.read_coils( + address, count=count, device_id=slave_id + ) + elif point.fc == 2: # Read Discrete Inputs + response = self.connection_manager.client.read_discrete_inputs( + address, count=count, device_id=slave_id + ) + elif point.fc == 3: # Read Holding Registers + response = self.connection_manager.client.read_holding_registers( + address, count=count, device_id=slave_id + ) + elif point.fc == 4: # Read Input Registers + response = self.connection_manager.client.read_input_registers( + address, count=count, device_id=slave_id + ) + else: + self.logger.warn(f"[{self.name}] Unsupported read FC: {point.fc}") + continue + + # Check if response is valid + if isinstance(response, (ModbusIOException, ExceptionResponse)): + self.logger.error( + f"[{self.name}] Modbus read error " + f"(slave {slave_id}, FC {point.fc}, addr {address}): {response}" + ) + self.connection_manager.mark_disconnected() + continue + if response.isError(): + self.logger.error( + f"[{self.name}] Modbus read failed " + f"(slave {slave_id}, FC {point.fc}, addr {address}): {response}" + ) + self.connection_manager.mark_disconnected() + continue + + # Extract data from response + if point.fc in [1, 2]: + modbus_data = response.bits + else: + modbus_data = response.registers + + # Store for batch update + read_results_to_update.append( + (point.iec_location, modbus_data, point.length) + ) + + except ValueError as ve: + self.logger.error( + f"[{self.name}] Invalid offset " + f"'{point.offset}' for slave {slave_id}, FC {point.fc}: {ve}" + ) + except ConnectionException as ce: + self.logger.error( + f"[{self.name}] Connection error reading " + f"slave {slave_id}, FC {point.fc}, offset {point.offset}: {ce}" + ) + self.connection_manager.mark_disconnected() + except Exception as e: + self.logger.error( + f"[{self.name}] Error reading " + f"slave {slave_id}, FC {point.fc}, offset {point.offset}: {e}" + ) + self.connection_manager.mark_disconnected() + + # Batch update IEC buffers with single mutex acquisition + if read_results_to_update: + preconverted_updates = [] + for iec_addr, modbus_data, length in read_results_to_update: + converted_values, details = convert_modbus_data_to_iec_values( + iec_addr, modbus_data, length + ) + if converted_values is not None and details is not None: + preconverted_updates.append((converted_values, details)) + else: + self.logger.error( + f"[{self.name}] Data conversion failed " + f"for IEC address {iec_addr}, length={length}" + ) + + if preconverted_updates: + lock_acquired, lock_msg = self.sba.acquire_mutex() + if lock_acquired: + try: + for converted_values, details in preconverted_updates: + write_preconverted_iec_values( + self.sba, converted_values, details + ) + finally: + self.sba.release_mutex() + else: + self.logger.error( + f"[{self.name}] Failed to acquire mutex " + f"for read updates: {lock_msg}" + ) + + # 2. WRITE OPERATIONS + write_points_due = [] + for io_entry in self.all_io_points: + if self._stop_event.is_set(): + break + + point = io_entry["point"] + slave_id = io_entry["slave_id"] + + if point.fc not in [5, 6, 15, 16]: # Write functions + continue + + point_cycle_multiple = point.cycle_time_ms // self.gcd_cycle_time_ms + if (cycle_counter % point_cycle_multiple) != 0: + continue + + try: + address = parse_modbus_offset(point.offset) + write_points_due.append((point, address, slave_id)) + except ValueError as ve: + self.logger.error( + f"[{self.name}] Invalid offset " + f"'{point.offset}' for slave {slave_id}, FC {point.fc}: {ve}" + ) + + # Phase 2: Read all raw IEC values under a single mutex acquisition + raw_iec_data = [] + if write_points_due: + lock_acquired, lock_msg = self.sba.acquire_mutex() + if lock_acquired: + try: + for point, address, slave_id in write_points_due: + raw_values, details, iec_size = read_raw_iec_values( + self.sba, point.iec_location, point.length + ) + if raw_values is not None: + raw_iec_data.append( + (point, address, slave_id, raw_values, details, iec_size) + ) + else: + self.logger.error( + f"[{self.name}] Failed to read raw IEC data " + f"for write (slave {slave_id}, FC {point.fc}, offset {point.offset})" + ) + finally: + self.sba.release_mutex() + else: + self.logger.error( + f"[{self.name}] Failed to acquire mutex " + f"for batch write prep: {lock_msg}" + ) + + # Phase 3: Convert and perform Modbus writes + for point, address, slave_id, raw_values, details, iec_size in raw_iec_data: + if self._stop_event.is_set(): + break + + try: + values_to_write = convert_raw_iec_to_modbus(raw_values, details, iec_size) + + if values_to_write is None: + self.logger.error( + f"[{self.name}] Failed to convert data " + f"for Modbus write (slave {slave_id}, FC {point.fc}, " + f"offset {point.offset})" + ) + continue + + # Perform Modbus write with device-specific slave_id + if point.fc == 5: # Write Single Coil + if len(values_to_write) > 0: + response = self.connection_manager.client.write_coil( + address, values_to_write[0], device_id=slave_id + ) + else: + self.logger.error( + f"[{self.name}] No data to write " + f"for slave {slave_id}, FC 5, offset {address}" + ) + continue + elif point.fc == 6: # Write Single Register + if len(values_to_write) > 0: + response = self.connection_manager.client.write_register( + address, values_to_write[0], device_id=slave_id + ) + else: + self.logger.error( + f"[{self.name}] No data to write " + f"for slave {slave_id}, FC 6, offset {address}" + ) + continue + elif point.fc == 15: # Write Multiple Coils + response = self.connection_manager.client.write_coils( + address, values_to_write, device_id=slave_id + ) + elif point.fc == 16: # Write Multiple Registers + response = self.connection_manager.client.write_registers( + address, values_to_write, device_id=slave_id + ) + else: + self.logger.warn(f"[{self.name}] Unsupported write FC: {point.fc}") + continue + + # Check write response + if isinstance(response, (ModbusIOException, ExceptionResponse)): + self.logger.error( + f"[{self.name}] Modbus write error " + f"(slave {slave_id}, FC {point.fc}, addr {address}): {response}" + ) + self.connection_manager.mark_disconnected() + elif response.isError(): + self.logger.error( + f"[{self.name}] Modbus write failed " + f"(slave {slave_id}, FC {point.fc}, addr {address}): {response}" + ) + self.connection_manager.mark_disconnected() + + except ConnectionException as ce: + self.logger.error( + f"[{self.name}] Connection error writing " + f"slave {slave_id}, FC {point.fc}, offset {point.offset}: {ce}" + ) + self.connection_manager.mark_disconnected() + except Exception as e: + self.logger.error( + f"[{self.name}] Error writing " + f"slave {slave_id}, FC {point.fc}, offset {point.offset}: {e}" + ) + self.connection_manager.mark_disconnected() + + # 3. CYCLE TIMING + cycle_elapsed = time.monotonic() - cycle_start_time + sleep_duration = max(0, gcd_cycle_time_seconds - cycle_elapsed) + if sleep_duration > 0: + sleep_increment = 0.1 + remaining_sleep = sleep_duration + + while remaining_sleep > 0 and not self._stop_event.is_set(): + actual_sleep = min(sleep_increment, remaining_sleep) + time.sleep(actual_sleep) + remaining_sleep -= actual_sleep + + cycle_counter += 1 + + except ConnectionException as ce: + self.logger.error(f"[{self.name}] Connection failed: {ce}") + self.connection_manager.mark_disconnected() + except Exception as e: + self.logger.error(f"[{self.name}] Unexpected error in thread: {e}") + traceback.print_exc() + finally: + self.connection_manager.disconnect() + self.logger.info(f"[{self.name}] Thread finished and connection closed.") + + def stop(self): + self.logger.info(f"[{self.name}] Stop signal received.") + self._stop_event.set() + + +def group_rtu_devices_by_bus(devices: List[Any]) -> dict: + """ + Group RTU devices by serial port configuration (forming unique "buses"). + + Devices on the same serial port with the same settings share a connection. + Each bus has one thread handling all devices (different slave IDs). + + Args: + devices: List of ModbusDeviceConfig with transport="rtu" + + Returns: + Dictionary keyed by bus identifier: + { + "serial_port:baud_rate:parity:stop_bits:data_bits": { + "config": {serial_port, baud_rate, parity, stop_bits, data_bits, timeout_ms}, + "devices": [list of devices on this bus] + } + } + """ + buses = {} + + for device in devices: + bus_key = ( + f"{device.serial_port}:{device.baud_rate}:" + f"{device.parity}:{device.stop_bits}:{device.data_bits}" + ) + + if bus_key not in buses: + buses[bus_key] = { + "config": { + "serial_port": device.serial_port, + "baud_rate": device.baud_rate, + "parity": device.parity, + "stop_bits": device.stop_bits, + "data_bits": device.data_bits, + "timeout_ms": device.timeout_ms, + }, + "devices": [], + } + + buses[bus_key]["devices"].append(device) + + # Use minimum timeout across all devices on the bus + if device.timeout_ms < buses[bus_key]["config"]["timeout_ms"]: + buses[bus_key]["config"]["timeout_ms"] = device.timeout_ms + + return buses + + def init(args_capsule): """ Initialize the Modbus Master plugin. @@ -480,6 +918,9 @@ def start_loop(): """ Start the main loop for all configured Modbus devices. This function is called after successful initialization. + + TCP devices: One thread per device (ModbusSlaveDevice) + RTU devices: One thread per serial bus (ModbusRtuBusHandler), supporting multi-drop """ # pylint: disable=global-variable-not-assigned global slave_threads, modbus_master_config, safe_buffer_accessor, logger @@ -492,21 +933,53 @@ def start_loop(): logger.error("Plugin not properly initialized") return False - # Start a thread for each configured device - for device_config in modbus_master_config.devices: + # Separate TCP and RTU devices + tcp_devices = [d for d in modbus_master_config.devices if d.transport == "tcp"] + rtu_devices = [d for d in modbus_master_config.devices if d.transport == "rtu"] + + logger.info(f"Found {len(tcp_devices)} TCP device(s) and {len(rtu_devices)} RTU device(s)") + + # Start TCP devices (one thread per device - existing behavior) + for device_config in tcp_devices: try: device_thread = ModbusSlaveDevice(device_config, safe_buffer_accessor, logger) device_thread.start() slave_threads.append(device_thread) logger.info( - f"Started thread for device: {device_config.name} " - f"({device_config.host}:{device_config.port})" + f"Started TCP thread for device: {device_config.name} " + f"({device_config.host}:{device_config.port}, slave_id={device_config.slave_id})" ) except Exception as e: - logger.error(f"Failed to start thread for device {device_config.name}: {e}") + logger.error(f"Failed to start TCP thread for device {device_config.name}: {e}") + + # Group RTU devices by serial port configuration + if rtu_devices: + rtu_buses = group_rtu_devices_by_bus(rtu_devices) + logger.info(f"RTU devices grouped into {len(rtu_buses)} serial bus(es)") + + # Start RTU bus handlers (one thread per serial port) + for bus_key, bus_info in rtu_buses.items(): + try: + bus_thread = ModbusRtuBusHandler( + serial_config=bus_info["config"], + devices=bus_info["devices"], + sba=safe_buffer_accessor, + plugin_logger=logger, + ) + bus_thread.start() + slave_threads.append(bus_thread) + + device_names = ", ".join([d.name for d in bus_info["devices"]]) + slave_ids = ", ".join([str(d.slave_id) for d in bus_info["devices"]]) + logger.info( + f"Started RTU bus thread: {bus_info['config']['serial_port']} " + f"(devices: {device_names}, slave_ids: {slave_ids})" + ) + except Exception as e: + logger.error(f"Failed to start RTU bus thread for {bus_key}: {e}") if slave_threads: - logger.info(f"Successfully started {len(slave_threads)} device thread(s)") + logger.info(f"Successfully started {len(slave_threads)} device/bus thread(s)") return True else: logger.error("No device threads started") diff --git a/core/src/drivers/plugins/python/modbus_master/requirements.txt b/core/src/drivers/plugins/python/modbus_master/requirements.txt index 7b30613c..2c133a10 100644 --- a/core/src/drivers/plugins/python/modbus_master/requirements.txt +++ b/core/src/drivers/plugins/python/modbus_master/requirements.txt @@ -1,3 +1,4 @@ pymodbus==3.11.2 +pyserial>=3.5 asyncio-mqtt==0.16.2 pytest \ No newline at end of file diff --git a/core/src/drivers/plugins/python/shared/plugin_config_decode/modbus_master_config_model.py b/core/src/drivers/plugins/python/shared/plugin_config_decode/modbus_master_config_model.py index b8c63587..ff0a8942 100644 --- a/core/src/drivers/plugins/python/shared/plugin_config_decode/modbus_master_config_model.py +++ b/core/src/drivers/plugins/python/shared/plugin_config_decode/modbus_master_config_model.py @@ -12,6 +12,8 @@ Area = Literal["I", "Q", "M"] Size = Literal["X", "B", "W", "D", "L"] +TransportType = Literal["tcp", "rtu"] +ParityType = Literal["N", "E", "O"] ADDR_RE = re.compile(r"^%([IQM])([XBWDL])(\d+)(?:\.(\d+))?$", re.IGNORECASE) @@ -65,15 +67,30 @@ def parse_iec_address(s: str) -> IECAddress: class ModbusDeviceConfig: """ Model for a single Modbus device configuration. + Supports both TCP and RTU transport types. """ def __init__(self): self.name: str = "UNDEFINED" self.protocol: str = "MODBUS" self.type: str = "SLAVE" + + # Transport type - "tcp" or "rtu" (defaults to "tcp" for backward compatibility) + self.transport: TransportType = "tcp" + + # TCP-specific fields self.host: str = "127.0.0.1" self.port: int = 502 + + # RTU-specific fields + self.serial_port: str = "" + self.baud_rate: int = 9600 + self.parity: ParityType = "N" + self.stop_bits: int = 1 + self.data_bits: int = 8 + + # Common fields self.timeout_ms: int = 1000 - self.slave_id: int = 1 # Unit/Slave ID for Modbus TCP gateways (0-255) + self.slave_id: int = 1 # Unit/Slave ID (0-255 for TCP gateways, 1-247 for RTU) self.io_points: List['ModbusIoPointConfig'] = [] @classmethod @@ -87,8 +104,22 @@ def from_dict(cls, data: Dict[str, Any]) -> 'ModbusDeviceConfig': config = data.get("config", {}) device.type = config.get("type", "SLAVE") + + # Transport type - defaults to "tcp" for backward compatibility + device.transport = config.get("transport", "tcp") + + # TCP fields device.host = config.get("host", "127.0.0.1") device.port = config.get("port", 502) + + # RTU fields + device.serial_port = config.get("serial_port", "") + device.baud_rate = config.get("baud_rate", 9600) + device.parity = config.get("parity", "N") + device.stop_bits = config.get("stop_bits", 1) + device.data_bits = config.get("data_bits", 8) + + # Common fields device.timeout_ms = config.get("timeout_ms", 1000) device.slave_id = config.get("slave_id", 1) @@ -105,16 +136,35 @@ def from_dict(cls, data: Dict[str, Any]) -> 'ModbusDeviceConfig': def validate(self) -> None: """Validates the device configuration.""" if self.name == "UNDEFINED": - raise ValueError(f"Device name is undefined for device {self.host}:{self.port}.") + raise ValueError(f"Device name is undefined.") if self.protocol != "MODBUS": raise ValueError(f"Invalid protocol: {self.protocol}. Expected 'MODBUS' for device {self.name}.") - if not isinstance(self.port, int) or self.port <= 0: - raise ValueError(f"Invalid port: {self.port}. Must be a positive integer for device {self.name}.") if not isinstance(self.timeout_ms, int) or self.timeout_ms <= 0: raise ValueError(f"Invalid timeout_ms: {self.timeout_ms}. Must be a positive integer for device {self.name}.") - if not isinstance(self.slave_id, int) or not (0 <= self.slave_id <= 255): - raise ValueError(f"Invalid slave_id: {self.slave_id}. Must be an integer between 0 and 255 for device {self.name}.") + # Transport-specific validation + if self.transport == "rtu": + if not self.serial_port: + raise ValueError(f"Serial port is required for RTU device '{self.name}'") + if not isinstance(self.slave_id, int) or not (1 <= self.slave_id <= 247): + raise ValueError(f"Slave ID must be 1-247 for RTU device '{self.name}', got {self.slave_id}") + if self.parity not in ("N", "E", "O"): + raise ValueError(f"Invalid parity '{self.parity}' for RTU device '{self.name}'. Must be 'N', 'E', or 'O'.") + if self.stop_bits not in (1, 2): + raise ValueError(f"Stop bits must be 1 or 2 for RTU device '{self.name}', got {self.stop_bits}") + if self.data_bits not in (7, 8): + raise ValueError(f"Data bits must be 7 or 8 for RTU device '{self.name}', got {self.data_bits}") + elif self.transport == "tcp": + if not self.host: + raise ValueError(f"Host is required for TCP device '{self.name}'") + if not isinstance(self.port, int) or not (1 <= self.port <= 65535): + raise ValueError(f"Port must be 1-65535 for TCP device '{self.name}', got {self.port}") + if not isinstance(self.slave_id, int) or not (0 <= self.slave_id <= 255): + raise ValueError(f"Slave ID must be 0-255 for TCP device '{self.name}', got {self.slave_id}") + else: + raise ValueError(f"Invalid transport type '{self.transport}' for device '{self.name}'. Must be 'tcp' or 'rtu'.") + + # Validate IO points for i, point in enumerate(self.io_points): if not isinstance(point, ModbusIoPointConfig): raise ValueError(f"Invalid I/O point {i}: {point}. Must be an instance of ModbusIoPointConfig for device {self.name}.") @@ -130,7 +180,10 @@ def validate(self) -> None: raise ValueError(f"Invalid cycle_time_ms: {point.cycle_time_ms}. Must be a positive integer for device {self.name}, point {i}.") def __repr__(self) -> str: - return f"ModbusDeviceConfig(name='{self.name}', host='{self.host}', port={self.port}, slave_id={self.slave_id}, io_points={len(self.io_points)})" + if self.transport == "tcp": + return f"ModbusDeviceConfig(name='{self.name}', transport='tcp', host='{self.host}', port={self.port}, slave_id={self.slave_id}, io_points={len(self.io_points)})" + else: + return f"ModbusDeviceConfig(name='{self.name}', transport='rtu', serial_port='{self.serial_port}', baud_rate={self.baud_rate}, slave_id={self.slave_id}, io_points={len(self.io_points)})" class ModbusMasterConfig(PluginConfigContract): """ @@ -167,23 +220,41 @@ def validate(self) -> None: """Validates the configuration.""" if not self.devices: raise ValueError("No devices configured. At least one Modbus device must be defined.") - + # Validate each device for i, device in enumerate(self.devices): try: device.validate() except Exception as e: raise ValueError(f"Device #{i+1} validation failed: {e}") - + # Check for duplicate device names device_names = [device.name for device in self.devices] if len(device_names) != len(set(device_names)): raise ValueError("Duplicate device names found. Each device must have a unique name.") - - # Check for duplicate host:port combinations - host_port_combinations = [(device.host, device.port) for device in self.devices] + + # Separate TCP and RTU devices for different validation rules + tcp_devices = [d for d in self.devices if d.transport == "tcp"] + rtu_devices = [d for d in self.devices if d.transport == "rtu"] + + # Check for duplicate host:port combinations for TCP devices + host_port_combinations = [(device.host, device.port) for device in tcp_devices] if len(host_port_combinations) != len(set(host_port_combinations)): - raise ValueError("Duplicate host:port combinations found. Each device must have a unique host:port combination.") + raise ValueError("Duplicate host:port combinations found for TCP devices. Each TCP device must have a unique host:port combination.") + + # Check for duplicate slave IDs on the same serial bus for RTU devices + # Group RTU devices by serial bus (serial_port + baud_rate + parity + stop_bits + data_bits) + rtu_buses: Dict[str, List[int]] = {} + for device in rtu_devices: + bus_key = f"{device.serial_port}:{device.baud_rate}:{device.parity}:{device.stop_bits}:{device.data_bits}" + if bus_key not in rtu_buses: + rtu_buses[bus_key] = [] + rtu_buses[bus_key].append(device.slave_id) + + # Check for duplicate slave IDs within each bus + for bus_key, slave_ids in rtu_buses.items(): + if len(slave_ids) != len(set(slave_ids)): + raise ValueError(f"Duplicate slave IDs found on RTU bus '{bus_key}'. Each device on the same serial bus must have a unique slave ID.") def __repr__(self) -> str: return f"{self.__class__.__name__}(devices={len(self.devices)})" diff --git a/webserver/app.py b/webserver/app.py index 59e54424..99a59846 100644 --- a/webserver/app.py +++ b/webserver/app.py @@ -130,6 +130,37 @@ def handle_ping(data: dict) -> dict: return {"status": response} +def handle_list_serial_ports(data: dict) -> dict: + """ + List available serial ports on the system. + + Returns: + { + "ports": [ + {"device": "/dev/ttyUSB0", "description": "USB-Serial Controller"}, + {"device": "/dev/ttyACM0", "description": "Arduino Uno"}, + ... + ] + } + """ + try: + import serial.tools.list_ports + + ports = serial.tools.list_ports.comports() + port_list = [ + { + "device": port.device, + "description": port.description or port.device, + } + for port in ports + ] + return {"ports": port_list} + except ImportError: + return {"error": "pyserial not installed", "ports": []} + except Exception as e: + return {"error": str(e), "ports": []} + + GET_HANDLERS: dict[str, Callable[[dict], dict]] = { "start-plc": handle_start_plc, "stop-plc": handle_stop_plc, @@ -137,6 +168,7 @@ def handle_ping(data: dict) -> dict: "compilation-status": handle_compilation_status, "status": handle_status, "ping": handle_ping, + "serial-ports": handle_list_serial_ports, } From bf5b3e6576a0d03c0f2a405c8718f846e0167661 Mon Sep 17 00:00:00 2001 From: Thiago Alves Date: Wed, 21 Jan 2026 16:43:59 -0500 Subject: [PATCH 4/4] fix: add pyserial to main requirements for serial port listing The handle_list_serial_ports endpoint in app.py needs pyserial to enumerate available serial ports, but it runs in the main webserver context. Previously pyserial was only installed in the modbus_master plugin's virtual environment. This fix adds pyserial to the main requirements.txt so the serial port listing API endpoint works correctly. Co-Authored-By: Claude Opus 4.5 --- requirements.txt | 1 + 1 file changed, 1 insertion(+) diff --git a/requirements.txt b/requirements.txt index 1dd17d45..aae4fd93 100644 --- a/requirements.txt +++ b/requirements.txt @@ -9,3 +9,4 @@ pytest pytest-flask pre-commit psutil; sys_platform != "cygwin" +pyserial>=3.5