Skip to main content

Overview

Events are the core communication mechanism in workflows. They enable decoupled, composable workflow steps.

Built-in Events

StartEvent

Initiates workflow execution:
import { StartEvent } from "@llamaindex/workflow";

/**
 * Example workflow whose entry step logs the payload carried by the start event.
 */
class MyWorkflow extends Workflow {
  /**
   * Entry step: prints the `data` field of the incoming start event.
   *
   * @param ctx - Workflow context (not used by this step).
   * @param ev - Start event whose `data` field is logged.
   */
  async run(ctx: Context, ev: StartEvent) {
    const { data } = ev;
    console.log("Started with:", data);
    // ...
  }
}

// Instantiate the workflow and start it with an initial payload.
// NOTE(review): `run` is declared on MyWorkflow as `run(ctx, ev)` but is invoked here
// with a single object — presumably the base `Workflow` adapts the call; confirm.
const workflow = new MyWorkflow();
await workflow.run({ data: "initial data" });

StopEvent

Terminates the workflow and returns its result:
import { StopEvent } from "@llamaindex/workflow";

/**
 * Example workflow that processes the start payload and finishes with a StopEvent.
 */
class MyWorkflow extends Workflow {
  /**
   * Processes `ev.data` via `processData` and wraps the outcome in a StopEvent.
   *
   * @param ctx - Workflow context (not used by this step).
   * @param ev - Start event whose `data` field is processed.
   * @returns A StopEvent whose `result` is the value resolved by `processData`.
   */
  async run(ctx: Context, ev: StartEvent) {
    // Await the processing first, then hand the outcome to the stop event.
    const stopPayload = { result: await processData(ev.data) };
    return new StopEvent(stopPayload);
  }
}

Custom Events

Using zodEvent

Create type-safe events with Zod schemas:
import { zodEvent } from "@llamaindex/workflow";
import { z } from "zod";

// Event named "DataProcessed": carries the processed data, a timestamp string,
// and a status restricted to "success" or "error".
const DataProcessedEvent = zodEvent("DataProcessed", {
  data: z.string(),
  timestamp: z.string(),
  status: z.enum(["success", "error"])
});

// Event named "UserInput": carries a message string and the id of the user it belongs to.
const UserInputEvent = zodEvent("UserInput", {
  message: z.string(),
  userId: z.string()
});

Emitting Events

/**
 * Example workflow that emits a DataProcessedEvent before finishing.
 */
class MyWorkflow extends Workflow {
  /**
   * Processes the start payload, announces the outcome, and stops.
   *
   * @param ctx - Workflow context used to emit the event.
   * @param ev - Start event whose `data` field is processed.
   * @returns A StopEvent whose `result` is the processed data.
   */
  async run(ctx: Context, ev: StartEvent) {
    const output = await this.process(ev.data);

    // Build the event first so the emit call stays a one-liner.
    const processedEvent = new DataProcessedEvent({
      data: output,
      timestamp: new Date().toISOString(),
      status: "success"
    });
    ctx.emit(processedEvent);

    return new StopEvent({ result: output });
  }
}

Event Handlers

Handle events in workflow steps:
import { zodEvent } from "@llamaindex/workflow";
import { z } from "zod";

// Event named "Query": carries the query string.
const QueryEvent = zodEvent("Query", {
  query: z.string()
});

// Event named "Retrieval": carries an array of nodes; element shape is unconstrained (z.any()).
const RetrievalEvent = zodEvent("Retrieval", {
  nodes: z.array(z.any())
});

// Event named "Response": carries the response string.
const ResponseEvent = zodEvent("Response", {
  response: z.string()
});

/**
 * Example retrieve-then-generate workflow that emits an event after each stage.
 */
