Move from threads to async for process output

This commit is contained in:
Seaotatop 2023-06-29 11:58:33 -05:00
parent 8f218c6be2
commit 24c13f6f7c
2 changed files with 57 additions and 43 deletions

View file

@ -17,6 +17,9 @@ image = {version = "0.24.0", default-features = false, optional = true}
serde = { version = "1.0.164", features = ["derive"] } serde = { version = "1.0.164", features = ["derive"] }
rmp = { version = "0.8.11" } rmp = { version = "0.8.11" }
rmp-serde = { version = "1.1.1" } rmp-serde = { version = "1.1.1" }
calloop = { version = "0.10.1", features = ["executor", "futures-io"] }
futures-lite = { version = "1.13.0" }
async-process = { version = "1.7.0" }
[features] [features]
default = ["egl", "winit", "udev"] default = ["egl", "winit", "udev"]

View file

@ -6,8 +6,7 @@
use std::{ use std::{
error::Error, error::Error,
ffi::{CString, OsString}, ffi::OsString,
io::{BufRead, BufReader},
os::{fd::AsRawFd, unix::net::UnixStream}, os::{fd::AsRawFd, unix::net::UnixStream},
process::Stdio, process::Stdio,
sync::{Arc, Mutex}, sync::{Arc, Mutex},
@ -21,6 +20,8 @@ use crate::{
focus::FocusState, focus::FocusState,
window::{window_state::WindowState, WindowProperties}, window::{window_state::WindowState, WindowProperties},
}; };
use calloop::futures::Scheduler;
use futures_lite::AsyncBufReadExt;
use smithay::{ use smithay::{
backend::renderer::element::RenderElementStates, backend::renderer::element::RenderElementStates,
desktop::{ desktop::{
@ -88,6 +89,8 @@ pub struct State<B: Backend> {
pub cursor_status: CursorImageStatus, pub cursor_status: CursorImageStatus,
pub pointer_location: Point<f64, Logical>, pub pointer_location: Point<f64, Logical>,
pub async_scheduler: Scheduler<()>,
} }
impl<B: Backend> State<B> { impl<B: Backend> State<B> {
@ -290,6 +293,8 @@ impl<B: Backend> State<B> {
})?; })?;
// We want to replace the client if a new one pops up // We want to replace the client if a new one pops up
// TODO: there should only ever be one client working at a time, and creating a new client
// | when one is already running should be impossible.
// INFO: this source try_clone()s the stream // INFO: this source try_clone()s the stream
loop_handle.insert_source(PinnacleSocketSource::new(tx_channel)?, |stream, _, data| { loop_handle.insert_source(PinnacleSocketSource::new(tx_channel)?, |stream, _, data| {
if let Some(old_stream) = data if let Some(old_stream) = data
@ -306,6 +311,9 @@ impl<B: Backend> State<B> {
} }
})?; })?;
let (executor, sched) = calloop::futures::executor::<()>().unwrap();
loop_handle.insert_source(executor, |_, _, _| {})?;
// TODO: move all this into the lua api // TODO: move all this into the lua api
let config_path = std::env::var("PINNACLE_CONFIG").unwrap_or_else(|_| { let config_path = std::env::var("PINNACLE_CONFIG").unwrap_or_else(|_| {
let mut default_path = let mut default_path =
@ -367,6 +375,8 @@ impl<B: Backend> State<B> {
socket_name: socket_name.to_string_lossy().to_string(), socket_name: socket_name.to_string_lossy().to_string(),
popup_manager: PopupManager::default(), popup_manager: PopupManager::default(),
async_scheduler: sched,
}) })
} }
@ -377,11 +387,8 @@ impl<B: Backend> State<B> {
return; return;
} }
let mut child = std::process::Command::new(OsString::from(command.next().unwrap())) let mut child = async_process::Command::new(OsString::from(command.next().unwrap()))
.env("WAYLAND_DISPLAY", self.socket_name.clone()) .env("WAYLAND_DISPLAY", self.socket_name.clone())
.stdin(Stdio::null())
.stdout(Stdio::null())
.stderr(Stdio::null())
.stdin(if callback_id.is_some() { .stdin(if callback_id.is_some() {
Stdio::piped() Stdio::piped()
} else { } else {
@ -411,14 +418,14 @@ impl<B: Backend> State<B> {
let stream_err = stream_out.clone(); let stream_err = stream_out.clone();
let stream_exit = stream_out.clone(); let stream_exit = stream_out.clone();
// TODO: make this not use 3 whole threads per process
if let Some(stdout) = stdout { if let Some(stdout) = stdout {
std::thread::spawn(move || { let future = async move {
let mut reader = BufReader::new(stdout); // TODO: use BufReader::new().lines()
let mut reader = futures_lite::io::BufReader::new(stdout);
loop { loop {
let mut buf = String::new(); let mut buf = String::new();
match reader.read_line(&mut buf) { match reader.read_line(&mut buf).await {
Ok(0) => break, // stream closed Ok(0) => break,
Ok(_) => { Ok(_) => {
let mut stream = stream_out.lock().unwrap(); let mut stream = stream_out.lock().unwrap();
crate::api::send_to_client( crate::api::send_to_client(
@ -436,20 +443,21 @@ impl<B: Backend> State<B> {
.unwrap(); .unwrap();
} }
Err(err) => { Err(err) => {
tracing::error!("child read err: {err}"); tracing::warn!("child read err: {err}");
break; break;
} },
} }
} }
}); };
self.async_scheduler.schedule(future).unwrap();
} }
if let Some(stderr) = stderr { if let Some(stderr) = stderr {
std::thread::spawn(move || { let future = async move {
let mut reader = BufReader::new(stderr); let mut reader = futures_lite::io::BufReader::new(stderr);
loop { loop {
let mut buf = String::new(); let mut buf = String::new();
match reader.read_line(&mut buf) { match reader.read_line(&mut buf).await {
Ok(0) => break, // stream closed Ok(0) => break,
Ok(_) => { Ok(_) => {
let mut stream = stream_err.lock().unwrap(); let mut stream = stream_err.lock().unwrap();
crate::api::send_to_client( crate::api::send_to_client(
@ -467,36 +475,39 @@ impl<B: Backend> State<B> {
.unwrap(); .unwrap();
} }
Err(err) => { Err(err) => {
tracing::error!("child read err: {err}"); tracing::warn!("child read err: {err}");
break; break;
} },
} }
} }
}); };
self.async_scheduler.schedule(future).unwrap();
} }
std::thread::spawn(move || match child.wait() {
Ok(exit_status) => { let future = async move {
let mut stream = stream_exit.lock().unwrap(); match child.status().await {
crate::api::send_to_client( Ok(exit_status) => {
&mut stream, let mut stream = stream_exit.lock().unwrap();
&OutgoingMsg::CallCallback { crate::api::send_to_client(
callback_id, &mut stream,
args: Some(Args::Spawn { &OutgoingMsg::CallCallback {
stdout: None, callback_id,
stderr: None, args: Some(Args::Spawn {
exit_code: exit_status.code(), stdout: None,
exit_msg: Some(exit_status.to_string()), stderr: None,
}), exit_code: exit_status.code(),
}, exit_msg: Some(exit_status.to_string()),
) }),
.unwrap() },
)
.unwrap()
}
Err(err) => {
tracing::warn!("child wait() err: {err}");
}
} }
Err(err) => { };
tracing::warn!("child wait() err: {err}"); self.async_scheduler.schedule(future).unwrap();
}
});
} else {
std::thread::spawn(move || child.wait());
} }
} }
} }