Overview
Workflows enable complex multi-stage processing by orchestrating multiple specialized agents. The `Workflow` class provides a pattern for building sequential or parallel agent pipelines.
Basic Workflow Pattern
Agno Workflow
import os
from typing import Iterator

from agno.agent import Agent
from agno.models.nebius import Nebius
from agno.tools.scrapegraph import ScrapeGraphTools
from agno.workflow import Workflow, RunResponse, RunEvent
class ResearchWorkflow(Workflow):
    """
    A multi-stage research workflow that:
    1. Gathers information from the web
    2. Analyzes and synthesizes findings
    3. Produces a structured report
    """

    # NOTE: the agents are class attributes, so the os.getenv() lookups below
    # happen once, when this class body is executed. os.getenv() returns None
    # for a variable that is not set.

    # Step 1: Information gathering
    searcher: Agent = Agent(
        tools=[ScrapeGraphTools(api_key=os.getenv("SGAI_API_KEY"))],
        model=Nebius(
            id="deepseek-ai/DeepSeek-V3-0324",
            api_key=os.getenv("NEBIUS_API_KEY"),
        ),
        show_tool_calls=True,
        markdown=True,
        description="Expert at finding and extracting information from the web",
        instructions=[
            "Search for the most recent and authoritative sources",
            "Extract key facts, statistics, and expert opinions",
            "Cover multiple perspectives and highlight controversies",
            "Include relevant statistics and data",
            "Organize findings in a clear, structured format",
            "Mention references and sources",
        ],
    )

    # Step 2: Analysis
    analyst: Agent = Agent(
        model=Nebius(
            id="deepseek-ai/DeepSeek-V3-0324",
            api_key=os.getenv("NEBIUS_API_KEY"),
        ),
        markdown=True,
        description="Critical thinker who synthesizes research into actionable insights",
        instructions=[
            "Identify key themes, trends, and contradictions",
            "Highlight the most important findings and implications",
            "Suggest areas for further investigation if gaps exist",
            "Present analysis in a structured, easy-to-read format",
            "Extract and list ONLY reference links provided by the researcher",
            "Do NOT create or hallucinate any links",
        ],
    )

    # Step 3: Report writing
    writer: Agent = Agent(
        model=Nebius(
            id="deepseek-ai/DeepSeek-V3-0324",
            api_key=os.getenv("NEBIUS_API_KEY"),
        ),
        markdown=True,
        description="Professional technical writer who crafts clear, structured reports",
        instructions=[
            "Write an engaging introduction that sets context",
            "Organize main findings into logical sections with headings",
            "Use bullet points, tables, or lists for clarity",
            "Conclude with summary and actionable recommendations",
            "Include References section ONLY if analyst provided actual links",
            "Use ONLY reference links explicitly provided by the analyst",
        ],
    )

    def run(self, topic: str) -> Iterator[RunResponse]:
        """
        Orchestrates the research, analysis, and report writing process.

        Args:
            topic: The research topic

        Yields:
            RunResponse objects containing the final report chunks
        """
        # Step 1: Research (not streamed: its .content feeds the next stage)
        research_content = self.searcher.run(topic)

        # Step 2: Analysis of the searcher's output
        analysis = self.analyst.run(research_content.content)

        # Step 3: Report Writing (with streaming)
        report = self.writer.run(analysis.content, stream=True)
        yield from report
Workflow Class
Base class for creating multi-agent workflows.
Agent Definitions
Define specialized agents as class attributes:class MyWorkflow(Workflow):
agent_name: Agent = Agent(
model=model,
tools=[tools],
description="Agent purpose",
instructions=["Specific instructions"]
)
run() Method
Implement the `run()` method to orchestrate agent execution:
def run(self, input_data: str) -> Iterator[RunResponse]:
    """Run three agents in order and stream the last agent's output.

    Args:
        input_data: Text passed to the first agent.

    Yields:
        Whatever the final agent's streaming run produces.
    """
    # Execute agents sequentially; each stage consumes the previous .content
    result1 = self.agent1.run(input_data)
    result2 = self.agent2.run(result1.content)

    # Stream final output
    final = self.agent3.run(result2.content, stream=True)
    yield from final
