Flows API
Flows are the primary mechanism for orchestrating multi-step AI tasks in Genkit. Each flow run is automatically traced for observability.
defineFlow()
Defines and registers a flow function.
Signature:
defineFlow<
I extends z.ZodTypeAny = z.ZodTypeAny,
O extends z.ZodTypeAny = z.ZodTypeAny,
S extends z.ZodTypeAny = z.ZodTypeAny
>(
registry: Registry,
config: FlowConfig<I, O, S> | string,
fn: FlowFn<I, O, S>
): Action<I, O, S>
Parameters
config
FlowConfig | string
required
Flow configuration or simple name stringShow FlowConfig properties
Unique identifier for the flow
Zod schema for input validation
Zod schema for output validation
Schema for streaming chunks (for streaming flows)
Human-readable description of the flow’s purpose
Middleware to apply to the flow execution
The flow implementation functiontype FlowFn<I, O, S> = (
input: z.infer<I>,
streamingCallback?: StreamingCallback<z.infer<S>>
) => Promise<z.infer<O>>
Returns
A registered flow action that can be invoked
Execute the flow with input and return output
stream()
(input: I) => StreamingResponse<S, O>
Execute the flow with streaming (if streamSchema is defined)
Metadata about the action (name, schemas, etc.)
Example
import { defineFlow } from '@genkit-ai/core';
import { z } from 'zod';
const menuSuggestionFlow = defineFlow(
registry,
{
name: 'menuSuggestionFlow',
inputSchema: z.string(),
outputSchema: z.string(),
description: 'Suggests menu items for themed restaurants',
},
async (subject) => {
const llmResponse = await generate(registry, {
prompt: `Suggest an item for the menu of a ${subject} themed restaurant`,
});
return llmResponse.text;
}
);
// Execute the flow
const suggestion = await menuSuggestionFlow.run('pirate');
console.log(suggestion);
Streaming Flows
Flows can support streaming by providing a streamSchema and using the streaming callback.
Example
const streamingFlow = defineFlow(
registry,
{
name: 'streamingFlow',
inputSchema: z.string(),
outputSchema: z.string(),
streamSchema: z.string(), // Chunks are strings
},
async (input, streamingCallback) => {
if (streamingCallback) {
// Stream mode
const { response, stream } = generateStream(registry, {
prompt: input,
});
for await (const chunk of stream) {
await streamingCallback(chunk.text);
}
return (await response).text;
} else {
// Non-streaming mode
const response = await generate(registry, { prompt: input });
return response.text;
}
}
);
// Use with streaming
const { response, stream } = streamingFlow.stream('Tell me a story');
for await (const chunk of stream) {
process.stdout.write(chunk);
}
const final = await response;
run()
Executes a function within a flow context, creating a distinct trace span. Used to add observability to sub-operations.
Signature:
run<T>(name: string, func: () => Promise<T>): Promise<T>
run<T>(name: string, input: any, func: (input?: any) => Promise<T>): Promise<T>
Parameters
Optional input to pass to the function
Function to execute within the traced span
Returns
The result of the function execution
Example
const complexFlow = defineFlow(
registry,
{
name: 'complexTask',
inputSchema: z.string(),
outputSchema: z.object({
processed: z.string(),
summary: z.string(),
}),
},
async (input) => {
// Step 1: Process input (traced)
const processed = await run('process-input', async () => {
return input.toUpperCase();
});
// Step 2: Generate summary (traced)
const summary = await run('generate-summary', async () => {
const response = await generate(registry, {
prompt: `Summarize: ${processed}`,
});
return response.text;
});
return { processed, summary };
}
);
Flow Types
FlowFn
The function signature for flow implementations.
type FlowFn<
I extends z.ZodTypeAny = z.ZodTypeAny,
O extends z.ZodTypeAny = z.ZodTypeAny,
S extends z.ZodTypeAny = z.ZodTypeAny
> = (
input: z.infer<I>,
streamingCallback?: StreamingCallback<z.infer<S>>
) => Promise<z.infer<O>>
StreamingCallback
Callback for streaming chunks.
type StreamingCallback<T> = (chunk: T) => void | Promise<void>
FlowConfig
Configuration for defining a flow.
interface FlowConfig<
I extends z.ZodTypeAny = z.ZodTypeAny,
O extends z.ZodTypeAny = z.ZodTypeAny,
S extends z.ZodTypeAny = z.ZodTypeAny
> {
name: string;
inputSchema?: I;
outputSchema?: O;
streamSchema?: S;
description?: string;
middleware?: Middleware[];
}
Usage with Genkit Instance
When using the Genkit class, flows are defined through the instance:
import { genkit } from 'genkit';
import { z } from 'zod';
const ai = genkit({ plugins: [googleAI()] });
const myFlow = ai.defineFlow(
{
name: 'myFlow',
inputSchema: z.object({ query: z.string() }),
outputSchema: z.object({ answer: z.string() }),
},
async (input) => {
const response = await ai.generate({
prompt: input.query,
});
return { answer: response.text };
}
);
// Run the flow
const result = await myFlow({ query: 'What is AI?' });
console.log(result.answer);
Production Deployment
Flows can be deployed as HTTP endpoints:
import { startFlowServer } from '@genkit-ai/flow';
startFlowServer({
flows: [menuSuggestionFlow, complexFlow],
port: 3000,
});
This creates endpoints:
POST /menuSuggestionFlow
POST /complexFlow
See Also