Overview

The AI Trend Analyzer is a sophisticated 5-agent sequential pipeline built with Google’s Agent Development Kit (ADK). It aggregates AI news from multiple sources, analyzes benchmarks, and provides specific model recommendations from Nebius Token Factory.

- **5-Agent Pipeline**: Sequential orchestration with Google ADK
- **Multi-Source Research**: Exa, Tavily, and Firecrawl integration
- **Benchmark Analysis**: AI model statistics and trends
- **Model Recommendations**: Specific Nebius model suggestions

Architecture Pattern

This agent demonstrates the Sequential Agent Pipeline pattern using Google ADK’s SequentialAgent orchestrator.

Pipeline Structure

import os

from google.adk.agents.sequential_agent import SequentialAgent
from google.adk.agents.llm_agent import LlmAgent
from google.adk.models.lite_llm import LiteLlm

# Model configuration
nebius_model = LiteLlm(
    model="openai/meta-llama/Meta-Llama-3.1-8B-Instruct",
    api_base=os.getenv("NEBIUS_API_BASE"),
    api_key=os.getenv("NEBIUS_API_KEY")
)

# Sequential Pipeline with 5 agents
pipeline = SequentialAgent(
    name="AIPipelineAgent",
    sub_agents=[
        exa_agent,        # Fetch AI news from Twitter/X
        tavily_agent,     # Retrieve AI benchmarks
        summary_agent,    # Combine and format results
        firecrawl_agent,  # Scrape Nebius model data
        analysis_agent    # Deep analysis with recommendations
    ]
)

Five Specialized Agents

1. **ExaAgent (AI News Fetcher)**: Searches Twitter/X for the latest AI news, LLM releases, and advancements.
2. **TavilyAgent (Benchmark Retriever)**: Gathers AI benchmarks and statistics from specialized sources.
3. **SummaryAgent (Data Synthesizer)**: Combines Exa and Tavily results into structured markdown.
4. **FirecrawlAgent (Model Scraper)**: Extracts Nebius Token Factory model information and pricing.
5. **AnalysisAgent (Recommendation Engine)**: Performs deep analysis and suggests specific Nebius models.
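Each agent hands its output to the next via a session-state key (the `output_key` values shown in the agent definitions below). As a reference, a small sketch of that mapping, with a hypothetical helper to list what any agent can read:

```python
# Mapping of each pipeline agent to the session-state key it writes via
# output_key. Downstream agents read earlier outputs through these keys.
PIPELINE_STATE_KEYS = {
    "ExaAgent": "exa_results",
    "TavilyAgent": "tavily_results",
    "SummaryAgent": "final_summary",
    "FirecrawlAgent": "firecrawl_content",
    "AnalysisAgent": "analysis_results",
}

def upstream_keys(agent_name: str) -> list[str]:
    """Return the state keys an agent can read: everything written before it."""
    names = list(PIPELINE_STATE_KEYS)
    return [PIPELINE_STATE_KEYS[n] for n in names[: names.index(agent_name)]]
```

For example, `upstream_keys("SummaryAgent")` yields `exa_results` and `tavily_results`, which is exactly what the SummaryAgent's instruction references.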

Agent Implementation

Agent 1: ExaAgent (AI News)

from exa_py import Exa
from datetime import datetime, timedelta

def exa_search_ai(_: str) -> dict:
    """Fetch latest AI news from Twitter/X using Exa."""
    try:
        results = Exa(api_key=os.getenv("EXA_API_KEY")).search_and_contents(
            query="Latest AI news OR new LLM models OR AI/Agents advancements",
            include_domains=["twitter.com", "x.com"],
            num_results=10,
            text=True,
            type="auto",
            highlights={"highlights_per_url": 2, "num_sentences": 3},
            start_published_date=(datetime.now() - timedelta(days=30)).isoformat()
        )
        return {
            "type": "exa",
            "results": [r.__dict__ for r in results.results]
        }
    except Exception as e:
        return {"type": "exa", "error": str(e), "results": []}

exa_agent = LlmAgent(
    name="ExaAgent",
    model=nebius_model,
    description="Fetches latest AI news using Exa",
    instruction="""
        Use exa_search_ai to fetch latest AI, LLMs, and advancements.
        Prefix response with "**🔥ExaAgent:**"
    """,
    tools=[exa_search_ai],
    output_key="exa_results"
)

Agent 2: TavilyAgent (Benchmarks)

from tavily import TavilyClient

def tavily_search_ai_analysis(_: str) -> dict:
    """Retrieve AI benchmarks and statistics using Tavily."""
    try:
        client = TavilyClient(api_key=os.getenv("TAVILY_API_KEY"))
        response = client.search(
            query="AI benchmarks OR AI/LLM statistics OR AI providers analysis",
            search_depth="advanced",
            time_range="week",
            include_domains=["artificialanalysis.ai"]
        )
        return {
            "type": "tavily",
            "results": response.get("results", [])
        }
    except Exception as e:
        return {"type": "tavily", "error": str(e), "results": []}

tavily_agent = LlmAgent(
    name="TavilyAgent",
    model=nebius_model,
    description="Fetches AI benchmarks using Tavily",
    instruction="""
        Use tavily_search_ai_analysis to retrieve benchmarks and analysis.
        Prefix response with "**🐳TavilyAgent:**"
    """,
    tools=[tavily_search_ai_analysis],
    output_key="tavily_results"
)

Agent 3: SummaryAgent (Synthesis)

summary_agent = LlmAgent(
    name="SummaryAgent",
    model=nebius_model,
    description="Summarizes and formats results",
    instruction="""
        You are a summarizer and formatter.
        
        - Combine 'exa_results' (latest AI updates) and 
          'tavily_results' (AI benchmarks)
        - Present structured summary with key trends, new LLMs, statistics
        - Use markdown formatting for clarity
        - Use emojis: 🚀 for launches, 📊 for statistics, 📈 for trends
        - Structure with bullet points and headings
        - Prefix response with "**🍥SummaryAgent:**"
    """,
    tools=[],
    output_key="final_summary"
)

Agent 4: FirecrawlAgent (Model Data)

from firecrawl import FirecrawlApp

def firecrawl_scrape_nebius(_: str) -> dict:
    """Scrape Nebius Token Factory for model information."""
    firecrawl = FirecrawlApp(api_key=os.getenv("FIRECRAWL_API_KEY"))
    try:
        scrape_result = firecrawl.scrape_url(
            url="https://tokenfactory.nebius.com/",
            formats=["markdown"],
            only_main_content=True
        )
        
        if scrape_result.success:
            return {
                "type": "firecrawl",
                "markdown": scrape_result.markdown
            }
        else:
            return {"type": "firecrawl", "error": "Scraping failed."}
    except Exception as e:
        return {"type": "firecrawl", "error": str(e)}

firecrawl_agent = LlmAgent(
    name="FirecrawlAgent",
    model=nebius_model,
    description="Scrapes Nebius Token Factory using Firecrawl",
    instruction="""
        Use firecrawl_scrape_nebius to fetch markdown content from 
        Nebius Token Factory website.
        Prefix response with "**🔥FirecrawlAgent:**"
    """,
    tools=[firecrawl_scrape_nebius],
    output_key="firecrawl_content"
)

Agent 5: AnalysisAgent (Recommendations)

analysis_agent = LlmAgent(
    name="AnalysisAgent",
    model=LiteLlm(
        model="openai/nvidia/Llama-3_1-Nemotron-Ultra-253B-v1",
        api_base=os.getenv("NEBIUS_API_BASE"),
        api_key=os.getenv("NEBIUS_API_KEY")
    ),
    instruction="""
        You are an AI analyst specializing in LLM trends.
        
        ANALYSIS PROCESS:
        1. Analyze 'final_summary' with 'exa_results' and 'tavily_results'
        2. Identify key trends, growth areas, and statistics
        3. Examine 'firecrawl_content' for Nebius model details:
           - Model names, pricing, token limits, availability
        
        MATCHING STRATEGY:
        - Focus on functional capabilities and use cases (not just names)
        - Cross-reference LLM functionalities with Nebius offerings
        - Prioritize models with similar:
          * Context window size
          * Training data
          * Specialized capabilities
        - Use metadata: descriptions, tags, categories
        
        RECOMMENDATIONS:
        - If relevant LLM found on Nebius: provide specific recommendation
          with features, pricing, token limits, benefits
        - If no exact match: suggest alternatives with closest alignment
        - If close match: suggest fine-tuning possibilities
        
        FORMAT:
        - Clear, concise language
        - Quantifiable data and insights
        - Markdown tables for statistics:
        
        | Metric | Value |
        |---|---|
        | Growth Rate | 25% |
        
        Prefix response with "**🔍AnalysisAgent:**"
    """,
    description="Analyzes summary and presents insights",
    output_key="analysis_results"
)

Sequential Orchestration

Pipeline Execution

from google.adk.sessions import InMemorySessionService
from google.adk.runners import Runner
from google.genai import types

# Session management
APP_NAME = "ai_analysis_pipeline"
USER_ID = "user"
SESSION_ID = "ai_analysis_session"

session_service = InMemorySessionService()
session_service.create_session(
    app_name=APP_NAME,
    user_id=USER_ID,
    session_id=SESSION_ID
)

# Runner setup
runner = Runner(
    agent=pipeline,
    app_name=APP_NAME,
    session_service=session_service
)

# Execute pipeline
def run_ai_analysis():
    content = types.Content(
        role="user",
        parts=[types.Part(text="Start the AI analysis")]
    )
    
    events = runner.run(
        user_id=USER_ID,
        session_id=SESSION_ID,
        new_message=content
    )
    
    for event in events:
        if event.is_final_response():
            print("AI News Analysis and Insights:")
            print(event.content.parts[0].text)

Agent Communication Flow

Data Flow Between Agents

# Each agent accesses previous outputs via output_key

# Agent 1 produces:
exa_results = {"type": "exa", "results": [...]}

# Agent 2 produces:
tavily_results = {"type": "tavily", "results": [...]}

# Agent 3 accesses both:
summary_agent.instruction = """
    Combine 'exa_results' and 'tavily_results'...
"""
# Produces:
final_summary = "Structured markdown summary..."

# Agent 4 produces:
firecrawl_content = {"type": "firecrawl", "markdown": "..."}

# Agent 5 accesses all:
analysis_agent.instruction = """
    Analyze 'final_summary', 'exa_results', 'tavily_results',
    and 'firecrawl_content'...
"""

ADK CLI Integration

Google ADK provides CLI tools for running and debugging:
# Run agent directly in terminal
adk run analyzer_agent

# Interactive conversation
> Start the AI analysis
[Agent executes pipeline and returns results]

Advanced Patterns

Tool Function Signature

# ADK tools accept string input (unused in this case)
def exa_search_ai(_: str) -> dict:
    # Ignores input, uses hardcoded query
    results = Exa(...).search_and_contents(
        query="Latest AI news...",
        ...
    )
    return {"type": "exa", "results": [...]}

# Agent calls tool without needing to pass query
exa_agent = LlmAgent(
    tools=[exa_search_ai],
    instruction="Use exa_search_ai to fetch..."
)

Error Handling in Tools

def tavily_search_ai_analysis(_: str) -> dict:
    try:
        client = TavilyClient(api_key=os.getenv("TAVILY_API_KEY"))
        response = client.search(...)
        return {"type": "tavily", "results": response.get("results", [])}
    except Exception as e:
        # Return error in standardized format
        return {
            "type": "tavily",
            "error": f"Tavily search failed: {str(e)}",
            "results": []
        }
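Because every tool returns the same `{type, error?, results}` shape, downstream code can report success and failure uniformly. A minimal sketch (the helper name is illustrative, not part of the agent):

```python
def describe_tool_result(result: dict) -> str:
    """Render a standardized tool result as a one-line status string."""
    if result.get("error"):
        # Errors are surfaced instead of raising, so the pipeline keeps running.
        return f"{result['type']}: failed ({result['error']})"
    return f"{result['type']}: {len(result.get('results', []))} results"
```

Returning errors as data rather than raising lets the LLM agent see the failure and explain it, instead of the whole pipeline aborting.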

Multi-Model Strategy

# Lightweight model for data gathering
nebius_model = LiteLlm(
    model="openai/meta-llama/Meta-Llama-3.1-8B-Instruct"
)

# Powerful model for final analysis
analysis_model = LiteLlm(
    model="openai/nvidia/Llama-3_1-Nemotron-Ultra-253B-v1"
)

# Agents 1-4 use lightweight model
exa_agent = LlmAgent(model=nebius_model, ...)

# Agent 5 uses powerful model
analysis_agent = LlmAgent(model=analysis_model, ...)

Configuration

Environment Variables

# AI Inference
NEBIUS_API_KEY=your_nebius_api_key
NEBIUS_API_BASE=https://api.studio.nebius.ai/v1

# Search APIs
EXA_API_KEY=your_exa_api_key
TAVILY_API_KEY=your_tavily_api_key

# Web Scraping
FIRECRAWL_API_KEY=your_firecrawl_api_key
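Since every tool fails at runtime if its key is absent, it can help to check the environment before starting the pipeline. A minimal sketch (the function name is illustrative):

```python
import os

# All keys the five agents depend on, per the variables listed above.
REQUIRED_ENV_VARS = [
    "NEBIUS_API_KEY",
    "NEBIUS_API_BASE",
    "EXA_API_KEY",
    "TAVILY_API_KEY",
    "FIRECRAWL_API_KEY",
]

def missing_env_vars(required=REQUIRED_ENV_VARS) -> list[str]:
    """Return the names of required variables that are unset or empty."""
    return [name for name in required if not os.getenv(name)]
```

Call `missing_env_vars()` at startup and fail fast with a clear message, rather than letting the first tool call surface a cryptic authentication error mid-pipeline.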

Model Configuration

from google.adk.models.lite_llm import LiteLlm

model = LiteLlm(
    model="openai/provider/model-name",  # OpenAI-compatible format
    api_base=os.getenv("NEBIUS_API_BASE"),
    api_key=os.getenv("NEBIUS_API_KEY")
)

Use Cases

- **AI Market Research**: Track latest AI releases, trends, and benchmarks
- **Model Selection**: Get recommendations for specific use cases
- **Competitive Analysis**: Compare AI providers and model capabilities
- **Trend Monitoring**: Stay updated on AI industry developments

Project Structure

trend_analyzer_agent/
├── agent.py            # 5-agent pipeline implementation
├── __init__.py         # Package initialization
├── requirements.txt    # Dependencies
├── .env                # API keys (not committed)
└── .env.example        # Environment template

Related Examples

- **Conference Talk Generator**: Similar ADK pipeline with RAG integration
- **Deep Researcher**: Sequential workflow built with a different framework

Learn More

- **Multi-Agent Patterns**: Sequential and parallel agent orchestration
- **Advanced Agents**: More advanced agent examples
- **Model Providers**: Configure different LLM providers
