From 102026f3c70d308913ffda04d4a7b26035729288 Mon Sep 17 00:00:00 2001 From: Antonio Scandurra Date: Fri, 9 Jul 2021 15:00:51 +0200 Subject: [PATCH] Implement a scoped pool on `executor::Background` --- gpui/src/executor.rs | 75 +++++++++++++++++++++++++--- zed/src/worktree.rs | 114 +++++++++++++++++++++++-------------------- 2 files changed, 130 insertions(+), 59 deletions(-) diff --git a/gpui/src/executor.rs b/gpui/src/executor.rs index 715d6f2874..8b8b38e2ac 100644 --- a/gpui/src/executor.rs +++ b/gpui/src/executor.rs @@ -3,12 +3,15 @@ use async_task::Runnable; pub use async_task::Task; use parking_lot::Mutex; use rand::prelude::*; -use smol::prelude::*; -use smol::{channel, Executor}; -use std::rc::Rc; -use std::sync::mpsc::SyncSender; -use std::sync::Arc; -use std::{marker::PhantomData, thread}; +use smol::{channel, prelude::*, Executor}; +use std::{ + marker::PhantomData, + mem, + pin::Pin, + rc::Rc, + sync::{mpsc::SyncSender, Arc}, + thread, +}; use crate::platform; @@ -25,6 +28,7 @@ pub enum Background { Deterministic(Arc), Production { executor: Arc>, + threads: usize, _stop: channel::Sender<()>, }, } @@ -155,8 +159,9 @@ impl Background { pub fn new() -> Self { let executor = Arc::new(Executor::new()); let stop = channel::unbounded::<()>(); + let threads = num_cpus::get(); - for i in 0..num_cpus::get() { + for i in 0..threads { let executor = executor.clone(); let stop = stop.1.clone(); thread::Builder::new() @@ -167,10 +172,18 @@ impl Background { Self::Production { executor, + threads, _stop: stop.0, } } + pub fn threads(&self) -> usize { + match self { + Self::Deterministic(_) => 1, + Self::Production { threads, .. } => *threads, + } + } + pub fn spawn(&self, future: F) -> Task where T: 'static + Send, @@ -181,6 +194,54 @@ impl Background { Self::Deterministic(executor) => executor.spawn(future), } } + + pub async fn scoped<'scope, F>(&self, scheduler: F) + where + F: FnOnce(&mut Scope<'scope>), + { + let mut scope = Scope { + futures: Default::default(), + _phantom: PhantomData, + }; + (scheduler)(&mut scope); + match self { + Self::Deterministic(_) => { + for spawned in scope.futures { + spawned.await; + } + } + Self::Production { executor, .. } => { + let spawned = scope + .futures + .into_iter() + .map(|f| executor.spawn(f)) + .collect::>(); + for task in spawned { + task.await; + } + } + } + } +} + +pub struct Scope<'a> { + futures: Vec + Send + 'static>>>, + _phantom: PhantomData<&'a ()>, +} + +impl<'a> Scope<'a> { + pub fn spawn(&mut self, f: F) + where + F: Future + Send + 'a, + { + let f = unsafe { + mem::transmute::< + Pin + Send + 'a>>, + Pin + Send + 'static>>, + >(Box::pin(f)) + }; + self.futures.push(f); + } } pub fn deterministic(seed: u64) -> (Rc, Arc) { diff --git a/zed/src/worktree.rs b/zed/src/worktree.rs index 0a9ce4ea3c..04f15d34e6 100644 --- a/zed/src/worktree.rs +++ b/zed/src/worktree.rs @@ -16,7 +16,7 @@ use anyhow::{anyhow, Context, Result}; use atomic::Ordering::SeqCst; pub use fuzzy::{match_paths, PathMatch}; use gpui::{ - scoped_pool, AppContext, AsyncAppContext, Entity, ModelContext, ModelHandle, MutableAppContext, + executor, AppContext, AsyncAppContext, Entity, ModelContext, ModelHandle, MutableAppContext, Task, WeakModelHandle, }; use lazy_static::lazy_static; @@ -374,9 +374,13 @@ impl Worktree { Duration::from_millis(100), ); let background_snapshot = tree.background_snapshot.clone(); - let id = tree.id; std::thread::spawn(move || { - let scanner = BackgroundScanner::new(fs, background_snapshot, scan_states_tx, id); + let scanner = BackgroundScanner::new( + background_snapshot, + scan_states_tx, + fs, + Arc::new(executor::Background::new()), + ); scanner.run(event_stream); }); tree._event_stream_handle = Some(event_stream_handle); @@ -392,12 +396,13 @@ impl Worktree { ) -> Self { let (tree, scan_states_tx) = LocalWorktree::new(path, languages, fs.clone(), cx); let background_snapshot = tree.background_snapshot.clone(); - let id = tree.id; let fs = fs.clone(); + let background = cx.background().clone(); cx.background() .spawn(async move { let events_rx = fs.events().await; - let scanner = BackgroundScanner::new(fs, background_snapshot, scan_states_tx, id); + let scanner = + BackgroundScanner::new(background_snapshot, scan_states_tx, fs, background); scanner.run_test(events_rx).await; }) .detach(); @@ -1980,21 +1985,21 @@ struct BackgroundScanner { fs: Arc, snapshot: Arc>, notify: Sender, - thread_pool: scoped_pool::Pool, + executor: Arc, } impl BackgroundScanner { fn new( - fs: Arc, snapshot: Arc>, notify: Sender, - worktree_id: usize, + fs: Arc, + executor: Arc, ) -> Self { Self { fs, snapshot, notify, - thread_pool: scoped_pool::Pool::new(16, format!("worktree-{}-scanner", worktree_id)), + executor, } } @@ -2124,21 +2129,23 @@ impl BackgroundScanner { .unwrap(); drop(tx); - self.thread_pool.scoped(|pool| { - for _ in 0..self.thread_pool.thread_count() { - pool.execute(|| { - while let Ok(job) = rx.recv() { - if let Err(err) = smol::block_on(self.scan_dir( - root_char_bag, - next_entry_id.clone(), - &job, - )) { - log::error!("error scanning {:?}: {}", job.abs_path, err); + self.executor + .scoped(|scope| { + for _ in 0..self.executor.threads() { + scope.spawn(async { + while let Ok(job) = rx.recv() { + if let Err(err) = smol::block_on(self.scan_dir( + root_char_bag, + next_entry_id.clone(), + &job, + )) { + log::error!("error scanning {:?}: {}", job.abs_path, err); + } } - } - }); - } - }); + }); + } + }) + .await; } Ok(()) @@ -2324,30 +2331,31 @@ impl BackgroundScanner { // Scan any directories that were created as part of this event batch. drop(scan_queue_tx); - self.thread_pool.scoped(|pool| { - for _ in 0..self.thread_pool.thread_count() { - pool.execute(|| { - while let Ok(job) = scan_queue_rx.recv() { - if let Err(err) = smol::block_on(self.scan_dir( - root_char_bag, - next_entry_id.clone(), - &job, - )) { - log::error!("error scanning {:?}: {}", job.abs_path, err); + self.executor + .scoped(|scope| { + for _ in 0..self.executor.threads() { + scope.spawn(async { + while let Ok(job) = scan_queue_rx.recv() { + if let Err(err) = self + .scan_dir(root_char_bag, next_entry_id.clone(), &job) + .await + { + log::error!("error scanning {:?}: {}", job.abs_path, err); + } } - } - }); - } - }); + }); + } + }) + .await; // Attempt to detect renames only over a single batch of file-system events. self.snapshot.lock().removed_entry_ids.clear(); - self.update_ignore_statuses(); + self.update_ignore_statuses().await; true } - fn update_ignore_statuses(&self) { + async fn update_ignore_statuses(&self) { let mut snapshot = self.snapshot(); let mut ignores_to_update = Vec::new(); @@ -2390,15 +2398,17 @@ impl BackgroundScanner { } drop(ignore_queue_tx); - self.thread_pool.scoped(|scope| { - for _ in 0..self.thread_pool.thread_count() { - scope.execute(|| { - while let Ok(job) = ignore_queue_rx.recv() { - self.update_ignore_status(job, &snapshot); - } - }); - } - }); + self.executor + .scoped(|scope| { + for _ in 0..self.executor.threads() { + scope.spawn(async { + while let Ok(job) = ignore_queue_rx.recv() { + self.update_ignore_status(job, &snapshot); + } + }); + } + }) + .await; } fn update_ignore_status(&self, job: UpdateIgnoreStatusJob, snapshot: &Snapshot) { @@ -3147,7 +3157,6 @@ mod tests { let (notify_tx, _notify_rx) = smol::channel::unbounded(); let mut scanner = BackgroundScanner::new( - Arc::new(OsFs), Arc::new(Mutex::new(Snapshot { id: 0, scan_id: 0, @@ -3161,7 +3170,8 @@ mod tests { next_entry_id: Default::default(), })), notify_tx, - 0, + Arc::new(OsFs), + Arc::new(gpui::executor::Background::new()), ); smol::block_on(scanner.scan_dirs()).unwrap(); scanner.snapshot().check_invariants(); @@ -3186,7 +3196,6 @@ mod tests { let (notify_tx, _notify_rx) = smol::channel::unbounded(); let mut new_scanner = BackgroundScanner::new( - scanner.fs.clone(), Arc::new(Mutex::new(Snapshot { id: 0, scan_id: 0, @@ -3200,7 +3209,8 @@ mod tests { next_entry_id: Default::default(), })), notify_tx, - 1, + scanner.fs.clone(), + scanner.executor.clone(), ); smol::block_on(new_scanner.scan_dirs()).unwrap(); assert_eq!(scanner.snapshot().to_vec(), new_scanner.snapshot().to_vec());