Skip to main content

Background Workers

GAIA uses ARQ (Async Redis Queue) for background task processing and scheduled jobs.

ARQ Overview

ARQ is a fast, Redis-based task queue for Python with:
  • Async/await support: Native asyncio integration
  • Cron scheduling: Built-in scheduled job support
  • Job retry: Automatic retry with exponential backoff
  • Health checks: Monitor worker health
  • Job cancellation: Ability to abort running jobs

Worker Configuration

Worker Settings

from typing import Any, Callable, Coroutine, Optional

from arq.connections import RedisSettings

from app.config.settings import settings

class WorkerSettings:
    """
    ARQ worker settings configuration.

    Class attributes are read by the ``arq`` CLI to configure the worker
    process. ``functions``, ``cron_jobs`` and the lifecycle hooks start
    empty/None here and are populated at import time by the worker
    registration module.
    """
    # Connection settings parsed from the configured Redis DSN.
    redis_settings = RedisSettings.from_dsn(settings.REDIS_URL)

    # Task functions (each is an async callable returning a result string)
    functions: list[Callable[..., Coroutine[Any, Any, str]]] = []

    # Cron jobs
    cron_jobs: list[Any] = []

    # Lifecycle functions (receive the ARQ context dict)
    on_startup: Optional[Callable[[dict], Coroutine[Any, Any, None]]] = None
    on_shutdown: Optional[Callable[[dict], Coroutine[Any, Any, None]]] = None

    # Performance settings
    max_jobs: int = 10  # max jobs a single worker runs concurrently
    job_timeout: int = 1800  # 30 minutes
    keep_result: int = 0  # Don't keep results in Redis
    log_results: bool = True
    health_check_interval: int = 30  # seconds
    health_check_key: str = "arq:health"
    allow_abort_jobs: bool = True  # permit Job.abort() to cancel running jobs

Worker Registration

from arq import cron
from app.workers.config.worker_settings import WorkerSettings
from app.workers.lifecycle import shutdown, startup
from app.workers.tasks import (
    check_inactive_users,
    cleanup_expired_reminders,
    cleanup_stuck_personalization,
    execute_workflow_by_id,
    generate_workflow_steps,
    process_gmail_emails_to_memory,
    process_personalization_task,
    process_reminder,
    process_workflow_generation_task,
    store_memories_batch,
    regenerate_workflow_steps,
)

# Configure the worker settings with all task functions.
# Every function listed here (including cron targets) must be registered
# so the worker can resolve it by name when a job is dequeued.
WorkerSettings.functions = [
    process_reminder,
    cleanup_expired_reminders,
    check_inactive_users,
    process_workflow_generation_task,
    execute_workflow_by_id,
    regenerate_workflow_steps,
    generate_workflow_steps,
    process_gmail_emails_to_memory,
    process_personalization_task,
    store_memories_batch,
    cleanup_stuck_personalization,
]

# Configure scheduled cron jobs (times are interpreted by the worker's clock)
WorkerSettings.cron_jobs = [
    cron(
        cleanup_expired_reminders,
        hour=0,  # At midnight
        minute=0,
        second=0,
    ),
    cron(
        check_inactive_users,
        hour=9,  # At 9 AM
        minute=0,
        second=0,
    ),
    cron(
        cleanup_stuck_personalization,
        minute={0, 30},  # Every 30 minutes (at :00 and :30)
        second=0,
    ),
]

# Set lifecycle hooks (run once per worker process, not per job)
WorkerSettings.on_startup = startup
WorkerSettings.on_shutdown = shutdown

Writing Background Tasks

Basic Task

from app.config.loggers import arq_worker_logger as logger

async def process_reminder(ctx: dict, reminder_id: str) -> str:
    """
    Execute a single reminder task.

    Args:
        ctx: ARQ context (contains redis pool, job info)
        reminder_id: ID of the reminder to process

    Returns:
        Human-readable success message.

    Raises:
        Exception: re-raised after logging so ARQ can apply retries.
    """
    logger.info(f"Processing reminder task: {reminder_id}")

    try:
        await reminder_scheduler.process_task_execution(reminder_id)
    except Exception as e:
        logger.error(f"Failed to process reminder {reminder_id}: {str(e)}")
        raise

    result = f"Successfully processed reminder {reminder_id}"
    logger.info(result)
    return result

