Overview
Celery provides distributed task queue functionality for handling asynchronous operations like order processing, product synchronization, notifications, and data indexing. The system uses Redis as both the message broker and result backend.
Configuration
Celery is configured in libs/celery.py with Redis as both the broker and the result backend:
import celery
import settings

# Main Celery application
app = celery.Celery(
    'tasks',
    include=['task_helper', 'task_helpers', 'split_helper'],
    broker=f'redis://{settings.REDIS_HOST}:{settings.REDIS_PORT}/0',
    backend=f'redis://{settings.REDIS_HOST}:{settings.REDIS_PORT}/0'
)
# Task result expiration
app.conf.CELERY_TASK_RESULT_EXPIRES = settings.CELERY_TASK_RESULT_EXPIRES_IN_SEC

# Visibility timeout for long-running tasks
app.conf.broker_transport_options = {
    'visibility_timeout': settings.CELERY_VISIBILITY_TIMEOUT
}
See libs/celery.py for the complete configuration.
Task Queues
The system uses multiple specialized queues for different task types:
Product and catalog synchronization tasks
CELERY_CANCEL_ORDER_QUEUE - Order cancellation and refund processing
CELERY_ORDER_REFUND_QUEUE - Refund processing and bank transfer tasks
External API callbacks and webhook processing
Facebook pixel and marketing event tracking
Order flagging and customer support alerts
Search index synchronization (Elasticsearch/Dgraph)
HL_AVAILABILITY_LOG_QUEUE - Hyperlocal availability logging
VISITED_PRODUCT_LOGGING_QUEUE - User product visit tracking
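One conventional way to bind tasks to these queues is Celery's task routing table. The mapping below is only an illustrative sketch: the task and queue names are taken from examples on this page, while the real routing comes from the project's settings (e.g. settings.CELERY_CANCEL_ORDER_QUEUE).

```python
# Hypothetical routing table; real queue names come from settings.
TASK_ROUTES = {
    'task_helper.process_order_cancellation': {'queue': 'cancel_order_queue'},
    'tasks.initiate_juspay_refund': {'queue': 'order_refund_queue'},
    'tasks.syncDgraphProduct': {'queue': 'sync_queue'},
}

def route_for(task_name, default_queue='celery'):
    """Return the queue a task would be published to under this routing."""
    return TASK_ROUTES.get(task_name, {'queue': default_queue})['queue']
```

Wiring this into Celery is a one-liner: `app.conf.task_routes = TASK_ROUTES`.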
Common Tasks
Product Synchronization
Sync Product to Dgraph
from tasks import syncDgraphProduct

# Sync products to graph database
syncDgraphProduct.delay([12345, 67890])
This task:
Fetches product data from MySQL
Builds category, tag, and variant relationships
Creates pricelist associations
Inserts/updates nodes in Dgraph
Implementation: See tasks.py:111
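The four steps above can be pictured as one payload-building pipeline. The sketch below is a simplified stand-in for that shape; the helper name and node structure are assumptions for illustration, not the real tasks.py implementation.

```python
def build_dgraph_node(product_row, categories, tags, variants, pricelists):
    """Assemble a Dgraph-style mutation payload from MySQL rows (illustrative)."""
    return {
        'uid': f'_:product_{product_row["id"]}',
        'dgraph.type': 'Product',
        'name': product_row['name'],
        # Relationship edges built from the joined category/tag/variant rows
        'category': [{'uid': f'_:category_{c}'} for c in categories],
        'tag': [{'uid': f'_:tag_{t}'} for t in tags],
        'variant': [{'uid': f'_:variant_{v}'} for v in variants],
        # Pricelist associations
        'pricelist': [{'uid': f'_:pricelist_{p}'} for p in pricelists],
    }
```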
Sync Category to Dgraph
from tasks import syncDgraphCategory

# Sync categories and related products
syncDgraphCategory.delay([10, 20, 30])
Implementation: See tasks.py:89
Kinesis Event Streaming
Sync Products to Kinesis
from tasks import sync_kinesis_products, insert_into_kinesis

# Sync product to Kinesis stream
result = sync_kinesis_products.delay([12345])
data, stream_name, partition_key = result.get()

# Insert into Kinesis with rate limiting
insert_into_kinesis.delay([data, stream_name, partition_key])
Rate Limiting: Configured via KINESIS_INSERT_RATE_LIMIT setting
Implementation: See tasks.py:199 and tasks.py:234
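The two tasks form a produce/consume pair: the first builds the record, the second writes it under the rate limit. Stripped of Celery, the handoff looks roughly like this; the function bodies and the partition-key choice are illustrative assumptions, not the real tasks.py code.

```python
def sync_products(product_ids, stream_name='product-stream'):
    """Build a Kinesis record batch for a list of products (illustrative)."""
    data = [{'product_id': pid} for pid in product_ids]
    partition_key = str(product_ids[0])  # keeps the batch on one shard
    return data, stream_name, partition_key

def insert_record(data, stream_name, partition_key):
    """Stand-in for the rate-limited Kinesis put call; returns what would be sent."""
    return {'stream': stream_name, 'key': partition_key, 'records': len(data)}
```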
Sync Orders to Kinesis
from tasks import sync_kinesis_orders

# Stream order data to Kinesis
sync_kinesis_orders.delay(order_id=54321)
Implementation: See tasks.py:247
Order Processing Tasks
Order Cancellation
from task_helper import process_order_cancellation

# Cancel order and process refunds
process_order_cancellation.apply_async(
    args=[order_id],
    queue='cancel_order_queue'
)
Cancellation tasks handle:
Order status updates
Inventory restoration
Payment refunds (bank transfer, Juspay, Cashgram)
Gift voucher reversals
TSS money/point refunds
Notification triggers
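Since a cancellation can involve several refund channels (bank transfer, Juspay, Cashgram), a dispatcher keyed on payment method is a natural shape for this logic. The sketch below is an assumption about structure for illustration, not the actual task_helper code; all handler names are hypothetical.

```python
def refund_bank_transfer(order):
    return f"bank refund for {order['id']}"

def refund_juspay(order):
    return f"juspay refund for {order['id']}"

def refund_cashgram(order):
    return f"cashgram refund for {order['id']}"

# Hypothetical payment-method-to-handler table
REFUND_HANDLERS = {
    'bank_transfer': refund_bank_transfer,
    'juspay': refund_juspay,
    'cashgram': refund_cashgram,
}

def process_refund(order):
    """Route a cancelled order to the refund handler for its payment method."""
    handler = REFUND_HANDLERS.get(order['payment_method'])
    if handler is None:
        raise ValueError(f"no refund handler for {order['payment_method']}")
    return handler(order)
```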
Refund Processing
from tasks import initiate_juspay_refund, initiate_cashgram_refund

# Process Juspay refund
initiate_juspay_refund.apply_async(
    args=[order_id, amount, refund_reason],
    queue='order_refund_queue'
)

# Process Cashgram refund
initiate_cashgram_refund.delay(order_id, amount)
Notification Tasks
Order Delivery Notifications
from controllers.OrderDeliverMailAndSmsController import send_delivered_mail_and_sms
# Send delivery confirmation
send_delivered_mail_and_sms.delay(order_id)
WhatsApp Notifications
from controllers.CommunicationController import whatsapp_msg_trigger
# Send WhatsApp message
whatsapp_msg_trigger.delay(
    phone_number="919876543210",
    template_name="order_confirmation",
    params={"order_id": "ORD-123", "amount": 2499}
)
User Attribute Sync
from libs.helper import sync_user_attributes_moengage
# Sync user data to MoEngage
sync_user_attributes_moengage.delay(user_id=12345)
Cache Invalidation
from utils.tasks_helper import invalidate_pdp_cache, invalidate_category_cache
# Invalidate product detail page cache
invalidate_pdp_cache.delay(product_id=12345)

# Invalidate category cache
invalidate_category_cache.delay(category_id=10)
Sitemap Generation
from tasks import generate_sitemap

# Generate XML sitemaps
generate_sitemap.delay()
Task Monitoring
Flower Dashboard
Flower provides real-time monitoring of Celery workers and tasks:
# Start Flower
celery -A tasks flower --port=5555
# Access dashboard
http://localhost:5555
Flower shows:
Active/scheduled/completed tasks
Worker status and utilization
Task execution times
Failure rates and error messages
Task Status Checking
from celery.result import AsyncResult
from libs.celery import app
from tasks import syncDgraphProduct

# Check task status
task = syncDgraphProduct.delay([12345])
result = AsyncResult(task.id, app=app)

print(f"Status: {result.state}")
print(f"Result: {result.result}")
print(f"Successful: {result.successful()}")
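A common wrapper around this pattern is a bounded poll that gives up after a deadline instead of blocking forever. The helper below sketches that against any object exposing Celery's ready()/result interface; the stub class exists only so the example is self-contained.

```python
import time

def wait_for_result(result, timeout=30.0, poll_interval=0.5):
    """Poll a Celery-style result until it is ready or the deadline passes."""
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        if result.ready():
            return result.result
        time.sleep(poll_interval)
    raise TimeoutError('task did not finish in time')

class StubResult:
    """Minimal stand-in for celery.result.AsyncResult, for illustration only."""
    def __init__(self, value):
        self.result = value
    def ready(self):
        return True
```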
Error Handling
Task Retries
from libs.celery import app

@app.task(bind=True, max_retries=3)
def sync_with_retry(self, product_id):
    try:
        # Perform sync operation
        sync_product_to_api(product_id)
    except Exception as exc:
        # Retry with exponential backoff: 1, 2, 4 seconds
        raise self.retry(exc=exc, countdown=2 ** self.request.retries)
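It helps to see what that countdown expression actually produces per attempt. The helper below computes the delay schedule for a given retry budget, with an optional cap so late retries do not grow unbounded (the cap is our addition, not part of the snippet above).

```python
def backoff_schedule(max_retries, base=2, cap=300):
    """Delay before each retry: base**attempt seconds, capped at `cap`.
    Attempt numbers start at 0, matching self.request.retries."""
    return [min(base ** attempt, cap) for attempt in range(max_retries)]
```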
Error Logging
from libs.celery import app
from utils.logger import get_logger

logger = get_logger()

@app.task
def sync_task(data):
    try:
        process_data(data)
    except Exception as e:
        logger.error(f"Task failed: {str(e)}", exc_info=True)
        raise
Task Scheduling
Periodic Tasks with Celery Beat
from celery.schedules import crontab

app.conf.beat_schedule = {
    'sync-products-every-hour': {
        'task': 'tasks.sync_all_products',
        'schedule': crontab(minute=0),  # Every hour, on the hour
    },
    'cleanup-expired-sessions': {
        'task': 'tasks.cleanup_sessions',
        'schedule': crontab(hour=2, minute=0),  # Daily at 2 AM
    },
    'generate-daily-sitemap': {
        'task': 'sitemap.sitemap_main',
        'schedule': crontab(hour=3, minute=0),  # Daily at 3 AM
    },
}
Start Celery Beat:
celery -A tasks beat --loglevel=info
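A quick way to sanity-check a beat schedule is to evaluate which entries would fire at a given wall-clock time. The matcher below handles only the fixed hour/minute form used in the schedule above (real crontab expressions are far richer); the spec dicts are our simplified encoding of those three entries.

```python
def entries_due(schedule, hour, minute):
    """Return entry names whose spec matches the given time.
    Each spec is a dict like {'minute': 0} or {'hour': 2, 'minute': 0};
    a missing 'hour' means 'every hour', as crontab(minute=0) does."""
    due = []
    for name, spec in schedule.items():
        if spec.get('minute') != minute:
            continue
        if 'hour' in spec and spec['hour'] != hour:
            continue
        due.append(name)
    return due

# Simplified encoding of the beat schedule shown above
BEAT_SPECS = {
    'sync-products-every-hour': {'minute': 0},
    'cleanup-expired-sessions': {'hour': 2, 'minute': 0},
    'generate-daily-sitemap': {'hour': 3, 'minute': 0},
}
```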
Task Chunking
from celery import group
from tasks import syncDgraphProduct

# Process a large product list in chunks
product_ids = list(range(1, 10001))  # 10,000 products
chunk_size = 100

job = group(
    syncDgraphProduct.s(product_ids[i:i + chunk_size])
    for i in range(0, len(product_ids), chunk_size)
)
result = job.apply_async()
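The slicing logic inside that group is worth factoring out; a small helper makes the chunk boundaries easy to verify in isolation.

```python
def chunked(items, size):
    """Split a list into consecutive chunks of at most `size` elements."""
    return [items[i:i + size] for i in range(0, len(items), size)]
```

With this helper the group reads as `group(syncDgraphProduct.s(chunk) for chunk in chunked(product_ids, 100))`, which is equivalent to the index arithmetic above.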
Task Priorities
# High priority task (0-9, higher = more priority)
sync_critical_product.apply_async(
    args=[product_id],
    priority=9
)

# Low priority task
generate_sitemap.apply_async(priority=1)
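One caveat with the Redis broker: priorities are emulated by splitting each queue into a small number of levels (the transport's priority_steps, [0, 3, 6, 9] by default) and bucketing each task's priority down to the nearest step, so not all ten values are distinct. The sketch below mirrors that bucketing as we understand kombu's Redis transport to behave; treat it as an illustration rather than the transport's actual code.

```python
from bisect import bisect

# Default priority steps used by the Redis transport
PRIORITY_STEPS = [0, 3, 6, 9]

def effective_priority(priority, steps=PRIORITY_STEPS):
    """Round a 0-9 priority down to the nearest configured step."""
    priority = max(min(priority, steps[-1]), steps[0])  # clamp into range
    return steps[bisect(steps, priority) - 1]
```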
Rate Limiting
# Limit task execution rate
@app.task ( rate_limit = '10/m' ) # 10 tasks per minute
def call_external_api ( data ):
# API call with rate limiting
pass
# Per-task rate limit from settings
@app.task ( rate_limit = settings. KINESIS_INSERT_RATE_LIMIT )
def insert_into_kinesis ( data ):
pass
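Celery rate limits use the '<count>/<unit>' string format, where the unit is s, m, or h. A small parser shows what a given string means in tasks-per-second terms; the format is Celery's, but the parsing helper itself is ours.

```python
UNIT_SECONDS = {'s': 1, 'm': 60, 'h': 3600}

def rate_limit_per_second(spec):
    """Convert a Celery-style rate limit ('10/m', '100/h') to tasks/second."""
    count, unit = spec.split('/')
    return int(count) / UNIT_SECONDS[unit]
```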
Abandon Cart Processing
A separate Celery instance handles abandon cart operations. It is defined in libs/celery.py with its own Redis broker and result backend:
import celery

# Separate Redis backend for abandon cart
abandon_cart_app = celery.Celery(
    'fast_queue',
    broker=f"redis://{ABANDON_CART_REDIS_HOST}:{ABANDON_CART_REDIS_PORT}/{ABANDON_CART_REDIS_DB}",
    backend=f"redis://{ABANDON_CART_REDIS_HOST}:{ABANDON_CART_REDIS_PORT}/{ABANDON_CART_REDIS_DB}"
)
Elsewhere it is imported with from libs.celery import abandon_cart_app.
See libs/celery.py:47 for the complete configuration.
Deployment
Docker Configuration
The project includes multiple Dockerfile configurations for Celery:
Dockerfile-celery - Standard Celery worker
Dockerfile-celery-AbandonCart - Abandon cart worker
Dockerfile-flower - Flower monitoring
Dockerfile-celery_python310 - Python 3.10 worker
Starting Workers
# Start default worker
celery -A tasks worker --loglevel=info
# Start worker for specific queue
celery -A tasks worker --loglevel=info -Q sync_queue
# Start multiple workers
celery multi start worker1 worker2 -A tasks --loglevel=info
# Start worker with concurrency
celery -A tasks worker --concurrency=4 --loglevel=info
Best Practices
Task Design Principles:
Idempotency - Tasks should produce the same result when executed multiple times
Small Tasks - Break large operations into smaller, manageable tasks
Proper Timeouts - Set appropriate task timeouts to prevent hung tasks
Error Handling - Always handle exceptions and log errors
Result Expiration - Set appropriate result expiration times
Queue Separation - Use separate queues for different task types
Monitoring - Always monitor task execution and failures
Common Pitfalls to Avoid:
Don’t pass large objects as task arguments (use IDs instead)
Don’t create database sessions inside tasks without proper cleanup
Don’t ignore task timeouts for long-running operations
Don’t forget to handle task failures and retries
Don’t mix synchronous and asynchronous task calls incorrectly
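The idempotency principle above is typically enforced with a processed-marker check before doing any work, so a re-delivered task becomes a safe no-op. In production that marker would live in Redis (e.g. a SETNX key with a TTL); the in-memory set below only shows the shape of the guard.

```python
_processed = set()  # stand-in for a Redis key with a TTL

def process_order_once(order_id, handler):
    """Run `handler` for an order at most once, even if the task is re-delivered."""
    if order_id in _processed:
        return 'skipped'  # duplicate delivery: safe no-op
    _processed.add(order_id)
    handler(order_id)
    return 'processed'
```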
Related pages:
Kinesis Integration - Event streaming with AWS Kinesis
Webhooks - Webhook processing with Celery tasks