use anyhow::{anyhow, Result}; use async_task::Runnable; use futures::channel::mpsc; use smol::{channel, prelude::*, Executor}; use std::{ any::Any, fmt::{self, Display}, marker::PhantomData, mem, panic::Location, pin::Pin, rc::Rc, sync::Arc, task::{Context, Poll}, thread, time::Duration, }; use crate::{ platform::{self, Dispatcher}, util, AppContext, }; pub enum Foreground { Platform { dispatcher: Arc, _not_send_or_sync: PhantomData>, }, #[cfg(any(test, feature = "test-support"))] Deterministic { cx_id: usize, executor: Arc, }, } pub enum Background { #[cfg(any(test, feature = "test-support"))] Deterministic { executor: Arc }, Production { executor: Arc>, _stop: channel::Sender<()>, }, } type AnyLocalFuture = Pin>>>; type AnyFuture = Pin>>>; type AnyTask = async_task::Task>; type AnyLocalTask = async_task::Task>; #[must_use] pub enum Task { Ready(Option), Local { any_task: AnyLocalTask, result_type: PhantomData, }, Send { any_task: AnyTask, result_type: PhantomData, }, } unsafe impl Send for Task {} #[cfg(any(test, feature = "test-support"))] struct DeterministicState { rng: rand::prelude::StdRng, seed: u64, scheduled_from_foreground: collections::HashMap>, scheduled_from_background: Vec, forbid_parking: bool, block_on_ticks: std::ops::RangeInclusive, now: std::time::Instant, next_timer_id: usize, pending_timers: Vec<(usize, std::time::Instant, postage::barrier::Sender)>, waiting_backtrace: Option, next_runnable_id: usize, poll_history: Vec, previous_poll_history: Option>, enable_runnable_backtraces: bool, runnable_backtraces: collections::HashMap, } #[derive(Copy, Clone, Debug, PartialEq, Eq)] pub enum ExecutorEvent { PollRunnable { id: usize }, EnqueuRunnable { id: usize }, } #[cfg(any(test, feature = "test-support"))] struct ForegroundRunnable { id: usize, runnable: Runnable, main: bool, } #[cfg(any(test, feature = "test-support"))] struct BackgroundRunnable { id: usize, runnable: Runnable, } #[cfg(any(test, feature = "test-support"))] pub struct Deterministic { state: Arc>, parker: parking_lot::Mutex, } pub enum Timer { Production(smol::Timer), #[cfg(any(test, feature = "test-support"))] Deterministic(DeterministicTimer), } #[cfg(any(test, feature = "test-support"))] pub struct DeterministicTimer { rx: postage::barrier::Receiver, id: usize, state: Arc>, } #[cfg(any(test, feature = "test-support"))] impl Deterministic { pub fn new(seed: u64) -> Arc { use rand::prelude::*; Arc::new(Self { state: Arc::new(parking_lot::Mutex::new(DeterministicState { rng: StdRng::seed_from_u64(seed), seed, scheduled_from_foreground: Default::default(), scheduled_from_background: Default::default(), forbid_parking: false, block_on_ticks: 0..=1000, now: std::time::Instant::now(), next_timer_id: Default::default(), pending_timers: Default::default(), waiting_backtrace: None, next_runnable_id: 0, poll_history: Default::default(), previous_poll_history: Default::default(), enable_runnable_backtraces: false, runnable_backtraces: Default::default(), })), parker: Default::default(), }) } pub fn execution_history(&self) -> Vec { self.state.lock().poll_history.clone() } pub fn set_previous_execution_history(&self, history: Option>) { self.state.lock().previous_poll_history = history; } pub fn enable_runnable_backtrace(&self) { self.state.lock().enable_runnable_backtraces = true; } pub fn runnable_backtrace(&self, runnable_id: usize) -> backtrace::Backtrace { let mut backtrace = self.state.lock().runnable_backtraces[&runnable_id].clone(); backtrace.resolve(); backtrace } pub fn build_background(self: &Arc) -> Arc { Arc::new(Background::Deterministic { executor: self.clone(), }) } pub fn build_foreground(self: &Arc, id: usize) -> Rc { Rc::new(Foreground::Deterministic { cx_id: id, executor: self.clone(), }) } fn spawn_from_foreground( &self, cx_id: usize, future: AnyLocalFuture, main: bool, ) -> AnyLocalTask { let state = self.state.clone(); let id; { let mut state = state.lock(); id = util::post_inc(&mut state.next_runnable_id); if state.enable_runnable_backtraces { state .runnable_backtraces .insert(id, backtrace::Backtrace::new_unresolved()); } } let unparker = self.parker.lock().unparker(); let (runnable, task) = async_task::spawn_local(future, move |runnable| { let mut state = state.lock(); state.push_to_history(ExecutorEvent::EnqueuRunnable { id }); state .scheduled_from_foreground .entry(cx_id) .or_default() .push(ForegroundRunnable { id, runnable, main }); unparker.unpark(); }); runnable.schedule(); task } fn spawn(&self, future: AnyFuture) -> AnyTask { let state = self.state.clone(); let id; { let mut state = state.lock(); id = util::post_inc(&mut state.next_runnable_id); if state.enable_runnable_backtraces { state .runnable_backtraces .insert(id, backtrace::Backtrace::new_unresolved()); } } let unparker = self.parker.lock().unparker(); let (runnable, task) = async_task::spawn(future, move |runnable| { let mut state = state.lock(); state .poll_history .push(ExecutorEvent::EnqueuRunnable { id }); state .scheduled_from_background .push(BackgroundRunnable { id, runnable }); unparker.unpark(); }); runnable.schedule(); task } fn run<'a>( &self, cx_id: usize, main_future: Pin>>>, ) -> Box { use std::sync::atomic::{AtomicBool, Ordering::SeqCst}; let woken = Arc::new(AtomicBool::new(false)); let state = self.state.clone(); let id; { let mut state = state.lock(); id = util::post_inc(&mut state.next_runnable_id); if state.enable_runnable_backtraces { state .runnable_backtraces .insert(id, backtrace::Backtrace::new_unresolved()); } } let unparker = self.parker.lock().unparker(); let (runnable, mut main_task) = unsafe { async_task::spawn_unchecked(main_future, move |runnable| { let state = &mut *state.lock(); state .scheduled_from_foreground .entry(cx_id) .or_default() .push(ForegroundRunnable { id: util::post_inc(&mut state.next_runnable_id), runnable, main: true, }); unparker.unpark(); }) }; runnable.schedule(); loop { if let Some(result) = self.run_internal(woken.clone(), Some(&mut main_task)) { return result; } if !woken.load(SeqCst) { self.state.lock().will_park(); } woken.store(false, SeqCst); self.parker.lock().park(); } } pub fn run_until_parked(&self) { use std::sync::atomic::AtomicBool; let woken = Arc::new(AtomicBool::new(false)); self.run_internal(woken, None); } fn run_internal( &self, woken: Arc, mut main_task: Option<&mut AnyLocalTask>, ) -> Option> { use rand::prelude::*; use std::sync::atomic::Ordering::SeqCst; let unparker = self.parker.lock().unparker(); let waker = waker_fn::waker_fn(move || { woken.store(true, SeqCst); unparker.unpark(); }); let mut cx = Context::from_waker(&waker); loop { let mut state = self.state.lock(); if state.scheduled_from_foreground.is_empty() && state.scheduled_from_background.is_empty() { if let Some(main_task) = main_task { if let Poll::Ready(result) = main_task.poll(&mut cx) { return Some(result); } } return None; } if !state.scheduled_from_background.is_empty() && state.rng.gen() { let background_len = state.scheduled_from_background.len(); let ix = state.rng.gen_range(0..background_len); let background_runnable = state.scheduled_from_background.remove(ix); state.push_to_history(ExecutorEvent::PollRunnable { id: background_runnable.id, }); drop(state); background_runnable.runnable.run(); } else if !state.scheduled_from_foreground.is_empty() { let available_cx_ids = state .scheduled_from_foreground .keys() .copied() .collect::>(); let cx_id_to_run = *available_cx_ids.iter().choose(&mut state.rng).unwrap(); let scheduled_from_cx = state .scheduled_from_foreground .get_mut(&cx_id_to_run) .unwrap(); let foreground_runnable = scheduled_from_cx.remove(0); if scheduled_from_cx.is_empty() { state.scheduled_from_foreground.remove(&cx_id_to_run); } state.push_to_history(ExecutorEvent::PollRunnable { id: foreground_runnable.id, }); drop(state); foreground_runnable.runnable.run(); if let Some(main_task) = main_task.as_mut() { if foreground_runnable.main { if let Poll::Ready(result) = main_task.poll(&mut cx) { return Some(result); } } } } } } fn block(&self, future: &mut F, max_ticks: usize) -> Option where F: Unpin + Future, { use rand::prelude::*; let unparker = self.parker.lock().unparker(); let waker = waker_fn::waker_fn(move || { unparker.unpark(); }); let mut cx = Context::from_waker(&waker); for _ in 0..max_ticks { let mut state = self.state.lock(); let runnable_count = state.scheduled_from_background.len(); let ix = state.rng.gen_range(0..=runnable_count); if ix < state.scheduled_from_background.len() { let background_runnable = state.scheduled_from_background.remove(ix); state.push_to_history(ExecutorEvent::PollRunnable { id: background_runnable.id, }); drop(state); background_runnable.runnable.run(); } else { drop(state); if let Poll::Ready(result) = future.poll(&mut cx) { return Some(result); } let mut state = self.state.lock(); if state.scheduled_from_background.is_empty() { state.will_park(); drop(state); self.parker.lock().park(); } continue; } } None } pub fn timer(&self, duration: Duration) -> Timer { let (tx, rx) = postage::barrier::channel(); let mut state = self.state.lock(); let wakeup_at = state.now + duration; let id = util::post_inc(&mut state.next_timer_id); match state .pending_timers .binary_search_by_key(&wakeup_at, |e| e.1) { Ok(ix) | Err(ix) => state.pending_timers.insert(ix, (id, wakeup_at, tx)), } let state = self.state.clone(); Timer::Deterministic(DeterministicTimer { rx, id, state }) } pub fn now(&self) -> std::time::Instant { let state = self.state.lock(); state.now } pub fn advance_clock(&self, duration: Duration) { let new_now = self.state.lock().now + duration; loop { self.run_until_parked(); let mut state = self.state.lock(); if let Some((_, wakeup_time, _)) = state.pending_timers.first() { let wakeup_time = *wakeup_time; if wakeup_time <= new_now { let timer_count = state .pending_timers .iter() .take_while(|(_, t, _)| *t == wakeup_time) .count(); state.now = wakeup_time; let timers_to_wake = state .pending_timers .drain(0..timer_count) .collect::>(); drop(state); drop(timers_to_wake); continue; } } break; } self.state.lock().now = new_now; } pub fn start_waiting(&self) { self.state.lock().waiting_backtrace = Some(backtrace::Backtrace::new_unresolved()); } pub fn finish_waiting(&self) { self.state.lock().waiting_backtrace.take(); } pub fn forbid_parking(&self) { use rand::prelude::*; let mut state = self.state.lock(); state.forbid_parking = true; state.rng = StdRng::seed_from_u64(state.seed); } pub fn allow_parking(&self) { use rand::prelude::*; let mut state = self.state.lock(); state.forbid_parking = false; state.rng = StdRng::seed_from_u64(state.seed); } pub async fn simulate_random_delay(&self) { use rand::prelude::*; use smol::future::yield_now; if self.state.lock().rng.gen_bool(0.2) { let yields = self.state.lock().rng.gen_range(1..=10); for _ in 0..yields { yield_now().await; } } } pub fn record_backtrace(&self) { let mut state = self.state.lock(); if state.enable_runnable_backtraces { let current_id = state .poll_history .iter() .rev() .find_map(|event| match event { ExecutorEvent::PollRunnable { id } => Some(*id), _ => None, }); if let Some(id) = current_id { state .runnable_backtraces .insert(id, backtrace::Backtrace::new_unresolved()); } } } } impl Drop for Timer { fn drop(&mut self) { #[cfg(any(test, feature = "test-support"))] if let Timer::Deterministic(DeterministicTimer { state, id, .. }) = self { state .lock() .pending_timers .retain(|(timer_id, _, _)| timer_id != id) } } } impl Future for Timer { type Output = (); fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { match &mut *self { #[cfg(any(test, feature = "test-support"))] Self::Deterministic(DeterministicTimer { rx, .. }) => { use postage::stream::{PollRecv, Stream as _}; smol::pin!(rx); match rx.poll_recv(&mut postage::Context::from_waker(cx.waker())) { PollRecv::Ready(()) | PollRecv::Closed => Poll::Ready(()), PollRecv::Pending => Poll::Pending, } } Self::Production(timer) => { smol::pin!(timer); match timer.poll(cx) { Poll::Ready(_) => Poll::Ready(()), Poll::Pending => Poll::Pending, } } } } } #[cfg(any(test, feature = "test-support"))] impl DeterministicState { fn push_to_history(&mut self, event: ExecutorEvent) { use std::fmt::Write as _; self.poll_history.push(event); if let Some(prev_history) = &self.previous_poll_history { let ix = self.poll_history.len() - 1; let prev_event = prev_history[ix]; if event != prev_event { let mut message = String::new(); writeln!( &mut message, "current runnable backtrace:\n{:?}", self.runnable_backtraces.get_mut(&event.id()).map(|trace| { trace.resolve(); util::CwdBacktrace(trace) }) ) .unwrap(); writeln!( &mut message, "previous runnable backtrace:\n{:?}", self.runnable_backtraces .get_mut(&prev_event.id()) .map(|trace| { trace.resolve(); util::CwdBacktrace(trace) }) ) .unwrap(); panic!("detected non-determinism after {ix}. {message}"); } } } fn will_park(&mut self) { if self.forbid_parking { let mut backtrace_message = String::new(); #[cfg(any(test, feature = "test-support"))] if let Some(backtrace) = self.waiting_backtrace.as_mut() { backtrace.resolve(); backtrace_message = format!( "\nbacktrace of waiting future:\n{:?}", util::CwdBacktrace(backtrace) ); } panic!( "deterministic executor parked after a call to forbid_parking{}", backtrace_message ); } } } #[cfg(any(test, feature = "test-support"))] impl ExecutorEvent { pub fn id(&self) -> usize { match self { ExecutorEvent::PollRunnable { id } => *id, ExecutorEvent::EnqueuRunnable { id } => *id, } } } impl Foreground { pub fn platform(dispatcher: Arc) -> Result { if dispatcher.is_main_thread() { Ok(Self::Platform { dispatcher, _not_send_or_sync: PhantomData, }) } else { Err(anyhow!("must be constructed on main thread")) } } pub fn spawn(&self, future: impl Future + 'static) -> Task { let future = any_local_future(future); let any_task = match self { #[cfg(any(test, feature = "test-support"))] Self::Deterministic { cx_id, executor } => { executor.spawn_from_foreground(*cx_id, future, false) } Self::Platform { dispatcher, .. } => { fn spawn_inner( future: AnyLocalFuture, dispatcher: &Arc, ) -> AnyLocalTask { let dispatcher = dispatcher.clone(); let schedule = move |runnable: Runnable| dispatcher.run_on_main_thread(runnable); let (runnable, task) = async_task::spawn_local(future, schedule); runnable.schedule(); task } spawn_inner(future, dispatcher) } }; Task::local(any_task) } #[cfg(any(test, feature = "test-support"))] pub fn run(&self, future: impl Future) -> T { let future = async move { Box::new(future.await) as Box }.boxed_local(); let result = match self { Self::Deterministic { cx_id, executor } => executor.run(*cx_id, future), Self::Platform { .. } => panic!("you can't call run on a platform foreground executor"), }; *result.downcast().unwrap() } #[cfg(any(test, feature = "test-support"))] pub fn run_until_parked(&self) { match self { Self::Deterministic { executor, .. } => executor.run_until_parked(), _ => panic!("this method can only be called on a deterministic executor"), } } #[cfg(any(test, feature = "test-support"))] pub fn parking_forbidden(&self) -> bool { match self { Self::Deterministic { executor, .. } => executor.state.lock().forbid_parking, _ => panic!("this method can only be called on a deterministic executor"), } } #[cfg(any(test, feature = "test-support"))] pub fn start_waiting(&self) { match self { Self::Deterministic { executor, .. } => executor.start_waiting(), _ => panic!("this method can only be called on a deterministic executor"), } } #[cfg(any(test, feature = "test-support"))] pub fn finish_waiting(&self) { match self { Self::Deterministic { executor, .. } => executor.finish_waiting(), _ => panic!("this method can only be called on a deterministic executor"), } } #[cfg(any(test, feature = "test-support"))] pub fn forbid_parking(&self) { match self { Self::Deterministic { executor, .. } => executor.forbid_parking(), _ => panic!("this method can only be called on a deterministic executor"), } } #[cfg(any(test, feature = "test-support"))] pub fn allow_parking(&self) { match self { Self::Deterministic { executor, .. } => executor.allow_parking(), _ => panic!("this method can only be called on a deterministic executor"), } } #[cfg(any(test, feature = "test-support"))] pub fn advance_clock(&self, duration: Duration) { match self { Self::Deterministic { executor, .. } => executor.advance_clock(duration), _ => panic!("this method can only be called on a deterministic executor"), } } #[cfg(any(test, feature = "test-support"))] pub fn set_block_on_ticks(&self, range: std::ops::RangeInclusive) { match self { Self::Deterministic { executor, .. } => executor.state.lock().block_on_ticks = range, _ => panic!("this method can only be called on a deterministic executor"), } } } impl Background { pub fn new() -> Self { let executor = Arc::new(Executor::new()); let stop = channel::unbounded::<()>(); for i in 0..2 * num_cpus::get() { let executor = executor.clone(); let stop = stop.1.clone(); thread::Builder::new() .name(format!("background-executor-{}", i)) .spawn(move || smol::block_on(executor.run(stop.recv()))) .unwrap(); } Self::Production { executor, _stop: stop.0, } } pub fn num_cpus(&self) -> usize { num_cpus::get() } pub fn spawn(&self, future: F) -> Task where T: 'static + Send, F: Send + Future + 'static, { let future = any_future(future); let any_task = match self { Self::Production { executor, .. } => executor.spawn(future), #[cfg(any(test, feature = "test-support"))] Self::Deterministic { executor } => executor.spawn(future), }; Task::send(any_task) } pub fn block(&self, future: F) -> T where F: Future, { smol::pin!(future); match self { Self::Production { .. } => smol::block_on(&mut future), #[cfg(any(test, feature = "test-support"))] Self::Deterministic { executor, .. } => { executor.block(&mut future, usize::MAX).unwrap() } } } pub fn block_with_timeout( &self, timeout: Duration, future: F, ) -> Result> where T: 'static, F: 'static + Unpin + Future, { let mut future = any_local_future(future); if !timeout.is_zero() { let output = match self { Self::Production { .. } => smol::block_on(util::timeout(timeout, &mut future)).ok(), #[cfg(any(test, feature = "test-support"))] Self::Deterministic { executor, .. } => { use rand::prelude::*; let max_ticks = { let mut state = executor.state.lock(); let range = state.block_on_ticks.clone(); state.rng.gen_range(range) }; executor.block(&mut future, max_ticks) } }; if let Some(output) = output { return Ok(*output.downcast().unwrap()); } } Err(async { *future.await.downcast().unwrap() }) } pub async fn scoped<'scope, F>(self: &Arc, scheduler: F) where F: FnOnce(&mut Scope<'scope>), { let mut scope = Scope::new(self.clone()); (scheduler)(&mut scope); let spawned = mem::take(&mut scope.futures) .into_iter() .map(|f| self.spawn(f)) .collect::>(); for task in spawned { task.await; } } pub fn timer(&self, duration: Duration) -> Timer { match self { Background::Production { .. } => Timer::Production(smol::Timer::after(duration)), #[cfg(any(test, feature = "test-support"))] Background::Deterministic { executor } => executor.timer(duration), } } pub fn now(&self) -> std::time::Instant { match self { Background::Production { .. } => std::time::Instant::now(), #[cfg(any(test, feature = "test-support"))] Background::Deterministic { executor } => executor.now(), } } #[cfg(any(test, feature = "test-support"))] pub fn rng<'a>(&'a self) -> impl 'a + std::ops::DerefMut { match self { Self::Deterministic { executor, .. } => { parking_lot::lock_api::MutexGuard::map(executor.state.lock(), |s| &mut s.rng) } _ => panic!("this method can only be called on a deterministic executor"), } } #[cfg(any(test, feature = "test-support"))] pub async fn simulate_random_delay(&self) { match self { Self::Deterministic { executor, .. } => { executor.simulate_random_delay().await; } _ => { panic!("this method can only be called on a deterministic executor") } } } #[cfg(any(test, feature = "test-support"))] pub fn record_backtrace(&self) { match self { Self::Deterministic { executor, .. } => executor.record_backtrace(), _ => { panic!("this method can only be called on a deterministic executor") } } } #[cfg(any(test, feature = "test-support"))] pub fn start_waiting(&self) { match self { Self::Deterministic { executor, .. } => executor.start_waiting(), _ => panic!("this method can only be called on a deterministic executor"), } } } impl Default for Background { fn default() -> Self { Self::new() } } pub struct Scope<'a> { executor: Arc, futures: Vec + Send + 'static>>>, tx: Option>, rx: mpsc::Receiver<()>, _phantom: PhantomData<&'a ()>, } impl<'a> Scope<'a> { fn new(executor: Arc) -> Self { let (tx, rx) = mpsc::channel(1); Self { executor, tx: Some(tx), rx, futures: Default::default(), _phantom: PhantomData, } } pub fn spawn(&mut self, f: F) where F: Future + Send + 'a, { let tx = self.tx.clone().unwrap(); // Safety: The 'a lifetime is guaranteed to outlive any of these futures because // dropping this `Scope` blocks until all of the futures have resolved. let f = unsafe { mem::transmute::< Pin + Send + 'a>>, Pin + Send + 'static>>, >(Box::pin(async move { f.await; drop(tx); })) }; self.futures.push(f); } } impl<'a> Drop for Scope<'a> { fn drop(&mut self) { self.tx.take().unwrap(); // Wait until the channel is closed, which means that all of the spawned // futures have resolved. self.executor.block(self.rx.next()); } } impl Task { pub fn ready(value: T) -> Self { Self::Ready(Some(value)) } fn local(any_task: AnyLocalTask) -> Self { Self::Local { any_task, result_type: PhantomData, } } pub fn detach(self) { match self { Task::Ready(_) => {} Task::Local { any_task, .. } => any_task.detach(), Task::Send { any_task, .. } => any_task.detach(), } } } impl Task> { #[track_caller] pub fn detach_and_log_err(self, cx: &mut AppContext) { let caller = Location::caller(); cx.spawn(|_| async move { if let Err(err) = self.await { log::error!("{}:{}: {:#}", caller.file(), caller.line(), err); } }) .detach(); } } impl Task { fn send(any_task: AnyTask) -> Self { Self::Send { any_task, result_type: PhantomData, } } } impl fmt::Debug for Task { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { match self { Task::Ready(value) => value.fmt(f), Task::Local { any_task, .. } => any_task.fmt(f), Task::Send { any_task, .. } => any_task.fmt(f), } } } impl Future for Task { type Output = T; fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { match unsafe { self.get_unchecked_mut() } { Task::Ready(value) => Poll::Ready(value.take().unwrap()), Task::Local { any_task, .. } => { any_task.poll(cx).map(|value| *value.downcast().unwrap()) } Task::Send { any_task, .. } => { any_task.poll(cx).map(|value| *value.downcast().unwrap()) } } } } fn any_future(future: F) -> AnyFuture where T: 'static + Send, F: Future + Send + 'static, { async { Box::new(future.await) as Box }.boxed() } fn any_local_future(future: F) -> AnyLocalFuture where T: 'static, F: Future + 'static, { async { Box::new(future.await) as Box }.boxed_local() }