Skip to main content

WorkflowBuilder API

The WorkflowBuilder provides a fluent API for constructing workflows. Designate a start executor, connect executors with edges, and call build() to produce the workflow.
from agent_framework import Executor, WorkflowBuilder, WorkflowContext, handler

class ProcessData(Executor):
    """Example executor that tags incoming payloads as processed."""

    def __init__(self, id: str):
        super().__init__(id=id)

    @handler
    async def process(self, data: dict, ctx: WorkflowContext[dict]) -> None:
        # Start from the tag, then merge the payload on top so any
        # existing keys in `data` win, exactly as {"processed": True, **data}.
        result = {"processed": True}
        result.update(data)
        await ctx.send_message(result)

class ValidateData(Executor):
    """Example executor that yields only payloads flagged as processed."""

    def __init__(self, id: str):
        super().__init__(id=id)

    @handler
    async def validate(self, data: dict, ctx: WorkflowContext[Never, dict]) -> None:
        # Guard clause: ignore anything that was not processed upstream.
        if not data.get("processed"):
            return
        await ctx.yield_output(data)

# Instantiate the two executors that form the pipeline.
process = ProcessData(id="process")
validate = ValidateData(id="validate")

# Wire process -> validate; add_edge returns the builder, so the
# intermediate result can be held in a local before building.
builder = WorkflowBuilder(start_executor=process).add_edge(process, validate)
workflow = builder.build()

Creating Executors

Class-based Executors

Define executors by subclassing Executor and adding @handler methods:
from agent_framework import Executor, WorkflowContext, handler

class UpperCase(Executor):
    def __init__(self, id: str):
        super().__init__(id=id)

    @handler
    async def to_upper_case(self, text: str, ctx: WorkflowContext[str]) -> None:
        """Uppercase the incoming text and forward it downstream.

        Args:
            text: Incoming message (the handler's input type).
            ctx: WorkflowContext[str] - this handler may send str messages.
        """
        await ctx.send_message(text.upper())

Function-based Executors (Python)

For simple operations, use the @executor decorator:
Python
from agent_framework import executor, WorkflowContext
from typing_extensions import Never

@executor(id="reverse_text")
async def reverse_text(text: str, ctx: WorkflowContext[Never, str]) -> None:
    """Reverse the incoming text and yield it as the workflow output.

    WorkflowContext[Never, str] means:
    - Never: this executor sends no messages to other executors
    - str: it yields a str as workflow output
    """
    reversed_text = "".join(reversed(text))
    await ctx.yield_output(reversed_text)

# Use in workflow. The UpperCase class must be instantiated before it can be
# wired into the builder — the original snippet referenced `upper_case`
# without ever creating it.
upper_case = UpperCase(id="upper_case")
workflow = WorkflowBuilder(start_executor=upper_case).add_edge(upper_case, reverse_text).build()

Explicit Type Parameters

When you need more control over types, specify them explicitly on the @handler decorator:
Python
class MultiTypeExecutor(Executor):
    def __init__(self, id: str):
        super().__init__(id=id)

    # Types are declared on the decorator itself, so signature
    # introspection is disabled for this handler.
    @handler(input=str | int, output=str)
    async def handle_multiple(self, message, ctx) -> None:  # type: ignore
        """Accept str or int input and forward it as a str.

        With explicit decorator types:
        - ALL types must be given explicitly (no introspection)
        - the input parameter is required
        - output and workflow_output are optional
        """
        await ctx.send_message(str(message))  # type: ignore
With explicit types on @handler, introspection is disabled. You must specify all types via decorator parameters.

Edge Types

Simple Edges

Connect two executors directly:
# Chain node_a -> node_b -> node_c with two direct edges.
builder = (
    WorkflowBuilder(start_executor=node_a)
    .add_edge(node_a, node_b)
    .add_edge(node_b, node_c)
)
workflow = builder.build()

Conditional Edges

Route messages based on conditions:
from agent_framework import Edge

# Condition functions receive the message data and return a bool.
def is_positive(data: dict) -> bool:
    """Return True when the message carries a strictly positive value."""
    value = data.get("value", 0)
    return value > 0

workflow = (
    WorkflowBuilder(start_executor=analyzer)
    # Positive values route one way; everything else takes the other edge.
    .add_edge(analyzer, positive_handler, condition=is_positive)
    .add_edge(
        analyzer,
        negative_handler,
        condition=lambda data: data.get("value", 0) <= 0,
    )
    .build()
)

Fan-out Edges

Send messages to multiple executors in parallel:
# Broadcast every message from the dispatcher to all three workers.
builder = WorkflowBuilder(start_executor=dispatcher).add_fan_out_edge(
    dispatcher,
    [worker_a, worker_b, worker_c],
)
workflow = builder.build()

# Optional: selective fan-out
def select_workers(message: dict, available: list[str]) -> list[str]:
    """Return the subset of workers that should receive this message."""
    is_high_priority = message.get("priority") == "high"
    return ["worker_a", "worker_b"] if is_high_priority else available

# selection_func narrows the recipients on a per-message basis.
builder = WorkflowBuilder(start_executor=dispatcher).add_fan_out_edge(
    dispatcher,
    [worker_a, worker_b, worker_c],
    selection_func=select_workers,
)
workflow = builder.build()

Fan-in Edges

