Replace remaining usages of finish_pending_tasks with condition

This commit is contained in:
Antonio Scandurra 2021-04-20 12:28:30 +02:00
parent ebb7124405
commit cd7dccd30c
4 changed files with 50 additions and 225 deletions

View file

@ -325,10 +325,6 @@ impl TestAppContext {
result result
} }
pub fn finish_pending_tasks(&self) -> impl Future<Output = ()> {
self.0.borrow().finish_pending_tasks()
}
pub fn font_cache(&self) -> Arc<FontCache> { pub fn font_cache(&self) -> Arc<FontCache> {
self.0.borrow().font_cache.clone() self.0.borrow().font_cache.clone()
} }
@ -1066,6 +1062,12 @@ impl MutableAppContext {
.or_default() .or_default()
.updated .updated
.insert(view_id); .insert(view_id);
if let Entry::Occupied(mut entry) = self.async_observations.entry(view_id) {
if entry.get_mut().blocking_send(()).is_err() {
entry.remove_entry();
}
}
} }
fn focus(&mut self, window_id: usize, focused_id: usize) { fn focus(&mut self, window_id: usize, focused_id: usize) {
@ -1207,40 +1209,6 @@ impl MutableAppContext {
result result
} }
pub fn finish_pending_tasks(&self) -> impl Future<Output = ()> {
let mut pending_tasks = self
.future_handlers
.borrow()
.keys()
.cloned()
.collect::<HashSet<_>>();
pending_tasks.extend(self.stream_handlers.borrow().keys());
let task_done = self.task_done.clone();
let future_handlers = self.future_handlers.clone();
let stream_handlers = self.stream_handlers.clone();
async move {
// A Condvar expects the condition to be protected by a Mutex, but in this case we know
// that this logic will always run on the main thread.
let mutex = async_std::sync::Mutex::new(());
loop {
{
let future_handlers = future_handlers.borrow();
let stream_handlers = stream_handlers.borrow();
pending_tasks.retain(|task_id| {
future_handlers.contains_key(task_id)
|| stream_handlers.contains_key(task_id)
});
if pending_tasks.is_empty() {
break;
}
}
task_done.wait(mutex.lock().await).await;
}
}
}
pub fn write_to_clipboard(&self, item: ClipboardItem) { pub fn write_to_clipboard(&self, item: ClipboardItem) {
self.platform.write_to_clipboard(item); self.platform.write_to_clipboard(item);
} }
@ -3388,98 +3356,4 @@ mod tests {
// assert!(invalidation.removed.is_empty()); // assert!(invalidation.removed.is_empty());
// }); // });
// } // }
#[test]
fn test_finish_pending_tasks() {
struct View;
impl Entity for View {
type Event = ();
}
impl super::View for View {
fn render<'a>(&self, _: &AppContext) -> ElementBox {
Empty::new().boxed()
}
fn ui_name() -> &'static str {
"View"
}
}
struct Model;
impl Entity for Model {
type Event = ();
}
App::test_async((), |mut app| async move {
let model = app.add_model(|_| Model);
let (_, view) = app.add_window(|_| View);
model.update(&mut app, |_, ctx| {
ctx.spawn(async {}, |_, _, _| {}).detach();
// Cancel this task
drop(ctx.spawn(async {}, |_, _, _| {}));
});
view.update(&mut app, |_, ctx| {
ctx.spawn(async {}, |_, _, _| {}).detach();
// Cancel this task
drop(ctx.spawn(async {}, |_, _, _| {}));
});
assert!(!app.0.borrow().future_handlers.borrow().is_empty());
app.finish_pending_tasks().await;
assert!(app.0.borrow().future_handlers.borrow().is_empty());
app.finish_pending_tasks().await; // Don't block if there are no tasks
model.update(&mut app, |_, ctx| {
ctx.spawn_stream(smol::stream::iter(vec![1, 2, 3]), |_, _, _| {}, |_, _| {})
.detach();
// Cancel this task
drop(ctx.spawn_stream(smol::stream::iter(vec![1, 2, 3]), |_, _, _| {}, |_, _| {}));
});
view.update(&mut app, |_, ctx| {
ctx.spawn_stream(smol::stream::iter(vec![1, 2, 3]), |_, _, _| {}, |_, _| {})
.detach();
// Cancel this task
drop(ctx.spawn_stream(smol::stream::iter(vec![1, 2, 3]), |_, _, _| {}, |_, _| {}));
});
assert!(!app.0.borrow().stream_handlers.borrow().is_empty());
app.finish_pending_tasks().await;
assert!(app.0.borrow().stream_handlers.borrow().is_empty());
app.finish_pending_tasks().await; // Don't block if there are no tasks
// Tasks are considered finished when we drop handles
let mut tasks = Vec::new();
model.update(&mut app, |_, ctx| {
tasks.push(Box::new(ctx.spawn(async {}, |_, _, _| {})));
tasks.push(Box::new(ctx.spawn_stream(
smol::stream::iter(vec![1, 2, 3]),
|_, _, _| {},
|_, _| {},
)));
});
view.update(&mut app, |_, ctx| {
tasks.push(Box::new(ctx.spawn(async {}, |_, _, _| {})));
tasks.push(Box::new(ctx.spawn_stream(
smol::stream::iter(vec![1, 2, 3]),
|_, _, _| {},
|_, _| {},
)));
});
assert!(!app.0.borrow().stream_handlers.borrow().is_empty());
let finish_pending_tasks = app.finish_pending_tasks();
drop(tasks);
finish_pending_tasks.await;
assert!(app.0.borrow().stream_handlers.borrow().is_empty());
app.finish_pending_tasks().await; // Don't block if there are no tasks
});
}
} }

