Fix Rust API not exiting correctly

My goodness was that not fun to debug
This commit is contained in:
Ottatop 2024-04-22 20:34:06 -05:00
parent 43891a2a48
commit ac15e2d566
7 changed files with 68 additions and 54 deletions

View file

@ -11,6 +11,7 @@ use std::{
sync::{Arc, Mutex, OnceLock},
};
use futures::{future::BoxFuture, FutureExt};
use pinnacle_api_defs::pinnacle::layout::v0alpha1::{
layout_request::{Body, ExplicitLayout, Geometries},
layout_service_client::LayoutServiceClient,
@ -34,13 +35,18 @@ use crate::{
pub struct Layout {
api: OnceLock<ApiModules>,
layout_client: LayoutServiceClient<Channel>,
fut_sender: UnboundedSender<BoxFuture<'static, ()>>,
}
impl Layout {
pub(crate) fn new(channel: Channel) -> Self {
pub(crate) fn new(
channel: Channel,
fut_sender: UnboundedSender<BoxFuture<'static, ()>>,
) -> Self {
Self {
api: OnceLock::new(),
layout_client: LayoutServiceClient::new(channel.clone()),
fut_sender,
}
}
@ -111,9 +117,10 @@ impl Layout {
})
.unwrap();
}
};
}
.boxed();
tokio::spawn(thing);
self.fut_sender.send(thing).unwrap();
requester
}
}

View file

