Parallelize scanning of changed directories

This commit is contained in:
Antonio Scandurra 2021-04-16 16:11:55 +02:00
parent fd12117288
commit fbd5fbd703

View file

@ -434,7 +434,6 @@ impl BackgroundScanner {
Duration::from_millis(100),
|events| {
if let Err(err) = scanner.process_events(events) {
dbg!(err);
// TODO: handle errors
false
} else {
@ -637,6 +636,7 @@ impl BackgroundScanner {
let mut paths = events.into_iter().map(|e| &*e.path).collect::<Vec<_>>();
paths.sort_unstable();
let (scan_queue_tx, scan_queue_rx) = crossbeam_channel::unbounded();
let mut paths = paths.into_iter().peekable();
while let Some(path) = paths.next() {
let relative_path = path.strip_prefix(&root_path)?.to_path_buf();
@ -685,45 +685,16 @@ impl BackgroundScanner {
};
self.insert_entries(Some(dir_entry.clone()));
let (tx, rx) = crossbeam_channel::unbounded();
tx.send(Ok(ScanJob {
ino: inode,
path: Arc::from(path),
relative_path,
dir_entry,
ignore: Some(ignore),
scan_queue: tx.clone(),
}))
.unwrap();
drop(tx);
let mut inodes = Vec::new();
inodes.resize_with(self.thread_pool.workers(), || Ok(Vec::new()));
self.thread_pool.scoped(|pool| {
for worker_inodes in &mut inodes {
pool.execute(|| {
let worker_inodes = worker_inodes;
while let Ok(job) = rx.recv() {
if let Err(err) = job.and_then(|job| {
self.scan_dir(
job,
Some(worker_inodes.as_mut().unwrap()),
)
}) {
*worker_inodes = Err(err);
break;
}
}
});
}
});
for worker_inodes in inodes {
for inode in worker_inodes? {
removed.remove(&inode);
}
}
scan_queue_tx
.send(Ok(ScanJob {
ino: inode,
path: Arc::from(path),
relative_path,
dir_entry,
ignore: Some(ignore),
scan_queue: scan_queue_tx.clone(),
}))
.unwrap();
} else {
self.insert_entries(Some(Entry::File {
parent,
@ -742,7 +713,31 @@ impl BackgroundScanner {
}
}
}
drop(scan_queue_tx);
let mut inodes = Vec::new();
inodes.resize_with(self.thread_pool.workers(), || Ok(Vec::new()));
self.thread_pool.scoped(|pool| {
for worker_inodes in &mut inodes {
pool.execute(|| {
let worker_inodes = worker_inodes;
while let Ok(job) = scan_queue_rx.recv() {
if let Err(err) = job.and_then(|job| {
self.scan_dir(job, Some(worker_inodes.as_mut().unwrap()))
}) {
*worker_inodes = Err(err);
break;
}
}
});
}
});
for worker_inodes in inodes {
for inode in worker_inodes? {
removed.remove(&inode);
}
}
self.remove_entries(removed);
Ok(self.notify.receiver_count() != 0)