Skip to main content

Overview

Workflows enable complex multi-stage processing by orchestrating multiple specialized agents. The Workflow class provides a pattern for building sequential or parallel agent pipelines.

Basic Workflow Pattern

Agno Workflow

from typing import Iterator
import os

from agno.agent import Agent
from agno.models.nebius import Nebius
from agno.tools.scrapegraph import ScrapeGraphTools
from agno.workflow import Workflow, RunResponse, RunEvent

class ResearchWorkflow(Workflow):
    """
    A multi-stage research workflow that:
    1. Gathers information from the web
    2. Analyzes and synthesizes findings
    3. Produces a structured report
    """
    
    # Step 1: Information gathering
    # Uses web scraping via ScrapeGraphTools; requires SGAI_API_KEY and
    # NEBIUS_API_KEY to be set in the environment.
    searcher: Agent = Agent(
        tools=[ScrapeGraphTools(api_key=os.getenv("SGAI_API_KEY"))],
        model=Nebius(
            id="deepseek-ai/DeepSeek-V3-0324",
            api_key=os.getenv("NEBIUS_API_KEY")
        ),
        show_tool_calls=True,
        markdown=True,
        description="Expert at finding and extracting information from the web",
        instructions=[
            "Search for the most recent and authoritative sources",
            "Extract key facts, statistics, and expert opinions",
            "Cover multiple perspectives and highlight controversies",
            "Include relevant statistics and data",
            "Organize findings in a clear, structured format",
            "Mention references and sources"
        ]
    )
    
    # Step 2: Analysis
    # No tools: this agent only reasons over the researcher's output.
    # The last two instructions guard against hallucinated references.
    analyst: Agent = Agent(
        model=Nebius(
            id="deepseek-ai/DeepSeek-V3-0324",
            api_key=os.getenv("NEBIUS_API_KEY")
        ),
        markdown=True,
        description="Critical thinker who synthesizes research into actionable insights",
        instructions=[
            "Identify key themes, trends, and contradictions",
            "Highlight the most important findings and implications",
            "Suggest areas for further investigation if gaps exist",
            "Present analysis in a structured, easy-to-read format",
            "Extract and list ONLY reference links provided by the researcher",
            "Do NOT create or hallucinate any links"
        ]
    )
    
    # Step 3: Report writing
    # Markdown output; only cites links the analyst actually passed along.
    writer: Agent = Agent(
        model=Nebius(
            id="deepseek-ai/DeepSeek-V3-0324",
            api_key=os.getenv("NEBIUS_API_KEY")
        ),
        markdown=True,
        description="Professional technical writer who crafts clear, structured reports",
        instructions=[
            "Write an engaging introduction that sets context",
            "Organize main findings into logical sections with headings",
            "Use bullet points, tables, or lists for clarity",
            "Conclude with summary and actionable recommendations",
            "Include References section ONLY if analyst provided actual links",
            "Use ONLY reference links explicitly provided by the analyst"
        ]
    )
    
    def run(self, topic: str) -> Iterator[RunResponse]:
        """
        Orchestrates the research, analysis, and report writing process.
        
        Args:
            topic: The research topic
            
        Yields:
            RunResponse objects containing the final report chunks
        """
        # Step 1: Research (non-streaming: the full result is needed downstream)
        research_content = self.searcher.run(topic)
        
        # Step 2: Analysis of the complete research output
        analysis = self.analyst.run(research_content.content)
        
        # Step 3: Report Writing (with streaming, so callers get chunks)
        report = self.writer.run(analysis.content, stream=True)
        yield from report

Workflow Class

`Workflow` (class) — the base class for creating multi-agent workflows.

Agent Definitions

Define specialized agents as class attributes:
class MyWorkflow(Workflow):
    # Each specialized agent is declared as a class attribute so the
    # workflow can reference it later as self.agent_name inside run().
    agent_name: Agent = Agent(
        model=model,
        tools=[tools],
        description="Agent purpose",
        instructions=["Specific instructions"]
    )

run() Method

Implement the run() method to orchestrate agent execution:
def run(self, input_data: str) -> Iterator[RunResponse]:
    """Chain the agents sequentially and stream only the last stage."""
    stage_one = self.agent1.run(input_data)
    stage_two = self.agent2.run(stage_one.content)
    # Streaming the final agent lets callers consume chunks as they arrive.
    yield from self.agent3.run(stage_two.content, stream=True)

