Skip to main content

Overview

The HardwareMonitor class provides hardware telemetry with graceful fallback when psutil is unavailable. Supports RAPL energy measurements on Linux systems with Intel CPUs.

Class Definition

class HardwareMonitor:
    def __init__(self) -> None
Source: ~/workspace/source/NBA Data Preprocessing/task/pipeline/hardware/monitor.py:16

Automatically detects available hardware monitoring capabilities on initialization:
  • Attempts to import psutil for CPU and memory monitoring
  • Discovers RAPL energy interface at /sys/class/powercap/intel-rapl*/energy_uj
  • Falls back gracefully if either is unavailable

Methods

snapshot

def snapshot(self) -> TelemetrySnapshot
Captures current hardware state.
Returns: TelemetrySnapshot — a dataclass containing:
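  • cpu_percent (float): CPU usage percentage (0.0 if psutil unavailable; the first reading in a process is also typically 0.0, because psutil.cpu_percent(interval=None) measures usage since the previous call)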
  • process_memory_mb (float): Process memory in MB (0.0 if psutil unavailable)
  • system_memory_percent (float): System memory usage percentage (0.0 if psutil unavailable)
  • energy_uj (float | None): RAPL energy counter in microjoules (None if unavailable)
Example:
from pipeline.hardware import HardwareMonitor
import time

# One monitor instance can be reused for any number of snapshots.
monitor = HardwareMonitor()

# Take a single reading of CPU, memory and (when available) the RAPL counter.
snap = monitor.snapshot()

print(f"CPU usage: {snap.cpu_percent:.1f}%")
print(f"Process memory: {snap.process_memory_mb:.2f} MB")
print(f"System memory: {snap.system_memory_percent:.1f}%")

# energy_uj is None whenever no RAPL energy counter was found.
if snap.energy_uj is None:
    print("Energy monitoring not available")
else:
    print(f"Energy counter: {snap.energy_uj:,.0f} μJ")

process_memory_mb

def process_memory_mb(self) -> float
Convenience method to get current process memory usage.
Returns: float — process memory in megabytes (0.0 if psutil unavailable)
Example:
from pipeline.hardware import HardwareMonitor

monitor = HardwareMonitor()

baseline_mb = monitor.process_memory_mb()
print(f"Memory before: {baseline_mb:.2f} MB")

# Allocate a large list. Binding it to a name keeps it alive, so the
# second RSS reading still includes it.
squares = [n * n for n in range(10_000_000)]

after_mb = monitor.process_memory_mb()
growth_mb = after_mb - baseline_mb
print(f"Memory after: {after_mb:.2f} MB")
print(f"Memory increase: {growth_mb:.2f} MB")

compare

def compare(
    self, 
    start: TelemetrySnapshot, 
    end: TelemetrySnapshot
) -> dict[str, Any]
Compares two snapshots to compute deltas and energy consumption.
Parameters:
  • start (TelemetrySnapshot, required): Snapshot captured at the start of the operation
  • end (TelemetrySnapshot, required): Snapshot captured at the end of the operation
Returns: dict[str, Any] — a dictionary containing:
  • cpu_percent_start (float): Starting CPU usage
  • cpu_percent_end (float): Ending CPU usage
  • process_memory_start_mb (float): Starting process memory
  • process_memory_end_mb (float): Ending process memory
  • system_memory_percent_start (float): Starting system memory
  • system_memory_percent_end (float): Ending system memory
  • rapl_energy_j (float | None): Energy consumed in joules (None if unavailable or counter wrapped)
Example:
from pipeline.hardware import HardwareMonitor
import time

monitor = HardwareMonitor()

# Bracket the work with two snapshots and a wall-clock timer.
start_snapshot = monitor.snapshot()
t_start = time.perf_counter()

# Simulated workload
result = sum(i**2 for i in range(10_000_000))

t_end = time.perf_counter()
end_snapshot = monitor.snapshot()

# Turn the two snapshots into start/end values plus an energy delta.
telemetry = monitor.compare(start_snapshot, end_snapshot)
elapsed = t_end - t_start
mem_start = telemetry['process_memory_start_mb']
mem_end = telemetry['process_memory_end_mb']
energy_j = telemetry['rapl_energy_j']

print(f"Execution time: {elapsed:.3f}s")
print(f"CPU: {telemetry['cpu_percent_start']:.1f}% → {telemetry['cpu_percent_end']:.1f}%")
print(f"Memory: {mem_start:.2f} MB → {mem_end:.2f} MB")
print(f"Memory delta: {mem_end - mem_start:.2f} MB")

# rapl_energy_j is None when RAPL is unavailable or the counter wrapped.
if energy_j is None:
    print("Energy data not available")
else:
    print(f"Energy consumed: {energy_j:.3f} J")
    print(f"Average power: {energy_j / elapsed:.2f} W")

TelemetrySnapshot

@dataclass
class TelemetrySnapshot:
    cpu_percent: float
    process_memory_mb: float
    system_memory_percent: float
    energy_uj: float | None
Source: ~/workspace/source/NBA Data Preprocessing/task/pipeline/hardware/monitor.py:8

Snapshot of hardware state at a specific point in time. It is declared as a plain @dataclass (not frozen=True), so instances are not immutable: their fields can be reassigned after creation.

Fields:
  • cpu_percent (float): System-wide CPU usage percentage (0-100)
  • process_memory_mb (float): Current process RSS (Resident Set Size) in megabytes
  • system_memory_percent (float): System-wide memory usage percentage (0-100)
  • energy_uj (float | None): RAPL energy counter value in microjoules. None if RAPL unavailable.

Usage Patterns

Basic Performance Monitoring

from pipeline.hardware import HardwareMonitor
import time

def benchmark_function(func, *args, **kwargs):
    """Run ``func(*args, **kwargs)`` once and print timing, memory and energy.

    The call is bracketed by two ``HardwareMonitor.snapshot()`` readings and a
    ``time.perf_counter()`` timer. The results of ``HardwareMonitor.compare()``
    are then printed.

    Args:
        func: Callable to benchmark. Its ``__name__`` (or ``repr`` when it has
            none, e.g. ``functools.partial``) is printed.
        *args, **kwargs: Forwarded unchanged to ``func``.

    Returns:
        Whatever ``func`` returns.
    """
    monitor = HardwareMonitor()

    start_snap = monitor.snapshot()
    start_time = time.perf_counter()

    result = func(*args, **kwargs)

    end_time = time.perf_counter()
    end_snap = monitor.snapshot()

    telemetry = monitor.compare(start_snap, end_snap)
    elapsed = end_time - start_time
    energy_j = telemetry['rapl_energy_j']

    print(f"Function: {getattr(func, '__name__', repr(func))}")
    print(f"Duration: {elapsed:.3f}s")
    print(f"Memory: {telemetry['process_memory_end_mb']:.2f} MB")

    # Compare against None instead of relying on truthiness. A genuine 0.0 J
    # reading (very short call) is still a valid measurement and must not be
    # confused with "RAPL unavailable / counter wrapped".
    if energy_j is not None:
        print(f"Energy: {energy_j:.3f} J")
        # Guard the division: a trivially short call can report no elapsed time.
        if elapsed > 0:
            print(f"Power: {energy_j / elapsed:.2f} W")

    return result

# Usage
result = benchmark_function(lambda: sum(range(10_000_000)))

Pipeline Integration

from pipeline.config import PipelineConfig
from pipeline.hardware import HardwareMonitor
from pipeline.ingestion import DataIngestor
import time

config = PipelineConfig()
monitor = HardwareMonitor()
ingestor = DataIngestor()

# Bracket only the load call so the delta reflects ingestion.
before = monitor.snapshot()
df = ingestor.load('nba_data.csv')
after = monitor.snapshot()

telemetry = monitor.compare(before, after)
# Net RSS change between the two snapshots (not the peak during loading).
load_delta_mb = telemetry['process_memory_end_mb'] - telemetry['process_memory_start_mb']

print(f"Loaded {len(df)} rows")
print(f"Memory used: {load_delta_mb:.2f} MB")

Resource-Constrained Execution

from pipeline.hardware import HardwareMonitor
import time

monitor = HardwareMonitor()
MAX_MEMORY_MB = 1024

def process_with_memory_check(data_chunks):
    """Process ``data_chunks`` one by one, stopping if RSS stays above the limit.

    Before each chunk, the process RSS is read. If it exceeds
    ``MAX_MEMORY_MB``, a full garbage collection is forced and the reading is
    repeated. If memory is still over the limit, processing stops early.

    Args:
        data_chunks: Iterable of chunks, each passed to ``process_chunk``.

    Returns:
        List of ``process_chunk`` results. It may be shorter than the input if
        the memory limit stopped processing early.

    Note:
        When psutil is unavailable, ``process_memory_mb()`` returns 0.0, so the
        limit never triggers and every chunk is processed.
    """
    import gc

    results = []

    for i, chunk in enumerate(data_chunks):
        # Check memory before processing
        current_memory = monitor.process_memory_mb()

        if current_memory > MAX_MEMORY_MB:
            print(f"⚠ Memory limit exceeded: {current_memory:.2f} MB > {MAX_MEMORY_MB} MB")
            print("Running garbage collection...")
            # Sleeping does not trigger Python's garbage collector; force a
            # full collection so unreachable reference cycles are freed.
            # RSS may still not drop if the allocator keeps freed memory for
            # reuse instead of returning it to the OS.
            gc.collect()

            # Recheck
            current_memory = monitor.process_memory_mb()
            if current_memory > MAX_MEMORY_MB:
                print(f"❌ Cannot continue - memory still at {current_memory:.2f} MB")
                break

        # Process chunk
        result = process_chunk(chunk)  # Your processing function
        results.append(result)

        # Reports the reading taken *before* this chunk was processed.
        print(f"Chunk {i}: {current_memory:.2f} MB")

    return results

Energy Profiling

from pipeline.hardware import HardwareMonitor
from pipeline.streaming.engine import RealTimePipelineRunner
from pipeline.config import PipelineConfig
import pandas as pd

config = PipelineConfig(benchmark_runs=5)
runner = RealTimePipelineRunner(config)
monitor = HardwareMonitor()

# Probe once: energy_uj is None when no RAPL energy counter was found.
has_rapl = monitor.snapshot().energy_uj is not None
if has_rapl:
    print("✓ RAPL energy monitoring available")
else:
    print("⚠ RAPL energy monitoring not available on this system")
    print("Energy estimates will use fallback calculation")

# Run the benchmarks on the full dataset.
df = pd.read_csv('nba_data.csv')
benchmark_results = runner.benchmark(df)

# Report the per-run energy estimates for the batch and streaming modes.
for run_no, run in enumerate(benchmark_results['runs'], start=1):
    batch_j = run['batch']['energy_estimate_j']
    stream_j = run['streaming']['energy_estimate_j']
    delta_j = stream_j - batch_j

    print(f"Run {run_no}:")
    print(f"  Batch energy: {batch_j:.2f} J")
    print(f"  Streaming energy: {stream_j:.2f} J")
    print(f"  Difference: {delta_j:+.2f} J")

Cross-Platform Compatibility

from pipeline.hardware import HardwareMonitor
import sys

monitor = HardwareMonitor()
snapshot = monitor.snapshot()

print(f"Platform: {sys.platform}")
print(f"Python: {sys.version}")
print()

# Without psutil every CPU/memory metric falls back to 0.0, so any non-zero
# CPU or RSS value means psutil is present.
psutil_ok = snapshot.cpu_percent > 0 or snapshot.process_memory_mb > 0
rapl_ok = snapshot.energy_uj is not None

features = [
    "✓ psutil (CPU/Memory)" if psutil_ok else "✗ psutil not available",
    "✓ RAPL energy monitoring" if rapl_ok else "✗ RAPL not available (Linux + Intel CPU required)",
]

print("Hardware monitoring capabilities:")
for feature in features:
    print(f"  {feature}")

Notes

  • psutil dependency: Optional but recommended. Install with pip install psutil
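  • RAPL availability: Only on Linux systems exposing the powercap intel-rapl interface (Intel CPUs), and requires read access to /sys/class/powercap/. On kernels carrying the CVE-2020-8694 mitigation, energy_uj is readable only by root by default, so unprivileged runs may report energy_uj as None.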
  • Energy counter wrap: If energy_uj wraps around (end < start), rapl_energy_j returns None
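  • CPU percent: Uses psutil.cpu_percent(interval=None), which returns immediately but is not an instantaneous value. It is the average system-wide usage since the previous call (or since psutil was imported), so the very first reading is a meaningless 0.0. A snapshot's value therefore reflects the interval since the preceding call, typically the earlier snapshot.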
  • Process memory: Reports RSS (Resident Set Size), not virtual memory
  • Fallback behavior: Returns 0.0 for the CPU and memory metrics when psutil is unavailable, enabling graceful degradation. energy_uj does not depend on psutil; it is None only when RAPL is unavailable.
  • Thread safety: Not thread-safe. Create separate instances for concurrent monitoring
  • Sampling frequency: Call snapshot() as frequently as needed; no internal rate limiting
  • Energy units: RAPL counter is in microjoules (μJ), converted to joules (J) in compare()

Build docs developers (and LLMs) love