Skip to main content

Objectives

By the end of this lab you will be able to:
  • Integrate Azure OpenAI embedding models for text vectorization
  • Implement pgvector for efficient similarity search operations
  • Build semantic search tools for natural language product queries
  • Create hybrid search combining traditional keyword and vector search
  • Optimize vector queries for production performance
  • Design recommendation systems using embedding similarity

Prerequisites

  • Completed Lab 6: Tool Development
  • Azure OpenAI service deployed with text-embedding-3-small model
  • product_embeddings table created (from Lab 4)

How semantic search works

User Query: "comfortable running shoes for rainy weather"
        │
        ▼
Azure OpenAI text-embedding-3-small
        │  1536-dimensional vector
        ▼
pgvector cosine similarity search
  embedding <=> query_vector
        │
        ▼
Ranked results:
  1. Nike Air Zoom Trail (0.89)
  2. Adidas Terrex Gore-Tex (0.85)
  3. Salomon Speedcross (0.82)
Semantic search finds products that are conceptually similar to the query, even when the exact words don’t appear in product descriptions. This enables queries like “waterproof outdoor footwear” to match products described as “rain-resistant hiking shoes.”

Step 1: Embedding manager

# mcp_server/embeddings/embedding_manager.py
import asyncio
import hashlib
from typing import List, Optional
from datetime import datetime, timedelta
from azure.ai.projects.aio import AIProjectClient
from azure.identity.aio import DefaultAzureCredential
from azure.core.exceptions import HttpResponseError
import logging

logger = logging.getLogger(__name__)

class EmbeddingManager:
    """Manage text embeddings for semantic search.

    Wraps an Azure AI Projects embeddings deployment and adds:
      * lazy client initialization (safe for both single and batch calls),
      * an in-memory cache keyed by sha256("<model>:<text>") with a 24-hour TTL,
      * batched generation with a short inter-batch pause as crude rate limiting.
    """

    def __init__(self, project_endpoint: str, deployment_name: str = "text-embedding-3-small"):
        """
        Args:
            project_endpoint: Azure AI Foundry project endpoint URL.
            deployment_name: Name of the embedding model deployment.
        """
        self.project_endpoint = project_endpoint
        self.deployment_name = deployment_name
        self.credential = DefaultAzureCredential()
        self.client = None  # AIProjectClient, created lazily by initialize()
        self.embedding_dimension = 1536  # text-embedding-3-small output size
        self.batch_size = 100  # max inputs per embeddings API call

        # In-memory embedding cache (24-hour TTL)
        self.embedding_cache = {}
        self.cache_ttl = timedelta(hours=24)

    async def initialize(self):
        """Create the async client and validate the deployment with a probe call.

        Raises:
            ValueError: if the probe embedding has an unexpected dimension.
        """
        self.client = AIProjectClient(
            endpoint=self.project_endpoint,
            credential=self.credential
        )
        # Validate connection on startup
        test = await self.generate_embedding("test connection")
        if len(test) != self.embedding_dimension:
            raise ValueError(f"Unexpected embedding dimension: {len(test)}")
        logger.info("Embedding manager initialized")

    async def generate_embedding(self, text: str, use_cache: bool = True) -> List[float]:
        """Generate embedding for a single text string.

        Args:
            text: Non-empty text to embed (surrounding whitespace is stripped).
            use_cache: Read from / write to the in-memory TTL cache.

        Raises:
            ValueError: if *text* is empty or whitespace-only.
            RuntimeError: if the embeddings API call fails (HTTP cause chained).
        """
        if not text or not text.strip():
            raise ValueError("Text cannot be empty")

        cache_key = self._get_cache_key(text)
        if use_cache:
            cached = self._get_cached_embedding(cache_key)
            if cached is not None:
                return cached

        if not self.client:
            await self.initialize()

        try:
            response = await self.client.embeddings.create(
                model=self.deployment_name,
                input=text.strip()
            )
            embedding = response.data[0].embedding

            if use_cache:
                self._cache_embedding(cache_key, embedding)

            return embedding

        except HttpResponseError as e:
            # RuntimeError subclasses Exception, so callers catching the old
            # bare Exception keep working; `from e` preserves the HTTP cause.
            raise RuntimeError(f"Embedding generation failed: {e}") from e

    async def generate_embeddings_batch(
        self, texts: List[str], use_cache: bool = True
    ) -> List[List[float]]:
        """Generate embeddings for multiple texts with batching and caching.

        Empty or whitespace-only inputs map to an empty list so the result
        stays positionally aligned with *texts*.

        Raises:
            RuntimeError: if an embeddings API call fails (HTTP cause chained).
        """
        if not texts:
            return []

        embeddings = [None] * len(texts)
        cache_misses = []
        cache_miss_indices = []

        for i, text in enumerate(texts):
            if not text or not text.strip():
                embeddings[i] = []  # placeholder keeps positions aligned
                continue
            if use_cache:
                cached = self._get_cached_embedding(self._get_cache_key(text))
                if cached is not None:
                    embeddings[i] = cached
                    continue
            cache_misses.append(text.strip())
            cache_miss_indices.append(i)

        if cache_misses:
            # Bug fix: the original assumed initialize() had already run; a
            # batch call on a fresh manager crashed with AttributeError
            # because self.client was still None.
            if not self.client:
                await self.initialize()
            for batch_start in range(0, len(cache_misses), self.batch_size):
                batch = cache_misses[batch_start:batch_start + self.batch_size]
                try:
                    response = await self.client.embeddings.create(
                        model=self.deployment_name, input=batch
                    )
                except HttpResponseError as e:
                    raise RuntimeError(f"Embedding generation failed: {e}") from e
                for j, emb_data in enumerate(response.data):
                    idx = cache_miss_indices[batch_start + j]
                    embeddings[idx] = emb_data.embedding
                    if use_cache:
                        self._cache_embedding(self._get_cache_key(batch[j]), emb_data.embedding)
                if batch_start + self.batch_size < len(cache_misses):
                    await asyncio.sleep(0.1)  # Rate limiting

        return embeddings

    def _get_cache_key(self, text: str) -> str:
        """Cache key = sha256 of "<deployment>:<stripped text>" (model-scoped)."""
        return hashlib.sha256(f"{self.deployment_name}:{text.strip()}".encode()).hexdigest()

    def _get_cached_embedding(self, cache_key: str) -> Optional[List[float]]:
        """Return a cached embedding if present and fresh; expire it otherwise."""
        if cache_key in self.embedding_cache:
            entry = self.embedding_cache[cache_key]
            if datetime.now() - entry['timestamp'] < self.cache_ttl:
                return entry['embedding']
            del self.embedding_cache[cache_key]  # stale: drop eagerly
        return None

    def _cache_embedding(self, cache_key: str, embedding: List[float]):
        """Store an embedding, evicting the 1000 oldest entries past 10k items."""
        if len(self.embedding_cache) > 10000:
            # Evict oldest 1000 entries to bound memory use
            oldest = sorted(self.embedding_cache.keys(),
                            key=lambda k: self.embedding_cache[k]['timestamp'])[:1000]
            for key in oldest:
                del self.embedding_cache[key]
        self.embedding_cache[cache_key] = {'embedding': embedding, 'timestamp': datetime.now()}

    async def cleanup(self):
        """Close the async client if it was ever created."""
        if self.client:
            await self.client.close()

