The @resources decorator specifies compute requirements for workflow steps independently of the execution platform. This makes your flows portable across AWS Batch, Kubernetes, and other compute backends.

Overview

Use @resources to declare what your step needs:
from metaflow import FlowSpec, step, resources

class ResourceFlow(FlowSpec):
    @resources(cpu=4, memory=16000, gpu=1)
    @step
    def train(self):
        # This step needs 4 CPUs, 16GB RAM, and 1 GPU
        model = train_model()
        self.next(self.end)
    
    @step
    def end(self):
        pass

Why Use @resources?

Platform Independence

Specify requirements once, run anywhere:
@resources(cpu=8, memory=32000, gpu=2)
@step
def train(self):
    # Same resource spec works on both platforms
    pass
Run on different platforms:
# Run on AWS Batch
python myflow.py run --with batch

# Run on Kubernetes
python myflow.py run --with kubernetes

# Resources automatically applied to whichever platform you choose

Separation of Concerns

Keep resource requirements separate from platform-specific details:
# Resources: What you need
@resources(cpu=4, memory=16000)

# Platform: Where to run
@batch  # or @kubernetes

@step
def process(self):
    pass

Parameters

The @resources decorator accepts these parameters:
@resources(
    cpu=4,              # Number of CPUs (int, default: 1)
    memory=16000,       # Memory in MB (int, default: 4096)
    gpu=1,              # Number of GPUs (int, default: None)
    disk=50000,         # Disk size in MB (int, Kubernetes only)
    shared_memory=1024, # Shared memory in MiB (int, default: None)
)
@step
def compute(self):
    pass

CPU

Number of CPU cores:
@resources(cpu=1)
@step
def light_task(self):
    # Single CPU for light computation
    pass

@resources(cpu=8)
@step
def parallel_task(self):
    # 8 CPUs for parallel processing
    import multiprocessing
    with multiprocessing.Pool(8) as pool:
        results = pool.map(expensive_func, data)
Specify CPU as an integer. Fractional CPUs (like 0.5) are not supported in @resources.

Memory

Memory in megabytes:
@resources(memory=4096)  # 4GB
@step
def small_data(self):
    pass

@resources(memory=32000)  # 32GB
@step
def large_data(self):
    import pandas as pd
    df = pd.read_csv('large_file.csv')  # Needs lots of memory
Common memory sizes:
  • 4096 = 4GB (default)
  • 8192 = 8GB
  • 16384 = 16GB
  • 32768 = 32GB
  • 65536 = 64GB
Under-allocating memory causes OOM (Out Of Memory) errors. Over-allocating wastes money. Monitor actual usage and adjust.
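Since memory is given in MB, a tiny helper (illustrative, not part of Metaflow) avoids off-by-1024 mistakes when you think in GB:

```python
def memory_mb(gb: float) -> int:
    """Convert gigabytes to the MB value @resources(memory=...) expects (1 GB = 1024 MB)."""
    return int(gb * 1024)

# Matches the sizes above: memory_mb(4) == 4096, memory_mb(64) == 65536
```

Use it inline, e.g. @resources(memory=memory_mb(16)).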

GPU

Number of GPUs:
@resources(gpu=1, memory=32000)
@step
def train_gpu(self):
    import torch
    device = torch.device('cuda')
    model = MyModel().to(device)
    train(model)

@resources(gpu=4, memory=64000)
@step
def multi_gpu(self):
    # Use 4 GPUs with DataParallel or DistributedDataParallel
    import torch
    model = MyModel().cuda()
    model = torch.nn.DataParallel(model, device_ids=[0, 1, 2, 3])
GPU availability depends on your compute environment. Ensure your AWS Batch queue or Kubernetes cluster has GPU nodes.

Disk

Disk space in megabytes (Kubernetes only):
@resources(disk=100000)  # 100GB
@step
def process_data(self):
    # Store intermediate results on disk
    large_dataset.to_parquet('/tmp/intermediate.parquet')
Common disk sizes:
  • 10240 = 10GB (default)
  • 51200 = 50GB
  • 102400 = 100GB
  • 512000 = 500GB
The disk parameter only works with Kubernetes. AWS Batch has limited disk control (uses instance storage).

Shared Memory

Shared memory for inter-process communication:
@resources(shared_memory=8192)  # 8GB /dev/shm
@step
def ipc_task(self):
    # Use shared memory for IPC
    from multiprocessing import shared_memory
    shm = shared_memory.SharedMemory(create=True, size=1000000)
    try:
        ...  # share shm.buf across worker processes
    finally:
        shm.close()
        shm.unlink()  # release /dev/shm when done
Useful for:
  • PyTorch DataLoader with multiple workers
  • Multiprocessing with shared data
  • Deep learning frameworks that use /dev/shm
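A minimal stdlib sketch of the underlying mechanism: a segment (backed by /dev/shm on Linux) is created under a name, and another process can attach to it by that name. Here both ends run in one process for brevity:

```python
from multiprocessing import shared_memory

shm = shared_memory.SharedMemory(create=True, size=16)
try:
    shm.buf[:5] = b"hello"                            # writer fills the buffer
    peer = shared_memory.SharedMemory(name=shm.name)  # a reader attaches by name
    data = bytes(peer.buf[:5])                        # sees the same bytes, no copy
    peer.close()
finally:
    shm.close()
    shm.unlink()  # release the segment when done
```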

Combining with Platform Decorators

Resources + Batch

from metaflow import FlowSpec, step, batch, resources

class BatchResourceFlow(FlowSpec):
    @batch
    @resources(cpu=8, memory=32000, gpu=1)
    @step
    def train(self):
        # Runs on AWS Batch with specified resources
        pass
Metaflow automatically:
  1. Reads @resources requirements
  2. Configures AWS Batch job with those resources
  3. Launches appropriate EC2 instance type

Resources + Kubernetes

from metaflow import FlowSpec, step, kubernetes, resources

class K8sResourceFlow(FlowSpec):
    @kubernetes
    @resources(cpu=4, memory=16000, disk=50000)
    @step
    def process(self):
        # Runs on Kubernetes with specified resources
        pass
Metaflow automatically:
  1. Reads @resources requirements
  2. Sets pod resource requests and limits
  3. Kubernetes scheduler finds appropriate node

Override Behavior

If both @resources and platform decorator specify the same resource, the maximum is used:
@batch(cpu=4, memory=8000)      # Batch says: 4 CPU, 8GB
@resources(cpu=2, memory=16000)  # Resources says: 2 CPU, 16GB
@step
def train(self):
    # Actual: 4 CPU (max of 4,2), 16GB (max of 8000,16000)
    pass
This ensures your step gets sufficient resources from either decorator.
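The merge rule in this example can be sketched as a plain function (illustrative only; this is not Metaflow's internal code):

```python
def effective_resources(resources_spec, platform_spec):
    """Per-key maximum of two resource specs, as in the example above."""
    keys = set(resources_spec) | set(platform_spec)
    return {k: max(resources_spec.get(k, 0), platform_spec.get(k, 0)) for k in keys}
```

effective_resources({'cpu': 2, 'memory': 16000}, {'cpu': 4, 'memory': 8000}) yields {'cpu': 4, 'memory': 16000}, matching the comment in the example.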

Resource Estimation

Start Conservative

Begin with small allocations and scale up:
# First attempt
@resources(cpu=1, memory=4096)
@step
def process(self):
    process_data()
If it fails with OOM:
# Increase memory
@resources(cpu=1, memory=8192)
@step
def process(self):
    process_data()

Monitor Usage

Check actual resource usage:
import psutil

@resources(cpu=4, memory=16000)
@step
def monitor_usage(self):
    # Log memory usage
    process = psutil.Process()
    memory_mb = process.memory_info().rss / 1024 / 1024
    print(f"Memory usage: {memory_mb:.2f} MB")
    
    # Log CPU usage
    cpu_percent = psutil.cpu_percent(interval=1)
    print(f"CPU usage: {cpu_percent}%")
    
    # Your actual work
    result = expensive_computation()
Adjust allocations based on logs:
  • If using 50% of allocated memory → reduce allocation
  • If near 100% → increase allocation
  • If OOM errors → significantly increase
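Those adjustment rules can be sketched as a small helper (our own heuristic, not a Metaflow API):

```python
def suggest_memory_mb(allocated: int, peak_used: float, buffer: float = 0.3) -> int:
    """Apply the rules above: shrink when under half used, grow when near the limit."""
    utilization = peak_used / allocated
    if utilization < 0.5:
        return int(peak_used * (1 + buffer))  # well under half used: shrink toward peak + buffer
    if utilization > 0.9:
        return allocated * 2                  # near the limit: double to ride out spikes
    return allocated                          # allocation looks about right
```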

Rule of Thumb

Common workload estimates:
# Light data processing
@resources(cpu=2, memory=8000)

# Medium ML training
@resources(cpu=4, memory=16000)

# Large dataset processing
@resources(cpu=8, memory=32000)

# Deep learning with GPU
@resources(cpu=8, memory=32000, gpu=1)

# Multi-GPU training
@resources(cpu=16, memory=64000, gpu=4)
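One way to keep these rules of thumb consistent across a flow is a preset table (the names here are our own, not a Metaflow convention):

```python
# Hypothetical presets mirroring the rules of thumb above
PRESETS = {
    "light":     dict(cpu=2,  memory=8000),
    "medium":    dict(cpu=4,  memory=16000),
    "large":     dict(cpu=8,  memory=32000),
    "gpu":       dict(cpu=8,  memory=32000, gpu=1),
    "multi_gpu": dict(cpu=16, memory=64000, gpu=4),
}
```

Then @resources(**PRESETS["medium"]) gives every step the same spec, and tuning happens in one place.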

Platform-Specific Behavior

AWS Batch

How @resources maps to AWS Batch:
Parameter   AWS Batch
cpu         vCPU count
memory      Memory in MB
gpu         GPU count (requires GPU instance family)
disk        Not directly controlled (uses instance storage)
@batch
@resources(cpu=4, memory=16000)
@step
def process(self):
    # AWS Batch selects instance type with ≥4 vCPUs and ≥16GB RAM
    # e.g., c5.xlarge (4 vCPU, 8GB) won't be selected
    # e.g., c5.2xlarge (8 vCPU, 16GB) will be selected
    pass

Kubernetes

How @resources maps to Kubernetes:
Parameter   Kubernetes
cpu         CPU request and limit
memory      Memory request and limit (in MB)
gpu         GPU resource request
disk        Ephemeral storage request
@kubernetes
@resources(cpu=4, memory=16000, disk=50000)
@step
def process(self):
    # Creates pod with:
    # resources:
    #   requests:
    #     cpu: "4"
    #     memory: "16000Mi"
    #     ephemeral-storage: "50000Mi"
    #   limits:
    #     cpu: "4"
    #     memory: "16000Mi"
    #     ephemeral-storage: "50000Mi"
    pass
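The mapping in the comment above can be sketched as a function (illustrative only; this is not Metaflow's actual Kubernetes code):

```python
def pod_resources(cpu, memory, disk=None):
    """Build the requests/limits mapping shown in the comment above."""
    spec = {"cpu": str(cpu), "memory": f"{memory}Mi"}
    if disk is not None:
        spec["ephemeral-storage"] = f"{disk}Mi"
    # Requests and limits are set to the same values, per the example above
    return {"requests": dict(spec), "limits": dict(spec)}
```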

Best Practices

Always prefer @resources over platform-specific parameters:
# Good: Portable across platforms
@batch
@resources(cpu=4, memory=16000)
@step
def train(self):
    pass

# Less good: Locked to AWS Batch
@batch(cpu=4, memory=16000)
@step
def train(self):
    pass
Don’t guess resource needs. Profile first:
  1. Run locally or with minimal resources
  2. Monitor actual usage (CPU, memory, disk)
  3. Scale up based on measurements
  4. Add 20-30% buffer for safety
More resources = higher cost. Balance performance and cost:
# Fast but expensive
@resources(cpu=32, memory=128000)

# Slower but cheaper
@resources(cpu=4, memory=16000)
Sometimes 2x resources doesn’t mean 2x speed.
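One reason doubling resources rarely doubles speed is the serial fraction of the work; Amdahl's law gives a quick upper bound:

```python
def amdahl_speedup(parallel_fraction, n):
    """Amdahl's law: best-case speedup on n CPUs when only part of the work parallelizes."""
    return 1 / ((1 - parallel_fraction) + parallel_fraction / n)

# Even with 90% parallel work, 8 CPUs give at most ~4.7x, not 8x
```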
GPUs need sufficient CPU memory for data loading:
# Insufficient CPU memory for GPU training
@resources(gpu=1, memory=4096)  # Too little!

# Better: GPU training needs CPU memory for data
@resources(gpu=1, memory=32000)  # Much better

Common Patterns

Parameterized Resources

Make resources configurable. Decorator arguments are evaluated when the class is defined, so runtime Parameters cannot be referenced inside @resources (there is no self at that point). Read configuration at import time instead, for example from environment variables:
import os
from metaflow import FlowSpec, step, resources

# Env var names are illustrative; pick names that suit your setup
NUM_CPUS = int(os.getenv('TRAIN_CPUS', '4'))
MEMORY_GB = int(os.getenv('TRAIN_MEMORY_GB', '16'))

class ConfigurableFlow(FlowSpec):
    @step
    def start(self):
        self.next(self.train)
    
    @resources(cpu=NUM_CPUS, memory=MEMORY_GB * 1000)
    @step
    def train(self):
        print(f"Using {NUM_CPUS} CPUs and {MEMORY_GB}GB RAM")
        self.next(self.end)
    
    @step
    def end(self):
        pass
Run with custom resources:
TRAIN_CPUS=8 TRAIN_MEMORY_GB=32 python myflow.py run

Progressive Resource Scaling

Start small, scale up on retry:
from metaflow import FlowSpec, step, retry, current

class ScalingFlow(FlowSpec):
    @retry(times=3)
    @step
    def start(self):
        # Attempt 0: 4GB
        # Attempt 1: 8GB
        # Attempt 2: 16GB
        memory = 4096 * (2 ** current.retry_count)
        print(f"Attempt {current.retry_count} with {memory}MB")
        
        # This won't work - resources must be static
        # Use platform decorator for dynamic allocation instead
        self.next(self.end)
    
    @step
    def end(self):
        pass
@resources parameters must be static. Use platform decorators directly if you need dynamic resource allocation based on runtime conditions.

Environment-Specific Resources

Adjust resources for different environments:
import os

class EnvFlow(FlowSpec):
    @step
    def start(self):
        self.next(self.train)
    
    @resources(
        cpu=16 if os.getenv('ENV') == 'prod' else 4,
        memory=64000 if os.getenv('ENV') == 'prod' else 16000,
    )
    @step
    def train(self):
        # Production: 16 CPU, 64GB
        # Development: 4 CPU, 16GB
        pass

Troubleshooting

Out-of-Memory Errors

Symptoms: Process killed with exit code 137 or OOMKilled
Solutions:
  1. Increase memory parameter
  2. Process data in smaller chunks
  3. Use memory-efficient algorithms
  4. Clear intermediate variables
  5. Use generators instead of loading all data
# Before: OOM
@resources(memory=8000)
@step
def process(self):
    data = load_all_data()  # 20GB dataset!
    result = transform(data)

# After: Fixed
@resources(memory=8000)
@step
def process(self):
    result = []
    for chunk in load_data_chunks():  # Process 1GB at a time
        result.append(transform(chunk))
Slow Performance

Symptoms: Step runs but very slowly
Solutions:
  1. Profile to find bottlenecks
  2. Increase cpu if CPU-bound
  3. Parallelize with multiprocessing
  4. Consider algorithm optimization first
# Slow
@resources(cpu=1)
@step
def process(self):
    results = [expensive_func(x) for x in large_list]

# Faster
@resources(cpu=8)
@step
def process(self):
    from multiprocessing import Pool
    with Pool(8) as pool:
        results = pool.map(expensive_func, large_list)
GPU Not Detected

Symptoms: CUDA not available despite gpu=1
Solutions:
  1. Verify compute environment has GPUs
  2. Check Docker image has CUDA
  3. Ensure GPU drivers are installed
  4. Test with simple GPU check:
@resources(gpu=1, memory=16000)
@step
def check_gpu(self):
    import torch
    print(f"CUDA available: {torch.cuda.is_available()}")
    print(f"GPU count: {torch.cuda.device_count()}")
    if torch.cuda.is_available():
        print(f"GPU: {torch.cuda.get_device_name(0)}")
Out of Disk Space

Symptoms: No space left on device
Solutions:
  1. Increase disk parameter (Kubernetes)
  2. Clean up intermediate files
  3. Stream data instead of storing locally
  4. Use cloud storage (S3, etc.) for large files
# Before: Runs out of disk
@resources(disk=10000)  # 10GB
@step
def process(self):
    df = pd.read_csv('s3://bucket/huge.csv')  # 50GB!
    df.to_parquet('/tmp/processed.parquet')

# After: Fixed
@resources(disk=100000)  # 100GB
@step
def process(self):
    # Or stream without storing locally
    import s3fs
    fs = s3fs.S3FileSystem()
    with fs.open('s3://bucket/huge.csv') as f:
        for chunk in pd.read_csv(f, chunksize=10000):
            process_chunk(chunk)

Next Steps

AWS Batch

Learn how @resources works with AWS Batch

Kubernetes

Understand Kubernetes resource mapping

Distributed Computing

Scale resources across multiple nodes

Remote Execution

Combine resources with remote compute
