diff --git a/crates/client/src/channel.rs b/crates/client/src/channel.rs index f89f578247..b508e36016 100644 --- a/crates/client/src/channel.rs +++ b/crates/client/src/channel.rs @@ -398,29 +398,23 @@ impl Channel { cursor } - fn handle_message_sent( - &mut self, + async fn handle_message_sent( + this: ModelHandle, message: TypedEnvelope, _: Arc, - cx: &mut ModelContext, + mut cx: AsyncAppContext, ) -> Result<()> { - let user_store = self.user_store.clone(); + let user_store = this.read_with(&cx, |this, _| this.user_store.clone()); let message = message .payload .message .ok_or_else(|| anyhow!("empty message"))?; - cx.spawn(|this, mut cx| { - async move { - let message = ChannelMessage::from_proto(message, &user_store, &mut cx).await?; - this.update(&mut cx, |this, cx| { - this.insert_messages(SumTree::from_item(message, &()), cx) - }); - Ok(()) - } - .log_err() - }) - .detach(); + let message = ChannelMessage::from_proto(message, &user_store, &mut cx).await?; + this.update(&mut cx, |this, cx| { + this.insert_messages(SumTree::from_item(message, &()), cx) + }); + Ok(()) } diff --git a/crates/client/src/client.rs b/crates/client/src/client.rs index 103471c6f3..cf7e9df085 100644 --- a/crates/client/src/client.rs +++ b/crates/client/src/client.rs @@ -11,8 +11,8 @@ use async_tungstenite::tungstenite::{ error::Error as WebsocketError, http::{Request, StatusCode}, }; -use futures::StreamExt; -use gpui::{action, AsyncAppContext, Entity, ModelContext, MutableAppContext, Task}; +use futures::{future::LocalBoxFuture, FutureExt, StreamExt}; +use gpui::{action, AsyncAppContext, Entity, ModelContext, ModelHandle, MutableAppContext, Task}; use http::HttpClient; use lazy_static::lazy_static; use parking_lot::RwLock; @@ -20,10 +20,11 @@ use postage::watch; use rand::prelude::*; use rpc::proto::{AnyTypedEnvelope, EntityMessage, EnvelopedMessage, RequestMessage}; use std::{ - any::TypeId, + any::{type_name, TypeId}, collections::HashMap, convert::TryFrom, fmt::Write as _, + future::Future, sync::{ atomic::{AtomicUsize, Ordering}, Arc, Weak, @@ -123,14 +124,17 @@ pub enum Status { ReconnectionError { next_reconnection: Instant }, } +type ModelHandler = Box< + dyn Send + + Sync + + FnMut(Box, &AsyncAppContext) -> LocalBoxFuture<'static, Result<()>>, +>; + struct ClientState { credentials: Option, status: (watch::Sender, watch::Receiver), entity_id_extractors: HashMap u64>>, - model_handlers: HashMap< - (TypeId, Option), - Option, &mut AsyncAppContext)>>, - >, + model_handlers: HashMap<(TypeId, Option), Option>, _maintain_connection: Option>, heartbeat_interval: Duration, } @@ -262,7 +266,7 @@ impl Client { } } - pub fn subscribe( + pub fn subscribe( self: &Arc, cx: &mut ModelContext, mut handler: F, @@ -273,7 +277,8 @@ impl Client { F: 'static + Send + Sync - + FnMut(&mut M, TypedEnvelope, Arc, &mut ModelContext) -> Result<()>, + + FnMut(ModelHandle, TypedEnvelope, Arc, AsyncAppContext) -> Fut, + Fut: 'static + Future>, { let subscription_id = (TypeId::of::(), None); let client = self.clone(); @@ -284,11 +289,15 @@ impl Client { Some(Box::new(move |envelope, cx| { if let Some(model) = model.upgrade(cx) { let envelope = envelope.into_any().downcast::>().unwrap(); - model.update(cx, |model, cx| { - if let Err(error) = handler(model, *envelope, client.clone(), cx) { - log::error!("error handling message: {}", error) - } - }); + handler(model, *envelope, client.clone(), cx.clone()).boxed_local() + } else { + async move { + Err(anyhow!( + "received message for {:?} but model was dropped", + type_name::() + )) + } + .boxed_local() } })), ); @@ -302,7 +311,7 @@ impl Client { } } - pub fn subscribe_to_entity( + pub fn subscribe_to_entity( self: &Arc, remote_id: u64, cx: &mut ModelContext, @@ -314,7 +323,8 @@ impl Client { F: 'static + Send + Sync - + FnMut(&mut M, TypedEnvelope, Arc, &mut ModelContext) -> Result<()>, + + FnMut(ModelHandle, TypedEnvelope, Arc, AsyncAppContext) -> Fut, + Fut: 'static + Future>, { let subscription_id = (TypeId::of::(), Some(remote_id)); let client = self.clone(); @@ -337,11 +347,15 @@ impl Client { Some(Box::new(move |envelope, cx| { if let Some(model) = model.upgrade(cx) { let envelope = envelope.into_any().downcast::>().unwrap(); - model.update(cx, |model, cx| { - if let Err(error) = handler(model, *envelope, client.clone(), cx) { - log::error!("error handling message: {}", error) - } - }); + handler(model, *envelope, client.clone(), cx.clone()).boxed_local() + } else { + async move { + Err(anyhow!( + "received message for {:?} but model was dropped", + type_name::() + )) + } + .boxed_local() } })), ); @@ -355,6 +369,44 @@ impl Client { } } + pub fn subscribe_to_entity_request( + self: &Arc, + remote_id: u64, + cx: &mut ModelContext, + mut handler: F, + ) -> Subscription + where + T: EntityMessage + RequestMessage, + M: Entity, + F: 'static + + Send + + Sync + + FnMut(ModelHandle, TypedEnvelope, Arc, AsyncAppContext) -> Fut, + Fut: 'static + Future>, + { + self.subscribe_to_entity(remote_id, cx, move |model, envelope, client, cx| { + let receipt = envelope.receipt(); + let response = handler(model, envelope, client.clone(), cx); + async move { + match response.await { + Ok(response) => { + client.respond(receipt, response)?; + Ok(()) + } + Err(error) => { + client.respond_with_error( + receipt, + proto::Error { + message: error.to_string(), + }, + )?; + Err(error) + } + } + } + }) + } + pub fn has_keychain_credentials(&self, cx: &AsyncAppContext) -> bool { read_credentials_from_keychain(cx).is_some() } @@ -442,7 +494,7 @@ impl Client { let (connection_id, handle_io, mut incoming) = self.peer.add_connection(conn).await; cx.foreground() .spawn({ - let mut cx = cx.clone(); + let cx = cx.clone(); let this = self.clone(); async move { while let Some(message) = incoming.next().await { @@ -468,12 +520,28 @@ impl Client { this.id, type_name ); - (handler)(message, &mut cx); - log::debug!( - "rpc message handled. client_id:{}, name:{}", - this.id, - type_name - ); + + let future = (handler)(message, &cx); + let client_id = this.id; + cx.foreground() + .spawn(async move { + match future.await { + Ok(()) => { + log::debug!( + "rpc message handled. client_id:{}, name:{}", + client_id, + type_name + ); + } + Err(error) => { + log::error!( + "error handling rpc message. client_id:{}, name:{}, error: {}", + client_id, type_name, error + ); + } + } + }) + .detach(); let mut state = this.state.write(); if state.model_handlers.contains_key(&handler_key) { @@ -715,16 +783,12 @@ impl Client { response } - pub fn respond( - &self, - receipt: Receipt, - response: T::Response, - ) -> Result<()> { + fn respond(&self, receipt: Receipt, response: T::Response) -> Result<()> { log::debug!("rpc respond. client_id: {}. name:{}", self.id, T::NAME); self.peer.respond(receipt, response) } - pub fn respond_with_error( + fn respond_with_error( &self, receipt: Receipt, error: proto::Error, @@ -866,7 +930,7 @@ mod tests { cx, move |_, _: TypedEnvelope, _, _| { postage::sink::Sink::try_send(&mut done_tx1, ()).unwrap(); - Ok(()) + async { Ok(()) } }, ) }); @@ -876,7 +940,7 @@ mod tests { cx, move |_, _: TypedEnvelope, _, _| { postage::sink::Sink::try_send(&mut done_tx2, ()).unwrap(); - Ok(()) + async { Ok(()) } }, ) }); @@ -887,7 +951,7 @@ mod tests { client.subscribe_to_entity( 3, cx, - move |_, _: TypedEnvelope, _, _| Ok(()), + |_, _: TypedEnvelope, _, _| async { Ok(()) }, ) }); drop(subscription3); @@ -912,14 +976,14 @@ mod tests { let subscription1 = model.update(&mut cx, |_, cx| { client.subscribe(cx, move |_, _: TypedEnvelope, _, _| { postage::sink::Sink::try_send(&mut done_tx1, ()).unwrap(); - Ok(()) + async { Ok(()) } }) }); drop(subscription1); let _subscription2 = model.update(&mut cx, |_, cx| { client.subscribe(cx, move |_, _: TypedEnvelope, _, _| { postage::sink::Sink::try_send(&mut done_tx2, ()).unwrap(); - Ok(()) + async { Ok(()) } }) }); server.send(proto::Ping {}); @@ -939,10 +1003,10 @@ mod tests { model.update(&mut cx, |model, cx| { model.subscription = Some(client.subscribe( cx, - move |model, _: TypedEnvelope, _, _| { - model.subscription.take(); + move |model, _: TypedEnvelope, _, mut cx| { + model.update(&mut cx, |model, _| model.subscription.take()); postage::sink::Sink::try_send(&mut done_tx, ()).unwrap(); - Ok(()) + async { Ok(()) } }, )); }); diff --git a/crates/client/src/user.rs b/crates/client/src/user.rs index 26be77bf2d..10388e00bf 100644 --- a/crates/client/src/user.rs +++ b/crates/client/src/user.rs @@ -60,9 +60,9 @@ impl UserStore { watch::channel::>(); let update_contacts_subscription = client.subscribe( cx, - move |_: &mut Self, msg: TypedEnvelope, _, _| { - let _ = update_contacts_tx.blocking_send(Some(msg.payload)); - Ok(()) + move |_: ModelHandle, msg: TypedEnvelope, _, _| { + *update_contacts_tx.borrow_mut() = Some(msg.payload); + async move { Ok(()) } }, ); Self { diff --git a/crates/project/src/project.rs b/crates/project/src/project.rs index 655936401d..29478d2e2c 100644 --- a/crates/project/src/project.rs +++ b/crates/project/src/project.rs @@ -340,24 +340,24 @@ impl Project { if let Some(remote_id) = remote_id { let client = &self.client; self.subscriptions.extend([ - client.subscribe_to_entity(remote_id, cx, Self::handle_open_buffer), + client.subscribe_to_entity_request(remote_id, cx, Self::handle_open_buffer), client.subscribe_to_entity(remote_id, cx, Self::handle_close_buffer), client.subscribe_to_entity(remote_id, cx, Self::handle_add_collaborator), client.subscribe_to_entity(remote_id, cx, Self::handle_remove_collaborator), client.subscribe_to_entity(remote_id, cx, Self::handle_update_worktree), client.subscribe_to_entity(remote_id, cx, Self::handle_update_buffer), - client.subscribe_to_entity(remote_id, cx, Self::handle_save_buffer), + client.subscribe_to_entity_request(remote_id, cx, Self::handle_save_buffer), client.subscribe_to_entity(remote_id, cx, Self::handle_buffer_saved), - client.subscribe_to_entity(remote_id, cx, Self::handle_format_buffers), - client.subscribe_to_entity(remote_id, cx, Self::handle_get_completions), - client.subscribe_to_entity( + client.subscribe_to_entity_request(remote_id, cx, Self::handle_format_buffers), + client.subscribe_to_entity_request(remote_id, cx, Self::handle_get_completions), + client.subscribe_to_entity_request( remote_id, cx, Self::handle_apply_additional_edits_for_completion, ), - client.subscribe_to_entity(remote_id, cx, Self::handle_get_code_actions), - client.subscribe_to_entity(remote_id, cx, Self::handle_apply_code_action), - client.subscribe_to_entity(remote_id, cx, Self::handle_get_definition), + client.subscribe_to_entity_request(remote_id, cx, Self::handle_get_code_actions), + client.subscribe_to_entity_request(remote_id, cx, Self::handle_apply_code_action), + client.subscribe_to_entity_request(remote_id, cx, Self::handle_get_definition), ]); } } @@ -1996,580 +1996,477 @@ impl Project { // RPC message handlers - fn handle_unshare_project( - &mut self, + async fn handle_unshare_project( + this: ModelHandle, _: TypedEnvelope, _: Arc, - cx: &mut ModelContext, + mut cx: AsyncAppContext, ) -> Result<()> { - if let ProjectClientState::Remote { - sharing_has_stopped, - .. - } = &mut self.client_state - { - *sharing_has_stopped = true; - self.collaborators.clear(); - cx.notify(); - Ok(()) - } else { - unreachable!() - } + this.update(&mut cx, |this, cx| { + if let ProjectClientState::Remote { + sharing_has_stopped, + .. + } = &mut this.client_state + { + *sharing_has_stopped = true; + this.collaborators.clear(); + cx.notify(); + } else { + unreachable!() + } + }); + + Ok(()) } - fn handle_add_collaborator( - &mut self, + async fn handle_add_collaborator( + this: ModelHandle, mut envelope: TypedEnvelope, _: Arc, - cx: &mut ModelContext, + mut cx: AsyncAppContext, ) -> Result<()> { - let user_store = self.user_store.clone(); + let user_store = this.read_with(&cx, |this, _| this.user_store.clone()); let collaborator = envelope .payload .collaborator .take() .ok_or_else(|| anyhow!("empty collaborator"))?; - cx.spawn(|this, mut cx| { - async move { - let collaborator = - Collaborator::from_proto(collaborator, &user_store, &mut cx).await?; - this.update(&mut cx, |this, cx| { - this.collaborators - .insert(collaborator.peer_id, collaborator); - cx.notify(); - }); - Ok(()) - } - .log_err() - }) - .detach(); + let collaborator = Collaborator::from_proto(collaborator, &user_store, &mut cx).await?; + this.update(&mut cx, |this, cx| { + this.collaborators + .insert(collaborator.peer_id, collaborator); + cx.notify(); + }); Ok(()) } - fn handle_remove_collaborator( - &mut self, + async fn handle_remove_collaborator( + this: ModelHandle, envelope: TypedEnvelope, _: Arc, - cx: &mut ModelContext, + mut cx: AsyncAppContext, ) -> Result<()> { - let peer_id = PeerId(envelope.payload.peer_id); - let replica_id = self - .collaborators - .remove(&peer_id) - .ok_or_else(|| anyhow!("unknown peer {:?}", peer_id))? - .replica_id; - self.shared_buffers.remove(&peer_id); - for (_, buffer) in &self.open_buffers { - if let Some(buffer) = buffer.upgrade(cx) { - buffer.update(cx, |buffer, cx| buffer.remove_peer(replica_id, cx)); + this.update(&mut cx, |this, cx| { + let peer_id = PeerId(envelope.payload.peer_id); + let replica_id = this + .collaborators + .remove(&peer_id) + .ok_or_else(|| anyhow!("unknown peer {:?}", peer_id))? + .replica_id; + this.shared_buffers.remove(&peer_id); + for (_, buffer) in &this.open_buffers { + if let Some(buffer) = buffer.upgrade(cx) { + buffer.update(cx, |buffer, cx| buffer.remove_peer(replica_id, cx)); + } } - } - cx.notify(); - Ok(()) + cx.notify(); + Ok(()) + }) } - fn handle_share_worktree( - &mut self, + async fn handle_share_worktree( + this: ModelHandle, envelope: TypedEnvelope, client: Arc, - cx: &mut ModelContext, + mut cx: AsyncAppContext, ) -> Result<()> { - let remote_id = self.remote_id().ok_or_else(|| anyhow!("invalid project"))?; - let replica_id = self.replica_id(); - let worktree = envelope - .payload - .worktree - .ok_or_else(|| anyhow!("invalid worktree"))?; - let (worktree, load_task) = Worktree::remote(remote_id, replica_id, worktree, client, cx); - self.add_worktree(&worktree, cx); - load_task.detach(); - Ok(()) + this.update(&mut cx, |this, cx| { + let remote_id = this.remote_id().ok_or_else(|| anyhow!("invalid project"))?; + let replica_id = this.replica_id(); + let worktree = envelope + .payload + .worktree + .ok_or_else(|| anyhow!("invalid worktree"))?; + let (worktree, load_task) = + Worktree::remote(remote_id, replica_id, worktree, client, cx); + this.add_worktree(&worktree, cx); + load_task.detach(); + Ok(()) + }) } - fn handle_unregister_worktree( - &mut self, + async fn handle_unregister_worktree( + this: ModelHandle, envelope: TypedEnvelope, _: Arc, - cx: &mut ModelContext, + mut cx: AsyncAppContext, ) -> Result<()> { - let worktree_id = WorktreeId::from_proto(envelope.payload.worktree_id); - self.remove_worktree(worktree_id, cx); - Ok(()) + this.update(&mut cx, |this, cx| { + let worktree_id = WorktreeId::from_proto(envelope.payload.worktree_id); + this.remove_worktree(worktree_id, cx); + Ok(()) + }) } - fn handle_update_worktree( - &mut self, + async fn handle_update_worktree( + this: ModelHandle, envelope: TypedEnvelope, _: Arc, - cx: &mut ModelContext, + mut cx: AsyncAppContext, ) -> Result<()> { - let worktree_id = WorktreeId::from_proto(envelope.payload.worktree_id); - if let Some(worktree) = self.worktree_for_id(worktree_id, cx) { - worktree.update(cx, |worktree, cx| { - let worktree = worktree.as_remote_mut().unwrap(); - worktree.update_from_remote(envelope, cx) - })?; - } - Ok(()) + this.update(&mut cx, |this, cx| { + let worktree_id = WorktreeId::from_proto(envelope.payload.worktree_id); + if let Some(worktree) = this.worktree_for_id(worktree_id, cx) { + worktree.update(cx, |worktree, cx| { + let worktree = worktree.as_remote_mut().unwrap(); + worktree.update_from_remote(envelope, cx) + })?; + } + Ok(()) + }) } - fn handle_update_diagnostic_summary( - &mut self, + async fn handle_update_diagnostic_summary( + this: ModelHandle, envelope: TypedEnvelope, _: Arc, - cx: &mut ModelContext, + mut cx: AsyncAppContext, ) -> Result<()> { - let worktree_id = WorktreeId::from_proto(envelope.payload.worktree_id); - if let Some(worktree) = self.worktree_for_id(worktree_id, cx) { - if let Some(summary) = envelope.payload.summary { - let project_path = ProjectPath { - worktree_id, - path: Path::new(&summary.path).into(), - }; - worktree.update(cx, |worktree, _| { - worktree - .as_remote_mut() - .unwrap() - .update_diagnostic_summary(project_path.path.clone(), &summary); - }); - cx.emit(Event::DiagnosticsUpdated(project_path)); + this.update(&mut cx, |this, cx| { + let worktree_id = WorktreeId::from_proto(envelope.payload.worktree_id); + if let Some(worktree) = this.worktree_for_id(worktree_id, cx) { + if let Some(summary) = envelope.payload.summary { + let project_path = ProjectPath { + worktree_id, + path: Path::new(&summary.path).into(), + }; + worktree.update(cx, |worktree, _| { + worktree + .as_remote_mut() + .unwrap() + .update_diagnostic_summary(project_path.path.clone(), &summary); + }); + cx.emit(Event::DiagnosticsUpdated(project_path)); + } } - } - Ok(()) + Ok(()) + }) } - fn handle_disk_based_diagnostics_updating( - &mut self, + async fn handle_disk_based_diagnostics_updating( + this: ModelHandle, _: TypedEnvelope, _: Arc, - cx: &mut ModelContext, + mut cx: AsyncAppContext, ) -> Result<()> { - self.disk_based_diagnostics_started(cx); + this.update(&mut cx, |this, cx| this.disk_based_diagnostics_started(cx)); Ok(()) } - fn handle_disk_based_diagnostics_updated( - &mut self, + async fn handle_disk_based_diagnostics_updated( + this: ModelHandle, _: TypedEnvelope, _: Arc, - cx: &mut ModelContext, + mut cx: AsyncAppContext, ) -> Result<()> { - self.disk_based_diagnostics_finished(cx); + this.update(&mut cx, |this, cx| this.disk_based_diagnostics_finished(cx)); Ok(()) } - pub fn handle_update_buffer( - &mut self, + async fn handle_update_buffer( + this: ModelHandle, envelope: TypedEnvelope, _: Arc, - cx: &mut ModelContext, + mut cx: AsyncAppContext, ) -> Result<()> { - let payload = envelope.payload.clone(); - let buffer_id = payload.buffer_id as usize; - let ops = payload - .operations - .into_iter() - .map(|op| language::proto::deserialize_operation(op)) - .collect::, _>>()?; - if let Some(buffer) = self.open_buffers.get_mut(&buffer_id) { - if let Some(buffer) = buffer.upgrade(cx) { - buffer.update(cx, |buffer, cx| buffer.apply_ops(ops, cx))?; + this.update(&mut cx, |this, cx| { + let payload = envelope.payload.clone(); + let buffer_id = payload.buffer_id as usize; + let ops = payload + .operations + .into_iter() + .map(|op| language::proto::deserialize_operation(op)) + .collect::, _>>()?; + if let Some(buffer) = this.open_buffers.get_mut(&buffer_id) { + if let Some(buffer) = buffer.upgrade(cx) { + buffer.update(cx, |buffer, cx| buffer.apply_ops(ops, cx))?; + } } - } - Ok(()) + Ok(()) + }) } - pub fn handle_update_buffer_file( - &mut self, + async fn handle_update_buffer_file( + this: ModelHandle, envelope: TypedEnvelope, _: Arc, - cx: &mut ModelContext, + mut cx: AsyncAppContext, ) -> Result<()> { - let payload = envelope.payload.clone(); - let buffer_id = payload.buffer_id as usize; - let file = payload.file.ok_or_else(|| anyhow!("invalid file"))?; - let worktree = self - .worktree_for_id(WorktreeId::from_proto(file.worktree_id), cx) - .ok_or_else(|| anyhow!("no such worktree"))?; - let file = File::from_proto(file, worktree.clone(), cx)?; - let buffer = self - .open_buffers - .get_mut(&buffer_id) - .and_then(|b| b.upgrade(cx)) - .ok_or_else(|| anyhow!("no such buffer"))?; - buffer.update(cx, |buffer, cx| { - buffer.file_updated(Box::new(file), cx).detach(); - }); - - Ok(()) + this.update(&mut cx, |this, cx| { + let payload = envelope.payload.clone(); + let buffer_id = payload.buffer_id as usize; + let file = payload.file.ok_or_else(|| anyhow!("invalid file"))?; + let worktree = this + .worktree_for_id(WorktreeId::from_proto(file.worktree_id), cx) + .ok_or_else(|| anyhow!("no such worktree"))?; + let file = File::from_proto(file, worktree.clone(), cx)?; + let buffer = this + .open_buffers + .get_mut(&buffer_id) + .and_then(|b| b.upgrade(cx)) + .ok_or_else(|| anyhow!("no such buffer"))?; + buffer.update(cx, |buffer, cx| { + buffer.file_updated(Box::new(file), cx).detach(); + }); + Ok(()) + }) } - pub fn handle_save_buffer( - &mut self, + async fn handle_save_buffer( + this: ModelHandle, envelope: TypedEnvelope, - rpc: Arc, - cx: &mut ModelContext, - ) -> Result<()> { - let sender_id = envelope.original_sender_id()?; - let project_id = self.remote_id().ok_or_else(|| anyhow!("not connected"))?; - let buffer = self - .shared_buffers - .get(&sender_id) - .and_then(|shared_buffers| shared_buffers.get(&envelope.payload.buffer_id).cloned()) - .ok_or_else(|| anyhow!("unknown buffer id {}", envelope.payload.buffer_id))?; - let receipt = envelope.receipt(); + _: Arc, + mut cx: AsyncAppContext, + ) -> Result { let buffer_id = envelope.payload.buffer_id; - let save = cx.spawn(|_, mut cx| async move { - buffer.update(&mut cx, |buffer, cx| buffer.save(cx)).await - }); + let sender_id = envelope.original_sender_id()?; + let (project_id, save) = this.update(&mut cx, |this, cx| { + let project_id = this.remote_id().ok_or_else(|| anyhow!("not connected"))?; + let buffer = this + .shared_buffers + .get(&sender_id) + .and_then(|shared_buffers| shared_buffers.get(&buffer_id).cloned()) + .ok_or_else(|| anyhow!("unknown buffer id {}", buffer_id))?; + Ok::<_, anyhow::Error>((project_id, buffer.update(cx, |buffer, cx| buffer.save(cx)))) + })?; - cx.background() - .spawn( - async move { - let (version, mtime) = save.await?; - - rpc.respond( - receipt, - proto::BufferSaved { - project_id, - buffer_id, - version: (&version).into(), - mtime: Some(mtime.into()), - }, - )?; - - Ok(()) - } - .log_err(), - ) - .detach(); - Ok(()) + let (version, mtime) = save.await?; + Ok(proto::BufferSaved { + project_id, + buffer_id, + version: (&version).into(), + mtime: Some(mtime.into()), + }) } - pub fn handle_format_buffers( - &mut self, + async fn handle_format_buffers( + this: ModelHandle, envelope: TypedEnvelope, - rpc: Arc, - cx: &mut ModelContext, - ) -> Result<()> { - let receipt = envelope.receipt(); + _: Arc, + mut cx: AsyncAppContext, + ) -> Result { let sender_id = envelope.original_sender_id()?; - let shared_buffers = self - .shared_buffers - .get(&sender_id) - .ok_or_else(|| anyhow!("peer has no buffers"))?; - let mut buffers = HashSet::default(); - for buffer_id in envelope.payload.buffer_ids { - buffers.insert( - shared_buffers - .get(&buffer_id) - .cloned() - .ok_or_else(|| anyhow!("unknown buffer id {}", buffer_id))?, - ); - } - cx.spawn(|this, mut cx| async move { - let project_transaction = this - .update(&mut cx, |this, cx| this.format(buffers, false, cx)) - .await - .map(|project_transaction| { - this.update(&mut cx, |this, cx| { - this.serialize_project_transaction_for_peer( - project_transaction, - sender_id, - cx, - ) - }) - }); - // We spawn here in order to enqueue the sending of the response *after* transmission of - // edits associated with formatting. - cx.spawn(|_| async move { - match project_transaction { - Ok(transaction) => rpc.respond( - receipt, - proto::FormatBuffersResponse { - transaction: Some(transaction), - }, - )?, - Err(error) => rpc.respond_with_error( - receipt, - proto::Error { - message: error.to_string(), - }, - )?, - } - Ok::<_, anyhow::Error>(()) - }) - .await - .log_err(); + let format = this.update(&mut cx, |this, cx| { + let shared_buffers = this + .shared_buffers + .get(&sender_id) + .ok_or_else(|| anyhow!("peer has no buffers"))?; + let mut buffers = HashSet::default(); + for buffer_id in &envelope.payload.buffer_ids { + buffers.insert( + shared_buffers + .get(buffer_id) + .cloned() + .ok_or_else(|| anyhow!("unknown buffer id {}", buffer_id))?, + ); + } + Ok::<_, anyhow::Error>(this.format(buffers, false, cx)) + })?; + + let project_transaction = format.await?; + let project_transaction = this.update(&mut cx, |this, cx| { + this.serialize_project_transaction_for_peer(project_transaction, sender_id, cx) + }); + Ok(proto::FormatBuffersResponse { + transaction: Some(project_transaction), }) - .detach(); - Ok(()) } - fn handle_get_completions( - &mut self, + async fn handle_get_completions( + this: ModelHandle, envelope: TypedEnvelope, - rpc: Arc, - cx: &mut ModelContext, - ) -> Result<()> { - let receipt = envelope.receipt(); + _: Arc, + mut cx: AsyncAppContext, + ) -> Result { let sender_id = envelope.original_sender_id()?; - let buffer = self - .shared_buffers - .get(&sender_id) - .and_then(|shared_buffers| shared_buffers.get(&envelope.payload.buffer_id).cloned()) - .ok_or_else(|| anyhow!("unknown buffer id {}", envelope.payload.buffer_id))?; let position = envelope .payload .position .and_then(language::proto::deserialize_anchor) .ok_or_else(|| anyhow!("invalid position"))?; - cx.spawn(|this, mut cx| async move { - match this - .update(&mut cx, |this, cx| this.completions(&buffer, position, cx)) - .await - { - Ok(completions) => rpc.respond( - receipt, - proto::GetCompletionsResponse { - completions: completions - .iter() - .map(language::proto::serialize_completion) - .collect(), - }, - ), - Err(error) => rpc.respond_with_error( - receipt, - proto::Error { - message: error.to_string(), - }, - ), - } + let completions = this.update(&mut cx, |this, cx| { + let buffer = this + .shared_buffers + .get(&sender_id) + .and_then(|shared_buffers| shared_buffers.get(&envelope.payload.buffer_id).cloned()) + .ok_or_else(|| anyhow!("unknown buffer id {}", envelope.payload.buffer_id))?; + Ok::<_, anyhow::Error>(this.completions(&buffer, position, cx)) + })?; + + Ok(proto::GetCompletionsResponse { + completions: completions + .await? + .iter() + .map(language::proto::serialize_completion) + .collect(), }) - .detach_and_log_err(cx); - Ok(()) } - fn handle_apply_additional_edits_for_completion( - &mut self, + async fn handle_apply_additional_edits_for_completion( + this: ModelHandle, envelope: TypedEnvelope, - rpc: Arc, - cx: &mut ModelContext, - ) -> Result<()> { - let receipt = envelope.receipt(); + _: Arc, + mut cx: AsyncAppContext, + ) -> Result { let sender_id = envelope.original_sender_id()?; - let buffer = self - .shared_buffers - .get(&sender_id) - .and_then(|shared_buffers| shared_buffers.get(&envelope.payload.buffer_id).cloned()) - .ok_or_else(|| anyhow!("unknown buffer id {}", envelope.payload.buffer_id))?; - let language = buffer.read(cx).language(); - let completion = language::proto::deserialize_completion( - envelope - .payload - .completion - .ok_or_else(|| anyhow!("invalid completion"))?, - language, - )?; - cx.spawn(|this, mut cx| async move { - match this - .update(&mut cx, |this, cx| { - this.apply_additional_edits_for_completion(buffer, completion, false, cx) - }) - .await - { - Ok(transaction) => rpc.respond( - receipt, - proto::ApplyCompletionAdditionalEditsResponse { - transaction: transaction - .as_ref() - .map(language::proto::serialize_transaction), - }, - ), - Err(error) => rpc.respond_with_error( - receipt, - proto::Error { - message: error.to_string(), - }, - ), - } + let apply_additional_edits = this.update(&mut cx, |this, cx| { + let buffer = this + .shared_buffers + .get(&sender_id) + .and_then(|shared_buffers| shared_buffers.get(&envelope.payload.buffer_id).cloned()) + .ok_or_else(|| anyhow!("unknown buffer id {}", envelope.payload.buffer_id))?; + let language = buffer.read(cx).language(); + let completion = language::proto::deserialize_completion( + envelope + .payload + .completion + .ok_or_else(|| anyhow!("invalid completion"))?, + language, + )?; + Ok::<_, anyhow::Error>( + this.apply_additional_edits_for_completion(buffer, completion, false, cx), + ) + })?; + + Ok(proto::ApplyCompletionAdditionalEditsResponse { + transaction: apply_additional_edits + .await? + .as_ref() + .map(language::proto::serialize_transaction), }) - .detach_and_log_err(cx); - Ok(()) } - fn handle_get_code_actions( - &mut self, + async fn handle_get_code_actions( + this: ModelHandle, envelope: TypedEnvelope, - rpc: Arc, - cx: &mut ModelContext, - ) -> Result<()> { - let receipt = envelope.receipt(); + _: Arc, + mut cx: AsyncAppContext, + ) -> Result { let sender_id = envelope.original_sender_id()?; - let buffer = self - .shared_buffers - .get(&sender_id) - .and_then(|shared_buffers| shared_buffers.get(&envelope.payload.buffer_id).cloned()) - .ok_or_else(|| anyhow!("unknown buffer id {}", envelope.payload.buffer_id))?; let position = envelope .payload .position .and_then(language::proto::deserialize_anchor) .ok_or_else(|| anyhow!("invalid position"))?; - cx.spawn(|this, mut cx| async move { - match this - .update(&mut cx, |this, cx| this.code_actions(&buffer, position, cx)) - .await - { - Ok(actions) => rpc.respond( - receipt, - proto::GetCodeActionsResponse { - actions: actions - .iter() - .map(language::proto::serialize_code_action) - .collect(), - }, - ), - Err(error) => rpc.respond_with_error( - receipt, - proto::Error { - message: error.to_string(), - }, - ), - } + let code_actions = this.update(&mut cx, |this, cx| { + let buffer = this + .shared_buffers + .get(&sender_id) + .and_then(|shared_buffers| shared_buffers.get(&envelope.payload.buffer_id).cloned()) + .ok_or_else(|| anyhow!("unknown buffer id {}", envelope.payload.buffer_id))?; + Ok::<_, anyhow::Error>(this.code_actions(&buffer, position, cx)) + })?; + + Ok(proto::GetCodeActionsResponse { + actions: code_actions + .await? + .iter() + .map(language::proto::serialize_code_action) + .collect(), }) - .detach_and_log_err(cx); - Ok(()) } - fn handle_apply_code_action( - &mut self, + async fn handle_apply_code_action( + this: ModelHandle, envelope: TypedEnvelope, - rpc: Arc, - cx: &mut ModelContext, - ) -> Result<()> { - let receipt = envelope.receipt(); + _: Arc, + mut cx: AsyncAppContext, + ) -> Result { let sender_id = envelope.original_sender_id()?; - let buffer = self - .shared_buffers - .get(&sender_id) - .and_then(|shared_buffers| shared_buffers.get(&envelope.payload.buffer_id).cloned()) - .ok_or_else(|| anyhow!("unknown buffer id {}", envelope.payload.buffer_id))?; let action = language::proto::deserialize_code_action( envelope .payload .action .ok_or_else(|| anyhow!("invalid action"))?, )?; - let apply_code_action = self.apply_code_action(buffer, action, false, cx); - cx.spawn(|this, mut cx| async move { - match apply_code_action.await { - Ok(project_transaction) => this.update(&mut cx, |this, cx| { - let serialized_transaction = this.serialize_project_transaction_for_peer( - project_transaction, - sender_id, - cx, - ); - rpc.respond( - receipt, - proto::ApplyCodeActionResponse { - transaction: Some(serialized_transaction), - }, - ) - }), - Err(error) => rpc.respond_with_error( - receipt, - proto::Error { - message: error.to_string(), - }, - ), - } + let apply_code_action = this.update(&mut cx, |this, cx| { + let buffer = this + .shared_buffers + .get(&sender_id) + .and_then(|shared_buffers| shared_buffers.get(&envelope.payload.buffer_id).cloned()) + .ok_or_else(|| anyhow!("unknown buffer id {}", envelope.payload.buffer_id))?; + Ok::<_, anyhow::Error>(this.apply_code_action(buffer, action, false, cx)) + })?; + + let project_transaction = apply_code_action.await?; + let project_transaction = this.update(&mut cx, |this, cx| { + this.serialize_project_transaction_for_peer(project_transaction, sender_id, cx) + }); + Ok(proto::ApplyCodeActionResponse { + transaction: Some(project_transaction), }) - .detach_and_log_err(cx); - Ok(()) } - pub fn handle_get_definition( - &mut self, + async fn handle_get_definition( + this: ModelHandle, envelope: TypedEnvelope, - rpc: Arc, - cx: &mut ModelContext, - ) -> Result<()> { - let receipt = envelope.receipt(); + _: Arc, + mut cx: AsyncAppContext, + ) -> Result { let sender_id = envelope.original_sender_id()?; - let source_buffer = self - .shared_buffers - .get(&sender_id) - .and_then(|shared_buffers| shared_buffers.get(&envelope.payload.buffer_id).cloned()) - .ok_or_else(|| anyhow!("unknown buffer id {}", envelope.payload.buffer_id))?; let position = envelope .payload .position .and_then(deserialize_anchor) .ok_or_else(|| anyhow!("invalid position"))?; - if !source_buffer.read(cx).can_resolve(&position) { - return Err(anyhow!("cannot resolve position")); - } + let definitions = this.update(&mut cx, |this, cx| { + let source_buffer = this + .shared_buffers + .get(&sender_id) + .and_then(|shared_buffers| shared_buffers.get(&envelope.payload.buffer_id).cloned()) + .ok_or_else(|| anyhow!("unknown buffer id {}", envelope.payload.buffer_id))?; + if source_buffer.read(cx).can_resolve(&position) { + Ok(this.definition(&source_buffer, position, cx)) + } else { + Err(anyhow!("cannot resolve position")) + } + })?; - let definitions = self.definition(&source_buffer, position, cx); - cx.spawn(|this, mut cx| async move { - let definitions = definitions.await?; + let definitions = definitions.await?; + + this.update(&mut cx, |this, cx| { let mut response = proto::GetDefinitionResponse { definitions: Default::default(), }; - this.update(&mut cx, |this, cx| { - for definition in definitions { - let buffer = - this.serialize_buffer_for_peer(&definition.target_buffer, sender_id, cx); - response.definitions.push(proto::Definition { - target_start: Some(serialize_anchor(&definition.target_range.start)), - target_end: Some(serialize_anchor(&definition.target_range.end)), - buffer: Some(buffer), - }); - } - }); - rpc.respond(receipt, response)?; - Ok::<_, anyhow::Error>(()) + for definition in definitions { + let buffer = + this.serialize_buffer_for_peer(&definition.target_buffer, sender_id, cx); + response.definitions.push(proto::Definition { + target_start: Some(serialize_anchor(&definition.target_range.start)), + target_end: Some(serialize_anchor(&definition.target_range.end)), + buffer: Some(buffer), + }); + } + Ok(response) }) - .detach_and_log_err(cx); - - Ok(()) } - pub fn handle_open_buffer( - &mut self, + async fn handle_open_buffer( + this: ModelHandle, envelope: TypedEnvelope, - rpc: Arc, - cx: &mut ModelContext, - ) -> anyhow::Result<()> { - let receipt = envelope.receipt(); + _: Arc, + mut cx: AsyncAppContext, + ) -> anyhow::Result { let peer_id = envelope.original_sender_id()?; let worktree_id = WorktreeId::from_proto(envelope.payload.worktree_id); - let open_buffer = self.open_buffer( - ProjectPath { - worktree_id, - path: PathBuf::from(envelope.payload.path).into(), - }, - cx, - ); - cx.spawn(|this, mut cx| { - async move { - let buffer = open_buffer.await?; - let buffer = this.update(&mut cx, |this, cx| { - this.serialize_buffer_for_peer(&buffer, peer_id, cx) - }); - rpc.respond( - receipt, - proto::OpenBufferResponse { - buffer: Some(buffer), - }, - ) - } - .log_err() + let open_buffer = this.update(&mut cx, |this, cx| { + this.open_buffer( + ProjectPath { + worktree_id, + path: PathBuf::from(envelope.payload.path).into(), + }, + cx, + ) + }); + + let buffer = open_buffer.await?; + this.update(&mut cx, |this, cx| { + Ok(proto::OpenBufferResponse { + buffer: Some(this.serialize_buffer_for_peer(&buffer, peer_id, cx)), + }) }) - .detach(); - Ok(()) } fn serialize_project_transaction_for_peer( @@ -2685,67 +2582,74 @@ impl Project { } } - pub fn handle_close_buffer( - &mut self, + async fn handle_close_buffer( + this: ModelHandle, envelope: TypedEnvelope, _: Arc, - cx: &mut ModelContext, + mut cx: AsyncAppContext, ) -> anyhow::Result<()> { - if let Some(shared_buffers) = self.shared_buffers.get_mut(&envelope.original_sender_id()?) { - shared_buffers.remove(&envelope.payload.buffer_id); - cx.notify(); - } - Ok(()) + this.update(&mut cx, |this, cx| { + if let Some(shared_buffers) = + this.shared_buffers.get_mut(&envelope.original_sender_id()?) + { + shared_buffers.remove(&envelope.payload.buffer_id); + cx.notify(); + } + Ok(()) + }) } - pub fn handle_buffer_saved( - &mut self, + async fn handle_buffer_saved( + this: ModelHandle, envelope: TypedEnvelope, _: Arc, - cx: &mut ModelContext, + mut cx: AsyncAppContext, ) -> Result<()> { - let payload = envelope.payload.clone(); - let buffer = self - .open_buffers - .get(&(payload.buffer_id as usize)) - .and_then(|buffer| buffer.upgrade(cx)); - if let Some(buffer) = buffer { - buffer.update(cx, |buffer, cx| { - let version = payload.version.try_into()?; - let mtime = payload - .mtime - .ok_or_else(|| anyhow!("missing mtime"))? - .into(); - buffer.did_save(version, mtime, None, cx); - Result::<_, anyhow::Error>::Ok(()) - })?; - } - Ok(()) + let version = envelope.payload.version.try_into()?; + let mtime = envelope + .payload + .mtime + .ok_or_else(|| anyhow!("missing mtime"))? + .into(); + + this.update(&mut cx, |this, cx| { + let buffer = this + .open_buffers + .get(&(envelope.payload.buffer_id as usize)) + .and_then(|buffer| buffer.upgrade(cx)); + if let Some(buffer) = buffer { + buffer.update(cx, |buffer, cx| { + buffer.did_save(version, mtime, None, cx); + }); + } + Ok(()) + }) } - pub fn handle_buffer_reloaded( - &mut self, + async fn handle_buffer_reloaded( + this: ModelHandle, envelope: TypedEnvelope, _: Arc, - cx: &mut ModelContext, + mut cx: AsyncAppContext, ) -> Result<()> { let payload = envelope.payload.clone(); - let buffer = self - .open_buffers - .get(&(payload.buffer_id as usize)) - .and_then(|buffer| buffer.upgrade(cx)); - if let Some(buffer) = buffer { - buffer.update(cx, |buffer, cx| { - let version = payload.version.try_into()?; - let mtime = payload - .mtime - .ok_or_else(|| anyhow!("missing mtime"))? - .into(); - buffer.did_reload(version, mtime, cx); - Result::<_, anyhow::Error>::Ok(()) - })?; - } - Ok(()) + let version = payload.version.try_into()?; + let mtime = payload + .mtime + .ok_or_else(|| anyhow!("missing mtime"))? + .into(); + this.update(&mut cx, |this, cx| { + let buffer = this + .open_buffers + .get(&(payload.buffer_id as usize)) + .and_then(|buffer| buffer.upgrade(cx)); + if let Some(buffer) = buffer { + buffer.update(cx, |buffer, cx| { + buffer.did_reload(version, mtime, cx); + }); + } + Ok(()) + }) } pub fn match_paths<'a>(