diff --git a/Cargo.lock b/Cargo.lock index e4b99c6566..5e5c046ab9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4007,11 +4007,11 @@ dependencies = [ "async-tungstenite", "base64 0.13.0", "clock", + "collections", "futures", "gpui", "log", "parking_lot", - "postage", "prost", "prost-build", "rand 0.8.3", @@ -6122,7 +6122,6 @@ dependencies = [ "oauth2", "oauth2-surf", "parking_lot", - "postage", "project", "rand 0.8.3", "rpc", diff --git a/crates/client/src/test.rs b/crates/client/src/test.rs index 35a8e85922..5417f2b51d 100644 --- a/crates/client/src/test.rs +++ b/crates/client/src/test.rs @@ -6,7 +6,6 @@ use anyhow::{anyhow, Result}; use futures::{future::BoxFuture, stream::BoxStream, Future, StreamExt}; use gpui::{executor, ModelHandle, TestAppContext}; use parking_lot::Mutex; -use postage::barrier; use rpc::{proto, ConnectionId, Peer, Receipt, TypedEnvelope}; use std::{fmt, rc::Rc, sync::Arc}; @@ -23,7 +22,6 @@ struct FakeServerState { connection_id: Option, forbid_connections: bool, auth_count: usize, - connection_killer: Option, access_token: usize, } @@ -76,15 +74,13 @@ impl FakeServer { Err(EstablishConnectionError::Unauthorized)? } - let (client_conn, server_conn, kill) = - Connection::in_memory(cx.background()); + let (client_conn, server_conn, _) = Connection::in_memory(cx.background()); let (connection_id, io, incoming) = peer.add_test_connection(server_conn, cx.background()).await; cx.background().spawn(io).detach(); let mut state = state.lock(); state.connection_id = Some(connection_id); state.incoming = Some(incoming); - state.connection_killer = Some(kill); Ok(client_conn) }) } diff --git a/crates/project/src/project.rs b/crates/project/src/project.rs index 3b2259d535..409e49e9fb 100644 --- a/crates/project/src/project.rs +++ b/crates/project/src/project.rs @@ -1216,7 +1216,7 @@ impl Project { let file = File::from_dyn(buffer.file())?; let abs_path = file.as_local()?.abs_path(cx); let uri = lsp::Url::from_file_path(abs_path).unwrap(); - let buffer_snapshots = self.buffer_snapshots.entry(buffer.remote_id()).or_default(); + let buffer_snapshots = self.buffer_snapshots.get_mut(&buffer.remote_id())?; let (version, prev_snapshot) = buffer_snapshots.last()?; let next_snapshot = buffer.text_snapshot(); let next_version = version + 1; @@ -3850,7 +3850,7 @@ impl Project { let buffer = this .opened_buffers .get(&buffer_id) - .map(|buffer| buffer.upgrade(cx).unwrap()) + .and_then(|buffer| buffer.upgrade(cx)) .ok_or_else(|| anyhow!("unknown buffer id {}", buffer_id))?; Ok::<_, anyhow::Error>((project_id, buffer)) })?; @@ -3882,7 +3882,7 @@ impl Project { buffers.insert( this.opened_buffers .get(buffer_id) - .map(|buffer| buffer.upgrade(cx).unwrap()) + .and_then(|buffer| buffer.upgrade(cx)) .ok_or_else(|| anyhow!("unknown buffer id {}", buffer_id))?, ); } @@ -3911,7 +3911,7 @@ impl Project { buffers.insert( this.opened_buffers .get(buffer_id) - .map(|buffer| buffer.upgrade(cx).unwrap()) + .and_then(|buffer| buffer.upgrade(cx)) .ok_or_else(|| anyhow!("unknown buffer id {}", buffer_id))?, ); } @@ -3942,7 +3942,7 @@ impl Project { let buffer = this.read_with(&cx, |this, cx| { this.opened_buffers .get(&envelope.payload.buffer_id) - .map(|buffer| buffer.upgrade(cx).unwrap()) + .and_then(|buffer| buffer.upgrade(cx)) .ok_or_else(|| anyhow!("unknown buffer id {}", envelope.payload.buffer_id)) })?; buffer @@ -3972,7 +3972,7 @@ impl Project { let buffer = this .opened_buffers .get(&envelope.payload.buffer_id) - .map(|buffer| buffer.upgrade(cx).unwrap()) + .and_then(|buffer| buffer.upgrade(cx)) .ok_or_else(|| anyhow!("unknown buffer id {}", envelope.payload.buffer_id))?; let language = buffer.read(cx).language(); let completion = language::proto::deserialize_completion( @@ -4014,7 +4014,7 @@ impl Project { let buffer = this.update(&mut cx, |this, cx| { this.opened_buffers .get(&envelope.payload.buffer_id) - .map(|buffer| buffer.upgrade(cx).unwrap()) + .and_then(|buffer| buffer.upgrade(cx)) .ok_or_else(|| anyhow!("unknown buffer id {}", envelope.payload.buffer_id)) })?; buffer @@ -4055,7 +4055,7 @@ impl Project { let buffer = this .opened_buffers .get(&envelope.payload.buffer_id) - .map(|buffer| buffer.upgrade(cx).unwrap()) + .and_then(|buffer| buffer.upgrade(cx)) .ok_or_else(|| anyhow!("unknown buffer id {}", envelope.payload.buffer_id))?; Ok::<_, anyhow::Error>(this.apply_code_action(buffer, action, false, cx)) })?; diff --git a/crates/project/src/worktree.rs b/crates/project/src/worktree.rs index beacc5a863..ac0c4bf8ff 100644 --- a/crates/project/src/worktree.rs +++ b/crates/project/src/worktree.rs @@ -11,7 +11,10 @@ use client::{proto, Client, TypedEnvelope}; use clock::ReplicaId; use collections::HashMap; use futures::{ - channel::mpsc::{self, UnboundedSender}, + channel::{ + mpsc::{self, UnboundedSender}, + oneshot, + }, Stream, StreamExt, }; use fuzzy::CharBag; @@ -26,7 +29,6 @@ use language::{ use lazy_static::lazy_static; use parking_lot::Mutex; use postage::{ - oneshot, prelude::{Sink as _, Stream as _}, watch, }; @@ -727,11 +729,11 @@ impl LocalWorktree { pub fn share(&mut self, project_id: u64, cx: &mut ModelContext) -> Task> { let register = self.register(project_id, cx); - let (mut share_tx, mut share_rx) = oneshot::channel(); + let (share_tx, share_rx) = oneshot::channel(); let (snapshots_to_send_tx, snapshots_to_send_rx) = smol::channel::unbounded::(); if self.share.is_some() { - let _ = share_tx.try_send(Ok(())); + let _ = share_tx.send(Ok(())); } else { let rpc = self.client.clone(); let worktree_id = cx.model_id() as u64; @@ -756,15 +758,15 @@ impl LocalWorktree { }) .await { - let _ = share_tx.try_send(Err(error)); + let _ = share_tx.send(Err(error)); return Err(anyhow!("failed to send initial update worktree")); } else { - let _ = share_tx.try_send(Ok(())); + let _ = share_tx.send(Ok(())); snapshot } } Err(error) => { - let _ = share_tx.try_send(Err(error.into())); + let _ = share_tx.send(Err(error.into())); return Err(anyhow!("failed to send initial update worktree")); } }; @@ -804,9 +806,8 @@ impl LocalWorktree { }); } share_rx - .next() .await - .unwrap_or_else(|| Err(anyhow!("share ended"))) + .unwrap_or_else(|_| Err(anyhow!("share ended"))) }) } diff --git a/crates/rpc/Cargo.toml b/crates/rpc/Cargo.toml index e773b3f0ba..1425f408c6 100644 --- a/crates/rpc/Cargo.toml +++ b/crates/rpc/Cargo.toml @@ -9,9 +9,13 @@ path = "src/rpc.rs" doctest = false [features] -test-support = ["gpui/test-support"] +test-support = ["collections/test-support", "gpui/test-support"] [dependencies] +clock = { path = "../clock" } +collections = { path = "../collections" } +gpui = { path = "../gpui", optional = true } +util = { path = "../util" } anyhow = "1.0" async-lock = "2.4" async-tungstenite = "0.16" @@ -19,21 +23,18 @@ base64 = "0.13" futures = "0.3" log = "0.4" parking_lot = "0.11.1" -postage = { version = "0.4.1", features = ["futures-traits"] } prost = "0.8" rand = "0.8" rsa = "0.4" serde = { version = "1", features = ["derive"] } smol-timeout = "0.6" zstd = "0.9" -clock = { path = "../clock" } -gpui = { path = "../gpui", optional = true } -util = { path = "../util" } [build-dependencies] prost-build = "0.8" [dev-dependencies] +collections = { path = "../collections", features = ["test-support"] } gpui = { path = "../gpui", features = ["test-support"] } smol = "1.2.5" tempdir = "0.3.7" diff --git a/crates/rpc/src/conn.rs b/crates/rpc/src/conn.rs index a97797fc9d..53ba00a3c0 100644 --- a/crates/rpc/src/conn.rs +++ b/crates/rpc/src/conn.rs @@ -35,21 +35,24 @@ impl Connection { #[cfg(any(test, feature = "test-support"))] pub fn in_memory( executor: std::sync::Arc, - ) -> (Self, Self, postage::barrier::Sender) { - use postage::prelude::Stream; + ) -> (Self, Self, std::sync::Arc) { + use std::sync::{ + atomic::{AtomicBool, Ordering::SeqCst}, + Arc, + }; - let (kill_tx, kill_rx) = postage::barrier::channel(); - let (a_tx, a_rx) = channel(kill_rx.clone(), executor.clone()); - let (b_tx, b_rx) = channel(kill_rx, executor); + let killed = Arc::new(AtomicBool::new(false)); + let (a_tx, a_rx) = channel(killed.clone(), executor.clone()); + let (b_tx, b_rx) = channel(killed.clone(), executor); return ( Self { tx: a_tx, rx: b_rx }, Self { tx: b_tx, rx: a_rx }, - kill_tx, + killed, ); fn channel( - kill_rx: postage::barrier::Receiver, - executor: std::sync::Arc, + killed: Arc, + executor: Arc, ) -> ( Box>, Box< @@ -57,20 +60,17 @@ impl Connection { >, ) { use futures::channel::mpsc; - use std::{ - io::{Error, ErrorKind}, - sync::Arc, - }; + use std::io::{Error, ErrorKind}; let (tx, rx) = mpsc::unbounded::(); let tx = tx .sink_map_err(|e| WebSocketError::from(Error::new(ErrorKind::Other, e))) .with({ - let kill_rx = kill_rx.clone(); + let killed = killed.clone(); let executor = Arc::downgrade(&executor); move |msg| { - let mut kill_rx = kill_rx.clone(); + let killed = killed.clone(); let executor = executor.clone(); Box::pin(async move { if let Some(executor) = executor.upgrade() { @@ -78,7 +78,7 @@ impl Connection { } // Writes to a half-open TCP connection will error. - if kill_rx.try_recv().is_ok() { + if killed.load(SeqCst) { std::io::Result::Err( Error::new(ErrorKind::Other, "connection lost").into(), )?; @@ -90,10 +90,10 @@ impl Connection { }); let rx = rx.then({ - let kill_rx = kill_rx.clone(); + let killed = killed.clone(); let executor = Arc::downgrade(&executor); move |msg| { - let mut kill_rx = kill_rx.clone(); + let killed = killed.clone(); let executor = executor.clone(); Box::pin(async move { if let Some(executor) = executor.upgrade() { @@ -101,7 +101,7 @@ impl Connection { } // Reads from a half-open TCP connection will hang. - if kill_rx.try_recv().is_ok() { + if killed.load(SeqCst) { futures::future::pending::<()>().await; } diff --git a/crates/rpc/src/peer.rs b/crates/rpc/src/peer.rs index 726453bea8..76fd6aac18 100644 --- a/crates/rpc/src/peer.rs +++ b/crates/rpc/src/peer.rs @@ -1,16 +1,18 @@ -use super::proto::{self, AnyTypedEnvelope, EnvelopedMessage, MessageStream, RequestMessage}; -use super::Connection; -use anyhow::{anyhow, Context, Result}; -use futures::{channel::oneshot, stream::BoxStream, FutureExt as _, StreamExt}; -use parking_lot::{Mutex, RwLock}; -use postage::{ - barrier, mpsc, - prelude::{Sink as _, Stream as _}, +use super::{ + proto::{self, AnyTypedEnvelope, EnvelopedMessage, MessageStream, RequestMessage}, + Connection, }; -use smol_timeout::TimeoutExt as _; +use anyhow::{anyhow, Context, Result}; +use collections::HashMap; +use futures::{ + channel::{mpsc, oneshot}, + stream::BoxStream, + FutureExt, SinkExt, StreamExt, +}; +use parking_lot::{Mutex, RwLock}; +use smol_timeout::TimeoutExt; use std::sync::atomic::Ordering::SeqCst; use std::{ - collections::HashMap, fmt, future::Future, marker::PhantomData, @@ -88,10 +90,10 @@ pub struct Peer { #[derive(Clone)] pub struct ConnectionState { - outgoing_tx: futures::channel::mpsc::UnboundedSender, + outgoing_tx: mpsc::UnboundedSender, next_message_id: Arc, response_channels: - Arc>>>>, + Arc)>>>>>, } const KEEPALIVE_INTERVAL: Duration = Duration::from_secs(1); @@ -125,7 +127,7 @@ impl Peer { // bounded channel so that other peers will receive backpressure if they send // messages faster than this peer can process them. let (mut incoming_tx, incoming_rx) = mpsc::channel(64); - let (outgoing_tx, mut outgoing_rx) = futures::channel::mpsc::unbounded(); + let (outgoing_tx, mut outgoing_rx) = mpsc::unbounded(); let connection_id = ConnectionId(self.next_connection_id.fetch_add(1, SeqCst)); let connection_state = ConnectionState { @@ -173,8 +175,10 @@ impl Peer { let incoming = incoming.context("received invalid RPC message")?; receive_timeout.set(create_timer(RECEIVE_TIMEOUT).fuse()); if let proto::Message::Envelope(incoming) = incoming { - if incoming_tx.send(incoming).await.is_err() { - return Ok(()); + match incoming_tx.send(incoming).timeout(RECEIVE_TIMEOUT).await { + Some(Ok(_)) => {}, + Some(Err(_)) => return Ok(()), + None => Err(anyhow!("timed out processing incoming message"))?, } } break; @@ -206,14 +210,14 @@ impl Peer { if let Some(responding_to) = incoming.responding_to { let channel = response_channels.lock().as_mut()?.remove(&responding_to); if let Some(tx) = channel { - let mut requester_resumed = barrier::channel(); + let requester_resumed = oneshot::channel(); if let Err(error) = tx.send((incoming, requester_resumed.0)) { log::debug!( "received RPC but request future was dropped {:?}", error.0 ); } - requester_resumed.1.recv().await; + let _ = requester_resumed.1.await; } else { log::warn!("received RPC response to unknown request {}", responding_to); } @@ -719,26 +723,26 @@ mod tests { .add_test_connection(client_conn, cx.background()) .await; - let (mut io_ended_tx, mut io_ended_rx) = postage::barrier::channel(); + let (io_ended_tx, io_ended_rx) = oneshot::channel(); executor .spawn(async move { io_handler.await.ok(); - io_ended_tx.send(()).await.unwrap(); + io_ended_tx.send(()).unwrap(); }) .detach(); - let (mut messages_ended_tx, mut messages_ended_rx) = postage::barrier::channel(); + let (messages_ended_tx, messages_ended_rx) = oneshot::channel(); executor .spawn(async move { incoming.next().await; - messages_ended_tx.send(()).await.unwrap(); + messages_ended_tx.send(()).unwrap(); }) .detach(); client.disconnect(connection_id); - io_ended_rx.recv().await; - messages_ended_rx.recv().await; + let _ = io_ended_rx.await; + let _ = messages_ended_rx.await; assert!(server_conn .send(WebSocketMessage::Binary(vec![])) .await diff --git a/crates/server/Cargo.toml b/crates/server/Cargo.toml index 6e27fa16c5..fe1a63f5f1 100644 --- a/crates/server/Cargo.toml +++ b/crates/server/Cargo.toml @@ -14,7 +14,6 @@ required-features = ["seed-support"] [dependencies] collections = { path = "../collections" } -settings = { path = "../settings" } rpc = { path = "../rpc" } anyhow = "1.0.40" async-io = "1.3" @@ -34,7 +33,6 @@ lipsum = { version = "0.8", optional = true } oauth2 = { version = "4.0.0", default_features = false } oauth2-surf = "0.1.1" parking_lot = "0.11.1" -postage = { version = "0.4.1", features = ["futures-traits"] } rand = "0.8" rust-embed = { version = "6.3", features = ["include-exclude"] } scrypt = "0.7" @@ -65,6 +63,7 @@ editor = { path = "../editor", features = ["test-support"] } language = { path = "../language", features = ["test-support"] } lsp = { path = "../lsp", features = ["test-support"] } project = { path = "../project", features = ["test-support"] } +settings = { path = "../settings", features = ["test-support"] } workspace = { path = "../workspace", features = ["test-support"] } ctor = "0.1" env_logger = "0.8" diff --git a/crates/server/src/rpc.rs b/crates/server/src/rpc.rs index a8cb863935..efa19653ae 100644 --- a/crates/server/src/rpc.rs +++ b/crates/server/src/rpc.rs @@ -1080,7 +1080,7 @@ mod tests { use ::rpc::Peer; use client::{ self, test::FakeHttpClient, Channel, ChannelDetails, ChannelList, Client, Credentials, - EstablishConnectionError, UserStore, + EstablishConnectionError, UserStore, RECEIVE_TIMEOUT, }; use collections::BTreeMap; use editor::{ @@ -1094,7 +1094,6 @@ mod tests { }; use lsp::{self, FakeLanguageServer}; use parking_lot::Mutex; - use postage::barrier; use project::{ fs::{FakeFs, Fs as _}, search::SearchQuery, @@ -1118,6 +1117,7 @@ mod tests { }, time::Duration, }; + use util::TryFutureExt; use workspace::{Item, SplitDirection, ToggleFollow, Workspace, WorkspaceParams}; #[cfg(test)] @@ -4995,6 +4995,7 @@ mod tests { let operations = Rc::new(Cell::new(0)); let mut server = TestServer::start(cx.foreground(), cx.background()).await; let mut clients = Vec::new(); + let mut user_ids = Vec::new(); let files = Arc::new(Mutex::new(Vec::new())); let mut next_entity_id = 100000; @@ -5162,6 +5163,8 @@ mod tests { }); host_language_registry.add(Arc::new(language)); + let host_disconnected = Rc::new(AtomicBool::new(false)); + user_ids.push(host.current_user_id(&host_cx)); clients.push(cx.foreground().spawn(host.simulate_host( host_project, files, @@ -5203,16 +5206,49 @@ mod tests { ) .await .unwrap(); + user_ids.push(guest.current_user_id(&guest_cx)); clients.push(cx.foreground().spawn(guest.simulate_guest( guest_id, guest_project, operations.clone(), max_operations, rng.clone(), + host_disconnected.clone(), guest_cx, ))); log::info!("Guest {} added", guest_id); + } else if rng.lock().gen_bool(0.05) { + host_disconnected.store(true, SeqCst); + server.disconnect_client(user_ids[0]); + cx.foreground().advance_clock(RECEIVE_TIMEOUT); + let mut clients = futures::future::join_all(clients).await; + cx.foreground().run_until_parked(); + + let (host, mut host_cx) = clients.remove(0); + host.project + .as_ref() + .unwrap() + .read_with(&host_cx, |project, _| assert!(!project.is_shared())); + for (guest, mut guest_cx) in clients { + let contacts = server + .store + .read() + .contacts_for_user(guest.current_user_id(&guest_cx)); + assert!(!contacts + .iter() + .flat_map(|contact| &contact.projects) + .any(|project| project.id == host_project_id)); + guest + .project + .as_ref() + .unwrap() + .read_with(&guest_cx, |project, _| assert!(project.is_read_only())); + guest_cx.update(|_| drop(guest)); + } + host_cx.update(|_| drop(host)); + + return; } } @@ -5325,7 +5361,7 @@ mod tests { server: Arc, foreground: Rc, notifications: mpsc::UnboundedReceiver<()>, - connection_killers: Arc>>, + connection_killers: Arc>>>, forbid_connections: Arc, _test_db: TestDb, } @@ -5393,9 +5429,9 @@ mod tests { "server is forbidding connections" ))) } else { - let (client_conn, server_conn, kill_conn) = + let (client_conn, server_conn, killed) = Connection::in_memory(cx.background()); - connection_killers.lock().insert(user_id, kill_conn); + connection_killers.lock().insert(user_id, killed); cx.background() .spawn(server.handle_connection( server_conn, @@ -5437,7 +5473,11 @@ mod tests { } fn disconnect_client(&self, user_id: UserId) { - self.connection_killers.lock().remove(&user_id); + self.connection_killers + .lock() + .remove(&user_id) + .unwrap() + .store(true, SeqCst); } fn forbid_connections(&self) { @@ -5483,6 +5523,14 @@ mod tests { } } + impl Deref for TestServer { + type Target = Server; + + fn deref(&self) -> &Self::Target { + &self.server + } + } + impl Drop for TestServer { fn drop(&mut self) { self.peer.reset(); @@ -5604,117 +5652,138 @@ mod tests { rng: Arc>, mut cx: TestAppContext, ) -> (Self, TestAppContext) { - let fs = project.read_with(&cx, |project, _| project.fs().clone()); - while operations.get() < max_operations { - operations.set(operations.get() + 1); + async fn simulate_host_internal( + client: &mut TestClient, + project: ModelHandle, + files: Arc>>, + operations: Rc>, + max_operations: usize, + rng: Arc>, + cx: &mut TestAppContext, + ) -> anyhow::Result<()> { + let fs = project.read_with(cx, |project, _| project.fs().clone()); + while operations.get() < max_operations { + operations.set(operations.get() + 1); - let distribution = rng.lock().gen_range::(0..100); - match distribution { - 0..=20 if !files.lock().is_empty() => { - let path = files.lock().choose(&mut *rng.lock()).unwrap().clone(); - let mut path = path.as_path(); - while let Some(parent_path) = path.parent() { - path = parent_path; + let distribution = rng.lock().gen_range::(0..100); + match distribution { + 0..=20 if !files.lock().is_empty() => { + let path = files.lock().choose(&mut *rng.lock()).unwrap().clone(); + let mut path = path.as_path(); + while let Some(parent_path) = path.parent() { + path = parent_path; + if rng.lock().gen() { + break; + } + } + + log::info!("Host: find/create local worktree {:?}", path); + let find_or_create_worktree = project.update(cx, |project, cx| { + project.find_or_create_local_worktree(path, true, cx) + }); if rng.lock().gen() { - break; + cx.background().spawn(find_or_create_worktree).detach(); + } else { + find_or_create_worktree.await?; } } - - log::info!("Host: find/create local worktree {:?}", path); - let find_or_create_worktree = project.update(&mut cx, |project, cx| { - project.find_or_create_local_worktree(path, true, cx) - }); - let find_or_create_worktree = async move { - find_or_create_worktree.await.unwrap(); - }; - if rng.lock().gen() { - cx.background().spawn(find_or_create_worktree).detach(); - } else { - find_or_create_worktree.await; - } - } - 10..=80 if !files.lock().is_empty() => { - let buffer = if self.buffers.is_empty() || rng.lock().gen() { - let file = files.lock().choose(&mut *rng.lock()).unwrap().clone(); - let (worktree, path) = project - .update(&mut cx, |project, cx| { - project.find_or_create_local_worktree(file.clone(), true, cx) - }) - .await - .unwrap(); - let project_path = - worktree.read_with(&cx, |worktree, _| (worktree.id(), path)); - log::info!( - "Host: opening path {:?}, worktree {}, relative_path {:?}", - file, - project_path.0, - project_path.1 - ); - let buffer = project - .update(&mut cx, |project, cx| { - project.open_buffer(project_path, cx) - }) - .await - .unwrap(); - self.buffers.insert(buffer.clone()); - buffer - } else { - self.buffers - .iter() - .choose(&mut *rng.lock()) - .unwrap() - .clone() - }; - - if rng.lock().gen_bool(0.1) { - cx.update(|cx| { + 10..=80 if !files.lock().is_empty() => { + let buffer = if client.buffers.is_empty() || rng.lock().gen() { + let file = files.lock().choose(&mut *rng.lock()).unwrap().clone(); + let (worktree, path) = project + .update(cx, |project, cx| { + project.find_or_create_local_worktree( + file.clone(), + true, + cx, + ) + }) + .await?; + let project_path = + worktree.read_with(cx, |worktree, _| (worktree.id(), path)); log::info!( - "Host: dropping buffer {:?}", - buffer.read(cx).file().unwrap().full_path(cx) + "Host: opening path {:?}, worktree {}, relative_path {:?}", + file, + project_path.0, + project_path.1 ); - self.buffers.remove(&buffer); - drop(buffer); - }); - } else { - buffer.update(&mut cx, |buffer, cx| { - log::info!( - "Host: updating buffer {:?} ({})", - buffer.file().unwrap().full_path(cx), - buffer.remote_id() - ); - buffer.randomly_edit(&mut *rng.lock(), 5, cx) - }); + let buffer = project + .update(cx, |project, cx| project.open_buffer(project_path, cx)) + .await + .unwrap(); + client.buffers.insert(buffer.clone()); + buffer + } else { + client + .buffers + .iter() + .choose(&mut *rng.lock()) + .unwrap() + .clone() + }; + + if rng.lock().gen_bool(0.1) { + cx.update(|cx| { + log::info!( + "Host: dropping buffer {:?}", + buffer.read(cx).file().unwrap().full_path(cx) + ); + client.buffers.remove(&buffer); + drop(buffer); + }); + } else { + buffer.update(cx, |buffer, cx| { + log::info!( + "Host: updating buffer {:?} ({})", + buffer.file().unwrap().full_path(cx), + buffer.remote_id() + ); + buffer.randomly_edit(&mut *rng.lock(), 5, cx) + }); + } } + _ => loop { + let path_component_count = rng.lock().gen_range::(1..=5); + let mut path = PathBuf::new(); + path.push("/"); + for _ in 0..path_component_count { + let letter = rng.lock().gen_range(b'a'..=b'z'); + path.push(std::str::from_utf8(&[letter]).unwrap()); + } + path.set_extension("rs"); + let parent_path = path.parent().unwrap(); + + log::info!("Host: creating file {:?}", path,); + + if fs.create_dir(&parent_path).await.is_ok() + && fs.create_file(&path, Default::default()).await.is_ok() + { + files.lock().push(path); + break; + } else { + log::info!("Host: cannot create file"); + } + }, } - _ => loop { - let path_component_count = rng.lock().gen_range::(1..=5); - let mut path = PathBuf::new(); - path.push("/"); - for _ in 0..path_component_count { - let letter = rng.lock().gen_range(b'a'..=b'z'); - path.push(std::str::from_utf8(&[letter]).unwrap()); - } - path.set_extension("rs"); - let parent_path = path.parent().unwrap(); - log::info!("Host: creating file {:?}", path,); - - if fs.create_dir(&parent_path).await.is_ok() - && fs.create_file(&path, Default::default()).await.is_ok() - { - files.lock().push(path); - break; - } else { - log::info!("Host: cannot create file"); - } - }, + cx.background().simulate_random_delay().await; } - cx.background().simulate_random_delay().await; + Ok(()) } + simulate_host_internal( + &mut self, + project.clone(), + files, + operations, + max_operations, + rng, + &mut cx, + ) + .log_err() + .await; log::info!("Host done"); - self.project = Some(project); (self, cx) } @@ -5726,244 +5795,292 @@ mod tests { operations: Rc>, max_operations: usize, rng: Arc>, + host_disconnected: Rc, mut cx: TestAppContext, ) -> (Self, TestAppContext) { - while operations.get() < max_operations { - let buffer = if self.buffers.is_empty() || rng.lock().gen() { - let worktree = if let Some(worktree) = project.read_with(&cx, |project, cx| { - project - .worktrees(&cx) - .filter(|worktree| { - let worktree = worktree.read(cx); - worktree.is_visible() - && worktree.entries(false).any(|e| e.is_file()) + async fn simulate_guest_internal( + client: &mut TestClient, + guest_id: usize, + project: ModelHandle, + operations: Rc>, + max_operations: usize, + rng: Arc>, + cx: &mut TestAppContext, + ) -> anyhow::Result<()> { + while operations.get() < max_operations { + let buffer = if client.buffers.is_empty() || rng.lock().gen() { + let worktree = if let Some(worktree) = + project.read_with(cx, |project, cx| { + project + .worktrees(&cx) + .filter(|worktree| { + let worktree = worktree.read(cx); + worktree.is_visible() + && worktree.entries(false).any(|e| e.is_file()) + }) + .choose(&mut *rng.lock()) + }) { + worktree + } else { + cx.background().simulate_random_delay().await; + continue; + }; + + operations.set(operations.get() + 1); + let (worktree_root_name, project_path) = + worktree.read_with(cx, |worktree, _| { + let entry = worktree + .entries(false) + .filter(|e| e.is_file()) + .choose(&mut *rng.lock()) + .unwrap(); + ( + worktree.root_name().to_string(), + (worktree.id(), entry.path.clone()), + ) + }); + log::info!( + "Guest {}: opening path {:?} in worktree {} ({})", + guest_id, + project_path.1, + project_path.0, + worktree_root_name, + ); + let buffer = project + .update(cx, |project, cx| { + project.open_buffer(project_path.clone(), cx) }) - .choose(&mut *rng.lock()) - }) { - worktree + .await?; + log::info!( + "Guest {}: opened path {:?} in worktree {} ({}) with buffer id {}", + guest_id, + project_path.1, + project_path.0, + worktree_root_name, + buffer.read_with(cx, |buffer, _| buffer.remote_id()) + ); + client.buffers.insert(buffer.clone()); + buffer } else { - cx.background().simulate_random_delay().await; - continue; + operations.set(operations.get() + 1); + + client + .buffers + .iter() + .choose(&mut *rng.lock()) + .unwrap() + .clone() }; - operations.set(operations.get() + 1); - let (worktree_root_name, project_path) = - worktree.read_with(&cx, |worktree, _| { - let entry = worktree - .entries(false) - .filter(|e| e.is_file()) - .choose(&mut *rng.lock()) - .unwrap(); - ( - worktree.root_name().to_string(), - (worktree.id(), entry.path.clone()), - ) - }); - log::info!( - "Guest {}: opening path {:?} in worktree {} ({})", - guest_id, - project_path.1, - project_path.0, - worktree_root_name, - ); - let buffer = project - .update(&mut cx, |project, cx| { - project.open_buffer(project_path.clone(), cx) - }) - .await - .unwrap(); - log::info!( - "Guest {}: opened path {:?} in worktree {} ({}) with buffer id {}", - guest_id, - project_path.1, - project_path.0, - worktree_root_name, - buffer.read_with(&cx, |buffer, _| buffer.remote_id()) - ); - self.buffers.insert(buffer.clone()); - buffer - } else { - operations.set(operations.get() + 1); - - self.buffers - .iter() - .choose(&mut *rng.lock()) - .unwrap() - .clone() - }; - - let choice = rng.lock().gen_range(0..100); - match choice { - 0..=9 => { - cx.update(|cx| { - log::info!( - "Guest {}: dropping buffer {:?}", - guest_id, - buffer.read(cx).file().unwrap().full_path(cx) - ); - self.buffers.remove(&buffer); - drop(buffer); - }); - } - 10..=19 => { - let completions = project.update(&mut cx, |project, cx| { - log::info!( - "Guest {}: requesting completions for buffer {} ({:?})", - guest_id, - buffer.read(cx).remote_id(), - buffer.read(cx).file().unwrap().full_path(cx) - ); - let offset = rng.lock().gen_range(0..=buffer.read(cx).len()); - project.completions(&buffer, offset, cx) - }); - let completions = cx.background().spawn(async move { - completions.await.expect("completions request failed"); - }); - if rng.lock().gen_bool(0.3) { - log::info!("Guest {}: detaching completions request", guest_id); - completions.detach(); - } else { - completions.await; + let choice = rng.lock().gen_range(0..100); + match choice { + 0..=9 => { + cx.update(|cx| { + log::info!( + "Guest {}: dropping buffer {:?}", + guest_id, + buffer.read(cx).file().unwrap().full_path(cx) + ); + client.buffers.remove(&buffer); + drop(buffer); + }); + } + 10..=19 => { + let completions = project.update(cx, |project, cx| { + log::info!( + "Guest {}: requesting completions for buffer {} ({:?})", + guest_id, + buffer.read(cx).remote_id(), + buffer.read(cx).file().unwrap().full_path(cx) + ); + let offset = rng.lock().gen_range(0..=buffer.read(cx).len()); + project.completions(&buffer, offset, cx) + }); + let completions = cx.background().spawn(async move { + completions + .await + .map_err(|err| anyhow!("completions request failed: {:?}", err)) + }); + if rng.lock().gen_bool(0.3) { + log::info!("Guest {}: detaching completions request", guest_id); + cx.update(|cx| completions.detach_and_log_err(cx)); + } else { + completions.await?; + } + } + 20..=29 => { + let code_actions = project.update(cx, |project, cx| { + log::info!( + "Guest {}: requesting code actions for buffer {} ({:?})", + guest_id, + buffer.read(cx).remote_id(), + buffer.read(cx).file().unwrap().full_path(cx) + ); + let range = buffer.read(cx).random_byte_range(0, &mut *rng.lock()); + project.code_actions(&buffer, range, cx) + }); + let code_actions = cx.background().spawn(async move { + code_actions.await.map_err(|err| { + anyhow!("code actions request failed: {:?}", err) + }) + }); + if rng.lock().gen_bool(0.3) { + log::info!("Guest {}: detaching code actions request", guest_id); + cx.update(|cx| code_actions.detach_and_log_err(cx)); + } else { + code_actions.await?; + } + } + 30..=39 if buffer.read_with(cx, |buffer, _| buffer.is_dirty()) => { + let (requested_version, save) = buffer.update(cx, |buffer, cx| { + log::info!( + "Guest {}: saving buffer {} ({:?})", + guest_id, + buffer.remote_id(), + buffer.file().unwrap().full_path(cx) + ); + (buffer.version(), buffer.save(cx)) + }); + let save = cx.background().spawn(async move { + let (saved_version, _) = save + .await + .map_err(|err| anyhow!("save request failed: {:?}", err))?; + assert!(saved_version.observed_all(&requested_version)); + Ok::<_, anyhow::Error>(()) + }); + if rng.lock().gen_bool(0.3) { + log::info!("Guest {}: detaching save request", guest_id); + cx.update(|cx| save.detach_and_log_err(cx)); + } else { + save.await?; + } + } + 40..=44 => { + let prepare_rename = project.update(cx, |project, cx| { + log::info!( + "Guest {}: preparing rename for buffer {} ({:?})", + guest_id, + buffer.read(cx).remote_id(), + buffer.read(cx).file().unwrap().full_path(cx) + ); + let offset = rng.lock().gen_range(0..=buffer.read(cx).len()); + project.prepare_rename(buffer, offset, cx) + }); + let prepare_rename = cx.background().spawn(async move { + prepare_rename.await.map_err(|err| { + anyhow!("prepare rename request failed: {:?}", err) + }) + }); + if rng.lock().gen_bool(0.3) { + log::info!("Guest {}: detaching prepare rename request", guest_id); + cx.update(|cx| prepare_rename.detach_and_log_err(cx)); + } else { + prepare_rename.await?; + } + } + 45..=49 => { + let definitions = project.update(cx, |project, cx| { + log::info!( + "Guest {}: requesting definitions for buffer {} ({:?})", + guest_id, + buffer.read(cx).remote_id(), + buffer.read(cx).file().unwrap().full_path(cx) + ); + let offset = rng.lock().gen_range(0..=buffer.read(cx).len()); + project.definition(&buffer, offset, cx) + }); + let definitions = cx.background().spawn(async move { + definitions + .await + .map_err(|err| anyhow!("definitions request failed: {:?}", err)) + }); + if rng.lock().gen_bool(0.3) { + log::info!("Guest {}: detaching definitions request", guest_id); + cx.update(|cx| definitions.detach_and_log_err(cx)); + } else { + client + .buffers + .extend(definitions.await?.into_iter().map(|loc| loc.buffer)); + } + } + 50..=54 => { + let highlights = project.update(cx, |project, cx| { + log::info!( + "Guest {}: requesting highlights for buffer {} ({:?})", + guest_id, + buffer.read(cx).remote_id(), + buffer.read(cx).file().unwrap().full_path(cx) + ); + let offset = rng.lock().gen_range(0..=buffer.read(cx).len()); + project.document_highlights(&buffer, offset, cx) + }); + let highlights = cx.background().spawn(async move { + highlights + .await + .map_err(|err| anyhow!("highlights request failed: {:?}", err)) + }); + if rng.lock().gen_bool(0.3) { + log::info!("Guest {}: detaching highlights request", guest_id); + cx.update(|cx| highlights.detach_and_log_err(cx)); + } else { + highlights.await?; + } + } + 55..=59 => { + let search = project.update(cx, |project, cx| { + let query = rng.lock().gen_range('a'..='z'); + log::info!("Guest {}: project-wide search {:?}", guest_id, query); + project.search(SearchQuery::text(query, false, false), cx) + }); + let search = cx.background().spawn(async move { + search + .await + .map_err(|err| anyhow!("search request failed: {:?}", err)) + }); + if rng.lock().gen_bool(0.3) { + log::info!("Guest {}: detaching search request", guest_id); + cx.update(|cx| search.detach_and_log_err(cx)); + } else { + client.buffers.extend(search.await?.into_keys()); + } + } + _ => { + buffer.update(cx, |buffer, cx| { + log::info!( + "Guest {}: updating buffer {} ({:?})", + guest_id, + buffer.remote_id(), + buffer.file().unwrap().full_path(cx) + ); + buffer.randomly_edit(&mut *rng.lock(), 5, cx) + }); } } - 20..=29 => { - let code_actions = project.update(&mut cx, |project, cx| { - log::info!( - "Guest {}: requesting code actions for buffer {} ({:?})", - guest_id, - buffer.read(cx).remote_id(), - buffer.read(cx).file().unwrap().full_path(cx) - ); - let range = buffer.read(cx).random_byte_range(0, &mut *rng.lock()); - project.code_actions(&buffer, range, cx) - }); - let code_actions = cx.background().spawn(async move { - code_actions.await.expect("code actions request failed"); - }); - if rng.lock().gen_bool(0.3) { - log::info!("Guest {}: detaching code actions request", guest_id); - code_actions.detach(); - } else { - code_actions.await; - } - } - 30..=39 if buffer.read_with(&cx, |buffer, _| buffer.is_dirty()) => { - let (requested_version, save) = buffer.update(&mut cx, |buffer, cx| { - log::info!( - "Guest {}: saving buffer {} ({:?})", - guest_id, - buffer.remote_id(), - buffer.file().unwrap().full_path(cx) - ); - (buffer.version(), buffer.save(cx)) - }); - let save = cx.background().spawn(async move { - let (saved_version, _) = save.await.expect("save request failed"); - assert!(saved_version.observed_all(&requested_version)); - }); - if rng.lock().gen_bool(0.3) { - log::info!("Guest {}: detaching save request", guest_id); - save.detach(); - } else { - save.await; - } - } - 40..=44 => { - let prepare_rename = project.update(&mut cx, |project, cx| { - log::info!( - "Guest {}: preparing rename for buffer {} ({:?})", - guest_id, - buffer.read(cx).remote_id(), - buffer.read(cx).file().unwrap().full_path(cx) - ); - let offset = rng.lock().gen_range(0..=buffer.read(cx).len()); - project.prepare_rename(buffer, offset, cx) - }); - let prepare_rename = cx.background().spawn(async move { - prepare_rename.await.expect("prepare rename request failed"); - }); - if rng.lock().gen_bool(0.3) { - log::info!("Guest {}: detaching prepare rename request", guest_id); - prepare_rename.detach(); - } else { - prepare_rename.await; - } - } - 45..=49 => { - let definitions = project.update(&mut cx, |project, cx| { - log::info!( - "Guest {}: requesting definitions for buffer {} ({:?})", - guest_id, - buffer.read(cx).remote_id(), - buffer.read(cx).file().unwrap().full_path(cx) - ); - let offset = rng.lock().gen_range(0..=buffer.read(cx).len()); - project.definition(&buffer, offset, cx) - }); - let definitions = cx.background().spawn(async move { - definitions.await.expect("definitions request failed") - }); - if rng.lock().gen_bool(0.3) { - log::info!("Guest {}: detaching definitions request", guest_id); - definitions.detach(); - } else { - self.buffers - .extend(definitions.await.into_iter().map(|loc| loc.buffer)); - } - } - 50..=54 => { - let highlights = project.update(&mut cx, |project, cx| { - log::info!( - "Guest {}: requesting highlights for buffer {} ({:?})", - guest_id, - buffer.read(cx).remote_id(), - buffer.read(cx).file().unwrap().full_path(cx) - ); - let offset = rng.lock().gen_range(0..=buffer.read(cx).len()); - project.document_highlights(&buffer, offset, cx) - }); - let highlights = cx.background().spawn(async move { - highlights.await.expect("highlights request failed"); - }); - if rng.lock().gen_bool(0.3) { - log::info!("Guest {}: detaching highlights request", guest_id); - highlights.detach(); - } else { - highlights.await; - } - } - 55..=59 => { - let search = project.update(&mut cx, |project, cx| { - let query = rng.lock().gen_range('a'..='z'); - log::info!("Guest {}: project-wide search {:?}", guest_id, query); - project.search(SearchQuery::text(query, false, false), cx) - }); - let search = cx - .background() - .spawn(async move { search.await.expect("search request failed") }); - if rng.lock().gen_bool(0.3) { - log::info!("Guest {}: detaching search request", guest_id); - search.detach(); - } else { - self.buffers.extend(search.await.into_keys()); - } - } - _ => { - buffer.update(&mut cx, |buffer, cx| { - log::info!( - "Guest {}: updating buffer {} ({:?})", - guest_id, - buffer.remote_id(), - buffer.file().unwrap().full_path(cx) - ); - buffer.randomly_edit(&mut *rng.lock(), 5, cx) - }); - } + cx.background().simulate_random_delay().await; } - cx.background().simulate_random_delay().await; + Ok(()) } - log::info!("Guest {} done", guest_id); + match simulate_guest_internal( + &mut self, + guest_id, + project.clone(), + operations, + max_operations, + rng, + &mut cx, + ) + .await + { + Ok(()) => log::info!("guest {} done", guest_id), + Err(err) => { + if host_disconnected.load(SeqCst) { + log::error!("guest {} simulation error - {:?}", guest_id, err); + } else { + panic!("guest {} simulation error - {:?}", guest_id, err); + } + } + } self.project = Some(project); (self, cx) diff --git a/crates/server/src/rpc/store.rs b/crates/server/src/rpc/store.rs index 6f5252fecf..6c330c9c8b 100644 --- a/crates/server/src/rpc/store.rs +++ b/crates/server/src/rpc/store.rs @@ -244,6 +244,9 @@ impl Store { language_servers: Default::default(), }, ); + if let Some(connection) = self.connections.get_mut(&host_connection_id) { + connection.projects.insert(project_id); + } self.next_project_id += 1; project_id } @@ -266,9 +269,7 @@ impl Store { .or_default() .insert(project_id); } - if let Some(connection) = self.connections.get_mut(&project.host_connection_id) { - connection.projects.insert(project_id); - } + project.worktrees.insert(worktree_id, worktree); if let Ok(share) = project.share_mut() { share.worktrees.insert(worktree_id, Default::default());