
Step Operators

Step operators allow you to run individual pipeline steps on custom infrastructure. While an orchestrator defines how and where your entire pipeline runs, a step operator defines how and where a specific step runs.

Overview

Step operators are useful when:
  • A specific step needs GPU resources (e.g., model training)
  • A step requires more compute resources than others
  • You want to run a step on different infrastructure (e.g., serverless)
  • A step has special requirements (e.g., specific hardware, libraries)

How Step Operators Work

When a pipeline runs:
  1. The orchestrator executes most steps normally
  2. When a step with a step operator is reached:
    • The orchestrator hands off execution to the step operator
    • The step operator runs the step on its configured infrastructure
    • Results are returned to the orchestrator
  3. The orchestrator continues with the next step
This allows fine-grained control over where each step executes.
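The handoff described above can be sketched in plain Python. This is an illustrative toy, not ZenML's actual orchestration code; every name in it is hypothetical:

```python
# Toy model of the orchestrator/step-operator handoff described above.
# All names here are illustrative, not ZenML internals.

def run_pipeline(steps, step_operators):
    """Execute steps in order, delegating to a step operator when one is configured."""
    results = {}
    for step in steps:
        operator_name = step.get("step_operator")
        if operator_name:
            # Hand off execution to the configured step operator...
            operator = step_operators[operator_name]
            results[step["name"]] = operator(step["fn"], results)
        else:
            # ...otherwise the orchestrator runs the step itself.
            results[step["name"]] = step["fn"](results)
    return results

def local_gpu_operator(fn, results):
    # Stand-in "remote" executor; a real step operator would launch a job
    # on its own infrastructure and return the results to the orchestrator.
    return fn(results)

steps = [
    {"name": "load", "fn": lambda r: [1, 2, 3]},
    {"name": "train", "fn": lambda r: sum(r["load"]), "step_operator": "gpu"},
]
results = run_pipeline(steps, {"gpu": local_gpu_operator})
```

In the real system the operator launches a containerized job remotely and results flow back through the artifact store, but the control flow follows this shape.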

Available Step Operators

Kubernetes Step Operator

Runs individual steps as Kubernetes jobs. Installation:
zenml integration install kubernetes
Configuration:
zenml step-operator register k8s_step_operator --flavor=kubernetes \
  --kubernetes_context=my-context \
  --kubernetes_namespace=zenml
Requirements:
  • Kubernetes cluster access
  • Container registry in your stack
  • Configured kubectl context
Use cases:
  • Running GPU-intensive training steps
  • Steps requiring specific node pools
  • Isolated execution environments
  • Resource-intensive data processing
Example:
from zenml import step, pipeline
from zenml.config import ResourceSettings

@step(
    step_operator="k8s_step_operator",
    settings={
        "resources": ResourceSettings(
            cpu_count=8,
            memory="32GB",
            gpu_count=1,
        )
    },
)
def gpu_training_step(data) -> Model:
    # This step runs on Kubernetes with GPU
    model = train_on_gpu(data)
    return model

@step
def preprocessing_step() -> Data:
    # This step runs on the orchestrator
    data = load_and_preprocess()
    return data

@pipeline
def training_pipeline():
    data = preprocessing_step()
    model = gpu_training_step(data)

SageMaker Step Operator

Runs steps on AWS SageMaker Training Jobs. Installation:
zenml integration install aws
Configuration:
zenml step-operator register sagemaker_step_operator --flavor=sagemaker \
  --role=arn:aws:iam::123456789012:role/ZenMLSageMaker \
  --instance_type=ml.p3.2xlarge
Requirements:
  • AWS account with SageMaker access
  • IAM role with SageMaker permissions
  • Container registry (ECR)
  • S3 artifact store
Features:
  • Managed infrastructure
  • Wide range of instance types
  • Spot instance support
  • Built-in monitoring
  • Auto-scaling capabilities
Use cases:
  • AWS-based ML infrastructure
  • GPU/TPU training jobs
  • Large-scale model training
  • Cost optimization with spot instances
Example:
from zenml import step
from zenml.integrations.aws.flavors.sagemaker_step_operator_flavor import (
    SageMakerStepOperatorSettings,
)

sagemaker_settings = SageMakerStepOperatorSettings(
    instance_type="ml.p3.2xlarge",
    max_runtime_in_seconds=3600,
    use_spot_instances=True,
)

@step(
    step_operator="sagemaker_step_operator",
    settings={"step_operator.sagemaker": sagemaker_settings},
)
def train_on_sagemaker(data) -> Model:
    # Runs on SageMaker training instance
    model = train_large_model(data)
    return model

Vertex AI Step Operator

Runs steps on Google Cloud Vertex AI Custom Jobs. Installation:
zenml integration install gcp
Configuration:
zenml step-operator register vertex_step_operator --flavor=vertex \
  --project=my-gcp-project \
  --location=us-central1 \
  --machine_type=n1-standard-4 \
  --accelerator_type=NVIDIA_TESLA_T4 \
  --accelerator_count=1
Requirements:
  • GCP project with Vertex AI enabled
  • Service account with Vertex AI permissions
  • Container registry (GCR/Artifact Registry)
  • GCS artifact store
Features:
  • Managed ML infrastructure
  • GPU and TPU support
  • Custom machine types
  • Pre-configured ML containers
  • Integration with Vertex AI ecosystem
Use cases:
  • GCP-based ML workflows
  • GPU/TPU training
  • Large-scale distributed training
  • Vertex AI platform integration
Example:
from zenml import step
from zenml.integrations.gcp.flavors.vertex_step_operator_flavor import (
    VertexStepOperatorSettings,
)

vertex_settings = VertexStepOperatorSettings(
    machine_type="n1-standard-8",
    accelerator_type="NVIDIA_TESLA_V100",
    accelerator_count=2,
)

@step(
    step_operator="vertex_step_operator",
    settings={"step_operator.vertex": vertex_settings},
)
def train_on_vertex(data) -> Model:
    # Runs on Vertex AI with 2 V100 GPUs
    model = distributed_training(data)
    return model

Azure ML Step Operator

Runs steps on Azure Machine Learning Compute. Installation:
zenml integration install azure
Configuration:
zenml step-operator register azureml_step_operator --flavor=azureml \
  --subscription_id=<subscription-id> \
  --resource_group=<resource-group> \
  --workspace_name=<workspace-name> \
  --compute_target_name=gpu-cluster
Requirements:
  • Azure subscription
  • Azure ML workspace
  • Compute cluster or compute instance
  • Azure Container Registry
  • Azure Blob Storage artifact store
Features:
  • Managed compute resources
  • Auto-scaling clusters
  • GPU and CPU options
  • Cost management
  • Integration with Azure ML
Use cases:
  • Azure-based infrastructure
  • Enterprise Azure deployments
  • GPU training on Azure
  • Azure ML ecosystem integration
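Example (a sketch mirroring the other integrations; the `AzureMLStepOperatorSettings` import path and field names are assumptions, so verify them against the ZenML Azure integration reference):

```python
from zenml import step
# Assumed import path, following the pattern of the SageMaker/Vertex flavors.
from zenml.integrations.azure.flavors.azureml_step_operator_flavor import (
    AzureMLStepOperatorSettings,
)

# Field name taken from the registration flag shown above; treat as illustrative.
azureml_settings = AzureMLStepOperatorSettings(
    compute_target_name="gpu-cluster",
)

@step(
    step_operator="azureml_step_operator",
    settings={"step_operator.azureml": azureml_settings},
)
def train_on_azureml(data) -> Model:
    # Runs on the configured Azure ML compute target
    model = train_large_model(data)
    return model
```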
Modal Step Operator

Runs steps on Modal’s serverless infrastructure. Installation:
zenml integration install modal
Configuration:
zenml step-operator register modal_step_operator --flavor=modal
Authentication:
modal token new
Features:
  • Serverless execution
  • Pay-per-use pricing
  • Fast cold starts
  • GPU support
  • Automatic scaling
Use cases:
  • Serverless ML workflows
  • Sporadic GPU needs
  • Cost optimization
  • Quick experimentation
Example:
from zenml import step
from zenml.integrations.modal.flavors.modal_step_operator_flavor import (
    ModalStepOperatorSettings,
)

modal_settings = ModalStepOperatorSettings(
    gpu="any",  # Request any available GPU
    cpu=4.0,
    memory=16384,  # MB
)

@step(
    step_operator="modal_step_operator",
    settings={"step_operator.modal": modal_settings},
)
def train_on_modal(data) -> Model:
    # Runs serverless on Modal with GPU
    model = train(data)
    return model

Choosing a Step Operator

| Step Operator | Best For | Pricing Model | GPU Support |
|---------------|----------|---------------|-------------|
| Kubernetes | Self-hosted, flexibility | Infrastructure cost | Yes (if cluster has GPUs) |
| SageMaker | AWS infrastructure | Per-second billing | Yes (wide selection) |
| Vertex AI | GCP infrastructure | Per-second billing | Yes (GPUs and TPUs) |
| Azure ML | Azure infrastructure | Per-minute billing | Yes (various SKUs) |
| Modal | Serverless, experimentation | Pay-per-use | Yes (on-demand) |

Resource Configuration

Specifying Resources

from zenml import step
from zenml.config import ResourceSettings

@step(
    step_operator="k8s_step_operator",
    settings={
        "resources": ResourceSettings(
            cpu_count=8,
            memory="32GB",
            gpu_count=2,
        )
    },
)
def resource_intensive_step(data) -> Result:
    # This step gets the specified resources
    result = process_with_resources(data)
    return result

Cloud-Specific Resources

SageMaker:
from zenml.integrations.aws.flavors.sagemaker_step_operator_flavor import (
    SageMakerStepOperatorSettings,
)

settings = SageMakerStepOperatorSettings(
    instance_type="ml.p3.8xlarge",  # 4 V100 GPUs
    max_runtime_in_seconds=7200,
    volume_size_in_gb=100,
    use_spot_instances=True,
    max_wait_time_in_seconds=86400,
)
Vertex AI:
from zenml.integrations.gcp.flavors.vertex_step_operator_flavor import (
    VertexStepOperatorSettings,
)

settings = VertexStepOperatorSettings(
    machine_type="n1-highmem-8",
    accelerator_type="NVIDIA_TESLA_A100",
    accelerator_count=1,
    disk_size_gb=200,
)

Mixed Infrastructure Pipelines

Combine different execution environments:
from zenml import step, pipeline

@step  # Runs on orchestrator
def load_data() -> Data:
    return load_from_database()

@step  # Runs on orchestrator
def preprocess(data: Data) -> Data:
    return clean_and_transform(data)

@step(step_operator="sagemaker_step_operator")  # Runs on SageMaker
def train_model(data: Data) -> Model:
    return train_large_model(data)

@step(step_operator="modal_step_operator")  # Runs on Modal
def generate_explanations(model: Model, data: Data) -> Explanations:
    return explain_model(model, data)

@step  # Runs on orchestrator
def save_results(model: Model, explanations: Explanations) -> None:
    save_to_registry(model)
    save_to_database(explanations)

@pipeline
def mixed_pipeline():
    data = load_data()
    processed = preprocess(data)
    model = train_model(processed)  # On SageMaker
    explanations = generate_explanations(model, processed)  # On Modal
    save_results(model, explanations)

Best Practices

Use Step Operators for Resource-Intensive Steps

# Good: GPU training on step operator
@step(step_operator="sagemaker_step_operator")
def train_deep_learning_model(data) -> Model:
    return train_on_gpu(data)

# Bad: Simple data loading on step operator (unnecessary overhead)
@step(step_operator="sagemaker_step_operator")
def load_csv() -> Data:
    return pd.read_csv("data.csv")

Minimize Data Transfer

# Good: Process data close to where it's stored
@step(step_operator="vertex_step_operator")
def process_large_dataset() -> ProcessedData:
    # Load from GCS (close to Vertex AI)
    data = load_from_gcs()
    return process(data)

# Consider: May involve unnecessary data transfer
@step(step_operator="vertex_step_operator")
def process_small_preprocessed_data(data: SmallData) -> Result:
    # Small data, might not need step operator
    return simple_transform(data)

Configure Timeouts

from zenml.integrations.aws.flavors.sagemaker_step_operator_flavor import (
    SageMakerStepOperatorSettings,
)

# Set appropriate timeouts
settings = SageMakerStepOperatorSettings(
    max_runtime_in_seconds=3600,  # 1 hour max
    max_wait_time_in_seconds=7200,  # For spot instances
)

Use Spot/Preemptible Instances

# SageMaker spot instances (up to 90% savings)
sagemaker_settings = SageMakerStepOperatorSettings(
    instance_type="ml.p3.2xlarge",
    use_spot_instances=True,
    max_wait_time_in_seconds=3600,
)

# GCP preemptible instances
vertex_settings = VertexStepOperatorSettings(
    machine_type="n1-standard-8",
    use_preemptible=True,
)

Monitoring Step Execution

Check Step Status

from zenml.client import Client

client = Client()
run = client.get_pipeline_run("run_name")  # name or ID of the run

for step_name, step in run.steps.items():
    print(f"Step: {step_name}")
    print(f"Status: {step.status}")
    print(f"Duration: {step.duration}")
    
    # Check if step used a step operator
    if step.config.step_operator:
        print(f"Step Operator: {step.config.step_operator}")

Cloud Console Monitoring

SageMaker:
  • AWS Console → SageMaker → Training jobs
  • View logs in CloudWatch
Vertex AI:
  • GCP Console → Vertex AI → Custom Jobs
  • View logs in Cloud Logging
Azure ML:
  • Azure Portal → Machine Learning → Experiments
  • View logs in workspace

Troubleshooting

Step Operator Not Found

# List registered step operators
zenml step-operator list

# Describe specific step operator
zenml step-operator describe k8s_step_operator

Permission Errors

# SageMaker: Check IAM role
aws iam get-role --role-name ZenMLSageMaker

# Vertex AI: Check service account
gcloud projects get-iam-policy PROJECT_ID \
  --flatten="bindings[].members" \
  --filter="bindings.members:serviceAccount:*"

# Azure: Check permissions
az role assignment list --assignee <service-principal-id>

Resource Limits

# AWS: Check SageMaker quotas
aws service-quotas list-service-quotas \
  --service-code sagemaker

# GCP: Check Vertex AI quotas
gcloud compute project-info describe \
  --format="value(quotas)"

# Azure: Check subscription limits
az vm list-usage --location eastus

Container Build Failures

# Enable debug logging
import os
os.environ["ZENML_LOGGING_VERBOSITY"] = "DEBUG"

# Check container registry access
# Ensure your step operator can pull images

Cost Optimization

Use Spot/Preemptible Instances

Save up to 90% on compute costs:
# SageMaker
sagemaker_settings = SageMakerStepOperatorSettings(
    use_spot_instances=True,
    max_wait_time_in_seconds=3600,
)

# Vertex AI
vertex_settings = VertexStepOperatorSettings(
    use_preemptible=True,
)
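As a back-of-the-envelope illustration of the savings math (the rates below are placeholders, not current cloud prices):

```python
# Hypothetical hourly rates for illustration only -- look up real pricing.
on_demand_per_hour = 3.825   # e.g. a GPU instance on-demand rate, in USD
spot_discount = 0.70         # spot/preemptible often runs 60-90% below on-demand

spot_per_hour = on_demand_per_hour * (1 - spot_discount)
training_hours = 10

on_demand_cost = on_demand_per_hour * training_hours
spot_cost = spot_per_hour * training_hours
savings = on_demand_cost - spot_cost

print(f"on-demand: ${on_demand_cost:.2f}, spot: ${spot_cost:.2f}, saved: ${savings:.2f}")
```

The trade-off is that spot capacity can be reclaimed mid-job, which is why the settings above pair spot usage with a `max_wait_time_in_seconds` budget.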

Right-Size Resources

# Don't over-provision
settings = SageMakerStepOperatorSettings(
    instance_type="ml.m5.xlarge",  # Start small
    # Scale up only if needed
)

Set Timeouts

# Prevent runaway costs
settings = SageMakerStepOperatorSettings(
    max_runtime_in_seconds=3600,  # Kill after 1 hour
)

Next Steps

  • Stack Components Overview: learn about other stack components
  • Advanced Pipelines: build sophisticated ML pipelines
