This page documents the main Python components you can use when extending or integrating with the gateway.

Constants

Flow action types

The gateway supports two action types for flows, defined in src/constants.py:1-2:
FLOW_ACTION_STORE_DB = "STORE_DB"
FLOW_ACTION_POST_ENDPOINT = "POST_ENDPOINT"
Use these constants when creating or querying flows programmatically to ensure consistency.
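As a minimal sketch of that advice, the snippet below checks an action string against the two constants before it is used. The constants are redefined locally so the example runs on its own; in the gateway you would import them from src/constants.py instead. VALID_FLOW_ACTIONS and check_action are hypothetical helpers, not part of the gateway.

```python
# Redefined here only to keep the sketch self-contained;
# in the gateway these come from src/constants.py.
FLOW_ACTION_STORE_DB = "STORE_DB"
FLOW_ACTION_POST_ENDPOINT = "POST_ENDPOINT"

# Hypothetical helper set, not part of the gateway.
VALID_FLOW_ACTIONS = frozenset({FLOW_ACTION_STORE_DB, FLOW_ACTION_POST_ENDPOINT})


def check_action(action: str) -> str:
    """Return the action unchanged if it is one of the two supported types."""
    if action not in VALID_FLOW_ACTIONS:
        raise ValueError(f"Unknown flow action: {action!r}")
    return action


print(check_action(FLOW_ACTION_STORE_DB))
```

Comparing against the constants rather than string literals means a typo such as "store_db" fails loudly instead of silently matching nothing.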

Configuration

Settings

Configuration dataclass loaded from environment variables. Source: src/config.py:8-26
db_host (string, required): Database hostname (from DB_HOST env var)
db_port (integer, default: 3306): Database port (from DB_PORT env var)
db_name (string, required): Database name (from DB_NAME env var)
db_user (string, required): Database username (from DB_USER env var)
db_password (string, required): Database password (from DB_PASSWORD env var)
log_dir (string, default: "./log"): Log file directory (from LOG_DIR env var)
http_timeout_seconds (integer, default: 10): Timeout in seconds for HTTP POST requests (from HTTP_TIMEOUT_SECONDS env var)
mqtt_client_id (string, default: "mqtt-gateway"): MQTT client identifier (from MQTT_CLIENT_ID env var)
mqtt_keepalive (integer, default: 60): MQTT keepalive interval in seconds (from MQTT_KEEPALIVE env var)
flows_reload_interval_seconds (integer, default: 600): Interval in seconds between flow reloads from the database (from FLOWS_RELOAD_INTERVAL_SECONDS env var)
Property:
sqlalchemy_url (string): Computed SQLAlchemy connection URL in the format mysql+pymysql://{user}:{password}@{host}:{port}/{name}
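To illustrate how such a computed property could look, here is a minimal sketch of a dataclass that builds the URL in the documented format. SettingsSketch is a hypothetical stand-in that carries only the database fields; it is not the real Settings class from src/config.py.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class SettingsSketch:
    """Hypothetical stand-in for Settings with only the database fields."""

    db_host: str
    db_name: str
    db_user: str
    db_password: str
    db_port: int = 3306  # documented default

    @property
    def sqlalchemy_url(self) -> str:
        # Follows the documented format:
        # mysql+pymysql://{user}:{password}@{host}:{port}/{name}
        return (
            f"mysql+pymysql://{self.db_user}:{self.db_password}"
            f"@{self.db_host}:{self.db_port}/{self.db_name}"
        )


example = SettingsSketch("localhost", "gateway", "gw_user", "secret")
print(example.sqlalchemy_url)
```

If the password contains characters such as @ or /, a URL built this way needs them percent-encoded (for example with urllib.parse.quote_plus). Whether the gateway does this is not stated here.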

load_settings()

Loads configuration from environment variables using python-dotenv.
from src.config import load_settings

settings = load_settings()
print(settings.db_host)
Returns: Settings instance
Raises: ValueError if required environment variables are missing
Source: src/config.py:36-49
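The required-versus-default behavior described above can be sketched as follows. This is an illustration only: read_settings_sketch and REQUIRED_VARS are hypothetical names, the function returns a plain dict rather than a Settings instance, and it reads from a mapping instead of calling python-dotenv.

```python
import os
from typing import Any, Mapping

# The four variables documented as required.
REQUIRED_VARS = ("DB_HOST", "DB_NAME", "DB_USER", "DB_PASSWORD")


def read_settings_sketch(env: Mapping[str, str] = os.environ) -> dict[str, Any]:
    """Hypothetical loader: fail fast on missing variables, apply documented defaults."""
    missing = [name for name in REQUIRED_VARS if not env.get(name)]
    if missing:
        raise ValueError("Missing required environment variables: " + ", ".join(missing))
    return {
        "db_host": env["DB_HOST"],
        "db_port": int(env.get("DB_PORT", "3306")),
        "db_name": env["DB_NAME"],
        "db_user": env["DB_USER"],
        "db_password": env["DB_PASSWORD"],
        "http_timeout_seconds": int(env.get("HTTP_TIMEOUT_SECONDS", "10")),
        "flows_reload_interval_seconds": int(env.get("FLOWS_RELOAD_INTERVAL_SECONDS", "600")),
    }


demo_env = {"DB_HOST": "localhost", "DB_NAME": "gateway", "DB_USER": "gw", "DB_PASSWORD": "secret"}
print(read_settings_sketch(demo_env)["db_port"])
```

Passing the environment in as a mapping keeps the sketch easy to test. The real load_settings() reads the process environment after python-dotenv has loaded any .env file.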

MQTT Client

GatewayMqttClient

Main client that connects to MQTT broker, subscribes to flow topics, and processes messages. Source: src/mqtt_client.py:17-125

Constructor

GatewayMqttClient(settings: Settings, session_factory, logger: logging.Logger)
settings (Settings, required): Configuration instance from load_settings()
session_factory (required): SQLAlchemy session factory (e.g., from sessionmaker())
logger (logging.Logger, required): Logger instance for error and status messages

load_mqtt_server()

Returns the first enabled MQTT server from the database, prioritizing default servers.
server = client.load_mqtt_server()
Returns: MqttServer instance
Raises: RuntimeError if no enabled servers exist
Query: SELECT * FROM mqtt_servers WHERE enabled = 1 ORDER BY is_default DESC, id ASC LIMIT 1
Source: src/mqtt_client.py:25-35
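To show how that ordering picks a server, the sketch below runs the same WHERE and ORDER BY clauses against an in-memory SQLite table. The host column, the sample rows, and pick_server are assumptions made for illustration; the gateway itself uses SQLAlchemy against MySQL.

```python
import sqlite3


def pick_server(conn: sqlite3.Connection) -> tuple:
    """Hypothetical helper mirroring the documented selection rule."""
    row = conn.execute(
        "SELECT id, host FROM mqtt_servers "
        "WHERE enabled = 1 ORDER BY is_default DESC, id ASC LIMIT 1"
    ).fetchone()
    if row is None:
        raise RuntimeError("No enabled MQTT servers found")
    return row


conn = sqlite3.connect(":memory:")
# The schema is an assumption; only enabled, is_default, and id come from the query above.
conn.execute(
    "CREATE TABLE mqtt_servers ("
    "id INTEGER PRIMARY KEY, host TEXT, enabled INTEGER, is_default INTEGER)"
)
conn.executemany(
    "INSERT INTO mqtt_servers (id, host, enabled, is_default) VALUES (?, ?, ?, ?)",
    [
        (1, "backup.example", 1, 0),    # enabled, not default
        (2, "primary.example", 1, 1),   # enabled and default
        (3, "disabled.example", 0, 1),  # default but disabled, so ignored
    ],
)
print(pick_server(conn))  # (2, 'primary.example')
```

An enabled default server wins over a non-default one with a lower id. A disabled server is never chosen, even if it is marked as default.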

load_flows()

Returns all enabled flows from the database.
flows = client.load_flows()
Returns: list[Flow]
Query: SELECT * FROM flows WHERE enabled = 1
Source: src/mqtt_client.py:37-40

reload_flows()

Reloads flows from the database and updates MQTT subscriptions dynamically.
client.reload_flows()
This method:
  1. Fetches current enabled flows
  2. Compares with previous subscriptions
  3. Subscribes to new topics
  4. Unsubscribes from removed topics
Source: src/mqtt_client.py:55-63
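The compare, subscribe, and unsubscribe steps above amount to two set differences. A minimal sketch, assuming topics are tracked as sets of strings (diff_subscriptions is a hypothetical helper, not the gateway's method):

```python
def diff_subscriptions(previous: set[str], current: set[str]) -> tuple[set[str], set[str]]:
    """Return (topics to subscribe, topics to unsubscribe)."""
    to_subscribe = current - previous    # in the fresh flow list, not yet subscribed
    to_unsubscribe = previous - current  # subscribed, but no longer in the flow list
    return to_subscribe, to_unsubscribe


added, removed = diff_subscriptions(
    previous={"sensors/a", "sensors/b"},
    current={"sensors/b", "sensors/c"},
)
print(sorted(added), sorted(removed))  # ['sensors/c'] ['sensors/a']
```

Topics present in both sets are left alone, so an unchanged flow keeps its subscription across reloads.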

run_forever()

Main loop that connects to MQTT broker and processes messages indefinitely.
client.run_forever()
This method:
  1. Loads MQTT server configuration
  2. Loads initial flows
  3. Connects to broker with authentication if configured
  4. Starts MQTT loop
  5. Periodically reloads flows every flows_reload_interval_seconds
Blocks until interrupted.
Source: src/mqtt_client.py:103-125
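The periodic reload in step 5 can be pictured as a loop that checks a monotonic clock on every iteration. The sketch below shows that general pattern only. run_with_periodic_reload, step, reload, and should_stop are hypothetical names, and the real run_forever() may structure its loop differently (for example, around the MQTT library's own network loop).

```python
import time
from typing import Callable


def run_with_periodic_reload(
    step: Callable[[], None],
    reload: Callable[[], None],
    interval_seconds: float,
    should_stop: Callable[[], bool],
    clock: Callable[[], float] = time.monotonic,
) -> None:
    """Call step() repeatedly and reload() once each time the interval has elapsed."""
    next_reload = clock() + interval_seconds
    while not should_stop():
        step()
        if clock() >= next_reload:
            reload()
            next_reload = clock() + interval_seconds


# Demo with a fake clock that advances one second per step.
now = [0.0]
counts = {"step": 0, "reload": 0}


def fake_step() -> None:
    counts["step"] += 1
    now[0] += 1.0


def fake_reload() -> None:
    counts["reload"] += 1


run_with_periodic_reload(
    fake_step, fake_reload, 2.0, lambda: counts["step"] >= 5, clock=lambda: now[0]
)
print(counts)  # {'step': 5, 'reload': 2}
```

A monotonic clock is used so that wall-clock adjustments cannot skip or repeat a reload.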

on_connect()

Callback triggered when the MQTT connection is established. Subscribes to all flow topics.
Source: src/mqtt_client.py:65-72

on_message()

Callback triggered when an MQTT message arrives. Validates the JSON payload and routes it to matching flows.
Behavior:
  • Decodes payload as UTF-8 JSON object
  • Matches message topic against flow patterns (supports wildcards)
  • Calls process_flow_message() for each matching flow
  • Logs errors without crashing
Source: src/mqtt_client.py:74-101
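The decode and match steps above can be sketched in plain Python. The matcher below handles the standard MQTT wildcards (+ for one level, # for all remaining levels) but skips special cases such as topics starting with $. Both functions are hypothetical illustrations rather than the gateway's code, which may instead rely on a library helper such as paho-mqtt's topic_matches_sub.

```python
import json
from typing import Any


def topic_matches(pattern: str, topic: str) -> bool:
    """Match a topic against a subscription pattern with + and # wildcards."""
    pattern_levels = pattern.split("/")
    topic_levels = topic.split("/")
    for index, level in enumerate(pattern_levels):
        if level == "#":
            # '#' is only valid as the last level and matches everything below.
            return index == len(pattern_levels) - 1
        if index >= len(topic_levels):
            return False
        if level != "+" and level != topic_levels[index]:
            return False
    return len(pattern_levels) == len(topic_levels)


def decode_json_object(raw: bytes) -> dict[str, Any]:
    """Decode UTF-8 JSON and require a top-level object."""
    data = json.loads(raw.decode("utf-8"))
    if not isinstance(data, dict):
        raise ValueError("payload must be a JSON object")
    return data


print(topic_matches("sensors/+/temp", "sensors/room1/temp"))  # True
print(decode_json_object(b'{"temperature": 23.5}'))
```

Both UnicodeDecodeError and json.JSONDecodeError are subclasses of ValueError, so a caller can catch ValueError once, log the problem, and carry on with the next message.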

Message Processing

ProcessResult

Dataclass returned by process_flow_message() with processing metadata. Source: src/processor.py:16-21
flow_code (string): The flow’s unique code identifier
msg_id (integer): The incremented message ID after processing
action (string): The action that was performed (STORE_DB or POST_ENDPOINT)

process_flow_message()

Processes a single MQTT message according to flow configuration.
from src.processor import process_flow_message

result = process_flow_message(
    session_factory=session_factory,
    settings=settings,
    logger=logger,
    flow_id=1,
    payload={"temperature": 23.5, "humidity": 65.2}
)
session_factory (required): SQLAlchemy session factory
settings (Settings, required): Configuration instance
logger (logging.Logger, required): Logger for error messages
flow_id (integer, required): Database ID of the flow to process
payload (dict[str, Any], required): Parsed JSON payload from MQTT message
Returns: ProcessResult
Raises: ValueError if payload validation fails or endpoint_url is missing for POST_ENDPOINT
Processing steps:
  1. Locks the flow row with SELECT ... FOR UPDATE
  2. Validates payload against payload_schema using validate_payload()
  3. Increments last_msg_id
  4. For STORE_DB: Creates DataRecord entries for each payload field
  5. For POST_ENDPOINT: POSTs JSON payload to endpoint_url
Source: src/processor.py:79-135
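The steps above can be approximated with an in-memory sketch that leaves out the row lock and the schema validation. Everything here is hypothetical: the flow is a plain dict whose "code" and "action" keys are assumed names, records go into a list instead of DataRecord rows, and post is an injected callable standing in for the HTTP request.

```python
from dataclasses import dataclass
from typing import Any, Callable


@dataclass
class ProcessResultSketch:
    flow_code: str
    msg_id: int
    action: str


def process_sketch(
    flow: dict[str, Any],
    payload: dict[str, Any],
    store: list[tuple],
    post: Callable[[str, dict[str, Any]], None],
) -> ProcessResultSketch:
    """Increment the message counter, then store or forward the payload."""
    action = flow["action"]
    if action == "POST_ENDPOINT" and not flow.get("endpoint_url"):
        raise ValueError("endpoint_url is required for POST_ENDPOINT")
    if action not in ("STORE_DB", "POST_ENDPOINT"):
        raise ValueError(f"Unknown action: {action!r}")

    flow["last_msg_id"] += 1
    msg_id = flow["last_msg_id"]

    if action == "STORE_DB":
        # One record per payload field, all sharing the same msg_id.
        for field_name, value in payload.items():
            store.append((flow["code"], msg_id, field_name, value))
    else:
        post(flow["endpoint_url"], payload)
    return ProcessResultSketch(flow["code"], msg_id, action)


records: list[tuple] = []
demo_flow = {"code": "weather", "action": "STORE_DB", "last_msg_id": 41}
result = process_sketch(demo_flow, {"temperature": 23.5, "humidity": 65.2}, records, lambda url, body: None)
print(result.msg_id, len(records))  # 42 2
```

In the gateway, the SELECT ... FOR UPDATE lock in step 1 is what keeps two concurrent messages for the same flow from receiving the same msg_id. The sketch has no equivalent because it runs in a single thread.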

validate_payload()

Validates that payload matches the expected schema.
from src.processor import validate_payload

validate_payload(
    payload={"temperature": 23.5, "humidity": 65.2},
    payload_schema={"temperature": "float", "humidity": "float"}
)
payload (dict[str, Any], required): The message payload to validate
payload_schema (dict[str, Any], required): Expected structure with field names and types
Raises: ValueError with descriptive message if:
  • Required field is missing
  • Field has incorrect type
Supported type mappings:
  • string, str → Python str
  • number, float → Python int or float
  • int, integer → Python int
  • bool, boolean → Python bool
  • object → Python dict
  • array → Python list
Source: src/processor.py:56-71
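As a rough sketch of how the type mappings above could be enforced, the function below assumes a flat schema that maps each field name to a type name, as in the example call, and treats every schema field as required. validate_payload_sketch and TYPE_MAP are hypothetical. The explicit rejection of booleans for numeric types is also an assumption, included because bool is a subclass of int in Python.

```python
from typing import Any

# Type names from the documented list, mapped to accepted Python types.
TYPE_MAP: dict[str, tuple[type, ...]] = {
    "string": (str,), "str": (str,),
    "number": (int, float), "float": (int, float),
    "int": (int,), "integer": (int,),
    "bool": (bool,), "boolean": (bool,),
    "object": (dict,),
    "array": (list,),
}


def validate_payload_sketch(payload: dict[str, Any], payload_schema: dict[str, str]) -> None:
    """Raise ValueError on a missing field or a value of the wrong type."""
    for field_name, type_name in payload_schema.items():
        if field_name not in payload:
            raise ValueError(f"Missing required field: {field_name}")
        expected = TYPE_MAP.get(type_name)
        if expected is None:
            raise ValueError(f"Unsupported type {type_name!r} for field {field_name}")
        value = payload[field_name]
        # Assumption: True/False should not pass as int or float.
        is_unexpected_bool = isinstance(value, bool) and bool not in expected
        if is_unexpected_bool or not isinstance(value, expected):
            raise ValueError(
                f"Field {field_name} expected {type_name}, got {type(value).__name__}"
            )


validate_payload_sketch(
    {"temperature": 23.5, "humidity": 65},
    {"temperature": "float", "humidity": "number"},
)
print("payload ok")
```

Extra payload fields that the schema does not mention pass through unchecked in this sketch. Whether the real validate_payload() rejects them is not stated here.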
