Skip to main content

Overview

Runnables are the foundation of LangChain’s composable architecture. Every component that can be invoked, streamed, or batched implements the Runnable interface.

Runnable

Base abstract class for units of work that can be invoked, batched, streamed, transformed and composed. Source: langchain_core.runnables.base:124

Type Parameters

Input
TypeVar
The input type for the runnable
Output
TypeVar
The output type for the runnable

Properties

name
str | None
The name of the runnable. Used for debugging and tracing.
InputType
type[Input]
Input type as a type annotation. Inferred from generic parameterization.
OutputType
type[Output]
Output type as a type annotation. Inferred from generic parameterization.
input_schema
type[BaseModel]
The input type specified as a Pydantic model.
output_schema
type[BaseModel]
The output type specified as a Pydantic model.
config_specs
list[ConfigurableFieldSpec]
List of configurable fields for this runnable.

Core Methods

invoke

def invoke(
    self,
    input: Input,
    config: RunnableConfig | None = None,
    **kwargs: Any
) -> Output
Transform a single input into an output.
input
Input
required
The input to the runnable
config
RunnableConfig | None
Configuration for execution, tags, metadata, callbacks
**kwargs
Any
Additional arguments passed to the runnable
return
Output
The output of the runnable

ainvoke

async def ainvoke(
    self,
    input: Input,
    config: RunnableConfig | None = None,
    **kwargs: Any
) -> Output
Asynchronous version of invoke. By default, runs sync version in thread pool.

batch

def batch(
    self,
    inputs: list[Input],
    config: RunnableConfig | list[RunnableConfig] | None = None,
    *,
    return_exceptions: bool = False,
    **kwargs: Any
) -> list[Output]
Efficiently transform multiple inputs into outputs.
inputs
list[Input]
required
List of inputs to process
config
RunnableConfig | list[RunnableConfig] | None
Configuration for each input. Can be single config or list matching inputs.
return_exceptions
bool
default:"False"
If True, returns exceptions instead of raising them
return
list[Output]
List of outputs corresponding to inputs

abatch

async def abatch(
    self,
    inputs: list[Input],
    config: RunnableConfig | list[RunnableConfig] | None = None,
    *,
    return_exceptions: bool = False,
    **kwargs: Any
) -> list[Output]
Async version of batch.

stream

def stream(
    self,
    input: Input,
    config: RunnableConfig | None = None,
    **kwargs: Any
) -> Iterator[Output]
Stream output from a single input as it’s produced.
return
Iterator[Output]
Iterator yielding output chunks

astream

async def astream(
    self,
    input: Input,
    config: RunnableConfig | None = None,
    **kwargs: Any
) -> AsyncIterator[Output]
Async version of stream.

Composition Methods

Pipe operator (|)

def __or__(self, other: Runnable[Output, Other]) -> RunnableSequence[Input, Other]
Compose this runnable with another to create a sequence.
from langchain_core.runnables import RunnableLambda

sequence = RunnableLambda(lambda x: x + 1) | RunnableLambda(lambda x: x * 2)
sequence.invoke(5)  # 12

Transformation Methods

with_retry

def with_retry(
    self,
    *,
    retry_if_exception_type: tuple[type[BaseException], ...] = (Exception,),
    wait_exponential_jitter: bool = True,
    stop_after_attempt: int = 3
) -> RunnableRetry[Input, Output]
Create a new runnable that retries on failure.
retry_if_exception_type
tuple[type[BaseException], ...]
Exception types to retry on
wait_exponential_jitter
bool
default:"True"
Use exponential backoff with jitter
stop_after_attempt
int
default:"3"
Maximum number of retry attempts

with_fallbacks

def with_fallbacks(
    self,
    fallbacks: Sequence[Runnable[Input, Output]],
    *,
    exceptions_to_handle: tuple[type[BaseException], ...] = (Exception,)
) -> RunnableWithFallbacks[Input, Output]
Create a runnable that falls back to alternative runnables on failure.
fallbacks
Sequence[Runnable[Input, Output]]
required
Sequence of fallback runnables to try in order
exceptions_to_handle
tuple[type[BaseException], ...]
Exception types that trigger fallback

with_config

def with_config(
    self,
    config: RunnableConfig | None = None,
    **kwargs: Any
) -> Runnable[Input, Output]
Bind a config to the runnable.

configurable_fields

