The AWS SageMaker integration provides orchestration and step execution capabilities using Amazon SageMaker Pipelines, Training Jobs, and Processing Jobs.
This page covers SageMaker-specific details. For general AWS setup, see the AWS Integration page.

Installation

pip install "zenml[aws]"
This installs:
  • sagemaker>=2.237.3,<3.0.0 - SageMaker SDK
  • kubernetes - Kubernetes Python client
  • aws-profile-manager - AWS profile management

Components

SageMaker Orchestrator

Execute complete pipelines as SageMaker Pipelines

SageMaker Step Operator

Run individual steps as SageMaker Training/Processing jobs

SageMaker Orchestrator

Runs your complete pipeline as a SageMaker Pipeline with Processing or Training steps.

Configuration

zenml orchestrator register sagemaker-orch \
    --flavor=sagemaker \
    --execution_role=arn:aws:iam::123456789012:role/SageMakerExecutionRole \
    --region=us-west-2 \
    --bucket=my-sagemaker-bucket
Required:
  • execution_role - IAM role ARN with SageMaker permissions
Optional:
  • region - AWS region (default: from AWS config)
  • bucket - S3 bucket for artifacts (default: sagemaker-{region}-{account-id})
  • scheduler_role - IAM role ARN for scheduled pipelines
  • aws_access_key_id - AWS access key
  • aws_secret_access_key - AWS secret key
  • aws_profile - AWS profile name
  • aws_auth_role_arn - Intermediate role to assume

Step Settings

Customize individual steps with SagemakerOrchestratorSettings:
from zenml import step, pipeline
from zenml.integrations.aws.flavors.sagemaker_orchestrator_flavor import (
    SagemakerOrchestratorSettings,
)
import pandas as pd

@step(
    settings={
        "orchestrator": SagemakerOrchestratorSettings(
            # Instance configuration
            instance_type="ml.p3.2xlarge",  # GPU instance
            volume_size_in_gb=100,
            max_runtime_in_seconds=7200,  # 2 hours
            
            # Step behavior
            use_training_step=True,  # Use TrainingStep instead of ProcessingStep
            keep_alive_period_in_seconds=300,  # Keep instance warm for 5 min
            
            # Environment
            environment={
                "CUDA_VISIBLE_DEVICES": "0",
                "OMP_NUM_THREADS": "4",
            },
            
            # Tags
            tags={
                "team": "ml-ops",
                "project": "recommendation",
                "cost-center": "engineering",
            },
            
            # Data configuration
            input_data_s3_mode="File",  # or "Pipe"
            input_data_s3_uri="s3://my-bucket/training-data",
            output_data_s3_mode="EndOfJob",  # or "Continuous"
            output_data_s3_uri="s3://my-bucket/outputs",
            
            # Advanced: Direct Processor/Estimator args
            processor_args={
                "instance_count": 2,
                "base_job_name": "custom-processing",
            },
            estimator_args={
                "instance_count": 2,
                "max_run": 3600,
            },
        )
    }
)
def train_model(data: pd.DataFrame) -> Model:  # `Model` stands in for your model class
    # Training code runs on the configured instance
    ...
Available Settings:
| Setting | Type | Default | Description |
|---|---|---|---|
| `instance_type` | str | `ml.m5.xlarge` (training) / `ml.t3.medium` (processing) | EC2 instance type |
| `volume_size_in_gb` | int | `30` | EBS volume size in GB |
| `max_runtime_in_seconds` | int | `86400` | Max execution time |
| `execution_role` | str | - | Override orchestrator role |
| `environment` | dict | - | Environment variables |
| `tags` | dict | - | AWS tags for the job |
| `synchronous` | bool | `True` | Wait for completion |
| `use_training_step` | bool | `True` | Use TrainingStep vs ProcessingStep |
| `keep_alive_period_in_seconds` | int | `300` | Keep instance warm |
| `input_data_s3_mode` | str | `"File"` | Input data mode |
| `input_data_s3_uri` | str/dict | `None` | S3 input data location |
| `output_data_s3_mode` | str | `"EndOfJob"` | Output data mode |
| `output_data_s3_uri` | str/dict | `None` | S3 output location |
| `processor_args` | dict | - | SageMaker Processor arguments |
| `estimator_args` | dict | - | SageMaker Estimator arguments |

ProcessingStep vs TrainingStep

ProcessingStep (use_training_step=False):
  • For data transformation and preprocessing
  • No distributed training support
  • Lower cost for non-ML workloads
  • Default instance: ml.t3.medium
TrainingStep (use_training_step=True, the default):
  • Optimized for ML training
  • Supports distributed training
  • Managed spot training support
  • Keep-alive for faster retries
  • Default instance: ml.m5.xlarge
# Force TrainingStep for a preprocessing step
SagemakerOrchestratorSettings(
    use_training_step=True,
    keep_alive_period_in_seconds=600,  # Keep warm for 10 min
)

# Force ProcessingStep for training (not recommended)
SagemakerOrchestratorSettings(
    use_training_step=False,
)

Instance Types

Compute-Optimized:
  • ml.c5.xlarge - 4 vCPU, 8 GB RAM
  • ml.c5.2xlarge - 8 vCPU, 16 GB RAM
  • ml.c5.4xlarge - 16 vCPU, 32 GB RAM
Memory-Optimized:
  • ml.r5.xlarge - 4 vCPU, 32 GB RAM
  • ml.r5.2xlarge - 8 vCPU, 64 GB RAM
  • ml.r5.4xlarge - 16 vCPU, 128 GB RAM
GPU Instances:
  • ml.p3.2xlarge - 8 vCPU, 61 GB RAM, 1x V100 (16GB)
  • ml.p3.8xlarge - 32 vCPU, 244 GB RAM, 4x V100 (64GB)
  • ml.p3.16xlarge - 64 vCPU, 488 GB RAM, 8x V100 (128GB)
  • ml.g4dn.xlarge - 4 vCPU, 16 GB RAM, 1x T4 (16GB)
  • ml.g5.xlarge - 4 vCPU, 16 GB RAM, 1x A10G (24GB)
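The guidance above can be condensed into a small lookup helper. This is an illustrative sketch, not an official recommendation: the workload categories and the mapping are assumptions you should tune for your own pipelines.

```python
# Illustrative mapping from workload profile to a starting instance type,
# mirroring the categories listed above. Not an official recommendation.
WORKLOAD_INSTANCE_MAP = {
    "preprocessing": "ml.t3.medium",
    "cpu_training": "ml.c5.4xlarge",
    "gpu_training": "ml.p3.2xlarge",
    "memory_heavy": "ml.r5.2xlarge",
}


def suggest_instance_type(workload: str) -> str:
    """Return a starting instance type for a workload profile."""
    try:
        return WORKLOAD_INSTANCE_MAP[workload]
    except KeyError:
        raise ValueError(f"Unknown workload profile: {workload!r}") from None
```

A helper like this keeps instance choices in one place, so adjusting a project-wide default is a one-line change rather than a hunt through per-step settings.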

Distributed Training

Multi-Instance Training:
SagemakerOrchestratorSettings(
    use_training_step=True,
    estimator_args={
        "instance_count": 4,  # 4 instances
        "instance_type": "ml.p3.2xlarge",
        "distribution": {
            "smdistributed": {
                "dataparallel": {
                    "enabled": True,
                }
            }
        },
    },
)
Managed Spot Training:
SagemakerOrchestratorSettings(
    use_training_step=True,
    estimator_args={
        "use_spot_instances": True,
        "max_run": 3600,  # Max training time
        "max_wait": 7200,  # Max wait for spot (must be >= max_run)
    },
)

Data Modes

File Mode (default):
  • Downloads data before training
  • Full dataset available locally
  • Good for small to medium datasets
Pipe Mode:
  • Streams data during training
  • Lower latency to start
  • Good for large datasets
  • Requires data pipeline support in code
SagemakerOrchestratorSettings(
    input_data_s3_mode="Pipe",
    input_data_s3_uri="s3://my-bucket/data",
)
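In Pipe mode, SageMaker streams each input channel through named FIFOs under `/opt/ml/input/data`, one per epoch (`{channel}_0`, `{channel}_1`, ...), so training code must read sequentially instead of expecting files on disk. A minimal sketch of consuming such a channel; the channel name and newline-delimited record format are assumptions for illustration:

```python
import os


def pipe_path(channel: str, epoch: int, base: str = "/opt/ml/input/data") -> str:
    """Path of the Pipe-mode FIFO for a channel and epoch."""
    return os.path.join(base, f"{channel}_{epoch}")


def stream_records(channel: str, epoch: int, base: str = "/opt/ml/input/data"):
    """Yield newline-delimited records from a Pipe-mode channel (sketch).

    Each epoch's FIFO can only be read once, front to back; re-reading
    the data means opening the next epoch's FIFO.
    """
    with open(pipe_path(channel, epoch, base), "rb") as fifo:
        for line in fifo:
            yield line.rstrip(b"\n")
```

Because each FIFO is consumed sequentially, random access and multiple passes over the same epoch are not possible; code that shuffles or re-reads data needs File mode instead.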

SageMaker Step Operator

Runs individual steps as SageMaker jobs while the rest of the pipeline is orchestrated elsewhere (for example, locally).

Configuration

zenml step-operator register sagemaker-step-op \
    --flavor=sagemaker \
    --role=arn:aws:iam::123456789012:role/SageMakerExecutionRole \
    --region=us-west-2 \
    --instance_type=ml.m5.xlarge

Usage

from zenml import step, pipeline
import pandas as pd

@step(step_operator="sagemaker-step-op")
def train_on_sagemaker(data: pd.DataFrame) -> Model:
    # Runs on SageMaker
    ...

@step
def preprocess_locally(raw_data: pd.DataFrame) -> pd.DataFrame:
    # Runs locally
    ...

@pipeline
def hybrid_pipeline():
    data = preprocess_locally(...)
    model = train_on_sagemaker(data)

IAM Permissions

Minimal IAM policy for the execution role:
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "sagemaker:CreatePipeline",
        "sagemaker:StartPipelineExecution",
        "sagemaker:StopPipelineExecution",
        "sagemaker:DescribePipelineExecution",
        "sagemaker:CreateProcessingJob",
        "sagemaker:CreateTrainingJob",
        "sagemaker:DescribeProcessingJob",
        "sagemaker:DescribeTrainingJob",
        "sagemaker:AddTags",
        "sagemaker:ListTags"
      ],
      "Resource": "*"
    },
    {
      "Effect": "Allow",
      "Action": [
        "s3:GetObject",
        "s3:PutObject",
        "s3:DeleteObject",
        "s3:ListBucket"
      ],
      "Resource": [
        "arn:aws:s3:::my-sagemaker-bucket",
        "arn:aws:s3:::my-sagemaker-bucket/*"
      ]
    },
    {
      "Effect": "Allow",
      "Action": [
        "ecr:GetAuthorizationToken",
        "ecr:BatchCheckLayerAvailability",
        "ecr:GetDownloadUrlForLayer",
        "ecr:BatchGetImage"
      ],
      "Resource": "*"
    },
    {
      "Effect": "Allow",
      "Action": [
        "logs:CreateLogGroup",
        "logs:CreateLogStream",
        "logs:PutLogEvents",
        "logs:DescribeLogStreams"
      ],
      "Resource": "arn:aws:logs:*:*:*"
    }
  ]
}
Trust Relationship:
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Principal": {
        "Service": "sagemaker.amazonaws.com"
      },
      "Action": "sts:AssumeRole"
    }
  ]
}
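If you manage the role programmatically, the trust policy above can be built as a plain Python dict and serialized for the IAM API. A minimal sketch; the role name and how you attach the permissions policy are up to you:

```python
import json

# The trust relationship above, as a dict ready for the IAM API.
TRUST_POLICY = {
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Principal": {"Service": "sagemaker.amazonaws.com"},
            "Action": "sts:AssumeRole",
        }
    ],
}


def trust_policy_json() -> str:
    """Serialize the trust policy for create_role(AssumeRolePolicyDocument=...)."""
    return json.dumps(TRUST_POLICY)
```

With boto3, the role could then be created via `iam.create_role(RoleName=..., AssumeRolePolicyDocument=trust_policy_json())`, followed by `iam.put_role_policy(...)` to attach the permissions policy shown earlier as an inline policy.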

Complete Example

from zenml import step, pipeline
from zenml.integrations.aws.flavors.sagemaker_orchestrator_flavor import (
    SagemakerOrchestratorSettings,
)
import pandas as pd
from sklearn.ensemble import RandomForestClassifier

@step
def load_data() -> pd.DataFrame:
    # Load from S3 or local
    return pd.read_csv("data.csv")

@step(
    settings={
        "orchestrator": SagemakerOrchestratorSettings(
            instance_type="ml.m5.xlarge",
            volume_size_in_gb=50,
        )
    }
)
def preprocess_data(data: pd.DataFrame) -> pd.DataFrame:
    # Preprocessing on medium instance
    return data.dropna()

@step(
    settings={
        "orchestrator": SagemakerOrchestratorSettings(
            instance_type="ml.p3.2xlarge",
            volume_size_in_gb=100,
            max_runtime_in_seconds=3600,
            use_training_step=True,
            environment={"CUDA_VISIBLE_DEVICES": "0"},
            tags={"stage": "training", "model": "v2"},
        )
    }
)
def train_model(data: pd.DataFrame) -> RandomForestClassifier:
    # Training on GPU instance
    X = data.drop("target", axis=1)
    y = data["target"]
    model = RandomForestClassifier(n_estimators=100)
    model.fit(X, y)
    return model

@step
def evaluate_model(model: RandomForestClassifier, data: pd.DataFrame) -> float:
    X = data.drop("target", axis=1)
    y = data["target"]
    return model.score(X, y)

@pipeline
def training_pipeline():
    data = load_data()
    processed = preprocess_data(data)
    model = train_model(processed)
    score = evaluate_model(model, processed)

Best Practices

Match instance types to workload:
  • Preprocessing: ml.t3.medium or ml.m5.xlarge
  • Training (CPU): ml.m5.2xlarge or ml.c5.4xlarge
  • Training (GPU): ml.p3.2xlarge or ml.g4dn.xlarge
  • Large datasets: ml.r5.* (memory-optimized)
During development, keep instances warm:
SagemakerOrchestratorSettings(
    use_training_step=True,
    keep_alive_period_in_seconds=900,  # 15 minutes
)
Save up to 90% on training costs:
SagemakerOrchestratorSettings(
    use_training_step=True,
    estimator_args={
        "use_spot_instances": True,
        "max_run": 3600,
        "max_wait": 7200,  # must be >= max_run
    }
)
Use tags to track costs by project/team:
SagemakerOrchestratorSettings(
    tags={
        "Project": "recommendation-engine",
        "Team": "ml-ops",
        "Environment": "production",
        "CostCenter": "engineering",
    }
)

Monitoring and Debugging

View Logs:
# Get pipeline execution ARN from ZenML output
aws sagemaker describe-pipeline-execution \
    --pipeline-execution-arn arn:aws:sagemaker:...

# View logs in CloudWatch
aws logs tail /aws/sagemaker/ProcessingJobs --follow
SageMaker Console:
  1. Go to SageMaker console
  2. Navigate to Pipelines > Pipeline executions
  3. Click on execution to see DAG and logs
  4. View CloudWatch logs for each step
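Processing and Training jobs write to different CloudWatch log groups (`/aws/sagemaker/ProcessingJobs` vs `/aws/sagemaker/TrainingJobs`), so the `aws logs tail` target depends on how each step ran. A small helper can build the right command; the helper itself is an illustrative sketch:

```python
# Standard CloudWatch log groups for SageMaker job types.
LOG_GROUPS = {
    "processing": "/aws/sagemaker/ProcessingJobs",
    "training": "/aws/sagemaker/TrainingJobs",
}


def logs_tail_command(job_type: str, follow: bool = True) -> str:
    """Build an `aws logs tail` command for a SageMaker job type."""
    group = LOG_GROUPS[job_type]
    cmd = f"aws logs tail {group}"
    return cmd + " --follow" if follow else cmd
```

Remember that steps run with `use_training_step=True` (the default) log to the TrainingJobs group even when they do preprocessing work.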

Next Steps

AWS Integration

General AWS integration guide

Orchestrators

Learn about orchestration concepts

Remote Execution

Production deployment patterns

SageMaker Docs

Official SageMaker documentation
