Watch Together enables synchronized playback across multiple users, allowing friends to watch content together in real-time regardless of location. Built on WebSocket-based synchronization and MPV integration.
Overview
Watch Together uses a relay server architecture with MPV’s JSON-IPC protocol for precise playback synchronization:
WebSocket Relay: A central server coordinates rooms and relays sync commands
MPV Integration: Bidirectional IPC via named pipes for real-time control
Syncplay Algorithm: Intelligent drift correction with speed adjustments
Room Codes: 6-character codes for easy session joining
Creating a Room
The host creates a room and receives a shareable room code.
Frontend
src/components/WatchTogether/WatchTogetherModal.tsx
// Creates a Watch Together room for the currently selected media and logs
// the shareable room code returned by the backend.
const handleCreateRoom = async () => {
  const { id, title } = selectedMedia;
  const matchKey = buildMediaMatchKey(selectedMedia);
  const hostNickname = nickname.trim();

  const createdRoom = await wtCreateRoom(id, title, matchKey, hostNickname);
  console.log('Room created:', createdRoom.code);
};
/**
 * Builds a media match key for cross-file compatibility.
 *
 * The key is a `|`-joined list of lower-cased `cloud:<id>`, `file:<name>` and
 * `tmdb:<id>` tokens, one for each identifier present on the media item.
 *
 * @param media - The media item to derive the key from.
 * @returns The match key, or `undefined` when the item carries no usable identifier.
 */
