Overview

The EmbeddingGenerationService handles asynchronous embedding generation for memories in elizaOS. It processes embeddings in a priority queue to avoid blocking the main runtime and provides semantic search capabilities.

Key Features

  • Asynchronous processing: Non-blocking queue-based embedding generation
  • Priority management: High, normal, and low priority queues
  • Intent extraction: Generates semantic intent for better embeddings
  • Context enrichment: Augments short messages with conversation context
  • Retry logic: Automatic retry with configurable attempts
  • Batch processing: Processes multiple embeddings in parallel
  • Graceful degradation: Automatically disables if no embedding model is available

Service Lifecycle

Starting the Service

import { EmbeddingGenerationService } from "@elizaos/core";

// Service is automatically started by the runtime
const service = await EmbeddingGenerationService.start(runtime);
The service automatically:
  1. Checks for TEXT_EMBEDDING model availability
  2. Registers event handlers
  3. Starts the processing loop
  4. Begins processing queued embeddings
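
The startup sequence above can be sketched as a small self-contained class. Names such as EmbeddingServiceSketch, enqueue, and drain are hypothetical; the real implementation lives in @elizaos/core:

```typescript
// Minimal sketch of the lifecycle described above (hypothetical names).
// start() checks model availability, wires up the loop, and drains anything
// already queued; without a TEXT_EMBEDDING model it stays disabled.
type Priority = "high" | "normal" | "low";

class EmbeddingServiceSketch {
  enabled = false;
  processed = 0;
  private queue: { text: string; priority: Priority }[] = [];
  private timer?: ReturnType<typeof setInterval>;

  start(hasEmbeddingModel: boolean): void {
    if (!hasEmbeddingModel) return; // 1. graceful degradation: stay disabled
    this.enabled = true;            // 2. (event handlers would register here)
    this.timer = setInterval(() => this.drain(), 100); // 3. processing loop
    this.drain();                   // 4. process anything already queued
  }

  stop(): void {
    if (this.timer) clearInterval(this.timer); // stop the interval
    this.drain();                              // flush remaining items
    this.enabled = false;
  }

  enqueue(text: string, priority: Priority): void {
    this.queue.push({ text, priority });
  }

  private drain(): void {
    this.processed += this.queue.length;
    this.queue = [];
  }
}
```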

Stopping the Service

await service.stop();
On stop, the service:
  1. Stops the processing interval
  2. Processes remaining high-priority items
  3. Logs remaining queue size

Queue Management

Priority Levels

High Priority

Processed immediately. Used for critical messages that need semantic search right away.

Use cases:
  • Direct mentions
  • Commands
  • User queries requiring context

Normal Priority

Standard priority for most messages. Processed in FIFO order after high-priority items.

Use cases:
  • Regular conversation messages
  • Background updates

Low Priority

Processed last. Used for non-urgent embeddings.

Use cases:
  • Historical data backfill
  • System-generated messages
  • Archived content
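
The three levels map naturally to an ordered insert: high items sort before normal, normal before low, and ties keep FIFO order. A minimal sketch (insertByPriority is a hypothetical stand-in for the service's insertItemByPriority):

```typescript
// Hypothetical sketch of priority-ordered FIFO insertion.
type Priority = "high" | "normal" | "low";
const rank: Record<Priority, number> = { high: 0, normal: 1, low: 2 };

function insertByPriority<T extends { priority: Priority }>(queue: T[], item: T): void {
  // Insert before the first item with strictly lower priority;
  // equal priorities keep their arrival order (FIFO).
  const idx = queue.findIndex((q) => rank[q.priority] > rank[item.priority]);
  if (idx === -1) queue.push(item);
  else queue.splice(idx, 0, item);
}
```

Because processing pops from the front of the queue, high-priority items are always served first while same-priority items remain in arrival order.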

Queue Operations

Queue Embedding Generation

// High priority - process immediately
await runtime.queueEmbeddingGeneration(memory, "high");

// Normal priority - standard processing
await runtime.queueEmbeddingGeneration(memory, "normal");

// Low priority - process when idle
await runtime.queueEmbeddingGeneration(memory, "low");

Monitor Queue Status

const service = runtime.getService("embedding-generation") as EmbeddingGenerationService;

// Get total queue size
const size = service.getQueueSize();
console.log(`Queue size: ${size}`);

// Get detailed stats
const stats = service.getQueueStats();
console.log(`High: ${stats.high}, Normal: ${stats.normal}, Low: ${stats.low}`);

Clear Queue

// Clear all pending embeddings (useful for testing or maintenance)
service.clearQueue();

Configuration

Queue Settings

class EmbeddingGenerationService extends Service {
  private maxQueueSize = 1000;        // Maximum items in queue
  private batchSize = 10;              // Process up to 10 at a time
  private processingIntervalMs = 100;  // Check queue every 100ms
}

Retry Configuration

// Default retry settings
interface EmbeddingQueueItem {
  retryCount: number;      // Current retry attempt
  maxRetries: number;      // Maximum 3 retries by default
}

Token Limits

// Maximum tokens for embedding input
const DEFAULT_MAX_EMBEDDING_TOKENS = 8191;

// Override via model configuration
const model = runtime.getModelConfiguration(ModelType.TEXT_EMBEDDING);
const maxTokens = model?.maxInputTokens || DEFAULT_MAX_EMBEDDING_TOKENS;

Intent Generation

The service automatically generates semantic intent for messages to improve embedding quality.

How It Works

  1. Length Check: Only generates intent for messages > 20 characters
  2. Intent Extraction: Uses TEXT_SMALL model to extract core meaning
  3. Embedding Source: Uses intent instead of raw text for embedding
  4. Metadata Storage: Stores intent in memory.metadata.intent

Example

// Original message
const message = "Can you help me deploy this to production? I'm worried about the database migration."

// Generated intent (used for embedding)
const intent = "Request help with production deployment and database migration concerns"

// Stored in metadata
memory.metadata.intent = intent;

Benefits

  • Better semantic search: Intent captures meaning vs. literal words
  • Improved retrieval: More relevant results in RAG
  • Context preservation: Core meaning extracted from verbose messages

Context Enrichment

Short messages (< 100 tokens) are enriched with recent conversation context.

Why Context Matters

Short messages like “yes”, “ok”, or “do it” lack semantic meaning on their own. Context enrichment provides conversation history for better embeddings.

How It Works

if (estimatedTokens < 100 && memory.roomId) {
  // Fetch last 5 messages from room
  const recentMessages = await runtime.getMemories({
    tableName: "messages",
    roomId: memory.roomId,
    count: 5,
  });
  
  // Build context from recent messages
  contextText = recentMessages
    .map((m) => m.content?.text ?? "")
    .join("\n");
}

Example

// Without context
message: "yes"
embedding: [0.1, 0.2, ...] // Generic "yes" embedding

// With context
context: `
User: Should we deploy to production?
Agent: I recommend waiting for QA approval. Should I proceed?
User: yes
`
embedding: [0.5, 0.8, ...] // Contextual "approve production deployment" embedding

Text Preparation

Embedding text is cleaned and prepared to maximize semantic quality.

Preparation Steps

  1. Strip Formatting: Remove names, timestamps, entity IDs, markdown
  2. Context Enrichment: Add conversation context for short messages
  3. Truncation: Trim to model’s max token limit
  4. Validation: Ensure non-empty text

Stripping Function

import { stripMessageFormatting } from "@elizaos/core";

const raw = "**John** (10:30 AM): Hey @agent, can you help?";
const clean = stripMessageFormatting(raw);
// Result: "Hey can you help?"

Token-Based Truncation

// Estimate 4 chars per token (safe upper bound)
const maxChars = maxTokens * 4;

if (cleaned.length > maxChars) {
  // Keep most recent content (end of string)
  cleaned = cleaned.slice(-maxChars);
}
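
Taken together, the preparation steps can be sketched as one helper. prepareEmbeddingText and stripFormattingSketch are hypothetical; the real stripMessageFormatting in @elizaos/core is more thorough:

```typescript
// Hypothetical sketch of the full preparation pipeline.
function stripFormattingSketch(text: string): string {
  return text
    .replace(/\*\*[^*]+\*\*\s*(\([^)]*\))?:?\s*/g, "") // bold names + timestamps
    .replace(/@\S+\s*/g, "")                           // mentions
    .trim();
}

function prepareEmbeddingText(raw: string, context: string, maxTokens = 8191): string {
  // 1. Strip formatting
  let cleaned = stripFormattingSketch(raw);
  // 2. Enrich short messages (~4 chars per token estimate)
  if (cleaned.length / 4 < 100 && context) {
    cleaned = `${context}\n${cleaned}`;
  }
  // 3. Truncate to the model's limit, keeping the most recent content
  const maxChars = maxTokens * 4;
  if (cleaned.length > maxChars) {
    cleaned = cleaned.slice(-maxChars);
  }
  // 4. Validate: never embed empty text
  if (cleaned.length === 0) throw new Error("nothing to embed");
  return cleaned;
}
```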

Events

The service emits and listens to several events:

EMBEDDING_GENERATION_REQUESTED

Triggered when a new embedding is queued.
await runtime.emitEvent(EventType.EMBEDDING_GENERATION_REQUESTED, {
  memory: Memory,
  priority: "high" | "normal" | "low",
  retryCount: number,
  maxRetries: number,
  runId: string,
});

EMBEDDING_GENERATION_COMPLETED

Emitted when embedding generation succeeds.
await runtime.emitEvent(EventType.EMBEDDING_GENERATION_COMPLETED, {
  runtime: IAgentRuntime,
  memory: Memory, // Includes embedding field
  source: "embeddingService",
});

EMBEDDING_GENERATION_FAILED

Emitted when embedding generation fails after max retries.
await runtime.emitEvent(EventType.EMBEDDING_GENERATION_FAILED, {
  runtime: IAgentRuntime,
  memory: Memory,
  error: string,
  source: "embeddingService",
});

Monitoring and Logging

The service logs detailed information for observability:

Log Events

// Queue additions
"Added memory to queue" // { queueSize: number }

// Processing
"Processing batch" // { batchSize: number, remaining: number }

// Completion
"Generated embedding" // { memoryId: UUID, durationMs: number, hasIntent: boolean }

// Retries
"Re-queued item for retry" // { retryCount: number, maxRetries: number }

// Queue management
"Removed items from queue" // { removedCount: number, newSize: number }

Runtime Logs

The service also creates runtime logs for tracking:
await runtime.log({
  entityId: runtime.agentId,
  roomId: memory.roomId,
  type: "embedding_event",
  body: {
    runId: string,
    memoryId: UUID,
    status: "completed" | "failed",
    duration?: number,
    error?: string,
    source: "embeddingService",
    hasIntent: boolean,
  },
});

Error Handling

Retry Logic

if (item.retryCount < item.maxRetries) {
  item.retryCount++;
  // Re-add to queue with same priority
  this.insertItemByPriority(item);
} else {
  // Log failure and emit event
  await runtime.log({ /* failure details */ });
  await runtime.emitEvent(EventType.EMBEDDING_GENERATION_FAILED, {
    memory: item.memory,
    error: error.message,
  });
}

Common Failures

No Embedding Model

The service automatically disables itself if no TEXT_EMBEDDING model is registered. Solution: register an embedding model provider.
runtime.registerModel(ModelType.TEXT_EMBEDDING, myEmbeddingModel);

Rate Limiting

The embedding API may rate limit requests. Solution: reduce the batch size or increase the processing interval.
private batchSize = 5;              // Smaller batches
private processingIntervalMs = 200; // Slower processing

Token Limit Exceeded

The input text exceeds the model’s token limit. Solution: automatic truncation handles this, but ensure maxInputTokens is set correctly.
const model = {
  maxInputTokens: 8191,
  // ...
};

Empty Content

The memory has no text content to embed. Solution: the service skips these automatically; ensure messages have content.text.

Queue Optimization

Making Room

When the queue reaches capacity, the service removes items strategically:
private makeRoomInQueue(): void {
  // Remove 10% of queue (min 1, max 10 items)
  const itemsToRemove = Math.min(10, Math.max(1, Math.floor(this.maxQueueSize * 0.1)));

  // Sort a copy: lowest priority first, then oldest first
  // (queuedAt: the time the item entered the queue)
  const order = { low: 0, normal: 1, high: 2 };
  const toRemove = [...this.queue]
    .sort((a, b) => order[a.priority] - order[b.priority] || a.queuedAt - b.queuedAt)
    .slice(0, itemsToRemove);

  // Remove lowest priority, oldest items
  this.queue = this.queue.filter((item) => !toRemove.includes(item));
}
Priority for removal:
  1. Low priority, oldest
  2. Low priority, newer
  3. Normal priority, oldest
  4. Normal priority, newer
  5. High priority (rarely removed)

Batch Processing

Embeddings are processed in parallel batches:
private async processQueue(): Promise<void> {
  // Take up to 10 items from queue
  const batch = this.queue.splice(0, Math.min(this.batchSize, this.queue.length));
  
  // Process in parallel
  await Promise.all(batch.map(item => this.generateEmbedding(item)));
}

Advanced Usage

Custom Embedding Models

import { ModelType } from "@elizaos/core";

// Register custom embedding model
runtime.registerModel(ModelType.TEXT_EMBEDDING, {
  name: "custom-embeddings",
  maxInputTokens: 512,
  async generate(text: string): Promise<number[]> {
    // Your embedding logic
    return [0.1, 0.2, 0.3, /* ... */];
  },
});

Monitoring Queue Health

// Create a health check endpoint
app.get("/health/embeddings", (req, res) => {
  const service = runtime.getService("embedding-generation") as EmbeddingGenerationService;
  const stats = service.getQueueStats();
  
  res.json({
    queueSize: stats.total,
    highPriority: stats.high,
    normalPriority: stats.normal,
    lowPriority: stats.low,
    healthy: stats.total < 500, // Alert if queue grows too large
  });
});

Batch Backfill

// Backfill embeddings for existing memories
async function backfillEmbeddings(runtime: IAgentRuntime) {
  const memories = await runtime.getMemories({
    tableName: "messages",
    count: 1000,
  });
  
  for (const memory of memories) {
    if (!memory.embedding) {
      // Use low priority for backfill
      await runtime.queueEmbeddingGeneration(memory, "low");
    }
  }
}

Event Listeners

// Listen for completed embeddings
runtime.registerEvent(
  EventType.EMBEDDING_GENERATION_COMPLETED,
  async (payload) => {
    console.log(`Embedding generated for ${payload.memory.id}`);
    // Update search index, trigger workflows, etc.
  }
);

// Listen for failures
runtime.registerEvent(
  EventType.EMBEDDING_GENERATION_FAILED,
  async (payload) => {
    console.error(`Failed to embed ${payload.memory.id}: ${payload.error}`);
    // Alert, retry with different settings, etc.
  }
);

Best Practices

Priority Assignment

  • Use high priority for user-facing messages that need immediate semantic search
  • Use normal priority for most messages
  • Use low priority for bulk operations and historical data

Queue Management

  • Monitor queue size regularly
  • Increase batch size for bulk operations
  • Adjust processing interval based on API rate limits
  • Set maxQueueSize based on available memory

Performance

  • Enable intent generation for better semantic quality
  • Use context enrichment for short messages
  • Process embeddings asynchronously (don’t await)
  • Batch operations when possible

Monitoring

  • Watch for EMBEDDING_GENERATION_FAILED events
  • Alert on queue size > 500
  • Track average processing duration
  • Monitor retry rates

Troubleshooting

Service Not Processing

  1. Check if TEXT_EMBEDDING model is registered
  2. Verify service is not disabled
  3. Check queue stats - may be empty
  4. Review logs for errors

High Queue Size

  1. Increase batch size
  2. Decrease processing interval
  3. Add more workers (scale horizontally)
  4. Optimize embedding model performance

Intent Generation Failures

  1. Verify the TEXT_SMALL model is available
  2. Check the message length (intent requires more than 20 characters)
  3. Review logs for generation errors

On failure, the service automatically falls back to embedding the original text.

Context Enrichment Issues

  1. Ensure the room has message history
  2. Check memory access permissions
  3. Verify the getMemories() implementation

If enrichment fails, the service falls back to the original message.
