diff --git a/crates/channel/src/channel_store.rs b/crates/channel/src/channel_store.rs index 417f486b9e..f0f66f4839 100644 --- a/crates/channel/src/channel_store.rs +++ b/crates/channel/src/channel_store.rs @@ -44,6 +44,7 @@ pub struct Channel { pub id: ChannelId, pub name: String, pub has_note_changed: bool, + pub has_new_messages: bool, } #[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Clone, Serialize, Deserialize)] @@ -223,6 +224,13 @@ impl ChannelStore { .map(|channel| channel.has_note_changed) } + pub fn has_new_messages(&self, channel_id: ChannelId) -> Option { + self.channel_index + .by_id() + .get(&channel_id) + .map(|channel| channel.has_new_messages) + } + pub fn open_channel_chat( &mut self, channel_id: ChannelId, @@ -230,12 +238,20 @@ impl ChannelStore { ) -> Task>> { let client = self.client.clone(); let user_store = self.user_store.clone(); - self.open_channel_resource( + let open_channel_chat = self.open_channel_resource( channel_id, |this| &mut this.opened_chats, |channel, cx| ChannelChat::new(channel, user_store, client, cx), cx, - ) + ); + cx.spawn(|this, mut cx| async move { + let chat = open_channel_chat.await?; + this.update(&mut cx, |this, cx| { + this.channel_index.clear_message_changed(channel_id); + cx.notify(); + }); + Ok(chat) + }) } /// Asynchronously open a given resource associated with a channel. @@ -796,6 +812,7 @@ impl ChannelStore { id: channel.id, name: channel.name, has_note_changed: false, + has_new_messages: false, }), ), } @@ -805,7 +822,8 @@ impl ChannelStore { || !payload.delete_channels.is_empty() || !payload.insert_edge.is_empty() || !payload.delete_edge.is_empty() - || !payload.notes_changed.is_empty(); + || !payload.notes_changed.is_empty() + || !payload.new_messages.is_empty(); if channels_changed { if !payload.delete_channels.is_empty() { @@ -836,6 +854,10 @@ impl ChannelStore { index.note_changed(id_changed); } + for id_changed in payload.new_messages { + index.new_messages(id_changed); + } + for edge in payload.insert_edge { index.insert_edge(edge.channel_id, edge.parent_id); } diff --git a/crates/channel/src/channel_store/channel_index.rs b/crates/channel/src/channel_store/channel_index.rs index 42474f1d85..513e20e3a7 100644 --- a/crates/channel/src/channel_store/channel_index.rs +++ b/crates/channel/src/channel_store/channel_index.rs @@ -44,6 +44,12 @@ impl ChannelIndex { Arc::make_mut(channel).has_note_changed = false; } } + + pub fn clear_message_changed(&mut self, channel_id: ChannelId) { + if let Some(channel) = self.channels_by_id.get_mut(&channel_id) { + Arc::make_mut(channel).has_new_messages = false; + } + } } impl Deref for ChannelIndex { @@ -88,6 +94,12 @@ impl<'a> ChannelPathsInsertGuard<'a> { } } + pub fn new_messages(&mut self, channel_id: ChannelId) { + if let Some(channel) = self.channels_by_id.get_mut(&channel_id) { + Arc::make_mut(channel).has_new_messages = true; + } + } + pub fn insert(&mut self, channel_proto: proto::Channel) { if let Some(existing_channel) = self.channels_by_id.get_mut(&channel_proto.id) { Arc::make_mut(existing_channel).name = channel_proto.name; @@ -98,6 +110,7 @@ impl<'a> ChannelPathsInsertGuard<'a> { id: channel_proto.id, name: channel_proto.name, has_note_changed: false, + has_new_messages: false, }), ); self.insert_root(channel_proto.id); diff --git a/crates/collab/src/db.rs b/crates/collab/src/db.rs index f0896f8732..8f7f9cc975 100644 --- a/crates/collab/src/db.rs +++ b/crates/collab/src/db.rs @@ -436,8 +436,9 @@ pub struct Channel { pub struct ChannelsForUser { pub channels: ChannelGraph, pub channel_participants: HashMap>, - pub channels_with_changed_notes: HashSet, pub channels_with_admin_privileges: HashSet, + pub channels_with_changed_notes: HashSet, + pub channels_with_new_messages: HashSet, } #[derive(Debug)] diff --git a/crates/collab/src/db/queries/channels.rs b/crates/collab/src/db/queries/channels.rs index 6274550c25..ea9f64fe5e 100644 --- a/crates/collab/src/db/queries/channels.rs +++ b/crates/collab/src/db/queries/channels.rs @@ -464,10 +464,14 @@ impl Database { } let mut channels_with_changed_notes = HashSet::default(); + let mut channels_with_new_messages = HashSet::default(); for channel in graph.channels.iter() { if self.has_note_changed(user_id, channel.id, tx).await? { channels_with_changed_notes.insert(channel.id); } + if self.has_new_message(channel.id, user_id, tx).await? { + channels_with_new_messages.insert(channel.id); + } } Ok(ChannelsForUser { @@ -475,6 +479,7 @@ impl Database { channel_participants, channels_with_admin_privileges, channels_with_changed_notes, + channels_with_new_messages, }) } diff --git a/crates/collab/src/db/queries/messages.rs b/crates/collab/src/db/queries/messages.rs index 328737dd0a..8e3c92d916 100644 --- a/crates/collab/src/db/queries/messages.rs +++ b/crates/collab/src/db/queries/messages.rs @@ -97,7 +97,7 @@ impl Database { let mut messages = Vec::new(); while let Some(row) = rows.next().await { let row = row?; - dbg!(&max_id); + max_assign(&mut max_id, row.id); let nonce = row.nonce.as_u64_pair(); @@ -113,23 +113,18 @@ impl Database { }); } drop(rows); - dbg!(&max_id); if let Some(max_id) = max_id { - let has_older_message = dbg!( - observed_channel_messages::Entity::find() - .filter( - observed_channel_messages::Column::UserId - .eq(user_id) - .and(observed_channel_messages::Column::ChannelId.eq(channel_id)) - .and( - observed_channel_messages::Column::ChannelMessageId.lt(max_id) - ), - ) - .one(&*tx) - .await - )? - .is_some(); + let has_older_message = observed_channel_messages::Entity::find() + .filter( + observed_channel_messages::Column::UserId + .eq(user_id) + .and(observed_channel_messages::Column::ChannelId.eq(channel_id)) + .and(observed_channel_messages::Column::ChannelMessageId.lt(max_id)), + ) + .one(&*tx) + .await? + .is_some(); if has_older_message { observed_channel_messages::Entity::update( @@ -174,7 +169,7 @@ impl Database { body: &str, timestamp: OffsetDateTime, nonce: u128, - ) -> Result<(MessageId, Vec)> { + ) -> Result<(MessageId, Vec, Vec)> { self.transaction(|tx| async move { let mut rows = channel_chat_participant::Entity::find() .filter(channel_chat_participant::Column::ChannelId.eq(channel_id)) @@ -241,7 +236,14 @@ impl Database { .exec(&*tx) .await?; - Ok((message.last_insert_id, participant_connection_ids)) + let mut channel_members = self.get_channel_members_internal(channel_id, &*tx).await?; + channel_members.retain(|member| !participant_user_ids.contains(member)); + + Ok(( + message.last_insert_id, + participant_connection_ids, + channel_members, + )) }) .await } @@ -290,7 +292,7 @@ impl Database { .await? .map(|model| model.channel_message_id); - Ok(dbg!(last_message_read) != dbg!(latest_message_id)) + Ok(last_message_read != latest_message_id) } pub async fn remove_channel_message( diff --git a/crates/collab/src/db/tests/message_tests.rs b/crates/collab/src/db/tests/message_tests.rs index 2c745bb8ae..98b8cc6037 100644 --- a/crates/collab/src/db/tests/message_tests.rs +++ b/crates/collab/src/db/tests/message_tests.rs @@ -120,7 +120,7 @@ async fn test_channel_message_new_notification(db: &Arc) { .await .unwrap(); - let (second_message, _) = db + let (second_message, _, _) = db .create_channel_message(channel, user_a, "2", OffsetDateTime::now_utc(), 2) .await .unwrap(); diff --git a/crates/collab/src/rpc.rs b/crates/collab/src/rpc.rs index b9dae999cd..371d0466c1 100644 --- a/crates/collab/src/rpc.rs +++ b/crates/collab/src/rpc.rs @@ -2568,6 +2568,16 @@ async fn respond_to_channel_invite( name: channel.name, }), ); + update.notes_changed = result + .channels_with_changed_notes + .iter() + .map(|id| id.to_proto()) + .collect(); + update.new_messages = result + .channels_with_new_messages + .iter() + .map(|id| id.to_proto()) + .collect(); update.insert_edge = result.channels.edges; update .channel_participants @@ -2818,7 +2828,7 @@ async fn send_channel_message( .ok_or_else(|| anyhow!("nonce can't be blank"))?; let channel_id = ChannelId::from_proto(request.channel_id); - let (message_id, connection_ids) = session + let (message_id, connection_ids, non_participants) = session .db() .await .create_channel_message( @@ -2848,6 +2858,26 @@ async fn send_channel_message( response.send(proto::SendChannelMessageResponse { message: Some(message), })?; + + dbg!(&non_participants); + let pool = &*session.connection_pool().await; + + broadcast( + None, + non_participants + .iter() + .flat_map(|user_id| pool.user_connection_ids(*user_id)), + |peer_id| { + session.peer.send( + peer_id.into(), + proto::UpdateChannels { + new_messages: vec![channel_id.to_proto()], + ..Default::default() + }, + ) + }, + ); + Ok(()) } @@ -3011,6 +3041,12 @@ fn build_initial_channels_update( .map(|channel_id| channel_id.to_proto()) .collect(); + update.new_messages = channels + .channels_with_new_messages + .iter() + .map(|channel_id| channel_id.to_proto()) + .collect(); + update.insert_edge = channels.channels.edges; for (channel_id, participants) in channels.channel_participants { diff --git a/crates/collab/src/tests/channel_buffer_tests.rs b/crates/collab/src/tests/channel_buffer_tests.rs index 7ca6f0db3f..68acffacf8 100644 --- a/crates/collab/src/tests/channel_buffer_tests.rs +++ b/crates/collab/src/tests/channel_buffer_tests.rs @@ -446,6 +446,7 @@ fn channel(id: u64, name: &'static str) -> Channel { id, name: name.to_string(), has_note_changed: false, + has_new_messages: false, } } diff --git a/crates/collab/src/tests/channel_message_tests.rs b/crates/collab/src/tests/channel_message_tests.rs index 58494c538b..1b3a54dc42 100644 --- a/crates/collab/src/tests/channel_message_tests.rs +++ b/crates/collab/src/tests/channel_message_tests.rs @@ -1,6 +1,6 @@ use crate::{rpc::RECONNECT_TIMEOUT, tests::TestServer}; use channel::{ChannelChat, ChannelMessageId}; -use gpui::{executor::Deterministic, ModelHandle, TestAppContext}; +use gpui::{executor::Deterministic, BorrowAppContext, ModelHandle, TestAppContext}; use std::sync::Arc; #[gpui::test] @@ -223,3 +223,106 @@ fn assert_messages(chat: &ModelHandle, messages: &[&str], cx: &mut messages ); } + +#[gpui::test] +async fn test_channel_message_changes( + deterministic: Arc, + cx_a: &mut TestAppContext, + cx_b: &mut TestAppContext, +) { + deterministic.forbid_parking(); + let mut server = TestServer::start(&deterministic).await; + let client_a = server.create_client(cx_a, "user_a").await; + let client_b = server.create_client(cx_b, "user_b").await; + + let channel_id = server + .make_channel( + "the-channel", + None, + (&client_a, cx_a), + &mut [(&client_b, cx_b)], + ) + .await; + + // Client A sends a message, client B should see that there is a new message. + let channel_chat_a = client_a + .channel_store() + .update(cx_a, |store, cx| store.open_channel_chat(channel_id, cx)) + .await + .unwrap(); + + channel_chat_a + .update(cx_a, |c, cx| c.send_message("one".into(), cx).unwrap()) + .await + .unwrap(); + + deterministic.run_until_parked(); + + let b_has_messages = cx_b.read_with(|cx| { + client_b + .channel_store() + .read(cx) + .has_new_messages(channel_id) + .unwrap() + }); + + assert!(b_has_messages); + + // Opening the chat should clear the changed flag. + let channel_chat_b = client_b + .channel_store() + .update(cx_b, |store, cx| store.open_channel_chat(channel_id, cx)) + .await + .unwrap(); + + let b_has_messages = cx_b.read_with(|cx| { + client_b + .channel_store() + .read(cx) + .has_new_messages(channel_id) + .unwrap() + }); + + assert!(!b_has_messages); + + // Sending a message while the chat is open should not change the flag. + channel_chat_a + .update(cx_a, |c, cx| c.send_message("two".into(), cx).unwrap()) + .await + .unwrap(); + + deterministic.run_until_parked(); + + let b_has_messages = cx_b.read_with(|cx| { + client_b + .channel_store() + .read(cx) + .has_new_messages(channel_id) + .unwrap() + }); + + assert!(!b_has_messages); + + // Closing the chat should re-enable change tracking + + cx_b.update(|_| { + drop(channel_chat_b); + }); + + deterministic.run_until_parked(); + + channel_chat_a + .update(cx_a, |c, cx| c.send_message("three".into(), cx).unwrap()) + .await + .unwrap(); + + let b_has_messages = cx_b.read_with(|cx| { + client_b + .channel_store() + .read(cx) + .has_new_messages(channel_id) + .unwrap() + }); + + assert!(b_has_messages); +} diff --git a/crates/collab_ui/src/collab_panel.rs b/crates/collab_ui/src/collab_panel.rs index 0fd265d0aa..53d5140a12 100644 --- a/crates/collab_ui/src/collab_panel.rs +++ b/crates/collab_ui/src/collab_panel.rs @@ -1821,7 +1821,7 @@ impl CollabPanel { channel.name.clone(), theme .channel_name - .in_state(channel.has_note_changed) + .in_state(channel.has_new_messages) .text .clone(), ) diff --git a/crates/gpui/src/app.rs b/crates/gpui/src/app.rs index 6b3cd03e8f..edcc7ad6f6 100644 --- a/crates/gpui/src/app.rs +++ b/crates/gpui/src/app.rs @@ -1252,7 +1252,7 @@ impl AppContext { result }) } else { - panic!("circular model update"); + panic!("circular model update for {}", std::any::type_name::()); } } diff --git a/crates/rpc/proto/zed.proto b/crates/rpc/proto/zed.proto index c53db447d3..d0c9d73fc4 100644 --- a/crates/rpc/proto/zed.proto +++ b/crates/rpc/proto/zed.proto @@ -956,6 +956,7 @@ message UpdateChannels { repeated ChannelParticipants channel_participants = 7; repeated ChannelPermission channel_permissions = 8; repeated uint64 notes_changed = 9; + repeated uint64 new_messages = 10; } message ChannelEdge {