Express Plugin
The `@genkit-ai/express` plugin provides utilities for exposing Genkit flows and actions as HTTP endpoints on an Express server, so you can serve AI capabilities over HTTP with minimal configuration.
Installation
npm install @genkit-ai/express
Quick Start
Express Handler
Expose a single flow as an HTTP endpoint:

import { expressHandler } from '@genkit-ai/express';
import { genkit, z } from 'genkit';
import { googleAI } from '@genkit-ai/google-genai';
import express from 'express';

const ai = genkit({
  plugins: [googleAI()],
});

// Streams text chunks while generating and resolves with the full response.
// Declaring schemas makes Genkit validate the request input, so a payload that
// is not a string is rejected instead of being forwarded as the prompt.
const simpleFlow = ai.defineFlow(
  {
    name: 'simpleFlow',
    inputSchema: z.string(),
    outputSchema: z.string(),
  },
  async (input, { sendChunk }) => {
    const { text } = await ai.generate({
      model: googleAI.model('gemini-2.5-flash'),
      prompt: input,
      onChunk: (c) => sendChunk(c.text),
    });
    return text;
  }
);

const app = express();
// Parse JSON request bodies before they reach the flow handler.
app.use(express.json());
app.post('/simpleFlow', expressHandler(simpleFlow));

app.listen(8080, () => {
  console.log('Server running on http://localhost:8080');
});
Flow Server
Expose several flows at once with a standalone flow server:

import { startFlowServer } from '@genkit-ai/express';
import { genkit, z } from 'genkit';
import { googleAI } from '@genkit-ai/google-genai';

// A model plugin is required for the flows below to generate anything.
const ai = genkit({
  plugins: [googleAI()],
});

const menuFlow = ai.defineFlow(
  {
    name: 'menuSuggestionFlow',
    inputSchema: z.string(),
    outputSchema: z.string(),
  },
  async (restaurantTheme) => {
    // Generate menu suggestions
    const { text: suggestions } = await ai.generate({
      model: googleAI.model('gemini-2.5-flash'),
      prompt: `Suggest a menu for a ${restaurantTheme}-themed restaurant.`,
    });
    return suggestions;
  }
);

const reviewFlow = ai.defineFlow(
  {
    name: 'reviewAnalysisFlow',
    inputSchema: z.array(z.string()),
    outputSchema: z.string(),
  },
  async (reviews) => {
    // Analyze reviews
    const { text: analysis } = await ai.generate({
      model: googleAI.model('gemini-2.5-flash'),
      prompt: `Summarize the overall sentiment of these reviews:\n${reviews.join('\n')}`,
    });
    return analysis;
  }
);

// Serves each flow at POST /<flow name> on the given port.
startFlowServer({
  flows: [menuFlow, reviewFlow],
  port: 3500,
});
Each flow is exposed at a path matching its name:

POST http://localhost:3500/menuSuggestionFlow
POST http://localhost:3500/reviewAnalysisFlow
Features
Streaming Support
Flows automatically support streaming responses:

const streamingFlow = ai.defineFlow(
  { name: 'streamingFlow' },
  async (input, { sendChunk }) => {
    // Forward every partial result to the client as soon as it arrives,
    // then resolve with the complete text.
    const response = await ai.generate({
      model: googleAI.model('gemini-2.5-flash'),
      prompt: input,
      onChunk: ({ text }) => {
        sendChunk(text); // Stream to client
      },
    });
    return response.text;
  }
);

app.post('/streaming', expressHandler(streamingFlow));
Authentication
Add authentication using context providers:

import { UserFacingError } from 'genkit';
import type { ContextProvider, RequestData } from 'genkit/context';

// Checks the Authorization header before the flow runs. Throwing a
// UserFacingError rejects the request; the returned object is handed to the
// flow as its context.
const context: ContextProvider = (req: RequestData) => {
  // Check authorization header
  if (req.headers['authorization'] !== 'Bearer secret-token') {
    throw new UserFacingError('PERMISSION_DENIED', 'Not authorized');
  }
  return {
    auth: {
      user: 'authenticated-user',
      roles: ['admin'],
    },
  };
};

