Skip to main content
Resource configuration allows you to specify hardware requirements for individual pipeline steps. This ensures your steps run with appropriate computational resources, whether on local machines or cloud infrastructure.

Understanding Resource Settings

ZenML’s ResourceSettings class provides a unified interface for specifying:
  • CPU cores: Number of CPU cores allocated to a step
  • Memory: Amount of RAM allocated to a step
  • GPU: Number of GPU devices allocated to a step
  • Replicas: Minimum and maximum instance counts (for deployed pipelines)
  • Autoscaling: Metrics and targets for automatic scaling
  • Concurrency: Maximum concurrent requests per instance

Basic Resource Configuration

Configuring a Single Step

Specify resources when defining a step:
from zenml import step
from zenml.config import ResourceSettings

@step(
    settings={
        "resources": ResourceSettings(
            cpu_count=4,
            memory="8GB",
            gpu_count=0,  # Explicitly request no GPUs for this step
        )
    }
)
def train_model(data: dict) -> dict:
    """Train model with 4 CPUs and 8GB RAM."""
    # Training logic here
    return {"model": "trained"}

Configuring at Pipeline Level

Apply default resources to all steps:
from zenml import pipeline
from zenml.config import ResourceSettings

@pipeline(
    settings={
        # Pipeline-level settings are the default for every step;
        # individual steps may override them with their own settings.
        "resources": ResourceSettings(
            cpu_count=2,
            memory="4GB",
        )
    }
)
def ml_pipeline():
    """All steps get 2 CPUs and 4GB RAM by default."""
    data = load_data()
    model = train_model(data)
    evaluate_model(model)
Step-level settings override pipeline-level settings:
# Pipeline default: 2 CPUs, 4GB RAM
@pipeline(
    settings={
        "resources": ResourceSettings(cpu_count=2, memory="4GB")
    }
)
def ml_pipeline():
    """Demonstrate a per-step override of the pipeline-level resource defaults."""
    data = load_data()  # Uses pipeline default: 2 CPUs, 4GB

    # Override for specific step: with_options configures the step with
    # these settings and returns a callable, which is then invoked on `data`.
    model = train_model.with_options(
        settings={
            "resources": ResourceSettings(
                cpu_count=8,
                memory="16GB",
                gpu_count=1,
            )
        }
    )(data)

    evaluate_model(model)  # Uses pipeline default: 2 CPUs, 4GB

Memory Specification

Memory Units

ZenML supports standard memory units:
from zenml.config import ResourceSettings

# Decimal units (powers of 1000)
ResourceSettings(memory="1KB")   # 1,000 bytes
ResourceSettings(memory="1MB")   # 1,000,000 bytes
ResourceSettings(memory="1GB")   # 1,000,000,000 bytes
ResourceSettings(memory="1TB")   # 1,000,000,000,000 bytes

# Binary units (powers of 1024)
ResourceSettings(memory="1KiB")  # 1,024 bytes
ResourceSettings(memory="1MiB")  # 1,048,576 bytes
ResourceSettings(memory="1GiB")  # 1,073,741,824 bytes
ResourceSettings(memory="1TiB")  # 1,099,511,627,776 bytes

# NOTE: a GiB is ~7.4% larger than a GB — pick the unit deliberately
# when matching platform quotas or limits.

Converting Memory Units

Programmatically convert between units:
from zenml.config import ResourceSettings, ByteUnit

resources = ResourceSettings(memory="8GB")

# Get memory in different units
memory_gb = resources.get_memory(unit=ByteUnit.GB)  # 8.0
memory_mb = resources.get_memory(unit=ByteUnit.MB)  # 8000.0
memory_gib = resources.get_memory(unit=ByteUnit.GIB)  # ~7.45

# Use with orchestrator configuration
# NOTE(review): the truthiness guard below suggests get_memory can return
# None when no memory was configured — confirm against ResourceSettings.
if memory_gb and memory_gb > 10:
    print("Large memory requirement detected")

GPU Configuration

Requesting GPUs

from zenml import step
from zenml.config import ResourceSettings

@step(
    settings={
        "resources": ResourceSettings(
            cpu_count=4,
            memory="16GB",
            gpu_count=1,  # Request 1 GPU
        )
    }
)
def train_with_gpu(data: dict) -> dict:
    """Train model using GPU acceleration."""
    import torch

    # Fall back to CPU so the step still runs where no GPU is available.
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    print(f"Training on: {device}")

    # GPU-accelerated training
    # NOTE: `Model` is an illustrative placeholder, not defined in this snippet.
    model = Model().to(device)
    # ... training logic ...

    return {"model": model}

Multi-GPU Training

