Background Workers
GAIA uses ARQ (Async Redis Queue) for background task processing and scheduled jobs.

ARQ Overview
ARQ is a fast, Redis-based task queue for Python with:
- Async/await support: Native asyncio integration
- Cron scheduling: Built-in scheduled job support
- Job retry: Automatic retry with exponential backoff
- Health checks: Monitor worker health
- Job cancellation: Ability to abort running jobs
Worker Configuration
Worker Settings
from typing import Any, Callable, Coroutine, Optional

from arq.connections import RedisSettings

from app.config.settings import settings
class WorkerSettings:
    """
    ARQ worker settings configuration.

    These class attributes are read by the ``arq`` CLI at startup.
    ``functions``, ``cron_jobs`` and the lifecycle hooks are populated at
    registration time (see the worker registration module).
    """

    redis_settings = RedisSettings.from_dsn(settings.REDIS_URL)

    # Task functions registered with the worker (filled in at import time).
    functions: list[Callable[..., Coroutine[Any, Any, str]]] = []

    # Scheduled cron jobs (filled in at import time).
    cron_jobs: list[Any] = []

    # Lifecycle hooks; `X | None` keeps the annotation style consistent with
    # the builtin generics (`list[...]`) already used above.
    on_startup: Callable[[dict], Coroutine[Any, Any, None]] | None = None
    on_shutdown: Callable[[dict], Coroutine[Any, Any, None]] | None = None

    # Performance settings
    max_jobs = 10  # max concurrent jobs per worker process
    job_timeout = 1800  # 30 minutes
    keep_result = 0  # Don't keep results in Redis
    log_results = True
    health_check_interval = 30  # seconds
    health_check_key = "arq:health"
    allow_abort_jobs = True
Worker Registration
from arq import cron
from app.workers.config.worker_settings import WorkerSettings
from app.workers.lifecycle import shutdown, startup
from app.workers.tasks import (
check_inactive_users,
cleanup_expired_reminders,
cleanup_stuck_personalization,
execute_workflow_by_id,
generate_workflow_steps,
process_gmail_emails_to_memory,
process_personalization_task,
process_reminder,
process_workflow_generation_task,
store_memories_batch,
regenerate_workflow_steps,
)
# Configure the worker settings with all task functions.
# NOTE: the function names here are what callers pass as the first argument
# to enqueue_job(), e.g. enqueue_job("process_reminder", ...).
WorkerSettings.functions = [
    process_reminder,
    cleanup_expired_reminders,
    check_inactive_users,
    process_workflow_generation_task,
    execute_workflow_by_id,
    regenerate_workflow_steps,
    generate_workflow_steps,
    process_gmail_emails_to_memory,
    process_personalization_task,
    store_memories_batch,
    cleanup_stuck_personalization,
]

# Configure scheduled cron jobs (cron-scheduled tasks must also be plain
# registered functions, as above, to be importable by the worker).
WorkerSettings.cron_jobs = [
    cron(
        cleanup_expired_reminders,
        hour=0,  # At midnight
        minute=0,
        second=0,
    ),
    cron(
        check_inactive_users,
        hour=9,  # At 9 AM
        minute=0,
        second=0,
    ),
    cron(
        cleanup_stuck_personalization,
        minute={0, 30},  # Every 30 minutes (at :00 and :30)
        second=0,
    ),
]

# Set lifecycle hooks (run once per worker process, not per job).
WorkerSettings.on_startup = startup
WorkerSettings.on_shutdown = shutdown
Writing Background Tasks
Basic Task
from app.config.loggers import arq_worker_logger as logger
async def process_reminder(ctx: dict, reminder_id: str) -> str:
    """
    Process a reminder task.

    Args:
        ctx: ARQ context (contains redis pool, job info)
        reminder_id: ID of the reminder to process

    Returns:
        Processing result message

    Raises:
        Exception: re-raised unchanged so ARQ can apply its retry policy.
    """
    logger.info(f"Processing reminder task: {reminder_id}")
    try:
        await reminder_scheduler.process_task_execution(reminder_id)
    except Exception as e:
        error_msg = f"Failed to process reminder {reminder_id}: {str(e)}"
        # logger.exception records the full traceback, unlike logger.error.
        logger.exception(error_msg)
        raise
    else:
        # Success path kept out of the try block so only the actual
        # execution call is guarded.
        result = f"Successfully processed reminder {reminder_id}"
        logger.info(result)
        return result
