The extraction pipeline transforms raw agent session transcripts into structured memory candidates using DSPy’s ChainOfThought prompting with transcript windowing.

Pipeline overview

The extraction pipeline is implemented in src/lerim/memory/extract_pipeline.py and runs as a runtime tool called by the lead agent during sync flows.
# src/lerim/runtime/agent.py:367-378
@agent.tool
def extract_pipeline(
    ctx: RunContext[RuntimeToolContext],
    guidance: str | None = None,
) -> dict[str, Any]:
    """Run DSPy extraction pipeline on the session trace. 
    Paths and metadata are handled automatically. 
    Pass optional guidance about focus areas or dedupe hints."""
    return run_extract_pipeline_tool(
        context=ctx.deps,
        guidance=guidance,
    )
The lead agent calls extract_pipeline with optional natural language guidance about what to focus on or how to deduplicate against existing memories.

DSPy signature classes

Extraction uses two DSPy signatures for windowed processing:

MemoryExtractSignature

Extracts memory candidates from a single transcript window:
# src/lerim/memory/extract_pipeline.py:26-53
class MemoryExtractSignature(dspy.Signature):
    """Extract reusable memory candidates from this transcript segment.

    Focus on decisions (explicit choices/policies) and learnings
    (lessons/fixes/pitfalls/preferences). Keep one short evidence quote
    per item (<=200 chars). Prefer precision over recall.

    Kind (for learnings only):
    - insight: a reusable observation or pattern.
    - procedure: a step-by-step fix or workflow.
    - friction: a blocker, struggle, or time-waster.
    - pitfall: a mistake to avoid.
    - preference: a user preference, habit, convention, or style choice.

    Tags: assign descriptive group/cluster labels for categorization.
    """

    transcript: str = dspy.InputField(
        desc="Raw session transcript text (JSONL or JSON, schema varies by agent)"
    )
    metadata: dict[str, Any] = dspy.InputField(desc="Session metadata")
    metrics: dict[str, Any] = dspy.InputField(desc="Deterministic metrics")
    guidance: str = dspy.InputField(
        desc="Optional lead-agent natural language guidance about focus areas, trace context, and dedupe hints"
    )
    primitives: list[MemoryCandidate] = dspy.OutputField(
        desc="Extracted memory candidate list"
    )
The signature docstring becomes part of the LLM prompt. DSPy uses this to ground the model’s extraction behavior.

MemoryMergeSignature

Merges and deduplicates candidates from multiple windows:
# src/lerim/memory/extract_pipeline.py:56-68
class MemoryMergeSignature(dspy.Signature):
    """Merge and deduplicate memory candidates extracted from multiple transcript windows.

    Remove near-duplicates (keep highest-confidence version).
    Drop weak or redundant items. Return the final clean list.
    """

    candidates: list[dict] = dspy.InputField(
        desc="All per-window candidates as list of dicts"
    )
    metadata: dict = dspy.InputField(desc="Session metadata")
    primitives: list[MemoryCandidate] = dspy.OutputField(
        desc="Deduplicated final list"
    )

Transcript windowing strategy

Large transcripts exceed model context limits. Lerim uses overlapping window chunking:
  1. Window size: Configurable via extract_role.max_window_tokens (default: 300,000 tokens)
  2. Overlap: Configurable via extract_role.window_overlap_tokens (default: 10% of window size)
  3. Chunking: window_transcript() splits on newlines to preserve message boundaries
# src/lerim/memory/extract_pipeline.py:81-83
config = get_config()
max_tokens = config.extract_role.max_window_tokens
overlap_tokens = config.extract_role.window_overlap_tokens
windows = window_transcript(transcript, max_tokens, overlap_tokens)
Each window is processed independently, then candidates are merged.
Overlapping windows ensure that important context near window boundaries isn’t lost. A decision that spans a boundary appears in both windows, and the merge step deduplicates it.

Example: with 300K-token windows and 30K overlap, a 500K-token transcript becomes:
  • Window 1: tokens 0-300K
  • Window 2: tokens 270K-500K (30K overlap with Window 1)
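The chunking strategy above can be sketched as follows. This is an illustrative approximation, not the actual window_transcript(): token counts here are estimated as characters divided by four, whereas the real implementation may use a proper tokenizer.

```python
def approx_tokens(text: str) -> int:
    """Rough token estimate (~4 characters per token)."""
    return max(1, len(text) // 4)

def window_transcript(transcript: str, max_tokens: int, overlap_tokens: int) -> list[str]:
    """Split a transcript into overlapping windows, breaking only on newlines
    so message boundaries are preserved."""
    lines = transcript.splitlines(keepends=True)
    windows: list[str] = []
    current: list[str] = []
    current_tokens = 0
    for line in lines:
        line_tokens = approx_tokens(line)
        if current and current_tokens + line_tokens > max_tokens:
            windows.append("".join(current))
            # Carry trailing lines that fit the overlap budget into the next window.
            carried: list[str] = []
            carried_tokens = 0
            for prev in reversed(current):
                prev_tokens = approx_tokens(prev)
                if carried_tokens + prev_tokens > overlap_tokens:
                    break
                carried.insert(0, prev)
                carried_tokens += prev_tokens
            current, current_tokens = carried, carried_tokens
        current.append(line)
        current_tokens += line_tokens
    if current:
        windows.append("".join(current))
    return windows
```

A transcript that fits within max_tokens yields a single window, so the pipeline skips the merge step entirely.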

Candidate extraction process

Single window extraction

# src/lerim/memory/extract_pipeline.py:70-106
def _extract_candidates(
    transcript: str,
    *,
    metadata: dict[str, Any] | None = None,
    metrics: dict[str, Any] | None = None,
    guidance: str = "",
) -> list[dict[str, Any]]:
    """Run ChainOfThought extraction with windowing and return normalized candidates."""
    if not transcript.strip():
        return []
    
    # ... windowing setup ...
    
    all_candidates: list[dict[str, Any]] = []
    extractor = dspy.ChainOfThought(MemoryExtractSignature)
    history_start = len(lm.history)
    
    with dspy.context(lm=lm):
        for window in windows:
            result = extractor(
                transcript=window, 
                metadata=meta, 
                metrics=met, 
                guidance=guid
            )
            primitives = getattr(result, "primitives", [])
            if isinstance(primitives, list):
                for item in primitives:
                    if isinstance(item, MemoryCandidate):
                        all_candidates.append(
                            item.model_dump(mode="json", exclude_none=True)
                        )

Multi-window merge

When the transcript spans multiple windows, candidates are merged:
# src/lerim/memory/extract_pipeline.py:112-130
if len(windows) == 1:
    capture_dspy_cost(lm, history_start)
    return all_candidates

# Multiple windows: merge and deduplicate
merger = dspy.ChainOfThought(MemoryMergeSignature)
with dspy.context(lm=lm):
    merge_result = merger(candidates=all_candidates, metadata=meta)
capture_dspy_cost(lm, history_start)

merged = getattr(merge_result, "primitives", [])
if not isinstance(merged, list):
    return all_candidates

return [
    item.model_dump(mode="json", exclude_none=True)
    if isinstance(item, MemoryCandidate)
    else item
    for item in merged
    if isinstance(item, (MemoryCandidate, dict))
]
The merge step uses the same ChainOfThought prompting but operates on structured candidate data instead of raw transcripts. This is much faster than re-processing the full transcript.

Deduplication logic

Deduplication happens at two levels:

1. Pipeline-level deduplication

The MemoryMergeSignature removes near-duplicates within the extracted candidates:
  • Keeps the highest-confidence version when multiple windows extract the same memory
  • Drops weak or redundant items that don’t add new information
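For intuition, a deterministic analogue of this policy (keep the highest-confidence version per near-duplicate group) might look like the sketch below. This is illustrative only: the real merge is performed by the LLM via MemoryMergeSignature and can judge semantic similarity rather than matching normalized titles.

```python
def dedupe_candidates(candidates: list[dict]) -> list[dict]:
    """Keep the highest-confidence candidate per normalized-title group.
    Illustrative stand-in for the LLM-driven merge step."""
    best: dict[str, dict] = {}
    for cand in candidates:
        # Normalize title: lowercase, collapse whitespace.
        key = " ".join(cand.get("title", "").lower().split())
        current = best.get(key)
        if current is None or cand.get("confidence", 0.0) > current.get("confidence", 0.0):
            best[key] = cand
    return list(best.values())
```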

2. Agent-level deduplication

The lead agent performs a second deduplication pass using the explorer subagent:
  1. Explorer search: Lead delegates to explorer to search existing memories
  2. Semantic comparison: Lead compares extracted candidates against existing memories
  3. Decision policy: Lead decides add|update|no-op for each candidate
This two-level approach keeps the pipeline output clean while leaving the final decision about what to persist to the lead agent.
See runtime/agent.py:491-608 for the lead agent’s sync flow implementation.

Memory candidate schema

Extracted candidates conform to the MemoryCandidate Pydantic model:
class MemoryCandidate(BaseModel):
    primitive: Literal["decision", "learning"]  # memory type
    title: str                                   # short descriptive title
    body: str                                    # detailed explanation
    confidence: float = 0.8                      # 0.0 to 1.0
    tags: list[str] = []                        # group/cluster labels
    kind: str | None = None                     # for learnings: insight/procedure/friction/pitfall/preference
    evidence: str | None = None                 # short quote (<=200 chars)
The lead agent converts these candidates to MemoryRecord objects before writing to disk.

Configuration

Extraction behavior is configured via the extract role in config.toml:
[roles.extract]
provider = "openrouter"
model = "openai/gpt-5-nano"
max_window_tokens = 300000
window_overlap_tokens = 30000
The default model, OpenRouter’s openai/gpt-5-nano, is chosen for its 300K context window and low cost.

Cost tracking

DSPy pipeline costs are captured and aggregated:
# src/lerim/memory/extract_pipeline.py:108-109
history_start = len(lm.history)
# ... run extraction ...
capture_dspy_cost(lm, history_start)
Cost data is logged in activity.log and returned in sync/maintain result payloads.
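The capture is a bracketing pattern: record the history length before the DSPy calls, then sum whatever those calls appended. A minimal sketch of what capture_dspy_cost might do, assuming each LM history entry is a dict with an optional "cost" field (as DSPy's LM clients record); the real function presumably also logs and aggregates the result:

```python
def capture_dspy_cost(lm, history_start: int) -> float:
    """Sum per-call costs recorded in the LM history since history_start.
    Entries without a cost (e.g. cache hits or missing pricing) are skipped."""
    total = 0.0
    for entry in lm.history[history_start:]:
        cost = entry.get("cost")
        if cost:
            total += cost
    return total
```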

Example extraction

Given this transcript fragment:
{"role":"user","content":"Queue jobs got stuck again. Heartbeat drift caused retries and duplicate claims."}
{"role":"assistant","content":"Fix worked: heartbeat every 15s, max_attempts=3, then dead_letter. Add metrics for retries and dead letters."}
The pipeline might extract:
{
  "primitive": "learning",
  "title": "Queue heartbeat timing fix",
  "body": "Jobs stuck due to heartbeat drift causing duplicate claims. Fixed by setting heartbeat interval to 15 seconds with max_attempts=3 and dead_letter queue for failures. Added metrics for monitoring retries and dead letters.",
  "kind": "procedure",
  "confidence": 0.85,
  "tags": ["queue", "reliability", "monitoring"],
  "evidence": "heartbeat every 15s, max_attempts=3, then dead_letter"
}

Next steps

Agent runtime

See how the lead agent uses extraction pipeline output

Configuration

Configure extraction models and windowing parameters
