Skip to main content
The @io_manager decorator defines an I/O manager - a special type of resource that handles storing op/asset outputs and loading them as inputs to downstream ops/assets.

Signature

@io_manager(
    config_schema: Optional[ConfigSchema] = None,
    description: Optional[str] = None,
    output_config_schema: Optional[ConfigSchema] = None,
    input_config_schema: Optional[ConfigSchema] = None,
    required_resource_keys: Optional[Set[str]] = None,
    version: Optional[str] = None,
) -> IOManagerDefinition

Parameters

config_schema
Optional[ConfigSchema]
The schema for the resource config. Configuration data is available in init_context.resource_config. If not set, Dagster will accept any config provided.
description
Optional[str]
A human-readable description of the resource.
output_config_schema
Optional[ConfigSchema]
The schema for per-output config. If not set, no per-output configuration will be allowed.
input_config_schema
Optional[ConfigSchema]
The schema for per-input config. If not set, Dagster will accept any config provided.
required_resource_keys
Optional[Set[str]]
Keys for the resources required by the I/O manager.
version
Optional[str]
The version of a resource function. Two wrapped resource functions should only have the same version if they produce the same resource definition when provided with the same inputs.

Returns

Type: IOManagerDefinition. An I/O manager definition object.

Examples

Basic I/O Manager

from dagster import IOManager, io_manager
import pickle

class MyIOManager(IOManager):
    """Pickle-backed I/O manager that keeps one ``.pkl`` file per asset in /tmp."""

    @staticmethod
    def _pickle_path(context):
        """Build the file path from the last component of the context's asset key."""
        asset_name = context.asset_key.path[-1]
        return f"/tmp/{asset_name}.pkl"

    def handle_output(self, context, obj):
        """Pickle ``obj`` into the file that belongs to the context's asset."""
        with open(self._pickle_path(context), "wb") as sink:
            pickle.dump(obj, sink)

    def load_input(self, context):
        """Unpickle and return the object stored for the context's asset."""
        with open(self._pickle_path(context), "rb") as source:
            return pickle.load(source)

# Resource function wrapped by @io_manager: builds the pickle-based
# MyIOManager. init_context is unused because this manager takes no config.
@io_manager
def my_io_manager(init_context):
    manager = MyIOManager()
    return manager

I/O Manager with Configuration

from dagster import IOManager, io_manager, Field, String
import pandas as pd

class CSVIOManager(IOManager):
    """I/O manager that stores each asset's DataFrame as a CSV file.

    Args:
        base_dir: Directory that holds one ``<asset name>.csv`` file per asset.
            It is created on the first write if it does not exist yet.
    """

    def __init__(self, base_dir: str):
        self.base_dir = base_dir

    def _csv_path(self, context) -> str:
        """Return the CSV path built from the last component of the asset key."""
        return f"{self.base_dir}/{context.asset_key.path[-1]}.csv"

    def handle_output(self, context, obj: pd.DataFrame):
        """Write ``obj`` to the asset's CSV file, omitting the index column."""
        import os  # local import keeps this snippet self-contained

        # DataFrame.to_csv does not create missing directories, so make sure
        # base_dir exists before writing (the default "/tmp/data" usually
        # does not exist on a fresh machine).
        os.makedirs(self.base_dir, exist_ok=True)
        obj.to_csv(self._csv_path(context), index=False)

    def load_input(self, context) -> pd.DataFrame:
        """Read the CSV file for the context's asset key into a DataFrame."""
        return pd.read_csv(self._csv_path(context))

# Resource function for CSVIOManager. The resource config exposes a single
# "base_dir" string field that falls back to "/tmp/data" when not provided.
@io_manager(
    config_schema={"base_dir": Field(String, default_value="/tmp/data")}
)
def csv_io_manager(init_context):
    return CSVIOManager(base_dir=init_context.resource_config["base_dir"])

Using I/O Manager with Assets

from dagster import asset, Definitions, io_manager, IOManager
import json

