Overview
Energy CMMS uses Celery with Redis as the message broker for handling long-running operations like data imports, document processing, and scheduled maintenance tasks.

Architecture
Configuration
Environment-Based Setup
Celery automatically configures itself based on the environment:

import os
import sys

# --- Environment detection ---
# Local development is assumed when DEBUG is on and no Coolify FQDN is set.
# NOTE(review): DEBUG is defined earlier in settings.py (not shown here).
IS_LOCAL = DEBUG and not os.environ.get('COOLIFY_FQDN')

# --- Celery broker URL ---
if IS_LOCAL:
    # Development: local Redis instance.
    CELERY_BROKER_URL = os.environ.get('CELERY_BROKER_URL', 'redis://localhost:6379/0')
    print(f"[DEBUG] Entorno LOCAL detectado. Redis: {CELERY_BROKER_URL}")
else:
    # Production: Redis running in Docker/Coolify.
    # SECURITY NOTE(review): this fallback URL embeds a real-looking password.
    # Credentials should come exclusively from the environment; rotate this
    # password and remove the hard-coded default.
    CELERY_BROKER_URL = os.environ.get(
        'CELERY_BROKER_URL',
        'redis://default:saul123@lwcc8sss480ks4oc8gcgw4go:6379/0'
    )
    print(f"[DEBUG] Entorno PRODUCCION detectado. Redis: {CELERY_BROKER_URL}")
sys.stdout.flush()

# --- Result backend ---
# Task results are stored through Django ('django-db' / 'django-cache' backends).
CELERY_RESULT_BACKEND = os.environ.get('CELERY_RESULT_BACKEND', 'django-db')
CELERY_CACHE_BACKEND = 'django-cache'

# --- Task settings ---
CELERY_TASK_TRACK_STARTED = True   # report STARTED state, not just PENDING
CELERY_TASK_TIME_LIMIT = 30 * 60   # hard limit: 30 minutes max per task
CELERY_ACCEPT_CONTENT = ['json']
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
CELERY_TIMEZONE = TIME_ZONE        # reuse Django's TIME_ZONE setting
CELERY_BROKER_CONNECTION_RETRY_ON_STARTUP = True

# Aggressive socket timeouts so a dead broker fails fast instead of hanging.
CELERY_BROKER_CONNECTION_TIMEOUT = 3  # seconds allowed to establish a connection
CELERY_BROKER_TRANSPORT_OPTIONS = {
    'socket_timeout': 3,
    'socket_connect_timeout': 3,
    'socket_keepalive': True,
}

# Production-only: keep retrying the broker connection, up to 10 attempts.
if not DEBUG:
    CELERY_BROKER_CONNECTION_RETRY = True
    CELERY_BROKER_CONNECTION_MAX_RETRIES = 10
Redis Cache Configuration
Shared cache for Celery and Django:

settings.py - Cache Configuration
import redis

if IS_LOCAL:
    try:
        # Probe the broker's Redis so development can fall back gracefully
        # when no local Redis is running.
        redis_url = CELERY_BROKER_URL
        r = redis.from_url(redis_url, socket_connect_timeout=1)
        r.ping()
        # Redis reachable: share it between Celery and the Django cache.
        CACHES = {
            'default': {
                'BACKEND': 'django.core.cache.backends.redis.RedisCache',
                'LOCATION': redis_url,
                'OPTIONS': {
                    'socket_timeout': 5,
                    'socket_connect_timeout': 5,
                    'retry_on_timeout': True,
                }
            }
        }
        print(f"[DEBUG] Cache: Redis conectado exitosamente en {redis_url}")
    except Exception as e:
        # Redis unavailable: fall back to an in-process memory cache.
        # NOTE(review): LocMem is per-process — the web process and Celery
        # workers will NOT see each other's cache entries in this mode, so
        # cache-based import progress reporting will not work locally.
        CACHES = {
            'default': {
                'BACKEND': 'django.core.cache.backends.locmem.LocMemCache',
                'LOCATION': 'unique-snowflake',
            }
        }
        print(f"[DEBUG] Cache: LocMem (Redis no disponible: {e})")
