Ergast API Rate Limits
The Ergast F1 API enforces the following rate limits to ensure fair usage:
- Burst Limit: 4 requests per second
- Sustained Limit: 200 requests per hour
Exceeding these limits will result in HTTP 429 (Too Many Requests) responses. Your requests will be throttled until the rate limit window resets.
Rate Limiting Implementation
All data collection scripts in the F1 Stats Archive implement rate limiting to respect the API’s constraints.
Basic Rate Limiting Pattern
Here’s the simple approach used in events.py:
import time
import requests
def fetch_with_rate_limit(url):
    """Fetch *url* from the Ergast API and return the decoded JSON body.

    Sleeps 0.3 s before every request (~3.3 req/sec, safely below the
    4 req/sec burst limit) and waits 60 s whenever the API answers with
    HTTP 429, then retries the same URL.

    Args:
        url: Full API endpoint URL.

    Returns:
        The parsed JSON response.
    """
    # Retry in a loop rather than recursing: a long streak of 429
    # responses must not grow the call stack without bound.
    while True:
        # Respect rate limits: 4 requests per second, 200 requests per hour
        time.sleep(0.3)  # ~3.3 req/sec between requests
        response = requests.get(url)
        # Handle rate limiting
        if response.status_code == 429:
            print("Rate limit exceeded. Waiting 60 seconds before retrying...")
            time.sleep(60)
            continue
        return response.json()
Using time.sleep(0.3) ensures approximately 3.3 requests per second, safely below the 4 req/sec limit.
Advanced Rate Limiting Pattern
The results.py script uses a class-based approach with precise timing:
import time
import requests
class RaceResultsFetcher:
    """Fetch race results from the Ergast/Jolpica API with burst-rate limiting."""

    def __init__(self, base_dir="."):
        """Initialize the fetcher.

        Args:
            base_dir: Directory for any output files. Previously this
                parameter was accepted but silently discarded; it is now
                stored for callers to use.
        """
        self.base_dir = base_dir
        self.base_url = "https://api.jolpi.ca/ergast/f1"
        # Rate limits
        self.burst_limit = 4  # requests per second
        self.last_request_time = 0

    def make_request(self, url):
        """Make a request to the API with rate limiting.

        Returns the parsed JSON on success, or None on a non-429 error.
        Retries indefinitely on HTTP 429, waiting 30 s between attempts.
        """
        # Loop instead of recursing so repeated 429 responses cannot
        # exhaust the call stack.
        while True:
            # Ensure we don't exceed burst limit: wait until at least
            # 1/burst_limit seconds have passed since the last request.
            min_interval = 1 / self.burst_limit
            elapsed = time.time() - self.last_request_time
            if elapsed < min_interval:
                time.sleep(min_interval - elapsed)
            response = requests.get(url)
            self.last_request_time = time.time()
            if response.status_code == 429:
                print("Rate limit exceeded. Waiting 30 seconds before retrying.")
                time.sleep(30)
                continue
            if response.status_code != 200:
                print(f"Error fetching data: {response.status_code}")
                return None
            return response.json()
This approach:
- Tracks the exact time of the last request
- Calculates the minimum wait time dynamically
- Ensures precise adherence to the 4 req/sec limit
Retry Logic for 429 Responses
The laptimes.py script demonstrates robust retry logic:
import time
import requests
BASE_URL = "https://api.jolpi.ca/ergast/f1"
RATE_LIMIT_BURST = 4 # Max requests per second
def fetch_laptimes(year, round_num):
    """Fetch all lap times for a specific race with pagination.

    Pages through the /laps endpoint 100 records at a time, merging each
    page's laps into the first response.

    Args:
        year: Season year.
        round_num: Round number within the season.

    Returns:
        The merged JSON payload, or None on a non-recoverable request error.
    """
    all_data = None
    offset = 0
    total_records = None
    while total_records is None or offset < total_records:
        url = f"{BASE_URL}/{year}/{round_num}/laps.json?limit=100&offset={offset}"
        try:
            response = requests.get(url)
            response.raise_for_status()
            data = response.json()
            if all_data is None:
                all_data = data
                total_records = int(data["MRData"]["total"])
            else:
                # Append new lap data. Guard against an empty Races list,
                # which would otherwise raise IndexError.
                races = data["MRData"]["RaceTable"]["Races"]
                if races:
                    all_data["MRData"]["RaceTable"]["Races"][0]["Laps"].extend(
                        races[0]["Laps"]
                    )
            offset += 100
            # Respect rate limits
            time.sleep(1 / RATE_LIMIT_BURST)
        except requests.exceptions.RequestException as e:
            # e.response is None for connection-level failures (timeout,
            # DNS, refused), so test it explicitly rather than via hasattr.
            if e.response is not None and e.response.status_code == 429:
                print("Rate limit exceeded. Waiting for 60 seconds...")
                time.sleep(60)  # Wait longer if we hit rate limit
                continue
            print(f"Error fetching data: {e}")
            return None
    return all_data
