Skip to main content
Dynamic pipelines allow you to create execution graphs that adapt based on runtime conditions, data characteristics, or external inputs. Unlike static pipelines where the DAG is known before execution, dynamic pipelines determine their structure during runtime.

Static vs Dynamic Pipelines

Static Pipelines

In static pipelines, the complete execution graph is known before any step runs:
from zenml import pipeline, step

@step
def load_data() -> dict:
    return {"records": 100}

@step
def process_data(data: dict) -> dict:
    return {"processed": data["records"] * 2}

@step
def save_results(data: dict) -> None:
    print(f"Saved {data['processed']} records")

@pipeline
def static_pipeline():
    """All steps are known before execution starts."""
    data = load_data()
    processed = process_data(data)
    save_results(processed)
The orchestrator knows the full DAG:
load_data → process_data → save_results

Dynamic Pipelines

Dynamic pipelines determine which steps to run based on runtime conditions:
from zenml import pipeline, step
from zenml.config import ResourceSettings

@step
def check_data_size() -> int:
    """Determine data size at runtime."""
    # Could query database, check file size, etc.
    return 1000000  # 1M records

@step(settings={"resources": ResourceSettings(cpu_count=2, memory="4GB")})
def process_small_batch(batch_id: int) -> dict:
    return {"batch": batch_id, "processed": True}

@step(settings={"resources": ResourceSettings(cpu_count=8, memory="16GB")})
def process_large_batch(batch_id: int) -> dict:
    return {"batch": batch_id, "processed": True}

@pipeline
def dynamic_pipeline():
    """Steps are determined during execution."""
    size = check_data_size()
    
    # Branch based on runtime value
    if size < 10000:
        # Small dataset: single-threaded
        results = [process_small_batch(i) for i in range(3)]
    else:
        # Large dataset: parallel processing
        results = [process_large_batch(i) for i in range(10)]
    
    return results

How Dynamic Pipelines Work

Orchestration Container

For dynamic pipelines, orchestrators launch an orchestration container that:
  1. Runs the pipeline function to determine the execution graph
  2. Submits discovered steps to the orchestration backend
  3. Monitors step execution and handles failures
┌─────────────────────────────────────────┐
│     Orchestration Container             │
│                                         │
│  1. Run pipeline function               │
│  2. Discover steps dynamically          │
│  3. Submit steps to backend             │
└─────────────────────────────────────────┘

              ├──────┬──────┬──────┬──────┐
              ▼      ▼      ▼      ▼      ▼
          Step 1  Step 2  Step 3  Step 4  Step 5

Database Schema

Dynamic pipelines use a different database schema than static pipelines. The is_dynamic flag on pipeline snapshots indicates this:
# From migration: af27025fe19c_dynamic_pipelines.py

# Pipeline snapshots have is_dynamic flag
pipeline_snapshot.is_dynamic = True  # or False

# Step configurations can be linked to:
# - snapshot_id (static: all steps known upfront)
# - step_run_id (dynamic: steps discovered during execution)
This allows steps to be added to the pipeline as they’re discovered during execution.

Orchestrator Support

Not all orchestrators support dynamic pipelines. Check the supports_dynamic_pipelines property:
from zenml.orchestrators import BaseOrchestrator

class MyOrchestrator(BaseOrchestrator):
    @property
    def supports_dynamic_pipelines(self) -> bool:
        """Whether this orchestrator supports dynamic pipelines.
        
        Returns True if submit_dynamic_pipeline() is implemented.
        """
        return (
            getattr(self.submit_dynamic_pipeline, "__func__", None)
            is not BaseOrchestrator.submit_dynamic_pipeline
        )

Implementing Dynamic Pipeline Support

To support dynamic pipelines in your custom orchestrator:
from typing import Dict, Optional
from zenml.orchestrators import ContainerizedOrchestrator, SubmissionResult
from zenml.models import PipelineRunResponse, PipelineSnapshotResponse
from zenml.stack import Stack

