Steps are the fundamental building blocks of ZenML pipelines. Each step is a Python function decorated with @step that performs a specific task in your ML workflow. Steps can read inputs, perform computations, and produce outputs that are automatically tracked as artifacts.

What is a Step?

A step is a discrete unit of computation in a pipeline. Steps are:
  • Reproducible: Each execution is tracked with its inputs, outputs, and configuration
  • Cacheable: Outputs are reused if inputs and code haven’t changed
  • Composable: Steps can be combined in pipelines to create complex workflows
  • Portable: Steps can run on different infrastructure without code changes
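Caching, for example, is driven by a fingerprint of the step's code and inputs: if neither changes, the previous output can be reused. The sketch below is purely conceptual (ZenML's real cache key also covers parameters, artifact versions, and step configuration), but it shows the idea:

```python
import hashlib

def cache_key(func, *inputs) -> str:
    """Conceptual sketch: fingerprint a function's code plus its inputs."""
    h = hashlib.sha256()
    # Hash the compiled bytecode and constants as a stand-in for "the code"
    h.update(func.__code__.co_code)
    h.update(repr(func.__code__.co_consts).encode())
    for value in inputs:
        h.update(repr(value).encode())
    return h.hexdigest()

def add_one(x: int) -> int:
    return x + 1

# Same code + same inputs -> same key, so a cached output could be reused
assert cache_key(add_one, 1) == cache_key(add_one, 1)
assert cache_key(add_one, 1) != cache_key(add_one, 2)
```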

Creating a Step

Here’s a simple step from the ZenML quickstart:
from zenml import step

@step
def simple_step(name: str = "World") -> str:
    """A simple step that returns a greeting.
    
    Args:
        name: Name to include in greeting
        
    Returns:
        A personalized greeting message
    """
    message = f"Hello {name}! Welcome to ZenML 🚀"
    print(message)
    return message
All step inputs and outputs must be type-hinted. This enables ZenML to validate data flow and choose appropriate materializers for serialization.
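The mechanics can be illustrated in plain Python: a return annotation alone is enough to dispatch to a serialization strategy. This is a conceptual sketch, not ZenML's actual materializer registry:

```python
import json
import pickle
from typing import get_type_hints

# Illustrative registry: map a return type to a serializer function
SERIALIZERS = {
    dict: lambda obj: json.dumps(obj).encode(),
    str: lambda obj: obj.encode(),
}

def serialize_output(func, output) -> bytes:
    """Pick a serializer based on the function's return annotation."""
    return_type = get_type_hints(func).get("return")
    serializer = SERIALIZERS.get(return_type, pickle.dumps)  # fallback
    return serializer(output)

def greet(name: str) -> str:
    return f"Hello {name}!"

assert serialize_output(greet, greet("World")) == b"Hello World!"
```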

Step Parameters

The @step decorator accepts many configuration options:
from typing import Any

import pandas as pd

from zenml import step
from zenml.config import ResourceSettings, StepRetryConfig

# `CustomMaterializer`, `handle_failure`, `notify_success`, and `train` are
# placeholders for your own materializer, hook functions, and training code.
@step(
    name="custom_step_name",              # Override default name
    enable_cache=True,                    # Enable caching (default)
    enable_artifact_metadata=True,        # Track metadata
    enable_artifact_visualization=True,   # Generate visualizations
    experiment_tracker="mlflow",          # Use specific experiment tracker
    step_operator="vertex",               # Run on specific infrastructure
    output_materializers=CustomMaterializer,  # Custom serialization
    settings={
        "resources": ResourceSettings(
            cpu_count=4,
            gpu_count=1,
            memory="16GB"
        )
    },
    retry=StepRetryConfig(max_retries=3),  # Retry on failure
    on_failure=handle_failure,             # Failure callback
    on_success=notify_success,             # Success callback
)
def training_step(data: pd.DataFrame, learning_rate: float) -> Any:
    # Train model
    model = train(data, lr=learning_rate)
    return model

Input and Output Types

Basic Types

Steps can accept and return built-in Python types:
@step
def calculate_metrics(precision: float, recall: float) -> dict:
    """Calculate and return a metrics dictionary."""
    return {
        "precision": precision,
        "recall": recall,
        "f1": 2 * (precision * recall) / (precision + recall)
    }
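The f1 value here is just the harmonic mean of the two input rates, which is easy to sanity-check in plain Python:

```python
def harmonic_mean(a: float, b: float) -> float:
    """Harmonic mean of two rates, as used for the F1 score."""
    return 2 * (a * b) / (a + b)

# The harmonic mean is pulled toward the smaller of the two rates
assert abs(harmonic_mean(0.8, 0.9) - 1.44 / 1.7) < 1e-12
assert harmonic_mean(0.8, 0.9) < (0.8 + 0.9) / 2  # below the arithmetic mean
```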

Complex Types

Steps support complex data types like pandas DataFrames, numpy arrays, and ML models:
import pandas as pd
import numpy as np
from sklearn.ensemble import RandomForestClassifier

@step
def train_model(
    X_train: pd.DataFrame,
    y_train: np.ndarray,
    n_estimators: int = 100
) -> RandomForestClassifier:
    """Train a random forest classifier."""
    model = RandomForestClassifier(n_estimators=n_estimators)
    model.fit(X_train, y_train)
    return model

Multiple Outputs

Steps can return multiple outputs using tuples with type annotations:
from typing import Tuple, Annotated
import pandas as pd

@step
def split_data(
    data: pd.DataFrame,
    test_size: float = 0.2
) -> Tuple[
    Annotated[pd.DataFrame, "train"],
    Annotated[pd.DataFrame, "test"]
]:
    """Split data into train and test sets."""
    split_idx = int(len(data) * (1 - test_size))
    return data[:split_idx], data[split_idx:]
Use Annotated to give meaningful names to outputs. These names become the artifact names in the dashboard.

Step Context

Access runtime information within a step using the step context:
from zenml import get_step_context, step

@step
def context_aware_step() -> dict:
    """Demonstrate accessing step context."""
    context = get_step_context()

    return {
        "step_name": context.step_run.name,
        "pipeline_name": context.pipeline.name,
        "run_name": context.pipeline_run.name,
        "step_run_id": str(context.step_run.id),
    }

Accessing Model Context

from typing import Any

from zenml import get_step_context, step

@step
def load_production_model() -> Any:
    """Load the current production model."""
    context = get_step_context()
    
    # Access model version from context
    model_version = context.model
    
    # Load a specific artifact from the model
    model_artifact = model_version.load_artifact("trained_model")
    return model_artifact

Step Configuration

Runtime Configuration

Configure steps dynamically when building the pipeline:
@step
def configurable_step(param: str) -> str:
    return f"Processing: {param}"

@pipeline
def my_pipeline():
    # Configure step at runtime
    result = configurable_step.with_options(
        enable_cache=False,
        name="custom_configured_step"
    )(param="dynamic_value")

Resource Settings

Specify compute resources for a step:
from zenml.config import ResourceSettings

@step(
    settings={
        "resources": ResourceSettings(
            cpu_count=8,
            gpu_count=2,
            memory="32GB"
        )
    }
)
def gpu_training_step(data: pd.DataFrame) -> Any:
    """Train model on GPU with specified resources."""
    # This step will request 2 GPUs and 32GB RAM
    model = train_on_gpu(data)
    return model

Step Execution Modes

Steps can execute in different environments:

Local Execution

By default, steps run in the same environment as the pipeline:
@step
def local_step() -> str:
    """Runs locally in the pipeline environment."""
    return "Executed locally"

Step Operator

Offload specific steps to specialized infrastructure:
@step(step_operator="kubernetes")
def kubernetes_step(large_dataset: pd.DataFrame) -> Any:
    """This step runs on Kubernetes cluster."""
    # Heavy computation on distributed infrastructure
    result = process_at_scale(large_dataset)
    return result

Containerized Execution

Steps can run in isolated containers:
from zenml.config import DockerSettings

@step(
    settings={
        "docker": DockerSettings(
            requirements=["tensorflow==2.13.0"],
            parent_image="tensorflow/tensorflow:latest-gpu"
        )
    }
)
def containerized_step() -> Any:
    """Runs in a custom Docker container."""
    import tensorflow as tf
    # Use specific TensorFlow version
    return tf.__version__

Materializers

Materializers handle serialization and deserialization of step inputs and outputs:
from zenml import step

# `CustomMaterializer` (a subclass of BaseMaterializer) and `CustomObject`
# are placeholders for your own classes.
@step(output_materializers=CustomMaterializer)
def custom_serialization_step() -> CustomObject:
    """Use a custom materializer for the output."""
    return CustomObject()

Per-Output Materializers

Specify different materializers for multiple outputs:
from typing import Any, Tuple, Annotated

from zenml import step

# `PickleMaterializer` and `JSONMaterializer` stand in for your own
# materializer classes; `train_model` and `evaluate_model` for your own code.
@step(
    output_materializers={
        "model": PickleMaterializer,
        "metrics": JSONMaterializer
    }
)
def multi_output_step() -> Tuple[
    Annotated[Any, "model"],
    Annotated[dict, "metrics"]
]:
    model = train_model()
    metrics = evaluate_model(model)
    return model, metrics

Artifact Configuration

Configure how outputs are stored:
from typing import Annotated, Any

from zenml import step
from zenml.artifacts import ArtifactConfig
from zenml.enums import ArtifactType

@step
def configured_output_step() -> Annotated[
    Any,
    ArtifactConfig(
        name="trained_model",
        version="v1.0",
        tags=["production", "tensorflow"],
        artifact_type=ArtifactType.MODEL
    )
]:
    """Step with configured output artifact."""
    model = train_model()  # placeholder for your own training code
    return model

Step Hooks

Execute code when steps succeed or fail:
# `send_alert` and `process_data` are placeholders for your own code.
def on_step_failure(exception: BaseException) -> None:
    """Handle step failure."""
    print(f"Step failed with error: {exception}")
    send_alert(f"Step failure: {exception}")

def on_step_success() -> None:
    """Handle step success."""
    print("Step completed successfully!")

@step(
    on_failure=on_step_failure,
    on_success=on_step_success
)
def monitored_step(data: pd.DataFrame) -> Any:
    """Step with failure and success hooks."""
    result = process_data(data)
    return result
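Conceptually, the runner wraps the step body in a try/except and dispatches to these callbacks. A simplified stdlib-only sketch of that pattern (not ZenML's actual step runner):

```python
def run_with_hooks(func, *args, on_failure=None, on_success=None):
    """Run a callable and invoke success/failure callbacks, like step hooks."""
    try:
        result = func(*args)
    except BaseException as e:
        if on_failure is not None:
            on_failure(e)  # mirrors the on_failure(exception) signature
        raise  # the failure still propagates after the hook runs
    if on_success is not None:
        on_success()
    return result

events = []
run_with_hooks(lambda: "ok", on_success=lambda: events.append("success"))
assert events == ["success"]
```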

Retry Configuration

Automatically retry steps on failure:
from zenml.config import StepRetryConfig

@step(
    retry=StepRetryConfig(
        max_retries=3,
        delay=60,  # Wait 60 seconds between retries
        backoff=2   # Multiply delay by 2 after each retry
    )
)
def unreliable_step() -> str:
    """Step that retries on failure."""
    # Might fail due to network issues, etc.
    response = call_external_api()
    return response
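With these settings the wait times grow geometrically: 60 seconds before the first retry, then 120, then 240. Assuming the delay is multiplied by the backoff factor after each attempt, the schedule can be sketched as:

```python
def retry_delays(max_retries: int, delay: float, backoff: float) -> list:
    """Wait time before each retry under exponential backoff."""
    return [delay * backoff**attempt for attempt in range(max_retries)]

# max_retries=3, delay=60, backoff=2 from the example above
assert retry_delays(3, 60, 2) == [60, 120, 240]
```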

Step Groups

Organize related steps into logical groups:
from zenml import step

@step(group="preprocessing")
def load_data() -> pd.DataFrame:
    return pd.read_csv("data.csv")

@step(group="preprocessing")
def clean_data(data: pd.DataFrame) -> pd.DataFrame:
    return data.dropna()

@step(group="training")
def train_model(data: pd.DataFrame) -> Any:
    return train(data)

External Artifacts

Load artifacts that weren’t created by a previous step:
from zenml import step
from zenml.artifacts import ExternalArtifact
import pandas as pd

@step
def use_external_data(
    existing_data: pd.DataFrame,
) -> pd.DataFrame:
    """Use data loaded from external source."""
    return existing_data

@pipeline
def pipeline_with_external():
    # Load existing artifact by name
    result = use_external_data(
        existing_data=ExternalArtifact(name="preprocessed_data")
    )

Best Practices

Single Responsibility

Each step should do one thing well. Split complex logic into multiple steps.

Type Everything

Always use type hints for inputs and outputs. This enables validation and proper serialization.

Deterministic Steps

Steps should be deterministic: the same inputs should always produce the same outputs. Pin random seeds explicitly rather than leaving randomness uncontrolled, or caching can return results that no longer match your code's behavior.
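In Python, controlling randomness usually means seeding a local random generator inside the step, for example:

```python
import random

def sample_rows(n: int, seed: int = 42) -> list:
    """Deterministic sampling: the same seed always yields the same values."""
    rng = random.Random(seed)  # local RNG, so global state stays untouched
    return [rng.randint(0, 100) for _ in range(n)]

# Two runs with the same inputs produce identical results, so caching is safe
assert sample_rows(5) == sample_rows(5)
```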

Small Interfaces

Keep step interfaces simple. Pass only what’s needed, not entire datasets when a path suffices.

Code Reference

  • Step decorator: src/zenml/steps/step_decorator.py:91
  • BaseStep class: src/zenml/steps/base_step.py:100
  • Step context: src/zenml/steps/step_context.py
