diff --git a/Cargo.lock b/Cargo.lock index b6f86980ae..8cd5e7d6d7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1041,6 +1041,7 @@ dependencies = [ "client", "collections", "ctor", + "dashmap", "editor", "env_logger", "envy", @@ -1536,6 +1537,19 @@ dependencies = [ "syn", ] +[[package]] +name = "dashmap" +version = "5.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "907076dfda823b0b36d2a1bb5f90c96660a5bbcd7729e10727f07858f22c4edc" +dependencies = [ + "cfg-if 1.0.0", + "hashbrown 0.12.3", + "lock_api", + "once_cell", + "parking_lot_core 0.9.4", +] + [[package]] name = "data-url" version = "0.1.1" diff --git a/crates/collab/Cargo.toml b/crates/collab/Cargo.toml index f04918605f..e5a97b9764 100644 --- a/crates/collab/Cargo.toml +++ b/crates/collab/Cargo.toml @@ -24,6 +24,7 @@ axum = { version = "0.5", features = ["json", "headers", "ws"] } axum-extra = { version = "0.3", features = ["erased-json"] } base64 = "0.13" clap = { version = "3.1", features = ["derive"], optional = true } +dashmap = "5.4" envy = "0.4.2" futures = "0.3" hyper = "0.14" diff --git a/crates/collab/src/db.rs b/crates/collab/src/db.rs index 295234af61..84ad5082d0 100644 --- a/crates/collab/src/db.rs +++ b/crates/collab/src/db.rs @@ -2,6 +2,7 @@ use crate::{Error, Result}; use anyhow::anyhow; use axum::http::StatusCode; use collections::{BTreeMap, HashMap, HashSet}; +use dashmap::DashMap; use futures::{future::BoxFuture, FutureExt, StreamExt}; use rpc::{proto, ConnectionId}; use serde::{Deserialize, Serialize}; @@ -10,8 +11,17 @@ use sqlx::{ types::Uuid, FromRow, }; -use std::{future::Future, path::Path, time::Duration}; +use std::{ + future::Future, + marker::PhantomData, + ops::{Deref, DerefMut}, + path::Path, + rc::Rc, + sync::Arc, + time::Duration, +}; use time::{OffsetDateTime, PrimitiveDateTime}; +use tokio::sync::{Mutex, OwnedMutexGuard}; #[cfg(test)] pub type DefaultDb = Db; @@ -21,12 +31,33 @@ pub type DefaultDb = Db; pub struct Db { pool: sqlx::Pool, + rooms: DashMap>>, #[cfg(test)] background: Option>, #[cfg(test)] runtime: Option, } +pub struct RoomGuard { + data: T, + _guard: OwnedMutexGuard<()>, + _not_send: PhantomData>, +} + +impl Deref for RoomGuard { + type Target = T; + + fn deref(&self) -> &T { + &self.data + } +} + +impl DerefMut for RoomGuard { + fn deref_mut(&mut self) -> &mut T { + &mut self.data + } +} + pub trait BeginTransaction: Send + Sync { type Database: sqlx::Database; @@ -90,6 +121,7 @@ impl Db { .await?; Ok(Self { pool, + rooms: Default::default(), background: None, runtime: None, }) @@ -197,6 +229,7 @@ impl Db { .await?; Ok(Self { pool, + rooms: DashMap::with_capacity(16384), #[cfg(test)] background: None, #[cfg(test)] @@ -922,13 +955,29 @@ where .await } + async fn commit_room_transaction<'a, T>( + &'a self, + room_id: RoomId, + tx: sqlx::Transaction<'static, D>, + data: T, + ) -> Result> { + let lock = self.rooms.entry(room_id).or_default().clone(); + let _guard = lock.lock_owned().await; + tx.commit().await?; + Ok(RoomGuard { + data, + _guard, + _not_send: PhantomData, + }) + } + pub async fn create_room( &self, user_id: UserId, connection_id: ConnectionId, - ) -> Result { + live_kit_room: &str, + ) -> Result> { self.transact(|mut tx| async move { - let live_kit_room = nanoid::nanoid!(30); let room_id = sqlx::query_scalar( " INSERT INTO rooms (live_kit_room) @@ -956,8 +1005,7 @@ where .await?; let room = self.get_room(room_id, &mut tx).await?; - tx.commit().await?; - Ok(room) + self.commit_room_transaction(room_id, tx, room).await }).await } @@ -968,11 +1016,17 @@ where calling_connection_id: ConnectionId, called_user_id: UserId, initial_project_id: Option, - ) -> Result<(proto::Room, proto::IncomingCall)> { + ) -> Result> { self.transact(|mut tx| async move { sqlx::query( " - INSERT INTO room_participants (room_id, user_id, calling_user_id, calling_connection_id, initial_project_id) + INSERT INTO room_participants ( + room_id, + user_id, + calling_user_id, + calling_connection_id, + initial_project_id + ) VALUES ($1, $2, $3, $4, $5) ", ) @@ -985,12 +1039,12 @@ where .await?; let room = self.get_room(room_id, &mut tx).await?; - tx.commit().await?; - let incoming_call = Self::build_incoming_call(&room, called_user_id) .ok_or_else(|| anyhow!("failed to build incoming call"))?; - Ok((room, incoming_call)) - }).await + self.commit_room_transaction(room_id, tx, (room, incoming_call)) + .await + }) + .await } pub async fn incoming_call_for_user( @@ -1051,7 +1105,7 @@ where &self, room_id: RoomId, called_user_id: UserId, - ) -> Result { + ) -> Result> { self.transact(|mut tx| async move { sqlx::query( " @@ -1065,8 +1119,7 @@ where .await?; let room = self.get_room(room_id, &mut tx).await?; - tx.commit().await?; - Ok(room) + self.commit_room_transaction(room_id, tx, room).await }) .await } @@ -1075,7 +1128,7 @@ where &self, expected_room_id: Option, user_id: UserId, - ) -> Result { + ) -> Result> { self.transact(|mut tx| async move { let room_id = sqlx::query_scalar( " @@ -1092,8 +1145,7 @@ where } let room = self.get_room(room_id, &mut tx).await?; - tx.commit().await?; - Ok(room) + self.commit_room_transaction(room_id, tx, room).await }) .await } @@ -1103,7 +1155,7 @@ where expected_room_id: Option, calling_connection_id: ConnectionId, called_user_id: UserId, - ) -> Result { + ) -> Result> { self.transact(|mut tx| async move { let room_id = sqlx::query_scalar( " @@ -1121,8 +1173,7 @@ where } let room = self.get_room(room_id, &mut tx).await?; - tx.commit().await?; - Ok(room) + self.commit_room_transaction(room_id, tx, room).await }).await } @@ -1131,7 +1182,7 @@ where room_id: RoomId, user_id: UserId, connection_id: ConnectionId, - ) -> Result { + ) -> Result> { self.transact(|mut tx| async move { sqlx::query( " @@ -1148,13 +1199,15 @@ where .await?; let room = self.get_room(room_id, &mut tx).await?; - tx.commit().await?; - Ok(room) + self.commit_room_transaction(room_id, tx, room).await }) .await } - pub async fn leave_room(&self, connection_id: ConnectionId) -> Result> { + pub async fn leave_room( + &self, + connection_id: ConnectionId, + ) -> Result>> { self.transact(|mut tx| async move { // Leave room. let room_id = sqlx::query_scalar::<_, RoomId>( @@ -1258,13 +1311,18 @@ where .await?; let room = self.get_room(room_id, &mut tx).await?; - tx.commit().await?; - - Ok(Some(LeftRoom { - room, - left_projects, - canceled_calls_to_user_ids, - })) + Ok(Some( + self.commit_room_transaction( + room_id, + tx, + LeftRoom { + room, + left_projects, + canceled_calls_to_user_ids, + }, + ) + .await?, + )) } else { Ok(None) } @@ -1277,7 +1335,7 @@ where room_id: RoomId, connection_id: ConnectionId, location: proto::ParticipantLocation, - ) -> Result { + ) -> Result> { self.transact(|tx| async { let mut tx = tx; let location_kind; @@ -1317,8 +1375,7 @@ where .await?; let room = self.get_room(room_id, &mut tx).await?; - tx.commit().await?; - Ok(room) + self.commit_room_transaction(room_id, tx, room).await }) .await } @@ -1478,7 +1535,7 @@ where expected_room_id: RoomId, connection_id: ConnectionId, worktrees: &[proto::WorktreeMetadata], - ) -> Result<(ProjectId, proto::Room)> { + ) -> Result> { self.transact(|mut tx| async move { let (room_id, user_id) = sqlx::query_as::<_, (RoomId, UserId)>( " @@ -1560,9 +1617,8 @@ where .await?; let room = self.get_room(room_id, &mut tx).await?; - tx.commit().await?; - - Ok((project_id, room)) + self.commit_room_transaction(room_id, tx, (project_id, room)) + .await }) .await } @@ -1571,7 +1627,7 @@ where &self, project_id: ProjectId, connection_id: ConnectionId, - ) -> Result<(proto::Room, Vec)> { + ) -> Result)>> { self.transact(|mut tx| async move { let guest_connection_ids = self.get_guest_connection_ids(project_id, &mut tx).await?; let room_id: RoomId = sqlx::query_scalar( @@ -1586,9 +1642,8 @@ where .fetch_one(&mut tx) .await?; let room = self.get_room(room_id, &mut tx).await?; - tx.commit().await?; - - Ok((room, guest_connection_ids)) + self.commit_room_transaction(room_id, tx, (room, guest_connection_ids)) + .await }) .await } @@ -1598,7 +1653,7 @@ where project_id: ProjectId, connection_id: ConnectionId, worktrees: &[proto::WorktreeMetadata], - ) -> Result<(proto::Room, Vec)> { + ) -> Result)>> { self.transact(|mut tx| async move { let room_id: RoomId = sqlx::query_scalar( " @@ -1664,9 +1719,8 @@ where let guest_connection_ids = self.get_guest_connection_ids(project_id, &mut tx).await?; let room = self.get_room(room_id, &mut tx).await?; - tx.commit().await?; - - Ok((room, guest_connection_ids)) + self.commit_room_transaction(room_id, tx, (room, guest_connection_ids)) + .await }) .await } @@ -1675,15 +1729,15 @@ where &self, update: &proto::UpdateWorktree, connection_id: ConnectionId, - ) -> Result> { + ) -> Result>> { self.transact(|mut tx| async move { let project_id = ProjectId::from_proto(update.project_id); let worktree_id = WorktreeId::from_proto(update.worktree_id); // Ensure the update comes from the host. - sqlx::query( + let room_id: RoomId = sqlx::query_scalar( " - SELECT 1 + SELECT room_id FROM projects WHERE id = $1 AND host_connection_id = $2 ", @@ -1781,8 +1835,8 @@ where } let connection_ids = self.get_guest_connection_ids(project_id, &mut tx).await?; - tx.commit().await?; - Ok(connection_ids) + self.commit_room_transaction(room_id, tx, connection_ids) + .await }) .await } @@ -1791,7 +1845,7 @@ where &self, update: &proto::UpdateDiagnosticSummary, connection_id: ConnectionId, - ) -> Result> { + ) -> Result>> { self.transact(|mut tx| async { let project_id = ProjectId::from_proto(update.project_id); let worktree_id = WorktreeId::from_proto(update.worktree_id); @@ -1801,9 +1855,9 @@ where .ok_or_else(|| anyhow!("invalid summary"))?; // Ensure the update comes from the host. - sqlx::query( + let room_id: RoomId = sqlx::query_scalar( " - SELECT 1 + SELECT room_id FROM projects WHERE id = $1 AND host_connection_id = $2 ", @@ -1841,8 +1895,8 @@ where .await?; let connection_ids = self.get_guest_connection_ids(project_id, &mut tx).await?; - tx.commit().await?; - Ok(connection_ids) + self.commit_room_transaction(room_id, tx, connection_ids) + .await }) .await } @@ -1851,7 +1905,7 @@ where &self, update: &proto::StartLanguageServer, connection_id: ConnectionId, - ) -> Result> { + ) -> Result>> { self.transact(|mut tx| async { let project_id = ProjectId::from_proto(update.project_id); let server = update @@ -1860,9 +1914,9 @@ where .ok_or_else(|| anyhow!("invalid language server"))?; // Ensure the update comes from the host. - sqlx::query( + let room_id: RoomId = sqlx::query_scalar( " - SELECT 1 + SELECT room_id FROM projects WHERE id = $1 AND host_connection_id = $2 ", @@ -1888,8 +1942,8 @@ where .await?; let connection_ids = self.get_guest_connection_ids(project_id, &mut tx).await?; - tx.commit().await?; - Ok(connection_ids) + self.commit_room_transaction(room_id, tx, connection_ids) + .await }) .await } @@ -1898,7 +1952,7 @@ where &self, project_id: ProjectId, connection_id: ConnectionId, - ) -> Result<(Project, ReplicaId)> { + ) -> Result> { self.transact(|mut tx| async move { let (room_id, user_id) = sqlx::query_as::<_, (RoomId, UserId)>( " @@ -2068,21 +2122,25 @@ where .fetch_all(&mut tx) .await?; - tx.commit().await?; - Ok(( - Project { - collaborators, - worktrees, - language_servers: language_servers - .into_iter() - .map(|language_server| proto::LanguageServer { - id: language_server.id.to_proto(), - name: language_server.name, - }) - .collect(), - }, - replica_id as ReplicaId, - )) + self.commit_room_transaction( + room_id, + tx, + ( + Project { + collaborators, + worktrees, + language_servers: language_servers + .into_iter() + .map(|language_server| proto::LanguageServer { + id: language_server.id.to_proto(), + name: language_server.name, + }) + .collect(), + }, + replica_id as ReplicaId, + ), + ) + .await }) .await } @@ -2091,7 +2149,7 @@ where &self, project_id: ProjectId, connection_id: ConnectionId, - ) -> Result { + ) -> Result> { self.transact(|mut tx| async move { let result = sqlx::query( " @@ -2122,25 +2180,29 @@ where .map(|id| ConnectionId(id as u32)) .collect(); - let (host_user_id, host_connection_id) = sqlx::query_as::<_, (i32, i32)>( - " - SELECT host_user_id, host_connection_id + let (room_id, host_user_id, host_connection_id) = + sqlx::query_as::<_, (RoomId, i32, i32)>( + " + SELECT room_id, host_user_id, host_connection_id FROM projects WHERE id = $1 ", + ) + .bind(project_id) + .fetch_one(&mut tx) + .await?; + + self.commit_room_transaction( + room_id, + tx, + LeftProject { + id: project_id, + host_user_id: UserId(host_user_id), + host_connection_id: ConnectionId(host_connection_id as u32), + connection_ids, + }, ) - .bind(project_id) - .fetch_one(&mut tx) - .await?; - - tx.commit().await?; - - Ok(LeftProject { - id: project_id, - host_user_id: UserId(host_user_id), - host_connection_id: ConnectionId(host_connection_id as u32), - connection_ids, - }) + .await }) .await } @@ -2538,9 +2600,9 @@ where let result = self.runtime.as_ref().unwrap().block_on(body); - if let Some(background) = self.background.as_ref() { - background.simulate_random_delay().await; - } + // if let Some(background) = self.background.as_ref() { + // background.simulate_random_delay().await; + // } result } diff --git a/crates/collab/src/rpc.rs b/crates/collab/src/rpc.rs index ba97b09acd..07b9891480 100644 --- a/crates/collab/src/rpc.rs +++ b/crates/collab/src/rpc.rs @@ -42,6 +42,7 @@ use std::{ fmt, future::Future, marker::PhantomData, + mem, net::SocketAddr, ops::{Deref, DerefMut}, rc::Rc, @@ -702,20 +703,15 @@ async fn create_room( response: Response, session: Session, ) -> Result<()> { - let room = session - .db() - .await - .create_room(session.user_id, session.connection_id) - .await?; - + let live_kit_room = nanoid::nanoid!(30); let live_kit_connection_info = if let Some(live_kit) = session.live_kit_client.as_ref() { if let Some(_) = live_kit - .create_room(room.live_kit_room.clone()) + .create_room(live_kit_room.clone()) .await .trace_err() { if let Some(token) = live_kit - .room_token(&room.live_kit_room, &session.connection_id.to_string()) + .room_token(&live_kit_room, &session.connection_id.to_string()) .trace_err() { Some(proto::LiveKitConnectionInfo { @@ -732,10 +728,19 @@ async fn create_room( None }; - response.send(proto::CreateRoomResponse { - room: Some(room), - live_kit_connection_info, - })?; + { + let room = session + .db() + .await + .create_room(session.user_id, session.connection_id, &live_kit_room) + .await?; + + response.send(proto::CreateRoomResponse { + room: Some(room.clone()), + live_kit_connection_info, + })?; + } + update_user_contacts(session.user_id, &session).await?; Ok(()) } @@ -745,15 +750,20 @@ async fn join_room( response: Response, session: Session, ) -> Result<()> { - let room = session - .db() - .await - .join_room( - RoomId::from_proto(request.id), - session.user_id, - session.connection_id, - ) - .await?; + let room = { + let room = session + .db() + .await + .join_room( + RoomId::from_proto(request.id), + session.user_id, + session.connection_id, + ) + .await?; + room_updated(&room, &session); + room.clone() + }; + for connection_id in session .connection_pool() .await @@ -781,7 +791,6 @@ async fn join_room( None }; - room_updated(&room, &session); response.send(proto::JoinRoomResponse { room: Some(room), live_kit_connection_info, @@ -814,18 +823,21 @@ async fn call( return Err(anyhow!("cannot call a user who isn't a contact"))?; } - let (room, incoming_call) = session - .db() - .await - .call( - room_id, - calling_user_id, - calling_connection_id, - called_user_id, - initial_project_id, - ) - .await?; - room_updated(&room, &session); + let incoming_call = { + let (room, incoming_call) = &mut *session + .db() + .await + .call( + room_id, + calling_user_id, + calling_connection_id, + called_user_id, + initial_project_id, + ) + .await?; + room_updated(&room, &session); + mem::take(incoming_call) + }; update_user_contacts(called_user_id, &session).await?; let mut calls = session @@ -847,12 +859,14 @@ async fn call( } } - let room = session - .db() - .await - .call_failed(room_id, called_user_id) - .await?; - room_updated(&room, &session); + { + let room = session + .db() + .await + .call_failed(room_id, called_user_id) + .await?; + room_updated(&room, &session); + } update_user_contacts(called_user_id, &session).await?; Err(anyhow!("failed to ring user"))? @@ -865,11 +879,15 @@ async fn cancel_call( ) -> Result<()> { let called_user_id = UserId::from_proto(request.called_user_id); let room_id = RoomId::from_proto(request.room_id); - let room = session - .db() - .await - .cancel_call(Some(room_id), session.connection_id, called_user_id) - .await?; + { + let room = session + .db() + .await + .cancel_call(Some(room_id), session.connection_id, called_user_id) + .await?; + room_updated(&room, &session); + } + for connection_id in session .connection_pool() .await @@ -880,7 +898,6 @@ async fn cancel_call( .send(connection_id, proto::CallCanceled {}) .trace_err(); } - room_updated(&room, &session); response.send(proto::Ack {})?; update_user_contacts(called_user_id, &session).await?; @@ -889,11 +906,15 @@ async fn cancel_call( async fn decline_call(message: proto::DeclineCall, session: Session) -> Result<()> { let room_id = RoomId::from_proto(message.room_id); - let room = session - .db() - .await - .decline_call(Some(room_id), session.user_id) - .await?; + { + let room = session + .db() + .await + .decline_call(Some(room_id), session.user_id) + .await?; + room_updated(&room, &session); + } + for connection_id in session .connection_pool() .await @@ -904,7 +925,6 @@ async fn decline_call(message: proto::DeclineCall, session: Session) -> Result<( .send(connection_id, proto::CallCanceled {}) .trace_err(); } - room_updated(&room, &session); update_user_contacts(session.user_id, &session).await?; Ok(()) } @@ -933,7 +953,7 @@ async fn share_project( response: Response, session: Session, ) -> Result<()> { - let (project_id, room) = session + let (project_id, room) = &*session .db() .await .share_project( @@ -953,15 +973,17 @@ async fn share_project( async fn unshare_project(message: proto::UnshareProject, session: Session) -> Result<()> { let project_id = ProjectId::from_proto(message.project_id); - let (room, guest_connection_ids) = session + let (room, guest_connection_ids) = &*session .db() .await .unshare_project(project_id, session.connection_id) .await?; - broadcast(session.connection_id, guest_connection_ids, |conn_id| { - session.peer.send(conn_id, message.clone()) - }); + broadcast( + session.connection_id, + guest_connection_ids.iter().copied(), + |conn_id| session.peer.send(conn_id, message.clone()), + ); room_updated(&room, &session); Ok(()) @@ -977,7 +999,7 @@ async fn join_project( tracing::info!(%project_id, "join project"); - let (project, replica_id) = session + let (project, replica_id) = &mut *session .db() .await .join_project(project_id, session.connection_id) @@ -1029,7 +1051,7 @@ async fn join_project( language_servers: project.language_servers.clone(), })?; - for (worktree_id, worktree) in project.worktrees { + for (worktree_id, worktree) in mem::take(&mut project.worktrees) { #[cfg(any(test, feature = "test-support"))] const MAX_CHUNK_SIZE: usize = 2; #[cfg(not(any(test, feature = "test-support")))] @@ -1084,21 +1106,23 @@ async fn join_project( async fn leave_project(request: proto::LeaveProject, session: Session) -> Result<()> { let sender_id = session.connection_id; let project_id = ProjectId::from_proto(request.project_id); - let project; - { - project = session - .db() - .await - .leave_project(project_id, sender_id) - .await?; - tracing::info!( - %project_id, - host_user_id = %project.host_user_id, - host_connection_id = %project.host_connection_id, - "leave project" - ); - broadcast(sender_id, project.connection_ids, |conn_id| { + let project = session + .db() + .await + .leave_project(project_id, sender_id) + .await?; + tracing::info!( + %project_id, + host_user_id = %project.host_user_id, + host_connection_id = %project.host_connection_id, + "leave project" + ); + + broadcast( + sender_id, + project.connection_ids.iter().copied(), + |conn_id| { session.peer.send( conn_id, proto::RemoveProjectCollaborator { @@ -1106,8 +1130,8 @@ async fn leave_project(request: proto::LeaveProject, session: Session) -> Result peer_id: sender_id.0, }, ) - }); - } + }, + ); Ok(()) } @@ -1118,14 +1142,14 @@ async fn update_project( session: Session, ) -> Result<()> { let project_id = ProjectId::from_proto(request.project_id); - let (room, guest_connection_ids) = session + let (room, guest_connection_ids) = &*session .db() .await .update_project(project_id, session.connection_id, &request.worktrees) .await?; broadcast( session.connection_id, - guest_connection_ids, + guest_connection_ids.iter().copied(), |connection_id| { session .peer @@ -1151,7 +1175,7 @@ async fn update_worktree( broadcast( session.connection_id, - guest_connection_ids, + guest_connection_ids.iter().copied(), |connection_id| { session .peer @@ -1175,7 +1199,7 @@ async fn update_diagnostic_summary( broadcast( session.connection_id, - guest_connection_ids, + guest_connection_ids.iter().copied(), |connection_id| { session .peer @@ -1199,7 +1223,7 @@ async fn start_language_server( broadcast( session.connection_id, - guest_connection_ids, + guest_connection_ids.iter().copied(), |connection_id| { session .peer @@ -1826,52 +1850,61 @@ async fn update_user_contacts(user_id: UserId, session: &Session) -> Result<()> async fn leave_room_for_session(session: &Session) -> Result<()> { let mut contacts_to_update = HashSet::default(); - let Some(left_room) = session.db().await.leave_room(session.connection_id).await? else { - return Err(anyhow!("no room to leave"))?; - }; - contacts_to_update.insert(session.user_id); + let canceled_calls_to_user_ids; + let live_kit_room; + let delete_live_kit_room; + { + let Some(mut left_room) = session.db().await.leave_room(session.connection_id).await? else { + return Err(anyhow!("no room to leave"))?; + }; + contacts_to_update.insert(session.user_id); - for project in left_room.left_projects.into_values() { - for connection_id in project.connection_ids { - if project.host_user_id == session.user_id { - session - .peer - .send( - connection_id, - proto::UnshareProject { - project_id: project.id.to_proto(), - }, - ) - .trace_err(); - } else { - session - .peer - .send( - connection_id, - proto::RemoveProjectCollaborator { - project_id: project.id.to_proto(), - peer_id: session.connection_id.0, - }, - ) - .trace_err(); + for project in left_room.left_projects.values() { + for connection_id in &project.connection_ids { + if project.host_user_id == session.user_id { + session + .peer + .send( + *connection_id, + proto::UnshareProject { + project_id: project.id.to_proto(), + }, + ) + .trace_err(); + } else { + session + .peer + .send( + *connection_id, + proto::RemoveProjectCollaborator { + project_id: project.id.to_proto(), + peer_id: session.connection_id.0, + }, + ) + .trace_err(); + } } + + session + .peer + .send( + session.connection_id, + proto::UnshareProject { + project_id: project.id.to_proto(), + }, + ) + .trace_err(); } - session - .peer - .send( - session.connection_id, - proto::UnshareProject { - project_id: project.id.to_proto(), - }, - ) - .trace_err(); + room_updated(&left_room.room, &session); + canceled_calls_to_user_ids = mem::take(&mut left_room.canceled_calls_to_user_ids); + live_kit_room = mem::take(&mut left_room.room.live_kit_room); + delete_live_kit_room = left_room.room.participants.is_empty(); } - room_updated(&left_room.room, &session); { let pool = session.connection_pool().await; - for canceled_user_id in left_room.canceled_calls_to_user_ids { + for canceled_user_id in canceled_calls_to_user_ids { for connection_id in pool.user_connection_ids(canceled_user_id) { session .peer @@ -1888,18 +1921,12 @@ async fn leave_room_for_session(session: &Session) -> Result<()> { if let Some(live_kit) = session.live_kit_client.as_ref() { live_kit - .remove_participant( - left_room.room.live_kit_room.clone(), - session.connection_id.to_string(), - ) + .remove_participant(live_kit_room.clone(), session.connection_id.to_string()) .await .trace_err(); - if left_room.room.participants.is_empty() { - live_kit - .delete_room(left_room.room.live_kit_room) - .await - .trace_err(); + if delete_live_kit_room { + live_kit.delete_room(live_kit_room).await.trace_err(); } }