Step 2: Product embedding pipeline

# mcp_server/embeddings/product_embedder.py
import asyncpg
from typing import List, Dict, Any
import logging

logger = logging.getLogger(__name__)

class ProductEmbedder:
    """Generate and manage product embeddings for semantic search."""

    def __init__(self, db_provider, embedding_manager):
        self.db_provider = db_provider
        self.embedding_manager = embedding_manager
        # Field layout for the text that gets embedded per product.
        self.text_template = "{product_name} {brand} {description} {category} {tags}"

    async def generate_product_embeddings(
        self, store_id: str, batch_size: int = 50, force_regenerate: bool = False
    ) -> Dict[str, Any]:
        """Embed every product in *store_id* whose embedding is missing or stale."""
        async with self.db_provider.get_connection() as conn:
            await conn.execute("SELECT retail.set_store_context($1)", store_id)

            # Only fetch products without embeddings (or outdated ones)
            products_query = """
                SELECT
                    p.product_id,
                    p.product_name,
                    p.product_description,
                    p.brand,
                    pc.category_name,
                    array_to_string(p.tags, ' ') as tags_text
                FROM retail.products p
                LEFT JOIN retail.product_categories pc ON p.category_id = pc.category_id
                LEFT JOIN retail.product_embeddings pe ON p.product_id = pe.product_id
                WHERE p.is_active = TRUE
                  AND (pe.product_id IS NULL OR pe.updated_at < p.updated_at)
                ORDER BY p.created_at DESC
            """
            rows = await conn.fetch(products_query)

            if not rows:
                return {'success': True, 'message': 'No products need embedding generation',
                        'processed_count': 0}

            logger.info(f"Generating embeddings for {len(rows)} products in {store_id}")

            done = 0
            for offset in range(0, len(rows), batch_size):
                chunk = rows[offset:offset + batch_size]
                await self._process_product_batch(conn, chunk, store_id)
                done += len(chunk)
                logger.info(f"Processed {done}/{len(rows)}")

            return {'success': True, 'processed_count': done, 'store_id': store_id}

    def _create_product_text(self, product: Dict[str, Any]) -> str:
        """Combine product fields into one whitespace-normalized search string."""
        filled = self.text_template.format(
            product_name=product.get('product_name') or '',
            brand=product.get('brand') or '',
            description=product.get('product_description') or '',
            category=product.get('category_name') or '',
            tags=product.get('tags_text') or ''
        )
        # split()/join collapses the runs of spaces left by empty fields.
        return ' '.join(filled.split())

    async def _process_product_batch(self, conn, products, store_id: str):
        """Embed one batch of product rows and upsert them into the database."""
        ids = [row['product_id'] for row in products]
        docs = [self._create_product_text(dict(row)) for row in products]
        vectors = await self.embedding_manager.generate_embeddings_batch(docs)

        for pid, doc, vec in zip(ids, docs, vectors):
            if not vec:
                continue  # nothing to store for empty source text
            literal = f"[{','.join(map(str, vec))}]"
            await conn.execute("""
                    INSERT INTO retail.product_embeddings
                        (product_id, store_id, embedding_text, embedding, embedding_model)
                    VALUES ($1, $2, $3, $4, $5)
                    ON CONFLICT (product_id, embedding_model)
                    DO UPDATE SET
                        embedding_text = EXCLUDED.embedding_text,
                        embedding = EXCLUDED.embedding,
                        updated_at = CURRENT_TIMESTAMP
                """, pid, store_id, doc, literal,
                self.embedding_manager.deployment_name)

