The observability module provides logging, metrics, and tracing capabilities for monitoring application behavior.
Logger Adaptor
application_sdk.observability.logger_adaptor.get_logger
Get a configured logger instance.
from application_sdk.observability.logger_adaptor import get_logger
logger = get_logger(__name__)
The name of the logger, typically __name__
Configured logger instance with structured logging support
Logger Methods
logger.debug(message, **kwargs)
logger.info(message, **kwargs)
logger.warning(message, **kwargs)
logger.error(message, exc_info=True, **kwargs)
Whether to include exception information
Additional structured data to include in the log (error_code, extra fields, etc.)
Metrics Adaptor
application_sdk.observability.metrics_adaptor.get_metrics
Get a configured metrics instance.
from application_sdk.observability.metrics_adaptor import get_metrics, MetricType
metrics = get_metrics()
Configured metrics instance
record_metric
Record a metric value.
metrics.record_metric(
name: str,
value: float,
metric_type: MetricType,
labels: Dict[str, str] = {},
description: str = "",
unit: str = ""
)
Type of metric: MetricType.COUNTER, MetricType.GAUGE, MetricType.HISTOGRAM
labels
Dict[str, str]
default: {}
Labels for the metric
Description of the metric
MetricType Enum
from application_sdk.observability.metrics_adaptor import MetricType
MetricType.COUNTER # Monotonically increasing counter
MetricType.GAUGE # Value that can go up or down
MetricType.HISTOGRAM # Distribution of values
Traces Adaptor
application_sdk.observability.traces_adaptor.get_traces
Get a configured traces instance.
from application_sdk.observability.traces_adaptor import get_traces
traces = get_traces()
Configured traces instance
record_trace
Record a trace span.
traces.record_trace(
name: str,
trace_id: str,
span_id: str,
kind: str = "INTERNAL",
status_code: str = "OK",
attributes: Dict[str, Any] = {},
events: List[Dict[str, Any]] = [],
duration_ms: float = 0.0
)
Span kind: "INTERNAL", "SERVER", "CLIENT", "PRODUCER", "CONSUMER"
Status code: "OK", "ERROR"
attributes
Dict[str, Any]
default: {}
Span attributes
events
List[Dict[str, Any]]
default: []
Span events with timestamps
Span duration in milliseconds
Example Usage
Basic Logging
from application_sdk.observability.logger_adaptor import get_logger
logger = get_logger(__name__)
# Simple logging
logger.info("Processing started")
logger.debug("Debug information")
logger.warning("Warning message")
# Error logging with exception
try:
risky_operation()
except Exception as e:
logger.error("Operation failed", exc_info=True)
# Structured logging
logger.info(
"User action completed",
user_id="user123",
action="data_upload",
duration_ms=1500
)
Recording Metrics
from application_sdk.observability.metrics_adaptor import get_metrics, MetricType
metrics = get_metrics()
# Counter metric
metrics.record_metric(
name="api_requests_total",
value=1,
metric_type=MetricType.COUNTER,
labels={"endpoint": "/api/data", "method": "GET"},
description="Total API requests",
unit="count"
)
# Gauge metric
metrics.record_metric(
name="active_connections",
value=42,
metric_type=MetricType.GAUGE,
labels={"service": "database"},
description="Number of active connections",
unit="connections"
)
# Histogram metric
metrics.record_metric(
name="request_duration",
value=125.5,
metric_type=MetricType.HISTOGRAM,
labels={"endpoint": "/api/data"},
description="Request duration",
unit="milliseconds"
)
Recording Traces
from application_sdk.observability.traces_adaptor import get_traces
import uuid
import time
traces = get_traces()
# Generate IDs
trace_id = str(uuid.uuid4())
span_id = str(uuid.uuid4())
# Record successful operation
start_time = time.time()
# ... do work ...
duration_ms = (time.time() - start_time) * 1000
traces.record_trace(
name="process_data",
trace_id=trace_id,
span_id=span_id,
kind="INTERNAL",
status_code="OK",
attributes={
"function": "process_data",
"input_size": 1000,
"output_size": 950
},
events=[
{
"name": "processing_started",
"timestamp": time.time()
},
{
"name": "processing_completed",
"timestamp": time.time()
}
],
duration_ms=duration_ms
)
Error Tracing
from application_sdk.observability.traces_adaptor import get_traces
import uuid
import time
traces = get_traces()
trace_id = str(uuid.uuid4())
span_id = str(uuid.uuid4())
start_time = time.time()
try:
# ... operation ...
pass
except Exception as e:
duration_ms = (time.time() - start_time) * 1000
# Record error trace
traces.record_trace(
name="failed_operation",
trace_id=trace_id,
span_id=span_id,
kind="INTERNAL",
status_code="ERROR",
attributes={
"function": "failed_operation",
"error_type": type(e).__name__,
"error_message": str(e)
},
events=[
{
"name": "error_occurred",
"timestamp": time.time(),
"attributes": {"error": str(e)}
}
],
duration_ms=duration_ms
)
raise
Combined Observability
from application_sdk.observability.logger_adaptor import get_logger
from application_sdk.observability.metrics_adaptor import get_metrics, MetricType
from application_sdk.observability.traces_adaptor import get_traces
import uuid
import time
logger = get_logger(__name__)
metrics = get_metrics()
traces = get_traces()
async def process_batch(batch_id: str, items: list):
"""Process batch with full observability."""
# Generate trace identifiers
trace_id = str(uuid.uuid4())
span_id = str(uuid.uuid4())
start_time = time.time()
# Log start
logger.info(
"Batch processing started",
batch_id=batch_id,
item_count=len(items),
trace_id=trace_id
)
# Increment counter
metrics.record_metric(
name="batch_processing_started",
value=1,
metric_type=MetricType.COUNTER,
labels={"batch_id": batch_id}
)
try:
# Process items
processed = []
for i, item in enumerate(items):
result = await process_item(item)
processed.append(result)
# Log progress
if (i + 1) % 100 == 0:
logger.debug(
f"Processed {i + 1}/{len(items)} items",
batch_id=batch_id
)
# Calculate duration
duration_ms = (time.time() - start_time) * 1000
# Record success metrics
metrics.record_metric(
name="batch_processing_success",
value=1,
metric_type=MetricType.COUNTER,
labels={"batch_id": batch_id}
)
metrics.record_metric(
name="batch_processing_duration",
value=duration_ms,
metric_type=MetricType.HISTOGRAM,
labels={"batch_id": batch_id},
unit="milliseconds"
)
# Record success trace
traces.record_trace(
name="process_batch",
trace_id=trace_id,
span_id=span_id,
kind="INTERNAL",
status_code="OK",
attributes={
"batch_id": batch_id,
"item_count": len(items),
"processed_count": len(processed)
},
events=[
{
"name": "batch_started",
"timestamp": start_time
},
{
"name": "batch_completed",
"timestamp": time.time()
}
],
duration_ms=duration_ms
)
# Log completion
logger.info(
"Batch processing completed",
batch_id=batch_id,
processed_count=len(processed),
duration_ms=duration_ms,
trace_id=trace_id
)
return processed
except Exception as e:
# Calculate duration
duration_ms = (time.time() - start_time) * 1000
# Record failure metrics
metrics.record_metric(
name="batch_processing_failure",
value=1,
metric_type=MetricType.COUNTER,
labels={
"batch_id": batch_id,
"error_type": type(e).__name__
}
)
# Record error trace
traces.record_trace(
name="process_batch",
trace_id=trace_id,
span_id=span_id,
kind="INTERNAL",
status_code="ERROR",
attributes={
"batch_id": batch_id,
"error_type": type(e).__name__,
"error_message": str(e)
},
events=[
{
"name": "batch_failed",
"timestamp": time.time(),
"attributes": {"error": str(e)}
}
],
duration_ms=duration_ms
)
# Log error
logger.error(
"Batch processing failed",
batch_id=batch_id,
error_type=type(e).__name__,
trace_id=trace_id,
exc_info=True
)
raise
Activity with Observability
from application_sdk.activities import ActivitiesInterface
from application_sdk.observability.logger_adaptor import get_logger
from application_sdk.observability.metrics_adaptor import get_metrics, MetricType
from temporalio import activity
logger = get_logger(__name__)
metrics = get_metrics()
class ObservableActivities(ActivitiesInterface):
@activity.defn
async def fetch_data(self, workflow_args):
"""Fetch data with observability."""
logger.info("Starting data fetch", workflow_id=workflow_args.get("workflow_id"))
try:
state = await self._get_state(workflow_args)
# Fetch data
data = await state.handler.fetch_metadata(
metadata=workflow_args["metadata"]
)
# Record metrics
metrics.record_metric(
name="data_fetch_success",
value=1,
metric_type=MetricType.COUNTER,
labels={"source": workflow_args.get("source", "unknown")}
)
metrics.record_metric(
name="data_fetch_count",
value=len(data),
metric_type=MetricType.GAUGE,
labels={"source": workflow_args.get("source", "unknown")},
unit="records"
)
logger.info(
"Data fetch completed",
record_count=len(data),
workflow_id=workflow_args.get("workflow_id")
)
return data
except Exception as e:
metrics.record_metric(
name="data_fetch_failure",
value=1,
metric_type=MetricType.COUNTER,
labels={
"source": workflow_args.get("source", "unknown"),
"error_type": type(e).__name__
}
)
logger.error(
"Data fetch failed",
workflow_id=workflow_args.get("workflow_id"),
exc_info=True
)
raise
Best Practices
Logging
- Use appropriate log levels (DEBUG, INFO, WARNING, ERROR)
- Include structured data for better searchability
- Always log exceptions with
exc_info=True
- Include workflow_id and trace_id for correlation
- Avoid logging sensitive information
Metrics
- Use counters for monotonically increasing values
- Use gauges for values that can go up or down
- Use histograms for distributions
- Include meaningful labels for filtering
- Provide clear descriptions and units
Tracing
- Generate unique trace_id and span_id for each operation
- Include relevant attributes for debugging
- Record events at key points
- Set appropriate status codes
- Calculate accurate durations
- Observability has minimal overhead
- Use DEBUG level for detailed information
- Avoid excessive logging in hot paths
- Use sampling for high-volume traces