Overview

Fast Agent provides powerful workflow capabilities for orchestrating multiple agents. Workflows enable complex task decomposition, parallel processing, sequential pipelines, and iterative refinement.

Workflow Types

Orchestrator Workflows

Orchestrators dynamically plan and execute tasks using available child agents.

Full Planning

Creates a complete execution plan up front, then executes it:
from fast_agent import FastAgent

fast = FastAgent("my-app")

@fast.agent(name="researcher", instruction="Research topics thoroughly")
async def researcher(): pass

@fast.agent(name="writer", instruction="Write clear content")
async def writer(): pass

@fast.agent(name="editor", instruction="Edit for quality")
async def editor(): pass

@fast.orchestrator(
    name="content_coordinator",
    agents=["researcher", "writer", "editor"],
    plan_type="full",
    plan_iterations=5
)
async def content_coordinator():
    pass

async with fast.run() as app:
    response = await app.content_coordinator.send(
        "Create an article about quantum computing"
    )
Best for:
  • Well-defined tasks
  • Fixed workflows
  • Predictable requirements

Iterative Planning

Plans and executes step-by-step, adapting based on intermediate results:
@fast.orchestrator(
    name="adaptive_coordinator",
    agents=["researcher", "analyzer", "synthesizer"],
    plan_type="iterative",
    plan_iterations=10
)
async def adaptive_coordinator():
    pass
Best for:
  • Complex, open-ended tasks
  • Exploratory workflows
  • Adaptive problem solving
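The difference between the two plan types can be sketched with plain functions standing in for the planner and worker agents. This is a conceptual illustration only; the function names and signatures are not part of the Fast Agent API:

```python
# Conceptual contrast of plan_type="full" vs plan_type="iterative",
# with a plain function standing in for the planner LLM.

def full_planning(plan, execute, task):
    """Plan every step up front, then execute the fixed plan."""
    steps = plan(task, observations=[])
    return [execute(step) for step in steps]

def iterative_planning(plan, execute, task, plan_iterations=10):
    """Plan one step at a time, adapting to each result."""
    observations = []
    for _ in range(plan_iterations):
        steps = plan(task, observations)
        if not steps:                    # planner decides it is done
            break
        observations.append(execute(steps[0]))
    return observations

# Toy planner: the remaining steps shrink as observations accumulate.
plan = lambda task, observations: ["research", "write", "edit"][len(observations):]
execute = lambda step: f"done:{step}"

full = full_planning(plan, execute, "article")
adaptive = iterative_planning(plan, execute, "article")
```

With a deterministic planner both modes reach the same result; the iterative mode's advantage appears when intermediate observations change what the planner does next.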

Iterative Planner

A specialized planner that continuously refines its approach:
@fast.iterative_planner(
    name="research_planner",
    agents=["searcher", "analyzer", "synthesizer"],
    plan_iterations=-1,  # Unlimited iterations
    instruction="Plan research tasks iteratively based on findings"
)
async def research_planner():
    pass

async with fast.run() as app:
    response = await app.research_planner.send(
        "Investigate the latest developments in AI safety"
    )
Configuration:
  • plan_iterations=-1: Unlimited iterations
  • plan_iterations=N: Maximum N iterations
Best for:
  • Research tasks
  • Investigation workflows
  • Long-running explorations

Router Workflows

Routers analyze requests and delegate to the most appropriate agent:
@fast.agent(name="technical_support")
async def technical_support(): pass

@fast.agent(name="billing_support")
async def billing_support(): pass

@fast.agent(name="account_support")
async def account_support(): pass

@fast.router(
    name="support_router",
    agents=["technical_support", "billing_support", "account_support"],
    instruction="Route customer requests to the appropriate department"
)
async def support_router():
    pass

async with fast.run() as app:
    # Automatically routes to billing_support
    response = await app.support_router.send(
        "I was charged twice for my subscription"
    )
Best for:
  • Request triage
  • Department routing
  • Skill-based delegation
  • Topic classification
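Conceptually, a router is a classifier plus a dispatch table: each request goes to exactly one handler. The sketch below uses keyword rules as a toy stand-in for the LLM classification Fast Agent performs; all names here are illustrative, not part of the Fast Agent API:

```python
# Minimal illustration of the routing concept: classify a request,
# then dispatch it to exactly one handler.

def classify(request: str) -> str:
    """Pick a department (toy keyword stand-in for LLM routing)."""
    text = request.lower()
    if any(word in text for word in ("charge", "invoice", "refund")):
        return "billing_support"
    if any(word in text for word in ("error", "crash", "bug")):
        return "technical_support"
    return "account_support"

HANDLERS = {
    "billing_support": lambda req: f"billing: reviewing '{req}'",
    "technical_support": lambda req: f"technical: diagnosing '{req}'",
    "account_support": lambda req: f"account: assisting with '{req}'",
}

def route(request: str) -> str:
    # Exactly one handler receives each request.
    return HANDLERS[classify(request)](request)
```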

Chain Workflows

Chains execute agents in a fixed sequence:

Incremental Mode (default)

Each agent receives only the previous agent’s output:
@fast.chain(
    name="simple_pipeline",
    sequence=["researcher", "writer", "editor"],
    cumulative=False
)
async def simple_pipeline():
    pass
Flow:
  1. User input → Researcher
  2. Researcher output → Writer
  3. Writer output → Editor
  4. Editor output → User

Cumulative Mode

Each agent sees all previous outputs:
@fast.chain(
    name="cumulative_pipeline",
    sequence=["researcher", "writer", "editor", "publisher"],
    cumulative=True
)
async def cumulative_pipeline():
    pass
Flow:
  1. User input → Researcher
  2. User input + Researcher output → Writer
  3. User input + Researcher output + Writer output → Editor
  4. All previous outputs → Publisher
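The two modes differ only in what context each stage receives. A minimal sketch with plain functions standing in for agents (illustrative only, not the Fast Agent API):

```python
# Contrast of incremental vs. cumulative chaining, with plain
# functions standing in for agents.

def run_chain(stages, user_input, cumulative=False):
    """Run stages in order; return the final stage's output."""
    transcript = [user_input]
    for stage in stages:
        if cumulative:
            # Cumulative: stage sees the user input plus every prior output.
            context = "\n".join(transcript)
        else:
            # Incremental: stage sees only the most recent output.
            context = transcript[-1]
        transcript.append(stage(context))
    return transcript[-1]

research = lambda ctx: f"research({ctx})"
write = lambda ctx: f"write({ctx})"

incremental = run_chain([research, write], "topic")
accumulated = run_chain([research, write], "topic", cumulative=True)
```

Note the trade-off visible in the sketch: cumulative mode grows the context with every stage, which is why incremental mode is the cheaper default.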
Best for:
  • Sequential processing pipelines
  • Multi-stage refinement
  • Assembly line patterns
  • Content production workflows

Parallel Workflows

Parallel workflows execute multiple agents simultaneously:

Fan-out Only

Execute agents in parallel without aggregation:
@fast.parallel(
    name="parallel_analysis",
    fan_out=["sentiment_analyzer", "topic_classifier", "entity_extractor"],
    fan_in=None
)
async def parallel_analysis():
    pass
Returns an array of individual results.

Fan-out with Fan-in

Execute in parallel and aggregate results:
@fast.agent(name="aggregator")
async def aggregator(): pass

@fast.parallel(
    name="comprehensive_analysis",
    fan_out=["sentiment_analyzer", "topic_classifier", "entity_extractor"],
    fan_in="aggregator",
    include_request=True
)
async def comprehensive_analysis():
    pass

async with fast.run() as app:
    response = await app.comprehensive_analysis.send(
        "Analyze this customer feedback: ..."
    )
    # Returns aggregated result from fan_in agent
Parameters:
  • include_request=True: Fan-in agent receives original request plus all parallel results
  • include_request=False: Fan-in agent receives only parallel results
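The include_request flag only changes what the fan-in agent sees. A sketch of the fan-out/fan-in mechanics using asyncio.gather, with illustrative worker and aggregator functions in place of agents:

```python
import asyncio

# Sketch of fan-out/fan-in: run workers concurrently, then hand
# their results (optionally with the original request) to one
# aggregator.

async def sentiment(text): return f"sentiment({text})"
async def topics(text): return f"topics({text})"

async def fan_out_fan_in(request, workers, fan_in, include_request=True):
    # Fan-out: all workers run concurrently on the same request.
    results = await asyncio.gather(*(w(request) for w in workers))
    if fan_in is None:
        return list(results)              # fan-out only: array of results
    payload = list(results)
    if include_request:
        payload = [request] + payload     # prepend the original request
    return await fan_in(payload)

async def aggregator(payload):
    return " | ".join(payload)

result = asyncio.run(
    fan_out_fan_in("feedback", [sentiment, topics], aggregator)
)
```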
Best for:
  • Multi-perspective analysis
  • Parallel data processing
  • Consensus building
  • Simultaneous evaluations

Evaluator-Optimizer Workflows

Iteratively generate and refine outputs based on quality evaluation:
@fast.agent(
    name="content_generator",
    instruction="Generate high-quality content"
)
async def content_generator(): pass

@fast.agent(
    name="quality_evaluator",
    instruction="""Evaluate content quality and return rating:
    EXCELLENT, GOOD, FAIR, or POOR
    Provide specific feedback for improvement."""
)
async def quality_evaluator(): pass

@fast.evaluator_optimizer(
    name="quality_content",
    generator="content_generator",
    evaluator="quality_evaluator",
    min_rating="EXCELLENT",
    max_refinements=5,
    refinement_instruction="Improve the content based on the evaluator's feedback"
)
async def quality_content():
    pass

async with fast.run() as app:
    response = await app.quality_content.send(
        "Write a professional email announcing our new product"
    )
Process:
  1. Generator creates initial output
  2. Evaluator rates and provides feedback
  3. If rating < min_rating and refinements < max_refinements:
    • Generator refines based on feedback
    • Return to step 2
  4. Return final output
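The steps above can be sketched as a plain loop, with toy functions standing in for the generator and evaluator agents; the ratings ladder matches the one in the evaluator instruction, but everything else is illustrative:

```python
# Sketch of the generate -> evaluate -> refine loop described above.

RATINGS = ["POOR", "FAIR", "GOOD", "EXCELLENT"]

def evaluator_optimizer(generate, evaluate, min_rating="EXCELLENT",
                        max_refinements=5):
    output = generate(None)                  # 1. initial draft
    for _ in range(max_refinements):
        rating, feedback = evaluate(output)  # 2. rate + give feedback
        if RATINGS.index(rating) >= RATINGS.index(min_rating):
            break                            # quality bar met
        output = generate(feedback)          # 3. refine and re-evaluate
    return output

# Toy generator/evaluator: quality rises one step per refinement.
drafts = iter(["draft-1", "draft-2", "draft-3"])
generate = lambda feedback: next(drafts)
quality = {"draft-1": "FAIR", "draft-2": "GOOD", "draft-3": "EXCELLENT"}
evaluate = lambda out: (quality[out], f"improve {out}")

final = evaluator_optimizer(generate, evaluate)
```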
Best for:
  • Quality-critical outputs
  • Code generation and refinement
  • Professional writing
  • Iterative improvement

MAKER Workflows

Statistical error correction through voting consensus:
@fast.agent(
    name="answer_generator",
    instruction="Provide a concise answer",
    model="openai:gpt-4o-mini"  # Cost-effective model
)
async def answer_generator(): pass

@fast.maker(
    name="reliable_answers",
    worker="answer_generator",
    k=3,  # First to 3-vote margin wins
    max_samples=50,
    match_strategy="normalized",
    red_flag_max_length=500
)
async def reliable_answers():
    pass

async with fast.run() as app:
    # High-reliability answer via statistical consensus
    response = await app.reliable_answers.send(
        "What is the capital of France?"
    )
Match Strategies:
  • "exact": Character-for-character match
  • "normalized": Ignore whitespace and case differences
  • "structured": Parse as JSON and compare structurally
Red Flag Length: Per the MAKER paper, overly long responses correlate with errors. Setting red_flag_max_length filters out verbose responses.
Best for:
  • High-stakes decisions
  • Error-critical outputs
  • Verification tasks
  • Calculations and factual queries
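The consensus mechanism can be sketched as repeated sampling with a first-to-k-margin vote, where the red-flag length filter and the match strategy's normalization apply before a sample is counted. All code below is an illustration of the idea, not the Fast Agent implementation:

```python
from collections import Counter

# Sketch of MAKER-style voting: sample a worker repeatedly, discard
# red-flagged (overly long) answers, normalize, and stop when one
# answer leads every other by k votes.

def normalize(answer: str) -> str:
    """'normalized' match strategy: collapse case and whitespace."""
    return " ".join(answer.lower().split())

def vote(sample, k=3, max_samples=50, red_flag_max_length=500):
    votes = Counter()
    for _ in range(max_samples):
        answer = sample()
        if len(answer) > red_flag_max_length:
            continue                      # red flag: too verbose, skip
        votes[normalize(answer)] += 1
        (top, n), *rest = votes.most_common()
        runner_up = rest[0][1] if rest else 0
        if n - runner_up >= k:            # first to a k-vote margin wins
            return top
    return None                           # no consensus within budget

answers = iter(["Paris", " paris ", "Lyon", "PARIS", "x" * 600, "Paris"])
winner = vote(lambda: next(answers))
```

In the toy run, the over-long sample is red-flagged and "Paris", " paris ", and "PARIS" all count as the same normalized answer.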

Workflow Composition

Nested Workflows

Workflows can use other workflows as child agents:
# Define sub-pipelines
@fast.chain(
    name="data_pipeline",
    sequence=["collector", "cleaner", "validator"]
)
async def data_pipeline(): pass

@fast.chain(
    name="analysis_pipeline",
    sequence=["analyzer", "visualizer", "reporter"]
)
async def analysis_pipeline(): pass

# Compose into meta-workflow
@fast.orchestrator(
    name="master_coordinator",
    agents=["data_pipeline", "analysis_pipeline"],
    plan_type="full"
)
async def master_coordinator():
    pass

Parallel + Chain Pattern

# Parallel analysis
@fast.parallel(
    name="multi_perspective",
    fan_out=["expert1", "expert2", "expert3"],
    fan_in="synthesizer"
)
async def multi_perspective(): pass

# Sequential refinement of parallel results
@fast.chain(
    name="analysis_workflow",
    sequence=["multi_perspective", "editor", "publisher"]
)
async def analysis_workflow(): pass

Router + Workflow Pattern

# Define specialized workflows
@fast.chain(name="technical_workflow", sequence=["diagnose", "fix", "verify"])
async def technical_workflow(): pass

@fast.chain(name="billing_workflow", sequence=["review", "adjust", "confirm"])
async def billing_workflow(): pass

# Route to appropriate workflow
@fast.router(
    name="smart_router",
    agents=["technical_workflow", "billing_workflow", "general_support"]
)
async def smart_router():
    pass

Best Practices

Choose the Right Workflow

Use an Orchestrator when:
  • Tasks require dynamic planning
  • Requirements are complex or open-ended
  • You need adaptive task decomposition

Use a Router when:
  • Clear categories or departments exist
  • Requests need classification
  • Only one agent should handle each request

Use a Chain when:
  • Workflow is sequential and predictable
  • Each step depends on the previous
  • You have a fixed pipeline

Use Parallel when:
  • Tasks can run independently
  • You need multiple perspectives
  • Speed is important
  • Results need aggregation

Use Evaluator-Optimizer when:
  • Output quality is critical
  • You can define quality criteria
  • Iterative refinement makes sense

Use MAKER when:
  • Reliability is paramount
  • Cost-effective models are preferred
  • Statistical consensus is valuable
  • Errors are unacceptable

Workflow Configuration

Model Selection:
@fast.orchestrator(
    name="coordinator",
    agents=["agent1", "agent2"],
    model="anthropic:claude-3-5-sonnet-20241022",  # Override default
    request_params={"temperature": 0.3}  # Lower temperature for planning
)
async def coordinator(): pass
History Management:
# Orchestrators typically don't need history
@fast.orchestrator(
    name="planner",
    agents=["worker1", "worker2"],
    use_history=False  # Default for orchestrator
)

# But routers might benefit from it
@fast.router(
    name="smart_router",
    agents=["agent1", "agent2"],
    use_history=True  # Remember routing context
)

Error Handling

async with fast.run() as app:
    try:
        response = await app.workflow.send("task")
    except Exception as e:
        # Handle workflow errors
        print(f"Workflow failed: {e}")

Performance Optimization

Parallel Execution:
# Maximize parallelism for independent tasks
@fast.parallel(
    name="fast_analysis",
    fan_out=["analyzer1", "analyzer2", "analyzer3", "analyzer4"]
)
Chain Optimization:
# Use incremental mode when full context isn't needed
@fast.chain(
    name="efficient_pipeline",
    sequence=["step1", "step2", "step3"],
    cumulative=False  # Smaller context per step
)
