
AWS Integration

The dagster-aws library provides integrations with Amazon Web Services, enabling you to:
  • Store assets and intermediate data in S3
  • Execute compute workloads on ECS and EMR
  • Query data with Athena
  • Manage secrets with AWS Secrets Manager
  • Connect to RDS databases
  • Use AWS Lambda for serverless execution

Installation

pip install dagster-aws

# Install with specific service extras
pip install dagster-aws[redshift,athena]

Core Components

S3 Resource

Connect to S3 for storage operations:
from dagster import asset, Definitions, EnvVar
from dagster_aws.s3 import S3Resource

@asset
def s3_file(s3: S3Resource):
    # Get a boto3 S3 client from the resource
    s3_client = s3.get_client()

    # List objects
    response = s3_client.list_objects_v2(
        Bucket="my-bucket",
        Prefix="data/",
    )

    # Upload data
    s3_client.put_object(
        Bucket="my-bucket",
        Key="output/result.json",
        Body='{"result": "success"}',
    )

    return {"files": len(response.get("Contents", []))}

defs = Definitions(
    assets=[s3_file],
    resources={
        "s3": S3Resource(
            region_name="us-west-2",
            aws_access_key_id=EnvVar("AWS_ACCESS_KEY_ID"),
            aws_secret_access_key=EnvVar("AWS_SECRET_ACCESS_KEY"),
        )
    },
)
S3Resource configuration:
  • region_name: AWS region
  • endpoint_url: Custom S3 endpoint (for S3-compatible services)
  • use_unsigned_session: Use unsigned requests (for public buckets)
  • profile_name: AWS profile from credentials file
  • aws_access_key_id: AWS access key
  • aws_secret_access_key: AWS secret key
  • aws_session_token: Temporary session token

S3 IO Manager

Store asset outputs in S3:
from dagster import asset, Definitions
from dagster_aws.s3 import S3PickleIOManager, S3Resource
import pandas as pd

@asset
def extract_data() -> pd.DataFrame:
    return pd.DataFrame({"id": [1, 2, 3], "value": [10, 20, 30]})

@asset
def transform_data(extract_data: pd.DataFrame) -> pd.DataFrame:
    # Data automatically loaded from S3
    return extract_data[extract_data["value"] > 15]

defs = Definitions(
    assets=[extract_data, transform_data],
    resources={
        "io_manager": S3PickleIOManager(
            s3_resource=S3Resource(),
            s3_bucket="my-dagster-bucket",
            s3_prefix="dagster/storage",
        )
    },
)
The S3 IO Manager automatically handles serialization, S3 uploads/downloads, and path management for asset outputs.
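Conceptually, the IO manager pickles each output and writes it under a key derived from the asset's key path and the configured prefix. A minimal sketch of that round-trip using plain `pickle` and a hypothetical key scheme (not the manager's exact internals):

```python
import pickle

def storage_key(prefix: str, asset_key: str) -> str:
    # Hypothetical key scheme: the real manager derives keys from the
    # asset's key path under the configured s3_prefix
    return f"{prefix}/{asset_key}"

data = {"id": [1, 2, 3], "value": [10, 20, 30]}
blob = pickle.dumps(data)        # bytes that would be uploaded to S3
restored = pickle.loads(blob)    # what a downstream asset receives

assert restored == data
print(storage_key("dagster/storage", "extract_data"))  # → dagster/storage/extract_data
```

Because outputs are pickled, anything picklable works out of the box; for large tabular data, a format-specific IO manager is usually a better fit.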

ECS Integration

Execute tasks on AWS ECS:
from dagster import asset

# Configure in dagster.yaml
# run_launcher:
#   module: dagster_aws.ecs
#   class: EcsRunLauncher
#   config:
#     cluster: my-ecs-cluster
#     subnets:
#       - subnet-12345
#     security_group_ids:
#       - sg-12345

@asset
def ecs_computation():
    # This runs on ECS when using EcsRunLauncher
    return {"result": "computed on ECS"}

EMR Integration

Run Spark jobs on Amazon EMR:
from dagster import asset, OpExecutionContext
from dagster_aws.emr import EmrJobRunner

@asset
def emr_spark_job(context: OpExecutionContext):
    job_runner = EmrJobRunner(region="us-west-2")

    # Submit a Spark step to an existing cluster
    step_ids = job_runner.add_job_flow_steps(
        context.log,
        "j-XXXXXXXXXXXXX",  # Existing cluster ID
        [
            {
                "Name": "Spark Job",
                "ActionOnFailure": "CONTINUE",
                "HadoopJarStep": {
                    "Jar": "command-runner.jar",
                    "Args": [
                        "spark-submit",
                        "s3://my-bucket/scripts/job.py",
                    ],
                },
            }
        ],
    )

    context.log.info(f"Submitted EMR steps: {step_ids}")
    return {"step_ids": step_ids}

Athena Integration

Query data in S3 using Athena:
from dagster import asset, Definitions, OpExecutionContext
from dagster_aws.athena import AthenaClientResource

@asset
def athena_query(context: OpExecutionContext, athena: AthenaClientResource):
    # execute_query polls the query to a terminal state and raises on
    # failure; fetch_results=True returns the result rows
    results = athena.get_client().execute_query(
        "SELECT * FROM my_database.my_table WHERE date = '2024-01-01'",
        fetch_results=True,
    )
    context.log.info(f"Query returned {len(results)} rows")
    return {"row_count": len(results)}

defs = Definitions(
    assets=[athena_query],
    resources={
        "athena": AthenaClientResource(
            # Query results land in the S3 location configured on this workgroup
            workgroup="primary",
        )
    },
)
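Athena query executions are asynchronous and must be polled until they reach a terminal state (SUCCEEDED, FAILED, or CANCELLED). Polling at a fixed interval can hammer the API; a generic exponential-backoff poller sketch (pure Python with hypothetical names, not a dagster-aws API):

```python
import time

TERMINAL = {"SUCCEEDED", "FAILED", "CANCELLED"}

def poll_until_done(get_status, initial_delay=0.5, max_delay=8.0, timeout=60.0):
    """Poll get_status() with exponential backoff until a terminal state."""
    deadline = time.monotonic() + timeout
    delay = initial_delay
    while time.monotonic() < deadline:
        status = get_status()
        if status in TERMINAL:
            return status
        time.sleep(delay)
        delay = min(delay * 2, max_delay)  # back off, capped at max_delay
    raise TimeoutError("query did not reach a terminal state")

# Simulated status sequence for illustration
states = iter(["QUEUED", "RUNNING", "SUCCEEDED"])
print(poll_until_done(lambda: next(states), initial_delay=0.01))  # → SUCCEEDED
```

The same pattern applies to any AWS API that exposes a get-status call, such as EMR step status checks.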

Secrets Manager

Retrieve secrets from AWS Secrets Manager:
from dagster import asset, Definitions, OpExecutionContext
from dagster_aws.secretsmanager import SecretsManagerResource

@asset
def secret_consumer(context: OpExecutionContext, secrets: SecretsManagerResource):
    # Fetch the secret via the boto3 Secrets Manager client
    db_password = secrets.get_client().get_secret_value(
        SecretId="prod/database/password"
    )["SecretString"]
    
    # Use secret for database connection
    context.log.info("Retrieved database password from Secrets Manager")
    return {"status": "connected"}

defs = Definitions(
    assets=[secret_consumer],
    resources={
        "secrets": SecretsManagerResource(
            region_name="us-west-2",
        )
    },
)
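Secrets Manager payloads frequently bundle several fields into one JSON-encoded SecretString. Parsing one defensively (the payload shape here is an illustrative assumption):

```python
import json

# Example SecretString payload as it might be stored in Secrets Manager
secret_string = '{"username": "app_user", "password": "s3cr3t", "host": "db.internal"}'

secret = json.loads(secret_string)
missing = {"username", "password"} - secret.keys()
if missing:
    raise KeyError(f"secret is missing fields: {missing}")

print(secret["username"])  # → app_user
```

Validating required fields up front turns a vague downstream connection error into a clear failure at secret-fetch time.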

RDS Integration

Connect to RDS databases:
from dagster import asset, Definitions, EnvVar
from dagster_aws.rds import RDSResource

@asset
def rds_query(rds: RDSResource):
    # Get a database connection
    with rds.get_connection() as conn:
        cursor = conn.cursor()
        cursor.execute("SELECT COUNT(*) FROM customers")
        count = cursor.fetchone()[0]

    return {"customer_count": count}

defs = Definitions(
    assets=[rds_query],
    resources={
        "rds": RDSResource(
            host="my-db.us-west-2.rds.amazonaws.com",
            port=5432,
            database="mydb",
            user=EnvVar("DB_USER"),
            password=EnvVar("DB_PASSWORD"),
        )
    },
)

Redshift

Connect to Amazon Redshift:
from dagster import asset, Definitions, EnvVar
from dagster_aws.redshift import RedshiftClientResource

@asset
def redshift_analysis(redshift: RedshiftClientResource):
    query = """
        SELECT product_category, SUM(revenue) AS total_revenue
        FROM sales
        WHERE date >= CURRENT_DATE - 30
        GROUP BY product_category
        ORDER BY total_revenue DESC
    """

    # execute_query opens a connection, runs the query, and returns the rows
    results = redshift.get_client().execute_query(query, fetch_results=True)

    return {"top_categories": results}

defs = Definitions(
    assets=[redshift_analysis],
    resources={
        "redshift": RedshiftClientResource(
            host="my-cluster.redshift.amazonaws.com",
            port=5439,
            database="analytics",
            user=EnvVar("REDSHIFT_USER"),
            password=EnvVar("REDSHIFT_PASSWORD"),
        )
    },
)

CloudWatch Integration

Send metrics and logs to CloudWatch:
from dagster import asset, OpExecutionContext
import boto3

@asset
def monitored_computation(context: OpExecutionContext):
    # dagster-aws ships a CloudWatch log handler; for custom metrics,
    # use boto3's CloudWatch client directly
    cloudwatch = boto3.client("cloudwatch", region_name="us-west-2")

    # Emit a custom metric
    cloudwatch.put_metric_data(
        Namespace="Dagster/Pipeline",
        MetricData=[
            {
                "MetricName": "RecordsProcessed",
                "Value": 1000,
                "Unit": "Count",
            }
        ],
    )

    return {"records": 1000}

Dagster Pipes with AWS

Use Dagster Pipes for AWS compute:
from dagster import asset, Definitions
from dagster_aws.pipes import PipesECSClient, PipesLambdaClient

@asset
def ecs_pipes_asset(context, pipes_ecs: PipesECSClient):
    # Run code on ECS with Pipes; run_task_params is forwarded to
    # boto3's ecs.run_task
    return pipes_ecs.run(
        context=context,
        run_task_params={
            "taskDefinition": "my-task-definition",
            "cluster": "my-cluster",
            "launchType": "FARGATE",
        },
    ).get_materialize_result()

@asset
def lambda_pipes_asset(context, pipes_lambda: PipesLambdaClient):
    # Execute Lambda function with Pipes
    return pipes_lambda.run(
        context=context,
        function_name="my-lambda-function",
        event={"key": "value"},
    ).get_materialize_result()

defs = Definitions(
    assets=[ecs_pipes_asset, lambda_pipes_asset],
    resources={
        "pipes_ecs": PipesECSClient(),
        "pipes_lambda": PipesLambdaClient(),
    },
)

IAM Roles and Permissions

The dagster-aws integrations use boto3, which respects the standard AWS credential chain:
  1. Environment variables (AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY)
  2. Shared credentials file (~/.aws/credentials)
  3. IAM role for EC2 instances
  4. ECS task role
  5. IAM role for Lambda
When running in AWS (ECS, EC2, Lambda), use IAM roles instead of hardcoded credentials for better security.
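The chain above amounts to a first-match lookup. A toy resolver covering steps 1 and 2 (real botocore also queries the EC2/ECS/Lambda metadata endpoints for roles; function and field names here are illustrative):

```python
import configparser
import os

def resolve_credentials(environ, credentials_path="~/.aws/credentials", profile="default"):
    # 1. Environment variables win
    if environ.get("AWS_ACCESS_KEY_ID") and environ.get("AWS_SECRET_ACCESS_KEY"):
        return {"source": "env", "key_id": environ["AWS_ACCESS_KEY_ID"]}
    # 2. Shared credentials file
    path = os.path.expanduser(credentials_path)
    if os.path.exists(path):
        config = configparser.ConfigParser()
        config.read(path)
        if profile in config:
            return {"source": "file", "key_id": config[profile].get("aws_access_key_id")}
    # 3-5. Fall through to instance/task/function IAM roles (metadata lookup)
    return {"source": "role", "key_id": None}

creds = {"AWS_ACCESS_KEY_ID": "AKIA-EXAMPLE", "AWS_SECRET_ACCESS_KEY": "x"}
print(resolve_credentials(creds)["source"])  # → env
```

The practical takeaway: a stray `AWS_ACCESS_KEY_ID` in the environment silently shadows the profile or role you meant to use.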
Example IAM policy for S3 and Secrets Manager:
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "s3:GetObject",
        "s3:PutObject",
        "s3:ListBucket"
      ],
      "Resource": [
        "arn:aws:s3:::my-dagster-bucket/*",
        "arn:aws:s3:::my-dagster-bucket"
      ]
    },
    {
      "Effect": "Allow",
      "Action": [
        "secretsmanager:GetSecretValue"
      ],
      "Resource": "arn:aws:secretsmanager:*:*:secret:dagster/*"
    }
  ]
}
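Before attaching a policy like the one above, a quick structural check catches missing actions early; a sketch using only the `json` module (the required-action set mirrors the S3 statement above):

```python
import json

policy = json.loads("""{
  "Version": "2012-10-17",
  "Statement": [
    {"Effect": "Allow",
     "Action": ["s3:GetObject", "s3:PutObject", "s3:ListBucket"],
     "Resource": ["arn:aws:s3:::my-dagster-bucket/*", "arn:aws:s3:::my-dagster-bucket"]},
    {"Effect": "Allow",
     "Action": ["secretsmanager:GetSecretValue"],
     "Resource": "arn:aws:secretsmanager:*:*:secret:dagster/*"}
  ]
}""")

# Collect every allowed action across statements (Action may be a string or a list)
granted = set()
for statement in policy["Statement"]:
    actions = statement["Action"]
    granted.update([actions] if isinstance(actions, str) else actions)

required = {"s3:GetObject", "s3:PutObject", "s3:ListBucket"}
assert policy["Version"] == "2012-10-17"
assert required <= granted
print("policy grants all required S3 actions")
```

Note that `ListBucket` applies to the bucket ARN while `GetObject`/`PutObject` apply to the object ARN (`/*`), which is why both Resource entries are needed.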

Multi-Region Support

Work with resources across multiple AWS regions:
from dagster import asset, Definitions
from dagster_aws.s3 import S3Resource

@asset
def cross_region_sync(us_s3: S3Resource, eu_s3: S3Resource):
    # Read from US bucket
    us_client = us_s3.get_client()
    obj = us_client.get_object(Bucket="us-bucket", Key="data.json")
    data = obj["Body"].read()
    
    # Write to EU bucket
    eu_client = eu_s3.get_client()
    eu_client.put_object(Bucket="eu-bucket", Key="data.json", Body=data)
    
    return {"synced": True}

defs = Definitions(
    assets=[cross_region_sync],
    resources={
        "us_s3": S3Resource(region_name="us-west-2"),
        "eu_s3": S3Resource(region_name="eu-west-1"),
    },
)

Best Practices

  1. Use IAM roles: Prefer IAM roles over access keys when running in AWS
  2. Leverage S3 IO Manager: Use for automatic asset persistence to S3
  3. Regional resources: Create region-specific resources to minimize latency
  4. Secrets management: Store credentials in Secrets Manager, not code
  5. Cost optimization: Use S3 lifecycle policies for old execution data
  6. Monitoring: Send custom metrics to CloudWatch for observability
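For point 5, an illustrative S3 lifecycle configuration that expires old IO-manager objects under the storage prefix (prefix and retention period are assumptions to adjust):

```json
{
  "Rules": [
    {
      "ID": "expire-old-dagster-storage",
      "Filter": { "Prefix": "dagster/storage/" },
      "Status": "Enabled",
      "Expiration": { "Days": 90 }
    }
  ]
}
```

This can be applied with the AWS console, `aws s3api put-bucket-lifecycle-configuration`, or infrastructure-as-code tooling.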

Troubleshooting

Credentials not found

Ensure AWS credentials are configured:
aws configure
# or set environment variables
export AWS_ACCESS_KEY_ID=your_key
export AWS_SECRET_ACCESS_KEY=your_secret
export AWS_DEFAULT_REGION=us-west-2

S3 permissions errors

Verify IAM policy includes necessary S3 actions:
  • s3:GetObject
  • s3:PutObject
  • s3:ListBucket

ECS task failures

Check:
  • Task definition is valid
  • Cluster has capacity
  • Security groups allow necessary traffic
  • IAM task role has required permissions

API Reference

Key resources and clients:
  • S3Resource: S3 client access
  • S3PickleIOManager: S3-backed IO manager
  • EcsRunLauncher: Execute runs on ECS
  • EmrJobRunner: Submit EMR jobs
  • AthenaClient: Query with Athena
  • SecretsManagerResource: Fetch secrets
  • RDSResource: RDS database connections
  • RedshiftClientResource: Redshift connections
  • PipesECSClient: Dagster Pipes on ECS
  • PipesLambdaClient: Dagster Pipes on Lambda
For complete documentation, see dagster-aws API reference.
