The components directory contains reusable, self-contained Haystack pipeline building blocks that implement specific retrieval and generation sub-tasks. Feature pipelines compose these components rather than reimplementing the same logic.

Component overview

AgenticRouter

LLM-based decision-making for agentic RAG with tool selection and self-reflection

ContextCompressor

Reduces retrieved context using abstractive, extractive, or relevance filtering

QueryEnhancer

Multi-query, HyDE, and step-back query expansion

ResultMerger

RRF and weighted fusion for hybrid search results

AgenticRouter

An LLM-based decision-making component for agentic RAG pipelines.

Capabilities

  • Tool selection: Given a query, selects the appropriate processing path ("retrieval", "web_search", "calculation", or "reasoning")
  • Answer quality evaluation: Sends the query, draft answer, and retrieved context to the LLM and receives a JSON-structured assessment
  • Refinement decision: Computes whether the average quality score falls below a threshold
  • Answer refinement: Given issues and suggestions from evaluation, sends a targeted revision request to the LLM
  • Self-reflection loop: Orchestrates the full evaluate-refine cycle for up to max_iterations rounds
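
The refinement decision can be sketched as a small pure function. This is a minimal illustration, assuming the three scores are averaged with equal weight as in the self-reflection loop shown under Implementation; the name `needs_refinement` is hypothetical and not part of the component's documented API.

```python
from typing import Any


def needs_refinement(eval_result: dict[str, Any], quality_threshold: float = 75) -> bool:
    """Return True when the mean of the three quality scores is below the threshold.

    Hypothetical helper for illustration; a missing score counts as 0.
    """
    keys = ("relevance", "completeness", "grounding")
    avg_score = sum(eval_result.get(key, 0) for key in keys) / len(keys)
    return avg_score < quality_threshold


# Mean is about 81.7, above the default threshold of 75
print(needs_refinement({"relevance": 85, "completeness": 70, "grounding": 90}))  # False
# Mean is 50, below the threshold
print(needs_refinement({"relevance": 60, "completeness": 50, "grounding": 40}))  # True
```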

Implementation

src/vectordb/haystack/components/agentic_router.py
import json
import os
from typing import Any

from haystack.components.generators.chat import OpenAIChatGenerator
from haystack.dataclasses import ChatMessage
from haystack.utils import Secret

class AgenticRouter:
    """Route and orchestrate RAG with agent-like behavior."""

    def __init__(
        self,
        model: str = "llama-3.3-70b-versatile",
        api_key: str | None = None,
        api_base_url: str = "https://api.groq.com/openai/v1",
    ) -> None:
        """Initialize agentic router."""
        resolved_api_key = api_key or os.environ.get("GROQ_API_KEY")
        if not resolved_api_key:
            raise ValueError("GROQ_API_KEY required")

        # temperature=0 makes routing decisions as repeatable as the API allows
        self.generator = OpenAIChatGenerator(
            api_key=Secret.from_token(resolved_api_key),
            model=model,
            api_base_url=api_base_url,
            generation_kwargs={"temperature": 0, "max_tokens": 1024},
        )

        self.available_tools = [
            "retrieval",
            "web_search",
            "calculation",
            "reasoning",
        ]

    def select_tool(self, query: str) -> str:
        """Select the best tool for a query."""
        tools_str = ", ".join(self.available_tools)
        prompt = f"""Given this query: "{query}"

Select the BEST tool to answer it. Options: {tools_str}

- retrieval: For factual information from a knowledge base
- web_search: For current events, real-time information
- calculation: For mathematical or computational problems
- reasoning: For multi-step logic or analysis

Return ONLY the tool name."""

        messages = [ChatMessage.from_user(prompt)]
        response = self.generator.run(messages=messages)
        tool = response["replies"][0].text.strip().lower()
        
        if tool not in self.available_tools:
            tool = "retrieval"  # Fallback
        
        return tool

    def evaluate_answer_quality(
        self,
        query: str,
        answer: str,
        context: str = "",
    ) -> dict[str, Any]:
        """Evaluate generated answer quality.
        
        Returns:
            Dict with relevance, completeness, grounding scores (0-100),
            plus issues and suggestions lists.
        """
        prompt = f"""Evaluate this answer to the query.

Query: "{query}"
Answer: "{answer}"
Context: "{context}"

Assess:
1. Relevance (0-100): Does it answer the query?
2. Completeness (0-100): Is it sufficiently detailed?
3. Grounding (0-100): Is it grounded in the context?
4. Issues: List any problems (max 3)
5. Suggestions: List improvements (max 2)

Format as JSON:
{{"relevance": X, "completeness": X, "grounding": X, "issues": [...], "suggestions": [...]}}

Return ONLY the JSON."""

        messages = [ChatMessage.from_user(prompt)]
        response = self.generator.run(messages=messages)
        content = response["replies"][0].text
        
        return json.loads(content.strip())

    def self_reflect_loop(
        self,
        query: str,
        answer: str,
        context: str = "",
        max_iterations: int = 2,
        quality_threshold: int = 75,
    ) -> str:
        """Run self-reflection loop to iteratively improve answer."""
        current_answer = answer

        for iteration in range(max_iterations):
            eval_result = self.evaluate_answer_quality(query, current_answer, context)
            
            # Exit early if quality is acceptable
            avg_score = (
                eval_result.get("relevance", 0)
                + eval_result.get("completeness", 0)
                + eval_result.get("grounding", 0)
            ) / 3
            
            if avg_score >= quality_threshold:
                break
            
            # Refine answer based on feedback
            current_answer = self.refine_answer(query, current_answer, eval_result)
        
        return current_answer

Usage

from vectordb.haystack.components import AgenticRouter

router = AgenticRouter(model="llama-3.3-70b-versatile")

# Tool selection
tool = router.select_tool("What is quantum entanglement?")
# → "retrieval"

# Answer quality evaluation
quality = router.evaluate_answer_quality(query, answer, context)
# → {"relevance": 85, "completeness": 70, "grounding": 90, ...}

# Self-reflection loop
final_answer = router.self_reflect_loop(
    query, draft_answer, context, max_iterations=2
)
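
evaluate_answer_quality passes the raw reply straight to json.loads, which raises if the model wraps the JSON in a Markdown fence or adds text around it. One way to guard against that is sketched below. The names `parse_evaluation` and `fallback_evaluation`, and the all-zero fallback scores, are assumptions for illustration and not part of the component.

```python
import json
import re
from typing import Any


def fallback_evaluation() -> dict[str, Any]:
    """Neutral result used when no JSON object can be recovered (assumed default)."""
    return {
        "relevance": 0,
        "completeness": 0,
        "grounding": 0,
        "issues": ["evaluation response was not valid JSON"],
        "suggestions": [],
    }


def parse_evaluation(content: str) -> dict[str, Any]:
    """Parse the outermost {...} span of an LLM reply as a JSON object."""
    # Greedy match from the first "{" to the last "}", across newlines
    match = re.search(r"\{.*\}", content, flags=re.DOTALL)
    if match is None:
        return fallback_evaluation()
    try:
        parsed = json.loads(match.group(0))
    except json.JSONDecodeError:
        return fallback_evaluation()
    return parsed if isinstance(parsed, dict) else fallback_evaluation()
```

With all-zero fallback scores, an unparseable evaluation falls below any positive quality threshold, so the self-reflection loop would attempt a refinement rather than crash. Whether that is the desired behavior depends on the pipeline.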

ContextCompressor

Reduces retrieved context to query-relevant fragments before generation.

Compression strategies

  • Abstractive: LLM generates a focused summary of the context relevant to the query
  • Extractive: LLM selects the N most relevant sentences from the original text
  • Relevance filtering: LLM evaluates each paragraph and drops those below a threshold

All methods fall back to returning the original context unchanged on LLM failure.
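
Relevance filtering with this fallback behavior can be sketched as follows. The scorer is injected as a plain callable so the example runs without an LLM. The names `filter_paragraphs` and `keyword_overlap`, the 0 to 1 score range, the 0.5 default threshold, and the choice to return the original context when no paragraph survives are all assumptions for illustration, not the component's actual signature.

```python
from typing import Callable


def filter_paragraphs(
    context: str,
    query: str,
    score_fn: Callable[[str, str], float],
    threshold: float = 0.5,
) -> str:
    """Keep paragraphs whose relevance score meets the threshold.

    score_fn(query, paragraph) stands in for the LLM relevance call.
    Returns the original context if scoring fails or nothing survives.
    """
    paragraphs = [p.strip() for p in context.split("\n\n") if p.strip()]
    try:
        kept = [p for p in paragraphs if score_fn(query, p) >= threshold]
    except Exception:
        return context  # mirrors the documented fallback on LLM failure
    return "\n\n".join(kept) if kept else context


def keyword_overlap(query: str, paragraph: str) -> float:
    """Toy scorer: fraction of query words that occur in the paragraph."""
    words = {w.lower().strip("?.,") for w in query.split()}
    text = paragraph.lower()
    return sum(w in text for w in words) / max(len(words), 1)
```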

Implementation

src/vectordb/haystack/components/context_compressor.py
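import logging
import os
from typing import Any

from haystack.components.generators.chat import OpenAIChatGenerator
from haystack.dataclasses import ChatMessage
from haystack.utils import Secret

logger = logging.getLogger(__name__)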

class ContextCompressor:
    """Compress and summarize retrieved context."""

    def __init__(
        self,
        model: str = "llama-3.3-70b-versatile",
        api_key: str | None = None,
    ) -> None:
        """Initialize context compressor."""
        resolved_api_key = api_key or os.environ.get("GROQ_API_KEY")
        if not resolved_api_key:
            raise ValueError("GROQ_API_KEY required")

        self.generator = OpenAIChatGenerator(
            api_key=Secret.from_token(resolved_api_key),
            model=model,
            api_base_url="https://api.groq.com/openai/v1",
            generation_kwargs={"temperature": 0, "max_tokens": 2048},
        )

    def compress_abstractive(
        self,
        context: str,
        query: str,
        max_tokens: int = 2048,
    ) -> str:
        """Abstractive compression using LLM summarization."""
        prompt = f"""Summarize the following context to answer this question: "{query}"

Keep only the most relevant information. Be concise.

Context:
{context}

Summary (max {max_tokens} tokens):"""

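        messages = [ChatMessage.from_user(prompt)]
        try:
            response = self.generator.run(messages=messages)
            summary = response["replies"][0].text
        except Exception:
            # Fall back to the uncompressed context if the LLM call fails
            logger.exception("Abstractive compression failed; returning original context")
            return context

        compression_ratio = len(context) / (len(summary) + 1)
        logger.info("Abstractive compression: %.2fx", compression_ratio)

        return summary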

    def compress_extractive(
        self,
        context: str,
        query: str,
        num_sentences: int = 5,
    ) -> str:
        """Extractive compression: select key sentences."""
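        prompt = f"""Extract the {num_sentences} most relevant sentences from the following context to answer: "{query}"

Context:
{context}

Return ONLY the selected sentences in order, without numbering."""

        # Sketch mirroring compress_abstractive; the shipped implementation may differ
        messages = [ChatMessage.from_user(prompt)]
        try:
            response = self.generator.run(messages=messages)
            return response["replies"][0].text
        except Exception:
            # Fall back to the uncompressed context if the LLM call fails
            logger.exception("Extractive compression failed; returning original context")
            return context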

    def compress(
        self,
        context: str,
        query: str,
        compression_type: str = "abstractive",
        **kwargs: Any,
    ) -> str:
        """Compress context using specified technique."""
        if compression_type == "abstractive":
            return self.compress_abstractive(context, query, **kwargs)
        elif compression_type == "extractive":
            return self.compress_extractive(context, query, **kwargs)
        elif compression_type == "relevance_filter":
            return self.filter_by_relevance(context, query, **kwargs)
        else:
            raise ValueError(f"Unsupported compression type: {compression_type}")

Usage

from vectordb.haystack.components import ContextCompressor

compressor = ContextCompressor(model="llama-3.3-70b-versatile")

compressed = compressor.compress(
    context, query, compression_type="extractive", num_sentences=5
)

QueryEnhancer

Generates improved retrieval queries from the user’s original input.

Enhancement strategies

  • Multi-query: Returns N queries in total, the original query followed by up to N-1 alternative phrasings (default N=3)
  • HyDE: Generates M hypothetical documents that would answer the query (default M=3)
  • Step-back: Generates a broader, more abstract version of the query

Implementation

src/vectordb/haystack/components/query_enhancer.py
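import os
from typing import Any

from haystack.components.generators.chat import OpenAIChatGenerator
from haystack.dataclasses import ChatMessage
from haystack.utils import Secret

class QueryEnhancer: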
    """Enhance and expand queries using LLM-based techniques."""

    def __init__(
        self,
        model: str = "llama-3.3-70b-versatile",
        api_key: str | None = None,
    ) -> None:
        """Initialize query enhancer."""
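        resolved_api_key = api_key or os.environ.get("GROQ_API_KEY")
        if not resolved_api_key:
            raise ValueError("GROQ_API_KEY required")

        self.generator = OpenAIChatGenerator(
            api_key=Secret.from_token(resolved_api_key),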
            model=model,
            api_base_url="https://api.groq.com/openai/v1",
            generation_kwargs={"temperature": 0.7, "max_tokens": 1024},
        )

    def generate_multi_queries(
        self,
        query: str,
        num_queries: int = 3,
    ) -> list[str]:
        """Generate multiple query variations."""
        prompt = f"""Generate {num_queries} different queries that would help retrieve relevant information for: "{query}"

Return ONLY the queries, one per line, without numbering or extra text."""

        messages = [ChatMessage.from_user(prompt)]
        response = self.generator.run(messages=messages)
        content = response["replies"][0].text
        
        queries = [q.strip() for q in content.split("\n") if q.strip()]
        # Original query first, then up to num_queries - 1 generated variants
        return [query] + queries[:num_queries - 1]

    def generate_hypothetical_documents(
        self,
        query: str,
        num_docs: int = 3,
    ) -> list[str]:
        """Generate hypothetical relevant documents (HyDE)."""
        prompt = f"""Generate {num_docs} hypothetical document excerpts that would directly answer this question: "{query}"

Return ONLY the document excerpts, separated by "---", without numbering or extra text."""
        
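        # Sketch; the shipped implementation may differ.
        # Split the reply on the "---" separator requested in the prompt.
        messages = [ChatMessage.from_user(prompt)]
        response = self.generator.run(messages=messages)
        content = response["replies"][0].text

        docs = [d.strip() for d in content.split("---") if d.strip()]
        return docs[:num_docs]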

    def enhance_query(
        self,
        query: str,
        enhancement_type: str = "multi_query",
        **kwargs: Any,
    ) -> list[str]:
        """Enhance query using specified technique."""
        if enhancement_type == "multi_query":
            return self.generate_multi_queries(query, **kwargs)
        elif enhancement_type == "hyde":
            return self.generate_hypothetical_documents(query, **kwargs)
        elif enhancement_type == "step_back":
            step_back = self.generate_step_back_query(query)
            return [query, step_back]
        else:
            raise ValueError(f"Unsupported enhancement type: {enhancement_type}")

Usage

from vectordb.haystack.components import QueryEnhancer

enhancer = QueryEnhancer(model="llama-3.3-70b-versatile")

queries = enhancer.enhance_query(
    "What causes inflation?", enhancement_type="multi_query", num_queries=3
)
# → ["What causes inflation?", "What drives rising prices?", "Factors behind monetary inflation"]
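
The returned list is typically fanned out to a retriever, one call per query, with duplicate hits removed before ranking. The sketch below assumes a hypothetical `retrieve` callable that returns (doc_id, text) pairs; neither it nor `retrieve_for_queries` is part of QueryEnhancer.

```python
from typing import Callable

Hit = tuple[str, str]  # (doc_id, text); an assumed shape for this sketch


def retrieve_for_queries(
    queries: list[str],
    retrieve: Callable[[str], list[Hit]],
) -> list[Hit]:
    """Run every query and keep the first occurrence of each doc_id."""
    seen: set[str] = set()
    merged: list[Hit] = []
    for query in queries:
        for doc_id, text in retrieve(query):
            if doc_id not in seen:
                seen.add(doc_id)
                merged.append((doc_id, text))
    return merged
```

Keeping the original query first in the list means its hits are seen first, so they take precedence over the same documents returned by a paraphrase.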

ResultMerger

Fuses results from multiple retrieval sources into a single ranked list.

Fusion strategies

  • RRF (Reciprocal Rank Fusion): Combines rankings using 1 / (k + rank) without requiring score normalization
  • Weighted fusion: Multiplies each source's inverse-rank scores by an explicit per-source weight before summing
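
Both strategies can be sketched over plain lists of document IDs ordered best-first. This is an illustration rather than ResultMerger's actual interface: the function names are hypothetical, k=60 is a commonly used RRF constant chosen here as an assumption, and the weighted variant assumes the inverse-rank score is 1 / rank.

```python
from collections import defaultdict


def rrf_fuse(rankings: list[list[str]], k: int = 60) -> list[tuple[str, float]]:
    """Reciprocal Rank Fusion: score(d) = sum over lists of 1 / (k + rank)."""
    scores: dict[str, float] = defaultdict(float)
    for ranking in rankings:
        for rank, doc_id in enumerate(ranking, start=1):
            scores[doc_id] += 1.0 / (k + rank)
    return sorted(scores.items(), key=lambda item: item[1], reverse=True)


def weighted_fuse(
    rankings: list[list[str]],
    weights: list[float],
) -> list[tuple[str, float]]:
    """Weighted fusion: each list's 1 / rank score is scaled by that list's weight."""
    if len(rankings) != len(weights):
        raise ValueError("rankings and weights must have the same length")
    scores: dict[str, float] = defaultdict(float)
    for ranking, weight in zip(rankings, weights):
        for rank, doc_id in enumerate(ranking, start=1):
            scores[doc_id] += weight / rank
    return sorted(scores.items(), key=lambda item: item[1], reverse=True)


dense = ["a", "b", "c"]   # e.g. vector search results, best first
sparse = ["b", "c", "d"]  # e.g. keyword search results, best first

print([doc for doc, _ in rrf_fuse([dense, sparse])])                    # ['b', 'c', 'a', 'd']
print([doc for doc, _ in weighted_fuse([dense, sparse], [0.7, 0.3])])   # ['a', 'b', 'c', 'd']
```

RRF rewards documents that appear in both lists, so "b" and "c" outrank "a" even though "a" tops the dense list. With a 0.7 weight on the dense list, weighted fusion keeps "a" first.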

Usage

See the Hybrid search page for detailed implementation examples.

LLM configuration

All LLM-based components use the Groq API via Haystack’s OpenAIChatGenerator:
generator = OpenAIChatGenerator(
    api_key=Secret.from_token(api_key),
    model="llama-3.3-70b-versatile",
    api_base_url="https://api.groq.com/openai/v1",
    generation_kwargs={"temperature": 0, "max_tokens": 1024},
)
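Set the GROQ_API_KEY environment variable or pass api_key directly. The generation_kwargs vary by component: QueryEnhancer uses a temperature of 0.7, and ContextCompressor sets max_tokens to 2048.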

When to use components directly

  • Building a custom pipeline that does not fit existing feature module templates
  • Experimenting with one pipeline stage at a time
  • Combining components from different feature modules into a novel configuration

Common pitfalls

Over-composing before baseline validation: Build and validate the simplest pipeline first. Add components incrementally and measure the impact of each addition.
Inconsistent interfaces between custom stages: If you extend these components, maintain the same input/output conventions (Haystack Document objects, standard config dicts).
No tracing at component boundaries: Each component logs at INFO level. Set LOG_LEVEL=DEBUG to see detailed prompt and response content.
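
How LOG_LEVEL is read is not shown on this page. If the application has to wire it up itself, a minimal sketch with the standard logging module could look like the following; `configure_logging`, the log format, and the INFO default are assumptions.

```python
import logging
import os


def configure_logging(default: str = "INFO") -> int:
    """Configure root logging from the LOG_LEVEL environment variable."""
    level_name = os.environ.get("LOG_LEVEL", default).upper()
    level = getattr(logging, level_name, None)
    if not isinstance(level, int):
        level = logging.INFO  # unknown level names fall back to INFO
    logging.basicConfig(
        level=level,
        format="%(asctime)s %(name)s %(levelname)s %(message)s",
        force=True,  # replace handlers installed by earlier basicConfig calls
    )
    return level
```

Calling this once at startup, before any component is constructed, ensures that component loggers inherit the chosen level from the root logger.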

Next steps

Pipelines

Learn how to compose components into full pipelines

Semantic search

See components in action in semantic search pipelines
