Overview
State is the backbone of LangGraph applications. Every node reads from and writes to a shared state object, enabling coordination and data flow across your graph.
State Schema
TypedDict Schema
The most common approach uses TypedDict to define your state structure:
```python
from typing_extensions import TypedDict
from langgraph.graph import StateGraph

class AgentState(TypedDict):
    messages: list[str]
    user_id: str
    iteration: int

graph = StateGraph(AgentState)
```
For readers more familiar with TypeScript, the equivalent shape:

```typescript
interface AgentState {
  messages: string[];
  user_id: string;
  iteration: number;
}
```
Pydantic Models
For validation and more complex types, use Pydantic:
```python
from pydantic import BaseModel, Field
from langgraph.graph import StateGraph

class AgentState(BaseModel):
    messages: list[str] = Field(default_factory=list)
    user_id: str
    iteration: int = 0

    class Config:
        # Allow arbitrary types in state
        arbitrary_types_allowed = True

graph = StateGraph(AgentState)
```
Annotated Types with Reducers
Reducers control how multiple updates to the same key are merged:
```python
from typing import Annotated
from typing_extensions import TypedDict
import operator

class State(TypedDict):
    # Default: last write wins
    status: str

    # Append to list
    messages: Annotated[list[str], operator.add]

    # Sum integers
    total_cost: Annotated[int, operator.add]

    # Custom reducer
    metadata: Annotated[dict, lambda old, new: {**old, **new}]
```
Reducers receive (current_value, new_value) and must return the merged result.
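Because a reducer is just a callable of `(current_value, new_value)`, you can exercise one directly, outside any graph. A standalone sketch:

```python
import operator

def merge_meta(old: dict, new: dict) -> dict:
    # Same shape as the metadata lambda above: newer keys win on conflict
    return {**old, **new}

# operator.add concatenates the current list with the update
assert operator.add(["a"], ["b"]) == ["a", "b"]

# The dict reducer merges keys; the newer value wins
assert merge_meta({"k": 1, "x": 0}, {"k": 2}) == {"k": 2, "x": 0}
```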
How State Updates Work
Node Return Values
Nodes return partial state updates. Only specified keys are updated:
```python
from typing_extensions import TypedDict

class State(TypedDict):
    count: int
    message: str
    logs: list[str]

def increment(state: State) -> dict:
    # Only updates 'count'; 'message' and 'logs' are unchanged
    return {"count": state["count"] + 1}

def log_action(state: State) -> dict:
    # Updates multiple keys
    return {
        "logs": [f"Action at count {state['count']}"],
        "message": "Processing",
    }
```
Update Semantics
```python
from typing import Annotated
from typing_extensions import TypedDict
import operator

class State(TypedDict):
    # Without reducer: last write wins
    status: str
    # With reducer: accumulates all updates
    events: Annotated[list[str], operator.add]

def node_a(state: State) -> dict:
    return {"status": "A", "events": ["a"]}

def node_b(state: State) -> dict:
    return {"status": "B", "events": ["b"]}

# If both nodes run in the same step:
# - events will be ["a", "b"] (both updates accumulated by the reducer)
# - status has no reducer, so the two conflicting writes are an error
```
Multiple nodes updating the same non-reducer key in one step raises InvalidUpdateError.
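With `operator.add`, the step folds both `events` updates into the current value; a plain key like `status` has no way to combine two writes, which is why LangGraph raises an error instead of guessing. A standalone sketch of the reducer path (pure Python, mimicking the semantics rather than LangGraph's internals):

```python
import operator

current_events: list = []

# The step delivers both updates; the reducer folds each into the current value
for update in (["a"], ["b"]):
    current_events = operator.add(current_events, update)

assert current_events == ["a", "b"]
```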
Built-in Reducers
List Operations
```python
from typing import Annotated
from typing_extensions import TypedDict
import operator

class State(TypedDict):
    # Concatenate lists
    items: Annotated[list, operator.add]
    # Or use a custom function
    unique_items: Annotated[list, lambda old, new: list(set(old + new))]
```
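One caveat: `set` does not preserve order, so the `unique_items` reducer above may reorder the list between runs. An order-preserving alternative (a sketch) keeps the first occurrence of each item:

```python
def unique_ordered(old: list, new: list) -> list:
    # Deduplicate while preserving first-seen order
    seen = set()
    return [x for x in old + new if not (x in seen or seen.add(x))]

assert unique_ordered(["a", "b"], ["b", "c"]) == ["a", "b", "c"]
```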
Message History
LangGraph provides specialized support for message lists:
```python
from typing import Annotated
from typing_extensions import TypedDict
from langgraph.graph.message import add_messages, MessagesState

# Use the built-in MessagesState
class State(MessagesState):
    # Inherits 'messages' with the add_messages reducer
    user_id: str

# Or define the key manually
class CustomState(TypedDict):
    messages: Annotated[list, add_messages]
    other_field: str
```
The `add_messages` reducer:
- Appends new messages
- Updates messages by ID if they already exist
- Removes messages when passed `RemoveMessage(id=...)`
```python
from langchain_core.messages import HumanMessage, AIMessage, RemoveMessage

def node(state: State):
    return {
        "messages": [
            HumanMessage(content="Hello", id="1"),
            AIMessage(content="Hi there!", id="2"),
        ]
    }

def remove_old(state: State):
    # Remove the message with id="1"
    return {"messages": [RemoveMessage(id="1")]}
```
Numeric Aggregation
```python
import operator
from typing import Annotated
from typing_extensions import TypedDict

class Metrics(TypedDict):
    # Sum all updates
    total_tokens: Annotated[int, operator.add]
    # Multiply updates
    score: Annotated[float, operator.mul]
    # Maximum value
    max_latency: Annotated[float, max]
    # Minimum value
    min_cost: Annotated[float, min]
```
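Each annotated key simply folds its reducer over successive updates, which you can trace with `functools.reduce` in a standalone sketch:

```python
import operator
from functools import reduce

token_updates = [120, 85, 240]      # e.g. one update per node
latency_updates = [0.8, 2.1, 1.4]

assert reduce(operator.add, token_updates, 0) == 445   # total_tokens
assert reduce(max, latency_updates) == 2.1             # max_latency
assert reduce(min, latency_updates) == 0.8             # min_cost
```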
Input and Output Schemas
Control what data enters and exits your graph:
```python
from typing_extensions import TypedDict
from langgraph.graph import StateGraph

class InternalState(TypedDict):
    messages: list
    internal_log: list[str]  # Not exposed
    user_id: str

class InputState(TypedDict):
    messages: list
    user_id: str

class OutputState(TypedDict):
    messages: list
    # user_id not included in output

graph = StateGraph(
    state_schema=InternalState,
    input_schema=InputState,    # What callers provide
    output_schema=OutputState,  # What callers receive
)
```
- `input_schema`: validates and maps input into the internal state
- `output_schema`: filters the internal state before it is returned
- All three schemas must share overlapping keys
Channels: The State Backend
Under the hood, state is stored in channels. Each state key maps to a channel:
```python
from typing import Annotated
from typing_extensions import TypedDict
import operator

# StateGraph automatically creates channels for each key:
class State(TypedDict):
    # Creates a LastValue channel
    message: str
    # Creates a BinaryOperatorAggregate channel with operator.add
    count: Annotated[int, operator.add]
```
Channel Types
- `LastValue`: stores the most recent value. Default for non-annotated keys.
- `BinaryOperatorAggregate`: applies a reducer function to accumulate updates.
- `Topic`: a pub/sub channel for multi-value communication.
- `EphemeralValue`: a temporary value that does not persist across steps.
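The first two behaviors are easy to model in plain Python. This is a simplified sketch of the semantics, not LangGraph's actual implementation:

```python
import operator

class LastValueSketch:
    """Keeps only the latest write; concurrent writes conflict."""
    def __init__(self):
        self.value = None

    def update(self, writes: list):
        if len(writes) > 1:
            # LangGraph raises InvalidUpdateError in this situation
            raise ValueError("multiple writes to a LastValue channel")
        self.value = writes[0]

class AggregateSketch:
    """Folds a binary operator over every write."""
    def __init__(self, op, initial):
        self.op = op
        self.value = initial

    def update(self, writes: list):
        for w in writes:
            self.value = self.op(self.value, w)

last = LastValueSketch()
last.update(["B"])
assert last.value == "B"

agg = AggregateSketch(operator.add, 0)
agg.update([3, 4])
assert agg.value == 7
```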
Direct Channel Usage (Advanced)
```python
from langgraph.pregel import Pregel, NodeBuilder
from langgraph.channels import LastValue, Topic

app = Pregel(
    nodes={"processor": my_node},  # a node built elsewhere, e.g. with NodeBuilder
    channels={
        "input": LastValue(str),
        "events": Topic(str, accumulate=True),
        "output": LastValue(dict),
    },
    input_channels="input",
    output_channels=["output", "events"],
)
```
Node Input Schemas
Nodes can have their own input schemas, different from the graph state:
```python
from typing_extensions import TypedDict
from langgraph.graph import StateGraph

class GraphState(TypedDict):
    messages: list
    user_id: str
    metadata: dict

class NodeInput(TypedDict):
    messages: list  # Only needs messages

def focused_node(state: NodeInput) -> dict:
    # Receives only 'messages' from the graph state
    last_msg = state["messages"][-1]
    return {"messages": [process(last_msg)]}

builder = StateGraph(GraphState)
builder.add_node(
    "process",
    focused_node,
    input_schema=NodeInput,  # Node receives filtered state
)
```
Benefits:
- Clearer node signatures
- Reduced coupling
- Easier testing
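"Easier testing" is concrete: because `focused_node` depends only on the keys in `NodeInput`, it can be called as a plain function with a minimal dict. In this sketch, `process` is a hypothetical stand-in for your real per-message logic:

```python
def process(msg: str) -> str:
    # Hypothetical stand-in for real message handling
    return msg.upper()

def focused_node(state: dict) -> dict:
    last_msg = state["messages"][-1]
    return {"messages": [process(last_msg)]}

# No graph, no checkpointer, no full GraphState required:
assert focused_node({"messages": ["hi"]}) == {"messages": ["HI"]}
```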
Overwriting Reducers
Bypass a reducer to replace a value entirely:
```python
from typing import Annotated
from typing_extensions import TypedDict
import operator
from langgraph.types import Overwrite

class State(TypedDict):
    items: Annotated[list, operator.add]

def accumulate(state: State) -> dict:
    # Normal update: appended via operator.add
    return {"items": ["new"]}

def reset(state: State) -> dict:
    # Bypasses operator.add and replaces the value entirely
    return {"items": Overwrite(value=[])}
```
Context vs State
LangGraph separates mutable state from immutable context:
```python
from typing_extensions import TypedDict
from langgraph.graph import StateGraph
from langgraph.runtime import Runtime

class State(TypedDict):
    messages: list  # Mutable

class Context(TypedDict):
    user_id: str     # Immutable
    db_conn: object  # Immutable

graph = StateGraph(
    state_schema=State,
    context_schema=Context,
)

def node(state: State, runtime: Runtime[Context]) -> dict:
    # Access immutable context
    user_id = runtime.context["user_id"]
    db = runtime.context["db_conn"]
    # Update mutable state
    return {"messages": [f"Hello {user_id}"]}

# Invoke with context
result = graph.invoke(
    {"messages": []},
    context={"user_id": "123", "db_conn": db},
)
```
State Persistence
With a checkpointer, state is automatically persisted:
```python
from langgraph.checkpoint.memory import InMemorySaver

checkpointer = InMemorySaver()
graph = builder.compile(checkpointer=checkpointer)

# First invocation
config = {"configurable": {"thread_id": "conversation-1"}}
result1 = graph.invoke({"messages": ["Hi"]}, config)

# Second invocation continues from the previous state
result2 = graph.invoke({"messages": ["How are you?"]}, config)
```
See Checkpointing for more details.
Best Practices
- Use TypedDict for simple state, Pydantic for validation
- Keep state flat when possible
- Use meaningful, descriptive key names
- Document reducer behavior clearly
- Always use reducers for list/dict accumulation
- Test reducer logic independently
- Be cautious with non-deterministic reducers
- Design reducers to be order-independent when nodes may run in parallel
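A reducer used by parallel branches should be order-independent: a commutative operation like `max` gives the same result regardless of which branch's update arrives first, while list concatenation does not (a standalone check):

```python
# max is commutative: branch order cannot change the result
assert max(3, 5) == max(5, 3)

# list concatenation is not: parallel branches may interleave differently
assert ["a"] + ["b"] != ["b"] + ["a"]
```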
Next Steps
- Nodes & Edges: learn how nodes consume and update state
- Checkpointing: persist state across invocations