From 80ab144bf3f5d1c45739d76390ca97836020fcc9 Mon Sep 17 00:00:00 2001 From: Antonio Scandurra Date: Mon, 26 Sep 2022 17:37:58 +0200 Subject: [PATCH] Ring users upon connection if somebody was calling them before connecting Co-Authored-By: Nathan Sobo --- crates/client/src/user.rs | 2 +- crates/collab/src/integration_tests.rs | 13 ++++- crates/collab/src/rpc.rs | 30 ++++------ crates/collab/src/rpc/store.rs | 80 ++++++++++++++++++-------- crates/room/src/room.rs | 8 ++- crates/rpc/proto/zed.proto | 4 +- 6 files changed, 90 insertions(+), 47 deletions(-) diff --git a/crates/client/src/user.rs b/crates/client/src/user.rs index 57a403ebad..0dbb8bb198 100644 --- a/crates/client/src/user.rs +++ b/crates/client/src/user.rs @@ -214,7 +214,7 @@ impl UserStore { .await?, from: this .update(&mut cx, |this, cx| { - this.get_user(envelope.payload.from_user_id, cx) + this.get_user(envelope.payload.caller_user_id, cx) }) .await?, }; diff --git a/crates/collab/src/integration_tests.rs b/crates/collab/src/integration_tests.rs index 7834c3da7f..c235a31c55 100644 --- a/crates/collab/src/integration_tests.rs +++ b/crates/collab/src/integration_tests.rs @@ -66,6 +66,7 @@ async fn test_basic_calls( deterministic: Arc, cx_a: &mut TestAppContext, cx_b: &mut TestAppContext, + cx_b2: &mut TestAppContext, cx_c: &mut TestAppContext, ) { deterministic.forbid_parking(); @@ -111,8 +112,18 @@ async fn test_basic_calls( } ); - // User B receives the call and joins the room. + // User B receives the call. let call_b = incoming_call_b.next().await.unwrap().unwrap(); + + // User B connects via another client and also receives a ring on the newly-connected client. + let client_b2 = server.create_client(cx_b2, "user_b").await; + let mut incoming_call_b2 = client_b2 + .user_store + .update(cx_b2, |user, _| user.incoming_call()); + deterministic.run_until_parked(); + let _call_b2 = incoming_call_b2.next().await.unwrap().unwrap(); + + // User B joins the room using the first client. let room_b = cx_b .update(|cx| Room::join(&call_b, client_b.clone(), cx)) .await diff --git a/crates/collab/src/rpc.rs b/crates/collab/src/rpc.rs index 04eaad4edb..55c3414d85 100644 --- a/crates/collab/src/rpc.rs +++ b/crates/collab/src/rpc.rs @@ -388,7 +388,11 @@ impl Server { { let mut store = this.store().await; - store.add_connection(connection_id, user_id, user.admin); + let incoming_call = store.add_connection(connection_id, user_id, user.admin); + if let Some(incoming_call) = incoming_call { + this.peer.send(connection_id, incoming_call)?; + } + this.peer.send(connection_id, store.build_initial_contacts_update(contacts))?; if let Some((code, count)) = invite_code { @@ -648,28 +652,18 @@ impl Server { request: TypedEnvelope, response: Response, ) -> Result<()> { - let to_user_id = UserId::from_proto(request.payload.to_user_id); + let recipient_user_id = UserId::from_proto(request.payload.recipient_user_id); let room_id = request.payload.room_id; let mut calls = { let mut store = self.store().await; - let (from_user_id, recipient_connection_ids, room) = - store.call(room_id, request.sender_id, to_user_id)?; + let (room, recipient_connection_ids, incoming_call) = + store.call(room_id, request.sender_id, recipient_user_id)?; self.room_updated(room); recipient_connection_ids .into_iter() - .map(|recipient_id| { - self.peer.request( - recipient_id, - proto::IncomingCall { - room_id, - from_user_id: from_user_id.to_proto(), - participant_user_ids: room - .participants - .iter() - .map(|p| p.user_id) - .collect(), - }, - ) + .map(|recipient_connection_id| { + self.peer + .request(recipient_connection_id, incoming_call.clone()) }) .collect::>() }; @@ -688,7 +682,7 @@ impl Server { { let mut store = self.store().await; - let room = store.call_failed(room_id, to_user_id)?; + let room = store.call_failed(room_id, recipient_user_id)?; self.room_updated(&room); } diff --git a/crates/collab/src/rpc/store.rs b/crates/collab/src/rpc/store.rs index f55da1763b..1c69a7c2f8 100644 --- a/crates/collab/src/rpc/store.rs +++ b/crates/collab/src/rpc/store.rs @@ -24,7 +24,7 @@ pub struct Store { #[derive(Default, Serialize)] struct ConnectedUser { connection_ids: HashSet, - active_call: Option, + active_call: Option, } #[derive(Serialize)] @@ -37,9 +37,10 @@ struct ConnectionState { } #[derive(Copy, Clone, Eq, PartialEq, Serialize)] -struct CallState { - room_id: RoomId, - joined: bool, +pub struct Call { + pub caller_user_id: UserId, + pub room_id: RoomId, + pub joined: bool, } #[derive(Serialize)] @@ -146,7 +147,12 @@ impl Store { } #[instrument(skip(self))] - pub fn add_connection(&mut self, connection_id: ConnectionId, user_id: UserId, admin: bool) { + pub fn add_connection( + &mut self, + connection_id: ConnectionId, + user_id: UserId, + admin: bool, + ) -> Option { self.connections.insert( connection_id, ConnectionState { @@ -157,11 +163,26 @@ impl Store { channels: Default::default(), }, ); - self.connected_users - .entry(user_id) - .or_default() - .connection_ids - .insert(connection_id); + let connected_user = self.connected_users.entry(user_id).or_default(); + connected_user.connection_ids.insert(connection_id); + if let Some(active_call) = connected_user.active_call { + if active_call.joined { + None + } else { + let room = self.room(active_call.room_id)?; + Some(proto::IncomingCall { + room_id: active_call.room_id, + caller_user_id: active_call.caller_user_id.to_proto(), + participant_user_ids: room + .participants + .iter() + .map(|participant| participant.user_id) + .collect(), + }) + } + } else { + None + } } #[instrument(skip(self))] @@ -393,7 +414,8 @@ impl Store { let room_id = post_inc(&mut self.next_room_id); self.rooms.insert(room_id, room); - connected_user.active_call = Some(CallState { + connected_user.active_call = Some(Call { + caller_user_id: connection.user_id, room_id, joined: true, }); @@ -412,14 +434,16 @@ impl Store { let user_id = connection.user_id; let recipient_connection_ids = self.connection_ids_for_user(user_id).collect::>(); - let mut connected_user = self + let connected_user = self .connected_users .get_mut(&user_id) .ok_or_else(|| anyhow!("no such connection"))?; + let active_call = connected_user + .active_call + .as_mut() + .ok_or_else(|| anyhow!("not being called"))?; anyhow::ensure!( - connected_user - .active_call - .map_or(false, |call| call.room_id == room_id && !call.joined), + active_call.room_id == room_id && !active_call.joined, "not being called on this room" ); @@ -443,10 +467,7 @@ impl Store { )), }), }); - connected_user.active_call = Some(CallState { - room_id, - joined: true, - }); + active_call.joined = true; Ok((room, recipient_connection_ids)) } @@ -493,8 +514,8 @@ impl Store { room_id: RoomId, from_connection_id: ConnectionId, recipient_id: UserId, - ) -> Result<(UserId, Vec, &proto::Room)> { - let from_user_id = self.user_id_for_connection(from_connection_id)?; + ) -> Result<(&proto::Room, Vec, proto::IncomingCall)> { + let caller_user_id = self.user_id_for_connection(from_connection_id)?; let recipient_connection_ids = self .connection_ids_for_user(recipient_id) @@ -525,12 +546,25 @@ impl Store { "cannot call the same user more than once" ); room.pending_user_ids.push(recipient_id.to_proto()); - recipient.active_call = Some(CallState { + recipient.active_call = Some(Call { + caller_user_id, room_id, joined: false, }); - Ok((from_user_id, recipient_connection_ids, room)) + Ok(( + room, + recipient_connection_ids, + proto::IncomingCall { + room_id, + caller_user_id: caller_user_id.to_proto(), + participant_user_ids: room + .participants + .iter() + .map(|participant| participant.user_id) + .collect(), + }, + )) } pub fn call_failed(&mut self, room_id: RoomId, to_user_id: UserId) -> Result<&proto::Room> { diff --git a/crates/room/src/room.rs b/crates/room/src/room.rs index 8d80b47508..c6daacd7e2 100644 --- a/crates/room/src/room.rs +++ b/crates/room/src/room.rs @@ -136,7 +136,11 @@ impl Room { Ok(()) } - pub fn call(&mut self, to_user_id: u64, cx: &mut ModelContext) -> Task> { + pub fn call( + &mut self, + recipient_user_id: u64, + cx: &mut ModelContext, + ) -> Task> { if self.status.is_offline() { return Task::ready(Err(anyhow!("room is offline"))); } @@ -147,7 +151,7 @@ impl Room { client .request(proto::Call { room_id, - to_user_id, + recipient_user_id, }) .await?; Ok(()) diff --git a/crates/rpc/proto/zed.proto b/crates/rpc/proto/zed.proto index 47f33a5d25..1125c2b3ad 100644 --- a/crates/rpc/proto/zed.proto +++ b/crates/rpc/proto/zed.proto @@ -181,12 +181,12 @@ message ParticipantLocation { message Call { uint64 room_id = 1; - uint64 to_user_id = 2; + uint64 recipient_user_id = 2; } message IncomingCall { uint64 room_id = 1; - uint64 from_user_id = 2; + uint64 caller_user_id = 2; repeated uint64 participant_user_ids = 3; }