Step 3: Semantic search tool

# mcp_server/tools/semantic_search.py
from typing import Dict, Any, List
from .base import DatabaseTool, ToolResult, ToolCategory
import logging

logger = logging.getLogger(__name__)

class SemanticProductSearchTool(DatabaseTool):
    """Semantic search for products using vector similarity."""

    def __init__(self, db_provider, embedding_manager):
        super().__init__("semantic_search_products",
                         "Search products using natural language queries with semantic understanding",
                         db_provider)
        self.category = ToolCategory.DATABASE_QUERY
        self.embedding_manager = embedding_manager

    async def execute(self, **kwargs) -> ToolResult:
        """Embed the query text and rank products by cosine similarity."""
        search_text = kwargs.get('query')
        store_id = kwargs.get('store_id')
        max_rows = kwargs.get('limit', 20)
        min_similarity = kwargs.get('similarity_threshold', 0.7)

        # Guard clauses: both inputs are mandatory.
        if not search_text:
            return ToolResult(success=False, error="Search query is required")
        if not store_id:
            return ToolResult(success=False, error="store_id is required for semantic search")

        vector = await self.embedding_manager.generate_embedding(search_text)
        vector_literal = f"[{','.join(map(str, vector))}]"

        search_query = """
            SELECT
                p.product_id,
                p.product_name,
                p.brand,
                p.price,
                p.product_description,
                p.current_stock,
                p.rating_average,
                p.rating_count,
                p.tags,
                pc.category_name,
                1 - (pe.embedding <=> $1::vector) as similarity_score
            FROM retail.product_embeddings pe
            JOIN retail.products p ON pe.product_id = p.product_id
            LEFT JOIN retail.product_categories pc ON p.category_id = pc.category_id
            WHERE pe.store_id = $2
              AND p.is_active = TRUE
              AND 1 - (pe.embedding <=> $1::vector) >= $3
            ORDER BY pe.embedding <=> $1::vector
            LIMIT $4
        """

        async with self.get_connection() as conn:
            await conn.execute("SELECT retail.set_store_context($1)", store_id)
            rows = await conn.fetch(search_query, vector_literal, store_id,
                                    min_similarity, max_rows)

        return ToolResult(
            success=True,
            data=[dict(row) for row in rows],
            row_count=len(rows),
            metadata={'query': search_text, 'store_id': store_id,
                      'similarity_threshold': min_similarity, 'search_type': 'semantic'}
        )

    def get_input_schema(self) -> Dict[str, Any]:
        """JSON Schema describing the tool's accepted arguments."""
        schema: Dict[str, Any] = {
            "type": "object",
            "properties": {
                "query": {"type": "string", "minLength": 1, "maxLength": 500},
                "store_id": {"type": "string", "pattern": "^[a-zA-Z0-9_-]+$"},
                "limit": {"type": "integer", "minimum": 1, "maximum": 100, "default": 20},
                "similarity_threshold": {"type": "number", "minimum": 0.0, "maximum": 1.0, "default": 0.7},
            },
            "required": ["query", "store_id"],
            "additionalProperties": False,
        }
        return schema

