Streaming Responses
Stream LLM responses token-by-token using async generators. Users see output as it’s generated, not after the full response completes.

When to Use

  • Chat interfaces — Show responses as they’re generated
  • Long-form content — Don’t make users wait for full generation
  • Real-time feedback — Stream progress indicators

Basic Pattern

Use async generators to yield tokens as they arrive:
from hypergraph import Graph, node, AsyncRunner
from anthropic import Anthropic

# Module-level synchronous Anthropic client shared by the examples below.
client = Anthropic()

@node(output_name="response")
async def stream_response(messages: list, system: str = "") -> str:
    """Stream a Claude reply to stdout, then return the full text.

    Each chunk is printed the moment it arrives; the chunks are also
    accumulated so downstream nodes receive the complete response.
    """

    collected: list[str] = []

    with client.messages.stream(
        model="claude-sonnet-4-5-20250929",
        max_tokens=1024,
        system=system,
        messages=messages,
    ) as stream:
        for chunk in stream.text_stream:
            # Emit immediately so the user sees output as it is generated.
            print(chunk, end="", flush=True)
            collected.append(chunk)

    print()  # Terminate the streamed line.
    return "".join(collected)


# Wire the streaming node into a graph and execute it asynchronously.
graph = Graph([stream_response])
runner = AsyncRunner()

# NOTE: top-level `await` requires an async context (e.g. asyncio REPL or notebook).
result = await runner.run(graph, {
    "messages": [{"role": "user", "content": "Explain quantum computing"}],
    "system": "You are a helpful physics tutor.",
})
1. Stream tokens as they arrive — print each chunk immediately using `flush=True`.

2. Accumulate for the final result — store the chunks so the complete response can be returned.

3. Return the full text — the node still produces the full response for downstream nodes.

Streaming with OpenAI

from openai import OpenAI

# Module-level synchronous OpenAI client used by the example below.
client = OpenAI()

@node(output_name="response")
async def stream_openai(prompt: str, instructions: str = "") -> str:
    """Stream tokens from GPT-5.2 using the Responses API.

    Prints each text delta as it arrives and returns the accumulated
    response so downstream nodes receive the complete text.
    """

    stream = client.responses.create(
        model="gpt-5.2",
        input=prompt,
        instructions=instructions,
        stream=True,
    )

    chunks = []
    for event in stream:
        # The Responses streaming iterator yields typed events, not text
        # objects: incremental text arrives on "response.output_text.delta"
        # events via the `delta` field. (`output_text` is an attribute of the
        # final Response object, not of stream events.)
        if event.type == "response.output_text.delta":
            print(event.delta, end="", flush=True)
            chunks.append(event.delta)

    print()
    return "".join(chunks)

Async Generator Nodes

For true async iteration, use async generators:
from typing import AsyncIterator

@node(output_name="tokens")
async def generate_tokens(prompt: str) -> AsyncIterator[str]:
    """Async generator that yields Claude's reply one chunk at a time."""

    # NOTE(review): `client` is the synchronous Anthropic client, so each chunk
    # is fetched with blocking I/O even though this is an async generator —
    # confirm this is acceptable for the surrounding event loop.
    stream_cm = client.messages.stream(
        model="claude-sonnet-4-5-20250929",
        max_tokens=1024,
        messages=[{"role": "user", "content": prompt}],
    )
    with stream_cm as stream:
        for piece in stream.text_stream:
            yield piece


# Check node properties — hypergraph introspects the function definition.
print(generate_tokens.is_async)      # True
print(generate_tokens.is_generator)  # True
Async generators are detected automatically from the function signature. The is_async and is_generator properties reflect this.

Streaming in RAG Pipelines

Combine retrieval (fast) with streaming generation:
@node(output_name="docs")
async def retrieve(query: str) -> list[str]:
    """Embed the query and return the top-5 matching documents.

    Retrieval is quick, so there is nothing to stream here.
    """
    query_vector = await embedder.embed(query)
    return await vector_db.search(query_vector, k=5)

