diff --git a/api/rust/src/input.rs b/api/rust/src/input.rs index 2605a36..b03ce36 100644 --- a/api/rust/src/input.rs +++ b/api/rust/src/input.rs @@ -177,6 +177,7 @@ impl Input { while let Some(Ok(_response)) = stream.next().await { action(); + tokio::task::yield_now().await; } } .boxed(), @@ -232,6 +233,7 @@ impl Input { while let Some(Ok(_response)) = stream.next().await { action(); + tokio::task::yield_now().await; } } .boxed(), diff --git a/api/rust/src/lib.rs b/api/rust/src/lib.rs index 68a2edf..7cb589e 100644 --- a/api/rust/src/lib.rs +++ b/api/rust/src/lib.rs @@ -82,8 +82,10 @@ use std::sync::OnceLock; use futures::{ - channel::mpsc::UnboundedReceiver, future::BoxFuture, stream::FuturesUnordered, Future, - StreamExt, + channel::mpsc::UnboundedReceiver, + future::{BoxFuture, Either}, + stream::FuturesUnordered, + Future, StreamExt, }; use input::Input; use output::Output; @@ -175,31 +177,17 @@ pub async fn connect( /// /// This function is inserted at the end of your config through the [`config`] macro. /// You should use the macro instead of this function directly. -pub async fn listen(fut_recv: UnboundedReceiver>) { - let mut future_set = FuturesUnordered::< - BoxFuture<( - Option>, - Option>>, - )>, - >::new(); +pub async fn listen(mut fut_recv: UnboundedReceiver>) { + let mut future_set = FuturesUnordered::>::new(); - future_set.push(Box::pin(async move { - let (fut, stream) = fut_recv.into_future().await; - (fut, Some(stream)) - })); - - while let Some((fut, stream)) = future_set.next().await { - if let Some(fut) = fut { - future_set.push(Box::pin(async move { - fut.await; - (None, None) - })); - } - if let Some(stream) = stream { - future_set.push(Box::pin(async move { - let (fut, stream) = stream.into_future().await; - (fut, Some(stream)) - })) + loop { + match futures::future::select(fut_recv.next(), future_set.next()).await { + Either::Left((fut, _)) => { + if let Some(fut) = fut { + future_set.push(fut); + } + } + Either::Right(_) => (), } } } diff --git a/api/rust/src/output.rs b/api/rust/src/output.rs index d8b27e3..3e9f8e4 100644 --- a/api/rust/src/output.rs +++ b/api/rust/src/output.rs @@ -166,6 +166,7 @@ impl Output { }; for_all(output); + tokio::task::yield_now().await; } } .boxed(), diff --git a/api/rust/src/process.rs b/api/rust/src/process.rs index c93a063..56b5dd1 100644 --- a/api/rust/src/process.rs +++ b/api/rust/src/process.rs @@ -153,6 +153,7 @@ impl Process { exit(response.exit_code, exit_msg); } } + tokio::task::yield_now().await; } } .boxed(),