Aggregate messages from multiple sources:
# Fan out to the workers, then gather their results in the aggregator.
workers = [worker_a, worker_b, worker_c]
workflow = (
    WorkflowBuilder(start_executor=dispatcher)
    .add_fan_out_edge(dispatcher, workers)
    .add_fan_in_edge(workers, aggregator)
    .build()
)

Switch-Case Edges

Route to exactly one target based on ordered conditions:
Python
from agent_framework import Case, Default

def is_spam(message: dict) -> bool:
    """Classify a message as spam when its score exceeds 0.8."""
    score = message.get("spam_score", 0)
    return score > 0.8

def is_uncertain(message: dict) -> bool:
    """Flag mid-range scores in (0.4, 0.8] for human review."""
    score = message.get("spam_score", 0)
    return score > 0.4 and score <= 0.8

workflow = (
    WorkflowBuilder(start_executor=classifier)
    .add_switch_case_edge_group(
        classifier,
        [
            # Cases are evaluated in order; the first match wins.
            Case(condition=is_spam, target=spam_handler),
            Case(condition=is_uncertain, target=review_handler),
            # Default catches every message no case matched.
            Default(target=inbox_handler),
        ],
    )
    .build()
)
Switch-case evaluates cases in order and routes to the first match. Always include a Default case as the last option.

WorkflowContext

The WorkflowContext parameter in handlers provides methods for message passing and state management:

Sending Messages

Python
class DataProcessor(Executor):
    @handler
    async def process(
        self,
        data: dict,
        ctx: WorkflowContext[dict],  # may send dict messages
    ) -> None:
        """Demonstrate broadcast and targeted message delivery."""
        # Broadcast: delivered to every connected downstream executor.
        await ctx.send_message({"result": "processed"})

        # Targeted: delivered only to the executor whose id is "validator".
        await ctx.send_message({"result": "processed"}, target_id="validator")

Yielding Outputs

Python
class FinalStep(Executor):
    @handler
    async def finalize(
        self,
        data: dict,
        ctx: WorkflowContext[Never, str],  # may yield str workflow outputs
    ) -> None:
        """Emit the final workflow output (retrieved via events.get_outputs())."""
        completion_message = "Workflow completed successfully"
        await ctx.yield_output(completion_message)

State Management

Python
class StatefulExecutor(Executor):
    @handler
    async def process(self, data: dict, ctx: WorkflowContext[dict]) -> None:
        """Track a per-workflow counter alongside the current user id."""
        # Writes persist across super steps.
        ctx.set_state("user_id", data["user_id"])
        previous_count = ctx.get_state("count", 0)
        ctx.set_state("count", previous_count + 1)

        # Read the values back out of state.
        user_id = ctx.get_state("user_id")
        count = ctx.get_state("count", default=0)

        await ctx.send_message({"user_id": user_id, "count": count})

WorkflowContext Type Parameters

WorkflowContext uses generic type parameters to define capabilities:
Python
from typing_extensions import Never

# No messages sent, no outputs yielded
WorkflowContext  # Equivalent to WorkflowContext[Never, Never]

# Can send int messages (first type parameter)
WorkflowContext[int]

# Can send int messages AND yield str workflow outputs (second type parameter)
WorkflowContext[int, str]

# Example usage
class TypedExecutor(Executor):
    @handler
    async def handle(
        self,
        text: str,  # handler input type
        ctx: WorkflowContext[int, str],  # sends int, yields str
    ) -> None:
        """Send the text's length downstream and yield its uppercase form."""
        await ctx.send_message(len(text))  # int message
        await ctx.yield_output(text.upper())  # str workflow output

Running Workflows

Basic Execution

# Non-streaming execution
events = await workflow.run("initial input")

# Get outputs
outputs = events.get_outputs()
for output in outputs:
    print(output)

# Check final state
print(events.get_final_state())  # WorkflowRunState.IDLE

Streaming Execution

# Stream events as they occur
async for event in workflow.run("initial input", stream=True):
    if event.type == "executor_invoked":
        print(f"Executor {event.executor_id} started")
    elif event.type == "executor_completed":
        print(f"Executor {event.executor_id} completed")
    elif event.type == "output":
        print(f"Output: {event.data}")

Workflow Configuration

Maximum Iterations

Python
# Cap the number of super steps so the self-loop cannot run forever.
builder = WorkflowBuilder(
    start_executor=node,
    max_iterations=10,  # prevent infinite loops
)
workflow = builder.add_edge(node, node).build()  # node loops back to itself

Workflow Name

Python
# Give the workflow a human-readable name.
builder = (
    WorkflowBuilder(start_executor=node)
    .with_name("data-processing-pipeline")
    .add_edge(node, next_node)
)
workflow = builder.build()

print(workflow.name)  # "data-processing-pipeline"

State Isolation Pattern

Wrap workflow construction in a function for clean state between runs:
Python
def create_workflow():
    """Build a workflow from brand-new executor instances.

    Constructing fresh executors per call keeps state isolated between runs.
    """
    node_a = NodeA(id="node_a")
    node_b = NodeB(id="node_b")
    builder = WorkflowBuilder(start_executor=node_a).add_edge(node_a, node_b)
    return builder.build()

# Each call produces an independent workflow with fresh executor state.
workflow1 = create_workflow()
workflow2 = create_workflow()
This pattern is recommended for reusable workflows. Each workflow instance maintains independent executor state.

Next Steps

Orchestration Patterns

Learn sequential, concurrent, and parallel execution patterns

Checkpoints

Save and restore workflow state

Build docs developers (and LLMs) love