diff --git a/api/rust/src/input.rs b/api/rust/src/input.rs index 43a784a..256fa31 100644 --- a/api/rust/src/input.rs +++ b/api/rust/src/input.rs @@ -178,6 +178,7 @@ impl Input { while let Some(Ok(_response)) = stream.next().await { action(); + tokio::task::yield_now().await; } } .boxed(), @@ -233,6 +234,7 @@ impl Input { while let Some(Ok(_response)) = stream.next().await { action(); + tokio::task::yield_now().await; } } .boxed(), diff --git a/api/rust/src/lib.rs b/api/rust/src/lib.rs index f9a1b66..728ecd5 100644 --- a/api/rust/src/lib.rs +++ b/api/rust/src/lib.rs @@ -81,7 +81,11 @@ use std::sync::OnceLock; -use futures::{future::BoxFuture, stream::FuturesUnordered, Future, StreamExt}; +use futures::{ + future::{BoxFuture, Either}, + stream::FuturesUnordered, + Future, StreamExt, +}; use input::Input; use output::Output; use pinnacle::Pinnacle; @@ -186,32 +190,18 @@ pub async fn connect( /// This function is inserted at the end of your config through the [`config`] macro. /// You should use the macro instead of this function directly. pub async fn listen(fut_recv: UnboundedReceiver>) { - let fut_recv = UnboundedReceiverStream::new(fut_recv); + let mut future_set = FuturesUnordered::>::new(); - let mut future_set = FuturesUnordered::< - BoxFuture<( - Option>, - Option>>, - )>, - >::new(); + let mut fut_recv = UnboundedReceiverStream::new(fut_recv); - future_set.push(Box::pin(async move { - let (fut, stream) = fut_recv.into_future().await; - (fut, Some(stream)) - })); - - while let Some((fut, stream)) = future_set.next().await { - if let Some(fut) = fut { - future_set.push(Box::pin(async move { - fut.await; - (None, None) - })); - } - if let Some(stream) = stream { - future_set.push(Box::pin(async move { - let (fut, stream) = stream.into_future().await; - (fut, Some(stream)) - })) + loop { + match futures::future::select(fut_recv.next(), future_set.next()).await { + Either::Left((fut, _)) => { + if let Some(fut) = fut { + future_set.push(fut); + } + } + Either::Right(_) => (), } } } diff --git a/api/rust/src/process.rs b/api/rust/src/process.rs index 3192aaf..e9f10e7 100644 --- a/api/rust/src/process.rs +++ b/api/rust/src/process.rs @@ -154,6 +154,7 @@ impl Process { exit(response.exit_code, exit_msg); } } + tokio::task::yield_now().await; } } .boxed(),