Haystack pipelines provide a composable architecture for building RAG systems. This page covers pipeline design patterns, component composition, and production best practices.
Pipeline architecture
All Haystack pipelines in this module follow a three-stage pattern:
Indexing stage
Load documents, generate embeddings, and store in vector database
Retrieval stage
Embed query, retrieve candidates, apply post-processing (reranking, filtering, diversification)
Generation stage (optional)
Format context, call LLM, return answer
Standard indexing pipeline
All indexing pipelines follow this pattern:
class StandardIndexingPipeline:
    """Standard indexing pipeline pattern.

    Loads documents from the configured dataset, generates dense (and,
    for hybrid pipelines, sparse) embeddings, and writes the embedded
    documents into the configured vector database collection.
    """

    def __init__(self, config_or_path: dict[str, Any] | str) -> None:
        """Initialize the pipeline.

        Args:
            config_or_path: Parsed configuration mapping, or a path to a
                YAML configuration file.
        """
        # 1. Load and validate configuration
        self.config = ConfigLoader.load(config_or_path)
        ConfigLoader.validate(self.config, "backend_name")

        # 2. Create embedder(s) from config
        self.embedder = EmbedderFactory.create_document_embedder(self.config)
        # For hybrid pipelines: also create a sparse document embedder
        if "sparse" in self.config:
            self.sparse_embedder = EmbedderFactory.create_sparse_document_embedder(
                self.config
            )

        # 3. Initialize vector database connection
        backend_config = self.config["backend_name"]
        self.db = VectorDB(
            api_key=backend_config.get("api_key"),
            index_name=backend_config.get("index_name"),
        )
        # Fix: run() references self.collection_name, but the original
        # pattern never assigned it (AttributeError at runtime). Derive it
        # from the backend config, falling back to the index name.
        self.collection_name = backend_config.get(
            "collection_name", backend_config.get("index_name")
        )

    def run(self) -> dict[str, Any]:
        """Execute the indexing pipeline.

        Returns:
            Mapping with key "documents_indexed": the number of
            documents written to the collection.
        """
        # 1. Load documents from the configured dataset
        dl_config = self.config.get("dataloader", {})
        loader = DataloaderCatalog.create(
            dl_config.get("type", "triviaqa"),
            split=dl_config.get("split", "test"),
            limit=dl_config.get("limit"),
        )
        dataset = loader.load()
        documents = dataset.to_haystack()

        # 2. Generate embeddings (dense first, then sparse for hybrid)
        embedded_docs = self.embedder.run(documents=documents)["documents"]
        if hasattr(self, "sparse_embedder"):
            embedded_docs = self.sparse_embedder.run(
                documents=embedded_docs
            )["documents"]

        # 3. Create collection/index and insert documents
        self.db.create_collection(
            collection_name=self.collection_name,
            recreate=self.config.get("recreate", False),
        )
        self.db.insert_documents(
            documents=embedded_docs,
            collection_name=self.collection_name,
        )
        return {"documents_indexed": len(embedded_docs)}
Standard search pipeline
All search pipelines follow this pattern:
class StandardSearchPipeline:
    """Standard search pipeline pattern.

    Embeds the query, retrieves candidates from the vector database,
    applies post-processing (filtering, diversification, top-k cut), and
    optionally generates a RAG answer.
    """

    def __init__(self, config_or_path: dict[str, Any] | str) -> None:
        """Initialize the pipeline.

        Args:
            config_or_path: Parsed configuration mapping, or a path to a
                YAML configuration file.
        """
        # 1. Load and validate configuration
        self.config = ConfigLoader.load(config_or_path)
        ConfigLoader.validate(self.config, "backend_name")

        # 2. Create text embedder(s)
        self.embedder = EmbedderFactory.create_text_embedder(self.config)
        # For hybrid pipelines: also create a sparse text embedder
        if "sparse" in self.config:
            self.sparse_embedder = EmbedderFactory.create_sparse_text_embedder(
                self.config
            )

        # 3. Initialize database connection
        backend_config = self.config["backend_name"]
        self.db = VectorDB(
            api_key=backend_config.get("api_key"),
            index_name=backend_config.get("index_name"),
        )
        # Fix: search() references self.collection_name, but the original
        # pattern never assigned it (AttributeError at runtime). Derive it
        # from the backend config, falling back to the index name.
        self.collection_name = backend_config.get(
            "collection_name", backend_config.get("index_name")
        )

        # 4. Optional: Initialize RAG generator
        self.rag_enabled = self.config.get("rag", {}).get("enabled", False)
        self.generator = (
            RAGHelper.create_generator(self.config) if self.rag_enabled else None
        )

    def search(
        self,
        query: str,
        top_k: int = 10,
        filters: dict[str, Any] | None = None,
    ) -> dict[str, Any]:
        """Execute the search pipeline.

        Args:
            query: Natural-language query text.
            top_k: Number of documents to return after post-processing.
            filters: Optional metadata filters applied at retrieval time
                and again during post-processing.

        Returns:
            Mapping with "documents" and "query" keys, plus "answer"
            when RAG is enabled and at least one document was retrieved.
        """
        # 1. Embed query
        query_result = self.embedder.run(text=query)
        query_embedding = query_result["embedding"]

        # 2. Search vector database; over-fetch (2x) so post-processing
        # still has enough candidates after filtering/diversification.
        documents = self.db.search(
            query_embedding=query_embedding,
            top_k=top_k * 2,
            collection_name=self.collection_name,
            where=filters,
        )

        # 3. Post-processing: filtering, diversification, final top-k cut
        documents = DocumentFilter.apply(documents, filters)
        documents = DiversificationHelper.apply(documents, self.config)
        documents = documents[:top_k]

        result: dict[str, Any] = {
            "documents": documents,
            "query": query,
        }

        # 4. Optional: Generate RAG answer
        if self.rag_enabled and self.generator and documents:
            prompt = RAGHelper.format_prompt(query, documents)
            gen_result = self.generator.run(prompt=prompt)
            result["answer"] = gen_result.get("replies", [""])[0]
        return result
