Skip to main content
The unmapped() function wraps a value so it will be passed in full to each invocation when using parallel step execution with .map(), rather than being distributed across invocations.

Signature

def unmapped(value: T) -> _Unmapped[T]

Parameters

value
T
required
The value to wrap. This value will be passed unchanged to all step invocations.

Returns

wrapped
_Unmapped[T]
A wrapped version of the value that signals it should not be mapped over.

Examples

Basic Parallel Execution

from zenml import pipeline, step, unmapped

@step
def process_item(item: int, config: dict) -> dict:
    """Process one item using a config dict shared by every invocation."""
    # item varies per invocation; config arrives unchanged via unmapped().
    multiplier = config["multiplier"]
    return {
        "item": item,
        "multiplier": multiplier,
        "result": item * multiplier,
    }

@pipeline(dynamic=True)
def parallel_pipeline():
    """Fan out over five items while sharing a single config dict."""
    config = {"multiplier": 10}
    # items is distributed one-per-invocation; unmapped() keeps config whole.
    items = [1, 2, 3, 4, 5]
    results = process_item.map(
        item=items,
        config=unmapped(config),  # identical config for every invocation
    )

Multiple Unmapped Parameters

from zenml import pipeline, step, unmapped
import pandas as pd

@step
def process_chunk(chunk_id: int, model_config: dict, preprocessing_params: dict) -> pd.DataFrame:
    """Process a single chunk; both config dicts are identical across chunks."""
    # Only chunk_id differs between invocations.
    print(f"Processing chunk {chunk_id}")
    print(f"Model: {model_config['name']}")
    print(f"Preprocessing: {preprocessing_params}")
    return pd.DataFrame({"chunk": [chunk_id]})

@pipeline(dynamic=True)
def batch_pipeline():
    """Process ten chunks in parallel with shared model/preprocessing configs."""
    model_config = {
        "name": "RandomForest",
        "n_estimators": 100,
        "max_depth": 10,
    }
    preprocessing_params = {
        "normalize": True,
        "handle_missing": "mean",
    }
    # chunk_id is mapped (one per invocation); both configs pass through
    # unchanged to every invocation.
    results = process_chunk.map(
        chunk_id=list(range(1, 11)),
        model_config=unmapped(model_config),
        preprocessing_params=unmapped(preprocessing_params),
    )

Shared Model for Predictions

from zenml import pipeline, step, unmapped
import pandas as pd
from typing import Any

@step
def train_model() -> Any:
    """Train a classifier once; the fitted model is then shared across batches."""
    from sklearn.ensemble import RandomForestClassifier
    model = RandomForestClassifier()
    # Training logic...
    return model

@step
def predict_batch(
    data_batch: pd.DataFrame,
    model: Any
) -> pd.DataFrame:
    """Run the shared model on one batch and return the batch with predictions.

    Args:
        data_batch: Feature rows for this batch (mapped; varies per invocation).
        model: Trained estimator shared across invocations via unmapped().

    Returns:
        A copy of ``data_batch`` with an added ``predictions`` column.
    """
    # Same model is used for all batches.
    predictions = model.predict(data_batch)
    # Work on a copy so the caller's DataFrame is never mutated in place —
    # the original assigned into data_batch directly, which teaches an
    # in-place-mutation pattern that is unsafe if the same object is ever
    # shared between parallel invocations.
    result = data_batch.copy()
    result["predictions"] = predictions
    return result

@pipeline(dynamic=True)
def prediction_pipeline():
    """Train once, then predict on ten data batches with the single model."""
    model = train_model()
    # Ten batches of 100 consecutive feature values each.
    batches = []
    for i in range(10):
        batches.append(pd.DataFrame({"feature": range(i * 100, (i + 1) * 100)}))
    # model is unmapped: every invocation receives the same trained model.
    results = predict_batch.map(data_batch=batches, model=unmapped(model))

Reference Data for All Items

from zenml import pipeline, step, unmapped
import pandas as pd

@step
def load_reference_data() -> pd.DataFrame:
    """Build the small id-to-category lookup table shared by every enrichment."""
    ids = [1, 2, 3]
    categories = ["A", "B", "C"]
    return pd.DataFrame({"id": ids, "category": categories})

@step
def enrich_item(
    item_id: int,
    reference: pd.DataFrame
) -> dict:
    """Look up the item's category in the shared reference table."""
    # reference is identical for every invocation (passed via unmapped()).
    matches = reference.loc[reference["id"] == item_id, "category"]
    return {"item_id": item_id, "category": matches.values[0]}

@pipeline(dynamic=True)
def enrichment_pipeline():
    """Enrich a list of item ids (duplicates allowed) against one lookup table."""
    reference = load_reference_data()
    # item_id is mapped; the reference DataFrame goes to every invocation
    # unchanged.
    item_ids = [1, 1, 2, 2, 3, 3]  # Some items repeated
    enriched = enrich_item.map(
        item_id=item_ids,
        reference=unmapped(reference),
    )

Global Configuration

from zenml import pipeline, step, unmapped

@step
def process_file(
    file_path: str,
    api_key: str,
    timeout: int
) -> dict:
    """Process one file using credentials and a timeout shared by all invocations."""
    # file_path varies per invocation; api_key and timeout do not.
    print(f"Processing {file_path}")
    print(f"Using API key: {api_key[:5]}...")
    print(f"Timeout: {timeout}s")
    return {"file": file_path, "status": "processed"}

