TerraQuake API — Best Practices Guide

Overview

This guide covers best practices for using the TerraQuake API efficiently and reliably. Follow these recommendations to build robust applications that provide the best user experience.

API Usage Patterns

Pagination Strategy

Always use pagination to manage large datasets and reduce response times.
import requests

def fetch_all_earthquakes(start_date, end_date):
    """Fetch every earthquake in [start_date, end_date] by walking all pages.

    Parameters:
        start_date: ISO date string (YYYY-MM-DD), range start.
        end_date: ISO date string (YYYY-MM-DD), range end.

    Returns:
        list: all event dicts accumulated from each page's 'payload'.

    Raises:
        requests.HTTPError: if the API responds with an error status.
    """
    url = 'https://api.terraquakeapi.com/api/v1/earthquakes/range'
    all_events = []
    page = 1

    while True:
        params = {
            'startdate': start_date,
            'enddate': end_date,
            'limit': 100,  # Reasonable page size
            'page': page
        }

        # Timeout prevents the loop from hanging forever on a dead
        # connection; raise_for_status surfaces HTTP errors instead of
        # attempting to parse an error page as JSON.
        response = requests.get(url, params=params, timeout=10)
        response.raise_for_status()
        data = response.json()

        all_events.extend(data['payload'])

        # Stop when the server reports no further pages; .get() guards
        # against a missing 'pagination' object in a malformed response.
        if not data.get('pagination', {}).get('hasMore'):
            break

        page += 1

    return all_events

# Fetch one quarter of data; pagination inside the helper keeps each
# individual response small even though the range spans three months.
earthquakes = fetch_all_earthquakes('2024-01-01', '2024-03-31')
print(f"Total: {len(earthquakes)} earthquakes")
Default limit is 50. Use limit=100 for fewer requests, or limit=25 for faster initial responses.

Request Only What You Need

Use field selection and sorting to optimize responses.
import requests

# ✅ Request only required fields
url = 'https://api.terraquakeapi.com/api/v1/earthquakes/recent'
params = {
    'fields': 'time,magnitude,place,coordinates',  # Only what you need
    'sort': '-magnitude',  # Get important data first
    'limit': 50
}

# Always set a timeout and verify the HTTP status before using the body —
# without them a slow server hangs the script and an error page gets
# parsed as if it were data.
response = requests.get(url, params=params, timeout=10)
response.raise_for_status()
data = response.json()

Use Appropriate Endpoints

Choose the most specific endpoint for your use case.
Python
import requests

# ✅ GOOD: Use specific endpoint for today's data
response = requests.get('https://api.terraquakeapi.com/api/v1/earthquakes/today')

# ❌ BAD: Using generic endpoint with filters
# (shown for contrast only — this second call overwrites `response` above)
from datetime import date
today = date.today().isoformat()
response = requests.get(
    'https://api.terraquakeapi.com/api/v1/earthquakes/range',
    params={'startdate': today, 'enddate': today}
)  # More processing required

Caching Strategies

Implement caching to reduce API calls and improve performance.

Time-Based Caching

Step 1: Identify cacheable data

Historical data (past months/years) doesn’t change and can be cached indefinitely:
Python
# These can be cached long-term
- /earthquakes/month?year=2023&month=6  # Cache: 30 days
- /earthquakes/range?startdate=2023-01-01&enddate=2023-12-31  # Cache: 30 days

# These need shorter caching
- /earthquakes/today  # Cache: 5 minutes
- /earthquakes/recent  # Cache: 1 hour
Step 2: Implement caching

Python
import requests
from datetime import datetime, timedelta
from functools import lru_cache

