Skip to main content
Pipelines are the core abstraction in ZenML. They define a sequence of steps that process data and produce artifacts. A pipeline is a directed acyclic graph (DAG) where each node represents a step.

Basic Pipeline Structure

A ZenML pipeline is created using the @pipeline decorator:
from zenml import pipeline

@pipeline
def my_pipeline():
    """A simple pipeline that runs steps in sequence."""
    result = step_one()
    step_two(result)

Creating Your First Pipeline

1
Step 1: Import Required Components
2
Start by importing the pipeline decorator and any steps you’ll use:
3
from zenml import pipeline, step
from typing import Annotated
4
Step 2: Define Your Steps
5
Create the steps that will make up your pipeline:
6
@step
def load_data() -> Annotated[dict, "dataset"]:
    """Load data from a source."""
    return {"data": [1, 2, 3, 4, 5]}

@step
def process_data(dataset: dict) -> Annotated[list, "processed"]:
    """Process the loaded data."""
    return [x * 2 for x in dataset["data"]]

@step
def save_results(processed: list) -> None:
    """Save the processed results."""
    print(f"Results: {processed}")
7
Step 3: Connect Steps in a Pipeline
8
Connect your steps together in the pipeline function:
9
@pipeline
def data_processing_pipeline():
    """Pipeline that loads, processes, and saves data."""
    dataset = load_data()
    processed = process_data(dataset)
    save_results(processed)
10
Step 4: Run Your Pipeline
11
Execute the pipeline in your Python script:
12
if __name__ == "__main__":
    data_processing_pipeline()

Pipeline Parameters

You can make pipelines configurable by adding parameters:
from typing import Optional

@pipeline
def training_pipeline(
    learning_rate: float = 0.001,
    epochs: int = 10,
    model_name: Optional[str] = None
):
    """Configurable training pipeline.
    
    Args:
        learning_rate: Learning rate for model training
        epochs: Number of training epochs
        model_name: Optional name for the trained model
    """
    data = load_training_data()
    model = train_model(data, learning_rate=learning_rate, epochs=epochs)
    
    if model_name:
        save_model(model, name=model_name)
Run with custom parameters:
training_pipeline(
    learning_rate=0.01,
    epochs=20,
    model_name="my_model_v1"
)

Pipeline Return Values

Pipelines can return artifacts that are tracked by ZenML:
@pipeline
def simple_pipeline(name: Optional[str] = None) -> Annotated[str, "greeting"]:
    """Pipeline that returns a tracked artifact.
    
    Args:
        name: Optional name to personalize the greeting
        
    Returns:
        A greeting message as a tracked artifact
    """
    greeting = simple_step(name=name)
    return greeting

Advanced Pipeline Patterns

Conditional Execution

Use Python conditionals to control pipeline flow:
@pipeline
def conditional_pipeline(run_evaluation: bool = True):
    """Pipeline with conditional step execution."""
    data = load_data()
    model = train_model(data)
    
    if run_evaluation:
        metrics = evaluate_model(model, data)
        log_metrics(metrics)

Multiple Outputs

Steps can return multiple artifacts:
from typing import Tuple

@step
def split_data(
    dataset: dict
) -> Tuple[
    Annotated[dict, "train_data"],
    Annotated[dict, "test_data"]
]:
    """Split dataset into train and test sets."""
    # Split logic here
    return train_data, test_data

@pipeline
def ml_pipeline():
    """Pipeline with multiple step outputs."""
    dataset = load_data()
    train_data, test_data = split_data(dataset)
    model = train_model(train_data)
    evaluate_model(model, test_data)

Nested Pipelines

Compose complex workflows by calling pipelines within pipelines:
@pipeline
def feature_engineering():
    """Feature engineering pipeline."""
    raw_data = load_raw_data()
    features = extract_features(raw_data)
    return features

@pipeline
def training_pipeline(use_cached_features: bool = False):
    """Training pipeline that can reuse features."""
    if use_cached_features:
        # Load features from previous run
        features = load_cached_features()
    else:
        # Run feature engineering
        features = feature_engineering()
    
    model = train_model(features)
    return model

Pipeline Configuration

Configure pipeline behavior with settings:
from zenml import pipeline

@pipeline(
    enable_cache=False,  # Disable caching
    name="my_custom_pipeline",  # Custom pipeline name
)
def configured_pipeline():
    """Pipeline with custom configuration."""
    step_one()
    step_two()

Best Practices

Keep Pipelines Focused

Each pipeline should have a clear, single purpose (training, inference, data processing, etc.)

Use Type Hints

Always use type hints and Annotated for step inputs/outputs to enable proper artifact tracking

Make Pipelines Parameterizable

Use pipeline parameters instead of hardcoding values for flexibility

Document Your Pipelines

Add clear docstrings explaining what the pipeline does and what parameters it accepts

Running Pipelines

There are multiple ways to run a pipeline:
# In your Python script
if __name__ == "__main__":
    my_pipeline()

Next Steps

Deploying Pipelines

Learn how to deploy pipelines to production environments

Scheduling Pipelines

Set up automated pipeline runs on a schedule

Writing Steps

Learn how to write effective pipeline steps

Artifact Management

Understand how ZenML tracks and manages artifacts

Build docs developers (and LLMs) love