66"""
77
88import logging
9+ import socket
910import ssl
1011import sys
1112from collections import namedtuple
@@ -99,6 +100,7 @@ def __init__(
99100 self .broker_port : int = 1883
100101 self .tls_enabled : bool = False
101102 self .keepalive : int = 60
103+ self .connection_timeout : int = 5
102104 self .last_will_topic : Optional [str ] = None
103105 self .last_will_message : Optional [str ] = None
104106 self .last_will_qos : int = 0
@@ -108,7 +110,7 @@ def __init__(
108110 self .tls_certfile : Optional [str ] = None
109111 self .tls_keyfile : Optional [str ] = None
110112 self .tls_cert_reqs : int = ssl .CERT_NONE
111- self .tls_version : int = ssl .PROTOCOL_TLSv1
113+ self .tls_version : int = ssl .PROTOCOL_TLSv1_2
112114 self .tls_ciphers : Optional [List [str ]] = None
113115 self .tls_insecure : bool = False
114116
@@ -169,6 +171,9 @@ def init_app(self, app: Flask, config_prefix: str = "MQTT") -> None:
169171 if config_prefix + "_KEEPALIVE" in app .config :
170172 self .keepalive = app .config [config_prefix + "_KEEPALIVE" ]
171173
174+ if config_prefix + "_CONNECTION_TIMEOUT" in app .config :
175+ self .connection_timeout = app .config [config_prefix + "_CONNECTION_TIMEOUT" ]
176+
172177 if config_prefix + "_LAST_WILL_TOPIC" in app .config :
173178 self .last_will_topic = app .config [config_prefix + "_LAST_WILL_TOPIC" ]
174179
@@ -201,7 +206,7 @@ def init_app(self, app: Flask, config_prefix: str = "MQTT") -> None:
201206 config_prefix + "_TLS_CERT_REQS" , ssl .CERT_REQUIRED
202207 )
203208 self .tls_version = app .config .get (
204- config_prefix + "_TLS_VERSION" , ssl .PROTOCOL_TLSv1
209+ config_prefix + "_TLS_VERSION" , ssl .PROTOCOL_TLSv1_2
205210 )
206211
207212 # set last will message
@@ -219,40 +224,109 @@ def _connect(self) -> None:
219224 if self .username is not None :
220225 self .client .username_pw_set (self .username , self .password )
221226
222- # security
223- if self .tls_enabled :
224- self .client .tls_set (
225- ca_certs = self .tls_ca_certs ,
226- certfile = self .tls_certfile ,
227- keyfile = self .tls_keyfile ,
228- cert_reqs = self .tls_cert_reqs ,
229- tls_version = self .tls_version ,
230- ciphers = self .tls_ciphers ,
231- )
232-
233- if self .tls_insecure :
234- self .client .tls_insecure_set (self .tls_insecure )
227+ # Set socket timeout for connection attempts (paho-mqtt uses this internally)
228+ # This timeout applies during the socket connection phase
229+ default_timeout = None
230+ try :
231+ default_timeout = socket .getdefaulttimeout ()
232+ socket .setdefaulttimeout (self .connection_timeout )
233+ except Exception :
234+ pass # Continue if socket timeout setting fails
235+
236+ try :
237+ # security
238+ if self .tls_enabled :
239+ try :
240+ self .client .tls_set (
241+ ca_certs = self .tls_ca_certs ,
242+ certfile = self .tls_certfile ,
243+ keyfile = self .tls_keyfile ,
244+ cert_reqs = self .tls_cert_reqs ,
245+ tls_version = self .tls_version ,
246+ ciphers = self .tls_ciphers ,
247+ )
235248
236- if self ._connect_async :
237- # if connect_async is used
238- self . client . connect_async (
239- self . broker_url , self . broker_port , keepalive = self . keepalive
240- )
241- else :
242- res = self . client . connect (
243- self . broker_url , self . broker_port , keepalive = self . keepalive
244- )
249+ if self .tls_insecure :
250+ self . client . tls_insecure_set ( self . tls_insecure )
251+ except Exception as e :
252+ logger . error (
253+ "TLS configuration failed for broker {0}:{1} - {2}: {3}" . format (
254+ self . broker_url , self . broker_port , type ( e ). __name__ , str ( e )
255+ )
256+ )
257+ raise
245258
246- if res == 0 :
247- logger .debug (
248- "Connected client '{0}' to broker {1}:{2}" .format (
249- self .client_id , self .broker_url , self .broker_port
259+ if self ._connect_async :
260+ # if connect_async is used
261+ try :
262+ self .client .connect_async (
263+ self .broker_url , self .broker_port , keepalive = self .keepalive
250264 )
251- )
265+ except Exception as e :
266+ logger .error (
267+ "Failed to initiate async connection to broker {0}:{1} - {2}: {3}" .format (
268+ self .broker_url , self .broker_port , type (e ).__name__ , str (e )
269+ )
270+ )
271+ raise
252272 else :
253- logger .error (
254- "Could not connect to MQTT Broker, Error Code: {0}" .format (res )
255- )
273+ try :
274+ res = self .client .connect (
275+ self .broker_url , self .broker_port , keepalive = self .keepalive
276+ )
277+
278+ if res == 0 :
279+ logger .debug (
280+ "Connected client '{0}' to broker {1}:{2}" .format (
281+ self .client_id , self .broker_url , self .broker_port
282+ )
283+ )
284+ else :
285+ error_messages = {
286+ MQTT_ERR_AGAIN : "Resource temporarily unavailable" ,
287+ MQTT_ERR_NOMEM : "Out of memory" ,
288+ MQTT_ERR_PROTOCOL : "Protocol error" ,
289+ MQTT_ERR_INVAL : "Invalid function arguments" ,
290+ MQTT_ERR_NO_CONN : "No connection to broker" ,
291+ MQTT_ERR_CONN_REFUSED : "Connection refused by broker" ,
292+ MQTT_ERR_NOT_FOUND : "Resource not found" ,
293+ MQTT_ERR_CONN_LOST : "Connection lost" ,
294+ MQTT_ERR_TLS : "TLS error" ,
295+ MQTT_ERR_PAYLOAD_SIZE : "Payload size error" ,
296+ MQTT_ERR_NOT_SUPPORTED : "Operation not supported" ,
297+ MQTT_ERR_AUTH : "Authentication failed" ,
298+ MQTT_ERR_ACL_DENIED : "ACL denied" ,
299+ MQTT_ERR_UNKNOWN : "Unknown error" ,
300+ MQTT_ERR_ERRNO : "System error" ,
301+ MQTT_ERR_QUEUE_SIZE : "Queue size exceeded" ,
302+ }
303+ error_msg = error_messages .get (res , "Unknown error" )
304+ logger .error (
305+ "Failed to connect to MQTT broker {0}:{1} - Error {2}: {3}" .format (
306+ self .broker_url , self .broker_port , res , error_msg
307+ )
308+ )
309+ except OSError as e :
310+ logger .error (
311+ "Network error connecting to broker {0}:{1} (timeout: {2}s) - {3}" .format (
312+ self .broker_url , self .broker_port , self .connection_timeout , str (e )
313+ )
314+ )
315+ raise
316+ except Exception as e :
317+ logger .error (
318+ "Unexpected error connecting to broker {0}:{1} - {2}: {3}" .format (
319+ self .broker_url , self .broker_port , type (e ).__name__ , str (e )
320+ )
321+ )
322+ raise
323+ finally :
324+ # Restore original socket timeout
325+ try :
326+ socket .setdefaulttimeout (default_timeout )
327+ except Exception :
328+ pass
329+
256330 self .client .loop_start ()
257331
258332 def _disconnect (self ) -> None :
0 commit comments