From 44160869eb38f5b5f0f824dbd151aeee243b8c47 Mon Sep 17 00:00:00 2001 From: Max Brunsfeld Date: Mon, 20 Jun 2022 19:39:48 -0700 Subject: [PATCH] Add an API that returns the most active users and the projects where they've been active --- ...211403_create_project_activity_periods.sql | 9 + crates/collab/src/api.rs | 22 +- crates/collab/src/db.rs | 219 +++++++++++++++++- crates/collab/src/integration_tests.rs | 2 +- crates/collab/src/main.rs | 3 + crates/collab/src/rpc.rs | 67 +++++- crates/collab/src/rpc/store.rs | 97 +++++--- 7 files changed, 376 insertions(+), 43 deletions(-) create mode 100644 crates/collab/migrations/20220620211403_create_project_activity_periods.sql diff --git a/crates/collab/migrations/20220620211403_create_project_activity_periods.sql b/crates/collab/migrations/20220620211403_create_project_activity_periods.sql new file mode 100644 index 0000000000..a676d14444 --- /dev/null +++ b/crates/collab/migrations/20220620211403_create_project_activity_periods.sql @@ -0,0 +1,9 @@ +CREATE TABLE IF NOT EXISTS "project_activity_periods" ( + "id" SERIAL PRIMARY KEY, + "duration_millis" INTEGER NOT NULL, + "ended_at" TIMESTAMP NOT NULL, + "user_id" INTEGER REFERENCES users (id) NOT NULL, + "project_id" INTEGER +); + +CREATE INDEX "index_project_activity_periods_on_ended_at" ON "project_activity_periods" ("ended_at"); diff --git a/crates/collab/src/api.rs b/crates/collab/src/api.rs index 993e32e445..799e1ffc14 100644 --- a/crates/collab/src/api.rs +++ b/crates/collab/src/api.rs @@ -16,7 +16,8 @@ use axum::{ }; use axum_extra::response::ErasedJson; use serde::{Deserialize, Serialize}; -use std::sync::Arc; +use std::{sync::Arc, time::Duration}; +use time::OffsetDateTime; use tower::ServiceBuilder; use tracing::instrument; @@ -32,6 +33,10 @@ pub fn routes(rpc_server: &Arc, state: Arc) -> Router, + Extension(app): Extension>, +) -> Result { + let end = OffsetDateTime::now_utc(); + let start = end - Duration::from_secs(params.duration_secs); + let summary = app.db.summarize_project_activity(start..end, 100).await?; + Ok(ErasedJson::pretty(summary)) +} + #[derive(Deserialize)] struct CreateAccessTokenQueryParams { public_key: String, diff --git a/crates/collab/src/db.rs b/crates/collab/src/db.rs index c1d5506491..c40149ffed 100644 --- a/crates/collab/src/db.rs +++ b/crates/collab/src/db.rs @@ -1,3 +1,5 @@ +use std::{ops::Range, time::Duration}; + use crate::{Error, Result}; use anyhow::{anyhow, Context}; use async_trait::async_trait; @@ -37,6 +39,22 @@ pub trait Db: Send + Sync { email_address: Option<&str>, ) -> Result; + /// Record which users have been active in which projects during + /// a given period of time. + async fn record_project_activity( + &self, + time_period: Range, + active_projects: &[(UserId, u64)], + ) -> Result<()>; + + /// Get the users that have been most active during the given time period, + /// along with the amount of time they have been active in each project. + async fn summarize_project_activity( + &self, + time_period: Range, + max_user_count: usize, + ) -> Result>; + async fn get_contacts(&self, id: UserId) -> Result>; async fn has_contact(&self, user_id_a: UserId, user_id_b: UserId) -> Result; async fn send_contact_request(&self, requester_id: UserId, responder_id: UserId) -> Result<()>; @@ -150,7 +168,7 @@ impl Db for PostgresDb { .fetch_all(&self.pool) .await?) } - + async fn create_users(&self, users: Vec<(String, String, usize)>) -> Result> { let mut query = QueryBuilder::new( "INSERT INTO users (github_login, email_address, admin, invite_code, invite_count)", @@ -411,6 +429,92 @@ impl Db for PostgresDb { Ok(invitee_id) } + // project activity + + async fn record_project_activity( + &self, + time_period: Range, + projects: &[(UserId, u64)], + ) -> Result<()> { + let query = " + INSERT INTO project_activity_periods + (ended_at, duration_millis, user_id, project_id) + VALUES + ($1, $2, $3, $4); + "; + + let mut tx = self.pool.begin().await?; + let duration_millis = + ((time_period.end - time_period.start).as_seconds_f64() * 1000.0) as i32; + for (user_id, project_id) in projects { + sqlx::query(query) + .bind(time_period.end) + .bind(duration_millis) + .bind(user_id) + .bind(*project_id as i32) + .execute(&mut tx) + .await?; + } + tx.commit().await?; + + Ok(()) + } + + async fn summarize_project_activity( + &self, + time_period: Range, + max_user_count: usize, + ) -> Result> { + let query = " + WITH + project_durations AS ( + SELECT user_id, project_id, SUM(duration_millis) AS project_duration + FROM project_activity_periods + WHERE $1 <= ended_at AND ended_at <= $2 + GROUP BY user_id, project_id + ), + user_durations AS ( + SELECT user_id, SUM(project_duration) as total_duration + FROM project_durations + GROUP BY user_id + ORDER BY total_duration DESC + LIMIT $3 + ) + SELECT user_durations.user_id, users.github_login, project_id, project_duration + FROM user_durations, project_durations, users + WHERE + user_durations.user_id = project_durations.user_id AND + user_durations.user_id = users.id + ORDER BY user_id ASC, project_duration DESC + "; + + let mut rows = sqlx::query_as::<_, (UserId, String, i32, i64)>(query) + .bind(time_period.start) + .bind(time_period.end) + .bind(max_user_count as i32) + .fetch(&self.pool); + + let mut result = Vec::::new(); + while let Some(row) = rows.next().await { + let (user_id, github_login, project_id, duration_millis) = row?; + let project_id = project_id as u64; + let duration = Duration::from_millis(duration_millis as u64); + if let Some(last_summary) = result.last_mut() { + if last_summary.id == user_id { + last_summary.project_activity.push((project_id, duration)); + continue; + } + } + result.push(UserActivitySummary { + id: user_id, + project_activity: vec![(project_id, duration)], + github_login, + }); + } + + Ok(result) + } + // contacts async fn get_contacts(&self, user_id: UserId) -> Result> { @@ -927,6 +1031,13 @@ pub struct User { pub connected_once: bool, } +#[derive(Clone, Debug, PartialEq, Serialize)] +pub struct UserActivitySummary { + pub id: UserId, + pub github_login: String, + pub project_activity: Vec<(u64, Duration)>, +} + id_type!(OrgId); #[derive(FromRow)] pub struct Org { @@ -1125,6 +1236,94 @@ pub mod tests { assert_ne!(invite_code_4, invite_code_3); } + #[tokio::test(flavor = "multi_thread")] + async fn test_project_activity() { + let test_db = TestDb::postgres().await; + let db = test_db.db(); + + let user_1 = db.create_user("user_1", None, false).await.unwrap(); + let user_2 = db.create_user("user_2", None, false).await.unwrap(); + let user_3 = db.create_user("user_3", None, false).await.unwrap(); + let project_1 = 101; + let project_2 = 102; + let t0 = OffsetDateTime::now_utc() - Duration::from_secs(60 * 60); + + // User 2 opens a project + let t1 = t0 + Duration::from_secs(10); + db.record_project_activity(t0..t1, &[(user_2, project_2)]) + .await + .unwrap(); + + let t2 = t1 + Duration::from_secs(10); + db.record_project_activity(t1..t2, &[(user_2, project_2)]) + .await + .unwrap(); + + // User 1 joins the project + let t3 = t2 + Duration::from_secs(10); + db.record_project_activity(t2..t3, &[(user_2, project_2), (user_1, project_2)]) + .await + .unwrap(); + + // User 1 opens another project + let t4 = t3 + Duration::from_secs(10); + db.record_project_activity( + t3..t4, + &[ + (user_2, project_2), + (user_1, project_2), + (user_1, project_1), + ], + ) + .await + .unwrap(); + + // User 3 joins that project + let t5 = t4 + Duration::from_secs(10); + db.record_project_activity( + t4..t5, + &[ + (user_2, project_2), + (user_1, project_2), + (user_1, project_1), + (user_3, project_1), + ], + ) + .await + .unwrap(); + + // User 2 leaves + let t6 = t5 + Duration::from_secs(5); + db.record_project_activity(t5..t6, &[(user_1, project_1), (user_3, project_1)]) + .await + .unwrap(); + + let summary = db.summarize_project_activity(t0..t6, 10).await.unwrap(); + assert_eq!( + summary, + &[ + UserActivitySummary { + id: user_1, + github_login: "user_1".to_string(), + project_activity: vec![ + (project_2, Duration::from_secs(30)), + (project_1, Duration::from_secs(25)) + ] + }, + UserActivitySummary { + id: user_2, + github_login: "user_2".to_string(), + project_activity: vec![(project_2, Duration::from_secs(50))] + }, + UserActivitySummary { + id: user_3, + github_login: "user_3".to_string(), + project_activity: vec![(project_1, Duration::from_secs(15))] + }, + ] + ); + } + #[tokio::test(flavor = "multi_thread")] async fn test_recent_channel_messages() { for test_db in [ @@ -1841,6 +2040,24 @@ pub mod tests { unimplemented!() } + // project activity + + async fn record_project_activity( + &self, + _period: Range, + _active_projects: &[(UserId, u64)], + ) -> Result<()> { + unimplemented!() + } + + async fn summarize_project_activity( + &self, + _period: Range, + _limit: usize, + ) -> Result> { + unimplemented!() + } + // contacts async fn get_contacts(&self, id: UserId) -> Result> { diff --git a/crates/collab/src/integration_tests.rs b/crates/collab/src/integration_tests.rs index 9f20880447..c1ae59e83a 100644 --- a/crates/collab/src/integration_tests.rs +++ b/crates/collab/src/integration_tests.rs @@ -4722,7 +4722,7 @@ impl TestServer { foreground: Rc, background: Arc, ) -> Self { - let test_db = TestDb::fake(background); + let test_db = TestDb::fake(background.clone()); let app_state = Self::build_app_state(&test_db).await; let peer = Peer::new(); let notifications = mpsc::unbounded(); diff --git a/crates/collab/src/main.rs b/crates/collab/src/main.rs index f8a9fedb66..2c2c6a94f4 100644 --- a/crates/collab/src/main.rs +++ b/crates/collab/src/main.rs @@ -14,6 +14,7 @@ use serde::Deserialize; use std::{ net::{SocketAddr, TcpListener}, sync::Arc, + time::Duration, }; use tracing_log::LogTracer; use tracing_subscriber::{filter::EnvFilter, fmt::format::JsonFields, Layer}; @@ -66,6 +67,8 @@ async fn main() -> Result<()> { .expect("failed to bind TCP listener"); let rpc_server = rpc::Server::new(state.clone(), None); + rpc_server.start_recording_project_activity(Duration::from_secs(5 * 60), rpc::RealExecutor); + let app = Router::::new() .merge(api::routes(&rpc_server, state.clone())) .merge(rpc::routes(rpc_server)); diff --git a/crates/collab/src/rpc.rs b/crates/collab/src/rpc.rs index e244757970..8c5c7376e3 100644 --- a/crates/collab/src/rpc.rs +++ b/crates/collab/src/rpc.rs @@ -288,6 +288,57 @@ impl Server { }) } + /// Start a long lived task that records which users are active in which projects. + pub fn start_recording_project_activity( + self: &Arc, + interval: Duration, + executor: E, + ) { + executor.spawn_detached({ + let this = Arc::downgrade(self); + let executor = executor.clone(); + async move { + let mut period_start = OffsetDateTime::now_utc(); + let mut active_projects = Vec::<(UserId, u64)>::new(); + loop { + let sleep = executor.sleep(interval); + sleep.await; + let this = if let Some(this) = this.upgrade() { + this + } else { + break; + }; + + active_projects.clear(); + active_projects.extend(this.store().await.projects().flat_map( + |(project_id, project)| { + project.guests.values().chain([&project.host]).filter_map( + |collaborator| { + if collaborator + .last_activity + .map_or(false, |activity| activity > period_start) + { + Some((collaborator.user_id, *project_id)) + } else { + None + } + }, + ) + }, + )); + + let period_end = OffsetDateTime::now_utc(); + this.app_state + .db + .record_project_activity(period_start..period_end, &active_projects) + .await + .trace_err(); + period_start = period_end; + } + } + }); + } + pub fn handle_connection( self: &Arc, connection: Connection, @@ -621,7 +672,7 @@ impl Server { { let state = self.store().await; let project = state.project(project_id)?; - host_user_id = project.host_user_id; + host_user_id = project.host.user_id; host_connection_id = project.host_connection_id; guest_user_id = state.user_id_for_connection(request.sender_id)?; }; @@ -665,7 +716,7 @@ impl Server { Err(anyhow!("no such connection"))?; } - host_user_id = project.host_user_id; + host_user_id = project.host.user_id; let guest_user_id = UserId::from_proto(request.payload.requester_id); if !request.payload.allow { @@ -697,7 +748,7 @@ impl Server { collaborators.push(proto::Collaborator { peer_id: project.host_connection_id.0, replica_id: 0, - user_id: project.host_user_id.to_proto(), + user_id: project.host.user_id.to_proto(), }); let worktrees = project .worktrees @@ -720,15 +771,15 @@ impl Server { .collect::>(); // Add all guests other than the requesting user's own connections as collaborators - for (peer_conn_id, (peer_replica_id, peer_user_id)) in &project.guests { + for (guest_conn_id, guest) in &project.guests { if receipts_with_replica_ids .iter() - .all(|(receipt, _)| receipt.sender_id != *peer_conn_id) + .all(|(receipt, _)| receipt.sender_id != *guest_conn_id) { collaborators.push(proto::Collaborator { - peer_id: peer_conn_id.0, - replica_id: *peer_replica_id as u32, - user_id: peer_user_id.to_proto(), + peer_id: guest_conn_id.0, + replica_id: guest.replica_id as u32, + user_id: guest.user_id.to_proto(), }); } } diff --git a/crates/collab/src/rpc/store.rs b/crates/collab/src/rpc/store.rs index 9fe9162edd..559b4a479b 100644 --- a/crates/collab/src/rpc/store.rs +++ b/crates/collab/src/rpc/store.rs @@ -9,8 +9,9 @@ use std::{ mem, path::{Path, PathBuf}, str, - time::{Duration, Instant}, + time::Duration, }; +use time::OffsetDateTime; use tracing::instrument; #[derive(Default, Serialize)] @@ -35,15 +36,21 @@ struct ConnectionState { #[derive(Serialize)] pub struct Project { pub host_connection_id: ConnectionId, - pub host_user_id: UserId, - pub guests: HashMap, + pub host: Collaborator, + pub guests: HashMap, #[serde(skip)] pub join_requests: HashMap>>, pub active_replica_ids: HashSet, pub worktrees: BTreeMap, pub language_servers: Vec, +} + +#[derive(Serialize)] +pub struct Collaborator { + pub replica_id: ReplicaId, + pub user_id: UserId, #[serde(skip)] - last_activity: Option, + pub last_activity: Option, } #[derive(Default, Serialize)] @@ -93,6 +100,9 @@ pub struct Metrics { impl Store { pub fn metrics(&self) -> Metrics { + const ACTIVE_PROJECT_TIMEOUT: Duration = Duration::from_secs(60); + let active_window_start = OffsetDateTime::now_utc() - ACTIVE_PROJECT_TIMEOUT; + let connections = self.connections.values().filter(|c| !c.admin).count(); let mut registered_projects = 0; let mut active_projects = 0; @@ -101,7 +111,7 @@ impl Store { if let Some(connection) = self.connections.get(&project.host_connection_id) { if !connection.admin { registered_projects += 1; - if project.is_active() { + if project.is_active_since(active_window_start) { active_projects += 1; if !project.guests.is_empty() { shared_projects += 1; @@ -289,7 +299,7 @@ impl Store { let mut metadata = Vec::new(); for project_id in project_ids { if let Some(project) = self.projects.get(&project_id) { - if project.host_user_id == user_id { + if project.host.user_id == user_id { metadata.push(proto::ProjectMetadata { id: project_id, visible_worktree_root_names: project @@ -301,7 +311,7 @@ impl Store { guests: project .guests .values() - .map(|(_, user_id)| user_id.to_proto()) + .map(|guest| guest.user_id.to_proto()) .collect(), }); } @@ -321,13 +331,16 @@ impl Store { project_id, Project { host_connection_id, - host_user_id, + host: Collaborator { + user_id: host_user_id, + replica_id: 0, + last_activity: None, + }, guests: Default::default(), join_requests: Default::default(), active_replica_ids: Default::default(), worktrees: Default::default(), language_servers: Default::default(), - last_activity: None, }, ); if let Some(connection) = self.connections.get_mut(&host_connection_id) { @@ -470,7 +483,6 @@ impl Store { .get_mut(&project_id) .ok_or_else(|| anyhow!("no such project"))?; connection.requested_projects.insert(project_id); - project.last_activity = Some(Instant::now()); project .join_requests .entry(requester_id) @@ -495,7 +507,7 @@ impl Store { let requester_connection = self.connections.get_mut(&receipt.sender_id)?; requester_connection.requested_projects.remove(&project_id); } - project.last_activity = Some(Instant::now()); + project.host.last_activity = Some(OffsetDateTime::now_utc()); Some(receipts) } @@ -522,13 +534,18 @@ impl Store { replica_id += 1; } project.active_replica_ids.insert(replica_id); - project - .guests - .insert(receipt.sender_id, (replica_id, requester_id)); + project.guests.insert( + receipt.sender_id, + Collaborator { + replica_id, + user_id: requester_id, + last_activity: Some(OffsetDateTime::now_utc()), + }, + ); receipts_with_replica_ids.push((receipt, replica_id)); } - project.last_activity = Some(Instant::now()); + project.host.last_activity = Some(OffsetDateTime::now_utc()); Some((receipts_with_replica_ids, project)) } @@ -544,13 +561,12 @@ impl Store { .ok_or_else(|| anyhow!("no such project"))?; // If the connection leaving the project is a collaborator, remove it. - let remove_collaborator = - if let Some((replica_id, _)) = project.guests.remove(&connection_id) { - project.active_replica_ids.remove(&replica_id); - true - } else { - false - }; + let remove_collaborator = if let Some(guest) = project.guests.remove(&connection_id) { + project.active_replica_ids.remove(&guest.replica_id); + true + } else { + false + }; // If the connection leaving the project has a pending request, remove it. // If that user has no other pending requests on other connections, indicate that the request should be cancelled. @@ -579,11 +595,9 @@ impl Store { } } - project.last_activity = Some(Instant::now()); - Ok(LeftProject { host_connection_id: project.host_connection_id, - host_user_id: project.host_user_id, + host_user_id: project.host.user_id, connection_ids, cancel_request, unshare, @@ -674,10 +688,25 @@ impl Store { project_id: u64, connection_id: ConnectionId, ) -> Result<()> { - self.write_project(project_id, connection_id)?.last_activity = Some(Instant::now()); + let project = self + .projects + .get_mut(&project_id) + .ok_or_else(|| anyhow!("no such project"))?; + let collaborator = if connection_id == project.host_connection_id { + &mut project.host + } else if let Some(guest) = project.guests.get_mut(&connection_id) { + guest + } else { + return Err(anyhow!("no such project"))?; + }; + collaborator.last_activity = Some(OffsetDateTime::now_utc()); Ok(()) } + pub fn projects(&self) -> impl Iterator { + self.projects.iter() + } + pub fn read_project(&self, project_id: u64, connection_id: ConnectionId) -> Result<&Project> { let project = self .projects @@ -768,7 +797,7 @@ impl Store { project .guests .values() - .map(|(replica_id, _)| *replica_id) + .map(|guest| guest.replica_id) .collect::>(), ); } @@ -783,11 +812,15 @@ impl Store { } impl Project { - fn is_active(&self) -> bool { - const ACTIVE_PROJECT_TIMEOUT: Duration = Duration::from_secs(60); - self.last_activity.map_or(false, |last_activity| { - last_activity.elapsed() < ACTIVE_PROJECT_TIMEOUT - }) + fn is_active_since(&self, start_time: OffsetDateTime) -> bool { + self.guests + .values() + .chain([&self.host]) + .any(|collaborator| { + collaborator + .last_activity + .map_or(false, |active_time| active_time > start_time) + }) } pub fn guest_connection_ids(&self) -> Vec {