Skip to main content

Overview

The BaseStep is the fundamental building block of ZenML pipelines. Each step represents a discrete unit of work that processes inputs and produces outputs. Steps can be chained together to form complex ML workflows.

Class Definition

from zenml.steps import BaseStep

Inheritance

  • Abstract base class (ABC)
  • Subclassed by user-defined step classes or created via the @step decorator

Creating Steps

There are two ways to create steps in ZenML:

Using the @step Decorator

from zenml import step

@step
def my_step(input_data: int) -> int:
    return input_data * 2

Subclassing BaseStep

from zenml.steps import BaseStep

class MyStep(BaseStep):
    def entrypoint(self, input_data: int) -> int:
        return input_data * 2

Initialization Parameters

name
str
The name of the step. If not provided, defaults to the class name.
enable_cache
bool
Whether to enable caching for this step. Cached steps skip execution if inputs haven’t changed.
enable_artifact_metadata
bool
Whether to enable artifact metadata collection for this step.
enable_artifact_visualization
bool
Whether to enable automatic artifact visualization.
enable_step_logs
bool
Whether to enable step logs.
experiment_tracker
str | bool
Name of the experiment tracker to use, or False to disable.
step_operator
str | bool
Name of the step operator to use for executing this step, or False to disable.
parameters
Dict[str, Any]
Function parameters to pass to the step.
output_materializers
OutputMaterializersSpecification
Materializers to use for step outputs. Can be a single materializer, a list, or a dict mapping output names to materializers.
environment
Dict[str, Any]
Environment variables to set when running this step.
secrets
List[str | UUID]
Secrets to make available as environment variables during step execution.
settings
Mapping[str, SettingsOrDict]
Settings for this step, such as resource requirements.
extra
Dict[str, Any]
Extra configurations for this step.
on_failure
HookSpecification
Callback function to run if the step fails. Receives the exception as an argument.
on_success
HookSpecification
Callback function to run if the step succeeds.
model
Model
Model configuration for the Model Control Plane.
retry
StepRetryConfig
Configuration for retrying the step on failure.
cache_policy
CachePolicyOrString
Cache policy for this step.
runtime
StepRuntime
The step runtime (inline, isolated, etc.). Only applicable for dynamic pipelines.
heartbeat_healthy_threshold
int
default:"30"
Time in minutes before a step without heartbeat is considered unhealthy.
group
GroupInfo | str
Group information for organizing related steps.

Abstract Method

entrypoint

@abstractmethod
def entrypoint(self, *args: Any, **kwargs: Any) -> Any:
    """The core logic of the step."""
*args
Any
Positional arguments passed to the step.
**kwargs
Any
Keyword arguments passed to the step.
return
Any
The output(s) of the step. Can be a single value, tuple of values, or dict.
When using the @step decorator, the decorated function becomes the entrypoint method.

Core Methods

configure

def configure(
    self,
    enable_cache: Optional[bool] = None,
    enable_artifact_metadata: Optional[bool] = None,
    # ... other parameters ...
    merge: bool = True,
) -> Self:
    """Configures the step."""
Configure step behavior and settings. All parameters are the same as initialization parameters.
merge
bool
default:"True"
If True, merges with existing configuration. If False, overwrites it.
return
BaseStep
Returns self for method chaining.
Example:
step_instance.configure(
    enable_cache=False,
    parameters={"threshold": 0.95},
    settings={"resources": {"cpu_count": 4}}
)

with_options

def with_options(
    self,
    enable_cache: Optional[bool] = None,
    # ... other parameters ...
    merge: bool = True,
) -> BaseStep:
    """Creates a copy of the step with new configuration."""
return
BaseStep
A new step instance with the updated configuration.
Example:
# Original step
original = my_step(data=input_data)

# Create variant with different settings
variant = my_step.with_options(
    enable_cache=False,
    name="my_step_no_cache"
)(data=input_data)

call_entrypoint

def call_entrypoint(self, *args: Any, **kwargs: Any) -> Any:
    """Directly calls the step's entrypoint function."""
return
Any
The return value of the entrypoint function.
This method bypasses the normal ZenML pipeline execution flow and should only be used for testing.

Dynamic Pipeline Methods

These methods are only available within dynamic pipelines.

submit

def submit(
    self,
    *args: Any,
    id: Optional[str] = None,
    after: Union[AnyStepFuture, Sequence[AnyStepFuture], None] = None,
    **kwargs: Any,
) -> StepFuture:
    """Submits the step to run concurrently."""
