Skip to main content

Overview

Signal analysis functions extract temporal signals from filtered video tensors and compute dominant frequencies using FFT (Fast Fourier Transform). These functions convert magnified EVM signals into heart rate and respiratory rate measurements in BPM/RPM.

extract_temporal_signal

Extracts a 1D temporal signal by spatially averaging each frame in a video tensor.
from src.evm.signal_analysis import extract_temporal_signal

signal = extract_temporal_signal(video_tensor, use_green_channel=True)

Parameters

video_tensor
np.ndarray
required
Video tensor with shape (T, H, W, C) where:
  • T = number of frames
  • H = height
  • W = width
  • C = color channels (3 for BGR)
use_green_channel
bool
default:"True"
If True, uses only the green channel (index 1 in BGR). If False, averages all channels.The green channel provides better signal-to-noise ratio for pulse detection.

Returns

signal
np.ndarray
1D temporal signal with length T. Each value represents the spatial average of one frame.

Implementation

signal = []
for frame in video_tensor:
    if use_green_channel and frame.ndim == 3:
        mean_val = np.mean(frame[:, :, 1])  # Green channel (index 1 in BGR)
    else:
        mean_val = np.mean(frame)
    signal.append(mean_val)

return np.array(signal)

preprocess_signal

Preprocesses temporal signal through detrending and normalization.
from src.evm.signal_analysis import preprocess_signal

processed = preprocess_signal(signal)

Parameters

signal
np.ndarray
required
1D temporal signal from extract_temporal_signal.

Returns

preprocessed_signal
np.ndarray | None
Detrended and normalized signal. Returns None if:
  • Signal is None
  • Signal has fewer than 20 samples
  • Signal is constant (std < 1e-10)
  • Detrended signal is constant (std < 1e-8)

Processing Steps

  1. Constant check: Reject if std < 1e-10
  2. Detrending: Remove linear trend using scipy.signal.detrend
  3. Constant check after detrend: Reject if std < 1e-8
  4. Normalization: Divide by standard deviation
# Remove linear trend
signal_detrended = sp_signal.detrend(signal)

# Normalize
signal_std = np.std(signal_detrended)
signal_normalized = signal_detrended / signal_std

apply_hamming_window

Applies Hamming window to reduce spectral leakage in FFT.
from src.evm.signal_analysis import apply_hamming_window

windowed = apply_hamming_window(signal)

Parameters

signal
np.ndarray
required
Temporal signal to window.

Returns

windowed_signal
np.ndarray
Signal multiplied by Hamming window.

Why Hamming Window?

The Hamming window reduces spectral leakage by smoothly tapering the signal at its edges. This improves FFT accuracy by minimizing artifacts from discontinuities.

calculate_power_spectrum

Computes power spectrum using FFT.
from src.evm.signal_analysis import calculate_power_spectrum

fft_freq, power_spectrum = calculate_power_spectrum(signal, fps=30)

Parameters

signal
np.ndarray
required
Temporal signal (typically preprocessed and windowed).
fps
float
required
Frames per second (sampling rate).

Returns

fft_freq
np.ndarray
Frequency values corresponding to FFT bins (in Hz).
power_spectrum
np.ndarray
Power spectrum (magnitude squared of FFT).

Implementation

# Compute FFT
fft_vals = np.fft.fft(signal)
fft_freq = np.fft.fftfreq(len(signal), d=1.0/fps)

# Power spectrum
power_spectrum = np.abs(fft_vals) ** 2

find_dominant_frequency

Finds the dominant frequency within a specified physiological range.
from src.evm.signal_analysis import find_dominant_frequency

frequency_bpm = find_dominant_frequency(
    fft_freq=fft_freq,
    power_spectrum=power_spectrum,
    lowcut_hz=0.8,
    highcut_hz=3.0,
    min_bpm=40,
    max_bpm=250
)

Parameters

fft_freq
np.ndarray
required
FFT frequency array from calculate_power_spectrum.
power_spectrum
np.ndarray
required
Power spectrum from calculate_power_spectrum.
lowcut_hz
float
required
Minimum frequency in Hz to consider.
highcut_hz
float
required
Maximum frequency in Hz to consider.
min_bpm
float
required
Minimum physiologically valid BPM/RPM.
max_bpm
float
required
Maximum physiologically valid BPM/RPM.

Returns

frequency_bpm
float | None
Dominant frequency in beats/breaths per minute. Returns None if:
  • No frequencies in specified range
  • Computed BPM outside [min_bpm, max_bpm] range

Algorithm

# Mask frequency range
freq_mask = (fft_freq >= lowcut_hz) & (fft_freq <= highcut_hz)
masked_power = power_spectrum[freq_mask]
masked_freqs = fft_freq[freq_mask]

# Find peak
dominant_freq_hz = masked_freqs[np.argmax(masked_power)]

# Convert to BPM
frequency_bpm = abs(dominant_freq_hz * 60.0)

# Validate range
if frequency_bpm < min_bpm or frequency_bpm > max_bpm:
    return None

calculate_frequency_fft

Main API: Complete pipeline to calculate dominant frequency using FFT.
from src.evm.signal_analysis import calculate_frequency_fft

heart_rate = calculate_frequency_fft(
    temporal_signal=hr_signal,
    fps=30,
    lowcut_hz=0.83,
    highcut_hz=3.0,
    min_bpm=40,
    max_bpm=250
)

Parameters

temporal_signal
np.ndarray
required
1D temporal signal from EVM processing.
fps
float
required
Frames per second (video sampling rate).
lowcut_hz
float
required
Low cutoff frequency in Hz.
  • Heart rate: 0.83 Hz (50 BPM)
  • Respiratory rate: 0.18 Hz (11 RPM)
