Skip to main content

Overview

This quickstart guides you through creating, running, and understanding your first ZenML pipeline. You’ll learn:
  • How to define steps and pipelines
  • How ZenML tracks artifacts and metadata
  • How to run pipelines locally and view results
  • How to parameterize pipelines for different scenarios
By the end, you’ll have a working pipeline and understand ZenML’s core building blocks.

Prerequisites

Before starting, make sure you have:
1

Python 3.10+

ZenML requires Python 3.10 or higher. Check your version:
python --version
# Should output: Python 3.10.x or higher
2

ZenML Installed

Install ZenML with server capabilities:
pip install "zenml[server]"
3

ZenML Initialized

Initialize ZenML in your working directory:
zenml init
This creates a .zen directory to store configuration.
4

Server Running (Optional)

For the web dashboard, start a local server:
zenml login --local
Access the dashboard at http://localhost:8237
You can run pipelines without starting the server, but you won’t have access to the web dashboard for visualization.

Your First Pipeline

Step 1: Create a Step

Steps are the building blocks of ZenML pipelines. Create a file called quickstart.py:
quickstart.py
from typing import Optional
from zenml import step

@step
def simple_step(name: Optional[str] = None) -> str:
    """A simple step that returns a personalized greeting.
    
    This is the simplest possible ZenML step. It:
    1. Takes an optional input parameter
    2. Returns a string
    3. Is automatically tracked as an artifact by ZenML
    
    Args:
        name: Optional name to personalize the greeting
        
    Returns:
        A personalized greeting message
    """
    if name:
        message = f"Hello {name}! Welcome to ZenML 🚀"
    else:
        message = "Hello from ZenML! 🚀"
    print(message)
    return message
The @step decorator tells ZenML to track this function’s execution, inputs, outputs, and metadata automatically.

Step 2: Create a Pipeline

Pipelines compose multiple steps into a workflow. Add this to quickstart.py:
quickstart.py
from typing import Annotated, Optional
from zenml import pipeline, step

@step
def simple_step(name: Optional[str] = None) -> str:
    """A simple step that returns a personalized greeting."""
    if name:
        message = f"Hello {name}! Welcome to ZenML 🚀"
    else:
        message = "Hello from ZenML! 🚀"
    print(message)
    return message

@pipeline(enable_cache=False)
def simple_pipeline(name: Optional[str] = None) -> Annotated[str, "greeting"]:
    """A simple pipeline that demonstrates ZenML basics.
    
    This pipeline:
    1. Takes an optional name parameter
    2. Calls a single step that returns a personalized greeting
    3. Returns the result as a tracked artifact
    
    Args:
        name: Optional name to personalize the greeting
        
    Returns:
        A greeting message as an artifact
    """
    greeting = simple_step(name=name)
    return greeting

if __name__ == "__main__":
    # Run the pipeline
    result = simple_pipeline()
    print(f"Pipeline returned: {result}")
The Annotated[str, "greeting"] syntax gives the output artifact a meaningful name in the ZenML dashboard.

Step 3: Run the Pipeline

python quickstart.py
You’ll see output similar to:
🚀 Running ZenML quickstart pipeline...
Hello from ZenML! 🚀
Pipeline simple_pipeline completed successfully!
┏━━━━━━━━━━━━━━━━━━━━┯━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┓
┃ Pipeline Run       │ simple_pipeline-2024_03_09-10_30  ┃
┠────────────────────┼───────────────────────────────────┨
┃ Status             │ completed                         ┃
┃ Steps              │ 1                                 ┃
┃ Artifacts          │ 1                                 ┃
┃ Duration           │ 2.3s                              ┃
┗━━━━━━━━━━━━━━━━━━━━┷━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┛
ZenML automatically versioned your code, tracked the execution time, and stored the output artifact. All of this is queryable through the Python API or dashboard.

Step 4: Run with Parameters

Pipelines can accept parameters to customize behavior:
# Run with a custom name
result = simple_pipeline(name="Alice")
print(f"Pipeline returned: {result}")
Output:
Hello Alice! Welcome to ZenML 🚀
Pipeline returned: Hello Alice! Welcome to ZenML 🚀

Understanding What Happened

When you ran the pipeline, ZenML:
1

Tracked Execution

Created a pipeline run record with a unique ID, timestamp, and status.
2

Stored Artifacts

Saved the output string as a versioned artifact in the artifact store (local filesystem by default).
3

Captured Metadata

Logged parameters, execution time, system information, and step-level details.
4

Created Lineage

Built a graph showing the relationship between inputs, steps, and outputs.

Viewing Results

Command Line

Inspect pipeline runs using the CLI:
# List recent pipeline runs
zenml pipeline runs list

