Skip to main content

Overview

Credo uses an event-driven architecture to notify your application about important state changes and actions. The EventEmitter provides both callback-based and reactive (RxJS Observable) interfaces.

EventEmitter

The EventEmitter is injected into services and wraps Node.js’s native EventEmitter with Credo-specific functionality.
import { EventEmitter, injectable, inject, InjectionSymbols } from '@credo-ts/core'

@injectable()
export class MyService {
  private eventEmitter: EventEmitter

  constructor(@inject(InjectionSymbols.EventEmitter) eventEmitter: EventEmitter) {
    this.eventEmitter = eventEmitter
  }
}

Event Structure

All events follow the BaseEvent interface:
export interface BaseEvent {
  type: string                          // Event type identifier
  payload: Record<string, unknown>      // Event-specific data
  metadata: EventMetadata               // Context information
}

export interface EventMetadata {
  contextCorrelationId: string          // Agent context ID
}

Listening to Events

Using Callbacks

The simplest way to listen to events:
import { Agent, ConnectionEventTypes, ConnectionStateChangedEvent } from '@credo-ts/core'

const agent = new Agent({ /* config */ })
await agent.initialize()

// Listen for connection state changes
agent.events.on<ConnectionStateChangedEvent>(
  ConnectionEventTypes.ConnectionStateChanged,
  (event) => {
    console.log('Connection state changed:', event.payload.connectionRecord.state)
    console.log('Context:', event.metadata.contextCorrelationId)
  }
)

Removing Listeners

const listener = (event: ConnectionStateChangedEvent) => {
  console.log('Connection state:', event.payload.connectionRecord.state)
}

// Add listener
agent.events.on(ConnectionEventTypes.ConnectionStateChanged, listener)

// Remove listener
agent.events.off(ConnectionEventTypes.ConnectionStateChanged, listener)

Using Observables

For more advanced reactive patterns, use RxJS observables:
import { filter, map } from 'rxjs/operators'

// Create observable
const connection$ = agent.events.observable<ConnectionStateChangedEvent>(
  ConnectionEventTypes.ConnectionStateChanged
)

// Subscribe with operators
connection$
  .pipe(
    filter(event => event.payload.connectionRecord.state === 'completed'),
    map(event => event.payload.connectionRecord)
  )
  .subscribe((connection) => {
    console.log('Connection completed:', connection.id)
  })

Filter by Context

In multi-tenant scenarios, filter events by context:
import { filterContextCorrelationId } from '@credo-ts/core'

const contextId = agentContext.contextCorrelationId

agent.events.observable<ConnectionStateChangedEvent>(
  ConnectionEventTypes.ConnectionStateChanged
)
  .pipe(
    filterContextCorrelationId(contextId),
    filter(event => event.payload.connectionRecord.state === 'completed')
  )
  .subscribe((event) => {
    console.log('Connection completed in context:', contextId)
  })

Emitting Events

When creating custom services or modules, emit events to notify other parts of the application:
import { EventEmitter, AgentContext, BaseEvent } from '@credo-ts/core'

// Define your event type
export enum MyServiceEventTypes {
  DataProcessed = 'MyServiceDataProcessed',
  ErrorOccurred = 'MyServiceErrorOccurred',
}

// Define event interface
export interface MyServiceDataProcessedEvent extends BaseEvent {
  type: typeof MyServiceEventTypes.DataProcessed
  payload: {
    recordId: string
    result: string
  }
}

@injectable()
export class MyService {
  private eventEmitter: EventEmitter

  constructor(eventEmitter: EventEmitter) {
    this.eventEmitter = eventEmitter
  }

  async processData(agentContext: AgentContext, data: string) {
    // Process the data
    const result = await this.doProcessing(data)

    // Emit event
    this.eventEmitter.emit<MyServiceDataProcessedEvent>(agentContext, {
      type: MyServiceEventTypes.DataProcessed,
      payload: {
        recordId: 'record-123',
        result: result,
      },
    })
  }
}

Common Event Types

Connection Events

import { ConnectionEventTypes } from '@credo-ts/core'

agent.events.on(
  ConnectionEventTypes.ConnectionStateChanged,
  (event) => {
    const { connectionRecord, previousState } = event.payload
    console.log(`${previousState} -> ${connectionRecord.state}`)
  }
)

Credential Events

import { CredentialEventTypes } from '@credo-ts/core'

agent.events.on(
  CredentialEventTypes.CredentialStateChanged,
  (event) => {
    const { credentialRecord } = event.payload
    console.log('Credential state:', credentialRecord.state)
  }
)

Proof Events

import { ProofEventTypes } from '@credo-ts/core'

agent.events.on(
  ProofEventTypes.ProofStateChanged,
  (event) => {
    const { proofRecord } = event.payload
    console.log('Proof state:', proofRecord.state)
  }
)

Practical Examples

Auto-Accept Connections

