Skip to main content

Overview

The Push Service processes push notifications from RabbitMQ queues and sends them via Firebase Cloud Messaging (FCM). Built with NestJS and TypeScript, it features circuit breaker patterns, exponential backoff retry logic, and comprehensive error handling.

Purpose and Responsibilities

  • Queue consumption - Consumes messages from push.queue via RabbitMQ
  • Template rendering - Replaces template variables with actual values
  • Push delivery - Sends push notifications via Firebase Admin SDK
  • Circuit breaker - Protects against cascading failures with opossum
  • Retry logic - Implements exponential backoff for transient failures
  • Status tracking - Updates delivery status via RabbitMQ status exchange
  • User preference checking - Validates user has opted in to push notifications

Tech Stack

  • Framework: NestJS 10.x
  • Language: TypeScript 5.x
  • Message Broker: RabbitMQ (amqp-connection-manager)
  • Push Provider: Firebase Admin SDK
  • Circuit Breaker: opossum
  • Resilience: Custom RetryService with exponential backoff
  • HTTP Client: Axios (for service-to-service communication)

Configuration

Port: 8004 (default 3001) Environment Variables:
PORT=3001
RABBITMQ_URL=amqp://guest:guest@localhost:5672
FIREBASE_PROJECT_ID=your-project-id
FIREBASE_PRIVATE_KEY=your-private-key
FIREBASE_CLIENT_EMAIL=[email protected]

RabbitMQ Consumption

The service consumes messages from the push.queue bound to the notifications.direct exchange.

Queue Configuration

Exchange: notifications.direct Queue: push.queue Routing Key: push Durable: true Prefetch: 1 (process one message at a time) Implementation (main.ts:25-65):
app.connectMicroservice<MicroserviceOptions>({
  transport: Transport.RMQ,
  options: {
    urls: [rabbitmqUrl],
    queue: 'push.queue',
    queueOptions: {
      durable: true,
    },
    noAck: false,
    prefetchCount: 1,
    exchange: 'notifications.direct',
    exchangeType: 'direct',
    socketOptions: {
      heartbeatIntervalInSeconds: 60,
      reconnectTimeInSeconds: 5,
    },
    deserializer: {
      deserialize: (value: any) => {
        try {
          const contentStr = value?.content?.toString?.() ?? '';
          if (contentStr) {
            const parsed = JSON.parse(contentStr);
            if (parsed && typeof parsed === 'object' && 'pattern' in parsed && 'data' in parsed) {
              return { pattern: parsed.pattern, data: parsed.data };
            }
            const routingKey = value?.fields?.routingKey || 'push';
            return { pattern: routingKey, data: parsed };
          }
          return { pattern: 'push', data: value };
        } catch (err) {
          return { pattern: 'push', data: value };
        }
      },
    },
  },
});
The custom deserializer handles both NestJS envelope messages and raw JSON payloads, ensuring robust message parsing.

Message Processor

Implementation (push-notification.processor.ts:15-86):
@EventPattern('push')
async handlePushNotification(@Payload() data: any, @Ctx() context: RmqContext) {
  const channel = context.getChannelRef();
  const originalMsg = context.getMessage();
  const routingKey = originalMsg.fields.routingKey;

  if (routingKey !== 'push') {
    this.logger.warn(`Received message with routing key "${routingKey}" - ignoring.`);
    channel.ack(originalMsg);
    return;
  }
  
  this.logger.log('═══════════════════════════════════════');
  this.logger.log('📬 RECEIVED PUSH NOTIFICATION EVENT');
  this.logger.log(`🔍 Notification ID: ${data?.notification_id}`);
  this.logger.log('═══════════════════════════════════════');

  try {
    const result = await this.pushNotificationService.sendPushNotification(data);
    this.logger.log(`✅ Push notification sent successfully`);

    // Update status to delivered
    await this.statusUpdateService.updateStatus({
      notification_id: data.notification_id,
      status: 'delivered',
      timestamp: new Date().toISOString(),
    });

    channel.ack(originalMsg);
    return result;
  } catch (error) {
    this.logger.error(`❌ Failed to deliver notification: ${data.notification_id}`, error?.stack);

    // Update status to failed
    await this.statusUpdateService.updateStatus({
      notification_id: data.notification_id,
      status: 'failed',
      timestamp: new Date().toISOString(),
      error: error?.message,
    });

    // Decide whether to requeue based on error characteristics
    const msg = (error?.message || '').toLowerCase();
    const transientHints = ['econnrefused','enotfound','etimedout','econnreset','timeout','503','502','500'];
    const is4xx = /status\s+4\d{2}/.test(msg);
    const userOptOut = msg.includes('user has disabled push notifications');
    const shouldRequeue = transientHints.some(h => msg.includes(h)) && !userOptOut && !is4xx;

    if (shouldRequeue) {
      channel.nack(originalMsg, false, true);
      this.logger.warn(`⚠️ Message rejected and requeued for retry`);
    } else {
      channel.ack(originalMsg);
      this.logger.warn(`⚠️ Message acknowledged without requeue due to non-retriable error`);
    }
  }
}
Error Classification:
  • Transient (requeue): ECONNREFUSED, ETIMEDOUT, 500/502/503
  • Permanent (ACK): 4xx errors, user opt-out, invalid tokens
