ssh remoting: Treat closed stderr as error (#19289)

Before this change we had a race condition bug: if stderr was closed
before the other two sockets, we wouldn't properly detect when the
server died, and not report or retry anything.

That's because we treated a closed stderr as a non-error.

Technically, it isn't an error (closing a connection is okay!), but
until we have a proper shutdown ceremony between all three processes, we
can treat it as an error, because that lets us to detect when the server
is gone.

On the client-side, we also always react to these errors by
reconnecting. Except when we shutdown: there we do a proper shutdown and
won't error on the proxy exit code.

So, this works, even if I wish there was a better way for the server to
communicate to the proxy that it shutdown properly. But I don't want a
fourth socket.

Release Notes:

- N/A
This commit is contained in:
Thorsten Ball 2024-10-16 18:05:52 +02:00 committed by GitHub
parent 9c3d80d6e8
commit 69abe71bf7
No known key found for this signature in database
GPG key ID: B5690EEEBB952194

View file

@ -186,7 +186,6 @@ fn start_server(
log::info!("accepting new connections"); log::info!("accepting new connections");
let result = select! { let result = select! {
streams = streams.fuse() => { streams = streams.fuse() => {
log::warn!("stdin {:?}, stdout: {:?}, stderr: {:?}", streams.0, streams.1, streams.2);
let (Some(Ok(stdin_stream)), Some(Ok(stdout_stream)), Some(Ok(stderr_stream))) = streams else { let (Some(Ok(stdin_stream)), Some(Ok(stdout_stream)), Some(Ok(stderr_stream))) = streams else {
break; break;
}; };
@ -211,8 +210,6 @@ fn start_server(
break; break;
}; };
log::info!("yep! we got connections");
let mut input_buffer = Vec::new(); let mut input_buffer = Vec::new();
let mut output_buffer = Vec::new(); let mut output_buffer = Vec::new();
loop { loop {
@ -253,7 +250,6 @@ fn start_server(
} }
} }
// // TODO: How do we handle backpressure?
log_message = log_rx.next().fuse() => { log_message = log_rx.next().fuse() => {
if let Some(log_message) = log_message { if let Some(log_message) = log_message {
if let Err(error) = stderr_stream.write_all(&log_message).await { if let Err(error) = stderr_stream.write_all(&log_message).await {
@ -316,7 +312,7 @@ pub fn execute_run(
let listeners = ServerListeners::new(stdin_socket, stdout_socket, stderr_socket)?; let listeners = ServerListeners::new(stdin_socket, stdout_socket, stderr_socket)?;
log::debug!("starting gpui app"); log::info!("starting headless gpui app");
gpui::App::headless().run(move |cx| { gpui::App::headless().run(move |cx| {
settings::init(cx); settings::init(cx);
HeadlessProject::init(cx); HeadlessProject::init(cx);
@ -404,7 +400,7 @@ pub fn execute_proxy(identifier: String, is_reconnecting: bool) -> Result<()> {
init_logging_proxy(); init_logging_proxy();
init_panic_hook(); init_panic_hook();
log::debug!("starting up. PID: {}", std::process::id()); log::info!("starting proxy process. PID: {}", std::process::id());
let server_paths = ServerPaths::new(&identifier)?; let server_paths = ServerPaths::new(&identifier)?;
@ -417,7 +413,7 @@ pub fn execute_proxy(identifier: String, is_reconnecting: bool) -> Result<()> {
} }
} else { } else {
if let Some(pid) = server_pid { if let Some(pid) = server_pid {
log::debug!("found server already running with PID {}. Killing process and cleaning up files...", pid); log::info!("proxy found server already running with PID {}. Killing process and cleaning up files...", pid);
kill_running_server(pid, &server_paths)?; kill_running_server(pid, &server_paths)?;
} }
@ -443,7 +439,9 @@ pub fn execute_proxy(identifier: String, is_reconnecting: bool) -> Result<()> {
loop { loop {
match stream.read(&mut stderr_buffer).await { match stream.read(&mut stderr_buffer).await {
Ok(0) => { Ok(0) => {
return anyhow::Ok(()); let error =
std::io::Error::new(std::io::ErrorKind::UnexpectedEof, "stderr closed");
Err(anyhow!(error))?;
} }
Ok(n) => { Ok(n) => {
stderr.write_all(&mut stderr_buffer[..n]).await?; stderr.write_all(&mut stderr_buffer[..n]).await?;
@ -463,6 +461,12 @@ pub fn execute_proxy(identifier: String, is_reconnecting: bool) -> Result<()> {
result = stderr_task.fuse() => result, result = stderr_task.fuse() => result,
} }
}) { }) {
if let Some(error) = forwarding_result.downcast_ref::<std::io::Error>() {
if error.kind() == std::io::ErrorKind::UnexpectedEof {
log::error!("connection to server closed due to unexpected EOF");
return Err(anyhow!("connection to server closed"));
}
}
log::error!( log::error!(
"failed to forward messages: {:?}, terminating...", "failed to forward messages: {:?}, terminating...",
forwarding_result forwarding_result
@ -518,7 +522,10 @@ fn spawn_server(paths: &ServerPaths) -> Result<()> {
.arg(&paths.stderr_socket) .arg(&paths.stderr_socket)
.spawn()?; .spawn()?;
log::debug!("server started. PID: {:?}", server_process.id()); log::info!(
"proxy spawned server process. PID: {:?}",
server_process.id()
);
let mut total_time_waited = std::time::Duration::from_secs(0); let mut total_time_waited = std::time::Duration::from_secs(0);
let wait_duration = std::time::Duration::from_millis(20); let wait_duration = std::time::Duration::from_millis(20);