From 8a105bf12fd1589ac4ec14189cb932a69522cc57 Mon Sep 17 00:00:00 2001 From: Max Brunsfeld Date: Thu, 30 Jun 2022 18:04:31 -0700 Subject: [PATCH] WIP - try representing snapshots_to_send as a watch --- crates/project/src/worktree.rs | 196 ++++++++++++++++----------------- 1 file changed, 93 insertions(+), 103 deletions(-) diff --git a/crates/project/src/worktree.rs b/crates/project/src/worktree.rs index e6af57bf5c..af7cf02cb8 100644 --- a/crates/project/src/worktree.rs +++ b/crates/project/src/worktree.rs @@ -134,7 +134,7 @@ enum ScanState { struct ShareState { project_id: u64, - snapshots_tx: Sender, + snapshots_tx: watch::Sender, _maintain_remote_snapshot: Option>>, } @@ -334,38 +334,9 @@ impl Worktree { fn poll_snapshot(&mut self, cx: &mut ModelContext) { match self { - Self::Local(worktree) => { - let is_fake_fs = worktree.fs.is_fake(); - worktree.snapshot = worktree.background_snapshot.lock().clone(); - if matches!(worktree.scan_state(), ScanState::Initializing) { - if worktree.poll_task.is_none() { - worktree.poll_task = Some(cx.spawn_weak(|this, mut cx| async move { - if is_fake_fs { - #[cfg(any(test, feature = "test-support"))] - cx.background().simulate_random_delay().await; - } else { - smol::Timer::after(Duration::from_millis(100)).await; - } - if let Some(this) = this.upgrade(&cx) { - this.update(&mut cx, |this, cx| { - this.as_local_mut().unwrap().poll_task = None; - this.poll_snapshot(cx); - }); - } - })); - } - } else { - worktree.poll_task.take(); - cx.emit(Event::UpdatedEntries); - } - } - Self::Remote(worktree) => { - worktree.snapshot = worktree.background_snapshot.lock().clone(); - cx.emit(Event::UpdatedEntries); - } + Self::Local(worktree) => worktree.poll_snapshot(cx), + Self::Remote(worktree) => worktree.poll_snapshot(cx), }; - - cx.notify(); } } @@ -441,9 +412,8 @@ impl LocalWorktree { last_scan_state_tx.blocking_send(scan_state).ok(); this.update(&mut cx, |this, cx| { this.poll_snapshot(cx); - this.as_local().unwrap().broadcast_snapshot() - }) - .await; + this.as_local_mut().unwrap().broadcast_snapshot() + }); } else { break; } @@ -527,6 +497,40 @@ impl LocalWorktree { Ok(updated) } + fn poll_snapshot(&mut self, cx: &mut ModelContext) { + match self.scan_state() { + ScanState::Idle => { + self.snapshot = self.background_snapshot.lock().clone(); + self.poll_task.take(); + cx.emit(Event::UpdatedEntries); + } + ScanState::Initializing => { + self.snapshot = self.background_snapshot.lock().clone(); + if self.poll_task.is_none() { + let is_fake_fs = self.fs.is_fake(); + self.poll_task = Some(cx.spawn_weak(|this, mut cx| async move { + if is_fake_fs { + #[cfg(any(test, feature = "test-support"))] + cx.background().simulate_random_delay().await; + } else { + smol::Timer::after(Duration::from_millis(100)).await; + } + if let Some(this) = this.upgrade(&cx) { + this.update(&mut cx, |this, cx| { + this.as_local_mut().unwrap().poll_task = None; + this.poll_snapshot(cx); + }); + } + })); + } + cx.emit(Event::UpdatedEntries); + } + ScanState::Updating => {} + ScanState::Err(_) => {} + } + cx.notify(); + } + pub fn scan_complete(&self) -> impl Future { let mut scan_state_rx = self.last_scan_state_rx.clone(); async move { @@ -666,16 +670,15 @@ impl LocalWorktree { Some(cx.spawn(|this, mut cx| async move { delete.await?; - this.update(&mut cx, |this, _| { - let this = this.as_local_mut().unwrap(); - let mut snapshot = this.background_snapshot.lock(); - snapshot.delete_entry(entry_id); - }); this.update(&mut cx, |this, cx| { + let this = this.as_local_mut().unwrap(); + { + let mut snapshot = this.background_snapshot.lock(); + snapshot.delete_entry(entry_id); + } this.poll_snapshot(cx); - this.as_local().unwrap().broadcast_snapshot() - }) - .await; + this.broadcast_snapshot(); + }); Ok(()) })) } @@ -712,10 +715,10 @@ impl LocalWorktree { }) .await?; this.update(&mut cx, |this, cx| { + let this = this.as_local_mut().unwrap(); this.poll_snapshot(cx); - this.as_local().unwrap().broadcast_snapshot() - }) - .await; + this.broadcast_snapshot(); + }); Ok(entry) })) } @@ -752,10 +755,10 @@ impl LocalWorktree { }) .await?; this.update(&mut cx, |this, cx| { + let this = this.as_local_mut().unwrap(); this.poll_snapshot(cx); - this.as_local().unwrap().broadcast_snapshot() - }) - .await; + this.broadcast_snapshot() + }); Ok(entry) })) } @@ -790,10 +793,10 @@ impl LocalWorktree { }) .await?; this.update(&mut cx, |this, cx| { + let this = this.as_local_mut().unwrap(); this.poll_snapshot(cx); - this.as_local().unwrap().broadcast_snapshot() - }) - .await; + this.broadcast_snapshot(); + }); Ok(entry) }) } @@ -826,45 +829,42 @@ impl LocalWorktree { let this = this .upgrade(&cx) .ok_or_else(|| anyhow!("worktree was dropped"))?; - let (entry, snapshot, snapshots_tx) = this.read_with(&cx, |this, _| { - let this = this.as_local().unwrap(); - let mut snapshot = this.background_snapshot.lock(); - entry.is_ignored = snapshot - .ignore_stack_for_path(&path, entry.is_dir()) - .is_path_ignored(&path, entry.is_dir()); - if let Some(old_path) = old_path { - snapshot.remove_path(&old_path); + this.update(&mut cx, |this, cx| { + let this = this.as_local_mut().unwrap(); + let inserted_entry; + { + let mut snapshot = this.background_snapshot.lock(); + entry.is_ignored = snapshot + .ignore_stack_for_path(&path, entry.is_dir()) + .is_path_ignored(&path, entry.is_dir()); + if let Some(old_path) = old_path { + snapshot.remove_path(&old_path); + } + inserted_entry = snapshot.insert_entry(entry, fs.as_ref()); + snapshot.scan_id += 1; } - let entry = snapshot.insert_entry(entry, fs.as_ref()); - snapshot.scan_id += 1; - let snapshots_tx = this.share.as_ref().map(|s| s.snapshots_tx.clone()); - (entry, snapshot.clone(), snapshots_tx) - }); - this.update(&mut cx, |this, cx| this.poll_snapshot(cx)); - - if let Some(snapshots_tx) = snapshots_tx { - snapshots_tx.send(snapshot).await.ok(); - } - - Ok(entry) + this.poll_snapshot(cx); + this.broadcast_snapshot(); + Ok(inserted_entry) + }) }) } pub fn share(&mut self, project_id: u64, cx: &mut ModelContext) -> Task> { let (share_tx, share_rx) = oneshot::channel(); - let (snapshots_to_send_tx, snapshots_to_send_rx) = - smol::channel::unbounded::(); + if self.share.is_some() { let _ = share_tx.send(Ok(())); } else { + let (snapshots_tx, mut snapshots_rx) = watch::channel_with(self.snapshot()); let rpc = self.client.clone(); let worktree_id = cx.model_id() as u64; let maintain_remote_snapshot = cx.background().spawn({ let rpc = rpc.clone(); let diagnostic_summaries = self.diagnostic_summaries.clone(); async move { - let mut prev_snapshot = match snapshots_to_send_rx.recv().await { - Ok(snapshot) => { + let mut prev_snapshot = match snapshots_rx.recv().await { + Some(snapshot) => { let update = proto::UpdateWorktree { project_id, worktree_id, @@ -886,8 +886,10 @@ impl LocalWorktree { snapshot } } - Err(error) => { - let _ = share_tx.send(Err(error.into())); + None => { + share_tx + .send(Err(anyhow!("worktree dropped before share completed"))) + .ok(); return Err(anyhow!("failed to send initial update worktree")); } }; @@ -900,11 +902,7 @@ impl LocalWorktree { })?; } - while let Ok(mut snapshot) = snapshots_to_send_rx.recv().await { - while let Ok(newer_snapshot) = snapshots_to_send_rx.try_recv() { - snapshot = newer_snapshot; - } - + while let Some(snapshot) = snapshots_rx.recv().await { send_worktree_update( &rpc, snapshot.build_update(&prev_snapshot, project_id, worktree_id, true), @@ -919,18 +917,12 @@ impl LocalWorktree { }); self.share = Some(ShareState { project_id, - snapshots_tx: snapshots_to_send_tx.clone(), + snapshots_tx, _maintain_remote_snapshot: Some(maintain_remote_snapshot), }); } - cx.spawn_weak(|this, cx| async move { - if let Some(this) = this.upgrade(&cx) { - this.read_with(&cx, |this, _| { - let this = this.as_local().unwrap(); - let _ = snapshots_to_send_tx.try_send(this.snapshot()); - }); - } + cx.foreground().spawn(async move { share_rx .await .unwrap_or_else(|_| Err(anyhow!("share ended"))) @@ -945,19 +937,11 @@ impl LocalWorktree { self.share.is_some() } - fn broadcast_snapshot(&self) -> impl Future { - let mut to_send = None; + fn broadcast_snapshot(&mut self) { if matches!(self.scan_state(), ScanState::Idle) { - if let Some(share) = self.share.as_ref() { - to_send = Some((self.snapshot(), share.snapshots_tx.clone())); - } - } - - async move { - if let Some((snapshot, snapshots_to_send_tx)) = to_send { - if let Err(err) = snapshots_to_send_tx.send(snapshot).await { - log::error!("error submitting snapshot to send {}", err); - } + let snapshot = self.snapshot(); + if let Some(share) = self.share.as_mut() { + *share.snapshots_tx.borrow_mut() = snapshot; } } } @@ -968,6 +952,12 @@ impl RemoteWorktree { self.snapshot.clone() } + fn poll_snapshot(&mut self, cx: &mut ModelContext) { + self.snapshot = self.background_snapshot.lock().clone(); + cx.emit(Event::UpdatedEntries); + cx.notify(); + } + pub fn disconnected_from_host(&mut self) { self.updates_tx.take(); self.snapshot_subscriptions.clear();