From b67867c0661cca309566abb36d5a34c4d3c7b3dc Mon Sep 17 00:00:00 2001 From: Ottatop Date: Mon, 15 Jan 2024 17:46:08 -0600 Subject: [PATCH] Move grpc socket to `XDG_RUNTIME_DIR` --- api/lua_grpc/pinnacle/grpc/client.lua | 4 +- src/config.rs | 197 ++++++++++++++++++++++++-- src/state.rs | 183 ++++++++++++------------ 3 files changed, 276 insertions(+), 108 deletions(-) diff --git a/api/lua_grpc/pinnacle/grpc/client.lua b/api/lua_grpc/pinnacle/grpc/client.lua index 0166dad..7f86cdb 100644 --- a/api/lua_grpc/pinnacle/grpc/client.lua +++ b/api/lua_grpc/pinnacle/grpc/client.lua @@ -156,9 +156,7 @@ end ---@return Client function client.new(loop) local sock = socket.connect({ - -- host = "127.0.0.1", - -- port = "8080", - path = "/tmp/pinnacle/grpc.sock", + path = os.getenv("PINNACLE_GRPC_SOCKET"), }) sock:connect() diff --git a/src/config.rs b/src/config.rs index 2fdb7e0..701f588 100644 --- a/src/config.rs +++ b/src/config.rs @@ -1,5 +1,11 @@ use crate::{ - api::{msg::ModifierMask, PinnacleSocketSource}, + api::{ + msg::ModifierMask, + protocol::{ + InputService, OutputService, PinnacleService, ProcessService, TagService, WindowService, + }, + PinnacleSocketSource, + }, output::OutputName, tag::Tag, window::rules::{WindowRule, WindowRuleCondition}, @@ -12,9 +18,17 @@ use std::{ }; use anyhow::Context; -use pinnacle_api_defs::pinnacle::output::v0alpha1::ConnectForAllResponse; +use pinnacle_api_defs::pinnacle::{ + input::v0alpha1::input_service_server::InputServiceServer, + output::v0alpha1::{output_service_server::OutputServiceServer, ConnectForAllResponse}, + process::v0alpha1::process_service_server::ProcessServiceServer, + tag::v0alpha1::tag_service_server::TagServiceServer, + v0alpha1::pinnacle_service_server::PinnacleServiceServer, + window::v0alpha1::window_service_server::WindowServiceServer, +}; use smithay::{ input::keyboard::keysyms, + reexports::calloop::{self, channel::Event}, utils::{Logical, Point}, }; use sysinfo::ProcessRefreshKind; @@ -172,6 +186,14 @@ impl State { pub fn start_config(&mut self, config_dir: impl AsRef) -> anyhow::Result<()> { let config_dir = config_dir.as_ref(); + let metaconfig = match parse(config_dir) { + Ok(metaconfig) => metaconfig, + Err(_) => { + self.start_config(crate::XDG_BASE_DIRS.get_data_home().join("lua"))?; + return Ok(()); + } + }; + tracing::info!("Starting config"); tracing::debug!("Clearing tags"); @@ -209,21 +231,8 @@ impl State { ); std::env::set_var("PINNACLE_LIB_DIR", data_home); - std::env::set_var( - "PINNACLE_LUA_GRPC_DIR", - crate::XDG_BASE_DIRS.get_data_file("lua_grpc"), - ); - tracing::debug!("config dir is {:?}", config_dir); - let metaconfig = match parse(config_dir) { - Ok(metaconfig) => metaconfig, - Err(_) => { - self.start_config(crate::XDG_BASE_DIRS.get_data_home().join("lua"))?; - return Ok(()); - } - }; - // If a socket is provided in the metaconfig, use it. let socket_dir = if let Some(socket_dir) = &metaconfig.socket_dir { let socket_dir = shellexpand::full(socket_dir)?.to_string(); @@ -244,6 +253,8 @@ impl State { .unwrap_or(PathBuf::from(DEFAULT_SOCKET_DIR)) }; + self.start_grpc_server(socket_dir.as_path())?; + self.system_processes .refresh_processes_specifics(ProcessRefreshKind::new()); @@ -341,4 +352,160 @@ impl State { Ok(()) } + + pub fn start_grpc_server(&mut self, socket_dir: &Path) -> anyhow::Result<()> { + if self.grpc_server_join_handle.is_some() { + tracing::info!("gRPC server already started"); + return Ok(()); + } + + self.system_processes + .refresh_processes_specifics(ProcessRefreshKind::new()); + + let multiple_instances = self + .system_processes + .processes_by_exact_name("pinnacle") + .filter(|proc| proc.thread_kind().is_none()) + .count() + > 1; + + std::fs::create_dir_all(socket_dir)?; + + let socket_name = if multiple_instances { + let mut suffix: u8 = 1; + while let Ok(true) = socket_dir + .join(format!("pinnacle-grpc-{suffix}.sock")) + .try_exists() + { + suffix += 1; + } + format!("pinnacle-grpc-{suffix}.sock") + } else { + "pinnacle-grpc.sock".to_string() + }; + + let socket_path = socket_dir.join(socket_name); + + // If there are multiple instances, don't touch other sockets + if multiple_instances { + if let Ok(true) = socket_path.try_exists() { + std::fs::remove_file(&socket_path) + .context(format!("Failed to remove old socket at {socket_path:?}"))?; + } + } else { + // If there aren't, remove them all + for file in std::fs::read_dir(socket_dir)? + .filter_map(|entry| entry.ok()) + .filter(|entry| { + entry + .file_name() + .to_string_lossy() + .starts_with("pinnacle-grpc") + }) + { + tracing::debug!("Removing socket at {:?}", file.path()); + std::fs::remove_file(file.path()) + .context(format!("Failed to remove old socket at {:?}", file.path()))?; + } + } + + std::env::set_var( + "PINNACLE_PROTO_DIR", + crate::XDG_BASE_DIRS.get_data_file("protobuf"), + ); + + std::env::set_var( + "PINNACLE_LUA_GRPC_DIR", + crate::XDG_BASE_DIRS.get_data_file("lua_grpc"), + ); + + let (grpc_sender, grpc_receiver) = + calloop::channel::channel::>(); + + self.loop_handle + .insert_source(grpc_receiver, |msg, _, data| match msg { + Event::Msg(f) => f(&mut data.state), + Event::Closed => panic!("grpc receiver was closed"), + }) + .expect("failed to insert grpc_receiver into loop"); + + let pinnacle_service = PinnacleService { + sender: grpc_sender.clone(), + }; + let input_service = InputService { + sender: grpc_sender.clone(), + }; + let process_service = ProcessService { + sender: grpc_sender.clone(), + }; + let tag_service = TagService { + sender: grpc_sender.clone(), + }; + let output_service = OutputService { + sender: grpc_sender.clone(), + }; + let window_service = WindowService { + sender: grpc_sender.clone(), + }; + + let refl_service = tonic_reflection::server::Builder::configure() + .register_encoded_file_descriptor_set(pinnacle_api_defs::FILE_DESCRIPTOR_SET) + .build()?; + + let uds = tokio::net::UnixListener::bind(&socket_path)?; + let uds_stream = tokio_stream::wrappers::UnixListenerStream::new(uds); + + std::env::set_var("PINNACLE_GRPC_SOCKET", socket_path); + + let grpc_server = tonic::transport::Server::builder() + .add_service(refl_service) + .add_service(PinnacleServiceServer::new(pinnacle_service)) + .add_service(InputServiceServer::new(input_service)) + .add_service(ProcessServiceServer::new(process_service)) + .add_service(TagServiceServer::new(tag_service)) + .add_service(OutputServiceServer::new(output_service)) + .add_service(WindowServiceServer::new(window_service)); + + let (ping_tx, mut ping_rx) = tokio::sync::mpsc::unbounded_channel::<()>(); + + let ping_rx_future = async move { + ping_rx.recv().await; + }; + + let ping_tx_clone = ping_tx.clone(); + let loop_signal_clone = self.loop_signal.clone(); + tokio::spawn(async move { + tokio::signal::ctrl_c() + .await + .expect("failed to listen for ctrl-c"); + ping_tx_clone + .send(()) + .expect("failed to send grpc shutdown ping"); + loop_signal_clone.stop(); + }); + + self.grpc_kill_pinger = Some(ping_tx); + + match self.xdisplay.as_ref() { + Some(_) => { + self.grpc_server_join_handle = Some(tokio::spawn(async move { + grpc_server + .serve_with_incoming_shutdown(uds_stream, ping_rx_future) + .await + })); + } + None => self.schedule( + |data| data.state.xdisplay.is_some(), + move |data| { + data.state.grpc_server_join_handle = Some(tokio::spawn(async move { + grpc_server + .serve_with_incoming_shutdown(uds_stream, ping_rx_future) + .await + })); + }, + ), + } + + Ok(()) + } } diff --git a/src/state.rs b/src/state.rs index 0dd6c05..fff4cba 100644 --- a/src/state.rs +++ b/src/state.rs @@ -65,7 +65,7 @@ pub struct State { pub backend: Backend, /// A loop signal used to stop the compositor - loop_signal: LoopSignal, + pub loop_signal: LoopSignal, /// A handle to the event loop pub loop_handle: LoopHandle<'static, CalloopData>, pub display_handle: DisplayHandle, @@ -114,7 +114,9 @@ pub struct State { pub system_processes: sysinfo::System, pub config_join_handle: Option>, - pub grpc_kill_pinger: UnboundedSender<()>, + pub grpc_kill_pinger: Option>, + pub grpc_server_join_handle: + Option>>, } impl State { @@ -246,79 +248,79 @@ impl State { // gRPC stuff - std::env::set_var( - "PINNACLE_PROTO_DIR", - XDG_BASE_DIRS.get_data_file("protobuf"), - ); - - let (grpc_sender, grpc_receiver) = - calloop::channel::channel::>(); - - loop_handle.insert_idle(|data| { - data.state - .loop_handle - .insert_source(grpc_receiver, |msg, _, data| match msg { - Event::Msg(f) => f(&mut data.state), - Event::Closed => panic!("grpc receiver was closed"), - }) - .expect("failed to insert grpc_receiver into loop"); - }); - - let pinnacle_service = PinnacleService { - sender: grpc_sender.clone(), - }; - let input_service = InputService { - sender: grpc_sender.clone(), - }; - let process_service = ProcessService { - sender: grpc_sender.clone(), - }; - let tag_service = TagService { - sender: grpc_sender.clone(), - }; - let output_service = OutputService { - sender: grpc_sender.clone(), - }; - let window_service = WindowService { - sender: grpc_sender.clone(), - }; - - let refl_service = tonic_reflection::server::Builder::configure() - .register_encoded_file_descriptor_set(pinnacle_api_defs::FILE_DESCRIPTOR_SET) - .build()?; - - let p = std::path::Path::new("/tmp/pinnacle/grpc.sock"); - let _ = std::fs::remove_file(p); - std::fs::create_dir_all(p.parent().unwrap())?; - let uds = tokio::net::UnixListener::bind(p)?; - let uds_stream = tokio_stream::wrappers::UnixListenerStream::new(uds); - - let grpc_server = tonic::transport::Server::builder() - .add_service(refl_service) - .add_service(PinnacleServiceServer::new(pinnacle_service)) - .add_service(InputServiceServer::new(input_service)) - .add_service(ProcessServiceServer::new(process_service)) - .add_service(TagServiceServer::new(tag_service)) - .add_service(OutputServiceServer::new(output_service)) - .add_service(WindowServiceServer::new(window_service)); - - let (ping_tx, mut ping_rx) = tokio::sync::mpsc::unbounded_channel::<()>(); - - let ping_rx_future = async move { - ping_rx.recv().await; - }; - - let ping_tx_clone = ping_tx.clone(); - let loop_signal_clone = loop_signal.clone(); - tokio::spawn(async move { - tokio::signal::ctrl_c() - .await - .expect("failed to listen for ctrl-c"); - ping_tx_clone - .send(()) - .expect("failed to send grpc shutdown ping"); - loop_signal_clone.stop(); - }); + // std::env::set_var( + // "PINNACLE_PROTO_DIR", + // XDG_BASE_DIRS.get_data_file("protobuf"), + // ); + // + // let (grpc_sender, grpc_receiver) = + // calloop::channel::channel::>(); + // + // loop_handle.insert_idle(|data| { + // data.state + // .loop_handle + // .insert_source(grpc_receiver, |msg, _, data| match msg { + // Event::Msg(f) => f(&mut data.state), + // Event::Closed => panic!("grpc receiver was closed"), + // }) + // .expect("failed to insert grpc_receiver into loop"); + // }); + // + // let pinnacle_service = PinnacleService { + // sender: grpc_sender.clone(), + // }; + // let input_service = InputService { + // sender: grpc_sender.clone(), + // }; + // let process_service = ProcessService { + // sender: grpc_sender.clone(), + // }; + // let tag_service = TagService { + // sender: grpc_sender.clone(), + // }; + // let output_service = OutputService { + // sender: grpc_sender.clone(), + // }; + // let window_service = WindowService { + // sender: grpc_sender.clone(), + // }; + // + // let refl_service = tonic_reflection::server::Builder::configure() + // .register_encoded_file_descriptor_set(pinnacle_api_defs::FILE_DESCRIPTOR_SET) + // .build()?; + // + // let p = std::path::Path::new("/tmp/pinnacle/grpc.sock"); + // let _ = std::fs::remove_file(p); + // std::fs::create_dir_all(p.parent().unwrap())?; + // let uds = tokio::net::UnixListener::bind(p)?; + // let uds_stream = tokio_stream::wrappers::UnixListenerStream::new(uds); + // + // let grpc_server = tonic::transport::Server::builder() + // .add_service(refl_service) + // .add_service(PinnacleServiceServer::new(pinnacle_service)) + // .add_service(InputServiceServer::new(input_service)) + // .add_service(ProcessServiceServer::new(process_service)) + // .add_service(TagServiceServer::new(tag_service)) + // .add_service(OutputServiceServer::new(output_service)) + // .add_service(WindowServiceServer::new(window_service)); + // + // let (ping_tx, mut ping_rx) = tokio::sync::mpsc::unbounded_channel::<()>(); + // + // let ping_rx_future = async move { + // ping_rx.recv().await; + // }; + // + // let ping_tx_clone = ping_tx.clone(); + // let loop_signal_clone = loop_signal.clone(); + // tokio::spawn(async move { + // tokio::signal::ctrl_c() + // .await + // .expect("failed to listen for ctrl-c"); + // ping_tx_clone + // .send(()) + // .expect("failed to send grpc shutdown ping"); + // loop_signal_clone.stop(); + // }); let state = Self { backend, @@ -371,20 +373,21 @@ impl State { ), config_join_handle: None, - grpc_kill_pinger: ping_tx, + grpc_kill_pinger: None, + grpc_server_join_handle: None, }; - state.schedule( - |data| data.state.xdisplay.is_some(), - move |_| { - tokio::spawn(async move { - grpc_server - .serve_with_incoming_shutdown(uds_stream, ping_rx_future) - .await - .expect("failed to serve grpc"); - }); - }, - ); + // state.schedule( + // |data| data.state.xdisplay.is_some(), + // move |_| { + // tokio::spawn(async move { + // grpc_server + // .serve_with_incoming_shutdown(uds_stream, ping_rx_future) + // .await + // .expect("failed to serve grpc"); + // }); + // }, + // ); Ok(state) } @@ -420,9 +423,9 @@ impl State { pub fn shutdown(&self) { tracing::info!("Shutting down Pinnacle"); - self.grpc_kill_pinger - .send(()) - .expect("failed to send grpc shutdown ping"); + if let Some(pinger) = self.grpc_kill_pinger.as_ref() { + pinger.send(()).expect("failed to send grpc shutdown ping"); + } self.loop_signal.stop(); } }