diff --git a/Cargo.lock b/Cargo.lock index c9ca093..62a28f4 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1622,6 +1622,7 @@ dependencies = [ "pinnacle-api-defs", "pinnacle-api-macros", "tokio", + "tokio-stream", "tonic", "tower", "xkbcommon", diff --git a/Cargo.toml b/Cargo.toml index 28cbeb8..ebc15eb 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -8,6 +8,7 @@ repository = "https://github.com/pinnacle-comp/pinnacle/" [workspace.dependencies] tokio = { version = "1.36.0", features = ["macros", "rt-multi-thread"]} +tokio-stream = { version = "0.1.14", features = ["net"] } prost = "0.12.3" tonic = "0.11.0" @@ -61,7 +62,7 @@ tonic = { workspace = true } tonic-reflection = { workspace = true } tokio = { workspace = true, features = ["process", "io-util", "signal"] } -tokio-stream = { version = "0.1.14", features = ["net"] } +tokio-stream = { workspace = true } bitflags = "2.4.2" pinnacle-api-defs = { workspace = true } diff --git a/api/rust/Cargo.toml b/api/rust/Cargo.toml index f5baa05..f964845 100644 --- a/api/rust/Cargo.toml +++ b/api/rust/Cargo.toml @@ -13,6 +13,7 @@ categories = ["api-bindings", "config"] pinnacle-api-defs = { workspace = true } pinnacle-api-macros = { path = "./pinnacle-api-macros" } tokio = { workspace = true, features = ["net"] } +tokio-stream = { workspace = true } tonic = { workspace = true } tower = { version = "0.4.13", features = ["util"] } futures = "0.3.30" diff --git a/api/rust/examples/default_config/main.rs b/api/rust/examples/default_config/main.rs index 7462eb5..1980f49 100644 --- a/api/rust/examples/default_config/main.rs +++ b/api/rust/examples/default_config/main.rs @@ -83,10 +83,10 @@ async fn main() { // Setup all monitors with tags "1" through "5" output.connect_for_all(move |op| { - let mut tags = tag.add(&op, tag_names); + let tags = tag.add(op, tag_names); // Be sure to set a tag to active or windows won't display - tags.next().unwrap().set_active(true); + tags.first().unwrap().set_active(true); }); process.spawn_once([terminal]); diff --git a/api/rust/src/input.rs b/api/rust/src/input.rs index 2605a36..43a784a 100644 --- a/api/rust/src/input.rs +++ b/api/rust/src/input.rs @@ -8,7 +8,7 @@ //! methods for setting key- and mousebinds, changing xkeyboard settings, and more. //! View the struct's documentation for more information. -use futures::{channel::mpsc::UnboundedSender, future::BoxFuture, FutureExt, StreamExt}; +use futures::{future::BoxFuture, FutureExt, StreamExt}; use num_enum::TryFromPrimitive; use pinnacle_api_defs::pinnacle::input::{ self, @@ -19,6 +19,7 @@ use pinnacle_api_defs::pinnacle::input::{ SetXkbConfigRequest, }, }; +use tokio::sync::mpsc::UnboundedSender; use tonic::transport::Channel; use xkbcommon::xkb::Keysym; @@ -162,7 +163,7 @@ impl Input { let modifiers = mods.into_iter().map(|modif| modif as i32).collect(); self.fut_sender - .unbounded_send( + .send( async move { let mut stream = client .set_keybind(SetKeybindRequest { @@ -218,7 +219,7 @@ impl Input { let modifiers = mods.into_iter().map(|modif| modif as i32).collect(); self.fut_sender - .unbounded_send( + .send( async move { let mut stream = client .set_mousebind(SetMousebindRequest { diff --git a/api/rust/src/lib.rs b/api/rust/src/lib.rs index 68a2edf..f9a1b66 100644 --- a/api/rust/src/lib.rs +++ b/api/rust/src/lib.rs @@ -81,15 +81,18 @@ use std::sync::OnceLock; -use futures::{ - channel::mpsc::UnboundedReceiver, future::BoxFuture, stream::FuturesUnordered, Future, - StreamExt, -}; +use futures::{future::BoxFuture, stream::FuturesUnordered, Future, StreamExt}; use input::Input; use output::Output; use pinnacle::Pinnacle; use process::Process; +use signal::SignalState; use tag::Tag; +use tokio::sync::{ + mpsc::{unbounded_channel, UnboundedReceiver}, + RwLock, +}; +use tokio_stream::wrappers::UnboundedReceiverStream; use tonic::transport::{Endpoint, Uri}; use tower::service_fn; use window::Window; @@ -98,6 +101,7 @@ pub mod input; pub mod output; pub mod pinnacle; pub mod process; +pub mod signal; pub mod tag; pub mod util; pub mod window; @@ -112,6 +116,7 @@ static WINDOW: OnceLock = OnceLock::new(); static INPUT: OnceLock = OnceLock::new(); static OUTPUT: OnceLock = OnceLock::new(); static TAG: OnceLock = OnceLock::new(); +static SIGNAL: OnceLock> = OnceLock::new(); /// A struct containing static references to all of the configuration structs. #[derive(Debug, Clone, Copy)] @@ -145,16 +150,21 @@ pub async fn connect( })) .await?; - let (fut_sender, fut_recv) = futures::channel::mpsc::unbounded::>(); - - let output = Output::new(channel.clone(), fut_sender.clone()); + let (fut_sender, fut_recv) = unbounded_channel::>(); let pinnacle = PINNACLE.get_or_init(|| Pinnacle::new(channel.clone())); let process = PROCESS.get_or_init(|| Process::new(channel.clone(), fut_sender.clone())); let window = WINDOW.get_or_init(|| Window::new(channel.clone())); let input = INPUT.get_or_init(|| Input::new(channel.clone(), fut_sender.clone())); - let tag = TAG.get_or_init(|| Tag::new(channel.clone(), fut_sender.clone())); - let output = OUTPUT.get_or_init(|| output); + let tag = TAG.get_or_init(|| Tag::new(channel.clone())); + let output = OUTPUT.get_or_init(|| Output::new(channel.clone())); + + SIGNAL + .set(RwLock::new(SignalState::new( + channel.clone(), + fut_sender.clone(), + ))) + .map_err(|_| "failed to create SIGNAL")?; let modules = ApiModules { pinnacle, @@ -176,10 +186,12 @@ pub async fn connect( /// This function is inserted at the end of your config through the [`config`] macro. /// You should use the macro instead of this function directly. pub async fn listen(fut_recv: UnboundedReceiver>) { + let fut_recv = UnboundedReceiverStream::new(fut_recv); + let mut future_set = FuturesUnordered::< BoxFuture<( Option>, - Option>>, + Option>>, )>, >::new(); diff --git a/api/rust/src/output.rs b/api/rust/src/output.rs index d8b27e3..8d7bad8 100644 --- a/api/rust/src/output.rs +++ b/api/rust/src/output.rs @@ -9,39 +9,40 @@ //! This module provides [`Output`], which allows you to get [`OutputHandle`]s for different //! connected monitors and set them up. -use futures::{channel::mpsc::UnboundedSender, future::BoxFuture, FutureExt, StreamExt}; -use pinnacle_api_defs::pinnacle::{ - output::{ - self, - v0alpha1::{ - output_service_client::OutputServiceClient, ConnectForAllRequest, SetLocationRequest, - }, - }, - tag::v0alpha1::tag_service_client::TagServiceClient, +use futures::FutureExt; +use pinnacle_api_defs::pinnacle::output::{ + self, + v0alpha1::{output_service_client::OutputServiceClient, SetLocationRequest}, }; use tonic::transport::Channel; -use crate::{block_on_tokio, tag::TagHandle, util::Batch}; +use crate::{ + block_on_tokio, + signal::{OutputSignal, SignalHandle}, + tag::TagHandle, + util::Batch, + SIGNAL, TAG, +}; /// A struct that allows you to get handles to connected outputs and set them up. /// /// See [`OutputHandle`] for more information. #[derive(Debug, Clone)] pub struct Output { - fut_sender: UnboundedSender>, output_client: OutputServiceClient, - tag_client: TagServiceClient, } impl Output { - pub(crate) fn new( - channel: Channel, - fut_sender: UnboundedSender>, - ) -> Self { + pub(crate) fn new(channel: Channel) -> Self { Self { output_client: OutputServiceClient::new(channel.clone()), - tag_client: TagServiceClient::new(channel), - fut_sender, + } + } + + pub(crate) fn new_handle(&self, name: impl Into) -> OutputHandle { + OutputHandle { + name: name.into(), + output_client: self.output_client.clone(), } } @@ -52,14 +53,14 @@ impl Output { /// ``` /// let outputs = output.get_all(); /// ``` - pub fn get_all(&self) -> impl Iterator { + pub fn get_all(&self) -> Vec { block_on_tokio(self.get_all_async()) } /// The async version of [`Output::get_all`]. - pub async fn get_all_async(&self) -> impl Iterator { + pub async fn get_all_async(&self) -> Vec { let mut client = self.output_client.clone(); - let tag_client = self.tag_client.clone(); + client .get(output::v0alpha1::GetRequest {}) .await @@ -67,11 +68,8 @@ impl Output { .into_inner() .output_names .into_iter() - .map(move |name| OutputHandle { - output_client: client.clone(), - tag_client: tag_client.clone(), - name, - }) + .map(move |name| self.new_handle(name)) + .collect() } /// Get a handle to the output with the given name. @@ -93,6 +91,7 @@ impl Output { let name: String = name.into(); self.get_all_async() .await + .into_iter() .find(|output| output.name == name) } @@ -107,6 +106,7 @@ impl Output { /// ``` pub fn get_focused(&self) -> Option { self.get_all() + .into_iter() .find(|output| matches!(output.props().focused, Some(true))) } @@ -137,40 +137,26 @@ impl Output { /// tags.next().unwrap().set_active(true); /// }); /// ``` - pub fn connect_for_all(&self, mut for_all: impl FnMut(OutputHandle) + Send + 'static) { + pub fn connect_for_all(&self, mut for_all: impl FnMut(&OutputHandle) + Send + 'static) { for output in self.get_all() { - for_all(output); + for_all(&output); } - let mut client = self.output_client.clone(); - let tag_client = self.tag_client.clone(); + let mut signal_state = block_on_tokio(SIGNAL.get().expect("SIGNAL doesn't exist").write()); + signal_state.output_connect.add_callback(Box::new(for_all)); + } - self.fut_sender - .unbounded_send( - async move { - let mut stream = client - .connect_for_all(ConnectForAllRequest {}) - .await - .unwrap() - .into_inner(); + /// Connect to an output signal. + /// + /// The compositor will fire off signals that your config can listen for and act upon. + /// You can pass in an [`OutputSignal`] along with a callback and it will get run + /// with the necessary arguments every time a signal of that type is received. + pub fn connect_signal(&self, signal: OutputSignal) -> SignalHandle { + let mut signal_state = block_on_tokio(SIGNAL.get().expect("SIGNAL doesn't exist").write()); - while let Some(Ok(response)) = stream.next().await { - let Some(output_name) = response.output_name else { - continue; - }; - - let output = OutputHandle { - output_client: client.clone(), - tag_client: tag_client.clone(), - name: output_name, - }; - - for_all(output); - } - } - .boxed(), - ) - .unwrap(); + match signal { + OutputSignal::Connect(f) => signal_state.output_connect.add_callback(f), + } } } @@ -179,9 +165,8 @@ impl Output { /// This allows you to manipulate outputs and get their properties. #[derive(Clone, Debug)] pub struct OutputHandle { - pub(crate) output_client: OutputServiceClient, - pub(crate) tag_client: TagServiceClient, pub(crate) name: String, + output_client: OutputServiceClient, } impl PartialEq for OutputHandle { @@ -407,6 +392,8 @@ impl OutputHandle { .unwrap() .into_inner(); + let tag = TAG.get().expect("TAG doesn't exist"); + OutputProperties { make: response.make, model: response.model, @@ -421,11 +408,7 @@ impl OutputHandle { tags: response .tag_ids .into_iter() - .map(|id| TagHandle { - tag_client: self.tag_client.clone(), - output_client: self.output_client.clone(), - id, - }) + .map(|id| tag.new_handle(id)) .collect(), } } diff --git a/api/rust/src/process.rs b/api/rust/src/process.rs index c93a063..3192aaf 100644 --- a/api/rust/src/process.rs +++ b/api/rust/src/process.rs @@ -7,10 +7,11 @@ //! This module provides [`Process`], which allows you to spawn processes and set environment //! variables. -use futures::{channel::mpsc::UnboundedSender, future::BoxFuture, FutureExt, StreamExt}; +use futures::{future::BoxFuture, FutureExt, StreamExt}; use pinnacle_api_defs::pinnacle::process::v0alpha1::{ process_service_client::ProcessServiceClient, SetEnvRequest, SpawnRequest, }; +use tokio::sync::mpsc::UnboundedSender; use tonic::transport::Channel; use crate::block_on_tokio; @@ -133,7 +134,7 @@ impl Process { }; self.fut_sender - .unbounded_send( + .send( async move { let mut stream = client.spawn(request).await.unwrap().into_inner(); let Some(mut callbacks) = callbacks else { return }; diff --git a/api/rust/src/signal.rs b/api/rust/src/signal.rs new file mode 100644 index 0000000..24e7630 --- /dev/null +++ b/api/rust/src/signal.rs @@ -0,0 +1,372 @@ +//! Signals TODO: + +use std::{ + collections::{btree_map, BTreeMap}, + sync::{ + atomic::{AtomicU32, Ordering}, + Arc, + }, +}; + +use futures::{future::BoxFuture, pin_mut, FutureExt}; +use pinnacle_api_defs::pinnacle::signal::v0alpha1::{ + signal_service_client::SignalServiceClient, SignalRequest, StreamControl, +}; +use tokio::sync::{ + mpsc::{unbounded_channel, UnboundedSender}, + oneshot, +}; +use tokio_stream::{wrappers::UnboundedReceiverStream, StreamExt}; +use tonic::{transport::Channel, Streaming}; + +use crate::{ + block_on_tokio, output::OutputHandle, tag::TagHandle, window::WindowHandle, OUTPUT, TAG, WINDOW, +}; + +pub(crate) trait Signal { + type Callback; +} + +macro_rules! signals { + ( $( + $( #[$cfg_enum:meta] )* $enum:ident => { + $( + $( #[$cfg:meta] )* $name:ident = { + enum_name = $renamed:ident, + callback_type = $cb:ty, + client_request = $req:ident, + on_response = $on_resp:expr, + } + )* + } + )* ) => {$( + $( + $( #[$cfg] )* + pub(crate) struct $name; + + impl $crate::signal::Signal for $name { + type Callback = $cb; + } + + impl SignalData<$name> { + pub(crate) fn add_callback(&mut self, callback: <$name as Signal>::Callback) -> SignalHandle { + if self.callback_count.load(::std::sync::atomic::Ordering::SeqCst) == 0 { + self.connect() + } + + let Some(callback_sender) = self.callback_sender.as_ref() else { + unreachable!("signal should already be connected here"); + }; + + let Some(remove_callback_sender) = self.remove_callback_sender.clone() else { + unreachable!("signal should already be connected here"); + }; + + callback_sender + .send((self.current_id, callback)) + .expect("failed to send callback"); + + let handle = SignalHandle::new(self.current_id, remove_callback_sender); + + self.current_id.0 += 1; + + handle + } + + fn reset(&mut self) { + self.callback_sender.take(); + self.dc_pinger.take(); + self.remove_callback_sender.take(); + self.callback_count.store(0, Ordering::SeqCst); + self.current_id = SignalConnId::default(); + } + + fn connect(&mut self) { + self.reset(); + + let channels = connect_signal::<_, _, <$name as Signal>::Callback, _, _>( + &self.fut_sender, + self.callback_count.clone(), + |out| { + block_on_tokio(self.client.$req(out)) + .expect("TODO") + .into_inner() + }, + $on_resp, + ); + + self.callback_sender.replace(channels.callback_sender); + self.dc_pinger.replace(channels.dc_pinger); + self.remove_callback_sender + .replace(channels.remove_callback_sender); + } + } + )* + + $( #[$cfg_enum] )* + pub enum $enum { + $( $( #[$cfg] )* $renamed($cb),)* + } + )*}; +} + +signals! { + /// Signals relating to tag events. + TagSignal => { + /// The compositor requested that the given windows be laid out. + Layout = { + enum_name = Layout, + callback_type = LayoutFn, + client_request = layout, + on_response = |response, callbacks| { + if let Some(tag_id) = response.tag_id { + let tag = TAG.get().expect("TAG doesn't exist"); + let window = WINDOW.get().expect("WINDOW doesn't exist"); + let tag = tag.new_handle(tag_id); + + let windows = response + .window_ids + .into_iter() + .map(|id| window.new_handle(id)) + .collect::>(); + + for callback in callbacks { + callback(&tag, windows.as_slice()); + } + } + }, + } + } + /// Signals relating to output events. + OutputSignal => { + /// An output was connected. + OutputConnect = { + enum_name = Connect, + callback_type = SingleOutputFn, + client_request = output_connect, + on_response = |response, callbacks| { + if let Some(output_name) = response.output_name { + let output = OUTPUT.get().expect("OUTPUT doesn't exist"); + let handle = output.new_handle(output_name); + + for callback in callbacks { + callback(&handle); + } + } + }, + } + } + /// Signals relating to window events. + WindowSignal => { + /// The pointer entered a window. + WindowPointerEnter = { + enum_name = PointerEnter, + callback_type = SingleWindowFn, + client_request = window_pointer_enter, + on_response = |response, callbacks| { + if let Some(window_id) = response.window_id { + let window = WINDOW.get().expect("WINDOW doesn't exist"); + let handle = window.new_handle(window_id); + + for callback in callbacks { + callback(&handle); + } + } + }, + } + /// The pointer left a window. + WindowPointerLeave = { + enum_name = PointerLeave, + callback_type = SingleWindowFn, + client_request = window_pointer_leave, + on_response = |response, callbacks| { + if let Some(window_id) = response.window_id { + let window = WINDOW.get().expect("WINDOW doesn't exist"); + let handle = window.new_handle(window_id); + + for callback in callbacks { + callback(&handle); + } + } + }, + } + } +} + +pub(crate) type LayoutFn = Box; +pub(crate) type SingleOutputFn = Box; +pub(crate) type SingleWindowFn = Box; + +pub(crate) struct SignalState { + pub(crate) layout: SignalData, + pub(crate) output_connect: SignalData, + pub(crate) window_pointer_enter: SignalData, + pub(crate) window_pointer_leave: SignalData, +} + +impl SignalState { + pub(crate) fn new( + channel: Channel, + fut_sender: UnboundedSender>, + ) -> Self { + let client = SignalServiceClient::new(channel); + Self { + layout: SignalData::new(client.clone(), fut_sender.clone()), + output_connect: SignalData::new(client.clone(), fut_sender.clone()), + window_pointer_enter: SignalData::new(client.clone(), fut_sender.clone()), + window_pointer_leave: SignalData::new(client.clone(), fut_sender.clone()), + } + } +} + +#[derive(Default, Clone, Copy, Debug, Hash, PartialEq, Eq, PartialOrd, Ord)] +pub(crate) struct SignalConnId(pub(crate) u32); + +pub(crate) struct SignalData { + client: SignalServiceClient, + fut_sender: UnboundedSender>, + callback_sender: Option>, + remove_callback_sender: Option>, + dc_pinger: Option>, + current_id: SignalConnId, + callback_count: Arc, +} + +impl SignalData { + fn new( + client: SignalServiceClient, + fut_sender: UnboundedSender>, + ) -> Self { + Self { + client, + fut_sender, + callback_sender: Default::default(), + remove_callback_sender: Default::default(), + dc_pinger: Default::default(), + current_id: Default::default(), + callback_count: Default::default(), + } + } +} + +struct ConnectSignalChannels { + callback_sender: UnboundedSender<(SignalConnId, F)>, + dc_pinger: oneshot::Sender<()>, + remove_callback_sender: UnboundedSender, +} + +fn connect_signal( + fut_sender: &UnboundedSender>, + callback_count: Arc, + to_in_stream: T, + mut on_response: O, +) -> ConnectSignalChannels +where + Req: SignalRequest + Send + 'static, + Resp: 'static, + F: Send + 'static, + T: FnOnce(UnboundedReceiverStream) -> Streaming, + O: FnMut(Resp, btree_map::ValuesMut<'_, SignalConnId, F>) + Send + 'static, +{ + let (control_sender, recv) = unbounded_channel::(); + let out_stream = UnboundedReceiverStream::new(recv); + + let mut in_stream = to_in_stream(out_stream); + + let (callback_sender, mut callback_recv) = unbounded_channel::<(SignalConnId, F)>(); + let (remove_callback_sender, mut remove_callback_recv) = unbounded_channel::(); + let (dc_pinger, mut dc_ping_recv) = oneshot::channel::<()>(); + + let signal_future = async move { + let mut callbacks = BTreeMap::::new(); + + control_sender + .send(Req::from_control(StreamControl::Ready)) + .expect("send failed"); + + loop { + let in_stream_next = in_stream.next().fuse(); + pin_mut!(in_stream_next); + let callback_recv_recv = callback_recv.recv().fuse(); + pin_mut!(callback_recv_recv); + let remove_callback_recv_recv = remove_callback_recv.recv().fuse(); + pin_mut!(remove_callback_recv_recv); + let mut dc_ping_recv_fuse = (&mut dc_ping_recv).fuse(); + + futures::select! { + response = in_stream_next => { + let Some(response) = response else { continue }; + + match response { + Ok(response) => { + on_response(response, callbacks.values_mut()); + + control_sender + .send(Req::from_control(StreamControl::Ready)) + .expect("send failed"); + } + Err(status) => eprintln!("Error in recv: {status}"), + } + } + callback = callback_recv_recv => { + if let Some((id, callback)) = callback { + callbacks.insert(id, callback); + callback_count.fetch_add(1, Ordering::SeqCst); + } + } + remove = remove_callback_recv_recv => { + if let Some(id) = remove { + if callbacks.remove(&id).is_some() { + assert!(callback_count.fetch_sub(1, Ordering::SeqCst) > 0); + } + if callbacks.is_empty() { + assert!(callback_count.load(Ordering::SeqCst) == 0); + control_sender.send(Req::from_control(StreamControl::Disconnect)).expect("send failed"); + break; + } + } + } + _dc = dc_ping_recv_fuse => { + println!("dc"); + control_sender.send(Req::from_control(StreamControl::Disconnect)).expect("send failed"); + break; + } + } + } + }; + + fut_sender.send(signal_future.boxed()).expect("send failed"); + + ConnectSignalChannels { + callback_sender, + dc_pinger, + remove_callback_sender, + } +} + +/// A handle that can be used to disconnect from a signal connection. +/// +/// This will remove the connected callback. +pub struct SignalHandle { + id: SignalConnId, + remove_callback_sender: UnboundedSender, +} + +impl SignalHandle { + pub(crate) fn new( + id: SignalConnId, + remove_callback_sender: UnboundedSender, + ) -> Self { + Self { + id, + remove_callback_sender, + } + } + + /// Disconnect this signal connection. + pub fn disconnect(self) { + self.remove_callback_sender + .send(self.id) + .expect("failed to disconnect signal"); + } +} diff --git a/api/rust/src/tag.rs b/api/rust/src/tag.rs index e760896..7aa9806 100644 --- a/api/rust/src/tag.rs +++ b/api/rust/src/tag.rs @@ -34,45 +34,42 @@ use std::{ sync::{Arc, Mutex}, }; -use futures::{channel::mpsc::UnboundedSender, future::BoxFuture, FutureExt}; +use futures::FutureExt; use num_enum::TryFromPrimitive; -use pinnacle_api_defs::pinnacle::{ - output::v0alpha1::output_service_client::OutputServiceClient, - tag::{ - self, - v0alpha1::{ - tag_service_client::TagServiceClient, AddRequest, RemoveRequest, SetActiveRequest, - SetLayoutRequest, SwitchToRequest, - }, +use pinnacle_api_defs::pinnacle::tag::{ + self, + v0alpha1::{ + tag_service_client::TagServiceClient, AddRequest, RemoveRequest, SetActiveRequest, + SetLayoutRequest, SwitchToRequest, }, }; use tonic::transport::Channel; use crate::{ block_on_tokio, - output::{Output, OutputHandle}, + output::OutputHandle, + signal::{SignalHandle, TagSignal}, util::Batch, + OUTPUT, SIGNAL, }; /// A struct that allows you to add and remove tags and get [`TagHandle`]s. #[derive(Clone, Debug)] pub struct Tag { - channel: Channel, - fut_sender: UnboundedSender>, tag_client: TagServiceClient, - output_client: OutputServiceClient, } impl Tag { - pub(crate) fn new( - channel: Channel, - fut_sender: UnboundedSender>, - ) -> Self { + pub(crate) fn new(channel: Channel) -> Self { Self { tag_client: TagServiceClient::new(channel.clone()), - output_client: OutputServiceClient::new(channel.clone()), - channel, - fut_sender, + } + } + + pub(crate) fn new_handle(&self, id: u32) -> TagHandle { + TagHandle { + id, + tag_client: self.tag_client.clone(), } } @@ -93,7 +90,7 @@ impl Tag { &self, output: &OutputHandle, tag_names: impl IntoIterator>, - ) -> impl Iterator { + ) -> Vec { block_on_tokio(self.add_async(output, tag_names)) } @@ -102,9 +99,8 @@ impl Tag { &self, output: &OutputHandle, tag_names: impl IntoIterator>, - ) -> impl Iterator { + ) -> Vec { let mut client = self.tag_client.clone(); - let output_client = self.output_client.clone(); let tag_names = tag_names.into_iter().map(Into::into).collect(); @@ -117,11 +113,11 @@ impl Tag { .unwrap() .into_inner(); - response.tag_ids.into_iter().map(move |id| TagHandle { - tag_client: client.clone(), - output_client: output_client.clone(), - id, - }) + response + .tag_ids + .into_iter() + .map(move |id| self.new_handle(id)) + .collect() } /// Get handles to all tags across all outputs. @@ -131,14 +127,13 @@ impl Tag { /// ``` /// let all_tags = tag.get_all(); /// ``` - pub fn get_all(&self) -> impl Iterator { + pub fn get_all(&self) -> Vec { block_on_tokio(self.get_all_async()) } /// The async version of [`Tag::get_all`]. - pub async fn get_all_async(&self) -> impl Iterator { + pub async fn get_all_async(&self) -> Vec { let mut client = self.tag_client.clone(); - let output_client = self.output_client.clone(); let response = client .get(tag::v0alpha1::GetRequest {}) @@ -146,11 +141,11 @@ impl Tag { .unwrap() .into_inner(); - response.tag_ids.into_iter().map(move |id| TagHandle { - tag_client: client.clone(), - output_client: output_client.clone(), - id, - }) + response + .tag_ids + .into_iter() + .map(move |id| self.new_handle(id)) + .collect() } /// Get a handle to the first tag with the given name on the focused output. @@ -170,7 +165,7 @@ impl Tag { /// The async version of [`Tag::get`]. pub async fn get_async(&self, name: impl Into) -> Option { let name = name.into(); - let output_module = Output::new(self.channel.clone(), self.fut_sender.clone()); + let output_module = OUTPUT.get().expect("OUTPUT doesn't exist"); let focused_output = output_module.get_focused(); if let Some(output) = focused_output { @@ -280,7 +275,7 @@ impl Tag { let layouts_clone = layouts.clone(); let len = layouts.len(); - let output_module = Output::new(self.channel.clone(), self.fut_sender.clone()); + let output_module = OUTPUT.get().expect("OUTPUT doesn't exist"); let output_module_clone = output_module.clone(); let next = move |output: Option<&OutputHandle>| { @@ -343,6 +338,19 @@ impl Tag { next: Box::new(next), } } + + /// Connect to a tag signal. + /// + /// The compositor will fire off signals that your config can listen for and act upon. + /// You can pass in a [`TagSignal`] along with a callback and it will get run + /// with the necessary arguments every time a signal of that type is received. + pub fn connect_signal(&self, signal: TagSignal) -> SignalHandle { + let mut signal_state = block_on_tokio(SIGNAL.get().expect("SIGNAL doesn't exist").write()); + + match signal { + TagSignal::Layout(layout_fn) => signal_state.layout.add_callback(layout_fn), + } + } } /// A layout cycler that keeps track of tags and their layouts and provides functions to cycle @@ -361,8 +369,7 @@ pub struct LayoutCycler { #[derive(Debug, Clone)] pub struct TagHandle { pub(crate) id: u32, - pub(crate) tag_client: TagServiceClient, - pub(crate) output_client: OutputServiceClient, + tag_client: TagServiceClient, } impl PartialEq for TagHandle { @@ -543,7 +550,6 @@ impl TagHandle { /// The async version of [`TagHandle::props`]. pub async fn props_async(&self) -> TagProperties { let mut client = self.tag_client.clone(); - let output_client = self.output_client.clone(); let response = client .get_properties(tag::v0alpha1::GetPropertiesRequest { @@ -553,14 +559,12 @@ impl TagHandle { .unwrap() .into_inner(); + let output = OUTPUT.get().expect("OUTPUT doesn't exist"); + TagProperties { active: response.active, name: response.name, - output: response.output_name.map(|name| OutputHandle { - output_client, - tag_client: client, - name, - }), + output: response.output_name.map(|name| output.new_handle(name)), } } diff --git a/api/rust/src/window.rs b/api/rust/src/window.rs index f9a84bb..6816080 100644 --- a/api/rust/src/window.rs +++ b/api/rust/src/window.rs @@ -15,8 +15,6 @@ use futures::FutureExt; use num_enum::TryFromPrimitive; use pinnacle_api_defs::pinnacle::{ - output::v0alpha1::output_service_client::OutputServiceClient, - tag::v0alpha1::tag_service_client::TagServiceClient, window::v0alpha1::{ window_service_client::WindowServiceClient, AddWindowRuleRequest, CloseRequest, MoveToTagRequest, SetTagRequest, @@ -34,8 +32,10 @@ use tonic::transport::Channel; use crate::{ block_on_tokio, input::MouseButton, + signal::{SignalHandle, WindowSignal}, tag::TagHandle, util::{Batch, Geometry}, + SIGNAL, TAG, }; use self::rules::{WindowRule, WindowRuleCondition}; @@ -48,16 +48,19 @@ pub mod rules; #[derive(Debug, Clone)] pub struct Window { window_client: WindowServiceClient, - tag_client: TagServiceClient, - output_client: OutputServiceClient, } impl Window { pub(crate) fn new(channel: Channel) -> Self { Self { window_client: WindowServiceClient::new(channel.clone()), - tag_client: TagServiceClient::new(channel.clone()), - output_client: OutputServiceClient::new(channel), + } + } + + pub(crate) fn new_handle(&self, id: u32) -> WindowHandle { + WindowHandle { + id, + window_client: self.window_client.clone(), } } @@ -126,8 +129,6 @@ impl Window { /// The async version of [`Window::get_all`]. pub async fn get_all_async(&self) -> impl Iterator { let mut client = self.window_client.clone(); - let tag_client = self.tag_client.clone(); - let output_client = self.output_client.clone(); client .get(GetRequest {}) .await @@ -135,12 +136,10 @@ impl Window { .into_inner() .window_ids .into_iter() - .map(move |id| WindowHandle { - window_client: client.clone(), - id, - tag_client: tag_client.clone(), - output_client: output_client.clone(), - }) + .map(move |id| self.new_handle(id)) + .collect::>() + // TODO: consider changing return type to Vec to avoid this into_iter + .into_iter() } /// Get the currently focused window. @@ -177,6 +176,20 @@ impl Window { })) .unwrap(); } + + /// Connect to a window signal. + /// + /// The compositor will fire off signals that your config can listen for and act upon. + /// You can pass in a [`WindowSignal`] along with a callback and it will get run + /// with the necessary arguments every time a signal of that type is received. + pub fn connect_signal(&self, signal: WindowSignal) -> SignalHandle { + let mut signal_state = block_on_tokio(SIGNAL.get().expect("SIGNAL doesn't exist").write()); + + match signal { + WindowSignal::PointerEnter(f) => signal_state.window_pointer_enter.add_callback(f), + WindowSignal::PointerLeave(f) => signal_state.window_pointer_leave.add_callback(f), + } + } } /// A handle to a window. @@ -186,8 +199,6 @@ impl Window { pub struct WindowHandle { id: u32, window_client: WindowServiceClient, - tag_client: TagServiceClient, - output_client: OutputServiceClient, } impl PartialEq for WindowHandle { @@ -476,7 +487,6 @@ impl WindowHandle { /// The async version of [`props`][Self::props]. pub async fn props_async(&self) -> WindowProperties { let mut client = self.window_client.clone(); - let tag_client = self.tag_client.clone(); let response = match client .get_properties(window::v0alpha1::GetPropertiesRequest { @@ -504,6 +514,8 @@ impl WindowHandle { height: geo.height() as u32, }); + let tag = TAG.get().expect("TAG doesn't exist"); + WindowProperties { geometry, class: response.class, @@ -514,11 +526,7 @@ impl WindowHandle { tags: response .tag_ids .into_iter() - .map(|id| TagHandle { - tag_client: tag_client.clone(), - output_client: self.output_client.clone(), - id, - }) + .map(|id| tag.new_handle(id)) .collect(), } } diff --git a/pinnacle-api-defs/src/lib.rs b/pinnacle-api-defs/src/lib.rs index 4f1b000..67c01f1 100644 --- a/pinnacle-api-defs/src/lib.rs +++ b/pinnacle-api-defs/src/lib.rs @@ -37,6 +37,36 @@ pub mod pinnacle { pub mod signal { pub mod v0alpha1 { tonic::include_proto!("pinnacle.signal.v0alpha1"); + + pub trait SignalRequest { + fn from_control(control: StreamControl) -> Self; + fn control(&self) -> StreamControl; + } + + macro_rules! impl_signal_request { + ( $( $request:ident ),* ) => { + $( + impl SignalRequest for $request { + fn from_control(control: StreamControl) -> Self { + $request { + control: Some(control as i32), + } + } + + fn control(&self) -> StreamControl { + self.control() + } + } + )* + }; + } + + impl_signal_request!( + OutputConnectRequest, + LayoutRequest, + WindowPointerEnterRequest, + WindowPointerLeaveRequest + ); } } } diff --git a/src/api/signal.rs b/src/api/signal.rs index 46beaf8..0d7c9a2 100644 --- a/src/api/signal.rs +++ b/src/api/signal.rs @@ -2,8 +2,8 @@ use std::collections::VecDeque; use pinnacle_api_defs::pinnacle::signal::v0alpha1::{ signal_service_server, LayoutRequest, LayoutResponse, OutputConnectRequest, - OutputConnectResponse, StreamControl, WindowPointerEnterRequest, WindowPointerEnterResponse, - WindowPointerLeaveRequest, WindowPointerLeaveResponse, + OutputConnectResponse, SignalRequest, StreamControl, WindowPointerEnterRequest, + WindowPointerEnterResponse, WindowPointerLeaveRequest, WindowPointerLeaveResponse, }; use tokio::{sync::mpsc::UnboundedSender, task::JoinHandle}; use tonic::{Request, Response, Status, Streaming}; @@ -118,29 +118,6 @@ impl> SignalData { } } -trait SignalRequest { - fn control(&self) -> StreamControl; -} - -macro_rules! impl_signal_request { - ( $( $request:ident ),* ) => { - $( - impl SignalRequest for $request { - fn control(&self) -> StreamControl { - self.control() - } - } - )* - }; -} - -impl_signal_request!( - OutputConnectRequest, - LayoutRequest, - WindowPointerEnterRequest, - WindowPointerLeaveRequest -); - fn start_signal_stream( sender: StateFnSender, in_stream: Streaming,