Skip to main content
Workflows define the orchestration logic for your application, coordinating activities and managing state.

WorkflowInterface

application_sdk.workflows.WorkflowInterface — abstract base class for all workflow implementations using Temporal.

Class Attributes

activities_cls
Type[ActivitiesInterface]
required
The activities class to be used by the workflow. Must be set by subclasses.
default_heartbeat_timeout
timedelta
default:"HEARTBEAT_TIMEOUT"
Default heartbeat timeout for workflow activities
default_start_to_close_timeout
timedelta
default:"START_TO_CLOSE_TIMEOUT"
Default start-to-close timeout for workflow activities

Methods

get_activities

Get the sequence of activities for this workflow.
@staticmethod
def get_activities(
    activities: ActivitiesInterface
) -> Sequence[Callable[..., Any]]
activities
ActivitiesInterface
required
The activities interface instance
return
Sequence[Callable[..., Any]]
List of activity methods to be executed
This method must be implemented by subclasses to define which activities are available to the workflow.

run

Run the workflow with the given configuration.
@workflow.run
async def run(self, workflow_config: Dict[str, Any]) -> None
workflow_config
Dict[str, Any]
required
Workflow configuration including:
  • workflow_id: Unique identifier for the workflow
  • Additional parameters specific to the workflow
behavior
Description
The base implementation:
  1. Extracts workflow configuration from the state store
  2. Sets up workflow run ID and retry policy
  3. Executes the preflight check activity
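Subclasses should call await super().run(workflow_config) first (run is a coroutine, so the call must be awaited), then add custom logic. The examples below instead perform the configuration lookup and preflight check explicitly.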

Example Usage

Basic Workflow

from datetime import timedelta

from application_sdk.activities import ActivitiesInterface
from application_sdk.workflows import WorkflowInterface
from temporalio import activity
from temporalio import workflow
from temporalio.common import RetryPolicy

class DataProcessingActivities(ActivitiesInterface):
    """Stub activities for a three-stage extract/transform/load pipeline.

    Each method is decorated with ``@activity.defn`` so that Temporal
    recognises it as an activity when it is registered on a worker and
    invoked through ``workflow.execute_activity_method``.
    """

    @activity.defn
    async def extract_data(self, workflow_args):
        """Extract stage placeholder.

        Args:
            workflow_args: Arguments passed in by the workflow (unused here).

        Returns:
            A dict with an empty ``"data"`` list.
        """
        # Extract data
        return {"data": []}

    @activity.defn
    async def transform_data(self, workflow_args):
        """Transform stage placeholder.

        Args:
            workflow_args: Arguments passed in by the workflow (unused here).

        Returns:
            A dict with an empty ``"transformed"`` list.
        """
        # Transform data
        return {"transformed": []}

    @activity.defn
    async def load_data(self, workflow_args):
        """Load stage placeholder.

        Args:
            workflow_args: Arguments passed in by the workflow (unused here).

        Returns:
            A dict whose ``"loaded"`` flag is ``True``.
        """
        # Load data
        return {"loaded": True}

@workflow.defn
class DataProcessingWorkflow(WorkflowInterface):
    """Sequential extract -> transform -> load workflow.

    The class carries ``@workflow.defn`` and its ``run`` method carries
    ``@workflow.run``: Temporal expects the run decorator on a method defined
    on the concrete workflow class, even when a base class declares one.
    """

    activities_cls = DataProcessingActivities

    @staticmethod
    def get_activities(activities):
        """Return the activity methods used by this workflow.

        Args:
            activities: Instance of the activities class.

        Returns:
            The extract, transform and load methods, in pipeline order.
        """
        return [
            activities.extract_data,
            activities.transform_data,
            activities.load_data,
        ]

    @workflow.run
    async def run(self, workflow_config):
        """Resolve the workflow arguments, run preflight, then run the ETL stages.

        Args:
            workflow_config: Configuration dict handed to the workflow.

        Returns:
            The result of the ``load_data`` activity.
        """
        # Resolve the workflow arguments and run the preflight check
        # explicitly; this method does not delegate to the parent run().
        workflow_args = await workflow.execute_activity_method(
            self.activities_cls.get_workflow_args,
            workflow_config,
            retry_policy=RetryPolicy(maximum_attempts=3),
            start_to_close_timeout=self.default_start_to_close_timeout,
        )

        await workflow.execute_activity_method(
            self.activities_cls.preflight_check,
            args=[workflow_args],
            start_to_close_timeout=self.default_start_to_close_timeout,
        )

        # ETL pipeline: each stage's output dict is merged into the arguments
        # passed to the next stage.
        extracted = await workflow.execute_activity_method(
            self.activities_cls.extract_data,
            workflow_args,
            start_to_close_timeout=timedelta(minutes=30),
            heartbeat_timeout=timedelta(minutes=5),
        )

        transformed = await workflow.execute_activity_method(
            self.activities_cls.transform_data,
            {**workflow_args, **extracted},
            start_to_close_timeout=timedelta(minutes=30),
        )

        loaded = await workflow.execute_activity_method(
            self.activities_cls.load_data,
            {**workflow_args, **transformed},
            start_to_close_timeout=timedelta(minutes=15),
        )

        return loaded

Workflow with Parallel Activities

from temporalio import workflow
import asyncio

@workflow.defn
class ParallelWorkflow(WorkflowInterface):
    """Workflow that processes three sources concurrently, then merges them.

    The class carries ``@workflow.defn`` and its ``run`` method carries
    ``@workflow.run``: Temporal expects the run decorator on a method defined
    on the concrete workflow class, even when a base class declares one.
    """

    activities_cls = ParallelActivities

    @staticmethod
    def get_activities(activities):
        """Return the activity methods used by this workflow.

        Args:
            activities: Instance of the activities class.

        Returns:
            The three per-source processing methods followed by the merge
            method.
        """
        return [
            activities.process_source_a,
            activities.process_source_b,
            activities.process_source_c,
            activities.merge_results,
        ]

    @workflow.run
    async def run(self, workflow_config):
        """Resolve arguments, run preflight, fan out per source, then merge.

        Args:
            workflow_config: Configuration dict handed to the workflow.

        Returns:
            The result of the ``merge_results`` activity.
        """
        workflow_args = await workflow.execute_activity_method(
            self.activities_cls.get_workflow_args,
            workflow_config,
            start_to_close_timeout=self.default_start_to_close_timeout,
        )

        await workflow.execute_activity_method(
            self.activities_cls.preflight_check,
            args=[workflow_args],
            start_to_close_timeout=self.default_start_to_close_timeout,
        )

        # Execute multiple sources in parallel. gather() returns the results
        # in the same order as the awaitables: source a, b, then c.
        results = await asyncio.gather(
            workflow.execute_activity_method(
                self.activities_cls.process_source_a,
                workflow_args,
                start_to_close_timeout=timedelta(minutes=20),
            ),
            workflow.execute_activity_method(
                self.activities_cls.process_source_b,
                workflow_args,
                start_to_close_timeout=timedelta(minutes=20),
            ),
            workflow.execute_activity_method(
                self.activities_cls.process_source_c,
                workflow_args,
                start_to_close_timeout=timedelta(minutes=20),
            ),
        )

        # Merge results: the per-source outputs travel under the "results" key.
        merged = await workflow.execute_activity_method(
            self.activities_cls.merge_results,
            {**workflow_args, "results": results},
            start_to_close_timeout=timedelta(minutes=10),
        )

        return merged

Workflow with Error Handling

from temporalio.common import RetryPolicy
from temporalio.exceptions import ActivityError

@workflow.defn
class ResilientWorkflow(WorkflowInterface):
    """Workflow showing retries, a tolerated failure and guaranteed cleanup.

    The class carries ``@workflow.defn`` and its ``run`` method carries
    ``@workflow.run``: Temporal expects the run decorator on a method defined
    on the concrete workflow class, even when a base class declares one.
    """

    activities_cls = ResilientActivities

    @staticmethod
    def get_activities(activities):
        """Return the activity methods used by this workflow.

        Args:
            activities: Instance of the activities class.

        Returns:
            The critical, optional and cleanup task methods.
        """
        return [
            activities.critical_task,
            activities.optional_task,
            activities.cleanup_task,
        ]

    @workflow.run
    async def run(self, workflow_config):
        """Run the critical task, attempt the optional task, always clean up.

        Args:
            workflow_config: Configuration dict handed to the workflow.

        Returns:
            The result of the ``critical_task`` activity.

        Raises:
            Any error from argument resolution, preflight or the critical
            task propagates to the caller. An ``ActivityError`` from the
            optional task is logged and suppressed.
        """
        # Both calls sit outside the try block, so workflow_args is always
        # bound by the time the finally clause runs.
        workflow_args = await workflow.execute_activity_method(
            self.activities_cls.get_workflow_args,
            workflow_config,
            start_to_close_timeout=self.default_start_to_close_timeout,
        )

        await workflow.execute_activity_method(
            self.activities_cls.preflight_check,
            args=[workflow_args],
            start_to_close_timeout=self.default_start_to_close_timeout,
        )

        try:
            # Critical task with aggressive retry
            result = await workflow.execute_activity_method(
                self.activities_cls.critical_task,
                workflow_args,
                retry_policy=RetryPolicy(
                    maximum_attempts=5,
                    initial_interval=timedelta(seconds=1),
                    backoff_coefficient=2.0,
                ),
                start_to_close_timeout=timedelta(minutes=30),
            )

            # Optional task - continue on failure
            try:
                await workflow.execute_activity_method(
                    self.activities_cls.optional_task,
                    workflow_args,
                    retry_policy=RetryPolicy(maximum_attempts=1),
                    start_to_close_timeout=timedelta(minutes=5),
                )
            except ActivityError as e:
                workflow.logger.warning(f"Optional task failed: {e}")

            return result

        finally:
            # Always run cleanup, whether the tasks above succeeded or raised.
            await workflow.execute_activity_method(
                self.activities_cls.cleanup_task,
                workflow_args,
                start_to_close_timeout=timedelta(minutes=5),
            )

Workflow with Custom Timeouts

@workflow.defn
class CustomTimeoutWorkflow(WorkflowInterface):
    """Workflow that overrides the class-level default timeouts.

    The class carries ``@workflow.defn`` and its ``run`` method carries
    ``@workflow.run``: Temporal expects the run decorator on a method defined
    on the concrete workflow class, even when a base class declares one.
    """

    activities_cls = CustomActivities

    # Override default timeouts
    default_heartbeat_timeout = timedelta(minutes=2)
    default_start_to_close_timeout = timedelta(hours=1)

    @staticmethod
    def get_activities(activities):
        """Return the activity methods used by this workflow.

        Args:
            activities: Instance of the activities class.

        Returns:
            A single-element list containing the long-running task method.
        """
        return [activities.long_running_task]

    @workflow.run
    async def run(self, workflow_config):
        """Resolve arguments, run preflight, then run the long-running task.

        Args:
            workflow_config: Configuration dict handed to the workflow.

        Returns:
            The result of the ``long_running_task`` activity.
        """
        # These two calls use the overridden one-hour class default.
        workflow_args = await workflow.execute_activity_method(
            self.activities_cls.get_workflow_args,
            workflow_config,
            start_to_close_timeout=self.default_start_to_close_timeout,
        )

        await workflow.execute_activity_method(
            self.activities_cls.preflight_check,
            args=[workflow_args],
            start_to_close_timeout=self.default_start_to_close_timeout,
        )

        # Long running task with custom timeouts: explicit per-call values
        # are passed here instead of the class-level defaults.
        result = await workflow.execute_activity_method(
            self.activities_cls.long_running_task,
            workflow_args,
            start_to_close_timeout=timedelta(hours=2),
            heartbeat_timeout=timedelta(minutes=5),
            schedule_to_close_timeout=timedelta(hours=3),
        )

        return result

Best Practices

Activity Execution

  • Always specify start_to_close_timeout for activities
  • Use heartbeat_timeout for long-running activities
  • Configure appropriate retry policies based on activity characteristics

Error Handling

  • Catch and handle activity errors appropriately
  • Use try/finally blocks for cleanup operations
  • Log errors with sufficient context

State Management

  • Store workflow state in the StateStore service
  • Use workflow arguments to pass data between activities
  • Avoid storing large data in workflow history

Performance

  • Execute independent activities in parallel using asyncio.gather
  • Set appropriate timeout values to avoid hanging workflows
  • Use heartbeats for long-running activities to enable progress tracking

Build docs developers (and LLMs) love