Follow these best practices to build robust, efficient, and secure applications using the Pump.fun API.
Rate limiting
Understanding rate limits
The Pump.fun API implements rate limiting to ensure fair usage and system stability. Rate limits vary by endpoint and are communicated through response headers.
Check these headers in API responses:
import requests

# Every API response carries the current rate-limit state in its headers.
response = requests.get(url, headers=headers)

limit = response.headers.get('x-ratelimit-limit')          # requests allowed per window
remaining = response.headers.get('x-ratelimit-remaining')  # requests left in this window
reset = response.headers.get('x-ratelimit-reset')          # when the window resets

print(f"Limit: {limit}")
print(f"Remaining: {remaining}")
print(f"Reset: {reset}")
Implement exponential backoff
Handle rate limit errors with exponential backoff:
import requests
import time
from typing import Optional, Dict, Any
def request_with_backoff(url: str, headers: Dict[str, str],
                         max_retries: int = 5) -> Optional[requests.Response]:
    """
    Make a GET request, retrying with exponential backoff on HTTP 429.

    Args:
        url: Fully-qualified endpoint URL.
        headers: Request headers, including authorization.
        max_retries: Maximum number of attempts before giving up.

    Returns:
        The successful response, or None on a non-retryable error or
        when all retries are exhausted.
    """
    for attempt in range(max_retries):
        # Bound each attempt so a hung connection cannot block forever.
        response = requests.get(url, headers=headers, timeout=30)

        if response.status_code == 200:
            return response

        if response.status_code == 429:
            # Prefer the server-suggested delay. Retry-After may also be
            # an HTTP-date (RFC 9110), in which case int() fails and we
            # fall back to exponential backoff instead of crashing.
            try:
                delay = int(response.headers.get('Retry-After', ''))
            except ValueError:
                delay = 2 ** attempt
            if attempt == max_retries - 1:
                break  # no attempt left — don't sleep for nothing
            print(f"Rate limited. Waiting {delay}s...")
            time.sleep(delay)
        else:
            print(f"Error {response.status_code}: {response.text}")
            return None
    return None
# Usage
response = request_with_backoff(url, headers)
if response is not None:
    data = response.json()
Never ignore 429 status codes. Always implement proper backoff to avoid being temporarily blocked.
Rate limiting best practices
Respect rate limits
Monitor x-ratelimit-remaining and slow down requests when approaching limits.
Implement request queuing
Queue requests and process them at a controlled rate rather than sending bursts.
Use exponential backoff
When rate limited, wait exponentially longer between retries (2s, 4s, 8s, etc.).
Cache aggressively
Cache responses to minimize redundant API calls.
Caching strategies
ETag caching
Use ETag headers to avoid re-downloading unchanged data:
import requests
class ETAgCache:
    """In-memory ETag cache: re-download a URL only when its content changed."""

    def __init__(self):
        # url -> (etag, parsed JSON body)
        self.cache = {}

    def get(self, url: str, headers: Dict[str, str]) -> Optional[Any]:
        """
        Fetch data with ETag caching, serving cached data on 304.

        Args:
            url: Endpoint to fetch.
            headers: Base request headers. Not mutated by this call.

        Returns:
            The parsed JSON body, or None on any other response.
        """
        # Work on a copy so the conditional-request header never leaks
        # into the caller's dict (the original mutated `headers` in place).
        request_headers = dict(headers)
        if url in self.cache:
            etag, _ = self.cache[url]
            request_headers['If-None-Match'] = etag

        response = requests.get(url, headers=request_headers, timeout=30)

        if response.status_code == 304:
            # Not modified, return cached data
            print("Using cached data (304 Not Modified)")
            return self.cache[url][1]

        if response.status_code == 200:
            data = response.json()
            etag = response.headers.get('ETag')
            if etag:
                self.cache[url] = (etag, data)
            return data

        return None
# Usage
cache = ETAgCache()
headers = {
    "Authorization": "Bearer <your_token>",
    "Accept": "application/json",
}
data = cache.get(url, headers)
Time-based caching
Implement time-based cache expiration:
import time
from typing import Optional, Any, Dict
class TimeBasedCache:
    """A dict-backed cache whose entries expire after a fixed TTL."""

    def __init__(self, ttl: int = 300):
        self.cache = {}  # key -> (data, insertion timestamp)
        self.ttl = ttl   # time to live, in seconds

    def get(self, key: str) -> Optional[Any]:
        """Return the cached value for *key*, or None if absent or expired."""
        entry = self.cache.get(key)
        if entry is None:
            return None
        data, stored_at = entry
        if time.time() - stored_at >= self.ttl:
            # Expired: evict lazily on access.
            del self.cache[key]
            return None
        return data

    def set(self, key: str, data: Any):
        """Store *data* under *key*, stamped with the current time."""
        self.cache[key] = (data, time.time())

    def clear_expired(self):
        """Evict every entry older than the TTL."""
        now = time.time()
        stale = [k for k, (_, ts) in self.cache.items() if now - ts >= self.ttl]
        for k in stale:
            del self.cache[k]