class TerraQuakeClient:
    """TerraQuake API client with a simple in-memory TTL cache."""

    def __init__(self):
        self.base_url = 'https://api.terraquakeapi.com/api/v1'
        self.cache = {}  # cache_key -> {'data': ..., 'timestamp': datetime}

    def _get_cache_key(self, endpoint, params):
        """Generate unique cache key (params sorted so dict order is irrelevant)."""
        param_str = '&'.join(f"{k}={v}" for k, v in sorted(params.items()))
        return f"{endpoint}?{param_str}"

    def _is_cache_valid(self, cache_entry, ttl_seconds):
        """Check if cache entry is still valid.

        Uses total_seconds() rather than the .seconds attribute: .seconds
        wraps at one day, so a day-old entry would otherwise appear
        fresh again and be served from cache indefinitely.
        """
        age = (datetime.now() - cache_entry['timestamp']).total_seconds()
        return age < ttl_seconds

    def get_earthquakes(self, endpoint, params, cache_ttl=300):
        """Fetch earthquakes with caching.

        Parameters:
            endpoint: API path relative to base_url (e.g. 'earthquakes/today').
            params: dict of query parameters; part of the cache key.
            cache_ttl: seconds a cached response stays valid (default 300).

        Returns:
            dict: parsed JSON response, from cache or the live API.
        """
        cache_key = self._get_cache_key(endpoint, params)

        # Serve from cache while the entry is younger than the TTL
        if cache_key in self.cache:
            entry = self.cache[cache_key]
            if self._is_cache_valid(entry, cache_ttl):
                print(f"Cache hit: {cache_key}")
                return entry['data']

        # Fetch from API; timeout + status check keep hung connections
        # and error payloads from poisoning the cache
        print(f"Cache miss: {cache_key}")
        url = f"{self.base_url}/{endpoint}"
        response = requests.get(url, params=params, timeout=10)
        response.raise_for_status()
        data = response.json()

        # Store in cache
        self.cache[cache_key] = {
            'data': data,
            'timestamp': datetime.now()
        }

        return data

# Usage
client = TerraQuakeClient()

# Historical data - cache for 24 hours (86400 s; a finished month never changes)
data1 = client.get_earthquakes(
    'earthquakes/month',
    {'year': 2023, 'month': 6},
    cache_ttl=86400
)

# Today's data - cache for 5 minutes (300 s; new events arrive continuously)
data2 = client.get_earthquakes(
    'earthquakes/today',
    {'limit': 50},
    cache_ttl=300
)

Using Redis for Caching

Python
import requests
import redis
import json
from datetime import timedelta

class CachedTerraQuakeClient:
    """TerraQuake API client backed by a shared Redis cache."""

    def __init__(self, redis_url='redis://localhost:6379'):
        self.base_url = 'https://api.terraquakeapi.com/api/v1'
        self.redis = redis.from_url(redis_url)

    def get_earthquakes(self, endpoint, params, cache_ttl=300):
        """Fetch with Redis caching.

        Parameters:
            endpoint: API path relative to base_url.
            params: dict of query parameters; part of the cache key.
            cache_ttl: seconds the Redis entry lives (default 300).

        Returns:
            dict: parsed JSON response, from Redis or the live API.
        """
        # sort_keys makes the key independent of param dict ordering
        cache_key = f"terraquake:{endpoint}:{json.dumps(params, sort_keys=True)}"

        # Try cache first
        cached = self.redis.get(cache_key)
        if cached:
            print("Redis cache hit")
            return json.loads(cached)

        # Fetch from API; timeout + status check prevent hangs and keep
        # error payloads out of the shared cache
        print("Fetching from API")
        url = f"{self.base_url}/{endpoint}"
        response = requests.get(url, params=params, timeout=10)
        response.raise_for_status()
        data = response.json()

        # Store in Redis with TTL so stale entries expire automatically
        self.redis.setex(
            cache_key,
            timedelta(seconds=cache_ttl),
            json.dumps(data)
        )

        return data

# Usage
client = CachedTerraQuakeClient()

# First call - fetches from API and stores the JSON in Redis for 1 hour
data = client.get_earthquakes('earthquakes/recent', {'limit': 100}, cache_ttl=3600)

# Second call - returns from cache (same endpoint + params -> same cache key)
data = client.get_earthquakes('earthquakes/recent', {'limit': 100}, cache_ttl=3600)

Rate Limit Management

While the TerraQuake API doesn’t currently enforce strict rate limits, implement rate limiting to be a good API citizen and prepare for future restrictions.

Implement Request Throttling

Python
import requests
import time
from datetime import datetime, timedelta