This implementation handles pagination while maintaining rate limits, important for endpoints that return large datasets.
Best Practices
1. Use Conservative Delays
Stay well below the maximum rate limit:
# Good: 3 requests per second (25% buffer)
time.sleep(0.33)
# Risky: 4 requests per second (no buffer)
time.sleep(0.25)
2. Implement Exponential Backoff
For production systems, use exponential backoff:
def fetch_with_backoff(url, max_retries=3):
    """Fetch data with exponential backoff on rate limit errors.

    Retries up to *max_retries* times on HTTP 429, doubling the wait each
    attempt (30s, 60s, 120s).

    Args:
        url: Full API endpoint URL.
        max_retries: Number of attempts before giving up.

    Returns:
        The parsed JSON response.

    Raises:
        RuntimeError: If every attempt was rate-limited.
    """
    for attempt in range(max_retries):
        response = requests.get(url)
        if response.status_code == 429:
            wait_time = (2 ** attempt) * 30  # 30s, 60s, 120s
            print(f"Rate limited. Waiting {wait_time}s before retry {attempt + 1}/{max_retries}")
            time.sleep(wait_time)
            continue
        return response.json()
    # RuntimeError is a subclass of Exception, so existing handlers that
    # caught the old bare Exception still match.
    raise RuntimeError("Max retries exceeded")
3. Monitor Request Counts
Track how many requests you’re making per hour:
from collections import deque
import time
class RateLimitedAPI:
    """Track request timestamps to honor both burst and hourly rate limits."""

    def __init__(self):
        # Timestamps (time.time()) of requests made in the last hour,
        # oldest first; deque gives O(1) removal from the left.
        self.request_times = deque()
        self.hourly_limit = 200

    def make_request(self, url):
        """GET *url* as JSON, sleeping as needed to stay within limits."""
        # Remove requests older than 1 hour
        current_time = time.time()
        while self.request_times and current_time - self.request_times[0] > 3600:
            self.request_times.popleft()
        # Check if we're at the hourly limit
        if len(self.request_times) >= self.hourly_limit:
            wait_time = 3600 - (current_time - self.request_times[0])
            print(f"Hourly limit reached. Waiting {wait_time:.0f} seconds")
            time.sleep(wait_time)
            # The oldest entry has now aged out of the window; drop it so
            # the count reflects reality after the sleep.
            self.request_times.popleft()
        # Make the request
        time.sleep(0.3)  # Burst limit (~3.3 req/sec)
        response = requests.get(url)
        self.request_times.append(time.time())
        return response.json()
4. Cache API Responses
Avoid redundant requests by caching:
import hashlib
import json
import time
from pathlib import Path
def fetch_with_cache(url, cache_dir=".cache", max_age=3600):
    """Fetch data with file-based caching.

    Returns the cached JSON if a fresh copy (younger than *max_age*
    seconds) exists, otherwise fetches, caches, and returns live data.
    """
    # Derive a stable cache filename from the URL. hashlib is used instead
    # of the builtin hash(): string hashes are randomized per interpreter
    # run (PYTHONHASHSEED), so hash(url) would miss the cache on every
    # process restart and silently re-fetch everything.
    digest = hashlib.sha256(url.encode("utf-8")).hexdigest()
    cache_file = Path(cache_dir) / f"{digest}.json"
    # parents=True so a nested cache_dir is created in full.
    cache_file.parent.mkdir(parents=True, exist_ok=True)
    # Check if cache exists and is fresh
    if cache_file.exists():
        age = time.time() - cache_file.stat().st_mtime
        if age < max_age:
            with open(cache_file, "r") as f:
                return json.load(f)
    # Fetch fresh data
    time.sleep(0.3)  # Rate limiting
    response = requests.get(url)
    data = response.json()
    # Save to cache
    with open(cache_file, "w") as f:
        json.dump(data, f)
    return data
