Observability
The observability module provides logging, metrics, and tracing capabilities for monitoring application behavior.

Logger Adaptor

application_sdk.observability.logger_adaptor.get_logger — Get a configured logger instance.
from application_sdk.observability.logger_adaptor import get_logger

logger = get_logger(__name__)
name
str
required
The name of the logger, typically __name__
return
Logger
Configured logger instance with structured logging support

Logger Methods

logger.debug(message, **kwargs)
logger.info(message, **kwargs)
logger.warning(message, **kwargs)
logger.error(message, exc_info=True, **kwargs)
message
str
required
The log message
exc_info
bool
default:"False"
Whether to include exception information
**kwargs
Any
Additional structured data to include in the log (error_code, extra fields, etc.)

Metrics Adaptor

application_sdk.observability.metrics_adaptor.get_metrics — Get a configured metrics instance.
from application_sdk.observability.metrics_adaptor import get_metrics, MetricType

metrics = get_metrics()
return
Metrics
Configured metrics instance

record_metric

Record a metric value.
metrics.record_metric(
    name: str,
    value: float,
    metric_type: MetricType,
    labels: Dict[str, str] = {},
    description: str = "",
    unit: str = ""
)
name
str
required
Metric name
value
float
required
Metric value
metric_type
MetricType
required
Type of metric: MetricType.COUNTER, MetricType.GAUGE, MetricType.HISTOGRAM
labels
Dict[str, str]
default:"{}"
Labels for the metric
description
str
default:"\"\""
Description of the metric
unit
str
default:"\"\""
Unit of measurement

MetricType Enum

from application_sdk.observability.metrics_adaptor import MetricType

MetricType.COUNTER    # Monotonically increasing counter
MetricType.GAUGE      # Value that can go up or down
MetricType.HISTOGRAM  # Distribution of values

Traces Adaptor

application_sdk.observability.traces_adaptor.get_traces — Get a configured traces instance.
from application_sdk.observability.traces_adaptor import get_traces

traces = get_traces()
return
Traces
Configured traces instance

record_trace

Record a trace span.
traces.record_trace(
    name: str,
    trace_id: str,
    span_id: str,
    kind: str = "INTERNAL",
    status_code: str = "OK",
    attributes: Dict[str, Any] = {},
    events: List[Dict[str, Any]] = [],
    duration_ms: float = 0.0
)
name
str
required
Span name
trace_id
str
required
Unique trace identifier
span_id
str
required
Unique span identifier
kind
str
default:"INTERNAL"
Span kind: "INTERNAL", "SERVER", "CLIENT", "PRODUCER", "CONSUMER"
status_code
str
default:"OK"
Status code: "OK", "ERROR"
attributes
Dict[str, Any]
default:"{}"
Span attributes
events
List[Dict[str, Any]]
default:"[]"
Span events with timestamps
duration_ms
float
default:"0.0"
Span duration in milliseconds

Example Usage

Basic Logging

from application_sdk.observability.logger_adaptor import get_logger

logger = get_logger(__name__)

# Simple logging
logger.info("Processing started")
logger.debug("Debug information")
logger.warning("Warning message")

# Error logging with exception; exc_info=True attaches the active traceback.
try:
    risky_operation()
except Exception:  # no `as e`: the bound name was never used (traceback comes from exc_info)
    logger.error("Operation failed", exc_info=True)

# Structured logging: extra keyword arguments become structured fields on the record
logger.info(
    "User action completed",
    user_id="user123",
    action="data_upload",
    duration_ms=1500
)

Recording Metrics

from application_sdk.observability.metrics_adaptor import get_metrics, MetricType

metrics = get_metrics()

# One spec per metric, emitted in order: a counter, a gauge, then a
# histogram sample. Each entry is
# (name, value, type, labels, description, unit).
metric_specs = [
    ("api_requests_total", 1, MetricType.COUNTER,
     {"endpoint": "/api/data", "method": "GET"},
     "Total API requests", "count"),
    ("active_connections", 42, MetricType.GAUGE,
     {"service": "database"},
     "Number of active connections", "connections"),
    ("request_duration", 125.5, MetricType.HISTOGRAM,
     {"endpoint": "/api/data"},
     "Request duration", "milliseconds"),
]

for metric_name, metric_value, metric_kind, metric_labels, metric_desc, metric_unit in metric_specs:
    metrics.record_metric(
        name=metric_name,
        value=metric_value,
        metric_type=metric_kind,
        labels=metric_labels,
        description=metric_desc,
        unit=metric_unit,
    )

Recording Traces

from application_sdk.observability.traces_adaptor import get_traces
import uuid
import time

