diff --git a/crates/client/src/channel_store.rs b/crates/client/src/channel_store.rs index 8217e6cbc8..e2c18a63a9 100644 --- a/crates/client/src/channel_store.rs +++ b/crates/client/src/channel_store.rs @@ -4,11 +4,13 @@ use anyhow::anyhow; use anyhow::Result; use collections::HashMap; use collections::HashSet; +use futures::channel::mpsc; use futures::Future; use futures::StreamExt; use gpui::{AsyncAppContext, Entity, ModelContext, ModelHandle, Task}; use rpc::{proto, TypedEnvelope}; use std::sync::Arc; +use util::ResultExt; pub type ChannelId = u64; pub type UserId = u64; @@ -20,10 +22,12 @@ pub struct ChannelStore { channel_participants: HashMap>>, channels_with_admin_privileges: HashSet, outgoing_invites: HashSet<(ChannelId, UserId)>, + update_channels_tx: mpsc::UnboundedSender, client: Arc, user_store: ModelHandle, _rpc_subscription: Subscription, _watch_connection_status: Task<()>, + _update_channels: Task<()>, } #[derive(Clone, Debug, PartialEq)] @@ -62,6 +66,7 @@ impl ChannelStore { let rpc_subscription = client.add_message_handler(cx.handle(), Self::handle_update_channels); + let (update_channels_tx, mut update_channels_rx) = mpsc::unbounded(); let mut connection_status = client.status(); let watch_connection_status = cx.spawn_weak(|this, mut cx| async move { while let Some(status) = connection_status.next().await { @@ -89,10 +94,23 @@ impl ChannelStore { channel_participants: Default::default(), channels_with_admin_privileges: Default::default(), outgoing_invites: Default::default(), + update_channels_tx, client, user_store, _rpc_subscription: rpc_subscription, _watch_connection_status: watch_connection_status, + _update_channels: cx.spawn_weak(|this, mut cx| async move { + while let Some(update_channels) = update_channels_rx.next().await { + if let Some(this) = this.upgrade(&cx) { + let update_task = this.update(&mut cx, |this, cx| { + this.update_channels(update_channels, cx) + }); + if let Some(update_task) = update_task { + update_task.await.log_err(); + } + } + } + }), } } @@ -159,13 +177,14 @@ impl ChannelStore { let channel_id = channel.id; this.update(&mut cx, |this, cx| { - this.update_channels( + let task = this.update_channels( proto::UpdateChannels { channels: vec![channel], ..Default::default() }, cx, ); + assert!(task.is_none()); // This event is emitted because the collab panel wants to clear the pending edit state // before this frame is rendered. But we can't guarantee that the collab panel's future @@ -287,13 +306,14 @@ impl ChannelStore { .channel .ok_or_else(|| anyhow!("missing channel in response"))?; this.update(&mut cx, |this, cx| { - this.update_channels( + let task = this.update_channels( proto::UpdateChannels { channels: vec![channel], ..Default::default() }, cx, ); + assert!(task.is_none()); // This event is emitted because the collab panel wants to clear the pending edit state // before this frame is rendered. But we can't guarantee that the collab panel's future @@ -375,8 +395,10 @@ impl ChannelStore { _: Arc, mut cx: AsyncAppContext, ) -> Result<()> { - this.update(&mut cx, |this, cx| { - this.update_channels(message.payload, cx); + this.update(&mut cx, |this, _| { + this.update_channels_tx + .unbounded_send(message.payload) + .unwrap(); }); Ok(()) } @@ -385,7 +407,7 @@ impl ChannelStore { &mut self, payload: proto::UpdateChannels, cx: &mut ModelContext, - ) { + ) -> Option>> { if !payload.remove_channel_invitations.is_empty() { self.channel_invitations .retain(|channel| !payload.remove_channel_invitations.contains(&channel.id)); @@ -470,6 +492,11 @@ impl ChannelStore { } } + cx.notify(); + if payload.channel_participants.is_empty() { + return None; + } + let mut all_user_ids = Vec::new(); let channel_participants = payload.channel_participants; for entry in &channel_participants { @@ -480,11 +507,10 @@ impl ChannelStore { } } - // TODO: Race condition if an update channels messages comes in while resolving avatars let users = self .user_store .update(cx, |user_store, cx| user_store.get_users(all_user_ids, cx)); - cx.spawn(|this, mut cx| async move { + Some(cx.spawn(|this, mut cx| async move { let users = users.await?; this.update(&mut cx, |this, cx| { @@ -509,10 +535,7 @@ impl ChannelStore { cx.notify(); }); anyhow::Ok(()) - }) - .detach(); - - cx.notify(); + })) } fn channel_path_sorting_key<'a>( diff --git a/crates/client/src/channel_store_tests.rs b/crates/client/src/channel_store_tests.rs index 3a3f3842eb..51e819349e 100644 --- a/crates/client/src/channel_store_tests.rs +++ b/crates/client/src/channel_store_tests.rs @@ -139,7 +139,8 @@ fn update_channels( message: proto::UpdateChannels, cx: &mut AppContext, ) { - channel_store.update(cx, |store, cx| store.update_channels(message, cx)); + let task = channel_store.update(cx, |store, cx| store.update_channels(message, cx)); + assert!(task.is_none()); } #[track_caller] diff --git a/crates/collab/src/rpc.rs b/crates/collab/src/rpc.rs index f9f2d4a2e2..2396085a01 100644 --- a/crates/collab/src/rpc.rs +++ b/crates/collab/src/rpc.rs @@ -1156,7 +1156,6 @@ async fn rejoin_room( channel_members = mem::take(&mut rejoined_room.channel_members); } - //TODO: move this into the room guard if let Some(channel_id) = channel_id { channel_updated( channel_id, @@ -2453,9 +2452,6 @@ async fn join_channel( joined_room.clone() }; - // TODO - do this while still holding the room guard, - // currently there's a possible race condition if someone joins the channel - // after we've dropped the lock but before we finish sending these updates channel_updated( channel_id, &joined_room.room, @@ -2748,7 +2744,6 @@ async fn leave_room_for_session(session: &Session) -> Result<()> { return Ok(()); } - // TODO - do this while holding the room guard. if let Some(channel_id) = channel_id { channel_updated( channel_id, diff --git a/crates/collab/src/tests/channel_tests.rs b/crates/collab/src/tests/channel_tests.rs index f1157ce7ae..d4cf6423f0 100644 --- a/crates/collab/src/tests/channel_tests.rs +++ b/crates/collab/src/tests/channel_tests.rs @@ -290,6 +290,7 @@ async fn test_core_channels( ); } +#[track_caller] fn assert_participants_eq(participants: &[Arc], expected_partitipants: &[u64]) { assert_eq!( participants.iter().map(|p| p.id).collect::>(), @@ -297,6 +298,7 @@ fn assert_participants_eq(participants: &[Arc], expected_partitipants: &[u ); } +#[track_caller] fn assert_members_eq( members: &[ChannelMembership], expected_members: &[(u64, bool, proto::channel_member::Kind)], diff --git a/crates/collab_ui/src/collab_panel.rs b/crates/collab_ui/src/collab_panel.rs index e4838df939..eaa3560b9b 100644 --- a/crates/collab_ui/src/collab_panel.rs +++ b/crates/collab_ui/src/collab_panel.rs @@ -2037,8 +2037,6 @@ impl CollabPanel { self.show_channel_modal(action.channel_id, channel_modal::Mode::ManageMembers, cx); } - // TODO: Make join into a toggle - // TODO: Make enter work on channel editor fn remove(&mut self, _: &Remove, cx: &mut ViewContext) { if let Some(channel) = self.selected_channel() { self.remove_channel(channel.id, cx)