Observable instruments (also called asynchronous instruments) report measurements via callbacks that are invoked during metric collection. They’re ideal for values that are expensive to compute, read from external sources, or already being tracked elsewhere.

When to Use Observable Instruments

Use observable instruments when:
  • You read values from system APIs or sensors
  • Values are expensive to compute
  • Measurements should happen on a schedule (not inline)
  • Multiple instruments need the same source data
  • You don’t control when collection happens
Common examples:
  • CPU time from OS APIs
  • Memory usage from system calls
  • Process statistics
  • Sensor readings
  • External resource metrics
Observable instruments use callbacks that run during metric collection. The SDK decides when to invoke your callback, typically once per export interval.
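To make the collection-time behavior concrete, here is a minimal sketch that uses only the standard library. `Registry`, `register`, `collect`, and `demo_collection` are hypothetical names chosen for illustration, not OpenTelemetry SDK types; a real SDK also handles attributes, aggregation, and export.

```rust
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;

/// Hypothetical stand-in for an SDK-side registry of callbacks.
/// Each callback returns the current value of one metric.
pub struct Registry {
    callbacks: Vec<(String, Box<dyn Fn() -> u64 + Send + Sync>)>,
}

impl Registry {
    pub fn new() -> Self {
        Registry { callbacks: Vec::new() }
    }

    /// Store a callback; nothing is measured at this point.
    pub fn register<F>(&mut self, name: &str, callback: F)
    where
        F: Fn() -> u64 + Send + Sync + 'static,
    {
        self.callbacks.push((name.to_string(), Box::new(callback)));
    }

    /// Run every callback once, as a collection cycle would.
    pub fn collect(&self) -> Vec<(String, u64)> {
        self.callbacks
            .iter()
            .map(|(name, callback)| (name.clone(), callback()))
            .collect()
    }
}

/// Shows that the value is read at collection time, not at registration.
pub fn demo_collection() -> (Vec<(String, u64)>, Vec<(String, u64)>) {
    let bytes = Arc::new(AtomicU64::new(0));
    let mut registry = Registry::new();

    let reader = Arc::clone(&bytes);
    registry.register("bytes_processed", move || reader.load(Ordering::Relaxed));

    let first = registry.collect(); // reads 0
    bytes.fetch_add(512, Ordering::Relaxed); // application work between cycles
    let second = registry.collect(); // reads 512
    (first, second)
}
```

The application never pushes a measurement; it only updates its own state, and each collection cycle pulls whatever the state is at that moment.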

Available Observable Instruments

ObservableCounter

For monotonically increasing values:
pub struct ObservableCounter<T> {
    _marker: std::marker::PhantomData<T>,
}
Supports: u64, f64
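
An observable counter callback reports the running total, not the increase since the previous collection. As a rough illustration of how per-interval increases relate to those totals, the sketch below uses a hypothetical helper, `deltas_from_totals`; it is not SDK code, and the reset handling is an assumption made for the example.

```rust
/// Hypothetical helper: turn successive cumulative totals (what an observable
/// counter callback reports) into per-interval increases.
pub fn deltas_from_totals(totals: &[u64]) -> Vec<u64> {
    totals
        .windows(2)
        // saturating_sub yields 0 instead of underflowing if a total goes
        // backwards, for example after a process restart.
        .map(|pair| pair[1].saturating_sub(pair[0]))
        .collect()
}
```

For example, totals of 100, 250, 250, and 400 observed on four consecutive collections correspond to increases of 150, 0, and 150.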

ObservableUpDownCounter

For values that can increase or decrease:
pub struct ObservableUpDownCounter<T> {
    _marker: std::marker::PhantomData<T>,
}
Supports: i64, f64

ObservableGauge

For current/instantaneous values:
pub struct ObservableGauge<T> {
    _marker: std::marker::PhantomData<T>,
}
Supports: u64, i64, f64

AsyncInstrument Trait

All observable instruments implement the AsyncInstrument trait:
pub trait AsyncInstrument<T>: Send + Sync {
    fn observe(&self, measurement: T, attributes: &[KeyValue]);
}
The observe method is called within your callback to report measurements.

Callback Type

Callbacks are defined as:
pub type Callback<T> = Box<dyn Fn(&dyn AsyncInstrument<T>) + Send + Sync>;
Your callback receives an observer that implements AsyncInstrument<T>, which you use to report measurements.
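
One way to see how the trait and the callback alias fit together is to mirror them locally and drive a callback by hand. The sketch below is a standalone model, not the library itself: `Attr` is a simplified placeholder for `KeyValue`, and `RecordingObserver`, `run_once`, and `sensor_callback` are hypothetical names. A similar setup may be handy for unit-testing callback logic.

```rust
use std::sync::Mutex;

/// Simplified placeholder for `opentelemetry::KeyValue`.
pub type Attr = (&'static str, &'static str);

/// Local copy of the trait shape shown above, using `Attr` for attributes.
pub trait AsyncInstrument<T>: Send + Sync {
    fn observe(&self, measurement: T, attributes: &[Attr]);
}

/// Local copy of the callback alias shown above.
pub type Callback<T> = Box<dyn Fn(&dyn AsyncInstrument<T>) + Send + Sync>;

/// Hypothetical observer that simply remembers what was observed.
#[derive(Default)]
pub struct RecordingObserver {
    seen: Mutex<Vec<(f64, Vec<Attr>)>>,
}

impl AsyncInstrument<f64> for RecordingObserver {
    fn observe(&self, measurement: f64, attributes: &[Attr]) {
        self.seen
            .lock()
            .unwrap()
            .push((measurement, attributes.to_vec()));
    }
}

/// Invoke a callback once and return everything it reported.
pub fn run_once(callback: &Callback<f64>) -> Vec<(f64, Vec<Attr>)> {
    let observer = RecordingObserver::default();
    callback(&observer);
    observer.seen.into_inner().unwrap()
}

/// Example callback that reports one measurement per sensor.
pub fn sensor_callback() -> Callback<f64> {
    Box::new(|observer: &dyn AsyncInstrument<f64>| {
        observer.observe(22.5, &[("sensor", "cpu")]);
        observer.observe(35.0, &[("sensor", "gpu")]);
    })
}
```

Calling `run_once(&sensor_callback())` returns the two observations in the order the callback made them.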

Creating Observable Instruments

Observable instruments are created using the builder pattern with with_callback:

ObservableCounter

use opentelemetry::{global, KeyValue};

let meter = global::meter("my-app");

// u64 observable counter
let _observable_counter = meter
    .u64_observable_counter("cpu_time")
    .with_description("CPU time used by the process")
    .with_unit("ms")
    .with_callback(|observer| {
        let cpu_time = get_process_cpu_time();  // Your function
        observer.observe(cpu_time, &[]);
    })
    .build();

// f64 observable counter
let _observable_counter = meter
    .f64_observable_counter("bytes_processed")
    .with_callback(|observer| {
        let bytes = get_total_bytes_processed();
        observer.observe(bytes, &[]);
    })
    .build();

ObservableUpDownCounter

let _observable_updown = meter
    .i64_observable_up_down_counter("active_connections")
    .with_description("Number of active connections")
    .with_callback(|observer| {
        let count = get_active_connection_count();
        observer.observe(count, &[]);
    })
    .build();

ObservableGauge

let _observable_gauge = meter
    .f64_observable_gauge("cpu_usage")
    .with_description("Current CPU usage percentage")
    .with_unit("%")
    .with_callback(|observer| {
        let usage = get_current_cpu_usage();
        observer.observe(usage, &[KeyValue::new("core", "0")]);
    })
    .build();

Complete Example

use opentelemetry::{global, KeyValue};
use opentelemetry_sdk::metrics::SdkMeterProvider;
use opentelemetry_sdk::Resource;
use std::sync::Arc;
use std::sync::atomic::{AtomicU64, Ordering};

// Simulated system state
struct SystemStats {
    cpu_time: AtomicU64,
    memory_used: AtomicU64,
    connections: AtomicU64,
}

impl SystemStats {
    fn new() -> Self {
        SystemStats {
            cpu_time: AtomicU64::new(0),
            memory_used: AtomicU64::new(0),
            connections: AtomicU64::new(0),
        }
    }
}

fn init_meter_provider() -> SdkMeterProvider {
    let exporter = opentelemetry_stdout::MetricExporterBuilder::default().build();
    let provider = SdkMeterProvider::builder()
        .with_periodic_exporter(exporter)
        .with_resource(
            Resource::builder()
                .with_service_name("observable-example")
                .build(),
        )
        .build();
    global::set_meter_provider(provider.clone());
    provider
}

fn main() -> Result<(), Box<dyn std::error::Error>> {
    let meter_provider = init_meter_provider();
    let meter = global::meter("system-monitor");
    
    let stats = Arc::new(SystemStats::new());

    // Observable counter for cumulative CPU time
    let stats_clone = stats.clone();
    let _cpu_time = meter
        .u64_observable_counter("process_cpu_time")
        .with_description("Total CPU time used")
        .with_unit("ms")
        .with_callback(move |observer| {
            let time = stats_clone.cpu_time.load(Ordering::Relaxed);
            observer.observe(time, &[]);
        })
        .build();

    // Observable gauge for current memory usage
    let stats_clone = stats.clone();
    let _memory = meter
        .u64_observable_gauge("process_memory_usage")
        .with_description("Current memory usage")
        .with_unit("By")
        .with_callback(move |observer| {
            let memory = stats_clone.memory_used.load(Ordering::Relaxed);
            observer.observe(memory, &[]);
        })
        .build();

    // Observable gauge for active connections
    let stats_clone = stats.clone();
    let _connections = meter
        .u64_observable_gauge("active_connections")
        .with_callback(move |observer| {
            let count = stats_clone.connections.load(Ordering::Relaxed);
            observer.observe(count, &[]);
        })
        .build();

    // Simulate system activity
    stats.cpu_time.store(12500, Ordering::Relaxed);
    stats.memory_used.store(1024 * 1024 * 512, Ordering::Relaxed);
    stats.connections.store(42, Ordering::Relaxed);

    // Metrics will be collected automatically during export
    meter_provider.shutdown()?;
    Ok(())
}

Example from Source

From the metrics-basic example:
// Create an ObservableCounter instrument and register a callback
let _observable_counter = meter
    .u64_observable_counter("my_observable_counter")
    .with_description("My observable counter example description")
    .with_unit("myunit")
    .with_callback(|observer| {
        observer.observe(
            100,
            &[
                KeyValue::new("mykey1", "myvalue1"),
                KeyValue::new("mykey2", "myvalue2"),
            ],
        )
    })
    .build();

Multiple Observations in One Callback

A single callback can report multiple measurements with different attributes:
let _cpu_usage = meter
    .f64_observable_gauge("cpu_usage_per_core")
    .with_callback(|observer| {
        // Report usage for each CPU core
        observer.observe(45.2, &[KeyValue::new("core", "0")]);
        observer.observe(52.1, &[KeyValue::new("core", "1")]);
        observer.observe(38.7, &[KeyValue::new("core", "2")]);
        observer.observe(61.3, &[KeyValue::new("core", "3")]);
    })
    .build();
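
When the number of cores is not known in advance, the observations can be produced in a loop rather than written out by hand. The sketch below shows only the data preparation with the standard library; `per_core_observations` is a hypothetical helper, and the assumption is that a callback would then iterate over the result and call `observer.observe(value, &[KeyValue::new("core", label)])` for each pair.

```rust
/// Hypothetical helper: pair each core's usage with its index as a label.
pub fn per_core_observations(usages: &[f64]) -> Vec<(f64, String)> {
    usages
        .iter()
        .enumerate()
        // The position in the slice becomes the "core" attribute value.
        .map(|(core, usage)| (*usage, core.to_string()))
        .collect()
}
```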

Multiple Callbacks

You can register multiple callbacks for the same instrument:
let _gauge = meter
    .f64_observable_gauge("temperature")
    .with_callback(|observer| {
        observer.observe(22.5, &[KeyValue::new("sensor", "cpu")]);
    })
    .with_callback(|observer| {
        observer.observe(35.0, &[KeyValue::new("sensor", "gpu")]);
    })
    .build();

Capturing State in Callbacks

Use closures to capture state:
use std::sync::Arc;
use std::sync::atomic::{AtomicU64, Ordering};

let request_count = Arc::new(AtomicU64::new(0));

// Clone the handle that the callback will own
let counter_clone = request_count.clone();

// Increment in your application code
request_count.fetch_add(1, Ordering::Relaxed);

// Report in observable instrument
let _observable = meter
    .u64_observable_counter("requests_total")
    .with_callback(move |observer| {
        let count = counter_clone.load(Ordering::Relaxed);
        observer.observe(count, &[]);
    })
    .build();

Synchronous vs. Observable

Choose the right approach for your use case. A synchronous instrument records each event inline, as it happens:
// Record as events happen
let counter = meter.u64_counter("requests").build();

fn handle_request() {
    counter.add(1, &[KeyValue::new("endpoint", "/api")]);
    // Process request...
}
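
For contrast, the observable approach keeps its own running total and lets a callback read it at collection time. The sketch below shows only the state handling, using the standard library; `REQUESTS` and `read_requests` are illustrative names. In a real application, the body of `read_requests` would sit inside a `with_callback` closure and pass the value to `observer.observe`.

```rust
use std::sync::atomic::{AtomicU64, Ordering};

/// Running total maintained by the application itself.
static REQUESTS: AtomicU64 = AtomicU64::new(0);

/// Hot path: only bumps the total; no metrics API call happens here.
pub fn handle_request() {
    REQUESTS.fetch_add(1, Ordering::Relaxed);
    // Process request...
}

/// What an observable counter callback would read at collection time.
pub fn read_requests() -> u64 {
    REQUESTS.load(Ordering::Relaxed)
}
```

The trade-off in this sketch is that per-request attributes (such as the endpoint) are not recorded unless the application keeps a separate total for each attribute set.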

Instrument Type Comparison

Instrument    | Synchronous   | Observable              | Use Case
Counter       | Counter       | ObservableCounter       | Monotonically increasing
UpDownCounter | UpDownCounter | ObservableUpDownCounter | Can increase or decrease
Gauge         | Gauge         | ObservableGauge         | Current/instantaneous value
Histogram     | Histogram     | (none)                  | Value distributions
There is no ObservableHistogram. Histograms are only available as synchronous instruments.

Callback Best Practices

Keep Callbacks Fast

Callbacks should complete quickly:
// Good - fast operation
let _gauge = meter
    .u64_observable_gauge("cache_size")
    .with_callback(|observer| {
        let size = CACHE.len();  // O(1) operation
        observer.observe(size as u64, &[]);
    })
    .build();

// Bad - slow operation
let _gauge = meter
    .u64_observable_gauge("expensive_metric")
    .with_callback(|observer| {
        let value = expensive_computation();  // Avoid!
        observer.observe(value, &[]);
    })
    .build();
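
If an expensive value cannot be avoided entirely, one option is to cache it and recompute only after it goes stale. The sketch below is one possible approach, not an OpenTelemetry feature: `Cached` and `get_or_refresh` are hypothetical names. Note that the slow computation still runs inside the caller (here, the callback) when the cache has expired, just far less often.

```rust
use std::sync::Mutex;
use std::time::{Duration, Instant};

/// Hypothetical time-based cache for a single value.
pub struct Cached<T> {
    ttl: Duration,
    slot: Mutex<Option<(Instant, T)>>,
}

impl<T: Copy> Cached<T> {
    pub fn new(ttl: Duration) -> Self {
        Cached { ttl, slot: Mutex::new(None) }
    }

    /// Return the cached value if it is younger than `ttl`;
    /// otherwise run `compute`, store the result, and return it.
    pub fn get_or_refresh(&self, compute: impl FnOnce() -> T) -> T {
        let mut slot = self.slot.lock().unwrap();
        if let Some((stored_at, value)) = *slot {
            if stored_at.elapsed() < self.ttl {
                return value;
            }
        }
        let value = compute();
        *slot = Some((Instant::now(), value));
        value
    }
}
```

A callback could then call something like `cache.get_or_refresh(expensive_computation)` and observe the result, so that most collections return immediately.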

Avoid Blocking I/O

Don’t perform blocking operations in callbacks:
// Bad - blocking I/O
let _gauge = meter
    .u64_observable_gauge("file_size")
    .with_callback(|observer| {
        let size = std::fs::metadata("file.txt").unwrap().len();  // Avoid!
        observer.observe(size, &[]);
    })
    .build();

// Good - pre-computed value
use std::sync::Arc;
use std::sync::atomic::{AtomicU64, Ordering};
use std::time::Duration;

let file_size = Arc::new(AtomicU64::new(0));

// Update in background
let size_clone = file_size.clone();
std::thread::spawn(move || {
    loop {
        if let Ok(metadata) = std::fs::metadata("file.txt") {
            size_clone.store(metadata.len(), Ordering::Relaxed);
        }
        std::thread::sleep(Duration::from_secs(60));
    }
});

// Report in callback
let _gauge = meter
    .u64_observable_gauge("file_size")
    .with_callback(move |observer| {
        observer.observe(file_size.load(Ordering::Relaxed), &[]);
    })
    .build();

Handle Errors Gracefully

let _gauge = meter
    .f64_observable_gauge("temperature")
    .with_callback(|observer| {
        match read_temperature_sensor() {
            Ok(temp) => observer.observe(temp, &[]),
            // Don't observe on error (or report a default value instead)
            Err(_) => {}
        }
    })
    .build();
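
A variation on the default-value idea is to fall back to the last successful reading. The sketch below isolates that decision in a hypothetical helper, `value_to_report`, using only the standard library. Whether reporting a stale value is acceptable depends on the metric, so treat this as one option rather than a recommendation.

```rust
use std::sync::Mutex;

/// Hypothetical helper: decide what, if anything, a callback should report.
/// On success the reading is cached and returned. On failure the last good
/// reading is reused, or `None` is returned if there has never been one.
pub fn value_to_report<E>(
    reading: Result<f64, E>,
    last_good: &Mutex<Option<f64>>,
) -> Option<f64> {
    let mut cached = last_good.lock().unwrap();
    if let Ok(value) = reading {
        *cached = Some(value);
    }
    *cached
}
```

Inside a callback, the result would be observed only when it is `Some`, which preserves the "don't observe on error" behavior until a first successful reading exists.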

Instrument Lifecycle

Keep observable instruments in scope! The callback will not be invoked if the instrument is dropped:
// Bad - instrument dropped immediately
meter.u64_observable_counter("my_counter")
    .with_callback(|observer| observer.observe(100, &[]))
    .build();
// Callback will never be called!

// Good - keep instrument in scope
let _my_counter = meter
    .u64_observable_counter("my_counter")
    .with_callback(|observer| observer.observe(100, &[]))
    .build();
// Callback will be called during collection

Common Patterns

System Metrics

let _cpu = meter
    .f64_observable_gauge("system_cpu_usage")
    .with_unit("%")
    .with_callback(|observer| {
        if let Ok(usage) = get_cpu_usage() {
            observer.observe(usage, &[]);
        }
    })
    .build();

let _memory = meter
    .u64_observable_gauge("system_memory_used")
    .with_unit("By")
    .with_callback(|observer| {
        if let Ok(used) = get_memory_used() {
            observer.observe(used, &[]);
        }
    })
    .build();

Process Statistics

let _uptime = meter
    .u64_observable_counter("process_uptime")
    .with_unit("s")
    .with_callback(|observer| {
        let uptime = get_process_uptime_seconds();
        observer.observe(uptime, &[]);
    })
    .build();
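
The `get_process_uptime_seconds` helper above is left undefined. One possible standard-library implementation is sketched below; it measures time since a recorded start instant rather than querying the OS, and `PROCESS_START` and `mark_process_start` are names assumed for this example.

```rust
use std::sync::OnceLock;
use std::time::Instant;

/// Start time, recorded the first time it is requested.
static PROCESS_START: OnceLock<Instant> = OnceLock::new();

/// Call early in `main` so that uptime is measured from startup.
pub fn mark_process_start() {
    PROCESS_START.get_or_init(Instant::now);
}

/// Whole seconds elapsed since `mark_process_start` (or since the first call
/// to this function, if the start was never marked).
pub fn get_process_uptime_seconds() -> u64 {
    PROCESS_START.get_or_init(Instant::now).elapsed().as_secs()
}
```

Because `Instant` is monotonic, the reported value never decreases, which matches the requirement for an observable counter.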

Next Steps

  • Counter: Learn about synchronous Counter instruments
  • Gauge: Learn about synchronous Gauge instruments
  • Meters: Create and configure Meters
  • Views: Customize metric aggregation
