Decorators provide reusable functionality for activities and workflows, including distributed locking and observability.
Locking Decorators
needs_lock
application_sdk.decorators.locks.needs_lock
Decorator to mark activities that require distributed locking.
from application_sdk.decorators.locks import needs_lock
@needs_lock(max_locks: int = 5, lock_name: Optional[str] = None)
Maximum number of concurrent locks allowed
Optional custom name for the lock. Defaults to activity name.
Activities decorated with needs_lock must be called with schedule_to_close_timeout to ensure proper lock TTL calculation that covers retries.
If activity is called without schedule_to_close_timeout
Observability Decorators
observability
application_sdk.observability.decorators.observability_decorator.observability
Decorator for adding observability to functions.
from application_sdk.observability.decorators.observability_decorator import observability
from application_sdk.observability.logger_adaptor import get_logger
from application_sdk.observability.metrics_adaptor import get_metrics
from application_sdk.observability.traces_adaptor import get_traces
@observability(
logger: Any = None,
metrics: Any = None,
traces: Any = None
)
Logger instance for operation logging. If None, auto-initializes using get_logger()
Metrics adapter for recording operation metrics. If None, auto-initializes using get_metrics()
Traces adapter for recording operation traces. If None, auto-initializes using get_traces()
Records traces and metrics for both successful and failed function executions:
- Generates trace_id and span_id for each execution
- Records execution duration
- Logs function start and completion
- Records success/failure metrics
- Handles both synchronous and asynchronous functions
Example Usage
Activity with Distributed Lock
from application_sdk.activities import ActivitiesInterface
from application_sdk.decorators.locks import needs_lock
from temporalio import activity, workflow
from datetime import timedelta
class LockedActivities(ActivitiesInterface):
@activity.defn
@needs_lock(max_locks=3, lock_name="database_migration")
async def migrate_database(self, workflow_args):
"""Migrate database with distributed lock.
Only 3 migrations can run concurrently across all workers.
"""
state = await self._get_state(workflow_args)
# Perform migration
await state.handler.run_migration(
workflow_args["migration_script"]
)
return {"status": "migrated"}
# In workflow, call with schedule_to_close_timeout
class MigrationWorkflow(WorkflowInterface):
activities_cls = LockedActivities
async def run(self, workflow_config):
# ... preflight ...
# MUST specify schedule_to_close_timeout
result = await workflow.execute_activity_method(
self.activities_cls.migrate_database,
workflow_args,
schedule_to_close_timeout=timedelta(hours=1),
start_to_close_timeout=timedelta(minutes=30)
)
return result
Activity with Custom Lock Name
from application_sdk.decorators.locks import needs_lock
from temporalio import activity
class DataProcessingActivities(ActivitiesInterface):
@activity.defn
@needs_lock(max_locks=1, lock_name="critical_resource")
async def process_critical_data(self, workflow_args):
"""Process data with exclusive lock on critical resource.
Only 1 activity can access the critical resource at a time.
"""
state = await self._get_state(workflow_args)
# Access critical resource exclusively
result = await state.handler.process_data(
workflow_args["data"]
)
return result
Function with Observability
from application_sdk.observability.decorators.observability_decorator import observability
from application_sdk.observability.logger_adaptor import get_logger
from application_sdk.observability.metrics_adaptor import get_metrics
from application_sdk.observability.traces_adaptor import get_traces
logger = get_logger(__name__)
metrics = get_metrics()
traces = get_traces()
@observability(logger=logger, metrics=metrics, traces=traces)
async def process_batch(batch_id: str, items: list):
"""Process a batch of items with observability.
Automatically records:
- Trace with span_id and trace_id
- Success/failure metrics
- Execution duration
- Start/completion logs
"""
processed = []
for item in items:
# Process item
result = await process_item(item)
processed.append(result)
return {
"batch_id": batch_id,
"total": len(items),
"processed": len(processed)
}
Auto-initialized Observability
from application_sdk.observability.decorators.observability_decorator import observability
# Observability components auto-initialized
@observability()
async def fetch_data(source: str):
"""Fetch data with auto-initialized observability.
Logger, metrics, and traces are automatically initialized.
"""
# Fetch data
data = await external_api.get(source)
return data
Class Method with Observability
from application_sdk.observability.decorators.observability_decorator import observability
class DataProcessor:
@observability()
async def process(self, data: dict):
"""Process data with observability tracking."""
# Transform data
transformed = self._transform(data)
# Validate
self._validate(transformed)
# Save
await self._save(transformed)
return transformed
def _transform(self, data: dict):
# Transformation logic
return data
def _validate(self, data: dict):
# Validation logic
pass
async def _save(self, data: dict):
# Save logic
pass
Combining Decorators
from application_sdk.decorators.locks import needs_lock
from application_sdk.observability.decorators.observability_decorator import observability
from temporalio import activity
class CombinedActivities(ActivitiesInterface):
@activity.defn
@needs_lock(max_locks=2)
@observability()
async def locked_and_observed(self, workflow_args):
"""Activity with both locking and observability.
Provides:
- Distributed locking (max 2 concurrent)
- Automatic observability (traces, metrics, logs)
"""
state = await self._get_state(workflow_args)
# Process with lock and observability
result = await state.handler.process(
workflow_args["data"]
)
return result
Synchronous Function with Observability
from application_sdk.observability.decorators.observability_decorator import observability
@observability()
def calculate_metrics(data: list) -> dict:
"""Calculate metrics from data.
Synchronous function with observability.
"""
total = len(data)
average = sum(data) / total if total > 0 else 0
return {
"total": total,
"average": average,
"min": min(data) if data else 0,
"max": max(data) if data else 0
}
Error Handling with Observability
from application_sdk.observability.decorators.observability_decorator import observability
@observability()
async def risky_operation(input_data: dict):
"""Operation that may fail.
Errors are automatically:
- Logged with context
- Recorded in metrics (failure counter)
- Tracked in traces (ERROR status)
"""
try:
# Validate input
if not input_data.get("required_field"):
raise ValueError("Missing required field")
# Process
result = await process(input_data)
return result
except ValueError as e:
# Re-raise to let decorator handle observability
raise
except Exception as e:
# Log additional context before re-raising
logger.error(f"Unexpected error: {e}", extra={"input": input_data})
raise
Best Practices
Distributed Locking
- Use
needs_lock for activities that access shared resources
- Set appropriate
max_locks based on resource capacity
- Always specify
schedule_to_close_timeout when calling locked activities
- Use descriptive lock names for clarity
- Consider lock duration when setting max_locks
Observability
- Use
@observability() with auto-initialization for simplicity
- Apply to both sync and async functions
- Combine with other decorators (activity.defn, needs_lock)
- Let decorator handle error observability
- Don’t manually duplicate what the decorator provides
Decorator Order
- Place
@activity.defn first (closest to function)
- Place
@needs_lock after activity.defn
- Place
@observability last (outermost)
@observability() # Outermost - observes everything
@needs_lock(max_locks=5) # Middle - adds locking
@activity.defn # Innermost - defines activity
async def my_activity(self, workflow_args):
pass
- Observability has minimal overhead
- Lock acquisition may introduce delays under contention
- Monitor lock wait times in production
- Adjust max_locks based on observed performance