use crate::{ConnectionState, RoomUpdate, Sid}; use anyhow::{anyhow, Context, Result}; use async_trait::async_trait; use collections::{BTreeMap, HashMap, HashSet}; use futures::Stream; use gpui::{BackgroundExecutor, ImageSource}; use live_kit_server::{proto, token}; use parking_lot::Mutex; use postage::watch; use std::{ future::Future, mem, sync::{ atomic::{AtomicBool, Ordering::SeqCst}, Arc, Weak, }, }; static SERVERS: Mutex>> = Mutex::new(BTreeMap::new()); pub struct TestServer { pub url: String, pub api_key: String, pub secret_key: String, rooms: Mutex>, executor: BackgroundExecutor, } impl TestServer { pub fn create( url: String, api_key: String, secret_key: String, executor: BackgroundExecutor, ) -> Result> { let mut servers = SERVERS.lock(); if servers.contains_key(&url) { Err(anyhow!("a server with url {:?} already exists", url)) } else { let server = Arc::new(TestServer { url: url.clone(), api_key, secret_key, rooms: Default::default(), executor, }); servers.insert(url, server.clone()); Ok(server) } } fn get(url: &str) -> Result> { Ok(SERVERS .lock() .get(url) .ok_or_else(|| anyhow!("no server found for url"))? .clone()) } pub fn teardown(&self) -> Result<()> { SERVERS .lock() .remove(&self.url) .ok_or_else(|| anyhow!("server with url {:?} does not exist", self.url))?; Ok(()) } pub fn create_api_client(&self) -> TestApiClient { TestApiClient { url: self.url.clone(), } } pub async fn create_room(&self, room: String) -> Result<()> { // todo(linux): Remove this once the cross-platform LiveKit implementation is merged #[cfg(any(test, feature = "test-support"))] self.executor.simulate_random_delay().await; let mut server_rooms = self.rooms.lock(); if server_rooms.contains_key(&room) { Err(anyhow!("room {:?} already exists", room)) } else { server_rooms.insert(room, Default::default()); Ok(()) } } async fn delete_room(&self, room: String) -> Result<()> { // TODO: clear state associated with all `Room`s. // todo(linux): Remove this once the cross-platform LiveKit implementation is merged #[cfg(any(test, feature = "test-support"))] self.executor.simulate_random_delay().await; let mut server_rooms = self.rooms.lock(); server_rooms .remove(&room) .ok_or_else(|| anyhow!("room {:?} does not exist", room))?; Ok(()) } async fn join_room(&self, token: String, client_room: Arc) -> Result<()> { // todo(linux): Remove this once the cross-platform LiveKit implementation is merged #[cfg(any(test, feature = "test-support"))] self.executor.simulate_random_delay().await; let claims = live_kit_server::token::validate(&token, &self.secret_key)?; let identity = claims.sub.unwrap().to_string(); let room_name = claims.video.room.unwrap(); let mut server_rooms = self.rooms.lock(); let room = (*server_rooms).entry(room_name.to_string()).or_default(); if room.client_rooms.contains_key(&identity) { Err(anyhow!( "{:?} attempted to join room {:?} twice", identity, room_name )) } else { for track in &room.video_tracks { client_room .0 .lock() .updates_tx .try_broadcast(RoomUpdate::SubscribedToRemoteVideoTrack(Arc::new( RemoteVideoTrack { server_track: track.clone(), }, ))) .unwrap(); } for track in &room.audio_tracks { client_room .0 .lock() .updates_tx .try_broadcast(RoomUpdate::SubscribedToRemoteAudioTrack( Arc::new(RemoteAudioTrack { server_track: track.clone(), room: Arc::downgrade(&client_room), }), Arc::new(RemoteTrackPublication), )) .unwrap(); } room.client_rooms.insert(identity, client_room); Ok(()) } } async fn leave_room(&self, token: String) -> Result<()> { // todo(linux): Remove this once the cross-platform LiveKit implementation is merged #[cfg(any(test, feature = "test-support"))] self.executor.simulate_random_delay().await; let claims = live_kit_server::token::validate(&token, &self.secret_key)?; let identity = claims.sub.unwrap().to_string(); let room_name = claims.video.room.unwrap(); let mut server_rooms = self.rooms.lock(); let room = server_rooms .get_mut(&*room_name) .ok_or_else(|| anyhow!("room {} does not exist", room_name))?; room.client_rooms.remove(&identity).ok_or_else(|| { anyhow!( "{:?} attempted to leave room {:?} before joining it", identity, room_name ) })?; Ok(()) } async fn remove_participant(&self, room_name: String, identity: String) -> Result<()> { // TODO: clear state associated with the `Room`. // todo(linux): Remove this once the cross-platform LiveKit implementation is merged #[cfg(any(test, feature = "test-support"))] self.executor.simulate_random_delay().await; let mut server_rooms = self.rooms.lock(); let room = server_rooms .get_mut(&room_name) .ok_or_else(|| anyhow!("room {} does not exist", room_name))?; room.client_rooms.remove(&identity).ok_or_else(|| { anyhow!( "participant {:?} did not join room {:?}", identity, room_name ) })?; Ok(()) } async fn update_participant( &self, room_name: String, identity: String, permission: proto::ParticipantPermission, ) -> Result<()> { // todo(linux): Remove this once the cross-platform LiveKit implementation is merged #[cfg(any(test, feature = "test-support"))] self.executor.simulate_random_delay().await; let mut server_rooms = self.rooms.lock(); let room = server_rooms .get_mut(&room_name) .ok_or_else(|| anyhow!("room {} does not exist", room_name))?; room.participant_permissions.insert(identity, permission); Ok(()) } pub async fn disconnect_client(&self, client_identity: String) { // todo(linux): Remove this once the cross-platform LiveKit implementation is merged #[cfg(any(test, feature = "test-support"))] self.executor.simulate_random_delay().await; let mut server_rooms = self.rooms.lock(); for room in server_rooms.values_mut() { if let Some(room) = room.client_rooms.remove(&client_identity) { *room.0.lock().connection.0.borrow_mut() = ConnectionState::Disconnected; } } } async fn publish_video_track( &self, token: String, local_track: LocalVideoTrack, ) -> Result { // todo(linux): Remove this once the cross-platform LiveKit implementation is merged #[cfg(any(test, feature = "test-support"))] self.executor.simulate_random_delay().await; let claims = live_kit_server::token::validate(&token, &self.secret_key)?; let identity = claims.sub.unwrap().to_string(); let room_name = claims.video.room.unwrap(); let mut server_rooms = self.rooms.lock(); let room = server_rooms .get_mut(&*room_name) .ok_or_else(|| anyhow!("room {} does not exist", room_name))?; let can_publish = room .participant_permissions .get(&identity) .map(|permission| permission.can_publish) .or(claims.video.can_publish) .unwrap_or(true); if !can_publish { return Err(anyhow!("user is not allowed to publish")); } let sid = nanoid::nanoid!(17); let track = Arc::new(TestServerVideoTrack { sid: sid.clone(), publisher_id: identity.clone(), frames_rx: local_track.frames_rx.clone(), }); room.video_tracks.push(track.clone()); for (id, client_room) in &room.client_rooms { if *id != identity { let _ = client_room .0 .lock() .updates_tx .try_broadcast(RoomUpdate::SubscribedToRemoteVideoTrack(Arc::new( RemoteVideoTrack { server_track: track.clone(), }, ))) .unwrap(); } } Ok(sid) } async fn publish_audio_track( &self, token: String, _local_track: &LocalAudioTrack, ) -> Result { // todo(linux): Remove this once the cross-platform LiveKit implementation is merged #[cfg(any(test, feature = "test-support"))] self.executor.simulate_random_delay().await; let claims = live_kit_server::token::validate(&token, &self.secret_key)?; let identity = claims.sub.unwrap().to_string(); let room_name = claims.video.room.unwrap(); let mut server_rooms = self.rooms.lock(); let room = server_rooms .get_mut(&*room_name) .ok_or_else(|| anyhow!("room {} does not exist", room_name))?; let can_publish = room .participant_permissions .get(&identity) .map(|permission| permission.can_publish) .or(claims.video.can_publish) .unwrap_or(true); if !can_publish { return Err(anyhow!("user is not allowed to publish")); } let sid = nanoid::nanoid!(17); let track = Arc::new(TestServerAudioTrack { sid: sid.clone(), publisher_id: identity.clone(), muted: AtomicBool::new(false), }); let publication = Arc::new(RemoteTrackPublication); room.audio_tracks.push(track.clone()); for (id, client_room) in &room.client_rooms { if *id != identity { let _ = client_room .0 .lock() .updates_tx .try_broadcast(RoomUpdate::SubscribedToRemoteAudioTrack( Arc::new(RemoteAudioTrack { server_track: track.clone(), room: Arc::downgrade(&client_room), }), publication.clone(), )) .unwrap(); } } Ok(sid) } fn set_track_muted(&self, token: &str, track_sid: &str, muted: bool) -> Result<()> { let claims = live_kit_server::token::validate(&token, &self.secret_key)?; let room_name = claims.video.room.unwrap(); let identity = claims.sub.unwrap(); let mut server_rooms = self.rooms.lock(); let room = server_rooms .get_mut(&*room_name) .ok_or_else(|| anyhow!("room {} does not exist", room_name))?; if let Some(track) = room .audio_tracks .iter_mut() .find(|track| track.sid == track_sid) { track.muted.store(muted, SeqCst); for (id, client_room) in room.client_rooms.iter() { if *id != identity { client_room .0 .lock() .updates_tx .try_broadcast(RoomUpdate::RemoteAudioTrackMuteChanged { track_id: track_sid.to_string(), muted, }) .unwrap(); } } } Ok(()) } fn is_track_muted(&self, token: &str, track_sid: &str) -> Option { let claims = live_kit_server::token::validate(&token, &self.secret_key).ok()?; let room_name = claims.video.room.unwrap(); let mut server_rooms = self.rooms.lock(); let room = server_rooms.get_mut(&*room_name)?; room.audio_tracks.iter().find_map(|track| { if track.sid == track_sid { Some(track.muted.load(SeqCst)) } else { None } }) } fn video_tracks(&self, token: String) -> Result>> { let claims = live_kit_server::token::validate(&token, &self.secret_key)?; let room_name = claims.video.room.unwrap(); let identity = claims.sub.unwrap(); let mut server_rooms = self.rooms.lock(); let room = server_rooms .get_mut(&*room_name) .ok_or_else(|| anyhow!("room {} does not exist", room_name))?; room.client_rooms .get(identity.as_ref()) .ok_or_else(|| anyhow!("not a participant in room"))?; Ok(room .video_tracks .iter() .map(|track| { Arc::new(RemoteVideoTrack { server_track: track.clone(), }) }) .collect()) } fn audio_tracks(&self, token: String) -> Result>> { let claims = live_kit_server::token::validate(&token, &self.secret_key)?; let room_name = claims.video.room.unwrap(); let identity = claims.sub.unwrap(); let mut server_rooms = self.rooms.lock(); let room = server_rooms .get_mut(&*room_name) .ok_or_else(|| anyhow!("room {} does not exist", room_name))?; let client_room = room .client_rooms .get(identity.as_ref()) .ok_or_else(|| anyhow!("not a participant in room"))?; Ok(room .audio_tracks .iter() .map(|track| { Arc::new(RemoteAudioTrack { server_track: track.clone(), room: Arc::downgrade(&client_room), }) }) .collect()) } } #[derive(Default)] struct TestServerRoom { client_rooms: HashMap>, video_tracks: Vec>, audio_tracks: Vec>, participant_permissions: HashMap, } #[derive(Debug)] struct TestServerVideoTrack { sid: Sid, publisher_id: Sid, frames_rx: async_broadcast::Receiver, } #[derive(Debug)] struct TestServerAudioTrack { sid: Sid, publisher_id: Sid, muted: AtomicBool, } impl TestServerRoom {} pub struct TestApiClient { url: String, } #[async_trait] impl live_kit_server::api::Client for TestApiClient { fn url(&self) -> &str { &self.url } async fn create_room(&self, name: String) -> Result<()> { let server = TestServer::get(&self.url)?; server.create_room(name).await?; Ok(()) } async fn delete_room(&self, name: String) -> Result<()> { let server = TestServer::get(&self.url)?; server.delete_room(name).await?; Ok(()) } async fn remove_participant(&self, room: String, identity: String) -> Result<()> { let server = TestServer::get(&self.url)?; server.remove_participant(room, identity).await?; Ok(()) } async fn update_participant( &self, room: String, identity: String, permission: live_kit_server::proto::ParticipantPermission, ) -> Result<()> { let server = TestServer::get(&self.url)?; server .update_participant(room, identity, permission) .await?; Ok(()) } fn room_token(&self, room: &str, identity: &str) -> Result { let server = TestServer::get(&self.url)?; token::create( &server.api_key, &server.secret_key, Some(identity), token::VideoGrant::to_join(room), ) } fn guest_token(&self, room: &str, identity: &str) -> Result { let server = TestServer::get(&self.url)?; token::create( &server.api_key, &server.secret_key, Some(identity), token::VideoGrant::for_guest(room), ) } } struct RoomState { connection: ( watch::Sender, watch::Receiver, ), display_sources: Vec, paused_audio_tracks: HashSet, updates_tx: async_broadcast::Sender, updates_rx: async_broadcast::Receiver, } pub struct Room(Mutex); impl Room { pub fn new() -> Arc { let (updates_tx, updates_rx) = async_broadcast::broadcast(128); Arc::new(Self(Mutex::new(RoomState { connection: watch::channel_with(ConnectionState::Disconnected), display_sources: Default::default(), paused_audio_tracks: Default::default(), updates_tx, updates_rx, }))) } pub fn status(&self) -> watch::Receiver { self.0.lock().connection.1.clone() } pub fn connect(self: &Arc, url: &str, token: &str) -> impl Future> { let this = self.clone(); let url = url.to_string(); let token = token.to_string(); async move { let server = TestServer::get(&url)?; server .join_room(token.clone(), this.clone()) .await .context("room join")?; *this.0.lock().connection.0.borrow_mut() = ConnectionState::Connected { url, token }; Ok(()) } } pub fn display_sources(self: &Arc) -> impl Future>> { let this = self.clone(); async move { // todo(linux): Remove this once the cross-platform LiveKit implementation is merged #[cfg(any(test, feature = "test-support"))] { let server = this.test_server(); server.executor.simulate_random_delay().await; } Ok(this.0.lock().display_sources.clone()) } } pub fn publish_video_track( self: &Arc, track: LocalVideoTrack, ) -> impl Future> { let this = self.clone(); let track = track.clone(); async move { let sid = this .test_server() .publish_video_track(this.token(), track) .await?; Ok(LocalTrackPublication { room: Arc::downgrade(&this), sid, }) } } pub fn publish_audio_track( self: &Arc, track: LocalAudioTrack, ) -> impl Future> { let this = self.clone(); let track = track.clone(); async move { let sid = this .test_server() .publish_audio_track(this.token(), &track) .await?; Ok(LocalTrackPublication { room: Arc::downgrade(&this), sid, }) } } pub fn unpublish_track(&self, _publication: LocalTrackPublication) {} pub fn remote_audio_tracks(&self, publisher_id: &str) -> Vec> { if !self.is_connected() { return Vec::new(); } self.test_server() .audio_tracks(self.token()) .unwrap() .into_iter() .filter(|track| track.publisher_id() == publisher_id) .collect() } pub fn remote_audio_track_publications( &self, publisher_id: &str, ) -> Vec> { if !self.is_connected() { return Vec::new(); } self.test_server() .audio_tracks(self.token()) .unwrap() .into_iter() .filter(|track| track.publisher_id() == publisher_id) .map(|_track| Arc::new(RemoteTrackPublication {})) .collect() } pub fn remote_video_tracks(&self, publisher_id: &str) -> Vec> { if !self.is_connected() { return Vec::new(); } self.test_server() .video_tracks(self.token()) .unwrap() .into_iter() .filter(|track| track.publisher_id() == publisher_id) .collect() } pub fn updates(&self) -> impl Stream { self.0.lock().updates_rx.clone() } pub fn set_display_sources(&self, sources: Vec) { self.0.lock().display_sources = sources; } fn test_server(&self) -> Arc { match self.0.lock().connection.1.borrow().clone() { ConnectionState::Disconnected => panic!("must be connected to call this method"), ConnectionState::Connected { url, .. } => TestServer::get(&url).unwrap(), } } fn token(&self) -> String { match self.0.lock().connection.1.borrow().clone() { ConnectionState::Disconnected => panic!("must be connected to call this method"), ConnectionState::Connected { token, .. } => token, } } fn is_connected(&self) -> bool { match *self.0.lock().connection.1.borrow() { ConnectionState::Disconnected => false, ConnectionState::Connected { .. } => true, } } } impl Drop for Room { fn drop(&mut self) { if let ConnectionState::Connected { token, .. } = mem::replace( &mut *self.0.lock().connection.0.borrow_mut(), ConnectionState::Disconnected, ) { if let Ok(server) = TestServer::get(&token) { let executor = server.executor.clone(); executor .spawn(async move { server.leave_room(token).await.unwrap() }) .detach(); } } } } #[derive(Clone)] pub struct LocalTrackPublication { sid: String, room: Weak, } impl LocalTrackPublication { pub fn set_mute(&self, mute: bool) -> impl Future> { let sid = self.sid.clone(); let room = self.room.clone(); async move { if let Some(room) = room.upgrade() { room.test_server() .set_track_muted(&room.token(), &sid, mute) } else { Err(anyhow!("no such room")) } } } pub fn is_muted(&self) -> bool { if let Some(room) = self.room.upgrade() { room.test_server() .is_track_muted(&room.token(), &self.sid) .unwrap_or(false) } else { false } } pub fn sid(&self) -> String { self.sid.clone() } } pub struct RemoteTrackPublication; impl RemoteTrackPublication { pub fn set_enabled(&self, _enabled: bool) -> impl Future> { async { Ok(()) } } pub fn is_muted(&self) -> bool { false } pub fn sid(&self) -> String { "".to_string() } } #[derive(Clone)] pub struct LocalVideoTrack { frames_rx: async_broadcast::Receiver, } impl LocalVideoTrack { pub fn screen_share_for_display(display: &MacOSDisplay) -> Self { Self { frames_rx: display.frames.1.clone(), } } } #[derive(Clone)] pub struct LocalAudioTrack; impl LocalAudioTrack { pub fn create() -> Self { Self } } #[derive(Debug)] pub struct RemoteVideoTrack { server_track: Arc, } impl RemoteVideoTrack { pub fn sid(&self) -> &str { &self.server_track.sid } pub fn publisher_id(&self) -> &str { &self.server_track.publisher_id } pub fn frames(&self) -> async_broadcast::Receiver { self.server_track.frames_rx.clone() } } #[derive(Debug)] pub struct RemoteAudioTrack { server_track: Arc, room: Weak, } impl RemoteAudioTrack { pub fn sid(&self) -> &str { &self.server_track.sid } pub fn publisher_id(&self) -> &str { &self.server_track.publisher_id } pub fn start(&self) { if let Some(room) = self.room.upgrade() { room.0 .lock() .paused_audio_tracks .remove(&self.server_track.sid); } } pub fn stop(&self) { if let Some(room) = self.room.upgrade() { room.0 .lock() .paused_audio_tracks .insert(self.server_track.sid.clone()); } } pub fn is_playing(&self) -> bool { !self .room .upgrade() .unwrap() .0 .lock() .paused_audio_tracks .contains(&self.server_track.sid) } } #[derive(Clone)] pub struct MacOSDisplay { frames: ( async_broadcast::Sender, async_broadcast::Receiver, ), } impl MacOSDisplay { pub fn new() -> Self { Self { frames: async_broadcast::broadcast(128), } } pub fn send_frame(&self, frame: Frame) { self.frames.0.try_broadcast(frame).unwrap(); } } #[derive(Clone, Debug, PartialEq, Eq)] pub struct Frame { pub label: String, pub width: usize, pub height: usize, } impl Frame { pub fn width(&self) -> usize { self.width } pub fn height(&self) -> usize { self.height } pub fn image(&self) -> ImageSource { unimplemented!("you can't call this in test mode") } }