Skip to main content
Payload schemas define the expected structure and data types for MQTT messages. The gateway validates every incoming message against its flow’s schema before processing, ensuring data integrity.

Schema format

Schemas are stored as JSON in the payload_schema column of the flows table. They define expected attributes and their types:
{
  "temperature": "float",
  "humidity": "float"
}

Supported formats

The gateway supports two schema formats: Simple format (recommended):
{
  "temperature": "float",
  "humidity": "float",
  "sensor_id": "string"
}
JSON Schema format:
{
  "properties": {
    "temperature": { "type": "float" },
    "humidity": { "type": "float" },
    "sensor_id": { "type": "string" }
  }
}
Both formats are equivalent. The validation logic extracts types from either format:
# From src/processor.py:37-53
def _extract_expected_types(payload_schema: dict[str, Any]) -> dict[str, Any]:
    if "properties" in payload_schema and isinstance(payload_schema["properties"], dict):
        out: dict[str, Any] = {}
        for key, value in payload_schema["properties"].items():
            if isinstance(value, dict) and "type" in value:
                out[key] = value["type"]
            elif isinstance(value, str):
                out[key] = value
        return out

    out = {}
    for key, value in payload_schema.items():
        if isinstance(value, str):
            out[key] = value
        elif isinstance(value, dict) and "type" in value:
            out[key] = value["type"]
    return out

Supported types

The gateway validates against these Python types:
Schema typePython typeExample values
string, strstr"sensor_01", "active"
number, floatint, float23.5, 100, -15.2
int, integerint42, -10, 0
bool, booleanbooltrue, false
objectdict{"nested": "value"}
arraylist[1, 2, 3], ["a", "b"]
# From src/processor.py:23-34
_TYPE_MAP = {
    "string": str,
    "str": str,
    "number": (int, float),
    "float": (int, float),
    "int": int,
    "integer": int,
    "bool": bool,
    "boolean": bool,
    "object": dict,
    "array": list,
}
The number and float types accept both integers and floats. Use int or integer if you need to enforce whole numbers only.

Validation process

When a message arrives, the gateway validates it before processing:
# From src/processor.py:56-70
def validate_payload(payload: dict[str, Any], payload_schema: dict[str, Any]) -> None:
    expected_types = _extract_expected_types(payload_schema)
    for key, expected_type_name in expected_types.items():
        if key not in payload:
            raise ValueError(f"Missing attribute '{key}' in payload")

        expected_python_type = _TYPE_MAP.get(str(expected_type_name).lower())
        if expected_python_type is None:
            continue

        if not isinstance(payload[key], expected_python_type):
            actual_type = type(payload[key]).__name__
            raise ValueError(
                f"Invalid type for '{key}'. Expected '{expected_type_name}', got '{actual_type}'"
            )

Validation rules

  1. All schema attributes are required: Every attribute defined in the schema must be present in the payload
  2. Type checking is strict: Values must match the expected Python type exactly
  3. Extra attributes are allowed: Payloads can contain additional attributes not in the schema
  4. Unknown types are ignored: If a schema type isn’t in _TYPE_MAP, validation for that attribute is skipped
If validation fails, the message is rejected and an error is logged. The flow’s last_msg_id is not incremented.

Validation examples

Valid payload

Schema:
{"temperature": "float", "humidity": "float"}
Payload:
{"temperature": 23.5, "humidity": 65.2}
Result: ✓ Valid

Valid with extra attributes

Payload:
{"temperature": 23.5, "humidity": 65.2, "sensor_id": "AHT10_01"}
Result: ✓ Valid (extra attributes are allowed)

Missing required attribute

Payload:
{"temperature": 23.5}
Result: ✗ Error: Missing attribute 'humidity' in payload

Type mismatch

Payload:
{"temperature": "23.5", "humidity": 65.2}
Result: ✗ Error: Invalid type for 'temperature'. Expected 'float', got 'str'

Complex types

Schemas can define object and array types:
{
  "sensor_data": "object",
  "readings": "array"
}
Valid payload:
{
  "sensor_data": {
    "model": "AHT10",
    "version": "1.0"
  },
  "readings": [23.5, 65.2, 24.1]
}
The gateway validates that objects are dictionaries and arrays are lists, but does not validate their internal structure. Nested validation is not supported.

Integration with message processing

Validation happens inside the database transaction, before any action is executed:
# From src/processor.py:86-95
with session_factory() as session:
    with session.begin():
        db_flow = session.execute(
            select(Flow).where(Flow.id == flow_id).with_for_update()
        ).scalar_one()

        validate_payload(payload, db_flow.payload_schema)

        db_flow.last_msg_id += 1
        current_msg_id = db_flow.last_msg_id
If validation fails, the transaction is rolled back and no changes are made to the database.

Build docs developers (and LLMs) love