From 1f284408a97496b4dd4f87cc6df4e26ae237ffba Mon Sep 17 00:00:00 2001 From: Antonio Scandurra Date: Mon, 17 Apr 2023 19:15:07 +0200 Subject: [PATCH] Send buffer operations in batches to reduce latency Co-Authored-By: Max Brunsfeld --- crates/project/src/project.rs | 56 ++++++++++++++++++++++------------- 1 file changed, 36 insertions(+), 20 deletions(-) diff --git a/crates/project/src/project.rs b/crates/project/src/project.rs index 9192c7a411..2deada6a5c 100644 --- a/crates/project/src/project.rs +++ b/crates/project/src/project.rs @@ -1733,13 +1733,19 @@ impl Project { async fn send_buffer_messages( this: WeakModelHandle, - mut rx: UnboundedReceiver, + rx: UnboundedReceiver, mut cx: AsyncAppContext, - ) { + ) -> Option<()> { + const MAX_BATCH_SIZE: usize = 128; + let mut needs_resync_with_host = false; - while let Some(change) = rx.next().await { - if let Some(this) = this.upgrade(&mut cx) { - let is_local = this.read_with(&cx, |this, _| this.is_local()); + let mut operations_by_buffer_id = HashMap::default(); + let mut changes = rx.ready_chunks(MAX_BATCH_SIZE); + while let Some(changes) = changes.next().await { + let this = this.upgrade(&mut cx)?; + let is_local = this.read_with(&cx, |this, _| this.is_local()); + + for change in changes { match change { BufferMessage::Operation { buffer_id, @@ -1748,21 +1754,14 @@ impl Project { if needs_resync_with_host { continue; } - let request = this.read_with(&cx, |this, _| { - let project_id = this.remote_id()?; - Some(this.client.request(proto::UpdateBuffer { - buffer_id, - project_id, - operations: vec![operation], - })) - }); - if let Some(request) = request { - if request.await.is_err() && !is_local { - needs_resync_with_host = true; - } - } + + operations_by_buffer_id + .entry(buffer_id) + .or_insert(Vec::new()) + .push(operation); } BufferMessage::Resync => { + operations_by_buffer_id.clear(); if this .update(&mut cx, |this, cx| this.synchronize_remote_buffers(cx)) .await @@ -1772,10 +1771,27 @@ impl Project { } } } - } else { - break; + } + + for (buffer_id, operations) in operations_by_buffer_id.drain() { + let request = this.read_with(&cx, |this, _| { + let project_id = this.remote_id()?; + Some(this.client.request(proto::UpdateBuffer { + buffer_id, + project_id, + operations, + })) + }); + if let Some(request) = request { + if request.await.is_err() && !is_local { + needs_resync_with_host = true; + break; + } + } } } + + None } fn on_buffer_event(