diff --git a/Cargo.lock b/Cargo.lock index 372bb8457..9514862b5 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1063,6 +1063,7 @@ dependencies = [ "prost", "rand", "rand_chacha", + "rayon", "regex", "rustix 0.38.6", "serde_json", @@ -1663,21 +1664,19 @@ dependencies = [ [[package]] name = "rayon" -version = "1.5.3" +version = "1.7.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bd99e5772ead8baa5215278c9b15bf92087709e9c1b2d1f97cdb5a183c933a7d" +checksum = "1d2df5196e37bcc87abebc0053e20787d73847bb33134a69841207dd0a47f03b" dependencies = [ - "autocfg", - "crossbeam-deque", "either", "rayon-core", ] [[package]] name = "rayon-core" -version = "1.9.3" +version = "1.11.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "258bcdb5ac6dad48491bb2992db6b7cf74878b0384908af124823d118c99683f" +checksum = "4b8f95bd6966f5c87776639160a66bd8ab9895d9d4ab01ddba9fc60661aebe8d" dependencies = [ "crossbeam-channel", "crossbeam-deque", diff --git a/lib/Cargo.toml b/lib/Cargo.toml index f42aa6cfb..0dfdc4c0b 100644 --- a/lib/Cargo.toml +++ b/lib/Cargo.toml @@ -41,6 +41,7 @@ pest_derive = "2.7.2" prost = "0.11.9" rand = "0.8.5" rand_chacha = "0.3.1" +rayon = "1.7.0" regex = "1.9.1" serde_json = "1.0.104" smallvec = { version = "1.11.0", features = [ diff --git a/lib/src/matchers.rs b/lib/src/matchers.rs index 5cfa5965d..6498ddb18 100644 --- a/lib/src/matchers.rs +++ b/lib/src/matchers.rs @@ -67,7 +67,7 @@ pub enum VisitFiles { Set(HashSet), } -pub trait Matcher { +pub trait Matcher: Sync { fn matches(&self, file: &RepoPath) -> bool; fn visit(&self, dir: &RepoPath) -> Visit; } diff --git a/lib/src/working_copy.rs b/lib/src/working_copy.rs index 201c6a656..8406aaf5a 100644 --- a/lib/src/working_copy.rs +++ b/lib/src/working_copy.rs @@ -33,6 +33,8 @@ use std::time::UNIX_EPOCH; use itertools::Itertools; use once_cell::unsync::OnceCell; use prost::Message; +use rayon::iter::IntoParallelIterator; +use rayon::prelude::ParallelIterator; use tempfile::NamedTempFile; use thiserror::Error; use tracing::{instrument, trace_span}; @@ -649,30 +651,26 @@ impl TreeState { }); let matcher = IntersectionMatcher::new(sparse_matcher.as_ref(), fsmonitor_matcher); - let mut work = vec![WorkItem { + let work_item = WorkItem { dir: RepoPath::root(), disk_dir: self.working_copy_path.clone(), git_ignore: base_ignores, - }]; + }; trace_span!("traverse filesystem").in_scope(|| -> Result<(), SnapshotError> { let (tree_entries_tx, tree_entries_rx) = channel(); let (file_states_tx, file_states_rx) = channel(); let (deleted_files_tx, deleted_files_rx) = channel(); - while let Some(work_item) = work.pop() { - work.extend(self.visit_directory( - &matcher, - ¤t_tree, - tree_entries_tx.clone(), - file_states_tx.clone(), - deleted_files_tx.clone(), - work_item, - progress, - )?); - } - drop(tree_entries_tx); - drop(file_states_tx); - drop(deleted_files_tx); + self.visit_directory( + &matcher, + ¤t_tree, + tree_entries_tx, + file_states_tx, + deleted_files_tx, + work_item, + progress, + )?; + while let Ok((path, tree_value)) = tree_entries_rx.recv() { tree_builder.set(path, tree_value); } @@ -706,7 +704,7 @@ impl TreeState { deleted_files_tx: Sender, work_item: WorkItem, progress: Option<&SnapshotProgress>, - ) -> Result, SnapshotError> { + ) -> Result<(), SnapshotError> { let WorkItem { dir, disk_dir, @@ -714,115 +712,141 @@ impl TreeState { } = work_item; if matcher.visit(&dir).is_nothing() { - return Ok(Default::default()); + return Ok(()); } let git_ignore = git_ignore.chain_with_file(&dir.to_internal_dir_string(), disk_dir.join(".gitignore")); - let mut work = Vec::new(); - for maybe_entry in disk_dir.read_dir().unwrap() { - let entry = maybe_entry.unwrap(); - let file_type = entry.file_type().unwrap(); - let file_name = entry.file_name(); - let name = file_name - .to_str() - .ok_or_else(|| SnapshotError::InvalidUtf8Path { - path: file_name.clone(), - })?; - if name == ".jj" || name == ".git" { - continue; - } - let path = dir.join(&RepoPathComponent::from(name)); - if let Some(file_state) = self.file_states.get(&path) { - if file_state.file_type == FileType::GitSubmodule { - continue; - } - } + let dir_entries = disk_dir + .read_dir() + .unwrap() + .map(|maybe_entry| maybe_entry.unwrap()) + .collect_vec(); + dir_entries.into_par_iter().try_for_each_with( + ( + tree_entries_tx.clone(), + file_states_tx.clone(), + deleted_files_tx.clone(), + ), + |(tree_entries_tx, file_states_tx, deleted_files_tx), + entry| + -> Result<(), SnapshotError> { + let file_type = entry.file_type().unwrap(); + let file_name = entry.file_name(); + let name = file_name + .to_str() + .ok_or_else(|| SnapshotError::InvalidUtf8Path { + path: file_name.clone(), + })?; - if file_type.is_dir() { - if git_ignore.matches_all_files_in(&path.to_internal_dir_string()) { - // If the whole directory is ignored, visit only paths we're already - // tracking. - let tracked_paths = self - .file_states - .range((Bound::Excluded(&path), Bound::Unbounded)) - .take_while(|(sub_path, _)| path.contains(sub_path)) - .map(|(sub_path, file_state)| (sub_path.clone(), file_state.clone())) - .collect_vec(); - for (tracked_path, current_file_state) in tracked_paths { - if !matcher.matches(&tracked_path) { - continue; - } - let disk_path = tracked_path.to_fs_path(&self.working_copy_path); - let metadata = match disk_path.metadata() { - Ok(metadata) => metadata, - Err(err) if err.kind() == std::io::ErrorKind::NotFound => { + if name == ".jj" || name == ".git" { + return Ok(()); + } + let path = dir.join(&RepoPathComponent::from(name)); + if let Some(file_state) = self.file_states.get(&path) { + if file_state.file_type == FileType::GitSubmodule { + return Ok(()); + } + } + + if file_type.is_dir() { + if git_ignore.matches_all_files_in(&path.to_internal_dir_string()) { + // If the whole directory is ignored, visit only paths we're already + // tracking. + let tracked_paths = self + .file_states + .range((Bound::Excluded(&path), Bound::Unbounded)) + .take_while(|(sub_path, _)| path.contains(sub_path)) + .map(|(sub_path, file_state)| (sub_path.clone(), file_state.clone())) + .collect_vec(); + for (tracked_path, current_file_state) in tracked_paths { + if !matcher.matches(&tracked_path) { continue; } - Err(err) => { - return Err(SnapshotError::IoError { - message: format!("Failed to stat file {}", disk_path.display()), - err, - }); + let disk_path = tracked_path.to_fs_path(&self.working_copy_path); + let metadata = match disk_path.metadata() { + Ok(metadata) => metadata, + Err(err) if err.kind() == std::io::ErrorKind::NotFound => { + continue; + } + Err(err) => { + return Err(SnapshotError::IoError { + message: format!( + "Failed to stat file {}", + disk_path.display() + ), + err, + }); + } + }; + if let Some(new_file_state) = file_state(&metadata) { + deleted_files_tx.send(tracked_path.clone()).ok(); + let update = self.get_updated_tree_value( + &tracked_path, + disk_path, + Some(¤t_file_state), + current_tree, + &new_file_state, + )?; + if let Some(tree_value) = update { + tree_entries_tx + .send((tracked_path.clone(), tree_value)) + .ok(); + } + file_states_tx.send((tracked_path, new_file_state)).ok(); } + } + } else { + let work_item = WorkItem { + dir: path, + disk_dir: entry.path(), + git_ignore: git_ignore.clone(), }; + self.visit_directory( + matcher, + current_tree, + tree_entries_tx.clone(), + file_states_tx.clone(), + deleted_files_tx.clone(), + work_item, + progress, + )?; + } + } else if matcher.matches(&path) { + if let Some(progress) = progress { + progress(&path); + } + let maybe_current_file_state = self.file_states.get(&path); + if maybe_current_file_state.is_none() + && git_ignore.matches_file(&path.to_internal_file_string()) + { + // If it wasn't already tracked and it matches + // the ignored paths, then + // ignore it. + } else { + let metadata = entry.metadata().map_err(|err| SnapshotError::IoError { + message: format!("Failed to stat file {}", entry.path().display()), + err, + })?; if let Some(new_file_state) = file_state(&metadata) { - deleted_files_tx.send(tracked_path.clone()).ok(); + deleted_files_tx.send(path.clone()).ok(); let update = self.get_updated_tree_value( - &tracked_path, - disk_path, - Some(¤t_file_state), + &path, + entry.path(), + maybe_current_file_state, current_tree, &new_file_state, )?; if let Some(tree_value) = update { - tree_entries_tx - .send((tracked_path.clone(), tree_value)) - .ok(); + tree_entries_tx.send((path.clone(), tree_value)).ok(); } - file_states_tx.send((tracked_path, new_file_state)).ok(); + file_states_tx.send((path, new_file_state)).ok(); } } - } else { - work.push(WorkItem { - dir: path, - disk_dir: entry.path(), - git_ignore: git_ignore.clone(), - }); } - } else if matcher.matches(&path) { - if let Some(progress) = progress { - progress(&path); - } - let maybe_current_file_state = self.file_states.get(&path); - if maybe_current_file_state.is_none() - && git_ignore.matches_file(&path.to_internal_file_string()) - { - // If it wasn't already tracked and it matches - // the ignored paths, then - // ignore it. - } else { - let metadata = entry.metadata().map_err(|err| SnapshotError::IoError { - message: format!("Failed to stat file {}", entry.path().display()), - err, - })?; - if let Some(new_file_state) = file_state(&metadata) { - deleted_files_tx.send(path.clone()).ok(); - let update = self.get_updated_tree_value( - &path, - entry.path(), - maybe_current_file_state, - current_tree, - &new_file_state, - )?; - if let Some(tree_value) = update { - tree_entries_tx.send((path.clone(), tree_value)).ok(); - } - file_states_tx.send((path, new_file_state)).ok(); - } - } - } - } - Ok(work) + Ok(()) + }, + )?; + Ok(()) } #[instrument(skip_all)] @@ -1587,4 +1611,4 @@ impl Drop for LockedWorkingCopy<'_> { } } -pub type SnapshotProgress<'a> = dyn Fn(&RepoPath) + 'a; +pub type SnapshotProgress<'a> = dyn Fn(&RepoPath) + 'a + Sync;