diff --git a/crates/fsevent/src/fsevent.rs b/crates/fsevent/src/fsevent.rs index efd498efed..4b05b6599c 100644 --- a/crates/fsevent/src/fsevent.rs +++ b/crates/fsevent/src/fsevent.rs @@ -8,7 +8,7 @@ use std::{ ffi::{c_void, CStr, OsStr}, os::unix::ffi::OsStrExt, path::{Path, PathBuf}, - slice, + ptr, slice, sync::Arc, time::Duration, }; @@ -21,7 +21,6 @@ pub struct Event { } pub struct EventStream { - stream: fs::FSEventStreamRef, lifecycle: Arc>, state: Box, } @@ -31,6 +30,7 @@ struct State { paths: cf::CFMutableArrayRef, callback: Option) -> bool>>, last_valid_event_id: Option, + stream: fs::FSEventStreamRef, } impl Drop for State { @@ -73,11 +73,12 @@ impl EventStream { cf::CFRelease(cf_url); } - let state = Box::new(State { + let mut state = Box::new(State { latency, paths: cf_paths, callback: None, last_valid_event_id: None, + stream: ptr::null_mut(), }); let stream_context = fs::FSEventStreamContext { version: 0, @@ -97,11 +98,11 @@ impl EventStream { | fs::kFSEventStreamCreateFlagNoDefer | fs::kFSEventStreamCreateFlagWatchRoot, ); + state.stream = stream; let lifecycle = Arc::new(Mutex::new(Lifecycle::New)); ( EventStream { - stream, lifecycle: lifecycle.clone(), state, }, @@ -125,10 +126,16 @@ impl EventStream { Lifecycle::Stopped => return, } } - fs::FSEventStreamScheduleWithRunLoop(self.stream, run_loop, cf::kCFRunLoopDefaultMode); - fs::FSEventStreamStart(self.stream); + fs::FSEventStreamScheduleWithRunLoop( + self.state.stream, + run_loop, + cf::kCFRunLoopDefaultMode, + ); + fs::FSEventStreamStart(self.state.stream); cf::CFRunLoopRun(); - fs::FSEventStreamRelease(self.stream); + fs::FSEventStreamStop(self.state.stream); + fs::FSEventStreamInvalidate(self.state.stream); + fs::FSEventStreamRelease(self.state.stream); } } @@ -154,30 +161,83 @@ impl EventStream { let paths = slice::from_raw_parts(event_paths, num); let flags = slice::from_raw_parts_mut(e_ptr, num); let ids = slice::from_raw_parts_mut(i_ptr, num); + let mut stream_restarted = false; - let mut events = Vec::with_capacity(num); - for p in 0..num { - let path_c_str = CStr::from_ptr(paths[p]); - let path = PathBuf::from(OsStr::from_bytes(path_c_str.to_bytes())); - if let Some(flag) = StreamFlags::from_bits(flags[p]) { - if flag.contains(StreamFlags::HISTORY_DONE) { - events.clear(); - } else { - events.push(Event { - event_id: ids[p], - flags: flag, - path, - }); - } - } else { - debug_assert!(false, "unknown flag set for fs event: {}", flags[p]); + // Sometimes FSEvents reports a "dropped" event, an indication that either the kernel + // or our code couldn't keep up with the sheer volume of file-system events that were + // generated. If we observed a valid event before this happens, we'll try to read the + // file-system journal by stopping the current stream and creating a new one starting at + // such event. Otherwise, we'll let invoke the callback with the dropped event, which + // will likely perform a re-scan of one of the root directories. + if flags + .iter() + .copied() + .filter_map(StreamFlags::from_bits) + .any(|flags| { + flags.contains(StreamFlags::USER_DROPPED) + || flags.contains(StreamFlags::KERNEL_DROPPED) + }) + { + if let Some(last_valid_event_id) = state.last_valid_event_id.take() { + fs::FSEventStreamStop(state.stream); + fs::FSEventStreamInvalidate(state.stream); + fs::FSEventStreamRelease(state.stream); + + let stream_context = fs::FSEventStreamContext { + version: 0, + info, + retain: None, + release: None, + copy_description: None, + }; + let stream = fs::FSEventStreamCreate( + cf::kCFAllocatorDefault, + Self::trampoline, + &stream_context, + state.paths, + last_valid_event_id, + state.latency.as_secs_f64(), + fs::kFSEventStreamCreateFlagFileEvents + | fs::kFSEventStreamCreateFlagNoDefer + | fs::kFSEventStreamCreateFlagWatchRoot, + ); + + state.stream = stream; + fs::FSEventStreamScheduleWithRunLoop( + state.stream, + cf::CFRunLoopGetCurrent(), + cf::kCFRunLoopDefaultMode, + ); + fs::FSEventStreamStart(state.stream); + stream_restarted = true; } } - if !events.is_empty() { - if !callback(events) { - fs::FSEventStreamStop(stream_ref); - cf::CFRunLoopStop(cf::CFRunLoopGetCurrent()); + if !stream_restarted { + let mut events = Vec::with_capacity(num); + for p in 0..num { + if let Some(flag) = StreamFlags::from_bits(flags[p]) { + if !flag.contains(StreamFlags::HISTORY_DONE) { + let path_c_str = CStr::from_ptr(paths[p]); + let path = PathBuf::from(OsStr::from_bytes(path_c_str.to_bytes())); + let event = Event { + event_id: ids[p], + flags: flag, + path, + }; + state.last_valid_event_id = Some(event.event_id); + events.push(event); + } + } else { + debug_assert!(false, "unknown flag set for fs event: {}", flags[p]); + } + } + + if !events.is_empty() { + if !callback(events) { + fs::FSEventStreamStop(stream_ref); + cf::CFRunLoopStop(cf::CFRunLoopGetCurrent()); + } } } }