diff --git a/crates/collab/src/db.rs b/crates/collab/src/db.rs index b08b29890d..ef65e9a9bd 100644 --- a/crates/collab/src/db.rs +++ b/crates/collab/src/db.rs @@ -158,7 +158,7 @@ impl Database { room_id: RoomId, new_server_id: ServerId, ) -> Result> { - self.room_transaction(|tx| async move { + self.room_transaction(room_id, |tx| async move { let stale_participant_filter = Condition::all() .add(room_participant::Column::RoomId.eq(room_id)) .add(room_participant::Column::AnsweringConnectionId.is_not_null()) @@ -194,14 +194,11 @@ impl Database { room::Entity::delete_by_id(room_id).exec(&*tx).await?; } - Ok(( - room_id, - RefreshedRoom { - room, - stale_participant_user_ids, - canceled_calls_to_user_ids, - }, - )) + Ok(RefreshedRoom { + room, + stale_participant_user_ids, + canceled_calls_to_user_ids, + }) }) .await } @@ -1130,18 +1127,16 @@ impl Database { user_id: UserId, connection: ConnectionId, live_kit_room: &str, - ) -> Result> { - self.room_transaction(|tx| async move { + ) -> Result { + self.transaction(|tx| async move { let room = room::ActiveModel { live_kit_room: ActiveValue::set(live_kit_room.into()), ..Default::default() } .insert(&*tx) .await?; - let room_id = room.id; - room_participant::ActiveModel { - room_id: ActiveValue::set(room_id), + room_id: ActiveValue::set(room.id), user_id: ActiveValue::set(user_id), answering_connection_id: ActiveValue::set(Some(connection.id as i32)), answering_connection_server_id: ActiveValue::set(Some(ServerId( @@ -1158,8 +1153,8 @@ impl Database { .insert(&*tx) .await?; - let room = self.get_room(room_id, &tx).await?; - Ok((room_id, room)) + let room = self.get_room(room.id, &tx).await?; + Ok(room) }) .await } @@ -1172,7 +1167,7 @@ impl Database { called_user_id: UserId, initial_project_id: Option, ) -> Result> { - self.room_transaction(|tx| async move { + self.room_transaction(room_id, |tx| async move { room_participant::ActiveModel { room_id: ActiveValue::set(room_id), user_id: ActiveValue::set(called_user_id), @@ -1191,7 +1186,7 @@ impl Database { let room = self.get_room(room_id, &tx).await?; let incoming_call = Self::build_incoming_call(&room, called_user_id) .ok_or_else(|| anyhow!("failed to build incoming call"))?; - Ok((room_id, (room, incoming_call))) + Ok((room, incoming_call)) }) .await } @@ -1201,7 +1196,7 @@ impl Database { room_id: RoomId, called_user_id: UserId, ) -> Result> { - self.room_transaction(|tx| async move { + self.room_transaction(room_id, |tx| async move { room_participant::Entity::delete_many() .filter( room_participant::Column::RoomId @@ -1211,7 +1206,7 @@ impl Database { .exec(&*tx) .await?; let room = self.get_room(room_id, &tx).await?; - Ok((room_id, room)) + Ok(room) }) .await } @@ -1258,7 +1253,7 @@ impl Database { calling_connection: ConnectionId, called_user_id: UserId, ) -> Result> { - self.room_transaction(|tx| async move { + self.room_transaction(room_id, |tx| async move { let participant = room_participant::Entity::find() .filter( Condition::all() @@ -1277,14 +1272,13 @@ impl Database { .one(&*tx) .await? .ok_or_else(|| anyhow!("no call to cancel"))?; - let room_id = participant.room_id; room_participant::Entity::delete(participant.into_active_model()) .exec(&*tx) .await?; let room = self.get_room(room_id, &tx).await?; - Ok((room_id, room)) + Ok(room) }) .await } @@ -1295,7 +1289,7 @@ impl Database { user_id: UserId, connection: ConnectionId, ) -> Result> { - self.room_transaction(|tx| async move { + self.room_transaction(room_id, |tx| async move { let result = room_participant::Entity::update_many() .filter( Condition::all() @@ -1317,7 +1311,7 @@ impl Database { Err(anyhow!("room does not exist or was already joined"))? } else { let room = self.get_room(room_id, &tx).await?; - Ok((room_id, room)) + Ok(room) } }) .await @@ -1329,9 +1323,9 @@ impl Database { user_id: UserId, connection: ConnectionId, ) -> Result> { - self.room_transaction(|tx| async { + let room_id = RoomId::from_proto(rejoin_room.id); + self.room_transaction(room_id, |tx| async { let tx = tx; - let room_id = RoomId::from_proto(rejoin_room.id); let participant_update = room_participant::Entity::update_many() .filter( Condition::all() @@ -1550,14 +1544,11 @@ impl Database { } let room = self.get_room(room_id, &tx).await?; - Ok(( - room_id, - RejoinedRoom { - room, - rejoined_projects, - reshared_projects, - }, - )) + Ok(RejoinedRoom { + room, + rejoined_projects, + reshared_projects, + }) }) .await } @@ -1789,7 +1780,7 @@ impl Database { connection: ConnectionId, location: proto::ParticipantLocation, ) -> Result> { - self.room_transaction(|tx| async { + self.room_transaction(room_id, |tx| async { let tx = tx; let location_kind; let location_project_id; @@ -1835,7 +1826,7 @@ impl Database { if result.rows_affected == 1 { let room = self.get_room(room_id, &tx).await?; - Ok((room_id, room)) + Ok(room) } else { Err(anyhow!("could not update room participant location"))? } @@ -2042,7 +2033,7 @@ impl Database { connection: ConnectionId, worktrees: &[proto::WorktreeMetadata], ) -> Result> { - self.room_transaction(|tx| async move { + self.room_transaction(room_id, |tx| async move { let participant = room_participant::Entity::find() .filter( Condition::all() @@ -2103,7 +2094,7 @@ impl Database { .await?; let room = self.get_room(room_id, &tx).await?; - Ok((room_id, (project.id, room))) + Ok((project.id, room)) }) .await } @@ -2113,7 +2104,8 @@ impl Database { project_id: ProjectId, connection: ConnectionId, ) -> Result)>> { - self.room_transaction(|tx| async move { + let room_id = self.room_id_for_project(project_id).await?; + self.room_transaction(room_id, |tx| async move { let guest_connection_ids = self.project_guest_connection_ids(project_id, &tx).await?; let project = project::Entity::find_by_id(project_id) @@ -2121,12 +2113,11 @@ impl Database { .await? .ok_or_else(|| anyhow!("project not found"))?; if project.host_connection()? == connection { - let room_id = project.room_id; project::Entity::delete(project.into_active_model()) .exec(&*tx) .await?; let room = self.get_room(room_id, &tx).await?; - Ok((room_id, (room, guest_connection_ids))) + Ok((room, guest_connection_ids)) } else { Err(anyhow!("cannot unshare a project hosted by another user"))? } @@ -2140,7 +2131,8 @@ impl Database { connection: ConnectionId, worktrees: &[proto::WorktreeMetadata], ) -> Result)>> { - self.room_transaction(|tx| async move { + let room_id = self.room_id_for_project(project_id).await?; + self.room_transaction(room_id, |tx| async move { let project = project::Entity::find_by_id(project_id) .filter( Condition::all() @@ -2158,7 +2150,7 @@ impl Database { let guest_connection_ids = self.project_guest_connection_ids(project.id, &tx).await?; let room = self.get_room(project.room_id, &tx).await?; - Ok((project.room_id, (room, guest_connection_ids))) + Ok((room, guest_connection_ids)) }) .await } @@ -2203,12 +2195,12 @@ impl Database { update: &proto::UpdateWorktree, connection: ConnectionId, ) -> Result>> { - self.room_transaction(|tx| async move { - let project_id = ProjectId::from_proto(update.project_id); - let worktree_id = update.worktree_id as i64; - + let project_id = ProjectId::from_proto(update.project_id); + let worktree_id = update.worktree_id as i64; + let room_id = self.room_id_for_project(project_id).await?; + self.room_transaction(room_id, |tx| async move { // Ensure the update comes from the host. - let project = project::Entity::find_by_id(project_id) + let _project = project::Entity::find_by_id(project_id) .filter( Condition::all() .add(project::Column::HostConnectionId.eq(connection.id as i32)) @@ -2219,7 +2211,6 @@ impl Database { .one(&*tx) .await? .ok_or_else(|| anyhow!("no such project"))?; - let room_id = project.room_id; // Update metadata. worktree::Entity::update(worktree::ActiveModel { @@ -2299,7 +2290,7 @@ impl Database { } let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?; - Ok((room_id, connection_ids)) + Ok(connection_ids) }) .await } @@ -2309,9 +2300,10 @@ impl Database { update: &proto::UpdateDiagnosticSummary, connection: ConnectionId, ) -> Result>> { - self.room_transaction(|tx| async move { - let project_id = ProjectId::from_proto(update.project_id); - let worktree_id = update.worktree_id as i64; + let project_id = ProjectId::from_proto(update.project_id); + let worktree_id = update.worktree_id as i64; + let room_id = self.room_id_for_project(project_id).await?; + self.room_transaction(room_id, |tx| async move { let summary = update .summary .as_ref() @@ -2353,7 +2345,7 @@ impl Database { .await?; let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?; - Ok((project.room_id, connection_ids)) + Ok(connection_ids) }) .await } @@ -2363,8 +2355,9 @@ impl Database { update: &proto::StartLanguageServer, connection: ConnectionId, ) -> Result>> { - self.room_transaction(|tx| async move { - let project_id = ProjectId::from_proto(update.project_id); + let project_id = ProjectId::from_proto(update.project_id); + let room_id = self.room_id_for_project(project_id).await?; + self.room_transaction(room_id, |tx| async move { let server = update .server .as_ref() @@ -2398,7 +2391,7 @@ impl Database { .await?; let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?; - Ok((project.room_id, connection_ids)) + Ok(connection_ids) }) .await } @@ -2408,7 +2401,8 @@ impl Database { project_id: ProjectId, connection: ConnectionId, ) -> Result> { - self.room_transaction(|tx| async move { + let room_id = self.room_id_for_project(project_id).await?; + self.room_transaction(room_id, |tx| async move { let participant = room_participant::Entity::find() .filter( Condition::all() @@ -2534,7 +2528,6 @@ impl Database { .all(&*tx) .await?; - let room_id = project.room_id; let project = Project { collaborators: collaborators .into_iter() @@ -2554,7 +2547,7 @@ impl Database { }) .collect(), }; - Ok((room_id, (project, replica_id as ReplicaId))) + Ok((project, replica_id as ReplicaId)) }) .await } @@ -2564,7 +2557,8 @@ impl Database { project_id: ProjectId, connection: ConnectionId, ) -> Result> { - self.room_transaction(|tx| async move { + let room_id = self.room_id_for_project(project_id).await?; + self.room_transaction(room_id, |tx| async move { let result = project_collaborator::Entity::delete_many() .filter( Condition::all() @@ -2600,7 +2594,7 @@ impl Database { host_connection_id: project.host_connection()?, connection_ids, }; - Ok((project.room_id, left_project)) + Ok(left_project) }) .await } @@ -2610,11 +2604,8 @@ impl Database { project_id: ProjectId, connection_id: ConnectionId, ) -> Result>> { - self.room_transaction(|tx| async move { - let project = project::Entity::find_by_id(project_id) - .one(&*tx) - .await? - .ok_or_else(|| anyhow!("no such project"))?; + let room_id = self.room_id_for_project(project_id).await?; + self.room_transaction(room_id, |tx| async move { let collaborators = project_collaborator::Entity::find() .filter(project_collaborator::Column::ProjectId.eq(project_id)) .all(&*tx) @@ -2632,7 +2623,7 @@ impl Database { .iter() .any(|collaborator| collaborator.connection_id == connection_id) { - Ok((project.room_id, collaborators)) + Ok(collaborators) } else { Err(anyhow!("no such project"))? } @@ -2645,11 +2636,8 @@ impl Database { project_id: ProjectId, connection_id: ConnectionId, ) -> Result>> { - self.room_transaction(|tx| async move { - let project = project::Entity::find_by_id(project_id) - .one(&*tx) - .await? - .ok_or_else(|| anyhow!("no such project"))?; + let room_id = self.room_id_for_project(project_id).await?; + self.room_transaction(room_id, |tx| async move { let mut collaborators = project_collaborator::Entity::find() .filter(project_collaborator::Column::ProjectId.eq(project_id)) .stream(&*tx) @@ -2662,7 +2650,7 @@ impl Database { } if connection_ids.contains(&connection_id) { - Ok((project.room_id, connection_ids)) + Ok(connection_ids) } else { Err(anyhow!("no such project"))? } @@ -2692,6 +2680,17 @@ impl Database { Ok(guest_connection_ids) } + async fn room_id_for_project(&self, project_id: ProjectId) -> Result { + self.transaction(|tx| async move { + let project = project::Entity::find_by_id(project_id) + .one(&*tx) + .await? + .ok_or_else(|| anyhow!("project {} not found", project_id))?; + Ok(project.room_id) + }) + .await + } + // access tokens pub async fn create_access_token_hash( @@ -2842,21 +2841,48 @@ impl Database { self.run(body).await } - async fn room_transaction(&self, f: F) -> Result> + async fn room_transaction(&self, room_id: RoomId, f: F) -> Result> where F: Send + Fn(TransactionHandle) -> Fut, - Fut: Send + Future>, + Fut: Send + Future>, { - let data = self - .optional_room_transaction(move |tx| { - let future = f(tx); - async { - let data = future.await?; - Ok(Some(data)) + let body = async { + loop { + let lock = self.rooms.entry(room_id).or_default().clone(); + let _guard = lock.lock_owned().await; + let (tx, result) = self.with_transaction(&f).await?; + match result { + Ok(data) => { + match tx.commit().await.map_err(Into::into) { + Ok(()) => { + return Ok(RoomGuard { + data, + _guard, + _not_send: PhantomData, + }); + } + Err(error) => { + if is_serialization_error(&error) { + // Retry (don't break the loop) + } else { + return Err(error); + } + } + } + } + Err(error) => { + tx.rollback().await?; + if is_serialization_error(&error) { + // Retry (don't break the loop) + } else { + return Err(error); + } + } } - }) - .await?; - Ok(data.unwrap()) + } + }; + + self.run(body).await } async fn with_transaction(&self, f: &F) -> Result<(DatabaseTransaction, Result)>