Messages are only requeued for transient errors. Permanent errors are acknowledged to prevent infinite retry loops.

Circuit Breaker Implementation

The service uses the opossum library to implement circuit breaker patterns, protecting against cascading failures. Circuit Breaker Service (circuit-breaker.service.ts:9-41):
createCircuitBreaker(
  name: string,
  operation: (...args: any[]) => Promise<any>,
  options?: CircuitBreaker.Options
): CircuitBreaker {
  const defaultOptions: CircuitBreaker.Options = {
    timeout: 3000,                // 3 second timeout
    errorThresholdPercentage: 50, // Open at 50% failure rate
    resetTimeout: 30000,          // Try again after 30 seconds
  };

  const circuitBreaker = new CircuitBreaker(operation, { ...defaultOptions, ...options });

  circuitBreaker.on('open', () => {
    this.logger.warn(`Circuit breaker ${name} opened`);
  });

  circuitBreaker.on('close', () => {
    this.logger.log(`Circuit breaker ${name} closed`);
  });

  circuitBreaker.on('halfOpen', () => {
    this.logger.log(`Circuit breaker ${name} half-opened`);
  });

  this.circuitBreakers.set(name, circuitBreaker);
  return circuitBreaker;
}
Circuit States:
  • Closed - Normal operation, requests pass through
  • Open - Failures exceed threshold, requests fail immediately
  • Half-Open - After reset timeout, test with one request
Configuration (push-notification.service.ts:39-49):
const pushCircuitBreaker = this.circuitBreakerService.createCircuitBreaker(
  "push-sending",
  (token, content, metadata) => this.pushClient.sendPush(token, content, metadata),
  {
    timeout: 10000,               // 10 seconds
    errorThresholdPercentage: 50, // Open at 50% failure
    resetTimeout: 30000,          // Retry after 30 seconds
  },
);
Circuit breakers prevent overwhelming Firebase when it’s experiencing issues, allowing the system to recover gracefully.

Retry Logic

The service implements exponential backoff retry logic for transient failures. Retry Service (retry.service.ts:7-36):
async executeWithRetry(
  operation: () => Promise<any>,
  maxRetries: number = 3,
  baseDelay: number = 1000,
  onError?: (error: Error, retryCount: number) => void
) {
  for (let attempt = 0; attempt <= maxRetries; attempt++) {
    try {
      const result = await operation();
      if (attempt > 0) {
        this.logger.log(`Operation succeeded after ${attempt} retries`);
      }
      return result;
    } catch (error) {
      if (attempt === maxRetries) {
        this.logger.error(`Operation failed after ${maxRetries} retries: ${error.message}`);
        throw error;
      }

      const delay = baseDelay * Math.pow(2, attempt); // Exponential backoff
      this.logger.warn(`Attempt ${attempt + 1} failed: ${error.message}. Retrying in ${delay}ms...`);

      if (onError) {
        onError(error, attempt + 1);
      }

      await new Promise(resolve => setTimeout(resolve, delay));
    }
  }
}
Retry Schedule:
AttemptDelayTotal Time
10ms0s
2500ms0.5s
31s1.5s
42s3.5s
Usage (push-notification.service.ts:21-25, 52-61):
// Retry user data fetch
const user = await this.retryService.executeWithRetry(
  () => this.userService.getUser(data.user.id),
  3,   // maxRetries
  500, // baseDelay in ms
);

// Retry push notification send
const result = await this.retryService.executeWithRetry(
  () => pushCircuitBreaker.fire(user.push_token, processedContent, data.metadata),
  3,    // maxRetries
  1000, // baseDelay in ms
);

Push Notification Sending

The service uses Firebase Admin SDK to send push notifications. Push Service Flow (push-notification.service.ts:18-71):
async sendPushNotification(data: any): Promise<any> {
  try {
    // 1. Get user data with retry
    const user = await this.retryService.executeWithRetry(
      () => this.userService.getUser(data.user.id),
      3, 500,
    );

    // 2. Check if user has opted in
    if (!user.preferences?.push) {
      throw new Error("User has disabled push notifications");
    }

    // 3. Get template from message
    const template = data.template;

    // 4. Process template with variables
    const processedContent = this.processTemplate(template, data.variables);

    // 5. Create circuit breaker
    const pushCircuitBreaker = this.circuitBreakerService.createCircuitBreaker(
      "push-sending",
      (token, content, metadata) => this.pushClient.sendPush(token, content, metadata),
      {
        timeout: 10000,
        errorThresholdPercentage: 50,
        resetTimeout: 30000,
      },
    );

    // 6. Send with circuit breaker and retry
    const result = await this.retryService.executeWithRetry(
      () => pushCircuitBreaker.fire(user.push_token, processedContent, data.metadata),
      3, 1000,
    );

    return result;
  } catch (error) {
    this.logger.error(`Error sending push notification: ${error.message}`, error);
    throw error;
  }
}
Template Rendering (push-notification.service.ts:73-83):
private processTemplate(
  template: any,
  variables: Record<string, any>,
): string {
  let content = template.body;
  Object.entries(variables).forEach(([key, value]) => {
    content = content.replace(new RegExp(`{{${key}}}`, "g"), value);
  });
  return content;
}

