From 067a19c971ee97794a215201bb3b9c26f673b1ca Mon Sep 17 00:00:00 2001 From: Antonio Scandurra Date: Thu, 15 Dec 2022 10:45:03 +0100 Subject: [PATCH] Avoid logging an error when user who hasn't joined any room disconnects --- crates/collab/src/db.rs | 95 +++++++++++++++++++++++++++------------- crates/collab/src/rpc.rs | 12 ++--- 2 files changed, 72 insertions(+), 35 deletions(-) diff --git a/crates/collab/src/db.rs b/crates/collab/src/db.rs index 461fe06085..1b559a48d5 100644 --- a/crates/collab/src/db.rs +++ b/crates/collab/src/db.rs @@ -1238,36 +1238,41 @@ impl Database { &self, expected_room_id: Option, user_id: UserId, - ) -> Result> { - self.room_transaction(|tx| async move { - let participant = room_participant::Entity::find() - .filter( - room_participant::Column::UserId - .eq(user_id) - .and(room_participant::Column::AnsweringConnectionId.is_null()), - ) - .one(&*tx) - .await? - .ok_or_else(|| anyhow!("could not decline call"))?; - let room_id = participant.room_id; - - if expected_room_id.map_or(false, |expected_room_id| expected_room_id != room_id) { - return Err(anyhow!("declining call on unexpected room"))?; + ) -> Result>> { + self.optional_room_transaction(|tx| async move { + let mut filter = Condition::all() + .add(room_participant::Column::UserId.eq(user_id)) + .add(room_participant::Column::AnsweringConnectionId.is_null()); + if let Some(room_id) = expected_room_id { + filter = filter.add(room_participant::Column::RoomId.eq(room_id)); } + let participant = room_participant::Entity::find() + .filter(filter) + .one(&*tx) + .await?; + let participant = if let Some(participant) = participant { + participant + } else if expected_room_id.is_some() { + return Err(anyhow!("could not find call to decline"))?; + } else { + return Ok(None); + }; + + let room_id = participant.room_id; room_participant::Entity::delete(participant.into_active_model()) .exec(&*tx) .await?; let room = self.get_room(room_id, &tx).await?; - Ok((room_id, room)) + Ok(Some((room_id, room))) }) .await } pub async fn cancel_call( &self, - expected_room_id: Option, + room_id: RoomId, calling_connection: ConnectionId, called_user_id: UserId, ) -> Result> { @@ -1276,6 +1281,7 @@ impl Database { .filter( Condition::all() .add(room_participant::Column::UserId.eq(called_user_id)) + .add(room_participant::Column::RoomId.eq(room_id)) .add( room_participant::Column::CallingConnectionId .eq(calling_connection.id as i32), @@ -1288,11 +1294,8 @@ impl Database { ) .one(&*tx) .await? - .ok_or_else(|| anyhow!("could not cancel call"))?; + .ok_or_else(|| anyhow!("no call to cancel"))?; let room_id = participant.room_id; - if expected_room_id.map_or(false, |expected_room_id| expected_room_id != room_id) { - return Err(anyhow!("canceling call on unexpected room"))?; - } room_participant::Entity::delete(participant.into_active_model()) .exec(&*tx) @@ -1346,8 +1349,11 @@ impl Database { .await } - pub async fn leave_room(&self, connection: ConnectionId) -> Result> { - self.room_transaction(|tx| async move { + pub async fn leave_room( + &self, + connection: ConnectionId, + ) -> Result>> { + self.optional_room_transaction(|tx| async move { let leaving_participant = room_participant::Entity::find() .filter( Condition::all() @@ -1498,9 +1504,9 @@ impl Database { self.rooms.remove(&room_id); } - Ok((room_id, left_room)) + Ok(Some((room_id, left_room))) } else { - Err(anyhow!("could not leave room"))? + Ok(None) } }) .await @@ -2549,25 +2555,25 @@ impl Database { self.run(body).await } - async fn room_transaction(&self, f: F) -> Result> + async fn optional_room_transaction(&self, f: F) -> Result>> where F: Send + Fn(TransactionHandle) -> Fut, - Fut: Send + Future>, + Fut: Send + Future>>, { let body = async { loop { let (tx, result) = self.with_transaction(&f).await?; match result { - Ok((room_id, data)) => { + Ok(Some((room_id, data))) => { let lock = self.rooms.entry(room_id).or_default().clone(); let _guard = lock.lock_owned().await; match tx.commit().await.map_err(Into::into) { Ok(()) => { - return Ok(RoomGuard { + return Ok(Some(RoomGuard { data, _guard, _not_send: PhantomData, - }); + })); } Err(error) => { if is_serialization_error(&error) { @@ -2578,6 +2584,18 @@ impl Database { } } } + Ok(None) => { + match tx.commit().await.map_err(Into::into) { + Ok(()) => return Ok(None), + Err(error) => { + if is_serialization_error(&error) { + // Retry (don't break the loop) + } else { + return Err(error); + } + } + } + } Err(error) => { tx.rollback().await?; if is_serialization_error(&error) { @@ -2593,6 +2611,23 @@ impl Database { self.run(body).await } + async fn room_transaction(&self, f: F) -> Result> + where + F: Send + Fn(TransactionHandle) -> Fut, + Fut: Send + Future>, + { + let data = self + .optional_room_transaction(move |tx| { + let future = f(tx); + async { + let data = future.await?; + Ok(Some(data)) + } + }) + .await?; + Ok(data.unwrap()) + } + async fn with_transaction(&self, f: &F) -> Result<(DatabaseTransaction, Result)> where F: Send + Fn(TransactionHandle) -> Fut, diff --git a/crates/collab/src/rpc.rs b/crates/collab/src/rpc.rs index d83aa65a6b..74c221b82d 100644 --- a/crates/collab/src/rpc.rs +++ b/crates/collab/src/rpc.rs @@ -820,7 +820,7 @@ async fn sign_out( .is_user_online(session.user_id) { let db = session.db().await; - if let Some(room) = db.decline_call(None, session.user_id).await.trace_err() { + if let Some(room) = db.decline_call(None, session.user_id).await.trace_err().flatten() { room_updated(&room, &session.peer); } } @@ -1024,7 +1024,7 @@ async fn cancel_call( let room = session .db() .await - .cancel_call(Some(room_id), session.connection_id, called_user_id) + .cancel_call(room_id, session.connection_id, called_user_id) .await?; room_updated(&room, &session.peer); } @@ -1057,7 +1057,8 @@ async fn decline_call(message: proto::DeclineCall, session: Session) -> Result<( .db() .await .decline_call(Some(room_id), session.user_id) - .await?; + .await? + .ok_or_else(|| anyhow!("failed to decline call"))?; room_updated(&room, &session.peer); } @@ -2026,8 +2027,7 @@ async fn leave_room_for_session(session: &Session) -> Result<()> { let canceled_calls_to_user_ids; let live_kit_room; let delete_live_kit_room; - { - let mut left_room = session.db().await.leave_room(session.connection_id).await?; + if let Some(mut left_room) = session.db().await.leave_room(session.connection_id).await? { contacts_to_update.insert(session.user_id); for project in left_room.left_projects.values() { @@ -2039,6 +2039,8 @@ async fn leave_room_for_session(session: &Session) -> Result<()> { canceled_calls_to_user_ids = mem::take(&mut left_room.canceled_calls_to_user_ids); live_kit_room = mem::take(&mut left_room.room.live_kit_room); delete_live_kit_room = left_room.room.participants.is_empty(); + } else { + return Ok(()); } {