From b4561b848d05af4908b5f493767f7cad3005610d Mon Sep 17 00:00:00 2001 From: Antonio Scandurra Date: Wed, 8 Mar 2023 10:56:40 +0100 Subject: [PATCH] Limit the number of parallel messages handled for any given connection --- crates/collab/src/rpc.rs | 17 +++++++++++++---- 1 file changed, 13 insertions(+), 4 deletions(-) diff --git a/crates/collab/src/rpc.rs b/crates/collab/src/rpc.rs index d13487440f..1deaefec1a 100644 --- a/crates/collab/src/rpc.rs +++ b/crates/collab/src/rpc.rs @@ -53,7 +53,7 @@ use std::{ }, time::Duration, }; -use tokio::sync::watch; +use tokio::sync::{watch, Semaphore}; use tower::ServiceBuilder; use tracing::{info_span, instrument, Instrument}; @@ -542,8 +542,13 @@ impl Server { // This arrangement ensures we will attempt to process earlier messages first, but fall // back to processing messages arrived later in the spirit of making progress. let mut foreground_message_handlers = FuturesUnordered::new(); + let concurrent_handlers = Arc::new(Semaphore::new(256)); loop { - let next_message = incoming_rx.next().fuse(); + let next_message = async { + let permit = concurrent_handlers.clone().acquire_owned().await.unwrap(); + let message = incoming_rx.next().await; + (permit, message) + }.fuse(); futures::pin_mut!(next_message); futures::select_biased! { _ = teardown.changed().fuse() => return Ok(()), @@ -554,7 +559,8 @@ impl Server { break; } _ = foreground_message_handlers.next() => {} - message = next_message => { + next_message = next_message => { + let (permit, message) = next_message; if let Some(message) = message { let type_name = message.payload_type_name(); let span = tracing::info_span!("receive message", %user_id, %login, %connection_id, %address, type_name); @@ -564,7 +570,10 @@ impl Server { let handle_message = (handler)(message, session.clone()); drop(span_enter); - let handle_message = handle_message.instrument(span); + let handle_message = async move { + handle_message.await; + drop(permit); + }.instrument(span); if is_background { executor.spawn_detached(handle_message); } else {