Actions determine what happens to validated MQTT messages. MQTT Gateway supports two action types: STORE_DB for persisting data to MariaDB, and POST_ENDPOINT for forwarding messages to HTTP endpoints.

STORE_DB action

The STORE_DB action stores each attribute from the payload as a separate record in the data table.

How it works

When a message is processed with STORE_DB:
  1. The payload is validated against the flow’s schema
  2. The flow’s last_msg_id is incremented
  3. Each key-value pair in the payload creates a new DataRecord
# From src/processor.py:97-108
if db_flow.action == "STORE_DB":
    now = datetime.utcnow()
    for key, value in payload.items():
        session.add(
            DataRecord(
                received_at=now,
                flow_code=db_flow.code,
                attribute_name=key,
                attribute_value=_serialize_value(value),
                last_msg_id=current_msg_id,
            )
        )

Data serialization

Complex types (objects and arrays) are JSON-encoded before storage:
# From src/processor.py:73-76
def _serialize_value(value: Any) -> str:
    if isinstance(value, (dict, list)):
        return json.dumps(value, ensure_ascii=False)
    return str(value)
All values are stored as text in the attribute_value column, regardless of their original type. Scalars (numbers, strings, booleans) are converted with str().
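To make the effect of this serialization concrete, the sketch below copies the helper quoted above and applies it to a few representative values. The sample values are illustrative assumptions, not data from the gateway.

```python
import json
from typing import Any


def _serialize_value(value: Any) -> str:
    # Same logic as the helper quoted above: JSON for containers, str() otherwise.
    if isinstance(value, (dict, list)):
        return json.dumps(value, ensure_ascii=False)
    return str(value)


# Illustrative inputs (assumed, not taken from the gateway).
samples = {
    "temperature": 23.5,
    "online": True,
    "label": "café",
    "location": {"lat": 1.5, "lon": 2},
    "readings": [1, 2, 3],
    "missing": None,
}

serialized = {key: _serialize_value(value) for key, value in samples.items()}
for key, text in serialized.items():
    print(f"{key}: {text!r}")
```

Because booleans and None go through str(), they are stored in Python notation ("True", "None") rather than JSON notation ("true", "null"). Keep this in mind when parsing attribute_value in downstream queries.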

Database schema

Each record in the data table contains:
# From src/models.py:36-46
class DataRecord(Base):
    __tablename__ = "data"

    id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True)
    received_at: Mapped[datetime] = mapped_column(
        DateTime, nullable=False, default=datetime.utcnow
    )
    flow_code: Mapped[str] = mapped_column(String(100), nullable=False)
    attribute_name: Mapped[str] = mapped_column(String(255), nullable=False)
    attribute_value: Mapped[str] = mapped_column(Text, nullable=False)
    last_msg_id: Mapped[int] = mapped_column(Integer, nullable=False)

Example

Given this payload on the AHT10_SENSOR flow:
{
  "temperature": 23.5,
  "humidity": 65.2
}
Two records are created:
received_at         | flow_code    | attribute_name | attribute_value | last_msg_id
2026-03-03 10:15:00 | AHT10_SENSOR | temperature    | 23.5            | 1
2026-03-03 10:15:00 | AHT10_SENSOR | humidity       | 65.2            | 1
All attributes from a single message share the same last_msg_id and received_at timestamp, allowing you to reconstruct the original message.

POST_ENDPOINT action

The POST_ENDPOINT action forwards the validated payload to an HTTP endpoint via POST request.

How it works

  1. The payload is validated against the flow’s schema
  2. The flow’s last_msg_id is incremented and committed to the database
  3. The entire payload is sent as JSON to the configured endpoint_url
# From src/processor.py:115-129
if db_flow.action == "POST_ENDPOINT":
    try:
        response = requests.post(
            db_flow.endpoint_url,
            json=payload,
            timeout=settings.http_timeout_seconds,
        )
        response.raise_for_status()
    except requests.RequestException as exc:
        logger.error(
            "Error posting flow %s to endpoint %s: %s",
            db_flow.code,
            db_flow.endpoint_url,
            exc,
        )
The HTTP request happens outside the database transaction. The last_msg_id increment is committed even if the HTTP request fails.

Configuration requirements

Flows with the POST_ENDPOINT action must have a non-empty endpoint_url; otherwise processing raises a ValueError:
# From src/processor.py:109-113
elif db_flow.action == "POST_ENDPOINT":
    if not db_flow.endpoint_url:
        raise ValueError(
            f"Flow '{db_flow.code}' is POST_ENDPOINT but endpoint_url is empty"
        )
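The check quoted above only rejects an empty value. If you want to catch malformed URLs before inserting a flow, a pre-check along the following lines is one option. The helper name is hypothetical and is not part of MQTT Gateway; it uses only the standard library.

```python
from urllib.parse import urlsplit


def is_plausible_endpoint_url(url: str) -> bool:
    """Hypothetical pre-insert check; not part of MQTT Gateway."""
    if not url:
        return False
    try:
        parts = urlsplit(url)
    except ValueError:
        # urlsplit raises ValueError for some malformed inputs (e.g. bad IPv6 literals).
        return False
    # Require an explicit http(s) scheme and a host name.
    return parts.scheme in ("http", "https") and bool(parts.hostname)


print(is_plausible_endpoint_url("http://host.docker.internal:8080/iot/data"))
print(is_plausible_endpoint_url("host.docker.internal:8080/iot/data"))
```

This only checks the shape of the URL. It does not verify that the host is reachable from the gateway's network.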

Example flow

INSERT INTO flows (
  code,
  description,
  topic,
  action,
  payload_schema,
  endpoint_url,
  last_msg_id,
  enabled
)
VALUES (
  'AHT10_ENDPOINT',
  'Flow that publishes to an HTTP endpoint',
  'sensors/aht10/http',
  'POST_ENDPOINT',
  '{"temperature":"float","humidity":"float"}',
  'http://host.docker.internal:8080/iot/data',
  0,
  1
);

Request format

The gateway sends a POST request with:
  • Content-Type: application/json
  • Body: The validated payload as JSON
  • Timeout: Configured via http_timeout_seconds setting
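For local testing, a receiver for this request can be sketched with Python's standard library alone. The handler class name, the in-memory received list, and the 204 response are assumptions made for illustration; the only requirements taken from the gateway are a JSON body sent via POST and a success status in reply. The sketch starts the server on an ephemeral loopback port and sends itself one request in the same shape as the gateway's.

```python
import json
import threading
import urllib.request
from http.server import BaseHTTPRequestHandler, HTTPServer

received = []  # payloads seen so far, kept in memory for illustration


class IotDataHandler(BaseHTTPRequestHandler):
    def do_POST(self):
        length = int(self.headers.get("Content-Length", 0))
        try:
            payload = json.loads(self.rfile.read(length))
        except ValueError:
            self.send_error(400, "Body is not valid JSON")
            return
        received.append(payload)
        # A 2xx status passes the gateway's raise_for_status() check.
        self.send_response(204)
        self.end_headers()

    def log_message(self, format, *args):
        pass  # silence per-request logging in this sketch


# Start the receiver on an ephemeral loopback port.
server = HTTPServer(("127.0.0.1", 0), IotDataHandler)
threading.Thread(target=server.serve_forever, daemon=True).start()

# Send one request shaped like the gateway's: POST with a JSON body.
request = urllib.request.Request(
    f"http://127.0.0.1:{server.server_port}/iot/data",
    data=json.dumps({"temperature": 23.5, "humidity": 65.2}).encode("utf-8"),
    headers={"Content-Type": "application/json"},
    method="POST",
)
# Bypass any system proxy for the loopback call.
opener = urllib.request.build_opener(urllib.request.ProxyHandler({}))
with opener.open(request, timeout=5) as response:
    status = response.status

server.shutdown()
server.server_close()
print(status, received)
```

To receive real traffic, bind the server to an address the gateway can reach and point the flow's endpoint_url at it. Respond within http_timeout_seconds, or the gateway will log the request as failed.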

Error handling

HTTP errors are logged but do not stop message processing. If the request fails (a connection error, a timeout, or a 4xx or 5xx response), the error is logged, the message ID remains incremented, and the gateway continues processing other messages.

Choosing an action

Use STORE_DB when:
  • You need to query historical data
  • You want to analyze trends over time
  • You need a permanent record in your database
Use POST_ENDPOINT when:
  • You’re integrating with external services
  • You need real-time processing by another system
  • You want to trigger webhooks or notifications
