Skip to main content
Watch Together enables synchronized playback across multiple users, allowing friends to watch content together in real-time regardless of location. Built on WebSocket-based synchronization and MPV integration.

Overview

Watch Together uses a relay server architecture with MPV’s JSON-IPC protocol for precise playback synchronization:
  • WebSocket Relay: Central server coordinates rooms and sync commands
  • MPV Integration: Bidirectional IPC via named pipes for real-time control
  • Syncplay Algorithm: Intelligent drift correction with speed adjustments
  • Room Codes: 6-character codes for easy session joining

Creating a Room

The host creates a room and receives a shareable room code.

Frontend

src/components/WatchTogether/WatchTogetherModal.tsx
const handleCreateRoom = async () => {
  const newRoom = await wtCreateRoom(
    selectedMedia.id,
    selectedMedia.title,
    buildMediaMatchKey(selectedMedia),
    nickname.trim()
  );
  console.log('Room created:', newRoom.code);
};

// Media match key for cross-file compatibility
function buildMediaMatchKey(media: MediaItem): string | undefined {
  const tokens: string[] = [];
  
  if (media.cloud_file_id?.trim()) {
    tokens.push(`cloud:${media.cloud_file_id.trim().toLowerCase()}`);
  }
  
  if (media.file_path?.trim()) {
    const fileName = normalizedPath.split('/').pop()?.trim();
    if (fileName) {
      tokens.push(`file:${fileName.toLowerCase()}`);
    }
  }
  
  if (media.tmdb_id?.trim()) {
    tokens.push(`tmdb:${media.tmdb_id.trim().toLowerCase()}`);
  }
  
  return tokens.join('|');
}

Backend

src-tauri/src/watch_together.rs
pub async fn create_room(
    &self,
    media_id: i64,
    media_title: String,
    media_match_key: Option<String>,
    nickname: String,
) -> Result<RoomInfo, String> {
    let mut session = WatchSession::new(media_id, true);
    let client_id = session.client_id.clone();
    
    // Connect to WebSocket relay server
    let relay_url = get_relay_server_url();
    let (ws_stream, _) = connect_async(relay_url).await?;
    let (mut write, mut read) = ws_stream.split();
    
    // Send create message
    let create_msg = ClientMessage::Create {
        media_title,
        media_id,
        media_match_key,
        nickname,
        client_id: client_id.clone(),
    };
    
    write.send(Message::Text(serde_json::to_string(&create_msg)?)).await?;
    
    // Wait for room_created response
    let room = wait_for_room_created(&mut read).await?;
    Ok(room)
}
Room codes use only non-ambiguous characters (no I/1/O/0) for easier sharing.

Joining a Room

Participants join using the 6-character room code.
const handleJoinRoom = async () => {
  const joinedRoom = await wtJoinRoom(
    roomCode.trim().toUpperCase(),
    selectedMedia.id,
    selectedMedia.title,
    buildMediaMatchKey(selectedMedia),
    nickname.trim()
  );
  
  // Auto-launch MPV if playback already started
  if (joinedRoom.is_playing) {
    await launchMpv(joinedRoom.current_position || 0);
  }
};
pub async fn join_room(
    &self,
    room_code: String,
    media_id: i64,
    media_title: Option<String>,
    media_match_key: Option<String>,
    nickname: String,
) -> Result<RoomInfo, String> {
    let mut session = WatchSession::new(media_id, false);
    
    let join_msg = ClientMessage::Join {
        room_code: room_code.to_uppercase(),
        nickname,
        client_id: session.client_id.clone(),
        media_id,
        media_title,
        media_match_key,
    };
    
    // Send join and wait for confirmation
    let room = send_and_wait_for_room(&mut write, &mut read, join_msg).await?;
    Ok(room)
}

Room Lobby

Before playback starts, participants wait in the lobby.

Participant States

src/components/WatchTogether/RoomLobby.tsx
const handleSetReady = async () => {
  const duration = mediaDuration ?? 0;
  await wtSetReady(duration);
  setIsReady(true);
};

const handleStartPlayback = async () => {
  // Host-only action
  await wtStartPlayback();
  await onLaunchMpv(0); // Start at position 0
  onPlaybackStart();
};

Invite Friends

Integrated with the social system for direct invitations:
const handleInviteFriend = async (friend: Friend) => {
  const inviteText = `Join my Watch Together room for "${room.media_title}". Room code: ${room.code}`;
  await sendChatMessage(friend.id, inviteText);
};

WebSocket Synchronization

Message Types

pub enum ClientMessage {
    Create { media_title: String, media_id: i64, nickname: String, ... },
    Join { room_code: String, nickname: String, ... },
    Ready { duration: f64 },
    Start,
    Sync { command: SyncCommand },
    StateReport { position: f64, paused: bool },
    Leave,
}

Sync Commands

pub enum SyncAction {
    Play { position: f64 },
    Pause { position: f64 },
    Seek { position: f64 },
}

MPV Playback Sync

