From fb736d5e78fed5594595267594fdc332093a111e Mon Sep 17 00:00:00 2001 From: Max Brunsfeld Date: Wed, 16 Jun 2021 17:02:40 -0700 Subject: [PATCH] Remove peer message listeners when their receiver is dropped --- zed-rpc/src/peer.rs | 25 +++++++++++++++++-------- 1 file changed, 17 insertions(+), 8 deletions(-) diff --git a/zed-rpc/src/peer.rs b/zed-rpc/src/peer.rs index f41d067a2d..2638a2827f 100644 --- a/zed-rpc/src/peer.rs +++ b/zed-rpc/src/peer.rs @@ -32,8 +32,9 @@ struct Connection { _close_barrier: barrier::Sender, } -type MessageHandler = - Box, ConnectionId) -> Option>>; +type MessageHandler = Box< + dyn Send + Sync + Fn(&mut Option, ConnectionId) -> Option>, +>; pub struct TypedEnvelope { id: u32, @@ -90,7 +91,8 @@ impl Peer { connection_id, payload: T::from_envelope(envelope).unwrap(), }) - .await; + .await + .is_err() } .boxed(), ) @@ -152,17 +154,24 @@ impl Peer { ); } } else { - let mut handled = false; let mut envelope = Some(incoming); - for handler in this.message_handlers.read().await.iter() { + let mut handler_index = None; + let mut handler_was_dropped = false; + for (i, handler) in + this.message_handlers.read().await.iter().enumerate() + { if let Some(future) = handler(&mut envelope, connection_id) { - future.await; - handled = true; + handler_was_dropped = future.await; + handler_index = Some(i); break; } } - if !handled { + if let Some(handler_index) = handler_index { + if handler_was_dropped { + drop(this.message_handlers.write().await.remove(handler_index)); + } + } else { log::warn!("unhandled message: {:?}", envelope.unwrap().payload); } }