From 6871bbbc718d8d60951712f03462ce9c69d20c4a Mon Sep 17 00:00:00 2001 From: Antonio Scandurra Date: Fri, 11 Nov 2022 12:06:43 +0100 Subject: [PATCH] Start moving `Store` state into the database --- crates/call/src/call.rs | 20 +- crates/call/src/room.rs | 8 +- .../20221109000000_test_schema.sql | 2 +- .../20221111092550_reconnection_support.sql | 2 +- crates/collab/src/db.rs | 354 +++++++++++++++++- crates/collab/src/integration_tests.rs | 14 +- crates/collab/src/rpc.rs | 115 +++--- crates/collab/src/rpc/store.rs | 248 +----------- .../src/incoming_call_notification.rs | 6 +- crates/rpc/proto/zed.proto | 13 +- crates/rpc/src/rpc.rs | 2 +- 11 files changed, 447 insertions(+), 337 deletions(-) diff --git a/crates/call/src/call.rs b/crates/call/src/call.rs index 6b72eb61da..803fbb906a 100644 --- a/crates/call/src/call.rs +++ b/crates/call/src/call.rs @@ -22,7 +22,7 @@ pub fn init(client: Arc, user_store: ModelHandle, cx: &mut Mu #[derive(Clone)] pub struct IncomingCall { pub room_id: u64, - pub caller: Arc, + pub calling_user: Arc, pub participants: Vec>, pub initial_project: Option, } @@ -78,9 +78,9 @@ impl ActiveCall { user_store.get_users(envelope.payload.participant_user_ids, cx) }) .await?, - caller: user_store + calling_user: user_store .update(&mut cx, |user_store, cx| { - user_store.get_user(envelope.payload.caller_user_id, cx) + user_store.get_user(envelope.payload.calling_user_id, cx) }) .await?, initial_project: envelope.payload.initial_project, @@ -110,13 +110,13 @@ impl ActiveCall { pub fn invite( &mut self, - recipient_user_id: u64, + called_user_id: u64, initial_project: Option>, cx: &mut ModelContext, ) -> Task> { let client = self.client.clone(); let user_store = self.user_store.clone(); - if !self.pending_invites.insert(recipient_user_id) { + if !self.pending_invites.insert(called_user_id) { return Task::ready(Err(anyhow!("user was already invited"))); } @@ -136,13 +136,13 @@ impl ActiveCall { }; room.update(&mut cx, |room, cx| { - room.call(recipient_user_id, initial_project_id, cx) + room.call(called_user_id, initial_project_id, cx) }) .await?; } else { let room = cx .update(|cx| { - Room::create(recipient_user_id, initial_project, client, user_store, cx) + Room::create(called_user_id, initial_project, client, user_store, cx) }) .await?; @@ -155,7 +155,7 @@ impl ActiveCall { let result = invite.await; this.update(&mut cx, |this, cx| { - this.pending_invites.remove(&recipient_user_id); + this.pending_invites.remove(&called_user_id); cx.notify(); }); result @@ -164,7 +164,7 @@ impl ActiveCall { pub fn cancel_invite( &mut self, - recipient_user_id: u64, + called_user_id: u64, cx: &mut ModelContext, ) -> Task> { let room_id = if let Some(room) = self.room() { @@ -178,7 +178,7 @@ impl ActiveCall { client .request(proto::CancelCall { room_id, - recipient_user_id, + called_user_id, }) .await?; anyhow::Ok(()) diff --git a/crates/call/src/room.rs b/crates/call/src/room.rs index 7d5153950d..3e55dc4ce9 100644 --- a/crates/call/src/room.rs +++ b/crates/call/src/room.rs @@ -149,7 +149,7 @@ impl Room { } pub(crate) fn create( - recipient_user_id: u64, + called_user_id: u64, initial_project: Option>, client: Arc, user_store: ModelHandle, @@ -182,7 +182,7 @@ impl Room { match room .update(&mut cx, |room, cx| { room.leave_when_empty = true; - room.call(recipient_user_id, initial_project_id, cx) + room.call(called_user_id, initial_project_id, cx) }) .await { @@ -487,7 +487,7 @@ impl Room { pub(crate) fn call( &mut self, - recipient_user_id: u64, + called_user_id: u64, initial_project_id: Option, cx: &mut ModelContext, ) -> Task> { @@ -503,7 +503,7 @@ impl Room { let result = client .request(proto::Call { room_id, - recipient_user_id, + called_user_id, initial_project_id, }) .await; diff --git a/crates/collab/migrations.sqlite/20221109000000_test_schema.sql b/crates/collab/migrations.sqlite/20221109000000_test_schema.sql index 731910027e..9302657523 100644 --- a/crates/collab/migrations.sqlite/20221109000000_test_schema.sql +++ b/crates/collab/migrations.sqlite/20221109000000_test_schema.sql @@ -82,4 +82,4 @@ CREATE TABLE "calls" ( "answering_connection_id" INTEGER, "initial_project_id" INTEGER REFERENCES projects (id) ); -CREATE UNIQUE INDEX "index_calls_on_calling_user_id" ON "calls" ("calling_user_id"); +CREATE UNIQUE INDEX "index_calls_on_called_user_id" ON "calls" ("called_user_id"); diff --git a/crates/collab/migrations/20221111092550_reconnection_support.sql b/crates/collab/migrations/20221111092550_reconnection_support.sql index 9474beff42..8f932acff3 100644 --- a/crates/collab/migrations/20221111092550_reconnection_support.sql +++ b/crates/collab/migrations/20221111092550_reconnection_support.sql @@ -44,4 +44,4 @@ CREATE TABLE IF NOT EXISTS "calls" ( "answering_connection_id" INTEGER, "initial_project_id" INTEGER REFERENCES projects (id) ); -CREATE UNIQUE INDEX "index_calls_on_calling_user_id" ON "calls" ("calling_user_id"); +CREATE UNIQUE INDEX "index_calls_on_called_user_id" ON "calls" ("called_user_id"); diff --git a/crates/collab/src/db.rs b/crates/collab/src/db.rs index 10da609d57..b7d6f995b0 100644 --- a/crates/collab/src/db.rs +++ b/crates/collab/src/db.rs @@ -3,6 +3,7 @@ use anyhow::anyhow; use axum::http::StatusCode; use collections::HashMap; use futures::StreamExt; +use rpc::{proto, ConnectionId}; use serde::{Deserialize, Serialize}; use sqlx::{ migrate::{Migrate as _, Migration, MigrationSource}, @@ -565,6 +566,7 @@ where for<'a> i64: sqlx::Encode<'a, D> + sqlx::Decode<'a, D>, for<'a> bool: sqlx::Encode<'a, D> + sqlx::Decode<'a, D>, for<'a> Uuid: sqlx::Encode<'a, D> + sqlx::Decode<'a, D>, + for<'a> Option: sqlx::Encode<'a, D> + sqlx::Decode<'a, D>, for<'a> sqlx::types::JsonValue: sqlx::Encode<'a, D> + sqlx::Decode<'a, D>, for<'a> OffsetDateTime: sqlx::Encode<'a, D> + sqlx::Decode<'a, D>, for<'a> PrimitiveDateTime: sqlx::Decode<'a, D> + sqlx::Decode<'a, D>, @@ -882,42 +884,352 @@ where }) } - // projects - - /// Registers a new project for the given user. - pub async fn register_project(&self, host_user_id: UserId) -> Result { + pub async fn create_room( + &self, + user_id: UserId, + connection_id: ConnectionId, + ) -> Result { test_support!(self, { - Ok(sqlx::query_scalar( + let mut tx = self.pool.begin().await?; + let live_kit_room = nanoid::nanoid!(30); + let room_id = sqlx::query_scalar( " - INSERT INTO projects(host_user_id) - VALUES ($1) + INSERT INTO rooms (live_kit_room, version) + VALUES ($1, $2) RETURNING id ", ) - .bind(host_user_id) - .fetch_one(&self.pool) + .bind(&live_kit_room) + .bind(0) + .fetch_one(&mut tx) .await - .map(ProjectId)?) + .map(RoomId)?; + + sqlx::query( + " + INSERT INTO room_participants (room_id, user_id, connection_id) + VALUES ($1, $2, $3) + ", + ) + .bind(room_id) + .bind(user_id) + .bind(connection_id.0 as i32) + .execute(&mut tx) + .await?; + + sqlx::query( + " + INSERT INTO calls (room_id, calling_user_id, called_user_id, answering_connection_id) + VALUES ($1, $2, $3, $4) + ", + ) + .bind(room_id) + .bind(user_id) + .bind(user_id) + .bind(connection_id.0 as i32) + .execute(&mut tx) + .await?; + + self.commit_room_transaction(room_id, tx).await }) } - /// Unregisters a project for the given project id. - pub async fn unregister_project(&self, project_id: ProjectId) -> Result<()> { + pub async fn call( + &self, + room_id: RoomId, + calling_user_id: UserId, + called_user_id: UserId, + initial_project_id: Option, + ) -> Result { test_support!(self, { + let mut tx = self.pool.begin().await?; sqlx::query( " - UPDATE projects - SET unregistered = TRUE - WHERE id = $1 + INSERT INTO calls (room_id, calling_user_id, called_user_id, initial_project_id) + VALUES ($1, $2, $3, $4) + ", + ) + .bind(room_id) + .bind(calling_user_id) + .bind(called_user_id) + .bind(initial_project_id) + .execute(&mut tx) + .await?; + + sqlx::query( + " + INSERT INTO room_participants (room_id, user_id) + VALUES ($1, $2) + ", + ) + .bind(room_id) + .bind(called_user_id) + .execute(&mut tx) + .await?; + + self.commit_room_transaction(room_id, tx).await + }) + } + + pub async fn call_failed( + &self, + room_id: RoomId, + called_user_id: UserId, + ) -> Result { + test_support!(self, { + let mut tx = self.pool.begin().await?; + sqlx::query( + " + DELETE FROM calls + WHERE room_id = $1 AND called_user_id = $2 + ", + ) + .bind(room_id) + .bind(called_user_id) + .execute(&mut tx) + .await?; + + sqlx::query( + " + DELETE FROM room_participants + WHERE room_id = $1 AND user_id = $2 + ", + ) + .bind(room_id) + .bind(called_user_id) + .execute(&mut tx) + .await?; + + self.commit_room_transaction(room_id, tx).await + }) + } + + pub async fn update_room_participant_location( + &self, + room_id: RoomId, + user_id: UserId, + location: proto::ParticipantLocation, + ) -> Result { + test_support!(self, { + let mut tx = self.pool.begin().await?; + + let location_kind; + let location_project_id; + match location + .variant + .ok_or_else(|| anyhow!("invalid location"))? + { + proto::participant_location::Variant::SharedProject(project) => { + location_kind = 0; + location_project_id = Some(ProjectId::from_proto(project.id)); + } + proto::participant_location::Variant::UnsharedProject(_) => { + location_kind = 1; + location_project_id = None; + } + proto::participant_location::Variant::External(_) => { + location_kind = 2; + location_project_id = None; + } + } + + sqlx::query( + " + UPDATE room_participants + SET location_kind = $1 AND location_project_id = $2 + WHERE room_id = $1 AND user_id = $2 + ", + ) + .bind(location_kind) + .bind(location_project_id) + .bind(room_id) + .bind(user_id) + .execute(&mut tx) + .await?; + + self.commit_room_transaction(room_id, tx).await + }) + } + + async fn commit_room_transaction( + &self, + room_id: RoomId, + mut tx: sqlx::Transaction<'_, D>, + ) -> Result { + sqlx::query( + " + UPDATE rooms + SET version = version + 1 + WHERE id = $1 + ", + ) + .bind(room_id) + .execute(&mut tx) + .await?; + + let room: Room = sqlx::query_as( + " + SELECT * + FROM rooms + WHERE id = $1 + ", + ) + .bind(room_id) + .fetch_one(&mut tx) + .await?; + + let mut db_participants = + sqlx::query_as::<_, (UserId, Option, Option, Option)>( + " + SELECT user_id, connection_id, location_kind, location_project_id + FROM room_participants + WHERE room_id = $1 + ", + ) + .bind(room_id) + .fetch(&mut tx); + + let mut participants = Vec::new(); + let mut pending_participant_user_ids = Vec::new(); + while let Some(participant) = db_participants.next().await { + let (user_id, connection_id, _location_kind, _location_project_id) = participant?; + if let Some(connection_id) = connection_id { + participants.push(proto::Participant { + user_id: user_id.to_proto(), + peer_id: connection_id as u32, + projects: Default::default(), + location: Some(proto::ParticipantLocation { + variant: Some(proto::participant_location::Variant::External( + Default::default(), + )), + }), + }); + } else { + pending_participant_user_ids.push(user_id.to_proto()); + } + } + drop(db_participants); + + for participant in &mut participants { + let mut entries = sqlx::query_as::<_, (ProjectId, String)>( + " + SELECT projects.id, worktrees.root_name + FROM projects + LEFT JOIN worktrees ON projects.id = worktrees.project_id + WHERE room_id = $1 AND host_user_id = $2 + ", + ) + .bind(room_id) + .fetch(&mut tx); + + let mut projects = HashMap::default(); + while let Some(entry) = entries.next().await { + let (project_id, worktree_root_name) = entry?; + let participant_project = + projects + .entry(project_id) + .or_insert(proto::ParticipantProject { + id: project_id.to_proto(), + worktree_root_names: Default::default(), + }); + participant_project + .worktree_root_names + .push(worktree_root_name); + } + + participant.projects = projects.into_values().collect(); + } + + tx.commit().await?; + + Ok(proto::Room { + id: room.id.to_proto(), + version: room.version as u64, + live_kit_room: room.live_kit_room, + participants, + pending_participant_user_ids, + }) + } + + // projects + + pub async fn share_project( + &self, + user_id: UserId, + connection_id: ConnectionId, + room_id: RoomId, + worktrees: &[proto::WorktreeMetadata], + ) -> Result<(ProjectId, proto::Room)> { + test_support!(self, { + let mut tx = self.pool.begin().await?; + let project_id = sqlx::query_scalar( + " + INSERT INTO projects (host_user_id, room_id) + VALUES ($1) + RETURNING id + ", + ) + .bind(user_id) + .bind(room_id) + .fetch_one(&mut tx) + .await + .map(ProjectId)?; + + for worktree in worktrees { + sqlx::query( + " + INSERT INTO worktrees (id, project_id, root_name) + ", + ) + .bind(worktree.id as i32) + .bind(project_id) + .bind(&worktree.root_name) + .execute(&mut tx) + .await?; + } + + sqlx::query( + " + INSERT INTO project_collaborators ( + project_id, + connection_id, + user_id, + replica_id, + is_host + ) + VALUES ($1, $2, $3, $4, $5) ", ) .bind(project_id) - .execute(&self.pool) + .bind(connection_id.0 as i32) + .bind(user_id) + .bind(0) + .bind(true) + .execute(&mut tx) .await?; - Ok(()) + + let room = self.commit_room_transaction(room_id, tx).await?; + Ok((project_id, room)) }) } + pub async fn unshare_project(&self, project_id: ProjectId) -> Result<()> { + todo!() + // test_support!(self, { + // sqlx::query( + // " + // UPDATE projects + // SET unregistered = TRUE + // WHERE id = $1 + // ", + // ) + // .bind(project_id) + // .execute(&self.pool) + // .await?; + // Ok(()) + // }) + } + // contacts pub async fn get_contacts(&self, user_id: UserId) -> Result> { @@ -1246,6 +1558,14 @@ pub struct User { pub connected_once: bool, } +id_type!(RoomId); +#[derive(Clone, Debug, Default, FromRow, Serialize, PartialEq)] +pub struct Room { + pub id: RoomId, + pub version: i32, + pub live_kit_room: String, +} + id_type!(ProjectId); #[derive(Clone, Debug, Default, FromRow, Serialize, PartialEq)] pub struct Project { diff --git a/crates/collab/src/integration_tests.rs b/crates/collab/src/integration_tests.rs index 0a6c01a691..6d3cff1718 100644 --- a/crates/collab/src/integration_tests.rs +++ b/crates/collab/src/integration_tests.rs @@ -104,7 +104,7 @@ async fn test_basic_calls( // User B receives the call. let mut incoming_call_b = active_call_b.read_with(cx_b, |call, _| call.incoming()); let call_b = incoming_call_b.next().await.unwrap().unwrap(); - assert_eq!(call_b.caller.github_login, "user_a"); + assert_eq!(call_b.calling_user.github_login, "user_a"); // User B connects via another client and also receives a ring on the newly-connected client. let _client_b2 = server.create_client(cx_b2, "user_b").await; @@ -112,7 +112,7 @@ async fn test_basic_calls( let mut incoming_call_b2 = active_call_b2.read_with(cx_b2, |call, _| call.incoming()); deterministic.run_until_parked(); let call_b2 = incoming_call_b2.next().await.unwrap().unwrap(); - assert_eq!(call_b2.caller.github_login, "user_a"); + assert_eq!(call_b2.calling_user.github_login, "user_a"); // User B joins the room using the first client. active_call_b @@ -165,7 +165,7 @@ async fn test_basic_calls( // User C receives the call, but declines it. let call_c = incoming_call_c.next().await.unwrap().unwrap(); - assert_eq!(call_c.caller.github_login, "user_b"); + assert_eq!(call_c.calling_user.github_login, "user_b"); active_call_c.update(cx_c, |call, _| call.decline_incoming().unwrap()); assert!(incoming_call_c.next().await.unwrap().is_none()); @@ -308,7 +308,7 @@ async fn test_room_uniqueness( // User B receives the call from user A. let mut incoming_call_b = active_call_b.read_with(cx_b, |call, _| call.incoming()); let call_b1 = incoming_call_b.next().await.unwrap().unwrap(); - assert_eq!(call_b1.caller.github_login, "user_a"); + assert_eq!(call_b1.calling_user.github_login, "user_a"); // Ensure calling users A and B from client C fails. active_call_c @@ -367,7 +367,7 @@ async fn test_room_uniqueness( .unwrap(); deterministic.run_until_parked(); let call_b2 = incoming_call_b.next().await.unwrap().unwrap(); - assert_eq!(call_b2.caller.github_login, "user_c"); + assert_eq!(call_b2.calling_user.github_login, "user_c"); } #[gpui::test(iterations = 10)] @@ -695,7 +695,7 @@ async fn test_share_project( let incoming_call_b = active_call_b.read_with(cx_b, |call, _| call.incoming()); deterministic.run_until_parked(); let call = incoming_call_b.borrow().clone().unwrap(); - assert_eq!(call.caller.github_login, "user_a"); + assert_eq!(call.calling_user.github_login, "user_a"); let initial_project = call.initial_project.unwrap(); active_call_b .update(cx_b, |call, cx| call.accept_incoming(cx)) @@ -766,7 +766,7 @@ async fn test_share_project( let incoming_call_c = active_call_c.read_with(cx_c, |call, _| call.incoming()); deterministic.run_until_parked(); let call = incoming_call_c.borrow().clone().unwrap(); - assert_eq!(call.caller.github_login, "user_b"); + assert_eq!(call.calling_user.github_login, "user_b"); let initial_project = call.initial_project.unwrap(); active_call_c .update(cx_c, |call, cx| call.accept_incoming(cx)) diff --git a/crates/collab/src/rpc.rs b/crates/collab/src/rpc.rs index 757c765838..75ff703b1f 100644 --- a/crates/collab/src/rpc.rs +++ b/crates/collab/src/rpc.rs @@ -2,7 +2,7 @@ mod store; use crate::{ auth, - db::{self, ProjectId, User, UserId}, + db::{self, ProjectId, RoomId, User, UserId}, AppState, Result, }; use anyhow::anyhow; @@ -486,7 +486,7 @@ impl Server { for project_id in projects_to_unshare { self.app_state .db - .unregister_project(project_id) + .unshare_project(project_id) .await .trace_err(); } @@ -559,11 +559,11 @@ impl Server { request: Message, response: Response, ) -> Result<()> { - let room; - { - let mut store = self.store().await; - room = store.create_room(request.sender_connection_id)?.clone(); - } + let room = self + .app_state + .db + .create_room(request.sender_user_id, request.sender_connection_id) + .await?; let live_kit_connection_info = if let Some(live_kit) = self.app_state.live_kit_client.as_ref() { @@ -710,8 +710,9 @@ impl Server { request: Message, response: Response, ) -> Result<()> { - let caller_user_id = request.sender_user_id; - let recipient_user_id = UserId::from_proto(request.payload.recipient_user_id); + let room_id = RoomId::from_proto(request.payload.room_id); + let calling_user_id = request.sender_user_id; + let called_user_id = UserId::from_proto(request.payload.called_user_id); let initial_project_id = request .payload .initial_project_id @@ -719,31 +720,44 @@ impl Server { if !self .app_state .db - .has_contact(caller_user_id, recipient_user_id) + .has_contact(calling_user_id, called_user_id) .await? { return Err(anyhow!("cannot call a user who isn't a contact"))?; } - let room_id = request.payload.room_id; - let mut calls = { - let mut store = self.store().await; - let (room, recipient_connection_ids, incoming_call) = store.call( - room_id, - recipient_user_id, - initial_project_id, - request.sender_connection_id, - )?; - self.room_updated(room); - recipient_connection_ids - .into_iter() - .map(|recipient_connection_id| { - self.peer - .request(recipient_connection_id, incoming_call.clone()) - }) - .collect::>() + let room = self + .app_state + .db + .call(room_id, calling_user_id, called_user_id, initial_project_id) + .await?; + self.room_updated(&room); + self.update_user_contacts(called_user_id).await?; + + let incoming_call = proto::IncomingCall { + room_id: room_id.to_proto(), + calling_user_id: calling_user_id.to_proto(), + participant_user_ids: room + .participants + .iter() + .map(|participant| participant.user_id) + .collect(), + initial_project: room.participants.iter().find_map(|participant| { + let initial_project_id = initial_project_id?.to_proto(); + participant + .projects + .iter() + .find(|project| project.id == initial_project_id) + .cloned() + }), }; - self.update_user_contacts(recipient_user_id).await?; + + let mut calls = self + .store() + .await + .connection_ids_for_user(called_user_id) + .map(|connection_id| self.peer.request(connection_id, incoming_call.clone())) + .collect::>(); while let Some(call_response) = calls.next().await { match call_response.as_ref() { @@ -757,12 +771,13 @@ impl Server { } } - { - let mut store = self.store().await; - let room = store.call_failed(room_id, recipient_user_id)?; - self.room_updated(&room); - } - self.update_user_contacts(recipient_user_id).await?; + let room = self + .app_state + .db + .call_failed(room_id, called_user_id) + .await?; + self.room_updated(&room); + self.update_user_contacts(called_user_id).await?; Err(anyhow!("failed to ring call recipient"))? } @@ -772,7 +787,7 @@ impl Server { request: Message, response: Response, ) -> Result<()> { - let recipient_user_id = UserId::from_proto(request.payload.recipient_user_id); + let recipient_user_id = UserId::from_proto(request.payload.called_user_id); { let mut store = self.store().await; let (room, recipient_connection_ids) = store.cancel_call( @@ -814,15 +829,17 @@ impl Server { request: Message, response: Response, ) -> Result<()> { - let room_id = request.payload.room_id; + let room_id = RoomId::from_proto(request.payload.room_id); let location = request .payload .location .ok_or_else(|| anyhow!("invalid location"))?; - let mut store = self.store().await; - let room = - store.update_participant_location(room_id, location, request.sender_connection_id)?; - self.room_updated(room); + let room = self + .app_state + .db + .update_room_participant_location(room_id, request.sender_user_id, location) + .await?; + self.room_updated(&room); response.send(proto::Ack {})?; Ok(()) } @@ -868,22 +885,20 @@ impl Server { request: Message, response: Response, ) -> Result<()> { - let project_id = self + let (project_id, room) = self .app_state .db - .register_project(request.sender_user_id) + .share_project( + request.sender_user_id, + request.sender_connection_id, + RoomId::from_proto(request.payload.room_id), + &request.payload.worktrees, + ) .await?; - let mut store = self.store().await; - let room = store.share_project( - request.payload.room_id, - project_id, - request.payload.worktrees, - request.sender_connection_id, - )?; response.send(proto::ShareProjectResponse { project_id: project_id.to_proto(), })?; - self.room_updated(room); + self.room_updated(&room); Ok(()) } diff --git a/crates/collab/src/rpc/store.rs b/crates/collab/src/rpc/store.rs index 81ef594ccd..72da82ea8c 100644 --- a/crates/collab/src/rpc/store.rs +++ b/crates/collab/src/rpc/store.rs @@ -1,12 +1,10 @@ use crate::db::{self, ProjectId, UserId}; use anyhow::{anyhow, Result}; use collections::{btree_map, BTreeMap, BTreeSet, HashMap, HashSet}; -use nanoid::nanoid; use rpc::{proto, ConnectionId}; use serde::Serialize; use std::{borrow::Cow, mem, path::PathBuf, str}; use tracing::instrument; -use util::post_inc; pub type RoomId = u64; @@ -34,7 +32,7 @@ struct ConnectionState { #[derive(Copy, Clone, Eq, PartialEq, Serialize)] pub struct Call { - pub caller_user_id: UserId, + pub calling_user_id: UserId, pub room_id: RoomId, pub connection_id: Option, pub initial_project_id: Option, @@ -147,7 +145,7 @@ impl Store { let room = self.room(active_call.room_id)?; Some(proto::IncomingCall { room_id: active_call.room_id, - caller_user_id: active_call.caller_user_id.to_proto(), + calling_user_id: active_call.calling_user_id.to_proto(), participant_user_ids: room .participants .iter() @@ -285,47 +283,6 @@ impl Store { } } - pub fn create_room(&mut self, creator_connection_id: ConnectionId) -> Result<&proto::Room> { - let connection = self - .connections - .get_mut(&creator_connection_id) - .ok_or_else(|| anyhow!("no such connection"))?; - let connected_user = self - .connected_users - .get_mut(&connection.user_id) - .ok_or_else(|| anyhow!("no such connection"))?; - anyhow::ensure!( - connected_user.active_call.is_none(), - "can't create a room with an active call" - ); - - let room_id = post_inc(&mut self.next_room_id); - let room = proto::Room { - id: room_id, - participants: vec![proto::Participant { - user_id: connection.user_id.to_proto(), - peer_id: creator_connection_id.0, - projects: Default::default(), - location: Some(proto::ParticipantLocation { - variant: Some(proto::participant_location::Variant::External( - proto::participant_location::External {}, - )), - }), - }], - pending_participant_user_ids: Default::default(), - live_kit_room: nanoid!(30), - }; - - self.rooms.insert(room_id, room); - connected_user.active_call = Some(Call { - caller_user_id: connection.user_id, - room_id, - connection_id: Some(creator_connection_id), - initial_project_id: None, - }); - Ok(self.rooms.get(&room_id).unwrap()) - } - pub fn join_room( &mut self, room_id: RoomId, @@ -424,7 +381,7 @@ impl Store { .get_mut(&UserId::from_proto(*pending_participant_user_id)) { if let Some(call) = connected_user.active_call.as_ref() { - if call.caller_user_id == user_id { + if call.calling_user_id == user_id { connected_user.active_call.take(); canceled_call_connection_ids .extend(connected_user.connection_ids.iter().copied()); @@ -462,101 +419,10 @@ impl Store { &self.rooms } - pub fn call( - &mut self, - room_id: RoomId, - recipient_user_id: UserId, - initial_project_id: Option, - from_connection_id: ConnectionId, - ) -> Result<(&proto::Room, Vec, proto::IncomingCall)> { - let caller_user_id = self.user_id_for_connection(from_connection_id)?; - - let recipient_connection_ids = self - .connection_ids_for_user(recipient_user_id) - .collect::>(); - let mut recipient = self - .connected_users - .get_mut(&recipient_user_id) - .ok_or_else(|| anyhow!("no such connection"))?; - anyhow::ensure!( - recipient.active_call.is_none(), - "recipient is already on another call" - ); - - let room = self - .rooms - .get_mut(&room_id) - .ok_or_else(|| anyhow!("no such room"))?; - anyhow::ensure!( - room.participants - .iter() - .any(|participant| participant.peer_id == from_connection_id.0), - "no such room" - ); - anyhow::ensure!( - room.pending_participant_user_ids - .iter() - .all(|user_id| UserId::from_proto(*user_id) != recipient_user_id), - "cannot call the same user more than once" - ); - room.pending_participant_user_ids - .push(recipient_user_id.to_proto()); - - if let Some(initial_project_id) = initial_project_id { - let project = self - .projects - .get(&initial_project_id) - .ok_or_else(|| anyhow!("no such project"))?; - anyhow::ensure!(project.room_id == room_id, "no such project"); - } - - recipient.active_call = Some(Call { - caller_user_id, - room_id, - connection_id: None, - initial_project_id, - }); - - Ok(( - room, - recipient_connection_ids, - proto::IncomingCall { - room_id, - caller_user_id: caller_user_id.to_proto(), - participant_user_ids: room - .participants - .iter() - .map(|participant| participant.user_id) - .collect(), - initial_project: initial_project_id - .and_then(|id| Self::build_participant_project(id, &self.projects)), - }, - )) - } - - pub fn call_failed(&mut self, room_id: RoomId, to_user_id: UserId) -> Result<&proto::Room> { - let mut recipient = self - .connected_users - .get_mut(&to_user_id) - .ok_or_else(|| anyhow!("no such connection"))?; - anyhow::ensure!(recipient - .active_call - .map_or(false, |call| call.room_id == room_id - && call.connection_id.is_none())); - recipient.active_call = None; - let room = self - .rooms - .get_mut(&room_id) - .ok_or_else(|| anyhow!("no such room"))?; - room.pending_participant_user_ids - .retain(|user_id| UserId::from_proto(*user_id) != to_user_id); - Ok(room) - } - pub fn cancel_call( &mut self, room_id: RoomId, - recipient_user_id: UserId, + called_user_id: UserId, canceller_connection_id: ConnectionId, ) -> Result<(&proto::Room, HashSet)> { let canceller_user_id = self.user_id_for_connection(canceller_connection_id)?; @@ -566,7 +432,7 @@ impl Store { .ok_or_else(|| anyhow!("no such connection"))?; let recipient = self .connected_users - .get(&recipient_user_id) + .get(&called_user_id) .ok_or_else(|| anyhow!("no such connection"))?; let canceller_active_call = canceller .active_call @@ -595,9 +461,9 @@ impl Store { .get_mut(&room_id) .ok_or_else(|| anyhow!("no such room"))?; room.pending_participant_user_ids - .retain(|user_id| UserId::from_proto(*user_id) != recipient_user_id); + .retain(|user_id| UserId::from_proto(*user_id) != called_user_id); - let recipient = self.connected_users.get_mut(&recipient_user_id).unwrap(); + let recipient = self.connected_users.get_mut(&called_user_id).unwrap(); recipient.active_call.take(); Ok((room, recipient.connection_ids.clone())) @@ -608,10 +474,10 @@ impl Store { room_id: RoomId, recipient_connection_id: ConnectionId, ) -> Result<(&proto::Room, Vec)> { - let recipient_user_id = self.user_id_for_connection(recipient_connection_id)?; + let called_user_id = self.user_id_for_connection(recipient_connection_id)?; let recipient = self .connected_users - .get_mut(&recipient_user_id) + .get_mut(&called_user_id) .ok_or_else(|| anyhow!("no such connection"))?; if let Some(active_call) = recipient.active_call { anyhow::ensure!(active_call.room_id == room_id, "no such room"); @@ -621,112 +487,20 @@ impl Store { ); recipient.active_call.take(); let recipient_connection_ids = self - .connection_ids_for_user(recipient_user_id) + .connection_ids_for_user(called_user_id) .collect::>(); let room = self .rooms .get_mut(&active_call.room_id) .ok_or_else(|| anyhow!("no such room"))?; room.pending_participant_user_ids - .retain(|user_id| UserId::from_proto(*user_id) != recipient_user_id); + .retain(|user_id| UserId::from_proto(*user_id) != called_user_id); Ok((room, recipient_connection_ids)) } else { Err(anyhow!("user is not being called")) } } - pub fn update_participant_location( - &mut self, - room_id: RoomId, - location: proto::ParticipantLocation, - connection_id: ConnectionId, - ) -> Result<&proto::Room> { - let room = self - .rooms - .get_mut(&room_id) - .ok_or_else(|| anyhow!("no such room"))?; - if let Some(proto::participant_location::Variant::SharedProject(project)) = - location.variant.as_ref() - { - anyhow::ensure!( - room.participants - .iter() - .flat_map(|participant| &participant.projects) - .any(|participant_project| participant_project.id == project.id), - "no such project" - ); - } - - let participant = room - .participants - .iter_mut() - .find(|participant| participant.peer_id == connection_id.0) - .ok_or_else(|| anyhow!("no such room"))?; - participant.location = Some(location); - - Ok(room) - } - - pub fn share_project( - &mut self, - room_id: RoomId, - project_id: ProjectId, - worktrees: Vec, - host_connection_id: ConnectionId, - ) -> Result<&proto::Room> { - let connection = self - .connections - .get_mut(&host_connection_id) - .ok_or_else(|| anyhow!("no such connection"))?; - - let room = self - .rooms - .get_mut(&room_id) - .ok_or_else(|| anyhow!("no such room"))?; - let participant = room - .participants - .iter_mut() - .find(|participant| participant.peer_id == host_connection_id.0) - .ok_or_else(|| anyhow!("no such room"))?; - - connection.projects.insert(project_id); - self.projects.insert( - project_id, - Project { - id: project_id, - room_id, - host_connection_id, - host: Collaborator { - user_id: connection.user_id, - replica_id: 0, - admin: connection.admin, - }, - guests: Default::default(), - active_replica_ids: Default::default(), - worktrees: worktrees - .into_iter() - .map(|worktree| { - ( - worktree.id, - Worktree { - root_name: worktree.root_name, - visible: worktree.visible, - ..Default::default() - }, - ) - }) - .collect(), - language_servers: Default::default(), - }, - ); - - participant - .projects - .extend(Self::build_participant_project(project_id, &self.projects)); - - Ok(room) - } - pub fn unshare_project( &mut self, project_id: ProjectId, diff --git a/crates/collab_ui/src/incoming_call_notification.rs b/crates/collab_ui/src/incoming_call_notification.rs index e5c4b27d7e..a51fb4891d 100644 --- a/crates/collab_ui/src/incoming_call_notification.rs +++ b/crates/collab_ui/src/incoming_call_notification.rs @@ -74,7 +74,7 @@ impl IncomingCallNotification { let active_call = ActiveCall::global(cx); if action.accept { let join = active_call.update(cx, |active_call, cx| active_call.accept_incoming(cx)); - let caller_user_id = self.call.caller.id; + let caller_user_id = self.call.calling_user.id; let initial_project_id = self.call.initial_project.as_ref().map(|project| project.id); cx.spawn_weak(|_, mut cx| async move { join.await?; @@ -105,7 +105,7 @@ impl IncomingCallNotification { .as_ref() .unwrap_or(&default_project); Flex::row() - .with_children(self.call.caller.avatar.clone().map(|avatar| { + .with_children(self.call.calling_user.avatar.clone().map(|avatar| { Image::new(avatar) .with_style(theme.caller_avatar) .aligned() @@ -115,7 +115,7 @@ impl IncomingCallNotification { Flex::column() .with_child( Label::new( - self.call.caller.github_login.clone(), + self.call.calling_user.github_login.clone(), theme.caller_username.text.clone(), ) .contained() diff --git a/crates/rpc/proto/zed.proto b/crates/rpc/proto/zed.proto index ded708370d..07e6fae3a8 100644 --- a/crates/rpc/proto/zed.proto +++ b/crates/rpc/proto/zed.proto @@ -164,9 +164,10 @@ message LeaveRoom { message Room { uint64 id = 1; - repeated Participant participants = 2; - repeated uint64 pending_participant_user_ids = 3; - string live_kit_room = 4; + uint64 version = 2; + repeated Participant participants = 3; + repeated uint64 pending_participant_user_ids = 4; + string live_kit_room = 5; } message Participant { @@ -199,13 +200,13 @@ message ParticipantLocation { message Call { uint64 room_id = 1; - uint64 recipient_user_id = 2; + uint64 called_user_id = 2; optional uint64 initial_project_id = 3; } message IncomingCall { uint64 room_id = 1; - uint64 caller_user_id = 2; + uint64 calling_user_id = 2; repeated uint64 participant_user_ids = 3; optional ParticipantProject initial_project = 4; } @@ -214,7 +215,7 @@ message CallCanceled {} message CancelCall { uint64 room_id = 1; - uint64 recipient_user_id = 2; + uint64 called_user_id = 2; } message DeclineCall { diff --git a/crates/rpc/src/rpc.rs b/crates/rpc/src/rpc.rs index b6aef64677..5ca5711d9c 100644 --- a/crates/rpc/src/rpc.rs +++ b/crates/rpc/src/rpc.rs @@ -6,4 +6,4 @@ pub use conn::Connection; pub use peer::*; mod macros; -pub const PROTOCOL_VERSION: u32 = 39; +pub const PROTOCOL_VERSION: u32 = 40;