Task with Database Access

from app.db.mongodb.collections import reminders_collection
from datetime import datetime, timedelta, timezone

async def cleanup_expired_reminders(ctx: dict) -> str:
    """
    Scheduled task: purge reminders finished more than 30 days ago.

    Args:
        ctx: ARQ context

    Returns:
        Cleanup result message
    """
    logger.info("Running cleanup of expired reminders")

    try:
        # Anything completed/cancelled and untouched for 30 days is stale.
        cutoff_date = datetime.now(timezone.utc) - timedelta(days=30)
        stale_filter = {
            "status": {"$in": ["completed", "cancelled"]},
            "updated_at": {"$lt": cutoff_date},
        }
        result = await reminders_collection.delete_many(stale_filter)

        message = f"Cleaned up {result.deleted_count} expired reminders"
        logger.info(message)
        return message

    except Exception as e:
        error_msg = f"Failed to cleanup expired reminders: {str(e)}"
        logger.error(error_msg)
        raise

Complex Task with Multiple Operations

from app.services.memory_service import store_memories
from app.services.notification_service import send_notification

async def process_gmail_emails_to_memory(ctx: dict, user_id: str) -> str:
    """
    Fetch a user's recent Gmail messages and persist them as memories.

    Args:
        ctx: ARQ context
        user_id: User ID to process emails for

    Returns:
        Processing result message
    """
    logger.info(f"Processing Gmail emails for user {user_id}")

    try:
        emails = await fetch_recent_emails(user_id, limit=100)
        logger.info(f"Fetched {len(emails)} emails for user {user_id}")

        # One memory record per email, tagged with its Gmail metadata.
        memories = [
            {
                "text": f"Email from {email['from']}: {email['subject']}",
                "metadata": {
                    "source": "gmail",
                    "email_id": email["id"],
                    "timestamp": email["date"],
                },
            }
            for email in emails
        ]

        result = await store_memories(user_id, memories)

        # Tell the user the batch finished.
        await send_notification(
            user_id=user_id,
            title="Email Processing Complete",
            message=f"Processed {len(emails)} emails"
        )

        message = f"Processed {len(emails)} emails for user {user_id}"
        logger.info(message)
        return message

    except Exception as e:
        error_msg = f"Failed to process Gmail emails: {str(e)}"
        logger.error(error_msg)
        raise

Enqueuing Tasks

From API Endpoints

from fastapi import APIRouter, Depends
from app.db.redis import get_arq_pool
from arq.connections import ArqRedis

router = APIRouter()

@router.post("/reminders/{reminder_id}/process")
async def trigger_reminder_processing(
    reminder_id: str,
    user: dict = Depends(get_current_user),
):
    """
    Queue background processing for a single reminder.
    """
    # Grab the shared ARQ Redis pool and enqueue by registered task name.
    pool: ArqRedis = await get_arq_pool()
    queued_job = await pool.enqueue_job("process_reminder", reminder_id)

    return {
        "job_id": queued_job.job_id,
        "reminder_id": reminder_id,
        "status": "queued"
    }

With Scheduled Time

from datetime import datetime, timedelta, timezone

@router.post("/reminders")
async def create_reminder(
    reminder_data: ReminderCreate,
    user: dict = Depends(get_current_user),
):
    """
    Persist a new reminder and defer its processing job until remind_at.
    """
    # Create reminder in database first so the job has a record to act on.
    reminder = await create_reminder_in_db(user["user_id"], reminder_data)

    # Defer execution until the reminder's scheduled time.
    run_at = reminder_data.remind_at
    pool: ArqRedis = await get_arq_pool()
    queued_job = await pool.enqueue_job(
        "process_reminder",
        reminder["id"],
        _defer_until=run_at,
    )

    return {
        "reminder": reminder,
        "job_id": queued_job.job_id,
        "scheduled_for": run_at.isoformat()
    }

With Retry Configuration

