Skip to main content

Async Patterns

Memori works with Python’s async/await out of the box. Use AsyncOpenAI or AsyncAnthropic instead of their sync counterparts — everything else stays the same.

When to Use Async

| Scenario | Async? | Why |
| --- | --- | --- |
| Web servers (FastAPI) | Yes | Concurrent request handling |
| Chatbots with many users | Yes | Non-blocking I/O |
| CLI scripts | No | Sync is simpler |
| Jupyter notebooks | No | Event loop already running |

Basic Async Setup

With Memori Cloud

import os
import asyncio
from memori import Memori
from openai import AsyncOpenAI

# NOTE: shown inline for the example — set via your environment in production.
os.environ["MEMORI_API_KEY"] = "your-api-key"

async def main():
    """Run one chat completion through an async OpenAI client with Memori attached."""
    client = AsyncOpenAI()
    # Registering the client lets Memori observe completions made through it.
    mem = Memori().llm.register(client)
    # Attribute captured memories to a user (entity) and an agent (process).
    mem.attribution(entity_id="user_123", process_id="async_agent")

    response = await client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[{"role": "user", "content": "I prefer async Python."}]
    )
    print(response.choices[0].message.content)
    # Block until background memory augmentation finishes (the script exits next).
    mem.augmentation.wait()

asyncio.run(main())

With BYODB (SQLAlchemy)

import asyncio
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from memori import Memori
from openai import AsyncOpenAI

# Bring-your-own-database: Memori persists memories through this
# SQLAlchemy session factory instead of Memori Cloud.
engine = create_engine("sqlite:///memori.db")
SessionLocal = sessionmaker(bind=engine)

async def main():
    """Run one async completion with memories stored in a local SQLite database."""
    client = AsyncOpenAI()
    mem = Memori(conn=SessionLocal).llm.register(client)
    mem.attribution(entity_id="user_123", process_id="async_agent")
    # Build the Memori storage schema before first use.
    mem.config.storage.build()

    response = await client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[{"role": "user", "content": "I prefer async Python."}]
    )
    print(response.choices[0].message.content)
    # Wait for background augmentation before the process exits.
    mem.augmentation.wait()

asyncio.run(main())

AsyncAnthropic

Memori works identically with Anthropic’s async client:
import asyncio
from memori import Memori
from anthropic import AsyncAnthropic

async def main():
    """Same registration pattern as OpenAI, but with Anthropic's async client."""
    client = AsyncAnthropic()
    mem = Memori().llm.register(client)
    mem.attribution(entity_id="user_123", process_id="claude_agent")

    # Anthropic requires max_tokens and returns a Message, not a ChatCompletion.
    message = await client.messages.create(
        model="claude-3-5-sonnet-20241022",
        max_tokens=1024,
        messages=[{"role": "user", "content": "Hello, Claude!"}]
    )
    print(message.content[0].text)
    # Wait for background augmentation before the script exits.
    mem.augmentation.wait()

asyncio.run(main())

FastAPI Integration

Basic FastAPI Example

import os
from fastapi import FastAPI
from pydantic import BaseModel
from memori import Memori
from openai import AsyncOpenAI

app = FastAPI()
# NOTE: shown inline for the example — configure via deployment environment in production.
os.environ["MEMORI_API_KEY"] = "your-api-key"

class ChatRequest(BaseModel):
    # Request body: the user's chat message.
    message: str

@app.post("/chat/{user_id}")
async def chat(user_id: str, req: ChatRequest):
    """Handle one chat turn; memories are attributed to the path's user_id."""
    client = AsyncOpenAI()
    mem = Memori().llm.register(client)
    mem.attribution(entity_id=user_id, process_id="fastapi_async")

    response = await client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[{"role": "user", "content": req.message}]
    )
    # No augmentation.wait() here: in a long-running server, augmentation
    # continues in the background after the response is returned.
    return {"response": response.choices[0].message.content}

FastAPI with BYODB

import os
from contextlib import asynccontextmanager
from fastapi import FastAPI, Depends
from pydantic import BaseModel
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from memori import Memori
from openai import AsyncOpenAI

# Database setup — SQLite needs check_same_thread=False because FastAPI
# may touch the connection from its worker threadpool.
engine = create_engine(
    "sqlite:///memori.db",
    connect_args={"check_same_thread": False}
)
SessionLocal = sessionmaker(bind=engine)

# Build the Memori schema once at startup. Uses the lifespan context manager,
# which replaces the deprecated @app.on_event("startup") hook.
@asynccontextmanager
async def lifespan(app: FastAPI):
    Memori(conn=SessionLocal).config.storage.build()
    yield

app = FastAPI(lifespan=lifespan)

class ChatRequest(BaseModel):
    # Request body: the user's chat message.
    message: str

@app.post("/chat/{user_id}")
async def chat(user_id: str, req: ChatRequest):
    """Handle one chat turn; memories are persisted to the local SQLite database."""
    client = AsyncOpenAI()
    mem = Memori(conn=SessionLocal).llm.register(client)
    mem.attribution(entity_id=user_id, process_id="fastapi_async")

    response = await client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[{"role": "user", "content": req.message}]
    )
    return {"response": response.choices[0].message.content}