class JSONIOManager(IOManager):
    """I/O manager that persists each asset's value as a JSON file in /tmp."""

    @staticmethod
    def _json_path(context):
        """Build the file path from the last component of the context's asset key."""
        asset_name = context.asset_key.path[-1]
        return f"/tmp/{asset_name}.json"

    def handle_output(self, context, obj):
        """Serialize ``obj`` as JSON into the file for the context's asset."""
        with open(self._json_path(context), "w") as sink:
            json.dump(obj, sink)

    def load_input(self, context):
        """Parse and return the JSON document stored for the context's asset."""
        with open(self._json_path(context), "r") as source:
            return json.load(source)

# Resource function wrapped by @io_manager: builds the JSON-based manager.
# init_context is unused because this manager takes no config.
@io_manager
def json_io_manager(init_context):
    manager = JSONIOManager()
    return manager

# Source asset for the example: returns a small JSON-serializable dict.
@asset
def upstream_asset():
    payload = {"data": [1, 2, 3]}
    return payload

# The upstream_asset argument is not passed by hand: the I/O manager loads
# the stored output of the asset with that name and supplies it here.
@asset
def downstream_asset(upstream_asset):
    values = upstream_asset["data"]
    return {"processed": values}

# Register both assets and bind json_io_manager to the "io_manager" resource
# key, which is the key assets use when they do not set io_manager_key.
defs = Definitions(
    resources={"io_manager": json_io_manager},
    assets=[upstream_asset, downstream_asset],
)

I/O Manager with Per-Output Config

from dagster import io_manager, IOManager, Out, op, job

class ConfigurableIOManager(IOManager):
    """I/O manager whose storage format is selected through per-output config.

    Serialization itself is left as a placeholder in this example; only the
    way the target path is derived from the output context is shown.
    """

    def handle_output(self, context, obj):
        """Resolve the target path for ``obj`` from the per-output config.

        Args:
            context: Output context; ``context.config`` holds the per-output
                config supplied in the run config.
            obj: The value produced by the op output.
        """
        # Access per-output config
        file_format = context.config.get("format", "json")
        # This manager is attached to an op output, which has no asset key:
        # reading context.asset_key there raises. get_identifier() yields the
        # run id, step key and output name, which works for op outputs.
        identifier = "_".join(context.get_identifier())
        file_path = f"/tmp/{identifier}.{file_format}"
        # Save based on format...

    def load_input(self, context):
        """Placeholder: load the stored value for a downstream input."""
        # Load based on metadata...
        pass

# Resource function for ConfigurableIOManager. output_config_schema declares
# the per-output fields ("format" and "compression") that can be supplied
# for each output handled by this manager.
@io_manager(output_config_schema={"format": str, "compression": bool})
def configurable_io_manager(init_context):
    manager = ConfigurableIOManager()
    return manager

# Op whose single output is routed to the I/O manager registered under the
# "my_io_manager" resource key.
@op(out=Out(io_manager_key="my_io_manager"))
def my_op():
    payload = {"data": [1, 2, 3]}
    return payload

# Job that runs my_op with configurable_io_manager bound to the
# "my_io_manager" key. The run config supplies the per-output settings for
# my_op's "result" output.
@job(
    config={
        "ops": {
            "my_op": {
                "outputs": {
                    "result": {"format": "parquet", "compression": True},
                },
            },
        },
    },
    resource_defs={"my_io_manager": configurable_io_manager},
)
def my_job():
    my_op()

Multiple I/O Managers

from dagster import asset, Definitions, io_manager, IOManager

class DatabaseIOManager(IOManager):
    """Placeholder I/O manager standing in for database-backed storage."""

    def handle_output(self, context, obj):
        """Store ``obj`` in the database (not implemented in this example)."""

    def load_input(self, context):
        """Load the value from the database (not implemented; returns None)."""
        return None

class S3IOManager(IOManager):
    """Placeholder I/O manager standing in for S3-backed storage."""

    def handle_output(self, context, obj):
        """Store ``obj`` in S3 (not implemented in this example)."""

    def load_input(self, context):
        """Load the value from S3 (not implemented; returns None)."""
        return None

# Resource function wrapped by @io_manager: builds the database-backed
# manager. init_context is unused because this manager takes no config.
@io_manager
def database_io_manager(init_context):
    manager = DatabaseIOManager()
    return manager

