The onData middleware is where you process the actual email message content. Fumi provides the message as a stream, allowing you to handle messages of any size efficiently.

Basic Message Processing

Use onData to access the message stream:
import { Fumi } from "@puiusabin/fumi";

const app = new Fumi({ authOptional: true });

app.onData(async (ctx, next) => {
  // Read the message stream
  const chunks: Uint8Array[] = [];
  for await (const chunk of ctx.stream) {
    chunks.push(chunk);
  }
  
  const message = Buffer.concat(chunks).toString();
  console.log("Received message:", message);
  
  await next();
});

await app.listen(25);

DataContext Interface

The onData middleware receives a DataContext:
interface DataContext {
  session: Session;                    // Connection and envelope info
  stream: ReadableStream<Uint8Array>;  // Message content stream
  sizeExceeded: boolean;               // True if message exceeds size limit
  reject(message?: string, code?: number): never;  // Reject the message
}

Reading the Message Body

The message is provided as a ReadableStream<Uint8Array>. You can consume it in several ways:

Stream Iteration

The most common approach is to iterate over the stream with for await:
app.onData(async (ctx, next) => {
  const chunks: Uint8Array[] = [];
  
  for await (const chunk of ctx.stream) {
    chunks.push(chunk);
  }
  
  const messageBuffer = Buffer.concat(chunks);
  const messageText = messageBuffer.toString();
  
  await next();
});

Pipe to Writable Stream

For efficient processing without buffering:
app.onData(async (ctx, next) => {
  await ctx.stream.pipeTo(
    new WritableStream({
      write(chunk) {
        // Process each chunk as it arrives
        console.log("Received chunk:", chunk.length, "bytes");
      }
    })
  );
  
  await next();
});
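
As one concrete use of chunk-by-chunk processing, the sketch below computes a SHA-256 digest and byte count without holding the message in memory. createDigestSink is a hypothetical helper, not part of Fumi; it relies only on createHash from node:crypto, which Bun also provides.

```typescript
import { createHash } from "node:crypto";

// Hypothetical helper (not part of Fumi): hashes chunks incrementally.
function createDigestSink() {
  const hash = createHash("sha256");
  let bytes = 0;
  return {
    // Feed one chunk; nothing is retained beyond the hash state.
    write(chunk: Uint8Array): void {
      hash.update(chunk);
      bytes += chunk.byteLength;
    },
    // Finish and return the hex digest plus the total byte count.
    finish(): { sha256: string; bytes: number } {
      return { sha256: hash.digest("hex"), bytes };
    },
  };
}
```

Inside an onData handler this could be wired up with ctx.stream.pipeTo(new WritableStream({ write: (chunk) => sink.write(chunk) })), followed by sink.finish() once the pipe resolves.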

Save to File

Write the message directly to disk:
app.onData(async (ctx, next) => {
  const filename = `./messages/${ctx.session.id}.eml`;
  const fileStream = Bun.file(filename).writer();
  
  for await (const chunk of ctx.stream) {
    fileStream.write(chunk);
  }
  
  // Flush buffered data and close the file before continuing
  await fileStream.end();
  console.log("Saved message to", filename);
  
  await next();
});

Size Limits

Control message size using FumiOptions.size and check ctx.sizeExceeded:
const app = new Fumi({ 
  authOptional: true,
  size: 1_000_000  // 1MB limit
});

app.onData(async (ctx, next) => {
  await ctx.stream.pipeTo(new WritableStream());
  
  if (ctx.sizeExceeded) {
    ctx.reject("Message exceeds maximum size", 552);
  }
  
  await next();
});
The sizeExceeded property is only accurate after consuming the stream. Set FumiOptions.size to enable size tracking.
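
The buffering examples on this page keep the whole message in memory even when it is over the limit. A minimal sketch of one way to avoid that: keep draining the stream so sizeExceeded ends up accurate, but stop retaining chunks once a local byte budget is passed. BoundedCollector is a hypothetical helper, not a Fumi API, and its budget is tracked independently of FumiOptions.size.

```typescript
// Hypothetical helper (not part of Fumi): retains chunks up to a byte budget.
class BoundedCollector {
  private readonly limit: number;
  private chunks: Uint8Array[] = [];
  private total = 0;
  overflowed = false;

  constructor(limit: number) {
    this.limit = limit;
  }

  // Call for every chunk; once over budget, retained data is dropped.
  push(chunk: Uint8Array): void {
    this.total += chunk.byteLength;
    if (this.overflowed) return;
    if (this.total > this.limit) {
      this.overflowed = true;
      this.chunks = []; // release memory; the caller keeps draining the stream
      return;
    }
    this.chunks.push(chunk);
  }

  // Total bytes seen, including discarded ones.
  get bytesSeen(): number {
    return this.total;
  }

  // Returns the full message, or null if the budget was exceeded.
  result(): Uint8Array | null {
    if (this.overflowed) return null;
    const out = new Uint8Array(this.total);
    let offset = 0;
    for (const c of this.chunks) {
      out.set(c, offset);
      offset += c.byteLength;
    }
    return out;
  }
}
```

In a handler, each chunk from ctx.stream would go through push(), and the message would be rejected with 552 when either ctx.sizeExceeded or overflowed is true.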

maxSize Plugin

Fumi includes a built-in plugin for size enforcement:
import { Fumi } from "@puiusabin/fumi";
import { maxSize } from "@puiusabin/fumi/plugins";

const limit = 1_000_000;  // 1MB

const app = new Fumi({ 
  authOptional: true,
  size: limit  // Must match plugin limit
});

app.use(maxSize(limit));

await app.listen(25);
The plugin source shows how to implement size checks:
export function maxSize(bytes: number): Plugin {
  return (app) => {
    app.onData(async (ctx, next) => {
      await next();
      await ctx.stream.pipeTo(new WritableStream());
      if (ctx.sizeExceeded) {
        ctx.reject(`Message exceeds the maximum size of ${bytes} bytes`, 552);
      }
    });
  };
}

Accessing Session Information

The ctx.session object contains connection and envelope data:
interface Session {
  id: string;               // Unique session ID
  secure: boolean;          // TLS encrypted connection
  remoteAddress: string;    // Client IP address
  clientHostname: string;   // EHLO/HELO hostname
  openingCommand: string;   // "EHLO" or "HELO"
  user: unknown;            // Set by ctx.accept() in onAuth
  envelope: Envelope;       // Mail envelope (from/to)
  transmissionType: string; // Transmission protocol
}

interface Envelope {
  mailFrom: Address;        // MAIL FROM address
  rcptTo: Address[];        // RCPT TO addresses
}

interface Address {
  address: string;          // Email address
  args: Record<string, unknown>;  // ESMTP parameters
}

Using Envelope Information

app.onData(async (ctx, next) => {
  const { envelope, user, remoteAddress } = ctx.session;
  
  console.log("Message from:", envelope.mailFrom.address);
  console.log("Message to:", envelope.rcptTo.map(r => r.address));
  console.log("Client IP:", remoteAddress);
  console.log("Authenticated user:", user);
  
  // Process message...
  
  await next();
});
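
When a handler routes mail per recipient, it can help to group the envelope recipients by domain first. The sketch below is a hypothetical helper, not part of Fumi; it redeclares a minimal Address shape matching the interface above so that it stays self-contained.

```typescript
// Minimal local copy of the Address shape shown above.
interface Address {
  address: string;
  args: Record<string, unknown>;
}

// Hypothetical helper: groups recipient addresses by lowercased domain.
function groupByDomain(rcptTo: Address[]): Map<string, string[]> {
  const groups = new Map<string, string[]>();
  for (const { address } of rcptTo) {
    // The domain is everything after the last "@"; no "@" maps to "".
    const at = address.lastIndexOf("@");
    const domain = at === -1 ? "" : address.slice(at + 1).toLowerCase();
    const list = groups.get(domain) ?? [];
    list.push(address);
    groups.set(domain, list);
  }
  return groups;
}
```

Calling groupByDomain(ctx.session.envelope.rcptTo) inside onData would give one entry per destination domain.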

Real-World Examples

Save Messages to Database

import { Fumi } from "@puiusabin/fumi";
import { db } from "./db";

const app = new Fumi({ authOptional: true });

app.onData(async (ctx, next) => {
  const chunks: Uint8Array[] = [];
  for await (const chunk of ctx.stream) {
    chunks.push(chunk);
  }
  
  const message = Buffer.concat(chunks).toString();
  
  await db.messages.insert({
    sessionId: ctx.session.id,
    from: ctx.session.envelope.mailFrom.address,
    to: ctx.session.envelope.rcptTo.map(r => r.address),
    body: message,
    remoteAddress: ctx.session.remoteAddress,
    receivedAt: new Date()
  });
  
  await next();
});

await app.listen(25);

Parse and Validate Messages

import { simpleParser } from "mailparser";

app.onData(async (ctx, next) => {
  const chunks: Uint8Array[] = [];
  for await (const chunk of ctx.stream) {
    chunks.push(chunk);
  }
  
  const buffer = Buffer.concat(chunks);
  const parsed = await simpleParser(buffer);
  
  // Validate headers
  if (!parsed.subject) {
    ctx.reject("Subject header required", 550);
  }
  
  // Check content
  if (parsed.text && parsed.text.length > 10000) {
    ctx.reject("Message body too long", 552);
  }
  
  // Process attachments
  for (const attachment of parsed.attachments || []) {
    console.log("Attachment:", attachment.filename);
  }
  
  await next();
});

Forward to External Service

app.onData(async (ctx, next) => {
  const chunks: Uint8Array[] = [];
  for await (const chunk of ctx.stream) {
    chunks.push(chunk);
  }
  
  const message = Buffer.concat(chunks);
  
  // Forward to external API
  const response = await fetch("https://api.example.com/inbound", {
    method: "POST",
    headers: {
      "Content-Type": "message/rfc822"
    },
    body: message
  });
  
  if (!response.ok) {
    ctx.reject("Failed to process message", 451);
  }
  
  await next();
});
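
The example above answers every upstream failure with 451. One possible refinement, offered as an assumption rather than Fumi guidance, is to treat most upstream 4xx responses as permanent, since a retry will fail the same way, and everything else as temporary. smtpCodeForUpstream is a hypothetical helper.

```typescript
// Hypothetical helper: maps an HTTP status from the upstream API to an SMTP
// reply code, or null when the message should be accepted.
function smtpCodeForUpstream(status: number): number | null {
  if (status >= 200 && status < 300) return null;
  // Timeouts and rate limits are worth retrying.
  if (status === 408 || status === 429) return 451;
  // Other client errors will fail again on retry: reject permanently.
  if (status >= 400 && status < 500) return 550;
  // Server errors and anything unexpected: ask the sender to retry later.
  return 451;
}
```

A network failure makes fetch throw instead of returning a response, so response.ok never sees it; wrapping only the fetch call in try/catch and rejecting with 451 would cover that case.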

Spam Filtering

import { spamCheck } from "./spam-filter";

app.onData(async (ctx, next) => {
  const chunks: Uint8Array[] = [];
  for await (const chunk of ctx.stream) {
    chunks.push(chunk);
  }
  
  const message = Buffer.concat(chunks).toString();
  const spamScore = await spamCheck(message, ctx.session.remoteAddress);
  
  if (spamScore > 5.0) {
    console.log(`Rejected spam (score: ${spamScore}) from ${ctx.session.remoteAddress}`);
    ctx.reject("Message rejected as spam", 554);
  }
  
  await next();
});

Virus Scanning

import { scanForViruses } from "./antivirus";

app.onData(async (ctx, next) => {
  const chunks: Uint8Array[] = [];
  for await (const chunk of ctx.stream) {
    chunks.push(chunk);
  }
  
  const message = Buffer.concat(chunks);
  const scanResult = await scanForViruses(message);
  
  if (scanResult.infected) {
    console.log(`Virus detected: ${scanResult.virus} from ${ctx.session.remoteAddress}`);
    ctx.reject("Message contains malware", 554);
  }
  
  await next();
});

Middleware Chaining

Multiple onData middlewares run in sequence:
// First middleware: Log message info
app.onData(async (ctx, next) => {
  console.log("Receiving message from:", ctx.session.envelope.mailFrom.address);
  await next();
});

// Second middleware: Process message
app.onData(async (ctx, next) => {
  const chunks: Uint8Array[] = [];
  for await (const chunk of ctx.stream) {
    chunks.push(chunk);
  }
  
  const message = Buffer.concat(chunks).toString();
  await saveMessage(message, ctx.session);
  
  await next();
});

// Third middleware: Log completion
app.onData(async (ctx) => {
  console.log("Message processed successfully");
});
The stream can only be consumed once. If several middlewares need the message, buffer it in the first one that reads the stream and share the result with the others, for example through ctx.session.
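
One way to share the buffered message without widening the Session type is a WeakMap keyed by the session object. This is a sketch under the assumption that every middleware in the chain receives the same ctx.session object; rememberMessage and recallMessage are hypothetical names, not Fumi APIs.

```typescript
// Hypothetical helpers (not part of Fumi). An entry becomes collectable once
// its session object is no longer referenced anywhere else.
const messages = new WeakMap<object, Uint8Array>();

// Store the buffered message for this session (overwrites any earlier one).
function rememberMessage(session: object, message: Uint8Array): void {
  messages.set(session, message);
}

// Look up the message stored for this session, if any.
function recallMessage(session: object): Uint8Array | undefined {
  return messages.get(session);
}
```

The middleware that reads ctx.stream would call rememberMessage(ctx.session, buffer) before next(), and later middlewares would call recallMessage(ctx.session) instead of touching the stream.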

Rejecting Messages

Call ctx.reject() to refuse the message:
app.onData(async (ctx, next) => {
  const chunks: Uint8Array[] = [];
  for await (const chunk of ctx.stream) {
    chunks.push(chunk);
  }
  
  const message = Buffer.concat(chunks).toString();
  
  // Check for banned content
  if (message.includes("banned-word")) {
    ctx.reject("Content policy violation", 550);
  }
  
  await next();
});
Common SMTP response codes for the DATA phase:
  • 250 - Message accepted (automatic on success)
  • 450 - Temporary failure, try again later
  • 451 - Processing error
  • 452 - Insufficient storage
  • 550 - Message rejected
  • 552 - Message too large
  • 554 - Transaction failed
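
The first digit of a reply code tells the sending server whether to retry: under RFC 5321, 4xx replies are transient failures and 5xx replies are permanent ones. A small sketch of that distinction, using a hypothetical replyClass helper that is not part of Fumi:

```typescript
type ReplyClass = "success" | "temporary" | "permanent" | "other";

// Classifies an SMTP reply code by its first digit.
function replyClass(code: number): ReplyClass {
  if (code >= 200 && code < 300) return "success";
  if (code >= 400 && code < 500) return "temporary";
  if (code >= 500 && code < 600) return "permanent";
  return "other";
}
```

The choice matters in practice: a sender will normally queue and retry after a temporary reply such as 451, but give up and bounce the message after a permanent one such as 550.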

Performance Tips

Stream Processing

For large messages, process chunks as they arrive instead of buffering:
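app.onData(async (ctx, next) => {
  // Decode incrementally; { stream: true } copes with multi-byte
  // characters that are split across chunk boundaries
  const decoder = new TextDecoder();
  let lineCount = 0;
  let buffer = "";
  
  for await (const chunk of ctx.stream) {
    buffer += decoder.decode(chunk, { stream: true });
    const lines = buffer.split("\n");
    buffer = lines.pop() || "";
    
    lineCount += lines.length;
  }
  
  // Flush the decoder and count a final line with no trailing newline
  buffer += decoder.decode();
  if (buffer.length > 0) lineCount++;
  
  console.log(`Message has ${lineCount} lines`);
  await next();
});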

Async Processing

Process messages in the background after accepting:
app.onData(async (ctx, next) => {
  const chunks: Uint8Array[] = [];
  for await (const chunk of ctx.stream) {
    chunks.push(chunk);
  }
  
  const message = Buffer.concat(chunks);
  
  // Accept immediately
  await next();
  
  // Process async (don't await)
  processMessage(message, ctx.session).catch(err => {
    console.error("Background processing failed:", err);
  });
});

Set Appropriate Size Limits

Prevent resource exhaustion:
const app = new Fumi({ 
  authOptional: true,
  size: 10_000_000,  // 10MB max
  maxClients: 50     // Limit concurrent connections
});

