Express Plugin

The @genkit-ai/express plugin provides utilities for exposing Genkit flows and actions as REST APIs via Express HTTP servers. This enables you to serve AI capabilities over HTTP with minimal configuration.

Installation

npm install @genkit-ai/express

Quick Start

Express Handler

Expose a single flow as an HTTP endpoint:
import { expressHandler } from '@genkit-ai/express';
import { genkit } from 'genkit';
import { googleAI } from '@genkit-ai/google-genai';
import express from 'express';

const ai = genkit({
  plugins: [googleAI()],
});

const simpleFlow = ai.defineFlow(
  { name: 'simpleFlow' },
  async (input, { sendChunk }) => {
    const { text } = await ai.generate({
      model: googleAI.model('gemini-2.5-flash'),
      prompt: input,
      onChunk: (c) => sendChunk(c.text),
    });
    return text;
  }
);

const app = express();
app.use(express.json());

app.post('/simpleFlow', expressHandler(simpleFlow));

app.listen(8080, () => {
  console.log('Server running on http://localhost:8080');
});

Flow Server

Quickly expose multiple flows:
import { startFlowServer } from '@genkit-ai/express';
import { genkit } from 'genkit';

const ai = genkit({});

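const menuFlow = ai.defineFlow(
  { name: 'menuSuggestionFlow' },
  async (restaurantTheme) => {
    // Placeholder logic: generate menu suggestions here (for example with ai.generate)
    const suggestions = `Menu ideas for a ${restaurantTheme} restaurant`;
    return suggestions;
  }
);

const reviewFlow = ai.defineFlow(
  { name: 'reviewAnalysisFlow' },
  async (reviews) => {
    // Placeholder logic: analyze the reviews here
    const analysis = `Received reviews: ${JSON.stringify(reviews)}`;
    return analysis;
  }
);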

startFlowServer({
  flows: [menuFlow, reviewFlow],
  port: 3500,
});

Flows are automatically exposed at:
  • POST http://localhost:3500/menuSuggestionFlow
  • POST http://localhost:3500/reviewAnalysisFlow
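
The endpoints above can also be called with any HTTP client when the Genkit client library is not available. The sketch below assumes the callable-style envelope used by Genkit flow endpoints, where the input travels under a `data` key and the output comes back under `result` (errors under `error`). The helper names `buildFlowRequest` and `unwrapFlowResponse` are hypothetical and not part of the plugin.

```typescript
// Hypothetical helpers illustrating the assumed request/response envelope.
interface FlowRequest {
  method: 'POST';
  headers: Record<string, string>;
  body: string;
}

// Wrap the flow input in the assumed `{ data: ... }` envelope.
function buildFlowRequest(
  input: unknown,
  headers: Record<string, string> = {},
): FlowRequest {
  return {
    method: 'POST',
    headers: { 'Content-Type': 'application/json', ...headers },
    body: JSON.stringify({ data: input }),
  };
}

// Pull the flow output out of `{ result: ... }`, or throw on `{ error: ... }`.
function unwrapFlowResponse<T>(payload: unknown): T {
  if (typeof payload !== 'object' || payload === null) {
    throw new Error('Unexpected response: not a JSON object');
  }
  const body = payload as { result?: T; error?: { message?: string } };
  if (body.error) {
    throw new Error(body.error.message ?? 'Flow returned an error');
  }
  if (!('result' in body)) {
    throw new Error('Unexpected response: missing "result"');
  }
  return body.result as T;
}

// Usage (needs a running flow server, so it is left as a comment):
//   const res = await fetch(
//     'http://localhost:3500/menuSuggestionFlow',
//     buildFlowRequest('seafood'),
//   );
//   const suggestions = unwrapFlowResponse<string>(await res.json());
```

Keeping the envelope handling in two small functions makes it easy to adjust if the wire format differs in your Genkit version.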

Features

Streaming Support

A flow that calls sendChunk streams each chunk to clients that request a streaming response; other clients receive only the final result:
const streamingFlow = ai.defineFlow(
  { name: 'streamingFlow' },
  async (input, { sendChunk }) => {
    const { text } = await ai.generate({
      model: googleAI.model('gemini-2.5-flash'),
      prompt: input,
      onChunk: (chunk) => {
        sendChunk(chunk.text);  // Stream to client
      },
    });
    return text;
  }
);

app.post('/streaming', expressHandler(streamingFlow));
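
On the wire, a streamed flow response can be consumed without the client library. The sketch below assumes the handler emits server-sent events when the client sends `Accept: text/event-stream`, with each chunk framed as `data: {"message": ...}` and the final output as `data: {"result": ...}`. That framing is an assumption about the current wire format, and `parseFlowEvents` is a hypothetical helper name.

```typescript
// One parsed event from the assumed stream format.
type FlowEvent =
  | { kind: 'chunk'; value: unknown }
  | { kind: 'result'; value: unknown };

// Parse a fully buffered event-stream body into chunk and result events.
function parseFlowEvents(raw: string): FlowEvent[] {
  const events: FlowEvent[] = [];
  // Events are assumed to be separated by a blank line.
  for (const block of raw.split('\n\n')) {
    const line = block.trim();
    if (!line.startsWith('data:')) continue; // skip blanks and non-data lines
    const payload = JSON.parse(line.slice('data:'.length).trim()) as Record<string, unknown>;
    if ('message' in payload) {
      events.push({ kind: 'chunk', value: payload['message'] });
    } else if ('result' in payload) {
      events.push({ kind: 'result', value: payload['result'] });
    }
  }
  return events;
}
```

This version expects the whole body at once. A production client would buffer partial reads and parse only complete events, which is what `streamFlow` from the client library handles for you.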

Authentication

Add authentication using context providers:
import { UserFacingError } from 'genkit';
import { ContextProvider, RequestData } from 'genkit/context';

// Uses the default context type; pass your own type argument if you define one
const context: ContextProvider = (req: RequestData) => {
  // Check authorization header
  if (req.headers['authorization'] !== 'Bearer secret-token') {
    throw new UserFacingError('PERMISSION_DENIED', 'Not authorized');
  }
  
  return {
    auth: {
      user: 'authenticated-user',
      roles: ['admin'],
    },
  };
};

app.post(
  '/secureFlow',
  expressHandler(myFlow, { context })
);
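
The value returned by a context provider is available inside the flow through the `context` field of the flow's second argument. A minimal sketch of a role check follows; it uses a local `FlowContext` interface as a stand-in for the context shape shown above, and `requireRole` is a hypothetical helper. A real flow would more likely throw `UserFacingError` than a plain `Error`.

```typescript
// Local stand-in for the context shape returned by the provider (an assumption).
interface FlowContext {
  auth?: { user: string; roles: string[] };
}

// Throws unless the caller is authenticated and holds `role`; returns the user name.
function requireRole(context: FlowContext, role: string): string {
  const auth = context.auth;
  if (!auth) {
    throw new Error('UNAUTHENTICATED: no auth context');
  }
  if (!auth.roles.includes(role)) {
    throw new Error(`PERMISSION_DENIED: missing role ${role}`);
  }
  return auth.user;
}

// Inside a flow (sketch, not executed here):
//   ai.defineFlow({ name: 'adminFlow' }, async (input, { context }) => {
//     const user = requireRole(context, 'admin');
//     return `Hello, ${user}`;
//   });
```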

Custom authentication middleware:
// isValidToken and decodeToken are placeholders for your own token logic
function authMiddleware(req, res, next) {
  const token = req.headers['authorization']?.split(' ')[1];
  
  if (!token || !isValidToken(token)) {
    return res.status(401).json({ error: 'Unauthorized' });
  }
  
  req.user = decodeToken(token);
  next();
}

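app.post(
  '/protectedFlow',
  authMiddleware,
  expressHandler(myFlow, {
    // The context provider receives Genkit's RequestData (method, headers, input)
    // rather than the Express request, so read the token from the headers here.
    context: (req) => ({
      user: decodeToken(req.headers['authorization'].split(' ')[1]),
    }),
  })
);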
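
The middleware above extracts the token with `split(' ')[1]`, which accepts any scheme and ignores trailing content. A stricter parse could look like the sketch below; `parseBearerToken` is a hypothetical helper, not part of Express or Genkit.

```typescript
// Return the token from an "Authorization: Bearer <token>" header value,
// or undefined if the header is missing or not a well-formed Bearer credential.
function parseBearerToken(header: string | undefined): string | undefined {
  if (!header) return undefined;
  // The scheme is matched case-insensitively; the token must be a single run
  // of non-whitespace characters with nothing after it.
  const match = /^Bearer +([^\s]+)$/i.exec(header.trim());
  return match ? match[1] : undefined;
}

// Usage inside the middleware (sketch):
//   const token = parseBearerToken(req.headers['authorization']);
```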

Durable Streaming (Beta)

Persist stream state for reconnection:
import { InMemoryStreamManager } from 'genkit/beta';

// Development: In-memory manager
app.post(
  '/durableFlow',
  expressHandler(myFlow, {
    streamManager: new InMemoryStreamManager(),
  })
);

// Production: Firestore or RTDB manager
import { FirestoreStreamManager } from '@genkit-ai/firebase/beta';
import { initializeApp } from 'firebase-admin/app';
import { getFirestore } from 'firebase-admin/firestore';

const fApp = initializeApp();
const firestore = new FirestoreStreamManager({
  firebaseApp: fApp,
  db: getFirestore(fApp),
  collection: 'streams',
});

app.post(
  '/productionFlow',
  expressHandler(myFlow, {
    streamManager: firestore,
  })
);

Client Usage

Use the Genkit client library to call flows:
import { runFlow, streamFlow } from 'genkit/beta/client';

// Non-streaming call
const result = await runFlow({
  url: 'http://localhost:8080/simpleFlow',
  input: 'Generate a haiku about coding',
});

console.log(result);

// Streaming call
const streamResult = streamFlow({
  url: 'http://localhost:8080/simpleFlow',
  input: 'Write a long story',
});

for await (const chunk of streamResult.stream) {
  process.stdout.write(chunk);
}

const finalOutput = await streamResult.output;
console.log('\nFinal:', finalOutput);

// With authentication
const authResult = await runFlow({
  url: 'http://localhost:8080/secureFlow',
  headers: {
    Authorization: 'Bearer secret-token',
  },
  input: 'Protected request',
});

Reconnect to Durable Streams

import { streamFlow } from 'genkit/beta/client';

// Start stream and save ID
const result = streamFlow({
  url: 'http://localhost:8080/durableFlow',
  input: 'Long-running task',
});

const streamId = await result.streamId;
console.log('Stream ID:', streamId);

// Later, reconnect
const reconnected = streamFlow({
  url: 'http://localhost:8080/durableFlow',
  streamId: streamId,
});

for await (const chunk of reconnected.stream) {
  console.log('Resumed:', chunk);
}

Configuration

Flow Server Options

startFlowServer({
  flows: [flow1, flow2],
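  port: 4567,                    // Server port (default: the PORT environment variable, else 3400)
  cors: {                        // CORS configuration
    origin: 'https://example.com', // Example origin; browsers reject a wildcard origin with credentials
    methods: ['POST'],
    credentials: true,
  },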
  pathPrefix: '/api/v1',         // URL prefix for all flows
});

Express Handler Options

app.post(
  '/myFlow',
  expressHandler(myFlow, {
    context: contextProvider,         // Auth/context provider
    streamManager: streamManager,     // Durable streaming
  })
);

Complete Examples

REST API with Multiple Endpoints

import express from 'express';
import { expressHandler } from '@genkit-ai/express';
import { genkit, z } from 'genkit';
import { googleAI } from '@genkit-ai/google-genai';

const ai = genkit({
  plugins: [googleAI()],
});

// Define flows
const summarize = ai.defineFlow(
  {
    name: 'summarize',
    inputSchema: z.object({ text: z.string() }),
    outputSchema: z.string(),
  },
  async ({ text }) => {
    const { text: summary } = await ai.generate({
      model: googleAI.model('gemini-2.5-flash'),
      prompt: `Summarize: ${text}`,
    });
    return summary;
  }
);

const translate = ai.defineFlow(
  {
    name: 'translate',
    inputSchema: z.object({
      text: z.string(),
      targetLang: z.string(),
    }),
    outputSchema: z.string(),
  },
  async ({ text, targetLang }) => {
    const { text: translation } = await ai.generate({
      model: googleAI.model('gemini-2.5-flash'),
      prompt: `Translate to ${targetLang}: ${text}`,
    });
    return translation;
  }
);

// Create Express app
const app = express();
app.use(express.json());

// Mount endpoints
app.post('/api/summarize', expressHandler(summarize));
app.post('/api/translate', expressHandler(translate));

// Health check
app.get('/health', (req, res) => {
  res.json({ status: 'ok' });
});

app.listen(3000, () => {
  console.log('API server running on http://localhost:3000');
});

Authenticated API with Rate Limiting

import express from 'express';
import { expressHandler } from '@genkit-ai/express';
import rateLimit from 'express-rate-limit';
import { UserFacingError } from 'genkit';

const app = express();
app.use(express.json());

// Rate limiting
const limiter = rateLimit({
  windowMs: 15 * 60 * 1000,  // 15 minutes
  max: 100,                   // Max 100 requests per window
});

app.use('/api/', limiter);

// Authentication context
// (isValidApiKey, getUserFromApiKey and generateFlow are placeholders for your own code)
const authContext = (req) => {
  const apiKey = req.headers['x-api-key'];
  
  if (!apiKey || !isValidApiKey(apiKey)) {
    throw new UserFacingError('UNAUTHENTICATED', 'Invalid API key');
  }
  
  const user = getUserFromApiKey(apiKey);
  
  if (!user.hasAccess) {
    throw new UserFacingError('PERMISSION_DENIED', 'Access denied');
  }
  
  return { user };
};

// Protected endpoint
app.post(
  '/api/generate',
  expressHandler(generateFlow, { context: authContext })
);

app.listen(3000);
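
To make the `windowMs` and `max` settings concrete, here is a minimal fixed-window counter that applies the same two parameters per client key. It is an illustration of the idea only, not the implementation used by `express-rate-limit`, and the class name `FixedWindowLimiter` is hypothetical.

```typescript
// Illustrative fixed-window limiter: at most `max` hits per key per window.
class FixedWindowLimiter {
  private readonly hits = new Map<string, { count: number; windowStart: number }>();
  private readonly windowMs: number;
  private readonly max: number;

  constructor(windowMs: number, max: number) {
    this.windowMs = windowMs;
    this.max = max;
  }

  // Returns true if the request identified by `key` is allowed at time `now` (ms).
  allow(key: string, now: number): boolean {
    let entry = this.hits.get(key);
    // Start a fresh window for new keys or once the current window has elapsed.
    if (!entry || now - entry.windowStart >= this.windowMs) {
      entry = { count: 0, windowStart: now };
      this.hits.set(key, entry);
    }
    if (entry.count >= this.max) return false;
    entry.count += 1;
    return true;
  }
}

// With the settings above: new FixedWindowLimiter(15 * 60 * 1000, 100)
```

Each key (for example an IP address or API key) gets its own counter, so one noisy client does not consume another client's allowance.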

Production Setup with CORS and Helmet

import express from 'express';
import cors from 'cors';
import helmet from 'helmet';
import { expressHandler } from '@genkit-ai/express';

const app = express();

// Security middleware
app.use(helmet());
app.use(cors({
  origin: process.env.ALLOWED_ORIGINS?.split(','),
  credentials: true,
}));
app.use(express.json({ limit: '10mb' }));

// Logging
app.use((req, res, next) => {
  console.log(`${req.method} ${req.path}`);
  next();
});

// Flow endpoints
app.post('/api/flow1', expressHandler(flow1));
app.post('/api/flow2', expressHandler(flow2));

// Error handling
app.use((err, req, res, next) => {
  console.error('Error:', err);
  res.status(500).json({
    error: err.message || 'Internal server error',
  });
});

const PORT = process.env.PORT || 3000;
app.listen(PORT, () => {
  console.log(`Server running on port ${PORT}`);
});
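
In the setup above, `process.env.ALLOWED_ORIGINS?.split(',')` evaluates to `undefined` when the variable is unset, which may leave the origin check wide open depending on how the `cors` package treats a missing `origin`. A small parser that falls back to an empty allowlist is one way to fail closed. `parseAllowedOrigins` is a hypothetical helper, and the claim that an empty array blocks all cross-origin callers is an assumption to verify against the `cors` documentation.

```typescript
// Hypothetical helper: turn a comma-separated env value into an origin allowlist.
function parseAllowedOrigins(value: string | undefined): string[] {
  if (!value) return []; // unset or empty: no cross-origin callers are listed
  return value
    .split(',')
    .map((origin) => origin.trim()) // tolerate spaces around commas
    .filter((origin) => origin.length > 0); // drop empty entries such as a trailing comma
}

// Usage (sketch):
//   app.use(cors({
//     origin: parseAllowedOrigins(process.env.ALLOWED_ORIGINS),
//     credentials: true,
//   }));
```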

Best Practices

Error Handling

import { UserFacingError } from 'genkit';

const safeFlow = ai.defineFlow(
  { name: 'safeFlow' },
  async (input) => {
    try {
      const result = await processInput(input);
      return result;
    } catch (error) {
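      // Log the original error server-side so it is not lost, then surface a
      // safe message; UserFacingError messages are returned to the client
      console.error('safeFlow failed:', error);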
      throw new UserFacingError(
        'INVALID_ARGUMENT',
        'Input validation failed'
      );
    }
  }
);
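
The status names passed to `UserFacingError` follow the canonical `google.rpc.Code` names, which have a standard HTTP mapping. The lookup below sketches that mapping for the codes used in this document plus a few common ones; it assumes Genkit responds with these HTTP statuses, and `httpStatusFor` is a hypothetical helper useful mainly for tests and client-side handling.

```typescript
// Canonical google.rpc.Code to HTTP status mapping (subset).
const HTTP_STATUS_BY_CODE: Record<string, number> = {
  INVALID_ARGUMENT: 400,
  UNAUTHENTICATED: 401,
  PERMISSION_DENIED: 403,
  NOT_FOUND: 404,
  RESOURCE_EXHAUSTED: 429,
  INTERNAL: 500,
  UNAVAILABLE: 503,
};

// Look up the HTTP status for a status name, defaulting to 500 for unknown names.
function httpStatusFor(code: string): number {
  return HTTP_STATUS_BY_CODE[code] ?? 500;
}
```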

Input Validation

import { z } from 'genkit';

const validatedFlow = ai.defineFlow(
  {
    name: 'validatedFlow',
    inputSchema: z.object({
      text: z.string().min(1).max(10000),
      language: z.enum(['en', 'es', 'fr']),
      options: z.object({
        temperature: z.number().min(0).max(1).optional(),
      }).optional(),
    }),
    outputSchema: z.string(),
  },
  async (input) => {
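    // Input has already been validated against inputSchema at this point
    const processedResult = `Received ${input.text.length} characters (${input.language})`; // placeholder logic
    return processedResult;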
  }
);

Monitoring

const monitoredFlow = ai.defineFlow(
  { name: 'monitoredFlow' },
  async (input) => {
    const startTime = Date.now();
    
    try {
      const result = await processInput(input);
      
      // Log success metrics
      console.log({
        flow: 'monitoredFlow',
        duration: Date.now() - startTime,
        status: 'success',
      });
      
      return result;
    } catch (error) {
      // Log error metrics
      console.error({
        flow: 'monitoredFlow',
        duration: Date.now() - startTime,
        status: 'error',
        error: error instanceof Error ? error.message : String(error),
      });
      throw error;
    }
  }
);
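
When several flows need the same timing and status logging, the try/catch pattern above can be factored into a wrapper. The sketch below is one way to do that; `withTiming` and `LogEntry` are hypothetical names, and the logger is injected so it can be swapped for a metrics client.

```typescript
// Shape of one log record emitted per invocation.
type LogEntry = {
  flow: string;
  durationMs: number;
  status: 'success' | 'error';
  error?: string;
};

// Wrap an async handler so every call logs its duration and outcome.
function withTiming<I, O>(
  flowName: string,
  fn: (input: I) => Promise<O>,
  log: (entry: LogEntry) => void,
): (input: I) => Promise<O> {
  return async (input: I): Promise<O> => {
    const start = Date.now();
    try {
      const output = await fn(input);
      log({ flow: flowName, durationMs: Date.now() - start, status: 'success' });
      return output;
    } catch (err) {
      log({
        flow: flowName,
        durationMs: Date.now() - start,
        status: 'error',
        error: err instanceof Error ? err.message : String(err),
      });
      throw err; // rethrow so the caller still sees the failure
    }
  };
}

// Usage (sketch):
//   ai.defineFlow(
//     { name: 'monitoredFlow' },
//     withTiming('monitoredFlow', processInput, (entry) => console.log(entry)),
//   );
```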
