Flows API

Flows are the primary mechanism for orchestrating multi-step AI tasks in Genkit. Each flow run is automatically traced for observability.

defineFlow()

Defines and registers a flow function. Signature:
defineFlow<
  I extends z.ZodTypeAny = z.ZodTypeAny,
  O extends z.ZodTypeAny = z.ZodTypeAny,
  S extends z.ZodTypeAny = z.ZodTypeAny
>(
  registry: Registry,
  config: FlowConfig<I, O, S> | string,
  fn: FlowFn<I, O, S>
): Action<I, O, S>

Parameters

registry
Registry
required
Registry in which the flow is registered
config
FlowConfig<I, O, S> | string
required
Flow configuration or simple name string
fn
FlowFn<I, O, S>
required
The flow implementation function
type FlowFn<I, O, S> = (
  input: z.infer<I>,
  streamingCallback?: StreamingCallback<z.infer<S>>
) => Promise<z.infer<O>>

Returns

action
Action<I, O, S>
A registered flow action that can be invoked
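
Because config accepts either a FlowConfig object or a bare string, the string form presumably acts as shorthand for a config that carries only a name. A minimal sketch of that normalization follows; SimpleFlowConfig and normalizeConfig are hypothetical names used for illustration, not Genkit exports, and the stand-in type omits the schema fields.

```typescript
// Simplified stand-in for FlowConfig (hypothetical; schemas omitted).
interface SimpleFlowConfig {
  name: string;
  description?: string;
}

// Hypothetical helper: treat a bare string as shorthand for { name }.
function normalizeConfig(config: SimpleFlowConfig | string): SimpleFlowConfig {
  return typeof config === 'string' ? { name: config } : config;
}

// Both call styles end up with a config object that has a name.
const fromString = normalizeConfig('menuSuggestionFlow');
const fromObject = normalizeConfig({
  name: 'menuSuggestionFlow',
  description: 'Suggests menu items for themed restaurants',
});
```

The string form is convenient for quick experiments; the object form is the only way to attach schemas or a description.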

Example

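import { generate } from '@genkit-ai/ai';
import { defineFlow } from '@genkit-ai/core';
import { z } from 'zod';

// `registry` is assumed to be an existing Registry instance.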

const menuSuggestionFlow = defineFlow(
  registry,
  {
    name: 'menuSuggestionFlow',
    inputSchema: z.string(),
    outputSchema: z.string(),
    description: 'Suggests menu items for themed restaurants',
  },
  async (subject) => {
    const llmResponse = await generate(registry, {
      prompt: `Suggest an item for the menu of a ${subject} themed restaurant`,
    });
    return llmResponse.text;
  }
);

// Execute the flow by calling the returned action directly
const suggestion = await menuSuggestionFlow('pirate');
console.log(suggestion);

Streaming Flows

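A flow supports streaming when its config declares a streamSchema and its implementation sends chunks through the streaming callback. The callback is optional, so the implementation should handle both the streaming and the non-streaming case.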

Example

const streamingFlow = defineFlow(
  registry,
  {
    name: 'streamingFlow',
    inputSchema: z.string(),
    outputSchema: z.string(),
    streamSchema: z.string(), // Chunks are strings
  },
  async (input, streamingCallback) => {
    if (streamingCallback) {
      // Stream mode
      const { response, stream } = generateStream(registry, {
        prompt: input,
      });

      for await (const chunk of stream) {
        await streamingCallback(chunk.text);
      }

      return (await response).text;
    } else {
      // Non-streaming mode
      const response = await generate(registry, { prompt: input });
      return response.text;
    }
  }
);

// Use with streaming
const { response, stream } = streamingFlow.stream('Tell me a story');

for await (const chunk of stream) {
  process.stdout.write(chunk);
}

const final = await response;
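
The { response, stream } pair above can be thought of as a bridge between a callback-style producer and an async iterable. The sketch below shows one way such a bridge could be built; toStream is a hypothetical helper written for illustration and is not how Genkit implements stream().

```typescript
type StreamingCallback<T> = (chunk: T) => void | Promise<void>;

// Hypothetical bridge: runs a callback-style producer and exposes its chunks
// as an async iterable alongside a promise for the final value.
function toStream<S, O>(
  producer: (onChunk: StreamingCallback<S>) => Promise<O>
): { response: Promise<O>; stream: AsyncIterable<S> } {
  const queue: S[] = [];
  let done = false;
  let wake: (() => void) | undefined;

  // Wake the consumer if it is currently waiting for more chunks.
  const notify = (): void => {
    const w = wake;
    wake = undefined;
    if (w) w();
  };

  // Start the producer immediately; chunks are buffered until consumed.
  const response = producer((chunk) => {
    queue.push(chunk);
    notify();
  }).then(
    (value) => { done = true; notify(); return value; },
    (error) => { done = true; notify(); throw error; }
  );

  async function* iterate(): AsyncGenerator<S> {
    while (true) {
      // Drain everything buffered so far.
      while (queue.length > 0) yield queue.shift() as S;
      if (done) return;
      // Sleep until the producer pushes a chunk or finishes.
      await new Promise<void>((resolve) => { wake = resolve; });
    }
  }

  return { response, stream: iterate() };
}
```

In this sketch the stream simply ends if the producer fails, and the error surfaces through response, so callers should always await response after draining the stream.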

run()

Executes a function within a flow context and records it as a distinct trace span, which adds observability to sub-operations. Signature:
run<T>(name: string, func: () => Promise<T>): Promise<T>
run<T>(name: string, input: any, func: (input?: any) => Promise<T>): Promise<T>

Parameters

name
string
required
Label for the trace span
input
any
Optional input to pass to the function
func
Function
required
Function to execute within the traced span

Returns

result
T
The result of the function execution

Example

const complexFlow = defineFlow(
  registry,
  {
    name: 'complexTask',
    inputSchema: z.string(),
    outputSchema: z.object({
      processed: z.string(),
      summary: z.string(),
    }),
  },
  async (input) => {
    // Step 1: Process input (traced)
    const processed = await run('process-input', async () => {
      return input.toUpperCase();
    });

    // Step 2: Generate summary (traced)
    const summary = await run('generate-summary', async () => {
      const response = await generate(registry, {
        prompt: `Summarize: ${processed}`,
      });
      return response.text;
    });

    return { processed, summary };
  }
);
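
Conceptually, run() wraps a function so that its execution is recorded under a named span. The sketch below illustrates that idea with an in-memory span list. The names tracedStep, SpanRecord, and spans are hypothetical; this stands in for Genkit's tracing and does not reproduce it.

```typescript
// Hypothetical record of one traced step.
interface SpanRecord {
  name: string;
  durationMs: number;
  ok: boolean;
}

// Hypothetical in-memory sink; a real tracer would export spans elsewhere.
const spans: SpanRecord[] = [];

// Runs `func`, recording its name, duration, and success under a span.
async function tracedStep<T>(name: string, func: () => Promise<T>): Promise<T> {
  const start = Date.now();
  let ok = false;
  try {
    const result = await func();
    ok = true;
    return result;
  } finally {
    // Record the span whether the step succeeded or threw.
    spans.push({ name, durationMs: Date.now() - start, ok });
  }
}
```

Recording in a finally block means failed steps still appear in the trace, which is usually where the extra observability matters most.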

Flow Types

FlowFn

The function signature for flow implementations.
type FlowFn<
  I extends z.ZodTypeAny = z.ZodTypeAny,
  O extends z.ZodTypeAny = z.ZodTypeAny,
  S extends z.ZodTypeAny = z.ZodTypeAny
> = (
  input: z.infer<I>,
  streamingCallback?: StreamingCallback<z.infer<S>>
) => Promise<z.infer<O>>

StreamingCallback

Callback for streaming chunks.
type StreamingCallback<T> = (chunk: T) => void | Promise<void>
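
Since the callback may return either void or a Promise, a producer that awaits each call works with synchronous and asynchronous callbacks alike. A minimal sketch, where emitWords is a hypothetical producer and not part of Genkit:

```typescript
type StreamingCallback<T> = (chunk: T) => void | Promise<void>;

// Hypothetical producer: emits each word as a chunk, then returns the full text.
async function emitWords(
  text: string,
  onChunk?: StreamingCallback<string>
): Promise<string> {
  for (const word of text.split(' ')) {
    // Awaiting a non-promise value is harmless, so one code path covers
    // both sync and async callbacks. The callback is skipped when absent.
    if (onChunk) await onChunk(word);
  }
  return text;
}
```

Awaiting each call also applies back-pressure: the producer does not emit the next chunk until an asynchronous callback has finished with the previous one.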

FlowConfig

Configuration for defining a flow.
interface FlowConfig<
  I extends z.ZodTypeAny = z.ZodTypeAny,
  O extends z.ZodTypeAny = z.ZodTypeAny,
  S extends z.ZodTypeAny = z.ZodTypeAny
> {
  name: string;
  inputSchema?: I;
  outputSchema?: O;
  streamSchema?: S;
  description?: string;
  middleware?: Middleware[];
}

Usage with Genkit Instance

When using the Genkit class, flows are defined through the instance:
import { googleAI } from '@genkit-ai/googleai';
import { genkit } from 'genkit';
import { z } from 'zod';

const ai = genkit({ plugins: [googleAI()] });

const myFlow = ai.defineFlow(
  {
    name: 'myFlow',
    inputSchema: z.object({ query: z.string() }),
    outputSchema: z.object({ answer: z.string() }),
  },
  async (input) => {
    const response = await ai.generate({
      prompt: input.query,
    });
    return { answer: response.text };
  }
);

// Run the flow
const result = await myFlow({ query: 'What is AI?' });
console.log(result.answer);

Production Deployment

Flows can be deployed as HTTP endpoints:
import { startFlowServer } from '@genkit-ai/express';

startFlowServer({
  flows: [menuSuggestionFlow, complexFlow],
  port: 3000,
});
This creates endpoints:
  • POST /menuSuggestionFlow
  • POST /complexFlow
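
A client can call these endpoints with a plain JSON POST. The sketch below builds such a request. It assumes the server expects the flow input wrapped as { "data": ... } and returns the output under "result"; verify that envelope against the Genkit version in use. buildFlowRequest and FlowRequest are hypothetical names, and the host and port match the example above.

```typescript
// Hypothetical shape of a ready-to-send request.
interface FlowRequest {
  url: string;
  init: {
    method: 'POST';
    headers: Record<string, string>;
    body: string;
  };
}

// Hypothetical helper: builds the URL and JSON body for a flow endpoint.
// Assumes the { data: input } request envelope.
function buildFlowRequest(baseUrl: string, flowName: string, input: unknown): FlowRequest {
  return {
    // Strip trailing slashes so the path is joined with exactly one "/".
    url: `${baseUrl.replace(/\/+$/, '')}/${flowName}`,
    init: {
      method: 'POST',
      headers: { 'Content-Type': 'application/json' },
      body: JSON.stringify({ data: input }),
    },
  };
}

const req = buildFlowRequest('http://localhost:3000/', 'menuSuggestionFlow', 'pirate');
// With a fetch-capable runtime:
//   const res = await fetch(req.url, req.init);
//   const { result } = await res.json(); // assumed response envelope
```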
