
Overview

The RAGAgent class orchestrates the complete RAG (Retrieval-Augmented Generation) flow including question analysis, multi-source search, response generation, and iterative improvement.

Class Definition

RAGAgent

from agent.rag.rag_agent import RAGAgent

agent = RAGAgent(openai_api_key="your-api-key")

Constructor

__init__(openai_api_key=None)

Initialize the RAG Agent with all necessary components.
Parameters:
  • openai_api_key (Optional[str], default None): OpenAI API key for LLM operations. If None, the agent attempts to load the key from environment variables.
Raises:
  • ValueError - If OpenAI API key is not provided and not found in environment
import os
from agent.rag.rag_agent import RAGAgent

# With explicit API key
agent = RAGAgent(openai_api_key="sk-...")

# From environment variable
agent = RAGAgent()  # Uses OPENAI_API_KEY env var

Core Methods

set_database_connection(db_connection)

Set the database connection for retrieving conversation history.
Parameters:
  • db_connection: Database connection object used for conversation memory
from psycopg2 import connect

# Create database connection
conn = connect(dbname="finance_db", user="user")

# Set on agent
agent.set_database_connection(conn)

RAG Flow Pipeline

The RAG agent executes a multi-stage pipeline:

Stage 1: Setup

  • Initialize context and logging
  • Set maximum iterations

Stage 2: Reasoning

  • Analyze question with LLM
  • Extract metadata (tickers, quarters)
  • Explain research approach

Stage 3: Search Planning

  • Resolve temporal references to specific quarters
  • Generate queries for each data source

Stage 4: Parallel Search

  • Run searches across all planned data sources in parallel for maximum performance

Stage 5: Context Preparation

  • Build news/10-K context strings
  • Combine citations from all sources

Stage 6: Improvement Loop

  • Generate initial answer
  • Evaluate quality and confidence
  • Perform follow-up searches if needed
  • Re-generate answer with additional context

Stage 7: Finalization

  • Deduplicate citations
  • Build final result structure
  • Update conversation memory

Internal Methods

Question Analysis

async def _perform_question_analysis(question: str, conversation_id: str) -> tuple
Performs comprehensive question analysis.
Parameters:
  • question (str): The user's question to analyze
  • conversation_id (str): Unique conversation identifier
Returns: tuple of (question_analysis, target_quarters, error_dict)
  • question_analysis (Dict): Analysis results including status, tickers, question type, processed question, and quarter context
  • target_quarters (List[str]): Quarters to search (e.g., ['2025_q1', '2025_q2'])
  • error_dict (Dict): Error information if validation fails
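An illustrative shape for the returned tuple is shown below. Only the fields documented above are confirmed; the remaining keys and values are assumptions for the example:

```python
# Illustrative shape of the tuple returned by _perform_question_analysis.
# Field values (and any keys beyond those documented) are assumptions.
question_analysis = {
    "status": "ok",
    "tickers": ["AAPL"],
    "question_type": "ticker_specific",
    "processed_question": "apple revenue growth",
    "quarter_context": "latest two quarters",
}
target_quarters = ["2025_q1", "2025_q2"]
error_dict = None  # populated only when validation fails

result = (question_analysis, target_quarters, error_dict)
```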

Search Execution

async def _execute_search(ctx: RAGFlowContext) -> None
Execute search based on question type (general vs ticker-specific). Reads from context:
  • question
  • question_analysis
  • target_quarters
Writes to context:
  • individual_results
  • all_chunks
  • all_citations
  • search_time
  • is_general_question
  • is_multi_ticker
  • tickers_to_process
  • target_quarter
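The reads and writes listed above suggest a context object shaped roughly like the dataclass below. This is a hypothetical sketch of RAGFlowContext based only on the fields named on this page; the real class may differ:

```python
# Hypothetical sketch of the RAGFlowContext fields that _execute_search
# reads and writes; the actual class may differ.
from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional

@dataclass
class RAGFlowContext:
    # Read by _execute_search
    question: str = ""
    question_analysis: Dict[str, Any] = field(default_factory=dict)
    target_quarters: List[str] = field(default_factory=list)
    # Written by _execute_search
    individual_results: List[Dict[str, Any]] = field(default_factory=list)
    all_chunks: List[str] = field(default_factory=list)
    all_citations: List[Dict[str, Any]] = field(default_factory=list)
    search_time: float = 0.0
    is_general_question: bool = False
    is_multi_ticker: bool = False
    tickers_to_process: List[str] = field(default_factory=list)
    target_quarter: Optional[str] = None

ctx = RAGFlowContext(question="Compare AAPL and MSFT margins")
ctx.tickers_to_process = ["AAPL", "MSFT"]
ctx.is_multi_ticker = len(ctx.tickers_to_process) > 1
```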

Ticker Processing

def _process_single_ticker_sync(
    ticker: str,
    question: str,
    processed_question: str,
    is_multi_ticker: bool,
    target_quarters: List[str]
) -> Dict[str, Any]
Process a single ticker with synchronous search and answer generation.
Parameters:
  • ticker (str): Ticker symbol to process
  • question (str): Original user question
  • processed_question (str): Cleaned/normalized question
  • is_multi_ticker (bool): Whether this call is part of a multi-ticker query
  • target_quarters (List[str]): Quarters to search
Returns: Dictionary containing ticker, answer, chunks, context_chunks, and citations

Parallel Processing

def _process_tickers_parallel_sync(
    tickers: List[str],
    question: str,
    processed_question: str,
    is_multi_ticker: bool,
    target_quarters: List[str]
) -> List[Dict[str, Any]]
Process multiple tickers in parallel using ThreadPoolExecutor.
Parameters:
  • tickers (List[str]): List of ticker symbols to process
Returns: List of result dictionaries, one per ticker
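The parallel pattern can be sketched as follows. The worker below is a stand-in for _process_single_ticker_sync, not the real method:

```python
# Sketch of parallel ticker processing with ThreadPoolExecutor; process_one
# is a stand-in for the real per-ticker search + answer generation.
from concurrent.futures import ThreadPoolExecutor
from typing import Any, Dict, List

def process_one(ticker: str, question: str) -> Dict[str, Any]:
    # Stand-in for synchronous search and answer generation for one ticker
    return {"ticker": ticker, "answer": f"{ticker}: answer to '{question}'"}

def process_tickers_parallel(tickers: List[str], question: str,
                             max_workers: int = 4) -> List[Dict[str, Any]]:
    # executor.map preserves input order, so results line up with tickers
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        return list(executor.map(lambda t: process_one(t, question), tickers))

results = process_tickers_parallel(["AAPL", "MSFT"], "revenue trend?")
```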

Configuration

The agent uses a Config object with these key settings:
  • hybrid_search_enabled (bool, default True): Enable/disable hybrid search (vector + keyword)
  • vector_weight (float, default 0.7): Weight for vector search results in hybrid search
  • keyword_weight (float, default 0.3): Weight for keyword search results in hybrid search
  • max_quarters (int, default 12): Maximum number of quarters to search (3 years)
  • max_tickers (int, default 4): Maximum number of tickers to process in parallel
  • chunks_per_quarter (int, default 15): Number of chunks to retrieve per quarter
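One plausible way the two weights combine is a weighted sum of the per-source scores. This is a sketch under that assumption; the actual scoring formula used by the SearchEngine may differ:

```python
# Sketch of combining vector and keyword scores with the configured weights
# (assumes a simple weighted sum; the real scoring may differ).
def hybrid_score(vector_score: float, keyword_score: float,
                 vector_weight: float = 0.7,
                 keyword_weight: float = 0.3) -> float:
    return vector_weight * vector_score + keyword_weight * keyword_score

# A strong vector match with a moderate keyword match
score = hybrid_score(vector_score=0.9, keyword_score=0.5)
```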

Usage Examples

from agent.rag.rag_agent import RAGAgent
import os

# Initialize agent
agent = RAGAgent(openai_api_key=os.getenv("OPENAI_API_KEY"))

# The agent is ready to process questions
# Question analysis and search will happen automatically
# when the agent's execute method is called

Components

The RAG Agent initializes and orchestrates several components:

DatabaseManager

Manages database connections and queries

QuestionAnalyzer

Analyzes questions to extract intent and entities

ReasoningPlanner

Plans research approach and strategy

SearchPlanner

Plans which data sources to query

SearchEngine

Executes hybrid search operations

ResponseGenerator

Generates natural language responses

TavilyService

Fetches recent news articles

SECFilingsService

Retrieves and processes 10-K filings

Logging

The RAG Agent provides detailed logging at multiple levels:
import logging

# Configure logging to see agent operations
logging.basicConfig(level=logging.INFO)

# Specific loggers:
# - 'rag_agent' - RAG flow operations
# - 'rag_system' - Low-level system operations

Error Handling

The agent handles various error scenarios:
  • Missing API Key: Raises ValueError on initialization
  • Database Errors: Logs warning and continues with default config
  • Quarter Unavailable: Returns user-friendly error with available quarters
  • No Results: Returns empty context response when no data is found
  • Search Failures: Falls back gracefully and logs errors
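The documented missing-key behavior can be sketched as a small validation helper. resolve_api_key is an illustrative stand-in, not the library's API; only the ValueError-on-missing-key behavior comes from this page:

```python
# Minimal sketch of the documented missing-API-key behavior: initialization
# raises ValueError when no key is passed and none is in the environment.
# resolve_api_key is a hypothetical helper, not the library's API.
import os

def resolve_api_key(explicit_key=None):
    key = explicit_key or os.environ.get("OPENAI_API_KEY")
    if not key:
        raise ValueError("OpenAI API key not provided and not found in environment")
    return key

os.environ.pop("OPENAI_API_KEY", None)  # simulate a missing key
try:
    resolve_api_key()
    raised = False
except ValueError:
    raised = True
```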

Performance Optimization

The RAG Agent includes several performance optimizations:
  1. Parallel Search: Multiple data sources searched concurrently
  2. Thread Pool: CPU-bound operations executed in thread pool
  3. Multi-Ticker Parallel: Tickers processed simultaneously
  4. Multi-Quarter Parallel: Quarters searched in parallel
  5. Async Operations: Non-blocking I/O operations
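The parallel-search and async optimizations above can be sketched with asyncio.gather. The source names and result shape are illustrative, not the agent's actual internals:

```python
# Sketch of querying independent data sources concurrently with
# asyncio.gather (source names and result shape are illustrative).
import asyncio
from typing import Dict, List

async def search_source(name: str, query: str) -> Dict[str, str]:
    await asyncio.sleep(0)  # placeholder for real non-blocking I/O
    return {"source": name, "result": f"{name} hits for '{query}'"}

async def search_all(query: str) -> List[Dict[str, str]]:
    sources = ["transcripts", "news", "10-K"]
    # gather runs all coroutines concurrently and preserves input order
    return await asyncio.gather(*(search_source(s, query) for s in sources))

results = asyncio.run(search_all("AAPL margins"))
```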
