Overview

State is the backbone of LangGraph applications. Every node reads from and writes to a shared state object, enabling coordination and data flow across your graph.

State Schema

TypedDict Schema

The most common approach uses TypedDict to define your state structure:
from typing_extensions import TypedDict
from langgraph.graph import StateGraph

class AgentState(TypedDict):
    messages: list[str]
    user_id: str
    iteration: int

graph = StateGraph(AgentState)
For readers more familiar with TypeScript, the equivalent shape looks like this:
interface AgentState {
  messages: string[];
  user_id: string;
  iteration: number;
}

Pydantic Models

For validation and more complex types, use Pydantic:
from pydantic import BaseModel, Field
from langgraph.graph import StateGraph

class AgentState(BaseModel):
    messages: list[str] = Field(default_factory=list)
    user_id: str
    iteration: int = 0
    
    # Allow arbitrary types (e.g. client objects) in state.
    # Pydantic v2 style; replaces the deprecated `class Config`.
    model_config = {"arbitrary_types_allowed": True}

graph = StateGraph(AgentState)

Annotated Types with Reducers

Reducers control how multiple updates to the same key are merged:
from typing import Annotated
from typing_extensions import TypedDict
import operator

class State(TypedDict):
    # Default: Last write wins
    status: str
    
    # Append to list
    messages: Annotated[list[str], operator.add]
    
    # Sum integers
    total_cost: Annotated[int, operator.add]
    
    # Custom reducer
    metadata: Annotated[dict, lambda old, new: {**old, **new}]
Reducers receive (current_value, new_value) and must return the merged result.
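To make that contract concrete, here is a small framework-free sketch; apply_update is a hypothetical helper for illustration, not a LangGraph API:

```python
import operator

def apply_update(current, new, reducer=None):
    # No reducer annotated on the key: last write wins.
    if reducer is None:
        return new
    # Otherwise the reducer merges (current_value, new_value).
    return reducer(current, new)

status = apply_update("pending", "done")                  # last write wins
messages = apply_update(["hi"], ["there"], operator.add)  # list concat
merge_dicts = lambda old, new: {**old, **new}
metadata = apply_update({"a": 1}, {"b": 2}, merge_dicts)  # shallow dict merge
```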

How State Updates Work

Node Return Values

Nodes return partial state updates. Only specified keys are updated:
class State(TypedDict):
    count: int
    message: str
    logs: list[str]

def increment(state: State) -> dict:
    # Only updates 'count', leaves 'message' and 'logs' unchanged
    return {"count": state["count"] + 1}

def log_action(state: State) -> dict:
    # Updates multiple keys
    return {
        "logs": [f"Action at count {state['count']}"],
        "message": "Processing"
    }
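The effect of a partial update is equivalent to a shallow merge over the state, as this plain-Python sketch shows:

```python
# How a partial update lands: keys the node does not return stay as-is.
state = {"count": 0, "message": "", "logs": []}
update = {"count": state["count"] + 1}   # what increment() returns
state = {**state, **update}
# 'message' and 'logs' are untouched; only 'count' changed
```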

Update Semantics

from typing import Annotated
from typing_extensions import TypedDict
import operator

class State(TypedDict):
    # Without reducer: last write wins
    status: str
    
    # With reducer: accumulates all updates
    events: Annotated[list[str], operator.add]

def node_a(state: State) -> dict:
    return {"status": "A", "events": ["a"]}

def node_b(state: State) -> dict:
    return {"status": "B", "events": ["b"]}

# If both nodes run in the same step:
# - status: two plain writes to one key, so LangGraph raises InvalidUpdateError
# - events: accumulates to ["a", "b"] via the reducer
Multiple nodes updating the same non-reducer key in one step raise InvalidUpdateError; run those nodes in sequence (where last write wins) or add a reducer.
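The per-step merge can be sketched in plain Python (merge_step is a hypothetical illustration of the semantics, not LangGraph internals):

```python
import operator

reducers = {"events": operator.add}  # keys without an entry: plain writes

def merge_step(state, updates):
    """Fold the updates produced during one step into the state."""
    written = set()
    for update in updates:
        for key, value in update.items():
            if key in reducers:
                state[key] = reducers[key](state[key], value)
            elif key in written:
                # Two writers, no reducer: the real framework raises
                # InvalidUpdateError here.
                raise ValueError(f"conflicting updates to {key!r}")
            else:
                state[key] = value
                written.add(key)
    return state

state = merge_step({"status": "", "events": []},
                   [{"events": ["a"]}, {"events": ["b"]}])
```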

Built-in Reducers

List Operations

from typing import Annotated
from typing_extensions import TypedDict
import operator

class State(TypedDict):
    # Concatenate lists
    items: Annotated[list, operator.add]
    
    # Or use a custom function
    unique_items: Annotated[list, lambda old, new: list(set(old + new))]
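One caveat with the lambda above: list(set(...)) can reorder items between runs. If ordering matters, an order-preserving dedup (a suggested alternative, not a LangGraph built-in) keeps the reducer deterministic:

```python
def unique_ordered(old, new):
    # dict.fromkeys preserves first-seen order, unlike list(set(...)),
    # whose ordering is not guaranteed to be stable.
    return list(dict.fromkeys(old + new))

combined = unique_ordered(["a", "b"], ["b", "c"])
```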

Message History

LangGraph provides specialized support for message lists:
from langgraph.graph.message import add_messages, MessagesState

# Use the built-in MessagesState
class State(MessagesState):
    # Inherits 'messages' with add_messages reducer
    user_id: str

# Or define manually
class CustomState(TypedDict):
    messages: Annotated[list, add_messages]
    other_field: str
The add_messages reducer:
  • Appends new messages
  • Updates messages by ID if they already exist
  • Removes messages when passed RemoveMessage(id=...)
from langchain_core.messages import HumanMessage, AIMessage, RemoveMessage

def node(state: State):
    return {
        "messages": [
            HumanMessage(content="Hello", id="1"),
            AIMessage(content="Hi there!", id="2"),
        ]
    }

def remove_old(state: State):
    # Remove message with id="1"
    return {"messages": [RemoveMessage(id="1")]}
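A toy model of those three behaviors, using plain dicts instead of LangChain message objects (add_messages_sketch and the REMOVE marker are illustrative stand-ins, not the real implementation):

```python
REMOVE = "__remove__"  # stand-in for RemoveMessage

def add_messages_sketch(old, new):
    by_id = {m["id"]: m for m in old}
    order = [m["id"] for m in old]
    for m in new:
        if m["content"] == REMOVE:
            by_id.pop(m["id"], None)       # removal marker: drop the message
            if m["id"] in order:
                order.remove(m["id"])
        elif m["id"] in by_id:
            by_id[m["id"]] = m             # existing id: update in place
        else:
            by_id[m["id"]] = m             # new id: append
            order.append(m["id"])
    return [by_id[i] for i in order]

history = add_messages_sketch([{"id": "1", "content": "Hello"}],
                              [{"id": "2", "content": "Hi there!"}])
history = add_messages_sketch(history, [{"id": "1", "content": REMOVE}])
```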

Numeric Aggregation

import operator
from typing import Annotated
from typing_extensions import TypedDict

class Metrics(TypedDict):
    # Sum all updates
    total_tokens: Annotated[int, operator.add]
    
    # Multiply updates
    score: Annotated[float, operator.mul]
    
    # Maximum value
    max_latency: Annotated[float, max]
    
    # Minimum value  
    min_cost: Annotated[float, min]
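Each of these annotations amounts to folding a stream of updates through the reducer, which can be checked with functools.reduce (a framework-free illustration with made-up numbers):

```python
import operator
from functools import reduce

# Updates emitted across several steps...
token_updates = [120, 85, 40]
latency_updates = [0.8, 2.1, 1.4]

# ...fold into one value exactly as the annotated reducer would.
total_tokens = reduce(operator.add, token_updates, 0)
max_latency = reduce(max, latency_updates)
```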

Input and Output Schemas

Control what data enters and exits your graph:
class InternalState(TypedDict):
    messages: list
    internal_log: list[str]  # Not exposed
    user_id: str

class InputState(TypedDict):
    messages: list
    user_id: str

class OutputState(TypedDict):
    messages: list
    # user_id not included in output

graph = StateGraph(
    state_schema=InternalState,
    input_schema=InputState,   # What users provide
    output_schema=OutputState   # What users receive
)
  • input_schema: Validates and maps input to internal state
  • output_schema: Filters internal state before returning
  • Keys in the input and output schemas must be a subset of the internal state schema's keys
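The output-side filtering behaves like a key projection; this hypothetical sketch (filter_output is not a LangGraph API) shows the effect:

```python
OUTPUT_KEYS = {"messages"}  # the keys declared on OutputState

def filter_output(internal_state):
    # Drop everything the output schema does not declare.
    return {k: v for k, v in internal_state.items() if k in OUTPUT_KEYS}

final = {"messages": ["done"], "internal_log": ["step 1"], "user_id": "123"}
visible = filter_output(final)
```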

Channels: The State Backend

Under the hood, state is stored in channels. Each state key maps to a channel:
from langgraph.channels import LastValue, BinaryOperatorAggregate
import operator

# StateGraph automatically creates channels:
class State(TypedDict):
    # Creates LastValue channel
    message: str
    
    # Creates BinaryOperatorAggregate channel with operator.add
    count: Annotated[int, operator.add]

Channel Types

LastValue

Stores the most recent value. Default for non-annotated keys.

BinaryOperatorAggregate

Applies a reducer function to accumulate updates.

Topic

PubSub channel for multi-value communication.

EphemeralValue

Temporary value that doesn’t persist across steps.
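The two most common channel types can be sketched as tiny classes (toy implementations for intuition only, not langgraph.channels):

```python
import operator

class LastValue:
    def __init__(self):
        self.value = None
    def update(self, value):
        self.value = value        # most recent write wins
    def get(self):
        return self.value

class BinaryOperatorAggregate:
    def __init__(self, op, initial):
        self.op, self.value = op, initial
    def update(self, value):
        self.value = self.op(self.value, value)   # fold updates together
    def get(self):
        return self.value

status = LastValue()
count = BinaryOperatorAggregate(operator.add, 0)
status.update("A"); status.update("B")
count.update(2); count.update(3)
```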

Direct Channel Usage (Advanced)

from langgraph.pregel import Pregel, NodeBuilder
from langgraph.channels import LastValue, Topic

app = Pregel(
    nodes={"processor": my_node},
    channels={
        "input": LastValue(str),
        "events": Topic(str, accumulate=True),
        "output": LastValue(dict),
    },
    input_channels="input",
    output_channels=["output", "events"],
)

Node-Specific Input Schemas

Nodes can have their own input schemas, different from the graph state:
class GraphState(TypedDict):
    messages: list
    user_id: str
    metadata: dict

class NodeInput(TypedDict):
    messages: list  # Only needs messages

def focused_node(state: NodeInput) -> dict:
    # Receives only 'messages' from graph state
    last_msg = state["messages"][-1]
    return {"messages": [process(last_msg)]}

builder = StateGraph(GraphState)
builder.add_node(
    "process",
    focused_node,
    input_schema=NodeInput  # Node receives filtered state
)
Benefits:
  • Clearer node signatures
  • Reduced coupling
  • Easier testing

Overwriting Reducers

Bypass a reducer to replace a value entirely:
from typing import Annotated
from typing_extensions import TypedDict
import operator
from langgraph.types import Overwrite

class State(TypedDict):
    items: Annotated[list, operator.add]

def accumulate(state: State) -> dict:
    # Normal: appends to list
    return {"items": ["new"]}

def reset(state: State) -> dict:
    # Bypasses operator.add, replaces entirely
    return {"items": Overwrite(value=[])}
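The bypass semantics reduce to a type check at merge time; here is a framework-free sketch (OverwriteSketch and merge are illustrative stand-ins, not the langgraph.types API):

```python
import operator

class OverwriteSketch:
    """Stand-in for an Overwrite-style wrapper."""
    def __init__(self, value):
        self.value = value

def merge(current, update, reducer):
    if isinstance(update, OverwriteSketch):
        return update.value        # skip the reducer entirely
    return reducer(current, update)

items = merge(["a", "b"], ["c"], operator.add)           # reduced: concat
items = merge(items, OverwriteSketch([]), operator.add)  # replaced wholesale
```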

Context vs State

LangGraph separates mutable state from immutable context:
from langgraph.runtime import Runtime

class State(TypedDict):
    messages: list  # Mutable

class Context(TypedDict):
    user_id: str     # Immutable
    db_conn: object  # Immutable

graph = StateGraph(
    state_schema=State,
    context_schema=Context
)

def node(state: State, runtime: Runtime[Context]) -> dict:
    # Access immutable context
    user_id = runtime.context["user_id"]
    db = runtime.context["db_conn"]
    
    # Mutate state
    return {"messages": [f"Hello {user_id}"]}

# Invoke with context
result = graph.invoke(
    {"messages": []},
    context={"user_id": "123", "db_conn": db}
)

State Persistence

With a checkpointer, state is automatically persisted:
from langgraph.checkpoint.memory import InMemorySaver

checkpointer = InMemorySaver()
graph = builder.compile(checkpointer=checkpointer)

# First invocation
config = {"configurable": {"thread_id": "conversation-1"}}
result1 = graph.invoke({"messages": ["Hi"]}, config)

# Second invocation - continues from previous state
result2 = graph.invoke({"messages": ["How are you?"]}, config)
See Checkpointing for more details.
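Conceptually, thread-scoped persistence is a load-merge-save loop keyed by thread_id; this toy sketch (invoke_sketch is hypothetical, not the checkpointer API) shows why the second invocation sees the first one's messages:

```python
store = {}  # checkpoints keyed by thread_id

def invoke_sketch(thread_id, update):
    # Load the thread's last checkpoint (or start fresh)...
    state = store.get(thread_id, {"messages": []})
    # ...merge the new input with the messages reducer (list concat)...
    state = {"messages": state["messages"] + update["messages"]}
    # ...and save the result as the next checkpoint.
    store[thread_id] = state
    return state

invoke_sketch("conversation-1", {"messages": ["Hi"]})
result = invoke_sketch("conversation-1", {"messages": ["How are you?"]})
```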

Best Practices

  • Use TypedDict for simple state, Pydantic for validation
  • Keep state flat when possible
  • Use meaningful, descriptive key names
  • Document reducer behavior clearly
  • Always use reducers for list/dict accumulation
  • Test reducer logic independently
  • Be cautious with non-deterministic reducers
  • Consider order-independence
  • Minimize state size for faster checkpointing
  • Use input/output schemas to limit data transfer
  • Consider lazy loading for large objects
  • Store references, not full objects when possible

Next Steps

Nodes & Edges

Learn how nodes consume and update state

Checkpointing

Persist state across invocations