Running Workflows

Basic Execution

# run() is a generator, so iterating it streams the report chunks.
pipeline = ResearchWorkflow()

for piece in pipeline.run(topic="AI trends in 2026"):
    if piece.content:
        print(piece.content, end="")

Collecting Full Response

def run_research(query: str) -> str:
    """Run the ResearchWorkflow on *query* and return the full report.

    Args:
        query: The research topic to investigate.

    Returns:
        The complete report text assembled from the streamed chunks.
    """
    workflow = ResearchWorkflow()
    result_iterator = workflow.run(topic=query)

    # str.join avoids the quadratic cost of repeated `+=` string
    # concatenation when the workflow streams many small chunks.
    return "".join(chunk.content for chunk in result_iterator if chunk.content)

report = run_research("Latest developments in quantum computing")
print(report)

Workflow Patterns

Sequential Pipeline

Each agent processes the output of the previous agent:
class SequentialWorkflow(Workflow):
    """Pipeline in which every agent consumes its predecessor's output."""

    step1: Agent = Agent(...)
    step2: Agent = Agent(...)
    step3: Agent = Agent(...)

    def run(self, input_data: str) -> Iterator[RunResponse]:
        first_pass = self.step1.run(input_data)
        second_pass = self.step2.run(first_pass.content)
        # Only the last stage streams, so callers see incremental output.
        yield from self.step3.run(second_pass.content, stream=True)

Parallel Processing with Aggregation

Multiple agents process the same input, then their results are combined. Note that the example below invokes each agent sequentially — achieving true parallelism would require threads or async execution:
from typing import List

class ParallelWorkflow(Workflow):
    """
    Fan-out/fan-in pattern: three analyzers see the same input and an
    aggregator merges their outputs into one streamed response.
    """

    analyzer1: Agent = Agent(...)
    analyzer2: Agent = Agent(...)
    analyzer3: Agent = Agent(...)
    aggregator: Agent = Agent(...)
    
    def run(self, input_data: str) -> Iterator[RunResponse]:
        # Run agents in parallel (conceptually)
        # NOTE(review): these three calls actually execute sequentially;
        # true parallelism would need threads or async execution.
        result1 = self.analyzer1.run(input_data)
        result2 = self.analyzer2.run(input_data)
        result3 = self.analyzer3.run(input_data)
        
        # Combine results
        # (the triple-quoted f-string keeps its literal indentation;
        # the aggregator receives it verbatim)
        combined = f"""
        Analysis 1: {result1.content}
        
        Analysis 2: {result2.content}
        
        Analysis 3: {result3.content}
        """
        
        # Aggregate findings
        final = self.aggregator.run(combined, stream=True)
        yield from final

Conditional Branching

Route to different agents based on input or intermediate results:
class ConditionalWorkflow(Workflow):
    """Classify the input first, then route it to the matching specialist."""

    classifier: Agent = Agent(...)
    specialist_a: Agent = Agent(...)
    specialist_b: Agent = Agent(...)

    def run(self, input_data: str) -> Iterator[RunResponse]:
        # Ask the classifier which kind of query this is.
        classification = self.classifier.run(
            f"Classify this query: {input_data}"
        )

        # Select the specialist based on the classifier's verdict,
        # then stream that specialist's answer.
        specialist = (
            self.specialist_a
            if "technical" in classification.content.lower()
            else self.specialist_b
        )
        yield from specialist.run(input_data, stream=True)

Specialized Workflow Examples

Financial Analysis Workflow

from agno.workflow import Workflow
from agno.agent import Agent
from agno.models.nebius import Nebius
from agno.tools.yfinance import YFinanceTools
from agno.tools.duckduckgo import DuckDuckGoTools
import os

