The Pregel class manages the runtime behavior for LangGraph applications. It combines actors (nodes) and channels into a single application following the Pregel algorithm (Bulk Synchronous Parallel model).
Defined in: langgraph/pregel/main.py:325
Overview
Prege l is the low-level execution engine that powers LangGraph. Most users will interact with Pregel indirectly through StateGraph or MessageGraph, which compile down to Pregel under the hood.
For advanced use cases only. If you’re not sure whether you need to use Pregel directly, you probably don’t—use the Graph API instead.
Execution Model
Prege l organizes execution into multiple steps, each consisting of three phases:
- Plan: Determine which actors to execute in this step
- Execution: Execute all selected actors in parallel until completion, failure, or timeout
- Update: Update the channels with values written by the actors
This process repeats until no actors are selected for execution or a maximum number of steps is reached.
Constructor
Prege l(
*,
nodes: dict[str, PregelNode | NodeBuilder],
channels: dict[str, BaseChannel | ManagedValueSpec] | None,
input_channels: str | Sequence[str],
output_channels: str | Sequence[str],
stream_mode: StreamMode = "values",
stream_channels: str | Sequence[str] | None = None,
stream_eager: bool = False,
interrupt_after_nodes: All | Sequence[str] = (),
interrupt_before_nodes: All | Sequence[str] = (),
step_timeout: float | None = None,
debug: bool | None = None,
checkpointer: Checkpointer = None,
store: BaseStore | None = None,
cache: BaseCache | None = None,
retry_policy: RetryPolicy | Sequence[RetryPolicy] = (),
cache_policy: CachePolicy | None = None,
context_schema: type[ContextT] | None = None,
name: str = "LangGraph",
auto_validate: bool = True,
)
Parameters
nodes
dict[str, PregelNode | NodeBuilder]
required
Dictionary mapping node names to PregelNode or NodeBuilder instances.
channels
dict[str, BaseChannel | ManagedValueSpec] | None
required
Dictionary mapping channel names to channel instances. Channels are used for communication between nodes.
input_channels
str | Sequence[str]
required
Channel name(s) to use as input to the graph.
output_channels
str | Sequence[str]
required
Channel name(s) to use as output from the graph.
stream_mode
StreamMode
default:"'values'"
Mode to stream output. Options: 'values', 'updates', 'checkpoints', 'tasks', 'debug', 'messages', 'custom'.
stream_channels
str | Sequence[str] | None
default:"None"
Channels to stream. Defaults to all channels not in reserved channels.
Whether to force emitting stream events eagerly. Automatically enabled for 'messages' and 'custom' stream modes.
interrupt_after_nodes
All | Sequence[str]
default:"()"
Node names to interrupt after. Use '*' to interrupt after all nodes.
interrupt_before_nodes
All | Sequence[str]
default:"()"
Node names to interrupt before. Use '*' to interrupt before all nodes.
step_timeout
float | None
default:"None"
Maximum time to wait for a step to complete, in seconds.
debug
bool | None
default:"None"
Whether to print debug information during execution.
checkpointer
Checkpointer
default:"None"
Checkpointer used to save and load graph state.
store
BaseStore | None
default:"None"
Memory store to use for SharedValues.
cache
BaseCache | None
default:"None"
Cache to use for storing node results.
retry_policy
RetryPolicy | Sequence[RetryPolicy]
default:"()"
Retry policies to use when running tasks. Empty set disables retries.
cache_policy
CachePolicy | None
default:"None"
Cache policy to use for all nodes. Can be overridden by individual nodes.
context_schema
type[ContextT] | None
default:"None"
Schema for the context object that will be passed to the workflow.
Whether to automatically validate the graph structure on initialization.
Core Methods
invoke
invoke(
input: InputT,
config: RunnableConfig | None = None,
*,
stream_mode: StreamMode | list[StreamMode] | None = None,
output_keys: Sequence[str] | None = None,
interrupt_before: All | Sequence[str] | None = None,
interrupt_after: All | Sequence[str] | None = None,
debug: bool | None = None,
**kwargs: Any,
) -> OutputT
Synchronously invoke the graph and return the final output.
Parameters
config
RunnableConfig | None
default:"None"
Configuration for the run, including thread_id for checkpointing.
stream_mode
StreamMode | list[StreamMode] | None
default:"None"
Override the graph’s default stream mode for this invocation.
output_keys
Sequence[str] | None
default:"None"
Specific output keys to return.
interrupt_before
All | Sequence[str] | None
default:"None"
Override interrupt_before_nodes for this invocation.
interrupt_after
All | Sequence[str] | None
default:"None"
Override interrupt_after_nodes for this invocation.
debug
bool | None
default:"None"
Override the debug flag for this invocation.
Returns
The final output from the graph.
Usage Example
from langgraph.channels import EphemeralValue
from langgraph.pregel import Pregel, NodeBuilder
node1 = (
NodeBuilder().subscribe_only("a")
.do(lambda x: x + x)
.write_to("b")
.build()
)
app = Pregel(
nodes={"node1": node1},
channels={
"a": EphemeralValue(str),
"b": EphemeralValue(str),
},
input_channels=["a"],
output_channels=["b"],
)
result = app.invoke({"a": "foo"})
print(result) # {'b': 'foofoo'}
stream
stream(
input: InputT,
config: RunnableConfig | None = None,
*,
stream_mode: StreamMode | list[StreamMode] | None = None,
output_keys: Sequence[str] | None = None,
interrupt_before: All | Sequence[str] | None = None,
interrupt_after: All | Sequence[str] | None = None,
debug: bool | None = None,
subgraphs: bool = False,
**kwargs: Any,
) -> Iterator[Any]
Synchronously stream graph execution, yielding outputs as they become available.
Parameters
config
RunnableConfig | None
default:"None"
Configuration for the run.
stream_mode
StreamMode | list[StreamMode] | None
default:"None"
Override the graph’s default stream mode. Can specify multiple modes as a list.
Whether to stream subgraph execution as well.
Returns
Iterator yielding outputs based on the stream mode(s).
Usage Example
from langgraph.channels import LastValue, EphemeralValue
from langgraph.pregel import Pregel, NodeBuilder
node1 = (
NodeBuilder().subscribe_only("a")
.do(lambda x: x + x)
.write_to("b")
.build()
)
node2 = (
NodeBuilder().subscribe_to("b")
.do(lambda x: x["b"] + x["b"])
.write_to("c")
.build()
)
app = Pregel(
nodes={"node1": node1, "node2": node2},
channels={
"a": EphemeralValue(str),
"b": LastValue(str),
"c": EphemeralValue(str),
},
input_channels=["a"],
output_channels=["b", "c"],
stream_mode="updates",
)
for chunk in app.stream({"a": "foo"}):
print(chunk)
# {'node1': {'b': 'foofoo'}}
# {'node2': {'c': 'foofoofoofoo'}}
ainvoke
async ainvoke(
input: InputT,
config: RunnableConfig | None = None,
*,
stream_mode: StreamMode | list[StreamMode] | None = None,
output_keys: Sequence[str] | None = None,
interrupt_before: All | Sequence[str] | None = None,
interrupt_after: All | Sequence[str] | None = None,
debug: bool | None = None,
**kwargs: Any,
) -> OutputT
Asynchronously invoke the graph and return the final output.
Parameters
Same as invoke().
Returns
The final output from the graph.
astream
async astream(
input: InputT,
config: RunnableConfig | None = None,
*,
stream_mode: StreamMode | list[StreamMode] | None = None,
output_keys: Sequence[str] | None = None,
interrupt_before: All | Sequence[str] | None = None,
interrupt_after: All | Sequence[str] | None = None,
debug: bool | None = None,
subgraphs: bool = False,
**kwargs: Any,
) -> AsyncIterator[Any]
Asynchronously stream graph execution.
Parameters
Same as stream().
Returns
Async iterator yielding outputs based on the stream mode(s).
get_graph
get_graph(
config: RunnableConfig | None = None,
*,
xray: int | bool = False
) -> Graph
Return a drawable representation of the computation graph.
Parameters
config
RunnableConfig | None
default:"None"
Configuration for the graph.
xray
int | bool
default:"False"
Whether to include subgraph details. If an integer, specifies the depth level.
Returns
A Graph object that can be visualized.
Usage Example
graph = app.get_graph()
png_data = graph.draw_mermaid_png()
# In Jupyter notebooks, the graph is displayed automatically
app.get_graph()
NodeBuilder
NodeBuilder provides a fluent API for building Pregel nodes.
Defined in: langgraph/pregel/main.py:161
Methods
subscribe_to
subscribe_to(
*channels: str,
read: bool = True,
) -> Self
Add channels to subscribe to. Node will be invoked when any of these channels are updated.
subscribe_only
subscribe_only(channel: str) -> Self
Subscribe to a single channel only.
read_from
read_from(*channels: str) -> Self
Adds channels to read from without subscribing to them.
do(node: RunnableLike) -> Self
Adds the specified node/runnable to execute.
write_to
write_to(
*channels: str | ChannelWriteEntry,
**kwargs: _WriteValue,
) -> Self
Add channel writes.
meta(*tags: str, **metadata: Any) -> Self
Add tags or metadata to the node.
add_retry_policies
add_retry_policies(*policies: RetryPolicy) -> Self
Adds retry policies to the node.
add_cache_policy
add_cache_policy(policy: CachePolicy) -> Self
Adds cache policy to the node.
build
Builds and returns the PregelNode.
Usage Example
from langgraph.pregel import NodeBuilder
from langgraph.pregel._write import ChannelWriteEntry
node = (
NodeBuilder()
.subscribe_only("input_channel")
.do(lambda x: x.upper())
.write_to("output_channel")
.meta("processing", version="1.0")
.build()
)
Channels
Channels are used to communicate between actors (nodes). LangGraph provides several built-in channel types:
Basic Channels
- LastValue: Stores the last value sent to the channel
- Topic: A configurable PubSub Topic for sending multiple values
Advanced Channels
- Context: Exposes the value of a context manager
- BinaryOperatorAggregate: Stores a persistent value updated by applying a binary operator
Usage Example
from langgraph.channels import LastValue, Topic, BinaryOperatorAggregate
import operator
channels = {
"messages": LastValue(str),
"events": Topic(dict, accumulate=True),
"counter": BinaryOperatorAggregate(int, operator=operator.add),
}
Complete Example
from langgraph.channels import EphemeralValue, BinaryOperatorAggregate
from langgraph.pregel import Pregel, NodeBuilder
node1 = (
NodeBuilder().subscribe_only("a")
.do(lambda x: x + x)
.write_to("b", "c")
.build()
)
node2 = (
NodeBuilder().subscribe_only("b")
.do(lambda x: x + x)
.write_to("c")
.build()
)
def reducer(current, update):
if current:
return current + " | " + update
return update
app = Pregel(
nodes={"node1": node1, "node2": node2},
channels={
"a": EphemeralValue(str),
"b": EphemeralValue(str),
"c": BinaryOperatorAggregate(str, operator=reducer),
},
input_channels=["a"],
output_channels=["c"],
stream_mode="values",
)
result = app.invoke({"a": "foo"})
print(result) # {'c': 'foofoo | foofoofoofoo'}
See Also