@router.post("/workflows/{workflow_id}/execute")
async def execute_workflow(
    workflow_id: str,
    user: dict = Depends(get_current_user),
):
    """
    Execute workflow as a background job.

    Retries are governed by the worker's ``max_tries`` setting; ARQ
    automatically retries failed jobs with exponential backoff.
    """
    arq: ArqRedis = await get_arq_pool()

    # NOTE: do NOT pass ``_job_try=3`` here to "retry up to 3 times" —
    # ``_job_try`` marks the *current* attempt number, so setting it to 3
    # makes ARQ believe two attempts already failed and REDUCES the
    # remaining retries. Retry limits belong in the worker's ``max_tries``
    # configuration.
    job = await arq.enqueue_job(
        "execute_workflow_by_id",
        workflow_id,
        user["user_id"],
    )

    return {
        "workflow_id": workflow_id,
        "job_id": job.job_id,
        "status": "queued"
    }

Scheduled Jobs (Cron)

Cron Syntax

from arq import cron

# Every day at midnight
cron(cleanup_expired_reminders, hour=0, minute=0, second=0)

# Every day at 9 AM
cron(check_inactive_users, hour=9, minute=0, second=0)

# Every 30 minutes
cron(cleanup_stuck_personalization, minute={0, 30}, second=0)

# Every hour
cron(hourly_task, minute=0)

# Every 5 minutes
cron(frequent_task, minute=set(range(0, 60, 5)))

# Weekdays at 8 AM — arq's ``weekday`` uses ``datetime.weekday()``
# numbering (Monday=0 ... Sunday=6), so Mon–Fri is {0, 1, 2, 3, 4};
# {1, 2, 3, 4, 5} would fire Tuesday through Saturday.
cron(weekday_task, hour=8, minute=0, weekday={0, 1, 2, 3, 4})

# First day of month at midnight
cron(monthly_task, day=1, hour=0, minute=0)

Example Scheduled Tasks

from app.db.mongodb.collections import users_collection
from datetime import datetime, timedelta, timezone

async def check_inactive_users(ctx: dict) -> str:
    """
    Check for inactive users and send engagement emails.

    Runs daily at 9 AM.
    """
    logger.info("Checking for inactive users")

    try:
        # Users idle for a week who have not already been contacted.
        cutoff_date = datetime.now(timezone.utc) - timedelta(days=7)
        query = {
            "last_activity": {"$lt": cutoff_date},
            "inactive_email_sent": {"$ne": True},
        }
        inactive_users = await users_collection.find(query).to_list(None)

        for user in inactive_users:
            await send_engagement_email(user["email"])
            # Flag the user so the next daily run skips them.
            await users_collection.update_one(
                {"_id": user["_id"]},
                {"$set": {"inactive_email_sent": True}},
            )

        message = f"Contacted {len(inactive_users)} inactive users"
        logger.info(message)
        return message

    except Exception as e:
        error_msg = f"Failed to check inactive users: {str(e)}"
        logger.error(error_msg)
        raise

Worker Lifecycle

Startup Hook

from app.workers.lifecycle import startup
from app.core.lazy_loader import providers

async def startup(ctx: dict) -> None:
    """
    Worker startup hook: warm up the providers the tasks depend on.

    Args:
        ctx: ARQ context
    """
    logger.info("ARQ worker starting up")

    try:
        # Initialize connections eagerly so the first job doesn't pay
        # the cold-start cost.
        for provider_name in ("mongodb", "redis", "chroma"):
            await providers.aget(provider_name)

        logger.info("ARQ worker startup complete")

    except Exception as e:
        logger.error(f"Error during worker startup: {e}")
        raise

Shutdown Hook

from app.workers.lifecycle import shutdown

async def shutdown(ctx: dict) -> None:
    """
    Worker shutdown hook: close connections and release resources.

    Args:
        ctx: ARQ context
    """
    logger.info("ARQ worker shutting down")

    try:
        await close_mongodb()
        await close_postgresql()
        await close_redis()
        logger.info("ARQ worker shutdown complete")
    except Exception as e:
        # Best-effort cleanup: log but never fail the shutdown path.
        logger.error(f"Error during worker shutdown: {e}")

Running the Worker

Development

# Run worker with hot reload
nx worker api

# Or directly with ARQ
arq app.worker.WorkerSettings

Production

# Drain all queued jobs then exit (burst mode — useful for one-shot/cron-driven runs)
arq app.worker.WorkerSettings --burst

# Raise the per-worker concurrent job limit (not the number of worker processes)
arq app.worker.WorkerSettings --max-jobs 20

Docker

