Overview
Horse Trust uses Socket.io to provide instant, bidirectional communication between buyers and sellers. The chat system includes conversation management, real-time message delivery, typing indicators, and read receipts.
Architecture
The real-time chat system is built on three main components:
Conversations Two-participant channels linked to horse listings
Messages Persistent chat messages with read status
Socket.io Server WebSocket server for real-time events
Data Models
Conversation Model
Defined in server/src/models/VetRecord.ts:
const conversationSchema = new Schema < IConversation >({
participants: {
type: [{ type: Schema . Types . ObjectId , ref: "User" }],
required: true ,
validate: {
validator : ( arr : unknown []) => arr . length === 2 ,
message: "A conversation requires exactly 2 participants" ,
},
},
horse_id: { type: Schema . Types . ObjectId , ref: "Horse" },
last_message: { type: lastMessageSchema },
}, {
timestamps: { createdAt: "created_at" , updatedAt: "updated_at" },
});
conversationSchema . index ({ participants: 1 });
Conversations are strictly two-participant to maintain clear buyer-seller communication.
Message Model
const messageSchema = new Schema < IMessage >({
conversation_id: { type: Schema . Types . ObjectId , ref: "Conversation" , required: true , index: true },
sender_id: { type: Schema . Types . ObjectId , ref: "User" , required: true },
text: { type: String , required: true , maxlength: 2000 },
is_read: { type: Boolean , default: false },
read_at: { type: Date },
deleted_at: { type: Date },
}, {
timestamps: { createdAt: "sent_at" , updatedAt: false },
});
Messages have a 2000 character limit to prevent abuse and ensure proper display.
Last Message Snapshot
Conversations store a snapshot of the last message for quick list rendering:
const lastMessageSchema = new Schema ({
text: { type: String },
sender_id: { type: Schema . Types . ObjectId , ref: "User" },
sent_at: { type: Date },
is_read: { type: Boolean , default: false },
}, { _id: false });
Socket.io Server Setup
The Socket.io server is configured in server/src/index.ts:
import { Server as SocketServer , Socket } from "socket.io" ;
const httpServer = http . createServer ( app );
const io = new SocketServer ( httpServer , {
cors: {
origin: ( process . env . CORS_ORIGINS || "http://localhost:5173" ). split ( "," ),
credentials: true ,
},
});
CORS Configuration
Comma-separated list of allowed origins from CORS_ORIGINS environment variable
Allow credentials (cookies, authorization headers) in cross-origin requests
JWT Authentication Middleware
All Socket.io connections must be authenticated:
io . use (( socket : Socket , next ) => {
const token = socket . handshake . auth ?. token as string | undefined ;
if ( ! token ) {
return next ( new Error ( "Authentication error: no token" ));
}
try {
const decoded = jwt . verify ( token , process . env . JWT_SECRET as string ) as JwtPayload ;
( socket as Socket & { user : JwtPayload }). user = decoded ;
next ();
} catch {
next ( new Error ( "Authentication error: invalid token" ));
}
});
Extract Token
Client passes JWT token in socket.handshake.auth.token
Verify Signature
Token is verified against JWT_SECRET environment variable
Attach User Data
Decoded user data is attached to socket instance for use in handlers
Reject Unauthorized
Invalid or missing tokens result in connection rejection
Connection Handling
User Rooms
Each authenticated user automatically joins a personal room:
io . on ( "connection" , ( socket : Socket ) => {
const user = ( socket as Socket & { user : JwtPayload }). user ;
console . log ( `🔌 Socket connected: userId= ${ user . userId } ` );
// Join a room per user (so we can target recipients)
socket . join ( `user: ${ user . userId } ` );
// Event handlers...
});
User rooms enable targeted notifications without iterating through all connected sockets.
Event Handlers
Join Conversation
Users join conversation-specific rooms to receive messages:
socket . on ( "join_conversation" , ( conversationId : string ) => {
socket . join ( `conv: ${ conversationId } ` );
console . log ( ` User ${ user . userId } joined conv: ${ conversationId } ` );
});
Send Message
The core messaging logic with full error handling:
socket . on (
"send_message" ,
async ( data : { conversation_id : string ; text : string }, ack ?: ( res : unknown ) => void ) => {
try {
// 1. Verify user is participant
const conversation = await Conversation . findOne ({
_id: data . conversation_id ,
participants: user . userId ,
});
if ( ! conversation ) {
if ( ack ) ack ({ success: false , message: "Conversation not found" });
return ;
}
// 2. Persist to database
const message = await Message . create ({
conversation_id: data . conversation_id ,
sender_id: user . userId ,
text: data . text ,
is_read: false ,
});
// 3. Update last_message snapshot
await Conversation . findByIdAndUpdate ( data . conversation_id , {
last_message: {
text: data . text ,
sender_id: user . userId ,
sent_at: message . sent_at ,
is_read: false ,
},
updated_at: new Date (),
});
// 4. Populate sender info
const populated = await message . populate ( "sender_id" , "full_name profile_picture_url" );
// 5. Emit to all in the conversation room
io . to ( `conv: ${ data . conversation_id } ` ). emit ( "new_message" , populated );
// 6. Notify recipient's personal room
const recipientId = conversation . participants . find (
( p ) => p . toString () !== user . userId
);
if ( recipientId ) {
io . to ( `user: ${ recipientId } ` ). emit ( "message_notification" , {
conversation_id: data . conversation_id ,
sender: user . userId ,
preview: data . text . substring ( 0 , 60 ),
});
}
if ( ack ) ack ({ success: true , data: populated });
} catch ( err ) {
console . error ( "Socket send_message error:" , err );
if ( ack ) ack ({ success: false , message: "Error sending message" });
}
}
);
Verify the user is actually a participant in the conversation before allowing message send.
Messages are immediately persisted to MongoDB before being broadcast.
The conversation’s last_message field is updated for efficient list rendering.
Messages are sent to both the conversation room (for open chats) and recipient’s personal room (for notifications).
The callback function confirms successful delivery to the sender.
Typing Indicators
Real-time typing indicators enhance the chat experience:
socket . on ( "typing" , ( conversationId : string ) => {
socket . to ( `conv: ${ conversationId } ` ). emit ( "user_typing" , { userId: user . userId });
});
socket . on ( "stop_typing" , ( conversationId : string ) => {
socket . to ( `conv: ${ conversationId } ` ). emit ( "user_stop_typing" , { userId: user . userId });
});
Note the use of socket.to() instead of io.to() - this broadcasts to all sockets in the room except the sender.
Client-Side Integration
Connecting to Socket.io
import { io } from "socket.io-client" ;
const token = localStorage . getItem ( 'horse_trust_token' );
const socket = io ( process . env . NEXT_PUBLIC_API_URL , {
auth: { token },
transports: [ 'websocket' , 'polling' ],
});
socket . on ( 'connect' , () => {
console . log ( 'Connected to chat server' );
});
socket . on ( 'connect_error' , ( error ) => {
console . error ( 'Connection failed:' , error . message );
});
Joining a Conversation
useEffect (() => {
if ( conversationId && socket ) {
socket . emit ( 'join_conversation' , conversationId );
return () => {
socket . emit ( 'leave_conversation' , conversationId );
};
}
}, [ conversationId , socket ]);
Sending Messages
const sendMessage = ( text : string ) => {
socket . emit (
'send_message' ,
{
conversation_id: conversationId ,
text: text ,
},
( response ) => {
if ( response . success ) {
console . log ( 'Message sent:' , response . data );
} else {
console . error ( 'Failed to send:' , response . message );
}
}
);
};
Receiving Messages
useEffect (() => {
socket . on ( 'new_message' , ( message ) => {
setMessages (( prev ) => [ ... prev , message ]);
// Scroll to bottom
chatContainerRef . current ?. scrollTo ({
top: chatContainerRef . current . scrollHeight ,
behavior: 'smooth'
});
});
socket . on ( 'message_notification' , ( notification ) => {
// Show toast notification
toast . info ( `New message from ${ notification . sender } ` );
// Update unread count
setUnreadCount (( prev ) => prev + 1 );
});
return () => {
socket . off ( 'new_message' );
socket . off ( 'message_notification' );
};
}, [ socket ]);
Typing Indicators
const [ isTyping , setIsTyping ] = useState ( false );
const typingTimeoutRef = useRef < NodeJS . Timeout >();
const handleInputChange = ( e : React . ChangeEvent < HTMLInputElement >) => {
setText ( e . target . value );
// Emit typing event
socket . emit ( 'typing' , conversationId );
// Clear existing timeout
if ( typingTimeoutRef . current ) {
clearTimeout ( typingTimeoutRef . current );
}
// Stop typing after 2 seconds of inactivity
typingTimeoutRef . current = setTimeout (() => {
socket . emit ( 'stop_typing' , conversationId );
}, 2000 );
};
useEffect (() => {
socket . on ( 'user_typing' , ({ userId }) => {
if ( userId !== currentUserId ) {
setIsTyping ( true );
}
});
socket . on ( 'user_stop_typing' , ({ userId }) => {
if ( userId !== currentUserId ) {
setIsTyping ( false );
}
});
return () => {
socket . off ( 'user_typing' );
socket . off ( 'user_stop_typing' );
};
}, [ socket ]);
Read Receipts
Implement read receipts by emitting a read event when messages are viewed:
// Server-side
socket . on ( 'mark_as_read' , async ( messageId : string ) => {
await Message . findByIdAndUpdate ( messageId , {
is_read: true ,
read_at: new Date (),
});
socket . broadcast . emit ( 'message_read' , { messageId });
});
// Client-side
useEffect (() => {
const observer = new IntersectionObserver (
( entries ) => {
entries . forEach (( entry ) => {
if ( entry . isIntersecting ) {
const messageId = entry . target . getAttribute ( 'data-message-id' );
if ( messageId ) {
socket . emit ( 'mark_as_read' , messageId );
}
}
});
},
{ threshold: 1.0 }
);
// Observe message elements
messageRefs . current . forEach (( ref ) => {
if ( ref ) observer . observe ( ref );
});
return () => observer . disconnect ();
}, [ messages ]);
Disconnection Handling
socket . on ( "disconnect" , () => {
console . log ( `🔌 Socket disconnected: userId= ${ user . userId } ` );
});
Client-Side Reconnection
socket . on ( 'disconnect' , () => {
console . log ( 'Disconnected from server' );
setConnectionStatus ( 'disconnected' );
});
socket . on ( 'reconnect' , ( attemptNumber ) => {
console . log ( `Reconnected after ${ attemptNumber } attempts` );
setConnectionStatus ( 'connected' );
// Rejoin conversation
if ( conversationId ) {
socket . emit ( 'join_conversation' , conversationId );
}
});
Best Practices
Authentication Required Always verify JWT tokens before allowing socket connections
Participant Validation Check user is authorized to send messages in a conversation
Database First Persist messages before broadcasting to ensure durability
Acknowledgments Use acknowledgment callbacks for critical operations
Room Management Properly join/leave rooms to prevent memory leaks
Error Handling Gracefully handle all errors with user feedback
Message History Pagination
Throttle typing indicator events to reduce unnecessary broadcasts (max once per second).
Use connection pooling for database queries in high-traffic scenarios.
For horizontal scaling, use Redis adapter to enable cross-server communication: import { createAdapter } from "@socket.io/redis-adapter" ;
const pubClient = createClient ({ url: process . env . REDIS_URL });
const subClient = pubClient . duplicate ();
io . adapter ( createAdapter ( pubClient , subClient ));
Next Steps
Message History API Load conversation history via REST API
Notification System Implement push notifications for offline users