From cce00526b9a65afb415dcacd06c47b47fcac2af5 Mon Sep 17 00:00:00 2001 From: Antonio Scandurra Date: Mon, 17 Oct 2022 14:03:44 +0200 Subject: [PATCH] Remove participants from live-kit rooms when they leave zed rooms --- crates/collab/.env.toml | 3 +++ crates/collab/src/rpc.rs | 41 ++++++++++++++++++++++--------- crates/collab/src/rpc/store.rs | 20 ++++++++------- crates/live_kit_server/src/api.rs | 36 +++++++++++++++++++++------ 4 files changed, 71 insertions(+), 29 deletions(-) diff --git a/crates/collab/.env.toml b/crates/collab/.env.toml index 98198eb775..8a9e28a7c8 100644 --- a/crates/collab/.env.toml +++ b/crates/collab/.env.toml @@ -2,6 +2,9 @@ DATABASE_URL = "postgres://postgres@localhost/zed" HTTP_PORT = 8080 API_TOKEN = "secret" INVITE_LINK_PREFIX = "http://localhost:3000/invites/" +LIVE_KIT_SERVER = "http://localhost:7880" +LIVE_KIT_KEY = "devkey" +LIVE_KIT_SECRET = "secret" # HONEYCOMB_API_KEY= # HONEYCOMB_DATASET= diff --git a/crates/collab/src/rpc.rs b/crates/collab/src/rpc.rs index 8983f69f9b..888217f5aa 100644 --- a/crates/collab/src/rpc.rs +++ b/crates/collab/src/rpc.rs @@ -473,6 +473,7 @@ impl Server { let mut projects_to_unshare = Vec::new(); let mut contacts_to_update = HashSet::default(); + let mut room_left = None; { let mut store = self.store().await; let removed_connection = store.remove_connection(connection_id)?; @@ -501,23 +502,24 @@ impl Server { }); } + if let Some(room) = removed_connection.room { + self.room_updated(&room); + room_left = Some(self.room_left(&room, removed_connection.user_id)); + } + + contacts_to_update.insert(removed_connection.user_id); for connection_id in removed_connection.canceled_call_connection_ids { self.peer .send(connection_id, proto::CallCanceled {}) .trace_err(); contacts_to_update.extend(store.user_id_for_connection(connection_id).ok()); } - - if let Some(room) = removed_connection - .room_id - .and_then(|room_id| store.room(room_id)) - { - self.room_updated(room); - } - - contacts_to_update.insert(removed_connection.user_id); }; + if let Some(room_left) = room_left { + room_left.await.trace_err(); + } + for user_id in contacts_to_update { self.update_user_contacts(user_id).await.trace_err(); } @@ -682,6 +684,7 @@ impl Server { async fn leave_room(self: Arc, message: TypedEnvelope) -> Result<()> { let mut contacts_to_update = HashSet::default(); + let room_left; { let mut store = self.store().await; let user_id = store.user_id_for_connection(message.sender_id)?; @@ -720,9 +723,8 @@ impl Server { } } - if let Some(room) = left_room.room { - self.room_updated(room); - } + self.room_updated(&left_room.room); + room_left = self.room_left(&left_room.room, user_id); for connection_id in left_room.canceled_call_connection_ids { self.peer @@ -732,6 +734,7 @@ impl Server { } } + room_left.await.trace_err(); for user_id in contacts_to_update { self.update_user_contacts(user_id).await?; } @@ -880,6 +883,20 @@ impl Server { } } + fn room_left(&self, room: &proto::Room, user_id: UserId) -> impl Future> { + let client = self.app_state.live_kit_client.clone(); + let room_name = room.live_kit_room.clone(); + async move { + if let Some(client) = client { + client + .remove_participant(room_name, user_id.to_string()) + .await?; + } + + Ok(()) + } + } + async fn share_project( self: Arc, request: TypedEnvelope, diff --git a/crates/collab/src/rpc/store.rs b/crates/collab/src/rpc/store.rs index 096ac0fa06..df8be453b1 100644 --- a/crates/collab/src/rpc/store.rs +++ b/crates/collab/src/rpc/store.rs @@ -4,7 +4,7 @@ use collections::{btree_map, BTreeMap, BTreeSet, HashMap, HashSet}; use nanoid::nanoid; use rpc::{proto, ConnectionId}; use serde::Serialize; -use std::{mem, path::PathBuf, str, time::Duration}; +use std::{borrow::Cow, mem, path::PathBuf, str, time::Duration}; use time::OffsetDateTime; use tracing::instrument; use util::post_inc; @@ -85,12 +85,12 @@ pub struct Channel { pub type ReplicaId = u16; #[derive(Default)] -pub struct RemovedConnectionState { +pub struct RemovedConnectionState<'a> { pub user_id: UserId, pub hosted_projects: Vec, pub guest_projects: Vec, pub contact_ids: HashSet, - pub room_id: Option, + pub room: Option>, pub canceled_call_connection_ids: Vec, } @@ -103,7 +103,7 @@ pub struct LeftProject { } pub struct LeftRoom<'a> { - pub room: Option<&'a proto::Room>, + pub room: Cow<'a, proto::Room>, pub unshared_projects: Vec, pub left_projects: Vec, pub canceled_call_connection_ids: Vec, @@ -218,7 +218,7 @@ impl Store { let left_room = self.leave_room(room_id, connection_id)?; result.hosted_projects = left_room.unshared_projects; result.guest_projects = left_room.left_projects; - result.room_id = Some(room_id); + result.room = Some(Cow::Owned(left_room.room.into_owned())); result.canceled_call_connection_ids = left_room.canceled_call_connection_ids; } @@ -495,12 +495,14 @@ impl Store { } }); - if room.participants.is_empty() && room.pending_participant_user_ids.is_empty() { - self.rooms.remove(&room_id); - } + let room = if room.participants.is_empty() && room.pending_participant_user_ids.is_empty() { + Cow::Owned(self.rooms.remove(&room_id).unwrap()) + } else { + Cow::Borrowed(self.rooms.get(&room_id).unwrap()) + }; Ok(LeftRoom { - room: self.rooms.get(&room_id), + room, unshared_projects, left_projects, canceled_call_connection_ids, diff --git a/crates/live_kit_server/src/api.rs b/crates/live_kit_server/src/api.rs index 2bbad785c3..cc235c15be 100644 --- a/crates/live_kit_server/src/api.rs +++ b/crates/live_kit_server/src/api.rs @@ -2,13 +2,14 @@ use crate::{proto, token}; use anyhow::{anyhow, Result}; use prost::Message; use reqwest::header::CONTENT_TYPE; -use std::future::Future; +use std::{future::Future, sync::Arc}; +#[derive(Clone)] pub struct Client { http: reqwest::Client, - uri: String, - key: String, - secret: String, + uri: Arc, + key: Arc, + secret: Arc, } impl Client { @@ -19,9 +20,9 @@ impl Client { Self { http: reqwest::Client::new(), - uri, - key, - secret, + uri: uri.into(), + key: key.into(), + secret: secret.into(), } } @@ -49,7 +50,26 @@ impl Client { proto::DeleteRoomRequest { room: name }, ); async move { - response.await?; + let _: proto::DeleteRoomResponse = response.await?; + Ok(()) + } + } + + pub fn remove_participant( + &self, + room: String, + identity: String, + ) -> impl Future> { + let response = self.request( + "twirp/livekit.RoomService/RemoveParticipant", + token::VideoGrant { + room_admin: Some(true), + ..Default::default() + }, + proto::RoomParticipantIdentity { room, identity }, + ); + async move { + let _: proto::RemoveParticipantResponse = response.await?; Ok(()) } }