
Overview

LangGraph supports multiple streaming modes to provide real-time feedback, build responsive UIs, and monitor graph execution. Instead of waiting for the entire graph to complete, you can process results as they become available.

Stream Modes

LangGraph offers seven streaming modes, each suited to a different use case:
from langgraph.graph import StateGraph

graph = builder.compile()

# Single mode
for event in graph.stream(input_data, stream_mode="values"):
    print(event)

# Multiple modes simultaneously
for event in graph.stream(input_data, stream_mode=["values", "updates"]):
    print(event)

Mode Overview

  • values: Complete state after each step
  • updates: Individual node outputs as each node completes
  • messages: LLM tokens streamed as they are generated
  • custom: User-defined events emitted from inside nodes
  • checkpoints: State snapshots as checkpoints are created
  • tasks: Task start and completion events
  • debug: Checkpoint and task events combined for debugging

Values Mode

Emits the complete state after each step:
from typing_extensions import TypedDict

class State(TypedDict):
    messages: list[str]
    count: int

for state in graph.stream({"messages": [], "count": 0}, stream_mode="values"):
    print(state)
    # {'messages': ['Hello'], 'count': 1}
    # {'messages': ['Hello', 'Response'], 'count': 2}
    # {'messages': ['Hello', 'Response', 'Done'], 'count': 3}
Use cases:
  • Display complete state in UI
  • Monitor full state changes
  • Simple progress tracking
values is the default stream mode. It includes the initial state before any nodes execute.

Updates Mode

Emits individual node outputs as they complete:
for event in graph.stream({"count": 0}, stream_mode="updates"):
    print(event)
    # {'node1': {'count': 1}}
    # {'node2': {'count': 2}}
    # {'node3': {'count': 3}}
Each event is a dictionary with:
  • Key: Node name
  • Value: Node’s output (state update)
Use cases:
  • Track which nodes executed
  • Show per-node progress
  • Collect individual results
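Collecting individual results is a simple fold over the event stream. A minimal sketch that runs without a graph, using simulated events shaped like the updates-mode output above:

```python
def collect_updates(events):
    """Fold updates-mode events ({node_name: update}) into one mapping
    of node name -> last update emitted by that node."""
    results = {}
    for event in events:
        for node, update in event.items():
            results[node] = update
    return results

# Simulated events, shaped like the updates-mode output shown above
simulated = [
    {"node1": {"count": 1}},
    {"node2": {"count": 2}},
    {"node3": {"count": 3}},
]
print(collect_updates(simulated))
```

In a real run, graph.stream(input_data, stream_mode="updates") takes the place of the simulated list.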

Parallel Node Updates

# If nodes run in parallel, you get separate events
for event in graph.stream(input_data, stream_mode="updates"):
    print(event)
    # {'fetch_user': {'user': {...}}}
    # {'fetch_orders': {'orders': [...]}}
    # {'combine': {'result': {...}}}

Messages Mode

Stream LLM tokens in real-time:
from langgraph.graph import MessagesState

class State(MessagesState):
    pass

def llm_node(state: State):
    # LLM calls are automatically streamed
    response = llm.invoke(state["messages"])
    return {"messages": [response]}

for event in graph.stream(
    {"messages": [("user", "Tell me a story")]},
    stream_mode="messages"
):
    # Event format: (message_chunk, metadata)
    chunk, metadata = event
    print(chunk.content, end="", flush=True)
    
    # Metadata includes:
    # - langgraph_node: which node emitted this
    # - langgraph_step: execution step
    # - langgraph_path: node path in nested graphs

Message Metadata

for event in graph.stream(input_data, stream_mode="messages"):
    chunk, metadata = event
    
    print(f"Node: {metadata['langgraph_node']}")
    print(f"Step: {metadata['langgraph_step']}")
    print(f"Content: {chunk.content}")
    
    # For tool calls
    if hasattr(chunk, "tool_calls"):
        print(f"Tool calls: {chunk.tool_calls}")
Use cases:
  • Real-time chat interfaces
  • Streaming chatbots
  • Progressive text generation
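Because each event carries metadata, tokens can be filtered by node when several nodes call LLMs. A minimal sketch, using a stand-in Chunk class rather than real LangChain message chunks:

```python
from dataclasses import dataclass

@dataclass
class Chunk:
    """Stand-in for an LLM message chunk (real chunks expose .content too)."""
    content: str

def assemble(events, node):
    """Join token chunks emitted by one node into the full response text."""
    return "".join(
        chunk.content
        for chunk, metadata in events
        if metadata["langgraph_node"] == node
    )

# Simulated (chunk, metadata) events as messages mode would yield them
simulated = [
    (Chunk("Once "), {"langgraph_node": "llm"}),
    (Chunk("upon "), {"langgraph_node": "llm"}),
    (Chunk("a time"), {"langgraph_node": "llm"}),
]
print(assemble(simulated, "llm"))  # Once upon a time
```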

Custom Mode

Emit custom events from within nodes:
from langgraph.types import StreamWriter

def my_node(state: State, writer: StreamWriter) -> dict:
    """Node that emits custom events."""
    
    # Emit progress updates
    writer("Starting processing...")
    
    for i, item in enumerate(state["items"]):
        result = process_item(item)
        
        # Emit custom event
        writer({
            "type": "progress",
            "item_index": i,
            "result": result,
            "percentage": (i + 1) / len(state["items"]) * 100
        })
    
    writer("Processing complete!")
    
    return {"status": "done"}

# Stream custom events
for event in graph.stream(input_data, stream_mode="custom"):
    print(event)
    # "Starting processing..."
    # {'type': 'progress', 'item_index': 0, ...}
    # {'type': 'progress', 'item_index': 1, ...}
    # "Processing complete!"
StreamWriter is automatically injected when requested as a parameter. It’s a no-op when not using stream_mode="custom".
Use cases:
  • Fine-grained progress tracking
  • Custom metrics/telemetry
  • Application-specific events
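Since custom events are whatever the node writes, consumers should filter defensively. A sketch of a consumer tracking the latest percentage from events shaped like the example above:

```python
def latest_progress(events):
    """Return the most recent percentage among dict events tagged
    type=progress; plain string status messages are skipped."""
    pct = 0.0
    for event in events:
        if isinstance(event, dict) and event.get("type") == "progress":
            pct = event["percentage"]
    return pct

# Simulated custom-mode events, mirroring the writer calls above
simulated = [
    "Starting processing...",
    {"type": "progress", "item_index": 0, "percentage": 50.0},
    {"type": "progress", "item_index": 1, "percentage": 100.0},
    "Processing complete!",
]
print(latest_progress(simulated))  # 100.0
```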

Checkpoints Mode

Emits state snapshots when checkpoints are created:
for event in graph.stream(input_data, stream_mode="checkpoints"):
    print(event)
    # StateSnapshot(
    #     values={'messages': [...], 'count': 1},
    #     next=('node2',),
    #     config={'configurable': {'thread_id': '1'}},
    #     metadata={'step': 0, 'source': 'loop'},
    #     created_at='2024-01-15T10:30:00Z',
    #     tasks=(...),
    # )
Each checkpoint includes:
  • values: Current state
  • next: Upcoming nodes
  • config: Runtime configuration
  • metadata: Step info, source
  • created_at: Timestamp
  • tasks: Pending tasks
Use cases:
  • Monitor checkpointing
  • Display execution timeline
  • Debug state persistence
Requires a checkpointer to be configured.

Tasks Mode

Emits events for task lifecycle:
for event in graph.stream(input_data, stream_mode="tasks"):
    print(event)
    # ('task', {
    #     'id': 'abc-123',
    #     'name': 'process_data',
    #     'path': ('process_data',),
    #     'result': None,  # None while running
    #     'error': None,
    # })
    # 
    # ('task_result', {
    #     'id': 'abc-123',
    #     'name': 'process_data',
    #     'path': ('process_data',),
    #     'result': {'count': 1},
    #     'error': None,
    # })
Event types:
  • task: Task started
  • task_result: Task completed (with result or error)
Use cases:
  • Monitor task execution
  • Track task duration
  • Debug failures

Debug Mode

Combines checkpoints and tasks for comprehensive debugging:
for event in graph.stream(input_data, stream_mode="debug"):
    event_type, event_data = event
    
    if event_type == "checkpoint":
        print(f"Checkpoint at step {event_data.metadata['step']}")
    elif event_type == "task":
        print(f"Task {event_data['name']} started")
    elif event_type == "task_result":
        print(f"Task {event_data['name']} completed")
Use cases:
  • Development debugging
  • Troubleshooting execution
  • Performance analysis

Multiple Stream Modes

Combine modes for richer output:
for mode, data in graph.stream(
    input_data,
    stream_mode=["values", "updates", "messages"]
):
    if mode == "values":
        print(f"State: {data}")
    elif mode == "updates":
        print(f"Update: {data}")
    elif mode == "messages":
        chunk, metadata = data
        print(f"Token: {chunk.content}")
When multiple modes are requested, each event is a (mode, data) tuple:
("values", <state>)
("updates", <update>)
("messages", (<chunk>, <metadata>))

Async Streaming

All streaming modes support async iteration:
async for event in graph.astream(
    input_data,
    stream_mode="updates"
):
    await process_event(event)
Use async streaming for:
  • Async I/O operations
  • Concurrent event processing
  • WebSocket connections
  • Server-sent events (SSE)
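The consumption pattern itself is ordinary async iteration, so it can be exercised with any async generator standing in for astream (the stand-in below is hypothetical, not part of LangGraph):

```python
import asyncio

async def fake_astream(events):
    """Hypothetical stand-in for graph.astream: yields events asynchronously."""
    for event in events:
        await asyncio.sleep(0)  # yield control, as real I/O would
        yield event

async def consume():
    seen = []
    async for event in fake_astream([{"node1": {"count": 1}}, {"node2": {"count": 2}}]):
        seen.append(event)
    return seen

events = asyncio.run(consume())
print(events)
```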

Streaming with Subgraphs

Control subgraph streaming:
# Stream events from subgraphs too
for event in graph.stream(
    input_data,
    stream_mode="updates",
    subgraphs=True  # Include subgraph events
):
    print(event)
    # Events arrive as (namespace, update) tuples; the namespace
    # identifies the subgraph (empty for the parent graph):
    # ((), {'parent_node': {...}})
    # (('parent_node:<task_id>',), {'subgraph_node': {'count': 1}})

Building a Streaming UI

Real-Time Chat Interface

from langgraph.graph import MessagesState

class ChatState(MessagesState):
    pass

def display_stream(user_message: str):
    config = {"configurable": {"thread_id": "chat-1"}}
    
    # Stream LLM tokens
    print("Assistant: ", end="")
    for event in graph.stream(
        {"messages": [("user", user_message)]},
        config=config,
        stream_mode="messages"
    ):
        chunk, metadata = event
        print(chunk.content, end="", flush=True)
    print()  # Newline

display_stream("What's the weather?")
# Assistant: I don't have access to real-time weather...

Progress Bar with Custom Events

from tqdm import tqdm
from langgraph.types import StreamWriter

def batch_processor(
    state: State,
    writer: StreamWriter
) -> dict:
    items = state["items"]
    
    for i, item in enumerate(items):
        result = process(item)
        writer({
            "processed": i + 1,
            "total": len(items)
        })
    
    return {"status": "complete"}

# Display progress
# Display progress: one tick per item, so the total must match the item count
with tqdm(total=len(input_data["items"])) as pbar:
    for event in graph.stream(input_data, stream_mode="custom"):
        if isinstance(event, dict) and "processed" in event:
            pbar.update(1)

FastAPI SSE Endpoint

from fastapi import FastAPI
from fastapi.responses import StreamingResponse
import json

app = FastAPI()

@app.post("/stream")
async def stream_graph(request: dict):
    async def event_generator():
        async for event in graph.astream(
            request["input"],
            stream_mode="updates"
        ):
            yield f"data: {json.dumps(event)}\n\n"
    
    return StreamingResponse(
        event_generator(),
        media_type="text/event-stream"
    )
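The yield line above relies on the SSE wire format: each message is a data: line followed by a blank line. The framing can be isolated into a helper (a sketch, separate from the endpoint above):

```python
import json

def to_sse(event) -> str:
    """Frame one JSON-serializable event for a text/event-stream response."""
    return f"data: {json.dumps(event)}\n\n"

print(to_sse({"node1": {"count": 1}}))
```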

Stream Configuration

Early Emission

Force eager event emission:
graph = builder.compile(
    stream_eager=True  # Emit events as soon as possible
)
By default, events are batched. stream_eager=True reduces latency.

Filtering Stream Channels

Limit which state keys are streamed:
from langgraph.pregel import Pregel

# Using low-level Pregel API
app = Pregel(
    nodes={...},
    channels={...},
    output_channels=["messages", "status"],
    stream_channels=["messages"],  # Only stream 'messages'
)

Best Practices

Choosing a mode:
  • Use values for state monitoring and simple UIs
  • Use updates to track individual node execution
  • Use messages for chat interfaces with LLMs
  • Use custom for application-specific events
  • Use debug during development
  • Combine modes when you need multiple perspectives
Performance:
  • Use async streaming for I/O-bound applications
  • Enable stream_eager for lower latency
  • Limit state size to reduce serialization overhead
  • Filter stream channels to reduce bandwidth
  • Batch custom events when possible
User experience:
  • Buffer tokens before displaying to avoid flickering
  • Show loading indicators between node executions
  • Handle reconnection for long-running streams
  • Display node names from updates mode
  • Use custom events for progress bars

Troubleshooting

No events emitted:
  • Verify the stream mode is correct
  • Check that the graph has nodes
  • Ensure nodes return state updates
  • For messages mode, confirm an LLM is actually invoked
  • For custom mode, verify the node calls its StreamWriter
High latency:
  • Enable stream_eager=True
  • Check for buffering in the transport layer
  • Verify async streaming is used correctly
  • Review node execution time
Tokens not streaming:
  • Ensure the LLM supports streaming
  • Check that the LLM is configured for streaming
  • Verify the message format is correct
  • Review LangChain callback configuration

Next Steps

Human-in-the-Loop

Combine streaming with interrupts for human oversight

Checkpointing

Use checkpoint streaming for state monitoring
