Steps are the building blocks of ZenML pipelines. Create a file called quickstart.py:
quickstart.py
```python
from typing import Optional

from zenml import step


@step
def simple_step(name: Optional[str] = None) -> str:
    """A simple step that returns a personalized greeting.

    This is the simplest possible ZenML step. It:
    1. Takes an optional input parameter
    2. Returns a string
    3. Is automatically tracked as an artifact by ZenML

    Args:
        name: Optional name to personalize the greeting

    Returns:
        A personalized greeting message
    """
    if name:
        message = f"Hello {name}! Welcome to ZenML 🚀"
    else:
        message = "Hello from ZenML! 🚀"
    print(message)
    return message
```
The @step decorator tells ZenML to track this function’s execution, inputs, outputs, and metadata automatically.
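Under the hood this is ordinary Python decoration. As a rough conceptual sketch (not ZenML's actual implementation), a tracking decorator can capture a function's inputs, output, and timing like this:

```python
import functools
import time


def tracked(fn):
    """Conceptual illustration only -- ZenML's @step does far more
    (artifact storage, versioning, metadata), but the mechanism is
    the same: wrap the function and record what flows through it."""
    @functools.wraps(fn)
    def wrapper(*args, **kwargs):
        start = time.time()
        result = fn(*args, **kwargs)
        # Record the call so it can be inspected after the run
        wrapper.last_run = {
            "inputs": {"args": args, "kwargs": kwargs},
            "output": result,
            "duration_s": time.time() - start,
        }
        return result

    wrapper.last_run = None
    return wrapper


@tracked
def simple_step(name=None):
    return f"Hello {name}!" if name else "Hello!"


simple_step(name="Ada")
print(simple_step.last_run["output"])
```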
Pipelines compose multiple steps into a workflow. Add this to quickstart.py:
quickstart.py
```python
from typing import Annotated, Optional

from zenml import pipeline, step


@step
def simple_step(name: Optional[str] = None) -> str:
    """A simple step that returns a personalized greeting."""
    if name:
        message = f"Hello {name}! Welcome to ZenML 🚀"
    else:
        message = "Hello from ZenML! 🚀"
    print(message)
    return message


@pipeline(enable_cache=False)
def simple_pipeline(name: Optional[str] = None) -> Annotated[str, "greeting"]:
    """A simple pipeline that demonstrates ZenML basics.

    This pipeline:
    1. Takes an optional name parameter
    2. Calls a single step that returns a personalized greeting
    3. Returns the result as a tracked artifact

    Args:
        name: Optional name to personalize the greeting

    Returns:
        A greeting message as an artifact
    """
    greeting = simple_step(name=name)
    return greeting


if __name__ == "__main__":
    # Run the pipeline
    result = simple_pipeline()
    print(f"Pipeline returned: {result}")
```
The Annotated[str, "greeting"] syntax gives the output artifact a meaningful name in the ZenML dashboard.
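`Annotated` comes from the standard `typing` module; the string is metadata that rides along with the type, and ZenML reads it as the artifact name. You can see how this works with plain Python, no ZenML required:

```python
from typing import Annotated, get_args, get_type_hints


def greet() -> Annotated[str, "greeting"]:
    return "Hello from ZenML! 🚀"


# include_extras=True keeps the Annotated metadata instead of
# stripping the hint down to the bare type
hints = get_type_hints(greet, include_extras=True)
base_type, artifact_name = get_args(hints["return"])

print(base_type)       # the underlying type
print(artifact_name)   # the metadata ZenML uses as the artifact name
```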
ZenML automatically versioned your code, tracked the execution time, and stored the output artifact. All of this is queryable through the Python API or dashboard.
```shell
# List recent pipeline runs
zenml pipeline runs list

# Show details of the latest run
zenml pipeline runs describe

# List artifacts from the latest run
zenml pipeline runs artifact list
```
Let’s expand the example to demonstrate multiple steps and artifact passing:
complete_pipeline.py
```python
from typing import Annotated, Dict, List
import random

from zenml import pipeline, step


@step
def generate_numbers(count: int = 10) -> Annotated[List[int], "random_numbers"]:
    """Generate a list of random numbers.

    Args:
        count: Number of random integers to generate

    Returns:
        List of random integers
    """
    numbers = [random.randint(1, 100) for _ in range(count)]
    print(f"Generated {count} numbers: {numbers}")
    return numbers


@step
def calculate_statistics(
    numbers: List[int],
) -> Annotated[Dict[str, float], "statistics"]:
    """Calculate statistics from a list of numbers.

    Args:
        numbers: List of integers to analyze

    Returns:
        Dictionary containing mean, median, min, and max
    """
    sorted_numbers = sorted(numbers)
    n = len(numbers)
    stats = {
        "mean": sum(numbers) / n,
        # Average the two middle values so the median is also
        # correct for even-length lists
        "median": (sorted_numbers[(n - 1) // 2] + sorted_numbers[n // 2]) / 2,
        "min": min(numbers),
        "max": max(numbers),
        "count": n,
    }
    print(f"Statistics: {stats}")
    return stats


@step
def format_report(
    statistics: Dict[str, float],
) -> Annotated[str, "report"]:
    """Create a formatted report from statistics.

    Args:
        statistics: Dictionary of calculated statistics

    Returns:
        Formatted report string
    """
    report = f"""
    📊 Statistics Report
    {'=' * 40}
    Count:  {statistics['count']}
    Mean:   {statistics['mean']:.2f}
    Median: {statistics['median']:.2f}
    Min:    {statistics['min']:.2f}
    Max:    {statistics['max']:.2f}
    """
    print(report)
    return report


@pipeline(enable_cache=True)
def statistics_pipeline(
    count: int = 10,
) -> Annotated[str, "final_report"]:
    """Pipeline that generates numbers and computes statistics.

    This demonstrates:
    - Multiple steps with dependencies
    - Artifact passing between steps
    - Named artifacts with type annotations
    - Pipeline-level caching

    Args:
        count: Number of random integers to generate

    Returns:
        Formatted statistics report
    """
    numbers = generate_numbers(count=count)
    stats = calculate_statistics(numbers=numbers)
    report = format_report(statistics=stats)
    return report


if __name__ == "__main__":
    # Run with default parameters
    print("🚀 Running statistics pipeline...\n")
    result = statistics_pipeline()

    # Run again with different parameters
    print("\n🔄 Running again with 20 numbers...\n")
    result = statistics_pipeline(count=20)
```
Run the complete pipeline twice with the same parameters, then once with different ones:
```python
# First run - executes all steps
result1 = statistics_pipeline(count=10)

# Second run - uses cached results (much faster!)
result2 = statistics_pipeline(count=10)

# Different parameters - cache miss, runs again
result3 = statistics_pipeline(count=20)
```
ZenML compares:

- Input values (`count=10` vs `count=20`)
- Step code (did you change the function?)
- Dependencies (did upstream artifacts change?)

If everything matches, cached artifacts are reused.
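Conceptually, this works like hashing the step's code together with its inputs. The sketch below is purely illustrative, not ZenML's actual fingerprinting logic, but it captures the idea:

```python
import hashlib
import json


def cache_key(func, params):
    # Fingerprint = compiled step code + JSON-serialized inputs.
    # Illustrative only: ZenML's real cache logic also tracks
    # upstream artifacts and environment details.
    payload = func.__code__.co_code + json.dumps(params, sort_keys=True).encode()
    return hashlib.sha256(payload).hexdigest()


def generate_numbers(count=10):
    return list(range(count))


key_a = cache_key(generate_numbers, {"count": 10})
key_b = cache_key(generate_numbers, {"count": 10})
key_c = cache_key(generate_numbers, {"count": 20})

assert key_a == key_b  # same code + params -> cache hit
assert key_a != key_c  # different params -> cache miss
```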
Caching is powerful for expensive operations (training models, processing large datasets), but disable it during development with enable_cache=False to see changes immediately.
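Caching can also be controlled per step rather than for the whole pipeline; `@step` accepts the same `enable_cache` flag:

```python
from zenml import step


# A nondeterministic or always-fresh step can opt out of caching
# while the rest of the pipeline stays cached.
@step(enable_cache=False)
def fetch_fresh_data() -> list:
    """Always re-executes, even when pipeline caching is on."""
    ...
```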
Access artifacts from previous runs programmatically:
```python
from zenml.client import Client

# Get the ZenML client
client = Client()

# Get the latest run of a pipeline
run = client.get_pipeline("statistics_pipeline").last_successful_run

# Load specific artifacts
numbers = run.steps["generate_numbers"].output.load()
stats = run.steps["calculate_statistics"].output.load()
report = run.steps["format_report"].output.load()

print(f"Numbers from previous run: {numbers}")
print(f"Stats from previous run: {stats}")
```
This is useful for:

- Comparing model performance across runs
- Debugging by inspecting intermediate artifacts
- Building evaluation pipelines that analyze previous outputs
ZenML uses cloudpickle for serialization. Most Python objects work, but if you get import errors:
- Ensure all imports are at the top of your file
- Use built-in materializers for common types (pandas, numpy, sklearn)
- Create custom materializers for complex objects (see docs)
Step parameters not updating
If changing step parameters doesn’t affect the output, caching might be enabled. Either:
```python
# Option 1: disable caching in the pipeline
@pipeline(enable_cache=False)
def my_pipeline(count: int = 10):
    ...

# Option 2: pass different parameters to force a cache miss
my_pipeline(count=11)  # Different from the previous run
```
Congratulations! You’ve built your first ZenML pipelines and understand the fundamentals. Ready to build production ML systems? Check out the Starter Guide for advanced topics.