Objectives
By the end of this lab you will be able to:
- Integrate Azure OpenAI embedding models for text vectorization
- Implement pgvector for efficient similarity search operations
- Build semantic search tools for natural language product queries
- Create hybrid search combining traditional keyword and vector search
- Optimize vector queries for production performance
- Design recommendation systems using embedding similarity
Prerequisites
- Completed Lab 6: Tool Development
- Azure OpenAI service deployed with the text-embedding-3-small model
- product_embeddings table created (from Lab 4)
How semantic search works
User Query: "comfortable running shoes for rainy weather"
│
▼
Azure OpenAI text-embedding-3-small
│ 1536-dimensional vector
▼
pgvector cosine similarity search
embedding <=> query_vector
│
▼
Ranked results:
1. Nike Air Zoom Trail (0.89)
2. Adidas Terrex Gore-Tex (0.85)
3. Salomon Speedcross (0.82)
Semantic search finds products that are conceptually similar to the query, even when the exact words don’t appear in product descriptions. This enables queries like “waterproof outdoor footwear” to match products described as “rain-resistant hiking shoes.”
Step 1: Embedding manager
# mcp_server/embeddings/embedding_manager.py
import asyncio
import hashlib
from typing import List, Optional
from datetime import datetime, timedelta
from azure.ai.projects.aio import AIProjectClient
from azure.identity.aio import DefaultAzureCredential
from azure.core.exceptions import HttpResponseError
import logging
logger = logging.getLogger(__name__)
class EmbeddingManager:
    """Manage text embeddings for semantic search.

    Wraps the Azure AI Projects embeddings API with:
      * lazy client initialization (with a startup validation call),
      * an in-memory cache with a 24-hour TTL keyed on model + text hash,
      * batched generation with simple inter-batch rate limiting.
    """

    def __init__(self, project_endpoint: str, deployment_name: str = "text-embedding-3-small"):
        self.project_endpoint = project_endpoint
        self.deployment_name = deployment_name
        self.credential = DefaultAzureCredential()
        self.client = None
        # text-embedding-3-small produces 1536-dimensional vectors.
        self.embedding_dimension = 1536
        # Max texts sent per embeddings API call.
        self.batch_size = 100
        # In-memory embedding cache (24-hour TTL).
        self.embedding_cache = {}
        self.cache_ttl = timedelta(hours=24)

    async def initialize(self):
        """Create the Azure client and validate the deployment on startup.

        Raises:
            ValueError: if the deployment returns vectors of an unexpected size.
        """
        self.client = AIProjectClient(
            endpoint=self.project_endpoint,
            credential=self.credential
        )
        # Validate connection and embedding dimension with a throwaway request.
        test = await self.generate_embedding("test connection")
        if len(test) != self.embedding_dimension:
            raise ValueError(f"Unexpected embedding dimension: {len(test)}")
        logger.info("Embedding manager initialized")

    async def generate_embedding(self, text: str, use_cache: bool = True) -> List[float]:
        """Generate an embedding for a single text string.

        Args:
            text: Non-empty text to embed; surrounding whitespace is stripped.
            use_cache: Read from / write to the in-memory TTL cache.

        Raises:
            ValueError: if ``text`` is empty or whitespace-only.
            Exception: if the embeddings API call fails (original HTTP error
                is chained as the cause).
        """
        if not text or not text.strip():
            raise ValueError("Text cannot be empty")
        cache_key = self._get_cache_key(text)
        if use_cache:
            cached = self._get_cached_embedding(cache_key)
            if cached is not None:
                return cached
        if not self.client:
            await self.initialize()
        try:
            response = await self.client.embeddings.create(
                model=self.deployment_name,
                input=text.strip()
            )
            embedding = response.data[0].embedding
            if use_cache:
                self._cache_embedding(cache_key, embedding)
            return embedding
        except HttpResponseError as e:
            # Chain the original HTTP error so callers can inspect the cause.
            raise Exception(f"Embedding generation failed: {e}") from e

    async def generate_embeddings_batch(
        self, texts: List[str], use_cache: bool = True
    ) -> List[List[float]]:
        """Generate embeddings for multiple texts with batching and caching.

        Empty/whitespace-only entries map to ``[]`` so the result stays
        index-aligned with ``texts``.
        """
        if not texts:
            return []
        embeddings = [None] * len(texts)
        cache_misses = []
        cache_miss_indices = []
        for i, text in enumerate(texts):
            if not text or not text.strip():
                embeddings[i] = []  # placeholder keeps output aligned with input
                continue
            if use_cache:
                cached = self._get_cached_embedding(self._get_cache_key(text))
                if cached is not None:
                    embeddings[i] = cached
                    continue
            cache_misses.append(text.strip())
            cache_miss_indices.append(i)
        if cache_misses:
            # BUG FIX: lazily initialize the client here too. Previously only
            # generate_embedding() did this, so a cold call to the batch path
            # crashed with AttributeError on self.client (None).
            if not self.client:
                await self.initialize()
            for batch_start in range(0, len(cache_misses), self.batch_size):
                batch = cache_misses[batch_start:batch_start + self.batch_size]
                response = await self.client.embeddings.create(
                    model=self.deployment_name, input=batch
                )
                for j, emb_data in enumerate(response.data):
                    idx = cache_miss_indices[batch_start + j]
                    embeddings[idx] = emb_data.embedding
                    if use_cache:
                        self._cache_embedding(self._get_cache_key(batch[j]), emb_data.embedding)
                # Small pause between batches as crude rate limiting.
                if batch_start + self.batch_size < len(cache_misses):
                    await asyncio.sleep(0.1)
        return embeddings

    def _get_cache_key(self, text: str) -> str:
        """Stable cache key: SHA-256 over deployment name + stripped text."""
        return hashlib.sha256(f"{self.deployment_name}:{text.strip()}".encode()).hexdigest()

    def _get_cached_embedding(self, cache_key: str) -> Optional[List[float]]:
        """Return a cached embedding if present and fresh; evict stale entries."""
        if cache_key in self.embedding_cache:
            entry = self.embedding_cache[cache_key]
            if datetime.now() - entry['timestamp'] < self.cache_ttl:
                return entry['embedding']
            del self.embedding_cache[cache_key]
        return None

    def _cache_embedding(self, cache_key: str, embedding: List[float]):
        """Store an embedding; evict the 1000 oldest entries past 10k items."""
        if len(self.embedding_cache) > 10000:
            oldest = sorted(self.embedding_cache.keys(),
                            key=lambda k: self.embedding_cache[k]['timestamp'])[:1000]
            for key in oldest:
                del self.embedding_cache[key]
        self.embedding_cache[cache_key] = {'embedding': embedding, 'timestamp': datetime.now()}

    async def cleanup(self):
        """Close the underlying Azure client if it was created."""
        if self.client:
            await self.client.close()