Task with Database Access
from app.db.mongodb.collections import reminders_collection
from datetime import datetime, timedelta, timezone
async def cleanup_expired_reminders(ctx: dict) -> str:
    """
    Cleanup expired or completed reminders (scheduled task).

    Deletes reminders that are completed/cancelled and were last updated
    more than 30 days ago.

    Args:
        ctx: ARQ context

    Returns:
        Cleanup result message
    """
    logger.info("Running cleanup of expired reminders")
    try:
        threshold = datetime.now(timezone.utc) - timedelta(days=30)
        stale_filter = {
            "status": {"$in": ["completed", "cancelled"]},
            "updated_at": {"$lt": threshold},
        }
        delete_result = await reminders_collection.delete_many(stale_filter)
        message = f"Cleaned up {delete_result.deleted_count} expired reminders"
        logger.info(message)
        return message
    except Exception as e:
        error_msg = f"Failed to cleanup expired reminders: {str(e)}"
        logger.error(error_msg)
        raise
Complex Task with Multiple Operations
from app.services.memory_service import store_memories
from app.services.notification_service import send_notification
async def process_gmail_emails_to_memory(ctx: dict, user_id: str) -> str:
    """
    Process Gmail emails and store as memories.

    Fetches up to 100 recent emails, converts each into a memory record,
    stores the batch, then notifies the user.

    Args:
        ctx: ARQ context
        user_id: User ID to process emails for

    Returns:
        Processing result message
    """
    logger.info(f"Processing Gmail emails for user {user_id}")
    try:
        # Fetch emails
        emails = await fetch_recent_emails(user_id, limit=100)
        logger.info(f"Fetched {len(emails)} emails for user {user_id}")

        # Convert to memories (comprehension replaces the manual append loop).
        memories = [
            {
                "text": f"Email from {email['from']}: {email['subject']}",
                "metadata": {
                    "source": "gmail",
                    "email_id": email["id"],
                    "timestamp": email["date"],
                },
            }
            for email in emails
        ]

        # Store memories; the return value was unused, so it is not bound.
        await store_memories(user_id, memories)

        # Send completion notification
        await send_notification(
            user_id=user_id,
            title="Email Processing Complete",
            message=f"Processed {len(emails)} emails"
        )

        message = f"Processed {len(emails)} emails for user {user_id}"
        logger.info(message)
        return message
    except Exception as e:
        error_msg = f"Failed to process Gmail emails: {str(e)}"
        logger.error(error_msg)
        raise
Enqueuing Tasks
From API Endpoints
from fastapi import APIRouter, Depends
from app.db.redis import get_arq_pool
from arq.connections import ArqRedis
router = APIRouter()
@router.post("/reminders/{reminder_id}/process")
async def trigger_reminder_processing(
    reminder_id: str,
    user: dict = Depends(get_current_user),
):
    """
    Trigger reminder processing as background task.

    Returns the queued job id; ``job_id`` is None and status is
    "already_queued" when ARQ refuses the enqueue because a job with the
    same id already exists.
    """
    # Get ARQ Redis pool
    arq: ArqRedis = await get_arq_pool()

    # enqueue_job returns None if a job with the same job_id is already
    # queued — accessing .job_id unconditionally would raise AttributeError.
    job = await arq.enqueue_job(
        "process_reminder",
        reminder_id,
    )
    return {
        "job_id": job.job_id if job is not None else None,
        "reminder_id": reminder_id,
        "status": "queued" if job is not None else "already_queued",
    }
With Scheduled Time
from datetime import datetime, timedelta, timezone
@router.post("/reminders")
async def create_reminder(
    reminder_data: ReminderCreate,
    user: dict = Depends(get_current_user),
):
    """
    Create reminder and schedule processing.

    The processing job is deferred until ``reminder_data.remind_at``.
    """
    # Create reminder in database
    reminder = await create_reminder_in_db(user["user_id"], reminder_data)

    # Schedule task for reminder time
    arq: ArqRedis = await get_arq_pool()
    defer_until = reminder_data.remind_at
    # enqueue_job returns None when an identical job is already queued;
    # guard .job_id access to avoid AttributeError in that case.
    job = await arq.enqueue_job(
        "process_reminder",
        reminder["id"],
        _defer_until=defer_until,
    )
    return {
        "reminder": reminder,
        "job_id": job.job_id if job is not None else None,
        "scheduled_for": defer_until.isoformat(),
    }