function buildMediaMatchKey(media: MediaItem): string | undefined {
  const tokens: string[] = [];

  if (media.cloud_file_id?.trim()) {
    tokens.push(`cloud:${media.cloud_file_id.trim().toLowerCase()}`);
  }

  if (media.file_path?.trim()) {
    // Treat Windows and POSIX separators alike so only the file name is compared.
    const normalizedPath = media.file_path.trim().replace(/\\/g, '/');
    const fileName = normalizedPath.split('/').pop()?.trim();
    if (fileName) {
      tokens.push(`file:${fileName.toLowerCase()}`);
    }
  }

  if (media.tmdb_id?.trim()) {
    tokens.push(`tmdb:${media.tmdb_id.trim().toLowerCase()}`);
  }

  // An empty key would be indistinguishable from "no key"; honour the declared return type.
  return tokens.length > 0 ? tokens.join('|') : undefined;
}
Backend
src-tauri/src/watch_together.rs
/// Creates a new Watch Together room on the relay server with this client as host.
///
/// Connects to the relay WebSocket, sends a `ClientMessage::Create` and waits
/// for the server's room-created response.
///
/// # Errors
///
/// Returns a descriptive `String` when the relay connection cannot be
/// established, the create message cannot be serialized or sent, or waiting
/// for the room-created response fails.
pub async fn create_room(
    &self,
    media_id: i64,
    media_title: String,
    media_match_key: Option<String>,
    nickname: String,
) -> Result<RoomInfo, String> {
    let session = WatchSession::new(media_id, true);
    let client_id = session.client_id.clone();

    // Connect to WebSocket relay server
    let relay_url = get_relay_server_url();
    let (ws_stream, _) = connect_async(relay_url)
        .await
        .map_err(|e| format!("Failed to connect to relay server: {}", e))?;
    let (mut write, mut read) = ws_stream.split();

    // Send create message
    let create_msg = ClientMessage::Create {
        media_title,
        media_id,
        media_match_key,
        nickname,
        client_id,
    };
    // Library errors have no `From` conversion into `String`, so map them explicitly.
    let payload = serde_json::to_string(&create_msg)
        .map_err(|e| format!("Failed to serialize create message: {}", e))?;
    write
        .send(Message::Text(payload))
        .await
        .map_err(|e| format!("Failed to send create message: {}", e))?;

    // Wait for room_created response
    let room = wait_for_room_created(&mut read).await?;
    Ok(room)
}
Room codes use only non-ambiguous characters (no I/1/O/0) for easier sharing.
Joining a Room
Participants join using the 6-character room code.
// Joins an existing room by its code; if the room is already playing, MPV is
// launched right away at the room's current position.
const handleJoinRoom = async () => {
  const code = roomCode.trim().toUpperCase();
  const { id, title } = selectedMedia;
  const matchKey = buildMediaMatchKey(selectedMedia);
  const guestNickname = nickname.trim();

  const room = await wtJoinRoom(code, id, title, matchKey, guestNickname);

  // Nothing to launch while the room is still waiting in the lobby.
  if (!room.is_playing) {
    return;
  }
  await launchMpv(room.current_position || 0);
};
pub async fn join_room (
& self ,
room_code : String ,
media_id : i64 ,
media_title : Option < String >,
media_match_key : Option < String >,
nickname : String ,
) -> Result < RoomInfo , String > {
let mut session = WatchSession :: new ( media_id , false );
let join_msg = ClientMessage :: Join {
room_code : room_code . to_uppercase (),
nickname ,
client_id : session . client_id . clone (),
media_id ,
media_title ,
media_match_key ,
};
// Send join and wait for confirmation
let room = send_and_wait_for_room ( & mut write , & mut read , join_msg ) . await ? ;
Ok ( room )
}
Room Lobby
Before playback starts, participants wait in the lobby.
Participant States
src/components/WatchTogether/RoomLobby.tsx
// Reports this participant as ready, sending the known media duration
// (0 when the duration is not available), then flips the local ready flag.
const handleSetReady = async () => {
  await wtSetReady(mediaDuration ?? 0);
  setIsReady(true);
};
// Host-only action: starts playback for the room, launches the local MPV
// instance at the beginning of the media, then notifies the parent component.
const handleStartPlayback = async () => {
  const startPosition = 0;

  await wtStartPlayback();
  await onLaunchMpv(startPosition);
  onPlaybackStart();
};
Invite Friends
Integrated with the social system for direct invitations:
// Sends a chat message to the given friend containing the room's media title
// and join code.
const handleInviteFriend = async (friend: Friend) => {
  // No padding inside the quotes or after the code: the text is shown to the friend verbatim.
  const inviteText = `Join my Watch Together room for "${room.media_title}". Room code: ${room.code}`;
  await sendChatMessage(friend.id, inviteText);
};
WebSocket Synchronization
Message Types
Client Messages
Server Messages
/// Messages a client sends to the relay server.
///
/// The `...` inside `Create` and `Join` marks fields omitted from this excerpt.
pub enum ClientMessage {
// Sent by the host from `create_room` to open a new room.
Create { media_title : String , media_id : i64 , nickname : String , ... },
// Sent by a participant from `join_room`, identifying the room by its code.
Join { room_code : String , nickname : String , ... },
// Carries a media duration; presumably the message behind the lobby's "ready" action (TODO confirm).
Ready { duration : f64 },
Start ,
// Wraps a sync command to be relayed (see `SyncCommand`).
Sync { command : SyncCommand },
// Reports a playback position and pause flag.
StateReport { position : f64 , paused : bool },
Leave ,
}
Sync Commands
/// Playback actions that can be synchronized; every variant carries the
/// playback `position` it applies at (presumably in seconds, matching the
/// drift thresholds; TODO confirm).
pub enum SyncAction {
Play { position : f64 },
Pause { position : f64 },
Seek { position : f64 },
}
MPV Playback Sync
Watch Together uses MPV’s JSON-IPC protocol for precise control.
IPC Connection
src-tauri/src/watch_together_mpv.rs
/// Connects to MPV's JSON-IPC named pipe for this session and subscribes to
/// the `time-pos`, `pause` and `duration` properties.
///
/// This excerpt elides the remaining `CreateFileW` arguments (`...`) and the
/// reader/writer tasks that consume the channels created here.
///
/// # Errors
///
/// Returns the error of the first `observe_property` call that fails.
pub async fn connect(&mut self) -> Result<(), String> {
    // Must match the path MPV is launched with via --input-ipc-server
    // (\\.\pipe\mpv-wt-{session}); any extra whitespace would name a different pipe.
    let pipe_name = format!("\\\\.\\pipe\\mpv-wt-{}", self.session_id);

    // Wait for MPV to create the named pipe
    let handle = CreateFileW(pipe_name, GENERIC_READ | GENERIC_WRITE, ...);
    let pipe_file = File::from_raw_handle(handle);

    // Spawn reader/writer tasks
    let (cmd_tx, mut cmd_rx) = mpsc::channel::<String>(64);
    let (event_tx, event_rx) = mpsc::channel::<MpvSyncEvent>(64);

    // Observe properties for real-time updates
    self.observe_property(1, "time-pos").await?;
    self.observe_property(2, "pause").await?;
    self.observe_property(3, "duration").await?;
    Ok(())
}
Launch MPV
/// Launches MPV for a Watch Together session and returns its process id
/// together with the controller bound to the session's IPC pipe.
///
/// MPV is started paused at `start_position` so playback only begins once
/// everyone is ready.
///
/// * `mpv_path` - path of the MPV executable to spawn.
/// * `file_or_url` - media to open, passed to MPV as the last argument.
/// * `session_id`, `is_host` - forwarded to `WatchTogetherController::new`.
/// * `start_position` - start offset in seconds; negative or NaN values start at 0.
/// * `media_id`, `auth_header` - not used in this excerpt.
///
/// # Errors
///
/// Returns a descriptive `String` when the MPV process cannot be spawned.
pub fn launch_mpv_wt(
    mpv_path: &str,
    file_or_url: &str,
    media_id: i64,
    session_id: &str,
    start_position: f64,
    auth_header: Option<&str>,
    is_host: bool,
) -> Result<(u32, WatchTogetherController), String> {
    let controller = WatchTogetherController::new(session_id, is_host);

    // Keep sub-second precision instead of truncating to whole seconds, so a late
    // joiner does not start up to a second behind. `f64::max` maps NaN to 0.0.
    let start_seconds = start_position.max(0.0);

    let mut cmd = std::process::Command::new(mpv_path);
    cmd.arg(controller.get_ipc_arg()); // --input-ipc-server=\\.\pipe\mpv-wt-{session}
    cmd.arg(format!("--start={}", start_seconds));
    cmd.arg("--pause=yes"); // Start paused until everyone ready
    cmd.arg(file_or_url);

    // `io::Error` has no `From` conversion into `String`; map it explicitly.
    let child = cmd
        .spawn()
        .map_err(|e| format!("Failed to launch MPV at '{}': {}", mpv_path, e))?;
    let pid = child.id();
    Ok((pid, controller))
}
Syncplay-Style Drift Correction
StreamVault implements Syncplay’s proven synchronization algorithm.
Sync Thresholds
// Drift-correction tuning. "Drift" is the local estimated position minus the
// server position: positive means this client is ahead, negative means behind.
const SEEK_THRESHOLD : f64 = 2.0 ; // Hard seek if absolute drift exceeds 2s
const SLOWDOWN_KICKIN : f64 = 0.35 ; // Start speed correction once drift exceeds 0.35s
const SLOWDOWN_RESET : f64 = 0.1 ; // Reset to 1x speed when drift falls below 0.1s
const SLOWDOWN_RATE : f64 = 0.95 ; // Slow to 95% speed when ahead
const SPEEDUP_RATE : f64 = 1.05 ; // Speed up to 105% when behind
const REWIND_THRESHOLD : f64 = 4.0 ; // Hard seek if more than 4s ahead
const FASTFORWARD_THRESHOLD : f64 = 5.0 ; // Hard seek if more than 5s behind
const SEEK_COOLDOWN_MS : u64 = 1200 ; // Prevent seek spam (milliseconds)
Apply Sync
/// Corrects local playback drift against the server's authoritative position.
///
/// The local position is extrapolated from the last known state (position plus
/// elapsed time scaled by speed, unless paused) and compared with
/// `server_position`. Depending on the drift this hard-seeks, nudges the
/// playback speed, or resets the speed to 1x. Drift between `SLOWDOWN_RESET`
/// and `SLOWDOWN_KICKIN` leaves the current speed untouched.
///
/// # Errors
///
/// Propagates any error from `seek_to` or `set_speed`.
pub async fn apply_sync(&self, server_position: f64, server_paused: bool) -> Result<(), String> {
    // NOTE(review): `server_paused` is not consulted here; presumably pause state
    // is reconciled through explicit sync commands. Confirm.

    // Only read access is needed, and the guard must be released before awaiting
    // MPV commands so a command that updates `local_state` cannot deadlock.
    let diff = {
        let state = self.local_state.read().await;
        let elapsed = state.last_update.elapsed().as_secs_f64();
        let estimated_position = if state.paused {
            state.position
        } else {
            state.position + (elapsed * state.speed)
        };
        estimated_position - server_position
    };

    // Tiered correction strategy
    let needs_hard_seek = diff > REWIND_THRESHOLD
        || diff < -FASTFORWARD_THRESHOLD
        || diff.abs() > SEEK_THRESHOLD;

    if needs_hard_seek {
        // Large drift - jump straight to the server position at normal speed
        self.seek_to(server_position).await?;
        self.set_speed(1.0).await?;
    } else if diff > SLOWDOWN_KICKIN {
        // Slightly ahead - slow down
        self.set_speed(SLOWDOWN_RATE).await?;
    } else if diff < -SLOWDOWN_KICKIN {
        // Slightly behind - speed up
        self.set_speed(SPEEDUP_RATE).await?;
    } else if diff.abs() < SLOWDOWN_RESET {
        // Close enough - reset to normal
        self.set_speed(1.0).await?;
    }
    Ok(())
}
Echo Prevention
/// Sets MPV's `pause` property while marking the resulting property-change
/// event as self-inflicted so it is not reported back as a user action.
///
/// # Errors
///
/// Propagates the error from `send_command`; in that case the echo counter is
/// rolled back.
pub async fn set_paused(&self, paused: bool) -> Result<(), String> {
    // Increment counter to ignore the echo event
    self.ignoring_on_the_fly.fetch_add(1, Ordering::SeqCst);

    let sent = self
        .send_command(vec![
            "set_property".into(),
            "pause".into(),
            paused.into(),
        ])
        .await;

    if let Err(err) = sent {
        // The command never reached MPV, so presumably no echo will arrive; undo
        // the increment or the next genuine pause event would be swallowed.
        self.ignoring_on_the_fly.fetch_sub(1, Ordering::SeqCst);
        return Err(err.into());
    }
    Ok(())
}
Sync Status Indicator
Real-time feedback during playback.
src/components/WatchTogether/SyncStatusIndicator.tsx
/**
 * Floating badge showing how fresh the last sync from the server is.
 *
 * Status is derived from the time since `lastSyncTime`: under 5s is "Synced",
 * under 10s is "Syncing...", anything older is "Connection Lost".
 */
