Move grpc socket to XDG_RUNTIME_DIR

This commit is contained in:
Ottatop 2024-01-15 17:46:08 -06:00
parent 7841ea3422
commit b67867c066
3 changed files with 276 additions and 108 deletions

View file

@ -156,9 +156,7 @@ end
---@return Client
function client.new(loop)
local sock = socket.connect({
-- host = "127.0.0.1",
-- port = "8080",
path = "/tmp/pinnacle/grpc.sock",
path = os.getenv("PINNACLE_GRPC_SOCKET"),
})
sock:connect()

View file

@ -1,5 +1,11 @@
use crate::{
api::{msg::ModifierMask, PinnacleSocketSource},
api::{
msg::ModifierMask,
protocol::{
InputService, OutputService, PinnacleService, ProcessService, TagService, WindowService,
},
PinnacleSocketSource,
},
output::OutputName,
tag::Tag,
window::rules::{WindowRule, WindowRuleCondition},
@ -12,9 +18,17 @@ use std::{
};
use anyhow::Context;
use pinnacle_api_defs::pinnacle::output::v0alpha1::ConnectForAllResponse;
use pinnacle_api_defs::pinnacle::{
input::v0alpha1::input_service_server::InputServiceServer,
output::v0alpha1::{output_service_server::OutputServiceServer, ConnectForAllResponse},
process::v0alpha1::process_service_server::ProcessServiceServer,
tag::v0alpha1::tag_service_server::TagServiceServer,
v0alpha1::pinnacle_service_server::PinnacleServiceServer,
window::v0alpha1::window_service_server::WindowServiceServer,
};
use smithay::{
input::keyboard::keysyms,
reexports::calloop::{self, channel::Event},
utils::{Logical, Point},
};
use sysinfo::ProcessRefreshKind;
@ -172,6 +186,14 @@ impl State {
pub fn start_config(&mut self, config_dir: impl AsRef<Path>) -> anyhow::Result<()> {
let config_dir = config_dir.as_ref();
let metaconfig = match parse(config_dir) {
Ok(metaconfig) => metaconfig,
Err(_) => {
self.start_config(crate::XDG_BASE_DIRS.get_data_home().join("lua"))?;
return Ok(());
}
};
tracing::info!("Starting config");
tracing::debug!("Clearing tags");
@ -209,21 +231,8 @@ impl State {
);
std::env::set_var("PINNACLE_LIB_DIR", data_home);
std::env::set_var(
"PINNACLE_LUA_GRPC_DIR",
crate::XDG_BASE_DIRS.get_data_file("lua_grpc"),
);
tracing::debug!("config dir is {:?}", config_dir);
let metaconfig = match parse(config_dir) {
Ok(metaconfig) => metaconfig,
Err(_) => {
self.start_config(crate::XDG_BASE_DIRS.get_data_home().join("lua"))?;
return Ok(());
}
};
// If a socket is provided in the metaconfig, use it.
let socket_dir = if let Some(socket_dir) = &metaconfig.socket_dir {
let socket_dir = shellexpand::full(socket_dir)?.to_string();
@ -244,6 +253,8 @@ impl State {
.unwrap_or(PathBuf::from(DEFAULT_SOCKET_DIR))
};
self.start_grpc_server(socket_dir.as_path())?;
self.system_processes
.refresh_processes_specifics(ProcessRefreshKind::new());
@ -341,4 +352,160 @@ impl State {
Ok(())
}
pub fn start_grpc_server(&mut self, socket_dir: &Path) -> anyhow::Result<()> {
if self.grpc_server_join_handle.is_some() {
tracing::info!("gRPC server already started");
return Ok(());
}
self.system_processes
.refresh_processes_specifics(ProcessRefreshKind::new());
let multiple_instances = self
.system_processes
.processes_by_exact_name("pinnacle")
.filter(|proc| proc.thread_kind().is_none())
.count()
> 1;
std::fs::create_dir_all(socket_dir)?;
let socket_name = if multiple_instances {
let mut suffix: u8 = 1;
while let Ok(true) = socket_dir
.join(format!("pinnacle-grpc-{suffix}.sock"))
.try_exists()
{
suffix += 1;
}
format!("pinnacle-grpc-{suffix}.sock")
} else {
"pinnacle-grpc.sock".to_string()
};
let socket_path = socket_dir.join(socket_name);
// If there are multiple instances, don't touch other sockets
if multiple_instances {
if let Ok(true) = socket_path.try_exists() {
std::fs::remove_file(&socket_path)
.context(format!("Failed to remove old socket at {socket_path:?}"))?;
}
} else {
// If there aren't, remove them all
for file in std::fs::read_dir(socket_dir)?
.filter_map(|entry| entry.ok())
.filter(|entry| {
entry
.file_name()
.to_string_lossy()
.starts_with("pinnacle-grpc")
})
{
tracing::debug!("Removing socket at {:?}", file.path());
std::fs::remove_file(file.path())
.context(format!("Failed to remove old socket at {:?}", file.path()))?;
}
}
std::env::set_var(
"PINNACLE_PROTO_DIR",
crate::XDG_BASE_DIRS.get_data_file("protobuf"),
);
std::env::set_var(
"PINNACLE_LUA_GRPC_DIR",
crate::XDG_BASE_DIRS.get_data_file("lua_grpc"),
);
let (grpc_sender, grpc_receiver) =
calloop::channel::channel::<Box<dyn FnOnce(&mut Self) + Send>>();
self.loop_handle
.insert_source(grpc_receiver, |msg, _, data| match msg {
Event::Msg(f) => f(&mut data.state),
Event::Closed => panic!("grpc receiver was closed"),
})
.expect("failed to insert grpc_receiver into loop");
let pinnacle_service = PinnacleService {
sender: grpc_sender.clone(),
};
let input_service = InputService {
sender: grpc_sender.clone(),
};
let process_service = ProcessService {
sender: grpc_sender.clone(),
};
let tag_service = TagService {
sender: grpc_sender.clone(),
};
let output_service = OutputService {
sender: grpc_sender.clone(),
};
let window_service = WindowService {
sender: grpc_sender.clone(),
};
let refl_service = tonic_reflection::server::Builder::configure()
.register_encoded_file_descriptor_set(pinnacle_api_defs::FILE_DESCRIPTOR_SET)
.build()?;
let uds = tokio::net::UnixListener::bind(&socket_path)?;
let uds_stream = tokio_stream::wrappers::UnixListenerStream::new(uds);
std::env::set_var("PINNACLE_GRPC_SOCKET", socket_path);
let grpc_server = tonic::transport::Server::builder()
.add_service(refl_service)
.add_service(PinnacleServiceServer::new(pinnacle_service))
.add_service(InputServiceServer::new(input_service))
.add_service(ProcessServiceServer::new(process_service))
.add_service(TagServiceServer::new(tag_service))
.add_service(OutputServiceServer::new(output_service))
.add_service(WindowServiceServer::new(window_service));
let (ping_tx, mut ping_rx) = tokio::sync::mpsc::unbounded_channel::<()>();
let ping_rx_future = async move {
ping_rx.recv().await;
};
let ping_tx_clone = ping_tx.clone();
let loop_signal_clone = self.loop_signal.clone();
tokio::spawn(async move {
tokio::signal::ctrl_c()
.await
.expect("failed to listen for ctrl-c");
ping_tx_clone
.send(())
.expect("failed to send grpc shutdown ping");
loop_signal_clone.stop();
});
self.grpc_kill_pinger = Some(ping_tx);
match self.xdisplay.as_ref() {
Some(_) => {
self.grpc_server_join_handle = Some(tokio::spawn(async move {
grpc_server
.serve_with_incoming_shutdown(uds_stream, ping_rx_future)
.await
}));
}
None => self.schedule(
|data| data.state.xdisplay.is_some(),
move |data| {
data.state.grpc_server_join_handle = Some(tokio::spawn(async move {
grpc_server
.serve_with_incoming_shutdown(uds_stream, ping_rx_future)
.await
}));
},
),
}
Ok(())
}
}

View file

@ -65,7 +65,7 @@ pub struct State {
pub backend: Backend,
/// A loop signal used to stop the compositor
loop_signal: LoopSignal,
pub loop_signal: LoopSignal,
/// A handle to the event loop
pub loop_handle: LoopHandle<'static, CalloopData>,
pub display_handle: DisplayHandle,
@ -114,7 +114,9 @@ pub struct State {
pub system_processes: sysinfo::System,
pub config_join_handle: Option<tokio::task::JoinHandle<()>>,
pub grpc_kill_pinger: UnboundedSender<()>,
pub grpc_kill_pinger: Option<UnboundedSender<()>>,
pub grpc_server_join_handle:
Option<tokio::task::JoinHandle<Result<(), tonic::transport::Error>>>,
}
impl State {
@ -246,79 +248,79 @@ impl State {
// gRPC stuff
std::env::set_var(
"PINNACLE_PROTO_DIR",
XDG_BASE_DIRS.get_data_file("protobuf"),
);
let (grpc_sender, grpc_receiver) =
calloop::channel::channel::<Box<dyn FnOnce(&mut Self) + Send>>();
loop_handle.insert_idle(|data| {
data.state
.loop_handle
.insert_source(grpc_receiver, |msg, _, data| match msg {
Event::Msg(f) => f(&mut data.state),
Event::Closed => panic!("grpc receiver was closed"),
})
.expect("failed to insert grpc_receiver into loop");
});
let pinnacle_service = PinnacleService {
sender: grpc_sender.clone(),
};
let input_service = InputService {
sender: grpc_sender.clone(),
};
let process_service = ProcessService {
sender: grpc_sender.clone(),
};
let tag_service = TagService {
sender: grpc_sender.clone(),
};
let output_service = OutputService {
sender: grpc_sender.clone(),
};
let window_service = WindowService {
sender: grpc_sender.clone(),
};
let refl_service = tonic_reflection::server::Builder::configure()
.register_encoded_file_descriptor_set(pinnacle_api_defs::FILE_DESCRIPTOR_SET)
.build()?;
let p = std::path::Path::new("/tmp/pinnacle/grpc.sock");
let _ = std::fs::remove_file(p);
std::fs::create_dir_all(p.parent().unwrap())?;
let uds = tokio::net::UnixListener::bind(p)?;
let uds_stream = tokio_stream::wrappers::UnixListenerStream::new(uds);
let grpc_server = tonic::transport::Server::builder()
.add_service(refl_service)
.add_service(PinnacleServiceServer::new(pinnacle_service))
.add_service(InputServiceServer::new(input_service))
.add_service(ProcessServiceServer::new(process_service))
.add_service(TagServiceServer::new(tag_service))
.add_service(OutputServiceServer::new(output_service))
.add_service(WindowServiceServer::new(window_service));
let (ping_tx, mut ping_rx) = tokio::sync::mpsc::unbounded_channel::<()>();
let ping_rx_future = async move {
ping_rx.recv().await;
};
let ping_tx_clone = ping_tx.clone();
let loop_signal_clone = loop_signal.clone();
tokio::spawn(async move {
tokio::signal::ctrl_c()
.await
.expect("failed to listen for ctrl-c");
ping_tx_clone
.send(())
.expect("failed to send grpc shutdown ping");
loop_signal_clone.stop();
});
// std::env::set_var(
// "PINNACLE_PROTO_DIR",
// XDG_BASE_DIRS.get_data_file("protobuf"),
// );
//
// let (grpc_sender, grpc_receiver) =
// calloop::channel::channel::<Box<dyn FnOnce(&mut Self) + Send>>();
//
// loop_handle.insert_idle(|data| {
// data.state
// .loop_handle
// .insert_source(grpc_receiver, |msg, _, data| match msg {
// Event::Msg(f) => f(&mut data.state),
// Event::Closed => panic!("grpc receiver was closed"),
// })
// .expect("failed to insert grpc_receiver into loop");
// });
//
// let pinnacle_service = PinnacleService {
// sender: grpc_sender.clone(),
// };
// let input_service = InputService {
// sender: grpc_sender.clone(),
// };
// let process_service = ProcessService {
// sender: grpc_sender.clone(),
// };
// let tag_service = TagService {
// sender: grpc_sender.clone(),
// };
// let output_service = OutputService {
// sender: grpc_sender.clone(),
// };
// let window_service = WindowService {
// sender: grpc_sender.clone(),
// };
//
// let refl_service = tonic_reflection::server::Builder::configure()
// .register_encoded_file_descriptor_set(pinnacle_api_defs::FILE_DESCRIPTOR_SET)
// .build()?;
//
// let p = std::path::Path::new("/tmp/pinnacle/grpc.sock");
// let _ = std::fs::remove_file(p);
// std::fs::create_dir_all(p.parent().unwrap())?;
// let uds = tokio::net::UnixListener::bind(p)?;
// let uds_stream = tokio_stream::wrappers::UnixListenerStream::new(uds);
//
// let grpc_server = tonic::transport::Server::builder()
// .add_service(refl_service)
// .add_service(PinnacleServiceServer::new(pinnacle_service))
// .add_service(InputServiceServer::new(input_service))
// .add_service(ProcessServiceServer::new(process_service))
// .add_service(TagServiceServer::new(tag_service))
// .add_service(OutputServiceServer::new(output_service))
// .add_service(WindowServiceServer::new(window_service));
//
// let (ping_tx, mut ping_rx) = tokio::sync::mpsc::unbounded_channel::<()>();
//
// let ping_rx_future = async move {
// ping_rx.recv().await;
// };
//
// let ping_tx_clone = ping_tx.clone();
// let loop_signal_clone = loop_signal.clone();
// tokio::spawn(async move {
// tokio::signal::ctrl_c()
// .await
// .expect("failed to listen for ctrl-c");
// ping_tx_clone
// .send(())
// .expect("failed to send grpc shutdown ping");
// loop_signal_clone.stop();
// });
let state = Self {
backend,
@ -371,20 +373,21 @@ impl State {
),
config_join_handle: None,
grpc_kill_pinger: ping_tx,
grpc_kill_pinger: None,
grpc_server_join_handle: None,
};
state.schedule(
|data| data.state.xdisplay.is_some(),
move |_| {
tokio::spawn(async move {
grpc_server
.serve_with_incoming_shutdown(uds_stream, ping_rx_future)
.await
.expect("failed to serve grpc");
});
},
);
// state.schedule(
// |data| data.state.xdisplay.is_some(),
// move |_| {
// tokio::spawn(async move {
// grpc_server
// .serve_with_incoming_shutdown(uds_stream, ping_rx_future)
// .await
// .expect("failed to serve grpc");
// });
// },
// );
Ok(state)
}
@ -420,9 +423,9 @@ impl State {
pub fn shutdown(&self) {
tracing::info!("Shutting down Pinnacle");
self.grpc_kill_pinger
.send(())
.expect("failed to send grpc shutdown ping");
if let Some(pinger) = self.grpc_kill_pinger.as_ref() {
pinger.send(()).expect("failed to send grpc shutdown ping");
}
self.loop_signal.stop();
}
}