Overview
The BaseStep is the fundamental building block of ZenML pipelines. Each step represents a discrete unit of work that processes inputs and produces outputs. Steps can be chained together to form complex ML workflows.
Class Definition
from zenml.steps import BaseStep
Inheritance
Abstract base class (ABC)
Subclassed by user-defined step classes or created via the @step decorator
Creating Steps
There are two ways to create steps in ZenML:
Using the @step Decorator (Recommended)
from zenml import step
@step
def my_step(input_data: int) -> int:
    return input_data * 2
Subclassing BaseStep
from zenml.steps import BaseStep
class MyStep(BaseStep):
    def entrypoint(self, input_data: int) -> int:
        return input_data * 2
Initialization Parameters
name (str, optional): The name of the step. If not provided, defaults to the class name.
enable_cache (bool, optional): Whether to enable caching for this step. Cached steps skip execution if their inputs haven't changed.
enable_artifact_metadata (bool, optional): Whether to enable artifact metadata collection for this step.
enable_artifact_visualization (bool, optional): Whether to enable automatic artifact visualization.
enable_step_logs (bool, optional): Whether to enable step logs.
experiment_tracker (str or bool, optional): Name of the experiment tracker to use, or False to disable it.
step_operator (str or bool, optional): Name of the step operator to use for executing this step, or False to disable it.
parameters (dict, optional): Function parameters to pass to the step.
output_materializers (OutputMaterializersSpecification, optional): Materializers to use for step outputs. Can be a single materializer, a list, or a dict mapping output names to materializers.
environment (dict, optional): Environment variables to set when running this step.
secrets (optional): Secrets to make available as environment variables during step execution.
settings (Mapping[str, SettingsOrDict], optional): Settings for this step, such as resource requirements.
extra (dict, optional): Extra configurations for this step.
on_failure (callable, optional): Callback function to run if the step fails. Receives the exception as an argument.
on_success (callable, optional): Callback function to run if the step succeeds.
model (optional): Model configuration for the Model Control Plane.
retry (StepRetryConfig, optional): Configuration for retrying the step on failure.
cache_policy (optional): Cache policy for this step.
step_runtime (optional): The step runtime (inline, isolated, etc.). Only applicable to dynamic pipelines.
heartbeat_healthy_threshold (optional): Time in minutes after which a step without a heartbeat is considered unhealthy.
group (optional): Group information for organizing related steps.
Abstract Method
entrypoint
@abstractmethod
def entrypoint(self, *args: Any, **kwargs: Any) -> Any:
    """The core logic of the step."""
args: Positional arguments passed to the step.
kwargs: Keyword arguments passed to the step.
Returns: The output(s) of the step. Can be a single value, a tuple of values, or a dict.
When using the @step decorator, the decorated function becomes the entrypoint method.
Core Methods
def configure(
    self,
    enable_cache: Optional[bool] = None,
    enable_artifact_metadata: Optional[bool] = None,
    # ... other parameters ...
    merge: bool = True,
) -> Self:
    """Configures the step."""
Configures the step's behavior and settings in place. Accepts the same parameters as initialization, plus:
merge (bool): If True, merges with the existing configuration; if False, overwrites it.
Returns self to allow method chaining.
Example:
step_instance.configure(
    enable_cache=False,
    parameters={"threshold": 0.95},
    settings={"resources": {"cpu_count": 4}},
)
with_options
def with_options(
    self,
    enable_cache: Optional[bool] = None,
    # ... other parameters ...
    merge: bool = True,
) -> BaseStep:
    """Creates a copy of the step with new configuration."""
Returns: A new step instance with the updated configuration; the original step is left unchanged.
Example:
# Original step invocation
original = my_step(data=input_data)

# Create a variant with different settings
variant = my_step.with_options(
    enable_cache=False,
    name="my_step_no_cache",
)(data=input_data)
call_entrypoint
def call_entrypoint(self, *args: Any, **kwargs: Any) -> Any:
    """Directly calls the step's entrypoint function."""
Returns: The return value of the entrypoint function.
This method bypasses the normal ZenML pipeline execution flow and should only be used for testing.
Dynamic Pipeline Methods
These methods are only available within dynamic pipelines.
submit
def submit(
    self,
    *args: Any,
    id: Optional[str] = None,
    after: Union[AnyStepFuture, Sequence[AnyStepFuture], None] = None,
    **kwargs: Any,
) -> StepFuture:
    """Submits the step to run concurrently."""
