From 7b96888ab179bbd828e8eac68ae0b331fba34e6f Mon Sep 17 00:00:00 2001 From: Antonio Scandurra Date: Tue, 15 Jun 2021 11:15:55 +0200 Subject: [PATCH] Ensure that futures returns from `RpcClient` are 'static --- zed/src/rpc_client.rs | 123 +++++++++++++++++++++++------------------- 1 file changed, 68 insertions(+), 55 deletions(-) diff --git a/zed/src/rpc_client.rs b/zed/src/rpc_client.rs index 2f6849e3da..c4f1c45f30 100644 --- a/zed/src/rpc_client.rs +++ b/zed/src/rpc_client.rs @@ -12,6 +12,7 @@ use smol::{ }; use std::{ collections::HashMap, + future::Future, sync::{ atomic::{self, AtomicI32}, Arc, @@ -32,7 +33,7 @@ impl RpcClient where Conn: AsyncRead + AsyncWrite + Unpin + Send + 'static, { - pub fn new(conn: Conn, executor: Arc) -> Self { + pub fn new(conn: Conn, executor: Arc) -> Arc { let response_channels = Arc::new(Mutex::new(HashMap::new())); let (conn_rx, conn_tx) = smol::io::split(conn); let (_drop_tx, drop_rx) = barrier::channel(); @@ -45,12 +46,12 @@ where )) .detach(); - Self { + Arc::new(Self { response_channels, outgoing: Mutex::new(MessageStream::new(conn_tx)), _drop_tx, next_message_id: AtomicI32::new(0), - } + }) } async fn handle_incoming( @@ -101,63 +102,75 @@ where } } - pub async fn request(&self, req: T) -> Result { - let message_id = self.next_message_id.fetch_add(1, atomic::Ordering::SeqCst); - let (tx, mut rx) = mpsc::channel(1); - self.response_channels - .lock() - .await - .insert(message_id, (tx, true)); - self.outgoing - .lock() - .await - .write_message(&proto::FromClient { - id: message_id, - variant: Some(req.to_variant()), - }) - .await?; - let response = rx - .recv() - .await - .expect("response channel was unexpectedly dropped"); - T::Response::from_variant(response) - .ok_or_else(|| anyhow!("received response of the wrong t")) + pub fn request( + self: &Arc, + req: T, + ) -> impl Future> { + let this = self.clone(); + async move { + let message_id = this.next_message_id.fetch_add(1, atomic::Ordering::SeqCst); + let (tx, mut rx) = mpsc::channel(1); + this.response_channels + .lock() + .await + .insert(message_id, (tx, true)); + this.outgoing + .lock() + .await + .write_message(&proto::FromClient { + id: message_id, + variant: Some(req.to_variant()), + }) + .await?; + let response = rx + .recv() + .await + .expect("response channel was unexpectedly dropped"); + T::Response::from_variant(response) + .ok_or_else(|| anyhow!("received response of the wrong t")) + } } - pub async fn send(&self, message: T) -> Result<()> { - let message_id = self.next_message_id.fetch_add(1, atomic::Ordering::SeqCst); - self.outgoing - .lock() - .await - .write_message(&proto::FromClient { - id: message_id, - variant: Some(message.to_variant()), - }) - .await?; - Ok(()) + pub fn send(self: &Arc, message: T) -> impl Future> { + let this = self.clone(); + async move { + let message_id = this.next_message_id.fetch_add(1, atomic::Ordering::SeqCst); + this.outgoing + .lock() + .await + .write_message(&proto::FromClient { + id: message_id, + variant: Some(message.to_variant()), + }) + .await?; + Ok(()) + } } - pub async fn subscribe( - &self, + pub fn subscribe( + self: &Arc, subscription: T, - ) -> Result>> { - let message_id = self.next_message_id.fetch_add(1, atomic::Ordering::SeqCst); - let (tx, rx) = mpsc::channel(256); - self.response_channels - .lock() - .await - .insert(message_id, (tx, false)); - self.outgoing - .lock() - .await - .write_message(&proto::FromClient { - id: message_id, - variant: Some(subscription.to_variant()), - }) - .await?; - Ok(rx.map(|event| { - T::Event::from_variant(event).ok_or_else(|| anyhow!("invalid event {:?}")) - })) + ) -> impl Future>>> { + let this = self.clone(); + async move { + let message_id = this.next_message_id.fetch_add(1, atomic::Ordering::SeqCst); + let (tx, rx) = mpsc::channel(256); + this.response_channels + .lock() + .await + .insert(message_id, (tx, false)); + this.outgoing + .lock() + .await + .write_message(&proto::FromClient { + id: message_id, + variant: Some(subscription.to_variant()), + }) + .await?; + Ok(rx.map(|event| { + T::Event::from_variant(event).ok_or_else(|| anyhow!("invalid event {:?}")) + })) + } } }