Performance Best Practices
Performance is critical in Home Assistant. Poorly optimized integrations can slow down the entire system, increase resource usage, and degrade user experience.

Async Programming

Use Async/Await

Home Assistant is built on asyncio. Always use async functions:
# ✅ Correct: Async function
async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
    """Set up from a config entry."""
    client = await create_async_client(entry.data)
    await client.connect()
    return True

# ❌ Wrong: Blocking function
def setup_entry(hass, entry):
    client = create_blocking_client(entry.data)
    client.connect()  # Blocks the event loop!
    return True

Avoid Blocking I/O

Never use blocking I/O in async functions:
import asyncio
from pathlib import Path

# ❌ Wrong: Blocking file I/O
async def load_config(path: str) -> dict:
    with open(path) as f:  # Blocks event loop!
        return json.load(f)

# ✅ Correct: Use executor for blocking I/O
async def load_config(hass: HomeAssistant, path: str) -> dict:
    """Load a JSON config file without blocking the event loop.

    The blocking file read runs in Home Assistant's executor thread
    pool, so the asyncio loop stays responsive while the disk I/O runs.
    """
    # NOTE(review): this snippet assumes `json` is imported at module
    # level — the import is not shown here.
    def _load():
        # Runs in a worker thread, so blocking open/read is safe here.
        with open(path) as f:
            return json.load(f)
    
    return await hass.async_add_executor_job(_load)

# ✅ Better: Use async file I/O
import aiofiles

async def load_config(path: str) -> dict:
    async with aiofiles.open(path) as f:
        content = await f.read()
        return json.loads(content)

Use aiohttp for HTTP Requests

Always use aiohttp instead of requests:
from homeassistant.helpers.aiohttp_client import async_get_clientsession

# ✅ Correct: Async HTTP with aiohttp
async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
    """Set up the integration from a config entry.

    Uses Home Assistant's shared aiohttp ClientSession via
    async_get_clientsession so HTTP connections are pooled instead of
    opening a new session per request.
    """
    session = async_get_clientsession(hass)
    
    # Fix: the URL had an f-string prefix with no placeholders (ruff
    # F541) — a plain string literal is correct.
    async with session.get("https://api.example.com/status") as response:
        data = await response.json()
    
    return True

# ❌ Wrong: Blocking HTTP with requests
import requests

async def async_setup_entry(hass, entry):
    response = requests.get("https://api.example.com/status")  # Blocks!
    data = response.json()
    return True

Timeouts

Always use timeouts for network operations:
import asyncio
from aiohttp import ClientTimeout

# ✅ Correct: With timeout
async def fetch_data(session):
    try:
        async with asyncio.timeout(30):
            async with session.get(url) as response:
                return await response.json()
    except asyncio.TimeoutError:
        _LOGGER.warning("Request timed out")
        raise

# ✅ Also correct: aiohttp timeout
async def fetch_data(session):
    timeout = ClientTimeout(total=30)
    async with session.get(url, timeout=timeout) as response:
        return await response.json()

Data Updates

Use DataUpdateCoordinator

Coordinators prevent redundant API calls:
from homeassistant.helpers.update_coordinator import (
    DataUpdateCoordinator,
    UpdateFailed,
)
from datetime import timedelta

class MyCoordinator(DataUpdateCoordinator):
    """Coordinator that fetches data for all entities with one API call.

    Entities subscribe to this coordinator instead of polling the API
    individually, which prevents redundant requests.
    """
    
    def __init__(self, hass: HomeAssistant, client) -> None:
        """Initialize the coordinator with a 30-second poll interval."""
        super().__init__(
            hass,
            _LOGGER,
            name="My Integration",
            # Update every 30 seconds
            update_interval=timedelta(seconds=30),
        )
        # API client shared by all entities of this integration.
        self.client = client
    
    async def _async_update_data(self):
        """Fetch data from API.

        Raises:
            UpdateFailed: if the API call fails, marking all dependent
                entities unavailable.
        """
        try:
            # Single API call for all entities
            return await self.client.fetch_all_data()
        # NOTE(review): catching bare Exception is broad — prefer the
        # client library's specific error types where known.
        except Exception as err:
            raise UpdateFailed(f"Error fetching data: {err}") from err

Appropriate Polling Intervals

Choose reasonable update intervals:
# ✅ Good: Reasonable intervals based on device type

# Fast-changing data (power consumption)
update_interval=timedelta(seconds=30)

# Moderate data (temperature)
update_interval=timedelta(minutes=5)

# Slow-changing data (daily statistics)
update_interval=timedelta(hours=1)

# ❌ Bad: Too frequent
update_interval=timedelta(seconds=1)  # Wastes resources!

Parallel Updates Protection

Prevent concurrent entity updates:
from homeassistant.helpers.entity import Entity

