OutputDefinition
Defines an output from an op’s compute function. Ops can have multiple outputs, in which case outputs cannot be anonymous. Many ops have only one output, which is given the default name “result”.
Class Definition
class OutputDefinition:
def __init__(
self,
dagster_type=None,
name: str | None = None,
description: str | None = None,
is_required: bool = True,
io_manager_key: str | None = None,
metadata: ArbitraryMetadataMapping | None = None,
code_version: str | None = None,
)
Defined in: dagster._core.definitions.output
Parameters
dagster_type
Type | DagsterType | None
The type of this output. Users should provide the Python type of objects they expect the op to yield, or a DagsterType for runtime validation.Defaults to Any.from dagster import OutputDefinition, op
@op(out=OutputDefinition(dagster_type=int))
def my_op():
return 42
name
str | None
default:"result"
Name of the output. Defaults to “result” for single outputs.from dagster import OutputDefinition, op
@op(out=OutputDefinition(name="my_output"))
def my_op():
return "data"
Human-readable description of the output.OutputDefinition(
description="Processed customer records from the ETL pipeline"
)
Whether the presence of this output is required.# Optional output
OutputDefinition(is_required=False)
io_manager_key
str | None
default:"io_manager"
The resource key of the IOManager used for storing this output and loading it in downstream steps.from dagster import OutputDefinition
OutputDefinition(io_manager_key="s3_io_manager")
Static metadata for the output. For example, file paths or database table information.OutputDefinition(
metadata={
"table": "customers",
"schema": "public"
}
)
Version of the code that generates this output. Versions should be set only for code that deterministically produces the same output given the same inputs.OutputDefinition(code_version="1.0.0")
Properties
The Dagster type of the output.
The description of the output.
Whether the output is required.
Whether the output is optional (inverse of is_required).
The IO manager key for this output.
The metadata attached to this output.
Whether this is a dynamic output. Returns False for regular outputs.
Methods
mapping_from
def mapping_from(
self,
node_name: str,
output_name: str | None = None,
from_dynamic_mapping: bool = False
) -> OutputMapping
Create an output mapping from an output of a child node. Used in graph definitions.
The name of the child node from which to map this output.
The name of the child node’s output. Defaults to “result”.
from dagster import OutputDefinition
output_mapping = OutputDefinition(int).mapping_from('child_node')
Out
A more concise way to define outputs using the @op decorator. This is the recommended approach for defining outputs.
Class Definition
class Out(NamedTuple):
dagster_type: DagsterType | type[NoValueSentinel]
description: str | None
is_required: bool
io_manager_key: str
metadata: RawMetadataMapping | None
code_version: str | None
Defined in: dagster._core.definitions.output
Constructor
def __new__(
cls,
dagster_type: type | DagsterType | None = NoValueSentinel,
description: str | None = None,
is_required: bool = True,
io_manager_key: str | None = None,
metadata: ArbitraryMetadataMapping | None = None,
code_version: str | None = None,
)
Parameters
Parameters are identical to OutputDefinition, but Out is designed for use with the decorator syntax.
dagster_type
Type | DagsterType | None
The type of this output. Only set if the correct type cannot be inferred from the function signature.
Human-readable description of the output.
Whether this output is required.
io_manager_key
str | None
default:"io_manager"
The resource key of the output manager.
Version of the code that generates this output.
Examples
Single Output
from dagster import op, Out
@op(out=Out(dagster_type=int, description="The computed value"))
def compute_value() -> int:
return 42
Multiple Outputs
from dagster import op, Out
from typing import Tuple
@op(
out={
"sum": Out(int, description="Sum of inputs"),
"product": Out(int, description="Product of inputs"),
}
)
def calculate(a: int, b: int) -> Tuple[int, int]:
return (a + b, a * b)
With IO Manager
from dagster import op, Out
@op(out=Out(io_manager_key="s3_io_manager"))
def store_in_s3():
return large_dataframe
from dagster import op, Out
@op(
out=Out(
metadata={
"table": "customers",
"partition_key": "date"
}
)
)
def load_customers():
return customer_data
Optional Output
from dagster import op, Out
from typing import Optional
@op(out=Out(is_required=False))
def maybe_output(condition: bool) -> Optional[str]:
if condition:
return "data"
return None
With Type Inference
from dagster import op, Out
import pandas as pd
# Type is inferred from return annotation
@op(out=Out(description="Customer dataframe"))
def load_data() -> pd.DataFrame:
return pd.read_csv("customers.csv")
DynamicOutputDefinition
Variant of OutputDefinition for outputs that will dynamically alter the graph at runtime.
Class Definition
class DynamicOutputDefinition(OutputDefinition):
# Inherits constructor from OutputDefinition
pass
Defined in: dagster._core.definitions.output
Usage
When using in composition functions like @job, dynamic outputs must be used with:
map - Clone downstream nodes for each DynamicOutput
collect - Gather all DynamicOutput into a list
Example
import os
from dagster import op, job, DynamicOut, DynamicOutput
@op(out=DynamicOut(str))
def files_in_directory(context):
path = context.op_config["path"]
dirname, _, filenames = next(os.walk(path))
for file in filenames:
yield DynamicOutput(
os.path.join(dirname, file),
mapping_key=file.replace(".", "_")
)
@op
def process_file(file_path: str) -> int:
# Process individual file
return len(open(file_path).read())
@op
def summarize_directory(file_sizes: list[int]) -> int:
return sum(file_sizes)
@job
def process_directory():
files = files_in_directory()
# Map process_file over each dynamic output
file_results = files.map(process_file)
# Collect results into a list
summarize_directory(file_results.collect())
DynamicOut
Variant of Out for dynamic outputs. Used with the decorator syntax.
Class Definition
class DynamicOut(Out):
# Inherits constructor from Out
pass
Defined in: dagster._core.definitions.output
Example
from dagster import op, job, DynamicOut, DynamicOutput
@op(
config_schema={"items": [str]},
out=DynamicOut(str)
)
def generate_items(context) -> None:
for item in context.op_config["items"]:
yield DynamicOutput(item, mapping_key=item)
@op
def process_item(item: str) -> str:
return item.upper()
@op
def combine_items(items: list[str]) -> str:
return ", ".join(items)
@job
def dynamic_job():
items = generate_items()
processed = items.map(process_item)
combine_items(processed.collect())
OutputMapping
Defines an output mapping for a graph, connecting an output from a child node to a graph output.
Class Definition
class OutputMapping(NamedTuple):
graph_output_name: str
mapped_node_name: str
mapped_node_output_name: str
graph_output_description: str | None = None
dagster_type: DagsterType | None = None
from_dynamic_mapping: bool = False
Defined in: dagster._core.definitions.output
Parameters
Name of the output in the graph being mapped to.
Name of the node (op/graph) that the output is being mapped from.
Name of the output in the node being mapped from.
Description of the graph output.
Set to true if the node being mapped to is a mapped dynamic node.
The dagster type of the graph’s output.
Example
from dagster import OutputMapping, GraphDefinition, op, graph, GraphOut
@op
def emit_five():
return 5
# Using OutputMapping explicitly
GraphDefinition(
name="the_graph",
node_defs=[emit_five],
output_mappings=[
OutputMapping(
graph_output_name="result",
mapped_node_name="emit_five",
mapped_node_output_name="result"
)
]
)
# Equivalent using decorator
@graph(out=GraphOut())
def the_graph():
return emit_five()
GraphOut
Represents information about outputs that a graph maps. Used with the @graph decorator.
Class Definition
class GraphOut(NamedTuple):
description: str | None
Defined in: dagster._core.definitions.output
Constructor
def __new__(cls, description: str | None = None)
Parameter
Human-readable description of the output.
Example
from dagster import graph, op, GraphOut
@op
def op_a() -> int:
return 1
@op
def op_b() -> int:
return 2
@graph(out=GraphOut(description="Sum of op_a and op_b"))
def my_graph() -> int:
return op_a() + op_b()
Common Patterns
Single vs Multiple Outputs
from dagster import op, Out
# Single output (implicit)
@op
def single_output():
return 42
# Single output (explicit)
@op(out=Out(int))
def explicit_single():
return 42
# Multiple outputs
@op(out={"a": Out(int), "b": Out(str)})
def multiple_outputs():
return 42, "hello"
Type Safety
from dagster import op, Out
import pandas as pd
# Type inferred from annotation
@op
def load_dataframe() -> pd.DataFrame:
return pd.DataFrame({"col": [1, 2, 3]})
# Explicit type
@op(out=Out(dagster_type=pd.DataFrame))
def load_dataframe_explicit():
return pd.DataFrame({"col": [1, 2, 3]})
Custom IO Managers
from dagster import op, Out, asset, Definitions
from dagster import ConfigurableIOManager
class MyIOManager(ConfigurableIOManager):
def handle_output(self, context, obj):
# Custom storage logic
pass
def load_input(self, context):
# Custom loading logic
pass
@op(out=Out(io_manager_key="my_io_manager"))
def my_op():
return "data"
defs = Definitions(
ops=[my_op],
resources={"my_io_manager": MyIOManager()}
)
from dagster import op, Out
@op(
out=Out(
metadata={
"format": "parquet",
"compression": "snappy",
"schema_version": "2.0"
}
)
)
def save_data():
return dataframe