The @batch decorator specifies that a step should execute on AWS Batch.

Basic Usage

from metaflow import FlowSpec, step, batch

class MyFlow(FlowSpec):
    @batch(cpu=4, memory=16384)
    @step
    def process(self):
        # This step runs on AWS Batch
        pass

if __name__ == '__main__':
    MyFlow()

Description

The @batch decorator allows steps to run on AWS Batch, which provides managed compute resources for large-scale batch processing. AWS Batch automatically provisions the right amount of compute resources based on your requirements.

Prerequisites

  • AWS credentials configured
  • S3 datastore configured (--datastore=s3)
  • AWS Batch job queue and compute environment set up

Parameters

cpu
int
default:"1"
Number of CPUs required for this step. If @resources is also present, the maximum value from all decorators is used.
memory
int
default:"4096"
Memory size (in MB) required for this step. If @resources is also present, the maximum value from all decorators is used.
gpu
int
default:"0"
Number of GPUs required for this step. If @resources is also present, the maximum value from all decorators is used.
image
str
default:"None"
Docker image to use when launching on AWS Batch. If not specified and METAFLOW_BATCH_CONTAINER_IMAGE is set, that image is used. Otherwise, defaults to a Python image matching your Python version.
queue
str
default:"METAFLOW_BATCH_JOB_QUEUE"
AWS Batch Job Queue to submit the job to.
iam_role
str
default:"METAFLOW_ECS_S3_ACCESS_IAM_ROLE"
AWS IAM role that the AWS Batch container uses to access AWS cloud resources.
execution_role
str
default:"METAFLOW_ECS_FARGATE_EXECUTION_ROLE"
AWS IAM role that AWS Batch uses to trigger AWS Fargate tasks.
shared_memory
int
default:"None"
The value for the size (in MiB) of the /dev/shm volume for this step. This parameter maps to the --shm-size option in Docker.
max_swap
int
default:"None"
The total amount of swap memory (in MiB) a container can use for this step. This parameter is translated to the --memory-swap option in Docker where the value is the sum of the container memory plus the max_swap value.
swappiness
int
default:"None"
Tune memory swappiness behavior for this step. A value of 0 causes swapping not to happen unless absolutely necessary. A value of 100 causes pages to be swapped very aggressively. Accepted values are whole numbers between 0 and 100.
inferentia
int
default:"0"
Number of AWS Inferentia chips required for this step.
trainium
int
default:"None"
Number of AWS Trainium chips required for this step. This is an alias for inferentia; specify only one of the two.
efa
int
default:"0"
Number of elastic fabric adapter network devices to attach to container.
use_tmpfs
bool
default:"False"
Enable an explicit tmpfs mount for this step. Note that tmpfs is not available on Fargate compute environments.
tmpfs_tempdir
bool
default:"True"
If enabled, sets the METAFLOW_TEMPDIR environment variable to tmpfs_path so that temporary files are written to the tmpfs mount.
tmpfs_size
int
default:"None"
The value for the size (in MiB) of the tmpfs mount for this step. Defaults to 50% of the memory allocated for this step.
tmpfs_path
str
default:"/metaflow_temp"
Path to tmpfs mount for this step.
ephemeral_storage
int
default:"None"
The total amount, in GiB, of ephemeral storage to set for the task (21-200 GiB). Only relevant for Fargate compute environments.
aws_batch_tags
Dict[str, str]
default:"None"
Sets arbitrary AWS tags on the AWS Batch compute environment. Specified as string key-value pairs.
log_driver
str
default:"None"
The log driver to use for the Amazon ECS container.
log_options
List[str]
default:"None"
List of strings containing options for the chosen log driver. Example: ["awslogs-group:aws/batch/job"]
privileged
bool
default:"False"
Control whether the task can run as a privileged process on AWS Batch.
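Several parameters above note that when @resources is also present, the maximum value across decorators is used. A pure-Python sketch of that per-key resolution rule; this is an illustration of the documented behavior, not Metaflow's actual implementation:

```python
def resolve_resources(batch_spec, resources_spec):
    """Combine @batch and @resources requests by taking the per-key
    maximum, mirroring the rule described above (illustrative only)."""
    keys = set(batch_spec) | set(resources_spec)
    return {k: max(batch_spec.get(k, 0), resources_spec.get(k, 0)) for k in keys}

# e.g. @batch(cpu=4, memory=16384) combined with @resources(cpu=8, memory=8192)
combined = resolve_resources({"cpu": 4, "memory": 16384}, {"cpu": 8, "memory": 8192})
```

Here `combined` ends up requesting 8 CPUs and 16384 MB of memory: each requirement is satisfied by taking the larger of the two requests.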

Examples

Basic Batch Execution

@batch(cpu=4, memory=16384)
@step
def process_data(self):
    # Process large dataset on AWS Batch
    pass

GPU-Accelerated Training

@batch(cpu=8, memory=32768, gpu=2)
@step
def train_model(self):
    import torch
    # Train model using 2 GPUs
    pass

Custom Docker Image

@batch(
    image='my-registry.com/ml-image:latest',
    cpu=4,
    memory=16384
)
@step
def custom_environment(self):
    # Runs with custom dependencies
    pass

Using AWS Inferentia

@batch(cpu=4, memory=16384, inferentia=1)
@step
def inference(self):
    # Use AWS Inferentia for ML inference
    pass

With Shared Memory

@batch(
    cpu=8,
    memory=32768,
    shared_memory=16384
)
@step
def multiprocess_task(self):
    # Use shared memory for IPC
    pass
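On Linux, Python's multiprocessing machinery backs shared memory with /dev/shm, which is why the shared_memory parameter matters for multi-process steps (a too-small /dev/shm is a common cause of DataLoader crashes, for example). A small standard-library illustration of /dev/shm-backed IPC, independent of Metaflow:

```python
from multiprocessing import shared_memory

# Allocate a 1 KiB shared segment (backed by /dev/shm on Linux),
# write through one handle and read it back through another.
seg = shared_memory.SharedMemory(create=True, size=1024)
try:
    seg.buf[:5] = b"hello"
    reader = shared_memory.SharedMemory(name=seg.name)
    data = bytes(reader.buf[:5])
    reader.close()
finally:
    seg.close()
    seg.unlink()
```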

Adding AWS Tags

@batch(
    cpu=4,
    memory=8192,
    aws_batch_tags={
        'Project': 'DataPipeline',
        'Environment': 'Production'
    }
)
@step
def tagged_job(self):
    pass

Runtime Override

Override batch parameters at runtime:
python flow.py run --with batch:cpu=8,memory=32768,gpu=1

Environment Variables

When running on AWS Batch, these environment variables are available:
  • AWS_BATCH_JOB_ID - The job ID
  • AWS_BATCH_JOB_ATTEMPT - The attempt number
  • AWS_BATCH_CE_NAME - Compute environment name
  • AWS_BATCH_JQ_NAME - Job queue name
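These variables give step code a simple way to detect whether it is running on Batch, for example to adjust logging or behavior on retries. A small sketch using only the standard library; when run locally, where the variables are unset, it reports a non-Batch environment:

```python
import os

def batch_context():
    """Return AWS Batch job metadata if running on Batch, else None."""
    job_id = os.environ.get("AWS_BATCH_JOB_ID")
    if job_id is None:
        return None
    return {
        "job_id": job_id,
        "attempt": int(os.environ.get("AWS_BATCH_JOB_ATTEMPT", "1")),
        "queue": os.environ.get("AWS_BATCH_JQ_NAME"),
    }
```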

Best Practices

  1. Right-size resources: Monitor actual usage and adjust CPU/memory accordingly
  2. Use spot instances: Configure your AWS Batch compute environment to use spot instances for cost savings
  3. Custom images: Build custom Docker images with your dependencies pre-installed
  4. Timeout decorators: Always use @timeout with @batch to prevent hung jobs
  5. Retry logic: Combine with @retry to handle transient failures

Common Patterns

Batch with Error Handling

@retry(times=3)
@timeout(hours=2)
@batch(cpu=4, memory=16384)
@step
def robust_processing(self):
    # Retries on failure, times out after 2 hours
    pass

Conditional Batch Execution

@step
def decide(self):
    if self.large_dataset:
        self.next(self.batch_process)
    else:
        self.next(self.local_process)

@batch(cpu=8, memory=32768)
@step
def batch_process(self):
    # Heavy processing on Batch
    self.next(self.join)

@step
def local_process(self):
    # Light processing locally
    self.next(self.join)
