Activities implement the individual tasks that make up a workflow, handling data processing, external API calls, and state management.
ActivitiesInterface
application_sdk.activities.ActivitiesInterface
Abstract base class defining the interface for workflow activities with built-in state management.
State Management
Activities maintain state through the ActivitiesState model, which is automatically managed per workflow execution.
_get_state
Retrieve the state for the current workflow.
await self._get_state(workflow_args: Dict[str, Any]) -> ActivitiesState
Dictionary containing workflow arguments and configuration
The state data for the current workflow. Automatically initialized if it doesn’t exist.
State is automatically refreshed if more than 15 minutes have passed since the last update.
_set_state
Initialize or update the state for the current workflow.
await self._set_state(workflow_args: Dict[str, Any]) -> None
Arguments for the workflow, containing configuration and runtime parameters
_clean_state
Remove the state data for the current workflow.
await self._clean_state() -> None
Typically called during cleanup or error handling. Failures are logged but don’t raise exceptions.
Built-in Activities
get_workflow_args
Safely retrieve workflow configuration from state store.
@activity.defn
@auto_heartbeater
async def get_workflow_args(
self,
workflow_config: Dict[str, Any]
) -> Dict[str, Any]
Dictionary containing workflow_id and other parameters
Complete workflow configuration with:
- All configuration from state store
output_prefix: Output path prefix
output_path: Full output path
workflow_id: Workflow identifier
workflow_run_id: Workflow run identifier
- Preserved atlan-* prefixed keys for logging context
Raises: If the configuration cannot be retrieved from the state store
preflight_check
Perform preflight checks before workflow execution.
@activity.defn
@auto_heartbeater
async def preflight_check(
self,
workflow_args: Dict[str, Any]
) -> Dict[str, Any]
Dictionary containing workflow arguments and configuration. Must include a metadata key.
Dictionary containing the results of the preflight check
Raises: If the handler is not found or if the preflight check fails
convert_files
Convert input files to the specified output type.
@activity.defn
@auto_heartbeater
async def convert_files(
self,
workflow_args: Dict[str, Any]
) -> ActivityResult
Dictionary containing:
input_files: List of input file paths
output_file_type: Target file type (e.g., “parquet”, “json”, “csv”)
Result object with:
status: “success” or “warning”
message: Description of the operation
metadata.input_files: List of converted file paths
ActivitiesState
application_sdk.activities.ActivitiesState
Base state model for workflow activities.
Attributes
handler: Handler instance for activity-specific operations. Must be a subclass of HandlerInterface.
Arguments passed to the workflow containing configuration and runtime parameters
Timestamp of the last state update, used for automatic state refresh
Example Usage
Basic Activities
from application_sdk.activities import ActivitiesInterface, ActivitiesState
from application_sdk.handlers.base import BaseHandler
from temporalio import activity
class MyHandler(BaseHandler):
    """Example handler that retrieves data from an external source."""

    async def fetch_data(self, source):
        """Return records fetched from *source* (empty placeholder payload)."""
        payload = {"data": []}
        return payload
class MyActivitiesState(ActivitiesState):
    """State model for MyActivities; narrows the handler type to MyHandler."""

    # Concrete handler instance, assigned by MyActivities.initialize().
    handler: MyHandler
class MyActivities(ActivitiesInterface[MyActivitiesState]):
    """Example activities that initialize a handler and then process data with it."""

    @activity.defn
    async def initialize(self, workflow_args):
        """Initialize handler with credentials."""
        state = await self._get_state(workflow_args)
        loaded_handler = MyHandler()
        await loaded_handler.load(workflow_args["credentials"])
        state.handler = loaded_handler
        # Persist the updated state for subsequent activities in this workflow.
        await self._set_state(workflow_args)

    @activity.defn
    async def process_data(self, workflow_args):
        """Process data using the handler."""
        state = await self._get_state(workflow_args)
        fetched = await state.handler.fetch_data(workflow_args["source"])
        transformed = self._transform_data(fetched)
        return {"processed_count": len(transformed)}

    def _transform_data(self, data):
        """Transform fetched data; identity placeholder in this example."""
        return data
