From ca0a4bdf8e24d64039ba2ae1bfad2b2fbbf48b07 Mon Sep 17 00:00:00 2001 From: Max Brunsfeld Date: Thu, 28 Sep 2023 18:58:52 -0700 Subject: [PATCH] Introduce a WorkspaceStore for handling following --- crates/call/src/call.rs | 205 ++---------------------- crates/collab/src/tests/test_server.rs | 4 +- crates/workspace/src/workspace.rs | 212 +++++++++++++++++++++---- crates/zed/src/main.rs | 4 +- 4 files changed, 201 insertions(+), 224 deletions(-) diff --git a/crates/call/src/call.rs b/crates/call/src/call.rs index f5bc05e37a..bdc402ee77 100644 --- a/crates/call/src/call.rs +++ b/crates/call/src/call.rs @@ -2,29 +2,23 @@ pub mod call_settings; pub mod participant; pub mod room; -use std::sync::Arc; - use anyhow::{anyhow, Result}; use audio::Audio; use call_settings::CallSettings; use channel::ChannelId; -use client::{ - proto::{self, PeerId}, - ClickhouseEvent, Client, TelemetrySettings, TypedEnvelope, User, UserStore, -}; +use client::{proto, ClickhouseEvent, Client, TelemetrySettings, TypedEnvelope, User, UserStore}; use collections::HashSet; use futures::{future::Shared, FutureExt}; -use postage::watch; - use gpui::{ - AnyViewHandle, AnyWeakViewHandle, AppContext, AsyncAppContext, Entity, ModelContext, - ModelHandle, Subscription, Task, ViewContext, WeakModelHandle, + AppContext, AsyncAppContext, Entity, ModelContext, ModelHandle, Subscription, Task, + WeakModelHandle, }; +use postage::watch; use project::Project; +use std::sync::Arc; pub use participant::ParticipantLocation; pub use room::Room; -use util::ResultExt; pub fn init(client: Arc, user_store: ModelHandle, cx: &mut AppContext) { settings::register::(cx); @@ -53,25 +47,9 @@ pub struct ActiveCall { ), client: Arc, user_store: ModelHandle, - follow_handlers: Vec, - followers: Vec, _subscriptions: Vec, } -#[derive(PartialEq, Eq, PartialOrd, Ord, Debug)] -struct Follower { - project_id: Option, - peer_id: PeerId, -} - -struct FollowHandler { - project_id: Option, - root_view: AnyWeakViewHandle, - get_views: - Box, &mut AppContext) -> Option>, - update_view: Box, -} - impl Entity for ActiveCall { type Event = room::Event; } @@ -88,14 +66,10 @@ impl ActiveCall { location: None, pending_invites: Default::default(), incoming_call: watch::channel(), - follow_handlers: Default::default(), - followers: Default::default(), + _subscriptions: vec![ client.add_request_handler(cx.handle(), Self::handle_incoming_call), client.add_message_handler(cx.handle(), Self::handle_call_canceled), - client.add_request_handler(cx.handle(), Self::handle_follow), - client.add_message_handler(cx.handle(), Self::handle_unfollow), - client.add_message_handler(cx.handle(), Self::handle_update_from_leader), ], client, user_store, @@ -106,48 +80,6 @@ impl ActiveCall { self.room()?.read(cx).channel_id() } - pub fn add_follow_handler( - &mut self, - root_view: gpui::ViewHandle, - project_id: Option, - get_views: GetViews, - update_view: UpdateView, - _cx: &mut ModelContext, - ) where - GetViews: 'static - + Fn(&mut V, Option, &mut gpui::ViewContext) -> Result, - UpdateView: - 'static + Fn(&mut V, PeerId, proto::UpdateFollowers, &mut ViewContext) -> Result<()>, - { - self.follow_handlers - .retain(|h| h.root_view.id() != root_view.id()); - if let Err(ix) = self - .follow_handlers - .binary_search_by_key(&(project_id, root_view.id()), |f| { - (f.project_id, f.root_view.id()) - }) - { - self.follow_handlers.insert( - ix, - FollowHandler { - project_id, - root_view: root_view.into_any().downgrade(), - get_views: Box::new(move |view, project_id, cx| { - let view = view.clone().downcast::().unwrap(); - view.update(cx, |view, cx| get_views(view, project_id, cx).log_err()) - .flatten() - }), - update_view: Box::new(move |view, leader_id, message, cx| { - let view = view.clone().downcast::().unwrap(); - view.update(cx, |view, cx| { - update_view(view, leader_id, message, cx).log_err() - }); - }), - }, - ); - } - } - async fn handle_incoming_call( this: ModelHandle, envelope: TypedEnvelope, @@ -194,127 +126,6 @@ impl ActiveCall { Ok(()) } - async fn handle_follow( - this: ModelHandle, - envelope: TypedEnvelope, - _: Arc, - mut cx: AsyncAppContext, - ) -> Result { - this.update(&mut cx, |this, cx| { - let follower = Follower { - project_id: envelope.payload.project_id, - peer_id: envelope.original_sender_id()?, - }; - let active_project_id = this - .location - .as_ref() - .and_then(|project| project.upgrade(cx)?.read(cx).remote_id()); - - let mut response = proto::FollowResponse::default(); - for handler in &this.follow_handlers { - if follower.project_id != handler.project_id && follower.project_id.is_some() { - continue; - } - - let Some(root_view) = handler.root_view.upgrade(cx) else { - continue; - }; - - let Some(handler_response) = - (handler.get_views)(&root_view, follower.project_id, cx) - else { - continue; - }; - - if response.views.is_empty() { - response.views = handler_response.views; - } else { - response.views.extend_from_slice(&handler_response.views); - } - - if let Some(active_view_id) = handler_response.active_view_id.clone() { - if response.active_view_id.is_none() || handler.project_id == active_project_id - { - response.active_view_id = Some(active_view_id); - } - } - } - - if let Err(ix) = this.followers.binary_search(&follower) { - this.followers.insert(ix, follower); - } - - Ok(response) - }) - } - - async fn handle_unfollow( - this: ModelHandle, - envelope: TypedEnvelope, - _: Arc, - mut cx: AsyncAppContext, - ) -> Result<()> { - this.update(&mut cx, |this, _| { - let follower = Follower { - project_id: envelope.payload.project_id, - peer_id: envelope.original_sender_id()?, - }; - if let Ok(ix) = this.followers.binary_search(&follower) { - this.followers.remove(ix); - } - Ok(()) - }) - } - - async fn handle_update_from_leader( - this: ModelHandle, - envelope: TypedEnvelope, - _: Arc, - mut cx: AsyncAppContext, - ) -> Result<()> { - let leader_id = envelope.original_sender_id()?; - let update = envelope.payload; - this.update(&mut cx, |this, cx| { - for handler in &this.follow_handlers { - if update.project_id != handler.project_id && update.project_id.is_some() { - continue; - } - let Some(root_view) = handler.root_view.upgrade(cx) else { - continue; - }; - (handler.update_view)(&root_view, leader_id, update.clone(), cx); - } - Ok(()) - }) - } - - pub fn update_followers( - &self, - project_id: Option, - update: proto::update_followers::Variant, - cx: &AppContext, - ) -> Option<()> { - let room_id = self.room()?.read(cx).id(); - let follower_ids: Vec<_> = self - .followers - .iter() - .filter_map(|follower| { - (follower.project_id == project_id).then_some(follower.peer_id.into()) - }) - .collect(); - if follower_ids.is_empty() { - return None; - } - self.client - .send(proto::UpdateFollowers { - room_id, - project_id, - follower_ids, - variant: Some(update), - }) - .log_err() - } - pub fn global(cx: &AppContext) -> ModelHandle { cx.global::>().clone() } @@ -536,6 +347,10 @@ impl ActiveCall { } } + pub fn location(&self) -> Option<&WeakModelHandle> { + self.location.as_ref() + } + pub fn set_location( &mut self, project: Option<&ModelHandle>, diff --git a/crates/collab/src/tests/test_server.rs b/crates/collab/src/tests/test_server.rs index 71537f069f..a56df311bd 100644 --- a/crates/collab/src/tests/test_server.rs +++ b/crates/collab/src/tests/test_server.rs @@ -29,7 +29,7 @@ use std::{ }, }; use util::http::FakeHttpClient; -use workspace::Workspace; +use workspace::{Workspace, WorkspaceStore}; pub struct TestServer { pub app_state: Arc, @@ -204,6 +204,7 @@ impl TestServer { let fs = FakeFs::new(cx.background()); let user_store = cx.add_model(|cx| UserStore::new(client.clone(), http, cx)); + let workspace_store = cx.add_model(|cx| WorkspaceStore::new(client.clone(), cx)); let channel_store = cx.add_model(|cx| ChannelStore::new(client.clone(), user_store.clone(), cx)); let mut language_registry = LanguageRegistry::test(); @@ -211,6 +212,7 @@ impl TestServer { let app_state = Arc::new(workspace::AppState { client: client.clone(), user_store: user_store.clone(), + workspace_store, channel_store: channel_store.clone(), languages: Arc::new(language_registry), fs: fs.clone(), diff --git a/crates/workspace/src/workspace.rs b/crates/workspace/src/workspace.rs index 89dff882c3..256ecfd3e9 100644 --- a/crates/workspace/src/workspace.rs +++ b/crates/workspace/src/workspace.rs @@ -15,7 +15,7 @@ use call::ActiveCall; use channel::ChannelStore; use client::{ proto::{self, PeerId}, - Client, UserStore, + Client, TypedEnvelope, UserStore, }; use collections::{hash_map, HashMap, HashSet}; use drag_and_drop::DragAndDrop; @@ -451,6 +451,7 @@ pub struct AppState { pub client: Arc, pub user_store: ModelHandle, pub channel_store: ModelHandle, + pub workspace_store: ModelHandle, pub fs: Arc, pub build_window_options: fn(Option, Option, &dyn Platform) -> WindowOptions<'static>, @@ -459,6 +460,19 @@ pub struct AppState { pub background_actions: BackgroundActions, } +pub struct WorkspaceStore { + workspaces: HashSet>, + followers: Vec, + client: Arc, + _subscriptions: Vec, +} + +#[derive(PartialEq, Eq, PartialOrd, Ord, Debug)] +struct Follower { + project_id: Option, + peer_id: PeerId, +} + impl AppState { #[cfg(any(test, feature = "test-support"))] pub fn test(cx: &mut AppContext) -> Arc { @@ -475,6 +489,7 @@ impl AppState { let user_store = cx.add_model(|cx| UserStore::new(client.clone(), http_client, cx)); let channel_store = cx.add_model(|cx| ChannelStore::new(client.clone(), user_store.clone(), cx)); + let workspace_store = cx.add_model(|cx| WorkspaceStore::new(client.clone(), cx)); theme::init((), cx); client::init(&client, cx); @@ -486,6 +501,7 @@ impl AppState { languages, user_store, channel_store, + workspace_store, initialize_workspace: |_, _, _, _| Task::ready(Ok(())), build_window_options: |_, _, _| Default::default(), background_actions: || &[], @@ -663,6 +679,10 @@ impl Workspace { cx.focus(¢er_pane); cx.emit(Event::PaneAdded(center_pane.clone())); + app_state.workspace_store.update(cx, |store, _| { + store.workspaces.insert(weak_handle.clone()); + }); + let mut current_user = app_state.user_store.read(cx).watch_current_user(); let mut connection_status = app_state.client.status(); let _observe_current_user = cx.spawn(|this, mut cx| async move { @@ -2492,19 +2512,8 @@ impl Workspace { &self.active_pane } - fn project_remote_id_changed(&mut self, remote_id: Option, cx: &mut ViewContext) { - let handle = cx.handle(); - if let Some(call) = self.active_call() { - call.update(cx, |call, cx| { - call.add_follow_handler( - handle, - remote_id, - Self::get_views_for_followers, - Self::handle_update_followers, - cx, - ); - }); - } + fn project_remote_id_changed(&mut self, _project_id: Option, _cx: &mut ViewContext) { + // TODO } fn collaborator_left(&mut self, peer_id: PeerId, cx: &mut ViewContext) { @@ -2793,11 +2802,7 @@ impl Workspace { // RPC handlers - fn get_views_for_followers( - &mut self, - _project_id: Option, - cx: &mut ViewContext, - ) -> Result { + fn handle_follow(&mut self, cx: &mut ViewContext) -> proto::FollowResponse { let client = &self.app_state.client; let active_view_id = self.active_item(cx).and_then(|i| { @@ -2810,7 +2815,7 @@ impl Workspace { cx.notify(); - Ok(proto::FollowResponse { + proto::FollowResponse { active_view_id, views: self .panes() @@ -2832,7 +2837,7 @@ impl Workspace { }) }) .collect(), - }) + } } fn handle_update_followers( @@ -2840,10 +2845,10 @@ impl Workspace { leader_id: PeerId, message: proto::UpdateFollowers, _cx: &mut ViewContext, - ) -> Result<()> { + ) { self.leader_updates_tx - .unbounded_send((leader_id, message))?; - Ok(()) + .unbounded_send((leader_id, message)) + .ok(); } async fn process_leader_update( @@ -2999,9 +3004,9 @@ impl Workspace { update: proto::update_followers::Variant, cx: &AppContext, ) -> Option<()> { - self.active_call()? - .read(cx) - .update_followers(self.project.read(cx).remote_id(), update, cx) + self.app_state().workspace_store.read_with(cx, |store, cx| { + store.update_followers(self.project.read(cx).remote_id(), update, cx) + }) } pub fn leader_for_pane(&self, pane: &ViewHandle) -> Option { @@ -3472,8 +3477,10 @@ impl Workspace { let channel_store = cx.add_model(|cx| ChannelStore::new(client.clone(), user_store.clone(), cx)); + let workspace_store = cx.add_model(|cx| WorkspaceStore::new(client.clone(), cx)); let app_state = Arc::new(AppState { languages: project.read(cx).languages().clone(), + workspace_store, client, user_store, channel_store, @@ -3717,6 +3724,12 @@ fn notify_if_database_failed(workspace: &WeakViewHandle, cx: &mut Asy impl Entity for Workspace { type Event = Event; + + fn release(&mut self, cx: &mut AppContext) { + self.app_state.workspace_store.update(cx, |store, _| { + store.workspaces.remove(&self.weak_self); + }) + } } impl View for Workspace { @@ -3859,6 +3872,151 @@ impl View for Workspace { } } +impl WorkspaceStore { + pub fn new(client: Arc, cx: &mut ModelContext) -> Self { + Self { + workspaces: Default::default(), + followers: Default::default(), + _subscriptions: vec![ + client.add_request_handler(cx.handle(), Self::handle_follow), + client.add_message_handler(cx.handle(), Self::handle_unfollow), + client.add_message_handler(cx.handle(), Self::handle_update_from_leader), + ], + client, + } + } + + pub fn update_followers( + &self, + project_id: Option, + update: proto::update_followers::Variant, + cx: &AppContext, + ) -> Option<()> { + if !cx.has_global::>() { + return None; + } + + let room_id = ActiveCall::global(cx).read(cx).room()?.read(cx).id(); + let follower_ids: Vec<_> = self + .followers + .iter() + .filter_map(|follower| { + (follower.project_id == project_id).then_some(follower.peer_id.into()) + }) + .collect(); + if follower_ids.is_empty() { + return None; + } + self.client + .send(proto::UpdateFollowers { + room_id, + project_id, + follower_ids, + variant: Some(update), + }) + .log_err() + } + + async fn handle_follow( + this: ModelHandle, + envelope: TypedEnvelope, + _: Arc, + mut cx: AsyncAppContext, + ) -> Result { + this.update(&mut cx, |this, cx| { + let follower = Follower { + project_id: envelope.payload.project_id, + peer_id: envelope.original_sender_id()?, + }; + let active_project_id = ActiveCall::global(cx) + .read(cx) + .location() + .as_ref() + .and_then(|project| project.upgrade(cx)?.read(cx).remote_id()); + + let mut response = proto::FollowResponse::default(); + for workspace in &this.workspaces { + let Some(workspace) = workspace.upgrade(cx) else { + continue; + }; + + workspace.update(cx.as_mut(), |workspace, cx| { + let project_id = workspace.project.read(cx).remote_id(); + if follower.project_id != project_id && follower.project_id.is_some() { + return; + } + + let handler_response = workspace.handle_follow(cx); + if response.views.is_empty() { + response.views = handler_response.views; + } else { + response.views.extend_from_slice(&handler_response.views); + } + + if let Some(active_view_id) = handler_response.active_view_id.clone() { + if response.active_view_id.is_none() || project_id == active_project_id { + response.active_view_id = Some(active_view_id); + } + } + }); + } + + if let Err(ix) = this.followers.binary_search(&follower) { + this.followers.insert(ix, follower); + } + + Ok(response) + }) + } + + async fn handle_unfollow( + this: ModelHandle, + envelope: TypedEnvelope, + _: Arc, + mut cx: AsyncAppContext, + ) -> Result<()> { + this.update(&mut cx, |this, _| { + let follower = Follower { + project_id: envelope.payload.project_id, + peer_id: envelope.original_sender_id()?, + }; + if let Ok(ix) = this.followers.binary_search(&follower) { + this.followers.remove(ix); + } + Ok(()) + }) + } + + async fn handle_update_from_leader( + this: ModelHandle, + envelope: TypedEnvelope, + _: Arc, + mut cx: AsyncAppContext, + ) -> Result<()> { + let leader_id = envelope.original_sender_id()?; + let update = envelope.payload; + this.update(&mut cx, |this, cx| { + for workspace in &this.workspaces { + let Some(workspace) = workspace.upgrade(cx) else { + continue; + }; + workspace.update(cx.as_mut(), |workspace, cx| { + let project_id = workspace.project.read(cx).remote_id(); + if update.project_id != project_id && update.project_id.is_some() { + return; + } + workspace.handle_update_followers(leader_id, update.clone(), cx); + }); + } + Ok(()) + }) + } +} + +impl Entity for WorkspaceStore { + type Event = (); +} + impl ViewId { pub(crate) fn from_proto(message: proto::ViewId) -> Result { Ok(Self { diff --git a/crates/zed/src/main.rs b/crates/zed/src/main.rs index bb44f67841..7991cabde2 100644 --- a/crates/zed/src/main.rs +++ b/crates/zed/src/main.rs @@ -54,7 +54,7 @@ use welcome::{show_welcome_experience, FIRST_OPEN}; use fs::RealFs; use util::{channel::RELEASE_CHANNEL, paths, ResultExt, TryFutureExt}; -use workspace::AppState; +use workspace::{AppState, WorkspaceStore}; use zed::{ assets::Assets, build_window_options, handle_keymap_file_changes, initialize_workspace, languages, menus, @@ -139,6 +139,7 @@ fn main() { let user_store = cx.add_model(|cx| UserStore::new(client.clone(), http.clone(), cx)); let channel_store = cx.add_model(|cx| ChannelStore::new(client.clone(), user_store.clone(), cx)); + let workspace_store = cx.add_model(|cx| WorkspaceStore::new(client.clone(), cx)); cx.set_global(client.clone()); @@ -187,6 +188,7 @@ fn main() { build_window_options, initialize_workspace, background_actions, + workspace_store, }); cx.set_global(Arc::downgrade(&app_state));