Quick Reference
// Non-streaming flow: registers a flow named "myFlow" on g whose handler
// takes a string input and returns a (string, error) pair.
myFlow := genkit.DefineFlow(g, "myFlow",
func(ctx context.Context, input string) (string, error) {
// Flow implementation
// NOTE: `output` is a placeholder here — presumably the value produced by
// the flow implementation; it is not defined in this snippet.
return output, nil
},
)
// Invoke the flow with a string input; it returns the result and an error.
result, err := myFlow.Run(ctx, "input")
// Streaming flow: besides the input, the handler receives a stream callback.
// Each stream(ctx, i) call emits one intermediate int chunk; the handler's
// string return value is the flow's final output.
streamFlow := genkit.DefineStreamingFlow(g, "streamFlow",
	func(ctx context.Context, input int, stream func(context.Context, int) error) (string, error) {
		for i := 0; i < input; i++ {
			// Stop as soon as a chunk cannot be delivered.
			if err := stream(ctx, i); err != nil {
				return "", err
			}
		}
		return "done", nil
	},
)

// Stream yields (value, error) pairs. Bind and check the error on every
// iteration: when it is non-nil the flow has failed and result must not be
// dereferenced.
for result, err := range streamFlow.Stream(ctx, 5) {
	if err != nil {
		fmt.Println("stream error:", err)
		break
	}
	if result.Done {
		// Final value returned by the handler.
		fmt.Println(result.Output)
	} else {
		// Intermediate chunk passed to the stream callback.
		fmt.Println(result.Stream)
	}
}
Flow Methods
Run()
// Call the flow once with the given input; it returns the result and an error.
result, err := flow.Run(ctx, input)
Stream()
// Stream yields (value, error) pairs; bind both so that a failed flow is not
// silently ignored.
for result, err := range flow.Stream(ctx, input) {
	if err != nil {
		// The flow failed: handle the error (log or return it) and stop consuming.
		break
	}
	// Process streaming results
}
Tracing with Run()
// tracedFlow wraps each stage of its work in genkit.Run so that every stage
// runs as its own named step ("process", then "generate").
genkit.DefineFlow(g, "tracedFlow",
	func(ctx context.Context, input string) (string, error) {
		// Traced step 1: upper-case the input.
		processed, err := genkit.Run(ctx, "process", func() (string, error) {
			return strings.ToUpper(input), nil
		})
		if err != nil {
			return "", err
		}

		// Traced step 2: use the processed text as the prompt for generation.
		result, err := genkit.Run(ctx, "generate", func() (string, error) {
			resp, err := genkit.Generate(ctx, g,
				ai.WithPrompt(processed),
			)
			if err != nil {
				return "", err
			}
			return resp.Text(), nil
		})
		// Propagate a failure of the generate step instead of reporting
		// success with an empty result.
		if err != nil {
			return "", err
		}
		return result, nil
	},
)