Pass inputs to parallel steps without mapping over them
The `unmapped()` function wraps a value so that, when a step is executed in parallel with `.map()`, the wrapped value is passed in full to every invocation instead of being split element-wise across invocations.
from zenml import pipeline, step, unmapped


@step
def process_item(item: int, config: dict) -> dict:
    """Multiply a single item by the configured multiplier.

    `item` varies per invocation (mapped); `config` is the same for
    all invocations (unmapped).
    """
    return {
        "item": item,
        "multiplier": config["multiplier"],
        "result": item * config["multiplier"],
    }


@pipeline(dynamic=True)
def parallel_pipeline():
    """Fan out over `items` while sharing one config dict."""
    items = [1, 2, 3, 4, 5]
    config = {"multiplier": 10}

    # Map over items, but pass config unchanged to each invocation.
    results = process_item.map(
        item=items,
        config=unmapped(config),  # Same config for all
    )
from zenml import pipeline, step, unmapped
import pandas as pd
from typing import Any


@step
def train_model() -> Any:
    """Train a model once; the result is reused by every batch."""
    from sklearn.ensemble import RandomForestClassifier

    model = RandomForestClassifier()
    # Training logic...
    return model


@step
def predict_batch(data_batch: pd.DataFrame, model: Any) -> pd.DataFrame:
    """Predict on one batch; the same model is used for all batches."""
    predictions = model.predict(data_batch)
    data_batch["predictions"] = predictions
    return data_batch


@pipeline(dynamic=True)
def prediction_pipeline():
    """Train once, then fan out predictions over many batches."""
    # Train once
    model = train_model()

    # Create multiple data batches
    batches = [
        pd.DataFrame({"feature": range(i * 100, (i + 1) * 100)})
        for i in range(10)
    ]

    # Predict on all batches with the same model
    results = predict_batch.map(
        data_batch=batches,
        model=unmapped(model),  # Same model for all batches
    )
from zenml import pipeline, step, unmapped
import pandas as pd
from typing import List


@step
def transform_data(
    input_data: pd.DataFrame,
    transformation: str,
    reference_values: List[float],
) -> pd.DataFrame:
    """Apply one named transformation to a dataset.

    `input_data` and `transformation` vary per invocation (mapped);
    `reference_values` are the same for all invocations (unmapped).
    """
    if transformation == "normalize":
        return input_data / reference_values[0]
    elif transformation == "standardize":
        return (input_data - reference_values[1]) / reference_values[2]
    else:
        return input_data


@pipeline(dynamic=True)
def transformation_pipeline():
    """Fan out over datasets/transformations with shared statistics."""
    datasets = [
        pd.DataFrame({"value": [1, 2, 3]}),
        pd.DataFrame({"value": [4, 5, 6]}),
        pd.DataFrame({"value": [7, 8, 9]}),
    ]
    transformations = ["normalize", "standardize", "none"]

    # These statistics are computed once and used for all
    reference_values = [100.0, 5.0, 2.0]  # max, mean, std

    results = transform_data.map(
        input_data=datasets,  # Mapped: different for each
        transformation=transformations,  # Mapped: different for each
        reference_values=unmapped(reference_values),  # Unmapped: same for all
    )