Skip to main content
Fenic enables you to build sophisticated memory and retrieval systems that work with any agent framework. Extract facts, embed content, and serve precise results through bounded tools.

Overview

Memory and retrieval in Fenic follow the Hydrate → Shape → Serve → Operate lifecycle:
  1. Hydrate: Load sources (PDFs, docs, conversations)
  2. Shape: Extract, embed, chunk, filter
  3. Serve: Expose as MCP tools or Python functions
  4. Operate: Version with snapshots, roll back instantly

Memory Patterns

Facts-Based Memory

Extract and recall structured facts from conversations.
import fenic as fc
from pydantic import BaseModel, Field

# Schema passed to fc.semantic.extract for each message; the extracted struct
# is later unnested into `category` and `value` columns.
# Documented with `#` comments (not a class docstring) so no extra text is
# added to the pydantic model itself.
class Preference(BaseModel):
    # What kind of preference the message expresses.
    category: str = Field(description="The category of the preference")
    # The concrete preference stated for that category.
    value: str = Field(description="The value of the preference")

# Semantic configuration for the "mem_facts" app: one language model keyed
# "gpt", one embedding model keyed "embed", with "embed" as the default embedder.
facts_semantic_config = fc.SemanticConfig(
    language_models={
        "gpt": fc.OpenAILanguageModel(model_name="gpt-4o-mini", rpm=100, tpm=100_000),
    },
    embedding_models={
        "embed": fc.OpenAIEmbeddingModel(model_name="text-embedding-3-small", rpm=100, tpm=100_000),
    },
    default_embedding_model="embed",
)
facts_session_config = fc.SessionConfig(
    app_name="mem_facts",
    semantic=facts_semantic_config,
)
session = fc.Session.get_or_create(facts_session_config)

# Extract facts from messages: two sample utterances from the same user.
sample_messages = [
    {"user_id": "user123", "message": "I'm vegetarian and allergic to nuts."},
    {"user_id": "user123", "message": "I prefer morning meetings."},
]
msgs = session.create_dataframe(sample_messages)

# For every message keep the user id, extract a Preference struct ("pref"),
# and embed the raw message text ("vec") for later similarity search.
extracted_prefs = msgs.select(
    fc.col("user_id"),
    fc.semantic.extract(fc.col("message"), Preference).alias("pref"),
    fc.semantic.embed(fc.col("message")).alias("vec"),
)

# Unnest the "pref" struct, then persist the table.
prefs = extracted_prefs.unnest("pref")
prefs.write.save_as_table("preferences", mode="overwrite")

Semantic Recall

async def mem_recall(user_id: str, query: str, k: int = 3):
    """Retrieve relevant memories for a user.

    Restricts the "preferences" table to rows whose ``user_id`` matches, then
    similarity-joins a one-row query frame against the stored ``vec`` column.

    Args:
        user_id: Value compared with the ``user_id`` column.
        query: Free text that is embedded and matched against ``vec``.
        k: Forwarded to ``sim_join`` as ``k``.

    Returns:
        The ``_plan`` of the joined frame projected to ``category``,
        ``value`` and ``relevance``.
    """
    owned_rows = session.table("preferences").filter(
        fc.col("user_id") == fc.lit(user_id)
    )
    query_frame = session.create_dataframe([{"q": query}])

    matches = query_frame.semantic.sim_join(
        owned_rows,
        left_on=fc.semantic.embed(fc.col("q")),
        right_on=fc.col("vec"),
        k=k,
        similarity_score_column="relevance",
    )
    projected = matches.select("category", "value", "relevance")
    return projected._plan

# Use in agent
# NOTE: `await` requires an async context (inside a coroutine, or an
# async-aware REPL/notebook).
# mem_recall returns `res._plan`, so `results` holds that value rather than
# materialized rows — NOTE(review): confirm how the calling agent/tool runtime
# executes it.
results = await mem_recall("user123", "food preferences")

