diff --git a/crates/collab/src/lib.rs b/crates/collab/src/lib.rs index 27f49f5b1e..1a83193bdf 100644 --- a/crates/collab/src/lib.rs +++ b/crates/collab/src/lib.rs @@ -3,10 +3,11 @@ pub mod auth; pub mod db; pub mod env; pub mod executor; -#[cfg(test)] -mod integration_tests; pub mod rpc; +#[cfg(test)] +mod tests; + use axum::{http::StatusCode, response::IntoResponse}; use db::Database; use serde::Deserialize; diff --git a/crates/collab/src/tests.rs b/crates/collab/src/tests.rs new file mode 100644 index 0000000000..8dc29f3d60 --- /dev/null +++ b/crates/collab/src/tests.rs @@ -0,0 +1,466 @@ +use crate::{ + db::{NewUserParams, TestDb, UserId}, + executor::Executor, + rpc::{Server, CLEANUP_TIMEOUT}, + AppState, +}; +use anyhow::anyhow; +use call::ActiveCall; +use client::{ + self, proto::PeerId, test::FakeHttpClient, Client, Connection, Credentials, + EstablishConnectionError, UserStore, +}; +use collections::{HashMap, HashSet}; +use fs::{FakeFs, HomeDir}; +use futures::{channel::oneshot, StreamExt as _}; +use gpui::{ + executor::Deterministic, test::EmptyView, ModelHandle, Task, TestAppContext, ViewHandle, +}; +use language::LanguageRegistry; +use parking_lot::Mutex; +use project::{Project, WorktreeId}; +use settings::Settings; +use std::{ + env, + ops::Deref, + path::{Path, PathBuf}, + sync::{ + atomic::{AtomicBool, AtomicUsize, Ordering::SeqCst}, + Arc, + }, +}; +use theme::ThemeRegistry; +use workspace::Workspace; + +mod integration_tests; +mod randomized_integration_tests; + +struct TestServer { + app_state: Arc, + server: Arc, + connection_killers: Arc>>>, + forbid_connections: Arc, + _test_db: TestDb, + test_live_kit_server: Arc, +} + +impl TestServer { + async fn start(deterministic: &Arc) -> Self { + static NEXT_LIVE_KIT_SERVER_ID: AtomicUsize = AtomicUsize::new(0); + + let use_postgres = env::var("USE_POSTGRES").ok(); + let use_postgres = use_postgres.as_deref(); + let test_db = if use_postgres == Some("true") || use_postgres == Some("1") { + TestDb::postgres(deterministic.build_background()) + } else { + TestDb::sqlite(deterministic.build_background()) + }; + let live_kit_server_id = NEXT_LIVE_KIT_SERVER_ID.fetch_add(1, SeqCst); + let live_kit_server = live_kit_client::TestServer::create( + format!("http://livekit.{}.test", live_kit_server_id), + format!("devkey-{}", live_kit_server_id), + format!("secret-{}", live_kit_server_id), + deterministic.build_background(), + ) + .unwrap(); + let app_state = Self::build_app_state(&test_db, &live_kit_server).await; + let epoch = app_state + .db + .create_server(&app_state.config.zed_environment) + .await + .unwrap(); + let server = Server::new( + epoch, + app_state.clone(), + Executor::Deterministic(deterministic.build_background()), + ); + server.start().await.unwrap(); + // Advance clock to ensure the server's cleanup task is finished. + deterministic.advance_clock(CLEANUP_TIMEOUT); + Self { + app_state, + server, + connection_killers: Default::default(), + forbid_connections: Default::default(), + _test_db: test_db, + test_live_kit_server: live_kit_server, + } + } + + async fn reset(&self) { + self.app_state.db.reset(); + let epoch = self + .app_state + .db + .create_server(&self.app_state.config.zed_environment) + .await + .unwrap(); + self.server.reset(epoch); + } + + async fn create_client(&mut self, cx: &mut TestAppContext, name: &str) -> TestClient { + cx.update(|cx| { + cx.set_global(HomeDir(Path::new("/tmp/").to_path_buf())); + + let mut settings = Settings::test(cx); + settings.projects_online_by_default = false; + cx.set_global(settings); + }); + + let http = FakeHttpClient::with_404_response(); + let user_id = if let Ok(Some(user)) = self + .app_state + .db + .get_user_by_github_account(name, None) + .await + { + user.id + } else { + self.app_state + .db + .create_user( + &format!("{name}@example.com"), + false, + NewUserParams { + github_login: name.into(), + github_user_id: 0, + invite_count: 0, + }, + ) + .await + .expect("creating user failed") + .user_id + }; + let client_name = name.to_string(); + let mut client = cx.read(|cx| Client::new(http.clone(), cx)); + let server = self.server.clone(); + let db = self.app_state.db.clone(); + let connection_killers = self.connection_killers.clone(); + let forbid_connections = self.forbid_connections.clone(); + + Arc::get_mut(&mut client) + .unwrap() + .set_id(user_id.0 as usize) + .override_authenticate(move |cx| { + cx.spawn(|_| async move { + let access_token = "the-token".to_string(); + Ok(Credentials { + user_id: user_id.0 as u64, + access_token, + }) + }) + }) + .override_establish_connection(move |credentials, cx| { + assert_eq!(credentials.user_id, user_id.0 as u64); + assert_eq!(credentials.access_token, "the-token"); + + let server = server.clone(); + let db = db.clone(); + let connection_killers = connection_killers.clone(); + let forbid_connections = forbid_connections.clone(); + let client_name = client_name.clone(); + cx.spawn(move |cx| async move { + if forbid_connections.load(SeqCst) { + Err(EstablishConnectionError::other(anyhow!( + "server is forbidding connections" + ))) + } else { + let (client_conn, server_conn, killed) = + Connection::in_memory(cx.background()); + let (connection_id_tx, connection_id_rx) = oneshot::channel(); + let user = db + .get_user_by_id(user_id) + .await + .expect("retrieving user failed") + .unwrap(); + cx.background() + .spawn(server.handle_connection( + server_conn, + client_name, + user, + Some(connection_id_tx), + Executor::Deterministic(cx.background()), + )) + .detach(); + let connection_id = connection_id_rx.await.unwrap(); + connection_killers + .lock() + .insert(connection_id.into(), killed); + Ok(client_conn) + } + }) + }); + + let fs = FakeFs::new(cx.background()); + let user_store = cx.add_model(|cx| UserStore::new(client.clone(), http, cx)); + let app_state = Arc::new(workspace::AppState { + client: client.clone(), + user_store: user_store.clone(), + languages: Arc::new(LanguageRegistry::new(Task::ready(()))), + themes: ThemeRegistry::new((), cx.font_cache()), + fs: fs.clone(), + build_window_options: Default::default, + initialize_workspace: |_, _, _| unimplemented!(), + dock_default_item_factory: |_, _| unimplemented!(), + }); + + Project::init(&client); + cx.update(|cx| { + workspace::init(app_state.clone(), cx); + call::init(client.clone(), user_store.clone(), cx); + }); + + client + .authenticate_and_connect(false, &cx.to_async()) + .await + .unwrap(); + + let client = TestClient { + client, + username: name.to_string(), + local_projects: Default::default(), + remote_projects: Default::default(), + next_root_dir_id: 0, + user_store, + fs, + language_registry: Arc::new(LanguageRegistry::test()), + buffers: Default::default(), + }; + client.wait_for_current_user(cx).await; + client + } + + fn disconnect_client(&self, peer_id: PeerId) { + self.connection_killers + .lock() + .remove(&peer_id) + .unwrap() + .store(true, SeqCst); + } + + fn forbid_connections(&self) { + self.forbid_connections.store(true, SeqCst); + } + + fn allow_connections(&self) { + self.forbid_connections.store(false, SeqCst); + } + + async fn make_contacts(&self, clients: &mut [(&TestClient, &mut TestAppContext)]) { + for ix in 1..clients.len() { + let (left, right) = clients.split_at_mut(ix); + let (client_a, cx_a) = left.last_mut().unwrap(); + for (client_b, cx_b) in right { + client_a + .user_store + .update(*cx_a, |store, cx| { + store.request_contact(client_b.user_id().unwrap(), cx) + }) + .await + .unwrap(); + cx_a.foreground().run_until_parked(); + client_b + .user_store + .update(*cx_b, |store, cx| { + store.respond_to_contact_request(client_a.user_id().unwrap(), true, cx) + }) + .await + .unwrap(); + } + } + } + + async fn create_room(&self, clients: &mut [(&TestClient, &mut TestAppContext)]) { + self.make_contacts(clients).await; + + let (left, right) = clients.split_at_mut(1); + let (_client_a, cx_a) = &mut left[0]; + let active_call_a = cx_a.read(ActiveCall::global); + + for (client_b, cx_b) in right { + let user_id_b = client_b.current_user_id(*cx_b).to_proto(); + active_call_a + .update(*cx_a, |call, cx| call.invite(user_id_b, None, cx)) + .await + .unwrap(); + + cx_b.foreground().run_until_parked(); + let active_call_b = cx_b.read(ActiveCall::global); + active_call_b + .update(*cx_b, |call, cx| call.accept_incoming(cx)) + .await + .unwrap(); + } + } + + async fn build_app_state( + test_db: &TestDb, + fake_server: &live_kit_client::TestServer, + ) -> Arc { + Arc::new(AppState { + db: test_db.db().clone(), + live_kit_client: Some(Arc::new(fake_server.create_api_client())), + config: Default::default(), + }) + } +} + +impl Deref for TestServer { + type Target = Server; + + fn deref(&self) -> &Self::Target { + &self.server + } +} + +impl Drop for TestServer { + fn drop(&mut self) { + self.server.teardown(); + self.test_live_kit_server.teardown().unwrap(); + } +} + +struct TestClient { + client: Arc, + username: String, + local_projects: Vec>, + remote_projects: Vec>, + next_root_dir_id: usize, + pub user_store: ModelHandle, + language_registry: Arc, + fs: Arc, + buffers: HashMap, HashSet>>, +} + +impl Deref for TestClient { + type Target = Arc; + + fn deref(&self) -> &Self::Target { + &self.client + } +} + +struct ContactsSummary { + pub current: Vec, + pub outgoing_requests: Vec, + pub incoming_requests: Vec, +} + +impl TestClient { + pub fn current_user_id(&self, cx: &TestAppContext) -> UserId { + UserId::from_proto( + self.user_store + .read_with(cx, |user_store, _| user_store.current_user().unwrap().id), + ) + } + + async fn wait_for_current_user(&self, cx: &TestAppContext) { + let mut authed_user = self + .user_store + .read_with(cx, |user_store, _| user_store.watch_current_user()); + while authed_user.next().await.unwrap().is_none() {} + } + + async fn clear_contacts(&self, cx: &mut TestAppContext) { + self.user_store + .update(cx, |store, _| store.clear_contacts()) + .await; + } + + fn summarize_contacts(&self, cx: &TestAppContext) -> ContactsSummary { + self.user_store.read_with(cx, |store, _| ContactsSummary { + current: store + .contacts() + .iter() + .map(|contact| contact.user.github_login.clone()) + .collect(), + outgoing_requests: store + .outgoing_contact_requests() + .iter() + .map(|user| user.github_login.clone()) + .collect(), + incoming_requests: store + .incoming_contact_requests() + .iter() + .map(|user| user.github_login.clone()) + .collect(), + }) + } + + async fn build_local_project( + &self, + root_path: impl AsRef, + cx: &mut TestAppContext, + ) -> (ModelHandle, WorktreeId) { + let project = cx.update(|cx| { + Project::local( + self.client.clone(), + self.user_store.clone(), + self.language_registry.clone(), + self.fs.clone(), + cx, + ) + }); + let (worktree, _) = project + .update(cx, |p, cx| { + p.find_or_create_local_worktree(root_path, true, cx) + }) + .await + .unwrap(); + worktree + .read_with(cx, |tree, _| tree.as_local().unwrap().scan_complete()) + .await; + (project, worktree.read_with(cx, |tree, _| tree.id())) + } + + async fn build_remote_project( + &self, + host_project_id: u64, + guest_cx: &mut TestAppContext, + ) -> ModelHandle { + let active_call = guest_cx.read(ActiveCall::global); + let room = active_call.read_with(guest_cx, |call, _| call.room().unwrap().clone()); + room.update(guest_cx, |room, cx| { + room.join_project( + host_project_id, + self.language_registry.clone(), + self.fs.clone(), + cx, + ) + }) + .await + .unwrap() + } + + fn build_workspace( + &self, + project: &ModelHandle, + cx: &mut TestAppContext, + ) -> ViewHandle { + let (_, root_view) = cx.add_window(|_| EmptyView); + cx.add_view(&root_view, |cx| { + Workspace::new( + Default::default(), + 0, + project.clone(), + |_, _| unimplemented!(), + cx, + ) + }) + } + + fn create_new_root_dir(&mut self) -> PathBuf { + format!( + "/{}-root-{}", + self.username, + util::post_inc(&mut self.next_root_dir_id) + ) + .into() + } +} + +impl Drop for TestClient { + fn drop(&mut self) { + self.client.teardown(); + } +} diff --git a/crates/collab/src/integration_tests.rs b/crates/collab/src/tests/integration_tests.rs similarity index 79% rename from crates/collab/src/integration_tests.rs rename to crates/collab/src/tests/integration_tests.rs index 2014b5172b..4a1aaf64d1 100644 --- a/crates/collab/src/integration_tests.rs +++ b/crates/collab/src/tests/integration_tests.rs @@ -1,51 +1,37 @@ use crate::{ - db::{self, NewUserParams, TestDb, UserId}, - executor::Executor, - rpc::{Server, CLEANUP_TIMEOUT, RECONNECT_TIMEOUT}, - AppState, + rpc::{CLEANUP_TIMEOUT, RECONNECT_TIMEOUT}, + tests::{TestClient, TestServer}, }; -use anyhow::anyhow; use call::{room, ActiveCall, ParticipantLocation, Room}; -use client::{ - self, proto::PeerId, test::FakeHttpClient, Client, Connection, Credentials, - EstablishConnectionError, User, UserStore, RECEIVE_TIMEOUT, -}; -use collections::{BTreeMap, HashMap, HashSet}; +use client::{User, RECEIVE_TIMEOUT}; +use collections::HashSet; use editor::{ - self, ConfirmCodeAction, ConfirmCompletion, ConfirmRename, Editor, ExcerptRange, MultiBuffer, - Redo, Rename, ToOffset, ToggleCodeActions, Undo, + ConfirmCodeAction, ConfirmCompletion, ConfirmRename, Editor, ExcerptRange, MultiBuffer, Redo, + Rename, ToOffset, ToggleCodeActions, Undo, }; -use fs::{FakeFs, Fs as _, HomeDir, LineEnding, RemoveOptions}; -use futures::{channel::oneshot, StreamExt as _}; +use fs::{FakeFs, Fs as _, LineEnding, RemoveOptions}; +use futures::StreamExt as _; use gpui::{ - executor::Deterministic, geometry::vector::vec2f, test::EmptyView, ModelHandle, Task, - TestAppContext, ViewHandle, + executor::Deterministic, geometry::vector::vec2f, test::EmptyView, ModelHandle, TestAppContext, + ViewHandle, }; use language::{ - range_to_lsp, tree_sitter_rust, Anchor, Diagnostic, DiagnosticEntry, FakeLspAdapter, Language, - LanguageConfig, LanguageRegistry, OffsetRangeExt, Point, PointUtf16, Rope, + tree_sitter_rust, Anchor, Diagnostic, DiagnosticEntry, FakeLspAdapter, Language, + LanguageConfig, OffsetRangeExt, Point, Rope, }; use live_kit_client::MacOSDisplay; -use lsp::{self, FakeLanguageServer}; -use parking_lot::Mutex; -use project::{search::SearchQuery, DiagnosticSummary, Project, ProjectPath, WorktreeId}; +use project::{search::SearchQuery, DiagnosticSummary, Project, ProjectPath}; use rand::prelude::*; use serde_json::json; use settings::{Formatter, Settings}; use std::{ cell::{Cell, RefCell}, env, future, mem, - ops::Deref, path::{Path, PathBuf}, rc::Rc, - sync::{ - atomic::{AtomicBool, AtomicUsize, Ordering::SeqCst}, - Arc, - }, + sync::Arc, }; -use theme::ThemeRegistry; use unindent::Unindent as _; -use util::post_inc; use workspace::{item::Item, shared_screen::SharedScreen, SplitDirection, ToggleFollow, Workspace}; #[ctor::ctor] @@ -6384,1324 +6370,6 @@ async fn test_peers_simultaneously_following_each_other( }); } -#[gpui::test(iterations = 100)] -async fn test_random_collaboration( - cx: &mut TestAppContext, - deterministic: Arc, - rng: StdRng, -) { - deterministic.forbid_parking(); - let rng = Arc::new(Mutex::new(rng)); - - let max_peers = env::var("MAX_PEERS") - .map(|i| i.parse().expect("invalid `MAX_PEERS` variable")) - .unwrap_or(5); - - let max_operations = env::var("OPERATIONS") - .map(|i| i.parse().expect("invalid `OPERATIONS` variable")) - .unwrap_or(10); - - let mut server = TestServer::start(&deterministic).await; - let db = server.app_state.db.clone(); - - let mut available_guests = Vec::new(); - for ix in 0..max_peers { - let username = format!("guest-{}", ix + 1); - let user_id = db - .create_user( - &format!("{username}@example.com"), - false, - NewUserParams { - github_login: username.clone(), - github_user_id: (ix + 1) as i32, - invite_count: 0, - }, - ) - .await - .unwrap() - .user_id; - available_guests.push((user_id, username)); - } - - for (ix, (user_id_a, _)) in available_guests.iter().enumerate() { - for (user_id_b, _) in &available_guests[ix + 1..] { - server - .app_state - .db - .send_contact_request(*user_id_a, *user_id_b) - .await - .unwrap(); - server - .app_state - .db - .respond_to_contact_request(*user_id_b, *user_id_a, true) - .await - .unwrap(); - } - } - - let mut clients = Vec::new(); - let mut user_ids = Vec::new(); - let mut op_start_signals = Vec::new(); - let mut next_entity_id = 100000; - - let mut operations = 0; - while operations < max_operations { - let distribution = rng.lock().gen_range(0..100); - match distribution { - 0..=19 if !available_guests.is_empty() => { - let guest_ix = rng.lock().gen_range(0..available_guests.len()); - let (_, guest_username) = available_guests.remove(guest_ix); - log::info!("Adding new connection for {}", guest_username); - next_entity_id += 100000; - let mut guest_cx = TestAppContext::new( - cx.foreground_platform(), - cx.platform(), - deterministic.build_foreground(next_entity_id), - deterministic.build_background(), - cx.font_cache(), - cx.leak_detector(), - next_entity_id, - cx.function_name.clone(), - ); - - let op_start_signal = futures::channel::mpsc::unbounded(); - let guest = server.create_client(&mut guest_cx, &guest_username).await; - user_ids.push(guest.current_user_id(&guest_cx)); - op_start_signals.push(op_start_signal.0); - clients.push(guest_cx.foreground().spawn(guest.simulate( - guest_username.clone(), - op_start_signal.1, - rng.clone(), - guest_cx, - ))); - - log::info!("Added connection for {}", guest_username); - operations += 1; - } - 20..=24 if clients.len() > 1 => { - let guest_ix = rng.lock().gen_range(1..clients.len()); - log::info!( - "Simulating full disconnection of guest {}", - user_ids[guest_ix] - ); - let removed_guest_id = user_ids.remove(guest_ix); - let user_connection_ids = server - .connection_pool - .lock() - .user_connection_ids(removed_guest_id) - .collect::>(); - assert_eq!(user_connection_ids.len(), 1); - let removed_peer_id = user_connection_ids[0].into(); - let guest = clients.remove(guest_ix); - op_start_signals.remove(guest_ix); - server.forbid_connections(); - server.disconnect_client(removed_peer_id); - deterministic.advance_clock(RECEIVE_TIMEOUT + RECONNECT_TIMEOUT); - deterministic.start_waiting(); - log::info!("Waiting for guest {} to exit...", removed_guest_id); - let (guest, mut guest_cx) = guest.await; - deterministic.finish_waiting(); - server.allow_connections(); - - for project in &guest.remote_projects { - project.read_with(&guest_cx, |project, _| assert!(project.is_read_only())); - } - for user_id in &user_ids { - let contacts = server.app_state.db.get_contacts(*user_id).await.unwrap(); - let pool = server.connection_pool.lock(); - for contact in contacts { - if let db::Contact::Accepted { user_id, .. } = contact { - if pool.is_user_online(user_id) { - assert_ne!( - user_id, removed_guest_id, - "removed guest is still a contact of another peer" - ); - } - } - } - } - - log::info!("{} removed", guest.username); - available_guests.push((removed_guest_id, guest.username.clone())); - guest_cx.update(|cx| { - cx.clear_globals(); - drop(guest); - }); - - operations += 1; - } - 25..=29 if clients.len() > 1 => { - let guest_ix = rng.lock().gen_range(1..clients.len()); - let user_id = user_ids[guest_ix]; - log::info!("Simulating temporary disconnection of guest {}", user_id); - let user_connection_ids = server - .connection_pool - .lock() - .user_connection_ids(user_id) - .collect::>(); - assert_eq!(user_connection_ids.len(), 1); - let peer_id = user_connection_ids[0].into(); - server.disconnect_client(peer_id); - deterministic.advance_clock(RECEIVE_TIMEOUT + RECONNECT_TIMEOUT); - operations += 1; - } - 30..=34 => { - log::info!("Simulating server restart"); - server.reset().await; - deterministic.advance_clock(RECEIVE_TIMEOUT); - server.start().await.unwrap(); - deterministic.advance_clock(CLEANUP_TIMEOUT); - let environment = &server.app_state.config.zed_environment; - let stale_room_ids = server - .app_state - .db - .stale_room_ids(environment, server.id()) - .await - .unwrap(); - assert_eq!(stale_room_ids, vec![]); - } - _ if !op_start_signals.is_empty() => { - while operations < max_operations && rng.lock().gen_bool(0.7) { - op_start_signals - .choose(&mut *rng.lock()) - .unwrap() - .unbounded_send(()) - .unwrap(); - operations += 1; - } - - if rng.lock().gen_bool(0.8) { - deterministic.run_until_parked(); - } - } - _ => {} - } - } - - drop(op_start_signals); - deterministic.start_waiting(); - let clients = futures::future::join_all(clients).await; - deterministic.finish_waiting(); - deterministic.run_until_parked(); - - for (guest_client, guest_cx) in &clients { - for guest_project in &guest_client.remote_projects { - guest_project.read_with(guest_cx, |guest_project, cx| { - let host_project = clients.iter().find_map(|(client, cx)| { - let project = client.local_projects.iter().find(|host_project| { - host_project.read_with(cx, |host_project, _| { - host_project.remote_id() == guest_project.remote_id() - }) - })?; - Some((project, cx)) - }); - - if !guest_project.is_read_only() { - if let Some((host_project, host_cx)) = host_project { - let host_worktree_snapshots = - host_project.read_with(host_cx, |host_project, cx| { - host_project - .worktrees(cx) - .map(|worktree| { - let worktree = worktree.read(cx); - (worktree.id(), worktree.snapshot()) - }) - .collect::>() - }); - let guest_worktree_snapshots = guest_project - .worktrees(cx) - .map(|worktree| { - let worktree = worktree.read(cx); - (worktree.id(), worktree.snapshot()) - }) - .collect::>(); - - assert_eq!( - guest_worktree_snapshots.keys().collect::>(), - host_worktree_snapshots.keys().collect::>(), - "{} has different worktrees than the host", - guest_client.username - ); - - for (id, host_snapshot) in &host_worktree_snapshots { - let guest_snapshot = &guest_worktree_snapshots[id]; - assert_eq!( - guest_snapshot.root_name(), - host_snapshot.root_name(), - "{} has different root name than the host for worktree {}", - guest_client.username, - id - ); - assert_eq!( - guest_snapshot.abs_path(), - host_snapshot.abs_path(), - "{} has different abs path than the host for worktree {}", - guest_client.username, - id - ); - assert_eq!( - guest_snapshot.entries(false).collect::>(), - host_snapshot.entries(false).collect::>(), - "{} has different snapshot than the host for worktree {}", - guest_client.username, - id - ); - assert_eq!(guest_snapshot.scan_id(), host_snapshot.scan_id()); - } - } - } - - guest_project.check_invariants(cx); - }); - } - - for (guest_project, guest_buffers) in &guest_client.buffers { - let project_id = if guest_project.read_with(guest_cx, |project, _| { - project.is_local() || project.is_read_only() - }) { - continue; - } else { - guest_project - .read_with(guest_cx, |project, _| project.remote_id()) - .unwrap() - }; - - let host_project = clients.iter().find_map(|(client, cx)| { - let project = client.local_projects.iter().find(|host_project| { - host_project.read_with(cx, |host_project, _| { - host_project.remote_id() == Some(project_id) - }) - })?; - Some((project, cx)) - }); - - let (host_project, host_cx) = if let Some((host_project, host_cx)) = host_project { - (host_project, host_cx) - } else { - continue; - }; - - for guest_buffer in guest_buffers { - let buffer_id = guest_buffer.read_with(guest_cx, |buffer, _| buffer.remote_id()); - let host_buffer = host_project.read_with(host_cx, |project, cx| { - project.buffer_for_id(buffer_id, cx).unwrap_or_else(|| { - panic!( - "host does not have buffer for guest:{}, peer:{:?}, id:{}", - guest_client.username, - guest_client.peer_id(), - buffer_id - ) - }) - }); - let path = host_buffer - .read_with(host_cx, |buffer, cx| buffer.file().unwrap().full_path(cx)); - - assert_eq!( - guest_buffer.read_with(guest_cx, |buffer, _| buffer.deferred_ops_len()), - 0, - "{}, buffer {}, path {:?} has deferred operations", - guest_client.username, - buffer_id, - path, - ); - assert_eq!( - guest_buffer.read_with(guest_cx, |buffer, _| buffer.text()), - host_buffer.read_with(host_cx, |buffer, _| buffer.text()), - "{}, buffer {}, path {:?}, differs from the host's buffer", - guest_client.username, - buffer_id, - path - ); - } - } - } - - for (client, mut cx) in clients { - cx.update(|cx| { - cx.clear_globals(); - drop(client); - }); - } -} - -struct TestServer { - app_state: Arc, - server: Arc, - connection_killers: Arc>>>, - forbid_connections: Arc, - _test_db: TestDb, - test_live_kit_server: Arc, -} - -impl TestServer { - async fn start(deterministic: &Arc) -> Self { - static NEXT_LIVE_KIT_SERVER_ID: AtomicUsize = AtomicUsize::new(0); - - let use_postgres = env::var("USE_POSTGRES").ok(); - let use_postgres = use_postgres.as_deref(); - let test_db = if use_postgres == Some("true") || use_postgres == Some("1") { - TestDb::postgres(deterministic.build_background()) - } else { - TestDb::sqlite(deterministic.build_background()) - }; - let live_kit_server_id = NEXT_LIVE_KIT_SERVER_ID.fetch_add(1, SeqCst); - let live_kit_server = live_kit_client::TestServer::create( - format!("http://livekit.{}.test", live_kit_server_id), - format!("devkey-{}", live_kit_server_id), - format!("secret-{}", live_kit_server_id), - deterministic.build_background(), - ) - .unwrap(); - let app_state = Self::build_app_state(&test_db, &live_kit_server).await; - let epoch = app_state - .db - .create_server(&app_state.config.zed_environment) - .await - .unwrap(); - let server = Server::new( - epoch, - app_state.clone(), - Executor::Deterministic(deterministic.build_background()), - ); - server.start().await.unwrap(); - // Advance clock to ensure the server's cleanup task is finished. - deterministic.advance_clock(CLEANUP_TIMEOUT); - Self { - app_state, - server, - connection_killers: Default::default(), - forbid_connections: Default::default(), - _test_db: test_db, - test_live_kit_server: live_kit_server, - } - } - - async fn reset(&self) { - self.app_state.db.reset(); - let epoch = self - .app_state - .db - .create_server(&self.app_state.config.zed_environment) - .await - .unwrap(); - self.server.reset(epoch); - } - - async fn create_client(&mut self, cx: &mut TestAppContext, name: &str) -> TestClient { - cx.update(|cx| { - cx.set_global(HomeDir(Path::new("/tmp/").to_path_buf())); - - let mut settings = Settings::test(cx); - settings.projects_online_by_default = false; - cx.set_global(settings); - }); - - let http = FakeHttpClient::with_404_response(); - let user_id = if let Ok(Some(user)) = self - .app_state - .db - .get_user_by_github_account(name, None) - .await - { - user.id - } else { - self.app_state - .db - .create_user( - &format!("{name}@example.com"), - false, - NewUserParams { - github_login: name.into(), - github_user_id: 0, - invite_count: 0, - }, - ) - .await - .expect("creating user failed") - .user_id - }; - let client_name = name.to_string(); - let mut client = cx.read(|cx| Client::new(http.clone(), cx)); - let server = self.server.clone(); - let db = self.app_state.db.clone(); - let connection_killers = self.connection_killers.clone(); - let forbid_connections = self.forbid_connections.clone(); - - Arc::get_mut(&mut client) - .unwrap() - .set_id(user_id.0 as usize) - .override_authenticate(move |cx| { - cx.spawn(|_| async move { - let access_token = "the-token".to_string(); - Ok(Credentials { - user_id: user_id.0 as u64, - access_token, - }) - }) - }) - .override_establish_connection(move |credentials, cx| { - assert_eq!(credentials.user_id, user_id.0 as u64); - assert_eq!(credentials.access_token, "the-token"); - - let server = server.clone(); - let db = db.clone(); - let connection_killers = connection_killers.clone(); - let forbid_connections = forbid_connections.clone(); - let client_name = client_name.clone(); - cx.spawn(move |cx| async move { - if forbid_connections.load(SeqCst) { - Err(EstablishConnectionError::other(anyhow!( - "server is forbidding connections" - ))) - } else { - let (client_conn, server_conn, killed) = - Connection::in_memory(cx.background()); - let (connection_id_tx, connection_id_rx) = oneshot::channel(); - let user = db - .get_user_by_id(user_id) - .await - .expect("retrieving user failed") - .unwrap(); - cx.background() - .spawn(server.handle_connection( - server_conn, - client_name, - user, - Some(connection_id_tx), - Executor::Deterministic(cx.background()), - )) - .detach(); - let connection_id = connection_id_rx.await.unwrap(); - connection_killers - .lock() - .insert(connection_id.into(), killed); - Ok(client_conn) - } - }) - }); - - let fs = FakeFs::new(cx.background()); - let user_store = cx.add_model(|cx| UserStore::new(client.clone(), http, cx)); - let app_state = Arc::new(workspace::AppState { - client: client.clone(), - user_store: user_store.clone(), - languages: Arc::new(LanguageRegistry::new(Task::ready(()))), - themes: ThemeRegistry::new((), cx.font_cache()), - fs: fs.clone(), - build_window_options: Default::default, - initialize_workspace: |_, _, _| unimplemented!(), - dock_default_item_factory: |_, _| unimplemented!(), - }); - - Project::init(&client); - cx.update(|cx| { - workspace::init(app_state.clone(), cx); - call::init(client.clone(), user_store.clone(), cx); - }); - - client - .authenticate_and_connect(false, &cx.to_async()) - .await - .unwrap(); - - let client = TestClient { - client, - username: name.to_string(), - local_projects: Default::default(), - remote_projects: Default::default(), - next_root_dir_id: 0, - user_store, - fs, - language_registry: Arc::new(LanguageRegistry::test()), - buffers: Default::default(), - }; - client.wait_for_current_user(cx).await; - client - } - - fn disconnect_client(&self, peer_id: PeerId) { - self.connection_killers - .lock() - .remove(&peer_id) - .unwrap() - .store(true, SeqCst); - } - - fn forbid_connections(&self) { - self.forbid_connections.store(true, SeqCst); - } - - fn allow_connections(&self) { - self.forbid_connections.store(false, SeqCst); - } - - async fn make_contacts(&self, clients: &mut [(&TestClient, &mut TestAppContext)]) { - for ix in 1..clients.len() { - let (left, right) = clients.split_at_mut(ix); - let (client_a, cx_a) = left.last_mut().unwrap(); - for (client_b, cx_b) in right { - client_a - .user_store - .update(*cx_a, |store, cx| { - store.request_contact(client_b.user_id().unwrap(), cx) - }) - .await - .unwrap(); - cx_a.foreground().run_until_parked(); - client_b - .user_store - .update(*cx_b, |store, cx| { - store.respond_to_contact_request(client_a.user_id().unwrap(), true, cx) - }) - .await - .unwrap(); - } - } - } - - async fn create_room(&self, clients: &mut [(&TestClient, &mut TestAppContext)]) { - self.make_contacts(clients).await; - - let (left, right) = clients.split_at_mut(1); - let (_client_a, cx_a) = &mut left[0]; - let active_call_a = cx_a.read(ActiveCall::global); - - for (client_b, cx_b) in right { - let user_id_b = client_b.current_user_id(*cx_b).to_proto(); - active_call_a - .update(*cx_a, |call, cx| call.invite(user_id_b, None, cx)) - .await - .unwrap(); - - cx_b.foreground().run_until_parked(); - let active_call_b = cx_b.read(ActiveCall::global); - active_call_b - .update(*cx_b, |call, cx| call.accept_incoming(cx)) - .await - .unwrap(); - } - } - - async fn build_app_state( - test_db: &TestDb, - fake_server: &live_kit_client::TestServer, - ) -> Arc { - Arc::new(AppState { - db: test_db.db().clone(), - live_kit_client: Some(Arc::new(fake_server.create_api_client())), - config: Default::default(), - }) - } -} - -impl Deref for TestServer { - type Target = Server; - - fn deref(&self) -> &Self::Target { - &self.server - } -} - -impl Drop for TestServer { - fn drop(&mut self) { - self.server.teardown(); - self.test_live_kit_server.teardown().unwrap(); - } -} - -struct TestClient { - client: Arc, - username: String, - local_projects: Vec>, - remote_projects: Vec>, - next_root_dir_id: usize, - pub user_store: ModelHandle, - language_registry: Arc, - fs: Arc, - buffers: HashMap, HashSet>>, -} - -impl Deref for TestClient { - type Target = Arc; - - fn deref(&self) -> &Self::Target { - &self.client - } -} - -struct ContactsSummary { - pub current: Vec, - pub outgoing_requests: Vec, - pub incoming_requests: Vec, -} - -impl TestClient { - pub fn current_user_id(&self, cx: &TestAppContext) -> UserId { - UserId::from_proto( - self.user_store - .read_with(cx, |user_store, _| user_store.current_user().unwrap().id), - ) - } - - async fn wait_for_current_user(&self, cx: &TestAppContext) { - let mut authed_user = self - .user_store - .read_with(cx, |user_store, _| user_store.watch_current_user()); - while authed_user.next().await.unwrap().is_none() {} - } - - async fn clear_contacts(&self, cx: &mut TestAppContext) { - self.user_store - .update(cx, |store, _| store.clear_contacts()) - .await; - } - - fn summarize_contacts(&self, cx: &TestAppContext) -> ContactsSummary { - self.user_store.read_with(cx, |store, _| ContactsSummary { - current: store - .contacts() - .iter() - .map(|contact| contact.user.github_login.clone()) - .collect(), - outgoing_requests: store - .outgoing_contact_requests() - .iter() - .map(|user| user.github_login.clone()) - .collect(), - incoming_requests: store - .incoming_contact_requests() - .iter() - .map(|user| user.github_login.clone()) - .collect(), - }) - } - - async fn build_local_project( - &self, - root_path: impl AsRef, - cx: &mut TestAppContext, - ) -> (ModelHandle, WorktreeId) { - let project = cx.update(|cx| { - Project::local( - self.client.clone(), - self.user_store.clone(), - self.language_registry.clone(), - self.fs.clone(), - cx, - ) - }); - let (worktree, _) = project - .update(cx, |p, cx| { - p.find_or_create_local_worktree(root_path, true, cx) - }) - .await - .unwrap(); - worktree - .read_with(cx, |tree, _| tree.as_local().unwrap().scan_complete()) - .await; - (project, worktree.read_with(cx, |tree, _| tree.id())) - } - - async fn build_remote_project( - &self, - host_project_id: u64, - guest_cx: &mut TestAppContext, - ) -> ModelHandle { - let active_call = guest_cx.read(ActiveCall::global); - let room = active_call.read_with(guest_cx, |call, _| call.room().unwrap().clone()); - room.update(guest_cx, |room, cx| { - room.join_project( - host_project_id, - self.language_registry.clone(), - self.fs.clone(), - cx, - ) - }) - .await - .unwrap() - } - - fn build_workspace( - &self, - project: &ModelHandle, - cx: &mut TestAppContext, - ) -> ViewHandle { - let (_, root_view) = cx.add_window(|_| EmptyView); - cx.add_view(&root_view, |cx| { - Workspace::new( - Default::default(), - 0, - project.clone(), - |_, _| unimplemented!(), - cx, - ) - }) - } - - pub async fn simulate( - mut self, - username: String, - mut op_start_signal: futures::channel::mpsc::UnboundedReceiver<()>, - rng: Arc>, - mut cx: TestAppContext, - ) -> (Self, TestAppContext) { - async fn tick( - client: &mut TestClient, - username: &str, - rng: Arc>, - cx: &mut TestAppContext, - ) -> anyhow::Result<()> { - let active_call = cx.read(ActiveCall::global); - if active_call.read_with(cx, |call, _| call.incoming().borrow().is_some()) { - if rng.lock().gen() { - log::info!("{}: accepting incoming call", username); - active_call - .update(cx, |call, cx| call.accept_incoming(cx)) - .await?; - } else { - log::info!("{}: declining incoming call", username); - active_call.update(cx, |call, _| call.decline_incoming())?; - } - } else { - let available_contacts = client.user_store.read_with(cx, |user_store, _| { - user_store - .contacts() - .iter() - .filter(|contact| contact.online && !contact.busy) - .cloned() - .collect::>() - }); - - let distribution = rng.lock().gen_range(0..100); - match distribution { - 0..=29 if !available_contacts.is_empty() => { - let contact = available_contacts.choose(&mut *rng.lock()).unwrap(); - log::info!("{}: inviting {}", username, contact.user.github_login); - active_call - .update(cx, |call, cx| call.invite(contact.user.id, None, cx)) - .await?; - } - 30..=39 if active_call.read_with(cx, |call, _| call.room().is_some()) => { - log::info!("{}: hanging up", username); - active_call.update(cx, |call, cx| call.hang_up(cx))?; - } - _ => {} - } - } - - let remote_projects = - if let Some(room) = active_call.read_with(cx, |call, _| call.room().cloned()) { - room.read_with(cx, |room, _| { - room.remote_participants() - .values() - .flat_map(|participant| participant.projects.clone()) - .collect::>() - }) - } else { - Default::default() - }; - let project = if remote_projects.is_empty() || rng.lock().gen() { - if client.local_projects.is_empty() || rng.lock().gen() { - let dir_paths = client.fs.directories().await; - let local_project = if dir_paths.is_empty() || rng.lock().gen() { - let root_path = format!( - "/{}-root-{}", - username, - post_inc(&mut client.next_root_dir_id) - ); - let root_path = Path::new(&root_path); - client.fs.create_dir(root_path).await.unwrap(); - client - .fs - .create_file(&root_path.join("main.rs"), Default::default()) - .await - .unwrap(); - log::info!("{}: opening local project at {:?}", username, root_path); - client.build_local_project(root_path, cx).await.0 - } else { - let root_path = dir_paths.choose(&mut *rng.lock()).unwrap(); - log::info!("{}: opening local project at {:?}", username, root_path); - client.build_local_project(root_path, cx).await.0 - }; - client.local_projects.push(local_project.clone()); - local_project - } else { - client - .local_projects - .choose(&mut *rng.lock()) - .unwrap() - .clone() - } - } else { - if client.remote_projects.is_empty() || rng.lock().gen() { - let remote_project_id = remote_projects.choose(&mut *rng.lock()).unwrap().id; - let remote_project = if let Some(project) = - client.remote_projects.iter().find(|project| { - project.read_with(cx, |project, _| { - project.remote_id() == Some(remote_project_id) - }) - }) { - project.clone() - } else { - log::info!("{}: opening remote project {}", username, remote_project_id); - let remote_project = Project::remote( - remote_project_id, - client.client.clone(), - client.user_store.clone(), - client.language_registry.clone(), - FakeFs::new(cx.background()), - cx.to_async(), - ) - .await?; - client.remote_projects.push(remote_project.clone()); - remote_project - }; - - remote_project - } else { - client - .remote_projects - .choose(&mut *rng.lock()) - .unwrap() - .clone() - } - }; - - if active_call.read_with(cx, |call, _| call.room().is_some()) { - if let Err(error) = active_call - .update(cx, |call, cx| call.share_project(project.clone(), cx)) - .await - { - log::error!("{}: error sharing project, {:?}", username, error); - } - } - - let buffers = client.buffers.entry(project.clone()).or_default(); - let buffer = if buffers.is_empty() || rng.lock().gen() { - let worktree = if let Some(worktree) = project.read_with(cx, |project, cx| { - project - .worktrees(cx) - .filter(|worktree| { - let worktree = worktree.read(cx); - worktree.is_visible() && worktree.entries(false).any(|e| e.is_file()) - }) - .choose(&mut *rng.lock()) - }) { - worktree - } else { - cx.background().simulate_random_delay().await; - return Ok(()); - }; - - let (worktree_root_name, project_path) = worktree.read_with(cx, |worktree, _| { - let entry = worktree - .entries(false) - .filter(|e| e.is_file()) - .choose(&mut *rng.lock()) - .unwrap(); - ( - worktree.root_name().to_string(), - (worktree.id(), entry.path.clone()), - ) - }); - log::info!( - "{}: opening path {:?} in worktree {} ({})", - username, - project_path.1, - project_path.0, - worktree_root_name, - ); - let buffer = project - .update(cx, |project, cx| { - project.open_buffer(project_path.clone(), cx) - }) - .await?; - log::info!( - "{}: opened path {:?} in worktree {} ({}) with buffer id {}", - username, - project_path.1, - project_path.0, - worktree_root_name, - buffer.read_with(cx, |buffer, _| buffer.remote_id()) - ); - buffers.insert(buffer.clone()); - buffer - } else { - buffers.iter().choose(&mut *rng.lock()).unwrap().clone() - }; - - let choice = rng.lock().gen_range(0..100); - match choice { - 0..=9 => { - cx.update(|cx| { - log::info!( - "{}: dropping buffer {:?}", - username, - buffer.read(cx).file().unwrap().full_path(cx) - ); - buffers.remove(&buffer); - drop(buffer); - }); - } - 10..=19 => { - let completions = project.update(cx, |project, cx| { - log::info!( - "{}: requesting completions for buffer {} ({:?})", - username, - buffer.read(cx).remote_id(), - buffer.read(cx).file().unwrap().full_path(cx) - ); - let offset = rng.lock().gen_range(0..=buffer.read(cx).len()); - project.completions(&buffer, offset, cx) - }); - let completions = cx.background().spawn(async move { - completions - .await - .map_err(|err| anyhow!("completions request failed: {:?}", err)) - }); - if rng.lock().gen_bool(0.3) { - log::info!("{}: detaching completions request", username); - cx.update(|cx| completions.detach_and_log_err(cx)); - } else { - completions.await?; - } - } - 20..=29 => { - let code_actions = project.update(cx, |project, cx| { - log::info!( - "{}: requesting code actions for buffer {} ({:?})", - username, - buffer.read(cx).remote_id(), - buffer.read(cx).file().unwrap().full_path(cx) - ); - let range = buffer.read(cx).random_byte_range(0, &mut *rng.lock()); - project.code_actions(&buffer, range, cx) - }); - let code_actions = cx.background().spawn(async move { - code_actions - .await - .map_err(|err| anyhow!("code actions request failed: {:?}", err)) - }); - if rng.lock().gen_bool(0.3) { - log::info!("{}: detaching code actions request", username); - cx.update(|cx| code_actions.detach_and_log_err(cx)); - } else { - code_actions.await?; - } - } - 30..=39 if buffer.read_with(cx, |buffer, _| buffer.is_dirty()) => { - let (requested_version, save) = buffer.update(cx, |buffer, cx| { - log::info!( - "{}: saving buffer {} ({:?})", - username, - buffer.remote_id(), - buffer.file().unwrap().full_path(cx) - ); - (buffer.version(), buffer.save(cx)) - }); - let save = cx.background().spawn(async move { - let (saved_version, _, _) = save - .await - .map_err(|err| anyhow!("save request failed: {:?}", err))?; - assert!(saved_version.observed_all(&requested_version)); - Ok::<_, anyhow::Error>(()) - }); - if rng.lock().gen_bool(0.3) { - log::info!("{}: detaching save request", username); - cx.update(|cx| save.detach_and_log_err(cx)); - } else { - save.await?; - } - } - 40..=44 => { - let prepare_rename = project.update(cx, |project, cx| { - log::info!( - "{}: preparing rename for buffer {} ({:?})", - username, - buffer.read(cx).remote_id(), - buffer.read(cx).file().unwrap().full_path(cx) - ); - let offset = rng.lock().gen_range(0..=buffer.read(cx).len()); - project.prepare_rename(buffer, offset, cx) - }); - let prepare_rename = cx.background().spawn(async move { - prepare_rename - .await - .map_err(|err| anyhow!("prepare rename request failed: {:?}", err)) - }); - if rng.lock().gen_bool(0.3) { - log::info!("{}: detaching prepare rename request", username); - cx.update(|cx| prepare_rename.detach_and_log_err(cx)); - } else { - prepare_rename.await?; - } - } - 45..=49 => { - let definitions = project.update(cx, |project, cx| { - log::info!( - "{}: requesting definitions for buffer {} ({:?})", - username, - buffer.read(cx).remote_id(), - buffer.read(cx).file().unwrap().full_path(cx) - ); - let offset = rng.lock().gen_range(0..=buffer.read(cx).len()); - project.definition(&buffer, offset, cx) - }); - let definitions = cx.background().spawn(async move { - definitions - .await - .map_err(|err| anyhow!("definitions request failed: {:?}", err)) - }); - if rng.lock().gen_bool(0.3) { - log::info!("{}: detaching definitions request", username); - cx.update(|cx| definitions.detach_and_log_err(cx)); - } else { - buffers.extend(definitions.await?.into_iter().map(|loc| loc.target.buffer)); - } - } - 50..=54 => { - let highlights = project.update(cx, |project, cx| { - log::info!( - "{}: requesting highlights for buffer {} ({:?})", - username, - buffer.read(cx).remote_id(), - buffer.read(cx).file().unwrap().full_path(cx) - ); - let offset = rng.lock().gen_range(0..=buffer.read(cx).len()); - project.document_highlights(&buffer, offset, cx) - }); - let highlights = cx.background().spawn(async move { - highlights - .await - .map_err(|err| anyhow!("highlights request failed: {:?}", err)) - }); - if rng.lock().gen_bool(0.3) { - log::info!("{}: detaching highlights request", username); - cx.update(|cx| highlights.detach_and_log_err(cx)); - } else { - highlights.await?; - } - } - 55..=59 => { - let search = project.update(cx, |project, cx| { - let query = rng.lock().gen_range('a'..='z'); - log::info!("{}: project-wide search {:?}", username, query); - project.search(SearchQuery::text(query, false, false), cx) - }); - let search = cx.background().spawn(async move { - search - .await - .map_err(|err| anyhow!("search request failed: {:?}", err)) - }); - if rng.lock().gen_bool(0.3) { - log::info!("{}: detaching search request", username); - cx.update(|cx| search.detach_and_log_err(cx)); - } else { - buffers.extend(search.await?.into_keys()); - } - } - 60..=79 => { - let worktree = project - .read_with(cx, |project, cx| { - project - .worktrees(cx) - .filter(|worktree| { - let worktree = worktree.read(cx); - worktree.is_visible() - && worktree.entries(false).any(|e| e.is_file()) - && worktree.root_entry().map_or(false, |e| e.is_dir()) - }) - .choose(&mut *rng.lock()) - }) - .unwrap(); - let (worktree_id, worktree_root_name) = worktree - .read_with(cx, |worktree, _| { - (worktree.id(), worktree.root_name().to_string()) - }); - - let mut new_name = String::new(); - for _ in 0..10 { - let letter = rng.lock().gen_range('a'..='z'); - new_name.push(letter); - } - - let is_dir = rng.lock().gen::(); - let mut new_path = PathBuf::new(); - new_path.push(new_name); - if !is_dir { - new_path.set_extension("rs"); - } - log::info!( - "{}: creating {:?} in worktree {} ({})", - username, - new_path, - worktree_id, - worktree_root_name, - ); - project - .update(cx, |project, cx| { - project.create_entry((worktree_id, new_path), is_dir, cx) - }) - .unwrap() - .await?; - } - _ => { - buffer.update(cx, |buffer, cx| { - log::info!( - "{}: updating buffer {} ({:?})", - username, - buffer.remote_id(), - buffer.file().unwrap().full_path(cx) - ); - if rng.lock().gen_bool(0.7) { - buffer.randomly_edit(&mut *rng.lock(), 5, cx); - } else { - buffer.randomly_undo_redo(&mut *rng.lock(), cx); - } - }); - } - } - - Ok(()) - } - - // Setup language server - let mut language = Language::new( - LanguageConfig { - name: "Rust".into(), - path_suffixes: vec!["rs".to_string()], - ..Default::default() - }, - None, - ); - let _fake_language_servers = language - .set_fake_lsp_adapter(Arc::new(FakeLspAdapter { - name: "the-fake-language-server", - capabilities: lsp::LanguageServer::full_capabilities(), - initializer: Some(Box::new({ - let rng = rng.clone(); - let fs = self.fs.clone(); - move |fake_server: &mut FakeLanguageServer| { - fake_server.handle_request::( - |_, _| async move { - Ok(Some(lsp::CompletionResponse::Array(vec![ - lsp::CompletionItem { - text_edit: Some(lsp::CompletionTextEdit::Edit( - lsp::TextEdit { - range: lsp::Range::new( - lsp::Position::new(0, 0), - lsp::Position::new(0, 0), - ), - new_text: "the-new-text".to_string(), - }, - )), - ..Default::default() - }, - ]))) - }, - ); - - fake_server.handle_request::( - |_, _| async move { - Ok(Some(vec![lsp::CodeActionOrCommand::CodeAction( - lsp::CodeAction { - title: "the-code-action".to_string(), - ..Default::default() - }, - )])) - }, - ); - - fake_server.handle_request::( - |params, _| async move { - Ok(Some(lsp::PrepareRenameResponse::Range(lsp::Range::new( - params.position, - params.position, - )))) - }, - ); - - fake_server.handle_request::({ - let fs = fs.clone(); - let rng = rng.clone(); - move |_, _| { - let fs = fs.clone(); - let rng = rng.clone(); - async move { - let files = fs.files().await; - let mut rng = rng.lock(); - let count = rng.gen_range::(1..3); - let files = (0..count) - .map(|_| files.choose(&mut *rng).unwrap()) - .collect::>(); - log::info!("LSP: Returning definitions in files {:?}", &files); - Ok(Some(lsp::GotoDefinitionResponse::Array( - files - .into_iter() - .map(|file| lsp::Location { - uri: lsp::Url::from_file_path(file).unwrap(), - range: Default::default(), - }) - .collect(), - ))) - } - } - }); - - fake_server.handle_request::( - { - let rng = rng.clone(); - move |_, _| { - let mut highlights = Vec::new(); - let highlight_count = rng.lock().gen_range(1..=5); - for _ in 0..highlight_count { - let start_row = rng.lock().gen_range(0..100); - let start_column = rng.lock().gen_range(0..100); - let start = PointUtf16::new(start_row, start_column); - let end_row = rng.lock().gen_range(0..100); - let end_column = rng.lock().gen_range(0..100); - let end = PointUtf16::new(end_row, end_column); - let range = - if start > end { end..start } else { start..end }; - highlights.push(lsp::DocumentHighlight { - range: range_to_lsp(range.clone()), - kind: Some(lsp::DocumentHighlightKind::READ), - }); - } - highlights.sort_unstable_by_key(|highlight| { - (highlight.range.start, highlight.range.end) - }); - async move { Ok(Some(highlights)) } - } - }, - ); - } - })), - ..Default::default() - })) - .await; - self.language_registry.add(Arc::new(language)); - - while op_start_signal.next().await.is_some() { - if let Err(error) = tick(&mut self, &username, rng.clone(), &mut cx).await { - log::error!("{} error: {:?}", username, error); - } - - cx.background().simulate_random_delay().await; - } - log::info!("{}: done", username); - - (self, cx) - } -} - -impl Drop for TestClient { - fn drop(&mut self) { - self.client.teardown(); - } -} - #[derive(Debug, Eq, PartialEq)] struct RoomParticipants { remote: Vec, diff --git a/crates/collab/src/tests/randomized_integration_tests.rs b/crates/collab/src/tests/randomized_integration_tests.rs new file mode 100644 index 0000000000..6d1df1bee9 --- /dev/null +++ b/crates/collab/src/tests/randomized_integration_tests.rs @@ -0,0 +1,919 @@ +use crate::{ + db::{self, NewUserParams}, + rpc::{CLEANUP_TIMEOUT, RECONNECT_TIMEOUT}, + tests::{TestClient, TestServer}, +}; +use anyhow::anyhow; +use call::ActiveCall; +use client::RECEIVE_TIMEOUT; +use collections::BTreeMap; +use fs::{FakeFs, Fs as _}; +use futures::StreamExt as _; +use gpui::{executor::Deterministic, TestAppContext}; +use language::{range_to_lsp, FakeLspAdapter, Language, LanguageConfig, PointUtf16}; +use lsp::FakeLanguageServer; +use parking_lot::Mutex; +use project::{search::SearchQuery, Project}; +use rand::prelude::*; +use std::{env, path::PathBuf, sync::Arc}; + +#[gpui::test(iterations = 100)] +async fn test_random_collaboration( + cx: &mut TestAppContext, + deterministic: Arc, + rng: StdRng, +) { + deterministic.forbid_parking(); + let rng = Arc::new(Mutex::new(rng)); + + let max_peers = env::var("MAX_PEERS") + .map(|i| i.parse().expect("invalid `MAX_PEERS` variable")) + .unwrap_or(5); + + let max_operations = env::var("OPERATIONS") + .map(|i| i.parse().expect("invalid `OPERATIONS` variable")) + .unwrap_or(10); + + let mut server = TestServer::start(&deterministic).await; + let db = server.app_state.db.clone(); + + let mut available_guests = Vec::new(); + for ix in 0..max_peers { + let username = format!("guest-{}", ix + 1); + let user_id = db + .create_user( + &format!("{username}@example.com"), + false, + NewUserParams { + github_login: username.clone(), + github_user_id: (ix + 1) as i32, + invite_count: 0, + }, + ) + .await + .unwrap() + .user_id; + available_guests.push((user_id, username)); + } + + for (ix, (user_id_a, _)) in available_guests.iter().enumerate() { + for (user_id_b, _) in &available_guests[ix + 1..] { + server + .app_state + .db + .send_contact_request(*user_id_a, *user_id_b) + .await + .unwrap(); + server + .app_state + .db + .respond_to_contact_request(*user_id_b, *user_id_a, true) + .await + .unwrap(); + } + } + + let mut clients = Vec::new(); + let mut user_ids = Vec::new(); + let mut op_start_signals = Vec::new(); + let mut next_entity_id = 100000; + + let mut operations = 0; + while operations < max_operations { + let distribution = rng.lock().gen_range(0..100); + match distribution { + 0..=19 if !available_guests.is_empty() => { + let guest_ix = rng.lock().gen_range(0..available_guests.len()); + let (_, guest_username) = available_guests.remove(guest_ix); + log::info!("Adding new connection for {}", guest_username); + next_entity_id += 100000; + let mut guest_cx = TestAppContext::new( + cx.foreground_platform(), + cx.platform(), + deterministic.build_foreground(next_entity_id), + deterministic.build_background(), + cx.font_cache(), + cx.leak_detector(), + next_entity_id, + cx.function_name.clone(), + ); + + let op_start_signal = futures::channel::mpsc::unbounded(); + let guest = server.create_client(&mut guest_cx, &guest_username).await; + user_ids.push(guest.current_user_id(&guest_cx)); + op_start_signals.push(op_start_signal.0); + clients.push(guest_cx.foreground().spawn(simulate_client( + guest, + op_start_signal.1, + rng.clone(), + guest_cx, + ))); + + log::info!("Added connection for {}", guest_username); + operations += 1; + } + 20..=24 if clients.len() > 1 => { + let guest_ix = rng.lock().gen_range(1..clients.len()); + log::info!( + "Simulating full disconnection of guest {}", + user_ids[guest_ix] + ); + let removed_guest_id = user_ids.remove(guest_ix); + let user_connection_ids = server + .connection_pool + .lock() + .user_connection_ids(removed_guest_id) + .collect::>(); + assert_eq!(user_connection_ids.len(), 1); + let removed_peer_id = user_connection_ids[0].into(); + let guest = clients.remove(guest_ix); + op_start_signals.remove(guest_ix); + server.forbid_connections(); + server.disconnect_client(removed_peer_id); + deterministic.advance_clock(RECEIVE_TIMEOUT + RECONNECT_TIMEOUT); + deterministic.start_waiting(); + log::info!("Waiting for guest {} to exit...", removed_guest_id); + let (guest, mut guest_cx) = guest.await; + deterministic.finish_waiting(); + server.allow_connections(); + + for project in &guest.remote_projects { + project.read_with(&guest_cx, |project, _| assert!(project.is_read_only())); + } + for user_id in &user_ids { + let contacts = server.app_state.db.get_contacts(*user_id).await.unwrap(); + let pool = server.connection_pool.lock(); + for contact in contacts { + if let db::Contact::Accepted { user_id, .. } = contact { + if pool.is_user_online(user_id) { + assert_ne!( + user_id, removed_guest_id, + "removed guest is still a contact of another peer" + ); + } + } + } + } + + log::info!("{} removed", guest.username); + available_guests.push((removed_guest_id, guest.username.clone())); + guest_cx.update(|cx| { + cx.clear_globals(); + drop(guest); + }); + + operations += 1; + } + 25..=29 if clients.len() > 1 => { + let guest_ix = rng.lock().gen_range(1..clients.len()); + let user_id = user_ids[guest_ix]; + log::info!("Simulating temporary disconnection of guest {}", user_id); + let user_connection_ids = server + .connection_pool + .lock() + .user_connection_ids(user_id) + .collect::>(); + assert_eq!(user_connection_ids.len(), 1); + let peer_id = user_connection_ids[0].into(); + server.disconnect_client(peer_id); + deterministic.advance_clock(RECEIVE_TIMEOUT + RECONNECT_TIMEOUT); + operations += 1; + } + 30..=34 => { + log::info!("Simulating server restart"); + server.reset().await; + deterministic.advance_clock(RECEIVE_TIMEOUT); + server.start().await.unwrap(); + deterministic.advance_clock(CLEANUP_TIMEOUT); + let environment = &server.app_state.config.zed_environment; + let stale_room_ids = server + .app_state + .db + .stale_room_ids(environment, server.id()) + .await + .unwrap(); + assert_eq!(stale_room_ids, vec![]); + } + _ if !op_start_signals.is_empty() => { + while operations < max_operations && rng.lock().gen_bool(0.7) { + op_start_signals + .choose(&mut *rng.lock()) + .unwrap() + .unbounded_send(()) + .unwrap(); + operations += 1; + } + + if rng.lock().gen_bool(0.8) { + deterministic.run_until_parked(); + } + } + _ => {} + } + } + + drop(op_start_signals); + deterministic.start_waiting(); + let clients = futures::future::join_all(clients).await; + deterministic.finish_waiting(); + deterministic.run_until_parked(); + + for (guest_client, guest_cx) in &clients { + for guest_project in &guest_client.remote_projects { + guest_project.read_with(guest_cx, |guest_project, cx| { + let host_project = clients.iter().find_map(|(client, cx)| { + let project = client.local_projects.iter().find(|host_project| { + host_project.read_with(cx, |host_project, _| { + host_project.remote_id() == guest_project.remote_id() + }) + })?; + Some((project, cx)) + }); + + if !guest_project.is_read_only() { + if let Some((host_project, host_cx)) = host_project { + let host_worktree_snapshots = + host_project.read_with(host_cx, |host_project, cx| { + host_project + .worktrees(cx) + .map(|worktree| { + let worktree = worktree.read(cx); + (worktree.id(), worktree.snapshot()) + }) + .collect::>() + }); + let guest_worktree_snapshots = guest_project + .worktrees(cx) + .map(|worktree| { + let worktree = worktree.read(cx); + (worktree.id(), worktree.snapshot()) + }) + .collect::>(); + + assert_eq!( + guest_worktree_snapshots.keys().collect::>(), + host_worktree_snapshots.keys().collect::>(), + "{} has different worktrees than the host", + guest_client.username + ); + + for (id, host_snapshot) in &host_worktree_snapshots { + let guest_snapshot = &guest_worktree_snapshots[id]; + assert_eq!( + guest_snapshot.root_name(), + host_snapshot.root_name(), + "{} has different root name than the host for worktree {}", + guest_client.username, + id + ); + assert_eq!( + guest_snapshot.abs_path(), + host_snapshot.abs_path(), + "{} has different abs path than the host for worktree {}", + guest_client.username, + id + ); + assert_eq!( + guest_snapshot.entries(false).collect::>(), + host_snapshot.entries(false).collect::>(), + "{} has different snapshot than the host for worktree {}", + guest_client.username, + id + ); + assert_eq!(guest_snapshot.scan_id(), host_snapshot.scan_id()); + } + } + } + + guest_project.check_invariants(cx); + }); + } + + for (guest_project, guest_buffers) in &guest_client.buffers { + let project_id = if guest_project.read_with(guest_cx, |project, _| { + project.is_local() || project.is_read_only() + }) { + continue; + } else { + guest_project + .read_with(guest_cx, |project, _| project.remote_id()) + .unwrap() + }; + + let host_project = clients.iter().find_map(|(client, cx)| { + let project = client.local_projects.iter().find(|host_project| { + host_project.read_with(cx, |host_project, _| { + host_project.remote_id() == Some(project_id) + }) + })?; + Some((project, cx)) + }); + + let (host_project, host_cx) = if let Some((host_project, host_cx)) = host_project { + (host_project, host_cx) + } else { + continue; + }; + + for guest_buffer in guest_buffers { + let buffer_id = guest_buffer.read_with(guest_cx, |buffer, _| buffer.remote_id()); + let host_buffer = host_project.read_with(host_cx, |project, cx| { + project.buffer_for_id(buffer_id, cx).unwrap_or_else(|| { + panic!( + "host does not have buffer for guest:{}, peer:{:?}, id:{}", + guest_client.username, + guest_client.peer_id(), + buffer_id + ) + }) + }); + let path = host_buffer + .read_with(host_cx, |buffer, cx| buffer.file().unwrap().full_path(cx)); + + assert_eq!( + guest_buffer.read_with(guest_cx, |buffer, _| buffer.deferred_ops_len()), + 0, + "{}, buffer {}, path {:?} has deferred operations", + guest_client.username, + buffer_id, + path, + ); + assert_eq!( + guest_buffer.read_with(guest_cx, |buffer, _| buffer.text()), + host_buffer.read_with(host_cx, |buffer, _| buffer.text()), + "{}, buffer {}, path {:?}, differs from the host's buffer", + guest_client.username, + buffer_id, + path + ); + } + } + } + + for (client, mut cx) in clients { + cx.update(|cx| { + cx.clear_globals(); + drop(client); + }); + } +} + +async fn simulate_client( + mut client: TestClient, + mut op_start_signal: futures::channel::mpsc::UnboundedReceiver<()>, + rng: Arc>, + mut cx: TestAppContext, +) -> (TestClient, TestAppContext) { + // Setup language server + let mut language = Language::new( + LanguageConfig { + name: "Rust".into(), + path_suffixes: vec!["rs".to_string()], + ..Default::default() + }, + None, + ); + let _fake_language_servers = language + .set_fake_lsp_adapter(Arc::new(FakeLspAdapter { + name: "the-fake-language-server", + capabilities: lsp::LanguageServer::full_capabilities(), + initializer: Some(Box::new({ + let rng = rng.clone(); + let fs = client.fs.clone(); + move |fake_server: &mut FakeLanguageServer| { + fake_server.handle_request::( + |_, _| async move { + Ok(Some(lsp::CompletionResponse::Array(vec![ + lsp::CompletionItem { + text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit { + range: lsp::Range::new( + lsp::Position::new(0, 0), + lsp::Position::new(0, 0), + ), + new_text: "the-new-text".to_string(), + })), + ..Default::default() + }, + ]))) + }, + ); + + fake_server.handle_request::( + |_, _| async move { + Ok(Some(vec![lsp::CodeActionOrCommand::CodeAction( + lsp::CodeAction { + title: "the-code-action".to_string(), + ..Default::default() + }, + )])) + }, + ); + + fake_server.handle_request::( + |params, _| async move { + Ok(Some(lsp::PrepareRenameResponse::Range(lsp::Range::new( + params.position, + params.position, + )))) + }, + ); + + fake_server.handle_request::({ + let fs = fs.clone(); + let rng = rng.clone(); + move |_, _| { + let fs = fs.clone(); + let rng = rng.clone(); + async move { + let files = fs.files().await; + let mut rng = rng.lock(); + let count = rng.gen_range::(1..3); + let files = (0..count) + .map(|_| files.choose(&mut *rng).unwrap()) + .collect::>(); + log::info!("LSP: Returning definitions in files {:?}", &files); + Ok(Some(lsp::GotoDefinitionResponse::Array( + files + .into_iter() + .map(|file| lsp::Location { + uri: lsp::Url::from_file_path(file).unwrap(), + range: Default::default(), + }) + .collect(), + ))) + } + } + }); + + fake_server.handle_request::({ + let rng = rng.clone(); + move |_, _| { + let mut highlights = Vec::new(); + let highlight_count = rng.lock().gen_range(1..=5); + for _ in 0..highlight_count { + let start_row = rng.lock().gen_range(0..100); + let start_column = rng.lock().gen_range(0..100); + let start = PointUtf16::new(start_row, start_column); + let end_row = rng.lock().gen_range(0..100); + let end_column = rng.lock().gen_range(0..100); + let end = PointUtf16::new(end_row, end_column); + let range = if start > end { end..start } else { start..end }; + highlights.push(lsp::DocumentHighlight { + range: range_to_lsp(range.clone()), + kind: Some(lsp::DocumentHighlightKind::READ), + }); + } + highlights.sort_unstable_by_key(|highlight| { + (highlight.range.start, highlight.range.end) + }); + async move { Ok(Some(highlights)) } + } + }); + } + })), + ..Default::default() + })) + .await; + client.language_registry.add(Arc::new(language)); + + while op_start_signal.next().await.is_some() { + if let Err(error) = randomly_mutate_client(&mut client, rng.clone(), &mut cx).await { + log::error!("{} error: {:?}", client.username, error); + } + + cx.background().simulate_random_delay().await; + } + log::info!("{}: done", client.username); + + (client, cx) +} + +async fn randomly_mutate_client( + client: &mut TestClient, + rng: Arc>, + cx: &mut TestAppContext, +) -> anyhow::Result<()> { + let active_call = cx.read(ActiveCall::global); + if active_call.read_with(cx, |call, _| call.incoming().borrow().is_some()) { + if rng.lock().gen() { + log::info!("{}: accepting incoming call", client.username); + active_call + .update(cx, |call, cx| call.accept_incoming(cx)) + .await?; + } else { + log::info!("{}: declining incoming call", client.username); + active_call.update(cx, |call, _| call.decline_incoming())?; + } + } else { + let available_contacts = client.user_store.read_with(cx, |user_store, _| { + user_store + .contacts() + .iter() + .filter(|contact| contact.online && !contact.busy) + .cloned() + .collect::>() + }); + + let distribution = rng.lock().gen_range(0..100); + match distribution { + 0..=29 if !available_contacts.is_empty() => { + let contact = available_contacts.choose(&mut *rng.lock()).unwrap(); + log::info!( + "{}: inviting {}", + client.username, + contact.user.github_login + ); + active_call + .update(cx, |call, cx| call.invite(contact.user.id, None, cx)) + .await?; + } + 30..=39 if active_call.read_with(cx, |call, _| call.room().is_some()) => { + log::info!("{}: hanging up", client.username); + active_call.update(cx, |call, cx| call.hang_up(cx))?; + } + _ => {} + } + } + + let remote_projects = + if let Some(room) = active_call.read_with(cx, |call, _| call.room().cloned()) { + room.read_with(cx, |room, _| { + room.remote_participants() + .values() + .flat_map(|participant| participant.projects.clone()) + .collect::>() + }) + } else { + Default::default() + }; + + let project = if remote_projects.is_empty() || rng.lock().gen() { + if client.local_projects.is_empty() || rng.lock().gen() { + let dir_paths = client.fs.directories().await; + let local_project = if dir_paths.is_empty() || rng.lock().gen() { + let root_path = client.create_new_root_dir(); + client.fs.create_dir(&root_path).await.unwrap(); + client + .fs + .create_file(&root_path.join("main.rs"), Default::default()) + .await + .unwrap(); + log::info!( + "{}: opening local project at {:?}", + client.username, + root_path + ); + client.build_local_project(root_path, cx).await.0 + } else { + let root_path = dir_paths.choose(&mut *rng.lock()).unwrap(); + log::info!( + "{}: opening local project at {:?}", + client.username, + root_path + ); + client.build_local_project(root_path, cx).await.0 + }; + client.local_projects.push(local_project.clone()); + local_project + } else { + client + .local_projects + .choose(&mut *rng.lock()) + .unwrap() + .clone() + } + } else { + if client.remote_projects.is_empty() || rng.lock().gen() { + let remote_project_id = remote_projects.choose(&mut *rng.lock()).unwrap().id; + let remote_project = if let Some(project) = + client.remote_projects.iter().find(|project| { + project.read_with(cx, |project, _| { + project.remote_id() == Some(remote_project_id) + }) + }) { + project.clone() + } else { + log::info!( + "{}: opening remote project {}", + client.username, + remote_project_id + ); + let remote_project = Project::remote( + remote_project_id, + client.client.clone(), + client.user_store.clone(), + client.language_registry.clone(), + FakeFs::new(cx.background()), + cx.to_async(), + ) + .await?; + client.remote_projects.push(remote_project.clone()); + remote_project + }; + + remote_project + } else { + client + .remote_projects + .choose(&mut *rng.lock()) + .unwrap() + .clone() + } + }; + + if active_call.read_with(cx, |call, _| call.room().is_some()) { + if let Err(error) = active_call + .update(cx, |call, cx| call.share_project(project.clone(), cx)) + .await + { + log::error!("{}: error sharing project, {:?}", client.username, error); + } + } + + let buffers = client.buffers.entry(project.clone()).or_default(); + let buffer = if buffers.is_empty() || rng.lock().gen() { + let worktree = if let Some(worktree) = project.read_with(cx, |project, cx| { + project + .worktrees(cx) + .filter(|worktree| { + let worktree = worktree.read(cx); + worktree.is_visible() && worktree.entries(false).any(|e| e.is_file()) + }) + .choose(&mut *rng.lock()) + }) { + worktree + } else { + cx.background().simulate_random_delay().await; + return Ok(()); + }; + + let (worktree_root_name, project_path) = worktree.read_with(cx, |worktree, _| { + let entry = worktree + .entries(false) + .filter(|e| e.is_file()) + .choose(&mut *rng.lock()) + .unwrap(); + ( + worktree.root_name().to_string(), + (worktree.id(), entry.path.clone()), + ) + }); + log::info!( + "{}: opening path {:?} in worktree {} ({})", + client.username, + project_path.1, + project_path.0, + worktree_root_name, + ); + let buffer = project + .update(cx, |project, cx| { + project.open_buffer(project_path.clone(), cx) + }) + .await?; + log::info!( + "{}: opened path {:?} in worktree {} ({}) with buffer id {}", + client.username, + project_path.1, + project_path.0, + worktree_root_name, + buffer.read_with(cx, |buffer, _| buffer.remote_id()) + ); + buffers.insert(buffer.clone()); + buffer + } else { + buffers.iter().choose(&mut *rng.lock()).unwrap().clone() + }; + + let choice = rng.lock().gen_range(0..100); + match choice { + 0..=9 => { + cx.update(|cx| { + log::info!( + "{}: dropping buffer {:?}", + client.username, + buffer.read(cx).file().unwrap().full_path(cx) + ); + buffers.remove(&buffer); + drop(buffer); + }); + } + 10..=19 => { + let completions = project.update(cx, |project, cx| { + log::info!( + "{}: requesting completions for buffer {} ({:?})", + client.username, + buffer.read(cx).remote_id(), + buffer.read(cx).file().unwrap().full_path(cx) + ); + let offset = rng.lock().gen_range(0..=buffer.read(cx).len()); + project.completions(&buffer, offset, cx) + }); + let completions = cx.background().spawn(async move { + completions + .await + .map_err(|err| anyhow!("completions request failed: {:?}", err)) + }); + if rng.lock().gen_bool(0.3) { + log::info!("{}: detaching completions request", client.username); + cx.update(|cx| completions.detach_and_log_err(cx)); + } else { + completions.await?; + } + } + 20..=29 => { + let code_actions = project.update(cx, |project, cx| { + log::info!( + "{}: requesting code actions for buffer {} ({:?})", + client.username, + buffer.read(cx).remote_id(), + buffer.read(cx).file().unwrap().full_path(cx) + ); + let range = buffer.read(cx).random_byte_range(0, &mut *rng.lock()); + project.code_actions(&buffer, range, cx) + }); + let code_actions = cx.background().spawn(async move { + code_actions + .await + .map_err(|err| anyhow!("code actions request failed: {:?}", err)) + }); + if rng.lock().gen_bool(0.3) { + log::info!("{}: detaching code actions request", client.username); + cx.update(|cx| code_actions.detach_and_log_err(cx)); + } else { + code_actions.await?; + } + } + 30..=39 if buffer.read_with(cx, |buffer, _| buffer.is_dirty()) => { + let (requested_version, save) = buffer.update(cx, |buffer, cx| { + log::info!( + "{}: saving buffer {} ({:?})", + client.username, + buffer.remote_id(), + buffer.file().unwrap().full_path(cx) + ); + (buffer.version(), buffer.save(cx)) + }); + let save = cx.background().spawn(async move { + let (saved_version, _, _) = save + .await + .map_err(|err| anyhow!("save request failed: {:?}", err))?; + assert!(saved_version.observed_all(&requested_version)); + Ok::<_, anyhow::Error>(()) + }); + if rng.lock().gen_bool(0.3) { + log::info!("{}: detaching save request", client.username); + cx.update(|cx| save.detach_and_log_err(cx)); + } else { + save.await?; + } + } + 40..=44 => { + let prepare_rename = project.update(cx, |project, cx| { + log::info!( + "{}: preparing rename for buffer {} ({:?})", + client.username, + buffer.read(cx).remote_id(), + buffer.read(cx).file().unwrap().full_path(cx) + ); + let offset = rng.lock().gen_range(0..=buffer.read(cx).len()); + project.prepare_rename(buffer, offset, cx) + }); + let prepare_rename = cx.background().spawn(async move { + prepare_rename + .await + .map_err(|err| anyhow!("prepare rename request failed: {:?}", err)) + }); + if rng.lock().gen_bool(0.3) { + log::info!("{}: detaching prepare rename request", client.username); + cx.update(|cx| prepare_rename.detach_and_log_err(cx)); + } else { + prepare_rename.await?; + } + } + 45..=49 => { + let definitions = project.update(cx, |project, cx| { + log::info!( + "{}: requesting definitions for buffer {} ({:?})", + client.username, + buffer.read(cx).remote_id(), + buffer.read(cx).file().unwrap().full_path(cx) + ); + let offset = rng.lock().gen_range(0..=buffer.read(cx).len()); + project.definition(&buffer, offset, cx) + }); + let definitions = cx.background().spawn(async move { + definitions + .await + .map_err(|err| anyhow!("definitions request failed: {:?}", err)) + }); + if rng.lock().gen_bool(0.3) { + log::info!("{}: detaching definitions request", client.username); + cx.update(|cx| definitions.detach_and_log_err(cx)); + } else { + buffers.extend(definitions.await?.into_iter().map(|loc| loc.target.buffer)); + } + } + 50..=54 => { + let highlights = project.update(cx, |project, cx| { + log::info!( + "{}: requesting highlights for buffer {} ({:?})", + client.username, + buffer.read(cx).remote_id(), + buffer.read(cx).file().unwrap().full_path(cx) + ); + let offset = rng.lock().gen_range(0..=buffer.read(cx).len()); + project.document_highlights(&buffer, offset, cx) + }); + let highlights = cx.background().spawn(async move { + highlights + .await + .map_err(|err| anyhow!("highlights request failed: {:?}", err)) + }); + if rng.lock().gen_bool(0.3) { + log::info!("{}: detaching highlights request", client.username); + cx.update(|cx| highlights.detach_and_log_err(cx)); + } else { + highlights.await?; + } + } + 55..=59 => { + let search = project.update(cx, |project, cx| { + let query = rng.lock().gen_range('a'..='z'); + log::info!("{}: project-wide search {:?}", client.username, query); + project.search(SearchQuery::text(query, false, false), cx) + }); + let search = cx.background().spawn(async move { + search + .await + .map_err(|err| anyhow!("search request failed: {:?}", err)) + }); + if rng.lock().gen_bool(0.3) { + log::info!("{}: detaching search request", client.username); + cx.update(|cx| search.detach_and_log_err(cx)); + } else { + buffers.extend(search.await?.into_keys()); + } + } + 60..=79 => { + let worktree = project + .read_with(cx, |project, cx| { + project + .worktrees(cx) + .filter(|worktree| { + let worktree = worktree.read(cx); + worktree.is_visible() + && worktree.entries(false).any(|e| e.is_file()) + && worktree.root_entry().map_or(false, |e| e.is_dir()) + }) + .choose(&mut *rng.lock()) + }) + .unwrap(); + let (worktree_id, worktree_root_name) = worktree.read_with(cx, |worktree, _| { + (worktree.id(), worktree.root_name().to_string()) + }); + + let mut new_name = String::new(); + for _ in 0..10 { + let letter = rng.lock().gen_range('a'..='z'); + new_name.push(letter); + } + + let is_dir = rng.lock().gen::(); + let mut new_path = PathBuf::new(); + new_path.push(new_name); + if !is_dir { + new_path.set_extension("rs"); + } + log::info!( + "{}: creating {:?} in worktree {} ({})", + client.username, + new_path, + worktree_id, + worktree_root_name, + ); + project + .update(cx, |project, cx| { + project.create_entry((worktree_id, new_path), is_dir, cx) + }) + .unwrap() + .await?; + } + _ => { + buffer.update(cx, |buffer, cx| { + log::info!( + "{}: updating buffer {} ({:?})", + client.username, + buffer.remote_id(), + buffer.file().unwrap().full_path(cx) + ); + if rng.lock().gen_bool(0.7) { + buffer.randomly_edit(&mut *rng.lock(), 5, cx); + } else { + buffer.randomly_undo_redo(&mut *rng.lock(), cx); + } + }); + } + } + + Ok(()) +}