Async Programming
Use Async/Await
Home Assistant is built on asyncio. Always use async functions:
# ✅ Correct: Async function
async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
    """Set up from a config entry.

    Client creation and the connection are both awaited, so the event loop
    stays free to run other tasks while the network work is in flight.

    Returns:
        True once the client has connected.
    """
    client = await create_async_client(entry.data)
    await client.connect()
    return True
# ❌ Wrong: Blocking function
def setup_entry(hass, entry):
    """Anti-pattern: synchronous setup that stalls the event loop.

    Kept deliberately wrong to illustrate what NOT to do.
    """
    client = create_blocking_client(entry.data)
    client.connect()  # Blocks the event loop!
    return True
Avoid Blocking I/O
Never use blocking I/O in async functions:
import asyncio
import json
from pathlib import Path
# ❌ Wrong: Blocking file I/O
async def load_config(path: str) -> dict:
    """Anti-pattern: read and parse a JSON file directly inside a coroutine.

    Kept deliberately wrong: ``open`` and ``json.load`` are synchronous, so
    the event loop cannot run anything else until the read finishes.
    """
    with open(path) as f:  # Blocks event loop!
        return json.load(f)
# ✅ Correct: Use executor for blocking I/O
async def load_config(hass: HomeAssistant, path: str) -> dict:
    """Load a JSON config file without blocking the event loop.

    The synchronous read is wrapped in a local helper and handed to
    ``hass.async_add_executor_job``; the coroutine only awaits the result.

    Args:
        hass: Home Assistant instance providing the executor.
        path: Location of the JSON file.

    Returns:
        The parsed JSON document.
    """

    def _load() -> dict:
        # JSON is UTF-8 by specification; don't depend on the locale default.
        with open(path, encoding="utf-8") as f:
            return json.load(f)

    return await hass.async_add_executor_job(_load)
# ✅ Better: Use async file I/O
import aiofiles
async def load_config(path: str) -> dict:
    """Load a JSON config file using aiofiles.

    The file is opened and read through awaitable calls; only the in-memory
    ``json.loads`` parse runs synchronously.

    Returns:
        The parsed JSON document.
    """
    async with aiofiles.open(path) as f:
        content = await f.read()
    return json.loads(content)
Use aiohttp for HTTP Requests
Always use aiohttp instead of requests:
from homeassistant.helpers.aiohttp_client import async_get_clientsession
# ✅ Correct: Async HTTP with aiohttp
async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
    """Set up the entry, fetching status over Home Assistant's aiohttp session.

    Returns:
        True after the status response has been read.
    """
    session = async_get_clientsession(hass)
    # Plain string: the URL has no placeholders, so no f-string is needed.
    async with session.get("https://api.example.com/status") as response:
        # Illustrative only: a real integration would store or validate this.
        data = await response.json()
    return True
# ❌ Wrong: Blocking HTTP with requests
import requests
async def async_setup_entry(hass, entry):
    """Anti-pattern: synchronous ``requests`` call inside a coroutine.

    Kept deliberately wrong to illustrate what NOT to do.
    """
    response = requests.get("https://api.example.com/status")  # Blocks!
    data = response.json()
    return True
Timeouts
Always use timeouts for network operations:
import asyncio
from aiohttp import ClientTimeout
# ✅ Correct: With timeout
async def fetch_data(session):
    """Fetch JSON from ``url``, giving up after 30 seconds.

    Args:
        session: Client session used to issue the GET request.

    Returns:
        The decoded JSON body.

    Raises:
        TimeoutError: If the request does not finish within 30 seconds; a
            warning is logged and the exception is re-raised.
    """
    try:
        async with asyncio.timeout(30):
            async with session.get(url) as response:
                return await response.json()
    # asyncio.timeout() needs Python 3.11+, where asyncio.TimeoutError is an
    # alias of the builtin TimeoutError, so catch the builtin directly.
    except TimeoutError:
        _LOGGER.warning("Request timed out")
        raise
# ✅ Also correct: aiohttp timeout
async def fetch_data(session):
    """Fetch JSON from ``url`` using a 30-second total aiohttp timeout.

    Args:
        session: Client session used to issue the GET request.

    Returns:
        The decoded JSON body.
    """
    timeout = ClientTimeout(total=30)
    async with session.get(url, timeout=timeout) as response:
        return await response.json()
