Skip to main content

Overview

Memori works seamlessly with streaming responses from LLM providers. Memories are captured even when responses are streamed chunk by chunk.

Basic Streaming

from openai import OpenAI
from memori import Memori

# Register an OpenAI client with Memori and attribute memories to a user/app.
client = OpenAI()
mem = Memori().llm.register(client)
mem.attribution(entity_id="user-123", process_id="chat-app")

# Request a streamed completion.
response_stream = client.chat.completions.create(
    model="gpt-4o-mini",
    messages=[{"role": "user", "content": "Tell me about quantum computing"}],
    stream=True,
)

# Echo each delta as it arrives while accumulating the full reply.
pieces = []
for chunk in response_stream:
    if delta := chunk.choices[0].delta.content:
        pieces.append(delta)
        print(delta, end="", flush=True)
full_response = "".join(pieces)

print()  # terminate the streamed line

# Memori automatically captures the complete conversation;
# block until the background memory processing has finished.
mem.augmentation.wait()

Streaming with Async

import asyncio
from openai import AsyncOpenAI
from memori import Memori

async def stream_chat():
    """Stream a chat completion through an async OpenAI client registered with Memori."""
    client = AsyncOpenAI()
    mem = Memori().llm.register(client)
    mem.attribution(entity_id="user-456", process_id="async-chat")

    # Kick off the streamed request.
    stream = await client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[{"role": "user", "content": "Explain neural networks"}],
        stream=True,
    )

    # Echo each delta as it arrives while accumulating the full reply.
    pieces = []
    async for chunk in stream:
        if delta := chunk.choices[0].delta.content:
            pieces.append(delta)
            print(delta, end="", flush=True)
    full_response = "".join(pieces)

    print()

    # Pause briefly so Memori's background memory processing can finish.
    # NOTE(review): a fixed sleep is best-effort — confirm whether an async
    # equivalent of mem.augmentation.wait() is available.
    await asyncio.sleep(1)

asyncio.run(stream_chat())

Real-Time Web Application

Here’s how to build a streaming chat application with Memori.
from fastapi import Body, FastAPI
from fastapi.responses import StreamingResponse
from openai import OpenAI
from memori import Memori
import json

app = FastAPI()

@app.post("/chat/stream")
async def stream_chat(
    # Body(...) makes FastAPI read both fields from a single JSON body
    # ({"user_id": ..., "message": ...}), which is what the fetch() client
    # in this guide sends. As bare `str` parameters they would be treated
    # as *query* parameters and a JSON POST would be rejected with a 422.
    user_id: str = Body(...),
    message: str = Body(...),
):
    """Stream an OpenAI chat completion to the client as Server-Sent Events.

    The OpenAI client is registered with Memori so the conversation is
    captured and attributed to the given user once streaming completes.
    """
    client = OpenAI()
    mem = Memori().llm.register(client)
    mem.attribution(entity_id=user_id, process_id="web-chat")

    # A *sync* generator on purpose: the OpenAI stream below is a blocking
    # iterator, so consuming it inside an `async def` generator would stall
    # the event loop. StreamingResponse iterates sync generators in a
    # threadpool instead.
    def generate():
        stream = client.chat.completions.create(
            model="gpt-4o-mini",
            messages=[{"role": "user", "content": message}],
            stream=True,
        )

        for chunk in stream:
            if chunk.choices[0].delta.content:
                content = chunk.choices[0].delta.content
                # Send as Server-Sent Events
                yield f"data: {json.dumps({'content': content})}\n\n"

        # Signal completion
        yield f"data: {json.dumps({'done': True})}\n\n"

    return StreamingResponse(generate(), media_type="text/event-stream")

if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8000)

Client-Side Integration

<!DOCTYPE html>
<html>
<head>
  <title>Streaming Chat</title>
</head>
<body>
  <div id="chat"></div>
  <input id="message" type="text" placeholder="Type a message...">
  <button onclick="sendMessage()">Send</button>

  <script>
    async function sendMessage() {
      const message = document.getElementById('message').value;
      const userId = 'user-123'; // Get from auth system

      const response = await fetch('/chat/stream', {
        method: 'POST',
        headers: { 'Content-Type': 'application/json' },
        body: JSON.stringify({ user_id: userId, message }),
      });

      const reader = response.body.getReader();
      const decoder = new TextDecoder();
      const chatDiv = document.getElementById('chat');
      let messageDiv = document.createElement('div');
      chatDiv.appendChild(messageDiv);

      // Buffer raw text across reads: a network chunk can end mid-line,
      // so only complete lines are parsed; the remainder waits for the
      // next read.
      let buffer = '';

      while (true) {
        const { done, value } = await reader.read();
        if (done) break;

        // { stream: true } keeps partial UTF-8 sequences pending instead
        // of emitting replacement characters at chunk boundaries.
        buffer += decoder.decode(value, { stream: true });

        const lines = buffer.split('\n');
        buffer = lines.pop(); // keep the trailing (possibly incomplete) line

        for (const line of lines) {
          if (line.startsWith('data: ')) {
            const data = JSON.parse(line.slice(6));
            if (data.content) {
              messageDiv.textContent += data.content;
            }
          }
        }
      }

      document.getElementById('message').value = '';
    }
  </script>
</body>
</html>

Streaming with Anthropic

from anthropic import Anthropic
from memori import Memori

# Register an Anthropic client so Memori captures the streamed exchange.
client = Anthropic()
mem = Memori().llm.register(client)
mem.attribution(entity_id="user-789", process_id="claude-chat")

# The stream() context manager handles connection setup and teardown.
request = dict(
    model="claude-3-haiku-20240307",
    messages=[{"role": "user", "content": "Write a haiku about AI"}],
    max_tokens=100,
)
with client.messages.stream(**request) as stream:
    for piece in stream.text_stream:
        print(piece, end="", flush=True)

print()
# Block until Memori has finished processing the captured conversation.
mem.augmentation.wait()

How Memory Works with Streaming

1. **Request Initiated** — Memori intercepts the streaming request and recalls relevant memories before the stream begins.

2. **Stream Processed** — Chunks are streamed to your application in real time without interference.

3. **Response Captured** — Memori collects all chunks to reconstruct the complete response.

4. **Memory Created** — After streaming completes, Memori asynchronously extracts and stores memories.

Best Practices

Handle Stream Completion

Always process the stream to completion. Prematurely closing the stream may prevent memory capture.

Wait for Augmentation

For short-lived applications, call mem.augmentation.wait() after streaming to ensure memories are saved.

Error Handling

Wrap streaming logic in try/except blocks. Network errors during streaming won’t affect memory capture.

Performance

Streaming with Memori adds minimal latency. Memory processing happens asynchronously after the stream.

Next Steps

Async Operations

Learn about async memory operations

Custom Embeddings

Use custom embedding models

Build docs developers (and LLMs) love