Overview
The BaseOrchestrator is the abstract base class for all orchestrators in ZenML. Orchestrators are responsible for executing pipelines by coordinating the execution of individual steps. They determine where and how pipeline steps run, whether locally, in containers, or on remote infrastructure.
Class Definition
from zenml.orchestrators import BaseOrchestrator
Inheritance
Inherits from: StackComponent, ABC
Component Type: StackComponentType.ORCHESTRATOR
Key Concepts
Pipeline Submission
Orchestrators submit pipelines for execution using one of two methods:
Static pipelines: use submit_pipeline() when the pipeline DAG is known at submission time.
Dynamic pipelines: use submit_dynamic_pipeline() when the pipeline DAG can change during execution.
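The choice between the two submission paths can be sketched as a small dispatch function. This is a stdlib-only illustration, not ZenML's internal code: `ToySnapshot`, its `is_dynamic` flag, `ToyOrchestrator`, and `dispatch_submission` are hypothetical stand-ins for the real snapshot and submission logic.

```python
from dataclasses import dataclass
from typing import List


@dataclass
class ToySnapshot:
    """Hypothetical stand-in for PipelineSnapshotResponse."""

    steps: List[str]
    is_dynamic: bool = False  # hypothetical flag: can the DAG change at runtime?


class ToyOrchestrator:
    """Hypothetical orchestrator exposing both submission paths."""

    supports_dynamic_pipelines = True

    def submit_pipeline(self, snapshot: ToySnapshot) -> str:
        # Static path: the full list of steps is known up front.
        return f"static:{len(snapshot.steps)} steps"

    def submit_dynamic_pipeline(self, snapshot: ToySnapshot) -> str:
        # Dynamic path: the DAG may change while the pipeline runs,
        # so it is not fixed at submission time.
        return "dynamic"


def dispatch_submission(orchestrator: ToyOrchestrator, snapshot: ToySnapshot) -> str:
    """Route a snapshot to the matching submission method."""
    if snapshot.is_dynamic:
        if not orchestrator.supports_dynamic_pipelines:
            raise RuntimeError("This orchestrator cannot run dynamic pipelines.")
        return orchestrator.submit_dynamic_pipeline(snapshot)
    return orchestrator.submit_pipeline(snapshot)
```

Refusing a dynamic snapshot up front, rather than falling back to the static path, avoids running a pipeline whose DAG the orchestrator cannot follow.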
Orchestrator Run ID
Every orchestrator must implement get_orchestrator_run_id() which returns a unique identifier that:
Is the same for all steps within a single pipeline run
Is unique across different pipeline runs
Can be used to track and correlate step executions
Configuration
Whether caching is enabled for steps in pipelines run by this orchestrator.
Whether the orchestrator runs synchronously (waits for completion) or asynchronously.
Whether the orchestrator supports scheduling pipelines to run at specific times.
supports_client_side_caching
Whether the orchestrator supports client-side caching of step outputs.
Whether the orchestrator natively handles step retries on failure.
Abstract Methods
These methods must be implemented by all orchestrator subclasses.
get_orchestrator_run_id
@abstractmethod
def get_orchestrator_run_id(self) -> str:
    """Returns the run id of the active orchestrator run.

    Returns:
        The orchestrator run id.
    """
Returns (str): A unique identifier for this orchestrator run that remains consistent across all steps.
This ID must be:
Unique for each pipeline run
Identical for all steps within the same pipeline run
Available in the execution environment of each step
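The three requirements above can be met by generating one ID at submission time and passing it to every step through the environment. This is a minimal sketch under that assumption: the `TOY_RUN_ID` variable name and both helper functions are hypothetical. An orchestrator may instead read an ID that its platform already provides, as the usage example on this page does with `CUSTOM_RUN_ID`.

```python
import uuid
from typing import Dict, List

RUN_ID_ENV_VAR = "TOY_RUN_ID"  # hypothetical variable name


def build_step_environments(step_names: List[str]) -> Dict[str, Dict[str, str]]:
    """Create one run ID and give every step of this run the same value."""
    run_id = str(uuid.uuid4())  # a fresh value for each pipeline run
    return {name: {RUN_ID_ENV_VAR: run_id} for name in step_names}


def get_orchestrator_run_id(env: Dict[str, str]) -> str:
    """Read the run ID inside a step's execution environment."""
    try:
        return env[RUN_ID_ENV_VAR]
    except KeyError:
        raise RuntimeError(
            f"{RUN_ID_ENV_VAR} is not set; not inside an orchestrated run."
        ) from None
```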
Core Methods
submit_pipeline
def submit_pipeline(
    self,
    snapshot: PipelineSnapshotResponse,
    stack: Stack,
    base_environment: Dict[str, str],
    step_environments: Dict[str, Dict[str, str]],
    placeholder_run: Optional[PipelineRunResponse] = None,
) -> Optional[SubmissionResult]:
    """Submits a static pipeline to the orchestrator."""
snapshot (PipelineSnapshotResponse, required): The pipeline snapshot containing the pipeline configuration and steps.
stack (Stack, required): The stack on which the pipeline will run.
base_environment (Dict[str, str], required): Environment variables shared by all steps.
step_environments (Dict[str, Dict[str, str]], required): Step-specific environment variables, keyed by step invocation ID.
placeholder_run (Optional[PipelineRunResponse], default None): An optional placeholder pipeline run created before submission.
Returns (Optional[SubmissionResult]): An optional result containing metadata and a wait function for synchronous execution.
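One way to use base_environment and step_environments together is to layer the step-specific variables over the shared ones when launching each step. The sketch assumes step-specific values take precedence on key conflicts; `compose_step_environment` is a hypothetical helper, not part of ZenML.

```python
from typing import Dict


def compose_step_environment(
    base_environment: Dict[str, str],
    step_environments: Dict[str, Dict[str, str]],
    invocation_id: str,
) -> Dict[str, str]:
    """Return the full environment for one step invocation."""
    # Start from the shared variables, then apply this step's overrides.
    # A step without an entry simply gets the base environment.
    env = dict(base_environment)
    env.update(step_environments.get(invocation_id, {}))
    return env


if __name__ == "__main__":
    base = {"LOG_LEVEL": "INFO", "REGION": "eu"}
    steps = {"trainer": {"LOG_LEVEL": "DEBUG"}}
    print(compose_step_environment(base, steps, "trainer"))
    # → {'LOG_LEVEL': 'DEBUG', 'REGION': 'eu'}
```

Copying base_environment before updating keeps the shared dictionary unchanged, so it can be reused for every step.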
submit_dynamic_pipeline
def submit_dynamic_pipeline(
    self,
    snapshot: PipelineSnapshotResponse,
    stack: Stack,
    environment: Dict[str, str],
    placeholder_run: Optional[PipelineRunResponse] = None,
) -> Optional[SubmissionResult]:
    """Submits a dynamic pipeline to the orchestrator."""
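snapshot (PipelineSnapshotResponse, required): The pipeline snapshot for the dynamic pipeline.
stack (Stack, required): The stack on which the pipeline will run.
environment (Dict[str, str], required): Environment variables to set in the orchestration environment.
placeholder_run (Optional[PipelineRunResponse], default None): An optional placeholder run for the pipeline.
Returns (Optional[SubmissionResult]): An optional result containing submission metadata.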
run_step
def run_step(self, step: Step) -> None:
    """Runs a single step in the pipeline."""
step (Step, required): The step configuration to execute.
Properties
supports_dynamic_pipelines
@property
def supports_dynamic_pipelines(self) -> bool:
    """Whether the orchestrator supports dynamic pipelines."""
Returns (bool): True if the orchestrator has implemented submit_dynamic_pipeline().
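One plausible way to compute such a property is to check whether a subclass overrides the base method. This is a generic Python pattern shown on hypothetical `ToyBaseOrchestrator` classes; it is not necessarily how ZenML implements the property.

```python
from typing import Optional


class ToyBaseOrchestrator:
    """Hypothetical base class with an optional dynamic-submission method."""

    def submit_dynamic_pipeline(self, snapshot: object) -> Optional[str]:
        raise NotImplementedError("Dynamic pipelines are not supported.")

    @property
    def supports_dynamic_pipelines(self) -> bool:
        # True only if the concrete class replaced the base implementation.
        return (
            type(self).submit_dynamic_pipeline
            is not ToyBaseOrchestrator.submit_dynamic_pipeline
        )


class StaticOnlyOrchestrator(ToyBaseOrchestrator):
    """Inherits the base method, so dynamic pipelines are unsupported."""


class DynamicOrchestrator(ToyBaseOrchestrator):
    """Overrides the base method, so dynamic pipelines are supported."""

    def submit_dynamic_pipeline(self, snapshot: object) -> Optional[str]:
        return "submitted"
```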
can_run_isolated_steps
@property
def can_run_isolated_steps(self) -> bool:
    """Whether the orchestrator can run isolated steps."""
Returns (bool): True if the orchestrator supports running individual steps in isolation.
supported_execution_modes
@property
def supported_execution_modes(self) -> List[ExecutionMode]:
    """Returns the supported execution modes for this orchestrator."""
Returns (List[ExecutionMode]): The execution modes supported by this orchestrator (e.g., CONTINUE_ON_FAILURE).
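A caller would typically check a requested mode against this list before submitting. In the sketch, `ToyExecutionMode` is a stand-in enum: only `CONTINUE_ON_FAILURE` comes from this page, `FAIL_FAST` is an assumed second member, and `validate_execution_mode` is hypothetical.

```python
from enum import Enum
from typing import List


class ToyExecutionMode(Enum):
    """Stand-in for ExecutionMode; FAIL_FAST is an assumed member."""

    FAIL_FAST = "fail_fast"
    CONTINUE_ON_FAILURE = "continue_on_failure"


def validate_execution_mode(
    requested: ToyExecutionMode, supported: List[ToyExecutionMode]
) -> None:
    """Raise if the orchestrator cannot honor the requested mode."""
    if requested not in supported:
        names = ", ".join(mode.name for mode in supported)
        raise ValueError(
            f"Execution mode {requested.name} is not supported; "
            f"supported modes: {names}"
        )
```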
Lifecycle Hooks
run_init_hook
@classmethod
def run_init_hook(cls, snapshot: PipelineSnapshotResponse) -> None:
    """Runs initialization hook before pipeline execution."""
snapshot (PipelineSnapshotResponse, required): The pipeline snapshot for which to run the init hook.
run_cleanup_hook
@classmethod
def run_cleanup_hook(cls, snapshot: PipelineSnapshotResponse) -> None:
    """Runs cleanup hook after pipeline execution."""
snapshot (PipelineSnapshotResponse, required): The pipeline snapshot for which to run the cleanup hook.
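The two hooks bracket pipeline execution. The sketch assumes the cleanup hook should run even when execution raises, which is why it sits in a `finally` block; `HookedOrchestrator`, its `events` list, and `run_with_hooks` are hypothetical and only record the call order.

```python
from typing import Callable, List


class HookedOrchestrator:
    """Hypothetical orchestrator whose hooks just record what they did."""

    events: List[str] = []  # shared across calls, for illustration only

    @classmethod
    def run_init_hook(cls, snapshot: str) -> None:
        cls.events.append(f"init:{snapshot}")

    @classmethod
    def run_cleanup_hook(cls, snapshot: str) -> None:
        cls.events.append(f"cleanup:{snapshot}")


def run_with_hooks(
    orchestrator_cls: type, snapshot: str, execute: Callable[[], None]
) -> None:
    """Run the init hook, execute the pipeline, then run cleanup."""
    orchestrator_cls.run_init_hook(snapshot)
    try:
        execute()
    finally:
        # Assumption: cleanup must run whether or not execution succeeded.
        orchestrator_cls.run_cleanup_hook(snapshot)
```

Because both hooks are classmethods, they can be invoked on the orchestrator class itself without an instance.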
Pipeline Management
fetch_status
def fetch_status(
    self,
    run: PipelineRunResponse,
    include_steps: bool = False,
) -> Tuple[Optional[ExecutionStatus], Optional[Dict[str, ExecutionStatus]]]:
    """Fetches the current status of a pipeline run."""
run (PipelineRunResponse, required): The pipeline run to fetch the status for.
include_steps (bool, default False): Whether to also fetch the status of individual steps.
Returns (Tuple[Optional[ExecutionStatus], Optional[Dict[str, ExecutionStatus]]]): A tuple of (pipeline status, dictionary of step statuses); either element may be None if that status is not available.
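Because both parts of the return value are optional, callers should handle None. The sketch builds the tuple from a hypothetical backend response (a plain dict) and returns step statuses only when include_steps is set; `ToyExecutionStatus`, its members, and the backend field names `state` and `steps` are assumptions.

```python
from enum import Enum
from typing import Any, Dict, Optional, Tuple


class ToyExecutionStatus(Enum):
    """Stand-in for ExecutionStatus; members are assumed."""

    RUNNING = "running"
    COMPLETED = "completed"
    FAILED = "failed"


def fetch_status(
    backend_job: Dict[str, Any], include_steps: bool = False
) -> Tuple[Optional[ToyExecutionStatus], Optional[Dict[str, ToyExecutionStatus]]]:
    """Translate a backend job description into (pipeline status, step statuses)."""
    raw = backend_job.get("state")
    pipeline_status = ToyExecutionStatus(raw) if raw is not None else None
    if not include_steps:
        # Step statuses were not requested, so report them as unavailable.
        return pipeline_status, None
    raw_steps = backend_job.get("steps") or {}
    step_statuses = {
        name: ToyExecutionStatus(state) for name, state in raw_steps.items()
    }
    return pipeline_status, step_statuses
```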
stop_run
def stop_run(self, run: PipelineRunResponse, graceful: bool = False) -> None:
    """Stops a running pipeline."""
run (PipelineRunResponse, required): The pipeline run to stop.
graceful (bool, default False): Whether to attempt a graceful shutdown before forcing termination.
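The graceful flag suggests a two-phase stop: request a shutdown, give the run time to finish, and only then force termination. The sketch models this on a local process with `subprocess`, where `terminate()` plays the graceful request and `kill()` the forced one. Mapping stop_run onto a local process is an assumption for illustration; a remote orchestrator would call its platform's cancellation API instead, and `stop_process_run` and its `timeout` parameter are hypothetical.

```python
import subprocess
import sys


def stop_process_run(
    proc: subprocess.Popen, graceful: bool = False, timeout: float = 5.0
) -> int:
    """Stop a locally running pipeline process and return its exit code."""
    if graceful:
        proc.terminate()  # polite request (SIGTERM on POSIX)
        try:
            return proc.wait(timeout=timeout)
        except subprocess.TimeoutExpired:
            pass  # did not exit in time: fall through to a forced stop
    proc.kill()  # forced termination (SIGKILL on POSIX)
    return proc.wait()


if __name__ == "__main__":
    # Stand-in "pipeline": a child Python process that sleeps for 30 seconds.
    proc = subprocess.Popen([sys.executable, "-c", "import time; time.sleep(30)"])
    print(stop_process_run(proc, graceful=True))
```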
Usage Example
import os
from typing import Dict, Optional, cast

from zenml.models import PipelineRunResponse, PipelineSnapshotResponse
from zenml.orchestrators import BaseOrchestrator
from zenml.orchestrators.base_orchestrator import (
    BaseOrchestratorConfig,
    SubmissionResult,
)
from zenml.stack import Stack


class CustomOrchestratorConfig(BaseOrchestratorConfig):
    """Configuration for custom orchestrator."""

    api_endpoint: str

    @property
    def is_remote(self) -> bool:
        return True


class CustomOrchestrator(BaseOrchestrator):
    """Custom orchestrator implementation."""

    @property
    def config(self) -> CustomOrchestratorConfig:
        return cast(CustomOrchestratorConfig, self._config)

    def get_orchestrator_run_id(self) -> str:
        """Get unique run ID from environment."""
        # Example: read from an environment variable set by the orchestration platform
        return os.environ["CUSTOM_RUN_ID"]

    def submit_pipeline(
        self,
        snapshot: PipelineSnapshotResponse,
        stack: Stack,
        base_environment: Dict[str, str],
        step_environments: Dict[str, Dict[str, str]],
        placeholder_run: Optional[PipelineRunResponse] = None,
    ) -> Optional[SubmissionResult]:
        """Submit pipeline to custom orchestration backend."""
        # Implementation: submit to your orchestration platform
        # ...
        return SubmissionResult(
            metadata={"job_id": "12345"},
            wait_for_completion=self._wait_for_completion,
        )

    def _wait_for_completion(self) -> None:
        """Wait for pipeline to complete."""
        # Poll orchestration backend until completion
        pass
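The _wait_for_completion stub can be fleshed out as a polling loop. The sketch keeps it independent of ZenML: `get_job_state` is a hypothetical callable standing in for your backend's status API, and the terminal state names, poll interval, and timeout are assumptions.

```python
import time
from typing import Callable

TERMINAL_STATES = {"succeeded", "failed", "cancelled"}  # assumed backend states


def wait_for_completion(
    get_job_state: Callable[[], str],
    poll_interval: float = 5.0,
    timeout: float = 3600.0,
) -> str:
    """Poll the backend until the job reaches a terminal state."""
    deadline = time.monotonic() + timeout
    while True:
        state = get_job_state()
        if state in TERMINAL_STATES:
            if state != "succeeded":
                raise RuntimeError(f"Pipeline run ended in state {state!r}.")
            return state
        if time.monotonic() >= deadline:
            raise TimeoutError("Timed out waiting for the pipeline run to finish.")
        time.sleep(poll_interval)
```

Inside CustomOrchestrator, _wait_for_completion could delegate to this function with a closure that queries the job ID returned by your backend. Raising on a failed terminal state lets a synchronous caller surface the failure instead of returning as if the run succeeded.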
See Also
Stack: Learn about the Stack class that contains orchestrators.
BaseStep: Understand the steps that orchestrators execute.
StackComponent: Base class for all stack components.
Orchestrator Guide: Guide to using and configuring orchestrators.