From c213c98ea40dca5408f1f4250bc338dc49953905 Mon Sep 17 00:00:00 2001 From: Antonio Scandurra Date: Fri, 11 Nov 2022 15:22:04 +0100 Subject: [PATCH] Remove `calls` table and use just `room_participants` --- crates/call/src/room.rs | 7 +- .../20221109000000_test_schema.sql | 16 +- .../20221111092550_reconnection_support.sql | 15 +- crates/collab/src/db.rs | 165 +++------- crates/collab/src/rpc/store.rs | 289 +++++++++--------- crates/rpc/proto/zed.proto | 8 +- 6 files changed, 208 insertions(+), 292 deletions(-) diff --git a/crates/call/src/room.rs b/crates/call/src/room.rs index 3e55dc4ce9..4f3079e72c 100644 --- a/crates/call/src/room.rs +++ b/crates/call/src/room.rs @@ -294,6 +294,11 @@ impl Room { .position(|participant| Some(participant.user_id) == self.client.user_id()); let local_participant = local_participant_ix.map(|ix| room.participants.swap_remove(ix)); + let pending_participant_user_ids = room + .pending_participants + .iter() + .map(|p| p.user_id) + .collect::>(); let remote_participant_user_ids = room .participants .iter() @@ -303,7 +308,7 @@ impl Room { self.user_store.update(cx, move |user_store, cx| { ( user_store.get_users(remote_participant_user_ids, cx), - user_store.get_users(room.pending_participant_user_ids, cx), + user_store.get_users(pending_participant_user_ids, cx), ) }); self.pending_room_update = Some(cx.spawn(|this, mut cx| async move { diff --git a/crates/collab/migrations.sqlite/20221109000000_test_schema.sql b/crates/collab/migrations.sqlite/20221109000000_test_schema.sql index 9302657523..5b38ebf8b1 100644 --- a/crates/collab/migrations.sqlite/20221109000000_test_schema.sql +++ b/crates/collab/migrations.sqlite/20221109000000_test_schema.sql @@ -70,16 +70,8 @@ CREATE TABLE "room_participants" ( "user_id" INTEGER NOT NULL REFERENCES users (id), "connection_id" INTEGER, "location_kind" INTEGER, - "location_project_id" INTEGER REFERENCES projects (id) + "location_project_id" INTEGER REFERENCES projects (id), + "initial_project_id" INTEGER REFERENCES projects (id), + "calling_user_id" INTEGER NOT NULL REFERENCES users (id) ); -CREATE UNIQUE INDEX "index_room_participants_on_user_id_and_room_id" ON "room_participants" ("user_id", "room_id"); - -CREATE TABLE "calls" ( - "id" INTEGER PRIMARY KEY, - "room_id" INTEGER NOT NULL REFERENCES rooms (id), - "calling_user_id" INTEGER NOT NULL REFERENCES users (id), - "called_user_id" INTEGER NOT NULL REFERENCES users (id), - "answering_connection_id" INTEGER, - "initial_project_id" INTEGER REFERENCES projects (id) -); -CREATE UNIQUE INDEX "index_calls_on_called_user_id" ON "calls" ("called_user_id"); +CREATE UNIQUE INDEX "index_room_participants_on_user_id" ON "room_participants" ("user_id"); diff --git a/crates/collab/migrations/20221111092550_reconnection_support.sql b/crates/collab/migrations/20221111092550_reconnection_support.sql index 8f932acff3..621512bf43 100644 --- a/crates/collab/migrations/20221111092550_reconnection_support.sql +++ b/crates/collab/migrations/20221111092550_reconnection_support.sql @@ -32,16 +32,9 @@ CREATE TABLE IF NOT EXISTS "room_participants" ( "user_id" INTEGER NOT NULL REFERENCES users (id), "connection_id" INTEGER, "location_kind" INTEGER, - "location_project_id" INTEGER REFERENCES projects (id) + "location_project_id" INTEGER REFERENCES projects (id), + "initial_project_id" INTEGER REFERENCES projects (id), + "calling_user_id" INTEGER NOT NULL REFERENCES users (id) ); -CREATE UNIQUE INDEX "index_room_participants_on_user_id_and_room_id" ON "room_participants" ("user_id", "room_id"); +CREATE UNIQUE INDEX "index_room_participants_on_user_id" ON "room_participants" ("user_id"); -CREATE TABLE IF NOT EXISTS "calls" ( - "id" SERIAL PRIMARY KEY, - "room_id" INTEGER NOT NULL REFERENCES rooms (id), - "calling_user_id" INTEGER NOT NULL REFERENCES users (id), - "called_user_id" INTEGER NOT NULL REFERENCES users (id), - "answering_connection_id" INTEGER, - "initial_project_id" INTEGER REFERENCES projects (id) -); -CREATE UNIQUE INDEX "index_calls_on_called_user_id" ON "calls" ("called_user_id"); diff --git a/crates/collab/src/db.rs b/crates/collab/src/db.rs index 7cc0dc35fe..a98621d894 100644 --- a/crates/collab/src/db.rs +++ b/crates/collab/src/db.rs @@ -907,26 +907,14 @@ where sqlx::query( " - INSERT INTO room_participants (room_id, user_id, connection_id) - VALUES ($1, $2, $3) - ", - ) - .bind(room_id) - .bind(user_id) - .bind(connection_id.0 as i32) - .execute(&mut tx) - .await?; - - sqlx::query( - " - INSERT INTO calls (room_id, calling_user_id, called_user_id, answering_connection_id) + INSERT INTO room_participants (room_id, user_id, connection_id, calling_user_id) VALUES ($1, $2, $3, $4) ", ) .bind(room_id) .bind(user_id) - .bind(user_id) .bind(connection_id.0 as i32) + .bind(user_id) .execute(&mut tx) .await?; @@ -945,31 +933,20 @@ where let mut tx = self.pool.begin().await?; sqlx::query( " - INSERT INTO calls (room_id, calling_user_id, called_user_id, initial_project_id) + INSERT INTO room_participants (room_id, user_id, calling_user_id, initial_project_id) VALUES ($1, $2, $3, $4) ", ) .bind(room_id) - .bind(calling_user_id) .bind(called_user_id) + .bind(calling_user_id) .bind(initial_project_id) .execute(&mut tx) .await?; - sqlx::query( - " - INSERT INTO room_participants (room_id, user_id) - VALUES ($1, $2) - ", - ) - .bind(room_id) - .bind(called_user_id) - .execute(&mut tx) - .await?; - let room = self.commit_room_transaction(room_id, tx).await?; - let incoming_call = - Self::build_incoming_call(&room, calling_user_id, initial_project_id); + let incoming_call = Self::build_incoming_call(&room, called_user_id) + .ok_or_else(|| anyhow!("failed to build incoming call"))?; Ok((room, incoming_call)) }) } @@ -980,24 +957,20 @@ where ) -> Result> { test_support!(self, { let mut tx = self.pool.begin().await?; - let call = sqlx::query_as::<_, Call>( + let room_id = sqlx::query_scalar::<_, RoomId>( " - SELECT * - FROM calls - WHERE called_user_id = $1 AND answering_connection_id IS NULL + SELECT room_id + FROM room_participants + WHERE user_id = $1 AND connection_id IS NULL ", ) .bind(user_id) .fetch_optional(&mut tx) .await?; - if let Some(call) = call { - let room = self.get_room(call.room_id, &mut tx).await?; - Ok(Some(Self::build_incoming_call( - &room, - call.calling_user_id, - call.initial_project_id, - ))) + if let Some(room_id) = room_id { + let room = self.get_room(room_id, &mut tx).await?; + Ok(Self::build_incoming_call(&room, user_id)) } else { Ok(None) } @@ -1006,26 +979,30 @@ where fn build_incoming_call( room: &proto::Room, - calling_user_id: UserId, - initial_project_id: Option, - ) -> proto::IncomingCall { - proto::IncomingCall { + called_user_id: UserId, + ) -> Option { + let pending_participant = room + .pending_participants + .iter() + .find(|participant| participant.user_id == called_user_id.to_proto())?; + + Some(proto::IncomingCall { room_id: room.id, - calling_user_id: calling_user_id.to_proto(), + calling_user_id: pending_participant.calling_user_id, participant_user_ids: room .participants .iter() .map(|participant| participant.user_id) .collect(), initial_project: room.participants.iter().find_map(|participant| { - let initial_project_id = initial_project_id?.to_proto(); + let initial_project_id = pending_participant.initial_project_id?; participant .projects .iter() .find(|project| project.id == initial_project_id) .cloned() }), - } + }) } pub async fn call_failed( @@ -1035,17 +1012,6 @@ where ) -> Result { test_support!(self, { let mut tx = self.pool.begin().await?; - sqlx::query( - " - DELETE FROM calls - WHERE room_id = $1 AND called_user_id = $2 - ", - ) - .bind(room_id) - .bind(called_user_id) - .execute(&mut tx) - .await?; - sqlx::query( " DELETE FROM room_participants @@ -1069,20 +1035,6 @@ where ) -> Result { test_support!(self, { let mut tx = self.pool.begin().await?; - sqlx::query( - " - UPDATE calls - SET answering_connection_id = $1 - WHERE room_id = $2 AND called_user_id = $3 - RETURNING 1 - ", - ) - .bind(connection_id.0 as i32) - .bind(room_id) - .bind(user_id) - .fetch_one(&mut tx) - .await?; - sqlx::query( " UPDATE room_participants @@ -1096,54 +1048,8 @@ where .bind(user_id) .fetch_one(&mut tx) .await?; - self.commit_room_transaction(room_id, tx).await }) - - // let connection = self - // .connections - // .get_mut(&connection_id) - // .ok_or_else(|| anyhow!("no such connection"))?; - // let user_id = connection.user_id; - // let recipient_connection_ids = self.connection_ids_for_user(user_id).collect::>(); - - // let connected_user = self - // .connected_users - // .get_mut(&user_id) - // .ok_or_else(|| anyhow!("no such connection"))?; - // let active_call = connected_user - // .active_call - // .as_mut() - // .ok_or_else(|| anyhow!("not being called"))?; - // anyhow::ensure!( - // active_call.room_id == room_id && active_call.connection_id.is_none(), - // "not being called on this room" - // ); - - // let room = self - // .rooms - // .get_mut(&room_id) - // .ok_or_else(|| anyhow!("no such room"))?; - // anyhow::ensure!( - // room.pending_participant_user_ids - // .contains(&user_id.to_proto()), - // anyhow!("no such room") - // ); - // room.pending_participant_user_ids - // .retain(|pending| *pending != user_id.to_proto()); - // room.participants.push(proto::Participant { - // user_id: user_id.to_proto(), - // peer_id: connection_id.0, - // projects: Default::default(), - // location: Some(proto::ParticipantLocation { - // variant: Some(proto::participant_location::Variant::External( - // proto::participant_location::External {}, - // )), - // }), - // }); - // active_call.connection_id = Some(connection_id); - - // Ok((room, recipient_connection_ids)) } pub async fn update_room_participant_location( @@ -1231,9 +1137,9 @@ where .await?; let mut db_participants = - sqlx::query_as::<_, (UserId, Option, Option, Option)>( + sqlx::query_as::<_, (UserId, Option, Option, Option, UserId, Option)>( " - SELECT user_id, connection_id, location_kind, location_project_id + SELECT user_id, connection_id, location_kind, location_project_id, calling_user_id, initial_project_id FROM room_participants WHERE room_id = $1 ", @@ -1242,9 +1148,16 @@ where .fetch(&mut *tx); let mut participants = Vec::new(); - let mut pending_participant_user_ids = Vec::new(); + let mut pending_participants = Vec::new(); while let Some(participant) = db_participants.next().await { - let (user_id, connection_id, _location_kind, _location_project_id) = participant?; + let ( + user_id, + connection_id, + _location_kind, + _location_project_id, + calling_user_id, + initial_project_id, + ) = participant?; if let Some(connection_id) = connection_id { participants.push(proto::Participant { user_id: user_id.to_proto(), @@ -1257,7 +1170,11 @@ where }), }); } else { - pending_participant_user_ids.push(user_id.to_proto()); + pending_participants.push(proto::PendingParticipant { + user_id: user_id.to_proto(), + calling_user_id: calling_user_id.to_proto(), + initial_project_id: initial_project_id.map(|id| id.to_proto()), + }); } } drop(db_participants); @@ -1296,7 +1213,7 @@ where version: room.version as u64, live_kit_room: room.live_kit_room, participants, - pending_participant_user_ids, + pending_participants, }) } diff --git a/crates/collab/src/rpc/store.rs b/crates/collab/src/rpc/store.rs index dfd534dbe9..610a653dc9 100644 --- a/crates/collab/src/rpc/store.rs +++ b/crates/collab/src/rpc/store.rs @@ -258,80 +258,81 @@ impl Store { } pub fn leave_room(&mut self, room_id: RoomId, connection_id: ConnectionId) -> Result { - let connection = self - .connections - .get_mut(&connection_id) - .ok_or_else(|| anyhow!("no such connection"))?; - let user_id = connection.user_id; + todo!() + // let connection = self + // .connections + // .get_mut(&connection_id) + // .ok_or_else(|| anyhow!("no such connection"))?; + // let user_id = connection.user_id; - let connected_user = self - .connected_users - .get(&user_id) - .ok_or_else(|| anyhow!("no such connection"))?; - anyhow::ensure!( - connected_user - .active_call - .map_or(false, |call| call.room_id == room_id - && call.connection_id == Some(connection_id)), - "cannot leave a room before joining it" - ); + // let connected_user = self + // .connected_users + // .get(&user_id) + // .ok_or_else(|| anyhow!("no such connection"))?; + // anyhow::ensure!( + // connected_user + // .active_call + // .map_or(false, |call| call.room_id == room_id + // && call.connection_id == Some(connection_id)), + // "cannot leave a room before joining it" + // ); - // Given that users can only join one room at a time, we can safely unshare - // and leave all projects associated with the connection. - let mut unshared_projects = Vec::new(); - let mut left_projects = Vec::new(); - for project_id in connection.projects.clone() { - if let Ok((_, project)) = self.unshare_project(project_id, connection_id) { - unshared_projects.push(project); - } else if let Ok(project) = self.leave_project(project_id, connection_id) { - left_projects.push(project); - } - } - self.connected_users.get_mut(&user_id).unwrap().active_call = None; + // // Given that users can only join one room at a time, we can safely unshare + // // and leave all projects associated with the connection. + // let mut unshared_projects = Vec::new(); + // let mut left_projects = Vec::new(); + // for project_id in connection.projects.clone() { + // if let Ok((_, project)) = self.unshare_project(project_id, connection_id) { + // unshared_projects.push(project); + // } else if let Ok(project) = self.leave_project(project_id, connection_id) { + // left_projects.push(project); + // } + // } + // self.connected_users.get_mut(&user_id).unwrap().active_call = None; - let room = self - .rooms - .get_mut(&room_id) - .ok_or_else(|| anyhow!("no such room"))?; - room.participants - .retain(|participant| participant.peer_id != connection_id.0); + // let room = self + // .rooms + // .get_mut(&room_id) + // .ok_or_else(|| anyhow!("no such room"))?; + // room.participants + // .retain(|participant| participant.peer_id != connection_id.0); - let mut canceled_call_connection_ids = Vec::new(); - room.pending_participant_user_ids - .retain(|pending_participant_user_id| { - if let Some(connected_user) = self - .connected_users - .get_mut(&UserId::from_proto(*pending_participant_user_id)) - { - if let Some(call) = connected_user.active_call.as_ref() { - if call.calling_user_id == user_id { - connected_user.active_call.take(); - canceled_call_connection_ids - .extend(connected_user.connection_ids.iter().copied()); - false - } else { - true - } - } else { - true - } - } else { - true - } - }); + // let mut canceled_call_connection_ids = Vec::new(); + // room.pending_participant_user_ids + // .retain(|pending_participant_user_id| { + // if let Some(connected_user) = self + // .connected_users + // .get_mut(&UserId::from_proto(*pending_participant_user_id)) + // { + // if let Some(call) = connected_user.active_call.as_ref() { + // if call.calling_user_id == user_id { + // connected_user.active_call.take(); + // canceled_call_connection_ids + // .extend(connected_user.connection_ids.iter().copied()); + // false + // } else { + // true + // } + // } else { + // true + // } + // } else { + // true + // } + // }); - let room = if room.participants.is_empty() { - Cow::Owned(self.rooms.remove(&room_id).unwrap()) - } else { - Cow::Borrowed(self.rooms.get(&room_id).unwrap()) - }; + // let room = if room.participants.is_empty() { + // Cow::Owned(self.rooms.remove(&room_id).unwrap()) + // } else { + // Cow::Borrowed(self.rooms.get(&room_id).unwrap()) + // }; - Ok(LeftRoom { - room, - unshared_projects, - left_projects, - canceled_call_connection_ids, - }) + // Ok(LeftRoom { + // room, + // unshared_projects, + // left_projects, + // canceled_call_connection_ids, + // }) } pub fn rooms(&self) -> &BTreeMap { @@ -344,48 +345,49 @@ impl Store { called_user_id: UserId, canceller_connection_id: ConnectionId, ) -> Result<(&proto::Room, HashSet)> { - let canceller_user_id = self.user_id_for_connection(canceller_connection_id)?; - let canceller = self - .connected_users - .get(&canceller_user_id) - .ok_or_else(|| anyhow!("no such connection"))?; - let recipient = self - .connected_users - .get(&called_user_id) - .ok_or_else(|| anyhow!("no such connection"))?; - let canceller_active_call = canceller - .active_call - .as_ref() - .ok_or_else(|| anyhow!("no active call"))?; - let recipient_active_call = recipient - .active_call - .as_ref() - .ok_or_else(|| anyhow!("no active call for recipient"))?; + todo!() + // let canceller_user_id = self.user_id_for_connection(canceller_connection_id)?; + // let canceller = self + // .connected_users + // .get(&canceller_user_id) + // .ok_or_else(|| anyhow!("no such connection"))?; + // let recipient = self + // .connected_users + // .get(&called_user_id) + // .ok_or_else(|| anyhow!("no such connection"))?; + // let canceller_active_call = canceller + // .active_call + // .as_ref() + // .ok_or_else(|| anyhow!("no active call"))?; + // let recipient_active_call = recipient + // .active_call + // .as_ref() + // .ok_or_else(|| anyhow!("no active call for recipient"))?; - anyhow::ensure!( - canceller_active_call.room_id == room_id, - "users are on different calls" - ); - anyhow::ensure!( - recipient_active_call.room_id == room_id, - "users are on different calls" - ); - anyhow::ensure!( - recipient_active_call.connection_id.is_none(), - "recipient has already answered" - ); - let room_id = recipient_active_call.room_id; - let room = self - .rooms - .get_mut(&room_id) - .ok_or_else(|| anyhow!("no such room"))?; - room.pending_participant_user_ids - .retain(|user_id| UserId::from_proto(*user_id) != called_user_id); + // anyhow::ensure!( + // canceller_active_call.room_id == room_id, + // "users are on different calls" + // ); + // anyhow::ensure!( + // recipient_active_call.room_id == room_id, + // "users are on different calls" + // ); + // anyhow::ensure!( + // recipient_active_call.connection_id.is_none(), + // "recipient has already answered" + // ); + // let room_id = recipient_active_call.room_id; + // let room = self + // .rooms + // .get_mut(&room_id) + // .ok_or_else(|| anyhow!("no such room"))?; + // room.pending_participant_user_ids + // .retain(|user_id| UserId::from_proto(*user_id) != called_user_id); - let recipient = self.connected_users.get_mut(&called_user_id).unwrap(); - recipient.active_call.take(); + // let recipient = self.connected_users.get_mut(&called_user_id).unwrap(); + // recipient.active_call.take(); - Ok((room, recipient.connection_ids.clone())) + // Ok((room, recipient.connection_ids.clone())) } pub fn decline_call( @@ -393,31 +395,32 @@ impl Store { room_id: RoomId, recipient_connection_id: ConnectionId, ) -> Result<(&proto::Room, Vec)> { - let called_user_id = self.user_id_for_connection(recipient_connection_id)?; - let recipient = self - .connected_users - .get_mut(&called_user_id) - .ok_or_else(|| anyhow!("no such connection"))?; - if let Some(active_call) = recipient.active_call { - anyhow::ensure!(active_call.room_id == room_id, "no such room"); - anyhow::ensure!( - active_call.connection_id.is_none(), - "cannot decline a call after joining room" - ); - recipient.active_call.take(); - let recipient_connection_ids = self - .connection_ids_for_user(called_user_id) - .collect::>(); - let room = self - .rooms - .get_mut(&active_call.room_id) - .ok_or_else(|| anyhow!("no such room"))?; - room.pending_participant_user_ids - .retain(|user_id| UserId::from_proto(*user_id) != called_user_id); - Ok((room, recipient_connection_ids)) - } else { - Err(anyhow!("user is not being called")) - } + todo!() + // let called_user_id = self.user_id_for_connection(recipient_connection_id)?; + // let recipient = self + // .connected_users + // .get_mut(&called_user_id) + // .ok_or_else(|| anyhow!("no such connection"))?; + // if let Some(active_call) = recipient.active_call { + // anyhow::ensure!(active_call.room_id == room_id, "no such room"); + // anyhow::ensure!( + // active_call.connection_id.is_none(), + // "cannot decline a call after joining room" + // ); + // recipient.active_call.take(); + // let recipient_connection_ids = self + // .connection_ids_for_user(called_user_id) + // .collect::>(); + // let room = self + // .rooms + // .get_mut(&active_call.room_id) + // .ok_or_else(|| anyhow!("no such room"))?; + // room.pending_participant_user_ids + // .retain(|user_id| UserId::from_proto(*user_id) != called_user_id); + // Ok((room, recipient_connection_ids)) + // } else { + // Err(anyhow!("user is not being called")) + // } } pub fn unshare_project( @@ -767,13 +770,13 @@ impl Store { } for (room_id, room) in &self.rooms { - for pending_user_id in &room.pending_participant_user_ids { - assert!( - self.connected_users - .contains_key(&UserId::from_proto(*pending_user_id)), - "call is active on a user that has disconnected" - ); - } + // for pending_user_id in &room.pending_participant_user_ids { + // assert!( + // self.connected_users + // .contains_key(&UserId::from_proto(*pending_user_id)), + // "call is active on a user that has disconnected" + // ); + // } for participant in &room.participants { assert!( @@ -793,10 +796,10 @@ impl Store { } } - assert!( - !room.pending_participant_user_ids.is_empty() || !room.participants.is_empty(), - "room can't be empty" - ); + // assert!( + // !room.pending_participant_user_ids.is_empty() || !room.participants.is_empty(), + // "room can't be empty" + // ); } for (project_id, project) in &self.projects { diff --git a/crates/rpc/proto/zed.proto b/crates/rpc/proto/zed.proto index 07e6fae3a8..c1daf75823 100644 --- a/crates/rpc/proto/zed.proto +++ b/crates/rpc/proto/zed.proto @@ -166,7 +166,7 @@ message Room { uint64 id = 1; uint64 version = 2; repeated Participant participants = 3; - repeated uint64 pending_participant_user_ids = 4; + repeated PendingParticipant pending_participants = 4; string live_kit_room = 5; } @@ -177,6 +177,12 @@ message Participant { ParticipantLocation location = 4; } +message PendingParticipant { + uint64 user_id = 1; + uint64 calling_user_id = 2; + optional uint64 initial_project_id = 3; +} + message ParticipantProject { uint64 id = 1; repeated string worktree_root_names = 2;