The EventStore service publishes application events with automatic metadata enrichment and lifecycle event tracking.

EventStore

application_sdk.services.eventstore.EventStore
Unified event store service for publishing application events.

Class Methods

publish_event

Publish event with automatic metadata enrichment and authentication.
from application_sdk.services.eventstore import EventStore
from application_sdk.interceptors.models import Event

await EventStore.publish_event(event: Event)
event (Event, required): Event data to publish
Behavior:
  • Automatically enriches event with workflow and activity context
  • Injects authentication headers
  • Validates component availability
  • Sends lifecycle events to Segment
  • Silently skips if event store component is not registered
Raises: nothing to the caller. Exceptions raised during publishing are logged but not re-raised, so a failed publish does not disrupt the calling code.
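
The log-and-continue behavior described above can be pictured with a minimal sketch. It does not use the SDK: `safe_publish` and the two publisher callables are hypothetical stand-ins, and the real `publish_event` may differ in detail.

```python
import asyncio
import logging
from typing import Any, Awaitable, Callable

logger = logging.getLogger("eventstore_sketch")


async def safe_publish(
    publisher: Callable[[Any], Awaitable[None]], event: Any
) -> bool:
    """Await a hypothetical publisher; log failures instead of raising.

    Returns True when the publish succeeded and False when it failed.
    """
    try:
        await publisher(event)
        return True
    except Exception:
        # Log with traceback, then carry on: the caller is never interrupted.
        logger.exception("Failed to publish event %r", event)
        return False


async def failing_publisher(event: Any) -> None:
    """Stand-in publisher that always fails, to exercise the error path."""
    raise ConnectionError("event store unavailable")


async def working_publisher(event: Any) -> None:
    """Stand-in publisher that always succeeds."""
    return None
```

Because failures surface only in the logs, wrapping the publish call in `try`/`except` will not reveal a lost event.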

enrich_event_metadata

Enrich the event metadata with workflow and activity context information.
enriched_event = EventStore.enrich_event_metadata(event: Event)
event (Event, required): Event data to enrich with metadata
Returns (Event): the same event instance with enriched metadata, including:
  • application_name
  • created_timestamp
  • topic_name
  • workflow_type, workflow_id, workflow_run_id (if in workflow context)
  • activity_type, activity_id, attempt (if in activity context)
This method safely handles cases where the code is not running within a Temporal workflow or activity context.
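
As a rough illustration of context-tolerant enrichment, the sketch below fills in context fields only when a lookup succeeds and leaves them as `None` otherwise. Everything in it (`MetadataSketch`, `lookup_or_none`, `enrich`, `no_context`) is a hypothetical stand-in, not the SDK's implementation, and the lookups are plain callables rather than Temporal calls.

```python
import time
from dataclasses import dataclass
from typing import Callable, Optional


@dataclass
class MetadataSketch:
    """Hypothetical stand-in carrying a few of the documented metadata fields."""

    application_name: str = ""
    created_timestamp: int = 0
    workflow_id: Optional[str] = None
    activity_id: Optional[str] = None


def lookup_or_none(lookup: Callable[[], str]) -> Optional[str]:
    """Run a context lookup, returning None if no context is active."""
    try:
        return lookup()
    except Exception:
        return None


def enrich(
    metadata: MetadataSketch,
    application_name: str,
    workflow_id_lookup: Callable[[], str],
    activity_id_lookup: Callable[[], str],
) -> MetadataSketch:
    """Fill in metadata in place and return the same instance."""
    metadata.application_name = application_name
    metadata.created_timestamp = int(time.time())
    metadata.workflow_id = lookup_or_none(workflow_id_lookup)
    metadata.activity_id = lookup_or_none(activity_id_lookup)
    return metadata


def no_context() -> str:
    """Simulates a lookup made outside any workflow or activity."""
    raise RuntimeError("not running inside a workflow or activity")
```

Calling `enrich(MetadataSketch(), "my-app", lambda: "wf-123", no_context)` returns the same instance with `workflow_id` set and `activity_id` left as `None`.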

Event Models

Event

application_sdk.interceptors.models.Event
Base event model for application events.

Attributes

event_type (str, required): The type of event (e.g., "workflow.started", "data.processed")
event_name (str): Human-readable event name
data (Dict[str, Any]): Event payload data
metadata (EventMetadata): Event metadata including workflow and activity context

Methods

topic_name = event.get_topic_name()
Get the topic name for the event based on event_type.

EventMetadata

application_sdk.interceptors.models.EventMetadata
Metadata for events including execution context.
application_name (str): Name of the application
created_timestamp (int): Unix timestamp when the event was created
topic_name (str): Topic name for the event
workflow_type (str): Type of workflow (if in workflow context)
workflow_id (str): Workflow identifier (if in workflow context)
workflow_run_id (str): Workflow run identifier (if in workflow context)
workflow_state (str): Current workflow state (if in workflow context)
activity_type (str): Type of activity (if in activity context)
activity_id (str): Activity identifier (if in activity context)
attempt (int): Activity attempt number (if in activity context)

Example Usage

Basic Event Publishing

from application_sdk.services.eventstore import EventStore
from application_sdk.interceptors.models import Event

# Create and publish event
event = Event(
    event_type="data.processed",
    event_name="DataProcessed",
    data={
        "batch_id": "batch-123",
        "records_count": 1000,
        "success_count": 995,
        "error_count": 5
    }
)

await EventStore.publish_event(event)

Workflow Status Events

from application_sdk.services.eventstore import EventStore
from application_sdk.interceptors.models import Event

# Workflow started event
started_event = Event(
    event_type="workflow.started",
    event_name="WorkflowStarted",
    data={
        "workflow_id": "wf-123",
        "source": "manual",
        "user_id": "user-456"
    }
)
await EventStore.publish_event(started_event)

# Workflow completed event
completed_event = Event(
    event_type="workflow.completed",
    event_name="WorkflowCompleted",
    data={
        "workflow_id": "wf-123",
        "status": "success",
        "duration_seconds": 1800,
        "records_processed": 10000
    }
)
await EventStore.publish_event(completed_event)

Data Processing Events

from application_sdk.services.eventstore import EventStore
from application_sdk.interceptors.models import Event

# Batch processing started
batch_start = Event(
    event_type="batch.started",
    event_name="BatchProcessingStarted",
    data={
        "batch_id": "batch-456",
        "item_count": 5000,
        "priority": "high"
    }
)
await EventStore.publish_event(batch_start)

# Batch processing completed
batch_complete = Event(
    event_type="batch.completed",
    event_name="BatchProcessingCompleted",
    data={
        "batch_id": "batch-456",
        "processed_count": 5000,
        "duration_ms": 45000
    }
)
await EventStore.publish_event(batch_complete)

Activity Events

from application_sdk.activities import ActivitiesInterface
from application_sdk.services.eventstore import EventStore
from application_sdk.interceptors.models import Event
from temporalio import activity

class EventPublishingActivities(ActivitiesInterface):
    
    @activity.defn
    async def process_data(self, workflow_args):
        """Process data and publish events."""
        
        # Publish start event
        start_event = Event(
            event_type="activity.data_processing_started",
            data={"source": workflow_args["source"]}
        )
        await EventStore.publish_event(start_event)
        
        try:
            # Process data (_do_processing is a placeholder for your own logic)
            result = await self._do_processing(workflow_args)
            
            # Publish success event
            success_event = Event(
                event_type="activity.data_processing_completed",
                data={
                    "source": workflow_args["source"],
                    "records": result["count"]
                }
            )
            await EventStore.publish_event(success_event)
            
            return result
            
        except Exception as e:
            # Publish error event
            error_event = Event(
                event_type="activity.data_processing_failed",
                data={
                    "source": workflow_args["source"],
                    "error": str(e)
                }
            )
            await EventStore.publish_event(error_event)
            raise

Custom Event Enrichment

from application_sdk.services.eventstore import EventStore
from application_sdk.interceptors.models import Event, EventMetadata

# Create event with custom metadata
event = Event(
    event_type="custom.action",
    event_name="CustomAction",
    data={"action": "data_export"},
    metadata=EventMetadata(
        # Custom fields will be preserved
        custom_field="custom_value"
    )
)

# Enrich with workflow context
enriched = EventStore.enrich_event_metadata(event)

# enriched.metadata now includes:
# - application_name
# - created_timestamp
# - workflow_id, workflow_run_id (if in workflow)
# - activity_id (if in activity)
# - custom_field (preserved)

await EventStore.publish_event(enriched)

Error Event Publishing

from application_sdk.services.eventstore import EventStore
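from application_sdk.interceptors.models import Event

# CriticalError is a placeholder for an application-defined exception type;
# it is not part of the SDK.
class CriticalError(Exception):
    """Application-specific error that should be reported with high severity."""
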
async def handle_error(error: Exception, context: dict):
    """Publish error event."""
    error_event = Event(
        event_type="error.occurred",
        event_name="ErrorOccurred",
        data={
            "error_type": type(error).__name__,
            "error_message": str(error),
            "context": context,
            "severity": "high" if isinstance(error, CriticalError) else "medium"
        }
    )
    
    await EventStore.publish_event(error_event)

Multi-Event Publishing

from application_sdk.services.eventstore import EventStore
from application_sdk.interceptors.models import Event
import asyncio

async def publish_pipeline_events(pipeline_id: str, stages: list):
    """Publish events for all pipeline stages."""
    events = [
        Event(
            event_type=f"pipeline.stage.{stage['name']}",
            event_name=f"Stage{stage['name'].title()}",
            data={
                "pipeline_id": pipeline_id,
                "stage": stage["name"],
                "status": stage["status"]
            }
        )
        for stage in stages
    ]
    
    # Publish all events
    await asyncio.gather(*[
        EventStore.publish_event(event)
        for event in events
    ])

Lifecycle Event Tracking

from application_sdk.services.eventstore import EventStore
from application_sdk.interceptors.models import (
    Event,
    ApplicationEventNames
)

# Lifecycle events are automatically sent to Segment

# Workflow lifecycle
workflow_start = Event(
    event_name=ApplicationEventNames.WORKFLOW_START.value,
    event_type="workflow.started",
    data={"workflow_type": "DataProcessing"}
)
await EventStore.publish_event(workflow_start)

# Activity lifecycle
activity_start = Event(
    event_name=ApplicationEventNames.ACTIVITY_START.value,
    event_type="activity.started",
    data={"activity_type": "ExtractData"}
)
await EventStore.publish_event(activity_start)

Best Practices

Event Design

  • Use descriptive event types and names
  • Include relevant context in event data
  • Keep event payloads focused and lightweight
  • Use consistent naming conventions
  • Document custom event types
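
One way to keep naming consistent is to check event types against a single pattern before publishing. The sketch below assumes a lowercase, dot-separated convention inferred from the examples on this page (such as "data.processed"); the pattern and the `is_valid_event_type` helper are illustrative and not enforced by the SDK.

```python
import re

# Assumed convention: two or more lowercase segments joined by dots,
# each starting with a letter and containing letters, digits, or underscores.
EVENT_TYPE_PATTERN = re.compile(r"^[a-z][a-z0-9_]*(\.[a-z][a-z0-9_]*)+$")


def is_valid_event_type(event_type: str) -> bool:
    """Return True if event_type follows the assumed dotted convention."""
    return EVENT_TYPE_PATTERN.fullmatch(event_type) is not None
```

A check like this fits naturally in a unit test or a small wrapper around event construction, so that a stray "DataProcessed" is caught before it reaches the event store.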

Event Publishing

  • Publish events at key points in workflow execution
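  • Don’t wrap event publishing in blocking calls; publish_event is a coroutine and should be awaited
  • Handle publishing errors gracefully; publish_event logs failures rather than raising them, so check the logs when events go missing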
  • Avoid publishing excessive events
  • Use appropriate event types for different scenarios

Metadata

  • Let automatic enrichment handle workflow/activity context
  • Add custom metadata when needed
  • Include timestamps in event data
  • Preserve correlation IDs
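
A minimal sketch of carrying a correlation ID across related events, assuming the ID travels in the event's data payload under a `correlation_id` key. Both the key name and the `with_correlation_id` helper are hypothetical; the SDK does not define them on this page.

```python
import uuid
from typing import Any, Dict, Optional


def with_correlation_id(
    data: Dict[str, Any], correlation_id: Optional[str] = None
) -> Dict[str, Any]:
    """Return a copy of data that carries a correlation ID.

    Precedence: an ID already present in data, then the explicit argument,
    then a freshly generated UUID. The input dict is not modified.
    """
    enriched = dict(data)
    enriched["correlation_id"] = (
        data.get("correlation_id") or correlation_id or str(uuid.uuid4())
    )
    return enriched
```

The first event in a chain can call `with_correlation_id(data)` to mint an ID, and later events can pass that ID along explicitly so consumers can tie them together.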

Performance

  • Event publishing is non-blocking
  • Batch related events when possible
  • Monitor event volume
  • Use appropriate event granularity
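
One reading of batching related events is to publish a single summary event for a batch rather than one event per record. The sketch below builds such a payload using the field names from the Basic Event Publishing example; `summarize_batch` is a hypothetical helper, not part of the SDK.

```python
from typing import Any, Dict, Iterable


def summarize_batch(batch_id: str, outcomes: Iterable[bool]) -> Dict[str, Any]:
    """Collapse per-record outcomes (True = success) into one event payload."""
    results = list(outcomes)
    success_count = sum(1 for ok in results if ok)
    return {
        "batch_id": batch_id,
        "records_count": len(results),
        "success_count": success_count,
        "error_count": len(results) - success_count,
    }
```

The resulting dict can be passed as the `data` argument of a single event, which keeps event volume proportional to the number of batches instead of the number of records.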