With Retry Configuration
@router.post("/workflows/{workflow_id}/execute")
async def execute_workflow(
    workflow_id: str,
    user: dict = Depends(get_current_user),
):
    """
    Execute workflow as a background job.

    Retry behavior is governed by the worker's ``max_tries`` setting.
    NOTE: the previous version passed ``_job_try=3`` believing it meant
    "retry up to 3 times" — in ARQ, ``_job_try`` overrides the *current*
    attempt number, so it would make the worker think two attempts had
    already happened and actually REDUCE the retries remaining. It has
    been removed.
    """
    arq: ArqRedis = await get_arq_pool()
    job = await arq.enqueue_job(
        "execute_workflow_by_id",
        workflow_id,
        user["user_id"],
    )
    return {
        "workflow_id": workflow_id,
        "job_id": job.job_id if job is not None else None,
        "status": "queued",
    }
Scheduled Jobs (Cron)
Cron Syntax
from arq import cron

# Each cron(...) call below produces a CronJob object; the objects are
# collected into WorkerSettings.cron_jobs. Fields mirror crontab semantics:
# a set value means "at any of these values".

# Every day at midnight
cron(cleanup_expired_reminders, hour=0, minute=0, second=0)

# Every day at 9 AM
cron(check_inactive_users, hour=9, minute=0, second=0)

# Every 30 minutes (at :00 and :30)
cron(cleanup_stuck_personalization, minute={0, 30}, second=0)

# Every hour (on the hour; all other fields default to "any")
cron(hourly_task, minute=0)

# Every 5 minutes
cron(frequent_task, minute={0, 5, 10, 15, 20, 25, 30, 35, 40, 45, 50, 55})

# Weekdays at 8 AM (weekday 1 = Monday ... 5 = Friday)
cron(weekday_task, hour=8, minute=0, weekday={1, 2, 3, 4, 5})

# First day of month at midnight
cron(monthly_task, day=1, hour=0, minute=0)
Example Scheduled Tasks
from app.db.mongodb.collections import users_collection
from datetime import datetime, timedelta, timezone
async def check_inactive_users(ctx: dict) -> str:
    """
    Check for inactive users and send engagement emails.

    Runs daily at 9 AM (see the cron registration).

    Args:
        ctx: ARQ context

    Returns:
        Summary message with the number of users contacted
    """
    logger.info("Checking for inactive users")
    try:
        # Users with no activity in the last 7 days who have not yet
        # received the engagement email.
        threshold = datetime.now(timezone.utc) - timedelta(days=7)
        stale_query = {
            "last_activity": {"$lt": threshold},
            "inactive_email_sent": {"$ne": True},
        }
        stale_users = await users_collection.find(stale_query).to_list(None)

        for account in stale_users:
            await send_engagement_email(account["email"])
            # Flag each user right after emailing so a mid-run failure
            # cannot re-email already-contacted users on retry.
            await users_collection.update_one(
                {"_id": account["_id"]},
                {"$set": {"inactive_email_sent": True}},
            )

        message = f"Contacted {len(stale_users)} inactive users"
        logger.info(message)
        return message
    except Exception as e:
        error_msg = f"Failed to check inactive users: {str(e)}"
        logger.error(error_msg)
        raise
Worker Lifecycle
Startup Hook
from app.workers.lifecycle import startup
from app.core.lazy_loader import providers
async def startup(ctx: dict) -> None:
    """
    Worker startup hook.

    Warms up the providers the background tasks depend on; any failure
    aborts worker startup.

    Args:
        ctx: ARQ context
    """
    logger.info("ARQ worker starting up")
    try:
        # Same three providers, initialized in the same order as before.
        for provider_name in ("mongodb", "redis", "chroma"):
            await providers.aget(provider_name)
        logger.info("ARQ worker startup complete")
    except Exception as e:
        logger.error(f"Error during worker startup: {e}")
        raise
Shutdown Hook
from app.workers.lifecycle import shutdown
async def shutdown(ctx: dict) -> None:
    """
    Worker shutdown hook.

    Closes database connections. Errors are logged but deliberately NOT
    re-raised, so shutdown stays best-effort.

    Args:
        ctx: ARQ context
    """
    logger.info("ARQ worker shutting down")
    try:
        await close_mongodb()
        await close_postgresql()
        await close_redis()
    except Exception as e:
        logger.error(f"Error during worker shutdown: {e}")
    else:
        logger.info("ARQ worker shutdown complete")
