Streaming enables real-time display of LLM outputs as they’re generated, creating responsive user experiences. Instead of waiting for complete responses, users see text appear progressively.

Why Stream?

  • Better UX: users see a response immediately instead of waiting 10+ seconds for the full answer
  • Lower perceived latency: the first token arrives long before the complete response does
  • Early cancellation: users can stop generation once the answer is sufficient
  • Long outputs: long responses render progressively instead of hitting request timeouts

Basic Streaming

All LangChain chat models support streaming via the stream() method:
from langchain_openai import ChatOpenAI

model = ChatOpenAI(model="gpt-4")

# Stream response chunks
for chunk in model.stream("Write a short story about a robot"):
    print(chunk.content, end="", flush=True)

# Output appears progressively, one delta chunk at a time:
# "Once"
# " upon"
# " a"
# " time"
# ...
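Each chunk is a delta, not the cumulative text so far, so joining the chunks reconstructs the full response. A pure-Python sketch of that behavior, where fake_stream is a stand-in for model.stream():

```python
def fake_stream(text, size=5):
    # Stand-in for model.stream(): yields small delta chunks in order
    for i in range(0, len(text), size):
        yield text[i:i + size]

pieces = []
for chunk in fake_stream("Once upon a time"):
    pieces.append(chunk)  # accumulate deltas as they arrive

full = "".join(pieces)
assert full == "Once upon a time"
```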

Message Chunks

Streaming returns AIMessageChunk objects:
from langchain_openai import ChatOpenAI

model = ChatOpenAI(model="gpt-4")

for chunk in model.stream("Tell me a joke"):
    print(f"Chunk: {chunk.content!r}")
    print(f"Type: {type(chunk)}")
    print(f"ID: {chunk.id}\n")

# Output:
# Chunk: 'Why'
# Type: <class 'langchain_core.messages.ai.AIMessageChunk'>
# ID: msg_abc123
#
# Chunk: ' did'
# Type: <class 'langchain_core.messages.ai.AIMessageChunk'>
# ID: msg_abc123

Async Streaming

Use async streaming for concurrent operations:
import asyncio
from langchain_openai import ChatOpenAI

model = ChatOpenAI(model="gpt-4")

async def stream_response():
    async for chunk in model.astream("Explain quantum computing"):
        print(chunk.content, end="", flush=True)

# Run the async function (in a notebook, top-level `await` also works)
asyncio.run(stream_response())
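The same pattern works with any async generator. A self-contained sketch, with fake_astream standing in for model.astream():

```python
import asyncio

async def fake_astream(text, size=4):
    # Stand-in for model.astream(): yields delta chunks asynchronously
    for i in range(0, len(text), size):
        await asyncio.sleep(0)  # yield control, as a real network read would
        yield text[i:i + size]

async def collect():
    out = []
    async for chunk in fake_astream("quantum bits"):
        out.append(chunk)
    return "".join(out)

result = asyncio.run(collect())
assert result == "quantum bits"
```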

Streaming Multiple Queries

import asyncio
from langchain_openai import ChatOpenAI

model = ChatOpenAI(model="gpt-4")

async def stream_query(query: str, prefix: str):
    print(f"\n{prefix}: ", end="")
    async for chunk in model.astream(query):
        print(chunk.content, end="", flush=True)

async def stream_multiple():
    # Stream multiple queries concurrently
    await asyncio.gather(
        stream_query("What is Python?", "Q1"),
        stream_query("What is JavaScript?", "Q2"),
        stream_query("What is Rust?", "Q3"),
    )

# Note: chunks from the three streams interleave in the output
asyncio.run(stream_multiple())
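Because the streams run concurrently, their chunks interleave rather than arriving query by query. A pure-Python sketch of that interleaving with stand-in streams (fake_astream and collect are illustrative, not LangChain APIs):

```python
import asyncio

async def fake_astream(label, n):
    # Stand-in for model.astream(): n chunks tagged with a query label
    for i in range(n):
        await asyncio.sleep(0.001)
        yield f"{label}{i}"

async def collect(label, n, log):
    async for chunk in fake_astream(label, n):
        log.append(chunk)  # chunks from different queries interleave here

async def main():
    log = []
    await asyncio.gather(collect("A", 3, log), collect("B", 3, log))
    return log

log = asyncio.run(main())
# All chunks arrive, but ordering between A and B is not guaranteed
assert sorted(log) == ["A0", "A1", "A2", "B0", "B1", "B2"]
```

In a real UI you would route each query's chunks to its own output region rather than a shared stdout.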

Streaming Chains

Stream through LCEL chains:
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser

model = ChatOpenAI(model="gpt-4")

prompt = ChatPromptTemplate.from_messages([
    ("system", "You are a helpful assistant."),
    ("human", "{question}")
])

parser = StrOutputParser()

# Build chain
chain = prompt | model | parser

# Stream through entire chain
for chunk in chain.stream({"question": "What is LangChain?"}):
    print(chunk, end="", flush=True)

Streaming with Multiple Steps

from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_core.runnables import RunnablePassthrough

model = ChatOpenAI(model="gpt-4")

# Multi-step chain
chain = (
    {"topic": RunnablePassthrough()}
    | ChatPromptTemplate.from_template("Tell me about {topic}")
    | model
    | StrOutputParser()
)

# Stream final output
for chunk in chain.stream("machine learning"):
    print(chunk, end="", flush=True)
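Conceptually, each chain step transforms chunks lazily, which is why streaming propagates end to end. A pure-Python sketch of that generator composition (fake_model_stream and shout_parser are illustrative, not LangChain APIs):

```python
def fake_model_stream(prompt):
    # Stand-in for the model step: yields delta chunks
    for piece in ["Machine ", "learning ", "is fun"]:
        yield piece

def shout_parser(chunks):
    # Stand-in for a parser step: transforms each chunk as it arrives
    for chunk in chunks:
        yield chunk.upper()

# Composing the steps keeps the pipeline lazy: downstream processing
# starts on the first chunk instead of waiting for the full response
output = "".join(shout_parser(fake_model_stream("machine learning")))
assert output == "MACHINE LEARNING IS FUN"
```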

Streaming Events

Get granular control with astream_events():
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate

model = ChatOpenAI(model="gpt-4")
prompt = ChatPromptTemplate.from_template("Write about {topic}")
chain = prompt | model

async def stream_with_events():
    async for event in chain.astream_events(
        {"topic": "AI"}, 
        version="v2"
    ):
        kind = event["event"]
        
        if kind == "on_chat_model_stream":
            # Model streaming chunks
            content = event["data"]["chunk"].content
            if content:
                print(content, end="", flush=True)
        
        elif kind == "on_chat_model_start":
            print("Model started...")
        
        elif kind == "on_chat_model_end":
            print("\nModel finished!")

import asyncio

asyncio.run(stream_with_events())

Event Types

The most common event is on_chat_model_stream, emitted for each individual token/chunk from the model:
if event["event"] == "on_chat_model_stream":
    chunk = event["data"]["chunk"]
    print(chunk.content, end="")

Streaming RAG

Stream retrieval-augmented generation:
from langchain_openai import ChatOpenAI, OpenAIEmbeddings
from langchain_core.vectorstores import InMemoryVectorStore
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.runnables import RunnablePassthrough
from langchain_core.output_parsers import StrOutputParser

# Setup vector store
vectorstore = InMemoryVectorStore.from_texts(
    [
        "LangChain is a framework for LLM applications",
        "Streaming provides real-time responses",
        "RAG combines retrieval with generation"
    ],
    embedding=OpenAIEmbeddings()
)

retriever = vectorstore.as_retriever()

# Create RAG chain
prompt = ChatPromptTemplate.from_messages([
    ("system", "Answer using context: {context}"),
    ("human", "{question}")
])

model = ChatOpenAI(model="gpt-4")

rag_chain = (
    {
        "context": retriever | (lambda docs: "\n".join([d.page_content for d in docs])),
        "question": RunnablePassthrough()
    }
    | prompt
    | model
    | StrOutputParser()
)

# Stream RAG output
for chunk in rag_chain.stream("What is LangChain?"):
    print(chunk, end="", flush=True)
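The inline lambda that joins retrieved documents can be factored into a named helper. A sketch with a stand-in document class (FakeDoc mimics the page_content attribute of LangChain's Document):

```python
class FakeDoc:
    # Minimal stand-in for langchain_core.documents.Document
    def __init__(self, page_content):
        self.page_content = page_content

def format_docs(docs):
    # Join retrieved documents into a single context string for the prompt
    return "\n".join(d.page_content for d in docs)

docs = [FakeDoc("LangChain is a framework"), FakeDoc("RAG combines retrieval with generation")]
context = format_docs(docs)
assert context == "LangChain is a framework\nRAG combines retrieval with generation"
```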

Streaming Tool Calls

Stream agent tool calls:
from langchain_openai import ChatOpenAI
from langchain_core.tools import tool

@tool
def get_weather(location: str) -> str:
    """Get weather for a location."""
    return f"Sunny, 72°F in {location}"

model = ChatOpenAI(model="gpt-4")
model_with_tools = model.bind_tools([get_weather])

# Stream response with tool calls
for chunk in model_with_tools.stream("What's the weather in Paris?"):
    # Check for tool calls in chunk
    if chunk.tool_call_chunks:
        print(f"\nTool call: {chunk.tool_call_chunks}")
    elif chunk.content:
        print(chunk.content, end="", flush=True)
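Tool-call arguments arrive as partial JSON fragments spread across chunks, so they must be accumulated before parsing. A sketch with hypothetical fragment payloads:

```python
import json

# Hypothetical argument fragments as they might arrive across chunks
fragments = ['{"loca', 'tion": "', 'Paris"}']

buffer = ""
for frag in fragments:
    buffer += frag  # individual fragments are not valid JSON on their own

args = json.loads(buffer)  # parse only once the stream is complete
assert args == {"location": "Paris"}
```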

Token Usage Tracking

Track tokens while streaming:
from langchain_openai import ChatOpenAI

model = ChatOpenAI(model="gpt-4")

full_response = None

# stream_usage=True asks the provider to include usage on the final chunk
for chunk in model.stream("Write a haiku about coding", stream_usage=True):
    print(chunk.content, end="", flush=True)
    # AIMessageChunk supports +, so chunks can be merged into one message
    full_response = chunk if full_response is None else full_response + chunk

# usage_metadata is a dict with input/output/total token counts
if full_response and full_response.usage_metadata:
    print(f"\n\nTokens used: {full_response.usage_metadata['total_tokens']}")

Custom Stream Processing

Process chunks with custom logic:
from langchain_openai import ChatOpenAI

model = ChatOpenAI(model="gpt-4")

def process_stream(query: str):
    """Stream with word counting."""
    word_count = 0
    buffer = ""
    
    for chunk in model.stream(query):
        content = chunk.content
        print(content, end="", flush=True)
        
        buffer += content
        # Count words when we see spaces
        if ' ' in content:
            words = buffer.strip().split()
            word_count += len(words) - 1  # Keep last word in buffer
            buffer = words[-1] if words else ""
    
    # Count final word
    if buffer.strip():
        word_count += 1
    
    print(f"\n\nTotal words: {word_count}")

process_stream("Write a paragraph about Python")

Buffering Strategies

Raw chunks can split text mid-word; buffering until a whole word is available gives smoother display:
def stream_by_word(chain, input_data):
    """Buffer and output complete words."""
    buffer = ""
    
    for chunk in chain.stream(input_data):
        buffer += chunk
        
        # Output complete words
        while ' ' in buffer:
            word, buffer = buffer.split(' ', 1)
            print(word, end=" ", flush=True)
    
    # Output remaining
    if buffer:
        print(buffer, flush=True)
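The word-level strategy extends naturally to sentences. A pure-Python sketch that emits complete sentences as they close (the punctuation regex is a simplified heuristic):

```python
import re

def stream_by_sentence(chunks):
    # Buffer incoming chunks and yield each sentence once it is complete
    buffer = ""
    for chunk in chunks:
        buffer += chunk
        while True:
            match = re.search(r"[.!?]\s+", buffer)
            if not match:
                break
            yield buffer[:match.end()].strip()
            buffer = buffer[match.end():]
    if buffer.strip():
        yield buffer.strip()  # flush whatever remains at end of stream

sentences = list(stream_by_sentence(["Hi the", "re. How a", "re you? Bye"]))
assert sentences == ["Hi there.", "How are you?", "Bye"]
```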

Error Handling

Handle streaming errors gracefully:
from langchain_openai import ChatOpenAI

model = ChatOpenAI(model="gpt-4")

try:
    for chunk in model.stream("Generate text"):
        print(chunk.content, end="", flush=True)
        
except Exception as e:
    print(f"\nStreaming error: {e}")
    # Fallback to non-streaming
    response = model.invoke("Generate text")
    print(response.content)
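The try/except-plus-fallback pattern can be wrapped once and reused. A sketch with stand-in stream and invoke callables (stream_with_fallback is illustrative, not a LangChain API):

```python
def stream_with_fallback(stream_fn, invoke_fn, prompt):
    # Try streaming first; on any failure, fall back to one blocking call
    try:
        for chunk in stream_fn(prompt):
            yield chunk
    except Exception:
        yield invoke_fn(prompt)

def flaky_stream(prompt):
    # Stand-in stream that dies partway through
    yield "partial"
    raise RuntimeError("connection dropped")

def blocking_invoke(prompt):
    # Stand-in for model.invoke(...)
    return "full response"

chunks = list(stream_with_fallback(flaky_stream, blocking_invoke, "hi"))
assert chunks == ["partial", "full response"]
```

Note that chunks emitted before the failure have already been shown, so the fallback response may repeat them; deduplicate in the UI if that matters.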

Streaming with Callbacks

Use callbacks for side effects:
from langchain_openai import ChatOpenAI
from langchain_core.callbacks import StreamingStdOutCallbackHandler

# Built-in streaming callback
model = ChatOpenAI(
    model="gpt-4",
    streaming=True,
    callbacks=[StreamingStdOutCallbackHandler()]
)

# Automatically streams to stdout
model.invoke("Tell me a story")

Custom Callback

from langchain_openai import ChatOpenAI
from langchain_core.callbacks import BaseCallbackHandler

class CustomStreamHandler(BaseCallbackHandler):
    def __init__(self):
        self.tokens = []
    
    def on_llm_new_token(self, token: str, **kwargs) -> None:
        """Handle each new token."""
        self.tokens.append(token)
        print(f"[{len(self.tokens)}] {token}", end="", flush=True)

# Use custom handler
handler = CustomStreamHandler()
model = ChatOpenAI(model="gpt-4", streaming=True, callbacks=[handler])

model.invoke("Write a haiku")
print(f"\n\nTotal tokens: {len(handler.tokens)}")

Best Practices

1. Use async for concurrency: process multiple streams concurrently with astream() and asyncio.gather().
2. Buffer appropriately: choose a buffering strategy based on use case (word, sentence, or time-based).
3. Handle errors gracefully: wrap streaming in try/except and provide a fallback to non-streaming.
4. Track usage metadata: monitor token usage even when streaming, for cost tracking.
5. Set appropriate timeouts: configure timeouts to handle slow or stalled streams.
6. Test with slow connections: ensure streaming works well under variable network conditions.
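The timeout advice can be implemented as a wrapper around any chunk iterator. A sketch using a worker thread and a queue (stream_with_timeout is illustrative, not a LangChain API):

```python
import queue
import threading

def stream_with_timeout(chunks, timeout=5.0):
    # Read the source stream on a worker thread; fail if no chunk
    # arrives within `timeout` seconds (queue.Empty is raised)
    q = queue.Queue()
    SENTINEL = object()

    def pump():
        try:
            for chunk in chunks:
                q.put(chunk)
        finally:
            q.put(SENTINEL)  # always signal end of stream

    threading.Thread(target=pump, daemon=True).start()
    while True:
        item = q.get(timeout=timeout)  # raises queue.Empty on a stall
        if item is SENTINEL:
            return
        yield item

out = "".join(stream_with_timeout(iter(["a", "b", "c"]), timeout=1.0))
assert out == "abc"
```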

Performance Tips

  • Use astream() over stream() in async contexts
  • Buffer chunks for smoother display (word or sentence level)
  • Set temperature=0 when deterministic output matters
  • Use smaller models (e.g., gpt-4o-mini) for lower latency
  • Use streaming callbacks when output just needs to be forwarded automatically
