Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion doc/configuration.rst
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,12 @@ Configuration Keys
client will send ping messages to the broker.
Defaults to 60 seconds.

``MQTT_CONNECTION_TIMEOUT`` Socket timeout in seconds for connection
attempts to the MQTT broker. This controls
how long the client will wait when attempting
to establish a connection before timing out.
Defaults to 5 seconds.

``MQTT_TLS_ENABLED`` Enable TLS for the connection to the MQTT broker.
Use the following config keys to configure TLS.

Expand All @@ -63,7 +69,7 @@ Configuration Keys
to be used. This parameter expects an integer
constant from the ``ssl`` module, not a string.
It is recommended to use TLS v1.2 or higher for
security. Defaults to ``ssl.PROTOCOL_TLSv1``.
security. Defaults to ``ssl.PROTOCOL_TLSv1_2``.

Example values:

Expand Down
138 changes: 106 additions & 32 deletions flask_mqtt/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
"""

import logging
import socket
import ssl
import sys
from collections import namedtuple
Expand Down Expand Up @@ -99,6 +100,7 @@ def __init__(
self.broker_port: int = 1883
self.tls_enabled: bool = False
self.keepalive: int = 60
self.connection_timeout: int = 5
self.last_will_topic: Optional[str] = None
self.last_will_message: Optional[str] = None
self.last_will_qos: int = 0
Expand All @@ -108,7 +110,7 @@ def __init__(
self.tls_certfile: Optional[str] = None
self.tls_keyfile: Optional[str] = None
self.tls_cert_reqs: int = ssl.CERT_NONE
self.tls_version: int = ssl.PROTOCOL_TLSv1
self.tls_version: int = ssl.PROTOCOL_TLSv1_2
self.tls_ciphers: Optional[List[str]] = None
self.tls_insecure: bool = False

Expand Down Expand Up @@ -169,6 +171,9 @@ def init_app(self, app: Flask, config_prefix: str = "MQTT") -> None:
if config_prefix + "_KEEPALIVE" in app.config:
self.keepalive = app.config[config_prefix + "_KEEPALIVE"]

if config_prefix + "_CONNECTION_TIMEOUT" in app.config:
self.connection_timeout = app.config[config_prefix + "_CONNECTION_TIMEOUT"]

if config_prefix + "_LAST_WILL_TOPIC" in app.config:
self.last_will_topic = app.config[config_prefix + "_LAST_WILL_TOPIC"]

Expand Down Expand Up @@ -201,7 +206,7 @@ def init_app(self, app: Flask, config_prefix: str = "MQTT") -> None:
config_prefix + "_TLS_CERT_REQS", ssl.CERT_REQUIRED
)
self.tls_version = app.config.get(
config_prefix + "_TLS_VERSION", ssl.PROTOCOL_TLSv1
config_prefix + "_TLS_VERSION", ssl.PROTOCOL_TLSv1_2
)

# set last will message
Expand All @@ -219,40 +224,109 @@ def _connect(self) -> None:
if self.username is not None:
self.client.username_pw_set(self.username, self.password)

# security
if self.tls_enabled:
self.client.tls_set(
ca_certs=self.tls_ca_certs,
certfile=self.tls_certfile,
keyfile=self.tls_keyfile,
cert_reqs=self.tls_cert_reqs,
tls_version=self.tls_version,
ciphers=self.tls_ciphers,
)

if self.tls_insecure:
self.client.tls_insecure_set(self.tls_insecure)
# Set socket timeout for connection attempts (paho-mqtt uses this internally)
# This timeout applies during the socket connection phase
default_timeout = None
try:
default_timeout = socket.getdefaulttimeout()
socket.setdefaulttimeout(self.connection_timeout)
except Exception:
pass # Continue if socket timeout setting fails

try:
# security
if self.tls_enabled:
try:
self.client.tls_set(
ca_certs=self.tls_ca_certs,
certfile=self.tls_certfile,
keyfile=self.tls_keyfile,
cert_reqs=self.tls_cert_reqs,
tls_version=self.tls_version,
ciphers=self.tls_ciphers,
)

if self._connect_async:
# if connect_async is used
self.client.connect_async(
self.broker_url, self.broker_port, keepalive=self.keepalive
)
else:
res = self.client.connect(
self.broker_url, self.broker_port, keepalive=self.keepalive
)
if self.tls_insecure:
self.client.tls_insecure_set(self.tls_insecure)
except Exception as e:
logger.error(
"TLS configuration failed for broker {0}:{1} - {2}: {3}".format(
self.broker_url, self.broker_port, type(e).__name__, str(e)
)
)
raise

if res == 0:
logger.debug(
"Connected client '{0}' to broker {1}:{2}".format(
self.client_id, self.broker_url, self.broker_port
if self._connect_async:
# if connect_async is used
try:
self.client.connect_async(
self.broker_url, self.broker_port, keepalive=self.keepalive
)
)
except Exception as e:
logger.error(
"Failed to initiate async connection to broker {0}:{1} - {2}: {3}".format(
self.broker_url, self.broker_port, type(e).__name__, str(e)
)
)
raise
else:
logger.error(
"Could not connect to MQTT Broker, Error Code: {0}".format(res)
)
try:
res = self.client.connect(
self.broker_url, self.broker_port, keepalive=self.keepalive
)

if res == 0:
logger.debug(
"Connected client '{0}' to broker {1}:{2}".format(
self.client_id, self.broker_url, self.broker_port
)
)
else:
error_messages = {
MQTT_ERR_AGAIN: "Resource temporarily unavailable",
MQTT_ERR_NOMEM: "Out of memory",
MQTT_ERR_PROTOCOL: "Protocol error",
MQTT_ERR_INVAL: "Invalid function arguments",
MQTT_ERR_NO_CONN: "No connection to broker",
MQTT_ERR_CONN_REFUSED: "Connection refused by broker",
MQTT_ERR_NOT_FOUND: "Resource not found",
MQTT_ERR_CONN_LOST: "Connection lost",
MQTT_ERR_TLS: "TLS error",
MQTT_ERR_PAYLOAD_SIZE: "Payload size error",
MQTT_ERR_NOT_SUPPORTED: "Operation not supported",
MQTT_ERR_AUTH: "Authentication failed",
MQTT_ERR_ACL_DENIED: "ACL denied",
MQTT_ERR_UNKNOWN: "Unknown error",
MQTT_ERR_ERRNO: "System error",
MQTT_ERR_QUEUE_SIZE: "Queue size exceeded",
}
error_msg = error_messages.get(res, "Unknown error")
logger.error(
"Failed to connect to MQTT broker {0}:{1} - Error {2}: {3}".format(
self.broker_url, self.broker_port, res, error_msg
)
)
except OSError as e:
logger.error(
"Network error connecting to broker {0}:{1} (timeout: {2}s) - {3}".format(
self.broker_url, self.broker_port, self.connection_timeout, str(e)
)
)
raise
except Exception as e:
logger.error(
"Unexpected error connecting to broker {0}:{1} - {2}: {3}".format(
self.broker_url, self.broker_port, type(e).__name__, str(e)
)
)
raise
finally:
# Restore original socket timeout
try:
socket.setdefaulttimeout(default_timeout)
except Exception:
pass

self.client.loop_start()

def _disconnect(self) -> None:
Expand Down