Watch Together uses MPV’s JSON-IPC protocol for precise control.

IPC Connection

src-tauri/src/watch_together_mpv.rs
pub async fn connect(&mut self) -> Result<(), String> {
    let pipe_name = format!("\\\\.\\pipe\\mpv-wt-{}", self.session_id);
    
    // Wait for MPV to create the named pipe
    let handle = CreateFileW(pipe_name, GENERIC_READ | GENERIC_WRITE, ...);
    let pipe_file = File::from_raw_handle(handle);
    
    // Spawn reader/writer tasks
    let (cmd_tx, mut cmd_rx) = mpsc::channel::<String>(64);
    let (event_tx, event_rx) = mpsc::channel::<MpvSyncEvent>(64);
    
    // Observe properties for real-time updates
    self.observe_property(1, "time-pos").await?;
    self.observe_property(2, "pause").await?;
    self.observe_property(3, "duration").await?;
    
    Ok(())
}

Launch MPV

pub fn launch_mpv_wt(
    mpv_path: &str,
    file_or_url: &str,
    media_id: i64,
    session_id: &str,
    start_position: f64,
    auth_header: Option<&str>,
    is_host: bool,
) -> Result<(u32, WatchTogetherController), String> {
    let controller = WatchTogetherController::new(session_id, is_host);
    
    let mut cmd = std::process::Command::new(mpv_path);
    cmd.arg(controller.get_ipc_arg()); // --input-ipc-server=\\.\pipe\mpv-wt-{session}
    cmd.arg(format!("--start={}", start_position as i64));
    cmd.arg("--pause=yes"); // Start paused until everyone ready
    cmd.arg(file_or_url);
    
    let child = cmd.spawn()?;
    let pid = child.id();
    
    Ok((pid, controller))
}

Syncplay-Style Drift Correction

StreamVault implements Syncplay’s proven synchronization algorithm.

Sync Thresholds

const SEEK_THRESHOLD: f64 = 2.0;           // Hard seek if drift > 2s
const SLOWDOWN_KICKIN: f64 = 0.35;         // Start speed correction at 0.35s
const SLOWDOWN_RESET: f64 = 0.1;           // Reset to 1x speed when < 0.1s drift
const SLOWDOWN_RATE: f64 = 0.95;           // Slow to 95% speed when ahead
const SPEEDUP_RATE: f64 = 1.05;            // Speed up to 105% when behind
const REWIND_THRESHOLD: f64 = 4.0;         // Hard seek if 4s ahead
const FASTFORWARD_THRESHOLD: f64 = 5.0;    // Hard seek if 5s behind
const SEEK_COOLDOWN_MS: u64 = 1200;        // Prevent seek spam

Apply Sync

pub async fn apply_sync(&self, server_position: f64, server_paused: bool) -> Result<(), String> {
    let state = self.local_state.write().await;
    let elapsed = state.last_update.elapsed().as_secs_f64();
    let estimated_position = if state.paused {
        state.position
    } else {
        state.position + (elapsed * state.speed)
    };
    
    let diff = estimated_position - server_position;
    
    // Tiered correction strategy
    if diff > REWIND_THRESHOLD || diff < -FASTFORWARD_THRESHOLD {
        // Very large drift - hard seek
        self.seek_to(server_position).await?;
        self.set_speed(1.0).await?;
    } else if diff.abs() > SEEK_THRESHOLD {
        // Medium drift - seek
        self.seek_to(server_position).await?;
        self.set_speed(1.0).await?;
    } else if diff > SLOWDOWN_KICKIN {
        // Slightly ahead - slow down
        self.set_speed(SLOWDOWN_RATE).await?;
    } else if diff < -SLOWDOWN_KICKIN {
        // Slightly behind - speed up
        self.set_speed(SPEEDUP_RATE).await?;
    } else if diff.abs() < SLOWDOWN_RESET {
        // Close enough - reset to normal
        self.set_speed(1.0).await?;
    }
    
    Ok(())
}

Echo Prevention

pub async fn set_paused(&self, paused: bool) -> Result<(), String> {
    // Increment counter to ignore the echo event
    self.ignoring_on_the_fly.fetch_add(1, Ordering::SeqCst);
    
    self.send_command(vec![
        "set_property".into(),
        "pause".into(),
        paused.into(),
    ]).await?;
    
    Ok(())
}

Sync Status Indicator