highcut_hz
float
required
High cutoff frequency in Hz.
  • Heart rate: 3.0 Hz (180 BPM)
  • Respiratory rate: 0.5 Hz (30 RPM)
min_bpm
float
required
Minimum physiologically valid BPM/RPM.
  • Heart rate: 40 BPM
  • Respiratory rate: 8 RPM
max_bpm
float
required
Maximum physiologically valid BPM/RPM.
  • Heart rate: 250 BPM
  • Respiratory rate: 35 RPM

Returns

frequency_bpm
float | None
Detected frequency in BPM (heart rate) or RPM (respiratory rate). Returns None if:
  • Signal preprocessing fails
  • No dominant frequency detected
  • Frequency outside physiological range

Complete Pipeline

Implementation

def calculate_frequency_fft(temporal_signal, fps, lowcut_hz, highcut_hz, 
                           min_bpm, max_bpm):
    # Step 1: Preprocessing
    signal_processed = preprocess_signal(temporal_signal)
    if signal_processed is None:
        return None
    
    # Step 2: Hamming window
    signal_windowed = apply_hamming_window(signal_processed)
    
    # Step 3: FFT
    fft_freq, power_spectrum = calculate_power_spectrum(signal_windowed, fps)
    
    # Step 4: Find dominant frequency
    frequency_bpm = find_dominant_frequency(
        fft_freq, power_spectrum, lowcut_hz, highcut_hz, min_bpm, max_bpm
    )
    
    return frequency_bpm

Usage Example: Heart Rate Detection

Complete example extracting heart rate from EVM output:
import numpy as np
from src.evm.evm_core import EVMProcessor
from src.evm.signal_analysis import calculate_frequency_fft
from src.config import (
    FPS, LOW_HEART, HIGH_HEART, MIN_HEART_BPM, MAX_HEART_BPM
)

# Process video with EVM
video_frames = [...]  # 200 BGR frames
processor = EVMProcessor()
hr_signal, rr_signal = processor.process_dual_band(video_frames)

print(f"HR signal shape: {hr_signal.shape}")  # (200,)
print(f"HR signal range: {hr_signal.min():.2f} to {hr_signal.max():.2f}")

# Calculate heart rate using FFT
heart_rate = calculate_frequency_fft(
    temporal_signal=hr_signal,
    fps=FPS,                    # 30
    lowcut_hz=LOW_HEART,        # 0.83 Hz
    highcut_hz=HIGH_HEART,      # 3.0 Hz
    min_bpm=MIN_HEART_BPM,      # 40 BPM
    max_bpm=MAX_HEART_BPM       # 250 BPM
)

if heart_rate:
    print(f"Detected Heart Rate: {heart_rate:.1f} BPM")
else:
    print("Heart rate detection failed")

Usage Example: Respiratory Rate Detection

from src.config import (
    LOW_RESP, HIGH_RESP, MIN_RESP_BPM, MAX_RESP_BPM
)

# Use RR signal from dual-band processing
respiratory_rate = calculate_frequency_fft(
    temporal_signal=rr_signal,
    fps=FPS,                    # 30
    lowcut_hz=LOW_RESP,         # 0.18 Hz
    highcut_hz=HIGH_RESP,       # 0.5 Hz
    min_bpm=MIN_RESP_BPM,       # 8 RPM
    max_bpm=MAX_RESP_BPM        # 35 RPM
)

if respiratory_rate:
    print(f"Detected Respiratory Rate: {respiratory_rate:.1f} RPM")
else:
    print("Respiratory rate detection failed")

Frequency Band Parameters

Heart Rate

lowcut_hz = 0.83      # 50 BPM
highcut_hz = 3.0      # 180 BPM
min_bpm = 40
max_bpm = 250

Respiratory Rate

lowcut_hz = 0.18      # 11 RPM
highcut_hz = 0.5      # 30 RPM
min_bpm = 8
max_bpm = 35

Hz to BPM Conversion

The conversion from frequency (Hz) to beats/breaths per minute:
frequency_bpm = frequency_hz * 60.0

Examples

Frequency (Hz)Heart Rate (BPM)Respiratory Rate (RPM)
0.5 Hz30 BPM30 RPM
1.0 Hz60 BPM60 RPM (too high for breathing)
1.5 Hz90 BPM-
2.0 Hz120 BPM-

Signal Quality Checks

The pipeline includes multiple quality checks:
  1. Minimum length: Signal must have ≥20 samples
  2. Constant signal: Rejected if std < 1e-10
  3. Post-detrend check: Rejected if std < 1e-8 after detrending
  4. Frequency range: Must be within [lowcut_hz, highcut_hz]
  5. Physiological range: Must be within [min_bpm, max_bpm]

Performance Metrics

Computational Cost

For a 200-sample signal on Raspberry Pi 4:
  • Preprocessing: ~2-3ms
  • Hamming window: ~1ms
  • FFT: ~5-10ms
  • Peak finding: ~1-2ms
  • Total: ~10-20ms per signal

Accuracy

Benchmark results on validation dataset:
  • Mean Absolute Error (MAE): < 5 BPM
  • Root Mean Square Error (RMSE): < 7 BPM
  • Within ±5 BPM: > 70%
  • Within ±10 BPM: > 90%
  • Correlation with ground truth: > 0.85

Error Handling

Robust error handling at each stage:
  • None input: Returns None
  • Short signals: Returns None for < 20 samples
  • Constant signals: Returns None to avoid division by zero
  • No peaks found: Returns None
  • Out of range: Returns None if BPM invalid

Build docs developers (and LLMs) love