Running Workflows
Basic Execution
workflow = ResearchWorkflow()
result = workflow.run(topic="AI trends in 2026")

# For streaming responses: print each chunk as it arrives, skipping chunks
# whose content is empty; end="" keeps consecutive chunks on the same line.
for chunk in result:
    if chunk.content:
        print(chunk.content, end="")
Collecting Full Response
def run_research(query: str) -> str:
    """Run the research workflow and return the whole report as one string.

    Args:
        query: The research topic handed to ``ResearchWorkflow.run``.

    Returns:
        The concatenation of every non-empty chunk the workflow yields.
    """
    workflow = ResearchWorkflow()
    result_iterator = workflow.run(topic=query)

    # Collect all streaming content; join() avoids repeated string
    # concatenation while building the report.
    return "".join(
        chunk.content for chunk in result_iterator if chunk.content
    )


report = run_research("Latest developments in quantum computing")
print(report)
Workflow Patterns
Sequential Pipeline
Each agent processes the output of the previous agent:class SequentialWorkflow(Workflow):
step1: Agent = Agent(...)
step2: Agent = Agent(...)
step3: Agent = Agent(...)
def run(self, input_data: str) -> Iterator[RunResponse]:
result1 = self.step1.run(input_data)
result2 = self.step2.run(result1.content)
result3 = self.step3.run(result2.content, stream=True)
yield from result3
Parallel Processing with Aggregation
Multiple agents process the same input, then results are combined:from typing import List
class ParallelWorkflow(Workflow):
analyzer1: Agent = Agent(...)
analyzer2: Agent = Agent(...)
analyzer3: Agent = Agent(...)
aggregator: Agent = Agent(...)
def run(self, input_data: str) -> Iterator[RunResponse]:
# Run agents in parallel (conceptually)
result1 = self.analyzer1.run(input_data)
result2 = self.analyzer2.run(input_data)
result3 = self.analyzer3.run(input_data)
# Combine results
combined = f"""
Analysis 1: {result1.content}
Analysis 2: {result2.content}
Analysis 3: {result3.content}
"""
# Aggregate findings
final = self.aggregator.run(combined, stream=True)
yield from final
Conditional Branching
Route to different agents based on input or intermediate results:class ConditionalWorkflow(Workflow):
classifier: Agent = Agent(...)
specialist_a: Agent = Agent(...)
specialist_b: Agent = Agent(...)
def run(self, input_data: str) -> Iterator[RunResponse]:
# Classify the input
classification = self.classifier.run(
f"Classify this query: {input_data}"
)
# Route to appropriate specialist
if "technical" in classification.content.lower():
result = self.specialist_a.run(input_data, stream=True)
else:
result = self.specialist_b.run(input_data, stream=True)
yield from result
Specialized Workflow Examples
Financial Analysis Workflow
from agno.workflow import Workflow
from agno.agent import Agent
from agno.models.nebius import Nebius
from agno.tools.yfinance import YFinanceTools
from agno.tools.duckduckgo import DuckDuckGoTools
import os
class FinancialAnalysisWorkflow(Workflow):
    # Pipeline: collect data -> fundamental analysis + risk analysis -> report.
    # The API key is read from the environment when this class body runs.

    # Data gatherer
    data_collector: Agent = Agent(
        name="DataCollector",
        model=Nebius(id="Qwen/Qwen3-32B", api_key=os.getenv("NEBIUS_API_KEY")),
        tools=[
            YFinanceTools(
                stock_price=True,
                company_info=True,
                analyst_recommendations=True,
                stock_fundamentals=True,
            ),
            DuckDuckGoTools(news=True),
        ],
        instructions=[
            "Gather comprehensive financial data",
            "Include stock prices, fundamentals, and recent news",
            "Organize data in a structured format",
        ],
    )

    # Fundamental analyst
    fundamental_analyst: Agent = Agent(
        name="FundamentalAnalyst",
        model=Nebius(
            id="meta-llama/Llama-3.3-70B-Instruct",
            api_key=os.getenv("NEBIUS_API_KEY"),
        ),
        description="Expert in company valuation and financial analysis",
        instructions=[
            "Analyze company fundamentals and financial health",
            "Evaluate P/E ratios, revenue growth, and profitability",
            "Assess competitive position and market share",
            "Provide investment thesis",
        ],
    )

    # Risk analyst
    risk_analyst: Agent = Agent(
        name="RiskAnalyst",
        model=Nebius(
            id="meta-llama/Llama-3.3-70B-Instruct",
            api_key=os.getenv("NEBIUS_API_KEY"),
        ),
        description="Expert in risk assessment and mitigation",
        instructions=[
            "Identify market risks and company-specific risks",
            "Analyze volatility and downside scenarios",
            "Suggest risk mitigation strategies",
        ],
    )

    # Report writer
    report_writer: Agent = Agent(
        name="ReportWriter",
        model=Nebius(
            id="deepseek-ai/DeepSeek-V3-0324",
            api_key=os.getenv("NEBIUS_API_KEY"),
        ),
        markdown=True,
        instructions=[
            "Create comprehensive investment report",
            "Include executive summary, analysis, and recommendations",
            "Use tables for financial data",
            "Provide clear buy/hold/sell recommendation",
        ],
    )

    def run(self, symbol: str) -> Iterator[RunResponse]:
        """Collect data for a stock symbol, analyze it, and stream a report.

        Args:
            symbol: The stock symbol interpolated into the collection prompt.

        Yields:
            Whatever the report writer's streaming run produces.
        """
        # Gather data
        data = self.data_collector.run(f"Collect all financial data for {symbol}")

        # Both analyses read the same collected data and do not depend on
        # each other; as written they execute one after another.
        fundamental = self.fundamental_analyst.run(data.content)
        risk = self.risk_analyst.run(data.content)

        # Combine and report
        combined = (
            f"\nStock: {symbol}\n"
            f"Fundamental Analysis:\n"
            f"{fundamental.content}\n"
            f"Risk Analysis:\n"
            f"{risk.content}\n"
        )
        report = self.report_writer.run(combined, stream=True)
        yield from report