id (str, optional): Custom invocation ID for this step instance.
after (AnyStepFuture | Sequence[AnyStepFuture], optional): Step futures to wait for before executing this step.
Returns: A future representing the submitted step execution.
Example:
from zenml import pipeline, step

@step
def process_data(data: int) -> int:
    return data * 2

@pipeline(enable_cache=False)
def dynamic_pipeline():
    # Submit multiple steps concurrently
    future1 = process_data.submit(data=1)
    future2 = process_data.submit(data=2)
    # Results are computed in parallel
map
def map(
    self,
    *args: Any,
    after: Union[AnyStepFuture, Sequence[AnyStepFuture], None] = None,
    **kwargs: Any,
) -> MapResultsFuture:
    """Maps the step over input sequences."""
Returns: A future representing the mapped step executions.
Example:
from zenml import pipeline, step

@step
def create_data() -> list[int]:
    return [1, 2, 3]

@step
def process(value: int) -> int:
    return value * 2

@pipeline(enable_cache=False)
def map_pipeline():
    data = create_data()
    # Launches 3 separate process steps, one per element
    results = process.map(value=data)
product
def product(
    self,
    *args: Any,
    after: Union[AnyStepFuture, Sequence[AnyStepFuture], None] = None,
    **kwargs: Any,
) -> MapResultsFuture:
    """Maps over the cartesian product of inputs."""
Returns: A future representing the product-mapped step executions.
Example:
@step
def create_numbers() -> list[int]:
    return [1, 2]

@step
def create_letters() -> list[str]:
    return ["a", "b"]

@step
def combine(num: int, letter: str) -> str:
    return f"{num}{letter}"

@pipeline(enable_cache=False)
def product_pipeline():
    numbers = create_numbers()
    letters = create_letters()
    # Launches 4 steps: (1,a), (1,b), (2,a), (2,b)
    results = combine.product(num=numbers, letter=letters)
Properties
name
@property
def name(self) -> str:
    """The name of the step."""
enable_cache
@property
def enable_cache(self) -> Optional[bool]:
    """Whether caching is enabled for this step."""
configuration
@property
def configuration(self) -> PartialStepConfiguration:
    """The configuration of the step."""
source_code
@property
def source_code(self) -> str:
    """The source code of this step."""
docstring
@property
def docstring(self) -> Optional[str]:
    """The docstring of this step."""
Complete Example
from typing import Any, Tuple

import pandas as pd
from zenml import pipeline, step
from zenml.config.retry_config import StepRetryConfig

# Define a preprocessing step
@step(
    enable_cache=True,
    settings={"resources": {"cpu_count": 2}},
)
def load_data(data_path: str) -> pd.DataFrame:
    """Load data from a CSV file."""
    return pd.read_csv(data_path)

# Define a training step with retry logic
@step(
    retry=StepRetryConfig(max_retries=3, delay=60),
    enable_artifact_metadata=True,
)
def train_model(
    data: pd.DataFrame,
    learning_rate: float = 0.01,
) -> Tuple[Any, float]:
    """Train a machine learning model."""
    # Training logic
    model = ...
    accuracy = ...
    return model, accuracy

# Use in a pipeline
@pipeline
def training_pipeline(data_path: str, lr: float = 0.01):
    data = load_data(data_path=data_path)
    model, accuracy = train_model(
        data=data,
        learning_rate=lr,
    )

# Configure a step variant at runtime
custom_step = train_model.with_options(
    parameters={"learning_rate": 0.001},
    settings={"resources": {"gpu_count": 1}},
)

@pipeline
def custom_pipeline(data_path: str):
    data = load_data(data_path=data_path)
    model, accuracy = custom_step(data=data)
Step Hooks
Steps support failure and success hooks:
def on_failure_handler(exception: BaseException) -> None:
    """Called when the step fails."""
    print(f"Step failed with: {exception}")
    # Send notification, log error, etc.

def on_success_handler() -> None:
    """Called when the step succeeds."""
    print("Step completed successfully!")

@step(
    on_failure=on_failure_handler,
    on_success=on_success_handler,
)
def monitored_step() -> str:
    return "Success!"
See Also
BaseOrchestrator Learn about orchestrators that execute steps
Stack Understand how steps run on stacks
Steps Guide Guide to creating and using steps
Caching Learn about step caching