
Overview

GOV.UK Notify uses Celery for asynchronous task processing with:
  • Amazon SQS as the message broker
  • Multiple specialized worker pools for different task types
  • Celery Beat for scheduled tasks
  • Integration with StatsD and Prometheus for monitoring
Reference: README.md:6

Celery Configuration

Broker Configuration

Celery uses Amazon SQS as the message broker:
CELERY = {
    "broker_url": "https://sqs.eu-west-1.amazonaws.com",
    "broker_transport": "sqs",
    "broker_transport_options": {
        "region": "eu-west-1",
        "queue_name_prefix": NOTIFICATION_QUEUE_PREFIX,
        "is_secure": True,
        "predefined_queues": {...}
    },
    "task_ignore_result": True,
    "result_expires": 0,
    "timezone": "UTC"
}
Reference: config.py:258-278
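The predefined_queues mapping is elided above; kombu's SQS transport expects each entry to map a full queue name to a dict containing at least its SQS URL. A hedged sketch of building that mapping from the environment variables described below (the prefix, queue names, and account ID here are illustrative placeholders, not Notify's real values):

```python
# Sketch of the predefined_queues format used by kombu's SQS transport.
# Prefix, account ID, and queue names are illustrative placeholders.
AWS_REGION = "eu-west-1"
AWS_ACCOUNT_ID = "123456789012"
NOTIFICATION_QUEUE_PREFIX = "local_dev-"

def predefined_queue(name):
    """Build one predefined_queues entry: full queue name -> its SQS URL."""
    full_name = f"{NOTIFICATION_QUEUE_PREFIX}{name}"
    url = f"https://sqs.{AWS_REGION}.amazonaws.com/{AWS_ACCOUNT_ID}/{full_name}"
    return {full_name: {"url": url}}

predefined_queues = {}
for queue in ("database-tasks", "send-sms-tasks", "send-email-tasks"):
    predefined_queues.update(predefined_queue(queue))
```

Because every queue is predefined, workers never need SQS CreateQueue permissions; they only poll URLs they are told about.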

Required Environment Variables

NOTIFICATION_QUEUE_PREFIX
string
required
Prefix for all SQS queue names, used to separate environments. Example: production-, staging-, local_dev_john
AWS_REGION
string
AWS region for SQS queues. Default: eu-west-1
AWS_ACCOUNT_ID
string
AWS account ID for constructing SQS queue URLs. Default: 123456789012
Reference: config.py:137, 170, 257

Queue Structure

Task Queues

The application defines specialized queues for different task types:
class QueueNames:
    PERIODIC = "periodic-tasks"                    # Scheduled tasks
    DATABASE = "database-tasks"                    # Database operations
    SEND_SMS = "send-sms-tasks"                   # SMS delivery
    SEND_EMAIL = "send-email-tasks"               # Email delivery
    SEND_LETTER = "send-letter-tasks"             # Letter delivery
    RESEARCH_MODE = "research-mode-tasks"         # Test/research mode
    REPORTING = "reporting-tasks"                 # Analytics and reports
    JOBS = "job-tasks"                            # Batch job processing
    RETRY = "retry-tasks"                         # Failed task retries
    NOTIFY = "notify-internal-tasks"              # Internal Notify operations
    CREATE_LETTERS_PDF = "create-letters-pdf-tasks" # Letter PDF generation
    CALLBACKS = "service-callbacks"               # Service callback webhooks
    CALLBACKS_RETRY = "service-callbacks-retry"   # Callback retries
    LETTERS = "letter-tasks"                      # Letter processing
    SES_CALLBACKS = "ses-callbacks"               # Email delivery receipts
    SMS_CALLBACKS = "sms-callbacks"               # SMS delivery receipts
    LETTER_CALLBACKS = "letter-callbacks"         # Letter delivery updates
    REPORT_REQUESTS_NOTIFICATIONS = "report-requests-notifications-tasks"
Reference: config.py:12-32
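Tasks are published to a specific queue at dispatch time. A dependency-free sketch of the routing idea, mirroring how Celery's task_routes setting maps task names to queues (the task names below are illustrative, not taken from the Notify codebase):

```python
# Hedged sketch of per-task queue routing, in the spirit of Celery's
# task_routes. Task names here are illustrative examples only.
TASK_ROUTES = {
    "deliver_sms": "send-sms-tasks",
    "deliver_email": "send-email-tasks",
    "create-letters-pdf": "create-letters-pdf-tasks",
}

def queue_for(task_name, default="notify-internal-tasks"):
    """Return the queue a task should be published to, with a fallback."""
    return TASK_ROUTES.get(task_name, default)
```

Routing by task type is what lets each specialized worker pool (described below) subscribe only to the queues it owns.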

External Queues

External services write to these queues:
ANTIVIRUS = "antivirus-tasks"        # Antivirus scan results
SANITISE_LETTERS = "sanitise-letter-tasks"  # Letter sanitization
Reference: config.py:30-31

Running Celery Workers

Local Development

# Run generic worker (all queues)
make run-celery

# Equivalent to:
celery -A run_celery.notify_celery worker \
  --pidfile="/tmp/celery.pid" \
  --loglevel=INFO \
  --concurrency=4
Reference: Makefile:38-44

Docker Development

For development environments with pycurl issues:
# Build Docker image with test dependencies
make bootstrap-with-docker

# Run Celery worker in Docker
make run-celery-with-docker
Reference: README.md:106-117, Makefile:46-48

Production Workers

In production, run specialized workers for each queue type:

API Worker (Generic)

./entrypoint.sh worker
# Processes all queues with default concurrency

Sender Worker (SMS/Email)

./entrypoint.sh api-worker-sender
# Queues: send-sms-tasks, send-email-tasks

Letter Sender Worker

./entrypoint.sh api-worker-sender-letters
# Queue: send-letter-tasks

Jobs Worker

./entrypoint.sh api-worker-jobs
# Queues: database-tasks, job-tasks

Letters Worker

./entrypoint.sh api-worker-letters
# Queues: create-letters-pdf-tasks, letter-tasks

Receipts Worker

./entrypoint.sh api-worker-receipts
# Queues: ses-callbacks, sms-callbacks, letter-callbacks

Periodic Worker

./entrypoint.sh api-worker-periodic
# Queue: periodic-tasks

Reporting Worker

./entrypoint.sh api-worker-reporting
# Queue: reporting-tasks

Internal Worker

./entrypoint.sh api-worker-internal
# Queue: notify-internal-tasks (high priority)

Service Callbacks Worker

./entrypoint.sh api-worker-service-callbacks
# Queue: service-callbacks

Callbacks Retry Worker

./entrypoint.sh api-worker-service-callbacks-retry
# Queue: service-callbacks-retry

Retry Worker

./entrypoint.sh api-worker-retry-tasks
# Queue: retry-tasks

Research Mode Worker

./entrypoint.sh api-worker-research
# Queue: research-mode-tasks

Report Requests Worker

./entrypoint.sh api-worker-report-requests-notifications
# Queue: report-requests-notifications-tasks
Reference: entrypoint.sh:11-66

Worker Configuration

Concurrency

Control the number of concurrent tasks per worker:
# Set concurrency via environment variable
export CONCURRENCY=8
./entrypoint.sh worker

# Default: 4 concurrent tasks
Reference: entrypoint.sh:5-8

Prefetch Multiplier

For long-running tasks, set prefetch multiplier to prevent task hoarding:
export CELERYD_PREFETCH_MULTIPLIER=1
This limits each worker process to reserving one task at a time, so a slow task does not sit on a backlog of prefetched messages that other idle workers could be processing.
Reference: config.py:480-482
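The number of messages a worker reserves from the broker is roughly concurrency times the prefetch multiplier (Celery's default multiplier is 4). A quick illustration of why lowering the multiplier matters for long-running tasks:

```python
# A worker reserves roughly concurrency * prefetch_multiplier messages
# from the broker at once (Celery's worker_prefetch_multiplier).
def reserved_messages(concurrency, prefetch_multiplier):
    return concurrency * prefetch_multiplier

# Celery defaults: 4 processes * multiplier 4 -> up to 16 reserved messages.
default = reserved_messages(4, 4)

# With CELERYD_PREFETCH_MULTIPLIER=1: 4 * 1 -> only 4 reserved messages,
# so slow tasks are not hoarded behind a busy worker.
tuned = reserved_messages(4, 1)
```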

Log Levels

# Worker log level (DEBUG, INFO, WARNING, ERROR, CRITICAL)
export CELERY_WORKER_LOG_LEVEL=INFO

# Beat log level
export CELERY_BEAT_LOG_LEVEL=INFO
Reference: config.py:91-92

Celery Beat (Scheduled Tasks)

Running Celery Beat

# Local development
make run-celery-beat

# Docker
make run-celery-beat-with-docker

# Production
./entrypoint.sh celery-beat
Important: Only run one instance of Celery Beat per environment to avoid duplicate task execution.
Reference: Makefile:50-56, entrypoint.sh:67-68

Scheduled Tasks

Celery Beat runs tasks on predefined schedules:

High-Frequency Tasks (Every Minute)

  • generate-sms-delivery-stats - SMS delivery statistics
  • switch-current-sms-provider-on-slow-delivery - Provider failover
  • check-job-status - Monitor job completion
  • update-status-of-fully-processed-jobs - Update job statuses
Reference: config.py:299-327

Periodic Tasks (Every 5-15 Minutes)

  • run-scheduled-jobs - Every 15 minutes (0, 15, 30, 45)
  • tend-providers-back-to-middle - Every 5 minutes
  • check-for-missing-rows-in-completed-jobs - Every 10 minutes
  • replay-created-notifications - Every 15 minutes
Reference: config.py:283-333

Nightly Tasks

  • timeout-sending-notifications - 00:05 UTC
  • archive-unsubscribe-requests - 00:05 UTC
  • create-nightly-billing - 00:15 UTC
  • create-nightly-notification-status - 00:30 UTC
  • delete-notifications-older-than-retention - 03:00 UTC
  • delete-inbound-sms - 01:40 UTC
  • save-daily-notification-processing-time - 02:00 UTC
  • remove_sms_email_jobs - 04:00 UTC
  • remove_letter_jobs - 04:20 UTC
Reference: config.py:340-400
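Entries like these are expressed as Celery beat schedule items using crontab. A hedged sketch of what one nightly entry might look like (the dotted layout, task name reuse, and queue choice are illustrative, not the exact contents of config.py):

```python
# Illustrative beat schedule entry; the queue and exact structure are
# assumptions, not copied from config.py.
from celery.schedules import crontab

CELERYBEAT_SCHEDULE = {
    "delete-notifications-older-than-retention": {
        "task": "delete-notifications-older-than-retention",
        "schedule": crontab(hour=3, minute=0),  # 03:00 UTC daily
        "options": {"queue": "periodic-tasks"},
    },
}
```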

Letter Processing Tasks

  • check-if-letters-still-in-created - Weekdays at 07:00 UTC
  • check-if-letters-still-pending-virus-check - Every 10 minutes + nightly
  • check-time-to-collate-letters - 16:50 and 17:50 UTC
  • raise-alert-if-letter-notifications-still-sending - 19:00 UTC
Reference: config.py:401-439

Weekly Tasks

  • check-for-low-available-inbound-sms-numbers - Monday 09:00 UTC
  • weekly-dwp-report - Monday 09:00 UTC
  • weekly-user-research-email - Wednesday 10:00 UTC
Reference: config.py:450-464

Monthly Tasks

  • change-dvla-api-key - First Tuesday, 09:00 UTC
  • change-dvla-password - Wednesday after first Tuesday, 09:00 UTC
  • run-populate-annual-billing - April 1st, 02:01 UTC
Reference: config.py:334-338, 466-476

Task Modules

Tasks are organized in separate modules:
CELERY["imports"] = [
    "app.celery.tasks",              # General tasks
    "app.celery.scheduled_tasks",    # Periodic tasks
    "app.celery.reporting_tasks",    # Reporting tasks
    "app.celery.nightly_tasks",      # Nightly maintenance
]
Additional task modules:
  • app.celery.letters_pdf_tasks - Letter PDF generation
  • app.celery.process_letter_client_response_tasks - Letter delivery updates
  • app.celery.process_ses_receipts_tasks - Email delivery receipts
  • app.celery.process_sms_client_response_tasks - SMS delivery receipts
  • app.celery.provider_tasks - Provider communication
  • app.celery.research_mode_tasks - Research/test mode
  • app.celery.service_callback_tasks - Service webhooks
Reference: config.py:274-279

Monitoring and Metrics

Prometheus Metrics

Celery workers export Prometheus metrics:
# Set multiprocess directory
export PROMETHEUS_MULTIPROC_DIR="/tmp"
Reference: entrypoint.sh:3, run_celery.py:12

StatsD Metrics

Celery tasks emit StatsD metrics.

Callback Metrics:
  • callback.{provider}.{status} - Delivery receipt callbacks
  • callback.ses.{status} - Email delivery status
  • international-sms.{status}.{prefix} - International SMS by country
Reference: app/celery/process_sms_client_response_tasks.py:95-120

Timing Metrics:
  • callback-to-notification-created - Time from notification creation to callback

Queue Monitoring

Monitor SQS queue metrics:
  • ApproximateNumberOfMessages - Messages waiting in queue
  • ApproximateNumberOfMessagesNotVisible - Messages being processed
  • ApproximateAgeOfOldestMessage - Queue backlog age
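SQS returns these attributes as strings (for example via boto3's get_queue_attributes). A small helper for turning such a response into numbers; the sample response below is fabricated for illustration:

```python
# Parse queue-depth attributes from an SQS GetQueueAttributes-style
# response; SQS returns every attribute value as a string.
def queue_backlog(response):
    attrs = response["Attributes"]
    return {
        "visible": int(attrs["ApproximateNumberOfMessages"]),
        "in_flight": int(attrs["ApproximateNumberOfMessagesNotVisible"]),
        "oldest_age_seconds": int(attrs["ApproximateAgeOfOldestMessage"]),
    }

# Fabricated sample response for illustration:
sample = {
    "Attributes": {
        "ApproximateNumberOfMessages": "120",
        "ApproximateNumberOfMessagesNotVisible": "8",
        "ApproximateAgeOfOldestMessage": "45",
    }
}
backlog = queue_backlog(sample)
```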

Worker Scaling

Horizontal Scaling

Scale workers by running multiple instances:
# Scale sender workers
for i in {1..5}; do
  ./entrypoint.sh api-worker-sender &
done
Each worker instance:
  • Polls the same SQS queue
  • Processes tasks independently
  • Uses SQS visibility timeout to prevent duplicate processing

Vertical Scaling

Increase concurrency within workers:
export CONCURRENCY=16
./entrypoint.sh api-worker-sender
Considerations:
  • Each concurrent task uses a database connection
  • Monitor connection pool usage
  • CPU-bound tasks benefit from more workers
  • I/O-bound tasks benefit from higher concurrency
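The first two considerations reduce to simple arithmetic worth running before raising CONCURRENCY: in the worst case every concurrent task holds a database connection, so the fleet-wide demand is instances times concurrency. The limits below are hypothetical, not Notify's actual configuration:

```python
# Worst-case connection budget check; the database limit here is a
# hypothetical example, not Notify's real setting.
def peak_connections(worker_instances, concurrency):
    """Upper bound on simultaneous DB connections across the fleet."""
    return worker_instances * concurrency

db_max_connections = 100  # hypothetical Postgres connection limit
demand = peak_connections(worker_instances=5, concurrency=16)
headroom_ok = demand <= db_max_connections
```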

Auto-Scaling

Scale workers based on queue depth:
# Example ECS auto-scaling policy
ScaleUpPolicy:
  MetricName: ApproximateNumberOfMessagesVisible
  Threshold: 100
  ScalingAdjustment: 2

ScaleDownPolicy:
  MetricName: ApproximateNumberOfMessagesVisible
  Threshold: 10
  ScalingAdjustment: -1

Troubleshooting

Worker Not Consuming Tasks

Symptoms: Tasks are queued but not processed.
Solutions:
  1. Check worker is running and connected to SQS
  2. Verify NOTIFICATION_QUEUE_PREFIX matches queue names
  3. Check AWS credentials and permissions
  4. Verify queue exists in AWS console
  5. Check worker logs for connection errors

Task Timeouts

Symptoms: Tasks fail with timeout errors.
Solutions:
  1. Increase task timeout in task definition
  2. Optimize slow operations
  3. Break large tasks into smaller chunks
  4. Check database query performance

Memory Leaks

Symptoms: Worker memory usage grows over time.
Solutions:
  1. Enable max-tasks-per-child to restart workers periodically:
    celery worker --max-tasks-per-child=1000
    
  2. Profile tasks for memory leaks
  3. Check for unclosed database connections
  4. Monitor eventlet greenthread creation

Duplicate Task Execution

Symptoms: Tasks are executed multiple times.
Solutions:
  1. Verify only one Celery Beat instance is running
  2. Check SQS visibility timeout is appropriate
  3. Ensure tasks are idempotent
  4. Check for worker restarts during task execution
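On point 3 above: an idempotent task can safely run twice, for example after an SQS visibility-timeout redelivery. A minimal sketch of the pattern, using an in-memory set as a stand-in for what would really be a database lookup:

```python
# Minimal idempotency pattern: record handled IDs and skip repeats.
# In production the check would be a database lookup, not a set.
processed = set()
sent = []

def deliver_once(notification_id):
    if notification_id in processed:
        return "skipped"          # redelivered message: no double send
    processed.add(notification_id)
    sent.append(notification_id)  # stand-in for the actual provider call
    return "sent"

first = deliver_once("abc-123")
second = deliver_once("abc-123")  # duplicate delivery is harmless
```

Making the delivery conditional on recorded state is what allows at-least-once brokers like SQS to be used safely.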

Connection Pool Exhaustion

Symptoms: QueuePool limit exceeded errors.
Solutions:
  1. Reduce worker concurrency
  2. Increase SQLALCHEMY_POOL_SIZE
  3. Check for connection leaks in tasks
  4. Monitor db_connection_total_checked_out metric

PyCURL Issues (Mac M1)

Symptoms: Import errors or version mismatches.
Solutions:
  1. Follow installation instructions: GitHub Issue #1216
  2. Use Docker for local development:
    make run-celery-with-docker
    
Reference: README.md:15-16, run_celery.py:13-22

Best Practices

  1. Use specialized workers - Separate critical tasks from batch processing
  2. Monitor queue depths - Alert on growing queues
  3. Make tasks idempotent - Tasks should be safe to retry
  4. Set appropriate timeouts - Prevent tasks from running indefinitely
  5. Log task failures - Include context for debugging
  6. Test task retry logic - Ensure failures are handled gracefully
  7. Use database transactions - Ensure consistency in task operations
  8. Limit task size - Break large operations into smaller tasks
  9. Monitor worker health - Alert on worker crashes
  10. Regular worker restarts - Use max-tasks-per-child to prevent memory leaks