export function SyncStatusIndicator({
  isConnected,
  lastSyncTime
}: SyncStatusIndicatorProps) {
  // NOTE(review): `isConnected` is accepted but does not influence the status here.
  const [timeSinceSync, setTimeSinceSync] = useState(0);

  useEffect(() => {
    if (!lastSyncTime) {
      return;
    }
    const refresh = () => setTimeSinceSync(Date.now() - lastSyncTime);

    // Refresh immediately: this effect restarts on every sync, so when syncs arrive
    // more often than once a second the interval alone would never fire and a stale
    // "Connection Lost" would stick.
    refresh();
    const interval = setInterval(refresh, 1000);
    return () => clearInterval(interval);
  }, [lastSyncTime]);

  const syncStatus =
    timeSinceSync < 5000 ? 'synced' :
    timeSinceSync < 10000 ? 'syncing' : 'desynced';

  return (
    <div className="fixed bottom-4 right-4 z-50">
      <div className="flex items-center gap-2 bg-zinc-900/95 backdrop-blur-sm border border-zinc-800 rounded-lg px-3 py-2">
        <div className={cn(
          "w-2 h-2 rounded-full",
          syncStatus === 'synced' && "bg-green-500 animate-pulse",
          syncStatus === 'syncing' && "bg-yellow-500",
          syncStatus === 'desynced' && "bg-red-500"
        )} />
        <span className="text-xs text-zinc-400">
          {syncStatus === 'synced' ? 'Synced' :
           syncStatus === 'syncing' ? 'Syncing...' : 'Connection Lost'}
        </span>
      </div>
    </div>
  );
}
Participant List
src/components/WatchTogether/ParticipantList.tsx
/**
 * Renders one row per participant: an avatar with the nickname's initial, the
 * nickname with optional "Host" / "You" badges, and a ready indicator
 * (check mark when ready, spinner otherwise).
 */
