The components directory contains reusable, self-contained Haystack pipeline building blocks that implement specific retrieval and generation sub-tasks. Feature pipelines compose these components rather than reimplementing the same logic.
## Component overview

| Component | Purpose |
| --- | --- |
| `AgenticRouter` | LLM-based decision-making for agentic RAG with tool selection and self-reflection |
| `ContextCompressor` | Reduces retrieved context using abstractive, extractive, or relevance filtering |
| `QueryEnhancer` | Multi-query, HyDE, and step-back query expansion |
| `ResultMerger` | RRF and weighted fusion for hybrid search results |
## AgenticRouter

An LLM-based decision-making component for agentic RAG pipelines.

### Capabilities

- **Tool selection**: Given a query, selects the appropriate processing path (`"retrieval"`, `"web_search"`, `"calculation"`, or `"reasoning"`)
- **Answer quality evaluation**: Sends the query, draft answer, and retrieved context to the LLM and receives a JSON-structured assessment
- **Refinement decision**: Computes whether the average quality score falls below a threshold
- **Answer refinement**: Given issues and suggestions from the evaluation, sends a targeted revision request to the LLM
- **Self-reflection loop**: Orchestrates the full evaluate-refine cycle for up to `max_iterations` rounds

### Implementation

`src/vectordb/haystack/components/agentic_router.py`:
```python
import json
import os
from typing import Any

from haystack.components.generators.chat import OpenAIChatGenerator
from haystack.dataclasses import ChatMessage
from haystack.utils import Secret


class AgenticRouter:
    """Route and orchestrate RAG with agent-like behavior."""

    def __init__(
        self,
        model: str = "llama-3.3-70b-versatile",
        api_key: str | None = None,
        api_base_url: str = "https://api.groq.com/openai/v1",
    ) -> None:
        """Initialize agentic router."""
        resolved_api_key = api_key or os.environ.get("GROQ_API_KEY")
        if not resolved_api_key:
            raise ValueError("GROQ_API_KEY required")
        # Temperature=0 ensures deterministic routing decisions
        self.generator = OpenAIChatGenerator(
            api_key=Secret.from_token(resolved_api_key),
            model=model,
            api_base_url=api_base_url,
            generation_kwargs={"temperature": 0, "max_tokens": 1024},
        )
        self.available_tools = [
            "retrieval",
            "web_search",
            "calculation",
            "reasoning",
        ]

    def select_tool(self, query: str) -> str:
        """Select the best tool for a query."""
        tools_str = ", ".join(self.available_tools)
        prompt = f"""Given this query: "{query}"

Select the BEST tool to answer it. Options: {tools_str}
- retrieval: For factual information from a knowledge base
- web_search: For current events, real-time information
- calculation: For mathematical or computational problems
- reasoning: For multi-step logic or analysis

Return ONLY the tool name."""
        messages = [ChatMessage.from_user(prompt)]
        response = self.generator.run(messages=messages)
        tool = response["replies"][0].text.strip().lower()
        if tool not in self.available_tools:
            tool = "retrieval"  # Fallback
        return tool

    def evaluate_answer_quality(
        self,
        query: str,
        answer: str,
        context: str = "",
    ) -> dict[str, Any]:
        """Evaluate generated answer quality.

        Returns:
            Dict with relevance, completeness, grounding scores (0-100),
            plus issues and suggestions lists.
        """
        prompt = f"""Evaluate this answer to the query.

Query: "{query}"
Answer: "{answer}"
Context: "{context}"

Assess:
1. Relevance (0-100): Does it answer the query?
2. Completeness (0-100): Is it sufficiently detailed?
3. Grounding (0-100): Is it grounded in the context?
4. Issues: List any problems (max 3)
5. Suggestions: List improvements (max 2)

Format as JSON:
{{"relevance": X, "completeness": X, "grounding": X, "issues": [...], "suggestions": [...]}}

Return ONLY the JSON."""
        messages = [ChatMessage.from_user(prompt)]
        response = self.generator.run(messages=messages)
        content = response["replies"][0].text
        return json.loads(content.strip())

    def self_reflect_loop(
        self,
        query: str,
        answer: str,
        context: str = "",
        max_iterations: int = 2,
        quality_threshold: int = 75,
    ) -> str:
        """Run self-reflection loop to iteratively improve answer."""
        current_answer = answer
        for iteration in range(max_iterations):
            eval_result = self.evaluate_answer_quality(query, current_answer, context)
            # Exit early if quality is acceptable
            avg_score = (
                eval_result.get("relevance", 0)
                + eval_result.get("completeness", 0)
                + eval_result.get("grounding", 0)
            ) / 3
            if avg_score >= quality_threshold:
                break
            # Refine answer based on feedback (refine_answer not shown in this listing)
            current_answer = self.refine_answer(query, current_answer, eval_result)
        return current_answer
```
### Usage

```python
from vectordb.haystack.components import AgenticRouter

router = AgenticRouter(model="llama-3.3-70b-versatile")

# Tool selection
tool = router.select_tool("What is quantum entanglement?")
# → "retrieval"

# Answer quality evaluation
quality = router.evaluate_answer_quality(query, answer, context)
# → {"relevance": 85, "completeness": 70, "grounding": 90, ...}

# Self-reflection loop
final_answer = router.self_reflect_loop(
    query, draft_answer, context, max_iterations=2
)
```
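Note that `evaluate_answer_quality` passes the raw reply to `json.loads`, which raises if the model wraps its JSON in a markdown code fence. A small helper like the following (hypothetical, not part of the component) can tolerate that case:

```python
import json
from typing import Any


def parse_llm_json(text: str) -> dict[str, Any]:
    """Parse a JSON object from LLM output, tolerating markdown code fences."""
    cleaned = text.strip()
    if cleaned.startswith("```"):
        # Drop the opening fence line (with any language tag) and the closing fence
        cleaned = cleaned.split("\n", 1)[1]
        cleaned = cleaned.rsplit("```", 1)[0]
    return json.loads(cleaned.strip())


parse_llm_json('```json\n{"relevance": 85}\n```')
# → {"relevance": 85}
```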
## ContextCompressor

Reduces retrieved context to query-relevant fragments before generation.

### Compression strategies

- **Abstractive**: The LLM generates a focused summary of the context relevant to the query
- **Extractive**: The LLM selects the N most relevant sentences from the original text
- **Relevance filtering**: The LLM evaluates each paragraph and drops those below a threshold

All methods fall back to returning the original context unchanged on LLM failure.
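That fallback behavior can be sketched as a small wrapper (the helper name and signature here are illustrative, not the component's actual API):

```python
import logging
from typing import Callable

logger = logging.getLogger(__name__)


def compress_with_fallback(
    compress_fn: Callable[[str, str], str],
    context: str,
    query: str,
) -> str:
    """Run one compression strategy; return the original context if it fails."""
    try:
        return compress_fn(context, query)
    except Exception:
        logger.warning("Compression failed; returning original context unchanged")
        return context


def failing_strategy(context: str, query: str) -> str:
    """Stand-in for an LLM call that errors out."""
    raise RuntimeError("LLM unavailable")


compress_with_fallback(failing_strategy, "original context", "query")
# → "original context"
```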
### Implementation

`src/vectordb/haystack/components/context_compressor.py`:
```python
import logging
import os
from typing import Any

from haystack.components.generators.chat import OpenAIChatGenerator
from haystack.dataclasses import ChatMessage
from haystack.utils import Secret

logger = logging.getLogger(__name__)


class ContextCompressor:
    """Compress and summarize retrieved context."""

    def __init__(
        self,
        model: str = "llama-3.3-70b-versatile",
        api_key: str | None = None,
    ) -> None:
        """Initialize context compressor."""
        resolved_api_key = api_key or os.environ.get("GROQ_API_KEY")
        if not resolved_api_key:
            raise ValueError("GROQ_API_KEY required")
        self.generator = OpenAIChatGenerator(
            api_key=Secret.from_token(resolved_api_key),
            model=model,
            api_base_url="https://api.groq.com/openai/v1",
            generation_kwargs={"temperature": 0, "max_tokens": 2048},
        )

    def compress_abstractive(
        self,
        context: str,
        query: str,
        max_tokens: int = 2048,
    ) -> str:
        """Abstractive compression using LLM summarization."""
        prompt = f"""Summarize the following context to answer this question: "{query}"

Keep only the most relevant information. Be concise.

Context:
{context}

Summary (max {max_tokens} tokens):"""
        messages = [ChatMessage.from_user(prompt)]
        response = self.generator.run(messages=messages)
        summary = response["replies"][0].text
        compression_ratio = len(context) / (len(summary) + 1)
        logger.info("Abstractive compression: %.2fx", compression_ratio)
        return summary

    def compress_extractive(
        self,
        context: str,
        query: str,
        num_sentences: int = 5,
    ) -> str:
        """Extractive compression: select key sentences."""
        prompt = f"""Extract the {num_sentences} most relevant sentences from the following context to answer: "{query}"

Context:
{context}

Return ONLY the selected sentences in order, without numbering."""
        messages = [ChatMessage.from_user(prompt)]
        response = self.generator.run(messages=messages)
        return response["replies"][0].text

    def compress(
        self,
        context: str,
        query: str,
        compression_type: str = "abstractive",
        **kwargs: Any,
    ) -> str:
        """Compress context using specified technique."""
        if compression_type == "abstractive":
            return self.compress_abstractive(context, query, **kwargs)
        elif compression_type == "extractive":
            return self.compress_extractive(context, query, **kwargs)
        elif compression_type == "relevance_filter":
            # filter_by_relevance (not shown in this listing) scores and drops paragraphs
            return self.filter_by_relevance(context, query, **kwargs)
        else:
            raise ValueError(f"Unsupported compression type: {compression_type}")
```
### Usage

```python
from vectordb.haystack.components import ContextCompressor

compressor = ContextCompressor(model="llama-3.3-70b-versatile")
compressed = compressor.compress(
    context, query, compression_type="extractive", num_sentences=5
)
```
## QueryEnhancer

Generates improved retrieval queries from the user's original input.

### Enhancement strategies

- **Multi-query**: Generates N alternative phrasings of the original query (default N=3)
- **HyDE**: Generates M hypothetical documents that would answer the query (default M=3)
- **Step-back**: Generates a broader, more abstract version of the query

### Implementation

`src/vectordb/haystack/components/query_enhancer.py`:
```python
import os
from typing import Any

from haystack.components.generators.chat import OpenAIChatGenerator
from haystack.dataclasses import ChatMessage
from haystack.utils import Secret


class QueryEnhancer:
    """Enhance and expand queries using LLM-based techniques."""

    def __init__(
        self,
        model: str = "llama-3.3-70b-versatile",
        api_key: str | None = None,
    ) -> None:
        """Initialize query enhancer."""
        # Temperature=0.7 encourages diverse rephrasings
        self.generator = OpenAIChatGenerator(
            api_key=Secret.from_token(api_key or os.environ.get("GROQ_API_KEY")),
            model=model,
            api_base_url="https://api.groq.com/openai/v1",
            generation_kwargs={"temperature": 0.7, "max_tokens": 1024},
        )

    def generate_multi_queries(
        self,
        query: str,
        num_queries: int = 3,
    ) -> list[str]:
        """Generate multiple query variations."""
        prompt = f"""Generate {num_queries} different queries that would help retrieve relevant information for: "{query}"

Return ONLY the queries, one per line, without numbering or extra text."""
        messages = [ChatMessage.from_user(prompt)]
        response = self.generator.run(messages=messages)
        content = response["replies"][0].text
        queries = [q.strip() for q in content.split("\n") if q.strip()]
        # Always include original query first
        return [query] + queries[: num_queries - 1]

    def generate_hypothetical_documents(
        self,
        query: str,
        num_docs: int = 3,
    ) -> list[str]:
        """Generate hypothetical relevant documents (HyDE)."""
        prompt = f"""Generate {num_docs} hypothetical document excerpts that would directly answer this question: "{query}"

Return ONLY the document excerpts, separated by "---", without numbering or extra text."""
        messages = [ChatMessage.from_user(prompt)]
        response = self.generator.run(messages=messages)
        content = response["replies"][0].text
        docs = [d.strip() for d in content.split("---") if d.strip()]
        return docs[:num_docs]

    def enhance_query(
        self,
        query: str,
        enhancement_type: str = "multi_query",
        **kwargs: Any,
    ) -> list[str]:
        """Enhance query using specified technique."""
        if enhancement_type == "multi_query":
            return self.generate_multi_queries(query, **kwargs)
        elif enhancement_type == "hyde":
            return self.generate_hypothetical_documents(query, **kwargs)
        elif enhancement_type == "step_back":
            step_back = self.generate_step_back_query(query)
            return [query, step_back]
        else:
            raise ValueError(f"Unsupported enhancement type: {enhancement_type}")
```
### Usage

```python
from vectordb.haystack.components import QueryEnhancer

enhancer = QueryEnhancer(model="llama-3.3-70b-versatile")
queries = enhancer.enhance_query(
    "What causes inflation?", enhancement_type="multi_query", num_queries=3
)
# → ["What causes inflation?", "What drives rising prices?", "Factors behind monetary inflation"]
```
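The `step_back` branch calls `generate_step_back_query`, which is not shown in the listing above. A plausible sketch of that method's logic, written against a generic `llm` callable (an assumption made here so the example stays self-contained; the real method would call `self.generator.run`):

```python
from typing import Callable


def generate_step_back_query(llm: Callable[[str], str], query: str) -> str:
    """Ask the LLM for a broader, more abstract version of the query."""
    prompt = (
        f'Given this specific question: "{query}"\n'
        "Write ONE broader, more general question whose answer would provide "
        "useful background for the specific question.\n"
        "Return ONLY the broader question."
    )
    return llm(prompt).strip()


# Example with a stub LLM standing in for the real generator
stub = lambda prompt: "What are the main drivers of price levels in an economy?"
generate_step_back_query(stub, "What causes inflation?")
# → "What are the main drivers of price levels in an economy?"
```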
## ResultMerger

Fuses results from multiple retrieval sources into a single ranked list.

### Fusion strategies

- **RRF (Reciprocal Rank Fusion)**: Combines rankings using `1 / (k + rank)` without requiring score normalization
- **Weighted fusion**: Weights inverse-rank scores by explicit per-source weights

### Usage

See the Hybrid search page for detailed implementation examples.
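As a self-contained sketch of the two strategies over ranked lists of document IDs (the function names and signatures are illustrative, not the actual ResultMerger API):

```python
from collections import defaultdict


def rrf_fuse(rankings: list[list[str]], k: int = 60) -> list[str]:
    """Reciprocal Rank Fusion: each doc scores the sum of 1 / (k + rank)."""
    scores: dict[str, float] = defaultdict(float)
    for ranking in rankings:
        for rank, doc_id in enumerate(ranking, start=1):
            scores[doc_id] += 1.0 / (k + rank)
    return sorted(scores, key=scores.get, reverse=True)


def weighted_fuse(rankings: list[list[str]], weights: list[float]) -> list[str]:
    """Weighted fusion: inverse-rank scores scaled by per-source weights."""
    scores: dict[str, float] = defaultdict(float)
    for weight, ranking in zip(weights, rankings):
        for rank, doc_id in enumerate(ranking, start=1):
            scores[doc_id] += weight / rank
    return sorted(scores, key=scores.get, reverse=True)


# "b" appears in both rankings, so it outranks docs seen only once
rrf_fuse([["a", "b"], ["b", "c"]])
# → ["b", "a", "c"]
```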
## LLM configuration

All LLM-based components use the Groq API via Haystack's OpenAIChatGenerator:

```python
generator = OpenAIChatGenerator(
    api_key=Secret.from_token(api_key),
    model="llama-3.3-70b-versatile",
    api_base_url="https://api.groq.com/openai/v1",
    generation_kwargs={"temperature": 0, "max_tokens": 1024},
)
```

Set the `GROQ_API_KEY` environment variable or pass `api_key` directly.
## When to use components directly

- Building a custom pipeline that does not fit existing feature module templates
- Experimenting with one pipeline stage at a time
- Combining components from different feature modules into a novel configuration
## Common pitfalls

- **Over-composing before baseline validation**: Build and validate the simplest pipeline first, then add components incrementally and measure the impact of each addition.
- **Inconsistent interfaces between custom stages**: If you extend these components, maintain the same input/output conventions (Haystack `Document` objects, standard config dicts).
- **No tracing at component boundaries**: Each component logs at INFO level. Set `LOG_LEVEL=DEBUG` to see detailed prompt and response content.
## Next steps

- **Pipelines**: Learn how to compose components into full pipelines
- **Semantic search**: See components in action in semantic search pipelines