From 95b2f4fb1614df67ffd834d93902b4254033c54f Mon Sep 17 00:00:00 2001 From: Max Brunsfeld Date: Tue, 1 Mar 2022 15:02:04 -0800 Subject: [PATCH] Fix remaining language server hangs on shutdown * Use fork of async-pipe library that handles closed pipes correctly. * Clear response handlers map when terminating output task, so as to wake any pending request futures. Co-Authored-By: Nathan Sobo --- Cargo.lock | 3 +- crates/lsp/Cargo.toml | 5 +-- crates/lsp/src/lsp.rs | 77 ++++++++++++------------------------------- 3 files changed, 26 insertions(+), 59 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 4f52991986..68ebe25d81 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -336,7 +336,7 @@ dependencies = [ [[package]] name = "async-pipe" version = "0.1.3" -source = "git+https://github.com/routerify/async-pipe-rs?rev=feeb77e83142a9ff837d0767652ae41bfc5d8e47#feeb77e83142a9ff837d0767652ae41bfc5d8e47" +source = "git+https://github.com/zed-industries/async-pipe-rs?rev=82d00a04211cf4e1236029aa03e6b6ce2a74c553#82d00a04211cf4e1236029aa03e6b6ce2a74c553" dependencies = [ "futures", "log", @@ -2827,6 +2827,7 @@ version = "0.1.0" dependencies = [ "anyhow", "async-pipe", + "collections", "ctor", "env_logger", "futures", diff --git a/crates/lsp/Cargo.toml b/crates/lsp/Cargo.toml index fca48970d2..e463ba020d 100644 --- a/crates/lsp/Cargo.toml +++ b/crates/lsp/Cargo.toml @@ -10,10 +10,11 @@ path = "src/lsp.rs" test-support = ["async-pipe"] [dependencies] +collections = { path = "../collections" } gpui = { path = "../gpui" } util = { path = "../util" } anyhow = "1.0" -async-pipe = { git = "https://github.com/routerify/async-pipe-rs", rev = "feeb77e83142a9ff837d0767652ae41bfc5d8e47", optional = true } +async-pipe = { git = "https://github.com/zed-industries/async-pipe-rs", rev = "82d00a04211cf4e1236029aa03e6b6ce2a74c553", optional = true } futures = "0.3" log = "0.4" lsp-types = "0.91" @@ -26,7 +27,7 @@ smol = "1.2" [dev-dependencies] gpui = { path = "../gpui", features = ["test-support"] } util = { path = "../util", features = ["test-support"] } -async-pipe = { git = "https://github.com/routerify/async-pipe-rs", rev = "feeb77e83142a9ff837d0767652ae41bfc5d8e47" } +async-pipe = { git = "https://github.com/zed-industries/async-pipe-rs", rev = "82d00a04211cf4e1236029aa03e6b6ce2a74c553" } ctor = "0.1" env_logger = "0.8" unindent = "0.1.7" diff --git a/crates/lsp/src/lsp.rs b/crates/lsp/src/lsp.rs index ac0603cd4d..4a988f90b5 100644 --- a/crates/lsp/src/lsp.rs +++ b/crates/lsp/src/lsp.rs @@ -1,6 +1,6 @@ use anyhow::{anyhow, Context, Result}; -use futures::channel::oneshot; -use futures::{io::BufWriter, AsyncRead, AsyncWrite}; +use collections::HashMap; +use futures::{channel::oneshot, io::BufWriter, AsyncRead, AsyncWrite}; use gpui::{executor, Task}; use parking_lot::{Mutex, RwLock}; use postage::{barrier, prelude::Stream, watch}; @@ -12,7 +12,6 @@ use smol::{ process::Command, }; use std::{ - collections::HashMap, future::Future, io::Write, str::FromStr, @@ -129,14 +128,15 @@ impl LanguageServer { let mut stdin = BufWriter::new(stdin); let mut stdout = BufReader::new(stdout); let (outbound_tx, outbound_rx) = channel::unbounded::>(); - let notification_handlers = Arc::new(RwLock::new(HashMap::<_, NotificationHandler>::new())); - let response_handlers = Arc::new(Mutex::new(HashMap::<_, ResponseHandler>::new())); + let notification_handlers = + Arc::new(RwLock::new(HashMap::<_, NotificationHandler>::default())); + let response_handlers = Arc::new(Mutex::new(HashMap::<_, ResponseHandler>::default())); let input_task = executor.spawn( { let notification_handlers = notification_handlers.clone(); let response_handlers = response_handlers.clone(); async move { - let _clear_response_channels = ClearResponseChannels(response_handlers.clone()); + let _clear_response_handlers = ClearResponseHandlers(response_handlers.clone()); let mut buffer = Vec::new(); loop { buffer.clear(); @@ -190,8 +190,10 @@ impl LanguageServer { .log_err(), ); let (output_done_tx, output_done_rx) = barrier::channel(); - let output_task = executor.spawn( + let output_task = executor.spawn({ + let response_handlers = response_handlers.clone(); async move { + let _clear_response_handlers = ClearResponseHandlers(response_handlers); let mut content_len_buffer = Vec::new(); while let Ok(message) = outbound_rx.recv().await { content_len_buffer.clear(); @@ -205,8 +207,8 @@ impl LanguageServer { drop(output_done_tx); Ok(()) } - .log_err(), - ); + .log_err() + }); let (initialized_tx, initialized_rx) = barrier::channel(); let (mut capabilities_tx, capabilities_rx) = watch::channel(); @@ -408,9 +410,13 @@ impl LanguageServer { params, }) .unwrap(); - let mut response_handlers = response_handlers.lock(); + + let send = outbound_tx + .try_send(message) + .context("failed to write to language server's stdin"); + let (tx, rx) = oneshot::channel(); - response_handlers.insert( + response_handlers.lock().insert( id, Box::new(move |result| { let response = match result { @@ -423,9 +429,6 @@ impl LanguageServer { }), ); - let send = outbound_tx - .try_send(message) - .context("failed to write to language server's stdin"); async move { send?; rx.await? @@ -581,7 +584,7 @@ impl FakeLanguageServer { }); let output_task = cx.background().spawn(async move { - let mut stdout = smol::io::BufWriter::new(PipeWriterCloseOnDrop(stdout)); + let mut stdout = smol::io::BufWriter::new(stdout); while let Some(message) = outgoing_rx.next().await { stdout .write_all(CONTENT_LEN_HEADER.as_bytes()) @@ -694,7 +697,7 @@ impl FakeLanguageServer { let message_len: usize = std::str::from_utf8(buffer) .unwrap() .strip_prefix(CONTENT_LEN_HEADER) - .unwrap() + .ok_or_else(|| anyhow!("invalid content length header"))? .trim_end() .parse() .unwrap(); @@ -704,47 +707,9 @@ impl FakeLanguageServer { } } -struct PipeWriterCloseOnDrop(async_pipe::PipeWriter); +struct ClearResponseHandlers(Arc>>); -impl Drop for PipeWriterCloseOnDrop { - fn drop(&mut self) { - self.0.close().ok(); - } -} - -impl AsyncWrite for PipeWriterCloseOnDrop { - fn poll_write( - mut self: std::pin::Pin<&mut Self>, - cx: &mut std::task::Context<'_>, - buf: &[u8], - ) -> std::task::Poll> { - let pipe = &mut self.0; - smol::pin!(pipe); - pipe.poll_write(cx, buf) - } - - fn poll_flush( - mut self: std::pin::Pin<&mut Self>, - cx: &mut std::task::Context<'_>, - ) -> std::task::Poll> { - let pipe = &mut self.0; - smol::pin!(pipe); - pipe.poll_flush(cx) - } - - fn poll_close( - mut self: std::pin::Pin<&mut Self>, - cx: &mut std::task::Context<'_>, - ) -> std::task::Poll> { - let pipe = &mut self.0; - smol::pin!(pipe); - pipe.poll_close(cx) - } -} - -struct ClearResponseChannels(Arc>>); - -impl Drop for ClearResponseChannels { +impl Drop for ClearResponseHandlers { fn drop(&mut self) { self.0.lock().clear(); }