Skip to main content
This page documents configuration types and utilities used to access runtime information and resources within LangGraph nodes and tasks.

get_config

get_config()
function
Get the current runnable configuration from within a graph node or task. This function retrieves the RunnableConfig for the currently executing node or task. The config contains metadata, callbacks, tags, and other runtime information. Important: Must be called from within a runnable context (inside a node or task). Raises RuntimeError if called outside of a runnable context. Python version requirement: Python 3.11 or later is required to use this in an async context. Defined in: langgraph/config.py:17

Returns

return
RunnableConfig
The configuration for the current runnable context.

Raises

RuntimeError
exception
Raised when called outside of a runnable context, or when using Python < 3.11 in an async context.

Usage Example

from langgraph.config import get_config
from langgraph.graph import StateGraph, START
from typing_extensions import TypedDict


class State(TypedDict):
    """Graph state holding a single integer."""

    value: int


def my_node(state: State):
    """Print the thread ID and tags from the current config, then increment `value`."""
    # get_config() must be called while the node is executing
    run_config = get_config()

    # Pull out the individual properties before printing them
    thread_id = run_config["configurable"].get("thread_id")
    tags = run_config.get("tags", [])
    print(f"Thread ID: {thread_id}")
    print(f"Tags: {tags}")

    incremented = state["value"] + 1
    return {"value": incremented}


# Wire a one-node graph: START -> my_node
builder = StateGraph(State).add_node("my_node", my_node).add_edge(START, "my_node")
graph = builder.compile()

# Pass config with metadata
config = {
    "configurable": {"thread_id": "123"},
    "tags": ["example"],
}
graph.invoke({"value": 0}, config)

get_store

get_store()
function
Access LangGraph store from inside a graph node or entrypoint task at runtime. Can be called from inside any StateGraph node or functional API task, as long as the StateGraph or the entrypoint was initialized with a store. Python version requirement: Python 3.11 or later is required to use this in an async context (uses contextvar propagation). Defined in: langgraph/config.py:32

Returns

return
BaseStore
The store instance configured for the current graph.

Raises

RuntimeError
exception
Raised when called outside of a runnable context.

Usage with StateGraph

from typing_extensions import TypedDict
from langgraph.graph import StateGraph, START
from langgraph.store.memory import InMemoryStore
from langgraph.config import get_store

# Seed the store up front so the node has an item to read
store = InMemoryStore()
store.put(("values",), "foo", {"bar": 2})


class State(TypedDict):
    """Graph state holding a single integer."""

    foo: int


def my_node(state: State):
    """Read the seeded item from the store and return its `bar` value plus one."""
    # get_store() returns the store the graph was compiled with
    item = get_store().get(("values",), "foo")
    return {"foo": item.value["bar"] + 1}


# Build step by step; the store is attached at compile time
builder = StateGraph(State)
builder.add_node(my_node)
builder.add_edge(START, "my_node")
graph = builder.compile(store=store)

result = graph.invoke({"foo": 1})
print(result)  # {"foo": 3}

Usage with Functional API

from langgraph.func import entrypoint, task
from langgraph.store.memory import InMemoryStore
from langgraph.config import get_store

# Seed the store up front so the task has an item to read
store = InMemoryStore()
store.put(("values",), "foo", {"bar": 2})


@task
def my_task(value: int):
    """Return the seeded `bar` value plus one (the `value` argument is not used)."""
    item = get_store().get(("values",), "foo")
    return item.value["bar"] + 1


@entrypoint(store=store)
def workflow(value: int):
    """Run `my_task` and wait for its result."""
    # Calling a task returns a future-like object; block on it for the value
    pending = my_task(value)
    return pending.result()


result = workflow.invoke(1)
print(result)  # 3

get_stream_writer

get_stream_writer()
function
Access LangGraph StreamWriter from inside a graph node or entrypoint task at runtime. Can be called from inside any StateGraph node or functional API task. The StreamWriter allows you to emit custom data during graph execution when using stream_mode="custom". Python version requirement: Python 3.11 or later is required to use this in an async context (uses contextvar propagation). Defined in: langgraph/config.py:126

Returns

return
StreamWriter
A callable that accepts a single argument and writes it to the output stream. This is a no-op when not using stream_mode="custom".

Raises

RuntimeError
exception
Raised when called outside of a runnable context.

Usage with StateGraph

