Polling strategies
Simple polling
The most straightforward approach is to poll endpoints at regular intervals (Python example):
import requests
import time
def poll_latest_trades(mint, interval=5, timeout=10):
    """
    Poll for latest trades every N seconds.

    Runs until interrupted. Each cycle requests the most recent trades for
    ``mint`` and prints every trade whose ``signature`` has not been printed
    before.

    Args:
        mint: Mint address interpolated into the request URL.
        interval: Seconds to sleep between polling cycles.
        timeout: Seconds to wait for the server before the request is
            abandoned and reported as an error.
    """
    url = f"https://frontend-api-v3.pump.fun/trades/all/{mint}"
    headers = {
        "Authorization": "Bearer <your_token>",
        "Accept": "application/json",
    }
    params = {"limit": 10, "offset": 0, "minimumSize": 0}
    # NOTE: this set only ever grows; a long-running poller may want to cap it.
    last_seen = set()

    while True:
        try:
            # Without a timeout a stalled connection would block the loop forever.
            response = requests.get(
                url, headers=headers, params=params, timeout=timeout
            )
            # Surface 4xx/5xx responses as errors instead of iterating over an
            # error payload as if it were a list of trades.
            response.raise_for_status()
            trades = response.json()

            for trade in trades:
                trade_id = trade.get('signature')
                if trade_id and trade_id not in last_seen:
                    last_seen.add(trade_id)
                    print(f"New trade: {trade}")
        except Exception as e:
            # Best-effort loop: report the failure and retry on the next cycle.
            print(f"Error: {e}")
        time.sleep(interval)
# Start polling a single mint, checking for new trades every 5 seconds
poll_latest_trades(
    mint="CxLHsqvjfisgPAGwcZJsTn6nzZXJLxmVYM7v9pump",
    interval=5,
)
Smart polling with ETag caching
Use ETag headers to reduce bandwidth and avoid processing unchanged data (Python example):
import requests
import time
def smart_poll_with_etag(url, interval=5, timeout=10):
    """
    Poll with ETag caching to reduce bandwidth.

    Runs until interrupted. The ETag from the last successfully processed
    200 response is sent back as ``If-None-Match``; a 304 reply means the
    content has not changed and nothing is parsed.

    Args:
        url: Endpoint to poll.
        interval: Seconds to sleep between polling cycles.
        timeout: Seconds to wait for the server before the request is
            abandoned and reported as an error.
    """
    base_headers = {
        "Authorization": "Bearer <your_token>",
        "Accept": "application/json",
    }
    etag = None

    while True:
        try:
            # Build the headers per request so a stale If-None-Match cannot
            # linger after the server stops sending an ETag.
            headers = dict(base_headers)
            if etag:
                headers["If-None-Match"] = etag

            # Without a timeout a stalled connection would block the loop forever.
            response = requests.get(url, headers=headers, timeout=timeout)

            if response.status_code == 304:
                # 304 means content hasn't changed
                print("No changes detected")
            elif response.status_code == 200:
                # Parse first: if the body is unusable we must not remember
                # its ETag, or the next poll would get 304 and the update
                # would never be processed.
                data = response.json()
                etag = response.headers.get('ETag')
                print(f"Data updated: {data}")
            else:
                # Any other status used to be dropped silently.
                print(f"Unexpected status: {response.status_code}")
        except Exception as e:
            # Best-effort loop: report the failure and retry on the next cycle.
            print(f"Error: {e}")
        time.sleep(interval)
