The Indicator Service is a stateless FastAPI application that manages tourism indicators and their time-series data. It relies on three external dependencies: MongoDB for persistence, Redis for caching, and RabbitMQ for asynchronous data ingestion.

High-level overview

┌─────────────────────────────────────────────────────────────────┐
│                        API Clients                              │
│               (browsers, dashboards, producers)                 │
└───────────────┬──────────────────────────┬──────────────────────┘
                │ HTTP                     │ AMQP
                ▼                          ▼
┌───────────────────────────┐   ┌──────────────────────────────┐
│   FastAPI Application     │   │         RabbitMQ             │
│  indicator-service:8080   │   │  resource_data queue         │
│                           │   │  resource_deleted queue      │
│  Routes:                  │◄──│                              │
│  /indicators              │   └──────────────────────────────┘
│  /domains                 │
│  /indicators/{id}/data    │
│  /health                  │
└──────┬─────────┬──────────┘
       │         │
       ▼         ▼
┌──────────┐  ┌──────────────────────────────────────────────┐
│  Redis   │  │                 MongoDB                      │
│  cache   │  │  indicators  domains  data_segments          │
│  6379    │  │  merged_indicators  annotations              │
└──────────┘  └──────────────────────────────────────────────┘

Component breakdown

FastAPI application layer

The application is built with FastAPI and served by Uvicorn on port 8080. CORS is configured at startup from the ORIGINS environment variable. The router structure exposes five groups of endpoints:

| Prefix | Tags | Description |
| --- | --- | --- |
| `/indicators` | Indicators | CRUD for indicators, search, filtering by domain/subdomain |
| `/domains` | Domains | CRUD for domains, icon/image upload |
| `/indicators/{id}/data` | Data | Paginated time-series retrieval with granularity and aggregation |
| `/indicators/{id}/annotations` | Annotations | Annotations attached to indicator data |
| `/health` | Health | Liveness probe |

On startup, the application uses an asynccontextmanager lifespan to connect to RabbitMQ and register all queue consumers before accepting traffic. On shutdown, all consumer tasks are cancelled and the connection is closed cleanly.
The service uses Motor’s AsyncIOMotorClient for all MongoDB access, keeping every database operation non-blocking on the asyncio event loop.

MongoDB (Motor async driver)

MongoDB is the primary data store. The service connects using the MONGO_URI setting and always operates on the database specified in the URI (default: indicators). The driver retries the initial connection every 5 seconds until MongoDB is reachable — this pairs with the Docker Compose healthcheck on indicators-mongo that prevents the application container from starting until MongoDB is ready. Five collections are used:

| Collection | Contents |
| --- | --- |
| `indicators` | Indicator metadata: name, periodicity, domain reference, subdomain, linked resource IDs |
| `domains` | Domain records with name, color, icon, image, and list of subdomain names |
| `data_segments` | Raw time-series segments as received from producers; each segment carries an `indicator_id`, `resource_id`, and an array of `{x, y}` points |
| `merged_indicators` | De-duplicated, sorted union of all segments for an indicator; rebuilt whenever a new segment arrives or a resource is deleted |
| `annotations` | User-defined annotations linked to an indicator |

merged_indicators is the read path for all data queries. Raw data_segments are kept for provenance and to support re-merging when a resource is removed.

Redis — caching strategy

Redis sits in front of all time-series data reads and uses an adaptive, threshold-based caching strategy driven by miss counters. Cache key formats (from data_propagator.py):

| Key pattern | Purpose | TTL |
| --- | --- | --- |
| `indicator_data:{id}:{granularity}:{aggregator}:{params}` | Specific query result (paginated, filtered, sorted) | `CACHE_TTL_SECONDS` = 3600 s |
| `indicator_data:{id}:{granularity}:{aggregator}` | Full dataset for an indicator + granularity + aggregator combination | `CACHE_TTL_SECONDS` = 3600 s |
| `indicator_miss:{id}:{granularity}:{aggregator}:counter` | Rolling miss counter for a given combination | `MISS_COUNTER_TTL` = 90 s |
| `stats:{id}*` | Cached statistics (min, max, mean, etc.) | `STATS_CACHE_TTL` = 15 s |

Cache lookup order per request:
  1. Check the specific cache key (query with all parameters encoded).
  2. On miss, check the full-dataset cache key and derive the requested slice from it in memory.
  3. On second miss, increment the miss counter and query MongoDB directly, caching the specific result.
  4. Once the miss counter reaches MISS_THRESHOLD = 5, a background task pre-populates the full-dataset cache key so subsequent requests can be served entirely from Redis.
The 90-second MISS_COUNTER_TTL window means the full-cache background task is triggered when the same indicator+granularity combination is missed 5 times within 90 seconds — this avoids pre-warming cold indicators that are rarely queried.
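The lookup order and miss counter above can be sketched as follows. The key builders mirror the documented formats; `apply_params`, `query_mongo`, and `schedule_full_cache_warmup` are hypothetical stand-ins for logic not shown in this document:

```python
import json

CACHE_TTL_SECONDS = 3600   # 1 h result cache
MISS_COUNTER_TTL = 90      # rolling miss window
MISS_THRESHOLD = 5         # misses before the full cache is warmed

def full_key(ind_id: str, granularity: str, aggregator: str) -> str:
    return f"indicator_data:{ind_id}:{granularity}:{aggregator}"

def specific_key(ind_id: str, granularity: str, aggregator: str, params: str) -> str:
    return f"{full_key(ind_id, granularity, aggregator)}:{params}"

def miss_key(ind_id: str, granularity: str, aggregator: str) -> str:
    return f"indicator_miss:{ind_id}:{granularity}:{aggregator}:counter"

def apply_params(points, params):                                # hypothetical slicer
    return points

async def query_mongo(ind_id, granularity, aggregator, params):  # hypothetical
    return []                                                    # aggregation pipeline

def schedule_full_cache_warmup(ind_id, granularity, aggregator): # hypothetical
    pass                                                         # background warm task

async def get_data_points(r, ind_id, granularity, aggregator, params):
    """r is an asyncio Redis client (redis.asyncio.Redis or compatible)."""
    skey = specific_key(ind_id, granularity, aggregator, params)
    # [1] Exact query result cached?
    if (cached := await r.get(skey)) is not None:
        return json.loads(cached)
    # [2] Full dataset cached? Derive the requested slice in memory.
    if (full := await r.get(full_key(ind_id, granularity, aggregator))) is not None:
        result = apply_params(json.loads(full), params)
        await r.set(skey, json.dumps(result), ex=CACHE_TTL_SECONDS)
        return result
    # [3] Double miss: bump the rolling counter and fall back to MongoDB.
    mkey = miss_key(ind_id, granularity, aggregator)
    misses = await r.incr(mkey)
    await r.expire(mkey, MISS_COUNTER_TTL)
    result = await query_mongo(ind_id, granularity, aggregator, params)
    await r.set(skey, json.dumps(result), ex=CACHE_TTL_SECONDS)
    if misses >= MISS_THRESHOLD:
        schedule_full_cache_warmup(ind_id, granularity, aggregator)
    return result
```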
Cache invalidation happens on every write path:
  • When a new data segment is ingested, all cache keys for that indicator are deleted using Redis SCAN + DEL.
  • When a resource is deleted, the same invalidation runs after the merged collection is rebuilt.
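A sketch of that SCAN-based invalidation, assuming an asyncio Redis client and the key patterns from the table above; the function name is illustrative:

```python
async def invalidate_indicator(r, ind_id: str) -> int:
    """Delete every cached result, miss counter, and stats entry for one indicator.

    r is an asyncio Redis client (redis.asyncio.Redis or compatible).
    """
    deleted = 0
    for pattern in (f"indicator_data:{ind_id}:*",
                    f"indicator_miss:{ind_id}:*",
                    f"stats:{ind_id}*"):
        # SCAN walks the keyspace incrementally instead of blocking the
        # server the way KEYS would; each matched key is then deleted.
        async for key in r.scan_iter(match=pattern):
            deleted += await r.delete(key)
    return deleted
```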

RabbitMQ — message queues

The service consumes two durable queues declared at startup via aio-pika:

| Queue | Default name | Direction | Description |
| --- | --- | --- | --- |
| `RESOURCE_DATA_QUEUE` | `resource_data` | Inbound | Time-series data published by data producers |
| `RESOURCE_DELETED_QUEUE` | `resource_deleted` | Inbound | Resource deletion events from other services |

The RabbitMQClient maintains a channel pool of 5 channels (pool_size=5) and sets prefetch_count=10 per channel. Consumer tasks run as persistent asyncio tasks for the lifetime of the process. resource_data message format:
```json
{
  "resource_id": "<string>",
  "data": [
    {"x": "2024-01-15T00:00:00Z", "y": 12.4},
    {"x": "2024-01-16T00:00:00Z", "y": 15.1}
  ]
}
```
resource_deleted message format:
```json
{
  "resource_id": "<string>"
}
```
Error handling per queue:

| Condition | Action |
| --- | --- |
| Invalid JSON | ACK + discard (no requeue) |
| Invalid message structure | ACK + discard |
| No matching indicator for `resource_id` | ACK + discard |
| MongoDB connection failure | NACK + requeue |
| Redis connection failure | NACK + requeue |

Messages with unknown resource_id values are silently discarded. A resource_id must be linked to an indicator via POST /indicators/{indicator_id}/resources before producers start publishing data for it.
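The error-handling rules above can be sketched as a consumer callback. The message object is an aio-pika `IncomingMessage` (duck-typed here); `find_indicator_by_resource` and `ingest_segment` are hypothetical stand-ins for the real lookup and ingestion logic:

```python
import json

async def find_indicator_by_resource(resource_id):         # hypothetical MongoDB lookup
    return None

async def ingest_segment(indicator, resource_id, points):  # hypothetical ingestion
    pass

async def on_resource_data(message) -> None:
    """Consumer callback for the resource_data queue."""
    try:
        payload = json.loads(message.body)
        resource_id = payload["resource_id"]
        points = payload["data"]
    except (json.JSONDecodeError, KeyError, TypeError):
        await message.ack()               # invalid JSON/structure: discard, no requeue
        return
    try:
        indicator = await find_indicator_by_resource(resource_id)
        if indicator is None:
            await message.ack()           # unknown resource_id: silently discard
            return
        await ingest_segment(indicator, resource_id, points)
        await message.ack()
    except (ConnectionError, TimeoutError):
        await message.nack(requeue=True)  # Mongo/Redis down: requeue for redelivery
```

ACKing malformed messages instead of requeuing them keeps a single bad payload from cycling through the queue forever; only infrastructure failures are worth a retry.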

Data flow

The following describes how a data point travels from a producer to an API response.

Ingestion path

Producer

  │  publishes JSON to resource_data queue

RabbitMQ  (durable queue, persistent messages)

  │  aio-pika consumer picks up message

data_ingestor.process_message()

  ├─► Looks up indicator by resource_id   (MongoDB: indicators)
  ├─► Validates message format
  ├─► Inserts raw DataSegment              (MongoDB: data_segments)
  ├─► Calls merge_indicator_data()         (reads all data_segments for indicator)
  ├─► Upserts merged sorted points         (MongoDB: merged_indicators)
  └─► Deletes all Redis keys for indicator (SCAN + DEL)

Read path

HTTP GET /indicators/{id}/data?granularity=1d&aggregator=avg


data_propagator.get_data_points()

  ├─[1]─► Redis GET specific_cache_key  →  HIT: return immediately

  ├─[2]─► Redis GET full_cache_key      →  HIT: slice in memory, cache result, return

  └─[3]─► Increment miss counter in Redis
           MongoDB aggregation on merged_indicators
           Cache specific result in Redis (TTL: 3600 s)
           If miss_count >= 5: schedule background task to warm full cache
           Return data

Service dependencies

indicator-service
  ├── depends_on: indicators-mongo (service_healthy)
  ├── depends_on: indicators-redis (service_started)
  └── depends_on: rabbitmq         (service_healthy)  [override]

indicators-mongo
  └── healthcheck: mongosh --eval "db.adminCommand('ping')"
      interval: 6 s, timeout: 5 s, retries: 5

rabbitmq
  └── healthcheck: rabbitmq-diagnostics check_port_connectivity
      interval: 6 s, timeout: 5 s, retries: 5
All services share the indicator-network Docker network. MongoDB data is persisted in the indicators_db named volume so it survives container restarts.
Redis has no persistence configured — it is a pure cache and will start empty after a restart. All data will be re-loaded from MongoDB on the next cache miss.

Technology stack

| Component | Library / Image | Version |
| --- | --- | --- |
| HTTP framework | `fastapi` | 0.116.1 |
| ASGI server | `uvicorn` | 0.35.0 |
| MongoDB driver | `motor` | 3.7.1 |
| RabbitMQ client | `aio-pika` | 9.5.7 |
| Redis client | `redis` (asyncio) | 6.4.0 |
| Data validation | `pydantic` / `pydantic-settings` | 2.11.7 / 2.10.1 |
| HTTP client (proxy calls) | `httpx` | 0.28.1 |
| Chart rendering | `plotly` + `kaleido` | 5.24.0 / 0.2.1 |
| MongoDB image | `mongo:latest` | |
| Redis image | `redis:alpine` | |
| RabbitMQ image | `rabbitmq:3-management` | |
