diff --git a/crates/client/src/client.rs b/crates/client/src/client.rs index c4a3167f83..b03c183881 100644 --- a/crates/client/src/client.rs +++ b/crates/client/src/client.rs @@ -835,11 +835,14 @@ impl Client { continue; }; - if let Some(handler) = state.message_handlers.get(&payload_type_id).cloned() - { - drop(state); // Avoid deadlocks if the handler interacts with rpc::Client - let future = handler(model, message, &this, cx.clone()); + let handler = state.message_handlers.get(&payload_type_id).cloned(); + // Dropping the state prevents deadlocks if the handler interacts with rpc::Client. + // It also ensures we don't hold the lock while yielding back to the executor, as + // that might cause the executor thread driving this future to block indefinitely. + drop(state); + if let Some(handler) = handler { + let future = handler(model, message, &this, cx.clone()); let client_id = this.id; log::debug!( "rpc message received. client_id:{}, message_id:{}, sender_id:{:?}, type:{}", diff --git a/crates/collab/src/integration_tests.rs b/crates/collab/src/integration_tests.rs index 166de2b4ac..2bf2701f23 100644 --- a/crates/collab/src/integration_tests.rs +++ b/crates/collab/src/integration_tests.rs @@ -28,7 +28,7 @@ use gpui::{ }; use language::{ range_to_lsp, tree_sitter_rust, Diagnostic, DiagnosticEntry, FakeLspAdapter, Language, - LanguageConfig, LanguageRegistry, OffsetRangeExt, Point, Rope, + LanguageConfig, LanguageRegistry, OffsetRangeExt, Point, PointUtf16, Rope, }; use live_kit_client::MacOSDisplay; use lsp::{self, FakeLanguageServer}; @@ -54,6 +54,7 @@ use std::{ }; use theme::ThemeRegistry; use unindent::Unindent as _; +use util::post_inc; use workspace::{shared_screen::SharedScreen, Item, SplitDirection, ToggleFollow, Workspace}; #[ctor::ctor] @@ -5778,57 +5779,28 @@ async fn test_random_collaboration( rng: StdRng, ) { deterministic.forbid_parking(); + let rng = Arc::new(Mutex::new(rng)); + let max_peers = env::var("MAX_PEERS") .map(|i| i.parse().expect("invalid `MAX_PEERS` variable")) .unwrap_or(5); - assert!(max_peers <= 5); let max_operations = env::var("OPERATIONS") .map(|i| i.parse().expect("invalid `OPERATIONS` variable")) .unwrap_or(10); - let rng = Arc::new(Mutex::new(rng)); - - let guest_lang_registry = Arc::new(LanguageRegistry::test()); - let host_language_registry = Arc::new(LanguageRegistry::test()); - - let fs = FakeFs::new(cx.background()); - fs.insert_tree("/_collab", json!({"init": ""})).await; - let mut server = TestServer::start(cx.foreground(), cx.background()).await; let db = server.app_state.db.clone(); - let room_creator_user_id = db - .create_user( - "room-creator@example.com", - false, - NewUserParams { - github_login: "room-creator".into(), - github_user_id: 0, - invite_count: 0, - }, - ) - .await - .unwrap() - .user_id; - let mut available_guests = vec![ - "guest-1".to_string(), - "guest-2".to_string(), - "guest-3".to_string(), - "guest-4".to_string(), - ]; - - for (ix, username) in Some(&"host".to_string()) - .into_iter() - .chain(&available_guests) - .enumerate() - { + let mut available_guests = Vec::new(); + for ix in 0..max_peers { + let username = format!("guest-{}", ix + 1); let user_id = db .create_user( &format!("{username}@example.com"), false, NewUserParams { - github_login: username.into(), + github_login: username.clone(), github_user_id: (ix + 1) as i32, invite_count: 0, }, @@ -5836,270 +5808,39 @@ async fn test_random_collaboration( .await .unwrap() .user_id; - server - .app_state - .db - .send_contact_request(user_id, room_creator_user_id) - .await - .unwrap(); - server - .app_state - .db - .respond_to_contact_request(room_creator_user_id, user_id, true) - .await - .unwrap(); + available_guests.push((user_id, username)); } - let _room_creator = server.create_client(cx, "room-creator").await; - let active_call = cx.read(ActiveCall::global); + for (ix, (user_id_a, _)) in available_guests.iter().enumerate() { + for (user_id_b, _) in &available_guests[ix + 1..] { + server + .app_state + .db + .send_contact_request(*user_id_a, *user_id_b) + .await + .unwrap(); + server + .app_state + .db + .respond_to_contact_request(*user_id_b, *user_id_a, true) + .await + .unwrap(); + } + } let mut clients = Vec::new(); let mut user_ids = Vec::new(); let mut peer_ids = Vec::new(); let mut op_start_signals = Vec::new(); - let mut next_entity_id = 100000; - let mut host_cx = TestAppContext::new( - cx.foreground_platform(), - cx.platform(), - deterministic.build_foreground(next_entity_id), - deterministic.build_background(), - cx.font_cache(), - cx.leak_detector(), - next_entity_id, - cx.function_name.clone(), - ); - let host = server.create_client(&mut host_cx, "host").await; - let host_project = host_cx.update(|cx| { - Project::local( - host.client.clone(), - host.user_store.clone(), - host.project_store.clone(), - host_language_registry.clone(), - fs.clone(), - cx, - ) - }); - - let (collab_worktree, _) = host_project - .update(&mut host_cx, |project, cx| { - project.find_or_create_local_worktree("/_collab", true, cx) - }) - .await - .unwrap(); - collab_worktree - .read_with(&host_cx, |tree, _| tree.as_local().unwrap().scan_complete()) - .await; - - // Set up fake language servers. - let mut language = Language::new( - LanguageConfig { - name: "Rust".into(), - path_suffixes: vec!["rs".to_string()], - ..Default::default() - }, - None, - ); - let _fake_servers = language - .set_fake_lsp_adapter(Arc::new(FakeLspAdapter { - name: "the-fake-language-server", - capabilities: lsp::LanguageServer::full_capabilities(), - initializer: Some(Box::new({ - let rng = rng.clone(); - let fs = fs.clone(); - let project = host_project.downgrade(); - move |fake_server: &mut FakeLanguageServer| { - fake_server.handle_request::( - |_, _| async move { - Ok(Some(lsp::CompletionResponse::Array(vec![ - lsp::CompletionItem { - text_edit: Some(lsp::CompletionTextEdit::Edit(lsp::TextEdit { - range: lsp::Range::new( - lsp::Position::new(0, 0), - lsp::Position::new(0, 0), - ), - new_text: "the-new-text".to_string(), - })), - ..Default::default() - }, - ]))) - }, - ); - - fake_server.handle_request::( - |_, _| async move { - Ok(Some(vec![lsp::CodeActionOrCommand::CodeAction( - lsp::CodeAction { - title: "the-code-action".to_string(), - ..Default::default() - }, - )])) - }, - ); - - fake_server.handle_request::( - |params, _| async move { - Ok(Some(lsp::PrepareRenameResponse::Range(lsp::Range::new( - params.position, - params.position, - )))) - }, - ); - - fake_server.handle_request::({ - let fs = fs.clone(); - let rng = rng.clone(); - move |_, _| { - let fs = fs.clone(); - let rng = rng.clone(); - async move { - let files = fs.files().await; - let mut rng = rng.lock(); - let count = rng.gen_range::(1..3); - let files = (0..count) - .map(|_| files.choose(&mut *rng).unwrap()) - .collect::>(); - log::info!("LSP: Returning definitions in files {:?}", &files); - Ok(Some(lsp::GotoDefinitionResponse::Array( - files - .into_iter() - .map(|file| lsp::Location { - uri: lsp::Url::from_file_path(file).unwrap(), - range: Default::default(), - }) - .collect(), - ))) - } - } - }); - - fake_server.handle_request::({ - let rng = rng.clone(); - let project = project; - move |params, mut cx| { - let highlights = if let Some(project) = project.upgrade(&cx) { - project.update(&mut cx, |project, cx| { - let path = params - .text_document_position_params - .text_document - .uri - .to_file_path() - .unwrap(); - let (worktree, relative_path) = - project.find_local_worktree(&path, cx)?; - let project_path = - ProjectPath::from((worktree.read(cx).id(), relative_path)); - let buffer = - project.get_open_buffer(&project_path, cx)?.read(cx); - - let mut highlights = Vec::new(); - let highlight_count = rng.lock().gen_range(1..=5); - let mut prev_end = 0; - for _ in 0..highlight_count { - let range = - buffer.random_byte_range(prev_end, &mut *rng.lock()); - - highlights.push(lsp::DocumentHighlight { - range: range_to_lsp(range.to_point_utf16(buffer)), - kind: Some(lsp::DocumentHighlightKind::READ), - }); - prev_end = range.end; - } - Some(highlights) - }) - } else { - None - }; - async move { Ok(highlights) } - } - }); - } - })), - ..Default::default() - })) - .await; - host_language_registry.add(Arc::new(language)); - - let host_user_id = host.current_user_id(&host_cx); - active_call - .update(cx, |call, cx| { - call.invite(host_user_id.to_proto(), None, cx) - }) - .await - .unwrap(); - active_call.read_with(cx, |call, cx| call.room().unwrap().read(cx).id()); - deterministic.run_until_parked(); - let host_active_call = host_cx.read(ActiveCall::global); - host_active_call - .update(&mut host_cx, |call, cx| call.accept_incoming(cx)) - .await - .unwrap(); - - let host_project_id = host_active_call - .update(&mut host_cx, |call, cx| { - call.share_project(host_project.clone(), cx) - }) - .await - .unwrap(); - - let op_start_signal = futures::channel::mpsc::unbounded(); - user_ids.push(host_user_id); - peer_ids.push(host.peer_id().unwrap()); - op_start_signals.push(op_start_signal.0); - clients.push(host_cx.foreground().spawn(host.simulate_host( - host_project, - op_start_signal.1, - rng.clone(), - host_cx, - ))); - - let disconnect_host_at = if rng.lock().gen_bool(0.2) { - rng.lock().gen_range(0..max_operations) - } else { - max_operations - }; let mut operations = 0; while operations < max_operations { - if operations == disconnect_host_at { - server.disconnect_client(peer_ids[0]); - deterministic.advance_clock(RECEIVE_TIMEOUT); - drop(op_start_signals); - - deterministic.start_waiting(); - let mut clients = futures::future::join_all(clients).await; - deterministic.finish_waiting(); - deterministic.run_until_parked(); - - let (host, host_project, mut host_cx, host_err) = clients.remove(0); - if let Some(host_err) = host_err { - log::error!("host error - {:?}", host_err); - } - host_project.read_with(&host_cx, |project, _| assert!(!project.is_shared())); - for (guest, guest_project, mut guest_cx, guest_err) in clients { - if let Some(guest_err) = guest_err { - log::error!("{} error - {:?}", guest.username, guest_err); - } - - guest_project.read_with(&guest_cx, |project, _| assert!(project.is_read_only())); - guest_cx.update(|cx| { - cx.clear_globals(); - drop((guest, guest_project)); - }); - } - host_cx.update(|cx| { - cx.clear_globals(); - drop((host, host_project)); - }); - - return; - } - let distribution = rng.lock().gen_range(0..100); match distribution { 0..=19 if !available_guests.is_empty() => { let guest_ix = rng.lock().gen_range(0..available_guests.len()); - let guest_username = available_guests.remove(guest_ix); + let (_, guest_username) = available_guests.remove(guest_ix); log::info!("Adding new connection for {}", guest_username); next_entity_id += 100000; let mut guest_cx = TestAppContext::new( @@ -6113,43 +5854,13 @@ async fn test_random_collaboration( cx.function_name.clone(), ); - deterministic.start_waiting(); - let guest = server.create_client(&mut guest_cx, &guest_username).await; - let guest_user_id = guest.current_user_id(&guest_cx); - - active_call - .update(cx, |call, cx| { - call.invite(guest_user_id.to_proto(), None, cx) - }) - .await - .unwrap(); - deterministic.run_until_parked(); - guest_cx - .read(ActiveCall::global) - .update(&mut guest_cx, |call, cx| call.accept_incoming(cx)) - .await - .unwrap(); - - let guest_project = Project::remote( - host_project_id, - guest.client.clone(), - guest.user_store.clone(), - guest.project_store.clone(), - guest_lang_registry.clone(), - FakeFs::new(cx.background()), - guest_cx.to_async(), - ) - .await - .unwrap(); - deterministic.finish_waiting(); - let op_start_signal = futures::channel::mpsc::unbounded(); - user_ids.push(guest_user_id); + let guest = server.create_client(&mut guest_cx, &guest_username).await; + user_ids.push(guest.current_user_id(&guest_cx)); peer_ids.push(guest.peer_id().unwrap()); op_start_signals.push(op_start_signal.0); - clients.push(guest_cx.foreground().spawn(guest.simulate_guest( + clients.push(guest_cx.foreground().spawn(guest.simulate( guest_username.clone(), - guest_project, op_start_signal.1, rng.clone(), guest_cx, @@ -6170,14 +5881,13 @@ async fn test_random_collaboration( deterministic.advance_clock(RECEIVE_TIMEOUT); deterministic.start_waiting(); log::info!("Waiting for guest {} to exit...", removed_guest_id); - let (guest, guest_project, mut guest_cx, guest_err) = guest.await; + let (guest, mut guest_cx) = guest.await; deterministic.finish_waiting(); server.allow_connections(); - if let Some(guest_err) = guest_err { - log::error!("{} error - {:?}", guest.username, guest_err); + for project in &guest.remote_projects { + project.read_with(&guest_cx, |project, _| assert!(project.is_read_only())); } - guest_project.read_with(&guest_cx, |project, _| assert!(project.is_read_only())); for user_id in &user_ids { let contacts = server.app_state.db.get_contacts(*user_id).await.unwrap(); let contacts = server @@ -6197,15 +5907,15 @@ async fn test_random_collaboration( } log::info!("{} removed", guest.username); - available_guests.push(guest.username.clone()); + available_guests.push((removed_guest_id, guest.username.clone())); guest_cx.update(|cx| { cx.clear_globals(); - drop((guest, guest_project)); + drop(guest); }); operations += 1; } - _ => { + _ if !op_start_signals.is_empty() => { while operations < max_operations && rng.lock().gen_bool(0.7) { op_start_signals .choose(&mut *rng.lock()) @@ -6219,115 +5929,147 @@ async fn test_random_collaboration( deterministic.run_until_parked(); } } + _ => {} } } drop(op_start_signals); deterministic.start_waiting(); - let mut clients = futures::future::join_all(clients).await; + let clients = futures::future::join_all(clients).await; deterministic.finish_waiting(); deterministic.run_until_parked(); - let (host_client, host_project, mut host_cx, host_err) = clients.remove(0); - if let Some(host_err) = host_err { - panic!("host error - {:?}", host_err); - } - let host_worktree_snapshots = host_project.read_with(&host_cx, |project, cx| { - project - .worktrees(cx) - .map(|worktree| { - let snapshot = worktree.read(cx).snapshot(); - (snapshot.id(), snapshot) - }) - .collect::>() - }); + for (guest_client, guest_cx) in &clients { + for guest_project in &guest_client.remote_projects { + guest_project.read_with(guest_cx, |guest_project, cx| { + let host_project = clients.iter().find_map(|(client, cx)| { + let project = client.local_projects.iter().find(|host_project| { + host_project.read_with(cx, |host_project, _| { + host_project.remote_id() == guest_project.remote_id() + }) + })?; + Some((project, cx)) + }); - host_project.read_with(&host_cx, |project, cx| project.check_invariants(cx)); + if !guest_project.is_read_only() { + if let Some((host_project, host_cx)) = host_project { + let host_worktree_snapshots = + host_project.read_with(host_cx, |host_project, cx| { + host_project + .worktrees(cx) + .map(|worktree| { + let worktree = worktree.read(cx); + (worktree.id(), worktree.snapshot()) + }) + .collect::>() + }); + let guest_worktree_snapshots = guest_project + .worktrees(cx) + .map(|worktree| { + let worktree = worktree.read(cx); + (worktree.id(), worktree.snapshot()) + }) + .collect::>(); - for (guest_client, guest_project, mut guest_cx, guest_err) in clients.into_iter() { - if let Some(guest_err) = guest_err { - panic!("{} error - {:?}", guest_client.username, guest_err); - } - let worktree_snapshots = guest_project.read_with(&guest_cx, |project, cx| { - project - .worktrees(cx) - .map(|worktree| { - let worktree = worktree.read(cx); - (worktree.id(), worktree.snapshot()) - }) - .collect::>() - }); + assert_eq!( + guest_worktree_snapshots.keys().collect::>(), + host_worktree_snapshots.keys().collect::>(), + "{} has different worktrees than the host", + guest_client.username + ); - assert_eq!( - worktree_snapshots.keys().collect::>(), - host_worktree_snapshots.keys().collect::>(), - "{} has different worktrees than the host", - guest_client.username - ); - for (id, host_snapshot) in &host_worktree_snapshots { - let guest_snapshot = &worktree_snapshots[id]; - assert_eq!( - guest_snapshot.root_name(), - host_snapshot.root_name(), - "{} has different root name than the host for worktree {}", - guest_client.username, - id - ); - assert_eq!( - guest_snapshot.entries(false).collect::>(), - host_snapshot.entries(false).collect::>(), - "{} has different snapshot than the host for worktree {}", - guest_client.username, - id - ); - assert_eq!(guest_snapshot.scan_id(), host_snapshot.scan_id()); - } + for (id, host_snapshot) in &host_worktree_snapshots { + let guest_snapshot = &guest_worktree_snapshots[id]; + assert_eq!( + guest_snapshot.root_name(), + host_snapshot.root_name(), + "{} has different root name than the host for worktree {}", + guest_client.username, + id + ); + assert_eq!( + guest_snapshot.entries(false).collect::>(), + host_snapshot.entries(false).collect::>(), + "{} has different snapshot than the host for worktree {}", + guest_client.username, + id + ); + assert_eq!(guest_snapshot.scan_id(), host_snapshot.scan_id()); + } + } + } - guest_project.read_with(&guest_cx, |project, cx| project.check_invariants(cx)); - - for guest_buffer in &guest_client.buffers { - let buffer_id = guest_buffer.read_with(&guest_cx, |buffer, _| buffer.remote_id()); - let host_buffer = host_project.read_with(&host_cx, |project, cx| { - project.buffer_for_id(buffer_id, cx).unwrap_or_else(|| { - panic!( - "host does not have buffer for guest:{}, peer:{:?}, id:{}", - guest_client.username, - guest_client.peer_id(), - buffer_id - ) - }) + guest_project.check_invariants(cx); }); - let path = - host_buffer.read_with(&host_cx, |buffer, cx| buffer.file().unwrap().full_path(cx)); - - assert_eq!( - guest_buffer.read_with(&guest_cx, |buffer, _| buffer.deferred_ops_len()), - 0, - "{}, buffer {}, path {:?} has deferred operations", - guest_client.username, - buffer_id, - path, - ); - assert_eq!( - guest_buffer.read_with(&guest_cx, |buffer, _| buffer.text()), - host_buffer.read_with(&host_cx, |buffer, _| buffer.text()), - "{}, buffer {}, path {:?}, differs from the host's buffer", - guest_client.username, - buffer_id, - path - ); } - guest_cx.update(|cx| { - cx.clear_globals(); - drop((guest_project, guest_client)); - }); + for (guest_project, guest_buffers) in &guest_client.buffers { + let project_id = if guest_project.read_with(guest_cx, |project, _| { + project.is_local() || project.is_read_only() + }) { + continue; + } else { + guest_project + .read_with(guest_cx, |project, _| project.remote_id()) + .unwrap() + }; + + let host_project = clients.iter().find_map(|(client, cx)| { + let project = client.local_projects.iter().find(|host_project| { + host_project.read_with(cx, |host_project, _| { + host_project.remote_id() == Some(project_id) + }) + })?; + Some((project, cx)) + }); + + let (host_project, host_cx) = if let Some((host_project, host_cx)) = host_project { + (host_project, host_cx) + } else { + continue; + }; + + for guest_buffer in guest_buffers { + let buffer_id = guest_buffer.read_with(guest_cx, |buffer, _| buffer.remote_id()); + let host_buffer = host_project.read_with(host_cx, |project, cx| { + project.buffer_for_id(buffer_id, cx).unwrap_or_else(|| { + panic!( + "host does not have buffer for guest:{}, peer:{:?}, id:{}", + guest_client.username, + guest_client.peer_id(), + buffer_id + ) + }) + }); + let path = host_buffer + .read_with(host_cx, |buffer, cx| buffer.file().unwrap().full_path(cx)); + + assert_eq!( + guest_buffer.read_with(guest_cx, |buffer, _| buffer.deferred_ops_len()), + 0, + "{}, buffer {}, path {:?} has deferred operations", + guest_client.username, + buffer_id, + path, + ); + assert_eq!( + guest_buffer.read_with(guest_cx, |buffer, _| buffer.text()), + host_buffer.read_with(host_cx, |buffer, _| buffer.text()), + "{}, buffer {}, path {:?}, differs from the host's buffer", + guest_client.username, + buffer_id, + path + ); + } + } } - host_cx.update(|cx| { - cx.clear_globals(); - drop((host_client, host_project)) - }); + for (client, mut cx) in clients { + cx.update(|cx| { + cx.clear_globals(); + drop(client); + }); + } } struct TestServer { @@ -6494,6 +6236,9 @@ impl TestServer { let client = TestClient { client, username: name.to_string(), + local_projects: Default::default(), + remote_projects: Default::default(), + next_root_dir_id: 0, user_store, project_store, fs, @@ -6612,11 +6357,14 @@ impl Drop for TestServer { struct TestClient { client: Arc, username: String, + local_projects: Vec>, + remote_projects: Vec>, + next_root_dir_id: usize, pub user_store: ModelHandle, pub project_store: ModelHandle, language_registry: Arc, fs: Arc, - buffers: HashSet>, + buffers: HashMap, HashSet>>, } impl Deref for TestClient { @@ -6731,470 +6479,558 @@ impl TestClient { }) } - async fn simulate_host( + pub async fn simulate( mut self, - project: ModelHandle, - op_start_signal: futures::channel::mpsc::UnboundedReceiver<()>, + username: String, + mut op_start_signal: futures::channel::mpsc::UnboundedReceiver<()>, rng: Arc>, mut cx: TestAppContext, - ) -> ( - Self, - ModelHandle, - TestAppContext, - Option, - ) { - async fn simulate_host_internal( + ) -> (Self, TestAppContext) { + async fn tick( client: &mut TestClient, - project: ModelHandle, - mut op_start_signal: futures::channel::mpsc::UnboundedReceiver<()>, + username: &str, rng: Arc>, cx: &mut TestAppContext, ) -> anyhow::Result<()> { - let fs = project.read_with(cx, |project, _| project.fs().clone()); - - while op_start_signal.next().await.is_some() { - let distribution = rng.lock().gen_range::(0..100); - let files = fs.as_fake().files().await; - match distribution { - 0..=19 if !files.is_empty() => { - let path = files.choose(&mut *rng.lock()).unwrap(); - let mut path = path.as_path(); - while let Some(parent_path) = path.parent() { - path = parent_path; - if rng.lock().gen() { - break; - } - } - - log::info!("Host: find/create local worktree {:?}", path); - let find_or_create_worktree = project.update(cx, |project, cx| { - project.find_or_create_local_worktree(path, true, cx) - }); - if rng.lock().gen() { - cx.background().spawn(find_or_create_worktree).detach(); - } else { - find_or_create_worktree.await?; - } - } - 20..=79 if !files.is_empty() => { - let buffer = if client.buffers.is_empty() || rng.lock().gen() { - let file = files.choose(&mut *rng.lock()).unwrap(); - let (worktree, path) = project - .update(cx, |project, cx| { - project.find_or_create_local_worktree(file.clone(), true, cx) - }) - .await?; - let project_path = - worktree.read_with(cx, |worktree, _| (worktree.id(), path)); - log::info!( - "Host: opening path {:?}, worktree {}, relative_path {:?}", - file, - project_path.0, - project_path.1 - ); - let buffer = project - .update(cx, |project, cx| project.open_buffer(project_path, cx)) - .await - .unwrap(); - client.buffers.insert(buffer.clone()); - buffer - } else { - client - .buffers - .iter() - .choose(&mut *rng.lock()) - .unwrap() - .clone() - }; - - if rng.lock().gen_bool(0.1) { - cx.update(|cx| { - log::info!( - "Host: dropping buffer {:?}", - buffer.read(cx).file().unwrap().full_path(cx) - ); - client.buffers.remove(&buffer); - drop(buffer); - }); - } else { - buffer.update(cx, |buffer, cx| { - log::info!( - "Host: updating buffer {:?} ({})", - buffer.file().unwrap().full_path(cx), - buffer.remote_id() - ); - - if rng.lock().gen_bool(0.7) { - buffer.randomly_edit(&mut *rng.lock(), 5, cx); - } else { - buffer.randomly_undo_redo(&mut *rng.lock(), cx); - } - }); - } - } - _ => loop { - let path_component_count = rng.lock().gen_range::(1..=5); - let mut path = PathBuf::new(); - path.push("/"); - for _ in 0..path_component_count { - let letter = rng.lock().gen_range(b'a'..=b'z'); - path.push(std::str::from_utf8(&[letter]).unwrap()); - } - path.set_extension("rs"); - let parent_path = path.parent().unwrap(); - - log::info!("Host: creating file {:?}", path,); - - if fs.create_dir(parent_path).await.is_ok() - && fs.create_file(&path, Default::default()).await.is_ok() - { - break; - } else { - log::info!("Host: cannot create file"); - } - }, + let active_call = cx.read(ActiveCall::global); + if active_call.read_with(cx, |call, _| call.incoming().borrow().is_some()) { + if rng.lock().gen() { + log::info!("{}: accepting incoming call", username); + active_call + .update(cx, |call, cx| call.accept_incoming(cx)) + .await?; + } else { + log::info!("{}: declining incoming call", username); + active_call.update(cx, |call, _| call.decline_incoming())?; } + } else { + let available_contacts = client.user_store.read_with(cx, |user_store, _| { + user_store + .contacts() + .iter() + .filter(|contact| contact.online && !contact.busy) + .cloned() + .collect::>() + }); - cx.background().simulate_random_delay().await; + let distribution = rng.lock().gen_range(0..100); + match distribution { + 0..=29 if !available_contacts.is_empty() => { + let contact = available_contacts.choose(&mut *rng.lock()).unwrap(); + log::info!("{}: inviting {}", username, contact.user.github_login); + active_call + .update(cx, |call, cx| call.invite(contact.user.id, None, cx)) + .await?; + } + 30..=39 if active_call.read_with(cx, |call, _| call.room().is_some()) => { + log::info!("{}: hanging up", username); + active_call.update(cx, |call, cx| call.hang_up(cx))?; + } + _ => {} + } } - Ok(()) - } - - let result = - simulate_host_internal(&mut self, project.clone(), op_start_signal, rng, &mut cx).await; - log::info!("Host done"); - (self, project, cx, result.err()) - } - - pub async fn simulate_guest( - mut self, - guest_username: String, - project: ModelHandle, - op_start_signal: futures::channel::mpsc::UnboundedReceiver<()>, - rng: Arc>, - mut cx: TestAppContext, - ) -> ( - Self, - ModelHandle, - TestAppContext, - Option, - ) { - async fn simulate_guest_internal( - client: &mut TestClient, - guest_username: &str, - project: ModelHandle, - mut op_start_signal: futures::channel::mpsc::UnboundedReceiver<()>, - rng: Arc>, - cx: &mut TestAppContext, - ) -> anyhow::Result<()> { - while op_start_signal.next().await.is_some() { - let buffer = if client.buffers.is_empty() || rng.lock().gen() { - let worktree = if let Some(worktree) = project.read_with(cx, |project, cx| { - project - .worktrees(cx) - .filter(|worktree| { - let worktree = worktree.read(cx); - worktree.is_visible() - && worktree.entries(false).any(|e| e.is_file()) - }) - .choose(&mut *rng.lock()) - }) { - worktree + let remote_projects = + if let Some(room) = active_call.read_with(cx, |call, _| call.room().cloned()) { + room.read_with(cx, |room, _| { + room.remote_participants() + .values() + .flat_map(|participant| participant.projects.clone()) + .collect::>() + }) + } else { + Default::default() + }; + let project = if remote_projects.is_empty() || rng.lock().gen() { + if client.local_projects.is_empty() || rng.lock().gen() { + let dir_paths = client.fs.directories().await; + let local_project = if dir_paths.is_empty() || rng.lock().gen() { + let root_path = format!( + "/{}-root-{}", + username, + post_inc(&mut client.next_root_dir_id) + ); + let root_path = Path::new(&root_path); + client.fs.create_dir(root_path).await.unwrap(); + client + .fs + .create_file(&root_path.join("main.rs"), Default::default()) + .await + .unwrap(); + log::info!("{}: opening local project at {:?}", username, root_path); + client.build_local_project(root_path, cx).await.0 } else { - cx.background().simulate_random_delay().await; - continue; + let root_path = dir_paths.choose(&mut *rng.lock()).unwrap(); + log::info!("{}: opening local project at {:?}", username, root_path); + client.build_local_project(root_path, cx).await.0 }; - - let (worktree_root_name, project_path) = - worktree.read_with(cx, |worktree, _| { - let entry = worktree - .entries(false) - .filter(|e| e.is_file()) - .choose(&mut *rng.lock()) - .unwrap(); - ( - worktree.root_name().to_string(), - (worktree.id(), entry.path.clone()), - ) - }); - log::info!( - "{}: opening path {:?} in worktree {} ({})", - guest_username, - project_path.1, - project_path.0, - worktree_root_name, - ); - let buffer = project - .update(cx, |project, cx| { - project.open_buffer(project_path.clone(), cx) - }) - .await?; - log::info!( - "{}: opened path {:?} in worktree {} ({}) with buffer id {}", - guest_username, - project_path.1, - project_path.0, - worktree_root_name, - buffer.read_with(cx, |buffer, _| buffer.remote_id()) - ); - client.buffers.insert(buffer.clone()); - buffer + client.local_projects.push(local_project.clone()); + local_project } else { client - .buffers - .iter() + .local_projects .choose(&mut *rng.lock()) .unwrap() .clone() + } + } else { + if client.remote_projects.is_empty() || rng.lock().gen() { + let remote_project_id = remote_projects.choose(&mut *rng.lock()).unwrap().id; + let remote_project = if let Some(project) = + client.remote_projects.iter().find(|project| { + project.read_with(cx, |project, _| { + project.remote_id() == Some(remote_project_id) + }) + }) { + project.clone() + } else { + log::info!("{}: opening remote project {}", username, remote_project_id); + let remote_project = Project::remote( + remote_project_id, + client.client.clone(), + client.user_store.clone(), + client.project_store.clone(), + client.language_registry.clone(), + FakeFs::new(cx.background()), + cx.to_async(), + ) + .await?; + client.remote_projects.push(remote_project.clone()); + remote_project + }; + + remote_project + } else { + client + .remote_projects + .choose(&mut *rng.lock()) + .unwrap() + .clone() + } + }; + if let Err(error) = active_call + .update(cx, |call, cx| call.share_project(project.clone(), cx)) + .await + { + log::error!("{}: error sharing project, {:?}", username, error); + } + + let buffers = client.buffers.entry(project.clone()).or_default(); + let buffer = if buffers.is_empty() || rng.lock().gen() { + let worktree = if let Some(worktree) = project.read_with(cx, |project, cx| { + project + .worktrees(cx) + .filter(|worktree| { + let worktree = worktree.read(cx); + worktree.is_visible() && worktree.entries(false).any(|e| e.is_file()) + }) + .choose(&mut *rng.lock()) + }) { + worktree + } else { + cx.background().simulate_random_delay().await; + return Ok(()); }; - let choice = rng.lock().gen_range(0..100); - match choice { - 0..=9 => { - cx.update(|cx| { - log::info!( - "{}: dropping buffer {:?}", - guest_username, - buffer.read(cx).file().unwrap().full_path(cx) - ); - client.buffers.remove(&buffer); - drop(buffer); - }); - } - 10..=19 => { - let completions = project.update(cx, |project, cx| { - log::info!( - "{}: requesting completions for buffer {} ({:?})", - guest_username, - buffer.read(cx).remote_id(), - buffer.read(cx).file().unwrap().full_path(cx) - ); - let offset = rng.lock().gen_range(0..=buffer.read(cx).len()); - project.completions(&buffer, offset, cx) - }); - let completions = cx.background().spawn(async move { - completions - .await - .map_err(|err| anyhow!("completions request failed: {:?}", err)) - }); - if rng.lock().gen_bool(0.3) { - log::info!("{}: detaching completions request", guest_username); - cx.update(|cx| completions.detach_and_log_err(cx)); - } else { - completions.await?; - } - } - 20..=29 => { - let code_actions = project.update(cx, |project, cx| { - log::info!( - "{}: requesting code actions for buffer {} ({:?})", - guest_username, - buffer.read(cx).remote_id(), - buffer.read(cx).file().unwrap().full_path(cx) - ); - let range = buffer.read(cx).random_byte_range(0, &mut *rng.lock()); - project.code_actions(&buffer, range, cx) - }); - let code_actions = cx.background().spawn(async move { - code_actions - .await - .map_err(|err| anyhow!("code actions request failed: {:?}", err)) - }); - if rng.lock().gen_bool(0.3) { - log::info!("{}: detaching code actions request", guest_username); - cx.update(|cx| code_actions.detach_and_log_err(cx)); - } else { - code_actions.await?; - } - } - 30..=39 if buffer.read_with(cx, |buffer, _| buffer.is_dirty()) => { - let (requested_version, save) = buffer.update(cx, |buffer, cx| { - log::info!( - "{}: saving buffer {} ({:?})", - guest_username, - buffer.remote_id(), - buffer.file().unwrap().full_path(cx) - ); - (buffer.version(), buffer.save(cx)) - }); - let save = cx.background().spawn(async move { - let (saved_version, _, _) = save - .await - .map_err(|err| anyhow!("save request failed: {:?}", err))?; - assert!(saved_version.observed_all(&requested_version)); - Ok::<_, anyhow::Error>(()) - }); - if rng.lock().gen_bool(0.3) { - log::info!("{}: detaching save request", guest_username); - cx.update(|cx| save.detach_and_log_err(cx)); - } else { - save.await?; - } - } - 40..=44 => { - let prepare_rename = project.update(cx, |project, cx| { - log::info!( - "{}: preparing rename for buffer {} ({:?})", - guest_username, - buffer.read(cx).remote_id(), - buffer.read(cx).file().unwrap().full_path(cx) - ); - let offset = rng.lock().gen_range(0..=buffer.read(cx).len()); - project.prepare_rename(buffer, offset, cx) - }); - let prepare_rename = cx.background().spawn(async move { - prepare_rename - .await - .map_err(|err| anyhow!("prepare rename request failed: {:?}", err)) - }); - if rng.lock().gen_bool(0.3) { - log::info!("{}: detaching prepare rename request", guest_username); - cx.update(|cx| prepare_rename.detach_and_log_err(cx)); - } else { - prepare_rename.await?; - } - } - 45..=49 => { - let definitions = project.update(cx, |project, cx| { - log::info!( - "{}: requesting definitions for buffer {} ({:?})", - guest_username, - buffer.read(cx).remote_id(), - buffer.read(cx).file().unwrap().full_path(cx) - ); - let offset = rng.lock().gen_range(0..=buffer.read(cx).len()); - project.definition(&buffer, offset, cx) - }); - let definitions = cx.background().spawn(async move { - definitions - .await - .map_err(|err| anyhow!("definitions request failed: {:?}", err)) - }); - if rng.lock().gen_bool(0.3) { - log::info!("{}: detaching definitions request", guest_username); - cx.update(|cx| definitions.detach_and_log_err(cx)); - } else { - client.buffers.extend( - definitions.await?.into_iter().map(|loc| loc.target.buffer), - ); - } - } - 50..=54 => { - let highlights = project.update(cx, |project, cx| { - log::info!( - "{}: requesting highlights for buffer {} ({:?})", - guest_username, - buffer.read(cx).remote_id(), - buffer.read(cx).file().unwrap().full_path(cx) - ); - let offset = rng.lock().gen_range(0..=buffer.read(cx).len()); - project.document_highlights(&buffer, offset, cx) - }); - let highlights = cx.background().spawn(async move { - highlights - .await - .map_err(|err| anyhow!("highlights request failed: {:?}", err)) - }); - if rng.lock().gen_bool(0.3) { - log::info!("{}: detaching highlights request", guest_username); - cx.update(|cx| highlights.detach_and_log_err(cx)); - } else { - highlights.await?; - } - } - 55..=59 => { - let search = project.update(cx, |project, cx| { - let query = rng.lock().gen_range('a'..='z'); - log::info!("{}: project-wide search {:?}", guest_username, query); - project.search(SearchQuery::text(query, false, false), cx) - }); - let search = cx.background().spawn(async move { - search - .await - .map_err(|err| anyhow!("search request failed: {:?}", err)) - }); - if rng.lock().gen_bool(0.3) { - log::info!("{}: detaching search request", guest_username); - cx.update(|cx| search.detach_and_log_err(cx)); - } else { - client.buffers.extend(search.await?.into_keys()); - } - } - 60..=69 => { - let worktree = project - .read_with(cx, |project, cx| { - project - .worktrees(cx) - .filter(|worktree| { - let worktree = worktree.read(cx); - worktree.is_visible() - && worktree.entries(false).any(|e| e.is_file()) - && worktree.root_entry().map_or(false, |e| e.is_dir()) - }) - .choose(&mut *rng.lock()) - }) - .unwrap(); - let (worktree_id, worktree_root_name) = worktree - .read_with(cx, |worktree, _| { - (worktree.id(), worktree.root_name().to_string()) - }); + let (worktree_root_name, project_path) = worktree.read_with(cx, |worktree, _| { + let entry = worktree + .entries(false) + .filter(|e| e.is_file()) + .choose(&mut *rng.lock()) + .unwrap(); + ( + worktree.root_name().to_string(), + (worktree.id(), entry.path.clone()), + ) + }); + log::info!( + "{}: opening path {:?} in worktree {} ({})", + username, + project_path.1, + project_path.0, + worktree_root_name, + ); + let buffer = project + .update(cx, |project, cx| { + project.open_buffer(project_path.clone(), cx) + }) + .await?; + log::info!( + "{}: opened path {:?} in worktree {} ({}) with buffer id {}", + username, + project_path.1, + project_path.0, + worktree_root_name, + buffer.read_with(cx, |buffer, _| buffer.remote_id()) + ); + buffers.insert(buffer.clone()); + buffer + } else { + buffers.iter().choose(&mut *rng.lock()).unwrap().clone() + }; - let mut new_name = String::new(); - for _ in 0..10 { - let letter = rng.lock().gen_range('a'..='z'); - new_name.push(letter); - } - let mut new_path = PathBuf::new(); - new_path.push(new_name); - new_path.set_extension("rs"); + let choice = rng.lock().gen_range(0..100); + match choice { + 0..=9 => { + cx.update(|cx| { log::info!( - "{}: creating {:?} in worktree {} ({})", - guest_username, - new_path, - worktree_id, - worktree_root_name, + "{}: dropping buffer {:?}", + username, + buffer.read(cx).file().unwrap().full_path(cx) ); - project - .update(cx, |project, cx| { - project.create_entry((worktree_id, new_path), false, cx) - }) - .unwrap() - .await?; - } - _ => { - buffer.update(cx, |buffer, cx| { - log::info!( - "{}: updating buffer {} ({:?})", - guest_username, - buffer.remote_id(), - buffer.file().unwrap().full_path(cx) - ); - if rng.lock().gen_bool(0.7) { - buffer.randomly_edit(&mut *rng.lock(), 5, cx); - } else { - buffer.randomly_undo_redo(&mut *rng.lock(), cx); - } - }); + buffers.remove(&buffer); + drop(buffer); + }); + } + 10..=19 => { + let completions = project.update(cx, |project, cx| { + log::info!( + "{}: requesting completions for buffer {} ({:?})", + username, + buffer.read(cx).remote_id(), + buffer.read(cx).file().unwrap().full_path(cx) + ); + let offset = rng.lock().gen_range(0..=buffer.read(cx).len()); + project.completions(&buffer, offset, cx) + }); + let completions = cx.background().spawn(async move { + completions + .await + .map_err(|err| anyhow!("completions request failed: {:?}", err)) + }); + if rng.lock().gen_bool(0.3) { + log::info!("{}: detaching completions request", username); + cx.update(|cx| completions.detach_and_log_err(cx)); + } else { + completions.await?; } } - cx.background().simulate_random_delay().await; + 20..=29 => { + let code_actions = project.update(cx, |project, cx| { + log::info!( + "{}: requesting code actions for buffer {} ({:?})", + username, + buffer.read(cx).remote_id(), + buffer.read(cx).file().unwrap().full_path(cx) + ); + let range = buffer.read(cx).random_byte_range(0, &mut *rng.lock()); + project.code_actions(&buffer, range, cx) + }); + let code_actions = cx.background().spawn(async move { + code_actions + .await + .map_err(|err| anyhow!("code actions request failed: {:?}", err)) + }); + if rng.lock().gen_bool(0.3) { + log::info!("{}: detaching code actions request", username); + cx.update(|cx| code_actions.detach_and_log_err(cx)); + } else { + code_actions.await?; + } + } + 30..=39 if buffer.read_with(cx, |buffer, _| buffer.is_dirty()) => { + let (requested_version, save) = buffer.update(cx, |buffer, cx| { + log::info!( + "{}: saving buffer {} ({:?})", + username, + buffer.remote_id(), + buffer.file().unwrap().full_path(cx) + ); + (buffer.version(), buffer.save(cx)) + }); + let save = cx.background().spawn(async move { + let (saved_version, _, _) = save + .await + .map_err(|err| anyhow!("save request failed: {:?}", err))?; + assert!(saved_version.observed_all(&requested_version)); + Ok::<_, anyhow::Error>(()) + }); + if rng.lock().gen_bool(0.3) { + log::info!("{}: detaching save request", username); + cx.update(|cx| save.detach_and_log_err(cx)); + } else { + save.await?; + } + } + 40..=44 => { + let prepare_rename = project.update(cx, |project, cx| { + log::info!( + "{}: preparing rename for buffer {} ({:?})", + username, + buffer.read(cx).remote_id(), + buffer.read(cx).file().unwrap().full_path(cx) + ); + let offset = rng.lock().gen_range(0..=buffer.read(cx).len()); + project.prepare_rename(buffer, offset, cx) + }); + let prepare_rename = cx.background().spawn(async move { + prepare_rename + .await + .map_err(|err| anyhow!("prepare rename request failed: {:?}", err)) + }); + if rng.lock().gen_bool(0.3) { + log::info!("{}: detaching prepare rename request", username); + cx.update(|cx| prepare_rename.detach_and_log_err(cx)); + } else { + prepare_rename.await?; + } + } + 45..=49 => { + let definitions = project.update(cx, |project, cx| { + log::info!( + "{}: requesting definitions for buffer {} ({:?})", + username, + buffer.read(cx).remote_id(), + buffer.read(cx).file().unwrap().full_path(cx) + ); + let offset = rng.lock().gen_range(0..=buffer.read(cx).len()); + project.definition(&buffer, offset, cx) + }); + let definitions = cx.background().spawn(async move { + definitions + .await + .map_err(|err| anyhow!("definitions request failed: {:?}", err)) + }); + if rng.lock().gen_bool(0.3) { + log::info!("{}: detaching definitions request", username); + cx.update(|cx| definitions.detach_and_log_err(cx)); + } else { + buffers.extend(definitions.await?.into_iter().map(|loc| loc.target.buffer)); + } + } + 50..=54 => { + let highlights = project.update(cx, |project, cx| { + log::info!( + "{}: requesting highlights for buffer {} ({:?})", + username, + buffer.read(cx).remote_id(), + buffer.read(cx).file().unwrap().full_path(cx) + ); + let offset = rng.lock().gen_range(0..=buffer.read(cx).len()); + project.document_highlights(&buffer, offset, cx) + }); + let highlights = cx.background().spawn(async move { + highlights + .await + .map_err(|err| anyhow!("highlights request failed: {:?}", err)) + }); + if rng.lock().gen_bool(0.3) { + log::info!("{}: detaching highlights request", username); + cx.update(|cx| highlights.detach_and_log_err(cx)); + } else { + highlights.await?; + } + } + 55..=59 => { + let search = project.update(cx, |project, cx| { + let query = rng.lock().gen_range('a'..='z'); + log::info!("{}: project-wide search {:?}", username, query); + project.search(SearchQuery::text(query, false, false), cx) + }); + let search = cx.background().spawn(async move { + search + .await + .map_err(|err| anyhow!("search request failed: {:?}", err)) + }); + if rng.lock().gen_bool(0.3) { + log::info!("{}: detaching search request", username); + cx.update(|cx| search.detach_and_log_err(cx)); + } else { + buffers.extend(search.await?.into_keys()); + } + } + 60..=69 => { + let worktree = project + .read_with(cx, |project, cx| { + project + .worktrees(cx) + .filter(|worktree| { + let worktree = worktree.read(cx); + worktree.is_visible() + && worktree.entries(false).any(|e| e.is_file()) + && worktree.root_entry().map_or(false, |e| e.is_dir()) + }) + .choose(&mut *rng.lock()) + }) + .unwrap(); + let (worktree_id, worktree_root_name) = worktree + .read_with(cx, |worktree, _| { + (worktree.id(), worktree.root_name().to_string()) + }); + + let mut new_name = String::new(); + for _ in 0..10 { + let letter = rng.lock().gen_range('a'..='z'); + new_name.push(letter); + } + + let is_dir = rng.lock().gen::(); + let mut new_path = PathBuf::new(); + new_path.push(new_name); + if !is_dir { + new_path.set_extension("rs"); + } + log::info!( + "{}: creating {:?} in worktree {} ({})", + username, + new_path, + worktree_id, + worktree_root_name, + ); + project + .update(cx, |project, cx| { + project.create_entry((worktree_id, new_path), is_dir, cx) + }) + .unwrap() + .await?; + } + _ => { + buffer.update(cx, |buffer, cx| { + log::info!( + "{}: updating buffer {} ({:?})", + username, + buffer.remote_id(), + buffer.file().unwrap().full_path(cx) + ); + if rng.lock().gen_bool(0.7) { + buffer.randomly_edit(&mut *rng.lock(), 5, cx); + } else { + buffer.randomly_undo_redo(&mut *rng.lock(), cx); + } + }); + } } + Ok(()) } - let result = simulate_guest_internal( - &mut self, - &guest_username, - project.clone(), - op_start_signal, - rng, - &mut cx, - ) - .await; - log::info!("{}: done", guest_username); + // Setup language server + let mut language = Language::new( + LanguageConfig { + name: "Rust".into(), + path_suffixes: vec!["rs".to_string()], + ..Default::default() + }, + None, + ); + let _fake_language_servers = language + .set_fake_lsp_adapter(Arc::new(FakeLspAdapter { + name: "the-fake-language-server", + capabilities: lsp::LanguageServer::full_capabilities(), + initializer: Some(Box::new({ + let rng = rng.clone(); + let fs = self.fs.clone(); + move |fake_server: &mut FakeLanguageServer| { + fake_server.handle_request::( + |_, _| async move { + Ok(Some(lsp::CompletionResponse::Array(vec![ + lsp::CompletionItem { + text_edit: Some(lsp::CompletionTextEdit::Edit( + lsp::TextEdit { + range: lsp::Range::new( + lsp::Position::new(0, 0), + lsp::Position::new(0, 0), + ), + new_text: "the-new-text".to_string(), + }, + )), + ..Default::default() + }, + ]))) + }, + ); - (self, project, cx, result.err()) + fake_server.handle_request::( + |_, _| async move { + Ok(Some(vec![lsp::CodeActionOrCommand::CodeAction( + lsp::CodeAction { + title: "the-code-action".to_string(), + ..Default::default() + }, + )])) + }, + ); + + fake_server.handle_request::( + |params, _| async move { + Ok(Some(lsp::PrepareRenameResponse::Range(lsp::Range::new( + params.position, + params.position, + )))) + }, + ); + + fake_server.handle_request::({ + let fs = fs.clone(); + let rng = rng.clone(); + move |_, _| { + let fs = fs.clone(); + let rng = rng.clone(); + async move { + let files = fs.files().await; + let mut rng = rng.lock(); + let count = rng.gen_range::(1..3); + let files = (0..count) + .map(|_| files.choose(&mut *rng).unwrap()) + .collect::>(); + log::info!("LSP: Returning definitions in files {:?}", &files); + Ok(Some(lsp::GotoDefinitionResponse::Array( + files + .into_iter() + .map(|file| lsp::Location { + uri: lsp::Url::from_file_path(file).unwrap(), + range: Default::default(), + }) + .collect(), + ))) + } + } + }); + + fake_server.handle_request::( + { + let rng = rng.clone(); + move |_, _| { + let mut highlights = Vec::new(); + let highlight_count = rng.lock().gen_range(1..=5); + for _ in 0..highlight_count { + let start_row = rng.lock().gen_range(0..100); + let start_column = rng.lock().gen_range(0..100); + let start = PointUtf16::new(start_row, start_column); + let end_row = rng.lock().gen_range(0..100); + let end_column = rng.lock().gen_range(0..100); + let end = PointUtf16::new(end_row, end_column); + let range = + if start > end { end..start } else { start..end }; + highlights.push(lsp::DocumentHighlight { + range: range_to_lsp(range.clone()), + kind: Some(lsp::DocumentHighlightKind::READ), + }); + } + highlights.sort_unstable_by_key(|highlight| { + (highlight.range.start, highlight.range.end) + }); + async move { Ok(Some(highlights)) } + } + }, + ); + } + })), + ..Default::default() + })) + .await; + self.language_registry.add(Arc::new(language)); + + while op_start_signal.next().await.is_some() { + if let Err(error) = tick(&mut self, &username, rng.clone(), &mut cx).await { + log::error!("{} error: {:?}", username, error); + } + + cx.background().simulate_random_delay().await; + } + log::info!("{}: done", username); + + (self, cx) } } diff --git a/crates/collab/src/rpc.rs b/crates/collab/src/rpc.rs index 059a1d46e6..9fd9bef825 100644 --- a/crates/collab/src/rpc.rs +++ b/crates/collab/src/rpc.rs @@ -1041,17 +1041,19 @@ impl Server { for conn_id in project.connection_ids() { if conn_id != request.sender_id { - self.peer.send( - conn_id, - proto::AddProjectCollaborator { - project_id: project_id.to_proto(), - collaborator: Some(proto::Collaborator { - peer_id: request.sender_id.0, - replica_id: replica_id as u32, - user_id: guest_user_id.to_proto(), - }), - }, - )?; + self.peer + .send( + conn_id, + proto::AddProjectCollaborator { + project_id: project_id.to_proto(), + collaborator: Some(proto::Collaborator { + peer_id: request.sender_id.0, + replica_id: replica_id as u32, + user_id: guest_user_id.to_proto(), + }), + }, + ) + .trace_err(); } } diff --git a/crates/collab/src/rpc/store.rs b/crates/collab/src/rpc/store.rs index a7abce7094..c9358ddc2a 100644 --- a/crates/collab/src/rpc/store.rs +++ b/crates/collab/src/rpc/store.rs @@ -674,8 +674,13 @@ impl Store { .connected_users .get_mut(&recipient_user_id) .ok_or_else(|| anyhow!("no such connection"))?; - if let Some(active_call) = recipient.active_call.take() { + if let Some(active_call) = recipient.active_call { anyhow::ensure!(active_call.room_id == room_id, "no such room"); + anyhow::ensure!( + active_call.connection_id.is_none(), + "cannot decline a call after joining room" + ); + recipient.active_call.take(); let recipient_connection_ids = self .connection_ids_for_user(recipient_user_id) .collect::>(); @@ -1196,7 +1201,9 @@ impl Store { assert!( self.connections .contains_key(&ConnectionId(participant.peer_id)), - "room contains participant that has disconnected" + "room {} contains participant {:?} that has disconnected", + room_id, + participant ); for participant_project in &participant.projects { diff --git a/crates/fs/src/fs.rs b/crates/fs/src/fs.rs index 7bd7346b4b..d44aebce0f 100644 --- a/crates/fs/src/fs.rs +++ b/crates/fs/src/fs.rs @@ -631,6 +631,21 @@ impl FakeFs { } } + pub async fn directories(&self) -> Vec { + let mut result = Vec::new(); + let mut queue = collections::VecDeque::new(); + queue.push_back((PathBuf::from("/"), self.state.lock().await.root.clone())); + while let Some((path, entry)) = queue.pop_front() { + if let FakeFsEntry::Dir { entries, .. } = &*entry.lock().await { + for (name, entry) in entries { + queue.push_back((path.join(name), entry.clone())); + } + result.push(path); + } + } + result + } + pub async fn files(&self) -> Vec { let mut result = Vec::new(); let mut queue = collections::VecDeque::new();