
Overview

Celery provides distributed task queue functionality for handling asynchronous operations like order processing, product synchronization, notifications, and data indexing. The system uses Redis as both the message broker and result backend.

Configuration

Celery is configured with Redis as the broker and backend:
import celery

import settings

# Main Celery application: Redis DB 0 is both the broker and the result backend
app = celery.Celery(
    'tasks',
    include=['task_helper', 'task_helpers', 'split_helper'],
    broker=f'redis://{settings.REDIS_HOST}:{settings.REDIS_PORT}/0',
    backend=f'redis://{settings.REDIS_HOST}:{settings.REDIS_PORT}/0'
)

# Task result expiration in seconds (result_expires is the lowercase name
# of the legacy CELERY_TASK_RESULT_EXPIRES setting)
app.conf.result_expires = CELERY_TASK_RESULT_EXPIRES_IN_SEC

# Visibility timeout for long-running tasks: a message that is not
# acknowledged within this many seconds is redelivered to another worker
app.conf.broker_transport_options = {
    'visibility_timeout': settings.CELERY_VISIBILITY_TIMEOUT
}
See libs/celery.py for the complete configuration.

Task Queues

The system uses multiple specialized queues for different task types:
  • SYNC_QUEUE (string): Product and catalog synchronization tasks
  • CELERY_CANCEL_ORDER_QUEUE (string): Order cancellation and refund processing
  • CELERY_ORDER_REFUND_QUEUE (string): Refund processing and bank transfer tasks
  • CALLBACK_QUEUE (string): External API callbacks and webhook processing
  • FB_EVENTS_QUEUE (string): Facebook pixel and marketing event tracking
  • ORDER_FLAG_QUEUE (string): Order flagging and customer support alerts
  • SEARCH_SYNC_QUEUE (string): Search index synchronization (Elasticsearch/Dgraph)
  • HL_AVAILABILITY_LOG_QUEUE (string): Hyperlocal availability logging
  • VISITED_PRODUCT_LOGGING_QUEUE (string): User product visit tracking
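Instead of repeating `queue=` on every call, tasks can be mapped to these queues centrally. A minimal sketch of a Celery router function, assuming each name above holds a plain queue-name string; the queue values and the task-to-queue mapping below are hypothetical:

```python
# Hypothetical queue-name values; in the project these come from settings
SYNC_QUEUE = "sync_queue"
SEARCH_SYNC_QUEUE = "search_sync_queue"
CELERY_ORDER_REFUND_QUEUE = "order_refund_queue"

# Hypothetical mapping from fully qualified task name to queue
TASK_QUEUE_MAP = {
    "tasks.syncDgraphProduct": SYNC_QUEUE,
    "tasks.syncDgraphCategory": SYNC_QUEUE,
    "tasks.initiate_juspay_refund": CELERY_ORDER_REFUND_QUEUE,
    "tasks.initiate_cashgram_refund": CELERY_ORDER_REFUND_QUEUE,
}


def route_task(name, args, kwargs, options, task=None, **kw):
    """Celery router: return routing options for a task, or None to fall through."""
    queue = TASK_QUEUE_MAP.get(name)
    if queue is not None:
        return {"queue": queue}
    return None  # next router, or the default queue, decides
```

It would be registered with `app.conf.task_routes = (route_task,)`; Celery consults the router whenever a task is sent, and a worker started with `-Q sync_queue` then consumes only that queue.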

Common Tasks

Product Synchronization

Sync Product to Dgraph

from tasks import syncDgraphProduct

# Sync products to graph database
syncDgraphProduct.delay([12345, 67890])
This task:
  1. Fetches product data from MySQL
  2. Builds category, tag, and variant relationships
  3. Creates pricelist associations
  4. Inserts/updates nodes in Dgraph
Implementation: See tasks.py:111

Sync Category to Dgraph

from tasks import syncDgraphCategory

# Sync categories and related products
syncDgraphCategory.delay([10, 20, 30])
Implementation: See tasks.py:89

Kinesis Event Streaming

Sync Products to Kinesis

from tasks import sync_kinesis_products, insert_into_kinesis

# Sync product to Kinesis stream
result = sync_kinesis_products.delay([12345])
data, stream_name, partition_key = result.get()  # blocks the caller; never call .get() inside a task

# Insert into Kinesis with rate limiting
insert_into_kinesis.delay([data, stream_name, partition_key])
Rate limiting: configured via the KINESIS_INSERT_RATE_LIMIT setting.
Implementation: See tasks.py:199 and tasks.py:234

Sync Orders to Kinesis

from tasks import sync_kinesis_orders

# Stream order data to Kinesis
sync_kinesis_orders.delay(order_id=54321)
Implementation: See tasks.py:247

Order Processing Tasks

Order Cancellation

from task_helper import process_order_cancellation

# Cancel order and process refunds
process_order_cancellation.apply_async(
    args=[order_id],
    queue='cancel_order_queue'
)
Cancellation tasks handle:
  • Order status updates
  • Inventory restoration
  • Payment refunds (bank transfer, Juspay, Cashgram)
  • Gift voucher reversals
  • TSS money/point refunds
  • Notification triggers

Refund Processing

from tasks import initiate_juspay_refund, initiate_cashgram_refund

# Process Juspay refund
initiate_juspay_refund.apply_async(
    args=[order_id, amount, refund_reason],
    queue='order_refund_queue'
)

# Process Cashgram refund
initiate_cashgram_refund.delay(order_id, amount)

Notification Tasks

Order Delivery Notifications

from controllers.OrderDeliverMailAndSmsController import send_delivered_mail_and_sms

# Send delivery confirmation
send_delivered_mail_and_sms.delay(order_id)

WhatsApp Notifications

from controllers.CommunicationController import whatsapp_msg_trigger

# Send WhatsApp message
whatsapp_msg_trigger.delay(
    phone_number="919876543210",
    template_name="order_confirmation",
    params={"order_id": "ORD-123", "amount": 2499}
)

User Attribute Sync

from libs.helper import sync_user_attributes_moengage

# Sync user data to MoEngage
sync_user_attributes_moengage.delay(user_id=12345)

Cache Invalidation

from utils.tasks_helper import invalidate_pdp_cache, invalidate_category_cache

# Invalidate product detail page cache
invalidate_pdp_cache.delay(product_id=12345)

# Invalidate category cache
invalidate_category_cache.delay(category_id=10)

Sitemap Generation

from sitemap import sitemap_main
from tasks import generate_sitemap

# Generate XML sitemaps
generate_sitemap.delay()

Task Monitoring

Flower Dashboard

Flower provides real-time monitoring of Celery workers and tasks:
# Start Flower
celery -A tasks flower --port=5555

# Then open the dashboard at http://localhost:5555
Flower shows:
  • Active/scheduled/completed tasks
  • Worker status and utilization
  • Task execution times
  • Failure rates and error messages

Task Status Checking

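from celery.result import AsyncResult

from libs.celery import app
from tasks import syncDgraphProduct

# delay() returns an AsyncResult; its id can be stored and looked up later
task = syncDgraphProduct.delay([12345])
result = AsyncResult(task.id, app=app)

print(f"Status: {result.state}")    # e.g. PENDING, SUCCESS, FAILURE, RETRY
print(f"Result: {result.result}")   # return value, or the exception if the task failed
print(f"Successful: {result.successful()}")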

Error Handling

Task Retries

from libs.celery import app

@app.task(bind=True, max_retries=3)
def sync_with_retry(self, product_id):
    try:
        # Perform sync operation
        sync_product_to_api(product_id)
    except Exception as exc:
        # Retry with exponential backoff; request.retries starts at 0.
        # Once max_retries is exceeded, retry() re-raises exc.
        raise self.retry(
            exc=exc,
            countdown=2 ** self.request.retries  # 1, 2, 4 seconds
        )

Error Logging

from libs.celery import app
from utils.logger import get_logger

logger = get_logger()

@app.task
def sync_task(data):
    try:
        process_data(data)
    except Exception as e:
        logger.error("Task failed: %s", e, exc_info=True)
        # Re-raise so Celery records the task as FAILURE
        raise

Task Scheduling

Periodic Tasks with Celery Beat

from celery.schedules import crontab

from libs.celery import app

app.conf.beat_schedule = {
    'sync-products-every-hour': {
        'task': 'tasks.sync_all_products',
        'schedule': crontab(minute=0),  # Every hour
    },
    'cleanup-expired-sessions': {
        'task': 'tasks.cleanup_sessions',
        'schedule': crontab(hour=2, minute=0),  # Daily at 2 AM
    },
    'generate-daily-sitemap': {
        'task': 'sitemap.sitemap_main',
        'schedule': crontab(hour=3, minute=0),  # Daily at 3 AM
    },
}
Start Celery Beat:
celery -A tasks beat --loglevel=info

Performance Optimization

Task Chunking

from celery import group

from tasks import syncDgraphProduct

# Process large product list in chunks
product_ids = list(range(1, 10001))  # 10,000 products
chunk_size = 100

job = group(
    syncDgraphProduct.s(product_ids[i:i+chunk_size])
    for i in range(0, len(product_ids), chunk_size)
)
result = job.apply_async()
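The slicing above needs the full ID list in memory. A small batching helper works with any iterable (for example IDs streamed from a database cursor). A minimal sketch; `chunked` is a hypothetical helper, not part of Celery:

```python
from itertools import islice


def chunked(iterable, size):
    """Yield successive lists of at most `size` items from any iterable."""
    if size < 1:
        raise ValueError("size must be >= 1")
    it = iter(iterable)
    while True:
        batch = list(islice(it, size))
        if not batch:
            return
        yield batch
```

Usage: `group(syncDgraphProduct.s(batch) for batch in chunked(product_ids, 100))`. Celery's own `task.chunks(it, n)` is different: it treats each item as one argument tuple and calls the task once per item, so it does not pass a list of IDs the way syncDgraphProduct expects.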

Task Priorities

# High priority task (with the Redis broker, 0 is the highest priority and 9 the lowest)
sync_critical_product.apply_async(
    args=[product_id],
    priority=0
)

# Low priority task
generate_sitemap.apply_async(
    priority=9
)

Rate Limiting

import settings
from libs.celery import app

# Limit task execution rate (enforced per worker instance, not cluster-wide)
@app.task(rate_limit='10/m')  # 10 tasks per minute
def call_external_api(data):
    # API call with rate limiting
    pass

# Per-task rate limit from settings
@app.task(rate_limit=settings.KINESIS_INSERT_RATE_LIMIT)
def insert_into_kinesis(data):
    pass
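Celery rate limits are strings such as '10/m' (per second, minute, or hour; a bare number means tasks per second), and each worker instance enforces them separately. A small sketch that converts a limit to tasks per second, useful for estimating a cluster-wide ceiling; `tasks_per_second` is a hypothetical helper:

```python
# Seconds per unit accepted in Celery rate_limit strings
_UNITS = {"s": 1, "m": 60, "h": 3600}


def tasks_per_second(rate_limit):
    """Convert '10/m', '2/s', '100/h' or a bare number to tasks per second."""
    if rate_limit is None:
        return None  # no limit configured
    if isinstance(rate_limit, (int, float)):
        return float(rate_limit)
    count, _, unit = str(rate_limit).partition("/")
    return float(count) / _UNITS[unit or "s"]
```

For example, with '10/m' and three worker instances consuming the queue, roughly 30 tasks per minute can run in total, so an external API quota should be divided by the number of workers.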

Abandon Cart Processing

A separate Celery instance handles abandon cart operations:
import celery

# Separate Redis broker and result backend for abandon cart
abandon_cart_app = celery.Celery(
    'fast_queue',
    broker=f"redis://{ABANDON_CART_REDIS_HOST}:{ABANDON_CART_REDIS_PORT}/{ABANDON_CART_REDIS_DB}",
    backend=f"redis://{ABANDON_CART_REDIS_HOST}:{ABANDON_CART_REDIS_PORT}/{ABANDON_CART_REDIS_DB}"
)
See libs/celery.py:47 for configuration.

Deployment

Docker Configuration

The project includes multiple Dockerfile configurations for Celery:
  • Dockerfile-celery - Standard Celery worker
  • Dockerfile-celery-AbandonCart - Abandon cart worker
  • Dockerfile-flower - Flower monitoring
  • Dockerfile-celery_python310 - Python 3.10 worker

Starting Workers

# Start default worker
celery -A tasks worker --loglevel=info

# Start worker for specific queue
celery -A tasks worker --loglevel=info -Q sync_queue

# Start multiple workers
celery multi start worker1 worker2 -A tasks --loglevel=info

# Start worker with concurrency
celery -A tasks worker --concurrency=4 --loglevel=info

Best Practices

Task Design Principles:
  1. Idempotency - Tasks should produce the same result when executed multiple times
  2. Small Tasks - Break large operations into smaller, manageable tasks
  3. Proper Timeouts - Set appropriate task timeouts to prevent hung tasks
  4. Error Handling - Always handle exceptions and log errors
  5. Result Expiration - Set appropriate result expiration times
  6. Queue Separation - Use separate queues for different task types
  7. Monitoring - Always monitor task execution and failures
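Idempotency matters here because a message can be delivered more than once (for example after a visibility-timeout redelivery or a retry). A minimal sketch of a deduplicating refund, with an in-memory registry standing in for a shared store; all names are hypothetical:

```python
class ProcessedRegistry:
    """In-memory stand-in for a shared store (e.g. a Redis key or a DB unique key)."""

    def __init__(self):
        self._done = set()

    def claim(self, key):
        """Return True the first time `key` is seen, False afterwards."""
        if key in self._done:
            return False
        self._done.add(key)
        return True


def refund_order(order_id, amount, registry, ledger):
    """Idempotent refund: a redelivered or retried message does not refund twice."""
    key = f"refund:{order_id}"
    if not registry.claim(key):
        return "skipped"
    ledger.append((order_id, amount))  # stands in for the real refund call
    return "refunded"
```

Across workers, `claim` must be atomic, e.g. Redis `SET key value NX EX <ttl>` (redis-py: `r.set(key, 1, nx=True, ex=ttl)`). Claiming before the side effect means a crash in between skips the refund on redelivery; for money movements, a unique constraint written in the same transaction as the refund record is the safer variant.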
Common Pitfalls to Avoid:
  • Don’t pass large objects as task arguments (use IDs instead)
  • Don’t create database sessions inside tasks without proper cleanup
  • Don’t ignore task timeouts for long-running operations
  • Don’t forget to handle task failures and retries
  • Don’t block on another task's result with .get() inside a task (Celery raises RuntimeError by default); use chains or callbacks instead

