diff --git a/crates/channel/src/channel_store.rs b/crates/channel/src/channel_store.rs index 3d2f61d61f..a4c8da6df4 100644 --- a/crates/channel/src/channel_store.rs +++ b/crates/channel/src/channel_store.rs @@ -3,7 +3,7 @@ use anyhow::{anyhow, Result}; use client::{Client, Subscription, User, UserId, UserStore}; use collections::{hash_map, HashMap, HashSet}; use futures::{channel::mpsc, future::Shared, Future, FutureExt, StreamExt}; -use gpui::{AsyncAppContext, Entity, ModelContext, ModelHandle, Task, WeakModelHandle}; +use gpui::{AppContext, AsyncAppContext, Entity, ModelContext, ModelHandle, Task, WeakModelHandle}; use rpc::{proto, TypedEnvelope}; use std::{mem, sync::Arc, time::Duration}; use util::ResultExt; @@ -152,6 +152,15 @@ impl ChannelStore { self.channels_by_id.get(&channel_id) } + pub fn has_open_channel_buffer(&self, channel_id: ChannelId, cx: &AppContext) -> bool { + if let Some(buffer) = self.opened_buffers.get(&channel_id) { + if let OpenedChannelBuffer::Open(buffer) = buffer { + return buffer.upgrade(cx).is_some(); + } + } + false + } + pub fn open_channel_buffer( &mut self, channel_id: ChannelId, diff --git a/crates/collab/src/db/queries/channels.rs b/crates/collab/src/db/queries/channels.rs index e3d3643a61..5da4dd1464 100644 --- a/crates/collab/src/db/queries/channels.rs +++ b/crates/collab/src/db/queries/channels.rs @@ -1,6 +1,20 @@ use super::*; impl Database { + #[cfg(test)] + pub async fn all_channels(&self) -> Result> { + self.transaction(move |tx| async move { + let mut channels = Vec::new(); + let mut rows = channel::Entity::find().stream(&*tx).await?; + while let Some(row) = rows.next().await { + let row = row?; + channels.push((row.id, row.name)); + } + Ok(channels) + }) + .await + } + pub async fn create_root_channel( &self, name: &str, diff --git a/crates/collab/src/tests/random_channel_buffer_tests.rs b/crates/collab/src/tests/random_channel_buffer_tests.rs index 929e567977..933683eaa6 100644 --- a/crates/collab/src/tests/random_channel_buffer_tests.rs +++ b/crates/collab/src/tests/random_channel_buffer_tests.rs @@ -1,12 +1,16 @@ -use crate::tests::{run_randomized_test, RandomizedTest, TestClient, TestError, UserTestPlan}; +use super::{run_randomized_test, RandomizedTest, TestClient, TestError, TestServer, UserTestPlan}; use anyhow::Result; use async_trait::async_trait; use gpui::{executor::Deterministic, TestAppContext}; -use rand::rngs::StdRng; +use rand::prelude::*; use serde_derive::{Deserialize, Serialize}; -use std::{rc::Rc, sync::Arc}; +use std::{ops::Range, rc::Rc, sync::Arc}; +use text::Bias; -#[gpui::test] +#[gpui::test( + iterations = 100, + on_failure = "crate::tests::save_randomized_test_plan" +)] async fn test_random_channel_buffers( cx: &mut TestAppContext, deterministic: Arc, @@ -19,20 +23,105 @@ struct RandomChannelBufferTest; #[derive(Clone, Serialize, Deserialize)] enum ChannelBufferOperation { - Join, + JoinChannelNotes { + channel_name: String, + }, + LeaveChannelNotes { + channel_name: String, + }, + EditChannelNotes { + channel_name: String, + edits: Vec<(Range, Arc)>, + }, + Noop, } +const CHANNEL_COUNT: usize = 3; + #[async_trait(?Send)] impl RandomizedTest for RandomChannelBufferTest { type Operation = ChannelBufferOperation; + async fn initialize(server: &mut TestServer, users: &[UserTestPlan]) { + let db = &server.app_state.db; + for ix in 0..CHANNEL_COUNT { + let id = db + .create_channel( + &format!("channel-{ix}"), + None, + &format!("livekit-room-{ix}"), + users[0].user_id, + ) + .await + .unwrap(); + for user in &users[1..] { + db.invite_channel_member(id, user.user_id, users[0].user_id, false) + .await + .unwrap(); + db.respond_to_channel_invite(id, user.user_id, true) + .await + .unwrap(); + } + } + } + fn generate_operation( client: &TestClient, rng: &mut StdRng, - plan: &mut UserTestPlan, + _: &mut UserTestPlan, cx: &TestAppContext, ) -> ChannelBufferOperation { - ChannelBufferOperation::Join + let channel_store = client.channel_store().clone(); + let channel_buffers = client.channel_buffers(); + + // When signed out, we can't do anything unless a channel buffer is + // already open. + if channel_buffers.is_empty() + && channel_store.read_with(cx, |store, _| store.channel_count() == 0) + { + return ChannelBufferOperation::Noop; + } + + loop { + match rng.gen_range(0..100_u32) { + 0..=29 => { + let channel_name = client.channel_store().read_with(cx, |store, cx| { + store.channels().find_map(|(_, channel)| { + if store.has_open_channel_buffer(channel.id, cx) { + None + } else { + Some(channel.name.clone()) + } + }) + }); + if let Some(channel_name) = channel_name { + break ChannelBufferOperation::JoinChannelNotes { channel_name }; + } + } + + 30..=40 => { + if let Some(buffer) = channel_buffers.iter().choose(rng) { + let channel_name = buffer.read_with(cx, |b, _| b.channel().name.clone()); + break ChannelBufferOperation::LeaveChannelNotes { channel_name }; + } + } + + _ => { + if let Some(buffer) = channel_buffers.iter().choose(rng) { + break buffer.read_with(cx, |b, _| { + let channel_name = b.channel().name.clone(); + let edits = b + .buffer() + .read_with(cx, |buffer, _| buffer.get_random_edits(rng, 3)); + ChannelBufferOperation::EditChannelNotes { + channel_name, + edits, + } + }); + } + } + } + } } async fn apply_operation( @@ -40,10 +129,140 @@ impl RandomizedTest for RandomChannelBufferTest { operation: ChannelBufferOperation, cx: &mut TestAppContext, ) -> Result<(), TestError> { + match operation { + ChannelBufferOperation::JoinChannelNotes { channel_name } => { + let buffer = client.channel_store().update(cx, |store, cx| { + let channel_id = store + .channels() + .find(|(_, c)| c.name == channel_name) + .unwrap() + .1 + .id; + if store.has_open_channel_buffer(channel_id, cx) { + Err(TestError::Inapplicable) + } else { + Ok(store.open_channel_buffer(channel_id, cx)) + } + })?; + + log::info!( + "{}: opening notes for channel {channel_name}", + client.username + ); + client.channel_buffers().insert(buffer.await?); + } + + ChannelBufferOperation::LeaveChannelNotes { channel_name } => { + let buffer = cx.update(|cx| { + let mut left_buffer = Err(TestError::Inapplicable); + client.channel_buffers().retain(|buffer| { + if buffer.read(cx).channel().name == channel_name { + left_buffer = Ok(buffer.clone()); + false + } else { + true + } + }); + left_buffer + })?; + + log::info!( + "{}: closing notes for channel {channel_name}", + client.username + ); + cx.update(|_| drop(buffer)); + } + + ChannelBufferOperation::EditChannelNotes { + channel_name, + edits, + } => { + let channel_buffer = cx + .read(|cx| { + client + .channel_buffers() + .iter() + .find(|buffer| buffer.read(cx).channel().name == channel_name) + .cloned() + }) + .ok_or_else(|| TestError::Inapplicable)?; + + log::info!( + "{}: editing notes for channel {channel_name} with {:?}", + client.username, + edits + ); + + channel_buffer.update(cx, |buffer, cx| { + let buffer = buffer.buffer(); + buffer.update(cx, |buffer, cx| { + let snapshot = buffer.snapshot(); + buffer.edit( + edits.into_iter().map(|(range, text)| { + let start = snapshot.clip_offset(range.start, Bias::Left); + let end = snapshot.clip_offset(range.end, Bias::Right); + (start..end, text) + }), + None, + cx, + ); + }); + }); + } + + ChannelBufferOperation::Noop => Err(TestError::Inapplicable)?, + } Ok(()) } - async fn on_client_added(client: &Rc) {} + async fn on_client_added(client: &Rc, cx: &mut TestAppContext) { + let channel_store = client.channel_store(); + while channel_store.read_with(cx, |store, _| store.channel_count() == 0) { + channel_store.next_notification(cx).await; + } + } - fn on_clients_quiesced(clients: &[(Rc, TestAppContext)]) {} + async fn on_quiesce(server: &mut TestServer, clients: &mut [(Rc, TestAppContext)]) { + let channels = server.app_state.db.all_channels().await.unwrap(); + + for (channel_id, channel_name) in channels { + let mut collaborator_user_ids = server + .app_state + .db + .get_channel_buffer_collaborators(channel_id) + .await + .unwrap() + .into_iter() + .map(|id| id.to_proto()) + .collect::>(); + collaborator_user_ids.sort(); + + for (client, client_cx) in clients.iter_mut() { + client_cx.update(|cx| { + client + .channel_buffers() + .retain(|b| b.read(cx).is_connected()); + + if let Some(channel_buffer) = client + .channel_buffers() + .iter() + .find(|b| b.read(cx).channel().id == channel_id.to_proto()) + { + let channel_buffer = channel_buffer.read(cx); + let collaborators = channel_buffer.collaborators(); + let mut user_ids = + collaborators.iter().map(|c| c.user_id).collect::>(); + user_ids.sort(); + assert_eq!( + user_ids, + collaborator_user_ids, + "client {} has different user ids for channel {} than the server", + client.user_id().unwrap(), + channel_name + ); + } + }); + } + } + } } diff --git a/crates/collab/src/tests/random_project_collaboration_tests.rs b/crates/collab/src/tests/random_project_collaboration_tests.rs index 242cfbc162..7570768249 100644 --- a/crates/collab/src/tests/random_project_collaboration_tests.rs +++ b/crates/collab/src/tests/random_project_collaboration_tests.rs @@ -1,7 +1,5 @@ -use crate::{ - db::UserId, - tests::{run_randomized_test, RandomizedTest, TestClient, TestError, UserTestPlan}, -}; +use super::{run_randomized_test, RandomizedTest, TestClient, TestError, TestServer, UserTestPlan}; +use crate::db::UserId; use anyhow::{anyhow, Result}; use async_trait::async_trait; use call::ActiveCall; @@ -145,6 +143,20 @@ struct ProjectCollaborationTest; impl RandomizedTest for ProjectCollaborationTest { type Operation = ClientOperation; + async fn initialize(server: &mut TestServer, users: &[UserTestPlan]) { + let db = &server.app_state.db; + for (ix, user_a) in users.iter().enumerate() { + for user_b in &users[ix + 1..] { + db.send_contact_request(user_a.user_id, user_b.user_id) + .await + .unwrap(); + db.respond_to_contact_request(user_b.user_id, user_a.user_id, true) + .await + .unwrap(); + } + } + } + fn generate_operation( client: &TestClient, rng: &mut StdRng, @@ -1005,7 +1017,7 @@ impl RandomizedTest for ProjectCollaborationTest { Ok(()) } - async fn on_client_added(client: &Rc) { + async fn on_client_added(client: &Rc, _: &mut TestAppContext) { let mut language = Language::new( LanguageConfig { name: "Rust".into(), @@ -1119,8 +1131,8 @@ impl RandomizedTest for ProjectCollaborationTest { client.app_state.languages.add(Arc::new(language)); } - fn on_clients_quiesced(clients: &[(Rc, TestAppContext)]) { - for (client, client_cx) in clients { + async fn on_quiesce(_: &mut TestServer, clients: &mut [(Rc, TestAppContext)]) { + for (client, client_cx) in clients.iter() { for guest_project in client.remote_projects().iter() { guest_project.read_with(client_cx, |guest_project, cx| { let host_project = clients.iter().find_map(|(client, cx)| { diff --git a/crates/collab/src/tests/randomized_test_helpers.rs b/crates/collab/src/tests/randomized_test_helpers.rs index dc102b75c6..39598bdaf9 100644 --- a/crates/collab/src/tests/randomized_test_helpers.rs +++ b/crates/collab/src/tests/randomized_test_helpers.rs @@ -1,5 +1,5 @@ use crate::{ - db::{self, Database, NewUserParams, UserId}, + db::{self, NewUserParams, UserId}, rpc::{CLEANUP_TIMEOUT, RECONNECT_TIMEOUT}, tests::{TestClient, TestServer}, }; @@ -107,15 +107,17 @@ pub trait RandomizedTest: 'static + Sized { cx: &TestAppContext, ) -> Self::Operation; - async fn on_client_added(client: &Rc); - - fn on_clients_quiesced(client: &[(Rc, TestAppContext)]); - async fn apply_operation( client: &TestClient, operation: Self::Operation, cx: &mut TestAppContext, ) -> Result<(), TestError>; + + async fn initialize(server: &mut TestServer, users: &[UserTestPlan]); + + async fn on_client_added(client: &Rc, cx: &mut TestAppContext); + + async fn on_quiesce(server: &mut TestServer, client: &mut [(Rc, TestAppContext)]); } pub async fn run_randomized_test( @@ -125,7 +127,7 @@ pub async fn run_randomized_test( ) { deterministic.forbid_parking(); let mut server = TestServer::start(&deterministic).await; - let plan = TestPlan::::new(server.app_state.db.clone(), rng).await; + let plan = TestPlan::::new(&mut server, rng).await; LAST_PLAN.lock().replace({ let plan = plan.clone(); @@ -162,7 +164,7 @@ pub async fn run_randomized_test( deterministic.finish_waiting(); deterministic.run_until_parked(); - T::on_clients_quiesced(&clients); + T::on_quiesce(&mut server, &mut clients).await; for (client, mut cx) in clients { cx.update(|cx| { @@ -190,7 +192,7 @@ pub fn save_randomized_test_plan() { } impl TestPlan { - pub async fn new(db: Arc, mut rng: StdRng) -> Arc> { + pub async fn new(server: &mut TestServer, mut rng: StdRng) -> Arc> { let allow_server_restarts = rng.gen_bool(0.7); let allow_client_reconnection = rng.gen_bool(0.7); let allow_client_disconnection = rng.gen_bool(0.1); @@ -198,7 +200,9 @@ impl TestPlan { let mut users = Vec::new(); for ix in 0..*MAX_PEERS { let username = format!("user-{}", ix + 1); - let user_id = db + let user_id = server + .app_state + .db .create_user( &format!("{username}@example.com"), false, @@ -222,16 +226,7 @@ impl TestPlan { }); } - for (ix, user_a) in users.iter().enumerate() { - for user_b in &users[ix + 1..] { - db.send_contact_request(user_a.user_id, user_b.user_id) - .await - .unwrap(); - db.respond_to_contact_request(user_b.user_id, user_a.user_id, true) - .await - .unwrap(); - } - } + T::initialize(server, &users).await; let plan = Arc::new(Mutex::new(Self { replay: false, @@ -619,7 +614,7 @@ impl TestPlan { if quiesce && applied { deterministic.run_until_parked(); - T::on_clients_quiesced(&clients); + T::on_quiesce(server, clients).await; } return applied; @@ -634,7 +629,7 @@ impl TestPlan { mut operation_rx: futures::channel::mpsc::UnboundedReceiver, mut cx: TestAppContext, ) { - T::on_client_added(&client).await; + T::on_client_added(&client, &mut cx).await; while let Some(batch_id) = operation_rx.next().await { let Some((operation, applied)) = diff --git a/crates/collab/src/tests/test_server.rs b/crates/collab/src/tests/test_server.rs index 44f6ac1450..eef1dde967 100644 --- a/crates/collab/src/tests/test_server.rs +++ b/crates/collab/src/tests/test_server.rs @@ -6,7 +6,7 @@ use crate::{ }; use anyhow::anyhow; use call::ActiveCall; -use channel::ChannelStore; +use channel::{channel_buffer::ChannelBuffer, ChannelStore}; use client::{ self, proto::PeerId, Client, Connection, Credentials, EstablishConnectionError, UserStore, }; @@ -51,6 +51,7 @@ struct TestClientState { local_projects: Vec>, remote_projects: Vec>, buffers: HashMap, HashSet>>, + channel_buffers: HashSet>, } pub struct ContactsSummary { @@ -468,6 +469,12 @@ impl TestClient { RefMut::map(self.state.borrow_mut(), |state| &mut state.buffers) } + pub fn channel_buffers<'a>( + &'a self, + ) -> impl DerefMut>> + 'a { + RefMut::map(self.state.borrow_mut(), |state| &mut state.channel_buffers) + } + pub fn summarize_contacts(&self, cx: &TestAppContext) -> ContactsSummary { self.app_state .user_store