#![cfg(target_os = "macos")] use bitflags::bitflags; use fsevent_sys::{self as fs, core_foundation as cf}; use parking_lot::Mutex; use std::{ convert::AsRef, ffi::{c_void, CStr, OsStr}, os::unix::ffi::OsStrExt, path::{Path, PathBuf}, ptr, slice, sync::Arc, time::Duration, }; #[derive(Clone, Debug)] pub struct Event { pub event_id: u64, pub flags: StreamFlags, pub path: PathBuf, } pub struct EventStream { lifecycle: Arc>, state: Box, } struct State { latency: Duration, paths: cf::CFMutableArrayRef, callback: Option) -> bool>>, last_valid_event_id: Option, stream: fs::FSEventStreamRef, } impl Drop for State { fn drop(&mut self) { unsafe { cf::CFRelease(self.paths); fs::FSEventStreamStop(self.stream); fs::FSEventStreamInvalidate(self.stream); fs::FSEventStreamRelease(self.stream); } } } enum Lifecycle { New, Running(cf::CFRunLoopRef), Stopped, } pub struct Handle(Arc>); unsafe impl Send for EventStream {} unsafe impl Send for Lifecycle {} impl EventStream { pub fn new(paths: &[&Path], latency: Duration) -> (Self, Handle) { unsafe { let cf_paths = cf::CFArrayCreateMutable(cf::kCFAllocatorDefault, 0, &cf::kCFTypeArrayCallBacks); assert!(!cf_paths.is_null()); for path in paths { let path_bytes = path.as_os_str().as_bytes(); let cf_url = cf::CFURLCreateFromFileSystemRepresentation( cf::kCFAllocatorDefault, path_bytes.as_ptr() as *const i8, path_bytes.len() as cf::CFIndex, false, ); let cf_path = cf::CFURLCopyFileSystemPath(cf_url, cf::kCFURLPOSIXPathStyle); cf::CFArrayAppendValue(cf_paths, cf_path); cf::CFRelease(cf_path); cf::CFRelease(cf_url); } let mut state = Box::new(State { latency, paths: cf_paths, callback: None, last_valid_event_id: None, stream: ptr::null_mut(), }); let stream_context = fs::FSEventStreamContext { version: 0, info: state.as_ref() as *const _ as *mut c_void, retain: None, release: None, copy_description: None, }; let stream = fs::FSEventStreamCreate( cf::kCFAllocatorDefault, Self::trampoline, &stream_context, cf_paths, FSEventsGetCurrentEventId(), latency.as_secs_f64(), fs::kFSEventStreamCreateFlagFileEvents | fs::kFSEventStreamCreateFlagNoDefer | fs::kFSEventStreamCreateFlagWatchRoot, ); state.stream = stream; let lifecycle = Arc::new(Mutex::new(Lifecycle::New)); ( EventStream { lifecycle: lifecycle.clone(), state, }, Handle(lifecycle), ) } } pub fn run(mut self, f: F) where F: FnMut(Vec) -> bool + 'static, { self.state.callback = Some(Box::new(f)); unsafe { let run_loop = cf::CFRunLoopGetCurrent(); { let mut state = self.lifecycle.lock(); match *state { Lifecycle::New => *state = Lifecycle::Running(run_loop), Lifecycle::Running(_) => unreachable!(), Lifecycle::Stopped => return, } } fs::FSEventStreamScheduleWithRunLoop( self.state.stream, run_loop, cf::kCFRunLoopDefaultMode, ); fs::FSEventStreamStart(self.state.stream); cf::CFRunLoopRun(); } } extern "C" fn trampoline( stream_ref: fs::FSEventStreamRef, info: *mut ::std::os::raw::c_void, num: usize, // size_t numEvents event_paths: *mut ::std::os::raw::c_void, // void *eventPaths event_flags: *const ::std::os::raw::c_void, // const FSEventStreamEventFlags eventFlags[] event_ids: *const ::std::os::raw::c_void, // const FSEventStreamEventId eventIds[] ) { unsafe { let event_paths = event_paths as *const *const ::std::os::raw::c_char; let e_ptr = event_flags as *mut u32; let i_ptr = event_ids as *mut u64; let state = (info as *mut State).as_mut().unwrap(); let callback = if let Some(callback) = state.callback.as_mut() { callback } else { return; }; let paths = slice::from_raw_parts(event_paths, num); let flags = slice::from_raw_parts_mut(e_ptr, num); let ids = slice::from_raw_parts_mut(i_ptr, num); let mut stream_restarted = false; // Sometimes FSEvents reports a "dropped" event, an indication that either the kernel // or our code couldn't keep up with the sheer volume of file-system events that were // generated. If we observed a valid event before this happens, we'll try to read the // file-system journal by stopping the current stream and creating a new one starting at // such event. Otherwise, we'll let invoke the callback with the dropped event, which // will likely perform a re-scan of one of the root directories. if flags .iter() .copied() .filter_map(StreamFlags::from_bits) .any(|flags| { flags.contains(StreamFlags::USER_DROPPED) || flags.contains(StreamFlags::KERNEL_DROPPED) }) { if let Some(last_valid_event_id) = state.last_valid_event_id.take() { fs::FSEventStreamStop(state.stream); fs::FSEventStreamInvalidate(state.stream); fs::FSEventStreamRelease(state.stream); let stream_context = fs::FSEventStreamContext { version: 0, info, retain: None, release: None, copy_description: None, }; let stream = fs::FSEventStreamCreate( cf::kCFAllocatorDefault, Self::trampoline, &stream_context, state.paths, last_valid_event_id, state.latency.as_secs_f64(), fs::kFSEventStreamCreateFlagFileEvents | fs::kFSEventStreamCreateFlagNoDefer | fs::kFSEventStreamCreateFlagWatchRoot, ); state.stream = stream; fs::FSEventStreamScheduleWithRunLoop( state.stream, cf::CFRunLoopGetCurrent(), cf::kCFRunLoopDefaultMode, ); fs::FSEventStreamStart(state.stream); stream_restarted = true; } } if !stream_restarted { let mut events = Vec::with_capacity(num); for p in 0..num { if let Some(flag) = StreamFlags::from_bits(flags[p]) { if !flag.contains(StreamFlags::HISTORY_DONE) { let path_c_str = CStr::from_ptr(paths[p]); let path = PathBuf::from(OsStr::from_bytes(path_c_str.to_bytes())); let event = Event { event_id: ids[p], flags: flag, path, }; state.last_valid_event_id = Some(event.event_id); events.push(event); } } else { debug_assert!(false, "unknown flag set for fs event: {}", flags[p]); } } if !events.is_empty() && !callback(events) { fs::FSEventStreamStop(stream_ref); cf::CFRunLoopStop(cf::CFRunLoopGetCurrent()); } } } } } impl Drop for Handle { fn drop(&mut self) { let mut state = self.0.lock(); if let Lifecycle::Running(run_loop) = *state { unsafe { cf::CFRunLoopStop(run_loop); } } *state = Lifecycle::Stopped; } } // Synchronize with // /System/Library/Frameworks/CoreServices.framework/Versions/A/Frameworks/FSEvents.framework/Versions/A/Headers/FSEvents.h bitflags! { #[repr(C)] pub struct StreamFlags: u32 { const NONE = 0x00000000; const MUST_SCAN_SUBDIRS = 0x00000001; const USER_DROPPED = 0x00000002; const KERNEL_DROPPED = 0x00000004; const IDS_WRAPPED = 0x00000008; const HISTORY_DONE = 0x00000010; const ROOT_CHANGED = 0x00000020; const MOUNT = 0x00000040; const UNMOUNT = 0x00000080; const ITEM_CREATED = 0x00000100; const ITEM_REMOVED = 0x00000200; const INODE_META_MOD = 0x00000400; const ITEM_RENAMED = 0x00000800; const ITEM_MODIFIED = 0x00001000; const FINDER_INFO_MOD = 0x00002000; const ITEM_CHANGE_OWNER = 0x00004000; const ITEM_XATTR_MOD = 0x00008000; const IS_FILE = 0x00010000; const IS_DIR = 0x00020000; const IS_SYMLINK = 0x00040000; const OWN_EVENT = 0x00080000; const IS_HARDLINK = 0x00100000; const IS_LAST_HARDLINK = 0x00200000; const ITEM_CLONED = 0x400000; } } impl std::fmt::Display for StreamFlags { fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { if self.contains(StreamFlags::MUST_SCAN_SUBDIRS) { let _d = write!(f, "MUST_SCAN_SUBDIRS "); } if self.contains(StreamFlags::USER_DROPPED) { let _d = write!(f, "USER_DROPPED "); } if self.contains(StreamFlags::KERNEL_DROPPED) { let _d = write!(f, "KERNEL_DROPPED "); } if self.contains(StreamFlags::IDS_WRAPPED) { let _d = write!(f, "IDS_WRAPPED "); } if self.contains(StreamFlags::HISTORY_DONE) { let _d = write!(f, "HISTORY_DONE "); } if self.contains(StreamFlags::ROOT_CHANGED) { let _d = write!(f, "ROOT_CHANGED "); } if self.contains(StreamFlags::MOUNT) { let _d = write!(f, "MOUNT "); } if self.contains(StreamFlags::UNMOUNT) { let _d = write!(f, "UNMOUNT "); } if self.contains(StreamFlags::ITEM_CREATED) { let _d = write!(f, "ITEM_CREATED "); } if self.contains(StreamFlags::ITEM_REMOVED) { let _d = write!(f, "ITEM_REMOVED "); } if self.contains(StreamFlags::INODE_META_MOD) { let _d = write!(f, "INODE_META_MOD "); } if self.contains(StreamFlags::ITEM_RENAMED) { let _d = write!(f, "ITEM_RENAMED "); } if self.contains(StreamFlags::ITEM_MODIFIED) { let _d = write!(f, "ITEM_MODIFIED "); } if self.contains(StreamFlags::FINDER_INFO_MOD) { let _d = write!(f, "FINDER_INFO_MOD "); } if self.contains(StreamFlags::ITEM_CHANGE_OWNER) { let _d = write!(f, "ITEM_CHANGE_OWNER "); } if self.contains(StreamFlags::ITEM_XATTR_MOD) { let _d = write!(f, "ITEM_XATTR_MOD "); } if self.contains(StreamFlags::IS_FILE) { let _d = write!(f, "IS_FILE "); } if self.contains(StreamFlags::IS_DIR) { let _d = write!(f, "IS_DIR "); } if self.contains(StreamFlags::IS_SYMLINK) { let _d = write!(f, "IS_SYMLINK "); } if self.contains(StreamFlags::OWN_EVENT) { let _d = write!(f, "OWN_EVENT "); } if self.contains(StreamFlags::IS_LAST_HARDLINK) { let _d = write!(f, "IS_LAST_HARDLINK "); } if self.contains(StreamFlags::IS_HARDLINK) { let _d = write!(f, "IS_HARDLINK "); } if self.contains(StreamFlags::ITEM_CLONED) { let _d = write!(f, "ITEM_CLONED "); } write!(f, "") } } #[link(name = "CoreServices", kind = "framework")] extern "C" { pub fn FSEventsGetCurrentEventId() -> u64; } #[cfg(test)] mod tests { use super::*; use std::{fs, sync::mpsc, thread, time::Duration}; use tempdir::TempDir; #[test] fn test_event_stream_simple() { for _ in 0..3 { let dir = TempDir::new("test-event-stream").unwrap(); let path = dir.path().canonicalize().unwrap(); for i in 0..10 { fs::write(path.join(format!("existing-file-{}", i)), "").unwrap(); } flush_historical_events(); let (tx, rx) = mpsc::channel(); let (stream, handle) = EventStream::new(&[&path], Duration::from_millis(50)); thread::spawn(move || stream.run(move |events| tx.send(events.to_vec()).is_ok())); fs::write(path.join("new-file"), "").unwrap(); let events = rx.recv_timeout(Duration::from_secs(2)).unwrap(); let event = events.last().unwrap(); assert_eq!(event.path, path.join("new-file")); assert!(event.flags.contains(StreamFlags::ITEM_CREATED)); fs::remove_file(path.join("existing-file-5")).unwrap(); let events = rx.recv_timeout(Duration::from_secs(2)).unwrap(); let event = events.last().unwrap(); assert_eq!(event.path, path.join("existing-file-5")); assert!(event.flags.contains(StreamFlags::ITEM_REMOVED)); drop(handle); } } #[test] fn test_event_stream_delayed_start() { for _ in 0..3 { let dir = TempDir::new("test-event-stream").unwrap(); let path = dir.path().canonicalize().unwrap(); for i in 0..10 { fs::write(path.join(format!("existing-file-{}", i)), "").unwrap(); } flush_historical_events(); let (tx, rx) = mpsc::channel(); let (stream, handle) = EventStream::new(&[&path], Duration::from_millis(50)); // Delay the call to `run` in order to make sure we don't miss any events that occur // between creating the `EventStream` and calling `run`. thread::spawn(move || { thread::sleep(Duration::from_millis(100)); stream.run(move |events| tx.send(events.to_vec()).is_ok()) }); fs::write(path.join("new-file"), "").unwrap(); let events = rx.recv_timeout(Duration::from_secs(2)).unwrap(); let event = events.last().unwrap(); assert_eq!(event.path, path.join("new-file")); assert!(event.flags.contains(StreamFlags::ITEM_CREATED)); fs::remove_file(path.join("existing-file-5")).unwrap(); let events = rx.recv_timeout(Duration::from_secs(2)).unwrap(); let event = events.last().unwrap(); assert_eq!(event.path, path.join("existing-file-5")); assert!(event.flags.contains(StreamFlags::ITEM_REMOVED)); drop(handle); } } #[test] fn test_event_stream_shutdown_by_dropping_handle() { let dir = TempDir::new("test-event-stream").unwrap(); let path = dir.path().canonicalize().unwrap(); flush_historical_events(); let (tx, rx) = mpsc::channel(); let (stream, handle) = EventStream::new(&[&path], Duration::from_millis(50)); thread::spawn(move || { stream.run({ let tx = tx.clone(); move |_| { tx.send("running").unwrap(); true } }); tx.send("stopped").unwrap(); }); fs::write(path.join("new-file"), "").unwrap(); assert_eq!(rx.recv_timeout(Duration::from_secs(2)).unwrap(), "running"); // Dropping the handle causes `EventStream::run` to return. drop(handle); assert_eq!(rx.recv_timeout(Duration::from_secs(2)).unwrap(), "stopped"); } #[test] fn test_event_stream_shutdown_before_run() { let dir = TempDir::new("test-event-stream").unwrap(); let path = dir.path().canonicalize().unwrap(); let (stream, handle) = EventStream::new(&[&path], Duration::from_millis(50)); drop(handle); // This returns immediately because the handle was already dropped. stream.run(|_| true); } fn flush_historical_events() { let duration = if std::env::var("CI").is_ok() { Duration::from_secs(2) } else { Duration::from_millis(500) }; thread::sleep(duration); } }