class MyOrchestrator(ContainerizedOrchestrator):
    def submit_dynamic_pipeline(
        self,
        snapshot: PipelineSnapshotResponse,
        stack: Stack,
        environment: Dict[str, str],
        placeholder_run: Optional[PipelineRunResponse] = None,
    ) -> Optional[SubmissionResult]:
        """Submit a dynamic pipeline to the orchestration backend.
        
        Args:
            snapshot: Pipeline snapshot
            stack: The stack the pipeline runs on
            environment: Environment variables for orchestration container
            placeholder_run: Optional placeholder run
            
        Returns:
            Submission result with optional wait callback
        """
        # Get the Docker image for the orchestration container
        image = self.get_image(snapshot)
        
        # The orchestration container runs the pipeline function
        # which discovers and submits steps dynamically
        entrypoint = self._get_entrypoint_command()
        
        # Launch orchestration container
        job_id = self._submit_job(
            image=image,
            command=entrypoint,
            environment=environment,
            # Orchestration containers typically need more resources
            cpu_count=2,
            memory="4GB",
        )
        
        return SubmissionResult(
            wait_for_completion=lambda: self._wait_for_job(job_id)
        )
    
    def _get_entrypoint_command(self) -> list:
        """Get command to run pipeline orchestration."""
        return [
            "python",
            "-m",
            "zenml.orchestrators.pipeline_runner",
        ]

Advanced Dynamic Patterns

Conditional Branching

Execute different step sequences based on data characteristics:
@step
def analyze_data() -> str:
    """Analyze data and determine processing strategy."""
    # Could check data quality, size, distribution, etc.
    data_quality = check_quality()
    
    if data_quality < 0.5:
        return "needs_cleaning"
    elif data_quality < 0.8:
        return "needs_validation"
    else:
        return "ready"

@step
def clean_data(data: dict) -> dict:
    return {"cleaned": True}

@step
def validate_data(data: dict) -> dict:
    return {"validated": True}

@step
def process_data(data: dict) -> dict:
    return {"processed": True}

@pipeline
def adaptive_pipeline():
    """Adapt processing based on data quality."""
    quality = analyze_data()
    
    if quality == "needs_cleaning":
        data = clean_data()
        data = validate_data(data)
    elif quality == "needs_validation":
        data = validate_data()
    else:
        data = {"status": "ready"}
    
    result = process_data(data)
    return result

Dynamic Parallelism

Scale parallelism based on data characteristics:
@step
def count_partitions() -> int:
    """Determine optimal partition count."""
    total_records = get_record_count()
    optimal_batch_size = 10000
    return (total_records + optimal_batch_size - 1) // optimal_batch_size

@step
def process_partition(partition_id: int, total: int) -> dict:
    """Process a single partition."""
    return {
        "partition": partition_id,
        "total_partitions": total,
        "records_processed": 10000,
    }

@step
def merge_results(results: list) -> dict:
    """Merge results from all partitions."""
    total_processed = sum(r["records_processed"] for r in results)
    return {"total_processed": total_processed}

@pipeline
def dynamic_parallel_pipeline():
    """Process data with dynamic parallelism."""
    num_partitions = count_partitions()
    
    # Create partition steps dynamically
    results = [
        process_partition(i, num_partitions)
        for i in range(num_partitions)
    ]
    
    final_result = merge_results(results)
    return final_result

External Input Integration

Make decisions based on external services:
@step
def check_external_api() -> dict:
    """Query external API for configuration."""
    import requests
    
    response = requests.get("https://api.example.com/config")
    return response.json()

@step
def process_with_gpu(data: dict) -> dict:
    """GPU-accelerated processing."""
    return {"method": "gpu", "result": data}

@step
def process_with_cpu(data: dict) -> dict:
    """CPU-only processing."""
    return {"method": "cpu", "result": data}