FastAPI with Dependency Injection

from fastapi import FastAPI, Depends, HTTPException
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from pydantic import BaseModel
from memori import Memori
from openai import AsyncOpenAI

app = FastAPI()
security = HTTPBearer()

def get_user_id(credentials: HTTPAuthorizationCredentials = Depends(security)) -> str:
    """Resolve the authenticated user's ID from the bearer token."""
    # Validate token and extract user ID
    token = credentials.credentials
    # Replace with actual authentication logic
    if token == "valid_token":
        return "user_123"
    raise HTTPException(status_code=401, detail="Invalid token")

class ChatRequest(BaseModel):
    # Request body: the user's chat message.
    message: str

@app.post("/chat")
async def chat(
    req: ChatRequest,
    user_id: str = Depends(get_user_id)
):
    """Chat endpoint where the entity_id comes from authentication, not the URL."""
    client = AsyncOpenAI()
    mem = Memori().llm.register(client)
    # Attribution uses the verified identity, so users can't claim other IDs.
    mem.attribution(entity_id=user_id, process_id="authenticated_chat")

    response = await client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[{"role": "user", "content": req.message}]
    )
    return {"response": response.choices[0].message.content}

Streaming Responses

Memori supports streaming with async clients:
import asyncio
from memori import Memori
from openai import AsyncOpenAI

async def stream_chat():
    """Stream a completion chunk-by-chunk while Memori observes the conversation."""
    client = AsyncOpenAI()
    mem = Memori().llm.register(client)
    mem.attribution(entity_id="user_123", process_id="streaming_agent")

    stream = await client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[{"role": "user", "content": "Tell me a story"}],
        stream=True
    )

    # Each chunk carries a delta; content may be None (e.g. role-only chunks).
    async for chunk in stream:
        content = chunk.choices[0].delta.content
        if content:
            print(content, end="", flush=True)

    print()  # Newline after streaming
    # Wait for background augmentation before the script exits.
    mem.augmentation.wait()

asyncio.run(stream_chat())

FastAPI Streaming

from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from memori import Memori
from openai import AsyncOpenAI

app = FastAPI()

async def generate_stream(user_id: str, message: str):
    """Async generator yielding completion text chunks for a StreamingResponse."""
    client = AsyncOpenAI()
    mem = Memori().llm.register(client)
    mem.attribution(entity_id=user_id, process_id="stream_bot")

    stream = await client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[{"role": "user", "content": message}],
        stream=True
    )

    # Yield only non-empty deltas; FastAPI forwards each as it arrives.
    async for chunk in stream:
        content = chunk.choices[0].delta.content
        if content:
            yield content

@app.get("/stream/{user_id}")
async def stream_chat(user_id: str, message: str):
    """Expose the generator as a server-sent-event style streaming endpoint."""
    return StreamingResponse(
        generate_stream(user_id, message),
        media_type="text/event-stream"
    )

Concurrent Operations

Multiple Users Concurrently

import asyncio
from memori import Memori
from openai import AsyncOpenAI

async def handle_user(user_id: str, message: str):
    """Handle one user's request; each task gets its own client and Memori instance."""
    client = AsyncOpenAI()
    mem = Memori().llm.register(client)
    mem.attribution(entity_id=user_id, process_id="concurrent_bot")

    response = await client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[{"role": "user", "content": message}]
    )
    return user_id, response.choices[0].message.content

async def main():
    users = [
        ("user_alice", "What's my favorite color?"),
        ("user_bob", "Tell me about my preferences"),
        ("user_charlie", "What do I like?")
    ]

    # Process all users concurrently
    results = await asyncio.gather(*[
        handle_user(user_id, message)
        for user_id, message in users
    ])

    for user_id, response in results:
        print(f"{user_id}: {response}")

asyncio.run(main())

Batched Recall

import asyncio
from memori import Memori

async def batch_recall(entity_ids: list[str], query: str):
    """Recall facts for many entities concurrently; returns {entity_id: facts}."""
    tasks = []
    for entity_id in entity_ids:
        mem = Memori()
        mem.attribution(entity_id=entity_id, process_id="batch_recall")
        # Note: recall is sync, so run in executor
        task = asyncio.to_thread(mem.recall, query, limit=5)
        tasks.append(task)

    # gather preserves input order, so zip pairs results with the right IDs.
    results = await asyncio.gather(*tasks)
    return dict(zip(entity_ids, results))

async def main():
    users = ["user_alice", "user_bob", "user_charlie"]
    results = await batch_recall(users, "coding preferences")

    for user_id, facts in results.items():
        print(f"\n{user_id}:")
        for fact in facts:
            print(f"  - {fact.content}")

asyncio.run(main())

Internal Async Architecture

Memori’s Advanced Augmentation engine uses async internally, even when you use sync clients. Key components:
  • Async event loop — Dedicated thread for augmentation processing (memori/memory/augmentation/_runtime.py)
  • Semaphore — Controls max concurrent augmentation tasks (default: 50)
  • Batched DB writes — Queue-based writer with async processing
