diff --git a/Cargo.lock b/Cargo.lock index b5f6452481..1391ead446 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1460,7 +1460,6 @@ dependencies = [ "db", "feature_flags", "futures 0.3.28", - "gpui", "gpui2", "image", "lazy_static", @@ -1473,6 +1472,7 @@ dependencies = [ "serde", "serde_derive", "settings", + "settings2", "smol", "sum_tree", "sysinfo", diff --git a/crates/client2/Cargo.toml b/crates/client2/Cargo.toml index 6d2d61ec13..79ab72c41c 100644 --- a/crates/client2/Cargo.toml +++ b/crates/client2/Cargo.toml @@ -9,7 +9,7 @@ path = "src/client2.rs" doctest = false [features] -test-support = ["collections/test-support", "gpui/test-support", "rpc/test-support"] +test-support = ["collections/test-support", "gpui2/test-support", "rpc/test-support"] [dependencies] collections = { path = "../collections" } @@ -18,7 +18,7 @@ gpui2 = { path = "../gpui2" } util = { path = "../util" } rpc = { path = "../rpc" } text = { path = "../text" } -settings = { path = "../settings" } +settings2 = { path = "../settings2" } feature_flags = { path = "../feature_flags" } sum_tree = { path = "../sum_tree" } diff --git a/crates/client2/src/client2.rs b/crates/client2/src/client2.rs index 9f63d0e2be..2fcd3d40a0 100644 --- a/crates/client2/src/client2.rs +++ b/crates/client2/src/client2.rs @@ -4,7 +4,7 @@ pub mod test; pub mod telemetry; pub mod user; -use anyhow::{anyhow, Context, Result}; +use anyhow::{anyhow, Context as _, Result}; use async_recursion::async_recursion; use async_tungstenite::tungstenite::{ error::Error as WebsocketError, @@ -14,10 +14,9 @@ use futures::{ future::LocalBoxFuture, AsyncReadExt, FutureExt, SinkExt, StreamExt, TryFutureExt as _, TryStreamExt, }; -use gpui::{ - actions, platform::AppVersion, serde_json, AnyModelHandle, AnyWeakModelHandle, - AnyWeakViewHandle, AppContext, AsyncAppContext, Entity, ModelHandle, Task, View, ViewContext, - WeakViewHandle, +use gpui2::{ + serde_json, AnyHandle, AnyWeakHandle, AnyWindowHandle, AppContext, AsyncAppContext, + AsyncWindowContext, Handle, SemanticVersion, Task, ViewContext, WeakHandle, WindowId, }; use lazy_static::lazy_static; use parking_lot::RwLock; @@ -57,7 +56,7 @@ lazy_static! { pub static ref ADMIN_API_TOKEN: Option = std::env::var("ZED_ADMIN_API_TOKEN") .ok() .and_then(|s| if s.is_empty() { None } else { Some(s) }); - pub static ref ZED_APP_VERSION: Option = std::env::var("ZED_APP_VERSION") + pub static ref ZED_APP_VERSION: Option = std::env::var("ZED_APP_VERSION") .ok() .and_then(|v| v.parse().ok()); pub static ref ZED_APP_PATH: Option = @@ -70,17 +69,25 @@ pub const ZED_SECRET_CLIENT_TOKEN: &str = "618033988749894"; pub const INITIAL_RECONNECTION_DELAY: Duration = Duration::from_millis(100); pub const CONNECTION_TIMEOUT: Duration = Duration::from_secs(5); -actions!(client, [SignIn, SignOut, Reconnect]); +#[derive(Clone, Default, PartialEq, Deserialize)] +pub struct SignIn; + +#[derive(Clone, Default, PartialEq, Deserialize)] +pub struct SignOut; + +#[derive(Clone, Default, PartialEq, Deserialize)] +pub struct Reconnect; pub fn init_settings(cx: &mut AppContext) { - settings::register::(cx); + settings2::register::(cx); } pub fn init(client: &Arc, cx: &mut AppContext) { init_settings(cx); let client = Arc::downgrade(client); - cx.add_global_action({ + cx.register_action_type::(); + cx.on_action({ let client = client.clone(); move |_: &SignIn, cx| { if let Some(client) = client.upgrade() { @@ -91,7 +98,9 @@ pub fn init(client: &Arc, cx: &mut AppContext) { } } }); - cx.add_global_action({ + + cx.register_action_type::(); + cx.on_action({ let client = client.clone(); move |_: &SignOut, cx| { if let Some(client) = client.upgrade() { @@ -102,7 +111,9 @@ pub fn init(client: &Arc, cx: &mut AppContext) { } } }); - cx.add_global_action({ + + cx.register_action_type::(); + cx.on_action({ let client = client.clone(); move |_: &Reconnect, cx| { if let Some(client) = client.upgrade() { @@ -216,7 +227,7 @@ struct ClientState { _reconnect_task: Option>, reconnect_interval: Duration, entities_by_type_and_remote_id: HashMap<(TypeId, u64), WeakSubscriber>, - models_by_message_type: HashMap, + models_by_message_type: HashMap, entity_types_by_message_type: HashMap, #[allow(clippy::type_complexity)] message_handlers: HashMap< @@ -235,14 +246,16 @@ struct ClientState { } enum WeakSubscriber { - Model(AnyWeakModelHandle), - View(AnyWeakViewHandle), + Entity { + handle: AnyWeakHandle, + window_id: Option, + }, Pending(Vec>), } -enum Subscriber { - Model(AnyModelHandle), - View(AnyWeakViewHandle), +struct Subscriber { + handle: AnyHandle, + window_handle: Option, } #[derive(Clone, Debug)] @@ -298,15 +311,18 @@ impl Drop for Subscription { } } -pub struct PendingEntitySubscription { +pub struct PendingEntitySubscription { client: Arc, remote_id: u64, _entity_type: PhantomData, consumed: bool, } -impl PendingEntitySubscription { - pub fn set_model(mut self, model: &ModelHandle, cx: &mut AsyncAppContext) -> Subscription { +impl PendingEntitySubscription +where + T: 'static + Send + Sync, +{ + pub fn set_model(mut self, model: &Handle, cx: &mut AsyncAppContext) -> Subscription { self.consumed = true; let mut state = self.client.state.write(); let id = (TypeId::of::(), self.remote_id); @@ -316,9 +332,13 @@ impl PendingEntitySubscription { unreachable!() }; - state - .entities_by_type_and_remote_id - .insert(id, WeakSubscriber::Model(model.downgrade().into_any())); + state.entities_by_type_and_remote_id.insert( + id, + WeakSubscriber::Entity { + handle: model.downgrade().into(), + window_id: None, + }, + ); drop(state); for message in messages { self.client.handle_message(message, cx); @@ -330,7 +350,7 @@ impl PendingEntitySubscription { } } -impl Drop for PendingEntitySubscription { +impl Drop for PendingEntitySubscription { fn drop(&mut self) { if !self.consumed { let mut state = self.client.state.write(); @@ -358,7 +378,7 @@ pub struct TelemetrySettingsContent { pub metrics: Option, } -impl settings::Setting for TelemetrySettings { +impl settings2::Setting for TelemetrySettings { const KEY: Option<&'static str> = Some("telemetry"); type FileContent = TelemetrySettingsContent; @@ -509,23 +529,26 @@ impl Client { } } - pub fn add_view_for_remote_entity( + pub fn add_view_for_remote_entity( self: &Arc, remote_id: u64, cx: &mut ViewContext, ) -> Subscription { let id = (TypeId::of::(), remote_id); - self.state - .write() - .entities_by_type_and_remote_id - .insert(id, WeakSubscriber::View(cx.weak_handle().into_any())); + self.state.write().entities_by_type_and_remote_id.insert( + id, + WeakSubscriber::Entity { + handle: cx.handle().into_any(), + window_id: Some(cx.window_id()), + }, + ); Subscription::Entity { client: Arc::downgrade(self), id, } } - pub fn subscribe_to_entity( + pub fn subscribe_to_entity( self: &Arc, remote_id: u64, ) -> Result> { @@ -550,16 +573,13 @@ impl Client { #[track_caller] pub fn add_message_handler( self: &Arc, - model: ModelHandle, + model: Handle, handler: H, ) -> Subscription where M: EnvelopedMessage, - E: Entity, - H: 'static - + Send - + Sync - + Fn(ModelHandle, TypedEnvelope, Arc, AsyncAppContext) -> F, + E: 'static + Send + Sync, + H: 'static + Send + Sync + Fn(Handle, TypedEnvelope, Arc, AsyncAppContext) -> F, F: 'static + Future>, { let message_type_id = TypeId::of::(); @@ -600,16 +620,13 @@ impl Client { pub fn add_request_handler( self: &Arc, - model: ModelHandle, + model: Handle, handler: H, ) -> Subscription where M: RequestMessage, - E: Entity, - H: 'static - + Send - + Sync - + Fn(ModelHandle, TypedEnvelope, Arc, AsyncAppContext) -> F, + E: 'static + Send + Sync, + H: 'static + Send + Sync + Fn(Handle, TypedEnvelope, Arc, AsyncAppContext) -> F, F: 'static + Future>, { self.add_message_handler(model, move |handle, envelope, this, cx| { @@ -624,18 +641,24 @@ impl Client { pub fn add_view_message_handler(self: &Arc, handler: H) where M: EntityMessage, - E: View, H: 'static + Send + Sync - + Fn(WeakViewHandle, TypedEnvelope, Arc, AsyncAppContext) -> F, + + Fn(WeakHandle, TypedEnvelope, Arc, AsyncWindowContext) -> F, F: 'static + Future>, { - self.add_entity_message_handler::(move |handle, message, client, cx| { - if let Subscriber::View(handle) = handle { - handler(handle.downcast::().unwrap(), message, client, cx) + self.add_entity_message_handler::(move |subscriber, message, client, cx| { + if let Some(window_handle) = subscriber.window_handle { + cx.update_window(subscriber, |cx| { + handler( + subscriber.handle.downcast::().unwrap(), + message, + client, + cx, + ) + }) } else { - unreachable!(); + panic!() } }) } @@ -643,26 +666,23 @@ impl Client { pub fn add_model_message_handler(self: &Arc, handler: H) where M: EntityMessage, - E: Entity, - H: 'static - + Send - + Sync - + Fn(ModelHandle, TypedEnvelope, Arc, AsyncAppContext) -> F, + E: 'static + Send + Sync, + H: 'static + Send + Sync + Fn(Handle, TypedEnvelope, Arc, AsyncAppContext) -> F, F: 'static + Future>, { - self.add_entity_message_handler::(move |handle, message, client, cx| { - if let Subscriber::Model(handle) = handle { - handler(handle.downcast::().unwrap(), message, client, cx) - } else { - unreachable!(); - } + self.add_entity_message_handler::(move |subscriber, message, client, cx| { + handler( + subscriber.handle.downcast::().unwrap(), + message, + client, + cx, + ) }) } fn add_entity_message_handler(self: &Arc, handler: H) where M: EntityMessage, - E: Entity, H: 'static + Send + Sync @@ -704,11 +724,8 @@ impl Client { pub fn add_model_request_handler(self: &Arc, handler: H) where M: EntityMessage + RequestMessage, - E: Entity, - H: 'static - + Send - + Sync - + Fn(ModelHandle, TypedEnvelope, Arc, AsyncAppContext) -> F, + E: 'static + Send + Sync, + H: 'static + Send + Sync + Fn(Handle, TypedEnvelope, Arc, AsyncAppContext) -> F, F: 'static + Future>, { self.add_model_message_handler(move |entity, envelope, client, cx| { @@ -723,11 +740,11 @@ impl Client { pub fn add_view_request_handler(self: &Arc, handler: H) where M: EntityMessage + RequestMessage, - E: View, + E: 'static + Send + Sync, H: 'static + Send + Sync - + Fn(WeakViewHandle, TypedEnvelope, Arc, AsyncAppContext) -> F, + + Fn(Handle, TypedEnvelope, Arc, AsyncWindowContext) -> F, F: 'static + Future>, { self.add_view_message_handler(move |entity, envelope, client, cx| { @@ -823,14 +840,14 @@ impl Client { self.set_status(Status::Reconnecting, cx); } - let mut timeout = cx.background().timer(CONNECTION_TIMEOUT).fuse(); + let mut timeout = futures::FutureExt::fuse(cx.executor()?.timer(CONNECTION_TIMEOUT)); futures::select_biased! { connection = self.establish_connection(&credentials, cx).fuse() => { match connection { Ok(conn) => { self.state.write().credentials = Some(credentials.clone()); if !read_from_keychain && IMPERSONATE_LOGIN.is_none() { - write_credentials_to_keychain(&credentials, cx).log_err(); + write_credentials_to_keychain(credentials, cx).log_err(); } futures::select_biased! { @@ -844,7 +861,7 @@ impl Client { Err(EstablishConnectionError::Unauthorized) => { self.state.write().credentials.take(); if read_from_keychain { - cx.platform().delete_credentials(&ZED_SERVER_URL).log_err(); + delete_credentials_from_keychain(cx).log_err(); self.set_status(Status::SignedOut, cx); self.authenticate_and_connect(false, cx).await } else { @@ -874,12 +891,12 @@ impl Client { conn: Connection, cx: &AsyncAppContext, ) -> Result<()> { - let executor = cx.background(); + let executor = cx.executor()?; log::info!("add connection to peer"); let (connection_id, handle_io, mut incoming) = self .peer .add_connection(conn, move |duration| executor.timer(duration)); - let handle_io = cx.background().spawn(handle_io); + let handle_io = executor.spawn(handle_io); let peer_id = async { log::info!("waiting for server hello"); @@ -925,10 +942,10 @@ impl Client { }, cx, ); - cx.foreground() - .spawn({ - let cx = cx.clone(); - let this = self.clone(); + + cx.spawn({ + let this = self.clone(); + |cx| { async move { while let Some(message) = incoming.next().await { this.handle_message(message, &cx); @@ -936,13 +953,13 @@ impl Client { smol::future::yield_now().await; } } - }) - .detach(); + } + })? + .detach(); - let this = self.clone(); - let cx = cx.clone(); - cx.foreground() - .spawn(async move { + cx.spawn({ + let this = self.clone(); + move |cx| async move { match handle_io.await { Ok(()) => { if this.status().borrow().clone() @@ -959,8 +976,8 @@ impl Client { this.set_status(Status::ConnectionLost, &cx); } } - }) - .detach(); + } + })?; Ok(()) } @@ -1299,12 +1316,15 @@ impl Client { let mut subscriber = None; - if let Some(message_model) = state + if let Some(handle) = state .models_by_message_type .get(&payload_type_id) - .and_then(|model| model.upgrade(cx)) + .and_then(|handle| handle.upgrade()) { - subscriber = Some(Subscriber::Model(message_model)); + subscriber = Some(Subscriber { + handle, + window_handle: None, + }); } else if let Some((extract_entity_id, entity_type_id)) = state.entity_id_extractors.get(&payload_type_id).zip( state @@ -1393,28 +1413,39 @@ impl Client { } } -fn read_credentials_from_keychain(cx: &AsyncAppContext) -> Option { +async fn read_credentials_from_keychain(cx: &AsyncAppContext) -> Option { if IMPERSONATE_LOGIN.is_some() { return None; } let (user_id, access_token) = cx - .platform() - .read_credentials(&ZED_SERVER_URL) - .log_err() - .flatten()?; + .run_on_main(|cx| cx.read_credentials(&ZED_SERVER_URL).log_err().flatten()) + .ok()? + .await?; + Some(Credentials { user_id: user_id.parse().ok()?, access_token: String::from_utf8(access_token).ok()?, }) } -fn write_credentials_to_keychain(credentials: &Credentials, cx: &AsyncAppContext) -> Result<()> { - cx.platform().write_credentials( - &ZED_SERVER_URL, - &credentials.user_id.to_string(), - credentials.access_token.as_bytes(), - ) +async fn write_credentials_to_keychain( + credentials: Credentials, + cx: &AsyncAppContext, +) -> Result<()> { + cx.run_on_main(move |cx| { + cx.write_credentials( + &ZED_SERVER_URL, + &credentials.user_id.to_string(), + credentials.access_token.as_bytes(), + ) + })? + .await +} + +async fn delete_credentials_from_keychain(cx: &AsyncAppContext) -> Result<()> { + cx.run_on_main(move |cx| cx.delete_credentials(&ZED_SERVER_URL))? + .await } const WORKTREE_URL_PREFIX: &str = "zed://worktrees/"; @@ -1434,290 +1465,290 @@ pub fn decode_worktree_url(url: &str) -> Option<(u64, String)> { Some((id, access_token.to_string())) } -#[cfg(test)] -mod tests { - use super::*; - use crate::test::FakeServer; - use gpui::{executor::Deterministic, TestAppContext}; - use parking_lot::Mutex; - use std::future; - use util::http::FakeHttpClient; +// #[cfg(test)] +// mod tests { +// use super::*; +// use crate::test::FakeServer; +// use gpui::{executor::Deterministic, TestAppContext}; +// use parking_lot::Mutex; +// use std::future; +// use util::http::FakeHttpClient; - #[gpui::test(iterations = 10)] - async fn test_reconnection(cx: &mut TestAppContext) { - cx.foreground().forbid_parking(); +// #[gpui::test(iterations = 10)] +// async fn test_reconnection(cx: &mut TestAppContext) { +// cx.foreground().forbid_parking(); - let user_id = 5; - let client = cx.update(|cx| Client::new(FakeHttpClient::with_404_response(), cx)); - let server = FakeServer::for_client(user_id, &client, cx).await; - let mut status = client.status(); - assert!(matches!( - status.next().await, - Some(Status::Connected { .. }) - )); - assert_eq!(server.auth_count(), 1); +// let user_id = 5; +// let client = cx.update(|cx| Client::new(FakeHttpClient::with_404_response(), cx)); +// let server = FakeServer::for_client(user_id, &client, cx).await; +// let mut status = client.status(); +// assert!(matches!( +// status.next().await, +// Some(Status::Connected { .. }) +// )); +// assert_eq!(server.auth_count(), 1); - server.forbid_connections(); - server.disconnect(); - while !matches!(status.next().await, Some(Status::ReconnectionError { .. })) {} +// server.forbid_connections(); +// server.disconnect(); +// while !matches!(status.next().await, Some(Status::ReconnectionError { .. })) {} - server.allow_connections(); - cx.foreground().advance_clock(Duration::from_secs(10)); - while !matches!(status.next().await, Some(Status::Connected { .. })) {} - assert_eq!(server.auth_count(), 1); // Client reused the cached credentials when reconnecting +// server.allow_connections(); +// cx.foreground().advance_clock(Duration::from_secs(10)); +// while !matches!(status.next().await, Some(Status::Connected { .. })) {} +// assert_eq!(server.auth_count(), 1); // Client reused the cached credentials when reconnecting - server.forbid_connections(); - server.disconnect(); - while !matches!(status.next().await, Some(Status::ReconnectionError { .. })) {} +// server.forbid_connections(); +// server.disconnect(); +// while !matches!(status.next().await, Some(Status::ReconnectionError { .. })) {} - // Clear cached credentials after authentication fails - server.roll_access_token(); - server.allow_connections(); - cx.foreground().advance_clock(Duration::from_secs(10)); - while !matches!(status.next().await, Some(Status::Connected { .. })) {} - assert_eq!(server.auth_count(), 2); // Client re-authenticated due to an invalid token - } +// // Clear cached credentials after authentication fails +// server.roll_access_token(); +// server.allow_connections(); +// cx.foreground().advance_clock(Duration::from_secs(10)); +// while !matches!(status.next().await, Some(Status::Connected { .. })) {} +// assert_eq!(server.auth_count(), 2); // Client re-authenticated due to an invalid token +// } - #[gpui::test(iterations = 10)] - async fn test_connection_timeout(deterministic: Arc, cx: &mut TestAppContext) { - deterministic.forbid_parking(); +// #[gpui::test(iterations = 10)] +// async fn test_connection_timeout(deterministic: Arc, cx: &mut TestAppContext) { +// deterministic.forbid_parking(); - let user_id = 5; - let client = cx.update(|cx| Client::new(FakeHttpClient::with_404_response(), cx)); - let mut status = client.status(); +// let user_id = 5; +// let client = cx.update(|cx| Client::new(FakeHttpClient::with_404_response(), cx)); +// let mut status = client.status(); - // Time out when client tries to connect. - client.override_authenticate(move |cx| { - cx.foreground().spawn(async move { - Ok(Credentials { - user_id, - access_token: "token".into(), - }) - }) - }); - client.override_establish_connection(|_, cx| { - cx.foreground().spawn(async move { - future::pending::<()>().await; - unreachable!() - }) - }); - let auth_and_connect = cx.spawn({ - let client = client.clone(); - |cx| async move { client.authenticate_and_connect(false, &cx).await } - }); - deterministic.run_until_parked(); - assert!(matches!(status.next().await, Some(Status::Connecting))); +// // Time out when client tries to connect. +// client.override_authenticate(move |cx| { +// cx.foreground().spawn(async move { +// Ok(Credentials { +// user_id, +// access_token: "token".into(), +// }) +// }) +// }); +// client.override_establish_connection(|_, cx| { +// cx.foreground().spawn(async move { +// future::pending::<()>().await; +// unreachable!() +// }) +// }); +// let auth_and_connect = cx.spawn({ +// let client = client.clone(); +// |cx| async move { client.authenticate_and_connect(false, &cx).await } +// }); +// deterministic.run_until_parked(); +// assert!(matches!(status.next().await, Some(Status::Connecting))); - deterministic.advance_clock(CONNECTION_TIMEOUT); - assert!(matches!( - status.next().await, - Some(Status::ConnectionError { .. }) - )); - auth_and_connect.await.unwrap_err(); +// deterministic.advance_clock(CONNECTION_TIMEOUT); +// assert!(matches!( +// status.next().await, +// Some(Status::ConnectionError { .. }) +// )); +// auth_and_connect.await.unwrap_err(); - // Allow the connection to be established. - let server = FakeServer::for_client(user_id, &client, cx).await; - assert!(matches!( - status.next().await, - Some(Status::Connected { .. }) - )); +// // Allow the connection to be established. +// let server = FakeServer::for_client(user_id, &client, cx).await; +// assert!(matches!( +// status.next().await, +// Some(Status::Connected { .. }) +// )); - // Disconnect client. - server.forbid_connections(); - server.disconnect(); - while !matches!(status.next().await, Some(Status::ReconnectionError { .. })) {} +// // Disconnect client. +// server.forbid_connections(); +// server.disconnect(); +// while !matches!(status.next().await, Some(Status::ReconnectionError { .. })) {} - // Time out when re-establishing the connection. - server.allow_connections(); - client.override_establish_connection(|_, cx| { - cx.foreground().spawn(async move { - future::pending::<()>().await; - unreachable!() - }) - }); - deterministic.advance_clock(2 * INITIAL_RECONNECTION_DELAY); - assert!(matches!( - status.next().await, - Some(Status::Reconnecting { .. }) - )); +// // Time out when re-establishing the connection. +// server.allow_connections(); +// client.override_establish_connection(|_, cx| { +// cx.foreground().spawn(async move { +// future::pending::<()>().await; +// unreachable!() +// }) +// }); +// deterministic.advance_clock(2 * INITIAL_RECONNECTION_DELAY); +// assert!(matches!( +// status.next().await, +// Some(Status::Reconnecting { .. }) +// )); - deterministic.advance_clock(CONNECTION_TIMEOUT); - assert!(matches!( - status.next().await, - Some(Status::ReconnectionError { .. }) - )); - } +// deterministic.advance_clock(CONNECTION_TIMEOUT); +// assert!(matches!( +// status.next().await, +// Some(Status::ReconnectionError { .. }) +// )); +// } - #[gpui::test(iterations = 10)] - async fn test_authenticating_more_than_once( - cx: &mut TestAppContext, - deterministic: Arc, - ) { - cx.foreground().forbid_parking(); +// #[gpui::test(iterations = 10)] +// async fn test_authenticating_more_than_once( +// cx: &mut TestAppContext, +// deterministic: Arc, +// ) { +// cx.foreground().forbid_parking(); - let auth_count = Arc::new(Mutex::new(0)); - let dropped_auth_count = Arc::new(Mutex::new(0)); - let client = cx.update(|cx| Client::new(FakeHttpClient::with_404_response(), cx)); - client.override_authenticate({ - let auth_count = auth_count.clone(); - let dropped_auth_count = dropped_auth_count.clone(); - move |cx| { - let auth_count = auth_count.clone(); - let dropped_auth_count = dropped_auth_count.clone(); - cx.foreground().spawn(async move { - *auth_count.lock() += 1; - let _drop = util::defer(move || *dropped_auth_count.lock() += 1); - future::pending::<()>().await; - unreachable!() - }) - } - }); +// let auth_count = Arc::new(Mutex::new(0)); +// let dropped_auth_count = Arc::new(Mutex::new(0)); +// let client = cx.update(|cx| Client::new(FakeHttpClient::with_404_response(), cx)); +// client.override_authenticate({ +// let auth_count = auth_count.clone(); +// let dropped_auth_count = dropped_auth_count.clone(); +// move |cx| { +// let auth_count = auth_count.clone(); +// let dropped_auth_count = dropped_auth_count.clone(); +// cx.foreground().spawn(async move { +// *auth_count.lock() += 1; +// let _drop = util::defer(move || *dropped_auth_count.lock() += 1); +// future::pending::<()>().await; +// unreachable!() +// }) +// } +// }); - let _authenticate = cx.spawn(|cx| { - let client = client.clone(); - async move { client.authenticate_and_connect(false, &cx).await } - }); - deterministic.run_until_parked(); - assert_eq!(*auth_count.lock(), 1); - assert_eq!(*dropped_auth_count.lock(), 0); +// let _authenticate = cx.spawn(|cx| { +// let client = client.clone(); +// async move { client.authenticate_and_connect(false, &cx).await } +// }); +// deterministic.run_until_parked(); +// assert_eq!(*auth_count.lock(), 1); +// assert_eq!(*dropped_auth_count.lock(), 0); - let _authenticate = cx.spawn(|cx| { - let client = client.clone(); - async move { client.authenticate_and_connect(false, &cx).await } - }); - deterministic.run_until_parked(); - assert_eq!(*auth_count.lock(), 2); - assert_eq!(*dropped_auth_count.lock(), 1); - } +// let _authenticate = cx.spawn(|cx| { +// let client = client.clone(); +// async move { client.authenticate_and_connect(false, &cx).await } +// }); +// deterministic.run_until_parked(); +// assert_eq!(*auth_count.lock(), 2); +// assert_eq!(*dropped_auth_count.lock(), 1); +// } - #[test] - fn test_encode_and_decode_worktree_url() { - let url = encode_worktree_url(5, "deadbeef"); - assert_eq!(decode_worktree_url(&url), Some((5, "deadbeef".to_string()))); - assert_eq!( - decode_worktree_url(&format!("\n {}\t", url)), - Some((5, "deadbeef".to_string())) - ); - assert_eq!(decode_worktree_url("not://the-right-format"), None); - } +// #[test] +// fn test_encode_and_decode_worktree_url() { +// let url = encode_worktree_url(5, "deadbeef"); +// assert_eq!(decode_worktree_url(&url), Some((5, "deadbeef".to_string()))); +// assert_eq!( +// decode_worktree_url(&format!("\n {}\t", url)), +// Some((5, "deadbeef".to_string())) +// ); +// assert_eq!(decode_worktree_url("not://the-right-format"), None); +// } - #[gpui::test] - async fn test_subscribing_to_entity(cx: &mut TestAppContext) { - cx.foreground().forbid_parking(); +// #[gpui::test] +// async fn test_subscribing_to_entity(cx: &mut TestAppContext) { +// cx.foreground().forbid_parking(); - let user_id = 5; - let client = cx.update(|cx| Client::new(FakeHttpClient::with_404_response(), cx)); - let server = FakeServer::for_client(user_id, &client, cx).await; +// let user_id = 5; +// let client = cx.update(|cx| Client::new(FakeHttpClient::with_404_response(), cx)); +// let server = FakeServer::for_client(user_id, &client, cx).await; - let (done_tx1, mut done_rx1) = smol::channel::unbounded(); - let (done_tx2, mut done_rx2) = smol::channel::unbounded(); - client.add_model_message_handler( - move |model: ModelHandle, _: TypedEnvelope, _, cx| { - match model.read_with(&cx, |model, _| model.id) { - 1 => done_tx1.try_send(()).unwrap(), - 2 => done_tx2.try_send(()).unwrap(), - _ => unreachable!(), - } - async { Ok(()) } - }, - ); - let model1 = cx.add_model(|_| Model { - id: 1, - subscription: None, - }); - let model2 = cx.add_model(|_| Model { - id: 2, - subscription: None, - }); - let model3 = cx.add_model(|_| Model { - id: 3, - subscription: None, - }); +// let (done_tx1, mut done_rx1) = smol::channel::unbounded(); +// let (done_tx2, mut done_rx2) = smol::channel::unbounded(); +// client.add_model_message_handler( +// move |model: ModelHandle, _: TypedEnvelope, _, cx| { +// match model.read_with(&cx, |model, _| model.id) { +// 1 => done_tx1.try_send(()).unwrap(), +// 2 => done_tx2.try_send(()).unwrap(), +// _ => unreachable!(), +// } +// async { Ok(()) } +// }, +// ); +// let model1 = cx.add_model(|_| Model { +// id: 1, +// subscription: None, +// }); +// let model2 = cx.add_model(|_| Model { +// id: 2, +// subscription: None, +// }); +// let model3 = cx.add_model(|_| Model { +// id: 3, +// subscription: None, +// }); - let _subscription1 = client - .subscribe_to_entity(1) - .unwrap() - .set_model(&model1, &mut cx.to_async()); - let _subscription2 = client - .subscribe_to_entity(2) - .unwrap() - .set_model(&model2, &mut cx.to_async()); - // Ensure dropping a subscription for the same entity type still allows receiving of - // messages for other entity IDs of the same type. - let subscription3 = client - .subscribe_to_entity(3) - .unwrap() - .set_model(&model3, &mut cx.to_async()); - drop(subscription3); +// let _subscription1 = client +// .subscribe_to_entity(1) +// .unwrap() +// .set_model(&model1, &mut cx.to_async()); +// let _subscription2 = client +// .subscribe_to_entity(2) +// .unwrap() +// .set_model(&model2, &mut cx.to_async()); +// // Ensure dropping a subscription for the same entity type still allows receiving of +// // messages for other entity IDs of the same type. +// let subscription3 = client +// .subscribe_to_entity(3) +// .unwrap() +// .set_model(&model3, &mut cx.to_async()); +// drop(subscription3); - server.send(proto::JoinProject { project_id: 1 }); - server.send(proto::JoinProject { project_id: 2 }); - done_rx1.next().await.unwrap(); - done_rx2.next().await.unwrap(); - } +// server.send(proto::JoinProject { project_id: 1 }); +// server.send(proto::JoinProject { project_id: 2 }); +// done_rx1.next().await.unwrap(); +// done_rx2.next().await.unwrap(); +// } - #[gpui::test] - async fn test_subscribing_after_dropping_subscription(cx: &mut TestAppContext) { - cx.foreground().forbid_parking(); +// #[gpui::test] +// async fn test_subscribing_after_dropping_subscription(cx: &mut TestAppContext) { +// cx.foreground().forbid_parking(); - let user_id = 5; - let client = cx.update(|cx| Client::new(FakeHttpClient::with_404_response(), cx)); - let server = FakeServer::for_client(user_id, &client, cx).await; +// let user_id = 5; +// let client = cx.update(|cx| Client::new(FakeHttpClient::with_404_response(), cx)); +// let server = FakeServer::for_client(user_id, &client, cx).await; - let model = cx.add_model(|_| Model::default()); - let (done_tx1, _done_rx1) = smol::channel::unbounded(); - let (done_tx2, mut done_rx2) = smol::channel::unbounded(); - let subscription1 = client.add_message_handler( - model.clone(), - move |_, _: TypedEnvelope, _, _| { - done_tx1.try_send(()).unwrap(); - async { Ok(()) } - }, - ); - drop(subscription1); - let _subscription2 = client.add_message_handler( - model.clone(), - move |_, _: TypedEnvelope, _, _| { - done_tx2.try_send(()).unwrap(); - async { Ok(()) } - }, - ); - server.send(proto::Ping {}); - done_rx2.next().await.unwrap(); - } +// let model = cx.add_model(|_| Model::default()); +// let (done_tx1, _done_rx1) = smol::channel::unbounded(); +// let (done_tx2, mut done_rx2) = smol::channel::unbounded(); +// let subscription1 = client.add_message_handler( +// model.clone(), +// move |_, _: TypedEnvelope, _, _| { +// done_tx1.try_send(()).unwrap(); +// async { Ok(()) } +// }, +// ); +// drop(subscription1); +// let _subscription2 = client.add_message_handler( +// model.clone(), +// move |_, _: TypedEnvelope, _, _| { +// done_tx2.try_send(()).unwrap(); +// async { Ok(()) } +// }, +// ); +// server.send(proto::Ping {}); +// done_rx2.next().await.unwrap(); +// } - #[gpui::test] - async fn test_dropping_subscription_in_handler(cx: &mut TestAppContext) { - cx.foreground().forbid_parking(); +// #[gpui::test] +// async fn test_dropping_subscription_in_handler(cx: &mut TestAppContext) { +// cx.foreground().forbid_parking(); - let user_id = 5; - let client = cx.update(|cx| Client::new(FakeHttpClient::with_404_response(), cx)); - let server = FakeServer::for_client(user_id, &client, cx).await; +// let user_id = 5; +// let client = cx.update(|cx| Client::new(FakeHttpClient::with_404_response(), cx)); +// let server = FakeServer::for_client(user_id, &client, cx).await; - let model = cx.add_model(|_| Model::default()); - let (done_tx, mut done_rx) = smol::channel::unbounded(); - let subscription = client.add_message_handler( - model.clone(), - move |model, _: TypedEnvelope, _, mut cx| { - model.update(&mut cx, |model, _| model.subscription.take()); - done_tx.try_send(()).unwrap(); - async { Ok(()) } - }, - ); - model.update(cx, |model, _| { - model.subscription = Some(subscription); - }); - server.send(proto::Ping {}); - done_rx.next().await.unwrap(); - } +// let model = cx.add_model(|_| Model::default()); +// let (done_tx, mut done_rx) = smol::channel::unbounded(); +// let subscription = client.add_message_handler( +// model.clone(), +// move |model, _: TypedEnvelope, _, mut cx| { +// model.update(&mut cx, |model, _| model.subscription.take()); +// done_tx.try_send(()).unwrap(); +// async { Ok(()) } +// }, +// ); +// model.update(cx, |model, _| { +// model.subscription = Some(subscription); +// }); +// server.send(proto::Ping {}); +// done_rx.next().await.unwrap(); +// } - #[derive(Default)] - struct Model { - id: usize, - subscription: Option, - } +// #[derive(Default)] +// struct Model { +// id: usize, +// subscription: Option, +// } - impl Entity for Model { - type Event = (); - } -} +// impl Entity for Model { +// type Event = (); +// } +// } diff --git a/crates/client2/src/telemetry.rs b/crates/client2/src/telemetry.rs index 70878bf2e4..d1489fce8e 100644 --- a/crates/client2/src/telemetry.rs +++ b/crates/client2/src/telemetry.rs @@ -1,5 +1,5 @@ use crate::{TelemetrySettings, ZED_SECRET_CLIENT_TOKEN, ZED_SERVER_URL}; -use gpui::{executor::Background, serde_json, AppContext, Task}; +use gpui2::{serde_json, AppContext, Executor, Task}; use lazy_static::lazy_static; use parking_lot::Mutex; use serde::Serialize; @@ -11,7 +11,7 @@ use util::{channel::ReleaseChannel, TryFutureExt}; pub struct Telemetry { http_client: Arc, - executor: Arc, + executor: Executor, state: Mutex, } diff --git a/crates/client2/src/test.rs b/crates/client2/src/test.rs index 38cd12f21c..96c20791ec 100644 --- a/crates/client2/src/test.rs +++ b/crates/client2/src/test.rs @@ -1,215 +1,215 @@ -use crate::{Client, Connection, Credentials, EstablishConnectionError, UserStore}; -use anyhow::{anyhow, Result}; -use futures::{stream::BoxStream, StreamExt}; -use gpui::{executor, ModelHandle, TestAppContext}; -use parking_lot::Mutex; -use rpc::{ - proto::{self, GetPrivateUserInfo, GetPrivateUserInfoResponse}, - ConnectionId, Peer, Receipt, TypedEnvelope, -}; -use std::{rc::Rc, sync::Arc}; -use util::http::FakeHttpClient; +// use crate::{Client, Connection, Credentials, EstablishConnectionError, UserStore}; +// use anyhow::{anyhow, Result}; +// use futures::{stream::BoxStream, StreamExt}; +// use gpui2::{Executor, Handle, TestAppContext}; +// use parking_lot::Mutex; +// use rpc::{ +// proto::{self, GetPrivateUserInfo, GetPrivateUserInfoResponse}, +// ConnectionId, Peer, Receipt, TypedEnvelope, +// }; +// use std::{rc::Rc, sync::Arc}; +// use util::http::FakeHttpClient; -pub struct FakeServer { - peer: Arc, - state: Arc>, - user_id: u64, - executor: Rc, -} +// pub struct FakeServer { +// peer: Arc, +// state: Arc>, +// user_id: u64, +// executor: Executor, +// } -#[derive(Default)] -struct FakeServerState { - incoming: Option>>, - connection_id: Option, - forbid_connections: bool, - auth_count: usize, - access_token: usize, -} +// #[derive(Default)] +// struct FakeServerState { +// incoming: Option>>, +// connection_id: Option, +// forbid_connections: bool, +// auth_count: usize, +// access_token: usize, +// } -impl FakeServer { - pub async fn for_client( - client_user_id: u64, - client: &Arc, - cx: &TestAppContext, - ) -> Self { - let server = Self { - peer: Peer::new(0), - state: Default::default(), - user_id: client_user_id, - executor: cx.foreground(), - }; +// impl FakeServer { +// pub async fn for_client( +// client_user_id: u64, +// client: &Arc, +// cx: &TestAppContext, +// ) -> Self { +// let server = Self { +// peer: Peer::new(0), +// state: Default::default(), +// user_id: client_user_id, +// executor: cx.foreground(), +// }; - client - .override_authenticate({ - let state = Arc::downgrade(&server.state); - move |cx| { - let state = state.clone(); - cx.spawn(move |_| async move { - let state = state.upgrade().ok_or_else(|| anyhow!("server dropped"))?; - let mut state = state.lock(); - state.auth_count += 1; - let access_token = state.access_token.to_string(); - Ok(Credentials { - user_id: client_user_id, - access_token, - }) - }) - } - }) - .override_establish_connection({ - let peer = Arc::downgrade(&server.peer); - let state = Arc::downgrade(&server.state); - move |credentials, cx| { - let peer = peer.clone(); - let state = state.clone(); - let credentials = credentials.clone(); - cx.spawn(move |cx| async move { - let state = state.upgrade().ok_or_else(|| anyhow!("server dropped"))?; - let peer = peer.upgrade().ok_or_else(|| anyhow!("server dropped"))?; - if state.lock().forbid_connections { - Err(EstablishConnectionError::Other(anyhow!( - "server is forbidding connections" - )))? - } +// client +// .override_authenticate({ +// let state = Arc::downgrade(&server.state); +// move |cx| { +// let state = state.clone(); +// cx.spawn(move |_| async move { +// let state = state.upgrade().ok_or_else(|| anyhow!("server dropped"))?; +// let mut state = state.lock(); +// state.auth_count += 1; +// let access_token = state.access_token.to_string(); +// Ok(Credentials { +// user_id: client_user_id, +// access_token, +// }) +// }) +// } +// }) +// .override_establish_connection({ +// let peer = Arc::downgrade(&server.peer); +// let state = Arc::downgrade(&server.state); +// move |credentials, cx| { +// let peer = peer.clone(); +// let state = state.clone(); +// let credentials = credentials.clone(); +// cx.spawn(move |cx| async move { +// let state = state.upgrade().ok_or_else(|| anyhow!("server dropped"))?; +// let peer = peer.upgrade().ok_or_else(|| anyhow!("server dropped"))?; +// if state.lock().forbid_connections { +// Err(EstablishConnectionError::Other(anyhow!( +// "server is forbidding connections" +// )))? +// } - assert_eq!(credentials.user_id, client_user_id); +// assert_eq!(credentials.user_id, client_user_id); - if credentials.access_token != state.lock().access_token.to_string() { - Err(EstablishConnectionError::Unauthorized)? - } +// if credentials.access_token != state.lock().access_token.to_string() { +// Err(EstablishConnectionError::Unauthorized)? +// } - let (client_conn, server_conn, _) = Connection::in_memory(cx.background()); - let (connection_id, io, incoming) = - peer.add_test_connection(server_conn, cx.background()); - cx.background().spawn(io).detach(); - { - let mut state = state.lock(); - state.connection_id = Some(connection_id); - state.incoming = Some(incoming); - } - peer.send( - connection_id, - proto::Hello { - peer_id: Some(connection_id.into()), - }, - ) - .unwrap(); +// let (client_conn, server_conn, _) = Connection::in_memory(cx.background()); +// let (connection_id, io, incoming) = +// peer.add_test_connection(server_conn, cx.background()); +// cx.background().spawn(io).detach(); +// { +// let mut state = state.lock(); +// state.connection_id = Some(connection_id); +// state.incoming = Some(incoming); +// } +// peer.send( +// connection_id, +// proto::Hello { +// peer_id: Some(connection_id.into()), +// }, +// ) +// .unwrap(); - Ok(client_conn) - }) - } - }); +// Ok(client_conn) +// }) +// } +// }); - client - .authenticate_and_connect(false, &cx.to_async()) - .await - .unwrap(); +// client +// .authenticate_and_connect(false, &cx.to_async()) +// .await +// .unwrap(); - server - } +// server +// } - pub fn disconnect(&self) { - if self.state.lock().connection_id.is_some() { - self.peer.disconnect(self.connection_id()); - let mut state = self.state.lock(); - state.connection_id.take(); - state.incoming.take(); - } - } +// pub fn disconnect(&self) { +// if self.state.lock().connection_id.is_some() { +// self.peer.disconnect(self.connection_id()); +// let mut state = self.state.lock(); +// state.connection_id.take(); +// state.incoming.take(); +// } +// } - pub fn auth_count(&self) -> usize { - self.state.lock().auth_count - } +// pub fn auth_count(&self) -> usize { +// self.state.lock().auth_count +// } - pub fn roll_access_token(&self) { - self.state.lock().access_token += 1; - } +// pub fn roll_access_token(&self) { +// self.state.lock().access_token += 1; +// } - pub fn forbid_connections(&self) { - self.state.lock().forbid_connections = true; - } +// pub fn forbid_connections(&self) { +// self.state.lock().forbid_connections = true; +// } - pub fn allow_connections(&self) { - self.state.lock().forbid_connections = false; - } +// pub fn allow_connections(&self) { +// self.state.lock().forbid_connections = false; +// } - pub fn send(&self, message: T) { - self.peer.send(self.connection_id(), message).unwrap(); - } +// pub fn send(&self, message: T) { +// self.peer.send(self.connection_id(), message).unwrap(); +// } - #[allow(clippy::await_holding_lock)] - pub async fn receive(&self) -> Result> { - self.executor.start_waiting(); +// #[allow(clippy::await_holding_lock)] +// pub async fn receive(&self) -> Result> { +// self.executor.start_waiting(); - loop { - let message = self - .state - .lock() - .incoming - .as_mut() - .expect("not connected") - .next() - .await - .ok_or_else(|| anyhow!("other half hung up"))?; - self.executor.finish_waiting(); - let type_name = message.payload_type_name(); - let message = message.into_any(); +// loop { +// let message = self +// .state +// .lock() +// .incoming +// .as_mut() +// .expect("not connected") +// .next() +// .await +// .ok_or_else(|| anyhow!("other half hung up"))?; +// self.executor.finish_waiting(); +// let type_name = message.payload_type_name(); +// let message = message.into_any(); - if message.is::>() { - return Ok(*message.downcast().unwrap()); - } +// if message.is::>() { +// return Ok(*message.downcast().unwrap()); +// } - if message.is::>() { - self.respond( - message - .downcast::>() - .unwrap() - .receipt(), - GetPrivateUserInfoResponse { - metrics_id: "the-metrics-id".into(), - staff: false, - flags: Default::default(), - }, - ); - continue; - } +// if message.is::>() { +// self.respond( +// message +// .downcast::>() +// .unwrap() +// .receipt(), +// GetPrivateUserInfoResponse { +// metrics_id: "the-metrics-id".into(), +// staff: false, +// flags: Default::default(), +// }, +// ); +// continue; +// } - panic!( - "fake server received unexpected message type: {:?}", - type_name - ); - } - } +// panic!( +// "fake server received unexpected message type: {:?}", +// type_name +// ); +// } +// } - pub fn respond(&self, receipt: Receipt, response: T::Response) { - self.peer.respond(receipt, response).unwrap() - } +// pub fn respond(&self, receipt: Receipt, response: T::Response) { +// self.peer.respond(receipt, response).unwrap() +// } - fn connection_id(&self) -> ConnectionId { - self.state.lock().connection_id.expect("not connected") - } +// fn connection_id(&self) -> ConnectionId { +// self.state.lock().connection_id.expect("not connected") +// } - pub async fn build_user_store( - &self, - client: Arc, - cx: &mut TestAppContext, - ) -> ModelHandle { - let http_client = FakeHttpClient::with_404_response(); - let user_store = cx.add_model(|cx| UserStore::new(client, http_client, cx)); - assert_eq!( - self.receive::() - .await - .unwrap() - .payload - .user_ids, - &[self.user_id] - ); - user_store - } -} +// pub async fn build_user_store( +// &self, +// client: Arc, +// cx: &mut TestAppContext, +// ) -> ModelHandle { +// let http_client = FakeHttpClient::with_404_response(); +// let user_store = cx.add_model(|cx| UserStore::new(client, http_client, cx)); +// assert_eq!( +// self.receive::() +// .await +// .unwrap() +// .payload +// .user_ids, +// &[self.user_id] +// ); +// user_store +// } +// } -impl Drop for FakeServer { - fn drop(&mut self) { - self.disconnect(); - } -} +// impl Drop for FakeServer { +// fn drop(&mut self) { +// self.disconnect(); +// } +// } diff --git a/crates/client2/src/user.rs b/crates/client2/src/user.rs index 6aa41708e3..f237469698 100644 --- a/crates/client2/src/user.rs +++ b/crates/client2/src/user.rs @@ -3,7 +3,7 @@ use anyhow::{anyhow, Context, Result}; use collections::{hash_map::Entry, HashMap, HashSet}; use feature_flags::FeatureFlagAppExt; use futures::{channel::mpsc, future, AsyncReadExt, Future, StreamExt}; -use gpui::{AsyncAppContext, Entity, ImageData, ModelContext, ModelHandle, Task}; +use gpui2::{AsyncAppContext, EventEmitter, Handle, ImageData, ModelContext, Task}; use postage::{sink::Sink, watch}; use rpc::proto::{RequestMessage, UsersResponse}; use std::sync::{Arc, Weak}; @@ -103,7 +103,7 @@ pub enum ContactEventKind { Cancelled, } -impl Entity for UserStore { +impl EventEmitter for UserStore { type Event = Event; } @@ -217,7 +217,7 @@ impl UserStore { } async fn handle_update_invite_info( - this: ModelHandle, + this: Handle, message: TypedEnvelope, _: Arc, mut cx: AsyncAppContext, @@ -233,7 +233,7 @@ impl UserStore { } async fn handle_show_contacts( - this: ModelHandle, + this: Handle, _: TypedEnvelope, _: Arc, mut cx: AsyncAppContext, @@ -247,7 +247,7 @@ impl UserStore { } async fn handle_update_contacts( - this: ModelHandle, + this: Handle, message: TypedEnvelope, _: Arc, mut cx: AsyncAppContext, @@ -689,7 +689,7 @@ impl User { impl Contact { async fn from_proto( contact: proto::Contact, - user_store: &ModelHandle, + user_store: &Handle, cx: &mut AsyncAppContext, ) -> Result { let user = user_store diff --git a/crates/gpui2/src/app.rs b/crates/gpui2/src/app.rs index 88bb3b9eb5..043b4897ff 100644 --- a/crates/gpui2/src/app.rs +++ b/crates/gpui2/src/app.rs @@ -9,10 +9,11 @@ use refineable::Refineable; use smallvec::SmallVec; use crate::{ - current_platform, image_cache::ImageCache, Action, AssetSource, Context, DisplayId, Executor, - FocusEvent, FocusHandle, FocusId, KeyBinding, Keymap, LayoutId, MainThread, MainThreadOnly, - Platform, SemanticVersion, SharedString, SubscriberSet, SvgRenderer, Task, TextStyle, - TextStyleRefinement, TextSystem, View, Window, WindowContext, WindowHandle, WindowId, + current_platform, image_cache::ImageCache, Action, AssetSource, Context, DispatchPhase, + DisplayId, Executor, FocusEvent, FocusHandle, FocusId, KeyBinding, Keymap, LayoutId, + MainThread, MainThreadOnly, Platform, SemanticVersion, SharedString, SubscriberSet, + SvgRenderer, Task, TextStyle, TextStyleRefinement, TextSystem, View, Window, WindowContext, + WindowHandle, WindowId, }; use anyhow::{anyhow, Result}; use collections::{HashMap, HashSet, VecDeque}; @@ -67,6 +68,7 @@ impl App { entities, windows: SlotMap::with_key(), keymap: Arc::new(RwLock::new(Keymap::default())), + global_action_listeners: HashMap::default(), action_builders: HashMap::default(), pending_notifications: Default::default(), pending_effects: Default::default(), @@ -74,6 +76,7 @@ impl App { event_handlers: SubscriberSet::new(), release_handlers: SubscriberSet::new(), layout_id_buffer: Default::default(), + propagate_event: true, }) })) } @@ -168,6 +171,8 @@ pub struct AppContext { pub(crate) entities: EntityMap, pub(crate) windows: SlotMap>, pub(crate) keymap: Arc>, + pub(crate) global_action_listeners: + HashMap>>, action_builders: HashMap, pub(crate) pending_notifications: HashSet, pending_effects: VecDeque, @@ -175,6 +180,7 @@ pub struct AppContext { pub(crate) event_handlers: SubscriberSet, pub(crate) release_handlers: SubscriberSet, pub(crate) layout_id_buffer: Vec, // We recycle this memory across layout requests. + pub(crate) propagate_event: bool, } impl AppContext { @@ -508,6 +514,21 @@ impl AppContext { self.push_effect(Effect::Refresh); } + pub fn on_action( + &mut self, + listener: impl Fn(&A, &mut Self) + Send + Sync + 'static, + ) { + self.global_action_listeners + .entry(TypeId::of::()) + .or_default() + .push(Box::new(move |action, phase, cx| { + if phase == DispatchPhase::Bubble { + let action = action.as_any().downcast_ref().unwrap(); + listener(action, cx) + } + })); + } + pub fn register_action_type(&mut self) { self.action_builders.insert(A::qualified_name(), A::build); } @@ -523,6 +544,10 @@ impl AppContext { .ok_or_else(|| anyhow!("no action type registered for {}", name))?; (build)(params) } + + pub fn stop_propagation(&mut self) { + self.propagate_event = false; + } } impl Context for AppContext { @@ -610,6 +635,18 @@ impl MainThread { self.platform().activate(ignoring_other_apps); } + pub fn write_credentials(&self, url: &str, username: &str, password: &[u8]) -> Result<()> { + self.platform().write_credentials(url, username, password) + } + + pub fn read_credentials(&self, url: &str) -> Result)>> { + self.platform().read_credentials(url) + } + + pub fn delete_credentials(&self, url: &str) -> Result<()> { + self.platform().delete_credentials(url) + } + pub fn open_window( &mut self, options: crate::WindowOptions, diff --git a/crates/gpui2/src/app/async_context.rs b/crates/gpui2/src/app/async_context.rs index 13e49de1d3..72ca368963 100644 --- a/crates/gpui2/src/app/async_context.rs +++ b/crates/gpui2/src/app/async_context.rs @@ -1,8 +1,9 @@ use crate::{ - AnyWindowHandle, AppContext, Context, Handle, ModelContext, Result, Task, ViewContext, - WindowContext, + AnyWindowHandle, AppContext, Context, Executor, Handle, MainThread, ModelContext, Result, Task, + ViewContext, WindowContext, }; use anyhow::anyhow; +use derive_more::{Deref, DerefMut}; use parking_lot::Mutex; use std::{future::Future, sync::Weak}; @@ -75,6 +76,15 @@ impl Context for AsyncAppContext { } impl AsyncAppContext { + pub fn executor(&self) -> Result { + let app = self + .0 + .upgrade() + .ok_or_else(|| anyhow!("app was released"))?; + let lock = app.lock(); // Need this to compile + Ok(lock.executor().clone()) + } + pub fn read_window( &self, handle: AnyWindowHandle, @@ -116,10 +126,27 @@ impl AsyncAppContext { let app_context = app.lock(); Ok(app_context.spawn(f)) } + + pub fn run_on_main( + &self, + f: impl FnOnce(&mut MainThread) -> R + Send + 'static, + ) -> Result> + where + R: Send + 'static, + { + let app = self + .0 + .upgrade() + .ok_or_else(|| anyhow!("app was released"))?; + let mut app_context = app.lock(); + Ok(app_context.run_on_main(f)) + } } -#[derive(Clone)] +#[derive(Clone, Deref, DerefMut)] pub struct AsyncWindowContext { + #[deref] + #[deref_mut] app: AsyncAppContext, window: AnyWindowHandle, } diff --git a/crates/gpui2/src/app/entity_map.rs b/crates/gpui2/src/app/entity_map.rs index a756fe0bc8..b62b62705d 100644 --- a/crates/gpui2/src/app/entity_map.rs +++ b/crates/gpui2/src/app/entity_map.rs @@ -4,7 +4,7 @@ use derive_more::{Deref, DerefMut}; use parking_lot::{RwLock, RwLockUpgradableReadGuard}; use slotmap::{SecondaryMap, SlotMap}; use std::{ - any::Any, + any::{Any, TypeId}, marker::PhantomData, mem, sync::{ @@ -70,9 +70,12 @@ impl EntityMap { pub fn weak_handle(&self, id: EntityId) -> WeakHandle { WeakHandle { - id, + any_handle: AnyWeakHandle { + id, + entity_type: TypeId::of::(), + entity_map: Arc::downgrade(&self.0), + }, entity_type: PhantomData, - entity_map: Arc::downgrade(&self.0), } } @@ -112,44 +115,45 @@ impl Drop for Lease { #[derive(Deref, DerefMut)] pub struct Slot(Handle); -pub struct Handle { +pub struct AnyHandle { pub(crate) id: EntityId, - entity_type: PhantomData, + entity_type: TypeId, entity_map: Weak>, } -impl Handle { - fn new(id: EntityId, entity_map: Weak>) -> Self { +impl AnyHandle { + fn new(id: EntityId, entity_type: TypeId, entity_map: Weak>) -> Self { Self { id, - entity_type: PhantomData, + entity_type, entity_map, } } - pub fn downgrade(&self) -> WeakHandle { - WeakHandle { + pub fn downgrade(&self) -> AnyWeakHandle { + AnyWeakHandle { id: self.id, entity_type: self.entity_type, entity_map: self.entity_map.clone(), } } - /// Update the entity referenced by this handle with the given function. - /// - /// The update function receives a context appropriate for its environment. - /// When updating in an `AppContext`, it receives a `ModelContext`. - /// When updating an a `WindowContext`, it receives a `ViewContext`. - pub fn update( - &self, - cx: &mut C, - update: impl FnOnce(&mut T, &mut C::EntityContext<'_, '_, T>) -> R, - ) -> C::Result { - cx.update_entity(self, update) + pub fn downcast(&self) -> Option> + where + T: 'static + Send + Sync, + { + if TypeId::of::() == self.entity_type { + Some(Handle { + any_handle: self.clone(), + entity_type: PhantomData, + }) + } else { + None + } } } -impl Clone for Handle { +impl Clone for AnyHandle { fn clone(&self) -> Self { if let Some(entity_map) = self.entity_map.upgrade() { let entity_map = entity_map.read(); @@ -163,13 +167,13 @@ impl Clone for Handle { Self { id: self.id, - entity_type: PhantomData, + entity_type: self.entity_type, entity_map: self.entity_map.clone(), } } } -impl Drop for Handle { +impl Drop for AnyHandle { fn drop(&mut self) { if let Some(entity_map) = self.entity_map.upgrade() { let entity_map = entity_map.upgradable_read(); @@ -193,36 +197,117 @@ impl Drop for Handle { } } -pub struct WeakHandle { - pub(crate) id: EntityId, - entity_type: PhantomData, - entity_map: Weak>, +impl From> for AnyHandle +where + T: 'static + Send + Sync, +{ + fn from(handle: Handle) -> Self { + handle.any_handle + } } -impl Clone for WeakHandle { +#[derive(Deref, DerefMut)] +pub struct Handle { + #[deref] + #[deref_mut] + any_handle: AnyHandle, + entity_type: PhantomData, +} + +impl Handle { + fn new(id: EntityId, entity_map: Weak>) -> Self { + Self { + any_handle: AnyHandle::new(id, TypeId::of::(), entity_map), + entity_type: PhantomData, + } + } + + pub fn downgrade(&self) -> WeakHandle { + WeakHandle { + any_handle: self.any_handle.downgrade(), + entity_type: self.entity_type, + } + } + + /// Update the entity referenced by this handle with the given function. + /// + /// The update function receives a context appropriate for its environment. + /// When updating in an `AppContext`, it receives a `ModelContext`. + /// When updating an a `WindowContext`, it receives a `ViewContext`. + pub fn update( + &self, + cx: &mut C, + update: impl FnOnce(&mut T, &mut C::EntityContext<'_, '_, T>) -> R, + ) -> C::Result { + cx.update_entity(self, update) + } +} + +impl Clone for Handle { fn clone(&self) -> Self { Self { - id: self.id, + any_handle: self.any_handle.clone(), entity_type: self.entity_type, - entity_map: self.entity_map.clone(), } } } -impl WeakHandle { - pub fn upgrade(&self, _: &impl Context) -> Option> { +#[derive(Clone)] +pub struct AnyWeakHandle { + pub(crate) id: EntityId, + entity_type: TypeId, + entity_map: Weak>, +} + +impl AnyWeakHandle { + pub fn upgrade(&self) -> Option { let entity_map = &self.entity_map.upgrade()?; entity_map .read() .ref_counts .get(self.id)? .fetch_add(1, SeqCst); - Some(Handle { + Some(AnyHandle { id: self.id, entity_type: self.entity_type, entity_map: self.entity_map.clone(), }) } +} + +impl From> for AnyWeakHandle +where + T: 'static + Send + Sync, +{ + fn from(handle: WeakHandle) -> Self { + handle.any_handle + } +} + +#[derive(Deref, DerefMut)] +pub struct WeakHandle { + #[deref] + #[deref_mut] + any_handle: AnyWeakHandle, + entity_type: PhantomData, +} + +impl Clone for WeakHandle { + fn clone(&self) -> Self { + Self { + any_handle: self.any_handle.clone(), + entity_type: self.entity_type, + } + } +} + +impl WeakHandle { + pub fn upgrade(&self) -> Option> { + Some(Handle { + any_handle: self.any_handle.upgrade()?, + entity_type: self.entity_type, + }) + } /// Update the entity referenced by this handle with the given function if /// the referenced entity still exists. Returns an error if the entity has @@ -240,7 +325,7 @@ impl WeakHandle { Result>: crate::Flatten, { crate::Flatten::flatten( - self.upgrade(cx) + self.upgrade() .ok_or_else(|| anyhow!("entity release")) .map(|this| cx.update_entity(&this, update)), ) diff --git a/crates/gpui2/src/app/model_context.rs b/crates/gpui2/src/app/model_context.rs index 7318ecdc65..65ac1fd03e 100644 --- a/crates/gpui2/src/app/model_context.rs +++ b/crates/gpui2/src/app/model_context.rs @@ -1,5 +1,5 @@ use crate::{ - AppContext, Context, Effect, EntityId, EventEmitter, Handle, Reference, Subscription, + AppContext, Context, Effect, EntityId, EventEmitter, Executor, Handle, Reference, Subscription, WeakHandle, }; use std::marker::PhantomData; @@ -51,7 +51,7 @@ impl<'a, T: Send + Sync + 'static> ModelContext<'a, T> { self.app.observers.insert( handle.id, Box::new(move |cx| { - if let Some((this, handle)) = this.upgrade(cx).zip(handle.upgrade(cx)) { + if let Some((this, handle)) = this.upgrade().zip(handle.upgrade()) { this.update(cx, |this, cx| on_notify(this, handle, cx)); true } else { @@ -75,7 +75,7 @@ impl<'a, T: Send + Sync + 'static> ModelContext<'a, T> { handle.id, Box::new(move |event, cx| { let event = event.downcast_ref().expect("invalid event type"); - if let Some((this, handle)) = this.upgrade(cx).zip(handle.upgrade(cx)) { + if let Some((this, handle)) = this.upgrade().zip(handle.upgrade()) { this.update(cx, |this, cx| on_event(this, handle, event, cx)); true } else { @@ -108,7 +108,7 @@ impl<'a, T: Send + Sync + 'static> ModelContext<'a, T> { handle.id, Box::new(move |entity, cx| { let entity = entity.downcast_mut().expect("invalid entity type"); - if let Some(this) = this.upgrade(cx) { + if let Some(this) = this.upgrade() { this.update(cx, |this, cx| on_release(this, entity, cx)); } }), diff --git a/crates/gpui2/src/executor.rs b/crates/gpui2/src/executor.rs index 43a1afc3cc..c59c7d7b94 100644 --- a/crates/gpui2/src/executor.rs +++ b/crates/gpui2/src/executor.rs @@ -8,6 +8,7 @@ use std::{ pin::Pin, sync::Arc, task::{Context, Poll}, + time::Duration, }; use util::TryFutureExt; @@ -151,6 +152,11 @@ impl Executor { } } + pub fn timer(&self, duration: Duration) -> smol::Timer { + // todo!("integrate with deterministic dispatcher") + smol::Timer::after(duration) + } + pub fn is_main_thread(&self) -> bool { self.dispatcher.is_main_thread() } diff --git a/crates/gpui2/src/window.rs b/crates/gpui2/src/window.rs index 997513a5be..94e8d5a916 100644 --- a/crates/gpui2/src/window.rs +++ b/crates/gpui2/src/window.rs @@ -1,13 +1,13 @@ use crate::{ px, size, Action, AnyBox, AnyView, AppContext, AsyncWindowContext, AvailableSpace, BorrowAppContext, Bounds, BoxShadow, Context, Corners, DevicePixels, DispatchContext, - DisplayId, Edges, Effect, Element, EntityId, EventEmitter, FocusEvent, FontId, GlobalElementId, - GlyphId, Handle, Hsla, ImageData, InputEvent, IsZero, KeyListener, KeyMatch, KeyMatcher, - Keystroke, LayoutId, MainThread, MainThreadOnly, MonochromeSprite, MouseMoveEvent, Path, - Pixels, Platform, PlatformAtlas, PlatformWindow, Point, PolychromeSprite, Quad, Reference, - RenderGlyphParams, RenderImageParams, RenderSvgParams, ScaledPixels, SceneBuilder, Shadow, - SharedString, Size, Style, Subscription, TaffyLayoutEngine, Task, Underline, UnderlineStyle, - WeakHandle, WindowOptions, SUBPIXEL_VARIANTS, + DisplayId, Edges, Effect, Element, EntityId, EventEmitter, Executor, FocusEvent, FontId, + GlobalElementId, GlyphId, Handle, Hsla, ImageData, InputEvent, IsZero, KeyListener, KeyMatch, + KeyMatcher, Keystroke, LayoutId, MainThread, MainThreadOnly, MonochromeSprite, MouseMoveEvent, + Path, Pixels, Platform, PlatformAtlas, PlatformWindow, Point, PolychromeSprite, Quad, + Reference, RenderGlyphParams, RenderImageParams, RenderSvgParams, ScaledPixels, SceneBuilder, + Shadow, SharedString, Size, Style, Subscription, TaffyLayoutEngine, Task, Underline, + UnderlineStyle, WeakHandle, WindowOptions, SUBPIXEL_VARIANTS, }; use anyhow::Result; use collections::HashMap; @@ -167,7 +167,6 @@ pub struct Window { focus_parents_by_child: HashMap, pub(crate) focus_listeners: Vec, pub(crate) focus_handles: Arc>>, - propagate: bool, default_prevented: bool, mouse_position: Point, scale_factor: f32, @@ -243,7 +242,6 @@ impl Window { focus_parents_by_child: HashMap::default(), focus_listeners: Vec::new(), focus_handles: Arc::new(RwLock::new(SlotMap::with_key())), - propagate: true, default_prevented: true, mouse_position, scale_factor, @@ -302,6 +300,10 @@ impl<'a, 'w> WindowContext<'a, 'w> { } } + pub fn window_handle(&self) -> AnyWindowHandle { + self.window.handle + } + pub fn notify(&mut self) { self.window.dirty = true; } @@ -477,10 +479,6 @@ impl<'a, 'w> WindowContext<'a, 'w> { .to_pixels(text_style.font_size.into(), rem_size) } - pub fn stop_propagation(&mut self) { - self.window.propagate = false; - } - pub fn prevent_default(&mut self) { self.window.default_prevented = true; } @@ -878,7 +876,7 @@ impl<'a, 'w> WindowContext<'a, 'w> { } // Handlers may set this to false by calling `stop_propagation` - self.window.propagate = true; + self.app.propagate_event = true; self.window.default_prevented = false; if let Some(mut handlers) = self @@ -893,16 +891,16 @@ impl<'a, 'w> WindowContext<'a, 'w> { // special purposes, such as detecting events outside of a given Bounds. for (_, handler) in &handlers { handler(any_mouse_event, DispatchPhase::Capture, self); - if !self.window.propagate { + if !self.app.propagate_event { break; } } // Bubble phase, where most normal handlers do their work. - if self.window.propagate { + if self.app.propagate_event { for (_, handler) in handlers.iter().rev() { handler(any_mouse_event, DispatchPhase::Bubble, self); - if !self.window.propagate { + if !self.app.propagate_event { break; } } @@ -940,7 +938,7 @@ impl<'a, 'w> WindowContext<'a, 'w> { ) { self.dispatch_action(action, &key_dispatch_stack[..ix]); } - if !self.window.propagate { + if !self.app.propagate_event { break; } } @@ -951,7 +949,7 @@ impl<'a, 'w> WindowContext<'a, 'w> { } } - if self.window.propagate { + if self.app.propagate_event { for (ix, frame) in key_dispatch_stack.iter().enumerate().rev() { match frame { KeyDispatchStackFrame::Listener { @@ -968,7 +966,7 @@ impl<'a, 'w> WindowContext<'a, 'w> { self.dispatch_action(action, &key_dispatch_stack[..ix]); } - if !self.window.propagate { + if !self.app.propagate_event { break; } } @@ -1015,22 +1013,41 @@ impl<'a, 'w> WindowContext<'a, 'w> { dispatch_stack: &[KeyDispatchStackFrame], ) { let action_type = action.as_any().type_id(); - for stack_frame in dispatch_stack { - if let KeyDispatchStackFrame::Listener { - event_type, - listener, - } = stack_frame - { - if action_type == *event_type { - listener(action.as_any(), &[], DispatchPhase::Capture, self); - if !self.window.propagate { - break; + + if let Some(mut global_listeners) = self.app.global_action_listeners.remove(&action_type) { + for listener in &global_listeners { + listener(action.as_ref(), DispatchPhase::Capture, self); + if !self.app.propagate_event { + break; + } + } + global_listeners.extend( + self.global_action_listeners + .remove(&action_type) + .unwrap_or_default(), + ); + self.global_action_listeners + .insert(action_type, global_listeners); + } + + if self.app.propagate_event { + for stack_frame in dispatch_stack { + if let KeyDispatchStackFrame::Listener { + event_type, + listener, + } = stack_frame + { + if action_type == *event_type { + listener(action.as_any(), &[], DispatchPhase::Capture, self); + if !self.app.propagate_event { + break; + } } } } } - if self.window.propagate { + if self.app.propagate_event { for stack_frame in dispatch_stack.iter().rev() { if let KeyDispatchStackFrame::Listener { event_type, @@ -1039,13 +1056,33 @@ impl<'a, 'w> WindowContext<'a, 'w> { { if action_type == *event_type { listener(action.as_any(), &[], DispatchPhase::Bubble, self); - if !self.window.propagate { + if !self.app.propagate_event { break; } } } } } + + if self.app.propagate_event { + if let Some(mut global_listeners) = + self.app.global_action_listeners.remove(&action_type) + { + for listener in global_listeners.iter().rev() { + listener(action.as_ref(), DispatchPhase::Bubble, self); + if !self.app.propagate_event { + break; + } + } + global_listeners.extend( + self.global_action_listeners + .remove(&action_type) + .unwrap_or_default(), + ); + self.global_action_listeners + .insert(action_type, global_listeners); + } + } } } @@ -1313,7 +1350,7 @@ impl<'a, 'w, V: Send + Sync + 'static> ViewContext<'a, 'w, V> { handle.id, Box::new(move |cx| { cx.update_window(window_handle.id, |cx| { - if let Some(handle) = handle.upgrade(cx) { + if let Some(handle) = handle.upgrade() { this.update(cx, |this, cx| on_notify(this, handle, cx)) .is_ok() } else { @@ -1340,7 +1377,7 @@ impl<'a, 'w, V: Send + Sync + 'static> ViewContext<'a, 'w, V> { handle.id, Box::new(move |event, cx| { cx.update_window(window_handle.id, |cx| { - if let Some(handle) = handle.upgrade(cx) { + if let Some(handle) = handle.upgrade() { let event = event.downcast_ref().expect("invalid event type"); this.update(cx, |this, cx| on_event(this, handle, event, cx)) .is_ok() @@ -1504,7 +1541,7 @@ impl<'a, 'w, V: Send + Sync + 'static> ViewContext<'a, 'w, V> { let cx = unsafe { mem::transmute::<&mut Self, &mut MainThread>(self) }; Task::ready(Ok(f(view, cx))) } else { - let handle = self.handle().upgrade(self).unwrap(); + let handle = self.handle().upgrade().unwrap(); self.window_cx.run_on_main(move |cx| handle.update(cx, f)) } } @@ -1528,7 +1565,7 @@ impl<'a, 'w, V: Send + Sync + 'static> ViewContext<'a, 'w, V> { &mut self, handler: impl Fn(&mut V, &Event, DispatchPhase, &mut ViewContext) + Send + Sync + 'static, ) { - let handle = self.handle().upgrade(self).unwrap(); + let handle = self.handle().upgrade().unwrap(); self.window_cx.on_mouse_event(move |event, phase, cx| { handle.update(cx, |view, cx| { handler(view, event, phase, cx);