Background tasks allow you to execute code after returning a response to the client. This is useful for operations that don’t need to complete before sending a response, such as sending emails, processing data, or updating caches.
Basic usage
Use the BackgroundTasks dependency to schedule tasks that run after the response is sent:
from fastrapi import FastrAPI, BackgroundTasks
app = FastrAPI()
def send_email ( email : str , message : str ):
print ( f "Sending email to { email } : { message } " )
# Email sending logic here
@app.post ( "/send" )
def send_notification ( email : str , background_tasks : BackgroundTasks):
background_tasks.add_task(send_email, email, "Welcome to our service!" )
return { "message" : "Email will be sent in the background" }
if __name__ == "__main__" :
app.serve( "127.0.0.1" , 8080 )
The response is sent immediately to the client, and the background task executes afterward. The client doesn’t wait for the task to complete.
Adding tasks with arguments
Pass arguments to background tasks by including them after the function:
from fastrapi import FastrAPI, BackgroundTasks
import time
app = FastrAPI()
def process_data ( item_id : int , priority : str ):
print ( f "Processing item { item_id } with priority { priority } " )
time.sleep( 2 ) # Simulate processing
print ( f "Finished processing item { item_id } " )
@app.post ( "/process/ {item_id} " )
def process_item ( item_id : int , background_tasks : BackgroundTasks):
background_tasks.add_task(process_data, item_id, "high" )
return { "status" : "accepted" , "item_id" : item_id}
Multiple background tasks
Schedule multiple tasks for a single request. They execute in the order they’re added:
from fastrapi import FastrAPI, BackgroundTasks
app = FastrAPI()
def log_activity ( user_id : int , action : str ):
print ( f "User { user_id } performed: { action } " )
def update_analytics ( action : str ):
print ( f "Analytics updated for action: { action } " )
def send_notification ( user_id : int ):
print ( f "Notification sent to user { user_id } " )
@app.post ( "/action" )
def perform_action ( user_id : int , action : str , background_tasks : BackgroundTasks):
# Add multiple background tasks
background_tasks.add_task(log_activity, user_id, action)
background_tasks.add_task(update_analytics, action)
background_tasks.add_task(send_notification, user_id)
return { "status" : "success" , "action" : action}
Tasks execute sequentially in the order they’re added. For truly parallel execution, consider using a task queue like Celery or RQ.
Real-world examples
Email notifications
from fastrapi import FastrAPI, BackgroundTasks
from pydantic import BaseModel, EmailStr
app = FastrAPI()
class User ( BaseModel ):
email: EmailStr
name: str
def send_welcome_email ( email : str , name : str ):
# Simulate email sending
print ( f "Sending welcome email to { email } " )
# import smtplib
# ... actual email logic
@app.post ( "/register" )
def register_user ( user : User, background_tasks : BackgroundTasks):
# Register user in database (fast operation)
# ...
# Send welcome email in background (slow operation)
background_tasks.add_task(send_welcome_email, user.email, user.name)
return { "message" : f "User { user.name } registered successfully" }
File processing
from fastrapi import FastrAPI, BackgroundTasks
import time
app = FastrAPI()
def process_uploaded_file ( filename : str ):
print ( f "Starting to process { filename } " )
time.sleep( 5 ) # Simulate heavy processing
print ( f "Finished processing { filename } " )
@app.post ( "/upload" )
def upload_file ( filename : str , background_tasks : BackgroundTasks):
# Save file (fast)
# ...
# Process in background (slow)
background_tasks.add_task(process_uploaded_file, filename)
return {
"status" : "accepted" ,
"message" : "File uploaded and processing started"
}
Cache warming
from fastrapi import FastrAPI, BackgroundTasks
app = FastrAPI()
cache = {}
def warm_cache ( user_id : int ):
print ( f "Warming cache for user { user_id } " )
# Fetch and cache user data
cache[user_id] = { "name" : "John" , "preferences" : { ... }}
@app.post ( "/users/ {user_id} /settings" )
def update_settings ( user_id : int , settings : dict , background_tasks : BackgroundTasks):
# Update settings in database
# ...
# Refresh cache in background
background_tasks.add_task(warm_cache, user_id)
return { "message" : "Settings updated" }
Logging and analytics
from fastrapi import FastrAPI, BackgroundTasks
from datetime import datetime
app = FastrAPI()
def log_to_database ( user_id : int , endpoint : str , timestamp : datetime):
print ( f "Logging: User { user_id } accessed { endpoint } at { timestamp } " )
# Write to database or logging service
@app.get ( "/api/data" )
def get_data ( user_id : int , background_tasks : BackgroundTasks):
# Log the access in background
background_tasks.add_task(
log_to_database,
user_id,
"/api/data" ,
datetime.now()
)
return { "data" : [ 1 , 2 , 3 , 4 , 5 ]}
How background tasks work
FastrAPI’s background task system is implemented in Rust for efficient execution:
Task registration
When you call background_tasks.add_task(), the task is stored in a thread-safe vector wrapped in an Arc<Mutex> (src/background.rs:22-29).
Response sent
Your route handler returns, and FastrAPI sends the response to the client immediately.
Task execution
After the response is sent, FastrAPI spawns Tokio tasks for each background task using tokio::task::spawn_blocking() (src/background.rs:43).
Python execution
Each task runs in its own blocking thread with the Python GIL, executing your Python function with the provided arguments (src/background.rs:44-58).
Error handling
Background task errors don’t affect the response since it’s already sent. Handle errors within your tasks:
from fastrapi import FastrAPI, BackgroundTasks
import logging
app = FastrAPI()
logger = logging.getLogger( __name__ )
def risky_operation ( data : str ):
try :
# Perform operation that might fail
if not data:
raise ValueError ( "Data cannot be empty" )
# Process data
print ( f "Processing: { data } " )
except Exception as e:
logger.error( f "Background task failed: { e } " )
# Handle error: retry, save to dead letter queue, etc.
@app.post ( "/task" )
def create_task ( data : str , background_tasks : BackgroundTasks):
background_tasks.add_task(risky_operation, data)
return { "status" : "accepted" }
Background task errors are logged to stderr but don’t raise exceptions in your main application. Always implement error handling within your background tasks.
Limitations
Background tasks can’t return values to the client since the response is already sent. If you need results, consider:
Writing to a database
Using a message queue
Implementing webhooks
Tasks run sequentially in the order added. For parallel execution, use:
Multiple background task calls
External task queues (Celery, RQ)
Async operations within tasks
Tasks must complete before the application shuts down. For long-running tasks, use a dedicated task queue system.
Background tasks don’t automatically retry on failure. Implement retry logic yourself or use a task queue with retry support.
When to use background tasks
Use background tasks
Sending emails/notifications
Logging and analytics
Cache updates
Quick file processing
Non-critical operations
Use task queues instead
Long-running jobs (>30s)
Critical operations needing retry
Jobs requiring monitoring
Distributed processing
Scheduled tasks
Best practices
Background tasks should complete quickly (under 30 seconds). For longer operations, use a dedicated task queue: # Good: Quick operation
def send_email ( email : str ):
# Takes 1-2 seconds
pass
# Bad: Long operation
def process_video ( video_path : str ):
# Takes 5 minutes - use Celery instead
pass
Always wrap background task logic in try/except: def safe_background_task ( data : str ):
try :
# Your logic
pass
except Exception as e:
logger.error( f "Task failed: { e } " )
# Handle appropriately
Background tasks run after the request, so don’t rely on request-scoped state: # Bad: Don't rely on global state
current_user = None
def background_task ():
# current_user might be None or changed
process(current_user)
# Good: Pass data explicitly
def background_task ( user_id : int ):
process(user_id)
Use for non-critical operations
Since tasks can fail silently, only use them for operations that can tolerate occasional failures. For critical operations, use a robust task queue with monitoring.
Dependency injection Learn about FastrAPI dependencies
Error handling Handle errors in your application
API reference Complete BackgroundTasks API
WebSockets Real-time communication