Bulk operations allow you to process multiple items in a single API request, improving performance and reducing the number of API calls needed.
Bulk coin queries
Fetch multiple coins by mints
Retrieve detailed information for multiple coins using their mint addresses:
curl -X POST "https://advanced-api-v2.pump.fun/coins/mints" \
-H "Authorization: Bearer <your_token>" \
-H "Content-Type: application/json" \
-d '{
"mints": [
"CxLHsqvjfisgPAGwcZJsTn6nzZXJLxmVYM7v9pump",
"7GCihgDB8fe6KNjn2MYtkzZcRjQy3t9GHdC8uHYmW2hr",
"3nBq1K6Xp7kYHqKBJEhj8r5QGAqVQYuKgVqZHpump"
]
}'
Bulk endpoints can handle up to 100 items per request. For larger datasets, split into multiple batches.
Get metadata for multiple coins efficiently:
curl -X POST "https://advanced-api-v2.pump.fun/coins/metadatas" \
-H "Authorization: Bearer <your_token>" \
-H "Content-Type: application/json" \
-d '{
"mints": [
"CxLHsqvjfisgPAGwcZJsTn6nzZXJLxmVYM7v9pump",
"7GCihgDB8fe6KNjn2MYtkzZcRjQy3t9GHdC8uHYmW2hr"
]
}'
Bulk moderation operations
Bulk NSFW marking
Mark multiple items as NSFW in a single request:
curl -X POST "https://frontend-api-v3.pump.fun/moderation/bulk-nsfw" \
-H "Authorization: Bearer <your_token>" \
-H "Content-Type: application/json" \
-d '{
"mints": [
"CxLHsqvjfisgPAGwcZJsTn6nzZXJLxmVYM7v9pump",
"7GCihgDB8fe6KNjn2MYtkzZcRjQy3t9GHdC8uHYmW2hr"
]
}'
Bulk moderation endpoints typically require admin privileges. Ensure your account has the necessary permissions.
Bulk hide items
Hide multiple items from public view:
curl -X POST "https://frontend-api-v3.pump.fun/moderation/bulk-hidden" \
-H "Authorization: Bearer <your_token>" \
-H "Content-Type: application/json" \
-d '{
"ids": [123, 456, 789]
}'
Bulk ban items
Ban multiple items or users:
curl -X POST "https://frontend-api-v3.pump.fun/moderation/bulk-ban" \
-H "Authorization: Bearer <your_token>" \
-H "Content-Type: application/json" \
-d '{
"ids": [123, 456, 789],
"reason": "Spam"
}'
Batch processing patterns
Process items in batches
Efficiently process large lists by batching:
import requests
from typing import List, Dict, Any
def batch_items(items: List[str], batch_size: int = 50):
    """
    Yield successive slices of *items*, each at most *batch_size* long.

    Args:
        items: The full list to split.
        batch_size: Maximum length of each yielded slice.

    Yields:
        Consecutive sublists of *items*; the final slice may be shorter.
    """
    start = 0
    total = len(items)
    while start < total:
        yield items[start:start + batch_size]
        start += batch_size
def fetch_coins_in_batches(mints: List[str], batch_size: int = 50) -> List[Dict[Any, Any]]:
    """
    Fetch coin data for large lists of mints.

    Splits *mints* into batches of *batch_size* and POSTs each batch to the
    bulk coins endpoint, concatenating every successful response.

    Args:
        mints: Mint addresses to look up.
        batch_size: Items per request (the API accepts up to 100).

    Returns:
        Coin dicts from every batch that succeeded. Failed batches are
        logged and skipped, so the result may cover fewer mints than requested.
    """
    url = "https://advanced-api-v2.pump.fun/coins/mints"
    headers = {
        "Authorization": "Bearer <your_token>",
        "Content-Type": "application/json"
    }
    all_coins: List[Dict[Any, Any]] = []
    for batch in batch_items(mints, batch_size):
        data = {"mints": batch}
        try:
            # timeout prevents one stuck request from hanging the whole run
            response = requests.post(url, headers=headers, json=data, timeout=30)
        except requests.RequestException as exc:
            # A single network failure should not abort the remaining batches.
            print(f"Error fetching batch: {exc}")
            continue
        if response.status_code == 200:
            all_coins.extend(response.json())
        else:
            print(f"Error fetching batch: {response.status_code}")
    return all_coins
# Usage
# Example: fetch data for a large mint list in 50-item batches.
mints = [
    "CxLHsqvjfisgPAGwcZJsTn6nzZXJLxmVYM7v9pump",
    "7GCihgDB8fe6KNjn2MYtkzZcRjQy3t9GHdC8uHYmW2hr",
    # ... many more mints
]
coins = fetch_coins_in_batches(mints, batch_size=50)
# Failed batches are only logged, so the coin count may be lower than len(mints).
print(f"Fetched {len(coins)} coins from {len(mints)} mints")
Parallel batch processing
Process multiple batches concurrently for better performance:
import requests
import concurrent.futures
from typing import List, Dict, Any
def fetch_batch(batch: List[str], headers: Dict[str, str]) -> List[Dict[Any, Any]]:
    """
    Fetch a single batch of coins from the bulk mints endpoint.

    Args:
        batch: Mint addresses for this one request (caller is responsible
            for keeping it within the API's per-request limit).
        headers: Request headers, including the Authorization bearer token.

    Returns:
        The decoded JSON list on success, or an empty list on any error
        (the error is logged to stdout).
    """
    url = "https://advanced-api-v2.pump.fun/coins/mints"
    data = {"mints": batch}
    try:
        response = requests.post(url, headers=headers, json=data, timeout=10)
        if response.status_code == 200:
            return response.json()
        # Previously a non-200 response fell through silently; log the
        # status so failed batches can be diagnosed.
        print(f"Error fetching batch: HTTP {response.status_code}")
    except Exception as e:
        print(f"Error fetching batch: {e}")
    return []