Source: memori/memory/augmentation/_manager.py:112-153
# Internal augmentation flow (simplified)
# NOTE: this is an illustrative excerpt — `runtime`, `self`, `driver`, and
# AugmentationContext come from the real method's enclosing class and are
# not defined here; the snippet is not runnable as-is.
async def _process_augmentations(input_data):
    async with runtime.semaphore:  # Limit concurrency
        ctx = AugmentationContext(payload=input_data)

        # Augmentations run sequentially, each transforming the shared context.
        for aug in self.augmentations:
            ctx = await aug.process(ctx, driver)

        if ctx.writes:
            self._enqueue_writes(ctx.writes)  # Batched writes

Mixing Sync and Async

You can use sync clients in async contexts and vice versa:
import asyncio
from memori import Memori
from openai import OpenAI, AsyncOpenAI

async def mixed_example():
    """Show both an async client and a sync client used from async code."""
    # Async client (preferred in async contexts)
    async_client = AsyncOpenAI()
    mem_async = Memori().llm.register(async_client)
    mem_async.attribution(entity_id="user_123", process_id="async")

    response = await async_client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[{"role": "user", "content": "Async message"}]
    )
    print(response.choices[0].message.content)

    # Sync client in async context (runs in thread pool)
    sync_client = OpenAI()
    mem_sync = Memori().llm.register(sync_client)
    mem_sync.attribution(entity_id="user_456", process_id="sync")

    # to_thread keeps the blocking call off the event loop.
    response = await asyncio.to_thread(
        sync_client.chat.completions.create,
        model="gpt-4o-mini",
        messages=[{"role": "user", "content": "Sync message"}]
    )
    print(response.choices[0].message.content)

asyncio.run(mixed_example())
Prefer async clients in async contexts for better performance. Use asyncio.to_thread() only when necessary for sync operations.

Thread Safety

Memori uses a ThreadPoolExecutor for background operations:
  • Default workers: 15 (memori/_config.py:87)
  • Thread-safe: Each Memori instance manages its own state
  • Connection pooling: Handled by the database adapter
import asyncio
from memori import Memori
from openai import AsyncOpenAI

async def worker(user_id: str):
    # Each async task gets its own Memori instance
    client = AsyncOpenAI()
    mem = Memori().llm.register(client)
    mem.attribution(entity_id=user_id, process_id="worker")

    response = await client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[{"role": "user", "content": "Hello"}]
    )
    return response.choices[0].message.content

async def main():
    # Run 100 concurrent workers
    tasks = [worker(f"user_{i}") for i in range(100)]
    results = await asyncio.gather(*tasks)
    print(f"Processed {len(results)} users")

asyncio.run(main())

Error Handling

Async Error Handling

import asyncio
from memori import Memori, QuotaExceededError
from openai import AsyncOpenAI

async def safe_chat(user_id: str, message: str):
    """Chat helper that returns an error string instead of raising."""
    try:
        client = AsyncOpenAI()
        mem = Memori().llm.register(client)
        mem.attribution(entity_id=user_id, process_id="safe_bot")

        response = await client.chat.completions.create(
            model="gpt-4o-mini",
            messages=[{"role": "user", "content": message}]
        )
        return response.choices[0].message.content
    # Catch the specific Memori quota error before the generic fallback.
    except QuotaExceededError:
        return "Quota exceeded — please try again later"
    except Exception as e:
        # Broad catch is deliberate here: the bot should answer, not crash.
        return f"Error: {str(e)}"

async def main():
    result = await safe_chat("user_123", "Hello")
    print(result)

asyncio.run(main())

Performance Tips

  1. Use async clients — AsyncOpenAI and AsyncAnthropic for better concurrency
  2. Batch operations — Process multiple users concurrently with asyncio.gather()
  3. Don’t await augmentation — Let it run in the background for long-running apps
  4. Use connection pooling — SQLAlchemy’s sessionmaker provides connection pooling
  5. Tune worker pools — Increase max_workers for augmentation if needed
from memori import Memori

mem = Memori()

# Tune augmentation for higher throughput
# NOTE(review): higher values trade memory for throughput — monitor resource
# usage when raising them.
mem.augmentation.max_workers = 100  # More concurrent augmentation tasks
mem.augmentation.db_writer_batch_size = 200  # Larger batches
Increasing worker pools and batch sizes improves throughput but increases memory usage. Monitor your application’s resource consumption when tuning these parameters.

Testing Async Code

pytest-asyncio

import pytest
from memori import Memori
from openai import AsyncOpenAI

# Requires the pytest-asyncio plugin for the asyncio marker.
@pytest.mark.asyncio
async def test_async_chat():
    """End-to-end check: a registered async client returns non-empty content."""
    client = AsyncOpenAI()
    mem = Memori().llm.register(client)
    mem.attribution(entity_id="test_user", process_id="test_bot")

    response = await client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[{"role": "user", "content": "Test message"}]
    )

    assert response.choices[0].message.content
    mem.augmentation.wait()  # Wait for augmentation in tests
Always call mem.augmentation.wait() in tests to ensure augmentation completes before assertions.

Build docs developers (and LLMs) love