Skip to main content
These examples are taken directly from scripts/sample_data.sql and demonstrate common flow patterns.

Temperature and humidity to database

Stores sensor readings in the database for historical tracking.
INSERT INTO flows (
  code,
  description,
  topic,
  action,
  payload_schema,
  endpoint_url,
  last_msg_id,
  enabled
)
VALUES (
  'AHT10_SENSOR',
  'Flow sensor de temperatura y humedad',
  'sensors/aht10',
  'STORE_DB',
  '{"temperature":"float","humidity":"float"}',
  NULL,
  0,
  1
);
How it works:
  • Subscribes to the sensors/aht10 topic
  • Validates that incoming messages have temperature and humidity fields (both floats)
  • Stores each field as a separate record in the data table
  • No HTTP endpoint required (action is STORE_DB)
Expected payload:
{
  "temperature": 23.5,
  "humidity": 65.2
}
Source: scripts/sample_data.sql:11-20

Generic temperature/humidity database flow

Similar to the AHT10 example but using a more generic topic pattern.
INSERT INTO flows (
  code,
  description,
  topic,
  action,
  payload_schema,
  endpoint_url,
  last_msg_id,
  enabled
)
VALUES (
  'TEMP_HUM_DB',
  'Flow de temperatura/humedad a base de datos',
  'sensor/temperatura_humedad',
  'STORE_DB',
  '{"temperature":"float","humidity":"float"}',
  NULL,
  0,
  1
);
Use case: Multiple different sensor types can publish to the same topic, and this flow will store all readings with the same schema. Source: scripts/sample_data.sql:32-41

HTTP endpoint forwarding

Forwards sensor data to an external HTTP service instead of storing in the database.
INSERT INTO flows (
  code,
  description,
  topic,
  action,
  payload_schema,
  endpoint_url,
  last_msg_id,
  enabled
)
VALUES (
  'AHT10_ENDPOINT',
  'Flow para publicar a endpoint HTTP',
  'sensors/aht10/http',
  'POST_ENDPOINT',
  '{"temperature":"float","humidity":"float"}',
  'http://host.docker.internal:8080/iot/data',
  0,
  1
);
How it works:
  • Subscribes to sensors/aht10/http
  • Validates the same temperature/humidity schema
  • POSTs the entire payload as JSON to the configured endpoint
  • The gateway increments last_msg_id but does NOT store data in the database
HTTP request sent:
POST http://host.docker.internal:8080/iot/data
Content-Type: application/json

{"temperature": 23.5, "humidity": 65.2}
Note: The endpoint URL host.docker.internal is used when the gateway runs in Docker and needs to reach services on the host machine. Source: scripts/sample_data.sql:53-62

Flow processing behavior

All flows share these common behaviors implemented in src/processor.py:79-135:
  1. Payload validation - The validate_payload() function checks that all fields in payload_schema exist and have the correct type
  2. Message ID increment - Every processed message increments the flow’s last_msg_id counter
  3. Database transaction - Flow updates and data storage happen atomically with row-level locking (with_for_update())
  4. Error handling - Invalid payloads raise ValueError, HTTP errors are logged but don’t fail the transaction

Build docs developers (and LLMs) love