From 9fd8acdce7fd8a9199523238a6035917ef990379 Mon Sep 17 00:00:00 2001 From: Max Brunsfeld Date: Tue, 20 Apr 2021 13:51:24 -0700 Subject: [PATCH 1/2] Use our own scoped_pool implementation --- Cargo.lock | 28 +----- Cargo.toml | 2 +- gpui/Cargo.toml | 2 +- gpui/src/app.rs | 2 +- scoped_pool/Cargo.toml | 8 ++ scoped_pool/src/lib.rs | 188 +++++++++++++++++++++++++++++++++++++++++ zed/src/worktree.rs | 12 +-- 7 files changed, 205 insertions(+), 37 deletions(-) create mode 100644 scoped_pool/Cargo.toml create mode 100644 scoped_pool/src/lib.rs diff --git a/Cargo.lock b/Cargo.lock index 4096564e92..06a6e41f5a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -448,12 +448,6 @@ dependencies = [ "cfg-if 1.0.0", ] -[[package]] -name = "crossbeam" -version = "0.2.12" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bd66663db5a988098a89599d4857919b3acf7f61402e61365acfd3919857b9be" - [[package]] name = "crossbeam-channel" version = "0.4.4" @@ -1062,7 +1056,7 @@ version = "0.4.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "dd96ffd135b2fd7b973ac026d28085defbe8983df057ced3eb4f2130b0831312" dependencies = [ - "scopeguard 1.1.0", + "scopeguard", ] [[package]] @@ -1714,13 +1708,9 @@ dependencies = [ [[package]] name = "scoped-pool" -version = "1.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "817a3a15e704545ce59ed2b5c60a5d32bda4d7869befb8b36667b658a6c00b43" +version = "0.0.1" dependencies = [ - "crossbeam", - "scopeguard 0.1.2", - "variance", + "crossbeam-channel 0.5.0", ] [[package]] @@ -1729,12 +1719,6 @@ version = "1.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ea6a9290e3c9cf0f18145ef7ffa62d68ee0bf5fcd651017e586dc7fd5da448c2" -[[package]] -name = "scopeguard" -version = "0.1.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "59a076157c1e2dc561d8de585151ee6965d910dd4dcb5dabb7ae3e83981a6c57" - [[package]] name = "scopeguard" version = "1.1.0" @@ -2138,12 +2122,6 @@ dependencies = [ "xmlwriter", ] -[[package]] -name = "variance" -version = "0.1.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3abfc2be1fb59663871379ea884fd81de80c496f2274e021c01d6fe56cd77b05" - [[package]] name = "vec-arena" version = "1.0.0" diff --git a/Cargo.toml b/Cargo.toml index 1ca7597d18..c58e56b67a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,5 +1,5 @@ [workspace] -members = ["zed", "gpui", "fsevent"] +members = ["zed", "gpui", "fsevent", "scoped_pool"] [patch.crates-io] async-task = {git = "https://github.com/zed-industries/async-task", rev = "341b57d6de98cdfd7b418567b8de2022ca993a6e"} diff --git a/gpui/Cargo.toml b/gpui/Cargo.toml index 0d6a2982ad..8c7c3bf4cb 100644 --- a/gpui/Cargo.toml +++ b/gpui/Cargo.toml @@ -18,7 +18,7 @@ postage = {version = "0.4.1", features = ["futures-traits"]} rand = "0.8.3" replace_with = "0.1.7" resvg = "0.14" -scoped-pool = "1.0.0" +scoped-pool = {path = "../scoped_pool"} seahash = "4.1" serde = {version = "1.0.125", features = ["derive"]} serde_json = "1.0.64" diff --git a/gpui/src/app.rs b/gpui/src/app.rs index d81dd8797f..7c396a51e7 100644 --- a/gpui/src/app.rs +++ b/gpui/src/app.rs @@ -411,7 +411,7 @@ impl MutableAppContext { windows: HashMap::new(), ref_counts: Arc::new(Mutex::new(RefCounts::default())), background: Arc::new(executor::Background::new()), - thread_pool: scoped_pool::Pool::new(num_cpus::get()), + thread_pool: scoped_pool::Pool::new(num_cpus::get(), "app"), }, actions: HashMap::new(), global_actions: HashMap::new(), diff --git a/scoped_pool/Cargo.toml b/scoped_pool/Cargo.toml new file mode 100644 index 0000000000..a2e5a1206f --- /dev/null +++ b/scoped_pool/Cargo.toml @@ -0,0 +1,8 @@ +[package] +name = "scoped-pool" +version = "0.0.1" +license = "MIT" +edition = "2018" + +[dependencies] +crossbeam-channel = "0.5" diff --git a/scoped_pool/src/lib.rs b/scoped_pool/src/lib.rs new file mode 100644 index 0000000000..2ad56f2b65 --- /dev/null +++ b/scoped_pool/src/lib.rs @@ -0,0 +1,188 @@ +use crossbeam_channel as chan; +use std::{marker::PhantomData, mem::transmute, thread}; + +#[derive(Clone)] +pub struct Pool { + req_tx: chan::Sender, + thread_count: usize, +} + +pub struct Scope<'a> { + req_count: usize, + req_tx: chan::Sender, + resp_tx: chan::Sender<()>, + resp_rx: chan::Receiver<()>, + phantom: PhantomData<&'a ()>, +} + +struct Request { + callback: Box, + resp_tx: chan::Sender<()>, +} + +impl Pool { + pub fn new(thread_count: usize, name: &str) -> Self { + let (req_tx, req_rx) = chan::unbounded(); + for i in 0..thread_count { + thread::Builder::new() + .name(format!("scoped_pool {} {}", name, i)) + .spawn({ + let req_rx = req_rx.clone(); + move || loop { + match req_rx.recv() { + Err(_) => break, + Ok(Request { callback, resp_tx }) => { + callback(); + resp_tx.send(()).ok(); + } + } + } + }) + .expect("scoped_pool: failed to spawn thread"); + } + Self { + req_tx, + thread_count, + } + } + + pub fn thread_count(&self) -> usize { + self.thread_count + } + + pub fn scoped<'scope, F, R>(&self, scheduler: F) -> R + where + F: FnOnce(&mut Scope<'scope>) -> R, + { + let (resp_tx, resp_rx) = chan::bounded(1); + let mut scope = Scope { + resp_tx, + resp_rx, + req_count: 0, + phantom: PhantomData, + req_tx: self.req_tx.clone(), + }; + let result = scheduler(&mut scope); + scope.wait(); + result + } +} + +impl<'scope> Scope<'scope> { + pub fn execute(&mut self, callback: F) + where + F: FnOnce() + Send + 'scope, + { + // Transmute the callback's lifetime to be 'static. This is safe because in ::wait, + // we block until all the callbacks have been called and dropped. + let callback = unsafe { + transmute::, Box>( + Box::new(callback), + ) + }; + + self.req_count += 1; + self.req_tx + .send(Request { + callback, + resp_tx: self.resp_tx.clone(), + }) + .unwrap(); + } + + fn wait(&self) { + for _ in 0..self.req_count { + self.resp_rx.recv().unwrap(); + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use std::sync::{Arc, Mutex}; + + #[test] + fn test_execute() { + let pool = Pool::new(3, "test"); + + { + let vec = Mutex::new(Vec::new()); + pool.scoped(|scope| { + for _ in 0..3 { + scope.execute(|| { + for i in 0..5 { + vec.lock().unwrap().push(i); + } + }); + } + }); + + let mut vec = vec.into_inner().unwrap(); + vec.sort_unstable(); + assert_eq!(vec, [0, 0, 0, 1, 1, 1, 2, 2, 2, 3, 3, 3, 4, 4, 4]) + } + } + + #[test] + fn test_clone_send_and_execute() { + let pool = Pool::new(3, "test"); + + let mut threads = Vec::new(); + for _ in 0..3 { + threads.push(thread::spawn({ + let pool = pool.clone(); + move || { + let vec = Mutex::new(Vec::new()); + pool.scoped(|scope| { + for _ in 0..3 { + scope.execute(|| { + for i in 0..5 { + vec.lock().unwrap().push(i); + } + }); + } + }); + let mut vec = vec.into_inner().unwrap(); + vec.sort_unstable(); + assert_eq!(vec, [0, 0, 0, 1, 1, 1, 2, 2, 2, 3, 3, 3, 4, 4, 4]) + } + })); + } + + for thread in threads { + thread.join().unwrap(); + } + } + + #[test] + fn test_share_and_execute() { + let pool = Arc::new(Pool::new(3, "test")); + + let mut threads = Vec::new(); + for _ in 0..3 { + threads.push(thread::spawn({ + let pool = pool.clone(); + move || { + let vec = Mutex::new(Vec::new()); + pool.scoped(|scope| { + for _ in 0..3 { + scope.execute(|| { + for i in 0..5 { + vec.lock().unwrap().push(i); + } + }); + } + }); + let mut vec = vec.into_inner().unwrap(); + vec.sort_unstable(); + assert_eq!(vec, [0, 0, 0, 1, 1, 1, 2, 2, 2, 3, 3, 3, 4, 4, 4]) + } + })); + } + + for thread in threads { + thread.join().unwrap(); + } + } +} diff --git a/zed/src/worktree.rs b/zed/src/worktree.rs index 27abd4510a..a54b11ee8e 100644 --- a/zed/src/worktree.rs +++ b/zed/src/worktree.rs @@ -500,7 +500,7 @@ impl BackgroundScanner { Self { snapshot: Mutex::new(snapshot), notify, - thread_pool: scoped_pool::Pool::new(16), + thread_pool: scoped_pool::Pool::new(16, "background-scanner"), } } @@ -592,7 +592,7 @@ impl BackgroundScanner { drop(tx); let mut results = Vec::new(); - results.resize_with(self.thread_pool.workers(), || Ok(())); + results.resize_with(self.thread_pool.thread_count(), || Ok(())); self.thread_pool.scoped(|pool| { for result in &mut results { pool.execute(|| { @@ -762,7 +762,7 @@ impl BackgroundScanner { // Scan any directories that were created as part of this event batch. drop(scan_queue_tx); self.thread_pool.scoped(|pool| { - for _ in 0..self.thread_pool.workers() { + for _ in 0..self.thread_pool.thread_count() { pool.execute(|| { while let Ok(job) = scan_queue_rx.recv() { if let Err(err) = job.and_then(|job| self.scan_dir(job)) { @@ -844,12 +844,6 @@ impl BackgroundScanner { } } -impl Drop for BackgroundScanner { - fn drop(&mut self) { - self.thread_pool.shutdown(); - } -} - struct ScanJob { inode: u64, path: Arc, From b20f5e91392e655f291bd93270fbd00564f4a7fc Mon Sep 17 00:00:00 2001 From: Max Brunsfeld Date: Tue, 20 Apr 2021 15:55:29 -0700 Subject: [PATCH 2/2] Fully halt background scanner threads when dropping Worktree * Rework fsevent API to expose a handle for halting the event stream Co-Authored-By: Nathan Sobo --- Cargo.lock | 1 + fsevent/Cargo.toml | 1 + fsevent/examples/events.rs | 4 +- fsevent/src/lib.rs | 130 ++++++++++++++++++++++++++++++------- scoped_pool/src/lib.rs | 4 +- zed/src/worktree.rs | 86 ++++++++++++------------ 6 files changed, 158 insertions(+), 68 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 06a6e41f5a..8d6115fd1e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -753,6 +753,7 @@ version = "2.0.2" dependencies = [ "bitflags", "fsevent-sys", + "parking_lot", "tempdir", ] diff --git a/fsevent/Cargo.toml b/fsevent/Cargo.toml index cd7b2f0e7f..03b565cb9f 100644 --- a/fsevent/Cargo.toml +++ b/fsevent/Cargo.toml @@ -7,6 +7,7 @@ edition = "2018" [dependencies] bitflags = "1" fsevent-sys = "3.0.2" +parking_lot = "0.11.1" [dev-dependencies] tempdir = "0.3.7" diff --git a/fsevent/examples/events.rs b/fsevent/examples/events.rs index 7f064d9050..73ec6405d4 100644 --- a/fsevent/examples/events.rs +++ b/fsevent/examples/events.rs @@ -5,12 +5,12 @@ fn main() { let paths = args().skip(1).collect::>(); let paths = paths.iter().map(Path::new).collect::>(); assert!(paths.len() > 0, "Must pass 1 or more paths as arguments"); - let stream = EventStream::new(&paths, Duration::from_millis(100), |events| { + let (stream, _handle) = EventStream::new(&paths, Duration::from_millis(100)); + stream.run(|events| { eprintln!("event batch"); for event in events { eprintln!(" {:?}", event); } true }); - stream.run(); } diff --git a/fsevent/src/lib.rs b/fsevent/src/lib.rs index 38baaf2c84..c53be01686 100644 --- a/fsevent/src/lib.rs +++ b/fsevent/src/lib.rs @@ -2,12 +2,14 @@ use bitflags::bitflags; use fsevent_sys::{self as fs, core_foundation as cf}; +use parking_lot::Mutex; use std::{ convert::AsRef, ffi::{c_void, CStr, OsStr}, os::unix::ffi::OsStrExt, path::{Path, PathBuf}, slice, + sync::Arc, time::Duration, }; @@ -18,20 +20,29 @@ pub struct Event { pub path: PathBuf, } -pub struct EventStream { +pub struct EventStream { stream: fs::FSEventStreamRef, - _callback: Box, + state: Arc>, + callback: Box>, } -unsafe impl Send for EventStream {} +type RunCallback = Box) -> bool>; -impl EventStream -where - F: FnMut(Vec) -> bool, -{ - pub fn new(paths: &[&Path], latency: Duration, callback: F) -> Self { +enum Lifecycle { + New, + Running(cf::CFRunLoopRef), + Stopped, +} + +pub struct Handle(Arc>); + +unsafe impl Send for EventStream {} +unsafe impl Send for Lifecycle {} + +impl EventStream { + pub fn new(paths: &[&Path], latency: Duration) -> (Self, Handle) { unsafe { - let callback = Box::new(callback); + let callback = Box::new(None); let stream_context = fs::FSEventStreamContext { version: 0, info: callback.as_ref() as *const _ as *mut c_void, @@ -71,20 +82,35 @@ where ); cf::CFRelease(cf_paths); - EventStream { - stream, - _callback: callback, - } + let state = Arc::new(Mutex::new(Lifecycle::New)); + + ( + EventStream { + stream, + state: state.clone(), + callback, + }, + Handle(state), + ) } } - pub fn run(self) { + pub fn run(mut self, f: F) + where + F: FnMut(Vec) -> bool + 'static, + { + *self.callback = Some(Box::new(f)); unsafe { - fs::FSEventStreamScheduleWithRunLoop( - self.stream, - cf::CFRunLoopGetCurrent(), - cf::kCFRunLoopDefaultMode, - ); + let run_loop = cf::CFRunLoopGetCurrent(); + { + let mut state = self.state.lock(); + match *state { + Lifecycle::New => *state = Lifecycle::Running(run_loop), + Lifecycle::Running(_) => unreachable!(), + Lifecycle::Stopped => return, + } + } + fs::FSEventStreamScheduleWithRunLoop(self.stream, run_loop, cf::kCFRunLoopDefaultMode); fs::FSEventStreamStart(self.stream); cf::CFRunLoopRun(); @@ -107,7 +133,11 @@ where let event_paths = event_paths as *const *const ::std::os::raw::c_char; let e_ptr = event_flags as *mut u32; let i_ptr = event_ids as *mut u64; - let callback = (info as *mut F).as_mut().unwrap(); + let callback = (info as *mut Option) + .as_mut() + .unwrap() + .as_mut() + .unwrap(); let paths = slice::from_raw_parts(event_paths, num); let flags = slice::from_raw_parts_mut(e_ptr, num); @@ -136,6 +166,18 @@ where } } +impl Drop for Handle { + fn drop(&mut self) { + let mut state = self.0.lock(); + if let Lifecycle::Running(run_loop) = *state { + unsafe { + cf::CFRunLoopStop(run_loop); + } + } + *state = Lifecycle::Stopped; + } +} + // Synchronize with // /System/Library/Frameworks/CoreServices.framework/Versions/A/Frameworks/FSEvents.framework/Versions/A/Headers/FSEvents.h bitflags! { @@ -253,10 +295,8 @@ fn test_event_stream() { fs::write(path.join("a"), "a contents").unwrap(); let (tx, rx) = mpsc::channel(); - let stream = EventStream::new(&[&path], Duration::from_millis(50), move |events| { - tx.send(events.to_vec()).is_ok() - }); - std::thread::spawn(move || stream.run()); + let (stream, handle) = EventStream::new(&[&path], Duration::from_millis(50)); + std::thread::spawn(move || stream.run(move |events| tx.send(events.to_vec()).is_ok())); fs::write(path.join("b"), "b contents").unwrap(); let events = rx.recv_timeout(Duration::from_millis(500)).unwrap(); @@ -269,4 +309,46 @@ fn test_event_stream() { let event = events.last().unwrap(); assert_eq!(event.path, path.join("a")); assert!(event.flags.contains(StreamFlags::ITEM_REMOVED)); + drop(handle); +} + +#[test] +fn test_event_stream_shutdown() { + use std::{fs, sync::mpsc, time::Duration}; + use tempdir::TempDir; + + let dir = TempDir::new("test_observe").unwrap(); + let path = dir.path().canonicalize().unwrap(); + + let (tx, rx) = mpsc::channel(); + let (stream, handle) = EventStream::new(&[&path], Duration::from_millis(50)); + std::thread::spawn(move || { + stream.run({ + let tx = tx.clone(); + move |_| { + tx.send(()).unwrap(); + true + } + }); + tx.send(()).unwrap(); + }); + + fs::write(path.join("b"), "b contents").unwrap(); + rx.recv_timeout(Duration::from_millis(500)).unwrap(); + + drop(handle); + rx.recv_timeout(Duration::from_millis(500)).unwrap(); +} + +#[test] +fn test_event_stream_shutdown_before_run() { + use std::time::Duration; + use tempdir::TempDir; + + let dir = TempDir::new("test_observe").unwrap(); + let path = dir.path().canonicalize().unwrap(); + + let (stream, handle) = EventStream::new(&[&path], Duration::from_millis(50)); + drop(handle); + stream.run(|_| true); } diff --git a/scoped_pool/src/lib.rs b/scoped_pool/src/lib.rs index 2ad56f2b65..da4d193e07 100644 --- a/scoped_pool/src/lib.rs +++ b/scoped_pool/src/lib.rs @@ -21,11 +21,11 @@ struct Request { } impl Pool { - pub fn new(thread_count: usize, name: &str) -> Self { + pub fn new(thread_count: usize, name: impl AsRef) -> Self { let (req_tx, req_rx) = chan::unbounded(); for i in 0..thread_count { thread::Builder::new() - .name(format!("scoped_pool {} {}", name, i)) + .name(format!("scoped_pool {} {}", name.as_ref(), i)) .spawn({ let req_rx = req_rx.clone(); move || loop { diff --git a/zed/src/worktree.rs b/zed/src/worktree.rs index a54b11ee8e..5d5e7b5742 100644 --- a/zed/src/worktree.rs +++ b/zed/src/worktree.rs @@ -37,8 +37,9 @@ enum ScanState { pub struct Worktree { snapshot: Snapshot, - scanner: Arc, + background_snapshot: Arc>, scan_state: (watch::Sender, watch::Receiver), + _event_stream_handle: fsevent::Handle, poll_scheduled: bool, } @@ -50,25 +51,33 @@ pub struct FileHandle { impl Worktree { pub fn new(path: impl Into>, ctx: &mut ModelContext) -> Self { - let scan_state = smol::channel::unbounded(); + let (scan_state_tx, scan_state_rx) = smol::channel::unbounded(); + let id = ctx.model_id(); let snapshot = Snapshot { - id: ctx.model_id(), + id, path: path.into(), root_inode: None, entries: Default::default(), }; - let scanner = Arc::new(BackgroundScanner::new(snapshot.clone(), scan_state.0)); + let (event_stream, event_stream_handle) = + fsevent::EventStream::new(&[snapshot.path.as_ref()], Duration::from_millis(100)); + + let background_snapshot = Arc::new(Mutex::new(snapshot.clone())); + let tree = Self { snapshot, - scanner, + background_snapshot: background_snapshot.clone(), scan_state: watch::channel_with(ScanState::Scanning), + _event_stream_handle: event_stream_handle, poll_scheduled: false, }; - let scanner = tree.scanner.clone(); - std::thread::spawn(move || scanner.run()); + std::thread::spawn(move || { + let scanner = BackgroundScanner::new(background_snapshot, scan_state_tx, id); + scanner.run(event_stream) + }); - ctx.spawn_stream(scan_state.1, Self::observe_scan_state, |_, _| {}) + ctx.spawn_stream(scan_state_rx, Self::observe_scan_state, |_, _| {}) .detach(); tree @@ -90,7 +99,7 @@ impl Worktree { } fn poll_entries(&mut self, ctx: &mut ModelContext) { - self.snapshot = self.scanner.snapshot(); + self.snapshot = self.background_snapshot.lock().clone(); ctx.notify(); if self.is_scanning() && !self.poll_scheduled { @@ -490,17 +499,17 @@ impl<'a> sum_tree::Dimension<'a, EntrySummary> for FileCount { } struct BackgroundScanner { - snapshot: Mutex, + snapshot: Arc>, notify: Sender, thread_pool: scoped_pool::Pool, } impl BackgroundScanner { - fn new(snapshot: Snapshot, notify: Sender) -> Self { + fn new(snapshot: Arc>, notify: Sender, worktree_id: usize) -> Self { Self { - snapshot: Mutex::new(snapshot), + snapshot, notify, - thread_pool: scoped_pool::Pool::new(16, "background-scanner"), + thread_pool: scoped_pool::Pool::new(16, format!("worktree-{}-scanner", worktree_id)), } } @@ -512,28 +521,7 @@ impl BackgroundScanner { self.snapshot.lock().clone() } - fn run(&self) { - let path = self.snapshot.lock().path.clone(); - - // Create the event stream before we start scanning to ensure we receive events for changes - // that occur in the middle of the scan. - let event_stream = - fsevent::EventStream::new(&[path.as_ref()], Duration::from_millis(100), |events| { - if smol::block_on(self.notify.send(ScanState::Scanning)).is_err() { - return false; - } - - if !self.process_events(events) { - return false; - } - - if smol::block_on(self.notify.send(ScanState::Idle)).is_err() { - return false; - } - - true - }); - + fn run(self, event_stream: fsevent::EventStream) { if smol::block_on(self.notify.send(ScanState::Scanning)).is_err() { return; } @@ -548,7 +536,21 @@ impl BackgroundScanner { return; } - event_stream.run(); + event_stream.run(move |events| { + if smol::block_on(self.notify.send(ScanState::Scanning)).is_err() { + return false; + } + + if !self.process_events(events) { + return false; + } + + if smol::block_on(self.notify.send(ScanState::Idle)).is_err() { + return false; + } + + true + }); } fn scan_dirs(&self) -> io::Result<()> { @@ -945,6 +947,8 @@ mod tests { ); }) }); + + eprintln!("HI"); } #[test] @@ -1045,13 +1049,14 @@ mod tests { let (notify_tx, _notify_rx) = smol::channel::unbounded(); let scanner = BackgroundScanner::new( - Snapshot { + Arc::new(Mutex::new(Snapshot { id: 0, path: root_dir.path().into(), root_inode: None, entries: Default::default(), - }, + })), notify_tx, + 0, ); scanner.scan_dirs().unwrap(); @@ -1073,13 +1078,14 @@ mod tests { let (notify_tx, _notify_rx) = smol::channel::unbounded(); let new_scanner = BackgroundScanner::new( - Snapshot { + Arc::new(Mutex::new(Snapshot { id: 0, path: root_dir.path().into(), root_inode: None, entries: Default::default(), - }, + })), notify_tx, + 1, ); new_scanner.scan_dirs().unwrap(); assert_eq!(scanner.snapshot().to_vec(), new_scanner.snapshot().to_vec());