diff --git a/Cargo.lock b/Cargo.lock index fa167e245f..2188d2879f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -181,6 +181,17 @@ dependencies = [ "futures-core", ] +[[package]] +name = "async-broadcast" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6d26004fe83b2d1cd3a97609b21e39f9a31535822210fe83205d2ce48866ea61" +dependencies = [ + "event-listener", + "futures-core", + "parking_lot 0.12.1", +] + [[package]] name = "async-channel" version = "1.7.1" @@ -2991,7 +3002,7 @@ name = "language" version = "0.1.0" dependencies = [ "anyhow", - "async-broadcast", + "async-broadcast 0.3.4", "async-trait", "client", "clock", @@ -3164,6 +3175,7 @@ name = "live_kit_client" version = "0.1.0" dependencies = [ "anyhow", + "async-broadcast 0.4.1", "async-trait", "block", "byteorder", @@ -3181,6 +3193,7 @@ dependencies = [ "live_kit_server", "log", "media", + "nanoid", "objc", "parking_lot 0.11.2", "postage", diff --git a/crates/call/src/participant.rs b/crates/call/src/participant.rs index f335689050..c045bd77dc 100644 --- a/crates/call/src/participant.rs +++ b/crates/call/src/participant.rs @@ -2,7 +2,7 @@ use anyhow::{anyhow, Result}; use client::{proto, User}; use collections::HashMap; use gpui::{Task, WeakModelHandle}; -use media::core_video::CVImageBuffer; +use live_kit_client::Frame; use project::Project; use std::sync::Arc; @@ -46,13 +46,13 @@ pub struct RemoteParticipant { #[derive(Clone)] pub struct RemoteVideoTrack { - pub(crate) frame: Option, + pub(crate) frame: Option, pub(crate) _live_kit_track: Arc, pub(crate) _maintain_frame: Arc>, } impl RemoteVideoTrack { - pub fn frame(&self) -> Option<&CVImageBuffer> { + pub fn frame(&self) -> Option<&Frame> { self.frame.as_ref() } } diff --git a/crates/call/src/room.rs b/crates/call/src/room.rs index f09b255df5..399b0e857e 100644 --- a/crates/call/src/room.rs +++ b/crates/call/src/room.rs @@ -418,31 +418,33 @@ impl Room { _live_kit_track: track, _maintain_frame: Arc::new(cx.spawn_weak(|this, mut cx| async move { while let Some(frame) = rx.next().await { - let this = if let Some(this) = this.upgrade(&cx) { - this - } else { - break; - }; - - let done = this.update(&mut cx, |this, cx| { - if let Some(track) = - this.remote_participants.get_mut(&peer_id).and_then( - |participant| participant.tracks.get_mut(&track_id), - ) - { - track.frame = frame; - cx.emit(Event::Frame { - participant_id: peer_id, - track_id: track_id.clone(), - }); - false + if let Some(frame) = frame { + let this = if let Some(this) = this.upgrade(&cx) { + this } else { - true - } - }); + break; + }; - if done { - break; + let done = this.update(&mut cx, |this, cx| { + if let Some(track) = + this.remote_participants.get_mut(&peer_id).and_then( + |participant| participant.tracks.get_mut(&track_id), + ) + { + track.frame = Some(frame); + cx.emit(Event::Frame { + participant_id: peer_id, + track_id: track_id.clone(), + }); + false + } else { + true + } + }); + + if done { + break; + } } } })), @@ -620,18 +622,18 @@ impl Room { return Task::ready(Err(anyhow!("screen was already shared"))); } - let publish_id = if let Some(live_kit) = self.live_kit.as_mut() { + let (displays, publish_id) = if let Some(live_kit) = self.live_kit.as_mut() { let publish_id = post_inc(&mut live_kit.next_publish_id); live_kit.screen_track = ScreenTrack::Pending { publish_id }; cx.notify(); - publish_id + (live_kit.room.display_sources(), publish_id) } else { return Task::ready(Err(anyhow!("live-kit was not initialized"))); }; cx.spawn_weak(|this, mut cx| async move { let publish_track = async { - let displays = live_kit_client::display_sources().await?; + let displays = displays.await?; let display = displays .first() .ok_or_else(|| anyhow!("no display found"))?; @@ -711,6 +713,15 @@ impl Room { } } } + + #[cfg(any(test, feature = "test-support"))] + pub fn set_display_sources(&self, sources: Vec) { + self.live_kit + .as_ref() + .unwrap() + .room + .set_display_sources(sources); + } } struct LiveKitRoom { diff --git a/crates/collab/src/integration_tests.rs b/crates/collab/src/integration_tests.rs index 7cd964cd8a..8ac898a4ad 100644 --- a/crates/collab/src/integration_tests.rs +++ b/crates/collab/src/integration_tests.rs @@ -5,7 +5,10 @@ use crate::{ }; use ::rpc::Peer; use anyhow::anyhow; -use call::{room, ActiveCall, ParticipantLocation, Room}; +use call::{ + room::{self, Event}, + ActiveCall, ParticipantLocation, Room, +}; use client::{ self, test::FakeHttpClient, Channel, ChannelDetails, ChannelList, Client, Connection, Credentials, EstablishConnectionError, PeerId, User, UserStore, RECEIVE_TIMEOUT, @@ -30,6 +33,7 @@ use language::{ range_to_lsp, tree_sitter_rust, Diagnostic, DiagnosticEntry, FakeLspAdapter, Language, LanguageConfig, LanguageRegistry, OffsetRangeExt, Point, Rope, }; +use live_kit_client::{Frame, MacOSDisplay}; use lsp::{self, FakeLanguageServer}; use parking_lot::Mutex; use project::{ @@ -185,6 +189,45 @@ async fn test_basic_calls( } ); + // User A shares their screen + let display = MacOSDisplay::new(); + let events_b = active_call_events(cx_b); + active_call_a + .update(cx_a, |call, cx| { + call.room().unwrap().update(cx, |room, cx| { + room.set_display_sources(vec![display.clone()]); + room.share_screen(cx) + }) + }) + .await + .unwrap(); + + let frame = Frame { + width: 800, + height: 600, + label: "a".into(), + }; + display.send_frame(frame.clone()); + deterministic.run_until_parked(); + + assert_eq!(events_b.borrow().len(), 1); + let event = events_b.borrow().first().unwrap().clone(); + if let Event::Frame { + participant_id, + track_id, + } = event + { + assert_eq!(participant_id, client_a.peer_id().unwrap()); + room_b.read_with(cx_b, |room, _| { + assert_eq!( + room.remote_participants()[&client_a.peer_id().unwrap()].tracks[&track_id].frame(), + Some(&frame) + ); + }); + } else { + panic!("unexpected event") + } + // User A leaves the room. active_call_a.update(cx_a, |call, cx| { call.hang_up(cx).unwrap(); @@ -954,21 +997,21 @@ async fn test_active_call_events( deterministic.run_until_parked(); assert_eq!(mem::take(&mut *events_a.borrow_mut()), vec![]); assert_eq!(mem::take(&mut *events_b.borrow_mut()), vec![]); +} - fn active_call_events(cx: &mut TestAppContext) -> Rc>> { - let events = Rc::new(RefCell::new(Vec::new())); - let active_call = cx.read(ActiveCall::global); - cx.update({ - let events = events.clone(); - |cx| { - cx.subscribe(&active_call, move |_, event, _| { - events.borrow_mut().push(event.clone()) - }) - .detach() - } - }); - events - } +fn active_call_events(cx: &mut TestAppContext) -> Rc>> { + let events = Rc::new(RefCell::new(Vec::new())); + let active_call = cx.read(ActiveCall::global); + cx.update({ + let events = events.clone(); + |cx| { + cx.subscribe(&active_call, move |_, event, _| { + events.borrow_mut().push(event.clone()) + }) + .detach() + } + }); + events } #[gpui::test(iterations = 10)] diff --git a/crates/live_kit_client/Cargo.toml b/crates/live_kit_client/Cargo.toml index ee2c3f88dc..cd8006314c 100644 --- a/crates/live_kit_client/Cargo.toml +++ b/crates/live_kit_client/Cargo.toml @@ -13,11 +13,13 @@ name = "test_app" [features] test-support = [ + "async-broadcast", "async-trait", "collections/test-support", "gpui/test-support", "lazy_static", - "live_kit_server" + "live_kit_server", + "nanoid", ] [dependencies] @@ -27,13 +29,16 @@ live_kit_server = { path = "../live_kit_server", optional = true } media = { path = "../media" } anyhow = "1.0.38" -async-trait = { version = "0.1", optional = true } core-foundation = "0.9.3" core-graphics = "0.22.3" futures = "0.3" -lazy_static = { version = "1.4", optional = true } parking_lot = "0.11.1" +async-broadcast = { version = "0.4", optional = true } +async-trait = { version = "0.1", optional = true } +lazy_static = { version = "1.4", optional = true } +nanoid = { version ="0.4", optional = true} + [dev-dependencies] collections = { path = "../collections", features = ["test-support"] } gpui = { path = "../gpui", features = ["test-support"] } diff --git a/crates/live_kit_client/examples/test_app.rs b/crates/live_kit_client/examples/test_app.rs index b4f037cead..7ad1eee967 100644 --- a/crates/live_kit_client/examples/test_app.rs +++ b/crates/live_kit_client/examples/test_app.rs @@ -1,21 +1,9 @@ use futures::StreamExt; -use gpui::{ - actions, - elements::{Canvas, *}, - keymap::Binding, - platform::current::Surface, - Menu, MenuItem, ViewContext, -}; +use gpui::{actions, keymap::Binding, Menu, MenuItem}; use live_kit_client::{LocalVideoTrack, RemoteVideoTrackUpdate, Room}; -use live_kit_server::{ - api::Client, - token::{self, VideoGrant}, -}; +use live_kit_server::token::{self, VideoGrant}; use log::LevelFilter; -use media::core_video::CVImageBuffer; -use postage::watch; use simplelog::SimpleLogger; -use std::sync::Arc; actions!(capture, [Quit]); @@ -62,7 +50,7 @@ fn main() { let mut track_changes = room_b.remote_video_track_updates(); - let displays = live_kit_client::display_sources().await.unwrap(); + let displays = room_a.display_sources().await.unwrap(); let display = displays.into_iter().next().unwrap(); let track_a = LocalVideoTrack::screen_share_for_display(&display); @@ -72,6 +60,7 @@ fn main() { let remote_tracks = room_b.remote_video_tracks("test-participant-1"); assert_eq!(remote_tracks.len(), 1); assert_eq!(remote_tracks[0].publisher_id(), "test-participant-1"); + dbg!(track.sid()); assert_eq!(track.publisher_id(), "test-participant-1"); } else { panic!("unexpected message"); @@ -100,73 +89,6 @@ fn main() { }); } -struct ScreenCaptureView { - image_buffer: Option, - _room: Arc, -} - -impl gpui::Entity for ScreenCaptureView { - type Event = (); -} - -impl ScreenCaptureView { - pub fn new(room: Arc, cx: &mut ViewContext) -> Self { - let mut remote_video_tracks = room.remote_video_track_updates(); - cx.spawn_weak(|this, mut cx| async move { - if let Some(video_track) = remote_video_tracks.next().await { - let (mut frames_tx, mut frames_rx) = watch::channel_with(None); - // video_track.add_renderer(move |frame| *frames_tx.borrow_mut() = Some(frame)); - - while let Some(frame) = frames_rx.next().await { - if let Some(this) = this.upgrade(&cx) { - this.update(&mut cx, |this, cx| { - this.image_buffer = frame; - cx.notify(); - }); - } else { - break; - } - } - } - }) - .detach(); - - Self { - image_buffer: None, - _room: room, - } - } -} - -impl gpui::View for ScreenCaptureView { - fn ui_name() -> &'static str { - "View" - } - - fn render(&mut self, _: &mut gpui::RenderContext) -> gpui::ElementBox { - let image_buffer = self.image_buffer.clone(); - let canvas = Canvas::new(move |bounds, _, cx| { - if let Some(image_buffer) = image_buffer.clone() { - cx.scene.push_surface(Surface { - bounds, - image_buffer, - }); - } - }); - - if let Some(image_buffer) = self.image_buffer.as_ref() { - canvas - .constrained() - .with_width(image_buffer.width() as f32) - .with_height(image_buffer.height() as f32) - .aligned() - .boxed() - } else { - canvas.boxed() - } - } -} - fn quit(_: &Quit, cx: &mut gpui::MutableAppContext) { cx.platform().quit(); } diff --git a/crates/live_kit_client/src/prod.rs b/crates/live_kit_client/src/prod.rs index 427f45fe72..15f5faacca 100644 --- a/crates/live_kit_client/src/prod.rs +++ b/crates/live_kit_client/src/prod.rs @@ -8,7 +8,8 @@ use futures::{ channel::{mpsc, oneshot}, Future, }; -use media::core_video::{CVImageBuffer, CVImageBufferRef}; +pub use media::core_video::CVImageBuffer; +use media::core_video::CVImageBufferRef; use parking_lot::Mutex; use std::{ ffi::c_void, @@ -109,8 +110,35 @@ impl Room { async { rx.await.unwrap().context("error connecting to room") } } + pub fn display_sources(self: &Arc) -> impl Future>> { + extern "C" fn callback(tx: *mut c_void, sources: CFArrayRef, error: CFStringRef) { + unsafe { + let tx = Box::from_raw(tx as *mut oneshot::Sender>>); + + if sources.is_null() { + let _ = tx.send(Err(anyhow!("{}", CFString::wrap_under_get_rule(error)))); + } else { + let sources = CFArray::wrap_under_get_rule(sources) + .into_iter() + .map(|source| MacOSDisplay::new(*source)) + .collect(); + + let _ = tx.send(Ok(sources)); + } + } + } + + let (tx, rx) = oneshot::channel(); + + unsafe { + LKDisplaySources(Box::into_raw(Box::new(tx)) as *mut _, callback); + } + + async move { rx.await.unwrap() } + } + pub fn publish_video_track( - &self, + self: &Arc, track: &LocalVideoTrack, ) -> impl Future> { let (tx, rx) = oneshot::channel::>(); @@ -338,16 +366,16 @@ impl RemoteVideoTrack { pub fn add_renderer(&self, callback: F) where - F: 'static + FnMut(CVImageBuffer), + F: 'static + Send + Sync + FnMut(Frame), { extern "C" fn on_frame(callback_data: *mut c_void, frame: CVImageBufferRef) where - F: FnMut(CVImageBuffer), + F: FnMut(Frame), { unsafe { let buffer = CVImageBuffer::wrap_under_get_rule(frame); let callback = &mut *(callback_data as *mut F); - callback(buffer); + callback(Frame(buffer)); } } @@ -394,29 +422,18 @@ impl Drop for MacOSDisplay { } } -pub fn display_sources() -> impl Future>> { - extern "C" fn callback(tx: *mut c_void, sources: CFArrayRef, error: CFStringRef) { - unsafe { - let tx = Box::from_raw(tx as *mut oneshot::Sender>>); +pub struct Frame(CVImageBuffer); - if sources.is_null() { - let _ = tx.send(Err(anyhow!("{}", CFString::wrap_under_get_rule(error)))); - } else { - let sources = CFArray::wrap_under_get_rule(sources) - .into_iter() - .map(|source| MacOSDisplay::new(*source)) - .collect(); - - let _ = tx.send(Ok(sources)); - } - } +impl Frame { + pub fn width(&self) -> usize { + self.0.width() } - let (tx, rx) = oneshot::channel(); - - unsafe { - LKDisplaySources(Box::into_raw(Box::new(tx)) as *mut _, callback); + pub fn height(&self) -> usize { + self.0.height() } - async move { rx.await.unwrap() } + pub fn image(&self) -> CVImageBuffer { + self.0.clone() + } } diff --git a/crates/live_kit_client/src/test.rs b/crates/live_kit_client/src/test.rs index 050cfd0a47..6a1a6ee2d6 100644 --- a/crates/live_kit_client/src/test.rs +++ b/crates/live_kit_client/src/test.rs @@ -1,8 +1,8 @@ use anyhow::{anyhow, Result}; use async_trait::async_trait; use collections::HashMap; -use futures::{channel::mpsc, future}; -use gpui::executor::Background; +use futures::{Stream, StreamExt}; +use gpui::executor::{self, Background}; use lazy_static::lazy_static; use live_kit_server::token; use media::core_video::CVImageBuffer; @@ -96,14 +96,14 @@ impl TestServer { let room = server_rooms .get_mut(&*room_name) .ok_or_else(|| anyhow!("room {:?} does not exist", room_name))?; - if room.clients.contains_key(&identity) { + if room.client_rooms.contains_key(&identity) { Err(anyhow!( "{:?} attempted to join room {:?} twice", identity, room_name )) } else { - room.clients.insert(identity, client_room); + room.client_rooms.insert(identity, client_room); Ok(()) } } @@ -117,7 +117,7 @@ impl TestServer { let room = server_rooms .get_mut(&*room_name) .ok_or_else(|| anyhow!("room {} does not exist", room_name))?; - room.clients.remove(&identity).ok_or_else(|| { + room.client_rooms.remove(&identity).ok_or_else(|| { anyhow!( "{:?} attempted to leave room {:?} before joining it", identity, @@ -135,7 +135,7 @@ impl TestServer { let room = server_rooms .get_mut(&room_name) .ok_or_else(|| anyhow!("room {} does not exist", room_name))?; - room.clients.remove(&identity).ok_or_else(|| { + room.client_rooms.remove(&identity).ok_or_else(|| { anyhow!( "participant {:?} did not join room {:?}", identity, @@ -144,11 +144,44 @@ impl TestServer { })?; Ok(()) } + + async fn publish_video_track(&self, token: String, local_track: LocalVideoTrack) -> Result<()> { + self.background.simulate_random_delay().await; + let claims = live_kit_server::token::validate(&token, &self.secret_key)?; + let identity = claims.sub.unwrap().to_string(); + let room_name = claims.video.room.unwrap(); + + let mut server_rooms = self.rooms.lock(); + let room = server_rooms + .get_mut(&*room_name) + .ok_or_else(|| anyhow!("room {} does not exist", room_name))?; + + let update = RemoteVideoTrackUpdate::Subscribed(Arc::new(RemoteVideoTrack { + sid: nanoid::nanoid!(17), + publisher_id: identity.clone(), + frames_rx: local_track.frames_rx.clone(), + background: self.background.clone(), + })); + + for (id, client_room) in &room.client_rooms { + if *id != identity { + let _ = client_room + .0 + .lock() + .video_track_updates + .0 + .try_broadcast(update.clone()) + .unwrap(); + } + } + + Ok(()) + } } #[derive(Default)] struct TestServerRoom { - clients: HashMap>, + client_rooms: HashMap>, } impl TestServerRoom {} @@ -194,17 +227,29 @@ impl live_kit_server::api::Client for TestApiClient { pub type Sid = String; -#[derive(Default)] struct RoomState { - token: Option, + connection: Option, + display_sources: Vec, + video_track_updates: ( + async_broadcast::Sender, + async_broadcast::Receiver, + ), +} + +struct ConnectionState { + url: String, + token: String, } -#[derive(Default)] pub struct Room(Mutex); impl Room { pub fn new() -> Arc { - Default::default() + Arc::new(Self(Mutex::new(RoomState { + connection: None, + display_sources: Default::default(), + video_track_updates: async_broadcast::broadcast(128), + }))) } pub fn connect(self: &Arc, url: &str, token: &str) -> impl Future> { @@ -214,36 +259,75 @@ impl Room { async move { let server = TestServer::get(&url)?; server.join_room(token.clone(), this.clone()).await?; - this.0.lock().token = Some(token); + this.0.lock().connection = Some(ConnectionState { url, token }); Ok(()) } } - pub fn publish_video_track( - &self, - track: &LocalVideoTrack, - ) -> impl Future> { - future::pending() + pub fn display_sources(self: &Arc) -> impl Future>> { + let this = self.clone(); + async move { + let server = this.test_server(); + server.background.simulate_random_delay().await; + Ok(this.0.lock().display_sources.clone()) + } } - pub fn unpublish_track(&self, publication: LocalTrackPublication) {} + pub fn publish_video_track( + self: &Arc, + track: &LocalVideoTrack, + ) -> impl Future> { + let this = self.clone(); + let track = track.clone(); + async move { + this.test_server() + .publish_video_track(this.token(), track) + .await?; + Ok(LocalTrackPublication) + } + } - pub fn remote_video_tracks(&self, participant_id: &str) -> Vec> { + pub fn unpublish_track(&self, _: LocalTrackPublication) {} + + pub fn remote_video_tracks(&self, _: &str) -> Vec> { Default::default() } - pub fn remote_video_track_updates(&self) -> mpsc::UnboundedReceiver { - mpsc::unbounded().1 + pub fn remote_video_track_updates(&self) -> impl Stream { + self.0.lock().video_track_updates.1.clone() + } + + pub fn set_display_sources(&self, sources: Vec) { + self.0.lock().display_sources = sources; + } + + fn test_server(&self) -> Arc { + let this = self.0.lock(); + let connection = this + .connection + .as_ref() + .expect("must be connected to call this method"); + TestServer::get(&connection.url).unwrap() + } + + fn token(&self) -> String { + self.0 + .lock() + .connection + .as_ref() + .expect("must be connected to call this method") + .token + .clone() } } impl Drop for Room { fn drop(&mut self) { - if let Some(token) = self.0.lock().token.take() { - if let Ok(server) = TestServer::get(&token) { + if let Some(connection) = self.0.lock().connection.take() { + if let Ok(server) = TestServer::get(&connection.token) { let background = server.background.clone(); background - .spawn(async move { server.leave_room(token).await.unwrap() }) + .spawn(async move { server.leave_room(connection.token).await.unwrap() }) .detach(); } } @@ -252,17 +336,24 @@ impl Drop for Room { pub struct LocalTrackPublication; -pub struct LocalVideoTrack; +#[derive(Clone)] +pub struct LocalVideoTrack { + frames_rx: async_broadcast::Receiver, +} impl LocalVideoTrack { pub fn screen_share_for_display(display: &MacOSDisplay) -> Self { - Self + Self { + frames_rx: display.frames.1.clone(), + } } } pub struct RemoteVideoTrack { sid: Sid, publisher_id: Sid, + frames_rx: async_broadcast::Receiver, + background: Arc, } impl RemoteVideoTrack { @@ -274,20 +365,64 @@ impl RemoteVideoTrack { &self.publisher_id } - pub fn add_renderer(&self, callback: F) + pub fn add_renderer(&self, mut callback: F) where - F: 'static + FnMut(CVImageBuffer), + F: 'static + Send + Sync + FnMut(Frame), { + let mut frames_rx = self.frames_rx.clone(); + self.background + .spawn(async move { + while let Some(frame) = frames_rx.next().await { + callback(frame) + } + }) + .detach(); } } +#[derive(Clone)] pub enum RemoteVideoTrackUpdate { Subscribed(Arc), Unsubscribed { publisher_id: Sid, track_id: Sid }, } -pub struct MacOSDisplay; - -pub fn display_sources() -> impl Future>> { - future::pending() +#[derive(Clone)] +pub struct MacOSDisplay { + frames: ( + async_broadcast::Sender, + async_broadcast::Receiver, + ), +} + +impl MacOSDisplay { + pub fn new() -> Self { + Self { + frames: async_broadcast::broadcast(128), + } + } + + pub fn send_frame(&self, frame: Frame) { + self.frames.0.try_broadcast(frame).unwrap(); + } +} + +#[derive(Clone, Debug, PartialEq, Eq)] +pub struct Frame { + pub label: String, + pub width: usize, + pub height: usize, +} + +impl Frame { + pub fn width(&self) -> usize { + self.width + } + + pub fn height(&self) -> usize { + self.height + } + + pub fn image(&self) -> CVImageBuffer { + unimplemented!("you can't call this in test mode") + } } diff --git a/crates/workspace/src/pane_group.rs b/crates/workspace/src/pane_group.rs index b99aba3b3a..c778115d91 100644 --- a/crates/workspace/src/pane_group.rs +++ b/crates/workspace/src/pane_group.rs @@ -159,7 +159,7 @@ impl Member { let origin = bounds.origin() + (bounds.size() / 2.) - size / 2.; cx.scene.push_surface(gpui::mac::Surface { bounds: RectF::new(origin, size), - image_buffer: frame, + image_buffer: frame.image(), }); } })