Multi-provider aggregation, parallel fetching, and intelligent fallback mechanisms
The Currency Converter API aggregates exchange rates from three external providers simultaneously, averages their responses for accuracy, and implements intelligent fallback logic to maintain service availability even when individual providers fail.
When a cache miss occurs, the system fetches rates from all providers concurrently using asyncio.gather(). Because the requests overlap, total latency is roughly that of the slowest provider rather than the sum of all three.

From application/services/rate_service.py:69:
    async def _aggregate_rates(self, from_currency: str, to_currency: str) -> AggregatedRate:
        # Build one fetch coroutine per provider, primary first.
        tasks = []
        providers = [self.primary_provider] + self.secondary_providers
        for provider in providers:
            tasks.append(self._fetch_from_provider(provider, from_currency, to_currency))

        # Run all fetches concurrently; results keep the order of `providers`.
        results = await asyncio.gather(*tasks)

        # Keep only the providers that returned a rate.
        rates: dict[str, Decimal] = {}
        for provider, rate in zip(providers, results, strict=False):
            if rate is not None:
                rates[provider.name] = rate

        if not rates:
            raise ProviderError(f'All providers failed for {from_currency} → {to_currency}')

        # Average over the successful providers only.
        avg_rate = sum(rates.values()) / Decimal(len(rates))

        return AggregatedRate(
            from_currency=from_currency,
            to_currency=to_currency,
            rate=avg_rate,
            timestamp=datetime.now(),
            sources=list(rates.keys()),
            individual_rates=rates,
        )
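To make the averaging behaviour concrete, here is a minimal, self-contained sketch of the same gather-then-average pattern with stub coroutines in place of real providers. The names `fake_fetch` and `aggregate` and the sample rates are invented for illustration and are not part of the service. Only the pattern is taken from the excerpt above.

```python
from __future__ import annotations

import asyncio
from decimal import Decimal


async def fake_fetch(rate: str | None) -> Decimal | None:
    """Hypothetical stand-in for a provider call; None models a failed provider."""
    await asyncio.sleep(0)  # yield to the event loop, as a network call would
    return Decimal(rate) if rate is not None else None


async def aggregate(raw: dict[str, str | None]) -> tuple[Decimal, list[str]]:
    """Fetch from every provider concurrently and average the successful rates."""
    names = list(raw)
    results = await asyncio.gather(*(fake_fetch(raw[n]) for n in names))
    rates = {n: r for n, r in zip(names, results) if r is not None}
    if not rates:
        raise RuntimeError("all providers failed")
    return sum(rates.values()) / Decimal(len(rates)), list(rates)


# Provider "b" fails, so the average uses only "a" and "c".
avg, sources = asyncio.run(aggregate({"a": "1.10", "b": None, "c": "1.20"}))
print(avg, sources)  # 1.15 ['a', 'c']
```

A failed provider lowers the number of sources but does not fail the request; the error is raised only when every provider returns nothing.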
ProviderError indicates an API-level problem (invalid API key, malformed request, rate limit exceeded). Retrying these errors would waste time since they won’t succeed without external intervention.

Network errors (ConnectionError, TimeoutError) are transient and often succeed on retry.
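The distinction between the two error classes can be sketched as a small retry helper. This is an illustration under assumptions, not the service's actual `_fetch_from_provider`, which is not shown in this section. The helper name `fetch_with_retry`, its parameters, and the backoff policy are hypothetical. Returning None on failure is assumed from the aggregation code's `rate is not None` check.

```python
import asyncio
from decimal import Decimal


class ProviderError(Exception):
    """Stand-in for the domain exception named in the text."""


async def fetch_with_retry(fetch, attempts: int = 3, base_delay: float = 0.0):
    """Retry transient network errors; give up at once on ProviderError.

    Returns the fetched rate, or None when the provider should be skipped.
    """
    for attempt in range(attempts):
        try:
            return await fetch()
        except ProviderError:
            return None  # API-level failure: retrying cannot help
        except (ConnectionError, TimeoutError):
            if attempt < attempts - 1:
                # Exponential backoff before the next attempt.
                await asyncio.sleep(base_delay * 2 ** attempt)
    return None  # every attempt hit a network error


calls = {"flaky": 0, "rejected": 0}


async def flaky() -> Decimal:
    """Fails twice with a network error, then succeeds."""
    calls["flaky"] += 1
    if calls["flaky"] < 3:
        raise ConnectionError("temporary network failure")
    return Decimal("1.25")


async def rejected() -> Decimal:
    """Always fails with an API-level error."""
    calls["rejected"] += 1
    raise ProviderError("invalid API key")


flaky_result = asyncio.run(fetch_with_retry(flaky))
rejected_result = asyncio.run(fetch_with_retry(rejected))
print(flaky_result, calls["flaky"])        # 1.25 3
print(rejected_result, calls["rejected"])  # None 1
```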
Each provider implements the protocol with its own API-specific logic. Here’s the Fixer.io implementation from infrastructure/providers/fixerio.py:8:
    class FixerIOProvider:
        BASE_URL = 'http://data.fixer.io/api'

        def __init__(self, api_key: str, client: httpx.AsyncClient | None = None, timeout: int = 10):
            self.api_key = api_key
            # Use the injected client if one is given; otherwise create one.
            self._client = client or httpx.AsyncClient(timeout=timeout)

        @property
        def name(self) -> str:
            return 'fixerio'

        async def _request(self, endpoint: str, params: dict) -> dict:
            params['access_key'] = self.api_key
            url = f'{self.BASE_URL}/{endpoint}'
            try:
                response = await self._client.get(url, params=params)
                response.raise_for_status()
                data = response.json()
                # Fixer.io reports API errors in the body via a 'success' flag.
                if not data.get('success', False):
                    info = data.get('error', {}).get('info', 'Unknown error')
                    raise ProviderError(f'Fixer.io API error: {info}')
                return data
            except httpx.HTTPStatusError as e:
                raise ProviderError(
                    f'Fixer.io HTTP error {e.response.status_code}: {e.response.text[:200]}'
                ) from e
            except httpx.RequestError as e:
                raise ProviderError(f'Fixer.io request failed: {e.__class__.__name__}') from e
            except Exception as e:
                raise ProviderError(f'Fixer.io response parsing error: {str(e)}') from e

        async def fetch_rate(self, from_currency: str, to_currency: str) -> Decimal:
            data = await self._request('latest', {'base': from_currency, 'symbols': to_currency})
            try:
                # Convert via str so the Decimal is built from the printed value of the rate.
                return Decimal(str(data['rates'][to_currency]))
            except KeyError as e:
                raise ProviderError(f'Missing rate for {to_currency}') from e

        async def fetch_supported_currencies(self) -> list[dict]:
            data = await self._request('symbols', {})
            return [{'code': code, 'name': name} for code, name in data['symbols'].items()]

        async def close(self) -> None:
            await self._client.aclose()
Each provider owns its own httpx.AsyncClient and translates API-specific errors into the domain exception ProviderError. Because the client can be passed in through the constructor, tests can inject a stub client instead of making real HTTP calls.
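The protocol itself is not shown in this section, so the following is only a sketch of what it might look like, inferred from the public methods of FixerIOProvider. The names `RateProvider`, `FakeProvider`, and `convert` are hypothetical. The example shows the same dependency-injection idea one level up: code that depends only on the protocol can be tested with an in-memory provider and no network access.

```python
import asyncio
from decimal import Decimal
from typing import Protocol


class RateProvider(Protocol):
    """Hypothetical protocol inferred from FixerIOProvider's public methods."""

    @property
    def name(self) -> str: ...
    async def fetch_rate(self, from_currency: str, to_currency: str) -> Decimal: ...
    async def fetch_supported_currencies(self) -> list[dict]: ...
    async def close(self) -> None: ...


class FakeProvider:
    """In-memory provider for tests; performs no network access."""

    def __init__(self, name: str, rates: dict[tuple[str, str], Decimal]):
        self._name = name
        self._rates = rates

    @property
    def name(self) -> str:
        return self._name

    async def fetch_rate(self, from_currency: str, to_currency: str) -> Decimal:
        return self._rates[(from_currency, to_currency)]

    async def fetch_supported_currencies(self) -> list[dict]:
        codes = sorted({code for pair in self._rates for code in pair})
        return [{"code": code, "name": code} for code in codes]

    async def close(self) -> None:
        return None


async def convert(provider: RateProvider, amount: Decimal, src: str, dst: str) -> Decimal:
    """Code under test depends on the protocol, not on a concrete provider."""
    return amount * await provider.fetch_rate(src, dst)


fake = FakeProvider("fake", {("USD", "EUR"): Decimal("0.90")})
converted = asyncio.run(convert(fake, Decimal("100"), "USD", "EUR"))
print(converted)  # 90.00
```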
At startup, the system fetches supported currencies from all providers and computes their intersection: only currencies supported by every provider that responded are added to the database.

From application/services/currency_service.py:17:
    async def initialize_supported_currencies(self) -> None:
        logger.info('Initializing supported currencies...')

        # Query every provider concurrently; failures come back as exception objects.
        provider_tasks = [provider.fetch_supported_currencies() for provider in self.providers]
        results = await asyncio.gather(*provider_tasks, return_exceptions=True)

        all_currencies = []
        for i, result in enumerate(results):
            provider_name = self.providers[i].name
            if isinstance(result, Exception):
                logger.error(f'Failed to fetch currencies from {provider_name}: {result}')
            elif isinstance(result, list):
                all_currencies.append(set(c['code'] for c in result))
                logger.info(f'{provider_name} supports {len(result)} currencies')

        if not all_currencies:
            raise ProviderError('Failed to fetch currencies from any provider')

        # Keep only the codes that every responding provider supports.
        supported_codes = set.intersection(*all_currencies)

        currency_models = [SupportedCurrency(code=code, name=None) for code in supported_codes]
        await self.repository.save_supported_currencies(currency_models)
        logger.info(f'Saved {len(supported_codes)} supported currencies.')
Why use intersection instead of union?
Using the intersection ensures that any supported currency pair can be converted by any provider. With the union, some conversions would work only with specific providers, creating inconsistent behavior.

This design prioritizes reliability over coverage.
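A small worked example may help show the trade-off. The provider names and currency sets below are made up for illustration and do not reflect what any real provider supports.

```python
# Hypothetical currency sets for three providers (illustrative data only).
provider_codes = {
    "provider_a": {"USD", "EUR", "GBP", "JPY"},
    "provider_b": {"USD", "EUR", "GBP", "CHF"},
    "provider_c": {"USD", "EUR", "JPY", "CHF"},
}

# Codes every provider can serve: safe to average across all sources.
common = set.intersection(*provider_codes.values())

# Codes at least one provider can serve.
everything = set.union(*provider_codes.values())

# Codes that would work with only some providers if the union were used.
partial = everything - common

print(sorted(common))   # ['EUR', 'USD']
print(sorted(partial))  # ['CHF', 'GBP', 'JPY']
```

With the union, a GBP conversion here would silently depend on two of the three providers, so the averaged rate would draw on fewer sources than a USD or EUR conversion.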
Provider errors are mapped to HTTP status codes via exception handlers. Domain exceptions raised by providers propagate up through the application layer and are caught at the API layer.
| Exception | HTTP Status | Client Message |
| --- | --- | --- |
| ProviderError | 503 | "Exchange rate service unavailable" |
| InvalidCurrencyError | 400 | "Currency XYZ not supported" |
| CacheError | 500 | "Internal server error" |
ProviderError messages are never exposed to clients. They may contain internal API details like error codes or provider-specific information.
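The mapping can be sketched without committing to a web framework. The section does not say which framework or handler registration the API uses, so the function `to_http_response`, the logger name, and the `detail` key are assumptions made for illustration. The exception classes are stand-ins for the real domain exceptions.

```python
import logging

logger = logging.getLogger("currency_api")  # hypothetical logger name


class ProviderError(Exception):
    """Stand-in for the domain exception."""


class InvalidCurrencyError(Exception):
    """Stand-in for the domain exception."""


class CacheError(Exception):
    """Stand-in for the domain exception."""


def to_http_response(exc: Exception) -> tuple[int, dict[str, str]]:
    """Map a domain exception to (status code, JSON-serialisable body)."""
    if isinstance(exc, InvalidCurrencyError):
        # Assumed safe to echo: the message only names the rejected currency.
        return 400, {"detail": str(exc)}
    if isinstance(exc, ProviderError):
        # Log the internal detail, but send the client a generic message.
        logger.error("Provider failure: %s", exc)
        return 503, {"detail": "Exchange rate service unavailable"}
    # CacheError and anything unexpected: generic 500, details stay in the logs.
    logger.error("Internal error: %r", exc)
    return 500, {"detail": "Internal server error"}


status, body = to_http_response(ProviderError("Fixer.io HTTP error 401: invalid access key"))
print(status, body)  # 503 {'detail': 'Exchange rate service unavailable'}
```

The provider's own message reaches the log but never the response body, which matches the rule stated above.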