diff --git a/crates/call/src/call.rs b/crates/call/src/call.rs index 4db298fe98..3c5e18713b 100644 --- a/crates/call/src/call.rs +++ b/crates/call/src/call.rs @@ -8,19 +8,23 @@ use anyhow::{anyhow, Result}; use audio::Audio; use call_settings::CallSettings; use channel::ChannelId; -use client::{proto, ClickhouseEvent, Client, TelemetrySettings, TypedEnvelope, User, UserStore}; +use client::{ + proto::{self, PeerId}, + ClickhouseEvent, Client, TelemetrySettings, TypedEnvelope, User, UserStore, +}; use collections::HashSet; use futures::{future::Shared, FutureExt}; use postage::watch; use gpui::{ - AppContext, AsyncAppContext, Entity, ModelContext, ModelHandle, Subscription, Task, - WeakModelHandle, + AnyViewHandle, AnyWeakViewHandle, AppContext, AsyncAppContext, Entity, ModelContext, + ModelHandle, Subscription, Task, ViewContext, WeakModelHandle, }; use project::Project; pub use participant::ParticipantLocation; pub use room::Room; +use util::ResultExt; pub fn init(client: Arc, user_store: ModelHandle, cx: &mut AppContext) { settings::register::(cx); @@ -49,9 +53,25 @@ pub struct ActiveCall { ), client: Arc, user_store: ModelHandle, + follow_handlers: Vec, + followers: Vec, _subscriptions: Vec, } +#[derive(PartialEq, Eq, PartialOrd, Ord)] +struct Follower { + project_id: Option, + peer_id: PeerId, +} + +struct FollowHandler { + project_id: Option, + root_view: AnyWeakViewHandle, + get_views: + Box, &mut AppContext) -> Option>, + update_view: Box, +} + impl Entity for ActiveCall { type Event = room::Event; } @@ -68,9 +88,14 @@ impl ActiveCall { location: None, pending_invites: Default::default(), incoming_call: watch::channel(), + follow_handlers: Default::default(), + followers: Default::default(), _subscriptions: vec![ client.add_request_handler(cx.handle(), Self::handle_incoming_call), client.add_message_handler(cx.handle(), Self::handle_call_canceled), + client.add_request_handler(cx.handle(), Self::handle_follow), + client.add_message_handler(cx.handle(), Self::handle_unfollow), + client.add_message_handler(cx.handle(), Self::handle_update_followers), ], client, user_store, @@ -81,6 +106,48 @@ impl ActiveCall { self.room()?.read(cx).channel_id() } + pub fn add_follow_handler( + &mut self, + root_view: gpui::ViewHandle, + project_id: Option, + get_views: GetViews, + update_view: UpdateView, + _cx: &mut ModelContext, + ) where + GetViews: 'static + + Fn(&mut V, Option, &mut gpui::ViewContext) -> Result, + UpdateView: + 'static + Fn(&mut V, PeerId, proto::UpdateFollowers, &mut ViewContext) -> Result<()>, + { + self.follow_handlers + .retain(|h| h.root_view.id() != root_view.id()); + if let Err(ix) = self + .follow_handlers + .binary_search_by_key(&(project_id, root_view.id()), |f| { + (f.project_id, f.root_view.id()) + }) + { + self.follow_handlers.insert( + ix, + FollowHandler { + project_id, + root_view: root_view.into_any().downgrade(), + get_views: Box::new(move |view, project_id, cx| { + let view = view.clone().downcast::().unwrap(); + view.update(cx, |view, cx| get_views(view, project_id, cx).log_err()) + .flatten() + }), + update_view: Box::new(move |view, leader_id, message, cx| { + let view = view.clone().downcast::().unwrap(); + view.update(cx, |view, cx| { + update_view(view, leader_id, message, cx).log_err() + }); + }), + }, + ); + } + } + async fn handle_incoming_call( this: ModelHandle, envelope: TypedEnvelope, @@ -127,6 +194,127 @@ impl ActiveCall { Ok(()) } + async fn handle_follow( + this: ModelHandle, + envelope: TypedEnvelope, + _: Arc, + mut cx: AsyncAppContext, + ) -> Result { + this.update(&mut cx, |this, cx| { + let follower = Follower { + project_id: envelope.payload.project_id, + peer_id: envelope.original_sender_id()?, + }; + let active_project_id = this + .location + .as_ref() + .and_then(|project| project.upgrade(cx)?.read(cx).remote_id()); + + let mut response = proto::FollowResponse::default(); + for handler in &this.follow_handlers { + if follower.project_id != handler.project_id && follower.project_id.is_some() { + continue; + } + + let Some(root_view) = handler.root_view.upgrade(cx) else { + continue; + }; + + let Some(handler_response) = + (handler.get_views)(&root_view, follower.project_id, cx) + else { + continue; + }; + + if response.views.is_empty() { + response.views = handler_response.views; + } else { + response.views.extend_from_slice(&handler_response.views); + } + + if let Some(active_view_id) = handler_response.active_view_id.clone() { + if response.active_view_id.is_none() || handler.project_id == active_project_id + { + response.active_view_id = Some(active_view_id); + } + } + } + + if let Err(ix) = this.followers.binary_search(&follower) { + this.followers.insert(ix, follower); + } + + Ok(response) + }) + } + + async fn handle_unfollow( + this: ModelHandle, + envelope: TypedEnvelope, + _: Arc, + mut cx: AsyncAppContext, + ) -> Result<()> { + this.update(&mut cx, |this, _| { + let follower = Follower { + project_id: envelope.payload.project_id, + peer_id: envelope.original_sender_id()?, + }; + if let Err(ix) = this.followers.binary_search(&follower) { + this.followers.remove(ix); + } + Ok(()) + }) + } + + async fn handle_update_followers( + this: ModelHandle, + envelope: TypedEnvelope, + _: Arc, + mut cx: AsyncAppContext, + ) -> Result<()> { + let leader_id = envelope.original_sender_id()?; + let update = envelope.payload; + this.update(&mut cx, |this, cx| { + for handler in &this.follow_handlers { + if update.project_id != handler.project_id && update.project_id.is_some() { + continue; + } + let Some(root_view) = handler.root_view.upgrade(cx) else { + continue; + }; + (handler.update_view)(&root_view, leader_id, update.clone(), cx); + } + Ok(()) + }) + } + + pub fn update_followers( + &self, + project_id: Option, + update: proto::update_followers::Variant, + cx: &AppContext, + ) -> Option<()> { + let room_id = self.room()?.read(cx).id(); + let follower_ids: Vec<_> = self + .followers + .iter() + .filter_map(|follower| { + (follower.project_id == project_id).then_some(follower.peer_id.into()) + }) + .collect(); + if follower_ids.is_empty() { + return None; + } + self.client + .send(proto::UpdateFollowers { + room_id, + project_id, + follower_ids, + variant: Some(update), + }) + .log_err() + } + pub fn global(cx: &AppContext) -> ModelHandle { cx.global::>().clone() } diff --git a/crates/call/src/room.rs b/crates/call/src/room.rs index e7899ab2d8..ffa941bfa1 100644 --- a/crates/call/src/room.rs +++ b/crates/call/src/room.rs @@ -62,7 +62,7 @@ pub struct Room { leave_when_empty: bool, client: Arc, user_store: ModelHandle, - follows_by_leader_id_project_id: HashMap<(PeerId, u64), Vec>, + follows_by_leader_id_project_id: HashMap<(PeerId, Option), Vec>, subscriptions: Vec, pending_room_update: Option>, maintain_connection: Option>>, @@ -584,7 +584,7 @@ impl Room { pub fn followers_for(&self, leader_id: PeerId, project_id: u64) -> &[PeerId] { self.follows_by_leader_id_project_id - .get(&(leader_id, project_id)) + .get(&(leader_id, Some(project_id))) .map_or(&[], |v| v.as_slice()) } diff --git a/crates/workspace/src/workspace.rs b/crates/workspace/src/workspace.rs index feab53d094..11cb47afc5 100644 --- a/crates/workspace/src/workspace.rs +++ b/crates/workspace/src/workspace.rs @@ -15,7 +15,7 @@ use call::ActiveCall; use channel::ChannelStore; use client::{ proto::{self, PeerId}, - Client, TypedEnvelope, UserStore, + Client, UserStore, }; use collections::{hash_map, HashMap, HashSet}; use drag_and_drop::DragAndDrop; @@ -331,11 +331,6 @@ pub fn init(app_state: Arc, cx: &mut AppContext) { }) .detach(); }); - - let client = &app_state.client; - client.add_view_request_handler(Workspace::handle_follow); - client.add_view_message_handler(Workspace::handle_unfollow); - client.add_view_message_handler(Workspace::handle_update_followers); } type ProjectItemBuilders = HashMap< @@ -507,7 +502,6 @@ pub enum Event { pub struct Workspace { weak_self: WeakViewHandle, - remote_entity_subscription: Option, modal: Option, zoomed: Option, zoomed_position: Option, @@ -523,7 +517,6 @@ pub struct Workspace { titlebar_item: Option, notifications: Vec<(TypeId, usize, Box)>, project: ModelHandle, - leader_state: LeaderState, follower_states_by_leader: FollowerStatesByLeader, last_leaders_by_pane: HashMap, PeerId>, window_edited: bool, @@ -549,11 +542,6 @@ pub struct ViewId { pub id: u64, } -#[derive(Default)] -struct LeaderState { - followers: HashSet, -} - type FollowerStatesByLeader = HashMap, FollowerState>>; #[derive(Default)] @@ -737,12 +725,10 @@ impl Workspace { status_bar, titlebar_item: None, notifications: Default::default(), - remote_entity_subscription: None, left_dock, bottom_dock, right_dock, project: project.clone(), - leader_state: Default::default(), follower_states_by_leader: Default::default(), last_leaders_by_pane: Default::default(), window_edited: false, @@ -2419,19 +2405,21 @@ impl Workspace { } fn project_remote_id_changed(&mut self, remote_id: Option, cx: &mut ViewContext) { - if let Some(remote_id) = remote_id { - self.remote_entity_subscription = Some( - self.app_state - .client - .add_view_for_remote_entity(remote_id, cx), - ); - } else { - self.remote_entity_subscription.take(); + let handle = cx.handle(); + if let Some(call) = self.active_call() { + call.update(cx, |call, cx| { + call.add_follow_handler( + handle, + remote_id, + Self::get_views_for_followers, + Self::handle_update_followers, + cx, + ); + }); } } fn collaborator_left(&mut self, peer_id: PeerId, cx: &mut ViewContext) { - self.leader_state.followers.remove(&peer_id); if let Some(states_by_pane) = self.follower_states_by_leader.remove(&peer_id) { for state in states_by_pane.into_values() { for item in state.items_by_leader_view_id.into_values() { @@ -2463,8 +2451,10 @@ impl Workspace { .insert(pane.clone(), Default::default()); cx.notify(); - let project_id = self.project.read(cx).remote_id()?; + let room_id = self.active_call()?.read(cx).room()?.read(cx).id(); + let project_id = self.project.read(cx).remote_id(); let request = self.app_state.client.request(proto::Follow { + room_id, project_id, leader_id: Some(leader_id), }); @@ -2542,15 +2532,16 @@ impl Workspace { if states_by_pane.is_empty() { self.follower_states_by_leader.remove(&leader_id); - if let Some(project_id) = self.project.read(cx).remote_id() { - self.app_state - .client - .send(proto::Unfollow { - project_id, - leader_id: Some(leader_id), - }) - .log_err(); - } + let project_id = self.project.read(cx).remote_id(); + let room_id = self.active_call()?.read(cx).room()?.read(cx).id(); + self.app_state + .client + .send(proto::Unfollow { + room_id, + project_id, + leader_id: Some(leader_id), + }) + .log_err(); } cx.notify(); @@ -2564,10 +2555,6 @@ impl Workspace { self.follower_states_by_leader.contains_key(&peer_id) } - pub fn is_followed_by(&self, peer_id: PeerId) -> bool { - self.leader_state.followers.contains(&peer_id) - } - fn render_titlebar(&self, theme: &Theme, cx: &mut ViewContext) -> AnyElement { // TODO: There should be a better system in place for this // (https://github.com/zed-industries/zed/issues/1290) @@ -2718,80 +2705,56 @@ impl Workspace { // RPC handlers - async fn handle_follow( - this: WeakViewHandle, - envelope: TypedEnvelope, - _: Arc, - mut cx: AsyncAppContext, + fn get_views_for_followers( + &mut self, + _project_id: Option, + cx: &mut ViewContext, ) -> Result { - this.update(&mut cx, |this, cx| { - let client = &this.app_state.client; - this.leader_state - .followers - .insert(envelope.original_sender_id()?); + let client = &self.app_state.client; - let active_view_id = this.active_item(cx).and_then(|i| { - Some( - i.to_followable_item_handle(cx)? - .remote_id(client, cx)? - .to_proto(), - ) - }); + let active_view_id = self.active_item(cx).and_then(|i| { + Some( + i.to_followable_item_handle(cx)? + .remote_id(client, cx)? + .to_proto(), + ) + }); - cx.notify(); + cx.notify(); - Ok(proto::FollowResponse { - active_view_id, - views: this - .panes() - .iter() - .flat_map(|pane| { - let leader_id = this.leader_for_pane(pane); - pane.read(cx).items().filter_map({ - let cx = &cx; - move |item| { - let item = item.to_followable_item_handle(cx)?; - let id = item.remote_id(client, cx)?.to_proto(); - let variant = item.to_state_proto(cx)?; - Some(proto::View { - id: Some(id), - leader_id, - variant: Some(variant), - }) - } - }) + Ok(proto::FollowResponse { + active_view_id, + views: self + .panes() + .iter() + .flat_map(|pane| { + let leader_id = self.leader_for_pane(pane); + pane.read(cx).items().filter_map({ + let cx = &cx; + move |item| { + let item = item.to_followable_item_handle(cx)?; + let id = item.remote_id(client, cx)?.to_proto(); + let variant = item.to_state_proto(cx)?; + Some(proto::View { + id: Some(id), + leader_id, + variant: Some(variant), + }) + } }) - .collect(), - }) - })? + }) + .collect(), + }) } - async fn handle_unfollow( - this: WeakViewHandle, - envelope: TypedEnvelope, - _: Arc, - mut cx: AsyncAppContext, + fn handle_update_followers( + &mut self, + leader_id: PeerId, + message: proto::UpdateFollowers, + _cx: &mut ViewContext, ) -> Result<()> { - this.update(&mut cx, |this, cx| { - this.leader_state - .followers - .remove(&envelope.original_sender_id()?); - cx.notify(); - Ok(()) - })? - } - - async fn handle_update_followers( - this: WeakViewHandle, - envelope: TypedEnvelope, - _: Arc, - cx: AsyncAppContext, - ) -> Result<()> { - let leader_id = envelope.original_sender_id()?; - this.read_with(&cx, |this, _| { - this.leader_updates_tx - .unbounded_send((leader_id, envelope.payload)) - })??; + self.leader_updates_tx + .unbounded_send((leader_id, message))?; Ok(()) } @@ -2960,18 +2923,9 @@ impl Workspace { update: proto::update_followers::Variant, cx: &AppContext, ) -> Option<()> { - let project_id = self.project.read(cx).remote_id()?; - if !self.leader_state.followers.is_empty() { - self.app_state - .client - .send(proto::UpdateFollowers { - project_id, - follower_ids: self.leader_state.followers.iter().copied().collect(), - variant: Some(update), - }) - .log_err(); - } - None + self.active_call()? + .read(cx) + .update_followers(self.project.read(cx).remote_id(), update, cx) } pub fn leader_for_pane(&self, pane: &ViewHandle) -> Option {