From 7acebc4eb8d27e3c033dd9c59bbd62033efff3ac Mon Sep 17 00:00:00 2001 From: Antonio Scandurra Date: Tue, 21 Jun 2022 10:29:26 +0200 Subject: [PATCH] Register projects in the database and record worktree extensions --- ...211403_create_project_activity_periods.sql | 9 - .../20220620211403_create_projects.sql | 24 +++ crates/collab/src/db.rs | 153 ++++++++++++++-- crates/collab/src/integration_tests.rs | 4 +- crates/collab/src/rpc.rs | 170 ++++++++++-------- crates/collab/src/rpc/store.rs | 92 +++++----- 6 files changed, 312 insertions(+), 140 deletions(-) delete mode 100644 crates/collab/migrations/20220620211403_create_project_activity_periods.sql create mode 100644 crates/collab/migrations/20220620211403_create_projects.sql diff --git a/crates/collab/migrations/20220620211403_create_project_activity_periods.sql b/crates/collab/migrations/20220620211403_create_project_activity_periods.sql deleted file mode 100644 index a676d14444..0000000000 --- a/crates/collab/migrations/20220620211403_create_project_activity_periods.sql +++ /dev/null @@ -1,9 +0,0 @@ -CREATE TABLE IF NOT EXISTS "project_activity_periods" ( - "id" SERIAL PRIMARY KEY, - "duration_millis" INTEGER NOT NULL, - "ended_at" TIMESTAMP NOT NULL, - "user_id" INTEGER REFERENCES users (id) NOT NULL, - "project_id" INTEGER -); - -CREATE INDEX "index_project_activity_periods_on_ended_at" ON "project_activity_periods" ("ended_at"); diff --git a/crates/collab/migrations/20220620211403_create_projects.sql b/crates/collab/migrations/20220620211403_create_projects.sql new file mode 100644 index 0000000000..d813c9f7a1 --- /dev/null +++ b/crates/collab/migrations/20220620211403_create_projects.sql @@ -0,0 +1,24 @@ +CREATE TABLE IF NOT EXISTS "projects" ( + "id" SERIAL PRIMARY KEY, + "host_user_id" INTEGER REFERENCES users (id) NOT NULL, + "unregistered" BOOLEAN NOT NULL DEFAULT false +); + +CREATE TABLE IF NOT EXISTS "worktree_extensions" ( + "id" SERIAL PRIMARY KEY, + "project_id" INTEGER REFERENCES projects (id) NOT NULL, + "worktree_id" INTEGER NOT NULL, + "extension" VARCHAR(255), + "count" INTEGER NOT NULL +); + +CREATE TABLE IF NOT EXISTS "project_activity_periods" ( + "id" SERIAL PRIMARY KEY, + "duration_millis" INTEGER NOT NULL, + "ended_at" TIMESTAMP NOT NULL, + "user_id" INTEGER REFERENCES users (id) NOT NULL, + "project_id" INTEGER REFERENCES projects (id) NOT NULL +); + +CREATE INDEX "index_project_activity_periods_on_ended_at" ON "project_activity_periods" ("ended_at"); +CREATE UNIQUE INDEX "index_worktree_extensions_on_project_id_and_worktree_id_and_extension" ON "worktree_extensions" ("project_id", "worktree_id", "extension"); \ No newline at end of file diff --git a/crates/collab/src/db.rs b/crates/collab/src/db.rs index c40149ffed..3fb7c22da7 100644 --- a/crates/collab/src/db.rs +++ b/crates/collab/src/db.rs @@ -4,6 +4,7 @@ use crate::{Error, Result}; use anyhow::{anyhow, Context}; use async_trait::async_trait; use axum::http::StatusCode; +use collections::HashMap; use futures::StreamExt; use nanoid::nanoid; use serde::Serialize; @@ -39,12 +40,26 @@ pub trait Db: Send + Sync { email_address: Option<&str>, ) -> Result; + /// Registers a new project for the given user. + async fn register_project(&self, host_user_id: UserId) -> Result; + + /// Unregisters a project for the given project id. + async fn unregister_project(&self, project_id: ProjectId) -> Result<()>; + + /// Create a new project for the given user. + async fn update_worktree_extensions( + &self, + project_id: ProjectId, + worktree_id: u64, + extensions: HashMap, + ) -> Result<()>; + /// Record which users have been active in which projects during /// a given period of time. async fn record_project_activity( &self, time_period: Range, - active_projects: &[(UserId, u64)], + active_projects: &[(UserId, ProjectId)], ) -> Result<()>; /// Get the users that have been most active during the given time period, @@ -429,12 +444,67 @@ impl Db for PostgresDb { Ok(invitee_id) } - // project activity + // projects + + async fn register_project(&self, host_user_id: UserId) -> Result { + Ok(sqlx::query_scalar( + " + INSERT INTO projects(host_user_id) + VALUES ($1) + RETURNING id + ", + ) + .bind(host_user_id) + .fetch_one(&self.pool) + .await + .map(ProjectId)?) + } + + async fn unregister_project(&self, project_id: ProjectId) -> Result<()> { + sqlx::query( + " + UPDATE projects + SET unregistered = 't' + WHERE project_id = $1 + ", + ) + .bind(project_id) + .execute(&self.pool) + .await?; + Ok(()) + } + + async fn update_worktree_extensions( + &self, + project_id: ProjectId, + worktree_id: u64, + extensions: HashMap, + ) -> Result<()> { + let mut query = QueryBuilder::new( + "INSERT INTO worktree_extensions (project_id, worktree_id, extension, count)", + ); + query.push_values(extensions, |mut query, (extension, count)| { + query + .push_bind(project_id) + .push_bind(worktree_id as i32) + .push_bind(extension) + .push_bind(count as u32); + }); + query.push( + " + ON CONFLICT (project_id, worktree_id, extension) DO UPDATE SET + count = excluded.count + ", + ); + query.build().execute(&self.pool).await?; + + Ok(()) + } async fn record_project_activity( &self, time_period: Range, - projects: &[(UserId, u64)], + projects: &[(UserId, ProjectId)], ) -> Result<()> { let query = " INSERT INTO project_activity_periods @@ -451,7 +521,7 @@ impl Db for PostgresDb { .bind(time_period.end) .bind(duration_millis) .bind(user_id) - .bind(*project_id as i32) + .bind(project_id) .execute(&mut tx) .await?; } @@ -488,7 +558,7 @@ impl Db for PostgresDb { ORDER BY user_id ASC, project_duration DESC "; - let mut rows = sqlx::query_as::<_, (UserId, String, i32, i64)>(query) + let mut rows = sqlx::query_as::<_, (UserId, String, ProjectId, i64)>(query) .bind(time_period.start) .bind(time_period.end) .bind(max_user_count as i32) @@ -497,7 +567,7 @@ impl Db for PostgresDb { let mut result = Vec::::new(); while let Some(row) = rows.next().await { let (user_id, github_login, project_id, duration_millis) = row?; - let project_id = project_id as u64; + let project_id = project_id; let duration = Duration::from_millis(duration_millis as u64); if let Some(last_summary) = result.last_mut() { if last_summary.id == user_id { @@ -1031,11 +1101,19 @@ pub struct User { pub connected_once: bool, } +id_type!(ProjectId); +#[derive(Clone, Debug, Default, FromRow, Serialize, PartialEq)] +pub struct Project { + pub id: ProjectId, + pub host_user_id: UserId, + pub unregistered: bool, +} + #[derive(Clone, Debug, PartialEq, Serialize)] pub struct UserActivitySummary { pub id: UserId, pub github_login: String, - pub project_activity: Vec<(u64, Duration)>, + pub project_activity: Vec<(ProjectId, Duration)>, } id_type!(OrgId); @@ -1244,8 +1322,8 @@ pub mod tests { let user_1 = db.create_user("user_1", None, false).await.unwrap(); let user_2 = db.create_user("user_2", None, false).await.unwrap(); let user_3 = db.create_user("user_3", None, false).await.unwrap(); - let project_1 = 101; - let project_2 = 102; + let project_1 = db.register_project(user_1).await.unwrap(); + let project_2 = db.register_project(user_2).await.unwrap(); let t0 = OffsetDateTime::now_utc() - Duration::from_secs(60 * 60); // User 2 opens a project @@ -1895,6 +1973,8 @@ pub mod tests { pub struct FakeDb { background: Arc, pub users: Mutex>, + pub projects: Mutex>, + pub worktree_extensions: Mutex>, pub orgs: Mutex>, pub org_memberships: Mutex>, pub channels: Mutex>, @@ -1905,6 +1985,7 @@ pub mod tests { next_user_id: Mutex, next_org_id: Mutex, next_channel_id: Mutex, + next_project_id: Mutex, } #[derive(Debug)] @@ -1921,6 +2002,9 @@ pub mod tests { background, users: Default::default(), next_user_id: Mutex::new(1), + projects: Default::default(), + worktree_extensions: Default::default(), + next_project_id: Mutex::new(1), orgs: Default::default(), next_org_id: Mutex::new(1), org_memberships: Default::default(), @@ -2040,12 +2124,59 @@ pub mod tests { unimplemented!() } - // project activity + // projects + + async fn register_project(&self, host_user_id: UserId) -> Result { + self.background.simulate_random_delay().await; + if !self.users.lock().contains_key(&host_user_id) { + Err(anyhow!("no such user"))?; + } + + let project_id = ProjectId(post_inc(&mut *self.next_project_id.lock())); + self.projects.lock().insert( + project_id, + Project { + id: project_id, + host_user_id, + unregistered: false, + }, + ); + Ok(project_id) + } + + async fn unregister_project(&self, project_id: ProjectId) -> Result<()> { + self.projects + .lock() + .get_mut(&project_id) + .ok_or_else(|| anyhow!("no such project"))? + .unregistered = true; + Ok(()) + } + + async fn update_worktree_extensions( + &self, + project_id: ProjectId, + worktree_id: u64, + extensions: HashMap, + ) -> Result<()> { + self.background.simulate_random_delay().await; + if !self.projects.lock().contains_key(&project_id) { + Err(anyhow!("no such project"))?; + } + + for (extension, count) in extensions { + self.worktree_extensions + .lock() + .insert((project_id, worktree_id, extension), count); + } + + Ok(()) + } async fn record_project_activity( &self, _period: Range, - _active_projects: &[(UserId, u64)], + _active_projects: &[(UserId, ProjectId)], ) -> Result<()> { unimplemented!() } diff --git a/crates/collab/src/integration_tests.rs b/crates/collab/src/integration_tests.rs index c1ae59e83a..47000d0bdb 100644 --- a/crates/collab/src/integration_tests.rs +++ b/crates/collab/src/integration_tests.rs @@ -1,5 +1,5 @@ use crate::{ - db::{tests::TestDb, UserId}, + db::{tests::TestDb, ProjectId, UserId}, rpc::{Executor, Server, Store}, AppState, }; @@ -1447,7 +1447,7 @@ async fn test_collaborating_with_diagnostics( deterministic.run_until_parked(); { let store = server.store.read().await; - let project = store.project(project_id).unwrap(); + let project = store.project(ProjectId::from_proto(project_id)).unwrap(); let worktree = project.worktrees.get(&worktree_id.to_proto()).unwrap(); assert!(!worktree.diagnostic_summaries.is_empty()); } diff --git a/crates/collab/src/rpc.rs b/crates/collab/src/rpc.rs index 8c5c7376e3..89d62b294e 100644 --- a/crates/collab/src/rpc.rs +++ b/crates/collab/src/rpc.rs @@ -2,7 +2,7 @@ mod store; use crate::{ auth, - db::{self, ChannelId, MessageId, User, UserId}, + db::{self, ChannelId, MessageId, ProjectId, User, UserId}, AppState, Result, }; use anyhow::anyhow; @@ -299,7 +299,7 @@ impl Server { let executor = executor.clone(); async move { let mut period_start = OffsetDateTime::now_utc(); - let mut active_projects = Vec::<(UserId, u64)>::new(); + let mut active_projects = Vec::<(UserId, ProjectId)>::new(); loop { let sleep = executor.sleep(interval); sleep.await; @@ -458,8 +458,12 @@ impl Server { for (project_id, project) in removed_connection.hosted_projects { broadcast(connection_id, project.guests.keys().copied(), |conn_id| { - self.peer - .send(conn_id, proto::UnregisterProject { project_id }) + self.peer.send( + conn_id, + proto::UnregisterProject { + project_id: project_id.to_proto(), + }, + ) }); for (_, receipts) in project.join_requests { @@ -484,7 +488,7 @@ impl Server { self.peer.send( conn_id, proto::RemoveProjectCollaborator { - project_id, + project_id: project_id.to_proto(), peer_id: connection_id.0, }, ) @@ -493,7 +497,9 @@ impl Server { self.peer .send( project.host_connection_id, - proto::ProjectUnshared { project_id }, + proto::ProjectUnshared { + project_id: project_id.to_proto(), + }, ) .trace_err(); } @@ -567,14 +573,18 @@ impl Server { request: TypedEnvelope, response: Response, ) -> Result<()> { - let project_id; - { - let mut state = self.store_mut().await; - let user_id = state.user_id_for_connection(request.sender_id)?; - project_id = state.register_project(request.sender_id, user_id); - }; + let user_id = self + .store() + .await + .user_id_for_connection(request.sender_id)?; + let project_id = self.app_state.db.register_project(user_id).await?; + self.store_mut() + .await + .register_project(request.sender_id, project_id)?; - response.send(proto::RegisterProjectResponse { project_id })?; + response.send(proto::RegisterProjectResponse { + project_id: project_id.to_proto(), + })?; Ok(()) } @@ -586,8 +596,10 @@ impl Server { ) -> Result<()> { let (user_id, project) = { let mut state = self.store_mut().await; - let project = - state.unregister_project(request.payload.project_id, request.sender_id)?; + let project = state.unregister_project( + ProjectId::from_proto(request.payload.project_id), + request.sender_id, + )?; (state.user_id_for_connection(request.sender_id)?, project) }; @@ -664,7 +676,7 @@ impl Server { request: TypedEnvelope, response: Response, ) -> Result<()> { - let project_id = request.payload.project_id; + let project_id = ProjectId::from_proto(request.payload.project_id); let host_user_id; let guest_user_id; @@ -677,7 +689,7 @@ impl Server { guest_user_id = state.user_id_for_connection(request.sender_id)?; }; - tracing::info!(project_id, %host_user_id, %host_connection_id, "join project"); + tracing::info!(%project_id, %host_user_id, %host_connection_id, "join project"); let has_contact = self .app_state .db @@ -695,7 +707,7 @@ impl Server { self.peer.send( host_connection_id, proto::RequestJoinProject { - project_id, + project_id: project_id.to_proto(), requester_id: guest_user_id.to_proto(), }, )?; @@ -710,7 +722,7 @@ impl Server { { let mut state = self.store_mut().await; - let project_id = request.payload.project_id; + let project_id = ProjectId::from_proto(request.payload.project_id); let project = state.project(project_id)?; if project.host_connection_id != request.sender_id { Err(anyhow!("no such connection"))?; @@ -790,7 +802,7 @@ impl Server { self.peer.send( conn_id, proto::AddProjectCollaborator { - project_id, + project_id: project_id.to_proto(), collaborator: Some(proto::Collaborator { peer_id: receipt.sender_id.0, replica_id: *replica_id as u32, @@ -828,13 +840,13 @@ impl Server { request: TypedEnvelope, ) -> Result<()> { let sender_id = request.sender_id; - let project_id = request.payload.project_id; + let project_id = ProjectId::from_proto(request.payload.project_id); let project; { let mut store = self.store_mut().await; project = store.leave_project(sender_id, project_id)?; tracing::info!( - project_id, + %project_id, host_user_id = %project.host_user_id, host_connection_id = %project.host_connection_id, "leave project" @@ -845,7 +857,7 @@ impl Server { self.peer.send( conn_id, proto::RemoveProjectCollaborator { - project_id, + project_id: project_id.to_proto(), peer_id: sender_id.0, }, ) @@ -856,7 +868,7 @@ impl Server { self.peer.send( project.host_connection_id, proto::JoinProjectRequestCancelled { - project_id, + project_id: project_id.to_proto(), requester_id: requester_id.to_proto(), }, )?; @@ -865,7 +877,9 @@ impl Server { if project.unshare { self.peer.send( project.host_connection_id, - proto::ProjectUnshared { project_id }, + proto::ProjectUnshared { + project_id: project_id.to_proto(), + }, )?; } } @@ -877,18 +891,15 @@ impl Server { self: Arc, request: TypedEnvelope, ) -> Result<()> { + let project_id = ProjectId::from_proto(request.payload.project_id); let user_id; { let mut state = self.store_mut().await; user_id = state.user_id_for_connection(request.sender_id)?; let guest_connection_ids = state - .read_project(request.payload.project_id, request.sender_id)? + .read_project(project_id, request.sender_id)? .guest_connection_ids(); - state.update_project( - request.payload.project_id, - &request.payload.worktrees, - request.sender_id, - )?; + state.update_project(project_id, &request.payload.worktrees, request.sender_id)?; broadcast(request.sender_id, guest_connection_ids, |connection_id| { self.peer .forward_send(request.sender_id, connection_id, request.payload.clone()) @@ -902,9 +913,10 @@ impl Server { self: Arc, request: TypedEnvelope, ) -> Result<()> { - self.store_mut() - .await - .register_project_activity(request.payload.project_id, request.sender_id)?; + self.store_mut().await.register_project_activity( + ProjectId::from_proto(request.payload.project_id), + request.sender_id, + )?; Ok(()) } @@ -913,28 +925,25 @@ impl Server { request: TypedEnvelope, response: Response, ) -> Result<()> { - let (connection_ids, metadata_changed) = { + let project_id = ProjectId::from_proto(request.payload.project_id); + let worktree_id = request.payload.worktree_id; + let (connection_ids, metadata_changed, extension_counts) = { let mut store = self.store_mut().await; let (connection_ids, metadata_changed, extension_counts) = store.update_worktree( request.sender_id, - request.payload.project_id, - request.payload.worktree_id, + project_id, + worktree_id, &request.payload.root_name, &request.payload.removed_entries, &request.payload.updated_entries, request.payload.scan_id, )?; - for (extension, count) in extension_counts { - tracing::info!( - project_id = request.payload.project_id, - worktree_id = request.payload.worktree_id, - ?extension, - %count, - "worktree updated" - ); - } - (connection_ids, metadata_changed) + (connection_ids, metadata_changed, extension_counts.clone()) }; + self.app_state + .db + .update_worktree_extensions(project_id, worktree_id, extension_counts) + .await?; broadcast(request.sender_id, connection_ids, |connection_id| { self.peer @@ -961,7 +970,7 @@ impl Server { .clone() .ok_or_else(|| anyhow!("invalid summary"))?; let receiver_ids = self.store_mut().await.update_diagnostic_summary( - request.payload.project_id, + ProjectId::from_proto(request.payload.project_id), request.payload.worktree_id, request.sender_id, summary, @@ -979,7 +988,7 @@ impl Server { request: TypedEnvelope, ) -> Result<()> { let receiver_ids = self.store_mut().await.start_language_server( - request.payload.project_id, + ProjectId::from_proto(request.payload.project_id), request.sender_id, request .payload @@ -998,10 +1007,10 @@ impl Server { self: Arc, request: TypedEnvelope, ) -> Result<()> { - let receiver_ids = self - .store() - .await - .project_connection_ids(request.payload.project_id, request.sender_id)?; + let receiver_ids = self.store().await.project_connection_ids( + ProjectId::from_proto(request.payload.project_id), + request.sender_id, + )?; broadcast(request.sender_id, receiver_ids, |connection_id| { self.peer .forward_send(request.sender_id, connection_id, request.payload.clone()) @@ -1020,7 +1029,10 @@ impl Server { let host_connection_id = self .store() .await - .read_project(request.payload.remote_entity_id(), request.sender_id)? + .read_project( + ProjectId::from_proto(request.payload.remote_entity_id()), + request.sender_id, + )? .host_connection_id; response.send( @@ -1036,10 +1048,11 @@ impl Server { request: TypedEnvelope, response: Response, ) -> Result<()> { + let project_id = ProjectId::from_proto(request.payload.project_id); let host = self .store() .await - .read_project(request.payload.project_id, request.sender_id)? + .read_project(project_id, request.sender_id)? .host_connection_id; let response_payload = self .peer @@ -1049,7 +1062,7 @@ impl Server { let mut guests = self .store() .await - .read_project(request.payload.project_id, request.sender_id)? + .read_project(project_id, request.sender_id)? .connection_ids(); guests.retain(|guest_connection_id| *guest_connection_id != request.sender_id); broadcast(host, guests, |conn_id| { @@ -1065,10 +1078,11 @@ impl Server { request: TypedEnvelope, response: Response, ) -> Result<()> { + let project_id = ProjectId::from_proto(request.payload.project_id); let receiver_ids = { let mut store = self.store_mut().await; - store.register_project_activity(request.payload.project_id, request.sender_id)?; - store.project_connection_ids(request.payload.project_id, request.sender_id)? + store.register_project_activity(project_id, request.sender_id)?; + store.project_connection_ids(project_id, request.sender_id)? }; broadcast(request.sender_id, receiver_ids, |connection_id| { @@ -1083,10 +1097,10 @@ impl Server { self: Arc, request: TypedEnvelope, ) -> Result<()> { - let receiver_ids = self - .store() - .await - .project_connection_ids(request.payload.project_id, request.sender_id)?; + let receiver_ids = self.store().await.project_connection_ids( + ProjectId::from_proto(request.payload.project_id), + request.sender_id, + )?; broadcast(request.sender_id, receiver_ids, |connection_id| { self.peer .forward_send(request.sender_id, connection_id, request.payload.clone()) @@ -1098,10 +1112,10 @@ impl Server { self: Arc, request: TypedEnvelope, ) -> Result<()> { - let receiver_ids = self - .store() - .await - .project_connection_ids(request.payload.project_id, request.sender_id)?; + let receiver_ids = self.store().await.project_connection_ids( + ProjectId::from_proto(request.payload.project_id), + request.sender_id, + )?; broadcast(request.sender_id, receiver_ids, |connection_id| { self.peer .forward_send(request.sender_id, connection_id, request.payload.clone()) @@ -1113,10 +1127,10 @@ impl Server { self: Arc, request: TypedEnvelope, ) -> Result<()> { - let receiver_ids = self - .store() - .await - .project_connection_ids(request.payload.project_id, request.sender_id)?; + let receiver_ids = self.store().await.project_connection_ids( + ProjectId::from_proto(request.payload.project_id), + request.sender_id, + )?; broadcast(request.sender_id, receiver_ids, |connection_id| { self.peer .forward_send(request.sender_id, connection_id, request.payload.clone()) @@ -1129,18 +1143,19 @@ impl Server { request: TypedEnvelope, response: Response, ) -> Result<()> { + let project_id = ProjectId::from_proto(request.payload.project_id); let leader_id = ConnectionId(request.payload.leader_id); let follower_id = request.sender_id; { let mut store = self.store_mut().await; if !store - .project_connection_ids(request.payload.project_id, follower_id)? + .project_connection_ids(project_id, follower_id)? .contains(&leader_id) { Err(anyhow!("no such peer"))?; } - store.register_project_activity(request.payload.project_id, follower_id)?; + store.register_project_activity(project_id, follower_id)?; } let mut response_payload = self @@ -1155,15 +1170,16 @@ impl Server { } async fn unfollow(self: Arc, request: TypedEnvelope) -> Result<()> { + let project_id = ProjectId::from_proto(request.payload.project_id); let leader_id = ConnectionId(request.payload.leader_id); let mut store = self.store_mut().await; if !store - .project_connection_ids(request.payload.project_id, request.sender_id)? + .project_connection_ids(project_id, request.sender_id)? .contains(&leader_id) { Err(anyhow!("no such peer"))?; } - store.register_project_activity(request.payload.project_id, request.sender_id)?; + store.register_project_activity(project_id, request.sender_id)?; self.peer .forward_send(request.sender_id, leader_id, request.payload)?; Ok(()) @@ -1173,10 +1189,10 @@ impl Server { self: Arc, request: TypedEnvelope, ) -> Result<()> { + let project_id = ProjectId::from_proto(request.payload.project_id); let mut store = self.store_mut().await; - store.register_project_activity(request.payload.project_id, request.sender_id)?; - let connection_ids = - store.project_connection_ids(request.payload.project_id, request.sender_id)?; + store.register_project_activity(project_id, request.sender_id)?; + let connection_ids = store.project_connection_ids(project_id, request.sender_id)?; let leader_id = request .payload .variant diff --git a/crates/collab/src/rpc/store.rs b/crates/collab/src/rpc/store.rs index 559b4a479b..437cd8acc2 100644 --- a/crates/collab/src/rpc/store.rs +++ b/crates/collab/src/rpc/store.rs @@ -1,11 +1,13 @@ -use crate::db::{self, ChannelId, UserId}; +use crate::db::{self, ChannelId, ProjectId, UserId}; use anyhow::{anyhow, Result}; -use collections::{hash_map::Entry, BTreeMap, HashMap, HashSet}; +use collections::{ + btree_map, + hash_map::{self, Entry}, + BTreeMap, BTreeSet, HashMap, HashSet, +}; use rpc::{proto, ConnectionId, Receipt}; use serde::Serialize; use std::{ - collections::hash_map, - ffi::{OsStr, OsString}, mem, path::{Path, PathBuf}, str, @@ -18,18 +20,17 @@ use tracing::instrument; pub struct Store { connections: HashMap, connections_by_user_id: HashMap>, - projects: HashMap, + projects: BTreeMap, #[serde(skip)] channels: HashMap, - next_project_id: u64, } #[derive(Serialize)] struct ConnectionState { user_id: UserId, admin: bool, - projects: HashSet, - requested_projects: HashSet, + projects: BTreeSet, + requested_projects: HashSet, channels: HashSet, } @@ -60,7 +61,7 @@ pub struct Worktree { #[serde(skip)] pub entries: HashMap, #[serde(skip)] - pub extension_counts: HashMap, + pub extension_counts: HashMap, #[serde(skip)] pub diagnostic_summaries: BTreeMap, pub scan_id: u64, @@ -76,8 +77,8 @@ pub type ReplicaId = u16; #[derive(Default)] pub struct RemovedConnectionState { pub user_id: UserId, - pub hosted_projects: HashMap, - pub guest_project_ids: HashSet, + pub hosted_projects: HashMap, + pub guest_project_ids: HashSet, pub contact_ids: HashSet, } @@ -301,7 +302,7 @@ impl Store { if let Some(project) = self.projects.get(&project_id) { if project.host.user_id == user_id { metadata.push(proto::ProjectMetadata { - id: project_id, + id: project_id.to_proto(), visible_worktree_root_names: project .worktrees .values() @@ -324,15 +325,19 @@ impl Store { pub fn register_project( &mut self, host_connection_id: ConnectionId, - host_user_id: UserId, - ) -> u64 { - let project_id = self.next_project_id; + project_id: ProjectId, + ) -> Result<()> { + let connection = self + .connections + .get_mut(&host_connection_id) + .ok_or_else(|| anyhow!("no such connection"))?; + connection.projects.insert(project_id); self.projects.insert( project_id, Project { host_connection_id, host: Collaborator { - user_id: host_user_id, + user_id: connection.user_id, replica_id: 0, last_activity: None, }, @@ -343,16 +348,12 @@ impl Store { language_servers: Default::default(), }, ); - if let Some(connection) = self.connections.get_mut(&host_connection_id) { - connection.projects.insert(project_id); - } - self.next_project_id += 1; - project_id + Ok(()) } pub fn update_project( &mut self, - project_id: u64, + project_id: ProjectId, worktrees: &[proto::WorktreeMetadata], connection_id: ConnectionId, ) -> Result<()> { @@ -384,11 +385,11 @@ impl Store { pub fn unregister_project( &mut self, - project_id: u64, + project_id: ProjectId, connection_id: ConnectionId, ) -> Result { match self.projects.entry(project_id) { - hash_map::Entry::Occupied(e) => { + btree_map::Entry::Occupied(e) => { if e.get().host_connection_id == connection_id { let project = e.remove(); @@ -421,13 +422,13 @@ impl Store { Err(anyhow!("no such project"))? } } - hash_map::Entry::Vacant(_) => Err(anyhow!("no such project"))?, + btree_map::Entry::Vacant(_) => Err(anyhow!("no such project"))?, } } pub fn update_diagnostic_summary( &mut self, - project_id: u64, + project_id: ProjectId, worktree_id: u64, connection_id: ConnectionId, summary: proto::DiagnosticSummary, @@ -452,7 +453,7 @@ impl Store { pub fn start_language_server( &mut self, - project_id: u64, + project_id: ProjectId, connection_id: ConnectionId, language_server: proto::LanguageServer, ) -> Result> { @@ -471,7 +472,7 @@ impl Store { pub fn request_join_project( &mut self, requester_id: UserId, - project_id: u64, + project_id: ProjectId, receipt: Receipt, ) -> Result<()> { let connection = self @@ -495,7 +496,7 @@ impl Store { &mut self, responder_connection_id: ConnectionId, requester_id: UserId, - project_id: u64, + project_id: ProjectId, ) -> Option>> { let project = self.projects.get_mut(&project_id)?; if responder_connection_id != project.host_connection_id { @@ -516,7 +517,7 @@ impl Store { &mut self, responder_connection_id: ConnectionId, requester_id: UserId, - project_id: u64, + project_id: ProjectId, ) -> Option<(Vec<(Receipt, ReplicaId)>, &Project)> { let project = self.projects.get_mut(&project_id)?; if responder_connection_id != project.host_connection_id { @@ -552,7 +553,7 @@ impl Store { pub fn leave_project( &mut self, connection_id: ConnectionId, - project_id: u64, + project_id: ProjectId, ) -> Result { let user_id = self.user_id_for_connection(connection_id)?; let project = self @@ -608,13 +609,13 @@ impl Store { pub fn update_worktree( &mut self, connection_id: ConnectionId, - project_id: u64, + project_id: ProjectId, worktree_id: u64, worktree_root_name: &str, removed_entries: &[u64], updated_entries: &[proto::Entry], scan_id: u64, - ) -> Result<(Vec, bool, &HashMap)> { + ) -> Result<(Vec, bool, HashMap)> { let project = self.write_project(project_id, connection_id)?; let connection_ids = project.connection_ids(); let mut worktree = project.worktrees.entry(worktree_id).or_default(); @@ -656,12 +657,16 @@ impl Store { } worktree.scan_id = scan_id; - Ok((connection_ids, metadata_changed, &worktree.extension_counts)) + Ok(( + connection_ids, + metadata_changed, + worktree.extension_counts.clone(), + )) } pub fn project_connection_ids( &self, - project_id: u64, + project_id: ProjectId, acting_connection_id: ConnectionId, ) -> Result> { Ok(self @@ -677,7 +682,7 @@ impl Store { .connection_ids()) } - pub fn project(&self, project_id: u64) -> Result<&Project> { + pub fn project(&self, project_id: ProjectId) -> Result<&Project> { self.projects .get(&project_id) .ok_or_else(|| anyhow!("no such project")) @@ -685,7 +690,7 @@ impl Store { pub fn register_project_activity( &mut self, - project_id: u64, + project_id: ProjectId, connection_id: ConnectionId, ) -> Result<()> { let project = self @@ -703,11 +708,15 @@ impl Store { Ok(()) } - pub fn projects(&self) -> impl Iterator { + pub fn projects(&self) -> impl Iterator { self.projects.iter() } - pub fn read_project(&self, project_id: u64, connection_id: ConnectionId) -> Result<&Project> { + pub fn read_project( + &self, + project_id: ProjectId, + connection_id: ConnectionId, + ) -> Result<&Project> { let project = self .projects .get(&project_id) @@ -723,7 +732,7 @@ impl Store { fn write_project( &mut self, - project_id: u64, + project_id: ProjectId, connection_id: ConnectionId, ) -> Result<&mut Project> { let project = self @@ -842,9 +851,10 @@ impl Channel { } } -fn extension_for_entry(entry: &proto::Entry) -> Option<&OsStr> { +fn extension_for_entry(entry: &proto::Entry) -> Option<&str> { str::from_utf8(&entry.path) .ok() .map(Path::new) .and_then(|p| p.extension()) + .and_then(|e| e.to_str()) }