Workflows define the orchestration logic for your application, coordinating activities and managing state.
WorkflowInterface
application_sdk.workflows.WorkflowInterface
Abstract base class for all workflow implementations using Temporal.
Class Attributes
activities_cls
Type[ActivitiesInterface]
required
The activities class to be used by the workflow. Must be set by subclasses.
default_heartbeat_timeout
timedelta
default: the HEARTBEAT_TIMEOUT application constant
Default heartbeat timeout for workflow activities
default_start_to_close_timeout
timedelta
default:"START_TO_CLOSE_TIMEOUT"
Default start-to-close timeout for workflow activities
Methods
get_activities
Get the sequence of activities for this workflow.
@staticmethod
def get_activities(
activities: ActivitiesInterface
) -> Sequence[Callable[..., Any]]
activities
ActivitiesInterface
required
The activities interface instance
return
Sequence[Callable[..., Any]]
List of activity methods to be executed
This method must be implemented by subclasses to define which activities are available to the workflow.
run
Run the workflow with the given configuration.
@workflow.run
async def run(self, workflow_config: Dict[str, Any]) -> None
Workflow configuration including:
- workflow_id: Unique identifier for the workflow
- Additional parameters specific to the workflow
The base implementation:
- Extracts workflow configuration from the state store
- Sets up workflow run ID and retry policy
- Executes the preflight check activity
Subclasses should call super().run(workflow_config) first, then add custom logic.
Example Usage
Basic Workflow
from application_sdk.workflows import WorkflowInterface
from application_sdk.activities import ActivitiesInterface
from temporalio import workflow
from temporalio.common import RetryPolicy
from datetime import timedelta
class DataProcessingActivities(ActivitiesInterface):
    """Example activities implementing a simple extract/transform/load pipeline."""

    async def extract_data(self, workflow_args):
        # Pull raw records from the source system.
        return {"data": []}

    async def transform_data(self, workflow_args):
        # Reshape the extracted records into the target form.
        return {"transformed": []}

    async def load_data(self, workflow_args):
        # Persist the transformed records to the destination.
        return {"loaded": True}
class DataProcessingWorkflow(WorkflowInterface):
    """Example workflow running a sequential extract -> transform -> load pipeline.

    Each stage is executed as a Temporal activity; the output dict of each
    stage is merged into the workflow arguments passed to the next one.
    """

    activities_cls = DataProcessingActivities

    @staticmethod
    def get_activities(activities):
        """Return the activity methods this workflow may execute."""
        return [
            activities.extract_data,
            activities.transform_data,
            activities.load_data,
        ]

    async def run(self, workflow_config):
        # Resolve the full workflow arguments from the state store.
        # NOTE: this example performs the setup steps (get_workflow_args +
        # preflight_check) explicitly rather than calling
        # super().run(workflow_config), because it needs workflow_args locally
        # for the pipeline stages below.
        workflow_args = await workflow.execute_activity_method(
            self.activities_cls.get_workflow_args,
            workflow_config,
            retry_policy=RetryPolicy(maximum_attempts=3),
            start_to_close_timeout=self.default_start_to_close_timeout,
        )
        # Validate preconditions before doing any real work.
        await workflow.execute_activity_method(
            self.activities_cls.preflight_check,
            args=[workflow_args],
            start_to_close_timeout=self.default_start_to_close_timeout,
        )
        # ETL pipeline: each stage feeds its output into the next.
        extracted = await workflow.execute_activity_method(
            self.activities_cls.extract_data,
            workflow_args,
            start_to_close_timeout=timedelta(minutes=30),
            # Heartbeat so a hung extraction is detected before the 30 min cap.
            heartbeat_timeout=timedelta(minutes=5),
        )
        transformed = await workflow.execute_activity_method(
            self.activities_cls.transform_data,
            {**workflow_args, **extracted},
            start_to_close_timeout=timedelta(minutes=30),
        )
        loaded = await workflow.execute_activity_method(
            self.activities_cls.load_data,
            {**workflow_args, **transformed},
            start_to_close_timeout=timedelta(minutes=15),
        )
        return loaded
Workflow with Parallel Activities
from temporalio import workflow
import asyncio
class ParallelWorkflow(WorkflowInterface):
    """Example workflow that fans out three source-processing activities
    concurrently, then merges their outputs in a final activity."""

    activities_cls = ParallelActivities

    @staticmethod
    def get_activities(activities):
        """Return the activity methods this workflow may execute."""
        return [
            activities.process_source_a,
            activities.process_source_b,
            activities.process_source_c,
            activities.merge_results,
        ]

    async def run(self, workflow_config):
        # Load the full workflow arguments from the state store.
        workflow_args = await workflow.execute_activity_method(
            self.activities_cls.get_workflow_args,
            workflow_config,
            start_to_close_timeout=self.default_start_to_close_timeout,
        )
        # Validate preconditions before fanning out.
        await workflow.execute_activity_method(
            self.activities_cls.preflight_check,
            args=[workflow_args],
            start_to_close_timeout=self.default_start_to_close_timeout,
        )
        # Fan out: every source runs as its own activity, all at once.
        source_activities = (
            self.activities_cls.process_source_a,
            self.activities_cls.process_source_b,
            self.activities_cls.process_source_c,
        )
        results = await asyncio.gather(
            *(
                workflow.execute_activity_method(
                    source_activity,
                    workflow_args,
                    start_to_close_timeout=timedelta(minutes=20),
                )
                for source_activity in source_activities
            )
        )
        # Fan in: combine the per-source outputs into a single result.
        merged = await workflow.execute_activity_method(
            self.activities_cls.merge_results,
            {**workflow_args, "results": results},
            start_to_close_timeout=timedelta(minutes=10),
        )
        return merged
