From 674fddac875d4877976ab95767ffe982fbaf5ff8 Mon Sep 17 00:00:00 2001 From: Antonio Scandurra Date: Wed, 14 Dec 2022 11:42:12 +0100 Subject: [PATCH] Instrument `rpc::Server::start` and reduce cleanup timeout again --- crates/collab/src/rpc.rs | 146 ++++++++++++++++++++++----------------- 1 file changed, 83 insertions(+), 63 deletions(-) diff --git a/crates/collab/src/rpc.rs b/crates/collab/src/rpc.rs index e1b0056226..70802fa431 100644 --- a/crates/collab/src/rpc.rs +++ b/crates/collab/src/rpc.rs @@ -58,7 +58,7 @@ use tower::ServiceBuilder; use tracing::{info_span, instrument, Instrument}; pub const RECONNECT_TIMEOUT: Duration = Duration::from_secs(5); -pub const CLEANUP_TIMEOUT: Duration = Duration::from_secs(20); +pub const CLEANUP_TIMEOUT: Duration = Duration::from_secs(10); lazy_static! { static ref METRIC_CONNECTIONS: IntGauge = @@ -239,88 +239,108 @@ impl Server { } pub async fn start(&self) -> Result<()> { - self.app_state.db.delete_stale_projects().await?; let db = self.app_state.db.clone(); let peer = self.peer.clone(); let timeout = self.executor.sleep(CLEANUP_TIMEOUT); let pool = self.connection_pool.clone(); let live_kit_client = self.app_state.live_kit_client.clone(); - self.executor.spawn_detached(async move { - timeout.await; - if let Some(room_ids) = db.stale_room_ids().await.trace_err() { - for room_id in room_ids { - let mut contacts_to_update = HashSet::default(); - let mut canceled_calls_to_user_ids = Vec::new(); - let mut live_kit_room = String::new(); - let mut delete_live_kit_room = false; - if let Ok(mut refreshed_room) = db.refresh_room(room_id).await { - room_updated(&refreshed_room.room, &peer); - contacts_to_update - .extend(refreshed_room.stale_participant_user_ids.iter().copied()); - contacts_to_update - .extend(refreshed_room.canceled_calls_to_user_ids.iter().copied()); - canceled_calls_to_user_ids = - mem::take(&mut refreshed_room.canceled_calls_to_user_ids); - live_kit_room = mem::take(&mut refreshed_room.room.live_kit_room); - delete_live_kit_room = refreshed_room.room.participants.is_empty(); - } + let span = info_span!("start server"); + let span_enter = span.enter(); - { - let pool = pool.lock(); - for canceled_user_id in canceled_calls_to_user_ids { - for connection_id in pool.user_connection_ids(canceled_user_id) { - peer.send( - connection_id, - proto::CallCanceled { - room_id: room_id.to_proto(), - }, - ) - .trace_err(); + tracing::info!("begin deleting stale projects"); + self.app_state.db.delete_stale_projects().await?; + tracing::info!("finish deleting stale projects"); + + drop(span_enter); + self.executor.spawn_detached( + async move { + tracing::info!("waiting for cleanup timeout"); + timeout.await; + tracing::info!("cleanup timeout expired, retrieving stale rooms"); + if let Some(room_ids) = db.stale_room_ids().await.trace_err() { + tracing::info!(stale_room_count = room_ids.len(), "retrieved stale rooms"); + for room_id in room_ids { + let mut contacts_to_update = HashSet::default(); + let mut canceled_calls_to_user_ids = Vec::new(); + let mut live_kit_room = String::new(); + let mut delete_live_kit_room = false; + + if let Ok(mut refreshed_room) = db.refresh_room(room_id).await { + tracing::info!( + room_id = room_id.0, + new_participant_count = refreshed_room.room.participants.len(), + "refreshed room" + ); + room_updated(&refreshed_room.room, &peer); + contacts_to_update + .extend(refreshed_room.stale_participant_user_ids.iter().copied()); + contacts_to_update + .extend(refreshed_room.canceled_calls_to_user_ids.iter().copied()); + canceled_calls_to_user_ids = + mem::take(&mut refreshed_room.canceled_calls_to_user_ids); + live_kit_room = mem::take(&mut refreshed_room.room.live_kit_room); + delete_live_kit_room = refreshed_room.room.participants.is_empty(); + } + + { + let pool = pool.lock(); + for canceled_user_id in canceled_calls_to_user_ids { + for connection_id in pool.user_connection_ids(canceled_user_id) { + peer.send( + connection_id, + proto::CallCanceled { + room_id: room_id.to_proto(), + }, + ) + .trace_err(); + } } } - } - for user_id in contacts_to_update { - let busy = db.is_user_busy(user_id).await.trace_err(); - let contacts = db.get_contacts(user_id).await.trace_err(); - if let Some((busy, contacts)) = busy.zip(contacts) { - let pool = pool.lock(); - let updated_contact = contact_for_user(user_id, false, busy, &pool); - for contact in contacts { - if let db::Contact::Accepted { - user_id: contact_user_id, - .. - } = contact - { - for contact_conn_id in pool.user_connection_ids(contact_user_id) + for user_id in contacts_to_update { + let busy = db.is_user_busy(user_id).await.trace_err(); + let contacts = db.get_contacts(user_id).await.trace_err(); + if let Some((busy, contacts)) = busy.zip(contacts) { + let pool = pool.lock(); + let updated_contact = contact_for_user(user_id, false, busy, &pool); + for contact in contacts { + if let db::Contact::Accepted { + user_id: contact_user_id, + .. + } = contact { - peer.send( - contact_conn_id, - proto::UpdateContacts { - contacts: vec![updated_contact.clone()], - remove_contacts: Default::default(), - incoming_requests: Default::default(), - remove_incoming_requests: Default::default(), - outgoing_requests: Default::default(), - remove_outgoing_requests: Default::default(), - }, - ) - .trace_err(); + for contact_conn_id in + pool.user_connection_ids(contact_user_id) + { + peer.send( + contact_conn_id, + proto::UpdateContacts { + contacts: vec![updated_contact.clone()], + remove_contacts: Default::default(), + incoming_requests: Default::default(), + remove_incoming_requests: Default::default(), + outgoing_requests: Default::default(), + remove_outgoing_requests: Default::default(), + }, + ) + .trace_err(); + } } } } } - } - if let Some(live_kit) = live_kit_client.as_ref() { - if delete_live_kit_room { - live_kit.delete_room(live_kit_room).await.trace_err(); + if let Some(live_kit) = live_kit_client.as_ref() { + if delete_live_kit_room { + live_kit.delete_room(live_kit_room).await.trace_err(); + } } } } } - }); + .instrument(span), + ); Ok(()) }