diff --git a/crates/client2/src/client2.rs b/crates/client2/src/client2.rs index e39058fe0d..474f6986dd 100644 --- a/crates/client2/src/client2.rs +++ b/crates/client2/src/client2.rs @@ -11,12 +11,11 @@ use async_tungstenite::tungstenite::{ http::{Request, StatusCode}, }; use futures::{ - future::LocalBoxFuture, AsyncReadExt, FutureExt, SinkExt, StreamExt, TryFutureExt as _, - TryStreamExt, + future::BoxFuture, AsyncReadExt, FutureExt, SinkExt, StreamExt, TryFutureExt as _, TryStreamExt, }; use gpui2::{ - serde_json, AnyHandle, AnyWeakHandle, AnyWindowHandle, AppContext, AsyncAppContext, - AsyncWindowContext, Handle, SemanticVersion, Task, ViewContext, WeakHandle, WindowId, + serde_json, AnyHandle, AnyWeakHandle, AnyWindowHandle, AppContext, AsyncAppContext, Handle, + SemanticVersion, Task, ViewContext, }; use lazy_static::lazy_static; use parking_lot::RwLock; @@ -240,7 +239,7 @@ struct ClientState { Box, &Arc, AsyncAppContext, - ) -> LocalBoxFuture<'static, Result<()>>, + ) -> BoxFuture<'static, Result<()>>, >, >, } @@ -248,7 +247,7 @@ struct ClientState { enum WeakSubscriber { Entity { handle: AnyWeakHandle, - window_id: Option, + window_handle: Option, }, Pending(Vec>), } @@ -336,7 +335,7 @@ where id, WeakSubscriber::Entity { handle: model.downgrade().into(), - window_id: None, + window_handle: None, }, ); drop(state); @@ -511,7 +510,7 @@ impl Client { }, &cx, ); - cx.background().timer(delay).await; + cx.executor().timer(delay).await; delay = delay .mul_f32(rng.gen_range(1.0..=2.0)) .min(reconnect_interval); @@ -522,7 +521,7 @@ impl Client { })); } Status::SignedOut | Status::UpgradeRequired => { - cx.read(|cx| self.telemetry.set_authenticated_user_info(None, false, cx)); + cx.update(|cx| self.telemetry.set_authenticated_user_info(None, false, cx)); state._reconnect_task.take(); } _ => {} @@ -533,13 +532,16 @@ impl Client { self: &Arc, remote_id: u64, cx: &mut ViewContext, - ) -> Subscription { + ) -> Subscription + where + T: 'static + Send + Sync, + { let id = (TypeId::of::(), remote_id); self.state.write().entities_by_type_and_remote_id.insert( id, WeakSubscriber::Entity { - handle: cx.handle().into_any(), - window_id: Some(cx.window_id()), + handle: cx.handle().into(), + window_handle: Some(cx.window_handle()), }, ); Subscription::Entity { @@ -573,33 +575,28 @@ impl Client { #[track_caller] pub fn add_message_handler( self: &Arc, - model: Handle, + entity: Handle, handler: H, ) -> Subscription where M: EnvelopedMessage, E: 'static + Send + Sync, H: 'static + Send + Sync + Fn(Handle, TypedEnvelope, Arc, AsyncAppContext) -> F, - F: 'static + Future>, + F: 'static + Future> + Send, { let message_type_id = TypeId::of::(); let mut state = self.state.write(); state .models_by_message_type - .insert(message_type_id, model.downgrade().into_any()); + .insert(message_type_id, entity.downgrade().into()); let prev_handler = state.message_handlers.insert( message_type_id, - Arc::new(move |handle, envelope, client, cx| { - let handle = if let Subscriber::Model(handle) = handle { - handle - } else { - unreachable!(); - }; - let model = handle.downcast::().unwrap(); + Arc::new(move |subscriber, envelope, client, cx| { + let subscriber = subscriber.handle.downcast::().unwrap(); let envelope = envelope.into_any().downcast::>().unwrap(); - handler(model, *envelope, client.clone(), cx).boxed_local() + handler(subscriber, *envelope, client.clone(), cx).boxed() }), ); if prev_handler.is_some() { @@ -627,7 +624,7 @@ impl Client { M: RequestMessage, E: 'static + Send + Sync, H: 'static + Send + Sync + Fn(Handle, TypedEnvelope, Arc, AsyncAppContext) -> F, - F: 'static + Future>, + F: 'static + Future> + Send, { self.add_message_handler(model, move |handle, envelope, this, cx| { Self::respond_to_request( @@ -638,37 +635,12 @@ impl Client { }) } - pub fn add_view_message_handler(self: &Arc, handler: H) - where - M: EntityMessage, - H: 'static - + Send - + Sync - + Fn(WeakHandle, TypedEnvelope, Arc, AsyncWindowContext) -> F, - F: 'static + Future>, - { - self.add_entity_message_handler::(move |subscriber, message, client, cx| { - if let Some(window_handle) = subscriber.window_handle { - cx.update_window(subscriber, |cx| { - handler( - subscriber.handle.downcast::().unwrap(), - message, - client, - cx, - ) - }) - } else { - panic!() - } - }) - } - pub fn add_model_message_handler(self: &Arc, handler: H) where M: EntityMessage, E: 'static + Send + Sync, H: 'static + Send + Sync + Fn(Handle, TypedEnvelope, Arc, AsyncAppContext) -> F, - F: 'static + Future>, + F: 'static + Future> + Send, { self.add_entity_message_handler::(move |subscriber, message, client, cx| { handler( @@ -687,7 +659,7 @@ impl Client { + Send + Sync + Fn(Subscriber, TypedEnvelope, Arc, AsyncAppContext) -> F, - F: 'static + Future>, + F: 'static + Future> + Send, { let model_type_id = TypeId::of::(); let message_type_id = TypeId::of::(); @@ -713,7 +685,7 @@ impl Client { message_type_id, Arc::new(move |handle, envelope, client, cx| { let envelope = envelope.into_any().downcast::>().unwrap(); - handler(handle, *envelope, client.clone(), cx).boxed_local() + handler(handle, *envelope, client.clone(), cx).boxed() }), ); if prev_handler.is_some() { @@ -726,7 +698,7 @@ impl Client { M: EntityMessage + RequestMessage, E: 'static + Send + Sync, H: 'static + Send + Sync + Fn(Handle, TypedEnvelope, Arc, AsyncAppContext) -> F, - F: 'static + Future>, + F: 'static + Future> + Send, { self.add_model_message_handler(move |entity, envelope, client, cx| { Self::respond_to_request::( @@ -737,25 +709,6 @@ impl Client { }) } - pub fn add_view_request_handler(self: &Arc, handler: H) - where - M: EntityMessage + RequestMessage, - E: 'static + Send + Sync, - H: 'static - + Send - + Sync - + Fn(Handle, TypedEnvelope, Arc, AsyncWindowContext) -> F, - F: 'static + Future>, - { - self.add_view_message_handler(move |entity, envelope, client, cx| { - Self::respond_to_request::( - envelope.receipt(), - handler(entity, envelope, client.clone(), cx), - client, - ) - }) - } - async fn respond_to_request>>( receipt: Receipt, response: F, @@ -778,11 +731,11 @@ impl Client { } } - pub fn has_keychain_credentials(&self, cx: &AsyncAppContext) -> bool { - read_credentials_from_keychain(cx).is_some() + pub async fn has_keychain_credentials(&self, cx: &AsyncAppContext) -> bool { + read_credentials_from_keychain(cx).await.is_some() } - #[async_recursion(?Send)] + #[async_recursion] pub async fn authenticate_and_connect( self: &Arc, try_keychain: bool, @@ -840,7 +793,7 @@ impl Client { self.set_status(Status::Reconnecting, cx); } - let mut timeout = futures::FutureExt::fuse(cx.executor()?.timer(CONNECTION_TIMEOUT)); + let mut timeout = futures::FutureExt::fuse(cx.executor().timer(CONNECTION_TIMEOUT)); futures::select_biased! { connection = self.establish_connection(&credentials, cx).fuse() => { match connection { @@ -891,7 +844,7 @@ impl Client { conn: Connection, cx: &AsyncAppContext, ) -> Result<()> { - let executor = cx.executor()?; + let executor = cx.executor(); log::info!("add connection to peer"); let (connection_id, handle_io, mut incoming) = self.peer.add_connection(conn, { let executor = executor.clone(); @@ -955,7 +908,7 @@ impl Client { } } } - })? + }) .detach(); cx.spawn({ @@ -978,7 +931,8 @@ impl Client { } } } - })?; + }) + .detach(); Ok(()) } @@ -1042,13 +996,8 @@ impl Client { credentials: &Credentials, cx: &AsyncAppContext, ) -> Task> { - let executor = match cx.executor() { - Ok(executor) => executor, - Err(error) => return Task::ready(Err(error)), - }; - let use_preview_server = cx - .try_read_global(|channel: &ReleaseChannel, _| channel != ReleaseChannel::Stable) + .try_read_global(|channel: &ReleaseChannel, _| *channel != ReleaseChannel::Stable) .unwrap_or(false); let request = Request::builder() @@ -1059,7 +1008,7 @@ impl Client { .header("x-zed-protocol-version", rpc::PROTOCOL_VERSION); let http = self.http.clone(); - executor.spawn(async move { + cx.executor().spawn(async move { let mut rpc_url = Self::get_rpc_url(http, use_preview_server).await?; let rpc_host = rpc_url .host_str() @@ -1130,15 +1079,15 @@ impl Client { write!(&mut url, "&impersonate={}", impersonate_login).unwrap(); } - platform.open_url(&url); + cx.run_on_main(|cx| cx.open_url(&url))?.await; // Receive the HTTP request from the user's browser. Retrieve the user id and encrypted // access token from the query params. // // TODO - Avoid ever starting more than one HTTP server. Maybe switch to using a // custom URL scheme instead of this local HTTP server. - let (user_id, access_token) = executor - .spawn(async move { + let (user_id, access_token) = cx + .spawn(|_| async move { for _ in 0..100 { if let Some(req) = server.recv_timeout(Duration::from_secs(1))? { let path = req.url(); @@ -1181,14 +1130,13 @@ impl Client { let access_token = private_key .decrypt_string(&access_token) .context("failed to decrypt access token")?; - platform.activate(true); + cx.run_on_main(|cx| cx.activate(true))?.await; Ok(Credentials { user_id: user_id.parse()?, access_token, }) }) - .unwrap_or_else(|error| Task::ready(Err(error))) } async fn authenticate_as_admin( @@ -1344,12 +1292,16 @@ impl Client { return; } Some(weak_subscriber @ _) => match weak_subscriber { - WeakSubscriber::Model(handle) => { - subscriber = handle.upgrade(cx).map(Subscriber::Model); - } - WeakSubscriber::View(handle) => { - subscriber = Some(Subscriber::View(handle.clone())); + WeakSubscriber::Entity { + handle, + window_handle, + } => { + subscriber = handle.upgrade().map(|handle| Subscriber { + handle, + window_handle: window_handle.clone(), + }); } + WeakSubscriber::Pending(_) => {} }, _ => {} @@ -1379,8 +1331,7 @@ impl Client { sender_id, type_name ); - cx.foreground() - .spawn(async move { + cx.spawn_on_main(|_| async move { match future.await { Ok(()) => { log::debug!( diff --git a/crates/client2/src/telemetry.rs b/crates/client2/src/telemetry.rs index d1489fce8e..2c29e1231f 100644 --- a/crates/client2/src/telemetry.rs +++ b/crates/client2/src/telemetry.rs @@ -1,5 +1,5 @@ use crate::{TelemetrySettings, ZED_SECRET_CLIENT_TOKEN, ZED_SERVER_URL}; -use gpui2::{serde_json, AppContext, Executor, Task}; +use gpui2::{serde_json, AppContext, AppMetadata, Executor, Task}; use lazy_static::lazy_static; use parking_lot::Mutex; use serde::Serialize; @@ -15,15 +15,12 @@ pub struct Telemetry { state: Mutex, } -#[derive(Default)] struct TelemetryState { metrics_id: Option>, // Per logged-in user installation_id: Option>, // Per app installation (different for dev, preview, and stable) session_id: Option>, // Per app launch - app_version: Option>, release_channel: Option<&'static str>, - os_name: &'static str, - os_version: Option>, + app_metadata: AppMetadata, architecture: &'static str, clickhouse_events_queue: Vec, flush_clickhouse_events_task: Option>, @@ -115,7 +112,6 @@ const DEBOUNCE_INTERVAL: Duration = Duration::from_secs(30); impl Telemetry { pub fn new(client: Arc, cx: &AppContext) -> Arc { - let platform = cx.platform(); let release_channel = if cx.has_global::() { Some(cx.global::().display_name()) } else { @@ -124,12 +120,10 @@ impl Telemetry { // TODO: Replace all hardware stuff with nested SystemSpecs json let this = Arc::new(Self { http_client: client, - executor: cx.background().clone(), + executor: cx.executor().clone(), state: Mutex::new(TelemetryState { - os_name: platform.os_name().into(), - os_version: platform.os_version().ok().map(|v| v.to_string().into()), + app_metadata: cx.app_metadata(), architecture: env::consts::ARCH, - app_version: platform.app_version().ok().map(|v| v.to_string().into()), release_channel, installation_id: None, metrics_id: None, @@ -196,7 +190,7 @@ impl Telemetry { core_count: system.cpus().len() as u32, }; - let telemetry_settings = cx.update(|cx| *settings::get::(cx)); + let telemetry_settings = cx.update(|cx| *settings2::get::(cx)); this.report_clickhouse_event(memory_event, telemetry_settings); this.report_clickhouse_event(cpu_event, telemetry_settings); @@ -211,7 +205,7 @@ impl Telemetry { is_staff: bool, cx: &AppContext, ) { - if !settings::get::(cx).metrics { + if !settings2::get::(cx).metrics { return; } diff --git a/crates/gpui2/src/app.rs b/crates/gpui2/src/app.rs index 869e0f3109..12f9aa1913 100644 --- a/crates/gpui2/src/app.rs +++ b/crates/gpui2/src/app.rs @@ -9,9 +9,9 @@ use refineable::Refineable; use smallvec::SmallVec; use crate::{ - current_platform, image_cache::ImageCache, Action, AssetSource, Context, DispatchPhase, - DisplayId, Executor, FocusEvent, FocusHandle, FocusId, KeyBinding, Keymap, LayoutId, - MainThread, MainThreadOnly, Platform, SemanticVersion, SharedString, SubscriberSet, + current_platform, image_cache::ImageCache, Action, AppMetadata, AssetSource, Context, + DispatchPhase, DisplayId, Executor, FocusEvent, FocusHandle, FocusId, KeyBinding, Keymap, + LayoutId, MainThread, MainThreadOnly, Platform, SemanticVersion, SharedString, SubscriberSet, SvgRenderer, Task, TextStyle, TextStyleRefinement, TextSystem, View, Window, WindowContext, WindowHandle, WindowId, }; @@ -49,13 +49,25 @@ impl App { http_client: Arc, ) -> Self { let executor = platform.executor(); + assert!( + executor.is_main_thread(), + "must construct App on main thread" + ); + + let text_system = Arc::new(TextSystem::new(platform.text_system())); let entities = EntityMap::new(); let unit_entity = entities.insert(entities.reserve(), ()); + let app_metadata = AppMetadata { + os_name: platform.os_name(), + os_version: platform.os_version().unwrap_or_default(), + app_version: platform.app_version().unwrap_or_default(), + }; Self(Arc::new_cyclic(|this| { Mutex::new(AppContext { this: this.clone(), - text_system: Arc::new(TextSystem::new(platform.text_system())), + text_system, platform: MainThreadOnly::new(platform, executor.clone()), + app_metadata, flushing_effects: false, pending_updates: 0, next_frame_callbacks: Default::default(), @@ -128,16 +140,8 @@ impl App { self } - pub fn app_version(&self) -> Result { - self.0.lock().platform.borrow_on_main_thread().app_version() - } - - pub fn os_name(&self) -> &'static str { - self.0.lock().platform.borrow_on_main_thread().os_name() - } - - pub fn os_version(&self) -> Result { - self.0.lock().platform.borrow_on_main_thread().os_version() + pub fn metadata(&self) -> AppMetadata { + self.0.lock().app_metadata.clone() } pub fn executor(&self) -> Executor { @@ -158,6 +162,7 @@ type ActionBuilder = fn(json: Option) -> anyhow::Result>, pub(crate) platform: MainThreadOnly, + app_metadata: AppMetadata, text_system: Arc, flushing_effects: bool, pending_updates: usize, @@ -184,6 +189,10 @@ pub struct AppContext { } impl AppContext { + pub fn app_metadata(&self) -> AppMetadata { + self.app_metadata.clone() + } + pub fn refresh(&mut self) { self.push_effect(Effect::Refresh); } @@ -379,7 +388,10 @@ impl AppContext { } pub fn to_async(&self) -> AsyncAppContext { - AsyncAppContext(unsafe { mem::transmute(self.this.clone()) }) + AsyncAppContext { + app: unsafe { mem::transmute(self.this.clone()) }, + executor: self.executor.clone(), + } } pub fn executor(&self) -> &Executor { @@ -639,7 +651,7 @@ impl MainThread { self.platform.borrow_on_main_thread() } - pub fn activate(&mut self, ignoring_other_apps: bool) { + pub fn activate(&self, ignoring_other_apps: bool) { self.platform().activate(ignoring_other_apps); } @@ -655,6 +667,10 @@ impl MainThread { self.platform().delete_credentials(url) } + pub fn open_url(&self, url: &str) { + self.platform().open_url(url); + } + pub fn open_window( &mut self, options: crate::WindowOptions, diff --git a/crates/gpui2/src/app/async_context.rs b/crates/gpui2/src/app/async_context.rs index e0da6a63da..0fea00ed1f 100644 --- a/crates/gpui2/src/app/async_context.rs +++ b/crates/gpui2/src/app/async_context.rs @@ -8,7 +8,10 @@ use parking_lot::Mutex; use std::{future::Future, sync::Weak}; #[derive(Clone)] -pub struct AsyncAppContext(pub(crate) Weak>); +pub struct AsyncAppContext { + pub(crate) app: Weak>, + pub(crate) executor: Executor, +} impl Context for AsyncAppContext { type EntityContext<'a, 'w, T: 'static + Send + Sync> = ModelContext<'a, T>; @@ -19,7 +22,7 @@ impl Context for AsyncAppContext { build_entity: impl FnOnce(&mut Self::EntityContext<'_, '_, T>) -> T, ) -> Self::Result> { let app = self - .0 + .app .upgrade() .ok_or_else(|| anyhow!("app was released"))?; let mut lock = app.lock(); // Need this to compile @@ -32,7 +35,7 @@ impl Context for AsyncAppContext { update: impl FnOnce(&mut T, &mut Self::EntityContext<'_, '_, T>) -> R, ) -> Self::Result { let app = self - .0 + .app .upgrade() .ok_or_else(|| anyhow!("app was released"))?; let mut lock = app.lock(); // Need this to compile @@ -43,7 +46,7 @@ impl Context for AsyncAppContext { impl AsyncAppContext { pub fn refresh(&mut self) -> Result<()> { let app = self - .0 + .app .upgrade() .ok_or_else(|| anyhow!("app was released"))?; let mut lock = app.lock(); // Need this to compile @@ -51,13 +54,17 @@ impl AsyncAppContext { Ok(()) } - pub fn executor(&self) -> Result { + pub fn executor(&self) -> &Executor { + &self.executor + } + + pub fn update(&self, f: impl FnOnce(&mut AppContext) -> R) -> Result { let app = self - .0 + .app .upgrade() .ok_or_else(|| anyhow!("app was released"))?; - let lock = app.lock(); // Need this to compile - Ok(lock.executor().clone()) + let mut lock = app.lock(); + Ok(f(&mut *lock)) } pub fn read_window( @@ -66,7 +73,7 @@ impl AsyncAppContext { update: impl FnOnce(&WindowContext) -> R, ) -> Result { let app = self - .0 + .app .upgrade() .ok_or_else(|| anyhow!("app was released"))?; let mut app_context = app.lock(); @@ -79,27 +86,32 @@ impl AsyncAppContext { update: impl FnOnce(&mut WindowContext) -> R, ) -> Result { let app = self - .0 + .app .upgrade() .ok_or_else(|| anyhow!("app was released"))?; let mut app_context = app.lock(); app_context.update_window(handle.id, update) } - pub fn spawn( - &self, - f: impl FnOnce(AsyncAppContext) -> Fut + Send + 'static, - ) -> Result> + pub fn spawn(&self, f: impl FnOnce(AsyncAppContext) -> Fut + Send + 'static) -> Task where Fut: Future + Send + 'static, R: Send + 'static, { - let app = self - .0 - .upgrade() - .ok_or_else(|| anyhow!("app was released"))?; - let app_context = app.lock(); - Ok(app_context.spawn(f)) + let this = self.clone(); + self.executor.spawn(async move { f(this).await }) + } + + pub fn spawn_on_main( + &self, + f: impl FnOnce(AsyncAppContext) -> Fut + Send + 'static, + ) -> Task + where + Fut: Future + 'static, + R: Send + 'static, + { + let this = self.clone(); + self.executor.spawn_on_main(|| f(this)) } pub fn run_on_main( @@ -110,7 +122,7 @@ impl AsyncAppContext { R: Send + 'static, { let app = self - .0 + .app .upgrade() .ok_or_else(|| anyhow!("app was released"))?; let mut app_context = app.lock(); @@ -119,7 +131,7 @@ impl AsyncAppContext { pub fn has_global(&self) -> Result { let app = self - .0 + .app .upgrade() .ok_or_else(|| anyhow!("app was released"))?; let lock = app.lock(); // Need this to compile @@ -131,7 +143,7 @@ impl AsyncAppContext { read: impl FnOnce(&G, &AppContext) -> R, ) -> Result { let app = self - .0 + .app .upgrade() .ok_or_else(|| anyhow!("app was released"))?; let lock = app.lock(); // Need this to compile @@ -142,7 +154,7 @@ impl AsyncAppContext { &self, read: impl FnOnce(&G, &AppContext) -> R, ) -> Option { - let app = self.0.upgrade()?; + let app = self.app.upgrade()?; let lock = app.lock(); // Need this to compile Some(read(lock.try_global()?, &lock)) } @@ -152,7 +164,7 @@ impl AsyncAppContext { update: impl FnOnce(&mut G, &mut AppContext) -> R, ) -> Result { let app = self - .0 + .app .upgrade() .ok_or_else(|| anyhow!("app was released"))?; let mut lock = app.lock(); // Need this to compile diff --git a/crates/gpui2/src/platform.rs b/crates/gpui2/src/platform.rs index 5bb59acae0..6e2243ba40 100644 --- a/crates/gpui2/src/platform.rs +++ b/crates/gpui2/src/platform.rs @@ -181,6 +181,13 @@ pub trait PlatformTextSystem: Send + Sync { ) -> Vec; } +#[derive(Clone, Debug)] +pub struct AppMetadata { + pub os_name: &'static str, + pub os_version: SemanticVersion, + pub app_version: SemanticVersion, +} + #[derive(PartialEq, Eq, Hash, Clone)] pub enum AtlasKey { Glyph(RenderGlyphParams), @@ -404,7 +411,7 @@ impl Default for CursorStyle { } } -#[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord)] +#[derive(Copy, Clone, Debug, Default, PartialEq, Eq, PartialOrd, Ord)] pub struct SemanticVersion { major: usize, minor: usize,