else:
    # Production: always Redis (same instance as the Celery broker).
    CACHES = {
        'default': {
            'BACKEND': 'django.core.cache.backends.redis.RedisCache',
            'LOCATION': CELERY_BROKER_URL,
            'OPTIONS': {
                'socket_timeout': 5,
                'socket_connect_timeout': 5,
                'retry_on_timeout': True,
            }
        }
    }
    print(f"[DEBUG] Cache: Redis Produccion ({CELERY_BROKER_URL})")
Task Examples
Asset Import Task
From activos/tasks.py:
activos/tasks.py
from celery import shared_task
from import_export import resources
from django.core.cache import cache
from django.core.files.storage import default_storage
@shared_task(bind=True)
def import_activos_task(self, file_path, file_format, user_id=None,
                        import_name="Importación sin nombre",
                        verification_mode=False, dry_run=False):
    """
    Import assets from an uploaded file with progress tracking.

    Progress and the final summary are published both through the Celery
    task state (``PROGRESS``) and the Django cache under a per-user key,
    so the frontend can poll either channel.

    Args:
        file_path: Storage path of the uploaded file (read via default_storage).
        file_format: tablib format name ('xlsx', 'csv', ...).
        user_id: Optional id of the requesting User; used for the cache key
            and the RegistroImportacion audit record.
        import_name: Human-readable label stored on the import record.
        verification_mode: Accepted for API compatibility (currently unused).
        dry_run: When True, rows are validated but not persisted and the
            uploaded file is kept for a subsequent real import.

    Returns:
        dict summarizing the import (counts of new/updated/skipped rows,
        error count and detailed error list), or a
        ``{'status': 'error', 'message': ...}`` dict on failure.
    """
    from tablib import Dataset
    from .models import RegistroImportacion
    from .resources import ActivoResource
    from django.contrib.auth.models import User

    user = User.objects.get(id=user_id) if user_id else None
    cache_key = f"import_activos_progress_{user_id}"

    # Create the audit record up front so a crash is visible, not silent.
    registro = RegistroImportacion.objects.create(
        nombre=import_name,
        usuario=user,
        estado='PROCESANDO'
    )

    try:
        # Load the whole file from storage into a tablib Dataset.
        with default_storage.open(file_path, 'rb') as f:
            file_content = f.read()
        dataset = Dataset().load(file_content, format=file_format)
        total_rows = len(dataset)
        registro.total_rows = total_rows
        registro.save()

        # Publish initial progress (cache entry expires after 1 hour).
        progress_info = {
            'current': 0,
            'total': total_rows,
            'status': 'Procesando...',
            'percent': 0,
            'new': 0,
            'updated': 0,
            'errors': 0
        }
        cache.set(cache_key, progress_info, 3600)
        self.update_state(state='PROGRESS', meta=progress_info)

        # Run the import; errors are collected on the result, not raised.
        resource = ActivoResource()
        result = resource.import_data(dataset, dry_run=dry_run, raise_errors=False)

        # Collect dataset-wide AND per-row errors. The original version only
        # read base_errors, silently under-reporting row-level failures.
        detailed_errors = []
        for error in result.base_errors:
            detailed_errors.append(f"Error General: {str(error.error)}")
        for line, errors in result.row_errors():
            for error in errors:
                detailed_errors.append(f"Fila {line}: {str(error.error)}")

        # Persist the final counters on the audit record.
        registro.filas_nuevas = result.totals.get('new', 0)
        registro.filas_actualizadas = result.totals.get('update', 0)
        registro.filas_omitidas = result.totals.get('skip', 0)
        registro.filas_error = len(detailed_errors)
        registro.estado = 'COMPLETADO'
        registro.save()

        final_res = {
            'status': 'completed',
            'total': total_rows,
            'new': result.totals.get('new', 0),
            'updated': result.totals.get('update', 0),
            'skipped': result.totals.get('skip', 0),
            'errors': len(detailed_errors),
            'error_list': detailed_errors
        }
    except Exception as e:
        # Don't leave the record stuck in PROCESANDO on a crash.
        # NOTE(review): assumes 'ERROR' is a valid `estado` choice — confirm
        # against the RegistroImportacion model.
        registro.estado = 'ERROR'
        registro.save()
        final_res = {'status': 'error', 'message': str(e)}
    finally:
        # Remove the temporary upload unless this was a dry run; previously
        # a failed import leaked the file in storage.
        if not dry_run and default_storage.exists(file_path):
            default_storage.delete(file_path)

    cache.set(cache_key, final_res, 3600)
    return final_res
