Understanding ZenML pipelines and how they orchestrate ML workflows
Pipelines are the foundation of ZenML. A pipeline is a sequence of steps that define your machine learning workflow, from data ingestion to model deployment. ZenML pipelines create a Directed Acyclic Graph (DAG) of steps that can be executed locally or on remote infrastructure.
A ZenML pipeline is a Python function decorated with @pipeline that connects multiple steps together. Each pipeline execution is tracked, versioned, and can be reproduced exactly as it was run.
Here’s a simple pipeline example from the ZenML source:
```python
from typing import Annotated

from zenml import pipeline


@pipeline
def simple_pipeline(name: str = "World") -> Annotated[str, "greeting"]:
    """A simple pipeline that demonstrates ZenML basics.

    Args:
        name: Name to include in greeting

    Returns:
        A greeting message as an artifact
    """
    greeting = simple_step(name=name)
    return greeting
```
The @pipeline decorator accepts numerous configuration options:
```python
from zenml import Model, pipeline


@pipeline(
    name="my_pipeline",                    # Override default name
    enable_cache=True,                     # Enable step caching
    enable_artifact_metadata=True,         # Track artifact metadata
    enable_step_logs=True,                 # Capture step logs
    settings={"docker": docker_settings},  # Configure stack settings
    tags=["production", "v1.0"],           # Add tags for filtering
    model=Model(name="my_model"),          # Link to model registry
    on_failure=alert_on_failure,           # Failure callback
    on_success=notify_on_success,          # Success callback
)
def training_pipeline():
    data = load_data()
    model = train_model(data)
    evaluate_model(model)
```
Pipeline-level parameters (name, enable_cache, and so on) can only be set on the @pipeline decorator, not passed as arguments when calling the pipeline. Step-specific configuration is set on the individual steps instead.
ZenML automatically creates the DAG for your pipeline by analyzing step dependencies, which are determined by the data flow between steps.
```python
@pipeline
def parallel_pipeline():
    data = load_data()

    # These three steps run in parallel (no dependencies between them)
    train_result = train_model_a(data)
    test_result = train_model_b(data)
    baseline = train_model_c(data)

    # This step waits for all three to complete
    best = compare_models(train_result, test_result, baseline)
```
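Conceptually, the orchestrator just topologically sorts this data-flow graph and runs each batch of ready steps together. A minimal, library-free sketch of that idea (not ZenML's internals) using the same step names:

```python
from graphlib import TopologicalSorter

# Illustrative only: ZenML derives this graph from data flow automatically;
# here the same dependencies are written out explicitly.
dag = {
    "load_data": set(),
    "train_model_a": {"load_data"},
    "train_model_b": {"load_data"},
    "train_model_c": {"load_data"},
    "compare_models": {"train_model_a", "train_model_b", "train_model_c"},
}

ts = TopologicalSorter(dag)
ts.prepare()
while ts.is_active():
    ready = sorted(ts.get_ready())  # steps whose dependencies are all done
    print(ready)                    # each batch could execute in parallel
    ts.done(*ready)
```

The three training steps appear in the same batch because nothing connects them to each other, while compare_models only becomes ready once all three are done.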
Dynamic pipelines allow you to create steps and control flow programmatically at runtime:
```python
from zenml import pipeline


@pipeline(dynamic=True)
def dynamic_training_pipeline(model_types: list[str]):
    """Train multiple models dynamically based on input list."""
    data = load_data()

    models = []
    for model_type in model_types:
        # Create steps dynamically
        model = train_model.with_options(name=f"train_{model_type}")(
            data=data, model_type=model_type
        )
        models.append(model)

    # Select best model
    best_model = select_best(models)
    return best_model
```
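The dynamic pattern is ordinary Python control flow producing one step invocation per item, then fanning back in. As a library-free sketch of the same fan-out/fan-in shape (the train_model and select_best helpers here are hypothetical stand-ins, not ZenML steps):

```python
# Illustrative only: mimics the shape of the dynamic pipeline above
# with plain functions instead of ZenML steps.
def train_model(data, model_type):
    # Hypothetical "training": the score is just a stand-in number.
    return {"type": model_type, "score": len(model_type) * len(data)}

def select_best(models):
    return max(models, key=lambda m: m["score"])

data = [1, 2, 3]

# Fan out: one "step" per entry in the input list
models = [train_model(data, t) for t in ["linear", "tree", "mlp"]]

# Fan in: a single step consumes all the results
best = select_best(models)
print(best["type"])
```

In the real dynamic pipeline, each loop iteration registers a separately named step run, so the DAG grows with the input list rather than being fixed at definition time.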
Dynamic pipelines provide more flexibility than static pipelines, but with some limitations: because they are compiled at runtime, certain validations only happen later in the execution.
You can also register a cleanup hook that always runs at the end of a run, even if the pipeline fails:

```python
def cleanup():
    """Always runs at the end, even if the pipeline fails."""
    close_connections()
    release_resources()


@pipeline(on_cleanup=cleanup)
def pipeline_with_cleanup():
    allocate_resources()
    do_work()
```
ZenML automatically caches step outputs to avoid recomputing unchanged steps:
```python
@pipeline(enable_cache=True)  # Default is True
def cached_pipeline():
    data = expensive_data_loading()  # Cached after first run
    model = train_model(data)        # Only reruns if data changes
```
For finer control, a cache policy can set an expiry and exclude specific inputs from the cache key:

```python
from datetime import datetime

from zenml.config import CachePolicy


@pipeline(
    cache_policy=CachePolicy(
        max_age_days=7,               # Invalidate cache after 7 days
        ignore_fields=["timestamp"],  # Ignore these fields for the cache key
    )
)
def time_sensitive_pipeline():
    process_data(timestamp=datetime.now())
```
Caching compares input parameters, code, and configuration. If anything changes, the step reruns. This ensures reproducibility while saving time.
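One way to picture this comparison is as a hash over everything that could affect a step's output. The sketch below is illustrative only, not ZenML's actual cache-key scheme:

```python
import hashlib
import json

# Illustrative only: a cache key built from a step's source code,
# input parameters, and configuration. Any change produces a new key,
# which forces a rerun instead of a cache hit.
def cache_key(step_source: str, params: dict, config: dict) -> str:
    payload = json.dumps(
        {"source": step_source, "params": params, "config": config},
        sort_keys=True,
    )
    return hashlib.sha256(payload.encode()).hexdigest()

k1 = cache_key("def f(x): return x + 1", {"x": 3}, {"enable_cache": True})
k2 = cache_key("def f(x): return x + 1", {"x": 3}, {"enable_cache": True})
k3 = cache_key("def f(x): return x + 2", {"x": 3}, {"enable_cache": True})

print(k1 == k2)  # True: identical code, inputs, and config -> cache hit
print(k1 == k3)  # False: changed code invalidates the cached result
```

Because the key covers code as well as parameters, editing a step's body is enough to trigger a rerun even when its inputs are unchanged.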
Link pipelines to models in the Model Control Plane:
```python
from zenml import Model


@pipeline(
    model=Model(
        name="customer_churn",
        version="production",
        tags=["classification", "production"],
    )
)
def model_training_pipeline():
    """Pipeline runs are linked to the model version."""
    data = load_data()
    model = train(data)
    return model
```