traces = get_traces()

# Generate IDs
trace_id = str(uuid.uuid4())
span_id = str(uuid.uuid4())

# Record successful operation
start_time = time.time()

# ... do work ...

duration_ms = (time.time() - start_time) * 1000

traces.record_trace(
    name="process_data",
    trace_id=trace_id,
    span_id=span_id,
    kind="INTERNAL",
    status_code="OK",
    attributes={
        "function": "process_data",
        "input_size": 1000,
        "output_size": 950
    },
    events=[
        {
            "name": "processing_started",
            # Fixed: use the captured start time, not the time the trace is
            # recorded, so this event reflects when work actually began.
            "timestamp": start_time
        },
        {
            "name": "processing_completed",
            "timestamp": time.time()
        }
    ],
    duration_ms=duration_ms
)

Error Tracing

from application_sdk.observability.traces_adaptor import get_traces
import uuid
import time

traces = get_traces()
# Fresh identifiers for this operation; record_trace takes them as plain strings.
trace_id = str(uuid.uuid4())
span_id = str(uuid.uuid4())
# Capture the start so the error path can compute an accurate duration.
start_time = time.time()

try:
    # ... operation ...
    # Placeholder: replace `pass` with the real work (as written it cannot raise).
    pass
except Exception as e:
    duration_ms = (time.time() - start_time) * 1000

    # Record error trace: status_code="ERROR" plus the exception's type and
    # message as span attributes, and a timestamped error event.
    traces.record_trace(
        name="failed_operation",
        trace_id=trace_id,
        span_id=span_id,
        kind="INTERNAL",
        status_code="ERROR",
        attributes={
            "function": "failed_operation",
            "error_type": type(e).__name__,
            "error_message": str(e)
        },
        events=[
            {
                "name": "error_occurred",
                "timestamp": time.time(),
                "attributes": {"error": str(e)}
            }
        ],
        duration_ms=duration_ms
    )
    # Re-raise so callers still observe the original exception.
    raise

Combined Observability

from application_sdk.observability.logger_adaptor import get_logger
from application_sdk.observability.metrics_adaptor import get_metrics, MetricType
from application_sdk.observability.traces_adaptor import get_traces
import uuid
import time

logger = get_logger(__name__)
metrics = get_metrics()
traces = get_traces()

async def process_batch(batch_id: str, items: list):
    """Process batch with full observability.

    Correlates all three signals for one batch: a shared trace_id links the
    start/finish log lines to the recorded trace, counters mark
    start/success/failure, and a histogram sample records total duration.

    Args:
        batch_id: Identifier attached to every log line, metric label, and
            trace attribute for this batch.
        items: Items processed sequentially, each awaited through
            ``process_item`` (assumed to be a coroutine defined elsewhere —
            TODO confirm).

    Returns:
        list: The per-item results returned by ``process_item``.

    Raises:
        Exception: Whatever ``process_item`` raised, re-raised after the
            failure counter, ERROR trace, and error log have been recorded.
    """
    
    # Generate trace identifiers
    trace_id = str(uuid.uuid4())
    span_id = str(uuid.uuid4())
    start_time = time.time()
    
    # Log start
    logger.info(
        "Batch processing started",
        batch_id=batch_id,
        item_count=len(items),
        trace_id=trace_id
    )
    
    # Increment counter
    metrics.record_metric(
        name="batch_processing_started",
        value=1,
        metric_type=MetricType.COUNTER,
        labels={"batch_id": batch_id}
    )
    
    try:
        # Process items
        processed = []
        for i, item in enumerate(items):
            result = await process_item(item)
            processed.append(result)
            
            # Log progress every 100 items at DEBUG to keep hot-path logging cheap
            if (i + 1) % 100 == 0:
                logger.debug(
                    f"Processed {i + 1}/{len(items)} items",
                    batch_id=batch_id
                )
        
        # Calculate duration
        duration_ms = (time.time() - start_time) * 1000
        
        # Record success metrics
        metrics.record_metric(
            name="batch_processing_success",
            value=1,
            metric_type=MetricType.COUNTER,
            labels={"batch_id": batch_id}
        )
        
        metrics.record_metric(
            name="batch_processing_duration",
            value=duration_ms,
            metric_type=MetricType.HISTOGRAM,
            labels={"batch_id": batch_id},
            unit="milliseconds"
        )
        
        # Record success trace
        traces.record_trace(
            name="process_batch",
            trace_id=trace_id,
            span_id=span_id,
            kind="INTERNAL",
            status_code="OK",
            attributes={
                "batch_id": batch_id,
                "item_count": len(items),
                "processed_count": len(processed)
            },
            events=[
                {
                    "name": "batch_started",
                    "timestamp": start_time
                },
                {
                    "name": "batch_completed",
                    "timestamp": time.time()
                }
            ],
            duration_ms=duration_ms
        )
        
        # Log completion
        logger.info(
            "Batch processing completed",
            batch_id=batch_id,
            processed_count=len(processed),
            duration_ms=duration_ms,
            trace_id=trace_id
        )
        
        return processed
        
    except Exception as e:
        # Calculate duration
        duration_ms = (time.time() - start_time) * 1000
        
        # Record failure metrics
        metrics.record_metric(
            name="batch_processing_failure",
            value=1,
            metric_type=MetricType.COUNTER,
            labels={
                "batch_id": batch_id,
                "error_type": type(e).__name__
            }
        )
        
        # Record error trace
        traces.record_trace(
            name="process_batch",
            trace_id=trace_id,
            span_id=span_id,
            kind="INTERNAL",
            status_code="ERROR",
            attributes={
                "batch_id": batch_id,
                "error_type": type(e).__name__,
                "error_message": str(e)
            },
            events=[
                {
                    "name": "batch_failed",
                    "timestamp": time.time(),
                    "attributes": {"error": str(e)}
                }
            ],
            duration_ms=duration_ms
        )
        
        # Log error
        logger.error(
            "Batch processing failed",
            batch_id=batch_id,
            error_type=type(e).__name__,
            trace_id=trace_id,
            exc_info=True
        )
        
        # Re-raise so the caller (e.g. a workflow retry policy) sees the failure.
        raise