// The provider is attached through expressHandler's `contextProvider` option.
app.post(
  '/secureFlow',
  expressHandler(myFlow, { contextProvider: context })
);
import type { NextFunction, Request, Response } from 'express';

/** Returns the token from an `Authorization: Bearer <token>` header, if present. */
function bearerToken(header: string | undefined): string | undefined {
  const [scheme, token] = header?.split(' ') ?? [];
  // Auth schemes are case-insensitive; any scheme other than Bearer is ignored.
  return scheme?.toLowerCase() === 'bearer' && token ? token : undefined;
}

// Express middleware: responds 401 before the flow runs unless the request
// carries a valid bearer token.
function authMiddleware(req: Request, res: Response, next: NextFunction) {
  const token = bearerToken(req.headers['authorization']);
  if (!token || !isValidToken(token)) {
    res.status(401).json({ error: 'Unauthorized' });
    return;
  }
  next();
}

app.post(
  '/protectedFlow',
  authMiddleware,
  expressHandler(myFlow, {
    // A context provider receives Genkit's RequestData (e.g. `headers`), not
    // the Express `req`, so properties that middleware sets on `req` (such as
    // `req.user`) are not visible here. Derive the user from the header instead.
    contextProvider: (req) => {
      const token = bearerToken(req.headers['authorization']);
      return { user: token ? decodeToken(token) : undefined };
    },
  })
);
Durable Streaming (Beta)
Persist stream state so that clients can reconnect to an in-progress stream:

import { InMemoryStreamManager } from 'genkit/beta';
// Development: In-memory manager
const inMemoryStreamManager = new InMemoryStreamManager();

app.post(
  '/durableFlow',
  expressHandler(myFlow, { streamManager: inMemoryStreamManager })
);
// Production: Firestore or RTDB manager
import { FirestoreStreamManager } from '@genkit-ai/firebase/beta';
import { initializeApp } from 'firebase-admin/app';
import { getFirestore } from 'firebase-admin/firestore';

// Stream state is written to the `streams` Firestore collection.
const firebaseApp = initializeApp();
const firestoreStreamManager = new FirestoreStreamManager({
  firebaseApp,
  db: getFirestore(firebaseApp),
  collection: 'streams',
});

app.post(
  '/productionFlow',
  expressHandler(myFlow, { streamManager: firestoreStreamManager })
);
Client Usage
Use the Genkit client library to call flows:

import { runFlow, streamFlow } from 'genkit/beta/client';
// Non-streaming call: resolves once with the flow's final output.
const haiku = await runFlow({
  url: 'http://localhost:8080/simpleFlow',
  input: 'Generate a haiku about coding',
});
console.log(haiku);

// Streaming call: print chunks as they arrive, then await the final output.
const story = streamFlow({
  url: 'http://localhost:8080/simpleFlow',
  input: 'Write a long story',
});
for await (const piece of story.stream) {
  process.stdout.write(piece);
}
console.log('\nFinal:', await story.output);

// With authentication: the header is checked by the server's context provider.
const securedResult = await runFlow({
  url: 'http://localhost:8080/secureFlow',
  headers: { Authorization: 'Bearer secret-token' },
  input: 'Protected request',
});
Reconnect to Durable Streams
import { streamFlow } from 'genkit/beta/client';

// Start a durable stream and keep its ID so the client can resume it later.
const initial = streamFlow({
  url: 'http://localhost:8080/durableFlow',
  input: 'Long-running task',
});
const savedStreamId = await initial.streamId;
console.log('Stream ID:', savedStreamId);