class RateLimitedClient:
    """Client-side sliding-window rate limiter wrapped around requests.get."""

    def __init__(self, max_requests=60, time_window=60):
        """Initialize with rate limit (default: 60 requests per minute)"""
        self.max_requests = max_requests
        self.time_window = time_window  # seconds
        self.requests = []  # timestamps of requests inside the window

    def _clean_old_requests(self):
        """Drop timestamps that have aged out of the sliding window."""
        threshold = datetime.now() - timedelta(seconds=self.time_window)
        self.requests = [stamp for stamp in self.requests if stamp > threshold]

    def _wait_if_needed(self):
        """Sleep just long enough for the oldest request to leave the window."""
        self._clean_old_requests()

        if len(self.requests) < self.max_requests:
            return  # still under the limit, no wait required

        earliest = min(self.requests)
        resume_at = earliest + timedelta(seconds=self.time_window)
        wait_seconds = (resume_at - datetime.now()).total_seconds()

        if wait_seconds > 0:
            print(f"Rate limit reached. Waiting {wait_seconds:.1f}s...")
            time.sleep(wait_seconds)
            self._clean_old_requests()

    def get(self, url, **kwargs):
        """Rate-limited GET request"""
        self._wait_if_needed()
        self.requests.append(datetime.now())
        return requests.get(url, **kwargs)

# Usage
client = RateLimitedClient(max_requests=60, time_window=60)

# Make many requests - automatically throttled once 60 timestamps fall
# inside the 60-second sliding window
for i in range(100):
    response = client.get('https://api.terraquakeapi.com/api/v1/earthquakes/today')
    print(f"Request {i+1}: {response.status_code}")

Exponential Backoff for Retries

Python
import requests
import time
from requests.exceptions import RequestException

def fetch_with_retry(url, params=None, max_retries=3):
    """Fetch *url*, retrying failed requests with exponential backoff."""
    for attempt in range(max_retries):
        is_last_attempt = attempt == max_retries - 1
        try:
            response = requests.get(url, params=params, timeout=10)
            response.raise_for_status()
            return response.json()

        except RequestException as e:
            # Out of attempts: propagate the final failure to the caller
            if is_last_attempt:
                raise

            wait_time = 2 ** attempt  # backoff schedule: 1s, 2s, 4s, ...
            print(f"Request failed: {e}. Retrying in {wait_time}s...")
            time.sleep(wait_time)

# Usage
try:
    data = fetch_with_retry(
        'https://api.terraquakeapi.com/api/v1/earthquakes/recent',
        params={'limit': 100}
    )
    print(f"Success: {len(data['payload'])} earthquakes")
except RequestException as e:
    # Reached only after max_retries attempts have all failed
    print(f"Failed after all retries: {e}")

Error Handling

Comprehensive Error Handling

Python
import requests
from requests.exceptions import (
    RequestException,
    Timeout,
    ConnectionError,
    HTTPError
)
import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

class TerraQuakeAPIError(Exception):
    """Base exception for TerraQuake API errors.

    Raised by TerraQuakeClient for timeouts, connection failures, HTTP
    error statuses, and malformed responses, so callers can catch one type.
    """
    pass

