diff --git a/crates/collab/src/rpc.rs b/crates/collab/src/rpc.rs index 2204a0319c..e244757970 100644 --- a/crates/collab/src/rpc.rs +++ b/crates/collab/src/rpc.rs @@ -61,8 +61,10 @@ pub use store::{Store, Worktree}; lazy_static! { static ref METRIC_CONNECTIONS: IntGauge = register_int_gauge!("connections", "number of connections").unwrap(); - static ref METRIC_PROJECTS: IntGauge = - register_int_gauge!("projects", "number of open projects").unwrap(); + static ref METRIC_REGISTERED_PROJECTS: IntGauge = + register_int_gauge!("registered_projects", "number of registered projects").unwrap(); + static ref METRIC_ACTIVE_PROJECTS: IntGauge = + register_int_gauge!("active_projects", "number of active projects").unwrap(); static ref METRIC_SHARED_PROJECTS: IntGauge = register_int_gauge!( "shared_projects", "number of open projects with one or more guests" @@ -159,6 +161,7 @@ impl Server { .add_message_handler(Server::leave_project) .add_message_handler(Server::respond_to_join_project_request) .add_message_handler(Server::update_project) + .add_message_handler(Server::register_project_activity) .add_request_handler(Server::update_worktree) .add_message_handler(Server::start_language_server) .add_message_handler(Server::update_language_server) @@ -329,7 +332,7 @@ impl Server { { let mut store = this.store_mut().await; - store.add_connection(connection_id, user_id); + store.add_connection(connection_id, user_id, user.admin); this.peer.send(connection_id, store.build_initial_contacts_update(contacts))?; if let Some((code, count)) = invite_code { @@ -844,6 +847,16 @@ impl Server { Ok(()) } + async fn register_project_activity( + self: Arc, + request: TypedEnvelope, + ) -> Result<()> { + self.store_mut() + .await + .register_project_activity(request.payload.project_id, request.sender_id)?; + Ok(()) + } + async fn update_worktree( self: Arc, request: TypedEnvelope, @@ -1001,10 +1014,12 @@ impl Server { request: TypedEnvelope, response: Response, ) -> Result<()> { - let receiver_ids = self - .store() - .await - .project_connection_ids(request.payload.project_id, request.sender_id)?; + let receiver_ids = { + let mut store = self.store_mut().await; + store.register_project_activity(request.payload.project_id, request.sender_id)?; + store.project_connection_ids(request.payload.project_id, request.sender_id)? + }; + broadcast(request.sender_id, receiver_ids, |connection_id| { self.peer .forward_send(request.sender_id, connection_id, request.payload.clone()) @@ -1065,14 +1080,18 @@ impl Server { ) -> Result<()> { let leader_id = ConnectionId(request.payload.leader_id); let follower_id = request.sender_id; - if !self - .store() - .await - .project_connection_ids(request.payload.project_id, follower_id)? - .contains(&leader_id) { - Err(anyhow!("no such peer"))?; + let mut store = self.store_mut().await; + if !store + .project_connection_ids(request.payload.project_id, follower_id)? + .contains(&leader_id) + { + Err(anyhow!("no such peer"))?; + } + + store.register_project_activity(request.payload.project_id, follower_id)?; } + let mut response_payload = self .peer .forward_request(request.sender_id, leader_id, request.payload) @@ -1086,14 +1105,14 @@ impl Server { async fn unfollow(self: Arc, request: TypedEnvelope) -> Result<()> { let leader_id = ConnectionId(request.payload.leader_id); - if !self - .store() - .await + let mut store = self.store_mut().await; + if !store .project_connection_ids(request.payload.project_id, request.sender_id)? .contains(&leader_id) { Err(anyhow!("no such peer"))?; } + store.register_project_activity(request.payload.project_id, request.sender_id)?; self.peer .forward_send(request.sender_id, leader_id, request.payload)?; Ok(()) @@ -1103,10 +1122,10 @@ impl Server { self: Arc, request: TypedEnvelope, ) -> Result<()> { - let connection_ids = self - .store() - .await - .project_connection_ids(request.payload.project_id, request.sender_id)?; + let mut store = self.store_mut().await; + store.register_project_activity(request.payload.project_id, request.sender_id)?; + let connection_ids = + store.project_connection_ids(request.payload.project_id, request.sender_id)?; let leader_id = request .payload .variant @@ -1574,12 +1593,14 @@ impl<'a> Drop for StoreWriteGuard<'a> { let metrics = self.metrics(); METRIC_CONNECTIONS.set(metrics.connections as _); - METRIC_PROJECTS.set(metrics.registered_projects as _); + METRIC_REGISTERED_PROJECTS.set(metrics.registered_projects as _); + METRIC_ACTIVE_PROJECTS.set(metrics.active_projects as _); METRIC_SHARED_PROJECTS.set(metrics.shared_projects as _); tracing::info!( connections = metrics.connections, registered_projects = metrics.registered_projects, + active_projects = metrics.active_projects, shared_projects = metrics.shared_projects, "metrics" ); @@ -1649,10 +1670,10 @@ pub fn routes(server: Arc) -> Router { .layer( ServiceBuilder::new() .layer(Extension(server.app_state.clone())) - .layer(middleware::from_fn(auth::validate_header)) - .layer(Extension(server)), + .layer(middleware::from_fn(auth::validate_header)), ) .route("/metrics", get(handle_metrics)) + .layer(Extension(server)) } pub async fn handle_websocket_request( @@ -1686,7 +1707,10 @@ pub async fn handle_websocket_request( }) } -pub async fn handle_metrics() -> axum::response::Response { +pub async fn handle_metrics(Extension(server): Extension>) -> axum::response::Response { + // We call `store_mut` here for its side effects of updating metrics. + server.store_mut().await; + let encoder = prometheus::TextEncoder::new(); let metric_families = prometheus::gather(); match encoder.encode_to_string(&metric_families) { diff --git a/crates/collab/src/rpc/store.rs b/crates/collab/src/rpc/store.rs index d929078dc5..9fe9162edd 100644 --- a/crates/collab/src/rpc/store.rs +++ b/crates/collab/src/rpc/store.rs @@ -9,6 +9,7 @@ use std::{ mem, path::{Path, PathBuf}, str, + time::{Duration, Instant}, }; use tracing::instrument; @@ -25,6 +26,7 @@ pub struct Store { #[derive(Serialize)] struct ConnectionState { user_id: UserId, + admin: bool, projects: HashSet, requested_projects: HashSet, channels: HashSet, @@ -40,6 +42,8 @@ pub struct Project { pub active_replica_ids: HashSet, pub worktrees: BTreeMap, pub language_servers: Vec, + #[serde(skip)] + last_activity: Option, } #[derive(Default, Serialize)] @@ -83,34 +87,45 @@ pub struct LeftProject { pub struct Metrics { pub connections: usize, pub registered_projects: usize, + pub active_projects: usize, pub shared_projects: usize, } impl Store { pub fn metrics(&self) -> Metrics { - let connections = self.connections.len(); + let connections = self.connections.values().filter(|c| !c.admin).count(); let mut registered_projects = 0; + let mut active_projects = 0; let mut shared_projects = 0; for project in self.projects.values() { - registered_projects += 1; - if !project.guests.is_empty() { - shared_projects += 1; + if let Some(connection) = self.connections.get(&project.host_connection_id) { + if !connection.admin { + registered_projects += 1; + if project.is_active() { + active_projects += 1; + if !project.guests.is_empty() { + shared_projects += 1; + } + } + } } } Metrics { connections, registered_projects, + active_projects, shared_projects, } } #[instrument(skip(self))] - pub fn add_connection(&mut self, connection_id: ConnectionId, user_id: UserId) { + pub fn add_connection(&mut self, connection_id: ConnectionId, user_id: UserId, admin: bool) { self.connections.insert( connection_id, ConnectionState { user_id, + admin, projects: Default::default(), requested_projects: Default::default(), channels: Default::default(), @@ -312,6 +327,7 @@ impl Store { active_replica_ids: Default::default(), worktrees: Default::default(), language_servers: Default::default(), + last_activity: None, }, ); if let Some(connection) = self.connections.get_mut(&host_connection_id) { @@ -454,6 +470,7 @@ impl Store { .get_mut(&project_id) .ok_or_else(|| anyhow!("no such project"))?; connection.requested_projects.insert(project_id); + project.last_activity = Some(Instant::now()); project .join_requests .entry(requester_id) @@ -478,6 +495,8 @@ impl Store { let requester_connection = self.connections.get_mut(&receipt.sender_id)?; requester_connection.requested_projects.remove(&project_id); } + project.last_activity = Some(Instant::now()); + Some(receipts) } @@ -509,6 +528,7 @@ impl Store { receipts_with_replica_ids.push((receipt, replica_id)); } + project.last_activity = Some(Instant::now()); Some((receipts_with_replica_ids, project)) } @@ -559,6 +579,8 @@ impl Store { } } + project.last_activity = Some(Instant::now()); + Ok(LeftProject { host_connection_id: project.host_connection_id, host_user_id: project.host_user_id, @@ -647,6 +669,15 @@ impl Store { .ok_or_else(|| anyhow!("no such project")) } + pub fn register_project_activity( + &mut self, + project_id: u64, + connection_id: ConnectionId, + ) -> Result<()> { + self.write_project(project_id, connection_id)?.last_activity = Some(Instant::now()); + Ok(()) + } + pub fn read_project(&self, project_id: u64, connection_id: ConnectionId) -> Result<&Project> { let project = self .projects @@ -752,6 +783,13 @@ impl Store { } impl Project { + fn is_active(&self) -> bool { + const ACTIVE_PROJECT_TIMEOUT: Duration = Duration::from_secs(60); + self.last_activity.map_or(false, |last_activity| { + last_activity.elapsed() < ACTIVE_PROJECT_TIMEOUT + }) + } + pub fn guest_connection_ids(&self) -> Vec { self.guests.keys().copied().collect() } diff --git a/crates/project/src/project.rs b/crates/project/src/project.rs index 9b38665ca3..7c291cade7 100644 --- a/crates/project/src/project.rs +++ b/crates/project/src/project.rs @@ -1780,6 +1780,10 @@ impl Project { operations: vec![language::proto::serialize_operation(&operation)], }); cx.background().spawn(request).detach_and_log_err(cx); + } else if let Some(project_id) = self.remote_id() { + let _ = self + .client + .send(proto::RegisterProjectActivity { project_id }); } } BufferEvent::Edited { .. } => { diff --git a/crates/rpc/proto/zed.proto b/crates/rpc/proto/zed.proto index ff52fb9e8d..ed05ed7b43 100644 --- a/crates/rpc/proto/zed.proto +++ b/crates/rpc/proto/zed.proto @@ -36,6 +36,7 @@ message Envelope { OpenBufferForSymbolResponse open_buffer_for_symbol_response = 29; UpdateProject update_project = 30; + RegisterProjectActivity register_project_activity = 31; UpdateWorktree update_worktree = 32; CreateProjectEntry create_project_entry = 33; @@ -135,6 +136,10 @@ message UpdateProject { repeated WorktreeMetadata worktrees = 2; } +message RegisterProjectActivity { + uint64 project_id = 1; +} + message RequestJoinProject { uint64 requester_id = 1; uint64 project_id = 2; diff --git a/crates/rpc/src/proto.rs b/crates/rpc/src/proto.rs index 6343c05cea..ecee370986 100644 --- a/crates/rpc/src/proto.rs +++ b/crates/rpc/src/proto.rs @@ -134,6 +134,7 @@ messages!( (Ping, Foreground), (ProjectUnshared, Foreground), (RegisterProject, Foreground), + (RegisterProjectActivity, Foreground), (ReloadBuffers, Foreground), (ReloadBuffersResponse, Foreground), (RemoveProjectCollaborator, Foreground), @@ -236,6 +237,7 @@ entity_messages!( PerformRename, PrepareRename, ProjectUnshared, + RegisterProjectActivity, ReloadBuffers, RemoveProjectCollaborator, RenameProjectEntry,