MQTT Gateway connects to an MQTT broker to receive messages. Broker configuration is stored in the database and can be updated without restarting the service.
Broker connection settings
MQTT broker configurations are stored in the mqtt_servers table in the database. The gateway automatically selects a single enabled broker, preferring the one marked as default.
Server selection logic
When the gateway starts, it loads the MQTT server using this query:
# From src/mqtt_client.py:25
def load_mqtt_server(self) -> MqttServer:
    with self.session_factory() as session:
        stmt = (
            select(MqttServer)
            .where(MqttServer.enabled.is_(True))
            .order_by(MqttServer.is_default.desc(), MqttServer.id.asc())
        )
        server = session.execute(stmt).scalars().first()
        if not server:
            raise RuntimeError("No enabled MQTT server found in mqtt_servers")
        return server
The gateway considers only enabled servers, prefers those where is_default=True, and breaks ties by lowest id.
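The ordering rule can be tried in isolation with an in-memory SQLite table. This is a minimal sketch, not the gateway's code: the schema is a hypothetical subset of mqtt_servers, the hostnames are made up, and raw SQL stands in for the ORM query.

```python
import sqlite3

# Hypothetical minimal schema; the real mqtt_servers table may have more columns.
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE mqtt_servers ("
    "id INTEGER PRIMARY KEY, host TEXT, port INTEGER, "
    "enabled INTEGER, is_default INTEGER)"
)
conn.executemany(
    "INSERT INTO mqtt_servers (id, host, port, enabled, is_default) "
    "VALUES (?, ?, ?, ?, ?)",
    [
        (1, "broker-a.example", 1883, 1, 0),  # enabled, not default
        (2, "broker-b.example", 1883, 0, 1),  # default but disabled: skipped
        (3, "broker-c.example", 8883, 1, 1),  # enabled and default: selected
        (4, "broker-d.example", 8883, 1, 1),  # also default, but higher id
    ],
)

# Same filter and ordering as the ORM query: enabled rows only,
# default servers first, then lowest id.
selected = conn.execute(
    "SELECT id, host FROM mqtt_servers "
    "WHERE enabled = 1 "
    "ORDER BY is_default DESC, id ASC "
    "LIMIT 1"
).fetchone()

print(selected)  # (3, 'broker-c.example')
```

Row 2 shows that a disabled server is never chosen, even when it is marked as default.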
Server parameters
host: MQTT broker hostname or IP address. Example: 192.168.0.137
port: MQTT broker port number. Common ports:
1883 - Standard MQTT
8883 - MQTT over TLS
Example: 1883
username: Optional username for MQTT broker authentication. Leave empty if the broker doesn’t require authentication.
password: Optional password for MQTT broker authentication. Leave empty if the broker doesn’t require authentication.
enabled: Whether this MQTT server configuration is active. Set to false to disable without deleting the configuration.
is_default: Whether this is the default MQTT server. When multiple enabled servers exist, the gateway uses the one marked as default.
Authentication
If your MQTT broker requires authentication, set both username and password fields:
# From src/mqtt_client.py:107
if server.username and server.password:
    self.client.username_pw_set(server.username, server.password)
Both fields must be set for authentication to be used. If either is missing, the gateway connects without credentials.
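The both-or-nothing rule can be expressed as a small helper. This is a sketch for illustration only; resolve_credentials is a hypothetical name and is not part of the gateway.

```python
from typing import Optional, Tuple


def resolve_credentials(
    username: Optional[str], password: Optional[str]
) -> Optional[Tuple[str, str]]:
    """Return (username, password) only when both are non-empty, else None."""
    if username and password:
        return (username, password)
    # A missing or empty field means the connection is made without credentials.
    return None


print(resolve_credentials("gateway", "s3cret"))  # ('gateway', 's3cret')
print(resolve_credentials("gateway", None))      # None
print(resolve_credentials("", "s3cret"))         # None
```

Note that an empty string counts as missing, so a row with a username but a blank password results in an anonymous connection attempt.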
Connection parameters
The MQTT client connection is configured using environment variables:
MQTT_CLIENT_ID (string, default: "mqtt-gateway"): client identifier the gateway presents to the broker.
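A variable with a default is commonly resolved as shown below. How the gateway's settings object actually reads the environment is not shown here, so treat get_client_id as a hypothetical helper.

```python
import os


def get_client_id() -> str:
    """Read MQTT_CLIENT_ID from the environment, falling back to the documented default."""
    return os.environ.get("MQTT_CLIENT_ID", "mqtt-gateway")


print(get_client_id())
```

Brokers generally allow only one active connection per client ID, so give each gateway instance its own value when running several against the same broker.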
Connection establishment
The gateway establishes the MQTT connection at startup:
# From src/mqtt_client.py:112
self.client.connect(server.host, server.port, keepalive=self.settings.mqtt_keepalive)
self.client.loop_start()
loop_start() runs the client's network loop in a background thread, which leaves the main thread free to handle flow reloading.
Topic subscriptions
The gateway automatically subscribes to MQTT topics based on enabled flows in the database.
Initial subscription
When the connection is established, the gateway subscribes to all topics from enabled flows:
# From src/mqtt_client.py:65
def on_connect(self, client: mqtt.Client, userdata: Any, flags: Any, reason_code: Any, properties: Any) -> None:
    if reason_code != 0:
        self.logger.error("MQTT connection failed with reason code: %s", reason_code)
        return
    subscriptions = set(topic for topic, _ in self.flow_topics)
    for topic in subscriptions:
        client.subscribe(topic)
Dynamic subscription updates
The gateway periodically reloads flows from the database and updates subscriptions:
# From src/mqtt_client.py:55
def reload_flows(self) -> None:
    flows = self.load_flows()
    new_flow_topics = self._topic_to_flow_ids(flows)
    previous_topics = set(topic for topic, _ in self.flow_topics)
    current_topics = set(topic for topic, _ in new_flow_topics)
    self.flow_topics = new_flow_topics
    self._sync_subscriptions(previous_topics, current_topics)
This allows you to add or remove topic subscriptions by updating flows in the database without restarting the gateway.
The reload interval is controlled by the FLOWS_RELOAD_INTERVAL_SECONDS environment variable (default: 600 seconds).
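The body of _sync_subscriptions is not shown above. One plausible way to implement it is a set difference in each direction, sketched here with plain callables standing in for the MQTT client's subscribe and unsubscribe methods. The function name and signature are assumptions, not the gateway's actual code.

```python
from typing import Callable, List, Set


def sync_subscriptions(
    previous_topics: Set[str],
    current_topics: Set[str],
    subscribe: Callable[[str], object],
    unsubscribe: Callable[[str], object],
) -> None:
    """Subscribe to newly added topics and unsubscribe from removed ones."""
    # Topics present now but not before need a new subscription.
    for topic in sorted(current_topics - previous_topics):
        subscribe(topic)
    # Topics present before but not now are no longer needed.
    for topic in sorted(previous_topics - current_topics):
        unsubscribe(topic)


added: List[str] = []
removed: List[str] = []
sync_subscriptions(
    previous_topics={"sensors/temperature", "sensors/#"},
    current_topics={"sensors/temperature", "alerts/+"},
    subscribe=added.append,
    unsubscribe=removed.append,
)
print(added, removed)  # ['alerts/+'] ['sensors/#']
```

Topics that appear in both sets are left untouched, so an unchanged flow causes no broker traffic on reload.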
Topic matching
The gateway supports MQTT wildcard patterns in topic subscriptions:
+ - Single-level wildcard (matches one topic level)
# - Multi-level wildcard (matches zero or more topic levels)
Example topics:
sensors/temperature - Exact match
sensors/+/temperature - Matches sensors/kitchen/temperature, sensors/bedroom/temperature
sensors/# - Matches sensors itself and every topic below it, such as sensors/kitchen/temperature
When a message arrives, the gateway matches it against all flow topics:
# From src/mqtt_client.py:84
matched_flow_ids = [
    flow_id for sub_topic, flow_id in self.flow_topics if topic_matches_sub(sub_topic, topic)
]
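To see how the wildcard rules play out, here is a simplified pure-Python matcher. It is an illustration only and not the implementation behind topic_matches_sub: it skips subscription validation and the special handling of topics that start with $. The function name and the sample flow list are hypothetical.

```python
from typing import List, Tuple


def simple_topic_match(subscription: str, topic: str) -> bool:
    """Simplified MQTT wildcard matching, compared level by level."""
    sub_levels = subscription.split("/")
    topic_levels = topic.split("/")
    for index, level in enumerate(sub_levels):
        if level == "#":
            # Multi-level wildcard: matches the parent level and anything below it.
            return True
        if index >= len(topic_levels):
            # The subscription has more levels than the topic.
            return False
        if level != "+" and level != topic_levels[index]:
            return False
    # Without "#", both sides must have the same number of levels.
    return len(sub_levels) == len(topic_levels)


# (subscription topic, flow id) pairs, mirroring the shape of flow_topics.
flow_topics: List[Tuple[str, int]] = [
    ("sensors/+/temperature", 1),
    ("sensors/#", 2),
    ("alerts/fire", 3),
]
matched = [
    flow_id
    for sub_topic, flow_id in flow_topics
    if simple_topic_match(sub_topic, "sensors/kitchen/temperature")
]
print(matched)  # [1, 2]
```

A single message can match several flows, which is why the gateway collects a list of flow IDs instead of stopping at the first match.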
Message processing
Incoming messages must be valid JSON objects:
# From src/mqtt_client.py:74
def on_message(self, client: mqtt.Client, userdata: Any, msg: mqtt.MQTTMessage) -> None:
    topic = msg.topic
    try:
        payload = json.loads(msg.payload.decode("utf-8"))
        if not isinstance(payload, dict):
            raise ValueError("Payload must be a JSON object")
    except (json.JSONDecodeError, UnicodeDecodeError, ValueError) as exc:
        self.logger.error("Invalid payload in topic %s: %s", topic, exc)
        return
Messages with invalid JSON or non-object payloads are logged and discarded. Ensure your MQTT publishers send valid JSON objects.
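Publishers can check a payload against the same rules before sending it. The helper below is a sketch that mirrors the validation above; parse_payload is a hypothetical name and returns None where the gateway would log and discard.

```python
import json
from typing import Any, Dict, Optional


def parse_payload(raw: bytes) -> Optional[Dict[str, Any]]:
    """Return the decoded JSON object, or None if the payload would be rejected."""
    try:
        payload = json.loads(raw.decode("utf-8"))
    except (UnicodeDecodeError, json.JSONDecodeError):
        # Not UTF-8 text, or not valid JSON.
        return None
    # Valid JSON that is not an object (list, number, string) is rejected too.
    return payload if isinstance(payload, dict) else None


print(parse_payload(b'{"temperature": 21.5}'))  # {'temperature': 21.5}
print(parse_payload(b"[21.5, 22.0]"))           # None
print(parse_payload(b"21.5"))                   # None
print(parse_payload(b"not json"))               # None
```

A bare number or a JSON array is valid JSON but still rejected, so wrap readings in an object such as {"temperature": 21.5}.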
Connection error handling
If the broker rejects the connection, on_connect receives a non-zero reason code. The gateway logs the error, skips the topic subscriptions, and keeps running:
# From src/mqtt_client.py:66
if reason_code != 0:
    self.logger.error("MQTT connection failed with reason code: %s", reason_code)
    return
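Because the network loop is started with loop_start(), the Paho MQTT client library automatically attempts to reconnect after an unexpected disconnect. on_connect runs again after each successful reconnect, so the topic subscriptions are restored.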
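Paho spaces its reconnect attempts with a delay that doubles after each failure up to a cap, configurable through the client's reconnect_delay_set(min_delay, max_delay) method. The sketch below only illustrates that doubling schedule; backoff_delays is a hypothetical helper, and the values 1 and 120 are Paho's documented defaults, not settings confirmed for this gateway.

```python
from typing import List


def backoff_delays(attempts: int, min_delay: int = 1, max_delay: int = 120) -> List[int]:
    """Return the wait in seconds before each reconnect attempt, doubling up to a cap."""
    delays: List[int] = []
    delay = min_delay
    for _ in range(attempts):
        delays.append(delay)
        # Double the wait for the next attempt, but never exceed the cap.
        delay = min(delay * 2, max_delay)
    return delays


print(backoff_delays(9))  # [1, 2, 4, 8, 16, 32, 64, 120, 120]
```

With these values, a broker that stays down for several minutes is retried about every two minutes once the cap is reached.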