Metaflow provides comprehensive support for AWS, offering deep integration with AWS Batch for compute, S3 for storage, Step Functions for orchestration, and Secrets Manager for credential management.

Overview

AWS is Metaflow’s most mature cloud platform, originally developed at Netflix to run thousands of production workflows. The AWS integration includes:
  • AWS Batch: Elastic compute with support for CPUs, GPUs, and multi-node workloads
  • Amazon S3: Scalable object storage for artifacts and data
  • AWS Step Functions: Production workflow orchestration
  • AWS Secrets Manager: Secure credential and secret management
  • Amazon ECR: Container registry for custom Docker images

Setup

Prerequisites

  • AWS account with appropriate permissions
  • AWS CLI configured with credentials
  • Metaflow installed: pip install metaflow

Configuration

Use the Metaflow configure command to set up AWS:
metaflow configure aws
This interactive wizard will prompt you for:

1. S3 Bucket

Specify an S3 bucket for artifact storage:
METAFLOW_DATASTORE_SYSROOT_S3: s3://my-metaflow-bucket/metaflow

2. Batch Configuration

Configure the AWS Batch job queue and IAM roles:
METAFLOW_BATCH_JOB_QUEUE: my-batch-queue
METAFLOW_ECS_S3_ACCESS_IAM_ROLE: arn:aws:iam::123456789:role/metaflow-role

3. Step Functions (optional)

Configure Step Functions for production deployments:
METAFLOW_SFN_IAM_ROLE: arn:aws:iam::123456789:role/metaflow-sfn-role
METAFLOW_SFN_STATE_MACHINE_PREFIX: MyProject

Manual Configuration

Alternatively, set environment variables directly:
# Set S3 as default datastore
export METAFLOW_DEFAULT_DATASTORE=s3
export METAFLOW_DATASTORE_SYSROOT_S3=s3://my-bucket/metaflow

# Configure AWS Batch
export METAFLOW_BATCH_JOB_QUEUE=my-batch-queue
export METAFLOW_ECS_S3_ACCESS_IAM_ROLE=arn:aws:iam::123456789:role/metaflow-role

# Optional: Configure container registry
export METAFLOW_BATCH_CONTAINER_REGISTRY=123456789.dkr.ecr.us-east-1.amazonaws.com
export METAFLOW_BATCH_CONTAINER_IMAGE=my-image:latest
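
After exporting the variables, you can confirm what Metaflow resolves (assuming a standard Metaflow installation) with:

```shell
# Print the active Metaflow configuration
metaflow configure show
```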

AWS Batch Integration

The @batch decorator runs steps on AWS Batch, providing elastic compute with automatic scaling.

Basic Usage

from metaflow import FlowSpec, step, batch

class MyFlow(FlowSpec):
    @batch
    @step
    def start(self):  # a Metaflow flow must begin with a step named "start"
        # Runs on AWS Batch with default resources
        self.result = expensive_computation()
        self.next(self.end)
    
    @step
    def end(self):
        print(f"Result: {self.result}")

Resource Configuration

Specify CPU, memory, and GPU requirements:
@batch(cpu=8, memory=32000, gpu=1)
@step
def gpu_training(self):
    # 8 CPUs, 32GB memory, 1 GPU
    model = train_model()
    self.model = model
    self.next(self.end)

Parameters Reference

  • cpu (int, default 1): Number of CPUs required (1-96)
  • memory (int, default 4096): Memory in MB (128-245760)
  • gpu (int, default 0): Number of GPUs (0-8)
  • image (str): Custom Docker image (e.g., my-image:latest)
  • queue (str): AWS Batch job queue (defaults to METAFLOW_BATCH_JOB_QUEUE)
  • iam_role (str): IAM role ARN for the container
  • execution_role (str): Execution role ARN for Fargate
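
These parameters can also be applied at run time, without editing the flow, using Metaflow's --with option:

```shell
# Run every step of the flow on AWS Batch with the given resources
python myflow.py run --with batch:cpu=8,memory=32000
```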

Advanced Features

Custom Docker Images

@batch(image="123456789.dkr.ecr.us-east-1.amazonaws.com/my-model:v1.0")
@step
def custom_environment(self):
    # Uses custom container with specific dependencies
    import special_library
    self.result = special_library.process()
    self.next(self.end)

Inferentia and Trainium Support

@batch(inferentia=1)
@step
def inference_step(self):
    # Uses AWS Inferentia chip for ML inference
    predictions = model.predict(data)
    self.next(self.end)

@batch(trainium=2)
@step
def training_step(self):
    # Uses AWS Trainium chips for training
    model = train_large_model()
    self.next(self.end)

Shared Memory and Swap

@batch(
    memory=16000,
    shared_memory=8000,  # 8GB shared memory
    max_swap=4000,       # 4GB swap
    swappiness=60        # Swappiness (0-100); higher favors swapping
)
@step
def memory_intensive(self):
    # Configure memory settings for the container
    large_array = process_big_data()
    self.next(self.end)

Tmpfs Support

@batch(
    use_tmpfs=True,
    tmpfs_size=10000,      # 10GB tmpfs
    tmpfs_path="/tmp/fast",
    tmpfs_tempdir=True     # Set METAFLOW_TEMPDIR to tmpfs_path
)
@step
def fast_io(self):
    # Uses tmpfs for fast I/O operations
    with open("/tmp/fast/data.bin", "wb") as f:
        f.write(generate_data())
    self.next(self.end)

AWS Batch Tags

@batch(
    aws_batch_tags={
        "project": "ml-training",
        "team": "data-science",
        "cost-center": "research"
    }
)
@step
def tagged_step(self):
    # Tags help with cost tracking and organization
    self.result = train_model()
    self.next(self.end)
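
AWS rejects jobs whose tags exceed its limits (128-character keys, 256-character values). A hypothetical pre-flight check like the one below can catch this before a job is submitted:

```python
def validate_aws_tags(tags: dict) -> None:
    """Raise ValueError if a tag violates AWS key/value length limits."""
    for key, value in tags.items():
        if not key or len(key) > 128:
            raise ValueError(f"Tag key {key!r} must be 1-128 characters")
        if len(value) > 256:
            raise ValueError(f"Tag value for {key!r} exceeds 256 characters")

# Valid tags pass silently
validate_aws_tags({"project": "ml-training", "team": "data-science"})
```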

Multi-Node Parallel Computing

from metaflow import FlowSpec, step, batch, parallel, current

class DistributedFlow(FlowSpec):
    @step
    def start(self):
        # Fan out to 4 gang-scheduled nodes
        self.next(self.distributed_training, num_parallel=4)

    @batch(cpu=8, memory=16000)
    @parallel
    @step
    def distributed_training(self):
        # Runs as a multi-node parallel Batch job with gang scheduling
        rank = current.parallel.node_index
        num_nodes = current.parallel.num_nodes

        # Distributed training code
        self.model = train_distributed(rank, num_nodes)
        self.next(self.join)

    @step
    def join(self, inputs):
        # Parallel branches must be joined before the flow continues
        self.model = inputs[0].model
        self.next(self.end)

    @step
    def end(self):
        print("Training complete")
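
Each node typically uses its rank to claim a disjoint shard of the work. A minimal local sketch of that sharding logic (helper name is illustrative):

```python
def shard_for_node(items, node_index, num_nodes):
    """Return the disjoint (round-robin) slice of work for one node."""
    return items[node_index::num_nodes]

items = list(range(10))
shards = [shard_for_node(items, rank, 4) for rank in range(4)]

# Every item is assigned to exactly one node
assert sorted(x for shard in shards for x in shard) == items
```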

S3 Storage

Metaflow automatically stores all artifacts in S3 when configured.

Configuration

# Basic S3 configuration
export METAFLOW_DATASTORE_SYSROOT_S3=s3://my-bucket/metaflow

# Server-side encryption
export METAFLOW_S3_SERVER_SIDE_ENCRYPTION=AES256
# or for KMS:
export METAFLOW_S3_SERVER_SIDE_ENCRYPTION=aws:kms

# Custom endpoint (e.g., for MinIO)
export METAFLOW_S3_ENDPOINT_URL=https://minio.example.com

# Performance tuning
export METAFLOW_S3_RETRY_COUNT=10
export METAFLOW_S3_WORKER_COUNT=64

Accessing Data

Metaflow provides the S3 datatools library for efficient data access:
from metaflow import FlowSpec, step, S3

class DataFlow(FlowSpec):
    @step
    def start(self):
        # Upload data to S3
        with S3(s3root="s3://my-bucket/data") as s3:
            s3.put_files([("remote_file.csv", "local_file.csv")])  # (key, local path)
        self.next(self.process)
    
    @step
    def process(self):
        # Download data from S3
        with S3(s3root="s3://my-bucket/data") as s3:
            files = s3.get_many(["remote_file.csv"])
            with open(files[0].path, "r") as f:
                data = f.read()
        self.next(self.end)
    
    @step
    def end(self):
        print("Processing complete")

AWS Step Functions

Deploy production workflows to AWS Step Functions for reliable orchestration.

Deploying to Step Functions

# Deploy flow to Step Functions
python myflow.py step-functions create

# Trigger execution
python myflow.py step-functions trigger

# List executions
python myflow.py step-functions list-runs

Scheduling with EventBridge

Use the @schedule decorator for periodic execution:
from metaflow import FlowSpec, step, schedule

@schedule(cron="0 0 * * ? *")  # Daily at midnight (EventBridge six-field cron)
class ScheduledFlow(FlowSpec):
    @step
    def start(self):
        self.data = fetch_daily_data()
        self.next(self.process)
    
    @step
    def process(self):
        self.result = process(self.data)
        self.next(self.end)
    
    @step
    def end(self):
        print(f"Result: {self.result}")

Event Triggering

Trigger flows from S3 events:
from metaflow import FlowSpec, step, trigger

@trigger(event="s3")
class EventFlow(FlowSpec):
    @step
    def start(self):
        # Triggered when objects are uploaded to S3
        from metaflow import current
        bucket = current.trigger.s3.bucket
        key = current.trigger.s3.key
        
        self.process_file(bucket, key)
        self.next(self.end)
    
    @step
    def end(self):
        print("Processing complete")

AWS Secrets Manager

Securely manage credentials and secrets using AWS Secrets Manager.

Configuration

# Set default region for Secrets Manager
export METAFLOW_AWS_SECRETS_MANAGER_DEFAULT_REGION=us-east-1

# Optional: Default IAM role for secret access
export METAFLOW_AWS_SECRETS_MANAGER_DEFAULT_ROLE=arn:aws:iam::123456789:role/secrets-role

Using Secrets

from metaflow import FlowSpec, step, secrets

class SecureFlow(FlowSpec):
    @secrets(sources=["my-api-key", "database-credentials"])
    @step
    def start(self):
        import os
        # Each key of a JSON secret is injected as an environment
        # variable named after the key
        api_key = os.environ["api_key"]    # key inside the my-api-key secret
        db_user = os.environ["username"]   # keys inside database-credentials
        db_pass = os.environ["password"]
        
        # Use credentials securely
        self.data = fetch_data(api_key, db_user, db_pass)
        self.next(self.end)
    
    @step
    def end(self):
        print(f"Fetched {len(self.data)} records")

Secret Formats

JSON Secrets

# Secret stored as JSON in AWS Secrets Manager:
# {
#   "username": "admin",
#   "password": "secret123",
#   "host": "db.example.com"
# }

@secrets(sources=["database-config"])
@step
def connect(self):
    import os
    # Each JSON key becomes an environment variable of the same name
    username = os.environ["username"]
    password = os.environ["password"]
    host = os.environ["host"]
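
The key-to-variable mapping can be simulated locally without Secrets Manager (the payload and variable naming here are illustrative):

```python
import json
import os

# Hypothetical secret payload as stored in AWS Secrets Manager
payload = json.loads(
    '{"username": "admin", "password": "secret123", "host": "db.example.com"}'
)

# Metaflow injects each top-level key as an environment variable
for key, value in payload.items():
    os.environ[key] = value

assert os.environ["host"] == "db.example.com"
```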

Plain Text Secrets

# For non-JSON secrets, specify a custom env var name via source options
@secrets(
    sources=[{
        "type": "aws-secrets-manager",
        "id": "api-token",
        "options": {"env_var_name": "API_TOKEN"},
    }]
)
@step
def api_call(self):
    import os
    token = os.environ["API_TOKEN"]
    response = call_api(token)

Binary Secrets

# Binary secrets are base64-encoded
@secrets(sources=["certificate-key"])
@step
def use_cert(self):
    import os
    import base64
    cert_bytes = base64.b64decode(os.environ["CERTIFICATE_KEY"])
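
The base64 round-trip can be demonstrated locally without Secrets Manager (the secret bytes and variable name are hypothetical):

```python
import base64
import os

# Simulate what gets injected for a binary secret:
# the raw bytes, base64-encoded into an environment variable
original = b"\x00\x01binary-cert-material"
os.environ["CERTIFICATE_KEY"] = base64.b64encode(original).decode("ascii")

# Decoding inside the step recovers the original bytes
cert_bytes = base64.b64decode(os.environ["CERTIFICATE_KEY"])
assert cert_bytes == original
```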

IAM Permissions

Required IAM permissions for Metaflow:

S3 Access

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "s3:GetObject",
        "s3:PutObject",
        "s3:DeleteObject",
        "s3:ListBucket"
      ],
      "Resource": [
        "arn:aws:s3:::my-metaflow-bucket/*",
        "arn:aws:s3:::my-metaflow-bucket"
      ]
    }
  ]
}

AWS Batch Access

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "batch:SubmitJob",
        "batch:DescribeJobs",
        "batch:TerminateJob",
        "batch:ListJobs"
      ],
      "Resource": "*"
    },
    {
      "Effect": "Allow",
      "Action": [
        "ecs:DescribeTasks",
        "logs:GetLogEvents"
      ],
      "Resource": "*"
    }
  ]
}

Step Functions Access

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "states:CreateStateMachine",
        "states:UpdateStateMachine",
        "states:StartExecution",
        "states:DescribeExecution",
        "states:ListExecutions"
      ],
      "Resource": "*"
    },
    {
      "Effect": "Allow",
      "Action": "iam:PassRole",
      "Resource": "arn:aws:iam::123456789:role/metaflow-sfn-role"
    }
  ]
}

Secrets Manager Access

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "secretsmanager:GetSecretValue",
        "secretsmanager:DescribeSecret"
      ],
      "Resource": "arn:aws:secretsmanager:us-east-1:123456789:secret:*"
    }
  ]
}

Best Practices

  • Use spot instances for fault-tolerant workloads
  • Right-size Batch compute resources
  • Implement S3 lifecycle policies for old artifacts
  • Use S3 Intelligent-Tiering for data storage
  • Monitor costs with AWS Cost Explorer
  • Use Fargate Spot for cost-effective compute
  • Enable S3 Transfer Acceleration for large files
  • Use VPC endpoints for private S3 access
  • Optimize Docker images with multi-stage builds
  • Use tmpfs for temporary high-IOPS storage
  • Use IAM roles instead of access keys
  • Enable S3 bucket encryption (SSE-S3 or SSE-KMS)
  • Use AWS Secrets Manager for credentials
  • Enable VPC for Batch compute environments
  • Implement least-privilege IAM policies
  • Use Step Functions for production workflows
  • Configure appropriate retry policies
  • Monitor Step Functions execution metrics
  • Set up CloudWatch alarms for failures
  • Use DynamoDB for foreach coordination
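
The last point refers to the DynamoDB table that Step Functions deployments use to coordinate foreach splits; it is configured with an environment variable (the table name below is an example):

```shell
export METAFLOW_SFN_DYNAMO_DB_TABLE=metaflow-sfn-state
```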

Troubleshooting

Common Issues

Problem: Jobs stuck in the RUNNABLE state
Solutions:
  • Check Batch compute environment capacity
  • Verify IAM role permissions
  • Review VPC and subnet configuration
  • Check service quotas and limits
Problem: Cannot read/write S3 objects
Solutions:
  • Verify IAM role has S3 permissions
  • Check S3 bucket policy
  • Ensure bucket and role are in same account
  • Verify bucket name is correct
Problem: Custom image not found
Solutions:
  • Verify ECR repository exists
  • Check image tag is correct
  • Ensure IAM role can pull from ECR
  • Use full image URI with registry
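
When diagnosing these issues, the AWS CLI surfaces the underlying state (resource names and IDs below are placeholders):

```shell
# Why is the compute environment not scaling?
aws batch describe-compute-environments --compute-environments my-compute-env

# Inspect a stuck job's status and statusReason
aws batch describe-jobs --jobs 01234567-89ab-cdef-0123-456789abcdef
```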

Next Steps

AWS Batch Deep Dive

Learn advanced Batch features and optimization

Step Functions Guide

Deploy production workflows to Step Functions

S3 Datatools

Master efficient data access with S3

Secrets Management

Secure credential management patterns
