From 21cfb75a73bbbb28b284b89332fa4956c7f37b00 Mon Sep 17 00:00:00 2001 From: Max Brunsfeld Date: Wed, 28 Apr 2021 12:42:34 -0700 Subject: [PATCH] Flush redundant fs events in worktree test Co-Authored-By: Antonio Scandurra --- fsevent/src/lib.rs | 177 +++++++++++++++++++++++++++----------------- gpui/src/app.rs | 15 +++- zed/src/worktree.rs | 32 +++++++- 3 files changed, 149 insertions(+), 75 deletions(-) diff --git a/fsevent/src/lib.rs b/fsevent/src/lib.rs index 69ea21cdb0..07a43a43b9 100644 --- a/fsevent/src/lib.rs +++ b/fsevent/src/lib.rs @@ -82,15 +82,6 @@ impl EventStream { ); cf::CFRelease(cf_paths); - fs::FSEventStreamScheduleWithRunLoop( - stream, - cf::CFRunLoopGetCurrent(), - cf::kCFRunLoopDefaultMode, - ); - fs::FSEventStreamStart(stream); - fs::FSEventStreamFlushSync(stream); - fs::FSEventStreamStop(stream); - let state = Arc::new(Mutex::new(Lifecycle::New)); ( @@ -302,70 +293,118 @@ extern "C" { pub fn FSEventsGetCurrentEventId() -> u64; } -#[test] -fn test_event_stream() { - use std::{fs, sync::mpsc, time::Duration}; +#[cfg(test)] +mod tests { + use super::*; + use std::{fs, sync::mpsc, thread, time::Duration}; use tempdir::TempDir; - let dir = TempDir::new("test_observe").unwrap(); - let path = dir.path().canonicalize().unwrap(); - fs::write(path.join("a"), "a contents").unwrap(); - - let (tx, rx) = mpsc::channel(); - let (stream, handle) = EventStream::new(&[&path], Duration::from_millis(50)); - std::thread::spawn(move || stream.run(move |events| tx.send(events.to_vec()).is_ok())); - - fs::write(path.join("b"), "b contents").unwrap(); - let events = rx.recv_timeout(Duration::from_millis(500)).unwrap(); - let event = events.last().unwrap(); - assert_eq!(event.path, path.join("b")); - assert!(event.flags.contains(StreamFlags::ITEM_CREATED)); - - fs::remove_file(path.join("a")).unwrap(); - let events = rx.recv_timeout(Duration::from_millis(500)).unwrap(); - let event = events.last().unwrap(); - assert_eq!(event.path, path.join("a")); - assert!(event.flags.contains(StreamFlags::ITEM_REMOVED)); - drop(handle); -} - -#[test] -fn test_event_stream_shutdown() { - use std::{fs, sync::mpsc, time::Duration}; - use tempdir::TempDir; - - let dir = TempDir::new("test_observe").unwrap(); - let path = dir.path().canonicalize().unwrap(); - - let (tx, rx) = mpsc::channel(); - let (stream, handle) = EventStream::new(&[&path], Duration::from_millis(50)); - std::thread::spawn(move || { - stream.run({ - let tx = tx.clone(); - move |_| { - tx.send(()).unwrap(); - true + #[test] + fn test_event_stream_simple() { + for _ in 0..3 { + let dir = TempDir::new("test-event-stream").unwrap(); + let path = dir.path().canonicalize().unwrap(); + for i in 0..10 { + fs::write(path.join(format!("existing-file-{}", i)), "").unwrap(); } + + let (tx, rx) = mpsc::channel(); + let (stream, handle) = EventStream::new(&[&path], Duration::from_millis(50)); + thread::spawn(move || stream.run(move |events| tx.send(events.to_vec()).is_ok())); + + // Flush any historical events. + rx.recv_timeout(Duration::from_millis(500)).ok(); + + fs::write(path.join("new-file"), "").unwrap(); + let events = rx.recv_timeout(Duration::from_millis(500)).unwrap(); + let event = events.last().unwrap(); + assert_eq!(event.path, path.join("new-file")); + assert!(event.flags.contains(StreamFlags::ITEM_CREATED)); + + fs::remove_file(path.join("existing-file-5")).unwrap(); + let events = rx.recv_timeout(Duration::from_millis(500)).unwrap(); + let event = events.last().unwrap(); + assert_eq!(event.path, path.join("existing-file-5")); + assert!(event.flags.contains(StreamFlags::ITEM_REMOVED)); + drop(handle); + } + } + + #[test] + fn test_event_stream_delayed_start() { + for _ in 0..3 { + let dir = TempDir::new("test-event-stream").unwrap(); + let path = dir.path().canonicalize().unwrap(); + for i in 0..10 { + fs::write(path.join(format!("existing-file-{}", i)), "").unwrap(); + } + + let (tx, rx) = mpsc::channel(); + let (stream, handle) = EventStream::new(&[&path], Duration::from_millis(50)); + + // Delay the call to `run` in order to make sure we don't miss any events that occur + // between creating the `EventStream` and calling `run`. + thread::spawn(move || { + thread::sleep(Duration::from_millis(250)); + stream.run(move |events| tx.send(events.to_vec()).is_ok()) + }); + + fs::write(path.join("new-file"), "").unwrap(); + let events = rx.recv_timeout(Duration::from_millis(500)).unwrap(); + let event = events.last().unwrap(); + assert_eq!(event.path, path.join("new-file")); + assert!(event.flags.contains(StreamFlags::ITEM_CREATED)); + + fs::remove_file(path.join("existing-file-5")).unwrap(); + let events = rx.recv_timeout(Duration::from_millis(500)).unwrap(); + let event = events.last().unwrap(); + assert_eq!(event.path, path.join("existing-file-5")); + assert!(event.flags.contains(StreamFlags::ITEM_REMOVED)); + drop(handle); + } + } + + #[test] + fn test_event_stream_shutdown_by_dropping_handle() { + let dir = TempDir::new("test-event-stream").unwrap(); + let path = dir.path().canonicalize().unwrap(); + + let (tx, rx) = mpsc::channel(); + let (stream, handle) = EventStream::new(&[&path], Duration::from_millis(50)); + thread::spawn(move || { + stream.run({ + let tx = tx.clone(); + move |_| { + tx.send("running").unwrap(); + true + } + }); + tx.send("stopped").unwrap(); }); - tx.send(()).unwrap(); - }); - fs::write(path.join("b"), "b contents").unwrap(); - rx.recv_timeout(Duration::from_millis(500)).unwrap(); + fs::write(path.join("new-file"), "").unwrap(); + assert_eq!( + rx.recv_timeout(Duration::from_millis(500)).unwrap(), + "running" + ); - drop(handle); - rx.recv_timeout(Duration::from_millis(500)).unwrap(); -} - -#[test] -fn test_event_stream_shutdown_before_run() { - use std::time::Duration; - use tempdir::TempDir; - - let dir = TempDir::new("test_observe").unwrap(); - let path = dir.path().canonicalize().unwrap(); - - let (stream, handle) = EventStream::new(&[&path], Duration::from_millis(50)); - drop(handle); - stream.run(|_| true); + // Dropping the handle causes `EventStream::run` to return. + drop(handle); + assert_eq!( + rx.recv_timeout(Duration::from_millis(500)).unwrap(), + "stopped" + ); + } + + #[test] + fn test_event_stream_shutdown_before_run() { + let dir = TempDir::new("test-event-stream").unwrap(); + let path = dir.path().canonicalize().unwrap(); + + let (stream, handle) = EventStream::new(&[&path], Duration::from_millis(50)); + drop(handle); + + // This returns immediately because the handle was already dropped. + stream.run(|_| true); + } } diff --git a/gpui/src/app.rs b/gpui/src/app.rs index 43e422ac12..3e7b3cb4ad 100644 --- a/gpui/src/app.rs +++ b/gpui/src/app.rs @@ -2003,8 +2003,17 @@ impl ModelHandle { pub fn condition( &self, ctx: &TestAppContext, - mut predicate: impl 'static + FnMut(&T, &AppContext) -> bool, - ) -> impl 'static + Future { + predicate: impl FnMut(&T, &AppContext) -> bool, + ) -> impl Future { + self.condition_with_duration(Duration::from_millis(100), ctx, predicate) + } + + pub fn condition_with_duration( + &self, + duration: Duration, + ctx: &TestAppContext, + mut predicate: impl FnMut(&T, &AppContext) -> bool, + ) -> impl Future { let mut ctx = ctx.0.borrow_mut(); let tx = ctx .async_observations @@ -2015,7 +2024,7 @@ impl ModelHandle { let handle = self.downgrade(); async move { - timeout(Duration::from_millis(200), async move { + timeout(duration, async move { loop { { let ctx = ctx.borrow(); diff --git a/zed/src/worktree.rs b/zed/src/worktree.rs index e475e3e733..f5965ec1d5 100644 --- a/zed/src/worktree.rs +++ b/zed/src/worktree.rs @@ -1235,7 +1235,7 @@ mod tests { use crate::editor::Buffer; use crate::test::*; use anyhow::Result; - use gpui::App; + use gpui::{App, TestAppContext}; use rand::prelude::*; use serde_json::json; use std::env; @@ -1345,7 +1345,7 @@ mod tests { let tree = app.add_model(|ctx| Worktree::new(dir.path(), ctx)); app.read(|ctx| tree.read(ctx).scan_complete()).await; - app.read(|ctx| assert_eq!(tree.read(ctx).file_count(), 5)); + flush_fs_events(&tree, &app).await; let (file2, file3, file4, file5) = app.read(|ctx| { ( @@ -1358,8 +1358,8 @@ mod tests { std::fs::rename(dir.path().join("a/file3"), dir.path().join("b/c/file3")).unwrap(); std::fs::remove_file(dir.path().join("b/c/file5")).unwrap(); - std::fs::rename(dir.path().join("a/file2"), dir.path().join("a/file2.new")).unwrap(); std::fs::rename(dir.path().join("b/c"), dir.path().join("d")).unwrap(); + std::fs::rename(dir.path().join("a/file2"), dir.path().join("a/file2.new")).unwrap(); app.read(|ctx| tree.read(ctx).next_scan_complete()).await; app.read(|ctx| { @@ -1411,6 +1411,7 @@ mod tests { let tree = app.add_model(|ctx| Worktree::new(dir.path(), ctx)); app.read(|ctx| tree.read(ctx).scan_complete()).await; + flush_fs_events(&tree, &app).await; app.read(|ctx| { let tree = tree.read(ctx); let tracked = tree.entry_for_path("tracked-dir/tracked-file1").unwrap(); @@ -1717,4 +1718,29 @@ mod tests { paths } } + + // When the worktree's FS event stream sometimes delivers "redundant" events for FS changes that + // occurred before the worktree was constructed. These events can cause the worktree to perfrom + // extra directory scans, and emit extra scan-state notifications. + // + // This function mutates the worktree's directory and waits for those mutations to be picked up, + // to ensure that all redundant FS events have already been processed. + async fn flush_fs_events(tree: &ModelHandle, app: &TestAppContext) { + let filename = "fs-event-sentinel"; + let root_path = app.read(|ctx| tree.read(ctx).abs_path.clone()); + + fs::write(root_path.join(filename), "").unwrap(); + tree.condition_with_duration(Duration::from_secs(5), &app, |tree, _| { + tree.entry_for_path(filename).is_some() + }) + .await; + + fs::remove_file(root_path.join(filename)).unwrap(); + tree.condition_with_duration(Duration::from_secs(5), &app, |tree, _| { + tree.entry_for_path(filename).is_none() + }) + .await; + + app.read(|ctx| tree.read(ctx).scan_complete()).await; + } }