
Overview

The IngestPipeline class orchestrates event ingestion, handling redaction, normalization, deduplication, and writes to both local mirror and Mubit memory.

Class: IngestPipeline

Constructor

constructor(
  mirror: MirrorAppender,
  options?: IngestPipelineOptions
)
  • mirror (MirrorAppender, required): Local mirror storage (typically JsonlMirror)
  • options.memoryEngine (MemoryEngine, optional): Memory backend (typically MubitMemoryEngine)
  • options.failOnMemoryError (boolean, default: false): Throw on memory write errors; the default behavior is fail-open
  • options.onMemoryError ((error: unknown, event: CapturedEventEnvelope) => void, optional): Error handler for memory write failures
  • options.memoryWriteTimeoutMs (number, default: 15000): Timeout for individual memory writes, in milliseconds
  • options.memoryMaxConsecutiveErrors (number, default: 3): Maximum consecutive errors before the circuit breaker opens
  • options.memoryWriteConcurrency (number, default: 1): Number of concurrent memory write tasks
  • options.memoryBatchSize (number, default: 1): Batch size for memory writes (requires writeEventsBatch support)
  • options.retryMemoryWriteOnLocalDedup (boolean, default: false): Retry the memory write even if the event was deduplicated locally
  • options.defaultActorId (string | null, optional): Default actor ID used when none is provided in context

Methods

ingest

Ingests an event with normalization and dual-store writes.
async ingest(
  eventType: string,
  payload: Record<string, unknown>,
  ctx: IngestContext
): Promise<CapturedEventEnvelope>
  • eventType (string, required): Event type (e.g., "prompt.submitted", "item.completed")
  • payload (Record<string, unknown>, required): Event payload (redacted automatically)
  • ctx (IngestContext, required): Event context for normalization
  • ctx.source (AgentSource, required): Event source
  • ctx.repoId (string, required): Repository ID
  • ctx.sessionId (string, required): Session ID
  • ctx.threadId (string | null, required): Thread ID
  • ctx.sequence (number, required): Sequence number within the session
  • ctx.actorId (string | null, optional): Actor ID (falls back to defaultActorId)
  • ctx.eventId (string, optional): Explicit event ID (auto-generated if not provided)
  • ctx.ts (string, optional): Explicit timestamp (defaults to the current time)
Returns: The normalized and ingested CapturedEventEnvelope

ingestRawLine

Appends a raw JSONL line to the session's raw log.
async ingestRawLine(
  sessionId: string,
  line: string
): Promise<void>
  • sessionId (string, required): Session ID
  • line (string, required): Raw JSONL line

flush

Flushes all pending writes to mirror and memory.
async flush(): Promise<void>

Ingestion Pipeline Flow

Redaction

The pipeline automatically redacts sensitive data:
  • API keys and tokens
  • SSH keys
  • AWS credentials
  • Generic secret patterns
See src/lib/redactor.ts for full redaction logic.
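A minimal sketch of how pattern-based redaction typically works; the patterns and placeholder tokens below are illustrative assumptions, not the actual rules in src/lib/redactor.ts:

```typescript
// Illustrative redaction patterns (hypothetical; see src/lib/redactor.ts
// for the real rules). Each regex maps to a placeholder token.
const REDACTION_PATTERNS: Array<[RegExp, string]> = [
  [/sk-[A-Za-z0-9]{20,}/g, '[REDACTED_API_KEY]'],      // OpenAI-style API keys
  [/AKIA[0-9A-Z]{16}/g, '[REDACTED_AWS_ACCESS_KEY]'],  // AWS access key IDs
  [
    /-----BEGIN [A-Z ]*PRIVATE KEY-----[\s\S]*?-----END [A-Z ]*PRIVATE KEY-----/g,
    '[REDACTED_SSH_KEY]',
  ],
];

// Apply every pattern in order, replacing all matches with its token.
function redactString(input: string): string {
  return REDACTION_PATTERNS.reduce(
    (acc, [pattern, token]) => acc.replace(pattern, token),
    input,
  );
}
```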

Deduplication

Events are deduplicated at two levels:
  1. Mirror level: Based on eventId (deterministic hash)
  2. Memory level: Optional retry on local dedup via retryMemoryWriteOnLocalDedup
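The mirror-level dedup can be sketched as a deterministic hash plus a seen-set; the hash inputs below (session, sequence, payload) are an assumption about the eventId scheme, not the library's actual derivation:

```typescript
import { createHash } from 'node:crypto';

// Sketch of deterministic eventId derivation (assumed inputs: session,
// sequence, payload; the real scheme may differ).
function deriveEventId(
  sessionId: string,
  sequence: number,
  payload: object,
): string {
  return createHash('sha256')
    .update(`${sessionId}:${sequence}:${JSON.stringify(payload)}`)
    .digest('hex')
    .slice(0, 16);
}

const seen = new Set<string>();

// Returns true when the event is new; false when the mirror already has it.
function shouldWrite(eventId: string): boolean {
  if (seen.has(eventId)) return false;
  seen.add(eventId);
  return true;
}
```

Because the ID is derived from the event's content rather than generated randomly, re-ingesting the same event always produces the same ID and is skipped.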

Circuit Breaker

After memoryMaxConsecutiveErrors consecutive memory write failures, the circuit opens:
  • Local mirror writes continue
  • Memory writes are skipped
  • Circuit auto-closes on successful write
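The behavior above amounts to a consecutive-error counter; this sketch is illustrative and the class and method names are assumptions, not the pipeline's internals:

```typescript
// Sketch of the consecutive-error circuit breaker described above.
class MemoryCircuit {
  private consecutiveErrors = 0;

  constructor(private readonly maxConsecutiveErrors: number) {}

  // Open once the failure streak reaches the configured threshold.
  get isOpen(): boolean {
    return this.consecutiveErrors >= this.maxConsecutiveErrors;
  }

  recordFailure(): void {
    this.consecutiveErrors += 1;
  }

  // A successful write resets the streak, closing the circuit.
  recordSuccess(): void {
    this.consecutiveErrors = 0;
  }
}
```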

Usage Example

Basic Usage

import { IngestPipeline } from './lib/ingest-pipeline';
import { JsonlMirror } from './lib/mirror-jsonl';
import { MubitMemoryEngine } from './lib/memory-mubit';

const mirror = new JsonlMirror('.codaph', {
  indexWriteMode: 'batch',
  autoFlushEveryEvents: 50,
});

const memory = new MubitMemoryEngine({
  apiKey: process.env.MUBIT_API_KEY,
  runScope: 'project',
});

const pipeline = new IngestPipeline(mirror, {
  memoryEngine: memory,
  memoryWriteConcurrency: 4,
  memoryBatchSize: 20,
  memoryWriteTimeoutMs: 30000,
  onMemoryError: (error, event) => {
    console.warn('Mubit write failed:', error);
  },
});

const event = await pipeline.ingest(
  'prompt.submitted',
  {
    prompt: 'Fix authentication bug',
    model: 'gpt-4',
  },
  {
    source: 'codex_sdk',
    repoId: 'abc123',
    sessionId: 'session-1',
    threadId: 'thread-1',
    sequence: 1,
  }
);

await pipeline.flush();

High-Throughput Configuration

For history imports with thousands of events:
const pipeline = new IngestPipeline(mirror, {
  memoryEngine: memory,
  memoryWriteConcurrency: 8,
  memoryBatchSize: 50,
  memoryWriteTimeoutMs: 60000,
  memoryMaxConsecutiveErrors: 5,
});

for (const event of historicalEvents) {
  await pipeline.ingest(event.type, event.payload, event.context);
}

await pipeline.flush();

Error Handling

const pipeline = new IngestPipeline(mirror, {
  memoryEngine: memory,
  failOnMemoryError: false, // Fail-open (default)
  onMemoryError: (error, event) => {
    logger.error('Memory write failed', {
      error,
      eventId: event.eventId,
      sessionId: event.sessionId,
    });
  },
});

Performance Tuning

Concurrency

Increase concurrent memory writes for better throughput:
memoryWriteConcurrency: 8  // 8 parallel Mubit writes
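One way to picture this setting is a fixed pool of workers draining a shared task queue; the sketch below is illustrative and not the pipeline's actual scheduler:

```typescript
// Run async tasks with at most `limit` in flight at once,
// preserving result order (illustrative sketch).
async function runWithConcurrency<T>(
  tasks: Array<() => Promise<T>>,
  limit: number,
): Promise<T[]> {
  const results: T[] = new Array(tasks.length);
  let next = 0;
  // Each worker repeatedly claims the next unclaimed task index.
  const workers = Array.from(
    { length: Math.min(limit, tasks.length) },
    async () => {
      while (next < tasks.length) {
        const i = next++;
        results[i] = await tasks[i]();
      }
    },
  );
  await Promise.all(workers);
  return results;
}
```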

Batching

Enable batch writes (requires memory engine support):
memoryBatchSize: 50  // Group 50 events per Mubit request
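Conceptually, batching groups pending events into fixed-size chunks before each writeEventsBatch call; this generic chunking helper is a sketch, not the library's implementation:

```typescript
// Split a list of events into batches of at most `size` items.
function chunk<T>(items: T[], size: number): T[][] {
  const batches: T[][] = [];
  for (let i = 0; i < items.length; i += size) {
    batches.push(items.slice(i, i + size));
  }
  return batches;
}
```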

Timeouts

Adjust timeout based on batch size:
memoryWriteTimeoutMs: 60000  // 60 seconds for large batches

Testing

For testing, you can disable memory writes:
const pipeline = new IngestPipeline(mirror);
// No memory engine = mirror-only writes
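For unit tests you can also supply a stub mirror. The sketch below assumes MirrorAppender exposes async append and flush methods; check the real interface in src/lib before relying on this:

```typescript
// Hypothetical in-memory mirror for tests (the MirrorAppender contract
// is assumed here, not taken from the library).
class InMemoryMirror {
  readonly events: unknown[] = [];

  async append(event: unknown): Promise<void> {
    this.events.push(event);
  }

  async flush(): Promise<void> {
    // Nothing is buffered in this sketch, so flush is a no-op.
  }
}
```

This lets assertions inspect exactly what the pipeline wrote, without touching disk or the Mubit API.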
