Overview
Azen’s memory system is built on a three-layer architecture that combines relational database storage, asynchronous embedding processing, and vector search capabilities. Every memory goes through a complete lifecycle from creation to searchability.
Memory Lifecycle
Database Schema
The memory system uses two core tables in PostgreSQL:
Memory Table
Defined in packages/db/src/db/schema.ts:230-245:
export const memory = pgTable("Memory", {
id: text("id").primaryKey(),
userId: text("user_id").notNull()
.references(() => user.id, { onDelete: "cascade" }),
organizationId: text("organization_id")
.references(() => organization.id, { onDelete: "cascade" }),
encryptedContent: text("encrypted_content").notNull(),
iv: text("iv").notNull(),
tag: text("tag").notNull(),
metadata: json("metadata"),
createdAt: timestamp("created_at").defaultNow().notNull(),
embedded: boolean("embedded").default(false),
});
The embedded boolean flag tracks whether vector embeddings have been successfully created and indexed for this memory.
EmbeddingJob Table
Defined in packages/db/src/db/schema.ts:252-271:
export const embeddingJob = pgTable("EmbeddingJob", {
id: text("id").primaryKey(),
memoryId: text("memory_id").notNull()
.references(() => memory.id, { onDelete: "cascade" }),
userId: text("user_id").notNull()
.references(() => user.id, { onDelete: "cascade" }),
organizationId: text("organization_id")
.references(() => organization.id, { onDelete: "cascade" }),
status: text("status").default("pending"),
attempts: integer("attempts").default(0),
lastError: text("last_error"),
availableAt: timestamp("available_at"),
createdAt: timestamp("created_at").defaultNow().notNull(),
updatedAt: timestamp("updated_at").$onUpdate(() => new Date()).notNull(),
});
Memory Creation Flow
When a client creates a memory via POST /memory, the following steps occur (apps/api/src/routes/memory.ts:20-92):
1. Encryption
The plain text is immediately encrypted before storage:
const memId = randomUUID();
const { ciphertext, iv, tag } = encryptText(text);
See Encryption for details on the AES-256-GCM implementation.
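For orientation, here is a minimal sketch of what an `encryptText`/`decryptText` pair using Node's built-in `crypto` module could look like. The key handling below is purely illustrative (a hardcoded placeholder); the real key management is described in the Encryption docs.

```typescript
import { randomBytes, createCipheriv, createDecipheriv } from "node:crypto";

// Placeholder 256-bit key for illustration only; the real implementation
// sources its key from secure configuration.
const key = Buffer.alloc(32, 1);

export function encryptText(text: string) {
  const iv = randomBytes(12); // 96-bit IV, the recommended size for GCM
  const cipher = createCipheriv("aes-256-gcm", key, iv);
  const ciphertext = Buffer.concat([cipher.update(text, "utf8"), cipher.final()]);
  return {
    ciphertext: ciphertext.toString("base64"),
    iv: iv.toString("base64"),
    tag: cipher.getAuthTag().toString("base64"), // GCM authentication tag
  };
}

export function decryptText(ciphertext: string, iv: string, tag: string) {
  const decipher = createDecipheriv("aes-256-gcm", key, Buffer.from(iv, "base64"));
  decipher.setAuthTag(Buffer.from(tag, "base64")); // verify integrity on final()
  return Buffer.concat([
    decipher.update(Buffer.from(ciphertext, "base64")),
    decipher.final(),
  ]).toString("utf8");
}
```

Note that the `tag` column in the schema exists precisely because GCM is authenticated encryption: decryption fails loudly if the ciphertext or tag has been tampered with.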
2. Database Insert
The memory record and its embedding job are then inserted back to back (the route issues them as two sequential statements):
const [rec] = await db
.insert(memory)
.values({
id: memId,
userId,
organizationId,
encryptedContent: ciphertext,
iv,
tag,
}).returning();
const jobId = randomUUID();
const [jobRec] = await db
.insert(embeddingJob)
.values({
id: jobId,
memoryId: rec.id,
userId,
organizationId,
status: "queued",
}).returning();
3. Queue Job
The embedding job is added to a BullMQ queue with retry logic:
await embeddingsQueue.add('embed', {
jobId: jobRec.id,
memoryId: rec.id,
text: text,
organizationId,
userId,
}, {
attempts: Number(process.env.DLQ_ATTEMPTS ?? 5),
backoff: { type: 'exponential', delay: 2000 },
removeOnComplete: 1000,
removeOnFail: 1000,
});
Note that the original plaintext is passed through the queue (and therefore transits Redis), because vector embeddings must be computed on unencrypted text.
Asynchronous Embedding Processing
Embedding jobs are processed by workers with batching optimization (apps/api/src/workers/embeddings-workers.ts).
Batching Strategy
Workers buffer jobs and process them in batches to optimize throughput:
let localBuffer: BufferedItem[] = [];
let bufferTimer: NodeJS.Timeout | null = null;
const worker = new Worker(QUEUE_NAME, async (job) => {
return await new Promise<void>((resolve, reject) => {
localBuffer.push({ job, resolve, reject });
if (localBuffer.length >= Number(BATCH_SIZE)) {
if (bufferTimer) { clearTimeout(bufferTimer); bufferTimer = null; }
scheduleFlush();
} else {
scheduleFlush();
}
});
}, {
connection: bullRedis,
concurrency: Number(opts.concurrency ?? WORKER_CONCURRENCY),
lockDuration: 5 * 60 * 1000,
});
Jobs are flushed when:
- The buffer reaches BATCH_SIZE, or
- The BATCH_WAIT_MS timeout expires
Embedding Job Processing
The core embedding logic (apps/api/src/jobs/embed-job.ts:15-41):
export async function processEmbeddingJob(payload: EmbedPayLoad) {
const { text, memoryId, organizationId, jobId } = payload;
// 1. Chunk text into 512-token segments with 50-token overlap
const chunks = chunkText(text);
// 2. Generate embeddings for all chunks
const vectors = await embedBatch(chunks);
// 3. Create vector IDs: memoryId::chunkIndex
const ids = chunks.map((_, i) => `${memoryId}::${i}`);
// 4. Upsert to organization-specific namespace
const namespace = `org-${organizationId}`;
await upsertVectors(ids, vectors, namespace, memoryId);
// 5. Mark memory as embedded
await db
.update(memory)
.set({ embedded: true })
.where(eq(memory.id, memoryId));
}
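The `memoryId::chunkIndex` ID convention used in step 3 is reversible, which is what later makes per-memory cleanup possible. A pair of illustrative helpers (not the repo's actual utilities) captures it:

```typescript
// Build a chunk's vector ID from its parent memory and position.
const vectorId = (memoryId: string, chunk: number) => `${memoryId}::${chunk}`;

// Recover the parent memory ID and chunk index from a vector ID.
// Uses lastIndexOf so a memory ID containing no "::" parses cleanly.
const parseVectorId = (id: string) => {
  const sep = id.lastIndexOf("::");
  return { memoryId: id.slice(0, sep), chunk: Number(id.slice(sep + 2)) };
};
```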
Text Chunking
Large texts are split into overlapping chunks (apps/api/src/lib/chunk.ts:5-13):
export function chunkText(text: string, maxTokens = 512, overlap = 50) {
const tokens = enc.encode(text);
const chunks: string[] = [];
for(let i = 0; i < tokens.length; i += (maxTokens - overlap)) {
const sliced = tokens.slice(i, i + maxTokens);
chunks.push(enc.decode(sliced));
}
return chunks;
}
Chunking uses js-tiktoken with the GPT-4o encoding model to ensure accurate token counts.
Why overlapping chunks?
- Prevents semantic information from being split at chunk boundaries
- Default 50-token overlap maintains context between adjacent chunks
- Each chunk can be independently embedded and searched
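The stride arithmetic is easiest to see with a character-based stand-in for `chunkText` (the real implementation operates on js-tiktoken tokens, not characters):

```typescript
// Character-level analogue of chunkText: step forward by (maxLen - overlap)
// so each chunk repeats the tail of the previous one.
function chunkChars(text: string, maxLen = 8, overlap = 3): string[] {
  const chunks: string[] = [];
  for (let i = 0; i < text.length; i += maxLen - overlap) {
    chunks.push(text.slice(i, i + maxLen));
  }
  return chunks;
}

// chunkChars("abcdefghijkl") → ["abcdefgh", "fghijkl", "kl"]
```

With the real defaults, the loop advances 462 tokens at a time (512 − 50), so each chunk shares its last 50 tokens with the next one.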
Memory Retrieval
List All Memories
The GET /memory endpoint returns paginated memories for an organization (apps/api/src/routes/memory.ts:94-148):
const items = await db
.select({
id: memory.id,
encryptedContent: memory.encryptedContent,
iv: memory.iv,
tag: memory.tag,
metadata: memory.metadata,
createdAt: memory.createdAt,
embedded: memory.embedded,
})
.from(memory)
.where(eq(memory.organizationId, organizationId))
.orderBy(desc(memory.createdAt))
.offset(offset)
.limit(per);
// Decrypt on-the-fly
const memories = items.map((m) => ({
id: m.id,
content: decryptText(m.encryptedContent, m.iv, m.tag),
metadata: m.metadata,
createdAt: m.createdAt,
embedded: m.embedded,
}));
Decryption happens at query time. Memories are never stored in plaintext.
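The `offset`/`per` values feeding the query above are typically derived from query-string parameters. A hypothetical helper (the `page`/`per` parameter names and clamping bounds are assumptions, not the route's actual code):

```typescript
// Derive a safe offset/limit pair from raw ?page=&per= query params,
// clamping to sane bounds so a client cannot request an unbounded page.
function paginate(pageParam?: string, perParam?: string, maxPer = 100) {
  const page = Math.max(1, Number(pageParam ?? 1) || 1);
  const per = Math.min(maxPer, Math.max(1, Number(perParam ?? 20) || 20));
  return { offset: (page - 1) * per, per };
}
```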
Get Single Memory
Retrieval by ID with organization-level isolation (apps/api/src/routes/memory.ts:150-190):
const [rec] = await db
.select()
.from(memory)
.where(
and(
eq(memory.id, memoryId),
eq(memory.organizationId, organizationId)
)
)
.limit(1);
const content = decryptText(rec.encryptedContent, rec.iv, rec.tag);
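Because the `where` clause combines the memory ID with the organization ID, a lookup for another tenant's memory and a lookup for a nonexistent ID both return no rows. An illustrative guard (not the route's actual code) shows why that matters: both cases should look identical to the caller.

```typescript
// Treat "not found" and "belongs to another org" uniformly as a 404,
// so the API does not leak which memory IDs exist in other tenants.
function requireFound<T>(rec: T | undefined): T {
  if (!rec) throw Object.assign(new Error("Memory not found"), { status: 404 });
  return rec;
}
```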
Memory Deletion
Deletion requires cleaning up three systems (apps/api/src/routes/memory.ts:192-253):
// 1. Delete from vector database
const namespace = `org-${rec.organizationId}`;
await deleteMemoryVectors(rec.id, namespace);
// 2. Delete embedding jobs
await db.delete(embeddingJob).where(
and(
eq(embeddingJob.memoryId, memoryId),
eq(embeddingJob.organizationId, organizationId)
)
);
// 3. Delete memory record
await db.delete(memory).where(
and(
eq(memory.id, memoryId),
eq(memory.organizationId, organizationId)
)
);
Vector deletion uses metadata filtering in Pinecone to remove all chunks associated with a memory.
Error Handling and Retry Logic
Embedding jobs have built-in resilience:
Retry Configuration
- Max Attempts: 5 (configurable via DLQ_ATTEMPTS)
- Backoff: Exponential with 2000ms initial delay
- Lock Duration: 5 minutes per job
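BullMQ's exponential strategy doubles the delay before each successive retry. With the 2000 ms base delay configured above and `attempts: 5` (one initial try plus four retries), the retry spacing works out as follows. `backoffDelays` is an illustrative helper, not part of the codebase:

```typescript
// Delay before retry n under exponential backoff: base * 2^(n - 1).
const backoffDelays = (retries: number, base = 2000) =>
  Array.from({ length: retries }, (_, i) => base * 2 ** i);

// backoffDelays(4) → [2000, 4000, 8000, 16000]
```

So a persistently failing job is abandoned roughly 30 seconds of cumulative backoff after its first attempt, at which point the `failed` handler below takes over.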
Failure Handling
When a job exhausts all retries (apps/api/src/workers/embeddings-workers.ts:99-116):
worker.on('failed', async (job, err) => {
const attemptsMade = job?.attemptsMade ?? 0;
const threshold = Number(process.env.DLQ_ATTEMPTS ?? 5);
if (job?.data?.jobId && attemptsMade >= threshold) {
await db
.update(embeddingJob)
.set({
status: "failed",
lastError: String(err ?? ""),
})
.where(sql`${embeddingJob.id} = ${job.data.jobId}`);
}
});
Failed jobs are marked in the database but the memory record remains, allowing manual retry or reprocessing.
Performance Characteristics
Write Path
- Synchronous: Encryption + DB insert (~10-50ms)
- Asynchronous: Embedding generation + vector upsert (~500-2000ms)
- Client Response: Immediate after DB insert
Read Path
- List: Paginated query + batch decryption (~50-200ms for 20 items)
- Get by ID: Single record query + decryption (~5-20ms)
- Search: Vector query + DB fetch + decryption (~100-500ms)
Scalability
- Batching: Workers process multiple embeddings in a single OpenAI API call
- Concurrency: Configurable worker pool size
- Organization Isolation: Vector namespaces prevent cross-tenant queries