mirror of
https://github.com/zed-industries/zed.git
synced 2025-01-24 11:01:54 +00:00
Merge pull request #2200 from zed-industries/fix-slow-project-join
Hold room lock through the entirety of a `room_transaction`
This commit is contained in:
parent
6161c5d310
commit
5c0d19d789
1 changed files with 112 additions and 86 deletions
|
@ -158,7 +158,7 @@ impl Database {
|
||||||
room_id: RoomId,
|
room_id: RoomId,
|
||||||
new_server_id: ServerId,
|
new_server_id: ServerId,
|
||||||
) -> Result<RoomGuard<RefreshedRoom>> {
|
) -> Result<RoomGuard<RefreshedRoom>> {
|
||||||
self.room_transaction(|tx| async move {
|
self.room_transaction(room_id, |tx| async move {
|
||||||
let stale_participant_filter = Condition::all()
|
let stale_participant_filter = Condition::all()
|
||||||
.add(room_participant::Column::RoomId.eq(room_id))
|
.add(room_participant::Column::RoomId.eq(room_id))
|
||||||
.add(room_participant::Column::AnsweringConnectionId.is_not_null())
|
.add(room_participant::Column::AnsweringConnectionId.is_not_null())
|
||||||
|
@ -194,14 +194,11 @@ impl Database {
|
||||||
room::Entity::delete_by_id(room_id).exec(&*tx).await?;
|
room::Entity::delete_by_id(room_id).exec(&*tx).await?;
|
||||||
}
|
}
|
||||||
|
|
||||||
Ok((
|
Ok(RefreshedRoom {
|
||||||
room_id,
|
room,
|
||||||
RefreshedRoom {
|
stale_participant_user_ids,
|
||||||
room,
|
canceled_calls_to_user_ids,
|
||||||
stale_participant_user_ids,
|
})
|
||||||
canceled_calls_to_user_ids,
|
|
||||||
},
|
|
||||||
))
|
|
||||||
})
|
})
|
||||||
.await
|
.await
|
||||||
}
|
}
|
||||||
|
@ -1130,18 +1127,16 @@ impl Database {
|
||||||
user_id: UserId,
|
user_id: UserId,
|
||||||
connection: ConnectionId,
|
connection: ConnectionId,
|
||||||
live_kit_room: &str,
|
live_kit_room: &str,
|
||||||
) -> Result<RoomGuard<proto::Room>> {
|
) -> Result<proto::Room> {
|
||||||
self.room_transaction(|tx| async move {
|
self.transaction(|tx| async move {
|
||||||
let room = room::ActiveModel {
|
let room = room::ActiveModel {
|
||||||
live_kit_room: ActiveValue::set(live_kit_room.into()),
|
live_kit_room: ActiveValue::set(live_kit_room.into()),
|
||||||
..Default::default()
|
..Default::default()
|
||||||
}
|
}
|
||||||
.insert(&*tx)
|
.insert(&*tx)
|
||||||
.await?;
|
.await?;
|
||||||
let room_id = room.id;
|
|
||||||
|
|
||||||
room_participant::ActiveModel {
|
room_participant::ActiveModel {
|
||||||
room_id: ActiveValue::set(room_id),
|
room_id: ActiveValue::set(room.id),
|
||||||
user_id: ActiveValue::set(user_id),
|
user_id: ActiveValue::set(user_id),
|
||||||
answering_connection_id: ActiveValue::set(Some(connection.id as i32)),
|
answering_connection_id: ActiveValue::set(Some(connection.id as i32)),
|
||||||
answering_connection_server_id: ActiveValue::set(Some(ServerId(
|
answering_connection_server_id: ActiveValue::set(Some(ServerId(
|
||||||
|
@ -1158,8 +1153,8 @@ impl Database {
|
||||||
.insert(&*tx)
|
.insert(&*tx)
|
||||||
.await?;
|
.await?;
|
||||||
|
|
||||||
let room = self.get_room(room_id, &tx).await?;
|
let room = self.get_room(room.id, &tx).await?;
|
||||||
Ok((room_id, room))
|
Ok(room)
|
||||||
})
|
})
|
||||||
.await
|
.await
|
||||||
}
|
}
|
||||||
|
@ -1172,7 +1167,7 @@ impl Database {
|
||||||
called_user_id: UserId,
|
called_user_id: UserId,
|
||||||
initial_project_id: Option<ProjectId>,
|
initial_project_id: Option<ProjectId>,
|
||||||
) -> Result<RoomGuard<(proto::Room, proto::IncomingCall)>> {
|
) -> Result<RoomGuard<(proto::Room, proto::IncomingCall)>> {
|
||||||
self.room_transaction(|tx| async move {
|
self.room_transaction(room_id, |tx| async move {
|
||||||
room_participant::ActiveModel {
|
room_participant::ActiveModel {
|
||||||
room_id: ActiveValue::set(room_id),
|
room_id: ActiveValue::set(room_id),
|
||||||
user_id: ActiveValue::set(called_user_id),
|
user_id: ActiveValue::set(called_user_id),
|
||||||
|
@ -1191,7 +1186,7 @@ impl Database {
|
||||||
let room = self.get_room(room_id, &tx).await?;
|
let room = self.get_room(room_id, &tx).await?;
|
||||||
let incoming_call = Self::build_incoming_call(&room, called_user_id)
|
let incoming_call = Self::build_incoming_call(&room, called_user_id)
|
||||||
.ok_or_else(|| anyhow!("failed to build incoming call"))?;
|
.ok_or_else(|| anyhow!("failed to build incoming call"))?;
|
||||||
Ok((room_id, (room, incoming_call)))
|
Ok((room, incoming_call))
|
||||||
})
|
})
|
||||||
.await
|
.await
|
||||||
}
|
}
|
||||||
|
@ -1201,7 +1196,7 @@ impl Database {
|
||||||
room_id: RoomId,
|
room_id: RoomId,
|
||||||
called_user_id: UserId,
|
called_user_id: UserId,
|
||||||
) -> Result<RoomGuard<proto::Room>> {
|
) -> Result<RoomGuard<proto::Room>> {
|
||||||
self.room_transaction(|tx| async move {
|
self.room_transaction(room_id, |tx| async move {
|
||||||
room_participant::Entity::delete_many()
|
room_participant::Entity::delete_many()
|
||||||
.filter(
|
.filter(
|
||||||
room_participant::Column::RoomId
|
room_participant::Column::RoomId
|
||||||
|
@ -1211,7 +1206,7 @@ impl Database {
|
||||||
.exec(&*tx)
|
.exec(&*tx)
|
||||||
.await?;
|
.await?;
|
||||||
let room = self.get_room(room_id, &tx).await?;
|
let room = self.get_room(room_id, &tx).await?;
|
||||||
Ok((room_id, room))
|
Ok(room)
|
||||||
})
|
})
|
||||||
.await
|
.await
|
||||||
}
|
}
|
||||||
|
@ -1258,7 +1253,7 @@ impl Database {
|
||||||
calling_connection: ConnectionId,
|
calling_connection: ConnectionId,
|
||||||
called_user_id: UserId,
|
called_user_id: UserId,
|
||||||
) -> Result<RoomGuard<proto::Room>> {
|
) -> Result<RoomGuard<proto::Room>> {
|
||||||
self.room_transaction(|tx| async move {
|
self.room_transaction(room_id, |tx| async move {
|
||||||
let participant = room_participant::Entity::find()
|
let participant = room_participant::Entity::find()
|
||||||
.filter(
|
.filter(
|
||||||
Condition::all()
|
Condition::all()
|
||||||
|
@ -1277,14 +1272,13 @@ impl Database {
|
||||||
.one(&*tx)
|
.one(&*tx)
|
||||||
.await?
|
.await?
|
||||||
.ok_or_else(|| anyhow!("no call to cancel"))?;
|
.ok_or_else(|| anyhow!("no call to cancel"))?;
|
||||||
let room_id = participant.room_id;
|
|
||||||
|
|
||||||
room_participant::Entity::delete(participant.into_active_model())
|
room_participant::Entity::delete(participant.into_active_model())
|
||||||
.exec(&*tx)
|
.exec(&*tx)
|
||||||
.await?;
|
.await?;
|
||||||
|
|
||||||
let room = self.get_room(room_id, &tx).await?;
|
let room = self.get_room(room_id, &tx).await?;
|
||||||
Ok((room_id, room))
|
Ok(room)
|
||||||
})
|
})
|
||||||
.await
|
.await
|
||||||
}
|
}
|
||||||
|
@ -1295,7 +1289,7 @@ impl Database {
|
||||||
user_id: UserId,
|
user_id: UserId,
|
||||||
connection: ConnectionId,
|
connection: ConnectionId,
|
||||||
) -> Result<RoomGuard<proto::Room>> {
|
) -> Result<RoomGuard<proto::Room>> {
|
||||||
self.room_transaction(|tx| async move {
|
self.room_transaction(room_id, |tx| async move {
|
||||||
let result = room_participant::Entity::update_many()
|
let result = room_participant::Entity::update_many()
|
||||||
.filter(
|
.filter(
|
||||||
Condition::all()
|
Condition::all()
|
||||||
|
@ -1317,7 +1311,7 @@ impl Database {
|
||||||
Err(anyhow!("room does not exist or was already joined"))?
|
Err(anyhow!("room does not exist or was already joined"))?
|
||||||
} else {
|
} else {
|
||||||
let room = self.get_room(room_id, &tx).await?;
|
let room = self.get_room(room_id, &tx).await?;
|
||||||
Ok((room_id, room))
|
Ok(room)
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
.await
|
.await
|
||||||
|
@ -1329,9 +1323,9 @@ impl Database {
|
||||||
user_id: UserId,
|
user_id: UserId,
|
||||||
connection: ConnectionId,
|
connection: ConnectionId,
|
||||||
) -> Result<RoomGuard<RejoinedRoom>> {
|
) -> Result<RoomGuard<RejoinedRoom>> {
|
||||||
self.room_transaction(|tx| async {
|
let room_id = RoomId::from_proto(rejoin_room.id);
|
||||||
|
self.room_transaction(room_id, |tx| async {
|
||||||
let tx = tx;
|
let tx = tx;
|
||||||
let room_id = RoomId::from_proto(rejoin_room.id);
|
|
||||||
let participant_update = room_participant::Entity::update_many()
|
let participant_update = room_participant::Entity::update_many()
|
||||||
.filter(
|
.filter(
|
||||||
Condition::all()
|
Condition::all()
|
||||||
|
@ -1550,14 +1544,11 @@ impl Database {
|
||||||
}
|
}
|
||||||
|
|
||||||
let room = self.get_room(room_id, &tx).await?;
|
let room = self.get_room(room_id, &tx).await?;
|
||||||
Ok((
|
Ok(RejoinedRoom {
|
||||||
room_id,
|
room,
|
||||||
RejoinedRoom {
|
rejoined_projects,
|
||||||
room,
|
reshared_projects,
|
||||||
rejoined_projects,
|
})
|
||||||
reshared_projects,
|
|
||||||
},
|
|
||||||
))
|
|
||||||
})
|
})
|
||||||
.await
|
.await
|
||||||
}
|
}
|
||||||
|
@ -1789,7 +1780,7 @@ impl Database {
|
||||||
connection: ConnectionId,
|
connection: ConnectionId,
|
||||||
location: proto::ParticipantLocation,
|
location: proto::ParticipantLocation,
|
||||||
) -> Result<RoomGuard<proto::Room>> {
|
) -> Result<RoomGuard<proto::Room>> {
|
||||||
self.room_transaction(|tx| async {
|
self.room_transaction(room_id, |tx| async {
|
||||||
let tx = tx;
|
let tx = tx;
|
||||||
let location_kind;
|
let location_kind;
|
||||||
let location_project_id;
|
let location_project_id;
|
||||||
|
@ -1835,7 +1826,7 @@ impl Database {
|
||||||
|
|
||||||
if result.rows_affected == 1 {
|
if result.rows_affected == 1 {
|
||||||
let room = self.get_room(room_id, &tx).await?;
|
let room = self.get_room(room_id, &tx).await?;
|
||||||
Ok((room_id, room))
|
Ok(room)
|
||||||
} else {
|
} else {
|
||||||
Err(anyhow!("could not update room participant location"))?
|
Err(anyhow!("could not update room participant location"))?
|
||||||
}
|
}
|
||||||
|
@ -2042,7 +2033,7 @@ impl Database {
|
||||||
connection: ConnectionId,
|
connection: ConnectionId,
|
||||||
worktrees: &[proto::WorktreeMetadata],
|
worktrees: &[proto::WorktreeMetadata],
|
||||||
) -> Result<RoomGuard<(ProjectId, proto::Room)>> {
|
) -> Result<RoomGuard<(ProjectId, proto::Room)>> {
|
||||||
self.room_transaction(|tx| async move {
|
self.room_transaction(room_id, |tx| async move {
|
||||||
let participant = room_participant::Entity::find()
|
let participant = room_participant::Entity::find()
|
||||||
.filter(
|
.filter(
|
||||||
Condition::all()
|
Condition::all()
|
||||||
|
@ -2103,7 +2094,7 @@ impl Database {
|
||||||
.await?;
|
.await?;
|
||||||
|
|
||||||
let room = self.get_room(room_id, &tx).await?;
|
let room = self.get_room(room_id, &tx).await?;
|
||||||
Ok((room_id, (project.id, room)))
|
Ok((project.id, room))
|
||||||
})
|
})
|
||||||
.await
|
.await
|
||||||
}
|
}
|
||||||
|
@ -2113,7 +2104,8 @@ impl Database {
|
||||||
project_id: ProjectId,
|
project_id: ProjectId,
|
||||||
connection: ConnectionId,
|
connection: ConnectionId,
|
||||||
) -> Result<RoomGuard<(proto::Room, Vec<ConnectionId>)>> {
|
) -> Result<RoomGuard<(proto::Room, Vec<ConnectionId>)>> {
|
||||||
self.room_transaction(|tx| async move {
|
let room_id = self.room_id_for_project(project_id).await?;
|
||||||
|
self.room_transaction(room_id, |tx| async move {
|
||||||
let guest_connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
|
let guest_connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
|
||||||
|
|
||||||
let project = project::Entity::find_by_id(project_id)
|
let project = project::Entity::find_by_id(project_id)
|
||||||
|
@ -2121,12 +2113,11 @@ impl Database {
|
||||||
.await?
|
.await?
|
||||||
.ok_or_else(|| anyhow!("project not found"))?;
|
.ok_or_else(|| anyhow!("project not found"))?;
|
||||||
if project.host_connection()? == connection {
|
if project.host_connection()? == connection {
|
||||||
let room_id = project.room_id;
|
|
||||||
project::Entity::delete(project.into_active_model())
|
project::Entity::delete(project.into_active_model())
|
||||||
.exec(&*tx)
|
.exec(&*tx)
|
||||||
.await?;
|
.await?;
|
||||||
let room = self.get_room(room_id, &tx).await?;
|
let room = self.get_room(room_id, &tx).await?;
|
||||||
Ok((room_id, (room, guest_connection_ids)))
|
Ok((room, guest_connection_ids))
|
||||||
} else {
|
} else {
|
||||||
Err(anyhow!("cannot unshare a project hosted by another user"))?
|
Err(anyhow!("cannot unshare a project hosted by another user"))?
|
||||||
}
|
}
|
||||||
|
@ -2140,7 +2131,8 @@ impl Database {
|
||||||
connection: ConnectionId,
|
connection: ConnectionId,
|
||||||
worktrees: &[proto::WorktreeMetadata],
|
worktrees: &[proto::WorktreeMetadata],
|
||||||
) -> Result<RoomGuard<(proto::Room, Vec<ConnectionId>)>> {
|
) -> Result<RoomGuard<(proto::Room, Vec<ConnectionId>)>> {
|
||||||
self.room_transaction(|tx| async move {
|
let room_id = self.room_id_for_project(project_id).await?;
|
||||||
|
self.room_transaction(room_id, |tx| async move {
|
||||||
let project = project::Entity::find_by_id(project_id)
|
let project = project::Entity::find_by_id(project_id)
|
||||||
.filter(
|
.filter(
|
||||||
Condition::all()
|
Condition::all()
|
||||||
|
@ -2158,7 +2150,7 @@ impl Database {
|
||||||
|
|
||||||
let guest_connection_ids = self.project_guest_connection_ids(project.id, &tx).await?;
|
let guest_connection_ids = self.project_guest_connection_ids(project.id, &tx).await?;
|
||||||
let room = self.get_room(project.room_id, &tx).await?;
|
let room = self.get_room(project.room_id, &tx).await?;
|
||||||
Ok((project.room_id, (room, guest_connection_ids)))
|
Ok((room, guest_connection_ids))
|
||||||
})
|
})
|
||||||
.await
|
.await
|
||||||
}
|
}
|
||||||
|
@ -2203,12 +2195,12 @@ impl Database {
|
||||||
update: &proto::UpdateWorktree,
|
update: &proto::UpdateWorktree,
|
||||||
connection: ConnectionId,
|
connection: ConnectionId,
|
||||||
) -> Result<RoomGuard<Vec<ConnectionId>>> {
|
) -> Result<RoomGuard<Vec<ConnectionId>>> {
|
||||||
self.room_transaction(|tx| async move {
|
let project_id = ProjectId::from_proto(update.project_id);
|
||||||
let project_id = ProjectId::from_proto(update.project_id);
|
let worktree_id = update.worktree_id as i64;
|
||||||
let worktree_id = update.worktree_id as i64;
|
let room_id = self.room_id_for_project(project_id).await?;
|
||||||
|
self.room_transaction(room_id, |tx| async move {
|
||||||
// Ensure the update comes from the host.
|
// Ensure the update comes from the host.
|
||||||
let project = project::Entity::find_by_id(project_id)
|
let _project = project::Entity::find_by_id(project_id)
|
||||||
.filter(
|
.filter(
|
||||||
Condition::all()
|
Condition::all()
|
||||||
.add(project::Column::HostConnectionId.eq(connection.id as i32))
|
.add(project::Column::HostConnectionId.eq(connection.id as i32))
|
||||||
|
@ -2219,7 +2211,6 @@ impl Database {
|
||||||
.one(&*tx)
|
.one(&*tx)
|
||||||
.await?
|
.await?
|
||||||
.ok_or_else(|| anyhow!("no such project"))?;
|
.ok_or_else(|| anyhow!("no such project"))?;
|
||||||
let room_id = project.room_id;
|
|
||||||
|
|
||||||
// Update metadata.
|
// Update metadata.
|
||||||
worktree::Entity::update(worktree::ActiveModel {
|
worktree::Entity::update(worktree::ActiveModel {
|
||||||
|
@ -2299,7 +2290,7 @@ impl Database {
|
||||||
}
|
}
|
||||||
|
|
||||||
let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
|
let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
|
||||||
Ok((room_id, connection_ids))
|
Ok(connection_ids)
|
||||||
})
|
})
|
||||||
.await
|
.await
|
||||||
}
|
}
|
||||||
|
@ -2309,9 +2300,10 @@ impl Database {
|
||||||
update: &proto::UpdateDiagnosticSummary,
|
update: &proto::UpdateDiagnosticSummary,
|
||||||
connection: ConnectionId,
|
connection: ConnectionId,
|
||||||
) -> Result<RoomGuard<Vec<ConnectionId>>> {
|
) -> Result<RoomGuard<Vec<ConnectionId>>> {
|
||||||
self.room_transaction(|tx| async move {
|
let project_id = ProjectId::from_proto(update.project_id);
|
||||||
let project_id = ProjectId::from_proto(update.project_id);
|
let worktree_id = update.worktree_id as i64;
|
||||||
let worktree_id = update.worktree_id as i64;
|
let room_id = self.room_id_for_project(project_id).await?;
|
||||||
|
self.room_transaction(room_id, |tx| async move {
|
||||||
let summary = update
|
let summary = update
|
||||||
.summary
|
.summary
|
||||||
.as_ref()
|
.as_ref()
|
||||||
|
@ -2353,7 +2345,7 @@ impl Database {
|
||||||
.await?;
|
.await?;
|
||||||
|
|
||||||
let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
|
let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
|
||||||
Ok((project.room_id, connection_ids))
|
Ok(connection_ids)
|
||||||
})
|
})
|
||||||
.await
|
.await
|
||||||
}
|
}
|
||||||
|
@ -2363,8 +2355,9 @@ impl Database {
|
||||||
update: &proto::StartLanguageServer,
|
update: &proto::StartLanguageServer,
|
||||||
connection: ConnectionId,
|
connection: ConnectionId,
|
||||||
) -> Result<RoomGuard<Vec<ConnectionId>>> {
|
) -> Result<RoomGuard<Vec<ConnectionId>>> {
|
||||||
self.room_transaction(|tx| async move {
|
let project_id = ProjectId::from_proto(update.project_id);
|
||||||
let project_id = ProjectId::from_proto(update.project_id);
|
let room_id = self.room_id_for_project(project_id).await?;
|
||||||
|
self.room_transaction(room_id, |tx| async move {
|
||||||
let server = update
|
let server = update
|
||||||
.server
|
.server
|
||||||
.as_ref()
|
.as_ref()
|
||||||
|
@ -2398,7 +2391,7 @@ impl Database {
|
||||||
.await?;
|
.await?;
|
||||||
|
|
||||||
let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
|
let connection_ids = self.project_guest_connection_ids(project_id, &tx).await?;
|
||||||
Ok((project.room_id, connection_ids))
|
Ok(connection_ids)
|
||||||
})
|
})
|
||||||
.await
|
.await
|
||||||
}
|
}
|
||||||
|
@ -2408,7 +2401,8 @@ impl Database {
|
||||||
project_id: ProjectId,
|
project_id: ProjectId,
|
||||||
connection: ConnectionId,
|
connection: ConnectionId,
|
||||||
) -> Result<RoomGuard<(Project, ReplicaId)>> {
|
) -> Result<RoomGuard<(Project, ReplicaId)>> {
|
||||||
self.room_transaction(|tx| async move {
|
let room_id = self.room_id_for_project(project_id).await?;
|
||||||
|
self.room_transaction(room_id, |tx| async move {
|
||||||
let participant = room_participant::Entity::find()
|
let participant = room_participant::Entity::find()
|
||||||
.filter(
|
.filter(
|
||||||
Condition::all()
|
Condition::all()
|
||||||
|
@ -2534,7 +2528,6 @@ impl Database {
|
||||||
.all(&*tx)
|
.all(&*tx)
|
||||||
.await?;
|
.await?;
|
||||||
|
|
||||||
let room_id = project.room_id;
|
|
||||||
let project = Project {
|
let project = Project {
|
||||||
collaborators: collaborators
|
collaborators: collaborators
|
||||||
.into_iter()
|
.into_iter()
|
||||||
|
@ -2554,7 +2547,7 @@ impl Database {
|
||||||
})
|
})
|
||||||
.collect(),
|
.collect(),
|
||||||
};
|
};
|
||||||
Ok((room_id, (project, replica_id as ReplicaId)))
|
Ok((project, replica_id as ReplicaId))
|
||||||
})
|
})
|
||||||
.await
|
.await
|
||||||
}
|
}
|
||||||
|
@ -2564,7 +2557,8 @@ impl Database {
|
||||||
project_id: ProjectId,
|
project_id: ProjectId,
|
||||||
connection: ConnectionId,
|
connection: ConnectionId,
|
||||||
) -> Result<RoomGuard<LeftProject>> {
|
) -> Result<RoomGuard<LeftProject>> {
|
||||||
self.room_transaction(|tx| async move {
|
let room_id = self.room_id_for_project(project_id).await?;
|
||||||
|
self.room_transaction(room_id, |tx| async move {
|
||||||
let result = project_collaborator::Entity::delete_many()
|
let result = project_collaborator::Entity::delete_many()
|
||||||
.filter(
|
.filter(
|
||||||
Condition::all()
|
Condition::all()
|
||||||
|
@ -2600,7 +2594,7 @@ impl Database {
|
||||||
host_connection_id: project.host_connection()?,
|
host_connection_id: project.host_connection()?,
|
||||||
connection_ids,
|
connection_ids,
|
||||||
};
|
};
|
||||||
Ok((project.room_id, left_project))
|
Ok(left_project)
|
||||||
})
|
})
|
||||||
.await
|
.await
|
||||||
}
|
}
|
||||||
|
@ -2610,11 +2604,8 @@ impl Database {
|
||||||
project_id: ProjectId,
|
project_id: ProjectId,
|
||||||
connection_id: ConnectionId,
|
connection_id: ConnectionId,
|
||||||
) -> Result<RoomGuard<Vec<ProjectCollaborator>>> {
|
) -> Result<RoomGuard<Vec<ProjectCollaborator>>> {
|
||||||
self.room_transaction(|tx| async move {
|
let room_id = self.room_id_for_project(project_id).await?;
|
||||||
let project = project::Entity::find_by_id(project_id)
|
self.room_transaction(room_id, |tx| async move {
|
||||||
.one(&*tx)
|
|
||||||
.await?
|
|
||||||
.ok_or_else(|| anyhow!("no such project"))?;
|
|
||||||
let collaborators = project_collaborator::Entity::find()
|
let collaborators = project_collaborator::Entity::find()
|
||||||
.filter(project_collaborator::Column::ProjectId.eq(project_id))
|
.filter(project_collaborator::Column::ProjectId.eq(project_id))
|
||||||
.all(&*tx)
|
.all(&*tx)
|
||||||
|
@ -2632,7 +2623,7 @@ impl Database {
|
||||||
.iter()
|
.iter()
|
||||||
.any(|collaborator| collaborator.connection_id == connection_id)
|
.any(|collaborator| collaborator.connection_id == connection_id)
|
||||||
{
|
{
|
||||||
Ok((project.room_id, collaborators))
|
Ok(collaborators)
|
||||||
} else {
|
} else {
|
||||||
Err(anyhow!("no such project"))?
|
Err(anyhow!("no such project"))?
|
||||||
}
|
}
|
||||||
|
@ -2645,11 +2636,8 @@ impl Database {
|
||||||
project_id: ProjectId,
|
project_id: ProjectId,
|
||||||
connection_id: ConnectionId,
|
connection_id: ConnectionId,
|
||||||
) -> Result<RoomGuard<HashSet<ConnectionId>>> {
|
) -> Result<RoomGuard<HashSet<ConnectionId>>> {
|
||||||
self.room_transaction(|tx| async move {
|
let room_id = self.room_id_for_project(project_id).await?;
|
||||||
let project = project::Entity::find_by_id(project_id)
|
self.room_transaction(room_id, |tx| async move {
|
||||||
.one(&*tx)
|
|
||||||
.await?
|
|
||||||
.ok_or_else(|| anyhow!("no such project"))?;
|
|
||||||
let mut collaborators = project_collaborator::Entity::find()
|
let mut collaborators = project_collaborator::Entity::find()
|
||||||
.filter(project_collaborator::Column::ProjectId.eq(project_id))
|
.filter(project_collaborator::Column::ProjectId.eq(project_id))
|
||||||
.stream(&*tx)
|
.stream(&*tx)
|
||||||
|
@ -2662,7 +2650,7 @@ impl Database {
|
||||||
}
|
}
|
||||||
|
|
||||||
if connection_ids.contains(&connection_id) {
|
if connection_ids.contains(&connection_id) {
|
||||||
Ok((project.room_id, connection_ids))
|
Ok(connection_ids)
|
||||||
} else {
|
} else {
|
||||||
Err(anyhow!("no such project"))?
|
Err(anyhow!("no such project"))?
|
||||||
}
|
}
|
||||||
|
@ -2692,6 +2680,17 @@ impl Database {
|
||||||
Ok(guest_connection_ids)
|
Ok(guest_connection_ids)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
async fn room_id_for_project(&self, project_id: ProjectId) -> Result<RoomId> {
|
||||||
|
self.transaction(|tx| async move {
|
||||||
|
let project = project::Entity::find_by_id(project_id)
|
||||||
|
.one(&*tx)
|
||||||
|
.await?
|
||||||
|
.ok_or_else(|| anyhow!("project {} not found", project_id))?;
|
||||||
|
Ok(project.room_id)
|
||||||
|
})
|
||||||
|
.await
|
||||||
|
}
|
||||||
|
|
||||||
// access tokens
|
// access tokens
|
||||||
|
|
||||||
pub async fn create_access_token_hash(
|
pub async fn create_access_token_hash(
|
||||||
|
@ -2842,21 +2841,48 @@ impl Database {
|
||||||
self.run(body).await
|
self.run(body).await
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn room_transaction<F, Fut, T>(&self, f: F) -> Result<RoomGuard<T>>
|
async fn room_transaction<F, Fut, T>(&self, room_id: RoomId, f: F) -> Result<RoomGuard<T>>
|
||||||
where
|
where
|
||||||
F: Send + Fn(TransactionHandle) -> Fut,
|
F: Send + Fn(TransactionHandle) -> Fut,
|
||||||
Fut: Send + Future<Output = Result<(RoomId, T)>>,
|
Fut: Send + Future<Output = Result<T>>,
|
||||||
{
|
{
|
||||||
let data = self
|
let body = async {
|
||||||
.optional_room_transaction(move |tx| {
|
loop {
|
||||||
let future = f(tx);
|
let lock = self.rooms.entry(room_id).or_default().clone();
|
||||||
async {
|
let _guard = lock.lock_owned().await;
|
||||||
let data = future.await?;
|
let (tx, result) = self.with_transaction(&f).await?;
|
||||||
Ok(Some(data))
|
match result {
|
||||||
|
Ok(data) => {
|
||||||
|
match tx.commit().await.map_err(Into::into) {
|
||||||
|
Ok(()) => {
|
||||||
|
return Ok(RoomGuard {
|
||||||
|
data,
|
||||||
|
_guard,
|
||||||
|
_not_send: PhantomData,
|
||||||
|
});
|
||||||
|
}
|
||||||
|
Err(error) => {
|
||||||
|
if is_serialization_error(&error) {
|
||||||
|
// Retry (don't break the loop)
|
||||||
|
} else {
|
||||||
|
return Err(error);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Err(error) => {
|
||||||
|
tx.rollback().await?;
|
||||||
|
if is_serialization_error(&error) {
|
||||||
|
// Retry (don't break the loop)
|
||||||
|
} else {
|
||||||
|
return Err(error);
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
})
|
}
|
||||||
.await?;
|
};
|
||||||
Ok(data.unwrap())
|
|
||||||
|
self.run(body).await
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn with_transaction<F, Fut, T>(&self, f: &F) -> Result<(DatabaseTransaction, Result<T>)>
|
async fn with_transaction<F, Fut, T>(&self, f: &F) -> Result<(DatabaseTransaction, Result<T>)>
|
||||||
|
|
Loading…
Reference in a new issue