From b72c03719911a71d316301ac706cf16b80f559ce Mon Sep 17 00:00:00 2001 From: Conrad Irwin Date: Thu, 25 Jan 2024 13:24:45 -0700 Subject: [PATCH] TEMP --- crates/channel/src/channel_store.rs | 29 +++++++++--- .../src/channel_store/channel_index.rs | 4 +- crates/collab/src/db.rs | 4 +- crates/collab/src/db/queries/channels.rs | 8 ++-- crates/collab/src/db/queries/messages.rs | 46 +++++-------------- crates/collab/src/rpc.rs | 44 +++++++++--------- crates/rpc/proto/zed.proto | 21 +++++---- crates/rpc/src/proto.rs | 1 + crates/rpc/src/rpc.rs | 2 +- 9 files changed, 77 insertions(+), 82 deletions(-) diff --git a/crates/channel/src/channel_store.rs b/crates/channel/src/channel_store.rs index 59b69405a5..eba9b8a517 100644 --- a/crates/channel/src/channel_store.rs +++ b/crates/channel/src/channel_store.rs @@ -33,6 +33,8 @@ pub struct ChannelStore { pub channel_index: ChannelIndex, channel_invitations: Vec>, channel_participants: HashMap>>, + observed_chat_messages: HashMap, + observed_notes_versions: HashMap, outgoing_invites: HashSet<(ChannelId, UserId)>, update_channels_tx: mpsc::UnboundedSender, opened_buffers: HashMap>, @@ -51,8 +53,8 @@ pub struct Channel { pub name: SharedString, pub visibility: proto::ChannelVisibility, pub role: proto::ChannelRole, - pub unseen_note_version: Option<(u64, clock::Global)>, - pub unseen_message_id: Option, + pub latest_message_id: Option, + pub latest_note_version: Option, pub parent_path: Vec, } @@ -137,8 +139,10 @@ impl ChannelStore { user_store: Model, cx: &mut ModelContext, ) -> Self { - let rpc_subscription = - client.add_message_handler(cx.weak_model(), Self::handle_update_channels); + let rpc_subscriptions = [ + client.add_message_handler(cx.weak_model(), Self::handle_update_channels), + client.add_message_handler(cx.weak_model(), Self::handle_update_user_channels), + ]; let mut connection_status = client.status(); let (update_channels_tx, mut update_channels_rx) = mpsc::unbounded(); @@ -195,6 +199,8 @@ impl ChannelStore { .await .log_err(); }), + observed_chat_messages: Default::default(), + observed_notes_versions: Default::default(), } } @@ -747,6 +753,19 @@ impl ChannelStore { Ok(()) } + async fn handle_update_user_channels( + this: Model, + message: TypedEnvelope, + _: Arc, + mut cx: AsyncAppContext, + ) -> Result<()> { + this.update(&mut cx, |this, _| { + // this.seen_channel_message_ids + // .insert(message.channel_id, message.message_id); + cx.notify(); + })?; + } + fn handle_connect(&mut self, cx: &mut ModelContext) -> Task> { self.channel_index.clear(); self.channel_invitations.clear(); @@ -911,8 +930,6 @@ impl ChannelStore { visibility: channel.visibility(), role: channel.role(), name: channel.name.into(), - unseen_note_version: None, - unseen_message_id: None, parent_path: channel.parent_path, }), ), diff --git a/crates/channel/src/channel_store/channel_index.rs b/crates/channel/src/channel_store/channel_index.rs index 88012a57c2..77903b4186 100644 --- a/crates/channel/src/channel_store/channel_index.rs +++ b/crates/channel/src/channel_store/channel_index.rs @@ -113,9 +113,9 @@ impl<'a> ChannelPathsInsertGuard<'a> { visibility: channel_proto.visibility(), role: channel_proto.role(), name: channel_proto.name.into(), - unseen_note_version: None, - unseen_message_id: None, parent_path: channel_proto.parent_path, + latest_message_id: channel_proto.latest_message_id, + latest_note_version: channel_proto.latest_note_version, }), ); self.insert_root(channel_proto.id); diff --git a/crates/collab/src/db.rs b/crates/collab/src/db.rs index d1a717e66e..918d1405a2 100644 --- a/crates/collab/src/db.rs +++ b/crates/collab/src/db.rs @@ -618,8 +618,8 @@ impl ChannelMember { pub struct ChannelsForUser { pub channels: Vec, pub channel_participants: HashMap>, - pub unseen_buffer_changes: Vec, - pub channel_messages: Vec, + pub latest_buffer_versions: Vec, + pub latest_channel_messages: Vec, } #[derive(Debug)] diff --git a/crates/collab/src/db/queries/channels.rs b/crates/collab/src/db/queries/channels.rs index 10b73a7094..e30c8483fa 100644 --- a/crates/collab/src/db/queries/channels.rs +++ b/crates/collab/src/db/queries/channels.rs @@ -691,15 +691,13 @@ impl Database { .unseen_channel_buffer_changes(user_id, &channel_ids, &*tx) .await?; - let unseen_messages = self - .unseen_channel_messages(user_id, &channel_ids, &*tx) - .await?; + let latest_messages = self.latest_channel_messages(&channel_ids, &*tx).await?; Ok(ChannelsForUser { channels, channel_participants, - unseen_buffer_changes: channel_buffer_changes, - channel_messages: unseen_messages, + latest_buffer_versions: channel_buffer_changes, + latest_channel_messages: latest_messages, }) } diff --git a/crates/collab/src/db/queries/messages.rs b/crates/collab/src/db/queries/messages.rs index 9ee313d91b..c736f5d0a3 100644 --- a/crates/collab/src/db/queries/messages.rs +++ b/crates/collab/src/db/queries/messages.rs @@ -385,25 +385,11 @@ impl Database { Ok(()) } - /// Returns the unseen messages for the given user in the specified channels. - pub async fn unseen_channel_messages( + pub async fn latest_channel_messages( &self, - user_id: UserId, channel_ids: &[ChannelId], tx: &DatabaseTransaction, - ) -> Result> { - let mut observed_messages_by_channel_id = HashMap::default(); - let mut rows = observed_channel_messages::Entity::find() - .filter(observed_channel_messages::Column::UserId.eq(user_id)) - .filter(observed_channel_messages::Column::ChannelId.is_in(channel_ids.iter().copied())) - .stream(&*tx) - .await?; - - while let Some(row) = rows.next().await { - let row = row?; - observed_messages_by_channel_id.insert(row.channel_id, row); - } - drop(rows); + ) -> Result> { let mut values = String::new(); for id in channel_ids { if !values.is_empty() { @@ -412,10 +398,6 @@ impl Database { write!(&mut values, "({})", id).unwrap(); } - if values.is_empty() { - return Ok(Default::default()); - } - let sql = format!( r#" SELECT @@ -437,26 +419,20 @@ impl Database { ); let stmt = Statement::from_string(self.pool.get_database_backend(), sql); - let last_messages = channel_message::Model::find_by_statement(stmt) - .all(&*tx) + let mut last_messages = channel_message::Model::find_by_statement(stmt) + .stream(&*tx) .await?; - let mut changes = Vec::new(); - for last_message in last_messages { - if let Some(observed_message) = - observed_messages_by_channel_id.get(&last_message.channel_id) - { - if observed_message.channel_message_id == last_message.id { - continue; - } - } - changes.push(proto::UnseenChannelMessage { - channel_id: last_message.channel_id.to_proto(), - message_id: last_message.id.to_proto(), + let mut results = Vec::new(); + while let Some(result) = last_messages.next().await { + let message = result?; + results.push(proto::ChannelMessageId { + channel_id: message.channel_id.to_proto(), + message_id: message.id.to_proto(), }); } - Ok(changes) + Ok(results) } /// Removes the channel message with the given ID. diff --git a/crates/collab/src/rpc.rs b/crates/collab/src/rpc.rs index bb173784d3..6593c2f566 100644 --- a/crates/collab/src/rpc.rs +++ b/crates/collab/src/rpc.rs @@ -2842,25 +2842,27 @@ async fn update_channel_buffer( let pool = &*session.connection_pool().await; - broadcast( - None, - non_collaborators - .iter() - .flat_map(|user_id| pool.user_connection_ids(*user_id)), - |peer_id| { - session.peer.send( - peer_id.into(), - proto::UpdateChannels { - unseen_channel_buffer_changes: vec![proto::UnseenChannelBufferChange { - channel_id: channel_id.to_proto(), - epoch: epoch as u64, - version: version.clone(), - }], - ..Default::default() - }, - ) - }, - ); + todo!() + + // broadcast( + // None, + // non_collaborators + // .iter() + // .flat_map(|user_id| pool.user_connection_ids(*user_id)), + // |peer_id| { + // session.peer.send( + // peer_id.into(), + // proto::UpdateChannels { + // unseen_channel_buffer_changes: vec![proto::UnseenChannelBufferChange { + // channel_id: channel_id.to_proto(), + // epoch: epoch as u64, + // version: version.clone(), + // }], + // ..Default::default() + // }, + // ) + // }, + // ); Ok(()) } @@ -3315,8 +3317,8 @@ fn build_channels_update( update.channels.push(channel.to_proto()); } - update.unseen_channel_buffer_changes = channels.unseen_buffer_changes; - update.unseen_channel_messages = channels.channel_messages; + update.latest_channel_buffer_versions = channels.latest_buffer_versions; + update.latest_channel_message_ids = channels.latest_channel_messages; for (channel_id, participants) in channels.channel_participants { update diff --git a/crates/rpc/proto/zed.proto b/crates/rpc/proto/zed.proto index 8bea2f24e0..f18ff5cf2e 100644 --- a/crates/rpc/proto/zed.proto +++ b/crates/rpc/proto/zed.proto @@ -181,7 +181,9 @@ message Envelope { MarkNotificationRead mark_notification_read = 153; LspExtExpandMacro lsp_ext_expand_macro = 154; LspExtExpandMacroResponse lsp_ext_expand_macro_response = 155; - SetRoomParticipantRole set_room_participant_role = 156; // Current max + SetRoomParticipantRole set_room_participant_role = 156; + + UpdateUserChannels update_user_channels = 157; // current max } } @@ -992,21 +994,20 @@ message UpdateChannels { repeated Channel channel_invitations = 5; repeated uint64 remove_channel_invitations = 6; repeated ChannelParticipants channel_participants = 7; - repeated UnseenChannelMessage unseen_channel_messages = 9; - repeated UnseenChannelBufferChange unseen_channel_buffer_changes = 10; + repeated ChannelMessageId latest_channel_message_ids = 8; + repeated ChannelBufferVersion latest_channel_buffer_versions = 9; } -message UnseenChannelMessage { +message UpdateUserChannels { + repeated ChannelMessageId observed_channel_message_id = 1; + repeated ChannelBufferVersion observed_channel_buffer_version = 2; +} + +message ChannelMessageId { uint64 channel_id = 1; uint64 message_id = 2; } -message UnseenChannelBufferChange { - uint64 channel_id = 1; - uint64 epoch = 2; - repeated VectorClockEntry version = 3; -} - message ChannelPermission { uint64 channel_id = 1; ChannelRole role = 3; diff --git a/crates/rpc/src/proto.rs b/crates/rpc/src/proto.rs index 5d0a994154..9b885d1840 100644 --- a/crates/rpc/src/proto.rs +++ b/crates/rpc/src/proto.rs @@ -269,6 +269,7 @@ messages!( (UpdateChannelBuffer, Foreground), (UpdateChannelBufferCollaborators, Foreground), (UpdateChannels, Foreground), + (UpdateUserChannels, Foreground), (UpdateContacts, Foreground), (UpdateDiagnosticSummary, Foreground), (UpdateDiffBase, Foreground), diff --git a/crates/rpc/src/rpc.rs b/crates/rpc/src/rpc.rs index c22ecb740d..2beb7614e8 100644 --- a/crates/rpc/src/rpc.rs +++ b/crates/rpc/src/rpc.rs @@ -11,4 +11,4 @@ pub use notification::*; pub use peer::*; mod macros; -pub const PROTOCOL_VERSION: u32 = 67; +pub const PROTOCOL_VERSION: u32 = 68;