Data Updates
Use DataUpdateCoordinator
Coordinators prevent redundant API calls:
from homeassistant.helpers.update_coordinator import (
DataUpdateCoordinator,
UpdateFailed,
)
from datetime import timedelta
class MyCoordinator(DataUpdateCoordinator):
    """Coordinator for fetching data."""

    def __init__(self, hass: HomeAssistant, client) -> None:
        """Initialize coordinator.

        Args:
            hass: Home Assistant instance.
            client: API client; must expose an awaitable ``fetch_all_data()``.
        """
        super().__init__(
            hass,
            _LOGGER,
            name="My Integration",
            # Update every 30 seconds
            update_interval=timedelta(seconds=30),
        )
        self.client = client

    async def _async_update_data(self):
        """Fetch data from API.

        Raises:
            UpdateFailed: If the client call raises; the original exception
                is chained as the cause.
        """
        try:
            # Single API call for all entities
            return await self.client.fetch_all_data()
        # Catch-all because the client's exception types are not visible
        # here; narrow this to the client's own error classes where known.
        except Exception as err:
            raise UpdateFailed(f"Error fetching data: {err}") from err
Appropriate Polling Intervals
Choose reasonable update intervals:
# ✅ Good: Reasonable intervals based on device type
# The lines below are alternatives: pick the one that matches how quickly
# the device's data actually changes.
# Fast-changing data (power consumption)
update_interval=timedelta(seconds=30)
# Moderate data (temperature)
update_interval=timedelta(minutes=5)
# Slow-changing data (daily statistics)
update_interval=timedelta(hours=1)
# ❌ Bad: Too frequent
update_interval=timedelta(seconds=1) # Wastes resources!
Parallel Updates Protection
Prevent concurrent entity updates:
from homeassistant.helpers.entity import Entity
class MyEntity(Entity):
    """Entity with update protection."""

    # Limit to 1 concurrent update
    # NOTE(review): the documented Home Assistant mechanism is the
    # module-level PARALLEL_UPDATES constant in the platform file; confirm
    # that this class attribute is honoured before relying on it.
    _attr_parallel_updates = 1

    async def async_turn_on(self, **kwargs):
        """Turn on device."""
        await self.device.turn_on()
        # Ask the coordinator for a refresh after the command.
        await self.coordinator.async_request_refresh()
PARALLEL_UPDATES at the platform level:
# In sensor.py, light.py, etc.
# Module-level constant, defined once per platform file.
PARALLEL_UPDATES = 1
Efficient Data Structures
Use Runtime Data
Avoid storing data in hass.data; use the config entry's runtime_data instead:
# ✅ Correct: Type-safe runtime data
from homeassistant.config_entries import ConfigEntry
type MyConfigEntry = ConfigEntry[MyCoordinator]
async def async_setup_entry(hass: HomeAssistant, entry: MyConfigEntry) -> bool:
coordinator = MyCoordinator(hass)
await coordinator.async_config_entry_first_refresh()
# Store in runtime_data
entry.runtime_data = coordinator
return True
# Access in platform
async def async_setup_entry(
    hass: HomeAssistant,
    entry: MyConfigEntry,
    async_add_entities,
) -> None:
    """Set up the platform's entities from the coordinator on the entry.

    Args:
        hass: Home Assistant instance.
        entry: Config entry whose ``runtime_data`` holds the coordinator.
        async_add_entities: Callback used to register the new entities.
    """
    coordinator = entry.runtime_data
    async_add_entities([MyEntity(coordinator)])
# ❌ Wrong: Using hass.data
async def async_setup_entry(hass, entry):
    """Anti-pattern: keep per-entry state in the untyped ``hass.data`` dict.

    Kept deliberately wrong to illustrate what NOT to do.
    """
    hass.data.setdefault(DOMAIN, {})
    hass.data[DOMAIN][entry.entry_id] = coordinator
Cache Expensive Operations
from functools import lru_cache
from homeassistant.util import slugify
# Cache pure functions
@lru_cache(maxsize=128)
def get_entity_id(domain: str, name: str) -> str:
    """Generate entity ID.

    Results are memoised for up to 128 distinct ``(domain, name)`` pairs.

    Returns:
        ``"<domain>.<slugified name>"``.
    """
    return f"{domain}.{slugify(name)}"
# Use cached_property for expensive properties
from functools import cached_property
class MyDevice:
    """Device whose capabilities are computed lazily, once per instance."""

    @cached_property
    def capabilities(self) -> dict:
        """Get device capabilities (computed once).

        The first access calls ``_compute_capabilities()``; later accesses
        on the same instance return the stored result.
        """
        return self._compute_capabilities()
