Payload schemas define the expected structure and data types for MQTT messages. The gateway validates every incoming message against its flow’s schema before processing, ensuring data integrity.
Schemas are stored as JSON in the payload_schema column of the flows table. They define expected attributes and their types:
{
"temperature": "float",
"humidity": "float"
}
The gateway supports two schema formats:
Simple format (recommended):
{
"temperature": "float",
"humidity": "float",
"sensor_id": "string"
}
JSON Schema format:
{
"properties": {
"temperature": { "type": "float" },
"humidity": { "type": "float" },
"sensor_id": { "type": "string" }
}
}
Both formats are equivalent. The validation logic extracts types from either format:
# From src/processor.py:37-53
def _extract_expected_types(payload_schema: dict[str, Any]) -> dict[str, Any]:
if "properties" in payload_schema and isinstance(payload_schema["properties"], dict):
out: dict[str, Any] = {}
for key, value in payload_schema["properties"].items():
if isinstance(value, dict) and "type" in value:
out[key] = value["type"]
elif isinstance(value, str):
out[key] = value
return out
out = {}
for key, value in payload_schema.items():
if isinstance(value, str):
out[key] = value
elif isinstance(value, dict) and "type" in value:
out[key] = value["type"]
return out
Supported types
The gateway validates against these Python types:
| Schema type | Python type | Example values |
|---|
string, str | str | "sensor_01", "active" |
number, float | int, float | 23.5, 100, -15.2 |
int, integer | int | 42, -10, 0 |
bool, boolean | bool | true, false |
object | dict | {"nested": "value"} |
array | list | [1, 2, 3], ["a", "b"] |
# From src/processor.py:23-34
_TYPE_MAP = {
"string": str,
"str": str,
"number": (int, float),
"float": (int, float),
"int": int,
"integer": int,
"bool": bool,
"boolean": bool,
"object": dict,
"array": list,
}
The number and float types accept both integers and floats. Use int or integer if you need to enforce whole numbers only.
Validation process
When a message arrives, the gateway validates it before processing:
# From src/processor.py:56-70
def validate_payload(payload: dict[str, Any], payload_schema: dict[str, Any]) -> None:
expected_types = _extract_expected_types(payload_schema)
for key, expected_type_name in expected_types.items():
if key not in payload:
raise ValueError(f"Missing attribute '{key}' in payload")
expected_python_type = _TYPE_MAP.get(str(expected_type_name).lower())
if expected_python_type is None:
continue
if not isinstance(payload[key], expected_python_type):
actual_type = type(payload[key]).__name__
raise ValueError(
f"Invalid type for '{key}'. Expected '{expected_type_name}', got '{actual_type}'"
)
Validation rules
- All schema attributes are required: Every attribute defined in the schema must be present in the payload
- Type checking is strict: Values must match the expected Python type exactly
- Extra attributes are allowed: Payloads can contain additional attributes not in the schema
- Unknown types are ignored: If a schema type isn’t in
_TYPE_MAP, validation for that attribute is skipped
If validation fails, the message is rejected and an error is logged. The flow’s last_msg_id is not incremented.
Validation examples
Valid payload
Schema:
{"temperature": "float", "humidity": "float"}
Payload:
{"temperature": 23.5, "humidity": 65.2}
Result: ✓ Valid
Payload:
{"temperature": 23.5, "humidity": 65.2, "sensor_id": "AHT10_01"}
Result: ✓ Valid (extra attributes are allowed)
Missing required attribute
Payload:
Result: ✗ Error: Missing attribute 'humidity' in payload
Type mismatch
Payload:
{"temperature": "23.5", "humidity": 65.2}
Result: ✗ Error: Invalid type for 'temperature'. Expected 'float', got 'str'
Complex types
Schemas can define object and array types:
{
"sensor_data": "object",
"readings": "array"
}
Valid payload:
{
"sensor_data": {
"model": "AHT10",
"version": "1.0"
},
"readings": [23.5, 65.2, 24.1]
}
The gateway validates that objects are dictionaries and arrays are lists, but does not validate their internal structure. Nested validation is not supported.
Integration with message processing
Validation happens inside the database transaction, before any action is executed:
# From src/processor.py:86-95
with session_factory() as session:
with session.begin():
db_flow = session.execute(
select(Flow).where(Flow.id == flow_id).with_for_update()
).scalar_one()
validate_payload(payload, db_flow.payload_schema)
db_flow.last_msg_id += 1
current_msg_id = db_flow.last_msg_id
If validation fails, the transaction is rolled back and no changes are made to the database.