
Ergast API Rate Limits

The Ergast F1 API enforces the following rate limits to ensure fair usage:
  • Burst Limit: 4 requests per second
  • Sustained Limit: 200 requests per hour
Exceeding these limits will result in HTTP 429 (Too Many Requests) responses. Your requests will be throttled until the rate limit window resets.
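
These two limits interact: the burst limit permits short bursts of up to 4 requests per second, but the sustained limit of 200 requests per hour works out to one request every 18 seconds on average. The arithmetic, in plain Python using only the numbers above:

```python
# Minimum average spacing implied by each documented limit
burst_limit = 4     # requests per second
hourly_limit = 200  # requests per hour

burst_interval = 1 / burst_limit          # 0.25 s between requests
sustained_interval = 3600 / hourly_limit  # 18 s average between requests

print(f"Burst spacing:     {burst_interval:.2f} s")
print(f"Sustained spacing: {sustained_interval:.0f} s")
```

In other words, a short sleep between requests satisfies the burst limit, but long-running jobs also need to budget against the hourly window.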

Rate Limiting Implementation

All data collection scripts in the F1 Stats Archive implement rate limiting to respect the API’s constraints.

Basic Rate Limiting Pattern

Here’s the simple approach used in events.py:
import time
import requests

def fetch_with_rate_limit(url):
    # Respect rate limits: 4 requests per second, 200 requests per hour
    time.sleep(0.3)  # Wait 0.3 seconds between requests (~ 3.3 req/sec)

    response = requests.get(url)

    # Handle rate limiting
    if response.status_code == 429:
        print("Rate limit exceeded. Waiting 60 seconds before retrying...")
        time.sleep(60)
        return fetch_with_rate_limit(url)

    return response.json()
Using time.sleep(0.3) ensures approximately 3.3 requests per second, safely below the 4 req/sec limit.

Advanced Rate Limiting Pattern

The results.py script uses a class-based approach with precise timing:
import time
import requests

class RaceResultsFetcher:
    def __init__(self, base_dir="."):
        self.base_url = "https://api.jolpi.ca/ergast/f1"
        # Rate limits
        self.burst_limit = 4  # requests per second
        self.last_request_time = 0

    def make_request(self, url):
        """Make a request to the API with rate limiting"""
        # Ensure we don't exceed burst limit
        current_time = time.time()
        time_since_last_request = current_time - self.last_request_time

        if time_since_last_request < (1 / self.burst_limit):
            sleep_time = (1 / self.burst_limit) - time_since_last_request
            time.sleep(sleep_time)

        response = requests.get(url)
        self.last_request_time = time.time()

        if response.status_code == 429:
            print("Rate limit exceeded. Waiting 30 seconds before retrying.")
            time.sleep(30)
            return self.make_request(url)

        if response.status_code != 200:
            print(f"Error fetching data: {response.status_code}")
            return None

        return response.json()
This approach:
  • Tracks the exact time of the last request
  • Calculates the minimum wait time dynamically
  • Ensures precise adherence to the 4 req/sec limit

Retry Logic for 429 Responses

The laptimes.py script demonstrates robust retry logic:
import time
import requests

BASE_URL = "https://api.jolpi.ca/ergast/f1"
RATE_LIMIT_BURST = 4  # Max requests per second

def fetch_laptimes(year, round_num):
    """Fetch all lap times for a specific race with pagination."""
    all_data = None
    offset = 0
    total_records = None

    while total_records is None or offset < total_records:
        url = f"{BASE_URL}/{year}/{round_num}/laps.json?limit=100&offset={offset}"
        
        try:
            response = requests.get(url)
            response.raise_for_status()
            data = response.json()

            if all_data is None:
                all_data = data
                total_records = int(data["MRData"]["total"])
            else:
                # Append new lap data
                all_data["MRData"]["RaceTable"]["Races"][0]["Laps"].extend(
                    data["MRData"]["RaceTable"]["Races"][0]["Laps"]
                )

            offset += 100

            # Respect rate limits
            time.sleep(1 / RATE_LIMIT_BURST)

        except requests.exceptions.RequestException as e:
            if e.response is not None and e.response.status_code == 429:
                print("Rate limit exceeded. Waiting for 60 seconds...")
                time.sleep(60)  # Wait longer if we hit rate limit
                continue
            print(f"Error fetching data: {e}")
            return None

    return all_data
This implementation handles pagination while maintaining rate limits, which is important for endpoints that return large datasets, such as lap times.

Best Practices

1. Use Conservative Delays

Stay well below the maximum rate limit:
# Good: 3 requests per second (25% buffer)
time.sleep(0.33)

# Risky: 4 requests per second (no buffer)
time.sleep(0.25)

2. Implement Exponential Backoff

For production systems, use exponential backoff:
import time

import requests

def fetch_with_backoff(url, max_retries=3):
    """Fetch data with exponential backoff on rate limit errors."""
    for attempt in range(max_retries):
        response = requests.get(url)
        
        if response.status_code == 429:
            wait_time = (2 ** attempt) * 30  # 30s, 60s, 120s
            print(f"Rate limited. Waiting {wait_time}s before retry {attempt + 1}/{max_retries}")
            time.sleep(wait_time)
            continue
        
        return response.json()
    
    raise Exception("Max retries exceeded")

3. Monitor Request Counts

Track how many requests you’re making per hour:
from collections import deque
import time

import requests

class RateLimitedAPI:
    def __init__(self):
        self.request_times = deque()
        self.hourly_limit = 200
    
    def make_request(self, url):
        # Remove requests older than 1 hour
        current_time = time.time()
        while self.request_times and current_time - self.request_times[0] > 3600:
            self.request_times.popleft()
        
        # Check if we're at the hourly limit
        if len(self.request_times) >= self.hourly_limit:
            wait_time = 3600 - (current_time - self.request_times[0])
            print(f"Hourly limit reached. Waiting {wait_time:.0f} seconds")
            time.sleep(wait_time)
        
        # Make the request
        time.sleep(0.3)  # Burst limit
        response = requests.get(url)
        self.request_times.append(time.time())
        
        return response.json()

4. Cache API Responses

Avoid redundant requests by caching:
import hashlib
import json
import time
from pathlib import Path

import requests

def fetch_with_cache(url, cache_dir=".cache", max_age=3600):
    """Fetch data with file-based caching."""
    # Create a stable cache filename from the URL; the built-in hash() is
    # randomized per process, so hashlib gives a key that survives restarts
    cache_file = Path(cache_dir) / f"{hashlib.md5(url.encode()).hexdigest()}.json"
    cache_file.parent.mkdir(exist_ok=True)
    
    # Check if cache exists and is fresh
    if cache_file.exists():
        age = time.time() - cache_file.stat().st_mtime
        if age < max_age:
            with open(cache_file, "r") as f:
                return json.load(f)
    
    # Fetch fresh data
    time.sleep(0.3)  # Rate limiting
    response = requests.get(url)
    data = response.json()
    
    # Save to cache
    with open(cache_file, "w") as f:
        json.dump(data, f)
    
    return data

Handling Large Data Collections

When collecting historical data for multiple seasons:

Batch Processing

import time

def fetch_all_seasons(start_year, end_year):
    """Fetch data for multiple seasons with rate limiting."""
    for year in range(start_year, end_year + 1):
        print(f"Processing season {year}...")
        
        # Fetch events for the year
        url = f"https://api.jolpi.ca/ergast/f1/{year}/races/"
        data = fetch_with_rate_limit(url)
        
        # Process each race
        races = data["MRData"]["RaceTable"]["Races"]
        for race in races:
            round_num = race["round"]
            
            # Fetch race results
            results_url = f"https://api.jolpi.ca/ergast/f1/{year}/{round_num}/results.json"
            results = fetch_with_rate_limit(results_url)
            
            # Save data...
        
        print(f"Completed season {year}")
        # Add a longer pause between seasons
        time.sleep(5)

Progress Tracking

import logging
import time

import requests

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(levelname)s - %(message)s",
    handlers=[
        logging.FileHandler("fetch.log"),
        logging.StreamHandler()
    ]
)

logger = logging.getLogger(__name__)

def fetch_with_logging(url):
    """Fetch data with detailed logging."""
    logger.info(f"Fetching: {url}")
    time.sleep(0.3)
    
    response = requests.get(url)
    
    if response.status_code == 429:
        logger.warning("Rate limit exceeded. Implementing backoff.")
        time.sleep(60)
        return fetch_with_logging(url)
    
    logger.info(f"Successfully fetched data from {url}")
    return response.json()

Rate Limit Headers

The Ergast API doesn’t return rate limit headers, so you must track limits client-side using the patterns shown above.
Always implement both burst (per-second) and sustained (per-hour) rate limiting when making multiple requests.
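
A token bucket is one common way to track both windows client-side. The sketch below is illustrative, not code from the archive's scripts; it mirrors the documented limits with one bucket per window:

```python
import time

class TokenBucket:
    """Allow up to `capacity` requests per `period` seconds, refilling continuously."""

    def __init__(self, capacity, period):
        self.capacity = capacity
        self.rate = capacity / period   # tokens replenished per second
        self.tokens = float(capacity)
        self.updated = time.time()

    def acquire(self):
        """Block until one token is available, then consume it."""
        now = time.time()
        self.tokens = min(self.capacity,
                          self.tokens + (now - self.updated) * self.rate)
        self.updated = now
        if self.tokens < 1:
            time.sleep((1 - self.tokens) / self.rate)
            self.tokens = 1
        self.tokens -= 1

# One bucket per window, mirroring the documented limits
burst_bucket = TokenBucket(4, 1)        # 4 requests per second
hourly_bucket = TokenBucket(200, 3600)  # 200 requests per hour

def throttle():
    """Block until a request is allowed under both limits."""
    burst_bucket.acquire()
    hourly_bucket.acquire()
```

Calling `throttle()` before each `requests.get()` enforces both windows without tracking individual timestamps.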

Testing Rate Limits

Test your rate limiting implementation:
import time

def test_rate_limiting():
    """Test that rate limiting works correctly."""
    fetcher = RaceResultsFetcher()
    
    start_time = time.time()
    request_count = 10
    
    for i in range(request_count):
        url = f"https://api.jolpi.ca/ergast/f1/2024/{i+1}/results.json"
        fetcher.make_request(url)
    
    elapsed_time = time.time() - start_time
    actual_rate = request_count / elapsed_time
    
    print(f"Made {request_count} requests in {elapsed_time:.2f}s")
    print(f"Actual rate: {actual_rate:.2f} req/s")
    print("Expected: <= 4 req/s")
    
    assert actual_rate <= 4.1, "Rate limit exceeded!"

test_rate_limiting()

Troubleshooting

Still Getting 429 Errors

  1. Increase the delay between requests
  2. Check if you have multiple processes running
  3. Verify you’re not making concurrent requests
  4. Implement longer backoff periods (60-120 seconds)
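
On point 3: if your code is multi-threaded, a shared lock can serialize requests so the delay is enforced globally rather than per thread. A minimal sketch (`fetch` is a placeholder for any of the fetch helpers above; this is not code from the archive):

```python
import threading
import time

_request_lock = threading.Lock()
_last_request_time = 0.0
MIN_INTERVAL = 0.3  # seconds between requests, shared by all threads

def rate_limited_call(fetch, url):
    """Run fetch(url) while enforcing MIN_INTERVAL across all threads."""
    global _last_request_time
    with _request_lock:
        wait = MIN_INTERVAL - (time.time() - _last_request_time)
        if wait > 0:
            time.sleep(wait)
        try:
            return fetch(url)  # `fetch` stands in for any fetch helper above
        finally:
            _last_request_time = time.time()
```

Because the sleep happens inside the lock, two threads can never fire requests closer together than `MIN_INTERVAL`.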

Slow Data Collection

With rate limits, collecting complete historical data takes time:
  • 4 req/sec = 240 requests/minute
  • 200 req/hour sustained
  • Full season (24 races × 8 endpoints) = 192 requests
  • Multiple seasons will take hours to complete
Be patient and let automated workflows handle gradual updates.
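
For planning purposes, the sustained limit is the bottleneck: at 200 requests per hour, a 192-request season fits in just under an hour, and ten seasons take roughly ten hours. A rough estimate helper, using the figures above:

```python
def estimate_hours(seasons, races_per_season=24, endpoints_per_race=8,
                   hourly_limit=200):
    """Rough collection time assuming the hourly limit is the bottleneck."""
    total_requests = seasons * races_per_season * endpoints_per_race
    return total_requests / hourly_limit

print(f"1 season:   {estimate_hours(1):.2f} hours")
print(f"10 seasons: {estimate_hours(10):.1f} hours")
```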