# Dockerfile for worker
FROM python:3.11-slim

WORKDIR /app
COPY . .

RUN pip install -r requirements.txt

CMD ["arq", "app.worker.WorkerSettings"]

Monitoring Workers

Health Checks

from app.db.redis import redis_client
import json

async def get_worker_health(health_key: str = "arq:health") -> dict:
    """
    Get worker health status from Redis.

    Args:
        health_key: Redis key the worker writes its health report under;
            must match the worker's ``health_check_key`` setting.

    Returns:
        Parsed health payload, ``{"status": "unknown"}`` when no report
        exists, or ``{"status": "ok", "raw": <text>}`` when the payload
        is not JSON.
    """
    health_data = await redis_client.get(health_key)

    if not health_data:
        return {"status": "unknown"}

    try:
        return json.loads(health_data)
    except (json.JSONDecodeError, TypeError, ValueError):
        # ARQ's built-in health check writes a plain-text summary
        # (e.g. "Mar-01 12:00:00 j_complete=5 ..."), not JSON —
        # surface it rather than crashing the health endpoint.
        raw = health_data.decode() if isinstance(health_data, bytes) else health_data
        return {"status": "ok", "raw": raw}

Job Status

from arq.jobs import Job
from app.db.redis import get_arq_pool

async def get_job_status(job_id: str) -> dict:
    """
    Get status of a background job without blocking or raising.

    Args:
        job_id: ARQ job identifier returned by ``enqueue_job``.

    Returns:
        Dict with the job's status, its result (None until the job
        succeeds), and a ``success`` flag (None while still pending).
    """
    arq = await get_arq_pool()
    job = Job(job_id, arq)

    status = await job.status()

    # ``job.result()`` waits for the job to finish and re-raises the
    # job's own exception on failure — wrong for a status probe.
    # ``result_info()`` returns the stored JobResult (or None) instead.
    info = await job.result_info()
    result = info.result if info and info.success else None

    return {
        "job_id": job_id,
        "status": status,
        "result": result,
        "success": info.success if info else None,
    }

Best Practices

  1. Idempotent tasks: Tasks should be safe to retry
  2. Timeout handling: Set appropriate timeouts for long-running tasks
  3. Error logging: Log errors with context for debugging
  4. Progress tracking: Store progress in Redis for long tasks
  5. Resource cleanup: Clean up resources in finally blocks
  6. Batch processing: Process items in batches for efficiency
  7. Rate limiting: Respect external API rate limits
  8. Dead letter queue: Handle permanently failed jobs
  9. Monitoring: Track job success/failure rates
  10. Testing: Write tests for task logic separately from ARQ

Example: Batch Processing Task

async def store_memories_batch(ctx: dict, user_id: str, memory_ids: list[str]) -> str:
    """
    Store memories in batches for better performance.

    Args:
        ctx: ARQ context (``ctx["redis"]`` is the worker's redis pool)
        user_id: User ID
        memory_ids: List of memory IDs to process

    Returns:
        Summary message with processed/failed counts.
    """
    logger.info(f"Processing {len(memory_ids)} memories for user {user_id}")

    batch_size = 50
    total_processed = 0
    total_failed = 0

    try:
        # Process in fixed-size batches
        for i in range(0, len(memory_ids), batch_size):
            batch = memory_ids[i:i + batch_size]

            try:
                await process_memory_batch(user_id, batch)
                total_processed += len(batch)

                # Publish progress so API endpoints can report it.
                progress = (i + len(batch)) / len(memory_ids) * 100
                # redis-py's ``set`` takes ``ex=`` (seconds) for expiry;
                # ``expire=`` is not a valid keyword and raises TypeError.
                await ctx["redis"].set(
                    f"memory_batch:{user_id}:progress",
                    f"{progress:.1f}",
                    ex=3600,
                )

            except Exception as e:
                # A failed batch is counted but does not abort the
                # remaining batches.
                logger.error(f"Failed to process batch: {e}")
                total_failed += len(batch)

        message = f"Processed {total_processed}/{len(memory_ids)} memories (failed: {total_failed})"
        logger.info(message)
        return message

    except Exception as e:
        error_msg = f"Failed to process memories: {str(e)}"
        logger.error(error_msg)
        raise

Build docs developers (and LLMs) love