Activities are the building blocks of workflows in the Atlan Application SDK. They represent individual tasks that perform actual work, such as calling APIs, querying databases, processing files, or any other side-effecting operation. Activities are designed to be retryable and fault-tolerant.
Unlike workflows, activities can perform non-deterministic operations like I/O, API calls, and database queries. All side effects should happen in activities.
Activities have access to a shared state that persists across activity executions within the same workflow run. This state includes the handler instance and workflow arguments.
The workflow arguments entry is a dictionary containing the workflow's arguments and configuration.
State is automatically refreshed if more than 15 minutes have passed since the last update. This ensures credentials and configurations remain current for long-running workflows.
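The refresh rule above can be pictured as a time-stamped cache entry. The sketch below is not the SDK's implementation: `ActivityState`, `StateStore`, and `load_state` are hypothetical names introduced for illustration, and only the 15-minute threshold comes from the text.

```python
import time
from dataclasses import dataclass, field
from typing import Any, Callable, Dict

REFRESH_AFTER_SECONDS = 15 * 60  # threshold taken from the text above


@dataclass
class ActivityState:
    """Hypothetical container for the state shared within one workflow run."""

    handler: Any
    workflow_args: Dict[str, Any]
    last_updated: float = field(default_factory=time.monotonic)


class StateStore:
    """Hypothetical store: one state object per run, rebuilt when stale."""

    def __init__(
        self,
        load_state: Callable[[str], ActivityState],
        clock: Callable[[], float] = time.monotonic,
    ) -> None:
        self._load_state = load_state
        self._clock = clock
        self._states: Dict[str, ActivityState] = {}

    def get(self, run_key: str) -> ActivityState:
        state = self._states.get(run_key)
        stale = (
            state is not None
            and self._clock() - state.last_updated > REFRESH_AFTER_SECONDS
        )
        if state is None or stale:
            # Rebuild the state so credentials and configuration are fresh.
            state = self._load_state(run_key)
            state.last_updated = self._clock()
            self._states[run_key] = state
        return state
```

Injecting the clock keeps the staleness check easy to exercise in tests without waiting 15 minutes.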
Access activity information using the activity module:
    from typing import Any, Dict

    from temporalio import activity
    from temporalio.exceptions import CancelledError

    @activity.defn
    async def my_activity(self, args: Dict[str, Any]):
        # Get activity info
        info = activity.info()

        # Log using activity logger
        activity.logger.info(f"Running activity: {info.activity_type}")

        # Send heartbeat with progress
        activity.heartbeat({"progress": 50, "message": "Halfway done"})

        # Check if activity is being cancelled
        if activity.is_cancelled():
            activity.logger.info("Activity cancelled, cleaning up...")
            raise CancelledError("Activity was cancelled")
The SDK provides utilities in application_sdk.activities.common.utils:
    from application_sdk.activities.common.utils import (
        get_workflow_id,
        get_workflow_run_id,
        build_output_path,
        auto_heartbeater,
    )

    # Get current workflow ID
    workflow_id = get_workflow_id()

    # Get current run ID
    run_id = get_workflow_run_id()

    # Build unique output path
    output_path = build_output_path()
    # Returns: "YYYY/MM/DD/HH/MM/<workflow_id>/<run_id>"
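To make the documented path shape concrete, here is a minimal sketch of how a string in that layout could be assembled. `sketch_output_path` is a hypothetical helper, not the SDK's `build_output_path`, and the use of UTC is an assumption.

```python
from datetime import datetime, timezone
from typing import Optional


def sketch_output_path(
    workflow_id: str, run_id: str, now: Optional[datetime] = None
) -> str:
    """Hypothetical helper that only mirrors the documented path shape."""
    now = now or datetime.now(timezone.utc)  # UTC is an assumption
    # Minute-level timestamp prefix, then the workflow and run identifiers.
    return f"{now:%Y/%m/%d/%H/%M}/{workflow_id}/{run_id}"
```

Because the run ID is part of the path, two runs of the same workflow started in the same minute still write to different locations.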
Always send heartbeats for activities that take more than a few seconds. This allows Temporal to detect worker failures.
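    from typing import Any, Dict

    from temporalio import activity
    from application_sdk.activities.common.utils import auto_heartbeater

    @activity.defn
    @auto_heartbeater  # Automatic heartbeats
    async def process_large_file(self, args: Dict[str, Any]):
        # file_chunks and process_chunk are placeholders for your own data and logic
        # Manual heartbeat with progress
        for i, chunk in enumerate(file_chunks):
            process_chunk(chunk)
            activity.heartbeat({"progress": (i + 1) / len(file_chunks) * 100})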
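For intuition about what an automatic heartbeat decorator does, the sketch below runs a background task that calls a heartbeat callback on a timer while the wrapped coroutine executes. It is not the SDK's `auto_heartbeater`: `heartbeat_every`, `interval`, and `send_heartbeat` are hypothetical names, and in a real activity the callback would presumably be `activity.heartbeat`.

```python
import asyncio
import contextlib
import functools
from typing import Any, Awaitable, Callable


def heartbeat_every(interval: float, send_heartbeat: Callable[[], None]):
    """Hypothetical decorator: heartbeat on a timer while the coroutine runs."""

    def decorator(fn: Callable[..., Awaitable[Any]]):
        @functools.wraps(fn)
        async def wrapper(*args: Any, **kwargs: Any) -> Any:
            async def beat() -> None:
                while True:
                    send_heartbeat()
                    await asyncio.sleep(interval)

            task = asyncio.create_task(beat())
            try:
                return await fn(*args, **kwargs)
            finally:
                # Stop heartbeating whether fn returned or raised.
                task.cancel()
                with contextlib.suppress(asyncio.CancelledError):
                    await task

        return wrapper

    return decorator
```

The background task only gets a chance to run when the wrapped coroutine awaits, so long blocking calls inside an activity would still starve the heartbeat.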
Make Activities Idempotent
Activities may be retried, so design them to be idempotent (safe to execute multiple times).
    @activity.defn
    async def update_record(self, args: Dict[str, Any]):
        record_id = args["record_id"]

        # `handler` is the handler instance held in the shared activity state

        # Check if already processed
        existing = await handler.get_record(record_id)
        if existing and existing["processed"]:
            return {"status": "already_processed", "record_id": record_id}

        # Process the record
        result = await handler.process_record(record_id)
        return result
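The effect of the check can be seen by simulating a retry against an in-memory store. This is a sketch under stated assumptions: `InMemoryRecords` is a hypothetical stand-in for a real handler, and `update_record` here is a plain coroutine rather than a registered activity.

```python
from typing import Any, Dict, Optional


class InMemoryRecords:
    """Hypothetical stand-in for a handler backed by a real data store."""

    def __init__(self) -> None:
        self.records: Dict[str, Dict[str, Any]] = {}
        self.process_calls = 0  # counts how often real work was done

    async def get_record(self, record_id: str) -> Optional[Dict[str, Any]]:
        return self.records.get(record_id)

    async def process_record(self, record_id: str) -> Dict[str, Any]:
        self.process_calls += 1
        self.records[record_id] = {"processed": True}
        return {"status": "processed", "record_id": record_id}


async def update_record(handler: InMemoryRecords, record_id: str) -> Dict[str, Any]:
    # Skip the work if an earlier attempt already completed it.
    existing = await handler.get_record(record_id)
    if existing and existing["processed"]:
        return {"status": "already_processed", "record_id": record_id}
    return await handler.process_record(record_id)
```

Calling `update_record` twice with the same ID performs the work once and reports `already_processed` the second time. A check-then-act pattern like this is only safe when attempts do not overlap; concurrent attempts would need an atomic operation in the data store.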
Clean Up State on Errors
Always clean up activity state when errors occur to prevent stale state from affecting future executions.
    try:
        result = await perform_operation()
        return result
    except Exception:
        await self._clean_state()
        raise
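A self-contained sketch of the same pattern follows. `SketchActivities`, `run_step`, and the dictionary-based state are hypothetical; only the `_clean_state` name is taken from the snippet above, and its real behavior in the SDK may differ.

```python
from typing import Any, Awaitable, Callable, Dict


class SketchActivities:
    """Hypothetical class showing clean-up before an error propagates."""

    def __init__(self) -> None:
        self._state: Dict[str, Any] = {}

    async def _clean_state(self) -> None:
        # Assumed behavior: drop everything cached for this run.
        self._state.clear()

    async def run_step(self, operation: Callable[[], Awaitable[Any]]) -> Any:
        self._state["in_progress"] = True
        try:
            return await operation()
        except Exception:
            await self._clean_state()
            raise  # re-raise so the failure is still reported to the caller
```

Re-raising with a bare `raise` preserves the original exception and traceback, so retry handling upstream still sees the real failure.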
Log Meaningful Messages
Use structured logging with context to make debugging easier.
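As an illustration of logging with context, the sketch below uses only Python's standard `logging` module; the SDK's own logger may follow different conventions. `log_with_context` and the logger name `activity_sketch` are hypothetical.

```python
import logging
from typing import Any

logger = logging.getLogger("activity_sketch")


def log_with_context(message: str, **context: Any) -> None:
    """Attach context as LogRecord attributes instead of baking it into the text.

    Keys must not clash with built-in LogRecord attributes such as
    "message" or "name", or logging raises KeyError.
    """
    logger.info(message, extra=context)
```

A call such as `log_with_context("Record processed", workflow_id="wf-1", record_id="rec-1")` keeps the message stable while the identifiers travel as separate fields that a formatter or log pipeline can index.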