Step 4: Hybrid search (keyword + semantic)

class HybridSearchTool(DatabaseTool):
    """Hybrid search combining keyword matching and semantic similarity.

    Keyword relevance comes from PostgreSQL full-text search (ts_rank, plus an
    ILIKE fallback on the product name); semantic relevance comes from pgvector
    cosine similarity. The two scores are blended with caller-supplied weights.
    """

    def __init__(self, db_provider, embedding_manager):
        super().__init__("hybrid_product_search",
                         "Hybrid search combining keyword and semantic similarity for optimal results",
                         db_provider)
        # Consistency fix: SemanticProductSearchTool categorizes itself as a
        # DATABASE_QUERY tool; this tool previously left category unset.
        self.category = ToolCategory.DATABASE_QUERY
        self.embedding_manager = embedding_manager

    async def execute(self, **kwargs) -> ToolResult:
        """Run the weighted keyword + semantic search.

        Recognized kwargs: query (str, required), store_id (str, required),
        limit (int, default 20), semantic_weight (float, default 0.7),
        keyword_weight (float, default 0.3).
        """
        query = kwargs.get('query')
        store_id = kwargs.get('store_id')
        limit = kwargs.get('limit', 20)
        semantic_weight = kwargs.get('semantic_weight', 0.7)
        keyword_weight = kwargs.get('keyword_weight', 0.3)

        if not query or not store_id:
            return ToolResult(success=False, error="query and store_id are required")

        query_embedding = await self.embedding_manager.generate_embedding(query)
        embedding_vector = f"[{','.join(map(str, query_embedding))}]"

        # keyword_scores: full-text rank; semantic_scores: cosine similarity
        # above a fixed 0.5 floor; combined: weighted sum over the full outer
        # join so a product matched by only one method still scores.
        hybrid_query = """
            WITH keyword_scores AS (
                SELECT p.product_id,
                    ts_rank(
                        to_tsvector('english',
                            p.product_name || ' ' || COALESCE(p.product_description, '') || ' ' ||
                            COALESCE(p.brand, '') || ' ' || COALESCE(array_to_string(p.tags, ' '), '')
                        ),
                        plainto_tsquery('english', $2)
                    ) as keyword_score
                FROM retail.products p
                WHERE p.is_active = TRUE AND p.store_id = $3
                  AND (
                    to_tsvector('english', p.product_name || ' ' || COALESCE(p.product_description,''))
                        @@ plainto_tsquery('english', $2)
                    OR p.product_name ILIKE '%' || $2 || '%'
                  )
            ),
            semantic_scores AS (
                SELECT pe.product_id,
                    1 - (pe.embedding <=> $1::vector) as semantic_score
                FROM retail.product_embeddings pe
                WHERE pe.store_id = $3
                  AND 1 - (pe.embedding <=> $1::vector) >= 0.5
            ),
            combined AS (
                SELECT
                    COALESCE(ks.product_id, ss.product_id) as product_id,
                    COALESCE(ks.keyword_score, 0) * $4 +
                    COALESCE(ss.semantic_score, 0) * $5 as combined_score
                FROM keyword_scores ks
                FULL OUTER JOIN semantic_scores ss ON ks.product_id = ss.product_id
                WHERE COALESCE(ks.keyword_score, 0) * $4 + COALESCE(ss.semantic_score, 0) * $5 > 0
            )
            SELECT p.product_id, p.product_name, p.brand, p.price,
                   p.product_description, p.current_stock,
                   p.rating_average, pc.category_name, c.combined_score
            FROM combined c
            JOIN retail.products p ON c.product_id = p.product_id
            LEFT JOIN retail.product_categories pc ON p.category_id = pc.category_id
            ORDER BY c.combined_score DESC
            LIMIT $6
        """

        async with self.get_connection() as conn:
            await conn.execute("SELECT retail.set_store_context($1)", store_id)
            results = await conn.fetch(hybrid_query, embedding_vector, query, store_id,
                                       keyword_weight, semantic_weight, limit)

        return ToolResult(
            success=True,
            data=[dict(r) for r in results],
            row_count=len(results),
            metadata={'query': query, 'semantic_weight': semantic_weight,
                      'keyword_weight': keyword_weight, 'search_type': 'hybrid'}
        )

    def get_input_schema(self) -> Dict[str, Any]:
        """JSON Schema for the tool's arguments (added for parity with
        SemanticProductSearchTool, which already exposed a schema)."""
        return {
            "type": "object",
            "properties": {
                "query": {"type": "string", "minLength": 1, "maxLength": 500},
                "store_id": {"type": "string", "pattern": "^[a-zA-Z0-9_-]+$"},
                "limit": {"type": "integer", "minimum": 1, "maximum": 100, "default": 20},
                "semantic_weight": {"type": "number", "minimum": 0.0, "maximum": 1.0, "default": 0.7},
                "keyword_weight": {"type": "number", "minimum": 0.0, "maximum": 1.0, "default": 0.3}
            },
            "required": ["query", "store_id"],
            "additionalProperties": False
        }