Hybrid search pipeline pattern
Hybrid pipelines extend the standard pattern with dual retrieval and fusion:
class HybridSearchPipeline:
    """Hybrid search pipeline with RRF fusion.

    Embeds the query with both dense and sparse embedders, retrieves a
    candidate list from each index, and merges the two ranked lists with
    reciprocal rank fusion.
    """

    def search(
        self,
        query: str,
        top_k: int = 10,
        filters: dict[str, Any] | None = None,
    ) -> dict[str, Any]:
        """Run dual retrieval for *query* and return the fused top-k."""
        # 1. Produce both query representations
        dense_vec = self.dense_embedder.run(text=query)["embedding"]
        sparse_vec = self.sparse_embedder.run(text=query)["sparse_embedding"]

        # 2. Dual retrieval; over-fetch (2x) so the fusion step has a
        # larger candidate pool to work with.
        candidate_count = top_k * 2
        dense_hits = self.db.dense_search(
            query_embedding=dense_vec,
            top_k=candidate_count,
        )
        sparse_hits = self.db.sparse_search(
            query_sparse_embedding=sparse_vec,
            top_k=candidate_count,
        )

        # 3. Merge the two ranked lists with reciprocal rank fusion
        from vectordb.haystack.components import ResultMerger

        merged = ResultMerger.rrf_fusion(
            dense_docs=dense_hits,
            sparse_docs=sparse_hits,
            k=60,
            top_k=top_k,
        )
        return {"documents": merged, "query": query}
Query enhancement pipeline pattern
Query enhancement pipelines generate multiple query variants:
class QueryEnhancementPipeline:
    """Search pipeline with query enhancement.

    Expands the user query into several variants with an LLM-backed
    enhancer, searches the database with each variant, and fuses the
    per-variant result lists with reciprocal rank fusion.
    """

    def __init__(self, config_or_path: dict[str, Any] | str) -> None:
        # Standard initialization
        self.config = ConfigLoader.load(config_or_path)
        self.embedder = EmbedderFactory.create_text_embedder(self.config)
        self.db = VectorDB(...)

        # Query enhancer with a configurable model
        from vectordb.haystack.components import QueryEnhancer

        enhancer_model = self.config.get("query_enhancement", {}).get(
            "model", "llama-3.3-70b-versatile"
        )
        self.enhancer = QueryEnhancer(model=enhancer_model)

    def search(
        self,
        query: str,
        top_k: int = 10,
    ) -> dict[str, Any]:
        """Search with enhanced query variants and fuse the results."""
        # 1. Generate query variants (enhancement type from config)
        variant_kind = self.config.get("query_enhancement", {}).get(
            "type", "multi_query"
        )
        variants = self.enhancer.enhance_query(
            query, enhancement_type=variant_kind
        )

        # 2. Retrieve one candidate list per variant (over-fetch 2x)
        ranked_lists = []
        for variant in variants:
            embedding = self.embedder.run(text=variant)["embedding"]
            hits = self.db.search(
                query_embedding=embedding,
                top_k=top_k * 2,
            )
            ranked_lists.append(hits)

        # 3. Fuse all per-variant lists with reciprocal rank fusion
        from vectordb.haystack.components import ResultMerger

        fused = ResultMerger.rrf_fusion_many(
            ranked_lists=ranked_lists,
            k=60,
            top_k=top_k,
        )
        return {"documents": fused, "query": query}