View file

@ -6,7 +6,6 @@ use simplelog::SimpleLogger;
use std::{ use std::{
collections::BTreeMap, collections::BTreeMap,
path::{Path, PathBuf}, path::{Path, PathBuf},
time::{Duration, Instant},
}; };
use tempdir::TempDir; use tempdir::TempDir;
@ -144,18 +143,3 @@ fn write_tree(path: &Path, tree: serde_json::Value) {
panic!("You must pass a JSON object to this helper") panic!("You must pass a JSON object to this helper")
} }
} }
pub async fn assert_condition(poll_interval: u64, timeout: u64, mut f: impl FnMut() -> bool) {
let poll_interval = Duration::from_millis(poll_interval);
let timeout = Duration::from_millis(timeout);
let start = Instant::now();
loop {
if f() {
return;
} else if Instant::now().duration_since(start) < timeout {
smol::Timer::after(poll_interval).await;
} else {
panic!("timed out waiting on condition");
}
}
}

View file

@ -106,11 +106,10 @@ impl Pane {
} }
pub fn activate_entry(&mut self, entry_id: (usize, u64), ctx: &mut ViewContext<Self>) -> bool { pub fn activate_entry(&mut self, entry_id: (usize, u64), ctx: &mut ViewContext<Self>) -> bool {
if let Some(index) = self if let Some(index) = self.items.iter().position(|item| {
.items item.entry_id(ctx.as_ref())
.iter() .map_or(false, |id| id == entry_id)
.position(|item| item.entry_id(ctx.as_ref()).map_or(false, |id| id == entry_id)) }) {
{
self.activate_item(index, ctx); self.activate_item(index, ctx);
true true
} else { } else {

View file

@ -385,9 +385,9 @@ mod tests {
App::test_async((), |mut app| async move { App::test_async((), |mut app| async move {
let dir = temp_tree(json!({ let dir = temp_tree(json!({
"a": { "a": {
"aa": "aa contents", "file1": "contents 1",
"ab": "ab contents", "file2": "contents 2",
"ac": "ab contents", "file3": "contents 3",
}, },
})); }));
@ -396,74 +396,44 @@ mod tests {
app.read(|ctx| workspace.read(ctx).worktree_scans_complete(ctx)) app.read(|ctx| workspace.read(ctx).worktree_scans_complete(ctx))
.await; .await;
let entries = app.read(|ctx| workspace.file_entries(ctx)); let entries = app.read(|ctx| workspace.file_entries(ctx));
let file1 = entries[0];
let file2 = entries[1];
let file3 = entries[2];
let (_, workspace_view) = let (_, workspace_view) =
app.add_window(|ctx| WorkspaceView::new(workspace.clone(), settings, ctx)); app.add_window(|ctx| WorkspaceView::new(workspace.clone(), settings, ctx));
let pane = app.read(|ctx| workspace_view.read(ctx).active_pane().clone());
// Open the first entry // Open the first entry
workspace_view.update(&mut app, |w, ctx| w.open_entry(entries[0], ctx)); workspace_view.update(&mut app, |w, ctx| w.open_entry(file1, ctx));
pane.condition(&app, |pane, _| pane.items().len() == 1)
workspace_view
.condition(&app, |workspace_view, ctx| {
workspace_view.active_pane().read(ctx).items().len() == 1
})
.await; .await;
// Open the second entry // Open the second entry
workspace_view.update(&mut app, |w, ctx| w.open_entry(entries[1], ctx)); workspace_view.update(&mut app, |w, ctx| w.open_entry(file2, ctx));
pane.condition(&app, |pane, _| pane.items().len() == 2)
workspace_view
.condition(&app, |workspace_view, ctx| {
workspace_view.active_pane().read(ctx).items().len() == 2
})
.await; .await;
app.read(|ctx| { app.read(|ctx| {
assert_eq!( let pane = pane.read(ctx);
workspace_view assert_eq!(pane.active_item().unwrap().entry_id(ctx), Some(file2));
.read(ctx)
.active_pane()
.read(ctx)
.active_item()
.unwrap()
.entry_id(ctx),
Some(entries[1])
);
}); });
// Open the first entry again // Open the first entry again
workspace_view.update(&mut app, |w, ctx| w.open_entry(entries[0], ctx)); workspace_view.update(&mut app, |w, ctx| w.open_entry(file1, ctx));
pane.condition(&app, move |pane, ctx| {
{ pane.active_item().unwrap().entry_id(ctx) == Some(file1)
let entries = entries.clone(); })
workspace_view .await;
.condition(&app, move |workspace_view, ctx| {
workspace_view
.active_pane()
.read(ctx)
.active_item()
.unwrap()
.entry_id(ctx)
== Some(entries[0])
})
.await;
}
app.read(|ctx| { app.read(|ctx| {
let active_pane = workspace_view.read(ctx).active_pane().read(ctx); assert_eq!(pane.read(ctx).items().len(), 2);
assert_eq!(active_pane.items().len(), 2);
}); });
// Open the third entry twice concurrently // Open the third entry twice concurrently
workspace_view.update(&mut app, |w, ctx| { workspace_view.update(&mut app, |w, ctx| {
w.open_entry(entries[2], ctx); w.open_entry(file3, ctx);
w.open_entry(entries[2], ctx); w.open_entry(file3, ctx);
}); });
pane.condition(&app, |pane, _| pane.items().len() == 3)
workspace_view
.condition(&app, |workspace_view, ctx| {
workspace_view.active_pane().read(ctx).items().len() == 3
})
.await; .await;
}); });
} }
@ -475,44 +445,42 @@ mod tests {
let dir = temp_tree(json!({ let dir = temp_tree(json!({
"a": { "a": {
"aa": "aa contents", "file1": "contents 1",
"ab": "ab contents", "file2": "contents 2",
"ac": "ab contents", "file3": "contents 3",
}, },
})); }));
let settings = settings::channel(&app.font_cache()).unwrap().1; let settings = settings::channel(&app.font_cache()).unwrap().1;
let workspace = app.add_model(|ctx| Workspace::new(vec![dir.path().into()], ctx)); let workspace = app.add_model(|ctx| Workspace::new(vec![dir.path().into()], ctx));
app.finish_pending_tasks().await; // Open and populate worktree. app.read(|ctx| workspace.read(ctx).worktree_scans_complete(ctx))
.await;
let entries = app.read(|ctx| workspace.file_entries(ctx)); let entries = app.read(|ctx| workspace.file_entries(ctx));
let file1 = entries[0];
let (window_id, workspace_view) = let (window_id, workspace_view) =
app.add_window(|ctx| WorkspaceView::new(workspace.clone(), settings, ctx)); app.add_window(|ctx| WorkspaceView::new(workspace.clone(), settings, ctx));
workspace_view.update(&mut app, |w, ctx| w.open_entry(entries[0], ctx));
app.finish_pending_tasks().await;
let pane_1 = app.read(|ctx| workspace_view.read(ctx).active_pane().clone()); let pane_1 = app.read(|ctx| workspace_view.read(ctx).active_pane().clone());
workspace_view.update(&mut app, |w, ctx| w.open_entry(file1, ctx));
pane_1
.condition(&app, move |pane, ctx| {
pane.active_item().and_then(|i| i.entry_id(ctx)) == Some(file1)
})
.await;
app.dispatch_action(window_id, vec![pane_1.id()], "pane:split_right", ()); app.dispatch_action(window_id, vec![pane_1.id()], "pane:split_right", ());
app.update(|ctx| { app.update(|ctx| {
let pane_2 = workspace_view.read(ctx).active_pane().clone(); let pane_2 = workspace_view.read(ctx).active_pane().clone();
assert_ne!(pane_1, pane_2); assert_ne!(pane_1, pane_2);
assert_eq!( let pane2_item = pane_2.read(ctx).active_item().unwrap();
pane_2 assert_eq!(pane2_item.entry_id(ctx.as_ref()), Some(file1));
.read(ctx)
.active_item()
.unwrap()
.entry_id(ctx.as_ref()),
Some(entries[0])
);
ctx.dispatch_action(window_id, vec![pane_2.id()], "pane:close_active_item", ()); ctx.dispatch_action(window_id, vec![pane_2.id()], "pane:close_active_item", ());
let workspace_view = workspace_view.read(ctx);
let w = workspace_view.read(ctx); assert_eq!(workspace_view.panes.len(), 1);
assert_eq!(w.panes.len(), 1); assert_eq!(workspace_view.active_pane(), &pane_1);
assert_eq!(w.active_pane(), &pane_1);
}); });
}); });
} }