MQTT Gateway uses MariaDB or MySQL to store configuration, message data, and routing rules. The database is automatically initialized on first run.
Database requirements
MQTT Gateway requires:
- MariaDB or MySQL database server
- Database created and accessible
- User credentials with full access to the database
Connection configuration
Configure the database connection using environment variables in your .env file:
DB_HOST=192.168.0.137
DB_PORT=3306
DB_NAME=db
DB_USER=demo
DB_PASSWORD=demo
See environment variables for detailed parameter descriptions.
Database schema
The gateway uses SQLAlchemy ORM to manage the database schema. On startup, it automatically creates the required tables if they don’t exist.
Tables created
mqtt_servers
Stores MQTT broker connection information.
| Column | Type | Description |
|---|
id | Integer | Primary key, auto-increment |
host | String(255) | MQTT broker hostname or IP address |
port | Integer | MQTT broker port (default: 1883) |
username | String(255) | Optional authentication username |
password | String(255) | Optional authentication password |
enabled | Boolean | Whether this server is active |
is_default | Boolean | Whether this is the default server |
flows
Defines message routing rules from MQTT topics to destinations.
| Column | Type | Description |
|---|
id | Integer | Primary key, auto-increment |
code | String(100) | Unique flow identifier |
description | String(255) | Human-readable description |
topic | String(255) | MQTT topic pattern to subscribe to |
action | String(30) | Action type (e.g., “database”, “http”) |
payload_schema | JSON | JSON schema for payload validation |
endpoint_url | String(500) | HTTP endpoint URL (for HTTP actions) |
last_msg_id | Integer | Last processed message ID |
enabled | Boolean | Whether this flow is active |
data
Stores received MQTT message data when using database action.
| Column | Type | Description |
|---|
id | Integer | Primary key, auto-increment |
received_at | DateTime | Timestamp when message was received |
flow_code | String(100) | Flow identifier that processed this message |
attribute_name | String(255) | Name of the data attribute |
attribute_value | Text | Value of the data attribute |
last_msg_id | Integer | Message ID from payload |
Automatic initialization
The database schema is automatically created when you start the gateway for the first time:
# From src/db.py:16
def initialize_database(engine) -> None:
Base.metadata.create_all(engine)
This creates all tables defined in the SQLAlchemy models.
Default MQTT server seeding
On first run, if no enabled MQTT server exists in the database, a default server is automatically created:
# From src/db.py:20
def seed_default_mqtt_server(session_factory) -> None:
with session_factory() as session:
stmt = select(MqttServer).where(MqttServer.enabled.is_(True))
existing = session.execute(stmt).scalar_one_or_none()
if existing:
return
session.add(
MqttServer(
host="192.168.0.137",
port=1883,
enabled=True,
is_default=True,
)
)
session.commit()
Update the default MQTT server configuration in your database after first run to match your actual MQTT broker settings.
Connection pooling
The gateway uses SQLAlchemy’s connection pooling with the pool_pre_ping option enabled:
# From src/db.py:8
def build_engine(settings: Settings):
return create_engine(settings.sqlalchemy_url, pool_pre_ping=True)
This ensures that stale database connections are automatically refreshed, improving reliability for long-running processes.
Session management
Database sessions are created using a session factory with these settings:
# From src/db.py:12
def build_session_factory(engine):
return sessionmaker(bind=engine, expire_on_commit=False, class_=Session)
The expire_on_commit=False setting prevents objects from being expired after commit, which is useful when passing objects between different parts of the application.