From 455ffb17f10d66993f301da35d21c379b40ef4e9 Mon Sep 17 00:00:00 2001 From: Max Brunsfeld Date: Fri, 24 Mar 2023 14:35:18 -0700 Subject: [PATCH] Handle path changes and progress updates from all worker threads during initial scan --- crates/project/src/worktree.rs | 135 ++++++++++++++++++--------------- 1 file changed, 72 insertions(+), 63 deletions(-) diff --git a/crates/project/src/worktree.rs b/crates/project/src/worktree.rs index 763d60bbca..1d40dad864 100644 --- a/crates/project/src/worktree.rs +++ b/crates/project/src/worktree.rs @@ -9,7 +9,7 @@ use collections::{HashMap, VecDeque}; use fs::{repository::GitRepository, Fs, LineEnding}; use futures::{ channel::{ - mpsc::{self, UnboundedReceiver, UnboundedSender}, + mpsc::{self, UnboundedSender}, oneshot, }, select_biased, Stream, StreamExt, @@ -44,7 +44,10 @@ use std::{ mem, ops::{Deref, DerefMut}, path::{Path, PathBuf}, - sync::{atomic::AtomicUsize, Arc}, + sync::{ + atomic::{AtomicUsize, Ordering::SeqCst}, + Arc, + }, task::Poll, time::{Duration, SystemTime}, }; @@ -61,7 +64,7 @@ pub enum Worktree { pub struct LocalWorktree { snapshot: LocalSnapshot, - path_changes_tx: mpsc::UnboundedSender<(Vec, barrier::Sender)>, + path_changes_tx: channel::Sender<(Vec, barrier::Sender)>, is_scanning: (watch::Sender, watch::Receiver), _background_scanner_task: Task<()>, share: Option, @@ -238,7 +241,7 @@ impl Worktree { ); } - let (path_changes_tx, path_changes_rx) = mpsc::unbounded(); + let (path_changes_tx, path_changes_rx) = channel::unbounded(); let (scan_states_tx, mut scan_states_rx) = mpsc::unbounded(); cx.spawn_weak(|this, mut cx| async move { @@ -837,7 +840,7 @@ impl LocalWorktree { this.as_local_mut() .unwrap() .path_changes_tx - .unbounded_send((vec![abs_path], tx)) + .try_send((vec![abs_path], tx)) .unwrap(); }); rx.recv().await; @@ -930,7 +933,7 @@ impl LocalWorktree { } let (tx, mut rx) = barrier::channel(); - path_changes_tx.unbounded_send((paths, tx)).unwrap(); + path_changes_tx.try_send((paths, tx)).unwrap(); rx.recv().await; this.upgrade(&cx) .ok_or_else(|| anyhow!("worktree was dropped"))? @@ -2165,7 +2168,7 @@ impl BackgroundScanner { async fn run( self, events_rx: impl Stream>, - mut changed_paths: UnboundedReceiver<(Vec, barrier::Sender)>, + mut changed_paths: channel::Receiver<(Vec, barrier::Sender)>, ) { use futures::FutureExt as _; @@ -2225,64 +2228,70 @@ impl BackgroundScanner { .unwrap(); drop(tx); + let progress_update_count = AtomicUsize::new(0); self.executor .scoped(|scope| { - // While the scan is running, listen for path update requests from the worktree, - // and report updates to the worktree based on a timer. - scope.spawn(async { - let reporting_timer = self.pause_between_initializing_updates().fuse(); - futures::pin_mut!(reporting_timer); - loop { - select_biased! { - job = changed_paths.next().fuse() => { - let Some((abs_paths, barrier)) = job else { break }; - self.update_entries_for_paths(abs_paths, None).await; - if self - .notify - .unbounded_send(ScanState::Initializing { - snapshot: self.snapshot.lock().clone(), - barrier: Some(barrier), - }) - .is_err() - { - break; - } - } - _ = reporting_timer => { - if self - .notify - .unbounded_send(ScanState::Initializing { - snapshot: self.snapshot.lock().clone(), - barrier: None, - }) - .is_err() - { - break; - } - reporting_timer.set(self.pause_between_initializing_updates().fuse()); - } - job = rx.recv().fuse() => { - let Ok(job) = job else { break }; - if let Err(err) = self - .scan_dir(root_char_bag, next_entry_id.clone(), &job) - .await - { - log::error!("error scanning {:?}: {}", job.abs_path, err); - } - } - } - } - }); - - // Spawn worker threads to scan the directory recursively. - for _ in 1..self.executor.num_cpus() { + for _ in 0..self.executor.num_cpus() { scope.spawn(async { - while let Ok(job) = rx.recv().await { - if let Err(err) = self - .scan_dir(root_char_bag, next_entry_id.clone(), &job) - .await - { - log::error!("error scanning {:?}: {}", job.abs_path, err); + let mut last_progress_update_count = 0; + let progress_update_timer = self.pause_between_progress_updates().fuse(); + futures::pin_mut!(progress_update_timer); + loop { + select_biased! { + // Send periodic progress updates to the worktree. Use an atomic counter + // to ensure that only one of the workers sends a progress update after + // the update interval elapses. + _ = progress_update_timer => { + match progress_update_count.compare_exchange( + last_progress_update_count, + last_progress_update_count + 1, + SeqCst, + SeqCst + ) { + Ok(_) => { + last_progress_update_count += 1; + if self + .notify + .unbounded_send(ScanState::Initializing { + snapshot: self.snapshot.lock().clone(), + barrier: None, + }) + .is_err() + { + break; + } + } + Err(current_count) => last_progress_update_count = current_count, + } + progress_update_timer.set(self.pause_between_progress_updates().fuse()); + } + + // Refresh any paths requested by the main thread. + job = changed_paths.recv().fuse() => { + let Ok((abs_paths, barrier)) = job else { break }; + self.update_entries_for_paths(abs_paths, None).await; + if self + .notify + .unbounded_send(ScanState::Initializing { + snapshot: self.snapshot.lock().clone(), + barrier: Some(barrier), + }) + .is_err() + { + break; + } + } + + // Recursively load directories from the file system. + job = rx.recv().fuse() => { + let Ok(job) = job else { break }; + if let Err(err) = self + .scan_dir(root_char_bag, next_entry_id.clone(), &job) + .await + { + log::error!("error scanning {:?}: {}", job.abs_path, err); + } + } } } }); @@ -2370,7 +2379,7 @@ impl BackgroundScanner { } } - async fn pause_between_initializing_updates(&self) { + async fn pause_between_progress_updates(&self) { #[cfg(any(test, feature = "test-support"))] if self.fs.is_fake() { return self.executor.simulate_random_delay().await;