Overview

Activities are the building blocks of workflows in the Atlan Application SDK. They represent individual tasks that perform actual work, such as calling APIs, querying databases, processing files, or any other side-effecting operation. Activities are designed to be retryable and fault-tolerant.
Unlike workflows, activities can perform non-deterministic operations like I/O, API calls, and database queries. All side effects should happen in activities.

ActivitiesInterface

The ActivitiesInterface is an abstract base class that provides state management functionality and defines the structure for activity implementations.
from application_sdk.activities import ActivitiesInterface, ActivitiesState
from temporalio import activity
from typing import Dict, Any

# MyHandler stands for your own HandlerInterface subclass
class MyActivities(ActivitiesInterface[MyHandler]):
    @activity.defn
    async def my_activity(self, workflow_args: Dict[str, Any]) -> Dict[str, Any]:
        # Get workflow state
        state = await self._get_state(workflow_args)
        handler = state.handler
        
        # Perform work
        result = await handler.process_data(workflow_args)
        return result

State Management

Activities have access to a shared state that persists across activity executions within the same workflow run. This state includes the handler instance and workflow arguments.

ActivitiesState

The base state model for workflow activities.
class ActivitiesState(BaseModel, Generic[HandlerType]):
    handler: Optional[HandlerType] = None
    workflow_args: Optional[Dict[str, Any]] = None
    last_updated_timestamp: Optional[datetime] = None
Fields:
  • handler (HandlerInterface): Handler instance for activity-specific operations. Must be a subclass of HandlerInterface.
  • workflow_args (Dict[str, Any]): Arguments passed to the workflow. Contains configuration and runtime parameters.
  • last_updated_timestamp (datetime): Timestamp of the last state update. Used for automatic state refresh after 15 minutes.

State Methods

_get_state()

Retrieve or initialize the state for the current workflow.
state = await self._get_state(workflow_args)
handler = state.handler
args = state.workflow_args
Parameters:
  • workflow_args (Dict[str, Any], required): Dictionary containing workflow arguments and configuration.
State is automatically refreshed if more than 15 minutes have passed since the last update. This ensures credentials and configurations remain current for long-running workflows.
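
The refresh rule can be pictured with a small standalone sketch. This is not the SDK's implementation: `CachedState`, `StateCache`, and the injectable `clock` are hypothetical names used only to illustrate a time-based refresh, and the 15-minute window is taken from the description above.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import Any, Callable, Dict

# Assumed refresh window, taken from the 15-minute figure in the text.
STATE_MAX_AGE = timedelta(minutes=15)


@dataclass
class CachedState:
    """Hypothetical stand-in for ActivitiesState."""

    workflow_args: Dict[str, Any]
    last_updated_timestamp: datetime


class StateCache:
    """Keeps one state entry per workflow and rebuilds it when stale."""

    def __init__(self, clock: Callable[[], datetime] = datetime.now) -> None:
        self._clock = clock
        self._states: Dict[str, CachedState] = {}

    def set_state(self, workflow_id: str, workflow_args: Dict[str, Any]) -> CachedState:
        # Store a fresh entry stamped with the current time.
        state = CachedState(dict(workflow_args), self._clock())
        self._states[workflow_id] = state
        return state

    def get_state(self, workflow_id: str, workflow_args: Dict[str, Any]) -> CachedState:
        state = self._states.get(workflow_id)
        # Rebuild when the entry is missing or older than the refresh window.
        if state is None or self._clock() - state.last_updated_timestamp > STATE_MAX_AGE:
            state = self.set_state(workflow_id, workflow_args)
        return state
```

Passing the clock in as a parameter keeps the staleness check easy to test without waiting 15 minutes.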

_set_state()

Initialize or update the state for the current workflow.
await self._set_state(workflow_args)
Parameters:
  • workflow_args (Dict[str, Any], required): Arguments for the workflow, containing configuration and runtime parameters.

_clean_state()

Remove the state data for the current workflow. Typically called during cleanup.
await self._clean_state()

Built-in Activities

The SDK provides several built-in activities that are commonly used across workflows.

get_workflow_args()

Retrieve workflow configuration from the state store.
@activity.defn
@auto_heartbeater
async def get_workflow_args(
    self, 
    workflow_config: Dict[str, Any]
) -> Dict[str, Any]:
    """Safely retrieve workflow configuration from state store."""
    pass
Parameters:
  • workflow_config (Dict[str, Any], required): Dictionary containing workflow_id and other parameters.
Returns: Complete workflow configuration including:
  • workflow_id: Unique workflow identifier
  • workflow_run_id: Temporal run ID
  • output_prefix: Base path for output files
  • output_path: Full path for workflow outputs
  • metadata: Custom workflow metadata
  • atlan-*: Logging context fields
This activity is called automatically by the base WorkflowInterface.run() method. It retrieves the configuration from the Dapr state store.

preflight_check()

Perform validation and checks before workflow execution.
@activity.defn
@auto_heartbeater
async def preflight_check(
    self, 
    workflow_args: Dict[str, Any]
) -> Dict[str, Any]:
    """Validate configuration and resources."""
    pass
Parameters:
  • workflow_args (Dict[str, Any], required): Dictionary containing workflow arguments and configuration with a metadata key.
Returns: Dictionary containing check results. Each check should have a success field:
{
    "connection_check": {
        "success": True,
        "message": "Connection successful"
    },
    "credentials_check": {
        "success": True,
        "message": "Credentials valid"
    }
}
The preflight check will fail if:
  • Handler is not initialized
  • No check results are returned
  • Any individual check has success: False
  • Required resources (like SQL client) are not initialized
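
As an illustration of the result-based conditions above, the following sketch shows how a results dictionary of this shape could be evaluated. The helper names `failed_checks` and `assert_preflight_passed` are hypothetical and not part of the SDK; the handler and resource checks are outside its scope.

```python
from typing import Any, Dict, List


def failed_checks(results: Dict[str, Dict[str, Any]]) -> List[str]:
    """Return the names of checks that did not report success.

    An empty result set raises, mirroring the
    "no check results are returned" condition.
    """
    if not results:
        raise ValueError("Preflight check returned no results")
    # A check without a "success" field is treated as failed.
    return [name for name, check in results.items() if not check.get("success", False)]


def assert_preflight_passed(results: Dict[str, Dict[str, Any]]) -> None:
    """Raise if any individual check failed."""
    failures = failed_checks(results)
    if failures:
        raise RuntimeError(f"Preflight check failed: {', '.join(failures)}")
```

A caller could run `assert_preflight_passed(results)` on the dictionary returned by preflight_check() before starting any expensive work.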

convert_files()

Convert input files to a specified output format.
@activity.defn
@auto_heartbeater
async def convert_files(
    self, 
    workflow_args: Dict[str, Any]
) -> ActivityResult:
    """Convert files to specified output type."""
    pass
Parameters: workflow_args (Dict[str, Any], required), which must contain:
  • input_files: List of file paths to convert
  • output_file_type: Target file type (e.g., "csv", "parquet", "json")
Returns: ActivityResult with status and converted file paths:
ActivityResult(
    status="success",
    message="Successfully converted files to parquet",
    metadata={"input_files": ["output/file1.parquet", "output/file2.parquet"]}
)

Creating Custom Activities

Define custom activities using the @activity.defn decorator.
from application_sdk.activities import ActivitiesInterface
from application_sdk.activities.common.utils import auto_heartbeater
from temporalio import activity
from typing import Dict, Any

class DataProcessingActivities(ActivitiesInterface[DataHandler]):
    
    @activity.defn
    @auto_heartbeater
    async def extract_data(self, workflow_args: Dict[str, Any]) -> Dict[str, Any]:
        """Extract data from source system."""
        state = await self._get_state(workflow_args)
        handler = state.handler
        
        # Send heartbeats for long operations
        activity.heartbeat("Starting extraction")
        
        try:
            data = await handler.fetch_data(
                source=workflow_args["metadata"]["source"],
                query=workflow_args["metadata"]["query"]
            )
            
            activity.heartbeat(f"Extracted {len(data)} records")
            
            return {
                "status": "success",
                "data": data,
                "count": len(data)
            }
        except Exception as e:
            activity.logger.error(f"Extraction failed: {e}")
            raise
    
    @activity.defn
    @auto_heartbeater
    async def transform_data(self, workflow_args: Dict[str, Any]) -> Dict[str, Any]:
        """Transform extracted data."""
        state = await self._get_state(workflow_args)
        
        input_data = workflow_args.get("data", [])
        activity.heartbeat(f"Transforming {len(input_data)} records")
        
        # Perform transformation logic
        transformed = []
        for record in input_data:
            transformed_record = self._transform_record(record)
            transformed.append(transformed_record)
        
        return {
            "status": "success",
            "data": transformed,
            "count": len(transformed)
        }
    
    @activity.defn
    @auto_heartbeater
    async def load_data(self, workflow_args: Dict[str, Any]) -> Dict[str, Any]:
        """Load transformed data to destination."""
        state = await self._get_state(workflow_args)
        handler = state.handler
        
        data = workflow_args.get("data", [])
        destination = workflow_args["metadata"]["destination"]
        
        activity.heartbeat(f"Loading {len(data)} records to {destination}")
        
        result = await handler.load_data(
            data=data,
            destination=destination
        )
        
        return {
            "status": "success",
            "loaded_count": result["count"],
            "destination": destination
        }

Activity Decorators

@activity.defn

Marks a method as a Temporal activity. Required for all activities.
@activity.defn
async def my_activity(self, args: Dict[str, Any]) -> Dict[str, Any]:
    pass

@auto_heartbeater

Automatically sends heartbeats during activity execution. Highly recommended for long-running activities.
from application_sdk.activities.common.utils import auto_heartbeater

@activity.defn
@auto_heartbeater
async def long_running_activity(self, args: Dict[str, Any]) -> Dict[str, Any]:
    # Heartbeats sent automatically in background
    pass
The @auto_heartbeater decorator prevents activities from timing out during long operations by automatically sending heartbeats to Temporal.
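
To make the mechanism concrete, here is a minimal sketch of how a background-heartbeat decorator can be built with asyncio. It is not the SDK's `auto_heartbeater`: the name `heartbeat_every`, the fixed `interval` argument, and the injected `send_heartbeat` callback are assumptions chosen so the example runs without a Temporal worker.

```python
import asyncio
import functools
from typing import Any, Awaitable, Callable


def heartbeat_every(interval: float, send_heartbeat: Callable[[], None]):
    """Hypothetical decorator: call send_heartbeat every `interval` seconds
    while the wrapped coroutine is running."""

    def decorator(fn: Callable[..., Awaitable[Any]]):
        @functools.wraps(fn)
        async def wrapper(*args: Any, **kwargs: Any) -> Any:
            async def beat() -> None:
                # Runs until cancelled by the wrapper below.
                while True:
                    send_heartbeat()
                    await asyncio.sleep(interval)

            task = asyncio.create_task(beat())
            try:
                return await fn(*args, **kwargs)
            finally:
                # Stop the background loop whether fn returned or raised.
                task.cancel()
                try:
                    await task
                except asyncio.CancelledError:
                    pass

        return wrapper

    return decorator
```

Inside a real activity the callback would be something like `activity.heartbeat`, and the interval would need to stay well below the activity's heartbeat timeout.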

Activity Context

Access activity information using the activity module:
import asyncio
from typing import Any, Dict

from temporalio import activity

@activity.defn
async def my_activity(self, args: Dict[str, Any]):
    # Get activity info
    info = activity.info()
    
    # Log using activity logger
    activity.logger.info(f"Running activity: {info.activity_type}")
    
    # Send heartbeat with progress
    activity.heartbeat({"progress": 50, "message": "Halfway done"})
    
    # Check if activity is being cancelled
    if activity.is_cancelled():
        activity.logger.info("Activity cancelled, cleaning up...")
        # temporalio.activity has no CancelledError; use asyncio's
        raise asyncio.CancelledError("Activity was cancelled")

Common Activity Utilities

The SDK provides utilities in application_sdk.activities.common.utils:
from application_sdk.activities.common.utils import (
    get_workflow_id,
    get_workflow_run_id,
    build_output_path,
    auto_heartbeater
)

# Get current workflow ID
workflow_id = get_workflow_id()

# Get current run ID
run_id = get_workflow_run_id()

# Build unique output path
output_path = build_output_path()  # Returns: "YYYY/MM/DD/HH/MM/<workflow_id>/<run_id>"

Error Handling

Activities should log errors with enough context to diagnose them, clean up their state, and re-raise the exception so that Temporal can apply the activity's retry policy.
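from typing import Any, Dict

from temporalio import activity

from application_sdk.activities.common.utils import auto_heartbeater
from application_sdk.common.error_codes import OrchestratorError

@activity.defn
@auto_heartbeater
async def risky_activity(self, workflow_args: Dict[str, Any]):
    try:
        state = await self._get_state(workflow_args)
        result = await state.handler.risky_operation()
        return result
    except OrchestratorError as e:
        # Standard logging calls reject unknown keyword arguments,
        # so custom fields go through `extra`.
        activity.logger.error(
            f"Orchestrator error: {e}",
            extra={"error_code": e.code},
            exc_info=e,
        )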
        # Clean up state on error
        await self._clean_state()
        raise
    except Exception as err:
        activity.logger.error(f"Unexpected error: {str(err)}", exc_info=err)
        await self._clean_state()
        raise

Best Practices

Always send heartbeats for activities that take more than a few seconds. This allows Temporal to detect worker failures.
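@activity.defn
@auto_heartbeater  # Automatic heartbeats
async def process_large_file(self, args: Dict[str, Any]):
    file_chunks = args["file_chunks"]  # hypothetical key holding the chunks to process
    # Manual heartbeat with progress
    for i, chunk in enumerate(file_chunks, start=1):
        process_chunk(chunk)  # process_chunk: your own processing function
        activity.heartbeat({"progress": i / len(file_chunks) * 100})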
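
The progress-reporting pattern can be tried outside Temporal with a plain callback in place of `activity.heartbeat`. In this sketch, `chunked`, `process_with_progress`, and the `report` parameter are hypothetical names, and summing integers stands in for real chunk processing.

```python
from typing import Any, Callable, Dict, Iterator, Sequence


def chunked(items: Sequence[int], size: int) -> Iterator[Sequence[int]]:
    """Yield consecutive slices of at most `size` items."""
    for start in range(0, len(items), size):
        yield items[start:start + size]


def process_with_progress(
    items: Sequence[int],
    size: int,
    report: Callable[[Dict[str, Any]], None],
) -> int:
    """Sum the items chunk by chunk, reporting percent complete after each chunk."""
    total = 0
    done = 0
    for chunk in chunked(items, size):
        total += sum(chunk)
        done += len(chunk)
        # Report after the chunk finishes so the last update reads 100.
        report({"progress": done * 100 / len(items)})
    return total
```

Reporting after each chunk, and counting items instead of chunk indexes, keeps the percentage accurate even when the last chunk is shorter than the rest.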
Activities may be retried, so design them to be idempotent (safe to execute multiple times).
@activity.defn
async def update_record(self, args: Dict[str, Any]):
    state = await self._get_state(args)
    handler = state.handler
    record_id = args["record_id"]
    
    # Check if already processed
    existing = await handler.get_record(record_id)
    if existing and existing["processed"]:
        return {"status": "already_processed", "record_id": record_id}
    
    # Process the record
    result = await handler.process_record(record_id)
    return result
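
The effect of the "already processed" guard is easiest to see by simulating a retry. The sketch below uses a hypothetical in-memory handler (`InMemoryRecords`) in place of a real one and runs the same operation twice.

```python
import asyncio
from typing import Any, Dict, Optional, Tuple


class InMemoryRecords:
    """Hypothetical handler stand-in backed by a dict."""

    def __init__(self) -> None:
        self.records: Dict[str, Dict[str, Any]] = {}
        self.process_calls = 0

    async def get_record(self, record_id: str) -> Optional[Dict[str, Any]]:
        return self.records.get(record_id)

    async def process_record(self, record_id: str) -> Dict[str, Any]:
        self.process_calls += 1
        self.records[record_id] = {"processed": True}
        return {"status": "processed", "record_id": record_id}


async def update_record(handler: InMemoryRecords, record_id: str) -> Dict[str, Any]:
    # Skip the side effect if an earlier attempt already completed it.
    existing = await handler.get_record(record_id)
    if existing and existing["processed"]:
        return {"status": "already_processed", "record_id": record_id}
    return await handler.process_record(record_id)


async def simulate_retry() -> Tuple[Dict[str, Any], Dict[str, Any], int]:
    """Run the same update twice, as a retry would."""
    handler = InMemoryRecords()
    first = await update_record(handler, "r1")
    second = await update_record(handler, "r1")
    return first, second, handler.process_calls
```

Running `asyncio.run(simulate_retry())` shows the second attempt returning early while the processing step executes only once.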
Always clean up activity state when errors occur to prevent stale state from affecting future executions.
try:
    result = await perform_operation()
    return result
except Exception as e:
    await self._clean_state()
    raise
Use structured logging with context to make debugging easier.
activity.logger.info(
    "Processing data",
    extra={
        "workflow_id": get_workflow_id(),
        "record_count": len(data),
        "source": source_name
    }
)
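
With Python's standard logging module, keys passed through `extra` become attributes on the log record, so a formatter can reference them by name. The sketch below demonstrates this with a plain logger and an in-memory stream; the logger name and field values are made up for illustration, and the SDK's own logger configuration may differ.

```python
import io
import logging

stream = io.StringIO()
handler = logging.StreamHandler(stream)
# Every record sent to this handler must carry both extra fields.
handler.setFormatter(
    logging.Formatter(
        "%(levelname)s %(message)s workflow_id=%(workflow_id)s record_count=%(record_count)s"
    )
)

logger = logging.getLogger("activity_logging_sketch")
logger.setLevel(logging.INFO)
logger.addHandler(handler)
logger.propagate = False  # keep the example output out of the root logger

logger.info("Processing data", extra={"workflow_id": "wf-123", "record_count": 3})
log_line = stream.getvalue().strip()
```

Keeping the context in `extra` instead of interpolating it into the message lets log processors filter on the fields directly.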

ActivityResult Model

The SDK provides a standard result model for activities:
from application_sdk.activities.common.models import ActivityResult

@activity.defn
async def my_activity(self, args: Dict[str, Any]) -> ActivityResult:
    return ActivityResult(
        status="success",  # or "warning", "error"
        message="Operation completed successfully",
        metadata={
            "records_processed": 1000,
            "duration_seconds": 45.2
        }
    )

Related

  • Workflows - Orchestrate activity execution
  • Handlers - Implement business logic called by activities
  • Clients - Connect to external systems from handlers