class RAGWorkflow extends Workflow {
  /**
   * Emits the query, retrieves nodes, generates a response, and stops.
   *
   * @param ctx - Workflow context used to emit the stage events.
   * @param ev - Start event whose `query` field drives retrieval and generation.
   * @returns A StopEvent whose `result` is the generated response.
   */
  async run(ctx: Context, ev: StartEvent) {
    const { query } = ev;

    // Stage 1: announce the incoming query.
    ctx.emit(new QueryEvent({ query }));

    // Stage 2: fetch candidate nodes and publish them.
    const retrieved = await this.retrieve(query);
    ctx.emit(new RetrievalEvent({ nodes: retrieved }));

    // Stage 3: produce the answer and publish it.
    const answer = await this.generate(retrieved, query);
    ctx.emit(new ResponseEvent({ response: answer }));

    return new StopEvent({ result: answer });
  }

  /** Placeholder retrieval step; currently always resolves to an empty array. */
  private async retrieve(query: string) {
    return [];
  }

  /** Placeholder generation step; currently always resolves to a fixed string. */
  private async generate(nodes: any[], query: string) {
    return "Generated response";
  }
}

Event Flow

import { zodEvent } from "@llamaindex/workflow";
import { z } from "zod";

// One event per pipeline stage; each carries that stage's output as a string.
const Step1Event = zodEvent("Step1", { data: z.string() });
const Step2Event = zodEvent("Step2", { data: z.string() });
const Step3Event = zodEvent("Step3", { data: z.string() });

/**
 * Example three-stage pipeline: each stage consumes the previous stage's output
 * and an event is emitted after every stage completes.
 */
class PipelineWorkflow extends Workflow {
  /**
   * Runs the three stages in order and stops with the last stage's output.
   *
   * @param ctx - Workflow context used to emit the per-stage events.
   * @param ev - Start event whose `data` field seeds the first stage.
   * @returns A StopEvent whose `result` is the output of stage three.
   */
  async run(ctx: Context, ev: StartEvent) {
    const first = await this.step1(ev.data);
    ctx.emit(new Step1Event({ data: first }));

    const second = await this.step2(first);
    ctx.emit(new Step2Event({ data: second }));

    const third = await this.step3(second);
    ctx.emit(new Step3Event({ data: third }));

    return new StopEvent({ result: third });
  }

  /** Stage one: prefixes the input with "Step 1: ". */
  private async step1(data: any) {
    return `Step 1: ${data}`;
  }

  /** Stage two: prefixes the input with "Step 2: ". */
  private async step2(data: any) {
    return `Step 2: ${data}`;
  }

  /** Stage three: prefixes the input with "Step 3: ". */
  private async step3(data: any) {
    return `Step 3: ${data}`;
  }
}

Error Events

Handle errors with custom error events:
// Event named "Error": a required message and code, plus optional free-form details.
const ErrorEvent = zodEvent("Error", {
  message: z.string(),
  code: z.string(),
  details: z.any().optional()
});

/**
 * Example workflow that converts a failure in its main operation into an
 * ErrorEvent and a StopEvent carrying the error message.
 */
class ErrorHandlingWorkflow extends Workflow {
  /**
   * Runs the risky operation; on failure emits an ErrorEvent and stops with
   * `result: null` and the failure message.
   *
   * @param ctx - Workflow context used to emit the ErrorEvent.
   * @param ev - Start event whose `data` field is passed to the operation.
   * @returns A StopEvent with the operation's result, or with `result: null`
   *   and an `error` message when the operation throws.
   */
  async run(ctx: Context, ev: StartEvent) {
    try {
      const result = await this.riskyOperation(ev.data);
      return new StopEvent({ result });
    } catch (error: unknown) {
      // Anything can be thrown, so narrow before reading `.message`; this keeps
      // `message` a string (as the ErrorEvent schema requires) for non-Error throws.
      const message = error instanceof Error ? error.message : String(error);

      ctx.emit(new ErrorEvent({
        message,
        code: "OPERATION_FAILED",
        details: error
      }));

      return new StopEvent({
        result: null,
        error: message
      });
    }
  }

  /** Placeholder for an operation that might fail; currently always throws. */
  private async riskyOperation(data: any) {
    // Operation that might fail
    throw new Error("Something went wrong");
  }
}

