Haystack pipelines provide a composable architecture for building RAG systems. This page covers pipeline design patterns, component composition, and production best practices.

Pipeline architecture

All Haystack pipelines in this module follow a three-stage pattern:
1. Indexing stage: load documents, generate embeddings, and store them in a vector database.
2. Retrieval stage: embed the query, retrieve candidates, and apply post-processing (reranking, filtering, diversification).
3. Generation stage (optional): format the context, call the LLM, and return an answer.

Standard indexing pipeline

All indexing pipelines follow this pattern:
from typing import Any

from vectordb.haystack.utils import ConfigLoader, EmbedderFactory

# VectorDB and DataloaderCatalog below stand in for the backend-specific
# database client and dataset catalog used by a concrete pipeline.


class StandardIndexingPipeline:
    """Standard indexing pipeline pattern."""

    def __init__(self, config_or_path: dict[str, Any] | str) -> None:
        # 1. Load and validate configuration
        self.config = ConfigLoader.load(config_or_path)
        ConfigLoader.validate(self.config, "backend_name")

        # 2. Create embedder(s) from config
        self.embedder = EmbedderFactory.create_document_embedder(self.config)
        
        # For hybrid pipelines:
        if "sparse" in self.config:
            self.sparse_embedder = EmbedderFactory.create_sparse_document_embedder(
                self.config
            )

        # 3. Initialize vector database connection and resolve collection name
        backend_config = self.config["backend_name"]
        self.collection_name = backend_config.get("index_name")
        self.db = VectorDB(
            api_key=backend_config.get("api_key"),
            index_name=self.collection_name,
        )

    def run(self) -> dict[str, Any]:
        """Execute indexing pipeline."""
        # 1. Load documents from configured dataset
        dl_config = self.config.get("dataloader", {})
        loader = DataloaderCatalog.create(
            dl_config.get("type", "triviaqa"),
            split=dl_config.get("split", "test"),
            limit=dl_config.get("limit"),
        )
        dataset = loader.load()
        documents = dataset.to_haystack()

        # 2. Generate embeddings
        embedded_docs = self.embedder.run(documents=documents)["documents"]
        
        # For hybrid: add sparse embeddings
        if hasattr(self, "sparse_embedder"):
            embedded_docs = self.sparse_embedder.run(
                documents=embedded_docs
            )["documents"]

        # 3. Create collection/index and insert documents
        self.db.create_collection(
            collection_name=self.collection_name,
            recreate=self.config.get("recreate", False),
        )
        
        self.db.insert_documents(
            documents=embedded_docs,
            collection_name=self.collection_name,
        )

        return {"documents_indexed": len(embedded_docs)}

Standard search pipeline

All search pipelines follow this pattern:
class StandardSearchPipeline:
    """Standard search pipeline pattern."""

    def __init__(self, config_or_path: dict[str, Any] | str) -> None:
        # 1. Load and validate configuration
        self.config = ConfigLoader.load(config_or_path)
        ConfigLoader.validate(self.config, "backend_name")

        # 2. Create text embedder(s)
        self.embedder = EmbedderFactory.create_text_embedder(self.config)
        
        # For hybrid pipelines:
        if "sparse" in self.config:
            self.sparse_embedder = EmbedderFactory.create_sparse_text_embedder(
                self.config
            )

        # 3. Initialize database connection and resolve collection name
        backend_config = self.config["backend_name"]
        self.collection_name = backend_config.get("index_name")
        self.db = VectorDB(
            api_key=backend_config.get("api_key"),
            index_name=self.collection_name,
        )

        # 4. Optional: Initialize RAG generator
        self.rag_enabled = self.config.get("rag", {}).get("enabled", False)
        self.generator = (
            RAGHelper.create_generator(self.config) if self.rag_enabled else None
        )

    def search(
        self,
        query: str,
        top_k: int = 10,
        filters: dict[str, Any] | None = None,
    ) -> dict[str, Any]:
        """Execute search pipeline."""
        # 1. Embed query
        query_result = self.embedder.run(text=query)
        query_embedding = query_result["embedding"]

        # 2. Search vector database
        documents = self.db.search(
            query_embedding=query_embedding,
            top_k=top_k * 2,  # Over-fetch for post-processing
            collection_name=self.collection_name,
            where=filters,
        )

        # 3. Post-processing: filtering, diversification, reranking
        documents = DocumentFilter.apply(documents, filters)
        documents = DiversificationHelper.apply(documents, self.config)
        documents = documents[:top_k]

        result = {
            "documents": documents,
            "query": query,
        }

        # 4. Optional: Generate RAG answer
        if self.rag_enabled and self.generator and documents:
            prompt = RAGHelper.format_prompt(query, documents)
            gen_result = self.generator.run(prompt=prompt)
            result["answer"] = gen_result.get("replies", [""])[0]

        return result
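
Usage mirrors the indexing pattern; the answer key is present only when RAG is enabled in the config:
pipeline = StandardSearchPipeline("config.yaml")
result = pipeline.search("who invented the transistor?", top_k=5)
for doc in result["documents"]:
    print(doc.content[:80])
if "answer" in result:  # only set when rag.enabled is true
    print(result["answer"])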

Hybrid search pipeline pattern

Hybrid pipelines extend the standard pattern with dual retrieval and fusion:
class HybridSearchPipeline:
    """Hybrid search pipeline with RRF fusion.

    Initialization follows the standard search pattern but creates both a
    dense and a sparse text embedder.
    """

    def search(
        self,
        query: str,
        top_k: int = 10,
        filters: dict[str, Any] | None = None,
    ) -> dict[str, Any]:
        # 1. Embed query with both dense and sparse embedders
        dense_embedding = self.dense_embedder.run(text=query)["embedding"]
        sparse_embedding = self.sparse_embedder.run(text=query)["sparse_embedding"]

        # 2. Dual retrieval
        dense_docs = self.db.dense_search(
            query_embedding=dense_embedding,
            top_k=top_k * 2,
        )
        sparse_docs = self.db.sparse_search(
            query_sparse_embedding=sparse_embedding,
            top_k=top_k * 2,
        )

        # 3. Fusion with RRF
        from vectordb.haystack.components import ResultMerger
        
        fused_docs = ResultMerger.rrf_fusion(
            dense_docs=dense_docs,
            sparse_docs=sparse_docs,
            k=60,
            top_k=top_k,
        )

        return {"documents": fused_docs, "query": query}

Query enhancement pipeline pattern

Query enhancement pipelines generate multiple query variants:
class QueryEnhancementPipeline:
    """Search pipeline with query enhancement."""

    def __init__(self, config_or_path: dict[str, Any] | str) -> None:
        # Standard initialization
        self.config = ConfigLoader.load(config_or_path)
        self.embedder = EmbedderFactory.create_text_embedder(self.config)
        self.db = VectorDB(...)
        
        # Add query enhancer
        from vectordb.haystack.components import QueryEnhancer
        self.enhancer = QueryEnhancer(
            model=self.config.get("query_enhancement", {}).get(
                "model", "llama-3.3-70b-versatile"
            )
        )

    def search(
        self,
        query: str,
        top_k: int = 10,
    ) -> dict[str, Any]:
        # 1. Generate query variants
        enhancement_type = self.config.get("query_enhancement", {}).get(
            "type", "multi_query"
        )
        queries = self.enhancer.enhance_query(
            query, enhancement_type=enhancement_type
        )

        # 2. Search with each query variant
        all_results = []
        for q in queries:
            query_embedding = self.embedder.run(text=q)["embedding"]
            docs = self.db.search(
                query_embedding=query_embedding,
                top_k=top_k * 2,
            )
            all_results.append(docs)

        # 3. Fuse results from all queries
        from vectordb.haystack.components import ResultMerger
        fused_docs = ResultMerger.rrf_fusion_many(
            ranked_lists=all_results,
            k=60,
            top_k=top_k,
        )

        return {"documents": fused_docs, "query": query}

Agentic RAG pipeline pattern

Agentic pipelines use self-reflection loops:
class AgenticRAGPipeline:
    """Agentic RAG with self-reflection."""

    def __init__(self, config_or_path: dict[str, Any] | str) -> None:
        # Standard initialization
        self.config = ConfigLoader.load(config_or_path)
        self.embedder = EmbedderFactory.create_text_embedder(self.config)
        self.db = VectorDB(...)
        self.generator = RAGHelper.create_generator(self.config)
        
        # Add agentic router
        from vectordb.haystack.components import AgenticRouter
        self.router = AgenticRouter(
            model=self.config.get("agentic", {}).get(
                "model", "llama-3.3-70b-versatile"
            )
        )

    def search(
        self,
        query: str,
        top_k: int = 10,
        max_iterations: int = 2,
    ) -> dict[str, Any]:
        # 1. Tool selection
        tool = self.router.select_tool(query)
        
        if tool != "retrieval":
            # Route to other tools (web_search, calculation, etc.)
            pass

        # 2. Standard retrieval
        query_embedding = self.embedder.run(text=query)["embedding"]
        documents = self.db.search(
            query_embedding=query_embedding,
            top_k=top_k,
        )

        # 3. Generate initial answer
        context = "\n\n".join([doc.content for doc in documents])
        prompt = RAGHelper.format_prompt(query, documents)
        draft_answer = self.generator.run(prompt=prompt)["replies"][0]

        # 4. Self-reflection loop
        final_answer = self.router.self_reflect_loop(
            query=query,
            answer=draft_answer,
            context=context,
            max_iterations=max_iterations,
        )

        return {
            "documents": documents,
            "query": query,
            "answer": final_answer,
        }
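
A sketch of what a self-reflection loop like self_reflect_loop might do internally. The critique prompt and the "OK" convention are assumptions for illustration, not AgenticRouter's actual logic:
def self_reflect_loop_sketch(generator, query, answer, context, max_iterations=2):
    """Critique the draft against the context and revise it if needed (illustrative)."""
    for _ in range(max_iterations):
        critique_prompt = (
            f"Context:\n{context}\n\nQuestion: {query}\nDraft answer: {answer}\n\n"
            "If the answer is fully supported by the context, reply OK. "
            "Otherwise, rewrite the answer so that it is."
        )
        reply = generator.run(prompt=critique_prompt)["replies"][0]
        if reply.strip() == "OK":
            break  # draft passed the quality check
        answer = reply  # adopt the revision and check again
    return answer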

Component composition utilities

EmbedderFactory

Creates and warms up embedders from config:
from vectordb.haystack.utils import EmbedderFactory

# Dense embedders
doc_embedder = EmbedderFactory.create_document_embedder(config)
text_embedder = EmbedderFactory.create_text_embedder(config)

# Sparse embedders
sparse_doc_embedder = EmbedderFactory.create_sparse_document_embedder(config)
sparse_text_embedder = EmbedderFactory.create_sparse_text_embedder(config)

# Get embedding dimension
dim = EmbedderFactory.get_embedding_dimension(doc_embedder)

RAGHelper

Creates generators and formats prompts:
from vectordb.haystack.utils import RAGHelper

# Create generator
generator = RAGHelper.create_generator(config)

# Format prompt
prompt = RAGHelper.format_prompt(query, documents)

# Generate answer
answer = RAGHelper.generate(generator, query, documents)

ConfigLoader

Loads and validates YAML configs:
from vectordb.haystack.utils import ConfigLoader

# Load config from file or dict
config = ConfigLoader.load("config.yaml")
config = ConfigLoader.load({"pinecone": {...}, "embeddings": {...}})

# Validate required sections
ConfigLoader.validate(config, "pinecone")  # Checks for required keys
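
Pulling together the accessors used throughout this page, a config might have the following shape. The exact keys depend on the backend, so treat this as illustrative:
config = {
    "pinecone": {                      # backend section checked by validate()
        "api_key": "${PINECONE_API_KEY}",
        "index_name": "my-index",
    },
    "embeddings": {"batch_size": 32},  # dense embedder settings
    "sparse": {},                      # presence of this section enables hybrid
    "dataloader": {"type": "triviaqa", "split": "test", "limit": 1000},
    "rag": {"enabled": True},
    "query_enhancement": {"type": "multi_query", "model": "llama-3.3-70b-versatile"},
}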

Production best practices

Configuration management

Use environment variables for secrets, e.g. api_key: "${PINECONE_API_KEY}" (see the expansion sketch after this list)
Keep configs versioned with code
Use separate configs for dev/staging/prod environments
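
One way to honor ${VAR} placeholders at load time; a sketch, not necessarily how ConfigLoader implements it:
import os
import re

def expand_env_placeholders(value: str) -> str:
    # Replace ${VAR} with its environment value; leave the placeholder
    # intact when the variable is unset so the failure surfaces clearly.
    return re.sub(
        r"\$\{(\w+)\}",
        lambda m: os.environ.get(m.group(1), m.group(0)),
        value,
    )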

Error handling

All components have fallback behavior (e.g., compression returns the original context on failure; see the sketch after this list)
Log failures at ERROR level with context
Return partial results when possible rather than failing entirely
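
In code, the fallback pattern looks like this; the compressor component and its run signature are hypothetical stand-ins:
import logging

logger = logging.getLogger(__name__)

def compress_with_fallback(compressor, documents, query):
    """Return compressed context, or the original documents on failure (sketch)."""
    try:
        return compressor.run(documents=documents, query=query)["documents"]
    except Exception:
        # Log at ERROR level with context, then degrade gracefully.
        logger.exception("Compression failed for query %r; returning original context", query)
        return documents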

Performance optimization

Warm up embedders during initialization to avoid cold-start latency (sketch after this list)
Over-fetch during retrieval (2x top_k) to allow for diversification and filtering
Batch document embedding with configurable batch_size
Use appropriate device setting ("cuda" for GPU, "cpu" for CPU)
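
Warm-up can be a single dummy call or an explicit warm_up() at construction time; a sketch, guarded because not every embedder exposes the method:
embedder = EmbedderFactory.create_text_embedder(config)
if hasattr(embedder, "warm_up"):
    embedder.warm_up()  # load model weights now instead of on the first query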

Monitoring

Log compression ratios for context compression
Log retrieval counts at each pipeline stage
Track quality scores in agentic pipelines
Set LOG_LEVEL=DEBUG to see detailed prompt/response content

Common patterns

Multi-stage retrieval

  1. Fast first-pass retrieval (semantic or hybrid)
  2. Reranking with cross-encoder
  3. Diversification or MMR
  4. Final top-k selection
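
Chained together, the four stages look like this; reranker and mmr_diversify are hypothetical stand-ins for a cross-encoder reranker and an MMR step:
def multi_stage_retrieve(db, embedder, reranker, mmr_diversify, query, top_k=10):
    """Sketch of the multi-stage chain; helper APIs are illustrative."""
    # 1. Fast first pass: over-fetch candidates from the vector database
    query_embedding = embedder.run(text=query)["embedding"]
    candidates = db.search(query_embedding=query_embedding, top_k=top_k * 4)
    # 2. Rerank with a cross-encoder for precision
    reranked = reranker.rerank(query=query, documents=candidates)
    # 3. Diversify (e.g. MMR) to reduce near-duplicate passages
    diversified = mmr_diversify(reranked, top_k=top_k * 2)
    # 4. Final top-k selection
    return diversified[:top_k]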

Cost optimization

  1. Smaller candidate pool (lower top_k)
  2. Context compression before generation
  3. Cheaper model tiers for non-critical stages

Quality improvement

  1. Start with semantic search baseline
  2. Add hybrid search for keyword coverage
  3. Add reranking for precision
  4. Add query enhancement for recall
  5. Add agentic loop for complex queries

Next steps

Semantic search

See the standard pipeline pattern in action

Hybrid search

Learn about dual retrieval and fusion patterns

Components

Explore individual pipeline components
