From 7a629769b79eab6ddbc5236ee554f8d81d5d372f Mon Sep 17 00:00:00 2001 From: Nathan Sobo Date: Wed, 4 Jan 2023 16:00:43 -0700 Subject: [PATCH] Re-request incomplete remote buffers when syncing buffers Any buffers we requested but that haven't been fully sent will cause outstainding open requests to hang. If we re-request them, any waiting open requests will resume when the requested buffers finish being created. Co-authored-by: Max Brunsfeld Co-authored-by: Mikayla Maki --- crates/project/src/lsp_command.rs | 6 +- crates/project/src/project.rs | 91 ++++++++++++++++++++++--------- 2 files changed, 68 insertions(+), 29 deletions(-) diff --git a/crates/project/src/lsp_command.rs b/crates/project/src/lsp_command.rs index a0eb845581..feec1ee0e4 100644 --- a/crates/project/src/lsp_command.rs +++ b/crates/project/src/lsp_command.rs @@ -524,7 +524,7 @@ async fn location_links_from_proto( Some(origin) => { let buffer = project .update(&mut cx, |this, cx| { - this.wait_for_buffer(origin.buffer_id, cx) + this.wait_for_remote_buffer(origin.buffer_id, cx) }) .await?; let start = origin @@ -549,7 +549,7 @@ async fn location_links_from_proto( let target = link.target.ok_or_else(|| anyhow!("missing target"))?; let buffer = project .update(&mut cx, |this, cx| { - this.wait_for_buffer(target.buffer_id, cx) + this.wait_for_remote_buffer(target.buffer_id, cx) }) .await?; let start = target @@ -814,7 +814,7 @@ impl LspCommand for GetReferences { for location in message.locations { let target_buffer = project .update(&mut cx, |this, cx| { - this.wait_for_buffer(location.buffer_id, cx) + this.wait_for_remote_buffer(location.buffer_id, cx) }) .await?; let start = location diff --git a/crates/project/src/project.rs b/crates/project/src/project.rs index b1a8c81b9a..dc41e68b4f 100644 --- a/crates/project/src/project.rs +++ b/crates/project/src/project.rs @@ -107,7 +107,7 @@ pub struct Project { opened_buffer: (watch::Sender<()>, watch::Receiver<()>), shared_buffers: HashMap>, #[allow(clippy::type_complexity)] - loading_buffers: HashMap< + loading_buffers_by_path: HashMap< ProjectPath, postage::watch::Receiver, Arc>>>, >, @@ -115,7 +115,9 @@ pub struct Project { loading_local_worktrees: HashMap, Shared, Arc>>>>, opened_buffers: HashMap, - incomplete_buffers: HashMap>, + /// A mapping from a buffer ID to None means that we've started waiting for an ID but haven't finished loading it. + /// Used for re-issuing buffer requests when peers temporarily disconnect + incomplete_remote_buffers: HashMap>>, buffer_snapshots: HashMap>, buffers_being_formatted: HashSet, nonce: u128, @@ -411,8 +413,8 @@ impl Project { collaborators: Default::default(), opened_buffers: Default::default(), shared_buffers: Default::default(), - incomplete_buffers: Default::default(), - loading_buffers: Default::default(), + incomplete_remote_buffers: Default::default(), + loading_buffers_by_path: Default::default(), loading_local_worktrees: Default::default(), buffer_snapshots: Default::default(), client_state: None, @@ -467,10 +469,10 @@ impl Project { let mut this = Self { worktrees: Vec::new(), - loading_buffers: Default::default(), + loading_buffers_by_path: Default::default(), opened_buffer: watch::channel(), shared_buffers: Default::default(), - incomplete_buffers: Default::default(), + incomplete_remote_buffers: Default::default(), loading_local_worktrees: Default::default(), active_entry: None, collaborators: Default::default(), @@ -1284,7 +1286,7 @@ impl Project { return Task::ready(Ok(existing_buffer)); } - let mut loading_watch = match self.loading_buffers.entry(project_path.clone()) { + let mut loading_watch = match self.loading_buffers_by_path.entry(project_path.clone()) { // If the given path is already being loaded, then wait for that existing // task to complete and return the same buffer. hash_map::Entry::Occupied(e) => e.get().clone(), @@ -1304,7 +1306,7 @@ impl Project { let load_result = load_buffer.await; *tx.borrow_mut() = Some(this.update(&mut cx, |this, _| { // Record the fact that the buffer is no longer loading. - this.loading_buffers.remove(&project_path); + this.loading_buffers_by_path.remove(&project_path); let buffer = load_result.map_err(Arc::new)?; Ok(buffer) })); @@ -1364,7 +1366,7 @@ impl Project { }) .await?; this.update(&mut cx, |this, cx| { - this.wait_for_buffer(response.buffer_id, cx) + this.wait_for_remote_buffer(response.buffer_id, cx) }) .await }) @@ -1425,8 +1427,10 @@ impl Project { .request(proto::OpenBufferById { project_id, id }); cx.spawn(|this, mut cx| async move { let buffer_id = request.await?.buffer_id; - this.update(&mut cx, |this, cx| this.wait_for_buffer(buffer_id, cx)) - .await + this.update(&mut cx, |this, cx| { + this.wait_for_remote_buffer(buffer_id, cx) + }) + .await }) } else { Task::ready(Err(anyhow!("cannot open buffer while disconnected"))) @@ -3268,7 +3272,7 @@ impl Project { cx.spawn(|this, mut cx| async move { let response = request.await?; this.update(&mut cx, |this, cx| { - this.wait_for_buffer(response.buffer_id, cx) + this.wait_for_remote_buffer(response.buffer_id, cx) }) .await }) @@ -4124,7 +4128,7 @@ impl Project { for location in response.locations { let target_buffer = this .update(&mut cx, |this, cx| { - this.wait_for_buffer(location.buffer_id, cx) + this.wait_for_remote_buffer(location.buffer_id, cx) }) .await?; let start = location @@ -5005,19 +5009,21 @@ impl Project { let buffer = cx.add_model(|_| { Buffer::from_proto(this.replica_id(), state, buffer_file).unwrap() }); - this.incomplete_buffers.insert(buffer_id, buffer); + this.incomplete_remote_buffers + .insert(buffer_id, Some(buffer)); } proto::create_buffer_for_peer::Variant::Chunk(chunk) => { let buffer = this - .incomplete_buffers + .incomplete_remote_buffers .get(&chunk.buffer_id) + .cloned() + .flatten() .ok_or_else(|| { anyhow!( "received chunk for buffer {} without initial state", chunk.buffer_id ) - })? - .clone(); + })?; let operations = chunk .operations .into_iter() @@ -5026,7 +5032,7 @@ impl Project { buffer.update(cx, |buffer, cx| buffer.apply_ops(operations, cx))?; if chunk.is_last { - this.incomplete_buffers.remove(&chunk.buffer_id); + this.incomplete_remote_buffers.remove(&chunk.buffer_id); this.register_buffer(&buffer, cx)?; } } @@ -5049,7 +5055,12 @@ impl Project { .opened_buffers .get_mut(&buffer_id) .and_then(|b| b.upgrade(cx)) - .or_else(|| this.incomplete_buffers.get(&buffer_id).cloned()) + .or_else(|| { + this.incomplete_remote_buffers + .get(&buffer_id) + .cloned() + .flatten() + }) { buffer.update(cx, |buffer, cx| buffer.set_diff_base(diff_base, cx)); } @@ -5070,7 +5081,12 @@ impl Project { .opened_buffers .get_mut(&buffer_id) .and_then(|b| b.upgrade(cx)) - .or_else(|| this.incomplete_buffers.get(&buffer_id).cloned()) + .or_else(|| { + this.incomplete_remote_buffers + .get(&buffer_id) + .cloned() + .flatten() + }) { let file = payload.file.ok_or_else(|| anyhow!("invalid file"))?; let worktree = this @@ -5610,7 +5626,9 @@ impl Project { for (buffer_id, transaction) in message.buffer_ids.into_iter().zip(message.transactions) { let buffer = this - .update(&mut cx, |this, cx| this.wait_for_buffer(buffer_id, cx)) + .update(&mut cx, |this, cx| { + this.wait_for_remote_buffer(buffer_id, cx) + }) .await?; let transaction = language::proto::deserialize_transaction(transaction)?; project_transaction.0.insert(buffer, transaction); @@ -5686,12 +5704,13 @@ impl Project { buffer_id } - fn wait_for_buffer( - &self, + fn wait_for_remote_buffer( + &mut self, id: u64, cx: &mut ModelContext, ) -> Task>> { let mut opened_buffer_rx = self.opened_buffer.1.clone(); + cx.spawn(|this, mut cx| async move { let buffer = loop { let buffer = this.read_with(&cx, |this, cx| { @@ -5705,6 +5724,9 @@ impl Project { return Err(anyhow!("disconnected before buffer {} could be opened", id)); } + this.update(&mut cx, |this, _| { + this.incomplete_remote_buffers.entry(id).or_default(); + }); opened_buffer_rx .next() .await @@ -5739,8 +5761,9 @@ impl Project { let client = self.client.clone(); cx.spawn(|this, cx| async move { - let buffers = this.read_with(&cx, |this, cx| { - this.opened_buffers + let (buffers, incomplete_buffer_ids) = this.read_with(&cx, |this, cx| { + let buffers = this + .opened_buffers .iter() .filter_map(|(id, buffer)| { let buffer = buffer.upgrade(cx)?; @@ -5749,7 +5772,14 @@ impl Project { version: language::proto::serialize_version(&buffer.read(cx).version), }) }) - .collect() + .collect(); + let incomplete_buffer_ids = this + .incomplete_remote_buffers + .keys() + .copied() + .collect::>(); + + (buffers, incomplete_buffer_ids) }); let response = client .request(proto::SynchronizeBuffers { @@ -5783,6 +5813,15 @@ impl Project { } }) }); + + // Any incomplete buffers have open requests waiting. Request that the host sends + // creates these buffers for us again to unblock any waiting futures. + for id in incomplete_buffer_ids { + cx.background() + .spawn(client.request(proto::OpenBufferById { project_id, id })) + .detach(); + } + futures::future::join_all(send_updates_for_buffers) .await .into_iter()