diff --git a/crates/call/src/room.rs b/crates/call/src/room.rs index 2543697bc0..5419e00f02 100644 --- a/crates/call/src/room.rs +++ b/crates/call/src/room.rs @@ -18,7 +18,7 @@ use live_kit_client::{ LocalAudioTrack, LocalTrackPublication, LocalVideoTrack, RemoteAudioTrackUpdate, RemoteVideoTrackUpdate, }; -use postage::stream::Stream; +use postage::{sink::Sink, stream::Stream, watch}; use project::Project; use std::{future::Future, mem, pin::Pin, sync::Arc, time::Duration}; use util::{post_inc, ResultExt, TryFutureExt}; @@ -70,6 +70,8 @@ pub struct Room { user_store: ModelHandle, follows_by_leader_id_project_id: HashMap<(PeerId, u64), Vec>, subscriptions: Vec, + room_update_completed_tx: watch::Sender>, + room_update_completed_rx: watch::Receiver>, pending_room_update: Option>, maintain_connection: Option>>, } @@ -211,6 +213,8 @@ impl Room { Audio::play_sound(Sound::Joined, cx); + let (room_update_completed_tx, room_update_completed_rx) = watch::channel(); + Self { id, channel_id, @@ -230,6 +234,8 @@ impl Room { user_store, follows_by_leader_id_project_id: Default::default(), maintain_connection: Some(maintain_connection), + room_update_completed_tx, + room_update_completed_rx, } } @@ -856,6 +862,7 @@ impl Room { }); this.check_invariants(); + this.room_update_completed_tx.try_send(Some(())).ok(); cx.notify(); }); })); @@ -864,6 +871,17 @@ impl Room { Ok(()) } + pub fn next_room_update(&mut self) -> impl Future { + let mut done_rx = self.room_update_completed_rx.clone(); + async move { + while let Some(result) = done_rx.next().await { + if result.is_some() { + break; + } + } + } + } + fn remote_video_track_updated( &mut self, change: RemoteVideoTrackUpdate, diff --git a/crates/client/src/client.rs b/crates/client/src/client.rs index 5767ac54b7..62cd60c55f 100644 --- a/crates/client/src/client.rs +++ b/crates/client/src/client.rs @@ -798,7 +798,8 @@ impl Client { } } } - _ = status_rx.next().fuse() => { + status = status_rx.next().fuse() => { + dbg!(status); return Err(anyhow!("authentication canceled")); } } diff --git a/crates/workspace/src/workspace.rs b/crates/workspace/src/workspace.rs index 1002ae29dc..a124917309 100644 --- a/crates/workspace/src/workspace.rs +++ b/crates/workspace/src/workspace.rs @@ -22,7 +22,7 @@ use drag_and_drop::DragAndDrop; use futures::{ channel::{mpsc, oneshot}, future::try_join_all, - select_biased, FutureExt, StreamExt, + FutureExt, StreamExt, }; use gpui::{ actions, @@ -36,9 +36,9 @@ use gpui::{ CursorStyle, ModifiersChangedEvent, MouseButton, PathPromptOptions, Platform, PromptLevel, WindowBounds, WindowOptions, }, - AnyModelHandle, AnyViewHandle, AnyWeakViewHandle, AppContext, AsyncAppContext, Entity, - ModelContext, ModelHandle, SizeConstraint, Subscription, Task, View, ViewContext, ViewHandle, - WeakViewHandle, WindowContext, WindowHandle, + AnyModelHandle, AnyViewHandle, AnyWeakViewHandle, AnyWindowHandle, AppContext, AsyncAppContext, + Entity, ModelContext, ModelHandle, SizeConstraint, Subscription, Task, View, ViewContext, + ViewHandle, WeakViewHandle, WindowContext, WindowHandle, }; use item::{FollowableItem, FollowableItemHandle, Item, ItemHandle, ProjectItem}; use itertools::Itertools; @@ -4156,7 +4156,7 @@ pub async fn last_opened_workspace_paths() -> Option { async fn join_channel_internal( channel_id: u64, - app_state: Arc, + app_state: &Arc, requesting_window: Option>, active_call: &ModelHandle, cx: &mut AsyncAppContext, @@ -4196,33 +4196,24 @@ async fn join_channel_internal( let client = cx.read(|cx| active_call.read(cx).client()); - let mut timer = cx.background().timer(Duration::from_secs(5)).fuse(); let mut client_status = client.status(); + // this loop will terminate within client::CONNECTION_TIMEOUT seconds. 'outer: loop { - select_biased! { - _ = timer => { - return Err(anyhow!("connecting timed out")) - }, - status = client_status.recv().fuse() => { - let Some(status) = status else { - return Err(anyhow!("unexpected error reading connection status")) - }; + let Some(status) = client_status.recv().await else { + return Err(anyhow!("error connecting")); + }; - match status { - Status::Connecting | Status::Authenticating | Status::Reconnecting | Status::Reauthenticating => continue, - Status::Connected { .. } => break 'outer, - Status::SignedOut => { - if client.has_keychain_credentials(&cx) { - client.authenticate_and_connect(true, &cx).await?; - timer = cx.background().timer(Duration::from_secs(5)).fuse(); - } else { - return Err(anyhow!("not signed in")) - } - }, - Status::UpgradeRequired => return Err(anyhow!("zed is out of date")), - | Status::ConnectionError | Status::ConnectionLost | Status::ReconnectionError { .. } => return Err(anyhow!("zed is offline")) - } + match status { + Status::Connecting + | Status::Authenticating + | Status::Reconnecting + | Status::Reauthenticating => continue, + Status::Connected { .. } => break 'outer, + Status::SignedOut => return Err(anyhow!("not signed in")), + Status::UpgradeRequired => return Err(anyhow!("zed is out of date")), + Status::ConnectionError | Status::ConnectionLost | Status::ReconnectionError { .. } => { + return Err(anyhow!("zed is offline")) } } } @@ -4233,6 +4224,8 @@ async fn join_channel_internal( }) .await?; + room.update(cx, |room, cx| room.next_room_update()).await; + let task = room.update(cx, |room, cx| { if let Some((project, host)) = room.most_active_project() { return Some(join_remote_project(project, host, app_state.clone(), cx)); @@ -4255,10 +4248,10 @@ pub fn join_channel( cx: &mut AppContext, ) -> Task> { let active_call = ActiveCall::global(cx); - cx.spawn(|mut cx| { + cx.spawn(|mut cx| async move { let result = join_channel_internal( channel_id, - app_state, + &app_state, requesting_window, &active_call, &mut cx, @@ -4266,7 +4259,7 @@ pub fn join_channel( .await; // join channel succeeded, and opened a window - if Some(true) = result { + if matches!(result, Ok(true)) { return anyhow::Ok(()); } @@ -4275,28 +4268,53 @@ pub fn join_channel( } // find an existing workspace to focus and show call controls - for window in cx.windows() { - let found = window.update(&mut cx, |cx| { - let is_workspace = cx.root_view().clone().downcast::().is_some(); - if is_workspace { - cx.activate_window(); - } - is_workspace - }); + let mut active_window = activate_any_workspace_window(&mut cx); + if active_window.is_none() { + // no open workspaces, make one to show the error in (blergh) + cx.update(|cx| Workspace::new_local(vec![], app_state.clone(), requesting_window, cx)) + .await; + } - if found.unwrap_or(false) { - return anyhow::Ok(()); + active_window = activate_any_workspace_window(&mut cx); + if active_window.is_none() { + return result.map(|_| ()); // unreachable!() assuming new_local always opens a window + } + + if let Err(err) = result { + let prompt = active_window.unwrap().prompt( + PromptLevel::Critical, + &format!("Failed to join channel: {}", err), + &["Ok"], + &mut cx, + ); + if let Some(mut prompt) = prompt { + prompt.next().await; + } else { + return Err(err); } } - // no open workspaces - cx.update(|cx| Workspace::new_local(vec![], app_state.clone(), requesting_window, cx)) - .await; - - return connected.map(|_| ()); + // return ok, we showed the error to the user. + return anyhow::Ok(()); }) } +pub fn activate_any_workspace_window(cx: &mut AsyncAppContext) -> Option { + for window in cx.windows() { + let found = window.update(cx, |cx| { + let is_workspace = cx.root_view().clone().downcast::().is_some(); + if is_workspace { + cx.activate_window(); + } + is_workspace + }); + if found == Some(true) { + return Some(window); + } + } + None +} + #[allow(clippy::type_complexity)] pub fn open_paths( abs_paths: &[PathBuf], diff --git a/crates/zed/src/main.rs b/crates/zed/src/main.rs index 861121a1cf..52aaf639ea 100644 --- a/crates/zed/src/main.rs +++ b/crates/zed/src/main.rs @@ -8,7 +8,9 @@ use cli::{ ipc::{self, IpcSender}, CliRequest, CliResponse, IpcHandshake, FORCE_CLI_MODE_ENV_VAR_NAME, }; -use client::{self, TelemetrySettings, UserStore, ZED_APP_VERSION, ZED_SECRET_CLIENT_TOKEN}; +use client::{ + self, Client, TelemetrySettings, UserStore, ZED_APP_VERSION, ZED_SECRET_CLIENT_TOKEN, +}; use db::kvp::KEY_VALUE_STORE; use editor::{scroll::autoscroll::Autoscroll, Editor}; use futures::{ @@ -33,7 +35,7 @@ use std::{ fs::OpenOptions, io::{IsTerminal, Write as _}, panic, - path::{Path, PathBuf}, + path::Path, sync::{ atomic::{AtomicU32, Ordering}, Arc, Weak, @@ -222,6 +224,8 @@ fn main() { } } + let mut triggered_authentication = false; + match open_rx.try_next() { Ok(Some(OpenRequest::Paths { paths })) => { cx.update(|cx| workspace::open_paths(&paths, &app_state, None, cx)) @@ -231,9 +235,18 @@ fn main() { cx.spawn(|cx| handle_cli_connection(connection, app_state.clone(), cx)) .detach(); } - Ok(Some(OpenRequest::JoinChannel { channel_id })) => cx - .update(|cx| workspace::join_channel(channel_id, app_state.clone(), None, cx)) - .detach_and_log_err(cx), + Ok(Some(OpenRequest::JoinChannel { channel_id })) => { + triggered_authentication = true; + let app_state = app_state.clone(); + let client = client.clone(); + cx.spawn(|mut cx| async move { + // ignore errors here, we'll show a generic "not signed in" + let _ = authenticate(client, &cx).await; + cx.update(|cx| workspace::join_channel(channel_id, app_state, None, cx)) + .await + }) + .detach_and_log_err(cx) + } Ok(None) | Err(_) => cx .spawn({ let app_state = app_state.clone(); @@ -266,20 +279,24 @@ fn main() { }) .detach(); - cx.spawn(|cx| async move { - if stdout_is_a_pty() { - if client::IMPERSONATE_LOGIN.is_some() { - client.authenticate_and_connect(false, &cx).await?; - } - } else if client.has_keychain_credentials(&cx) { - client.authenticate_and_connect(true, &cx).await?; - } - Ok::<_, anyhow::Error>(()) - }) - .detach_and_log_err(cx); + if !triggered_authentication { + cx.spawn(|cx| async move { authenticate(client, &cx).await }) + .detach_and_log_err(cx); + } }); } +async fn authenticate(client: Arc, cx: &AsyncAppContext) -> Result<()> { + if stdout_is_a_pty() { + if client::IMPERSONATE_LOGIN.is_some() { + client.authenticate_and_connect(false, &cx).await?; + } + } else if client.has_keychain_credentials(&cx) { + client.authenticate_and_connect(true, &cx).await?; + } + Ok::<_, anyhow::Error>(()) +} + async fn installation_id() -> Result { let legacy_key_name = "device_id"; diff --git a/crates/zed/src/open_url.rs b/crates/zed/src/open_url.rs index 6f90953de2..3e4902b978 100644 --- a/crates/zed/src/open_url.rs +++ b/crates/zed/src/open_url.rs @@ -42,7 +42,6 @@ impl OpenListener { pub fn open_urls(&self, urls: Vec) { self.triggered.store(true, Ordering::Release); - dbg!(&urls); let request = if let Some(server_name) = urls.first().and_then(|url| url.strip_prefix("zed-cli://")) {