mirror of
https://github.com/zed-industries/zed.git
synced 2025-02-06 02:37:21 +00:00
Make EventStream interface more flexible
Take a callback instead of an mpsc Sender. The run method blocks and invokes the callback for each batch of events. The caller controls the threading. The callback can return false to terminate the event stream. Co-Authored-By: Nathan Sobo <nathan@zed.dev>
This commit is contained in:
parent
6d3dc85dad
commit
4878bf82ff
1 changed files with 50 additions and 50 deletions
|
@ -8,31 +8,33 @@ use std::{
|
||||||
os::unix::ffi::OsStrExt,
|
os::unix::ffi::OsStrExt,
|
||||||
path::{Path, PathBuf},
|
path::{Path, PathBuf},
|
||||||
slice,
|
slice,
|
||||||
sync::mpsc::Sender,
|
|
||||||
time::Duration,
|
time::Duration,
|
||||||
};
|
};
|
||||||
|
|
||||||
#[derive(Debug)]
|
#[derive(Clone, Debug)]
|
||||||
pub struct Event {
|
pub struct Event {
|
||||||
pub event_id: u64,
|
pub event_id: u64,
|
||||||
pub flags: StreamFlags,
|
pub flags: StreamFlags,
|
||||||
pub path: PathBuf,
|
pub path: PathBuf,
|
||||||
}
|
}
|
||||||
|
|
||||||
pub struct EventStream {
|
pub struct EventStream<F> {
|
||||||
stream: fs::FSEventStreamRef,
|
stream: fs::FSEventStreamRef,
|
||||||
_sender: Box<Sender<Vec<Event>>>,
|
_callback: Box<F>,
|
||||||
}
|
}
|
||||||
|
|
||||||
unsafe impl Send for EventStream {}
|
unsafe impl<F> Send for EventStream<F> {}
|
||||||
|
|
||||||
impl EventStream {
|
impl<F> EventStream<F>
|
||||||
pub fn new(paths: &[&Path], latency: Duration, event_sender: Sender<Vec<Event>>) -> Self {
|
where
|
||||||
|
F: FnMut(&[Event]) -> bool,
|
||||||
|
{
|
||||||
|
pub fn new(paths: &[&Path], latency: Duration, callback: F) -> Self {
|
||||||
unsafe {
|
unsafe {
|
||||||
let sender = Box::new(event_sender);
|
let callback = Box::new(callback);
|
||||||
let stream_context = fs::FSEventStreamContext {
|
let stream_context = fs::FSEventStreamContext {
|
||||||
version: 0,
|
version: 0,
|
||||||
info: sender.as_ref() as *const _ as *mut c_void,
|
info: callback.as_ref() as *const _ as *mut c_void,
|
||||||
retain: None,
|
retain: None,
|
||||||
release: None,
|
release: None,
|
||||||
copy_description: None,
|
copy_description: None,
|
||||||
|
@ -58,7 +60,7 @@ impl EventStream {
|
||||||
|
|
||||||
let stream = fs::FSEventStreamCreate(
|
let stream = fs::FSEventStreamCreate(
|
||||||
cf::kCFAllocatorDefault,
|
cf::kCFAllocatorDefault,
|
||||||
callback,
|
Self::trampoline,
|
||||||
&stream_context,
|
&stream_context,
|
||||||
cf_paths,
|
cf_paths,
|
||||||
fs::kFSEventStreamEventIdSinceNow,
|
fs::kFSEventStreamEventIdSinceNow,
|
||||||
|
@ -71,7 +73,7 @@ impl EventStream {
|
||||||
|
|
||||||
EventStream {
|
EventStream {
|
||||||
stream,
|
stream,
|
||||||
_sender: sender,
|
_callback: callback,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -92,21 +94,20 @@ impl EventStream {
|
||||||
fs::FSEventStreamRelease(self.stream);
|
fs::FSEventStreamRelease(self.stream);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
extern "C" fn callback(
|
extern "C" fn trampoline(
|
||||||
stream_ref: fs::FSEventStreamRef,
|
stream_ref: fs::FSEventStreamRef,
|
||||||
info: *mut ::std::os::raw::c_void,
|
info: *mut ::std::os::raw::c_void,
|
||||||
num: usize, // size_t numEvents
|
num: usize, // size_t numEvents
|
||||||
event_paths: *mut ::std::os::raw::c_void, // void *eventPaths
|
event_paths: *mut ::std::os::raw::c_void, // void *eventPaths
|
||||||
event_flags: *const ::std::os::raw::c_void, // const FSEventStreamEventFlags eventFlags[]
|
event_flags: *const ::std::os::raw::c_void, // const FSEventStreamEventFlags eventFlags[]
|
||||||
event_ids: *const ::std::os::raw::c_void, // const FSEventStreamEventId eventIds[]
|
event_ids: *const ::std::os::raw::c_void, // const FSEventStreamEventId eventIds[]
|
||||||
) {
|
) {
|
||||||
unsafe {
|
unsafe {
|
||||||
let event_paths = event_paths as *const *const ::std::os::raw::c_char;
|
let event_paths = event_paths as *const *const ::std::os::raw::c_char;
|
||||||
let e_ptr = event_flags as *mut u32;
|
let e_ptr = event_flags as *mut u32;
|
||||||
let i_ptr = event_ids as *mut u64;
|
let i_ptr = event_ids as *mut u64;
|
||||||
let sender = (info as *mut Sender<Vec<Event>>).as_mut().unwrap();
|
let callback = (info as *mut F).as_mut().unwrap();
|
||||||
|
|
||||||
let paths = slice::from_raw_parts(event_paths, num);
|
let paths = slice::from_raw_parts(event_paths, num);
|
||||||
let flags = slice::from_raw_parts_mut(e_ptr, num);
|
let flags = slice::from_raw_parts_mut(e_ptr, num);
|
||||||
|
@ -127,11 +128,12 @@ extern "C" fn callback(
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if sender.send(events).is_err() {
|
if !callback(&events) {
|
||||||
fs::FSEventStreamStop(stream_ref);
|
fs::FSEventStreamStop(stream_ref);
|
||||||
cf::CFRunLoopStop(cf::CFRunLoopGetCurrent());
|
cf::CFRunLoopStop(cf::CFRunLoopGetCurrent());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Synchronize with
|
// Synchronize with
|
||||||
|
@ -242,7 +244,7 @@ impl std::fmt::Display for StreamFlags {
|
||||||
}
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn test_observe() {
|
fn test_event_stream() {
|
||||||
use std::{fs, sync::mpsc, time::Duration};
|
use std::{fs, sync::mpsc, time::Duration};
|
||||||
use tempdir::TempDir;
|
use tempdir::TempDir;
|
||||||
|
|
||||||
|
@ -251,7 +253,9 @@ fn test_observe() {
|
||||||
fs::write(path.join("a"), "a contents").unwrap();
|
fs::write(path.join("a"), "a contents").unwrap();
|
||||||
|
|
||||||
let (tx, rx) = mpsc::channel();
|
let (tx, rx) = mpsc::channel();
|
||||||
let stream = EventStream::new(&[&path], Duration::from_millis(50), tx);
|
let stream = EventStream::new(&[&path], Duration::from_millis(50), move |events| {
|
||||||
|
tx.send(events.to_vec()).is_ok()
|
||||||
|
});
|
||||||
std::thread::spawn(move || stream.run());
|
std::thread::spawn(move || stream.run());
|
||||||
|
|
||||||
fs::write(path.join("b"), "b contents").unwrap();
|
fs::write(path.join("b"), "b contents").unwrap();
|
||||||
|
@ -265,8 +269,4 @@ fn test_observe() {
|
||||||
let event = events.last().unwrap();
|
let event = events.last().unwrap();
|
||||||
assert_eq!(event.path, path.join("a"));
|
assert_eq!(event.path, path.join("a"));
|
||||||
assert!(event.flags.contains(StreamFlags::ITEM_REMOVED));
|
assert!(event.flags.contains(StreamFlags::ITEM_REMOVED));
|
||||||
|
|
||||||
let dir2 = TempDir::new("test_observe2").unwrap();
|
|
||||||
fs::rename(path, dir2.path().join("something")).unwrap();
|
|
||||||
let events = rx.recv_timeout(Duration::from_millis(500)).unwrap();
|
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue