diff --git a/crates/call/src/room.rs b/crates/call/src/room.rs index 44a3653c5d..39661a5d4b 100644 --- a/crates/call/src/room.rs +++ b/crates/call/src/room.rs @@ -50,7 +50,7 @@ pub struct Room { user_store: ModelHandle, subscriptions: Vec, pending_room_update: Option>, - _maintain_connection: Task>, + maintain_connection: Option>>, } impl Entity for Room { @@ -121,7 +121,7 @@ impl Room { None }; - let _maintain_connection = + let maintain_connection = cx.spawn_weak(|this, cx| Self::maintain_connection(this, client.clone(), cx)); Self { @@ -138,7 +138,7 @@ impl Room { pending_room_update: None, client, user_store, - _maintain_connection, + maintain_connection: Some(maintain_connection), } } @@ -235,6 +235,8 @@ impl Room { self.participant_user_ids.clear(); self.subscriptions.clear(); self.live_kit.take(); + self.pending_room_update.take(); + self.maintain_connection.take(); self.client.send(proto::LeaveRoom {})?; Ok(()) } diff --git a/crates/collab/migrations/20221207165001_add_connection_lost_to_room_participants.sql b/crates/collab/migrations/20221207165001_add_connection_lost_to_room_participants.sql index ed0cf972bc..74eb53e8c3 100644 --- a/crates/collab/migrations/20221207165001_add_connection_lost_to_room_participants.sql +++ b/crates/collab/migrations/20221207165001_add_connection_lost_to_room_participants.sql @@ -3,5 +3,6 @@ ALTER TABLE "room_participants" CREATE INDEX "index_project_collaborators_on_connection_id" ON "project_collaborators" ("connection_id"); CREATE UNIQUE INDEX "index_project_collaborators_on_project_id_connection_id_and_epoch" ON "project_collaborators" ("project_id", "connection_id", "connection_epoch"); +CREATE INDEX "index_room_participants_on_room_id_id" ON "room_participants" ("room_id"); CREATE INDEX "index_room_participants_on_answering_connection_id" ON "room_participants" ("answering_connection_id"); CREATE UNIQUE INDEX "index_room_participants_on_answering_connection_id_and_answering_connection_epoch" ON "room_participants" ("answering_connection_id", "answering_connection_epoch"); diff --git a/crates/collab/src/db.rs b/crates/collab/src/db.rs index 64a95b2300..999c87f9ee 100644 --- a/crates/collab/src/db.rs +++ b/crates/collab/src/db.rs @@ -131,29 +131,70 @@ impl Database { .await } - pub async fn delete_stale_rooms(&self) -> Result<()> { + pub async fn outdated_room_ids(&self) -> Result> { self.transaction(|tx| async move { + #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)] + enum QueryAs { + RoomId, + } + + Ok(room_participant::Entity::find() + .select_only() + .column(room_participant::Column::RoomId) + .distinct() + .filter(room_participant::Column::AnsweringConnectionEpoch.ne(self.epoch())) + .into_values::<_, QueryAs>() + .all(&*tx) + .await?) + }) + .await + } + + pub async fn refresh_room(&self, room_id: RoomId) -> Result> { + self.room_transaction(|tx| async move { + let stale_participant_filter = Condition::all() + .add(room_participant::Column::RoomId.eq(room_id)) + .add(room_participant::Column::AnsweringConnectionId.is_not_null()) + .add(room_participant::Column::AnsweringConnectionEpoch.ne(self.epoch())); + + let stale_participant_user_ids = room_participant::Entity::find() + .filter(stale_participant_filter.clone()) + .all(&*tx) + .await? + .into_iter() + .map(|participant| participant.user_id) + .collect::>(); + + // Delete participants who failed to reconnect. room_participant::Entity::delete_many() - .filter( - room_participant::Column::AnsweringConnectionEpoch - .ne(self.epoch()) - .or(room_participant::Column::CallingConnectionEpoch.ne(self.epoch())), - ) + .filter(stale_participant_filter) .exec(&*tx) .await?; - room::Entity::delete_many() - .filter( - room::Column::Id.not_in_subquery( - Query::select() - .column(room_participant::Column::RoomId) - .from(room_participant::Entity) - .distinct() - .to_owned(), - ), - ) - .exec(&*tx) - .await?; - Ok(()) + + let room = self.get_room(room_id, &tx).await?; + let mut canceled_calls_to_user_ids = Vec::new(); + // Delete the room if it becomes empty and cancel pending calls. + if room.participants.is_empty() { + canceled_calls_to_user_ids.extend( + room.pending_participants + .iter() + .map(|pending_participant| UserId::from_proto(pending_participant.user_id)), + ); + room_participant::Entity::delete_many() + .filter(room_participant::Column::RoomId.eq(room_id)) + .exec(&*tx) + .await?; + room::Entity::delete_by_id(room_id).exec(&*tx).await?; + } + + Ok(( + room_id, + RefreshedRoom { + room, + stale_participant_user_ids, + canceled_calls_to_user_ids, + }, + )) }) .await } @@ -2575,6 +2616,12 @@ pub struct LeftRoom { pub canceled_calls_to_user_ids: Vec, } +pub struct RefreshedRoom { + pub room: proto::Room, + pub stale_participant_user_ids: Vec, + pub canceled_calls_to_user_ids: Vec, +} + pub struct Project { pub collaborators: Vec, pub worktrees: BTreeMap, diff --git a/crates/collab/src/integration_tests.rs b/crates/collab/src/integration_tests.rs index 84e7954c33..bd4ad8a580 100644 --- a/crates/collab/src/integration_tests.rs +++ b/crates/collab/src/integration_tests.rs @@ -18,10 +18,8 @@ use editor::{ use fs::{FakeFs, Fs as _, HomeDir, LineEnding}; use futures::{channel::oneshot, StreamExt as _}; use gpui::{ - executor::{self, Deterministic}, - geometry::vector::vec2f, - test::EmptyView, - ModelHandle, Task, TestAppContext, ViewHandle, + executor::Deterministic, geometry::vector::vec2f, test::EmptyView, ModelHandle, Task, + TestAppContext, ViewHandle, }; use language::{ range_to_lsp, tree_sitter_rust, Diagnostic, DiagnosticEntry, FakeLspAdapter, Language, @@ -36,7 +34,7 @@ use serde_json::json; use settings::{Formatter, Settings}; use std::{ cell::{Cell, RefCell}, - env, mem, + env, future, mem, ops::Deref, path::{Path, PathBuf}, rc::Rc, @@ -66,7 +64,7 @@ async fn test_basic_calls( cx_c: &mut TestAppContext, ) { deterministic.forbid_parking(); - let mut server = TestServer::start(cx_a.background()).await; + let mut server = TestServer::start(&deterministic).await; let client_a = server.create_client(cx_a, "user_a").await; let client_b = server.create_client(cx_b, "user_b").await; @@ -265,7 +263,7 @@ async fn test_room_uniqueness( cx_c: &mut TestAppContext, ) { deterministic.forbid_parking(); - let mut server = TestServer::start(cx_a.background()).await; + let mut server = TestServer::start(&deterministic).await; let client_a = server.create_client(cx_a, "user_a").await; let _client_a2 = server.create_client(cx_a2, "user_a").await; let client_b = server.create_client(cx_b, "user_b").await; @@ -370,7 +368,7 @@ async fn test_client_disconnecting_from_room( cx_b: &mut TestAppContext, ) { deterministic.forbid_parking(); - let mut server = TestServer::start(cx_a.background()).await; + let mut server = TestServer::start(&deterministic).await; let client_a = server.create_client(cx_a, "user_a").await; let client_b = server.create_client(cx_b, "user_b").await; server @@ -520,68 +518,295 @@ async fn test_server_restarts( deterministic: Arc, cx_a: &mut TestAppContext, cx_b: &mut TestAppContext, + cx_c: &mut TestAppContext, + cx_d: &mut TestAppContext, ) { deterministic.forbid_parking(); - let mut server = TestServer::start(cx_a.background()).await; + let mut server = TestServer::start(&deterministic).await; let client_a = server.create_client(cx_a, "user_a").await; let client_b = server.create_client(cx_b, "user_b").await; + let client_c = server.create_client(cx_c, "user_c").await; + let client_d = server.create_client(cx_d, "user_d").await; server - .make_contacts(&mut [(&client_a, cx_a), (&client_b, cx_b)]) + .make_contacts(&mut [ + (&client_a, cx_a), + (&client_b, cx_b), + (&client_c, cx_c), + (&client_d, cx_d), + ]) .await; let active_call_a = cx_a.read(ActiveCall::global); let active_call_b = cx_b.read(ActiveCall::global); + let active_call_c = cx_c.read(ActiveCall::global); + let active_call_d = cx_d.read(ActiveCall::global); - // Call user B from client A. + // User A calls users B, C, and D. active_call_a .update(cx_a, |call, cx| { call.invite(client_b.user_id().unwrap(), None, cx) }) .await .unwrap(); + active_call_a + .update(cx_a, |call, cx| { + call.invite(client_c.user_id().unwrap(), None, cx) + }) + .await + .unwrap(); + active_call_a + .update(cx_a, |call, cx| { + call.invite(client_d.user_id().unwrap(), None, cx) + }) + .await + .unwrap(); let room_a = active_call_a.read_with(cx_a, |call, _| call.room().unwrap().clone()); // User B receives the call and joins the room. let mut incoming_call_b = active_call_b.read_with(cx_b, |call, _| call.incoming()); - incoming_call_b.next().await.unwrap().unwrap(); + assert!(incoming_call_b.next().await.unwrap().is_some()); active_call_b .update(cx_b, |call, cx| call.accept_incoming(cx)) .await .unwrap(); let room_b = active_call_b.read_with(cx_b, |call, _| call.room().unwrap().clone()); + + // User C receives the call and joins the room. + let mut incoming_call_c = active_call_c.read_with(cx_c, |call, _| call.incoming()); + assert!(incoming_call_c.next().await.unwrap().is_some()); + active_call_c + .update(cx_c, |call, cx| call.accept_incoming(cx)) + .await + .unwrap(); + let room_c = active_call_c.read_with(cx_c, |call, _| call.room().unwrap().clone()); + + // User D receives the call but doesn't join the room yet. + let mut incoming_call_d = active_call_d.read_with(cx_d, |call, _| call.incoming()); + assert!(incoming_call_d.next().await.unwrap().is_some()); + + deterministic.run_until_parked(); + assert_eq!( + room_participants(&room_a, cx_a), + RoomParticipants { + remote: vec!["user_b".to_string(), "user_c".to_string()], + pending: vec!["user_d".to_string()] + } + ); + assert_eq!( + room_participants(&room_b, cx_b), + RoomParticipants { + remote: vec!["user_a".to_string(), "user_c".to_string()], + pending: vec!["user_d".to_string()] + } + ); + assert_eq!( + room_participants(&room_c, cx_c), + RoomParticipants { + remote: vec!["user_a".to_string(), "user_b".to_string()], + pending: vec!["user_d".to_string()] + } + ); + + // The server is torn down. + server.teardown(); + + // Users A and B reconnect to the call. User C has troubles reconnecting, so it leaves the room. + client_c.override_establish_connection(|_, cx| cx.spawn(|_| future::pending())); + deterministic.advance_clock(RECEIVE_TIMEOUT); + assert_eq!( + room_participants(&room_a, cx_a), + RoomParticipants { + remote: vec!["user_b".to_string(), "user_c".to_string()], + pending: vec!["user_d".to_string()] + } + ); + assert_eq!( + room_participants(&room_b, cx_b), + RoomParticipants { + remote: vec!["user_a".to_string(), "user_c".to_string()], + pending: vec!["user_d".to_string()] + } + ); + assert_eq!( + room_participants(&room_c, cx_c), + RoomParticipants { + remote: vec![], + pending: vec![] + } + ); + + // User D is notified again of the incoming call and accepts it. + assert!(incoming_call_d.next().await.unwrap().is_some()); + active_call_d + .update(cx_d, |call, cx| call.accept_incoming(cx)) + .await + .unwrap(); + let room_d = active_call_d.read_with(cx_d, |call, _| call.room().unwrap().clone()); + assert_eq!( + room_participants(&room_a, cx_a), + RoomParticipants { + remote: vec![ + "user_b".to_string(), + "user_c".to_string(), + "user_d".to_string(), + ], + pending: vec![] + } + ); + assert_eq!( + room_participants(&room_b, cx_b), + RoomParticipants { + remote: vec![ + "user_a".to_string(), + "user_c".to_string(), + "user_d".to_string(), + ], + pending: vec![] + } + ); + assert_eq!( + room_participants(&room_c, cx_c), + RoomParticipants { + remote: vec![], + pending: vec![] + } + ); + assert_eq!( + room_participants(&room_d, cx_d), + RoomParticipants { + remote: vec![ + "user_a".to_string(), + "user_b".to_string(), + "user_c".to_string(), + ], + pending: vec![] + } + ); + + // The server finishes restarting, cleaning up stale connections. + server.start().await.unwrap(); + deterministic.advance_clock(RECONNECT_TIMEOUT); + assert_eq!( + room_participants(&room_a, cx_a), + RoomParticipants { + remote: vec!["user_b".to_string(), "user_d".to_string()], + pending: vec![] + } + ); + assert_eq!( + room_participants(&room_b, cx_b), + RoomParticipants { + remote: vec!["user_a".to_string(), "user_d".to_string()], + pending: vec![] + } + ); + assert_eq!( + room_participants(&room_c, cx_c), + RoomParticipants { + remote: vec![], + pending: vec![] + } + ); + assert_eq!( + room_participants(&room_d, cx_d), + RoomParticipants { + remote: vec!["user_a".to_string(), "user_b".to_string()], + pending: vec![] + } + ); + + // User D hangs up. + active_call_d + .update(cx_d, |call, cx| call.hang_up(cx)) + .unwrap(); deterministic.run_until_parked(); assert_eq!( room_participants(&room_a, cx_a), RoomParticipants { remote: vec!["user_b".to_string()], - pending: Default::default() + pending: vec![] } ); assert_eq!( room_participants(&room_b, cx_b), RoomParticipants { remote: vec!["user_a".to_string()], - pending: Default::default() + pending: vec![] + } + ); + assert_eq!( + room_participants(&room_c, cx_c), + RoomParticipants { + remote: vec![], + pending: vec![] + } + ); + assert_eq!( + room_participants(&room_d, cx_d), + RoomParticipants { + remote: vec![], + pending: vec![] } ); - // User A automatically reconnects to the room when the server restarts. - server.restart().await; - deterministic.advance_clock(RECEIVE_TIMEOUT + RECONNECT_TIMEOUT); + // User B calls user D again. + active_call_b + .update(cx_b, |call, cx| { + call.invite(client_d.user_id().unwrap(), None, cx) + }) + .await + .unwrap(); + + // User D receives the call but doesn't join the room yet. + let mut incoming_call_d = active_call_d.read_with(cx_d, |call, _| call.incoming()); + assert!(incoming_call_d.next().await.unwrap().is_some()); + deterministic.run_until_parked(); assert_eq!( room_participants(&room_a, cx_a), RoomParticipants { remote: vec!["user_b".to_string()], - pending: Default::default() + pending: vec!["user_d".to_string()] } ); assert_eq!( room_participants(&room_b, cx_b), RoomParticipants { remote: vec!["user_a".to_string()], - pending: Default::default() + pending: vec!["user_d".to_string()] } ); + + // The server is torn down. + server.teardown(); + + // Users A and B have troubles reconnecting, so they leave the room. + client_a.override_establish_connection(|_, cx| cx.spawn(|_| future::pending())); + client_b.override_establish_connection(|_, cx| cx.spawn(|_| future::pending())); + client_c.override_establish_connection(|_, cx| cx.spawn(|_| future::pending())); + deterministic.advance_clock(RECEIVE_TIMEOUT); + assert_eq!( + room_participants(&room_a, cx_a), + RoomParticipants { + remote: vec![], + pending: vec![] + } + ); + assert_eq!( + room_participants(&room_b, cx_b), + RoomParticipants { + remote: vec![], + pending: vec![] + } + ); + + // User D is notified again of the incoming call but doesn't accept it. + assert!(incoming_call_d.next().await.unwrap().is_some()); + + // The server finishes restarting, cleaning up stale connections and canceling the + // call to user D because the room has become empty. + server.start().await.unwrap(); + deterministic.advance_clock(RECONNECT_TIMEOUT); + assert!(incoming_call_d.next().await.unwrap().is_none()); } #[gpui::test(iterations = 10)] @@ -592,7 +817,7 @@ async fn test_calls_on_multiple_connections( cx_b2: &mut TestAppContext, ) { deterministic.forbid_parking(); - let mut server = TestServer::start(cx_a.background()).await; + let mut server = TestServer::start(&deterministic).await; let client_a = server.create_client(cx_a, "user_a").await; let client_b1 = server.create_client(cx_b1, "user_b").await; let client_b2 = server.create_client(cx_b2, "user_b").await; @@ -744,7 +969,7 @@ async fn test_share_project( ) { deterministic.forbid_parking(); let (_, window_b) = cx_b.add_window(|_| EmptyView); - let mut server = TestServer::start(cx_a.background()).await; + let mut server = TestServer::start(&deterministic).await; let client_a = server.create_client(cx_a, "user_a").await; let client_b = server.create_client(cx_b, "user_b").await; let client_c = server.create_client(cx_c, "user_c").await; @@ -881,7 +1106,7 @@ async fn test_unshare_project( cx_c: &mut TestAppContext, ) { deterministic.forbid_parking(); - let mut server = TestServer::start(cx_a.background()).await; + let mut server = TestServer::start(&deterministic).await; let client_a = server.create_client(cx_a, "user_a").await; let client_b = server.create_client(cx_b, "user_b").await; let client_c = server.create_client(cx_c, "user_c").await; @@ -964,7 +1189,7 @@ async fn test_host_disconnect( ) { cx_b.update(editor::init); deterministic.forbid_parking(); - let mut server = TestServer::start(cx_a.background()).await; + let mut server = TestServer::start(&deterministic).await; let client_a = server.create_client(cx_a, "user_a").await; let client_b = server.create_client(cx_b, "user_b").await; let client_c = server.create_client(cx_c, "user_c").await; @@ -1082,7 +1307,7 @@ async fn test_active_call_events( cx_b: &mut TestAppContext, ) { deterministic.forbid_parking(); - let mut server = TestServer::start(cx_a.background()).await; + let mut server = TestServer::start(&deterministic).await; let client_a = server.create_client(cx_a, "user_a").await; let client_b = server.create_client(cx_b, "user_b").await; client_a.fs.insert_tree("/a", json!({})).await; @@ -1171,7 +1396,7 @@ async fn test_room_location( cx_b: &mut TestAppContext, ) { deterministic.forbid_parking(); - let mut server = TestServer::start(cx_a.background()).await; + let mut server = TestServer::start(&deterministic).await; let client_a = server.create_client(cx_a, "user_a").await; let client_b = server.create_client(cx_b, "user_b").await; client_a.fs.insert_tree("/a", json!({})).await; @@ -1337,7 +1562,7 @@ async fn test_propagate_saves_and_fs_changes( cx_c: &mut TestAppContext, ) { deterministic.forbid_parking(); - let mut server = TestServer::start(cx_a.background()).await; + let mut server = TestServer::start(&deterministic).await; let client_a = server.create_client(cx_a, "user_a").await; let client_b = server.create_client(cx_b, "user_b").await; let client_c = server.create_client(cx_c, "user_c").await; @@ -1507,12 +1732,12 @@ async fn test_propagate_saves_and_fs_changes( #[gpui::test(iterations = 10)] async fn test_git_diff_base_change( - executor: Arc, + deterministic: Arc, cx_a: &mut TestAppContext, cx_b: &mut TestAppContext, ) { - executor.forbid_parking(); - let mut server = TestServer::start(cx_a.background()).await; + deterministic.forbid_parking(); + let mut server = TestServer::start(&deterministic).await; let client_a = server.create_client(cx_a, "user_a").await; let client_b = server.create_client(cx_b, "user_b").await; server @@ -1581,7 +1806,7 @@ async fn test_git_diff_base_change( .unwrap(); // Wait for it to catch up to the new diff - executor.run_until_parked(); + deterministic.run_until_parked(); // Smoke test diffing buffer_local_a.read_with(cx_a, |buffer, _| { @@ -1601,7 +1826,7 @@ async fn test_git_diff_base_change( .unwrap(); // Wait remote buffer to catch up to the new diff - executor.run_until_parked(); + deterministic.run_until_parked(); // Smoke test diffing buffer_remote_a.read_with(cx_b, |buffer, _| { @@ -1624,7 +1849,7 @@ async fn test_git_diff_base_change( .await; // Wait for buffer_local_a to receive it - executor.run_until_parked(); + deterministic.run_until_parked(); // Smoke test new diffing buffer_local_a.read_with(cx_a, |buffer, _| { @@ -1679,7 +1904,7 @@ async fn test_git_diff_base_change( .unwrap(); // Wait for it to catch up to the new diff - executor.run_until_parked(); + deterministic.run_until_parked(); // Smoke test diffing buffer_local_b.read_with(cx_a, |buffer, _| { @@ -1699,7 +1924,7 @@ async fn test_git_diff_base_change( .unwrap(); // Wait remote buffer to catch up to the new diff - executor.run_until_parked(); + deterministic.run_until_parked(); // Smoke test diffing buffer_remote_b.read_with(cx_b, |buffer, _| { @@ -1722,7 +1947,7 @@ async fn test_git_diff_base_change( .await; // Wait for buffer_local_b to receive it - executor.run_until_parked(); + deterministic.run_until_parked(); // Smoke test new diffing buffer_local_b.read_with(cx_a, |buffer, _| { @@ -1759,12 +1984,12 @@ async fn test_git_diff_base_change( #[gpui::test(iterations = 10)] async fn test_fs_operations( - executor: Arc, + deterministic: Arc, cx_a: &mut TestAppContext, cx_b: &mut TestAppContext, ) { - executor.forbid_parking(); - let mut server = TestServer::start(cx_a.background()).await; + deterministic.forbid_parking(); + let mut server = TestServer::start(&deterministic).await; let client_a = server.create_client(cx_a, "user_a").await; let client_b = server.create_client(cx_b, "user_b").await; server @@ -2028,9 +2253,13 @@ async fn test_fs_operations( } #[gpui::test(iterations = 10)] -async fn test_buffer_conflict_after_save(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) { - cx_a.foreground().forbid_parking(); - let mut server = TestServer::start(cx_a.background()).await; +async fn test_buffer_conflict_after_save( + deterministic: Arc, + cx_a: &mut TestAppContext, + cx_b: &mut TestAppContext, +) { + deterministic.forbid_parking(); + let mut server = TestServer::start(&deterministic).await; let client_a = server.create_client(cx_a, "user_a").await; let client_b = server.create_client(cx_b, "user_b").await; server @@ -2082,9 +2311,13 @@ async fn test_buffer_conflict_after_save(cx_a: &mut TestAppContext, cx_b: &mut T } #[gpui::test(iterations = 10)] -async fn test_buffer_reloading(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) { - cx_a.foreground().forbid_parking(); - let mut server = TestServer::start(cx_a.background()).await; +async fn test_buffer_reloading( + deterministic: Arc, + cx_a: &mut TestAppContext, + cx_b: &mut TestAppContext, +) { + deterministic.forbid_parking(); + let mut server = TestServer::start(&deterministic).await; let client_a = server.create_client(cx_a, "user_a").await; let client_b = server.create_client(cx_b, "user_b").await; server @@ -2139,11 +2372,12 @@ async fn test_buffer_reloading(cx_a: &mut TestAppContext, cx_b: &mut TestAppCont #[gpui::test(iterations = 10)] async fn test_editing_while_guest_opens_buffer( + deterministic: Arc, cx_a: &mut TestAppContext, cx_b: &mut TestAppContext, ) { - cx_a.foreground().forbid_parking(); - let mut server = TestServer::start(cx_a.background()).await; + deterministic.forbid_parking(); + let mut server = TestServer::start(&deterministic).await; let client_a = server.create_client(cx_a, "user_a").await; let client_b = server.create_client(cx_b, "user_b").await; server @@ -2186,11 +2420,12 @@ async fn test_editing_while_guest_opens_buffer( #[gpui::test(iterations = 10)] async fn test_leaving_worktree_while_opening_buffer( + deterministic: Arc, cx_a: &mut TestAppContext, cx_b: &mut TestAppContext, ) { - cx_a.foreground().forbid_parking(); - let mut server = TestServer::start(cx_a.background()).await; + deterministic.forbid_parking(); + let mut server = TestServer::start(&deterministic).await; let client_a = server.create_client(cx_a, "user_a").await; let client_b = server.create_client(cx_b, "user_b").await; server @@ -2235,7 +2470,7 @@ async fn test_canceling_buffer_opening( ) { deterministic.forbid_parking(); - let mut server = TestServer::start(cx_a.background()).await; + let mut server = TestServer::start(&deterministic).await; let client_a = server.create_client(cx_a, "user_a").await; let client_b = server.create_client(cx_b, "user_b").await; server @@ -2286,7 +2521,7 @@ async fn test_leaving_project( cx_c: &mut TestAppContext, ) { deterministic.forbid_parking(); - let mut server = TestServer::start(cx_a.background()).await; + let mut server = TestServer::start(&deterministic).await; let client_a = server.create_client(cx_a, "user_a").await; let client_b = server.create_client(cx_b, "user_b").await; let client_c = server.create_client(cx_c, "user_c").await; @@ -2397,7 +2632,7 @@ async fn test_leaving_project( // Simulate connection loss for client C and ensure client A observes client C leaving the project. client_c.wait_for_current_user(cx_c).await; server.disconnect_client(client_c.peer_id().unwrap()); - cx_a.foreground().advance_clock(RECEIVE_TIMEOUT); + deterministic.advance_clock(RECEIVE_TIMEOUT); deterministic.run_until_parked(); project_a.read_with(cx_a, |project, _| { assert_eq!(project.collaborators().len(), 0); @@ -2418,7 +2653,7 @@ async fn test_collaborating_with_diagnostics( cx_c: &mut TestAppContext, ) { deterministic.forbid_parking(); - let mut server = TestServer::start(cx_a.background()).await; + let mut server = TestServer::start(&deterministic).await; let client_a = server.create_client(cx_a, "user_a").await; let client_b = server.create_client(cx_b, "user_b").await; let client_c = server.create_client(cx_c, "user_c").await; @@ -2678,9 +2913,13 @@ async fn test_collaborating_with_diagnostics( } #[gpui::test(iterations = 10)] -async fn test_collaborating_with_completion(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) { - cx_a.foreground().forbid_parking(); - let mut server = TestServer::start(cx_a.background()).await; +async fn test_collaborating_with_completion( + deterministic: Arc, + cx_a: &mut TestAppContext, + cx_b: &mut TestAppContext, +) { + deterministic.forbid_parking(); + let mut server = TestServer::start(&deterministic).await; let client_a = server.create_client(cx_a, "user_a").await; let client_b = server.create_client(cx_b, "user_b").await; server @@ -2852,9 +3091,13 @@ async fn test_collaborating_with_completion(cx_a: &mut TestAppContext, cx_b: &mu } #[gpui::test(iterations = 10)] -async fn test_reloading_buffer_manually(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) { - cx_a.foreground().forbid_parking(); - let mut server = TestServer::start(cx_a.background()).await; +async fn test_reloading_buffer_manually( + deterministic: Arc, + cx_a: &mut TestAppContext, + cx_b: &mut TestAppContext, +) { + deterministic.forbid_parking(); + let mut server = TestServer::start(&deterministic).await; let client_a = server.create_client(cx_a, "user_a").await; let client_b = server.create_client(cx_b, "user_b").await; server @@ -2944,10 +3187,14 @@ async fn test_reloading_buffer_manually(cx_a: &mut TestAppContext, cx_b: &mut Te } #[gpui::test(iterations = 10)] -async fn test_formatting_buffer(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) { +async fn test_formatting_buffer( + deterministic: Arc, + cx_a: &mut TestAppContext, + cx_b: &mut TestAppContext, +) { use project::FormatTrigger; - let mut server = TestServer::start(cx_a.background()).await; + let mut server = TestServer::start(&deterministic).await; let client_a = server.create_client(cx_a, "user_a").await; let client_b = server.create_client(cx_b, "user_b").await; server @@ -3046,9 +3293,13 @@ async fn test_formatting_buffer(cx_a: &mut TestAppContext, cx_b: &mut TestAppCon } #[gpui::test(iterations = 10)] -async fn test_definition(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) { - cx_a.foreground().forbid_parking(); - let mut server = TestServer::start(cx_a.background()).await; +async fn test_definition( + deterministic: Arc, + cx_a: &mut TestAppContext, + cx_b: &mut TestAppContext, +) { + deterministic.forbid_parking(); + let mut server = TestServer::start(&deterministic).await; let client_a = server.create_client(cx_a, "user_a").await; let client_b = server.create_client(cx_b, "user_b").await; server @@ -3190,9 +3441,13 @@ async fn test_definition(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) { } #[gpui::test(iterations = 10)] -async fn test_references(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) { - cx_a.foreground().forbid_parking(); - let mut server = TestServer::start(cx_a.background()).await; +async fn test_references( + deterministic: Arc, + cx_a: &mut TestAppContext, + cx_b: &mut TestAppContext, +) { + deterministic.forbid_parking(); + let mut server = TestServer::start(&deterministic).await; let client_a = server.create_client(cx_a, "user_a").await; let client_b = server.create_client(cx_b, "user_b").await; server @@ -3291,9 +3546,13 @@ async fn test_references(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) { } #[gpui::test(iterations = 10)] -async fn test_project_search(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) { - cx_a.foreground().forbid_parking(); - let mut server = TestServer::start(cx_a.background()).await; +async fn test_project_search( + deterministic: Arc, + cx_a: &mut TestAppContext, + cx_b: &mut TestAppContext, +) { + deterministic.forbid_parking(); + let mut server = TestServer::start(&deterministic).await; let client_a = server.create_client(cx_a, "user_a").await; let client_b = server.create_client(cx_b, "user_b").await; server @@ -3370,9 +3629,13 @@ async fn test_project_search(cx_a: &mut TestAppContext, cx_b: &mut TestAppContex } #[gpui::test(iterations = 10)] -async fn test_document_highlights(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) { - cx_a.foreground().forbid_parking(); - let mut server = TestServer::start(cx_a.background()).await; +async fn test_document_highlights( + deterministic: Arc, + cx_a: &mut TestAppContext, + cx_b: &mut TestAppContext, +) { + deterministic.forbid_parking(); + let mut server = TestServer::start(&deterministic).await; let client_a = server.create_client(cx_a, "user_a").await; let client_b = server.create_client(cx_b, "user_b").await; server @@ -3472,9 +3735,13 @@ async fn test_document_highlights(cx_a: &mut TestAppContext, cx_b: &mut TestAppC } #[gpui::test(iterations = 10)] -async fn test_lsp_hover(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) { - cx_a.foreground().forbid_parking(); - let mut server = TestServer::start(cx_a.background()).await; +async fn test_lsp_hover( + deterministic: Arc, + cx_a: &mut TestAppContext, + cx_b: &mut TestAppContext, +) { + deterministic.forbid_parking(); + let mut server = TestServer::start(&deterministic).await; let client_a = server.create_client(cx_a, "user_a").await; let client_b = server.create_client(cx_b, "user_b").await; server @@ -3575,9 +3842,13 @@ async fn test_lsp_hover(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) { } #[gpui::test(iterations = 10)] -async fn test_project_symbols(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) { - cx_a.foreground().forbid_parking(); - let mut server = TestServer::start(cx_a.background()).await; +async fn test_project_symbols( + deterministic: Arc, + cx_a: &mut TestAppContext, + cx_b: &mut TestAppContext, +) { + deterministic.forbid_parking(); + let mut server = TestServer::start(&deterministic).await; let client_a = server.create_client(cx_a, "user_a").await; let client_b = server.create_client(cx_b, "user_b").await; server @@ -3680,12 +3951,13 @@ async fn test_project_symbols(cx_a: &mut TestAppContext, cx_b: &mut TestAppConte #[gpui::test(iterations = 10)] async fn test_open_buffer_while_getting_definition_pointing_to_it( + deterministic: Arc, cx_a: &mut TestAppContext, cx_b: &mut TestAppContext, mut rng: StdRng, ) { - cx_a.foreground().forbid_parking(); - let mut server = TestServer::start(cx_a.background()).await; + deterministic.forbid_parking(); + let mut server = TestServer::start(&deterministic).await; let client_a = server.create_client(cx_a, "user_a").await; let client_b = server.create_client(cx_b, "user_b").await; server @@ -3756,12 +4028,13 @@ async fn test_open_buffer_while_getting_definition_pointing_to_it( #[gpui::test(iterations = 10)] async fn test_collaborating_with_code_actions( + deterministic: Arc, cx_a: &mut TestAppContext, cx_b: &mut TestAppContext, ) { - cx_a.foreground().forbid_parking(); + deterministic.forbid_parking(); cx_b.update(editor::init); - let mut server = TestServer::start(cx_a.background()).await; + let mut server = TestServer::start(&deterministic).await; let client_a = server.create_client(cx_a, "user_a").await; let client_b = server.create_client(cx_b, "user_b").await; server @@ -3976,10 +4249,14 @@ async fn test_collaborating_with_code_actions( } #[gpui::test(iterations = 10)] -async fn test_collaborating_with_renames(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) { - cx_a.foreground().forbid_parking(); +async fn test_collaborating_with_renames( + deterministic: Arc, + cx_a: &mut TestAppContext, + cx_b: &mut TestAppContext, +) { + deterministic.forbid_parking(); cx_b.update(editor::init); - let mut server = TestServer::start(cx_a.background()).await; + let mut server = TestServer::start(&deterministic).await; let client_a = server.create_client(cx_a, "user_a").await; let client_b = server.create_client(cx_b, "user_b").await; server @@ -4178,7 +4455,7 @@ async fn test_language_server_statuses( deterministic.forbid_parking(); cx_b.update(editor::init); - let mut server = TestServer::start(cx_a.background()).await; + let mut server = TestServer::start(&deterministic).await; let client_a = server.create_client(cx_a, "user_a").await; let client_b = server.create_client(cx_b, "user_b").await; server @@ -4290,8 +4567,8 @@ async fn test_contacts( cx_c: &mut TestAppContext, cx_d: &mut TestAppContext, ) { - cx_a.foreground().forbid_parking(); - let mut server = TestServer::start(cx_a.background()).await; + deterministic.forbid_parking(); + let mut server = TestServer::start(&deterministic).await; let client_a = server.create_client(cx_a, "user_a").await; let client_b = server.create_client(cx_b, "user_b").await; let client_c = server.create_client(cx_c, "user_c").await; @@ -4678,7 +4955,7 @@ async fn test_contacts( #[gpui::test(iterations = 10)] async fn test_contact_requests( - executor: Arc, + deterministic: Arc, cx_a: &mut TestAppContext, cx_a2: &mut TestAppContext, cx_b: &mut TestAppContext, @@ -4686,10 +4963,10 @@ async fn test_contact_requests( cx_c: &mut TestAppContext, cx_c2: &mut TestAppContext, ) { - cx_a.foreground().forbid_parking(); + deterministic.forbid_parking(); // Connect to a server as 3 clients. - let mut server = TestServer::start(cx_a.background()).await; + let mut server = TestServer::start(&deterministic).await; let client_a = server.create_client(cx_a, "user_a").await; let client_a2 = server.create_client(cx_a2, "user_a").await; let client_b = server.create_client(cx_b, "user_b").await; @@ -4716,7 +4993,7 @@ async fn test_contact_requests( }) .await .unwrap(); - executor.run_until_parked(); + deterministic.run_until_parked(); // All users see the pending request appear in all their clients. assert_eq!( @@ -4748,7 +5025,7 @@ async fn test_contact_requests( disconnect_and_reconnect(&client_a, cx_a).await; disconnect_and_reconnect(&client_b, cx_b).await; disconnect_and_reconnect(&client_c, cx_c).await; - executor.run_until_parked(); + deterministic.run_until_parked(); assert_eq!( client_a.summarize_contacts(cx_a).outgoing_requests, &["user_b"] @@ -4771,7 +5048,7 @@ async fn test_contact_requests( .await .unwrap(); - executor.run_until_parked(); + deterministic.run_until_parked(); // User B sees user A as their contact now in all client, and the incoming request from them is removed. let contacts_b = client_b.summarize_contacts(cx_b); @@ -4793,7 +5070,7 @@ async fn test_contact_requests( disconnect_and_reconnect(&client_a, cx_a).await; disconnect_and_reconnect(&client_b, cx_b).await; disconnect_and_reconnect(&client_c, cx_c).await; - executor.run_until_parked(); + deterministic.run_until_parked(); assert_eq!(client_a.summarize_contacts(cx_a).current, &["user_b"]); assert_eq!(client_b.summarize_contacts(cx_b).current, &["user_a"]); assert_eq!( @@ -4815,7 +5092,7 @@ async fn test_contact_requests( .await .unwrap(); - executor.run_until_parked(); + deterministic.run_until_parked(); // User B doesn't see user C as their contact, and the incoming request from them is removed. let contacts_b = client_b.summarize_contacts(cx_b); @@ -4837,7 +5114,7 @@ async fn test_contact_requests( disconnect_and_reconnect(&client_a, cx_a).await; disconnect_and_reconnect(&client_b, cx_b).await; disconnect_and_reconnect(&client_c, cx_c).await; - executor.run_until_parked(); + deterministic.run_until_parked(); assert_eq!(client_a.summarize_contacts(cx_a).current, &["user_b"]); assert_eq!(client_b.summarize_contacts(cx_b).current, &["user_a"]); assert!(client_b @@ -4866,11 +5143,11 @@ async fn test_following( cx_a: &mut TestAppContext, cx_b: &mut TestAppContext, ) { - cx_a.foreground().forbid_parking(); + deterministic.forbid_parking(); cx_a.update(editor::init); cx_b.update(editor::init); - let mut server = TestServer::start(cx_a.background()).await; + let mut server = TestServer::start(&deterministic).await; let client_a = server.create_client(cx_a, "user_a").await; let client_b = server.create_client(cx_b, "user_b").await; server @@ -5147,7 +5424,7 @@ async fn test_following_tab_order( cx_a.update(editor::init); cx_b.update(editor::init); - let mut server = TestServer::start(cx_a.background()).await; + let mut server = TestServer::start(&deterministic).await; let client_a = server.create_client(cx_a, "user_a").await; let client_b = server.create_client(cx_b, "user_b").await; server @@ -5262,12 +5539,16 @@ async fn test_following_tab_order( } #[gpui::test(iterations = 10)] -async fn test_peers_following_each_other(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) { - cx_a.foreground().forbid_parking(); +async fn test_peers_following_each_other( + deterministic: Arc, + cx_a: &mut TestAppContext, + cx_b: &mut TestAppContext, +) { + deterministic.forbid_parking(); cx_a.update(editor::init); cx_b.update(editor::init); - let mut server = TestServer::start(cx_a.background()).await; + let mut server = TestServer::start(&deterministic).await; let client_a = server.create_client(cx_a, "user_a").await; let client_b = server.create_client(cx_b, "user_b").await; server @@ -5439,13 +5720,17 @@ async fn test_peers_following_each_other(cx_a: &mut TestAppContext, cx_b: &mut T } #[gpui::test(iterations = 10)] -async fn test_auto_unfollowing(cx_a: &mut TestAppContext, cx_b: &mut TestAppContext) { - cx_a.foreground().forbid_parking(); +async fn test_auto_unfollowing( + deterministic: Arc, + cx_a: &mut TestAppContext, + cx_b: &mut TestAppContext, +) { + deterministic.forbid_parking(); cx_a.update(editor::init); cx_b.update(editor::init); // 2 clients connect to a server. - let mut server = TestServer::start(cx_a.background()).await; + let mut server = TestServer::start(&deterministic).await; let client_a = server.create_client(cx_a, "user_a").await; let client_b = server.create_client(cx_b, "user_b").await; server @@ -5619,7 +5904,7 @@ async fn test_peers_simultaneously_following_each_other( cx_a.update(editor::init); cx_b.update(editor::init); - let mut server = TestServer::start(cx_a.background()).await; + let mut server = TestServer::start(&deterministic).await; let client_a = server.create_client(cx_a, "user_a").await; let client_b = server.create_client(cx_b, "user_b").await; server @@ -5689,7 +5974,7 @@ async fn test_random_collaboration( .map(|i| i.parse().expect("invalid `OPERATIONS` variable")) .unwrap_or(10); - let mut server = TestServer::start(cx.background()).await; + let mut server = TestServer::start(&deterministic).await; let db = server.app_state.db.clone(); let mut available_guests = Vec::new(); @@ -6010,27 +6295,32 @@ struct TestServer { } impl TestServer { - async fn start(background: Arc) -> Self { + async fn start(deterministic: &Arc) -> Self { static NEXT_LIVE_KIT_SERVER_ID: AtomicUsize = AtomicUsize::new(0); let use_postgres = env::var("USE_POSTGRES").ok(); let use_postgres = use_postgres.as_deref(); let test_db = if use_postgres == Some("true") || use_postgres == Some("1") { - TestDb::postgres(background.clone()) + TestDb::postgres(deterministic.build_background()) } else { - TestDb::sqlite(background.clone()) + TestDb::sqlite(deterministic.build_background()) }; let live_kit_server_id = NEXT_LIVE_KIT_SERVER_ID.fetch_add(1, SeqCst); let live_kit_server = live_kit_client::TestServer::create( format!("http://livekit.{}.test", live_kit_server_id), format!("devkey-{}", live_kit_server_id), format!("secret-{}", live_kit_server_id), - background.clone(), + deterministic.build_background(), ) .unwrap(); let app_state = Self::build_app_state(&test_db, &live_kit_server).await; - let server = Server::new(app_state.clone()); + let server = Server::new( + app_state.clone(), + Executor::Deterministic(deterministic.build_background()), + ); server.start().await.unwrap(); + // Advance clock to ensure the server's cleanup task is finished. + deterministic.advance_clock(RECONNECT_TIMEOUT); Self { app_state, server, @@ -6041,12 +6331,9 @@ impl TestServer { } } - async fn restart(&self) { - self.forbid_connections(); + fn teardown(&self) { self.server.teardown(); self.app_state.db.reset(); - self.server.start().await.unwrap(); - self.allow_connections(); } async fn create_client(&mut self, cx: &mut TestAppContext, name: &str) -> TestClient { @@ -6969,16 +7256,19 @@ struct RoomParticipants { } fn room_participants(room: &ModelHandle, cx: &mut TestAppContext) -> RoomParticipants { - room.read_with(cx, |room, _| RoomParticipants { - remote: room + room.read_with(cx, |room, _| { + let mut remote = room .remote_participants() .iter() .map(|(_, participant)| participant.user.github_login.clone()) - .collect(), - pending: room + .collect::>(); + let mut pending = room .pending_participants() .iter() .map(|user| user.github_login.clone()) - .collect(), + .collect::>(); + remote.sort(); + pending.sort(); + RoomParticipants { remote, pending } }) } diff --git a/crates/collab/src/lib.rs b/crates/collab/src/lib.rs index b9d43cd2ee..7e0f23f5d4 100644 --- a/crates/collab/src/lib.rs +++ b/crates/collab/src/lib.rs @@ -2,7 +2,7 @@ pub mod api; pub mod auth; pub mod db; pub mod env; -mod executor; +pub mod executor; #[cfg(test)] mod integration_tests; pub mod rpc; diff --git a/crates/collab/src/main.rs b/crates/collab/src/main.rs index 384789b7c2..710910fe03 100644 --- a/crates/collab/src/main.rs +++ b/crates/collab/src/main.rs @@ -1,6 +1,6 @@ use anyhow::anyhow; use axum::{routing::get, Router}; -use collab::{db, env, AppState, Config, MigrateConfig, Result}; +use collab::{db, env, executor::Executor, AppState, Config, MigrateConfig, Result}; use db::Database; use std::{ env::args, @@ -56,7 +56,7 @@ async fn main() -> Result<()> { let listener = TcpListener::bind(&format!("0.0.0.0:{}", state.config.http_port)) .expect("failed to bind TCP listener"); - let rpc_server = collab::rpc::Server::new(state.clone()); + let rpc_server = collab::rpc::Server::new(state.clone(), Executor::Production); rpc_server.start().await?; let app = collab::api::routes(rpc_server.clone(), state.clone()) diff --git a/crates/collab/src/rpc.rs b/crates/collab/src/rpc.rs index 18bd96c536..6b127e2432 100644 --- a/crates/collab/src/rpc.rs +++ b/crates/collab/src/rpc.rs @@ -142,6 +142,7 @@ pub struct Server { peer: Arc, pub(crate) connection_pool: Arc>, app_state: Arc, + executor: Executor, handlers: HashMap, teardown: watch::Sender<()>, } @@ -168,10 +169,11 @@ where } impl Server { - pub fn new(app_state: Arc) -> Arc { + pub fn new(app_state: Arc, executor: Executor) -> Arc { let mut server = Self { peer: Peer::new(), app_state, + executor, connection_pool: Default::default(), handlers: Default::default(), teardown: watch::channel(()).0, @@ -239,8 +241,85 @@ impl Server { pub async fn start(&self) -> Result<()> { self.app_state.db.delete_stale_projects().await?; - // TODO: delete stale rooms after timeout. - // self.app_state.db.delete_stale_rooms().await?; + let db = self.app_state.db.clone(); + let peer = self.peer.clone(); + let timeout = self.executor.sleep(RECONNECT_TIMEOUT); + let pool = self.connection_pool.clone(); + let live_kit_client = self.app_state.live_kit_client.clone(); + self.executor.spawn_detached(async move { + timeout.await; + if let Some(room_ids) = db.outdated_room_ids().await.trace_err() { + for room_id in room_ids { + let mut contacts_to_update = HashSet::default(); + let mut canceled_calls_to_user_ids = Vec::new(); + let mut live_kit_room = String::new(); + let mut delete_live_kit_room = false; + + if let Ok(mut refreshed_room) = db.refresh_room(room_id).await { + room_updated(&refreshed_room.room, &peer); + contacts_to_update + .extend(refreshed_room.stale_participant_user_ids.iter().copied()); + contacts_to_update + .extend(refreshed_room.canceled_calls_to_user_ids.iter().copied()); + canceled_calls_to_user_ids = + mem::take(&mut refreshed_room.canceled_calls_to_user_ids); + dbg!(&canceled_calls_to_user_ids); + live_kit_room = mem::take(&mut refreshed_room.room.live_kit_room); + delete_live_kit_room = refreshed_room.room.participants.is_empty(); + } + + { + let pool = pool.lock().await; + for canceled_user_id in canceled_calls_to_user_ids { + for connection_id in pool.user_connection_ids(canceled_user_id) { + peer.send(connection_id, proto::CallCanceled {}).trace_err(); + } + } + } + + for user_id in contacts_to_update { + if let Some((busy, contacts)) = db + .is_user_busy(user_id) + .await + .trace_err() + .zip(db.get_contacts(user_id).await.trace_err()) + { + let pool = pool.lock().await; + let updated_contact = contact_for_user(user_id, false, busy, &pool); + for contact in contacts { + if let db::Contact::Accepted { + user_id: contact_user_id, + .. + } = contact + { + for contact_conn_id in pool.user_connection_ids(contact_user_id) + { + peer.send( + contact_conn_id, + proto::UpdateContacts { + contacts: vec![updated_contact.clone()], + remove_contacts: Default::default(), + incoming_requests: Default::default(), + remove_incoming_requests: Default::default(), + outgoing_requests: Default::default(), + remove_outgoing_requests: Default::default(), + }, + ) + .trace_err(); + } + } + } + } + } + + if let Some(live_kit) = live_kit_client.as_ref() { + if delete_live_kit_room { + live_kit.delete_room(live_kit_room).await.trace_err(); + } + } + } + } + }); Ok(()) } @@ -690,7 +769,7 @@ async fn sign_out( { let db = session.db().await; if let Some(room) = db.decline_call(None, session.user_id).await.trace_err() { - room_updated(&room, &session); + room_updated(&room, &session.peer); } } update_user_contacts(session.user_id, &session).await?; @@ -768,7 +847,7 @@ async fn join_room( session.connection_id, ) .await?; - room_updated(&room, &session); + room_updated(&room, &session.peer); room.clone() }; @@ -843,7 +922,7 @@ async fn call( initial_project_id, ) .await?; - room_updated(&room, &session); + room_updated(&room, &session.peer); mem::take(incoming_call) }; update_user_contacts(called_user_id, &session).await?; @@ -873,7 +952,7 @@ async fn call( .await .call_failed(room_id, called_user_id) .await?; - room_updated(&room, &session); + room_updated(&room, &session.peer); } update_user_contacts(called_user_id, &session).await?; @@ -893,7 +972,7 @@ async fn cancel_call( .await .cancel_call(Some(room_id), session.connection_id, called_user_id) .await?; - room_updated(&room, &session); + room_updated(&room, &session.peer); } for connection_id in session @@ -920,7 +999,7 @@ async fn decline_call(message: proto::DeclineCall, session: Session) -> Result<( .await .decline_call(Some(room_id), session.user_id) .await?; - room_updated(&room, &session); + room_updated(&room, &session.peer); } for connection_id in session @@ -951,7 +1030,7 @@ async fn update_participant_location( .await .update_room_participant_location(room_id, session.connection_id, location) .await?; - room_updated(&room, &session); + room_updated(&room, &session.peer); response.send(proto::Ack {})?; Ok(()) } @@ -973,7 +1052,7 @@ async fn share_project( response.send(proto::ShareProjectResponse { project_id: project_id.to_proto(), })?; - room_updated(&room, &session); + room_updated(&room, &session.peer); Ok(()) } @@ -992,7 +1071,7 @@ async fn unshare_project(message: proto::UnshareProject, session: Session) -> Re guest_connection_ids.iter().copied(), |conn_id| session.peer.send(conn_id, message.clone()), ); - room_updated(&room, &session); + room_updated(&room, &session.peer); Ok(()) } @@ -1151,7 +1230,7 @@ async fn update_project( .forward_send(session.connection_id, connection_id, request.clone()) }, ); - room_updated(&room, &session); + room_updated(&room, &session.peer); response.send(proto::Ack {})?; Ok(()) @@ -1798,17 +1877,15 @@ fn contact_for_user( } } -fn room_updated(room: &proto::Room, session: &Session) { +fn room_updated(room: &proto::Room, peer: &Peer) { for participant in &room.participants { - session - .peer - .send( - ConnectionId(participant.peer_id), - proto::RoomUpdated { - room: Some(room.clone()), - }, - ) - .trace_err(); + peer.send( + ConnectionId(participant.peer_id), + proto::RoomUpdated { + room: Some(room.clone()), + }, + ) + .trace_err(); } } @@ -1860,7 +1937,7 @@ async fn leave_room_for_session(session: &Session) -> Result<()> { project_left(project, session); } - room_updated(&left_room.room, &session); + room_updated(&left_room.room, &session.peer); canceled_calls_to_user_ids = mem::take(&mut left_room.canceled_calls_to_user_ids); live_kit_room = mem::take(&mut left_room.room.live_kit_room); delete_live_kit_room = left_room.room.participants.is_empty();