Time-series data enters the service asynchronously via RabbitMQ. External producers publish messages to a queue; the service consumes those messages, validates them, stores raw segments, and maintains a merged view of each indicator’s data.

Ingestion flow

1. Producer publishes a message: An external system publishes a JSON message to the resource_data queue (configurable via RESOURCE_DATA_QUEUE). The message contains a resource_id and an array of {x, y} data points.
2. Consumer receives and validates: The data_ingestor consumer deserializes the message body. If the JSON is invalid, or if resource_id or data are missing, the message is acknowledged and discarded (not requeued).
3. Indicator lookup: The service queries MongoDB for an indicator whose resources array contains the given resource_id. If no indicator is found, the message is acknowledged and discarded.
4. Data segment stored: A DataSegment document is inserted into the data_segments collection. Each segment carries the indicator_id, the resource_id, the array of time points, and a timestamp recording when the segment arrived.
5. Merge into MergedIndicator: All segments for the indicator are re-merged into a single MergedIndicator document in the merged_indicators collection. This document holds the deduplicated, sorted view of the indicator's complete time series.
6. Cache invalidation: All Redis keys for the indicator (data cache keys, miss counters, and statistics cache keys) are cleared so the next read fetches fresh data from MongoDB.
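The validation gate in step 2 can be sketched as a small helper. This is an illustrative sketch, not the service's actual code; the function name and return convention are assumptions.

```python
import json


def parse_message(body: bytes):
    """Validate an incoming resource_data message body.

    Returns (resource_id, data) on success, or None when the message
    should be acknowledged and discarded (invalid JSON or missing fields).
    """
    try:
        payload = json.loads(body.decode("utf-8"))
    except (ValueError, UnicodeDecodeError):
        return None  # invalid JSON: ack and discard, never requeue
    resource_id = payload.get("resource_id")
    data = payload.get("data")
    if not resource_id or data is None:
        return None  # missing required fields: ack and discard
    return resource_id, data
```

A `None` result maps to "acknowledge and discard" in the consumer; only a successful parse proceeds to the indicator lookup.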

Message format

Messages must be UTF-8 JSON. The queue is declared as durable and messages are delivered with PERSISTENT delivery mode.
```json
{
  "resource_id": "64c3b2e4d0f6a34f5e8b0456",
  "data": [
    {"x": "2024-01-01T00:00:00", "y": 72.5},
    {"x": "2024-02-01T00:00:00", "y": 68.3},
    {"x": "2024-03-01T00:00:00", "y": 75.1}
  ]
}
```
  • resource_id — a string matching a value in an indicator’s resources array
  • data — array of data points; each point has an x value (ISO 8601 datetime string) and a numeric y value
Messages with an invalid resource_id format, missing fields, or malformed JSON are permanently discarded (acknowledged without requeue). Ensure your producer validates messages before publishing.
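Since discarded messages are not requeued, a producer-side sanity check is worthwhile. Here is a minimal sketch of such a check; the helper name and regex-based ObjectId format test are assumptions, not part of the service.

```python
import re
from datetime import datetime

# MongoDB ObjectIds serialize as 24 hexadecimal characters
OBJECT_ID_RE = re.compile(r"^[0-9a-fA-F]{24}$")


def is_valid_message(payload: dict) -> bool:
    """Producer-side sanity check before publishing (illustrative helper)."""
    if not OBJECT_ID_RE.match(str(payload.get("resource_id", ""))):
        return False
    points = payload.get("data")
    if not isinstance(points, list):
        return False
    for point in points:
        try:
            datetime.fromisoformat(point["x"])          # ISO 8601 datetime string
            if not isinstance(point["y"], (int, float)):
                return False                            # y must be numeric
        except (KeyError, TypeError, ValueError):
            return False
    return True
```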

DataPoint and DataSegment schemas

DataPoint

The data point type used in API responses (x can be either a datetime or a float for non-time-series indicators):
| Field | Type | Description |
| --- | --- | --- |
| x | datetime \| float | Timestamp or numeric x-axis value |
| y | float | Measured value |

TimePoint

The point type used internally within a DataSegment (x is strictly a datetime):
| Field | Type | Description |
| --- | --- | --- |
| x | datetime | Timestamp for the measurement |
| y | float | Measured value |

DataSegment

Stored in the data_segments MongoDB collection:
| Field | Type | Description |
| --- | --- | --- |
| indicator_id | ObjectId | The indicator this segment belongs to |
| resource_id | ObjectId | The resource that produced this segment |
| points | TimePoint[] | Array of time-stamped data points |
| timestamp | datetime | UTC datetime when the segment was stored (auto-set) |

MergedIndicator

Stored in the merged_indicators MongoDB collection (one document per indicator):
| Field | Type | Description |
| --- | --- | --- |
| indicator_id | ObjectId | The indicator this document belongs to |
| points | DataPoint[] | Deduplicated, sorted data points |
| last_updated | datetime | UTC datetime of the last merge |
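The segment shape can be sketched as dataclasses. This is an illustrative model only; the service presumably uses its own ODM document classes, and representing ObjectIds as hex strings here is an assumption.

```python
from dataclasses import dataclass, field
from datetime import datetime, timezone


@dataclass
class TimePoint:
    x: datetime  # timestamp for the measurement
    y: float     # measured value


@dataclass
class DataSegment:
    indicator_id: str       # ObjectId hex string of the owning indicator
    resource_id: str        # ObjectId hex string of the producing resource
    points: list            # list of TimePoint
    timestamp: datetime = field(
        # auto-set on creation, mirroring the stored segment's timestamp field
        default_factory=lambda: datetime.now(timezone.utc)
    )
```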

Merge strategy

After every new segment is stored, all DataSegment documents for that indicator are re-read and merged into the MergedIndicator document using the following rules:
  1. Points with datetime x values are grouped by their exact timestamp.
  2. For a given timestamp, only the y value from the segment with the newest timestamp field is kept. This means the most recently ingested segment always wins over older ones at the same point in time.
  3. Numeric (non-datetime) x values are not deduplicated — they are appended as-is.
  4. All resulting points are sorted in ascending order by x.
```
Segment A (stored at 10:00): [{x: "2024-01-01", y: 70.0}, {x: "2024-02-01", y: 65.0}]
Segment B (stored at 11:00): [{x: "2024-01-01", y: 72.5}]  ← same timestamp, newer segment

Merged result: [{x: "2024-01-01", y: 72.5}, {x: "2024-02-01", y: 65.0}]
               ↑ B's value wins because B.timestamp > A.timestamp
```
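The merge rules can be sketched in Python. This is a simplified model: the real service reads the segments back from MongoDB, and segment order here stands in for ordering by the stored timestamp field. The function name is illustrative.

```python
from datetime import datetime


def merge_segments(segments):
    """Merge point lists into one deduplicated, sorted series.

    `segments` must be ordered oldest-first by stored timestamp, so a
    later segment overwrites earlier ones at the same datetime x.
    """
    by_time = {}   # datetime x -> y; newest segment wins (rules 1-2)
    numeric = []   # non-datetime x values are appended as-is (rule 3)
    for points in segments:
        for point in points:
            if isinstance(point["x"], datetime):
                by_time[point["x"]] = point["y"]
            else:
                numeric.append((point["x"], point["y"]))
    merged = [{"x": x, "y": y} for x, y in by_time.items()]
    merged.extend({"x": x, "y": y} for x, y in numeric)
    # Rule 4: ascending by x. A given indicator is assumed to use either
    # all-datetime or all-numeric x values, so the sort is well defined.
    merged.sort(key=lambda p: p["x"])
    return merged
```

Running this on the Segment A / Segment B example above reproduces the merged result shown, with B's y value winning at the shared timestamp.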

Publishing messages with aio-pika

Here is a minimal Python example that publishes a data message to the resource_data queue:
```python
import asyncio
import json
import aio_pika


async def publish_data():
    connection = await aio_pika.connect_robust("amqp://guest:guest@rabbitmq/")

    async with connection:
        channel = await connection.channel()

        # Declare the queue as durable (must match the consumer's declaration)
        queue = await channel.declare_queue("resource_data", durable=True)

        payload = {
            "resource_id": "64c3b2e4d0f6a34f5e8b0456",
            "data": [
                {"x": "2024-01-01T00:00:00", "y": 72.5},
                {"x": "2024-02-01T00:00:00", "y": 68.3},
                {"x": "2024-03-01T00:00:00", "y": 75.1},
            ],
        }

        message = aio_pika.Message(
            body=json.dumps(payload).encode(),
            delivery_mode=aio_pika.DeliveryMode.PERSISTENT,
        )

        await channel.default_exchange.publish(
            message,
            routing_key="resource_data",
        )
        print("Published data segment")


asyncio.run(publish_data())
```

Resource deletion

When an external resource is deleted, the service listens on a second queue (resource_deleted, configurable via RESOURCE_DELETED_QUEUE) and performs the following cleanup for each indicator that referenced the deleted resource:
  1. Removes the resource_id string from the indicator’s resources array ($pull).
  2. Deletes all DataSegment documents with that resource_id.
  3. Re-merges the remaining segments and updates the MergedIndicator document.
  4. Clears all Redis cache keys for the indicator.
The deletion message format is:
```json
{
  "resource_id": "64c3b2e4d0f6a34f5e8b0456"
}
```
Cleanup failures for individual indicators are logged and do not affect other indicators in the same deletion event. Each indicator is processed independently.
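The four cleanup steps can be sketched with in-memory stand-ins for the MongoDB collections and the Redis cache. This is illustrative only; the real service issues the equivalent $pull update, segment deletion, re-merge, and cache-clear operations against its actual stores.

```python
def cleanup_indicator(indicator, segments, cache, resource_id):
    """Apply the four cleanup steps for one indicator (in-memory model)."""
    # 1. Remove the resource_id from the indicator's resources array ($pull)
    indicator["resources"] = [r for r in indicator["resources"] if r != resource_id]
    # 2. Delete all segments produced by that resource
    remaining = [s for s in segments if s["resource_id"] != resource_id]
    # 3. Re-merge the remaining segments (stand-in for the real merge)
    merged_points = [p for s in remaining for p in s["points"]]
    # 4. Clear every cached key for the indicator
    cache.clear()
    return remaining, merged_points
```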

Error handling

| Condition | Action |
| --- | --- |
| Invalid JSON | Acknowledged (discarded), error logged |
| Missing resource_id or data | Acknowledged (discarded), warning logged |
| No indicator found for resource_id | Acknowledged (discarded), warning logged |
| Invalid ObjectId format | Acknowledged (discarded), error logged |
| MongoDB connection failure | Rejected with requeue=True |
| Redis connection failure | Rejected with requeue=True |
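The pattern behind the table is that only transient infrastructure failures are requeued; every data-level problem is discarded. A sketch of that decision, with stand-in exception types for the MongoDB/Redis connection failures:

```python
def decide(error: Exception) -> str:
    """Map a processing error to the consumer's ack/requeue decision."""
    # Stand-ins for MongoDB/Redis connection failures; the real consumer
    # catches its drivers' specific exception types.
    transient = (ConnectionError, TimeoutError)
    if isinstance(error, transient):
        return "reject, requeue=True"  # redelivered and retried later
    return "ack"                       # permanently discarded, failure logged
```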