Blocks & Episodes

Maintain persistent profiles with recent event timelines.
from datetime import datetime
from typing import Optional
from pydantic import BaseModel, Field

# Shape of one persistent memory block (name, content, update time).
# The field names mirror the columns written to the "memory_blocks" table;
# the class itself is not referenced by the other snippets on this page.
class MemoryBlock(BaseModel):
    # Identifier of the block, e.g. "profile".
    block_name: str = Field(description="Name of the memory block")
    # Text payload stored under that name.
    content: str = Field(description="Content stored in the memory block")
    # Update time kept as a string.
    last_updated: str = Field(description="Timestamp of last update")

# Schema passed to fc.semantic.extract for each raw event string.
# Only `event_type` is required; the other fields default to None when the
# event text does not supply them.
class AccountEvent(BaseModel):
    # Required classification of the event.
    event_type: str = Field(description="Type of account event")
    # Optional amount mentioned in the event text.
    amount: Optional[float] = Field(default=None, description="Amount involved")
    # Optional status of the event.
    status: Optional[str] = Field(default=None, description="Status of event")
    # Optional free-text description.
    description: Optional[str] = Field(default=None, description="Description")

# Persistent profile block: a single row keyed by user and block name,
# stamped with the current local time as an ISO-8601 string.
profile_row = {
    "user_id": "user123",
    "block_name": "profile",
    "content": "Name: Taylor; Dept: Finance",
    "last_updated": datetime.now().isoformat(),
}
blocks = session.create_dataframe([profile_row])
blocks.write.save_as_table("memory_blocks", mode="overwrite")

# Recent event timeline: raw event text for one user, each with a date string.
events = session.create_dataframe(
    [
        {"user_id": "user123", "event": text, "timestamp": day}
        for text, day in (
            ("Failed transaction of $99.99", "2025-01-01"),
            ("Card expired", "2025-01-05"),
            ("Account suspended", "2025-01-06"),
        )
    ]
)

# Extract an AccountEvent struct ("data") from each event string, then unnest
# it so its fields sit alongside user_id and timestamp.
structured_events = events.select(
    fc.col("user_id"),
    fc.col("timestamp"),
    fc.semantic.extract(fc.col("event"), AccountEvent).alias("data"),
)
timeline = structured_events.unnest("data")
timeline.write.save_as_table("account_timeline", mode="overwrite")

Retrieve Context Snapshot

async def get_user_context(user_id: str, last_n: int = 3):
    """Get profile + recent events.

    Builds two queries: the user's "profile" block from "memory_blocks", and
    the user's rows from "account_timeline" sorted by ``timestamp``
    descending and limited to ``last_n``.

    Args:
        user_id: Value compared with the ``user_id`` column of both tables.
        last_n: Row limit applied to the sorted timeline.

    Returns:
        A dict with keys ``"profile"`` and ``"recent_events"``, each holding
        the ``_plan`` of the corresponding query.
    """
    is_user = fc.col("user_id") == fc.lit(user_id)
    is_profile_block = fc.col("block_name") == fc.lit("profile")
    profile_rows = session.table("memory_blocks").filter(is_user & is_profile_block)
    profile_view = profile_rows.select("block_name", "content", "last_updated")

    user_events = session.table("account_timeline").filter(
        fc.col("user_id") == fc.lit(user_id)
    )
    newest_first = user_events.sort(fc.col("timestamp").desc())
    latest = newest_first.limit(last_n).select(
        "timestamp", "event_type", "status", "amount", "description"
    )

    return {"profile": profile_view._plan, "recent_events": latest._plan}

Decaying Resolution Memory

Aggregate memories with temporal compression.
from datetime import date, timedelta

# Daily summaries: per user, reduce today's events (in timestamp order)
# into a short summary.
today = date.today()
todays_events = session.table("events").filter(
    fc.col("timestamp") >= fc.lit(today)
)
daily_summary_expr = fc.semantic.reduce(
    "Summarize today's events in 2-3 sentences",
    fc.col("event_text"),
    order_by=[fc.col("timestamp")],
).alias("daily_summary")
daily = todays_events.group_by("user_id").agg(daily_summary_expr)

# Weekly rollup: date.weekday() is 0 on Monday, so week_start is the Monday
# of the current week. Daily summaries from that day onward are reduced again.
week_start = today - timedelta(days=today.weekday())
this_weeks_summaries = session.table("daily_summaries").filter(
    fc.col("date") >= fc.lit(week_start)
)
weekly_summary_expr = fc.semantic.reduce(
    "Summarize this week's key events",
    fc.col("daily_summary"),
).alias("weekly_summary")
weekly = this_weeks_summaries.group_by("user_id").agg(weekly_summary_expr)

Retrieval Patterns

Schema-First Retrieval

Extract typed Q&A pairs from documents for precise retrieval.
from pydantic import BaseModel, Field

# Schema passed to fc.semantic.extract for parsed PDF content; the extracted
# struct is unnested into `question` and `answer` columns.
class QAPair(BaseModel):
    # Question text; this is the column that gets embedded for retrieval.
    question: str = Field(description="A question extracted from the document")
    # Answer paired with the question.
    answer: str = Field(description="The answer to the question")

# Semantic configuration for the "policy_qa" app: language model "gpt",
# embedding model "embed", with "embed" as the default embedder.
qa_semantic_config = fc.SemanticConfig(
    language_models={
        "gpt": fc.OpenAILanguageModel(model_name="gpt-4o-mini", rpm=100, tpm=100_000),
    },
    embedding_models={
        "embed": fc.OpenAIEmbeddingModel(model_name="text-embedding-3-small", rpm=100, tpm=100_000),
    },
    default_embedding_model="embed",
)
qa_session_config = fc.SessionConfig(app_name="policy_qa", semantic=qa_semantic_config)
session = fc.Session.get_or_create(qa_session_config)

# Extract Q&A from PDFs, one stage at a time.

# 1. List the policy PDFs and parse each file, keeping its path as "source".
policy_files = session.read.pdf_metadata("policies/*.pdf")
parsed_policies = policy_files.select(
    fc.col("file_path").alias("source"),
    fc.semantic.parse_pdf(fc.col("file_path")).alias("content"),
)

# 2. Extract a QAPair struct ("qa") from the parsed content cast to a string.
extracted_qa = parsed_policies.select(
    fc.col("source"),
    fc.semantic.extract(
        fc.col("content").cast(fc.StringType),
        QAPair,
    ).alias("qa"),
)

# 3. Unnest the struct and embed the question text for similarity search.
qa_pairs = extracted_qa.unnest("qa").select(
    "source",
    "question",
    "answer",
    fc.semantic.embed(fc.col("question")).alias("embedding"),
)

qa_pairs.write.save_as_table("policy_qa", mode="overwrite")

Query with Citations

async def qa_neighbors(query: str, k: int = 3):
    """Retrieve Q&A with citations.

    Embeds ``query`` and similarity-joins it against the ``embedding`` column
    of the "policy_qa" table; the ``source`` column serves as the citation.

    Args:
        query: Text to embed and match.
        k: Forwarded to ``sim_join`` as ``k``.

    Returns:
        The ``_plan`` of the joined frame projected to ``question``,
        ``answer``, ``source`` and ``relevance``.
    """
    lookup = session.create_dataframe([{"q": query}])

    joined = lookup.semantic.sim_join(
        session.table("policy_qa"),
        left_on=fc.semantic.embed(fc.col("q")),
        right_on=fc.col("embedding"),
        k=k,
        similarity_score_column="relevance",
    )
    cited = joined.select("question", "answer", "source", "relevance")
    return cited._plan

Semantic Spans (Chunked Retrieval)

Chunk documents with overlap for full-text retrieval.
# Chunk documents: parse every PDF matched by the glob, split the text into
# 500-word chunks with 10% overlap, then explode the "chunks" column.
doc_files = session.read.pdf_metadata("docs/**/*.pdf")
parsed_docs = doc_files.select(
    fc.col("file_path").alias("source"),
    fc.semantic.parse_pdf(fc.col("file_path")).alias("content"),
)
chunk_lists = fc.text.recursive_word_chunk(
    fc.col("content").cast(fc.StringType),
    chunk_size=500,
    chunk_overlap_percentage=10,
).alias("chunks")
exploded = parsed_docs.select(fc.col("source"), chunk_lists).explode("chunks")

# Add IDs and embeddings: number the rows in SQL, then rename the chunk
# column to "text" and embed it.
numbered_chunks = session.sql(
    "SELECT ROW_NUMBER() OVER () as chunk_id, * FROM {df}",
    df=exploded,
)
chunks = numbered_chunks.select(
    "chunk_id",
    "source",
    fc.col("chunks").alias("text"),
    fc.semantic.embed(fc.col("chunks")).alias("embedding"),
)

chunks.write.save_as_table("chunks", mode="overwrite")

Retrieve Chunks

async def docs_neighbors(query: str, k: int = 5):
    """Semantic chunk retrieval.

    Embeds ``query`` and similarity-joins it against the ``embedding`` column
    of the "chunks" table.

    Args:
        query: Text to embed and match.
        k: Forwarded to ``sim_join`` as ``k``.

    Returns:
        The ``_plan`` of the joined frame projected to ``chunk_id``,
        ``source``, ``text`` and ``relevance``.
    """
    query_frame = session.create_dataframe([{"q": query}])
    chunk_table = session.table("chunks")

    nearest = query_frame.semantic.sim_join(
        chunk_table,
        left_on=fc.semantic.embed(fc.col("q")),
        right_on=fc.col("embedding"),
        k=k,
        similarity_score_column="relevance",
    )
    return nearest.select("chunk_id", "source", "text", "relevance")._plan

Hybrid Filtering

Combine semantic and deterministic filters.
# Filter by metadata + semantic similarity
q = session.create_dataframe([{"q": "machine learning"}])

# Deterministic filters are applied to the right-hand table before the join.
recent_research = (
    session.table("documents")
    .filter(fc.col("year") >= 2020)
    .filter(fc.col("category") == "research")
)

nearest_docs = q.semantic.sim_join(
    recent_research,
    left_on=fc.semantic.embed(fc.col("q")),
    right_on=fc.col("embedding"),
    k=10,
    similarity_score_column="score",
)

# Semantic threshold: keep only matches scoring above 0.7.
results = nearest_docs.filter(fc.col("score") > 0.7)

Clustering

Group similar items using K-means clustering.
# Cluster feedback by theme: five sample comments, ids assigned 1..5 in order.
feedback_texts = (
    "The checkout process is too slow",
    "Payment failed multiple times",
    "Love the new dark mode feature",
    "Site takes forever to load",
    "The UI redesign looks amazing",
)
feedback_df = session.create_dataframe(
    [
        {"id": row_id, "feedback": text}
        for row_id, text in enumerate(feedback_texts, start=1)
    ]
)

# Embed and cluster: embed each comment, then label rows with one of two
# clusters ("theme_id") and attach the cluster centroid ("centroid").
embedded_feedback = feedback_df.select(
    "id",
    "feedback",
    fc.semantic.embed(fc.col("feedback")).alias("embedding"),
)
clustered = embedded_feedback.semantic.with_cluster_labels(
    by=fc.col("embedding"),
    num_clusters=2,
    label_column="theme_id",
    centroid_column="centroid",
)

# Analyze clusters: per theme, count the rows and reduce the feedback text
# into a short description.
theme_size = fc.count("*").alias("count")
theme_description = fc.semantic.reduce(
    "Describe the common theme in 1-2 sentences",
    fc.col("feedback"),
).alias("theme_description")
themes = clustered.group_by("theme_id").agg(theme_size, theme_description)