Activity with Observability

from application_sdk.activities import ActivitiesInterface
from application_sdk.observability.logger_adaptor import get_logger
from application_sdk.observability.metrics_adaptor import get_metrics, MetricType
from temporalio import activity

logger = get_logger(__name__)
metrics = get_metrics()

class ObservableActivities(ActivitiesInterface):
    """Temporal activities demonstrating logging and metrics around a data fetch."""

    @activity.defn
    async def fetch_data(self, workflow_args):
        """Fetch metadata with logging and metrics.

        Args:
            workflow_args: Dict carrying at least ``"metadata"``;
                ``"workflow_id"`` and ``"source"`` are read if present.

        Returns:
            The records returned by ``state.handler.fetch_metadata``.

        Raises:
            Exception: Re-raised after a failure counter and an error log
                have been recorded.
        """
        logger.info("Starting data fetch", workflow_id=workflow_args.get("workflow_id"))

        try:
            state = await self._get_state(workflow_args)

            # Fetch data
            data = await state.handler.fetch_metadata(
                metadata=workflow_args["metadata"]
            )

            # Record metrics
            metrics.record_metric(
                name="data_fetch_success",
                value=1,
                metric_type=MetricType.COUNTER,
                labels={"source": workflow_args.get("source", "unknown")}
            )

            metrics.record_metric(
                name="data_fetch_count",
                value=len(data),
                metric_type=MetricType.GAUGE,
                labels={"source": workflow_args.get("source", "unknown")},
                unit="records"
            )

            logger.info(
                "Data fetch completed",
                record_count=len(data),
                workflow_id=workflow_args.get("workflow_id")
            )

            return data

        except Exception as e:
            metrics.record_metric(
                name="data_fetch_failure",
                value=1,
                metric_type=MetricType.COUNTER,
                labels={
                    "source": workflow_args.get("source", "unknown"),
                    "error_type": type(e).__name__
                }
            )

            # Consistency fix: include error_type in the structured log,
            # matching the failure-metric labels recorded just above.
            logger.error(
                "Data fetch failed",
                workflow_id=workflow_args.get("workflow_id"),
                error_type=type(e).__name__,
                exc_info=True
            )

            raise

Best Practices

Logging

  • Use appropriate log levels (DEBUG, INFO, WARNING, ERROR)
  • Include structured data for better searchability
  • Always log exceptions with exc_info=True
  • Include workflow_id and trace_id for correlation
  • Avoid logging sensitive information

Metrics

  • Use counters for monotonically increasing values
  • Use gauges for values that can go up or down
  • Use histograms for distributions
  • Include meaningful labels for filtering
  • Provide clear descriptions and units

Tracing

  • Generate unique trace_id and span_id for each operation
  • Include relevant attributes for debugging
  • Record events at key points
  • Set appropriate status codes
  • Calculate accurate durations

Performance

  • Observability has minimal overhead
  • Use DEBUG level for detailed information
  • Avoid excessive logging in hot paths
  • Use sampling for high-volume traces

Build docs developers (and LLMs) love