// Later (e.g. from a new connection), resume the same stream by its ID.
const resumed = streamFlow({
  url: 'http://localhost:8080/durableFlow',
  streamId: savedStreamId,
});
for await (const piece of resumed.stream) {
  console.log('Resumed:', piece);
}
Configuration
Flow Server Options
startFlowServer({
  flows: [flow1, flow2],
  port: 4567, // Server port (default: 3500)
  cors: {
    // CORS configuration. Browsers reject credentialed responses whose
    // Access-Control-Allow-Origin is the wildcard `*`, so when
    // `credentials` is true, list the allowed origins explicitly.
    origin: ['https://app.example.com'],
    methods: ['POST'],
    credentials: true,
  },
  pathPrefix: '/api/v1', // URL prefix for all flows
});
Express Handler Options
// Routes are registered on the Express app instance, not the `express` module.
app.post(
  '/myFlow',
  expressHandler(myFlow, {
    contextProvider: contextProvider, // Auth/context provider
    streamManager: streamManager, // Durable streaming
  })
);
Complete Examples
REST API with Multiple Endpoints
import express from 'express';
import { expressHandler } from '@genkit-ai/express';
import { genkit, z } from 'genkit';
import { googleAI } from '@genkit-ai/google-genai';

const ai = genkit({ plugins: [googleAI()] });

/** Sends one prompt to Gemini 2.5 Flash and resolves with the generated text. */
async function complete(prompt: string): Promise<string> {
  const response = await ai.generate({
    model: googleAI.model('gemini-2.5-flash'),
    prompt,
  });
  return response.text;
}

// Flows: each validates its input and makes a single model call.
const summarize = ai.defineFlow(
  {
    name: 'summarize',
    inputSchema: z.object({ text: z.string() }),
    outputSchema: z.string(),
  },
  async ({ text }) => complete(`Summarize: ${text}`)
);

const translate = ai.defineFlow(
  {
    name: 'translate',
    inputSchema: z.object({
      text: z.string(),
      targetLang: z.string(),
    }),
    outputSchema: z.string(),
  },
  async ({ text, targetLang }) => complete(`Translate to ${targetLang}: ${text}`)
);

// Express app with JSON body parsing.
const app = express();
app.use(express.json());

// One POST endpoint per flow.
app.post('/api/summarize', expressHandler(summarize));
app.post('/api/translate', expressHandler(translate));

// Liveness probe.
app.get('/health', (_req, res) => {
  res.json({ status: 'ok' });
});

app.listen(3000, () => {
  console.log('API server running on http://localhost:3000');
});
Authenticated API with Rate Limiting
import express from 'express';
import { expressHandler } from '@genkit-ai/express';
import rateLimit from 'express-rate-limit';
import { UserFacingError } from 'genkit';
import type { ContextProvider } from 'genkit/context';

const app = express();
app.use(express.json());

// Rate limiting
const limiter = rateLimit({
  windowMs: 15 * 60 * 1000, // 15 minutes
  max: 100, // Max 100 requests per window
});
app.use('/api/', limiter);

// Authentication context: runs before the flow; throwing a UserFacingError
// rejects the request with the given status.
const authContext: ContextProvider = (req) => {
  const apiKey = req.headers['x-api-key'];
  if (!apiKey || !isValidApiKey(apiKey)) {
    throw new UserFacingError('UNAUTHENTICATED', 'Invalid API key');
  }
  const user = getUserFromApiKey(apiKey);
  // `?.` turns a missing user into "Access denied" instead of a TypeError
  // (and a generic server error) if getUserFromApiKey finds no user.
  if (!user?.hasAccess) {
    throw new UserFacingError('PERMISSION_DENIED', 'Access denied');
  }
  return { user };
};

// Protected endpoint
app.post(
  '/api/generate',
  expressHandler(generateFlow, { contextProvider: authContext })
);

app.listen(3000);
Production Setup with CORS and Helmet
import express from 'express';
import type { NextFunction, Request, Response } from 'express';
import cors from 'cors';
import helmet from 'helmet';
import { expressHandler } from '@genkit-ai/express';

const app = express();

// Security middleware
app.use(helmet());