Memory Management
Avoid Memory Leaks
Clean up resources properly:class MyCoordinator(DataUpdateCoordinator):
"""Coordinator with proper cleanup."""
def __init__(self, hass: HomeAssistant, client) -> None:
super().__init__(hass, _LOGGER, name=DOMAIN)
self.client = client
self._listeners = []
async def async_shutdown(self):
"""Shutdown coordinator."""
# Remove listeners
for unlisten in self._listeners:
unlisten()
self._listeners.clear()
# Close client connection
await self.client.close()
async def async_unload_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
    """Unload config entry.

    Platforms are unloaded first. The coordinator is shut down only when
    that succeeds, so a failed unload does not leave a still-loaded entry
    with a dead coordinator.

    Returns:
        The result of unloading the platforms.
    """
    unload_ok = await hass.config_entries.async_unload_platforms(entry, PLATFORMS)
    if unload_ok:
        await entry.runtime_data.async_shutdown()
    return unload_ok
Limit Data Storage
from collections import deque
class MyEntity(Entity):
    """Entity with limited history."""

    def __init__(self) -> None:
        """Create the bounded history buffer."""
        # Store only last 100 values, not unlimited
        self._history = deque(maxlen=100)

    async def async_update(self):
        """Fetch the current value and append it to the bounded history."""
        value = await self.coordinator.get_value()
        # With maxlen set, appending to a full deque drops the oldest item.
        self._history.append(value)
Database Performance
Exclude Unnecessary Data
Don’t store rapidly changing data:
from homeassistant.helpers.entity import Entity
class MySensor(Entity):
    """Sensor that updates frequently."""

    # State is pushed by the integration rather than polled. This setting
    # does not by itself control what the recorder stores.
    _attr_should_poll = False

    # Don't record high-frequency attributes to database: attributes named
    # here are excluded from recorder history.
    _unrecorded_attributes = frozenset({"last_update"})

    # For diagnostic data that changes often
    @property
    def extra_state_attributes(self) -> dict:
        """Return extra attributes; ``last_update`` is exposed but unrecorded."""
        return {
            # This changes every second - don't record
            "last_update": self._last_update.isoformat(),
            # But this is stable - OK to record
            "device_model": self._device_model,
        }
Use State Classes
from homeassistant.components.sensor import (
SensorEntity,
SensorStateClass,
)
class MySensor(SensorEntity):
    """Sensor with state class for statistics."""

    _attr_state_class = SensorStateClass.MEASUREMENT
    # With a state class set, Home Assistant automatically compiles
    # long-term statistics for this sensor, so long-range history does not
    # depend on keeping every single state.
Network Efficiency
Batch API Requests
# ❌ Wrong: Multiple API calls
async def async_update_entities(entities):
    """Anti-pattern: one awaited API round-trip per entity.

    Kept deliberately wrong to illustrate what NOT to do.
    """
    for entity in entities:
        data = await api.get_device_state(entity.device_id)
        entity.update_from_data(data)
# ✅ Correct: Single batch API call
async def async_update_entities(entities):
    """Update all entities from a single batched API call.

    Args:
        entities: Objects exposing ``device_id`` and ``update_from_data``.
    """
    device_ids = [e.device_id for e in entities]
    # One request for every device instead of one request per device.
    all_data = await api.get_multiple_device_states(device_ids)
    for entity in entities:
        data = all_data[entity.device_id]
        entity.update_from_data(data)
Connection Pooling
from homeassistant.helpers.aiohttp_client import async_get_clientsession
# ✅ Correct: Reuse session (connection pooling)
async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
    """Set up the entry with an API client built on the shared session.

    Returns:
        True once the client has been created and stored on the entry.
    """
    # This reuses connections automatically
    session = async_get_clientsession(hass)
    client = MyAPIClient(session)
    # Keep the client on the entry so it is not discarded after setup.
    entry.runtime_data = client
    return True
# ❌ Wrong: New session for each request
import aiohttp
async def fetch_data():
    """Anti-pattern: open a brand-new ClientSession for a single request.

    Kept deliberately wrong to illustrate what NOT to do.
    """
    async with aiohttp.ClientSession() as session:  # New session each time!
        async with session.get(url) as response:
            return await response.json()
Compression
from homeassistant.helpers.aiohttp_client import async_get_clientsession
async def fetch_large_data(hass: HomeAssistant) -> bytes:
    """Fetch data with compression.

    Returns:
        The response body as bytes.
    """
    session = async_get_clientsession(hass)
    # Advertise the compressed encodings this client accepts.
    headers = {"Accept-Encoding": "gzip, deflate"}
    async with session.get(url, headers=headers) as response:
        # aiohttp automatically decompresses
        return await response.read()
