Background tasks allow you to run code after returning a response to the client. This is useful for operations that don’t need to complete before sending the response.
When to Use Background Tasks
Use background tasks for operations like:
- Sending email notifications
- Processing data that doesn’t affect the response
- Writing to log files
- Triggering webhooks
- Cleaning up resources
- Updating caches
For heavy background processing, consider using a dedicated task queue like Celery or RQ instead.
Basic Usage
Add BackgroundTasks as a parameter to your path operation function:
from fastapi import BackgroundTasks, FastAPI
app = FastAPI()
def write_notification(email: str, message=""):
with open("log.txt", mode="w") as email_file:
content = f"notification for {email}: {message}"
email_file.write(content)
@app.post("/send-notification/{email}")
async def send_notification(email: str, background_tasks: BackgroundTasks):
background_tasks.add_task(write_notification, email, message="some notification")
return {"message": "Notification sent in the background"}
How It Works
Request Processing
Your path operation function receives the request and processes it normally.
Add Background Task
Use background_tasks.add_task() to queue functions to run after the response.
Response Sent
FastAPI sends the response to the client immediately.
Task Execution
After the response is sent, the background tasks run in order.
Adding Tasks
Use the add_task() method to add a function and its arguments:
background_tasks.add_task(func, arg1, arg2, kwarg1="value")
- First argument: the function to call
- Following arguments: positional arguments for the function
- Keyword arguments: keyword arguments for the function
Sync and Async Functions
Both regular def and async def functions work:
def sync_task(name: str):
# Regular function - will be run in a thread pool
with open(f"{name}.txt", "w") as f:
f.write(f"Hello {name}")
async def async_task(name: str):
# Async function - will be awaited
await asyncio.sleep(1)
print(f"Task completed for {name}")
@app.post("/process/{name}")
async def process(name: str, background_tasks: BackgroundTasks):
background_tasks.add_task(sync_task, name)
background_tasks.add_task(async_task, name)
return {"message": "Processing started"}
Regular def functions are executed in a thread pool, so they won’t block the async event loop. Use async def for I/O-bound operations that support async.
Multiple Background Tasks
You can add multiple tasks - they run sequentially in the order they were added:
@app.post("/multi-task/{email}")
async def multi_task(email: str, background_tasks: BackgroundTasks):
background_tasks.add_task(write_log, f"Processing request for {email}")
background_tasks.add_task(send_email, email)
background_tasks.add_task(update_analytics, email)
return {"message": "Tasks queued"}
Tasks run sequentially, not in parallel. If one task takes a long time, subsequent tasks will wait.
Background Tasks in Dependencies
You can also add background tasks from dependencies:
from fastapi import BackgroundTasks, Depends, FastAPI
app = FastAPI()
def write_log(message: str):
with open("log.txt", mode="a") as log:
log.write(message)
def get_query(background_tasks: BackgroundTasks, q: str | None = None):
if q:
message = f"found query: {q}\n"
background_tasks.add_task(write_log, message)
return q
@app.post("/send-notification/{email}")
async def send_notification(
email: str, background_tasks: BackgroundTasks, q: str = Depends(get_query)
):
message = f"message to {email}\n"
background_tasks.add_task(write_log, message)
return {"message": "Message sent"}
Background tasks from dependencies and path operations are combined. All tasks will run after the response is sent.
Technical Details
The BackgroundTasks class comes from Starlette and is directly integrated into FastAPI.
Task Execution Context
- Tasks run after the response is sent to the client
- They execute within the same request context
- Dependencies with
yield complete before background tasks start
- Tasks run sequentially in the order they were added
Error Handling
If a background task raises an exception:
- The exception is logged
- Subsequent background tasks still run
- The client has already received the response and won’t see the error
Implement proper error handling within your background task functions:
import logging
def safe_background_task(data: str):
try:
# Your task logic here
process_data(data)
except Exception as e:
logging.error(f"Background task failed: {e}")
# Handle or report the error appropriately
Use Cases
Email Notifications
from fastapi import BackgroundTasks
import smtplib
def send_welcome_email(email: str):
# Send email logic
pass
@app.post("/register")
async def register(email: str, background_tasks: BackgroundTasks):
# Save user to database
background_tasks.add_task(send_welcome_email, email)
return {"message": "Registration successful"}
Data Processing
def process_upload(file_path: str):
# Process uploaded file
pass
@app.post("/upload")
async def upload_file(background_tasks: BackgroundTasks):
# Save file
file_path = "uploads/file.csv"
background_tasks.add_task(process_upload, file_path)
return {"message": "File uploaded, processing started"}
Logging and Analytics
def log_request(user_id: int, action: str):
# Log to analytics service
pass
@app.post("/action")
async def perform_action(user_id: int, background_tasks: BackgroundTasks):
background_tasks.add_task(log_request, user_id, "action_performed")
return {"message": "Action completed"}
Limitations
Background tasks have some limitations:
- They run in the same process as your web server
- Not suitable for long-running or CPU-intensive tasks
- No built-in retry mechanism
- No task persistence across server restarts
- Limited observability and monitoring
For production applications with complex background processing needs, use dedicated task queues like:
- Celery: Distributed task queue with many features
- RQ (Redis Queue): Simple task queue using Redis
- Dramatiq: Fast and reliable alternative to Celery
- ARQ: Async task queue using Redis
When to Use a Task Queue
Consider using a dedicated task queue instead of BackgroundTasks when you need:
- Task persistence and reliability
- Retry mechanisms with exponential backoff
- Task scheduling and delayed execution
- Distributed task processing across multiple workers
- Task monitoring and management UI
- Long-running or CPU-intensive tasks
- Task priorities and routing