themes.show()

Serving Tools

MCP Server

Expose retrieval as Model Context Protocol tools.
from fenic.api.mcp._tool_generation_utils import auto_generate_system_tools_from_tables
from fenic.core.mcp._server import FenicMCPServer
from fenic.core.mcp.types import SystemTool

# Auto-generate system tools for the listed tables under the "docs"
# namespace, with max_result_limit set to 100.
doc_tables = ["policy_qa", "chunks"]
generated_tools = auto_generate_system_tools_from_tables(
    doc_tables,
    session,
    tool_namespace="docs",
    max_result_limit=100,
)

# Custom retrieval tool
async def semantic_search(query: str, k: int = 5):
    """Similarity-search the "chunks" table for ``query``.

    Args:
        query: Text to embed and match against the ``embedding`` column.
        k: Forwarded to ``sim_join`` as ``k``.

    Returns:
        The ``_plan`` of the joined frame projected to ``source``, ``text``
        and ``score``.
    """
    query_frame = session.create_dataframe([{"q": query}])
    hits = query_frame.semantic.sim_join(
        session.table("chunks"),
        left_on=fc.semantic.embed(fc.col("q")),
        right_on=fc.col("embedding"),
        k=k,
        similarity_score_column="score",
    )
    trimmed = hits.select("source", "text", "score")
    return trimmed._plan

# Create server: wrap the custom search function as a SystemTool, then
# register it ahead of the auto-generated tools.
search_tool = SystemTool(
    name="semantic_search",
    description="Search documents semantically",
    max_result_limit=50,
    func=semantic_search,
)
server = FenicMCPServer(
    session._session_state,
    user_defined_tools=[],
    system_tools=[search_tool, *generated_tools],
    server_name="Docs Server",
)

Direct Python Functions

def retrieve_context(user_query: str, max_results: int = 5) -> list[dict]:
    """Direct Python function for retrieval.

    Embeds ``user_query``, similarity-joins it against the ``embedding``
    column of the "policy_qa" table, and materializes the result.

    Args:
        user_query: Text to embed and match.
        max_results: Forwarded to ``sim_join`` as ``k``.

    Returns:
        The joined rows as a list of dicts, produced via
        ``to_polars().to_dicts()``; the similarity is in the ``score`` key.
    """
    query_frame = session.create_dataframe([{"q": user_query}])

    neighbours = query_frame.semantic.sim_join(
        session.table("policy_qa"),
        left_on=fc.semantic.embed(fc.col("q")),
        right_on=fc.col("embedding"),
        k=max_results,
        similarity_score_column="score",
    )

    polars_frame = neighbours.to_polars()
    return polars_frame.to_dicts()

# Use in agent framework
# `context` is the list of row dicts returned by retrieve_context;
# max_results=3 is passed through as the similarity join's `k`.
context = retrieve_context("refund policy", max_results=3)

State Management

Versioning

# Version your context with dated tables.
# The date tag uses underscores so the snapshot name has the same
# `policy_qa_YYYY_MM_DD` form that the rollback below looks up.
# `date.today().isoformat()` would produce hyphens (policy_qa_2025-01-15),
# and the rollback would never find that table.
today = date.today().strftime("%Y_%m_%d")
snapshot_name = f"policy_qa_{today}"
qa_pairs.write.save_as_table(snapshot_name, mode="overwrite")

# Point prod to current version.
# Copy the snapshot that was just written instead of writing `qa_pairs` a
# second time: each write executes the pipeline, so a second write would
# repeat the model calls and could produce rows that differ from the snapshot.
session.table(snapshot_name).write.save_as_table("policy_qa_prod", mode="overwrite")

# Rollback by copying old version (example: the snapshot tagged 2025_01_15)
old_version = session.table("policy_qa_2025_01_15")
old_version.write.save_as_table("policy_qa_prod", mode="overwrite")

