WorkflowBuilder API
The WorkflowBuilder provides a fluent API for constructing workflows: pass the start executor to the builder, add edges to connect executors, then call build() to create the workflow.
from agent_framework import Executor, WorkflowBuilder, WorkflowContext, handler
from typing_extensions import Never
class ProcessData(Executor):
    """Executor that flags an incoming dict as processed and forwards it."""

    def __init__(self, id: str):
        super().__init__(id=id)

    @handler
    async def process(self, data: dict, ctx: WorkflowContext[dict]) -> None:
        """Send a new dict containing ``"processed": True`` plus every key of ``data``.

        ``data`` is unpacked after the flag, so a ``"processed"`` key already
        present in ``data`` overrides the ``True`` set here.
        """
        result = {"processed": True, **data}
        await ctx.send_message(result)
class ValidateData(Executor):
    """Executor that yields processed data as the workflow output."""

    def __init__(self, id: str):
        super().__init__(id=id)

    @handler
    async def validate(self, data: dict, ctx: WorkflowContext[Never, dict]) -> None:
        """Yield ``data`` as workflow output when its ``"processed"`` value is truthy.

        When the key is missing or falsy, nothing is yielded and nothing is sent.
        """
        if data.get("processed"):
            await ctx.yield_output(data)
# One executor instance per step, identified by its id.
process = ProcessData(id="process")
validate = ValidateData(id="validate")

# Start at `process` and connect it to `validate`.
workflow = WorkflowBuilder(start_executor=process).add_edge(process, validate).build()
Creating Executors
Class-based Executors
Define executors by subclassing Executor and adding @handler methods:
from agent_framework import Executor, WorkflowContext, handler
class UpperCase(Executor):
    """Executor that upper-cases incoming text and forwards it."""

    def __init__(self, id: str):
        super().__init__(id=id)

    @handler
    async def to_upper_case(self, text: str, ctx: WorkflowContext[str]) -> None:
        """Convert input to uppercase and send it on.

        Args:
            text: Input string (input type).
            ctx: WorkflowContext[str] - can send str messages.
        """
        result = text.upper()
        await ctx.send_message(result)
Function-based Executors (Python)
For simple operations, use the @executor decorator:
from agent_framework import executor, WorkflowContext
from typing_extensions import Never
@executor(id="reverse_text")
async def reverse_text(text: str, ctx: WorkflowContext[Never, str]) -> None:
    """Reverse text and yield it as workflow output.

    WorkflowContext[Never, str]:
    - Never: does not send messages to other executors
    - str: yields str as workflow output
    """
    # A slice with step -1 walks the string back to front.
    await ctx.yield_output(text[::-1])
# Use in workflow: the class-based executor must be instantiated first,
# while the @executor-decorated function is passed to the builder as-is.
upper_case = UpperCase(id="upper_case")
workflow = (
    WorkflowBuilder(start_executor=upper_case)
    .add_edge(upper_case, reverse_text)
    .build()
)
Explicit Type Parameters
When you need more control over types, specify them explicitly on the @handler decorator:
class MultiTypeExecutor(Executor):
    """Executor whose handler types are declared on the decorator instead of by annotations."""

    def __init__(self, id: str):
        super().__init__(id=id)

    # Explicitly declare input/output types
    @handler(input=str | int, output=str)
    async def handle_multiple(self, message, ctx) -> None:  # type: ignore
        """Handle str or int input, output str.

        When using explicit types:
        - ALL types must be explicit (no introspection)
        - input parameter is required
        - output and workflow_output are optional
        """
        # str() covers both declared input types (str and int).
        result = str(message)
        await ctx.send_message(result)  # type: ignore
