diff --git a/gpui/src/executor.rs b/gpui/src/executor.rs index 7a223a96d2..5849a86902 100644 --- a/gpui/src/executor.rs +++ b/gpui/src/executor.rs @@ -124,17 +124,39 @@ impl Deterministic { T: 'static, F: Future + 'static, { - smol::pin!(future); - - let unparker = self.parker.lock().unparker(); let woken = Arc::new(AtomicBool::new(false)); - let waker = { - let woken = woken.clone(); - waker_fn(move || { - woken.store(true, SeqCst); - unparker.unpark(); - }) - }; + let mut future = Box::pin(future); + loop { + if let Some(result) = self.run_internal(woken.clone(), &mut future) { + return result; + } + + if !woken.load(SeqCst) && self.state.lock().forbid_parking { + panic!("deterministic executor parked after a call to forbid_parking"); + } + + woken.store(false, SeqCst); + self.parker.lock().park(); + } + } + + fn run_until_parked(&self) { + let woken = Arc::new(AtomicBool::new(false)); + let future = std::future::pending::<()>(); + smol::pin!(future); + self.run_internal(woken, future); + } + + pub fn run_internal(&self, woken: Arc, mut future: F) -> Option + where + T: 'static, + F: Future + Unpin, + { + let unparker = self.parker.lock().unparker(); + let waker = waker_fn(move || { + woken.store(true, SeqCst); + unparker.unpark(); + }); let mut cx = Context::from_waker(&waker); let mut trace = Trace::default(); @@ -168,23 +190,17 @@ impl Deterministic { runnable.run(); } else { drop(state); - if let Poll::Ready(result) = future.as_mut().poll(&mut cx) { - return result; + if let Poll::Ready(result) = future.poll(&mut cx) { + return Some(result); } + let state = self.state.lock(); if state.scheduled_from_foreground.is_empty() && state.scheduled_from_background.is_empty() && state.spawned_from_foreground.is_empty() { - if state.forbid_parking && !woken.load(SeqCst) { - panic!("deterministic executor parked after a call to forbid_parking"); - } - drop(state); - woken.store(false, SeqCst); - self.parker.lock().park(); + return None; } - - continue; } } } @@ -432,10 +448,16 @@ impl Foreground { pub fn advance_clock(&self, duration: Duration) { match self { Self::Deterministic(executor) => { + executor.run_until_parked(); + let mut state = executor.state.lock(); state.now += duration; let now = state.now; - state.pending_sleeps.retain(|(wakeup, _)| *wakeup > now); + let mut pending_sleeps = mem::take(&mut state.pending_sleeps); + drop(state); + + pending_sleeps.retain(|(wakeup, _)| *wakeup > now); + executor.state.lock().pending_sleeps.extend(pending_sleeps); } _ => panic!("this method can only be called on a deterministic executor"), } diff --git a/zed/src/rpc.rs b/zed/src/rpc.rs index b36ec4d376..d0c04f5872 100644 --- a/zed/src/rpc.rs +++ b/zed/src/rpc.rs @@ -3,12 +3,10 @@ use anyhow::{anyhow, Context, Result}; use async_tungstenite::tungstenite::{ http::Request, Error as WebSocketError, Message as WebSocketMessage, }; -use futures::StreamExt as _; use gpui::{AsyncAppContext, Entity, ModelContext, Task}; use lazy_static::lazy_static; use parking_lot::RwLock; use postage::{prelude::Stream, watch}; -use smol::Timer; use std::{ any::TypeId, collections::HashMap, @@ -490,18 +488,18 @@ mod tests { use crate::test::FakeServer; use gpui::TestAppContext; - #[gpui::test(iterations = 1000)] + #[gpui::test(iterations = 10)] async fn test_heartbeat(cx: TestAppContext) { let user_id = 5; let client = Client::new(); - - client.state.write().heartbeat_interval = Duration::from_millis(1); let mut server = FakeServer::for_client(user_id, &client, &cx).await; + cx.foreground().advance_clock(Duration::from_secs(10)); let ping = server.receive::().await.unwrap(); assert_eq!(ping.payload.id, 0); server.respond(ping.receipt(), proto::Pong { id: 0 }).await; + cx.foreground().advance_clock(Duration::from_secs(10)); let ping = server.receive::().await.unwrap(); assert_eq!(ping.payload.id, 1); server.respond(ping.receipt(), proto::Pong { id: 1 }).await;