Skip to main content
The AsyncESIOSClient provides an async interface for the ESIOS API, enabling concurrent requests and integration with async frameworks like FastAPI, aiohttp, and asyncio applications.

When to Use Async

Use the async client when:
  • Building async web services (FastAPI, aiohttp, Sanic)
  • Fetching multiple indicators concurrently
  • Integrating with async workflows (Celery, Prefect, Airflow)
  • Need non-blocking I/O in event loops
For simple scripts or Jupyter notebooks, the synchronous client is simpler.

Basic Usage

The async client requires context manager usage:
import asyncio
from esios import AsyncESIOSClient

async def main():
    async with AsyncESIOSClient(token="your_api_key") as client:
        # Make async API calls
        data = await client.get("indicators/600")
        print(data)

asyncio.run(main())
The AsyncESIOSClient currently provides low-level get() and download() methods. High-level managers (indicators, archives) are not yet implemented for the async client.

Making API Requests

The async client provides get() for JSON endpoints:
async with AsyncESIOSClient(token="your_api_key") as client:
    # Get indicator metadata
    response = await client.get("indicators/600")
    indicator = response["indicator"]
    print(indicator["name"])
    
    # Get historical data
    params = {
        "start_date": "2024-01-01",
        "end_date": "2024-01-31T23:59:59"
    }
    response = await client.get("indicators/600", params=params)
    values = response["indicator"]["values"]
    print(f"Retrieved {len(values)} data points")

Concurrent Requests

Fetch multiple resources in parallel with asyncio.gather():
import asyncio
from esios import AsyncESIOSClient

async def fetch_multiple_indicators():
    async with AsyncESIOSClient(token="your_api_key") as client:
        # Fetch 3 indicators concurrently
        results = await asyncio.gather(
            client.get("indicators/600"),
            client.get("indicators/1001"),
            client.get("indicators/1293")
        )
        
        for result in results:
            indicator = result["indicator"]
            print(f"{indicator['id']}: {indicator['name']}")

asyncio.run(fetch_multiple_indicators())
This is significantly faster than sequential requests with the sync client.
For maximum throughput, batch 5-10 concurrent requests at a time. Too many concurrent connections may trigger rate limiting.

Downloading Files

Use download() for binary content (archives):
async with AsyncESIOSClient(token="your_api_key") as client:
    # First, configure the archive to get download URL
    response = await client.get(
        "archives/34",
        params={"date": "2024-01-15T00:00:00", "date_type": "datos"}
    )
    
    download_url = response["archive"]["download"]["url"]
    full_url = "https://api.esios.ree.es" + download_url
    
    # Download the file
    content = await client.download(full_url)
    
    # Write to disk
    with open("I90DIA_20240115.xls", "wb") as f:
        f.write(content)
    
    print(f"Downloaded {len(content)} bytes")

Error Handling

Handle async exceptions:
from esios import AsyncESIOSClient, APIResponseError, NetworkError

async def safe_fetch():
    async with AsyncESIOSClient(token="your_api_key") as client:
        try:
            data = await client.get("indicators/999999")  # Non-existent
        except APIResponseError as e:
            print(f"API error {e.status_code}: {e}")
        except NetworkError as e:
            print(f"Network error: {e}")

asyncio.run(safe_fetch())

Retry Logic

The async client includes automatic retries with exponential backoff:
async with AsyncESIOSClient(token="your_api_key") as client:
    # Retries up to 3 times on APIResponseError or NetworkError
    data = await client.get("indicators/600")
Retry configuration (from source):
  • Max retries: 3 attempts
  • Backoff: Exponential with min 1s, max 10s
  • Retryable errors: APIResponseError, NetworkError
Retries are handled transparently. You’ll only see an exception if all retry attempts fail.

Timeout Configuration

Customize request timeout:
# Default timeout: 30 seconds
client = AsyncESIOSClient(
    token="your_api_key",
    timeout=60.0  # 60 second timeout
)

async with client:
    # Long-running request won't timeout for 60 seconds
    data = await client.get("indicators/600", params={...})

Integration with FastAPI

Use dependency injection to manage client lifecycle:
from fastapi import FastAPI, Depends
from esios import AsyncESIOSClient
import os

app = FastAPI()

async def get_client():
    client = AsyncESIOSClient(token=os.getenv("ESIOS_API_KEY"))
    try:
        yield client
    finally:
        await client.close()

@app.get("/indicators/{indicator_id}")
async def get_indicator(
    indicator_id: int,
    client: AsyncESIOSClient = Depends(get_client)
):
    data = await client.get(f"indicators/{indicator_id}")
    return data["indicator"]

@app.get("/prices/history")
async def get_prices(
    start: str,
    end: str,
    client: AsyncESIOSClient = Depends(get_client)
):
    params = {
        "start_date": start,
        "end_date": end + "T23:59:59"
    }
    data = await client.get("indicators/600", params=params)
    return {"values": data["indicator"]["values"]}

Integration with aiohttp

Use the async client in aiohttp request handlers:
from aiohttp import web
from esios import AsyncESIOSClient
import os

async def handle_indicator(request):
    indicator_id = request.match_info["id"]
    
    async with AsyncESIOSClient(token=os.getenv("ESIOS_API_KEY")) as client:
        data = await client.get(f"indicators/{indicator_id}")
        return web.json_response(data["indicator"])

app = web.Application()
app.add_routes([
    web.get("/indicators/{id}", handle_indicator)
])

if __name__ == "__main__":
    web.run_app(app, port=8080)

Batch Processing with Semaphore

Limit concurrent requests to avoid rate limiting:
import asyncio
from esios import AsyncESIOSClient

async def fetch_with_limit():
    indicator_ids = list(range(1000, 1100))  # 100 indicators
    
    async with AsyncESIOSClient(token="your_api_key") as client:
        # Limit to 5 concurrent requests
        semaphore = asyncio.Semaphore(5)
        
        async def fetch_one(indicator_id):
            async with semaphore:
                return await client.get(f"indicators/{indicator_id}")
        
        results = await asyncio.gather(
            *[fetch_one(iid) for iid in indicator_ids],
            return_exceptions=True
        )
        
        # Filter successful results
        successful = [
            r for r in results
            if not isinstance(r, Exception)
        ]
        
        print(f"Successfully fetched {len(successful)}/{len(indicator_ids)}")

asyncio.run(fetch_with_limit())

Manual Lifecycle Management

If you can’t use a context manager:
client = AsyncESIOSClient(token="your_api_key")

try:
    data = await client.get("indicators/600")
    print(data)
finally:
    await client.close()  # Important: always close
Always call close() when done to release HTTP connections. Prefer using async with to ensure cleanup.

Differences from Sync Client

Key differences between AsyncESIOSClient and ESIOSClient:
FeatureSync ClientAsync Client
Context managerOptionalRequired
High-level managersindicators, archives❌ Not yet implemented
Method callsclient.method()await client.method()
Concurrent requests❌ Sequentialasyncio.gather()
Framework integrationSimple scripts, JupyterFastAPI, aiohttp, async apps
Caching✅ Full support❌ Must implement manually

Future: Async Managers

Full async manager support is planned:
# Future API (not yet implemented)
async with AsyncESIOSClient(token="...") as client:
    handle = await client.indicators.get(600)
    df = await handle.historical("2024-01-01", "2024-01-31")
Currently, use the low-level get() and download() methods and parse responses manually.

Next Steps

Build docs developers (and LLMs) love