# Usage: watch the latest-coins feed, re-checking every 10 seconds
url = "https://frontend-api-v3.pump.fun/coins/latest"
smart_poll_with_etag(url=url, interval=10)
ETag caching can significantly reduce bandwidth usage and API load. The server returns a 304 Not Modified response when content hasn’t changed.
Adaptive polling
Adjust polling frequency based on activity level (Python example):
import requests
import time
class AdaptivePoller:
    """
    Poll one URL, polling faster while its data changes and slower while idle.

    The interval starts at ``min_interval``, shrinks by 20% after each poll
    whose payload differs from the previous one, and grows by 20% after each
    poll whose payload is unchanged. It is always kept within
    ``[min_interval, max_interval]``.
    """

    def __init__(self, url, headers, min_interval=5, max_interval=60, timeout=10):
        """
        Args:
            url: Endpoint to poll.
            headers: HTTP headers sent with every request.
            min_interval: Shortest allowed delay between polls, in seconds.
            max_interval: Longest allowed delay between polls, in seconds.
            timeout: Seconds to wait for the server before a request is
                abandoned and reported as an error.
        """
        self.url = url
        self.headers = headers
        self.min_interval = min_interval
        self.max_interval = max_interval
        self.timeout = timeout
        self.current_interval = min_interval

    def poll(self, params=None):
        """
        Poll with adaptive interval based on activity.

        Runs until interrupted.

        Args:
            params: Optional query parameters sent with every request.
        """
        last_data = None

        while True:
            try:
                # Without a timeout a stalled connection would block the loop forever.
                response = requests.get(
                    self.url,
                    headers=self.headers,
                    params=params,
                    timeout=self.timeout,
                )
                # An error body must not be compared as data: it would look
                # like "activity" and make the poller speed up.
                response.raise_for_status()
                data = response.json()

                if data != last_data:
                    # If data changed, decrease interval (poll more frequently)
                    self.current_interval = max(
                        self.min_interval,
                        self.current_interval * 0.8,
                    )
                    print(f"Activity detected! Polling every {self.current_interval}s")
                else:
                    # If no changes, increase interval (poll less frequently)
                    self.current_interval = min(
                        self.max_interval,
                        self.current_interval * 1.2,
                    )
                    print(f"No activity. Polling every {self.current_interval}s")

                last_data = data
            except Exception as e:
                # Best-effort loop: report the failure and keep the current interval.
                print(f"Error: {e}")
            time.sleep(self.current_interval)
# Usage: follow the latest-trades feed, adapting between 3 s and 30 s
poller = AdaptivePoller(
    "https://frontend-api-v3.pump.fun/trades/latest",
    {
        "Authorization": "Bearer <your_token>",
        "Accept": "application/json",
    },
    min_interval=3,
    max_interval=30,
)
poller.poll(params=None)
Multi-endpoint monitoring
Monitor multiple endpoints simultaneously using threading (Python example):
import requests
import threading
import time
from queue import Queue
class MultiEndpointMonitor:
    """
    Poll several endpoints on daemon threads and collect results in one queue.

    Each monitored endpoint gets its own thread that pushes
    ``{"endpoint", "data", "timestamp"}`` dicts onto ``update_queue``.
    """

    def __init__(self, token, timeout=10):
        """
        Args:
            token: Bearer token placed in the ``Authorization`` header.
            timeout: Seconds to wait for the server before a request is
                abandoned and reported as an error.
        """
        self.headers = {
            "Authorization": f"Bearer {token}",
            "Accept": "application/json",
        }
        self.timeout = timeout
        self.update_queue = Queue()

    def monitor_endpoint(self, name, url, params, interval):
        """
        Monitor a single endpoint in a separate thread.

        Runs until the process exits.

        Args:
            name: Label stored under ``"endpoint"`` in every queued update.
            url: Endpoint to poll.
            params: Query parameters sent with every request.
            interval: Seconds to sleep between polls.
        """
        while True:
            try:
                # Without a timeout a stalled connection would park this
                # thread forever and the endpoint would silently go quiet.
                response = requests.get(
                    url,
                    headers=self.headers,
                    params=params,
                    timeout=self.timeout,
                )
                # Keep 4xx/5xx error payloads out of the update queue.
                response.raise_for_status()
                data = response.json()

                self.update_queue.put({
                    "endpoint": name,
                    "data": data,
                    "timestamp": time.time(),
                })
            except Exception as e:
                # Best-effort loop: report the failure and retry after the sleep.
                print(f"Error monitoring {name}: {e}")

            time.sleep(interval)

    def start_monitoring(self, endpoints):
        """
        Start monitoring multiple endpoints.

        Args:
            endpoints: Iterable of ``(name, url, params, interval)`` tuples;
                one daemon thread is started per entry.
        """
        for name, url, params, interval in endpoints:
            thread = threading.Thread(
                target=self.monitor_endpoint,
                args=(name, url, params, interval),
                daemon=True,
            )
            thread.start()

    def process_updates(self):
        """
        Process updates from all endpoints.

        Blocks on the queue and prints each update as it arrives; never returns.
        """
        while True:
            update = self.update_queue.get()
            print(f"Update from {update['endpoint']}: {update['data']}")
# Usage: watch three endpoints at staggered intervals
monitor = MultiEndpointMonitor(token="your_token")

