
Overview

The WorkflowInterface is an abstract base class that defines the structure for all workflow implementations in the Atlan Application SDK. Workflows orchestrate the execution of activities and define the business logic that must complete reliably, even in the face of failures.
Workflows are powered by Temporal, which provides durable execution guarantees. Because Temporal can replay workflow code to restore its state, that code must be deterministic.

Key Concepts

  • Durable Execution: Workflows automatically recover from failures and continue from where they left off
  • Activity Orchestration: Workflows coordinate the execution of activities
  • State Management: Workflow state is automatically persisted and restored
  • Timeouts & Retries: Configurable timeouts and retry policies for activities
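To make durable execution more concrete, here is a toy, stdlib-only model. It does not use Temporal's API, and ToyHistory and run_pipeline are hypothetical names. Each completed step's result is recorded in a history, so when the pipeline is re-run after a failure, finished steps are reused from the history instead of executed again. In Temporal, the real event history is persisted by the server, not held in a Python object.

```python
from typing import Any, Callable, Dict, List, Tuple

# (step name, step function); a hypothetical shape for this sketch only.
Step = Tuple[str, Callable[[Any], Any]]


class ToyHistory:
    """Hypothetical stand-in for a durable event history."""

    def __init__(self) -> None:
        self.results: Dict[str, Any] = {}


def run_pipeline(history: ToyHistory, steps: List[Step], executed: List[str]) -> Any:
    """Run steps in order, reusing results already recorded in the history."""
    value: Any = None
    for name, fn in steps:
        if name in history.results:
            # Completed on an earlier run: reuse the recorded result.
            value = history.results[name]
            continue
        value = fn(value)  # Execute the step for real.
        executed.append(name)
        history.results[name] = value  # Record the result before moving on.
    return value


# Usage: the load step fails the first time, as if the worker had crashed.
load_calls = {"count": 0}


def extract(_: Any) -> List[int]:
    return [1, 2, 3]


def transform(rows: List[int]) -> List[int]:
    return [r * 10 for r in rows]


def load(rows: List[int]) -> int:
    load_calls["count"] += 1
    if load_calls["count"] == 1:
        raise RuntimeError("worker crashed")
    return sum(rows)


steps: List[Step] = [("extract", extract), ("transform", transform), ("load", load)]
history = ToyHistory()
executed: List[str] = []

try:
    run_pipeline(history, steps, executed)
except RuntimeError:
    pass  # The first run stops at load; extract and transform are recorded.

result = run_pipeline(history, steps, executed)  # Resumes at load.
print(result, executed)  # 60 ['extract', 'transform', 'load']
```

On the second run only `load` executes again; `extract` and `transform` are served from the history.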

Class Definition

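from typing import Any, Callable, Dict, Sequence

from temporalio import workflow
from temporalio.common import RetryPolicy

from application_sdk.activities import ActivitiesInterface
from application_sdk.workflows import WorkflowInterface

# MyActivities is your ActivitiesInterface subclass; it must be defined
# (or imported) before the workflow class that references it.

@workflow.defn
class MyWorkflow(WorkflowInterface[MyActivities]):
    activities_cls = MyActivities

    @staticmethod
    def get_activities(activities: MyActivities) -> Sequence[Callable[..., Any]]:
        return [
            # Executed by the base run(), so they must be listed too
            activities.get_workflow_args,
            activities.preflight_check,
            activities.extract_data,
            activities.transform_data,
            activities.load_data,
        ]

    @workflow.run
    async def run(self, workflow_config: Dict[str, Any]) -> None:
        # First, call the parent run() to fetch the configuration and run the preflight check
        await super().run(workflow_config)

        # Fetch the full workflow arguments for this workflow's own activities
        workflow_args: Dict[str, Any] = await workflow.execute_activity_method(
            self.activities_cls.get_workflow_args,
            workflow_config,
            retry_policy=RetryPolicy(maximum_attempts=3, backoff_coefficient=2),
            start_to_close_timeout=self.default_start_to_close_timeout,
            heartbeat_timeout=self.default_heartbeat_timeout,
        )

        # Execute activities
        extracted = await workflow.execute_activity_method(
            self.activities_cls.extract_data,
            workflow_args,
            start_to_close_timeout=self.default_start_to_close_timeout,
            heartbeat_timeout=self.default_heartbeat_timeout,
        )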

Required Class Attributes

activities_cls (Type[ActivitiesInterface], required): The activities class used by this workflow. It must be set as a class attribute.
class MyWorkflow(WorkflowInterface[MyActivities]):
    activities_cls = MyActivities

Configurable Timeouts

default_heartbeat_timeout (timedelta, default: HEARTBEAT_TIMEOUT): Default heartbeat timeout for activities. Activities must send heartbeats within this interval.
from datetime import timedelta

class MyWorkflow(WorkflowInterface[MyActivities]):
    default_heartbeat_timeout = timedelta(seconds=30)

default_start_to_close_timeout (timedelta, default: START_TO_CLOSE_TIMEOUT): Default maximum time for an activity to complete from start to finish.
from datetime import timedelta

class MyWorkflow(WorkflowInterface[MyActivities]):
    default_start_to_close_timeout = timedelta(minutes=10)

Required Methods

get_activities()

Static method that returns the list of activity methods this workflow will use.
from typing import Any, Callable, Sequence

@staticmethod
def get_activities(activities: ActivitiesInterface) -> Sequence[Callable[..., Any]]:
    """Return the list of activities for this workflow."""
    return [
        activities.get_workflow_args,  # executed by the base run()
        activities.preflight_check,
        activities.extract_data,
        activities.transform_data,
        activities.load_data,
        activities.cleanup,
    ]
activities (ActivitiesInterface, required): An instance of the activities class. Use it to access the activity methods.
Returns (Sequence[Callable[..., Any]]): The list of activity methods that can be called from this workflow.
All activities that will be executed by this workflow must be included in the list returned by get_activities(). Missing activities will cause runtime errors.
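As a rough illustration of why the list matters, the sketch below compares the activities a workflow calls against those returned by get_activities(), matching them by function name (a function decorated with Temporal's `@activity.defn` is registered under its unqualified function name by default). ToyActivities and missing_activities are hypothetical names, and this assumes nothing about whether the SDK performs such a check for you.

```python
from typing import Any, Callable, Dict, Iterable, List, Sequence


class ToyActivities:
    """Hypothetical stand-in for an ActivitiesInterface subclass."""

    async def extract_data(self, args: Dict[str, Any]) -> Dict[str, Any]:
        return args

    async def transform_data(self, args: Dict[str, Any]) -> Dict[str, Any]:
        return args

    async def load_data(self, args: Dict[str, Any]) -> Dict[str, Any]:
        return args


def get_activities(activities: ToyActivities) -> Sequence[Callable[..., Any]]:
    # load_data is left out on purpose to show the failure case.
    return [activities.extract_data, activities.transform_data]


def missing_activities(
    registered: Iterable[Callable[..., Any]],
    called: Iterable[Callable[..., Any]],
) -> List[str]:
    """Names of activities the workflow calls but never registers."""
    registered_names = {fn.__name__ for fn in registered}
    return [fn.__name__ for fn in called if fn.__name__ not in registered_names]


# The workflow calls all three, as unbound methods (like self.activities_cls.extract_data).
missing = missing_activities(
    get_activities(ToyActivities()),
    [ToyActivities.extract_data, ToyActivities.transform_data, ToyActivities.load_data],
)
print(missing)  # ['load_data']
```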

run()

The main workflow execution method, decorated with @workflow.run.
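from typing import Any, Dict

from temporalio import workflow
from temporalio.common import RetryPolicy

@workflow.run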
async def run(self, workflow_config: Dict[str, Any]) -> None:
    """Execute the workflow."""
    # Get workflow configuration from state store
    workflow_args: Dict[str, Any] = await workflow.execute_activity_method(
        self.activities_cls.get_workflow_args,
        workflow_config,
        retry_policy=RetryPolicy(maximum_attempts=3, backoff_coefficient=2),
        start_to_close_timeout=self.default_start_to_close_timeout,
        heartbeat_timeout=self.default_heartbeat_timeout
    )
    
    # Execute preflight check
    await workflow.execute_activity_method(
        self.activities_cls.preflight_check,
        args=[workflow_args],
        retry_policy=RetryPolicy(maximum_attempts=2, backoff_coefficient=2),
        start_to_close_timeout=self.default_start_to_close_timeout,
        heartbeat_timeout=self.default_heartbeat_timeout
    )
    
    # Your custom workflow logic here
    # ...
workflow_config (Dict[str, Any], required): Contains the workflow_id and other parameters. The workflow ID is used to retrieve the full configuration from the state store.
The base WorkflowInterface.run() method automatically:
  1. Retrieves workflow configuration from the state store
  2. Sets up workflow run ID and retry policies
  3. Executes the preflight check activity
In your subclass, you can call await super().run(workflow_config) to reuse these steps, or override run() entirely with your own logic.

Executing Activities

Use workflow.execute_activity_method() to execute activities from within your workflow:
from datetime import timedelta

from temporalio import workflow
from temporalio.common import RetryPolicy

# These calls go inside your workflow's run() method.

# Simple activity execution
result = await workflow.execute_activity_method(
    self.activities_cls.my_activity,
    workflow_args,
    start_to_close_timeout=timedelta(minutes=5),
    heartbeat_timeout=timedelta(seconds=30)
)

# With custom retry policy
result = await workflow.execute_activity_method(
    self.activities_cls.risky_operation,
    workflow_args,
    retry_policy=RetryPolicy(
        maximum_attempts=5,
        initial_interval=timedelta(seconds=1),
        maximum_interval=timedelta(seconds=60),
        backoff_coefficient=2.0
    ),
    start_to_close_timeout=timedelta(minutes=10)
)
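The waits between attempts follow exponential backoff. Assuming Temporal's documented rule, where the delay before retry n is min(initial_interval * backoff_coefficient^(n-1), maximum_interval), the sketch below lists the waits produced by the risky_operation policy above. retry_delays is a hypothetical helper for reasoning about a policy; it is not part of the SDK or of temporalio.

```python
from datetime import timedelta
from typing import List, Optional


def retry_delays(
    initial_interval: timedelta,
    backoff_coefficient: float,
    maximum_interval: Optional[timedelta],
    maximum_attempts: int,
) -> List[timedelta]:
    """Waits before attempts 2..maximum_attempts under exponential backoff."""
    if maximum_attempts < 1:
        # maximum_attempts=0 means "unlimited" in Temporal; this sketch needs a bound.
        raise ValueError("maximum_attempts must be >= 1 for a finite schedule")
    # When maximum_interval is unset, Temporal caps the delay at 100x initial_interval.
    cap = maximum_interval if maximum_interval is not None else initial_interval * 100
    delays: List[timedelta] = []
    for retry in range(maximum_attempts - 1):
        delay = initial_interval * (backoff_coefficient**retry)
        delays.append(min(delay, cap))
    return delays


# The risky_operation policy above: 5 attempts means 4 retries.
schedule = retry_delays(timedelta(seconds=1), 2.0, timedelta(seconds=60), 5)
print([d.total_seconds() for d in schedule])  # [1.0, 2.0, 4.0, 8.0]
```

With more attempts the delay keeps doubling until it reaches maximum_interval and then stays there, which is why maximum_interval matters for long retry sequences.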

Activity Execution Parameters

retry_policy (RetryPolicy): Defines how activities are retried on failure. Fields:
  • maximum_attempts: Maximum number of attempts (default: 0, meaning unlimited)
  • initial_interval: Delay before the first retry (default: 1 second)
  • maximum_interval: Upper bound on the delay between retries (default: 100 times initial_interval)
  • backoff_coefficient: Multiplier applied to the delay after each retry (default: 2.0)
start_to_close_timeout (timedelta, required unless schedule_to_close_timeout is set): Maximum time a single activity attempt may take from start to finish.
heartbeat_timeout (timedelta): Maximum time allowed between heartbeats. If an activity does not send a heartbeat within this interval, the attempt times out and is retried according to the retry policy.
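The two timeouts interact: whichever deadline passes first ends the attempt. The toy classifier below models one attempt under that rule, assuming the heartbeat clock starts when the attempt starts and resets on every heartbeat. attempt_outcome and its string results are hypothetical; in Temporal, the server enforces these timeouts and reports them as timeout failures.

```python
from datetime import timedelta
from typing import Optional, Sequence


def attempt_outcome(
    heartbeats: Sequence[float],
    finished_at: Optional[float],
    start_to_close_timeout: timedelta,
    heartbeat_timeout: Optional[timedelta] = None,
) -> str:
    """Classify one activity attempt.

    Times are seconds since the attempt started; finished_at=None means the
    activity never finished on its own (for example, it hung).
    """
    # Deadline imposed by start_to_close_timeout.
    start_to_close_deadline = start_to_close_timeout.total_seconds()

    # Deadline imposed by heartbeat_timeout: the first moment the gap since the
    # last heartbeat (or since the start) exceeds the allowed interval.
    heartbeat_deadline = float("inf")
    if heartbeat_timeout is not None:
        gap = heartbeat_timeout.total_seconds()
        last = 0.0
        for beat in sorted(heartbeats):
            if beat - last > gap:
                break  # The timeout fired before this heartbeat arrived.
            last = beat
        heartbeat_deadline = last + gap

    end = finished_at if finished_at is not None else float("inf")
    if end <= min(start_to_close_deadline, heartbeat_deadline):
        return "completed"
    if heartbeat_deadline < start_to_close_deadline:
        return "heartbeat_timeout"
    return "start_to_close_timeout"


# A hung activity that stopped heartbeating after 10s is caught at 25s,
# long before its 60s start_to_close_timeout.
print(attempt_outcome([5, 10], None, timedelta(seconds=60), timedelta(seconds=15)))
# heartbeat_timeout
```

This is why long-running activities benefit from a heartbeat_timeout much shorter than start_to_close_timeout: a stalled attempt is detected and retried sooner.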

Complete Workflow Example

from datetime import timedelta
from typing import Any, Callable, Dict, Sequence

from temporalio import activity, workflow
from temporalio.common import RetryPolicy

from application_sdk.activities import ActivitiesInterface
from application_sdk.workflows import WorkflowInterface

class DataPipelineActivities(ActivitiesInterface):
    # get_workflow_args and preflight_check are inherited from ActivitiesInterface

    @activity.defn
    async def extract_data(self, args: Dict[str, Any]) -> Dict[str, Any]:
        # Extract data implementation
        return {}

    @activity.defn
    async def transform_data(self, args: Dict[str, Any]) -> Dict[str, Any]:
        # Transform data implementation
        return {}

    @activity.defn
    async def load_data(self, args: Dict[str, Any]) -> Dict[str, Any]:
        # Load data implementation
        return {}

@workflow.defn
class DataPipelineWorkflow(WorkflowInterface[DataPipelineActivities]):
    """ETL workflow that extracts, transforms, and loads data."""
    
    activities_cls = DataPipelineActivities
    
    # Custom timeouts for this workflow
    default_heartbeat_timeout = timedelta(seconds=30)
    default_start_to_close_timeout = timedelta(minutes=15)
    
    @staticmethod
    def get_activities(activities: DataPipelineActivities) -> Sequence[Callable[..., Any]]:
        return [
            activities.get_workflow_args,
            activities.preflight_check,
            activities.extract_data,
            activities.transform_data,
            activities.load_data
        ]
    
    @workflow.run
    async def run(self, workflow_config: Dict[str, Any]) -> None:
        """Execute the ETL pipeline."""
        workflow.logger.info("Starting ETL pipeline")
        
        # Get configuration from state store
        workflow_args = await workflow.execute_activity_method(
            self.activities_cls.get_workflow_args,
            workflow_config,
            retry_policy=RetryPolicy(maximum_attempts=3),
            start_to_close_timeout=self.default_start_to_close_timeout,
            heartbeat_timeout=self.default_heartbeat_timeout
        )
        
        # Run preflight checks
        await workflow.execute_activity_method(
            self.activities_cls.preflight_check,
            args=[workflow_args],
            retry_policy=RetryPolicy(maximum_attempts=2),
            start_to_close_timeout=self.default_start_to_close_timeout,
            heartbeat_timeout=self.default_heartbeat_timeout
        )
        
        # Extract
        extracted_data = await workflow.execute_activity_method(
            self.activities_cls.extract_data,
            workflow_args,
            start_to_close_timeout=timedelta(minutes=30),
            heartbeat_timeout=timedelta(minutes=5)
        )
        
        # Transform
        transformed_data = await workflow.execute_activity_method(
            self.activities_cls.transform_data,
            extracted_data,
            start_to_close_timeout=timedelta(minutes=20),
            heartbeat_timeout=timedelta(minutes=3)
        )
        
        # Load
        await workflow.execute_activity_method(
            self.activities_cls.load_data,
            transformed_data,
            start_to_close_timeout=timedelta(minutes=15),
            heartbeat_timeout=timedelta(minutes=2)
        )
        
        workflow.logger.info("ETL pipeline completed successfully")

Workflow Determinism

Workflows must be deterministic! Avoid:
  • Random number generation (use workflow.random() instead)
  • Current time/date access (use workflow.now() instead)
  • Direct I/O operations
  • Threading and other non-deterministic concurrency
All side effects should happen in activities, not workflows.
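Why determinism matters: when a worker replays a workflow, Temporal re-runs the workflow code and checks that the commands it issues match the recorded event history, failing the workflow task with a non-determinism error if they diverge. The toy model below (hypothetical names, stdlib only) uses a branch driven by a random value. Drawing from an identically seeded generator on replay, similar in spirit to what workflow.random() provides, reproduces the history, while a different decision does not.

```python
import random
from typing import List


class NondeterminismError(Exception):
    """Raised when replayed code issues commands that differ from the history."""


def workflow_commands(should_transform: bool) -> List[str]:
    # Hypothetical workflow body whose second command depends on a decision.
    commands = ["extract_data"]
    commands.append("transform_data" if should_transform else "load_data")
    return commands


def replay(history: List[str], commands: List[str]) -> None:
    """Check that the replayed commands reproduce the recorded history as a prefix."""
    if commands[: len(history)] != history:
        raise NondeterminismError(f"history {history} but replay issued {commands}")


def decide(rng: random.Random) -> bool:
    return rng.random() > 0.5


# Original execution: record the commands issued with a seeded generator.
original_decision = decide(random.Random(42))
history = workflow_commands(original_decision)

# Replay with the same seed issues the same commands, so this passes.
replay(history, workflow_commands(decide(random.Random(42))))

# Replay with a different decision (as an unseeded random call might give) diverges.
try:
    replay(history, workflow_commands(not original_decision))
    diverged = False
except NondeterminismError:
    diverged = True
```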

Best Practices

All I/O, API calls, database queries, and other non-deterministic operations should be in activities, not workflows.
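from datetime import timedelta

# ❌ Bad - I/O in workflow
@workflow.run
async def run(self, config):
    with open("file.txt") as f:  # Don't do this!
        data = f.read()

# ✅ Good - I/O in activity
@workflow.run
async def run(self, config):
    data = await workflow.execute_activity_method(
        self.activities_cls.read_file,
        {"path": "file.txt"},
        # Temporal requires start_to_close_timeout or schedule_to_close_timeout
        start_to_close_timeout=timedelta(seconds=30),
    )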
Always set realistic timeouts based on your activity's expected execution time. Too short, and activities fail prematurely; too long, and a stuck activity goes undetected (and unretried) for longer.
# Quick operations
await workflow.execute_activity_method(
    self.activities_cls.validate_input,
    args,
    start_to_close_timeout=timedelta(seconds=30)
)

# Long-running operations
await workflow.execute_activity_method(
    self.activities_cls.process_large_dataset,
    args,
    start_to_close_timeout=timedelta(hours=2),
    heartbeat_timeout=timedelta(minutes=5)
)
Use try-except blocks to handle activity failures and implement compensating logic.
from datetime import timedelta

try:
    await workflow.execute_activity_method(
        self.activities_cls.charge_payment,
        payment_info,
        start_to_close_timeout=timedelta(minutes=1),
    )
except Exception as e:
    workflow.logger.error(f"Payment failed: {e}")
    # Compensating action
    await workflow.execute_activity_method(
        self.activities_cls.send_failure_notification,
        {"error": str(e)},
        start_to_close_timeout=timedelta(minutes=1),
    )
    raise
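When several activities each have side effects, this compensating logic generalizes into the saga pattern: run steps in order and, if one fails, run the compensations of the steps that already succeeded in reverse order before re-raising. The sketch below is a stdlib-only model with hypothetical step names (reserve_inventory, charge_payment, ship_order); in a real workflow, each action and compensation would be its own workflow.execute_activity_method() call with a timeout.

```python
import asyncio
from typing import Awaitable, Callable, List, Tuple

# (name, action, compensation); in a workflow, each would be an activity call.
Step = Tuple[str, Callable[[], Awaitable[None]], Callable[[], Awaitable[None]]]


async def run_saga(steps: List[Step], log: List[str]) -> None:
    """Run steps in order; on failure, compensate completed steps in reverse and re-raise."""
    completed: List[Step] = []
    for step in steps:
        name, action, _ = step
        try:
            await action()
        except Exception:
            log.append(f"{name} failed")
            for done_name, _, compensate in reversed(completed):
                await compensate()
                log.append(f"compensated {done_name}")
            raise
        completed.append(step)
        log.append(f"{name} done")


async def succeed() -> None:
    await asyncio.sleep(0)


async def decline() -> None:
    raise RuntimeError("card declined")


async def undo() -> None:
    await asyncio.sleep(0)


log: List[str] = []
steps: List[Step] = [
    ("reserve_inventory", succeed, undo),  # hypothetical step names
    ("charge_payment", decline, undo),
    ("ship_order", succeed, undo),
]
error = ""
try:
    asyncio.run(run_saga(steps, log))
except RuntimeError as exc:
    error = str(exc)

print(log)
# ['reserve_inventory done', 'charge_payment failed', 'compensated reserve_inventory']
```

Re-raising after compensating keeps the failure visible, so the workflow still ends in a failed state rather than silently succeeding.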

Related

  • Activities - Implement tasks executed by workflows
  • Application - Register and run workflows
  • Worker - Execute workflow and activity code
