Overview
Nodes are the computational units in LangGraph, while edges define the execution order. Together they form a directed graph that can contain cycles for iterative workflows.
Nodes
A node is any callable that accepts state and returns a partial state update.
Function Nodes
The simplest node is a Python function:
from typing_extensions import TypedDict
from langgraph.graph import StateGraph, START , END
class State ( TypedDict ):
count: int
message: str
def increment ( state : State) -> dict :
"""A basic node that increments count."""
return { "count" : state[ "count" ] + 1 }
def format_message ( state : State) -> dict :
"""Format a message based on count."""
return { "message" : f "Count is { state[ 'count' ] } " }
builder = StateGraph(State)
builder.add_node( "increment" , increment)
builder.add_node( "format" , format_message)
Runnable Nodes
Any LangChain Runnable can be a node:
from langchain_core.runnables import RunnableLambda
from langchain_core.prompts import ChatPromptTemplate
from langchain_openai import ChatOpenAI
# Runnable chain as a node
chain = (
ChatPromptTemplate.from_messages([
( "system" , "You are a helpful assistant." ),
( "human" , " {input} " ),
])
| ChatOpenAI()
)
builder.add_node( "llm" , chain)
Class-Based Nodes
Callable classes work as nodes:
class ProcessorNode :
def __init__ ( self , config : dict ):
self .config = config
def __call__ ( self , state : State) -> dict :
# Use self.config in processing
return { "count" : state[ "count" ] * 2 }
processor = ProcessorNode( config = { "multiplier" : 2 })
builder.add_node( "processor" , processor)
Node Signatures
Nodes can access runtime context:
from langchain_core.runnables import RunnableConfig
from langgraph.runtime import Runtime
class Context ( TypedDict ):
user_id: str
# With RunnableConfig
def node_with_config ( state : State, config : RunnableConfig) -> dict :
thread_id = config[ "configurable" ][ "thread_id" ]
return { "message" : f "Thread: { thread_id } " }
# With Runtime context
def node_with_context ( state : State, runtime : Runtime[Context]) -> dict :
user_id = runtime.context[ "user_id" ]
return { "message" : f "User: { user_id } " }
builder.add_node( "node1" , node_with_config)
builder.add_node( "node2" , node_with_context)
Async Nodes
Nodes can be async for concurrent I/O:
import asyncio
async def fetch_data ( state : State) -> dict :
# Simulate API call
await asyncio.sleep( 1 )
data = await fetch_from_api()
return { "data" : data}
builder.add_node( "fetch" , fetch_data)
# Use ainvoke or astream
result = await graph.ainvoke({ "count" : 0 })
Attach metadata for observability:
builder.add_node(
"expensive_operation" ,
my_node,
metadata = {
"cost" : "high" ,
"timeout" : 30 ,
"version" : "2.0"
}
)
Edges
Edges define the execution order between nodes.
Unconditional Edges
Fixed transitions from one node to another:
builder = StateGraph(State)
builder.add_node( "step1" , node1)
builder.add_node( "step2" , node2)
builder.add_node( "step3" , node3)
# Linear flow
builder.add_edge( START , "step1" )
builder.add_edge( "step1" , "step2" )
builder.add_edge( "step2" , "step3" )
builder.add_edge( "step3" , END )
# Or use helper methods
builder.set_entry_point( "step1" ) # START -> step1
builder.set_finish_point( "step3" ) # step3 -> END
Parallel Edges
Multiple nodes execute in parallel:
builder.add_node( "fetch_user" , fetch_user)
builder.add_node( "fetch_orders" , fetch_orders)
builder.add_node( "combine" , combine_results)
# Both fetch nodes run in parallel
builder.add_edge( START , "fetch_user" )
builder.add_edge( START , "fetch_orders" )
# Wait for both before combining
builder.add_edge([ "fetch_user" , "fetch_orders" ], "combine" )
builder.add_edge( "combine" , END )
Conditional Edges
Dynamic routing based on state:
def route_after_llm ( state : State) -> str :
"""Route based on LLM response."""
if "tool_call" in state[ "last_message" ]:
return "tools"
return "end"
builder.add_node( "llm" , llm_node)
builder.add_node( "tools" , tool_node)
builder.add_conditional_edges(
"llm" ,
route_after_llm,
{
"tools" : "tools" ,
"end" : END
}
)
# Tool output goes back to LLM (creating a loop)
builder.add_edge( "tools" , "llm" )
Multi-Destination Routing
Return multiple destinations to run in parallel:
from typing import Sequence
def fan_out ( state : State) -> Sequence[ str ]:
"""Route to multiple nodes based on conditions."""
destinations = []
if state[ "needs_validation" ]:
destinations.append( "validate" )
if state[ "needs_logging" ]:
destinations.append( "log" )
return destinations
builder.add_conditional_edges(
"prepare" ,
fan_out,
# No path_map needed - function returns node names directly
)
Type-Safe Routing with Literals
from typing import Literal
def strict_router ( state : State) -> Literal[ "success" , "failure" ]:
"""Type-checked routing."""
if state[ "count" ] > 0 :
return "success"
return "failure"
builder.add_conditional_edges(
"process" ,
strict_router,
{
"success" : "handle_success" ,
"failure" : "handle_failure"
}
)
The Send API
Dynamically invoke nodes with custom input (map-reduce pattern):
from langgraph.types import Send
from typing import Annotated
import operator
class OverallState ( TypedDict ):
subjects: list[ str ]
results: Annotated[list[ str ], operator.add]
class SubjectState ( TypedDict ):
subject: str
def continue_to_process ( state : OverallState) -> list[Send]:
"""Fan out to process each subject independently."""
return [
Send( "process_subject" , { "subject" : s})
for s in state[ "subjects" ]
]
def process_subject ( state : SubjectState) -> dict :
"""Process a single subject."""
result = f "Processed { state[ 'subject' ] } "
return { "results" : [result]}
builder = StateGraph(OverallState)
builder.add_node( "process_subject" , process_subject)
builder.add_conditional_edges( START , continue_to_process)
builder.add_edge( "process_subject" , END )
graph = builder.compile()
result = graph.invoke({ "subjects" : [ "math" , "science" , "history" ]})
# {'subjects': [...], 'results': ['Processed math', 'Processed science', 'Processed history']}
Send allows nodes to execute with different input than the main graph state, perfect for map-reduce workflows.
Command API
Return Command objects from nodes for advanced control flow:
from langgraph.types import Command
def smart_node ( state : State) -> Command:
"""Node that controls its own routing."""
# Update state and navigate
return Command(
update = { "count" : state[ "count" ] + 1 },
goto = "next_step" if state[ "count" ] < 5 else END
)
def multi_send_node ( state : State) -> Command:
"""Send to multiple nodes with custom input."""
return Command(
update = { "status" : "processing" },
goto = [
Send( "worker" , { "task_id" : 1 }),
Send( "worker" , { "task_id" : 2 }),
]
)
builder.add_node( "smart" , smart_node)
builder.add_edge( START , "smart" )
# No need to define edges from 'smart' - Command handles routing
Command Fields
Field Type Description updatedict | AnyState updates to apply gotostr | list[str] | Send | list[Send]Next node(s) to execute graphNone | "__parent__"Target graph for command resumedict | AnyResume value for interrupts
Sequences
Quickly add a chain of nodes:
def step1 ( state : State) -> dict :
return { "count" : 1 }
def step2 ( state : State) -> dict :
return { "count" : state[ "count" ] + 1 }
def step3 ( state : State) -> dict :
return { "count" : state[ "count" ] * 2 }
builder = StateGraph(State)
# Automatically creates edges: START -> step1 -> step2 -> step3 -> END
builder.add_sequence([step1, step2, step3])
builder.set_entry_point(step1. __name__ )
builder.set_finish_point(step3. __name__ )
Cycles and Loops
LangGraph supports cycles for iterative workflows:
def check_completion ( state : State) -> str :
if state[ "count" ] >= 10 :
return "done"
return "continue"
builder = StateGraph(State)
builder.add_node( "work" , do_work)
builder.add_edge( START , "work" )
# Create a cycle
builder.add_conditional_edges(
"work" ,
check_completion,
{
"continue" : "work" , # Loop back
"done" : END
}
)
Cycles can run indefinitely. Set a recursion_limit to prevent infinite loops: graph = builder.compile()
result = graph.invoke(
{ "count" : 0 },
config = { "recursion_limit" : 25 }
)
Retry Policies
Automatically retry failed nodes:
from langgraph.types import RetryPolicy
retry_policy = RetryPolicy(
initial_interval = 0.5 , # Start with 0.5s
backoff_factor = 2.0 , # Double each retry
max_interval = 10.0 , # Cap at 10s
max_attempts = 3 , # Try 3 times
jitter = True , # Add randomness
retry_on = Exception # Retry on any exception
)
builder.add_node(
"api_call" ,
make_api_request,
retry_policy = retry_policy
)
Multiple Retry Policies
from langgraph.types import RetryPolicy
# Different policies for different errors
retry_policies = [
RetryPolicy(
max_attempts = 5 ,
retry_on = RateLimitError,
initial_interval = 1.0
),
RetryPolicy(
max_attempts = 2 ,
retry_on = TimeoutError ,
initial_interval = 0.1
),
]
builder.add_node(
"flaky_node" ,
my_node,
retry_policy = retry_policies # First matching policy applies
)
Caching
Cache expensive node results:
from langgraph.types import CachePolicy
cache_policy = CachePolicy(
ttl = 3600 , # 1 hour
)
builder.add_node(
"expensive_llm_call" ,
llm_node,
cache_policy = cache_policy
)
# Must provide cache at compile time
from langgraph.cache.memory import InMemoryCache
graph = builder.compile( cache = InMemoryCache())
Cached nodes:
Skip execution on cache hit
Use input state as cache key (customizable)
Respect TTL for expiration
Store results across invocations
Deferred Nodes
Delay node execution until graph end:
builder.add_node(
"cleanup" ,
cleanup_resources,
defer = True # Runs after all other nodes complete
)
Use cases:
Cleanup operations
Final logging/metrics
Post-processing steps
Best Practices
Keep nodes pure and focused on single tasks
Use type hints for state and return values
Handle errors gracefully within nodes
Return partial state updates, not full state
Make nodes testable in isolation
Prefer explicit edges for simple flows
Use conditional edges for dynamic routing
Use Command for complex multi-step routing
Document routing logic clearly
Test all routing paths
Next Steps
Checkpointing Persist state between node executions
Streaming Stream node outputs in real-time
Human-in-the-Loop Add human review points with interrupts