diff --git a/server/src/rpc.rs b/server/src/rpc.rs index d6bb42d256..b0d50d7272 100644 --- a/server/src/rpc.rs +++ b/server/src/rpc.rs @@ -267,13 +267,12 @@ impl Server { .await .user_id_for_connection(request.sender_id)?; - let mut collaborator_user_ids = Vec::new(); + let mut collaborator_user_ids = HashSet::new(); + collaborator_user_ids.insert(host_user_id); for github_login in request.payload.collaborator_logins { match self.app_state.db.create_user(&github_login, false).await { Ok(collaborator_user_id) => { - if collaborator_user_id != host_user_id { - collaborator_user_ids.push(collaborator_user_id); - } + collaborator_user_ids.insert(collaborator_user_id); } Err(err) => { let message = err.to_string(); @@ -285,24 +284,19 @@ impl Server { } } - let worktree_id; - let mut user_ids; - { - let mut state = self.state.write().await; - worktree_id = state.add_worktree(Worktree { - host_connection_id: request.sender_id, - collaborator_user_ids: collaborator_user_ids.clone(), - root_name: request.payload.root_name, - share: None, - }); - user_ids = collaborator_user_ids; - user_ids.push(host_user_id); - } + let collaborator_user_ids = collaborator_user_ids.into_iter().collect::>(); + let worktree_id = self.state.write().await.add_worktree(Worktree { + host_connection_id: request.sender_id, + collaborator_user_ids: collaborator_user_ids.clone(), + root_name: request.payload.root_name, + share: None, + }); self.peer .respond(receipt, proto::OpenWorktreeResponse { worktree_id }) .await?; - self.update_collaborators_for_users(&user_ids).await?; + self.update_collaborators_for_users(&collaborator_user_ids) + .await?; Ok(()) } @@ -311,11 +305,6 @@ impl Server { self: Arc, mut request: TypedEnvelope, ) -> tide::Result<()> { - let host_user_id = self - .state - .read() - .await - .user_id_for_connection(request.sender_id)?; let worktree = request .payload .worktree @@ -333,15 +322,14 @@ impl Server { active_replica_ids: Default::default(), entries, }); - - let mut user_ids = worktree.collaborator_user_ids.clone(); - user_ids.push(host_user_id); + let collaborator_user_ids = worktree.collaborator_user_ids.clone(); drop(state); self.peer .respond(request.receipt(), proto::ShareWorktreeResponse {}) .await?; - self.update_collaborators_for_users(&user_ids).await?; + self.update_collaborators_for_users(&collaborator_user_ids) + .await?; } else { self.peer .respond_with_error( @@ -360,14 +348,9 @@ impl Server { request: TypedEnvelope, ) -> tide::Result<()> { let worktree_id = request.payload.worktree_id; - let host_user_id = self - .state - .read() - .await - .user_id_for_connection(request.sender_id)?; let connection_ids; - let mut user_ids; + let collaborator_user_ids; { let mut state = self.state.write().await; let worktree = state.write_worktree(worktree_id, request.sender_id)?; @@ -376,8 +359,8 @@ impl Server { } connection_ids = worktree.connection_ids(); - user_ids = worktree.collaborator_user_ids.clone(); - user_ids.push(host_user_id); + collaborator_user_ids = worktree.collaborator_user_ids.clone(); + worktree.share.take(); for connection_id in &connection_ids { if let Some(connection) = state.connections.get_mut(connection_id) { @@ -391,7 +374,8 @@ impl Server { .send(conn_id, proto::UnshareWorktree { worktree_id }) }) .await?; - self.update_collaborators_for_users(&user_ids).await?; + self.update_collaborators_for_users(&collaborator_user_ids) + .await?; Ok(()) } @@ -409,7 +393,7 @@ impl Server { let response; let connection_ids; - let mut user_ids; + let collaborator_user_ids; let mut state = self.state.write().await; match state.join_worktree(request.sender_id, user_id, worktree_id) { Ok((peer_replica_id, worktree)) => { @@ -437,11 +421,8 @@ impl Server { replica_id: peer_replica_id as u32, peers, }; - - let host_connection_id = worktree.host_connection_id; connection_ids = worktree.connection_ids(); - user_ids = worktree.collaborator_user_ids.clone(); - user_ids.push(state.user_id_for_connection(host_connection_id)?); + collaborator_user_ids = worktree.collaborator_user_ids.clone(); } Err(error) => { self.peer @@ -471,7 +452,8 @@ impl Server { }) .await?; self.peer.respond(request.receipt(), response).await?; - self.update_collaborators_for_users(&user_ids).await?; + self.update_collaborators_for_users(&collaborator_user_ids) + .await?; Ok(()) } @@ -490,16 +472,14 @@ impl Server { sender_conn_id: ConnectionId, ) -> tide::Result<()> { let connection_ids; - let mut user_ids; - + let collaborator_user_ids; let mut is_host = false; let mut is_guest = false; { let mut state = self.state.write().await; let worktree = state.write_worktree(worktree_id, sender_conn_id)?; - let host_connection_id = worktree.host_connection_id; connection_ids = worktree.connection_ids(); - user_ids = worktree.collaborator_user_ids.clone(); + collaborator_user_ids = worktree.collaborator_user_ids.clone(); if worktree.host_connection_id == sender_conn_id { is_host = true; @@ -511,8 +491,6 @@ impl Server { share.active_replica_ids.remove(&replica_id); } } - - user_ids.push(state.user_id_for_connection(host_connection_id)?); } if is_host { @@ -533,7 +511,8 @@ impl Server { }) .await? } - self.update_collaborators_for_users(&user_ids).await?; + self.update_collaborators_for_users(&collaborator_user_ids) + .await?; Ok(()) } diff --git a/zed/src/lib.rs b/zed/src/lib.rs index 397eed486b..b7fa3f83d0 100644 --- a/zed/src/lib.rs +++ b/zed/src/lib.rs @@ -9,7 +9,6 @@ pub mod http; pub mod language; pub mod menus; pub mod people_panel; -pub mod presence; pub mod project_browser; pub mod rpc; pub mod settings; diff --git a/zed/src/main.rs b/zed/src/main.rs index 45aeb52dbc..c88b1465d1 100644 --- a/zed/src/main.rs +++ b/zed/src/main.rs @@ -13,9 +13,7 @@ use zed::{ channel::ChannelList, chat_panel, editor, file_finder, fs::RealFs, - http, language, menus, - presence::Presence, - rpc, settings, theme_selector, + http, language, menus, rpc, settings, theme_selector, user::UserStore, workspace::{self, OpenNew, OpenParams, OpenPaths}, AppState, @@ -40,14 +38,13 @@ fn main() { app.run(move |cx| { let rpc = rpc::Client::new(); let http = http::client(); - let user_store = UserStore::new(rpc.clone(), http.clone(), cx.background()); + let user_store = cx.add_model(|cx| UserStore::new(rpc.clone(), http.clone(), cx)); let app_state = Arc::new(AppState { languages: languages.clone(), settings_tx: Arc::new(Mutex::new(settings_tx)), settings, themes, channel_list: cx.add_model(|cx| ChannelList::new(user_store.clone(), rpc.clone(), cx)), - presence: cx.add_model(|cx| Presence::new(user_store.clone(), rpc.clone(), cx)), rpc, user_store, fs: Arc::new(RealFs), diff --git a/zed/src/presence.rs b/zed/src/presence.rs deleted file mode 100644 index 2cdde21e2f..0000000000 --- a/zed/src/presence.rs +++ /dev/null @@ -1,130 +0,0 @@ -use crate::{ - rpc::Client, - user::{User, UserStore}, - util::TryFutureExt, -}; -use anyhow::Result; -use gpui::{Entity, ModelContext, Task}; -use postage::prelude::Stream; -use smol::future::FutureExt; -use std::{collections::HashSet, sync::Arc, time::Duration}; -use zrpc::proto; - -pub struct Presence { - collaborators: Vec, - user_store: Arc, - rpc: Arc, - _maintain_people: Task<()>, -} - -#[derive(Debug)] -struct Collaborator { - user: Arc, - worktrees: Vec, -} - -#[derive(Debug)] -struct WorktreeMetadata { - root_name: String, - is_shared: bool, - participants: Vec>, -} - -impl Presence { - pub fn new(user_store: Arc, rpc: Arc, cx: &mut ModelContext) -> Self { - let _maintain_collaborators = cx.spawn_weak(|this, mut cx| { - let user_store = user_store.clone(); - let foreground = cx.foreground(); - async move { - let mut current_user = user_store.watch_current_user(); - loop { - let timer = foreground.timer(Duration::from_secs(2)); - let next_current_user = async { - current_user.recv().await; - }; - - next_current_user.race(timer).await; - if current_user.borrow().is_some() { - if let Some(this) = cx.read(|cx| this.upgrade(cx)) { - this.update(&mut cx, |this, cx| this.refresh(cx)) - .log_err() - .await; - } - } - } - } - }); - - Self { - collaborators: Vec::new(), - user_store, - rpc, - _maintain_people: _maintain_collaborators, - } - } - - fn refresh(&self, cx: &mut ModelContext) -> Task> { - cx.spawn(|this, mut cx| { - let rpc = self.rpc.clone(); - let user_store = self.user_store.clone(); - async move { - // let response = rpc.request(proto::GetCollaborators {}).await?; - // let mut user_ids = HashSet::new(); - // for collaborator in &response.collaborators { - // user_ids.insert(collaborator.user_id); - // user_ids.extend( - // collaborator - // .worktrees - // .iter() - // .flat_map(|w| &w.participants) - // .copied(), - // ); - // } - // user_store - // .load_users(user_ids.into_iter().collect()) - // .await?; - - // let mut collaborators = Vec::new(); - // for collaborator in response.collaborators { - // collaborators.push(Collaborator::from_proto(collaborator, &user_store).await?); - // } - - // this.update(&mut cx, |this, cx| { - // this.collaborators = collaborators; - // cx.notify(); - // }); - - Ok(()) - } - }) - } -} - -pub enum Event {} - -impl Entity for Presence { - type Event = Event; -} - -// impl Collaborator { -// async fn from_proto( -// collaborator: proto::Collaborator, -// user_store: &Arc, -// cx: &mut AsyncAppContext, -// ) -> Result { -// let user = user_store.fetch_user(collaborator.user_id).await?; -// let mut worktrees = Vec::new(); -// for worktree in collaborator.worktrees { -// let mut participants = Vec::new(); -// for participant_id in worktree.participants { -// participants.push(user_store.fetch_user(participant_id).await?); -// } -// worktrees.push(WorktreeMetadata { -// root_name: worktree.root_name, -// is_shared: worktree.is_shared, -// participants, -// }); -// } -// Ok(Self { user, worktrees }) -// } -// } diff --git a/zed/src/rpc.rs b/zed/src/rpc.rs index 6a81bf3c4a..7562a8cc1c 100644 --- a/zed/src/rpc.rs +++ b/zed/src/rpc.rs @@ -230,7 +230,11 @@ impl Client { } } - pub fn subscribe(self: &Arc, cx: ModelContext, mut handler: F) -> Subscription + pub fn subscribe( + self: &Arc, + cx: &mut ModelContext, + mut handler: F, + ) -> Subscription where T: EnvelopedMessage, M: Entity, diff --git a/zed/src/test.rs b/zed/src/test.rs index 9e02bb74db..eb6bf20f45 100644 --- a/zed/src/test.rs +++ b/zed/src/test.rs @@ -4,7 +4,6 @@ use crate::{ fs::RealFs, http::{HttpClient, Request, Response, ServerResponse}, language::LanguageRegistry, - presence::Presence, rpc::{self, Client, Credentials, EstablishConnectionError}, settings::{self, ThemeRegistry}, time::ReplicaId, diff --git a/zed/src/user.rs b/zed/src/user.rs index 8a050d5164..637c50e150 100644 --- a/zed/src/user.rs +++ b/zed/src/user.rs @@ -1,14 +1,17 @@ use crate::{ http::{HttpClient, Method, Request, Url}, - rpc::{Client, Status}, + rpc::{self, Client, Status}, util::TryFutureExt, }; use anyhow::{anyhow, Context, Result}; use futures::future; -use gpui::{Entity, ImageData, ModelContext, Task}; +use gpui::{AsyncAppContext, Entity, ImageData, ModelContext, ModelHandle, Task}; use postage::{prelude::Stream, sink::Sink, watch}; -use std::{collections::HashMap, sync::Arc}; -use zrpc::proto; +use std::{ + collections::{HashMap, HashSet}, + sync::Arc, +}; +use zrpc::{proto, TypedEnvelope}; #[derive(Debug)] pub struct User { @@ -17,11 +20,26 @@ pub struct User { pub avatar: Option>, } +#[derive(Debug)] +struct Collaborator { + pub user: Arc, + pub worktrees: Vec, +} + +#[derive(Debug)] +struct WorktreeMetadata { + pub root_name: String, + pub is_shared: bool, + pub participants: Vec>, +} + pub struct UserStore { users: HashMap>, current_user: watch::Receiver>>, + collaborators: Vec, rpc: Arc, http: Arc, + _maintain_collaborators: rpc::Subscription, _maintain_current_user: Task<()>, } @@ -37,8 +55,10 @@ impl UserStore { Self { users: Default::default(), current_user: current_user_rx, + collaborators: Default::default(), rpc: rpc.clone(), http, + _maintain_collaborators: rpc.subscribe(cx, Self::update_collaborators), _maintain_current_user: cx.spawn_weak(|this, mut cx| async move { let mut status = rpc.status(); while let Some(status) = status.recv().await { @@ -62,6 +82,45 @@ impl UserStore { } } + fn update_collaborators( + &mut self, + message: TypedEnvelope, + _: Arc, + cx: &mut ModelContext, + ) -> Result<()> { + let mut user_ids = HashSet::new(); + for collaborator in &message.payload.collaborators { + user_ids.insert(collaborator.user_id); + user_ids.extend( + collaborator + .worktrees + .iter() + .flat_map(|w| &w.participants) + .copied(), + ); + } + + let load_users = self.load_users(user_ids.into_iter().collect(), cx); + cx.spawn(|this, mut cx| async move { + load_users.await?; + + let mut collaborators = Vec::new(); + for collaborator in message.payload.collaborators { + collaborators.push(Collaborator::from_proto(collaborator, &this, &mut cx).await?); + } + + this.update(&mut cx, |this, cx| { + this.collaborators = collaborators; + cx.notify(); + }); + + Result::<_, anyhow::Error>::Ok(()) + }) + .detach(); + + Ok(()) + } + pub fn load_users( &mut self, mut user_ids: Vec, @@ -134,6 +193,39 @@ impl User { } } +impl Collaborator { + async fn from_proto( + collaborator: proto::Collaborator, + user_store: &ModelHandle, + cx: &mut AsyncAppContext, + ) -> Result { + let user = user_store + .update(cx, |user_store, cx| { + user_store.fetch_user(collaborator.user_id, cx) + }) + .await?; + let mut worktrees = Vec::new(); + for worktree in collaborator.worktrees { + let mut participants = Vec::new(); + for participant_id in worktree.participants { + participants.push( + user_store + .update(cx, |user_store, cx| { + user_store.fetch_user(participant_id, cx) + }) + .await?, + ); + } + worktrees.push(WorktreeMetadata { + root_name: worktree.root_name, + is_shared: worktree.is_shared, + participants, + }); + } + Ok(Self { user, worktrees }) + } +} + async fn fetch_avatar(http: &dyn HttpClient, url: &str) -> Result> { let url = Url::parse(url).with_context(|| format!("failed to parse avatar url {:?}", url))?; let mut request = Request::new(Method::Get, url);