Skip to main content
The Step Context provides access to runtime information, metadata, and utilities within your step functions. It allows steps to access information about the current pipeline run, interact with the stack, and retrieve artifacts from previous runs.

Accessing Step Context

Use the get_step_context() function to access context within a step:
from zenml import step, get_step_context

@step
def my_step() -> str:
    """Demonstrate reading runtime information from the step context."""
    # Fetch the context once and reuse it within the step.
    context = get_step_context()

    step_name = context.step_name
    run_name = context.pipeline_run.name
    print(f"Running step '{step_name}' in run '{run_name}'")

    return "done"

What’s Available in Step Context

Step 1: Basic Information

Access basic information about the current execution:
from zenml import step, get_step_context

@step
def info_step() -> dict:
    """Collect and print identifying details for the current execution."""
    context = get_step_context()

    # Stringify the run IDs so the returned dict holds plain strings.
    info = dict(
        step_name=context.step_name,
        step_run_id=str(context.step_run.id),
        pipeline_name=context.pipeline.name,
        pipeline_run_id=str(context.pipeline_run.id),
        run_name=context.pipeline_run.name,
    )

    print(f"Step: {info['step_name']}")
    print(f"Pipeline: {info['pipeline_name']}")
    print(f"Run: {info['run_name']}")

    return info
Step 2: Access Stack Components

Interact with active stack components:
from zenml import step, get_step_context

@step
def stack_aware_step() -> dict:
    """Inspect and report the components of the active stack."""
    context = get_step_context()

    # Every stack has an artifact store and an orchestrator.
    artifact_store = context.stack.artifact_store
    orchestrator = context.stack.orchestrator

    print(f"Artifact store: {artifact_store.name}")
    print(f"Artifact store path: {artifact_store.path}")
    print(f"Orchestrator: {orchestrator.name}")

    # The experiment tracker is optional — guard before dereferencing it.
    if context.stack.experiment_tracker:
        print(f"Experiment tracker: {context.stack.experiment_tracker.name}")

    return {
        "artifact_store": artifact_store.name,
        "orchestrator": orchestrator.name,
    }
Step 3: Retrieve Previous Artifacts

Load artifacts from previous pipeline runs:
from zenml import step, get_step_context
from zenml.client import Client

@step
def compare_with_baseline(current_metrics: dict) -> dict:
    """Compare current metrics with a baseline from the previous run.

    Finds the most recent *completed* run of this pipeline that is not the
    current run, and loads its ``evaluate_model`` output as the baseline.

    Args:
        current_metrics: Metrics for the current run; must contain "accuracy".

    Returns:
        A dict with current/baseline accuracy and the improvement delta, or
        just the current accuracy when no baseline run exists yet.
    """
    context = get_step_context()
    client = Client()

    pipeline_name = context.pipeline.name
    current_run_id = context.pipeline_run.id

    # Fetch the most recent completed runs of this pipeline.
    previous_runs = client.list_pipeline_runs(
        pipeline_name=pipeline_name,
        status="completed",
        size=2,  # enough to find one run other than the current one
    )

    # BUGFIX: don't assume items[0] is the current run — a still-running
    # current run will not appear in a "completed" listing at all, in which
    # case items[1] would skip the actual baseline. Pick the newest run
    # whose ID differs from the current run's ID; correct either way.
    baseline_run = next(
        (run for run in previous_runs.items if run.id != current_run_id),
        None,
    )

    if baseline_run is None:
        print("No baseline available, this is the first run")
        return {"current_accuracy": current_metrics["accuracy"]}

    # Load the metrics artifact produced by the baseline run's evaluate step.
    baseline_metrics = baseline_run.steps["evaluate_model"].output.load()

    comparison = {
        "current_accuracy": current_metrics["accuracy"],
        "baseline_accuracy": baseline_metrics["accuracy"],
        "improvement": current_metrics["accuracy"] - baseline_metrics["accuracy"],
    }
    print(f"Improvement: {comparison['improvement']:.3f}")
    return comparison