CPU Optimization
Offload Heavy Computation
import numpy as np
async def process_image(hass: HomeAssistant, image_data: bytes) -> np.ndarray:
    """Process image in executor to avoid blocking.

    Args:
        hass: Home Assistant instance providing the executor.
        image_data: Encoded image bytes.

    Returns:
        The decoded image resized to 640x480.

    Raises:
        ValueError: If ``image_data`` cannot be decoded as an image.
    """

    def _process() -> np.ndarray:
        # CPU-intensive image processing
        import cv2

        img = cv2.imdecode(np.frombuffer(image_data, np.uint8), cv2.IMREAD_COLOR)
        # imdecode signals failure by returning None; fail with a clear
        # error instead of letting resize() raise on a None input.
        if img is None:
            raise ValueError("image_data could not be decoded")
        return cv2.resize(img, (640, 480))

    return await hass.async_add_executor_job(_process)
Lazy Loading
class MyIntegration:
    """Integration with lazy loading."""

    def __init__(self) -> None:
        """Start with the heavy dependency not yet imported."""
        self._heavy_library = None

    @property
    def heavy_library(self):
        """Lazy load heavy library.

        The import runs on first access and the module is kept on the
        instance, so later accesses return it directly.
        """
        if self._heavy_library is None:
            # Only import when actually needed
            import heavy_library

            self._heavy_library = heavy_library
        return self._heavy_library
Benchmarking
Measure Performance
import time
import logging
_LOGGER = logging.getLogger(__name__)
async def async_update(self):
    """Update with performance measurement.

    Awaits ``self._async_update_internal()`` and logs a warning when the
    call takes longer than one second. The timing check sits in ``finally``
    so it also runs when the update raises.
    """
    start = time.monotonic()
    try:
        await self._async_update_internal()
    finally:
        duration = time.monotonic() - start
        if duration > 1.0:  # Warn if update takes > 1 second
            _LOGGER.warning(
                "Update took %.2fs, consider optimizing",
                duration,
            )
Profile Code
# Use cProfile for performance analysis (the next line is a shell command, not Python)
python -m cProfile -o profile.stats script/benchmark/core.py
# Analyze results (Python; reads the profile.stats file named below)
import pstats
stats = pstats.Stats('profile.stats')
# Order by cumulative time, i.e. time in a function including its callees.
stats.sort_stats('cumulative')
# Print only the first 20 entries.
stats.print_stats(20)
Common Performance Issues
Issue: Blocking the Event Loop
# ❌ Problem
async def async_update(self):
    """Anti-pattern: synchronous sleep inside a coroutine.

    Kept deliberately wrong to illustrate what NOT to do.
    """
    time.sleep(5)  # Blocks entire Home Assistant!
# ✅ Solution
async def async_update(self):
    """Wait five seconds while yielding control to the event loop."""
    await asyncio.sleep(5)  # Allows other tasks to run
Issue: Too Frequent Updates
# ❌ Problem: Updates every second
# A one-second interval polls far more often than is usually needed.
update_interval = timedelta(seconds=1)
# ✅ Solution: Update only when needed
class MyEntity(Entity):
    """Entity that is refreshed by coordinator updates instead of polling."""

    _attr_should_poll = False  # Don't poll

    async def async_added_to_hass(self):
        """Subscribe to coordinator updates once the entity is added."""
        # Let the base class run its own added-to-hass handling first.
        await super().async_added_to_hass()
        # Subscribe to device updates instead
        # async_on_remove receives the value returned by async_add_listener.
        self.async_on_remove(
            self.coordinator.async_add_listener(self.async_write_ha_state)
        )
Issue: Unbounded Memory Growth
# ❌ Problem: Unlimited list growth
self._all_events = []
def add_event(self, event):
    """Anti-pattern: append to a list that is never trimmed.

    Kept deliberately wrong to illustrate what NOT to do.
    """
    self._all_events.append(event)  # Grows forever!
# ✅ Solution: Use bounded collections
from collections import deque
self._recent_events = deque(maxlen=100) # Keep only last 100
def add_event(self, event):
    """Record an event in ``self._recent_events``.

    Memory stays bounded only when that container is a deque created with
    ``maxlen``; appending to a full one drops the oldest entry.
    """
    self._recent_events.append(event)