Overview

Horse Trust uses Socket.io to provide instant, bidirectional communication between buyers and sellers. The chat system includes conversation management, real-time message delivery, typing indicators, and read receipts.

Architecture

The real-time chat system is built on three main components:

Conversations

Two-participant channels linked to horse listings

Messages

Persistent chat messages with read status

Socket.io Server

WebSocket server for real-time events

Data Models

Conversation Model

Defined in server/src/models/VetRecord.ts:
const conversationSchema = new Schema<IConversation>({
  participants: {
    type: [{ type: Schema.Types.ObjectId, ref: "User" }],
    required: true,
    validate: {
      validator: (arr: unknown[]) => arr.length === 2,
      message: "A conversation requires exactly 2 participants",
    },
  },
  horse_id: { type: Schema.Types.ObjectId, ref: "Horse" },
  last_message: { type: lastMessageSchema },
}, {
  timestamps: { createdAt: "created_at", updatedAt: "updated_at" },
});

conversationSchema.index({ participants: 1 });
Conversations are strictly two-participant to maintain clear buyer-seller communication.
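
The validator above checks only the array length, so a user could in principle be paired with themselves. The following is a minimal sketch of a stricter check, assuming participant IDs have been converted to strings; `isValidParticipantPair` is a hypothetical helper, not part of the Horse Trust codebase.

```typescript
// Hypothetical helper: mirrors the schema rule (exactly 2 participants)
// and additionally rejects a pair made of the same user twice.
function isValidParticipantPair(ids: readonly string[]): boolean {
  return ids.length === 2 && ids[0] !== ids[1];
}
```

One way to use it would be inside the schema validator, for example `validator: (arr: unknown[]) => isValidParticipantPair(arr.map(String))`.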

Message Model

const messageSchema = new Schema<IMessage>({
  conversation_id: { type: Schema.Types.ObjectId, ref: "Conversation", required: true, index: true },
  sender_id: { type: Schema.Types.ObjectId, ref: "User", required: true },
  text: { type: String, required: true, maxlength: 2000 },
  is_read: { type: Boolean, default: false },
  read_at: { type: Date },
  deleted_at: { type: Date },
}, {
  timestamps: { createdAt: "sent_at", updatedAt: false },
});
Messages have a 2000 character limit to prevent abuse and ensure proper display.
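
The same limit can be checked before a message ever reaches the database, which gives the sender a specific reason instead of a generic error. This is a sketch under the assumption that whitespace-only messages should also be rejected; `checkMessageText` and `MAX_MESSAGE_LENGTH` are hypothetical names.

```typescript
// Matches the maxlength value in messageSchema.
const MAX_MESSAGE_LENGTH = 2000;

type TextCheck =
  | { ok: true; text: string }
  | { ok: false; reason: "empty" | "too_long" };

// Hypothetical helper: trims the input, then applies the schema's length limit.
function checkMessageText(raw: string): TextCheck {
  const text = raw.trim();
  if (text.length === 0) return { ok: false, reason: "empty" };
  if (text.length > MAX_MESSAGE_LENGTH) return { ok: false, reason: "too_long" };
  return { ok: true, text };
}
```

The helper could run on the client before emitting and again on the server before `Message.create`, so both sides agree on what counts as a valid message.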

Last Message Snapshot

Conversations store a snapshot of the last message for quick list rendering:
const lastMessageSchema = new Schema({
  text: { type: String },
  sender_id: { type: Schema.Types.ObjectId, ref: "User" },
  sent_at: { type: Date },
  is_read: { type: Boolean, default: false },
}, { _id: false });
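
As an illustration of why the snapshot is useful, a conversation list row can be rendered from it without loading any messages. The sketch below assumes IDs are plain strings on the client; `previewLine` and `hasUnread` are hypothetical helpers.

```typescript
// Client-side view of the last_message snapshot (IDs assumed to be strings).
interface LastMessageSnapshot {
  text: string;
  sender_id: string;
  sent_at: Date;
  is_read: boolean;
}

// Hypothetical helper: one-line preview for a conversation list row.
function previewLine(snapshot: LastMessageSnapshot | undefined, currentUserId: string): string {
  if (!snapshot) return "No messages yet";
  const prefix = snapshot.sender_id === currentUserId ? "You: " : "";
  return prefix + snapshot.text;
}

// Hypothetical helper: the row is unread only if the other participant
// sent the last message and it has not been read yet.
function hasUnread(snapshot: LastMessageSnapshot | undefined, currentUserId: string): boolean {
  return snapshot !== undefined && snapshot.sender_id !== currentUserId && !snapshot.is_read;
}
```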

Socket.io Server Setup

The Socket.io server is configured in server/src/index.ts:
import http from "http";
import { Server as SocketServer, Socket } from "socket.io";

// `app` is the application's request handler, defined elsewhere
const httpServer = http.createServer(app);

const io = new SocketServer(httpServer, {
  cors: {
    origin: (process.env.CORS_ORIGINS || "http://localhost:5173").split(","),
    credentials: true,
  },
});

CORS Configuration

origin (string | string[]): Allowed origins, parsed from the comma-separated CORS_ORIGINS environment variable. Falls back to http://localhost:5173 when the variable is not set.
credentials (boolean, set to true here): Allows credentials (cookies, authorization headers) in cross-origin requests.
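
A plain `split(",")` keeps any spaces around the commas, so a value such as `"https://a.example, https://b.example"` would produce an origin with a leading space. The sketch below shows a more tolerant parser; `parseCorsOrigins` is a hypothetical helper and the default mirrors the one used in the server setup.

```typescript
// Hypothetical helper: splits CORS_ORIGINS, trims each entry, drops empty
// entries, and falls back to the local dev origin when nothing remains.
function parseCorsOrigins(raw: string | undefined, fallback = "http://localhost:5173"): string[] {
  const origins = (raw ?? "")
    .split(",")
    .map((origin) => origin.trim())
    .filter((origin) => origin.length > 0);
  return origins.length > 0 ? origins : [fallback];
}
```

It could be passed directly as the `origin` value, for example `origin: parseCorsOrigins(process.env.CORS_ORIGINS)`.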

JWT Authentication Middleware

All Socket.io connections must be authenticated:
io.use((socket: Socket, next) => {
  const token = socket.handshake.auth?.token as string | undefined;
  if (!token) {
    return next(new Error("Authentication error: no token"));
  }
  try {
    const decoded = jwt.verify(token, process.env.JWT_SECRET as string) as JwtPayload;
    (socket as Socket & { user: JwtPayload }).user = decoded;
    next();
  } catch {
    next(new Error("Authentication error: invalid token"));
  }
});

1. Extract Token: The client passes its JWT in socket.handshake.auth.token.
2. Verify Signature: The token is verified against the JWT_SECRET environment variable.
3. Attach User Data: The decoded payload is attached to the socket instance for use in handlers.
4. Reject Unauthorized: A missing or invalid token causes the connection to be rejected.
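
The middleware reads the token with a type cast, which does not check the value at runtime. A small guard can make the extraction step explicit. This is a sketch only; `HandshakeLike` and `extractToken` are hypothetical names, and `HandshakeLike` models just the part of the handshake the middleware uses.

```typescript
// Minimal shape of the handshake data read by the middleware.
interface HandshakeLike {
  auth?: Record<string, unknown>;
}

// Hypothetical helper: returns the token only when it is a non-empty string,
// so numbers, objects, and empty strings are treated as "no token".
function extractToken(handshake: HandshakeLike): string | undefined {
  const token = handshake.auth?.token;
  return typeof token === "string" && token.length > 0 ? token : undefined;
}
```

The result could replace the cast in the middleware, with `undefined` leading to the same "no token" error as before.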

Connection Handling

User Rooms

Each authenticated user automatically joins a personal room:
io.on("connection", (socket: Socket) => {
  const user = (socket as Socket & { user: JwtPayload }).user;
  console.log(`🔌 Socket connected: userId=${user.userId}`);

  // Join a room per user (so we can target recipients)
  socket.join(`user:${user.userId}`);
  
  // Event handlers...
});
User rooms enable targeted notifications without iterating through all connected sockets.

Event Handlers

Join Conversation

Users join conversation-specific rooms to receive messages:
socket.on("join_conversation", (conversationId: string) => {
  socket.join(`conv:${conversationId}`);
  console.log(`  User ${user.userId} joined conv:${conversationId}`);
});
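
The handler above joins whatever room name the client sends, and the client-side example later emits a `leave_conversation` event that has no matching server handler in this page. The sketch below shows one way both could be handled. It uses minimal stand-in types rather than the real Socket.io and Mongoose types, and `joinIfParticipant` and `leaveConversation` are hypothetical helpers.

```typescript
// Minimal stand-ins for the Socket.io socket and the Conversation document.
interface RoomSocket {
  join(room: string): void;
  leave(room: string): void;
}
interface ConversationLike {
  participants: string[]; // participant IDs as strings
}

// Hypothetical helper: joins the room only if the user belongs to the conversation.
function joinIfParticipant(
  socket: RoomSocket,
  conversationId: string,
  conversation: ConversationLike | null,
  userId: string
): boolean {
  if (!conversation || !conversation.participants.includes(userId)) return false;
  socket.join(`conv:${conversationId}`);
  return true;
}

// Hypothetical counterpart for the leave_conversation event.
function leaveConversation(socket: RoomSocket, conversationId: string): void {
  socket.leave(`conv:${conversationId}`);
}
```

In a real handler, the conversation would be loaded first (for example with the same `Conversation.findOne` query used by `send_message`) and passed in with its participant IDs converted to strings.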

Send Message

The core messaging logic with full error handling:
socket.on(
  "send_message",
  async (data: { conversation_id: string; text: string }, ack?: (res: unknown) => void) => {
    try {
      // 1. Verify user is participant
      const conversation = await Conversation.findOne({
        _id: data.conversation_id,
        participants: user.userId,
      });

      if (!conversation) {
        if (ack) ack({ success: false, message: "Conversation not found" });
        return;
      }

      // 2. Persist to database
      const message = await Message.create({
        conversation_id: data.conversation_id,
        sender_id: user.userId,
        text: data.text,
        is_read: false,
      });

      // 3. Update last_message snapshot
      await Conversation.findByIdAndUpdate(data.conversation_id, {
        last_message: {
          text: data.text,
          sender_id: user.userId,
          sent_at: message.sent_at,
          is_read: false,
        },
        updated_at: new Date(),
      });

      // 4. Populate sender info
      const populated = await message.populate("sender_id", "full_name profile_picture_url");

      // 5. Emit to all in the conversation room
      io.to(`conv:${data.conversation_id}`).emit("new_message", populated);

      // 6. Notify recipient's personal room
      const recipientId = conversation.participants.find(
        (p) => p.toString() !== user.userId
      );
      if (recipientId) {
        io.to(`user:${recipientId}`).emit("message_notification", {
          conversation_id: data.conversation_id,
          sender: user.userId,
          preview: data.text.substring(0, 60),
        });
      }

      if (ack) ack({ success: true, data: populated });
    } catch (err) {
      console.error("Socket send_message error:", err);
      if (ack) ack({ success: false, message: "Error sending message" });
    }
  }
);
The handler verifies that the user is a participant in the conversation before accepting the message.
The message is persisted to MongoDB before it is broadcast.
The conversation's last_message field is updated for efficient list rendering.
The message is emitted to the conversation room (for open chats), and a notification is sent to the recipient's personal room.
The acknowledgment callback tells the sender whether the message was saved and broadcast; it does not confirm that the recipient received it.
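
Step 6 of the handler can be isolated into two small pure functions, which makes the recipient lookup and the 60-character preview easy to test. This is a sketch that assumes participant IDs are already strings; `findRecipient` and `buildNotification` are hypothetical names.

```typescript
// Payload shape taken from the message_notification emit in the handler.
interface MessageNotification {
  conversation_id: string;
  sender: string;
  preview: string;
}

// Hypothetical helper: the recipient is the participant who is not the sender.
function findRecipient(participants: readonly string[], senderId: string): string | undefined {
  return participants.find((id) => id !== senderId);
}

// Hypothetical helper: builds the notification with the same 60-character preview.
function buildNotification(conversationId: string, senderId: string, text: string): MessageNotification {
  return {
    conversation_id: conversationId,
    sender: senderId,
    preview: text.substring(0, 60),
  };
}
```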

Typing Indicators

Real-time typing indicators enhance the chat experience:
socket.on("typing", (conversationId: string) => {
  socket.to(`conv:${conversationId}`).emit("user_typing", { userId: user.userId });
});

socket.on("stop_typing", (conversationId: string) => {
  socket.to(`conv:${conversationId}`).emit("user_stop_typing", { userId: user.userId });
});
Note the use of socket.to() instead of io.to() - this broadcasts to all sockets in the room except the sender.

Client-Side Integration

Connecting to Socket.io

import { io } from "socket.io-client";

const token = localStorage.getItem('horse_trust_token');

const socket = io(process.env.NEXT_PUBLIC_API_URL, {
  auth: { token },
  transports: ['websocket', 'polling'],
});

socket.on('connect', () => {
  console.log('Connected to chat server');
});

socket.on('connect_error', (error) => {
  console.error('Connection failed:', error.message);
});

Joining a Conversation

useEffect(() => {
  if (conversationId && socket) {
    socket.emit('join_conversation', conversationId);
    
    return () => {
      socket.emit('leave_conversation', conversationId);
    };
  }
}, [conversationId, socket]);

Sending Messages

const sendMessage = (text: string) => {
  socket.emit(
    'send_message',
    {
      conversation_id: conversationId,
      text: text,
    },
    (response) => {
      if (response.success) {
        console.log('Message sent:', response.data);
      } else {
        console.error('Failed to send:', response.message);
      }
    }
  );
};
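
The `response` parameter in the callback above is untyped. A discriminated union gives it a shape the compiler can check. The type below is inferred from the two `ack(...)` calls in the server handler; `SendMessageAck` and `describeAck` are hypothetical names.

```typescript
// Assumed ack shape, inferred from the server's success and failure responses.
type SendMessageAck<T> =
  | { success: true; data: T }
  | { success: false; message: string };

// Hypothetical helper: turns an ack into a short status string for the UI.
function describeAck<T>(ack: SendMessageAck<T>): string {
  return ack.success ? "sent" : `failed: ${ack.message}`;
}
```

With this type, the callback could be declared as `(response: SendMessageAck<unknown>) => { ... }`, and TypeScript would only allow `response.data` inside the `success` branch.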

Receiving Messages

useEffect(() => {
  socket.on('new_message', (message) => {
    setMessages((prev) => [...prev, message]);
    
    // Scroll to bottom
    chatContainerRef.current?.scrollTo({
      top: chatContainerRef.current.scrollHeight,
      behavior: 'smooth'
    });
  });

  socket.on('message_notification', (notification) => {
    // Show toast notification
    toast.info(`New message from ${notification.sender}`);
    
    // Update unread count
    setUnreadCount((prev) => prev + 1);
  });

  return () => {
    socket.off('new_message');
    socket.off('message_notification');
  };
}, [socket]);
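
Because the server emits `new_message` with `io.to(...)`, the sender's own socket also receives the event if it is in the conversation room. If the client additionally appends the message from the acknowledgment, the same message can appear twice. The sketch below shows a state update that skips duplicates and messages for other conversations; `ChatMessage` and `appendMessage` are hypothetical names and IDs are assumed to be strings.

```typescript
// Minimal client-side message shape (IDs assumed to be strings).
interface ChatMessage {
  _id: string;
  conversation_id: string;
  text: string;
}

// Hypothetical reducer: returns the same array when nothing changes,
// so React can skip the re-render.
function appendMessage(
  prev: ChatMessage[],
  incoming: ChatMessage,
  activeConversationId: string
): ChatMessage[] {
  if (incoming.conversation_id !== activeConversationId) return prev;
  if (prev.some((m) => m._id === incoming._id)) return prev;
  return [...prev, incoming];
}
```

Inside the listener it could be used as `setMessages((prev) => appendMessage(prev, message, conversationId))`.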

Typing Indicators

const [isTyping, setIsTyping] = useState(false);
const typingTimeoutRef = useRef<NodeJS.Timeout>();

const handleInputChange = (e: React.ChangeEvent<HTMLInputElement>) => {
  setText(e.target.value);
  
  // Emit typing event
  socket.emit('typing', conversationId);
  
  // Clear existing timeout
  if (typingTimeoutRef.current) {
    clearTimeout(typingTimeoutRef.current);
  }
  
  // Stop typing after 2 seconds of inactivity
  typingTimeoutRef.current = setTimeout(() => {
    socket.emit('stop_typing', conversationId);
  }, 2000);
};

useEffect(() => {
  socket.on('user_typing', ({ userId }) => {
    if (userId !== currentUserId) {
      setIsTyping(true);
    }
  });
  
  socket.on('user_stop_typing', ({ userId }) => {
    if (userId !== currentUserId) {
      setIsTyping(false);
    }
  });

  return () => {
    socket.off('user_typing');
    socket.off('user_stop_typing');
  };
}, [socket]);
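
The input handler above emits `typing` on every keystroke, while the Performance Considerations section recommends at most one event per second. A small throttle wrapper is one way to get there. This is a sketch; `throttle` is a hypothetical helper, and its `now` parameter exists only so the timing can be checked without real timers.

```typescript
// Hypothetical helper: returns a wrapper that calls `fn` at most once
// per `intervalMs`. Calls made inside the interval are dropped.
function throttle<A extends unknown[]>(
  fn: (...args: A) => void,
  intervalMs: number,
  now: () => number = Date.now
): (...args: A) => void {
  let lastCall = Number.NEGATIVE_INFINITY; // the first call always goes through
  return (...args: A): void => {
    const current = now();
    if (current - lastCall >= intervalMs) {
      lastCall = current;
      fn(...args);
    }
  };
}
```

In the component, the wrapper would need to be created once (for example in a ref) as something like `throttle((id: string) => socket.emit('typing', id), 1000)`, with the existing 2-second timeout still responsible for `stop_typing`.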

Read Receipts

Implement read receipts by emitting a read event when messages are viewed:
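// Server-side
socket.on('mark_as_read', async (messageId: string) => {
  try {
    // Only unread messages sent by someone else can be marked as read
    const message = await Message.findOneAndUpdate(
      { _id: messageId, sender_id: { $ne: user.userId }, is_read: false },
      { is_read: true, read_at: new Date() }
    );
    if (!message) return;

    // Notify only the other sockets in this conversation, not every connected client
    socket.to(`conv:${message.conversation_id}`).emit('message_read', { messageId });
  } catch (err) {
    console.error('Socket mark_as_read error:', err);
  }
});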

// Client-side
useEffect(() => {
  const observer = new IntersectionObserver(
    (entries) => {
      entries.forEach((entry) => {
        if (entry.isIntersecting) {
          const messageId = entry.target.getAttribute('data-message-id');
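          if (messageId) {
            socket.emit('mark_as_read', messageId);
            // Stop observing so this observer does not report the same message again
            observer.unobserve(entry.target);
          }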
        }
      });
    },
    { threshold: 1.0 }
  );

  // Observe message elements
  messageRefs.current.forEach((ref) => {
    if (ref) observer.observe(ref);
  });

  return () => observer.disconnect();
}, [messages]);
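
When a `message_read` event arrives, the client still has to update its local message list. The sketch below shows an immutable update keyed on the `messageId` from the event payload; `ReadableMessage` and `applyReadReceipt` are hypothetical names, and using the local clock for `read_at` is an assumption, since the event shown here does not carry a timestamp.

```typescript
// Minimal client-side shape for read tracking (IDs assumed to be strings).
interface ReadableMessage {
  _id: string;
  is_read: boolean;
  read_at?: Date;
}

// Hypothetical helper: returns a new array in which the matching unread
// message is marked as read. Other messages are reused unchanged.
function applyReadReceipt(
  messages: readonly ReadableMessage[],
  messageId: string,
  readAt: Date = new Date()
): ReadableMessage[] {
  return messages.map((m) =>
    m._id === messageId && !m.is_read ? { ...m, is_read: true, read_at: readAt } : m
  );
}
```

A listener could then call `setMessages((prev) => applyReadReceipt(prev, messageId))` inside `socket.on('message_read', ...)`.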

Disconnection Handling

socket.on("disconnect", () => {
  console.log(`🔌 Socket disconnected: userId=${user.userId}`);
});

Client-Side Reconnection

socket.on('disconnect', () => {
  console.log('Disconnected from server');
  setConnectionStatus('disconnected');
});

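// In Socket.io v3 and later, "reconnect" is emitted by the underlying Manager (socket.io), not by the socket itself
socket.io.on('reconnect', (attemptNumber) => {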
  console.log(`Reconnected after ${attemptNumber} attempts`);
  setConnectionStatus('connected');
  
  // Rejoin conversation
  if (conversationId) {
    socket.emit('join_conversation', conversationId);
  }
});

Best Practices

Authentication Required

Always verify JWT tokens before allowing socket connections

Participant Validation

Check user is authorized to send messages in a conversation

Database First

Persist messages before broadcasting to ensure durability

Acknowledgments

Use acknowledgment callbacks for critical operations

Room Management

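Leave conversation rooms when a chat is closed so clients stop receiving events they no longer need; Socket.io removes a socket from all of its rooms automatically on disconnect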

Error Handling

Gracefully handle all errors with user feedback

Performance Considerations

Load messages in batches (e.g., 50 at a time) and implement infinite scroll for long conversations.
Throttle typing indicator events to reduce unnecessary broadcasts (max once per second).
The MongoDB driver used by Mongoose pools connections by default; tune the pool size (maxPoolSize) for high-traffic deployments.
For horizontal scaling, use Redis adapter to enable cross-server communication:
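import { createClient } from "redis";
import { createAdapter } from "@socket.io/redis-adapter";
const pubClient = createClient({ url: process.env.REDIS_URL });
const subClient = pubClient.duplicate();
// Both clients must be connected before use (run inside an async function or an ES module)
await Promise.all([pubClient.connect(), subClient.connect()]);
io.adapter(createAdapter(pubClient, subClient));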
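
The batching advice above can be illustrated with cursor-based paging, where the client asks for messages older than the oldest one it already has. The sketch below works on an in-memory array purely for illustration; `StoredMessage` and `pageBefore` are hypothetical names.

```typescript
// Minimal message shape for paging.
interface StoredMessage {
  _id: string;
  sent_at: Date;
  text: string;
}

// Hypothetical in-memory illustration of cursor-based paging: returns up to
// `limit` messages sent strictly before `before`, in chronological order.
// With no cursor, it returns the most recent `limit` messages.
function pageBefore(
  all: readonly StoredMessage[],
  before: Date | undefined,
  limit = 50
): StoredMessage[] {
  const older = before
    ? all.filter((m) => m.sent_at.getTime() < before.getTime())
    : [...all];
  older.sort((a, b) => b.sent_at.getTime() - a.sent_at.getTime()); // newest first
  return older.slice(0, limit).reverse(); // back to oldest first for display
}
```

Against MongoDB, the same idea would typically be a query filtered by `conversation_id` and `sent_at: { $lt: before }`, sorted by `sent_at` descending and limited to the batch size, with the oldest `sent_at` in each batch serving as the next cursor.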

Next Steps

Message History API

Load conversation history via REST API

Notification System

Implement push notifications for offline users