Step 4: Access Model Context

Work with the ZenML model registry:
from zenml import step, get_step_context, Model

@step(model=Model(name="my_model"))
def save_to_model_registry(model: object, metrics: dict) -> None:
    """Report the model-registry context this step is attached to.

    The model and metrics are tracked automatically in the model registry
    through the model context configured on the step decorator.
    """
    model_context = get_step_context().model

    print(f"Model name: {model_context.name}")
    print(f"Model version: {model_context.version}")

Common Use Cases

Conditional Logic Based on Environment

Make decisions based on the active stack:
@step
def environment_aware_step(data: dict) -> dict:
    """Pick a processing strategy based on the active orchestrator."""
    context = get_step_context()

    # Kubernetes gets the distributed path; everything else runs single-node.
    on_kubernetes = context.stack.orchestrator.flavor == "kubernetes"
    if on_kubernetes:
        print("Running on Kubernetes, using distributed processing")
        return process_distributed(data)

    print("Running locally, using single-node processing")
    return process_local(data)

Dynamic Artifact Naming

Create artifact names based on runtime information:
from datetime import datetime

@step
def save_with_dynamic_name(data: dict) -> dict:
    """Build a timestamped artifact name from runtime context."""
    context = get_step_context()

    # Combine the run name with a wall-clock timestamp for uniqueness.
    run_name = context.pipeline_run.name
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    artifact_name = f"data_{run_name}_{timestamp}"

    print(f"Saving as: {artifact_name}")

    # Your save logic here
    return data

Accessing Input Artifacts

Get information about input artifacts:
@step
def analyze_input_artifacts(data: dict) -> dict:
    """Print ID and type metadata for each of this step run's inputs."""
    context = get_step_context()

    # Inputs are keyed by parameter name; values are artifact objects.
    inputs = context.step_run.inputs
    for input_name, input_artifact in inputs.items():
        print(f"Input '{input_name}':")
        print(f"  Artifact ID: {input_artifact.id}")
        print(f"  Artifact type: {input_artifact.type}")

    return {"inputs_analyzed": len(inputs)}

Logging Custom Metadata

Add custom metadata to step runs:
@step
def step_with_metadata(data: dict) -> dict:
    """Process data, then attach custom metadata to the output artifact."""
    context = get_step_context()

    result = process_data(data)

    # Record processing stats against the "result" output so they are
    # tracked alongside the artifact.
    custom_metadata = {
        "num_records": len(result),
        "processing_time": 1.23,
        "quality_score": 0.95,
    }
    context.add_output_metadata(output_name="result", metadata=custom_metadata)

    return result

Working with Experiment Trackers

Integrate with experiment tracking systems:
@step
def train_with_tracking(
    train_data: dict,
    learning_rate: float = 0.001
) -> object:
    """Train a model, logging params and metrics when a tracker exists.

    Args:
        train_data: Training dataset (keyed records).
        learning_rate: Optimizer learning rate.

    Returns:
        The trained model object.
    """
    context = get_step_context()

    # Resolve the optional tracker once. The original re-read the stack for
    # the second check but reused the `tracker` name bound only inside the
    # first `if`, which duplicates the lookup and risks a NameError if the
    # two checks ever disagreed.
    tracker = context.stack.experiment_tracker
    if tracker:
        print(f"Logging to: {tracker.name}")
        # Log parameters before training.
        tracker.log_params({
            "learning_rate": learning_rate,
            "dataset_size": len(train_data)
        })

    # Train model
    model = train_model(train_data, learning_rate)

    if tracker:
        # Log metrics after training.
        tracker.log_metrics({
            "train_accuracy": 0.95,
            "train_loss": 0.05
        })

    return model

Accessing Configuration

