Overview

The BaseOrchestrator is the abstract base class for all orchestrators in ZenML. Orchestrators are responsible for executing pipelines by coordinating the execution of individual steps. They determine where and how pipeline steps run, whether locally, in containers, or on remote infrastructure.

Class Definition

from zenml.orchestrators import BaseOrchestrator

Inheritance

  • Inherits from: StackComponent, ABC
  • Component Type: StackComponentType.ORCHESTRATOR

Key Concepts

Pipeline Submission

Orchestrators submit pipelines for execution using one of two methods:
  • Static Pipelines: Use submit_pipeline() when the pipeline DAG is known at submission time
  • Dynamic Pipelines: Use submit_dynamic_pipeline() when the pipeline DAG can change during execution

Orchestrator Run ID

Every orchestrator must implement get_orchestrator_run_id(), which returns a unique identifier that:
  • Is the same for all steps within a single pipeline run
  • Is unique across different pipeline runs
  • Can be used to track and correlate step executions
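As an illustration, a minimal implementation might read an identifier that the orchestration platform injects into the environment of every step it launches. This is a hedged sketch: the variable name CUSTOM_RUN_ID is hypothetical, and real backends expose their own run identifiers.

```python
import os


def get_orchestrator_run_id() -> str:
    # Hypothetical sketch: the platform is assumed to set CUSTOM_RUN_ID in
    # every step's environment, so the value is identical for all steps of
    # one run and unique across runs.
    try:
        return os.environ["CUSTOM_RUN_ID"]
    except KeyError:
        raise RuntimeError(
            "CUSTOM_RUN_ID is not set; unable to determine the "
            "orchestrator run ID."
        )
```

Because the value comes from the execution environment, every step of the same run resolves the same ID without any coordination between steps.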

Configuration

  • enable_cache (bool, default: True): Whether caching is enabled for steps in pipelines run by this orchestrator.
  • is_synchronous (bool, default: False): Whether the orchestrator runs synchronously (waits for the run to complete) or asynchronously (returns immediately after submission).
  • is_schedulable (bool, default: False): Whether the orchestrator supports scheduling pipelines to run at specific times.
  • supports_client_side_caching (bool, default: True): Whether the orchestrator supports client-side caching of step outputs.
  • handles_step_retries (bool, default: False): Whether the orchestrator natively handles step retries on failure.
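These flags are exposed as properties on the orchestrator's config class, so a subclass overrides a default by overriding the property. The sketch below uses stand-in classes for illustration only; the real BaseOrchestratorConfig is provided by ZenML.

```python
# Stand-in classes, not the real ZenML ones, to show the override pattern.
class OrchestratorConfigSketch:
    @property
    def is_synchronous(self) -> bool:
        return False  # default: submit and return immediately

    @property
    def is_schedulable(self) -> bool:
        return False  # default: no native scheduling support


class SynchronousConfigSketch(OrchestratorConfigSketch):
    @property
    def is_synchronous(self) -> bool:
        return True  # this orchestrator waits for the run to finish
```

Callers such as the pipeline runner can then branch on these properties without knowing which concrete orchestrator is in use.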

Abstract Methods

These methods must be implemented by all orchestrator subclasses.

get_orchestrator_run_id

@abstractmethod
def get_orchestrator_run_id(self) -> str:
    """Returns the run id of the active orchestrator run.
    
    Returns:
        The orchestrator run id.
    """
Returns (str): A unique identifier for this orchestrator run that remains consistent across all steps. This ID must be:
  • Unique for each pipeline run
  • Identical for all steps within the same pipeline run
  • Available in the execution environment of each step

Core Methods

submit_pipeline

def submit_pipeline(
    self,
    snapshot: PipelineSnapshotResponse,
    stack: Stack,
    base_environment: Dict[str, str],
    step_environments: Dict[str, Dict[str, str]],
    placeholder_run: Optional[PipelineRunResponse] = None,
) -> Optional[SubmissionResult]:
    """Submits a static pipeline to the orchestrator."""
Parameters:
  • snapshot (PipelineSnapshotResponse, required): The pipeline snapshot containing the pipeline configuration and steps.
  • stack (Stack, required): The stack on which the pipeline will run.
  • base_environment (Dict[str, str], required): Environment variables shared by all steps.
  • step_environments (Dict[str, Dict[str, str]], required): Step-specific environment variables, keyed by step invocation ID.
  • placeholder_run (PipelineRunResponse, optional): Placeholder pipeline run created before submission.

Returns (Optional[SubmissionResult]): Optional result containing metadata and a wait function for synchronous execution.
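A typical submit_pipeline implementation combines the shared base environment with each step's specific variables before launching that step. A plain-Python sketch of the merge; that step-specific values win on key clashes is an assumption here, not documented ZenML behavior:

```python
from typing import Dict


def build_step_env(
    base_environment: Dict[str, str],
    step_environment: Dict[str, str],
) -> Dict[str, str]:
    # Start from the variables shared by all steps, then layer the
    # step-specific ones on top so they take precedence on conflicts.
    env = dict(base_environment)
    env.update(step_environment)
    return env
```

The copy via dict() keeps the shared base_environment unmodified, so the same base can be reused for every step in the run.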

submit_dynamic_pipeline

def submit_dynamic_pipeline(
    self,
    snapshot: PipelineSnapshotResponse,
    stack: Stack,
    environment: Dict[str, str],
    placeholder_run: Optional[PipelineRunResponse] = None,
) -> Optional[SubmissionResult]:
    """Submits a dynamic pipeline to the orchestrator."""
Parameters:
  • snapshot (PipelineSnapshotResponse, required): The pipeline snapshot for the dynamic pipeline.
  • stack (Stack, required): The stack on which the pipeline will run.
  • environment (Dict[str, str], required): Environment variables to set in the orchestration environment.
  • placeholder_run (PipelineRunResponse, optional): Placeholder run for the pipeline.

Returns (Optional[SubmissionResult]): Optional result containing submission metadata.

run_step

def run_step(self, step: Step) -> None:
    """Runs a single step in the pipeline."""
Parameters:
  • step (Step, required): The step configuration to execute.
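An orchestrator that drives steps itself must call run_step only after all upstream steps have finished. One standard way to do that is a topological traversal of the DAG; the sketch below is generic Python (Kahn's algorithm) over a plain dependency dict, not ZenML's internal scheduler:

```python
from collections import deque
from typing import Callable, Dict, List


def run_steps_in_order(
    dag: Dict[str, List[str]],
    run_step: Callable[[str], None],
) -> List[str]:
    """dag maps each step name to its upstream dependencies. Runs each
    step only after all of its dependencies have run."""
    indegree = {step: len(deps) for step, deps in dag.items()}
    downstream: Dict[str, List[str]] = {step: [] for step in dag}
    for step, deps in dag.items():
        for dep in deps:
            downstream[dep].append(step)

    ready = deque(step for step, n in indegree.items() if n == 0)
    order: List[str] = []
    while ready:
        step = ready.popleft()
        run_step(step)
        order.append(step)
        for successor in downstream[step]:
            indegree[successor] -= 1
            if indegree[successor] == 0:
                ready.append(successor)

    if len(order) != len(dag):
        raise RuntimeError("Pipeline DAG contains a cycle")
    return order
```

Orchestrators that delegate scheduling to an external platform instead translate the DAG into that platform's native dependency format at submission time.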

Properties

supports_dynamic_pipelines

@property
def supports_dynamic_pipelines(self) -> bool:
    """Whether the orchestrator supports dynamic pipelines."""
Returns (bool): True if the orchestrator has implemented submit_dynamic_pipeline().
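One common way to implement such a capability property is to check whether a subclass has overridden the corresponding submit method. The sketch below uses stand-in classes and may differ from ZenML's actual implementation:

```python
class OrchestratorSketch:
    def submit_dynamic_pipeline(self) -> None:
        raise NotImplementedError("Dynamic pipelines are not supported.")

    @property
    def supports_dynamic_pipelines(self) -> bool:
        # True only if a subclass replaced the base implementation.
        return (
            type(self).submit_dynamic_pipeline
            is not OrchestratorSketch.submit_dynamic_pipeline
        )


class DynamicOrchestratorSketch(OrchestratorSketch):
    def submit_dynamic_pipeline(self) -> None:
        pass  # pretend submission for illustration
```

The identity comparison on the unbound method detects the override without the subclass having to declare its capability separately.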

can_run_isolated_steps

@property
def can_run_isolated_steps(self) -> bool:
    """Whether the orchestrator can run isolated steps."""
Returns (bool): True if the orchestrator supports running individual steps in isolation.

supported_execution_modes

@property
def supported_execution_modes(self) -> List[ExecutionMode]:
    """Returns the supported execution modes for this orchestrator."""
Returns (List[ExecutionMode]): The execution modes supported by this orchestrator (e.g., CONTINUE_ON_FAILURE).

Lifecycle Hooks

run_init_hook

@classmethod
def run_init_hook(cls, snapshot: PipelineSnapshotResponse) -> None:
    """Runs initialization hook before pipeline execution."""
Parameters:
  • snapshot (PipelineSnapshotResponse, required): The pipeline snapshot for which to run the init hook.

run_cleanup_hook

@classmethod
def run_cleanup_hook(cls, snapshot: PipelineSnapshotResponse) -> None:
    """Runs cleanup hook after pipeline execution."""
Parameters:
  • snapshot (PipelineSnapshotResponse, required): The pipeline snapshot for which to run the cleanup hook.

Pipeline Management

fetch_status

def fetch_status(
    self,
    run: PipelineRunResponse,
    include_steps: bool = False
) -> Tuple[Optional[ExecutionStatus], Optional[Dict[str, ExecutionStatus]]]:
    """Fetches the current status of a pipeline run."""
Parameters:
  • run (PipelineRunResponse, required): The pipeline run to fetch status for.
  • include_steps (bool, default: False): Whether to also fetch the status of individual steps.

Returns (Tuple[Optional[ExecutionStatus], Optional[Dict[str, ExecutionStatus]]]): A tuple of (pipeline status, step statuses dict), where either element may be None if unavailable.
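A fetch_status implementation usually translates the backend's job states into ZenML execution statuses. The sketch below uses plain strings, and both the backend state names and this particular mapping are assumptions for illustration, not ZenML API:

```python
from typing import Optional

# Hypothetical backend job states mapped to ZenML-style status names.
_BACKEND_STATUS_MAP = {
    "PENDING": "initializing",
    "RUNNING": "running",
    "SUCCEEDED": "completed",
    "FAILED": "failed",
    "CANCELLED": "stopped",
}


def map_backend_status(backend_state: str) -> Optional[str]:
    # Return None for states we cannot interpret, so the caller can
    # fall back to the last known status instead of guessing.
    return _BACKEND_STATUS_MAP.get(backend_state)
```

Returning None for unknown states mirrors the Optional return types of fetch_status itself: absence of information is reported explicitly rather than as a fabricated status.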

stop_run

def stop_run(self, run: PipelineRunResponse, graceful: bool = False) -> None:
    """Stops a running pipeline."""
Parameters:
  • run (PipelineRunResponse, required): The pipeline run to stop.
  • graceful (bool, default: False): Whether to attempt a graceful shutdown before forcing termination.
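A graceful stop_run typically requests a soft shutdown, polls until the run terminates, and only force-kills after a timeout. A backend-agnostic sketch with injected callbacks; all names here are illustrative, not ZenML API:

```python
import time
from typing import Callable


def stop_with_grace(
    request_stop: Callable[[], None],
    is_stopped: Callable[[], bool],
    force_kill: Callable[[], None],
    timeout: float = 30.0,
    poll_interval: float = 1.0,
) -> None:
    # Ask the backend to stop the run gracefully first.
    request_stop()
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        if is_stopped():
            return  # graceful shutdown succeeded
        time.sleep(poll_interval)
    # Grace period expired: terminate forcefully.
    force_kill()
```

Separating the three callbacks keeps the retry/timeout policy independent of any particular backend's stop and kill APIs.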

Usage Example

import os
from typing import Dict, Optional, cast

from zenml.orchestrators import BaseOrchestrator
from zenml.orchestrators.base_orchestrator import (
    BaseOrchestratorConfig,
    SubmissionResult,
)
from zenml.stack import Stack

# Response/model classes used in the signatures below; exact import
# paths may differ between ZenML versions.
from zenml.models import PipelineRunResponse, PipelineSnapshotResponse

class CustomOrchestratorConfig(BaseOrchestratorConfig):
    """Configuration for custom orchestrator."""
    
    api_endpoint: str
    
    @property
    def is_remote(self) -> bool:
        return True

class CustomOrchestrator(BaseOrchestrator):
    """Custom orchestrator implementation."""
    
    @property
    def config(self) -> CustomOrchestratorConfig:
        return cast(CustomOrchestratorConfig, self._config)
    
    def get_orchestrator_run_id(self) -> str:
        """Get unique run ID from environment."""
        # Example: read from environment variable set by orchestration platform
        return os.environ["CUSTOM_RUN_ID"]
    
    def submit_pipeline(
        self,
        snapshot: PipelineSnapshotResponse,
        stack: Stack,
        base_environment: Dict[str, str],
        step_environments: Dict[str, Dict[str, str]],
        placeholder_run: Optional[PipelineRunResponse] = None,
    ) -> Optional[SubmissionResult]:
        """Submit pipeline to custom orchestration backend."""
        # Implementation: submit to your orchestration platform
        # ...
        
        return SubmissionResult(
            metadata={"job_id": "12345"},
            wait_for_completion=self._wait_for_completion
        )
    
    def _wait_for_completion(self) -> None:
        """Wait for pipeline to complete."""
        # Poll orchestration backend until completion
        pass

See Also

Stack

Learn about the Stack class that contains orchestrators

BaseStep

Understand the steps that orchestrators execute

StackComponent

Base class for all stack components

Orchestrator Guide

Guide to using and configuring orchestrators
