The EventStore service provides functionality for publishing application events to a pub/sub system with automatic metadata enrichment, authentication, and lifecycle tracking integration.
Overview
EventStore supports:
- Automatic metadata enrichment: Adds workflow and activity context to events
- Lifecycle event tracking: Integrates with Segment for analytics
- Authentication injection: Automatically adds auth headers for HTTP bindings
- Graceful degradation: Continues operation when event store is unavailable
Class Reference
EventStore
Unified event store service for publishing application events. All methods are class methods and can be called directly without instantiation.
Core Methods
publish_event
Publish event with automatic metadata enrichment and authentication.
await EventStore.publish_event(event: Event)
This method handles the complete event publishing flow including metadata enrichment, authentication header injection, and component availability validation. It automatically falls back gracefully if the event store component is not available.
Parameters:
event: Event data to publish. Must be an instance of application_sdk.interceptors.models.Event.
Note: If the event store component is not registered, the method logs a warning and skips publishing, allowing applications to run without event publishing capability.
Error handling:
Exceptions raised during event publishing are caught and logged; they are not re-raised to the caller.
Example:
from application_sdk.services import EventStore
from application_sdk.interceptors.models import Event
# Publish workflow status event
status_event = Event(
event_type="workflow.status_changed",
data={
"workflow_id": "wf-123",
"old_status": "running",
"new_status": "completed",
"duration_seconds": 1800
}
)
await EventStore.publish_event(status_event)
# Publish data processing event
processing_event = Event(
event_type="data.batch_processed",
data={
"batch_id": "batch-456",
"records_processed": 10000,
"success_count": 9995,
"error_count": 5
}
)
await EventStore.publish_event(processing_event)
enrich_event_metadata
Enrich event metadata with workflow and activity context information.
EventStore.enrich_event_metadata(event: Event) -> Event
This method automatically populates event metadata with context from the current Temporal workflow and activity execution, including IDs, types, and execution state.
Parameters:
event: Event data to enrich with metadata.
Returns:
The same event instance with enriched metadata.
Note: This method safely handles cases where the code is not running within a Temporal workflow or activity context.
Example:
from application_sdk.services import EventStore
from application_sdk.interceptors.models import Event
# Create basic event
event = Event(event_type="data.processed", data={"count": 100})
# Enrich with current context (if available)
enriched = EventStore.enrich_event_metadata(event)
print(f"Workflow ID: {enriched.metadata.workflow_id}")
print(f"Activity: {enriched.metadata.activity_type}")
print(f"Timestamp: {enriched.metadata.created_timestamp}")
Event Model
Event
The Event class from application_sdk.interceptors.models:
event_type: The type of event (e.g., “workflow.started”, “data.processed”).
event_name: Optional event name for lifecycle events.
data: Dictionary payload carrying the event’s details.
metadata: Event metadata (automatically enriched if not provided).
Metadata automatically attached to events:
- Name of the application publishing the event.
- Unix timestamp when the event was created.
- The topic name derived from the event type.
- Type of the workflow (if the event is published from a workflow context).
- Unique identifier of the workflow execution.
- Run ID of the workflow execution.
- Current state of the workflow (e.g., “RUNNING”, “COMPLETED”).
- Type of the activity (if the event is published from an activity context).
- Unique identifier of the activity execution.
- Attempt number for the activity execution (for retry tracking).
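To make the field list above concrete, here is a rough stdlib-only sketch of the enriched metadata as a dataclass. Only workflow_id, activity_type, and created_timestamp are confirmed attribute names (they appear in the enrich_event_metadata example); the remaining field names are illustrative guesses and may differ from the SDK's actual model.

```python
import time
from dataclasses import dataclass, field
from typing import Optional

# Hypothetical shape of the enriched metadata. Field names other than
# workflow_id, activity_type, and created_timestamp are illustrative guesses.
@dataclass
class EventMetadata:
    application_name: Optional[str] = None   # publishing application
    created_timestamp: float = field(default_factory=time.time)  # Unix timestamp
    topic_name: Optional[str] = None         # derived from the event type
    workflow_type: Optional[str] = None
    workflow_id: Optional[str] = None
    workflow_run_id: Optional[str] = None
    workflow_state: Optional[str] = None     # e.g. "RUNNING", "COMPLETED"
    activity_type: Optional[str] = None
    activity_id: Optional[str] = None
    attempt: Optional[int] = None            # retry tracking

# Fields not known at creation time stay None until enrichment fills them in.
meta = EventMetadata(application_name="demo-app", workflow_id="wf-123")
```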
Usage Patterns
Workflow Lifecycle Events
Publish events at key workflow stages:
from datetime import timedelta
from application_sdk.services import EventStore
from application_sdk.interceptors.models import Event
from temporalio import workflow
@workflow.defn
class DataProcessingWorkflow:
@workflow.run
async def run(self, input_data: dict):
# Publish workflow started event
await EventStore.publish_event(
Event(
event_type="workflow.started",
data={
"input_size": len(input_data),
"processing_mode": "batch"
}
)
)
try:
# Execute workflow activities
result = await workflow.execute_activity(
process_data,
input_data,
start_to_close_timeout=timedelta(hours=1)
)
# Publish success event
await EventStore.publish_event(
Event(
event_type="workflow.completed",
data={
"records_processed": result['count'],
"duration_seconds": result['duration']
}
)
)
return result
except Exception as e:
# Publish failure event
await EventStore.publish_event(
Event(
event_type="workflow.failed",
data={
"error": str(e),
"error_type": type(e).__name__
}
)
)
raise
Activity Progress Tracking
Publish progress events from long-running activities:
from application_sdk.services import EventStore
from application_sdk.interceptors.models import Event
from temporalio import activity
@activity.defn
async def process_large_dataset(dataset_id: str, total_records: int):
batch_size = 1000
processed = 0
for batch_start in range(0, total_records, batch_size):
# Process batch
batch_end = min(batch_start + batch_size, total_records)
process_batch(batch_start, batch_end)
processed += (batch_end - batch_start)
# Publish progress event
progress_pct = (processed / total_records) * 100
await EventStore.publish_event(
Event(
event_type="activity.progress",
data={
"dataset_id": dataset_id,
"processed_records": processed,
"total_records": total_records,
"progress_percentage": progress_pct
}
)
)
return {"processed": processed}
Business Event Publishing
Publish domain-specific business events:
from application_sdk.services import EventStore
from application_sdk.interceptors.models import Event
@activity.defn
async def process_order(order_id: str):
# Process order
order = fetch_order(order_id)
# Publish order received event
await EventStore.publish_event(
Event(
event_type="order.received",
data={
"order_id": order_id,
"customer_id": order['customer_id'],
"total_amount": order['total'],
"item_count": len(order['items'])
}
)
)
# Process payment
payment_result = process_payment(order)
# Publish payment event
await EventStore.publish_event(
Event(
event_type="order.payment_processed",
data={
"order_id": order_id,
"payment_status": payment_result['status'],
"transaction_id": payment_result['transaction_id']
}
)
)
return payment_result
Error and Alert Events
Publish alerts for exceptional conditions:
from application_sdk.services import EventStore
from application_sdk.interceptors.models import Event
@activity.defn
async def monitor_data_quality(dataset_id: str):
quality_metrics = calculate_quality_metrics(dataset_id)
# Check for quality issues
if quality_metrics['null_percentage'] > 10:
await EventStore.publish_event(
Event(
event_type="data_quality.alert",
data={
"dataset_id": dataset_id,
"alert_type": "high_null_percentage",
"null_percentage": quality_metrics['null_percentage'],
"severity": "warning"
}
)
)
if quality_metrics['duplicate_percentage'] > 5:
await EventStore.publish_event(
Event(
event_type="data_quality.alert",
data={
"dataset_id": dataset_id,
"alert_type": "high_duplicate_rate",
"duplicate_percentage": quality_metrics['duplicate_percentage'],
"severity": "error"
}
)
)
return quality_metrics
Lifecycle Events and Segment Integration
The EventStore automatically sends lifecycle events to Segment for analytics:
Lifecycle Event Names
ApplicationEventNames.WORKFLOW_START
Triggered when a workflow starts (mapped to “workflow_started” in Segment).
ApplicationEventNames.WORKFLOW_END
Triggered when a workflow completes (mapped to “workflow_completed” in Segment).
ApplicationEventNames.ACTIVITY_START
Triggered when an activity starts (mapped to “activity_started” in Segment).
ApplicationEventNames.ACTIVITY_END
Triggered when an activity ends (mapped to “activity_ended” in Segment).
Example:
from application_sdk.interceptors.models import Event, ApplicationEventNames
from application_sdk.services import EventStore
# This event will be sent to Segment automatically
await EventStore.publish_event(
Event(
event_name=ApplicationEventNames.WORKFLOW_START.value,
event_type="workflow.lifecycle",
data={"workflow_name": "DataIngestion"}
)
)
Best Practices
- Automatic Context: Event metadata is automatically enriched with workflow and activity context. You don’t need to manually add workflow IDs or activity information.
- Graceful Degradation: If the event store component is not registered, publish_event will log a warning and continue without raising an error. This allows applications to function without event publishing capability.
- Non-Blocking: While publish_event is async, it doesn’t block on delivery confirmation. Events are fire-and-forget, so don’t rely on event publishing for critical workflow logic.
- Event Naming: Use a consistent domain.action naming convention (e.g., “order.created”, “payment.processed”, “data.validated”).
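The graceful-degradation and event-naming practices above can be sketched with plain standard-library code. Note that safe_publish and is_valid_event_type are illustrative helpers written for this sketch, not SDK APIs.

```python
import logging
import re

logger = logging.getLogger("eventstore-sketch")

# domain.action convention, e.g. "order.created", "payment.processed"
EVENT_TYPE_PATTERN = re.compile(r"^[a-z][a-z0-9_]*\.[a-z][a-z0-9_]*$")

def is_valid_event_type(event_type: str) -> bool:
    """Check that an event type follows the domain.action convention."""
    return bool(EVENT_TYPE_PATTERN.match(event_type))

def safe_publish(publish, event: dict) -> bool:
    """Fire-and-forget publish: log failures instead of raising them."""
    try:
        publish(event)
        return True
    except Exception:
        logger.warning("event store unavailable; skipping publish", exc_info=True)
        return False

# A publisher that always fails still leaves the caller running.
def broken_publish(event):
    raise ConnectionError("event store not registered")

ok = safe_publish(broken_publish, {"event_type": "order.created"})  # False, no exception escapes
```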
Authentication Integration
EventStore automatically injects authentication headers for HTTP bindings:
# EventStore automatically:
# 1. Retrieves auth token from AtlanAuthClient
# 2. Adds Authorization header to binding metadata
# 3. HTTP bindings use the header, others ignore it
await EventStore.publish_event(event)
# Behind the scenes:
# - Gets token: auth_client.get_authenticated_headers()
# - Adds to metadata: {"Authorization": "Bearer <token>"}
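The header-injection step can be illustrated with a small standalone sketch; get_token and with_auth here are stand-ins for the real AtlanAuthClient flow, not SDK functions.

```python
# Sketch of the header-injection step: merge an Authorization header into
# the binding metadata without mutating the original dictionary.
def get_token() -> str:
    # Stand-in for auth_client.get_authenticated_headers(); not a real credential.
    return "example-token"

def with_auth(binding_metadata: dict) -> dict:
    headers = dict(binding_metadata)  # copy so the caller's dict is untouched
    headers["Authorization"] = f"Bearer {get_token()}"
    return headers

enriched = with_auth({"topic": "workflow.status_changed"})
# HTTP bindings read the Authorization key; other binding types ignore it.
```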
Configuration
EventStore is configured through constants:
from application_sdk.constants import (
EVENT_STORE_NAME, # Default: "eventstore"
DAPR_BINDING_OPERATION_CREATE, # Default: "create"
APP_TENANT_ID, # Tenant identifier
APPLICATION_NAME, # Application name
ATLAN_BASE_URL, # Atlan base URL
)
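Constants like these typically resolve from environment variables with documented defaults; the sketch below shows that pattern with a stdlib-only helper. The resolve helper and the environment variable names are assumptions for illustration, not the SDK's actual mechanism.

```python
import os

def resolve(name: str, default: str, env=None) -> str:
    """Read a config value from the environment, falling back to a default."""
    env = os.environ if env is None else env
    return env.get(name, default)

# Defaults match the documented values; an empty env dict forces the fallback.
EVENT_STORE_NAME = resolve("EVENT_STORE_NAME", "eventstore", env={})
DAPR_BINDING_OPERATION_CREATE = resolve("DAPR_BINDING_OPERATION_CREATE", "create", env={})
```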
See Also
- SecretStore - Shows a similar authentication pattern
- StateStore - Often used together for workflow state and events