@step(
    settings={
        "resources": ResourceSettings(
            cpu_count=16,
            memory="64GB",
            gpu_count=4,  # Request 4 GPUs
        )
    }
)
def distributed_training(data: dict) -> dict:
    """Distributed training across multiple GPUs.

    Builds the model first, then wraps it in DataParallel when more than
    one GPU is visible to the step.
    """
    import torch

    # Create the model BEFORE wrapping it — the original snippet referenced
    # `model` inside torch.nn.DataParallel(model) before it was ever assigned.
    # NOTE: `Model` is an illustrative placeholder, not defined in this snippet.
    model = Model()

    if torch.cuda.device_count() > 1:
        print(f"Using {torch.cuda.device_count()} GPUs")
        model = torch.nn.DataParallel(model)

    # ... training logic ...

    return {"model": model}

Orchestrator Implementation

Reading Resource Settings

Orchestrators access resource settings from step configurations:
from zenml.orchestrators import BaseOrchestrator
from zenml.config.resource_settings import ResourceSettings

class MyOrchestrator(BaseOrchestrator):
    def submit_pipeline(self, snapshot, stack, base_environment, step_environments, placeholder_run):
        """Submit each pipeline step with its configured resources.

        Steps without explicit resource settings fall back to the
        orchestrator defaults (1 CPU, 2048 MB, no GPU).
        """
        for invocation_id, step in snapshot.step_configurations.items():
            # Get resource settings for this step
            resources: ResourceSettings = step.config.resource_settings

            if resources.empty:
                # Use orchestrator defaults
                cpu_count = 1
                memory_mb = 2048
                gpu_count = 0
            else:
                # Use specified resources
                cpu_count = resources.cpu_count or 1

                # Ask ResourceSettings for the value in MB directly. The
                # original converted decimal GB with `* 1024`, mixing
                # decimal GB with binary MiB and inflating the value ~2.4%.
                memory_mb = None
                if resources.memory:
                    memory_mb = int(resources.get_memory(unit="MB"))

                gpu_count = resources.gpu_count or 0

            # Submit step with resources
            self._submit_step(
                step_name=step.config.name,
                image=self.get_image(snapshot, step.config.name),
                cpu_count=cpu_count,
                memory_mb=memory_mb,
                gpu_count=gpu_count,
            )

Platform-Specific Resource Mapping

Different platforms use different resource specifications:
class KubernetesOrchestrator(ContainerizedOrchestrator):
    def _build_pod_spec(self, step_config):
        """Build Kubernetes pod spec with resource requests/limits.

        Returns a dict with "requests" and "limits" entries in the
        Kubernetes resource format.
        """
        resources = step_config.resource_settings

        # Kubernetes format
        k8s_resources = {
            "requests": {},
            "limits": {},
        }

        # CPU (in cores)
        if resources.cpu_count:
            k8s_resources["requests"]["cpu"] = str(resources.cpu_count)
            k8s_resources["limits"]["cpu"] = str(resources.cpu_count)

        # Memory (in bytes). get_memory(unit="GB") returns DECIMAL
        # gigabytes (1 GB = 10**9 bytes), so convert with 1000**3 —
        # the original `* 1024**3` over-requested memory by ~7.4%.
        if resources.memory:
            memory_bytes = int(resources.get_memory(unit="GB") * 1000**3)
            k8s_resources["requests"]["memory"] = f"{memory_bytes}"
            k8s_resources["limits"]["memory"] = f"{memory_bytes}"

        # GPU (nvidia.com/gpu) — GPUs are limits-only in Kubernetes
        if resources.gpu_count:
            k8s_resources["limits"]["nvidia.com/gpu"] = str(resources.gpu_count)

        return k8s_resources

class SageMakerOrchestrator(ContainerizedOrchestrator):
    def _get_instance_type(self, resources: ResourceSettings) -> str:
        """Map resources to the smallest matching SageMaker instance type.

        Args:
            resources: The step's resource settings.

        Returns:
            A SageMaker instance type string.
        """
        # AWS SageMaker uses instance types. The original only handled
        # gpu_count of exactly 1 or 4 and silently fell through to CPU
        # instances for any other count (2, 3, 8, ...), dropping the GPU
        # requirement entirely.
        gpu_count = resources.gpu_count or 0
        if gpu_count > 0:
            if gpu_count == 1:
                return "ml.p3.2xlarge"   # 1 GPU, 8 vCPU, 61GB RAM
            if gpu_count <= 4:
                return "ml.p3.8xlarge"   # 4 GPUs, 32 vCPU, 244GB RAM
            return "ml.p3.16xlarge"      # 8 GPUs, 64 vCPU, 488GB RAM

        # CPU instances
        memory_gb = resources.get_memory(unit="GB") if resources.memory else 4
        cpu_count = resources.cpu_count or 2

        if cpu_count <= 2 and memory_gb <= 4:
            return "ml.m5.large"  # 2 vCPU, 8GB RAM
        elif cpu_count <= 4 and memory_gb <= 16:
            return "ml.m5.xlarge"  # 4 vCPU, 16GB RAM
        elif cpu_count <= 8 and memory_gb <= 32:
            return "ml.m5.2xlarge"  # 8 vCPU, 32GB RAM
        else:
            return "ml.m5.4xlarge"  # 16 vCPU, 64GB RAM

