Overview

Asynchronous inference decouples API requests from model inference using message queues. This pattern is essential for:
  • Long-running inference tasks
  • Batch processing
  • Better resource utilization
  • Handling traffic spikes
  • Cost optimization

Synchronous vs Async Inference

Synchronous Pattern

Client → API → Model → API → Client
         |______________|  
           (waiting)
Pros:
  • Simple to implement
  • Immediate results
  • Easy to debug
Cons:
  • Client waits for entire inference
  • Connection timeouts on long tasks
  • Poor resource utilization
  • Difficult to scale

Asynchronous Pattern

Client → API → Queue → Worker → Model
         ↓                        ↓
         Job ID ←←←←←←←←← Results Store
Pros:
  • Non-blocking API responses
  • Better resource utilization
  • Handle traffic bursts
  • Easy to scale workers
  • No timeout issues
Cons:
  • More complex architecture
  • Need status polling or webhooks
  • Eventual consistency
Use async inference when inference time > 5 seconds or traffic is highly variable.
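
To make the flow in the diagram concrete, here is a minimal in-process sketch. It assumes a `queue.Queue` standing in for the message queue, a dict standing in for the results store, and a placeholder `run_model` function; none of these names come from a real serving stack.

```python
import queue
import threading
import uuid

jobs = queue.Queue()            # stands in for the message queue
results = {}                    # stands in for the results store
results_lock = threading.Lock()


def run_model(data):
    # Placeholder for real inference (hypothetical).
    return data.upper()


def submit(data):
    """API side: record the job, enqueue it, and return a job ID immediately."""
    job_id = str(uuid.uuid4())
    with results_lock:
        results[job_id] = {"status": "pending"}
    jobs.put((job_id, data))
    return job_id


def worker():
    """Worker side: pull jobs, run the model, write to the results store."""
    while True:
        job_id, data = jobs.get()
        output = run_model(data)
        with results_lock:
            results[job_id] = {"status": "done", "result": output}
        jobs.task_done()


def get_status(job_id):
    """API side: look up the current state of a job."""
    with results_lock:
        return dict(results[job_id])


threading.Thread(target=worker, daemon=True).start()
job_id = submit("hello")
jobs.join()  # a real client would poll get_status() instead of blocking
final = get_status(job_id)
```

The API never waits on the model: `submit` returns as soon as the job is queued, and the client learns the outcome later through `get_status`.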

Implementation Patterns

Pattern 1: Simple Queue (Modal)

Minimal async setup using Modal’s built-in queue:
simple_queue.py
from fastapi import FastAPI
from pydantic import BaseModel
import modal

app = modal.App("simple-queue")


@app.function()
def process_job(data):
    import time

    time.sleep(60)
    return {"result": data}


web_app = FastAPI()


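def submit_job(data):
    # Look up the deployed function by app name and function name.
    # Recent Modal releases deprecate Function.lookup in favor of from_name.
    process_job = modal.Function.from_name("simple-queue", "process_job")
    # spawn() returns a handle immediately instead of waiting for the result.
    call = process_job.spawn(data)
    return call.object_id


def get_job_result(call_id):
    function_call = modal.FunctionCall.from_id(call_id)
    try:
        # timeout=0 checks for a finished result without blocking.
        result = function_call.get(timeout=0)
    except modal.exception.OutputExpiredError:
        result = {"result": "expired"}
    except TimeoutError:
        result = {"result": "pending"}
    return result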


class SubmitJobRequest(BaseModel):
    data: str


@web_app.post("/submit_job")
async def submit_job_endpoint(request: SubmitJobRequest):
    call_id = submit_job(request.data)
    return {"call_id": call_id}


@web_app.get("/get_job_result")
async def get_job_result_endpoint(call_id: str):
    result = get_job_result(call_id)
    return result


if __name__ == "__main__":
    import uvicorn

    uvicorn.run(web_app, host="0.0.0.0", port=8000)

Deploy and Test

# Deploy Modal function
modal deploy simple_queue.py

# Run API locally
python simple_queue.py

# Submit job
curl -X POST http://localhost:8000/submit_job \
  -H "Content-Type: application/json" \
  -d '{"data": "test input"}'

# Returns: {"call_id": "call-abc123"}

# Check result
curl "http://localhost:8000/get_job_result?call_id=call-abc123"

# Returns: {"result": "pending"} or {"result": "test input"}
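
Instead of re-running `curl` by hand, a client can poll the result endpoint in a loop. The following is a sketch under a few assumptions: the helper names `poll_until_done` and `fetch_over_http` are hypothetical, the pending marker is the `{"result": "pending"}` response shown above, and the interval and timeout values are arbitrary.

```python
import json
import time
import urllib.parse
import urllib.request


def poll_until_done(fetch, call_id, interval=1.0, max_interval=30.0,
                    timeout=600.0, sleep=time.sleep):
    """Call fetch(call_id) until it stops reporting 'pending' or time runs out."""
    deadline = time.monotonic() + timeout
    delay = interval
    while True:
        response = fetch(call_id)
        if response.get("result") != "pending":
            return response
        if time.monotonic() + delay > deadline:
            raise TimeoutError(f"job {call_id} still pending after {timeout}s")
        sleep(delay)
        # Double the wait each round so long jobs do not hammer the API.
        delay = min(delay * 2, max_interval)


def fetch_over_http(call_id, base_url="http://localhost:8000"):
    """Fetch the job status from the API started by simple_queue.py."""
    query = urllib.parse.urlencode({"call_id": call_id})
    with urllib.request.urlopen(f"{base_url}/get_job_result?{query}") as resp:
        return json.load(resp)
```

Calling `poll_until_done(fetch_over_http, "call-abc123")` would block until the job finishes or ten minutes pass. Passing `fetch` and `sleep` as arguments keeps the loop easy to test without a running server.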

Pattern 2: SQS Queue

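A production-oriented pattern using AWS SQS (or an SQS-compatible queue). The example below runs against a local SQS-compatible server and stores job records in TinyDB; swap in a real database for production use: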
sqs_queue.py
from fastapi import FastAPI
from pydantic import BaseModel
import typer
import uvicorn
import boto3
import json
from botocore.exceptions import ClientError
import time
from tinydb import TinyDB, Query
from uuid import uuid4

AWS_ENDPOINT = "http://localhost:3001"
QUEUE_NAME = "ml-inference"
QUEUE_URL = f"{AWS_ENDPOINT}/1/{QUEUE_NAME}"


class RecordManager:
    def __init__(self, db_path="db.json"):
        self.db = TinyDB(db_path)
        self.query = Query()

    def add_record(self):
        record_id = str(uuid4())
        self.db.insert({"id": record_id, "status": "pending"})
        return record_id

    def update_record(self, record_id, result_data):
        update_data = {"status": "done"}
        update_data.update(result_data)
        self.db.update(update_data, self.query.id == record_id)

    def pull_record(self, record_id):
        return self.db.get(self.query.id == record_id)


web_app = FastAPI()
record_manager = RecordManager()


def get_or_create_queue(queue_name, aws_region="us-east-1"):
    sqs = boto3.client("sqs", region_name=aws_region, endpoint_url=AWS_ENDPOINT)
    try:
        response = sqs.get_queue_url(QueueName=queue_name)
        return response["QueueUrl"]
    except ClientError as e:
        if e.response["Error"]["Code"] == "QueueDoesNotExist":
            response = sqs.create_queue(QueueName=queue_name)
            return response["QueueUrl"]
        else:
            raise


def submit_message_to_sqs(queue_url, data, aws_region="us-east-1"):
    sqs = boto3.client("sqs", region_name=aws_region, endpoint_url=AWS_ENDPOINT)
    message_body = json.dumps(data)
    try:
        response = sqs.send_message(QueueUrl=queue_url, MessageBody=message_body)
        return response
    except ClientError as e:
        raise e


def submit_job(data):
    call_id = record_manager.add_record()
    submit_message_to_sqs(
        queue_url=QUEUE_URL,
        data={"data": data, "call_id": call_id},
        aws_region="us-east-1",
    )
    return call_id


def get_job_result(call_id):
    record = record_manager.pull_record(record_id=call_id)
    return record


class SubmitJobRequest(BaseModel):
    data: str


@web_app.post("/submit_job")
async def submit_job_endpoint(request: SubmitJobRequest):
    call_id = submit_job(request.data)
    return {"call_id": call_id}


@web_app.get("/get_job_result")
async def get_job_result_endpoint(call_id: str):
    result = get_job_result(call_id)
    return result


def run_api():
    get_or_create_queue(queue_name=QUEUE_NAME)
    uvicorn.run(web_app, host="0.0.0.0", port=8000)


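def receive_messages_from_sqs(queue_url, max_number=1, aws_region="us-east-1"):
    sqs = boto3.client("sqs", region_name=aws_region, endpoint_url=AWS_ENDPOINT)
    # Long-poll for up to 10 seconds. Messages are not deleted here: they stay
    # hidden from other consumers until the visibility timeout expires.
    response = sqs.receive_message(
        QueueUrl=queue_url,
        MaxNumberOfMessages=max_number,
        WaitTimeSeconds=10,
        VisibilityTimeout=120,  # longer than the 60-second job below
    )
    return response.get("Messages", [])


def delete_message_from_sqs(queue_url, receipt_handle, aws_region="us-east-1"):
    sqs = boto3.client("sqs", region_name=aws_region, endpoint_url=AWS_ENDPOINT)
    sqs.delete_message(QueueUrl=queue_url, ReceiptHandle=receipt_handle)


def process_job(data):
    # Placeholder for real inference; simulates a 60-second job.
    time.sleep(60)
    return {"result": "some-result"}


def run_worker():
    while True:
        messages = receive_messages_from_sqs(queue_url=QUEUE_URL)
        print(f"pulled {messages}")
        if not messages:
            time.sleep(1)
        for message in messages:
            data = json.loads(message["Body"])
            result_data = process_job(data=data)
            record_manager.update_record(
                record_id=data["call_id"], result_data=result_data
            )
            # Delete only after the result is stored, so a crash mid-job
            # lets the message reappear and be retried.
            delete_message_from_sqs(
                queue_url=QUEUE_URL, receipt_handle=message["ReceiptHandle"]
            )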


def cli():
    app = typer.Typer()
    app.command()(run_api)
    app.command()(run_worker)
    app()


if __name__ == "__main__":
    cli()

Set Up SQS (Local)

Use SmoothMQ for local testing:
# Clone and build
git clone https://github.com/poundifdef/smoothmq.git
docker build -t sqs:latest ./smoothmq/

# Run SQS-compatible server
docker run -p 3000:3000 -p 3001:3001 sqs:latest

Run API and Worker

# Set credentials
export AWS_ACCESS_KEY_ID=DEV_ACCESS_KEY_ID
export AWS_SECRET_ACCESS_KEY=DEV_SECRET_ACCESS_KEY
export AWS_DEFAULT_REGION=us-east-1

# Terminal 1: Run API
python sqs_queue.py run-api

# Terminal 2: Run worker
python sqs_queue.py run-worker

# Terminal 3: Test
curl -X POST http://localhost:8000/submit_job \
  -H "Content-Type: application/json" \
  -d '{"data": "test input"}'

Production Queue Options

AWS SQS

Pros:
  • Fully managed
  • Highly scalable
  • Pay per use
  • Built-in DLQ support
Cons:
  • AWS vendor lock-in
  • Network latency
  • Limited message size (256KB)
Use when:
  • Already on AWS
  • Need managed service
  • Variable workload

RabbitMQ

Pros:
  • Feature-rich
  • Self-hosted option
  • Multiple protocols
  • Complex routing
Cons:
  • Requires management
  • Scaling complexity
  • Higher operational overhead
Use when:
  • Need advanced routing
  • Self-hosting preferred
  • Complex workflows

Redis Queues

Pros:
  • Very fast
  • Simple setup
  • Multiple data structures
  • Reuses an existing Redis deployment
Cons:
  • In-memory (data loss risk)
  • Limited durability
  • Manual persistence config
Use when:
  • Speed is critical
  • Already using Redis
  • Can tolerate data loss

Apache Kafka

Pros:
  • High throughput
  • Durable
  • Event streaming
  • Strong ordering
Cons:
  • Complex setup
  • Heavy resource usage
  • Steep learning curve
Use when:
  • Very high throughput
  • Event streaming needs
  • Strong ordering required
  • Large-scale systems
For ML inference: Start with SQS or Redis. Move to Kafka only if you need >10K messages/sec.

KServe with Kafka

KServe supports end-to-end async inference with Kafka:
apiVersion: serving.kserve.io/v1beta1
kind: InferenceService
metadata:
  name: mnist-kafka
spec:
  predictor:
    logger:
      mode: all
      url: http://kafka-broker.kafka:9092
    model:
      modelFormat:
        name: sklearn
      storageUri: gs://kfserving-examples/models/sklearn/1.0/model
See KServe Kafka documentation for complete setup.

Architecture Patterns

Pattern A: Simple Async

┌──────┐     ┌─────┐     ┌───────┐     ┌────────┐
│Client│────>│ API │────>│ Queue │────>│ Worker │
└──────┘     └─────┘     └───────┘     └────────┘
               │                            │
               │         ┌──────────┐       │
               └────────>│ Database │<──────┘
                         └──────────┘
Best for: Simple jobs, polling acceptable

Pattern B: Webhook Notifications

┌──────┐     ┌─────┐     ┌───────┐     ┌────────┐
│Client│────>│ API │────>│ Queue │────>│ Worker │
└──────┘     └─────┘     └───────┘     └────────┘
   ↑                                        │
   │                                        │
   └────────────────────────────────────────┘
              (webhook callback)
Best for: Real-time notifications needed
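
A worker that delivers results by webhook usually signs the payload so the client can check that the callback is genuine. The sketch below assumes an HMAC-SHA256 scheme with a shared secret and a header named `X-Signature-SHA256`; the header name, payload fields, and function names are illustrative choices, not part of any standard mentioned in this guide.

```python
import hashlib
import hmac
import json
import urllib.request


def build_webhook_request(callback_url, job_id, result, secret):
    """Worker side: build a signed POST request carrying the job result."""
    body = json.dumps(
        {"job_id": job_id, "status": "done", "result": result}, sort_keys=True
    ).encode("utf-8")
    signature = hmac.new(secret, body, hashlib.sha256).hexdigest()
    return urllib.request.Request(
        callback_url,
        data=body,
        method="POST",
        headers={
            "Content-Type": "application/json",
            "X-Signature-SHA256": signature,  # hypothetical header name
        },
    )


def verify_signature(body, signature, secret):
    """Client side: recompute the HMAC and compare in constant time."""
    expected = hmac.new(secret, body, hashlib.sha256).hexdigest()
    return hmac.compare_digest(expected, signature)


# To send it, the worker would call:
#   urllib.request.urlopen(request, timeout=10)
```

Webhook delivery can fail, so in practice the worker should also write the result to a store that the client can poll as a fallback.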

Pattern C: Multiple Workers

                         ┌────────┐
                    ┌───>│Worker 1│
┌──────┐   ┌─────┐ │    └────────┘
│Client│──>│ API │─┤    ┌────────┐
└──────┘   └─────┘ ├───>│Worker 2│
                    │    └────────┘
                    │    ┌────────┐
                    └───>│Worker 3│
                         └────────┘
Best for: High throughput, parallel processing
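
The fan-out in this diagram is the competing-consumers idea: several workers read from one queue, and each job goes to exactly one of them. A minimal sketch, assuming threads in place of worker processes and a hypothetical `run_worker_pool` helper:

```python
import queue
import threading


def run_worker_pool(items, handler, num_workers=3):
    """Process (job_id, payload) pairs with several workers sharing one queue."""
    jobs = queue.Queue()
    results = {}
    lock = threading.Lock()

    def worker():
        while True:
            item = jobs.get()
            if item is None:  # sentinel: no more work for this worker
                break
            job_id, payload = item
            output = handler(payload)
            with lock:
                results[job_id] = output

    threads = [threading.Thread(target=worker) for _ in range(num_workers)]
    for thread in threads:
        thread.start()
    for job_id, payload in items:
        jobs.put((job_id, payload))
    for _ in threads:
        jobs.put(None)  # one sentinel per worker
    for thread in threads:
        thread.join()
    return results


squares = run_worker_pool([(i, i) for i in range(10)], lambda x: x * x)
```

With a real broker the same shape applies: start more copies of the worker process, and the queue distributes messages among them.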

Pattern D: Priority Queues

                    ┌──────────┐   ┌────────────┐
                    │High Queue│──>│Fast Worker │
┌──────┐   ┌─────┐ ├──────────┤   └────────────┘
│Client│──>│ API │─┤Low Queue │──>┌────────────┐
└──────┘   └─────┘ └──────────┘   │Slow Worker │
                                    └────────────┘
Best for: Mixed workload, SLA requirements
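
One way to sketch the routing is with two in-memory queues. This variation assumes a single worker that drains the high-priority queue first, whereas the diagram shows dedicated workers per queue; in that setup each worker would read only from its own queue. The names `submit` and `next_job` are hypothetical.

```python
import queue

# One queue per priority level; real deployments would use two broker queues.
queues = {"high": queue.Queue(), "low": queue.Queue()}


def submit(payload, priority="low"):
    """API side: route the job to the queue matching its priority."""
    queues[priority].put(payload)


def next_job():
    """Worker side: take from the high queue first, then fall back to low."""
    for name in ("high", "low"):
        try:
            return name, queues[name].get_nowait()
        except queue.Empty:
            continue
    return None  # both queues are empty


submit("batch-report", "low")
submit("interactive-request", "high")
submit("batch-export")
```

Strict high-first ordering can starve low-priority jobs under sustained load, which is one reason to give each queue its own workers as the diagram does.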

Best Practices

Idempotent Processing

Ensure workers can safely retry failed jobs without side effects
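
A common way to get idempotency is to key each job by its ID and skip work that has already completed. The sketch below assumes an in-memory dict as the record of completed jobs; a real system would need a durable store shared by all workers. `IdempotentWorker` is a hypothetical name.

```python
class IdempotentWorker:
    """Runs each job ID at most once and replays the stored result on retries."""

    def __init__(self, handler):
        self.handler = handler
        self.completed = {}  # job_id -> result (in-memory for illustration)

    def handle(self, job_id, payload):
        if job_id in self.completed:
            # Duplicate delivery: return the earlier result without re-running.
            return self.completed[job_id]
        result = self.handler(payload)
        self.completed[job_id] = result
        return result


calls = []
worker = IdempotentWorker(lambda payload: calls.append(payload) or payload * 2)
first = worker.handle("job-1", 21)
second = worker.handle("job-1", 21)  # simulated redelivery of the same message
```

Here the handler runs once even though the message arrives twice, so `first` and `second` are equal and `calls` holds a single entry.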

Dead Letter Queues

Configure DLQ to capture failed messages for investigation
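
With SQS, a dead letter queue is attached through the `RedrivePolicy` queue attribute. The sketch below assumes a boto3 SQS client is passed in as `sqs`; the helper names and the default `max_receive_count` of 5 are illustrative, and SQS-compatible local servers may not support this attribute.

```python
import json


def dlq_redrive_attributes(dlq_arn, max_receive_count=5):
    """Build the queue attributes that send repeatedly failing messages to a DLQ."""
    return {
        "RedrivePolicy": json.dumps(
            {
                "deadLetterTargetArn": dlq_arn,
                "maxReceiveCount": str(max_receive_count),
            }
        )
    }


def create_queue_with_dlq(sqs, queue_name, dlq_name, max_receive_count=5):
    """Create the DLQ first, then the main queue pointing at it."""
    dlq_url = sqs.create_queue(QueueName=dlq_name)["QueueUrl"]
    dlq_arn = sqs.get_queue_attributes(
        QueueUrl=dlq_url, AttributeNames=["QueueArn"]
    )["Attributes"]["QueueArn"]
    response = sqs.create_queue(
        QueueName=queue_name,
        Attributes=dlq_redrive_attributes(dlq_arn, max_receive_count),
    )
    return response["QueueUrl"]
```

A message moves to the DLQ once it has been received `max_receive_count` times without being deleted, so this only works when workers delete messages after successful processing.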

Exponential Backoff

Retry failed jobs with increasing delays to avoid overwhelming the system
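
The delay schedule itself is simple to compute. This sketch assumes the "full jitter" variant, where each delay is a random fraction of an exponentially growing cap; the function name and default values are illustrative.

```python
import random


def backoff_delay(attempt, base=1.0, cap=60.0, rng=random.random):
    """Return the delay in seconds before retry number `attempt` (0-based).

    The upper bound doubles with each attempt up to `cap`; the random factor
    spreads retries out so failed jobs do not all retry at the same moment.
    """
    upper_bound = min(cap, base * 2 ** attempt)
    return rng() * upper_bound


# Upper bounds for the first seven attempts (jitter disabled for display).
upper_bounds = [backoff_delay(n, rng=lambda: 1.0) for n in range(7)]
```

With the defaults, the upper bounds grow as 1, 2, 4, 8, 16, 32 seconds and then stay at the 60-second cap.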

Monitor Queue Depth

Alert when queue depth grows beyond threshold

Error Handling

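import logging
from tenacity import retry, stop_after_attempt, wait_exponential

logger = logging.getLogger(__name__)

# run_inference is assumed to be defined elsewhere (the real model call).


# Retry up to 3 times, waiting between 4 and 60 seconds with exponential growth.
@retry(
    stop=stop_after_attempt(3),
    wait=wait_exponential(multiplier=1, min=4, max=60)
)
def process_job_with_retry(data):
    try:
        result = run_inference(data)
        return {"status": "success", "result": result}
    except Exception as e:
        logger.error(f"Job failed: {e}")
        # Re-raise so tenacity sees the failure and schedules the next attempt.
        raise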

Monitoring

Track these metrics:
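| Metric          | Alert Threshold | Action                 |
|-----------------|-----------------|------------------------|
| Queue Depth     | > 1000          | Scale workers          |
| Processing Time | > 60s           | Investigate bottleneck |
| Error Rate      | > 5%            | Check logs             |
| DLQ Depth       | > 0             | Investigate failures   |
| Worker CPU      | > 80%           | Scale up               |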
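
These thresholds can be encoded as a small rule table and evaluated against whatever metrics source is in use. The sketch assumes metrics arrive as a plain dict with the hypothetical key names shown; for SQS, queue depth could be read from the `ApproximateNumberOfMessages` queue attribute.

```python
# Metric name -> (alert threshold, suggested action), mirroring the list above.
THRESHOLDS = {
    "queue_depth": (1000, "Scale workers"),
    "processing_time_s": (60, "Investigate bottleneck"),
    "error_rate": (0.05, "Check logs"),
    "dlq_depth": (0, "Investigate failures"),
    "worker_cpu": (0.80, "Scale up"),
}


def evaluate(metrics):
    """Return (metric, action) pairs for every metric above its threshold."""
    return [
        (name, action)
        for name, (limit, action) in THRESHOLDS.items()
        if metrics.get(name, 0) > limit
    ]


alerts = evaluate({"queue_depth": 1500, "error_rate": 0.01, "dlq_depth": 2})
```

Here `alerts` flags the queue depth and the DLQ depth, while the 1% error rate stays below its 5% threshold.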

Comparison: Sync vs Async

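| Aspect      | Synchronous            | Asynchronous              |
|-------------|------------------------|---------------------------|
| Latency     | Lower for quick tasks  | Higher (includes polling) |
| Throughput  | Limited by connections | Much higher               |
| Reliability | Timeout issues         | Robust                    |
| Complexity  | Simple                 | Moderate                  |
| Cost        | Higher (always on)     | Lower (scale workers)     |
| Best For    | < 5s inference         | > 5s inference            |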

Real-World Examples

AWS SageMaker Async Inference

import boto3

sagemaker = boto3.client('sagemaker-runtime')

# Submit async inference
response = sagemaker.invoke_endpoint_async(
    EndpointName='my-endpoint',
    InputLocation='s3://bucket/input.json',
    InferenceId='unique-id'
)

output_location = response['OutputLocation']
AWS Documentation

Seldon Core V2 with Kafka

Seldon supports native Kafka integration:
apiVersion: mlops.seldon.io/v1alpha1
kind: Model
metadata:
  name: iris
spec:
  storageUri: gs://seldon-models/sklearn/iris
  requirements:
  - sklearn
  kafkaConfig:
    topicName: seldon.default.iris.inputs
    servers: kafka:9092
Seldon Documentation

Troubleshooting

Queue depth keeps growing

Causes:
  • Workers too slow
  • Not enough workers
  • Worker crashes
Solutions:
  • Scale worker count
  • Optimize inference code
  • Check worker logs for errors
  • Monitor worker health

Messages reappear before processing finishes

Causes:
  • Visibility timeout too short
  • Processing takes too long
  • Not extending timeout
Solutions:
  • Increase visibility timeout
  • Extend timeout in worker
  • Break into smaller tasks
  • Use progress updates

Messages are processed more than once

Causes:
  • Not deleting messages
  • Worker crashes before delete
  • Network issues
Solutions:
  • Delete after processing
  • Make processing idempotent
  • Use deduplication IDs
  • Check message attributes
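
For the "extend timeout in worker" fix, a background heartbeat can keep pushing the visibility timeout forward while a long job runs. This is a sketch assuming a boto3 SQS client passed in as `sqs`; the class name and the default timeout and interval values are illustrative.

```python
import threading


class VisibilityHeartbeat:
    """Context manager that periodically extends a message's visibility timeout."""

    def __init__(self, sqs, queue_url, receipt_handle, timeout=60, interval=30):
        self.sqs = sqs
        self.queue_url = queue_url
        self.receipt_handle = receipt_handle
        self.timeout = timeout      # seconds of invisibility granted per extension
        self.interval = interval    # seconds between extensions; keep below timeout
        self._stop = threading.Event()
        self._thread = threading.Thread(target=self._run, daemon=True)

    def _run(self):
        # wait() returns False on timeout (keep going) and True once stopped.
        while not self._stop.wait(self.interval):
            self.sqs.change_message_visibility(
                QueueUrl=self.queue_url,
                ReceiptHandle=self.receipt_handle,
                VisibilityTimeout=self.timeout,
            )

    def __enter__(self):
        self._thread.start()
        return self

    def __exit__(self, exc_type, exc, tb):
        self._stop.set()
        self._thread.join()
        return False  # never swallow exceptions from the job
```

A worker would wrap the job as `with VisibilityHeartbeat(sqs, queue_url, message["ReceiptHandle"]): process_job(data)` and delete the message after the block exits. If the worker crashes, the heartbeat stops with it and the message becomes visible again for a retry.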

Additional Resources

Stream Processing

Introduction to streaming for data scientists

SageMaker Async

AWS SageMaker async inference guide

Event-Driven

REST vs Messaging for Microservices

KEDA

Kubernetes Event-driven Autoscaling

Next Steps

Quantization

Learn how to optimize model performance with quantization
