
IO Managers

IO Managers are special resources that handle how data is stored and loaded between assets and ops. They provide a clean separation between your business logic and data persistence, making it easy to:
  • Switch storage backends: Use local files in dev, cloud storage in production
  • Handle different data types: Automatically serialize/deserialize DataFrames, Parquet files, database tables
  • Optimize performance: Implement caching, compression, and partitioned storage
  • Test pipelines: Mock data persistence for unit tests

Why IO Managers Matter

Without IO managers, each asset would need to handle its own persistence:
# ❌ Without IO Manager - persistence logic mixed with business logic
@asset
def upstream():
    data = compute_data()
    # Manual persistence
    data.to_parquet("/data/upstream.parquet")
    return data

@asset
def downstream():
    # Manual loading
    data = pd.read_parquet("/data/upstream.parquet")
    return process(data)
With IO managers, persistence is automatic:
# ✅ With IO Manager - clean separation of concerns
@asset
def upstream():
    return compute_data()  # IO manager handles storage

@asset
def downstream(upstream):  # IO manager handles loading
    return process(upstream)

How IO Managers Work

When an asset or op produces output:
  1. The computation function returns a value
  2. The IO manager’s handle_output method stores the value
When an asset or op needs input:
  1. The IO manager’s load_input method loads the value
  2. The value is passed to the computation function
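The contract behind these steps can be sketched in plain Python, with no Dagster dependency. This is only an illustration of the store/load handshake; Dagster's real base classes and context objects carry far more information (asset keys, partitions, types, metadata):

```python
# Minimal sketch of the IO manager contract: handle_output stores a
# value under a key, load_input retrieves it for a downstream step.
class InMemoryManagerSketch:
    def __init__(self):
        self._storage = {}

    def handle_output(self, key, obj):
        # Called after the computation returns a value.
        self._storage[key] = obj

    def load_input(self, key):
        # Called before a downstream computation runs.
        return self._storage[key]

manager = InMemoryManagerSketch()
manager.handle_output("upstream", [1, 2, 3])
assert manager.load_input("upstream") == [1, 2, 3]
```

The key point is the symmetry: whatever `handle_output` persists, `load_input` must be able to reconstruct.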

Default IO Manager

Dagster provides a default IO manager that uses pickle serialization:
from dagster import FilesystemIOManager, job, op

@op
def op_1():
    return 1

@op
def op_2(a):
    return a + 1

@job(resource_defs={"io_manager": FilesystemIOManager()})
def my_job():
    op_2(op_1())
The FilesystemIOManager stores outputs as pickle files in a local directory. If you don't configure one, Dagster falls back to a filesystem IO manager that writes to $DAGSTER_HOME/storage/; you can override this by binding your own IO manager to the io_manager resource key.

Creating Custom IO Managers

Define custom IO managers by extending ConfigurableIOManager:
import dagster as dg

class MyIOManager(dg.ConfigurableIOManager):
    path_prefix: list[str] = []

    def _get_path(self, context) -> str:
        return "/".join(self.path_prefix + context.asset_key.path)

    def handle_output(self, context: dg.OutputContext, obj):
        write_csv(self._get_path(context), obj)

    def load_input(self, context: dg.InputContext):
        return read_csv(self._get_path(context))
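The path logic in _get_path can be exercised on its own. In this sketch, SimpleNamespace is a hypothetical stand-in for the context's asset_key attribute; the real OutputContext and InputContext provide the same .asset_key.path list of string segments:

```python
from types import SimpleNamespace

def get_path(path_prefix, context):
    # Mirrors MyIOManager._get_path: prefix segments + asset key segments,
    # joined into a slash-separated path.
    return "/".join(path_prefix + context.asset_key.path)

ctx = SimpleNamespace(asset_key=SimpleNamespace(path=["warehouse", "users"]))
assert get_path(["data"], ctx) == "data/warehouse/users"
assert get_path([], ctx) == "warehouse/users"
```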

IO Manager Methods

  • handle_output(context, obj): Called when an asset/op produces output. Store obj based on information in context.
  • load_input(context): Called when an asset/op needs input. Load and return data based on information in context.

Context Information

The context objects provide metadata about the asset/op:
from dagster import OutputContext

def handle_output(self, context: OutputContext, obj):
    # Asset/op metadata
    context.asset_key          # AssetKey for the asset
    context.name               # Output name
    context.metadata           # User-defined metadata
    
    # Partitioning
    context.has_partition_key  # Whether output is partitioned
    context.partition_key      # The specific partition
    
    # Type information
    context.dagster_type       # Dagster type of output
    
    # Add metadata to materialization event
    context.add_output_metadata({"rows": len(obj)})

Database IO Manager Example

Here's an example that stores each output in a database table (write_dataframe_to_table and read_dataframe_from_table stand in for your own database access layer):
import dagster as dg

class DataframeTableIOManager(dg.ConfigurableIOManager):
    def handle_output(self, context: dg.OutputContext, obj):
        # Use output name as table name
        table_name = context.name
        write_dataframe_to_table(name=table_name, dataframe=obj)

    def load_input(self, context: dg.InputContext):
        # Load from upstream table
        if context.upstream_output:
            table_name = context.upstream_output.name
            return read_dataframe_from_table(name=table_name)

@dg.job(resource_defs={"io_manager": DataframeTableIOManager()})
def my_job():
    op_2(op_1())

Adding Output Metadata

IO managers can add metadata to materialization events:
import dagster as dg

class DataframeTableIOManagerWithMetadata(dg.ConfigurableIOManager):
    def handle_output(self, context: dg.OutputContext, obj):
        table_name = context.name
        write_dataframe_to_table(name=table_name, dataframe=obj)

        # Add metadata visible in the UI
        context.add_output_metadata({
            "num_rows": len(obj),
            "table_name": table_name,
            "columns": list(obj.columns),
        })

    def load_input(self, context: dg.InputContext):
        if context.upstream_output:
            table_name = context.upstream_output.name
            return read_dataframe_from_table(name=table_name)
Metadata appears in the Dagster UI on the materialization event, helping you understand what was produced.

Partitioned IO Managers

IO managers can handle partitioned assets by checking partition keys:
import dagster as dg

class MyPartitionedIOManager(dg.IOManager):
    def _get_path(self, context) -> str:
        if context.has_partition_key:
            # Store each partition in a separate file
            return "/".join(context.asset_key.path + [context.partition_key])
        else:
            return "/".join(context.asset_key.path)

    def handle_output(self, context: dg.OutputContext, obj):
        write_csv(self._get_path(context), obj)

    def load_input(self, context: dg.InputContext):
        return read_csv(self._get_path(context))
For backfills that process multiple partitions in a single run:
import dagster as dg

class MyIOManager(dg.IOManager):
    def load_input(self, context: dg.InputContext):
        # Get time window for all partitions being processed
        start_datetime, end_datetime = context.asset_partitions_time_window
        return read_data_in_datetime_range(start_datetime, end_datetime)

    def handle_output(self, context: dg.OutputContext, obj):
        start_datetime, end_datetime = context.asset_partitions_time_window
        overwrite_data_in_datetime_range(start_datetime, end_datetime, obj)

daily_partition = dg.DailyPartitionsDefinition(start_date="2020-01-01")

@dg.asset(
    partitions_def=daily_partition,
    backfill_policy=dg.BackfillPolicy.single_run(),
)
def events(context: dg.AssetExecutionContext, raw_events):
    return compute_events_from_raw_events(raw_events)

IO Manager Factory Pattern

For IO managers with complex initialization, use the factory pattern:
import dagster as dg

class ExternalIOManager(dg.IOManager):
    def __init__(self, api_token):
        self._api_token = api_token
        self._cache = {}  # Stateful cache

    def handle_output(self, context: dg.OutputContext, obj):
        # Store via external API
        store_in_external_system(self._api_token, context.asset_key, obj)

    def load_input(self, context: dg.InputContext):
        # Check cache first
        if context.asset_key in self._cache:
            return self._cache[context.asset_key]
        
        # Load from external system
        data = fetch_from_external_system(self._api_token, context.asset_key)
        self._cache[context.asset_key] = data
        return data

class ConfigurableExternalIOManager(dg.ConfigurableIOManagerFactory):
    api_token: str

    def create_io_manager(self, context) -> ExternalIOManager:
        return ExternalIOManager(self.api_token)
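The caching behavior that motivates the stateful factory can be sketched without Dagster. Here the fetch function is a hypothetical stand-in for fetch_from_external_system; the counter shows the second load is served from the cache:

```python
class CachingLoaderSketch:
    # Stands in for ExternalIOManager's load path: fetch on first access,
    # memoize, and serve repeats from the in-process cache.
    def __init__(self, fetch):
        self._fetch = fetch
        self._cache = {}
        self.fetch_calls = 0

    def load_input(self, key):
        if key in self._cache:
            return self._cache[key]
        self.fetch_calls += 1
        data = self._fetch(key)
        self._cache[key] = data
        return data

loader = CachingLoaderSketch(fetch=lambda key: f"payload-for-{key}")
first = loader.load_input("users")
second = loader.load_input("users")
assert first == second == "payload-for-users"
assert loader.fetch_calls == 1  # second call hit the cache
```

Because the cache lives on the instance, constructing the manager inside create_io_manager gives each run context a fresh cache rather than sharing one across your whole deployment.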

Per-Asset IO Managers

Different assets can use different IO managers:
import dagster as dg

@dg.asset(io_manager_key="fs_io_manager")
def asset_uses_fs():
    return [1, 2, 3]

@dg.asset(io_manager_key="db_io_manager")
def asset_uses_db():
    return {"key": "value"}

defs = dg.Definitions(
    assets=[asset_uses_fs, asset_uses_db],
    resources={
        "fs_io_manager": dg.FilesystemIOManager(base_dir="/data"),
        "db_io_manager": DatabaseIOManager(connection_string="..."),
    },
)

Per-Output IO Managers

Multi-assets can specify different IO managers for each output:
import dagster as dg

@dg.multi_asset(
    outs={
        "raw_data": dg.AssetOut(io_manager_key="fs_io_manager"),
        "transformed_data": dg.AssetOut(io_manager_key="db_io_manager"),
    }
)
def extract_and_transform():
    raw = fetch_raw_data()
    transformed = transform(raw)
    
    yield dg.Output(raw, output_name="raw_data")
    yield dg.Output(transformed, output_name="transformed_data")
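Conceptually, Dagster resolves each output's io_manager_key against the resource dictionary at run time. That routing can be sketched as a plain lookup (DictManager here is a hypothetical in-memory stand-in for the real managers):

```python
# Sketch: route each named output to the manager bound to its key,
# the way Dagster resolves io_manager_key per AssetOut.
class DictManager:
    def __init__(self):
        self.storage = {}

    def handle_output(self, key, obj):
        self.storage[key] = obj

managers = {"fs_io_manager": DictManager(), "db_io_manager": DictManager()}
routing = {"raw_data": "fs_io_manager", "transformed_data": "db_io_manager"}

outputs = {"raw_data": [1, 2], "transformed_data": [2, 4]}
for name, value in outputs.items():
    managers[routing[name]].handle_output(name, value)

assert managers["fs_io_manager"].storage == {"raw_data": [1, 2]}
assert managers["db_io_manager"].storage == {"transformed_data": [2, 4]}
```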

Input Managers

For loading data without a corresponding output, use input managers:
import dagster as dg

class MyInputManager(dg.InputManager):
    def load_input(self, context: dg.InputContext):
        # Load from external source
        return read_from_source(context.asset_key)

@dg.asset(ins={"external_data": dg.AssetIn(input_manager_key="my_input_manager")})
def process_external_data(external_data):
    return transform(external_data)

defs = dg.Definitions(
    assets=[process_external_data],
    resources={
        "my_input_manager": MyInputManager(),
    },
)

Type-Specific IO Managers

Create IO managers that handle specific data types:
import dagster as dg
import pandas as pd

class ParquetIOManager(dg.ConfigurableIOManager):
    base_path: str

    def _get_path(self, context) -> str:
        return f"{self.base_path}/{'/'.join(context.asset_key.path)}.parquet"

    def handle_output(self, context: dg.OutputContext, obj: pd.DataFrame):
        path = self._get_path(context)
        obj.to_parquet(path)
        
        context.add_output_metadata({
            "num_rows": len(obj),
            "num_columns": len(obj.columns),
            "path": path,
        })

    def load_input(self, context: dg.InputContext) -> pd.DataFrame:
        return pd.read_parquet(self._get_path(context))

Testing with IO Managers

Test IO managers in isolation:
import dagster as dg
import tempfile
import os

def test_my_io_manager():
    with tempfile.TemporaryDirectory() as tmpdir:
        io_manager = MyIOManager(path_prefix=[tmpdir])
        
        # Create mock context
        context = dg.build_output_context(
            asset_key=dg.AssetKey(["test_asset"])
        )
        
        # Test handle_output
        test_data = [1, 2, 3]
        io_manager.handle_output(context, test_data)
        
        # Test load_input
        loaded_data = io_manager.load_input(
            dg.build_input_context(asset_key=dg.AssetKey(["test_asset"]))
        )
        
        assert loaded_data == test_data
Test assets with mock IO managers:
import dagster as dg

class MockIOManager(dg.IOManager):
    def __init__(self):
        self.storage = {}

    def handle_output(self, context, obj):
        self.storage[context.asset_key] = obj

    def load_input(self, context):
        return self.storage[context.asset_key]

def test_asset_pipeline():
    result = dg.materialize(
        [upstream_asset, downstream_asset],
        resources={"io_manager": MockIOManager()},
    )
    assert result.success

Built-in IO Managers

Dagster ships with several built-in IO managers, including FilesystemIOManager (pickle files on disk) and InMemoryIOManager (useful in tests). For example:
import dagster as dg

defs = dg.Definitions(
    assets=[my_asset],
    resources={
        "io_manager": dg.FilesystemIOManager(
            base_dir="/data/storage"
        )
    },
)
Stores outputs as pickle files in the specified directory.
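The pickle-on-disk strategy this manager uses can be sketched with the standard library alone (file names and layout here are illustrative, not Dagster's actual storage scheme):

```python
import pickle
import tempfile
from pathlib import Path

# Sketch of pickle-based persistence: one file per output, written on
# handle_output and read back on load_input.
def store(base_dir, key, obj):
    path = Path(base_dir) / f"{key}.pickle"
    path.write_bytes(pickle.dumps(obj))
    return path

def load(base_dir, key):
    path = Path(base_dir) / f"{key}.pickle"
    return pickle.loads(path.read_bytes())

with tempfile.TemporaryDirectory() as tmpdir:
    store(tmpdir, "op_1_result", {"rows": [1, 2, 3]})
    assert load(tmpdir, "op_1_result") == {"rows": [1, 2, 3]}
```

Pickle handles arbitrary Python objects, which is why it works as a default, but it ties stored data to your Python environment; format-specific managers (Parquet, database tables) are better for data consumed outside the pipeline.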

Best Practices

  • Match the backend to the environment: filesystem IO managers for local development, cloud storage IO managers for production, and database IO managers for tabular data that needs to be queried.
  • Record useful metadata: use context.add_output_metadata() to capture row counts, file sizes, and data quality metrics.
  • Handle partitions explicitly: for partitioned assets, store each partition separately and make sure the IO manager can load individual partitions or partition ranges.
  • Prefer stateless IO managers: they are easier to test and reason about; use the factory pattern when you need stateful caching.
  • Test before production: write unit tests for your IO manager's handle_output and load_input methods before relying on them in production.