@node(output_name="response")
async def generate(docs: list[str], query: str) -> str:
    """Stream an answer grounded in the retrieved documents."""

    context = "\n\n---\n\n".join(docs)
    collected: list[str] = []

    with client.messages.stream(
        model="claude-sonnet-4-5-20250929",
        max_tokens=2048,
        system=f"Answer based on this context:\n{context}",
        messages=[{"role": "user", "content": query}],
    ) as stream:
        for piece in stream.text_stream:
            # Show the answer as it is produced.
            print(piece, end="", flush=True)
            collected.append(piece)

    print()
    return "".join(collected)


# Retrieval runs first (fast), then generation streams its answer.
rag_pipeline = Graph([retrieve, generate])

runner = AsyncRunner()
result = await runner.run(rag_pipeline, {"query": "How do I use hypergraph?"})

Streaming with Callbacks

Pass a callback for custom handling:
from typing import Callable

@node(output_name="response")
async def generate_with_callback(
    prompt: str,
    on_token: Callable[[str], None] | None = None,
) -> str:
    """Stream a Claude reply, invoking `on_token` for every chunk.

    When no callback is supplied the tokens are only accumulated and
    returned as the complete response.
    """

    pieces: list[str] = []

    with client.messages.stream(
        model="claude-sonnet-4-5-20250929",
        max_tokens=1024,
        messages=[{"role": "user", "content": prompt}],
    ) as stream:
        for piece in stream.text_stream:
            if on_token:
                on_token(piece)
            pieces.append(piece)

    return "".join(pieces)


# Usage with callback
def handle_token(token: str):
    # Forward each token to the client as it arrives — websocket here, but
    # any sink works: UI updates, progress bars, logging, etc.
    websocket.send(token)

result = await runner.run(graph, {
    "prompt": "Write a story",
    "on_token": handle_token,
})
Use callbacks to send tokens to websockets, update progress bars, or implement custom streaming logic.

Multi-Turn Streaming

Stream responses in a conversation loop:
from hypergraph import route, END

@node(output_name="response")
async def stream_turn(messages: list, user_input: str) -> str:
    """Run one conversation turn, streaming the assistant's reply."""

    turn_messages = [*messages, {"role": "user", "content": user_input}]

    collected: list[str] = []
    print("Assistant: ", end="")

    with client.messages.stream(
        model="claude-sonnet-4-5-20250929",
        max_tokens=1024,
        messages=turn_messages,
    ) as stream:
        for piece in stream.text_stream:
            print(piece, end="", flush=True)
            collected.append(piece)

    print("\n")
    return "".join(collected)

@node(output_name="messages")
def update_history(messages: list, user_input: str, response: str) -> list:
    """Return a new history list with the latest exchange appended."""
    latest_turn = [
        {"role": "user", "content": user_input},
        {"role": "assistant", "content": response},
    ]
    return [*messages, *latest_turn]

@route(targets=["stream_turn", END])
def should_continue(messages: list) -> str:
    """Loop back to stream_turn until the history reaches 20 messages."""
    return END if len(messages) >= 20 else "stream_turn"

# Assemble the looping chat graph: stream a turn, update history, then route.
streaming_chat = Graph([stream_turn, update_history, should_continue])

Error Handling in Streams

Handle streaming errors gracefully:
@node(output_name="response")
async def safe_stream(prompt: str) -> str:
    """Stream a reply, preserving any partial text if the stream fails mid-way."""

    chunks: list[str] = []

    try:
        with client.messages.stream(
            model="claude-sonnet-4-5-20250929",
            max_tokens=1024,
            messages=[{"role": "user", "content": prompt}],
        ) as stream:
            for piece in stream.text_stream:
                print(piece, end="", flush=True)
                chunks.append(piece)
    except Exception as e:
        # Keep whatever streamed before the failure; re-raise when nothing did.
        if not chunks:
            raise
        return "".join(chunks) + f"\n\n[Error: {e}]"

    print()
    return "".join(chunks)
Always handle streaming errors. LLM APIs can fail mid-stream, and you should preserve partial results when possible.

Testing Streaming Nodes

Test the accumulated output:
@pytest.mark.asyncio
async def test_streaming():
    """The streaming node should produce a non-empty accumulated response."""
    runner = AsyncRunner()
    graph = Graph([stream_response])

    inputs = {"messages": [{"role": "user", "content": "Say hello"}]}
    result = await runner.run(graph, inputs)

    assert "response" in result
    assert len(result["response"]) > 0

What’s Next?

Build docs developers (and LLMs) love