Activities with Error Handling
from application_sdk.activities import ActivitiesInterface
from application_sdk.common.error_codes import OrchestratorError
from temporalio import activity
class RobustActivities(ActivitiesInterface):
    """Example activities demonstrating error handling with state cleanup."""

    @activity.defn
    async def fetch_with_retry(self, workflow_args):
        """Fetch data with automatic retry on failure."""
        # State holds the handler configured for this workflow execution.
        state = await self._get_state(workflow_args)
        try:
            # Heartbeat to show activity is alive
            activity.heartbeat("Starting data fetch")
            result = await state.handler.fetch_metadata(
                metadata=workflow_args["metadata"]
            )
            activity.heartbeat("Data fetch completed")
            return result
        except OrchestratorError as e:
            # Known orchestration failure: log with its error code for triage.
            activity.logger.error(
                f"Orchestrator error: {e}",
                error_code=e.code
            )
            # Drop cached state so a retried attempt starts from a clean slate.
            await self._clean_state()
            raise
        except Exception as e:
            # Unexpected failure: log, clean up state, and re-raise so the
            # workflow engine can apply the activity's retry policy.
            activity.logger.error(f"Unexpected error: {e}")
            await self._clean_state()
            raise
Activities with Progress Tracking
from application_sdk.activities import ActivitiesInterface
from temporalio import activity
class ProgressTrackingActivities(ActivitiesInterface):
    """Example activities that report batch progress through heartbeats."""

    @activity.defn
    async def batch_process(self, workflow_args):
        """Process data in batches with progress tracking."""
        state = await self._get_state(workflow_args)
        items = workflow_args["items"]
        size = workflow_args.get("batch_size", 100)
        total = len(items)
        processed = 0
        offset = 0
        while offset < total:
            batch = items[offset:offset + size]
            await self._process_batch(batch, state)
            processed += len(batch)
            progress = (processed / total) * 100
            # Heartbeats double as liveness signals and progress reports.
            activity.heartbeat(f"Processed {processed}/{total} items ({progress:.1f}%)")
            offset += size
        return {"total_processed": processed}

    async def _process_batch(self, batch, state):
        """Placeholder for real per-batch processing logic."""
        pass
Activities with State Cleanup
from application_sdk.activities import ActivitiesInterface
from temporalio import activity
class CleanupActivities(ActivitiesInterface):
    """Example activities guaranteeing state and connection cleanup."""

    @activity.defn
    async def process_with_cleanup(self, workflow_args):
        """Process data with guaranteed cleanup."""
        state = await self._get_state(workflow_args)
        try:
            return await state.handler.fetch_metadata(
                metadata=workflow_args["metadata"]
            )
        finally:
            # Runs on success and failure alike: drop the cached state first,
            # then close the handler's client connection if it exposes one.
            await self._clean_state()
            handler = state.handler
            if handler and hasattr(handler, 'client'):
                await handler.client.close()
Custom State Model
from datetime import datetime
from typing import Any, Dict, List, Optional

from application_sdk.activities import ActivitiesInterface, ActivitiesState
class CustomState(ActivitiesState):
    """Custom state with additional fields."""

    # Concrete handler used by the activities (MyHandler must be defined or
    # imported alongside this example).
    handler: MyHandler
    # IDs of items processed so far. NOTE(review): assumes ActivitiesState is a
    # model that copies mutable defaults per instance (e.g. Pydantic) — confirm;
    # as a plain class attribute this list would be shared across instances.
    processed_items: List[str] = []
    # Fixed: was annotated plain `datetime` with a None default; Optional makes
    # the annotation consistent with the default value.
    start_time: Optional[datetime] = None
    # Free-form counters/metrics; same mutable-default caveat as above.
    metrics: Dict[str, Any] = {}
class CustomActivities(ActivitiesInterface[CustomState]):
    """Example activities backed by the CustomState model."""

    @activity.defn
    async def initialize(self, workflow_args):
        """Record the start time and reset the processing metrics."""
        state = await self._get_state(workflow_args)
        state.start_time = datetime.now()
        state.metrics = {"items_processed": 0}
        # Persist the initialized state for later activities.
        await self._set_state(workflow_args)

    @activity.defn
    async def process_item(self, workflow_args):
        """Process one item, update tracking state, and report the result."""
        state = await self._get_state(workflow_args)
        current = workflow_args["item_id"]
        # Item processing logic would go here.
        state.processed_items.append(current)
        state.metrics["items_processed"] += 1
        await self._set_state(workflow_args)
        return {"item_id": current, "status": "processed"}
Best Practices
State Management
- Always use _get_state() to access the current state
- Call _clean_state() in finally blocks for proper cleanup
- Keep state objects lightweight
- Use custom state models for activity-specific data
Error Handling
- Catch and log errors with appropriate context
- Clean up state on errors
- Use appropriate error types (OrchestratorError, etc.)
- Close resources in finally blocks
Activity Design
- Make activities idempotent when possible
- Send heartbeats for long-running activities
- Break down complex operations into smaller activities
- Use the @activity.defn decorator on every activity method
- Use the @auto_heartbeater decorator for automatic heartbeats
- Process data in batches for large datasets
- Report progress using heartbeats
- Close connections and clean up resources
- Minimize state object size