From b307a7e91d56326af64b22106b73572c3d2874e8 Mon Sep 17 00:00:00 2001 From: Nathan Sobo Date: Fri, 26 Nov 2021 20:35:50 -0700 Subject: [PATCH] Populate the user data of worktree collaborators This will make it possible for us to render their avatars. Previously we only had the user ids. During rendering, everything needs to be available synchronously. So now, whenever collaborators are added, we perform the async I/O to fetch their user data prior to adding them to the worktree. --- crates/client/src/test.rs | 20 +++- crates/project/src/worktree.rs | 170 +++++++++++++++++---------------- crates/rpc/proto/zed.proto | 14 +-- crates/rpc/src/proto.rs | 8 +- crates/server/src/rpc.rs | 47 +++++---- 5 files changed, 145 insertions(+), 114 deletions(-) diff --git a/crates/client/src/test.rs b/crates/client/src/test.rs index b91a943002..a40d1ee3a7 100644 --- a/crates/client/src/test.rs +++ b/crates/client/src/test.rs @@ -2,7 +2,7 @@ use super::Client; use super::*; use crate::http::{HttpClient, Request, Response, ServerResponse}; use futures::{future::BoxFuture, Future}; -use gpui::TestAppContext; +use gpui::{ModelHandle, TestAppContext}; use parking_lot::Mutex; use postage::{mpsc, prelude::Stream}; use rpc::{proto, ConnectionId, Peer, Receipt, TypedEnvelope}; @@ -155,6 +155,24 @@ impl FakeServer { fn connection_id(&self) -> ConnectionId { self.connection_id.lock().expect("not connected") } + + pub async fn build_user_store( + &self, + client: Arc, + cx: &mut TestAppContext, + ) -> ModelHandle { + let http_client = FakeHttpClient::with_404_response(); + let user_store = cx.add_model(|cx| UserStore::new(client, http_client, cx)); + assert_eq!( + self.receive::() + .await + .unwrap() + .payload + .user_ids, + &[self.user_id] + ); + user_store + } } pub struct FakeHttpClient { diff --git a/crates/project/src/worktree.rs b/crates/project/src/worktree.rs index f21abf06de..0fec2d4209 100644 --- a/crates/project/src/worktree.rs +++ b/crates/project/src/worktree.rs @@ -69,6 +69,26 @@ pub struct Collaborator { pub replica_id: ReplicaId, } +impl Collaborator { + fn from_proto( + message: proto::Collaborator, + user_store: &ModelHandle, + cx: &mut AsyncAppContext, + ) -> impl Future> { + let user = user_store.update(cx, |user_store, cx| { + user_store.fetch_user(message.user_id, cx) + }); + + async move { + Ok(Self { + peer_id: PeerId(message.peer_id), + user: user.await?, + replica_id: message.replica_id as ReplicaId, + }) + } + } +} + impl Entity for Worktree { type Event = Event; @@ -174,7 +194,6 @@ impl Worktree { let remote_id = worktree.id; let replica_id = join_response.replica_id as ReplicaId; - let peers = join_response.peers; let root_char_bag: CharBag = worktree .root_name .chars() @@ -209,24 +228,18 @@ impl Worktree { }) .await; - let user_ids = peers.iter().map(|peer| peer.user_id).collect(); + let user_ids = join_response + .collaborators + .iter() + .map(|peer| peer.user_id) + .collect(); user_store .update(cx, |user_store, cx| user_store.load_users(user_ids, cx)) .await?; - let mut collaborators = HashMap::with_capacity(peers.len()); - for peer in &peers { - let peer_id = PeerId(peer.peer_id); - let user = user_store - .update(cx, |user_store, cx| user_store.fetch_user(peer.user_id, cx)) - .await?; - collaborators.insert( - peer_id, - Collaborator { - peer_id, - user, - replica_id: peer.replica_id as ReplicaId, - }, - ); + let mut collaborators = HashMap::with_capacity(join_response.collaborators.len()); + for message in join_response.collaborators { + let collaborator = Collaborator::from_proto(message, &user_store, cx).await?; + collaborators.insert(collaborator.peer_id, collaborator); } let worktree = cx.update(|cx| { @@ -274,8 +287,8 @@ impl Worktree { } let _subscriptions = vec![ - client.subscribe_to_entity(remote_id, cx, Self::handle_add_peer), - client.subscribe_to_entity(remote_id, cx, Self::handle_remove_peer), + client.subscribe_to_entity(remote_id, cx, Self::handle_add_collaborator), + client.subscribe_to_entity(remote_id, cx, Self::handle_remove_collaborator), client.subscribe_to_entity(remote_id, cx, Self::handle_update), client.subscribe_to_entity(remote_id, cx, Self::handle_update_buffer), client.subscribe_to_entity(remote_id, cx, Self::handle_buffer_saved), @@ -347,27 +360,52 @@ impl Worktree { } } - pub fn handle_add_peer( - &mut self, - envelope: TypedEnvelope, - _: Arc, - cx: &mut ModelContext, - ) -> Result<()> { + pub fn user_store(&self) -> &ModelHandle { match self { - Worktree::Local(worktree) => worktree.add_peer(envelope, cx), - Worktree::Remote(worktree) => worktree.add_peer(envelope, cx), + Worktree::Local(worktree) => &worktree.user_store, + Worktree::Remote(worktree) => &worktree.user_store, } } - pub fn handle_remove_peer( + pub fn handle_add_collaborator( &mut self, - envelope: TypedEnvelope, + mut envelope: TypedEnvelope, + _: Arc, + cx: &mut ModelContext, + ) -> Result<()> { + let user_store = self.user_store().clone(); + let collaborator = envelope + .payload + .collaborator + .take() + .ok_or_else(|| anyhow!("empty collaborator"))?; + + cx.spawn(|this, mut cx| { + async move { + let collaborator = + Collaborator::from_proto(collaborator, &user_store, &mut cx).await?; + this.update(&mut cx, |this, cx| match this { + Worktree::Local(worktree) => worktree.add_collaborator(collaborator, cx), + Worktree::Remote(worktree) => worktree.add_collaborator(collaborator, cx), + }); + Ok(()) + } + .log_err() + }) + .detach(); + + Ok(()) + } + + pub fn handle_remove_collaborator( + &mut self, + envelope: TypedEnvelope, _: Arc, cx: &mut ModelContext, ) -> Result<()> { match self { - Worktree::Local(worktree) => worktree.remove_peer(envelope, cx), - Worktree::Remote(worktree) => worktree.remove_peer(envelope, cx), + Worktree::Local(worktree) => worktree.remove_collaborator(envelope, cx), + Worktree::Remote(worktree) => worktree.remove_collaborator(envelope, cx), } } @@ -1107,33 +1145,19 @@ impl LocalWorktree { Ok(()) } - pub fn add_peer( + pub fn add_collaborator( &mut self, - envelope: TypedEnvelope, + collaborator: Collaborator, cx: &mut ModelContext, - ) -> Result<()> { - let peer = envelope - .payload - .peer - .as_ref() - .ok_or_else(|| anyhow!("empty peer"))?; - let peer_id = PeerId(peer.peer_id); - self.collaborators.insert( - peer_id, - Collaborator { - peer_id, - user: todo!(), - replica_id: peer.replica_id as ReplicaId, - }, - ); + ) { + self.collaborators + .insert(collaborator.peer_id, collaborator); cx.notify(); - - Ok(()) } - pub fn remove_peer( + pub fn remove_collaborator( &mut self, - envelope: TypedEnvelope, + envelope: TypedEnvelope, cx: &mut ModelContext, ) -> Result<()> { let peer_id = PeerId(envelope.payload.peer_id); @@ -1316,8 +1340,8 @@ impl LocalWorktree { this.update(&mut cx, |worktree, cx| { let _subscriptions = vec![ - rpc.subscribe_to_entity(remote_id, cx, Worktree::handle_add_peer), - rpc.subscribe_to_entity(remote_id, cx, Worktree::handle_remove_peer), + rpc.subscribe_to_entity(remote_id, cx, Worktree::handle_add_collaborator), + rpc.subscribe_to_entity(remote_id, cx, Worktree::handle_remove_collaborator), rpc.subscribe_to_entity(remote_id, cx, Worktree::handle_open_buffer), rpc.subscribe_to_entity(remote_id, cx, Worktree::handle_close_buffer), rpc.subscribe_to_entity(remote_id, cx, Worktree::handle_update_buffer), @@ -1532,32 +1556,19 @@ impl RemoteWorktree { Ok(()) } - pub fn add_peer( + pub fn add_collaborator( &mut self, - envelope: TypedEnvelope, + collaborator: Collaborator, cx: &mut ModelContext, - ) -> Result<()> { - let peer = envelope - .payload - .peer - .as_ref() - .ok_or_else(|| anyhow!("empty peer"))?; - let peer_id = PeerId(peer.peer_id); - self.collaborators.insert( - peer_id, - Collaborator { - peer_id, - user: todo!(), - replica_id: peer.replica_id as ReplicaId, - }, - ); + ) { + self.collaborators + .insert(collaborator.peer_id, collaborator); cx.notify(); - Ok(()) } - pub fn remove_peer( + pub fn remove_collaborator( &mut self, - envelope: TypedEnvelope, + envelope: TypedEnvelope, cx: &mut ModelContext, ) -> Result<()> { let peer_id = PeerId(envelope.payload.peer_id); @@ -3146,9 +3157,8 @@ mod tests { let user_id = 5; let mut client = Client::new(); - let http_client = FakeHttpClient::with_404_response(); - let user_store = cx.add_model(|cx| UserStore::new(client.clone(), http_client, cx)); let server = FakeServer::for_client(user_id, &mut client, &cx).await; + let user_store = server.build_user_store(client.clone(), &mut cx).await; let tree = Worktree::open_local( client, user_store.clone(), @@ -3203,7 +3213,7 @@ mod tests { proto::JoinWorktreeResponse { worktree: share_request.await.unwrap().worktree, replica_id: 1, - peers: Vec::new(), + collaborators: Vec::new(), }, Client::new(), user_store, @@ -3355,8 +3365,7 @@ mod tests { let user_id = 100; let mut client = Client::new(); let server = FakeServer::for_client(user_id, &mut client, &cx).await; - let http_client = FakeHttpClient::with_404_response(); - let user_store = cx.add_model(|cx| UserStore::new(client.clone(), http_client, cx)); + let user_store = server.build_user_store(client.clone(), &mut cx).await; let fs = Arc::new(FakeFs::new()); fs.insert_tree( @@ -3383,11 +3392,6 @@ mod tests { .await .unwrap(); - { - let cx = cx.to_async(); - client.authenticate_and_connect(&cx).await.unwrap(); - } - let open_worktree = server.receive::().await.unwrap(); assert_eq!( open_worktree.payload, diff --git a/crates/rpc/proto/zed.proto b/crates/rpc/proto/zed.proto index 5b899ae916..775f94d595 100644 --- a/crates/rpc/proto/zed.proto +++ b/crates/rpc/proto/zed.proto @@ -21,8 +21,8 @@ message Envelope { UpdateBuffer update_buffer = 16; SaveBuffer save_buffer = 17; BufferSaved buffer_saved = 18; - AddPeer add_peer = 19; - RemovePeer remove_peer = 20; + AddCollaborator add_collaborator = 19; + RemoveCollaborator remove_collaborator = 20; GetChannels get_channels = 21; GetChannelsResponse get_channels_response = 22; GetUsers get_users = 23; @@ -83,7 +83,7 @@ message LeaveWorktree { message JoinWorktreeResponse { Worktree worktree = 2; uint32 replica_id = 3; - repeated Peer peers = 4; + repeated Collaborator collaborators = 4; } message UpdateWorktree { @@ -96,12 +96,12 @@ message CloseWorktree { uint64 worktree_id = 1; } -message AddPeer { +message AddCollaborator { uint64 worktree_id = 1; - Peer peer = 2; + Collaborator collaborator = 2; } -message RemovePeer { +message RemoveCollaborator { uint64 worktree_id = 1; uint32 peer_id = 2; } @@ -196,7 +196,7 @@ message UpdateContacts { // Entities -message Peer { +message Collaborator { uint32 peer_id = 1; uint32 replica_id = 2; uint64 user_id = 3; diff --git a/crates/rpc/src/proto.rs b/crates/rpc/src/proto.rs index 8299952c79..bfdce85b77 100644 --- a/crates/rpc/src/proto.rs +++ b/crates/rpc/src/proto.rs @@ -121,7 +121,7 @@ macro_rules! entity_messages { messages!( Ack, - AddPeer, + AddCollaborator, BufferSaved, ChannelMessageSent, CloseBuffer, @@ -145,7 +145,7 @@ messages!( OpenWorktree, OpenWorktreeResponse, Ping, - RemovePeer, + RemoveCollaborator, SaveBuffer, SendChannelMessage, SendChannelMessageResponse, @@ -174,13 +174,13 @@ request_messages!( entity_messages!( worktree_id, - AddPeer, + AddCollaborator, BufferSaved, CloseBuffer, CloseWorktree, OpenBuffer, JoinWorktree, - RemovePeer, + RemoveCollaborator, SaveBuffer, UnshareWorktree, UpdateBuffer, diff --git a/crates/server/src/rpc.rs b/crates/server/src/rpc.rs index 9f4c724770..a4c21b4ea4 100644 --- a/crates/server/src/rpc.rs +++ b/crates/server/src/rpc.rs @@ -187,7 +187,7 @@ impl Server { broadcast(connection_id, peer_ids, |conn_id| { self.peer.send( conn_id, - proto::RemovePeer { + proto::RemoveCollaborator { worktree_id, peer_id: connection_id.0, }, @@ -341,15 +341,15 @@ impl Server { .and_then(|joined| { let share = joined.worktree.share()?; let peer_count = share.guests.len(); - let mut peers = Vec::with_capacity(peer_count); - peers.push(proto::Peer { + let mut collaborators = Vec::with_capacity(peer_count); + collaborators.push(proto::Collaborator { peer_id: joined.worktree.host_connection_id.0, replica_id: 0, user_id: joined.worktree.host_user_id.to_proto(), }); for (peer_conn_id, (peer_replica_id, peer_user_id)) in &share.guests { if *peer_conn_id != request.sender_id { - peers.push(proto::Peer { + collaborators.push(proto::Collaborator { peer_id: peer_conn_id.0, replica_id: *peer_replica_id as u32, user_id: peer_user_id.to_proto(), @@ -363,7 +363,7 @@ impl Server { entries: share.entries.values().cloned().collect(), }), replica_id: joined.replica_id as u32, - peers, + collaborators, }; let connection_ids = joined.worktree.connection_ids(); let contact_user_ids = joined.worktree.authorized_user_ids.clone(); @@ -375,9 +375,9 @@ impl Server { broadcast(request.sender_id, connection_ids, |conn_id| { self.peer.send( conn_id, - proto::AddPeer { + proto::AddCollaborator { worktree_id, - peer: Some(proto::Peer { + collaborator: Some(proto::Collaborator { peer_id: request.sender_id.0, replica_id: response.replica_id, user_id: user_id.to_proto(), @@ -415,7 +415,7 @@ impl Server { broadcast(sender_id, worktree.connection_ids, |conn_id| { self.peer.send( conn_id, - proto::RemovePeer { + proto::RemoveCollaborator { worktree_id, peer_id: sender_id.0, }, @@ -960,7 +960,7 @@ mod tests { // Connect to a server as 2 clients. let mut server = TestServer::start().await; - let (client_a, _) = server.create_client(&mut cx_a, "user_a").await; + let (client_a, user_store_a) = server.create_client(&mut cx_a, "user_a").await; let (client_b, user_store_b) = server.create_client(&mut cx_b, "user_b").await; cx_a.foreground().forbid_parking(); @@ -978,6 +978,7 @@ mod tests { .await; let worktree_a = Worktree::open_local( client_a.clone(), + user_store_a, "/a".as_ref(), fs, lang_registry.clone(), @@ -1052,7 +1053,7 @@ mod tests { .condition(&cx_a, |tree, cx| !tree.has_open_buffer("b.txt", cx)) .await; - // Dropping the worktree removes client B from client A's peers. + // Dropping the worktree removes client B from client A's collaborators. cx_b.update(move |_| drop(worktree_b)); worktree_a .condition(&cx_a, |tree, _| tree.collaborators().is_empty()) @@ -1089,6 +1090,7 @@ mod tests { .await; let worktree_a = Worktree::open_local( app_state_a.client.clone(), + app_state_a.user_store.clone(), "/a".as_ref(), fs, app_state_a.languages.clone(), @@ -1163,7 +1165,7 @@ mod tests { // Connect to a server as 3 clients. let mut server = TestServer::start().await; - let (client_a, _) = server.create_client(&mut cx_a, "user_a").await; + let (client_a, user_store_a) = server.create_client(&mut cx_a, "user_a").await; let (client_b, user_store_b) = server.create_client(&mut cx_b, "user_b").await; let (client_c, user_store_c) = server.create_client(&mut cx_c, "user_c").await; @@ -1182,6 +1184,7 @@ mod tests { let worktree_a = Worktree::open_local( client_a.clone(), + user_store_a, "/a".as_ref(), fs.clone(), lang_registry.clone(), @@ -1304,7 +1307,7 @@ mod tests { // Connect to a server as 2 clients. let mut server = TestServer::start().await; - let (client_a, _) = server.create_client(&mut cx_a, "user_a").await; + let (client_a, user_store_a) = server.create_client(&mut cx_a, "user_a").await; let (client_b, user_store_b) = server.create_client(&mut cx_b, "user_b").await; // Share a local worktree as client A @@ -1320,6 +1323,7 @@ mod tests { let worktree_a = Worktree::open_local( client_a.clone(), + user_store_a, "/dir".as_ref(), fs, lang_registry.clone(), @@ -1390,7 +1394,7 @@ mod tests { // Connect to a server as 2 clients. let mut server = TestServer::start().await; - let (client_a, _) = server.create_client(&mut cx_a, "user_a").await; + let (client_a, user_store_a) = server.create_client(&mut cx_a, "user_a").await; let (client_b, user_store_b) = server.create_client(&mut cx_b, "user_b").await; // Share a local worktree as client A @@ -1405,6 +1409,7 @@ mod tests { .await; let worktree_a = Worktree::open_local( client_a.clone(), + user_store_a, "/dir".as_ref(), fs, lang_registry.clone(), @@ -1457,7 +1462,7 @@ mod tests { // Connect to a server as 2 clients. let mut server = TestServer::start().await; - let (client_a, _) = server.create_client(&mut cx_a, "user_a").await; + let (client_a, user_store_a) = server.create_client(&mut cx_a, "user_a").await; let (client_b, user_store_b) = server.create_client(&mut cx_b, "user_b").await; // Share a local worktree as client A @@ -1472,6 +1477,7 @@ mod tests { .await; let worktree_a = Worktree::open_local( client_a.clone(), + user_store_a, "/dir".as_ref(), fs, lang_registry.clone(), @@ -1512,14 +1518,14 @@ mod tests { } #[gpui::test] - async fn test_peer_disconnection(mut cx_a: TestAppContext, cx_b: TestAppContext) { + async fn test_peer_disconnection(mut cx_a: TestAppContext, mut cx_b: TestAppContext) { cx_a.foreground().forbid_parking(); let lang_registry = Arc::new(LanguageRegistry::new()); // Connect to a server as 2 clients. let mut server = TestServer::start().await; - let (client_a, _) = server.create_client(&mut cx_a, "user_a").await; - let (client_b, user_store_b) = server.create_client(&mut cx_a, "user_b").await; + let (client_a, user_store_a) = server.create_client(&mut cx_a, "user_a").await; + let (client_b, user_store_b) = server.create_client(&mut cx_b, "user_b").await; // Share a local worktree as client A let fs = Arc::new(FakeFs::new()); @@ -1534,6 +1540,7 @@ mod tests { .await; let worktree_a = Worktree::open_local( client_a.clone(), + user_store_a, "/a".as_ref(), fs, lang_registry.clone(), @@ -1593,8 +1600,8 @@ mod tests { // Connect to a server as 2 clients. let mut server = TestServer::start().await; - let (client_a, _) = server.create_client(&mut cx_a, "user_a").await; - let (client_b, user_store_b) = server.create_client(&mut cx_a, "user_b").await; + let (client_a, user_store_a) = server.create_client(&mut cx_a, "user_a").await; + let (client_b, user_store_b) = server.create_client(&mut cx_b, "user_b").await; // Share a local worktree as client A let fs = Arc::new(FakeFs::new()); @@ -1609,6 +1616,7 @@ mod tests { .await; let worktree_a = Worktree::open_local( client_a.clone(), + user_store_a, "/a".as_ref(), fs, lang_registry.clone(), @@ -2140,6 +2148,7 @@ mod tests { let worktree_a = Worktree::open_local( client_a.clone(), + user_store_a.clone(), "/a".as_ref(), fs.clone(), lang_registry.clone(),