diff --git a/Cargo.lock b/Cargo.lock index 36c855e9e0..1eda309563 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2159,6 +2159,7 @@ dependencies = [ "etagere", "font-kit", "foreign-types", + "futures", "gpui_macros", "log", "metal", diff --git a/gpui/Cargo.toml b/gpui/Cargo.toml index 7b8f6f9c15..a6d11dfadb 100644 --- a/gpui/Cargo.toml +++ b/gpui/Cargo.toml @@ -9,6 +9,7 @@ async-task = "4.0.3" backtrace = "0.3" ctor = "0.1" etagere = "0.2" +futures = "0.3" gpui_macros = { path = "../gpui_macros" } log = "0.4" num_cpus = "1.13" diff --git a/gpui/src/executor.rs b/gpui/src/executor.rs index a7c2eaaecc..5a1843c561 100644 --- a/gpui/src/executor.rs +++ b/gpui/src/executor.rs @@ -2,6 +2,7 @@ use anyhow::{anyhow, Result}; use async_task::Runnable; pub use async_task::Task; use backtrace::{Backtrace, BacktraceFmt, BytesOrWideString}; +use futures::task::noop_waker; use parking_lot::Mutex; use rand::prelude::*; use smol::{channel, prelude::*, Executor}; @@ -13,13 +14,14 @@ use std::{ rc::Rc, sync::{ atomic::{AtomicBool, Ordering::SeqCst}, - mpsc::Sender, Arc, }, + task::{Context, Poll}, thread, + time::Duration, }; -use crate::platform; +use crate::{platform, util}; pub enum Foreground { Platform { @@ -43,7 +45,6 @@ struct DeterministicState { seed: u64, scheduled: Vec<(Runnable, Backtrace)>, spawned_from_foreground: Vec<(Runnable, Backtrace)>, - waker: Option>, } pub struct Deterministic(Arc>); @@ -55,7 +56,6 @@ impl Deterministic { seed, scheduled: Default::default(), spawned_from_foreground: Default::default(), - waker: None, }))) } @@ -75,9 +75,6 @@ impl Deterministic { } else { state.spawned_from_foreground.push((runnable, backtrace)); } - if let Some(waker) = state.waker.as_ref() { - waker.send(()).ok(); - } }); runnable.schedule(); task @@ -91,11 +88,7 @@ impl Deterministic { let backtrace = Backtrace::new_unresolved(); let state = self.0.clone(); let (runnable, task) = async_task::spawn(future, move |runnable| { - let mut state = state.lock(); - state.scheduled.push((runnable, backtrace.clone())); - if let Some(waker) = state.waker.as_ref() { - waker.send(()).ok(); - } + state.lock().scheduled.push((runnable, backtrace.clone())); }); runnable.schedule(); task @@ -106,43 +99,49 @@ impl Deterministic { T: 'static, F: Future + 'static, { - let (wake_tx, wake_rx) = std::sync::mpsc::channel(); - let state = self.0.clone(); - state.lock().waker = Some(wake_tx); + self.block_on(usize::MAX, future).unwrap() + } - let (output_tx, output_rx) = std::sync::mpsc::channel(); - self.spawn_from_foreground(async move { - let output = future.await; - output_tx.send(output).unwrap(); - }) - .detach(); + pub fn block_on(&self, max_ticks: usize, future: F) -> Option + where + T: 'static, + F: Future, + { + smol::pin!(future); + let waker = noop_waker(); + let mut cx = Context::from_waker(&waker); let mut trace = Trace::default(); - loop { - if let Ok(value) = output_rx.try_recv() { - state.lock().waker = None; - return value; - } - - wake_rx.recv().unwrap(); + for _ in 0..max_ticks { let runnable = { - let state = &mut *state.lock(); - let ix = state - .rng - .gen_range(0..state.scheduled.len() + state.spawned_from_foreground.len()); + let state = &mut *self.0.lock(); + let runnable_count = state.scheduled.len() + state.spawned_from_foreground.len(); + let ix = state.rng.gen_range(0..=runnable_count); if ix < state.scheduled.len() { let (_, backtrace) = &state.scheduled[ix]; trace.record(&state, backtrace.clone()); state.scheduled.remove(ix).0 - } else { + } else if ix < runnable_count { let (_, backtrace) = &state.spawned_from_foreground[0]; trace.record(&state, backtrace.clone()); state.spawned_from_foreground.remove(0).0 + } else { + if let Poll::Ready(result) = future.as_mut().poll(&mut cx) { + return Some(result); + } + + if state.scheduled.is_empty() && state.spawned_from_foreground.is_empty() { + panic!("detected non-determinism in deterministic executor"); + } else { + continue; + } } }; runnable.run(); } + + None } } @@ -354,6 +353,22 @@ impl Background { } } + pub fn block_on(&self, timeout: Duration, future: F) -> Option + where + T: 'static, + F: Future, + { + match self { + Self::Production { .. } => { + smol::block_on(async move { util::timeout(timeout, future).await.ok() }) + } + Self::Deterministic(executor) => { + let max_ticks = executor.0.lock().rng.gen_range(1..=1000); + executor.block_on(max_ticks, future) + } + } + } + pub async fn scoped<'scope, F>(&self, scheduler: F) where F: FnOnce(&mut Scope<'scope>),