Overview
The F1 Stats Archive implements comprehensive rate limiting to respect the Ergast API’s usage constraints. All data fetchers follow consistent patterns to ensure compliance with both burst and sustained rate limits.
Rate Limit Constraints
The Ergast API enforces the following limits:
- Burst limit: 4 requests per second
- Sustained limit: 500 requests per hour
- 429 response: Rate limit exceeded (requires backoff)
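Expressed as constants (values taken from the list above; the derived figures are plain arithmetic, not repo code), the limits show that for long runs the sustained limit, not the burst limit, is the binding constraint: at 4 requests per second a script could issue 14,400 requests in an hour, far above the 500-per-hour cap.

```python
# Ergast API limits (values from the list above); derived values are arithmetic only
BURST_LIMIT = 4                            # requests per second
SUSTAINED_LIMIT = 500                      # requests per hour
MIN_REQUEST_INTERVAL = 1 / BURST_LIMIT     # 0.25 s between requests
HOURLY_AT_BURST_RATE = BURST_LIMIT * 3600  # 14400 requests if run flat out for an hour
```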
Implementation Patterns
Burst Rate Limiting
Most scripts use a burst limit of 4 requests per second, implemented by calculating the delay since the previous request:

```python
# Rate limiting parameters (stored on the fetcher instance in __init__
# as self.burst_limit and self.last_request_time)
BURST_LIMIT = 4        # requests per second
LAST_REQUEST_TIME = 0  # timestamp of the previous request

def make_request(self, url):
    """Make a request to the API with rate limiting."""
    # Ensure we don't exceed the burst limit
    current_time = time.time()
    time_since_last_request = current_time - self.last_request_time
    if time_since_last_request < (1 / self.burst_limit):
        sleep_time = (1 / self.burst_limit) - time_since_last_request
        logger.debug(f"Rate limiting: sleeping for {sleep_time:.2f} seconds")
        time.sleep(sleep_time)
    logger.debug(f"Making request to: {url}")
    response = requests.get(url)
    self.last_request_time = time.time()
    return response
```

The delay is calculated as `1 / burst_limit` seconds; for 4 requests per second this works out to 0.25 seconds between requests.
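That spacing arithmetic can be checked with a standalone version of the same logic (a module-level sketch for illustration, not the repo's class):

```python
import time

BURST_LIMIT = 4
last_request_time = 0.0

def wait_for_slot():
    """Sleep just long enough to stay at or below BURST_LIMIT requests per second."""
    global last_request_time
    min_interval = 1 / BURST_LIMIT  # 0.25 s for a burst limit of 4
    elapsed = time.time() - last_request_time
    if elapsed < min_interval:
        time.sleep(min_interval - elapsed)
    last_request_time = time.time()
```

Calling `wait_for_slot()` in a tight loop spaces iterations roughly 0.25 seconds apart; the very first call returns immediately because no previous request has been recorded yet.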
Conservative Rate Limiting
Some scripts use a more conservative approach with only 2 requests per second:
```python
# Rate limiting parameters
BURST_LIMIT = 2  # requests per second
SUSTAINED_LIMIT = 500  # requests per hour
REQUEST_DELAY = 1 / BURST_LIMIT  # seconds between requests

def check_rate_limits(self):
    """Check if we're within rate limits, wait if necessary."""
    self.reset_hour_counter_if_needed()
    # Check sustained (hourly) limit
    if self.requests_this_hour >= SUSTAINED_LIMIT:
        # Wait until a full hour (3600 s) has elapsed since the window opened
        wait_time = 3600 - (time.time() - self.hour_start_time)
        if wait_time > 0:
            logger.warning(
                f"Hourly rate limit reached. Waiting {wait_time:.2f} seconds"
            )
            time.sleep(wait_time)
        self.requests_this_hour = 0
        self.hour_start_time = time.time()
    # Always wait between requests to respect the burst limit
    time.sleep(REQUEST_DELAY)
```
From team_points.py:24-59 and driver_points.py:21-46
Sustained Rate Limiting
For long-running operations, track hourly request counts:
```python
def __init__(self, base_dir="data"):
    self.base_dir = Path(base_dir)
    self.requests_this_hour = 0
    self.hour_start_time = time.time()

def reset_hour_counter_if_needed(self):
    """Reset the hourly request counter if an hour has passed."""
    current_time = time.time()
    if current_time - self.hour_start_time > 3600:  # 3600 seconds = 1 hour
        self.requests_this_hour = 0
        self.hour_start_time = current_time
        logger.info("Hourly request counter reset")
```
From team_points.py:30-41
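The reset logic can be exercised without waiting a real hour by backdating the window start time (a minimal standalone sketch with hypothetical names, not the repo's class):

```python
import time

class HourlyCounter:
    """Minimal stand-in for the hourly tracking shown above."""
    def __init__(self):
        self.requests_this_hour = 0
        self.hour_start_time = time.time()

    def reset_hour_counter_if_needed(self):
        if time.time() - self.hour_start_time > 3600:
            self.requests_this_hour = 0
            self.hour_start_time = time.time()

counter = HourlyCounter()
counter.requests_this_hour = 42
counter.hour_start_time -= 3601  # pretend an hour has already passed
counter.reset_hour_counter_if_needed()
```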
429 Response Handling
All scripts implement an automatic retry with a fixed backoff delay when a rate limit is exceeded:

```python
if response.status_code == 429:
    logger.warning("Rate limit exceeded. Waiting 30 seconds before retrying.")
    time.sleep(30)
    return self.make_request(url)  # Retry after waiting
```

The recursive retry keeps requests going once the rate limit window resets. Note that it has no retry cap: if the API keeps returning 429, the calls recurse indefinitely.
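The snippets in this repository wait a fixed interval. A genuinely exponential backoff with a retry cap could be sketched as follows (the function names are hypothetical, and `get` stands in for `requests.get` to keep the example self-contained):

```python
import time

def backoff_delays(max_retries=5, base_delay=1.0):
    """Exponentially growing waits: 1 s, 2 s, 4 s, 8 s, 16 s by default."""
    return [base_delay * (2 ** attempt) for attempt in range(max_retries)]

def fetch_with_backoff(get, url, max_retries=5, base_delay=1.0):
    """Retry on HTTP 429, doubling the wait each time; give up after max_retries."""
    for wait in backoff_delays(max_retries, base_delay):
        response = get(url)  # e.g. requests.get
        if response.status_code != 429:
            return response
        time.sleep(wait)
    raise RuntimeError(f"Still rate limited after {max_retries} retries")
```

The cap trades the guarantee of eventual success for a guarantee of termination, which is usually the safer default for unattended batch jobs.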
Alternative 429 Handling
Some scripts use a longer 60-second wait:
```python
if response.status_code == 429:
    logger.warning("Rate limit exceeded. Waiting 60 seconds before retrying...")
    time.sleep(60)
    return fetch_with_rate_limit(url)
```
From events.py:32-35 and laptimes.py:56-59
Rate Limiting Constants
Different scripts use different rate limiting configurations:
| Script | Burst Limit | Sustained Limit | Request Delay |
|---|---|---|---|
| team_points.py | 2 req/sec | 500 req/hour | 0.5 seconds |
| driver_points.py | 2 req/sec | 500 req/hour | 0.5 seconds |
| sprint_results.py | 4 req/sec | - | 0.25 seconds |
| results.py | 4 req/sec | - | 0.25 seconds |
| quali_results.py | 4 req/sec | - | 0.25 seconds |
| pitstops.py | 4 req/sec | - | 0.25 seconds |
| laptimes.py | 4 req/sec | 500 req/hour | 0.25 seconds |
| events.py | ~3.3 req/sec | 500 req/hour | 0.3 seconds |
Best Practices
1. Always Track Request Timing
```python
class Fetcher:
    def __init__(self):
        self.last_request_time = 0
        self.burst_limit = 4
```
2. Calculate Sleep Time Dynamically
```python
time_since_last_request = current_time - self.last_request_time
if time_since_last_request < (1 / self.burst_limit):
    sleep_time = (1 / self.burst_limit) - time_since_last_request
    time.sleep(sleep_time)
```
3. Update Timestamp After Request
```python
response = requests.get(url)
self.last_request_time = time.time()
```
4. Implement Retry Logic
```python
if response.status_code == 429:
    logger.warning("Rate limit exceeded. Waiting 30 seconds before retrying.")
    time.sleep(30)
    return self.make_request(url)
```
5. Monitor Hourly Limits for Long Operations
```python
self.requests_this_hour += 1
if self.requests_this_hour >= SUSTAINED_LIMIT:
    wait_time = 3600 - (time.time() - self.hour_start_time)
    time.sleep(wait_time)
    self.requests_this_hour = 0
```
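Taken together, the five practices can be combined into a single limiter class. This is an illustrative sketch rather than code from the repo; `get` is injected (e.g. pass `requests.get`) so the example stays self-contained:

```python
import time

class RateLimitedFetcher:
    """Sketch combining the practices above: burst spacing, hourly cap, 429 retry."""

    def __init__(self, burst_limit=4, sustained_limit=500):
        self.burst_limit = burst_limit
        self.sustained_limit = sustained_limit
        self.last_request_time = 0.0   # practice 1: track request timing
        self.requests_this_hour = 0
        self.hour_start_time = time.time()

    def _wait_if_needed(self):
        now = time.time()
        # Practice 5: reset or honour the hourly window
        if now - self.hour_start_time > 3600:
            self.requests_this_hour = 0
            self.hour_start_time = now
        if self.requests_this_hour >= self.sustained_limit:
            time.sleep(max(0.0, 3600 - (now - self.hour_start_time)))
            self.requests_this_hour = 0
            self.hour_start_time = time.time()
        # Practice 2: calculate sleep time dynamically
        elapsed = time.time() - self.last_request_time
        min_interval = 1 / self.burst_limit
        if elapsed < min_interval:
            time.sleep(min_interval - elapsed)

    def fetch(self, get, url, retry_wait=30):
        self._wait_if_needed()
        response = get(url)                    # e.g. requests.get
        self.last_request_time = time.time()   # practice 3: update after request
        self.requests_this_hour += 1
        if response.status_code == 429:        # practice 4: retry logic
            time.sleep(retry_wait)
            return self.fetch(get, url, retry_wait)
        return response
```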
Simple Rate Limiting Example
For quick scripts without class-based architecture:
```python
import time
import requests

RATE_LIMIT_BURST = 4
REQUEST_DELAY = 1 / RATE_LIMIT_BURST

def fetch_with_rate_limit(url):
    """Fetch data with rate limiting."""
    time.sleep(REQUEST_DELAY)
    response = requests.get(url)
    if response.status_code == 429:
        print("Rate limit exceeded. Waiting 60 seconds before retrying...")
        time.sleep(60)
        return fetch_with_rate_limit(url)
    return response.json()
```
From events.py:24-37
Choose between conservative (2 req/sec) and standard (4 req/sec) limits based on your use case. For bulk operations, use the conservative approach.