Merge main deadlock fix

This commit is contained in:
Ottatop 2024-02-23 16:30:59 -06:00
commit 5db8b1c54d
3 changed files with 18 additions and 25 deletions

View file

@ -178,6 +178,7 @@ impl Input {
while let Some(Ok(_response)) = stream.next().await {
action();
tokio::task::yield_now().await;
}
}
.boxed(),
@ -233,6 +234,7 @@ impl Input {
while let Some(Ok(_response)) = stream.next().await {
action();
tokio::task::yield_now().await;
}
}
.boxed(),

View file

@ -81,7 +81,11 @@
use std::sync::OnceLock;
use futures::{future::BoxFuture, stream::FuturesUnordered, Future, StreamExt};
use futures::{
future::{BoxFuture, Either},
stream::FuturesUnordered,
Future, StreamExt,
};
use input::Input;
use output::Output;
use pinnacle::Pinnacle;
@ -186,32 +190,18 @@ pub async fn connect(
/// This function is inserted at the end of your config through the [`config`] macro.
/// You should use the macro instead of this function directly.
pub async fn listen(fut_recv: UnboundedReceiver<BoxFuture<'static, ()>>) {
let fut_recv = UnboundedReceiverStream::new(fut_recv);
let mut future_set = FuturesUnordered::<BoxFuture<()>>::new();
let mut future_set = FuturesUnordered::<
BoxFuture<(
Option<BoxFuture<()>>,
Option<UnboundedReceiverStream<BoxFuture<()>>>,
)>,
>::new();
let mut fut_recv = UnboundedReceiverStream::new(fut_recv);
future_set.push(Box::pin(async move {
let (fut, stream) = fut_recv.into_future().await;
(fut, Some(stream))
}));
while let Some((fut, stream)) = future_set.next().await {
loop {
match futures::future::select(fut_recv.next(), future_set.next()).await {
Either::Left((fut, _)) => {
if let Some(fut) = fut {
future_set.push(Box::pin(async move {
fut.await;
(None, None)
}));
future_set.push(fut);
}
if let Some(stream) = stream {
future_set.push(Box::pin(async move {
let (fut, stream) = stream.into_future().await;
(fut, Some(stream))
}))
}
Either::Right(_) => (),
}
}
}

View file

@ -154,6 +154,7 @@ impl Process {
exit(response.exit_code, exit_msg);
}
}
tokio::task::yield_now().await;
}
}
.boxed(),