Overview
Runnables are the foundation of LangChain’s composable architecture. Every component that can be invoked, streamed, or batched implements the Runnable interface.
Runnable
Base abstract class for units of work that can be invoked, batched, streamed, transformed and composed.
Source: langchain_core.runnables.base:124
Type Parameters
Input: The input type for the runnable
Output: The output type for the runnable
Properties
The name of the runnable. Used for debugging and tracing.
Input type as a type annotation. Inferred from generic parameterization.
Output type as a type annotation. Inferred from generic parameterization.
The input type specified as a Pydantic model.
The output type specified as a Pydantic model.
config_specs
list[ConfigurableFieldSpec]
List of configurable fields for this runnable.
Core Methods
invoke
def invoke(
self,
input: Input,
config: RunnableConfig | None = None,
**kwargs: Any
) -> Output
Transform a single input into an output.
input: The input to the runnable
config: Configuration for execution (tags, metadata, callbacks)
**kwargs: Additional arguments passed to the runnable
Returns: The output of the runnable
ainvoke
async def ainvoke(
self,
input: Input,
config: RunnableConfig | None = None,
**kwargs: Any
) -> Output
Asynchronous version of invoke. By default, runs the sync version in a thread pool.
batch
def batch(
self,
inputs: list[Input],
config: RunnableConfig | list[RunnableConfig] | None = None,
*,
return_exceptions: bool = False,
**kwargs: Any
) -> list[Output]
Efficiently transform multiple inputs into outputs.
inputs: List of inputs to process
config
RunnableConfig | list[RunnableConfig] | None
Configuration for each input. Can be single config or list matching inputs.
return_exceptions: If True, returns exceptions instead of raising them
Returns: List of outputs corresponding to inputs
abatch
async def abatch(
self,
inputs: list[Input],
config: RunnableConfig | list[RunnableConfig] | None = None,
*,
return_exceptions: bool = False,
**kwargs: Any
) -> list[Output]
Async version of batch.
stream
def stream(
self,
input: Input,
config: RunnableConfig | None = None,
**kwargs: Any
) -> Iterator[Output]
Stream output from a single input as it’s produced.
Iterator yielding output chunks
astream
async def astream(
self,
input: Input,
config: RunnableConfig | None = None,
**kwargs: Any
) -> AsyncIterator[Output]
Async version of stream.
Composition Methods
Pipe operator (|)
def __or__(self, other: Runnable[Output, Other]) -> RunnableSequence[Input, Other]
Compose this runnable with another to create a sequence.
from langchain_core.runnables import RunnableLambda
sequence = RunnableLambda(lambda x: x + 1) | RunnableLambda(lambda x: x * 2)
sequence.invoke(5) # 12
with_retry
def with_retry(
self,
*,
retry_if_exception_type: tuple[type[BaseException], ...] = (Exception,),
wait_exponential_jitter: bool = True,
stop_after_attempt: int = 3
) -> RunnableRetry[Input, Output]
Create a new runnable that retries on failure.
retry_if_exception_type
tuple[type[BaseException], ...]
Exception types to retry on
wait_exponential_jitter: Use exponential backoff with jitter
stop_after_attempt: Maximum number of retry attempts
with_fallbacks
def with_fallbacks(
self,
fallbacks: Sequence[Runnable[Input, Output]],
*,
exceptions_to_handle: tuple[type[BaseException], ...] = (Exception,)
) -> RunnableWithFallbacks[Input, Output]
Create a runnable that falls back to alternative runnables on failure.
fallbacks
Sequence[Runnable[Input, Output]]
required
Sequence of fallback runnables to try in order
exceptions_to_handle
tuple[type[BaseException], ...]
Exception types that trigger fallback
with_config
def with_config(
self,
config: RunnableConfig | None = None,
**kwargs: Any
) -> Runnable[Input, Output]
Bind a config to the runnable.
configurable_fields
def configurable_fields(
self,
**kwargs: ConfigurableField | ConfigurableFieldSingleOption | ConfigurableFieldMultiOption
) -> RunnableConfigurableFields
Make init args of the runnable configurable at runtime.
configurable_alternatives
def configurable_alternatives(
self,
which: ConfigurableField,
*,
default_key: str = "default",
**kwargs: Runnable[Input, Output] | Callable[[], Runnable[Input, Output]]
) -> RunnableConfigurableAlternatives[Input, Output]
Specify alternative runnables which can be swapped at runtime.
Schema Methods
def get_input_schema(
self,
config: RunnableConfig | None = None
) -> type[BaseModel]
Get a Pydantic model for validating input.
get_output_schema
def get_output_schema(
self,
config: RunnableConfig | None = None
) -> type[BaseModel]
Get a Pydantic model for validating output.
def get_input_jsonschema(
self,
config: RunnableConfig | None = None
) -> dict[str, Any]
Get JSON schema representing the input.
get_output_jsonschema
def get_output_jsonschema(
self,
config: RunnableConfig | None = None
) -> dict[str, Any]
Get JSON schema representing the output.
config_schema
def config_schema(
self,
*,
include: Sequence[str] | None = None
) -> type[BaseModel]
Get Pydantic model representing the config.
Graph Methods
get_graph
def get_graph(
self,
config: RunnableConfig | None = None
) -> Graph
Return a graph representation of this runnable.
get_name
def get_name(
self,
suffix: str | None = None,
*,
name: str | None = None
) -> str
Get the name of the runnable.
suffix: Optional suffix to append to the name
name: Optional name to use instead of the runnable's name
RunnableSequence
A sequence of runnables where the output of one is the input of the next.
Source: langchain_core.runnables.base
Created using the | operator or by passing a list to RunnableSequence.
from langchain_core.runnables import RunnableLambda
sequence = RunnableLambda(lambda x: x + 1) | RunnableLambda(lambda x: x * 2)
# or
from langchain_core.runnables import RunnableSequence
sequence = RunnableSequence(first=..., middle=[...], last=...)
RunnableParallel
Invoke runnables concurrently with the same input.
Source: langchain_core.runnables.base
from langchain_core.runnables import RunnableLambda, RunnableParallel
parallel = RunnableParallel(
doubled=RunnableLambda(lambda x: x * 2),
tripled=RunnableLambda(lambda x: x * 3)
)
parallel.invoke(5) # {"doubled": 10, "tripled": 15}
RunnableLambda
Wrap a callable as a Runnable.
Source: langchain_core.runnables.base
from langchain_core.runnables import RunnableLambda
def add_one(x: int) -> int:
return x + 1
runnable = RunnableLambda(add_one)
runnable.invoke(5) # 6
Constructor
def __init__(
self,
func: Callable[[Input], Output] | Callable[[Input, RunnableConfig], Output],
afunc: Callable[[Input], Awaitable[Output]] | None = None
)
func: Synchronous function to wrap. Can optionally accept RunnableConfig as a second argument.
afunc: Optional async function. If not provided, runs func in a thread pool.
RunnablePassthrough
Pass inputs unchanged or with additional keys.
Source: langchain_core.runnables.passthrough
from langchain_core.runnables import RunnablePassthrough
# Pass through unchanged
RunnablePassthrough().invoke({"x": 1}) # {"x": 1}
# Assign additional keys
RunnablePassthrough.assign(y=lambda x: x["x"] * 2).invoke({"x": 1})
# {"x": 1, "y": 2}
RunnableConfig
Configuration for runnable execution.
Source: langchain_core.runnables.config
class RunnableConfig(TypedDict, total=False):
tags: list[str]
metadata: dict[str, Any]
callbacks: Callbacks
run_name: str
max_concurrency: int | None
recursion_limit: int
configurable: dict[str, Any]
tags: Tags for tracing and filtering
callbacks: Callbacks to invoke during execution
run_name: Name for the run in tracing
max_concurrency: Maximum number of parallel calls
configurable: Runtime configuration for configurable fields