Overview
Asynchronous inference decouples API requests from model inference using message queues. This pattern is essential for:
- Long-running inference tasks
- Batch processing
- Better resource utilization
- Handling traffic spikes
- Cost optimization
Synchronous vs Async Inference
Synchronous Pattern
Client → API → Model → API → Client
         |_______________|
              (waiting)
Pros:
- Simple to implement
- Immediate results
- Easy to debug
Cons:
- Client waits for entire inference
- Connection timeouts on long tasks
- Poor resource utilization
- Difficult to scale
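For contrast, a minimal sketch of the synchronous pattern (illustrative only; run_inference stands in for a real model call):

from fastapi import FastAPI
from pydantic import BaseModel
import time

app = FastAPI()

class PredictRequest(BaseModel):
    data: str

def run_inference(data: str) -> dict:
    time.sleep(30)  # stand-in for a slow model
    return {"result": data}

@app.post("/predict")
def predict(request: PredictRequest):
    # The caller blocks here for the full inference time.
    return run_inference(request.data)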
Asynchronous Pattern
Client → API → Queue → Worker → Model
          ↓              ↓
       Job ID ←←←←←←←←← Results Store
Pros:
- Non-blocking API responses
- Better resource utilization
- Handle traffic bursts
- Easy to scale workers
- No timeout issues
Cons:
- More complex architecture
- Need status polling or webhooks
- Eventual consistency
Use async inference when inference time > 5 seconds or traffic is highly variable.
Implementation Patterns
Pattern 1: Simple Queue (Modal)
Minimal async setup using Modal’s built-in queue:
from fastapi import FastAPI
from pydantic import BaseModel
import modal

app = modal.App("simple-queue")


@app.function()
def process_job(data):
    # Stand-in for real model inference.
    import time

    time.sleep(60)
    return {"result": data}


web_app = FastAPI()


def submit_job(data):
    # Spawn the Modal function and return immediately with its call ID.
    process_job = modal.Function.lookup("simple-queue", "process_job")
    call = process_job.spawn(data)
    return call.object_id


def get_job_result(call_id):
    function_call = modal.functions.FunctionCall.from_id(call_id)
    try:
        # timeout=0 returns the result if ready instead of blocking.
        result = function_call.get(timeout=0)
    except modal.exception.OutputExpiredError:
        result = {"result": "expired"}
    except TimeoutError:
        result = {"result": "pending"}
    return result


class SubmitJobRequest(BaseModel):
    data: str


@web_app.post("/submit_job")
async def submit_job_endpoint(request: SubmitJobRequest):
    call_id = submit_job(request.data)
    return {"call_id": call_id}


@web_app.get("/get_job_result")
async def get_job_result_endpoint(call_id: str):
    result = get_job_result(call_id)
    return result


if __name__ == "__main__":
    import uvicorn

    uvicorn.run(web_app, host="0.0.0.0", port=8000)
Deploy and Test
# Deploy Modal function
modal deploy simple_queue.py
# Run API locally
python simple_queue.py
# Submit job
curl -X POST http://localhost:8000/submit_job \
-H "Content-Type: application/json" \
-d '{"data": "test input"}'
# Returns: {"call_id": "call-abc123"}
# Check result
curl "http://localhost:8000/get_job_result?call_id=call-abc123"
# Returns: {"result": "pending"} or {"result": "test input"}
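A minimal Python client for these endpoints submits a job, then polls until the result is no longer pending (a sketch; the 5-second polling interval is arbitrary):

import time
import requests

API_URL = "http://localhost:8000"

# Submit the job and grab its call ID.
call_id = requests.post(
    f"{API_URL}/submit_job", json={"data": "test input"}
).json()["call_id"]

# Poll until the job is no longer pending.
while True:
    result = requests.get(
        f"{API_URL}/get_job_result", params={"call_id": call_id}
    ).json()
    if result.get("result") != "pending":
        break
    time.sleep(5)

print(result)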
Pattern 2: SQS Queue
A production-oriented pattern built on AWS SQS (or an SQS-compatible broker). This example keeps job state in a local TinyDB file and targets a local SQS-compatible server so it can be run end to end on one machine:
from fastapi import FastAPI
from pydantic import BaseModel
import typer
import uvicorn
import boto3
import json
from botocore.exceptions import ClientError
import time
from tinydb import TinyDB, Query
from uuid import uuid4

AWS_ENDPOINT = "http://localhost:3001"
QUEUE_NAME = "ml-inference"
QUEUE_URL = f"{AWS_ENDPOINT}/1/{QUEUE_NAME}"


class RecordManager:
    """Tracks job status and results in a TinyDB file shared by API and worker."""

    def __init__(self, db_path="db.json"):
        self.db = TinyDB(db_path)
        self.query = Query()

    def add_record(self):
        record_id = str(uuid4())
        self.db.insert({"id": record_id, "status": "pending"})
        return record_id

    def update_record(self, record_id, result_data):
        update_data = {"status": "done"}
        update_data.update(result_data)
        self.db.update(update_data, self.query.id == record_id)

    def pull_record(self, record_id):
        return self.db.get(self.query.id == record_id)


web_app = FastAPI()
record_manager = RecordManager()


def get_or_create_queue(queue_name, aws_region="us-east-1"):
    sqs = boto3.client("sqs", region_name=aws_region, endpoint_url=AWS_ENDPOINT)
    try:
        response = sqs.get_queue_url(QueueName=queue_name)
        return response["QueueUrl"]
    except ClientError as e:
        if e.response["Error"]["Code"] == "QueueDoesNotExist":
            response = sqs.create_queue(QueueName=queue_name)
            return response["QueueUrl"]
        raise


def submit_message_to_sqs(queue_url, data, aws_region="us-east-1"):
    sqs = boto3.client("sqs", region_name=aws_region, endpoint_url=AWS_ENDPOINT)
    message_body = json.dumps(data)
    response = sqs.send_message(QueueUrl=queue_url, MessageBody=message_body)
    return response


def submit_job(data):
    # Create a pending record, then enqueue the job for a worker to pick up.
    call_id = record_manager.add_record()
    submit_message_to_sqs(
        queue_url=QUEUE_URL,
        data={"data": data, "call_id": call_id},
        aws_region="us-east-1",
    )
    return call_id


def get_job_result(call_id):
    return record_manager.pull_record(record_id=call_id)


class SubmitJobRequest(BaseModel):
    data: str


@web_app.post("/submit_job")
async def submit_job_endpoint(request: SubmitJobRequest):
    call_id = submit_job(request.data)
    return {"call_id": call_id}


@web_app.get("/get_job_result")
async def get_job_result_endpoint(call_id: str):
    return get_job_result(call_id)


def run_api():
    get_or_create_queue(queue_name=QUEUE_NAME)
    uvicorn.run(web_app, host="0.0.0.0", port=8000)


def receive_messages_from_sqs(queue_url, max_number=1, aws_region="us-east-1"):
    sqs = boto3.client("sqs", region_name=aws_region, endpoint_url=AWS_ENDPOINT)
    response = sqs.receive_message(
        QueueUrl=queue_url, MaxNumberOfMessages=max_number, WaitTimeSeconds=10
    )
    messages = response.get("Messages", [])
    for message in messages:
        # Deleted immediately after receipt to keep the example simple;
        # in production, delete only after successful processing.
        sqs.delete_message(
            QueueUrl=queue_url, ReceiptHandle=message["ReceiptHandle"]
        )
    return messages


def process_job(data):
    # Stand-in for real model inference.
    time.sleep(60)
    return {"result": "some-result"}


def run_worker():
    while True:
        messages = receive_messages_from_sqs(queue_url=QUEUE_URL)
        print(f"pulled {messages}")
        if len(messages) == 0:
            time.sleep(1)
        for message in messages:
            data = json.loads(message["Body"])
            result_data = process_job(data=data)
            record_manager.update_record(
                record_id=data["call_id"], result_data=result_data
            )


def cli():
    app = typer.Typer()
    app.command()(run_api)
    app.command()(run_worker)
    app()


if __name__ == "__main__":
    cli()
Set Up SQS (Local)
Use SmoothMQ for local testing:
# Clone and build
git clone https://github.com/poundifdef/smoothmq.git
docker build -t sqs:latest ./smoothmq/
# Run SQS-compatible server
docker run -p 3000:3000 -p 3001:3001 sqs:latest
Run API and Worker
# Set credentials
export AWS_ACCESS_KEY_ID=DEV_ACCESS_KEY_ID
export AWS_SECRET_ACCESS_KEY=DEV_SECRET_ACCESS_KEY
export AWS_DEFAULT_REGION=us-east-1
# Terminal 1: Run API
python sqs_queue.py run-api
# Terminal 2: Run worker
python sqs_queue.py run-worker
# Terminal 3: Test
curl -X POST http://localhost:8000/submit_job \
-H "Content-Type: application/json" \
-d '{"data": "test input"}'
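To point the same script at real AWS SQS instead of the local SmoothMQ server, the main change is to drop the custom endpoint and resolve the queue URL from AWS (a sketch; queue name and region are examples):

import boto3

# No endpoint_url override: boto3 talks to the real SQS service.
sqs = boto3.client("sqs", region_name="us-east-1")
queue_url = sqs.get_queue_url(QueueName="ml-inference")["QueueUrl"]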
Production Queue Options
AWS SQS
Pros:
- Fully managed
- Highly scalable
- Pay per use
- Built-in DLQ support
Cons:
- AWS vendor lock-in
- Network latency
- Limited message size (256KB)
Use when:
- Already on AWS
- Need managed service
- Variable workload
RabbitMQ
Pros:
- Feature-rich
- Self-hosted option
- Multiple protocols
- Complex routing
Cons:
- Requires management
- Scaling complexity
- Higher operational overhead
Use when:
- Need advanced routing
- Self-hosting preferred
- Complex workflows
Redis Queues
Pros:
- Very fast
- Simple setup
- Multiple data structures
- Already use Redis
Cons:
- In-memory (data loss risk)
- Limited durability
- Manual persistence config
Use when:
- Speed is critical
- Already using Redis
- Can tolerate data loss
Apache Kafka
Pros:
- High throughput
- Durable
- Event streaming
- Strong ordering
Cons:
- Complex setup
- Heavy resource usage
- Steep learning curve
Use when:
- Very high throughput
- Event streaming needs
- Strong ordering required
- Large-scale systems
For ML inference: Start with SQS or Redis. Move to Kafka only if you need >10K messages/sec.
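If you start with Redis, a job queue can be as simple as a list that producers RPUSH to and workers BLPOP from (a sketch with hypothetical key names, using redis-py):

import json
import uuid
import redis

r = redis.Redis(host="localhost", port=6379)

def submit_job(data):
    # Enqueue the job and return an ID the client can poll with.
    call_id = str(uuid.uuid4())
    r.rpush("inference:jobs", json.dumps({"call_id": call_id, "data": data}))
    return call_id

def run_worker():
    while True:
        # BLPOP blocks until a job is available.
        _, raw = r.blpop("inference:jobs")
        job = json.loads(raw)
        result = {"result": job["data"]}  # stand-in for real inference
        r.set(f"inference:result:{job['call_id']}", json.dumps(result))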
KServe with Kafka
KServe supports end-to-end async inference with Kafka:
apiVersion: serving.kserve.io/v1beta1
kind: InferenceService
metadata:
  name: mnist-kafka
spec:
  predictor:
    logger:
      mode: all
      url: http://kafka-broker.kafka:9092
    model:
      modelFormat:
        name: sklearn
      storageUri: gs://kfserving-examples/models/sklearn/1.0/model
See KServe Kafka documentation for complete setup.
Architecture Patterns
Pattern A: Simple Async
┌──────┐     ┌─────┐     ┌───────┐     ┌────────┐
│Client│────>│ API │────>│ Queue │────>│ Worker │
└──────┘     └─────┘     └───────┘     └────────┘
                │                          │
                │      ┌──────────┐        │
                └─────>│ Database │<───────┘
                       └──────────┘
Best for: Simple jobs, polling acceptable
Pattern B: Webhook Notifications
┌──────┐     ┌─────┐     ┌───────┐     ┌────────┐
│Client│────>│ API │────>│ Queue │────>│ Worker │
└──────┘     └─────┘     └───────┘     └────────┘
   ↑                                       │
   │                                       │
   └───────────────────────────────────────┘
              (webhook callback)
Best for: Real-time notifications needed
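A sketch of the webhook callback in Pattern B: the client supplies a callback URL at submission time, and the worker POSTs the result there when the job finishes (field names are hypothetical):

import requests

def notify_client(callback_url: str, call_id: str, result: dict):
    # Best-effort webhook delivery; clients should still be able to poll.
    try:
        requests.post(
            callback_url,
            json={"call_id": call_id, "result": result},
            timeout=5,
        )
    except requests.RequestException as e:
        print(f"webhook delivery failed for {call_id}: {e}")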
Pattern C: Multiple Workers
┌────────┐
┌───>│Worker 1│
┌──────┐ ┌─────┐ │ └────────┘
│Client│──>│ API │─┤ ┌────────┐
└──────┘ └─────┘ ├───>│Worker 2│
│ └────────┘
│ ┌────────┐
└───>│Worker 3│
└────────┘
Best for: High throughput, parallel processing
Pattern D: Priority Queues
┌──────────┐ ┌────────────┐
│High Queue│──>│Fast Worker │
┌──────┐ ┌─────┐ ├──────────┤ └────────────┘
│Client│──>│ API │─┤Low Queue │──>┌────────────┐
└──────┘ └─────┘ └──────────┘ │Slow Worker │
└────────────┘
Best for: Mixed workload, SLA requirements
Best Practices
Idempotent Processing
Ensure workers can safely retry failed jobs without side effects
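One way to make the SQS worker from Pattern 2 idempotent is to check the record status before doing any work, so a redelivered message becomes a no-op (a sketch reusing RecordManager and process_job from above):

def process_message_idempotently(message_data):
    record = record_manager.pull_record(record_id=message_data["call_id"])
    if record is not None and record.get("status") == "done":
        # Already processed: safe to drop the duplicate delivery.
        return
    result_data = process_job(data=message_data)
    record_manager.update_record(
        record_id=message_data["call_id"], result_data=result_data
    )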
Dead Letter Queues
Configure DLQ to capture failed messages for investigation
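With SQS, a dead letter queue is attached through a redrive policy on the main queue; after maxReceiveCount failed receives, SQS moves the message to the DLQ (a sketch with example queue names):

import json
import boto3

sqs = boto3.client("sqs")

# Create (or look up) the DLQ and read its ARN.
dlq_url = sqs.create_queue(QueueName="ml-inference-dlq")["QueueUrl"]
dlq_arn = sqs.get_queue_attributes(
    QueueUrl=dlq_url, AttributeNames=["QueueArn"]
)["Attributes"]["QueueArn"]

# Point the main queue's redrive policy at the DLQ.
main_queue_url = sqs.get_queue_url(QueueName="ml-inference")["QueueUrl"]
sqs.set_queue_attributes(
    QueueUrl=main_queue_url,
    Attributes={
        "RedrivePolicy": json.dumps(
            {"deadLetterTargetArn": dlq_arn, "maxReceiveCount": "5"}
        )
    },
)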
Exponential Backoff
Retry failed jobs with increasing delays to avoid overwhelming the system
Monitor Queue Depth
Alert when queue depth grows beyond threshold
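For SQS, queue depth can be read from the ApproximateNumberOfMessages attribute; a sketch of a simple threshold check (the threshold matches the monitoring table below):

import boto3

sqs = boto3.client("sqs")
queue_url = sqs.get_queue_url(QueueName="ml-inference")["QueueUrl"]

def queue_depth(url: str) -> int:
    attrs = sqs.get_queue_attributes(
        QueueUrl=url, AttributeNames=["ApproximateNumberOfMessages"]
    )
    return int(attrs["Attributes"]["ApproximateNumberOfMessages"])

if queue_depth(queue_url) > 1000:
    print("Queue depth above threshold: scale workers")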
Error Handling
import logging
from tenacity import retry, stop_after_attempt, wait_exponential

logger = logging.getLogger(__name__)


@retry(
    stop=stop_after_attempt(3),
    wait=wait_exponential(multiplier=1, min=4, max=60),
)
def process_job_with_retry(data):
    try:
        # run_inference is your model call.
        result = run_inference(data)
        return {"status": "success", "result": result}
    except Exception as e:
        logger.error(f"Job failed: {e}")
        raise
Monitoring
Track these metrics:
| Metric | Alert Threshold | Action |
|---|---|---|
| Queue Depth | > 1000 | Scale workers |
| Processing Time | > 60s | Investigate bottleneck |
| Error Rate | > 5% | Check logs |
| DLQ Depth | > 0 | Investigate failures |
| Worker CPU | > 80% | Scale up |
Comparison: Sync vs Async
| Aspect | Synchronous | Asynchronous |
|---|---|---|
| Latency | Lower for quick tasks | Higher (includes polling) |
| Throughput | Limited by connections | Much higher |
| Reliability | Timeout issues | Robust |
| Complexity | Simple | Moderate |
| Cost | Higher (always on) | Lower (scale workers) |
| Best For | < 5s inference | > 5s inference |
Real-World Examples
AWS SageMaker Async Inference
import boto3

sagemaker = boto3.client('sagemaker-runtime')

# Submit async inference
response = sagemaker.invoke_endpoint_async(
    EndpointName='my-endpoint',
    InputLocation='s3://bucket/input.json',
    InferenceId='unique-id'
)

output_location = response['OutputLocation']
AWS Documentation
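The async endpoint writes its output to OutputLocation when the job completes. Continuing from the response above, a sketch that polls S3 for the result (error codes and polling interval are assumptions):

import time
import boto3
from botocore.exceptions import ClientError

s3 = boto3.client("s3")
bucket, key = output_location.replace("s3://", "").split("/", 1)

while True:
    try:
        body = s3.get_object(Bucket=bucket, Key=key)["Body"].read()
        break  # result is ready
    except ClientError as e:
        if e.response["Error"]["Code"] == "NoSuchKey":
            time.sleep(5)  # not written yet; keep polling
        else:
            raise

print(body)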
Seldon Core V2 with Kafka
Seldon supports native Kafka integration:
apiVersion: mlops.seldon.io/v1alpha1
kind: Model
metadata:
  name: iris
spec:
  storageUri: gs://seldon-models/sklearn/iris
  requirements:
    - sklearn
  kafkaConfig:
    topicName: seldon.default.iris.inputs
    servers: kafka:9092
Seldon Documentation
Troubleshooting
Messages pile up in queue
Causes:
- Workers too slow
- Not enough workers
- Worker crashes
Solutions:
- Scale worker count
- Optimize inference code
- Check worker logs for errors
- Monitor worker health
Messages reappear while still being processed
Causes:
- Visibility timeout too short
- Processing takes too long
- Not extending timeout
Solutions:
- Increase visibility timeout
- Extend the visibility timeout from the worker (see the sketch below)
- Break into smaller tasks
- Use progress updates
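For extending the timeout, SQS lets a worker push out the visibility timeout of a message it is still processing (a sketch; the receipt handle comes from the receive_message response):

import boto3

sqs = boto3.client("sqs")

def extend_visibility(queue_url: str, receipt_handle: str, seconds: int = 300):
    # Give this worker `seconds` more before SQS redelivers the message.
    sqs.change_message_visibility(
        QueueUrl=queue_url,
        ReceiptHandle=receipt_handle,
        VisibilityTimeout=seconds,
    )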
Messages processed more than once
Causes:
- Not deleting messages
- Worker crashes before delete
- Network issues
Solutions:
- Delete after processing
- Make processing idempotent
- Use deduplication IDs
- Check message attributes
Additional Resources
- Stream Processing: Introduction to streaming for data scientists
- SageMaker Async: AWS SageMaker async inference guide
- Event-Driven: REST vs Messaging for Microservices
- KEDA: Kubernetes Event-driven Autoscaling
Next Steps
- Quantization: Learn how to optimize model performance with quantization