class FinancialAnalysisWorkflow(Workflow):
    """
    Four-agent investment research workflow: collect market data, run
    fundamental and risk analysis over it, then write a combined report.

    NOTE(review): run()'s signature uses Iterator and RunResponse, which
    this snippet's imports do not include — confirm
    `from typing import Iterator` and the agno.workflow imports are
    present when copying this example.
    """

    # Data gatherer: market data via yfinance plus recent news via DuckDuckGo.
    data_collector: Agent = Agent(
        name="DataCollector",
        model=Nebius(id="Qwen/Qwen3-32B", api_key=os.getenv("NEBIUS_API_KEY")),
        tools=[
            YFinanceTools(
                stock_price=True,
                company_info=True,
                analyst_recommendations=True,
                stock_fundamentals=True
            ),
            DuckDuckGoTools(news=True)
        ],
        instructions=[
            "Gather comprehensive financial data",
            "Include stock prices, fundamentals, and recent news",
            "Organize data in a structured format"
        ]
    )
    
    # Fundamental analyst: valuation-focused, no tools of its own.
    fundamental_analyst: Agent = Agent(
        name="FundamentalAnalyst",
        model=Nebius(id="meta-llama/Llama-3.3-70B-Instruct", api_key=os.getenv("NEBIUS_API_KEY")),
        description="Expert in company valuation and financial analysis",
        instructions=[
            "Analyze company fundamentals and financial health",
            "Evaluate P/E ratios, revenue growth, and profitability",
            "Assess competitive position and market share",
            "Provide investment thesis"
        ]
    )
    
    # Risk analyst: downside scenarios and mitigation.
    risk_analyst: Agent = Agent(
        name="RiskAnalyst",
        model=Nebius(id="meta-llama/Llama-3.3-70B-Instruct", api_key=os.getenv("NEBIUS_API_KEY")),
        description="Expert in risk assessment and mitigation",
        instructions=[
            "Identify market risks and company-specific risks",
            "Analyze volatility and downside scenarios",
            "Suggest risk mitigation strategies"
        ]
    )
    
    # Report writer: merges both analyses into a markdown report.
    report_writer: Agent = Agent(
        name="ReportWriter",
        model=Nebius(id="deepseek-ai/DeepSeek-V3-0324", api_key=os.getenv("NEBIUS_API_KEY")),
        markdown=True,
        instructions=[
            "Create comprehensive investment report",
            "Include executive summary, analysis, and recommendations",
            "Use tables for financial data",
            "Provide clear buy/hold/sell recommendation"
        ]
    )
    
    def run(self, symbol: str) -> Iterator[RunResponse]:
        """
        Analyze one ticker symbol and stream the final report.

        Args:
            symbol: The stock ticker to analyze.

        Yields:
            RunResponse chunks from the report writer.
        """
        # Gather data
        data = self.data_collector.run(f"Collect all financial data for {symbol}")
        
        # Parallel analysis
        # NOTE(review): both analysts read the same collected data, but the
        # calls below execute sequentially.
        fundamental = self.fundamental_analyst.run(data.content)
        risk = self.risk_analyst.run(data.content)
        
        # Combine and report
        combined = f"""
        Stock: {symbol}
        
        Fundamental Analysis:
        {fundamental.content}
        
        Risk Analysis:
        {risk.content}
        """
        
        report = self.report_writer.run(combined, stream=True)
        yield from report

Content Creation Workflow

class ContentCreationWorkflow(Workflow):
    """
    Research -> outline -> draft -> edit pipeline for written content.

    NOTE(review): assumes Workflow, Agent, Nebius, DuckDuckGoTools, os,
    Iterator and RunResponse are imported as in the earlier snippets —
    confirm when copying this example on its own.
    """

    # Stage 1: gather source material via web search and news.
    researcher: Agent = Agent(
        name="Researcher",
        model=Nebius(id="Qwen/Qwen3-32B", api_key=os.getenv("NEBIUS_API_KEY")),
        tools=[DuckDuckGoTools(search=True, news=True)],
        instructions=["Research topic thoroughly", "Find latest information"]
    )
    
    # Stage 2: turn the research into a structured outline.
    outliner: Agent = Agent(
        name="Outliner",
        model=Nebius(id="meta-llama/Llama-3.3-70B-Instruct", api_key=os.getenv("NEBIUS_API_KEY")),
        instructions=["Create detailed content outline", "Structure sections logically"]
    )
    
    # Stage 3: write the full draft from outline + research.
    writer: Agent = Agent(
        name="Writer",
        model=Nebius(id="deepseek-ai/DeepSeek-V3-0324", api_key=os.getenv("NEBIUS_API_KEY")),
        markdown=True,
        instructions=["Write engaging, informative content", "Follow outline structure"]
    )
    
    # Stage 4: polish the draft for clarity and grammar.
    editor: Agent = Agent(
        name="Editor",
        model=Nebius(id="meta-llama/Llama-3.3-70B-Instruct", api_key=os.getenv("NEBIUS_API_KEY")),
        markdown=True,
        instructions=["Edit for clarity and flow", "Fix grammar and style issues"]
    )
    
    def run(self, topic: str) -> Iterator[RunResponse]:
        """Produce edited content for *topic*; only the edit pass streams."""
        research = self.researcher.run(f"Research: {topic}")
        outline = self.outliner.run(f"Create outline for: {topic}\n\nResearch: {research.content}")
        draft = self.writer.run(f"Write content:\n\nOutline: {outline.content}\n\nResearch: {research.content}")
        final = self.editor.run(f"Edit this draft:\n\n{draft.content}", stream=True)
        yield from final

Best Practices

1. Clear Agent Responsibilities

Each agent should have a specific, well-defined role:
# Each agent gets one narrowly-scoped job, stated in its description and
# reinforced by short, imperative instructions.
searcher: Agent = Agent(
    description="Expert at finding and extracting information from the web",
    instructions=["Search for authoritative sources", "Extract key facts"]
)

analyst: Agent = Agent(
    description="Critical thinker who synthesizes research into insights",
    instructions=["Identify key themes", "Highlight important findings"]
)

2. Use Appropriate Models

Match model capabilities to agent tasks:
# Fast model for simple tasks
# NOTE(review): `key` is presumably the Nebius API key loaded earlier,
# e.g. key = os.getenv("NEBIUS_API_KEY") — not shown in this snippet.
data_collector: Agent = Agent(
    model=Nebius(id="Qwen/Qwen3-30B-A3B", api_key=key)
)

# Powerful model for complex reasoning
analyst: Agent = Agent(
    model=Nebius(id="deepseek-ai/DeepSeek-V3-0324", api_key=key)
)

3. Handle Intermediate Results

def run(self, input_data: str) -> Iterator[RunResponse]:
    """Run the pipeline, logging the size of each intermediate result."""
    # Store intermediate results for debugging
    step1_result = self.agent1.run(input_data)
    logger.info(f"Step 1 complete: {len(step1_result.content)} chars")

    step2_result = self.agent2.run(step1_result.content)
    logger.info(f"Step 2 complete: {len(step2_result.content)} chars")

    # Stream final output
    final = self.agent3.run(step2_result.content, stream=True)
    yield from final

4. Enable Streaming for Final Output

def run(self, input_data: str) -> Iterator[RunResponse]:
    """Buffer the intermediate stages and stream only the final one."""
    # Non-streaming for intermediate steps
    first = self.agent1.run(input_data)
    second = self.agent2.run(first.content)
    # Stream final output for better UX
    yield from self.agent3.run(second.content, stream=True)

5. Document Workflow Logic

class MyWorkflow(Workflow):
    """
    Multi-stage workflow for [purpose].
    
    Stages:
    1. [Agent 1] - [Purpose]
    2. [Agent 2] - [Purpose]
    3. [Agent 3] - [Purpose]
    
    Usage:
        workflow = MyWorkflow()
        result = workflow.run(input_data)
        # if run() yields RunResponse chunks, iterate `result` to consume them
    """

Error Handling

from agno.utils.log import logger

class RobustWorkflow(Workflow):
    """Example workflow that logs and re-raises failures from each stage."""

    agent1: Agent = Agent(...)
    agent2: Agent = Agent(...)

    def run(self, input_data: str) -> Iterator[RunResponse]:
        """Run agent1, validate its output, then stream agent2's response.

        Raises:
            ValueError: if agent1 returns no content.
        """
        try:
            result1 = self.agent1.run(input_data)
        except Exception as e:
            logger.error(f"Workflow error: {e}")
            raise

        # Validate outside the try/except so this deliberate ValueError is
        # not also caught and re-logged as a generic "Workflow error"
        # (the original wrapped it, producing a duplicate log entry).
        if not result1.content:
            logger.error("Agent 1 returned empty result")
            raise ValueError("No content from agent 1")

        try:
            yield from self.agent2.run(result1.content, stream=True)
        except Exception as e:
            logger.error(f"Workflow error: {e}")
            raise

Build docs developers (and LLMs) love