From 3ce2bea63a1f1c76a2b4cb54145b8134e8bd1516 Mon Sep 17 00:00:00 2001 From: Antonio Scandurra Date: Tue, 6 Jul 2021 10:33:33 +0200 Subject: [PATCH] Unify maintenance of open buffers into `Worktree::poll_snapshot` --- zed/src/worktree.rs | 314 ++++++++++++++++++++------------------------ 1 file changed, 143 insertions(+), 171 deletions(-) diff --git a/zed/src/worktree.rs b/zed/src/worktree.rs index 2d436ab669..379c33418b 100644 --- a/zed/src/worktree.rs +++ b/zed/src/worktree.rs @@ -181,8 +181,7 @@ impl Worktree { }; let (updates_tx, mut updates_rx) = postage::mpsc::channel(64); - let (mut snapshot_tx, mut snapshot_rx) = - postage::watch::channel_with(snapshot.clone()); + let (mut snapshot_tx, snapshot_rx) = watch::channel_with(snapshot.clone()); cx.background() .spawn(async move { @@ -196,26 +195,25 @@ impl Worktree { }) .detach(); - cx.spawn_weak(|this, mut cx| async move { - while let Some(snapshot) = snapshot_rx.recv().await { - if let Some(this) = cx.read(|cx| this.upgrade(cx)) { - this.update(&mut cx, |this, cx| { - let this = this.as_remote_mut().unwrap(); - this.snapshot = snapshot; - cx.notify(); - this.update_open_buffers(cx); - }); - } else { - break; + { + let mut snapshot_rx = snapshot_rx.clone(); + cx.spawn_weak(|this, mut cx| async move { + while let Some(_) = snapshot_rx.recv().await { + if let Some(this) = cx.read(|cx| this.upgrade(cx)) { + this.update(&mut cx, |this, cx| this.poll_snapshot(cx)); + } else { + break; + } } - } - }) - .detach(); + }) + .detach(); + } Worktree::Remote(RemoteWorktree { remote_id, replica_id, snapshot, + snapshot_rx, updates_tx, rpc: rpc.clone(), open_buffers: Default::default(), @@ -392,6 +390,112 @@ impl Worktree { )) } } + + fn poll_snapshot(&mut self, cx: &mut ModelContext) { + let update_buffers = match self { + Self::Local(worktree) => { + worktree.snapshot = worktree.background_snapshot.lock().clone(); + if worktree.is_scanning() { + if !worktree.poll_scheduled { + cx.spawn(|this, mut cx| async move { + smol::Timer::after(Duration::from_millis(100)).await; + this.update(&mut cx, |this, cx| { + this.as_local_mut().unwrap().poll_scheduled = false; + this.poll_snapshot(cx); + }) + }) + .detach(); + worktree.poll_scheduled = true; + } + false + } else { + true + } + } + Self::Remote(worktree) => { + worktree.snapshot = worktree.snapshot_rx.borrow().clone(); + true + } + }; + + if update_buffers { + let mut buffers_to_delete = Vec::new(); + for (buffer_id, buffer) in self.open_buffers() { + if let Some(buffer) = buffer.upgrade(&cx) { + buffer.update(cx, |buffer, cx| { + let buffer_is_clean = !buffer.is_dirty(); + + if let Some(file) = buffer.file_mut() { + let mut file_changed = false; + + if let Some(entry) = file + .entry_id + .and_then(|entry_id| self.entry_for_id(entry_id)) + { + if entry.path != file.path { + file.path = entry.path.clone(); + file_changed = true; + } + + if entry.mtime != file.mtime { + file.mtime = entry.mtime; + file_changed = true; + if let Some(worktree) = self.as_local() { + if buffer_is_clean { + let abs_path = worktree.absolutize(&file.path); + refresh_buffer(abs_path, cx); + } + } + } + } else if let Some(entry) = self.entry_for_path(&file.path) { + file.entry_id = Some(entry.id); + file.mtime = entry.mtime; + if let Some(worktree) = self.as_local() { + if buffer_is_clean { + let abs_path = worktree.absolutize(&file.path); + refresh_buffer(abs_path, cx); + } + } + file_changed = true; + } else if !file.is_deleted() { + if buffer_is_clean { + cx.emit(editor::buffer::Event::Dirtied); + } + file.entry_id = None; + file_changed = true; + } + + if file_changed { + cx.emit(editor::buffer::Event::FileHandleChanged); + } + } + }); + } else { + buffers_to_delete.push(*buffer_id); + } + } + + for buffer_id in buffers_to_delete { + self.open_buffers_mut().remove(&buffer_id); + } + } + + cx.notify(); + } + + fn open_buffers(&self) -> &HashMap> { + match self { + Self::Local(worktree) => &worktree.open_buffers, + Self::Remote(worktree) => &worktree.open_buffers, + } + } + + fn open_buffers_mut(&mut self) -> &mut HashMap> { + match self { + Self::Local(worktree) => &mut worktree.open_buffers, + Self::Remote(worktree) => &mut worktree.open_buffers, + } + } } impl Deref for Worktree { @@ -409,7 +513,7 @@ pub struct LocalWorktree { snapshot: Snapshot, background_snapshot: Arc>, snapshots_to_send_tx: Option>, - scan_state: (watch::Sender, watch::Receiver), + last_scan_state_rx: watch::Receiver, _event_stream_handle: fsevent::Handle, poll_scheduled: bool, rpc: Option<(rpc::Client, u64)>, @@ -426,7 +530,8 @@ impl LocalWorktree { cx: &mut ModelContext, ) -> Self { let abs_path = path.into(); - let (scan_state_tx, scan_state_rx) = smol::channel::unbounded(); + let (scan_states_tx, scan_states_rx) = smol::channel::unbounded(); + let (mut last_scan_state_tx, last_scan_state_rx) = watch::channel_with(ScanState::Scanning); let id = cx.model_id(); let snapshot = Snapshot { id, @@ -449,7 +554,7 @@ impl LocalWorktree { snapshot, background_snapshot: background_snapshot.clone(), snapshots_to_send_tx: None, - scan_state: watch::channel_with(ScanState::Scanning), + last_scan_state_rx, _event_stream_handle: event_stream_handle, poll_scheduled: false, open_buffers: Default::default(), @@ -460,17 +565,26 @@ impl LocalWorktree { }; std::thread::spawn(move || { - let scanner = BackgroundScanner::new(background_snapshot, scan_state_tx, id); + let scanner = BackgroundScanner::new(background_snapshot, scan_states_tx, id); scanner.run(event_stream) }); cx.spawn_weak(|this, mut cx| async move { - while let Ok(scan_state) = scan_state_rx.recv().await { + while let Ok(scan_state) = scan_states_rx.recv().await { if let Some(handle) = cx.read(|cx| this.upgrade(&cx)) { handle.update(&mut cx, |this, cx| { - this.as_local_mut() - .unwrap() - .observe_scan_state(scan_state, cx) + last_scan_state_tx.blocking_send(scan_state).ok(); + this.poll_snapshot(cx); + let tree = this.as_local_mut().unwrap(); + if !tree.is_scanning() { + if let Some(snapshots_to_send_tx) = tree.snapshots_to_send_tx.clone() { + if let Err(err) = + smol::block_on(snapshots_to_send_tx.send(tree.snapshot())) + { + log::error!("error submitting snapshot to send {}", err); + } + } + } }); } else { break; @@ -600,7 +714,7 @@ impl LocalWorktree { } pub fn scan_complete(&self) -> impl Future { - let mut scan_state_rx = self.scan_state.1.clone(); + let mut scan_state_rx = self.last_scan_state_rx.clone(); async move { let mut scan_state = Some(scan_state_rx.borrow().clone()); while let Some(ScanState::Scanning) = scan_state { @@ -609,96 +723,8 @@ impl LocalWorktree { } } - fn observe_scan_state(&mut self, scan_state: ScanState, cx: &mut ModelContext) { - self.scan_state.0.blocking_send(scan_state).ok(); - self.poll_snapshot(cx); - if !self.is_scanning() { - if let Some(snapshots_to_send_tx) = self.snapshots_to_send_tx.clone() { - if let Err(err) = smol::block_on(snapshots_to_send_tx.send(self.snapshot())) { - log::error!("error submitting snapshot to send {}", err); - } - } - } - } - - fn poll_snapshot(&mut self, cx: &mut ModelContext) { - self.snapshot = self.background_snapshot.lock().clone(); - if self.is_scanning() { - if !self.poll_scheduled { - cx.spawn(|this, mut cx| async move { - smol::Timer::after(Duration::from_millis(100)).await; - this.update(&mut cx, |this, cx| { - let worktree = this.as_local_mut().unwrap(); - worktree.poll_scheduled = false; - worktree.poll_snapshot(cx); - }) - }) - .detach(); - self.poll_scheduled = true; - } - } else { - let mut buffers_to_delete = Vec::new(); - for (buffer_id, buffer) in &self.open_buffers { - if let Some(buffer) = buffer.upgrade(&cx) { - buffer.update(cx, |buffer, cx| { - let buffer_is_clean = !buffer.is_dirty(); - - if let Some(file) = buffer.file_mut() { - let mut file_changed = false; - - if let Some(entry) = file - .entry_id - .and_then(|entry_id| self.entry_for_id(entry_id)) - { - if entry.path != file.path { - file.path = entry.path.clone(); - file_changed = true; - } - - if entry.mtime != file.mtime { - file.mtime = entry.mtime; - file_changed = true; - if buffer_is_clean { - let abs_path = self.absolutize(&file.path); - refresh_buffer(abs_path, cx); - } - } - } else if let Some(entry) = self.entry_for_path(&file.path) { - file.entry_id = Some(entry.id); - file.mtime = entry.mtime; - if buffer_is_clean { - let abs_path = self.absolutize(&file.path); - refresh_buffer(abs_path, cx); - } - file_changed = true; - } else if !file.is_deleted() { - if buffer_is_clean { - cx.emit(editor::buffer::Event::Dirtied); - } - file.entry_id = None; - file_changed = true; - } - - if file_changed { - cx.emit(editor::buffer::Event::FileHandleChanged); - } - } - }); - } else { - buffers_to_delete.push(*buffer_id); - } - } - - for buffer_id in buffers_to_delete { - self.open_buffers.remove(&buffer_id); - } - } - - cx.notify(); - } - fn is_scanning(&self) -> bool { - if let ScanState::Scanning = *self.scan_state.1.borrow() { + if let ScanState::Scanning = *self.last_scan_state_rx.borrow() { true } else { false @@ -736,10 +762,7 @@ impl LocalWorktree { file.read_to_string(&mut text).await?; // Eagerly populate the snapshot with an updated entry for the loaded file let entry = refresh_entry(&background_snapshot, path, &abs_path)?; - this.update(&mut cx, |this, cx| { - let this = this.as_local_mut().unwrap(); - this.poll_snapshot(cx); - }); + this.update(&mut cx, |this, cx| this.poll_snapshot(cx)); Ok((File::new(entry.id, handle, entry.path, entry.mtime), text)) }) } @@ -787,9 +810,7 @@ impl LocalWorktree { cx.spawn(|this, mut cx| async move { let entry = save.await?; - this.update(&mut cx, |this, cx| { - this.as_local_mut().unwrap().poll_snapshot(cx); - }); + this.update(&mut cx, |this, cx| this.poll_snapshot(cx)); Ok(entry) }) } @@ -902,6 +923,7 @@ impl fmt::Debug for LocalWorktree { pub struct RemoteWorktree { remote_id: u64, snapshot: Snapshot, + snapshot_rx: watch::Receiver, rpc: rpc::Client, updates_tx: postage::mpsc::Sender, replica_id: ReplicaId, @@ -1000,56 +1022,6 @@ impl RemoteWorktree { cx.notify(); Ok(()) } - - fn update_open_buffers(&mut self, cx: &mut ModelContext) { - let mut buffers_to_delete = Vec::new(); - for (buffer_id, buffer) in &self.open_buffers { - if let Some(buffer) = buffer.upgrade(&cx) { - buffer.update(cx, |buffer, cx| { - let buffer_is_clean = !buffer.is_dirty(); - - if let Some(file) = buffer.file_mut() { - let mut file_changed = false; - - if let Some(entry) = file - .entry_id - .and_then(|entry_id| self.snapshot.entry_for_id(entry_id)) - { - if entry.path != file.path { - file.path = entry.path.clone(); - file_changed = true; - } - - if entry.mtime != file.mtime { - file.mtime = entry.mtime; - file_changed = true; - } - } else if let Some(entry) = self.snapshot.entry_for_path(&file.path) { - file.entry_id = Some(entry.id); - file.mtime = entry.mtime; - file_changed = true; - } else if !file.is_deleted() { - if buffer_is_clean { - cx.emit(editor::buffer::Event::Dirtied); - } - file.entry_id = None; - file_changed = true; - } - - if file_changed { - cx.emit(editor::buffer::Event::FileHandleChanged); - } - } - }); - } else { - buffers_to_delete.push(*buffer_id); - } - } - - for buffer_id in buffers_to_delete { - self.open_buffers.remove(&buffer_id); - } - } } #[derive(Clone)]