Content Creation Workflow
class ContentCreationWorkflow(Workflow):
    # Pipeline: research -> outline -> draft -> edit (only the edit streams).
    # The API key is read from the environment when this class body runs.

    researcher: Agent = Agent(
        name="Researcher",
        model=Nebius(id="Qwen/Qwen3-32B", api_key=os.getenv("NEBIUS_API_KEY")),
        tools=[DuckDuckGoTools(search=True, news=True)],
        instructions=["Research topic thoroughly", "Find latest information"],
    )

    outliner: Agent = Agent(
        name="Outliner",
        model=Nebius(
            id="meta-llama/Llama-3.3-70B-Instruct",
            api_key=os.getenv("NEBIUS_API_KEY"),
        ),
        instructions=["Create detailed content outline", "Structure sections logically"],
    )

    writer: Agent = Agent(
        name="Writer",
        model=Nebius(
            id="deepseek-ai/DeepSeek-V3-0324",
            api_key=os.getenv("NEBIUS_API_KEY"),
        ),
        markdown=True,
        instructions=["Write engaging, informative content", "Follow outline structure"],
    )

    editor: Agent = Agent(
        name="Editor",
        model=Nebius(
            id="meta-llama/Llama-3.3-70B-Instruct",
            api_key=os.getenv("NEBIUS_API_KEY"),
        ),
        markdown=True,
        instructions=["Edit for clarity and flow", "Fix grammar and style issues"],
    )

    def run(self, topic: str) -> Iterator[RunResponse]:
        """Research, outline, draft, and edit content on a topic.

        Args:
            topic: The subject interpolated into the research and outline
                prompts.

        Yields:
            Whatever the editor's streaming run produces.
        """
        research = self.researcher.run(f"Research: {topic}")

        # The outline prompt carries both the topic and the research text
        outline = self.outliner.run(
            f"Create outline for: {topic}\n\nResearch: {research.content}"
        )

        # The writer sees the outline and the original research
        draft = self.writer.run(
            f"Write content:\n\nOutline: {outline.content}\n\nResearch: {research.content}"
        )

        # Only the final editing pass is streamed to the caller
        final = self.editor.run(f"Edit this draft:\n\n{draft.content}", stream=True)
        yield from final