Agentic RAG pipeline pattern
Agentic pipelines use self-reflection loops:
class AgenticRAGPipeline:
    """Agentic RAG with self-reflection.

    Routes the query through a tool-selection step, retrieves context,
    drafts an answer, and refines it via a bounded self-reflection loop.
    """

    def __init__(self, config_or_path: dict[str, Any] | str) -> None:
        # Standard initialization
        self.config = ConfigLoader.load(config_or_path)
        self.embedder = EmbedderFactory.create_text_embedder(self.config)
        self.db = VectorDB(...)
        self.generator = RAGHelper.create_generator(self.config)

        # Agentic router (tool selection + reflection), model from config
        from vectordb.haystack.components import AgenticRouter

        router_model = self.config.get("agentic", {}).get(
            "model", "llama-3.3-70b-versatile"
        )
        self.router = AgenticRouter(model=router_model)

    def search(
        self,
        query: str,
        top_k: int = 10,
        max_iterations: int = 2,
    ) -> dict[str, Any]:
        """Run tool routing, retrieval, generation, and self-reflection."""
        # 1. Tool selection
        chosen_tool = self.router.select_tool(query)
        if chosen_tool != "retrieval":
            # Route to other tools (web_search, calculation, etc.)
            pass

        # 2. Standard retrieval
        embedding = self.embedder.run(text=query)["embedding"]
        retrieved = self.db.search(
            query_embedding=embedding,
            top_k=top_k,
        )

        # 3. Draft an initial answer from the retrieved context
        context = "\n\n".join(doc.content for doc in retrieved)
        draft_prompt = RAGHelper.format_prompt(query, retrieved)
        draft = self.generator.run(prompt=draft_prompt)["replies"][0]

        # 4. Refine the draft via a bounded self-reflection loop
        refined = self.router.self_reflect_loop(
            query=query,
            answer=draft,
            context=context,
            max_iterations=max_iterations,
        )
        return {
            "documents": retrieved,
            "query": query,
            "answer": refined,
        }
Component composition utilities
EmbedderFactory
Creates and warms up embedders from config:
from vectordb.haystack.utils import EmbedderFactory
# Dense embedders
doc_embedder = EmbedderFactory.create_document_embedder(config)
text_embedder = EmbedderFactory.create_text_embedder(config)
# Sparse embedders
sparse_doc_embedder = EmbedderFactory.create_sparse_document_embedder(config)
sparse_text_embedder = EmbedderFactory.create_sparse_text_embedder(config)
# Get embedding dimension
dim = EmbedderFactory.get_embedding_dimension(doc_embedder)
RAGHelper
Creates generators and formats prompts:
from vectordb.haystack.utils import RAGHelper
# Create generator
generator = RAGHelper.create_generator(config)
# Format prompt
prompt = RAGHelper.format_prompt(query, documents)
# Generate answer
answer = RAGHelper.generate(generator, query, documents)
ConfigLoader
Loads and validates YAML configs:
from vectordb.haystack.utils import ConfigLoader
# Load config from file or dict
config = ConfigLoader.load( "config.yaml" )
config = ConfigLoader.load({ "pinecone" : { ... }, "embeddings" : { ... }})
# Validate required sections
ConfigLoader.validate(config, "pinecone" ) # Checks for required keys
Production best practices
Configuration management
Use environment variables for secrets: api_key: "${PINECONE_API_KEY}"
Keep configs versioned with code
Use separate configs for dev/staging/prod environments
Error handling
All components have fallback behavior (e.g., compression returns original context on failure)
Log failures at ERROR level with context
Return partial results when possible rather than failing entirely
Performance
Warm up embedders during initialization to avoid cold-start latency
Over-fetch during retrieval (2x top_k) to allow for diversification and filtering
Batch document embedding with configurable batch_size
Use the appropriate device setting ("cuda" for GPU, "cpu" for CPU)
Monitoring
Log compression ratios for context compression
Log retrieval counts at each pipeline stage
Track quality scores in agentic pipelines
Set LOG_LEVEL=DEBUG to see detailed prompt/response content
Common patterns
Multi-stage retrieval
Fast first-pass retrieval (semantic or hybrid)
Reranking with cross-encoder
Diversification or MMR
Final top-k selection
Cost optimization
Smaller candidate pool (lower top_k)
Context compression before generation
Cheaper model tiers for non-critical stages
Quality improvement
Start with semantic search baseline
Add hybrid search for keyword coverage
Add reranking for precision
Add query enhancement for recall
Add agentic loop for complex queries
Next steps
Semantic search See the standard pipeline pattern in action
Hybrid search Learn about dual retrieval and fusion patterns
Components Explore individual pipeline components