@pipeline(dynamic=True)
def file_processing_pipeline():
    """Process four files in parallel with one shared API key and timeout."""
    files = [f"/data/file{i}.csv" for i in range(1, 5)]
    # NOTE: a real pipeline should pull credentials from a secret store
    # instead of hardcoding them in source.
    api_key = "secret_key_12345"
    timeout = 300
    # files is mapped; the credential and timeout go to every invocation
    # unchanged.
    results = process_file.map(
        file_path=files,
        api_key=unmapped(api_key),
        timeout=unmapped(timeout),
    )

Mixing Mapped and Unmapped

from zenml import pipeline, step, unmapped
import pandas as pd
from typing import List

@step
def transform_data(
    input_data: pd.DataFrame,
    transformation: str,
    reference_values: List[float]
) -> pd.DataFrame:
    """Apply one of several transformations using shared reference statistics."""
    # transformation is mapped (one per dataset); reference_values is
    # unmapped ([max, mean, std] — identical for every invocation).
    if transformation == "normalize":
        return input_data / reference_values[0]
    if transformation == "standardize":
        return (input_data - reference_values[1]) / reference_values[2]
    # Unknown transformation names pass the data through untouched.
    return input_data

@pipeline(dynamic=True)
def transformation_pipeline():
    """Apply a different transformation to each dataset, sharing the statistics."""
    datasets = [
        pd.DataFrame({"value": [1, 2, 3]}),
        pd.DataFrame({"value": [4, 5, 6]}),
        pd.DataFrame({"value": [7, 8, 9]}),
    ]
    # Computed once up front and reused by every invocation: max, mean, std.
    reference_values = [100.0, 5.0, 2.0]
    results = transform_data.map(
        input_data=datasets,                                  # mapped
        transformation=["normalize", "standardize", "none"],  # mapped
        reference_values=unmapped(reference_values),          # unmapped
    )

Load Balancing with Shared Resource

from zenml import pipeline, step, unmapped
from typing import Any

@step
def create_connection() -> Any:
    """Create the (expensive) database connection exactly once per pipeline run."""
    # Create expensive resource once
    # NOTE(review): create_database_connection() is an illustrative
    # placeholder; it is not defined in this example.
    return create_database_connection()

@step
def query_data(
    query_id: int,
    connection: Any
) -> dict:
    """Run one query through the connection shared by all invocations.

    Args:
        query_id: Id to look up; mapped, so it varies per invocation.
        connection: Database connection shared across invocations via unmapped().

    Returns:
        Dict containing the query id and its result set.
    """
    # Use a parameterized query (DB-API qmark style) rather than
    # interpolating query_id into the SQL string — string-built SQL is the
    # classic injection pattern, even when the value happens to be an int.
    result = connection.execute(
        "SELECT * FROM table WHERE id = ?", (query_id,)
    )
    return {"query_id": query_id, "result": result}

@pipeline(dynamic=True)
def distributed_query_pipeline():
    """Run 100 queries in parallel, all through one shared connection."""
    # The connection is created exactly once, then shared.
    conn = create_connection()
    results = query_data.map(
        query_id=list(range(1, 101)),  # 100 queries, one per invocation
        connection=unmapped(conn),     # same connection object for all
    )

Complex Example with Multiple Patterns

from zenml import pipeline, step, unmapped
import pandas as pd
from typing import Any

@step
def load_model() -> Any:
    """Load the pretrained model once so it can be shared across invocations."""
    # Load model once
    # NOTE(review): load_pretrained_model() is an illustrative placeholder;
    # it is not defined in this example.
    return load_pretrained_model()

@step
def load_tokenizer() -> Any:
    """Load the pretrained tokenizer once so it can be shared across invocations."""
    # Load tokenizer once
    # NOTE(review): load_pretrained_tokenizer() is an illustrative
    # placeholder; it is not defined in this example.
    return load_pretrained_tokenizer()

@step
def process_text(
    text: str,
    batch_id: int,
    model: Any,
    tokenizer: Any,
    max_length: int
) -> dict:
    """Tokenize one text and run the shared model over it.

    text and batch_id are mapped (vary per invocation); model, tokenizer,
    and max_length are unmapped (identical for every invocation).
    """
    tokens = tokenizer.encode(text, max_length=max_length)
    output = model(tokens)
    return {"batch_id": batch_id, "text": text, "output": output}

@pipeline(dynamic=True)
def nlp_pipeline():
    """Process several texts in parallel with one shared model and tokenizer."""
    # Expensive resources are loaded exactly once, then shared via unmapped().
    model = load_model()
    tokenizer = load_tokenizer()
    texts = [
        "Hello world",
        "Machine learning is great",
        "ZenML makes pipelines easy",
        "Dynamic pipelines are powerful",
    ]
    results = process_text.map(
        text=texts,                     # mapped
        batch_id=[1, 2, 3, 4],          # mapped
        model=unmapped(model),          # unmapped
        tokenizer=unmapped(tokenizer),  # unmapped
        max_length=unmapped(512),       # unmapped
    )

Important Notes

  • unmapped() only works with .map() calls in dynamic pipelines
  • Without unmapped(), all parameters are distributed across invocations
  • The wrapped value is passed by reference (not copied) to each invocation
  • Use unmapped() for expensive resources that should be shared (models, connections, configurations)
  • A list passed to .map() without unmapped() is treated as mapped input and distributed across invocations; to pass an entire list unchanged to every invocation, wrap the list itself in unmapped()

Use Cases

  1. Share trained models across prediction batches
  2. Reuse database connections for multiple queries
  3. Pass global configuration to all parallel steps
  4. Share reference data for enrichment operations
  5. Distribute API keys or credentials to all workers
  6. Use common preprocessing parameters across batches
  7. Share expensive resources to optimize performance

@pipeline

Learn about dynamic pipelines

@step

Learn about creating steps

Build docs developers (and LLMs) love