Document Processing Task
From documentos/tasks.py:
documentos/tasks.py
import logging
from celery import shared_task
import requests
from django.conf import settings
logger = logging.getLogger(__name__)
@shared_task(name='documentos.tasks.extract_document_metadata')
def extract_document_metadata(revision_id):
    """
    Kick off text/metadata extraction for a document revision.

    Marks the revision as PROCESANDO and posts its file details to the
    configured n8n webhook; n8n reports results back asynchronously through
    the callback URL included in the payload.

    Args:
        revision_id: Primary key of the Revision to process.

    Returns:
        True when the webhook call was dispatched, None when the revision
        has no file attached, False on any error.
    """
    import os  # bug fix: `os.path.basename` was used but `os` was never imported
    from .models import Revision

    # Bound before the try so the except block can safely test it; the
    # original code hit UnboundLocalError when the lookup itself failed.
    revision = None
    try:
        revision = Revision.objects.get(pk=revision_id)
        revision.estado_extraccion = 'PROCESANDO'
        revision.save()

        if not revision.archivo:
            revision.estado_extraccion = 'ERROR'
            revision.datos_extraidos = {'error': 'No hay archivo asociado'}
            revision.save()
            return

        # Build the callback URL n8n will hit once processing finishes.
        base_callback_url = settings.INTERNAL_SITE_URL
        callback_url = f"{base_callback_url}/documentos/api/callback-procesamiento/{revision.id}/"

        payload = {
            'revision_id': revision.id,
            'documento_id': revision.documento.id,
            'filename': os.path.basename(revision.archivo.name),
            'file_url': revision.archivo.url,
            'file_key': revision.archivo.name,
            'tipo_documento': revision.documento.tipo_documento.nombre,
            'callback_url': callback_url,
            # Names of the metadata fields n8n is expected to extract.
            'metadatos_requeridos': list(
                revision.documento.tipo_documento.metadatos_config.values_list('nombre', flat=True)
            )
        }

        # Hand the document off to n8n for asynchronous processing.
        n8n_url = settings.N8N_PROCESS_DOCUMENT_WEBHOOK_URL
        response = requests.post(n8n_url, json=payload, timeout=10)
        logger.info(f"Revision {revision_id}: Enviada a n8n. Status: {response.status_code}")
        return True
    except Exception as e:
        logger.error(f"Error en extract_document_metadata: {str(e)}")
        # revision is None when Revision.objects.get raised (e.g. DoesNotExist).
        if revision is not None:
            revision.estado_extraccion = 'ERROR'
            revision.datos_extraidos = {'error': str(e)}
            revision.save()
        return False
