Skip to main content
The EventStore service provides functionality for publishing application events to a pub/sub system with automatic metadata enrichment, authentication, and lifecycle tracking integration.

Overview

EventStore supports:
  • Automatic metadata enrichment: Adds workflow and activity context to events
  • Lifecycle event tracking: Integrates with Segment for analytics
  • Authentication injection: Automatically adds auth headers for HTTP bindings
  • Graceful degradation: Continues operation when event store is unavailable

Class Reference

EventStore

EventStore
class
Unified event store service for publishing application events.All methods are class methods and can be called directly without instantiation.

Core Methods

publish_event

Publish event with automatic metadata enrichment and authentication.
await EventStore.publish_event(event: Event)
This method handles the complete event publishing flow including metadata enrichment, authentication header injection, and component availability validation. It automatically falls back gracefully if the event store component is not available.
event
Event
required
Event data to publish. Must be an instance of application_sdk.interceptors.models.Event.
Note: The method will silently skip publishing if the event store component is not registered, allowing applications to run without event publishing capability. Raises:
  • Exception: If there’s an error during event publishing (logged but not re-raised)
Example:
from application_sdk.services import EventStore
from application_sdk.interceptors.models import Event

# Publish workflow status event
status_event = Event(
    event_type="workflow.status_changed",
    data={
        "workflow_id": "wf-123",
        "old_status": "running",
        "new_status": "completed",
        "duration_seconds": 1800
    }
)
await EventStore.publish_event(status_event)

# Publish data processing event
processing_event = Event(
    event_type="data.batch_processed",
    data={
        "batch_id": "batch-456",
        "records_processed": 10000,
        "success_count": 9995,
        "error_count": 5
    }
)
await EventStore.publish_event(processing_event)

enrich_event_metadata

Enrich the event metadata with workflow and activity context information.
EventStore.enrich_event_metadata(event: Event) -> Event
This method automatically populates event metadata with context from the current Temporal workflow and activity execution, including IDs, types, and execution state.
event
Event
required
Event data to enrich with metadata.
return
Event
The same event instance with enriched metadata.
Note: This method safely handles cases where the code is not running within a Temporal workflow or activity context. Example:
from application_sdk.services import EventStore
from application_sdk.interceptors.models import Event

# Create basic event
event = Event(event_type="data.processed", data={"count": 100})

# Enrich with current context (if available)
enriched = EventStore.enrich_event_metadata(event)
print(f"Workflow ID: {enriched.metadata.workflow_id}")
print(f"Activity: {enriched.metadata.activity_type}")
print(f"Timestamp: {enriched.metadata.created_timestamp}")

Event Model

Event

The Event class from application_sdk.interceptors.models:
event_type
str
required
The type of event (e.g., “workflow.started”, “data.processed”).
event_name
str
Optional event name for lifecycle events.
data
Dict[str, Any]
Event payload data.
metadata
EventMetadata
Event metadata (automatically enriched if not provided).

EventMetadata

Metadata automatically attached to events:
application_name
str
Name of the application publishing the event.
created_timestamp
int
Unix timestamp when the event was created.
topic_name
str
The topic name derived from the event type.
workflow_type
str
Type of the workflow (if event published from workflow context).
workflow_id
str
Unique identifier of the workflow execution.
workflow_run_id
str
Run ID of the workflow execution.
workflow_state
str
Current state of the workflow (e.g., “RUNNING”, “COMPLETED”).
activity_type
str
Type of the activity (if event published from activity context).
activity_id
str
Unique identifier of the activity execution.
attempt
int
Attempt number for the activity execution (for retry tracking).

Usage Patterns

Workflow Lifecycle Events

Publish events at key workflow stages:
from application_sdk.services import EventStore
from application_sdk.interceptors.models import Event
from temporalio import workflow

@workflow.defn
class DataProcessingWorkflow:
    @workflow.run
    async def run(self, input_data: dict):
        # Publish workflow started event
        await EventStore.publish_event(
            Event(
                event_type="workflow.started",
                data={
                    "input_size": len(input_data),
                    "processing_mode": "batch"
                }
            )
        )
        
        try:
            # Execute workflow activities
            result = await workflow.execute_activity(
                process_data,
                input_data,
                start_to_close_timeout=timedelta(hours=1)
            )
            
            # Publish success event
            await EventStore.publish_event(
                Event(
                    event_type="workflow.completed",
                    data={
                        "records_processed": result['count'],
                        "duration_seconds": result['duration']
                    }
                )
            )
            
            return result
            
        except Exception as e:
            # Publish failure event
            await EventStore.publish_event(
                Event(
                    event_type="workflow.failed",
                    data={
                        "error": str(e),
                        "error_type": type(e).__name__
                    }
                )
            )
            raise

Activity Progress Tracking

Publish progress events from long-running activities:
from application_sdk.services import EventStore
from application_sdk.interceptors.models import Event
from temporalio import activity

