The get_step_context() function provides access to runtime information during step execution, including the current pipeline run, step run, and output artifact URIs.
Signature
def get_step_context () -> StepContext
Returns
A context object containing step runtime information.
StepContext Properties
step_run
Information about the current step run.
pipeline_run
Information about the current pipeline run.
step_name
The name of the current step.
pipeline
Information about the pipeline.
model
The model configuration if set in the step or pipeline.
model_version
Optional[ModelVersionResponse]
The model version response object.
StepContext Methods
get_output_artifact_uri
def get_output_artifact_uri ( output_name : Optional[ str ] = None ) -> str
Get the URI where an output artifact will be stored.
Parameters:
output_name (Optional[str]): Name of the output. Required if the step has multiple outputs.
Returns: The URI (as a string) at which the output artifact will be stored.
def add_output_metadata (
metadata : Dict[ str , MetadataType],
output_name : Optional[ str ] = None
) -> None
Add metadata to a step output artifact. output_name is required if the step has multiple outputs.
def add_output_tags (
tags : List[ str ],
output_name : Optional[ str ] = None
) -> None
Add tags to a step output artifact. output_name is required if the step has multiple outputs.
Examples
from zenml import step, get_step_context
@step
def info_step () -> None :
context = get_step_context()
print ( f "Step name: { context.step_name } " )
print ( f "Pipeline run ID: { context.pipeline_run.id } " )
print ( f "Step run ID: { context.step_run.id } " )
Get Output Artifact URI
from zenml import step, get_step_context
import pandas as pd
@step
def save_data () -> pd.DataFrame:
context = get_step_context()
# Get where the output will be stored
uri = context.get_output_artifact_uri()
print ( f "Output will be saved to: { uri } " )
return pd.DataFrame({ "data" : [ 1 , 2 , 3 ]})
Multiple Outputs
from typing import Tuple
from zenml import step, get_step_context
import pandas as pd
@step
def split_data () -> Tuple[pd.DataFrame, pd.DataFrame]:
context = get_step_context()
# Get URIs for specific outputs
train_uri = context.get_output_artifact_uri( "output_0" )
test_uri = context.get_output_artifact_uri( "output_1" )
print ( f "Train data URI: { train_uri } " )
print ( f "Test data URI: { test_uri } " )
train = pd.DataFrame({ "x" : [ 1 , 2 , 3 ]})
test = pd.DataFrame({ "x" : [ 4 , 5 ]})
return train, test
from zenml import step, get_step_context
import pandas as pd
@step
def process_data () -> pd.DataFrame:
df = pd.DataFrame({ "value" : range ( 100 )})
context = get_step_context()
context.add_output_metadata(
metadata = {
"row_count" : len (df),
"columns" : list (df.columns),
"mean_value" : float (df[ "value" ].mean())
}
)
return df
from zenml import step, get_step_context
import pandas as pd
@step
def create_dataset () -> pd.DataFrame:
df = pd.DataFrame({ "feature" : range ( 1000 )})
context = get_step_context()
# Add tags based on data characteristics
if len (df) > 500 :
context.add_output_tags( tags = [ "large-dataset" , "production" ])
else :
context.add_output_tags( tags = [ "small-dataset" , "test" ])
return df
Access Model Context
from zenml import step, get_step_context, Model
@step ( model = Model( name = "my_model" , version = "1.0" ))
def train_model () -> None :
context = get_step_context()
if context.model:
print ( f "Training model: { context.model.name } " )
print ( f "Version: { context.model.version } " )
# Log metadata to the model
context.model.log_metadata({
"training_started" : True ,
"framework" : "sklearn"
})
Conditional Logic Based on Pipeline Run
from zenml import step, get_step_context
@step
def adaptive_step () -> dict :
context = get_step_context()
# Access pipeline run metadata
run_name = context.pipeline_run.name
if "test" in run_name.lower():
print ( "Running in test mode with reduced iterations" )
iterations = 10
else :
print ( "Running in production mode with full iterations" )
iterations = 1000
return { "iterations" : iterations}
Track Step Duration
import time
from zenml import step, get_step_context
@step
def long_running_step () -> None :
context = get_step_context()
start_time = time.time()
# Simulate work
time.sleep( 5 )
duration = time.time() - start_time
# Log duration as metadata
context.add_output_metadata(
metadata = { "duration_seconds" : duration}
)
from typing import Tuple, Annotated
from zenml import step, get_step_context
import pandas as pd
@step
def create_datasets () -> Tuple[
Annotated[pd.DataFrame, "train" ],
Annotated[pd.DataFrame, "test" ],
]:
train_df = pd.DataFrame({ "x" : range ( 800 )})
test_df = pd.DataFrame({ "x" : range ( 200 )})
context = get_step_context()
# Add metadata to train output
context.add_output_metadata(
metadata = { "split" : "train" , "size" : len (train_df)},
output_name = "train"
)
# Add metadata to test output
context.add_output_metadata(
metadata = { "split" : "test" , "size" : len (test_df)},
output_name = "test"
)
return train_df, test_df
Access Pipeline Run Information
from zenml import step, get_step_context
import pandas as pd
@step
def analyze_previous_run () -> dict :
context = get_step_context()
# Access information about the pipeline run
pipeline_run = context.pipeline_run
# Get information about all steps in this run
stats = {
"total_steps" : len (pipeline_run.steps),
"pipeline_name" : context.pipeline.name,
"run_id" : str (pipeline_run.id)
}
return stats
Important Notes
get_step_context() is only available during step execution (runtime).
It is not available during pipeline definition (design time).
To access pipeline configuration during definition, use get_pipeline_context() instead.
It raises a RuntimeError if called outside of a step execution.
Common Errors
Called Outside Step
# This will raise RuntimeError
from zenml import get_step_context
context = get_step_context() # Error: No step is currently running
Called During Pipeline Definition
from zenml import pipeline, step, get_step_context
@step
def my_step () -> None :
pass
@pipeline
def my_pipeline ():
# This will raise RuntimeError
context = get_step_context() # Error: Use get_pipeline_context instead
my_step()
See also
@step: Learn about creating steps.
get_pipeline_context: Access the pipeline context.
log_metadata: Log metadata to various resources.