2024-01-03 20:59:39 +00:00
|
|
|
use crate::{AppContext, PlatformDispatcher};
|
|
|
|
use futures::{channel::mpsc, pin_mut, FutureExt};
|
|
|
|
use smol::prelude::*;
|
2021-07-09 13:00:51 +00:00
|
|
|
use std::{
|
2024-01-03 20:59:39 +00:00
|
|
|
fmt::Debug,
|
2021-07-09 13:00:51 +00:00
|
|
|
marker::PhantomData,
|
|
|
|
mem,
|
2024-01-03 20:59:39 +00:00
|
|
|
num::NonZeroUsize,
|
2021-07-09 13:00:51 +00:00
|
|
|
pin::Pin,
|
|
|
|
rc::Rc,
|
2024-01-03 20:59:39 +00:00
|
|
|
sync::{
|
|
|
|
atomic::{AtomicBool, AtomicUsize, Ordering::SeqCst},
|
|
|
|
Arc,
|
|
|
|
},
|
2021-07-20 17:20:50 +00:00
|
|
|
task::{Context, Poll},
|
2023-06-20 19:36:36 +00:00
|
|
|
time::Duration,
|
2021-07-09 13:00:51 +00:00
|
|
|
};
|
2024-01-03 20:59:39 +00:00
|
|
|
use util::TryFutureExt;
|
|
|
|
use waker_fn::waker_fn;
|
2021-02-20 23:05:36 +00:00
|
|
|
|
2024-01-03 20:59:39 +00:00
|
|
|
#[cfg(any(test, feature = "test-support"))]
|
|
|
|
use rand::rngs::StdRng;
|
2021-02-20 23:05:36 +00:00
|
|
|
|
2024-01-21 22:26:45 +00:00
|
|
|
/// A pointer to the executor that is currently running,
|
|
|
|
/// for spawning background tasks.
|
2024-01-03 20:59:39 +00:00
|
|
|
#[derive(Clone)]
|
|
|
|
pub struct BackgroundExecutor {
|
|
|
|
dispatcher: Arc<dyn PlatformDispatcher>,
|
2021-02-20 23:05:36 +00:00
|
|
|
}
|
|
|
|
|
2024-01-21 22:26:45 +00:00
|
|
|
/// A pointer to the executor that is currently running,
|
|
|
|
/// for spawning tasks on the main thread.
|
2024-01-03 20:59:39 +00:00
|
|
|
#[derive(Clone)]
|
|
|
|
pub struct ForegroundExecutor {
|
|
|
|
dispatcher: Arc<dyn PlatformDispatcher>,
|
|
|
|
not_send: PhantomData<Rc<()>>,
|
2021-07-08 19:03:00 +00:00
|
|
|
}
|
|
|
|
|
2024-01-09 18:02:57 +00:00
|
|
|
/// Task is a primitive that allows work to happen in the background.
|
2024-01-09 19:48:48 +00:00
|
|
|
///
|
|
|
|
/// It implements [`Future`] so you can `.await` on it.
|
|
|
|
///
|
|
|
|
/// If you drop a task it will be cancelled immediately. Calling [`Task::detach`] allows
|
2024-01-21 22:26:45 +00:00
|
|
|
/// the task to continue running, but with no way to return a value.
|
2021-10-21 14:26:37 +00:00
|
|
|
#[must_use]
|
2024-01-03 20:59:39 +00:00
|
|
|
#[derive(Debug)]
|
2021-10-01 18:13:17 +00:00
|
|
|
pub enum Task<T> {
|
2024-01-21 22:26:45 +00:00
|
|
|
/// A task that is ready to return a value
|
2021-12-20 19:36:59 +00:00
|
|
|
Ready(Option<T>),
|
2024-01-21 22:26:45 +00:00
|
|
|
|
|
|
|
/// A task that is currently running.
|
2024-01-03 20:59:39 +00:00
|
|
|
Spawned(async_task::Task<T>),
|
2021-10-01 18:13:17 +00:00
|
|
|
}
|
|
|
|
|
2024-01-03 20:59:39 +00:00
|
|
|
impl<T> Task<T> {
|
2024-01-19 16:18:50 +00:00
|
|
|
/// Creates a new task that will resolve with the value
|
2024-01-03 20:59:39 +00:00
|
|
|
pub fn ready(val: T) -> Self {
|
|
|
|
Task::Ready(Some(val))
|
|
|
|
}
|
2022-01-24 17:45:14 +00:00
|
|
|
|
2024-01-09 18:02:57 +00:00
|
|
|
/// Detaching a task runs it to completion in the background
|
2024-01-03 20:59:39 +00:00
|
|
|
pub fn detach(self) {
|
|
|
|
match self {
|
|
|
|
Task::Ready(_) => {}
|
|
|
|
Task::Spawned(task) => task.detach(),
|
|
|
|
}
|
|
|
|
}
|
2022-11-28 18:01:28 +00:00
|
|
|
}
|
|
|
|
|
2024-01-03 20:59:39 +00:00
|
|
|
impl<E, T> Task<Result<T, E>>
|
|
|
|
where
|
|
|
|
T: 'static,
|
|
|
|
E: 'static + Debug,
|
|
|
|
{
|
2024-01-09 18:02:57 +00:00
|
|
|
/// Run the task to completion in the background and log any
|
|
|
|
/// errors that occur.
|
2024-01-03 20:59:39 +00:00
|
|
|
#[track_caller]
|
2024-01-17 04:43:44 +00:00
|
|
|
pub fn detach_and_log_err(self, cx: &AppContext) {
|
2024-01-03 20:59:39 +00:00
|
|
|
let location = core::panic::Location::caller();
|
|
|
|
cx.foreground_executor()
|
|
|
|
.spawn(self.log_tracked_err(*location))
|
|
|
|
.detach();
|
|
|
|
}
|
2021-07-20 18:22:02 +00:00
|
|
|
}
|
2021-07-08 19:03:00 +00:00
|
|
|
|
2024-01-03 20:59:39 +00:00
|
|
|
impl<T> Future for Task<T> {
|
|
|
|
type Output = T;
|
2022-03-05 00:54:12 +00:00
|
|
|
|
2024-01-03 20:59:39 +00:00
|
|
|
fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
|
|
|
|
match unsafe { self.get_unchecked_mut() } {
|
|
|
|
Task::Ready(val) => Poll::Ready(val.take().unwrap()),
|
|
|
|
Task::Spawned(task) => task.poll(cx),
|
|
|
|
}
|
|
|
|
}
|
2022-03-05 00:54:12 +00:00
|
|
|
}
|
|
|
|
|
2024-01-21 22:26:45 +00:00
|
|
|
/// An opaque identifier that can be attached to a task so that tests can
/// refer to it later.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct TaskLabel(NonZeroUsize);
|
2022-03-01 16:01:52 +00:00
|
|
|
|
2024-01-03 20:59:39 +00:00
|
|
|
impl Default for TaskLabel {
|
|
|
|
fn default() -> Self {
|
|
|
|
Self::new()
|
2021-07-08 19:03:00 +00:00
|
|
|
}
|
2024-01-03 20:59:39 +00:00
|
|
|
}
|
2021-07-08 19:03:00 +00:00
|
|
|
|
2024-01-03 20:59:39 +00:00
|
|
|
impl TaskLabel {
|
2024-01-21 22:26:45 +00:00
|
|
|
/// Construct a new task label.
|
2024-01-03 20:59:39 +00:00
|
|
|
pub fn new() -> Self {
|
|
|
|
static NEXT_TASK_LABEL: AtomicUsize = AtomicUsize::new(1);
|
|
|
|
Self(NEXT_TASK_LABEL.fetch_add(1, SeqCst).try_into().unwrap())
|
2022-11-28 18:01:28 +00:00
|
|
|
}
|
2024-01-03 20:59:39 +00:00
|
|
|
}
|
2022-11-28 18:01:28 +00:00
|
|
|
|
2024-01-03 20:59:39 +00:00
|
|
|
/// A boxed, pinned, type-erased `'static` future that is not required to be `Send`.
type AnyLocalFuture<R> = Pin<Box<dyn Future<Output = R> + 'static>>;
|
2022-12-24 01:34:13 +00:00
|
|
|
|
2024-01-03 20:59:39 +00:00
|
|
|
/// A boxed, pinned, type-erased `'static` future that is `Send`.
type AnyFuture<R> = Pin<Box<dyn Future<Output = R> + Send + 'static>>;
|
2022-11-28 18:35:33 +00:00
|
|
|
|
2024-01-09 18:02:57 +00:00
|
|
|
/// BackgroundExecutor lets you run things on background threads.
|
|
|
|
/// In production this is a thread pool with no ordering guarantees.
|
2024-01-17 18:23:46 +00:00
|
|
|
/// In tests this is simulated by running tasks one by one in a deterministic
|
2024-01-09 18:02:57 +00:00
|
|
|
/// (but arbitrary) order controlled by the `SEED` environment variable.
|
2024-01-03 20:59:39 +00:00
|
|
|
impl BackgroundExecutor {
|
2024-01-17 18:23:46 +00:00
|
|
|
#[doc(hidden)]
|
2024-01-03 20:59:39 +00:00
|
|
|
pub fn new(dispatcher: Arc<dyn PlatformDispatcher>) -> Self {
|
|
|
|
Self { dispatcher }
|
2022-11-28 18:01:28 +00:00
|
|
|
}
|
|
|
|
|
2024-01-03 20:59:39 +00:00
|
|
|
/// Enqueues the given future to be run to completion on a background thread.
|
|
|
|
pub fn spawn<R>(&self, future: impl Future<Output = R> + Send + 'static) -> Task<R>
|
|
|
|
where
|
|
|
|
R: Send + 'static,
|
|
|
|
{
|
|
|
|
self.spawn_internal::<R>(Box::pin(future), None)
|
2022-01-24 17:45:14 +00:00
|
|
|
}
|
|
|
|
|
2024-01-03 20:59:39 +00:00
|
|
|
/// Enqueues the given future to be run to completion on a background thread.
|
|
|
|
/// The given label can be used to control the priority of the task in tests.
|
|
|
|
pub fn spawn_labeled<R>(
|
|
|
|
&self,
|
|
|
|
label: TaskLabel,
|
|
|
|
future: impl Future<Output = R> + Send + 'static,
|
|
|
|
) -> Task<R>
|
|
|
|
where
|
|
|
|
R: Send + 'static,
|
|
|
|
{
|
|
|
|
self.spawn_internal::<R>(Box::pin(future), Some(label))
|
2022-01-24 17:45:14 +00:00
|
|
|
}
|
|
|
|
|
2024-01-03 20:59:39 +00:00
|
|
|
fn spawn_internal<R: Send + 'static>(
|
2022-01-25 00:13:56 +00:00
|
|
|
&self,
|
2024-01-03 20:59:39 +00:00
|
|
|
future: AnyFuture<R>,
|
|
|
|
label: Option<TaskLabel>,
|
|
|
|
) -> Task<R> {
|
|
|
|
let dispatcher = self.dispatcher.clone();
|
|
|
|
let (runnable, task) =
|
|
|
|
async_task::spawn(future, move |runnable| dispatcher.dispatch(runnable, label));
|
2021-07-08 19:03:00 +00:00
|
|
|
runnable.schedule();
|
2024-01-03 20:59:39 +00:00
|
|
|
Task::Spawned(task)
|
2021-07-08 19:03:00 +00:00
|
|
|
}
|
|
|
|
|
2024-01-17 22:31:21 +00:00
|
|
|
/// Lets the test harness drive an async test synchronously.
///
/// Runs `future` through `block_internal` with `background_only` set to
/// `false` and an effectively unlimited tick budget (`usize::MAX`), so the
/// error branch is treated as unreachable.
#[cfg(any(test, feature = "test-support"))]
#[track_caller]
pub fn block_test<R>(&self, future: impl Future<Output = R>) -> R {
    match self.block_internal(false, future, usize::MAX) {
        Ok(output) => output,
        Err(_) => unreachable!(),
    }
}
|
|
|
|
|
2024-01-09 18:02:57 +00:00
|
|
|
/// Block the current thread until the given future resolves.
|
|
|
|
/// Consider using `block_with_timeout` instead.
|
2024-01-03 20:59:39 +00:00
|
|
|
pub fn block<R>(&self, future: impl Future<Output = R>) -> R {
|
|
|
|
if let Ok(value) = self.block_internal(true, future, usize::MAX) {
|
|
|
|
value
|
|
|
|
} else {
|
|
|
|
unreachable!()
|
2021-09-08 18:24:27 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2024-01-03 20:59:39 +00:00
|
|
|
#[track_caller]
|
|
|
|
pub(crate) fn block_internal<R>(
|
2021-10-01 17:07:27 +00:00
|
|
|
&self,
|
2024-01-03 20:59:39 +00:00
|
|
|
background_only: bool,
|
|
|
|
future: impl Future<Output = R>,
|
|
|
|
mut max_ticks: usize,
|
|
|
|
) -> Result<R, ()> {
|
|
|
|
pin_mut!(future);
|
|
|
|
let unparker = self.dispatcher.unparker();
|
|
|
|
let awoken = Arc::new(AtomicBool::new(false));
|
|
|
|
|
|
|
|
let waker = waker_fn({
|
|
|
|
let awoken = awoken.clone();
|
|
|
|
move || {
|
|
|
|
awoken.store(true, SeqCst);
|
|
|
|
unparker.unpark();
|
|
|
|
}
|
2021-09-08 18:24:27 +00:00
|
|
|
});
|
2024-01-03 20:59:39 +00:00
|
|
|
let mut cx = std::task::Context::from_waker(&waker);
|
2021-07-23 09:54:43 +00:00
|
|
|
|
|
|
|
loop {
|
2024-01-03 20:59:39 +00:00
|
|
|
match future.as_mut().poll(&mut cx) {
|
|
|
|
Poll::Ready(result) => return Ok(result),
|
|
|
|
Poll::Pending => {
|
|
|
|
if max_ticks == 0 {
|
|
|
|
return Err(());
|
2022-03-02 09:39:46 +00:00
|
|
|
}
|
2024-01-03 20:59:39 +00:00
|
|
|
max_ticks -= 1;
|
2022-03-02 09:39:46 +00:00
|
|
|
|
2024-01-03 20:59:39 +00:00
|
|
|
if !self.dispatcher.tick(background_only) {
|
|
|
|
if awoken.swap(false, SeqCst) {
|
|
|
|
continue;
|
2022-01-24 17:45:14 +00:00
|
|
|
}
|
2021-07-08 19:03:00 +00:00
|
|
|
|
2024-01-03 20:59:39 +00:00
|
|
|
#[cfg(any(test, feature = "test-support"))]
|
|
|
|
if let Some(test) = self.dispatcher.as_test() {
|
|
|
|
if !test.parking_allowed() {
|
|
|
|
let mut backtrace_message = String::new();
|
|
|
|
if let Some(backtrace) = test.waiting_backtrace() {
|
|
|
|
backtrace_message =
|
|
|
|
format!("\nbacktrace of waiting future:\n{:?}", backtrace);
|
|
|
|
}
|
|
|
|
panic!("parked with nothing left to run\n{:?}", backtrace_message)
|
|
|
|
}
|
|
|
|
}
|
2022-03-07 23:33:39 +00:00
|
|
|
|
2024-01-03 20:59:39 +00:00
|
|
|
self.dispatcher.park();
|
|
|
|
}
|
2022-03-07 23:33:39 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
2022-03-02 09:39:46 +00:00
|
|
|
}
|
2022-05-04 00:55:33 +00:00
|
|
|
|
2024-01-09 18:02:57 +00:00
|
|
|
/// Block the current thread until the given future resolves
|
|
|
|
/// or `duration` has elapsed.
|
2024-01-03 20:59:39 +00:00
|
|
|
pub fn block_with_timeout<R>(
|
|
|
|
&self,
|
|
|
|
duration: Duration,
|
|
|
|
future: impl Future<Output = R>,
|
|
|
|
) -> Result<R, impl Future<Output = R>> {
|
|
|
|
let mut future = Box::pin(future.fuse());
|
|
|
|
if duration.is_zero() {
|
|
|
|
return Err(future);
|
2022-12-24 01:34:13 +00:00
|
|
|
}
|
2021-02-20 23:05:36 +00:00
|
|
|
|
2022-03-05 00:54:12 +00:00
|
|
|
#[cfg(any(test, feature = "test-support"))]
|
2024-01-03 20:59:39 +00:00
|
|
|
let max_ticks = self
|
|
|
|
.dispatcher
|
|
|
|
.as_test()
|
|
|
|
.map_or(usize::MAX, |dispatcher| dispatcher.gen_block_on_ticks());
|
|
|
|
#[cfg(not(any(test, feature = "test-support")))]
|
|
|
|
let max_ticks = usize::MAX;
|
|
|
|
|
|
|
|
let mut timer = self.timer(duration).fuse();
|
|
|
|
|
|
|
|
let timeout = async {
|
|
|
|
futures::select_biased! {
|
|
|
|
value = future => Ok(value),
|
|
|
|
_ = timer => Err(()),
|
2022-03-05 00:54:12 +00:00
|
|
|
}
|
2024-01-03 20:59:39 +00:00
|
|
|
};
|
|
|
|
match self.block_internal(true, timeout, max_ticks) {
|
|
|
|
Ok(Ok(value)) => Ok(value),
|
|
|
|
_ => Err(future),
|
2022-12-24 01:34:13 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2024-01-09 18:02:57 +00:00
|
|
|
/// Scoped lets you start a number of tasks and waits
|
|
|
|
/// for all of them to complete before returning.
|
2024-01-03 20:59:39 +00:00
|
|
|
pub async fn scoped<'scope, F>(&self, scheduler: F)
|
|
|
|
where
|
|
|
|
F: FnOnce(&mut Scope<'scope>),
|
|
|
|
{
|
|
|
|
let mut scope = Scope::new(self.clone());
|
|
|
|
(scheduler)(&mut scope);
|
|
|
|
let spawned = mem::take(&mut scope.futures)
|
|
|
|
.into_iter()
|
|
|
|
.map(|f| self.spawn(f))
|
|
|
|
.collect::<Vec<_>>();
|
|
|
|
for task in spawned {
|
|
|
|
task.await;
|
2021-02-20 23:05:36 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2024-01-09 18:02:57 +00:00
|
|
|
/// Returns a task that will complete after the given duration.
|
|
|
|
/// Depending on other concurrent tasks the elapsed duration may be longer
|
2024-01-17 22:31:21 +00:00
|
|
|
/// than requested.
|
2024-01-03 20:59:39 +00:00
|
|
|
pub fn timer(&self, duration: Duration) -> Task<()> {
|
|
|
|
let (runnable, task) = async_task::spawn(async move {}, {
|
|
|
|
let dispatcher = self.dispatcher.clone();
|
|
|
|
move |runnable| dispatcher.dispatch_after(duration, runnable)
|
|
|
|
});
|
|
|
|
runnable.schedule();
|
|
|
|
Task::Spawned(task)
|
2021-02-20 23:05:36 +00:00
|
|
|
}
|
|
|
|
|
2024-01-09 18:02:57 +00:00
|
|
|
/// In tests, records which task is currently waiting (for debugging only).
///
/// # Panics
///
/// Panics if the dispatcher is not a test dispatcher.
#[cfg(any(test, feature = "test-support"))]
pub fn start_waiting(&self) {
    let test = self.dispatcher.as_test().unwrap();
    test.start_waiting();
}
|
2021-07-11 09:26:06 +00:00
|
|
|
|
2024-01-09 18:02:57 +00:00
|
|
|
/// In tests, clears the debugging data recorded by `start_waiting`.
///
/// # Panics
///
/// Panics if the dispatcher is not a test dispatcher.
#[cfg(any(test, feature = "test-support"))]
pub fn finish_waiting(&self) {
    let test = self.dispatcher.as_test().unwrap();
    test.finish_waiting();
}
|
|
|
|
|
2024-01-09 18:02:57 +00:00
|
|
|
/// In tests, returns a future that runs an arbitrary number of tasks
/// (determined by the `SEED` environment variable).
///
/// # Panics
///
/// Panics if the dispatcher is not a test dispatcher.
#[cfg(any(test, feature = "test-support"))]
pub fn simulate_random_delay(&self) -> impl Future<Output = ()> {
    let test = self.dispatcher.as_test().unwrap();
    test.simulate_random_delay()
}
|
|
|
|
|
2024-01-09 18:02:57 +00:00
|
|
|
/// In tests, marks the task spawned via `spawn_labeled` with `task_label`
/// as one that should run after everything else.
///
/// # Panics
///
/// Panics if the dispatcher is not a test dispatcher.
#[cfg(any(test, feature = "test-support"))]
pub fn deprioritize(&self, task_label: TaskLabel) {
    let test = self.dispatcher.as_test().unwrap();
    test.deprioritize(task_label)
}
|
|
|
|
|
2024-01-09 18:02:57 +00:00
|
|
|
/// In tests, moves time forward by `duration`. This does not run any tasks,
/// but it does make `timer`s ready.
///
/// # Panics
///
/// Panics if the dispatcher is not a test dispatcher.
#[cfg(any(test, feature = "test-support"))]
pub fn advance_clock(&self, duration: Duration) {
    let test = self.dispatcher.as_test().unwrap();
    test.advance_clock(duration)
}
|
|
|
|
|
2024-01-09 18:02:57 +00:00
|
|
|
/// In tests, runs one task and returns the test dispatcher's `tick(false)`
/// result.
///
/// # Panics
///
/// Panics if the dispatcher is not a test dispatcher.
#[cfg(any(test, feature = "test-support"))]
pub fn tick(&self) -> bool {
    let test = self.dispatcher.as_test().unwrap();
    test.tick(false)
}
|
|
|
|
|
2024-01-09 18:02:57 +00:00
|
|
|
/// In tests, runs every task that is ready to run. If outstanding tasks
/// remain afterwards, this panics (see also `allow_parking`).
///
/// # Panics
///
/// Also panics if the dispatcher is not a test dispatcher.
#[cfg(any(test, feature = "test-support"))]
pub fn run_until_parked(&self) {
    let test = self.dispatcher.as_test().unwrap();
    test.run_until_parked()
}
|
|
|
|
|
2024-01-09 18:02:57 +00:00
|
|
|
/// In tests, stops `run_until_parked` from panicking when tasks are still
/// outstanding.
///
/// This is useful when integrating other (non-GPUI) futures, such as disk
/// access, that take real time to complete.
///
/// # Panics
///
/// Panics if the dispatcher is not a test dispatcher.
#[cfg(any(test, feature = "test-support"))]
pub fn allow_parking(&self) {
    let test = self.dispatcher.as_test().unwrap();
    test.allow_parking();
}
|
|
|
|
|
2024-03-06 23:35:22 +00:00
|
|
|
/// In tests, undoes the effect of [`Self::allow_parking`].
///
/// # Panics
///
/// Panics if the dispatcher is not a test dispatcher.
#[cfg(any(test, feature = "test-support"))]
pub fn forbid_parking(&self) {
    let test = self.dispatcher.as_test().unwrap();
    test.forbid_parking();
}
|
|
|
|
|
2024-01-09 18:02:57 +00:00
|
|
|
/// In tests, returns the random number generator used by the dispatcher,
/// which is seeded by the `SEED` environment variable.
///
/// # Panics
///
/// Panics if the dispatcher is not a test dispatcher.
#[cfg(any(test, feature = "test-support"))]
pub fn rng(&self) -> StdRng {
    let test = self.dispatcher.as_test().unwrap();
    test.rng()
}
|
|
|
|
|
2024-01-09 18:02:57 +00:00
|
|
|
/// How many CPUs are available to the dispatcher
|
2021-07-13 16:13:25 +00:00
|
|
|
pub fn num_cpus(&self) -> usize {
|
|
|
|
num_cpus::get()
|
2021-07-09 13:00:51 +00:00
|
|
|
}
|
|
|
|
|
2024-01-09 18:02:57 +00:00
|
|
|
/// Whether we're on the main thread.
|
2024-01-03 20:59:39 +00:00
|
|
|
pub fn is_main_thread(&self) -> bool {
|
|
|
|
self.dispatcher.is_main_thread()
|
2021-02-20 23:05:36 +00:00
|
|
|
}
|
2021-07-09 13:00:51 +00:00
|
|
|
|
2024-01-03 20:59:39 +00:00
|
|
|
/// In tests, controls how many ticks `block_with_timeout` will run before
/// timing out, expressed as an inclusive range.
///
/// # Panics
///
/// Panics if the dispatcher is not a test dispatcher.
#[cfg(any(test, feature = "test-support"))]
pub fn set_block_on_ticks(&self, range: std::ops::RangeInclusive<usize>) {
    let test = self.dispatcher.as_test().unwrap();
    test.set_block_on_ticks(range);
}
|
2024-01-03 20:59:39 +00:00
|
|
|
}
|
2022-02-04 14:47:01 +00:00
|
|
|
|
2024-01-09 18:02:57 +00:00
|
|
|
/// ForegroundExecutor runs things on the main thread.
|
2024-01-03 20:59:39 +00:00
|
|
|
impl ForegroundExecutor {
|
2024-01-21 22:26:45 +00:00
|
|
|
/// Creates a new ForegroundExecutor from the given PlatformDispatcher.
|
2024-01-03 20:59:39 +00:00
|
|
|
pub fn new(dispatcher: Arc<dyn PlatformDispatcher>) -> Self {
|
|
|
|
Self {
|
|
|
|
dispatcher,
|
|
|
|
not_send: PhantomData,
|
2021-07-20 17:20:50 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2024-01-09 18:02:57 +00:00
|
|
|
/// Enqueues the given Task to run on the main thread at some point in the future.
|
2024-01-03 20:59:39 +00:00
|
|
|
pub fn spawn<R>(&self, future: impl Future<Output = R> + 'static) -> Task<R>
|
2021-07-09 13:00:51 +00:00
|
|
|
where
|
2024-01-03 20:59:39 +00:00
|
|
|
R: 'static,
|
2021-07-09 13:00:51 +00:00
|
|
|
{
|
2024-01-03 20:59:39 +00:00
|
|
|
let dispatcher = self.dispatcher.clone();
|
|
|
|
fn inner<R: 'static>(
|
|
|
|
dispatcher: Arc<dyn PlatformDispatcher>,
|
|
|
|
future: AnyLocalFuture<R>,
|
|
|
|
) -> Task<R> {
|
|
|
|
let (runnable, task) = async_task::spawn_local(future, move |runnable| {
|
|
|
|
dispatcher.dispatch_on_main_thread(runnable)
|
|
|
|
});
|
|
|
|
runnable.schedule();
|
|
|
|
Task::Spawned(task)
|
|
|
|
}
|
|
|
|
inner::<R>(dispatcher, Box::pin(future))
|
2022-08-10 21:39:24 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2024-01-09 19:48:48 +00:00
|
|
|
/// Scope manages a set of tasks that are enqueued and waited on together. See [`BackgroundExecutor::scoped`].
|
2021-07-09 13:00:51 +00:00
|
|
|
pub struct Scope<'a> {
|
2024-01-03 20:59:39 +00:00
|
|
|
executor: BackgroundExecutor,
|
2021-07-09 13:00:51 +00:00
|
|
|
futures: Vec<Pin<Box<dyn Future<Output = ()> + Send + 'static>>>,
|
2022-04-15 17:48:01 +00:00
|
|
|
tx: Option<mpsc::Sender<()>>,
|
|
|
|
rx: mpsc::Receiver<()>,
|
2024-01-03 20:59:39 +00:00
|
|
|
lifetime: PhantomData<&'a ()>,
|
2021-07-09 13:00:51 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
impl<'a> Scope<'a> {
|
2024-01-03 20:59:39 +00:00
|
|
|
fn new(executor: BackgroundExecutor) -> Self {
|
2022-04-15 20:25:21 +00:00
|
|
|
let (tx, rx) = mpsc::channel(1);
|
2022-04-15 17:48:01 +00:00
|
|
|
Self {
|
2022-04-15 20:25:21 +00:00
|
|
|
executor,
|
2022-04-15 17:48:01 +00:00
|
|
|
tx: Some(tx),
|
|
|
|
rx,
|
|
|
|
futures: Default::default(),
|
2024-01-03 20:59:39 +00:00
|
|
|
lifetime: PhantomData,
|
2022-04-15 17:48:01 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2024-01-21 22:26:45 +00:00
|
|
|
/// Spawn a future into this scope.
|
2021-07-09 13:00:51 +00:00
|
|
|
pub fn spawn<F>(&mut self, f: F)
|
|
|
|
where
|
|
|
|
F: Future<Output = ()> + Send + 'a,
|
|
|
|
{
|
2022-04-15 17:48:01 +00:00
|
|
|
let tx = self.tx.clone().unwrap();
|
|
|
|
|
2024-01-21 22:26:45 +00:00
|
|
|
// SAFETY: The 'a lifetime is guaranteed to outlive any of these futures because
|
2022-04-15 17:48:01 +00:00
|
|
|
// dropping this `Scope` blocks until all of the futures have resolved.
|
2021-07-09 13:00:51 +00:00
|
|
|
let f = unsafe {
|
|
|
|
mem::transmute::<
|
|
|
|
Pin<Box<dyn Future<Output = ()> + Send + 'a>>,
|
|
|
|
Pin<Box<dyn Future<Output = ()> + Send + 'static>>,
|
2022-04-15 17:48:01 +00:00
|
|
|
>(Box::pin(async move {
|
|
|
|
f.await;
|
|
|
|
drop(tx);
|
|
|
|
}))
|
2021-07-09 13:00:51 +00:00
|
|
|
};
|
|
|
|
self.futures.push(f);
|
|
|
|
}
|
2021-02-20 23:05:36 +00:00
|
|
|
}
|
2021-07-08 19:03:00 +00:00
|
|
|
|
2022-04-15 17:48:01 +00:00
|
|
|
impl<'a> Drop for Scope<'a> {
|
|
|
|
fn drop(&mut self) {
|
|
|
|
self.tx.take().unwrap();
|
|
|
|
|
|
|
|
// Wait until the channel is closed, which means that all of the spawned
|
|
|
|
// futures have resolved.
|
2022-04-15 20:25:21 +00:00
|
|
|
self.executor.block(self.rx.next());
|
2022-04-15 17:48:01 +00:00
|
|
|
}
|
|
|
|
}
|