Overview
Memory and retrieval in Fenic follows the Hydrate → Shape → Serve → Operate lifecycle:
- Hydrate: Load sources (PDFs, docs, conversations)
- Shape: Extract, embed, chunk, filter
- Serve: Expose as MCP tools or Python functions
- Operate: Version with snapshots, rollback instantly
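Each phase maps onto a handful of fenic calls. A minimal sketch of the full loop, assuming a configured session like the one created under Facts-Based Memory below and a local docs/ folder:
# Hydrate: load raw sources
docs = session.read.pdf_metadata("docs/*.pdf")
# Shape: parse into text (chunking, extraction, and embedding follow the patterns below)
shaped = docs.select(
    fc.col("file_path").alias("source"),
    fc.semantic.parse_pdf(fc.col("file_path")).alias("content")
)
# Serve: persist as a table that MCP tools or Python functions query
shaped.write.save_as_table("docs", mode="overwrite")
# Operate: snapshot before changes so rollback is a table copy (dated name is illustrative)
shaped.write.save_as_table("docs_2025_01_15", mode="overwrite")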
Memory Patterns
Facts-Based Memory
Extract and recall structured facts from conversations.
import fenic as fc
from pydantic import BaseModel, Field
class Preference(BaseModel):
category: str = Field(description="The category of the preference")
value: str = Field(description="The value of the preference")
session = fc.Session.get_or_create(fc.SessionConfig(
app_name="mem_facts",
semantic=fc.SemanticConfig(
language_models={"gpt": fc.OpenAILanguageModel(model_name="gpt-4o-mini", rpm=100, tpm=100_000)},
embedding_models={"embed": fc.OpenAIEmbeddingModel(model_name="text-embedding-3-small", rpm=100, tpm=100_000)},
default_embedding_model="embed"
)
))
# Extract facts from messages
msgs = session.create_dataframe([
{"user_id": "user123", "message": "I'm vegetarian and allergic to nuts."},
{"user_id": "user123", "message": "I prefer morning meetings."},
])
prefs = (
msgs.select(
fc.col("user_id"),
fc.semantic.extract(fc.col("message"), Preference).alias("pref"),
fc.semantic.embed(fc.col("message")).alias("vec")
)
.unnest("pref")
)
prefs.write.save_as_table("preferences", mode="overwrite")
Semantic Recall
async def mem_recall(user_id: str, query: str, k: int = 3):
"""Retrieve relevant memories for a user."""
user_prefs = session.table("preferences").filter(
fc.col("user_id") == fc.lit(user_id)
)
q = session.create_dataframe([{"q": query}])
res = q.semantic.sim_join(
user_prefs,
left_on=fc.semantic.embed(fc.col("q")),
right_on=fc.col("vec"),
k=k,
similarity_score_column="relevance"
).select("category", "value", "relevance")
return res._plan
# Use in agent
results = await mem_recall("user123", "food preferences")
Blocks & Episodes
Maintain persistent profiles with recent event timelines.
from datetime import datetime
from typing import Optional
from pydantic import BaseModel, Field
class MemoryBlock(BaseModel):
block_name: str = Field(description="Name of the memory block")
content: str = Field(description="Content stored in the memory block")
last_updated: str = Field(description="Timestamp of last update")
class AccountEvent(BaseModel):
event_type: str = Field(description="Type of account event")
amount: Optional[float] = Field(default=None, description="Amount involved")
status: Optional[str] = Field(default=None, description="Status of event")
description: Optional[str] = Field(default=None, description="Description")
# Persistent profile block
blocks = session.create_dataframe([{
"user_id": "user123",
"block_name": "profile",
"content": "Name: Taylor; Dept: Finance",
"last_updated": datetime.now().isoformat()
}])
blocks.write.save_as_table("memory_blocks", mode="overwrite")
# Recent event timeline
events = session.create_dataframe([
{"user_id": "user123", "event": "Failed transaction of $99.99", "timestamp": "2025-01-01"},
{"user_id": "user123", "event": "Card expired", "timestamp": "2025-01-05"},
{"user_id": "user123", "event": "Account suspended", "timestamp": "2025-01-06"},
])
timeline = (
events.select(
fc.col("user_id"),
fc.col("timestamp"),
fc.semantic.extract(fc.col("event"), AccountEvent).alias("data")
)
.unnest("data")
)
timeline.write.save_as_table("account_timeline", mode="overwrite")
Retrieve Context Snapshot
async def get_user_context(user_id: str, last_n: int = 3):
"""Get profile + recent events."""
profile = (
session.table("memory_blocks")
.filter(
(fc.col("user_id") == fc.lit(user_id)) &
(fc.col("block_name") == fc.lit("profile"))
)
.select("block_name", "content", "last_updated")
)
recent = (
session.table("account_timeline")
.filter(fc.col("user_id") == fc.lit(user_id))
.sort(fc.col("timestamp").desc())
.limit(last_n)
.select("timestamp", "event_type", "status", "amount", "description")
)
return {"profile": profile._plan, "recent_events": recent._plan}
Decaying Resolution Memory
Aggregate memories with temporal compression.
from datetime import date, timedelta
# Daily summaries
today = date.today()
daily = (
session.table("events")
.filter(fc.col("timestamp") >= fc.lit(today))
.group_by("user_id")
.agg(
fc.semantic.reduce(
"Summarize today's events in 2-3 sentences",
fc.col("event_text"),
order_by=[fc.col("timestamp")]
).alias("daily_summary")
)
)
# Weekly rollup
week_start = today - timedelta(days=today.weekday())
weekly = (
session.table("daily_summaries")
.filter(fc.col("date") >= fc.lit(week_start))
.group_by("user_id")
.agg(
fc.semantic.reduce(
"Summarize this week's key events",
fc.col("daily_summary")
).alias("weekly_summary")
)
)
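The same pattern extends to coarser tiers as memories age. A sketch of a monthly rollup, assuming the weekly summaries above are persisted to a weekly_summaries table with a week_start column (both names are hypothetical):
month_start = today.replace(day=1)
monthly = (
    session.table("weekly_summaries")  # hypothetical table of persisted weekly rollups
    .filter(fc.col("week_start") >= fc.lit(month_start))
    .group_by("user_id")
    .agg(
        fc.semantic.reduce(
            "Summarize this month's key events",
            fc.col("weekly_summary")
        ).alias("monthly_summary")
    )
)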
Retrieval Patterns
Schema-First Retrieval
Extract typed Q&A pairs from documents for precise retrieval.from pydantic import BaseModel, Field
class QAPair(BaseModel):
question: str = Field(description="A question extracted from the document")
answer: str = Field(description="The answer to the question")
session = fc.Session.get_or_create(fc.SessionConfig(
app_name="policy_qa",
semantic=fc.SemanticConfig(
language_models={"gpt": fc.OpenAILanguageModel(model_name="gpt-4o-mini", rpm=100, tpm=100_000)},
embedding_models={"embed": fc.OpenAIEmbeddingModel(model_name="text-embedding-3-small", rpm=100, tpm=100_000)},
default_embedding_model="embed"
)
))
# Extract Q&A from PDFs
qa_pairs = (
session.read.pdf_metadata("policies/*.pdf")
.select(
fc.col("file_path").alias("source"),
fc.semantic.parse_pdf(fc.col("file_path")).alias("content")
)
.select(
fc.col("source"),
fc.semantic.extract(
fc.col("content").cast(fc.StringType),
QAPair
).alias("qa")
)
.unnest("qa")
.select(
"source",
"question",
"answer",
fc.semantic.embed(fc.col("question")).alias("embedding")
)
)
qa_pairs.write.save_as_table("policy_qa", mode="overwrite")
Query with Citations
async def qa_neighbors(query: str, k: int = 3):
"""Retrieve Q&A with citations."""
q = session.create_dataframe([{"q": query}])
res = q.semantic.sim_join(
session.table("policy_qa"),
left_on=fc.semantic.embed(fc.col("q")),
right_on=fc.col("embedding"),
k=k,
similarity_score_column="relevance"
).select("question", "answer", "source", "relevance")
return res._plan
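For example:
results = await qa_neighbors("What is the refund policy?", k=3)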
Semantic Spans (Chunked Retrieval)
Chunk documents with overlap for full-text retrieval.
# Chunk and embed documents
exploded = (
session.read.pdf_metadata("docs/**/*.pdf")
.select(
fc.col("file_path").alias("source"),
fc.semantic.parse_pdf(fc.col("file_path")).alias("content")
)
.select(
fc.col("source"),
fc.text.recursive_word_chunk(
fc.col("content").cast(fc.StringType),
chunk_size=500,
chunk_overlap_percentage=10
).alias("chunks")
)
.explode("chunks")
)
# Add IDs and embeddings
chunks = (
session.sql("SELECT ROW_NUMBER() OVER () as chunk_id, * FROM {df}", df=exploded)
.select(
"chunk_id",
"source",
fc.col("chunks").alias("text"),
fc.semantic.embed(fc.col("chunks")).alias("embedding")
)
)
chunks.write.save_as_table("chunks", mode="overwrite")
Retrieve Chunks
async def docs_neighbors(query: str, k: int = 5):
"""Semantic chunk retrieval."""
q = session.create_dataframe([{"q": query}])
res = q.semantic.sim_join(
session.table("chunks"),
left_on=fc.semantic.embed(fc.col("q")),
right_on=fc.col("embedding"),
k=k,
similarity_score_column="relevance"
).select("chunk_id", "source", "text", "relevance")
return res._plan
Hybrid Filtering
Combine semantic and deterministic filters.
# Filter by metadata + semantic similarity
q = session.create_dataframe([{"q": "machine learning"}])
results = (
q.semantic.sim_join(
session.table("documents")
.filter(fc.col("year") >= 2020) # Deterministic filter
.filter(fc.col("category") == "research"), # Deterministic filter
left_on=fc.semantic.embed(fc.col("q")),
right_on=fc.col("embedding"),
k=10,
similarity_score_column="score"
)
.filter(fc.col("score") > 0.7) # Semantic threshold
)
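The 0.7 cutoff is a starting point rather than a universal constant: similarity score distributions vary by embedding model, so inspect scores on sample queries before fixing a threshold.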
Clustering
Group similar items using K-means clustering.
# Cluster feedback by theme
feedback_df = session.create_dataframe([
{"id": 1, "feedback": "The checkout process is too slow"},
{"id": 2, "feedback": "Payment failed multiple times"},
{"id": 3, "feedback": "Love the new dark mode feature"},
{"id": 4, "feedback": "Site takes forever to load"},
{"id": 5, "feedback": "The UI redesign looks amazing"},
])
# Embed and cluster
clustered = (
feedback_df.select(
"id",
"feedback",
fc.semantic.embed(fc.col("feedback")).alias("embedding")
)
.semantic.with_cluster_labels(
by=fc.col("embedding"),
num_clusters=2,
label_column="theme_id",
centroid_column="centroid"
)
)
# Analyze clusters
themes = (
clustered.group_by("theme_id").agg(
fc.count("*").alias("count"),
fc.semantic.reduce(
"Describe the common theme in 1-2 sentences",
fc.col("feedback")
).alias("theme_description")
)
)
themes.show()
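num_clusters=2 fits this toy dataset; with real feedback, try a few cluster counts and read the generated theme descriptions to find a split that is neither too coarse nor too fragmented.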
Serving Tools
MCP Server
Expose retrieval as Model Context Protocol tools.
from fenic.api.mcp._tool_generation_utils import auto_generate_system_tools_from_tables
from fenic.core.mcp._server import FenicMCPServer
from fenic.core.mcp.types import SystemTool
# Auto-generate CRUD tools
generated_tools = auto_generate_system_tools_from_tables(
["policy_qa", "chunks"],
session,
tool_namespace="docs",
max_result_limit=100
)
# Custom retrieval tool
async def semantic_search(query: str, k: int = 5):
q = session.create_dataframe([{"q": query}])
res = q.semantic.sim_join(
session.table("chunks"),
left_on=fc.semantic.embed(fc.col("q")),
right_on=fc.col("embedding"),
k=k,
similarity_score_column="score"
).select("source", "text", "score")
return res._plan
# Create server
server = FenicMCPServer(
session._session_state,
user_defined_tools=[],
system_tools=[
SystemTool(
name="semantic_search",
description="Search documents semantically",
max_result_limit=50,
func=semantic_search
),
*generated_tools
],
server_name="Docs Server"
)
Direct Python Functions
def retrieve_context(user_query: str, max_results: int = 5) -> list[dict]:
"""Direct Python function for retrieval."""
q = session.create_dataframe([{"q": user_query}])
results = q.semantic.sim_join(
session.table("policy_qa"),
left_on=fc.semantic.embed(fc.col("q")),
right_on=fc.col("embedding"),
k=max_results,
similarity_score_column="score"
)
return results.to_polars().to_dicts()
# Use in agent framework
context = retrieve_context("refund policy", max_results=3)
State Management
Versioning
# Version your context with dated tables
today = date.today().strftime("%Y_%m_%d")  # underscores keep table names SQL-friendly
qa_pairs.write.save_as_table(f"policy_qa_{today}", mode="overwrite")
# Point prod to current version
qa_pairs.write.save_as_table("policy_qa_prod", mode="overwrite")
# Rollback by copying old version
old_version = session.table("policy_qa_2025_01_15")
old_version.write.save_as_table("policy_qa_prod", mode="overwrite")
Incremental Updates
# Append new documents
new_docs = session.read.pdf_metadata("new_policies/*.pdf")
new_qa = process_to_qa(new_docs) # Same pipeline
new_qa.write.save_as_table("policy_qa", mode="append")
# Deduplicate
deduped = (
session.table("policy_qa")
.select("*")
.distinct() # Remove exact duplicates
)
deduped.write.save_as_table("policy_qa", mode="overwrite")
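process_to_qa above stands in for the Schema-First pipeline; a hypothetical version might look like:
def process_to_qa(docs_df):
    """Hypothetical helper: the same parse -> extract -> unnest -> embed steps as Schema-First Retrieval."""
    return (
        docs_df
        .select(
            fc.col("file_path").alias("source"),
            fc.semantic.parse_pdf(fc.col("file_path")).alias("content")
        )
        .select(
            fc.col("source"),
            fc.semantic.extract(fc.col("content").cast(fc.StringType), QAPair).alias("qa")
        )
        .unnest("qa")
        .select(
            "source", "question", "answer",
            fc.semantic.embed(fc.col("question")).alias("embedding")
        )
    )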
Token Budget Awareness
Track and manage costs throughout pipelines.
# Execute and get metrics
metrics = qa_pairs.write.save_as_table("policy_qa", mode="overwrite")
print(f"Total cost: ${metrics.total_lm_metrics.cost:.4f}")
print(f"Input tokens: {metrics.total_lm_metrics.num_uncached_input_tokens}")
print(f"Output tokens: {metrics.total_lm_metrics.num_output_tokens}")
# Tool response shaping
generated_tools = auto_generate_system_tools_from_tables(
["policy_qa"],
session,
tool_namespace="qa",
max_result_limit=50 # Cap response size
)
Best Practices
- Pre-embed Everything: Embed documents once during ingestion, not at query time.
- Version Before Breaking Changes: Always create dated snapshots before schema changes; a helper sketch follows this list.
- Choose k Wisely: Start with k=5 for retrieval. Increase if recall is low, decrease if precision suffers.
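A small helper makes the snapshot-then-promote step hard to skip; a sketch, assuming the {name}_{date} / {name}_prod naming used under State Management:
from datetime import date

def promote(df, name: str) -> str:
    """Write a dated snapshot first, then point the prod table at the new data."""
    snapshot = f"{name}_{date.today().strftime('%Y_%m_%d')}"
    df.write.save_as_table(snapshot, mode="overwrite")
    df.write.save_as_table(f"{name}_prod", mode="overwrite")
    return snapshot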
Chunking Strategy
# Good: Overlap for context preservation
fc.text.recursive_word_chunk(
fc.col("text"),
chunk_size=500,
chunk_overlap_percentage=10 # 50 words overlap
)
# Bad: No overlap
fc.text.word_chunk(
fc.col("text"),
chunk_size=500,
chunk_overlap_percentage=0
)
Metadata Filtering
# Good: Filter before similarity join
filtered = (
session.table("documents")
.filter(fc.col("department") == "legal")
.filter(fc.col("year") >= 2023)
)
q.semantic.sim_join(
filtered,
left_on=fc.semantic.embed(fc.col("q")),
right_on=fc.col("embedding"),
k=10
)
# Bad: Filter after (wastes similarity computation)
q.semantic.sim_join(
session.table("documents"),
left_on=fc.semantic.embed(fc.col("q")),
right_on=fc.col("embedding"),
k=100
).filter(fc.col("department") == "legal")
Next Steps
- Batch Inference: Configure rate limiting and batching for production
- Error Handling: Handle failures, retries, and timeouts