// ALLOWED_ORIGINS is a comma-separated allow-list; entries are trimmed so
// "https://a.com, https://b.com" works. When it is unset or empty, CORS is
// disabled (`origin: false`) instead of passing `origin: undefined`, which the
// cors package treats as its default wildcard `*`.
const allowedOrigins = (process.env.ALLOWED_ORIGINS ?? '')
  .split(',')
  .map((origin) => origin.trim())
  .filter((origin) => origin.length > 0);
app.use(cors({
  origin: allowedOrigins.length > 0 ? allowedOrigins : false,
  credentials: true,
}));
app.use(express.json({ limit: '10mb' }));

// Logging
app.use((req, _res, next) => {
  console.log(`${req.method} ${req.path}`);
  next();
});

// Flow endpoints
app.post('/api/flow1', expressHandler(flow1));
app.post('/api/flow2', expressHandler(flow2));

/** HTTP status carried by an error (e.g. a 413 from the JSON body limit), else 500. */
function statusOf(err: unknown): number {
  if (err instanceof Error && 'status' in err && typeof err.status === 'number') {
    return err.status >= 400 && err.status < 600 ? err.status : 500;
  }
  return 500;
}

// Error handling. Express recognizes error middleware by its four parameters.
app.use((err: unknown, _req: Request, res: Response, next: NextFunction) => {
  console.error('Error:', err);
  // If a (e.g. streaming) response has already started, the status can no
  // longer be changed; delegate to Express's default handler to close it.
  if (res.headersSent) {
    next(err);
    return;
  }
  const status = statusOf(err);
  res.status(status).json({
    // Client errors keep their message; server errors never leak internals.
    error: status < 500 && err instanceof Error ? err.message : 'Internal server error',
  });
});

const PORT = process.env.PORT || 3000;
app.listen(PORT, () => {
  console.log(`Server running on port ${PORT}`);
});
Best Practices
Error Handling
import { UserFacingError } from 'genkit';

const safeFlow = ai.defineFlow(
  { name: 'safeFlow' },
  async (input) => {
    try {
      // `return await` keeps rejections inside this try/catch.
      return await processInput(input);
    } catch (error) {
      // Errors that are already user-facing carry a deliberate status and
      // message; pass them through unchanged.
      if (error instanceof UserFacingError) {
        throw error;
      }
      // Keep the real cause in server logs; user-facing errors are what is
      // returned to the client.
      console.error('safeFlow: processInput failed:', error);
      throw new UserFacingError(
        'INVALID_ARGUMENT',
        'Input validation failed'
      );
    }
  }
);
Input Validation
import { z } from 'genkit';

// Accepted request shape for validatedFlow; Genkit checks input against it.
const validatedInputSchema = z.object({
  text: z.string().min(1).max(10000),
  language: z.enum(['en', 'es', 'fr']),
  options: z
    .object({
      temperature: z.number().min(0).max(1).optional(),
    })
    .optional(),
});

const validatedFlow = ai.defineFlow(
  {
    name: 'validatedFlow',
    inputSchema: validatedInputSchema,
    outputSchema: z.string(),
  },
  async (input) => {
    // Input is automatically validated against validatedInputSchema.
    return processedResult;
  }
);
Monitoring
import { expressHandler } from '@genkit-ai/express';

const monitoredFlow = ai.defineFlow(
  { name: 'monitoredFlow' },
  async (input) => {
    const startTime = Date.now();
    try {
      const result = await processInput(input);
      // Log success metrics
      console.log({
        flow: 'monitoredFlow',
        duration: Date.now() - startTime,
        status: 'success',
      });
      return result;
    } catch (error) {
      // Log error metrics. Under strict mode `error` is `unknown`, and thrown
      // non-Error values have no `.message`, so narrow before reading it.
      console.error({
        flow: 'monitoredFlow',
        duration: Date.now() - startTime,
        status: 'error',
        error: error instanceof Error ? error.message : String(error),
      });
      throw error;
    }
  }
);