MQTT Gateway uses MariaDB or MySQL to store configuration, message data, and routing rules. The database is automatically initialized on first run.

Database requirements

MQTT Gateway requires:
  • A MariaDB or MySQL database server
  • An existing database that is reachable from the gateway host (the gateway creates the tables, not the database itself)
  • User credentials with full access to that database

Connection configuration

Configure the database connection using environment variables in your .env file:
DB_HOST=192.168.0.137
DB_PORT=3306
DB_NAME=db
DB_USER=demo
DB_PASSWORD=demo
See environment variables for detailed parameter descriptions.
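The engine shown later on this page is built from settings.sqlalchemy_url. As a minimal sketch of how the DB_* variables could be combined into such a URL, assuming a PyMySQL driver (the "mysql+pymysql" prefix and the build_sqlalchemy_url helper are illustrative assumptions, not the gateway's actual code):

```python
import os
from urllib.parse import quote_plus


def build_sqlalchemy_url(env=os.environ) -> str:
    """Compose a SQLAlchemy URL from the DB_* variables.

    Hypothetical helper: the "mysql+pymysql" driver prefix is an
    assumption; this page does not state which driver the gateway uses.
    """
    # Percent-encode credentials so characters such as "@" or "/" in a
    # password cannot be mistaken for URL delimiters.
    user = quote_plus(env["DB_USER"])
    password = quote_plus(env["DB_PASSWORD"])
    host = env["DB_HOST"]
    port = env.get("DB_PORT", "3306")
    name = env["DB_NAME"]
    return f"mysql+pymysql://{user}:{password}@{host}:{port}/{name}"


example_env = {
    "DB_HOST": "192.168.0.137",
    "DB_PORT": "3306",
    "DB_NAME": "db",
    "DB_USER": "demo",
    "DB_PASSWORD": "p@ss/word",  # deliberately contains reserved characters
}
url = build_sqlalchemy_url(example_env)
print(url)  # mysql+pymysql://demo:p%40ss%2Fword@192.168.0.137:3306/db
```

Encoding the credentials matters in practice: an unescaped "@" in the password would otherwise be parsed as the start of the host name.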

Database schema

The gateway uses SQLAlchemy ORM to manage the database schema. On startup, it automatically creates the required tables if they don’t exist.

Tables created

mqtt_servers

Stores MQTT broker connection information.
| Column | Type | Description |
|---|---|---|
| id | Integer | Primary key, auto-increment |
| host | String(255) | MQTT broker hostname or IP address |
| port | Integer | MQTT broker port (default: 1883) |
| username | String(255) | Optional authentication username |
| password | String(255) | Optional authentication password |
| enabled | Boolean | Whether this server is active |
| is_default | Boolean | Whether this is the default server |

flows

Defines message routing rules from MQTT topics to destinations.
| Column | Type | Description |
|---|---|---|
| id | Integer | Primary key, auto-increment |
| code | String(100) | Unique flow identifier |
| description | String(255) | Human-readable description |
| topic | String(255) | MQTT topic pattern to subscribe to |
| action | String(30) | Action type (e.g., "database", "http") |
| payload_schema | JSON | JSON schema for payload validation |
| endpoint_url | String(500) | HTTP endpoint URL (for HTTP actions) |
| last_msg_id | Integer | Last processed message ID |
| enabled | Boolean | Whether this flow is active |
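The topic column holds an MQTT topic pattern, which may use the standard MQTT wildcards: "+" matches exactly one topic level and "#" matches all remaining levels. The sketch below illustrates those matching rules so you can check a pattern before storing it in a flow. The topic_matches function is a hypothetical helper, it ignores the special handling of topics that start with "$", and how the gateway itself matches topics is not shown on this page.

```python
def topic_matches(pattern: str, topic: str) -> bool:
    """Return True if an MQTT topic filter matches a concrete topic.

    "+" matches exactly one level; "#" matches the parent level and all
    remaining levels, and is only valid as the last level of the filter.
    """
    pattern_levels = pattern.split("/")
    topic_levels = topic.split("/")
    for i, level in enumerate(pattern_levels):
        if level == "#":
            # Valid only in the final position; then everything matches.
            return i == len(pattern_levels) - 1
        if i >= len(topic_levels):
            # The filter has more levels than the topic.
            return False
        if level != "+" and level != topic_levels[i]:
            return False
    # No multi-level wildcard: both must have the same number of levels.
    return len(pattern_levels) == len(topic_levels)


print(topic_matches("sensors/+/temperature", "sensors/kitchen/temperature"))
print(topic_matches("sensors/#", "sensors/kitchen/humidity"))
print(topic_matches("sensors/+", "sensors/kitchen/humidity"))
```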

data

Stores received MQTT message data when using database action.
| Column | Type | Description |
|---|---|---|
| id | Integer | Primary key, auto-increment |
| received_at | DateTime | Timestamp when message was received |
| flow_code | String(100) | Flow identifier that processed this message |
| attribute_name | String(255) | Name of the data attribute |
| attribute_value | Text | Value of the data attribute |
| last_msg_id | Integer | Message ID from payload |
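The attribute_name and attribute_value columns suggest that each message is stored as one row per attribute. The following is a rough sketch of that layout only, assuming a flat JSON payload; the payload_to_rows helper and the "msg_id" payload key are hypothetical, and the gateway's real mapping may differ.

```python
import json
from datetime import datetime, timezone


def payload_to_rows(flow_code: str, payload: str) -> list:
    """Flatten a JSON object into one dict per attribute.

    Each dict carries the columns of the data table described above.
    The "msg_id" key is an assumed name for the message ID field.
    """
    message = json.loads(payload)
    received_at = datetime.now(timezone.utc)
    msg_id = message.get("msg_id")
    rows = []
    for name, value in message.items():
        if name == "msg_id":
            continue  # stored in last_msg_id, not as an attribute row
        rows.append(
            {
                "received_at": received_at,
                "flow_code": flow_code,
                "attribute_name": name,
                # attribute_value is a Text column, so non-string values
                # are serialized to their JSON representation.
                "attribute_value": value if isinstance(value, str) else json.dumps(value),
                "last_msg_id": msg_id,
            }
        )
    return rows


rows = payload_to_rows("temperature", '{"msg_id": 7, "temperature": 21.5, "unit": "C"}')
for row in rows:
    print(row["attribute_name"], row["attribute_value"], row["last_msg_id"])
```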

Automatic initialization

The database schema is automatically created when you start the gateway for the first time:
# From src/db.py:16
def initialize_database(engine) -> None:
    Base.metadata.create_all(engine)
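This creates every table defined in the SQLAlchemy models. Tables that already exist are skipped, so the call is safe to repeat on each startup, but create_all does not alter existing tables when a model changes.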
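To see that repeated initialization is harmless, the sketch below calls initialize_database twice against an in-memory SQLite database. The SQLite URL and the cut-down MqttServer model are stand-ins chosen so the example runs without a MariaDB server; they are not the gateway's own models.

```python
from sqlalchemy import Column, Integer, String, create_engine, inspect
from sqlalchemy.orm import declarative_base

Base = declarative_base()


class MqttServer(Base):
    """Cut-down stand-in for the gateway's mqtt_servers model."""

    __tablename__ = "mqtt_servers"
    id = Column(Integer, primary_key=True, autoincrement=True)
    host = Column(String(255))
    port = Column(Integer, default=1883)


def initialize_database(engine) -> None:
    # create_all checks for each table first and only creates missing ones.
    Base.metadata.create_all(engine)


# In-memory SQLite is used here only to keep the example self-contained.
engine = create_engine("sqlite:///:memory:")
initialize_database(engine)
initialize_database(engine)  # second call finds the table and does nothing

table_names = inspect(engine).get_table_names()
print(table_names)
```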

Default MQTT server seeding

If the database contains no enabled MQTT server (as is the case on first run), a default server is created automatically:
# From src/db.py:20
def seed_default_mqtt_server(session_factory) -> None:
    with session_factory() as session:
        stmt = select(MqttServer).where(MqttServer.enabled.is_(True))
        existing = session.execute(stmt).scalar_one_or_none()
        if existing:
            return

        session.add(
            MqttServer(
                host="192.168.0.137",
                port=1883,
                enabled=True,
                is_default=True,
            )
        )
        session.commit()
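After the first run, update the default MQTT server row in your database to match your actual MQTT broker settings; the seeded host (192.168.0.137) is hardcoded. Note that the check above uses scalar_one_or_none(), which raises MultipleResultsFound when more than one enabled server exists.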
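One way to change the seeded row is through the ORM. The following is a sketch under stated assumptions: update_default_server is a hypothetical helper, the MqttServer class is a stand-in built from the column table above, "broker.example.internal" is a placeholder host, and in-memory SQLite replaces MariaDB so the example is self-contained.

```python
from sqlalchemy import Boolean, Column, Integer, String, create_engine, select
from sqlalchemy.orm import Session, declarative_base, sessionmaker

Base = declarative_base()


class MqttServer(Base):
    """Stand-in model mirroring the mqtt_servers table described above."""

    __tablename__ = "mqtt_servers"
    id = Column(Integer, primary_key=True, autoincrement=True)
    host = Column(String(255))
    port = Column(Integer, default=1883)
    username = Column(String(255), nullable=True)
    password = Column(String(255), nullable=True)
    enabled = Column(Boolean, default=True)
    is_default = Column(Boolean, default=False)


def update_default_server(session_factory, host, port=1883,
                          username=None, password=None) -> bool:
    """Point the default server row at a new broker; False if none exists."""
    with session_factory() as session:
        stmt = select(MqttServer).where(MqttServer.is_default.is_(True))
        server = session.execute(stmt).scalars().first()
        if server is None:
            return False
        server.host = host
        server.port = port
        server.username = username
        server.password = password
        session.commit()
        return True


# Demo setup: in-memory SQLite plus a row equivalent to the seeded default.
engine = create_engine("sqlite:///:memory:")
Base.metadata.create_all(engine)
factory = sessionmaker(bind=engine, expire_on_commit=False, class_=Session)
with factory() as session:
    session.add(MqttServer(host="192.168.0.137", port=1883,
                           enabled=True, is_default=True))
    session.commit()

updated = update_default_server(factory, "broker.example.internal", port=8883)

with factory() as session:
    server = session.execute(select(MqttServer)).scalars().one()
print(updated, server.host, server.port)
```

Because the row stays enabled, the seeding check finds it on the next startup and does not insert a second default.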

Connection pooling

The gateway uses SQLAlchemy’s connection pooling with the pool_pre_ping option enabled:
# From src/db.py:8
def build_engine(settings: Settings):
    return create_engine(settings.sqlalchemy_url, pool_pre_ping=True)
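With pool_pre_ping enabled, SQLAlchemy tests each pooled connection with a lightweight ping when it is checked out and transparently replaces it if the server has already closed it, for example after an idle timeout. This improves reliability for long-running processes at the cost of a small overhead per checkout.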

Session management

Database sessions are created using a session factory with these settings:
# From src/db.py:12
def build_session_factory(engine):
    return sessionmaker(bind=engine, expire_on_commit=False, class_=Session)
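With expire_on_commit=False, an object's loaded attributes stay available after commit instead of being expired and reloaded on next access. This is useful when objects are passed to other parts of the application after their session has closed; the trade-off is that such objects can hold stale values if the underlying row changes later.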
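The difference is easiest to see by comparing two session factories. In this sketch the Flow model is a cut-down stand-in, create_flow is a hypothetical helper, and in-memory SQLite is used only so the example runs on its own.

```python
from sqlalchemy import Column, Integer, String, create_engine
from sqlalchemy.orm import Session, declarative_base, sessionmaker
from sqlalchemy.orm.exc import DetachedInstanceError

Base = declarative_base()


class Flow(Base):
    """Cut-down stand-in for the gateway's flows model."""

    __tablename__ = "flows"
    id = Column(Integer, primary_key=True)
    code = Column(String(100), unique=True)


engine = create_engine("sqlite:///:memory:")
Base.metadata.create_all(engine)


def create_flow(session_factory, code):
    with session_factory() as session:
        flow = Flow(code=code)
        session.add(flow)
        session.commit()
        return flow  # detached once the with-block closes the session


# Same settings as build_session_factory above: attributes survive commit.
keep = sessionmaker(bind=engine, expire_on_commit=False, class_=Session)
kept_flow = create_flow(keep, "temperature")
kept_code = kept_flow.code  # readable without an open session

# Default behaviour: commit expires attributes, and a detached object
# cannot reload them.
expiring = sessionmaker(bind=engine, class_=Session)
expired_flow = create_flow(expiring, "humidity")
try:
    expired_flow.code
    raised = False
except DetachedInstanceError:
    raised = True

print(kept_code, raised)
```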
