Fix deadlock again (maybe)

This took like 4 hours to figure out, async sucks wtf
This commit is contained in:
Ottatop 2024-02-23 16:17:22 -06:00
parent a70e686bdf
commit 01a9874ecd
4 changed files with 18 additions and 26 deletions

View file

@ -177,6 +177,7 @@ impl Input {
while let Some(Ok(_response)) = stream.next().await { while let Some(Ok(_response)) = stream.next().await {
action(); action();
tokio::task::yield_now().await;
} }
} }
.boxed(), .boxed(),
@ -232,6 +233,7 @@ impl Input {
while let Some(Ok(_response)) = stream.next().await { while let Some(Ok(_response)) = stream.next().await {
action(); action();
tokio::task::yield_now().await;
} }
} }
.boxed(), .boxed(),

View file

@ -82,8 +82,10 @@
use std::sync::OnceLock; use std::sync::OnceLock;
use futures::{ use futures::{
channel::mpsc::UnboundedReceiver, future::BoxFuture, stream::FuturesUnordered, Future, channel::mpsc::UnboundedReceiver,
StreamExt, future::{BoxFuture, Either},
stream::FuturesUnordered,
Future, StreamExt,
}; };
use input::Input; use input::Input;
use output::Output; use output::Output;
@ -175,31 +177,17 @@ pub async fn connect(
/// ///
/// This function is inserted at the end of your config through the [`config`] macro. /// This function is inserted at the end of your config through the [`config`] macro.
/// You should use the macro instead of this function directly. /// You should use the macro instead of this function directly.
pub async fn listen(fut_recv: UnboundedReceiver<BoxFuture<'static, ()>>) { pub async fn listen(mut fut_recv: UnboundedReceiver<BoxFuture<'static, ()>>) {
let mut future_set = FuturesUnordered::< let mut future_set = FuturesUnordered::<BoxFuture<()>>::new();
BoxFuture<(
Option<BoxFuture<()>>,
Option<UnboundedReceiver<BoxFuture<()>>>,
)>,
>::new();
future_set.push(Box::pin(async move { loop {
let (fut, stream) = fut_recv.into_future().await; match futures::future::select(fut_recv.next(), future_set.next()).await {
(fut, Some(stream)) Either::Left((fut, _)) => {
}));
while let Some((fut, stream)) = future_set.next().await {
if let Some(fut) = fut { if let Some(fut) = fut {
future_set.push(Box::pin(async move { future_set.push(fut);
fut.await;
(None, None)
}));
} }
if let Some(stream) = stream { }
future_set.push(Box::pin(async move { Either::Right(_) => (),
let (fut, stream) = stream.into_future().await;
(fut, Some(stream))
}))
} }
} }
} }

View file

@ -166,6 +166,7 @@ impl Output {
}; };
for_all(output); for_all(output);
tokio::task::yield_now().await;
} }
} }
.boxed(), .boxed(),

View file

@ -153,6 +153,7 @@ impl Process {
exit(response.exit_code, exit_msg); exit(response.exit_code, exit_msg);
} }
} }
tokio::task::yield_now().await;
} }
} }
.boxed(), .boxed(),