diff --git a/devices/Cargo.toml b/devices/Cargo.toml index 9bf43bb007..a173566bb0 100644 --- a/devices/Cargo.toml +++ b/devices/Cargo.toml @@ -69,7 +69,7 @@ vm_memory = { path = "../vm_memory" } [dependencies.futures] version = "*" -features = ["std"] +features = ["async-await", "std"] default-features = false [dev-dependencies] diff --git a/devices/src/virtio/snd/cras_backend/mod.rs b/devices/src/virtio/snd/cras_backend/mod.rs index eb0aa35ea7..276128acbc 100644 --- a/devices/src/virtio/snd/cras_backend/mod.rs +++ b/devices/src/virtio/snd/cras_backend/mod.rs @@ -4,22 +4,24 @@ // virtio-sound spec: https://github.com/oasis-tcs/virtio-spec/blob/master/virtio-sound.tex +use std::cell::RefCell; use std::fmt; use std::io; use std::rc::Rc; use std::str::{FromStr, ParseBoolError}; use std::thread; +use anyhow::Context; use audio_streams::{SampleFormat, StreamSource}; use base::{error, warn, Error as SysError, Event, RawDescriptor}; use cros_async::sync::{Condvar, Mutex as AsyncMutex}; -use cros_async::{select6, AsyncError, EventAsync, Executor, SelectResult}; +use cros_async::{AsyncError, EventAsync, Executor}; use data_model::DataInit; use futures::channel::{ mpsc, oneshot::{self, Canceled}, }; -use futures::{pin_mut, Future, TryFutureExt}; +use futures::{pin_mut, select, Future, FutureExt, TryFutureExt}; use libcras::{BoxError, CrasClient, CrasClientType, CrasSocketType}; use sys_util::{set_rt_prio_limit, set_rt_round_robin}; use thiserror::Error as ThisError; @@ -637,7 +639,8 @@ fn run_worker( streams.resize_with(snd_data.pcm_info.len(), Default::default); let streams = Rc::new(AsyncMutex::new(streams)); - let interrupt = Rc::new(interrupt); + let interrupt = Rc::new(RefCell::new(interrupt)); + let interrupt_ref = &*interrupt.borrow(); let ctrl_queue = queues.remove(0); let _event_queue = queues.remove(0); @@ -666,7 +669,7 @@ fn run_worker( &snd_data, ctrl_queue, ctrl_queue_evt, - interrupt.as_ref(), + interrupt_ref, tx_send, rx_send, ¶ms, @@ -683,45 +686,42 @@ fn run_worker( let f_tx = handle_pcm_queue(&mem, &streams, tx_send2, &tx_queue, tx_queue_evt); - let f_tx_response = send_pcm_response_worker(&mem, &tx_queue, interrupt.as_ref(), &mut tx_recv); + let f_tx_response = send_pcm_response_worker(&mem, &tx_queue, interrupt_ref, &mut tx_recv); let f_rx = handle_pcm_queue(&mem, &streams, rx_send2, &rx_queue, rx_queue_evt); - let f_rx_response = send_pcm_response_worker(&mem, &rx_queue, interrupt.as_ref(), &mut rx_recv); + let f_rx_response = send_pcm_response_worker(&mem, &rx_queue, interrupt_ref, &mut rx_recv); + + let f_resample = async_utils::handle_irq_resample(&ex, interrupt.clone()); // Exit if the kill event is triggered. let f_kill = async_utils::await_and_exit(&ex, kill_evt); - pin_mut!(f_ctrl, f_tx, f_tx_response, f_rx, f_rx_response, f_kill); - - match ex.run_until(select6( + pin_mut!( f_ctrl, f_tx, f_tx_response, f_rx, f_rx_response, - f_kill, - )) { - Ok((r_ctrl, r_tx, r_tx_response, r_rx, r_rx_response, _r_kill)) => { - if let SelectResult::Finished(Err(e)) = r_ctrl { - return Err(format!("Error in handling ctrl queue: {}", e)); - } - if let SelectResult::Finished(Err(e)) = r_tx { - return Err(format!("Error in handling tx queue: {}", e)); - } - if let SelectResult::Finished(Err(e)) = r_tx_response { - return Err(format!("Error in handling tx response: {}", e)); - } - if let SelectResult::Finished(Err(e)) = r_rx { - return Err(format!("Error in handling rx queue: {}", e)); - } - if let SelectResult::Finished(Err(e)) = r_rx_response { - return Err(format!("Error in handling rx response: {}", e)); - } - } - Err(e) => { - error!("Error happened in executor: {}", e); + f_resample, + f_kill + ); + + let done = async { + select! { + res = f_ctrl.fuse() => res.context("error in handling ctrl queue"), + res = f_tx.fuse() => res.context("error in handling tx queue"), + res = f_tx_response.fuse() => res.context("error in handling tx response"), + res = f_rx.fuse() => res.context("error in handling rx queue"), + res = f_rx_response.fuse() => res.context("error in handling rx response"), + res = f_resample.fuse() => res.context("error in handle_irq_resample"), + res = f_kill.fuse() => res.context("error in await_and_exit"), } + }; + match ex.run_until(done) { + Ok(Ok(())) => {} + Ok(Err(e)) => error!("Error in worker: {}", e), + Err(e) => error!("Error happened in executor: {}", e), } Ok(())