Skip to main content
A flow defines how MQTT Gateway processes messages from a specific MQTT topic. Each flow specifies the topic to listen to, how to validate incoming payloads, and where to route the data.

Flow structure

Flows are stored in the flows database table and contain the following properties:
PropertyTypeDescription
codestringUnique identifier for the flow (max 100 chars)
descriptionstringHuman-readable description of the flow’s purpose
topicstringMQTT topic pattern to subscribe to
actionstringAction to perform: STORE_DB or POST_ENDPOINT
payload_schemaJSONSchema defining expected payload structure and types
endpoint_urlstringHTTP endpoint URL (required for POST_ENDPOINT)
last_msg_idintegerCounter for processed messages
enabledbooleanWhether the flow is active
The code field must be unique across all flows and is used to identify records in the database.

How flows work

When an MQTT message arrives, the gateway performs these steps:
  1. Topic matching: Compares the message topic against all enabled flows using MQTT topic pattern matching (src/mqtt_client.py:84-86)
  2. Payload validation: Validates the JSON payload against the flow’s payload_schema (src/processor.py:92)
  3. Message ID increment: Increments the last_msg_id counter atomically (src/processor.py:94-95)
  4. Action execution: Routes the message based on the flow’s action type
# From src/mqtt_client.py:84-86
matched_flow_ids = [
    flow_id for sub_topic, flow_id in self.flow_topics if topic_matches_sub(sub_topic, topic)
]
Multiple flows can match the same topic. Each matching flow processes the message independently.

Example flows

Here are real examples from the sample data:

Temperature and humidity to database

INSERT INTO flows (
  code,
  description,
  topic,
  action,
  payload_schema,
  endpoint_url,
  last_msg_id,
  enabled
)
VALUES (
  'AHT10_SENSOR',
  'Flow sensor de temperatura y humedad',
  'sensors/aht10',
  'STORE_DB',
  '{"temperature":"float","humidity":"float"}',
  NULL,
  0,
  1
);
This flow listens to sensors/aht10, validates that messages contain temperature and humidity fields as floats, and stores them in the database.

Sensor data to HTTP endpoint

INSERT INTO flows (
  code,
  description,
  topic,
  action,
  payload_schema,
  endpoint_url,
  last_msg_id,
  enabled
)
VALUES (
  'AHT10_ENDPOINT',
  'Flow para publicar a endpoint HTTP',
  'sensors/aht10/http',
  'POST_ENDPOINT',
  '{"temperature":"float","humidity":"float"}',
  'http://host.docker.internal:8080/iot/data',
  0,
  1
);
This flow forwards validated messages to an HTTP endpoint via POST request.

Flow reloading

The gateway automatically reloads flows from the database at regular intervals without restarting:
# From src/mqtt_client.py:115-122
reload_interval_seconds = max(1, self.settings.flows_reload_interval_seconds)
try:
    while True:
        time.sleep(reload_interval_seconds)
        try:
            self.reload_flows()
        except Exception as exc:
            self.logger.error("Error reloading flows: %s", exc)
When flows are reloaded, the gateway automatically subscribes to new topics and unsubscribes from removed ones. Changes to existing flows take effect on the next message.

Enabling and disabling flows

Set enabled = 0 to disable a flow without deleting it:
UPDATE flows SET enabled = 0 WHERE code = 'AHT10_SENSOR';
Disabled flows are excluded from the next reload cycle (src/mqtt_client.py:38-40):
# From src/mqtt_client.py:38-40
def load_flows(self) -> list[Flow]:
    with self.session_factory() as session:
        stmt = select(Flow).where(Flow.enabled.is_(True))
        return list(session.execute(stmt).scalars().all())

Build docs developers (and LLMs) love