With explicit types on @handler, introspection is disabled. You must specify all types via decorator parameters.
Edge Types
Simple Edges
Connect two executors directly:
# Linear chain: node_a -> node_b -> node_c.
workflow = (
    WorkflowBuilder(start_executor=node_a)
    .add_edge(node_a, node_b)
    .add_edge(node_b, node_c)
    .build()
)
Conditional Edges
Route messages based on conditions:
from agent_framework import Edge
# Condition function receives message data
def is_positive(data: dict) -> bool:
    """Return True when ``data["value"]`` is greater than zero.

    A missing ``"value"`` key is treated as 0, so the result is False.
    """
    return data.get("value", 0) > 0
workflow = (
    WorkflowBuilder(start_executor=analyzer)
    # Edge guarded by the named predicate.
    .add_edge(analyzer, positive_handler, condition=is_positive)
    # Complementary edge: value missing, zero, or negative.
    .add_edge(
        analyzer,
        negative_handler,
        condition=lambda data: data.get("value", 0) <= 0,
    )
    .build()
)
Fan-out Edges
Send messages to multiple executors in parallel:
# Fan out from the dispatcher to all three workers.
workflow = (
    WorkflowBuilder(start_executor=dispatcher)
    .add_fan_out_edge(dispatcher, [worker_a, worker_b, worker_c])
    .build()
)
# Optional: selective fan-out
def select_workers(message: dict, available: list[str]) -> list[str]:
    """Return subset of workers based on message.

    High-priority messages go to "worker_a" and "worker_b" only; any other
    message goes to every entry in ``available`` (returned unchanged).
    """
    if message.get("priority") == "high":
        return ["worker_a", "worker_b"]
    return available
workflow = (
    WorkflowBuilder(start_executor=dispatcher)
    # select_workers chooses the targets for each message.
    .add_fan_out_edge(
        dispatcher,
        [worker_a, worker_b, worker_c],
        selection_func=select_workers,
    )
    .build()
)
Fan-in Edges
Aggregate messages from multiple sources:
workflow = (
    WorkflowBuilder(start_executor=dispatcher)
    # Fan out to the workers ...
    .add_fan_out_edge(dispatcher, [worker_a, worker_b, worker_c])
    # ... then collect their messages in the aggregator.
    .add_fan_in_edge([worker_a, worker_b, worker_c], aggregator)
    .build()
)
Switch-Case Edges
Route to exactly one target based on ordered conditions:
from agent_framework import Case, Default
def is_spam(message: dict) -> bool:
    """Return True when the message's ``"spam_score"`` is strictly above 0.8.

    A missing score is treated as 0.
    """
    return message.get("spam_score", 0) > 0.8
def is_uncertain(message: dict) -> bool:
    """Return True when ``"spam_score"`` lies in the half-open range (0.4, 0.8].

    A missing score is treated as 0, which is outside the range.
    """
    return 0.4 < message.get("spam_score", 0) <= 0.8
# Cases are listed in evaluation order; Default goes last.
routing_cases = [
    Case(condition=is_spam, target=spam_handler),
    Case(condition=is_uncertain, target=review_handler),
    Default(target=inbox_handler),  # Fallback when no cases match
]
workflow = (
    WorkflowBuilder(start_executor=classifier)
    .add_switch_case_edge_group(classifier, routing_cases)
    .build()
)
Switch-case evaluates cases in order and routes to the first match. Always include a Default case as the last option.
WorkflowContext
The WorkflowContext parameter in handlers provides methods for message passing and state management:
Sending Messages
class DataProcessor(Executor):
    """Executor demonstrating broadcast and targeted ``send_message`` calls."""

    @handler
    async def process(
        self,
        data: dict,
        ctx: WorkflowContext[dict],  # Can send dict messages
    ) -> None:
        """Send the same result dict twice: once untargeted, once to "validator"."""
        # Send to any connected downstream executor
        await ctx.send_message({"result": "processed"})

        # Send to specific executor by ID
        await ctx.send_message(
            {"result": "processed"},
            target_id="validator",
        )