def configurable_fields(
    self,
    **kwargs: ConfigurableField | ConfigurableFieldSingleOption | ConfigurableFieldMultiOption
) -> RunnableConfigurableFields
Make init args of the runnable configurable at runtime.

configurable_alternatives

def configurable_alternatives(
    self,
    which: ConfigurableField,
    *,
    default_key: str = "default",
    **kwargs: Runnable[Input, Output] | Callable[[], Runnable[Input, Output]]
) -> RunnableConfigurableAlternatives[Input, Output]
Specify alternative runnables which can be swapped at runtime.

Schema Methods

get_input_schema

def get_input_schema(
    self,
    config: RunnableConfig | None = None
) -> type[BaseModel]
Get a Pydantic model for validating input.

get_output_schema

def get_output_schema(
    self,
    config: RunnableConfig | None = None
) -> type[BaseModel]
Get a Pydantic model for validating output.

get_input_jsonschema

def get_input_jsonschema(
    self,
    config: RunnableConfig | None = None
) -> dict[str, Any]
Get JSON schema representing the input.

get_output_jsonschema

def get_output_jsonschema(
    self,
    config: RunnableConfig | None = None
) -> dict[str, Any]
Get JSON schema representing the output.

config_schema

def config_schema(
    self,
    *,
    include: Sequence[str] | None = None
) -> type[BaseModel]
Get Pydantic model representing the config.

Graph Methods

get_graph

def get_graph(
    self,
    config: RunnableConfig | None = None
) -> Graph
Return a graph representation of this runnable.

get_name

def get_name(
    self,
    suffix: str | None = None,
    *,
    name: str | None = None
) -> str
Get the name of the runnable.
suffix
str | None
Optional suffix to append to the name
name
str | None
Optional name to use instead of the runnable’s name

RunnableSequence

A sequence of runnables where the output of one is the input of the next. Source: langchain_core.runnables.base Created using the | operator or by passing a list to RunnableSequence.
from langchain_core.runnables import RunnableLambda

sequence = RunnableLambda(lambda x: x + 1) | RunnableLambda(lambda x: x * 2)
# or
from langchain_core.runnables import RunnableSequence
sequence = RunnableSequence(first=..., middle=[...], last=...)

RunnableParallel

Invoke runnables concurrently with the same input. Source: langchain_core.runnables.base
from langchain_core.runnables import RunnableLambda, RunnableParallel

parallel = RunnableParallel(
    doubled=RunnableLambda(lambda x: x * 2),
    tripled=RunnableLambda(lambda x: x * 3)
)
parallel.invoke(5)  # {"doubled": 10, "tripled": 15}

RunnableLambda

Wrap a callable as a Runnable. Source: langchain_core.runnables.base
from langchain_core.runnables import RunnableLambda

def add_one(x: int) -> int:
    return x + 1

runnable = RunnableLambda(add_one)
runnable.invoke(5)  # 6

Constructor

def __init__(
    self,
    func: Callable[[Input], Output] | Callable[[Input, RunnableConfig], Output],
    afunc: Callable[[Input], Awaitable[Output]] | None = None
)
func
Callable
required
Synchronous function to wrap. Can optionally accept RunnableConfig as second argument.
afunc
Callable | None
Optional async function. If not provided, runs func in thread pool.

RunnablePassthrough

Pass inputs unchanged or with additional keys. Source: langchain_core.runnables.passthrough
from langchain_core.runnables import RunnablePassthrough

# Pass through unchanged
RunnablePassthrough().invoke({"x": 1})  # {"x": 1}

# Assign additional keys
RunnablePassthrough.assign(y=lambda x: x["x"] * 2).invoke({"x": 1})
# {"x": 1, "y": 2}

RunnableConfig

Configuration for runnable execution. Source: langchain_core.runnables.config
class RunnableConfig(TypedDict, total=False):
    tags: list[str]
    metadata: dict[str, Any]
    callbacks: Callbacks
    run_name: str
    max_concurrency: int | None
    recursion_limit: int
    configurable: dict[str, Any]
tags
list[str]
Tags for tracing and filtering
metadata
dict[str, Any]
Metadata for tracing
callbacks
Callbacks
Callbacks to invoke during execution
run_name
str
Name for the run in tracing
max_concurrency
int | None
Maximum number of parallel calls
recursion_limit
int
Maximum recursion depth
configurable
dict[str, Any]
Runtime configuration for configurable fields

Build docs developers (and LLMs) love