Authentication & Credentials
Never Log Credentials
Never log passwords, tokens, or API keys:import logging
_LOGGER = logging.getLogger(__name__)
# ❌ Wrong: Logs entire config including password
_LOGGER.debug("Config: %s", config)
# ✅ Correct: Redact sensitive data
_LOGGER.debug(
"Config: username=%s, host=%s",
config["username"],
config["host"]
)
# ✅ Also correct: Truncate tokens
token = config["token"]
_LOGGER.debug("Using token: %s...", token[:8])
Secure Credential Storage
Home Assistant encrypts config entry data automatically:from homeassistant.config_entries import ConfigEntry
from homeassistant.core import HomeAssistant
async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
"""Set up from config entry."""
# Data is encrypted at rest
username = entry.data["username"]
password = entry.data["password"] # Encrypted in storage
api_key = entry.data["api_key"] # Encrypted in storage
# Use credentials to create client
client = await create_client(username, password, api_key)
return True
OAuth2 Integration
Use OAuth2 for supported services:from homeassistant.helpers import config_entry_oauth2_flow
from homeassistant.helpers.aiohttp_client import async_get_clientsession
class MyOAuth2Implementation(config_entry_oauth2_flow.AbstractOAuth2Implementation):
"""OAuth2 implementation."""
@property
def name(self) -> str:
return "My Service"
@property
def domain(self) -> str:
return DOMAIN
async def async_generate_authorize_url(self, flow_id: str) -> str:
"""Generate authorization URL."""
return f"https://api.example.com/oauth/authorize?client_id={self.client_id}"
async def async_resolve_external_data(self, external_data: Any) -> dict:
"""Resolve authorization code to tokens."""
session = async_get_clientsession(self.hass)
async with session.post(
"https://api.example.com/oauth/token",
data={
"code": external_data["code"],
"client_id": self.client_id,
"client_secret": self.client_secret,
},
) as response:
return await response.json()
Token Refresh
Handle token expiration gracefully:from homeassistant.exceptions import ConfigEntryAuthFailed
class MyCoordinator(DataUpdateCoordinator):
"""Coordinator with token refresh."""
async def _async_update_data(self):
"""Fetch data from API."""
try:
return await self.client.fetch_data()
except TokenExpiredError:
# Try to refresh token
try:
await self.client.refresh_token()
return await self.client.fetch_data()
except RefreshFailedError as err:
# Refresh failed - need reauthentication
raise ConfigEntryAuthFailed(
"Token refresh failed, please reauthenticate"
) from err
Input Validation
Validate All User Input
Never trust user input:import voluptuous as vol
from homeassistant.helpers import config_validation as cv
# Use voluptuous for validation
DATA_SCHEMA = vol.Schema({
vol.Required("host"): cv.string,
vol.Required("port"): cv.port,
vol.Required("username"): cv.string,
vol.Required("password"): cv.string,
vol.Optional("ssl", default=True): cv.boolean,
})
# In config flow
class MyConfigFlow(config_entries.ConfigFlow, domain=DOMAIN):
async def async_step_user(self, user_input=None):
if user_input is not None:
# Validate input
try:
validated = DATA_SCHEMA(user_input)
except vol.Invalid as err:
return self.async_show_form(
step_id="user",
data_schema=DATA_SCHEMA,
errors={"base": "invalid_input"},
)
# Use validated data
return await self._async_create_entry(validated)
Sanitize File Paths
from pathlib import Path
# ❌ Wrong: Path traversal vulnerability
def load_file(filename: str) -> bytes:
path = f"/config/files/{filename}"
with open(path, "rb") as f:
return f.read()
# User could pass: filename="../../secrets.yaml"
# ✅ Correct: Validate path stays within allowed directory
def load_file(hass: HomeAssistant, filename: str) -> bytes:
# Ensure filename is safe
safe_name = Path(filename).name # Strips directory components
base_dir = Path(hass.config.path("files"))
file_path = base_dir / safe_name
# Verify path is within base directory
try:
file_path.resolve().relative_to(base_dir.resolve())
except ValueError:
raise ValueError("Invalid file path")
with open(file_path, "rb") as f:
return f.read()
Prevent Command Injection
import shlex
import subprocess
# ❌ Wrong: Command injection vulnerability
def run_command(user_input: str):
subprocess.run(f"ping {user_input}", shell=True) # Dangerous!
# User could pass: user_input="example.com; rm -rf /"
# ✅ Correct: Use argument list, no shell
def run_command(host: str):
# Validate input
if not host.replace(".", "").replace("-", "").isalnum():
raise ValueError("Invalid host")
# Use argument list, not shell
subprocess.run(["ping", "-c", "1", host], shell=False)
Network Security
Use HTTPS
Always use HTTPS for API calls:from homeassistant.helpers.aiohttp_client import async_get_clientsession
import ssl
# ✅ Correct: HTTPS with certificate validation
async def fetch_data(hass: HomeAssistant) -> dict:
session = async_get_clientsession(hass)
# Default: validates certificates
async with session.get("https://api.example.com/data") as response:
return await response.json()
# ❌ Wrong: HTTP or disabled verification
async def fetch_data(hass):
session = async_get_clientsession(hass, verify_ssl=False) # Insecure!
async with session.get("http://api.example.com/data") as response: # Insecure!
return await response.json()
Certificate Validation
For self-signed certificates, allow user configuration:import ssl
import aiohttp
from homeassistant.helpers.aiohttp_client import async_create_clientsession
async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
"""Set up with optional certificate validation."""
if entry.options.get("verify_ssl", True):
# Default: validate certificates
session = async_get_clientsession(hass)
else:
# User explicitly disabled verification
# Show warning
_LOGGER.warning(
"SSL certificate verification is disabled for %s. "
"This is insecure and should only be used for testing.",
entry.title,
)
connector = aiohttp.TCPConnector(ssl=False)
session = async_create_clientsession(hass, connector=connector)
client = MyAPIClient(session, entry.data["host"])
return True
Rate Limiting
Protect against abuse:import asyncio
from datetime import datetime, timedelta
from collections import deque
class RateLimiter:
"""Rate limiter for API calls."""
def __init__(self, max_calls: int, period: timedelta) -> None:
"""Initialize rate limiter."""
self.max_calls = max_calls
self.period = period
self.calls = deque()
self._lock = asyncio.Lock()
async def acquire(self) -> None:
"""Acquire permission to make a call."""
async with self._lock:
now = datetime.now()
cutoff = now - self.period
# Remove old calls
while self.calls and self.calls[0] < cutoff:
self.calls.popleft()
# Check if we're at limit
if len(self.calls) >= self.max_calls:
# Calculate wait time
oldest = self.calls[0]
wait = (oldest + self.period - now).total_seconds()
if wait > 0:
await asyncio.sleep(wait)
self.calls.append(now)
# Usage
rate_limiter = RateLimiter(max_calls=10, period=timedelta(minutes=1))
async def api_call():
await rate_limiter.acquire()
# Make API call
Code Security
Avoid eval() and exec()
Never use eval() or exec() with user input:# ❌ Wrong: Arbitrary code execution
def calculate(expression: str) -> float:
return eval(expression) # User could pass: "__import__('os').system('rm -rf /')"
# ✅ Correct: Use safe alternatives
import ast
import operator
SAFE_OPERATORS = {
ast.Add: operator.add,
ast.Sub: operator.sub,
ast.Mult: operator.mul,
ast.Div: operator.truediv,
}
def calculate(expression: str) -> float:
"""Safely evaluate mathematical expression."""
try:
tree = ast.parse(expression, mode='eval')
# Validate only safe operations
for node in ast.walk(tree):
if isinstance(node, ast.operator) and type(node) not in SAFE_OPERATORS:
raise ValueError("Unsafe operation")
return _eval_node(tree.body)
except Exception as err:
raise ValueError(f"Invalid expression: {err}") from err
Secure Deserialization
import json
import pickle
# ❌ Wrong: pickle is unsafe with untrusted data
def load_data(data: bytes):
return pickle.loads(data) # Can execute arbitrary code!
# ✅ Correct: Use safe formats like JSON
def load_data(data: str) -> dict:
return json.loads(data)
# For YAML, use safe_load
import yaml
def load_yaml(data: str) -> dict:
return yaml.safe_load(data) # Not yaml.load()!
Dependency Security
Keep dependencies updated:// manifest.json
{
"domain": "my_integration",
"requirements": [
"my-library==2.1.0" // Pin versions for security
]
}
# Check for known vulnerabilities
pip-audit
# Keep dependencies updated
python3 -m script.gen_requirements_all
Local Network Security
Discover Devices Safely
from homeassistant.components import zeroconf
async def async_setup(hass: HomeAssistant, config: ConfigType) -> bool:
"""Set up component with device discovery."""
async def async_discover_devices(service_info: zeroconf.ZeroconfServiceInfo):
"""Discover devices on local network."""
# Validate discovered device before connecting
if not _is_valid_device(service_info):
return
# Create config flow for discovered device
await hass.config_entries.flow.async_init(
DOMAIN,
context={"source": config_entries.SOURCE_ZEROCONF},
data=service_info,
)
# Register discovery handler
await zeroconf.async_register_discovery(
hass,
ZEROCONF_TYPE,
async_discover_devices,
)
return True
Validate Local Connections
import ipaddress
def is_safe_local_address(address: str) -> bool:
"""Check if address is a safe local address."""
try:
ip = ipaddress.ip_address(address)
# Allow local addresses
if ip.is_private or ip.is_loopback:
return True
# Reject public IPs without explicit user consent
_LOGGER.warning(
"Attempted connection to public IP %s, rejecting for security",
address
)
return False
except ValueError:
# Not an IP address, might be hostname
return True # Let DNS resolution handle it
Webhook Security
Validate Webhook Signatures
import hmac
import hashlib
from aiohttp.web import Request, Response
async def handle_webhook(
hass: HomeAssistant,
webhook_id: str,
request: Request,
) -> Response:
"""Handle webhook with signature validation."""
# Get signature from header
signature = request.headers.get("X-Signature")
if not signature:
return Response(text="Missing signature", status=401)
# Get webhook secret
entry = hass.config_entries.async_get_entry(webhook_id)
secret = entry.data["webhook_secret"]
# Read request body
body = await request.read()
# Compute expected signature
expected = hmac.new(
secret.encode(),
body,
hashlib.sha256
).hexdigest()
# Constant-time comparison
if not hmac.compare_digest(signature, expected):
_LOGGER.warning("Invalid webhook signature")
return Response(text="Invalid signature", status=401)
# Process webhook
return Response(text="OK")
Data Privacy
Respect User Privacy
# Don't collect unnecessary data
class MyEntity(Entity):
@property
def extra_state_attributes(self) -> dict:
return {
# ✅ OK: Device information
"model": self.device.model,
"firmware": self.device.firmware,
# ❌ Wrong: Personal information
# "user_email": self.device.user_email,
# "location_history": self.device.locations,
}
GDPR Compliance
Allow users to export and delete their data:async def async_remove_config_entry(hass: HomeAssistant, entry: ConfigEntry) -> None:
"""Remove config entry and delete all user data."""
coordinator = entry.runtime_data
# Delete data from cloud service
await coordinator.client.delete_account_data()
# Clean up local storage
store = hass.helpers.storage.Store(STORAGE_VERSION, STORAGE_KEY)
await store.async_remove()
_LOGGER.info("All user data has been deleted")
Security Checklist
Before releasing your integration:- No credentials in logs
- All credentials encrypted in config entries
- Input validation on all user data
- HTTPS with certificate validation
- No eval() or exec() with user input
- Safe deserialization (JSON, not pickle)
- Rate limiting on API calls
- Dependencies pinned and audited
- Webhook signature validation
- Minimal data collection
- Data deletion on config entry removal
- No hardcoded credentials
- SQL injection prevention (if using SQL)
- XSS prevention (if generating HTML)
Reporting Security Issues
If you discover a security vulnerability:- Do not open a public issue
- Email [email protected]
- Include:
- Description of vulnerability
- Steps to reproduce
- Potential impact
- Suggested fix (if any)