Skip to main content
The Pump.fun API’s documented specification does not currently expose dedicated WebSocket endpoints, but you can get near-real-time updates by polling its HTTP endpoints efficiently.

Polling strategies

Simple polling

The most straightforward approach is to poll endpoints at regular intervals:
Python
import requests
import time

def poll_latest_trades(mint, interval=5, token="<your_token>", timeout=10, max_seen=10_000):
    """
    Poll for the latest trades of a coin every N seconds and print unseen ones.

    Runs until interrupted (e.g. Ctrl+C). Any error is printed and polling
    continues after the usual interval.

    Args:
        mint: Mint address of the coin whose trades are polled.
        interval: Seconds to wait between polls.
        token: Bearer token sent in the Authorization header.
        timeout: Seconds to wait for each HTTP response before giving up,
            so a stalled connection cannot block the loop forever.
        max_seen: Maximum number of trade signatures remembered for
            de-duplication; the oldest are forgotten first. Keep this well
            above the page size (10) or old trades will be reported again.
    """
    from collections import deque  # local import: only this function needs it

    url = f"https://frontend-api-v3.pump.fun/trades/all/{mint}"
    headers = {
        "Authorization": f"Bearer {token}",
        "Accept": "application/json"
    }
    params = {"limit": 10, "offset": 0, "minimumSize": 0}

    seen = set()          # fast membership test
    seen_order = deque()  # insertion order, used to evict the oldest signatures

    while True:
        try:
            response = requests.get(url, headers=headers, params=params, timeout=timeout)
            # Turn 4xx/5xx answers into errors instead of parsing their body as trades
            response.raise_for_status()
            trades = response.json()
            if not isinstance(trades, list):
                raise ValueError(f"expected a list of trades, got {type(trades).__name__}")

            for trade in trades:
                trade_id = trade.get('signature')
                if trade_id and trade_id not in seen:
                    seen.add(trade_id)
                    seen_order.append(trade_id)
                    print(f"New trade: {trade}")

            # Cap memory use of a long-running poller
            while len(seen_order) > max_seen:
                seen.discard(seen_order.popleft())
        except Exception as e:
            # Top-level boundary of the polling loop: report and keep polling
            print(f"Error: {e}")

        time.sleep(interval)

# Watch a single coin, checking for new trades every 5 seconds
poll_latest_trades(mint="CxLHsqvjfisgPAGwcZJsTn6nzZXJLxmVYM7v9pump", interval=5)

Smart polling with ETag caching

Use ETag headers to reduce bandwidth and avoid processing unchanged data:
Python
import requests
import time

def smart_poll_with_etag(url, interval=5, token="<your_token>", timeout=10):
    """
    Poll a URL with ETag caching to reduce bandwidth.

    Runs until interrupted (e.g. Ctrl+C). Any error is printed and polling
    continues after the usual interval.

    Args:
        url: Endpoint to poll.
        interval: Seconds to wait between polls.
        token: Bearer token sent in the Authorization header.
        timeout: Seconds to wait for each HTTP response before giving up.
    """
    base_headers = {
        "Authorization": f"Bearer {token}",
        "Accept": "application/json"
    }
    etag = None

    while True:
        try:
            # Build the headers fresh for every request so a stale
            # If-None-Match cannot linger once the server stops sending ETags
            headers = dict(base_headers)
            if etag:
                headers["If-None-Match"] = etag

            response = requests.get(url, headers=headers, timeout=timeout)

            # 304 means content hasn't changed
            if response.status_code == 304:
                print("No changes detected")
            elif response.status_code == 200:
                # Parse first: if the body is invalid, keep the old ETag so
                # the same content is fetched again instead of being skipped
                data = response.json()
                etag = response.headers.get('ETag')
                print(f"Data updated: {data}")
            else:
                # Rate limits, auth failures, server errors, ... must not pass silently
                print(f"Error: unexpected HTTP status {response.status_code}")
        except Exception as e:
            # Top-level boundary of the polling loop: report and keep polling
            print(f"Error: {e}")

        time.sleep(interval)

# Watch the latest-coins endpoint, re-checking every 10 seconds
url = "https://frontend-api-v3.pump.fun/coins/latest"
smart_poll_with_etag(url=url, interval=10)
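ETag caching can significantly reduce bandwidth usage and API load: when the If-None-Match header you send matches the resource’s current ETag, the server returns a 304 Not Modified response with no body instead of resending unchanged content. This only helps on endpoints that include an ETag header in their responses.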

Adaptive polling

Adjust polling frequency based on activity level:
Python
import requests
import time

class AdaptivePoller:
    """
    Poll one endpoint, speeding up while its data changes and slowing down
    while it stays the same.
    """

    def __init__(self, url, headers, min_interval=5, max_interval=60, timeout=10):
        """
        Args:
            url: Endpoint to poll.
            headers: HTTP headers sent with every request.
            min_interval: Shortest wait between polls, in seconds.
            max_interval: Longest wait between polls, in seconds.
            timeout: Seconds to wait for each HTTP response before giving up.
        """
        self.url = url
        self.headers = headers
        self.min_interval = min_interval
        self.max_interval = max_interval
        self.timeout = timeout
        self.current_interval = min_interval

    def _next_interval(self, changed):
        """
        Return the interval to use after a successful poll: 20% shorter when
        the data changed, 20% longer otherwise, clamped to the configured range.
        """
        if changed:
            return max(self.min_interval, self.current_interval * 0.8)
        return min(self.max_interval, self.current_interval * 1.2)

    def poll(self, params=None):
        """
        Poll with adaptive interval based on activity.

        Runs until interrupted (e.g. Ctrl+C). Errors are printed and the
        interval is doubled (up to max_interval) before the next attempt.

        Args:
            params: Optional query parameters sent with every request.
        """
        last_data = None

        while True:
            try:
                response = requests.get(
                    self.url,
                    headers=self.headers,
                    params=params,
                    timeout=self.timeout
                )
                # An error body must not be mistaken for "new data"
                response.raise_for_status()
                data = response.json()

                changed = data != last_data
                self.current_interval = self._next_interval(changed)
                if changed:
                    print(f"Activity detected! Polling every {self.current_interval}s")
                else:
                    print(f"No activity. Polling every {self.current_interval}s")

                last_data = data
            except Exception as e:
                print(f"Error: {e}")
                # Exponential backoff so a failing or rate-limiting server
                # is not hit at the fastest rate
                self.current_interval = min(
                    self.max_interval,
                    self.current_interval * 2
                )

            time.sleep(self.current_interval)

# Usage: poll the latest-trades endpoint, waiting between 3 and 30 seconds per request
poller = AdaptivePoller(
    "https://frontend-api-v3.pump.fun/trades/latest",
    {"Authorization": "Bearer <your_token>", "Accept": "application/json"},
    min_interval=3,
    max_interval=30,
)
poller.poll(params=None)

Multi-endpoint monitoring

Monitor multiple endpoints simultaneously using threading:
Python
import requests
import threading
import time
from queue import Queue

class MultiEndpointMonitor:
    """
    Poll several endpoints concurrently, one thread per endpoint, and funnel
    every response into a single queue.
    """

    def __init__(self, token, timeout=10):
        """
        Args:
            token: Bearer token sent in the Authorization header.
            timeout: Seconds to wait for each HTTP response before giving up.
        """
        self.headers = {
            "Authorization": f"Bearer {token}",
            "Accept": "application/json"
        }
        self.timeout = timeout
        self.update_queue = Queue()

    def monitor_endpoint(self, name, url, params, interval):
        """
        Monitor a single endpoint in a separate thread.

        Loops forever: each successful response is put on update_queue as a
        dict with "endpoint", "data" and "timestamp" keys; errors are printed
        and the loop continues.

        Args:
            name: Label stored under "endpoint" in every queued update.
            url: Endpoint to poll.
            params: Query parameters sent with every request.
            interval: Seconds to wait between polls.
        """
        while True:
            try:
                response = requests.get(
                    url,
                    headers=self.headers,
                    params=params,
                    timeout=self.timeout
                )
                # Keep error bodies (rate limits, auth failures) out of the queue
                response.raise_for_status()
                data = response.json()
                self.update_queue.put({
                    "endpoint": name,
                    "data": data,
                    "timestamp": time.time()
                })
            except Exception as e:
                print(f"Error monitoring {name}: {e}")

            time.sleep(interval)

    def start_monitoring(self, endpoints):
        """
        Start monitoring multiple endpoints
        endpoints = [(name, url, params, interval), ...]

        Returns:
            The list of started daemon threads, in the order given.
        """
        threads = []
        for name, url, params, interval in endpoints:
            thread = threading.Thread(
                target=self.monitor_endpoint,
                args=(name, url, params, interval),
                daemon=True
            )
            thread.start()
            threads.append(thread)
        return threads

    def process_updates(self):
        """
        Process updates from all endpoints.

        Blocks forever, printing each update as it arrives on update_queue.
        """
        while True:
            update = self.update_queue.get()
            print(f"Update from {update['endpoint']}: {update['data']}")

# Usage
monitor = MultiEndpointMonitor(token="your_token")

# Each entry is (name, url, query params, polling interval in seconds)
endpoints = [
    ("latest_trades", "https://frontend-api-v3.pump.fun/trades/latest", {}, 5),
    ("latest_coins", "https://frontend-api-v3.pump.fun/coins/latest", {}, 10),
    (
        "specific_coin",
        "https://frontend-api-v3.pump.fun/coins/CxLHsqvjfisgPAGwcZJsTn6nzZXJLxmVYM7v9pump",
        {},
        8,
    ),
]

# Start one polling thread per endpoint, then block here printing their updates
monitor.start_monitoring(endpoints=endpoints)
monitor.process_updates()
When monitoring multiple endpoints, be mindful of rate limits. Distribute your polling intervals to stay within API limits.

Event-driven updates

Implement an event system for handling real-time updates:
Python
import requests
import time
from typing import Callable, Dict, List

class EventDrivenPoller:
    """
    Poll a coin's trades and dispatch them to registered callbacks as
    'new_trade', 'buy', 'sell' and 'error' events.
    """

    def __init__(self, token):
        """
        Args:
            token: Bearer token sent in the Authorization header.
        """
        self.headers = {
            "Authorization": f"Bearer {token}",
            "Accept": "application/json"
        }
        # Maps an event name to its callbacks, in registration order
        self.listeners: Dict[str, List[Callable]] = {}

    def on(self, event: str, callback: Callable):
        """
        Register an event listener.

        Args:
            event: Event name to listen for.
            callback: Called with the event's data each time it is emitted.
        """
        self.listeners.setdefault(event, []).append(callback)

    def emit(self, event: str, data):
        """
        Emit an event to all registered listeners, in registration order.

        Events without listeners are ignored. Listeners registered while the
        event is being emitted are first called on the next emit.
        """
        # Iterate over a snapshot so a callback may safely call on()
        for callback in tuple(self.listeners.get(event, ())):
            callback(data)

    def poll_trades(self, mint, interval=5, timeout=10, max_seen=10_000):
        """
        Poll for trades and emit events.

        Each unseen trade is emitted as 'new_trade' and then as 'buy' or
        'sell' depending on its 'is_buy' field. Failures are emitted as
        'error' with the message as a string, and polling continues.
        Runs until interrupted (e.g. Ctrl+C).

        Args:
            mint: Mint address of the coin whose trades are polled.
            interval: Seconds to wait between polls.
            timeout: Seconds to wait for each HTTP response before giving up.
            max_seen: Maximum number of trade signatures remembered for
                de-duplication; the oldest are forgotten first. Keep this
                well above the page size (20).
        """
        from collections import deque  # local import: only this method needs it

        url = f"https://frontend-api-v3.pump.fun/trades/all/{mint}"
        params = {"limit": 20, "offset": 0, "minimumSize": 0}
        seen = set()          # fast membership test
        seen_order = deque()  # insertion order, used to evict the oldest signatures

        while True:
            try:
                response = requests.get(
                    url,
                    headers=self.headers,
                    params=params,
                    timeout=timeout
                )
                # Turn 4xx/5xx answers into 'error' events instead of parsing them as trades
                response.raise_for_status()
                trades = response.json()
                if not isinstance(trades, list):
                    raise ValueError(f"expected a list of trades, got {type(trades).__name__}")

                for trade in trades:
                    trade_id = trade.get('signature')
                    if trade_id and trade_id not in seen:
                        seen.add(trade_id)
                        seen_order.append(trade_id)
                        self.emit('new_trade', trade)

                        # Emit specific events based on trade properties
                        if trade.get('is_buy'):
                            self.emit('buy', trade)
                        else:
                            self.emit('sell', trade)

                # Cap memory use of a long-running poller
                while len(seen_order) > max_seen:
                    seen.discard(seen_order.popleft())
            except Exception as e:
                self.emit('error', str(e))

            time.sleep(interval)

# Usage
poller = EventDrivenPoller(token="your_token")

# Attach one handler per event type before polling begins
poller.on(event='new_trade', callback=lambda trade: print(f"New trade: {trade}"))
poller.on(event='buy', callback=lambda trade: print(f"BUY: {trade.get('amount')}"))
poller.on(event='sell', callback=lambda trade: print(f"SELL: {trade.get('amount')}"))
poller.on(event='error', callback=lambda error: print(f"Error occurred: {error}"))

# Blocks here; the handlers above fire as trades and errors occur
poller.poll_trades(mint="CxLHsqvjfisgPAGwcZJsTn6nzZXJLxmVYM7v9pump", interval=5)

Best practices

1

Choose appropriate intervals

Start with 5-10 second intervals for active monitoring, 30-60 seconds for less critical updates.
2

Implement exponential backoff

When encountering errors or rate limits, increase polling intervals exponentially.
3

Use ETag caching

Send an If-None-Match header whenever the server has returned an ETag for the resource, so unchanged content is not downloaded again.
4

Monitor rate limits

Track x-ratelimit-remaining headers and adjust polling behavior accordingly.
5

Deduplicate data

Maintain a set of seen IDs to avoid processing duplicate data.
6

Handle errors gracefully

Implement proper error handling and continue polling even after failures.
If Pump.fun introduces native WebSocket support in the future, migrate to that for more efficient real-time updates with lower latency and reduced server load.

Build docs developers (and LLMs) love