From 717f53e3d28b8e48ac2e0888b2fc478e42f54336 Mon Sep 17 00:00:00 2001 From: Nathan Sobo Date: Thu, 7 Apr 2022 09:29:47 -0600 Subject: [PATCH 01/11] WIP --- crates/server/Cargo.toml | 2 +- crates/server/src/rpc.rs | 73 ++++++++++++++++++++++++++++------------ 2 files changed, 52 insertions(+), 23 deletions(-) diff --git a/crates/server/Cargo.toml b/crates/server/Cargo.toml index 6e27fa16c5..f6d10517bd 100644 --- a/crates/server/Cargo.toml +++ b/crates/server/Cargo.toml @@ -14,7 +14,6 @@ required-features = ["seed-support"] [dependencies] collections = { path = "../collections" } -settings = { path = "../settings" } rpc = { path = "../rpc" } anyhow = "1.0.40" async-io = "1.3" @@ -65,6 +64,7 @@ editor = { path = "../editor", features = ["test-support"] } language = { path = "../language", features = ["test-support"] } lsp = { path = "../lsp", features = ["test-support"] } project = { path = "../project", features = ["test-support"] } +settings = { path = "../settings", features = ["test-support"] } workspace = { path = "../workspace", features = ["test-support"] } ctor = "0.1" env_logger = "0.8" diff --git a/crates/server/src/rpc.rs b/crates/server/src/rpc.rs index 51c7807660..61a1e7cc65 100644 --- a/crates/server/src/rpc.rs +++ b/crates/server/src/rpc.rs @@ -1080,7 +1080,7 @@ mod tests { use ::rpc::Peer; use client::{ self, test::FakeHttpClient, Channel, ChannelDetails, ChannelList, Client, Credentials, - EstablishConnectionError, UserStore, + EstablishConnectionError, UserStore, RECEIVE_TIMEOUT, }; use collections::BTreeMap; use editor::{ @@ -4983,6 +4983,7 @@ mod tests { let operations = Rc::new(Cell::new(0)); let mut server = TestServer::start(cx.foreground(), cx.background()).await; let mut clients = Vec::new(); + let mut user_ids = Vec::new(); let files = Arc::new(Mutex::new(Vec::new())); let mut next_entity_id = 100000; @@ -5150,13 +5151,14 @@ mod tests { }); host_language_registry.add(Arc::new(language)); + user_ids.push(host.current_user_id(&host_cx)); clients.push(cx.foreground().spawn(host.simulate_host( host_project, files, operations.clone(), max_operations, rng.clone(), - host_cx, + &mut host_cx, ))); while operations.get() < max_operations { @@ -5191,6 +5193,7 @@ mod tests { ) .await .unwrap(); + user_ids.push(guest.current_user_id(&guest_cx)); clients.push(cx.foreground().spawn(guest.simulate_guest( guest_id, guest_project, @@ -5201,6 +5204,30 @@ mod tests { ))); log::info!("Guest {} added", guest_id); + } else if rng.lock().gen_bool(0.05) { + server.disconnect_client(user_ids[0]); + cx.foreground().advance_clock(RECEIVE_TIMEOUT); + let mut clients = futures::future::join_all(clients).await; + cx.foreground().run_until_parked(); + + let store = server.store.read(); + let (host, host_cx) = clients.remove(0); + host.project + .as_ref() + .unwrap() + .read_with(&host_cx, |project, _| assert!(!project.is_shared())); + for (guest, guest_cx) in clients { + assert!(store + .contacts_for_user(guest.current_user_id(&guest_cx)) + .is_empty()); + guest + .project + .as_ref() + .unwrap() + .read_with(&guest_cx, |project, _| assert!(project.is_read_only())); + } + + return; } } @@ -5471,6 +5498,14 @@ mod tests { } } + impl Deref for TestServer { + type Target = Server; + + fn deref(&self) -> &Self::Target { + &self.server + } + } + impl Drop for TestServer { fn drop(&mut self) { self.peer.reset(); @@ -5584,15 +5619,15 @@ mod tests { } async fn simulate_host( - mut self, + &mut self, project: ModelHandle, files: Arc>>, operations: Rc>, max_operations: usize, rng: Arc>, - mut cx: 
TestAppContext, - ) -> (Self, TestAppContext) { - let fs = project.read_with(&cx, |project, _| project.fs().clone()); + cx: &mut TestAppContext, + ) -> anyhow::Result<()> { + let fs = project.read_with(cx, |project, _| project.fs().clone()); while operations.get() < max_operations { operations.set(operations.get() + 1); @@ -5609,29 +5644,25 @@ mod tests { } log::info!("Host: find/create local worktree {:?}", path); - let find_or_create_worktree = project.update(&mut cx, |project, cx| { + let find_or_create_worktree = project.update(cx, |project, cx| { project.find_or_create_local_worktree(path, true, cx) }); - let find_or_create_worktree = async move { - find_or_create_worktree.await.unwrap(); - }; if rng.lock().gen() { cx.background().spawn(find_or_create_worktree).detach(); } else { - find_or_create_worktree.await; + find_or_create_worktree.await?; } } 10..=80 if !files.lock().is_empty() => { let buffer = if self.buffers.is_empty() || rng.lock().gen() { let file = files.lock().choose(&mut *rng.lock()).unwrap().clone(); let (worktree, path) = project - .update(&mut cx, |project, cx| { + .update(cx, |project, cx| { project.find_or_create_local_worktree(file.clone(), true, cx) }) - .await - .unwrap(); + .await?; let project_path = - worktree.read_with(&cx, |worktree, _| (worktree.id(), path)); + worktree.read_with(cx, |worktree, _| (worktree.id(), path)); log::info!( "Host: opening path {:?}, worktree {}, relative_path {:?}", file, @@ -5639,9 +5670,7 @@ mod tests { project_path.1 ); let buffer = project - .update(&mut cx, |project, cx| { - project.open_buffer(project_path, cx) - }) + .update(cx, |project, cx| project.open_buffer(project_path, cx)) .await .unwrap(); self.buffers.insert(buffer.clone()); @@ -5664,7 +5693,7 @@ mod tests { drop(buffer); }); } else { - buffer.update(&mut cx, |buffer, cx| { + buffer.update(cx, |buffer, cx| { log::info!( "Host: updating buffer {:?} ({})", buffer.file().unwrap().full_path(cx), @@ -5701,10 +5730,10 @@ mod tests { cx.background().simulate_random_delay().await; } - log::info!("Host done"); - self.project = Some(project); - (self, cx) + + log::info!("Host done"); + Ok(()) } pub async fn simulate_guest( From fae9048a2a95f12625b7dabb28f8e3f941b16f0f Mon Sep 17 00:00:00 2001 From: Antonio Scandurra Date: Fri, 8 Apr 2022 11:27:53 +0200 Subject: [PATCH 02/11] Remove non-determinism from `Peer` caused by using std's `HashMap` --- Cargo.lock | 1 + crates/rpc/Cargo.toml | 10 ++++++---- crates/rpc/src/peer.rs | 8 +++++--- 3 files changed, 12 insertions(+), 7 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index ec9c58b1a9..7cbba16a00 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4007,6 +4007,7 @@ dependencies = [ "async-tungstenite", "base64 0.13.0", "clock", + "collections", "futures", "gpui", "log", diff --git a/crates/rpc/Cargo.toml b/crates/rpc/Cargo.toml index e773b3f0ba..9a2cb165c7 100644 --- a/crates/rpc/Cargo.toml +++ b/crates/rpc/Cargo.toml @@ -9,9 +9,13 @@ path = "src/rpc.rs" doctest = false [features] -test-support = ["gpui/test-support"] +test-support = ["collections/test-support", "gpui/test-support"] [dependencies] +clock = { path = "../clock" } +collections = { path = "../collections" } +gpui = { path = "../gpui", optional = true } +util = { path = "../util" } anyhow = "1.0" async-lock = "2.4" async-tungstenite = "0.16" @@ -26,14 +30,12 @@ rsa = "0.4" serde = { version = "1", features = ["derive"] } smol-timeout = "0.6" zstd = "0.9" -clock = { path = "../clock" } -gpui = { path = "../gpui", optional = true } -util = { path = "../util" } 
[build-dependencies] prost-build = "0.8" [dev-dependencies] +collections = { path = "../collections", features = ["test-support"] } gpui = { path = "../gpui", features = ["test-support"] } smol = "1.2.5" tempdir = "0.3.7" diff --git a/crates/rpc/src/peer.rs b/crates/rpc/src/peer.rs index 726453bea8..3677f0feac 100644 --- a/crates/rpc/src/peer.rs +++ b/crates/rpc/src/peer.rs @@ -1,6 +1,9 @@ -use super::proto::{self, AnyTypedEnvelope, EnvelopedMessage, MessageStream, RequestMessage}; -use super::Connection; +use super::{ + proto::{self, AnyTypedEnvelope, EnvelopedMessage, MessageStream, RequestMessage}, + Connection, +}; use anyhow::{anyhow, Context, Result}; +use collections::HashMap; use futures::{channel::oneshot, stream::BoxStream, FutureExt as _, StreamExt}; use parking_lot::{Mutex, RwLock}; use postage::{ @@ -10,7 +13,6 @@ use postage::{ use smol_timeout::TimeoutExt as _; use std::sync::atomic::Ordering::SeqCst; use std::{ - collections::HashMap, fmt, future::Future, marker::PhantomData, From da976012a91c631dda735a3463f12de1e380a427 Mon Sep 17 00:00:00 2001 From: Antonio Scandurra Date: Fri, 8 Apr 2022 11:29:00 +0200 Subject: [PATCH 03/11] Allow `simulate_guest` and `simulate_host` to fail when host disconnects --- crates/server/src/rpc.rs | 739 ++++++++++++++++++++++----------------- 1 file changed, 411 insertions(+), 328 deletions(-) diff --git a/crates/server/src/rpc.rs b/crates/server/src/rpc.rs index 61a1e7cc65..912d576322 100644 --- a/crates/server/src/rpc.rs +++ b/crates/server/src/rpc.rs @@ -1118,6 +1118,7 @@ mod tests { }, time::Duration, }; + use util::TryFutureExt; use workspace::{Item, SplitDirection, Workspace, WorkspaceParams}; #[cfg(test)] @@ -5151,6 +5152,7 @@ mod tests { }); host_language_registry.add(Arc::new(language)); + let host_disconnected = Rc::new(AtomicBool::new(false)); user_ids.push(host.current_user_id(&host_cx)); clients.push(cx.foreground().spawn(host.simulate_host( host_project, @@ -5158,7 +5160,7 @@ mod tests { operations.clone(), max_operations, rng.clone(), - &mut host_cx, + host_cx, ))); while operations.get() < max_operations { @@ -5200,24 +5202,28 @@ mod tests { operations.clone(), max_operations, rng.clone(), + host_disconnected.clone(), guest_cx, ))); log::info!("Guest {} added", guest_id); } else if rng.lock().gen_bool(0.05) { + host_disconnected.store(true, SeqCst); + server.forbid_connections(); server.disconnect_client(user_ids[0]); cx.foreground().advance_clock(RECEIVE_TIMEOUT); let mut clients = futures::future::join_all(clients).await; cx.foreground().run_until_parked(); - let store = server.store.read(); - let (host, host_cx) = clients.remove(0); + let (host, mut host_cx) = clients.remove(0); host.project .as_ref() .unwrap() .read_with(&host_cx, |project, _| assert!(!project.is_shared())); - for (guest, guest_cx) in clients { - assert!(store + for (guest, mut guest_cx) in clients { + assert!(server + .store + .read() .contacts_for_user(guest.current_user_id(&guest_cx)) .is_empty()); guest @@ -5225,7 +5231,9 @@ mod tests { .as_ref() .unwrap() .read_with(&guest_cx, |project, _| assert!(project.is_read_only())); + guest_cx.update(|_| drop(guest)); } + host_cx.update(|_| drop(host)); return; } @@ -5619,121 +5627,148 @@ mod tests { } async fn simulate_host( - &mut self, + mut self, project: ModelHandle, files: Arc>>, operations: Rc>, max_operations: usize, rng: Arc>, - cx: &mut TestAppContext, - ) -> anyhow::Result<()> { - let fs = project.read_with(cx, |project, _| project.fs().clone()); - while operations.get() < max_operations { - 
operations.set(operations.get() + 1); + mut cx: TestAppContext, + ) -> (Self, TestAppContext) { + async fn simulate_host_internal( + client: &mut TestClient, + project: ModelHandle, + files: Arc>>, + operations: Rc>, + max_operations: usize, + rng: Arc>, + cx: &mut TestAppContext, + ) -> anyhow::Result<()> { + let fs = project.read_with(cx, |project, _| project.fs().clone()); + while operations.get() < max_operations { + operations.set(operations.get() + 1); - let distribution = rng.lock().gen_range::(0..100); - match distribution { - 0..=20 if !files.lock().is_empty() => { - let path = files.lock().choose(&mut *rng.lock()).unwrap().clone(); - let mut path = path.as_path(); - while let Some(parent_path) = path.parent() { - path = parent_path; + let distribution = rng.lock().gen_range::(0..100); + match distribution { + 0..=20 if !files.lock().is_empty() => { + let path = files.lock().choose(&mut *rng.lock()).unwrap().clone(); + let mut path = path.as_path(); + while let Some(parent_path) = path.parent() { + path = parent_path; + if rng.lock().gen() { + break; + } + } + + log::info!("Host: find/create local worktree {:?}", path); + let find_or_create_worktree = project.update(cx, |project, cx| { + project.find_or_create_local_worktree(path, true, cx) + }); if rng.lock().gen() { - break; + cx.background().spawn(find_or_create_worktree).detach(); + } else { + find_or_create_worktree.await?; } } - - log::info!("Host: find/create local worktree {:?}", path); - let find_or_create_worktree = project.update(cx, |project, cx| { - project.find_or_create_local_worktree(path, true, cx) - }); - if rng.lock().gen() { - cx.background().spawn(find_or_create_worktree).detach(); - } else { - find_or_create_worktree.await?; - } - } - 10..=80 if !files.lock().is_empty() => { - let buffer = if self.buffers.is_empty() || rng.lock().gen() { - let file = files.lock().choose(&mut *rng.lock()).unwrap().clone(); - let (worktree, path) = project - .update(cx, |project, cx| { - project.find_or_create_local_worktree(file.clone(), true, cx) - }) - .await?; - let project_path = - worktree.read_with(cx, |worktree, _| (worktree.id(), path)); - log::info!( - "Host: opening path {:?}, worktree {}, relative_path {:?}", - file, - project_path.0, - project_path.1 - ); - let buffer = project - .update(cx, |project, cx| project.open_buffer(project_path, cx)) - .await - .unwrap(); - self.buffers.insert(buffer.clone()); - buffer - } else { - self.buffers - .iter() - .choose(&mut *rng.lock()) - .unwrap() - .clone() - }; - - if rng.lock().gen_bool(0.1) { - cx.update(|cx| { + 10..=80 if !files.lock().is_empty() => { + let buffer = if client.buffers.is_empty() || rng.lock().gen() { + let file = files.lock().choose(&mut *rng.lock()).unwrap().clone(); + let (worktree, path) = project + .update(cx, |project, cx| { + project.find_or_create_local_worktree( + file.clone(), + true, + cx, + ) + }) + .await?; + let project_path = + worktree.read_with(cx, |worktree, _| (worktree.id(), path)); log::info!( - "Host: dropping buffer {:?}", - buffer.read(cx).file().unwrap().full_path(cx) + "Host: opening path {:?}, worktree {}, relative_path {:?}", + file, + project_path.0, + project_path.1 ); - self.buffers.remove(&buffer); - drop(buffer); - }); - } else { - buffer.update(cx, |buffer, cx| { - log::info!( - "Host: updating buffer {:?} ({})", - buffer.file().unwrap().full_path(cx), - buffer.remote_id() - ); - buffer.randomly_edit(&mut *rng.lock(), 5, cx) - }); + let buffer = project + .update(cx, |project, cx| project.open_buffer(project_path, cx)) + 
.await + .unwrap(); + client.buffers.insert(buffer.clone()); + buffer + } else { + client + .buffers + .iter() + .choose(&mut *rng.lock()) + .unwrap() + .clone() + }; + + if rng.lock().gen_bool(0.1) { + cx.update(|cx| { + log::info!( + "Host: dropping buffer {:?}", + buffer.read(cx).file().unwrap().full_path(cx) + ); + client.buffers.remove(&buffer); + drop(buffer); + }); + } else { + buffer.update(cx, |buffer, cx| { + log::info!( + "Host: updating buffer {:?} ({})", + buffer.file().unwrap().full_path(cx), + buffer.remote_id() + ); + buffer.randomly_edit(&mut *rng.lock(), 5, cx) + }); + } } + _ => loop { + let path_component_count = rng.lock().gen_range::(1..=5); + let mut path = PathBuf::new(); + path.push("/"); + for _ in 0..path_component_count { + let letter = rng.lock().gen_range(b'a'..=b'z'); + path.push(std::str::from_utf8(&[letter]).unwrap()); + } + path.set_extension("rs"); + let parent_path = path.parent().unwrap(); + + log::info!("Host: creating file {:?}", path,); + + if fs.create_dir(&parent_path).await.is_ok() + && fs.create_file(&path, Default::default()).await.is_ok() + { + files.lock().push(path); + break; + } else { + log::info!("Host: cannot create file"); + } + }, } - _ => loop { - let path_component_count = rng.lock().gen_range::(1..=5); - let mut path = PathBuf::new(); - path.push("/"); - for _ in 0..path_component_count { - let letter = rng.lock().gen_range(b'a'..=b'z'); - path.push(std::str::from_utf8(&[letter]).unwrap()); - } - path.set_extension("rs"); - let parent_path = path.parent().unwrap(); - log::info!("Host: creating file {:?}", path,); - - if fs.create_dir(&parent_path).await.is_ok() - && fs.create_file(&path, Default::default()).await.is_ok() - { - files.lock().push(path); - break; - } else { - log::info!("Host: cannot create file"); - } - }, + cx.background().simulate_random_delay().await; } - cx.background().simulate_random_delay().await; + Ok(()) } - self.project = Some(project); - + simulate_host_internal( + &mut self, + project.clone(), + files, + operations, + max_operations, + rng, + &mut cx, + ) + .log_err() + .await; log::info!("Host done"); - Ok(()) + self.project = Some(project); + (self, cx) } pub async fn simulate_guest( @@ -5743,244 +5778,292 @@ mod tests { operations: Rc>, max_operations: usize, rng: Arc>, + host_disconnected: Rc, mut cx: TestAppContext, ) -> (Self, TestAppContext) { - while operations.get() < max_operations { - let buffer = if self.buffers.is_empty() || rng.lock().gen() { - let worktree = if let Some(worktree) = project.read_with(&cx, |project, cx| { - project - .worktrees(&cx) - .filter(|worktree| { - let worktree = worktree.read(cx); - worktree.is_visible() - && worktree.entries(false).any(|e| e.is_file()) + async fn simulate_guest_internal( + client: &mut TestClient, + guest_id: usize, + project: ModelHandle, + operations: Rc>, + max_operations: usize, + rng: Arc>, + cx: &mut TestAppContext, + ) -> anyhow::Result<()> { + while operations.get() < max_operations { + let buffer = if client.buffers.is_empty() || rng.lock().gen() { + let worktree = if let Some(worktree) = + project.read_with(cx, |project, cx| { + project + .worktrees(&cx) + .filter(|worktree| { + let worktree = worktree.read(cx); + worktree.is_visible() + && worktree.entries(false).any(|e| e.is_file()) + }) + .choose(&mut *rng.lock()) + }) { + worktree + } else { + cx.background().simulate_random_delay().await; + continue; + }; + + operations.set(operations.get() + 1); + let (worktree_root_name, project_path) = + worktree.read_with(cx, |worktree, _| { + let 
entry = worktree + .entries(false) + .filter(|e| e.is_file()) + .choose(&mut *rng.lock()) + .unwrap(); + ( + worktree.root_name().to_string(), + (worktree.id(), entry.path.clone()), + ) + }); + log::info!( + "Guest {}: opening path {:?} in worktree {} ({})", + guest_id, + project_path.1, + project_path.0, + worktree_root_name, + ); + let buffer = project + .update(cx, |project, cx| { + project.open_buffer(project_path.clone(), cx) }) - .choose(&mut *rng.lock()) - }) { - worktree + .await?; + log::info!( + "Guest {}: opened path {:?} in worktree {} ({}) with buffer id {}", + guest_id, + project_path.1, + project_path.0, + worktree_root_name, + buffer.read_with(cx, |buffer, _| buffer.remote_id()) + ); + client.buffers.insert(buffer.clone()); + buffer } else { - cx.background().simulate_random_delay().await; - continue; + operations.set(operations.get() + 1); + + client + .buffers + .iter() + .choose(&mut *rng.lock()) + .unwrap() + .clone() }; - operations.set(operations.get() + 1); - let (worktree_root_name, project_path) = - worktree.read_with(&cx, |worktree, _| { - let entry = worktree - .entries(false) - .filter(|e| e.is_file()) - .choose(&mut *rng.lock()) - .unwrap(); - ( - worktree.root_name().to_string(), - (worktree.id(), entry.path.clone()), - ) - }); - log::info!( - "Guest {}: opening path {:?} in worktree {} ({})", - guest_id, - project_path.1, - project_path.0, - worktree_root_name, - ); - let buffer = project - .update(&mut cx, |project, cx| { - project.open_buffer(project_path.clone(), cx) - }) - .await - .unwrap(); - log::info!( - "Guest {}: opened path {:?} in worktree {} ({}) with buffer id {}", - guest_id, - project_path.1, - project_path.0, - worktree_root_name, - buffer.read_with(&cx, |buffer, _| buffer.remote_id()) - ); - self.buffers.insert(buffer.clone()); - buffer - } else { - operations.set(operations.get() + 1); - - self.buffers - .iter() - .choose(&mut *rng.lock()) - .unwrap() - .clone() - }; - - let choice = rng.lock().gen_range(0..100); - match choice { - 0..=9 => { - cx.update(|cx| { - log::info!( - "Guest {}: dropping buffer {:?}", - guest_id, - buffer.read(cx).file().unwrap().full_path(cx) - ); - self.buffers.remove(&buffer); - drop(buffer); - }); - } - 10..=19 => { - let completions = project.update(&mut cx, |project, cx| { - log::info!( - "Guest {}: requesting completions for buffer {} ({:?})", - guest_id, - buffer.read(cx).remote_id(), - buffer.read(cx).file().unwrap().full_path(cx) - ); - let offset = rng.lock().gen_range(0..=buffer.read(cx).len()); - project.completions(&buffer, offset, cx) - }); - let completions = cx.background().spawn(async move { - completions.await.expect("completions request failed"); - }); - if rng.lock().gen_bool(0.3) { - log::info!("Guest {}: detaching completions request", guest_id); - completions.detach(); - } else { - completions.await; + let choice = rng.lock().gen_range(0..100); + match choice { + 0..=9 => { + cx.update(|cx| { + log::info!( + "Guest {}: dropping buffer {:?}", + guest_id, + buffer.read(cx).file().unwrap().full_path(cx) + ); + client.buffers.remove(&buffer); + drop(buffer); + }); + } + 10..=19 => { + let completions = project.update(cx, |project, cx| { + log::info!( + "Guest {}: requesting completions for buffer {} ({:?})", + guest_id, + buffer.read(cx).remote_id(), + buffer.read(cx).file().unwrap().full_path(cx) + ); + let offset = rng.lock().gen_range(0..=buffer.read(cx).len()); + project.completions(&buffer, offset, cx) + }); + let completions = cx.background().spawn(async move { + completions + .await + 
.map_err(|err| anyhow!("completions request failed: {:?}", err)) + }); + if rng.lock().gen_bool(0.3) { + log::info!("Guest {}: detaching completions request", guest_id); + cx.update(|cx| completions.detach_and_log_err(cx)); + } else { + completions.await?; + } + } + 20..=29 => { + let code_actions = project.update(cx, |project, cx| { + log::info!( + "Guest {}: requesting code actions for buffer {} ({:?})", + guest_id, + buffer.read(cx).remote_id(), + buffer.read(cx).file().unwrap().full_path(cx) + ); + let range = buffer.read(cx).random_byte_range(0, &mut *rng.lock()); + project.code_actions(&buffer, range, cx) + }); + let code_actions = cx.background().spawn(async move { + code_actions.await.map_err(|err| { + anyhow!("code actions request failed: {:?}", err) + }) + }); + if rng.lock().gen_bool(0.3) { + log::info!("Guest {}: detaching code actions request", guest_id); + cx.update(|cx| code_actions.detach_and_log_err(cx)); + } else { + code_actions.await?; + } + } + 30..=39 if buffer.read_with(cx, |buffer, _| buffer.is_dirty()) => { + let (requested_version, save) = buffer.update(cx, |buffer, cx| { + log::info!( + "Guest {}: saving buffer {} ({:?})", + guest_id, + buffer.remote_id(), + buffer.file().unwrap().full_path(cx) + ); + (buffer.version(), buffer.save(cx)) + }); + let save = cx.background().spawn(async move { + let (saved_version, _) = save + .await + .map_err(|err| anyhow!("save request failed: {:?}", err))?; + assert!(saved_version.observed_all(&requested_version)); + Ok::<_, anyhow::Error>(()) + }); + if rng.lock().gen_bool(0.3) { + log::info!("Guest {}: detaching save request", guest_id); + cx.update(|cx| save.detach_and_log_err(cx)); + } else { + save.await?; + } + } + 40..=44 => { + let prepare_rename = project.update(cx, |project, cx| { + log::info!( + "Guest {}: preparing rename for buffer {} ({:?})", + guest_id, + buffer.read(cx).remote_id(), + buffer.read(cx).file().unwrap().full_path(cx) + ); + let offset = rng.lock().gen_range(0..=buffer.read(cx).len()); + project.prepare_rename(buffer, offset, cx) + }); + let prepare_rename = cx.background().spawn(async move { + prepare_rename.await.map_err(|err| { + anyhow!("prepare rename request failed: {:?}", err) + }) + }); + if rng.lock().gen_bool(0.3) { + log::info!("Guest {}: detaching prepare rename request", guest_id); + cx.update(|cx| prepare_rename.detach_and_log_err(cx)); + } else { + prepare_rename.await?; + } + } + 45..=49 => { + let definitions = project.update(cx, |project, cx| { + log::info!( + "Guest {}: requesting definitions for buffer {} ({:?})", + guest_id, + buffer.read(cx).remote_id(), + buffer.read(cx).file().unwrap().full_path(cx) + ); + let offset = rng.lock().gen_range(0..=buffer.read(cx).len()); + project.definition(&buffer, offset, cx) + }); + let definitions = cx.background().spawn(async move { + definitions + .await + .map_err(|err| anyhow!("definitions request failed: {:?}", err)) + }); + if rng.lock().gen_bool(0.3) { + log::info!("Guest {}: detaching definitions request", guest_id); + cx.update(|cx| definitions.detach_and_log_err(cx)); + } else { + client + .buffers + .extend(definitions.await?.into_iter().map(|loc| loc.buffer)); + } + } + 50..=54 => { + let highlights = project.update(cx, |project, cx| { + log::info!( + "Guest {}: requesting highlights for buffer {} ({:?})", + guest_id, + buffer.read(cx).remote_id(), + buffer.read(cx).file().unwrap().full_path(cx) + ); + let offset = rng.lock().gen_range(0..=buffer.read(cx).len()); + project.document_highlights(&buffer, offset, cx) + }); + let 
highlights = cx.background().spawn(async move { + highlights + .await + .map_err(|err| anyhow!("highlights request failed: {:?}", err)) + }); + if rng.lock().gen_bool(0.3) { + log::info!("Guest {}: detaching highlights request", guest_id); + cx.update(|cx| highlights.detach_and_log_err(cx)); + } else { + highlights.await?; + } + } + 55..=59 => { + let search = project.update(cx, |project, cx| { + let query = rng.lock().gen_range('a'..='z'); + log::info!("Guest {}: project-wide search {:?}", guest_id, query); + project.search(SearchQuery::text(query, false, false), cx) + }); + let search = cx.background().spawn(async move { + search + .await + .map_err(|err| anyhow!("search request failed: {:?}", err)) + }); + if rng.lock().gen_bool(0.3) { + log::info!("Guest {}: detaching search request", guest_id); + cx.update(|cx| search.detach_and_log_err(cx)); + } else { + client.buffers.extend(search.await?.into_keys()); + } + } + _ => { + buffer.update(cx, |buffer, cx| { + log::info!( + "Guest {}: updating buffer {} ({:?})", + guest_id, + buffer.remote_id(), + buffer.file().unwrap().full_path(cx) + ); + buffer.randomly_edit(&mut *rng.lock(), 5, cx) + }); } } - 20..=29 => { - let code_actions = project.update(&mut cx, |project, cx| { - log::info!( - "Guest {}: requesting code actions for buffer {} ({:?})", - guest_id, - buffer.read(cx).remote_id(), - buffer.read(cx).file().unwrap().full_path(cx) - ); - let range = buffer.read(cx).random_byte_range(0, &mut *rng.lock()); - project.code_actions(&buffer, range, cx) - }); - let code_actions = cx.background().spawn(async move { - code_actions.await.expect("code actions request failed"); - }); - if rng.lock().gen_bool(0.3) { - log::info!("Guest {}: detaching code actions request", guest_id); - code_actions.detach(); - } else { - code_actions.await; - } - } - 30..=39 if buffer.read_with(&cx, |buffer, _| buffer.is_dirty()) => { - let (requested_version, save) = buffer.update(&mut cx, |buffer, cx| { - log::info!( - "Guest {}: saving buffer {} ({:?})", - guest_id, - buffer.remote_id(), - buffer.file().unwrap().full_path(cx) - ); - (buffer.version(), buffer.save(cx)) - }); - let save = cx.background().spawn(async move { - let (saved_version, _) = save.await.expect("save request failed"); - assert!(saved_version.observed_all(&requested_version)); - }); - if rng.lock().gen_bool(0.3) { - log::info!("Guest {}: detaching save request", guest_id); - save.detach(); - } else { - save.await; - } - } - 40..=44 => { - let prepare_rename = project.update(&mut cx, |project, cx| { - log::info!( - "Guest {}: preparing rename for buffer {} ({:?})", - guest_id, - buffer.read(cx).remote_id(), - buffer.read(cx).file().unwrap().full_path(cx) - ); - let offset = rng.lock().gen_range(0..=buffer.read(cx).len()); - project.prepare_rename(buffer, offset, cx) - }); - let prepare_rename = cx.background().spawn(async move { - prepare_rename.await.expect("prepare rename request failed"); - }); - if rng.lock().gen_bool(0.3) { - log::info!("Guest {}: detaching prepare rename request", guest_id); - prepare_rename.detach(); - } else { - prepare_rename.await; - } - } - 45..=49 => { - let definitions = project.update(&mut cx, |project, cx| { - log::info!( - "Guest {}: requesting definitions for buffer {} ({:?})", - guest_id, - buffer.read(cx).remote_id(), - buffer.read(cx).file().unwrap().full_path(cx) - ); - let offset = rng.lock().gen_range(0..=buffer.read(cx).len()); - project.definition(&buffer, offset, cx) - }); - let definitions = cx.background().spawn(async move { - 
definitions.await.expect("definitions request failed") - }); - if rng.lock().gen_bool(0.3) { - log::info!("Guest {}: detaching definitions request", guest_id); - definitions.detach(); - } else { - self.buffers - .extend(definitions.await.into_iter().map(|loc| loc.buffer)); - } - } - 50..=54 => { - let highlights = project.update(&mut cx, |project, cx| { - log::info!( - "Guest {}: requesting highlights for buffer {} ({:?})", - guest_id, - buffer.read(cx).remote_id(), - buffer.read(cx).file().unwrap().full_path(cx) - ); - let offset = rng.lock().gen_range(0..=buffer.read(cx).len()); - project.document_highlights(&buffer, offset, cx) - }); - let highlights = cx.background().spawn(async move { - highlights.await.expect("highlights request failed"); - }); - if rng.lock().gen_bool(0.3) { - log::info!("Guest {}: detaching highlights request", guest_id); - highlights.detach(); - } else { - highlights.await; - } - } - 55..=59 => { - let search = project.update(&mut cx, |project, cx| { - let query = rng.lock().gen_range('a'..='z'); - log::info!("Guest {}: project-wide search {:?}", guest_id, query); - project.search(SearchQuery::text(query, false, false), cx) - }); - let search = cx - .background() - .spawn(async move { search.await.expect("search request failed") }); - if rng.lock().gen_bool(0.3) { - log::info!("Guest {}: detaching search request", guest_id); - search.detach(); - } else { - self.buffers.extend(search.await.into_keys()); - } - } - _ => { - buffer.update(&mut cx, |buffer, cx| { - log::info!( - "Guest {}: updating buffer {} ({:?})", - guest_id, - buffer.remote_id(), - buffer.file().unwrap().full_path(cx) - ); - buffer.randomly_edit(&mut *rng.lock(), 5, cx) - }); - } + cx.background().simulate_random_delay().await; } - cx.background().simulate_random_delay().await; + Ok(()) } - log::info!("Guest {} done", guest_id); + match simulate_guest_internal( + &mut self, + guest_id, + project.clone(), + operations, + max_operations, + rng, + &mut cx, + ) + .await + { + Ok(()) => log::info!("guest {} done", guest_id), + Err(err) => { + if host_disconnected.load(SeqCst) { + log::error!("guest {} simulation error - {:?}", guest_id, err); + } else { + panic!("guest {} simulation error - {:?}", guest_id, err); + } + } + } self.project = Some(project); (self, cx) From 3daaef02ca5e81999481cecfdb733e7c5b57946a Mon Sep 17 00:00:00 2001 From: Antonio Scandurra Date: Fri, 8 Apr 2022 12:03:09 +0200 Subject: [PATCH 04/11] Replace `postage::oneshot` with `futures::channel::oneshot` This fixes an error in the randomized test that would cause the future returned from `Worktree::share` to never finish due to a bug in `postage` that causes its waker to not be notified upon drop. 
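
As a sanity check, here is a minimal, self-contained sketch (illustrative
only, not part of this diff) of the behavior `Worktree::share` now relies
on: with `futures::channel::oneshot`, dropping the sender wakes the
receiver, and the pending `await` resolves with `Err(Canceled)` instead of
hanging forever.

    // Not from the codebase: a standalone demonstration of the
    // drop-notification behavior of futures' oneshot channel.
    fn main() {
        futures::executor::block_on(async {
            let (tx, rx) = futures::channel::oneshot::channel::<()>();
            // Simulate the sending side failing and dropping `tx`
            // without ever sending a value.
            drop(tx);
            // The receiver is woken and resolves with Err(Canceled),
            // rather than staying pending as it did with `postage`.
            assert!(rx.await.is_err());
        });
    }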
--- crates/project/src/worktree.rs | 19 ++++++++++--------- 1 file changed, 10 insertions(+), 9 deletions(-) diff --git a/crates/project/src/worktree.rs b/crates/project/src/worktree.rs index beacc5a863..ac0c4bf8ff 100644 --- a/crates/project/src/worktree.rs +++ b/crates/project/src/worktree.rs @@ -11,7 +11,10 @@ use client::{proto, Client, TypedEnvelope}; use clock::ReplicaId; use collections::HashMap; use futures::{ - channel::mpsc::{self, UnboundedSender}, + channel::{ + mpsc::{self, UnboundedSender}, + oneshot, + }, Stream, StreamExt, }; use fuzzy::CharBag; @@ -26,7 +29,6 @@ use language::{ use lazy_static::lazy_static; use parking_lot::Mutex; use postage::{ - oneshot, prelude::{Sink as _, Stream as _}, watch, }; @@ -727,11 +729,11 @@ impl LocalWorktree { pub fn share(&mut self, project_id: u64, cx: &mut ModelContext) -> Task> { let register = self.register(project_id, cx); - let (mut share_tx, mut share_rx) = oneshot::channel(); + let (share_tx, share_rx) = oneshot::channel(); let (snapshots_to_send_tx, snapshots_to_send_rx) = smol::channel::unbounded::(); if self.share.is_some() { - let _ = share_tx.try_send(Ok(())); + let _ = share_tx.send(Ok(())); } else { let rpc = self.client.clone(); let worktree_id = cx.model_id() as u64; @@ -756,15 +758,15 @@ impl LocalWorktree { }) .await { - let _ = share_tx.try_send(Err(error)); + let _ = share_tx.send(Err(error)); return Err(anyhow!("failed to send initial update worktree")); } else { - let _ = share_tx.try_send(Ok(())); + let _ = share_tx.send(Ok(())); snapshot } } Err(error) => { - let _ = share_tx.try_send(Err(error.into())); + let _ = share_tx.send(Err(error.into())); return Err(anyhow!("failed to send initial update worktree")); } }; @@ -804,9 +806,8 @@ impl LocalWorktree { }); } share_rx - .next() .await - .unwrap_or_else(|| Err(anyhow!("share ended"))) + .unwrap_or_else(|_| Err(anyhow!("share ended"))) }) } From 663beab1b9276991ef7c1ea70ba5a1ffc511b695 Mon Sep 17 00:00:00 2001 From: Antonio Scandurra Date: Fri, 8 Apr 2022 12:10:45 +0200 Subject: [PATCH 05/11] Avoid panicking when receiving a request for a dropped buffer --- crates/project/src/project.rs | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/crates/project/src/project.rs b/crates/project/src/project.rs index 3b2259d535..ef0531c36a 100644 --- a/crates/project/src/project.rs +++ b/crates/project/src/project.rs @@ -3850,7 +3850,7 @@ impl Project { let buffer = this .opened_buffers .get(&buffer_id) - .map(|buffer| buffer.upgrade(cx).unwrap()) + .and_then(|buffer| buffer.upgrade(cx)) .ok_or_else(|| anyhow!("unknown buffer id {}", buffer_id))?; Ok::<_, anyhow::Error>((project_id, buffer)) })?; @@ -3882,7 +3882,7 @@ impl Project { buffers.insert( this.opened_buffers .get(buffer_id) - .map(|buffer| buffer.upgrade(cx).unwrap()) + .and_then(|buffer| buffer.upgrade(cx)) .ok_or_else(|| anyhow!("unknown buffer id {}", buffer_id))?, ); } @@ -3911,7 +3911,7 @@ impl Project { buffers.insert( this.opened_buffers .get(buffer_id) - .map(|buffer| buffer.upgrade(cx).unwrap()) + .and_then(|buffer| buffer.upgrade(cx)) .ok_or_else(|| anyhow!("unknown buffer id {}", buffer_id))?, ); } @@ -3942,7 +3942,7 @@ impl Project { let buffer = this.read_with(&cx, |this, cx| { this.opened_buffers .get(&envelope.payload.buffer_id) - .map(|buffer| buffer.upgrade(cx).unwrap()) + .and_then(|buffer| buffer.upgrade(cx)) .ok_or_else(|| anyhow!("unknown buffer id {}", envelope.payload.buffer_id)) })?; buffer @@ -3972,7 +3972,7 @@ impl Project { let buffer = this .opened_buffers 
                .get(&envelope.payload.buffer_id)
-                .map(|buffer| buffer.upgrade(cx).unwrap())
+                .and_then(|buffer| buffer.upgrade(cx))
                 .ok_or_else(|| anyhow!("unknown buffer id {}", envelope.payload.buffer_id))?;
             let language = buffer.read(cx).language();
             let completion = language::proto::deserialize_completion(
@@ -4014,7 +4014,7 @@ impl Project {
         let buffer = this.update(&mut cx, |this, cx| {
             this.opened_buffers
                 .get(&envelope.payload.buffer_id)
-                .map(|buffer| buffer.upgrade(cx).unwrap())
+                .and_then(|buffer| buffer.upgrade(cx))
                 .ok_or_else(|| anyhow!("unknown buffer id {}", envelope.payload.buffer_id))
         })?;
         buffer
@@ -4055,7 +4055,7 @@ impl Project {
             let buffer = this
                 .opened_buffers
                 .get(&envelope.payload.buffer_id)
-                .map(|buffer| buffer.upgrade(cx).unwrap())
+                .and_then(|buffer| buffer.upgrade(cx))
                 .ok_or_else(|| anyhow!("unknown buffer id {}", envelope.payload.buffer_id))?;
             Ok::<_, anyhow::Error>(this.apply_code_action(buffer, action, false, cx))
         })?;

From c9942632254e52fef39821868c1cd00460bdd785 Mon Sep 17 00:00:00 2001
From: Antonio Scandurra
Date: Fri, 8 Apr 2022 14:14:45 +0200
Subject: [PATCH 06/11] Don't insert an empty vector in `Project::buffer_snapshots`

Other code paths rely on at least one version always being there, so we
should enforce that invariant everywhere.
---
 crates/project/src/project.rs | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/crates/project/src/project.rs b/crates/project/src/project.rs
index ef0531c36a..409e49e9fb 100644
--- a/crates/project/src/project.rs
+++ b/crates/project/src/project.rs
@@ -1216,7 +1216,7 @@ impl Project {
         let file = File::from_dyn(buffer.file())?;
         let abs_path = file.as_local()?.abs_path(cx);
         let uri = lsp::Url::from_file_path(abs_path).unwrap();
-        let buffer_snapshots = self.buffer_snapshots.entry(buffer.remote_id()).or_default();
+        let buffer_snapshots = self.buffer_snapshots.get_mut(&buffer.remote_id())?;
         let (version, prev_snapshot) = buffer_snapshots.last()?;
         let next_snapshot = buffer.text_snapshot();
         let next_version = version + 1;

From 222cd098388512c4e654b32062074e09c5d61d45 Mon Sep 17 00:00:00 2001
From: Antonio Scandurra
Date: Fri, 8 Apr 2022 14:41:30 +0200
Subject: [PATCH 07/11] Allow host to reconnect to the server in randomized
 test
---
 crates/server/src/rpc.rs | 10 ++++++----
 1 file changed, 6 insertions(+), 4 deletions(-)

diff --git a/crates/server/src/rpc.rs b/crates/server/src/rpc.rs
index 912d576322..866627ef48 100644
--- a/crates/server/src/rpc.rs
+++ b/crates/server/src/rpc.rs
@@ -5209,7 +5209,6 @@ mod tests {
                     log::info!("Guest {} added", guest_id);
                 } else if rng.lock().gen_bool(0.05) {
                     host_disconnected.store(true, SeqCst);
-                    server.forbid_connections();
                     server.disconnect_client(user_ids[0]);
                     cx.foreground().advance_clock(RECEIVE_TIMEOUT);
                     let mut clients = futures::future::join_all(clients).await;
@@ -5221,11 +5220,14 @@ mod tests {
                         .unwrap()
                         .read_with(&host_cx, |project, _| assert!(!project.is_shared()));
                     for (guest, mut guest_cx) in clients {
-                        assert!(server
+                        let contacts = server
                             .store
                             .read()
-                            .contacts_for_user(guest.current_user_id(&guest_cx))
-                            .is_empty());
+                            .contacts_for_user(guest.current_user_id(&guest_cx));
+                        assert!(!contacts
+                            .iter()
+                            .flat_map(|contact| &contact.projects)
+                            .any(|project| project.id == host_project_id));
                         guest
                             .project
                             .as_ref()

From 32fd4eb3ac4796cdb1aae45647459c0d78b0105f Mon Sep 17 00:00:00 2001
From: Antonio Scandurra
Date: Fri, 8 Apr 2022 14:41:56 +0200
Subject: [PATCH 08/11] Insert project id in connection's projects during
 project registration

...in contrast to doing so during worktree registration.

This fixes a randomized test failure that would panic because store
invariants were violated. This would happen when a peer disconnected
before it had a chance to register a worktree: when removing all the
state associated with that peer upon disconnection, we would notice the
registered project but fail to find it in the peer's connection state.
---
 crates/server/src/rpc/store.rs | 7 ++++---
 1 file changed, 4 insertions(+), 3 deletions(-)

diff --git a/crates/server/src/rpc/store.rs b/crates/server/src/rpc/store.rs
index 6f5252fecf..6c330c9c8b 100644
--- a/crates/server/src/rpc/store.rs
+++ b/crates/server/src/rpc/store.rs
@@ -244,6 +244,9 @@ impl Store {
                 language_servers: Default::default(),
             },
         );
+        if let Some(connection) = self.connections.get_mut(&host_connection_id) {
+            connection.projects.insert(project_id);
+        }
         self.next_project_id += 1;
         project_id
     }
@@ -266,9 +269,7 @@ impl Store {
                 .or_default()
                 .insert(project_id);
         }
-        if let Some(connection) = self.connections.get_mut(&project.host_connection_id) {
-            connection.projects.insert(project_id);
-        }
+
         project.worktrees.insert(worktree_id, worktree);
         if let Ok(share) = project.share_mut() {
             share.worktrees.insert(worktree_id, Default::default());

From 24cb44fb00953171e68a5a78ccf88e0489212505 Mon Sep 17 00:00:00 2001
From: Antonio Scandurra
Date: Fri, 8 Apr 2022 16:04:03 +0200
Subject: [PATCH 09/11] Remove `postage` from `rpc`

Co-Authored-By: Nathan Sobo
---
 Cargo.lock                |  1 -
 crates/client/src/test.rs |  6 +-----
 crates/rpc/Cargo.toml     |  1 -
 crates/rpc/src/conn.rs    | 36 ++++++++++++++++++------------------
 crates/rpc/src/peer.rs    | 34 +++++++++++++++++-----------------
 crates/server/src/rpc.rs  | 13 ++++++++-----
 6 files changed, 44 insertions(+), 47 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index 7cbba16a00..b8d834c1d4 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -4012,7 +4012,6 @@ dependencies = [
  "gpui",
  "log",
  "parking_lot",
- "postage",
  "prost",
  "prost-build",
  "rand 0.8.3",
diff --git a/crates/client/src/test.rs b/crates/client/src/test.rs
index 35a8e85922..5417f2b51d 100644
--- a/crates/client/src/test.rs
+++ b/crates/client/src/test.rs
@@ -6,7 +6,6 @@ use anyhow::{anyhow, Result};
 use futures::{future::BoxFuture, stream::BoxStream, Future, StreamExt};
 use gpui::{executor, ModelHandle, TestAppContext};
 use parking_lot::Mutex;
-use postage::barrier;
 use rpc::{proto, ConnectionId, Peer, Receipt, TypedEnvelope};
 use std::{fmt, rc::Rc, sync::Arc};

@@ -23,7 +22,6 @@ struct FakeServerState {
     connection_id: Option<ConnectionId>,
     forbid_connections: bool,
     auth_count: usize,
-    connection_killer: Option<barrier::Sender>,
     access_token: usize,
 }

@@ -76,15 +74,13 @@ impl FakeServer {
                 Err(EstablishConnectionError::Unauthorized)?
} - let (client_conn, server_conn, kill) = - Connection::in_memory(cx.background()); + let (client_conn, server_conn, _) = Connection::in_memory(cx.background()); let (connection_id, io, incoming) = peer.add_test_connection(server_conn, cx.background()).await; cx.background().spawn(io).detach(); let mut state = state.lock(); state.connection_id = Some(connection_id); state.incoming = Some(incoming); - state.connection_killer = Some(kill); Ok(client_conn) }) } diff --git a/crates/rpc/Cargo.toml b/crates/rpc/Cargo.toml index 9a2cb165c7..1425f408c6 100644 --- a/crates/rpc/Cargo.toml +++ b/crates/rpc/Cargo.toml @@ -23,7 +23,6 @@ base64 = "0.13" futures = "0.3" log = "0.4" parking_lot = "0.11.1" -postage = { version = "0.4.1", features = ["futures-traits"] } prost = "0.8" rand = "0.8" rsa = "0.4" diff --git a/crates/rpc/src/conn.rs b/crates/rpc/src/conn.rs index a97797fc9d..53ba00a3c0 100644 --- a/crates/rpc/src/conn.rs +++ b/crates/rpc/src/conn.rs @@ -35,21 +35,24 @@ impl Connection { #[cfg(any(test, feature = "test-support"))] pub fn in_memory( executor: std::sync::Arc, - ) -> (Self, Self, postage::barrier::Sender) { - use postage::prelude::Stream; + ) -> (Self, Self, std::sync::Arc) { + use std::sync::{ + atomic::{AtomicBool, Ordering::SeqCst}, + Arc, + }; - let (kill_tx, kill_rx) = postage::barrier::channel(); - let (a_tx, a_rx) = channel(kill_rx.clone(), executor.clone()); - let (b_tx, b_rx) = channel(kill_rx, executor); + let killed = Arc::new(AtomicBool::new(false)); + let (a_tx, a_rx) = channel(killed.clone(), executor.clone()); + let (b_tx, b_rx) = channel(killed.clone(), executor); return ( Self { tx: a_tx, rx: b_rx }, Self { tx: b_tx, rx: a_rx }, - kill_tx, + killed, ); fn channel( - kill_rx: postage::barrier::Receiver, - executor: std::sync::Arc, + killed: Arc, + executor: Arc, ) -> ( Box>, Box< @@ -57,20 +60,17 @@ impl Connection { >, ) { use futures::channel::mpsc; - use std::{ - io::{Error, ErrorKind}, - sync::Arc, - }; + use std::io::{Error, ErrorKind}; let (tx, rx) = mpsc::unbounded::(); let tx = tx .sink_map_err(|e| WebSocketError::from(Error::new(ErrorKind::Other, e))) .with({ - let kill_rx = kill_rx.clone(); + let killed = killed.clone(); let executor = Arc::downgrade(&executor); move |msg| { - let mut kill_rx = kill_rx.clone(); + let killed = killed.clone(); let executor = executor.clone(); Box::pin(async move { if let Some(executor) = executor.upgrade() { @@ -78,7 +78,7 @@ impl Connection { } // Writes to a half-open TCP connection will error. - if kill_rx.try_recv().is_ok() { + if killed.load(SeqCst) { std::io::Result::Err( Error::new(ErrorKind::Other, "connection lost").into(), )?; @@ -90,10 +90,10 @@ impl Connection { }); let rx = rx.then({ - let kill_rx = kill_rx.clone(); + let killed = killed.clone(); let executor = Arc::downgrade(&executor); move |msg| { - let mut kill_rx = kill_rx.clone(); + let killed = killed.clone(); let executor = executor.clone(); Box::pin(async move { if let Some(executor) = executor.upgrade() { @@ -101,7 +101,7 @@ impl Connection { } // Reads from a half-open TCP connection will hang. 
- if kill_rx.try_recv().is_ok() { + if killed.load(SeqCst) { futures::future::pending::<()>().await; } diff --git a/crates/rpc/src/peer.rs b/crates/rpc/src/peer.rs index 3677f0feac..5efee616e1 100644 --- a/crates/rpc/src/peer.rs +++ b/crates/rpc/src/peer.rs @@ -4,13 +4,13 @@ use super::{ }; use anyhow::{anyhow, Context, Result}; use collections::HashMap; -use futures::{channel::oneshot, stream::BoxStream, FutureExt as _, StreamExt}; -use parking_lot::{Mutex, RwLock}; -use postage::{ - barrier, mpsc, - prelude::{Sink as _, Stream as _}, +use futures::{ + channel::{mpsc, oneshot}, + stream::BoxStream, + FutureExt, SinkExt, StreamExt, }; -use smol_timeout::TimeoutExt as _; +use parking_lot::{Mutex, RwLock}; +use smol_timeout::TimeoutExt; use std::sync::atomic::Ordering::SeqCst; use std::{ fmt, @@ -90,10 +90,10 @@ pub struct Peer { #[derive(Clone)] pub struct ConnectionState { - outgoing_tx: futures::channel::mpsc::UnboundedSender, + outgoing_tx: mpsc::UnboundedSender, next_message_id: Arc, response_channels: - Arc>>>>, + Arc)>>>>>, } const KEEPALIVE_INTERVAL: Duration = Duration::from_secs(1); @@ -127,7 +127,7 @@ impl Peer { // bounded channel so that other peers will receive backpressure if they send // messages faster than this peer can process them. let (mut incoming_tx, incoming_rx) = mpsc::channel(64); - let (outgoing_tx, mut outgoing_rx) = futures::channel::mpsc::unbounded(); + let (outgoing_tx, mut outgoing_rx) = mpsc::unbounded(); let connection_id = ConnectionId(self.next_connection_id.fetch_add(1, SeqCst)); let connection_state = ConnectionState { @@ -208,14 +208,14 @@ impl Peer { if let Some(responding_to) = incoming.responding_to { let channel = response_channels.lock().as_mut()?.remove(&responding_to); if let Some(tx) = channel { - let mut requester_resumed = barrier::channel(); + let requester_resumed = oneshot::channel(); if let Err(error) = tx.send((incoming, requester_resumed.0)) { log::debug!( "received RPC but request future was dropped {:?}", error.0 ); } - requester_resumed.1.recv().await; + let _ = requester_resumed.1.await; } else { log::warn!("received RPC response to unknown request {}", responding_to); } @@ -721,26 +721,26 @@ mod tests { .add_test_connection(client_conn, cx.background()) .await; - let (mut io_ended_tx, mut io_ended_rx) = postage::barrier::channel(); + let (io_ended_tx, io_ended_rx) = oneshot::channel(); executor .spawn(async move { io_handler.await.ok(); - io_ended_tx.send(()).await.unwrap(); + io_ended_tx.send(()).unwrap(); }) .detach(); - let (mut messages_ended_tx, mut messages_ended_rx) = postage::barrier::channel(); + let (messages_ended_tx, messages_ended_rx) = oneshot::channel(); executor .spawn(async move { incoming.next().await; - messages_ended_tx.send(()).await.unwrap(); + messages_ended_tx.send(()).unwrap(); }) .detach(); client.disconnect(connection_id); - io_ended_rx.recv().await; - messages_ended_rx.recv().await; + let _ = io_ended_rx.await; + let _ = messages_ended_rx.await; assert!(server_conn .send(WebSocketMessage::Binary(vec![])) .await diff --git a/crates/server/src/rpc.rs b/crates/server/src/rpc.rs index 866627ef48..e2d64b1abf 100644 --- a/crates/server/src/rpc.rs +++ b/crates/server/src/rpc.rs @@ -1094,7 +1094,6 @@ mod tests { }; use lsp::{self, FakeLanguageServer}; use parking_lot::Mutex; - use postage::barrier; use project::{ fs::{FakeFs, Fs as _}, search::SearchQuery, @@ -5350,7 +5349,7 @@ mod tests { server: Arc, foreground: Rc, notifications: mpsc::UnboundedReceiver<()>, - connection_killers: Arc>>, + connection_killers: 
Arc>>>, forbid_connections: Arc, _test_db: TestDb, } @@ -5418,9 +5417,9 @@ mod tests { "server is forbidding connections" ))) } else { - let (client_conn, server_conn, kill_conn) = + let (client_conn, server_conn, killed) = Connection::in_memory(cx.background()); - connection_killers.lock().insert(user_id, kill_conn); + connection_killers.lock().insert(user_id, killed); cx.background() .spawn(server.handle_connection( server_conn, @@ -5462,7 +5461,11 @@ mod tests { } fn disconnect_client(&self, user_id: UserId) { - self.connection_killers.lock().remove(&user_id); + self.connection_killers + .lock() + .remove(&user_id) + .unwrap() + .store(true, SeqCst); } fn forbid_connections(&self) { From 0b1fda3e13d55a9471a6bd9b279d6b88cc47443f Mon Sep 17 00:00:00 2001 From: Antonio Scandurra Date: Fri, 8 Apr 2022 16:14:54 +0200 Subject: [PATCH 10/11] Remove `postage` from `zed-server` Co-Authored-By: Nathan Sobo --- Cargo.lock | 1 - crates/server/Cargo.toml | 1 - 2 files changed, 2 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index b8d834c1d4..6903812547 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6122,7 +6122,6 @@ dependencies = [ "oauth2", "oauth2-surf", "parking_lot", - "postage", "project", "rand 0.8.3", "rpc", diff --git a/crates/server/Cargo.toml b/crates/server/Cargo.toml index f6d10517bd..fe1a63f5f1 100644 --- a/crates/server/Cargo.toml +++ b/crates/server/Cargo.toml @@ -33,7 +33,6 @@ lipsum = { version = "0.8", optional = true } oauth2 = { version = "4.0.0", default_features = false } oauth2-surf = "0.1.1" parking_lot = "0.11.1" -postage = { version = "0.4.1", features = ["futures-traits"] } rand = "0.8" rust-embed = { version = "6.3", features = ["include-exclude"] } scrypt = "0.7" From 53a7f9c43e5be422475bd9a5f5fa42093a66a13b Mon Sep 17 00:00:00 2001 From: Antonio Scandurra Date: Fri, 8 Apr 2022 16:21:39 +0200 Subject: [PATCH 11/11] Introduce a timeout when processing incoming messages We have an hypothesis that the server gets stuck while processing an incoming message, either because the buffer fills up or because a handler never completes. This should mitigate that and, once we add logging, give us some clue as to what is causing it exactly. Co-Authored-By: Nathan Sobo --- crates/rpc/src/peer.rs | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/crates/rpc/src/peer.rs b/crates/rpc/src/peer.rs index 5efee616e1..76fd6aac18 100644 --- a/crates/rpc/src/peer.rs +++ b/crates/rpc/src/peer.rs @@ -175,8 +175,10 @@ impl Peer { let incoming = incoming.context("received invalid RPC message")?; receive_timeout.set(create_timer(RECEIVE_TIMEOUT).fuse()); if let proto::Message::Envelope(incoming) = incoming { - if incoming_tx.send(incoming).await.is_err() { - return Ok(()); + match incoming_tx.send(incoming).timeout(RECEIVE_TIMEOUT).await { + Some(Ok(_)) => {}, + Some(Err(_)) => return Ok(()), + None => Err(anyhow!("timed out processing incoming message"))?, } } break;