From 170487a5283b3d7783aa24e5485c0974600165e3 Mon Sep 17 00:00:00 2001 From: Max Brunsfeld Date: Wed, 23 Feb 2022 15:25:58 -0800 Subject: [PATCH] Fix race conditions with LSP requests that return buffers * Avoid panic when registering a buffer that was previously open, and whose weak handle was still present in the open_buffers map. * Avoid releasing any buffers while a request is outstanding which could return a reference to a buffer. Co-Authored-By: Nathan Sobo --- crates/project/src/lsp_command.rs | 23 +++++- crates/project/src/project.rs | 117 ++++++++++++++++++++++++++---- crates/server/src/rpc.rs | 32 +++++--- 3 files changed, 145 insertions(+), 27 deletions(-) diff --git a/crates/project/src/lsp_command.rs b/crates/project/src/lsp_command.rs index 3b502fc8fa..b091fe0bc3 100644 --- a/crates/project/src/lsp_command.rs +++ b/crates/project/src/lsp_command.rs @@ -1,4 +1,4 @@ -use crate::{DocumentHighlight, Location, Project, ProjectTransaction}; +use crate::{BufferRequestHandle, DocumentHighlight, Location, Project, ProjectTransaction}; use anyhow::{anyhow, Result}; use async_trait::async_trait; use client::{proto, PeerId}; @@ -48,6 +48,7 @@ pub(crate) trait LspCommand: 'static + Sized { message: ::Response, project: ModelHandle, buffer: ModelHandle, + request_handle: BufferRequestHandle, cx: AsyncAppContext, ) -> Result; fn buffer_id_from_proto(message: &Self::ProtoRequest) -> u64; @@ -161,6 +162,7 @@ impl LspCommand for PrepareRename { message: proto::PrepareRenameResponse, _: ModelHandle, buffer: ModelHandle, + _: BufferRequestHandle, mut cx: AsyncAppContext, ) -> Result>> { if message.can_rename { @@ -277,6 +279,7 @@ impl LspCommand for PerformRename { message: proto::PerformRenameResponse, project: ModelHandle, _: ModelHandle, + request_handle: BufferRequestHandle, mut cx: AsyncAppContext, ) -> Result { let message = message @@ -284,7 +287,12 @@ impl LspCommand for PerformRename { .ok_or_else(|| anyhow!("missing transaction"))?; project .update(&mut cx, |project, cx| { - project.deserialize_project_transaction(message, self.push_to_history, cx) + project.deserialize_project_transaction( + message, + self.push_to_history, + request_handle, + cx, + ) }) .await } @@ -427,13 +435,16 @@ impl LspCommand for GetDefinition { message: proto::GetDefinitionResponse, project: ModelHandle, _: ModelHandle, + request_handle: BufferRequestHandle, mut cx: AsyncAppContext, ) -> Result> { let mut locations = Vec::new(); for location in message.locations { let buffer = location.buffer.ok_or_else(|| anyhow!("missing buffer"))?; let buffer = project - .update(&mut cx, |this, cx| this.deserialize_buffer(buffer, cx)) + .update(&mut cx, |this, cx| { + this.deserialize_buffer(buffer, request_handle.clone(), cx) + }) .await?; let start = location .start @@ -575,13 +586,16 @@ impl LspCommand for GetReferences { message: proto::GetReferencesResponse, project: ModelHandle, _: ModelHandle, + request_handle: BufferRequestHandle, mut cx: AsyncAppContext, ) -> Result> { let mut locations = Vec::new(); for location in message.locations { let buffer = location.buffer.ok_or_else(|| anyhow!("missing buffer"))?; let target_buffer = project - .update(&mut cx, |this, cx| this.deserialize_buffer(buffer, cx)) + .update(&mut cx, |this, cx| { + this.deserialize_buffer(buffer, request_handle.clone(), cx) + }) .await?; let start = location .start @@ -706,6 +720,7 @@ impl LspCommand for GetDocumentHighlights { message: proto::GetDocumentHighlightsResponse, _: ModelHandle, _: ModelHandle, + _: BufferRequestHandle, _: AsyncAppContext, ) -> Result> { Ok(message diff --git a/crates/project/src/project.rs b/crates/project/src/project.rs index 1de6b7b131..7b8d6d17dd 100644 --- a/crates/project/src/project.rs +++ b/crates/project/src/project.rs @@ -25,12 +25,17 @@ use rand::prelude::*; use sha2::{Digest, Sha256}; use smol::block_on; use std::{ + cell::RefCell, convert::TryInto, hash::Hash, mem, ops::Range, path::{Component, Path, PathBuf}, - sync::{atomic::AtomicBool, Arc}, + rc::Rc, + sync::{ + atomic::{AtomicBool, AtomicUsize, Ordering}, + Arc, + }, time::Instant, }; use util::{post_inc, ResultExt, TryFutureExt as _}; @@ -58,6 +63,8 @@ pub struct Project { ProjectPath, postage::watch::Receiver, Arc>>>, >, + buffer_request_count: Rc, + preserved_buffers: Rc>>>, shared_buffers: HashMap>>, nonce: u128, } @@ -142,6 +149,11 @@ pub struct Symbol { pub signature: [u8; 32], } +pub struct BufferRequestHandle { + buffer_request_count: Rc, + preserved_buffers: Rc>>>, +} + #[derive(Default)] pub struct ProjectTransaction(pub HashMap, language::Transaction>); @@ -273,6 +285,7 @@ impl Project { open_buffers: Default::default(), loading_buffers: Default::default(), shared_buffers: Default::default(), + preserved_buffers: Default::default(), client_state: ProjectClientState::Local { is_shared: false, remote_id_tx, @@ -288,6 +301,7 @@ impl Project { fs, language_servers_with_diagnostics_running: 0, language_servers: Default::default(), + buffer_request_count: Default::default(), started_language_servers: Default::default(), nonce: StdRng::from_entropy().gen(), } @@ -342,6 +356,8 @@ impl Project { language_servers_with_diagnostics_running: 0, language_servers: Default::default(), started_language_servers: Default::default(), + buffer_request_count: Default::default(), + preserved_buffers: Default::default(), nonce: StdRng::from_entropy().gen(), }; for worktree in worktrees { @@ -682,6 +698,7 @@ impl Project { let remote_worktree_id = worktree.read(cx).id(); let path = path.clone(); let path_string = path.to_string_lossy().to_string(); + let request_handle = self.start_buffer_request(cx); cx.spawn(|this, mut cx| async move { let response = rpc .request(proto::OpenBuffer { @@ -691,8 +708,11 @@ impl Project { }) .await?; let buffer = response.buffer.ok_or_else(|| anyhow!("missing buffer"))?; - this.update(&mut cx, |this, cx| this.deserialize_buffer(buffer, cx)) - .await + + this.update(&mut cx, |this, cx| { + this.deserialize_buffer(buffer, request_handle, cx) + }) + .await }) } @@ -733,6 +753,21 @@ impl Project { }) } + fn start_buffer_request(&self, cx: &AppContext) -> BufferRequestHandle { + if self.buffer_request_count.fetch_add(1, Ordering::SeqCst) == 0 { + self.preserved_buffers.borrow_mut().extend( + self.open_buffers + .values() + .filter_map(|buffer| buffer.upgrade(cx)), + ) + } + + BufferRequestHandle { + buffer_request_count: self.buffer_request_count.clone(), + preserved_buffers: self.preserved_buffers.clone(), + } + } + pub fn save_buffer_as( &self, buffer: ModelHandle, @@ -804,15 +839,23 @@ impl Project { worktree: Option<&ModelHandle>, cx: &mut ModelContext, ) -> Result<()> { - match self.open_buffers.insert( - buffer.read(cx).remote_id(), - OpenBuffer::Loaded(buffer.downgrade()), - ) { + let remote_id = buffer.read(cx).remote_id(); + match self + .open_buffers + .insert(remote_id, OpenBuffer::Loaded(buffer.downgrade())) + { None => {} Some(OpenBuffer::Loading(operations)) => { buffer.update(cx, |buffer, cx| buffer.apply_ops(operations, cx))? } - Some(OpenBuffer::Loaded(_)) => Err(anyhow!("registered the same buffer twice"))?, + Some(OpenBuffer::Loaded(existing_handle)) => { + if existing_handle.upgrade(cx).is_some() { + Err(anyhow!( + "already registered buffer with remote id {}", + remote_id + ))? + } + } } self.assign_language_to_buffer(&buffer, worktree, cx); Ok(()) @@ -1195,6 +1238,7 @@ impl Project { let remote_buffers = self.remote_id().zip(remote_buffers); let client = self.client.clone(); + let request_handle = self.start_buffer_request(cx); cx.spawn(|this, mut cx| async move { let mut project_transaction = ProjectTransaction::default(); @@ -1213,7 +1257,12 @@ impl Project { .ok_or_else(|| anyhow!("missing transaction"))?; project_transaction = this .update(&mut cx, |this, cx| { - this.deserialize_project_transaction(response, push_to_history, cx) + this.deserialize_project_transaction( + response, + push_to_history, + request_handle, + cx, + ) }) .await?; } @@ -1430,6 +1479,7 @@ impl Project { cx, ) } else if let Some(project_id) = self.remote_id() { + let request_handle = self.start_buffer_request(cx); let request = self.client.request(proto::OpenBufferForSymbol { project_id, symbol: Some(serialize_symbol(symbol)), @@ -1437,8 +1487,10 @@ impl Project { cx.spawn(|this, mut cx| async move { let response = request.await?; let buffer = response.buffer.ok_or_else(|| anyhow!("invalid buffer"))?; - this.update(&mut cx, |this, cx| this.deserialize_buffer(buffer, cx)) - .await + this.update(&mut cx, |this, cx| { + this.deserialize_buffer(buffer, request_handle, cx) + }) + .await }) } else { Task::ready(Err(anyhow!("project does not have a remote id"))) @@ -1817,6 +1869,7 @@ impl Project { }) } else if let Some(project_id) = self.remote_id() { let client = self.client.clone(); + let request_handle = self.start_buffer_request(cx); let request = proto::ApplyCodeAction { project_id, buffer_id: buffer_handle.read(cx).remote_id(), @@ -1829,7 +1882,12 @@ impl Project { .transaction .ok_or_else(|| anyhow!("missing transaction"))?; this.update(&mut cx, |this, cx| { - this.deserialize_project_transaction(response, push_to_history, cx) + this.deserialize_project_transaction( + response, + push_to_history, + request_handle, + cx, + ) }) .await }) @@ -2020,11 +2078,12 @@ impl Project { } } else if let Some(project_id) = self.remote_id() { let rpc = self.client.clone(); + let request_handle = self.start_buffer_request(cx); let message = request.to_proto(project_id, buffer); return cx.spawn(|this, cx| async move { let response = rpc.request(message).await?; request - .response_from_proto(response, this, buffer_handle, cx) + .response_from_proto(response, this, buffer_handle, request_handle, cx) .await }); } @@ -2864,13 +2923,16 @@ impl Project { &mut self, message: proto::ProjectTransaction, push_to_history: bool, + request_handle: BufferRequestHandle, cx: &mut ModelContext, ) -> Task> { cx.spawn(|this, mut cx| async move { let mut project_transaction = ProjectTransaction::default(); for (buffer, transaction) in message.buffers.into_iter().zip(message.transactions) { let buffer = this - .update(&mut cx, |this, cx| this.deserialize_buffer(buffer, cx)) + .update(&mut cx, |this, cx| { + this.deserialize_buffer(buffer, request_handle.clone(), cx) + }) .await?; let transaction = language::proto::deserialize_transaction(transaction)?; project_transaction.0.insert(buffer, transaction); @@ -2917,6 +2979,7 @@ impl Project { fn deserialize_buffer( &mut self, buffer: proto::Buffer, + request_handle: BufferRequestHandle, cx: &mut ModelContext, ) -> Task>> { let replica_id = self.replica_id(); @@ -2963,6 +3026,8 @@ impl Project { let buffer = cx.add_model(|cx| { Buffer::from_proto(replica_id, buffer, buffer_file, cx).unwrap() }); + + request_handle.preserve_buffer(buffer.clone()); this.update(&mut cx, |this, cx| { this.register_buffer(&buffer, buffer_worktree.as_ref(), cx) })?; @@ -3111,6 +3176,30 @@ impl Project { } } +impl BufferRequestHandle { + fn preserve_buffer(&self, buffer: ModelHandle) { + self.preserved_buffers.borrow_mut().push(buffer); + } +} + +impl Clone for BufferRequestHandle { + fn clone(&self) -> Self { + self.buffer_request_count.fetch_add(1, Ordering::SeqCst); + Self { + buffer_request_count: self.buffer_request_count.clone(), + preserved_buffers: self.preserved_buffers.clone(), + } + } +} + +impl Drop for BufferRequestHandle { + fn drop(&mut self) { + if self.buffer_request_count.fetch_sub(1, Ordering::SeqCst) == 1 { + self.preserved_buffers.borrow_mut().clear(); + } + } +} + impl WorktreeHandle { pub fn upgrade(&self, cx: &AppContext) -> Option> { match self { diff --git a/crates/server/src/rpc.rs b/crates/server/src/rpc.rs index 791796d494..919fc9636e 100644 --- a/crates/server/src/rpc.rs +++ b/crates/server/src/rpc.rs @@ -4856,6 +4856,8 @@ mod tests { cx.background().simulate_random_delay().await; } + log::info!("Host done"); + self.project = Some(project); (self, cx) } @@ -4887,15 +4889,25 @@ mod tests { }; operations.set(operations.get() + 1); - let project_path = worktree.read_with(&cx, |worktree, _| { - let entry = worktree - .entries(false) - .filter(|e| e.is_file()) - .choose(&mut *rng.lock()) - .unwrap(); - (worktree.id(), entry.path.clone()) - }); - log::info!("Guest {}: opening path {:?}", guest_id, project_path); + let (worktree_root_name, project_path) = + worktree.read_with(&cx, |worktree, _| { + let entry = worktree + .entries(false) + .filter(|e| e.is_file()) + .choose(&mut *rng.lock()) + .unwrap(); + ( + worktree.root_name().to_string(), + (worktree.id(), entry.path.clone()), + ) + }); + log::info!( + "Guest {}: opening path in worktree {:?} {:?} {:?}", + guest_id, + project_path.0, + worktree_root_name, + project_path.1 + ); let buffer = project .update(&mut cx, |project, cx| project.open_buffer(project_path, cx)) .await @@ -5062,6 +5074,8 @@ mod tests { cx.background().simulate_random_delay().await; } + log::info!("Guest {} done", guest_id); + self.project = Some(project); (self, cx) }