class MyEntity(Entity):
    """Entity with update protection."""
    
    # NOTE(review): `_attr_parallel_updates` is not a standard Entity
    # attribute — Home Assistant limits concurrency via the module-level
    # PARALLEL_UPDATES constant in the platform file. Confirm this class
    # attribute is actually honored before relying on it.
    # Limit to 1 concurrent update
    _attr_parallel_updates = 1
    
    async def async_turn_on(self, **kwargs):
        """Turn on device."""
        # Send the command, then ask the coordinator to refresh so all
        # entities observe the new device state promptly.
        await self.device.turn_on()
        await self.coordinator.async_request_refresh()
Concurrency limiting is normally configured with the module-level `PARALLEL_UPDATES` constant in each platform file:
# In sensor.py, light.py, etc.
PARALLEL_UPDATES = 1

Efficient Data Structures

Use Runtime Data

Avoid storing data in hass.data:
# ✅ Correct: Type-safe runtime data
from homeassistant.config_entries import ConfigEntry

type MyConfigEntry = ConfigEntry[MyCoordinator]

async def async_setup_entry(hass: HomeAssistant, entry: MyConfigEntry) -> bool:
    coordinator = MyCoordinator(hass)
    await coordinator.async_config_entry_first_refresh()
    
    # Store in runtime_data
    entry.runtime_data = coordinator
    return True

# Access in platform
async def async_setup_entry(
    hass: HomeAssistant,
    entry: MyConfigEntry,
    async_add_entities,
):
    coordinator = entry.runtime_data
    async_add_entities([MyEntity(coordinator)])

# ❌ Wrong: Using hass.data
async def async_setup_entry(hass, entry):
    hass.data.setdefault(DOMAIN, {})
    hass.data[DOMAIN][entry.entry_id] = coordinator

Cache Expensive Operations

from functools import lru_cache
from homeassistant.util import slugify

# Cache pure functions
@lru_cache(maxsize=128)
def get_entity_id(domain: str, name: str) -> str:
    """Generate entity ID."""
    return f"{domain}.{slugify(name)}"

# Use cached_property for expensive properties
from functools import cached_property

class MyDevice:
    @cached_property
    def capabilities(self) -> dict:
        """Get device capabilities (computed once)."""
        return self._compute_capabilities()

Memory Management

Avoid Memory Leaks

Clean up resources properly:
class MyCoordinator(DataUpdateCoordinator):
    """Coordinator that releases its resources on shutdown.

    Tracks listener-removal callbacks and closes the API client so no
    references (and no open connections) leak after unload.
    """
    
    def __init__(self, hass: HomeAssistant, client) -> None:
        """Initialize the coordinator and its cleanup bookkeeping."""
        super().__init__(hass, _LOGGER, name=DOMAIN)
        self.client = client
        # Unsubscribe callbacks collected while the coordinator runs;
        # each is invoked during shutdown to detach the listener.
        self._listeners = []
    
    async def async_shutdown(self):
        """Shutdown coordinator."""
        # Remove listeners
        for unlisten in self._listeners:
            unlisten()
        self._listeners.clear()
        
        # Close client connection
        await self.client.close()

async def async_unload_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
    """Unload config entry.

    Shuts the coordinator down (listeners removed, client closed)
    before unloading the entity platforms, so nothing keeps references
    to the entry after it is removed.
    """
    coordinator = entry.runtime_data
    
    # Cleanup before unloading
    await coordinator.async_shutdown()
    
    # Returns True only if every platform unloaded successfully.
    return await hass.config_entries.async_unload_platforms(entry, PLATFORMS)

Limit Data Storage

from collections import deque

class MyEntity(Entity):
    """Entity with limited history."""
    
    def __init__(self) -> None:
        # Store only last 100 values, not unlimited
        self._history = deque(maxlen=100)
    
    async def async_update(self):
        value = await self.coordinator.get_value()
        self._history.append(value)

Database Performance

Exclude Unnecessary Data

Don’t store rapidly changing data:
from homeassistant.helpers.entity import Entity

class MySensor(Entity):
    """Sensor that updates frequently."""
    
    # Disable Home Assistant's polling loop; state is pushed instead.
    # NOTE(review): should_poll controls polling, not database writes —
    # excluding attributes from the recorder is configured separately.
    _attr_should_poll = False
    
    # For diagnostic data that changes often
    @property
    def extra_state_attributes(self) -> dict:
        """Return extra attributes; keep fast-changing values out of them
        where possible, since every change is written to the database."""
        return {
            # This changes every second - don't record
            "last_update": self._last_update.isoformat(),
            # But this is stable - OK to record
            "device_model": self._device_model,
        }

Use State Classes

from homeassistant.components.sensor import (
    SensorEntity,
    SensorStateClass,
)

