The Pregel class manages the runtime behavior for LangGraph applications. It combines actors (nodes) and channels into a single application following the Pregel algorithm (Bulk Synchronous Parallel model). Defined in: langgraph/pregel/main.py:325

Overview

Pregel is the low-level execution engine that powers LangGraph. Most users interact with Pregel indirectly through StateGraph or MessageGraph, which compile down to Pregel under the hood. Using Pregel directly is intended for advanced use cases only; if you're not sure whether you need it, you probably don't. Use the Graph API instead.

Execution Model

Pregel organizes execution into a sequence of steps, each consisting of three phases:
  1. Plan: Determine which actors to execute in this step
  2. Execution: Execute all selected actors in parallel until completion, failure, or timeout
  3. Update: Update the channels with values written by the actors
This process repeats until no actors are selected for execution or a maximum number of steps is reached.
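The plan/execute/update cycle can be sketched as a plain-Python loop. This is a conceptual illustration of the BSP model, not LangGraph's actual implementation; the nodes, subscriptions, and channels below are made-up stand-ins:

```python
# Conceptual sketch of the Pregel/BSP loop described above.
# Nodes subscribe to channels; a node is planned for a step when one of
# its subscribed channels was updated in the previous step.

channels = {"a": None, "b": None, "c": None}
subscriptions = {"double": ["a"], "shout": ["b"]}
nodes = {
    "double": lambda reads: {"b": reads["a"] + reads["a"]},
    "shout": lambda reads: {"c": reads["b"].upper()},
}

channels["a"] = "foo"
updated = {"a"}  # channels written by the initial input
max_steps = 25

for step in range(max_steps):
    # 1. Plan: select nodes whose subscribed channels were updated
    selected = [name for name, subs in subscriptions.items()
                if any(ch in updated for ch in subs)]
    if not selected:
        break
    # 2. Execution: run all selected nodes (in parallel in real Pregel)
    writes = {}
    for name in selected:
        writes.update(nodes[name](channels))
    # 3. Update: apply the writes and record which channels changed
    channels.update(writes)
    updated = set(writes)

print(channels)  # {'a': 'foo', 'b': 'foofoo', 'c': 'FOOFOO'}
```

The loop terminates in the second of the two ways described above whenever a step produces writes that no node subscribes to.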

Constructor

Pregel(
    *,
    nodes: dict[str, PregelNode | NodeBuilder],
    channels: dict[str, BaseChannel | ManagedValueSpec] | None,
    input_channels: str | Sequence[str],
    output_channels: str | Sequence[str],
    stream_mode: StreamMode = "values",
    stream_channels: str | Sequence[str] | None = None,
    stream_eager: bool = False,
    interrupt_after_nodes: All | Sequence[str] = (),
    interrupt_before_nodes: All | Sequence[str] = (),
    step_timeout: float | None = None,
    debug: bool | None = None,
    checkpointer: Checkpointer = None,
    store: BaseStore | None = None,
    cache: BaseCache | None = None,
    retry_policy: RetryPolicy | Sequence[RetryPolicy] = (),
    cache_policy: CachePolicy | None = None,
    context_schema: type[ContextT] | None = None,
    name: str = "LangGraph",
    auto_validate: bool = True,
)

Parameters

nodes
dict[str, PregelNode | NodeBuilder]
required
Dictionary mapping node names to PregelNode or NodeBuilder instances.
channels
dict[str, BaseChannel | ManagedValueSpec] | None
required
Dictionary mapping channel names to channel instances. Channels are used for communication between nodes.
input_channels
str | Sequence[str]
required
Channel name(s) to use as input to the graph.
output_channels
str | Sequence[str]
required
Channel name(s) to use as output from the graph.
stream_mode
StreamMode
default:"'values'"
Mode to stream output. Options: 'values', 'updates', 'checkpoints', 'tasks', 'debug', 'messages', 'custom'.
stream_channels
str | Sequence[str] | None
default:"None"
Channels to stream. Defaults to all channels not in reserved channels.
stream_eager
bool
default:"False"
Whether to force emitting stream events eagerly. Automatically enabled for 'messages' and 'custom' stream modes.
interrupt_after_nodes
All | Sequence[str]
default:"()"
Node names to interrupt after. Use '*' to interrupt after all nodes.
interrupt_before_nodes
All | Sequence[str]
default:"()"
Node names to interrupt before. Use '*' to interrupt before all nodes.
step_timeout
float | None
default:"None"
Maximum time to wait for a step to complete, in seconds.
debug
bool | None
default:"None"
Whether to print debug information during execution.
checkpointer
Checkpointer
default:"None"
Checkpointer used to save and load graph state.
store
BaseStore | None
default:"None"
Memory store to use for SharedValues.
cache
BaseCache | None
default:"None"
Cache to use for storing node results.
retry_policy
RetryPolicy | Sequence[RetryPolicy]
default:"()"
Retry policies to use when running tasks. An empty sequence disables retries.
cache_policy
CachePolicy | None
default:"None"
Cache policy to use for all nodes. Can be overridden by individual nodes.
context_schema
type[ContextT] | None
default:"None"
Schema for the context object that will be passed to the workflow.
name
str
default:"'LangGraph'"
Name of the graph.
auto_validate
bool
default:"True"
Whether to automatically validate the graph structure on initialization.

Core Methods

invoke

invoke(
    input: InputT,
    config: RunnableConfig | None = None,
    *,
    stream_mode: StreamMode | list[StreamMode] | None = None,
    output_keys: Sequence[str] | None = None,
    interrupt_before: All | Sequence[str] | None = None,
    interrupt_after: All | Sequence[str] | None = None,
    debug: bool | None = None,
    **kwargs: Any,
) -> OutputT
Synchronously invoke the graph and return the final output.

Parameters

input
InputT
required
The input to the graph.
config
RunnableConfig | None
default:"None"
Configuration for the run, including thread_id for checkpointing.
stream_mode
StreamMode | list[StreamMode] | None
default:"None"
Override the graph’s default stream mode for this invocation.
output_keys
Sequence[str] | None
default:"None"
Specific output keys to return.
interrupt_before
All | Sequence[str] | None
default:"None"
Override interrupt_before_nodes for this invocation.
interrupt_after
All | Sequence[str] | None
default:"None"
Override interrupt_after_nodes for this invocation.
debug
bool | None
default:"None"
Override the debug flag for this invocation.

Returns

return
OutputT
The final output from the graph.

Usage Example

from langgraph.channels import EphemeralValue
from langgraph.pregel import Pregel, NodeBuilder

node1 = (
    NodeBuilder().subscribe_only("a")
    .do(lambda x: x + x)
    .write_to("b")
    .build()
)

app = Pregel(
    nodes={"node1": node1},
    channels={
        "a": EphemeralValue(str),
        "b": EphemeralValue(str),
    },
    input_channels=["a"],
    output_channels=["b"],
)

result = app.invoke({"a": "foo"})
print(result)  # {'b': 'foofoo'}

stream

stream(
    input: InputT,
    config: RunnableConfig | None = None,
    *,
    stream_mode: StreamMode | list[StreamMode] | None = None,
    output_keys: Sequence[str] | None = None,
    interrupt_before: All | Sequence[str] | None = None,
    interrupt_after: All | Sequence[str] | None = None,
    debug: bool | None = None,
    subgraphs: bool = False,
    **kwargs: Any,
) -> Iterator[Any]
Synchronously stream graph execution, yielding outputs as they become available.

Parameters

input
InputT
required
The input to the graph.
config
RunnableConfig | None
default:"None"
Configuration for the run.
stream_mode
StreamMode | list[StreamMode] | None
default:"None"
Override the graph’s default stream mode. Can specify multiple modes as a list.
subgraphs
bool
default:"False"
Whether to stream subgraph execution as well.

Returns

return
Iterator[Any]
Iterator yielding outputs based on the stream mode(s).

Usage Example

from langgraph.channels import LastValue, EphemeralValue
from langgraph.pregel import Pregel, NodeBuilder

node1 = (
    NodeBuilder().subscribe_only("a")
    .do(lambda x: x + x)
    .write_to("b")
    .build()
)

node2 = (
    NodeBuilder().subscribe_to("b")
    .do(lambda x: x["b"] + x["b"])
    .write_to("c")
    .build()
)

app = Pregel(
    nodes={"node1": node1, "node2": node2},
    channels={
        "a": EphemeralValue(str),
        "b": LastValue(str),
        "c": EphemeralValue(str),
    },
    input_channels=["a"],
    output_channels=["b", "c"],
    stream_mode="updates",
)

for chunk in app.stream({"a": "foo"}):
    print(chunk)
# {'node1': {'b': 'foofoo'}}
# {'node2': {'c': 'foofoofoofoo'}}

ainvoke

async ainvoke(
    input: InputT,
    config: RunnableConfig | None = None,
    *,
    stream_mode: StreamMode | list[StreamMode] | None = None,
    output_keys: Sequence[str] | None = None,
    interrupt_before: All | Sequence[str] | None = None,
    interrupt_after: All | Sequence[str] | None = None,
    debug: bool | None = None,
    **kwargs: Any,
) -> OutputT
Asynchronously invoke the graph and return the final output.

Parameters

Same as invoke().

Returns

return
OutputT
The final output from the graph.

astream

async astream(
    input: InputT,
    config: RunnableConfig | None = None,
    *,
    stream_mode: StreamMode | list[StreamMode] | None = None,
    output_keys: Sequence[str] | None = None,
    interrupt_before: All | Sequence[str] | None = None,
    interrupt_after: All | Sequence[str] | None = None,
    debug: bool | None = None,
    subgraphs: bool = False,
    **kwargs: Any,
) -> AsyncIterator[Any]
Asynchronously stream graph execution.

Parameters

Same as stream().

Returns

return
AsyncIterator[Any]
Async iterator yielding outputs based on the stream mode(s).

get_graph

get_graph(
    config: RunnableConfig | None = None,
    *,
    xray: int | bool = False
) -> Graph
Return a drawable representation of the computation graph.

Parameters

config
RunnableConfig | None
default:"None"
Configuration for the graph.
xray
int | bool
default:"False"
Whether to include subgraph details. If an integer, specifies the depth level.

Returns

return
Graph
A Graph object that can be visualized.

Usage Example

graph = app.get_graph()
png_data = graph.draw_mermaid_png()

# In Jupyter notebooks, the graph is displayed automatically
app.get_graph()

NodeBuilder

NodeBuilder provides a fluent API for building Pregel nodes. Defined in: langgraph/pregel/main.py:161

Methods

subscribe_to

subscribe_to(
    *channels: str,
    read: bool = True,
) -> Self
Add channels to subscribe to. Node will be invoked when any of these channels are updated.

subscribe_only

subscribe_only(channel: str) -> Self
Subscribe to a single channel only.

read_from

read_from(*channels: str) -> Self
Add channels to read from without subscribing to them.

do

do(node: RunnableLike) -> Self
Add the node (any runnable) to execute.

write_to

write_to(
    *channels: str | ChannelWriteEntry,
    **kwargs: _WriteValue,
) -> Self
Add channel writes.

meta

meta(*tags: str, **metadata: Any) -> Self
Add tags or metadata to the node.

add_retry_policies

add_retry_policies(*policies: RetryPolicy) -> Self
Add retry policies to the node.

add_cache_policy

add_cache_policy(policy: CachePolicy) -> Self
Add a cache policy to the node.

build

build() -> PregelNode
Build and return the PregelNode.

Usage Example

from langgraph.pregel import NodeBuilder

node = (
    NodeBuilder()
    .subscribe_only("input_channel")
    .do(lambda x: x.upper())
    .write_to("output_channel")
    .meta("processing", version="1.0")
    .build()
)

Channels

Channels are used to communicate between actors (nodes). LangGraph provides several built-in channel types:

Basic Channels

  • LastValue: Stores the last value sent to the channel
  • Topic: A configurable PubSub Topic for sending multiple values

Advanced Channels

  • Context: Exposes the value of a context manager
  • BinaryOperatorAggregate: Stores a persistent value updated by applying a binary operator

Usage Example

from langgraph.channels import LastValue, Topic, BinaryOperatorAggregate
import operator

channels = {
    "messages": LastValue(str),
    "events": Topic(dict, accumulate=True),
    "counter": BinaryOperatorAggregate(int, operator=operator.add),
}
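
For intuition, the update rule of BinaryOperatorAggregate amounts to folding the operator over the writes a channel receives in a step. This is a plain-Python sketch of that behavior, not the channel's real implementation:

```python
import operator

def apply_updates(current, updates, op):
    # Each step, the channel folds pending writes into its stored value
    for update in updates:
        current = op(current, update)
    return current

counter = 0
counter = apply_updates(counter, [1, 2, 3], operator.add)
print(counter)  # 6
```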

Complete Example

from langgraph.channels import EphemeralValue, BinaryOperatorAggregate
from langgraph.pregel import Pregel, NodeBuilder

node1 = (
    NodeBuilder().subscribe_only("a")
    .do(lambda x: x + x)
    .write_to("b", "c")
    .build()
)

node2 = (
    NodeBuilder().subscribe_only("b")
    .do(lambda x: x + x)
    .write_to("c")
    .build()
)

def reducer(current, update):
    if current:
        return current + " | " + update
    return update

app = Pregel(
    nodes={"node1": node1, "node2": node2},
    channels={
        "a": EphemeralValue(str),
        "b": EphemeralValue(str),
        "c": BinaryOperatorAggregate(str, operator=reducer),
    },
    input_channels=["a"],
    output_channels=["c"],
    stream_mode="values",
)

result = app.invoke({"a": "foo"})
print(result)  # {'c': 'foofoo | foofoofoofoo'}