Event Listeners

Listen to events from outside the workflow:
const workflow = new MyWorkflow();

// Listen to events
// Handlers are registered by event name before the workflow is started.
workflow.on("DataProcessed", (event) => {
  console.log("Data processed:", event.data);
});

// Log the message of any "Error" event to stderr.
workflow.on("Error", (event) => {
  console.error("Error occurred:", event.message);
});

// Start the workflow once the listeners are in place.
const result = await workflow.run({ data: "input" });

Complex Event Patterns

Conditional Events

/**
 * Example workflow that branches on the `action` field of the start event.
 */
class ConditionalWorkflow extends Workflow {
  /**
   * Dispatches on `ev.action`: emits the matching event, runs the matching
   * operation, and stops with its result. Unrecognised actions stop with `null`.
   *
   * @param ctx - Workflow context used to emit the branch event.
   * @param ev - Start event providing `action` plus `query` or `data`.
   * @returns A StopEvent with the branch's result, or `result: null`.
   */
  async run(ctx: Context, ev: StartEvent) {
    switch (ev.action) {
      case "search": {
        ctx.emit(new SearchEvent({ query: ev.query }));
        const found = await this.search(ev.query);
        return new StopEvent({ result: found });
      }
      case "analyze": {
        ctx.emit(new AnalyzeEvent({ data: ev.data }));
        const report = await this.analyze(ev.data);
        return new StopEvent({ result: report });
      }
      default:
        // No matching action: finish with an empty result.
        return new StopEvent({ result: null });
    }
  }
}

Parallel Events

/**
 * Example workflow that announces three tasks and then runs them concurrently.
 */
class ParallelWorkflow extends Workflow {
  /**
   * Emits one event per task, awaits all three tasks together, and stops
   * with their results grouped in a single object.
   *
   * @param ctx - Workflow context used to emit the task events.
   * @param ev - Start event whose `data` field is given to every task.
   * @returns A StopEvent whose `result` holds `result1`, `result2`, `result3`.
   */
  async run(ctx: Context, ev: StartEvent) {
    const input = ev.data;

    // Announce every task before any of them starts.
    ctx.emit(new Task1Event({ data: input }));
    ctx.emit(new Task2Event({ data: input }));
    ctx.emit(new Task3Event({ data: input }));

    // Start all three tasks, then wait for the whole group.
    const outcomes = await Promise.all([
      this.task1(input),
      this.task2(input),
      this.task3(input)
    ]);
    const [result1, result2, result3] = outcomes;

    return new StopEvent({
      result: { result1, result2, result3 }
    });
  }
}

Event Schema Validation

Zod automatically validates event data:
// Event named "User": name must be non-empty, age must lie in [0, 150],
// and email must pass Zod's email format check.
const UserEvent = zodEvent("User", {
  name: z.string().min(1),
  age: z.number().min(0).max(150),
  email: z.string().email()
});

// Valid event: every field satisfies the UserEvent schema
// (non-empty name, age within 0-150, well-formed email address).
ctx.emit(new UserEvent({
  name: "John",
  age: 30,
  email: "john@example.com"
}));

// Invalid event - will throw error
// Every field below violates the UserEvent schema.
// NOTE(review): assumes the validation failure surfaces as an exception inside this
// try block (at construction or at emit) — confirm against the library.
try {
  ctx.emit(new UserEvent({
    name: "",  // Empty string not allowed
    age: -5,   // Negative age not allowed
    email: "invalid"  // Invalid email format
  }));
} catch (error) {
  console.error("Validation error:", error);
}

Best Practices

  1. Use descriptive event names: Clear, action-oriented names
  2. Validate with Zod: Type-safe events prevent runtime errors
  3. Emit at key milestones: Track workflow progress
  4. Handle errors: Create error events for failures
  5. Document event flow: Comment event sequences
  6. Keep events focused: One event per logical step

See Also

Build docs developers (and LLMs) love