Overview
The WorkflowInterface is an abstract base class that defines the structure for all workflow implementations in the Atlan Application SDK. Workflows orchestrate the execution of activities and define the business logic that must complete reliably, even in the face of failures.
Workflows are powered by Temporal, which provides durable execution guarantees. Because Temporal recovers a workflow by replaying its event history, your workflow code must be deterministic.
Key Concepts
Durable Execution: Workflows automatically recover from failures and continue from where they left off
Activity Orchestration: Workflows coordinate the execution of activities
State Management: Workflow state is automatically persisted and restored
Timeouts & Retries: Configurable timeouts and retry policies for activities
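Durable execution and automatic state restoration both rest on replay. The toy model below is a sketch, not Temporal's implementation: ToyHistory is a hypothetical stand-in for the event history that records each activity result. After a simulated crash, the workflow code re-runs from the top, reuses the recorded results, and executes only the work that had not completed.

```python
from typing import Any, Callable, Dict, List


class ToyHistory:
    """Hypothetical stand-in for an event history: activity results in call order."""

    def __init__(self) -> None:
        self.results: List[Any] = []
        self.cursor = 0

    def call(self, activity: Callable[[Any], Any], arg: Any) -> Any:
        if self.cursor < len(self.results):
            # Replay: reuse the recorded result instead of repeating the side effect.
            result = self.results[self.cursor]
        else:
            # First execution: run the activity and record its result.
            result = activity(arg)
            self.results.append(result)
        self.cursor += 1
        return result

    def restart(self) -> None:
        # Simulate a worker crash: the code restarts from the top, the history survives.
        self.cursor = 0


executions: Dict[str, int] = {"extract": 0, "transform": 0}
crash_pending = [True]


def extract(source: str) -> str:
    executions["extract"] += 1
    return source.upper()


def transform(data: str) -> str:
    executions["transform"] += 1
    if crash_pending[0]:
        crash_pending[0] = False
        raise RuntimeError("worker crashed mid-activity")
    return data[::-1]


def toy_workflow(history: ToyHistory, source: str) -> str:
    # Deterministic orchestration: the same inputs always issue the same calls.
    data = history.call(extract, source)
    return history.call(transform, data)


history = ToyHistory()
try:
    toy_workflow(history, "abc")
except RuntimeError:
    history.restart()
result = toy_workflow(history, "abc")
print(result, executions)  # extract ran once; only transform was re-executed
```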
Class Definition
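```python
from typing import Any, Dict

from temporalio import activity, workflow

from application_sdk.activities import ActivitiesInterface
from application_sdk.workflows import WorkflowInterface


class MyActivities(ActivitiesInterface):
    @activity.defn
    async def extract_data(self, args: Dict[str, Any]) -> Dict[str, Any]: ...
    @activity.defn
    async def transform_data(self, args: Dict[str, Any]) -> Dict[str, Any]: ...
    @activity.defn
    async def load_data(self, args: Dict[str, Any]) -> Dict[str, Any]: ...


@workflow.defn
class MyWorkflow(WorkflowInterface[MyActivities]):
    activities_cls = MyActivities

    @staticmethod
    def get_activities(activities: MyActivities):
        # Every activity executed by this workflow (including those run by the base run()) goes here
        return [
            activities.get_workflow_args,
            activities.preflight_check,
            activities.extract_data,
            activities.transform_data,
            activities.load_data,
        ]

    @workflow.run
    async def run(self, workflow_config: Dict[str, Any]) -> None:
        # First, call the parent run to fetch the configuration and run the preflight check
        await super().run(workflow_config)

        # Fetch the workflow arguments (via the get_workflow_args activity) for your own logic
        workflow_args: Dict[str, Any] = await workflow.execute_activity_method(
            self.activities_cls.get_workflow_args,
            workflow_config,
            start_to_close_timeout=self.default_start_to_close_timeout,
            heartbeat_timeout=self.default_heartbeat_timeout,
        )

        # Execute activities
        extracted = await workflow.execute_activity_method(
            self.activities_cls.extract_data,
            workflow_args,
            start_to_close_timeout=self.default_start_to_close_timeout,
            heartbeat_timeout=self.default_heartbeat_timeout,
        )
```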
Required Class Attributes
activities_cls
Type[ActivitiesInterface]
required
The activities class used by this workflow. It must be set as a class attribute:
```python
class MyWorkflow(WorkflowInterface[MyActivities]):
    activities_cls = MyActivities
```
Configurable Timeouts
default_heartbeat_timeout
timedelta
default: HEARTBEAT_TIMEOUT (SDK constant)
Default heartbeat timeout for activities. An activity must send a heartbeat within this interval, or the attempt times out. Override it as a class attribute:
```python
from datetime import timedelta

class MyWorkflow(WorkflowInterface[MyActivities]):
    default_heartbeat_timeout = timedelta(seconds=30)
```
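To make the heartbeat rule concrete, here is a pure-Python model of the check that Temporal performs on the server side. It is not SDK code: heartbeat_timed_out and its arguments are hypothetical, and the sketch assumes the heartbeat clock starts when the attempt starts.

```python
from datetime import datetime, timedelta
from typing import List


def heartbeat_timed_out(
    started_at: datetime,
    heartbeats: List[datetime],
    now: datetime,
    heartbeat_timeout: timedelta,
) -> bool:
    """True if more than heartbeat_timeout has passed since the last sign of life."""
    # Before the first heartbeat, the clock runs from the moment the attempt started.
    last_seen = max(heartbeats, default=started_at)
    return now - last_seen > heartbeat_timeout


t0 = datetime(2024, 1, 1, 12, 0, 0)
timeout = timedelta(seconds=30)
beats = [t0 + timedelta(seconds=10), t0 + timedelta(seconds=35)]

print(heartbeat_timed_out(t0, beats, t0 + timedelta(seconds=60), timeout))  # False: 25 s gap
print(heartbeat_timed_out(t0, beats, t0 + timedelta(seconds=70), timeout))  # True: 35 s gap
print(heartbeat_timed_out(t0, [], t0 + timedelta(seconds=31), timeout))     # True: never heartbeated
```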
default_start_to_close_timeout
timedelta
default: START_TO_CLOSE_TIMEOUT (SDK constant)
Default maximum time for a single activity attempt to run from start to finish. Override it as a class attribute:
```python
from datetime import timedelta

class MyWorkflow(WorkflowInterface[MyActivities]):
    default_start_to_close_timeout = timedelta(minutes=10)
```
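The start-to-close budget applies to each attempt individually. As a rough local analogy (not how Temporal enforces it: the server does that and then consults the retry policy), asyncio.wait_for cancels an attempt that overruns its budget. run_attempt and the two activities below are hypothetical.

```python
import asyncio
from datetime import timedelta
from typing import Awaitable, Callable, Tuple


async def run_attempt(
    activity_fn: Callable[[], Awaitable[str]], start_to_close_timeout: timedelta
) -> str:
    """Run one attempt and cancel it if it exceeds the start-to-close budget."""
    return await asyncio.wait_for(activity_fn(), timeout=start_to_close_timeout.total_seconds())


async def fast_activity() -> str:
    await asyncio.sleep(0.01)
    return "done"


async def slow_activity() -> str:
    await asyncio.sleep(0.5)
    return "done"


async def main() -> Tuple[str, bool]:
    ok = await run_attempt(fast_activity, timedelta(seconds=0.2))
    try:
        await run_attempt(slow_activity, timedelta(seconds=0.05))
        timed_out = False
    except asyncio.TimeoutError:
        timed_out = True
    return ok, timed_out


outcome = asyncio.run(main())
print(outcome)  # ('done', True)
```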
Required Methods
get_activities()
Static method that returns the list of activity methods this workflow will use.
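```python
from typing import Any, Callable, Sequence

from application_sdk.activities import ActivitiesInterface


# Static method on your WorkflowInterface subclass
@staticmethod
def get_activities(activities: ActivitiesInterface) -> Sequence[Callable[..., Any]]:
    """Return the list of activities for this workflow."""
    return [
        activities.get_workflow_args,  # executed by the base run()
        activities.preflight_check,
        activities.extract_data,
        activities.transform_data,
        activities.load_data,
        activities.cleanup,
    ]
```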
activities
ActivitiesInterface
required
An instance of the activities class. Use this to access activity methods.
return
Sequence[Callable[..., Any]]
List of activity methods that can be called from this workflow.
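Every activity this workflow executes must appear in the list returned by get_activities(), including get_workflow_args and preflight_check if you rely on the base run(). An activity missing from the list is not available when the workflow runs, and executing it fails at runtime.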
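A toy model of why a missing activity only fails at runtime. ToyWorker and UnregisteredActivityError are hypothetical, and the sketch assumes activities are looked up by method name, roughly mirroring Temporal's default of naming an activity after its function.

```python
from typing import Any, Callable, Dict, Sequence


class UnregisteredActivityError(RuntimeError):
    """Hypothetical error for an activity the worker never registered."""


class ToyWorker:
    def __init__(self, activities: Sequence[Callable[..., Any]]) -> None:
        # Index the callables returned by get_activities() by name.
        self.registry: Dict[str, Callable[..., Any]] = {fn.__name__: fn for fn in activities}

    def execute(self, name: str, *args: Any) -> Any:
        if name not in self.registry:
            raise UnregisteredActivityError(f"activity {name!r} is not registered")
        return self.registry[name](*args)


class MyActivities:
    def extract_data(self, args: Dict[str, Any]) -> Dict[str, Any]:
        return {"rows": 3}

    def load_data(self, args: Dict[str, Any]) -> Dict[str, Any]:
        return {"loaded": args["rows"]}


def get_activities(activities: MyActivities) -> Sequence[Callable[..., Any]]:
    # load_data is deliberately missing to show the failure mode.
    return [activities.extract_data]


worker = ToyWorker(get_activities(MyActivities()))
print(worker.execute("extract_data", {}))  # {'rows': 3}
try:
    worker.execute("load_data", {"rows": 3})
except UnregisteredActivityError as exc:
    print(exc)  # activity 'load_data' is not registered
```

Nothing fails when the worker starts; the error only surfaces when the workflow reaches the unlisted activity.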
run()
The main workflow execution method, decorated with @workflow.run.
```python
from typing import Any, Dict

from temporalio import workflow
from temporalio.common import RetryPolicy


# Method on your WorkflowInterface subclass
@workflow.run
async def run(self, workflow_config: Dict[str, Any]) -> None:
    """Execute the workflow."""
    # Get workflow configuration from state store
    workflow_args: Dict[str, Any] = await workflow.execute_activity_method(
        self.activities_cls.get_workflow_args,
        workflow_config,
        retry_policy=RetryPolicy(maximum_attempts=3, backoff_coefficient=2),
        start_to_close_timeout=self.default_start_to_close_timeout,
        heartbeat_timeout=self.default_heartbeat_timeout,
    )

    # Execute preflight check
    await workflow.execute_activity_method(
        self.activities_cls.preflight_check,
        args=[workflow_args],
        retry_policy=RetryPolicy(maximum_attempts=2, backoff_coefficient=2),
        start_to_close_timeout=self.default_start_to_close_timeout,
        heartbeat_timeout=self.default_heartbeat_timeout,
    )

    # Your custom workflow logic here
    # ...
```
workflow_config
Dict[str, Any]
required
Contains the workflow_id and other parameters. The workflow ID is used to look up the full configuration in the state store.
The base WorkflowInterface.run() method automatically:
Retrieves workflow configuration from the state store
Sets up the workflow run ID and retry policies
Executes the preflight check activity
You can call await super().run(workflow_config) to reuse this behavior, or override run() with your own logic.
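The fetch-then-preflight sequence of the base run() can be modelled with plain async functions. This is a sketch under stated assumptions: STATE_STORE is a hypothetical in-memory dict standing in for the SDK's state store, and the run-ID and retry-policy setup is omitted.

```python
import asyncio
from typing import Any, Dict, List

# Hypothetical in-memory state store, keyed by workflow_id.
STATE_STORE: Dict[str, Dict[str, Any]] = {
    "wf-123": {"workflow_id": "wf-123", "source": "postgres", "tables": ["orders"]},
}
calls: List[str] = []


async def get_workflow_args(workflow_config: Dict[str, Any]) -> Dict[str, Any]:
    calls.append("get_workflow_args")
    # The full configuration is looked up by workflow ID.
    return dict(STATE_STORE[workflow_config["workflow_id"]])


async def preflight_check(workflow_args: Dict[str, Any]) -> None:
    calls.append("preflight_check")
    if not workflow_args.get("tables"):
        raise ValueError("preflight failed: no tables configured")


async def base_run(workflow_config: Dict[str, Any]) -> Dict[str, Any]:
    # Same order as the base run(): fetch configuration, then preflight.
    workflow_args = await get_workflow_args(workflow_config)
    await preflight_check(workflow_args)
    return workflow_args


args = asyncio.run(base_run({"workflow_id": "wf-123"}))
print(calls, args["source"])  # ['get_workflow_args', 'preflight_check'] postgres
```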
Executing Activities
Use workflow.execute_activity_method() to execute activities from within your workflow:
```python
from datetime import timedelta

from temporalio import workflow
from temporalio.common import RetryPolicy

# Inside your workflow's run() method:

# Simple activity execution
result = await workflow.execute_activity_method(
    self.activities_cls.my_activity,
    workflow_args,
    start_to_close_timeout=timedelta(minutes=5),
    heartbeat_timeout=timedelta(seconds=30),
)

# With custom retry policy
result = await workflow.execute_activity_method(
    self.activities_cls.risky_operation,
    workflow_args,
    retry_policy=RetryPolicy(
        maximum_attempts=5,
        initial_interval=timedelta(seconds=1),
        maximum_interval=timedelta(seconds=60),
        backoff_coefficient=2.0,
    ),
    start_to_close_timeout=timedelta(minutes=10),
)
```
Activity Execution Parameters
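retry_policy
RetryPolicy
Defines how activities are retried on failure. Fields:
maximum_attempts: Maximum number of attempts, including the first (default: unlimited)
initial_interval: Delay before the first retry (default: 1 second)
maximum_interval: Upper bound on the delay between retries (default: 100 × initial_interval)
backoff_coefficient: Multiplier applied to the delay after each retry (default: 2.0)
start_to_close_timeout
timedelta
Maximum time for a single activity attempt to complete, from start to finish. Either this or schedule_to_close_timeout must be set.
heartbeat_timeout
timedelta
Maximum time between heartbeats. If an activity does not heartbeat within this interval, the attempt times out and may be retried according to the retry policy.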
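The retry_policy fields combine into a predictable schedule: the delay before retry n is initial_interval × backoff_coefficient^(n-1), capped at maximum_interval, and maximum_attempts counts the first attempt. The helper below is a hypothetical sketch that computes that schedule locally; it does not call Temporal.

```python
from datetime import timedelta
from typing import List, Optional


def retry_delays(
    maximum_attempts: int,
    initial_interval: timedelta = timedelta(seconds=1),
    backoff_coefficient: float = 2.0,
    maximum_interval: Optional[timedelta] = None,
) -> List[timedelta]:
    """Delays before each retry: initial * coefficient**(n-1), capped at maximum_interval."""
    if maximum_attempts <= 0:
        raise ValueError("unlimited attempts have no finite schedule; pass a positive count")
    # Assumed default cap: 100x the initial interval, as documented for Temporal.
    cap = maximum_interval if maximum_interval is not None else initial_interval * 100
    delays: List[timedelta] = []
    for retry in range(1, maximum_attempts):  # the first attempt is not a retry
        delay = initial_interval * (backoff_coefficient ** (retry - 1))
        delays.append(min(delay, cap))
    return delays


print(retry_delays(5, timedelta(seconds=1), 2.0, timedelta(seconds=60)))
```

With the custom policy from the Executing Activities example (5 attempts, 1 s initial interval, 60 s cap, coefficient 2.0), the four retries wait 1 s, 2 s, 4 s and 8 s; the 60 s cap only starts to bite from the seventh retry onward.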
Complete Workflow Example
```python
from datetime import timedelta
from typing import Any, Callable, Dict, Sequence

from temporalio import activity, workflow
from temporalio.common import RetryPolicy

from application_sdk.activities import ActivitiesInterface
from application_sdk.workflows import WorkflowInterface


class DataPipelineActivities(ActivitiesInterface):
    @activity.defn
    async def extract_data(self, args: Dict[str, Any]) -> Dict[str, Any]:
        # Extract data implementation goes here
        return {}

    @activity.defn
    async def transform_data(self, args: Dict[str, Any]) -> Dict[str, Any]:
        # Transform data implementation goes here
        return {}

    @activity.defn
    async def load_data(self, args: Dict[str, Any]) -> Dict[str, Any]:
        # Load data implementation goes here
        return {}


@workflow.defn
class DataPipelineWorkflow(WorkflowInterface[DataPipelineActivities]):
    """ETL workflow that extracts, transforms, and loads data."""

    activities_cls = DataPipelineActivities

    # Custom timeouts for this workflow
    default_heartbeat_timeout = timedelta(seconds=30)
    default_start_to_close_timeout = timedelta(minutes=15)

    @staticmethod
    def get_activities(activities: DataPipelineActivities) -> Sequence[Callable[..., Any]]:
        return [
            activities.get_workflow_args,
            activities.preflight_check,
            activities.extract_data,
            activities.transform_data,
            activities.load_data,
        ]

    @workflow.run
    async def run(self, workflow_config: Dict[str, Any]) -> None:
        """Execute the ETL pipeline."""
        workflow.logger.info("Starting ETL pipeline")

        # Get configuration from state store
        workflow_args = await workflow.execute_activity_method(
            self.activities_cls.get_workflow_args,
            workflow_config,
            retry_policy=RetryPolicy(maximum_attempts=3),
            start_to_close_timeout=self.default_start_to_close_timeout,
            heartbeat_timeout=self.default_heartbeat_timeout,
        )

        # Run preflight checks
        await workflow.execute_activity_method(
            self.activities_cls.preflight_check,
            args=[workflow_args],
            retry_policy=RetryPolicy(maximum_attempts=2),
            start_to_close_timeout=self.default_start_to_close_timeout,
            heartbeat_timeout=self.default_heartbeat_timeout,
        )

        # Extract
        extracted_data = await workflow.execute_activity_method(
            self.activities_cls.extract_data,
            workflow_args,
            start_to_close_timeout=timedelta(minutes=30),
            heartbeat_timeout=timedelta(minutes=5),
        )

        # Transform
        transformed_data = await workflow.execute_activity_method(
            self.activities_cls.transform_data,
            extracted_data,
            start_to_close_timeout=timedelta(minutes=20),
            heartbeat_timeout=timedelta(minutes=3),
        )

        # Load
        await workflow.execute_activity_method(
            self.activities_cls.load_data,
            transformed_data,
            start_to_close_timeout=timedelta(minutes=15),
            heartbeat_timeout=timedelta(minutes=2),
        )

        workflow.logger.info("ETL pipeline completed successfully")
```
Workflow Determinism
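Workflows must be deterministic: when Temporal replays a workflow's event history, the code must make the same decisions in the same order. Avoid:
Random number generation (use workflow.random() instead)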
Current time/date access (use workflow.now() instead)
Direct I/O operations
Non-deterministic threading
All side effects should happen in activities, not workflows.
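Why seeded randomness is replay-safe while ad hoc randomness is not, in a stdlib-only sketch. pick_batches is a hypothetical piece of workflow-style decision logic; the two random.Random(42) instances stand in for the deterministic generator that workflow.random() supplies on the original run and on replay.

```python
import random
from typing import List


def pick_batches(rng: random.Random, items: List[str]) -> List[str]:
    # Workflow-style decision that depends on randomness.
    shuffled = list(items)
    rng.shuffle(shuffled)
    return shuffled[:2]


items = ["a", "b", "c", "d", "e"]

# Replay-safe: the generator is seeded identically on the original run and on replay.
original = pick_batches(random.Random(42), items)
replayed = pick_batches(random.Random(42), items)

# Not replay-safe: a fresh, OS-seeded generator can decide differently on every execution.
unsafe_runs = {tuple(pick_batches(random.Random(), items)) for _ in range(20)}

print(original == replayed, len(unsafe_runs))
```

A replay that picked different batches would issue different activity calls than the recorded history, which is exactly the non-determinism Temporal detects and rejects.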
Best Practices
Use Activities for Non-Deterministic Operations
All I/O, API calls, database queries, and other non-deterministic operations belong in activities, not workflows.
```python
from datetime import timedelta

# ❌ Bad - I/O in workflow
@workflow.run
async def run(self, config):
    with open("file.txt") as f:  # Don't do this!
        data = f.read()

# ✅ Good - I/O in activity
@workflow.run
async def run(self, config):
    data = await workflow.execute_activity_method(
        self.activities_cls.read_file,
        {"path": "file.txt"},
        start_to_close_timeout=timedelta(seconds=30),
    )
```
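Set Appropriate Timeouts
Always set realistic timeouts based on your activity's expected execution time. If a timeout is too short, activities fail prematurely; if it is too long, a stuck activity goes undetected until the timeout finally fires, which delays retries. For long-running work, pair a generous start_to_close_timeout with a much shorter heartbeat_timeout.
```python
from datetime import timedelta

# Quick operations
await workflow.execute_activity_method(
    self.activities_cls.validate_input,
    args,
    start_to_close_timeout=timedelta(seconds=30),
)

# Long-running operations: heartbeats detect a stalled attempt long before the 2-hour limit
await workflow.execute_activity_method(
    self.activities_cls.process_large_dataset,
    args,
    start_to_close_timeout=timedelta(hours=2),
    heartbeat_timeout=timedelta(minutes=5),
)
```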
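A long-running activity stays within a short heartbeat_timeout by reporting progress as it goes. The sketch below takes the heartbeat function as a parameter so it runs without a worker; inside a real Temporal activity you would call temporalio.activity.heartbeat(...) instead. process_large_dataset and heartbeat_every are hypothetical names.

```python
from typing import Callable, Iterable, List


def process_large_dataset(
    rows: Iterable[int],
    heartbeat: Callable[[int], None],
    heartbeat_every: int = 1000,
) -> int:
    """Sum rows, reporting progress every heartbeat_every rows."""
    total = 0
    for i, row in enumerate(rows, start=1):
        total += row
        if i % heartbeat_every == 0:
            # In a real activity: temporalio.activity.heartbeat(i)
            heartbeat(i)
    return total


progress: List[int] = []
result = process_large_dataset(range(2500), progress.append, heartbeat_every=1000)
print(result, progress)  # 3123750 [1000, 2000]
```

Choose heartbeat_every so that the time to process that many rows stays comfortably below heartbeat_timeout.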
Handle Failures Gracefully
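Use try/except blocks to handle activity failures and run compensating logic. Once an activity's retries are exhausted, Temporal surfaces the failure in the workflow as an ActivityError.
```python
from datetime import timedelta

from temporalio import workflow
from temporalio.exceptions import ActivityError

try:
    await workflow.execute_activity_method(
        self.activities_cls.charge_payment,
        payment_info,
        start_to_close_timeout=timedelta(minutes=1),
    )
except ActivityError as e:
    workflow.logger.error(f"Payment failed: {e}")
    # Compensating action
    await workflow.execute_activity_method(
        self.activities_cls.send_failure_notification,
        {"error": str(e)},
        start_to_close_timeout=timedelta(minutes=1),
    )
    raise
```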
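When several steps each need undoing, the compensations are usually run in reverse order of completion (the saga pattern). The sketch below uses plain asyncio functions in place of execute_activity_method calls; run_with_compensation and the step names are hypothetical.

```python
import asyncio
from typing import Awaitable, Callable, List, Tuple

# (name, action, compensation)
Step = Tuple[str, Callable[[], Awaitable[None]], Callable[[], Awaitable[None]]]


async def run_with_compensation(steps: List[Step], log: List[str]) -> None:
    """Run steps in order; on failure, undo completed steps in reverse, then re-raise."""
    completed: List[Step] = []
    try:
        for step in steps:
            name, action, _ = step
            await action()
            log.append(f"done:{name}")
            completed.append(step)
    except Exception:
        for name, _, compensate in reversed(completed):
            await compensate()
            log.append(f"undone:{name}")
        raise


async def ok() -> None:
    return None


async def fail() -> None:
    raise RuntimeError("payment declined")


log: List[str] = []
steps: List[Step] = [
    ("reserve_stock", ok, ok),
    ("charge_payment", fail, ok),
]

try:
    asyncio.run(run_with_compensation(steps, log))
except RuntimeError as exc:
    log.append(f"error:{exc}")
print(log)  # ['done:reserve_stock', 'undone:reserve_stock', 'error:payment declined']
```

Re-raising after compensation keeps the failure visible to the caller instead of reporting a half-finished run as a success.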
Activities - Implement tasks executed by workflows
Application - Register and run workflows
Worker - Execute workflow and activity code