Running the Worker
Development
# Run worker with hot reload
nx worker api
# Or directly with ARQ
arq app.worker.WorkerSettings
Production
# Run worker in burst mode (process currently queued jobs, then exit)
arq app.worker.WorkerSettings --burst
# Limit the number of concurrently running jobs
arq app.worker.WorkerSettings --max-jobs 20
Docker
# Dockerfile for worker
FROM python:3.11-slim
WORKDIR /app
COPY . .
RUN pip install -r requirements.txt
CMD ["arq", "app.worker.WorkerSettings"]
Monitoring Workers
Health Checks
from app.db.redis import redis_client
import json
async def get_worker_health() -> dict:
    """
    Get worker health status from Redis.

    Returns the decoded health payload written by the ARQ worker, or
    ``{"status": "unknown"}`` when the key is absent/empty.
    """
    raw = await redis_client.get("arq:health")
    if not raw:
        # No (or empty) health record — worker may be down or never started.
        return {"status": "unknown"}
    return json.loads(raw)
Job Status
from arq.jobs import Job
from app.db.redis import get_arq_pool
async def get_job_status(job_id: str) -> dict:
    """
    Get status of a background job without blocking or raising.

    Uses ``Job.result_info()`` instead of ``Job.result()``: ``result()``
    WAITS for the job to finish and re-raises the job's own exception,
    both of which are wrong for a status probe. ``result_info()`` returns
    the stored JobResult, or None when no result is available (job still
    running, or results discarded — note keep_result=0 in WorkerSettings).
    """
    arq = await get_arq_pool()
    job = Job(job_id, arq)
    status = await job.status()
    info = await job.result_info()
    return {
        "job_id": job_id,
        "status": status,
        "result": info.result if info is not None else None,
    }
Best Practices
- Idempotent tasks: Tasks should be safe to retry
- Timeout handling: Set appropriate timeouts for long-running tasks
- Error logging: Log errors with context for debugging
- Progress tracking: Store progress in Redis for long tasks
- Resource cleanup: Clean up resources in finally blocks
- Batch processing: Process items in batches for efficiency
- Rate limiting: Respect external API rate limits
- Dead letter queue: Handle permanently failed jobs
- Monitoring: Track job success/failure rates
- Testing: Write tests for task logic separately from ARQ
Example: Batch Processing Task
async def store_memories_batch(ctx: dict, user_id: str, memory_ids: list[str]) -> str:
    """
    Store memories in batches for better performance.

    Progress is published to Redis under ``memory_batch:{user_id}:progress``
    (percentage string, expires after one hour) so callers can poll it.

    Args:
        ctx: ARQ context (``ctx["redis"]`` is the worker's Redis pool)
        user_id: User ID
        memory_ids: List of memory IDs to process (annotated with the
            builtin generic ``list[str]`` — ``List`` was never imported)

    Returns:
        Processing result message
    """
    logger.info(f"Processing {len(memory_ids)} memories for user {user_id}")
    batch_size = 50
    total_processed = 0
    total_failed = 0
    try:
        # Process in batches
        for i in range(0, len(memory_ids), batch_size):
            batch = memory_ids[i:i + batch_size]
            try:
                # Process batch
                await process_memory_batch(user_id, batch)
                total_processed += len(batch)

                # Update progress in Redis. redis-py's set() takes its TTL
                # as ``ex`` (seconds); ``expire=`` is not a valid keyword
                # and would raise TypeError on every successful batch.
                progress = (i + len(batch)) / len(memory_ids) * 100
                await ctx["redis"].set(
                    f"memory_batch:{user_id}:progress",
                    f"{progress:.1f}",
                    ex=3600,
                )
            except Exception as e:
                # Best-effort: log and continue with the remaining batches.
                logger.error(f"Failed to process batch: {e}")
                total_failed += len(batch)

        message = f"Processed {total_processed}/{len(memory_ids)} memories (failed: {total_failed})"
        logger.info(message)
        return message
    except Exception as e:
        error_msg = f"Failed to process memories: {str(e)}"
        logger.error(error_msg)
        raise