Best Practices
1. Clear Agent Responsibilities
Each agent should have a specific, well-defined role:searcher: Agent = Agent(
description="Expert at finding and extracting information from the web",
instructions=["Search for authoritative sources", "Extract key facts"]
)
analyst: Agent = Agent(
description="Critical thinker who synthesizes research into insights",
instructions=["Identify key themes", "Highlight important findings"]
)
2. Use Appropriate Models
Match model capabilities to agent tasks:# Fast model for simple tasks
data_collector: Agent = Agent(
model=Nebius(id="Qwen/Qwen3-30B-A3B", api_key=key)
)
# Powerful model for complex reasoning
analyst: Agent = Agent(
model=Nebius(id="deepseek-ai/DeepSeek-V3-0324", api_key=key)
)
3. Handle Intermediate Results
def run(self, input_data: str) -> Iterator[RunResponse]:
    """Run three agents in order, logging the size of each intermediate result.

    Args:
        input_data: Text passed to the first agent.

    Yields:
        Whatever the final agent's streaming run produces.
    """
    # Store intermediate results for debugging.
    # `or ""` keeps the log line from raising if a step returns no content.
    step1_result = self.agent1.run(input_data)
    logger.info(f"Step 1 complete: {len(step1_result.content or '')} chars")

    step2_result = self.agent2.run(step1_result.content)
    logger.info(f"Step 2 complete: {len(step2_result.content or '')} chars")

    # Stream final output
    final = self.agent3.run(step2_result.content, stream=True)
    yield from final
4. Enable Streaming for Final Output
def run(self, input_data: str) -> Iterator[RunResponse]:
    """Run the intermediate agents normally and stream only the last one.

    Args:
        input_data: Text passed to the first agent.

    Yields:
        Whatever the final agent's streaming run produces.
    """
    # Non-streaming for intermediate steps: their .content is consumed whole
    result1 = self.agent1.run(input_data)
    result2 = self.agent2.run(result1.content)

    # Stream final output for better UX
    final = self.agent3.run(result2.content, stream=True)
    yield from final
5. Document Workflow Logic
class MyWorkflow(Workflow):
    """
    Multi-stage workflow for [purpose].

    Stages:
        1. [Agent 1] - [Purpose]
        2. [Agent 2] - [Purpose]
        3. [Agent 3] - [Purpose]

    Usage:
        workflow = MyWorkflow()
        result = workflow.run(input_data)
    """
Error Handling
from agno.utils.log import logger
class RobustWorkflow(Workflow):
    # `Agent(...)` marks where each agent's real configuration goes.
    agent1: Agent = Agent(...)
    agent2: Agent = Agent(...)

    def run(self, input_data: str) -> Iterator[RunResponse]:
        """Run two agents, failing loudly if the first returns nothing.

        Args:
            input_data: Text passed to the first agent.

        Yields:
            Whatever the second agent's streaming run produces.

        Raises:
            ValueError: If the first agent's content is empty.
            Exception: Any other error from the agents is logged and
                re-raised unchanged.
        """
        try:
            result1 = self.agent1.run(input_data)

            # Stop early instead of passing empty input to the next agent
            if not result1.content:
                logger.error("Agent 1 returned empty result")
                raise ValueError("No content from agent 1")

            result2 = self.agent2.run(result1.content, stream=True)
            yield from result2
        except Exception as e:
            # The ValueError raised above also lands here, so that failure is
            # logged twice before it propagates to the caller.
            logger.error(f"Workflow error: {e}")
            raise