@activity.defn
async def process_large_dataset(dataset_id: str, total_records: int):
    batch_size = 1000
    processed = 0
    
    for batch_start in range(0, total_records, batch_size):
        # Process batch
        batch_end = min(batch_start + batch_size, total_records)
        process_batch(batch_start, batch_end)
        processed += (batch_end - batch_start)
        
        # Publish progress event
        progress_pct = (processed / total_records) * 100
        await EventStore.publish_event(
            Event(
                event_type="activity.progress",
                data={
                    "dataset_id": dataset_id,
                    "processed_records": processed,
                    "total_records": total_records,
                    "progress_percentage": progress_pct
                }
            )
        )
    
    return {"processed": processed}

Business Event Publishing

Publish domain-specific business events:
from application_sdk.services import EventStore
from application_sdk.interceptors.models import Event

@activity.defn
async def process_order(order_id: str):
    # Process order
    order = fetch_order(order_id)
    
    # Publish order received event
    await EventStore.publish_event(
        Event(
            event_type="order.received",
            data={
                "order_id": order_id,
                "customer_id": order['customer_id'],
                "total_amount": order['total'],
                "item_count": len(order['items'])
            }
        )
    )
    
    # Process payment
    payment_result = process_payment(order)
    
    # Publish payment event
    await EventStore.publish_event(
        Event(
            event_type="order.payment_processed",
            data={
                "order_id": order_id,
                "payment_status": payment_result['status'],
                "transaction_id": payment_result['transaction_id']
            }
        )
    )
    
    return payment_result

Error and Alert Events

Publish alerts for exceptional conditions:
from application_sdk.services import EventStore
from application_sdk.interceptors.models import Event

@activity.defn
async def monitor_data_quality(dataset_id: str):
    quality_metrics = calculate_quality_metrics(dataset_id)
    
    # Check for quality issues
    if quality_metrics['null_percentage'] > 10:
        await EventStore.publish_event(
            Event(
                event_type="data_quality.alert",
                data={
                    "dataset_id": dataset_id,
                    "alert_type": "high_null_percentage",
                    "null_percentage": quality_metrics['null_percentage'],
                    "severity": "warning"
                }
            )
        )
    
    if quality_metrics['duplicate_percentage'] > 5:
        await EventStore.publish_event(
            Event(
                event_type="data_quality.alert",
                data={
                    "dataset_id": dataset_id,
                    "alert_type": "high_duplicate_rate",
                    "duplicate_percentage": quality_metrics['duplicate_percentage'],
                    "severity": "error"
                }
            )
        )
    
    return quality_metrics

Lifecycle Events and Segment Integration

The EventStore automatically sends lifecycle events to Segment for analytics:

Lifecycle Event Names

ApplicationEventNames.WORKFLOW_START
enum
Triggered when a workflow starts (mapped to “workflow_started” in Segment).
ApplicationEventNames.WORKFLOW_END
enum
Triggered when a workflow completes (mapped to “workflow_completed” in Segment).
ApplicationEventNames.ACTIVITY_START
enum
Triggered when an activity starts (mapped to “activity_started” in Segment).
ApplicationEventNames.ACTIVITY_END
enum
Triggered when an activity ends (mapped to “activity_ended” in Segment).
Example:
from application_sdk.interceptors.models import Event, ApplicationEventNames
from application_sdk.services import EventStore

# This event will be sent to Segment automatically
await EventStore.publish_event(
    Event(
        event_name=ApplicationEventNames.WORKFLOW_START.value,
        event_type="workflow.lifecycle",
        data={"workflow_name": "DataIngestion"}
    )
)

Best Practices

Automatic Context: Event metadata is automatically enriched with workflow and activity context. You don’t need to manually add workflow IDs or activity information.
Graceful Degradation: If the event store component is not registered, publish_event will log a warning and continue without raising an error. This allows applications to function without event publishing capability.
Non-Blocking: While publish_event is async, it doesn’t block on delivery confirmation. Events are fire-and-forget. Don’t rely on event publishing for critical workflow logic.
Event Naming: Use consistent event naming conventions like domain.action (e.g., “order.created”, “payment.processed”, “data.validated”).

Authentication Integration

EventStore automatically injects authentication headers for HTTP bindings:
# EventStore automatically:
# 1. Retrieves auth token from AtlanAuthClient
# 2. Adds Authorization header to binding metadata
# 3. HTTP bindings use the header, others ignore it

await EventStore.publish_event(event)
# Behind the scenes:
# - Gets token: auth_client.get_authenticated_headers()
# - Adds to metadata: {"Authorization": "Bearer <token>"}

Configuration

EventStore is configured through constants:
from application_sdk.constants import (
    EVENT_STORE_NAME,                    # Default: "eventstore"
    DAPR_BINDING_OPERATION_CREATE,       # Default: "create"
    APP_TENANT_ID,                       # Tenant identifier
    APPLICATION_NAME,                    # Application name
    ATLAN_BASE_URL,                      # Atlan base URL
)
  • SecretStore - Shows similar authentication pattern
  • StateStore - Often used together for workflow state and events

Build docs developers (and LLMs) love