From 24cb44fb00953171e68a5a78ccf88e0489212505 Mon Sep 17 00:00:00 2001 From: Antonio Scandurra Date: Fri, 8 Apr 2022 16:04:03 +0200 Subject: [PATCH] Remove `postage` from `rpc` Co-Authored-By: Nathan Sobo --- Cargo.lock | 1 - crates/client/src/test.rs | 6 +----- crates/rpc/Cargo.toml | 1 - crates/rpc/src/conn.rs | 36 ++++++++++++++++++------------------ crates/rpc/src/peer.rs | 34 +++++++++++++++++----------------- crates/server/src/rpc.rs | 13 ++++++++----- 6 files changed, 44 insertions(+), 47 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 7cbba16a00..b8d834c1d4 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4012,7 +4012,6 @@ dependencies = [ "gpui", "log", "parking_lot", - "postage", "prost", "prost-build", "rand 0.8.3", diff --git a/crates/client/src/test.rs b/crates/client/src/test.rs index 35a8e85922..5417f2b51d 100644 --- a/crates/client/src/test.rs +++ b/crates/client/src/test.rs @@ -6,7 +6,6 @@ use anyhow::{anyhow, Result}; use futures::{future::BoxFuture, stream::BoxStream, Future, StreamExt}; use gpui::{executor, ModelHandle, TestAppContext}; use parking_lot::Mutex; -use postage::barrier; use rpc::{proto, ConnectionId, Peer, Receipt, TypedEnvelope}; use std::{fmt, rc::Rc, sync::Arc}; @@ -23,7 +22,6 @@ struct FakeServerState { connection_id: Option, forbid_connections: bool, auth_count: usize, - connection_killer: Option, access_token: usize, } @@ -76,15 +74,13 @@ impl FakeServer { Err(EstablishConnectionError::Unauthorized)? } - let (client_conn, server_conn, kill) = - Connection::in_memory(cx.background()); + let (client_conn, server_conn, _) = Connection::in_memory(cx.background()); let (connection_id, io, incoming) = peer.add_test_connection(server_conn, cx.background()).await; cx.background().spawn(io).detach(); let mut state = state.lock(); state.connection_id = Some(connection_id); state.incoming = Some(incoming); - state.connection_killer = Some(kill); Ok(client_conn) }) } diff --git a/crates/rpc/Cargo.toml b/crates/rpc/Cargo.toml index 9a2cb165c7..1425f408c6 100644 --- a/crates/rpc/Cargo.toml +++ b/crates/rpc/Cargo.toml @@ -23,7 +23,6 @@ base64 = "0.13" futures = "0.3" log = "0.4" parking_lot = "0.11.1" -postage = { version = "0.4.1", features = ["futures-traits"] } prost = "0.8" rand = "0.8" rsa = "0.4" diff --git a/crates/rpc/src/conn.rs b/crates/rpc/src/conn.rs index a97797fc9d..53ba00a3c0 100644 --- a/crates/rpc/src/conn.rs +++ b/crates/rpc/src/conn.rs @@ -35,21 +35,24 @@ impl Connection { #[cfg(any(test, feature = "test-support"))] pub fn in_memory( executor: std::sync::Arc, - ) -> (Self, Self, postage::barrier::Sender) { - use postage::prelude::Stream; + ) -> (Self, Self, std::sync::Arc) { + use std::sync::{ + atomic::{AtomicBool, Ordering::SeqCst}, + Arc, + }; - let (kill_tx, kill_rx) = postage::barrier::channel(); - let (a_tx, a_rx) = channel(kill_rx.clone(), executor.clone()); - let (b_tx, b_rx) = channel(kill_rx, executor); + let killed = Arc::new(AtomicBool::new(false)); + let (a_tx, a_rx) = channel(killed.clone(), executor.clone()); + let (b_tx, b_rx) = channel(killed.clone(), executor); return ( Self { tx: a_tx, rx: b_rx }, Self { tx: b_tx, rx: a_rx }, - kill_tx, + killed, ); fn channel( - kill_rx: postage::barrier::Receiver, - executor: std::sync::Arc, + killed: Arc, + executor: Arc, ) -> ( Box>, Box< @@ -57,20 +60,17 @@ impl Connection { >, ) { use futures::channel::mpsc; - use std::{ - io::{Error, ErrorKind}, - sync::Arc, - }; + use std::io::{Error, ErrorKind}; let (tx, rx) = mpsc::unbounded::(); let tx = tx .sink_map_err(|e| WebSocketError::from(Error::new(ErrorKind::Other, e))) .with({ - let kill_rx = kill_rx.clone(); + let killed = killed.clone(); let executor = Arc::downgrade(&executor); move |msg| { - let mut kill_rx = kill_rx.clone(); + let killed = killed.clone(); let executor = executor.clone(); Box::pin(async move { if let Some(executor) = executor.upgrade() { @@ -78,7 +78,7 @@ impl Connection { } // Writes to a half-open TCP connection will error. - if kill_rx.try_recv().is_ok() { + if killed.load(SeqCst) { std::io::Result::Err( Error::new(ErrorKind::Other, "connection lost").into(), )?; @@ -90,10 +90,10 @@ impl Connection { }); let rx = rx.then({ - let kill_rx = kill_rx.clone(); + let killed = killed.clone(); let executor = Arc::downgrade(&executor); move |msg| { - let mut kill_rx = kill_rx.clone(); + let killed = killed.clone(); let executor = executor.clone(); Box::pin(async move { if let Some(executor) = executor.upgrade() { @@ -101,7 +101,7 @@ impl Connection { } // Reads from a half-open TCP connection will hang. - if kill_rx.try_recv().is_ok() { + if killed.load(SeqCst) { futures::future::pending::<()>().await; } diff --git a/crates/rpc/src/peer.rs b/crates/rpc/src/peer.rs index 3677f0feac..5efee616e1 100644 --- a/crates/rpc/src/peer.rs +++ b/crates/rpc/src/peer.rs @@ -4,13 +4,13 @@ use super::{ }; use anyhow::{anyhow, Context, Result}; use collections::HashMap; -use futures::{channel::oneshot, stream::BoxStream, FutureExt as _, StreamExt}; -use parking_lot::{Mutex, RwLock}; -use postage::{ - barrier, mpsc, - prelude::{Sink as _, Stream as _}, +use futures::{ + channel::{mpsc, oneshot}, + stream::BoxStream, + FutureExt, SinkExt, StreamExt, }; -use smol_timeout::TimeoutExt as _; +use parking_lot::{Mutex, RwLock}; +use smol_timeout::TimeoutExt; use std::sync::atomic::Ordering::SeqCst; use std::{ fmt, @@ -90,10 +90,10 @@ pub struct Peer { #[derive(Clone)] pub struct ConnectionState { - outgoing_tx: futures::channel::mpsc::UnboundedSender, + outgoing_tx: mpsc::UnboundedSender, next_message_id: Arc, response_channels: - Arc>>>>, + Arc)>>>>>, } const KEEPALIVE_INTERVAL: Duration = Duration::from_secs(1); @@ -127,7 +127,7 @@ impl Peer { // bounded channel so that other peers will receive backpressure if they send // messages faster than this peer can process them. let (mut incoming_tx, incoming_rx) = mpsc::channel(64); - let (outgoing_tx, mut outgoing_rx) = futures::channel::mpsc::unbounded(); + let (outgoing_tx, mut outgoing_rx) = mpsc::unbounded(); let connection_id = ConnectionId(self.next_connection_id.fetch_add(1, SeqCst)); let connection_state = ConnectionState { @@ -208,14 +208,14 @@ impl Peer { if let Some(responding_to) = incoming.responding_to { let channel = response_channels.lock().as_mut()?.remove(&responding_to); if let Some(tx) = channel { - let mut requester_resumed = barrier::channel(); + let requester_resumed = oneshot::channel(); if let Err(error) = tx.send((incoming, requester_resumed.0)) { log::debug!( "received RPC but request future was dropped {:?}", error.0 ); } - requester_resumed.1.recv().await; + let _ = requester_resumed.1.await; } else { log::warn!("received RPC response to unknown request {}", responding_to); } @@ -721,26 +721,26 @@ mod tests { .add_test_connection(client_conn, cx.background()) .await; - let (mut io_ended_tx, mut io_ended_rx) = postage::barrier::channel(); + let (io_ended_tx, io_ended_rx) = oneshot::channel(); executor .spawn(async move { io_handler.await.ok(); - io_ended_tx.send(()).await.unwrap(); + io_ended_tx.send(()).unwrap(); }) .detach(); - let (mut messages_ended_tx, mut messages_ended_rx) = postage::barrier::channel(); + let (messages_ended_tx, messages_ended_rx) = oneshot::channel(); executor .spawn(async move { incoming.next().await; - messages_ended_tx.send(()).await.unwrap(); + messages_ended_tx.send(()).unwrap(); }) .detach(); client.disconnect(connection_id); - io_ended_rx.recv().await; - messages_ended_rx.recv().await; + let _ = io_ended_rx.await; + let _ = messages_ended_rx.await; assert!(server_conn .send(WebSocketMessage::Binary(vec![])) .await diff --git a/crates/server/src/rpc.rs b/crates/server/src/rpc.rs index 866627ef48..e2d64b1abf 100644 --- a/crates/server/src/rpc.rs +++ b/crates/server/src/rpc.rs @@ -1094,7 +1094,6 @@ mod tests { }; use lsp::{self, FakeLanguageServer}; use parking_lot::Mutex; - use postage::barrier; use project::{ fs::{FakeFs, Fs as _}, search::SearchQuery, @@ -5350,7 +5349,7 @@ mod tests { server: Arc, foreground: Rc, notifications: mpsc::UnboundedReceiver<()>, - connection_killers: Arc>>, + connection_killers: Arc>>>, forbid_connections: Arc, _test_db: TestDb, } @@ -5418,9 +5417,9 @@ mod tests { "server is forbidding connections" ))) } else { - let (client_conn, server_conn, kill_conn) = + let (client_conn, server_conn, killed) = Connection::in_memory(cx.background()); - connection_killers.lock().insert(user_id, kill_conn); + connection_killers.lock().insert(user_id, killed); cx.background() .spawn(server.handle_connection( server_conn, @@ -5462,7 +5461,11 @@ mod tests { } fn disconnect_client(&self, user_id: UserId) { - self.connection_killers.lock().remove(&user_id); + self.connection_killers + .lock() + .remove(&user_id) + .unwrap() + .store(true, SeqCst); } fn forbid_connections(&self) {