from typing_extensions import TypedDict
from langgraph.graph import StateGraph, START
from langgraph.config import get_stream_writer


class State(TypedDict):
    """Graph state holding a single integer."""

    foo: int


def my_node(state: State):
    """Emit two custom stream payloads, then return `foo` incremented by one."""
    write = get_stream_writer()

    # Emit custom data to the stream, in order
    for payload in ({"custom_data": "Hello!"}, {"progress": 50}):
        write(payload)

    return {"foo": state["foo"] + 1}


builder = StateGraph(State)
builder.add_node(my_node)
builder.add_edge(START, "my_node")
graph = builder.compile()

# Stream with custom mode to receive the custom data
for chunk in graph.stream({"foo": 1}, stream_mode="custom"):
    print(chunk)
# {"custom_data": "Hello!"}
# {"progress": 50}

Usage with Functional API

from langgraph.func import entrypoint, task
from langgraph.config import get_stream_writer


@task
def my_task(value: int):
    """Increment `value`, emitting a status payload before and after."""
    emit = get_stream_writer()

    # Emit custom progress updates around the actual work
    emit({"status": "processing"})
    incremented = value + 1
    emit({"status": "complete"})

    return incremented


@entrypoint()
def workflow(value: int):
    """Run `my_task` and wait for its result."""
    pending = my_task(value)
    return pending.result()


for chunk in workflow.stream(1, stream_mode="custom"):
    print(chunk)
# {"status": "processing"}
# {"status": "complete"}

RunnableConfig

RunnableConfig
TypedDict
Configuration for a runnable execution. This type is imported from langchain_core.runnables. The RunnableConfig contains various runtime settings and metadata that control how a runnable (node, task, or graph) executes.

Common Fields

configurable
dict[str, Any]
Configurable parameters that can be set at runtime. Common keys include:
  • thread_id: Identifier for the execution thread (required for checkpointing)
  • checkpoint_id: Specific checkpoint to resume from
  • checkpoint_ns: Namespace for checkpoints
tags
list[str]
Tags to attach to this run for filtering and organization.
metadata
dict[str, Any]
Arbitrary metadata to attach to this run.
callbacks
list[BaseCallbackHandler]
Callback handlers to invoke during execution.
recursion_limit
int
Maximum number of steps the graph can execute before raising a GraphRecursionError. Defaults to 25.
max_concurrency
int
Maximum number of concurrent operations.

Usage Example

from typing_extensions import TypedDict

from langgraph.graph import StateGraph
from langgraph.checkpoint.memory import InMemorySaver


class State(TypedDict):
    """Graph state with a single `messages` list of strings."""

    messages: list[str]


builder = StateGraph(State)
# ... add nodes and edges ...
# Compile with a checkpointer so the thread_id below has somewhere to persist to
graph = builder.compile(checkpointer=InMemorySaver())

# Create a config with various settings
config = {
    # thread_id identifies the execution thread (required for checkpointing)
    "configurable": {
        "thread_id": "user-123",
    },
    # Tags and metadata are attached to the run for filtering and organization
    "tags": ["production", "user-session"],
    "metadata": {
        "user_id": "123",
        "session_type": "chat",
    },
    # Maximum number of steps before GraphRecursionError is raised
    "recursion_limit": 100,
}

result = graph.invoke({"messages": []}, config)

Thread Management

The configurable.thread_id field in RunnableConfig is particularly important for stateful applications:

Thread ID

# NOTE(review): uuid is not used in this snippet; presumably it is imported
# for generating unique thread IDs in real applications.
import uuid
from langgraph.checkpoint.memory import InMemorySaver

# Initialize graph with checkpointer
# NOTE: `builder` is assumed to be a StateGraph builder defined earlier;
# it is not created in this snippet.
checkpointer = InMemorySaver()
graph = builder.compile(checkpointer=checkpointer)

# Each thread maintains independent state; the two configs differ only in
# their thread_id
thread_1_config = {"configurable": {"thread_id": "thread-1"}}
thread_2_config = {"configurable": {"thread_id": "thread-2"}}

# These run independently
graph.invoke({"messages": ["Hello"]}, thread_1_config)
graph.invoke({"messages": ["Hi"]}, thread_2_config)

# Continue the first thread by reusing its config (same thread_id)
graph.invoke({"messages": ["How are you?"]}, thread_1_config)

