Overview
The Push Service processes push notifications from RabbitMQ queues and sends them via Firebase Cloud Messaging (FCM). Built with NestJS and TypeScript, it features circuit breaker patterns, exponential backoff retry logic, and comprehensive error handling.
Purpose and Responsibilities
- Queue consumption - Consumes messages from push.queue via RabbitMQ
- Template rendering - Replaces template variables with actual values
- Push delivery - Sends push notifications via Firebase Admin SDK
- Circuit breaker - Protects against cascading failures with opossum
- Retry logic - Implements exponential backoff for transient failures
- Status tracking - Updates delivery status via RabbitMQ status exchange
- User preference checking - Validates user has opted in to push notifications
Tech Stack
- Framework: NestJS 10.x
- Language: TypeScript 5.x
- Message Broker: RabbitMQ (amqp-connection-manager)
- Push Provider: Firebase Admin SDK
- Circuit Breaker: opossum
- Resilience: Custom RetryService with exponential backoff
- HTTP Client: Axios (for service-to-service communication)
Configuration
Port: 3001 by default (set via PORT); the Docker example publishes it on host port 8004
Environment Variables:
PORT=3001
RABBITMQ_URL=amqp://guest:guest@localhost:5672
FIREBASE_PROJECT_ID=your-project-id
FIREBASE_PRIVATE_KEY=your-private-key
FIREBASE_CLIENT_EMAIL=firebase-adminsdk@your-project.iam.gserviceaccount.com
RabbitMQ Consumption
The service consumes messages from the push.queue bound to the notifications.direct exchange.
Queue Configuration
Exchange: notifications.direct
Queue: push.queue
Routing Key: push
Durable: true
Prefetch: 1 (process one message at a time)
Implementation (main.ts:25-65):
app.connectMicroservice<MicroserviceOptions>({
  transport: Transport.RMQ,
  options: {
    urls: [rabbitmqUrl],
    queue: 'push.queue',
    queueOptions: {
      durable: true,
    },
    noAck: false,
    prefetchCount: 1,
    exchange: 'notifications.direct',
    exchangeType: 'direct',
    socketOptions: {
      heartbeatIntervalInSeconds: 60,
      reconnectTimeInSeconds: 5,
    },
    deserializer: {
      deserialize: (value: any) => {
        try {
          const contentStr = value?.content?.toString?.() ?? '';
          if (contentStr) {
            const parsed = JSON.parse(contentStr);
            if (parsed && typeof parsed === 'object' && 'pattern' in parsed && 'data' in parsed) {
              return { pattern: parsed.pattern, data: parsed.data };
            }
            const routingKey = value?.fields?.routingKey || 'push';
            return { pattern: routingKey, data: parsed };
          }
          return { pattern: 'push', data: value };
        } catch (err) {
          return { pattern: 'push', data: value };
        }
      },
    },
  },
});
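The custom deserializer accepts both NestJS envelope messages ({ pattern, data }) and raw JSON payloads. Raw payloads are routed by the message's routing key, and content that cannot be parsed falls back to the push pattern with the original message as data.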
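The three outcomes of the deserializer can be exercised in isolation. The following is a minimal sketch that mirrors the logic listed above as a standalone function; the names `RawAmqpMessage`, `NestPacket`, and `deserializePushMessage` are hypothetical and do not come from the service's source.

```typescript
// Hypothetical types: only the fields the deserializer reads are modelled.
interface RawAmqpMessage {
  content?: { toString(): string };
  fields?: { routingKey?: string };
}

interface NestPacket {
  pattern: string;
  data: unknown;
}

function deserializePushMessage(value: RawAmqpMessage): NestPacket {
  try {
    const contentStr = value?.content?.toString() ?? '';
    if (contentStr) {
      const parsed: unknown = JSON.parse(contentStr);
      // Case 1: the payload is already a NestJS envelope.
      if (parsed !== null && typeof parsed === 'object' && 'pattern' in parsed && 'data' in parsed) {
        const envelope = parsed as { pattern: string; data: unknown };
        return { pattern: envelope.pattern, data: envelope.data };
      }
      // Case 2: raw JSON; the routing key becomes the pattern.
      return { pattern: value.fields?.routingKey || 'push', data: parsed };
    }
    // Case 3a: empty content; hand the original message to the 'push' handler.
    return { pattern: 'push', data: value };
  } catch {
    // Case 3b: content is not valid JSON; same fallback.
    return { pattern: 'push', data: value };
  }
}
```

Note that the fallback path delivers the whole AMQP message object as `data`, so a handler that reads `data.notification_id` would see `undefined` for such messages.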
Message Processor
Implementation (push-notification.processor.ts:15-86):
@EventPattern('push')
async handlePushNotification(@Payload() data: any, @Ctx() context: RmqContext) {
  const channel = context.getChannelRef();
  const originalMsg = context.getMessage();
  const routingKey = originalMsg.fields.routingKey;
  if (routingKey !== 'push') {
    this.logger.warn(`Received message with routing key "${routingKey}" - ignoring.`);
    channel.ack(originalMsg);
    return;
  }
  this.logger.log('═══════════════════════════════════════');
  this.logger.log('📬 RECEIVED PUSH NOTIFICATION EVENT');
  this.logger.log(`🔍 Notification ID: ${data?.notification_id}`);
  this.logger.log('═══════════════════════════════════════');
  try {
    const result = await this.pushNotificationService.sendPushNotification(data);
    this.logger.log(`✅ Push notification sent successfully`);
    // Update status to delivered
    await this.statusUpdateService.updateStatus({
      notification_id: data.notification_id,
      status: 'delivered',
      timestamp: new Date().toISOString(),
    });
    channel.ack(originalMsg);
    return result;
  } catch (error) {
    this.logger.error(`❌ Failed to deliver notification: ${data.notification_id}`, error?.stack);
    // Update status to failed
    await this.statusUpdateService.updateStatus({
      notification_id: data.notification_id,
      status: 'failed',
      timestamp: new Date().toISOString(),
      error: error?.message,
    });
    // Decide whether to requeue based on error characteristics
    const msg = (error?.message || '').toLowerCase();
    const transientHints = ['econnrefused','enotfound','etimedout','econnreset','timeout','503','502','500'];
    const is4xx = /status\s+4\d{2}/.test(msg);
    const userOptOut = msg.includes('user has disabled push notifications');
    const shouldRequeue = transientHints.some(h => msg.includes(h)) && !userOptOut && !is4xx;
    if (shouldRequeue) {
      channel.nack(originalMsg, false, true);
      this.logger.warn(`⚠️ Message rejected and requeued for retry`);
    } else {
      channel.ack(originalMsg);
      this.logger.warn(`⚠️ Message acknowledged without requeue due to non-retriable error`);
    }
  }
}
Error Classification:
- Transient (requeue): ECONNREFUSED, ENOTFOUND, ETIMEDOUT, ECONNRESET, timeouts, 500/502/503
- Permanent (ACK): 4xx errors, user opt-out, invalid tokens, and any other error whose message matches no transient hint
Messages are only requeued for transient errors. Permanent errors are acknowledged to prevent infinite retry loops.
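To see how the classification behaves on concrete messages, the decision can be pulled out into a pure function. This is a sketch that reproduces the substring checks from the processor; the function name `classifyDeliveryError` and the `AckDecision` type are hypothetical.

```typescript
type AckDecision = 'requeue' | 'ack';

// Same hints as in the processor listing.
const TRANSIENT_HINTS = ['econnrefused', 'enotfound', 'etimedout', 'econnreset', 'timeout', '503', '502', '500'];

function classifyDeliveryError(message: string | undefined): AckDecision {
  const msg = (message ?? '').toLowerCase();
  const is4xx = /status\s+4\d{2}/.test(msg);
  const userOptOut = msg.includes('user has disabled push notifications');
  const looksTransient = TRANSIENT_HINTS.some((hint) => msg.includes(hint));
  // Requeue only when a transient hint matches and no permanent marker is present.
  return looksTransient && !userOptOut && !is4xx ? 'requeue' : 'ack';
}

// Sample decisions
const samples = [
  'connect ECONNREFUSED 127.0.0.1:443',      // requeue
  'Request failed with status 404',          // ack
  'User has disabled push notifications',    // ack
  'Invalid token for notification 15002',    // requeue: "15002" contains "500"
];
for (const sample of samples) {
  console.log(`${classifyDeliveryError(sample)} <- ${sample}`);
}
```

The last sample shows a limitation of matching on message text: any message that happens to contain the digits 500, 502, or 503 is treated as transient. Matching on an error code or status field, where one is available, would avoid this.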
Circuit Breaker Implementation
The service uses the opossum library to implement circuit breaker patterns, protecting against cascading failures.
Circuit Breaker Service (circuit-breaker.service.ts:9-41):
createCircuitBreaker(
  name: string,
  operation: (...args: any[]) => Promise<any>,
  options?: CircuitBreaker.Options
): CircuitBreaker {
  const defaultOptions: CircuitBreaker.Options = {
    timeout: 3000, // 3 second timeout
    errorThresholdPercentage: 50, // Open at 50% failure rate
    resetTimeout: 30000, // Try again after 30 seconds
  };
  const circuitBreaker = new CircuitBreaker(operation, { ...defaultOptions, ...options });
  circuitBreaker.on('open', () => {
    this.logger.warn(`Circuit breaker ${name} opened`);
  });
  circuitBreaker.on('close', () => {
    this.logger.log(`Circuit breaker ${name} closed`);
  });
  circuitBreaker.on('halfOpen', () => {
    this.logger.log(`Circuit breaker ${name} half-opened`);
  });
  this.circuitBreakers.set(name, circuitBreaker);
  return circuitBreaker;
}
Circuit States:
- Closed - Normal operation, requests pass through
- Open - Failures exceed threshold, requests fail immediately
- Half-Open - After reset timeout, test with one request
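The three states can be illustrated with a small state machine. The sketch below is not opossum's implementation: it uses cumulative counts rather than a rolling window, and the `MiniBreaker` class and its `minimumCalls` parameter are hypothetical. Time is passed in explicitly so the transitions are easy to follow.

```typescript
type BreakerState = 'closed' | 'open' | 'halfOpen';

class MiniBreaker {
  private state: BreakerState = 'closed';
  private failures = 0;
  private successes = 0;
  private openedAt = 0;
  private readonly errorThresholdPercentage: number;
  private readonly resetTimeoutMs: number;
  private readonly minimumCalls: number;

  constructor(errorThresholdPercentage: number, resetTimeoutMs: number, minimumCalls = 4) {
    this.errorThresholdPercentage = errorThresholdPercentage;
    this.resetTimeoutMs = resetTimeoutMs;
    this.minimumCalls = minimumCalls; // hypothetical: avoids opening on the very first failure
  }

  // Returns the state at time `now`, moving open -> halfOpen once the reset timeout has elapsed.
  currentState(now: number): BreakerState {
    if (this.state === 'open' && now - this.openedAt >= this.resetTimeoutMs) {
      this.state = 'halfOpen';
    }
    return this.state;
  }

  // Open breakers reject immediately; closed and half-open breakers let the call through.
  canRequest(now: number): boolean {
    return this.currentState(now) !== 'open';
  }

  recordSuccess(now: number): void {
    const state = this.currentState(now);
    if (state === 'halfOpen') {
      this.close(); // the trial request succeeded
    } else if (state === 'closed') {
      this.successes++;
    }
  }

  recordFailure(now: number): void {
    const state = this.currentState(now);
    if (state === 'halfOpen') {
      this.open(now); // the trial request failed; wait for another reset timeout
    } else if (state === 'closed') {
      this.failures++;
      const total = this.failures + this.successes;
      const failureRate = (this.failures / total) * 100;
      if (total >= this.minimumCalls && failureRate >= this.errorThresholdPercentage) {
        this.open(now);
      }
    }
  }

  private open(now: number): void {
    this.state = 'open';
    this.openedAt = now;
    this.failures = 0;
    this.successes = 0;
  }

  private close(): void {
    this.state = 'closed';
    this.failures = 0;
    this.successes = 0;
  }
}
```

With `new MiniBreaker(50, 30000)`, two successes followed by two failures reach a 50% failure rate and open the breaker; 30 seconds later it becomes half-open, and the outcome of the next call decides whether it closes or opens again.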
Configuration (push-notification.service.ts:39-49):
const pushCircuitBreaker = this.circuitBreakerService.createCircuitBreaker(
  "push-sending",
  (token, content, metadata) => this.pushClient.sendPush(token, content, metadata),
  {
    timeout: 10000, // 10 seconds
    errorThresholdPercentage: 50, // Open at 50% failure
    resetTimeout: 30000, // Retry after 30 seconds
  },
);
Circuit breakers prevent overwhelming Firebase when it’s experiencing issues, allowing the system to recover gracefully.
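A breaker can only open if the same instance observes failures across many calls. As listed, createCircuitBreaker constructs a new breaker each time it is called and overwrites the map entry with the same name, so calling it once per message would start every send with fresh statistics. One possible way to reuse instances is a get-or-create lookup; the `BreakerRegistry` class below is a hypothetical sketch, and the factory stands in for the `new CircuitBreaker(...)` call.

```typescript
class BreakerRegistry<T> {
  private readonly breakers = new Map<string, T>();

  // Returns the existing breaker for `name`, or builds and stores one on first use.
  getOrCreate(name: string, factory: () => T): T {
    const existing = this.breakers.get(name);
    if (existing !== undefined) {
      return existing;
    }
    const created = factory();
    this.breakers.set(name, created);
    return created;
  }
}

// Usage: the factory runs only on the first lookup for a given name.
const registry = new BreakerRegistry<{ name: string }>();
let constructions = 0;
const makeBreaker = () => {
  constructions++;
  return { name: 'push-sending' }; // placeholder object instead of a real breaker
};
const first = registry.getOrCreate('push-sending', makeBreaker);
const second = registry.getOrCreate('push-sending', makeBreaker);
console.log(first === second, constructions);
```

An alternative with the same effect is to create the breaker once in the service constructor and keep it in a field.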
Retry Logic
The service implements exponential backoff retry logic for transient failures.
Retry Service (retry.service.ts:7-36):
async executeWithRetry(
  operation: () => Promise<any>,
  maxRetries: number = 3,
  baseDelay: number = 1000,
  onError?: (error: Error, retryCount: number) => void
) {
  for (let attempt = 0; attempt <= maxRetries; attempt++) {
    try {
      const result = await operation();
      if (attempt > 0) {
        this.logger.log(`Operation succeeded after ${attempt} retries`);
      }
      return result;
    } catch (error) {
      if (attempt === maxRetries) {
        this.logger.error(`Operation failed after ${maxRetries} retries: ${error.message}`);
        throw error;
      }
      const delay = baseDelay * Math.pow(2, attempt); // Exponential backoff
      this.logger.warn(`Attempt ${attempt + 1} failed: ${error.message}. Retrying in ${delay}ms...`);
      if (onError) {
        onError(error, attempt + 1);
      }
      await new Promise(resolve => setTimeout(resolve, delay));
    }
  }
}
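Retry Schedule (baseDelay = 500 ms, maxRetries = 3; the time each attempt itself takes is not included):
| Attempt | Delay before attempt | Cumulative wait |
|---|---|---|
| 1 | 0ms | 0s |
| 2 | 500ms | 0.5s |
| 3 | 1s | 1.5s |
| 4 | 2s | 3.5s |
With the default baseDelay of 1000 ms, the delays are 1s, 2s, and 4s instead.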
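The schedule follows directly from `delay = baseDelay * Math.pow(2, attempt)` in executeWithRetry. The helper below is a small sketch for computing it for any base delay; `backoffSchedule` and `ScheduleRow` are hypothetical names, and the elapsed time ignores how long each attempt runs.

```typescript
interface ScheduleRow {
  attempt: number;       // 1-based attempt number
  delayBeforeMs: number; // wait before this attempt starts
  elapsedMs: number;     // cumulative wait up to the start of this attempt
}

function backoffSchedule(baseDelayMs: number, maxRetries: number): ScheduleRow[] {
  const rows: ScheduleRow[] = [];
  let elapsedMs = 0;
  for (let attempt = 0; attempt <= maxRetries; attempt++) {
    // The first attempt runs immediately; retry k waits baseDelay * 2^(k - 1).
    const delayBeforeMs = attempt === 0 ? 0 : baseDelayMs * Math.pow(2, attempt - 1);
    elapsedMs += delayBeforeMs;
    rows.push({ attempt: attempt + 1, delayBeforeMs, elapsedMs });
  }
  return rows;
}

// With a 500 ms base the delays are 0, 500, 1000, 2000 ms (3500 ms in total).
console.log(backoffSchedule(500, 3));
```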
Usage (push-notification.service.ts:21-25, 52-61):
// Retry user data fetch
const user = await this.retryService.executeWithRetry(
  () => this.userService.getUser(data.user.id),
  3, // maxRetries
  500, // baseDelay in ms
);
// Retry push notification send
const result = await this.retryService.executeWithRetry(
  () => pushCircuitBreaker.fire(user.push_token, processedContent, data.metadata),
  3, // maxRetries
  1000, // baseDelay in ms
);
Push Notification Sending
The service uses Firebase Admin SDK to send push notifications.
Push Service Flow (push-notification.service.ts:18-71):
async sendPushNotification(data: any): Promise<any> {
  try {
    // 1. Get user data with retry
    const user = await this.retryService.executeWithRetry(
      () => this.userService.getUser(data.user.id),
      3, 500,
    );
    // 2. Check if user has opted in
    if (!user.preferences?.push) {
      throw new Error("User has disabled push notifications");
    }
    // 3. Get template from message
    const template = data.template;
    // 4. Process template with variables
    const processedContent = this.processTemplate(template, data.variables);
    // 5. Create circuit breaker
    const pushCircuitBreaker = this.circuitBreakerService.createCircuitBreaker(
      "push-sending",
      (token, content, metadata) => this.pushClient.sendPush(token, content, metadata),
      {
        timeout: 10000,
        errorThresholdPercentage: 50,
        resetTimeout: 30000,
      },
    );
    // 6. Send with circuit breaker and retry
    const result = await this.retryService.executeWithRetry(
      () => pushCircuitBreaker.fire(user.push_token, processedContent, data.metadata),
      3, 1000,
    );
    return result;
  } catch (error) {
    this.logger.error(`Error sending push notification: ${error.message}`, error);
    throw error;
  }
}
Template Rendering (push-notification.service.ts:73-83):
private processTemplate(
  template: any,
  variables: Record<string, any>,
): string {
  let content = template.body;
  Object.entries(variables).forEach(([key, value]) => {
    content = content.replace(new RegExp(`{{${key}}}`, "g"), value);
  });
  return content;
}
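The listed processTemplate builds a regular expression from each variable name and passes the value as a replacement string. That works for simple names and values, but a key containing regex metacharacters (such as `.`) can match unintended text, and a value containing `$&` or `$1` is interpreted as a replacement pattern. The following sketch shows one way to guard against both; `renderTemplate` and `escapeRegExp` are hypothetical helpers, not part of the service.

```typescript
// Escapes characters that have a special meaning inside a regular expression.
function escapeRegExp(text: string): string {
  return text.replace(/[.*+?^${}()|[\]\\]/g, '\\$&');
}

function renderTemplate(body: string, variables: Record<string, unknown>): string {
  let content = body;
  for (const [key, value] of Object.entries(variables)) {
    const placeholder = new RegExp(`\\{\\{${escapeRegExp(key)}\\}\\}`, 'g');
    // A replacer function inserts the value literally, so "$&" in user data is not expanded.
    content = content.replace(placeholder, () => String(value));
  }
  return content;
}

console.log(renderTemplate('Hi {{name}}, your order {{id}} has shipped', { name: 'Ada', id: 42 }));
// prints "Hi Ada, your order 42 has shipped"
```

Placeholders without a matching variable are left in the output unchanged, which matches the behavior of the original listing.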
User Preference Checking
The service validates that users have opted in to push notifications before sending.
Preference Check (push-notification.service.ts:28-30):
if (!user.preferences?.push) {
  throw new Error("User has disabled push notifications");
}
User Preferences Structure:
{
  "preferences": {
    "email": true,
    "push": false,
    "sms": false
  }
}
If a user has disabled push notifications, the service throws a permanent error (not retried) and acknowledges the message.
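The processor recognizes this case by matching the error message text. A possible alternative, sketched here under the assumption that the processor could check the error type instead, is a dedicated error class. `NonRetriableError`, `assertPushEnabled`, and the `User` interface are hypothetical; the message is kept identical so the existing text-based check would still match.

```typescript
// Hypothetical error type that marks a failure as permanent.
class NonRetriableError extends Error {
  constructor(message: string) {
    super(message);
    this.name = 'NonRetriableError';
    // Keeps instanceof working when compiling to older targets.
    Object.setPrototypeOf(this, new.target.prototype);
  }
}

interface UserPreferences {
  email?: boolean;
  push?: boolean;
  sms?: boolean;
}

interface User {
  preferences?: UserPreferences;
}

// Throws unless the user has explicitly opted in to push notifications.
function assertPushEnabled(user: User): void {
  if (!user.preferences?.push) {
    throw new NonRetriableError('User has disabled push notifications');
  }
}

// Usage: a caller can branch on the error type rather than on message text.
try {
  assertPushEnabled({ preferences: { email: true, push: false, sms: false } });
} catch (error) {
  const permanent = error instanceof NonRetriableError;
  console.log(permanent ? 'ack without requeue' : 'consider requeue');
}
```

As in the original check, a user with no preferences object at all is treated as opted out.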
Status Updates
The service publishes status updates to a RabbitMQ status exchange.
Status Update (push-notification.processor.ts:46-50, 63-68):
// Success
await this.statusUpdateService.updateStatus({
  notification_id: data.notification_id,
  status: 'delivered',
  timestamp: new Date().toISOString(),
});
// Failure
await this.statusUpdateService.updateStatus({
  notification_id: data.notification_id,
  status: 'failed',
  timestamp: new Date().toISOString(),
  error: error?.message,
});
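Both calls send the same payload shape, with `error` present only on failure. A typed builder makes that shape explicit; the `StatusUpdate` interface and `buildStatusUpdate` function below are a hypothetical sketch inferred from the two calls above, not the service's actual types.

```typescript
type DeliveryStatus = 'delivered' | 'failed';

interface StatusUpdate {
  notification_id: string;
  status: DeliveryStatus;
  timestamp: string; // ISO 8601, as produced by Date.prototype.toISOString
  error?: string;    // only set for failed deliveries
}

function buildStatusUpdate(
  notificationId: string,
  status: DeliveryStatus,
  error?: unknown,
  now: Date = new Date(),
): StatusUpdate {
  const update: StatusUpdate = {
    notification_id: notificationId,
    status,
    timestamp: now.toISOString(),
  };
  if (status === 'failed' && error !== undefined) {
    // Accepts Error objects as well as plain strings.
    update.error = error instanceof Error ? error.message : String(error);
  }
  return update;
}

console.log(buildStatusUpdate('n-123', 'failed', new Error('timeout')));
```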
Health Endpoint
The service exposes a health check endpoint via HTTP.
Endpoint: GET /health
Response (200 OK):
Running the Service
Development
Production
npm run build
npm run start:prod
Docker
docker build -t push-service .
docker run -p 8004:3001 \
  -e RABBITMQ_URL=amqp://rabbitmq:5672 \
  -e FIREBASE_PROJECT_ID=your-project \
  -e FIREBASE_PRIVATE_KEY="-----BEGIN PRIVATE KEY-----\n..." \
  -e FIREBASE_CLIENT_EMAIL=firebase-adminsdk@your-project.iam.gserviceaccount.com \
  push-service
Firebase Configuration
The service requires Firebase Admin SDK credentials.
Setup Steps:
- Go to Firebase Console → Project Settings → Service Accounts
- Click “Generate new private key”
- Extract credentials from the JSON file:
  - project_id → FIREBASE_PROJECT_ID
  - private_key → FIREBASE_PRIVATE_KEY
  - client_email → FIREBASE_CLIENT_EMAIL
Environment Variables:
FIREBASE_PROJECT_ID=my-project-123
FIREBASE_PRIVATE_KEY="-----BEGIN PRIVATE KEY-----\nMIIEvQIBADANBgkqhki...\n-----END PRIVATE KEY-----\n"
FIREBASE_CLIENT_EMAIL=firebase-adminsdk@my-project-123.iam.gserviceaccount.com
Keep your Firebase private key secure. Never commit it to version control. Use environment variables or secret management services.
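When the private key is stored on a single line, as in the example above, its line breaks appear as literal `\n` sequences and typically need to be converted back to real newlines before the key is handed to the Firebase Admin SDK. Whether the service already does this is not shown here. The sketch below is one way to validate and normalize the three variables; `loadFirebaseCredentials` and `FirebaseCredentials` are hypothetical names.

```typescript
interface FirebaseCredentials {
  projectId: string;
  clientEmail: string;
  privateKey: string;
}

function loadFirebaseCredentials(env: Record<string, string | undefined>): FirebaseCredentials {
  const required = ['FIREBASE_PROJECT_ID', 'FIREBASE_PRIVATE_KEY', 'FIREBASE_CLIENT_EMAIL'] as const;
  const missing = required.filter((name) => !env[name]);
  if (missing.length > 0) {
    // Fail fast at startup and name the variables, never their values.
    throw new Error(`Missing Firebase environment variables: ${missing.join(', ')}`);
  }
  return {
    projectId: env.FIREBASE_PROJECT_ID as string,
    clientEmail: env.FIREBASE_CLIENT_EMAIL as string,
    // Turn literal "\n" sequences back into real line breaks.
    privateKey: (env.FIREBASE_PRIVATE_KEY as string).replace(/\\n/g, '\n'),
  };
}
```

In a Node.js process this would be called as `loadFirebaseCredentials(process.env)`, and the result could then be passed to the SDK's credential setup.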
Error Handling
Permanent Errors (ACK message):
- User has disabled push notifications
- Invalid push token (4xx errors)
- Malformed request
Transient Errors (requeue message):
- Firebase connection timeout
- Network errors (ECONNREFUSED, ETIMEDOUT)
- 5xx server errors
Error Detection (push-notification.processor.ts:70-76):
const transientHints = ['econnrefused','enotfound','etimedout','econnreset','timeout','503','502','500'];
const is4xx = /status\s+4\d{2}/.test(msg);
const userOptOut = msg.includes('user has disabled push notifications');
const shouldRequeue = transientHints.some(h => msg.includes(h)) && !userOptOut && !is4xx;
Performance Considerations
- Prefetch count: Set to 1 so the consumer holds only one unacknowledged message at a time
- Circuit breakers: Prevent cascading failures to Firebase
- Retry logic: Exponential backoff reduces Firebase load
- Concurrency: Messages are processed one at a time (configurable via prefetchCount)
- Connection pooling: RabbitMQ and HTTP clients use connection managers
Logging and Observability
Log Levels:
- log - Normal operations (message received, push sent)
- warn - Retry attempts, circuit breaker state changes
- error - Push delivery failures
Structured Logging:
this.logger.log('═══════════════════════════════════════');
this.logger.log('📬 RECEIVED PUSH NOTIFICATION EVENT');
this.logger.log('═══════════════════════════════════════');
this.logger.log(`📦 Raw message: ${originalMsg.content.toString()}`);
this.logger.log(`🏷️ Routing key: ${originalMsg.fields.routingKey}`);
this.logger.log(`🔍 Notification ID: ${data?.notification_id}`);