@pipeline
def external_config_pipeline():
    """Use external configuration to determine execution."""
    config = check_external_api()
    
    if config.get("gpu_available"):
        result = process_with_gpu(config)
    else:
        result = process_with_cpu(config)
    
    return result

Resource Requirements

Orchestration Container Resources

The orchestration container typically needs minimal resources since it only coordinates execution:
class MyOrchestrator(ContainerizedOrchestrator):
    def submit_dynamic_pipeline(self, snapshot, stack, environment, placeholder_run):
        # Orchestration container: modest resources
        orchestration_job = self._submit_job(
            image=self.get_image(snapshot),
            cpu_count=1,  # Just needs to coordinate
            memory="2GB",
            environment=environment,
        )
        
        return SubmissionResult(
            wait_for_completion=lambda: self._wait(orchestration_job)
        )

Step Resources

Individual steps can have their own resource requirements:
from zenml import step
from zenml.config import ResourceSettings

@step(
    settings={
        "resources": ResourceSettings(
            cpu_count=8,
            memory="32GB",
            gpu_count=1,
        )
    }
)
def heavy_processing_step(data: dict) -> dict:
    """Step with significant resource requirements."""
    # GPU-accelerated processing
    return {"processed": True}

The get_orchestrator_run_id() Challenge

For dynamic pipelines, get_orchestrator_run_id() has simpler requirements than static pipelines:
import socket
import os

def get_orchestrator_run_id(self) -> str:
    """Return unique ID for orchestrator run.
    
    For dynamic pipelines:
    - Only needs to be unique within the orchestration container
    - Doesn't need to be shared across step containers
    - The orchestration container creates the pipeline run
    
    For static pipelines:
    - Must be the same for ALL steps in the pipeline run
    - Must be unique across different pipeline runs
    - Used by all steps to find the same pipeline run
    """
    # Static pipeline: use environment variable set when launching steps
    if "MY_ORCHESTRATOR_RUN_ID" in os.environ:
        return os.environ["MY_ORCHESTRATOR_RUN_ID"]
    
    # Dynamic pipeline: use container ID (only for orchestration container)
    return socket.gethostname()
The orchestration container creates the pipeline run, so individual steps don’t need to coordinate through the orchestrator run ID.

Best Practices

Minimize Orchestration Logic

Keep the orchestration container lightweight. Move heavy computation to steps.

Handle Failures Gracefully

Dynamic pipelines can fail during step discovery. Add error handling.

Document Decision Logic

Make it clear what runtime conditions trigger different execution paths.

Test Edge Cases

Test with boundary conditions that trigger different execution paths.

Testing Dynamic Pipelines

import pytest
from zenml import pipeline, step

@step
def get_size(test_size: int) -> int:
    """Allow injecting size for testing."""
    return test_size

@step
def process_small(data: int) -> str:
    return f"small-{data}"

@step
def process_large(data: int) -> str:
    return f"large-{data}"

@pipeline
def testable_dynamic_pipeline(test_size: int):
    size = get_size(test_size)
    
    if size < 100:
        return process_small(size)
    else:
        return process_large(size)

# Test different branches
def test_small_branch():
    result = testable_dynamic_pipeline(test_size=50)
    assert "small" in result

def test_large_branch():
    result = testable_dynamic_pipeline(test_size=200)
    assert "large" in result

Common Pitfalls

Non-Deterministic Pipelines

Avoid randomness in branching logic. Pipeline reruns should produce the same execution graph for the same inputs.

Expensive Orchestration

Don’t do heavy computation in the orchestration container. Move it to dedicated steps.

Assuming All Orchestrators Support Dynamic Pipelines

Always check orchestrator.supports_dynamic_pipelines before using dynamic features.

Next Steps

Custom Orchestrators

Build your own orchestrator with dynamic pipeline support

Resource Configuration

Configure CPU, memory, and GPU for dynamic steps

Containerization

Understand Docker image building for dynamic pipelines

Custom Materializers

Handle custom data types between dynamic steps

Build docs developers (and LLMs) love