def fetch_coins_parallel(mints: List[str], batch_size: int = 50, max_workers: int = 5):
    """
    Fetch coins using parallel batch requests.

    Args:
        mints: Mint addresses to look up.
        batch_size: Items per API request.
        max_workers: Maximum number of concurrent requests.

    Returns:
        Combined list of coin dicts from every batch; ordering across
        batches is not guaranteed.
    """
    request_headers = {
        "Authorization": "Bearer <your_token>",
        "Content-Type": "application/json"
    }
    # Carve the mint list into fixed-size chunks.
    chunks = [
        mints[start:start + batch_size]
        for start in range(0, len(mints), batch_size)
    ]
    collected = []
    # Fan the chunks out across a thread pool and gather results as they finish.
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as pool:
        pending = {
            pool.submit(fetch_batch, chunk, request_headers): chunk
            for chunk in chunks
        }
        for done in concurrent.futures.as_completed(pending):
            collected.extend(done.result())
    return collected
# Usage
# Example: fetch many mints with up to 5 requests in flight at once.
mints = ["mint1", "mint2", "mint3"] # ... many mints
coins = fetch_coins_parallel(mints, batch_size=50, max_workers=5)
When using parallel processing, be mindful of rate limits. Limit the number of concurrent workers to avoid hitting API limits.
Error handling in bulk operations
Implement robust error handling for bulk operations:
import requests
import time
from typing import List, Dict, Any, Optional
class BulkOperationHandler:
    """
    Helper for bulk API operations with retry, batching, and failure tracking.

    Attributes:
        token: Bearer token used for authentication.
        max_retries: Maximum attempts per batch before giving up.
        headers: Pre-built request headers reused for every call.
    """

    def __init__(self, token: str, max_retries: int = 3):
        self.token = token
        self.max_retries = max_retries
        self.headers = {
            "Authorization": f"Bearer {token}",
            "Content-Type": "application/json"
        }

    def fetch_coins_with_retry(self, mints: List[str]) -> Optional[List[Dict[Any, Any]]]:
        """
        Fetch coins with exponential backoff retry.

        Args:
            mints: Mint addresses for a single bulk request.

        Returns:
            The decoded JSON list on success (possibly empty), or ``None``
            if every attempt failed.
        """
        url = "https://advanced-api-v2.pump.fun/coins/mints"
        data = {"mints": mints}
        for attempt in range(self.max_retries):
            try:
                response = requests.post(url, headers=self.headers, json=data, timeout=30)
                if response.status_code == 200:
                    return response.json()
                elif response.status_code == 429:  # Rate limited: back off and retry
                    wait_time = 2 ** attempt
                    print(f"Rate limited. Waiting {wait_time}s...")
                    time.sleep(wait_time)
                else:
                    # Other HTTP errors are treated as permanent for this batch.
                    print(f"Error {response.status_code}: {response.text}")
                    return None
            except requests.exceptions.Timeout:
                print(f"Timeout on attempt {attempt + 1}")
                if attempt < self.max_retries - 1:
                    time.sleep(2 ** attempt)
            except Exception as e:
                print(f"Error: {e}")
                return None
        return None

    def process_large_list(self, mints: List[str], batch_size: int = 50):
        """
        Process a large list with proper error handling.

        Args:
            mints: All mint addresses to process.
            batch_size: Items per bulk request.

        Returns:
            Dict with "successful" (coin dicts fetched), "failed" (mint
            addresses whose batch failed every retry), and "success_rate"
            (percentage of results relative to requested mints).
        """
        successful: List[Dict[Any, Any]] = []
        failed: List[str] = []
        batches = [mints[i:i + batch_size] for i in range(0, len(mints), batch_size)]
        for i, batch in enumerate(batches, 1):
            print(f"Processing batch {i}/{len(batches)}...")
            result = self.fetch_coins_with_retry(batch)
            # BUG FIX: `if result:` treated a legitimate empty response ([])
            # as a failure; only None signals that the batch actually failed.
            if result is not None:
                successful.extend(result)
            else:
                failed.extend(batch)
            # Rate limit friendly delay between batches
            time.sleep(0.5)
        return {
            "successful": successful,
            "failed": failed,
            # Guard the division: an empty input list is vacuously 100% successful.
            "success_rate": len(successful) / len(mints) * 100 if mints else 100.0
        }
# Usage
# Example: process a large mint list with retries and failure tracking.
handler = BulkOperationHandler("your_token")
mints = ["mint1", "mint2", "mint3"] # ... many mints
result = handler.process_large_list(mints, batch_size=50)
print(f"Success rate: {result['success_rate']:.2f}%")
# Failed mints are returned so they can be retried or logged separately.
print(f"Failed items: {len(result['failed'])}")
Best practices
Optimize batch size
Use batch sizes between 25 and 100 items for optimal performance. Larger batches may time out.
Implement retry logic
Always implement exponential backoff for failed requests, especially for rate limit errors.
Validate input
Validate and sanitize input arrays before sending to avoid malformed requests.
Track failures
Keep track of failed items to retry them separately or log for investigation.
Use parallel processing wisely
Limit concurrent requests to 3-5 to avoid overwhelming the API or triggering rate limits.
Monitor response times
Track response times and adjust batch sizes if requests consistently timeout.