Activities implement the individual tasks that make up a workflow, handling data processing, external API calls, and state management.

ActivitiesInterface

application_sdk.activities.ActivitiesInterface: Abstract base class defining the interface for workflow activities, with built-in state management.

State Management

Activities maintain state through the ActivitiesState model, which is automatically managed per workflow execution.

_get_state

Retrieve the state for the current workflow.
await self._get_state(workflow_args: Dict[str, Any]) -> ActivitiesState
workflow_args
Dict[str, Any]
required
Dictionary containing workflow arguments and configuration
return
ActivitiesState
The state data for the current workflow. Automatically initialized if it doesn’t exist.
State is automatically refreshed if more than 15 minutes have passed since the last update.
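
The refresh rule above can be sketched as a simple staleness check. This is a minimal illustration, not the SDK's implementation: the function name `is_state_stale` and the constant `STATE_REFRESH_WINDOW` are hypothetical, and only the 15-minute threshold comes from the text above.

```python
from datetime import datetime, timedelta
from typing import Optional

# Assumed refresh window, taken from the 15-minute figure stated above.
STATE_REFRESH_WINDOW = timedelta(minutes=15)


def is_state_stale(last_updated: Optional[datetime], now: datetime) -> bool:
    """Return True when state was never set or is older than the window."""
    if last_updated is None:
        return True
    # "More than 15 minutes" is read as a strict comparison.
    return now - last_updated > STATE_REFRESH_WINDOW
```

Passing `now` explicitly keeps the check deterministic, which makes it easy to test without patching the clock.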

_set_state

Initialize or update the state for the current workflow.
await self._set_state(workflow_args: Dict[str, Any]) -> None
workflow_args
Dict[str, Any]
required
Arguments for the workflow, containing configuration and runtime parameters

_clean_state

Remove the state data for the current workflow.
await self._clean_state() -> None
Typically called during cleanup or error handling. Failures are logged but don’t raise exceptions.
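
The "log, don't raise" behavior described above might look roughly like the following. `StateRegistry` is a hypothetical in-memory stand-in, not an SDK class, and it returns a boolean purely so the outcome is observable here; the documented `_clean_state` returns `None`.

```python
import asyncio
import logging
from typing import Any, Dict

logger = logging.getLogger("state_cleanup_sketch")


class StateRegistry:
    """Hypothetical in-memory registry keyed by workflow ID."""

    def __init__(self) -> None:
        self.states: Dict[str, Any] = {}

    async def clean(self, workflow_id: str) -> bool:
        """Remove state for a workflow; log failures instead of raising."""
        try:
            del self.states[workflow_id]
            return True
        except Exception as exc:  # broad on purpose: cleanup must not fail the caller
            logger.warning("Failed to clean state for %s: %r", workflow_id, exc)
            return False
```

Because failures are swallowed, a caller that needs to know whether cleanup succeeded has to rely on logs or on a return value such as the one sketched here.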

Built-in Activities

get_workflow_args

Safely retrieve the workflow configuration from the state store.
@activity.defn
@auto_heartbeater
async def get_workflow_args(
    self,
    workflow_config: Dict[str, Any]
) -> Dict[str, Any]
workflow_config
Dict[str, Any]
required
Dictionary containing workflow_id and other parameters
return
Dict[str, Any]
Complete workflow configuration with:
  • All configuration from the state store
  • output_prefix: Output path prefix
  • output_path: Full output path
  • workflow_id: Workflow identifier
  • workflow_run_id: Workflow run identifier
  • Keys prefixed with atlan-, preserved for logging context
raises
IOError
If the configuration cannot be retrieved from the state store
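
One way to work with the returned dictionary is to check for the documented keys and pull out the logging context. The helper names below are hypothetical and the sketch assumes nothing about the SDK beyond the key names listed above.

```python
from typing import Any, Dict, List

# Keys the reference above lists as part of the returned configuration.
DOCUMENTED_KEYS = ("output_prefix", "output_path", "workflow_id", "workflow_run_id")


def missing_documented_keys(workflow_args: Dict[str, Any]) -> List[str]:
    """Return documented keys that are absent from the returned configuration."""
    return [key for key in DOCUMENTED_KEYS if key not in workflow_args]


def logging_context(workflow_args: Dict[str, Any]) -> Dict[str, Any]:
    """Collect the atlan-* prefixed keys that are preserved for logging."""
    return {k: v for k, v in workflow_args.items() if k.startswith("atlan-")}
```

A downstream activity could call `missing_documented_keys` on the result of `get_workflow_args` and fail fast with a clear message if anything it depends on is absent.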

preflight_check

Perform preflight checks before workflow execution.
@activity.defn
@auto_heartbeater
async def preflight_check(
    self,
    workflow_args: Dict[str, Any]
) -> Dict[str, Any]
workflow_args
Dict[str, Any]
required
Dictionary containing workflow arguments and configuration. Must include a metadata key.
return
Dict[str, Any]
Dictionary containing the results of the preflight check
raises
ValueError
If the handler is not found or if the preflight check fails
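
Since `workflow_args` must include a `metadata` key, a caller may want to validate that before scheduling the activity. The helper below is a hypothetical guard, not part of the SDK; it raises `ValueError` only to match the error type documented above.

```python
from typing import Any, Dict


def require_metadata(workflow_args: Dict[str, Any]) -> Any:
    """Return workflow_args["metadata"], or raise ValueError if it is missing."""
    if "metadata" not in workflow_args:
        raise ValueError("workflow_args must include a 'metadata' key")
    return workflow_args["metadata"]
```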

convert_files

Convert input files to the specified output type.
@activity.defn
@auto_heartbeater
async def convert_files(
    self,
    workflow_args: Dict[str, Any]
) -> ActivityResult
workflow_args
Dict[str, Any]
required
Dictionary containing:
  • input_files: List of input file paths
  • output_file_type: Target file type (e.g., "parquet", "json", "csv")
return
ActivityResult
Result object with:
  • status: "success" or "warning"
  • message: Description of the operation
  • metadata.input_files: List of converted file paths
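
The input and output shapes described above can be sketched with plain Python objects. `ResultStandIn` is a hypothetical stand-in that mirrors only the three documented fields; it is not the SDK's `ActivityResult`, and the helper names are illustrative.

```python
from dataclasses import dataclass, field
from typing import Any, Dict, List


@dataclass
class ResultStandIn:
    """Hypothetical stand-in mirroring the documented result fields."""
    status: str
    message: str
    metadata: Dict[str, Any] = field(default_factory=dict)


def build_convert_args(input_files: List[str], output_file_type: str) -> Dict[str, Any]:
    """Assemble the two documented keys for convert_files."""
    return {"input_files": list(input_files), "output_file_type": output_file_type}


def converted_files(result: ResultStandIn) -> List[str]:
    """Return converted paths; reject statuses outside the documented values."""
    if result.status not in ("success", "warning"):
        raise ValueError(f"Unexpected status: {result.status!r}")
    return list(result.metadata.get("input_files", []))
```

Treating "warning" as a non-failing status matches the documented values, but a caller may still want to log `result.message` in that case.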

ActivitiesState

application_sdk.activities.ActivitiesState: Base state model for workflow activities.

Attributes

handler
HandlerType | None
Handler instance for activity-specific operations. Must be a subclass of HandlerInterface.
workflow_args
Dict[str, Any] | None
Arguments passed to the workflow containing configuration and runtime parameters
last_updated_timestamp
datetime | None
Timestamp of the last state update, used for automatic state refresh

Example Usage

Basic Activities

from application_sdk.activities import ActivitiesInterface, ActivitiesState
from application_sdk.handlers.base import BaseHandler
from temporalio import activity

class MyHandler(BaseHandler):
    async def fetch_data(self, source):
        # Fetch data from source
        return {"data": []}

class MyActivitiesState(ActivitiesState):
    handler: MyHandler

class MyActivities(ActivitiesInterface[MyActivitiesState]):
    
    @activity.defn
    async def initialize(self, workflow_args):
        """Initialize handler with credentials."""
        state = await self._get_state(workflow_args)
        
        # Load handler with credentials
        handler = MyHandler()
        await handler.load(workflow_args["credentials"])
        
        # Update state
        state.handler = handler
        await self._set_state(workflow_args)
    
    @activity.defn
    async def process_data(self, workflow_args):
        """Process data using the handler."""
        state = await self._get_state(workflow_args)
        
        # Use handler from state
        data = await state.handler.fetch_data(
            workflow_args["source"]
        )
        
        # Process data
        processed = self._transform_data(data)
        
        return {"processed_count": len(processed)}
    
    def _transform_data(self, data):
        # Transform logic
        return data

Activities with Error Handling

from application_sdk.activities import ActivitiesInterface
from application_sdk.common.error_codes import OrchestratorError
from temporalio import activity

class RobustActivities(ActivitiesInterface):
    
    @activity.defn
    async def fetch_with_retry(self, workflow_args):
        """Fetch data, re-raising errors so the workflow's retry policy can apply."""
        state = await self._get_state(workflow_args)
        
        try:
            # Heartbeat to show activity is alive
            activity.heartbeat("Starting data fetch")
            
            result = await state.handler.fetch_metadata(
                metadata=workflow_args["metadata"]
            )
            
            activity.heartbeat("Data fetch completed")
            
            return result
            
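        except OrchestratorError as e:
            # Pass structured fields via `extra`; standard logging rejects
            # arbitrary keyword arguments such as error_code=...
            activity.logger.error(
                f"Orchestrator error: {e}",
                extra={"error_code": getattr(e, "code", None)}
            )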
            await self._clean_state()
            raise
            
        except Exception as e:
            activity.logger.error(f"Unexpected error: {e}")
            await self._clean_state()
            raise

Activities with Progress Tracking

from application_sdk.activities import ActivitiesInterface
from temporalio import activity

class ProgressTrackingActivities(ActivitiesInterface):
    
    @activity.defn
    async def batch_process(self, workflow_args):
        """Process data in batches with progress tracking."""
        state = await self._get_state(workflow_args)
        
        items = workflow_args["items"]
        batch_size = workflow_args.get("batch_size", 100)
        
        processed = 0
        total = len(items)
        
        for i in range(0, total, batch_size):
            batch = items[i:i+batch_size]
            
            # Process batch
            await self._process_batch(batch, state)
            
            # Update progress
            processed += len(batch)
            progress = (processed / total) * 100
            
            # Send heartbeat with progress
            activity.heartbeat(f"Processed {processed}/{total} items ({progress:.1f}%)")
        
        return {"total_processed": processed}
    
    async def _process_batch(self, batch, state):
        # Process batch
        pass

Activities with State Cleanup

from application_sdk.activities import ActivitiesInterface
from temporalio import activity

class CleanupActivities(ActivitiesInterface):
    
    @activity.defn
    async def process_with_cleanup(self, workflow_args):
        """Process data with guaranteed cleanup."""
        state = await self._get_state(workflow_args)
        
        try:
            # Processing logic
            result = await state.handler.fetch_metadata(
                metadata=workflow_args["metadata"]
            )
            
            return result
            
        finally:
            # Always cleanup state
            await self._clean_state()
            
            # Close any open connections
            if state.handler and hasattr(state.handler, 'client'):
                await state.handler.client.close()

Custom State Model

from application_sdk.activities import ActivitiesInterface, ActivitiesState
from typing import Any, Dict, List, Optional
from datetime import datetime
from temporalio import activity

# MyHandler is the handler class defined in the Basic Activities example.

class CustomState(ActivitiesState):
    """Custom state with additional fields."""
    handler: MyHandler
    processed_items: List[str] = []
    start_time: Optional[datetime] = None  # unset until initialize() runs
    metrics: Dict[str, Any] = {}

class CustomActivities(ActivitiesInterface[CustomState]):
    
    @activity.defn
    async def initialize(self, workflow_args):
        state = await self._get_state(workflow_args)
        state.start_time = datetime.now()
        state.metrics = {"items_processed": 0}
        await self._set_state(workflow_args)
    
    @activity.defn
    async def process_item(self, workflow_args):
        state = await self._get_state(workflow_args)
        
        item_id = workflow_args["item_id"]
        
        # Process item
        # ...
        
        # Update state
        state.processed_items.append(item_id)
        state.metrics["items_processed"] += 1
        await self._set_state(workflow_args)
        
        return {"item_id": item_id, "status": "processed"}

Best Practices

State Management

  • Always use _get_state() to access current state
  • Call _clean_state() in finally blocks for proper cleanup
  • Keep state objects lightweight
  • Use custom state models for activity-specific data

Error Handling

  • Catch and log errors with appropriate context
  • Clean up state on errors
  • Use appropriate error types (OrchestratorError, etc.)
  • Close resources in finally blocks

Activity Design

  • Make activities idempotent when possible
  • Send heartbeats for long-running activities
  • Break down complex operations into smaller activities
  • Mark each activity method with the @activity.defn decorator
  • Use the @auto_heartbeater decorator for automatic heartbeats
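
The idempotency guideline above can be illustrated with a small sketch: record which items have already been applied so that a retried activity does not apply them twice. `IdempotentLedger` and its fields are hypothetical and kept in memory only for illustration; a real activity would need durable storage for the applied set.

```python
from typing import Dict, Set


class IdempotentLedger:
    """Hypothetical ledger that applies each item at most once."""

    def __init__(self) -> None:
        self.applied: Set[str] = set()
        self.totals: Dict[str, int] = {}

    def apply(self, item_id: str, account: str, amount: int) -> bool:
        """Apply the amount once per item_id; return False on a repeat."""
        if item_id in self.applied:
            return False  # already handled, e.g. on an activity retry
        self.totals[account] = self.totals.get(account, 0) + amount
        self.applied.add(item_id)
        return True
```

Keying on a stable item identifier, rather than on call count or timestamps, is what makes a repeated call safe.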

Performance

  • Process data in batches for large datasets
  • Report progress using heartbeats
  • Close connections and clean up resources
  • Minimize state object size
