Understanding the Runnable protocol and LangChain Expression Language for composable AI applications
Runnables are the foundational building blocks of LangChain, providing a universal protocol for creating composable, production-ready AI applications. The LangChain Expression Language (LCEL) enables declarative composition of these components.
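The pipe-style composition that LCEL enables can be sketched without LangChain itself. `Pipe` and `step` below are illustrative stand-ins, not LangChain APIs; the real `|` operator on Runnables works on the same principle:

```python
# Dependency-free sketch of how a pipe operator can chain steps,
# each step's output feeding the next step's input.
class Pipe:
    def __init__(self, *steps):
        self.steps = steps

    def invoke(self, value):
        for step in self.steps:
            value = step(value)
        return value

    def __or__(self, other):
        # `a | b` produces a new pipeline with `other` appended
        return Pipe(*self.steps, other)

def step(f):
    return Pipe(f)

chain = step(str.strip) | str.upper | (lambda s: s + "!")
print(chain.invoke("  hello  "))  # -> HELLO!
```

The payoff of this style is that the composed object exposes the same interface as its parts, which is exactly what lets LCEL chains support invoke, batch, and stream uniformly.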
Every component in LangChain implements the Runnable protocol, defined in /libs/core/langchain_core/runnables/base.py:124:
```python
from collections.abc import Iterator
from typing import Generic, TypeVar

from langchain_core.runnables import RunnableConfig

Input = TypeVar("Input")
Output = TypeVar("Output")

class Runnable(Generic[Input, Output]):
    """A unit of work that can be invoked, batched, streamed, and composed."""

    def invoke(self, input: Input, config: RunnableConfig | None = None) -> Output:
        """Transform a single input into an output."""

    def batch(self, inputs: list[Input], config: RunnableConfig | None = None) -> list[Output]:
        """Efficiently transform multiple inputs into outputs."""

    def stream(self, input: Input, config: RunnableConfig | None = None) -> Iterator[Output]:
        """Stream output from a single input as it's produced."""
```
All Runnables automatically support both synchronous and asynchronous execution. Methods prefixed with an "a" (e.g., ainvoke, abatch, astream) provide the async variants.
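A dependency-free sketch (not LangChain source) of how a default async variant can delegate to the sync method; the base Runnable does something similar by running invoke() in an executor. `Doubler` is an illustrative name:

```python
import asyncio

class Doubler:
    def invoke(self, x: int) -> int:
        return x * 2

    async def ainvoke(self, x: int) -> int:
        # Run the blocking sync method in a worker thread
        # so the event loop stays free.
        return await asyncio.to_thread(self.invoke, x)

print(asyncio.run(Doubler().ainvoke(21)))  # -> 42
```

This is why custom Runnables get ainvoke for free: unless overridden with a natively async implementation, the async variant simply wraps the sync one.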
```python
# Sync streaming
for chunk in model.stream("Tell me a story"):
    print(chunk.content, end="", flush=True)

# Async streaming
async for chunk in model.astream("Tell me a story"):
    print(chunk.content, end="", flush=True)
```
```python
questions = [
    "What is AI?",
    "What is ML?",
    "What is DL?",
]

# Batch processing
responses = model.batch(questions)
for response in responses:
    print(response.content)

# Async batch
responses = await model.abatch(questions)
```
By default, batch() runs invoke() in parallel using a thread pool executor.
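That default can be sketched in a few lines without LangChain; `Upcaser` is an illustrative name, and the real implementation differs in details:

```python
from concurrent.futures import ThreadPoolExecutor

class Upcaser:
    def invoke(self, x: str) -> str:
        return x.upper()

    def batch(self, inputs: list[str], max_workers: int = 4) -> list[str]:
        # One invoke() per input, fanned out across a thread pool;
        # pool.map preserves input order in the results.
        with ThreadPoolExecutor(max_workers=max_workers) as pool:
            return list(pool.map(self.invoke, inputs))

print(Upcaser().batch(["a", "b", "c"]))  # -> ['A', 'B', 'C']
```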
For event-level visibility across a whole chain, astream_events emits structured events from every step as they occur:

```python
async for event in chain.astream_events(input, version="v2"):
    kind = event["event"]
    if kind == "on_chat_model_stream":
        print(event["data"]["chunk"].content, end="")
```
Override batch() for provider-specific batch APIs:
```python
from langchain_core.runnables import Runnable, RunnableConfig

class BatchOptimizedModel(Runnable[str, str]):
    def invoke(self, input: str, config: RunnableConfig | None = None) -> str:
        return self._call_api([input])[0]

    def batch(self, inputs: list[str], config: RunnableConfig | None = None) -> list[str]:
        # Use the provider's batch API instead of parallel invoke()
        return self._call_api(inputs)

    def _call_api(self, inputs: list[str]) -> list[str]:
        # Single API call covering all inputs
        return [f"Response to: {inp}" for inp in inputs]
```
Use LCEL when:

- Building standard workflows (prompt → model → parser)
- You need automatic async, streaming, and batch support
- Composition clarity matters
- You want built-in tracing and debugging

Use custom code when:

- Complex business logic with many conditionals
- Performance-critical tight loops
- Integration with existing code
Type hints for better DX
Always specify input/output types on custom Runnables:
```python
class MyRunnable(Runnable[dict[str, str], str]):
    # IDE autocomplete knows input is dict[str, str]
    # and output is str
    pass
```
Config propagation
RunnableConfig automatically merges down the chain. Set global defaults at the root:
```python
chain = prompt | model | parser
result = chain.invoke(
    input,
    config={"tags": ["production"], "callbacks": [handler]},
)
# All components receive the same config
```
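The propagation mechanism itself can be sketched without LangChain: one config dict handed to every step in the sequence. `MiniChain` and `tag_step` are illustrative names, not LangChain APIs:

```python
# Dependency-free sketch of config propagation down a chain.
class MiniChain:
    def __init__(self, *steps):
        self.steps = steps

    def invoke(self, value, config=None):
        config = config or {}
        for step in self.steps:
            # Every step receives the same config object,
            # so tags, callbacks, etc. reach all components.
            value = step(value, config)
        return value

def tag_step(value, config):
    return f"{value} [tags={config.get('tags', [])}]"

chain = MiniChain(lambda v, c: v.upper(), tag_step)
print(chain.invoke("hi", config={"tags": ["production"]}))
# -> HI [tags=['production']]
```

In real LangChain chains the merging is richer (per-component overrides, inherited callbacks), but the root-level config reaching every step is the same idea.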