Checkpoint Navigation

# NOTE: `graph` and `thread_1_config` are assumed to be defined earlier;
# they are not created in this snippet.

# Get the current state
state = graph.get_state(thread_1_config)

# Access parent checkpoint (skipped when parent_config is empty/None)
if state.parent_config:
    parent_state = graph.get_state(state.parent_config)
    print(f"Previous state: {parent_state.values}")

# Resume from a specific checkpoint
# The checkpoint_id below is a placeholder value; substitute a real ID.
checkpoint_config = {
    "configurable": {
        "thread_id": "thread-1",
        "checkpoint_id": "1234-5678-90ab-cdef"
    }
}
# No new input is supplied (None); the run is driven by checkpoint_config
result = graph.invoke(None, checkpoint_config)

Advanced Configuration

Recursion Limit

Control how many steps a graph can execute:
from langgraph.errors import GraphRecursionError

# NOTE: `graph` and `initial_state` are assumed to be defined by the caller.
config = {
    "recursion_limit": 1000  # Allow up to 1000 steps
}

try:
    result = graph.invoke(initial_state, config)
except GraphRecursionError:
    # Raised when the graph executes more steps than recursion_limit allows
    print("Graph exceeded maximum steps")

Callbacks

from langchain_core.callbacks import BaseCallbackHandler


class MyCallback(BaseCallbackHandler):
    """Callback handler that prints when a chain run starts and finishes."""

    def on_chain_start(self, serialized, inputs, **kwargs):
        """Print the name of the run that is starting.

        `serialized` is not guaranteed to be a dict: some callers (LangGraph
        among them, as far as I can tell) pass None and supply the run name
        through the `name` keyword argument instead, so fall back to that
        rather than failing on `None.get(...)`.
        """
        name = (serialized or {}).get("name") or kwargs.get("name")
        print(f"Starting: {name}")

    def on_chain_end(self, outputs, **kwargs):
        """Print the outputs of the run that just finished."""
        print(f"Finished with: {outputs}")


config = {
    "callbacks": [MyCallback()]
}

# NOTE: `graph` and `initial_state` are assumed to be defined by the caller.
graph.invoke(initial_state, config)

Combining Multiple Settings

from langgraph.checkpoint.memory import InMemorySaver
from langchain_core.callbacks import StdOutCallbackHandler

# NOTE: `builder` is assumed to be a StateGraph builder defined earlier.
graph = builder.compile(checkpointer=InMemorySaver())

# One config dict can carry every setting at once
config = {
    # Identifies the execution thread (required for checkpointing)
    "configurable": {
        "thread_id": "session-abc",
    },
    # Tags and metadata are attached to the run
    "tags": ["production", "high-priority"],
    "metadata": {
        "user_id": "user-456",
        "request_id": "req-789",
    },
    # Callback handlers invoked during execution
    "callbacks": [StdOutCallbackHandler()],
    # Maximum number of steps before GraphRecursionError is raised
    "recursion_limit": 200,
}

# NOTE: `initial_state` is assumed to be defined by the caller.
result = graph.invoke(initial_state, config)

Best Practices

1. Always Use Thread IDs for Stateful Apps

# Good: Unique thread ID per conversation
import uuid

# Good: Unique thread ID per conversation
# Generate the ID on its own line so it is easy to log or store separately
thread_id = str(uuid.uuid4())
config = {"configurable": {"thread_id": thread_id}}

2. Access Config Inside Nodes

def my_node(state: State):
    """Load data keyed by the current thread ID and return it under `data`."""
    # Raises KeyError if the run was started without a thread_id
    thread_id = get_config()["configurable"]["thread_id"]

    # Use thread_id for user-specific logic
    return {"data": load_user_data(thread_id)}

3. Use Metadata for Observability

from datetime import datetime, timezone

# NOTE: `user_id` is assumed to be defined by the caller.
config = {
    "metadata": {
        "user_id": user_id,
        # Timezone-aware UTC timestamp, so the value is unambiguous when runs
        # from different hosts are compared
        "session_start": datetime.now(timezone.utc).isoformat(),
        "version": "1.0",
    }
}

4. Set Appropriate Recursion Limits

# For simple workflows
config = {"recursion_limit": 50}

# For complex, potentially long-running workflows
config = {"recursion_limit": 500}

Build docs developers (and LLMs) love