Async Patterns
Memori works with Python’s async/await out of the box. Use AsyncOpenAI or AsyncAnthropic instead of their sync counterparts — everything else stays the same.
When to Use Async
| Scenario | Async? | Why |
|---|---|---|
| Web servers (FastAPI) | Yes | Concurrent request handling |
| Chatbots with many users | Yes | Non-blocking I/O |
| CLI scripts | No | Sync is simpler |
| Jupyter notebooks | No | Event loop already running |
Basic Async Setup
With Memori Cloud
import os
import asyncio
from memori import Memori
from openai import AsyncOpenAI
os.environ["MEMORI_API_KEY"] = "your-api-key"
async def main():
client = AsyncOpenAI()
mem = Memori().llm.register(client)
mem.attribution(entity_id="user_123", process_id="async_agent")
response = await client.chat.completions.create(
model="gpt-4o-mini",
messages=[{"role": "user", "content": "I prefer async Python."}]
)
print(response.choices[0].message.content)
mem.augmentation.wait()
asyncio.run(main())
With BYODB (SQLAlchemy)
import asyncio
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from memori import Memori
from openai import AsyncOpenAI
engine = create_engine("sqlite:///memori.db")
SessionLocal = sessionmaker(bind=engine)
async def main():
client = AsyncOpenAI()
mem = Memori(conn=SessionLocal).llm.register(client)
mem.attribution(entity_id="user_123", process_id="async_agent")
mem.config.storage.build()
response = await client.chat.completions.create(
model="gpt-4o-mini",
messages=[{"role": "user", "content": "I prefer async Python."}]
)
print(response.choices[0].message.content)
mem.augmentation.wait()
asyncio.run(main())
AsyncAnthropic
Memori works identically with Anthropic’s async client:
import asyncio
from memori import Memori
from anthropic import AsyncAnthropic
async def main():
client = AsyncAnthropic()
mem = Memori().llm.register(client)
mem.attribution(entity_id="user_123", process_id="claude_agent")
message = await client.messages.create(
model="claude-3-5-sonnet-20241022",
max_tokens=1024,
messages=[{"role": "user", "content": "Hello, Claude!"}]
)
print(message.content[0].text)
mem.augmentation.wait()
asyncio.run(main())
FastAPI Integration
Basic FastAPI Example
import os
from fastapi import FastAPI
from pydantic import BaseModel
from memori import Memori
from openai import AsyncOpenAI
app = FastAPI()
os.environ["MEMORI_API_KEY"] = "your-api-key"
class ChatRequest(BaseModel):
message: str
@app.post("/chat/{user_id}")
async def chat(user_id: str, req: ChatRequest):
client = AsyncOpenAI()
mem = Memori().llm.register(client)
mem.attribution(entity_id=user_id, process_id="fastapi_async")
response = await client.chat.completions.create(
model="gpt-4o-mini",
messages=[{"role": "user", "content": req.message}]
)
return {"response": response.choices[0].message.content}
FastAPI with BYODB
import os
from fastapi import FastAPI
from pydantic import BaseModel
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from memori import Memori
from openai import AsyncOpenAI
app = FastAPI()
# Database setup
engine = create_engine(
"sqlite:///memori.db",
connect_args={"check_same_thread": False}
)
SessionLocal = sessionmaker(bind=engine)
# Build schema on startup
# NOTE: @app.on_event is deprecated in newer FastAPI releases —
# prefer the lifespan parameter (FastAPI(lifespan=...)) in new code.
@app.on_event("startup")
def startup():
Memori(conn=SessionLocal).config.storage.build()
class ChatRequest(BaseModel):
message: str
@app.post("/chat/{user_id}")
async def chat(user_id: str, req: ChatRequest):
client = AsyncOpenAI()
mem = Memori(conn=SessionLocal).llm.register(client)
mem.attribution(entity_id=user_id, process_id="fastapi_async")
response = await client.chat.completions.create(
model="gpt-4o-mini",
messages=[{"role": "user", "content": req.message}]
)
return {"response": response.choices[0].message.content}
FastAPI with Dependency Injection
from fastapi import FastAPI, Depends, HTTPException
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from pydantic import BaseModel
from memori import Memori
from openai import AsyncOpenAI
app = FastAPI()
security = HTTPBearer()
def get_user_id(credentials: HTTPAuthorizationCredentials = Depends(security)) -> str:
# Validate token and extract user ID
token = credentials.credentials
# Replace with actual authentication logic
if token == "valid_token":
return "user_123"
raise HTTPException(status_code=401, detail="Invalid token")
class ChatRequest(BaseModel):
message: str
@app.post("/chat")
async def chat(
req: ChatRequest,
user_id: str = Depends(get_user_id)
):
client = AsyncOpenAI()
mem = Memori().llm.register(client)
mem.attribution(entity_id=user_id, process_id="authenticated_chat")
response = await client.chat.completions.create(
model="gpt-4o-mini",
messages=[{"role": "user", "content": req.message}]
)
return {"response": response.choices[0].message.content}
Streaming Responses
Memori supports streaming with async clients:
import asyncio
from memori import Memori
from openai import AsyncOpenAI
async def stream_chat():
client = AsyncOpenAI()
mem = Memori().llm.register(client)
mem.attribution(entity_id="user_123", process_id="streaming_agent")
stream = await client.chat.completions.create(
model="gpt-4o-mini",
messages=[{"role": "user", "content": "Tell me a story"}],
stream=True
)
async for chunk in stream:
content = chunk.choices[0].delta.content
if content:
print(content, end="", flush=True)
print() # Newline after streaming
mem.augmentation.wait()
asyncio.run(stream_chat())
FastAPI Streaming
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from memori import Memori
from openai import AsyncOpenAI
app = FastAPI()
async def generate_stream(user_id: str, message: str):
client = AsyncOpenAI()
mem = Memori().llm.register(client)
mem.attribution(entity_id=user_id, process_id="stream_bot")
stream = await client.chat.completions.create(
model="gpt-4o-mini",
messages=[{"role": "user", "content": message}],
stream=True
)
async for chunk in stream:
content = chunk.choices[0].delta.content
if content:
yield content
@app.get("/stream/{user_id}")
async def stream_chat(user_id: str, message: str):
return StreamingResponse(
generate_stream(user_id, message),
media_type="text/event-stream"
)
Concurrent Operations
Multiple Users Concurrently
import asyncio
from memori import Memori
from openai import AsyncOpenAI
async def handle_user(user_id: str, message: str):
client = AsyncOpenAI()
mem = Memori().llm.register(client)
mem.attribution(entity_id=user_id, process_id="concurrent_bot")
response = await client.chat.completions.create(
model="gpt-4o-mini",
messages=[{"role": "user", "content": message}]
)
return user_id, response.choices[0].message.content
async def main():
users = [
("user_alice", "What's my favorite color?"),
("user_bob", "Tell me about my preferences"),
("user_charlie", "What do I like?")
]
# Process all users concurrently
results = await asyncio.gather(*[
handle_user(user_id, message)
for user_id, message in users
])
for user_id, response in results:
print(f"{user_id}: {response}")
asyncio.run(main())
Batched Recall
import asyncio
from memori import Memori
async def batch_recall(entity_ids: list[str], query: str):
tasks = []
for entity_id in entity_ids:
mem = Memori()
mem.attribution(entity_id=entity_id, process_id="batch_recall")
# Note: recall is sync, so run in executor
task = asyncio.to_thread(mem.recall, query, limit=5)
tasks.append(task)
results = await asyncio.gather(*tasks)
return dict(zip(entity_ids, results))
async def main():
users = ["user_alice", "user_bob", "user_charlie"]
results = await batch_recall(users, "coding preferences")
for user_id, facts in results.items():
print(f"\n{user_id}:")
for fact in facts:
print(f" - {fact.content}")
asyncio.run(main())
Internal Async Architecture
Memori’s Advanced Augmentation engine uses async internally, even when you use sync clients:
Key components:
- Async event loop — Dedicated thread for augmentation processing (`memori/memory/augmentation/_runtime.py`)
- Semaphore — Controls max concurrent augmentation tasks (default: 50)
- Batched DB writes — Queue-based writer with async processing
Source: memori/memory/augmentation/_manager.py:112-153
# Internal augmentation flow (simplified)
async def _process_augmentations(input_data):
async with runtime.semaphore: # Limit concurrency
ctx = AugmentationContext(payload=input_data)
for aug in self.augmentations:
ctx = await aug.process(ctx, driver)
if ctx.writes:
self._enqueue_writes(ctx.writes) # Batched writes
Mixing Sync and Async
You can use sync clients in async contexts and vice versa:
import asyncio
from memori import Memori
from openai import OpenAI, AsyncOpenAI
async def mixed_example():
# Async client (preferred in async contexts)
async_client = AsyncOpenAI()
mem_async = Memori().llm.register(async_client)
mem_async.attribution(entity_id="user_123", process_id="async")
response = await async_client.chat.completions.create(
model="gpt-4o-mini",
messages=[{"role": "user", "content": "Async message"}]
)
print(response.choices[0].message.content)
# Sync client in async context (runs in thread pool)
sync_client = OpenAI()
mem_sync = Memori().llm.register(sync_client)
mem_sync.attribution(entity_id="user_456", process_id="sync")
response = await asyncio.to_thread(
sync_client.chat.completions.create,
model="gpt-4o-mini",
messages=[{"role": "user", "content": "Sync message"}]
)
print(response.choices[0].message.content)
asyncio.run(mixed_example())
Prefer async clients in async contexts for better performance. Use asyncio.to_thread() only when necessary for sync operations.
Thread Safety
Memori uses a ThreadPoolExecutor for background operations:
- Default workers: 15 (`memori/_config.py:87`)
- Thread-safe: Each Memori instance manages its own state
- Connection pooling: Handled by the database adapter
import asyncio
from memori import Memori
from openai import AsyncOpenAI
async def worker(user_id: str):
# Each async task gets its own Memori instance
client = AsyncOpenAI()
mem = Memori().llm.register(client)
mem.attribution(entity_id=user_id, process_id="worker")
response = await client.chat.completions.create(
model="gpt-4o-mini",
messages=[{"role": "user", "content": "Hello"}]
)
return response.choices[0].message.content
async def main():
# Run 100 concurrent workers
tasks = [worker(f"user_{i}") for i in range(100)]
results = await asyncio.gather(*tasks)
print(f"Processed {len(results)} users")
asyncio.run(main())
Error Handling
Async Error Handling
import asyncio
from memori import Memori, QuotaExceededError
from openai import AsyncOpenAI
async def safe_chat(user_id: str, message: str):
try:
client = AsyncOpenAI()
mem = Memori().llm.register(client)
mem.attribution(entity_id=user_id, process_id="safe_bot")
response = await client.chat.completions.create(
model="gpt-4o-mini",
messages=[{"role": "user", "content": message}]
)
return response.choices[0].message.content
except QuotaExceededError:
return "Quota exceeded — please try again later"
except Exception as e:
return f"Error: {str(e)}"
async def main():
result = await safe_chat("user_123", "Hello")
print(result)
asyncio.run(main())
- Use async clients — `AsyncOpenAI` and `AsyncAnthropic` for better concurrency
- Batch operations — Process multiple users concurrently with `asyncio.gather()`
- Don’t await augmentation — Let it run in the background for long-running apps
- Use connection pooling — SQLAlchemy’s `sessionmaker` provides connection pooling
- Tune worker pools — Increase `max_workers` for augmentation if needed
from memori import Memori
mem = Memori()
# Tune augmentation for higher throughput
mem.augmentation.max_workers = 100 # More concurrent augmentation tasks
mem.augmentation.db_writer_batch_size = 200 # Larger batches
Increasing worker pools and batch sizes improves throughput but increases memory usage. Monitor your application’s resource consumption when tuning these parameters.
Testing Async Code
pytest-asyncio
import pytest
from memori import Memori
from openai import AsyncOpenAI
@pytest.mark.asyncio
async def test_async_chat():
client = AsyncOpenAI()
mem = Memori().llm.register(client)
mem.attribution(entity_id="test_user", process_id="test_bot")
response = await client.chat.completions.create(
model="gpt-4o-mini",
messages=[{"role": "user", "content": "Test message"}]
)
assert response.choices[0].message.content
mem.augmentation.wait() # Wait for augmentation in tests
Always call mem.augmentation.wait() in tests to ensure augmentation completes before assertions.