diff --git a/crates/collab/src/db.rs b/crates/collab/src/db.rs index 30049f2d05..bb1bff7ff8 100644 --- a/crates/collab/src/db.rs +++ b/crates/collab/src/db.rs @@ -987,32 +987,22 @@ impl Database { initial_project_id: Option, ) -> Result> { self.transact(|tx| async move { - todo!() - // sqlx::query( - // " - // INSERT INTO room_participants ( - // room_id, - // user_id, - // calling_user_id, - // calling_connection_id, - // initial_project_id - // ) - // VALUES ($1, $2, $3, $4, $5) - // ", - // ) - // .bind(room_id) - // .bind(called_user_id) - // .bind(calling_user_id) - // .bind(calling_connection_id.0 as i32) - // .bind(initial_project_id) - // .execute(&mut tx) - // .await?; + room_participant::ActiveModel { + room_id: ActiveValue::set(room_id), + user_id: ActiveValue::set(called_user_id), + calling_user_id: ActiveValue::set(calling_user_id), + calling_connection_id: ActiveValue::set(calling_connection_id.0 as i32), + initial_project_id: ActiveValue::set(initial_project_id), + ..Default::default() + } + .insert(&tx) + .await?; - // let room = self.get_room(room_id, &mut tx).await?; - // let incoming_call = Self::build_incoming_call(&room, called_user_id) - // .ok_or_else(|| anyhow!("failed to build incoming call"))?; - // self.commit_room_transaction(room_id, tx, (room, incoming_call)) - // .await + let room = self.get_room(room_id, &tx).await?; + let incoming_call = Self::build_incoming_call(&room, called_user_id) + .ok_or_else(|| anyhow!("failed to build incoming call"))?; + self.commit_room_transaction(room_id, tx, (room, incoming_call)) + .await }) .await } @@ -1023,20 +1013,16 @@ impl Database { called_user_id: UserId, ) -> Result> { self.transact(|tx| async move { - todo!() - // sqlx::query( - // " - // DELETE FROM room_participants - // WHERE room_id = $1 AND user_id = $2 - // ", - // ) - // .bind(room_id) - // .bind(called_user_id) - // .execute(&mut tx) - // .await?; - - // let room = self.get_room(room_id, &mut tx).await?; - // self.commit_room_transaction(room_id, tx, room).await + room_participant::Entity::delete_many() + .filter( + room_participant::Column::RoomId + .eq(room_id) + .and(room_participant::Column::UserId.eq(called_user_id)), + ) + .exec(&tx) + .await?; + let room = self.get_room(room_id, &tx).await?; + self.commit_room_transaction(room_id, tx, room).await }) .await } @@ -1047,23 +1033,27 @@ impl Database { user_id: UserId, ) -> Result> { self.transact(|tx| async move { - todo!() - // let room_id = sqlx::query_scalar( - // " - // DELETE FROM room_participants - // WHERE user_id = $1 AND answering_connection_id IS NULL - // RETURNING room_id - // ", - // ) - // .bind(user_id) - // .fetch_one(&mut tx) - // .await?; - // if expected_room_id.map_or(false, |expected_room_id| expected_room_id != room_id) { - // return Err(anyhow!("declining call on unexpected room"))?; - // } + let participant = room_participant::Entity::find() + .filter( + room_participant::Column::UserId + .eq(user_id) + .and(room_participant::Column::AnsweringConnectionId.is_null()), + ) + .one(&tx) + .await? + .ok_or_else(|| anyhow!("could not decline call"))?; + let room_id = participant.room_id; - // let room = self.get_room(room_id, &mut tx).await?; - // self.commit_room_transaction(room_id, tx, room).await + if expected_room_id.map_or(false, |expected_room_id| expected_room_id != room_id) { + return Err(anyhow!("declining call on unexpected room"))?; + } + + room_participant::Entity::delete(participant.into_active_model()) + .exec(&tx) + .await?; + + let room = self.get_room(room_id, &tx).await?; + self.commit_room_transaction(room_id, tx, room).await }) .await } @@ -1075,24 +1065,30 @@ impl Database { called_user_id: UserId, ) -> Result> { self.transact(|tx| async move { - todo!() - // let room_id = sqlx::query_scalar( - // " - // DELETE FROM room_participants - // WHERE user_id = $1 AND calling_connection_id = $2 AND answering_connection_id IS NULL - // RETURNING room_id - // ", - // ) - // .bind(called_user_id) - // .bind(calling_connection_id.0 as i32) - // .fetch_one(&mut tx) - // .await?; - // if expected_room_id.map_or(false, |expected_room_id| expected_room_id != room_id) { - // return Err(anyhow!("canceling call on unexpected room"))?; - // } + let participant = room_participant::Entity::find() + .filter( + room_participant::Column::UserId + .eq(called_user_id) + .and( + room_participant::Column::CallingConnectionId + .eq(calling_connection_id.0 as i32), + ) + .and(room_participant::Column::AnsweringConnectionId.is_null()), + ) + .one(&tx) + .await? + .ok_or_else(|| anyhow!("could not cancel call"))?; + let room_id = participant.room_id; + if expected_room_id.map_or(false, |expected_room_id| expected_room_id != room_id) { + return Err(anyhow!("canceling call on unexpected room"))?; + } - // let room = self.get_room(room_id, &mut tx).await?; - // self.commit_room_transaction(room_id, tx, room).await + room_participant::Entity::delete(participant.into_active_model()) + .exec(&tx) + .await?; + + let room = self.get_room(room_id, &tx).await?; + self.commit_room_transaction(room_id, tx, room).await }) .await } @@ -1104,23 +1100,25 @@ impl Database { connection_id: ConnectionId, ) -> Result> { self.transact(|tx| async move { - todo!() - // sqlx::query( - // " - // UPDATE room_participants - // SET answering_connection_id = $1 - // WHERE room_id = $2 AND user_id = $3 - // RETURNING 1 - // ", - // ) - // .bind(connection_id.0 as i32) - // .bind(room_id) - // .bind(user_id) - // .fetch_one(&mut tx) - // .await?; - - // let room = self.get_room(room_id, &mut tx).await?; - // self.commit_room_transaction(room_id, tx, room).await + let result = room_participant::Entity::update_many() + .filter( + room_participant::Column::RoomId + .eq(room_id) + .and(room_participant::Column::UserId.eq(user_id)) + .and(room_participant::Column::AnsweringConnectionId.is_null()), + ) + .col_expr( + room_participant::Column::AnsweringConnectionId, + connection_id.0.into(), + ) + .exec(&tx) + .await?; + if result.rows_affected == 0 { + Err(anyhow!("room does not exist or was already joined"))? + } else { + let room = self.get_room(room_id, &tx).await?; + self.commit_room_transaction(room_id, tx, room).await + } }) .await } @@ -1130,124 +1128,117 @@ impl Database { connection_id: ConnectionId, ) -> Result>> { self.transact(|tx| async move { - todo!() - // // Leave room. - // let room_id = sqlx::query_scalar::<_, RoomId>( - // " - // DELETE FROM room_participants - // WHERE answering_connection_id = $1 - // RETURNING room_id - // ", - // ) - // .bind(connection_id.0 as i32) - // .fetch_optional(&mut tx) - // .await?; + let leaving_participant = room_participant::Entity::find() + .filter(room_participant::Column::AnsweringConnectionId.eq(connection_id.0)) + .one(&tx) + .await?; - // if let Some(room_id) = room_id { - // // Cancel pending calls initiated by the leaving user. - // let canceled_calls_to_user_ids: Vec = sqlx::query_scalar( - // " - // DELETE FROM room_participants - // WHERE calling_connection_id = $1 AND answering_connection_id IS NULL - // RETURNING user_id - // ", - // ) - // .bind(connection_id.0 as i32) - // .fetch_all(&mut tx) - // .await?; + if let Some(leaving_participant) = leaving_participant { + // Leave room. + let room_id = leaving_participant.room_id; + room_participant::Entity::delete_by_id(leaving_participant.id) + .exec(&tx) + .await?; - // let project_ids = sqlx::query_scalar::<_, ProjectId>( - // " - // SELECT project_id - // FROM project_collaborators - // WHERE connection_id = $1 - // ", - // ) - // .bind(connection_id.0 as i32) - // .fetch_all(&mut tx) - // .await?; + // Cancel pending calls initiated by the leaving user. + let called_participants = room_participant::Entity::find() + .filter( + room_participant::Column::CallingConnectionId + .eq(connection_id.0) + .and(room_participant::Column::AnsweringConnectionId.is_null()), + ) + .all(&tx) + .await?; + room_participant::Entity::delete_many() + .filter( + room_participant::Column::Id + .is_in(called_participants.iter().map(|participant| participant.id)), + ) + .exec(&tx) + .await?; + let canceled_calls_to_user_ids = called_participants + .into_iter() + .map(|participant| participant.user_id) + .collect(); - // // Leave projects. - // let mut left_projects = HashMap::default(); - // if !project_ids.is_empty() { - // let mut params = "?,".repeat(project_ids.len()); - // params.pop(); - // let query = format!( - // " - // SELECT * - // FROM project_collaborators - // WHERE project_id IN ({params}) - // " - // ); - // let mut query = sqlx::query_as::<_, ProjectCollaborator>(&query); - // for project_id in project_ids { - // query = query.bind(project_id); - // } + // Detect left projects. + #[derive(Copy, Clone, Debug, EnumIter, DeriveColumn)] + enum QueryProjectIds { + ProjectId, + } + let project_ids: Vec = project_collaborator::Entity::find() + .select_only() + .column_as( + project_collaborator::Column::ProjectId, + QueryProjectIds::ProjectId, + ) + .filter(project_collaborator::Column::ConnectionId.eq(connection_id.0)) + .into_values::<_, QueryProjectIds>() + .all(&tx) + .await?; + let mut left_projects = HashMap::default(); + let mut collaborators = project_collaborator::Entity::find() + .filter(project_collaborator::Column::ProjectId.is_in(project_ids)) + .stream(&tx) + .await?; + while let Some(collaborator) = collaborators.next().await { + let collaborator = collaborator?; + let left_project = + left_projects + .entry(collaborator.project_id) + .or_insert(LeftProject { + id: collaborator.project_id, + host_user_id: Default::default(), + connection_ids: Default::default(), + host_connection_id: Default::default(), + }); - // let mut project_collaborators = query.fetch(&mut tx); - // while let Some(collaborator) = project_collaborators.next().await { - // let collaborator = collaborator?; - // let left_project = - // left_projects - // .entry(collaborator.project_id) - // .or_insert(LeftProject { - // id: collaborator.project_id, - // host_user_id: Default::default(), - // connection_ids: Default::default(), - // host_connection_id: Default::default(), - // }); + let collaborator_connection_id = + ConnectionId(collaborator.connection_id as u32); + if collaborator_connection_id != connection_id { + left_project.connection_ids.push(collaborator_connection_id); + } - // let collaborator_connection_id = - // ConnectionId(collaborator.connection_id as u32); - // if collaborator_connection_id != connection_id { - // left_project.connection_ids.push(collaborator_connection_id); - // } + if collaborator.is_host { + left_project.host_user_id = collaborator.user_id; + left_project.host_connection_id = + ConnectionId(collaborator.connection_id as u32); + } + } + drop(collaborators); - // if collaborator.is_host { - // left_project.host_user_id = collaborator.user_id; - // left_project.host_connection_id = - // ConnectionId(collaborator.connection_id as u32); - // } - // } - // } - // sqlx::query( - // " - // DELETE FROM project_collaborators - // WHERE connection_id = $1 - // ", - // ) - // .bind(connection_id.0 as i32) - // .execute(&mut tx) - // .await?; + // Leave projects. + project_collaborator::Entity::delete_many() + .filter(project_collaborator::Column::ConnectionId.eq(connection_id.0)) + .exec(&tx) + .await?; - // // Unshare projects. - // sqlx::query( - // " - // DELETE FROM projects - // WHERE room_id = $1 AND host_connection_id = $2 - // ", - // ) - // .bind(room_id) - // .bind(connection_id.0 as i32) - // .execute(&mut tx) - // .await?; + // Unshare projects. + project::Entity::delete_many() + .filter( + project::Column::RoomId + .eq(room_id) + .and(project::Column::HostConnectionId.eq(connection_id.0)), + ) + .exec(&tx) + .await?; - // let room = self.get_room(room_id, &mut tx).await?; - // Ok(Some( - // self.commit_room_transaction( - // room_id, - // tx, - // LeftRoom { - // room, - // left_projects, - // canceled_calls_to_user_ids, - // }, - // ) - // .await?, - // )) - // } else { - // Ok(None) - // } + let room = self.get_room(room_id, &tx).await?; + Ok(Some( + self.commit_room_transaction( + room_id, + tx, + LeftRoom { + room, + left_projects, + canceled_calls_to_user_ids, + }, + ) + .await?, + )) + } else { + Ok(None) + } }) .await } @@ -1259,46 +1250,48 @@ impl Database { location: proto::ParticipantLocation, ) -> Result> { self.transact(|tx| async { - todo!() - // let mut tx = tx; - // let location_kind; - // let location_project_id; - // match location - // .variant - // .as_ref() - // .ok_or_else(|| anyhow!("invalid location"))? - // { - // proto::participant_location::Variant::SharedProject(project) => { - // location_kind = 0; - // location_project_id = Some(ProjectId::from_proto(project.id)); - // } - // proto::participant_location::Variant::UnsharedProject(_) => { - // location_kind = 1; - // location_project_id = None; - // } - // proto::participant_location::Variant::External(_) => { - // location_kind = 2; - // location_project_id = None; - // } - // } + let mut tx = tx; + let location_kind; + let location_project_id; + match location + .variant + .as_ref() + .ok_or_else(|| anyhow!("invalid location"))? + { + proto::participant_location::Variant::SharedProject(project) => { + location_kind = 0; + location_project_id = Some(ProjectId::from_proto(project.id)); + } + proto::participant_location::Variant::UnsharedProject(_) => { + location_kind = 1; + location_project_id = None; + } + proto::participant_location::Variant::External(_) => { + location_kind = 2; + location_project_id = None; + } + } - // sqlx::query( - // " - // UPDATE room_participants - // SET location_kind = $1, location_project_id = $2 - // WHERE room_id = $3 AND answering_connection_id = $4 - // RETURNING 1 - // ", - // ) - // .bind(location_kind) - // .bind(location_project_id) - // .bind(room_id) - // .bind(connection_id.0 as i32) - // .fetch_one(&mut tx) - // .await?; + let result = room_participant::Entity::update_many() + .filter( + room_participant::Column::RoomId + .eq(room_id) + .and(room_participant::Column::AnsweringConnectionId.eq(connection_id.0)), + ) + .set(room_participant::ActiveModel { + location_kind: ActiveValue::set(Some(location_kind)), + location_project_id: ActiveValue::set(location_project_id), + ..Default::default() + }) + .exec(&tx) + .await?; - // let room = self.get_room(room_id, &mut tx).await?; - // self.commit_room_transaction(room_id, tx, room).await + if result.rows_affected == 1 { + let room = self.get_room(room_id, &mut tx).await?; + self.commit_room_transaction(room_id, tx, room).await + } else { + Err(anyhow!("could not update room participant location"))? + } }) .await }