Steps are the fundamental building blocks of ZenML pipelines. Each step is a Python function decorated with @step that performs a specific task in your ML workflow. Steps can read inputs, perform computations, and produce outputs that are automatically tracked as artifacts.
```python
from zenml import step


@step
def simple_step(name: str = "World") -> str:
    """A simple step that returns a greeting.

    Args:
        name: Name to include in the greeting.

    Returns:
        A personalized greeting message.
    """
    message = f"Hello {name}! Welcome to ZenML 🚀"
    print(message)
    return message
```
All step inputs and outputs must be type-hinted. This enables ZenML to validate data flow and choose appropriate materializers for serialization.
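Conceptually, the framework reads these annotations before the step ever runs. A minimal, framework-free sketch of the idea using only the standard library (`preprocess` is a made-up function for illustration, not part of the ZenML API):

```python
from typing import get_type_hints


def preprocess(raw: str, limit: int = 10) -> list:
    """A plain function standing in for a step body."""
    return list(raw[:limit])


# Inspecting the annotations reveals every input and output type
# before execution, which is what makes upfront validation possible:
hints = get_type_hints(preprocess)
print(hints["return"])  # <class 'list'>
```

ZenML performs a similar inspection on each `@step` function to validate the pipeline's data flow and pick a materializer for each output type.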
Steps support complex data types like pandas DataFrames, numpy arrays, and ML models:
```python
import numpy as np
import pandas as pd
from sklearn.ensemble import RandomForestClassifier

from zenml import step


@step
def train_model(
    X_train: pd.DataFrame,
    y_train: np.ndarray,
    n_estimators: int = 100,
) -> RandomForestClassifier:
    """Train a random forest classifier."""
    model = RandomForestClassifier(n_estimators=n_estimators)
    model.fit(X_train, y_train)
    return model
```
```python
from zenml import get_step_context, step


@step
def load_production_model():
    """Load the current production model."""
    context = get_step_context()
    # Access the model version from the step context
    model_version = context.model
    # Load a specific artifact from the model
    model_artifact = model_version.load_artifact("trained_model")
    return model_artifact
```
```python
from typing import Any

import pandas as pd

from zenml import step
from zenml.config import ResourceSettings


@step(
    settings={
        "resources": ResourceSettings(
            cpu_count=8,
            gpu_count=2,
            memory="32GB",
        )
    }
)
def gpu_training_step(data: pd.DataFrame) -> Any:
    """Train a model on GPU with the specified resources."""
    # This step will request 2 GPUs and 32GB RAM.
    # train_on_gpu is a user-defined training function.
    model = train_on_gpu(data)
    return model
```
Offload specific steps to specialized infrastructure:
```python
from typing import Any

import pandas as pd

from zenml import step


@step(step_operator="kubernetes")
def kubernetes_step(large_dataset: pd.DataFrame) -> Any:
    """This step runs on a Kubernetes cluster."""
    # Heavy computation on distributed infrastructure;
    # process_at_scale is a user-defined function.
    result = process_at_scale(large_dataset)
    return result
```
```python
from zenml import step
from zenml.config import DockerSettings


@step(
    settings={
        "docker": DockerSettings(
            requirements=["tensorflow==2.13.0"],
            parent_image="tensorflow/tensorflow:latest-gpu",
        )
    }
)
def containerized_step() -> str:
    """Runs in a custom Docker container."""
    import tensorflow as tf

    # Use the specific TensorFlow version installed in the image
    return tf.__version__
```
```python
from zenml import step
from zenml.config import StepRetryConfig


@step(
    retry=StepRetryConfig(
        max_retries=3,
        delay=60,   # Wait 60 seconds before the first retry
        backoff=2,  # Multiply the delay by 2 after each retry
    )
)
def unreliable_step() -> str:
    """Step that retries on failure."""
    # Might fail due to transient network issues, etc.
    # call_external_api is a user-defined function.
    response = call_external_api()
    return response
```
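With these settings the wait between attempts grows geometrically. A quick sketch of the resulting schedule, assuming the delay is multiplied by `backoff` after every failed attempt (`retry_delays` is a hypothetical helper for illustration, not part of ZenML):

```python
def retry_delays(max_retries: int, delay: float, backoff: float) -> list:
    """Return the delay (in seconds) before each retry attempt,
    assuming the delay is multiplied by `backoff` after every attempt."""
    delays = []
    for _ in range(max_retries):
        delays.append(delay)
        delay *= backoff
    return delays


print(retry_delays(max_retries=3, delay=60, backoff=2))  # [60, 120, 240]
```

So the configuration above waits 60s, then 120s, then 240s before giving up.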
Load artifacts that weren’t created by a previous step:
```python
import pandas as pd

from zenml import pipeline, step
from zenml.artifacts import ExternalArtifact


@step
def use_external_data(
    existing_data: pd.DataFrame,
) -> pd.DataFrame:
    """Use data loaded from an external source."""
    return existing_data


@pipeline
def pipeline_with_external():
    # Load an existing artifact by name
    result = use_external_data(
        existing_data=ExternalArtifact(name="preprocessed_data")
    )
```