// NOTE(review): in this copy of the file the generic arguments appear to be
// missing (for example `Arc`, `Mutex>>>`, `Result>`, `receive::()`), so the
// concrete type parameters cannot be read from here. Restore them from the
// upstream file before relying on the signatures below.
use super::Client;
use super::*;
use crate::http::{HttpClient, Request, Response, ServerResponse};
use futures::{future::BoxFuture, Future};
use gpui::{ModelHandle, TestAppContext};
use parking_lot::Mutex;
use postage::{mpsc, prelude::Stream};
use rpc::{proto, ConnectionId, Peer, Receipt, TypedEnvelope};
use std::fmt;
use std::sync::atomic::Ordering::SeqCst;
use std::sync::{
    atomic::{AtomicBool, AtomicUsize},
    Arc,
};

/// Fake server end of a client connection.
///
/// A `Client` is wired to it through `for_client`, after which the connection
/// runs over `Connection::in_memory()` and this type can send, receive and
/// respond to messages, count authentications, invalidate credentials and
/// refuse connections.
pub struct FakeServer {
    /// RPC peer that owns the server side of the connection.
    peer: Arc,
    /// Incoming message stream of the current connection; `None` while
    /// disconnected.
    incoming: Mutex>>>,
    /// Id of the current connection; `None` while disconnected.
    connection_id: Mutex>,
    /// When `true`, `establish_connection` fails instead of connecting.
    forbid_connections: AtomicBool,
    /// Number of times the overridden authenticate callback has run.
    auth_count: AtomicUsize,
    /// Counter whose decimal string form is the currently valid access token.
    access_token: AtomicUsize,
    /// The only user id this server accepts credentials for.
    user_id: u64,
}

impl FakeServer {
    /// Creates a fake server for `client_user_id`, installs it into `client`
    /// and connects the client to it.
    ///
    /// # Panics
    ///
    /// Panics if `client` is not uniquely owned (`Arc::get_mut` returns
    /// `None`) or if `authenticate_and_connect` fails.
    pub async fn for_client(
        client_user_id: u64,
        client: &mut Arc,
        cx: &TestAppContext,
    ) -> Arc {
        let server = Arc::new(Self {
            peer: Peer::new(),
            incoming: Default::default(),
            connection_id: Default::default(),
            forbid_connections: Default::default(),
            auth_count: Default::default(),
            access_token: Default::default(),
            user_id: client_user_id,
        });

        // Replace the client's authentication and connection steps with
        // closures that talk to this fake server instead.
        Arc::get_mut(client)
            .unwrap()
            .override_authenticate({
                let server = server.clone();
                move |cx| {
                    // Every authentication attempt is counted and answered
                    // with the token that is valid at this moment.
                    server.auth_count.fetch_add(1, SeqCst);
                    let access_token = server.access_token.load(SeqCst).to_string();
                    cx.spawn(move |_| async move {
                        Ok(Credentials {
                            user_id: client_user_id,
                            access_token,
                        })
                    })
                }
            })
            .override_establish_connection({
                let server = server.clone();
                move |credentials, cx| {
                    // Clone so the spawned future owns its credentials.
                    let credentials = credentials.clone();
                    cx.spawn({
                        let server = server.clone();
                        move |cx| async move { server.establish_connection(&credentials, &cx).await }
                    })
                }
            });

        client
            .authenticate_and_connect(&cx.to_async())
            .await
            .unwrap();
        server
    }

    /// Disconnects the current connection on the peer, then clears the stored
    /// connection id and incoming stream.
    ///
    /// # Panics
    ///
    /// Panics with "not connected" if there is no current connection.
    pub async fn disconnect(&self) {
        self.peer.disconnect(self.connection_id()).await;
        self.connection_id.lock().take();
        self.incoming.lock().take();
    }

    /// Validates `credentials` and, on success, creates an in-memory
    /// connection pair, registers the server half with the peer and returns
    /// the client half.
    ///
    /// Fails with `EstablishConnectionError::Other` while connections are
    /// forbidden and with `EstablishConnectionError::Unauthorized` when the
    /// access token does not match the current one.
    ///
    /// # Panics
    ///
    /// Panics if `credentials.user_id` differs from this server's `user_id`.
    async fn establish_connection(
        &self,
        credentials: &Credentials,
        cx: &AsyncAppContext,
    ) -> Result {
        assert_eq!(credentials.user_id, self.user_id);

        if self.forbid_connections.load(SeqCst) {
            // `Err(..)?` returns early and lets `?` convert the error.
            // NOTE(review): this literal spans a line break in this copy, so
            // the message contains an embedded newline. Confirm against the
            // upstream file whether that is intended.
            Err(EstablishConnectionError::Other(anyhow!(
                "server is forbidding 
connections"
            )))?
        }

        // The valid token is the decimal rendering of the `access_token`
        // counter, so rolling the counter invalidates older credentials.
        if credentials.access_token != self.access_token.load(SeqCst).to_string() {
            Err(EstablishConnectionError::Unauthorized)?
        }

        let (client_conn, server_conn, _) = Connection::in_memory();
        let (connection_id, io, incoming) = self.peer.add_connection(server_conn).await;
        // Run the connection's I/O future in the background, detached.
        cx.background().spawn(io).detach();
        *self.incoming.lock() = Some(incoming);
        *self.connection_id.lock() = Some(connection_id);
        Ok(client_conn)
    }

