Overview

LatentGEO uses Celery for asynchronous task processing:
  • Audit pipeline: Long-running SEO/GEO analysis
  • PDF generation: Heavy report rendering
  • GEO tools: Keywords, rankings, backlinks, LLM visibility
  • PageSpeed analysis: Performance metrics
  • Article generation: Batch content creation

Architecture

FastAPI → Celery Task Queue (Redis broker) → Worker Processes → Database/Storage

Redis acts as both the message broker and the result backend.

Celery Configuration

Celery App

File: backend/app/workers/celery_app.py
from celery import Celery
from app.core.config import settings

celery_app = Celery(
    "latentgeo",
    broker=settings.CELERY_BROKER,
    backend=settings.CELERY_BACKEND,
    include=["app.workers.tasks"]
)

celery_app.conf.update(
    task_serializer="json",
    accept_content=["json"],
    result_serializer="json",
    timezone="UTC",
    enable_utc=True,
    task_track_started=True,
    task_time_limit=4000,  # 66 minutes hard limit
    worker_max_tasks_per_child=50,  # Restart worker after 50 tasks
)

Environment Variables

# Redis URLs
REDIS_URL=redis://redis:6379/0
CELERY_BROKER=redis://redis:6379/0
CELERY_BACKEND=redis://redis:6379/1

# Worker concurrency
CELERYD_CONCURRENCY=1
Use separate Redis databases for broker (0) and backend (1) to avoid conflicts.
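The settings.CELERY_BROKER and settings.CELERY_BACKEND values used in the Celery app above come from these variables. A minimal sketch of how app/core/config.py might expose them (field names are inferred from usage; the actual file may differ):
# Hypothetical sketch of app/core/config.py; not the project's actual file
from pydantic_settings import BaseSettings

class Settings(BaseSettings):
    REDIS_URL: str = "redis://redis:6379/0"
    CELERY_BROKER: str = "redis://redis:6379/0"   # DB 0: message broker
    CELERY_BACKEND: str = "redis://redis:6379/1"  # DB 1: result backend

settings = Settings()  # matching environment variables override these defaults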

Task Definitions

Main Audit Task

File: backend/app/workers/tasks.py
import asyncio
import logging

from app.workers.celery_app import celery_app
from app.services.audit_service import AuditService
from app.services.pipeline_service import run_initial_audit
# Also used below (assumed to live elsewhere in the codebase):
# get_db_session, AuditStatus, get_llm_function, update_progress

logger = logging.getLogger(__name__)
@celery_app.task(
    name="run_audit_task",
    bind=True,
    autoretry_for=(Exception,),
    retry_kwargs={"max_retries": 5, "countdown": 60},
    soft_time_limit=3600,  # 60 minutes soft limit
    time_limit=4000,       # 66 minutes hard limit
)
def run_audit_task(self, audit_id: int):
    """
    Main audit pipeline task.
    Runs target audit, competitor analysis, and generates fix plan.
    """
    logger.info(f"Starting audit task for audit_id={audit_id}")
    
    try:
        with get_db_session() as db:
            # Mark as RUNNING
            AuditService.update_audit_progress(
                db=db,
                audit_id=audit_id,
                progress=5,
                status=AuditStatus.RUNNING
            )
            
            audit = AuditService.get_audit(db, audit_id)
            audit_url = str(audit.url)
        
        # Run pipeline (outside DB transaction)
        result = asyncio.run(
            run_initial_audit(
                url=audit_url,
                audit_id=audit_id,
                llm_function=get_llm_function(),
                progress_callback=update_progress,
                generate_report=False,  # Fast mode
                enable_llm_external_intel=True
            )
        )
        
        # Save results
        with get_db_session() as db:
            asyncio.run(
                AuditService.set_audit_results(
                    db=db,
                    audit_id=audit_id,
                    target_audit=result.get("target_audit"),
                    external_intelligence=result.get("external_intelligence"),
                    competitor_audits=result.get("competitor_audits"),
                    fix_plan=result.get("fix_plan"),
                )
            )
            
            AuditService.update_audit_progress(
                db=db,
                audit_id=audit_id,
                progress=100,
                status=AuditStatus.COMPLETED
            )
        
        logger.info(f"Audit {audit_id} completed successfully")
        
    except Exception as e:
        logger.error(f"Error in audit task: {e}", exc_info=True)
        
        with get_db_session() as db:
            AuditService.update_audit_progress(
                db=db,
                audit_id=audit_id,
                progress=0,
                status=AuditStatus.FAILED,
                error_message=str(e)
            )
        raise
Key Features:
  • Automatic retries on failure (5 attempts)
  • Soft/hard time limits
  • Progress tracking via the update_progress callback (sketched below)
  • Database session management
  • Error handling and logging
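
The update_progress callback passed into the pipeline is not shown above. A minimal sketch of what it could look like (the signature is an assumption):
# Hypothetical progress callback; the real signature may differ
def update_progress(audit_id: int, progress: int) -> None:
    """Persist pipeline progress so API clients can poll it."""
    with get_db_session() as db:
        AuditService.update_audit_progress(
            db=db,
            audit_id=audit_id,
            progress=progress,
            status=AuditStatus.RUNNING,
        )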

PDF Generation Task

@celery_app.task(name="generate_pdf_task")
def generate_pdf_task(audit_id: int, report_markdown: str):
    """
    Generate PDF report from markdown content.
    """
    logger.info(f"Starting PDF generation for audit_id={audit_id}")
    
    with get_db_session() as db:
        audit = AuditService.get_audit(db, audit_id)
        
        try:
            pdf_file_path = PDFService.create_from_audit(
                audit=audit,
                markdown_content=report_markdown
            )
            
            ReportService.create_report(
                db=db,
                audit_id=audit_id,
                report_type="PDF",
                file_path=pdf_file_path
            )
            
            logger.info(f"PDF generated: {pdf_file_path}")
        except Exception as e:
            logger.error(f"PDF generation failed: {e}", exc_info=True)
            raise

Full Report Generation Task

@celery_app.task(name="generate_full_report_task")
def generate_full_report_task(audit_id: int):
    """
    Orchestrator task for complete PDF report:
    1. Run PageSpeed if missing
    2. Run GEO Tools if missing
    3. Regenerate report with LLM
    4. Generate PDF
    """
    logger.info(f"Starting full report generation for audit_id={audit_id}")
    
    # Step 1: PageSpeed
    with get_db_session() as db:
        audit = AuditService.get_audit(db, audit_id)
        
        if not audit.pagespeed_data:
            logger.info("Running PageSpeed analysis...")
            pagespeed_data = asyncio.run(
                PageSpeedService.analyze_both_strategies(
                    url=str(audit.url),
                    api_key=settings.GOOGLE_PAGESPEED_API_KEY
                )
            )
            audit.pagespeed_data = pagespeed_data
            db.commit()
    
    # Step 2: GEO Tools
    with get_db_session() as db:
        audit = AuditService.get_audit(db, audit_id)
        
        if not has_geo_data(audit):
            logger.info("Running GEO tools...")
            asyncio.run(run_geo_tools(db, audit_id))
    
    # Step 3: Regenerate Report
    with get_db_session() as db:
        audit = AuditService.get_audit(db, audit_id)
        complete_context = AuditService.get_complete_audit_context(db, audit_id)
        
        new_report_markdown, new_fix_plan = asyncio.run(
            PipelineService.generate_report(
                target_audit=audit.target_audit,
                external_intelligence=audit.external_intelligence,
                pagespeed_data=audit.pagespeed_data,
                keywords_data=complete_context['keywords'],
                backlinks_data=complete_context['backlinks'],
                rank_tracking_data=complete_context['rank_tracking'],
                llm_visibility_data=complete_context['llm_visibility'],
                llm_function=get_llm_function()
            )
        )
        
        audit.report_markdown = new_report_markdown
        audit.fix_plan = new_fix_plan
        db.commit()
    
    # Step 4: Generate PDF
    with get_db_session() as db:
        audit = AuditService.get_audit(db, audit_id)
        pdf_file_path = PDFService.create_from_audit(
            audit=audit,
            markdown_content=audit.report_markdown
        )
        
        ReportService.create_report(
            db=db,
            audit_id=audit_id,
            report_type="PDF",
            file_path=pdf_file_path
        )
    
    logger.info(f"Full report generated for audit {audit_id}")

GEO Analysis Task

@celery_app.task(
    name="run_geo_analysis_task",
    bind=True,
    autoretry_for=(Exception,),
    retry_kwargs={"max_retries": 3, "countdown": 60},
    soft_time_limit=900,  # 15 minutes
    time_limit=1000,
)
def run_geo_analysis_task(self, audit_id: int):
    """
    Run GEO tools: Keywords, Rankings, Backlinks, LLM Visibility.
    """
    logger.info(f"Starting GEO analysis for audit_id={audit_id}")
    
    with get_db_session() as db:
        audit = AuditService.get_audit(db, audit_id)
        domain = urlparse(str(audit.url)).netloc.replace("www.", "")
        brand_name = domain.split(".")[0]
        
        keywords = [brand_name]
        if audit.external_intelligence:
            category = audit.external_intelligence.get("category")
            if category:
                keywords.append(category)
        
        async def run_tools():
            rank_service = RankTrackerService(db)
            backlink_service = BacklinkService(db)
            visibility_service = LLMVisibilityService(db)
            
            await rank_service.track_rankings(audit_id, domain, keywords)
            await backlink_service.analyze_backlinks(audit_id, domain)
            await visibility_service.check_visibility(audit_id, brand_name, keywords)
        
        asyncio.run(run_tools())
        logger.info(f"GEO analysis completed for audit {audit_id}")

Running Workers

Local Development

The worker is automatically started by Docker Compose:
worker:
  build:
    context: .
    dockerfile: Dockerfile.backend.dev
  command: celery -A app.workers.tasks worker --loglevel=info --concurrency=1
  environment:
    DATABASE_URL: ${DATABASE_URL}
    REDIS_URL: redis://redis:6379/0
    CELERY_BROKER: redis://redis:6379/0
    CELERY_BACKEND: redis://redis:6379/1
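
Additional worker containers can be started with Docker Compose's --scale flag (assuming the service doesn't pin a container_name), e.g. docker compose -f docker-compose.dev.yml up -d --scale worker=2.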

Manual Worker Start

# Start worker
celery -A app.workers.tasks worker --loglevel=info --concurrency=2

# Start with autoreload (development; requires watchdog: pip install "watchdog[watchmedo]")
watchmedo auto-restart --directory=./backend/app --pattern="*.py" --recursive -- \
  celery -A app.workers.tasks worker --loglevel=info

Production Worker

# Start a worker with 4 prefork child processes
celery -A app.workers.tasks worker \
  --loglevel=info \
  --concurrency=4 \
  --max-tasks-per-child=50 \
  --time-limit=4000

Dispatching Tasks

From FastAPI routes:
from app.workers.tasks import run_audit_task

@router.post("/audits/")
async def create_audit(audit_create: AuditCreate, db: Session = Depends(get_db)):
    # Create audit in DB
    audit = AuditService.create_audit(db, audit_create)
    
    # Dispatch the Celery task (returns immediately; runs in a worker)
    run_audit_task.delay(audit.id)
    
    return audit
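
Clients can then poll the audit's progress and status fields, which the worker updates as it runs. A hedged sketch of such a read endpoint (the route shape is an assumption, not necessarily the actual API):
# Hypothetical polling endpoint
@router.get("/audits/{audit_id}")
async def get_audit_status(audit_id: int, db: Session = Depends(get_db)):
    # The worker writes progress/status via AuditService.update_audit_progress
    return AuditService.get_audit(db, audit_id)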

Monitoring Workers

View Logs

# Docker Compose logs
docker compose -f docker-compose.dev.yml logs -f worker

# Filter for specific audit
docker compose -f docker-compose.dev.yml logs -f worker | grep "audit_id=123"

Celery Flower (Web UI)

Optional monitoring dashboard:
# Install
pip install flower

# Start Flower
celery -A app.workers.tasks flower --port=5555

# Visit http://localhost:5555

Redis CLI

Inspect queue:
# Connect to Redis
docker compose exec redis redis-cli

# Check queue length
LLEN celery

# View pending tasks
LRANGE celery 0 -1

Error Handling

Automatic Retries

@celery_app.task(
    bind=True,
    autoretry_for=(ConnectionError, TimeoutError),
    retry_kwargs={"max_retries": 5, "countdown": 60},
)
def flaky_task(self, audit_id: int):
    try:
        pass  # Task logic
    except RuntimeError as exc:
        # Manual retry with exponential backoff, for exceptions
        # not covered by autoretry_for
        raise self.retry(exc=exc, countdown=2 ** self.request.retries)
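
Celery can also apply exponential backoff declaratively, without a manual self.retry call:
@celery_app.task(
    bind=True,
    autoretry_for=(ConnectionError, TimeoutError),
    retry_backoff=True,       # exponential delays: 1s, 2s, 4s, ...
    retry_backoff_max=600,    # cap each delay at 10 minutes
    retry_jitter=True,        # randomize delays to avoid thundering herds
    retry_kwargs={"max_retries": 5},
)
def flaky_task_with_backoff(self, audit_id: int):
    ...  # task logic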

Handling Task Failures

@celery_app.task(name="run_audit_task")
def run_audit_task(audit_id: int):
    try:
        # Task logic
        pass
    except Exception as e:
        # Mark audit as failed
        with get_db_session() as db:
            AuditService.update_audit_progress(
                db=db,
                audit_id=audit_id,
                status=AuditStatus.FAILED,
                error_message=str(e)
            )
        raise

Best Practices

Don’t keep DB sessions open for the entire task. Open/close sessions as needed.
# Good
def task():
    with get_db_session() as db:
        data = fetch_data(db)
    
    result = heavy_computation(data)  # Outside session
    
    with get_db_session() as db:
        save_result(db, result)
Use soft_time_limit (graceful) and time_limit (hard kill).
@celery_app.task(
    soft_time_limit=600,   # 10 minutes: SoftTimeLimitExceeded is raised in the task
    time_limit=660,        # 11 minutes: the worker process is killed
)
def long_task():
    ...
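The soft limit raises celery.exceptions.SoftTimeLimitExceeded inside the task, which is the hook for graceful cleanup before the hard limit kills the process. A minimal sketch (do_work and cleanup are placeholders):
from celery.exceptions import SoftTimeLimitExceeded

@celery_app.task(soft_time_limit=600, time_limit=660)
def long_task_with_cleanup():
    try:
        do_work()  # placeholder for the long-running section
    except SoftTimeLimitExceeded:
        cleanup()  # placeholder: release locks, persist partial results
        raise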
Route heavy tasks to dedicated workers.
celery_app.conf.task_routes = {
    'generate_pdf_task': {'queue': 'heavy'},
    'run_audit_task': {'queue': 'default'},
}
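A dedicated worker can then be started to consume only the heavy queue:
celery -A app.workers.tasks worker -Q heavy --loglevel=info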
Restart workers periodically to prevent memory leaks.
celery_app.conf.worker_max_tasks_per_child = 50

Troubleshooting

Task Not Starting

  1. Check worker is running:
    docker compose ps worker
    
  2. Check Redis connection:
    docker compose logs redis
    
  3. Verify task is in queue:
    docker compose exec redis redis-cli LLEN celery
    

Task Stuck

  1. Check worker logs:
    docker compose logs -f worker
    
  2. Inspect task state:
    from celery.result import AsyncResult
    from app.workers.celery_app import celery_app
    result = AsyncResult(task_id, app=celery_app)
    print(result.state, result.info)
    
  3. Restart worker:
    docker compose restart worker
    

High Memory Usage

Increase restart frequency:
celery_app.conf.worker_max_tasks_per_child = 25  # Lower value
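
Celery can also recycle worker children by memory footprint instead of task count (the value is in kilobytes):
celery_app.conf.worker_max_memory_per_child = 500_000  # ~500 MB; recycle child above this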

Next Steps

  • Backend Services: learn about the services called by workers
  • Testing: test Celery tasks
  • Local Setup: configure Redis and workers locally
  • Contributing: contribute to task definitions