Decorators provide reusable functionality for activities and workflows, including distributed locking and observability.

Locking Decorators

needs_lock

application_sdk.decorators.locks.needs_lock
Decorator to mark activities that require distributed locking.
from application_sdk.decorators.locks import needs_lock

@needs_lock(max_locks: int = 5, lock_name: Optional[str] = None)
  • max_locks (int, default: 5): maximum number of concurrent locks allowed
  • lock_name (str | None, default: None): optional custom name for the lock; defaults to the activity name
Activities decorated with needs_lock must be called with schedule_to_close_timeout so the lock TTL can be calculated to cover all retry attempts.

Raises: WorkflowError if the activity is called without schedule_to_close_timeout.
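The relationship between the activity timeout and the lock TTL can be sketched in isolation. This is a hypothetical illustration, not the SDK's internal code; the function name and the safety buffer are assumptions:

```python
from datetime import timedelta

def lock_ttl_seconds(schedule_to_close: timedelta,
                     buffer: timedelta = timedelta(seconds=30)) -> float:
    """Hypothetical sketch: a lock TTL must cover the total time an
    activity may run across *all* retries, which is exactly what
    schedule_to_close_timeout bounds; a buffer absorbs clock skew."""
    return (schedule_to_close + buffer).total_seconds()

# A one-hour schedule_to_close_timeout yields a TTL of 3630 seconds
ttl = lock_ttl_seconds(timedelta(hours=1))
```

This is why start_to_close_timeout alone is not enough: it bounds a single attempt, not the retry sequence, so a lock sized from it could expire while a retry is still running.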

Observability Decorators

observability

application_sdk.observability.decorators.observability_decorator.observability
Decorator for adding observability to functions.
from application_sdk.observability.decorators.observability_decorator import observability
from application_sdk.observability.logger_adaptor import get_logger
from application_sdk.observability.metrics_adaptor import get_metrics
from application_sdk.observability.traces_adaptor import get_traces

@observability(
    logger: Any = None,
    metrics: Any = None,
    traces: Any = None
)
  • logger (Any, default: None): logger instance for operation logging; if None, auto-initializes via get_logger()
  • metrics (Any, default: None): metrics adapter for recording operation metrics; if None, auto-initializes via get_metrics()
  • traces (Any, default: None): traces adapter for recording operation traces; if None, auto-initializes via get_traces()
Behavior
Records traces and metrics for both successful and failed function executions:
  • Generates trace_id and span_id for each execution
  • Records execution duration
  • Logs function start and completion
  • Records success/failure metrics
  • Handles both synchronous and asynchronous functions

Example Usage

Activity with Distributed Lock

from application_sdk.activities import ActivitiesInterface
from application_sdk.decorators.locks import needs_lock
from temporalio import activity, workflow
from datetime import timedelta

class LockedActivities(ActivitiesInterface):
    
    @activity.defn
    @needs_lock(max_locks=3, lock_name="database_migration")
    async def migrate_database(self, workflow_args):
        """Migrate database with distributed lock.
        
        Only 3 migrations can run concurrently across all workers.
        """
        state = await self._get_state(workflow_args)
        
        # Perform migration
        await state.handler.run_migration(
            workflow_args["migration_script"]
        )
        
        return {"status": "migrated"}

# In workflow, call with schedule_to_close_timeout
from application_sdk.workflows import WorkflowInterface

class MigrationWorkflow(WorkflowInterface):
    activities_cls = LockedActivities
    
    async def run(self, workflow_config):
        # ... preflight ...
        
        # MUST specify schedule_to_close_timeout
        result = await workflow.execute_activity_method(
            self.activities_cls.migrate_database,
            workflow_args,
            schedule_to_close_timeout=timedelta(hours=1),
            start_to_close_timeout=timedelta(minutes=30)
        )
        
        return result

Activity with Custom Lock Name

from application_sdk.activities import ActivitiesInterface
from application_sdk.decorators.locks import needs_lock
from temporalio import activity

class DataProcessingActivities(ActivitiesInterface):
    
    @activity.defn
    @needs_lock(max_locks=1, lock_name="critical_resource")
    async def process_critical_data(self, workflow_args):
        """Process data with exclusive lock on critical resource.
        
        Only 1 activity can access the critical resource at a time.
        """
        state = await self._get_state(workflow_args)
        
        # Access critical resource exclusively
        result = await state.handler.process_data(
            workflow_args["data"]
        )
        
        return result

Function with Observability

from application_sdk.observability.decorators.observability_decorator import observability
from application_sdk.observability.logger_adaptor import get_logger
from application_sdk.observability.metrics_adaptor import get_metrics
from application_sdk.observability.traces_adaptor import get_traces

logger = get_logger(__name__)
metrics = get_metrics()
traces = get_traces()

@observability(logger=logger, metrics=metrics, traces=traces)
async def process_batch(batch_id: str, items: list):
    """Process a batch of items with observability.
    
    Automatically records:
    - Trace with span_id and trace_id
    - Success/failure metrics
    - Execution duration
    - Start/completion logs
    """
    processed = []
    
    for item in items:
        # Process item
        result = await process_item(item)
        processed.append(result)
    
    return {
        "batch_id": batch_id,
        "total": len(items),
        "processed": len(processed)
    }

Auto-initialized Observability

from application_sdk.observability.decorators.observability_decorator import observability

# Observability components auto-initialized
@observability()
async def fetch_data(source: str):
    """Fetch data with auto-initialized observability.
    
    Logger, metrics, and traces are automatically initialized.
    """
    # Fetch data
    data = await external_api.get(source)
    
    return data

Class Method with Observability

from application_sdk.observability.decorators.observability_decorator import observability

class DataProcessor:
    
    @observability()
    async def process(self, data: dict):
        """Process data with observability tracking."""
        # Transform data
        transformed = self._transform(data)
        
        # Validate
        self._validate(transformed)
        
        # Save
        await self._save(transformed)
        
        return transformed
    
    def _transform(self, data: dict):
        # Transformation logic
        return data
    
    def _validate(self, data: dict):
        # Validation logic
        pass
    
    async def _save(self, data: dict):
        # Save logic
        pass

Combining Decorators

from application_sdk.activities import ActivitiesInterface
from application_sdk.decorators.locks import needs_lock
from application_sdk.observability.decorators.observability_decorator import observability
from temporalio import activity

class CombinedActivities(ActivitiesInterface):
    
    @activity.defn
    @needs_lock(max_locks=2)
    @observability()
    async def locked_and_observed(self, workflow_args):
        """Activity with both locking and observability.
        
        Provides:
        - Distributed locking (max 2 concurrent)
        - Automatic observability (traces, metrics, logs)
        """
        state = await self._get_state(workflow_args)
        
        # Process with lock and observability
        result = await state.handler.process(
            workflow_args["data"]
        )
        
        return result

Synchronous Function with Observability

from application_sdk.observability.decorators.observability_decorator import observability

@observability()
def calculate_metrics(data: list) -> dict:
    """Calculate metrics from data.
    
    Synchronous function with observability.
    """
    total = len(data)
    average = sum(data) / total if total > 0 else 0
    
    return {
        "total": total,
        "average": average,
        "min": min(data) if data else 0,
        "max": max(data) if data else 0
    }

Error Handling with Observability

from application_sdk.observability.decorators.observability_decorator import observability
from application_sdk.observability.logger_adaptor import get_logger

logger = get_logger(__name__)

@observability()
async def risky_operation(input_data: dict):
    """Operation that may fail.
    
    Errors are automatically:
    - Logged with context
    - Recorded in metrics (failure counter)
    - Tracked in traces (ERROR status)
    """
    try:
        # Validate input
        if not input_data.get("required_field"):
            raise ValueError("Missing required field")
        
        # Process
        result = await process(input_data)
        
        return result
        
    except ValueError:
        # Re-raise so the decorator records the failure
        raise
    except Exception as e:
        # Log additional context before re-raising
        logger.error(f"Unexpected error: {e}", extra={"input": input_data})
        raise

Best Practices

Distributed Locking

  • Use needs_lock for activities that access shared resources
  • Set appropriate max_locks based on resource capacity
  • Always specify schedule_to_close_timeout when calling locked activities
  • Use descriptive lock names for clarity
  • Consider lock duration when setting max_locks
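The last two bullets can be combined into a back-of-envelope sizing rule. This is illustrative arithmetic only, not part of the SDK:

```python
import math

def suggested_max_locks(arrivals_per_minute: float, avg_hold_minutes: float) -> int:
    """Little's law sketch: the average number of concurrent lock holders
    equals arrival rate times average hold time; round up for headroom."""
    return math.ceil(arrivals_per_minute * avg_hold_minutes)

# 4 activities/minute, each holding the lock ~30s -> about 2 concurrent holders
suggested_max_locks(4, 0.5)
```

If the resource can tolerate fewer concurrent holders than this estimate, expect queuing; if it can tolerate more, a higher max_locks reduces wait time.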

Observability

  • Use @observability() with auto-initialization for simplicity
  • Apply to both sync and async functions
  • Combine with other decorators (activity.defn, needs_lock)
  • Let decorator handle error observability
  • Don’t manually duplicate what the decorator provides

Decorator Order

  • Place @observability() closest to the function (innermost)
  • Place @needs_lock above @observability
  • Place @activity.defn on top (outermost), so Temporal registers the fully wrapped function
@activity.defn            # Outermost - registers the activity with Temporal
@needs_lock(max_locks=5)  # Middle - adds distributed locking
@observability()          # Innermost - observes the function itself
async def my_activity(self, workflow_args):
    pass

Performance

  • Observability has minimal overhead
  • Lock acquisition may introduce delays under contention
  • Monitor lock wait times in production
  • Adjust max_locks based on observed performance
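One way to monitor wait times is to time the region where the lock is acquired. A minimal sketch follows; the `timed` helper, the sink callable, and the metric name are assumptions, not SDK APIs:

```python
import time
from contextlib import contextmanager

@contextmanager
def timed(metric_name: str, sink):
    """Record elapsed wall-clock seconds for the wrapped block.

    `sink` is any callable taking (name, seconds), e.g. a metrics adapter.
    """
    start = time.monotonic()
    try:
        yield
    finally:
        sink(metric_name, time.monotonic() - start)

samples = []
with timed("lock.wait_seconds", lambda name, secs: samples.append((name, secs))):
    pass  # the lock acquisition would happen here
```

Sustained growth in the recorded wait times under load is the signal to revisit max_locks or shorten the critical section.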
