The onData middleware is where you process the actual email message content. Fumi provides the message as a stream, allowing you to handle messages of any size efficiently.
Basic Message Processing
Use onData to access the message stream:
import { Fumi } from "@puiusabin/fumi";
const app = new Fumi({ authOptional: true });
app.onData(async (ctx, next) => {
// Read the message stream
const chunks: Uint8Array[] = [];
for await (const chunk of ctx.stream) {
chunks.push(chunk);
}
const message = Buffer.concat(chunks).toString();
console.log("Received message:", message);
await next();
});
await app.listen(25);
DataContext Interface
The onData middleware receives a DataContext:
interface DataContext {
session: Session; // Connection and envelope info
stream: ReadableStream<Uint8Array>; // Message content stream
sizeExceeded: boolean; // True if message exceeds size limit
reject(message?: string, code?: number): never; // Reject the message
}
Reading the Message Body
The message is provided as a ReadableStream<Uint8Array>. You can consume it in several ways:
Stream Iteration
The most common approach using for-await:
app.onData(async (ctx, next) => {
const chunks: Uint8Array[] = [];
for await (const chunk of ctx.stream) {
chunks.push(chunk);
}
const messageBuffer = Buffer.concat(chunks);
const messageText = messageBuffer.toString();
await next();
});
Pipe to Writable Stream
For efficient processing without buffering:
app.onData(async (ctx, next) => {
await ctx.stream.pipeTo(
new WritableStream({
write(chunk) {
// Process each chunk as it arrives
console.log("Received chunk:", chunk.length, "bytes");
}
})
);
await next();
});
Save to File
Write the message directly to disk:
import { Writable } from "stream";
import { createWriteStream } from "fs";
app.onData(async (ctx, next) => {
const filename = `./messages/${ctx.session.id}.eml`;
const fileStream = Bun.file(filename).writer();
for await (const chunk of ctx.stream) {
fileStream.write(chunk);
}
fileStream.end();
console.log("Saved message to", filename);
await next();
});
Size Limits
Control message size using FumiOptions.size and check ctx.sizeExceeded:
const app = new Fumi({
authOptional: true,
size: 1_000_000 // 1MB limit
});
app.onData(async (ctx, next) => {
await ctx.stream.pipeTo(new WritableStream());
if (ctx.sizeExceeded) {
ctx.reject("Message exceeds maximum size", 552);
}
await next();
});
The sizeExceeded property is only accurate after consuming the stream. Set FumiOptions.size to enable size tracking.
maxSize Plugin
Fumi includes a built-in plugin for size enforcement:
import { Fumi } from "@puiusabin/fumi";
import { maxSize } from "@puiusabin/fumi/plugins";
const limit = 1_000_000; // 1MB
const app = new Fumi({
authOptional: true,
size: limit // Must match plugin limit
});
app.use(maxSize(limit));
await app.listen(25);
The plugin source shows how to implement size checks:
export function maxSize(bytes: number): Plugin {
return (app) => {
app.onData(async (ctx, next) => {
await next();
await ctx.stream.pipeTo(new WritableStream());
if (ctx.sizeExceeded) {
ctx.reject(`Message exceeds the maximum size of ${bytes} bytes`, 552);
}
});
};
}
The ctx.session object contains connection and envelope data:
interface Session {
id: string; // Unique session ID
secure: boolean; // TLS encrypted connection
remoteAddress: string; // Client IP address
clientHostname: string; // EHLO/HELO hostname
openingCommand: string; // "EHLO" or "HELO"
user: unknown; // Set by ctx.accept() in onAuth
envelope: Envelope; // Mail envelope (from/to)
transmissionType: string; // Transmission protocol
}
interface Envelope {
mailFrom: Address; // MAIL FROM address
rcptTo: Address[]; // RCPT TO addresses
}
interface Address {
address: string; // Email address
args: Record<string, unknown>; // ESMTP parameters
}
app.onData(async (ctx, next) => {
const { envelope, user, remoteAddress } = ctx.session;
console.log("Message from:", envelope.mailFrom.address);
console.log("Message to:", envelope.rcptTo.map(r => r.address));
console.log("Client IP:", remoteAddress);
console.log("Authenticated user:", user);
// Process message...
await next();
});
Real-World Examples
Save Messages to Database
import { Fumi } from "@puiusabin/fumi";
import { db } from "./db";
const app = new Fumi({ authOptional: true });
app.onData(async (ctx, next) => {
const chunks: Uint8Array[] = [];
for await (const chunk of ctx.stream) {
chunks.push(chunk);
}
const message = Buffer.concat(chunks).toString();
await db.messages.insert({
sessionId: ctx.session.id,
from: ctx.session.envelope.mailFrom.address,
to: ctx.session.envelope.rcptTo.map(r => r.address),
body: message,
remoteAddress: ctx.session.remoteAddress,
receivedAt: new Date()
});
await next();
});
await app.listen(25);
Parse and Validate Messages
import { simpleParser } from "mailparser";
app.onData(async (ctx, next) => {
const chunks: Uint8Array[] = [];
for await (const chunk of ctx.stream) {
chunks.push(chunk);
}
const buffer = Buffer.concat(chunks);
const parsed = await simpleParser(buffer);
// Validate headers
if (!parsed.subject) {
ctx.reject("Subject header required", 550);
}
// Check content
if (parsed.text && parsed.text.length > 10000) {
ctx.reject("Message body too long", 552);
}
// Process attachments
for (const attachment of parsed.attachments || []) {
console.log("Attachment:", attachment.filename);
}
await next();
});
Forward to External Service
app.onData(async (ctx, next) => {
const chunks: Uint8Array[] = [];
for await (const chunk of ctx.stream) {
chunks.push(chunk);
}
const message = Buffer.concat(chunks);
// Forward to external API
const response = await fetch("https://api.example.com/inbound", {
method: "POST",
headers: {
"Content-Type": "message/rfc822"
},
body: message
});
if (!response.ok) {
ctx.reject("Failed to process message", 451);
}
await next();
});
Spam Filtering
import { spamCheck } from "./spam-filter";
app.onData(async (ctx, next) => {
const chunks: Uint8Array[] = [];
for await (const chunk of ctx.stream) {
chunks.push(chunk);
}
const message = Buffer.concat(chunks).toString();
const spamScore = await spamCheck(message, ctx.session.remoteAddress);
if (spamScore > 5.0) {
console.log(`Rejected spam (score: ${spamScore}) from ${ctx.session.remoteAddress}`);
ctx.reject("Message rejected as spam", 554);
}
await next();
});
Virus Scanning
import { scanForViruses } from "./antivirus";
app.onData(async (ctx, next) => {
const chunks: Uint8Array[] = [];
for await (const chunk of ctx.stream) {
chunks.push(chunk);
}
const message = Buffer.concat(chunks);
const scanResult = await scanForViruses(message);
if (scanResult.infected) {
console.log(`Virus detected: ${scanResult.virus} from ${ctx.session.remoteAddress}`);
ctx.reject("Message contains malware", 554);
}
await next();
});
Middleware Chaining
Multiple onData middlewares run in sequence:
// First middleware: Log message info
app.onData(async (ctx, next) => {
console.log("Receiving message from:", ctx.session.envelope.mailFrom.address);
await next();
});
// Second middleware: Process message
app.onData(async (ctx, next) => {
const chunks: Uint8Array[] = [];
for await (const chunk of ctx.stream) {
chunks.push(chunk);
}
const message = Buffer.concat(chunks).toString();
await saveMessage(message, ctx.session);
await next();
});
// Third middleware: Log completion
app.onData(async (ctx) => {
console.log("Message processed successfully");
});
The stream can only be consumed once. If you need the message in multiple middlewares, store it in a variable or use ctx.session to pass data between middlewares.
Rejecting Messages
Call ctx.reject() to refuse the message:
app.onData(async (ctx, next) => {
const chunks: Uint8Array[] = [];
for await (const chunk of ctx.stream) {
chunks.push(chunk);
}
const message = Buffer.concat(chunks).toString();
// Check for banned content
if (message.includes("banned-word")) {
ctx.reject("Content policy violation", 550);
}
await next();
});
Common SMTP response codes for DATA phase:
250 - Message accepted (automatic on success)
450 - Temporary failure, try again later
451 - Processing error
452 - Insufficient storage
550 - Message rejected
552 - Message too large
554 - Transaction failed
For large messages, process chunks as they arrive instead of buffering:
app.onData(async (ctx, next) => {
let lineCount = 0;
let buffer = "";
for await (const chunk of ctx.stream) {
buffer += chunk.toString();
const lines = buffer.split("\n");
buffer = lines.pop() || "";
lineCount += lines.length;
}
console.log(`Message has ${lineCount} lines`);
await next();
});
Process messages in the background after accepting:
app.onData(async (ctx, next) => {
const chunks: Uint8Array[] = [];
for await (const chunk of ctx.stream) {
chunks.push(chunk);
}
const message = Buffer.concat(chunks);
// Accept immediately
await next();
// Process async (don't await)
processMessage(message, ctx.session).catch(err => {
console.error("Background processing failed:", err);
});
});
Set Appropriate Size Limits
Prevent resource exhaustion:
const app = new Fumi({
authOptional: true,
size: 10_000_000, // 10MB max
maxClients: 50 // Limit concurrent connections
});
Next Steps