Orchestrators are responsible for executing your pipeline steps on various infrastructure backends. Building a custom orchestrator allows you to integrate ZenML with any execution backend, from cloud platforms to on-premise clusters.

Understanding Orchestrators

Orchestrators manage the execution flow of your pipelines. They handle:
  • Submitting pipeline runs to execution backends
  • Managing step dependencies and execution order
  • Providing unique run identifiers for tracking
  • Supporting both static and dynamic pipeline execution
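
As a rough illustration of "managing step dependencies and execution order", the sketch below orders a small DAG with Python's standard-library `graphlib`. The `execution_order` helper and the step names are hypothetical and not part of ZenML; a real orchestrator would usually delegate ordering to its backend.

```python
from graphlib import TopologicalSorter
from typing import Dict, List, Set


def execution_order(upstream: Dict[str, Set[str]]) -> List[str]:
    """Order steps so that every step comes after its upstream steps.

    `upstream` maps each step name to the set of steps it depends on.
    """
    return list(TopologicalSorter(upstream).static_order())


# Hypothetical three-step pipeline (names are illustrative only).
dag = {
    "load_data": set(),
    "process_data": {"load_data"},
    "train": {"process_data"},
}
order = execution_order(dag)
```

Because this DAG is a simple chain, there is only one valid order. For wider DAGs, several orders can be valid and independent steps may run in parallel.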

BaseOrchestrator Interface

All orchestrators inherit from BaseOrchestrator and must implement the abstract method get_orchestrator_run_id().

Static vs Dynamic Pipelines

ZenML supports two execution modes:
  • Static pipelines: The complete DAG (directed acyclic graph) is known before execution begins. Steps are submitted individually to the backend.
  • Dynamic pipelines: The DAG can change during execution. An orchestration container runs first to determine which steps to execute.
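
The difference between the two modes can be sketched with a toy example. This is not ZenML code: `run_static`, `run_dynamic`, and `choose_next` are hypothetical names, and "running" a step is reduced to recording its name.

```python
from typing import Callable, List, Optional


def run_static(plan: List[str]) -> List[str]:
    """Static mode: the full step list is fixed before anything runs."""
    return list(plan)


def run_dynamic(choose_next: Callable[[List[str]], Optional[str]]) -> List[str]:
    """Dynamic mode: a coordinator picks each next step at runtime."""
    executed: List[str] = []
    next_step = choose_next(executed)
    while next_step is not None:
        executed.append(next_step)
        next_step = choose_next(executed)
    return executed


def choose_next(executed: List[str]) -> Optional[str]:
    """Hypothetical policy: load data, then make two training attempts."""
    if not executed:
        return "load_data"
    if len(executed) < 3:
        return f"train_attempt_{len(executed)}"
    return None


static_steps = run_static(["load_data", "train"])
dynamic_steps = run_dynamic(choose_next)
```

In the dynamic case the list of steps only exists after the coordinator has finished, which is why an orchestration container has to run first.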

Implementing Your Orchestrator

Step 1: Create the Configuration Class

Define configuration options for your orchestrator:
from zenml.orchestrators import BaseOrchestratorConfig
from pydantic import Field

class MyOrchestratorConfig(BaseOrchestratorConfig):
    """Configuration for my custom orchestrator."""
    
    region: str = Field(
        description="Cloud region for execution"
    )
    
    execution_role: str = Field(
        description="IAM role for pipeline execution"
    )
    
    @property
    def is_synchronous(self) -> bool:
        """Whether the orchestrator runs synchronously.
        
        Returns:
            False for cloud orchestrators that submit async jobs.
        """
        return False
    
    @property
    def is_schedulable(self) -> bool:
        """Whether the orchestrator supports scheduling.
        
        Returns:
            True if your backend supports cron schedules.
        """
        return True

Step 2: Implement the Orchestrator Class

For containerized orchestrators (most cloud platforms), inherit from ContainerizedOrchestrator:
import os
from typing import Dict, Optional, cast
from zenml.orchestrators import ContainerizedOrchestrator, SubmissionResult
from zenml.models import PipelineRunResponse, PipelineSnapshotResponse
from zenml.stack import Stack

class MyOrchestrator(ContainerizedOrchestrator):
    """Custom orchestrator implementation."""
    
    @property
    def config(self) -> MyOrchestratorConfig:
        """Returns the orchestrator configuration."""
        return cast(MyOrchestratorConfig, self._config)
    
    def get_orchestrator_run_id(self) -> str:
        """Returns the run ID of the active orchestrator run.
        
        This is the most critical method to implement correctly.
        
        Requirements:
        - Must return the SAME value for all steps in a pipeline run
        - Must be UNIQUE across different runs
        - Limited to ~250 characters (MySQL column limit)
        
        Returns:
            The unique orchestrator run ID.
        """
        # For static pipelines: read from environment variable
        # set by your orchestrator when launching steps
        if "MY_ORCHESTRATOR_RUN_ID" in os.environ:
            return os.environ["MY_ORCHESTRATOR_RUN_ID"]
        
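        # For dynamic pipelines: use a container-specific identifier.
        # This only needs to be unique within the orchestration container.
        # Fail loudly instead of falling back to a fixed string, which
        # would collide across runs.
        try:
            return os.environ["CONTAINER_ID"]
        except KeyError:
            raise RuntimeError("Unable to determine orchestrator run ID")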
    
    def submit_pipeline(
        self,
        snapshot: PipelineSnapshotResponse,
        stack: Stack,
        base_environment: Dict[str, str],
        step_environments: Dict[str, Dict[str, str]],
        placeholder_run: Optional[PipelineRunResponse] = None,
    ) -> Optional[SubmissionResult]:
        """Submit a static pipeline to the orchestration backend.
        
        Args:
            snapshot: Pipeline snapshot with step configurations
            stack: The stack the pipeline will run on
            base_environment: Environment variables for all steps
            step_environments: Per-step environment variables
            placeholder_run: Optional placeholder for the pipeline run
            
        Returns:
            Optional submission result with wait callback.
        """
        from zenml.orchestrators import SubmissionResult
        
        # Generate unique run ID for this pipeline execution
        orchestrator_run_id = self._generate_run_id()
        
        # Prepare steps for execution
        for invocation_id, step in snapshot.step_configurations.items():
            # Get the Docker image for this step
            image = self.get_image(snapshot, step.config.name)
            
            # Get environment variables for this step
            env = step_environments[invocation_id].copy()
            env["MY_ORCHESTRATOR_RUN_ID"] = orchestrator_run_id
            
            # Get resource requirements
            resources = step.config.resource_settings
            
            # Submit step to your backend
            self._submit_step(
                step_name=step.config.name,
                image=image,
                environment=env,
                cpu_count=resources.cpu_count,
                memory=resources.memory,
                gpu_count=resources.gpu_count,
            )
        
        # Return result with optional wait callback
        return SubmissionResult(
            wait_for_completion=lambda: self._wait_for_run(orchestrator_run_id)
        )
    
    def submit_dynamic_pipeline(
        self,
        snapshot: PipelineSnapshotResponse,
        stack: Stack,
        environment: Dict[str, str],
        placeholder_run: Optional[PipelineRunResponse] = None,
    ) -> Optional[SubmissionResult]:
        """Submit a dynamic pipeline to the orchestration backend.
        
        For dynamic pipelines, submit an orchestration container that will
        determine and launch the actual pipeline steps at runtime.
        
        Args:
            snapshot: Pipeline snapshot
            stack: The stack the pipeline will run on
            environment: Environment variables for orchestration container
            placeholder_run: Optional placeholder for the pipeline run
            
        Returns:
            Optional submission result.
        """
        from zenml.orchestrators import SubmissionResult
        
        # Get image for orchestration container
        image = self.get_image(snapshot)
        
        # Submit orchestration container
        job_id = self._submit_orchestration_job(
            image=image,
            environment=environment,
        )
        
        return SubmissionResult(
            wait_for_completion=lambda: self._wait_for_job(job_id)
        )
    
    def _generate_run_id(self) -> str:
        """Generate a unique run ID for the pipeline."""
        import uuid
        return f"run-{uuid.uuid4().hex[:12]}"
    
    def _submit_step(
        self,
        step_name: str,
        image: str,
        environment: Dict[str, str],
        cpu_count: Optional[float],
        memory: Optional[str],
        gpu_count: Optional[int],
    ) -> None:
        """Submit a single step to your execution backend."""
        # Implementation depends on your backend
        # Examples: boto3 for AWS, google.cloud for GCP, kubernetes for K8s
        pass
    
    def _submit_orchestration_job(self, image: str, environment: Dict[str, str]) -> str:
        """Submit orchestration container for dynamic pipelines."""
        # Submit a job that will run the orchestration logic
        pass
    
    def _wait_for_run(self, run_id: str) -> None:
        """Wait for a pipeline run to complete."""
        # Poll your backend until the run finishes
        pass
    
    def _wait_for_job(self, job_id: str) -> None:
        """Wait for an orchestration job to complete."""
        pass
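
The wait helpers above are left as stubs. One possible shape for them is a polling loop with a timeout, sketched below under the assumption that your backend exposes some way to read a job status. The `get_status` callable, the state names in `TERMINAL_STATES`, and the default intervals are all hypothetical.

```python
import time
from typing import Callable

# Hypothetical terminal states; use whatever your backend reports.
TERMINAL_STATES = {"succeeded", "failed", "cancelled"}


def wait_for_terminal_state(
    get_status: Callable[[], str],
    poll_interval: float = 5.0,
    timeout: float = 3600.0,
    sleep: Callable[[float], None] = time.sleep,
) -> str:
    """Poll `get_status` until it returns a terminal state or time runs out."""
    deadline = time.monotonic() + timeout
    while True:
        status = get_status()
        if status in TERMINAL_STATES:
            return status
        if time.monotonic() >= deadline:
            raise TimeoutError(f"Run still '{status}' after {timeout} seconds")
        sleep(poll_interval)


# Usage with a fake backend that finishes on the third poll.
fake_statuses = iter(["pending", "running", "succeeded"])
final_status = wait_for_terminal_state(
    lambda: next(fake_statuses), poll_interval=0.0, sleep=lambda _: None
)
```

Passing `sleep` as a parameter keeps the loop testable without real delays. In `_wait_for_run`, you would likely raise an error when the final state is not a success.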

Step 3: Create the Flavor Class

from typing import Type, TYPE_CHECKING
from zenml.orchestrators import BaseOrchestratorFlavor

if TYPE_CHECKING:
    from zenml.orchestrators import BaseOrchestrator

class MyOrchestratorFlavor(BaseOrchestratorFlavor):
    """Flavor for my custom orchestrator."""
    
    @property
    def name(self) -> str:
        """The flavor name."""
        return "my_orchestrator"
    
    @property
    def config_class(self) -> Type[MyOrchestratorConfig]:
        """The configuration class."""
        return MyOrchestratorConfig
    
    @property
    def implementation_class(self) -> Type["BaseOrchestrator"]:
        """The implementation class."""
        from my_integration.orchestrators import MyOrchestrator
        return MyOrchestrator

The Tricky Part: get_orchestrator_run_id()

This method is where most implementers struggle. Let’s understand why it’s needed and how to implement it correctly.

Why It’s Needed

In static pipelines, steps start executing immediately without a central coordinator. The first step to execute needs to:
  1. Create a pipeline run in ZenML’s database
  2. Share the run ID with all other steps
The orchestrator run ID serves as the “meeting point” - all steps use it to find the same pipeline run.
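
The "meeting point" idea can be sketched as a get-or-create lookup keyed by the orchestrator run ID. The `RunRegistry` class below is an in-memory stand-in invented for illustration; it does not reflect how ZenML stores runs internally.

```python
import uuid
from typing import Dict


class RunRegistry:
    """Hypothetical in-memory stand-in for the pipeline run table."""

    def __init__(self) -> None:
        self._runs: Dict[str, str] = {}

    def get_or_create_run(self, orchestrator_run_id: str) -> str:
        """Return the run for this orchestrator run ID, creating it once."""
        if orchestrator_run_id not in self._runs:
            self._runs[orchestrator_run_id] = str(uuid.uuid4())
        return self._runs[orchestrator_run_id]


registry = RunRegistry()
# Two steps of the same execution share one orchestrator run ID.
run_seen_by_step_1 = registry.get_or_create_run("run-abc123")
run_seen_by_step_2 = registry.get_or_create_run("run-abc123")
# A different execution gets its own pipeline run.
run_of_other_execution = registry.get_or_create_run("run-def456")
```

If two steps of the same execution reported different IDs, each would create its own pipeline run and the execution would be split across several runs.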

Key Requirements

Same for All Steps

Every step in a run MUST get the exact same ID. Don’t generate a new ID per step.

Unique Per Run

Different pipeline executions MUST get different IDs. Don’t return a fixed string.

From Backend

Use an ID provided by your orchestration backend (job ID, execution ARN, etc.).

Size Limit

Must be under ~250 characters due to database column constraints.
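
Some of these requirements can be checked mechanically before an ID is returned. The helper below is a minimal sketch; `validate_run_id` and `MAX_RUN_ID_LENGTH` are hypothetical names, and the limit of 250 is taken from the approximate figure above rather than from the database schema.

```python
# Approximate limit from the text above; check the schema for the exact value.
MAX_RUN_ID_LENGTH = 250


def validate_run_id(run_id: str) -> str:
    """Reject empty or oversized IDs and return valid ones unchanged."""
    if not run_id or not run_id.strip():
        raise ValueError("Orchestrator run ID must not be empty")
    if len(run_id) > MAX_RUN_ID_LENGTH:
        raise ValueError(
            f"Orchestrator run ID has {len(run_id)} characters; "
            f"the limit is {MAX_RUN_ID_LENGTH}"
        )
    return run_id


checked_id = validate_run_id("run-abc123")
```

A check like this cannot verify uniqueness across runs or consistency across steps; those depend on where the ID comes from.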

Implementation Patterns

For Kubernetes-based orchestrators:
import socket
import os

def get_orchestrator_run_id(self) -> str:
    # Static pipelines: use pre-set environment variable
    try:
        return os.environ["ZENML_KUBERNETES_RUN_ID"]
    except KeyError:
        # Dynamic pipelines: use pod name (unique per container)
        return socket.gethostname()
For cloud platforms (AWS, GCP, Azure):
import os
import json

def get_orchestrator_run_id(self) -> str:
    # Check environment variables
    for env_var in ["MY_PLATFORM_JOB_ID", "MY_PLATFORM_EXECUTION_ARN"]:
        if env_var in os.environ:
            return os.environ[env_var]
    
    # Fall back to config file if your platform provides one
    config_path = "/opt/platform/config.json"
    if os.path.exists(config_path):
        with open(config_path) as f:
            config = json.load(f)
            return config["job_id"]
    
    raise RuntimeError("Unable to determine orchestrator run ID")

Handling Resource Settings

Orchestrators should respect resource requirements from step configurations:
from zenml.config.resource_settings import ResourceSettings

def _apply_resources(self, step_config, backend_job_config):
    """Apply resource settings to backend job configuration."""
    resources: ResourceSettings = step_config.resource_settings
    
    # CPU configuration
    if resources.cpu_count:
        backend_job_config["cpu"] = resources.cpu_count
    
    # Memory configuration
    if resources.memory:
        # Convert to backend-specific format
        memory_mb = resources.get_memory(unit="MB")
        backend_job_config["memory_mb"] = int(memory_mb)
    
    # GPU configuration
    if resources.gpu_count:
        backend_job_config["gpu_count"] = resources.gpu_count
        # Some backends need GPU type specification
        backend_job_config["gpu_type"] = "nvidia-tesla-t4"
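
The same pattern can be tried out without ZenML installed. The sketch below uses a hypothetical `StandInResources` dataclass in place of `ResourceSettings` and a hypothetical `build_job_config` helper. It checks `is not None` so that unset values fall back to backend defaults.

```python
from dataclasses import dataclass
from typing import Any, Dict, Optional


@dataclass
class StandInResources:
    """Hypothetical stand-in for ResourceSettings with memory already in MB."""

    cpu_count: Optional[float] = None
    memory_mb: Optional[int] = None
    gpu_count: Optional[int] = None


def build_job_config(
    resources: StandInResources, defaults: Dict[str, Any]
) -> Dict[str, Any]:
    """Overlay explicitly set resources on top of backend defaults."""
    job_config = dict(defaults)  # copy so the defaults are not mutated
    if resources.cpu_count is not None:
        job_config["cpu"] = resources.cpu_count
    if resources.memory_mb is not None:
        job_config["memory_mb"] = resources.memory_mb
    if resources.gpu_count is not None:
        job_config["gpu_count"] = resources.gpu_count
    return job_config


defaults = {"cpu": 1, "memory_mb": 1024}
job_config = build_job_config(
    StandInResources(cpu_count=2, memory_mb=4096), defaults
)
```

Here the step asked for CPU and memory but no GPU, so the resulting config carries no GPU key and the backend can apply its own default.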

Testing Your Orchestrator

Create a simple test pipeline:
from zenml import pipeline, step

@step
def load_data() -> int:
    return 42

@step
def process_data(data: int) -> int:
    return data * 2

@pipeline
def test_pipeline():
    data = load_data()
    process_data(data)

# Run on the active stack, which must include your orchestrator
test_pipeline()

Common Pitfalls

Non-Unique IDs

Returning a fixed string or timestamp that could collide across runs

Different IDs Per Step

Generating a new ID for each step instead of sharing one ID across all steps

Missing Environment Variables

Not setting the orchestrator run ID in the environment when launching step containers

Ignoring Resource Settings

Not reading cpu_count, memory, and gpu_count from step configurations
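
The first three pitfalls can be avoided by generating the ID once per submission and copying it into every step's environment. The sketch below shows that idea in isolation; `inject_run_id` is a hypothetical helper, and the variable name matches the `MY_ORCHESTRATOR_RUN_ID` example used earlier.

```python
import uuid
from typing import Dict

RUN_ID_ENV_VAR = "MY_ORCHESTRATOR_RUN_ID"


def inject_run_id(
    step_environments: Dict[str, Dict[str, str]], run_id: str
) -> Dict[str, Dict[str, str]]:
    """Return copies of the step environments that all carry the same run ID."""
    return {
        step_name: {**env, RUN_ID_ENV_VAR: run_id}
        for step_name, env in step_environments.items()
    }


# Generate the ID once per submission, outside any per-step loop.
run_id = f"run-{uuid.uuid4().hex}"
step_envs = inject_run_id(
    {"load_data": {"LOG_LEVEL": "INFO"}, "process_data": {}}, run_id
)
```

Each step container can then read the variable in `get_orchestrator_run_id()` and will see the same value as every other step in that run.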

Reference Implementations

Study these real orchestrators for guidance:
  • Kubernetes Orchestrator: src/zenml/integrations/kubernetes/orchestrators/kubernetes_orchestrator.py
  • AWS SageMaker Orchestrator: src/zenml/integrations/aws/orchestrators/sagemaker_orchestrator.py
  • GCP Vertex Orchestrator: src/zenml/integrations/gcp/orchestrators/vertex_orchestrator.py

Next Steps

Resource Configuration

Learn about CPU, memory, and GPU settings

Containerization

Understand Docker image building and management

Dynamic Pipelines

Support pipelines with runtime-determined DAGs

Custom Materializers

Handle custom data types in your pipelines