# Usage
cache = TimeBasedCache(ttl=300)  # entries live for 5 minutes

data = cache.get('coins_latest')
if data is None:
    # Cache miss: fall through to the live API and remember the result.
    response = requests.get(url, headers=headers)
    data = response.json()
    cache.set('coins_latest', data)
Set appropriate TTL values based on data volatility. Use shorter TTLs (30-60s) for real-time data, longer (5-15min) for static content.
Error handling
Comprehensive error handling
Implement robust error handling for all API calls:
import logging
from typing import Any, Dict, Optional

import requests

# Module-level logger; basicConfig is a no-op if logging is already configured.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class APIClient:
    """Thin Pump.fun API client that maps every failure mode to None."""

    # Status codes reported as plain errors with a fixed message.
    _ERROR_MESSAGES = {
        401: "Unauthorized. Check your authentication token.",
        403: "Forbidden. Insufficient permissions.",
    }

    def __init__(self, token: str):
        self.base_url = "https://frontend-api-v3.pump.fun"
        self.headers = {
            "Authorization": f"Bearer {token}",
            "Accept": "application/json"
        }

    def _send(self, url, method, params, data):
        """Dispatch one HTTP request; returns None for unsupported methods."""
        if method == 'GET':
            return requests.get(url, headers=self.headers, params=params, timeout=30)
        if method == 'POST':
            return requests.post(url, headers=self.headers, json=data, timeout=30)
        if method == 'DELETE':
            return requests.delete(url, headers=self.headers, timeout=30)
        logger.error(f"Unsupported method: {method}")
        return None

    def _handle_response(self, response, endpoint):
        """Translate an HTTP response into a parsed payload or None."""
        status = response.status_code
        if status in (200, 201):
            try:
                return response.json()
            except ValueError:
                # A 2xx with a non-JSON body: report it explicitly instead
                # of letting the generic exception handler mask it.
                logger.error(f"Invalid JSON in response from {endpoint}")
                return None
        if status == 304:
            logger.info("Content not modified")
        elif status == 400:
            logger.error(f"Bad request: {response.text}")
        elif status in self._ERROR_MESSAGES:
            logger.error(self._ERROR_MESSAGES[status])
        elif status == 404:
            logger.error(f"Not found: {endpoint}")
        elif status == 429:
            logger.warning("Rate limited")
        elif status >= 500:
            logger.error(f"Server error: {status}")
        else:
            logger.error(f"Unexpected status code: {status}")
        return None

    def make_request(self, endpoint: str, method: str = 'GET',
                     params: Optional[Dict] = None,
                     data: Optional[Dict] = None) -> Optional[Dict[str, Any]]:
        """
        Make an API request with comprehensive error handling.

        Args:
            endpoint: Path appended to the base URL, e.g. '/coins/latest'.
            method: 'GET', 'POST', or 'DELETE' (anything else is rejected).
            params: Optional query parameters (GET requests).
            data: Optional JSON body (POST requests).

        Returns:
            Parsed JSON payload on 200/201, otherwise None. All errors are
            logged rather than raised so callers need only a None check.
        """
        url = f"{self.base_url}{endpoint}"
        try:
            response = self._send(url, method, params, data)
            if response is None:
                return None
            return self._handle_response(response, endpoint)
        except requests.exceptions.Timeout:
            logger.error(f"Request timeout for {endpoint}")
            return None
        except requests.exceptions.ConnectionError:
            logger.error(f"Connection error for {endpoint}")
            return None
        except requests.exceptions.RequestException as e:
            logger.error(f"Request exception: {e}")
            return None
        except Exception as e:
            # Last-resort guard so one bad response can never crash callers.
            logger.error(f"Unexpected error: {e}")
            return None
# Usage
client = APIClient("your_token")
data = client.make_request('/coins/latest', method='GET')
print(f"Success: {data}" if data else "Request failed")
Security best practices
Secure token storage
Never hardcode tokens in your source code:
import os
from dotenv import load_dotenv

# Load environment variables from a local .env file.
load_dotenv()

# Never hardcode tokens: read them from the environment instead.
API_TOKEN = os.getenv('PUMP_FUN_API_TOKEN')
if not API_TOKEN:
    # Fail fast, naming the *actual* variable read above (the original
    # message said "API_TOKEN", which is not the variable being checked).
    raise ValueError("PUMP_FUN_API_TOKEN not found in environment variables")

