Local parquet cache for fast, gap-aware historical data retrieval
The caching system stores indicator time-series data as parquet files, fetching only missing date ranges on subsequent requests. Historical electricity data is immutable once published, making aggressive caching safe and enabled by default.
from esios import ESIOSClient

with ESIOSClient(cache=True) as client:
    handle = client.indicators.get(600)
    # First call: fetches from API, writes to cache
    df = handle.historical("2025-01-01", "2025-01-07")
    # Second call: reads from cache (instant)
    df = handle.historical("2025-01-01", "2025-01-07")
Caching is enabled by default. Disable it with cache=False.
# Cache stores columns as geo_id (internal format)
#                          3      8741   8826
# datetime
# 2025-01-01 00:00:00    45.23  42.15  48.90
# 2025-01-01 01:00:00    43.12  41.05  46.23
# 2025-01-01 02:00:00      NaN    NaN    NaN   ← Gap in cache

# User sees columns as geo_name (output format)
#                       España  Portugal  Francia
# datetime
# 2025-01-01 00:00:00    45.23     42.15    48.90
# 2025-01-01 01:00:00    43.12     41.05    46.23
From src/esios/managers/indicators.py:286-316:
def _finalize(self, df: pd.DataFrame) -> pd.DataFrame: """Prepare DataFrame for user-facing output. Cache stores columns as str(geo_id). This method renames them to human-readable geo_names at the very end, just before returning to the caller. Single-value/single-geo indicators get the indicator ID. """ if df.empty: return df if len(df.columns) == 1: col = df.columns[0] if col == "value": df = df.rename(columns={"value": str(self.id)}) return df # Rename str(geo_id) columns → geo_name geo_map = self._build_geo_map() # str(geo_id) → geo_name rename = {col: geo_map[col] for col in df.columns if col in geo_map} if rename: df = df.rename(columns=rename)
Columns are stored as geo_id strings in cache for stability. They’re renamed to geo_name only when returned to the user.
The cache intelligently detects missing date ranges and fetches only what’s needed:
with ESIOSClient() as client:
    handle = client.indicators.get(600)
    # Cache: Jan 1-7
    df = handle.historical("2025-01-01", "2025-01-07")
    # Request: Jan 1-15 → fetches only Jan 8-15 from API
    df = handle.historical("2025-01-01", "2025-01-15")
with ESIOSClient() as client:
    handle = client.indicators.get(600)
    # Cache Spain data for Jan 1-7
    df = handle.historical("2025-01-01", "2025-01-07", geo_ids=[3])
    # Request Portugal → fetches full range (cache has no Portugal column)
    df = handle.historical("2025-01-01", "2025-01-07", geo_ids=[8741])
    # Request Spain again → uses cache (has Spain column)
    df = handle.historical("2025-01-01", "2025-01-07", geo_ids=[3])
From src/esios/cache.py:266-276:
# If specific columns requested, check only those (per-geo gap detection)if columns: missing = [c for c in columns if c not in cached_df.columns] if missing: return [DateRange(start, end)] mask = cached_df[columns].notna().all(axis=1) effective_df = cached_df[mask] if effective_df.empty: return [DateRange(start, end)]else: effective_df = cached_df
Filter to specific geographies with geo_ids to optimize cache usage. Requesting all geographies every time keeps the cache complete.
Data within cache_recent_ttl hours of now is re-fetched:
with ESIOSClient(cache_recent_ttl=48) as client:
    # Data older than 48 hours is cached indefinitely
    # Data within 48 hours is re-fetched to get updates
    df = client.indicators.get(600).historical("2025-01-01", "2025-01-07")
Default: 48 hours. From src/esios/cache.py:52-53:
# Data older than this (hours) is considered final and won't be re-fetched_DEFAULT_RECENT_TTL_HOURS = 48
ESIOS updates recent data as measurements are finalized. The 48-hour TTL ensures you get corrected values.
with ESIOSClient() as client:
    # First call: fetches metadata from API
    handle = client.indicators.get(600)
    # Within 7 days: uses cached metadata
    handle = client.indicators.get(600)
Default: 7 days. From src/esios/cache.py:55-57:
# TTL for metadata caches_DEFAULT_META_TTL_DAYS = 7_DEFAULT_CATALOG_TTL_HOURS = 24
with ESIOSClient() as client:
    # First call: fetches from API
    indicators = client.indicators.list()
    # Within 24 hours: uses cached list
    indicators = client.indicators.list()
from esios.cache import CacheStore, CacheConfig
import pandas as pd

config = CacheConfig(enabled=True)
cache = CacheStore(config)