# Resource function wrapped by @io_manager: builds the S3-backed manager.
# init_context is unused because this manager takes no config.
@io_manager
def s3_io_manager(init_context):
    manager = S3IOManager()
    return manager

# Asset whose output is handled by the manager registered under the
# "database_io_manager" resource key.
@asset(io_manager_key="database_io_manager")
def database_asset():
    payload = {"data": [1, 2, 3]}
    return payload

# Asset whose output is handled by the manager registered under the
# "s3_io_manager" resource key.
@asset(io_manager_key="s3_io_manager")
def s3_asset():
    payload = {"data": [4, 5, 6]}
    return payload

# Register both assets together with the two I/O managers. Each resource key
# matches the io_manager_key that one of the assets requests.
defs = Definitions(
    resources={
        "database_io_manager": database_io_manager,
        "s3_io_manager": s3_io_manager,
    },
    assets=[database_asset, s3_asset],
)

Partitioned I/O Manager

from dagster import (
    IOManager,
    io_manager,
    asset,
    DailyPartitionsDefinition,
)
import pandas as pd

class PartitionedIOManager(IOManager):
    """Parquet-backed I/O manager that writes one file per asset partition.

    Files are laid out as ``/tmp/data/<asset name>/<partition key>.parquet``.
    Including the asset name keeps assets that share this manager from
    overwriting each other's partitions.
    """

    @staticmethod
    def _partition_path(context) -> str:
        """Return the parquet path for the context's asset and partition."""
        asset_name = context.asset_key.path[-1]
        # asset_partition_key is the partition of the asset being stored or
        # loaded, rather than the partition key of the run as a whole.
        partition_key = context.asset_partition_key
        return f"/tmp/data/{asset_name}/{partition_key}.parquet"

    def handle_output(self, context, obj: pd.DataFrame):
        """Write ``obj`` to the parquet file for the asset's partition."""
        import os  # local import keeps this snippet self-contained

        file_path = self._partition_path(context)
        # DataFrame.to_parquet does not create missing directories.
        os.makedirs(os.path.dirname(file_path), exist_ok=True)
        obj.to_parquet(file_path)

    def load_input(self, context) -> pd.DataFrame:
        """Read the parquet file for the context's asset partition."""
        return pd.read_parquet(self._partition_path(context))

# Resource function wrapped by @io_manager: builds the partition-aware
# manager. init_context is unused because this manager takes no config.
@io_manager
def partitioned_io_manager(init_context):
    manager = PartitionedIOManager()
    return manager

# Daily-partitioned asset (partitions start at 2024-01-01). Each run yields a
# one-row DataFrame tagged with the partition key it was executed for.
@asset(partitions_def=DailyPartitionsDefinition(start_date="2024-01-01"))
def partitioned_asset(context):
    return pd.DataFrame({"date": [context.partition_key], "value": [42]})

I/O Manager with Resource Dependencies

from dagster import io_manager, IOManager, resource

# Resource that provides the connection object used by the I/O manager.
# NOTE(review): DatabaseConnection is not defined or imported in this
# snippet; it stands in for whatever client class your project provides.
@resource
def database_connection(init_context):
    connection = DatabaseConnection()
    return connection

class DatabaseIOManager(IOManager):
    """I/O manager that reads and writes through an injected connection.

    The table name is the last component of the context's asset key.
    """

    def __init__(self, connection):
        self.connection = connection

    def handle_output(self, context, obj):
        """Issue the (elided) INSERT statement for the asset's table."""
        # NOTE(review): the table name is interpolated straight into the SQL
        # text; confirm asset names are trusted identifiers in a real
        # implementation.
        table = context.asset_key.path[-1]
        self.connection.execute(f"INSERT INTO {table} ...")

    def load_input(self, context):
        """Return the result of selecting every row from the asset's table."""
        table = context.asset_key.path[-1]
        return self.connection.query(f"SELECT * FROM {table}")

# Resource function for DatabaseIOManager. Declaring "database_connection" in
# required_resource_keys makes that resource available on init_context.
@io_manager(required_resource_keys={"database_connection"})
def database_io_manager(init_context):
    return DatabaseIOManager(
        connection=init_context.resources.database_connection,
    )

Build docs developers (and LLMs) love