Skip to main content
Hero Light

What is MQTT Gateway?

MQTT Gateway is a lightweight, production-ready Python service designed to bridge MQTT messaging with persistent storage and HTTP endpoints. It provides a flexible, database-driven flow system that allows you to define how incoming MQTT messages should be processed without code changes. The gateway automatically creates database tables, manages MQTT broker connections, and dynamically reloads flows without requiring service restarts.

Key features

Database-driven flows

Define message processing flows in MariaDB without touching code. Enable, disable, or modify flows on the fly.

Dual routing modes

Route messages to MariaDB for persistence or forward them to HTTP endpoints for external processing.

Dynamic subscription management

Flows are reloaded every 10 minutes (configurable). Add or remove MQTT topic subscriptions without restarting.

Schema validation

Define JSON schemas for each flow. Invalid payloads are rejected and logged automatically.

Production ready

Built with SQLAlchemy, automatic database initialization, connection pooling, and daily rotating logs.

Docker support

Ships with a production Dockerfile using Python 3.12 slim base. Easy deployment with environment variables.

How it works

  1. Startup: The gateway connects to your MariaDB database and creates three tables (mqtt_servers, flows, data) if they don’t exist
  2. MQTT connection: It reads the enabled MQTT broker from mqtt_servers and connects using the paho-mqtt client
  3. Flow loading: All enabled flows are loaded from the database, and the gateway subscribes to their MQTT topics
  4. Message processing: When a message arrives, the payload is validated against the flow’s schema and then either:
    • Stored in the data table with one row per attribute (action: STORE_DB)
    • Posted to an HTTP endpoint (action: POST_ENDPOINT)
  5. Hot reload: Every 10 minutes, flows are reloaded and subscriptions are synchronized without disconnecting from MQTT

Architecture

def run() -> None:
    settings = load_settings()
    logger = setup_error_logger(settings.log_dir)

    try:
        engine = build_engine(settings)
        session_factory = build_session_factory(engine)
        initialize_database(engine)
        seed_default_mqtt_server(session_factory)

        mqtt_client = GatewayMqttClient(
            settings=settings,
            session_factory=session_factory,
            logger=logger,
        )
        mqtt_client.run_forever()
    except Exception as exc:
        logger.error("Fatal error during startup: %s", exc)
        raise

Use cases

  • IoT sensor data collection: Collect temperature, humidity, or other sensor readings from MQTT-enabled devices and store them in MariaDB
  • Event forwarding: Route MQTT events to HTTP APIs for processing by downstream services
  • Multi-tenant data ingestion: Use different flows for different devices or tenants, all managed through database configuration
  • Hybrid storage: Store critical data in the database while forwarding copies to external analytics platforms

Next steps

Quick start

Get MQTT Gateway running in under 5 minutes

Installation

Detailed installation instructions for local and Docker deployments

Configuration

Learn about environment variables and database-driven flows

API reference

Explore the database schema and flow configuration options

Build docs developers (and LLMs) love