Remove peer message listeners when their receiver is dropped

This commit is contained in:
Max Brunsfeld 2021-06-16 17:02:40 -07:00
parent 9de4d73ffb
commit fb736d5e78

View file

@ -32,8 +32,9 @@ struct Connection {
_close_barrier: barrier::Sender, _close_barrier: barrier::Sender,
} }
type MessageHandler = type MessageHandler = Box<
Box<dyn Send + Sync + Fn(&mut Option<proto::Envelope>, ConnectionId) -> Option<BoxFuture<()>>>; dyn Send + Sync + Fn(&mut Option<proto::Envelope>, ConnectionId) -> Option<BoxFuture<bool>>,
>;
pub struct TypedEnvelope<T> { pub struct TypedEnvelope<T> {
id: u32, id: u32,
@ -90,7 +91,8 @@ impl Peer {
connection_id, connection_id,
payload: T::from_envelope(envelope).unwrap(), payload: T::from_envelope(envelope).unwrap(),
}) })
.await; .await
.is_err()
} }
.boxed(), .boxed(),
) )
@ -152,17 +154,24 @@ impl Peer {
); );
} }
} else { } else {
let mut handled = false;
let mut envelope = Some(incoming); let mut envelope = Some(incoming);
for handler in this.message_handlers.read().await.iter() { let mut handler_index = None;
let mut handler_was_dropped = false;
for (i, handler) in
this.message_handlers.read().await.iter().enumerate()
{
if let Some(future) = handler(&mut envelope, connection_id) { if let Some(future) = handler(&mut envelope, connection_id) {
future.await; handler_was_dropped = future.await;
handled = true; handler_index = Some(i);
break; break;
} }
} }
if !handled { if let Some(handler_index) = handler_index {
if handler_was_dropped {
drop(this.message_handlers.write().await.remove(handler_index));
}
} else {
log::warn!("unhandled message: {:?}", envelope.unwrap().payload); log::warn!("unhandled message: {:?}", envelope.unwrap().payload);
} }
} }