Yielding Outputs
class FinalStep(Executor):
    """Terminal executor that yields a fixed completion message as workflow output."""

    @handler
    async def finalize(
        self,
        data: dict,
        ctx: WorkflowContext[Never, str],  # Can yield str outputs
    ) -> None:
        """Yield the completion string; the incoming ``data`` is not inspected."""
        # Yield workflow output - available via events.get_outputs()
        await ctx.yield_output("Workflow completed successfully")
State Management
class StatefulExecutor(Executor):
    """Executor that records the user id and an invocation counter in workflow state."""

    @handler
    async def process(self, data: dict, ctx: WorkflowContext[dict]) -> None:
        """Store ``data["user_id"]``, increment ``"count"``, and forward both.

        Raises:
            KeyError: if ``data`` has no ``"user_id"`` key.
        """
        # Set state (persists across super steps)
        ctx.set_state("user_id", data["user_id"])
        # The counter starts from 0 when no "count" has been stored yet.
        ctx.set_state("count", ctx.get_state("count", 0) + 1)

        # Get state
        user_id = ctx.get_state("user_id")
        count = ctx.get_state("count", default=0)

        await ctx.send_message({"user_id": user_id, "count": count})
WorkflowContext Type Parameters
WorkflowContext uses generic type parameters to define capabilities:
from typing_extensions import Never
# Unparameterized: no messages sent, no workflow outputs yielded
WorkflowContext  # Equivalent to WorkflowContext[Never, Never]

# One parameter: the type of messages that can be sent (int here)
WorkflowContext[int]

# Two parameters: send int messages AND yield str workflow outputs
WorkflowContext[int, str]
# Example usage
class TypedExecutor(Executor):
    """Executor that both sends a message and yields a workflow output."""

    @handler
    async def handle(
        self,
        text: str,  # Input type
        ctx: WorkflowContext[int, str],  # Send int, yield str
    ) -> None:
        """Send the length of ``text`` downstream and yield its upper-cased form."""
        await ctx.send_message(len(text))  # Send int
        await ctx.yield_output(text.upper())  # Yield str
Running Workflows
Basic Execution
# Non-streaming execution
events = await workflow.run( "initial input" )
# Get outputs
outputs = events.get_outputs()
for output in outputs:
print (output)
# Check final state
print (events.get_final_state()) # WorkflowRunState.IDLE
Streaming Execution
# Stream events as they occur
async for event in workflow.run( "initial input" , stream = True ):
if event.type == "executor_invoked" :
print ( f "Executor { event.executor_id } started" )
elif event.type == "executor_completed" :
print ( f "Executor { event.executor_id } completed" )
elif event.type == "output" :
print ( f "Output: { event.data } " )
Workflow Configuration
Maximum Iterations
# max_iterations=10 prevents infinite loops on the self-loop edge below.
workflow = (
    WorkflowBuilder(start_executor=node, max_iterations=10)
    .add_edge(node, node)  # Self-loop
    .build()
)
Workflow Name
# Name the workflow while building it.
workflow = (
    WorkflowBuilder(start_executor=node)
    .with_name("data-processing-pipeline")
    .add_edge(node, next_node)
    .build()
)

print(workflow.name)  # "data-processing-pipeline"
State Isolation Pattern
Wrap workflow construction in a function so that every call returns a workflow built from fresh executor instances, giving clean state between runs:
def create_workflow():
    """Create workflow with fresh executor instances.

    New ``NodeA`` and ``NodeB`` objects are constructed on every call, so
    the returned workflow shares no executor objects with earlier calls.
    """
    node_a = NodeA(id="node_a")
    node_b = NodeB(id="node_b")

    return (
        WorkflowBuilder(start_executor=node_a)
        .add_edge(node_a, node_b)
        .build()
    )
# Two separate calls -> two workflows, each with its own executor instances
workflow1, workflow2 = create_workflow(), create_workflow()
This pattern is recommended for reusable workflows. Each workflow instance maintains independent executor state.
Next Steps
Orchestration Patterns: Learn sequential, concurrent, and parallel execution patterns.
Checkpoints: Save and restore workflow state.