User Preference Checking

The service validates that users have opted in to push notifications before sending. Preference Check (push-notification.service.ts:28-30):
if (!user.preferences?.push) {
  throw new Error("User has disabled push notifications");
}
User Preferences Structure:
{
  "preferences": {
    "email": true,
    "push": false,
    "sms": false
  }
}
If a user has disabled push notifications, the service throws a permanent error (not retried) and acknowledges the message.

Status Updates

The service publishes status updates to a RabbitMQ status exchange. Status Update (push-notification.processor.ts:46-50, 63-68):
// Success
await this.statusUpdateService.updateStatus({
  notification_id: data.notification_id,
  status: 'delivered',
  timestamp: new Date().toISOString(),
});

// Failure
await this.statusUpdateService.updateStatus({
  notification_id: data.notification_id,
  status: 'failed',
  timestamp: new Date().toISOString(),
  error: error?.message,
});

Health Endpoint

The service exposes a health check endpoint via HTTP. Endpoint: GET /health Response (200 OK):
{
  "status": "ok"
}

Running the Service

Development

npm run start:dev

Production

npm run build
npm run start:prod

Docker

docker build -t push-service .
docker run -p 8004:3001 \
  -e RABBITMQ_URL=amqp://rabbitmq:5672 \
  -e FIREBASE_PROJECT_ID=your-project \
  -e FIREBASE_PRIVATE_KEY="-----BEGIN PRIVATE KEY-----\n..." \
  -e FIREBASE_CLIENT_EMAIL=firebase-adminsdk@your-project.iam.gserviceaccount.com \
  push-service

Firebase Configuration

The service requires Firebase Admin SDK credentials. Setup Steps:
  1. Go to Firebase Console → Project Settings → Service Accounts
  2. Click “Generate new private key”
  3. Extract credentials from the JSON file:
    • project_idFIREBASE_PROJECT_ID
    • private_keyFIREBASE_PRIVATE_KEY
    • client_emailFIREBASE_CLIENT_EMAIL
Environment Variables:
FIREBASE_PROJECT_ID=my-project-123
FIREBASE_PRIVATE_KEY="-----BEGIN PRIVATE KEY-----\nMIIEvQIBADANBgkqhki...\n-----END PRIVATE KEY-----\n"
FIREBASE_CLIENT_EMAIL=[email protected]
Keep your Firebase private key secure. Never commit it to version control. Use environment variables or secret management services.

Error Handling

Permanent Errors (ACK message):
  • User has disabled push notifications
  • Invalid push token (4xx errors)
  • Malformed request
Transient Errors (requeue message):
  • Firebase connection timeout
  • Network errors (ECONNREFUSED, ETIMEDOUT)
  • 5xx server errors
Error Detection (push-notification.processor.ts:70-76):
const transientHints = ['econnrefused','enotfound','etimedout','econnreset','timeout','503','502','500'];
const is4xx = /status\s+4\d{2}/.test(msg);
const userOptOut = msg.includes('user has disabled push notifications');
const shouldRequeue = transientHints.some(h => msg.includes(h)) && !userOptOut && !is4xx;

Performance Considerations

  • Prefetch count: Set to 1 to prevent message hoarding
  • Circuit breakers: Prevent cascading failures to Firebase
  • Retry logic: Exponential backoff reduces Firebase load
  • Parallel processing: Process one message at a time (configurable)
  • Connection pooling: RabbitMQ and HTTP clients use connection managers

Logging and Observability

Log Levels:
  • log - Normal operations (message received, push sent)
  • warn - Retry attempts, circuit breaker state changes
  • error - Push delivery failures
Structured Logging:
this.logger.log('═══════════════════════════════════════');
this.logger.log('📬 RECEIVED PUSH NOTIFICATION EVENT');
this.logger.log('═══════════════════════════════════════');
this.logger.log(`📦 Raw message: ${originalMsg.content.toString()}`);
this.logger.log(`🏷️  Routing key: ${originalMsg.fields.routingKey}`);
this.logger.log(`🔍 Notification ID: ${data?.notification_id}`);

Build docs developers (and LLMs) love