# Read cached data for a date range
df = cache.read(
    endpoint="indicators",
    indicator_id=600,
    start=pd.Timestamp("2025-01-01"),
    end=pd.Timestamp("2025-01-07"),
    columns=["3", "8741"],  # Optional: filter to specific geo_ids
)
# Write new data (merges with existing)
cache.write(
    endpoint="indicators",
    indicator_id=600,
    df=df,  # Wide-format DataFrame with DatetimeIndex
)
From src/esios/cache.py:204-237:
def write(
    self,
    endpoint: str,
    indicator_id: int,
    df: pd.DataFrame,
) -> None:
    """Merge new wide-format data with existing cache and persist.

    *df* should already be in wide format (columns = str(geo_id) or
    ``"value"``, index = DatetimeIndex). Merging uses
    ``df.combine_first(existing)``: new non-NaN values take precedence
    on overlapping cells, while existing cached values fill in any NaN
    cells of the new frame. An empty *df* is a no-op.
    """
    if df.empty:
        return
    path = self._parquet_path(endpoint, indicator_id)
    path.parent.mkdir(parents=True, exist_ok=True)

    # Read existing and merge
    existing = pd.DataFrame()
    if path.exists():
        try:
            existing = pd.read_parquet(path)
        except Exception:
            # A corrupted file is unrecoverable — fall back to
            # rewriting it from scratch with the new data only.
            logger.warning("Corrupted cache at %s — overwriting.", path)

    if not existing.empty:
        merged = df.combine_first(existing)
        merged = merged.sort_index()
    else:
        merged = df.sort_index()

    # Atomic write (temp file + rename) keeps the cache consistent
    # even if the process is interrupted mid-write.
    _atomic_write_parquet(path, merged)
Writes are atomic (temp file + rename). The cache remains consistent even if the process is interrupted.
# Disable globally
with ESIOSClient(cache=False) as client:
    # Always fetches from API
    df = client.indicators.get(600).historical("2025-01-01", "2025-01-07")
Disabling cache significantly increases API calls and response times. Only disable if you need real-time data or are debugging.
with ESIOSClient() as client:
    handle = client.indicators.get(600)
    # Cache disabled: aggregation in use
    df = handle.historical(
        "2025-01-01",
        "2025-01-31",
        time_agg="sum",
        time_trunc="day",
    )
From src/esios/managers/indicators.py:180-183:
# -- Cache-aware fetch -------------------------------------------------
cache = self._cache
# Aggregated requests bypass the cache entirely.
use_cache = cache.config.enabled and not time_agg and not geo_agg
The registry is automatically enriched as you fetch data:
with ESIOSClient() as client:
    # First fetch learns geo mappings from API response
    df = client.indicators.get(600).historical("2025-01-01", "2025-01-07")
    # Mappings are persisted to geos.json
    geos = client.cache.read_geos()
    print(geos)  # Includes all discovered geographies
From src/esios/cache.py:326-343:
def merge_geos(self, geos: dict[str, str]) -> None:
    """Merge new geo_id → geo_name mappings into the global registry.

    New mappings are added; a new value for an existing key overwrites
    the old one (``dict.update`` semantics). An empty *geos* is a no-op.

    Parameters
    ----------
    geos : dict[str, str]
        Mapping of str(geo_id) → geo_name to fold into the registry.
    """
    if not geos:
        return
    existing = self.read_geos()
    existing.update(geos)
    data = {
        "version": 1,
        # NOTE(review): naive local-time timestamp — consider UTC;
        # kept as-is to preserve the on-disk format.
        "updated_at": datetime.now().isoformat(),
        "geos": existing,
    }
    path = self._geos_path()
    _atomic_write_json(path, data)
The global registry improves geo name resolution across all indicators, even if an indicator’s metadata is incomplete.
The cache automatically migrates from old layouts:
# Old layout (pre-v0.2.0)
~/.cache/esios/indicators/600.parquet

# New layout (v0.2.0+)
~/.cache/esios/indicators/600/data.parquet
~/.cache/esios/indicators/600/meta.json
From src/esios/cache.py:124-144:
def _maybe_migrate(self, endpoint: str, item_id: int) -> None: """Auto-migrate old flat cache files to new directory layout. Old layout: ``{cache_dir}/{endpoint}/{item_id}.parquet`` New layout: ``{cache_dir}/{endpoint}/{item_id}/data.parquet`` """ old_path = self.config.cache_dir / endpoint / f"{item_id}.parquet" if not old_path.exists(): return new_path = self._parquet_path(endpoint, item_id) if new_path.exists(): # New layout already has data — just remove old file old_path.unlink() logger.info("Removed old cache file %s (already migrated).", old_path) return # Move old flat file into new directory layout new_path.parent.mkdir(parents=True, exist_ok=True) old_path.rename(new_path) logger.info("Migrated cache %s → %s", old_path, new_path)
Migration is automatic and transparent. No manual action required.
from esios import ESIOSClient
from esios.cache import CacheConfig
from pathlib import Path

config = CacheConfig(
    enabled=True,
    cache_dir=Path("/custom/cache/dir"),
    recent_ttl_hours=24,
    meta_ttl_days=14,
    catalog_ttl_hours=48,
)
# Pass config to client (not currently supported — use constructor params)
# This is for internal use or testing