mirror of
https://github.com/zed-industries/zed.git
synced 2025-01-24 11:01:54 +00:00
lsp: Parse LSP messages on background thread - again (#23122)
Some checks are pending
CI / check_docs_only (push) Waiting to run
CI / Check Postgres and Protobuf migrations, mergability (push) Waiting to run
CI / Check formatting and spelling (push) Waiting to run
CI / (macOS) Run Clippy and tests (push) Blocked by required conditions
CI / (Linux) Run Clippy and tests (push) Blocked by required conditions
CI / (Linux) Build Remote Server (push) Blocked by required conditions
CI / (Windows) Run Clippy and tests (push) Blocked by required conditions
CI / Create a macOS bundle (push) Blocked by required conditions
CI / Create a Linux bundle (push) Blocked by required conditions
CI / Create arm64 Linux bundle (push) Blocked by required conditions
CI / Auto release preview (push) Blocked by required conditions
Deploy Docs / Deploy Docs (push) Waiting to run
Docs / Check formatting (push) Waiting to run
Script / ShellCheck Scripts (push) Waiting to run
Some checks are pending
CI / check_docs_only (push) Waiting to run
CI / Check Postgres and Protobuf migrations, mergability (push) Waiting to run
CI / Check formatting and spelling (push) Waiting to run
CI / (macOS) Run Clippy and tests (push) Blocked by required conditions
CI / (Linux) Run Clippy and tests (push) Blocked by required conditions
CI / (Linux) Build Remote Server (push) Blocked by required conditions
CI / (Windows) Run Clippy and tests (push) Blocked by required conditions
CI / Create a macOS bundle (push) Blocked by required conditions
CI / Create a Linux bundle (push) Blocked by required conditions
CI / Create arm64 Linux bundle (push) Blocked by required conditions
CI / Auto release preview (push) Blocked by required conditions
Deploy Docs / Deploy Docs (push) Waiting to run
Docs / Check formatting (push) Waiting to run
Script / ShellCheck Scripts (push) Waiting to run
This is a follow-up to #12640. While profiling latency of working with a project with 8192 diagnostics I've noticed that while we're parsing the LSP messages into a generic message struct on a background thread, we can still block the main thread as the conversion between that generic message struct and the actual LSP message (for use by callback) is still happening on the main thread. This PR significantly constrains what a message callback can use, so that it can be executed on any thread; we also send off message conversion to the background thread. In practice new callback constraints were already satisfied by all call sites, so no code outside of the lsp crate had to be adjusted. This has improved throughput of my 8192-benchmark from 40s to send out all diagnostics after saving to ~20s. Now main thread is spending most of the time updating our diagnostics sets, which can probably be improved too. Closes #ISSUE Release Notes: - Improved app responsiveness with huge # of diagnostics.
This commit is contained in:
parent
8e65ec1022
commit
1b3b825c7f
3 changed files with 83 additions and 62 deletions
|
@ -4206,6 +4206,7 @@ async fn test_collaborating_with_lsp_progress_updates_and_diagnostics_ordering(
|
||||||
}],
|
}],
|
||||||
},
|
},
|
||||||
);
|
);
|
||||||
|
executor.run_until_parked();
|
||||||
}
|
}
|
||||||
fake_language_server.notify::<lsp::notification::Progress>(&lsp::ProgressParams {
|
fake_language_server.notify::<lsp::notification::Progress>(&lsp::ProgressParams {
|
||||||
token: lsp::NumberOrString::String("the-disk-based-token".to_string()),
|
token: lsp::NumberOrString::String("the-disk-based-token".to_string()),
|
||||||
|
|
|
@ -315,12 +315,12 @@ impl EditorLspTestContext {
|
||||||
|
|
||||||
pub fn handle_request<T, F, Fut>(
|
pub fn handle_request<T, F, Fut>(
|
||||||
&self,
|
&self,
|
||||||
mut handler: F,
|
handler: F,
|
||||||
) -> futures::channel::mpsc::UnboundedReceiver<()>
|
) -> futures::channel::mpsc::UnboundedReceiver<()>
|
||||||
where
|
where
|
||||||
T: 'static + request::Request,
|
T: 'static + request::Request,
|
||||||
T::Params: 'static + Send,
|
T::Params: 'static + Send,
|
||||||
F: 'static + Send + FnMut(lsp::Url, T::Params, gpui::AsyncAppContext) -> Fut,
|
F: 'static + Send + Sync + Fn(lsp::Url, T::Params, gpui::AsyncAppContext) -> Fut,
|
||||||
Fut: 'static + Send + Future<Output = Result<T::Result>>,
|
Fut: 'static + Send + Future<Output = Result<T::Result>>,
|
||||||
{
|
{
|
||||||
let url = self.buffer_lsp_url.clone();
|
let url = self.buffer_lsp_url.clone();
|
||||||
|
|
|
@ -45,7 +45,7 @@ const CONTENT_LEN_HEADER: &str = "Content-Length: ";
|
||||||
const LSP_REQUEST_TIMEOUT: Duration = Duration::from_secs(60 * 2);
|
const LSP_REQUEST_TIMEOUT: Duration = Duration::from_secs(60 * 2);
|
||||||
const SERVER_SHUTDOWN_TIMEOUT: Duration = Duration::from_secs(5);
|
const SERVER_SHUTDOWN_TIMEOUT: Duration = Duration::from_secs(5);
|
||||||
|
|
||||||
type NotificationHandler = Box<dyn Send + FnMut(Option<RequestId>, Value, AsyncAppContext)>;
|
type NotificationHandler = Arc<dyn Send + Sync + Fn(Option<RequestId>, Value, AsyncAppContext)>;
|
||||||
type ResponseHandler = Box<dyn Send + FnOnce(Result<String, Error>)>;
|
type ResponseHandler = Box<dyn Send + FnOnce(Result<String, Error>)>;
|
||||||
type IoHandler = Box<dyn Send + FnMut(IoKind, &str)>;
|
type IoHandler = Box<dyn Send + FnMut(IoKind, &str)>;
|
||||||
|
|
||||||
|
@ -890,7 +890,7 @@ impl LanguageServer {
|
||||||
pub fn on_notification<T, F>(&self, f: F) -> Subscription
|
pub fn on_notification<T, F>(&self, f: F) -> Subscription
|
||||||
where
|
where
|
||||||
T: notification::Notification,
|
T: notification::Notification,
|
||||||
F: 'static + Send + FnMut(T::Params, AsyncAppContext),
|
F: 'static + Send + Sync + Fn(T::Params, AsyncAppContext),
|
||||||
{
|
{
|
||||||
self.on_custom_notification(T::METHOD, f)
|
self.on_custom_notification(T::METHOD, f)
|
||||||
}
|
}
|
||||||
|
@ -903,7 +903,7 @@ impl LanguageServer {
|
||||||
where
|
where
|
||||||
T: request::Request,
|
T: request::Request,
|
||||||
T::Params: 'static + Send,
|
T::Params: 'static + Send,
|
||||||
F: 'static + FnMut(T::Params, AsyncAppContext) -> Fut + Send,
|
F: 'static + Fn(T::Params, AsyncAppContext) -> Fut + Send + Sync,
|
||||||
Fut: 'static + Future<Output = Result<T::Result>>,
|
Fut: 'static + Future<Output = Result<T::Result>>,
|
||||||
{
|
{
|
||||||
self.on_custom_request(T::METHOD, f)
|
self.on_custom_request(T::METHOD, f)
|
||||||
|
@ -939,17 +939,27 @@ impl LanguageServer {
|
||||||
}
|
}
|
||||||
|
|
||||||
#[must_use]
|
#[must_use]
|
||||||
fn on_custom_notification<Params, F>(&self, method: &'static str, mut f: F) -> Subscription
|
fn on_custom_notification<Params, F>(&self, method: &'static str, f: F) -> Subscription
|
||||||
where
|
where
|
||||||
F: 'static + FnMut(Params, AsyncAppContext) + Send,
|
F: 'static + Fn(Params, AsyncAppContext) + Send + Sync,
|
||||||
Params: DeserializeOwned,
|
Params: DeserializeOwned + Send + 'static,
|
||||||
{
|
{
|
||||||
|
let callback = Arc::new(f);
|
||||||
let prev_handler = self.notification_handlers.lock().insert(
|
let prev_handler = self.notification_handlers.lock().insert(
|
||||||
method,
|
method,
|
||||||
Box::new(move |_, params, cx| {
|
Arc::new(move |_, params, cx| {
|
||||||
if let Some(params) = serde_json::from_value(params).log_err() {
|
let callback = callback.clone();
|
||||||
f(params, cx);
|
|
||||||
}
|
cx.spawn(move |cx| async move {
|
||||||
|
if let Some(params) = cx
|
||||||
|
.background_executor()
|
||||||
|
.spawn(async move { serde_json::from_value(params).log_err() })
|
||||||
|
.await
|
||||||
|
{
|
||||||
|
callback(params, cx);
|
||||||
|
}
|
||||||
|
})
|
||||||
|
.detach();
|
||||||
}),
|
}),
|
||||||
);
|
);
|
||||||
assert!(
|
assert!(
|
||||||
|
@ -963,64 +973,74 @@ impl LanguageServer {
|
||||||
}
|
}
|
||||||
|
|
||||||
#[must_use]
|
#[must_use]
|
||||||
fn on_custom_request<Params, Res, Fut, F>(&self, method: &'static str, mut f: F) -> Subscription
|
fn on_custom_request<Params, Res, Fut, F>(&self, method: &'static str, f: F) -> Subscription
|
||||||
where
|
where
|
||||||
F: 'static + FnMut(Params, AsyncAppContext) -> Fut + Send,
|
F: 'static + Fn(Params, AsyncAppContext) -> Fut + Send + Sync,
|
||||||
Fut: 'static + Future<Output = Result<Res>>,
|
Fut: 'static + Future<Output = Result<Res>>,
|
||||||
Params: DeserializeOwned + Send + 'static,
|
Params: DeserializeOwned + Send + 'static,
|
||||||
Res: Serialize,
|
Res: Serialize,
|
||||||
{
|
{
|
||||||
let outbound_tx = self.outbound_tx.clone();
|
let outbound_tx = self.outbound_tx.clone();
|
||||||
|
let f = Arc::new(f);
|
||||||
let prev_handler = self.notification_handlers.lock().insert(
|
let prev_handler = self.notification_handlers.lock().insert(
|
||||||
method,
|
method,
|
||||||
Box::new(move |id, params, cx| {
|
Arc::new(move |id, params, cx| {
|
||||||
if let Some(id) = id {
|
if let Some(id) = id {
|
||||||
match serde_json::from_value(params) {
|
let f = f.clone();
|
||||||
Ok(params) => {
|
let deserialized_params = cx
|
||||||
let response = f(params, cx.clone());
|
.background_executor()
|
||||||
cx.foreground_executor()
|
.spawn(async move { serde_json::from_value(params) });
|
||||||
.spawn({
|
|
||||||
let outbound_tx = outbound_tx.clone();
|
|
||||||
async move {
|
|
||||||
let response = match response.await {
|
|
||||||
Ok(result) => Response {
|
|
||||||
jsonrpc: JSON_RPC_VERSION,
|
|
||||||
id,
|
|
||||||
value: LspResult::Ok(Some(result)),
|
|
||||||
},
|
|
||||||
Err(error) => Response {
|
|
||||||
jsonrpc: JSON_RPC_VERSION,
|
|
||||||
id,
|
|
||||||
value: LspResult::Error(Some(Error {
|
|
||||||
message: error.to_string(),
|
|
||||||
})),
|
|
||||||
},
|
|
||||||
};
|
|
||||||
if let Some(response) =
|
|
||||||
serde_json::to_string(&response).log_err()
|
|
||||||
{
|
|
||||||
outbound_tx.try_send(response).ok();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
})
|
|
||||||
.detach();
|
|
||||||
}
|
|
||||||
|
|
||||||
Err(error) => {
|
cx.spawn({
|
||||||
log::error!("error deserializing {} request: {:?}", method, error);
|
let outbound_tx = outbound_tx.clone();
|
||||||
let response = AnyResponse {
|
move |cx| async move {
|
||||||
jsonrpc: JSON_RPC_VERSION,
|
match deserialized_params.await {
|
||||||
id,
|
Ok(params) => {
|
||||||
result: None,
|
let response = f(params, cx.clone());
|
||||||
error: Some(Error {
|
let response = match response.await {
|
||||||
message: error.to_string(),
|
Ok(result) => Response {
|
||||||
}),
|
jsonrpc: JSON_RPC_VERSION,
|
||||||
};
|
id,
|
||||||
if let Some(response) = serde_json::to_string(&response).log_err() {
|
value: LspResult::Ok(Some(result)),
|
||||||
outbound_tx.try_send(response).ok();
|
},
|
||||||
|
Err(error) => Response {
|
||||||
|
jsonrpc: JSON_RPC_VERSION,
|
||||||
|
id,
|
||||||
|
value: LspResult::Error(Some(Error {
|
||||||
|
message: error.to_string(),
|
||||||
|
})),
|
||||||
|
},
|
||||||
|
};
|
||||||
|
if let Some(response) =
|
||||||
|
serde_json::to_string(&response).log_err()
|
||||||
|
{
|
||||||
|
outbound_tx.try_send(response).ok();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Err(error) => {
|
||||||
|
log::error!(
|
||||||
|
"error deserializing {} request: {:?}",
|
||||||
|
method,
|
||||||
|
error
|
||||||
|
);
|
||||||
|
let response = AnyResponse {
|
||||||
|
jsonrpc: JSON_RPC_VERSION,
|
||||||
|
id,
|
||||||
|
result: None,
|
||||||
|
error: Some(Error {
|
||||||
|
message: error.to_string(),
|
||||||
|
}),
|
||||||
|
};
|
||||||
|
if let Some(response) =
|
||||||
|
serde_json::to_string(&response).log_err()
|
||||||
|
{
|
||||||
|
outbound_tx.try_send(response).ok();
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
})
|
||||||
|
.detach();
|
||||||
}
|
}
|
||||||
}),
|
}),
|
||||||
);
|
);
|
||||||
|
@ -1425,12 +1445,12 @@ impl FakeLanguageServer {
|
||||||
/// Registers a handler for a specific kind of request. Removes any existing handler for specified request type.
|
/// Registers a handler for a specific kind of request. Removes any existing handler for specified request type.
|
||||||
pub fn handle_request<T, F, Fut>(
|
pub fn handle_request<T, F, Fut>(
|
||||||
&self,
|
&self,
|
||||||
mut handler: F,
|
handler: F,
|
||||||
) -> futures::channel::mpsc::UnboundedReceiver<()>
|
) -> futures::channel::mpsc::UnboundedReceiver<()>
|
||||||
where
|
where
|
||||||
T: 'static + request::Request,
|
T: 'static + request::Request,
|
||||||
T::Params: 'static + Send,
|
T::Params: 'static + Send,
|
||||||
F: 'static + Send + FnMut(T::Params, gpui::AsyncAppContext) -> Fut,
|
F: 'static + Send + Sync + Fn(T::Params, gpui::AsyncAppContext) -> Fut,
|
||||||
Fut: 'static + Send + Future<Output = Result<T::Result>>,
|
Fut: 'static + Send + Future<Output = Result<T::Result>>,
|
||||||
{
|
{
|
||||||
let (responded_tx, responded_rx) = futures::channel::mpsc::unbounded();
|
let (responded_tx, responded_rx) = futures::channel::mpsc::unbounded();
|
||||||
|
@ -1454,12 +1474,12 @@ impl FakeLanguageServer {
|
||||||
/// Registers a handler for a specific kind of notification. Removes any existing handler for specified notification type.
|
/// Registers a handler for a specific kind of notification. Removes any existing handler for specified notification type.
|
||||||
pub fn handle_notification<T, F>(
|
pub fn handle_notification<T, F>(
|
||||||
&self,
|
&self,
|
||||||
mut handler: F,
|
handler: F,
|
||||||
) -> futures::channel::mpsc::UnboundedReceiver<()>
|
) -> futures::channel::mpsc::UnboundedReceiver<()>
|
||||||
where
|
where
|
||||||
T: 'static + notification::Notification,
|
T: 'static + notification::Notification,
|
||||||
T::Params: 'static + Send,
|
T::Params: 'static + Send,
|
||||||
F: 'static + Send + FnMut(T::Params, gpui::AsyncAppContext),
|
F: 'static + Send + Sync + Fn(T::Params, gpui::AsyncAppContext),
|
||||||
{
|
{
|
||||||
let (handled_tx, handled_rx) = futures::channel::mpsc::unbounded();
|
let (handled_tx, handled_rx) = futures::channel::mpsc::unbounded();
|
||||||
self.server.remove_notification_handler::<T>();
|
self.server.remove_notification_handler::<T>();
|
||||||
|
|
Loading…
Reference in a new issue