Emit a diff event when worktree's snapshot is updated

Co-Authored-By: Nathan Sobo <nathan@zed.dev>
This commit is contained in:
Max Brunsfeld 2021-06-21 17:07:56 -07:00
parent 8ae5c0d7d6
commit cabf6b1f58
4 changed files with 186 additions and 71 deletions

View file

@ -2131,9 +2131,9 @@ impl<T: Entity> ModelHandle<T> {
let cx = cx.weak_self.as_ref().unwrap().upgrade().unwrap();
let handle = self.downgrade();
let duration = if std::env::var("CI").is_ok() {
Duration::from_secs(2)
Duration::from_secs(5)
} else {
Duration::from_millis(500)
Duration::from_secs(1)
};
async move {

View file

@ -95,8 +95,9 @@ message Entry {
bool is_dir = 1;
string path = 2;
uint64 inode = 3;
bool is_symlink = 4;
bool is_ignored = 5;
Timestamp mtime = 4;
bool is_symlink = 5;
bool is_ignored = 6;
}
message Buffer {
@ -121,3 +122,8 @@ message Operation {
uint32 timestamp = 2;
}
}
message Timestamp {
uint64 seconds = 1;
uint32 nanos = 2;
}

View file

@ -1,6 +1,10 @@
use futures::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt as _};
use prost::Message;
use std::{convert::TryInto, io};
use std::{
convert::TryInto,
io,
time::{Duration, SystemTime, UNIX_EPOCH},
};
include!(concat!(env!("OUT_DIR"), "/zed.messages.rs"));
@ -122,6 +126,24 @@ where
}
}
impl Into<SystemTime> for Timestamp {
fn into(self) -> SystemTime {
UNIX_EPOCH
.checked_add(Duration::new(self.seconds, self.nanos))
.unwrap()
}
}
impl From<SystemTime> for Timestamp {
fn from(time: SystemTime) -> Self {
let duration = time.duration_since(UNIX_EPOCH).unwrap();
Self {
seconds: duration.as_secs(),
nanos: duration.subsec_nanos(),
}
}
}
#[cfg(test)]
mod tests {
use super::*;

View file

@ -35,7 +35,7 @@ use std::{
atomic::{AtomicU64, Ordering::SeqCst},
Arc, Weak,
},
time::{Duration, UNIX_EPOCH},
time::{Duration, SystemTime, UNIX_EPOCH},
};
lazy_static! {
@ -44,7 +44,7 @@ lazy_static! {
#[derive(Clone, Debug)]
enum ScanState {
Idle,
Idle(Option<Diff>),
Scanning,
Err(Arc<io::Error>),
}
@ -55,7 +55,7 @@ pub enum Worktree {
}
impl Entity for Worktree {
type Event = ();
type Event = Diff;
}
impl Worktree {
@ -104,7 +104,7 @@ impl Worktree {
) -> impl Future<Output = Result<()>> {
match self {
Worktree::Local(worktree) => worktree.save(path, content, cx),
Worktree::Remote(worktree) => todo!(),
Worktree::Remote(_) => todo!(),
}
}
}
@ -127,7 +127,7 @@ pub struct LocalWorktree {
next_handle_id: AtomicU64,
scan_state: (watch::Sender<ScanState>, watch::Receiver<ScanState>),
_event_stream_handle: fsevent::Handle,
polling_snapshot: bool,
poll_scheduled: bool,
rpc: Option<rpc::Client>,
}
@ -191,7 +191,7 @@ impl LocalWorktree {
next_handle_id: Default::default(),
scan_state: watch::channel_with(ScanState::Scanning),
_event_stream_handle: event_stream_handle,
polling_snapshot: false,
poll_scheduled: false,
rpc: None,
};
@ -240,45 +240,32 @@ impl LocalWorktree {
}
}
fn observe_scan_state(&mut self, scan_state: ScanState, cx: &mut ModelContext<Worktree>) {
let _ = self.scan_state.0.blocking_send(scan_state);
if !self.polling_snapshot {
self.poll_snapshot(cx);
fn observe_scan_state(&mut self, mut scan_state: ScanState, cx: &mut ModelContext<Worktree>) {
if let ScanState::Idle(diff) = &mut scan_state {
if let Some(diff) = diff.take() {
cx.emit(diff);
}
}
let _ = self.scan_state.0.blocking_send(scan_state);
self.poll_snapshot(cx);
}
fn poll_snapshot(&mut self, cx: &mut ModelContext<Worktree>) {
let poll_again = self.is_scanning();
if poll_again {
self.polling_snapshot = true;
}
self.snapshot = self.background_snapshot.lock().clone();
cx.notify();
// let prev_snapshot = self.snapshot.clone();
let background_snapshot = self.background_snapshot.clone();
let next_snapshot = cx.background_executor().spawn(async move {
let next_snapshot = background_snapshot.lock().clone();
// TODO: Diff with next and prev snapshots
next_snapshot
});
cx.spawn(|this, mut cx| async move {
let next_snapshot = next_snapshot.await;
this.update(&mut cx, |this, cx| {
let worktree = this.as_local_mut().unwrap();
worktree.snapshot = next_snapshot;
cx.notify();
});
if poll_again {
if self.is_scanning() && !self.poll_scheduled {
cx.spawn(|this, mut cx| async move {
smol::Timer::after(Duration::from_millis(100)).await;
this.update(&mut cx, |this, cx| {
let worktree = this.as_local_mut().unwrap();
worktree.polling_snapshot = false;
worktree.poll_scheduled = false;
worktree.poll_snapshot(cx);
})
}
})
.detach();
})
.detach();
self.poll_scheduled = true;
}
}
fn is_scanning(&self) -> bool {
@ -353,6 +340,7 @@ impl LocalWorktree {
is_dir: entry.is_dir(),
path: entry.path.to_string_lossy().to_string(),
inode: entry.inode,
mtime: Some(entry.mtime.into()),
is_symlink: entry.is_symlink,
is_ignored: entry.is_ignored,
})
@ -419,7 +407,7 @@ impl RemoteWorktree {
.collect();
let mut entries = SumTree::new();
entries.extend(
worktree.entries.into_iter().map(|entry| {
worktree.entries.into_iter().filter_map(|entry| {
let kind = if entry.is_dir {
EntryKind::Dir
} else {
@ -427,12 +415,18 @@ impl RemoteWorktree {
char_bag.extend(entry.path.chars().map(|c| c.to_ascii_lowercase()));
EntryKind::File(char_bag)
};
Entry {
kind,
path: Path::new(&entry.path).into(),
inode: entry.inode,
is_symlink: entry.is_symlink,
is_ignored: entry.is_ignored,
if let Some(mtime) = entry.mtime {
Some(Entry {
kind,
path: Path::new(&entry.path).into(),
inode: entry.inode,
mtime: mtime.into(),
is_symlink: entry.is_symlink,
is_ignored: entry.is_ignored,
})
} else {
log::warn!("missing mtime in worktree entry message");
None
}
}),
&(),
@ -627,34 +621,37 @@ impl Snapshot {
let mut old = old.entries.cursor::<(), ()>().peekable();
let mut diff = Diff::default();
let mut removed_inodes = HashMap::new();
let mut added_inodes = HashMap::new();
let mut removed = HashMap::new();
let mut added = HashMap::new();
loop {
match (new.peek(), old.peek()) {
match (new.peek().copied(), old.peek().copied()) {
(Some(new_entry), Some(old_entry)) => match new_entry.path.cmp(&old_entry.path) {
cmp::Ordering::Equal => {
if new_entry.mtime > old_entry.mtime {
diff.modified.insert(new_entry.path.clone());
}
new.next();
old.next();
}
cmp::Ordering::Less => {
added_inodes.insert(new_entry.inode, new_entry.path.clone());
added.insert(new_entry.inode, new_entry);
diff.added.insert(new_entry.path.clone());
new.next();
}
cmp::Ordering::Greater => {
removed_inodes.insert(old_entry.path.clone(), old_entry.inode);
removed.insert(&old_entry.path, old_entry);
diff.removed.insert(old_entry.path.clone());
old.next();
}
},
(Some(new_entry), None) => {
added_inodes.insert(new_entry.inode, new_entry.path.clone());
added.insert(new_entry.inode, new_entry);
diff.added.insert(new_entry.path.clone());
new.next();
}
(None, Some(old_entry)) => {
removed_inodes.insert(old_entry.path.clone(), old_entry.inode);
removed.insert(&old_entry.path, old_entry);
diff.removed.insert(old_entry.path.clone());
old.next();
}
@ -662,11 +659,15 @@ impl Snapshot {
}
}
for (removed_path, inode) in removed_inodes {
if let Some(added_path) = added_inodes.remove(&inode) {
diff.removed.remove(&removed_path);
diff.added.remove(&added_path);
diff.moved.insert(removed_path, added_path);
for (removed_path, removed_entry) in removed {
if let Some(added_entry) = added.remove(&removed_entry.inode) {
diff.removed.remove(removed_path);
diff.added.remove(&added_entry.path);
diff.moved
.insert(removed_path.clone(), added_entry.path.clone());
if added_entry.mtime > removed_entry.mtime {
diff.modified.insert(added_entry.path.clone());
}
}
}
@ -686,12 +687,12 @@ impl fmt::Debug for Snapshot {
}
}
#[derive(Default)]
struct Diff {
moved: HashMap<Arc<Path>, Arc<Path>>,
removed: HashSet<Arc<Path>>,
added: HashSet<Arc<Path>>,
modified: HashSet<Arc<Path>>,
#[derive(Clone, Default, Debug, PartialEq)]
pub struct Diff {
pub moved: HashMap<Arc<Path>, Arc<Path>>,
pub removed: HashSet<Arc<Path>>,
pub added: HashSet<Arc<Path>>,
pub modified: HashSet<Arc<Path>>,
}
impl FileHandle {
@ -826,6 +827,7 @@ pub struct Entry {
kind: EntryKind,
path: Arc<Path>,
inode: u64,
mtime: SystemTime,
is_symlink: bool,
is_ignored: bool,
}
@ -1036,7 +1038,7 @@ impl BackgroundScanner {
}
}
if smol::block_on(self.notify.send(ScanState::Idle)).is_err() {
if smol::block_on(self.notify.send(ScanState::Idle(None))).is_err() {
return;
}
@ -1045,11 +1047,13 @@ impl BackgroundScanner {
return false;
}
let prev_snapshot = self.snapshot.lock().clone();
if !self.process_events(events) {
return false;
}
if smol::block_on(self.notify.send(ScanState::Idle)).is_err() {
let diff = self.snapshot.lock().diff(&prev_snapshot);
if smol::block_on(self.notify.send(ScanState::Idle(Some(diff)))).is_err() {
return false;
}
@ -1066,6 +1070,7 @@ impl BackgroundScanner {
let inode = metadata.ino();
let is_symlink = fs::symlink_metadata(&abs_path)?.file_type().is_symlink();
let is_dir = metadata.file_type().is_dir();
let mtime = metadata.modified()?;
// After determining whether the root entry is a file or a directory, populate the
// snapshot's "root name", which will be used for the purpose of fuzzy matching.
@ -1083,6 +1088,7 @@ impl BackgroundScanner {
kind: EntryKind::PendingDir,
path: path.clone(),
inode,
mtime,
is_symlink,
is_ignored: false,
});
@ -1113,6 +1119,7 @@ impl BackgroundScanner {
kind: EntryKind::File(self.char_bag(&path)),
path,
inode,
mtime,
is_symlink,
is_ignored: false,
});
@ -1142,6 +1149,7 @@ impl BackgroundScanner {
};
let child_inode = child_metadata.ino();
let child_mtime = child_metadata.modified()?;
// If we find a .gitignore, add it to the stack of ignores used to determine which paths are ignored
if child_name == *GITIGNORE {
@ -1176,6 +1184,7 @@ impl BackgroundScanner {
kind: EntryKind::PendingDir,
path: child_path.clone(),
inode: child_inode,
mtime: child_mtime,
is_symlink: child_is_symlink,
is_ignored,
});
@ -1195,6 +1204,7 @@ impl BackgroundScanner {
kind: EntryKind::File(self.char_bag(&child_path)),
path: child_path,
inode: child_inode,
mtime: child_mtime,
is_symlink: child_is_symlink,
is_ignored,
});
@ -1469,6 +1479,7 @@ impl BackgroundScanner {
Ok(metadata) => metadata,
};
let inode = metadata.ino();
let mtime = metadata.modified()?;
let is_symlink = fs::symlink_metadata(&abs_path)
.context("failed to read symlink metadata")?
.file_type()
@ -1482,6 +1493,7 @@ impl BackgroundScanner {
},
path,
inode,
mtime,
is_symlink,
is_ignored: false,
};
@ -1754,10 +1766,7 @@ mod tests {
use anyhow::Result;
use rand::prelude::*;
use serde_json::json;
use std::env;
use std::fmt::Write;
use std::os::unix;
use std::time::SystemTime;
use std::{cell::RefCell, env, fmt::Write, os::unix, rc::Rc, time::SystemTime};
#[gpui::test]
async fn test_populate_and_search(mut cx: gpui::TestAppContext) {
@ -1998,6 +2007,7 @@ mod tests {
path: Path::new("b").into(),
kind: EntryKind::Dir,
inode: 0,
mtime: UNIX_EPOCH,
is_ignored: false,
is_symlink: false,
}),
@ -2005,6 +2015,7 @@ mod tests {
path: Path::new("b/a").into(),
kind: EntryKind::Dir,
inode: 0,
mtime: UNIX_EPOCH,
is_ignored: false,
is_symlink: false,
}),
@ -2012,6 +2023,7 @@ mod tests {
path: Path::new("b/c").into(),
kind: EntryKind::PendingDir,
inode: 0,
mtime: UNIX_EPOCH,
is_ignored: false,
is_symlink: false,
}),
@ -2019,6 +2031,7 @@ mod tests {
path: Path::new("b/e").into(),
kind: EntryKind::Dir,
inode: 0,
mtime: UNIX_EPOCH,
is_ignored: false,
is_symlink: false,
}),
@ -2034,6 +2047,80 @@ mod tests {
assert!(!snapshot.path_is_pending("b/e"));
}
#[gpui::test]
async fn test_file_change_events(mut cx: gpui::TestAppContext) {
let dir = temp_tree(json!({
"dir_a": {
"file1": "1",
"file2": "2",
"dir_b": {
"file3": "3",
}
},
"dir_c": {
"dir_d": {
"file4": "4",
"file5": "5",
}
}
}));
let root = dir.path();
let tree = cx.add_model(|cx| Worktree::local(root, cx));
cx.read(|cx| tree.read(cx).as_local().unwrap().scan_complete())
.await;
tree.flush_fs_events(&cx).await;
let events = Rc::new(RefCell::new(Vec::new()));
tree.update(&mut cx, {
let events = events.clone();
|_, cx| {
cx.subscribe(&tree, move |_, event, _| {
events.borrow_mut().push(event.clone());
})
}
});
std::fs::remove_file(root.join("dir_a/file1")).unwrap();
std::fs::rename(root.join("dir_a/file2"), root.join("dir_c/file20")).unwrap();
std::fs::write(root.join("dir_c/dir_d/file4"), "modified 4").unwrap();
std::fs::write(root.join("dir_c/file10"), "hi").unwrap();
std::fs::rename(
root.join("dir_c/dir_d/file5"),
root.join("dir_c/dir_d/file50"),
)
.unwrap();
std::fs::write(root.join("dir_c/dir_d/file50"), "modified after rename").unwrap();
tree.condition(&cx, |_, _| !events.borrow().is_empty())
.await;
assert_eq!(
*events.borrow(),
&[Diff {
moved: vec![
(
Path::new("dir_a/file2").into(),
Path::new("dir_c/file20").into(),
),
(
Path::new("dir_c/dir_d/file5").into(),
Path::new("dir_c/dir_d/file50").into(),
)
]
.into_iter()
.collect(),
added: vec![Path::new("dir_c/file10").into()].into_iter().collect(),
removed: vec![Path::new("dir_a/file1").into()].into_iter().collect(),
modified: vec![
Path::new("dir_c/dir_d/file4").into(),
Path::new("dir_c/dir_d/file50").into()
]
.into_iter()
.collect(),
}]
);
}
#[test]
fn test_random() {
let iterations = env::var("ITERATIONS")