From 716221cd387c9da50ac3caa957ffd37db7dbff0d Mon Sep 17 00:00:00 2001 From: Conrad Irwin Date: Thu, 25 Jan 2024 15:39:55 -0700 Subject: [PATCH] Simplify handling of syncing versions Currently whenever a channel changes we send a huge amount of data to each member. This is the first step in reducing that Co-Authored-By: Max Co-Authored-By: bennetbo --- crates/channel/src/channel_store.rs | 215 +++++++++++++----- .../src/channel_store/channel_index.rs | 76 ------- crates/collab/src/db/queries/buffers.rs | 61 ++--- crates/collab/src/db/queries/channels.rs | 6 +- crates/collab/src/db/queries/messages.rs | 4 + crates/collab/src/db/tests/buffer_tests.rs | 101 +------- crates/collab/src/db/tests/message_tests.rs | 85 +------ crates/collab/src/rpc.rs | 42 ++-- .../collab/src/tests/channel_buffer_tests.rs | 5 - .../collab/src/tests/channel_message_tests.rs | 5 - crates/collab_ui/src/channel_view.rs | 2 +- crates/collab_ui/src/chat_panel.rs | 2 +- crates/collab_ui/src/collab_panel.rs | 9 +- 13 files changed, 209 insertions(+), 404 deletions(-) diff --git a/crates/channel/src/channel_store.rs b/crates/channel/src/channel_store.rs index eba9b8a517..831e7de431 100644 --- a/crates/channel/src/channel_store.rs +++ b/crates/channel/src/channel_store.rs @@ -29,35 +29,47 @@ pub const RECONNECT_TIMEOUT: Duration = Duration::from_secs(30); pub type ChannelId = u64; +#[derive(Debug, Clone, Default)] +struct NotesVersion { + epoch: u64, + version: clock::Global, +} + pub struct ChannelStore { pub channel_index: ChannelIndex, channel_invitations: Vec>, channel_participants: HashMap>>, - observed_chat_messages: HashMap, - observed_notes_versions: HashMap, + channel_states: HashMap, + outgoing_invites: HashSet<(ChannelId, UserId)>, update_channels_tx: mpsc::UnboundedSender, opened_buffers: HashMap>, opened_chats: HashMap>, client: Arc, user_store: Model, - _rpc_subscription: Subscription, + _rpc_subscriptions: [Subscription; 2], _watch_connection_status: Task>, disconnect_channel_buffers_task: Option>, _update_channels: Task<()>, } -#[derive(Clone, Debug, PartialEq)] +#[derive(Clone, Debug)] pub struct Channel { pub id: ChannelId, pub name: SharedString, pub visibility: proto::ChannelVisibility, pub role: proto::ChannelRole, - pub latest_message_id: Option, - pub latest_note_version: Option, pub parent_path: Vec, } +#[derive(Default)] +pub struct ChannelState { + latest_chat_message: Option, + latest_notes_versions: Option, + observed_chat_message: Option, + observed_notes_versions: Option, +} + impl Channel { pub fn link(&self) -> String { RELEASE_CHANNEL.link_prefix().to_owned() @@ -179,7 +191,7 @@ impl ChannelStore { update_channels_tx, client, user_store, - _rpc_subscription: rpc_subscription, + _rpc_subscriptions: rpc_subscriptions, _watch_connection_status: watch_connection_status, disconnect_channel_buffers_task: None, _update_channels: cx.spawn(|this, mut cx| async move { @@ -199,8 +211,7 @@ impl ChannelStore { .await .log_err(); }), - observed_chat_messages: Default::default(), - observed_notes_versions: Default::default(), + channel_states: Default::default(), } } @@ -312,39 +323,16 @@ impl ChannelStore { }) } - pub fn has_channel_buffer_changed(&self, channel_id: ChannelId) -> Option { - self.channel_index - .by_id() + pub fn has_channel_buffer_changed(&self, channel_id: ChannelId) -> bool { + self.channel_states .get(&channel_id) - .map(|channel| channel.unseen_note_version.is_some()) + .is_some_and(|state| state.has_channel_buffer_changed()) } - pub fn has_new_messages(&self, channel_id: ChannelId) -> Option { - self.channel_index - .by_id() + pub fn has_new_messages(&self, channel_id: ChannelId) -> bool { + self.channel_states .get(&channel_id) - .map(|channel| channel.unseen_message_id.is_some()) - } - - pub fn notes_changed( - &mut self, - channel_id: ChannelId, - epoch: u64, - version: &clock::Global, - cx: &mut ModelContext, - ) { - self.channel_index.note_changed(channel_id, epoch, version); - cx.notify(); - } - - pub fn new_message( - &mut self, - channel_id: ChannelId, - message_id: u64, - cx: &mut ModelContext, - ) { - self.channel_index.new_message(channel_id, message_id); - cx.notify(); + .is_some_and(|state| state.has_new_messages()) } pub fn acknowledge_message_id( @@ -353,8 +341,23 @@ impl ChannelStore { message_id: u64, cx: &mut ModelContext, ) { - self.channel_index - .acknowledge_message_id(channel_id, message_id); + self.channel_states + .entry(channel_id) + .or_insert_with(|| Default::default()) + .acknowledge_message_id(message_id); + cx.notify(); + } + + pub fn update_latest_message_id( + &mut self, + channel_id: ChannelId, + message_id: u64, + cx: &mut ModelContext, + ) { + self.channel_states + .entry(channel_id) + .or_insert_with(|| Default::default()) + .update_latest_message_id(message_id); cx.notify(); } @@ -365,9 +368,25 @@ impl ChannelStore { version: &clock::Global, cx: &mut ModelContext, ) { - self.channel_index - .acknowledge_note_version(channel_id, epoch, version); - cx.notify(); + self.channel_states + .entry(channel_id) + .or_insert_with(|| Default::default()) + .acknowledge_notes_version(epoch, version); + cx.notify() + } + + pub fn update_latest_notes_version( + &mut self, + channel_id: ChannelId, + epoch: u64, + version: &clock::Global, + cx: &mut ModelContext, + ) { + self.channel_states + .entry(channel_id) + .or_insert_with(|| Default::default()) + .update_latest_notes_version(epoch, version); + cx.notify() } pub fn open_channel_chat( @@ -755,15 +774,24 @@ impl ChannelStore { async fn handle_update_user_channels( this: Model, - message: TypedEnvelope, + message: TypedEnvelope, _: Arc, mut cx: AsyncAppContext, ) -> Result<()> { - this.update(&mut cx, |this, _| { - // this.seen_channel_message_ids - // .insert(message.channel_id, message.message_id); - cx.notify(); - })?; + this.update(&mut cx, |this, cx| { + for buffer_version in message.payload.observed_channel_buffer_version { + let version = language::proto::deserialize_version(&buffer_version.version); + this.acknowledge_notes_version( + buffer_version.channel_id, + buffer_version.epoch, + &version, + cx, + ); + } + for message_id in message.payload.observed_channel_message_id { + this.acknowledge_message_id(message_id.channel_id, message_id.message_id, cx); + } + }) } fn handle_connect(&mut self, cx: &mut ModelContext) -> Task> { @@ -938,8 +966,8 @@ impl ChannelStore { let channels_changed = !payload.channels.is_empty() || !payload.delete_channels.is_empty() - || !payload.unseen_channel_messages.is_empty() - || !payload.unseen_channel_buffer_changes.is_empty(); + || !payload.latest_channel_message_ids.is_empty() + || !payload.latest_channel_buffer_versions.is_empty(); if channels_changed { if !payload.delete_channels.is_empty() { @@ -980,20 +1008,19 @@ impl ChannelStore { } } - for unseen_buffer_change in payload.unseen_channel_buffer_changes { - let version = language::proto::deserialize_version(&unseen_buffer_change.version); - index.note_changed( - unseen_buffer_change.channel_id, - unseen_buffer_change.epoch, - &version, - ); + for latest_buffer_version in payload.latest_channel_buffer_versions { + let version = language::proto::deserialize_version(&latest_buffer_version.version); + self.channel_states + .entry(latest_buffer_version.channel_id) + .or_default() + .update_latest_notes_version(latest_buffer_version.epoch, &version) } - for unseen_channel_message in payload.unseen_channel_messages { - index.new_messages( - unseen_channel_message.channel_id, - unseen_channel_message.message_id, - ); + for latest_channel_message in payload.latest_channel_message_ids { + self.channel_states + .entry(latest_channel_message.channel_id) + .or_default() + .update_latest_message_id(latest_channel_message.message_id); } } @@ -1042,3 +1069,65 @@ impl ChannelStore { })) } } + +impl ChannelState { + fn has_channel_buffer_changed(&self) -> bool { + if let Some(latest_version) = &self.latest_notes_versions { + if let Some(observed_version) = &self.observed_notes_versions { + latest_version.epoch > observed_version.epoch + || latest_version + .version + .changed_since(&observed_version.version) + } else { + true + } + } else { + false + } + } + + fn has_new_messages(&self) -> bool { + let latest_message_id = self.latest_chat_message; + let observed_message_id = self.observed_chat_message; + + latest_message_id.is_some_and(|latest_message_id| { + latest_message_id > observed_message_id.unwrap_or_default() + }) + } + + fn acknowledge_message_id(&mut self, message_id: u64) { + let observed = self.observed_chat_message.get_or_insert(message_id); + *observed = (*observed).max(message_id); + } + + fn update_latest_message_id(&mut self, message_id: u64) { + self.latest_chat_message = + Some(message_id.max(self.latest_chat_message.unwrap_or_default())); + } + + fn acknowledge_notes_version(&mut self, epoch: u64, version: &clock::Global) { + if let Some(existing) = &mut self.observed_notes_versions { + if existing.epoch == epoch { + existing.version.join(version); + return; + } + } + self.observed_notes_versions = Some(NotesVersion { + epoch, + version: version.clone(), + }); + } + + fn update_latest_notes_version(&mut self, epoch: u64, version: &clock::Global) { + if let Some(existing) = &mut self.latest_notes_versions { + if existing.epoch == epoch { + existing.version.join(version); + return; + } + } + self.latest_notes_versions = Some(NotesVersion { + epoch, + version: version.clone(), + }); + } +} diff --git a/crates/channel/src/channel_store/channel_index.rs b/crates/channel/src/channel_store/channel_index.rs index 77903b4186..ca2eb3345e 100644 --- a/crates/channel/src/channel_store/channel_index.rs +++ b/crates/channel/src/channel_store/channel_index.rs @@ -37,43 +37,6 @@ impl ChannelIndex { channels_by_id: &mut self.channels_by_id, } } - - pub fn acknowledge_note_version( - &mut self, - channel_id: ChannelId, - epoch: u64, - version: &clock::Global, - ) { - if let Some(channel) = self.channels_by_id.get_mut(&channel_id) { - let channel = Arc::make_mut(channel); - if let Some((unseen_epoch, unseen_version)) = &channel.unseen_note_version { - if epoch > *unseen_epoch - || epoch == *unseen_epoch && version.observed_all(unseen_version) - { - channel.unseen_note_version = None; - } - } - } - } - - pub fn acknowledge_message_id(&mut self, channel_id: ChannelId, message_id: u64) { - if let Some(channel) = self.channels_by_id.get_mut(&channel_id) { - let channel = Arc::make_mut(channel); - if let Some(unseen_message_id) = channel.unseen_message_id { - if message_id >= unseen_message_id { - channel.unseen_message_id = None; - } - } - } - } - - pub fn note_changed(&mut self, channel_id: ChannelId, epoch: u64, version: &clock::Global) { - insert_note_changed(&mut self.channels_by_id, channel_id, epoch, version); - } - - pub fn new_message(&mut self, channel_id: ChannelId, message_id: u64) { - insert_new_message(&mut self.channels_by_id, channel_id, message_id) - } } /// A guard for ensuring that the paths index maintains its sort and uniqueness @@ -85,14 +48,6 @@ pub struct ChannelPathsInsertGuard<'a> { } impl<'a> ChannelPathsInsertGuard<'a> { - pub fn note_changed(&mut self, channel_id: ChannelId, epoch: u64, version: &clock::Global) { - insert_note_changed(self.channels_by_id, channel_id, epoch, version); - } - - pub fn new_messages(&mut self, channel_id: ChannelId, message_id: u64) { - insert_new_message(self.channels_by_id, channel_id, message_id) - } - pub fn insert(&mut self, channel_proto: proto::Channel) -> bool { let mut ret = false; if let Some(existing_channel) = self.channels_by_id.get_mut(&channel_proto.id) { @@ -114,8 +69,6 @@ impl<'a> ChannelPathsInsertGuard<'a> { role: channel_proto.role(), name: channel_proto.name.into(), parent_path: channel_proto.parent_path, - latest_message_id: channel_proto.latest_message_id, - latest_note_version: channel_proto.latest_note_version, }), ); self.insert_root(channel_proto.id); @@ -153,32 +106,3 @@ fn channel_path_sorting_key<'a>( .filter_map(|id| Some(channels_by_id.get(id)?.name.as_ref())) .chain(name) } - -fn insert_note_changed( - channels_by_id: &mut BTreeMap>, - channel_id: u64, - epoch: u64, - version: &clock::Global, -) { - if let Some(channel) = channels_by_id.get_mut(&channel_id) { - let unseen_version = Arc::make_mut(channel) - .unseen_note_version - .get_or_insert((0, clock::Global::new())); - if epoch > unseen_version.0 { - *unseen_version = (epoch, version.clone()); - } else { - unseen_version.1.join(version); - } - } -} - -fn insert_new_message( - channels_by_id: &mut BTreeMap>, - channel_id: u64, - message_id: u64, -) { - if let Some(channel) = channels_by_id.get_mut(&channel_id) { - let unseen_message_id = Arc::make_mut(channel).unseen_message_id.get_or_insert(0); - *unseen_message_id = message_id.max(*unseen_message_id); - } -} diff --git a/crates/collab/src/db/queries/buffers.rs b/crates/collab/src/db/queries/buffers.rs index c19cd530a0..59b8f8d01f 100644 --- a/crates/collab/src/db/queries/buffers.rs +++ b/crates/collab/src/db/queries/buffers.rs @@ -748,18 +748,11 @@ impl Database { .await } - pub async fn unseen_channel_buffer_changes( + pub async fn latest_channel_buffer_changes( &self, - user_id: UserId, channel_ids: &[ChannelId], tx: &DatabaseTransaction, - ) -> Result> { - #[derive(Debug, Clone, Copy, EnumIter, DeriveColumn)] - enum QueryIds { - ChannelId, - Id, - } - + ) -> Result> { let mut channel_ids_by_buffer_id = HashMap::default(); let mut rows = buffer::Entity::find() .filter(buffer::Column::ChannelId.is_in(channel_ids.iter().copied())) @@ -771,51 +764,23 @@ impl Database { } drop(rows); - let mut observed_edits_by_buffer_id = HashMap::default(); - let mut rows = observed_buffer_edits::Entity::find() - .filter(observed_buffer_edits::Column::UserId.eq(user_id)) - .filter( - observed_buffer_edits::Column::BufferId - .is_in(channel_ids_by_buffer_id.keys().copied()), - ) - .stream(&*tx) - .await?; - while let Some(row) = rows.next().await { - let row = row?; - observed_edits_by_buffer_id.insert(row.buffer_id, row); - } - drop(rows); - let latest_operations = self .get_latest_operations_for_buffers(channel_ids_by_buffer_id.keys().copied(), &*tx) .await?; - let mut changes = Vec::default(); - for latest in latest_operations { - if let Some(observed) = observed_edits_by_buffer_id.get(&latest.buffer_id) { - if ( - observed.epoch, - observed.lamport_timestamp, - observed.replica_id, - ) >= (latest.epoch, latest.lamport_timestamp, latest.replica_id) - { - continue; - } - } - - if let Some(channel_id) = channel_ids_by_buffer_id.get(&latest.buffer_id) { - changes.push(proto::UnseenChannelBufferChange { - channel_id: channel_id.to_proto(), - epoch: latest.epoch as u64, + Ok(latest_operations + .iter() + .flat_map(|op| { + Some(proto::ChannelBufferVersion { + channel_id: channel_ids_by_buffer_id.get(&op.buffer_id)?.to_proto(), + epoch: op.epoch as u64, version: vec![proto::VectorClockEntry { - replica_id: latest.replica_id as u32, - timestamp: latest.lamport_timestamp as u32, + replica_id: op.replica_id as u32, + timestamp: op.lamport_timestamp as u32, }], - }); - } - } - - Ok(changes) + }) + }) + .collect()) } /// Returns the latest operations for the buffers with the specified IDs. diff --git a/crates/collab/src/db/queries/channels.rs b/crates/collab/src/db/queries/channels.rs index e30c8483fa..f1b4c8af7b 100644 --- a/crates/collab/src/db/queries/channels.rs +++ b/crates/collab/src/db/queries/channels.rs @@ -687,8 +687,8 @@ impl Database { } let channel_ids = channels.iter().map(|c| c.id).collect::>(); - let channel_buffer_changes = self - .unseen_channel_buffer_changes(user_id, &channel_ids, &*tx) + let latest_buffer_versions = self + .latest_channel_buffer_changes(&channel_ids, &*tx) .await?; let latest_messages = self.latest_channel_messages(&channel_ids, &*tx).await?; @@ -696,7 +696,7 @@ impl Database { Ok(ChannelsForUser { channels, channel_participants, - latest_buffer_versions: channel_buffer_changes, + latest_buffer_versions, latest_channel_messages: latest_messages, }) } diff --git a/crates/collab/src/db/queries/messages.rs b/crates/collab/src/db/queries/messages.rs index c736f5d0a3..9baa7162c5 100644 --- a/crates/collab/src/db/queries/messages.rs +++ b/crates/collab/src/db/queries/messages.rs @@ -398,6 +398,10 @@ impl Database { write!(&mut values, "({})", id).unwrap(); } + if values.is_empty() { + return Ok(Vec::default()); + } + let sql = format!( r#" SELECT diff --git a/crates/collab/src/db/tests/buffer_tests.rs b/crates/collab/src/db/tests/buffer_tests.rs index 222514da0b..2eb8e15301 100644 --- a/crates/collab/src/db/tests/buffer_tests.rs +++ b/crates/collab/src/db/tests/buffer_tests.rs @@ -330,8 +330,7 @@ async fn test_channel_buffers_last_operations(db: &Database) { .transaction(|tx| { let buffers = &buffers; async move { - db.unseen_channel_buffer_changes( - observer_id, + db.latest_channel_buffer_changes( &[ buffers[0].channel_id, buffers[1].channel_id, @@ -348,12 +347,12 @@ async fn test_channel_buffers_last_operations(db: &Database) { pretty_assertions::assert_eq!( buffer_changes, [ - rpc::proto::UnseenChannelBufferChange { + rpc::proto::ChannelBufferVersion { channel_id: buffers[0].channel_id.to_proto(), epoch: 0, version: serialize_version(&text_buffers[0].version()), }, - rpc::proto::UnseenChannelBufferChange { + rpc::proto::ChannelBufferVersion { channel_id: buffers[1].channel_id.to_proto(), epoch: 1, version: serialize_version(&text_buffers[1].version()) @@ -362,99 +361,7 @@ async fn test_channel_buffers_last_operations(db: &Database) { == buffer_changes[1].version.first().unwrap().replica_id) .collect::>(), }, - rpc::proto::UnseenChannelBufferChange { - channel_id: buffers[2].channel_id.to_proto(), - epoch: 0, - version: serialize_version(&text_buffers[2].version()), - }, - ] - ); - - db.observe_buffer_version( - buffers[1].id, - observer_id, - 1, - serialize_version(&text_buffers[1].version()).as_slice(), - ) - .await - .unwrap(); - - let buffer_changes = db - .transaction(|tx| { - let buffers = &buffers; - async move { - db.unseen_channel_buffer_changes( - observer_id, - &[ - buffers[0].channel_id, - buffers[1].channel_id, - buffers[2].channel_id, - ], - &*tx, - ) - .await - } - }) - .await - .unwrap(); - - assert_eq!( - buffer_changes, - [ - rpc::proto::UnseenChannelBufferChange { - channel_id: buffers[0].channel_id.to_proto(), - epoch: 0, - version: serialize_version(&text_buffers[0].version()), - }, - rpc::proto::UnseenChannelBufferChange { - channel_id: buffers[2].channel_id.to_proto(), - epoch: 0, - version: serialize_version(&text_buffers[2].version()), - }, - ] - ); - - // Observe an earlier version of the buffer. - db.observe_buffer_version( - buffers[1].id, - observer_id, - 1, - &[rpc::proto::VectorClockEntry { - replica_id: 0, - timestamp: 0, - }], - ) - .await - .unwrap(); - - let buffer_changes = db - .transaction(|tx| { - let buffers = &buffers; - async move { - db.unseen_channel_buffer_changes( - observer_id, - &[ - buffers[0].channel_id, - buffers[1].channel_id, - buffers[2].channel_id, - ], - &*tx, - ) - .await - } - }) - .await - .unwrap(); - - assert_eq!( - buffer_changes, - [ - rpc::proto::UnseenChannelBufferChange { - channel_id: buffers[0].channel_id.to_proto(), - epoch: 0, - version: serialize_version(&text_buffers[0].version()), - }, - rpc::proto::UnseenChannelBufferChange { + rpc::proto::ChannelBufferVersion { channel_id: buffers[2].channel_id.to_proto(), epoch: 0, version: serialize_version(&text_buffers[2].version()), diff --git a/crates/collab/src/db/tests/message_tests.rs b/crates/collab/src/db/tests/message_tests.rs index 23c4da3a81..613830d38f 100644 --- a/crates/collab/src/db/tests/message_tests.rs +++ b/crates/collab/src/db/tests/message_tests.rs @@ -235,11 +235,10 @@ async fn test_unseen_channel_messages(db: &Arc) { .await .unwrap(); - let second_message = db + let _ = db .create_channel_message(channel_1, user, "1_2", &[], OffsetDateTime::now_utc(), 2) .await - .unwrap() - .message_id; + .unwrap(); let third_message = db .create_channel_message(channel_1, user, "1_3", &[], OffsetDateTime::now_utc(), 3) @@ -258,97 +257,27 @@ async fn test_unseen_channel_messages(db: &Arc) { .message_id; // Check that observer has new messages - let unseen_messages = db + let latest_messages = db .transaction(|tx| async move { - db.unseen_channel_messages(observer, &[channel_1, channel_2], &*tx) + db.latest_channel_messages(&[channel_1, channel_2], &*tx) .await }) .await .unwrap(); assert_eq!( - unseen_messages, + latest_messages, [ - rpc::proto::UnseenChannelMessage { + rpc::proto::ChannelMessageId { channel_id: channel_1.to_proto(), message_id: third_message.to_proto(), }, - rpc::proto::UnseenChannelMessage { + rpc::proto::ChannelMessageId { channel_id: channel_2.to_proto(), message_id: fourth_message.to_proto(), }, ] ); - - // Observe the second message - db.observe_channel_message(channel_1, observer, second_message) - .await - .unwrap(); - - // Make sure the observer still has a new message - let unseen_messages = db - .transaction(|tx| async move { - db.unseen_channel_messages(observer, &[channel_1, channel_2], &*tx) - .await - }) - .await - .unwrap(); - assert_eq!( - unseen_messages, - [ - rpc::proto::UnseenChannelMessage { - channel_id: channel_1.to_proto(), - message_id: third_message.to_proto(), - }, - rpc::proto::UnseenChannelMessage { - channel_id: channel_2.to_proto(), - message_id: fourth_message.to_proto(), - }, - ] - ); - - // Observe the third message, - db.observe_channel_message(channel_1, observer, third_message) - .await - .unwrap(); - - // Make sure the observer does not have a new method - let unseen_messages = db - .transaction(|tx| async move { - db.unseen_channel_messages(observer, &[channel_1, channel_2], &*tx) - .await - }) - .await - .unwrap(); - - assert_eq!( - unseen_messages, - [rpc::proto::UnseenChannelMessage { - channel_id: channel_2.to_proto(), - message_id: fourth_message.to_proto(), - }] - ); - - // Observe the second message again, should not regress our observed state - db.observe_channel_message(channel_1, observer, second_message) - .await - .unwrap(); - - // Make sure the observer does not have a new message - let unseen_messages = db - .transaction(|tx| async move { - db.unseen_channel_messages(observer, &[channel_1, channel_2], &*tx) - .await - }) - .await - .unwrap(); - assert_eq!( - unseen_messages, - [rpc::proto::UnseenChannelMessage { - channel_id: channel_2.to_proto(), - message_id: fourth_message.to_proto(), - }] - ); } test_both_dbs!( diff --git a/crates/collab/src/rpc.rs b/crates/collab/src/rpc.rs index 6593c2f566..a6984e3edd 100644 --- a/crates/collab/src/rpc.rs +++ b/crates/collab/src/rpc.rs @@ -2842,27 +2842,25 @@ async fn update_channel_buffer( let pool = &*session.connection_pool().await; - todo!() - - // broadcast( - // None, - // non_collaborators - // .iter() - // .flat_map(|user_id| pool.user_connection_ids(*user_id)), - // |peer_id| { - // session.peer.send( - // peer_id.into(), - // proto::UpdateChannels { - // unseen_channel_buffer_changes: vec![proto::UnseenChannelBufferChange { - // channel_id: channel_id.to_proto(), - // epoch: epoch as u64, - // version: version.clone(), - // }], - // ..Default::default() - // }, - // ) - // }, - // ); + broadcast( + None, + non_collaborators + .iter() + .flat_map(|user_id| pool.user_connection_ids(*user_id)), + |peer_id| { + session.peer.send( + peer_id.into(), + proto::UpdateChannels { + latest_channel_buffer_versions: vec![proto::ChannelBufferVersion { + channel_id: channel_id.to_proto(), + epoch: epoch as u64, + version: version.clone(), + }], + ..Default::default() + }, + ) + }, + ); Ok(()) } @@ -3039,7 +3037,7 @@ async fn send_channel_message( session.peer.send( peer_id.into(), proto::UpdateChannels { - unseen_channel_messages: vec![proto::UnseenChannelMessage { + latest_channel_message_ids: vec![proto::ChannelMessageId { channel_id: channel_id.to_proto(), message_id: message_id.to_proto(), }], diff --git a/crates/collab/src/tests/channel_buffer_tests.rs b/crates/collab/src/tests/channel_buffer_tests.rs index 76cc8cb9e1..cf5b999ef6 100644 --- a/crates/collab/src/tests/channel_buffer_tests.rs +++ b/crates/collab/src/tests/channel_buffer_tests.rs @@ -637,7 +637,6 @@ async fn test_channel_buffer_changes( .channel_store() .read(cx) .has_channel_buffer_changed(channel_id) - .unwrap() }); assert!(has_buffer_changed); @@ -655,7 +654,6 @@ async fn test_channel_buffer_changes( .channel_store() .read(cx) .has_channel_buffer_changed(channel_id) - .unwrap() }); assert!(!has_buffer_changed); @@ -672,7 +670,6 @@ async fn test_channel_buffer_changes( .channel_store() .read(cx) .has_channel_buffer_changed(channel_id) - .unwrap() }); assert!(!has_buffer_changed); @@ -687,7 +684,6 @@ async fn test_channel_buffer_changes( .channel_store() .read(cx) .has_channel_buffer_changed(channel_id) - .unwrap() }); assert!(!has_buffer_changed); @@ -714,7 +710,6 @@ async fn test_channel_buffer_changes( .channel_store() .read(cx) .has_channel_buffer_changed(channel_id) - .unwrap() }); assert!(has_buffer_changed); } diff --git a/crates/collab/src/tests/channel_message_tests.rs b/crates/collab/src/tests/channel_message_tests.rs index e59aa3c705..270ba04f7e 100644 --- a/crates/collab/src/tests/channel_message_tests.rs +++ b/crates/collab/src/tests/channel_message_tests.rs @@ -313,7 +313,6 @@ async fn test_channel_message_changes( .channel_store() .read(cx) .has_new_messages(channel_id) - .unwrap() }); assert!(b_has_messages); @@ -341,7 +340,6 @@ async fn test_channel_message_changes( .channel_store() .read(cx) .has_new_messages(channel_id) - .unwrap() }); assert!(!b_has_messages); @@ -359,7 +357,6 @@ async fn test_channel_message_changes( .channel_store() .read(cx) .has_new_messages(channel_id) - .unwrap() }); assert!(!b_has_messages); @@ -382,7 +379,6 @@ async fn test_channel_message_changes( .channel_store() .read(cx) .has_new_messages(channel_id) - .unwrap() }); assert!(b_has_messages); @@ -402,7 +398,6 @@ async fn test_channel_message_changes( .channel_store() .read(cx) .has_new_messages(channel_id) - .unwrap() }); assert!(b_has_messages); diff --git a/crates/collab_ui/src/channel_view.rs b/crates/collab_ui/src/channel_view.rs index b2c243dc89..2c0ff77459 100644 --- a/crates/collab_ui/src/channel_view.rs +++ b/crates/collab_ui/src/channel_view.rs @@ -183,7 +183,7 @@ impl ChannelView { } else { self.channel_store.update(cx, |store, cx| { let channel_buffer = self.channel_buffer.read(cx); - store.notes_changed( + store.update_latest_notes_version( channel_buffer.channel_id, channel_buffer.epoch(), &channel_buffer.buffer().read(cx).version(), diff --git a/crates/collab_ui/src/chat_panel.rs b/crates/collab_ui/src/chat_panel.rs index ebba8ffc26..a3d44a92fb 100644 --- a/crates/collab_ui/src/chat_panel.rs +++ b/crates/collab_ui/src/chat_panel.rs @@ -266,7 +266,7 @@ impl ChatPanel { } => { if !self.active { self.channel_store.update(cx, |store, cx| { - store.new_message(*channel_id, *message_id, cx) + store.update_latest_message_id(*channel_id, *message_id, cx) }) } } diff --git a/crates/collab_ui/src/collab_panel.rs b/crates/collab_ui/src/collab_panel.rs index e0244b5e32..098ba02a58 100644 --- a/crates/collab_ui/src/collab_panel.rs +++ b/crates/collab_ui/src/collab_panel.rs @@ -2219,17 +2219,16 @@ impl CollabPanel { Some(call_channel == channel_id) }) .unwrap_or(false); - let is_public = self - .channel_store - .read(cx) + let channel_store = self.channel_store.read(cx); + let is_public = channel_store .channel_for_id(channel_id) .map(|channel| channel.visibility) == Some(proto::ChannelVisibility::Public); let disclosed = has_children.then(|| !self.collapsed_channels.binary_search(&channel.id).is_ok()); - let has_messages_notification = channel.unseen_message_id.is_some(); - let has_notes_notification = channel.unseen_note_version.is_some(); + let has_messages_notification = channel_store.has_new_messages(channel_id); + let has_notes_notification = channel_store.has_channel_buffer_changed(channel_id); const FACEPILE_LIMIT: usize = 3; let participants = self.channel_store.read(cx).channel_participants(channel_id);