Add keepalive pings to clients

This should clients to die if the compositor was killed but they weren't dropped
This commit is contained in:
Ottatop 2024-03-22 16:40:06 -05:00
parent 698cd1d973
commit 869a2223f5
7 changed files with 80 additions and 26 deletions

1
Cargo.lock generated
View file

@ -1802,6 +1802,7 @@ dependencies = [
"num_enum",
"pinnacle-api-defs",
"pinnacle-api-macros",
"rand",
"tokio",
"tokio-stream",
"tonic",

View file

@ -55,6 +55,17 @@ local client = {
version = "v0alpha1",
}
client.loop:wrap(function()
while true do
require("cqueues").sleep(60)
local success, err, errno = client.conn:ping(10)
if not success then
print("Compositor ping failed:", err, errno)
os.exit(1)
end
end
end)
---@class GrpcRequestParams
---@field service string
---@field method string

View file

@ -21,6 +21,20 @@ enum SetOrToggle {
message QuitRequest {}
// A manual ping request independent of any HTTP keepalive.
//
// Tonic does not seems to give you the means to run something
// when a keepalive ping fails, so this is for the Rust API to
// ping the compositor.
message PingRequest {
optional bytes payload = 1;
}
message PingResponse {
optional bytes payload = 1;
}
service PinnacleService {
rpc Quit(QuitRequest) returns (google.protobuf.Empty);
rpc Ping(PingRequest) returns (PingResponse);
}

View file

@ -19,3 +19,4 @@ tower = { version = "0.4.13", features = ["util"] }
futures = "0.3.30"
num_enum = "0.7.2"
xkbcommon = { workspace = true }
rand = "0.8.5"

View file

@ -80,13 +80,9 @@
//! ## 5. Begin crafting your config!
//! You can peruse the documentation for things to configure.
use std::sync::OnceLock;
use std::{sync::OnceLock, time::Duration};
use futures::{
future::{BoxFuture, Either},
stream::FuturesUnordered,
Future, StreamExt,
};
use futures::{future::BoxFuture, Future, StreamExt};
use input::Input;
use layout::Layout;
use output::Output;
@ -151,7 +147,8 @@ pub struct ApiModules {
/// You should use that macro instead of this function directly.
pub async fn connect(
) -> Result<(ApiModules, UnboundedReceiver<BoxFuture<'static, ()>>), Box<dyn std::error::Error>> {
let channel = Endpoint::try_from("http://[::]:50051")? // port doesn't matter, we use a unix socket
// port doesn't matter, we use a unix socket
let channel = Endpoint::try_from("http://[::]:50051")?
.connect_with_connector(service_fn(|_: Uri| {
tokio::net::UnixStream::connect(
std::env::var("PINNACLE_GRPC_SOCKET")
@ -198,19 +195,24 @@ pub async fn connect(
/// This function is inserted at the end of your config through the [`config`] macro.
/// You should use the macro instead of this function directly.
pub async fn listen(fut_recv: UnboundedReceiver<BoxFuture<'static, ()>>) {
let mut future_set = FuturesUnordered::<BoxFuture<'static, ()>>::new();
let mut fut_recv = UnboundedReceiverStream::new(fut_recv);
let pinnacle = PINNACLE.get().unwrap();
let keepalive = async move {
loop {
match futures::future::select(fut_recv.next(), future_set.next()).await {
Either::Left((fut, _)) => {
if let Some(fut) = fut {
future_set.push(fut);
tokio::time::sleep(Duration::from_secs(60)).await;
if let Err(err) = pinnacle.ping().await {
eprintln!("Failed to ping compositor: {err}");
std::process::exit(1);
}
}
Either::Right(_) => (),
}
};
tokio::spawn(keepalive);
while let Some(fut) = fut_recv.next().await {
tokio::spawn(fut);
}
}

View file

@ -6,26 +6,27 @@
//!
//! This module provides [`Pinnacle`], which allows you to quit the compositor.
use std::time::Duration;
use pinnacle_api_defs::pinnacle::v0alpha1::{
pinnacle_service_client::PinnacleServiceClient, QuitRequest,
pinnacle_service_client::PinnacleServiceClient, PingRequest, QuitRequest,
};
use tonic::transport::Channel;
use rand::RngCore;
use tonic::{transport::Channel, Request};
use crate::block_on_tokio;
/// A struct that allows you to quit the compositor.
#[derive(Debug, Clone)]
pub struct Pinnacle {
channel: Channel,
client: PinnacleServiceClient<Channel>,
}
impl Pinnacle {
pub(crate) fn new(channel: Channel) -> Self {
Self { channel }
Self {
client: PinnacleServiceClient::new(channel),
}
fn create_pinnacle_client(&self) -> PinnacleServiceClient<Channel> {
PinnacleServiceClient::new(self.channel.clone())
}
/// Quit Pinnacle.
@ -37,7 +38,26 @@ impl Pinnacle {
/// pinnacle.quit();
/// ```
pub fn quit(&self) {
let mut client = self.create_pinnacle_client();
let mut client = self.client.clone();
block_on_tokio(client.quit(QuitRequest {})).unwrap();
}
pub(super) async fn ping(&self) -> Result<(), String> {
let mut client = self.client.clone();
let mut payload = [0u8; 8];
rand::thread_rng().fill_bytes(&mut payload);
let mut request = Request::new(PingRequest {
payload: Some(payload.to_vec()),
});
request.set_timeout(Duration::from_secs(10));
let response = client
.ping(request)
.await
.map_err(|status| status.to_string())?;
(response.into_inner().payload() == payload)
.then_some(())
.ok_or("timed out".to_string())
}
}

View file

@ -24,7 +24,7 @@ use pinnacle_api_defs::pinnacle::{
SwitchToRequest,
},
},
v0alpha1::{pinnacle_service_server, QuitRequest, SetOrToggle},
v0alpha1::{pinnacle_service_server, PingRequest, PingResponse, QuitRequest, SetOrToggle},
};
use smithay::{
input::keyboard::XkbConfig,
@ -182,6 +182,11 @@ impl pinnacle_service_server::PinnacleService for PinnacleService {
})
.await
}
async fn ping(&self, request: Request<PingRequest>) -> Result<Response<PingResponse>, Status> {
let payload = request.into_inner().payload;
Ok(Response::new(PingResponse { payload }))
}
}
pub struct InputService {