The @pipeline decorator transforms a Python function into a ZenML pipeline that orchestrates multiple steps.
Signature
```python
@pipeline(
    name: Optional[str] = None,
    dynamic: Optional[bool] = None,
    enable_cache: Optional[bool] = None,
    enable_artifact_metadata: Optional[bool] = None,
    enable_step_logs: Optional[bool] = None,
    enable_heartbeat: Optional[bool] = None,
    enable_pipeline_logs: Optional[bool] = None,
    environment: Optional[Dict[str, Any]] = None,
    secrets: Optional[List[Union[UUID, str]]] = None,
    settings: Optional[Dict[str, SettingsOrDict]] = None,
    tags: Optional[List[Union[str, Tag]]] = None,
    extra: Optional[Dict[str, Any]] = None,
    on_failure: Optional[HookSpecification] = None,
    on_success: Optional[HookSpecification] = None,
    on_init: Optional[InitHookSpecification] = None,
    on_init_kwargs: Optional[Dict[str, Any]] = None,
    on_cleanup: Optional[HookSpecification] = None,
    model: Optional[Model] = None,
    retry: Optional[StepRetryConfig] = None,
    substitutions: Optional[Dict[str, str]] = None,
    execution_mode: Optional[ExecutionMode] = None,
    cache_policy: Optional[CachePolicyOrString] = None,
) -> Pipeline
```
Parameters
- `name` (`Optional[str]`): The name of the pipeline. If not provided, the function name is used.
- `dynamic` (`Optional[bool]`): Whether this is a dynamic pipeline that can create steps at runtime.
- `enable_cache` (`Optional[bool]`): Whether to use caching for pipeline steps. Defaults to `True`.
- `enable_artifact_metadata` (`Optional[bool]`): Whether to enable metadata extraction for artifacts. Defaults to `True`.
- `enable_step_logs` (`Optional[bool]`): Whether to enable step logs.
- `enable_heartbeat` (`Optional[bool]`): Whether to enable heartbeat monitoring for steps.
- `enable_pipeline_logs` (`Optional[bool]`): Whether to enable pipeline logs.
- `environment` (`Optional[Dict[str, Any]]`): Environment variables to set when running this pipeline.
- `secrets` (`Optional[List[Union[UUID, str]]]`): Secrets to set as environment variables.
- `settings` (`Optional[Dict[str, SettingsOrDict]]`): Stack component settings for this pipeline.
- `tags` (`Optional[List[Union[str, Tag]]]`): Tags to apply to pipeline runs.
- `extra` (`Optional[Dict[str, Any]]`): Extra configurations for this pipeline.
- `on_failure` (`Optional[HookSpecification]`): Callback function or source path to execute on pipeline failure.
- `on_success` (`Optional[HookSpecification]`): Callback function or source path to execute on pipeline success.
- `on_init` (`Optional[InitHookSpecification]`): Callback function to run on pipeline initialization.
- `on_init_kwargs` (`Optional[Dict[str, Any]]`): Arguments for the init hook.
- `on_cleanup` (`Optional[HookSpecification]`): Callback function to run on pipeline cleanup.
- `model` (`Optional[Model]`): Model configuration for the Model Control Plane.
- `retry` (`Optional[StepRetryConfig]`): Retry configuration for pipeline steps.
- `substitutions` (`Optional[Dict[str, str]]`): Extra substitutions for name placeholders like `{date}` and `{time}`.
- `execution_mode` (`Optional[ExecutionMode]`): The execution mode to use for the pipeline.
- `cache_policy` (`Optional[CachePolicyOrString]`): Cache policy for this pipeline.
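To make the `on_failure` / `on_success` hook contract concrete, here is a minimal plain-Python sketch of the semantics: the failure hook fires when the run raises and receives the exception, while the success hook fires after the run completes. `run_with_hooks` is a hypothetical helper for illustration, not part of the ZenML API.

```python
# Plain-Python sketch of failure/success hook semantics.
# `run_with_hooks` is a hypothetical helper, NOT a ZenML function.
from typing import Any, Callable, Optional


def run_with_hooks(
    body: Callable[[], Any],
    on_failure: Optional[Callable[[BaseException], None]] = None,
    on_success: Optional[Callable[[], None]] = None,
) -> Any:
    try:
        result = body()
    except BaseException as exc:
        if on_failure is not None:
            on_failure(exc)  # failure hook sees the exception
        raise
    if on_success is not None:
        on_success()  # success hook runs only after the body completes
    return result


events = []
run_with_hooks(lambda: events.append("ok"), on_success=lambda: events.append("success"))
try:
    run_with_hooks(lambda: 1 / 0, on_failure=lambda e: events.append(type(e).__name__))
except ZeroDivisionError:
    pass
print(events)  # ['ok', 'success', 'ZeroDivisionError']
```

Note that the failure hook runs before the exception propagates, which is why `events` still records the `ZeroDivisionError` even though the run itself fails.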
Returns
A configured Pipeline instance that can be called to run the pipeline.
Examples
Basic Pipeline
```python
from zenml import pipeline, step

@step
def load_data() -> dict:
    return {"data": [1, 2, 3]}

@step
def process_data(data: dict) -> dict:
    return {"processed": data["data"]}

@pipeline
def ml_pipeline():
    data = load_data()
    process_data(data)

# Run the pipeline
ml_pipeline()
```
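Conceptually, the decorator wraps the function in a pipeline object whose call triggers a run. The following toy sketch illustrates only that wrapping mechanism; it is not ZenML's implementation, and `ToyPipeline` / `toy_pipeline` are invented names for illustration.

```python
# Toy sketch of the decorator mechanics behind @pipeline.
# NOT ZenML code -- a real orchestrator builds a step DAG and
# dispatches each step; here we just run the function body.
from typing import Any, Callable, Optional


class ToyPipeline:
    """Wraps a function; calling the instance 'runs' the pipeline."""

    def __init__(self, func: Callable[..., Any], name: Optional[str] = None):
        self.func = func
        self.name = name or func.__name__  # same fallback as @pipeline

    def __call__(self, *args: Any, **kwargs: Any) -> Any:
        return self.func(*args, **kwargs)


def toy_pipeline(func: Callable[..., Any]) -> ToyPipeline:
    return ToyPipeline(func)


@toy_pipeline
def my_pipeline():
    return "ran"


print(my_pipeline.name)  # my_pipeline
print(my_pipeline())     # ran
```

This is why `ml_pipeline()` above does not simply execute the function body in place: the call goes through the returned `Pipeline` object, which hands the work to the active orchestrator.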
Pipeline with Configuration
```python
from zenml import pipeline, step, Model

@pipeline(
    name="training_pipeline",
    enable_cache=False,
    tags=["training", "production"],
    model=Model(
        name="my_model",
        version="v1",
    ),
)
def training_pipeline():
    data = load_data()
    train_model(data)
```
Dynamic Pipeline
```python
from zenml import pipeline, step

@step
def dynamic_step(x: int) -> int:
    return x * 2

@pipeline(dynamic=True)
def dynamic_pipeline(n: int):
    # Create steps dynamically based on input
    for i in range(n):
        dynamic_step(x=i)
```
See also
- `@step`: Learn about creating pipeline steps
- `Model`: Configure model versioning
- `get_pipeline_context`: Access pipeline context
- `Schedule`: Schedule pipeline runs