Deployed Pipeline Resources

For deployed pipelines (as services), additional resource settings control scaling behavior:

Replica Configuration

from zenml.config import ResourceSettings

resources = ResourceSettings(
    # Basic resources
    cpu_count=2,
    memory="4GB",

    # Replica settings (only meaningful for pipelines deployed as services)
    min_replicas=1,   # Minimum instances (0 allows scale-to-zero)
    max_replicas=10,  # Maximum instances (0 means no limit)
)

Autoscaling Configuration

resources = ResourceSettings(
    cpu_count=2,
    memory="4GB",

    # Scaling bounds: the autoscaler adds or removes replicas within
    # this range, never below min_replicas or above max_replicas.
    min_replicas=2,
    max_replicas=20,

    # Autoscaling metric
    autoscaling_metric="cpu",  # Options: "cpu", "memory", "concurrency", "rps"
    autoscaling_target=75.0,   # Target 75% CPU utilization
)

Concurrency Limits

resources = ResourceSettings(
    cpu_count=4,
    memory="8GB",

    # Per-instance concurrency limit
    max_concurrency=50,  # Max 50 concurrent requests per instance

    # Autoscaling based on concurrency; the target is kept below
    # max_concurrency so new replicas spin up before the hard cap is hit.
    autoscaling_metric="concurrency",
    autoscaling_target=40.0,  # Scale when average concurrency > 40
)

Checking Resource Requirements

Orchestrators can check if a step needs special resources:
from zenml.orchestrators import BaseOrchestrator
from zenml.config.step_configurations import Step

class MyOrchestrator(BaseOrchestrator):
    @staticmethod
    def requires_resources_in_orchestration_environment(step: Step) -> bool:
        """Check if step needs special resources.

        Args:
            step: The step to check

        Returns:
            True if step requires custom resources, False otherwise
        """
        # A configured step operator takes over resource handling, so the
        # orchestration environment never needs to provide resources then.
        delegated_to_step_operator = bool(step.config.step_operator)
        has_custom_resources = not step.config.resource_settings.empty
        return has_custom_resources and not delegated_to_step_operator
This is useful for orchestrators that need to allocate resources before launching steps.

Best Practices

Start Conservative

Begin with modest resource requests and scale up based on actual usage.

Monitor Usage

Track actual resource consumption to optimize configurations.

GPU Cost Awareness

GPUs are expensive. Only request them for steps that benefit from acceleration.

Memory Buffer

Request slightly more memory than expected to handle variability.

Resource Sizing Guidelines

Data Loading Steps:
# Suggested baseline for data loading steps.
ResourceSettings(
    cpu_count=2,
    memory="4GB",
    gpu_count=0,
)
Training Steps (Small Models):
# Suggested baseline for training small models.
ResourceSettings(
    cpu_count=4,
    memory="8GB",
    gpu_count=0,  # or 1 for GPU acceleration
)
Training Steps (Large Models):
# Suggested baseline for training large models.
ResourceSettings(
    cpu_count=8,
    memory="32GB",
    gpu_count=1,  # or more for distributed training
)
Inference Steps:
# Suggested baseline for inference steps.
ResourceSettings(
    cpu_count=2,
    memory="4GB",
    gpu_count=0,  # CPU inference is often sufficient
)

Validation

ZenML validates resource settings:
from zenml.config import ResourceSettings
from pydantic import ValidationError

# Valid configuration
resources = ResourceSettings(
    min_replicas=2,
    max_replicas=10,
)

# Invalid: min > max. ResourceSettings validation (pydantic, per the
# ValidationError import above) rejects the combination at construction time.
try:
    resources = ResourceSettings(
        min_replicas=10,
        max_replicas=2,
    )
except ValidationError as e:
    print("Validation error:", e)
    # Error: min_replicas (10) cannot be greater than max_replicas (2)

Next Steps

Custom Orchestrators

Build orchestrators that handle resource configurations

Containerization

Package steps with their dependencies

Dynamic Pipelines

Adapt resource requirements at runtime

Custom Materializers

Handle data efficiently to optimize memory usage

Build docs developers (and LLMs) love