Ingestion flow
Producer publishes a message
An external system publishes a JSON message to the resource_data queue (configurable via RESOURCE_DATA_QUEUE). The message contains a resource_id and an array of {x, y} data points.
Consumer receives and validates
The data_ingestor consumer deserializes the message body. If the JSON is invalid, or if resource_id or data are missing, the message is acknowledged and discarded (not requeued).
Indicator lookup
The service queries MongoDB for an indicator whose resources array contains the given resource_id. If no indicator is found, the message is acknowledged and discarded.
Data segment stored
A DataSegment document is inserted into the data_segments collection. Each segment carries the indicator_id, resource_id, the array of time points, and a timestamp recording when the segment arrived.
Merge into MergedIndicator
All segments for the indicator are re-merged into a single MergedIndicator document in the merged_indicators collection. This document holds the deduplicated, sorted view of the indicator’s complete time series.
Message format
Messages must be UTF-8 JSON. The queue is declared as durable and messages are delivered with PERSISTENT delivery mode.
- resource_id: a string matching a value in an indicator’s resources array
- data: array of data points; each point has an x value (ISO 8601 datetime string) and a numeric y value
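
The format and the discard rules from the ingestion flow can be sketched as a small validation helper. This is an illustration only, not the service's actual code: the function name parse_message and the sample resource ID are hypothetical, and the handling of a non-object JSON body is an assumption.

```python
import json
from datetime import datetime


def parse_message(body):
    """Return (resource_id, data), or None if the message should be discarded."""
    try:
        payload = json.loads(body.decode("utf-8"))
    except (UnicodeDecodeError, json.JSONDecodeError):
        return None  # not valid UTF-8 JSON
    if not isinstance(payload, dict):
        return None  # assumption: a non-object body is treated as invalid
    if "resource_id" not in payload or "data" not in payload:
        return None  # required field missing
    return payload["resource_id"], payload["data"]


sample = json.dumps({
    "resource_id": "64b7f0c2e1a4c3d2b1a09876",  # hypothetical ID
    "data": [
        {"x": "2024-01-01T00:00:00+00:00", "y": 12.5},
        {"x": "2024-01-01T01:00:00+00:00", "y": 13.0},
    ],
}).encode("utf-8")

resource_id, data = parse_message(sample)
first_x = datetime.fromisoformat(data[0]["x"])  # ISO 8601 string -> datetime
```

Returning None rather than raising keeps the caller's decision simple: a None result maps to "acknowledge and discard".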
DataPoint and DataSegment schemas
DataPoint
The data point type used in API responses (x can be either a datetime or a float for non-time-series indicators):
| Field | Type | Description |
|---|---|---|
| x | datetime \| float | Timestamp or numeric x-axis value |
| y | float | Measured value |
TimePoint
The point type used internally within a DataSegment (x is strictly a datetime):
| Field | Type | Description |
|---|---|---|
| x | datetime | Timestamp for the measurement |
| y | float | Measured value |
DataSegment
Stored in the data_segments MongoDB collection:
| Field | Type | Description |
|---|---|---|
| indicator_id | ObjectId | The indicator this segment belongs to |
| resource_id | ObjectId | The resource that produced this segment |
| points | TimePoint[] | Array of time-stamped data points |
| timestamp | datetime | UTC datetime when the segment was stored (auto-set) |
MergedIndicator
Stored in the merged_indicators MongoDB collection (one document per indicator):
| Field | Type | Description |
|---|---|---|
| indicator_id | ObjectId | The indicator this document belongs to |
| points | DataPoint[] | Deduplicated, sorted data points |
| last_updated | datetime | UTC datetime of the last merge |
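
For orientation, the four schemas can be mirrored with plain dataclasses. This is a dependency-free sketch, not the service's model definitions (which may use a validation or ODM library); ObjectId fields are represented as str here purely as an assumption to avoid importing a MongoDB driver.

```python
from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import List, Union


def utc_now():
    return datetime.now(timezone.utc)


@dataclass
class TimePoint:
    x: datetime  # strictly a datetime
    y: float


@dataclass
class DataPoint:
    x: Union[datetime, float]  # datetime, or float for non-time-series indicators
    y: float


@dataclass
class DataSegment:
    indicator_id: str  # ObjectId in MongoDB; str is a stand-in
    resource_id: str   # ObjectId in MongoDB; str is a stand-in
    points: List[TimePoint]
    timestamp: datetime = field(default_factory=utc_now)  # auto-set on creation


@dataclass
class MergedIndicator:
    indicator_id: str  # ObjectId in MongoDB; str is a stand-in
    points: List[DataPoint]
    last_updated: datetime = field(default_factory=utc_now)


segment = DataSegment(
    indicator_id="ind-1",  # hypothetical IDs
    resource_id="res-1",
    points=[TimePoint(datetime(2024, 1, 1, tzinfo=timezone.utc), 1.5)],
)
```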
Merge strategy
After every new segment is stored, all DataSegment documents for that indicator are re-read and merged into the MergedIndicator document using the following rules:
- Points with datetime x values are grouped by their exact timestamp.
- For a given timestamp, only the y value from the segment with the newest timestamp field is kept. This means the most recently ingested segment always wins over older ones at the same point in time.
- Numeric (non-datetime) x values are not deduplicated; they are appended as-is.
- All resulting points are sorted in ascending order by x.
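
The rules above can be sketched as a pure function over plain dictionaries. This is a minimal illustration, not the service's implementation: the name merge_segments is hypothetical, ties between segments with identical timestamps are resolved arbitrarily (first seen wins), and the sketch assumes a single indicator never mixes datetime and numeric x values.

```python
from datetime import datetime, timezone


def merge_segments(segments):
    """Merge segments into one sorted list of {"x", "y"} points."""
    latest = {}   # datetime x -> (segment timestamp, y)
    numeric = []  # non-datetime points, kept as-is
    for segment in segments:
        for point in segment["points"]:
            x, y = point["x"], point["y"]
            if isinstance(x, datetime):
                current = latest.get(x)
                # Keep y from the segment with the newest timestamp field.
                if current is None or segment["timestamp"] > current[0]:
                    latest[x] = (segment["timestamp"], y)
            else:
                numeric.append({"x": x, "y": y})
    merged = [{"x": x, "y": y} for x, (_, y) in latest.items()] + numeric
    # Assumes x values are all of one kind; sorting a mix of datetime and
    # float values would raise TypeError.
    return sorted(merged, key=lambda p: p["x"])


t1 = datetime(2024, 1, 1, 0, tzinfo=timezone.utc)
t2 = datetime(2024, 1, 1, 1, tzinfo=timezone.utc)
older = {"timestamp": datetime(2024, 1, 2, 10, tzinfo=timezone.utc),
         "points": [{"x": t2, "y": 2.0}, {"x": t1, "y": 1.0}]}
newer = {"timestamp": datetime(2024, 1, 2, 11, tzinfo=timezone.utc),
         "points": [{"x": t1, "y": 5.0}]}

merged = merge_segments([newer, older])
```

Here the point at t1 takes its value from the newer segment regardless of the order in which the segments are read, and the output is sorted by x.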
Publishing messages with aio-pika
Here is a minimal Python example that publishes a data message to the resource_data queue:
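The sketch below rests on a few assumptions: a RabbitMQ broker reachable at a placeholder local URL, RESOURCE_DATA_QUEUE being read from the environment, and a hypothetical resource ID. The helper names build_body and publish are illustrative. It requires the third-party aio-pika package.

```python
import json
import os
from datetime import datetime, timezone

# Assumption: the queue name override comes from an environment variable.
QUEUE_NAME = os.environ.get("RESOURCE_DATA_QUEUE", "resource_data")
# Placeholder URL for a local broker; replace with your own.
AMQP_URL = "amqp://guest:guest@localhost/"


def build_body(resource_id, points):
    """Serialize (datetime, value) pairs into the UTF-8 JSON message body."""
    payload = {
        "resource_id": resource_id,
        "data": [{"x": x.isoformat(), "y": float(y)} for x, y in points],
    }
    return json.dumps(payload).encode("utf-8")


async def publish(body, url=AMQP_URL, queue_name=QUEUE_NAME):
    # Imported here so build_body can be used without aio-pika installed.
    import aio_pika

    connection = await aio_pika.connect_robust(url)
    async with connection:
        channel = await connection.channel()
        # Declare the queue as durable, matching the documented declaration.
        await channel.declare_queue(queue_name, durable=True)
        message = aio_pika.Message(
            body=body,
            content_type="application/json",
            delivery_mode=aio_pika.DeliveryMode.PERSISTENT,
        )
        # The default exchange routes a message to the queue named by routing_key.
        await channel.default_exchange.publish(message, routing_key=queue_name)


body = build_body(
    "64b7f0c2e1a4c3d2b1a09876",  # hypothetical resource ID
    [(datetime(2024, 1, 1, tzinfo=timezone.utc), 12.5)],
)
```

With a broker running, the message can be sent with asyncio.run(publish(body)) after importing asyncio.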
Resource deletion
The service listens on a second queue (resource_deleted, configurable via RESOURCE_DELETED_QUEUE) for resource deletion events. When an external resource is deleted, it performs the following cleanup for each indicator that referenced the deleted resource:
- Removes the resource_id string from the indicator’s resources array ($pull).
- Deletes all DataSegment documents with that resource_id.
- Re-merges the remaining segments and updates the MergedIndicator document.
- Clears all Redis cache keys for the indicator.
Cleanup failures for individual indicators are logged and do not affect other indicators in the same deletion event. Each indicator is processed independently.
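The per-indicator isolation can be illustrated with an in-memory sketch. It is not the service's code: handle_resource_deleted and demo_cleanup are hypothetical names, plain dictionaries stand in for MongoDB documents, and the cleanup callable models only the $pull step.

```python
import logging

logger = logging.getLogger("resource_cleanup")


def handle_resource_deleted(resource_id, indicators, cleanup):
    """Run cleanup for every indicator referencing resource_id.

    Returns the IDs of indicators whose cleanup failed.
    """
    failed = []
    for indicator in indicators:
        if resource_id not in indicator["resources"]:
            continue
        try:
            cleanup(indicator, resource_id)
        except Exception:
            # Log and move on so one failure does not block the others.
            logger.exception("Cleanup failed for indicator %s", indicator["_id"])
            failed.append(indicator["_id"])
    return failed


def demo_cleanup(indicator, resource_id):
    # Stand-in for the four cleanup steps; only the $pull equivalent is modelled.
    if indicator["_id"] == "ind-2":
        raise RuntimeError("simulated failure")
    indicator["resources"].remove(resource_id)


indicators = [
    {"_id": "ind-1", "resources": ["res-a", "res-b"]},
    {"_id": "ind-2", "resources": ["res-a"]},
    {"_id": "ind-3", "resources": ["res-a"]},
    {"_id": "ind-4", "resources": ["res-c"]},
]
failed = handle_resource_deleted("res-a", indicators, demo_cleanup)
```

The simulated failure on ind-2 is logged, while ind-1 and ind-3 are still cleaned up and ind-4 is left untouched.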
Error handling
| Condition | Action |
|---|---|
| Invalid JSON | Acknowledged (discarded), error logged |
| Missing resource_id or data | Acknowledged (discarded), warning logged |
| No indicator found for resource_id | Acknowledged (discarded), warning logged |
| Invalid ObjectId format | Acknowledged (discarded), error logged |
| MongoDB connection failure | Rejected with requeue=True |
| Redis connection failure | Rejected with requeue=True |
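
The split between "acknowledge and discard" and "reject with requeue" can be sketched as a wrapper around the processing step. Everything here is an assumption for illustration: TransientBackendError is a hypothetical stand-in for a MongoDB or Redis connection failure, handle is not the service's handler, and FakeMessage only imitates the awaitable ack() and reject(requeue=...) methods of an aio-pika incoming message so the sketch runs without a broker.

```python
import asyncio
import json
import logging

logger = logging.getLogger("data_ingestor")


class TransientBackendError(Exception):
    """Hypothetical stand-in for a MongoDB or Redis connection failure."""


async def handle(message, process):
    """Ack messages that can never succeed; requeue on backend outages."""
    try:
        await process(message.body)
    except TransientBackendError:
        logger.error("Backend unavailable, requeueing message")
        await message.reject(requeue=True)
        return
    except ValueError as exc:  # includes json.JSONDecodeError
        logger.warning("Discarding message: %s", exc)
    await message.ack()


class FakeMessage:
    """Records ack/reject calls in place of a real broker message."""

    def __init__(self, body):
        self.body = body
        self.calls = []

    async def ack(self):
        self.calls.append("ack")

    async def reject(self, requeue=False):
        self.calls.append(("reject", requeue))


async def parse_only(body):
    json.loads(body)


async def backend_down(body):
    raise TransientBackendError("backend unreachable")


async def demo():
    good, bad, outage = FakeMessage(b"{}"), FakeMessage(b"{oops"), FakeMessage(b"{}")
    await handle(good, parse_only)
    await handle(bad, parse_only)
    await handle(outage, backend_down)
    return good.calls, bad.calls, outage.calls


results = asyncio.run(demo())
```

Malformed input is acknowledged because retrying it cannot succeed, whereas a backend outage is transient, so requeueing gives the message another chance once the connection recovers.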