Merge pull request #572 from zed-industries/receive-timeout-tweaks

Reset receive timeout only on reads from the websocket connection, not writes
This commit is contained in:
Max Brunsfeld 2022-03-09 11:49:08 -08:00 committed by GitHub
commit 90c2de7342
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23

View file

@ -96,6 +96,7 @@ pub struct ConnectionState {
const KEEPALIVE_INTERVAL: Duration = Duration::from_secs(1); const KEEPALIVE_INTERVAL: Duration = Duration::from_secs(1);
const WRITE_TIMEOUT: Duration = Duration::from_secs(2); const WRITE_TIMEOUT: Duration = Duration::from_secs(2);
const RECEIVE_TIMEOUT: Duration = Duration::from_secs(30);
impl Peer { impl Peer {
pub fn new() -> Arc<Self> { pub fn new() -> Arc<Self> {
@ -147,14 +148,14 @@ impl Peer {
let keepalive_timer = create_timer(KEEPALIVE_INTERVAL).fuse(); let keepalive_timer = create_timer(KEEPALIVE_INTERVAL).fuse();
futures::pin_mut!(keepalive_timer); futures::pin_mut!(keepalive_timer);
// Disconnect if we don't receive messages at least this frequently.
let receive_timeout = create_timer(RECEIVE_TIMEOUT).fuse();
futures::pin_mut!(receive_timeout);
loop { loop {
let read_message = reader.read().fuse(); let read_message = reader.read().fuse();
futures::pin_mut!(read_message); futures::pin_mut!(read_message);
// Disconnect if we don't receive messages at least this frequently.
let receive_timeout = create_timer(3 * KEEPALIVE_INTERVAL).fuse();
futures::pin_mut!(receive_timeout);
loop { loop {
futures::select_biased! { futures::select_biased! {
outgoing = outgoing_rx.next().fuse() => match outgoing { outgoing = outgoing_rx.next().fuse() => match outgoing {
@ -170,6 +171,7 @@ impl Peer {
}, },
incoming = read_message => { incoming = read_message => {
let incoming = incoming.context("received invalid RPC message")?; let incoming = incoming.context("received invalid RPC message")?;
receive_timeout.set(create_timer(RECEIVE_TIMEOUT).fuse());
if let proto::Message::Envelope(incoming) = incoming { if let proto::Message::Envelope(incoming) = incoming {
if incoming_tx.send(incoming).await.is_err() { if incoming_tx.send(incoming).await.is_err() {
return Ok(()); return Ok(());