Incremental Updates

# Append new documents
new_docs = session.read.pdf_metadata("new_policies/*.pdf")
# process_to_qa is not defined in this snippet; presumably it wraps the same
# parse -> extract -> unnest -> embed steps used to build `qa_pairs`, so the
# appended rows share the existing table's columns. TODO confirm.
new_qa = process_to_qa(new_docs)  # Same pipeline
new_qa.write.save_as_table("policy_qa", mode="append")

# Deduplicate
# NOTE(review): this reads "policy_qa" and then overwrites that same table;
# confirm the engine fully reads the source before replacing it.
# NOTE(review): rows include the embedding column, so "exact duplicates"
# presumably means identical in every column, vectors included — verify.
deduped = (
    session.table("policy_qa")
    .select("*")
    .distinct()  # Remove exact duplicates
)
deduped.write.save_as_table("policy_qa", mode="overwrite")

Token Budget Awareness

Track and manage costs throughout pipelines.
# Execute and get metrics: the write returns a metrics object; report the
# language-model totals from it.
metrics = qa_pairs.write.save_as_table("policy_qa", mode="overwrite")
lm_totals = metrics.total_lm_metrics

print(f"Total cost: ${lm_totals.cost:.4f}")
print(f"Input tokens: {lm_totals.num_uncached_input_tokens}")
print(f"Output tokens: {lm_totals.num_output_tokens}")

# Tool response shaping: generate tools for "policy_qa" under the "qa"
# namespace and cap response size with max_result_limit=50.
qa_tables = ["policy_qa"]
generated_tools = auto_generate_system_tools_from_tables(
    qa_tables,
    session,
    tool_namespace="qa",
    max_result_limit=50,
)

Best Practices

Pre-embed Everything: Embed documents once during ingestion, not at query time.
Version Before Breaking Changes: Always create dated snapshots before schema changes.
Choose k Wisely: Start with k=5 for retrieval. Increase if recall is low, decrease if precision suffers.

Chunking Strategy

# Good: Overlap for context preservation
# Consecutive chunks share text, so a sentence that straddles a boundary
# appears in both neighbours.
fc.text.recursive_word_chunk(
    fc.col("text"),
    chunk_size=500,
    chunk_overlap_percentage=10  # 10% of 500 = 50 words overlap
)

# Bad: No overlap
# NOTE: this example also switches from recursive_word_chunk to word_chunk,
# so it differs from the "good" one in the chunking function as well as in
# the overlap setting.
fc.text.word_chunk(
    fc.col("text"),
    chunk_size=500,
    chunk_overlap_percentage=0
)

Metadata Filtering

# Good: Filter before similarity join
# The deterministic filters shrink the right-hand table first, so similarity
# is only computed against legal documents from 2023 onward.
filtered = (
    session.table("documents")
    .filter(fc.col("department") == "legal")
    .filter(fc.col("year") >= 2023)
)

# `q` is a one-row query DataFrame with a "q" text column.
q.semantic.sim_join(
    filtered,
    left_on=fc.semantic.embed(fc.col("q")),
    right_on=fc.col("embedding"),
    k=10
)

# Bad: Filter after (wastes similarity computation)
# The join runs against every document and the department filter is applied
# only to the k=100 matches it returns. This variant also drops the year
# filter and raises k from 10 to 100, so it is not a like-for-like comparison.
# NOTE(review): filtering after the join can leave fewer rows than intended
# when few of the nearest matches are legal documents.
q.semantic.sim_join(
    session.table("documents"),
    left_on=fc.semantic.embed(fc.col("q")),
    right_on=fc.col("embedding"),
    k=100
).filter(fc.col("department") == "legal")

Next Steps

Batch Inference

Configure rate limiting and batching for production

Error Handling

Handle failures, retries, and timeouts

Build docs developers (and LLMs) love