Retrieve step and pipeline configuration:
@step
def config_aware_step() -> dict:
    """Read and report this step's own configuration."""
    context = get_step_context()

    step_config = context.step_run.config
    print(f"Step name: {step_config.name}")
    print(f"Enable cache: {step_config.enable_cache}")

    # Resource settings exist only when explicitly configured.
    resources = step_config.resource_settings
    if resources:
        print(f"CPU count: {resources.cpu_count}")
        print(f"Memory: {resources.memory}")

    return {"configured": True}

Pipeline State Management

Share state between steps using the run context:
from zenml import step, get_step_context

@step
def step_one() -> str:
    """Seed the run-level shared state for downstream steps."""
    run_context = get_step_context().run_context

    # Shared state is scoped to this pipeline run only.
    run_context.state = {"initialized": True, "version": "1.0"}

    return "initialized"

@step
def step_two(input_data: str) -> dict:
    """Read the version recorded in the shared run state, if any."""
    run_context = get_step_context().run_context

    # Fall back to "unknown" when no state was set earlier in the run.
    version = "unknown"
    if run_context.state:
        print(f"Pipeline state: {run_context.state}")
        version = run_context.state.get("version", "unknown")

    return {"version": version}

Error Handling with Context

Use context for better error messages:
@step
def robust_step(data: dict) -> dict:
    """Run a risky operation, wrapping failures with pipeline context.

    Args:
        data: Input payload passed straight to the risky operation.

    Returns:
        Whatever the risky operation produces.

    Raises:
        RuntimeError: If the operation fails; the message identifies the
            step, pipeline, and run, and the original exception is chained
            as the cause.
    """
    context = get_step_context()

    try:
        return risky_operation(data)
    except Exception as e:
        # Include context in the error message for easier debugging.
        error_msg = (
            f"Error in step '{context.step_name}' "
            f"of pipeline '{context.pipeline.name}' "
            f"(run: {context.pipeline_run.name}): {str(e)}"
        )
        print(error_msg)
        # BUGFIX: chain the original exception so the traceback preserves
        # the real cause instead of reporting a bare RuntimeError.
        raise RuntimeError(error_msg) from e

Best Practices

Use Context Judiciously

Only access context when needed. Don’t make every step context-dependent.

Cache Context Reference

Call get_step_context() once and reuse the reference within the step.

Handle Missing Components

Always check if optional stack components exist before using them.

Document Context Usage

Clearly document when steps depend on specific stack components.

Context Properties Reference

Available Properties

context = get_step_context()

# Step information
context.step_name                    # Name of current step
context.step_run                     # Current step run object
context.step_run.id                  # Step run ID
context.step_run.config              # Step configuration
context.step_run.inputs              # Input artifacts
context.step_run.outputs             # Output artifacts

# Pipeline information
context.pipeline                     # Pipeline object
context.pipeline.name                # Pipeline name
context.pipeline_run                 # Current pipeline run
context.pipeline_run.id              # Pipeline run ID
context.pipeline_run.name            # Pipeline run name
context.pipeline_run.config          # Pipeline configuration

# Stack components
context.stack                        # Active stack
context.stack.name                   # Stack name
context.stack.artifact_store         # Artifact store component
context.stack.orchestrator           # Orchestrator component
context.stack.experiment_tracker     # Experiment tracker (optional)
context.stack.model_deployer         # Model deployer (optional)

# Model context (if configured)
context.model                        # Model context
context.model.name                   # Model name
context.model.version                # Model version

# Run context (for sharing state)
context.run_context                  # Run-level context
context.run_context.state            # Shared state object

Limitations

Important limitations of step context:
  • Context is only available during step execution
  • Cannot be accessed outside of step functions
  • Run context state is not persisted between pipeline runs
  • Some properties may be None if not configured (optional components)

Next Steps

Writing Steps

Learn the fundamentals of creating pipeline steps

Artifact Management

Understand how artifacts are tracked and accessed

Stack Configuration

Learn about configuring stack components

Build docs developers (and LLMs) love