Step 2: Product embedding pipeline
# mcp_server/embeddings/product_embedder.py
import asyncpg
from typing import List, Dict, Any
import logging
logger = logging.getLogger(__name__)
class ProductEmbedder:
    """Generate and manage product embeddings for semantic search."""

    def __init__(self, db_provider, embedding_manager):
        self.db_provider = db_provider
        self.embedding_manager = embedding_manager
        # Template combining the searchable product fields into one string.
        self.text_template = "{product_name} {brand} {description} {category} {tags}"

    async def generate_product_embeddings(
        self, store_id: str, batch_size: int = 50, force_regenerate: bool = False
    ) -> Dict[str, Any]:
        """Generate embeddings for products in a store.

        By default only active products with a missing or stale embedding are
        processed. Pass ``force_regenerate=True`` to re-embed every active
        product.

        BUG FIX: ``force_regenerate`` was previously accepted but ignored;
        it now disables the freshness filter.

        Returns:
            Dict with ``success``, ``processed_count`` and related metadata.
        """
        # The filter is a fixed internal string selected by a boolean flag —
        # no user input is interpolated, so f-string composition is safe here.
        freshness_filter = (
            "" if force_regenerate
            else "AND (pe.product_id IS NULL OR pe.updated_at < p.updated_at)"
        )
        async with self.db_provider.get_connection() as conn:
            await conn.execute("SELECT retail.set_store_context($1)", store_id)
            products_query = f"""
                SELECT
                    p.product_id,
                    p.product_name,
                    p.product_description,
                    p.brand,
                    pc.category_name,
                    array_to_string(p.tags, ' ') as tags_text
                FROM retail.products p
                LEFT JOIN retail.product_categories pc ON p.category_id = pc.category_id
                LEFT JOIN retail.product_embeddings pe ON p.product_id = pe.product_id
                WHERE p.is_active = TRUE
                {freshness_filter}
                ORDER BY p.created_at DESC
            """
            products = await conn.fetch(products_query)
            if not products:
                return {'success': True, 'message': 'No products need embedding generation',
                        'processed_count': 0}
            logger.info(f"Generating embeddings for {len(products)} products in {store_id}")
            processed_count = 0
            for i in range(0, len(products), batch_size):
                batch = products[i:i + batch_size]
                await self._process_product_batch(conn, batch, store_id)
                processed_count += len(batch)
                logger.info(f"Processed {processed_count}/{len(products)}")
            return {'success': True, 'processed_count': processed_count, 'store_id': store_id}

    def _create_product_text(self, product: Dict[str, Any]) -> str:
        """Combine product fields into a single whitespace-normalized string."""
        return ' '.join(self.text_template.format(
            product_name=product.get('product_name') or '',
            brand=product.get('brand') or '',
            description=product.get('product_description') or '',
            category=product.get('category_name') or '',
            tags=product.get('tags_text') or ''
        ).split())

    async def _process_product_batch(self, conn, products, store_id: str):
        """Embed one batch of product rows and upsert into product_embeddings."""
        texts = [self._create_product_text(dict(p)) for p in products]
        product_ids = [p['product_id'] for p in products]
        embeddings = await self.embedding_manager.generate_embeddings_batch(texts)
        for product_id, text, embedding in zip(product_ids, texts, embeddings):
            if embedding:
                # pgvector accepts the bracketed text literal form "[x,y,...]".
                embedding_vector = f"[{','.join(map(str, embedding))}]"
                await conn.execute("""
                    INSERT INTO retail.product_embeddings
                        (product_id, store_id, embedding_text, embedding, embedding_model)
                    VALUES ($1, $2, $3, $4, $5)
                    ON CONFLICT (product_id, embedding_model)
                    DO UPDATE SET
                        embedding_text = EXCLUDED.embedding_text,
                        embedding = EXCLUDED.embedding,
                        updated_at = CURRENT_TIMESTAMP
                """, product_id, store_id, text, embedding_vector,
                    self.embedding_manager.deployment_name)
