This page documents configuration types and utilities used to access runtime information and resources within LangGraph nodes and tasks.
get_config
Get the current runnable configuration from within a graph node or task. This function retrieves the RunnableConfig for the currently executing node or task. The config contains metadata, callbacks, tags, and other runtime information. Important: Must be called from within a runnable context (inside a node or task). Raises RuntimeError if called outside of a runnable context. Python version requirement: Python 3.11 or later is required to use this in an async context. Defined in: langgraph/config.py:17
Returns
RunnableConfig: The configuration for the current runnable context.
Raises
RuntimeError: Raised when called outside of a runnable context, or when using Python < 3.11 in an async context.
Usage Example
from langgraph.config import get_config
from langgraph.graph import StateGraph, START
from typing_extensions import TypedDict
class State(TypedDict):
value: int
def my_node(state: State):
# Access the current config
config = get_config()
# You can access various properties
print(f"Thread ID: {config['configurable'].get('thread_id')}")
print(f"Tags: {config.get('tags', [])}")
return {"value": state["value"] + 1}
builder = StateGraph(State)
builder.add_node("my_node", my_node)
builder.add_edge(START, "my_node")
graph = builder.compile()
# Pass config with metadata
config = {
"configurable": {"thread_id": "123"},
"tags": ["example"]
}
graph.invoke({"value": 0}, config)
get_store
Access LangGraph store from inside a graph node or entrypoint task at runtime. Can be called from inside any StateGraph node or functional API task, as long as the StateGraph or the entrypoint was initialized with a store. Python version requirement: Python 3.11 or later is required to use this in an async context (uses contextvar propagation). Defined in: langgraph/config.py:32
Returns
BaseStore: The store instance configured for the current graph.
Raises
RuntimeError: Raised when called outside of a runnable context.
Usage with StateGraph
from typing_extensions import TypedDict
from langgraph.graph import StateGraph, START
from langgraph.store.memory import InMemoryStore
from langgraph.config import get_store
store = InMemoryStore()
store.put(("values",), "foo", {"bar": 2})
class State(TypedDict):
foo: int
def my_node(state: State):
my_store = get_store()
stored_value = my_store.get(("values",), "foo").value["bar"]
return {"foo": stored_value + 1}
graph = (
StateGraph(State)
.add_node(my_node)
.add_edge(START, "my_node")
.compile(store=store)
)
result = graph.invoke({"foo": 1})
print(result) # {"foo": 3}
Usage with Functional API
from langgraph.func import entrypoint, task
from langgraph.store.memory import InMemoryStore
from langgraph.config import get_store
store = InMemoryStore()
store.put(("values",), "foo", {"bar": 2})
@task
def my_task(value: int):
my_store = get_store()
stored_value = my_store.get(("values",), "foo").value["bar"]
return stored_value + 1
@entrypoint(store=store)
def workflow(value: int):
return my_task(value).result()
result = workflow.invoke(1)
print(result) # 3
get_stream_writer
Access LangGraph StreamWriter from inside a graph node or entrypoint task at runtime. Can be called from inside any StateGraph node or functional API task. The StreamWriter allows you to emit custom data during graph execution when using stream_mode="custom". Python version requirement: Python 3.11 or later is required to use this in an async context (uses contextvar propagation). Defined in: langgraph/config.py:126
Returns
StreamWriter: A callable that accepts a single argument and writes it to the output stream. This is a no-op when not using stream_mode="custom".
Raises
RuntimeError: Raised when called outside of a runnable context.
Usage with StateGraph
from typing_extensions import TypedDict
from langgraph.graph import StateGraph, START
from langgraph.config import get_stream_writer
class State(TypedDict):
foo: int
def my_node(state: State):
my_stream_writer = get_stream_writer()
# Emit custom data to the stream
my_stream_writer({"custom_data": "Hello!"})
my_stream_writer({"progress": 50})
return {"foo": state["foo"] + 1}
graph = (
StateGraph(State)
.add_node(my_node)
.add_edge(START, "my_node")
.compile()
)
# Stream with custom mode to receive the custom data
for chunk in graph.stream({"foo": 1}, stream_mode="custom"):
print(chunk)
# {"custom_data": "Hello!"}
# {"progress": 50}
Usage with Functional API
from langgraph.func import entrypoint, task
from langgraph.config import get_stream_writer
@task
def my_task(value: int):
my_stream_writer = get_stream_writer()
# Emit custom progress updates
my_stream_writer({"status": "processing"})
result = value + 1
my_stream_writer({"status": "complete"})
return result
@entrypoint()
def workflow(value: int):
return my_task(value).result()
for chunk in workflow.stream(1, stream_mode="custom"):
print(chunk)
# {"status": "processing"}
# {"status": "complete"}
RunnableConfig
Configuration for a runnable execution. This type is imported from langchain_core.runnables. The RunnableConfig contains various runtime settings and metadata that control how a runnable (node, task, or graph) executes.
Common Fields
configurable (dict[str, Any]): Configurable parameters that can be set at runtime. Common keys include:
thread_id: Identifier for the execution thread (required for checkpointing)
checkpoint_id: Specific checkpoint to resume from
checkpoint_ns: Namespace for checkpoints
tags (list[str]): Tags to attach to this run for filtering and organization.
metadata (dict[str, Any]): Arbitrary metadata to attach to this run.
callbacks (list[BaseCallbackHandler]): Callback handlers to invoke during execution.
recursion_limit (int): Maximum number of steps the graph can execute before raising a GraphRecursionError. Defaults to 25.
max_concurrency (int | None): Maximum number of concurrent operations.
Usage Example
from typing_extensions import TypedDict
from langgraph.graph import StateGraph
from langgraph.checkpoint.memory import InMemorySaver
class State(TypedDict):
messages: list[str]
builder = StateGraph(State)
# ... add nodes and edges ...
graph = builder.compile(checkpointer=InMemorySaver())
# Create a config with various settings
config = {
"configurable": {
"thread_id": "user-123",
},
"tags": ["production", "user-session"],
"metadata": {
"user_id": "123",
"session_type": "chat"
},
"recursion_limit": 100,
}
result = graph.invoke({"messages": []}, config)
Thread Management
The configurable.thread_id field in RunnableConfig is particularly important for stateful applications:
Thread ID
import uuid
from langgraph.checkpoint.memory import InMemorySaver
# Initialize graph with checkpointer
checkpointer = InMemorySaver()
graph = builder.compile(checkpointer=checkpointer)
# Each thread maintains independent state
thread_1_config = {"configurable": {"thread_id": "thread-1"}}
thread_2_config = {"configurable": {"thread_id": "thread-2"}}
# These run independently
graph.invoke({"messages": ["Hello"]}, thread_1_config)
graph.invoke({"messages": ["Hi"]}, thread_2_config)
# Continue the first thread
graph.invoke({"messages": ["How are you?"]}, thread_1_config)
Checkpoint Navigation
# Get the current state
state = graph.get_state(thread_1_config)
# Access parent checkpoint
if state.parent_config:
parent_state = graph.get_state(state.parent_config)
print(f"Previous state: {parent_state.values}")
# Resume from a specific checkpoint
checkpoint_config = {
"configurable": {
"thread_id": "thread-1",
"checkpoint_id": "1234-5678-90ab-cdef"
}
}
result = graph.invoke(None, checkpoint_config)
Advanced Configuration
Recursion Limit
Control how many steps a graph can execute:
config = {
"recursion_limit": 1000 # Allow up to 1000 steps
}
from langgraph.errors import GraphRecursionError
try:
result = graph.invoke(initial_state, config)
except GraphRecursionError:
print("Graph exceeded maximum steps")
Callbacks
from langchain_core.callbacks import BaseCallbackHandler
class MyCallback(BaseCallbackHandler):
def on_chain_start(self, serialized, inputs, **kwargs):
print(f"Starting: {serialized.get('name')}")
def on_chain_end(self, outputs, **kwargs):
print(f"Finished with: {outputs}")
config = {
"callbacks": [MyCallback()]
}
graph.invoke(initial_state, config)
Combining Multiple Settings
from langgraph.checkpoint.memory import InMemorySaver
from langchain_core.callbacks import StdOutCallbackHandler
graph = builder.compile(checkpointer=InMemorySaver())
config = {
"configurable": {
"thread_id": "session-abc",
},
"tags": ["production", "high-priority"],
"metadata": {
"user_id": "user-456",
"request_id": "req-789",
},
"callbacks": [StdOutCallbackHandler()],
"recursion_limit": 200,
}
result = graph.invoke(initial_state, config)
Best Practices
1. Always Use Thread IDs for Stateful Apps
# Good: Unique thread ID per conversation
import uuid
config = {
"configurable": {
"thread_id": str(uuid.uuid4())
}
}
2. Access Config Inside Nodes
def my_node(state: State):
config = get_config()
thread_id = config["configurable"]["thread_id"]
# Use thread_id for user-specific logic
user_data = load_user_data(thread_id)
return {"data": user_data}
3. Attach Metadata for Observability
config = {
"metadata": {
"user_id": user_id,
"session_start": datetime.now().isoformat(),
"version": "1.0"
}
}
4. Set Appropriate Recursion Limits
# For simple workflows
config = {"recursion_limit": 50}
# For complex, potentially long-running workflows
config = {"recursion_limit": 500}