The @pipeline decorator transforms a Python function into a ZenML pipeline that orchestrates multiple steps.

Signature

@pipeline(
    name: Optional[str] = None,
    dynamic: Optional[bool] = None,
    enable_cache: Optional[bool] = None,
    enable_artifact_metadata: Optional[bool] = None,
    enable_step_logs: Optional[bool] = None,
    enable_heartbeat: Optional[bool] = None,
    enable_pipeline_logs: Optional[bool] = None,
    environment: Optional[Dict[str, Any]] = None,
    secrets: Optional[List[Union[UUID, str]]] = None,
    settings: Optional[Dict[str, SettingsOrDict]] = None,
    tags: Optional[List[Union[str, Tag]]] = None,
    extra: Optional[Dict[str, Any]] = None,
    on_failure: Optional[HookSpecification] = None,
    on_success: Optional[HookSpecification] = None,
    on_init: Optional[InitHookSpecification] = None,
    on_init_kwargs: Optional[Dict[str, Any]] = None,
    on_cleanup: Optional[HookSpecification] = None,
    model: Optional[Model] = None,
    retry: Optional[StepRetryConfig] = None,
    substitutions: Optional[Dict[str, str]] = None,
    execution_mode: Optional[ExecutionMode] = None,
    cache_policy: Optional[CachePolicyOrString] = None,
) -> Pipeline

Parameters

name (str): The name of the pipeline. If not provided, the function name is used.
dynamic (bool): Whether this is a dynamic pipeline that can create steps at runtime.
enable_cache (bool): Whether to use caching for pipeline steps. Defaults to True.
enable_artifact_metadata (bool): Whether to enable metadata extraction for artifacts. Defaults to True.
enable_step_logs (bool): Whether to enable logging for individual steps.
enable_heartbeat (bool): Whether to enable heartbeat monitoring for steps.
enable_pipeline_logs (bool): Whether to enable logging for the pipeline run.
environment (Dict[str, Any]): Environment variables to set when running this pipeline.
secrets (List[Union[UUID, str]]): Secrets, referenced by ID or name, whose values are set as environment variables.
settings (Dict[str, SettingsOrDict]): Stack component settings for this pipeline.
tags (List[Union[str, Tag]]): Tags to apply to pipeline runs.
extra (Dict[str, Any]): Extra configurations for this pipeline.
on_failure (HookSpecification): Callback function or source path to execute on pipeline failure.
on_success (HookSpecification): Callback function or source path to execute on pipeline success.
on_init (InitHookSpecification): Callback function to run on pipeline initialization.
on_init_kwargs (Dict[str, Any]): Keyword arguments to pass to the init hook.
on_cleanup (HookSpecification): Callback function to run on pipeline cleanup.
model (Model): Model configuration for the Model Control Plane.
retry (StepRetryConfig): Retry configuration for pipeline steps.
substitutions (Dict[str, str]): Extra substitutions for name placeholders like {date} and {time}.
execution_mode (ExecutionMode): The execution mode to use for the pipeline.
cache_policy (CachePolicyOrString): Cache policy for this pipeline.

Returns

Pipeline: A configured Pipeline instance that can be called to run the pipeline.

Examples

Basic Pipeline

from zenml import pipeline, step

@step
def load_data() -> dict:
    return {"data": [1, 2, 3]}

@step
def process_data(data: dict) -> dict:
    return {"processed": data["data"]}

@pipeline
def ml_pipeline():
    data = load_data()
    process_data(data)

# Run the pipeline
ml_pipeline()

Pipeline with Configuration

from zenml import pipeline, step, Model

# Assumes load_data and train_model are defined elsewhere as @step functions.
@pipeline(
    name="training_pipeline",
    enable_cache=False,
    tags=["training", "production"],
    model=Model(
        name="my_model",
        version="v1"
    )
)
def training_pipeline():
    data = load_data()
    train_model(data)

Dynamic Pipeline

from zenml import pipeline, step

@step
def dynamic_step(x: int) -> int:
    return x * 2

@pipeline(dynamic=True)
def dynamic_pipeline(n: int):
    # Create steps dynamically based on runtime input
    for i in range(n):
        dynamic_step(x=i)

# Run with a runtime-chosen number of steps
dynamic_pipeline(n=3)

Related

@step: Learn about creating pipeline steps
Model: Configure model versioning
get_pipeline_context: Access pipeline context
Schedule: Schedule pipeline runs
