diff --git a/crates/collab/src/rpc.rs b/crates/collab/src/rpc.rs index cd4868e7c6..b5a26ab47a 100644 --- a/crates/collab/src/rpc.rs +++ b/crates/collab/src/rpc.rs @@ -270,7 +270,7 @@ impl Server { } let contacts = this.app_state.db.get_contacts(user_id).await?; - + { let mut store = this.store_mut().await; store.add_connection(connection_id, user_id); @@ -324,7 +324,7 @@ impl Server { if let Err(error) = this.sign_out(connection_id).await { tracing::error!(%error, "error signing out"); } - + Ok(()) }.instrument(span) } @@ -332,7 +332,7 @@ impl Server { async fn sign_out(self: &mut Arc, connection_id: ConnectionId) -> Result<()> { self.peer.disconnect(connection_id); let removed_connection = self.store_mut().await.remove_connection(connection_id)?; - + for (project_id, project) in removed_connection.hosted_projects { if let Some(share) = project.share { broadcast( @@ -357,22 +357,26 @@ impl Server { ) }); } - - let contacts_to_update = self.app_state.db.get_contacts(removed_connection.user_id).await?; + + let contacts_to_update = self + .app_state + .db + .get_contacts(removed_connection.user_id) + .await?; let mut update = proto::UpdateContacts::default(); update.contacts.push(proto::Contact { user_id: removed_connection.user_id.to_proto(), projects: Default::default(), online: false, }); - + let store = self.store().await; for user_id in contacts_to_update.current { for connection_id in store.connection_ids_for_user(user_id) { - self.peer.send(connection_id, update.clone()); + self.peer.send(connection_id, update.clone()).trace_err(); } - } - + } + Ok(()) } @@ -955,24 +959,26 @@ impl Server { .db .send_contact_request(requester_id, responder_id) .await?; - + // Update outgoing contact requests of requester let mut update = proto::UpdateContacts::default(); update.outgoing_requests.push(responder_id.to_proto()); for connection_id in self.store().await.connection_ids_for_user(requester_id) { self.peer.send(connection_id, update.clone())?; } - + // Update incoming contact requests of responder let mut update = proto::UpdateContacts::default(); - update.incoming_requests.push(proto::IncomingContactRequest { - requester_id: requester_id.to_proto(), - should_notify: true, - }); + update + .incoming_requests + .push(proto::IncomingContactRequest { + requester_id: requester_id.to_proto(), + should_notify: true, + }); for connection_id in self.store().await.connection_ids_for_user(responder_id) { self.peer.send(connection_id, update.clone())?; } - + response.send(proto::Ack {})?; Ok(()) } @@ -991,41 +997,51 @@ impl Server { let accept = request.payload.response == proto::ContactRequestResponse::Accept as i32; self.app_state .db - .respond_to_contact_request( - responder_id, - requester_id, - accept, - ) + .respond_to_contact_request(responder_id, requester_id, accept) .await?; - + + // Update responder with new contact + let mut update = proto::UpdateContacts::default(); if accept { - // Update responder with new contact - let mut update = proto::UpdateContacts::default(); update.contacts.push(proto::Contact { user_id: requester_id.to_proto(), projects: Default::default(), // TODO - online: true, // TODO + online: true, // TODO }); - update.remove_incoming_requests.push(requester_id.to_proto()); - for connection_id in self.store.read().await.connection_ids_for_user(responder_id) { - self.peer.send(connection_id, update.clone())?; - } - - // Update requester with new contact - let mut update = proto::UpdateContacts::default(); + } + update + .remove_incoming_requests + .push(requester_id.to_proto()); + for connection_id in self + .store + .read() + .await + .connection_ids_for_user(responder_id) + { + self.peer.send(connection_id, update.clone())?; + } + + // Update requester with new contact + let mut update = proto::UpdateContacts::default(); + if accept { update.contacts.push(proto::Contact { user_id: responder_id.to_proto(), projects: Default::default(), // TODO - online: true, // TODO + online: true, // TODO }); - update.remove_outgoing_requests.push(responder_id.to_proto()); - for connection_id in self.store.read().await.connection_ids_for_user(requester_id) { - self.peer.send(connection_id, update.clone())?; - } - } else { - todo!() } - + update + .remove_outgoing_requests + .push(responder_id.to_proto()); + for connection_id in self + .store + .read() + .await + .connection_ids_for_user(requester_id) + { + self.peer.send(connection_id, update.clone())?; + } + response.send(proto::Ack {})?; Ok(()) } @@ -1374,7 +1390,8 @@ pub async fn handle_websocket_request( .with(|message| async move { Ok(to_axum_message(message)) }); let connection = Connection::new(Box::pin(socket)); async move { - server.handle_connection(connection, socket_address, user_id, None, RealExecutor) + server + .handle_connection(connection, socket_address, user_id, None, RealExecutor) .await .log_err(); } @@ -5031,9 +5048,17 @@ mod tests { .collect() } } - + #[gpui::test(iterations = 10)] - async fn test_contact_requests(executor: Arc, cx_a: &mut TestAppContext, cx_a2: &mut TestAppContext, cx_b: &mut TestAppContext, cx_b2: &mut TestAppContext) { + async fn test_contact_requests( + executor: Arc, + cx_a: &mut TestAppContext, + cx_a2: &mut TestAppContext, + cx_b: &mut TestAppContext, + cx_b2: &mut TestAppContext, + cx_c: &mut TestAppContext, + cx_c2: &mut TestAppContext, + ) { cx_a.foreground().forbid_parking(); // Connect to a server as 3 clients. @@ -5042,10 +5067,14 @@ mod tests { let client_a2 = server.create_client(cx_a2, "user_a").await; let client_b = server.create_client(cx_b, "user_b").await; let client_b2 = server.create_client(cx_b2, "user_b").await; - - assert_eq!(client_a.user_id().unwrap(), client_a2.user_id().unwrap()); + let client_c = server.create_client(cx_c, "user_c").await; + let client_c2 = server.create_client(cx_c2, "user_c").await; - // User A requests that user B become their contact + assert_eq!(client_a.user_id().unwrap(), client_a2.user_id().unwrap()); + assert_eq!(client_b.user_id().unwrap(), client_b2.user_id().unwrap()); + assert_eq!(client_c.user_id().unwrap(), client_c2.user_id().unwrap()); + + // User A and User C request that user B become their contact. client_a .user_store .read_with(cx_a, |store, _| { @@ -5053,36 +5082,78 @@ mod tests { }) .await .unwrap(); + client_c + .user_store + .read_with(cx_c, |store, _| { + store.request_contact(client_b.user_id().unwrap()) + }) + .await + .unwrap(); executor.run_until_parked(); - - // Both users see the pending request appear in all their clients. - assert_eq!(client_a.summarize_contacts(&cx_a).outgoing_requests, &["user_b"]); - assert_eq!(client_a2.summarize_contacts(&cx_a2).outgoing_requests, &["user_b"]); - assert_eq!(client_b.summarize_contacts(&cx_b).incoming_requests, &["user_a"]); - assert_eq!(client_b2.summarize_contacts(&cx_b2).incoming_requests, &["user_a"]); - + + // All users see the pending request appear in all their clients. + assert_eq!( + client_a.summarize_contacts(&cx_a).outgoing_requests, + &["user_b"] + ); + assert_eq!( + client_a2.summarize_contacts(&cx_a2).outgoing_requests, + &["user_b"] + ); + assert_eq!( + client_b.summarize_contacts(&cx_b).incoming_requests, + &["user_a", "user_c"] + ); + assert_eq!( + client_b2.summarize_contacts(&cx_b2).incoming_requests, + &["user_a", "user_c"] + ); + assert_eq!( + client_c.summarize_contacts(&cx_c).outgoing_requests, + &["user_b"] + ); + assert_eq!( + client_c2.summarize_contacts(&cx_c2).outgoing_requests, + &["user_b"] + ); + // Contact requests are present upon connecting (tested here via disconnect/reconnect) disconnect_and_reconnect(&client_a, cx_a).await; disconnect_and_reconnect(&client_b, cx_b).await; + disconnect_and_reconnect(&client_c, cx_c).await; executor.run_until_parked(); - assert_eq!(client_a.summarize_contacts(&cx_a).outgoing_requests, &["user_b"]); - assert_eq!(client_b.summarize_contacts(&cx_b).incoming_requests, &["user_a"]); - - // User B accepts the request. - client_b.user_store.read_with(cx_b, |store, _| { - store.respond_to_contact_request(client_a.user_id().unwrap(), true) - }).await.unwrap(); + assert_eq!( + client_a.summarize_contacts(&cx_a).outgoing_requests, + &["user_b"] + ); + assert_eq!( + client_b.summarize_contacts(&cx_b).incoming_requests, + &["user_a", "user_c"] + ); + assert_eq!( + client_c.summarize_contacts(&cx_c).outgoing_requests, + &["user_b"] + ); + + // User B accepts the request from user A. + client_b + .user_store + .read_with(cx_b, |store, _| { + store.respond_to_contact_request(client_a.user_id().unwrap(), true) + }) + .await + .unwrap(); executor.run_until_parked(); // User B sees user A as their contact now in all client, and the incoming request from them is removed. let contacts_b = client_b.summarize_contacts(&cx_b); assert_eq!(contacts_b.current, &["user_a"]); - assert!(contacts_b.incoming_requests.is_empty()); + assert_eq!(contacts_b.incoming_requests, &["user_c"]); let contacts_b2 = client_b2.summarize_contacts(&cx_b2); assert_eq!(contacts_b2.current, &["user_a"]); - assert!(contacts_b2.incoming_requests.is_empty()); - + assert_eq!(contacts_b2.incoming_requests, &["user_c"]); + // User A sees user B as their contact now in all clients, and the outgoing request to them is removed. let contacts_a = client_a.summarize_contacts(&cx_a); assert_eq!(contacts_a.current, &["user_b"]); @@ -5094,14 +5165,71 @@ mod tests { // Contacts are present upon connecting (tested here via disconnect/reconnect) disconnect_and_reconnect(&client_a, cx_a).await; disconnect_and_reconnect(&client_b, cx_b).await; + disconnect_and_reconnect(&client_c, cx_c).await; executor.run_until_parked(); assert_eq!(client_a.summarize_contacts(&cx_a).current, &["user_b"]); - // assert_eq!(client_b.summarize_contacts(&cx_b).current, &["user_a"]); - + assert_eq!(client_b.summarize_contacts(&cx_b).current, &["user_a"]); + assert_eq!( + client_b.summarize_contacts(&cx_b).incoming_requests, + &["user_c"] + ); + assert!(client_c.summarize_contacts(&cx_c).current.is_empty()); + assert_eq!( + client_c.summarize_contacts(&cx_c).outgoing_requests, + &["user_b"] + ); + + // User B rejects the request from user C. + client_b + .user_store + .read_with(cx_b, |store, _| { + store.respond_to_contact_request(client_c.user_id().unwrap(), false) + }) + .await + .unwrap(); + + executor.run_until_parked(); + + // User B doesn't see user C as their contact, and the incoming request from them is removed. + let contacts_b = client_b.summarize_contacts(&cx_b); + assert_eq!(contacts_b.current, &["user_a"]); + assert!(contacts_b.incoming_requests.is_empty()); + let contacts_b2 = client_b2.summarize_contacts(&cx_b2); + assert_eq!(contacts_b2.current, &["user_a"]); + assert!(contacts_b2.incoming_requests.is_empty()); + + // User C doesn't see user B as their contact, and the outgoing request to them is removed. + let contacts_c = client_c.summarize_contacts(&cx_c); + assert!(contacts_c.current.is_empty()); + assert!(contacts_c.outgoing_requests.is_empty()); + let contacts_c2 = client_c2.summarize_contacts(&cx_c2); + assert!(contacts_c2.current.is_empty()); + assert!(contacts_c2.outgoing_requests.is_empty()); + + // Incoming/outgoing requests are not present upon connecting (tested here via disconnect/reconnect) + disconnect_and_reconnect(&client_a, cx_a).await; + disconnect_and_reconnect(&client_b, cx_b).await; + disconnect_and_reconnect(&client_c, cx_c).await; + executor.run_until_parked(); + assert_eq!(client_a.summarize_contacts(&cx_a).current, &["user_b"]); + assert_eq!(client_b.summarize_contacts(&cx_b).current, &["user_a"]); + assert!(client_b + .summarize_contacts(&cx_b) + .incoming_requests + .is_empty()); + assert!(client_c.summarize_contacts(&cx_c).current.is_empty()); + assert!(client_c + .summarize_contacts(&cx_c) + .outgoing_requests + .is_empty()); + async fn disconnect_and_reconnect(client: &TestClient, cx: &mut TestAppContext) { client.disconnect(&cx.to_async()).unwrap(); client.clear_contacts(cx); - client.authenticate_and_connect(false, &cx.to_async()).await.unwrap(); + client + .authenticate_and_connect(false, &cx.to_async()) + .await + .unwrap(); } } @@ -6143,11 +6271,12 @@ mod tests { }); let http = FakeHttpClient::with_404_response(); - let user_id = if let Ok(Some(user)) = self.app_state.db.get_user_by_github_login(name).await { - user.id - } else { - self.app_state.db.create_user(name, false).await.unwrap() - }; + let user_id = + if let Ok(Some(user)) = self.app_state.db.get_user_by_github_login(name).await { + user.id + } else { + self.app_state.db.create_user(name, false).await.unwrap() + }; let client_name = name.to_string(); let mut client = Client::new(http.clone()); let server = self.server.clone(); @@ -6299,7 +6428,7 @@ mod tests { &self.client } } - + struct ContactsSummary { pub current: Vec, pub outgoing_requests: Vec, @@ -6320,20 +6449,30 @@ mod tests { .read_with(cx, |user_store, _| user_store.watch_current_user()); while authed_user.next().await.unwrap().is_none() {} } - + fn clear_contacts(&self, cx: &mut TestAppContext) { self.user_store.update(cx, |store, _| { store.clear_contacts(); }); } - + fn summarize_contacts(&self, cx: &TestAppContext) -> ContactsSummary { - self.user_store.read_with(cx, |store, cx| { - ContactsSummary { - current: store.contacts().iter().map(|contact| contact.user.github_login.clone()).collect(), - outgoing_requests: store.outgoing_contact_requests().iter().map(|user| user.github_login.clone()).collect(), - incoming_requests: store.incoming_contact_requests().iter().map(|user| user.github_login.clone()).collect(), - } + self.user_store.read_with(cx, |store, cx| ContactsSummary { + current: store + .contacts() + .iter() + .map(|contact| contact.user.github_login.clone()) + .collect(), + outgoing_requests: store + .outgoing_contact_requests() + .iter() + .map(|user| user.github_login.clone()) + .collect(), + incoming_requests: store + .incoming_contact_requests() + .iter() + .map(|user| user.github_login.clone()) + .collect(), }) }