# Show details of the latest run
zenml pipeline runs describe

# List artifacts from the latest run
zenml pipeline runs artifact list

Dashboard

If you started the server with zenml login --local, open http://localhost:8237 to:
  • View all pipeline runs in a timeline
  • Explore the DAG (directed acyclic graph) of your pipeline
  • Inspect artifact contents and metadata
  • Compare runs across different parameters
  • Debug failed steps with logs and stack traces

Building a More Complete Pipeline

Let’s expand the example to demonstrate multiple steps and artifact passing:
complete_pipeline.py
from typing import Annotated, Dict, List
from zenml import pipeline, step
import random

@step
def generate_numbers(count: int = 10) -> Annotated[List[int], "random_numbers"]:
    """Generate a list of random numbers.
    
    Args:
        count: Number of random integers to generate
        
    Returns:
        List of random integers
    """
    numbers = [random.randint(1, 100) for _ in range(count)]
    print(f"Generated {count} numbers: {numbers}")
    return numbers

@step
def calculate_statistics(
    numbers: List[int]
) -> Annotated[Dict[str, float], "statistics"]:
    """Calculate statistics from a list of numbers.
    
    Args:
        numbers: List of integers to analyze
        
    Returns:
        Dictionary containing mean, median, min, and max
    """
    sorted_numbers = sorted(numbers)
    n = len(numbers)
    
    stats = {
        "mean": sum(numbers) / n,
        "median": sorted_numbers[n // 2],
        "min": min(numbers),
        "max": max(numbers),
        "count": n
    }
    
    print(f"Statistics: {stats}")
    return stats

@step
def format_report(
    statistics: Dict[str, float]
) -> Annotated[str, "report"]:
    """Create a formatted report from statistics.
    
    Args:
        statistics: Dictionary of calculated statistics
        
    Returns:
        Formatted report string
    """
    report = f"""
    📊 Statistics Report
    {'='*40}
    Count:  {statistics['count']}
    Mean:   {statistics['mean']:.2f}
    Median: {statistics['median']:.2f}
    Min:    {statistics['min']:.2f}
    Max:    {statistics['max']:.2f}
    """
    print(report)
    return report

@pipeline(enable_cache=True)
def statistics_pipeline(
    count: int = 10
) -> Annotated[str, "final_report"]:
    """Pipeline that generates numbers and computes statistics.
    
    This demonstrates:
    - Multiple steps with dependencies
    - Artifact passing between steps
    - Named artifacts with type annotations
    - Pipeline-level caching
    
    Args:
        count: Number of random integers to generate
        
    Returns:
        Formatted statistics report
    """
    numbers = generate_numbers(count=count)
    stats = calculate_statistics(numbers=numbers)
    report = format_report(statistics=stats)
    return report

if __name__ == "__main__":
    # Run with default parameters
    print("🚀 Running statistics pipeline...\n")
    result = statistics_pipeline()
    
    # Run again with different parameters
    print("\n🔄 Running again with 20 numbers...\n")
    result = statistics_pipeline(count=20)
Run it:
python complete_pipeline.py

Key Concepts Demonstrated

Artifact Passing

The numbers artifact from generate_numbers is automatically passed to calculate_statistics. ZenML handles serialization and deserialization.

Type Annotations

Annotated[List[int], "random_numbers"] specifies the output type AND gives it a name in the artifact store.

Caching

With enable_cache=True, ZenML reuses results when inputs haven’t changed. Run the same pipeline twice - the second run is instant!

Dependencies

ZenML automatically determines execution order based on which steps consume which artifacts.

Caching in Action

Run the complete pipeline twice with the same parameters:
# First run - executes all steps
result1 = statistics_pipeline(count=10)

# Second run - uses cached results (much faster!)
result2 = statistics_pipeline(count=10)

# Different parameters - cache miss, runs again
result3 = statistics_pipeline(count=20)
ZenML compares:
  • Input values (count=10 vs count=20)
  • Step code (did you change the function?)
  • Dependencies (did upstream artifacts change?)
If everything matches, cached artifacts are reused.
Caching is powerful for expensive operations (training models, processing large datasets), but disable it during development with enable_cache=False to see changes immediately.

Loading Artifacts

Access artifacts from previous runs programmatically:
from zenml.client import Client

# Get the ZenML client
client = Client()

# Get the latest run of a pipeline
run = client.get_pipeline("statistics_pipeline").last_successful_run

# Load specific artifacts
numbers = run.steps["generate_numbers"].output.load()
stats = run.steps["calculate_statistics"].output.load()
report = run.steps["format_report"].output.load()

print(f"Numbers from previous run: {numbers}")
print(f"Stats from previous run: {stats}")
This is useful for:
  • Comparing model performance across runs
  • Debugging by inspecting intermediate artifacts
  • Building evaluation pipelines that analyze previous outputs

Adding Context to Steps

Steps can access runtime information:
from zenml import step, get_step_context

@step
def context_aware_step() -> str:
    """Demonstrate accessing step context."""
    context = get_step_context()
    
    print(f"Pipeline name: {context.pipeline_name}")
    print(f"Step name: {context.step_name}")
    print(f"Run name: {context.pipeline_run.name}")
    
    return f"Executed in {context.pipeline_name}"
Context provides:
  • Pipeline and step names
  • Run ID and metadata
  • Stack configuration
  • Input/output artifact info

Configuration and Settings

Pipeline Configuration

from zenml import pipeline
from zenml.config import DeploymentSettings, CORSConfig

# Configure pipeline for deployment
deployment_settings = DeploymentSettings(
    app_title="Statistics Pipeline",
    cors=CORSConfig(allow_origins=["*"]),
)

@pipeline(
    enable_cache=True,
    settings={
        "deployment": deployment_settings,
    },
)
def my_pipeline():
    # Pipeline implementation
    pass

Step Configuration

from zenml import step
from zenml.config import ResourceSettings

@step(
    settings={
        "resources": ResourceSettings(
            cpu_count=4,
            memory="8GB",
        )
    }
)
def resource_intensive_step() -> str:
    # This step requests specific resources
    return "Computed with 4 CPUs and 8GB RAM"

Next Steps

You now understand ZenML’s core concepts! Here’s what to explore next:

Starter Guide

Deep dive into pipelines, steps, stacks, and artifacts

Example Projects

Explore production-ready examples: ML training, LLM apps, agents

Stack Components

Learn about orchestrators, artifact stores, and integrations

Deploy to Production

Run pipelines on Kubernetes, AWS, GCP, or Azure

Common Patterns

Data Loading Step

import pandas as pd
from typing import Annotated
from zenml import step

@step
def load_data(
    data_path: str
) -> Annotated[pd.DataFrame, "raw_data"]:
    """Load data from a file."""
    df = pd.read_csv(data_path)
    print(f"Loaded {len(df)} rows")
    return df

Model Training Step

from sklearn.ensemble import RandomForestClassifier
from typing import Annotated, Tuple
import numpy as np

@step
def train_model(
    X_train: np.ndarray,
    y_train: np.ndarray
) -> Annotated[RandomForestClassifier, "model"]:
    """Train a random forest classifier."""
    model = RandomForestClassifier(n_estimators=100, random_state=42)
    model.fit(X_train, y_train)
    return model

Evaluation Step

from sklearn.metrics import accuracy_score, f1_score
from typing import Dict
import numpy as np

@step
def evaluate_model(
    model: RandomForestClassifier,
    X_test: np.ndarray,
    y_test: np.ndarray
) -> Annotated[Dict[str, float], "metrics"]:
    """Evaluate model performance."""
    predictions = model.predict(X_test)
    
    metrics = {
        "accuracy": accuracy_score(y_test, predictions),
        "f1_score": f1_score(y_test, predictions, average="weighted")
    }
    
    print(f"Model metrics: {metrics}")
    return metrics

Troubleshooting

Make sure you started the ZenML server:
zenml login --local
Or check status:
zenml status
Caching requires:
  1. enable_cache=True in the pipeline decorator
  2. Identical inputs (parameters and upstream artifacts)
  3. Unchanged step code
Disable temporarily to force re-execution:
@pipeline(enable_cache=False)
def my_pipeline():
    pass
ZenML uses cloudpickle for serialization. Most Python objects work, but if you get import errors:
  1. Ensure all imports are at the top of your file
  2. Use built-in materializers for common types (pandas, numpy, sklearn)
  3. Create custom materializers for complex objects (see docs)
If changing step parameters doesn’t affect the output, caching might be enabled. Either:
# Disable caching in the pipeline
@pipeline(enable_cache=False)
def my_pipeline():
    pass

# Or pass different parameters to force cache miss
my_pipeline(count=11)  # Different from previous run

Additional Resources

  • ZenML Examples: Real-world projects including:
    • Classical ML pipelines (scikit-learn, XGBoost)
    • Deep learning workflows (PyTorch, TensorFlow)
    • LLM applications and RAG pipelines
    • AI agent orchestration
  • SDK Documentation: Complete API reference
  • Slack Community: Get help from 4,000+ ML engineers
  • YouTube Tutorials: Video walkthroughs
Congratulations! You’ve built your first ZenML pipelines and understand the fundamentals. Ready to build production ML systems? Check out the Starter Guide for advanced topics.

Build docs developers (and LLMs) love