Skip to main content
The @step decorator transforms a Python function into a ZenML step that can be used in pipelines.

Signature

@step(
    name: Optional[str] = None,
    enable_cache: Optional[bool] = None,
    enable_artifact_metadata: Optional[bool] = None,
    enable_artifact_visualization: Optional[bool] = None,
    enable_step_logs: Optional[bool] = None,
    experiment_tracker: Optional[Union[bool, str]] = None,
    step_operator: Optional[Union[bool, str]] = None,
    output_materializers: Optional[OutputMaterializersSpecification] = None,
    environment: Optional[Dict[str, Any]] = None,
    secrets: Optional[List[Union[UUID, str]]] = None,
    settings: Optional[Dict[str, SettingsOrDict]] = None,
    extra: Optional[Dict[str, Any]] = None,
    on_failure: Optional[HookSpecification] = None,
    on_success: Optional[HookSpecification] = None,
    model: Optional[Model] = None,
    retry: Optional[StepRetryConfig] = None,
    substitutions: Optional[Dict[str, str]] = None,
    cache_policy: Optional[CachePolicyOrString] = None,
    runtime: Optional[StepRuntime] = None,
    heartbeat_healthy_threshold: Optional[int] = None,
    group: Optional[Union[GroupInfo, str]] = None,
) -> BaseStep

Parameters

name
str
The name of the step. If not provided, the function name is used.
enable_cache
bool
Whether caching is enabled for this step. Defaults to True.
enable_artifact_metadata
bool
Whether to extract and store metadata for artifacts. Defaults to True.
enable_artifact_visualization
bool
Whether to generate visualizations for artifacts. Defaults to True.
enable_step_logs
bool
Whether step logs should be enabled.
experiment_tracker
Union[bool, str]
The experiment tracker to use for this step.
step_operator
Union[bool, str]
The step operator to use for running this step.
output_materializers
OutputMaterializersSpecification
Custom materializers for step outputs. Can be a single materializer class, a list, or a dict mapping output names to materializers.
environment
Dict[str, Any]
Environment variables to set when running this step.
secrets
List[Union[UUID, str]]
Secrets to set as environment variables.
settings
Dict[str, SettingsOrDict]
Stack component settings for this step.
extra
Dict[str, Any]
Extra configurations for this step.
on_failure
HookSpecification
Callback function to execute on step failure.
on_success
HookSpecification
Callback function to execute on step success.
model
Model
Model configuration for the Model Control Plane.
retry
StepRetryConfig
Retry configuration in case of step failure.
substitutions
Dict[str, str]
Extra substitutions for model and artifact name placeholders.
cache_policy
CachePolicyOrString
Cache policy for this step.
runtime
StepRuntime
The step runtime. If not configured, the step runs inline unless a step operator or docker/resource settings are configured.
heartbeat_healthy_threshold
int
Time in minutes that a running step can go without a heartbeat before being considered unhealthy. Defaults to 30 minutes.
group
Union[GroupInfo, str]
Group information for this step.

Returns

step
BaseStep
A configured step instance that can be used in pipelines.

Examples

Basic Step

from zenml import step
import pandas as pd

@step
def load_data() -> pd.DataFrame:
    """Read the source CSV file into a DataFrame."""
    frame = pd.read_csv("data.csv")
    return frame

@step
def clean_data(df: pd.DataFrame) -> pd.DataFrame:
    """Drop every row of the input that contains missing values."""
    cleaned = df.dropna()
    return cleaned

Step with Type Annotations

from typing import Annotated
from zenml import step, ArtifactConfig
import pandas as pd

@step
def process_data(
    df: pd.DataFrame
) -> Annotated[
    pd.DataFrame,
    ArtifactConfig(name="processed_data", tags=["clean"])
]:
    """Drop missing rows, then renumber the index from zero."""
    cleaned = df.dropna()
    return cleaned.reset_index(drop=True)

Step with Configuration

from typing import Any

from zenml import step, Model
import pandas as pd

@step(
    name="model_trainer",
    enable_cache=False,
    experiment_tracker="mlflow",
    model=Model(name="my_model", version="1.0.0")
)
def train_model(X_train: pd.DataFrame, y_train: pd.Series) -> Any:
    """Train a random-forest classifier on the given features and labels.

    Returns the fitted model object (typed ``Any`` so no materializer
    constraint is implied by the signature).
    """
    # Imported inside the step so scikit-learn is only required at run time.
    from sklearn.ensemble import RandomForestClassifier
    model = RandomForestClassifier()
    model.fit(X_train, y_train)
    return model

Step with Resource Settings

from typing import Any

from zenml import step
from zenml.config import ResourceSettings
import pandas as pd

@step(
    settings={
        "resources": ResourceSettings(
            cpu_count=4,
            memory="8GB",
            gpu_count=1
        )
    }
)
def train_deep_model(data: pd.DataFrame) -> Any:
    """Train a model with GPU acceleration.

    The ``resources`` settings request 4 CPUs, 8 GB of memory, and one GPU
    from the orchestrator for this step's execution.
    """
    # Training code here
    pass

@pipeline

Learn about creating pipelines

get_step_context

Access step context

ArtifactConfig

Configure step outputs

ResourceSettings

Configure step resources

Build docs developers (and LLMs) love