Overview
The HardwareMonitor class provides hardware telemetry with graceful fallback when psutil is unavailable. Supports RAPL energy measurements on Linux systems with Intel CPUs.
Class Definition
class HardwareMonitor:
def __init__(self) -> None
Source: ~/workspace/source/NBA Data Preprocessing/task/pipeline/hardware/monitor.py:16
Automatically detects available hardware monitoring capabilities on initialization:
- Attempts to import `psutil` for CPU and memory monitoring
- Discovers the RAPL energy interface at `/sys/class/powercap/intel-rapl*/energy_uj`
- Falls back gracefully if either is unavailable
Methods
snapshot
def snapshot(self) -> TelemetrySnapshot
Captures current hardware state.
Returns a TelemetrySnapshot dataclass containing:
cpu_percent (float): CPU usage percentage (0.0 if psutil unavailable)
process_memory_mb (float): Process memory in MB (0.0 if psutil unavailable)
system_memory_percent (float): System memory usage percentage (0.0 if psutil unavailable)
energy_uj (float | None): RAPL energy counter in microjoules (None if unavailable)
Example:
from pipeline.hardware import HardwareMonitor
import time
monitor = HardwareMonitor()
# Capture snapshot
snapshot = monitor.snapshot()
print(f"CPU usage: {snapshot.cpu_percent:.1f}%")
print(f"Process memory: {snapshot.process_memory_mb:.2f} MB")
print(f"System memory: {snapshot.system_memory_percent:.1f}%")
if snapshot.energy_uj is not None:
print(f"Energy counter: {snapshot.energy_uj:,.0f} μJ")
else:
print("Energy monitoring not available")
process_memory_mb
def process_memory_mb(self) -> float
Convenience method to get current process memory usage.
Returns the process memory (RSS) in megabytes (0.0 if psutil is unavailable).
Example:
from pipeline.hardware import HardwareMonitor
monitor = HardwareMonitor()
mem_before = monitor.process_memory_mb()
print(f"Memory before: {mem_before:.2f} MB")
# Do some memory-intensive work
data = [i**2 for i in range(10_000_000)]
mem_after = monitor.process_memory_mb()
print(f"Memory after: {mem_after:.2f} MB")
print(f"Memory increase: {mem_after - mem_before:.2f} MB")
compare
def compare(
self,
start: TelemetrySnapshot,
end: TelemetrySnapshot
) -> dict[str, Any]
Compares two snapshots to compute deltas and energy consumption.
Parameters:
start (TelemetrySnapshot, required): Snapshot captured at the start of the operation
end (TelemetrySnapshot, required): Snapshot captured at the end of the operation
Returns a dictionary containing:
cpu_percent_start (float): Starting CPU usage
cpu_percent_end (float): Ending CPU usage
process_memory_start_mb (float): Starting process memory
process_memory_end_mb (float): Ending process memory
system_memory_percent_start (float): Starting system memory
system_memory_percent_end (float): Ending system memory
rapl_energy_j (float | None): Energy consumed in joules (None if unavailable or counter wrapped)
Example:
from pipeline.hardware import HardwareMonitor
import time
monitor = HardwareMonitor()
# Benchmark operation
start_snapshot = monitor.snapshot()
start_time = time.perf_counter()
# Simulate work
result = sum(i**2 for i in range(10_000_000))
end_time = time.perf_counter()
end_snapshot = monitor.snapshot()
# Compare snapshots
telemetry = monitor.compare(start_snapshot, end_snapshot)
print(f"Execution time: {end_time - start_time:.3f}s")
print(f"CPU: {telemetry['cpu_percent_start']:.1f}% → {telemetry['cpu_percent_end']:.1f}%")
print(f"Memory: {telemetry['process_memory_start_mb']:.2f} MB → {telemetry['process_memory_end_mb']:.2f} MB")
print(f"Memory delta: {telemetry['process_memory_end_mb'] - telemetry['process_memory_start_mb']:.2f} MB")
if telemetry['rapl_energy_j'] is not None:
print(f"Energy consumed: {telemetry['rapl_energy_j']:.3f} J")
watts = telemetry['rapl_energy_j'] / (end_time - start_time)
print(f"Average power: {watts:.2f} W")
else:
print("Energy data not available")
TelemetrySnapshot
@dataclass
class TelemetrySnapshot:
cpu_percent: float
process_memory_mb: float
system_memory_percent: float
energy_uj: float | None
Source: ~/workspace/source/NBA Data Preprocessing/task/pipeline/hardware/monitor.py:8
Snapshot of hardware state at a specific point in time. The dataclass as shown is not declared with frozen=True, so instances are read-only by convention only.
cpu_percent: System-wide CPU usage percentage (0-100)
process_memory_mb: Current process RSS (Resident Set Size) in megabytes
system_memory_percent: System-wide memory usage percentage (0-100)
energy_uj: RAPL energy counter value in microjoules. None if RAPL unavailable.
Usage Patterns
from pipeline.hardware import HardwareMonitor
import time
def benchmark_function(func, *args, **kwargs):
monitor = HardwareMonitor()
start_snap = monitor.snapshot()
start_time = time.perf_counter()
result = func(*args, **kwargs)
end_time = time.perf_counter()
end_snap = monitor.snapshot()
telemetry = monitor.compare(start_snap, end_snap)
elapsed = end_time - start_time
print(f"Function: {func.__name__}")
print(f"Duration: {elapsed:.3f}s")
print(f"Memory: {telemetry['process_memory_end_mb']:.2f} MB")
if telemetry['rapl_energy_j'] is not None:
print(f"Energy: {telemetry['rapl_energy_j']:.3f} J")
print(f"Power: {telemetry['rapl_energy_j'] / elapsed:.2f} W")
return result
# Usage
result = benchmark_function(lambda: sum(range(10_000_000)))
Pipeline Integration
from pipeline.config import PipelineConfig
from pipeline.hardware import HardwareMonitor
from pipeline.ingestion import DataIngestor
import time
config = PipelineConfig()
monitor = HardwareMonitor()
ingestor = DataIngestor()
# Monitor data loading
start = monitor.snapshot()
df = ingestor.load('nba_data.csv')
end = monitor.snapshot()
telemetry = monitor.compare(start, end)
print(f"Loaded {len(df)} rows")
print(f"Memory used: {telemetry['process_memory_end_mb'] - telemetry['process_memory_start_mb']:.2f} MB")
Resource-Constrained Execution
from pipeline.hardware import HardwareMonitor
import time
monitor = HardwareMonitor()
MAX_MEMORY_MB = 1024
def process_with_memory_check(data_chunks):
results = []
for i, chunk in enumerate(data_chunks):
# Check memory before processing
current_memory = monitor.process_memory_mb()
if current_memory > MAX_MEMORY_MB:
print(f"⚠ Memory limit exceeded: {current_memory:.2f} MB > {MAX_MEMORY_MB} MB")
print(f"Pausing to allow garbage collection...")
time.sleep(1) # Brief pause; note that sleeping does not itself trigger garbage collection (call gc.collect() to force one)
# Recheck
current_memory = monitor.process_memory_mb()
if current_memory > MAX_MEMORY_MB:
print(f"❌ Cannot continue - memory still at {current_memory:.2f} MB")
break
# Process chunk
result = process_chunk(chunk) # Your processing function
results.append(result)
print(f"Chunk {i}: {current_memory:.2f} MB")
return results
Energy Profiling
from pipeline.hardware import HardwareMonitor
from pipeline.streaming.engine import RealTimePipelineRunner
from pipeline.config import PipelineConfig
import pandas as pd
config = PipelineConfig(benchmark_runs=5)
runner = RealTimePipelineRunner(config)
monitor = HardwareMonitor()
# Check if energy monitoring available
test_snap = monitor.snapshot()
if test_snap.energy_uj is None:
print("⚠ RAPL energy monitoring not available on this system")
print("Energy estimates will use fallback calculation")
else:
print("✓ RAPL energy monitoring available")
# Run benchmarks
df = pd.read_csv('nba_data.csv')
benchmark_results = runner.benchmark(df)
# Extract energy data
for i, run in enumerate(benchmark_results['runs']):
batch_energy = run['batch']['energy_estimate_j']
stream_energy = run['streaming']['energy_estimate_j']
print(f"Run {i+1}:")
print(f" Batch energy: {batch_energy:.2f} J")
print(f" Streaming energy: {stream_energy:.2f} J")
print(f" Difference: {stream_energy - batch_energy:+.2f} J")
from pipeline.hardware import HardwareMonitor
import sys
monitor = HardwareMonitor()
snapshot = monitor.snapshot()
print(f"Platform: {sys.platform}")
print(f"Python: {sys.version}")
print()
# Check available features
features = []
if snapshot.cpu_percent > 0 or snapshot.process_memory_mb > 0:
features.append("✓ psutil (CPU/Memory)")
else:
features.append("✗ psutil not available")
if snapshot.energy_uj is not None:
features.append("✓ RAPL energy monitoring")
else:
features.append("✗ RAPL not available (Linux + Intel CPU required)")
print("Hardware monitoring capabilities:")
for feature in features:
print(f" {feature}")
Notes
- psutil dependency: Optional but recommended. Install with
pip install psutil
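- RAPL availability: Only on Linux with Intel CPUs, and requires read access to the counter files at /sys/class/powercap/intel-rapl*/energy_uj (on recent kernels these are often readable only by root)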
- Energy counter wrap: If
energy_uj wraps around (end < start), rapl_energy_j returns None
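- CPU percent: Uses psutil.cpu_percent(interval=None), which returns immediately with the utilization measured since the previous call; the first call returns a meaningless 0.0 and should be ignored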
- Process memory: Reports RSS (Resident Set Size), not virtual memory
- Fallback behavior: Returns 0.0 for the CPU and memory metrics when psutil is unavailable, and None for energy_uj when RAPL is unavailable, enabling graceful degradation
- Thread safety: Not thread-safe. Create separate instances for concurrent monitoring
- Sampling frequency: Call
snapshot() as frequently as needed; no internal rate limiting
- Energy units: RAPL counter is in microjoules (μJ), converted to joules (J) in
compare()