Handling Large Data Collections
When collecting historical data for multiple seasons:
Batch Processing
import time
def fetch_all_seasons(start_year, end_year):
    """Fetch data for multiple seasons with rate limiting.

    Walks every season from start_year to end_year inclusive, fetching
    the race list and then each race's results via fetch_with_rate_limit.
    """
    for year in range(start_year, end_year + 1):
        print(f"Processing season {year}...")
        # Fetch events for the year
        data = fetch_with_rate_limit(f"https://api.jolpi.ca/ergast/f1/{year}/races/")
        # Process each race
        for race in data["MRData"]["RaceTable"]["Races"]:
            round_num = race["round"]
            # Fetch race results
            results = fetch_with_rate_limit(
                f"https://api.jolpi.ca/ergast/f1/{year}/{round_num}/results.json"
            )
            # Save data...
        print(f"Completed season {year}")
        # Add a longer pause between seasons
        time.sleep(5)
Progress Tracking
import logging
# Log to both fetch.log and the console with timestamps, so long-running
# collection jobs leave an auditable trail of every request.
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(levelname)s - %(message)s",
    handlers=[
        logging.FileHandler("fetch.log"),
        logging.StreamHandler()
    ]
)
# Module-level logger named after this module, per logging convention.
logger = logging.getLogger(__name__)
def fetch_with_logging(url):
    """Fetch data with detailed logging.

    Logs every request, backs off 60 s on HTTP 429, and returns the
    parsed JSON response.
    """
    # Loop rather than recurse so repeated 429 responses cannot exhaust
    # the call stack.
    while True:
        logger.info(f"Fetching: {url}")
        time.sleep(0.3)  # stay under the 4 req/sec burst limit
        response = requests.get(url)
        if response.status_code == 429:
            logger.warning("Rate limit exceeded. Implementing backoff.")
            time.sleep(60)
            continue
        logger.info(f"Successfully fetched data from {url}")
        return response.json()
The Ergast API doesn’t return rate limit headers, so you must track limits client-side using the patterns shown above.
Always implement both burst (per-second) and sustained (per-hour) rate limiting when making multiple requests.
Testing Rate Limits
Test your rate limiting implementation:
import time
def test_rate_limiting():
    """Test that rate limiting works correctly.

    Issues ten live requests through RaceResultsFetcher and verifies the
    observed request rate stays at or below the 4 req/sec burst limit.
    """
    fetcher = RaceResultsFetcher()
    started = time.time()
    request_count = 10
    for race_round in range(1, request_count + 1):
        fetcher.make_request(
            f"https://api.jolpi.ca/ergast/f1/2024/{race_round}/results.json"
        )
    elapsed_time = time.time() - started
    actual_rate = request_count / elapsed_time
    print(f"Made {request_count} requests in {elapsed_time:.2f}s")
    print(f"Actual rate: {actual_rate:.2f} req/s")
    print(f"Expected: <= 4 req/s")
    assert actual_rate <= 4.1, "Rate limit exceeded!"
test_rate_limiting()
Troubleshooting
Still Getting 429 Errors
- Increase the delay between requests
- Check if you have multiple processes running
- Verify you’re not making concurrent requests
- Implement longer backoff periods (60-120 seconds)
Slow Data Collection
With rate limits, collecting complete historical data takes time:
- Burst: 4 req/sec (up to 240 requests/minute, but only in short bursts)
- Sustained: 200 req/hour — this is the real bottleneck for large collections
- Full season (24 races × 8 endpoints) = 192 requests
- Multiple seasons will take hours to complete
Be patient and let automated workflows handle gradual updates.