    /// Returns how many times the client has authenticated against this
    /// server.
    pub fn auth_count(&self) -> usize {
        self.auth_count.load(SeqCst)
    }

    /// Increments the access-token counter, so credentials carrying the
    /// previous token are rejected by `establish_connection`.
    pub fn roll_access_token(&self) {
        self.access_token.fetch_add(1, SeqCst);
    }

    /// Makes subsequent `establish_connection` calls fail.
    pub fn forbid_connections(&self) {
        self.forbid_connections.store(true, SeqCst);
    }

    /// Lets `establish_connection` succeed again.
    pub fn allow_connections(&self) {
        self.forbid_connections.store(false, SeqCst);
    }

    /// Sends `message` over the current connection.
    ///
    /// # Panics
    ///
    /// Panics if there is no current connection or if the peer reports a send
    /// error.
    pub async fn send(&self, message: T) {
        self.peer.send(self.connection_id(), message).await.unwrap();
    }

    /// Waits for the next incoming message and downcasts it to the requested
    /// envelope type.
    ///
    /// Returns an "other half hung up" error when the incoming stream ends.
    ///
    /// # Panics
    ///
    /// Panics with "not connected" if there is no incoming stream, and with
    /// the payload type name if the message is not of the requested type.
    pub async fn receive(&self) -> Result> {
        // NOTE(review): the guard returned by `lock()` is a temporary of this
        // statement, so it appears to stay alive while `recv()` is awaited.
        // Confirm that holding the lock across the await is acceptable here.
        let message = self
            .incoming
            .lock()
            .as_mut()
            .expect("not connected")
            .recv()
            .await
            .ok_or_else(|| anyhow!("other half hung up"))?;
        // Capture the name first: `into_any` consumes the message.
        let type_name = message.payload_type_name();
        Ok(*message
            .into_any()
            .downcast::>()
            .unwrap_or_else(|_| {
                panic!(
                    "fake server received unexpected message type: {:?}",
                    type_name
                );
            }))
    }

    /// Sends `response` as the reply to the request identified by `receipt`.
    ///
    /// # Panics
    ///
    /// Panics if the peer reports an error while responding.
    pub async fn respond(
        &self,
        receipt: Receipt,
        response: T::Response,
    ) {
        self.peer.respond(receipt, response).await.unwrap()
    }

    /// Returns the id of the current connection.
    ///
    /// # Panics
    ///
    /// Panics with "not connected" if no connection is established.
    fn connection_id(&self) -> ConnectionId {
        self.connection_id.lock().expect("not connected")
    }

    /// Builds a `UserStore` model for `client`, backed by an HTTP client that
    /// answers every request with `ServerResponse::new(404)`.
    ///
    /// After creating the store it waits for the next message from the client
    /// and asserts that its `user_ids` equal `[self.user_id]`.
    ///
    /// # Panics
    ///
    /// Panics if receiving fails or if the received user ids differ.
    pub async fn build_user_store(
        &self,
        client: Arc,
        cx: &mut TestAppContext,
    ) -> ModelHandle {
        let http_client = FakeHttpClient::with_404_response();
        let user_store = cx.add_model(|cx| UserStore::new(client, http_client, cx));
        // Presumably this is the request issued when the store is created;
        // the expected message type is not visible in this copy. TODO confirm.
        assert_eq!(
            self.receive::()
                .await
                .unwrap()
                .payload
                .user_ids,
            &[self.user_id]
        );
        user_store
    }
}

/// `HttpClient` implementation that answers requests by calling a stored
/// handler closure.
pub struct FakeHttpClient {
    /// Boxed handler invoked for every request; returns a boxed future.
    handler: Box BoxFuture<'static, Result>>,
}

impl FakeHttpClient {
    /// Wraps `handler` in a `FakeHttpClient`, boxing and pinning the future it
    /// returns for each request.
    pub fn new(handler: F) -> Arc
    where
        Fut: 'static + Send + Future>,
        F: 'static + Send + Sync + Fn(Request) -> Fut,
    {
        Arc::new(Self {
            handler: Box::new(move 
|req| Box::pin(handler(req))),
        })
    }

    /// Creates a client whose handler ignores the request and always returns
    /// `ServerResponse::new(404)`.
    pub fn with_404_response() -> Arc {
        Self::new(|_| async move { Ok(ServerResponse::new(404)) })
    }
}

impl fmt::Debug for FakeHttpClient {
    /// Formats as the bare struct name; the handler closure is not shown.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("FakeHttpClient").finish()
    }
}

impl HttpClient for FakeHttpClient {
    /// Passes `req` to the stored handler and converts a successful result
    /// with `Into::into`.
    fn send<'a>(&'a self, req: Request) -> BoxFuture<'a, Result> {
        let future = (self.handler)(req);
        Box::pin(async move { future.await.map(Into::into) })
    }
}