From c41b958829f7a1a04ad420332a061e00d24b5a6c Mon Sep 17 00:00:00 2001 From: Max Brunsfeld Date: Fri, 17 Dec 2021 22:00:39 -0800 Subject: [PATCH] WIP - start restructuring collaboration around entire projects Co-Authored-By: Nathan Sobo --- crates/client/src/user.rs | 24 +- crates/project/src/project.rs | 380 ++++++++++++++++++++++++++++- crates/project/src/worktree.rs | 433 +++++++-------------------------- crates/rpc/proto/zed.proto | 195 ++++++++------- crates/rpc/src/peer.rs | 2 + crates/rpc/src/proto.rs | 50 ++-- crates/server/src/rpc.rs | 72 +++--- crates/server/src/rpc/store.rs | 176 +++++++++----- 8 files changed, 771 insertions(+), 561 deletions(-) diff --git a/crates/client/src/user.rs b/crates/client/src/user.rs index 0a387487e1..26be77bf2d 100644 --- a/crates/client/src/user.rs +++ b/crates/client/src/user.rs @@ -22,14 +22,14 @@ pub struct User { #[derive(Debug)] pub struct Contact { pub user: Arc, - pub worktrees: Vec, + pub projects: Vec, } #[derive(Debug)] -pub struct WorktreeMetadata { +pub struct ProjectMetadata { pub id: u64, - pub root_name: String, pub is_shared: bool, + pub worktree_root_names: Vec, pub guests: Vec>, } @@ -112,7 +112,7 @@ impl UserStore { let mut user_ids = HashSet::new(); for contact in &message.contacts { user_ids.insert(contact.user_id); - user_ids.extend(contact.worktrees.iter().flat_map(|w| &w.guests).copied()); + user_ids.extend(contact.projects.iter().flat_map(|w| &w.guests).copied()); } let load_users = self.load_users(user_ids.into_iter().collect(), cx); @@ -221,10 +221,10 @@ impl Contact { user_store.fetch_user(contact.user_id, cx) }) .await?; - let mut worktrees = Vec::new(); - for worktree in contact.worktrees { + let mut projects = Vec::new(); + for project in contact.projects { let mut guests = Vec::new(); - for participant_id in worktree.guests { + for participant_id in project.guests { guests.push( user_store .update(cx, |user_store, cx| { @@ -233,14 +233,14 @@ impl Contact { .await?, ); } - worktrees.push(WorktreeMetadata { - id: worktree.id, - root_name: worktree.root_name, - is_shared: worktree.is_shared, + projects.push(ProjectMetadata { + id: project.id, + worktree_root_names: project.worktree_root_names.clone(), + is_shared: project.is_shared, guests, }); } - Ok(Self { user, worktrees }) + Ok(Self { user, projects }) } } diff --git a/crates/project/src/project.rs b/crates/project/src/project.rs index cd3f09c98b..f08b8a891c 100644 --- a/crates/project/src/project.rs +++ b/crates/project/src/project.rs @@ -3,18 +3,20 @@ mod ignore; mod worktree; use anyhow::{anyhow, Result}; -use client::{Client, UserStore}; +use client::{proto, Client, PeerId, TypedEnvelope, User, UserStore}; use clock::ReplicaId; +use collections::HashMap; use futures::Future; use fuzzy::{PathMatch, PathMatchCandidate, PathMatchCandidateSet}; -use gpui::{AppContext, Entity, ModelContext, ModelHandle, Task}; +use gpui::{AppContext, AsyncAppContext, Entity, ModelContext, ModelHandle, Task}; use language::{Buffer, DiagnosticEntry, LanguageRegistry}; use lsp::DiagnosticSeverity; +use postage::{prelude::Stream, sink::Sink, watch}; use std::{ path::Path, sync::{atomic::AtomicBool, Arc}, }; -use util::TryFutureExt as _; +use util::{ResultExt, TryFutureExt as _}; pub use fs::*; pub use worktree::*; @@ -27,6 +29,29 @@ pub struct Project { client: Arc, user_store: ModelHandle, fs: Arc, + client_state: ProjectClientState, + collaborators: HashMap, + subscriptions: Vec, +} + +enum ProjectClientState { + Local { + is_shared: bool, + remote_id_tx: watch::Sender>, + remote_id_rx: watch::Receiver>, + _maintain_remote_id_task: Task>, + }, + Remote { + remote_id: u64, + replica_id: ReplicaId, + }, +} + +#[derive(Clone, Debug)] +pub struct Collaborator { + pub user: Arc, + pub peer_id: PeerId, + pub replica_id: ReplicaId, } pub enum Event { @@ -80,14 +105,46 @@ pub struct ProjectEntry { } impl Project { - pub fn new( + pub fn local( languages: Arc, client: Arc, user_store: ModelHandle, fs: Arc, + cx: &mut ModelContext, ) -> Self { + let (remote_id_tx, remote_id_rx) = watch::channel(); + let _maintain_remote_id_task = cx.spawn_weak({ + let rpc = client.clone(); + move |this, cx| { + async move { + let mut status = rpc.status(); + while let Some(status) = status.recv().await { + if let Some(this) = this.upgrade(&cx) { + let remote_id = if let client::Status::Connected { .. } = status { + let response = rpc.request(proto::RegisterProject {}).await?; + Some(response.project_id) + } else { + None + }; + this.update(&mut cx, |this, cx| this.set_remote_id(remote_id, cx)); + } + } + Ok(()) + } + .log_err() + } + }); + Self { worktrees: Default::default(), + collaborators: Default::default(), + client_state: ProjectClientState::Local { + is_shared: false, + remote_id_tx, + remote_id_rx, + _maintain_remote_id_task, + }, + subscriptions: Vec::new(), active_worktree: None, active_entry: None, languages, @@ -97,9 +154,120 @@ impl Project { } } + pub async fn open_remote( + remote_id: u64, + languages: Arc, + client: Arc, + user_store: ModelHandle, + fs: Arc, + cx: &mut AsyncAppContext, + ) -> Result> { + client.authenticate_and_connect(&cx).await?; + + let response = client + .request(proto::JoinProject { + project_id: remote_id, + }) + .await?; + + let replica_id = response.replica_id as ReplicaId; + + let mut worktrees = Vec::new(); + for worktree in response.worktrees { + worktrees.push( + Worktree::remote( + remote_id, + replica_id, + worktree, + client.clone(), + user_store.clone(), + languages.clone(), + cx, + ) + .await?, + ); + } + + let user_ids = response + .collaborators + .iter() + .map(|peer| peer.user_id) + .collect(); + user_store + .update(cx, |user_store, cx| user_store.load_users(user_ids, cx)) + .await?; + let mut collaborators = HashMap::default(); + for message in response.collaborators { + let collaborator = Collaborator::from_proto(message, &user_store, cx).await?; + collaborators.insert(collaborator.peer_id, collaborator); + } + + Ok(cx.add_model(|cx| Self { + worktrees, + active_worktree: None, + active_entry: None, + collaborators, + languages, + user_store, + fs, + subscriptions: vec![ + client.subscribe_to_entity(remote_id, cx, Self::handle_add_collaborator), + client.subscribe_to_entity(remote_id, cx, Self::handle_remove_collaborator), + client.subscribe_to_entity(remote_id, cx, Self::handle_register_worktree), + client.subscribe_to_entity(remote_id, cx, Self::handle_update_worktree), + client.subscribe_to_entity(remote_id, cx, Self::handle_update_buffer), + client.subscribe_to_entity(remote_id, cx, Self::handle_buffer_saved), + ], + client, + client_state: ProjectClientState::Remote { + remote_id, + replica_id, + }, + })) + } + + fn set_remote_id(&mut self, remote_id: Option, cx: &mut ModelContext) { + if let ProjectClientState::Local { remote_id_tx, .. } = &mut self.client_state { + cx.foreground().spawn(remote_id_tx.send(remote_id)).detach(); + } + + for worktree in &self.worktrees { + worktree.update(cx, |worktree, cx| { + if let Some(worktree) = worktree.as_local_mut() { + worktree.set_project_remote_id(remote_id); + } + }); + } + + self.subscriptions.clear(); + if let Some(remote_id) = remote_id { + self.subscriptions.extend([ + self.client + .subscribe_to_entity(remote_id, cx, Self::handle_update_worktree), + self.client + .subscribe_to_entity(remote_id, cx, Self::handle_update_buffer), + self.client + .subscribe_to_entity(remote_id, cx, Self::handle_buffer_saved), + ]); + } + } + + pub fn remote_id(&self) -> Option { + match &self.client_state { + ProjectClientState::Local { remote_id_rx, .. } => *remote_id_rx.borrow(), + ProjectClientState::Remote { remote_id, .. } => Some(*remote_id), + } + } + pub fn replica_id(&self, cx: &AppContext) -> ReplicaId { - // TODO - self.worktrees.first().unwrap().read(cx).replica_id() + match &self.client_state { + ProjectClientState::Local { .. } => 0, + ProjectClientState::Remote { replica_id, .. } => *replica_id, + } + } + + pub fn collaborators(&self) -> &HashMap { + &self.collaborators } pub fn worktrees(&self) -> &[ModelHandle] { @@ -113,6 +281,40 @@ impl Project { .cloned() } + pub fn share(&self, cx: &mut ModelContext) -> Task> { + let rpc = self.client.clone(); + cx.spawn(|this, mut cx| async move { + let remote_id = this.update(&mut cx, |this, cx| { + if let ProjectClientState::Local { + is_shared, + remote_id_rx, + .. + } = &mut this.client_state + { + *is_shared = true; + Ok(*remote_id_rx.borrow()) + } else { + Err(anyhow!("can't share a remote project")) + } + })?; + + let remote_id = remote_id.ok_or_else(|| anyhow!("no project id"))?; + rpc.send(proto::ShareProject { + project_id: remote_id, + }) + .await?; + + this.update(&mut cx, |this, cx| { + for worktree in &this.worktrees { + worktree.update(cx, |worktree, cx| { + worktree.as_local_mut().unwrap().share(cx).detach(); + }); + } + }); + Ok(()) + }) + } + pub fn open_buffer( &self, path: ProjectPath, @@ -139,6 +341,24 @@ impl Project { let worktree = Worktree::open_local(client, user_store, path, fs, languages, &mut cx).await?; this.update(&mut cx, |this, cx| { + if let Some(project_id) = this.remote_id() { + worktree.update(cx, |worktree, cx| { + worktree + .as_local_mut() + .unwrap() + .set_project_remote_id(Some(project_id)); + cx.foreground().spawn( + client + .request(proto::RegisterWorktree { + project_id, + root_name: worktree.root_name().to_string(), + authorized_logins: worktree.authorized_logins(), + worktree_id: worktree.id() as u64, + }) + .log_err(), + ); + }); + } this.add_worktree(worktree.clone(), cx); }); Ok(worktree) @@ -154,10 +374,8 @@ impl Project { let languages = self.languages.clone(); let user_store = self.user_store.clone(); cx.spawn(|this, mut cx| async move { - rpc.authenticate_and_connect(&cx).await?; let worktree = - Worktree::open_remote(rpc.clone(), remote_id, languages, user_store, &mut cx) - .await?; + Worktree::remote(rpc.clone(), remote_id, languages, user_store, &mut cx).await?; this.update(&mut cx, |this, cx| { cx.subscribe(&worktree, move |this, _, event, cx| match event { worktree::Event::Closed => { @@ -304,6 +522,116 @@ impl Project { } } + // RPC message handlers + + fn handle_add_collaborator( + &mut self, + mut envelope: TypedEnvelope, + _: Arc, + cx: &mut ModelContext, + ) -> Result<()> { + let user_store = self.user_store.clone(); + let collaborator = envelope + .payload + .collaborator + .take() + .ok_or_else(|| anyhow!("empty collaborator"))?; + + cx.spawn(|this, mut cx| { + async move { + let collaborator = + Collaborator::from_proto(collaborator, &user_store, &mut cx).await?; + this.collaborators + .insert(collaborator.peer_id, collaborator); + cx.notify(); + Ok(()) + } + .log_err() + }) + .detach(); + + Ok(()) + } + + fn handle_remove_collaborator( + &mut self, + envelope: TypedEnvelope, + _: Arc, + cx: &mut ModelContext, + ) -> Result<()> { + let peer_id = PeerId(envelope.payload.peer_id); + let replica_id = self + .collaborators + .remove(&peer_id) + .ok_or_else(|| anyhow!("unknown peer {:?}", peer_id))? + .replica_id; + for worktree in &self.worktrees { + worktree.update(cx, |worktree, cx| { + worktree.remove_collaborator(peer_id, replica_id, cx); + }) + } + Ok(()) + } + + fn handle_register_worktree( + &mut self, + envelope: TypedEnvelope, + _: Arc, + cx: &mut ModelContext, + ) -> Result<()> { + let peer_id = PeerId(envelope.payload.peer_id); + let replica_id = self + .collaborators + .remove(&peer_id) + .ok_or_else(|| anyhow!("unknown peer {:?}", peer_id))? + .replica_id; + for worktree in &self.worktrees { + worktree.update(cx, |worktree, cx| { + worktree.remove_collaborator(peer_id, replica_id, cx); + }) + } + Ok(()) + } + + fn handle_update_worktree( + &mut self, + mut envelope: TypedEnvelope, + _: Arc, + cx: &mut ModelContext, + ) -> Result<()> { + if let Some(worktree) = self.worktree_for_id(envelope.payload.worktree_id as usize) { + worktree + .as_remote_mut() + .unwrap() + .update_from_remote(envelope, cx); + } + Ok(()) + } + + pub fn handle_update_buffer( + &mut self, + envelope: TypedEnvelope, + _: Arc, + cx: &mut ModelContext, + ) -> Result<()> { + if let Some(worktree) = self.worktree_for_id(envelope.payload.worktree_id as usize) { + worktree.handle_update_buffer(envelope, cx)?; + } + Ok(()) + } + + pub fn handle_buffer_saved( + &mut self, + envelope: TypedEnvelope, + _: Arc, + cx: &mut ModelContext, + ) -> Result<()> { + if let Some(worktree) = self.worktree_for_id(envelope.payload.worktree_id as usize) { + worktree.handle_buffer_saved(envelope, cx); + } + Ok(()) + } + pub fn match_paths<'a>( &self, query: &'a str, @@ -400,6 +728,38 @@ impl<'a> Iterator for CandidateSetIter<'a> { impl Entity for Project { type Event = Event; + + fn release(&mut self, cx: &mut gpui::MutableAppContext) { + if let Some(project_id) = *self.remote_id.borrow() { + let rpc = self.client.clone(); + cx.spawn(|_| async move { + if let Err(err) = rpc.send(proto::UnregisterProject { project_id }).await { + log::error!("error unregistering project: {}", err); + } + }) + .detach(); + } + } +} + +impl Collaborator { + fn from_proto( + message: proto::Collaborator, + user_store: &ModelHandle, + cx: &mut AsyncAppContext, + ) -> impl Future> { + let user = user_store.update(cx, |user_store, cx| { + user_store.fetch_user(message.user_id, cx) + }); + + async move { + Ok(Self { + peer_id: PeerId(message.peer_id), + user: user.await?, + replica_id: message.replica_id as ReplicaId, + }) + } + } } #[cfg(test)] @@ -514,6 +874,6 @@ mod tests { let client = client::Client::new(); let http_client = FakeHttpClient::new(|_| async move { Ok(ServerResponse::new(404)) }); let user_store = cx.add_model(|cx| UserStore::new(client.clone(), http_client, cx)); - cx.add_model(|_| Project::new(languages, client, user_store, fs)) + cx.add_model(|cx| Project::new(languages, client, user_store, fs, cx)) } } diff --git a/crates/project/src/worktree.rs b/crates/project/src/worktree.rs index 89eeb4d917..011275bd3b 100644 --- a/crates/project/src/worktree.rs +++ b/crates/project/src/worktree.rs @@ -5,7 +5,7 @@ use super::{ }; use ::ignore::gitignore::{Gitignore, GitignoreBuilder}; use anyhow::{anyhow, Context, Result}; -use client::{proto, Client, PeerId, TypedEnvelope, User, UserStore}; +use client::{proto, Client, PeerId, TypedEnvelope, UserStore}; use clock::ReplicaId; use collections::{hash_map, HashMap}; use futures::{Stream, StreamExt}; @@ -66,62 +66,9 @@ pub enum Event { Closed, } -#[derive(Clone, Debug)] -pub struct Collaborator { - pub user: Arc, - pub peer_id: PeerId, - pub replica_id: ReplicaId, -} - -impl Collaborator { - fn from_proto( - message: proto::Collaborator, - user_store: &ModelHandle, - cx: &mut AsyncAppContext, - ) -> impl Future> { - let user = user_store.update(cx, |user_store, cx| { - user_store.fetch_user(message.user_id, cx) - }); - - async move { - Ok(Self { - peer_id: PeerId(message.peer_id), - user: user.await?, - replica_id: message.replica_id as ReplicaId, - }) - } - } -} - impl Entity for Worktree { type Event = Event; - fn release(&mut self, cx: &mut MutableAppContext) { - match self { - Self::Local(tree) => { - if let Some(worktree_id) = *tree.remote_id.borrow() { - let rpc = tree.client.clone(); - cx.spawn(|_| async move { - if let Err(err) = rpc.send(proto::CloseWorktree { worktree_id }).await { - log::error!("error closing worktree: {}", err); - } - }) - .detach(); - } - } - Self::Remote(tree) => { - let rpc = tree.client.clone(); - let worktree_id = tree.remote_id; - cx.spawn(|_| async move { - if let Err(err) = rpc.send(proto::LeaveWorktree { worktree_id }).await { - log::error!("error closing worktree: {}", err); - } - }) - .detach(); - } - } - } - fn app_will_quit( &mut self, _: &mut MutableAppContext, @@ -172,32 +119,16 @@ impl Worktree { Ok(tree) } - pub async fn open_remote( - client: Arc, - id: u64, - languages: Arc, - user_store: ModelHandle, - cx: &mut AsyncAppContext, - ) -> Result> { - let response = client - .request(proto::JoinWorktree { worktree_id: id }) - .await?; - Worktree::remote(response, client, user_store, languages, cx).await - } - - async fn remote( - join_response: proto::JoinWorktreeResponse, + pub async fn remote( + project_remote_id: u64, + replica_id: ReplicaId, + worktree: proto::Worktree, client: Arc, user_store: ModelHandle, languages: Arc, cx: &mut AsyncAppContext, ) -> Result> { - let worktree = join_response - .worktree - .ok_or_else(|| anyhow!("empty worktree"))?; - let remote_id = worktree.id; - let replica_id = join_response.replica_id as ReplicaId; let root_char_bag: CharBag = worktree .root_name .chars() @@ -232,20 +163,6 @@ impl Worktree { }) .await; - let user_ids = join_response - .collaborators - .iter() - .map(|peer| peer.user_id) - .collect(); - user_store - .update(cx, |user_store, cx| user_store.load_users(user_ids, cx)) - .await?; - let mut collaborators = HashMap::default(); - for message in join_response.collaborators { - let collaborator = Collaborator::from_proto(message, &user_store, cx).await?; - collaborators.insert(collaborator.peer_id, collaborator); - } - let worktree = cx.update(|cx| { cx.add_model(|cx: &mut ModelContext| { let snapshot = Snapshot { @@ -290,16 +207,8 @@ impl Worktree { .detach(); } - let _subscriptions = vec![ - client.subscribe_to_entity(remote_id, cx, Self::handle_add_collaborator), - client.subscribe_to_entity(remote_id, cx, Self::handle_remove_collaborator), - client.subscribe_to_entity(remote_id, cx, Self::handle_update), - client.subscribe_to_entity(remote_id, cx, Self::handle_update_buffer), - client.subscribe_to_entity(remote_id, cx, Self::handle_buffer_saved), - client.subscribe_to_entity(remote_id, cx, Self::handle_unshare), - ]; - Worktree::Remote(RemoteWorktree { + project_remote_id, remote_id, replica_id, snapshot, @@ -309,11 +218,9 @@ impl Worktree { loading_buffers: Default::default(), open_buffers: Default::default(), diagnostic_summaries: HashMap::default(), - collaborators, queued_operations: Default::default(), languages, user_store, - _subscriptions, }) }) }); @@ -359,6 +266,25 @@ impl Worktree { } } + pub fn authorized_logins(&self) -> Vec { + match self { + Worktree::Local(worktree) => worktree.config.collaborators.clone(), + Worktree::Remote(worktree) => Vec::new(), + } + } + + pub fn remove_collaborator( + &mut self, + peer_id: PeerId, + replica_id: ReplicaId, + cx: &mut ModelContext, + ) { + match self { + Worktree::Local(worktree) => worktree.remove_collaborator(peer_id, replica_id, cx), + Worktree::Remote(worktree) => worktree.remove_collaborator(peer_id, replica_id, cx), + } + } + pub fn languages(&self) -> &Arc { match self { Worktree::Local(worktree) => &worktree.languages, @@ -373,59 +299,6 @@ impl Worktree { } } - pub fn handle_add_collaborator( - &mut self, - mut envelope: TypedEnvelope, - _: Arc, - cx: &mut ModelContext, - ) -> Result<()> { - let user_store = self.user_store().clone(); - let collaborator = envelope - .payload - .collaborator - .take() - .ok_or_else(|| anyhow!("empty collaborator"))?; - - cx.spawn(|this, mut cx| { - async move { - let collaborator = - Collaborator::from_proto(collaborator, &user_store, &mut cx).await?; - this.update(&mut cx, |this, cx| match this { - Worktree::Local(worktree) => worktree.add_collaborator(collaborator, cx), - Worktree::Remote(worktree) => worktree.add_collaborator(collaborator, cx), - }); - Ok(()) - } - .log_err() - }) - .detach(); - - Ok(()) - } - - pub fn handle_remove_collaborator( - &mut self, - envelope: TypedEnvelope, - _: Arc, - cx: &mut ModelContext, - ) -> Result<()> { - match self { - Worktree::Local(worktree) => worktree.remove_collaborator(envelope, cx), - Worktree::Remote(worktree) => worktree.remove_collaborator(envelope, cx), - } - } - - pub fn handle_update( - &mut self, - envelope: TypedEnvelope, - _: Arc, - cx: &mut ModelContext, - ) -> anyhow::Result<()> { - self.as_remote_mut() - .unwrap() - .update_from_remote(envelope, cx) - } - pub fn handle_open_buffer( &mut self, envelope: TypedEnvelope, @@ -463,13 +336,6 @@ impl Worktree { .close_remote_buffer(envelope, cx) } - pub fn collaborators(&self) -> &HashMap { - match self { - Worktree::Local(worktree) => &worktree.collaborators, - Worktree::Remote(worktree) => &worktree.collaborators, - } - } - pub fn diagnostic_summaries<'a>( &'a self, ) -> impl Iterator, DiagnosticSummary)> + 'a { @@ -623,9 +489,12 @@ impl Worktree { cx: &mut ModelContext, ) -> Result<()> { let sender_id = envelope.original_sender_id()?; - let buffer = self - .as_local() - .unwrap() + let this = self.as_local().unwrap(); + let project_id = this + .project_remote_id + .ok_or_else(|| anyhow!("can't save buffer while disconnected"))?; + + let buffer = this .shared_buffers .get(&sender_id) .and_then(|shared_buffers| shared_buffers.get(&envelope.payload.buffer_id).cloned()) @@ -646,6 +515,7 @@ impl Worktree { rpc.respond( receipt, proto::BufferSaved { + project_id, worktree_id, buffer_id, version: (&version).into(), @@ -689,16 +559,6 @@ impl Worktree { Ok(()) } - pub fn handle_unshare( - &mut self, - _: TypedEnvelope, - _: Arc, - cx: &mut ModelContext, - ) -> Result<()> { - cx.emit(Event::Closed); - Ok(()) - } - fn poll_snapshot(&mut self, cx: &mut ModelContext) { match self { Self::Local(worktree) => { @@ -905,17 +765,19 @@ impl Worktree { operation: Operation, cx: &mut ModelContext, ) { - if let Some((rpc, remote_id)) = match self { + if let Some((rpc, project_id)) = match self { Worktree::Local(worktree) => worktree - .remote_id - .borrow() + .project_remote_id .map(|id| (worktree.client.clone(), id)), - Worktree::Remote(worktree) => Some((worktree.client.clone(), worktree.remote_id)), + Worktree::Remote(worktree) => { + Some((worktree.client.clone(), worktree.project_remote_id)) + } } { cx.spawn(|worktree, mut cx| async move { if let Err(error) = rpc .request(proto::UpdateBuffer { - worktree_id: remote_id, + project_id, + worktree_id: worktree.id() as u64, buffer_id, operations: vec![language::proto::serialize_operation(&operation)], }) @@ -956,16 +818,14 @@ pub struct LocalWorktree { background_snapshot: Arc>, last_scan_state_rx: watch::Receiver, _background_scanner_task: Option>, - _maintain_remote_id_task: Task>, + project_remote_id: Option, poll_task: Option>, - remote_id: watch::Receiver>, share: Option, loading_buffers: LoadingBuffers, open_buffers: HashMap>, shared_buffers: HashMap>>, diagnostics: HashMap, Vec>>, diagnostic_summaries: HashMap, DiagnosticSummary>, - collaborators: HashMap, queued_operations: Vec<(u64, Operation)>, languages: Arc, client: Arc, @@ -976,10 +836,10 @@ pub struct LocalWorktree { struct ShareState { snapshots_tx: Sender, - _subscriptions: Vec, } pub struct RemoteWorktree { + project_remote_id: u64, remote_id: u64, snapshot: Snapshot, snapshot_rx: watch::Receiver, @@ -988,12 +848,10 @@ pub struct RemoteWorktree { replica_id: ReplicaId, loading_buffers: LoadingBuffers, open_buffers: HashMap, - collaborators: HashMap, diagnostic_summaries: HashMap, DiagnosticSummary>, languages: Arc, user_store: ModelHandle, queued_operations: Vec<(u64, Operation)>, - _subscriptions: Vec, } type LoadingBuffers = HashMap< @@ -1061,48 +919,13 @@ impl LocalWorktree { ); } - let (mut remote_id_tx, remote_id_rx) = watch::channel(); - let _maintain_remote_id_task = cx.spawn_weak({ - let rpc = client.clone(); - move |this, cx| { - async move { - let mut status = rpc.status(); - while let Some(status) = status.recv().await { - if let Some(this) = this.upgrade(&cx) { - let remote_id = if let client::Status::Connected { .. } = status { - let authorized_logins = this.read_with(&cx, |this, _| { - this.as_local().unwrap().config.collaborators.clone() - }); - let response = rpc - .request(proto::OpenWorktree { - root_name: root_name.clone(), - authorized_logins, - }) - .await?; - - Some(response.worktree_id) - } else { - None - }; - if remote_id_tx.send(remote_id).await.is_err() { - break; - } - } - } - Ok(()) - } - .log_err() - } - }); - let tree = Self { snapshot: snapshot.clone(), config, - remote_id: remote_id_rx, + project_remote_id: None, background_snapshot: Arc::new(Mutex::new(snapshot)), last_scan_state_rx, _background_scanner_task: None, - _maintain_remote_id_task, share: None, poll_task: None, loading_buffers: Default::default(), @@ -1111,7 +934,6 @@ impl LocalWorktree { diagnostics: Default::default(), diagnostic_summaries: Default::default(), queued_operations: Default::default(), - collaborators: Default::default(), languages, client, user_store, @@ -1152,6 +974,10 @@ impl LocalWorktree { Ok((tree, scan_states_tx)) } + pub fn set_project_remote_id(&mut self, id: Option) { + self.project_remote_id = id; + } + pub fn languages(&self) -> &LanguageRegistry { &self.languages } @@ -1297,27 +1123,12 @@ impl LocalWorktree { Ok(()) } - pub fn add_collaborator( - &mut self, - collaborator: Collaborator, - cx: &mut ModelContext, - ) { - self.collaborators - .insert(collaborator.peer_id, collaborator); - cx.notify(); - } - pub fn remove_collaborator( &mut self, - envelope: TypedEnvelope, + peer_id: PeerId, + replica_id: ReplicaId, cx: &mut ModelContext, - ) -> Result<()> { - let peer_id = PeerId(envelope.payload.peer_id); - let replica_id = self - .collaborators - .remove(&peer_id) - .ok_or_else(|| anyhow!("unknown peer {:?}", peer_id))? - .replica_id; + ) { self.shared_buffers.remove(&peer_id); for (_, buffer) in &self.open_buffers { if let Some(buffer) = buffer.upgrade(cx) { @@ -1325,8 +1136,6 @@ impl LocalWorktree { } } cx.notify(); - - Ok(()) } pub fn scan_complete(&self) -> impl Future { @@ -1339,22 +1148,6 @@ impl LocalWorktree { } } - pub fn remote_id(&self) -> Option { - *self.remote_id.borrow() - } - - pub fn next_remote_id(&self) -> impl Future> { - let mut remote_id = self.remote_id.clone(); - async move { - while let Some(remote_id) = remote_id.recv().await { - if remote_id.is_some() { - return remote_id; - } - } - None - } - } - fn is_scanning(&self) -> bool { if let ScanState::Scanning = *self.last_scan_state_rx.borrow() { true @@ -1456,31 +1249,28 @@ impl LocalWorktree { }) } - pub fn share(&mut self, cx: &mut ModelContext) -> Task> { + pub fn share(&mut self, cx: &mut ModelContext) -> Task> { let snapshot = self.snapshot(); - let share_request = self.share_request(cx); let rpc = self.client.clone(); + let project_id = self.project_remote_id; + let worktree_id = cx.model_id() as u64; cx.spawn(|this, mut cx| async move { - let share_request = if let Some(request) = share_request.await { - request - } else { - return Err(anyhow!("failed to open worktree on the server")); - }; + let project_id = project_id.ok_or_else(|| anyhow!("no project id"))?; - let remote_id = share_request.worktree.as_ref().unwrap().id; - let share_response = rpc.request(share_request).await?; - - log::info!("sharing worktree {:?}", share_response); let (snapshots_to_send_tx, snapshots_to_send_rx) = smol::channel::unbounded::(); - cx.background() .spawn({ let rpc = rpc.clone(); async move { let mut prev_snapshot = snapshot; while let Ok(snapshot) = snapshots_to_send_rx.recv().await { - let message = snapshot.build_update(&prev_snapshot, remote_id, false); + let message = snapshot.build_update( + &prev_snapshot, + project_id, + worktree_id, + false, + ); match rpc.send(message).await { Ok(()) => prev_snapshot = snapshot, Err(err) => log::error!("error sending snapshot diff {}", err), @@ -1491,64 +1281,32 @@ impl LocalWorktree { .detach(); this.update(&mut cx, |worktree, cx| { - let _subscriptions = vec![ - rpc.subscribe_to_entity(remote_id, cx, Worktree::handle_add_collaborator), - rpc.subscribe_to_entity(remote_id, cx, Worktree::handle_remove_collaborator), - rpc.subscribe_to_entity(remote_id, cx, Worktree::handle_open_buffer), - rpc.subscribe_to_entity(remote_id, cx, Worktree::handle_close_buffer), - rpc.subscribe_to_entity(remote_id, cx, Worktree::handle_update_buffer), - rpc.subscribe_to_entity(remote_id, cx, Worktree::handle_save_buffer), - ]; - let worktree = worktree.as_local_mut().unwrap(); worktree.share = Some(ShareState { snapshots_tx: snapshots_to_send_tx, - _subscriptions, }); }); - Ok(remote_id) + Ok(()) }) } - pub fn unshare(&mut self, cx: &mut ModelContext) { - self.share.take(); - let rpc = self.client.clone(); - let remote_id = self.remote_id(); - cx.foreground() - .spawn( - async move { - if let Some(worktree_id) = remote_id { - rpc.send(proto::UnshareWorktree { worktree_id }).await?; - } - Ok(()) - } - .log_err(), - ) - .detach() - } - - fn share_request(&self, cx: &mut ModelContext) -> Task> { - let remote_id = self.next_remote_id(); + fn to_proto(&self, cx: &mut ModelContext) -> impl Future { + let id = cx.model_id() as u64; let snapshot = self.snapshot(); let root_name = self.root_name.clone(); - cx.background().spawn(async move { - remote_id.await.map(|id| { - let entries = snapshot + async move { + proto::Worktree { + id, + root_name, + entries: snapshot .entries_by_path .cursor::<()>() .filter(|e| !e.is_ignored) .map(Into::into) - .collect(); - proto::ShareWorktree { - worktree: Some(proto::Worktree { - id, - root_name, - entries, - }), - } - }) - }) + .collect(), + } + } } } @@ -1617,6 +1375,7 @@ impl RemoteWorktree { ) -> Task>> { let rpc = self.client.clone(); let replica_id = self.replica_id; + let project_id = self.project_remote_id; let remote_worktree_id = self.remote_id; let root_path = self.snapshot.abs_path.clone(); let path: Arc = Arc::from(path); @@ -1629,6 +1388,7 @@ impl RemoteWorktree { .ok_or_else(|| anyhow!("file does not exist"))?; let response = rpc .request(proto::OpenBuffer { + project_id, worktree_id: remote_worktree_id as u64, path: path_string, }) @@ -1669,10 +1429,6 @@ impl RemoteWorktree { }) } - pub fn remote_id(&self) -> u64 { - self.remote_id - } - pub fn close_all_buffers(&mut self, cx: &mut MutableAppContext) { for (_, buffer) in self.open_buffers.drain() { if let RemoteBuffer::Loaded(buffer) = buffer { @@ -1703,34 +1459,18 @@ impl RemoteWorktree { Ok(()) } - pub fn add_collaborator( - &mut self, - collaborator: Collaborator, - cx: &mut ModelContext, - ) { - self.collaborators - .insert(collaborator.peer_id, collaborator); - cx.notify(); - } - pub fn remove_collaborator( &mut self, - envelope: TypedEnvelope, + peer_id: PeerId, + replica_id: ReplicaId, cx: &mut ModelContext, - ) -> Result<()> { - let peer_id = PeerId(envelope.payload.peer_id); - let replica_id = self - .collaborators - .remove(&peer_id) - .ok_or_else(|| anyhow!("unknown peer {:?}", peer_id))? - .replica_id; + ) { for (_, buffer) in &self.open_buffers { if let Some(buffer) = buffer.upgrade(cx) { buffer.update(cx, |buffer, cx| buffer.remove_peer(replica_id, cx)); } } cx.notify(); - Ok(()) } } @@ -1756,6 +1496,7 @@ impl Snapshot { pub fn build_update( &self, other: &Self, + project_id: u64, worktree_id: u64, include_ignored: bool, ) -> proto::UpdateWorktree { @@ -1809,9 +1550,10 @@ impl Snapshot { } proto::UpdateWorktree { + project_id, + worktree_id, updated_entries, removed_entries, - worktree_id, } } @@ -2168,15 +1910,17 @@ impl language::File for File { version: clock::Global, cx: &mut MutableAppContext, ) -> Task> { + let worktree_id = self.worktree.id() as u64; self.worktree.update(cx, |worktree, cx| match worktree { Worktree::Local(worktree) => { let rpc = worktree.client.clone(); - let worktree_id = *worktree.remote_id.borrow(); + let project_id = worktree.project_remote_id; let save = worktree.save(self.path.clone(), text, cx); cx.background().spawn(async move { let entry = save.await?; - if let Some(worktree_id) = worktree_id { + if let Some(project_id) = project_id { rpc.send(proto::BufferSaved { + project_id, worktree_id, buffer_id, version: (&version).into(), @@ -2189,10 +1933,11 @@ impl language::File for File { } Worktree::Remote(worktree) => { let rpc = worktree.client.clone(); - let worktree_id = worktree.remote_id; + let project_id = worktree.project_remote_id; cx.foreground().spawn(async move { let response = rpc .request(proto::SaveBuffer { + project_id, worktree_id, buffer_id, }) @@ -2225,14 +1970,16 @@ impl language::File for File { } fn buffer_removed(&self, buffer_id: u64, cx: &mut MutableAppContext) { + let worktree_id = self.worktree.id() as u64; self.worktree.update(cx, |worktree, cx| { if let Worktree::Remote(worktree) = worktree { - let worktree_id = worktree.remote_id; + let project_id = worktree.project_remote_id; let rpc = worktree.client.clone(); cx.background() .spawn(async move { if let Err(error) = rpc .send(proto::CloseBuffer { + project_id, worktree_id, buffer_id, }) @@ -3370,9 +3117,7 @@ mod tests { // Create a remote copy of this worktree. let initial_snapshot = tree.read_with(&cx, |tree, _| tree.snapshot()); let worktree_id = 1; - let share_request = tree.update(&mut cx, |tree, cx| { - tree.as_local().unwrap().share_request(cx) - }); + let proto_message = tree.update(&mut cx, |tree, cx| tree.as_local().unwrap().to_proto(cx)); let open_worktree = server.receive::().await.unwrap(); server .respond( @@ -3383,7 +3128,7 @@ mod tests { let remote = Worktree::remote( proto::JoinWorktreeResponse { - worktree: share_request.await.unwrap().worktree, + worktree: Some(proto_message.await), replica_id: 1, collaborators: Vec::new(), }, diff --git a/crates/rpc/proto/zed.proto b/crates/rpc/proto/zed.proto index 34c8043795..53f1226e72 100644 --- a/crates/rpc/proto/zed.proto +++ b/crates/rpc/proto/zed.proto @@ -9,38 +9,46 @@ message Envelope { Ack ack = 4; Error error = 5; Ping ping = 6; - ShareWorktree share_worktree = 7; - ShareWorktreeResponse share_worktree_response = 8; - JoinWorktree join_worktree = 9; - JoinWorktreeResponse join_worktree_response = 10; - UpdateWorktree update_worktree = 11; - CloseWorktree close_worktree = 12; - OpenBuffer open_buffer = 13; - OpenBufferResponse open_buffer_response = 14; - CloseBuffer close_buffer = 15; - UpdateBuffer update_buffer = 16; - SaveBuffer save_buffer = 17; - BufferSaved buffer_saved = 18; - AddCollaborator add_collaborator = 19; - RemoveCollaborator remove_collaborator = 20; - GetChannels get_channels = 21; - GetChannelsResponse get_channels_response = 22; - GetUsers get_users = 23; - GetUsersResponse get_users_response = 24; - JoinChannel join_channel = 25; - JoinChannelResponse join_channel_response = 26; - LeaveChannel leave_channel = 27; - SendChannelMessage send_channel_message = 28; - SendChannelMessageResponse send_channel_message_response = 29; - ChannelMessageSent channel_message_sent = 30; - GetChannelMessages get_channel_messages = 31; - GetChannelMessagesResponse get_channel_messages_response = 32; - OpenWorktree open_worktree = 33; - OpenWorktreeResponse open_worktree_response = 34; - UnshareWorktree unshare_worktree = 35; - UpdateContacts update_contacts = 36; - LeaveWorktree leave_worktree = 37; - UpdateDiagnosticSummary update_diagnostic_summary = 38; + + RegisterProject register_project = 7; + RegisterProjectResponse register_project_response = 8; + UnregisterProject unregister_project = 9; + ShareProject share_project = 10; + UnshareProject unshare_project = 11; + JoinProject join_project = 12; + JoinProjectResponse join_project_response = 13; + LeaveProject leave_project = 14; + AddProjectCollaborator add_project_collaborator = 15; + RemoveProjectCollaborator remove_project_collaborator = 16; + + RegisterWorktree register_worktree = 17; + UnregisterWorktree unregister_worktree = 18; + ShareWorktree share_worktree = 100; + UpdateWorktree update_worktree = 19; + UpdateDiagnosticSummary update_diagnostic_summary = 20; + + OpenBuffer open_buffer = 22; + OpenBufferResponse open_buffer_response = 23; + CloseBuffer close_buffer = 24; + UpdateBuffer update_buffer = 25; + SaveBuffer save_buffer = 26; + BufferSaved buffer_saved = 27; + + GetChannels get_channels = 28; + GetChannelsResponse get_channels_response = 29; + JoinChannel join_channel = 30; + JoinChannelResponse join_channel_response = 31; + LeaveChannel leave_channel = 32; + SendChannelMessage send_channel_message = 33; + SendChannelMessageResponse send_channel_message_response = 34; + ChannelMessageSent channel_message_sent = 35; + GetChannelMessages get_channel_messages = 36; + GetChannelMessagesResponse get_channel_messages_response = 37; + + UpdateContacts update_contacts = 38; + + GetUsers get_users = 39; + GetUsersResponse get_users_response = 40; } } @@ -54,62 +62,76 @@ message Error { string message = 1; } -message OpenWorktree { - string root_name = 1; - repeated string authorized_logins = 2; +message RegisterProject {} + +message RegisterProjectResponse { + uint64 project_id = 1; } -message OpenWorktreeResponse { - uint64 worktree_id = 1; +message UnregisterProject { + uint64 project_id = 1; } -message ShareWorktree { - Worktree worktree = 1; +message ShareProject { + uint64 project_id = 1; } -message ShareWorktreeResponse {} - -message UnshareWorktree { - uint64 worktree_id = 1; +message UnshareProject { + uint64 project_id = 1; } -message JoinWorktree { - uint64 worktree_id = 1; +message JoinProject { + uint64 project_id = 1; } -message LeaveWorktree { - uint64 worktree_id = 1; -} - -message JoinWorktreeResponse { - Worktree worktree = 2; - uint32 replica_id = 3; +message JoinProjectResponse { + uint32 replica_id = 2; + repeated Worktree worktrees = 3; repeated Collaborator collaborators = 4; } +message LeaveProject { + uint64 project_id = 1; +} + +message RegisterWorktree { + uint64 project_id = 1; + uint64 worktree_id = 2; + string root_name = 3; + repeated string authorized_logins = 4; +} + +message UnregisterWorktree { + uint64 project_id = 1; + uint64 worktree_id = 2; +} + +message ShareWorktree { + uint64 project_id = 1; + Worktree worktree = 2; +} + message UpdateWorktree { - uint64 worktree_id = 1; - repeated Entry updated_entries = 2; - repeated uint64 removed_entries = 3; + uint64 project_id = 1; + uint64 worktree_id = 2; + repeated Entry updated_entries = 3; + repeated uint64 removed_entries = 4; } -message CloseWorktree { - uint64 worktree_id = 1; -} - -message AddCollaborator { - uint64 worktree_id = 1; +message AddProjectCollaborator { + uint64 project_id = 1; Collaborator collaborator = 2; } -message RemoveCollaborator { - uint64 worktree_id = 1; +message RemoveProjectCollaborator { + uint64 project_id = 1; uint32 peer_id = 2; } message OpenBuffer { - uint64 worktree_id = 1; - string path = 2; + uint64 project_id = 1; + uint64 worktree_id = 2; + string path = 3; } message OpenBufferResponse { @@ -117,33 +139,38 @@ message OpenBufferResponse { } message CloseBuffer { - uint64 worktree_id = 1; - uint64 buffer_id = 2; + uint64 project_id = 1; + uint64 worktree_id = 2; + uint64 buffer_id = 3; } message UpdateBuffer { - uint64 worktree_id = 1; - uint64 buffer_id = 2; - repeated Operation operations = 3; + uint64 project_id = 1; + uint64 worktree_id = 2; + uint64 buffer_id = 3; + repeated Operation operations = 4; } message SaveBuffer { - uint64 worktree_id = 1; - uint64 buffer_id = 2; + uint64 project_id = 1; + uint64 worktree_id = 2; + uint64 buffer_id = 3; } message BufferSaved { - uint64 worktree_id = 1; - uint64 buffer_id = 2; - repeated VectorClockEntry version = 3; - Timestamp mtime = 4; + uint64 project_id = 1; + uint64 worktree_id = 2; + uint64 buffer_id = 3; + repeated VectorClockEntry version = 4; + Timestamp mtime = 5; } message UpdateDiagnosticSummary { - uint64 worktree_id = 1; - string path = 2; - uint32 error_count = 3; - uint32 warning_count = 4; + uint64 project_id = 1; + uint64 worktree_id = 2; + string path = 3; + uint32 error_count = 4; + uint32 warning_count = 5; } message GetChannels {} @@ -368,12 +395,12 @@ message ChannelMessage { message Contact { uint64 user_id = 1; - repeated WorktreeMetadata worktrees = 2; + repeated ProjectMetadata projects = 2; } -message WorktreeMetadata { +message ProjectMetadata { uint64 id = 1; - string root_name = 2; - bool is_shared = 3; + bool is_shared = 2; + repeated string worktree_root_names = 3; repeated uint64 guests = 4; } diff --git a/crates/rpc/src/peer.rs b/crates/rpc/src/peer.rs index d2f2cb2c41..bd5d1c384f 100644 --- a/crates/rpc/src/peer.rs +++ b/crates/rpc/src/peer.rs @@ -388,6 +388,7 @@ mod tests { .request( client1_conn_id, proto::OpenBuffer { + project_id: 0, worktree_id: 1, path: "path/one".to_string(), }, @@ -410,6 +411,7 @@ mod tests { .request( client2_conn_id, proto::OpenBuffer { + project_id: 0, worktree_id: 2, path: "path/two".to_string(), }, diff --git a/crates/rpc/src/proto.rs b/crates/rpc/src/proto.rs index bfdce85b77..8eb62d8ce0 100644 --- a/crates/rpc/src/proto.rs +++ b/crates/rpc/src/proto.rs @@ -121,68 +121,70 @@ macro_rules! entity_messages { messages!( Ack, - AddCollaborator, + AddProjectCollaborator, BufferSaved, ChannelMessageSent, CloseBuffer, - CloseWorktree, Error, GetChannelMessages, GetChannelMessagesResponse, GetChannels, GetChannelsResponse, - UpdateContacts, GetUsers, GetUsersResponse, JoinChannel, JoinChannelResponse, - JoinWorktree, - JoinWorktreeResponse, + JoinProject, + JoinProjectResponse, LeaveChannel, - LeaveWorktree, + LeaveProject, OpenBuffer, OpenBufferResponse, - OpenWorktree, - OpenWorktreeResponse, + RegisterProjectResponse, Ping, - RemoveCollaborator, + RegisterProject, + RegisterWorktree, + RemoveProjectCollaborator, SaveBuffer, SendChannelMessage, SendChannelMessageResponse, + ShareProject, ShareWorktree, - ShareWorktreeResponse, - UnshareWorktree, + UnregisterProject, + UnregisterWorktree, UpdateBuffer, + UpdateContacts, UpdateWorktree, ); request_messages!( + (GetChannelMessages, GetChannelMessagesResponse), (GetChannels, GetChannelsResponse), (GetUsers, GetUsersResponse), (JoinChannel, JoinChannelResponse), + (JoinProject, JoinProjectResponse), (OpenBuffer, OpenBufferResponse), - (JoinWorktree, JoinWorktreeResponse), - (OpenWorktree, OpenWorktreeResponse), (Ping, Ack), + (RegisterProject, RegisterProjectResponse), + (RegisterWorktree, Ack), (SaveBuffer, BufferSaved), - (UpdateBuffer, Ack), - (ShareWorktree, ShareWorktreeResponse), - (UnshareWorktree, Ack), (SendChannelMessage, SendChannelMessageResponse), - (GetChannelMessages, GetChannelMessagesResponse), + (ShareWorktree, Ack), + (UpdateBuffer, Ack), ); entity_messages!( - worktree_id, - AddCollaborator, + project_id, + AddProjectCollaborator, + RemoveProjectCollaborator, + JoinProject, + LeaveProject, BufferSaved, - CloseBuffer, - CloseWorktree, OpenBuffer, - JoinWorktree, - RemoveCollaborator, + CloseBuffer, SaveBuffer, - UnshareWorktree, + RegisterWorktree, + UnregisterWorktree, UpdateBuffer, UpdateWorktree, ); diff --git a/crates/server/src/rpc.rs b/crates/server/src/rpc.rs index 264d684beb..b0b07e7bae 100644 --- a/crates/server/src/rpc.rs +++ b/crates/server/src/rpc.rs @@ -60,8 +60,8 @@ impl Server { server .add_handler(Server::ping) - .add_handler(Server::open_worktree) - .add_handler(Server::close_worktree) + .add_handler(Server::register_worktree) + .add_handler(Server::unregister_worktree) .add_handler(Server::share_worktree) .add_handler(Server::unshare_worktree) .add_handler(Server::join_worktree) @@ -169,26 +169,26 @@ impl Server { self.peer.disconnect(connection_id).await; let removed_connection = self.state_mut().remove_connection(connection_id)?; - for (worktree_id, worktree) in removed_connection.hosted_worktrees { - if let Some(share) = worktree.share { + for (project_id, project) in removed_connection.hosted_projects { + if let Some(share) = project.share { broadcast( connection_id, share.guests.keys().copied().collect(), |conn_id| { self.peer - .send(conn_id, proto::UnshareWorktree { worktree_id }) + .send(conn_id, proto::UnshareProject { project_id }) }, ) .await?; } } - for (worktree_id, peer_ids) in removed_connection.guest_worktree_ids { + for (project_id, peer_ids) in removed_connection.guest_project_ids { broadcast(connection_id, peer_ids, |conn_id| { self.peer.send( conn_id, - proto::RemoveCollaborator { - worktree_id, + proto::RemoveProjectCollaborator { + project_id, peer_id: connection_id.0, }, ) @@ -207,9 +207,9 @@ impl Server { Ok(()) } - async fn open_worktree( + async fn register_worktree( mut self: Arc, - request: TypedEnvelope, + request: TypedEnvelope, ) -> tide::Result<()> { let receipt = request.receipt(); let host_user_id = self.state().user_id_for_connection(request.sender_id)?; @@ -232,38 +232,54 @@ impl Server { } let contact_user_ids = contact_user_ids.into_iter().collect::>(); - let worktree_id = self.state_mut().add_worktree(Worktree { - host_connection_id: request.sender_id, - host_user_id, - authorized_user_ids: contact_user_ids.clone(), - root_name: request.payload.root_name, - share: None, - }); + let ok = self.state_mut().register_worktree( + request.project_id, + request.worktree_id, + Worktree { + authorized_user_ids: contact_user_ids.clone(), + root_name: request.payload.root_name, + }, + ); - self.peer - .respond(receipt, proto::OpenWorktreeResponse { worktree_id }) - .await?; - self.update_contacts_for_users(&contact_user_ids).await?; + if ok { + self.peer.respond(receipt, proto::Ack {}).await?; + self.update_contacts_for_users(&contact_user_ids).await?; + } else { + self.peer + .respond_with_error( + receipt, + proto::Error { + message: "no such project".to_string(), + }, + ) + .await?; + } Ok(()) } - async fn close_worktree( + async fn unregister_worktree( mut self: Arc, - request: TypedEnvelope, + request: TypedEnvelope, ) -> tide::Result<()> { + let project_id = request.payload.project_id; let worktree_id = request.payload.worktree_id; - let worktree = self - .state_mut() - .remove_worktree(worktree_id, request.sender_id)?; + let worktree = + self.state_mut() + .unregister_worktree(project_id, worktree_id, request.sender_id)?; if let Some(share) = worktree.share { broadcast( request.sender_id, share.guests.keys().copied().collect(), |conn_id| { - self.peer - .send(conn_id, proto::UnshareWorktree { worktree_id }) + self.peer.send( + conn_id, + proto::UnregisterWorktree { + project_id, + worktree_id, + }, + ) }, ) .await?; diff --git a/crates/server/src/rpc/store.rs b/crates/server/src/rpc/store.rs index 2062683b7c..5b0b6d9554 100644 --- a/crates/server/src/rpc/store.rs +++ b/crates/server/src/rpc/store.rs @@ -8,29 +8,38 @@ use std::collections::hash_map; pub struct Store { connections: HashMap, connections_by_user_id: HashMap>, - worktrees: HashMap, - visible_worktrees_by_user_id: HashMap>, + projects: HashMap, + visible_projects_by_user_id: HashMap>, channels: HashMap, next_worktree_id: u64, } struct ConnectionState { user_id: UserId, - worktrees: HashSet, + projects: HashSet, channels: HashSet, } -pub struct Worktree { +pub struct Project { pub host_connection_id: ConnectionId, pub host_user_id: UserId, + pub share: Option, + worktrees: HashMap, +} + +pub struct Worktree { pub authorized_user_ids: Vec, pub root_name: String, - pub share: Option, +} + +#[derive(Default)] +pub struct ProjectShare { + pub guests: HashMap, + pub active_replica_ids: HashSet, + pub worktrees: HashMap, } pub struct WorktreeShare { - pub guests: HashMap, - pub active_replica_ids: HashSet, pub entries: HashMap, } @@ -43,8 +52,8 @@ pub type ReplicaId = u16; #[derive(Default)] pub struct RemovedConnectionState { - pub hosted_worktrees: HashMap, - pub guest_worktree_ids: HashMap>, + pub hosted_projects: HashMap, + pub guest_project_ids: HashMap>, pub contact_ids: HashSet, } @@ -69,7 +78,7 @@ impl Store { connection_id, ConnectionState { user_id, - worktrees: Default::default(), + projects: Default::default(), channels: Default::default(), }, ); @@ -106,7 +115,7 @@ impl Store { let mut result = RemovedConnectionState::default(); for worktree_id in connection.worktrees.clone() { - if let Ok(worktree) = self.remove_worktree(worktree_id, connection_id) { + if let Ok(worktree) = self.unregister_worktree(worktree_id, connection_id) { result .contact_ids .extend(worktree.authorized_user_ids.iter().copied()); @@ -174,12 +183,12 @@ impl Store { pub fn contacts_for_user(&self, user_id: UserId) -> Vec { let mut contacts = HashMap::default(); - for worktree_id in self - .visible_worktrees_by_user_id + for project_id in self + .visible_projects_by_user_id .get(&user_id) .unwrap_or(&HashSet::default()) { - let worktree = &self.worktrees[worktree_id]; + let project = &self.projects[project_id]; let mut guests = HashSet::default(); if let Ok(share) = worktree.share() { @@ -190,18 +199,22 @@ impl Store { } } - if let Ok(host_user_id) = self.user_id_for_connection(worktree.host_connection_id) { + if let Ok(host_user_id) = self.user_id_for_connection(project.host_connection_id) { contacts .entry(host_user_id) .or_insert_with(|| proto::Contact { user_id: host_user_id.to_proto(), - worktrees: Vec::new(), + projects: Vec::new(), }) - .worktrees - .push(proto::WorktreeMetadata { - id: *worktree_id, - root_name: worktree.root_name.clone(), - is_shared: worktree.share.is_some(), + .projects + .push(proto::ProjectMetadata { + id: *project_id, + worktree_root_names: project + .worktrees + .iter() + .map(|worktree| worktree.root_name.clone()) + .collect(), + is_shared: project.share.is_some(), guests: guests.into_iter().collect(), }); } @@ -210,41 +223,75 @@ impl Store { contacts.into_values().collect() } - pub fn add_worktree(&mut self, worktree: Worktree) -> u64 { - let worktree_id = self.next_worktree_id; - for authorized_user_id in &worktree.authorized_user_ids { - self.visible_worktrees_by_user_id - .entry(*authorized_user_id) - .or_default() - .insert(worktree_id); - } - self.next_worktree_id += 1; - if let Some(connection) = self.connections.get_mut(&worktree.host_connection_id) { - connection.worktrees.insert(worktree_id); - } - self.worktrees.insert(worktree_id, worktree); - - #[cfg(test)] - self.check_invariants(); - - worktree_id + pub fn register_project( + &mut self, + host_connection_id: ConnectionId, + host_user_id: UserId, + ) -> u64 { + let project_id = self.next_project_id; + self.projects.insert( + project_id, + Project { + host_connection_id, + host_user_id, + share: None, + worktrees: Default::default(), + }, + ); + self.next_project_id += 1; + project_id } - pub fn remove_worktree( + pub fn register_worktree( &mut self, + project_id: u64, + worktree_id: u64, + worktree: Worktree, + ) -> bool { + if let Some(project) = self.projects.get_mut(&project_id) { + for authorized_user_id in &worktree.authorized_user_ids { + self.visible_projects_by_user_id + .entry(*authorized_user_id) + .or_default() + .insert(project_id); + } + if let Some(connection) = self.connections.get_mut(&project.host_connection_id) { + connection.projects.insert(project_id); + } + project.worktrees.insert(worktree_id, worktree); + + #[cfg(test)] + self.check_invariants(); + true + } else { + false + } + } + + pub fn unregister_project(&mut self, project_id: u64) { + todo!() + } + + pub fn unregister_worktree( + &mut self, + project_id: u64, worktree_id: u64, acting_connection_id: ConnectionId, ) -> tide::Result { - let worktree = if let hash_map::Entry::Occupied(e) = self.worktrees.entry(worktree_id) { - if e.get().host_connection_id != acting_connection_id { - Err(anyhow!("not your worktree"))?; - } - e.remove() - } else { - return Err(anyhow!("no such worktree"))?; - }; + let project = self + .projects + .get_mut(&project_id) + .ok_or_else(|| anyhow!("no such project"))?; + if project.host_connection_id != acting_connection_id { + Err(anyhow!("not your worktree"))?; + } - if let Some(connection) = self.connections.get_mut(&worktree.host_connection_id) { + let worktree = project + .worktrees + .remove(&worktree_id) + .ok_or_else(|| anyhow!("no such worktree"))?; + + if let Some(connection) = self.connections.get_mut(&project.host_connection_id) { connection.worktrees.remove(&worktree_id); } @@ -271,20 +318,31 @@ impl Store { Ok(worktree) } + pub fn share_project(&mut self, project_id: u64, connection_id: ConnectionId) -> bool { + if let Some(project) = self.projects.get_mut(&project_id) { + if project.host_connection_id == connection_id { + project.share = Some(ProjectShare::default()); + return true; + } + } + false + } + pub fn share_worktree( &mut self, + project_id: u64, worktree_id: u64, connection_id: ConnectionId, entries: HashMap, ) -> Option> { - if let Some(worktree) = self.worktrees.get_mut(&worktree_id) { - if worktree.host_connection_id == connection_id { - worktree.share = Some(WorktreeShare { - guests: Default::default(), - active_replica_ids: Default::default(), - entries, - }); - return Some(worktree.authorized_user_ids.clone()); + if let Some(project) = self.projects.get_mut(&project_id) { + if project.host_connection_id == connection_id { + if let Some(share) = project.share.as_mut() { + share + .worktrees + .insert(worktree_id, WorktreeShare { entries }); + return Some(project.authorized_user_ids()); + } } } None @@ -586,14 +644,14 @@ impl Worktree { } } - pub fn share(&self) -> tide::Result<&WorktreeShare> { + pub fn share(&self) -> tide::Result<&ProjectShare> { Ok(self .share .as_ref() .ok_or_else(|| anyhow!("worktree is not shared"))?) } - fn share_mut(&mut self) -> tide::Result<&mut WorktreeShare> { + fn share_mut(&mut self) -> tide::Result<&mut ProjectShare> { Ok(self .share .as_mut()