diff --git a/crates/project/src/buffer_store.rs b/crates/project/src/buffer_store.rs index 0045aba2e8..b69679d6ac 100644 --- a/crates/project/src/buffer_store.rs +++ b/crates/project/src/buffer_store.rs @@ -14,7 +14,10 @@ use gpui::{ }; use http_client::Url; use language::{ - proto::{deserialize_line_ending, deserialize_version, serialize_version, split_operations}, + proto::{ + deserialize_line_ending, deserialize_version, serialize_line_ending, serialize_version, + split_operations, + }, Buffer, BufferEvent, Capability, File as _, Language, Operation, }; use rpc::{proto, AnyProtoClient, ErrorExt as _, TypedEnvelope}; @@ -29,9 +32,8 @@ use worktree::{ /// A set of open buffers. pub struct BufferStore { - downstream_client: Option, - remote_id: Option, - #[allow(unused)] + state: BufferStoreState, + downstream_client: Option<(AnyProtoClient, u64)>, worktree_store: Model, opened_buffers: HashMap, local_buffer_ids_by_path: HashMap, @@ -44,12 +46,11 @@ pub struct BufferStore { loading_remote_buffers_by_id: HashMap>, remote_buffer_listeners: HashMap, anyhow::Error>>>>, - shared_buffers: HashMap>, + shared_buffers: HashMap>>, } enum OpenBuffer { - Strong(Model), - Weak(WeakModel), + Buffer(WeakModel), Operations(Vec), } @@ -62,6 +63,15 @@ pub enum BufferStoreEvent { }, } +enum BufferStoreState { + Remote { + shared_with_me: HashSet>, + upstream_client: AnyProtoClient, + project_id: u64, + }, + Local {}, +} + #[derive(Default, Debug)] pub struct ProjectTransaction(pub HashMap, language::Transaction>); @@ -75,17 +85,36 @@ impl BufferStore { client.add_model_message_handler(Self::handle_update_diff_base); client.add_model_request_handler(Self::handle_save_buffer); client.add_model_request_handler(Self::handle_blame_buffer); + client.add_model_request_handler(Self::handle_reload_buffers); } /// Creates a buffer store, optionally retaining its buffers. - /// - /// If `retain_buffers` is `true`, then buffers are owned by the buffer store - /// and won't be released unless they are explicitly removed, or `retain_buffers` - /// is set to `false` via `set_retain_buffers`. Otherwise, buffers are stored as - /// weak handles. - pub fn new( + pub fn local(worktree_store: Model, cx: &mut ModelContext) -> Self { + cx.subscribe(&worktree_store, |this, _, event, cx| { + if let WorktreeStoreEvent::WorktreeAdded(worktree) = event { + this.subscribe_to_worktree(worktree, cx); + } + }) + .detach(); + + Self { + state: BufferStoreState::Local {}, + downstream_client: None, + worktree_store, + opened_buffers: Default::default(), + remote_buffer_listeners: Default::default(), + loading_remote_buffers_by_id: Default::default(), + local_buffer_ids_by_path: Default::default(), + local_buffer_ids_by_entry_id: Default::default(), + loading_buffers_by_path: Default::default(), + shared_buffers: Default::default(), + } + } + + pub fn remote( worktree_store: Model, - remote_id: Option, + upstream_client: AnyProtoClient, + remote_id: u64, cx: &mut ModelContext, ) -> Self { cx.subscribe(&worktree_store, |this, _, event, cx| { @@ -96,7 +125,11 @@ impl BufferStore { .detach(); Self { - remote_id, + state: BufferStoreState::Remote { + shared_with_me: Default::default(), + upstream_client, + project_id: remote_id, + }, downstream_client: None, worktree_store, opened_buffers: Default::default(), @@ -288,16 +321,14 @@ impl BufferStore { buffer.set_diff_base(diff_base.clone(), cx); buffer.remote_id().to_proto() }); - if let Some(project_id) = this.remote_id { - if let Some(client) = &this.downstream_client { - client - .send(proto::UpdateDiffBase { - project_id, - buffer_id, - diff_base, - }) - .log_err(); - } + if let Some((client, project_id)) = &this.downstream_client { + client + .send(proto::UpdateDiffBase { + project_id: *project_id, + buffer_id, + diff_base, + }) + .log_err(); } } }) @@ -496,8 +527,8 @@ impl BufferStore { let new_file = save.await?; let mtime = new_file.mtime; this.update(&mut cx, |this, cx| { - if let Some(downstream_client) = this.downstream_client.as_ref() { - let project_id = this.remote_id.unwrap_or(0); + if let Some((downstream_client, project_id)) = this.downstream_client.as_ref() { + let project_id = *project_id; if has_changed_file { downstream_client .send(proto::UpdateBufferFile { @@ -620,11 +651,7 @@ impl BufferStore { fn add_buffer(&mut self, buffer: Model, cx: &mut ModelContext) -> Result<()> { let remote_id = buffer.read(cx).remote_id(); let is_remote = buffer.read(cx).replica_id() != 0; - let open_buffer = if self.remote_id.is_some() { - OpenBuffer::Strong(buffer.clone()) - } else { - OpenBuffer::Weak(buffer.downgrade()) - }; + let open_buffer = OpenBuffer::Buffer(buffer.downgrade()); let handle = cx.handle().downgrade(); buffer.update(cx, move |_, cx| { @@ -768,8 +795,7 @@ impl BufferStore { } pub fn disconnected_from_host(&mut self, cx: &mut AppContext) { - self.downstream_client.take(); - self.set_remote_id(None, cx); + self.drop_unnecessary_buffers(cx); for buffer in self.buffers() { buffer.update(cx, |buffer, cx| { @@ -786,32 +812,20 @@ impl BufferStore { &mut self, remote_id: u64, downstream_client: AnyProtoClient, - cx: &mut AppContext, + _cx: &mut AppContext, ) { - self.downstream_client = Some(downstream_client); - self.set_remote_id(Some(remote_id), cx); + self.downstream_client = Some((downstream_client, remote_id)); } pub fn unshared(&mut self, _cx: &mut ModelContext) { - self.remote_id.take(); + self.downstream_client.take(); + self.forget_shared_buffers(); } - fn set_remote_id(&mut self, remote_id: Option, cx: &mut AppContext) { - self.remote_id = remote_id; + fn drop_unnecessary_buffers(&mut self, cx: &mut AppContext) { for open_buffer in self.opened_buffers.values_mut() { - if remote_id.is_some() { - if let OpenBuffer::Weak(buffer) = open_buffer { - if let Some(buffer) = buffer.upgrade() { - *open_buffer = OpenBuffer::Strong(buffer); - } - } - } else { - if let Some(buffer) = open_buffer.upgrade() { - buffer.update(cx, |buffer, _| buffer.give_up_waiting()); - } - if let OpenBuffer::Strong(buffer) = open_buffer { - *open_buffer = OpenBuffer::Weak(buffer.downgrade()); - } + if let Some(buffer) = open_buffer.upgrade() { + buffer.update(cx, |buffer, _| buffer.give_up_waiting()); } } } @@ -881,8 +895,26 @@ impl BufferStore { event: &BufferEvent, cx: &mut ModelContext, ) { - if event == &BufferEvent::FileHandleChanged { - self.buffer_changed_file(buffer, cx); + match event { + BufferEvent::FileHandleChanged => { + self.buffer_changed_file(buffer, cx); + } + BufferEvent::Reloaded => { + let Some((downstream_client, project_id)) = self.downstream_client.as_ref() else { + return; + }; + let buffer = buffer.read(cx); + downstream_client + .send(proto::BufferReloaded { + project_id: *project_id, + buffer_id: buffer.remote_id().to_proto(), + version: serialize_version(&buffer.version()), + mtime: buffer.saved_mtime().map(|t| t.into()), + line_ending: serialize_line_ending(buffer.line_ending()) as i32, + }) + .log_err(); + } + _ => {} } } @@ -986,16 +1018,14 @@ impl BufferStore { } } - if let Some(project_id) = self.remote_id { - if let Some(client) = &self.downstream_client { - client - .send(proto::UpdateBufferFile { - project_id, - buffer_id: buffer_id.to_proto(), - file: Some(new_file.to_proto(cx)), - }) - .ok(); - } + if let Some((client, project_id)) = &self.downstream_client { + client + .send(proto::UpdateBufferFile { + project_id: *project_id, + buffer_id: buffer_id.to_proto(), + file: Some(new_file.to_proto(cx)), + }) + .ok(); } buffer.file_updated(Arc::new(new_file), cx); @@ -1050,11 +1080,8 @@ impl BufferStore { this.update(&mut cx, |this, cx| { match this.opened_buffers.entry(buffer_id) { hash_map::Entry::Occupied(mut e) => match e.get_mut() { - OpenBuffer::Strong(buffer) => { - buffer.update(cx, |buffer, cx| buffer.apply_ops(ops, cx)); - } OpenBuffer::Operations(operations) => operations.extend_from_slice(&ops), - OpenBuffer::Weak(buffer) => { + OpenBuffer::Buffer(buffer) => { if let Some(buffer) = buffer.upgrade() { buffer.update(cx, |buffer, cx| buffer.apply_ops(ops, cx)); } @@ -1090,7 +1117,7 @@ impl BufferStore { self.shared_buffers .entry(guest_id) .or_default() - .insert(buffer_id); + .insert(buffer.clone()); let buffer = buffer.read(cx); response.buffers.push(proto::BufferVersion { @@ -1230,6 +1257,19 @@ impl BufferStore { } } else if chunk.is_last { self.loading_remote_buffers_by_id.remove(&buffer_id); + // retain buffers sent by peers to avoid races. + match &mut self.state { + BufferStoreState::Remote { + ref mut shared_with_me, + upstream_client, + .. + } => { + if upstream_client.is_via_collab() { + shared_with_me.insert(buffer.clone()); + } + } + _ => {} + } self.add_buffer(buffer, cx)?; } } @@ -1303,7 +1343,10 @@ impl BufferStore { let (buffer, project_id) = this.update(&mut cx, |this, _| { anyhow::Ok(( this.get_existing(buffer_id)?, - this.remote_id.context("project is not shared")?, + this.downstream_client + .as_ref() + .map(|(_, project_id)| *project_id) + .context("project is not shared")?, )) })??; buffer @@ -1340,12 +1383,14 @@ impl BufferStore { let peer_id = envelope.sender_id; let buffer_id = BufferId::new(envelope.payload.buffer_id)?; this.update(&mut cx, |this, _| { - if let Some(shared) = this.shared_buffers.get_mut(&peer_id) { - if shared.remove(&buffer_id) { - if shared.is_empty() { - this.shared_buffers.remove(&peer_id); + if let Some(buffer) = this.get(buffer_id) { + if let Some(shared) = this.shared_buffers.get_mut(&peer_id) { + if shared.remove(&buffer) { + if shared.is_empty() { + this.shared_buffers.remove(&peer_id); + } + return; } - return; } }; debug_panic!( @@ -1429,6 +1474,98 @@ impl BufferStore { } } + pub fn reload_buffers( + &self, + buffers: HashSet>, + push_to_history: bool, + cx: &mut ModelContext, + ) -> Task> { + let mut local_buffers = Vec::new(); + let mut remote_buffers = Vec::new(); + for buffer_handle in buffers { + let buffer = buffer_handle.read(cx); + if buffer.is_dirty() { + if let Some(file) = File::from_dyn(buffer.file()) { + if file.is_local() { + local_buffers.push(buffer_handle); + } else { + remote_buffers.push(buffer_handle); + } + } + } + } + + let client = self.upstream_client(); + + cx.spawn(move |this, mut cx| async move { + let mut project_transaction = ProjectTransaction::default(); + if let Some((client, project_id)) = client { + let response = client + .request(proto::ReloadBuffers { + project_id, + buffer_ids: remote_buffers + .iter() + .filter_map(|buffer| { + buffer + .update(&mut cx, |buffer, _| buffer.remote_id().into()) + .ok() + }) + .collect(), + }) + .await? + .transaction + .ok_or_else(|| anyhow!("missing transaction"))?; + BufferStore::deserialize_project_transaction( + this, + response, + push_to_history, + cx.clone(), + ) + .await?; + } + + for buffer in local_buffers { + let transaction = buffer + .update(&mut cx, |buffer, cx| buffer.reload(cx))? + .await?; + buffer.update(&mut cx, |buffer, cx| { + if let Some(transaction) = transaction { + if !push_to_history { + buffer.forget_transaction(transaction.id); + } + project_transaction.0.insert(cx.handle(), transaction); + } + })?; + } + + Ok(project_transaction) + }) + } + + async fn handle_reload_buffers( + this: Model, + envelope: TypedEnvelope, + mut cx: AsyncAppContext, + ) -> Result { + let sender_id = envelope.original_sender_id().unwrap_or_default(); + let reload = this.update(&mut cx, |this, cx| { + let mut buffers = HashSet::default(); + for buffer_id in &envelope.payload.buffer_ids { + let buffer_id = BufferId::new(*buffer_id)?; + buffers.insert(this.get_existing(buffer_id)?); + } + Ok::<_, anyhow::Error>(this.reload_buffers(buffers, false, cx)) + })??; + + let project_transaction = reload.await?; + let project_transaction = this.update(&mut cx, |this, cx| { + this.serialize_project_transaction_for_peer(project_transaction, sender_id, cx) + })?; + Ok(proto::ReloadBuffersResponse { + transaction: Some(project_transaction), + }) + } + pub fn create_buffer_for_peer( &mut self, buffer: &Model, @@ -1440,12 +1577,12 @@ impl BufferStore { .shared_buffers .entry(peer_id) .or_default() - .insert(buffer_id) + .insert(buffer.clone()) { return Task::ready(Ok(())); } - let Some((client, project_id)) = self.downstream_client.clone().zip(self.remote_id) else { + let Some((client, project_id)) = self.downstream_client.clone() else { return Task::ready(Ok(())); }; @@ -1492,6 +1629,17 @@ impl BufferStore { }) } + pub fn upstream_client(&self) -> Option<(AnyProtoClient, u64)> { + match &self.state { + BufferStoreState::Remote { + upstream_client, + project_id, + .. + } => Some((upstream_client.clone(), *project_id)), + BufferStoreState::Local { .. } => None, + } + } + pub fn forget_shared_buffers(&mut self) { self.shared_buffers.clear(); } @@ -1506,7 +1654,7 @@ impl BufferStore { } } - pub fn shared_buffers(&self) -> &HashMap> { + pub fn shared_buffers(&self) -> &HashMap>> { &self.shared_buffers } @@ -1572,8 +1720,7 @@ impl BufferStore { impl OpenBuffer { fn upgrade(&self) -> Option> { match self { - OpenBuffer::Strong(handle) => Some(handle.clone()), - OpenBuffer::Weak(handle) => handle.upgrade(), + OpenBuffer::Buffer(handle) => handle.upgrade(), OpenBuffer::Operations(_) => None, } } diff --git a/crates/project/src/project.rs b/crates/project/src/project.rs index 199b5a8f5c..454a7586c8 100644 --- a/crates/project/src/project.rs +++ b/crates/project/src/project.rs @@ -42,10 +42,7 @@ use gpui::{ use itertools::Itertools; use language::{ language_settings::InlayHintKind, - proto::{ - deserialize_anchor, serialize_anchor, serialize_line_ending, serialize_version, - split_operations, - }, + proto::{deserialize_anchor, serialize_anchor, split_operations}, Buffer, BufferEvent, CachedLspAdapter, Capability, CodeLabel, ContextProvider, DiagnosticEntry, Documentation, File as _, Language, LanguageRegistry, LanguageServerName, PointUtf16, ToOffset, ToPointUtf16, Transaction, Unclipped, @@ -559,7 +556,6 @@ impl Project { client.add_model_message_handler(Self::handle_unshare_project); client.add_model_request_handler(Self::handle_update_buffer); client.add_model_message_handler(Self::handle_update_worktree); - client.add_model_request_handler(Self::handle_reload_buffers); client.add_model_request_handler(Self::handle_synchronize_buffers); client.add_model_request_handler(Self::handle_search_project); @@ -599,8 +595,7 @@ impl Project { cx.subscribe(&worktree_store, Self::on_worktree_store_event) .detach(); - let buffer_store = - cx.new_model(|cx| BufferStore::new(worktree_store.clone(), None, cx)); + let buffer_store = cx.new_model(|cx| BufferStore::local(worktree_store.clone(), cx)); cx.subscribe(&buffer_store, Self::on_buffer_store_event) .detach(); @@ -695,8 +690,14 @@ impl Project { cx.subscribe(&worktree_store, Self::on_worktree_store_event) .detach(); - let buffer_store = - cx.new_model(|cx| BufferStore::new(worktree_store.clone(), None, cx)); + let buffer_store = cx.new_model(|cx| { + BufferStore::remote( + worktree_store.clone(), + ssh.clone().into(), + SSH_PROJECT_ID, + cx, + ) + }); cx.subscribe(&buffer_store, Self::on_buffer_store_event) .detach(); @@ -851,8 +852,9 @@ impl Project { .map(DevServerProjectId), ) })?; - let buffer_store = - cx.new_model(|cx| BufferStore::new(worktree_store.clone(), Some(remote_id), cx))?; + let buffer_store = cx.new_model(|cx| { + BufferStore::remote(worktree_store.clone(), client.clone().into(), remote_id, cx) + })?; let lsp_store = cx.new_model(|cx| { let mut lsp_store = LspStore::new_remote( @@ -2167,23 +2169,6 @@ impl Project { .ok(); } - BufferEvent::Reloaded => { - if self.is_local_or_ssh() { - if let Some(project_id) = self.remote_id() { - let buffer = buffer.read(cx); - self.client - .send(proto::BufferReloaded { - project_id, - buffer_id: buffer.remote_id().to_proto(), - version: serialize_version(&buffer.version()), - mtime: buffer.saved_mtime().map(|t| t.into()), - line_ending: serialize_line_ending(buffer.line_ending()) as i32, - }) - .log_err(); - } - } - } - _ => {} } @@ -2347,67 +2332,8 @@ impl Project { push_to_history: bool, cx: &mut ModelContext, ) -> Task> { - let mut local_buffers = Vec::new(); - let mut remote_buffers = None; - for buffer_handle in buffers { - let buffer = buffer_handle.read(cx); - if buffer.is_dirty() { - if let Some(file) = File::from_dyn(buffer.file()) { - if file.is_local() { - local_buffers.push(buffer_handle); - } else { - remote_buffers.get_or_insert(Vec::new()).push(buffer_handle); - } - } - } - } - - let remote_buffers = self.remote_id().zip(remote_buffers); - let client = self.client.clone(); - - cx.spawn(move |this, mut cx| async move { - let mut project_transaction = ProjectTransaction::default(); - - if let Some((project_id, remote_buffers)) = remote_buffers { - let response = client - .request(proto::ReloadBuffers { - project_id, - buffer_ids: remote_buffers - .iter() - .filter_map(|buffer| { - buffer - .update(&mut cx, |buffer, _| buffer.remote_id().into()) - .ok() - }) - .collect(), - }) - .await? - .transaction - .ok_or_else(|| anyhow!("missing transaction"))?; - BufferStore::deserialize_project_transaction( - this.read_with(&cx, |this, _| this.buffer_store.downgrade())?, - response, - push_to_history, - cx.clone(), - ) - .await?; - } - - for buffer in local_buffers { - let transaction = buffer - .update(&mut cx, |buffer, cx| buffer.reload(cx))? - .await?; - buffer.update(&mut cx, |buffer, cx| { - if let Some(transaction) = transaction { - if !push_to_history { - buffer.forget_transaction(transaction.id); - } - project_transaction.0.insert(cx.handle(), transaction); - } - })?; - } - - Ok(project_transaction) + self.buffer_store.update(cx, |buffer_store, cx| { + buffer_store.reload_buffers(buffers, push_to_history, cx) }) } @@ -3589,30 +3515,6 @@ impl Project { })? } - async fn handle_reload_buffers( - this: Model, - envelope: TypedEnvelope, - mut cx: AsyncAppContext, - ) -> Result { - let sender_id = envelope.original_sender_id()?; - let reload = this.update(&mut cx, |this, cx| { - let mut buffers = HashSet::default(); - for buffer_id in &envelope.payload.buffer_ids { - let buffer_id = BufferId::new(*buffer_id)?; - buffers.insert(this.buffer_store.read(cx).get_existing(buffer_id)?); - } - Ok::<_, anyhow::Error>(this.reload_buffers(buffers, false, cx)) - })??; - - let project_transaction = reload.await?; - let project_transaction = this.update(&mut cx, |this, cx| { - this.serialize_project_transaction_for_peer(project_transaction, sender_id, cx) - })?; - Ok(proto::ReloadBuffersResponse { - transaction: Some(project_transaction), - }) - } - async fn handle_synchronize_buffers( this: Model, envelope: TypedEnvelope, @@ -3896,17 +3798,6 @@ impl Project { })? } - fn serialize_project_transaction_for_peer( - &mut self, - project_transaction: ProjectTransaction, - peer_id: proto::PeerId, - cx: &mut AppContext, - ) -> proto::ProjectTransaction { - self.buffer_store.update(cx, |buffer_store, cx| { - buffer_store.serialize_project_transaction_for_peer(project_transaction, peer_id, cx) - }) - } - fn create_buffer_for_peer( &mut self, buffer: &Model, diff --git a/crates/remote_server/src/headless_project.rs b/crates/remote_server/src/headless_project.rs index 043f7e95ee..0af0d6bb15 100644 --- a/crates/remote_server/src/headless_project.rs +++ b/crates/remote_server/src/headless_project.rs @@ -50,8 +50,7 @@ impl HeadlessProject { store }); let buffer_store = cx.new_model(|cx| { - let mut buffer_store = - BufferStore::new(worktree_store.clone(), Some(SSH_PROJECT_ID), cx); + let mut buffer_store = BufferStore::local(worktree_store.clone(), cx); buffer_store.shared(SSH_PROJECT_ID, session.clone().into(), cx); buffer_store }); diff --git a/crates/remote_server/src/remote_editing_tests.rs b/crates/remote_server/src/remote_editing_tests.rs index 18eb12b445..eca65f1349 100644 --- a/crates/remote_server/src/remote_editing_tests.rs +++ b/crates/remote_server/src/remote_editing_tests.rs @@ -7,6 +7,7 @@ use http_client::FakeHttpClient; use language::{ language_settings::{all_language_settings, AllLanguageSettings}, Buffer, FakeLspAdapter, LanguageConfig, LanguageMatcher, LanguageRegistry, LanguageServerName, + LineEnding, }; use lsp::{CompletionContext, CompletionResponse, CompletionTriggerKind}; use node_runtime::NodeRuntime; @@ -18,7 +19,10 @@ use remote::SshSession; use serde_json::json; use settings::{Settings, SettingsLocation, SettingsStore}; use smol::stream::StreamExt; -use std::{path::Path, sync::Arc}; +use std::{ + path::{Path, PathBuf}, + sync::Arc, +}; #[gpui::test] async fn test_basic_remote_editing(cx: &mut TestAppContext, server_cx: &mut TestAppContext) { @@ -440,6 +444,54 @@ async fn test_remote_lsp(cx: &mut TestAppContext, server_cx: &mut TestAppContext }) } +#[gpui::test] +async fn test_remote_reload(cx: &mut TestAppContext, server_cx: &mut TestAppContext) { + let (project, _headless, fs) = init_test(cx, server_cx).await; + let (worktree, _) = project + .update(cx, |project, cx| { + project.find_or_create_worktree("/code/project1", true, cx) + }) + .await + .unwrap(); + + let worktree_id = cx.update(|cx| worktree.read(cx).id()); + + let buffer = project + .update(cx, |project, cx| { + project.open_buffer((worktree_id, Path::new("src/lib.rs")), cx) + }) + .await + .unwrap(); + buffer.update(cx, |buffer, cx| { + buffer.edit([(0..0, "a")], None, cx); + }); + + fs.save( + &PathBuf::from("/code/project1/src/lib.rs"), + &("bloop".to_string().into()), + LineEnding::Unix, + ) + .await + .unwrap(); + + cx.run_until_parked(); + cx.update(|cx| { + assert!(buffer.read(cx).has_conflict()); + }); + + project + .update(cx, |project, cx| { + project.reload_buffers([buffer.clone()].into_iter().collect(), false, cx) + }) + .await + .unwrap(); + cx.run_until_parked(); + + cx.update(|cx| { + assert!(!buffer.read(cx).has_conflict()); + }); +} + #[gpui::test] async fn test_remote_resolve_file_path(cx: &mut TestAppContext, server_cx: &mut TestAppContext) { let (project, _headless, _fs) = init_test(cx, server_cx).await; @@ -483,6 +535,34 @@ async fn test_remote_resolve_file_path(cx: &mut TestAppContext, server_cx: &mut ); } +#[gpui::test(iterations = 10)] +async fn test_canceling_buffer_opening(cx: &mut TestAppContext, server_cx: &mut TestAppContext) { + let (project, _headless, _fs) = init_test(cx, server_cx).await; + let (worktree, _) = project + .update(cx, |project, cx| { + project.find_or_create_worktree("/code/project1", true, cx) + }) + .await + .unwrap(); + let worktree_id = worktree.read_with(cx, |tree, _| tree.id()); + + // Open a buffer on the client but cancel after a random amount of time. + let buffer = project.update(cx, |p, cx| p.open_buffer((worktree_id, "src/lib.rs"), cx)); + cx.executor().simulate_random_delay().await; + drop(buffer); + + // Try opening the same buffer again as the client, and ensure we can + // still do it despite the cancellation above. + let buffer = project + .update(cx, |p, cx| p.open_buffer((worktree_id, "src/lib.rs"), cx)) + .await + .unwrap(); + + buffer.read_with(cx, |buf, _| { + assert_eq!(buf.text(), "fn one() -> usize { 1 }") + }); +} + fn init_logger() { if std::env::var("RUST_LOG").is_ok() { env_logger::try_init().ok();