Step 3: Semantic search tool
# mcp_server/tools/semantic_search.py
from typing import Dict, Any, List
from .base import DatabaseTool, ToolResult, ToolCategory
import logging
logger = logging.getLogger(__name__)
class SemanticProductSearchTool(DatabaseTool):
    """Semantic product search backed by pgvector cosine similarity."""

    def __init__(self, db_provider, embedding_manager):
        super().__init__("semantic_search_products",
                         "Search products using natural language queries with semantic understanding",
                         db_provider)
        self.category = ToolCategory.DATABASE_QUERY
        self.embedding_manager = embedding_manager

    async def execute(self, **kwargs) -> ToolResult:
        """Embed the query text, then rank products by cosine similarity."""
        search_text = kwargs.get('query')
        shop = kwargs.get('store_id')
        max_rows = kwargs.get('limit', 20)
        min_score = kwargs.get('similarity_threshold', 0.7)

        # Guard clauses: both query and store are mandatory.
        if not search_text:
            return ToolResult(success=False, error="Search query is required")
        if not shop:
            return ToolResult(success=False, error="store_id is required for semantic search")

        # Turn the query into a pgvector text literal: "[x1,x2,...]".
        vector = await self.embedding_manager.generate_embedding(search_text)
        vector_literal = "[" + ",".join(str(v) for v in vector) + "]"

        # <=> is pgvector's cosine-distance operator; similarity = 1 - distance.
        sql = """
            SELECT
                p.product_id,
                p.product_name,
                p.brand,
                p.price,
                p.product_description,
                p.current_stock,
                p.rating_average,
                p.rating_count,
                p.tags,
                pc.category_name,
                1 - (pe.embedding <=> $1::vector) as similarity_score
            FROM retail.product_embeddings pe
            JOIN retail.products p ON pe.product_id = p.product_id
            LEFT JOIN retail.product_categories pc ON p.category_id = pc.category_id
            WHERE pe.store_id = $2
              AND p.is_active = TRUE
              AND 1 - (pe.embedding <=> $1::vector) >= $3
            ORDER BY pe.embedding <=> $1::vector
            LIMIT $4
        """

        async with self.get_connection() as conn:
            await conn.execute("SELECT retail.set_store_context($1)", shop)
            rows = await conn.fetch(sql, vector_literal, shop, min_score, max_rows)

        return ToolResult(
            success=True,
            data=[dict(row) for row in rows],
            row_count=len(rows),
            metadata={'query': search_text, 'store_id': shop,
                      'similarity_threshold': min_score, 'search_type': 'semantic'}
        )

    def get_input_schema(self) -> Dict[str, Any]:
        """JSON schema accepted by execute()."""
        properties = {
            "query": {"type": "string", "minLength": 1, "maxLength": 500},
            "store_id": {"type": "string", "pattern": "^[a-zA-Z0-9_-]+$"},
            "limit": {"type": "integer", "minimum": 1, "maximum": 100, "default": 20},
            "similarity_threshold": {"type": "number", "minimum": 0.0, "maximum": 1.0, "default": 0.7},
        }
        return {
            "type": "object",
            "properties": properties,
            "required": ["query", "store_id"],
            "additionalProperties": False,
        }