id
str
Custom invocation ID for this step instance.
after
AnyStepFuture | Sequence[AnyStepFuture]
Step futures to wait for before executing this step.
return
StepFuture
A future representing the submitted step execution.
Example:
from zenml import pipeline, step

@step
def process_data(data: int) -> int:
    return data * 2

@pipeline(enable_cache=False)
def dynamic_pipeline():
    # Submit multiple steps concurrently
    future1 = process_data.submit(data=1)
    future2 = process_data.submit(data=2)
    
    # Results will be computed in parallel

map

def map(
    self,
    *args: Any,
    after: Union[AnyStepFuture, Sequence[AnyStepFuture], None] = None,
    **kwargs: Any,
) -> MapResultsFuture:
    """Maps the step over input sequences."""
return
MapResultsFuture
A future representing the mapped step executions.
Example:
from zenml import pipeline, step

@step
def create_data() -> list[int]:
    return [1, 2, 3]

@step
def process(value: int) -> int:
    return value * 2

@pipeline(enable_cache=False)
def map_pipeline():
    data = create_data()
    # Launches 3 separate process steps
    results = process.map(value=data)

product

def product(
    self,
    *args: Any,
    after: Union[AnyStepFuture, Sequence[AnyStepFuture], None] = None,
    **kwargs: Any,
) -> MapResultsFuture:
    """Maps over the cartesian product of inputs."""
return
MapResultsFuture
A future representing the product-mapped step executions.
Example:
@step
def create_numbers() -> list[int]:
    return [1, 2]

@step
def create_letters() -> list[str]:
    return ["a", "b"]

@step
def combine(num: int, letter: str) -> str:
    return f"{num}{letter}"

@pipeline(enable_cache=False)
def product_pipeline():
    numbers = create_numbers()
    letters = create_letters()
    # Launches 4 steps: (1,a), (1,b), (2,a), (2,b)
    results = combine.product(num=numbers, letter=letters)

Properties

name

@property
def name(self) -> str:
    """The name of the step."""

enable_cache

@property
def enable_cache(self) -> Optional[bool]:
    """Whether caching is enabled for this step."""

configuration

@property
def configuration(self) -> PartialStepConfiguration:
    """The configuration of the step."""

source_code

@property
def source_code(self) -> str:
    """The source code of this step."""

docstring

@property
def docstring(self) -> Optional[str]:
    """The docstring of this step."""

Complete Example

from typing import Any, Tuple

import pandas as pd

from zenml import pipeline, step
from zenml.config.retry_config import StepRetryConfig

# Define a preprocessing step
@step(
    enable_cache=True,
    settings={"resources": {"cpu_count": 2}}
)
def load_data(data_path: str) -> pd.DataFrame:
    """Load data from a CSV file."""
    return pd.read_csv(data_path)

# Define a training step with retry logic
@step(
    retry=StepRetryConfig(max_retries=3, delay=60),
    enable_artifact_metadata=True
)
def train_model(
    data: pd.DataFrame,
    learning_rate: float = 0.01
) -> Tuple[Any, float]:
    """Train a machine learning model."""
    # Training logic
    model = ...
    accuracy = ...
    return model, accuracy

# Use in a pipeline
@pipeline
def training_pipeline(data_path: str, lr: float = 0.01):
    data = load_data(data_path=data_path)
    model, accuracy = train_model(
        data=data,
        learning_rate=lr
    )

# Configure step at runtime
custom_step = train_model.with_options(
    parameters={"learning_rate": 0.001},
    settings={"resources": {"gpu_count": 1}}
)

@pipeline
def custom_pipeline(data_path: str):
    data = load_data(data_path=data_path)
    model, accuracy = custom_step(data=data)

Step Hooks

Steps support failure and success hooks:
def on_failure_handler(exception: BaseException) -> None:
    """Called when step fails."""
    print(f"Step failed with: {exception}")
    # Send notification, log error, etc.

def on_success_handler() -> None:
    """Called when step succeeds."""
    print("Step completed successfully!")

@step(
    on_failure=on_failure_handler,
    on_success=on_success_handler
)
def monitored_step() -> str:
    return "Success!"

See Also

BaseOrchestrator

Learn about orchestrators that execute steps

Stack

Understand how steps run on stacks

Steps Guide

Guide to creating and using steps

Caching

Learn about step caching

Build docs developers (and LLMs) love