This example demonstrates how to create event-driven workflows for complex multi-step AI applications.

Overview

Workflows allow you to:
  • Create event-driven architectures
  • Manage state across multiple steps
  • Build iterative refinement loops
  • Compose complex AI pipelines
This example builds a joke refinement workflow that iteratively improves a joke based on LLM critique.

Complete Example

joke-workflow.ts
import { openai } from "@llamaindex/openai";
import {
  createStatefulMiddleware,
  createWorkflow,
  workflowEvent,
} from "@llamaindex/workflow";

// Create LLM instance
const llm = openai({ model: "gpt-4.1-mini" });

// Define workflow events
const startEvent = workflowEvent<string>(); // Input topic
const jokeEvent = workflowEvent<{ joke: string }>(); // Generated joke
const critiqueEvent = workflowEvent<{ joke: string; critique: string }>(); // Critique
const resultEvent = workflowEvent<{ joke: string; critique: string }>(); // Final result

// Create stateful workflow
const { withState } = createStatefulMiddleware(() => ({
  numIterations: 0,
  maxIterations: 3,
}));
const jokeFlow = withState(createWorkflow());

// Handler 1: Generate initial joke
jokeFlow.handle([startEvent], async (context, event) => {
  const prompt = `Write your best joke about ${event.data}. Write the joke between <joke> and </joke> tags.`;
  const response = await llm.complete({ prompt });

  const joke =
    response.text.match(/<joke>([\s\S]*?)<\/joke>/)?.[1]?.trim() ??
    response.text;
  return jokeEvent.with({ joke });
});

// Handler 2: Critique the joke
jokeFlow.handle([jokeEvent], async (context, event) => {
  const prompt = `Give a thorough critique of the following joke. If the joke needs improvement, put "IMPROVE" somewhere in the critique: ${event.data.joke}`;
  const response = await llm.complete({ prompt });

  // Check if improvement needed
  if (response.text.includes("IMPROVE")) {
    return critiqueEvent.with({
      joke: event.data.joke,
      critique: response.text,
    });
  }

  return resultEvent.with({ joke: event.data.joke, critique: response.text });
});

// Handler 3: Improve the joke based on critique
jokeFlow.handle([critiqueEvent], async (context, event) => {
  const state = context.state;
  state.numIterations++;

  const prompt = `Write a new joke based on the following critique and the original joke. Write the joke between <joke> and </joke> tags.\n\nJoke: ${event.data.joke}\n\nCritique: ${event.data.critique}`;
  const response = await llm.complete({ prompt });

  const joke =
    response.text.match(/<joke>([\s\S]*?)<\/joke>/)?.[1]?.trim() ??
    response.text;

  // Check iteration limit
  if (state.numIterations < state.maxIterations) {
    return jokeEvent.with({ joke });
  }

  return resultEvent.with({ joke, critique: event.data.critique });
});

// Usage
async function main() {
  const { stream, sendEvent } = jokeFlow.createContext();
  sendEvent(startEvent.with("pirates"));

  let result: { joke: string; critique: string } | undefined;

  for await (const event of stream) {
    if (resultEvent.include(event)) {
      result = event.data;
      break; // Stop when we get the final result
    }
  }

  console.log(result);
}

main().catch(console.error);

Step-by-Step Explanation

1. Define Events

Events represent data flowing through your workflow:
import { workflowEvent } from "@llamaindex/workflow";

// Define typed events
const startEvent = workflowEvent<string>();
const jokeEvent = workflowEvent<{ joke: string }>();
const critiqueEvent = workflowEvent<{ joke: string; critique: string }>();
const resultEvent = workflowEvent<{ joke: string; critique: string }>();
Each event carries typed data through the workflow.

2. Create Stateful Workflow

Add state management to track workflow progress:
import { createStatefulMiddleware, createWorkflow } from "@llamaindex/workflow";

const { withState } = createStatefulMiddleware(() => ({
  numIterations: 0,
  maxIterations: 3,
}));

const jokeFlow = withState(createWorkflow());

3. Define Event Handlers

Handlers process events and emit new events:
// Handler: startEvent -> jokeEvent
jokeFlow.handle([startEvent], async (context, event) => {
  // Process input
  const topic = event.data;
  
  // Generate output (generateJoke is a placeholder for your own LLM call)
  const joke = await generateJoke(topic);
  
  // Emit next event
  return jokeEvent.with({ joke });
});
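The `generateJoke` call above is a placeholder rather than a library function. In the complete example, that step builds a prompt and then extracts the joke from the `<joke>` tags, falling back to the raw response when the tags are missing. A minimal sketch of that logic follows; `buildJokePrompt` and `extractJoke` are hypothetical helper names, not part of `@llamaindex/workflow`.

```typescript
// Hypothetical helpers; the names are illustrative only.

// Build the prompt used to request a joke about a topic.
function buildJokePrompt(topic: string): string {
  return `Write your best joke about ${topic}. Write the joke between <joke> and </joke> tags.`;
}

// Return the text between <joke> tags, or the whole response if no tags are found.
function extractJoke(responseText: string): string {
  const match = responseText.match(/<joke>([\s\S]*?)<\/joke>/);
  return match?.[1]?.trim() ?? responseText;
}

console.log(buildJokePrompt("pirates"));
console.log(extractJoke("Here you go: <joke> Arr! </joke> Enjoy."));
```

Keeping the parsing in a small pure function makes it easy to test without calling the LLM.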

4. Conditional Routing

Route to different events based on conditions:
jokeFlow.handle([jokeEvent], async (context, event) => {
  // critiqueJoke is a placeholder that returns { text, needsImprovement }
  const critique = await critiqueJoke(event.data.joke);
  
  if (critique.needsImprovement) {
    // Continue refining
    return critiqueEvent.with({
      joke: event.data.joke,
      critique: critique.text,
    });
  }
  
  // Done, return result
  return resultEvent.with({
    joke: event.data.joke,
    critique: critique.text,
  });
});
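The routing decision above depends on turning free-form critique text into a boolean. In the complete example this is done by checking for the `IMPROVE` marker that the prompt asks the model to include. A minimal sketch of that interpretation step, where `Critique` and `parseCritique` are hypothetical names:

```typescript
// Hypothetical shape for the result of a critique step.
interface Critique {
  text: string;
  needsImprovement: boolean;
}

// Interpret raw critique text using the "IMPROVE" marker requested in the prompt.
function parseCritique(responseText: string): Critique {
  return {
    text: responseText,
    needsImprovement: responseText.includes("IMPROVE"),
  };
}

console.log(parseCritique("IMPROVE: the punchline is weak."));
```

Note that the check is case-sensitive and would also match words such as "IMPROVEMENT", so a more distinctive marker may be worth considering if false positives become a problem.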

5. Access State

Read and update workflow state:
jokeFlow.handle([critiqueEvent], async (context, event) => {
  // Access state
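  const state = context.state;
  state.numIterations++;

  // improveJoke is a placeholder for your own LLM call
  const improvedJoke = await improveJoke(event.data.joke, event.data.critique);

  // Use state in logic: loop back until the iteration limit is reached
  if (state.numIterations < state.maxIterations) {
    return jokeEvent.with({ joke: improvedJoke });
  }

  return resultEvent.with({
    joke: improvedJoke,
    critique: event.data.critique,
  });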
});

6. Execute Workflow

Create a context, send the start event, and read the stream until the result event arrives:
const { stream, sendEvent } = jokeFlow.createContext();

// Start the workflow
sendEvent(startEvent.with("pirates"));

// Process events
for await (const event of stream) {
  if (resultEvent.include(event)) {
    console.log("Final result:", event.data);
    break;
  }
}

Workflow Patterns

Linear Pipeline

Sequential processing steps:
const step1Event = workflowEvent<Data1>();
const step2Event = workflowEvent<Data2>();
const step3Event = workflowEvent<Data3>();

workflow.handle([step1Event], async (ctx, event) => {
  const result = await processStep1(event.data);
  return step2Event.with(result);
});

workflow.handle([step2Event], async (ctx, event) => {
  const result = await processStep2(event.data);
  return step3Event.with(result);
});

Parallel Execution

Process multiple paths simultaneously:
const inputEvent = workflowEvent<Input>();
const pathAEvent = workflowEvent<ResultA>();
const pathBEvent = workflowEvent<ResultB>();
const mergeEvent = workflowEvent<Combined>();

workflow.handle([inputEvent], async (ctx, event) => {
  // Trigger both paths
  ctx.sendEvent(pathAEvent.with(event.data));
  ctx.sendEvent(pathBEvent.with(event.data));
});
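The handler above fans out to two paths but leaves the merge step, and the `mergeEvent` it declares, unused. How results are gathered through events may depend on the library version, so the sketch below shows a library-independent alternative instead: when both paths can run inside one handler, `Promise.all` starts them together and waits for both results. `runPathA`, `runPathB`, `runBoth`, and the `Combined` shape are hypothetical.

```typescript
// Hypothetical path functions standing in for real work (LLM calls, lookups, etc.).
async function runPathA(input: number): Promise<number> {
  return input * 2;
}

async function runPathB(input: number): Promise<string> {
  return `value-${input}`;
}

// Hypothetical merged result.
interface Combined {
  a: number;
  b: string;
}

// Start both paths at once and wait for both results before merging them.
async function runBoth(input: number): Promise<Combined> {
  const [a, b] = await Promise.all([runPathA(input), runPathB(input)]);
  return { a, b };
}

runBoth(21).then((combined) => console.log(combined));
```

Inside a handler, the merged value could then be returned as `mergeEvent.with(...)`. If either path rejects, `Promise.all` rejects as well, so this approach suits cases where both results are required.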

Iterative Refinement

Loop until condition met:
workflow.handle([processEvent], async (ctx, event) => {
  const result = await process(event.data);
  
  if (isGoodEnough(result)) {
    return doneEvent.with(result);
  }
  
  // Refine and loop back
  return processEvent.with(refine(result));
});
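As written, the handler above loops until `isGoodEnough` returns true, so a condition that never holds would keep the workflow running indefinitely. The joke example avoids this by tracking `maxIterations` in state. The sketch below expresses the same bounded loop as a plain function, independent of the workflow library; `refineUntil` and `RefineResult` are hypothetical names.

```typescript
// Result of a bounded refinement loop.
interface RefineResult<T> {
  value: T;
  iterations: number;
  converged: boolean;
}

// Repeatedly refine a value until it is good enough or the cap is reached.
function refineUntil<T>(
  initial: T,
  isGoodEnough: (value: T) => boolean,
  refine: (value: T) => T,
  maxIterations: number,
): RefineResult<T> {
  let value = initial;
  let iterations = 0;
  while (!isGoodEnough(value) && iterations < maxIterations) {
    value = refine(value);
    iterations++;
  }
  return { value, iterations, converged: isGoodEnough(value) };
}

// Example: halve a number until it drops below 10, with at most 3 refinements.
const outcome = refineUntil(100, (n) => n < 10, (n) => n / 2, 3);
console.log(outcome.value, outcome.iterations, outcome.converged); // 12.5 3 false
```

Returning `converged` alongside the value lets the caller distinguish a result that met the condition from one that merely hit the cap.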

Error Handling

Handle errors gracefully:
const errorEvent = workflowEvent<Error>();

workflow.handle([processEvent], async (ctx, event) => {
  try {
    const result = await riskyOperation(event.data);
    return successEvent.with(result);
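  } catch (error) {
    // `error` is `unknown` under strict TypeScript; normalize it to an Error
    return errorEvent.with(
      error instanceof Error ? error : new Error(String(error)),
    );
  }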
});

workflow.handle([errorEvent], async (ctx, event) => {
  console.error("Workflow error:", event.data);
  // Recovery logic
});
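JavaScript allows any value to be thrown, and under `strict` TypeScript settings the variable bound in a `catch` clause is typed `unknown`. An event typed as `Error` therefore needs the caught value normalized first. A minimal sketch, with `toError` as a hypothetical helper name:

```typescript
// Hypothetical helper: convert any thrown value into an Error instance.
function toError(thrown: unknown): Error {
  if (thrown instanceof Error) {
    return thrown; // Keep the original error, including its stack and subclass.
  }
  return new Error(String(thrown));
}

try {
  throw "something went wrong"; // Non-Error values can be thrown too.
} catch (caught) {
  console.error(toError(caught).message); // something went wrong
}
```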

Running the Example

  1. Install dependencies:
npm install @llamaindex/openai @llamaindex/workflow
  2. Set your API key:
export OPENAI_API_KEY="sk-..."
  3. Run the workflow:
npx tsx joke-workflow.ts

Expected Output

The workflow will:
  1. Generate an initial joke about pirates
  2. Critique the joke
  3. If needed, improve the joke up to 3 times
  4. Return the final joke and critique
Example output (the exact wording will vary from run to run):
{
  "joke": "Why couldn't the pirate play cards? Because he was standing on the deck!",
  "critique": "This is a clever play on words. Well done!"
}

Advanced Features

Timeout Handling

Stop waiting when the workflow does not produce a result in time:
const timeoutEvent = workflowEvent<void>();

const { stream, sendEvent } = workflow.createContext();
sendEvent(startEvent.with(input));

const timeout = setTimeout(() => {
  sendEvent(timeoutEvent.with(undefined));
}, 30000); // 30 second timeout

for await (const event of stream) {
  if (resultEvent.include(event)) {
    clearTimeout(timeout);
    break;
  }
  if (timeoutEvent.include(event)) {
    console.error("Workflow timeout");
    break;
  }
}
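The loop above only reacts to the timeout once the timeout event arrives on the stream. A library-independent alternative is to race a promise against a timer, which can wrap any asynchronous step. The sketch below is one way to do that; `withTimeout` is a hypothetical helper, not part of `@llamaindex/workflow`.

```typescript
// Hypothetical helper: reject if `work` does not settle within `ms` milliseconds.
function withTimeout<T>(work: Promise<T>, ms: number): Promise<T> {
  let timer: ReturnType<typeof setTimeout> | undefined;
  const timeout = new Promise<never>((_, reject) => {
    timer = setTimeout(() => reject(new Error(`Timed out after ${ms} ms`)), ms);
  });
  // Whichever promise settles first wins; the timer is cleared either way.
  return Promise.race([work, timeout]).finally(() => clearTimeout(timer));
}

// Example: a task that takes 200 ms, given only 50 ms to finish.
const slowTask = new Promise<string>((resolve) => {
  setTimeout(() => resolve("done"), 200);
});

withTimeout(slowTask, 50).catch((err: Error) => console.error(err.message));
```

This only stops waiting; it does not cancel the underlying work, which keeps running unless it is aborted by other means.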

Workflow Composition

Combine multiple workflows:
const subWorkflow = createWorkflow();
// Define sub-workflow handlers...

const mainWorkflow = createWorkflow();
mainWorkflow.handle([triggerEvent], async (ctx, event) => {
  const { stream, sendEvent } = subWorkflow.createContext();
  sendEvent(subStartEvent.with(event.data));
  
  for await (const subEvent of stream) {
    if (subResultEvent.include(subEvent)) {
      return nextEvent.with(subEvent.data);
    }
  }
});

Next Steps

Advanced Workflows

Explore the workflows-ts repository

Agents

Combine workflows with agents

RAG Pipeline

Build RAG ingestion workflows

Event Streaming

Stream workflow events to clients
