
Local installation

Requirements

  • Python: 3.12 or higher
  • Database: MySQL 5.7+ or MariaDB 10.3+
  • MQTT Broker: Mosquitto, EMQX, or any MQTT 3.1.1 compatible broker

Step-by-step setup

1. Install Python dependencies

Create a virtual environment to isolate dependencies:
python -m venv .venv
source .venv/bin/activate  # On Windows: .venv\Scripts\activate
Install required packages:
pip install -r requirements.txt
requirements.txt
SQLAlchemy>=2.0.0
PyMySQL>=1.1.0
paho-mqtt>=2.1.0
requests>=2.32.0
python-dotenv>=1.0.1
2. Configure environment variables

Copy the example environment file:
cp .env.example .env
Edit .env to match your infrastructure:
.env
# Database connection
DB_HOST=192.168.0.137
DB_PORT=3306
DB_NAME=db
DB_USER=demo
DB_PASSWORD=demo

# Logging
LOG_DIR=./log

# HTTP settings for POST_ENDPOINT flows
HTTP_TIMEOUT_SECONDS=10

# MQTT client configuration
MQTT_CLIENT_ID=mqtt-gateway
MQTT_KEEPALIVE=60

# Flow reload interval (seconds)
FLOWS_RELOAD_INTERVAL_SECONDS=600

Environment variable reference

Variable                        Required  Default       Description
DB_HOST                         Yes       -             MariaDB/MySQL host address
DB_PORT                         No        3306          Database port
DB_NAME                         Yes       -             Database name
DB_USER                         Yes       -             Database username
DB_PASSWORD                     Yes       -             Database password
LOG_DIR                         No        ./log         Directory for error logs (created automatically)
HTTP_TIMEOUT_SECONDS            No        10            Timeout for POST_ENDPOINT HTTP requests
MQTT_CLIENT_ID                  No        mqtt-gateway  MQTT client identifier
MQTT_KEEPALIVE                  No        60            MQTT keepalive interval (seconds)
FLOWS_RELOAD_INTERVAL_SECONDS   No        600           How often to reload flows from database
3. Initialize the database

The gateway automatically creates tables on first run, but you can verify your database is accessible:
mysql -h 192.168.0.137 -P 3306 -u demo -p db
Tables created automatically:
  • mqtt_servers: MQTT broker configuration
  • flows: Flow definitions (topics, actions, schemas)
  • data: Stored message attributes (for STORE_DB flows)
The gateway uses SQLAlchemy’s create_all() which is idempotent. Running it multiple times is safe.
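create_all() only issues DDL for tables that do not already exist, which is what makes repeated runs safe. The same behavior can be demonstrated with plain `CREATE TABLE IF NOT EXISTS` DDL; this stdlib sqlite3 sketch is an analogy, not the gateway's actual MySQL schema:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
ddl = """
CREATE TABLE IF NOT EXISTS flows (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    code TEXT UNIQUE NOT NULL
)
"""
conn.execute(ddl)
conn.execute(ddl)  # second run is a no-op, just like calling create_all() again
```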
4. Start the service

Run the gateway:
python app.py
The entry point is simple:
app.py
from src.main import run

if __name__ == "__main__":
    run()
On startup, you’ll see:
  1. Database connection established (see src/config.py:22-26 for SQLAlchemy URL construction)
  2. Tables created if missing
  3. Default MQTT broker seeded if none exists (see src/db.py:20-35)
  4. Flows loaded and subscriptions established
  5. Message processing begins
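Item 1 above points at src/config.py for the SQLAlchemy URL construction. A minimal sketch of what such a builder looks like for PyMySQL (the helper name is hypothetical; note that real code should URL-escape credentials containing special characters):

```python
def build_db_url(host: str, name: str, user: str, password: str, port: int = 3306) -> str:
    """Assemble a SQLAlchemy connection URL for MariaDB/MySQL via the PyMySQL driver."""
    return f"mysql+pymysql://{user}:{password}@{host}:{port}/{name}"
```

With the defaults from .env above, `build_db_url("192.168.0.137", "db", "demo", "demo")` yields `mysql+pymysql://demo:demo@192.168.0.137:3306/db`.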

Docker installation

Build the image

The included Dockerfile uses Python 3.12 slim as the base:
docker build -t mqtt-gateway:latest .
Dockerfile
FROM python:3.12-slim

ENV PYTHONDONTWRITEBYTECODE=1
ENV PYTHONUNBUFFERED=1

WORKDIR /app

RUN addgroup --system app && adduser --system --ingroup app app

COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

COPY . .

CMD ["python", "app.py"]

Run the container

Run the gateway with environment variables:
docker run --rm \
  -e DB_HOST=192.168.0.137 \
  -e DB_PORT=3306 \
  -e DB_NAME=db \
  -e DB_USER=demo \
  -e DB_PASSWORD=demo \
  -e LOG_DIR=/app/log \
  -v "$(pwd)/log:/app/log" \
  --name mqtt-gateway \
  mqtt-gateway:latest

Volume mounts

Host path   Container path  Purpose
$(pwd)/log  /app/log        Persist error logs outside the container
Logs are written in daily rotating files with format YYYY-MM-DD.log (e.g., 2026-03-03.log).

Docker networking considerations

If your MariaDB or MQTT broker runs on the host machine, use host.docker.internal instead of localhost in the environment variables (on Docker Desktop for Mac/Windows). On Linux, use --network host, or map the alias with --add-host=host.docker.internal:host-gateway.
Example for host-based services:
docker run --rm \
  -e DB_HOST=host.docker.internal \
  -e DB_PORT=3306 \
  -e DB_NAME=db \
  -e DB_USER=demo \
  -e DB_PASSWORD=demo \
  -v "$(pwd)/log:/app/log" \
  --add-host=host.docker.internal:host-gateway \
  mqtt-gateway:latest

Database schema

The gateway uses three tables, all created automatically:

mqtt_servers table

Stores MQTT broker connection details.
src/models.py
class MqttServer(Base):
    __tablename__ = "mqtt_servers"

    id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True)
    host: Mapped[str] = mapped_column(String(255), nullable=False)
    port: Mapped[int] = mapped_column(Integer, nullable=False, default=1883)
    username: Mapped[str | None] = mapped_column(String(255), nullable=True)
    password: Mapped[str | None] = mapped_column(String(255), nullable=True)
    enabled: Mapped[bool] = mapped_column(Boolean, nullable=False, default=True)
    is_default: Mapped[bool] = mapped_column(Boolean, nullable=False, default=True)
On first run, a default broker at 192.168.0.137:1883 is inserted if no enabled broker exists (see src/db.py:20-35).

flows table

Defines message processing flows.
src/models.py
class Flow(Base):
    __tablename__ = "flows"

    id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True)
    code: Mapped[str] = mapped_column(String(100), unique=True, nullable=False)
    description: Mapped[str] = mapped_column(String(255), nullable=False)
    topic: Mapped[str] = mapped_column(String(255), nullable=False)
    action: Mapped[str] = mapped_column(String(30), nullable=False)
    payload_schema: Mapped[dict] = mapped_column(JSON, nullable=False)
    endpoint_url: Mapped[str | None] = mapped_column(String(500), nullable=True)
    last_msg_id: Mapped[int] = mapped_column(Integer, nullable=False, default=0)
    enabled: Mapped[bool] = mapped_column(Boolean, nullable=False, default=True)
Actions:
  • STORE_DB: Store payload attributes in the data table
  • POST_ENDPOINT: Forward payload to endpoint_url via HTTP POST

data table

Stores message attributes for STORE_DB flows.
src/models.py
class DataRecord(Base):
    __tablename__ = "data"

    id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True)
    received_at: Mapped[datetime] = mapped_column(
        DateTime, nullable=False, default=datetime.utcnow
    )
    flow_code: Mapped[str] = mapped_column(String(100), nullable=False)
    attribute_name: Mapped[str] = mapped_column(String(255), nullable=False)
    attribute_value: Mapped[str] = mapped_column(Text, nullable=False)
    last_msg_id: Mapped[int] = mapped_column(Integer, nullable=False)
Each message creates multiple rows—one per attribute. Group by last_msg_id to reconstruct the original message.
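Reconstructing a message from its attribute rows is a group-by on last_msg_id. A minimal in-memory sketch, assuming rows are tuples in (last_msg_id, attribute_name, attribute_value) order:

```python
from itertools import groupby

def reconstruct(rows: list[tuple[int, str, str]]) -> dict[int, dict[str, str]]:
    """Group (last_msg_id, attribute_name, attribute_value) rows back into messages."""
    out: dict[int, dict[str, str]] = {}
    for msg_id, grp in groupby(sorted(rows), key=lambda r: r[0]):
        out[msg_id] = {name: value for _, name, value in grp}
    return out
```

Against the live database the same grouping would be an ORDER BY last_msg_id query over the data table.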

Logging

Error logs are written to the directory specified by LOG_DIR (default: ./log).
  • Format: YYYY-MM-DD.log (e.g., 2026-03-03.log)
  • Rotation: New file created daily at midnight UTC
  • Mode: Append mode during the day
Logs include:
  • MQTT connection failures (see src/mqtt_client.py:66-68)
  • Payload validation errors (see src/mqtt_client.py:80-82)
  • HTTP POST failures for POST_ENDPOINT flows (see src/processor.py:124-129)
  • Flow reload errors (see src/mqtt_client.py:122)
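The daily file naming described above can be reproduced with a small append-only writer. This is an illustrative sketch, not the gateway's actual logger:

```python
import os
from datetime import datetime, timezone

def log_error(log_dir: str, message: str) -> str:
    """Append a timestamped line to today's YYYY-MM-DD.log file (UTC)."""
    os.makedirs(log_dir, exist_ok=True)  # LOG_DIR is created automatically
    now = datetime.now(timezone.utc)
    path = os.path.join(log_dir, now.strftime("%Y-%m-%d") + ".log")
    with open(path, "a", encoding="utf-8") as f:
        f.write(f"{now.isoformat()} {message}\n")
    return path
```

Because the file is opened in append mode and keyed on the current UTC date, "rotation" happens naturally at midnight UTC when the filename changes.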

Health checks

The gateway does not expose an HTTP health endpoint by default. To monitor the service:
  1. Process monitoring: Check if python app.py is running
  2. Database queries: Verify last_msg_id is incrementing in the flows table
  3. Log monitoring: Watch for errors in LOG_DIR

Troubleshooting

Symptoms: Service exits with Fatal error during startup in logs
Solutions:
  • Verify DB_HOST is accessible: ping 192.168.0.137
  • Test database connectivity: mysql -h DB_HOST -P DB_PORT -u DB_USER -p
  • Check firewall rules allow connections to port 3306
  • In Docker, ensure network mode allows access to host services
Symptoms: MQTT connection failed with reason code in logs
Solutions:
  • Verify an enabled MQTT server exists: SELECT * FROM mqtt_servers WHERE enabled = 1
  • Test broker connectivity: telnet 192.168.0.137 1883
  • Check username and password in mqtt_servers if authentication is required
  • Verify the broker supports MQTT 3.1.1 (paho-mqtt compatibility)
Symptoms: Messages published but no data in the data table
Solutions:
  • Verify flows are enabled: SELECT * FROM flows WHERE enabled = 1
  • Check topic patterns match: The gateway uses paho.mqtt.client.topic_matches_sub() for wildcard matching
  • Validate payload structure matches payload_schema
  • Check error logs for validation failures
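Wildcard matching follows standard MQTT semantics: '+' matches exactly one topic level, '#' matches all remaining levels. The gateway delegates to paho.mqtt.client.topic_matches_sub(); the sketch below reimplements the core rules purely for illustration when checking why a topic does or does not match a flow:

```python
def topic_matches(sub: str, topic: str) -> bool:
    """Simplified MQTT topic filter matching ('+' = one level, '#' = the rest)."""
    sub_parts = sub.split("/")
    topic_parts = topic.split("/")
    for i, part in enumerate(sub_parts):
        if part == "#":          # multi-level wildcard: must be last in the filter
            return True
        if i >= len(topic_parts):
            return False
        if part != "+" and part != topic_parts[i]:
            return False
    return len(sub_parts) == len(topic_parts)
```

For example, the filter sensors/+/temp matches sensors/room1/temp but not sensors/room1/hum, and sensors/# matches every topic under sensors.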
Symptoms: Errors like Error posting flow X to endpoint Y in logs
Solutions:
  • Verify endpoint_url is not NULL in the flow
  • Test the endpoint manually: curl -X POST http://endpoint -H "Content-Type: application/json" -d '{"test": "data"}'
  • Increase HTTP_TIMEOUT_SECONDS if the endpoint is slow
  • Check network connectivity from the gateway to the endpoint
Symptoms: New flows added to the database but not subscribed
Solutions:
  • Wait for FLOWS_RELOAD_INTERVAL_SECONDS (default: 600 seconds / 10 minutes)
  • Restart the service to force immediate reload
  • Check logs for Error reloading flows messages

Next steps

Configuration

Learn how to configure flows, schemas, and MQTT servers

API reference

Explore the database schema and available flow actions