import { ConnectionEventTypes, DidExchangeState } from '@credo-ts/core'

agent.events.on<ConnectionStateChangedEvent>(
  ConnectionEventTypes.ConnectionStateChanged,
  async ({ payload }) => {
    const { connectionRecord } = payload

    if (connectionRecord.state === DidExchangeState.RequestReceived) {
      await agent.connections.acceptRequest(connectionRecord.id)
      console.log('Auto-accepted connection request')
    }
  }
)

Log All Events

const eventTypes = [
  ...Object.values(ConnectionEventTypes),
  ...Object.values(CredentialEventTypes),
  ...Object.values(ProofEventTypes),
]

for (const eventType of eventTypes) {
  agent.events.on(eventType, (event) => {
    console.log(`[${new Date().toISOString()}] ${event.type}`, event.payload)
  })
}

Notify External Service

import { CredentialEventTypes, CredentialState } from '@credo-ts/core'

agent.events.on<CredentialStateChangedEvent>(
  CredentialEventTypes.CredentialStateChanged,
  async ({ payload }) => {
    const { credentialRecord } = payload

    if (credentialRecord.state === CredentialState.Done) {
      // Notify external service
      await fetch('https://api.example.com/credentials', {
        method: 'POST',
        headers: { 'Content-Type': 'application/json' },
        body: JSON.stringify({
          credentialId: credentialRecord.id,
          issuedAt: new Date().toISOString(),
        }),
      })
    }
  }
)

Event Aggregation with RxJS

import { merge } from 'rxjs'
import { filter, map, debounceTime } from 'rxjs/operators'

// Combine multiple event streams
const connectionComplete$ = agent.events
  .observable<ConnectionStateChangedEvent>(ConnectionEventTypes.ConnectionStateChanged)
  .pipe(
    filter(e => e.payload.connectionRecord.state === DidExchangeState.Completed),
    map(e => ({ type: 'connection', id: e.payload.connectionRecord.id }))
  )

const credentialReceived$ = agent.events
  .observable<CredentialStateChangedEvent>(CredentialEventTypes.CredentialStateChanged)
  .pipe(
    filter(e => e.payload.credentialRecord.state === CredentialState.Done),
    map(e => ({ type: 'credential', id: e.payload.credentialRecord.id }))
  )

// Merge and process
merge(connectionComplete$, credentialReceived$)
  .pipe(debounceTime(1000))
  .subscribe((activity) => {
    console.log('Recent activity:', activity.type, activity.id)
  })

Event Lifecycle

The EventEmitter respects the agent’s lifecycle:
// Events automatically stop when agent shuts down
await agent.shutdown()

// Observables complete automatically via the stop$ subject
const stop$ = new Subject<boolean>()

const subscription = fromEventPattern(
  (handler) => agent.events.on('SomeEvent', handler),
  (handler) => agent.events.off('SomeEvent', handler)
)
  .pipe(takeUntil(stop$))
  .subscribe(/* ... */)

// When agent shuts down, stop$ emits and observables complete

Best Practices

1. Type Your Event Handlers

Always use TypeScript types for better type safety:
import { ConnectionStateChangedEvent } from '@credo-ts/core'

agent.events.on<ConnectionStateChangedEvent>(
  ConnectionEventTypes.ConnectionStateChanged,
  (event) => {
    // event.payload is properly typed
    const state = event.payload.connectionRecord.state
  }
)

2. Clean Up Listeners

Remove event listeners when they’re no longer needed:
class MyComponent {
  private listener: (event: ConnectionStateChangedEvent) => void

  onMount() {
    this.listener = (event) => { /* handle event */ }
    agent.events.on(ConnectionEventTypes.ConnectionStateChanged, this.listener)
  }

  onUnmount() {
    agent.events.off(ConnectionEventTypes.ConnectionStateChanged, this.listener)
  }
}

3. Use Observables for Complex Logic

When you need filtering, debouncing, or combining streams, use observables:
// Better than multiple callback listeners
agent.events.observable(ConnectionEventTypes.ConnectionStateChanged)
  .pipe(
    filter(e => e.payload.connectionRecord.state === 'completed'),
    debounceTime(500),
    take(1)
  )
  .subscribe(/* ... */)

4. Handle Async Errors in Listeners

agent.events.on(ConnectionEventTypes.ConnectionStateChanged, async (event) => {
  try {
    await processConnection(event.payload.connectionRecord)
  } catch (error) {
    console.error('Error processing connection:', error)
    // Don't let errors crash the event emitter
  }
})

5. Filter by Context in Multi-Tenant Apps

const contextId = agentContext.contextCorrelationId

agent.events.observable(CredentialEventTypes.CredentialStateChanged)
  .pipe(filterContextCorrelationId(contextId))
  .subscribe(/* only events from this context */)

Custom Modules

Learn how to emit custom events from modules

Agent API

API reference for the Agent class

Build docs developers (and LLMs) love