class TerraQuakeClient:
    """TerraQuake API client that maps transport failures to TerraQuakeAPIError."""

    def __init__(self, base_url='https://api.terraquakeapi.com/api/v1'):
        self.base_url = base_url
        self.session = requests.Session()
        self.session.headers.update({
            'User-Agent': 'TerraQuake-Client/1.0'
        })

    def _decode_response(self, response):
        """Parse and validate the JSON body, or raise TerraQuakeAPIError."""
        try:
            data = response.json()
        except ValueError as e:
            logger.error(f"Invalid JSON response: {e}")
            raise TerraQuakeAPIError("Invalid API response format")

        if 'payload' not in data:
            logger.error(f"Missing 'payload' in response: {data}")
            raise TerraQuakeAPIError("Invalid response structure")

        return data

    def _raise_for_http_error(self, e):
        """Translate an HTTPError into the matching TerraQuakeAPIError."""
        status_code = e.response.status_code

        if status_code == 400:
            logger.error(f"Bad request: {e.response.text}")
            raise TerraQuakeAPIError("Invalid request parameters")
        if status_code == 404:
            logger.error("Endpoint not found")
            raise TerraQuakeAPIError("API endpoint not found")
        if status_code >= 500:
            logger.error(f"Server error: {status_code}")
            raise TerraQuakeAPIError("API server error")
        logger.error(f"HTTP error {status_code}: {e}")
        raise TerraQuakeAPIError(f"HTTP error: {status_code}")

    def get_earthquakes(self, endpoint, params=None, timeout=10):
        """Fetch earthquakes with comprehensive error handling.

        Parameters:
            endpoint: API path relative to base_url.
            params: optional dict of query parameters.
            timeout: per-request timeout in seconds (default 10).

        Returns:
            dict: validated JSON response containing a 'payload' list.

        Raises:
            TerraQuakeAPIError: on timeout, connection failure, HTTP error,
                or a malformed/invalid response body.
        """
        url = f"{self.base_url}/{endpoint}"

        try:
            logger.info(f"Requesting: {url}")
            response = self.session.get(url, params=params, timeout=timeout)
            response.raise_for_status()
            data = self._decode_response(response)
            logger.info(f"Success: {len(data['payload'])} results")
            return data

        # Order matters: the specific exceptions below are subclasses of
        # RequestException, so the generic handler must come last.
        except Timeout:
            logger.error(f"Request timed out after {timeout}s")
            raise TerraQuakeAPIError("API request timed out")

        except ConnectionError as e:
            logger.error(f"Connection error: {e}")
            raise TerraQuakeAPIError("Failed to connect to API")

        except HTTPError as e:
            self._raise_for_http_error(e)

        except RequestException as e:
            logger.error(f"Request failed: {e}")
            raise TerraQuakeAPIError(f"API request failed: {str(e)}")

# Usage
client = TerraQuakeClient()

try:
    data = client.get_earthquakes(
        'earthquakes/region',
        params={'region': 'Calabria', 'limit': 100}
    )
    print(f"Retrieved {len(data['payload'])} earthquakes")

except TerraQuakeAPIError as e:
    # Single exception type covers timeouts, connection failures,
    # HTTP errors, and malformed responses
    print(f"API Error: {e}")
    # Handle error appropriately (show user message, use cached data, etc.)

Graceful Degradation

Python
import requests
from datetime import datetime

class ResilientTerraQuakeClient:
    """Client that degrades gracefully: live data, then last-good cache, then empty."""

    def __init__(self):
        self.base_url = 'https://api.terraquakeapi.com/api/v1'
        self.fallback_data = None  # last successful response, if any

    def get_earthquakes_with_fallback(self, endpoint, params):
        """Fetch with fallback to cached/default data.

        Returns a (data, source) pair where source is 'live', 'cached',
        or 'empty' depending on which tier produced the data.
        """
        try:
            response = requests.get(
                f"{self.base_url}/{endpoint}", params=params, timeout=10
            )
            response.raise_for_status()
            data = response.json()
        except Exception as e:
            print(f"API error: {e}")

            if self.fallback_data:
                print("Using cached data as fallback")
                return self.fallback_data, 'cached'

            print("No fallback data available")
            # Return empty but valid response structure
            empty = {
                'payload': [],
                'message': 'API temporarily unavailable',
                'totalEarthquakes': 0,
                'pagination': {'page': 1, 'totalPages': 0, 'hasMore': False}
            }
            return empty, 'empty'

        # Remember the successful payload for future failures
        self.fallback_data = data
        return data, 'live'

# Usage
client = ResilientTerraQuakeClient()
data, source = client.get_earthquakes_with_fallback(
    'earthquakes/today',
    {'limit': 50}
)

# source is 'live', 'cached', or 'empty' depending on which tier answered
print(f"Data source: {source}")
print(f"Events: {len(data['payload'])}")

Data Freshness and Polling

Smart Polling Strategy

Python
import requests
import time
from datetime import datetime