Step 4: Hybrid search (keyword + semantic)
class HybridSearchTool(DatabaseTool):
    """Hybrid search combining keyword matching and semantic similarity."""

    def __init__(self, db_provider, embedding_manager):
        super().__init__("hybrid_product_search",
                         "Hybrid search combining keyword and semantic similarity for optimal results",
                         db_provider)
        # CONSISTENCY FIX: SemanticProductSearchTool sets a category; this tool
        # previously did not, leaving it unclassified in the tool registry.
        self.category = ToolCategory.DATABASE_QUERY
        self.embedding_manager = embedding_manager

    async def execute(self, **kwargs) -> ToolResult:
        """Run a weighted combination of full-text and vector search.

        combined_score = keyword_weight * ts_rank + semantic_weight * cosine
        similarity; products matching either signal are ranked together via
        a FULL OUTER JOIN.
        """
        query = kwargs.get('query')
        store_id = kwargs.get('store_id')
        limit = kwargs.get('limit', 20)
        semantic_weight = kwargs.get('semantic_weight', 0.7)
        keyword_weight = kwargs.get('keyword_weight', 0.3)
        if not query or not store_id:
            return ToolResult(success=False, error="query and store_id are required")
        query_embedding = await self.embedding_manager.generate_embedding(query)
        embedding_vector = f"[{','.join(map(str, query_embedding))}]"
        # $1 vector, $2 query text, $3 store, $4 keyword weight,
        # $5 semantic weight, $6 limit.
        hybrid_query = """
            WITH keyword_scores AS (
                SELECT p.product_id,
                       ts_rank(
                           to_tsvector('english',
                               p.product_name || ' ' || COALESCE(p.product_description, '') || ' ' ||
                               COALESCE(p.brand, '') || ' ' || COALESCE(array_to_string(p.tags, ' '), '')
                           ),
                           plainto_tsquery('english', $2)
                       ) as keyword_score
                FROM retail.products p
                WHERE p.is_active = TRUE AND p.store_id = $3
                  AND (
                      to_tsvector('english', p.product_name || ' ' || COALESCE(p.product_description,''))
                          @@ plainto_tsquery('english', $2)
                      OR p.product_name ILIKE '%' || $2 || '%'
                  )
            ),
            semantic_scores AS (
                SELECT pe.product_id,
                       1 - (pe.embedding <=> $1::vector) as semantic_score
                FROM retail.product_embeddings pe
                WHERE pe.store_id = $3
                  AND 1 - (pe.embedding <=> $1::vector) >= 0.5
            ),
            combined AS (
                SELECT
                    COALESCE(ks.product_id, ss.product_id) as product_id,
                    COALESCE(ks.keyword_score, 0) * $4 +
                    COALESCE(ss.semantic_score, 0) * $5 as combined_score
                FROM keyword_scores ks
                FULL OUTER JOIN semantic_scores ss ON ks.product_id = ss.product_id
                WHERE COALESCE(ks.keyword_score, 0) * $4 + COALESCE(ss.semantic_score, 0) * $5 > 0
            )
            SELECT p.product_id, p.product_name, p.brand, p.price,
                   p.product_description, p.current_stock,
                   p.rating_average, pc.category_name, c.combined_score
            FROM combined c
            JOIN retail.products p ON c.product_id = p.product_id
            LEFT JOIN retail.product_categories pc ON p.category_id = pc.category_id
            ORDER BY c.combined_score DESC
            LIMIT $6
        """
        async with self.get_connection() as conn:
            await conn.execute("SELECT retail.set_store_context($1)", store_id)
            results = await conn.fetch(hybrid_query, embedding_vector, query, store_id,
                                       keyword_weight, semantic_weight, limit)
        return ToolResult(
            success=True,
            data=[dict(r) for r in results],
            row_count=len(results),
            metadata={'query': query, 'semantic_weight': semantic_weight,
                      'keyword_weight': keyword_weight, 'search_type': 'hybrid'}
        )

    def get_input_schema(self) -> Dict[str, Any]:
        """JSON schema accepted by execute().

        CONSISTENCY FIX: previously missing, so clients could not introspect
        this tool's parameters the way they can for the semantic tool.
        """
        return {
            "type": "object",
            "properties": {
                "query": {"type": "string", "minLength": 1, "maxLength": 500},
                "store_id": {"type": "string", "pattern": "^[a-zA-Z0-9_-]+$"},
                "limit": {"type": "integer", "minimum": 1, "maximum": 100, "default": 20},
                "semantic_weight": {"type": "number", "minimum": 0.0, "maximum": 1.0, "default": 0.7},
                "keyword_weight": {"type": "number", "minimum": 0.0, "maximum": 1.0, "default": 0.3}
            },
            "required": ["query", "store_id"],
            "additionalProperties": False
        }
