diff --git a/crates/collab/src/db.rs b/crates/collab/src/db.rs index 58607836cc..1a89978c38 100644 --- a/crates/collab/src/db.rs +++ b/crates/collab/src/db.rs @@ -1178,7 +1178,7 @@ impl Database { user_id: UserId, connection: ConnectionId, live_kit_room: &str, - ) -> Result { + ) -> Result { self.transaction(|tx| async move { let room = room::ActiveModel { live_kit_room: ActiveValue::set(live_kit_room.into()), @@ -1217,7 +1217,7 @@ impl Database { calling_connection: ConnectionId, called_user_id: UserId, initial_project_id: Option, - ) -> Result> { + ) -> Result> { self.room_transaction(room_id, |tx| async move { room_participant::ActiveModel { room_id: ActiveValue::set(room_id), @@ -1246,7 +1246,7 @@ impl Database { &self, room_id: RoomId, called_user_id: UserId, - ) -> Result> { + ) -> Result> { self.room_transaction(room_id, |tx| async move { room_participant::Entity::delete_many() .filter( @@ -1266,7 +1266,7 @@ impl Database { &self, expected_room_id: Option, user_id: UserId, - ) -> Result>> { + ) -> Result>> { self.optional_room_transaction(|tx| async move { let mut filter = Condition::all() .add(room_participant::Column::UserId.eq(user_id)) @@ -1303,7 +1303,7 @@ impl Database { room_id: RoomId, calling_connection: ConnectionId, called_user_id: UserId, - ) -> Result> { + ) -> Result> { self.room_transaction(room_id, |tx| async move { let participant = room_participant::Entity::find() .filter( @@ -1340,7 +1340,7 @@ impl Database { user_id: UserId, channel_id: Option, connection: ConnectionId, - ) -> Result> { + ) -> Result> { self.room_transaction(room_id, |tx| async move { if let Some(channel_id) = channel_id { channel_member::Entity::find() @@ -1868,7 +1868,7 @@ impl Database { project_id: ProjectId, leader_connection: ConnectionId, follower_connection: ConnectionId, - ) -> Result> { + ) -> Result> { let room_id = self.room_id_for_project(project_id).await?; self.room_transaction(room_id, |tx| async move { follower::ActiveModel { @@ -1898,7 +1898,7 @@ impl Database { project_id: ProjectId, leader_connection: ConnectionId, follower_connection: ConnectionId, - ) -> Result> { + ) -> Result> { let room_id = self.room_id_for_project(project_id).await?; self.room_transaction(room_id, |tx| async move { follower::Entity::delete_many() @@ -1930,7 +1930,7 @@ impl Database { room_id: RoomId, connection: ConnectionId, location: proto::ParticipantLocation, - ) -> Result> { + ) -> Result> { self.room_transaction(room_id, |tx| async { let tx = tx; let location_kind; @@ -2043,7 +2043,7 @@ impl Database { }) } - async fn get_room(&self, room_id: RoomId, tx: &DatabaseTransaction) -> Result { + async fn get_room(&self, room_id: RoomId, tx: &DatabaseTransaction) -> Result { let db_room = room::Entity::find_by_id(room_id) .one(tx) .await? @@ -2147,12 +2147,22 @@ impl Database { }); } - Ok(proto::Room { - id: db_room.id.to_proto(), - live_kit_room: db_room.live_kit_room, - participants: participants.into_values().collect(), - pending_participants, - followers, + let channel_users = + if let Some(channel) = db_room.find_related(channel::Entity).one(tx).await? { + self.get_channel_members_internal(channel.id, tx).await? + } else { + Vec::new() + }; + + Ok(ChannelRoom { + room: proto::Room { + id: db_room.id.to_proto(), + live_kit_room: db_room.live_kit_room, + participants: participants.into_values().collect(), + pending_participants, + followers, + }, + channel_participants: channel_users, }) } @@ -2183,7 +2193,7 @@ impl Database { room_id: RoomId, connection: ConnectionId, worktrees: &[proto::WorktreeMetadata], - ) -> Result> { + ) -> Result> { self.room_transaction(room_id, |tx| async move { let participant = room_participant::Entity::find() .filter( @@ -2254,7 +2264,7 @@ impl Database { &self, project_id: ProjectId, connection: ConnectionId, - ) -> Result)>> { + ) -> Result)>> { let room_id = self.room_id_for_project(project_id).await?; self.room_transaction(room_id, |tx| async move { let guest_connection_ids = self.project_guest_connection_ids(project_id, &tx).await?; @@ -2281,7 +2291,7 @@ impl Database { project_id: ProjectId, connection: ConnectionId, worktrees: &[proto::WorktreeMetadata], - ) -> Result)>> { + ) -> Result)>> { let room_id = self.room_id_for_project(project_id).await?; self.room_transaction(room_id, |tx| async move { let project = project::Entity::find_by_id(project_id) @@ -2858,7 +2868,7 @@ impl Database { &self, project_id: ProjectId, connection: ConnectionId, - ) -> Result> { + ) -> Result> { let room_id = self.room_id_for_project(project_id).await?; self.room_transaction(room_id, |tx| async move { let result = project_collaborator::Entity::delete_many() @@ -3377,20 +3387,29 @@ impl Database { pub async fn get_channel_members(&self, id: ChannelId) -> Result> { self.transaction(|tx| async move { let tx = tx; - let ancestor_ids = self.get_channel_ancestors(id, &*tx).await?; - let user_ids = channel_member::Entity::find() - .distinct() - .filter(channel_member::Column::ChannelId.is_in(ancestor_ids.iter().copied())) - .select_only() - .column(channel_member::Column::UserId) - .into_values::<_, QueryUserIds>() - .all(&*tx) - .await?; + let user_ids = self.get_channel_members_internal(id, &*tx).await?; Ok(user_ids) }) .await } + pub async fn get_channel_members_internal( + &self, + id: ChannelId, + tx: &DatabaseTransaction, + ) -> Result> { + let ancestor_ids = self.get_channel_ancestors(id, tx).await?; + let user_ids = channel_member::Entity::find() + .distinct() + .filter(channel_member::Column::ChannelId.is_in(ancestor_ids.iter().copied())) + .select_only() + .column(channel_member::Column::UserId) + .into_values::<_, QueryUserIds>() + .all(&*tx) + .await?; + Ok(user_ids) + } + async fn get_channel_ancestors( &self, id: ChannelId, @@ -3913,8 +3932,27 @@ id_type!(ServerId); id_type!(SignupId); id_type!(UserId); -pub struct RejoinedRoom { +pub struct ChannelRoom { pub room: proto::Room, + pub channel_participants: Vec, +} + +impl Deref for ChannelRoom { + type Target = proto::Room; + + fn deref(&self) -> &Self::Target { + &self.room + } +} + +impl DerefMut for ChannelRoom { + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.room + } +} + +pub struct RejoinedRoom { + pub room: ChannelRoom, pub rejoined_projects: Vec, pub reshared_projects: Vec, } @@ -3951,14 +3989,14 @@ pub struct RejoinedWorktree { } pub struct LeftRoom { - pub room: proto::Room, + pub room: ChannelRoom, pub left_projects: HashMap, pub canceled_calls_to_user_ids: Vec, pub deleted: bool, } pub struct RefreshedRoom { - pub room: proto::Room, + pub room: ChannelRoom, pub stale_participant_user_ids: Vec, pub canceled_calls_to_user_ids: Vec, } diff --git a/crates/collab/src/rpc.rs b/crates/collab/src/rpc.rs index eaa3eb8261..4d30d17485 100644 --- a/crates/collab/src/rpc.rs +++ b/crates/collab/src/rpc.rs @@ -2,7 +2,7 @@ mod connection_pool; use crate::{ auth, - db::{self, ChannelId, Database, ProjectId, RoomId, ServerId, User, UserId}, + db::{self, ChannelId, ChannelRoom, Database, ProjectId, RoomId, ServerId, User, UserId}, executor::Executor, AppState, Result, }; @@ -2426,7 +2426,10 @@ fn contact_for_user( } } -fn room_updated(room: &proto::Room, peer: &Peer) { +fn room_updated(room: &ChannelRoom, peer: &Peer, pool: &ConnectionPool) { + let channel_ids = &room.channel_participants; + let room = &room.room; + broadcast( None, room.participants @@ -2441,6 +2444,21 @@ fn room_updated(room: &proto::Room, peer: &Peer) { ) }, ); + + broadcast( + None, + channel_ids + .iter() + .flat_map(|user_id| pool.user_connection_ids(*user_id)), + |peer_id| { + peer.send( + peer_id.into(), + proto::RoomUpdated { + room: Some(room.clone()), + }, + ) + }, + ); } async fn update_user_contacts(user_id: UserId, session: &Session) -> Result<()> { @@ -2491,7 +2509,11 @@ async fn leave_room_for_session(session: &Session) -> Result<()> { project_left(project, session); } - room_updated(&left_room.room, &session.peer); + { + let connection_pool = session.connection_pool().await; + room_updated(&left_room.room, &session.peer, &connection_pool); + } + room_id = RoomId::from_proto(left_room.room.id); canceled_calls_to_user_ids = mem::take(&mut left_room.canceled_calls_to_user_ids); live_kit_room = mem::take(&mut left_room.room.live_kit_room);