The Indicator Service is a stateless FastAPI application that manages tourism indicators and their time-series data. It relies on three external dependencies: MongoDB for persistence, Redis for caching, and RabbitMQ for asynchronous data ingestion.
High-level overview
┌─────────────────────────────────────────────────────────────────┐
│ API Clients │
│ (browsers, dashboards, producers) │
└───────────────┬──────────────────────────┬──────────────────────┘
│ HTTP │ AMQP
▼ ▼
┌───────────────────────────┐ ┌──────────────────────────────┐
│ FastAPI Application │ │ RabbitMQ │
│ indicator-service:8080 │ │ resource_data queue │
│ │ │ resource_deleted queue │
│ Routes: │◄──│ │
│ /indicators │ └──────────────────────────────┘
│ /domains │
│ /indicators/{id}/data │
│ /health │
└──────┬─────────┬──────────┘
│ │
▼ ▼
┌──────────┐ ┌──────────────────────────────────────────────┐
│ Redis │ │ MongoDB │
│ cache │ │ indicators domains data_segments │
│ 6379 │ │ merged_indicators annotations │
└──────────┘ └──────────────────────────────────────────────┘
Component breakdown
FastAPI application layer
The application is built with FastAPI and served by Uvicorn on port 8080. CORS is configured at startup from the ORIGINS environment variable.
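A common pattern is for ORIGINS to hold a comma-separated list of allowed origins. A minimal parsing sketch (the parse_origins helper is illustrative, not the service's actual code):

```python
import os

def parse_origins(raw: str) -> list[str]:
    # Split a comma-separated ORIGINS value into a clean list,
    # dropping empty entries and surrounding whitespace.
    return [origin.strip() for origin in raw.split(",") if origin.strip()]

# The resulting list would be handed to FastAPI's CORSMiddleware
# as allow_origins during application startup.
origins = parse_origins(os.getenv("ORIGINS", "*"))
```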
The router structure exposes five groups of endpoints:
| Prefix | Tags | Description |
|---|---|---|
| /indicators | Indicators | CRUD for indicators, search, filtering by domain/subdomain |
| /domains | Domains | CRUD for domains, icon/image upload |
| /indicators/{id}/data | Data | Paginated time-series retrieval with granularity and aggregation |
| /indicators/{id}/annotations | Annotations | Annotations attached to indicator data |
| /health | Health | Liveness probe |
On startup, the application uses an asynccontextmanager lifespan to connect to RabbitMQ and register all queue consumers before accepting traffic. On shutdown, all consumer tasks are cancelled and the connection is closed cleanly.
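The startup/shutdown sequence can be sketched with a stdlib asynccontextmanager; connect_rabbitmq and consume_queue below are stand-ins for the real aio-pika calls, not the service's code:

```python
import asyncio
from contextlib import asynccontextmanager

# Placeholders standing in for the real aio-pika connection and consumer.
async def connect_rabbitmq():
    class FakeConnection:
        async def close(self):
            pass
    return FakeConnection()

async def consume_queue(connection, queue_name: str):
    while True:                  # a consumer runs for the process lifetime
        await asyncio.sleep(3600)

@asynccontextmanager
async def lifespan(app):
    # Startup: connect and spawn one persistent task per queue
    # before the application starts accepting traffic.
    connection = await connect_rabbitmq()
    tasks = [
        asyncio.create_task(consume_queue(connection, "resource_data")),
        asyncio.create_task(consume_queue(connection, "resource_deleted")),
    ]
    try:
        yield
    finally:
        # Shutdown: cancel consumer tasks, then close the connection.
        for task in tasks:
            task.cancel()
        await asyncio.gather(*tasks, return_exceptions=True)
        await connection.close()
```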
The service uses Motor’s AsyncIOMotorClient for all MongoDB access, keeping every database operation non-blocking on the asyncio event loop.
MongoDB (Motor async driver)
MongoDB is the primary data store. The service connects using the MONGO_URI setting and always operates on the database specified in the URI (default: indicators).
The service retries the initial connection every 5 seconds until MongoDB is reachable — this pairs with the Docker Compose healthcheck on indicators-mongo that prevents the application container from starting until MongoDB is ready.
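The retry-until-reachable behaviour can be sketched generically; wait_for_connection is a hypothetical helper and the 5-second interval matches the text above:

```python
import asyncio

RETRY_INTERVAL_SECONDS = 5

async def wait_for_connection(connect, retry_interval=RETRY_INTERVAL_SECONDS):
    # Call `connect` (e.g. a ping through the Motor client) until it
    # succeeds, sleeping between attempts; returns the attempt count.
    attempts = 0
    while True:
        attempts += 1
        try:
            await connect()
            return attempts
        except ConnectionError:
            await asyncio.sleep(retry_interval)
```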
Five collections are used:
| Collection | Contents |
|---|---|
| indicators | Indicator metadata — name, periodicity, domain reference, subdomain, linked resource IDs |
| domains | Domain records with name, color, icon, image, and list of subdomain names |
| data_segments | Raw time-series segments as received from producers; each segment carries an indicator_id, resource_id, and an array of {x, y} points |
| merged_indicators | De-duplicated, sorted union of all segments for an indicator. Rebuilt whenever a new segment arrives or a resource is deleted |
| annotations | User-defined annotations linked to an indicator |
merged_indicators is the read path for all data queries. Raw data_segments are kept for provenance and to support re-merging when a resource is removed.
Redis — caching strategy
Redis sits in front of all time-series data reads and uses an adaptive, threshold-based caching strategy driven by miss counters.
Cache key formats (from data_propagator.py):
| Key pattern | Purpose | TTL |
|---|---|---|
| indicator_data:{id}:{granularity}:{aggregator}:{params} | Specific query result (paginated, filtered, sorted) | CACHE_TTL_SECONDS = 3600 s |
| indicator_data:{id}:{granularity}:{aggregator} | Full dataset for an indicator+granularity+aggregator combination | CACHE_TTL_SECONDS = 3600 s |
| indicator_miss:{id}:{granularity}:{aggregator}:counter | Rolling miss counter for a given combination | MISS_COUNTER_TTL = 90 s |
| stats:{id}* | Cached statistics (min, max, mean, etc.) | STATS_CACHE_TTL = 15 s |
Cache lookup order per request:
- Check the specific cache key (query with all parameters encoded).
- On miss, check the full-dataset cache key and derive the requested slice from it in memory.
- If both lookups miss, increment the miss counter and query MongoDB directly, caching the specific result.
- Once the miss counter reaches MISS_THRESHOLD = 5, a background task pre-populates the full-dataset cache key so subsequent requests can be served entirely from Redis.
The 90-second MISS_COUNTER_TTL window means the full-cache background task is triggered when the same indicator+granularity combination is missed 5 times within 90 seconds — this avoids pre-warming cold indicators that are rarely queried.
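The key layout and threshold logic above can be sketched as pure helpers; the names mirror the patterns in the table, but the real data_propagator code may differ:

```python
MISS_THRESHOLD = 5

def specific_key(indicator_id, granularity, aggregator, params):
    # Encodes the full query (page, size, sort, filters) into one key.
    return f"indicator_data:{indicator_id}:{granularity}:{aggregator}:{params}"

def full_key(indicator_id, granularity, aggregator):
    # The full-dataset key omits the per-query parameters.
    return f"indicator_data:{indicator_id}:{granularity}:{aggregator}"

def miss_counter_key(indicator_id, granularity, aggregator):
    return f"indicator_miss:{indicator_id}:{granularity}:{aggregator}:counter"

def should_warm_full_cache(miss_count: int) -> bool:
    # Five misses within the 90 s counter TTL trigger the warm-up task.
    return miss_count >= MISS_THRESHOLD
```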
Cache invalidation happens on every write path:
- When a new data segment is ingested, all cache keys for that indicator are deleted using Redis SCAN + DELETE.
- When a resource is deleted, the same invalidation runs after the merged collection is rebuilt.
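SCAN-based invalidation can be sketched as follows. The client argument is assumed to be a redis.asyncio.Redis instance, and the patterns mirror the key families listed above:

```python
PATTERNS = ("indicator_data:{id}:*", "indicator_miss:{id}:*", "stats:{id}*")

def invalidation_patterns(indicator_id: str) -> list[str]:
    # One SCAN pattern per key family described in the table above.
    return [p.format(id=indicator_id) for p in PATTERNS]

async def invalidate_indicator(redis, indicator_id: str) -> int:
    # `redis` is assumed to be a redis.asyncio.Redis client. SCAN
    # iterates incrementally (unlike KEYS, it does not block the
    # server); matching keys are deleted as they are found.
    deleted = 0
    for pattern in invalidation_patterns(indicator_id):
        async for key in redis.scan_iter(match=pattern):
            deleted += await redis.delete(key)
    return deleted
```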
RabbitMQ — message queues
The service consumes two durable queues declared at startup via aio-pika:
| Queue | Default name | Direction | Description |
|---|---|---|---|
| RESOURCE_DATA_QUEUE | resource_data | Inbound | Time-series data published by data producers |
| RESOURCE_DELETED_QUEUE | resource_deleted | Inbound | Resource deletion events from other services |
The RabbitMQClient maintains a channel pool of 5 channels (pool_size=5) and sets prefetch_count=10 per channel. Consumer tasks run as persistent asyncio tasks for the lifetime of the process.
resource_data message format:
{
    "resource_id": "<string>",
    "data": [
        {"x": "2024-01-15T00:00:00Z", "y": 12.4},
        {"x": "2024-01-16T00:00:00Z", "y": 15.1}
    ]
}
resource_deleted message format:
{
    "resource_id": "<string>"
}
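A minimal structural check for the resource_data payload, as a stdlib-only sketch (the service itself validates with Pydantic; the exact model shape is an assumption here):

```python
import json

def validate_resource_data(raw: bytes):
    # Returns the parsed payload, or None when the message should be
    # ACKed and discarded (invalid JSON or invalid structure).
    try:
        payload = json.loads(raw)
    except json.JSONDecodeError:
        return None
    if not isinstance(payload, dict):
        return None
    if not isinstance(payload.get("resource_id"), str):
        return None
    points = payload.get("data")
    if not isinstance(points, list):
        return None
    for point in points:
        if not isinstance(point, dict) or "x" not in point or "y" not in point:
            return None
    return payload
```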
Error handling per queue:
| Condition | Action |
|---|---|
| Invalid JSON | ACK + discard (no requeue) |
| Invalid message structure | ACK + discard |
| No matching indicator for resource_id | ACK + discard |
| MongoDB connection failure | NACK + requeue |
| Redis connection failure | NACK + requeue |
Messages with unknown resource_id values are silently discarded. A resource_id must be linked to an indicator via POST /indicators/{indicator_id}/resources before producers start publishing data for it.
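The decision table above boils down to one rule: transient infrastructure failures are requeued, everything else is discarded. A pure-function sketch (the exception classes are illustrative stand-ins for the real driver errors):

```python
ACK_DISCARD = "ack_discard"
NACK_REQUEUE = "nack_requeue"

class MongoUnavailable(Exception):
    """Stand-in for a MongoDB connection failure."""

class RedisUnavailable(Exception):
    """Stand-in for a Redis connection failure."""

def classify_failure(exc: Exception) -> str:
    # Infrastructure outages are transient: NACK + requeue so the
    # message is retried once the dependency recovers. Everything else
    # (bad JSON, bad structure, unknown resource_id) is permanent:
    # ACK + discard so it never poisons the queue.
    if isinstance(exc, (MongoUnavailable, RedisUnavailable)):
        return NACK_REQUEUE
    return ACK_DISCARD
```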
Data flow
The following describes how a data point travels from a producer to an API response.
Ingestion path
Producer
│
│ publishes JSON to resource_data queue
▼
RabbitMQ (durable queue, persistent messages)
│
│ aio-pika consumer picks up message
▼
data_ingestor.process_message()
│
├─► Looks up indicator by resource_id (MongoDB: indicators)
├─► Validates message format
├─► Inserts raw DataSegment (MongoDB: data_segments)
├─► Calls merge_indicator_data() (reads all data_segments for indicator)
├─► Upserts merged sorted points (MongoDB: merged_indicators)
└─► Deletes all Redis keys for indicator (SCAN + DEL)
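The merge step in the path above can be sketched as a pure function: concatenate every segment's points, de-duplicate on x, and sort. The keep-latest tie-break on duplicate x values is an assumption, not confirmed behaviour:

```python
def merge_segments(segments):
    # segments: list of {"resource_id": ..., "data": [{"x": ..., "y": ...}]}
    # documents from the data_segments collection for one indicator.
    merged = {}
    for segment in segments:
        for point in segment["data"]:
            # Later segments win on duplicate x values (assumed tie-break).
            merged[point["x"]] = point["y"]
    # The sorted, de-duplicated union is what gets upserted into
    # merged_indicators.
    return [{"x": x, "y": merged[x]} for x in sorted(merged)]
```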
Read path
HTTP GET /indicators/{id}/data?granularity=1d&aggregator=avg
│
▼
data_propagator.get_data_points()
│
├─[1]─► Redis GET specific_cache_key → HIT: return immediately
│
├─[2]─► Redis GET full_cache_key → HIT: slice in memory, cache result, return
│
└─[3]─► Increment miss counter in Redis
MongoDB aggregation on merged_indicators
Cache specific result in Redis (TTL: 3600 s)
If miss_count >= 5: schedule background task to warm full cache
Return data
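The MongoDB step in [3] could be expressed as an aggregation pipeline over merged_indicators. The field names (indicator_id, points) and the $dateTrunc bucketing below are assumptions about the schema, not the service's verified code:

```python
AGGREGATORS = {"avg": "$avg", "sum": "$sum", "min": "$min", "max": "$max"}

def build_pipeline(indicator_id: str, aggregator: str = "avg"):
    # Unwind the merged point array, bucket points by day, and apply
    # the requested aggregator to each bucket's y values.
    return [
        {"$match": {"indicator_id": indicator_id}},
        {"$unwind": "$points"},
        {"$group": {
            "_id": {"$dateTrunc": {"date": "$points.x", "unit": "day"}},
            "y": {AGGREGATORS[aggregator]: "$points.y"},
        }},
        {"$sort": {"_id": 1}},
    ]
```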
Service dependencies
indicator-service
├── depends_on: indicators-mongo (service_healthy)
├── depends_on: indicators-redis (service_started)
└── depends_on: rabbitmq (service_healthy) [override]
indicators-mongo
└── healthcheck: mongosh --eval "db.adminCommand('ping')"
interval: 6 s, timeout: 5 s, retries: 5
rabbitmq
└── healthcheck: rabbitmq-diagnostics check_port_connectivity
interval: 6 s, timeout: 5 s, retries: 5
All services share the indicator-network Docker network. MongoDB data is persisted in the indicators_db named volume so it survives container restarts.
Redis has no persistence configured — it is a pure cache and starts empty after a restart; each entry is repopulated from MongoDB the next time its key misses.
Technology stack
| Component | Library / Image | Version |
|---|---|---|
| HTTP framework | fastapi | 0.116.1 |
| ASGI server | uvicorn | 0.35.0 |
| MongoDB driver | motor | 3.7.1 |
| RabbitMQ client | aio-pika | 9.5.7 |
| Redis client | redis (asyncio) | 6.4.0 |
| Data validation | pydantic / pydantic-settings | 2.11.7 / 2.10.1 |
| HTTP client (proxy calls) | httpx | 0.28.1 |
| Chart rendering | plotly + kaleido | 5.24.0 / 0.2.1 |
| MongoDB image | mongo:latest | — |
| Redis image | redis:alpine | — |
| RabbitMQ image | rabbitmq:3-management | — |