From 1d4bdfc4a18ba8073ba5bffb94cad1b4942eb95d Mon Sep 17 00:00:00 2001 From: Antonio Scandurra Date: Tue, 11 Oct 2022 11:28:27 +0200 Subject: [PATCH] Cancel calls automatically when caller hangs up or disconnects --- crates/collab/src/integration_tests.rs | 34 ++++++++++++ crates/collab/src/rpc.rs | 40 +++++++++----- crates/collab/src/rpc/store.rs | 76 +++++++++++++------------- 3 files changed, 97 insertions(+), 53 deletions(-) diff --git a/crates/collab/src/integration_tests.rs b/crates/collab/src/integration_tests.rs index ce9d01e3d3..42c95c4b2e 100644 --- a/crates/collab/src/integration_tests.rs +++ b/crates/collab/src/integration_tests.rs @@ -492,6 +492,40 @@ async fn test_calls_on_multiple_connections( deterministic.run_until_parked(); assert!(incoming_call_b1.next().await.unwrap().is_none()); assert!(incoming_call_b2.next().await.unwrap().is_none()); + + // User A calls user B again. + active_call_a + .update(cx_a, |call, cx| { + call.invite(client_b1.user_id().unwrap(), None, cx) + }) + .await + .unwrap(); + deterministic.run_until_parked(); + assert!(incoming_call_b1.next().await.unwrap().is_some()); + assert!(incoming_call_b2.next().await.unwrap().is_some()); + + // User A hangs up, causing both connections to stop ringing. + active_call_a.update(cx_a, |call, cx| call.hang_up(cx).unwrap()); + deterministic.run_until_parked(); + assert!(incoming_call_b1.next().await.unwrap().is_none()); + assert!(incoming_call_b2.next().await.unwrap().is_none()); + + // User A calls user B again. + active_call_a + .update(cx_a, |call, cx| { + call.invite(client_b1.user_id().unwrap(), None, cx) + }) + .await + .unwrap(); + deterministic.run_until_parked(); + assert!(incoming_call_b1.next().await.unwrap().is_some()); + assert!(incoming_call_b2.next().await.unwrap().is_some()); + + // User A disconnects up, causing both connections to stop ringing. + server.disconnect_client(client_a.current_user_id(cx_a)); + cx_a.foreground().advance_clock(rpc::RECEIVE_TIMEOUT); + assert!(incoming_call_b1.next().await.unwrap().is_none()); + assert!(incoming_call_b2.next().await.unwrap().is_none()); } #[gpui::test(iterations = 10)] diff --git a/crates/collab/src/rpc.rs b/crates/collab/src/rpc.rs index 11219fb8b8..febdfe4434 100644 --- a/crates/collab/src/rpc.rs +++ b/crates/collab/src/rpc.rs @@ -479,30 +479,34 @@ impl Server { let mut store = self.store().await; let removed_connection = store.remove_connection(connection_id)?; - for (project_id, project) in removed_connection.hosted_projects { - projects_to_unshare.push(project_id); + for project in removed_connection.hosted_projects { + projects_to_unshare.push(project.id); broadcast(connection_id, project.guests.keys().copied(), |conn_id| { self.peer.send( conn_id, proto::UnshareProject { - project_id: project_id.to_proto(), + project_id: project.id.to_proto(), }, ) }); } - for project_id in removed_connection.guest_project_ids { - if let Some(project) = store.project(project_id).trace_err() { - broadcast(connection_id, project.connection_ids(), |conn_id| { - self.peer.send( - conn_id, - proto::RemoveProjectCollaborator { - project_id: project_id.to_proto(), - peer_id: connection_id.0, - }, - ) - }); - } + for project in removed_connection.guest_projects { + broadcast(connection_id, project.connection_ids, |conn_id| { + self.peer.send( + conn_id, + proto::RemoveProjectCollaborator { + project_id: project.id.to_proto(), + peer_id: connection_id.0, + }, + ) + }); + } + + for connection_id in removed_connection.canceled_call_connection_ids { + self.peer + .send(connection_id, proto::CallCanceled {}) + .trace_err(); } if let Some(room) = removed_connection @@ -666,6 +670,12 @@ impl Server { } } + for connection_id in left_room.canceled_call_connection_ids { + self.peer + .send(connection_id, proto::CallCanceled {}) + .trace_err(); + } + if let Some(room) = left_room.room { self.room_updated(room); } diff --git a/crates/collab/src/rpc/store.rs b/crates/collab/src/rpc/store.rs index 12f50db48e..9b2661abca 100644 --- a/crates/collab/src/rpc/store.rs +++ b/crates/collab/src/rpc/store.rs @@ -86,10 +86,11 @@ pub type ReplicaId = u16; #[derive(Default)] pub struct RemovedConnectionState { pub user_id: UserId, - pub hosted_projects: HashMap, - pub guest_project_ids: HashSet, + pub hosted_projects: Vec, + pub guest_projects: Vec, pub contact_ids: HashSet, pub room_id: Option, + pub canceled_call_connection_ids: Vec, } pub struct LeftProject { @@ -104,6 +105,7 @@ pub struct LeftRoom<'a> { pub room: Option<&'a proto::Room>, pub unshared_projects: Vec, pub left_projects: Vec, + pub canceled_call_connection_ids: Vec, } #[derive(Copy, Clone)] @@ -197,7 +199,6 @@ impl Store { .ok_or_else(|| anyhow!("no such connection"))?; let user_id = connection.user_id; - let connection_projects = mem::take(&mut connection.projects); let connection_channels = mem::take(&mut connection.channels); let mut result = RemovedConnectionState { @@ -210,48 +211,21 @@ impl Store { self.leave_channel(connection_id, channel_id); } - // Unshare and leave all projects. - for project_id in connection_projects { - if let Ok((_, project)) = self.unshare_project(project_id, connection_id) { - result.hosted_projects.insert(project_id, project); - } else if self.leave_project(project_id, connection_id).is_ok() { - result.guest_project_ids.insert(project_id); - } + let connected_user = self.connected_users.get(&user_id).unwrap(); + if let Some(active_call) = connected_user.active_call.as_ref() { + let room_id = active_call.room_id; + let left_room = self.leave_room(room_id, connection_id)?; + result.hosted_projects = left_room.unshared_projects; + result.guest_projects = left_room.left_projects; + result.room_id = Some(room_id); + result.canceled_call_connection_ids = left_room.canceled_call_connection_ids; } let connected_user = self.connected_users.get_mut(&user_id).unwrap(); connected_user.connection_ids.remove(&connection_id); - if let Some(active_call) = connected_user.active_call.as_ref() { - let room_id = active_call.room_id; - if let Some(room) = self.rooms.get_mut(&room_id) { - let prev_participant_count = room.participants.len(); - room.participants - .retain(|participant| participant.peer_id != connection_id.0); - if prev_participant_count == room.participants.len() { - if connected_user.connection_ids.is_empty() { - room.pending_participant_user_ids - .retain(|pending_user_id| *pending_user_id != user_id.to_proto()); - result.room_id = Some(room_id); - connected_user.active_call = None; - } - } else { - result.room_id = Some(room_id); - connected_user.active_call = None; - } - - if room.participants.is_empty() && room.pending_participant_user_ids.is_empty() { - self.rooms.remove(&room_id); - } - } else { - tracing::error!("disconnected user claims to be in a room that does not exist"); - connected_user.active_call = None; - } - } - if connected_user.connection_ids.is_empty() { self.connected_users.remove(&user_id); } - self.connections.remove(&connection_id).unwrap(); Ok(result) @@ -491,6 +465,31 @@ impl Store { .ok_or_else(|| anyhow!("no such room"))?; room.participants .retain(|participant| participant.peer_id != connection_id.0); + + let mut canceled_call_connection_ids = Vec::new(); + room.pending_participant_user_ids + .retain(|pending_participant_user_id| { + if let Some(connected_user) = self + .connected_users + .get_mut(&UserId::from_proto(*pending_participant_user_id)) + { + if let Some(call) = connected_user.active_call.as_ref() { + if call.caller_user_id == user_id { + connected_user.active_call.take(); + canceled_call_connection_ids + .extend(connected_user.connection_ids.iter().copied()); + false + } else { + true + } + } else { + true + } + } else { + true + } + }); + if room.participants.is_empty() && room.pending_participant_user_ids.is_empty() { self.rooms.remove(&room_id); } @@ -499,6 +498,7 @@ impl Store { room: self.rooms.get(&room_id), unshared_projects, left_projects, + canceled_call_connection_ids, }) }