From 4878bf82ff57e21fc1c3e64c284a6fcef31d3c45 Mon Sep 17 00:00:00 2001 From: Max Brunsfeld Date: Thu, 15 Apr 2021 17:38:52 -0700 Subject: [PATCH] Make EventStream interface more flexible Take a callback instead of an mpsc Sender. The run method blocks and invokes the callback for each batch of events. The caller controls the threading. The callback can return false to terminate the event stream. Co-Authored-By: Nathan Sobo --- fsevent/src/lib.rs | 100 ++++++++++++++++++++++----------------------- 1 file changed, 50 insertions(+), 50 deletions(-) diff --git a/fsevent/src/lib.rs b/fsevent/src/lib.rs index d4b5fb5b25..8f81b2e016 100644 --- a/fsevent/src/lib.rs +++ b/fsevent/src/lib.rs @@ -8,31 +8,33 @@ use std::{ os::unix::ffi::OsStrExt, path::{Path, PathBuf}, slice, - sync::mpsc::Sender, time::Duration, }; -#[derive(Debug)] +#[derive(Clone, Debug)] pub struct Event { pub event_id: u64, pub flags: StreamFlags, pub path: PathBuf, } -pub struct EventStream { +pub struct EventStream { stream: fs::FSEventStreamRef, - _sender: Box>>, + _callback: Box, } -unsafe impl Send for EventStream {} +unsafe impl Send for EventStream {} -impl EventStream { - pub fn new(paths: &[&Path], latency: Duration, event_sender: Sender>) -> Self { +impl EventStream +where + F: FnMut(&[Event]) -> bool, +{ + pub fn new(paths: &[&Path], latency: Duration, callback: F) -> Self { unsafe { - let sender = Box::new(event_sender); + let callback = Box::new(callback); let stream_context = fs::FSEventStreamContext { version: 0, - info: sender.as_ref() as *const _ as *mut c_void, + info: callback.as_ref() as *const _ as *mut c_void, retain: None, release: None, copy_description: None, @@ -58,7 +60,7 @@ impl EventStream { let stream = fs::FSEventStreamCreate( cf::kCFAllocatorDefault, - callback, + Self::trampoline, &stream_context, cf_paths, fs::kFSEventStreamEventIdSinceNow, @@ -71,7 +73,7 @@ impl EventStream { EventStream { stream, - _sender: sender, + _callback: callback, } } } @@ -92,44 +94,44 @@ impl EventStream { fs::FSEventStreamRelease(self.stream); } } -} -extern "C" fn callback( - stream_ref: fs::FSEventStreamRef, - info: *mut ::std::os::raw::c_void, - num: usize, // size_t numEvents - event_paths: *mut ::std::os::raw::c_void, // void *eventPaths - event_flags: *const ::std::os::raw::c_void, // const FSEventStreamEventFlags eventFlags[] - event_ids: *const ::std::os::raw::c_void, // const FSEventStreamEventId eventIds[] -) { - unsafe { - let event_paths = event_paths as *const *const ::std::os::raw::c_char; - let e_ptr = event_flags as *mut u32; - let i_ptr = event_ids as *mut u64; - let sender = (info as *mut Sender>).as_mut().unwrap(); + extern "C" fn trampoline( + stream_ref: fs::FSEventStreamRef, + info: *mut ::std::os::raw::c_void, + num: usize, // size_t numEvents + event_paths: *mut ::std::os::raw::c_void, // void *eventPaths + event_flags: *const ::std::os::raw::c_void, // const FSEventStreamEventFlags eventFlags[] + event_ids: *const ::std::os::raw::c_void, // const FSEventStreamEventId eventIds[] + ) { + unsafe { + let event_paths = event_paths as *const *const ::std::os::raw::c_char; + let e_ptr = event_flags as *mut u32; + let i_ptr = event_ids as *mut u64; + let callback = (info as *mut F).as_mut().unwrap(); - let paths = slice::from_raw_parts(event_paths, num); - let flags = slice::from_raw_parts_mut(e_ptr, num); - let ids = slice::from_raw_parts_mut(i_ptr, num); + let paths = slice::from_raw_parts(event_paths, num); + let flags = slice::from_raw_parts_mut(e_ptr, num); + let ids = slice::from_raw_parts_mut(i_ptr, num); - let mut events = Vec::with_capacity(num); - for p in 0..num { - let path_c_str = CStr::from_ptr(paths[p]); - let path = PathBuf::from(OsStr::from_bytes(path_c_str.to_bytes())); - if let Some(flag) = StreamFlags::from_bits(flags[p]) { - events.push(Event { - event_id: ids[p], - flags: flag, - path, - }); - } else { - debug_assert!(false, "unknown flag set for fs event: {}", flags[p]); + let mut events = Vec::with_capacity(num); + for p in 0..num { + let path_c_str = CStr::from_ptr(paths[p]); + let path = PathBuf::from(OsStr::from_bytes(path_c_str.to_bytes())); + if let Some(flag) = StreamFlags::from_bits(flags[p]) { + events.push(Event { + event_id: ids[p], + flags: flag, + path, + }); + } else { + debug_assert!(false, "unknown flag set for fs event: {}", flags[p]); + } } - } - if sender.send(events).is_err() { - fs::FSEventStreamStop(stream_ref); - cf::CFRunLoopStop(cf::CFRunLoopGetCurrent()); + if !callback(&events) { + fs::FSEventStreamStop(stream_ref); + cf::CFRunLoopStop(cf::CFRunLoopGetCurrent()); + } } } } @@ -242,7 +244,7 @@ impl std::fmt::Display for StreamFlags { } #[test] -fn test_observe() { +fn test_event_stream() { use std::{fs, sync::mpsc, time::Duration}; use tempdir::TempDir; @@ -251,7 +253,9 @@ fn test_observe() { fs::write(path.join("a"), "a contents").unwrap(); let (tx, rx) = mpsc::channel(); - let stream = EventStream::new(&[&path], Duration::from_millis(50), tx); + let stream = EventStream::new(&[&path], Duration::from_millis(50), move |events| { + tx.send(events.to_vec()).is_ok() + }); std::thread::spawn(move || stream.run()); fs::write(path.join("b"), "b contents").unwrap(); @@ -265,8 +269,4 @@ fn test_observe() { let event = events.last().unwrap(); assert_eq!(event.path, path.join("a")); assert!(event.flags.contains(StreamFlags::ITEM_REMOVED)); - - let dir2 = TempDir::new("test_observe2").unwrap(); - fs::rename(path, dir2.path().join("something")).unwrap(); - let events = rx.recv_timeout(Duration::from_millis(500)).unwrap(); }