Use a fake LiveKit server to test that we can send frames when screen sharing

Nathan Sobo 2022-10-19 11:20:31 -06:00
parent b6e5aa3bb0
commit 723fa83909
9 changed files with 334 additions and 188 deletions
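
The heart of the change is a fake LiveKit client whose Room and MacOSDisplay are driven entirely in-process, so the collab integration test can share a screen and assert on the exact frame that arrives on the other side. Condensed from the test added below (cx_a, cx_b, and deterministic are the existing test fixtures):

    let display = MacOSDisplay::new();                     // fake display; frames are pushed by hand
    let events_b = active_call_events(cx_b);               // room events observed by client B

    active_call_a
        .update(cx_a, |call, cx| {
            call.room().unwrap().update(cx, |room, cx| {
                room.set_display_sources(vec![display.clone()]); // test-only hook on Room
                room.share_screen(cx)
            })
        })
        .await
        .unwrap();

    // A frame sent through the fake display surfaces as an Event::Frame on client B.
    display.send_frame(Frame { width: 800, height: 600, label: "a".into() });
    deterministic.run_until_parked();
    assert_eq!(events_b.borrow().len(), 1);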

Cargo.lock (generated)

@ -181,6 +181,17 @@ dependencies = [
"futures-core",
]
[[package]]
name = "async-broadcast"
version = "0.4.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6d26004fe83b2d1cd3a97609b21e39f9a31535822210fe83205d2ce48866ea61"
dependencies = [
"event-listener",
"futures-core",
"parking_lot 0.12.1",
]
[[package]]
name = "async-channel"
version = "1.7.1"
@ -2991,7 +3002,7 @@ name = "language"
version = "0.1.0"
dependencies = [
"anyhow",
"async-broadcast",
"async-broadcast 0.3.4",
"async-trait",
"client",
"clock",
@ -3164,6 +3175,7 @@ name = "live_kit_client"
version = "0.1.0"
dependencies = [
"anyhow",
"async-broadcast 0.4.1",
"async-trait",
"block",
"byteorder",
@ -3181,6 +3193,7 @@ dependencies = [
"live_kit_server",
"log",
"media",
"nanoid",
"objc",
"parking_lot 0.11.2",
"postage",


@ -2,7 +2,7 @@ use anyhow::{anyhow, Result};
use client::{proto, User};
use collections::HashMap;
use gpui::{Task, WeakModelHandle};
use media::core_video::CVImageBuffer;
use live_kit_client::Frame;
use project::Project;
use std::sync::Arc;
@ -46,13 +46,13 @@ pub struct RemoteParticipant {
#[derive(Clone)]
pub struct RemoteVideoTrack {
pub(crate) frame: Option<CVImageBuffer>,
pub(crate) frame: Option<Frame>,
pub(crate) _live_kit_track: Arc<live_kit_client::RemoteVideoTrack>,
pub(crate) _maintain_frame: Arc<Task<()>>,
}
impl RemoteVideoTrack {
pub fn frame(&self) -> Option<&CVImageBuffer> {
pub fn frame(&self) -> Option<&Frame> {
self.frame.as_ref()
}
}


@ -418,31 +418,33 @@ impl Room {
_live_kit_track: track,
_maintain_frame: Arc::new(cx.spawn_weak(|this, mut cx| async move {
while let Some(frame) = rx.next().await {
let this = if let Some(this) = this.upgrade(&cx) {
this
} else {
break;
};
let done = this.update(&mut cx, |this, cx| {
if let Some(track) =
this.remote_participants.get_mut(&peer_id).and_then(
|participant| participant.tracks.get_mut(&track_id),
)
{
track.frame = frame;
cx.emit(Event::Frame {
participant_id: peer_id,
track_id: track_id.clone(),
});
false
if let Some(frame) = frame {
let this = if let Some(this) = this.upgrade(&cx) {
this
} else {
true
}
});
break;
};
if done {
break;
let done = this.update(&mut cx, |this, cx| {
if let Some(track) =
this.remote_participants.get_mut(&peer_id).and_then(
|participant| participant.tracks.get_mut(&track_id),
)
{
track.frame = Some(frame);
cx.emit(Event::Frame {
participant_id: peer_id,
track_id: track_id.clone(),
});
false
} else {
true
}
});
if done {
break;
}
}
}
})),
@ -620,18 +622,18 @@ impl Room {
return Task::ready(Err(anyhow!("screen was already shared")));
}
let publish_id = if let Some(live_kit) = self.live_kit.as_mut() {
let (displays, publish_id) = if let Some(live_kit) = self.live_kit.as_mut() {
let publish_id = post_inc(&mut live_kit.next_publish_id);
live_kit.screen_track = ScreenTrack::Pending { publish_id };
cx.notify();
publish_id
(live_kit.room.display_sources(), publish_id)
} else {
return Task::ready(Err(anyhow!("live-kit was not initialized")));
};
cx.spawn_weak(|this, mut cx| async move {
let publish_track = async {
let displays = live_kit_client::display_sources().await?;
let displays = displays.await?;
let display = displays
.first()
.ok_or_else(|| anyhow!("no display found"))?;
@ -711,6 +713,15 @@ impl Room {
}
}
}
#[cfg(any(test, feature = "test-support"))]
pub fn set_display_sources(&self, sources: Vec<live_kit_client::MacOSDisplay>) {
self.live_kit
.as_ref()
.unwrap()
.room
.set_display_sources(sources);
}
}
struct LiveKitRoom {


@ -5,7 +5,10 @@ use crate::{
};
use ::rpc::Peer;
use anyhow::anyhow;
use call::{room, ActiveCall, ParticipantLocation, Room};
use call::{
room::{self, Event},
ActiveCall, ParticipantLocation, Room,
};
use client::{
self, test::FakeHttpClient, Channel, ChannelDetails, ChannelList, Client, Connection,
Credentials, EstablishConnectionError, PeerId, User, UserStore, RECEIVE_TIMEOUT,
@ -30,6 +33,7 @@ use language::{
range_to_lsp, tree_sitter_rust, Diagnostic, DiagnosticEntry, FakeLspAdapter, Language,
LanguageConfig, LanguageRegistry, OffsetRangeExt, Point, Rope,
};
use live_kit_client::{Frame, MacOSDisplay};
use lsp::{self, FakeLanguageServer};
use parking_lot::Mutex;
use project::{
@ -185,6 +189,45 @@ async fn test_basic_calls(
}
);
// User A shares their screen
let display = MacOSDisplay::new();
let events_b = active_call_events(cx_b);
active_call_a
.update(cx_a, |call, cx| {
call.room().unwrap().update(cx, |room, cx| {
room.set_display_sources(vec![display.clone()]);
room.share_screen(cx)
})
})
.await
.unwrap();
let frame = Frame {
width: 800,
height: 600,
label: "a".into(),
};
display.send_frame(frame.clone());
deterministic.run_until_parked();
assert_eq!(events_b.borrow().len(), 1);
let event = events_b.borrow().first().unwrap().clone();
if let Event::Frame {
participant_id,
track_id,
} = event
{
assert_eq!(participant_id, client_a.peer_id().unwrap());
room_b.read_with(cx_b, |room, _| {
assert_eq!(
room.remote_participants()[&client_a.peer_id().unwrap()].tracks[&track_id].frame(),
Some(&frame)
);
});
} else {
panic!("unexpected event")
}
// User A leaves the room.
active_call_a.update(cx_a, |call, cx| {
call.hang_up(cx).unwrap();
@ -954,21 +997,21 @@ async fn test_active_call_events(
deterministic.run_until_parked();
assert_eq!(mem::take(&mut *events_a.borrow_mut()), vec![]);
assert_eq!(mem::take(&mut *events_b.borrow_mut()), vec![]);
}
fn active_call_events(cx: &mut TestAppContext) -> Rc<RefCell<Vec<room::Event>>> {
let events = Rc::new(RefCell::new(Vec::new()));
let active_call = cx.read(ActiveCall::global);
cx.update({
let events = events.clone();
|cx| {
cx.subscribe(&active_call, move |_, event, _| {
events.borrow_mut().push(event.clone())
})
.detach()
}
});
events
}
fn active_call_events(cx: &mut TestAppContext) -> Rc<RefCell<Vec<room::Event>>> {
let events = Rc::new(RefCell::new(Vec::new()));
let active_call = cx.read(ActiveCall::global);
cx.update({
let events = events.clone();
|cx| {
cx.subscribe(&active_call, move |_, event, _| {
events.borrow_mut().push(event.clone())
})
.detach()
}
});
events
}
#[gpui::test(iterations = 10)]


@ -13,11 +13,13 @@ name = "test_app"
[features]
test-support = [
"async-broadcast",
"async-trait",
"collections/test-support",
"gpui/test-support",
"lazy_static",
"live_kit_server"
"live_kit_server",
"nanoid",
]
[dependencies]
@ -27,13 +29,16 @@ live_kit_server = { path = "../live_kit_server", optional = true }
media = { path = "../media" }
anyhow = "1.0.38"
async-trait = { version = "0.1", optional = true }
core-foundation = "0.9.3"
core-graphics = "0.22.3"
futures = "0.3"
lazy_static = { version = "1.4", optional = true }
parking_lot = "0.11.1"
async-broadcast = { version = "0.4", optional = true }
async-trait = { version = "0.1", optional = true }
lazy_static = { version = "1.4", optional = true }
nanoid = { version ="0.4", optional = true}
[dev-dependencies]
collections = { path = "../collections", features = ["test-support"] }
gpui = { path = "../gpui", features = ["test-support"] }


@ -1,21 +1,9 @@
use futures::StreamExt;
use gpui::{
actions,
elements::{Canvas, *},
keymap::Binding,
platform::current::Surface,
Menu, MenuItem, ViewContext,
};
use gpui::{actions, keymap::Binding, Menu, MenuItem};
use live_kit_client::{LocalVideoTrack, RemoteVideoTrackUpdate, Room};
use live_kit_server::{
api::Client,
token::{self, VideoGrant},
};
use live_kit_server::token::{self, VideoGrant};
use log::LevelFilter;
use media::core_video::CVImageBuffer;
use postage::watch;
use simplelog::SimpleLogger;
use std::sync::Arc;
actions!(capture, [Quit]);
@ -62,7 +50,7 @@ fn main() {
let mut track_changes = room_b.remote_video_track_updates();
let displays = live_kit_client::display_sources().await.unwrap();
let displays = room_a.display_sources().await.unwrap();
let display = displays.into_iter().next().unwrap();
let track_a = LocalVideoTrack::screen_share_for_display(&display);
@ -72,6 +60,7 @@ fn main() {
let remote_tracks = room_b.remote_video_tracks("test-participant-1");
assert_eq!(remote_tracks.len(), 1);
assert_eq!(remote_tracks[0].publisher_id(), "test-participant-1");
dbg!(track.sid());
assert_eq!(track.publisher_id(), "test-participant-1");
} else {
panic!("unexpected message");
@ -100,73 +89,6 @@ fn main() {
});
}
struct ScreenCaptureView {
image_buffer: Option<CVImageBuffer>,
_room: Arc<Room>,
}
impl gpui::Entity for ScreenCaptureView {
type Event = ();
}
impl ScreenCaptureView {
pub fn new(room: Arc<Room>, cx: &mut ViewContext<Self>) -> Self {
let mut remote_video_tracks = room.remote_video_track_updates();
cx.spawn_weak(|this, mut cx| async move {
if let Some(video_track) = remote_video_tracks.next().await {
let (mut frames_tx, mut frames_rx) = watch::channel_with(None);
// video_track.add_renderer(move |frame| *frames_tx.borrow_mut() = Some(frame));
while let Some(frame) = frames_rx.next().await {
if let Some(this) = this.upgrade(&cx) {
this.update(&mut cx, |this, cx| {
this.image_buffer = frame;
cx.notify();
});
} else {
break;
}
}
}
})
.detach();
Self {
image_buffer: None,
_room: room,
}
}
}
impl gpui::View for ScreenCaptureView {
fn ui_name() -> &'static str {
"View"
}
fn render(&mut self, _: &mut gpui::RenderContext<Self>) -> gpui::ElementBox {
let image_buffer = self.image_buffer.clone();
let canvas = Canvas::new(move |bounds, _, cx| {
if let Some(image_buffer) = image_buffer.clone() {
cx.scene.push_surface(Surface {
bounds,
image_buffer,
});
}
});
if let Some(image_buffer) = self.image_buffer.as_ref() {
canvas
.constrained()
.with_width(image_buffer.width() as f32)
.with_height(image_buffer.height() as f32)
.aligned()
.boxed()
} else {
canvas.boxed()
}
}
}
fn quit(_: &Quit, cx: &mut gpui::MutableAppContext) {
cx.platform().quit();
}


@ -8,7 +8,8 @@ use futures::{
channel::{mpsc, oneshot},
Future,
};
use media::core_video::{CVImageBuffer, CVImageBufferRef};
pub use media::core_video::CVImageBuffer;
use media::core_video::CVImageBufferRef;
use parking_lot::Mutex;
use std::{
ffi::c_void,
@ -109,8 +110,35 @@ impl Room {
async { rx.await.unwrap().context("error connecting to room") }
}
pub fn display_sources(self: &Arc<Self>) -> impl Future<Output = Result<Vec<MacOSDisplay>>> {
extern "C" fn callback(tx: *mut c_void, sources: CFArrayRef, error: CFStringRef) {
unsafe {
let tx = Box::from_raw(tx as *mut oneshot::Sender<Result<Vec<MacOSDisplay>>>);
if sources.is_null() {
let _ = tx.send(Err(anyhow!("{}", CFString::wrap_under_get_rule(error))));
} else {
let sources = CFArray::wrap_under_get_rule(sources)
.into_iter()
.map(|source| MacOSDisplay::new(*source))
.collect();
let _ = tx.send(Ok(sources));
}
}
}
let (tx, rx) = oneshot::channel();
unsafe {
LKDisplaySources(Box::into_raw(Box::new(tx)) as *mut _, callback);
}
async move { rx.await.unwrap() }
}
pub fn publish_video_track(
&self,
self: &Arc<Self>,
track: &LocalVideoTrack,
) -> impl Future<Output = Result<LocalTrackPublication>> {
let (tx, rx) = oneshot::channel::<Result<LocalTrackPublication>>();
@ -338,16 +366,16 @@ impl RemoteVideoTrack {
pub fn add_renderer<F>(&self, callback: F)
where
F: 'static + FnMut(CVImageBuffer),
F: 'static + Send + Sync + FnMut(Frame),
{
extern "C" fn on_frame<F>(callback_data: *mut c_void, frame: CVImageBufferRef)
where
F: FnMut(CVImageBuffer),
F: FnMut(Frame),
{
unsafe {
let buffer = CVImageBuffer::wrap_under_get_rule(frame);
let callback = &mut *(callback_data as *mut F);
callback(buffer);
callback(Frame(buffer));
}
}
@ -394,29 +422,18 @@ impl Drop for MacOSDisplay {
}
}
pub fn display_sources() -> impl Future<Output = Result<Vec<MacOSDisplay>>> {
extern "C" fn callback(tx: *mut c_void, sources: CFArrayRef, error: CFStringRef) {
unsafe {
let tx = Box::from_raw(tx as *mut oneshot::Sender<Result<Vec<MacOSDisplay>>>);
pub struct Frame(CVImageBuffer);
if sources.is_null() {
let _ = tx.send(Err(anyhow!("{}", CFString::wrap_under_get_rule(error))));
} else {
let sources = CFArray::wrap_under_get_rule(sources)
.into_iter()
.map(|source| MacOSDisplay::new(*source))
.collect();
let _ = tx.send(Ok(sources));
}
}
impl Frame {
pub fn width(&self) -> usize {
self.0.width()
}
let (tx, rx) = oneshot::channel();
unsafe {
LKDisplaySources(Box::into_raw(Box::new(tx)) as *mut _, callback);
pub fn height(&self) -> usize {
self.0.height()
}
async move { rx.await.unwrap() }
pub fn image(&self) -> CVImageBuffer {
self.0.clone()
}
}
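
For context on the API change in this file: renderer callbacks now receive the new Frame newtype (which wraps a CVImageBuffer) instead of the raw buffer. A hypothetical subscriber, not part of this commit, assuming `track` is a `RemoteVideoTrack` obtained from a subscription update:

    track.add_renderer(move |frame: Frame| {
        // width()/height() come straight from the wrapped CVImageBuffer; callers that
        // actually draw the frame (e.g. the pane_group hunk at the end) call frame.image().
        println!("received {}x{} frame", frame.width(), frame.height());
    });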


@ -1,8 +1,8 @@
use anyhow::{anyhow, Result};
use async_trait::async_trait;
use collections::HashMap;
use futures::{channel::mpsc, future};
use gpui::executor::Background;
use futures::{Stream, StreamExt};
use gpui::executor::{self, Background};
use lazy_static::lazy_static;
use live_kit_server::token;
use media::core_video::CVImageBuffer;
@ -96,14 +96,14 @@ impl TestServer {
let room = server_rooms
.get_mut(&*room_name)
.ok_or_else(|| anyhow!("room {:?} does not exist", room_name))?;
if room.clients.contains_key(&identity) {
if room.client_rooms.contains_key(&identity) {
Err(anyhow!(
"{:?} attempted to join room {:?} twice",
identity,
room_name
))
} else {
room.clients.insert(identity, client_room);
room.client_rooms.insert(identity, client_room);
Ok(())
}
}
@ -117,7 +117,7 @@ impl TestServer {
let room = server_rooms
.get_mut(&*room_name)
.ok_or_else(|| anyhow!("room {} does not exist", room_name))?;
room.clients.remove(&identity).ok_or_else(|| {
room.client_rooms.remove(&identity).ok_or_else(|| {
anyhow!(
"{:?} attempted to leave room {:?} before joining it",
identity,
@ -135,7 +135,7 @@ impl TestServer {
let room = server_rooms
.get_mut(&room_name)
.ok_or_else(|| anyhow!("room {} does not exist", room_name))?;
room.clients.remove(&identity).ok_or_else(|| {
room.client_rooms.remove(&identity).ok_or_else(|| {
anyhow!(
"participant {:?} did not join room {:?}",
identity,
@ -144,11 +144,44 @@ impl TestServer {
})?;
Ok(())
}
async fn publish_video_track(&self, token: String, local_track: LocalVideoTrack) -> Result<()> {
self.background.simulate_random_delay().await;
let claims = live_kit_server::token::validate(&token, &self.secret_key)?;
let identity = claims.sub.unwrap().to_string();
let room_name = claims.video.room.unwrap();
let mut server_rooms = self.rooms.lock();
let room = server_rooms
.get_mut(&*room_name)
.ok_or_else(|| anyhow!("room {} does not exist", room_name))?;
let update = RemoteVideoTrackUpdate::Subscribed(Arc::new(RemoteVideoTrack {
sid: nanoid::nanoid!(17),
publisher_id: identity.clone(),
frames_rx: local_track.frames_rx.clone(),
background: self.background.clone(),
}));
for (id, client_room) in &room.client_rooms {
if *id != identity {
let _ = client_room
.0
.lock()
.video_track_updates
.0
.try_broadcast(update.clone())
.unwrap();
}
}
Ok(())
}
}
#[derive(Default)]
struct TestServerRoom {
clients: HashMap<Sid, Arc<Room>>,
client_rooms: HashMap<Sid, Arc<Room>>,
}
impl TestServerRoom {}
@ -194,17 +227,29 @@ impl live_kit_server::api::Client for TestApiClient {
pub type Sid = String;
#[derive(Default)]
struct RoomState {
token: Option<String>,
connection: Option<ConnectionState>,
display_sources: Vec<MacOSDisplay>,
video_track_updates: (
async_broadcast::Sender<RemoteVideoTrackUpdate>,
async_broadcast::Receiver<RemoteVideoTrackUpdate>,
),
}
struct ConnectionState {
url: String,
token: String,
}
#[derive(Default)]
pub struct Room(Mutex<RoomState>);
impl Room {
pub fn new() -> Arc<Self> {
Default::default()
Arc::new(Self(Mutex::new(RoomState {
connection: None,
display_sources: Default::default(),
video_track_updates: async_broadcast::broadcast(128),
})))
}
pub fn connect(self: &Arc<Self>, url: &str, token: &str) -> impl Future<Output = Result<()>> {
@ -214,36 +259,75 @@ impl Room {
async move {
let server = TestServer::get(&url)?;
server.join_room(token.clone(), this.clone()).await?;
this.0.lock().token = Some(token);
this.0.lock().connection = Some(ConnectionState { url, token });
Ok(())
}
}
pub fn publish_video_track(
&self,
track: &LocalVideoTrack,
) -> impl Future<Output = Result<LocalTrackPublication>> {
future::pending()
pub fn display_sources(self: &Arc<Self>) -> impl Future<Output = Result<Vec<MacOSDisplay>>> {
let this = self.clone();
async move {
let server = this.test_server();
server.background.simulate_random_delay().await;
Ok(this.0.lock().display_sources.clone())
}
}
pub fn unpublish_track(&self, publication: LocalTrackPublication) {}
pub fn publish_video_track(
self: &Arc<Self>,
track: &LocalVideoTrack,
) -> impl Future<Output = Result<LocalTrackPublication>> {
let this = self.clone();
let track = track.clone();
async move {
this.test_server()
.publish_video_track(this.token(), track)
.await?;
Ok(LocalTrackPublication)
}
}
pub fn remote_video_tracks(&self, participant_id: &str) -> Vec<Arc<RemoteVideoTrack>> {
pub fn unpublish_track(&self, _: LocalTrackPublication) {}
pub fn remote_video_tracks(&self, _: &str) -> Vec<Arc<RemoteVideoTrack>> {
Default::default()
}
pub fn remote_video_track_updates(&self) -> mpsc::UnboundedReceiver<RemoteVideoTrackUpdate> {
mpsc::unbounded().1
pub fn remote_video_track_updates(&self) -> impl Stream<Item = RemoteVideoTrackUpdate> {
self.0.lock().video_track_updates.1.clone()
}
pub fn set_display_sources(&self, sources: Vec<MacOSDisplay>) {
self.0.lock().display_sources = sources;
}
fn test_server(&self) -> Arc<TestServer> {
let this = self.0.lock();
let connection = this
.connection
.as_ref()
.expect("must be connected to call this method");
TestServer::get(&connection.url).unwrap()
}
fn token(&self) -> String {
self.0
.lock()
.connection
.as_ref()
.expect("must be connected to call this method")
.token
.clone()
}
}
impl Drop for Room {
fn drop(&mut self) {
if let Some(token) = self.0.lock().token.take() {
if let Ok(server) = TestServer::get(&token) {
if let Some(connection) = self.0.lock().connection.take() {
if let Ok(server) = TestServer::get(&connection.token) {
let background = server.background.clone();
background
.spawn(async move { server.leave_room(token).await.unwrap() })
.spawn(async move { server.leave_room(connection.token).await.unwrap() })
.detach();
}
}
@ -252,17 +336,24 @@ impl Drop for Room {
pub struct LocalTrackPublication;
pub struct LocalVideoTrack;
#[derive(Clone)]
pub struct LocalVideoTrack {
frames_rx: async_broadcast::Receiver<Frame>,
}
impl LocalVideoTrack {
pub fn screen_share_for_display(display: &MacOSDisplay) -> Self {
Self
Self {
frames_rx: display.frames.1.clone(),
}
}
}
pub struct RemoteVideoTrack {
sid: Sid,
publisher_id: Sid,
frames_rx: async_broadcast::Receiver<Frame>,
background: Arc<executor::Background>,
}
impl RemoteVideoTrack {
@ -274,20 +365,64 @@ impl RemoteVideoTrack {
&self.publisher_id
}
pub fn add_renderer<F>(&self, callback: F)
pub fn add_renderer<F>(&self, mut callback: F)
where
F: 'static + FnMut(CVImageBuffer),
F: 'static + Send + Sync + FnMut(Frame),
{
let mut frames_rx = self.frames_rx.clone();
self.background
.spawn(async move {
while let Some(frame) = frames_rx.next().await {
callback(frame)
}
})
.detach();
}
}
#[derive(Clone)]
pub enum RemoteVideoTrackUpdate {
Subscribed(Arc<RemoteVideoTrack>),
Unsubscribed { publisher_id: Sid, track_id: Sid },
}
pub struct MacOSDisplay;
pub fn display_sources() -> impl Future<Output = Result<Vec<MacOSDisplay>>> {
future::pending()
#[derive(Clone)]
pub struct MacOSDisplay {
frames: (
async_broadcast::Sender<Frame>,
async_broadcast::Receiver<Frame>,
),
}
impl MacOSDisplay {
pub fn new() -> Self {
Self {
frames: async_broadcast::broadcast(128),
}
}
pub fn send_frame(&self, frame: Frame) {
self.frames.0.try_broadcast(frame).unwrap();
}
}
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct Frame {
pub label: String,
pub width: usize,
pub height: usize,
}
impl Frame {
pub fn width(&self) -> usize {
self.width
}
pub fn height(&self) -> usize {
self.height
}
pub fn image(&self) -> CVImageBuffer {
unimplemented!("you can't call this in test mode")
}
}
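
Outside of the collab test, the fake pieces compose the same way as the real client. A minimal sketch under stated assumptions: `room_a` and `room_b` are two fake `Room`s already connected to the same in-process `TestServer` under different identities, and `futures::StreamExt` is in scope for `.next()`:

    let display = MacOSDisplay::new();

    // Subscribe before publishing so the broadcast of the Subscribed update is not missed.
    let mut updates = room_b.remote_video_track_updates();

    let track = LocalVideoTrack::screen_share_for_display(&display);
    room_a.publish_video_track(&track).await.unwrap();

    if let Some(RemoteVideoTrackUpdate::Subscribed(remote_track)) = updates.next().await {
        // The remote track clones the display's broadcast receiver, so every frame
        // pushed below reaches this renderer.
        remote_track.add_renderer(|frame| {
            println!("{}: {}x{}", frame.label, frame.width(), frame.height())
        });
    }

    display.send_frame(Frame { width: 800, height: 600, label: "a".into() });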


@ -159,7 +159,7 @@ impl Member {
let origin = bounds.origin() + (bounds.size() / 2.) - size / 2.;
cx.scene.push_surface(gpui::mac::Surface {
bounds: RectF::new(origin, size),
image_buffer: frame,
image_buffer: frame.image(),
});
}
})