Skip to main content
Background tasks allow you to run code after returning a response to the client. This is useful for operations that don’t need to complete before sending the response.

When to Use Background Tasks

Use background tasks for operations like:
  • Sending email notifications
  • Processing data that doesn’t affect the response
  • Writing to log files
  • Triggering webhooks
  • Cleaning up resources
  • Updating caches
For heavy background processing, consider using a dedicated task queue like Celery or RQ instead.

Basic Usage

Add BackgroundTasks as a parameter to your path operation function:
from fastapi import BackgroundTasks, FastAPI

app = FastAPI()

def write_notification(email: str, message=""):
    with open("log.txt", mode="w") as email_file:
        content = f"notification for {email}: {message}"
        email_file.write(content)

@app.post("/send-notification/{email}")
async def send_notification(email: str, background_tasks: BackgroundTasks):
    background_tasks.add_task(write_notification, email, message="some notification")
    return {"message": "Notification sent in the background"}

How It Works

1

Request Processing

Your path operation function receives the request and processes it normally.
2

Add Background Task

Use background_tasks.add_task() to queue functions to run after the response.
3

Response Sent

FastAPI sends the response to the client immediately.
4

Task Execution

After the response is sent, the background tasks run in order.

Adding Tasks

Use the add_task() method to add a function and its arguments:
background_tasks.add_task(func, arg1, arg2, kwarg1="value")
  • First argument: the function to call
  • Following arguments: positional arguments for the function
  • Keyword arguments: keyword arguments for the function

Sync and Async Functions

Both regular def and async def functions work:
def sync_task(name: str):
    # Regular function - will be run in a thread pool
    with open(f"{name}.txt", "w") as f:
        f.write(f"Hello {name}")

async def async_task(name: str):
    # Async function - will be awaited
    await asyncio.sleep(1)
    print(f"Task completed for {name}")

@app.post("/process/{name}")
async def process(name: str, background_tasks: BackgroundTasks):
    background_tasks.add_task(sync_task, name)
    background_tasks.add_task(async_task, name)
    return {"message": "Processing started"}
Regular def functions are executed in a thread pool, so they won’t block the async event loop. Use async def for I/O-bound operations that support async.

Multiple Background Tasks

You can add multiple tasks - they run sequentially in the order they were added:
@app.post("/multi-task/{email}")
async def multi_task(email: str, background_tasks: BackgroundTasks):
    background_tasks.add_task(write_log, f"Processing request for {email}")
    background_tasks.add_task(send_email, email)
    background_tasks.add_task(update_analytics, email)
    return {"message": "Tasks queued"}
Tasks run sequentially, not in parallel. If one task takes a long time, subsequent tasks will wait.

Background Tasks in Dependencies

You can also add background tasks from dependencies:
from fastapi import BackgroundTasks, Depends, FastAPI

app = FastAPI()

def write_log(message: str):
    with open("log.txt", mode="a") as log:
        log.write(message)

def get_query(background_tasks: BackgroundTasks, q: str | None = None):
    if q:
        message = f"found query: {q}\n"
        background_tasks.add_task(write_log, message)
    return q

@app.post("/send-notification/{email}")
async def send_notification(
    email: str, background_tasks: BackgroundTasks, q: str = Depends(get_query)
):
    message = f"message to {email}\n"
    background_tasks.add_task(write_log, message)
    return {"message": "Message sent"}
Background tasks from dependencies and path operations are combined. All tasks will run after the response is sent.

Technical Details

The BackgroundTasks class comes from Starlette and is directly integrated into FastAPI.

Task Execution Context

  • Tasks run after the response is sent to the client
  • They execute within the same request context
  • Dependencies with yield complete before background tasks start
  • Tasks run sequentially in the order they were added

Error Handling

If a background task raises an exception:
  • The exception is logged
  • Subsequent background tasks still run
  • The client has already received the response and won’t see the error
Implement proper error handling within your background task functions:
import logging

def safe_background_task(data: str):
    try:
        # Your task logic here
        process_data(data)
    except Exception as e:
        logging.error(f"Background task failed: {e}")
        # Handle or report the error appropriately

Use Cases

1

Email Notifications

from fastapi import BackgroundTasks
import smtplib

def send_welcome_email(email: str):
    # Send email logic
    pass

@app.post("/register")
async def register(email: str, background_tasks: BackgroundTasks):
    # Save user to database
    background_tasks.add_task(send_welcome_email, email)
    return {"message": "Registration successful"}
2

Data Processing

def process_upload(file_path: str):
    # Process uploaded file
    pass

@app.post("/upload")
async def upload_file(background_tasks: BackgroundTasks):
    # Save file
    file_path = "uploads/file.csv"
    background_tasks.add_task(process_upload, file_path)
    return {"message": "File uploaded, processing started"}
3

Logging and Analytics

def log_request(user_id: int, action: str):
    # Log to analytics service
    pass

@app.post("/action")
async def perform_action(user_id: int, background_tasks: BackgroundTasks):
    background_tasks.add_task(log_request, user_id, "action_performed")
    return {"message": "Action completed"}

Limitations

Background tasks have some limitations:
  • They run in the same process as your web server
  • Not suitable for long-running or CPU-intensive tasks
  • No built-in retry mechanism
  • No task persistence across server restarts
  • Limited observability and monitoring
For production applications with complex background processing needs, use dedicated task queues like:
  • Celery: Distributed task queue with many features
  • RQ (Redis Queue): Simple task queue using Redis
  • Dramatiq: Fast and reliable alternative to Celery
  • ARQ: Async task queue using Redis

When to Use a Task Queue

Consider using a dedicated task queue instead of BackgroundTasks when you need:
  • Task persistence and reliability
  • Retry mechanisms with exponential backoff
  • Task scheduling and delayed execution
  • Distributed task processing across multiple workers
  • Task monitoring and management UI
  • Long-running or CPU-intensive tasks
  • Task priorities and routing

Build docs developers (and LLMs) love