# Each entry is (name, url, params, interval in seconds)
endpoints = [
    ("latest_trades", "https://frontend-api-v3.pump.fun/trades/latest", {}, 5),
    ("latest_coins", "https://frontend-api-v3.pump.fun/coins/latest", {}, 10),
    (
        "specific_coin",
        "https://frontend-api-v3.pump.fun/coins/CxLHsqvjfisgPAGwcZJsTn6nzZXJLxmVYM7v9pump",
        {},
        8,
    ),
]

monitor.start_monitoring(endpoints=endpoints)
monitor.process_updates()
When monitoring multiple endpoints, be mindful of rate limits. Distribute your polling intervals to stay within API limits.
Event-driven updates
Implement an event system for handling real-time updates (Python example):
import requests
import time
from typing import Callable, Dict, List
class EventDrivenPoller:
    """
    Poll for trades and dispatch them to registered callbacks.

    ``poll_trades`` emits ``'new_trade'`` for every unseen trade, followed by
    ``'buy'`` or ``'sell'`` depending on the trade's ``is_buy`` field, and
    ``'error'`` (with the error text) when a polling cycle fails.
    """

    def __init__(self, token, timeout=10):
        """
        Args:
            token: Bearer token placed in the ``Authorization`` header.
            timeout: Seconds to wait for the server before a request is
                abandoned and reported through the ``'error'`` event.
        """
        self.headers = {
            "Authorization": f"Bearer {token}",
            "Accept": "application/json",
        }
        self.timeout = timeout
        self.listeners: Dict[str, List[Callable]] = {}

    def on(self, event: str, callback: Callable):
        """
        Register an event listener.

        Args:
            event: Event name to listen for.
            callback: Called with the event payload each time ``event`` is
                emitted; listeners run in registration order.
        """
        self.listeners.setdefault(event, []).append(callback)

    def emit(self, event: str, data):
        """
        Emit an event to all registered listeners.

        Events without listeners are ignored.

        Args:
            event: Event name.
            data: Payload passed to every listener.
        """
        for callback in self.listeners.get(event, []):
            callback(data)

    def poll_trades(self, mint, interval=5):
        """
        Poll for trades and emit events.

        Runs until interrupted.

        Args:
            mint: Mint address interpolated into the request URL.
            interval: Seconds to sleep between polling cycles.
        """
        url = f"https://frontend-api-v3.pump.fun/trades/all/{mint}"
        params = {"limit": 20, "offset": 0, "minimumSize": 0}
        seen = set()

        while True:
            try:
                # Without a timeout a stalled connection would block the loop forever.
                response = requests.get(
                    url,
                    headers=self.headers,
                    params=params,
                    timeout=self.timeout,
                )
                # Surface 4xx/5xx responses through the 'error' event instead
                # of iterating over an error payload as if it were trades.
                response.raise_for_status()
                trades = response.json()

                for trade in trades:
                    trade_id = trade.get('signature')
                    if trade_id and trade_id not in seen:
                        seen.add(trade_id)
                        self.emit('new_trade', trade)

                        # Emit specific events based on trade properties
                        if trade.get('is_buy'):
                            self.emit('buy', trade)
                        else:
                            self.emit('sell', trade)
            except Exception as e:
                # Listener failures land here too; report and keep polling.
                self.emit('error', str(e))
            time.sleep(interval)
# Usage
poller = EventDrivenPoller(token="your_token")

# Attach one handler per event type
poller.on(event='new_trade', callback=lambda trade: print(f"New trade: {trade}"))
poller.on(event='buy', callback=lambda trade: print(f"BUY: {trade.get('amount')}"))
poller.on(event='sell', callback=lambda trade: print(f"SELL: {trade.get('amount')}"))
poller.on(event='error', callback=lambda error: print(f"Error occurred: {error}"))

# Begin polling this mint every 5 seconds
poller.poll_trades(mint="CxLHsqvjfisgPAGwcZJsTn6nzZXJLxmVYM7v9pump", interval=5)
Best practices
Choose appropriate intervals
Start with 5-10 second intervals for active monitoring, 30-60 seconds for less critical updates.
Implement exponential backoff
When encountering errors or rate limits, increase polling intervals exponentially.
If Pump.fun introduces native WebSocket support in the future, migrate to that for more efficient real-time updates with lower latency and reduced server load.