Ensure client always responds when receiving a request

This commit is contained in:
Antonio Scandurra 2022-02-13 12:21:35 +01:00
parent a41eb5a663
commit a19735c05f
4 changed files with 519 additions and 557 deletions

View file

@ -398,29 +398,23 @@ impl Channel {
cursor
}
fn handle_message_sent(
&mut self,
async fn handle_message_sent(
this: ModelHandle<Self>,
message: TypedEnvelope<proto::ChannelMessageSent>,
_: Arc<Client>,
cx: &mut ModelContext<Self>,
mut cx: AsyncAppContext,
) -> Result<()> {
let user_store = self.user_store.clone();
let user_store = this.read_with(&cx, |this, _| this.user_store.clone());
let message = message
.payload
.message
.ok_or_else(|| anyhow!("empty message"))?;
cx.spawn(|this, mut cx| {
async move {
let message = ChannelMessage::from_proto(message, &user_store, &mut cx).await?;
this.update(&mut cx, |this, cx| {
this.insert_messages(SumTree::from_item(message, &()), cx)
});
Ok(())
}
.log_err()
})
.detach();
let message = ChannelMessage::from_proto(message, &user_store, &mut cx).await?;
this.update(&mut cx, |this, cx| {
this.insert_messages(SumTree::from_item(message, &()), cx)
});
Ok(())
}

View file

@ -11,8 +11,8 @@ use async_tungstenite::tungstenite::{
error::Error as WebsocketError,
http::{Request, StatusCode},
};
use futures::StreamExt;
use gpui::{action, AsyncAppContext, Entity, ModelContext, MutableAppContext, Task};
use futures::{future::LocalBoxFuture, FutureExt, StreamExt};
use gpui::{action, AsyncAppContext, Entity, ModelContext, ModelHandle, MutableAppContext, Task};
use http::HttpClient;
use lazy_static::lazy_static;
use parking_lot::RwLock;
@ -20,10 +20,11 @@ use postage::watch;
use rand::prelude::*;
use rpc::proto::{AnyTypedEnvelope, EntityMessage, EnvelopedMessage, RequestMessage};
use std::{
any::TypeId,
any::{type_name, TypeId},
collections::HashMap,
convert::TryFrom,
fmt::Write as _,
future::Future,
sync::{
atomic::{AtomicUsize, Ordering},
Arc, Weak,
@ -123,14 +124,17 @@ pub enum Status {
ReconnectionError { next_reconnection: Instant },
}
type ModelHandler = Box<
dyn Send
+ Sync
+ FnMut(Box<dyn AnyTypedEnvelope>, &AsyncAppContext) -> LocalBoxFuture<'static, Result<()>>,
>;
struct ClientState {
credentials: Option<Credentials>,
status: (watch::Sender<Status>, watch::Receiver<Status>),
entity_id_extractors: HashMap<TypeId, Box<dyn Send + Sync + Fn(&dyn AnyTypedEnvelope) -> u64>>,
model_handlers: HashMap<
(TypeId, Option<u64>),
Option<Box<dyn Send + Sync + FnMut(Box<dyn AnyTypedEnvelope>, &mut AsyncAppContext)>>,
>,
model_handlers: HashMap<(TypeId, Option<u64>), Option<ModelHandler>>,
_maintain_connection: Option<Task<()>>,
heartbeat_interval: Duration,
}
@ -262,7 +266,7 @@ impl Client {
}
}
pub fn subscribe<T, M, F>(
pub fn subscribe<T, M, F, Fut>(
self: &Arc<Self>,
cx: &mut ModelContext<M>,
mut handler: F,
@ -273,7 +277,8 @@ impl Client {
F: 'static
+ Send
+ Sync
+ FnMut(&mut M, TypedEnvelope<T>, Arc<Self>, &mut ModelContext<M>) -> Result<()>,
+ FnMut(ModelHandle<M>, TypedEnvelope<T>, Arc<Self>, AsyncAppContext) -> Fut,
Fut: 'static + Future<Output = Result<()>>,
{
let subscription_id = (TypeId::of::<T>(), None);
let client = self.clone();
@ -284,11 +289,15 @@ impl Client {
Some(Box::new(move |envelope, cx| {
if let Some(model) = model.upgrade(cx) {
let envelope = envelope.into_any().downcast::<TypedEnvelope<T>>().unwrap();
model.update(cx, |model, cx| {
if let Err(error) = handler(model, *envelope, client.clone(), cx) {
log::error!("error handling message: {}", error)
}
});
handler(model, *envelope, client.clone(), cx.clone()).boxed_local()
} else {
async move {
Err(anyhow!(
"received message for {:?} but model was dropped",
type_name::<M>()
))
}
.boxed_local()
}
})),
);
@ -302,7 +311,7 @@ impl Client {
}
}
pub fn subscribe_to_entity<T, M, F>(
pub fn subscribe_to_entity<T, M, F, Fut>(
self: &Arc<Self>,
remote_id: u64,
cx: &mut ModelContext<M>,
@ -314,7 +323,8 @@ impl Client {
F: 'static
+ Send
+ Sync
+ FnMut(&mut M, TypedEnvelope<T>, Arc<Self>, &mut ModelContext<M>) -> Result<()>,
+ FnMut(ModelHandle<M>, TypedEnvelope<T>, Arc<Self>, AsyncAppContext) -> Fut,
Fut: 'static + Future<Output = Result<()>>,
{
let subscription_id = (TypeId::of::<T>(), Some(remote_id));
let client = self.clone();
@ -337,11 +347,15 @@ impl Client {
Some(Box::new(move |envelope, cx| {
if let Some(model) = model.upgrade(cx) {
let envelope = envelope.into_any().downcast::<TypedEnvelope<T>>().unwrap();
model.update(cx, |model, cx| {
if let Err(error) = handler(model, *envelope, client.clone(), cx) {
log::error!("error handling message: {}", error)
}
});
handler(model, *envelope, client.clone(), cx.clone()).boxed_local()
} else {
async move {
Err(anyhow!(
"received message for {:?} but model was dropped",
type_name::<M>()
))
}
.boxed_local()
}
})),
);
@ -355,6 +369,44 @@ impl Client {
}
}
pub fn subscribe_to_entity_request<T, M, F, Fut>(
self: &Arc<Self>,
remote_id: u64,
cx: &mut ModelContext<M>,
mut handler: F,
) -> Subscription
where
T: EntityMessage + RequestMessage,
M: Entity,
F: 'static
+ Send
+ Sync
+ FnMut(ModelHandle<M>, TypedEnvelope<T>, Arc<Self>, AsyncAppContext) -> Fut,
Fut: 'static + Future<Output = Result<T::Response>>,
{
self.subscribe_to_entity(remote_id, cx, move |model, envelope, client, cx| {
let receipt = envelope.receipt();
let response = handler(model, envelope, client.clone(), cx);
async move {
match response.await {
Ok(response) => {
client.respond(receipt, response)?;
Ok(())
}
Err(error) => {
client.respond_with_error(
receipt,
proto::Error {
message: error.to_string(),
},
)?;
Err(error)
}
}
}
})
}
pub fn has_keychain_credentials(&self, cx: &AsyncAppContext) -> bool {
read_credentials_from_keychain(cx).is_some()
}
@ -442,7 +494,7 @@ impl Client {
let (connection_id, handle_io, mut incoming) = self.peer.add_connection(conn).await;
cx.foreground()
.spawn({
let mut cx = cx.clone();
let cx = cx.clone();
let this = self.clone();
async move {
while let Some(message) = incoming.next().await {
@ -468,12 +520,28 @@ impl Client {
this.id,
type_name
);
(handler)(message, &mut cx);
log::debug!(
"rpc message handled. client_id:{}, name:{}",
this.id,
type_name
);
let future = (handler)(message, &cx);
let client_id = this.id;
cx.foreground()
.spawn(async move {
match future.await {
Ok(()) => {
log::debug!(
"rpc message handled. client_id:{}, name:{}",
client_id,
type_name
);
}
Err(error) => {
log::error!(
"error handling rpc message. client_id:{}, name:{}, error: {}",
client_id, type_name, error
);
}
}
})
.detach();
let mut state = this.state.write();
if state.model_handlers.contains_key(&handler_key) {
@ -715,16 +783,12 @@ impl Client {
response
}
pub fn respond<T: RequestMessage>(
&self,
receipt: Receipt<T>,
response: T::Response,
) -> Result<()> {
fn respond<T: RequestMessage>(&self, receipt: Receipt<T>, response: T::Response) -> Result<()> {
log::debug!("rpc respond. client_id: {}. name:{}", self.id, T::NAME);
self.peer.respond(receipt, response)
}
pub fn respond_with_error<T: RequestMessage>(
fn respond_with_error<T: RequestMessage>(
&self,
receipt: Receipt<T>,
error: proto::Error,
@ -866,7 +930,7 @@ mod tests {
cx,
move |_, _: TypedEnvelope<proto::UnshareProject>, _, _| {
postage::sink::Sink::try_send(&mut done_tx1, ()).unwrap();
Ok(())
async { Ok(()) }
},
)
});
@ -876,7 +940,7 @@ mod tests {
cx,
move |_, _: TypedEnvelope<proto::UnshareProject>, _, _| {
postage::sink::Sink::try_send(&mut done_tx2, ()).unwrap();
Ok(())
async { Ok(()) }
},
)
});
@ -887,7 +951,7 @@ mod tests {
client.subscribe_to_entity(
3,
cx,
move |_, _: TypedEnvelope<proto::UnshareProject>, _, _| Ok(()),
|_, _: TypedEnvelope<proto::UnshareProject>, _, _| async { Ok(()) },
)
});
drop(subscription3);
@ -912,14 +976,14 @@ mod tests {
let subscription1 = model.update(&mut cx, |_, cx| {
client.subscribe(cx, move |_, _: TypedEnvelope<proto::Ping>, _, _| {
postage::sink::Sink::try_send(&mut done_tx1, ()).unwrap();
Ok(())
async { Ok(()) }
})
});
drop(subscription1);
let _subscription2 = model.update(&mut cx, |_, cx| {
client.subscribe(cx, move |_, _: TypedEnvelope<proto::Ping>, _, _| {
postage::sink::Sink::try_send(&mut done_tx2, ()).unwrap();
Ok(())
async { Ok(()) }
})
});
server.send(proto::Ping {});
@ -939,10 +1003,10 @@ mod tests {
model.update(&mut cx, |model, cx| {
model.subscription = Some(client.subscribe(
cx,
move |model, _: TypedEnvelope<proto::Ping>, _, _| {
model.subscription.take();
move |model, _: TypedEnvelope<proto::Ping>, _, mut cx| {
model.update(&mut cx, |model, _| model.subscription.take());
postage::sink::Sink::try_send(&mut done_tx, ()).unwrap();
Ok(())
async { Ok(()) }
},
));
});

View file

@ -60,9 +60,9 @@ impl UserStore {
watch::channel::<Option<proto::UpdateContacts>>();
let update_contacts_subscription = client.subscribe(
cx,
move |_: &mut Self, msg: TypedEnvelope<proto::UpdateContacts>, _, _| {
let _ = update_contacts_tx.blocking_send(Some(msg.payload));
Ok(())
move |_: ModelHandle<Self>, msg: TypedEnvelope<proto::UpdateContacts>, _, _| {
*update_contacts_tx.borrow_mut() = Some(msg.payload);
async move { Ok(()) }
},
);
Self {

File diff suppressed because it is too large Load diff