From 2d1eb0c56c3038fcfa7fd1117312b889c9090184 Mon Sep 17 00:00:00 2001 From: Max Brunsfeld Date: Wed, 10 Jan 2024 13:11:59 -0800 Subject: [PATCH] Expose a single `updates` stream from live_kit_client::Room Co-authored-by: Julia --- crates/call/src/room.rs | 80 ++++++------------- crates/live_kit_client/examples/test_app.rs | 35 ++++---- crates/live_kit_client/src/live_kit_client.rs | 20 +++++ crates/live_kit_client/src/prod.rs | 72 +++++------------ crates/live_kit_client/src/test.rs | 61 ++++---------- 5 files changed, 95 insertions(+), 173 deletions(-) diff --git a/crates/call/src/room.rs b/crates/call/src/room.rs index 3d1f1e70c7..877afceff3 100644 --- a/crates/call/src/room.rs +++ b/crates/call/src/room.rs @@ -15,10 +15,7 @@ use gpui::{ AppContext, AsyncAppContext, Context, EventEmitter, Model, ModelContext, Task, WeakModel, }; use language::LanguageRegistry; -use live_kit_client::{ - LocalAudioTrack, LocalTrackPublication, LocalVideoTrack, RemoteAudioTrackUpdate, - RemoteVideoTrackUpdate, -}; +use live_kit_client::{LocalAudioTrack, LocalTrackPublication, LocalVideoTrack, RoomUpdate}; use postage::{sink::Sink, stream::Stream, watch}; use project::Project; use settings::Settings as _; @@ -131,11 +128,11 @@ impl Room { } }); - let _maintain_video_tracks = cx.spawn({ + let _handle_updates = cx.spawn({ let room = room.clone(); move |this, mut cx| async move { - let mut track_video_changes = room.remote_video_track_updates(); - while let Some(track_change) = track_video_changes.next().await { + let mut updates = room.updates(); + while let Some(update) = updates.next().await { let this = if let Some(this) = this.upgrade() { this } else { @@ -143,26 +140,7 @@ impl Room { }; this.update(&mut cx, |this, cx| { - this.remote_video_track_updated(track_change, cx).log_err() - }) - .ok(); - } - } - }); - - let _maintain_audio_tracks = cx.spawn({ - let room = room.clone(); - |this, mut cx| async move { - let mut track_audio_changes = room.remote_audio_track_updates(); - while let Some(track_change) = track_audio_changes.next().await { - let this = if let Some(this) = this.upgrade() { - this - } else { - break; - }; - - this.update(&mut cx, |this, cx| { - this.remote_audio_track_updated(track_change, cx).log_err() + this.live_kit_room_updated(update, cx).log_err() }) .ok(); } @@ -195,7 +173,7 @@ impl Room { deafened: false, speaking: false, _maintain_room, - _maintain_tracks: [_maintain_video_tracks, _maintain_audio_tracks], + _handle_updates, }) } else { None @@ -877,8 +855,8 @@ impl Room { .remote_audio_track_publications(&user.id.to_string()); for track in video_tracks { - this.remote_video_track_updated( - RemoteVideoTrackUpdate::Subscribed(track), + this.live_kit_room_updated( + RoomUpdate::SubscribedToRemoteVideoTrack(track), cx, ) .log_err(); @@ -887,8 +865,8 @@ impl Room { for (track, publication) in audio_tracks.iter().zip(publications.iter()) { - this.remote_audio_track_updated( - RemoteAudioTrackUpdate::Subscribed( + this.live_kit_room_updated( + RoomUpdate::SubscribedToRemoteAudioTrack( track.clone(), publication.clone(), ), @@ -979,13 +957,13 @@ impl Room { } } - fn remote_video_track_updated( + fn live_kit_room_updated( &mut self, - change: RemoteVideoTrackUpdate, + update: RoomUpdate, cx: &mut ModelContext, ) -> Result<()> { - match change { - RemoteVideoTrackUpdate::Subscribed(track) => { + match update { + RoomUpdate::SubscribedToRemoteVideoTrack(track) => { let user_id = track.publisher_id().parse()?; let track_id = track.sid().to_string(); let participant = self @@ -997,7 +975,8 @@ impl Room { participant_id: participant.peer_id, }); } - RemoteVideoTrackUpdate::Unsubscribed { + + RoomUpdate::UnsubscribedFromRemoteVideoTrack { publisher_id, track_id, } => { @@ -1011,19 +990,8 @@ impl Room { participant_id: participant.peer_id, }); } - } - cx.notify(); - Ok(()) - } - - fn remote_audio_track_updated( - &mut self, - change: RemoteAudioTrackUpdate, - cx: &mut ModelContext, - ) -> Result<()> { - match change { - RemoteAudioTrackUpdate::ActiveSpeakersChanged { speakers } => { + RoomUpdate::ActiveSpeakersChanged { speakers } => { let mut speaker_ids = speakers .into_iter() .filter_map(|speaker_sid| speaker_sid.parse().ok()) @@ -1045,9 +1013,9 @@ impl Room { } } } - cx.notify(); } - RemoteAudioTrackUpdate::MuteChanged { track_id, muted } => { + + RoomUpdate::RemoteAudioTrackMuteChanged { track_id, muted } => { let mut found = false; for participant in &mut self.remote_participants.values_mut() { for track in participant.audio_tracks.values() { @@ -1061,10 +1029,9 @@ impl Room { break; } } - - cx.notify(); } - RemoteAudioTrackUpdate::Subscribed(track, publication) => { + + RoomUpdate::SubscribedToRemoteAudioTrack(track, publication) => { let user_id = track.publisher_id().parse()?; let track_id = track.sid().to_string(); let participant = self @@ -1078,7 +1045,8 @@ impl Room { participant_id: participant.peer_id, }); } - RemoteAudioTrackUpdate::Unsubscribed { + + RoomUpdate::UnsubscribedFromRemoteAudioTrack { publisher_id, track_id, } => { @@ -1597,7 +1565,7 @@ struct LiveKitRoom { speaking: bool, next_publish_id: usize, _maintain_room: Task<()>, - _maintain_tracks: [Task<()>; 2], + _handle_updates: Task<()>, } impl LiveKitRoom { diff --git a/crates/live_kit_client/examples/test_app.rs b/crates/live_kit_client/examples/test_app.rs index 68a8a84209..9fc8aafd30 100644 --- a/crates/live_kit_client/examples/test_app.rs +++ b/crates/live_kit_client/examples/test_app.rs @@ -2,9 +2,7 @@ use std::{sync::Arc, time::Duration}; use futures::StreamExt; use gpui::{actions, KeyBinding, Menu, MenuItem}; -use live_kit_client::{ - LocalAudioTrack, LocalVideoTrack, RemoteAudioTrackUpdate, RemoteVideoTrackUpdate, Room, -}; +use live_kit_client::{LocalAudioTrack, LocalVideoTrack, Room, RoomUpdate}; use live_kit_server::token::{self, VideoGrant}; use log::LevelFilter; use simplelog::SimpleLogger; @@ -60,12 +58,12 @@ fn main() { let room_b = Room::new(); room_b.connect(&live_kit_url, &user2_token).await.unwrap(); - let mut audio_track_updates = room_b.remote_audio_track_updates(); + let mut room_updates = room_b.updates(); let audio_track = LocalAudioTrack::create(); let audio_track_publication = room_a.publish_audio_track(audio_track).await.unwrap(); - if let RemoteAudioTrackUpdate::Subscribed(track, _) = - audio_track_updates.next().await.unwrap() + if let RoomUpdate::SubscribedToRemoteAudioTrack(track, _) = + room_updates.next().await.unwrap() { let remote_tracks = room_b.remote_audio_tracks("test-participant-1"); assert_eq!(remote_tracks.len(), 1); @@ -78,8 +76,8 @@ fn main() { audio_track_publication.set_mute(true).await.unwrap(); println!("waiting for mute changed!"); - if let RemoteAudioTrackUpdate::MuteChanged { track_id, muted } = - audio_track_updates.next().await.unwrap() + if let RoomUpdate::RemoteAudioTrackMuteChanged { track_id, muted } = + room_updates.next().await.unwrap() { let remote_tracks = room_b.remote_audio_tracks("test-participant-1"); assert_eq!(remote_tracks[0].sid(), track_id); @@ -90,8 +88,8 @@ fn main() { audio_track_publication.set_mute(false).await.unwrap(); - if let RemoteAudioTrackUpdate::MuteChanged { track_id, muted } = - audio_track_updates.next().await.unwrap() + if let RoomUpdate::RemoteAudioTrackMuteChanged { track_id, muted } = + room_updates.next().await.unwrap() { let remote_tracks = room_b.remote_audio_tracks("test-participant-1"); assert_eq!(remote_tracks[0].sid(), track_id); @@ -110,13 +108,13 @@ fn main() { room_a.unpublish_track(audio_track_publication); // Clear out any active speakers changed messages - let mut next = audio_track_updates.next().await.unwrap(); - while let RemoteAudioTrackUpdate::ActiveSpeakersChanged { speakers } = next { + let mut next = room_updates.next().await.unwrap(); + while let RoomUpdate::ActiveSpeakersChanged { speakers } = next { println!("Speakers changed: {:?}", speakers); - next = audio_track_updates.next().await.unwrap(); + next = room_updates.next().await.unwrap(); } - if let RemoteAudioTrackUpdate::Unsubscribed { + if let RoomUpdate::UnsubscribedFromRemoteAudioTrack { publisher_id, track_id, } = next @@ -128,7 +126,6 @@ fn main() { panic!("unexpected message"); } - let mut video_track_updates = room_b.remote_video_track_updates(); let displays = room_a.display_sources().await.unwrap(); let display = displays.into_iter().next().unwrap(); @@ -136,8 +133,8 @@ fn main() { let local_video_track_publication = room_a.publish_video_track(local_video_track).await.unwrap(); - if let RemoteVideoTrackUpdate::Subscribed(track) = - video_track_updates.next().await.unwrap() + if let RoomUpdate::SubscribedToRemoteVideoTrack(track) = + room_updates.next().await.unwrap() { let remote_video_tracks = room_b.remote_video_tracks("test-participant-1"); assert_eq!(remote_video_tracks.len(), 1); @@ -152,10 +149,10 @@ fn main() { .pop() .unwrap(); room_a.unpublish_track(local_video_track_publication); - if let RemoteVideoTrackUpdate::Unsubscribed { + if let RoomUpdate::UnsubscribedFromRemoteVideoTrack { publisher_id, track_id, - } = video_track_updates.next().await.unwrap() + } = room_updates.next().await.unwrap() { assert_eq!(publisher_id, "test-participant-1"); assert_eq!(remote_video_track.sid(), track_id); diff --git a/crates/live_kit_client/src/live_kit_client.rs b/crates/live_kit_client/src/live_kit_client.rs index 47cc3873ff..7052b107bc 100644 --- a/crates/live_kit_client/src/live_kit_client.rs +++ b/crates/live_kit_client/src/live_kit_client.rs @@ -1,3 +1,5 @@ +use std::sync::Arc; + #[cfg(not(any(test, feature = "test-support")))] pub mod prod; @@ -9,3 +11,21 @@ pub mod test; #[cfg(any(test, feature = "test-support"))] pub use test::*; + +pub type Sid = String; + +#[derive(Clone, Eq, PartialEq)] +pub enum ConnectionState { + Disconnected, + Connected { url: String, token: String }, +} + +#[derive(Clone)] +pub enum RoomUpdate { + ActiveSpeakersChanged { speakers: Vec }, + RemoteAudioTrackMuteChanged { track_id: Sid, muted: bool }, + SubscribedToRemoteVideoTrack(Arc), + SubscribedToRemoteAudioTrack(Arc, Arc), + UnsubscribedFromRemoteVideoTrack { publisher_id: Sid, track_id: Sid }, + UnsubscribedFromRemoteAudioTrack { publisher_id: Sid, track_id: Sid }, +} diff --git a/crates/live_kit_client/src/prod.rs b/crates/live_kit_client/src/prod.rs index 5d8ef9bf13..b9f5aa6aa8 100644 --- a/crates/live_kit_client/src/prod.rs +++ b/crates/live_kit_client/src/prod.rs @@ -1,3 +1,4 @@ +use crate::{ConnectionState, RoomUpdate, Sid}; use anyhow::{anyhow, Context, Result}; use core_foundation::{ array::{CFArray, CFArrayRef}, @@ -155,22 +156,13 @@ extern "C" { fn LKRemoteTrackPublicationGetSid(publication: swift::RemoteTrackPublication) -> CFStringRef; } -pub type Sid = String; - -#[derive(Clone, Eq, PartialEq)] -pub enum ConnectionState { - Disconnected, - Connected { url: String, token: String }, -} - pub struct Room { native_room: swift::Room, connection: Mutex<( watch::Sender, watch::Receiver, )>, - remote_audio_track_subscribers: Mutex>>, - remote_video_track_subscribers: Mutex>>, + update_subscribers: Mutex>>, _delegate: RoomDelegate, } @@ -181,8 +173,7 @@ impl Room { Self { native_room: unsafe { LKRoomCreate(delegate.native_delegate) }, connection: Mutex::new(watch::channel_with(ConnectionState::Disconnected)), - remote_audio_track_subscribers: Default::default(), - remote_video_track_subscribers: Default::default(), + update_subscribers: Default::default(), _delegate: delegate, } }) @@ -397,15 +388,9 @@ impl Room { } } - pub fn remote_audio_track_updates(&self) -> mpsc::UnboundedReceiver { + pub fn updates(&self) -> mpsc::UnboundedReceiver { let (tx, rx) = mpsc::unbounded(); - self.remote_audio_track_subscribers.lock().push(tx); - rx - } - - pub fn remote_video_track_updates(&self) -> mpsc::UnboundedReceiver { - let (tx, rx) = mpsc::unbounded(); - self.remote_video_track_subscribers.lock().push(tx); + self.update_subscribers.lock().push(tx); rx } @@ -416,8 +401,8 @@ impl Room { ) { let track = Arc::new(track); let publication = Arc::new(publication); - self.remote_audio_track_subscribers.lock().retain(|tx| { - tx.unbounded_send(RemoteAudioTrackUpdate::Subscribed( + self.update_subscribers.lock().retain(|tx| { + tx.unbounded_send(RoomUpdate::SubscribedToRemoteAudioTrack( track.clone(), publication.clone(), )) @@ -426,8 +411,8 @@ impl Room { } fn did_unsubscribe_from_remote_audio_track(&self, publisher_id: String, track_id: String) { - self.remote_audio_track_subscribers.lock().retain(|tx| { - tx.unbounded_send(RemoteAudioTrackUpdate::Unsubscribed { + self.update_subscribers.lock().retain(|tx| { + tx.unbounded_send(RoomUpdate::UnsubscribedFromRemoteAudioTrack { publisher_id: publisher_id.clone(), track_id: track_id.clone(), }) @@ -436,8 +421,8 @@ impl Room { } fn mute_changed_from_remote_audio_track(&self, track_id: String, muted: bool) { - self.remote_audio_track_subscribers.lock().retain(|tx| { - tx.unbounded_send(RemoteAudioTrackUpdate::MuteChanged { + self.update_subscribers.lock().retain(|tx| { + tx.unbounded_send(RoomUpdate::RemoteAudioTrackMuteChanged { track_id: track_id.clone(), muted, }) @@ -445,29 +430,26 @@ impl Room { }); } - // A vec of publisher IDs fn active_speakers_changed(&self, speakers: Vec) { - self.remote_audio_track_subscribers - .lock() - .retain(move |tx| { - tx.unbounded_send(RemoteAudioTrackUpdate::ActiveSpeakersChanged { - speakers: speakers.clone(), - }) - .is_ok() - }); + self.update_subscribers.lock().retain(move |tx| { + tx.unbounded_send(RoomUpdate::ActiveSpeakersChanged { + speakers: speakers.clone(), + }) + .is_ok() + }); } fn did_subscribe_to_remote_video_track(&self, track: RemoteVideoTrack) { let track = Arc::new(track); - self.remote_video_track_subscribers.lock().retain(|tx| { - tx.unbounded_send(RemoteVideoTrackUpdate::Subscribed(track.clone())) + self.update_subscribers.lock().retain(|tx| { + tx.unbounded_send(RoomUpdate::SubscribedToRemoteVideoTrack(track.clone())) .is_ok() }); } fn did_unsubscribe_from_remote_video_track(&self, publisher_id: String, track_id: String) { - self.remote_video_track_subscribers.lock().retain(|tx| { - tx.unbounded_send(RemoteVideoTrackUpdate::Unsubscribed { + self.update_subscribers.lock().retain(|tx| { + tx.unbounded_send(RoomUpdate::UnsubscribedFromRemoteVideoTrack { publisher_id: publisher_id.clone(), track_id: track_id.clone(), }) @@ -889,18 +871,6 @@ impl Drop for RemoteVideoTrack { } } -pub enum RemoteVideoTrackUpdate { - Subscribed(Arc), - Unsubscribed { publisher_id: Sid, track_id: Sid }, -} - -pub enum RemoteAudioTrackUpdate { - ActiveSpeakersChanged { speakers: Vec }, - MuteChanged { track_id: Sid, muted: bool }, - Subscribed(Arc, Arc), - Unsubscribed { publisher_id: Sid, track_id: Sid }, -} - pub struct MacOSDisplay(swift::MacOSDisplay); impl MacOSDisplay { diff --git a/crates/live_kit_client/src/test.rs b/crates/live_kit_client/src/test.rs index 4575fdd2c1..9c1a5ec59a 100644 --- a/crates/live_kit_client/src/test.rs +++ b/crates/live_kit_client/src/test.rs @@ -1,3 +1,4 @@ +use crate::{ConnectionState, RoomUpdate, Sid}; use anyhow::{anyhow, Context, Result}; use async_trait::async_trait; use collections::{BTreeMap, HashMap}; @@ -104,9 +105,8 @@ impl TestServer { client_room .0 .lock() - .video_track_updates - .0 - .try_broadcast(RemoteVideoTrackUpdate::Subscribed(track.clone())) + .updates_tx + .try_broadcast(RoomUpdate::SubscribedToRemoteVideoTrack(track.clone())) .unwrap(); } room.client_rooms.insert(identity, client_room); @@ -211,9 +211,8 @@ impl TestServer { let _ = client_room .0 .lock() - .video_track_updates - .0 - .try_broadcast(RemoteVideoTrackUpdate::Subscribed(track.clone())) + .updates_tx + .try_broadcast(RoomUpdate::SubscribedToRemoteVideoTrack(track.clone())) .unwrap(); } } @@ -261,9 +260,8 @@ impl TestServer { let _ = client_room .0 .lock() - .audio_track_updates - .0 - .try_broadcast(RemoteAudioTrackUpdate::Subscribed( + .updates_tx + .try_broadcast(RoomUpdate::SubscribedToRemoteAudioTrack( track.clone(), publication.clone(), )) @@ -369,39 +367,26 @@ impl live_kit_server::api::Client for TestApiClient { } } -pub type Sid = String; - struct RoomState { connection: ( watch::Sender, watch::Receiver, ), display_sources: Vec, - audio_track_updates: ( - async_broadcast::Sender, - async_broadcast::Receiver, - ), - video_track_updates: ( - async_broadcast::Sender, - async_broadcast::Receiver, - ), -} - -#[derive(Clone, Eq, PartialEq)] -pub enum ConnectionState { - Disconnected, - Connected { url: String, token: String }, + updates_tx: async_broadcast::Sender, + updates_rx: async_broadcast::Receiver, } pub struct Room(Mutex); impl Room { pub fn new() -> Arc { + let (updates_tx, updates_rx) = async_broadcast::broadcast(128); Arc::new(Self(Mutex::new(RoomState { connection: watch::channel_with(ConnectionState::Disconnected), display_sources: Default::default(), - video_track_updates: async_broadcast::broadcast(128), - audio_track_updates: async_broadcast::broadcast(128), + updates_tx, + updates_rx, }))) } @@ -505,12 +490,8 @@ impl Room { .collect() } - pub fn remote_audio_track_updates(&self) -> impl Stream { - self.0.lock().audio_track_updates.1.clone() - } - - pub fn remote_video_track_updates(&self) -> impl Stream { - self.0.lock().video_track_updates.1.clone() + pub fn updates(&self) -> impl Stream { + self.0.lock().updates_rx.clone() } pub fn set_display_sources(&self, sources: Vec) { @@ -646,20 +627,6 @@ impl RemoteAudioTrack { } } -#[derive(Clone)] -pub enum RemoteVideoTrackUpdate { - Subscribed(Arc), - Unsubscribed { publisher_id: Sid, track_id: Sid }, -} - -#[derive(Clone)] -pub enum RemoteAudioTrackUpdate { - ActiveSpeakersChanged { speakers: Vec }, - MuteChanged { track_id: Sid, muted: bool }, - Subscribed(Arc, Arc), - Unsubscribed { publisher_id: Sid, track_id: Sid }, -} - #[derive(Clone)] pub struct MacOSDisplay { frames: (