GOV.UK Notify uses Celery with Amazon SQS as the message broker for asynchronous task processing. The task architecture handles notification delivery, job processing, scheduled tasks, and callbacks.

Queue Architecture

Location: app/config.py:12

Queue Definitions

class QueueNames:
    PERIODIC = "periodic-tasks"              # Scheduled/cron tasks
    DATABASE = "database-tasks"              # DB writes from jobs
    SEND_SMS = "send-sms-tasks"              # SMS delivery
    SEND_EMAIL = "send-email-tasks"          # Email delivery
    SEND_LETTER = "send-letter-tasks"        # Letter delivery
    RESEARCH_MODE = "research-mode-tasks"    # Test mode sends
    REPORTING = "reporting-tasks"            # Analytics/billing
    JOBS = "job-tasks"                       # Job processing
    RETRY = "retry-tasks"                    # Failed task retries
    NOTIFY = "notify-internal-tasks"         # Internal operations
    CREATE_LETTERS_PDF = "create-letters-pdf-tasks"
    CALLBACKS = "service-callbacks"          # Service callbacks
    CALLBACKS_RETRY = "service-callbacks-retry"
    LETTERS = "letter-tasks"                 # Letter operations
    SES_CALLBACKS = "ses-callbacks"          # Email provider callbacks
    SMS_CALLBACKS = "sms-callbacks"          # SMS provider callbacks
    LETTER_CALLBACKS = "letter-callbacks"    # Letter provider callbacks
    ANTIVIRUS = "antivirus-tasks"            # External AV service
    SANITISE_LETTERS = "sanitise-letter-tasks"  # External sanitisation

Queue Configuration

CELERY = {
    "broker_url": "https://sqs.eu-west-1.amazonaws.com",
    "broker_transport": "sqs",
    "task_ignore_result": True,
    "broker_transport_options": {
        "region": "eu-west-1",
        "queue_name_prefix": NOTIFICATION_QUEUE_PREFIX,
        "predefined_queues": QueueNames.predefined_queues(),
    },
}

Core Tasks

Job Processing Tasks

Location: app/celery/tasks.py

process-job

@notify_celery.task(name="process-job")
def process_job(job_id, sender_id=None, shatter_batch_size=32):
    """
    Main task to process a bulk notification job.
    
    Flow:
    1. Fetch job and validate status
    2. Check service is active
    3. Verify sending limits not exceeded
    4. Read CSV from S3
    5. Batch rows and create shatter-job-rows tasks
    6. Mark job as finished
    """

Process:
  1. Updates job status to in progress
  2. Validates service is active
  3. Checks daily sending limits
  4. Reads recipient CSV from S3
  5. Batches rows (default 32) and creates subtasks
  6. Each batch sent to shatter-job-rows task
  7. Marks job as finished
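The batching in steps 5 and 6 can be sketched with the standard library. This is a minimal illustration, not Notify's implementation. `batch_rows` is a hypothetical helper, and the rows are plain dicts standing in for parsed CSV rows.

```python
from itertools import islice
from typing import Iterable, Iterator, List, TypeVar

T = TypeVar("T")


def batch_rows(rows: Iterable[T], batch_size: int = 32) -> Iterator[List[T]]:
    """Yield consecutive lists of at most batch_size rows (hypothetical helper)."""
    if batch_size < 1:
        raise ValueError("batch_size must be at least 1")
    it = iter(rows)
    while True:
        batch = list(islice(it, batch_size))
        if not batch:
            return
        yield batch


# Each batch would become the payload of one shatter-job-rows task
rows = [{"row_number": i} for i in range(70)]
batches = list(batch_rows(rows))
print([len(b) for b in batches])  # [32, 32, 6]
```

With the default size of 32, a 70-row CSV produces three queue messages instead of 70.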

shatter-job-rows

from collections.abc import Sequence

@notify_celery.task(name="shatter-job-rows")
def shatter_job_rows(template_type: str, args_kwargs_seq: Sequence):
    """
    Takes a batch of job rows and creates individual notification tasks.
    
    Uses subdivision if SQS message size limit exceeded.
    """
    for task_args_kwargs in args_kwargs_seq:
        process_job_row(template_type, task_args_kwargs)

Features:
  • Handles SQS message size limits via recursive subdivision
  • Routes to appropriate save task based on notification type
  • Queue: JOBS
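Recursive subdivision can be sketched as follows. This is a hypothetical illustration. `enqueue` stands in for publishing a shatter-job-rows message, `MessageTooLarge` stands in for the broker rejecting an oversized message, and the 500-byte limit is an arbitrary demo value, not the real SQS quota. JSON length is used as a rough proxy for the serialized message size.

```python
import json
from typing import Callable, List, Sequence


class MessageTooLarge(Exception):
    """Hypothetical stand-in for the broker rejecting an oversized message."""


class UnprocessableRow(Exception):
    """Raised when a single row is still too large to send on its own."""


def make_enqueue(max_bytes: int, sent: List[list]) -> Callable[[list], None]:
    """Return a fake enqueue function that rejects payloads over max_bytes."""
    def enqueue(batch: list) -> None:
        if len(json.dumps(batch).encode("utf-8")) > max_bytes:
            raise MessageTooLarge(f"{len(batch)} rows exceed {max_bytes} bytes")
        sent.append(batch)
    return enqueue


def enqueue_with_subdivision(batch: Sequence, enqueue: Callable[[list], None]) -> None:
    """Try to enqueue the whole batch; on a size error, split it in half and recurse."""
    try:
        enqueue(list(batch))
    except MessageTooLarge as e:
        if len(batch) == 1:
            # A single row cannot be split any further
            raise UnprocessableRow(batch[0]) from e
        middle = (len(batch) + 1) // 2
        enqueue_with_subdivision(batch[:middle], enqueue)
        enqueue_with_subdivision(batch[middle:], enqueue)


sent: List[list] = []
rows = [{"row_number": i, "name": "x" * 40} for i in range(32)]
enqueue_with_subdivision(rows, make_enqueue(max_bytes=500, sent=sent))
print([len(b) for b in sent])  # [4, 4, 4, 4, 4, 4, 4, 4]
```

Halving keeps the number of failed publish attempts logarithmic in the batch size while preserving row order.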

Notification Saving Tasks

Location: app/celery/tasks.py:275

save-sms

@notify_celery.task(
    bind=True, 
    name="save-sms", 
    max_retries=5, 
    default_retry_delay=300
)
def save_sms(self, service_id, notification_id, encoded_notification, sender_id=None):
    """
    Persists SMS notification to database and queues for delivery.
    """

Flow:
  1. Decode signed notification data
  2. Load service and template
  3. Validate recipient format
  4. Persist notification to database
  5. Queue deliver_sms task to SEND_SMS

Error Handling:
  • Invalid phone numbers marked as validation-failed
  • SQLAlchemy errors trigger a retry after a fixed delay (default_retry_delay), not exponential backoff
  • Max retries: 5, delay: 300s

save-email

@notify_celery.task(
    bind=True,
    name="save-email",
    max_retries=5,
    default_retry_delay=300
)
def save_email(self, service_id, notification_id, encoded_notification, sender_id=None):
    """
    Persists email notification and queues for delivery.
    """

Special Handling:
  • Adds email file attachment links to personalisation
  • Handles reply-to address configuration
  • Unsubscribe link generation if enabled

save-letter

@notify_celery.task(
    bind=True,
    name="save-letter",
    max_retries=5,
    default_retry_delay=300
)
def save_letter(self, service_id, notification_id, encoded_notification):
    """
    Persists letter notification and triggers PDF generation.
    """

Flow:
  1. Parse postal address from personalisation
  2. Determine postage (first/second class, international)
  3. Persist notification
  4. Queue get_pdf_for_templated_letter to CREATE_LETTERS_PDF

Provider Delivery Tasks

Location: app/celery/provider_tasks.py

deliver_sms

@notify_celery.task(
    bind=True,
    name="deliver_sms",
    max_retries=48,
    default_retry_delay=300
)
def deliver_sms(self, notification_id):
    """
    Sends SMS to provider (MMG or Firetext).
    
    Retries: up to 48 times (roughly 4 hours at a 300s delay)
    """

Error Handling:
  • SmsClientResponseException → retry
  • Max retries exceeded → mark as technical-failure
  • First retry: immediate (countdown=0)
  • Subsequent retries: 300s delay
  • Queue: RETRY for all retries
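The retry window implied by these settings is simple arithmetic. This is a minimal sketch. It assumes every retry after the first waits exactly `default_retry_delay` and ignores task run time and queue latency. `retry_window_seconds` is a hypothetical helper, not part of Notify.

```python
from datetime import timedelta


def retry_window_seconds(max_retries: int, delay: int, first_immediate: bool = True) -> int:
    """Total waiting time between attempts if every retry is used."""
    if max_retries <= 0:
        return 0
    # With an immediate first retry, only the remaining retries wait `delay` seconds
    waited_retries = max_retries - 1 if first_immediate else max_retries
    return waited_retries * delay


# deliver_sms / deliver_email: 48 retries, first immediate, then 300s apart
print(timedelta(seconds=retry_window_seconds(48, 300)))  # 3:55:00
# save tasks: 5 retries, assuming a fixed 300s delay for every retry
print(timedelta(seconds=retry_window_seconds(5, 300, first_immediate=False)))  # 0:25:00
```

So the delivery tasks ride out a provider outage of roughly four hours before marking a notification as technical-failure.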

deliver_email

@notify_celery.task(
    bind=True,
    name="deliver_email",
    max_retries=48,
    default_retry_delay=300
)
def deliver_email(self, notification_id):
    """
    Sends email via AWS SES.
    """

Special Cases:
  • EmailClientNonRetryableException → immediate technical-failure (no retry)
  • AwsSesClientThrottlingSendRateException → retry with warning
  • Generic exceptions → retry

deliver_letter

@notify_celery.task(
    bind=True,
    name="deliver_letter",
    max_retries=55
)
def deliver_letter(self, notification_id):
    """
    Sends letter to DVLA print provider.
    """

Process:
  1. Validate that the notification status is created
  2. Fetch PDF from S3
  3. Send to DVLA API with callback URL
  4. Update status to sending

Error Types:
  • DvlaThrottlingException → retry with warning
  • DvlaRetryableException → retry
  • DvlaDuplicatePrintRequestException → mark sending (idempotent)
  • Other exceptions → technical-failure
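The letter error handling maps exception types to outcomes. This is a minimal sketch. The exception classes below are local stand-ins named after the ones listed above, not imports from Notify, and their real class hierarchy is not assumed. `outcome_for` is a hypothetical helper that returns the action instead of performing it.

```python
class DvlaRetryableException(Exception):
    """Stand-in: transient DVLA API error worth retrying."""


class DvlaThrottlingException(Exception):
    """Stand-in: DVLA rate limit hit."""


class DvlaDuplicatePrintRequestException(Exception):
    """Stand-in: DVLA already holds a print request for this notification."""


def outcome_for(exc: Exception) -> str:
    """Map an exception raised while sending a letter to the action taken."""
    if isinstance(exc, DvlaDuplicatePrintRequestException):
        # DVLA already accepted this letter, so a repeat send is treated as success
        return "sending"
    if isinstance(exc, DvlaThrottlingException):
        return "retry-with-warning"
    if isinstance(exc, DvlaRetryableException):
        return "retry"
    # Anything unexpected is not retried
    return "technical-failure"


print(outcome_for(DvlaThrottlingException()))  # retry-with-warning
```

Treating a duplicate print request as `sending` makes the task safe to re-run after a retry whose first attempt actually reached DVLA.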

Scheduled Tasks

Location: app/config.py:282 (beat_schedule)

High-Frequency Tasks (Every Minute)

"generate-sms-delivery-stats": {
    "task": "generate-sms-delivery-stats",
    "schedule": crontab(),  # Every minute
},
"switch-current-sms-provider-on-slow-delivery": {
    "task": "switch-current-sms-provider-on-slow-delivery",
    "schedule": crontab(),
},
"check-job-status": {
    "task": "check-job-status",
    "schedule": crontab(),
},

Periodic Tasks

"run-scheduled-jobs": {
    "task": "run-scheduled-jobs",
    "schedule": crontab(minute="0,15,30,45"),  # Every 15 mins
},
"tend-providers-back-to-middle": {
    "task": "tend-providers-back-to-middle",
    "schedule": crontab(minute="*/5"),  # Every 5 mins
},
"replay-created-notifications": {
    "task": "replay-created-notifications",
    "schedule": crontab(minute="0,15,30,45"),
},

Nightly Tasks

"timeout-sending-notifications": {
    "task": "timeout-sending-notifications",
    "schedule": crontab(hour=0, minute=5),
},
"create-nightly-billing": {
    "task": "create-nightly-billing",
    "schedule": crontab(hour=0, minute=15),
},
"create-nightly-notification-status": {
    "task": "create-nightly-notification-status",
    "schedule": crontab(hour=0, minute=30),
},
"delete-notifications-older-than-retention": {
    "task": "delete-notifications-older-than-retention",
    "schedule": crontab(hour=3, minute=0),
},

Letter Collation

"check-time-to-collate-letters": {
    "task": "check-time-to-collate-letters",
    "schedule": crontab(hour="16,17", minute=50),  # 4:50pm and 5:50pm UTC
    # One of the two runs is 17:50 UK local time (BST in summer, GMT in winter);
    # the task proceeds only when UK local time is 17:50
},
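The local-time check can be sketched with the standard library's `zoneinfo` (Python 3.9+; it needs the system time zone database or the `tzdata` package). `is_collation_time` is a hypothetical name, and the real task may implement the check differently.

```python
from datetime import datetime, timezone
from zoneinfo import ZoneInfo

LONDON = ZoneInfo("Europe/London")


def is_collation_time(now_utc: datetime) -> bool:
    """True when the UTC run time is 17:50 in UK local time (BST or GMT)."""
    local = now_utc.astimezone(LONDON)
    return (local.hour, local.minute) == (17, 50)


summer = datetime(2024, 7, 1, 16, 50, tzinfo=timezone.utc)   # 17:50 BST
winter = datetime(2024, 1, 15, 17, 50, tzinfo=timezone.utc)  # 17:50 GMT
print(is_collation_time(summer), is_collation_time(winter))  # True True
print(is_collation_time(datetime(2024, 7, 1, 17, 50, tzinfo=timezone.utc)))  # False
```

Scheduling in UTC at both candidate hours and filtering in local time avoids changing the beat schedule at each clock change.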

Weekly/Monthly Tasks

"weekly-dwp-report": {
    "task": "weekly-dwp-report",
    "schedule": crontab(hour=9, minute=0, day_of_week="mon"),
},
"change-dvla-api-key": {
    "task": "change-dvla-api-key",
    "schedule": crontab(hour=9, minute=0, day_of_week="tue", day_of_month="1-7"),
    # First Tuesday of every month
},
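This assumes that, as in Celery's crontab, a date must satisfy both `day_of_week="tue"` and `day_of_month="1-7"`. On that basis the schedule selects exactly the first Tuesday, because any run of seven consecutive days contains exactly one Tuesday. A quick check with the standard library follows. `matches_schedule` and `first_tuesday` are hypothetical helpers, not part of Notify.

```python
import calendar
from datetime import date


def matches_schedule(d: date) -> bool:
    """Both crontab fields must match: a Tuesday within days 1-7."""
    return d.weekday() == calendar.TUESDAY and 1 <= d.day <= 7


def first_tuesday(year: int, month: int) -> date:
    """Return the single date in the month that satisfies the schedule."""
    days_in_month = calendar.monthrange(year, month)[1]
    matches = [
        date(year, month, day)
        for day in range(1, days_in_month + 1)
        if matches_schedule(date(year, month, day))
    ]
    if len(matches) != 1:
        raise RuntimeError(f"expected one match, got {matches}")
    return matches[0]


print(first_tuesday(2024, 10))  # 2024-10-01
```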

Task Design Patterns

Retry Strategy

@notify_celery.task(bind=True, max_retries=48, default_retry_delay=300)
def my_task(self, notification_id):
    try:
        do_work(notification_id)
    except Exception:
        try:
            if self.request.retries == 0:
                self.retry(queue=QueueNames.RETRY, countdown=0)  # Immediate
            else:
                self.retry(queue=QueueNames.RETRY)  # 300s delay
        except self.MaxRetriesExceededError:
            # Retries exhausted: mark the notification as technical-failure and log
            raise

Error Handling Hierarchy

  1. Non-retryable → Immediate technical-failure
  2. Retryable → Retry at a fixed delay (delivery tasks retry once immediately, then every 300s)
  3. Max retries → Technical-failure with logging

Idempotency

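Tasks guard against duplicate processing. If the notification is already in the database, an earlier attempt has already saved it and the task can skip the work:
if get_notification_by_id(notification_id):
    # Already persisted by an earlier attempt; safe to skip
    return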
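The guard can be illustrated with an in-memory dict in place of the notifications table. `save_notification_once` and `store` are hypothetical; in Notify the check is a database lookup via `get_notification_by_id`.

```python
from typing import Dict


def save_notification_once(store: Dict[str, dict], notification_id: str, data: dict) -> bool:
    """Persist data unless the notification already exists.

    Returns True if it was written now, False if an earlier attempt already saved it.
    """
    if notification_id in store:
        # A redelivered or retried task: the work is already done
        return False
    store[notification_id] = data
    return True


store: Dict[str, dict] = {}
print(save_notification_once(store, "abc", {"to": "07700900000"}))  # True
print(save_notification_once(store, "abc", {"to": "07700900000"}))  # False
```

Because SQS delivers at least once, the same task can run twice. Keying the check on the notification ID makes the second run a no-op.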

Task Signing

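Notification data (template, recipient, personalisation) is encoded with the signing module before it is queued, so the receiving task can verify it has not been altered: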
encoded = signing.encode({
    "template": str(template_id),
    "to": recipient,
    "personalisation": {...}
})
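Here is a stdlib-only HMAC sketch of what signing provides. It is not Notify's signing module, which may use a different library and token format. `SECRET`, `encode` and `decode` are hypothetical. The payload stays readable to anyone holding the message, but it cannot be changed without the signature check failing.

```python
import base64
import hashlib
import hmac
import json

SECRET = b"hypothetical-secret-key"  # would come from configuration in practice


def _sign(body: bytes) -> str:
    digest = hmac.new(SECRET, body, hashlib.sha256).digest()
    return base64.urlsafe_b64encode(digest).decode("ascii")


def encode(payload: dict) -> str:
    """Serialize payload as base64 JSON and append an HMAC-SHA256 signature."""
    body = base64.urlsafe_b64encode(json.dumps(payload, sort_keys=True).encode("utf-8"))
    return body.decode("ascii") + "." + _sign(body)


def decode(token: str) -> dict:
    """Verify the signature and return the payload, or raise ValueError."""
    body_str, _, signature = token.rpartition(".")
    body = body_str.encode("ascii")
    # Constant-time comparison avoids leaking how many characters matched
    if not hmac.compare_digest(signature, _sign(body)):
        raise ValueError("bad signature")
    return json.loads(base64.urlsafe_b64decode(body))


token = encode({"template": "template-id", "to": "07700900000", "personalisation": {"name": "Ada"}})
print(decode(token)["to"])  # 07700900000
```

Signing protects integrity, not confidentiality. Keeping queued personal data private would need encryption in addition.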

Performance Optimizations

Batching

  • Job rows batched (default 32) before task creation
  • Reduces SQS API calls
  • Subdivision if message too large

Queue Separation

  • Delivery tasks separated by channel (SMS/Email/Letter)
  • Prevents head-of-line blocking
  • Independent scaling per queue
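Channel-based routing can be sketched as a lookup table. This is an illustration only. The queue names are copied from `QueueNames` above, but `delivery_queue_for` and its `research_mode` flag are assumptions about how a sender might choose between them.

```python
from enum import Enum


class Queue(str, Enum):
    """Subset of the QueueNames values above."""
    SEND_SMS = "send-sms-tasks"
    SEND_EMAIL = "send-email-tasks"
    SEND_LETTER = "send-letter-tasks"
    RESEARCH_MODE = "research-mode-tasks"


DELIVERY_QUEUES = {
    "sms": Queue.SEND_SMS,
    "email": Queue.SEND_EMAIL,
    "letter": Queue.SEND_LETTER,
}


def delivery_queue_for(notification_type: str, research_mode: bool = False) -> str:
    """Pick the queue for a delivery task; test-mode sends get their own queue."""
    if research_mode:
        return Queue.RESEARCH_MODE.value
    try:
        return DELIVERY_QUEUES[notification_type].value
    except KeyError:
        raise ValueError(f"unknown notification type: {notification_type!r}") from None


print(delivery_queue_for("sms"))  # send-sms-tasks
```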

Prefetch Multiplier

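prefetch = os.getenv("CELERYD_PREFETCH_MULTIPLIER")
if prefetch:
    CELERY["worker_prefetch_multiplier"] = int(prefetch)  # env vars are strings

Setting the multiplier to 1 makes each worker process reserve only one task at a time. For long-running tasks, this stops extra messages from sitting unprocessed until their SQS visibility timeout expires and they are redelivered.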

Monitoring

Tasks emit StatsD metrics:
self.statsd_client.incr(f"clients.{provider}.success")
self.statsd_client.timing(f"clients.{provider}.request-time", elapsed)
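
Logging with structured context:
extra = {"notification_id": notification_id}
current_app.logger.info(
    "SMS notification %(notification_id)s created",
    extra,        # fills the %(notification_id)s placeholder
    extra=extra,  # attaches notification_id to the log record
)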
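With the standard `logging` module, `extra` only adds attributes to the log record. Placeholders such as `%(notification_id)s` in the message are filled from the positional arguments, so passing a mapping there is what formats the text. Here is a self-contained check. `ListHandler` and the logger name are hypothetical.

```python
import logging


class ListHandler(logging.Handler):
    """Collect emitted records so they can be inspected."""

    def __init__(self) -> None:
        super().__init__()
        self.records: list = []

    def emit(self, record: logging.LogRecord) -> None:
        self.records.append(record)


logger = logging.getLogger("notify-example")
logger.setLevel(logging.INFO)
logger.propagate = False
handler = ListHandler()
logger.addHandler(handler)

extra = {"notification_id": "abc-123"}
logger.info("SMS notification %(notification_id)s created", extra=extra)
logger.info("SMS notification %(notification_id)s created", extra, extra=extra)

without_args, with_args = handler.records
print(without_args.getMessage())  # SMS notification %(notification_id)s created
print(with_args.getMessage())     # SMS notification abc-123 created
print(with_args.notification_id)  # abc-123
```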

Related Files

  • app/celery/tasks.py - Job and notification tasks
  • app/celery/provider_tasks.py - Provider delivery tasks
  • app/celery/scheduled_tasks.py - Scheduled/periodic tasks
  • app/celery/nightly_tasks.py - Nightly maintenance tasks
  • app/celery/reporting_tasks.py - Analytics tasks
  • app/config.py - Queue and schedule configuration
