Constants
Flow action types
The gateway supports two action types for flows, defined insrc/constants.py:1-2:
Configuration
Settings
Configuration dataclass loaded from environment variables. Source:src/config.py:8-26
Database hostname (from
DB_HOST env var)Database port (from
DB_PORT env var)Database name (from
DB_NAME env var)Database username (from
DB_USER env var)Database password (from
DB_PASSWORD env var)Log file directory (from
LOG_DIR env var)Timeout for HTTP POST requests (from
HTTP_TIMEOUT_SECONDS env var)MQTT client identifier (from
MQTT_CLIENT_ID env var)MQTT keepalive interval in seconds (from
MQTT_KEEPALIVE env var)How often to reload flows from database (from
FLOWS_RELOAD_INTERVAL_SECONDS env var)Computed SQLAlchemy connection URL in format:
mysql+pymysql://{user}:{password}@{host}:{port}/{name}load_settings()
Loads configuration from environment variables usingpython-dotenv.
Settings instance
Raises: ValueError if required environment variables are missing
Source: src/config.py:36-49
MQTT Client
GatewayMqttClient
Main client that connects to MQTT broker, subscribes to flow topics, and processes messages. Source:src/mqtt_client.py:17-125
Constructor
Configuration instance from
load_settings()SQLAlchemy session factory (e.g., from
sessionmaker())Logger instance for error and status messages
load_mqtt_server()
Returns the first enabled MQTT server from the database, prioritizing default servers.MqttServer instance
Raises: RuntimeError if no enabled servers exist
Query: SELECT * FROM mqtt_servers WHERE enabled = 1 ORDER BY is_default DESC, id ASC LIMIT 1
Source: src/mqtt_client.py:25-35
load_flows()
Returns all enabled flows from the database.list[Flow]
Query: SELECT * FROM flows WHERE enabled = 1
Source: src/mqtt_client.py:37-40
reload_flows()
Reloads flows from the database and updates MQTT subscriptions dynamically.- Fetches current enabled flows
- Compares with previous subscriptions
- Subscribes to new topics
- Unsubscribes from removed topics
src/mqtt_client.py:55-63
run_forever()
Main loop that connects to MQTT broker and processes messages indefinitely.- Loads MQTT server configuration
- Loads initial flows
- Connects to broker with authentication if configured
- Starts MQTT loop
- Periodically reloads flows every
flows_reload_interval_seconds
src/mqtt_client.py:103-125
on_connect()
Callback triggered when MQTT connection is established. Subscribes to all flow topics. Source:src/mqtt_client.py:65-72
on_message()
Callback triggered when an MQTT message arrives. Validates JSON payload and routes to matching flows. Behavior:- Decodes payload as UTF-8 JSON object
- Matches message topic against flow patterns (supports wildcards)
- Calls
process_flow_message()for each matching flow - Logs errors without crashing
src/mqtt_client.py:74-101
Message Processing
ProcessResult
Dataclass returned byprocess_flow_message() with processing metadata.
Source: src/processor.py:16-21
The flow’s unique code identifier
The incremented message ID after processing
The action that was performed (
STORE_DB or POST_ENDPOINT)process_flow_message()
Processes a single MQTT message according to flow configuration.SQLAlchemy session factory
Configuration instance
Logger for error messages
Database ID of the flow to process
Parsed JSON payload from MQTT message
ProcessResult
Raises: ValueError if payload validation fails or endpoint_url is missing for POST_ENDPOINT
Processing steps:
- Locks the flow row with
SELECT ... FOR UPDATE - Validates payload against
payload_schemausingvalidate_payload() - Increments
last_msg_id - For
STORE_DB: CreatesDataRecordentries for each payload field - For
POST_ENDPOINT: POSTs JSON payload toendpoint_url
src/processor.py:79-135
validate_payload()
Validates that payload matches the expected schema.The message payload to validate
Expected structure with field names and types
ValueError with descriptive message if:
- Required field is missing
- Field has incorrect type
string,str→ Pythonstrnumber,float→ Pythonintorfloatint,integer→ Pythonintbool,boolean→ Pythonboolobject→ Pythondictarray→ Pythonlist
src/processor.py:56-71