Overview
Redis pub/sub enables real-time messaging between different parts of your application. The Upstash Redis SDK provides a Subscriber class that handles channel subscriptions and message delivery over HTTP using Server-Sent Events (SSE).
Note: Because subscriptions run over HTTP streaming rather than the traditional Redis protocol, they work in serverless and edge environments, where long-lived raw TCP connections are often unavailable.
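To make the transport a little more concrete, the following is a minimal sketch of how a Server-Sent Events text stream can be split into individual events. The `parseSseChunk` helper and the sample payload are hypothetical and only illustrate the general SSE framing (fields separated by newlines, events separated by a blank line); they are not the SDK's internal parser or Upstash's actual wire format.

```typescript
// Hypothetical helper for illustration: parses a complete SSE text buffer.
interface SseEvent {
  event: string;
  data: string;
}

function parseSseChunk(buffer: string): SseEvent[] {
  const events: SseEvent[] = [];
  // Events are separated by a blank line.
  for (const block of buffer.split(/\r?\n\r?\n/)) {
    let event = 'message'; // default event type in SSE
    const dataLines: string[] = [];
    for (const line of block.split(/\r?\n/)) {
      if (line === '' || line.startsWith(':')) continue; // blank line or comment
      const idx = line.indexOf(':');
      const field = idx === -1 ? line : line.slice(0, idx);
      let value = idx === -1 ? '' : line.slice(idx + 1);
      if (value.startsWith(' ')) value = value.slice(1); // drop one leading space
      if (field === 'event') event = value;
      else if (field === 'data') dataLines.push(value);
    }
    // Blocks without any data field (e.g. keep-alive comments) produce no event.
    if (dataLines.length > 0) {
      events.push({ event, data: dataLines.join('\n') });
    }
  }
  return events;
}

// Invented sample stream: one plain event, one keep-alive comment, one named event.
const sample = 'data: first\n\n: keep-alive\n\nevent: ping\ndata: a\ndata: b\n\n';
console.log(parseSseChunk(sample));
```

In practice the Subscriber class handles this framing for you; the sketch is only meant to show why a plain HTTP response can carry a continuous stream of messages.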
Basic Usage
Subscribe to a Channel
import { Redis } from '@upstash/redis';
const redis = new Redis({
url: process.env.UPSTASH_REDIS_REST_URL!,
token: process.env.UPSTASH_REDIS_REST_TOKEN!,
});
// Subscribe to a single channel
const subscriber = redis.subscribe<string>('notifications');
subscriber.on('message', (data) => {
console.log('Received:', data.message);
console.log('Channel:', data.channel);
});
Subscribe to Multiple Channels
const subscriber = redis.subscribe<string>([
'channel1',
'channel2',
'channel3'
]);
subscriber.on('message', (data) => {
console.log(`Message from ${data.channel}: ${data.message}`);
});
Publishing Messages
// Publish to a channel
const receiverCount = await redis.publish('notifications', 'Hello World');
console.log(`Message delivered to ${receiverCount} subscribers`);
// Publish complex objects (automatically serialized)
await redis.publish('events', {
type: 'user.login',
userId: '123',
timestamp: Date.now(),
});
Type Safety
Specify the message type when creating a subscriber:
interface ChatMessage {
user: string;
message: string;
timestamp: number;
}
const subscriber = redis.subscribe<ChatMessage>('chat:room1');
subscriber.on('message', (data) => {
// data.message is typed as ChatMessage
console.log(`${data.message.user}: ${data.message.message}`);
});
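TypeScript type parameters are erased at compile time, so the type argument describes the shape you expect rather than validating what a publisher actually sent. If publishers are not fully trusted, a runtime check inside the listener can help. The sketch below assumes a hand-written guard; `isChatMessage` and `handleIncoming` are hypothetical names and not part of the SDK.

```typescript
interface ChatMessage {
  user: string;
  message: string;
  timestamp: number;
}

// Hypothetical guard: checks the fields that ChatMessage declares.
function isChatMessage(value: unknown): value is ChatMessage {
  if (typeof value !== 'object' || value === null) return false;
  const v = value as Record<string, unknown>;
  return (
    typeof v['user'] === 'string' &&
    typeof v['message'] === 'string' &&
    typeof v['timestamp'] === 'number'
  );
}

// Stand-in for the body of a 'message' listener.
function handleIncoming(payload: unknown): string {
  if (!isChatMessage(payload)) return 'ignored malformed payload';
  return `${payload.user}: ${payload.message}`;
}

console.log(handleIncoming({ user: 'Alice', message: 'Hi', timestamp: Date.now() }));
console.log(handleIncoming({ user: 'Bob' }));
```

Inside a real listener, the same guard could be applied to `data.message` before the message is used.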
Event Listeners
Message Events
Listen for all messages across subscribed channels:
subscriber.on('message', (data) => {
console.log('Channel:', data.channel);
console.log('Message:', data.message);
});
Channel-Specific Events
Listen to messages from a specific channel:
const subscriber = redis.subscribe(['news', 'sports', 'weather']);
subscriber.on('message:news', (data) => {
console.log('News update:', data.message);
});
subscriber.on('message:sports', (data) => {
console.log('Sports update:', data.message);
});
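One way to picture the relationship between the generic `message` event and the `message:<channel>` events is a listener registry keyed by event name. The sketch below assumes that both kinds of listener fire for the same incoming message, as the event naming suggests; `MiniDispatcher` is a hypothetical stand-in and not the SDK's Subscriber implementation.

```typescript
type MessageData<T> = { channel: string; message: T };
type Listener<T> = (data: MessageData<T>) => void;

// Hypothetical stand-in for a subscriber's listener registry.
class MiniDispatcher<T> {
  private listeners = new Map<string, Set<Listener<T>>>();

  on(event: string, listener: Listener<T>): void {
    const set = this.listeners.get(event) ?? new Set<Listener<T>>();
    set.add(listener);
    this.listeners.set(event, set);
  }

  // Fans one incoming message out to 'message' and 'message:<channel>' listeners.
  dispatch(channel: string, message: T): void {
    const data: MessageData<T> = { channel, message };
    for (const event of ['message', `message:${channel}`]) {
      this.listeners.get(event)?.forEach((listener) => listener(data));
    }
  }
}

const dispatcher = new MiniDispatcher<string>();
const seen: string[] = [];
dispatcher.on('message', (d) => seen.push(`any:${d.channel}`));
dispatcher.on('message:news', (d) => seen.push(`news:${d.message}`));
dispatcher.dispatch('news', 'headline');
dispatcher.dispatch('sports', 'score');
console.log(seen);
```

With this model, a channel-specific handler avoids branching on `data.channel` inside one large `message` handler.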
Subscription Events
Handle subscription lifecycle events:
subscriber.on('subscribe', (count) => {
console.log(`Subscribed. Total subscriptions: ${count}`);
});
subscriber.on('unsubscribe', (count) => {
console.log(`Unsubscribed. Remaining subscriptions: ${count}`);
});
Error Events
Handle errors during subscription:
subscriber.on('error', (error) => {
console.error('Subscription error:', error);
});
Pattern Subscriptions
Subscribe to channels matching a pattern using psubscribe:
// Subscribe to all channels starting with 'user:'
const subscriber = redis.psubscribe<string>('user:*');
subscriber.on('pmessage', (data) => {
console.log('Pattern:', data.pattern);
console.log('Channel:', data.channel);
console.log('Message:', data.message);
});
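To see which channel names a pattern such as `user:*` would cover, it can help to test patterns locally. The sketch below is a simplified approximation of glob matching: the `globToRegExp` helper is hypothetical and handles only `*` (any run of characters) and `?` (exactly one character), whereas Redis patterns also support character sets and escapes. Matching for real subscriptions happens on the server.

```typescript
// Hypothetical helper: converts a simple glob pattern to an anchored RegExp.
// Only `*` and `?` are treated as wildcards in this sketch.
function globToRegExp(pattern: string): RegExp {
  // Escape regex metacharacters other than the two wildcards.
  const escaped = pattern.replace(/[.+^${}()|[\]\\]/g, '\\$&');
  const source = escaped.replace(/\*/g, '.*').replace(/\?/g, '.');
  return new RegExp(`^${source}$`);
}

const userPattern = globToRegExp('user:*');
for (const channel of ['user:123', 'user:login:eu', 'order:456']) {
  console.log(channel, userPattern.test(channel));
}
```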
Multiple Patterns
const subscriber = redis.psubscribe([
'user:*',
'order:*',
'payment:*'
]);
subscriber.on('pmessage', (data) => {
if (data.pattern === 'user:*') {
console.log('User event:', data.message);
} else if (data.pattern === 'order:*') {
console.log('Order event:', data.message);
}
});
Pattern-Specific Events
const subscriber = redis.psubscribe('notification:*');
subscriber.on('pmessage:notification:*', (data) => {
console.log(`Notification on ${data.channel}:`, data.message);
});
Managing Subscriptions
Unsubscribe from Channels
const subscriber = redis.subscribe(['channel1', 'channel2', 'channel3']);
// Unsubscribe from specific channels
await subscriber.unsubscribe(['channel1']);
// Unsubscribe from all channels
await subscriber.unsubscribe();
Get Subscribed Channels
const channels = subscriber.getSubscribedChannels();
console.log('Subscribed to:', channels);
Remove All Listeners
subscriber.removeAllListeners();
Practical Examples
Real-Time Chat
interface ChatMessage {
user: string;
message: string;
timestamp: number;
}
const chatRoom = 'room:lobby';
// Subscribe to chat messages
const subscriber = redis.subscribe<ChatMessage>(chatRoom);
subscriber.on('message', (data) => {
const msg = data.message;
console.log(`[${new Date(msg.timestamp).toLocaleTimeString()}] ${msg.user}: ${msg.message}`);
});
subscriber.on('error', (error) => {
console.error('Chat error:', error);
});
// Send a chat message
const sendMessage = async (user: string, message: string) => {
await redis.publish(chatRoom, {
user,
message,
timestamp: Date.now(),
});
};
await sendMessage('Alice', 'Hello everyone!');
await sendMessage('Bob', 'Hi Alice!');
Notification System
interface Notification {
type: 'info' | 'warning' | 'error';
title: string;
message: string;
}
class NotificationService {
  private subscriber;
  constructor(private redis: Redis, private userId: string) {
    // Subscribe to user-specific notifications
    this.subscriber = redis.subscribe<Notification>(`notifications:${userId}`);
    this.setupListeners();
  }
  private setupListeners() {
    this.subscriber.on('message', (data) => {
      const notification = data.message;
      this.handleNotification(notification);
    });
    this.subscriber.on('error', (error) => {
      console.error('Notification error:', error);
    });
  }
  private handleNotification(notification: Notification) {
    const icon = notification.type === 'error' ? '❌' :
      notification.type === 'warning' ? '⚠️' : 'ℹ️';
    console.log(`${icon} ${notification.title}: ${notification.message}`);
  }
  async sendNotification(targetUserId: string, notification: Notification) {
    await this.redis.publish(`notifications:${targetUserId}`, notification);
  }
  async cleanup() {
    await this.subscriber.unsubscribe();
  }
}
// Usage
const notificationService = new NotificationService(redis, 'user123');
await notificationService.sendNotification('user123', {
type: 'info',
title: 'New Message',
message: 'You have a new message from Bob',
});
Event Broadcasting
interface SystemEvent {
eventType: string;
data: unknown; // payload shape varies by event type
timestamp: number;
}
// Subscribe to all system events
const subscriber = redis.psubscribe<SystemEvent>('events:*');
subscriber.on('pmessage', (data) => {
const event = data.message;
const eventCategory = data.channel.split(':')[1];
console.log(`[${eventCategory}] ${event.eventType}:`, event.data);
});
// Publish different types of events
await redis.publish('events:user', {
eventType: 'user.login',
data: { userId: '123', ip: '192.168.1.1' },
timestamp: Date.now(),
});
await redis.publish('events:order', {
eventType: 'order.created',
data: { orderId: '456', amount: 99.99 },
timestamp: Date.now(),
});
Multi-Channel Monitoring
const subscriber = redis.subscribe([
'logs:error',
'logs:warning',
'logs:info'
]);
subscriber.on('message:logs:error', (data) => {
console.error('🔴 ERROR:', data.message);
// Send alert, log to monitoring service, etc.
});
subscriber.on('message:logs:warning', (data) => {
console.warn('🟡 WARNING:', data.message);
});
subscriber.on('message:logs:info', (data) => {
console.info('🟢 INFO:', data.message);
});
Message Serialization
Automatic Deserialization
By default, messages are automatically deserialized from JSON:
const subscriber = redis.subscribe<{ count: number }>('stats');
subscriber.on('message', (data) => {
// data.message is automatically parsed
console.log(data.message.count);
});
await redis.publish('stats', { count: 42 });
Disable Automatic Deserialization
To receive raw message strings:
const redisWithoutDeserialization = new Redis({
url: process.env.UPSTASH_REDIS_REST_URL!,
token: process.env.UPSTASH_REDIS_REST_TOKEN!,
automaticDeserialization: false,
});
const subscriber = redisWithoutDeserialization.subscribe<string>('channel');
subscriber.on('message', (data) => {
// data.message is a string
const parsed = JSON.parse(data.message);
});
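With automatic deserialization disabled, the listener is responsible for parsing, and `JSON.parse` throws on payloads that are not valid JSON. A minimal sketch of a defensive wrapper follows; `safeParse` and `ParseResult` are hypothetical names introduced for illustration and are not SDK APIs.

```typescript
type ParseResult<T> = { ok: true; value: T } | { ok: false; raw: string };

// Hypothetical helper: returns a tagged result instead of throwing.
// Note that the cast to T is unchecked; it does not validate the parsed shape.
function safeParse<T>(raw: string): ParseResult<T> {
  try {
    return { ok: true, value: JSON.parse(raw) as T };
  } catch {
    return { ok: false, raw };
  }
}

const good = safeParse<{ count: number }>('{"count":42}');
const bad = safeParse<{ count: number }>('plain text message');
console.log(good.ok ? good.value.count : 'unparseable');
console.log(bad.ok ? bad.value.count : `kept raw string: ${bad.raw}`);
```

Inside a listener, `safeParse(data.message)` would let the handler skip or log malformed messages rather than fail on them.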
Important Notes
Pub/sub messages are fire-and-forget. If a subscriber is offline when a message is published, it will not receive that message. For persistent messaging, consider using Redis Streams instead.
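The fire-and-forget behavior can be illustrated without a server. The sketch below uses a hypothetical in-memory bus (`InMemoryBus` is not part of the SDK) that, like pub/sub, delivers only to handlers registered at publish time and stores nothing.

```typescript
// Hypothetical in-memory bus that mimics fire-and-forget delivery.
class InMemoryBus {
  private handlers = new Map<string, Array<(msg: string) => void>>();

  subscribe(channel: string, handler: (msg: string) => void): void {
    const list = this.handlers.get(channel) ?? [];
    list.push(handler);
    this.handlers.set(channel, list);
  }

  // Delivers to current handlers only and returns how many received the message.
  publish(channel: string, msg: string): number {
    const list = this.handlers.get(channel) ?? [];
    list.forEach((handler) => handler(msg));
    return list.length;
  }
}

const bus = new InMemoryBus();
const received: string[] = [];

// Published before anyone subscribes: the message is dropped.
const early = bus.publish('notifications', 'sent before subscribing');
bus.subscribe('notifications', (msg) => received.push(msg));
// Published after subscribing: the message is delivered.
const late = bus.publish('notifications', 'sent after subscribing');

console.log(early, late, received);
```

The first message is never seen by the later subscriber, which is the gap that a persistent structure such as Redis Streams is meant to close.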
Connection Management
- Subscriptions use HTTP Server-Sent Events (SSE) for real-time updates
- Each subscription maintains a persistent HTTP connection
- Properly clean up subscriptions when done to avoid resource leaks
// Always clean up when done
process.on('SIGINT', async () => {
  try {
    await subscriber.unsubscribe();
  } finally {
    // Exit even if unsubscribing fails
    process.exit(0);
  }
});
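When an application holds several subscriptions, it may be convenient to collect their cleanup steps in one place. The following is a minimal sketch under that assumption; `createCleanupRegistry` is a hypothetical helper, not an SDK feature. It runs each registered callback at most once and keeps going if one of them fails.

```typescript
type Cleanup = () => Promise<void> | void;

// Hypothetical registry: collects cleanup callbacks and runs them at most once.
function createCleanupRegistry() {
  const cleanups: Cleanup[] = [];
  let done = false;

  return {
    register(cleanup: Cleanup): void {
      cleanups.push(cleanup);
    },
    // Runs every callback in order and returns how many completed without error.
    async runAll(): Promise<number> {
      if (done) return 0; // repeated calls (e.g. a second signal) are no-ops
      done = true;
      let succeeded = 0;
      for (const cleanup of cleanups) {
        try {
          await cleanup();
          succeeded += 1;
        } catch (error) {
          console.error('Cleanup failed:', error);
        }
      }
      return succeeded;
    },
  };
}

const registry = createCleanupRegistry();
// With the SDK this could be: registry.register(() => subscriber.unsubscribe());
registry.register(async () => console.log('subscription closed'));
registry.runAll().then((count) => console.log(`${count} cleanup(s) completed`));
```

In a Node.js process, `runAll` could be called from both SIGINT and SIGTERM handlers before exiting, since the second call does nothing.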
- Pattern subscriptions (psubscribe) are more expensive than exact matches
- Limit the number of concurrent subscriptions per client
- Use channel-specific event handlers for better performance