class MySensor(SensorEntity):
    """Sensor with state class for statistics."""
    
    _attr_state_class = SensorStateClass.MEASUREMENT
    
    # Home Assistant will automatically calculate statistics
    # instead of storing every single state

Network Efficiency

Batch API Requests

# ❌ Wrong: Multiple API calls
async def async_update_entities(entities):
    for entity in entities:
        data = await api.get_device_state(entity.device_id)
        entity.update_from_data(data)

# ✅ Correct: Single batch API call
async def async_update_entities(entities):
    device_ids = [e.device_id for e in entities]
    all_data = await api.get_multiple_device_states(device_ids)
    
    for entity in entities:
        data = all_data[entity.device_id]
        entity.update_from_data(data)

Connection Pooling

from homeassistant.helpers.aiohttp_client import async_get_clientsession

# ✅ Correct: Reuse session (connection pooling)
async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
    # This reuses connections automatically
    session = async_get_clientsession(hass)
    client = MyAPIClient(session)
    return True

# ❌ Wrong: New session for each request
import aiohttp

async def fetch_data():
    async with aiohttp.ClientSession() as session:  # New session each time!
        async with session.get(url) as response:
            return await response.json()

Compression

from homeassistant.helpers.aiohttp_client import async_get_clientsession

async def fetch_large_data(hass: HomeAssistant) -> bytes:
    """Fetch data with compression, returning the decompressed bytes."""
    # Shared session → pooled connections.
    session = async_get_clientsession(hass)
    
    # NOTE(review): aiohttp sends an Accept-Encoding header by default,
    # so this explicit header is likely redundant — confirm before
    # relying on it.
    headers = {"Accept-Encoding": "gzip, deflate"}
    
    # `url` is assumed to be defined at module level in this snippet.
    async with session.get(url, headers=headers) as response:
        # aiohttp automatically decompresses
        return await response.read()

CPU Optimization

Offload Heavy Computation

import numpy as np

async def process_image(hass: HomeAssistant, image_data: bytes) -> np.ndarray:
    """Process image in executor to avoid blocking."""
    
    def _process():
        # CPU-intensive image processing
        import cv2
        img = cv2.imdecode(np.frombuffer(image_data, np.uint8), cv2.IMREAD_COLOR)
        return cv2.resize(img, (640, 480))
    
    return await hass.async_add_executor_job(_process)

Lazy Loading

class MyIntegration:
    """Integration that defers importing its heavy dependency."""

    def __init__(self) -> None:
        # Cached module reference; stays None until first access.
        self._heavy_library = None

    @property
    def heavy_library(self):
        """Import the heavy library on first use and cache the module."""
        if self._heavy_library is not None:
            return self._heavy_library
        # Import is deferred until a caller actually needs the library,
        # keeping integration startup fast.
        import heavy_library
        self._heavy_library = heavy_library
        return self._heavy_library

Benchmarking

Measure Performance

import time
import logging

_LOGGER = logging.getLogger(__name__)

async def async_update(self):
    """Update with performance measurement.

    Wraps the real update in a monotonic-clock timer and logs a warning
    when it exceeds one second, so slow updates surface in the log even
    if the update itself raises (the finally block always runs).
    """
    start = time.monotonic()
    
    try:
        await self._async_update_internal()
    finally:
        duration = time.monotonic() - start
        if duration > 1.0:  # Warn if update takes > 1 second
            _LOGGER.warning(
                "Update took %.2fs, consider optimizing",
                duration
            )

Profile Code

# Use cProfile for performance analysis
python -m cProfile -o profile.stats script/benchmark/core.py

# Analyze results
import pstats
stats = pstats.Stats('profile.stats')
stats.sort_stats('cumulative')
stats.print_stats(20)

Common Performance Issues

Issue: Blocking the Event Loop

# ❌ Problem
async def async_update(self):
    time.sleep(5)  # Blocks entire Home Assistant!
    
# ✅ Solution
async def async_update(self):
    await asyncio.sleep(5)  # Allows other tasks to run

Issue: Too Frequent Updates

# ❌ Problem: Updates every second
update_interval = timedelta(seconds=1)

# ✅ Solution: Update only when needed
class MyEntity(Entity):
    _attr_should_poll = False  # Don't poll
    
    async def async_added_to_hass(self):
        # Subscribe to device updates instead
        self.async_on_remove(
            self.coordinator.async_add_listener(self.async_write_ha_state)
        )

Issue: Unbounded Memory Growth

# ❌ Problem: Unlimited list growth
self._all_events = []

def add_event(self, event):
    self._all_events.append(event)  # Grows forever!

# ✅ Solution: Use bounded collections
from collections import deque

self._recent_events = deque(maxlen=100)  # Keep only last 100

def add_event(self, event):
    self._recent_events.append(event)

Resources

See the Home Assistant Developer Documentation for further performance guidance.