Edge devices often run at reduced CPU frequencies to save power, which directly impacts inference latency. The deployment_simulation function models this behavior by applying a frequency-dependent latency multiplier to benchmark results.

Overview

The deployment_simulation function is defined in src/edge_opt/deploy.py:10-33 and provides a realistic simulation of how your model performs under different CPU frequency scaling scenarios.
def deployment_simulation(
    model: nn.Module, 
    loader: DataLoader, 
    cpu_frequency_scale: float, 
    stream_items: int = 128
) -> dict[str, float]:
This function benchmarks both batch inference (processing multiple samples at once) and streaming inference (processing samples one at a time), then applies a latency multiplier based on CPU frequency scaling.

CPU Frequency Scale Parameter

The cpu_frequency_scale parameter represents the ratio of the current CPU frequency to the maximum frequency:
cpu_frequency_scale = current_frequency / max_frequency

Example Values

| Scenario | Max Freq | Current Freq | cpu_frequency_scale |
|---|---|---|---|
| Full performance | 2.0 GHz | 2.0 GHz | 1.0 |
| Power saving mode | 2.0 GHz | 1.0 GHz | 0.5 |
| Ultra-low power | 2.0 GHz | 0.4 GHz | 0.2 |
| Overclocked | 2.0 GHz | 2.4 GHz | 1.2 |
The cpu_frequency_scale should typically be in the range [0.2, 1.0] for realistic edge device scenarios. Values below 0.2 may indicate thermal throttling or extreme power constraints.
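The ratio itself is a one-liner; the sketch below wraps it with a basic sanity check. On Linux, the two input frequencies can typically be read (in kHz) from the standard cpufreq sysfs files noted in the comment, though availability varies by platform:

```python
def compute_frequency_scale(current_hz: float, max_hz: float) -> float:
    """Ratio of current to maximum CPU frequency."""
    if max_hz <= 0:
        raise ValueError("max_hz must be positive")
    return current_hz / max_hz

# On Linux the inputs can usually be read (in kHz) from:
#   /sys/devices/system/cpu/cpu0/cpufreq/scaling_cur_freq
#   /sys/devices/system/cpu/cpu0/cpufreq/cpuinfo_max_freq

print(compute_frequency_scale(1.0e9, 2.0e9))  # power-saving example: 0.5
```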

Latency Multiplier Calculation

The key insight is that latency is inversely proportional to CPU frequency. This relationship is captured in line 13:
latency_multiplier = 1.0 / max(cpu_frequency_scale, 1e-6)

Mathematical Relationship

1. Inverse Relationship: when CPU frequency decreases, execution time increases proportionally:

   latency ∝ 1 / frequency

2. Compute Multiplier: the multiplier converts benchmark latency (measured at full speed) to scaled latency:

   latency_multiplier = 1.0 / cpu_frequency_scale

3. Safety Check: the max(cpu_frequency_scale, 1e-6) prevents division by zero if the frequency scale is accidentally set to 0.

4. Apply to Measurements: all timing measurements are multiplied by this factor to simulate the slower CPU:

   batch_time = (time.perf_counter() - start_batch) * latency_multiplier
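Taken together, these steps amount to a couple of lines. A standalone sketch of the multiplier, mirroring the guard described above:

```python
def latency_multiplier(cpu_frequency_scale: float) -> float:
    # The max(..., 1e-6) clamp prevents a ZeroDivisionError if the
    # scale is accidentally 0 (it caps the multiplier at 1e6).
    return 1.0 / max(cpu_frequency_scale, 1e-6)

for scale in (1.0, 0.5, 0.25, 0.0):
    print(f"scale={scale}: multiplier={latency_multiplier(scale):g}")
```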

Example Calculations

| cpu_frequency_scale | latency_multiplier | Effect on Latency |
|---|---|---|
| 1.0 (full speed) | 1.0 | No change (100%) |
| 0.5 (half speed) | 2.0 | 2x slower (200%) |
| 0.25 (quarter speed) | 4.0 | 4x slower (400%) |
| 0.1 (extreme throttle) | 10.0 | 10x slower (1000%) |

Function Implementation

Here’s the complete implementation with detailed breakdown:
import time

import torch
from torch import nn
from torch.utils.data import DataLoader

def deployment_simulation(model: nn.Module, loader: DataLoader, cpu_frequency_scale: float, stream_items: int = 128) -> dict[str, float]:
    model.eval()  # Set model to evaluation mode
    batch_inputs, _ = next(iter(loader))  # Get one batch from loader
    latency_multiplier = 1.0 / max(cpu_frequency_scale, 1e-6)  # Line 13

    with torch.no_grad():  # Disable gradient computation
        # Batch inference benchmark
        start_batch = time.perf_counter()
        _ = model(batch_inputs)
        batch_time = (time.perf_counter() - start_batch) * latency_multiplier  # Line 18

        # Streaming inference benchmark
        stream = batch_inputs[:stream_items]  # Take first N items
        start_stream = time.perf_counter()
        for item in stream:
            _ = model(item.unsqueeze(0))  # Process one sample at a time
        stream_time = (time.perf_counter() - start_stream) * latency_multiplier  # Line 24

    return {
        "cpu_frequency_scale": cpu_frequency_scale,
        "latency_multiplier": latency_multiplier,
        "batch_latency_ms": batch_time * 1000.0,
        "batch_throughput_sps": batch_inputs.shape[0] / batch_time,
        "stream_avg_latency_ms": (stream_time / stream.shape[0]) * 1000.0,
        "stream_throughput_sps": stream.shape[0] / stream_time,
    }

Key Operations

Batch inference (lines 16-18) processes all samples in the batch simultaneously, leveraging vectorization and parallel computation. Streaming inference (lines 20-24) processes samples one at a time in a loop, simulating real-time edge scenarios where data arrives sequentially. Batch inference is typically 5-20x faster per sample due to hardware parallelism.
Both benchmarks use time.perf_counter() for high-resolution timing. The measured wall-clock time is then multiplied by latency_multiplier to simulate the slower CPU.
Throughput is measured in samples per second (sps):
throughput = num_samples / time_in_seconds
As latency increases (due to frequency scaling), throughput decreases proportionally.
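This inverse effect on throughput can be checked directly. The base batch time below is illustrative, not a real measurement:

```python
batch_size = 32
base_batch_time_s = 0.0225  # hypothetical full-speed batch time

for scale in (1.0, 0.5, 0.25):
    scaled_time = base_batch_time_s * (1.0 / scale)  # latency grows as 1/scale
    throughput = batch_size / scaled_time            # samples per second
    print(f"scale={scale}: {throughput:.1f} sps")
```

Halving the frequency scale doubles latency and therefore halves throughput.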

Return Value Structure

The function returns a dictionary with six key metrics:
{
    "cpu_frequency_scale": 0.5,       # Input parameter (for record-keeping)
    "latency_multiplier": 2.0,        # Computed from frequency scale
    "batch_latency_ms": 45.2,         # Time to process entire batch
    "batch_throughput_sps": 708.8,    # Samples per second (batch mode)
    "stream_avg_latency_ms": 1.8,     # Average time per sample (streaming)
    "stream_throughput_sps": 555.5    # Samples per second (streaming)
}
All latency values are in milliseconds, and all throughput values are in samples per second.
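Note that the two streaming metrics are two views of the same measurement: throughput is the reciprocal of the average per-sample latency. A quick consistency check on the example values above:

```python
stream_avg_latency_ms = 1.8
derived_sps = 1000.0 / stream_avg_latency_ms  # convert ms/sample to samples/s
print(derived_sps)  # ~555.6, matching stream_throughput_sps up to rounding
```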

Practical Usage Example

from edge_opt.deploy import deployment_simulation
import torch
from torch.utils.data import DataLoader

# Assume you have a trained model and validation loader
model = load_your_model()
val_loader = DataLoader(val_dataset, batch_size=32)

# Simulate different CPU frequency scenarios
frequency_scales = [1.0, 0.8, 0.6, 0.4, 0.2]

results = []
for scale in frequency_scales:
    metrics = deployment_simulation(
        model=model,
        loader=val_loader,
        cpu_frequency_scale=scale,
        stream_items=128
    )
    results.append(metrics)
    print(f"CPU @ {scale*100:.0f}%: "
          f"Batch latency={metrics['batch_latency_ms']:.1f}ms, "
          f"Stream latency={metrics['stream_avg_latency_ms']:.1f}ms")

Example Output

CPU @ 100%: Batch latency=22.5ms, Stream latency=0.9ms
CPU @ 80%: Batch latency=28.1ms, Stream latency=1.1ms
CPU @ 60%: Batch latency=37.5ms, Stream latency=1.5ms
CPU @ 40%: Batch latency=56.2ms, Stream latency=2.2ms
CPU @ 20%: Batch latency=112.5ms, Stream latency=4.5ms
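These figures follow the inverse rule exactly: each latency is the full-speed value divided by the frequency scale. Re-deriving them from the 100% row (values match the output above up to rounding):

```python
full_speed_batch_ms = 22.5
full_speed_stream_ms = 0.9

for scale in (1.0, 0.8, 0.6, 0.4, 0.2):
    # latency grows as 1/scale relative to the full-speed measurement
    print(scale, full_speed_batch_ms / scale, full_speed_stream_ms / scale)
```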

Integration with collect_metrics

The latency_multiplier concept is also used in the main metrics collection function (src/edge_opt/metrics.py:70-99):
def collect_metrics(
    model: nn.Module,
    loader: DataLoader,
    device: torch.device,
    power_watts: float,
    precision: str,
    latency_multiplier: float = 1.0,  # Can be set from frequency scale
    benchmark_repeats: int = 5,
) -> PerfMetrics:
    # ... benchmark code ...
    latency_mean, latency_std, latency_p95 = measure_latency_distribution(model, sample_input, repeats=benchmark_repeats)
    latency = latency_mean * latency_multiplier  # Line 86
    # ... rest of metrics ...
You can compute the multiplier using deployment_simulation and pass it to collect_metrics for consistent frequency scaling across your entire evaluation pipeline:
# Determine latency multiplier for target device
sim_result = deployment_simulation(model, loader, cpu_frequency_scale=0.5)
latency_multiplier = sim_result["latency_multiplier"]  # 2.0

# Use in full metrics collection
metrics = collect_metrics(
    model=model,
    loader=val_loader,
    device=device,
    power_watts=2.5,
    precision="fp32",
    latency_multiplier=latency_multiplier,  # Apply frequency scaling
    benchmark_repeats=5
)

Use Cases

Raspberry Pi 4 supports multiple CPU governors (performance, powersave, ondemand). Use cpu_frequency_scale to model each mode:
  • performance: scale = 1.0 (1.5 GHz)
  • ondemand: scale = 0.8 (1.2 GHz)
  • powersave: scale = 0.4 (600 MHz)
On battery-powered devices, CPU frequency dynamically adjusts based on remaining charge. Simulate different battery levels:
  • 100-80% battery: scale = 1.0
  • 80-40% battery: scale = 0.7
  • 40-20% battery: scale = 0.5
  • <20% battery: scale = 0.3
When devices overheat, CPUs automatically reduce frequency. Model thermal scenarios:
  • Normal temperature: scale = 1.0
  • Warm (60°C): scale = 0.85
  • Hot (70°C): scale = 0.6
  • Critical (80°C): scale = 0.4
Different edge devices have different base frequencies. Normalize comparisons:
device_configs = [
    {"name": "Jetson Nano", "freq_ghz": 1.43, "scale": 1.0},
    {"name": "RPi 4", "freq_ghz": 1.5, "scale": 1.05},
    {"name": "RPi Zero", "freq_ghz": 1.0, "scale": 0.7},
]
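The scale values above appear to be normalized against the Jetson Nano's 1.43 GHz as the reference frequency; they can be derived rather than hard-coded. A sketch using the same frequencies:

```python
baseline_ghz = 1.43  # Jetson Nano taken as the reference device
devices = [("Jetson Nano", 1.43), ("RPi 4", 1.5), ("RPi Zero", 1.0)]

for name, ghz in devices:
    scale = ghz / baseline_ghz  # normalize each device to the baseline
    print(f"{name}: scale = {scale:.2f}")
```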

Limitations and Considerations

The latency multiplier assumes a linear relationship between CPU frequency and execution time. In practice:
  • Memory-bound operations may not scale linearly
  • Cache effects can cause non-linear behavior
  • I/O operations are unaffected by CPU frequency
  • GPU/NPU accelerators have independent frequency scaling
For critical deployments, validate the simulation against real hardware measurements.
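If the linear assumption is too pessimistic for a memory-bound workload, one possible refinement is to scale only the compute-bound share of the latency. This is a sketch, not part of the library, and compute_fraction is an assumed, workload-specific parameter:

```python
def scaled_latency_s(base_latency_s: float, cpu_frequency_scale: float,
                     compute_fraction: float = 0.8) -> float:
    """Amdahl-style split: only the compute-bound fraction slows down
    with CPU frequency; memory/I/O time is held frequency-independent."""
    multiplier = (compute_fraction / max(cpu_frequency_scale, 1e-6)
                  + (1.0 - compute_fraction))
    return base_latency_s * multiplier

# At half frequency: 1.8x slower instead of the linear model's 2.0x.
print(scaled_latency_s(0.0225, 0.5))
```

With compute_fraction=1.0 this reduces to the purely linear model used by deployment_simulation.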
The stream_items parameter (default=128) controls how many samples are processed in streaming mode. Increase it for more stable timing measurements, but be aware that larger values slow down the simulation.

Related Functions

  • measure_latency() - Core latency measurement (src/edge_opt/metrics.py:39)
  • measure_latency_distribution() - Latency with statistics (src/edge_opt/metrics.py:53)
  • collect_metrics() - Full metrics with latency multiplier (src/edge_opt/metrics.py:70)
