Skip to main content
Background tasks allow you to execute code after returning a response to the client. This is useful for operations that don’t need to complete before sending a response, such as sending emails, processing data, or updating caches.

Basic usage

Use the BackgroundTasks dependency to schedule tasks that run after the response is sent:
from fastrapi import FastrAPI, BackgroundTasks

app = FastrAPI()

def send_email(email: str, message: str):
    print(f"Sending email to {email}: {message}")
    # Email sending logic here

@app.post("/send")
def send_notification(email: str, background_tasks: BackgroundTasks):
    background_tasks.add_task(send_email, email, "Welcome to our service!")
    return {"message": "Email will be sent in the background"}

if __name__ == "__main__":
    app.serve("127.0.0.1", 8080)
The response is sent immediately to the client, and the background task executes afterward. The client doesn’t wait for the task to complete.

Adding tasks with arguments

Pass arguments to background tasks by including them after the function:
from fastrapi import FastrAPI, BackgroundTasks
import time

app = FastrAPI()

def process_data(item_id: int, priority: str):
    print(f"Processing item {item_id} with priority {priority}")
    time.sleep(2)  # Simulate processing
    print(f"Finished processing item {item_id}")

@app.post("/process/{item_id}")
def process_item(item_id: int, background_tasks: BackgroundTasks):
    background_tasks.add_task(process_data, item_id, "high")
    return {"status": "accepted", "item_id": item_id}

Multiple background tasks

Schedule multiple tasks for a single request. They execute in the order they’re added:
from fastrapi import FastrAPI, BackgroundTasks

app = FastrAPI()

def log_activity(user_id: int, action: str):
    print(f"User {user_id} performed: {action}")

def update_analytics(action: str):
    print(f"Analytics updated for action: {action}")

def send_notification(user_id: int):
    print(f"Notification sent to user {user_id}")

@app.post("/action")
def perform_action(user_id: int, action: str, background_tasks: BackgroundTasks):
    # Add multiple background tasks
    background_tasks.add_task(log_activity, user_id, action)
    background_tasks.add_task(update_analytics, action)
    background_tasks.add_task(send_notification, user_id)
    
    return {"status": "success", "action": action}
Tasks execute sequentially in the order they’re added. For truly parallel execution, consider using a task queue like Celery or RQ.

Real-world examples

Email notifications

from fastrapi import FastrAPI, BackgroundTasks
from pydantic import BaseModel, EmailStr

app = FastrAPI()

class User(BaseModel):
    email: EmailStr
    name: str

def send_welcome_email(email: str, name: str):
    # Simulate email sending
    print(f"Sending welcome email to {email}")
    # import smtplib
    # ... actual email logic

@app.post("/register")
def register_user(user: User, background_tasks: BackgroundTasks):
    # Register user in database (fast operation)
    # ...
    
    # Send welcome email in background (slow operation)
    background_tasks.add_task(send_welcome_email, user.email, user.name)
    
    return {"message": f"User {user.name} registered successfully"}

File processing

from fastrapi import FastrAPI, BackgroundTasks
import time

app = FastrAPI()

def process_uploaded_file(filename: str):
    print(f"Starting to process {filename}")
    time.sleep(5)  # Simulate heavy processing
    print(f"Finished processing {filename}")

@app.post("/upload")
def upload_file(filename: str, background_tasks: BackgroundTasks):
    # Save file (fast)
    # ...
    
    # Process in background (slow)
    background_tasks.add_task(process_uploaded_file, filename)
    
    return {
        "status": "accepted",
        "message": "File uploaded and processing started"
    }

Cache warming

from fastrapi import FastrAPI, BackgroundTasks

app = FastrAPI()
cache = {}

def warm_cache(user_id: int):
    print(f"Warming cache for user {user_id}")
    # Fetch and cache user data
    cache[user_id] = {"name": "John", "preferences": {...}}

@app.post("/users/{user_id}/settings")
def update_settings(user_id: int, settings: dict, background_tasks: BackgroundTasks):
    # Update settings in database
    # ...
    
    # Refresh cache in background
    background_tasks.add_task(warm_cache, user_id)
    
    return {"message": "Settings updated"}