Workflow with Error Handling
from temporalio.common import RetryPolicy
from temporalio.exceptions import ActivityError
class ResilientWorkflow(WorkflowInterface):
    """Example workflow combining an aggressively retried critical step,
    a best-effort optional step, and cleanup that always runs."""

    activities_cls = ResilientActivities

    @staticmethod
    def get_activities(activities):
        """Return the activity methods this workflow may execute."""
        return [
            activities.critical_task,
            activities.optional_task,
            activities.cleanup_task,
        ]

    async def run(self, workflow_config):
        # Load the full workflow arguments from the state store.
        workflow_args = await workflow.execute_activity_method(
            self.activities_cls.get_workflow_args,
            workflow_config,
            start_to_close_timeout=self.default_start_to_close_timeout,
        )
        # Validate preconditions up front.
        await workflow.execute_activity_method(
            self.activities_cls.preflight_check,
            args=[workflow_args],
            start_to_close_timeout=self.default_start_to_close_timeout,
        )
        try:
            # Critical step: retry up to 5 times with exponential backoff.
            critical_result = await workflow.execute_activity_method(
                self.activities_cls.critical_task,
                workflow_args,
                retry_policy=RetryPolicy(
                    maximum_attempts=5,
                    initial_interval=timedelta(seconds=1),
                    backoff_coefficient=2.0,
                ),
                start_to_close_timeout=timedelta(minutes=30),
            )
            # Optional step: a single attempt, and failure is logged, not fatal.
            try:
                await workflow.execute_activity_method(
                    self.activities_cls.optional_task,
                    workflow_args,
                    retry_policy=RetryPolicy(maximum_attempts=1),
                    start_to_close_timeout=timedelta(minutes=5),
                )
            except ActivityError as e:
                workflow.logger.warning(f"Optional task failed: {e}")
            return critical_result
        finally:
            # Cleanup runs whether the steps above succeeded or raised.
            await workflow.execute_activity_method(
                self.activities_cls.cleanup_task,
                workflow_args,
                start_to_close_timeout=timedelta(minutes=5),
            )
Workflow with Custom Timeouts
class CustomTimeoutWorkflow(WorkflowInterface):
    """Example workflow overriding the class-level default timeouts and
    setting per-activity timeouts for a long-running task."""

    activities_cls = CustomActivities

    # Override the interface defaults for every activity in this workflow.
    default_heartbeat_timeout = timedelta(minutes=2)
    default_start_to_close_timeout = timedelta(hours=1)

    @staticmethod
    def get_activities(activities):
        """Return the activity methods this workflow may execute."""
        return [activities.long_running_task]

    async def run(self, workflow_config):
        # Load the full workflow arguments from the state store.
        workflow_args = await workflow.execute_activity_method(
            self.activities_cls.get_workflow_args,
            workflow_config,
            start_to_close_timeout=self.default_start_to_close_timeout,
        )
        # Validate preconditions before the long-running work begins.
        await workflow.execute_activity_method(
            self.activities_cls.preflight_check,
            args=[workflow_args],
            start_to_close_timeout=self.default_start_to_close_timeout,
        )
        # Long-running task: heartbeats every 5 min, 2 h per attempt,
        # 3 h total including scheduling delay.
        return await workflow.execute_activity_method(
            self.activities_cls.long_running_task,
            workflow_args,
            start_to_close_timeout=timedelta(hours=2),
            heartbeat_timeout=timedelta(minutes=5),
            schedule_to_close_timeout=timedelta(hours=3),
        )
Best Practices
Activity Execution
- Always specify start_to_close_timeout for activities
- Use heartbeat_timeout for long-running activities
- Configure appropriate retry policies based on activity characteristics
Error Handling
- Catch and handle activity errors appropriately
- Use try/finally blocks for cleanup operations
- Log errors with sufficient context
State Management
- Store workflow state in the StateStore service
- Use workflow arguments to pass data between activities
- Avoid storing large data in workflow history
- Execute independent activities in parallel using asyncio.gather
- Set appropriate timeout values to avoid hanging workflows
- Use heartbeats for long-running activities to enable progress tracking