Flow structure
Flows are stored in the `flows` database table and contain the following properties:
| Property | Type | Description |
|---|---|---|
| `code` | string | Unique identifier for the flow (max 100 chars) |
| `description` | string | Human-readable description of the flow’s purpose |
| `topic` | string | MQTT topic pattern to subscribe to |
| `action` | string | Action to perform: `STORE_DB` or `POST_ENDPOINT` |
| `payload_schema` | JSON | Schema defining expected payload structure and types |
| `endpoint_url` | string | HTTP endpoint URL (required for `POST_ENDPOINT`) |
| `last_msg_id` | integer | Counter for processed messages |
| `enabled` | boolean | Whether the flow is active |
The `code` field must be unique across all flows and is used to identify records in the database.
How flows work
When an MQTT message arrives, the gateway performs these steps:
- Topic matching: Compares the message topic against all enabled flows using MQTT topic pattern matching (src/mqtt_client.py:84-86)
- Payload validation: Validates the JSON payload against the flow’s `payload_schema` (src/processor.py:92)
- Message ID increment: Increments the `last_msg_id` counter atomically (src/processor.py:94-95)
- Action execution: Routes the message based on the flow’s `action` type
Multiple flows can match the same topic. Each matching flow processes the message independently.
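The steps above can be sketched in Python as follows. This is a minimal illustration under stated assumptions, not the gateway's actual code: the `Flow` dataclass and the `topic_matches`, `validate`, and `process` functions are hypothetical names, the schema is assumed to be a flat mapping of field name to type name, the endpoint URL is made up, and the action step is reduced to reporting which action would run.

```python
import json
from dataclasses import dataclass, field
from typing import Optional


@dataclass
class Flow:
    # Hypothetical in-memory mirror of a row in the flows table.
    code: str
    topic: str
    action: str                       # "STORE_DB" or "POST_ENDPOINT"
    payload_schema: dict = field(default_factory=dict)
    endpoint_url: Optional[str] = None
    last_msg_id: int = 0
    enabled: bool = True


def topic_matches(pattern: str, topic: str) -> bool:
    """Simplified MQTT matching: '+' matches one level, '#' matches the rest."""
    p_levels, t_levels = pattern.split("/"), topic.split("/")
    for i, p in enumerate(p_levels):
        if p == "#":
            return True
        if i >= len(t_levels):
            return False
        if p != "+" and p != t_levels[i]:
            return False
    return len(p_levels) == len(t_levels)


# Assumed schema vocabulary; the real payload_schema format may differ.
TYPES = {"float": (int, float), "int": int, "string": str, "bool": bool}


def validate(schema: dict, payload: dict) -> bool:
    for name, type_name in schema.items():
        expected = TYPES.get(type_name)
        if expected is None or name not in payload:
            return False
        value = payload[name]
        # bool is a subclass of int in Python, so reject it for numeric fields.
        if isinstance(value, bool) and expected is not bool:
            return False
        if not isinstance(value, expected):
            return False
    return True


def process(flows, topic, raw_payload):
    """Return (flow code, message id, action) for each flow that handled the message."""
    payload = json.loads(raw_payload)
    handled = []
    for flow in flows:
        if not flow.enabled or not topic_matches(flow.topic, topic):
            continue
        if not validate(flow.payload_schema, payload):
            continue
        # The source describes this increment as atomic; a plain in-memory
        # increment stands in for it here.
        flow.last_msg_id += 1
        handled.append((flow.code, flow.last_msg_id, flow.action))
    return handled


flows = [
    Flow("aht10_db", "sensors/aht10", "STORE_DB",
         {"temperature": "float", "humidity": "float"}),
    Flow("all_sensors", "sensors/+", "POST_ENDPOINT", {},
         endpoint_url="http://localhost:8000/ingest"),  # hypothetical URL
]
print(process(flows, "sensors/aht10", '{"temperature": 21.5, "humidity": 40.0}'))
# [('aht10_db', 1, 'STORE_DB'), ('all_sensors', 1, 'POST_ENDPOINT')]
```

Both flows match the topic here, so each one validates the payload and advances its own counter independently.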
Example flows
Here are real examples from the sample data:
Temperature and humidity to database
This flow subscribes to `sensors/aht10`, validates that messages contain temperature and humidity fields as floats, and stores them in the database.
Sensor data to HTTP endpoint
Flow reloading
The gateway automatically reloads flows from the database at regular intervals without restarting.
Enabling and disabling flows
Set `enabled = 0` to disable a flow without deleting it.
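A minimal sketch of toggling a flow, assuming a SQL database. It uses an in-memory SQLite database purely for illustration. The gateway's actual database engine, the trimmed-down table definition, the `set_enabled` helper, and the `aht10_db` code value are all assumptions. Only the `flows` table name and the `code`, `topic`, and `enabled` columns come from the description above.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
# Trimmed-down, hypothetical version of the flows table.
conn.execute(
    "CREATE TABLE flows ("
    "code TEXT PRIMARY KEY, topic TEXT NOT NULL, "
    "enabled INTEGER NOT NULL DEFAULT 1)"
)
conn.execute(
    "INSERT INTO flows (code, topic) VALUES (?, ?)",
    ("aht10_db", "sensors/aht10"),
)


def set_enabled(conn, code, enabled):
    """Enable or disable a flow by code; returns True if a row was updated."""
    cur = conn.execute(
        "UPDATE flows SET enabled = ? WHERE code = ?",
        (1 if enabled else 0, code),
    )
    conn.commit()
    return cur.rowcount == 1


set_enabled(conn, "aht10_db", False)  # disable without deleting the row
print(conn.execute("SELECT code, enabled FROM flows").fetchall())
# [('aht10_db', 0)]
```

Because the gateway reloads flows periodically, a change like this would presumably take effect at the next reload rather than immediately.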