headers = {
    "Authorization": f"Bearer {API_TOKEN}",
    "Accept": "application/json"
}
Validate user input
Always validate and sanitize user input before sending it to the API:
import re
from typing import Optional
# Compiled once at import time. Base58 alphabet: excludes 0, O, I and l.
_MINT_PATTERN = re.compile(r'[1-9A-HJ-NP-Za-km-z]{32,44}')


def validate_mint_address(mint: str) -> bool:
    """
    Validate a Solana mint address (base58, 32-44 characters).

    Uses fullmatch so trailing newlines are rejected: re.match with a
    '$' anchor accepts 'address\\n' because '$' also matches just before
    a final newline.
    """
    return _MINT_PATTERN.fullmatch(mint) is not None
def sanitize_search_term(term: str) -> str:
    """
    Sanitize a search term to prevent injection.

    Strips every character outside letters, digits, whitespace and
    hyphens, then caps the result at 100 characters.
    """
    cleaned = re.sub(r'[^a-zA-Z0-9\s-]', '', term)
    return cleaned[:100]
# Usage
user_input = "CxLHsqvjfisgPAGwcZJsTn6nzZXJLxmVYM7v9pump"
if not validate_mint_address(user_input):
    print("Invalid mint address")
else:
    # Validated, safe to pass through to the API.
    coin = fetch_coin(user_input)
Use HTTPS only
Always use HTTPS endpoints:
# Good - always talk to the API over TLS
url = "https://frontend-api-v3.pump.fun/coins/latest"

# Bad - Never use HTTP
# url = "http://frontend-api-v3.pump.fun/coins/latest"
Implement request signing
For webhook forwarding, implement proper request signing:
import hmac
import hashlib
import json
def sign_request(payload: Dict, secret: str) -> str:
    """
    Sign a request payload with HMAC-SHA256.

    The payload is serialized with sorted keys so the signature does not
    depend on dict insertion order.
    """
    canonical = json.dumps(payload, sort_keys=True).encode()
    digest = hmac.new(secret.encode(), canonical, hashlib.sha256)
    return digest.hexdigest()


def verify_request(payload: Dict, signature: str, secret: str) -> bool:
    """
    Verify a request signature using a constant-time comparison.
    """
    return hmac.compare_digest(signature, sign_request(payload, secret))
Use connection pooling
Reuse HTTP connections for better performance:
import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
class OptimizedAPIClient:
    """API client backed by a pooled, auto-retrying requests session."""

    def __init__(self, token: str):
        self.session = requests.Session()

        # Retry transient failures (rate limits and 5xx) up to 3 times,
        # backing off between attempts; reuse up to 10 pooled connections.
        adapter = HTTPAdapter(
            max_retries=Retry(
                total=3,
                status_forcelist=[429, 500, 502, 503, 504],
                backoff_factor=1,
            ),
            pool_connections=10,
            pool_maxsize=10,
        )
        self.session.mount("https://", adapter)

        self.session.headers.update({
            "Authorization": f"Bearer {token}",
            "Accept": "application/json",
        })

    def get(self, url: str, params=None):
        """GET through the pooled session with a 30-second timeout."""
        return self.session.get(url, params=params, timeout=30)

    def close(self):
        """Release all pooled connections."""
        self.session.close()
# Usage — close the session even if the request raises, so pooled
# connections are never leaked.
client = OptimizedAPIClient("your_token")
try:
    response = client.get("https://frontend-api-v3.pump.fun/coins/latest")
finally:
    client.close()
Batch requests when possible
Use bulk endpoints instead of individual requests:
# Good - one bulk request covers every mint at once
mints = ["mint1", "mint2", "mint3"]
response = requests.post(
    "https://advanced-api-v2.pump.fun/coins/mints",
    headers=headers,
    json={"mints": mints},
)

# Bad - Multiple individual requests
# for mint in mints:
#     response = requests.get(f"https://frontend-api-v3.pump.fun/coins/{mint}")
Summary checklist
Authentication
Store tokens securely in environment variables, never in source code.
Rate limiting
Monitor rate limit headers and implement exponential backoff for 429 errors.
Caching
Use ETag caching and time-based caching to reduce API calls.
Error handling
Implement comprehensive error handling for all status codes and exceptions.
Input validation
Always validate and sanitize user input before sending to the API.
Connection pooling
Use session objects and connection pooling for better performance.
Logging
Log all errors and important events for debugging and monitoring.
Timeouts
Set appropriate timeouts (20-30s) for all requests to prevent hanging.