Pipelines are the foundation of ZenML. A pipeline is a sequence of steps that define your machine learning workflow, from data ingestion to model deployment. ZenML pipelines create a Directed Acyclic Graph (DAG) of steps that can be executed locally or on remote infrastructure.

What is a Pipeline?

A ZenML pipeline is a Python function decorated with @pipeline that connects multiple steps together. Each pipeline execution is tracked, versioned, and can be reproduced exactly as it was run.

Creating a Pipeline

Here’s a simple pipeline example based on the ZenML source (the step definition is included so the example is runnable):
from typing import Annotated

from zenml import pipeline, step

@step
def simple_step(name: str) -> Annotated[str, "greeting"]:
    """Build the greeting message."""
    return f"Hello, {name}!"

@pipeline
def simple_pipeline(name: str = "World") -> Annotated[str, "greeting"]:
    """A simple pipeline that demonstrates ZenML basics.

    Args:
        name: Name to include in greeting

    Returns:
        A greeting message as an artifact
    """
    greeting = simple_step(name=name)
    return greeting

Pipeline Parameters

The @pipeline decorator accepts numerous configuration options:
@pipeline(
    name="my_pipeline",                    # Override default name
    enable_cache=True,                     # Enable step caching
    enable_artifact_metadata=True,         # Track artifact metadata
    enable_step_logs=True,                 # Capture step logs
    settings={"docker": docker_settings},  # Configure stack settings
    tags=["production", "v1.0"],          # Add tags for filtering
    model=Model(name="my_model"),          # Link to model registry
    on_failure=alert_on_failure,          # Failure callback
    on_success=notify_on_success,         # Success callback
)
def training_pipeline():
    data = load_data()
    model = train_model(data)
    evaluate_model(model)
Pipeline parameters (name, enable_cache, etc.) are set on the @pipeline decorator or via with_options(), not as arguments when calling the pipeline. Step-specific configuration is set on the individual steps.
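As a hedged sketch of that split (the step and pipeline names here are illustrative, not from the ZenML source), pipeline-wide options go on @pipeline, while step-specific options go on @step or the step’s own with_options(). This is a configuration fragment; running it requires a working ZenML installation:

```python
from zenml import pipeline, step

@step(enable_cache=False)  # step-specific configuration on the step decorator
def load_data() -> list:
    return [1, 2, 3]

@pipeline(name="configured_pipeline", enable_cache=True)  # pipeline parameters
def configured_pipeline():
    # Per-invocation step override via the step's with_options()
    data = load_data.with_options(enable_cache=True)()
```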

Pipeline Execution

Pipelines can be run in different ways:

Local Execution

The simplest way to run a pipeline:
if __name__ == "__main__":
    simple_pipeline(name="Alice")

Scheduled Execution

Schedule pipelines to run automatically:
from zenml.config import Schedule

schedule = Schedule(cron_expression="0 2 * * *")  # Daily at 2 AM

simple_pipeline.with_options(schedule=schedule)()

With Configuration Override

Override pipeline configuration at runtime:
run_config = dict(
    enable_cache=False,
    run_name="experimental_run",
)

simple_pipeline.with_options(**run_config)(name="Bob")

DAG Structure

ZenML automatically creates a Directed Acyclic Graph (DAG) from your pipeline by analyzing step dependencies. Dependencies are determined by data flow between steps.
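The ordering ZenML derives is essentially a topological sort over the data-flow edges. A minimal, ZenML-free sketch of the idea using the standard library (step names are hypothetical):

```python
from graphlib import TopologicalSorter

# Each step maps to the set of steps whose outputs it consumes.
dag = {
    "load_data": set(),
    "preprocess": {"load_data"},
    "train": {"preprocess"},
    "evaluate": {"train"},
}

# static_order() yields each step only after all of its dependencies
order = list(TopologicalSorter(dag).static_order())
```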

Implicit Dependencies

When one step uses another step’s output, ZenML creates a dependency:
@pipeline
def ml_pipeline():
    # ZenML automatically determines execution order
    raw_data = load_data()           # Runs first
    clean_data = preprocess(raw_data)  # Runs second (depends on load_data)
    model = train(clean_data)        # Runs third (depends on preprocess)
    metrics = evaluate(model)        # Runs fourth (depends on train)

Parallel Execution

Steps without dependencies can run in parallel:
@pipeline
def parallel_pipeline():
    data = load_data()
    
    # These three steps run in parallel (no dependencies between them)
    train_result = train_model_a(data)
    test_result = train_model_b(data)
    baseline = train_model_c(data)
    
    # This step waits for all three to complete
    best = compare_models(train_result, test_result, baseline)

Dynamic Pipelines

Dynamic pipelines allow you to create steps and control flow programmatically at runtime:
from zenml import pipeline

@pipeline(dynamic=True)
def dynamic_training_pipeline(model_types: list[str]):
    """Train multiple models dynamically based on input list."""
    data = load_data()
    
    models = []
    for model_type in model_types:
        # Create steps dynamically
        model = train_model.with_options(name=f"train_{model_type}")(
            data=data,
            model_type=model_type
        )
        models.append(model)
    
    # Select best model
    best_model = select_best(models)
    return best_model
Dynamic pipelines provide more flexibility but have some limitations compared to static pipelines. They’re compiled at runtime, so certain validations happen later in the execution.

Pipeline Hooks

Hooks allow you to execute code at specific points in the pipeline lifecycle:

Initialization Hook

from datetime import datetime

def setup_environment():
    """Run before the pipeline starts."""
    print("Setting up environment...")
    return {"start_time": datetime.now()}

@pipeline(on_init=setup_environment)
def my_pipeline():
    process_data()

Success and Failure Hooks

def on_success():
    """Called when pipeline completes successfully."""
    send_notification("Pipeline succeeded!")

def on_failure(exception: BaseException):
    """Called when pipeline fails."""
    log_error(exception)
    alert_team(f"Pipeline failed: {exception}")

@pipeline(
    on_success=on_success,
    on_failure=on_failure
)
def robust_pipeline():
    risky_step()

Cleanup Hook

def cleanup():
    """Always runs at the end, even if pipeline fails."""
    close_connections()
    release_resources()

@pipeline(on_cleanup=cleanup)
def pipeline_with_cleanup():
    allocate_resources()
    do_work()

Pipeline Configuration

Pipelines can be configured through multiple mechanisms:

Environment Variables

Set environment variables for the entire pipeline:
@pipeline(
    environment={
        "CUDA_VISIBLE_DEVICES": "0,1",
        "OMP_NUM_THREADS": "4"
    }
)
def gpu_pipeline():
    train_on_gpu()

Secrets Integration

@pipeline(
    secrets=["aws_credentials", "api_keys"]
)
def secure_pipeline():
    # Secrets are available as environment variables
    upload_to_s3()

Stack Settings

Configure how the pipeline runs on specific stack components:
from zenml.config import DockerSettings

docker_settings = DockerSettings(
    requirements=["tensorflow==2.13.0", "pandas"],
    parent_image="tensorflow/tensorflow:latest-gpu"
)

@pipeline(settings={"docker": docker_settings})
def containerized_pipeline():
    train_in_container()

Caching

ZenML automatically caches step outputs to avoid recomputing unchanged steps:
@pipeline(enable_cache=True)  # Default is True
def cached_pipeline():
    data = expensive_data_loading()  # Cached after first run
    model = train_model(data)        # Only reruns if data changes

Cache Policy

Control caching behavior with cache policies:
from zenml.config import CachePolicy

@pipeline(
    cache_policy=CachePolicy(
        max_age_days=7,  # Invalidate cache after 7 days
        ignore_fields=["timestamp"]  # Ignore these fields for cache key
    )
)
def time_sensitive_pipeline():
    process_data(timestamp=datetime.now())
Caching compares input parameters, code, and configuration. If anything changes, the step reruns. This ensures reproducibility while saving time.
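The principle behind the cache key can be sketched without ZenML (an illustration of the idea, not ZenML’s actual implementation): hash the step’s code (here its compiled bytecode as a stand-in for the source) together with its input parameters, and reuse the stored output whenever the key matches.

```python
import hashlib

def cache_key(func, params: dict) -> str:
    """Hash a function's compiled bytecode together with its parameters."""
    payload = func.__code__.co_code + repr(sorted(params.items())).encode()
    return hashlib.sha256(payload).hexdigest()

def preprocess(data, normalize=True):
    peak = max(data)
    return [x / peak for x in data] if normalize else data

# Same code and parameters -> same key (cache hit);
# changed parameters -> different key (step reruns).
hit = cache_key(preprocess, {"normalize": True})
miss = cache_key(preprocess, {"normalize": False})
```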

Model Context

Link pipelines to models in the Model Control Plane:
from zenml import Model

@pipeline(
    model=Model(
        name="customer_churn",
        version="production",
        tags=["classification", "production"]
    )
)
def model_training_pipeline():
    """Pipeline runs are linked to the model version."""
    data = load_data()
    model = train(data)
    return model

Best Practices

Keep Pipelines Simple

Design pipelines as orchestration logic. Keep complex computations inside steps.

Use Type Hints

Always type-hint step inputs and outputs. Type hints let ZenML validate data passed between steps, pick the right materializers, and give you better IDE support.
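For instance, ZenML reads Annotated return hints to name and type-check step outputs. A decorator-free sketch of the hints themselves (the function name is illustrative):

```python
from typing import Annotated, get_type_hints

def train_model(data: list[float]) -> Annotated[float, "accuracy"]:
    """The return hint carries both the output type and its name."""
    return sum(data) / len(data)

# include_extras=True preserves the Annotated metadata
hints = get_type_hints(train_model, include_extras=True)
output_name = hints["return"].__metadata__[0]  # "accuracy"
```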

Enable Caching

Use caching to speed up development. Disable for final production runs if needed.

Add Documentation

Document your pipeline’s purpose, parameters, and expected outputs using docstrings.

Next Steps

  • Steps - Learn about pipeline building blocks
  • Artifacts - Understand how data flows between steps
  • Stacks - Configure where and how pipelines run
  • Models - Link pipelines to model versions

Code Reference

  • Pipeline decorator: src/zenml/pipelines/pipeline_decorator.py:80
  • Pipeline class: src/zenml/pipelines/pipeline_definition.py:86
  • Dynamic pipelines: src/zenml/pipelines/dynamic/pipeline_definition.py
