From 917543cc32e63c70f64d54a588deba2871e2b14c Mon Sep 17 00:00:00 2001 From: Max Brunsfeld Date: Tue, 1 Mar 2022 13:37:33 -0800 Subject: [PATCH] Handle Peer responses using a futures::oneshot instead of postage::mpsc --- crates/rpc/src/peer.rs | 22 +++++++--------------- 1 file changed, 7 insertions(+), 15 deletions(-) diff --git a/crates/rpc/src/peer.rs b/crates/rpc/src/peer.rs index a1d5ab8e96..0a00f6d801 100644 --- a/crates/rpc/src/peer.rs +++ b/crates/rpc/src/peer.rs @@ -1,8 +1,7 @@ use super::proto::{self, AnyTypedEnvelope, EnvelopedMessage, MessageStream, RequestMessage}; use super::Connection; use anyhow::{anyhow, Context, Result}; -use futures::stream::BoxStream; -use futures::{FutureExt as _, StreamExt}; +use futures::{channel::oneshot, stream::BoxStream, FutureExt as _, StreamExt}; use parking_lot::{Mutex, RwLock}; use postage::{ barrier, mpsc, @@ -92,7 +91,7 @@ pub struct ConnectionState { outgoing_tx: futures::channel::mpsc::UnboundedSender, next_message_id: Arc, response_channels: - Arc>>>>, + Arc>>>>, } const WRITE_TIMEOUT: Duration = Duration::from_secs(10); @@ -177,18 +176,14 @@ impl Peer { async move { if let Some(responding_to) = incoming.responding_to { let channel = response_channels.lock().as_mut()?.remove(&responding_to); - if let Some(mut tx) = channel { + if let Some(tx) = channel { let mut requester_resumed = barrier::channel(); - if let Err(error) = tx.send((incoming, requester_resumed.0)).await { + if let Err(error) = tx.send((incoming, requester_resumed.0)) { log::debug!( "received RPC but request future was dropped {:?}", - error.0 .0 + error.0 ); } - // Drop response channel before awaiting on the barrier. This allows the - // barrier to get dropped even if the request's future is dropped before it - // has a chance to observe the response. - drop(tx); requester_resumed.1.recv().await; } else { log::warn!("received RPC response to unknown request {}", responding_to); @@ -239,7 +234,7 @@ impl Peer { receiver_id: ConnectionId, request: T, ) -> impl Future> { - let (tx, mut rx) = mpsc::channel(1); + let (tx, rx) = oneshot::channel(); let send = self.connection_state(receiver_id).and_then(|connection| { let message_id = connection.next_message_id.fetch_add(1, SeqCst); connection @@ -260,10 +255,7 @@ impl Peer { }); async move { send?; - let (response, _barrier) = rx - .recv() - .await - .ok_or_else(|| anyhow!("connection was closed"))?; + let (response, _barrier) = rx.await.map_err(|_| anyhow!("connection was closed"))?; if let Some(proto::envelope::Payload::Error(error)) = &response.payload { Err(anyhow!("RPC request failed - {}", error.message)) } else {