class EarthquakePoller:
    """Polls the 'today' endpoint and reports only events newer than the last check."""

    def __init__(self, check_interval=300):
        """Initialize poller (default: check every 5 minutes)"""
        self.base_url = 'https://api.terraquakeapi.com/api/v1'
        self.check_interval = check_interval
        self.last_event_time = None  # newest event time seen so far

    def poll_for_new_events(self):
        """Return events newer than the previous poll (empty on first run or error)."""
        try:
            response = requests.get(
                f"{self.base_url}/earthquakes/today",
                params={'limit': 100, 'sort': '-time'},
                timeout=10
            )
            payload = response.json()['payload']

            if not payload:
                return []

            newest = payload[0]['time']

            # First run: remember the newest time but report nothing, so
            # callers are not flooded with the whole day's events at startup.
            if self.last_event_time is None:
                self.last_event_time = newest
                return []

            # Collect events until we reach one we have already seen;
            # the feed is sorted newest-first, so we can stop there.
            fresh = []
            for event in payload:
                if event['time'] <= self.last_event_time:
                    break
                fresh.append(event)

            if fresh:
                self.last_event_time = newest

            return fresh

        except Exception as e:
            print(f"Polling error: {e}")
            return []

    def start_polling(self, callback):
        """Start polling loop with callback for new events"""
        print(f"Starting earthquake poller (interval: {self.check_interval}s)")

        while True:
            fresh = self.poll_for_new_events()

            if fresh:
                print(f"\n[{datetime.now()}] Found {len(fresh)} new earthquakes")
                for event in fresh:
                    callback(event)

            time.sleep(self.check_interval)

# Callback function
def on_new_earthquake(event):
    """Handle new earthquake event"""
    magnitude = event['magnitude']
    print(f"  M{magnitude} - {event['place']} at {event['time']}")

    # Send notification, update database, etc.
    if magnitude >= 4.0:
        print(f"    ⚠️ SIGNIFICANT EARTHQUAKE!")

# Start polling
poller = EarthquakePoller(check_interval=300)  # 5 minutes
# NOTE: start_polling blocks forever (while True); run it in a dedicated
# thread/process or add a stop condition for production use.
poller.start_polling(on_new_earthquake)

Performance Optimization

Parallel Requests

Python
import requests
from concurrent.futures import ThreadPoolExecutor, as_completed

def fetch_region(region):
    """Fetch earthquakes for a single region"""
    endpoint = 'https://api.terraquakeapi.com/api/v1/earthquakes/region'

    try:
        response = requests.get(
            endpoint, params={'region': region, 'limit': 100}, timeout=10
        )
        body = response.json()
        return {
            'region': region,
            'count': body['totalEarthquakes'],
            'events': body['payload']
        }
    except Exception as e:
        # Per-region failures are reported as empty results so one bad
        # region does not abort the whole parallel fetch.
        print(f"Error fetching {region}: {e}")
        return {'region': region, 'count': 0, 'events': []}

def fetch_all_regions_parallel(regions, max_workers=5):
    """Fetch multiple regions in parallel"""
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        # map fans the regions out across the worker pool and yields
        # each region's result dict as the workers finish
        fetched = list(pool.map(fetch_region, regions))

    # Key the results by region name for easy lookup
    return {entry['region']: entry for entry in fetched}

# Fetch data for multiple regions in parallel
regions = ['Calabria', 'Sicilia', 'Campania', 'Lazio', 'Toscana']
results = fetch_all_regions_parallel(regions, max_workers=3)

# Display results (results is a dict keyed by region name)
for region, data in results.items():
    print(f"{region}: {data['count']} earthquakes")

Connection Pooling

Python
import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry

class OptimizedTerraQuakeClient:
    """Session-based client with connection pooling and automatic retries."""

    def __init__(self):
        self.base_url = 'https://api.terraquakeapi.com/api/v1'

        # A single Session reuses TCP connections across requests
        self.session = requests.Session()

        # Retry transient failures: rate limiting (429) and server errors
        retries = Retry(
            total=3,
            backoff_factor=1,
            status_forcelist=[429, 500, 502, 503, 504]
        )

        # Adapter providing the connection pool plus the retry policy
        pooled_adapter = HTTPAdapter(
            pool_connections=10,
            pool_maxsize=20,
            max_retries=retries
        )

        for scheme in ('http://', 'https://'):
            self.session.mount(scheme, pooled_adapter)

    def get(self, endpoint, params=None):
        """GET a relative endpoint and return the parsed JSON body."""
        response = self.session.get(
            f"{self.base_url}/{endpoint}", params=params, timeout=10
        )
        response.raise_for_status()
        return response.json()

# Usage - reuses connections
client = OptimizedTerraQuakeClient()

# All ten requests share the pooled session instead of opening a new
# TCP connection each time
for i in range(10):
    data = client.get('earthquakes/today', {'limit': 50})
    print(f"Request {i+1}: {len(data['payload'])} events")

Monitoring and Logging

Python
import requests
import logging
import time
from datetime import datetime

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler('terraquake_api.log'),
        logging.StreamHandler()
    ]
)

logger = logging.getLogger('TerraQuakeClient')

class MonitoredClient:
    """Client that records request counts, errors, and latency metrics."""

    def __init__(self):
        self.base_url = 'https://api.terraquakeapi.com/api/v1'
        self.request_count = 0        # every attempt, including failures
        self.error_count = 0          # attempts that raised
        self.total_response_time = 0  # accumulated over successful requests

    def get_earthquakes(self, endpoint, params=None):
        """Fetch with monitoring.

        The attempt is counted before the request is issued so that failed
        requests appear in the statistics; previously only successes
        incremented request_count, which understated total_requests and
        made success_rate wrong (e.g. 2 successes + 1 failure reported
        50% instead of 66.7%).
        """
        url = f"{self.base_url}/{endpoint}"
        start_time = time.time()
        self.request_count += 1

        try:
            logger.info(f"Request {self.request_count}: {endpoint}")

            response = requests.get(url, params=params, timeout=10)
            response.raise_for_status()
            data = response.json()

            # Latency is tracked for successful requests only
            response_time = time.time() - start_time
            self.total_response_time += response_time

            logger.info(
                f"Success: {len(data['payload'])} results in {response_time:.2f}s"
            )

            return data

        except Exception as e:
            self.error_count += 1
            response_time = time.time() - start_time

            logger.error(
                f"Request failed after {response_time:.2f}s: {e}",
                exc_info=True
            )
            raise

    def get_stats(self):
        """Return aggregate metrics for all requests made by this client."""
        successes = self.request_count - self.error_count

        # Average over successes only, since latency is recorded only
        # when a request completes
        avg_response_time = (
            self.total_response_time / successes
            if successes > 0
            else 0
        )

        return {
            'total_requests': self.request_count,
            'errors': self.error_count,
            'success_rate': (
                successes / self.request_count * 100
                if self.request_count > 0
                else 0
            ),
            'avg_response_time': avg_response_time
        }

# Usage
client = MonitoredClient()

for region in ['Calabria', 'Sicilia', 'Campania']:
    try:
        data = client.get_earthquakes('earthquakes/region', {'region': region})
    except Exception:
        # Errors are already counted and logged inside the client
        pass

stats = client.get_stats()
print(f"\nClient Statistics:")
print(f"Total Requests: {stats['total_requests']}")
print(f"Errors: {stats['errors']}")
print(f"Success Rate: {stats['success_rate']:.1f}%")
print(f"Avg Response Time: {stats['avg_response_time']:.2f}s")

Summary Checklist

✅ Do

  • Use pagination for large datasets
  • Implement caching with appropriate TTLs
  • Handle errors gracefully
  • Use specific endpoints
  • Request only needed fields
  • Implement retry logic
  • Monitor API usage
  • Use connection pooling

❌ Don't

  • Fetch all data without pagination
  • Poll too frequently (< 1 minute)
  • Ignore error responses
  • Make unnecessary requests
  • Hardcode API responses
  • Ignore timeouts
  • Skip logging
  • Create new connections per request

Next Steps

API Reference

Complete endpoint documentation

Filtering Guide

Learn about filter options

Build docs developers (and LLMs) love