diff --git a/fsevent/src/lib.rs b/fsevent/src/lib.rs index d4b5fb5b25..8f81b2e016 100644 --- a/fsevent/src/lib.rs +++ b/fsevent/src/lib.rs @@ -8,31 +8,33 @@ use std::{ os::unix::ffi::OsStrExt, path::{Path, PathBuf}, slice, - sync::mpsc::Sender, time::Duration, }; -#[derive(Debug)] +#[derive(Clone, Debug)] pub struct Event { pub event_id: u64, pub flags: StreamFlags, pub path: PathBuf, } -pub struct EventStream { +pub struct EventStream { stream: fs::FSEventStreamRef, - _sender: Box>>, + _callback: Box, } -unsafe impl Send for EventStream {} +unsafe impl Send for EventStream {} -impl EventStream { - pub fn new(paths: &[&Path], latency: Duration, event_sender: Sender>) -> Self { +impl EventStream +where + F: FnMut(&[Event]) -> bool, +{ + pub fn new(paths: &[&Path], latency: Duration, callback: F) -> Self { unsafe { - let sender = Box::new(event_sender); + let callback = Box::new(callback); let stream_context = fs::FSEventStreamContext { version: 0, - info: sender.as_ref() as *const _ as *mut c_void, + info: callback.as_ref() as *const _ as *mut c_void, retain: None, release: None, copy_description: None, @@ -58,7 +60,7 @@ impl EventStream { let stream = fs::FSEventStreamCreate( cf::kCFAllocatorDefault, - callback, + Self::trampoline, &stream_context, cf_paths, fs::kFSEventStreamEventIdSinceNow, @@ -71,7 +73,7 @@ impl EventStream { EventStream { stream, - _sender: sender, + _callback: callback, } } } @@ -92,44 +94,44 @@ impl EventStream { fs::FSEventStreamRelease(self.stream); } } -} -extern "C" fn callback( - stream_ref: fs::FSEventStreamRef, - info: *mut ::std::os::raw::c_void, - num: usize, // size_t numEvents - event_paths: *mut ::std::os::raw::c_void, // void *eventPaths - event_flags: *const ::std::os::raw::c_void, // const FSEventStreamEventFlags eventFlags[] - event_ids: *const ::std::os::raw::c_void, // const FSEventStreamEventId eventIds[] -) { - unsafe { - let event_paths = event_paths as *const *const ::std::os::raw::c_char; - let e_ptr = event_flags as *mut u32; - let i_ptr = event_ids as *mut u64; - let sender = (info as *mut Sender>).as_mut().unwrap(); + extern "C" fn trampoline( + stream_ref: fs::FSEventStreamRef, + info: *mut ::std::os::raw::c_void, + num: usize, // size_t numEvents + event_paths: *mut ::std::os::raw::c_void, // void *eventPaths + event_flags: *const ::std::os::raw::c_void, // const FSEventStreamEventFlags eventFlags[] + event_ids: *const ::std::os::raw::c_void, // const FSEventStreamEventId eventIds[] + ) { + unsafe { + let event_paths = event_paths as *const *const ::std::os::raw::c_char; + let e_ptr = event_flags as *mut u32; + let i_ptr = event_ids as *mut u64; + let callback = (info as *mut F).as_mut().unwrap(); - let paths = slice::from_raw_parts(event_paths, num); - let flags = slice::from_raw_parts_mut(e_ptr, num); - let ids = slice::from_raw_parts_mut(i_ptr, num); + let paths = slice::from_raw_parts(event_paths, num); + let flags = slice::from_raw_parts_mut(e_ptr, num); + let ids = slice::from_raw_parts_mut(i_ptr, num); - let mut events = Vec::with_capacity(num); - for p in 0..num { - let path_c_str = CStr::from_ptr(paths[p]); - let path = PathBuf::from(OsStr::from_bytes(path_c_str.to_bytes())); - if let Some(flag) = StreamFlags::from_bits(flags[p]) { - events.push(Event { - event_id: ids[p], - flags: flag, - path, - }); - } else { - debug_assert!(false, "unknown flag set for fs event: {}", flags[p]); + let mut events = Vec::with_capacity(num); + for p in 0..num { + let path_c_str = CStr::from_ptr(paths[p]); + let path = PathBuf::from(OsStr::from_bytes(path_c_str.to_bytes())); + if let Some(flag) = StreamFlags::from_bits(flags[p]) { + events.push(Event { + event_id: ids[p], + flags: flag, + path, + }); + } else { + debug_assert!(false, "unknown flag set for fs event: {}", flags[p]); + } } - } - if sender.send(events).is_err() { - fs::FSEventStreamStop(stream_ref); - cf::CFRunLoopStop(cf::CFRunLoopGetCurrent()); + if !callback(&events) { + fs::FSEventStreamStop(stream_ref); + cf::CFRunLoopStop(cf::CFRunLoopGetCurrent()); + } } } } @@ -242,7 +244,7 @@ impl std::fmt::Display for StreamFlags { } #[test] -fn test_observe() { +fn test_event_stream() { use std::{fs, sync::mpsc, time::Duration}; use tempdir::TempDir; @@ -251,7 +253,9 @@ fn test_observe() { fs::write(path.join("a"), "a contents").unwrap(); let (tx, rx) = mpsc::channel(); - let stream = EventStream::new(&[&path], Duration::from_millis(50), tx); + let stream = EventStream::new(&[&path], Duration::from_millis(50), move |events| { + tx.send(events.to_vec()).is_ok() + }); std::thread::spawn(move || stream.run()); fs::write(path.join("b"), "b contents").unwrap(); @@ -265,8 +269,4 @@ fn test_observe() { let event = events.last().unwrap(); assert_eq!(event.path, path.join("a")); assert!(event.flags.contains(StreamFlags::ITEM_REMOVED)); - - let dir2 = TempDir::new("test_observe2").unwrap(); - fs::rename(path, dir2.path().join("something")).unwrap(); - let events = rx.recv_timeout(Duration::from_millis(500)).unwrap(); }