Step 5: Vector query performance tuning

-- Increase work_mem for vector operations
-- In postgresql.conf:
-- work_mem = '256MB'
-- max_parallel_workers_per_gather = 4

-- HNSW index for cosine-distance search. m = 16 is pgvector's default graph
-- degree; ef_construction = 200 (up from the default 64) builds a denser
-- graph for higher recall at the cost of slower index builds.
CREATE INDEX CONCURRENTLY idx_product_embeddings_vector_optimized
ON retail.product_embeddings
USING hnsw (embedding vector_cosine_ops)
WITH (m = 16, ef_construction = 200);

-- Partial index for active rows only (faster to build, smaller size).
-- BUG FIX: the original predicate used
--   WHERE store_id IN (SELECT store_id FROM retail.stores WHERE is_active = TRUE)
-- which PostgreSQL rejects outright ("ERROR: cannot use subquery in index
-- predicate") — a partial-index WHERE clause may reference only columns of
-- the indexed table with immutable expressions. Denormalize the flag first:
ALTER TABLE retail.product_embeddings
    ADD COLUMN IF NOT EXISTS is_active BOOLEAN NOT NULL DEFAULT TRUE;

CREATE INDEX CONCURRENTLY idx_product_embeddings_active
ON retail.product_embeddings
USING hnsw (embedding vector_cosine_ops)
WHERE is_active = TRUE;

-- Rebuild statistics after loading embeddings
ANALYZE retail.product_embeddings;

Monitor vector performance

-- Report index size, vector count, and buffer-cache hit ratio for the
-- HNSW index created above.
-- Fixes vs. the original:
--   * the index name now matches the one actually created in this lab
--     (idx_product_embeddings_vector_optimized; no idx_product_embeddings_vector
--     is defined here);
--   * blks_hit / blks_read are not columns of pg_stat_user_indexes — the I/O
--     counters live in pg_statio_user_indexes as idx_blks_hit / idx_blks_read;
--   * NULLIF guards against division by zero before the index has been read.
CREATE OR REPLACE FUNCTION retail.analyze_vector_performance()
RETURNS TABLE (
    index_size TEXT,
    total_vectors BIGINT,
    cache_hit_ratio NUMERIC
) AS $$
BEGIN
    RETURN QUERY
    SELECT
        pg_size_pretty(pg_relation_size('idx_product_embeddings_vector_optimized')) as index_size,
        COUNT(*)::BIGINT as total_vectors,
        (SELECT 100.0 * idx_blks_hit / NULLIF(idx_blks_hit + idx_blks_read, 0)
         FROM pg_statio_user_indexes
         WHERE indexrelname = 'idx_product_embeddings_vector_optimized') as cache_hit_ratio
    FROM retail.product_embeddings;
END;
$$ LANGUAGE plpgsql;

Key takeaways

  • text-embedding-3-small produces 1536-dimensional vectors with good quality at low cost
  • Batch embedding generation reduces API calls and respects rate limits
  • In-memory cache with TTL avoids regenerating identical embeddings on every query
  • HNSW index with m=16 provides fast approximate search with high recall
  • Hybrid search combines full-text ranking (ts_rank) and cosine similarity for best results
  • Similarity threshold 0.7 is a good starting point; tune based on your product catalog

Next: Lab 8 — Testing & Debugging

Build comprehensive unit tests, integration tests, performance tests, and debugging tools for your MCP server.

Build docs developers (and LLMs) love