Overview
The Rodando Driver app uses Socket.io for real-time bidirectional communication with the backend. WebSocket integration enables instant trip offers, live status updates, and real-time coordination between drivers and passengers.
Socket.io provides automatic reconnection, fallback to HTTP long-polling, and a robust event-based API for real-time features.
Architecture
Backend emits event
Server broadcasts trip events to connected drivers.
DriverWsService receives
Socket.io client receives and routes events to Subjects.
Facade handles business logic
TripFacade subscribes to event streams and orchestrates responses.
Store updates state
State changes trigger UI updates via Angular Signals.
DriverWsService
The core WebSocket service managing connection and events:
import { Injectable, inject } from '@angular/core';
import { io, Socket } from 'socket.io-client';
import { BehaviorSubject, Subject } from 'rxjs';
import { environment } from 'src/environments/environment';
import { AuthStore } from "../../../store/auth/auth.store";
type OfferPayload = {
assignmentId: string;
tripId: string;
ttlSec: number;
expiresAt: string; // ISO timestamp
};
type DriverAssignedPayload = {
tripId: string;
driverId: string;
vehicleId: string;
at: string;
currentStatus: 'accepted';
passenger?: {
id: string;
name: string | null;
photoUrl: string | null;
phoneMasked: string | null;
} | null;
};
type TripStartedPayload = {
tripId: string;
at: string;
currentStatus: 'in_progress';
};
type TripCompletedPayload = {
tripId: string;
at: string;
currentStatus: 'completed';
fareTotal?: number | null;
currency?: string | null;
};
type TripCancelledPayload = {
tripId: string;
at: string;
currentStatus: 'cancelled';
reason?: string | null;
};
@Injectable({ providedIn: 'root' })
export class DriverWsService {
private auth = inject(AuthStore);
private socket?: Socket;
// Connection state
private connected$ = new BehaviorSubject<boolean>(false);
readonly onConnected$ = this.connected$.asObservable();
// Event streams
readonly onOffer$ = new Subject<OfferPayload>();
readonly onDriverAssigned$ = new Subject<DriverAssignedPayload>();
readonly onArrivingStarted$ = new Subject<ArrivingStartedPayload>();
readonly onTripStarted$ = new Subject<TripStartedPayload>();
readonly onTripCompleted$ = new Subject<TripCompletedPayload>();
readonly onTripCancelled$ = new Subject<TripCancelledPayload>();
connect(): void {
if (this.socket?.connected) return;
this.socket = io(`${environment.wsBase}/drivers`, {
transports: ['websocket'],
auth: { token: this.auth.accessToken?.() || '' },
reconnection: true,
reconnectionAttempts: Infinity,
reconnectionDelayMax: 5000,
});
// Refresh token on reconnect attempts
this.socket.io.on('reconnect_attempt', () => {
this.socket!.auth = { token: this.auth.accessToken?.() || '' };
});
// Debug: log all events
this.socket.onAny((evt, ...args) =>
console.log('[WS] >>>', evt, ...args)
);
// Connection lifecycle
this.socket.on('connect', () => this.connected$.next(true));
this.socket.on('disconnect', () => this.connected$.next(false));
this.socket.on('connect_error', (e: any) =>
console.warn('[WS] err', e?.message)
);
// Trip events
this.socket.on('trip:assignment:offered', (p: OfferPayload) => {
console.log('[WS] trip:assignment:offered', p);
this.onOffer$.next(p);
});
this.socket.on('trip:driver_assigned', (p: DriverAssignedPayload) => {
console.log('[WS] trip:driver_assigned', p);
this.onDriverAssigned$.next(p);
});
this.socket.on('trip:arriving_started', (p: ArrivingStartedPayload) => {
console.log('[WS] trip:arriving_started', p);
this.onArrivingStarted$.next(p);
});
this.socket.on('trip:started', (p: TripStartedPayload) => {
console.log('[WS] trip:started', p);
this.onTripStarted$.next(p);
});
this.socket.on('trip:completed', (p: TripCompletedPayload) => {
console.log('[WS] trip:completed', p);
this.onTripCompleted$.next(p);
});
this.socket.on('trip:cancelled', (p: TripCancelledPayload) => {
console.log('[WS] trip:cancelled', p);
this.onTripCancelled$.next(p);
});
}
disconnect(): void {
try {
this.socket?.removeAllListeners();
this.socket?.disconnect();
console.log('[WS] disconnect() called');
} catch {}
this.connected$.next(false);
}
}
Authentication : The WebSocket connection uses JWT authentication. The token is refreshed automatically on reconnection attempts.
Event Types
The service handles six core event types:
1. Trip Assignment Offered
Event : trip:assignment:offered
When : Backend matches driver to a new trip request
Payload :
{
assignmentId : "asgn_xyz123" ,
tripId : "trip_abc456" ,
ttlSec : 20 , // Seconds to accept
expiresAt : "2024-03-09T15:30:00Z"
}
Handler :
this.ws.onOffer$.subscribe(async (offer) => {
if (this.store.state().phase !== 'idle') {
console.log('[TripFacade] ignoring offer, driver busy');
return;
}
this.store.setActiveTripId(offer.tripId);
this.store.setOfferAssignmentId(offer.assignmentId);
this.store.setPhase('assigned');
await this.refreshTrip();
this.startCountdown({
expiresAtIso: offer.expiresAt,
ttlSec: offer.ttlSec,
});
await this.modalSvc.open(); // Show offer modal
});
2. Driver Assigned
Event : trip:driver_assigned
When : Driver accepts the trip
Payload :
{
tripId : "trip_abc456" ,
driverId : "drv_789" ,
vehicleId : "veh_321" ,
at : "2024-03-09T15:30:10Z" ,
currentStatus : "accepted" ,
passenger : {
id : "pax_111" ,
name : "Juan Pérez" ,
photoUrl : "https://..." ,
phoneMasked : "+53 5*** **12"
}
}
Handler :
this.ws.onDriverAssigned$.subscribe(async (p) => {
console.log('[TripFacade] onDriverAssigned$', p);
this.store.setActiveTripId(p.tripId);
this.store.setPhase('assigned');
if (p.passenger) {
this.store.setPassengerSlim({
id: p.passenger.id,
name: p.passenger.name,
phoneMasked: p.passenger.phoneMasked,
photoUrl: p.passenger.photoUrl,
});
}
await this.refreshTrip();
});
3. Arriving Started
Event : trip:arriving_started
When : Driver confirms arrival at pickup location
Payload :
{
snapshot : {
tripId : "trip_abc456" ,
passengerId : "pax_111"
},
at : "2024-03-09T15:35:00Z"
}
Handler :
this.ws.onArrivingStarted$.subscribe(async (ev) => {
console.log('[TripFacade] onArrivingStarted$', ev);
const active = this.store.state().activeTripId;
if (!active || ev.snapshot.tripId !== active) return;
this.store.setPhase('arriving');
await this.modalSvc.close('arriving');
await this.refreshTrip();
});
4. Trip Started
Event : trip:started
When : Driver starts the trip (passenger onboard)
Payload :
{
tripId : "trip_abc456" ,
at : "2024-03-09T15:40:00Z" ,
currentStatus : "in_progress"
}
Handler :
this.ws.onTripStarted$.subscribe(async (p) => {
console.log('[TripFacade] onTripStarted$', p);
const active = this.store.state().activeTripId;
if (!active || p.tripId !== active) return;
this.store.setPhase('in_progress');
await this.refreshTrip();
});
5. Trip Completed
Event : trip:completed
When : Driver completes the trip
Payload :
{
tripId : "trip_abc456" ,
at : "2024-03-09T16:00:00Z" ,
currentStatus : "completed" ,
fareTotal : 150.50 ,
currency : "CUP"
}
Handler :
this.ws.onTripCompleted$.subscribe(async (p) => {
console.log('[TripFacade] onTripCompleted$', p);
const active = this.store.state().activeTripId;
if (!active || p.tripId !== active) return;
this.store.setPhase('completed');
await this.refreshTrip();
// Driver can view trip summary
});
6. Trip Cancelled
Event : trip:cancelled
When : Trip is cancelled (by driver or passenger)
Payload :
{
tripId : "trip_abc456" ,
at : "2024-03-09T15:38:00Z" ,
currentStatus : "cancelled" ,
reason : "Passenger cancelled"
}
Handler :
this.ws.onTripCancelled$.subscribe(async (p) => {
console.log('[TripFacade] onTripCancelled$', p);
const active = this.store.state().activeTripId;
if (!active || p.tripId !== active) return;
this.store.setPhase('cancelled');
await this.refreshTrip();
});
Connection Lifecycle
Initialization
Connect to WebSocket when user logs in:
login(payload: LoginPayload): Observable<User> {
return this.authService.login(payload).pipe(
tap(async (user) => {
this.authStore.setUser(user);
// Connect to WebSocket after successful login
this.wsService.connect();
await this.router.navigate(['/home']);
})
);
}
Reconnection
Socket.io automatically reconnects with exponential backoff:
this . socket = io ( url , {
reconnection: true ,
reconnectionAttempts: Infinity ,
reconnectionDelayMax: 5000 , // Max 5 seconds between attempts
});
// Refresh auth token on reconnect
this . socket . io . on ( 'reconnect_attempt' , () => {
this . socket ! . auth = { token: this . auth . accessToken ?.() || '' };
});
Token Refresh : The connection automatically updates the JWT token before each reconnection attempt, ensuring uninterrupted service even if the token expires.
Cleanup
Disconnect when user logs out:
async clearAll(): Promise<void> {
// Disconnect WebSocket
this.wsService.disconnect();
// Clear auth state
this.authStore.clear();
await this.secureStorage.remove('refreshToken');
await this.router.navigateByUrl('/auth/login');
}
Integration with State Management
WebSocket events flow into the state management layer:
Example: Full Event Flow
constructor() {
// Subscribe to WebSocket event
this.ws.onOffer$.subscribe(async (offer) => {
// 1. Check if driver is available
const s = this.store.state();
if (s.phase !== 'idle') {
console.log('[TripFacade] ignoring offer, driver busy');
return;
}
// 2. Update state
this.store.setActiveTripId(offer.tripId);
this.store.setOfferAssignmentId(offer.assignmentId);
this.store.setPhase('assigned');
// 3. Fetch full trip details from API
await this.refreshTrip();
// 4. Start countdown timer
this.startCountdown({
expiresAtIso: offer.expiresAt,
ttlSec: offer.ttlSec,
});
// 5. Show modal to driver
await this.modalSvc.open();
});
}
private async refreshTrip(): Promise<void> {
const id = this.store.state().activeTripId;
if (!id) return;
this.store.setLoading();
const trip = await this.tripsApi.getById(id)
.pipe(take(1), catchError(() => of(null)))
.toPromise();
if (trip) {
this.store.setTrip(trip);
const baseFare = trip.fareEstimatedTotal ?? trip.fareFinalTotal;
this.store.setInitialFare(baseFare, trip.fareFinalCurrency);
} else {
this.store.setError('No se pudo cargar el viaje');
}
}
Error Handling
Connection Errors
this . socket . on ( 'connect_error' , ( e : any ) => {
console . warn ( '[WS] Connection error:' , e ?. message );
// Common causes:
// - Invalid JWT token
// - Network issues
// - Backend unavailable
// Socket.io will auto-retry with exponential backoff
});
Event Errors
Wrap event handlers in try-catch:
this . ws . onOffer$ . subscribe ( async ( offer ) => {
try {
await this . handleOffer ( offer );
} catch ( err ) {
console . error ( '[TripFacade] Error handling offer:' , err );
this . store . setError ( 'No se pudo procesar la oferta' );
}
});
Debugging
Enable debug logging
The service logs all WebSocket events:
// Debug: log all incoming events
this . socket . onAny (( evt , ... args ) => {
console . log ( '[WS] >>>' , evt , ... args );
});
Example output :
[WS] >>> connect
[WS] >>> trip:assignment:offered { assignmentId: 'asgn_123', tripId: 'trip_456', ... }
[WS] >>> trip:driver_assigned { tripId: 'trip_456', driverId: 'drv_789', ... }
Connection status
Subscribe to connection state:
export class StatusComponent implements OnInit {
private ws = inject ( DriverWsService );
ngOnInit () {
this . ws . onConnected$ . subscribe ( connected => {
console . log ( 'WebSocket connected:' , connected );
// Update UI indicator
});
}
}
Event Filtering
Always validate events before processing:
this . ws . onTripStarted$ . subscribe ( async ( p ) => {
const active = this . store . state (). activeTripId ;
// Ignore events for other trips
if ( ! active || p . tripId !== active ) {
console . log ( '[TripFacade] Ignoring event for different trip' );
return ;
}
// Process event
this . store . setPhase ( 'in_progress' );
});
Memory Leaks
Unsubscribe when component/service destroys:
export class TripComponent implements OnDestroy {
private destroy$ = new Subject < void >();
ngOnInit () {
this . ws . onOffer$
. pipe ( takeUntil ( this . destroy$ ))
. subscribe ( offer => this . handleOffer ( offer ));
}
ngOnDestroy () {
this . destroy$ . next ();
this . destroy$ . complete ();
}
}
Testing
Mock WebSocket Service
import { Subject } from 'rxjs' ;
class MockDriverWsService {
onOffer$ = new Subject < OfferPayload >();
onDriverAssigned$ = new Subject < DriverAssignedPayload >();
onTripStarted$ = new Subject < TripStartedPayload >();
connect = jasmine . createSpy ( 'connect' );
disconnect = jasmine . createSpy ( 'disconnect' );
}
describe ( 'TripFacade' , () => {
let facade : TripFacade ;
let mockWs : MockDriverWsService ;
beforeEach (() => {
TestBed . configureTestingModule ({
providers: [
TripFacade ,
{ provide: DriverWsService , useClass: MockDriverWsService },
],
});
facade = TestBed . inject ( TripFacade );
mockWs = TestBed . inject ( DriverWsService ) as any ;
});
it ( 'should handle trip offer' , () => {
const offer : OfferPayload = {
assignmentId: 'asgn_123' ,
tripId: 'trip_456' ,
ttlSec: 20 ,
expiresAt: new Date ( Date . now () + 20000 ). toISOString (),
};
mockWs . onOffer$ . next ( offer );
expect ( facade . vm (). phase ). toBe ( 'assigned' );
expect ( facade . vm (). activeTripId ). toBe ( 'trip_456' );
});
});
Best Practices
Always validate events Check tripId matches active trip before processing.
Handle reconnections gracefully Refresh tokens and resync state after reconnect.
Log all events in dev Use onAny() for comprehensive debugging.
Clean up subscriptions Use takeUntil() to prevent memory leaks.
Security
JWT Authentication
Every WebSocket connection is authenticated:
this . socket = io ( url , {
auth: { token: this . auth . accessToken ?.() || '' },
});
Token Refresh
Automatically update token on reconnect:
this . socket . io . on ( 'reconnect_attempt' , () => {
// Get fresh token from store
this . socket ! . auth = { token: this . auth . accessToken ?.() || '' };
});
Important : Never send sensitive data (passwords, payment info) over WebSocket. Use HTTPS API endpoints for sensitive operations.
State Management How WebSocket events update application state.
Trip Flow Complete trip lifecycle from offer to completion.
Architecture Overview Learn about the overall app architecture.
API Reference Complete WebSocket event reference.