Overview
LatentGEO uses Celery for asynchronous task processing:
- Audit pipeline: Long-running SEO/GEO analysis
- PDF generation: Heavy report rendering
- GEO tools: Keywords, rankings, backlinks, LLM visibility
- PageSpeed analysis: Performance metrics
- Article generation: Batch content creation
Architecture
FastAPI → Celery Task Queue → Worker Processes → Database/Storage
                 ↓
           Redis Broker
Redis acts as both the message broker and result backend.
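A quick way to confirm this wiring end to end is to ping the workers over the broker. A minimal sketch, assuming it is run from a Python shell inside the backend container:

from app.workers.celery_app import celery_app

# Broadcasts a ping over the Redis broker and collects replies from live workers
replies = celery_app.control.ping(timeout=2.0)
print(replies)  # e.g. [{'celery@worker1': {'ok': 'pong'}}]; empty list means no worker is reachable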
Celery Configuration
Celery App
File: backend/app/workers/celery_app.py
from celery import Celery
from app.core.config import settings

celery_app = Celery(
    "latentgeo",
    broker=settings.CELERY_BROKER,
    backend=settings.CELERY_BACKEND,
    include=["app.workers.tasks"],
)

celery_app.conf.update(
    task_serializer="json",
    accept_content=["json"],
    result_serializer="json",
    timezone="UTC",
    enable_utc=True,
    task_track_started=True,
    task_time_limit=4000,           # ~66 minutes hard limit
    worker_max_tasks_per_child=50,  # Restart worker after 50 tasks
)
Environment Variables
# Redis URLs
REDIS_URL=redis://redis:6379/0
CELERY_BROKER=redis://redis:6379/0
CELERY_BACKEND=redis://redis:6379/1
# Worker concurrency
CELERYD_CONCURRENCY=1
Use separate Redis databases for broker (0) and backend (1) to avoid conflicts.
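For reference, a minimal sketch of the settings object that celery_app.py reads. This assumes a pydantic BaseSettings class; the real definition lives in backend/app/core/config.py and may differ:

from pydantic_settings import BaseSettings  # pydantic v2; on v1 use `from pydantic import BaseSettings`

class Settings(BaseSettings):
    # Defaults match the Docker Compose service names; environment
    # variables with the same names override these values.
    REDIS_URL: str = "redis://redis:6379/0"
    CELERY_BROKER: str = "redis://redis:6379/0"
    CELERY_BACKEND: str = "redis://redis:6379/1"

settings = Settings()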
Task Definitions
Main Audit Task
File: backend/app/workers/tasks.py
import asyncio
import logging

from app.workers.celery_app import celery_app
from app.services.audit_service import AuditService
from app.services.pipeline_service import run_initial_audit
# get_db_session, AuditStatus, get_llm_function, and update_progress are
# project helpers imported elsewhere in this module.

logger = logging.getLogger(__name__)


@celery_app.task(
    name="run_audit_task",
    bind=True,
    autoretry_for=(Exception,),
    retry_kwargs={"max_retries": 5, "countdown": 60},
    soft_time_limit=3600,  # 60 minutes soft limit
    time_limit=4000,       # ~66 minutes hard limit
)
def run_audit_task(self, audit_id: int):
    """
    Main audit pipeline task.
    Runs target audit, competitor analysis, and generates fix plan.
    """
    logger.info(f"Starting audit task for audit_id={audit_id}")
    try:
        with get_db_session() as db:
            # Mark as RUNNING
            AuditService.update_audit_progress(
                db=db,
                audit_id=audit_id,
                progress=5,
                status=AuditStatus.RUNNING,
            )
            audit = AuditService.get_audit(db, audit_id)
            audit_url = str(audit.url)

        # Run pipeline (outside DB transaction)
        result = asyncio.run(
            run_initial_audit(
                url=audit_url,
                audit_id=audit_id,
                llm_function=get_llm_function(),
                progress_callback=update_progress,
                generate_report=False,  # Fast mode
                enable_llm_external_intel=True,
            )
        )

        # Save results
        with get_db_session() as db:
            asyncio.run(
                AuditService.set_audit_results(
                    db=db,
                    audit_id=audit_id,
                    target_audit=result.get("target_audit"),
                    external_intelligence=result.get("external_intelligence"),
                    competitor_audits=result.get("competitor_audits"),
                    fix_plan=result.get("fix_plan"),
                )
            )
            AuditService.update_audit_progress(
                db=db,
                audit_id=audit_id,
                progress=100,
                status=AuditStatus.COMPLETED,
            )
        logger.info(f"Audit {audit_id} completed successfully")
    except Exception as e:
        logger.error(f"Error in audit task: {e}", exc_info=True)
        with get_db_session() as db:
            AuditService.update_audit_progress(
                db=db,
                audit_id=audit_id,
                progress=0,
                status=AuditStatus.FAILED,
                error_message=str(e),
            )
        raise
Key Features:
- Automatic retries on failure (5 attempts)
- Soft/hard time limits
- Progress tracking with callbacks
- Database session management
- Error handling and logging
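The update_progress callback passed to run_initial_audit above is not shown in this file. A plausible sketch of its shape (the signature is an assumption): it opens its own short-lived session so progress writes never hold a transaction open across pipeline work:

def update_progress(audit_id: int, progress: int) -> None:
    # Hypothetical helper: persist pipeline progress in a fresh session
    with get_db_session() as db:
        AuditService.update_audit_progress(db=db, audit_id=audit_id, progress=progress)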
PDF Generation Task
@celery_app.task(name="generate_pdf_task")
def generate_pdf_task(audit_id: int, report_markdown: str):
"""
Generate PDF report from markdown content.
"""
logger.info(f"Starting PDF generation for audit_id={audit_id}")
with get_db_session() as db:
audit = AuditService.get_audit(db, audit_id)
try:
pdf_file_path = PDFService.create_from_audit(
audit=audit,
markdown_content=report_markdown
)
ReportService.create_report(
db=db,
audit_id=audit_id,
report_type="PDF",
file_path=pdf_file_path
)
logger.info(f"PDF generated: {pdf_file_path}")
except Exception as e:
logger.error(f"PDF generation failed: {e}", exc_info=True)
raise
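Dispatch example, assuming the report markdown is already stored on the audit row (the report_markdown field is the one written by the full-report task below):

with get_db_session() as db:
    audit = AuditService.get_audit(db, audit_id)
    # Queue the render without blocking the caller
    generate_pdf_task.delay(audit.id, audit.report_markdown)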
Full Report Generation Task
@celery_app.task(name="generate_full_report_task")
def generate_full_report_task(audit_id: int):
"""
Orchestrator task for complete PDF report:
1. Run PageSpeed if missing
2. Run GEO Tools if missing
3. Regenerate report with LLM
4. Generate PDF
"""
logger.info(f"Starting full report generation for audit_id={audit_id}")
# Step 1: PageSpeed
with get_db_session() as db:
audit = AuditService.get_audit(db, audit_id)
if not audit.pagespeed_data:
logger.info("Running PageSpeed analysis...")
pagespeed_data = asyncio.run(
PageSpeedService.analyze_both_strategies(
url=str(audit.url),
api_key=settings.GOOGLE_PAGESPEED_API_KEY
)
)
audit.pagespeed_data = pagespeed_data
db.commit()
# Step 2: GEO Tools
with get_db_session() as db:
audit = AuditService.get_audit(db, audit_id)
if not has_geo_data(audit):
logger.info("Running GEO tools...")
asyncio.run(run_geo_tools(db, audit_id))
# Step 3: Regenerate Report
with get_db_session() as db:
audit = AuditService.get_audit(db, audit_id)
complete_context = AuditService.get_complete_audit_context(db, audit_id)
new_report_markdown, new_fix_plan = asyncio.run(
PipelineService.generate_report(
target_audit=audit.target_audit,
external_intelligence=audit.external_intelligence,
pagespeed_data=audit.pagespeed_data,
keywords_data=complete_context['keywords'],
backlinks_data=complete_context['backlinks'],
rank_tracking_data=complete_context['rank_tracking'],
llm_visibility_data=complete_context['llm_visibility'],
llm_function=get_llm_function()
)
)
audit.report_markdown = new_report_markdown
audit.fix_plan = new_fix_plan
db.commit()
# Step 4: Generate PDF
with get_db_session() as db:
audit = AuditService.get_audit(db, audit_id)
pdf_file_path = PDFService.create_from_audit(
audit=audit,
markdown_content=audit.report_markdown
)
ReportService.create_report(
db=db,
audit_id=audit_id,
report_type="PDF",
file_path=pdf_file_path
)
logger.info(f"Full report generated for audit {audit_id}")
GEO Analysis Task
from urllib.parse import urlparse  # used to derive the domain below

@celery_app.task(
    name="run_geo_analysis_task",
    bind=True,
    autoretry_for=(Exception,),
    retry_kwargs={"max_retries": 3, "countdown": 60},
    soft_time_limit=900,  # 15 minutes
    time_limit=1000,
)
def run_geo_analysis_task(self, audit_id: int):
    """
    Run GEO tools: Keywords, Rankings, Backlinks, LLM Visibility.
    """
    logger.info(f"Starting GEO analysis for audit_id={audit_id}")
    with get_db_session() as db:
        audit = AuditService.get_audit(db, audit_id)
        domain = urlparse(str(audit.url)).netloc.replace("www.", "")
        brand_name = domain.split(".")[0]

        keywords = [brand_name]
        if audit.external_intelligence:
            category = audit.external_intelligence.get("category")
            if category:
                keywords.append(category)

        async def run_tools():
            rank_service = RankTrackerService(db)
            backlink_service = BacklinkService(db)
            visibility_service = LLMVisibilityService(db)
            await rank_service.track_rankings(audit_id, domain, keywords)
            await backlink_service.analyze_backlinks(audit_id, domain)
            await visibility_service.check_visibility(audit_id, brand_name, keywords)

        asyncio.run(run_tools())
    logger.info(f"GEO analysis completed for audit {audit_id}")
Running Workers
Local Development
The worker is automatically started by Docker Compose:

worker:
  build:
    context: .
    dockerfile: Dockerfile.backend.dev
  command: celery -A app.workers.tasks worker --loglevel=info --concurrency=1
  environment:
    DATABASE_URL: ${DATABASE_URL}
    REDIS_URL: redis://redis:6379/0
    CELERY_BROKER: redis://redis:6379/0
    CELERY_BACKEND: redis://redis:6379/1
Manual Worker Start
# Start worker
celery -A app.workers.tasks worker --loglevel=info --concurrency=2
# Start with autoreload (development)
watchmedo auto-restart --directory=./backend/app --pattern=*.py --recursive -- \
celery -A app.workers.tasks worker --loglevel=info
Production Worker
# Start a worker with higher concurrency using the prefork pool
celery -A app.workers.tasks worker \
  --loglevel=info \
  --concurrency=4 \
  --max-tasks-per-child=50 \
  --time-limit=4000
Dispatching Tasks
From FastAPI routes:

from app.workers.tasks import run_audit_task

@router.post("/audits/")
async def create_audit(audit_create: AuditCreate, db: Session = Depends(get_db)):
    # Create audit in DB
    audit = AuditService.create_audit(db, audit_create)
    # Dispatch Celery task (async)
    run_audit_task.delay(audit.id)
    return audit
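A companion endpoint can then report task state from the Redis result backend. A sketch (the route shape is an assumption; it presumes the create route also stores or returns the Celery task ID, available as run_audit_task.delay(...).id):

from celery.result import AsyncResult
from app.workers.celery_app import celery_app

@router.get("/tasks/{task_id}")
async def get_task_status(task_id: str):
    result = AsyncResult(task_id, app=celery_app)
    payload = {"task_id": task_id, "state": result.state}
    if result.failed():
        payload["error"] = str(result.result)  # .result holds the raised exception
    return payload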
Monitoring Workers
View Logs
# Docker Compose logs
docker compose -f docker-compose.dev.yml logs -f worker
# Filter for specific audit
docker compose -f docker-compose.dev.yml logs -f worker | grep "audit_id=123"
Celery Flower (Web UI)
Optional monitoring dashboard:

# Install
pip install flower

# Start Flower
celery -A app.workers.tasks flower --port=5555

# Visit http://localhost:5555
Redis CLI
Inspect the queue:

# Connect to Redis
docker compose exec redis redis-cli

# Check queue length
LLEN celery

# View pending tasks
LRANGE celery 0 -1
Error Handling
Automatic Retries
@celery_app.task(
    bind=True,
    autoretry_for=(ConnectionError, TimeoutError),
    retry_kwargs={"max_retries": 5, "countdown": 60},
)
def flaky_task(self, audit_id: int):
    try:
        # Task logic
        pass
    except SomeException as exc:
        # Manual retry with exponential backoff
        raise self.retry(exc=exc, countdown=2 ** self.request.retries)
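Celery (4.2+) can also apply exponential backoff declaratively, which removes the manual countdown arithmetic (flaky_task_v2 is an illustrative name):

@celery_app.task(
    bind=True,
    autoretry_for=(ConnectionError, TimeoutError),
    retry_backoff=True,      # delays grow 1s, 2s, 4s, ...
    retry_backoff_max=600,   # cap each delay at 10 minutes
    retry_jitter=True,       # randomize delays to avoid thundering herds
    max_retries=5,
)
def flaky_task_v2(self, audit_id: int):
    pass  # Task logic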
Task Failure Callbacks
@celery_app.task(name="run_audit_task")
def run_audit_task(audit_id: int):
try:
# Task logic
pass
except Exception as e:
# Mark audit as failed
with get_db_session() as db:
AuditService.update_audit_progress(
db=db,
audit_id=audit_id,
status=AuditStatus.FAILED,
error_message=str(e)
)
raise
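An alternative is to centralize failure handling in a custom base class; Celery calls on_failure once retries are exhausted. A sketch (class and task names are illustrative):

from celery import Task

class AuditTask(Task):
    def on_failure(self, exc, task_id, args, kwargs, einfo):
        # args[0] is audit_id for tasks following the signature above
        audit_id = args[0] if args else kwargs.get("audit_id")
        logger.error(f"Task {task_id} failed for audit {audit_id}: {exc}")

@celery_app.task(base=AuditTask, name="run_audit_task_with_base")
def run_audit_task_with_base(audit_id: int):
    pass  # Task logic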
Best Practices
Use database sessions carefully
Don’t keep DB sessions open for the entire task. Open/close sessions as needed.
# Good
def task():
    with get_db_session() as db:
        data = fetch_data(db)
    result = heavy_computation(data)  # Outside session
    with get_db_session() as db:
        save_result(db, result)
Set appropriate time limits
Use soft_time_limit (graceful) and time_limit (hard kill).

@celery_app.task(
    soft_time_limit=600,  # 10 minutes warning
    time_limit=660,       # 11 minutes hard kill
)
Use task routing
Route heavy tasks to dedicated workers.
celery_app.conf.task_routes = {
    "generate_pdf_task": {"queue": "heavy"},
    "run_audit_task": {"queue": "default"},
}
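A dedicated worker then consumes the heavy queue (celery -A app.workers.tasks worker -Q heavy), and the route can also be overridden for a single dispatch:

# One-off routing override at dispatch time
generate_pdf_task.apply_async(args=[audit_id, report_markdown], queue="heavy")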
Monitor memory usage
Restart workers periodically to prevent memory leaks.
celery_app.conf.worker_max_tasks_per_child = 50
Troubleshooting
Task Not Starting
- Check the worker is running:
  docker compose ps worker
- Check the Redis connection:
  docker compose logs redis
- Verify the task is in the queue:
  docker compose exec redis redis-cli LLEN celery
Task Stuck
- Check worker logs:
  docker compose logs -f worker
- Inspect task state:
  from celery.result import AsyncResult
  result = AsyncResult(task_id)
  print(result.state, result.info)
- Restart the worker:
  docker compose restart worker
High Memory Usage
Increase restart frequency:

celery_app.conf.worker_max_tasks_per_child = 25  # Lower value
Next Steps
- Backend Services: Learn about the services called by workers
- Testing: Test Celery tasks
- Local Setup: Configure Redis and workers locally
- Contributing: Contribute to task definitions