Real-time feedback during playback.
src/components/WatchTogether/SyncStatusIndicator.tsx
export function SyncStatusIndicator({ 
  isConnected, 
  lastSyncTime 
}: SyncStatusIndicatorProps) {
  const [timeSinceSync, setTimeSinceSync] = useState(0);
  
  useEffect(() => {
    const interval = setInterval(() => {
      if (lastSyncTime) {
        setTimeSinceSync(Date.now() - lastSyncTime);
      }
    }, 1000);
    return () => clearInterval(interval);
  }, [lastSyncTime]);
  
  const syncStatus = timeSinceSync < 5000 ? 'synced' : 
                     timeSinceSync < 10000 ? 'syncing' : 'desynced';
  
  return (
    <div className="fixed bottom-4 right-4 z-50">
      <div className="flex items-center gap-2 bg-zinc-900/95 backdrop-blur-sm border border-zinc-800 rounded-lg px-3 py-2">
        <div className={cn(
          "w-2 h-2 rounded-full",
          syncStatus === 'synced' && "bg-green-500 animate-pulse",
          syncStatus === 'syncing' && "bg-yellow-500",
          syncStatus === 'desynced' && "bg-red-500"
        )} />
        <span className="text-xs text-zinc-400">
          {syncStatus === 'synced' ? 'Synced' : 
           syncStatus === 'syncing' ? 'Syncing...' : 'Connection Lost'}
        </span>
      </div>
    </div>
  );
}

Participant List

src/components/WatchTogether/ParticipantList.tsx
export function ParticipantList({ participants, currentUserId }: ParticipantListProps) {
  return (
    <div className="space-y-2">
      {participants.map((participant) => (
        <div key={participant.id} className="flex items-center justify-between p-2 bg-zinc-800/50 rounded">
          <div className="flex items-center gap-2">
            <div className="w-8 h-8 rounded-full bg-purple-600 flex items-center justify-center">
              {participant.nickname.charAt(0).toUpperCase()}
            </div>
            <div>
              <p className="text-sm font-medium">
                {participant.nickname}
                {participant.is_host && <span className="ml-2 text-xs text-purple-400">Host</span>}
                {participant.id === currentUserId && <span className="ml-2 text-xs text-zinc-500">You</span>}
              </p>
            </div>
          </div>
          {participant.is_ready ? (
            <Check className="w-5 h-5 text-green-500" />
          ) : (
            <Loader2 className="w-5 h-5 text-zinc-500 animate-spin" />
          )}
        </div>
      ))}
    </div>
  );
}

Event Handling

Frontend Event Listener

useEffect(() => {
  const unlisten = listen<WatchEvent>('wt-event', (event) => {
    const data = event.payload;
    
    switch (data.type) {
      case 'room_updated':
      case 'participant_changed':
        onSessionChange(data.room, sessionId, data.room.is_playing);
        break;
        
      case 'sync_command':
        // Applied in backend, just update UI
        setLastSyncTime(Date.now());
        break;
        
      case 'state_update':
        // Periodic authoritative position from server
        setLastSyncTime(Date.now());
        setIsConnected(true);
        break;
        
      case 'playback_started':
        setView('playing');
        launchMpv(data.position || 0);
        break;
        
      case 'disconnected':
        setIsConnected(false);
        onSessionChange(null, '', false);
        break;
    }
  });
  
  return () => { unlisten.then(fn => fn()); };
}, []);

Backend Event Emission

src-tauri/src/main.rs
let wt = state.watch_together.clone();
wt.set_event_callback(move |event: WatchEvent| {
    window.emit("wt-event", &event).ok();
    
    // Apply sync updates to MPV
    if let WatchEvent::StateUpdate { position, paused, .. } = &event {
        let ctrl = wt_ctrl.clone();
        tokio::spawn(async move {
            if let Some(controller) = ctrl.lock().await.as_ref() {
                controller.apply_sync(*position, *paused).await.ok();
            }
        });
    }
}).await;

Complete Tauri Commands

#[tauri::command]
async fn wt_create_room(
    state: State<'_, AppState>,
    window: Window,
    media_id: i64,
    media_title: String,
    media_match_key: Option<String>,
    nickname: String,
) -> Result<RoomInfo, String> {
    let wt = state.watch_together.clone();
    
    wt.create_room(media_id, media_title, media_match_key, nickname).await
}

Configuration

Relay Server URL

fn get_relay_server_url() -> String {
    std::env::var("STREAMVAULT_WS_URL")
        .unwrap_or_else(|_| {
            if cfg!(debug_assertions) {
                "ws://localhost:3001/ws/watchtogether".to_string()
            } else {
                "wss://streamvault-backend-server.onrender.com/ws/watchtogether".to_string()
            }
        })
}
Watch Together requires an internet connection and a running relay server.

Best Practices

  1. Network Quality: Recommend 5+ Mbps upload/download for smooth sync
  2. Media Matching: Use media_match_key to allow different file sources (local vs cloud)
  3. Error Handling: Always handle disconnection gracefully with reconnect UI
  4. RTT Monitoring: Display latency to help users understand sync delays
  5. Cooldowns: Implement seek cooldowns to prevent command spam

Troubleshooting

Desync Issues

// Check RTT (Round Trip Time)
ServerMessage::StateUpdate { your_rtt, .. } => {
    if your_rtt > 500.0 {
        // High latency warning
    }
}

Connection Loss

if (timeSinceSync > 10000) {
  // Show reconnect prompt
  <Button onClick={handleReconnect}>Reconnect</Button>
}

Friends System

Manage friends and send Watch Together invites

Activity Feed

See what friends are watching

Build docs developers (and LLMs) love