ssh remoting: Fix ssh process not being cleaned up when connection is closed (#18623)

We introduced a memory leak in #18572, which meant that `Drop` was never
called on `SshRemoteConnection`, meaning that the ssh process kept
running

Co-Authored-by: Thorsten <thorsten@zed.dev>

Release Notes:

- N/A

---------

Co-authored-by: Thorsten <thorsten@zed.dev>
This commit is contained in:
Bennet Bo Fenner 2024-10-02 13:21:19 +02:00 committed by GitHub
parent e80cbab93f
commit b3cdd2ccff
No known key found for this signature in database
GPG key ID: B5690EEEBB952194

View file

@ -31,7 +31,7 @@ use std::{
path::{Path, PathBuf}, path::{Path, PathBuf},
sync::{ sync::{
atomic::{AtomicU32, Ordering::SeqCst}, atomic::{AtomicU32, Ordering::SeqCst},
Arc, Arc, Weak,
}, },
time::Instant, time::Instant,
}; };
@ -244,12 +244,12 @@ struct SshRemoteClientState {
ssh_connection: SshRemoteConnection, ssh_connection: SshRemoteConnection,
delegate: Arc<dyn SshClientDelegate>, delegate: Arc<dyn SshClientDelegate>,
forwarder: ChannelForwarder, forwarder: ChannelForwarder,
_multiplex_task: Task<Result<()>>, multiplex_task: Task<Result<()>>,
} }
pub struct SshRemoteClient { pub struct SshRemoteClient {
client: Arc<ChannelClient>, client: Arc<ChannelClient>,
inner_state: Arc<Mutex<Option<SshRemoteClientState>>>, inner_state: Mutex<Option<SshRemoteClientState>>,
} }
impl SshRemoteClient { impl SshRemoteClient {
@ -264,7 +264,7 @@ impl SshRemoteClient {
let client = cx.update(|cx| ChannelClient::new(incoming_rx, outgoing_tx, cx))?; let client = cx.update(|cx| ChannelClient::new(incoming_rx, outgoing_tx, cx))?;
let this = Arc::new(Self { let this = Arc::new(Self {
client, client,
inner_state: Arc::new(Mutex::new(None)), inner_state: Mutex::new(None),
}); });
let inner_state = { let inner_state = {
@ -276,7 +276,7 @@ impl SshRemoteClient {
.await?; .await?;
let multiplex_task = Self::multiplex( let multiplex_task = Self::multiplex(
this.clone(), Arc::downgrade(&this),
ssh_process, ssh_process,
proxy_incoming_tx, proxy_incoming_tx,
proxy_outgoing_rx, proxy_outgoing_rx,
@ -287,7 +287,7 @@ impl SshRemoteClient {
ssh_connection, ssh_connection,
delegate, delegate,
forwarder: proxy, forwarder: proxy,
_multiplex_task: multiplex_task, multiplex_task,
} }
}; };
@ -305,9 +305,9 @@ impl SshRemoteClient {
mut ssh_connection, mut ssh_connection,
delegate, delegate,
forwarder: proxy, forwarder: proxy,
_multiplex_task, multiplex_task,
} = state; } = state;
drop(_multiplex_task); drop(multiplex_task);
cx.spawn(|mut cx| async move { cx.spawn(|mut cx| async move {
let (incoming_tx, outgoing_rx) = proxy.into_channels().await; let (incoming_tx, outgoing_rx) = proxy.into_channels().await;
@ -331,8 +331,8 @@ impl SshRemoteClient {
ssh_connection, ssh_connection,
delegate, delegate,
forwarder: proxy, forwarder: proxy,
_multiplex_task: Self::multiplex( multiplex_task: Self::multiplex(
this.clone(), Arc::downgrade(&this),
ssh_process, ssh_process,
proxy_incoming_tx, proxy_incoming_tx,
proxy_outgoing_rx, proxy_outgoing_rx,
@ -349,7 +349,7 @@ impl SshRemoteClient {
} }
fn multiplex( fn multiplex(
this: Arc<Self>, this: Weak<Self>,
mut ssh_process: Child, mut ssh_process: Child,
incoming_tx: UnboundedSender<Envelope>, incoming_tx: UnboundedSender<Envelope>,
mut outgoing_rx: UnboundedReceiver<Envelope>, mut outgoing_rx: UnboundedReceiver<Envelope>,
@ -444,7 +444,9 @@ impl SshRemoteClient {
if let Err(error) = result { if let Err(error) = result {
log::warn!("ssh io task died with error: {:?}. reconnecting...", error); log::warn!("ssh io task died with error: {:?}. reconnecting...", error);
Self::reconnect(this, &mut cx).ok(); if let Some(this) = this.upgrade() {
Self::reconnect(this, &mut cx).ok();
}
} }
Ok(()) Ok(())
@ -516,7 +518,7 @@ impl SshRemoteClient {
let client = ChannelClient::new(server_to_client_rx, client_to_server_tx, cx); let client = ChannelClient::new(server_to_client_rx, client_to_server_tx, cx);
Arc::new(Self { Arc::new(Self {
client, client,
inner_state: Arc::new(Mutex::new(None)), inner_state: Mutex::new(None),
}) })
}), }),
server_cx.update(|cx| ChannelClient::new(client_to_server_rx, server_to_client_tx, cx)), server_cx.update(|cx| ChannelClient::new(client_to_server_rx, server_to_client_tx, cx)),