@ -75,9 +75,9 @@
//! ## 5. Begin crafting your config!
//! You can peruse the documentation for things to configure.
use std::{sync::Arc, time::Duration};
use std::sync::Arc;
use futures::{future::BoxFuture, Future, StreamExt};
use futures::{future::BoxFuture, Future, FutureExt, StreamExt};
use input::Input;
use layout::Layout;
use output::Output;
@ -179,7 +179,7 @@ pub async fn connect(
let output = Box::leak(Box::new(Output::new(channel.clone())));
let tag = Box::leak(Box::new(Tag::new(channel.clone())));
let render = Box::leak(Box::new(Render::new(channel.clone())));
let layout = Box::leak(Box::new(Layout::new(channel.clone())));
let layout = Box::leak(Box::new(Layout::new(channel.clone(), fut_sender.clone())));
let modules = ApiModules {
pinnacle,
@ -213,25 +213,14 @@ pub async fn listen(api: ApiModules, fut_recv: UnboundedReceiver<BoxFuture<'stat
let mut fut_recv = UnboundedReceiverStream::new(fut_recv);
let mut set = futures::stream::FuturesUnordered::new();
let keepalive = async move {
loop {
tokio::time::sleep(Duration::from_secs(60)).await;
if let Err(err) = api.pinnacle.ping().await {
eprintln!("Failed to ping compositor: {err}");
panic!("failed to ping compositor");
}
}
};
let mut shutdown_stream = api.pinnacle.shutdown_watch().await;
let mut shutdown_stream = api.pinnacle.shutdown_watch();
let shutdown_watcher = async move {
let mut shutdown_watcher = async move {
// This will trigger either when the compositor sends the shutdown signal
// or when it exits (in which case the stream received an error)
shutdown_stream.next().await;
panic!("Shutdown received");
};
set.push(tokio::spawn(keepalive));
set.push(tokio::spawn(shutdown_watcher));
}
.boxed();
loop {
tokio::select! {
@ -243,11 +232,16 @@ pub async fn listen(api: ApiModules, fut_recv: UnboundedReceiver<BoxFuture<'stat
}
}
res = set.next() => {
match res {
Some(Err(_)) | None => break,
_ => (),
if let Some(Err(join_err)) = res {
eprintln!("tokio task panicked: {join_err}");
api.signal.write().await.shutdown();
break;
}
}
_ = &mut shutdown_watcher => {
api.signal.write().await.shutdown();
break;
}
}
}
}

View file

@ -40,22 +40,28 @@ impl Pinnacle {
/// ```
pub fn quit(&self) {
let mut client = self.client.clone();
block_on_tokio(client.quit(QuitRequest {})).unwrap();
// Ignore errors here, the config is meant to be killed
let _ = block_on_tokio(client.quit(QuitRequest {}));
}
/// Reload the currently active config.
pub fn reload_config(&self) {
let mut client = self.client.clone();
block_on_tokio(client.reload_config(ReloadConfigRequest {})).unwrap();
// Ignore errors here, the config is meant to be killed
let _ = block_on_tokio(client.reload_config(ReloadConfigRequest {}));
}
pub(crate) fn shutdown_watch(&self) -> Streaming<ShutdownWatchResponse> {
pub(crate) async fn shutdown_watch(&self) -> Streaming<ShutdownWatchResponse> {
let mut client = self.client.clone();
block_on_tokio(client.shutdown_watch(ShutdownWatchRequest {}))
client
.shutdown_watch(ShutdownWatchRequest {})
.await
.unwrap()
.into_inner()
}
/// TODO: eval if this is necessary
#[allow(dead_code)]
pub(super) async fn ping(&self) -> Result<(), String> {
let mut client = self.client.clone();
let mut payload = [0u8; 8];

View file

@ -298,6 +298,16 @@ impl SignalState {
self.window_pointer_leave.api.set(api.clone()).unwrap();
self.tag_active.api.set(api.clone()).unwrap();
}
pub(crate) fn shutdown(&mut self) {
self.output_connect.reset();
self.output_disconnect.reset();
self.output_resize.reset();
self.output_move.reset();
self.window_pointer_enter.reset();
self.window_pointer_leave.reset();
self.tag_active.reset();
}
}
#[derive(Default, Clone, Copy, Debug, Hash, PartialEq, Eq, PartialOrd, Ord)]
@ -413,7 +423,7 @@ where
}
}
_dc = dc_ping_recv_fuse => {
control_sender.send(Req::from_control(StreamControl::Disconnect)).expect("send failed");
let _ = control_sender.send(Req::from_control(StreamControl::Disconnect));
break;
}
}

View file

@ -49,7 +49,7 @@ use tokio::{
};
use tokio_stream::{Stream, StreamExt};
use tonic::{Request, Response, Status, Streaming};
use tracing::{debug, error, warn};
use tracing::{debug, error, info, trace, warn};
use crate::{
backend::BackendData,
@ -87,7 +87,7 @@ where
let f = Box::new(|state: &mut State| {
// TODO: find a way to handle this error
if sender.send(with_state(state)).is_err() {
panic!("failed to send result to config");
warn!("failed to send result of API call to config; receiver already dropped");
}
});
@ -189,7 +189,7 @@ impl pinnacle_service_server::PinnacleService for PinnacleService {
type ShutdownWatchStream = ResponseStream<ShutdownWatchResponse>;
async fn quit(&self, _request: Request<QuitRequest>) -> Result<Response<()>, Status> {
tracing::trace!("PinnacleService.quit");
trace!("PinnacleService.quit");
run_unary_no_response(&self.sender, |state| {
state.shutdown();
@ -202,6 +202,7 @@ impl pinnacle_service_server::PinnacleService for PinnacleService {
_request: Request<ReloadConfigRequest>,
) -> Result<Response<()>, Status> {
run_unary_no_response(&self.sender, |state| {
info!("Reloading config");
state
.start_config(Some(state.config.dir(&state.xdg_base_dirs)))
.expect("failed to restart config");

View file

@ -35,7 +35,7 @@ use sysinfo::ProcessRefreshKind;
use tokio::task::JoinHandle;
use toml::Table;
use tracing::{debug, error, info};
use tracing::{debug, error, info, warn};
use xdg::BaseDirectories;
use xkbcommon::xkb::Keysym;
@ -215,11 +215,9 @@ impl Config {
join_handle.abort();
}
if let Some(shutdown_sender) = self.shutdown_sender.take() {
shutdown_sender
.send(Ok(
pinnacle_api_defs::pinnacle::v0alpha1::ShutdownWatchResponse {},
))
.expect("failed to send shutdown signal to config");
if let Err(err) = shutdown_sender.send(Ok(ShutdownWatchResponse {})) {
warn!("Failed to send shutdown signal to config: {err}");
}
}
if let Some(token) = self.config_reload_on_crash_token.take() {
loop_handle.remove(token);
@ -273,11 +271,9 @@ impl State {
/// If `config_dir` is `None`, the builtin Rust config will be used.
pub fn start_config(&mut self, mut config_dir: Option<impl AsRef<Path>>) -> anyhow::Result<()> {
if let Some(shutdown_sender) = self.config.shutdown_sender.take() {
shutdown_sender
.send(Ok(
pinnacle_api_defs::pinnacle::v0alpha1::ShutdownWatchResponse {},
))
.expect("failed to send shutdown signal to config");
if let Err(err) = shutdown_sender.send(Ok(ShutdownWatchResponse {})) {
warn!("Failed to send shutdown signal to config: {err}");
}
}
let config_dir_clone = config_dir.as_ref().map(|dir| dir.as_ref().to_path_buf());
@ -434,7 +430,7 @@ impl State {
let token = self
.loop_handle
.insert_source(ping_source, move |_, _, state| {
error!("Config crashed! Falling back to default Lua config");
error!("Config crashed! Falling back to default config");
state
.start_config(None::<PathBuf>)
.expect("failed to start default config");
@ -456,11 +452,12 @@ impl State {
panic!("builtin rust config crashed; this is a bug");
})?;
self.config.config_join_handle = Some(tokio::task::spawn_blocking(move || {
std::thread::spawn(move || {
info!("Starting builtin Rust config");
builtin::run();
info!("Builtin config exited");
pinger.ping();
}));
});
self.config.config_reload_on_crash_token = Some(token);
}

View file

@ -12,6 +12,7 @@ use crate::{
window::WindowElement,
};
use anyhow::Context;
use pinnacle_api_defs::pinnacle::v0alpha1::ShutdownWatchResponse;
use smithay::{
desktop::{PopupManager, Space},
input::{keyboard::XkbConfig, pointer::CursorImageStatus, Seat, SeatState},
@ -43,7 +44,7 @@ use smithay::{
};
use std::{cell::RefCell, path::PathBuf, sync::Arc, time::Duration};
use sysinfo::{ProcessRefreshKind, RefreshKind};
use tracing::{error, info};
use tracing::{error, info, warn};
use xdg::BaseDirectories;
use crate::input::InputState;
@ -332,11 +333,9 @@ impl State {
join_handle.abort();
}
if let Some(shutdown_sender) = self.config.shutdown_sender.take() {
shutdown_sender
.send(Ok(
pinnacle_api_defs::pinnacle::v0alpha1::ShutdownWatchResponse {},
))
.expect("failed to send shutdown signal to config");
if let Err(err) = shutdown_sender.send(Ok(ShutdownWatchResponse {})) {
warn!("Failed to send shutdown signal to config: {err}");
}
}
}
}