WIP: Lay down a skeleton for another attempt at rescan

Co-Authored-By: Max Brunsfeld <maxbrunsfeld@gmail.com>
This commit is contained in:
Nathan Sobo 2021-04-16 12:53:07 -06:00
parent e19a56c366
commit 6a549727ce

View file

@ -23,10 +23,7 @@ use std::{
ops::AddAssign,
os::unix::fs::MetadataExt,
path::{Path, PathBuf},
sync::{
atomic::{self, AtomicU64},
Arc,
},
sync::Arc,
time::Duration,
};
@ -40,16 +37,16 @@ enum ScanState {
}
pub struct Worktree {
id: usize,
path: Arc<Path>,
entries: SumTree<Entry>,
scanner: BackgroundScanner,
snapshot: Snapshot,
scanner: Arc<BackgroundScanner>,
scan_state: ScanState,
poll_scheduled: bool,
}
#[derive(Clone)]
pub struct Snapshot {
id: usize,
path: Arc<Path>,
root_inode: Option<u64>,
entries: SumTree<Entry>,
}
@ -62,14 +59,16 @@ pub struct FileHandle {
impl Worktree {
pub fn new(path: impl Into<Arc<Path>>, ctx: &mut ModelContext<Self>) -> Self {
let id = ctx.model_id();
let path = path.into();
let scan_state = smol::channel::unbounded();
let scanner = BackgroundScanner::new(id, path.clone(), scan_state.0);
let tree = Self {
id,
path,
let snapshot = Snapshot {
id: ctx.model_id(),
path: path.into(),
root_inode: None,
entries: Default::default(),
};
let scanner = Arc::new(BackgroundScanner::new(snapshot.clone(), scan_state.0));
let tree = Self {
snapshot,
scanner,
scan_state: ScanState::Idle,
poll_scheduled: false,
@ -90,7 +89,7 @@ impl Worktree {
}
fn poll_entries(&mut self, ctx: &mut ModelContext<Self>) {
self.entries = self.scanner.snapshot();
self.snapshot = self.scanner.snapshot();
ctx.notify();
if self.is_scanning() && !self.poll_scheduled {
@ -112,27 +111,23 @@ impl Worktree {
}
pub fn snapshot(&self) -> Snapshot {
Snapshot {
id: self.id,
root_inode: self.scanner.root_inode(),
entries: self.entries.clone(),
}
self.snapshot.clone()
}
pub fn contains_path(&self, path: &Path) -> bool {
path.starts_with(&self.path)
path.starts_with(&self.snapshot.path)
}
pub fn has_inode(&self, inode: u64) -> bool {
self.entries.get(&inode).is_some()
self.snapshot.entries.get(&inode).is_some()
}
pub fn file_count(&self) -> usize {
self.entries.summary().file_count
self.snapshot.entries.summary().file_count
}
pub fn abs_path_for_inode(&self, ino: u64) -> Result<PathBuf> {
let mut result = self.path.to_path_buf();
let mut result = self.snapshot.path.to_path_buf();
result.push(self.path_for_inode(ino, false)?);
Ok(result)
}
@ -140,12 +135,13 @@ impl Worktree {
pub fn path_for_inode(&self, ino: u64, include_root: bool) -> Result<PathBuf> {
let mut components = Vec::new();
let mut entry = self
.snapshot
.entries
.get(&ino)
.ok_or_else(|| anyhow!("entry does not exist in worktree"))?;
components.push(entry.name());
while let Some(parent) = entry.parent() {
entry = self.entries.get(&parent).unwrap();
entry = self.snapshot.entries.get(&parent).unwrap();
components.push(entry.name());
}
@ -196,7 +192,7 @@ impl Worktree {
}
fn fmt_entry(&self, f: &mut fmt::Formatter<'_>, ino: u64, indent: usize) -> fmt::Result {
match self.entries.get(&ino).unwrap() {
match self.snapshot.entries.get(&ino).unwrap() {
Entry::Dir { name, children, .. } => {
write!(
f,
@ -222,13 +218,16 @@ impl Worktree {
#[cfg(test)]
pub fn files<'a>(&'a self) -> impl Iterator<Item = u64> + 'a {
self.entries.cursor::<(), ()>().filter_map(|entry| {
if let Entry::File { inode, .. } = entry {
Some(*inode)
} else {
None
}
})
self.snapshot
.entries
.cursor::<(), ()>()
.filter_map(|entry| {
if let Entry::File { inode, .. } = entry {
Some(*inode)
} else {
None
}
})
}
}
@ -238,7 +237,7 @@ impl Entity for Worktree {
impl fmt::Debug for Worktree {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
if let Some(root_ino) = self.scanner.root_inode() {
if let Some(root_ino) = self.snapshot.root_inode {
self.fmt_entry(f, root_ino, 0)
} else {
write!(f, "Empty tree\n")
@ -273,6 +272,11 @@ impl Snapshot {
Some(inode)
})
}
fn entry_for_path(&self, path: impl AsRef<Path>) -> Option<&Entry> {
self.inode_for_path(path)
.and_then(|inode| self.entries.get(&inode))
}
}
impl FileHandle {
@ -392,55 +396,60 @@ impl<'a> sum_tree::Dimension<'a, EntrySummary> for FileCount {
}
}
#[derive(Clone)]
struct BackgroundScanner {
id: usize,
path: Arc<Path>,
root_ino: Arc<AtomicU64>,
entries: Arc<Mutex<SumTree<Entry>>>,
snapshot: Mutex<Snapshot>,
notify: Sender<ScanState>,
thread_pool: scoped_pool::Pool,
}
impl BackgroundScanner {
fn new(id: usize, path: Arc<Path>, notify: Sender<ScanState>) -> Self {
fn new(snapshot: Snapshot, notify: Sender<ScanState>) -> Self {
Self {
id,
path,
root_ino: Arc::new(AtomicU64::new(0)),
entries: Default::default(),
snapshot: Mutex::new(snapshot),
notify,
thread_pool: scoped_pool::Pool::new(16),
}
}
fn root_inode(&self) -> Option<u64> {
let ino = self.root_ino.load(atomic::Ordering::SeqCst);
if ino == 0 {
None
} else {
Some(ino)
}
fn path(&self) -> Arc<Path> {
self.snapshot.lock().path.clone()
}
fn snapshot(&self) -> SumTree<Entry> {
self.entries.lock().clone()
fn snapshot(&self) -> Snapshot {
self.snapshot.lock().clone()
}
fn run(&self) {
let event_stream = fsevent::EventStream::new(
&[self.path.as_ref()],
Duration::from_millis(100),
|events| {
match self.process_events(events) {
Ok(alive) => alive,
Err(err) => {
// TODO: handle errors
false
}
let path = {
let mut snapshot = self.snapshot.lock();
let canonical_path = snapshot
.path
.canonicalize()
.map(Arc::from)
.unwrap_or_else(|_| snapshot.path.clone());
snapshot.path = canonical_path.clone();
canonical_path
};
// Create the event stream before we start scanning to ensure we receive events for changes
// that occur in the middle of the scan.
let event_stream =
fsevent::EventStream::new(&[path.as_ref()], Duration::from_millis(100), |events| {
if smol::block_on(self.notify.send(ScanState::Scanning)).is_err() {
return false;
}
},
);
if let Err(error) = self.process_events(events) {
log::error!("error handling events: {}", error);
return false;
}
if smol::block_on(self.notify.send(ScanState::Idle)).is_err() {
return false;
}
true
});
if smol::block_on(self.notify.send(ScanState::Scanning)).is_err() {
return;
@ -460,40 +469,38 @@ impl BackgroundScanner {
}
fn scan_dirs(&self) -> io::Result<()> {
let metadata = fs::metadata(&self.path)?;
let ino = metadata.ino();
let is_symlink = fs::symlink_metadata(&self.path)?.file_type().is_symlink();
let name = Arc::from(self.path.file_name().unwrap_or(OsStr::new("/")));
let path = self.path();
let metadata = fs::metadata(&path)?;
let inode = metadata.ino();
let is_symlink = fs::symlink_metadata(&path)?.file_type().is_symlink();
let name = Arc::from(path.file_name().unwrap_or(OsStr::new("/")));
let relative_path = PathBuf::from(&name);
let mut ignore = IgnoreBuilder::new()
.build()
.add_parents(&self.path)
.unwrap();
let mut ignore = IgnoreBuilder::new().build().add_parents(&path).unwrap();
if metadata.is_dir() {
ignore = ignore.add_child(&self.path).unwrap();
ignore = ignore.add_child(&path).unwrap();
}
let is_ignored = ignore.matched(&self.path, metadata.is_dir()).is_ignore();
let is_ignored = ignore.matched(&path, metadata.is_dir()).is_ignore();
if metadata.file_type().is_dir() {
let is_ignored = is_ignored || name.as_ref() == ".git";
let dir_entry = Entry::Dir {
parent: None,
name,
inode: ino,
inode,
is_symlink,
is_ignored,
children: Arc::from([]),
pending: true,
};
self.insert_entries(Some(dir_entry.clone()));
self.root_ino.store(ino, atomic::Ordering::SeqCst);
self.snapshot.lock().root_inode = Some(inode);
let (tx, rx) = crossbeam_channel::unbounded();
tx.send(Ok(ScanJob {
ino,
path: self.path.clone(),
ino: inode,
path: path.clone(),
relative_path,
dir_entry,
ignore: Some(ignore),
@ -522,12 +529,12 @@ impl BackgroundScanner {
self.insert_entries(Some(Entry::File {
parent: None,
name,
path: PathEntry::new(ino, &relative_path, is_ignored),
inode: ino,
path: PathEntry::new(inode, &relative_path, is_ignored),
inode,
is_symlink,
is_ignored,
}));
self.root_ino.store(ino, atomic::Ordering::SeqCst);
self.snapshot.lock().root_inode = Some(inode);
}
Ok(())
@ -620,195 +627,256 @@ impl BackgroundScanner {
Ok(())
}
fn process_events(&self, mut events: Vec<fsevent::Event>) -> Result<bool> {
if self.notify.receiver_count() == 0 {
return Ok(false);
}
// TODO: should we canonicalize this at the start?
let root_path = self.path.canonicalize()?;
let snapshot = Snapshot {
id: self.id,
entries: self.entries.lock().clone(),
root_inode: self.root_inode(),
};
let mut removed = HashSet::new();
let mut observed = HashSet::new();
let (scan_queue_tx, scan_queue_rx) = crossbeam_channel::unbounded();
fn process_events(&self, mut events: Vec<fsevent::Event>) -> Result<()> {
let snapshot = self.snapshot();
events.sort_unstable_by(|a, b| a.path.cmp(&b.path));
let mut paths = events.into_iter().map(|e| e.path).peekable();
while let Some(path) = paths.next() {
let relative_path = path.strip_prefix(&root_path)?.to_path_buf();
match fs::metadata(&path) {
Ok(metadata) => {
let inode = metadata.ino();
let is_symlink = fs::symlink_metadata(&path)?.file_type().is_symlink();
let name: Arc<OsStr> = Arc::from(path.file_name().unwrap_or(OsStr::new("/")));
let mut ignore = IgnoreBuilder::new().build().add_parents(&path).unwrap();
if metadata.is_dir() {
ignore = ignore.add_child(&path).unwrap();
}
let is_ignored = ignore.matched(&path, metadata.is_dir()).is_ignore();
let parent = if path == root_path {
None
} else {
Some(fs::metadata(path.parent().unwrap())?.ino())
};
let prev_entry = snapshot.entries.get(&inode);
// If we haven't seen this inode yet, we are going to recursively scan it, so
// ignore any events involving its descendants.
if prev_entry.is_none() {
while paths.peek().map_or(false, |p| p.starts_with(&path)) {
paths.next();
}
}
for path in paths {
let relative_path = path.strip_prefix(&snapshot.path)?.to_path_buf();
let snapshot_entry = snapshot.entry_for_path(relative_path);
observed.insert(inode);
if metadata.file_type().is_dir() {
let is_ignored = is_ignored || name.as_ref() == ".git";
let dir_entry = Entry::Dir {
parent,
name,
inode,
is_symlink,
is_ignored,
children: Arc::from([]),
pending: true,
};
self.insert_entries(Some(dir_entry.clone()));
if let Some(fs_entry) = self.fs_entry_for_path(&path) {
if let Some(snapshot_entry) = snapshot_entry {
// If the parent does not match:
// Remove snapshot entry from its parent.
// Set its parent to none.
// Add its inode to a set of inodes to potentially remove after the batch.
scan_queue_tx
.send(Ok(ScanJob {
ino: inode,
path: Arc::from(path),
relative_path,
dir_entry,
ignore: Some(ignore),
scan_queue: scan_queue_tx.clone(),
}))
.unwrap();
} else {
self.insert_entries(Some(Entry::File {
parent,
name,
path: PathEntry::new(inode, &relative_path, is_ignored),
inode,
is_symlink,
is_ignored,
}));
}
// If the parent does match, continue to next path
}
Err(err) => {
if err.kind() == io::ErrorKind::NotFound {
// Fill removed with the inodes of all descendants of this path.
let mut stack = Vec::new();
stack.extend(snapshot.inode_for_path(&relative_path));
while let Some(inode) = stack.pop() {
removed.insert(inode);
if let Some(Entry::Dir { children, .. }) = snapshot.entries.get(&inode)
{
stack.extend(children.iter().copied())
}
}
} else {
return Err(anyhow::Error::new(err));
}
// If we get here, we either had no snapshot entry at the path or we had the wrong
// entry (different inode) and removed it.
// In either case, we now need to add the entry for this path
if let Some(existing_snapshot_entry) = snapshot.entry_for_inode(fs_entry.inode) {
// An entry already exists in the snapshot, but in the wrong spot.
// Set its parent to the correct parent
// Insert it in the children of its parent
} else {
// An entry doesn't exist in the snapshot, this is the first time we've seen it.
// If this is a directory, do a recursive scan and discard subsequent events that are contained by the current path
// Then set the parent of the result of that scan to the correct parent
// Insert it in the children of that parent.
}
} else {
if let Some(snapshot_entry) = snapshot_entry {
// Remove snapshot entry from its parent.
// Set its parent to none.
// Add its inode to a set of inodes to potentially remove after the batch.
}
}
}
drop(scan_queue_tx);
let mut scanned_inodes = Vec::new();
scanned_inodes.resize_with(self.thread_pool.workers(), || Ok(Vec::new()));
self.thread_pool.scoped(|pool| {
for worker_inodes in &mut scanned_inodes {
pool.execute(|| {
let worker_inodes = worker_inodes;
while let Ok(job) = scan_queue_rx.recv() {
if let Err(err) = job.and_then(|job| {
self.scan_dir(job, Some(worker_inodes.as_mut().unwrap()))
}) {
*worker_inodes = Err(err);
break;
}
}
});
}
});
// Check whether any entry whose parent was set to none is still an orphan. If so, remove it and all descendants.
for worker_inodes in scanned_inodes {
for inode in worker_inodes? {
remove_counts.remove(&inode);
}
}
self.remove_entries(remove_counts);
*self.snapshot.lock() = snapshot;
Ok(self.notify.receiver_count() != 0)
Ok(())
}
fn fs_entry_for_path(&self, path: &Path) -> Option<Entry> {
todo!()
}
// fn process_events2(&self, mut events: Vec<fsevent::Event>) -> Result<bool> {
// if self.notify.receiver_count() == 0 {
// return Ok(false);
// }
// // TODO: should we canonicalize this at the start?
// let root_path = self.path.canonicalize()?;
// let snapshot = Snapshot {
// id: self.id,
// entries: self.entries.lock().clone(),
// root_inode: self.root_inode(),
// };
// let mut removed = HashSet::new();
// let mut observed = HashSet::new();
// let (scan_queue_tx, scan_queue_rx) = crossbeam_channel::unbounded();
// events.sort_unstable_by(|a, b| a.path.cmp(&b.path));
// let mut paths = events.into_iter().map(|e| e.path).peekable();
// while let Some(path) = paths.next() {
// let relative_path = path.strip_prefix(&root_path)?.to_path_buf();
// match fs::metadata(&path) {
// Ok(metadata) => {
// let inode = metadata.ino();
// let is_symlink = fs::symlink_metadata(&path)?.file_type().is_symlink();
// let name: Arc<OsStr> = Arc::from(path.file_name().unwrap_or(OsStr::new("/")));
// let mut ignore = IgnoreBuilder::new().build().add_parents(&path).unwrap();
// if metadata.is_dir() {
// ignore = ignore.add_child(&path).unwrap();
// }
// let is_ignored = ignore.matched(&path, metadata.is_dir()).is_ignore();
// let parent = if path == root_path {
// None
// } else {
// Some(fs::metadata(path.parent().unwrap())?.ino())
// };
// let prev_entry = snapshot.entries.get(&inode);
// // If we haven't seen this inode yet, we are going to recursively scan it, so
// // ignore any events involving its descendants.
// if prev_entry.is_none() {
// while paths.peek().map_or(false, |p| p.starts_with(&path)) {
// paths.next();
// }
// }
// observed.insert(inode);
// if metadata.file_type().is_dir() {
// let is_ignored = is_ignored || name.as_ref() == ".git";
// let dir_entry = Entry::Dir {
// parent,
// name,
// inode,
// is_symlink,
// is_ignored,
// children: Arc::from([]),
// pending: true,
// };
// self.insert_entries(Some(dir_entry.clone()));
// scan_queue_tx
// .send(Ok(ScanJob {
// ino: inode,
// path: Arc::from(path),
// relative_path,
// dir_entry,
// ignore: Some(ignore),
// scan_queue: scan_queue_tx.clone(),
// }))
// .unwrap();
// } else {
// self.insert_entries(Some(Entry::File {
// parent,
// name,
// path: PathEntry::new(inode, &relative_path, is_ignored),
// inode,
// is_symlink,
// is_ignored,
// }));
// }
// }
// Err(err) => {
// if err.kind() == io::ErrorKind::NotFound {
// // Fill removed with the inodes of all descendants of this path.
// let mut stack = Vec::new();
// stack.extend(snapshot.inode_for_path(&relative_path));
// while let Some(inode) = stack.pop() {
// removed.insert(inode);
// if let Some(Entry::Dir { children, .. }) = snapshot.entries.get(&inode)
// {
// stack.extend(children.iter().copied())
// }
// }
// } else {
// return Err(anyhow::Error::new(err));
// }
// }
// }
// }
// drop(scan_queue_tx);
// let mut scanned_inodes = Vec::new();
// scanned_inodes.resize_with(self.thread_pool.workers(), || Ok(Vec::new()));
// self.thread_pool.scoped(|pool| {
// for worker_inodes in &mut scanned_inodes {
// pool.execute(|| {
// let worker_inodes = worker_inodes;
// while let Ok(job) = scan_queue_rx.recv() {
// if let Err(err) = job.and_then(|job| {
// self.scan_dir(job, Some(worker_inodes.as_mut().unwrap()))
// }) {
// *worker_inodes = Err(err);
// break;
// }
// }
// });
// }
// });
// for worker_inodes in scanned_inodes {
// for inode in worker_inodes? {
// remove_counts.remove(&inode);
// }
// }
// self.remove_entries(remove_counts);
// Ok(self.notify.receiver_count() != 0)
// }
fn insert_entries(&self, entries: impl IntoIterator<Item = Entry>) {
let mut edits = Vec::new();
let mut new_parents = HashMap::new();
for entry in entries {
new_parents.insert(entry.ino(), entry.parent());
edits.push(Edit::Insert(entry));
}
let mut entries = self.entries.lock();
let prev_entries = entries.edit(edits);
Self::remove_stale_children(&mut *entries, prev_entries, new_parents);
self.snapshot
.lock()
.entries
.edit(entries.into_iter().map(Edit::Insert).collect::<Vec<_>>());
}
fn remove_entries(&self, inodes: impl IntoIterator<Item = u64>) {
let mut entries = self.entries.lock();
let prev_entries = entries.edit(inodes.into_iter().map(Edit::Remove).collect());
Self::remove_stale_children(&mut *entries, prev_entries, HashMap::new());
}
// fn insert_entries(&self, entries: impl IntoIterator<Item = Entry>) {
// let mut edits = Vec::new();
// let mut new_parents = HashMap::new();
// for entry in entries {
// new_parents.insert(entry.ino(), entry.parent());
// edits.push(Edit::Insert(entry));
// }
fn remove_stale_children(
tree: &mut SumTree<Entry>,
prev_entries: Vec<Entry>,
new_parents: HashMap<u64, Option<u64>>,
) {
let mut new_parent_entries = HashMap::new();
// let mut entries = self.snapshot.lock().entries;
// let prev_entries = entries.edit(edits);
// Self::remove_stale_children(&mut *entries, prev_entries, new_parents);
// }
for prev_entry in prev_entries {
let new_parent = new_parents.get(&prev_entry.ino()).copied().flatten();
if new_parent != prev_entry.parent() {
if let Some(prev_parent) = prev_entry.parent() {
let (_, new_children) =
new_parent_entries.entry(prev_parent).or_insert_with(|| {
let prev_parent_entry = tree.get(&prev_parent).unwrap();
if let Entry::Dir { children, .. } = prev_parent_entry {
(prev_parent_entry.clone(), children.to_vec())
} else {
unreachable!()
}
});
// fn remove_entries(&self, inodes: impl IntoIterator<Item = u64>) {
// let mut entries = self.entries.lock();
// let prev_entries = entries.edit(inodes.into_iter().map(Edit::Remove).collect());
// Self::remove_stale_children(&mut *entries, prev_entries, HashMap::new());
// }
if let Some(ix) = new_children.iter().position(|ino| *ino == prev_entry.ino()) {
new_children.swap_remove(ix);
}
}
}
}
// fn remove_stale_children(
// tree: &mut SumTree<Entry>,
// prev_entries: Vec<Entry>,
// new_parents: HashMap<u64, Option<u64>>,
// ) {
// let mut new_parent_entries = HashMap::new();
let parent_edits = new_parent_entries
.into_iter()
.map(|(_, (mut parent_entry, new_children))| {
if let Entry::Dir { children, .. } = &mut parent_entry {
*children = Arc::from(new_children);
} else {
unreachable!()
}
Edit::Insert(parent_entry)
})
.collect::<Vec<_>>();
tree.edit(parent_edits);
}
// for prev_entry in prev_entries {
// let new_parent = new_parents.get(&prev_entry.ino()).copied().flatten();
// if new_parent != prev_entry.parent() {
// if let Some(prev_parent) = prev_entry.parent() {
// let (_, new_children) =
// new_parent_entries.entry(prev_parent).or_insert_with(|| {
// let prev_parent_entry = tree.get(&prev_parent).unwrap();
// if let Entry::Dir { children, .. } = prev_parent_entry {
// (prev_parent_entry.clone(), children.to_vec())
// } else {
// unreachable!()
// }
// });
// if let Some(ix) = new_children.iter().position(|ino| *ino == prev_entry.ino()) {
// new_children.swap_remove(ix);
// }
// }
// }
// }
// let parent_edits = new_parent_entries
// .into_iter()
// .map(|(_, (mut parent_entry, new_children))| {
// if let Entry::Dir { children, .. } = &mut parent_entry {
// *children = Arc::from(new_children);
// } else {
// unreachable!()
// }
// Edit::Insert(parent_entry)
// })
// .collect::<Vec<_>>();
// tree.edit(parent_edits);
// }
}
struct ScanJob {