The EventStore service provides event publishing capabilities with automatic metadata enrichment and lifecycle event tracking.
EventStore
application_sdk.services.eventstore.EventStore
Unified event store service for publishing application events.
Class Methods
publish_event
Publish event with automatic metadata enrichment and authentication.
from application_sdk.services.eventstore import EventStore
from application_sdk.interceptors.models import Event
await EventStore.publish_event(event: Event)
- Automatically enriches event with workflow and activity context
- Injects authentication headers
- Validates component availability
- Sends lifecycle events to Segment
- Silently skips if event store component is not registered
Errors are logged but not re-raised to prevent disruption
enrich_event_metadata — Enrich the event metadata with workflow and activity context information.
enriched_event = EventStore.enrich_event_metadata(event: Event)
Event data to enrich with metadata
The same event instance with enriched metadata including:
- application_name
- created_timestamp
- topic_name
- workflow_type, workflow_id, workflow_run_id (if in workflow context)
- activity_type, activity_id, attempt (if in activity context)
This method safely handles cases where the code is not running within a Temporal workflow or activity context.
Event Models
Event
application_sdk.interceptors.models.Event
Base event model for application events.
Attributes
event_type: The type of event (e.g., "workflow.started", "data.processed")
event_name: Human-readable event name
metadata: Event metadata including workflow and activity context
Methods
topic_name = event.get_topic_name()
Get the topic name for the event based on event_type.
application_sdk.interceptors.models.EventMetadata
Metadata for events including execution context.
created_timestamp: Unix timestamp when the event was created
workflow_type: Type of workflow (if in workflow context)
workflow_id: Workflow identifier (if in workflow context)
workflow_run_id: Workflow run identifier (if in workflow context)
Current workflow state (if in workflow context)
activity_type: Type of activity (if in activity context)
activity_id: Activity identifier (if in activity context)
attempt: Activity attempt number (if in activity context)
Example Usage
Basic Event Publishing
from application_sdk.services.eventstore import EventStore
from application_sdk.interceptors.models import Event
# Create and publish event
event = Event(
event_type="data.processed",
event_name="DataProcessed",
data={
"batch_id": "batch-123",
"records_count": 1000,
"success_count": 995,
"error_count": 5
}
)
await EventStore.publish_event(event)
Workflow Status Events
from application_sdk.services.eventstore import EventStore
from application_sdk.interceptors.models import Event
# Workflow started event
started_event = Event(
event_type="workflow.started",
event_name="WorkflowStarted",
data={
"workflow_id": "wf-123",
"source": "manual",
"user_id": "user-456"
}
)
await EventStore.publish_event(started_event)
# Workflow completed event
completed_event = Event(
event_type="workflow.completed",
event_name="WorkflowCompleted",
data={
"workflow_id": "wf-123",
"status": "success",
"duration_seconds": 1800,
"records_processed": 10000
}
)
await EventStore.publish_event(completed_event)
Data Processing Events
from application_sdk.services.eventstore import EventStore
from application_sdk.interceptors.models import Event
# Batch processing started
batch_start = Event(
event_type="batch.started",
event_name="BatchProcessingStarted",
data={
"batch_id": "batch-456",
"item_count": 5000,
"priority": "high"
}
)
await EventStore.publish_event(batch_start)
# Batch processing completed
batch_complete = Event(
event_type="batch.completed",
event_name="BatchProcessingCompleted",
data={
"batch_id": "batch-456",
"processed_count": 5000,
"duration_ms": 45000
}
)
await EventStore.publish_event(batch_complete)
Activity Events
from application_sdk.activities import ActivitiesInterface
from application_sdk.services.eventstore import EventStore
from application_sdk.interceptors.models import Event
from temporalio import activity
class EventPublishingActivities(ActivitiesInterface):
@activity.defn
async def process_data(self, workflow_args):
"""Process data and publish events."""
# Publish start event
start_event = Event(
event_type="activity.data_processing_started",
data={"source": workflow_args["source"]}
)
await EventStore.publish_event(start_event)
try:
# Process data
result = await self._do_processing(workflow_args)
# Publish success event
success_event = Event(
event_type="activity.data_processing_completed",
data={
"source": workflow_args["source"],
"records": result["count"]
}
)
await EventStore.publish_event(success_event)
return result
except Exception as e:
# Publish error event
error_event = Event(
event_type="activity.data_processing_failed",
data={
"source": workflow_args["source"],
"error": str(e)
}
)
await EventStore.publish_event(error_event)
raise
Custom Event Enrichment
from application_sdk.services.eventstore import EventStore
from application_sdk.interceptors.models import Event, EventMetadata
# Create event with custom metadata
event = Event(
event_type="custom.action",
event_name="CustomAction",
data={"action": "data_export"},
metadata=EventMetadata(
# Custom fields will be preserved
custom_field="custom_value"
)
)
# Enrich with workflow context
enriched = EventStore.enrich_event_metadata(event)
# enriched.metadata now includes:
# - application_name
# - created_timestamp
# - workflow_id, workflow_run_id (if in workflow)
# - activity_id (if in activity)
# - custom_field (preserved)
await EventStore.publish_event(enriched)
Error Event Publishing
from application_sdk.services.eventstore import EventStore
from application_sdk.interceptors.models import Event
async def handle_error(error: Exception, context: dict):
"""Publish error event."""
error_event = Event(
event_type="error.occurred",
event_name="ErrorOccurred",
data={
"error_type": type(error).__name__,
"error_message": str(error),
"context": context,
"severity": "high" if isinstance(error, CriticalError) else "medium"
}
)
await EventStore.publish_event(error_event)
Multi-Event Publishing
from application_sdk.services.eventstore import EventStore
from application_sdk.interceptors.models import Event
import asyncio
async def publish_pipeline_events(pipeline_id: str, stages: list):
"""Publish events for all pipeline stages."""
events = [
Event(
event_type=f"pipeline.stage.{stage['name']}",
event_name=f"Stage{stage['name'].title()}",
data={
"pipeline_id": pipeline_id,
"stage": stage["name"],
"status": stage["status"]
}
)
for stage in stages
]
# Publish all events
await asyncio.gather(*[
EventStore.publish_event(event)
for event in events
])
Lifecycle Event Tracking
from application_sdk.services.eventstore import EventStore
from application_sdk.interceptors.models import (
Event,
ApplicationEventNames
)
# Lifecycle events are automatically sent to Segment
# Workflow lifecycle
workflow_start = Event(
event_name=ApplicationEventNames.WORKFLOW_START.value,
event_type="workflow.started",
data={"workflow_type": "DataProcessing"}
)
await EventStore.publish_event(workflow_start)
# Activity lifecycle
activity_start = Event(
event_name=ApplicationEventNames.ACTIVITY_START.value,
event_type="activity.started",
data={"activity_type": "ExtractData"}
)
await EventStore.publish_event(activity_start)
Best Practices
Event Design
- Use descriptive event types and names
- Include relevant context in event data
- Keep event payloads focused and lightweight
- Use consistent naming conventions
- Document custom event types
Event Publishing
- Publish events at key points in workflow execution
- Don't block on event publishing (it's async)
- Handle publishing errors gracefully
- Avoid publishing excessive events
- Use appropriate event types for different scenarios
- Let automatic enrichment handle workflow/activity context
- Add custom metadata when needed
- Include timestamps in event data
- Preserve correlation IDs
- Event publishing is non-blocking
- Batch related events when possible
- Monitor event volume
- Use appropriate event granularity