Logging and analytics

from fastrapi import FastrAPI, BackgroundTasks
from datetime import datetime

app = FastrAPI()

def log_to_database(user_id: int, endpoint: str, timestamp: datetime):
    print(f"Logging: User {user_id} accessed {endpoint} at {timestamp}")
    # Write to database or logging service

@app.get("/api/data")
def get_data(user_id: int, background_tasks: BackgroundTasks):
    # Log the access in background
    background_tasks.add_task(
        log_to_database,
        user_id,
        "/api/data",
        datetime.now()
    )
    
    return {"data": [1, 2, 3, 4, 5]}

How background tasks work

FastrAPI’s background task system is implemented in Rust for efficient execution:
1

Task registration

When you call background_tasks.add_task(), the task is stored in a thread-safe vector wrapped in an Arc<Mutex> (src/background.rs:22-29).
2

Response sent

Your route handler returns, and FastrAPI sends the response to the client immediately.
3

Task execution

After the response is sent, FastrAPI spawns Tokio tasks for each background task using tokio::task::spawn_blocking() (src/background.rs:43).
4

Python execution

Each task runs in its own blocking thread with the Python GIL, executing your Python function with the provided arguments (src/background.rs:44-58).

Error handling

Background task errors don’t affect the response since it’s already sent. Handle errors within your tasks:
from fastrapi import FastrAPI, BackgroundTasks
import logging

app = FastrAPI()
logger = logging.getLogger(__name__)

def risky_operation(data: str):
    try:
        # Perform operation that might fail
        if not data:
            raise ValueError("Data cannot be empty")
        
        # Process data
        print(f"Processing: {data}")
        
    except Exception as e:
        logger.error(f"Background task failed: {e}")
        # Handle error: retry, save to dead letter queue, etc.

@app.post("/task")
def create_task(data: str, background_tasks: BackgroundTasks):
    background_tasks.add_task(risky_operation, data)
    return {"status": "accepted"}
Background task errors are logged to stderr but don’t raise exceptions in your main application. Always implement error handling within your background tasks.

Limitations

Background tasks can’t return values to the client since the response is already sent. If you need results, consider:
  • Writing to a database
  • Using a message queue
  • Implementing webhooks
Tasks run sequentially in the order added. For parallel execution, use:
  • Multiple background task calls
  • External task queues (Celery, RQ)
  • Async operations within tasks
Tasks must complete before the application shuts down. For long-running tasks, use a dedicated task queue system.
Background tasks don’t automatically retry on failure. Implement retry logic yourself or use a task queue with retry support.

When to use background tasks

Use background tasks

  • Sending emails/notifications
  • Logging and analytics
  • Cache updates
  • Quick file processing
  • Non-critical operations

Use task queues instead

  • Long-running jobs (>30s)
  • Critical operations needing retry
  • Jobs requiring monitoring
  • Distributed processing
  • Scheduled tasks

Best practices

Background tasks should complete quickly (under 30 seconds). For longer operations, use a dedicated task queue:
# Good: Quick operation
def send_email(email: str):
    # Takes 1-2 seconds
    pass

# Bad: Long operation
def process_video(video_path: str):
    # Takes 5 minutes - use Celery instead
    pass
Always wrap background task logic in try/except:
def safe_background_task(data: str):
    try:
        # Your logic
        pass
    except Exception as e:
        logger.error(f"Task failed: {e}")
        # Handle appropriately
Background tasks run after the request, so don’t rely on request-scoped state:
# Bad: Don't rely on global state
current_user = None

def background_task():
    # current_user might be None or changed
    process(current_user)

# Good: Pass data explicitly
def background_task(user_id: int):
    process(user_id)
Since tasks can fail silently, only use them for operations that can tolerate occasional failures. For critical operations, use a robust task queue with monitoring.

Dependency injection

Learn about FastrAPI dependencies

Error handling

Handle errors in your application

API reference

Complete BackgroundTasks API

WebSockets

Real-time communication

Build docs developers (and LLMs) love