diff --git a/crates/call/src/call.rs b/crates/call/src/call.rs index 1e3a381b40..3cd868a438 100644 --- a/crates/call/src/call.rs +++ b/crates/call/src/call.rs @@ -209,80 +209,6 @@ impl ActiveCall { }) } - pub fn join_channel( - &mut self, - channel_id: u64, - cx: &mut ModelContext, - ) -> Task> { - let room = if let Some(room) = self.room().cloned() { - Some(Task::ready(Ok(room)).shared()) - } else { - self.pending_room_creation.clone() - }; - - todo!() - // let invite = if let Some(room) = room { - // cx.spawn_weak(|_, mut cx| async move { - // let room = room.await.map_err(|err| anyhow!("{:?}", err))?; - - // // TODO join_channel: - // // let initial_project_id = if let Some(initial_project) = initial_project { - // // Some( - // // room.update(&mut cx, |room, cx| room.share_project(initial_project, cx)) - // // .await?, - // // ) - // // } else { - // // None - // // }; - - // // room.update(&mut cx, |room, cx| { - // // room.call(called_user_id, initial_project_id, cx) - // // }) - // // .await?; - - // anyhow::Ok(()) - // }) - // } else { - // let client = self.client.clone(); - // let user_store = self.user_store.clone(); - // let room = cx - // .spawn(|this, mut cx| async move { - // let create_room = async { - // let room = cx - // .update(|cx| { - // Room::create_from_channel(channel_id, client, user_store, cx) - // }) - // .await?; - - // this.update(&mut cx, |this, cx| this.set_room(Some(room.clone()), cx)) - // .await?; - - // anyhow::Ok(room) - // }; - - // let room = create_room.await; - // this.update(&mut cx, |this, _| this.pending_room_creation = None); - // room.map_err(Arc::new) - // }) - // .shared(); - // self.pending_room_creation = Some(room.clone()); - // cx.foreground().spawn(async move { - // room.await.map_err(|err| anyhow!("{:?}", err))?; - // anyhow::Ok(()) - // }) - // }; - - // cx.spawn(|this, mut cx| async move { - // let result = invite.await; - // this.update(&mut cx, |this, cx| { - // this.pending_invites.remove(&called_user_id); - // this.report_call_event("invite", cx); - // cx.notify(); - // }); - // result - // }) - } - pub fn cancel_invite( &mut self, called_user_id: u64, @@ -348,6 +274,30 @@ impl ActiveCall { Ok(()) } + pub fn join_channel( + &mut self, + channel_id: u64, + cx: &mut ModelContext, + ) -> Task> { + if let Some(room) = self.room().cloned() { + if room.read(cx).channel_id() == Some(channel_id) { + return Task::ready(Ok(())); + } + } + + let join = Room::join_channel(channel_id, self.client.clone(), self.user_store.clone(), cx); + + cx.spawn(|this, mut cx| async move { + let room = join.await?; + this.update(&mut cx, |this, cx| this.set_room(Some(room.clone()), cx)) + .await?; + this.update(&mut cx, |this, cx| { + this.report_call_event("join channel", cx) + }); + Ok(()) + }) + } + pub fn hang_up(&mut self, cx: &mut ModelContext) -> Task> { cx.notify(); self.report_call_event("hang up", cx); diff --git a/crates/call/src/room.rs b/crates/call/src/room.rs index e77b5437b5..683ff6f4df 100644 --- a/crates/call/src/room.rs +++ b/crates/call/src/room.rs @@ -49,6 +49,7 @@ pub enum Event { pub struct Room { id: u64, + channel_id: Option, live_kit: Option, status: RoomStatus, shared_projects: HashSet>, @@ -93,8 +94,25 @@ impl Entity for Room { } impl Room { + pub fn channel_id(&self) -> Option { + self.channel_id + } + + #[cfg(any(test, feature = "test-support"))] + pub fn is_connected(&self) -> bool { + if let Some(live_kit) = self.live_kit.as_ref() { + matches!( + *live_kit.room.status().borrow(), + live_kit_client::ConnectionState::Connected { .. } + ) + } else { + false + } + } + fn new( id: u64, + channel_id: Option, live_kit_connection_info: Option, client: Arc, user_store: ModelHandle, @@ -185,6 +203,7 @@ impl Room { Self { id, + channel_id, live_kit: live_kit_room, status: RoomStatus::Online, shared_projects: Default::default(), @@ -204,15 +223,6 @@ impl Room { } } - pub(crate) fn create_from_channel( - channel_id: u64, - client: Arc, - user_store: ModelHandle, - cx: &mut AppContext, - ) -> Task>> { - todo!() - } - pub(crate) fn create( called_user_id: u64, initial_project: Option>, @@ -226,6 +236,7 @@ impl Room { let room = cx.add_model(|cx| { Self::new( room_proto.id, + None, response.live_kit_connection_info, client, user_store, @@ -257,6 +268,35 @@ impl Room { }) } + pub(crate) fn join_channel( + channel_id: u64, + client: Arc, + user_store: ModelHandle, + cx: &mut AppContext, + ) -> Task>> { + cx.spawn(|mut cx| async move { + let response = client.request(proto::JoinChannel { channel_id }).await?; + let room_proto = response.room.ok_or_else(|| anyhow!("invalid room"))?; + let room = cx.add_model(|cx| { + Self::new( + room_proto.id, + Some(channel_id), + response.live_kit_connection_info, + client, + user_store, + cx, + ) + }); + + room.update(&mut cx, |room, cx| { + room.apply_room_update(room_proto, cx)?; + anyhow::Ok(()) + })?; + + Ok(room) + }) + } + pub(crate) fn join( call: &IncomingCall, client: Arc, @@ -270,6 +310,7 @@ impl Room { let room = cx.add_model(|cx| { Self::new( room_id, + None, response.live_kit_connection_info, client, user_store, diff --git a/crates/collab/src/db.rs b/crates/collab/src/db.rs index 5f106023f1..f87b68c1ec 100644 --- a/crates/collab/src/db.rs +++ b/crates/collab/src/db.rs @@ -1833,14 +1833,21 @@ impl Database { .await?; let room = self.get_room(room_id, &tx).await?; - if room.participants.is_empty() { - room::Entity::delete_by_id(room_id).exec(&*tx).await?; - } + let deleted = if room.participants.is_empty() { + let result = room::Entity::delete_by_id(room_id) + .filter(room::Column::ChannelId.is_null()) + .exec(&*tx) + .await?; + result.rows_affected > 0 + } else { + false + }; let left_room = LeftRoom { room, left_projects, canceled_calls_to_user_ids, + deleted, }; if left_room.room.participants.is_empty() { @@ -3065,14 +3072,21 @@ impl Database { // channels - pub async fn create_root_channel(&self, name: &str, creator_id: UserId) -> Result { - self.create_channel(name, None, creator_id).await + pub async fn create_root_channel( + &self, + name: &str, + live_kit_room: &str, + creator_id: UserId, + ) -> Result { + self.create_channel(name, None, live_kit_room, creator_id) + .await } pub async fn create_channel( &self, name: &str, parent: Option, + live_kit_room: &str, creator_id: UserId, ) -> Result { self.transaction(move |tx| async move { @@ -3106,7 +3120,7 @@ impl Database { room::ActiveModel { channel_id: ActiveValue::Set(Some(channel.id)), - live_kit_room: ActiveValue::Set(format!("channel-{}", channel.id)), + live_kit_room: ActiveValue::Set(live_kit_room.to_string()), ..Default::default() } .insert(&*tx) @@ -3731,6 +3745,7 @@ pub struct LeftRoom { pub room: proto::Room, pub left_projects: HashMap, pub canceled_calls_to_user_ids: Vec, + pub deleted: bool, } pub struct RefreshedRoom { diff --git a/crates/collab/src/db/tests.rs b/crates/collab/src/db/tests.rs index 7ef2b39640..719e8693d4 100644 --- a/crates/collab/src/db/tests.rs +++ b/crates/collab/src/db/tests.rs @@ -899,19 +899,22 @@ test_both_dbs!(test_channels_postgres, test_channels_sqlite, db, { .unwrap() .user_id; - let zed_id = db.create_root_channel("zed", a_id).await.unwrap(); - let crdb_id = db.create_channel("crdb", Some(zed_id), a_id).await.unwrap(); + let zed_id = db.create_root_channel("zed", "1", a_id).await.unwrap(); + let crdb_id = db + .create_channel("crdb", Some(zed_id), "2", a_id) + .await + .unwrap(); let livestreaming_id = db - .create_channel("livestreaming", Some(zed_id), a_id) + .create_channel("livestreaming", Some(zed_id), "3", a_id) .await .unwrap(); let replace_id = db - .create_channel("replace", Some(zed_id), a_id) + .create_channel("replace", Some(zed_id), "4", a_id) .await .unwrap(); - let rust_id = db.create_root_channel("rust", a_id).await.unwrap(); + let rust_id = db.create_root_channel("rust", "5", a_id).await.unwrap(); let cargo_id = db - .create_channel("cargo", Some(rust_id), a_id) + .create_channel("cargo", Some(rust_id), "6", a_id) .await .unwrap(); @@ -988,7 +991,10 @@ test_both_dbs!( .unwrap() .user_id; - let channel_1 = db.create_root_channel("channel_1", user_1).await.unwrap(); + let channel_1 = db + .create_root_channel("channel_1", "1", user_1) + .await + .unwrap(); let room_1 = db.get_channel_room(channel_1).await.unwrap(); // can join a room with membership to its channel diff --git a/crates/collab/src/rpc.rs b/crates/collab/src/rpc.rs index 8cf0b7e48c..0abf2c44a7 100644 --- a/crates/collab/src/rpc.rs +++ b/crates/collab/src/rpc.rs @@ -186,7 +186,7 @@ impl Server { server .add_request_handler(ping) - .add_request_handler(create_room_request) + .add_request_handler(create_room) .add_request_handler(join_room) .add_request_handler(rejoin_room) .add_request_handler(leave_room) @@ -859,12 +859,42 @@ async fn ping(_: proto::Ping, response: Response, _session: Session Ok(()) } -async fn create_room_request( +async fn create_room( _request: proto::CreateRoom, response: Response, session: Session, ) -> Result<()> { - let (room, live_kit_connection_info) = create_room(&session).await?; + let live_kit_room = nanoid::nanoid!(30); + + let live_kit_connection_info = { + let live_kit_room = live_kit_room.clone(); + let live_kit = session.live_kit_client.as_ref(); + + util::async_iife!({ + let live_kit = live_kit?; + + live_kit + .create_room(live_kit_room.clone()) + .await + .trace_err()?; + + let token = live_kit + .room_token(&live_kit_room, &session.user_id.to_string()) + .trace_err()?; + + Some(proto::LiveKitConnectionInfo { + server_url: live_kit.url().into(), + token, + }) + }) + } + .await; + + let room = session + .db() + .await + .create_room(session.user_id, session.connection_id, &live_kit_room) + .await?; response.send(proto::CreateRoomResponse { room: Some(room.clone()), @@ -1259,11 +1289,12 @@ async fn update_participant_location( let location = request .location .ok_or_else(|| anyhow!("invalid location"))?; - let room = session - .db() - .await + + let db = session.db().await; + let room = db .update_room_participant_location(room_id, session.connection_id, location) .await?; + room_updated(&room, &session.peer); response.send(proto::Ack {})?; Ok(()) @@ -2067,10 +2098,17 @@ async fn create_channel( session: Session, ) -> Result<()> { let db = session.db().await; + let live_kit_room = format!("channel-{}", nanoid::nanoid!(30)); + + if let Some(live_kit) = session.live_kit_client.as_ref() { + live_kit.create_room(live_kit_room.clone()).await?; + } + let id = db .create_channel( &request.name, request.parent_id.map(|id| ChannelId::from_proto(id)), + &live_kit_room, session.user_id, ) .await?; @@ -2160,21 +2198,39 @@ async fn join_channel( response: Response, session: Session, ) -> Result<()> { - let db = session.db().await; let channel_id = ChannelId::from_proto(request.channel_id); - todo!(); - // db.check_channel_membership(session.user_id, channel_id) - // .await?; + { + let db = session.db().await; + let room_id = db.get_channel_room(channel_id).await?; - let (room, live_kit_connection_info) = create_room(&session).await?; + let room = db + .join_room( + room_id, + session.user_id, + Some(channel_id), + session.connection_id, + ) + .await?; - // db.set_channel_room(channel_id, room.id).await?; + let live_kit_connection_info = session.live_kit_client.as_ref().and_then(|live_kit| { + let token = live_kit + .room_token(&room.live_kit_room, &session.user_id.to_string()) + .trace_err()?; - response.send(proto::CreateRoomResponse { - room: Some(room.clone()), - live_kit_connection_info, - })?; + Some(LiveKitConnectionInfo { + server_url: live_kit.url().into(), + token, + }) + }); + + response.send(proto::JoinRoomResponse { + room: Some(room.clone()), + live_kit_connection_info, + })?; + + room_updated(&room, &session.peer); + } update_user_contacts(session.user_id, &session).await?; @@ -2367,7 +2423,7 @@ async fn leave_room_for_session(session: &Session) -> Result<()> { room_id = RoomId::from_proto(left_room.room.id); canceled_calls_to_user_ids = mem::take(&mut left_room.canceled_calls_to_user_ids); live_kit_room = mem::take(&mut left_room.room.live_kit_room); - delete_live_kit_room = left_room.room.participants.is_empty(); + delete_live_kit_room = left_room.deleted; } else { return Ok(()); } @@ -2435,42 +2491,6 @@ fn project_left(project: &db::LeftProject, session: &Session) { } } -async fn create_room(session: &Session) -> Result<(proto::Room, Option)> { - let live_kit_room = nanoid::nanoid!(30); - - let live_kit_connection_info = { - let live_kit_room = live_kit_room.clone(); - let live_kit = session.live_kit_client.as_ref(); - - util::async_iife!({ - let live_kit = live_kit?; - - live_kit - .create_room(live_kit_room.clone()) - .await - .trace_err()?; - - let token = live_kit - .room_token(&live_kit_room, &session.user_id.to_string()) - .trace_err()?; - - Some(proto::LiveKitConnectionInfo { - server_url: live_kit.url().into(), - token, - }) - }) - } - .await; - - let room = session - .db() - .await - .create_room(session.user_id, session.connection_id, &live_kit_room) - .await?; - - Ok((room, live_kit_connection_info)) -} - pub trait ResultExt { type Ok; diff --git a/crates/collab/src/tests/channel_tests.rs b/crates/collab/src/tests/channel_tests.rs index c86238825c..632bfdca49 100644 --- a/crates/collab/src/tests/channel_tests.rs +++ b/crates/collab/src/tests/channel_tests.rs @@ -108,17 +108,52 @@ async fn test_channel_room( .await .unwrap(); + active_call_b + .update(cx_b, |active_call, cx| active_call.join_channel(zed_id, cx)) + .await + .unwrap(); + deterministic.run_until_parked(); let room_a = active_call_a.read_with(cx_a, |call, _| call.room().unwrap().clone()); + room_a.read_with(cx_a, |room, _| assert!(room.is_connected())); assert_eq!( room_participants(&room_a, cx_a), + RoomParticipants { + remote: vec!["user_b".to_string()], + pending: vec![] + } + ); + + let room_b = active_call_b.read_with(cx_b, |call, _| call.room().unwrap().clone()); + room_b.read_with(cx_b, |room, _| assert!(room.is_connected())); + assert_eq!( + room_participants(&room_b, cx_b), RoomParticipants { remote: vec!["user_a".to_string()], pending: vec![] } ); + // Make sure that leaving and rejoining works + + active_call_a + .update(cx_a, |active_call, cx| active_call.hang_up(cx)) + .await + .unwrap(); + + active_call_b + .update(cx_b, |active_call, cx| active_call.hang_up(cx)) + .await + .unwrap(); + + // Make sure room exists? + + active_call_a + .update(cx_a, |active_call, cx| active_call.join_channel(zed_id, cx)) + .await + .unwrap(); + active_call_b .update(cx_b, |active_call, cx| active_call.join_channel(zed_id, cx)) .await @@ -126,42 +161,23 @@ async fn test_channel_room( deterministic.run_until_parked(); - let active_call_b = cx_b.read(ActiveCall::global); + let room_a = active_call_a.read_with(cx_a, |call, _| call.room().unwrap().clone()); + room_a.read_with(cx_a, |room, _| assert!(room.is_connected())); + assert_eq!( + room_participants(&room_a, cx_a), + RoomParticipants { + remote: vec!["user_b".to_string()], + pending: vec![] + } + ); + let room_b = active_call_b.read_with(cx_b, |call, _| call.room().unwrap().clone()); + room_b.read_with(cx_b, |room, _| assert!(room.is_connected())); assert_eq!( room_participants(&room_b, cx_b), RoomParticipants { - remote: vec!["user_a".to_string(), "user_b".to_string()], + remote: vec!["user_a".to_string()], pending: vec![] } ); } - -// TODO: -// Invariants to test: -// 1. Dag structure is maintained for all operations (can't make a cycle) -// 2. Can't be a member of a super channel, and accept a membership of a sub channel (by definition, a noop) - -// #[gpui::test] -// async fn test_block_cycle_creation(deterministic: Arc, cx: &mut TestAppContext) { -// // deterministic.forbid_parking(); -// // let mut server = TestServer::start(&deterministic).await; -// // let client_a = server.create_client(cx, "user_a").await; -// // let a_id = crate::db::UserId(client_a.user_id().unwrap() as i32); -// // let db = server._test_db.db(); - -// // let zed_id = db.create_root_channel("zed", a_id).await.unwrap(); -// // let first_id = db.create_channel("first", Some(zed_id)).await.unwrap(); -// // let second_id = db -// // .create_channel("second_id", Some(first_id)) -// // .await -// // .unwrap(); -// } - -/* -Linear things: -- A way of expressing progress to the team -- A way for us to agree on a scope -- A way to figure out what we're supposed to be doing - -*/ diff --git a/crates/rpc/src/proto.rs b/crates/rpc/src/proto.rs index c3d65343d6..d71ddeed83 100644 --- a/crates/rpc/src/proto.rs +++ b/crates/rpc/src/proto.rs @@ -295,7 +295,7 @@ request_messages!( (RemoveContact, Ack), (RespondToContactRequest, Ack), (RespondToChannelInvite, Ack), - (JoinChannel, CreateRoomResponse), + (JoinChannel, JoinRoomResponse), (RenameProjectEntry, ProjectEntryResponse), (SaveBuffer, BufferSaved), (SearchProject, SearchProjectResponse),