Performance optimization for vector search
-- Increase work_mem for vector operations
-- In postgresql.conf:
--   work_mem = '256MB'
--   max_parallel_workers_per_gather = 4

-- Optimized HNSW index parameters (m = 16 gives better recall)
CREATE INDEX CONCURRENTLY idx_product_embeddings_vector_optimized
ON retail.product_embeddings
USING hnsw (embedding vector_cosine_ops)
WITH (m = 16, ef_construction = 200);

-- BUG FIX: PostgreSQL rejects subqueries in a partial-index predicate
-- ("ERROR: cannot use subquery in index predicate"); the predicate may only
-- reference columns of the indexed table with immutable operators.
-- To restrict the index to specific stores, create one partial index per
-- active store with a constant predicate instead:
CREATE INDEX CONCURRENTLY idx_product_embeddings_store_001
ON retail.product_embeddings
USING hnsw (embedding vector_cosine_ops)
WHERE store_id = 'store_001';

-- Rebuild statistics after loading embeddings
ANALYZE retail.product_embeddings;
Monitor vector performance
-- Report index size, vector count, and buffer-cache hit ratio for the
-- product-embedding index.
CREATE OR REPLACE FUNCTION retail.analyze_vector_performance()
RETURNS TABLE (
    index_size TEXT,
    total_vectors BIGINT,
    cache_hit_ratio NUMERIC
) AS $$
BEGIN
    RETURN QUERY
    SELECT
        pg_size_pretty(pg_relation_size('idx_product_embeddings_vector')) as index_size,
        COUNT(*)::BIGINT as total_vectors,
        -- BUG FIX: block I/O statistics live in pg_statio_user_indexes
        -- (columns idx_blks_hit / idx_blks_read), not pg_stat_user_indexes.
        -- NULLIF guards against division by zero when the index has never
        -- been read (hit + read = 0) — the ratio is then NULL.
        (SELECT 100.0 * idx_blks_hit / NULLIF(idx_blks_hit + idx_blks_read, 0)
         FROM pg_statio_user_indexes
         WHERE indexrelname = 'idx_product_embeddings_vector') as cache_hit_ratio
    FROM retail.product_embeddings;
END;
$$ LANGUAGE plpgsql;
Key takeaways
- text-embedding-3-small produces 1536-dimensional vectors with good quality at low cost
- Batch embedding generation reduces API calls and respects rate limits
- In-memory cache with TTL avoids regenerating identical embeddings on every query
- HNSW index with m=16 provides fast approximate search with high recall
- Hybrid search combines full-text ranking (ts_rank) and cosine similarity for best results
- Similarity threshold 0.7 is a good starting point; tune based on your product catalog
Next: Lab 8 — Testing & Debugging
Build comprehensive unit tests, integration tests, performance tests, and debugging tools for your MCP server.