Maintenance Task (Import Work Orders)
From mantenimiento/tasks.py:
mantenimiento/tasks.py
from celery import shared_task
@shared_task(bind=True, name='mantenimiento.tasks.import_ordenes_task')
def import_ordenes_task(self, file_path, file_format, user_id=None,
                        verification_mode=False, dry_run=False,
                        import_name="Importación OTs"):
    """
    Import work orders from an uploaded file with progress tracking.

    The OrdenTrabajoResource publishes row-by-row progress through the Celery
    task state and a per-user cache key (it is handed references to this task
    and the cache key below).

    Args:
        file_path: Storage path of the uploaded file.
        file_format: tablib format name ('xlsx', 'csv', ...).
        user_id: Optional requesting User id (used for cache key + audit record).
        verification_mode: When True, no RegistroImportacion is created.
        dry_run: Validate without persisting; the uploaded file is kept.
        import_name: Label stored on the RegistroImportacion record.

    Returns:
        dict with the import summary ('status': 'completed') or an error
        message ('status': 'error').
    """
    from tablib import Dataset
    from django.core.files.storage import default_storage
    from .admin import OrdenTrabajoResource
    from django.core.cache import cache
    from .models import OrdenTrabajo
    from activos.models import RegistroImportacion
    from django.contrib.auth.models import User

    user = User.objects.get(id=user_id) if user_id else None
    cache_key = f"import_ordenes_progress_{user_id}"

    # Only real (non-verification, non-dry-run) imports get an audit record.
    registro = None
    if not verification_mode and not dry_run:
        registro = RegistroImportacion.objects.create(
            nombre=import_name,
            tipo='Ordenes Trabajo',
            usuario=user,
            estado='PROCESANDO'
        )

    # Load the uploaded file into a tablib Dataset.
    with default_storage.open(file_path, 'rb') as f:
        file_content = f.read()
    dataset = Dataset().load(file_content, format=file_format)
    total_rows = len(dataset)

    # Wire the resource up for progress reporting.
    resource = OrdenTrabajoResource()
    resource.celery_task = self
    resource.cache_key = cache_key
    resource.total_rows = total_rows

    # Publish initial progress (cache entry expires after 1 hour).
    progress_info = {
        'current': 0,
        'total': total_rows,
        'status': 'Iniciando importacion...',
        'percent': 0
    }
    cache.set(cache_key, progress_info, 3600)
    self.update_state(state='PROGRESS', meta=progress_info)

    try:
        result = resource.import_data(
            dataset,
            dry_run=dry_run,
            raise_errors=False,
            use_transactions=True
        )

        # Collect dataset-wide and per-row errors.
        detailed_errors = []
        for error in result.base_errors:
            detailed_errors.append(f"Error General: {str(error.error)}")
        for line, errors in result.row_errors():
            for error in errors:
                detailed_errors.append(f"Fila {line}: {str(error.error)}")

        # Persist final counters on the audit record, if one was created.
        if registro is not None:
            registro.filas_nuevas = result.totals.get('new', 0)
            registro.filas_actualizadas = result.totals.get('update', 0)
            registro.filas_omitidas = result.totals.get('skip', 0)
            registro.filas_error = len(detailed_errors)
            registro.estado = 'COMPLETADO'
            if detailed_errors:
                # Keep the stored detail bounded: first 50 errors only.
                registro.detalles_error = "\n".join(detailed_errors[:50])
            registro.save()

        final_res = {
            'status': 'completed',
            'total': total_rows,
            'new': result.totals.get('new', 0),
            'updated': result.totals.get('update', 0),
            'skipped': result.totals.get('skip', 0),
            'errors': len(detailed_errors),
            'error_list': detailed_errors
        }
    except Exception as e:
        # Fixed mojibake: message previously rendered as "Error crÃtico".
        error_msg = f"Error crítico: {str(e)}"
        final_res = {'status': 'error', 'message': error_msg}
        if registro is not None:
            # Don't leave the record stuck in PROCESANDO on a crash.
            # NOTE(review): assumes 'ERROR' is a valid estado choice — confirm.
            registro.estado = 'ERROR'
            registro.save()

    # Remove the temporary upload unless this was a dry run.
    if not dry_run and default_storage.exists(file_path):
        default_storage.delete(file_path)
    cache.set(cache_key, final_res, 3600)
    return final_res
Periodic Tasks (Celery Beat)
Schedule recurring tasks:

settings.py - Beat Schedule
from celery.schedules import crontab
# Recurring tasks dispatched by Celery Beat. A schedule is either a float
# (interval in seconds) or a crontab() expression; crontab times are
# evaluated in CELERY_TIMEZONE.
CELERY_BEAT_SCHEDULE = {
    'sync-document-embeddings-every-minute': {
        'task': 'documentos.tasks.sync_document_embeddings',
        'schedule': 60.0,  # every 60 seconds
    },
    'cleanup-old-imports-daily': {
        'task': 'activos.tasks.cleanup_old_imports',
        'schedule': crontab(hour=2, minute=0),  # daily at 02:00
    },
    'generate-maintenance-reports-weekly': {
        'task': 'mantenimiento.tasks.generate_weekly_report',
        'schedule': crontab(day_of_week=1, hour=8, minute=0),  # Mondays at 08:00
    },
}
Running Celery
Development
# Start Celery worker
celery -A energia worker -l info
Production (Docker)
docker-compose.yml
services:
  # Background worker: executes Celery tasks (imports, document processing).
  celery_worker:
    image: energia-cmms:latest
    command: celery -A energia worker -l info
    environment:
      # Hostnames "redis" and "db" resolve to sibling services on the
      # Docker network — do not use "localhost" here.
      - CELERY_BROKER_URL=redis://redis:6379/0
      - DATABASE_URL=postgresql://user:pass@db:5432/energia
    depends_on:
      - redis
      - db
    volumes:
      # Shared media volume so the worker can read files uploaded via the web app.
      - media_data:/app/media
  # Scheduler: enqueues CELERY_BEAT_SCHEDULE entries onto the broker.
  celery_beat:
    image: energia-cmms:latest
    command: celery -A energia beat -l info
    environment:
      - CELERY_BROKER_URL=redis://redis:6379/0
      - DATABASE_URL=postgresql://user:pass@db:5432/energia
    depends_on:
      - redis
      - db
Monitoring Task Progress
Check Task Status
from celery.result import AsyncResult
def check_task_status(task_id):
    """Translate a Celery task's state into a plain dict for the frontend."""
    result = AsyncResult(task_id)
    state = result.state

    if state == 'PROGRESS':
        info = result.info
        return {
            'state': state,
            'current': info.get('current', 0),
            'total': info.get('total', 1),
            'status': info.get('status', ''),
            'percent': info.get('percent', 0),
        }
    if state == 'SUCCESS':
        return {'state': state, 'result': result.result}
    if state == 'PENDING':
        return {'state': state, 'status': 'Pending...'}
    # FAILURE or any other state: expose the raw info as a string.
    return {'state': state, 'status': str(result.info)}
Monitor via Cache
from django.core.cache import cache
def get_import_progress(user_id):
    """Return the cached asset-import progress for *user_id*, or a placeholder."""
    progress = cache.get(f"import_activos_progress_{user_id}")
    return progress if progress else {'status': 'No import in progress'}
Troubleshooting
Celery worker not connecting to Redis
Celery worker not connecting to Redis
Error:
Error: No connection could be made

Solutions:

- Verify Redis is running:
redis-cli ping
# Expected: PONG
- Check connection string:
from django.conf import settings
print(settings.CELERY_BROKER_URL)
- Test connection:
import redis
r = redis.from_url('redis://localhost:6379/0')
r.ping() # Should return True
Tasks hanging indefinitely
Tasks hanging indefinitely
Symptoms:
- Tasks stay in PENDING state
- No error messages
- Check worker is running:
celery -A energia inspect active
- Verify task is registered:
celery -A energia inspect registered
- Check timeouts:
CELERY_TASK_TIME_LIMIT = 30 * 60 # 30 minutes
CELERY_BROKER_CONNECTION_TIMEOUT = 3
Database connection errors in worker
Database connection errors in worker
Error:
OperationalError: connection timed out

Solutions:

- Verify DATABASE_URL uses the correct hostname:
# Bad (for Docker)
DATABASE_URL=postgresql://user:pass@localhost:5432/db
# Good (for Docker)
DATABASE_URL=postgresql://user:pass@db:5432/db
- Increase connection timeout:
DATABASES['default']['OPTIONS'].update({
'connect_timeout': 20,
'keepalives': 1,
'keepalives_idle': 30,
})
Memory leaks in long-running workers
Memory leaks in long-running workers
Solution: Restart workers periodically:
# Max tasks before restart
celery -A energia worker --max-tasks-per-child=1000
# Max memory before restart
celery -A energia worker --max-memory-per-child=200000 # 200MB
Best Practices
Use bind=True
Always use
@shared_task(bind=True) for tasks that need to track progress or retry.

Implement Timeouts
Set
CELERY_TASK_TIME_LIMIT to prevent runaway tasks.

Clean Up Files
Delete temporary files in
finally blocks to prevent storage leaks.

Use Idempotent Tasks
Design tasks to be safely retried without side effects
Related Resources
n8n Integration
Trigger n8n workflows from Celery tasks
MinIO Storage
Process files from MinIO in background tasks