Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,18 +1,47 @@
"""Modbus Master plugin connection management utilities."""

import time
from typing import Optional
from typing import Literal, Optional, Union

from pymodbus.client import ModbusTcpClient
from pymodbus.client import ModbusTcpClient, ModbusSerialClient

TransportType = Literal["tcp", "rtu"]
ParityType = Literal["N", "E", "O"]


class ModbusConnectionManager: # pylint: disable=too-many-instance-attributes
"""Manages Modbus TCP connections with retry logic."""
"""Manages Modbus TCP and RTU connections with retry logic."""

def __init__(
self,
transport: TransportType = "tcp",
# TCP parameters
host: str = "127.0.0.1",
port: int = 502,
# RTU parameters
serial_port: str = "",
baud_rate: int = 9600,
parity: ParityType = "N",
stop_bits: int = 1,
data_bits: int = 8,
# Common parameters
timeout_ms: int = 1000,
slave_id: int = 1,
):
self.transport = transport
self.timeout = timeout_ms / 1000.0 # Convert to seconds
self.slave_id = slave_id # Unit/Slave ID for Modbus operations

def __init__(self, host: str, port: int, timeout_ms: int):
# TCP configuration
self.host = host
self.port = port
self.timeout = timeout_ms / 1000.0 # Convert to seconds

# RTU configuration
self.serial_port = serial_port
self.baud_rate = baud_rate
self.parity = parity
self.stop_bits = stop_bits
self.data_bits = data_bits

# Retry configuration
self.retry_delay_base = 2.0 # initial delay between attempts (seconds)
Expand All @@ -21,9 +50,34 @@ def __init__(self, host: str, port: int, timeout_ms: int):

# Connection state - is_connected is the authoritative flag for connection health
# It is set to False when any error occurs, forcing reconnection on next cycle
self.client: Optional[ModbusTcpClient] = None
self.client: Optional[Union[ModbusTcpClient, ModbusSerialClient]] = None
self.is_connected = False

def _create_client(self) -> Union[ModbusTcpClient, ModbusSerialClient]:
"""Create the appropriate Modbus client based on transport type."""
if self.transport == "tcp":
return ModbusTcpClient(
host=self.host,
port=self.port,
timeout=self.timeout
)
else: # RTU
return ModbusSerialClient(
port=self.serial_port,
baudrate=self.baud_rate,
parity=self.parity,
stopbits=self.stop_bits,
bytesize=self.data_bits,
timeout=self.timeout
)

def get_connection_info(self) -> str:
"""Return human-readable connection information."""
if self.transport == "tcp":
return f"TCP {self.host}:{self.port}"
else:
return f"RTU {self.serial_port}@{self.baud_rate}"

def connect_with_retry(self, stop_event=None) -> bool:
"""
Attempts to connect to Modbus device with infinite retry.
Expand All @@ -35,6 +89,7 @@ def connect_with_retry(self, stop_event=None) -> bool:
True if connected successfully, False if interrupted
"""
retry_count = 0
conn_info = self.get_connection_info()

while stop_event is None or not stop_event.is_set():
try:
Expand All @@ -45,30 +100,28 @@ def connect_with_retry(self, stop_event=None) -> bool:
self.client.close()
except Exception:
pass
self.client = ModbusTcpClient(
host=self.host, port=self.port, timeout=self.timeout
)
self.client = self._create_client()

# Attempt to connect
if self.client.connect():
print(
f"(PASS) Connected to {self.host}:{self.port} (attempt {retry_count + 1})"
f"(PASS) Connected to {conn_info} (attempt {retry_count + 1})"
)
self.is_connected = True
self.retry_delay_current = self.retry_delay_base # Reset delay
return True

except Exception as e:
print(f"(FAIL) Connection attempt {retry_count + 1} failed: {e}")
print(f"(FAIL) Connection attempt {retry_count + 1} to {conn_info} failed: {e}")

# Increment counter and calculate delay
retry_count += 1

# Attempt logging
if retry_count == 1:
print(f"Failed to connect to {self.host}:{self.port}, starting retry attempts...")
print(f"Failed to connect to {conn_info}, starting retry attempts...")
elif retry_count % 10 == 0: # Log every 10 attempts
print(f"Connection attempt {retry_count} failed, continuing retries...")
print(f"Connection attempt {retry_count} to {conn_info} failed, continuing retries...")

# Wait with increasing delay (limited exponential backoff)
delay = min(self.retry_delay_current, self.retry_delay_max)
Expand Down Expand Up @@ -127,21 +180,23 @@ def mark_disconnected(self):
ModbusIOException, etc.) to ensure the connection is properly re-established.
"""
self.is_connected = False
conn_info = self.get_connection_info()
print(
f"Connection to {self.host}:{self.port} marked as disconnected, "
f"Connection to {conn_info} marked as disconnected, "
"will reconnect on next cycle"
)

def disconnect(self):
"""Close the connection and clean up resources."""
conn_info = self.get_connection_info()
try:
if self.client:
self.client.close()
self.client = None
self.is_connected = False
print(f"Disconnected from {self.host}:{self.port}")
print(f"Disconnected from {conn_info}")
except Exception as e:
print(f"(FAIL) Error disconnecting from {self.host}:{self.port}: {e}")
print(f"(FAIL) Error disconnecting from {conn_info}: {e}")

def is_healthy(self) -> bool:
"""
Expand Down
Loading