export function ParticipantList({ participants, currentUserId }: ParticipantListProps) {
  return (
    <div className="space-y-2">
      {participants.map((participant) => {
        // A blank nickname would otherwise leave the avatar circle empty.
        const initial = participant.nickname.trim().charAt(0).toUpperCase() || '?';
        const isCurrentUser = participant.id === currentUserId;

        return (
          <div key={participant.id} className="flex items-center justify-between p-2 bg-zinc-800/50 rounded">
            <div className="flex items-center gap-2">
              <div className="w-8 h-8 rounded-full bg-purple-600 flex items-center justify-center">
                {initial}
              </div>
              <div>
                <p className="text-sm font-medium">
                  {participant.nickname}
                  {participant.is_host && <span className="ml-2 text-xs text-purple-400">Host</span>}
                  {isCurrentUser && <span className="ml-2 text-xs text-zinc-500">You</span>}
                </p>
              </div>
            </div>
            {participant.is_ready ? (
              <Check className="w-5 h-5 text-green-500" />
            ) : (
              <Loader2 className="w-5 h-5 text-zinc-500 animate-spin" />
            )}
          </div>
        );
      })}
    </div>
  );
}
Event Handling
Frontend Event Listener
// Subscribes to backend 'wt-event' notifications and mirrors them into UI state.
// The listener is removed on cleanup.
useEffect(() => {
  const unlisten = listen<WatchEvent>('wt-event', (event) => {
    const data = event.payload;

    switch (data.type) {
      case 'room_updated':
      case 'participant_changed':
        onSessionChange(data.room, sessionId, data.room.is_playing);
        break;

      case 'sync_command':
        // Applied in backend, just update UI
        setLastSyncTime(Date.now());
        break;

      case 'state_update':
        // Periodic authoritative position from server
        setLastSyncTime(Date.now());
        setIsConnected(true);
        break;

      case 'playback_started':
        setView('playing');
        launchMpv(data.position || 0);
        break;

      case 'disconnected':
        setIsConnected(false);
        onSessionChange(null, '', false);
        break;
    }
  });

  return () => { unlisten.then(fn => fn()); };
  // Re-subscribe when these change; with an empty array the handler would keep
  // calling the first render's `sessionId`, `onSessionChange` and `launchMpv`.
}, [launchMpv, onSessionChange, sessionId]);
Backend Event Emission
// Forward every Watch Together event to the frontend and feed authoritative
// state updates into the MPV controller.
let wt = state.watch_together.clone();
wt.set_event_callback(move |event: WatchEvent| {
    // Best-effort: a failed emit must not break sync handling.
    window.emit("wt-event", &event).ok();

    // Apply sync updates to MPV
    if let WatchEvent::StateUpdate { position, paused, .. } = &event {
        // Copy the values out: the spawned task must be 'static and cannot hold
        // references into `event`, which is dropped when this callback returns.
        let (position, paused) = (*position, *paused);
        let ctrl = wt_ctrl.clone();
        tokio::spawn(async move {
            if let Some(controller) = ctrl.lock().await.as_ref() {
                controller.apply_sync(position, paused).await.ok();
            }
        });
    }
})
.await;
Complete Tauri Commands
Create Room
Join Room
Leave Room
Set Ready
Start Playback
#[tauri :: command]
async fn wt_create_room (
state : State <' _ , AppState >,
window : Window ,
media_id : i64 ,
media_title : String ,
media_match_key : Option < String >,
nickname : String ,
) -> Result < RoomInfo , String > {
let wt = state . watch_together . clone ();
wt . create_room ( media_id , media_title , media_match_key , nickname ) . await
}
Configuration
Relay Server URL
/// Returns the WebSocket URL of the Watch Together relay server.
///
/// The `STREAMVAULT_WS_URL` environment variable takes precedence when it is
/// set to a non-blank value. Otherwise debug builds use the local development
/// server and release builds use the hosted relay.
fn get_relay_server_url() -> String {
    std::env::var("STREAMVAULT_WS_URL")
        .ok()
        // A variable that is set but blank is treated as unset; an empty URL can never connect.
        .filter(|url| !url.trim().is_empty())
        .unwrap_or_else(|| {
            if cfg!(debug_assertions) {
                "ws://localhost:3001/ws/watchtogether".to_string()
            } else {
                "wss://streamvault-backend-server.onrender.com/ws/watchtogether".to_string()
            }
        })
}
Watch Together requires an internet connection and a running relay server.
Best Practices
Network Quality: Recommend 5+ Mbps upload/download for smooth sync
Media Matching: Use media_match_key to allow different file sources (local vs. cloud)
Error Handling: Always handle disconnection gracefully with a reconnect UI
RTT Monitoring: Display latency to help users understand sync delays
Cooldowns: Implement seek cooldowns to prevent command spam
Troubleshooting
Desync Issues
// Check RTT (Round Trip Time)
ServerMessage :: StateUpdate { your_rtt , .. } => {
if your_rtt > 500.0 {
// High latency warning
}
}
Connection Loss
if ( timeSinceSync > 10000 ) {
// Show reconnect prompt
< Button onClick = { handleReconnect } > Reconnect </ Button >
}
Friends System: Manage friends and send Watch Together invites
Activity Feed: See what friends are watching