MQTT Gateway is a long-running service that processes MQTT messages continuously. Monitoring its health requires checking logs and database state.

Database connectivity

The application creates its SQLAlchemy engine with pool_pre_ping=True (src/db.py:9):
return create_engine(settings.sqlalchemy_url, pool_pre_ping=True)
With this option, SQLAlchemy tests each pooled connection when it is checked out and transparently replaces any that have gone stale, for example after a database restart or an idle timeout.

Connection string format

The database connection is built from environment variables (src/config.py:22-26):
mysql+pymysql://{DB_USER}:{DB_PASSWORD}@{DB_HOST}:{DB_PORT}/{DB_NAME}
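A password containing characters such as @ or / has to be percent-encoded before it is placed in a URL of this form. The following is a minimal sketch of assembling the URL from the five variables; the helper name build_sqlalchemy_url and the fallback port 3306 are assumptions for illustration and are not taken from src/config.py.

```python
import os
from urllib.parse import quote_plus


def build_sqlalchemy_url(env=os.environ):
    """Assemble the mysql+pymysql URL from DB_* variables (hypothetical helper)."""
    user = quote_plus(env["DB_USER"])
    password = quote_plus(env["DB_PASSWORD"])  # escapes '@', '/', ':' and similar
    host = env["DB_HOST"]
    port = env.get("DB_PORT", "3306")  # assumed fallback: the standard MariaDB port
    name = env["DB_NAME"]
    return f"mysql+pymysql://{user}:{password}@{host}:{port}/{name}"
```

Passing a plain dict instead of os.environ makes it easy to check the resulting URL without touching the real environment.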

MQTT connection health

Connection configuration

The MQTT client connects using settings from the database and environment variables (src/mqtt_client.py:112):
  • Host and Port: Retrieved from the mqtt_servers table (first enabled server)
  • Client ID: MQTT_CLIENT_ID environment variable (default: mqtt-gateway)
  • Keepalive: MQTT_KEEPALIVE environment variable (default: 60 seconds)

Connection monitoring

Check for connection errors in the log files. Failed connections are logged with the reason code (src/mqtt_client.py:67):
ERROR | mqtt_gateway | MQTT connection failed with reason code: 5
Common connection return codes (as defined by MQTT 3.1.1):
  • 0: Connection successful
  • 1: Incorrect protocol version
  • 2: Invalid client identifier
  • 3: Server unavailable
  • 4: Bad username or password
  • 5: Not authorized
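When scanning logs, it can help to translate the numeric code back into its meaning. The sketch below assumes the log line format shown above; the function name explain_connection_failure is hypothetical and not part of the service.

```python
import re

# Return codes and meanings as listed above.
REASON_CODES = {
    0: "Connection successful",
    1: "Incorrect protocol version",
    2: "Invalid client identifier",
    3: "Server unavailable",
    4: "Bad username or password",
    5: "Not authorized",
}

_FAILURE = re.compile(r"MQTT connection failed with reason code: (\d+)")


def explain_connection_failure(log_line):
    """Return 'code: meaning' for a connection-failure log line, else None."""
    match = _FAILURE.search(log_line)
    if match is None:
        return None
    code = int(match.group(1))
    return f"{code}: {REASON_CODES.get(code, 'Unknown reason code')}"
```

Lines that do not report a connection failure yield None, so the function can be applied to every line of a log file.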

Flow reload interval

The service automatically reloads flows from the database at regular intervals (src/mqtt_client.py:115-122):
reload_interval_seconds = max(1, self.settings.flows_reload_interval_seconds)
while True:
    time.sleep(reload_interval_seconds)
    try:
        self.reload_flows()
    except Exception as exc:
        self.logger.error("Error reloading flows: %s", exc)
Configure the interval via environment variable:
FLOWS_RELOAD_INTERVAL_SECONDS=600  # 10 minutes (default)
Changes to flows in the database are picked up automatically without restarting the service. The minimum reload interval is 1 second.
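To see which interval a given setting produces, the clamp can be reproduced in isolation. This is a sketch only: it assumes the variable is parsed with int(), which the source does not show, and effective_reload_interval is a hypothetical name.

```python
import os

DEFAULT_RELOAD_SECONDS = 600  # documented default (10 minutes)


def effective_reload_interval(env=os.environ):
    """Return the interval the reload loop would sleep for, in seconds."""
    raw = env.get("FLOWS_RELOAD_INTERVAL_SECONDS", str(DEFAULT_RELOAD_SECONDS))
    # Same clamp as the reload loop: zero or negative values become 1 second.
    return max(1, int(raw))
```

For example, a value of 0 results in an interval of 1 second rather than a busy loop.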

Message processing metrics

Message counter

Each flow maintains a last_msg_id counter that increments with every processed message (src/processor.py:94):
db_flow.last_msg_id += 1
current_msg_id = db_flow.last_msg_id
Monitor this field in the flows table to verify messages are being processed.
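One way to act on this counter is to compare two snapshots taken some minutes apart. The sketch below assumes each snapshot is a dict mapping a flow identifier to its last_msg_id; how the snapshots are read from the flows table is left out, and the function name stalled_flows is hypothetical.

```python
def stalled_flows(previous, current):
    """Return flows whose last_msg_id did not advance between two snapshots."""
    return sorted(
        flow
        for flow, count in current.items()
        # Flows missing from the earlier snapshot are new and cannot be judged yet.
        if flow in previous and count <= previous[flow]
    )
```

A flow reported here is not necessarily broken: a counter also stays flat when no messages were published on the flow's topic during the interval.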

Database records

For STORE_DB flows, check the data table for recent records:
SELECT flow_code, COUNT(*) as message_count, MAX(received_at) as last_received
FROM data
GROUP BY flow_code;
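The result of this query can be turned into a simple staleness check. The sketch assumes the rows arrive as (flow_code, message_count, last_received) tuples with last_received as a datetime; the 15-minute threshold and the name stale_flows are arbitrary choices for illustration.

```python
from datetime import datetime, timedelta


def stale_flows(rows, now, max_age=timedelta(minutes=15)):
    """Return flow codes whose newest record is older than max_age.

    rows: (flow_code, message_count, last_received) tuples from the query above.
    """
    return [code for code, _count, last_received in rows if now - last_received > max_age]
```

The threshold should be chosen per flow in practice, since a sensor that reports hourly is not stale after 15 minutes of silence.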

HTTP endpoint monitoring

Timeout configuration

HTTP POST requests to endpoints have a configurable timeout (src/processor.py:120):
HTTP_TIMEOUT_SECONDS=10  # Default timeout

Endpoint failures

Failed HTTP requests are logged but don’t stop message processing (src/processor.py:124-129). Monitor logs for:
ERROR | mqtt_gateway | Error posting flow {code} to endpoint {url}: {error}
Even when a request fails, the flow's last_msg_id is still incremented, and the message is not retried automatically.
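Because failed deliveries are not retried, counting them per endpoint gives a quick picture of what was dropped. This sketch parses the log format shown above and assumes that flow codes and URLs contain no whitespace; count_endpoint_failures is a hypothetical helper.

```python
import re
from collections import Counter

_POST_ERROR = re.compile(
    r"Error posting flow (?P<code>\S+) to endpoint (?P<url>\S+): (?P<error>.*)"
)


def count_endpoint_failures(log_lines):
    """Count failed POSTs per (flow code, endpoint URL) pair."""
    failures = Counter()
    for line in log_lines:
        match = _POST_ERROR.search(line)
        if match:
            failures[(match["code"], match["url"])] += 1
    return failures
```

Calling failures.most_common(5) on the result lists the five flow and endpoint pairs with the most failed requests.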

Checking service status

Startup sequence

On startup, the service performs these initialization steps (src/main.py:14-25):
  1. Load configuration from environment variables
  2. Connect to MariaDB database
  3. Create tables if they don’t exist
  4. Seed default MQTT server if none exists
  5. Load enabled flows from database
  6. Connect to MQTT broker
  7. Subscribe to flow topics
Any errors during startup are logged and cause the process to exit with code 1 (src/main.py:34).
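The fail-fast behaviour described above can be sketched as follows. This is not the code in src/main.py: the function run and the idea of passing the steps as a list of callables are assumptions made to keep the example self-contained.

```python
import logging

logger = logging.getLogger("mqtt_gateway")


def run(startup_steps):
    """Run startup callables in order; return 0 on success, 1 if any step raises."""
    try:
        for step in startup_steps:
            step()
    except Exception:
        # Log the traceback and stop: later steps are not attempted.
        logger.exception("Startup failed")
        return 1
    return 0
```

A caller would pass the result to sys.exit(), which lets a supervisor or container runtime detect the failed start from the exit code.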

Runtime health indicators

  • Log files: Check for recent ERROR entries
  • Database connectivity: Query the flows table
  • MQTT subscriptions: Verify messages are incrementing last_msg_id
  • Process running: Check if the Python process is active

Docker monitoring

When running in Docker, monitor the container:
# Check if container is running
docker ps | grep mqtt-gateway

# View recent logs
docker logs mqtt-gateway --tail 50

# Inspect container state (status, exit code, and health if a HEALTHCHECK is defined)
docker inspect mqtt-gateway
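The output of docker inspect is a large JSON array, so a script usually extracts only the State section. The sketch below does that; summarize_container is a hypothetical name, and whether the mqtt-gateway image defines a HEALTHCHECK is not stated here, so the Health entry is treated as optional.

```python
import json


def summarize_container(inspect_json):
    """Summarize the JSON printed by `docker inspect <container>`."""
    # docker inspect prints a JSON array with one object per inspected container.
    state = json.loads(inspect_json)[0]["State"]
    # State.Health exists only when the image or container defines a healthcheck.
    health = state.get("Health", {}).get("Status", "no healthcheck defined")
    return {
        "status": state["Status"],
        "exit_code": state["ExitCode"],
        "health": health,
    }
```

The input can be obtained with subprocess.run(["docker", "inspect", "mqtt-gateway"], capture_output=True, text=True, check=True).stdout.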
