From 563bf7d92ab2e3ac207006c2b7d0b3c227bea7fe Mon Sep 17 00:00:00 2001 From: Ottatop Date: Wed, 21 Feb 2024 18:08:23 -0600 Subject: [PATCH 01/11] Scaffold signals --- api/lua/pinnacle/grpc/client.lua | 6 + .../pinnacle/signal/v0alpha1/signal.proto | 53 +++++ pinnacle-api-defs/build.rs | 1 + pinnacle-api-defs/src/lib.rs | 6 + src/api.rs | 46 ++++- src/api/signal.rs | 188 ++++++++++++++++++ src/config.rs | 8 +- src/state.rs | 6 +- 8 files changed, 309 insertions(+), 5 deletions(-) create mode 100644 api/protocol/pinnacle/signal/v0alpha1/signal.proto create mode 100644 src/api/signal.rs diff --git a/api/lua/pinnacle/grpc/client.lua b/api/lua/pinnacle/grpc/client.lua index 7f45105..8d21d35 100644 --- a/api/lua/pinnacle/grpc/client.lua +++ b/api/lua/pinnacle/grpc/client.lua @@ -162,4 +162,10 @@ function client.server_streaming_request(grpc_request_params, callback) end) end +---@param grpc_request_params GrpcRequestParams +---@param callback fun(response: table) +--- +---@return H2Stream +function client.bidirectional_streaming_request(grpc_request_params, callback) end + return client diff --git a/api/protocol/pinnacle/signal/v0alpha1/signal.proto b/api/protocol/pinnacle/signal/v0alpha1/signal.proto new file mode 100644 index 0000000..c1da727 --- /dev/null +++ b/api/protocol/pinnacle/signal/v0alpha1/signal.proto @@ -0,0 +1,53 @@ +syntax = "proto2"; + +package pinnacle.signal.v0alpha1; + +import "google/protobuf/empty.proto"; + +enum StreamControl { + STREAM_CONTROL_UNSPECIFIED = 0; + // The client is ready to receive the next signal. + STREAM_CONTROL_READY = 1; + // The client wishes to disconnect a signal connection. + STREAM_CONTROL_DISCONNECT = 2; +} + +message OutputConnectRequest { + optional StreamControl control = 1; +} +message OutputConnectResponse { + optional string output_name = 1; +} + +message LayoutRequest { + optional StreamControl control = 1; +} +message LayoutResponse { + // The windows that need to be laid out. + repeated uint32 window_ids = 1; + // The tag that is being laid out. + optional uint32 tag_id = 2; +} + +message WindowPointerEnterRequest { + optional StreamControl control = 1; +} +message WindowPointerEnterResponse { + // The window that the pointer entered. + optional uint32 window_id = 1; +} + +message WindowPointerLeaveRequest { + optional StreamControl control = 1; +} +message WindowPointerLeaveResponse { + // The window that the pointer left. + optional uint32 window_id = 1; +} + +service SignalService { + rpc OutputConnect(stream OutputConnectRequest) returns (stream OutputConnectResponse); + rpc Layout(stream LayoutRequest) returns (stream LayoutResponse); + rpc WindowPointerEnter(stream WindowPointerEnterRequest) returns (stream WindowPointerEnterResponse); + rpc WindowPointerLeave(stream WindowPointerLeaveRequest) returns (stream WindowPointerLeaveResponse); +} diff --git a/pinnacle-api-defs/build.rs b/pinnacle-api-defs/build.rs index 7981037..62d8aaa 100644 --- a/pinnacle-api-defs/build.rs +++ b/pinnacle-api-defs/build.rs @@ -13,6 +13,7 @@ fn main() { formatcp!("../api/protocol/pinnacle/process/{VERSION}/process.proto"), formatcp!("../api/protocol/pinnacle/tag/{VERSION}/tag.proto"), formatcp!("../api/protocol/pinnacle/window/{VERSION}/window.proto"), + formatcp!("../api/protocol/pinnacle/signal/{VERSION}/signal.proto"), ]; let descriptor_path = PathBuf::from(std::env::var("OUT_DIR").unwrap()).join("pinnacle.bin"); diff --git a/pinnacle-api-defs/src/lib.rs b/pinnacle-api-defs/src/lib.rs index 5a83be4..4f1b000 100644 --- a/pinnacle-api-defs/src/lib.rs +++ b/pinnacle-api-defs/src/lib.rs @@ -33,6 +33,12 @@ pub mod pinnacle { tonic::include_proto!("pinnacle.process.v0alpha1"); } } + + pub mod signal { + pub mod v0alpha1 { + tonic::include_proto!("pinnacle.signal.v0alpha1"); + } + } } pub const FILE_DESCRIPTOR_SET: &[u8] = tonic::include_file_descriptor_set!("pinnacle"); diff --git a/src/api.rs b/src/api.rs index cff0d85..5e11e99 100644 --- a/src/api.rs +++ b/src/api.rs @@ -1,3 +1,5 @@ +pub mod signal; + use std::{ffi::OsString, num::NonZeroU32, pin::Pin, process::Stdio}; use pinnacle_api_defs::pinnacle::{ @@ -45,8 +47,8 @@ use tokio::{ io::AsyncBufReadExt, sync::mpsc::{unbounded_channel, UnboundedSender}, }; -use tokio_stream::Stream; -use tonic::{Request, Response, Status}; +use tokio_stream::{Stream, StreamExt}; +use tonic::{Request, Response, Status, Streaming}; use crate::{ config::ConnectorSavedState, @@ -58,6 +60,8 @@ use crate::{ window::{window_state::WindowId, WindowElement}, }; +use self::signal::SignalData; + type ResponseStream = Pin> + Send>>; pub type StateFnSender = calloop::channel::Sender>; @@ -122,6 +126,44 @@ where Ok(Response::new(Box::pin(receiver_stream))) } +fn run_bidirectional_streaming( + fn_sender: StateFnSender, + mut in_stream: Streaming, + with_client_item: F1, + with_out_stream: F2, +) -> Result>, Status> +where + F1: Fn(&mut State, Result) + Clone + Send + 'static, + F2: FnOnce(&mut State, UnboundedSender>) + Send + 'static, + I: Send + 'static, + O: Send + 'static, +{ + let (sender, receiver) = unbounded_channel::>(); + + let with_out_stream = Box::new(|state: &mut State| { + with_out_stream(state, sender); + }); + + fn_sender + .send(with_out_stream) + .map_err(|_| Status::internal("failed to execute request"))?; + + let with_in_stream = async move { + while let Some(t) = in_stream.next().await { + let with_client_item = with_client_item.clone(); + // TODO: handle error + let _ = fn_sender.send(Box::new(move |state: &mut State| { + with_client_item(state, t); + })); + } + }; + + tokio::spawn(with_in_stream); + + let receiver_stream = tokio_stream::wrappers::UnboundedReceiverStream::new(receiver); + Ok(Response::new(Box::pin(receiver_stream))) +} + pub struct PinnacleService { sender: StateFnSender, } diff --git a/src/api/signal.rs b/src/api/signal.rs new file mode 100644 index 0000000..5b10c74 --- /dev/null +++ b/src/api/signal.rs @@ -0,0 +1,188 @@ +use pinnacle_api_defs::pinnacle::signal::v0alpha1::{ + signal_service_server, LayoutRequest, LayoutResponse, OutputConnectRequest, + OutputConnectResponse, StreamControl, WindowPointerEnterRequest, WindowPointerEnterResponse, + WindowPointerLeaveRequest, WindowPointerLeaveResponse, +}; +use tokio::sync::mpsc::UnboundedSender; +use tonic::{Request, Response, Status, Streaming}; + +use crate::state::State; + +use super::{run_bidirectional_streaming, ResponseStream, StateFnSender}; + +#[derive(Debug, Default)] +pub struct SignalState { + pub output_connect: SignalData, + pub layout: SignalData, + pub window_pointer_enter: SignalData, + pub window_pointer_leave: SignalData, +} + +#[derive(Debug, Default)] +pub struct SignalData { + sender: Option>>, + ready: bool, + value: Option, +} + +impl SignalData { + pub fn signal(&mut self, with_data: impl FnOnce(Option) -> T) { + let Some(sender) = self.sender.as_ref() else { + return; + }; + + if self.ready { + sender + .send(Ok(with_data(self.value.take()))) + .expect("failed to send signal"); + self.ready = false; + } else { + self.value = Some(with_data(self.value.take())); + } + } + + pub fn connect(&mut self, sender: UnboundedSender>) { + self.sender.replace(sender); + } + + fn disconnect(&mut self) { + self.sender.take(); + self.ready = false; + self.value.take(); + } + + fn ready(&mut self) { + let Some(sender) = self.sender.as_ref() else { + return; + }; + + if let Some(value) = self.value.take() { + sender.send(Ok(value)).expect("failed to send signal"); + self.ready = false; + } else { + self.ready = true; + } + } +} + +trait SignalRequest { + fn control(&self) -> StreamControl; +} + +macro_rules! impl_signal_request { + ( $( $request:ident ),* ) => { + $( + impl SignalRequest for $request { + fn control(&self) -> StreamControl { + self.control() + } + } + )* + }; +} + +impl_signal_request!( + OutputConnectRequest, + LayoutRequest, + WindowPointerEnterRequest, + WindowPointerLeaveRequest +); + +fn start_signal_stream( + sender: StateFnSender, + in_stream: Streaming, + signal: impl Fn(&mut State) -> &mut SignalData + Clone + Send + 'static, +) -> Result>, Status> +where + I: Send + 'static, + O: Send + 'static, +{ + let signal_clone = signal.clone(); + + run_bidirectional_streaming( + sender, + in_stream, + move |state, request| { + let request = match request { + Ok(request) => request, + Err(status) => { + tracing::error!("Error in output_connect signal in stream: {status}"); + return; + } + }; + + let signal = signal(state); + match request.control() { + StreamControl::Ready => signal.ready(), + StreamControl::Disconnect => signal.disconnect(), + StreamControl::Unspecified => tracing::warn!("Received unspecified stream control"), + } + }, + move |state, sender| { + let signal = signal_clone(state); + signal.connect(sender); + }, + ) +} + +pub struct SignalService { + sender: StateFnSender, +} + +impl SignalService { + pub fn new(sender: StateFnSender) -> Self { + Self { sender } + } +} + +#[tonic::async_trait] +impl signal_service_server::SignalService for SignalService { + type OutputConnectStream = ResponseStream; + type LayoutStream = ResponseStream; + type WindowPointerEnterStream = ResponseStream; + type WindowPointerLeaveStream = ResponseStream; + + async fn output_connect( + &self, + request: Request>, + ) -> Result, Status> { + let in_stream = request.into_inner(); + + start_signal_stream(self.sender.clone(), in_stream, |state| { + &mut state.signal_state.output_connect + }) + } + + async fn layout( + &self, + request: Request>, + ) -> Result, Status> { + let in_stream = request.into_inner(); + + start_signal_stream(self.sender.clone(), in_stream, |state| { + &mut state.signal_state.layout + }) + } + + async fn window_pointer_enter( + &self, + request: Request>, + ) -> Result, Status> { + let in_stream = request.into_inner(); + + start_signal_stream(self.sender.clone(), in_stream, |state| { + &mut state.signal_state.window_pointer_enter + }) + } + + async fn window_pointer_leave( + &self, + request: Request>, + ) -> Result, Status> { + let in_stream = request.into_inner(); + + start_signal_stream(self.sender.clone(), in_stream, |state| { + &mut state.signal_state.window_pointer_leave + }) + } +} diff --git a/src/config.rs b/src/config.rs index 2aa7707..ebedc1c 100644 --- a/src/config.rs +++ b/src/config.rs @@ -1,6 +1,7 @@ use crate::{ api::{ - InputService, OutputService, PinnacleService, ProcessService, TagService, WindowService, + signal::SignalService, InputService, OutputService, PinnacleService, ProcessService, + TagService, WindowService, }, input::ModifierMask, output::OutputName, @@ -18,6 +19,7 @@ use pinnacle_api_defs::pinnacle::{ input::v0alpha1::input_service_server::InputServiceServer, output::v0alpha1::{output_service_server::OutputServiceServer, ConnectForAllResponse}, process::v0alpha1::process_service_server::ProcessServiceServer, + signal::v0alpha1::signal_service_server::SignalServiceServer, tag::v0alpha1::tag_service_server::TagServiceServer, v0alpha1::pinnacle_service_server::PinnacleServiceServer, window::v0alpha1::window_service_server::WindowServiceServer, @@ -447,6 +449,7 @@ impl State { let tag_service = TagService::new(grpc_sender.clone()); let output_service = OutputService::new(grpc_sender.clone()); let window_service = WindowService::new(grpc_sender.clone()); + let signal_service = SignalService::new(grpc_sender.clone()); let refl_service = tonic_reflection::server::Builder::configure() .register_encoded_file_descriptor_set(pinnacle_api_defs::FILE_DESCRIPTOR_SET) @@ -464,7 +467,8 @@ impl State { .add_service(ProcessServiceServer::new(process_service)) .add_service(TagServiceServer::new(tag_service)) .add_service(OutputServiceServer::new(output_service)) - .add_service(WindowServiceServer::new(window_service)); + .add_service(WindowServiceServer::new(window_service)) + .add_service(SignalServiceServer::new(signal_service)); match self.xdisplay.as_ref() { Some(_) => { diff --git a/src/state.rs b/src/state.rs index 345a2bb..1726011 100644 --- a/src/state.rs +++ b/src/state.rs @@ -1,7 +1,7 @@ // SPDX-License-Identifier: GPL-3.0-or-later use crate::{ - backend::Backend, config::Config, cursor::Cursor, focus::FocusState, + api::signal::SignalState, backend::Backend, config::Config, cursor::Cursor, focus::FocusState, grab::resize_grab::ResizeSurfaceState, window::WindowElement, }; use anyhow::Context; @@ -93,6 +93,8 @@ pub struct State { pub grpc_server_join_handle: Option>, pub xdg_base_dirs: BaseDirectories, + + pub signal_state: SignalState, } impl State { @@ -269,6 +271,8 @@ impl State { xdg_base_dirs: BaseDirectories::with_prefix("pinnacle") .context("couldn't create xdg BaseDirectories")?, + + signal_state: SignalState::default(), }; Ok(state) From 1539f73e459771a9cf2d10c5597ff52bfa4dbdaf Mon Sep 17 00:00:00 2001 From: Ottatop Date: Wed, 21 Feb 2024 19:40:11 -0600 Subject: [PATCH 02/11] Get a signal to work --- api/lua/pinnacle-api-dev-1.rockspec | 1 + api/lua/pinnacle/grpc/client.lua | 70 ++++++++--- api/lua/pinnacle/grpc/protobuf.lua | 22 ++++ api/lua/pinnacle/signal.lua | 177 ++++++++++++++++++++++++++++ api/lua/pinnacle/tag.lua | 5 + api/lua/pinnacle/window.lua | 2 + src/api.rs | 32 +++-- src/api/signal.rs | 23 +++- 8 files changed, 296 insertions(+), 36 deletions(-) create mode 100644 api/lua/pinnacle/signal.lua diff --git a/api/lua/pinnacle-api-dev-1.rockspec b/api/lua/pinnacle-api-dev-1.rockspec index 3a727d9..13e56f9 100644 --- a/api/lua/pinnacle-api-dev-1.rockspec +++ b/api/lua/pinnacle-api-dev-1.rockspec @@ -26,5 +26,6 @@ build = { ["pinnacle.tag"] = "pinnacle/tag.lua", ["pinnacle.window"] = "pinnacle/window.lua", ["pinnacle.util"] = "pinnacle/util.lua", + ["pinnacle.signal"] = "pinnacle/signal.lua", }, } diff --git a/api/lua/pinnacle/grpc/client.lua b/api/lua/pinnacle/grpc/client.lua index eb62da5..7cbaacd 100644 --- a/api/lua/pinnacle/grpc/client.lua +++ b/api/lua/pinnacle/grpc/client.lua @@ -5,6 +5,7 @@ local socket = require("cqueues.socket") local headers = require("http.headers") local h2_connection = require("http.h2_connection") +local protobuf = require("pinnacle.grpc.protobuf") local pb = require("pb") ---@nodoc @@ -40,6 +41,8 @@ end ---@class H2Connection ---@field new_stream function +---@class H2Stream + ---@nodoc ---@class Client ---@field conn H2Connection @@ -76,12 +79,7 @@ function client.unary_request(grpc_request_params) local response_type = grpc_request_params.response_type or "google.protobuf.Empty" local data = grpc_request_params.data - local encoded_protobuf = assert(pb.encode(request_type, data), "wrong table schema") - - local packed_prefix = string.pack("I1", 0) - local payload_len = string.pack(">I4", encoded_protobuf:len()) - - local body = packed_prefix .. payload_len .. encoded_protobuf + local body = protobuf.encode(request_type, data) stream:write_headers(create_request_headers(service, method), false) stream:write_chunk(body, true) @@ -126,18 +124,7 @@ function client.server_streaming_request(grpc_request_params, callback) local response_type = grpc_request_params.response_type or "google.protobuf.Empty" local data = grpc_request_params.data - local success, obj = pcall(pb.encode, request_type, data) - if not success then - print("failed to encode:", obj, "for", service, method, request_type, response_type) - os.exit(1) - end - - local encoded_protobuf = obj - - local packed_prefix = string.pack("I1", 0) - local payload_len = string.pack(">I4", encoded_protobuf:len()) - - local body = packed_prefix .. payload_len .. encoded_protobuf + local body = protobuf.encode(request_type, data) stream:write_headers(create_request_headers(service, method), false) stream:write_chunk(body, true) @@ -175,6 +162,51 @@ end ---@param callback fun(response: table) --- ---@return H2Stream -function client.bidirectional_streaming_request(grpc_request_params, callback) end +function client.bidirectional_streaming_request(grpc_request_params, callback) + local stream = client.conn:new_stream() + + local service = grpc_request_params.service + local method = grpc_request_params.method + local request_type = grpc_request_params.request_type + local response_type = grpc_request_params.response_type or "google.protobuf.Empty" + local data = grpc_request_params.data + + local body = protobuf.encode(request_type, data) + + stream:write_headers(create_request_headers(service, method), false) + stream:write_chunk(body, false) + + -- TODO: check response headers for errors + local _ = stream:get_headers() + + client.loop:wrap(function() + for response_body in stream:each_chunk() do + -- Skip the 1-byte compressed flag and the 4-byte message length + ---@diagnostic disable-next-line: redefined-local + local response_body = response_body:sub(6) + + ---@diagnostic disable-next-line: redefined-local + local success, obj = pcall(pb.decode, response_type, response_body) + if not success then + print(obj) + os.exit(1) + end + + local response = obj + callback(response) + end + + local trailers = stream:get_headers() + if trailers then + for name, value, never_index in trailers:each() do + print(name, value, never_index) + end + end + + print("AFTER bidirectional_streaming_request ENDS") + end) + + return stream +end return client diff --git a/api/lua/pinnacle/grpc/protobuf.lua b/api/lua/pinnacle/grpc/protobuf.lua index 187888d..7c2c359 100644 --- a/api/lua/pinnacle/grpc/protobuf.lua +++ b/api/lua/pinnacle/grpc/protobuf.lua @@ -17,6 +17,7 @@ function protobuf.build_protos() PINNACLE_PROTO_DIR .. "/pinnacle/output/" .. version .. "/output.proto", PINNACLE_PROTO_DIR .. "/pinnacle/process/" .. version .. "/process.proto", PINNACLE_PROTO_DIR .. "/pinnacle/window/" .. version .. "/window.proto", + PINNACLE_PROTO_DIR .. "/pinnacle/signal/" .. version .. "/signal.proto", } local cmd = "protoc --descriptor_set_out=/tmp/pinnacle.pb --proto_path=" .. PINNACLE_PROTO_DIR .. " " @@ -38,4 +39,25 @@ function protobuf.build_protos() pb.option("enum_as_value") end +---Encode the given `data` as the protobuf `type`. +---@param type string The absolute protobuf type +---@param data table The table of data, conforming to its protobuf definition +---@return string buffer The encoded buffer +function protobuf.encode(type, data) + local success, obj = pcall(pb.encode, type, data) + if not success then + print("failed to encode:", obj, "type:", type) + os.exit(1) + end + + local encoded_protobuf = obj + + local packed_prefix = string.pack("I1", 0) + local payload_len = string.pack(">I4", encoded_protobuf:len()) + + local body = packed_prefix .. payload_len .. encoded_protobuf + + return body +end + return protobuf diff --git a/api/lua/pinnacle/signal.lua b/api/lua/pinnacle/signal.lua new file mode 100644 index 0000000..4783064 --- /dev/null +++ b/api/lua/pinnacle/signal.lua @@ -0,0 +1,177 @@ +-- This Source Code Form is subject to the terms of the Mozilla Public +-- License, v. 2.0. If a copy of the MPL was not distributed with this +-- file, You can obtain one at https://mozilla.org/MPL/2.0/. + +local client = require("pinnacle.grpc.client") + +---The protobuf absolute path prefix +local prefix = "pinnacle.signal." .. client.version .. "." +local service = prefix .. "SignalService" + +---@type table +---@enum (key) SignalServiceMethod +local rpc_types = { + OutputConnect = { + response_type = "OutputConnectResponse", + }, + Layout = { + response_type = "LayoutResponse", + }, + WindowPointerEnter = { + response_type = "WindowPointerEnterResponse", + }, + WindowPointerLeave = { + response_type = "WindowPointerLeaveResponse", + }, +} + +---Build GrpcRequestParams +---@param method SignalServiceMethod +---@param data table +---@return GrpcRequestParams +local function build_grpc_request_params(method, data) + local req_type = rpc_types[method].request_type + local resp_type = rpc_types[method].response_type + + ---@type GrpcRequestParams + return { + service = service, + method = method, + request_type = req_type and prefix .. req_type or prefix .. method .. "Request", + response_type = resp_type and prefix .. resp_type, + data = data, + } +end + +local stream_control = { + UNSPECIFIED = 0, + READY = 1, + DISCONNECT = 2, +} + +local signals = { + output_connect = { + ---@type H2Stream? + sender = nil, + ---@type (fun(output: OutputHandle))[] + callbacks = {}, + }, + layout = { + ---@type H2Stream? + sender = nil, + ---@type (fun(windows: WindowHandle[], tag: TagHandle))[] + callbacks = {}, + }, + window_pointer_enter = { + ---@type H2Stream? + sender = nil, + ---@type (fun(output: OutputHandle))[] + callbacks = {}, + }, + window_pointer_leave = { + ---@type H2Stream? + sender = nil, + ---@type (fun(output: OutputHandle))[] + callbacks = {}, + }, +} + +---@class Signal +local signal = {} + +---@param fn fun(windows: WindowHandle[], tag: TagHandle) +function signal.layout_add(fn) + if #signals.layout.callbacks == 0 then + signal.layout_connect() + end + + table.insert(signals.layout.callbacks, fn) +end + +function signal.layout_dc() + signal.layout_disconnect() +end + +function signal.output_connect_connect() + local stream = client.bidirectional_streaming_request( + build_grpc_request_params("OutputConnect", { + control = stream_control.READY, + }), + function(response) + ---@diagnostic disable-next-line: invisible + local handle = require("pinnacle.output").handle.new(response.output_name) + for _, callback in ipairs(signals.output_connect.callbacks) do + callback(handle) + end + + local chunk = require("pinnacle.grpc.protobuf").encode(prefix .. "OutputConnectRequest", { + control = stream_control.READY, + }) + + if signals.layout.sender then + signals.layout.sender:write_chunk(chunk) + end + end + ) + + signals.output_connect.sender = stream +end + +function signal.output_connect_disconnect() + if signals.output_connect.sender then + local chunk = require("pinnacle.grpc.protobuf").encode(prefix .. "OutputConnectRequest", { + control = stream_control.DISCONNECT, + }) + + signals.output_connect.sender:write_chunk(chunk) + signals.output_connect.sender = nil + end +end + +function signal.layout_connect() + local stream = client.bidirectional_streaming_request( + build_grpc_request_params("Layout", { + control = stream_control.READY, + }), + function(response) + ---@diagnostic disable-next-line: invisible + local window_handles = require("pinnacle.window").handle.new_from_table(response.window_ids or {}) + ---@diagnostic disable-next-line: invisible + local tag_handle = require("pinnacle.tag").handle.new(response.tag_id) + + for _, callback in ipairs(signals.layout.callbacks) do + print("calling layout callback") + callback(window_handles, tag_handle) + end + + print("creating control request") + local chunk = require("pinnacle.grpc.protobuf").encode(prefix .. "LayoutRequest", { + control = stream_control.READY, + }) + + if signals.layout.sender then + local success, err = pcall(signals.layout.sender.write_chunk, signals.layout.sender, chunk) + if not success then + print("error sending to stream:", err) + os.exit(1) + end + end + end + ) + + signals.layout.sender = stream +end + +function signal.layout_disconnect() + if signals.layout.sender then + local chunk = require("pinnacle.grpc.protobuf").encode(prefix .. "LayoutRequest", { + control = stream_control.DISCONNECT, + }) + + signals.layout.sender:write_chunk(chunk) + signals.layout.sender = nil + end + signals.layout.callbacks = {} +end + +return signal diff --git a/api/lua/pinnacle/tag.lua b/api/lua/pinnacle/tag.lua index 7497389..e1b61d2 100644 --- a/api/lua/pinnacle/tag.lua +++ b/api/lua/pinnacle/tag.lua @@ -319,6 +319,11 @@ function tag.new_layout_cycler(layouts) } end +---@param fn fun(windows: WindowHandle[], tag: TagHandle) +function tag.connect_layout(fn) + require("pinnacle.signal").layout_add(fn) +end + ---Remove this tag. --- ---### Example diff --git a/api/lua/pinnacle/window.lua b/api/lua/pinnacle/window.lua index 7e287fd..7a7668c 100644 --- a/api/lua/pinnacle/window.lua +++ b/api/lua/pinnacle/window.lua @@ -69,7 +69,9 @@ local WindowHandle = {} ---This module helps you deal with setting windows to fullscreen and maximized, setting their size, ---moving them between tags, and various other actions. ---@class Window +---@field private handle WindowHandleModule local window = {} +window.handle = window_handle ---Get all windows. --- diff --git a/src/api.rs b/src/api.rs index 5e11e99..92887b2 100644 --- a/src/api.rs +++ b/src/api.rs @@ -46,6 +46,7 @@ use sysinfo::ProcessRefreshKind; use tokio::{ io::AsyncBufReadExt, sync::mpsc::{unbounded_channel, UnboundedSender}, + task::JoinHandle, }; use tokio_stream::{Stream, StreamExt}; use tonic::{Request, Response, Status, Streaming}; @@ -60,8 +61,6 @@ use crate::{ window::{window_state::WindowId, WindowElement}, }; -use self::signal::SignalData; - type ResponseStream = Pin> + Send>>; pub type StateFnSender = calloop::channel::Sender>; @@ -134,31 +133,33 @@ fn run_bidirectional_streaming( ) -> Result>, Status> where F1: Fn(&mut State, Result) + Clone + Send + 'static, - F2: FnOnce(&mut State, UnboundedSender>) + Send + 'static, + F2: FnOnce(&mut State, UnboundedSender>, JoinHandle<()>) + Send + 'static, I: Send + 'static, O: Send + 'static, { let (sender, receiver) = unbounded_channel::>(); - let with_out_stream = Box::new(|state: &mut State| { - with_out_stream(state, sender); - }); - - fn_sender - .send(with_out_stream) - .map_err(|_| Status::internal("failed to execute request"))?; + let fn_sender_clone = fn_sender.clone(); let with_in_stream = async move { while let Some(t) = in_stream.next().await { let with_client_item = with_client_item.clone(); // TODO: handle error - let _ = fn_sender.send(Box::new(move |state: &mut State| { + let _ = fn_sender_clone.send(Box::new(move |state: &mut State| { with_client_item(state, t); })); } }; - tokio::spawn(with_in_stream); + let join_handle = tokio::spawn(with_in_stream); + + let with_out_stream = Box::new(|state: &mut State| { + with_out_stream(state, sender, join_handle); + }); + + fn_sender + .send(with_out_stream) + .map_err(|_| Status::internal("failed to execute request"))?; let receiver_stream = tokio_stream::wrappers::UnboundedReceiverStream::new(receiver); Ok(Response::new(Box::pin(receiver_stream))) @@ -732,6 +733,13 @@ impl tag_service_server::TagService for TagService { state.update_windows(&output); state.update_focus(&output); state.schedule_render(&output); + + state.signal_state.layout.signal(|_| { + pinnacle_api_defs::pinnacle::signal::v0alpha1::LayoutResponse { + window_ids: vec![1, 2, 3], + tag_id: Some(1), + } + }); }) .await } diff --git a/src/api/signal.rs b/src/api/signal.rs index 5b10c74..452982c 100644 --- a/src/api/signal.rs +++ b/src/api/signal.rs @@ -3,7 +3,7 @@ use pinnacle_api_defs::pinnacle::signal::v0alpha1::{ OutputConnectResponse, StreamControl, WindowPointerEnterRequest, WindowPointerEnterResponse, WindowPointerLeaveRequest, WindowPointerLeaveResponse, }; -use tokio::sync::mpsc::UnboundedSender; +use tokio::{sync::mpsc::UnboundedSender, task::JoinHandle}; use tonic::{Request, Response, Status, Streaming}; use crate::state::State; @@ -21,6 +21,7 @@ pub struct SignalState { #[derive(Debug, Default)] pub struct SignalData { sender: Option>>, + join_handle: Option>, ready: bool, value: Option, } @@ -41,12 +42,22 @@ impl SignalData { } } - pub fn connect(&mut self, sender: UnboundedSender>) { + pub fn connect( + &mut self, + sender: UnboundedSender>, + join_handle: JoinHandle<()>, + ) { self.sender.replace(sender); + if let Some(handle) = self.join_handle.replace(join_handle) { + handle.abort(); + } } fn disconnect(&mut self) { self.sender.take(); + if let Some(handle) = self.join_handle.take() { + handle.abort(); + } self.ready = false; self.value.take(); } @@ -88,7 +99,7 @@ impl_signal_request!( WindowPointerLeaveRequest ); -fn start_signal_stream( +fn start_signal_stream( sender: StateFnSender, in_stream: Streaming, signal: impl Fn(&mut State) -> &mut SignalData + Clone + Send + 'static, @@ -111,6 +122,8 @@ where } }; + tracing::info!("GOT {request:?} FROM CLIENT STREAM"); + let signal = signal(state); match request.control() { StreamControl::Ready => signal.ready(), @@ -118,9 +131,9 @@ where StreamControl::Unspecified => tracing::warn!("Received unspecified stream control"), } }, - move |state, sender| { + move |state, sender, join_handle| { let signal = signal_clone(state); - signal.connect(sender); + signal.connect(sender, join_handle); }, ) } From 7d94fc43629a07aa50c630953812a88da8098dfd Mon Sep 17 00:00:00 2001 From: Ottatop Date: Wed, 21 Feb 2024 22:48:09 -0600 Subject: [PATCH 03/11] Add output connect signal untested --- api/lua/pinnacle-api-dev-1.rockspec | 1 + api/lua/pinnacle/grpc/client.lua | 4 +- api/lua/pinnacle/output.lua | 23 ++++ api/lua/pinnacle/signal.lua | 195 ++++++++++++++++++---------- api/lua/pinnacle/tag.lua | 22 +++- api/lua/pinnacle/tag/layout.lua | 4 + src/api.rs | 43 +++--- src/api/signal.rs | 83 ++++++++---- src/backend/udev.rs | 10 +- src/window.rs | 101 ++++++++------ src/window/window_state.rs | 10 +- 11 files changed, 323 insertions(+), 173 deletions(-) create mode 100644 api/lua/pinnacle/tag/layout.lua diff --git a/api/lua/pinnacle-api-dev-1.rockspec b/api/lua/pinnacle-api-dev-1.rockspec index 13e56f9..ef7d345 100644 --- a/api/lua/pinnacle-api-dev-1.rockspec +++ b/api/lua/pinnacle-api-dev-1.rockspec @@ -24,6 +24,7 @@ build = { ["pinnacle.output"] = "pinnacle/output.lua", ["pinnacle.process"] = "pinnacle/process.lua", ["pinnacle.tag"] = "pinnacle/tag.lua", + ["pinnacle.tag.layout"] = "pinnacle/tag/layout.lua", ["pinnacle.window"] = "pinnacle/window.lua", ["pinnacle.util"] = "pinnacle/util.lua", ["pinnacle.signal"] = "pinnacle/signal.lua", diff --git a/api/lua/pinnacle/grpc/client.lua b/api/lua/pinnacle/grpc/client.lua index 7cbaacd..5382fd7 100644 --- a/api/lua/pinnacle/grpc/client.lua +++ b/api/lua/pinnacle/grpc/client.lua @@ -42,6 +42,8 @@ end ---@field new_stream function ---@class H2Stream +---@field write_chunk function +---@field shutdown function ---@nodoc ---@class Client @@ -202,8 +204,6 @@ function client.bidirectional_streaming_request(grpc_request_params, callback) print(name, value, never_index) end end - - print("AFTER bidirectional_streaming_request ENDS") end) return stream diff --git a/api/lua/pinnacle/output.lua b/api/lua/pinnacle/output.lua index b35da69..ff0e90b 100644 --- a/api/lua/pinnacle/output.lua +++ b/api/lua/pinnacle/output.lua @@ -166,6 +166,29 @@ function output.connect_for_all(callback) end) end +---@class OutputSignal +---@field connect fun(output: OutputHandle)? + +---@param signals OutputSignal +---@return SignalHandles +function output.connect_signal(signals) + ---@diagnostic disable-next-line: invisible + local handles = require("pinnacle.signal").handles.new({}) + + for signal, callback in pairs(signals) do + if signal == "connect" then + require("pinnacle.signal").add_callback("OutputConnect", callback) + ---@diagnostic disable-next-line: invisible + local handle = require("pinnacle.signal").handle.new("OutputConnect", callback) + handles[signal] = handle + end + end + + return handles +end + +--------------------------------------------------------------------- + ---Set the location of this output in the global space. --- ---On startup, Pinnacle will lay out all connected outputs starting at (0, 0) diff --git a/api/lua/pinnacle/signal.lua b/api/lua/pinnacle/signal.lua index 4783064..4ed7110 100644 --- a/api/lua/pinnacle/signal.lua +++ b/api/lua/pinnacle/signal.lua @@ -49,108 +49,156 @@ local stream_control = { DISCONNECT = 2, } +---@type table local signals = { - output_connect = { + OutputConnect = { ---@type H2Stream? sender = nil, ---@type (fun(output: OutputHandle))[] callbacks = {}, + ---@type fun(response: table) + on_response = nil, }, - layout = { + Layout = { ---@type H2Stream? sender = nil, - ---@type (fun(windows: WindowHandle[], tag: TagHandle))[] + ---@type (fun(tag: TagHandle, windows: WindowHandle[]))[] callbacks = {}, + ---@type fun(response: table) + on_response = nil, }, - window_pointer_enter = { + WindowPointerEnter = { ---@type H2Stream? sender = nil, ---@type (fun(output: OutputHandle))[] callbacks = {}, + ---@type fun(response: table) + on_response = nil, }, - window_pointer_leave = { + WindowPointerLeave = { ---@type H2Stream? sender = nil, ---@type (fun(output: OutputHandle))[] callbacks = {}, + ---@type fun(response: table) + on_response = nil, }, } +signals.OutputConnect.on_response = function(response) + ---@diagnostic disable-next-line: invisible + local handle = require("pinnacle.output").handle.new(response.output_name) + for _, callback in ipairs(signals.OutputConnect.callbacks) do + callback(handle) + end +end + +signals.Layout.on_response = function(response) + ---@diagnostic disable-next-line: invisible + local window_handles = require("pinnacle.window").handle.new_from_table(response.window_ids or {}) + ---@diagnostic disable-next-line: invisible + local tag_handle = require("pinnacle.tag").handle.new(response.tag_id) + + for _, callback in ipairs(signals.Layout.callbacks) do + callback(tag_handle, window_handles) + end +end + +----------------------------------------------------------------------------- + +---@class SignalHandleModule +local signal_handle = {} + +---@class SignalHandle +---@field signal SignalServiceMethod +---@field callback function The callback you connected +local SignalHandle = {} + +---@class SignalHandlesModule +local signal_handles = {} + +---@class SignalHandles +local SignalHandles = {} + ---@class Signal +---@field private handle SignalHandleModule +---@field private handles SignalHandlesModule local signal = {} +signal.handle = signal_handle +signal.handles = signal_handles ----@param fn fun(windows: WindowHandle[], tag: TagHandle) -function signal.layout_add(fn) - if #signals.layout.callbacks == 0 then - signal.layout_connect() - end - - table.insert(signals.layout.callbacks, fn) -end - -function signal.layout_dc() - signal.layout_disconnect() -end - -function signal.output_connect_connect() - local stream = client.bidirectional_streaming_request( - build_grpc_request_params("OutputConnect", { - control = stream_control.READY, - }), - function(response) - ---@diagnostic disable-next-line: invisible - local handle = require("pinnacle.output").handle.new(response.output_name) - for _, callback in ipairs(signals.output_connect.callbacks) do - callback(handle) - end - - local chunk = require("pinnacle.grpc.protobuf").encode(prefix .. "OutputConnectRequest", { - control = stream_control.READY, - }) - - if signals.layout.sender then - signals.layout.sender:write_chunk(chunk) - end +function SignalHandle:disconnect() + local cb_index = nil + for i, cb in ipairs(signals[self.signal].callbacks) do + if cb == self.callback then + cb_index = i + break end - ) + end - signals.output_connect.sender = stream -end + if cb_index then + table.remove(signals[self.signal].callbacks, cb_index) + end -function signal.output_connect_disconnect() - if signals.output_connect.sender then - local chunk = require("pinnacle.grpc.protobuf").encode(prefix .. "OutputConnectRequest", { - control = stream_control.DISCONNECT, - }) - - signals.output_connect.sender:write_chunk(chunk) - signals.output_connect.sender = nil + if #signals[self.signal].callbacks == 0 then + signal.disconnect(self.signal) end end -function signal.layout_connect() +---@return SignalHandle +function signal_handle.new(request, callback) + ---@type SignalHandle + local self = { + signal = request, + callback = callback, + } + setmetatable(self, { __index = SignalHandle }) + return self +end + +---@param self table +function SignalHandles:disconnect_all() + for _, sig in pairs(self) do + sig:disconnect() + end +end + +---@param signal_hdls table +---@return SignalHandles +function signal_handles.new(signal_hdls) + ---@type SignalHandles + local self = signal_hdls + setmetatable(self, { __index = SignalHandles }) + return self +end + +---@param request SignalServiceMethod +---@param callback function +function signal.add_callback(request, callback) + if #signals[request].callbacks == 0 then + signal.connect(request, signals[request].on_response) + end + + table.insert(signals[request].callbacks, callback) +end + +---@param request SignalServiceMethod +---@param callback fun(response: table) +function signal.connect(request, callback) local stream = client.bidirectional_streaming_request( build_grpc_request_params("Layout", { control = stream_control.READY, }), function(response) - ---@diagnostic disable-next-line: invisible - local window_handles = require("pinnacle.window").handle.new_from_table(response.window_ids or {}) - ---@diagnostic disable-next-line: invisible - local tag_handle = require("pinnacle.tag").handle.new(response.tag_id) + callback(response) - for _, callback in ipairs(signals.layout.callbacks) do - print("calling layout callback") - callback(window_handles, tag_handle) - end + if signals[request].sender then + local chunk = require("pinnacle.grpc.protobuf").encode(prefix .. request .. "Request", { + control = stream_control.READY, + }) - print("creating control request") - local chunk = require("pinnacle.grpc.protobuf").encode(prefix .. "LayoutRequest", { - control = stream_control.READY, - }) + local success, err = pcall(signals[request].sender.write_chunk, signals[request].sender, chunk) - if signals.layout.sender then - local success, err = pcall(signals.layout.sender.write_chunk, signals.layout.sender, chunk) if not success then print("error sending to stream:", err) os.exit(1) @@ -159,19 +207,26 @@ function signal.layout_connect() end ) - signals.layout.sender = stream + signals[request].sender = stream end -function signal.layout_disconnect() - if signals.layout.sender then - local chunk = require("pinnacle.grpc.protobuf").encode(prefix .. "LayoutRequest", { +---This should only be called when call callbacks for the signal are removed +---@param request SignalServiceMethod +function signal.disconnect(request) + if signals[request].sender then + local chunk = require("pinnacle.grpc.protobuf").encode(prefix .. request .. "Request", { control = stream_control.DISCONNECT, }) - signals.layout.sender:write_chunk(chunk) - signals.layout.sender = nil + local success, err = pcall(signals[request].sender.write_chunk, signals[request].sender, chunk) + if not success then + print("error sending to stream:", err) + os.exit(1) + end + + signals[request].sender:shutdown() + signals[request].sender = nil end - signals.layout.callbacks = {} end return signal diff --git a/api/lua/pinnacle/tag.lua b/api/lua/pinnacle/tag.lua index e1b61d2..7e2fbd7 100644 --- a/api/lua/pinnacle/tag.lua +++ b/api/lua/pinnacle/tag.lua @@ -319,9 +319,25 @@ function tag.new_layout_cycler(layouts) } end ----@param fn fun(windows: WindowHandle[], tag: TagHandle) -function tag.connect_layout(fn) - require("pinnacle.signal").layout_add(fn) +---@class TagSignal +---@field layout fun(tag: TagHandle, windows: WindowHandle[])? + +---@param signals TagSignal +---@return SignalHandles +function tag.connect_signal(signals) + ---@diagnostic disable-next-line: invisible + local handles = require("pinnacle.signal").handles.new({}) + + for signal, callback in pairs(signals) do + if signal == "layout" then + require("pinnacle.signal").add_callback("Layout", callback) + ---@diagnostic disable-next-line: invisible + local handle = require("pinnacle.signal").handle.new("Layout", callback) + handles[signal] = handle + end + end + + return handles end ---Remove this tag. diff --git a/api/lua/pinnacle/tag/layout.lua b/api/lua/pinnacle/tag/layout.lua new file mode 100644 index 0000000..b184211 --- /dev/null +++ b/api/lua/pinnacle/tag/layout.lua @@ -0,0 +1,4 @@ +---@class LayoutModule +local layout = {} + +return layout diff --git a/src/api.rs b/src/api.rs index 92887b2..d91f610 100644 --- a/src/api.rs +++ b/src/api.rs @@ -128,7 +128,7 @@ where fn run_bidirectional_streaming( fn_sender: StateFnSender, mut in_stream: Streaming, - with_client_item: F1, + with_client_request: F1, with_out_stream: F2, ) -> Result>, Status> where @@ -142,11 +142,11 @@ where let fn_sender_clone = fn_sender.clone(); let with_in_stream = async move { - while let Some(t) = in_stream.next().await { - let with_client_item = with_client_item.clone(); + while let Some(request) = in_stream.next().await { + let with_client_request = with_client_request.clone(); // TODO: handle error let _ = fn_sender_clone.send(Box::new(move |state: &mut State| { - with_client_item(state, t); + with_client_request(state, request); })); } }; @@ -734,11 +734,13 @@ impl tag_service_server::TagService for TagService { state.update_focus(&output); state.schedule_render(&output); - state.signal_state.layout.signal(|_| { - pinnacle_api_defs::pinnacle::signal::v0alpha1::LayoutResponse { - window_ids: vec![1, 2, 3], - tag_id: Some(1), - } + state.signal_state.layout.signal(|buffer| { + buffer.push_back( + pinnacle_api_defs::pinnacle::signal::v0alpha1::LayoutResponse { + window_ids: vec![1, 2, 3], + tag_id: Some(1), + }, + ); }); }) .await @@ -1125,7 +1127,7 @@ impl window_service_server::WindowService for WindowService { async fn close(&self, request: Request) -> Result, Status> { let request = request.into_inner(); - let window_id = WindowId::Some( + let window_id = WindowId( request .window_id .ok_or_else(|| Status::invalid_argument("no window specified"))?, @@ -1154,7 +1156,7 @@ impl window_service_server::WindowService for WindowService { tracing::info!(request = ?request); - let window_id = WindowId::Some( + let window_id = WindowId( request .window_id .ok_or_else(|| Status::invalid_argument("no window specified"))?, @@ -1205,7 +1207,7 @@ impl window_service_server::WindowService for WindowService { ) -> Result, Status> { let request = request.into_inner(); - let window_id = WindowId::Some( + let window_id = WindowId( request .window_id .ok_or_else(|| Status::invalid_argument("no window specified"))?, @@ -1248,7 +1250,7 @@ impl window_service_server::WindowService for WindowService { ) -> Result, Status> { let request = request.into_inner(); - let window_id = WindowId::Some( + let window_id = WindowId( request .window_id .ok_or_else(|| Status::invalid_argument("no window specified"))?, @@ -1291,7 +1293,7 @@ impl window_service_server::WindowService for WindowService { ) -> Result, Status> { let request = request.into_inner(); - let window_id = WindowId::Some( + let window_id = WindowId( request .window_id .ok_or_else(|| Status::invalid_argument("no window specified"))?, @@ -1334,7 +1336,7 @@ impl window_service_server::WindowService for WindowService { ) -> Result, Status> { let request = request.into_inner(); - let window_id = WindowId::Some( + let window_id = WindowId( request .window_id .ok_or_else(|| Status::invalid_argument("no window specified"))?, @@ -1362,7 +1364,7 @@ impl window_service_server::WindowService for WindowService { async fn set_tag(&self, request: Request) -> Result, Status> { let request = request.into_inner(); - let window_id = WindowId::Some( + let window_id = WindowId( request .window_id .ok_or_else(|| Status::invalid_argument("no window specified"))?, @@ -1518,12 +1520,7 @@ impl window_service_server::WindowService for WindowService { let window_ids = state .windows .iter() - .map(|win| { - win.with_state(|state| match state.id { - WindowId::None => unreachable!(), - WindowId::Some(id) => id, - }) - }) + .map(|win| win.with_state(|state| state.id.0)) .collect::>(); window::v0alpha1::GetResponse { window_ids } @@ -1537,7 +1534,7 @@ impl window_service_server::WindowService for WindowService { ) -> Result, Status> { let request = request.into_inner(); - let window_id = WindowId::Some( + let window_id = WindowId( request .window_id .ok_or_else(|| Status::invalid_argument("no window specified"))?, diff --git a/src/api/signal.rs b/src/api/signal.rs index 452982c..0bfeaaa 100644 --- a/src/api/signal.rs +++ b/src/api/signal.rs @@ -1,3 +1,5 @@ +use std::collections::VecDeque; + use pinnacle_api_defs::pinnacle::signal::v0alpha1::{ signal_service_server, LayoutRequest, LayoutResponse, OutputConnectRequest, OutputConnectResponse, StreamControl, WindowPointerEnterRequest, WindowPointerEnterResponse, @@ -12,33 +14,61 @@ use super::{run_bidirectional_streaming, ResponseStream, StateFnSender}; #[derive(Debug, Default)] pub struct SignalState { - pub output_connect: SignalData, - pub layout: SignalData, - pub window_pointer_enter: SignalData, - pub window_pointer_leave: SignalData, + pub output_connect: SignalData>, + pub layout: SignalData>, + pub window_pointer_enter: + SignalData>, + pub window_pointer_leave: + SignalData>, } #[derive(Debug, Default)] -pub struct SignalData { +#[allow(private_bounds)] +pub struct SignalData> { sender: Option>>, join_handle: Option>, ready: bool, - value: Option, + buffer: B, } -impl SignalData { - pub fn signal(&mut self, with_data: impl FnOnce(Option) -> T) { +/// A trait that denotes different types of containers that can be used to buffer signals. +trait SignalBuffer: Default { + /// Get the next signal from this buffer. + fn next(&mut self) -> Option; +} + +impl SignalBuffer for VecDeque { + fn next(&mut self) -> Option { + self.pop_front() + } +} + +impl SignalBuffer for Option { + fn next(&mut self) -> Option { + self.take() + } +} + +#[allow(private_bounds)] +impl> SignalData { + /// Attempt to send a signal. + /// + /// If the client is ready to accept more of this signal, it will be sent immediately. + /// Otherwise, the signal will remain stored in the underlying buffer until the client is ready. + /// + /// Use `with_buffer` to populate and manipulate the buffer with the data you want. + pub fn signal(&mut self, with_buffer: impl FnOnce(&mut B)) { let Some(sender) = self.sender.as_ref() else { return; }; + with_buffer(&mut self.buffer); + if self.ready { - sender - .send(Ok(with_data(self.value.take()))) - .expect("failed to send signal"); - self.ready = false; - } else { - self.value = Some(with_data(self.value.take())); + if let Some(data) = self.buffer.next() { + sender.send(Ok(data)).expect("failed to send signal"); + self.ready = false; + } } } @@ -59,16 +89,19 @@ impl SignalData { handle.abort(); } self.ready = false; - self.value.take(); + self.buffer = B::default(); } + /// Mark this signal as ready to send. + /// + /// If there are signals already in the buffer, they will be sent. fn ready(&mut self) { let Some(sender) = self.sender.as_ref() else { return; }; - if let Some(value) = self.value.take() { - sender.send(Ok(value)).expect("failed to send signal"); + if let Some(data) = self.buffer.next() { + sender.send(Ok(data)).expect("failed to send signal"); self.ready = false; } else { self.ready = true; @@ -99,16 +132,18 @@ impl_signal_request!( WindowPointerLeaveRequest ); -fn start_signal_stream( +fn start_signal_stream( sender: StateFnSender, in_stream: Streaming, - signal: impl Fn(&mut State) -> &mut SignalData + Clone + Send + 'static, + with_signal_buffer: F, ) -> Result>, Status> where - I: Send + 'static, + I: SignalRequest + std::fmt::Debug + Send + 'static, O: Send + 'static, + B: SignalBuffer, + F: Fn(&mut State) -> &mut SignalData + Clone + Send + 'static, { - let signal_clone = signal.clone(); + let with_signal_buffer_clone = with_signal_buffer.clone(); run_bidirectional_streaming( sender, @@ -122,9 +157,9 @@ where } }; - tracing::info!("GOT {request:?} FROM CLIENT STREAM"); + tracing::debug!("Got {request:?} from client stream"); - let signal = signal(state); + let signal = with_signal_buffer(state); match request.control() { StreamControl::Ready => signal.ready(), StreamControl::Disconnect => signal.disconnect(), @@ -132,7 +167,7 @@ where } }, move |state, sender, join_handle| { - let signal = signal_clone(state); + let signal = with_signal_buffer_clone(state); signal.connect(sender, join_handle); }, ) diff --git a/src/backend/udev.rs b/src/backend/udev.rs index e3948cc..e4339ab 100644 --- a/src/backend/udev.rs +++ b/src/backend/udev.rs @@ -8,7 +8,9 @@ use std::{ }; use anyhow::Context; -use pinnacle_api_defs::pinnacle::output::v0alpha1::ConnectForAllResponse; +use pinnacle_api_defs::pinnacle::{ + output::v0alpha1::ConnectForAllResponse, signal::v0alpha1::OutputConnectResponse, +}; use smithay::{ backend::{ allocator::{ @@ -989,6 +991,12 @@ impl State { output_name: Some(output.name()), })); } + + self.signal_state.output_connect.signal(|buffer| { + buffer.push_back(OutputConnectResponse { + output_name: Some(output.name()), + }) + }); } } diff --git a/src/window.rs b/src/window.rs index 7cb43a9..c20f973 100644 --- a/src/window.rs +++ b/src/window.rs @@ -4,6 +4,9 @@ pub mod rules; use std::{cell::RefCell, time::Duration}; +use pinnacle_api_defs::pinnacle::signal::v0alpha1::{ + WindowPointerEnterResponse, WindowPointerLeaveResponse, +}; use smithay::{ backend::input::KeyState, desktop::{ @@ -330,33 +333,40 @@ impl WindowElement { } impl PointerTarget for WindowElement { - fn frame(&self, seat: &Seat, data: &mut State) { + fn frame(&self, seat: &Seat, state: &mut State) { match self { - WindowElement::Wayland(window) => window.frame(seat, data), + WindowElement::Wayland(window) => window.frame(seat, state), WindowElement::X11(surface) | WindowElement::X11OverrideRedirect(surface) => { - surface.frame(seat, data) + surface.frame(seat, state) } _ => unreachable!(), } } - fn enter(&self, seat: &Seat, data: &mut State, event: &MotionEvent) { + fn enter(&self, seat: &Seat, state: &mut State, event: &MotionEvent) { // TODO: ssd match self { - WindowElement::Wayland(window) => PointerTarget::enter(window, seat, data, event), + WindowElement::Wayland(window) => PointerTarget::enter(window, seat, state, event), WindowElement::X11(surface) | WindowElement::X11OverrideRedirect(surface) => { - PointerTarget::enter(surface, seat, data, event) + PointerTarget::enter(surface, seat, state, event) } _ => unreachable!(), } + + let window_id = Some(self.with_state(|state| state.id.0)); + + state + .signal_state + .window_pointer_enter + .signal(|buffer| buffer.push_back(WindowPointerEnterResponse { window_id })); } - fn motion(&self, seat: &Seat, data: &mut State, event: &MotionEvent) { + fn motion(&self, seat: &Seat, state: &mut State, event: &MotionEvent) { // TODO: ssd match self { - WindowElement::Wayland(window) => PointerTarget::motion(window, seat, data, event), + WindowElement::Wayland(window) => PointerTarget::motion(window, seat, state, event), WindowElement::X11(surface) | WindowElement::X11OverrideRedirect(surface) => { - PointerTarget::motion(surface, seat, data, event) + PointerTarget::motion(surface, seat, state, event) } _ => unreachable!(), } @@ -365,16 +375,16 @@ impl PointerTarget for WindowElement { fn relative_motion( &self, seat: &Seat, - data: &mut State, + state: &mut State, event: &smithay::input::pointer::RelativeMotionEvent, ) { // TODO: ssd match self { WindowElement::Wayland(window) => { - PointerTarget::relative_motion(window, seat, data, event); + PointerTarget::relative_motion(window, seat, state, event); } WindowElement::X11(surface) | WindowElement::X11OverrideRedirect(surface) => { - PointerTarget::relative_motion(surface, seat, data, event); + PointerTarget::relative_motion(surface, seat, state, event); } _ => unreachable!(), } @@ -383,47 +393,54 @@ impl PointerTarget for WindowElement { fn button( &self, seat: &Seat, - data: &mut State, + state: &mut State, event: &smithay::input::pointer::ButtonEvent, ) { // TODO: ssd match self { - WindowElement::Wayland(window) => PointerTarget::button(window, seat, data, event), + WindowElement::Wayland(window) => PointerTarget::button(window, seat, state, event), WindowElement::X11(surface) | WindowElement::X11OverrideRedirect(surface) => { - PointerTarget::button(surface, seat, data, event) + PointerTarget::button(surface, seat, state, event) } _ => unreachable!(), } } - fn axis(&self, seat: &Seat, data: &mut State, frame: AxisFrame) { + fn axis(&self, seat: &Seat, state: &mut State, frame: AxisFrame) { // TODO: ssd match self { - WindowElement::Wayland(window) => PointerTarget::axis(window, seat, data, frame), + WindowElement::Wayland(window) => PointerTarget::axis(window, seat, state, frame), WindowElement::X11(surface) | WindowElement::X11OverrideRedirect(surface) => { - PointerTarget::axis(surface, seat, data, frame) + PointerTarget::axis(surface, seat, state, frame) } _ => unreachable!(), } } - fn leave(&self, seat: &Seat, data: &mut State, serial: Serial, time: u32) { + fn leave(&self, seat: &Seat, state: &mut State, serial: Serial, time: u32) { // TODO: ssd match self { WindowElement::Wayland(window) => { - PointerTarget::leave(window, seat, data, serial, time); + PointerTarget::leave(window, seat, state, serial, time); } WindowElement::X11(surface) | WindowElement::X11OverrideRedirect(surface) => { - PointerTarget::leave(surface, seat, data, serial, time) + PointerTarget::leave(surface, seat, state, serial, time) } _ => unreachable!(), } + + let window_id = Some(self.with_state(|state| state.id.0)); + + state + .signal_state + .window_pointer_leave + .signal(|buffer| buffer.push_back(WindowPointerLeaveResponse { window_id })); } fn gesture_swipe_begin( &self, _seat: &Seat, - _data: &mut State, + _state: &mut State, _event: &smithay::input::pointer::GestureSwipeBeginEvent, ) { todo!() @@ -432,7 +449,7 @@ impl PointerTarget for WindowElement { fn gesture_swipe_update( &self, _seat: &Seat, - _data: &mut State, + _state: &mut State, _event: &smithay::input::pointer::GestureSwipeUpdateEvent, ) { todo!() @@ -441,7 +458,7 @@ impl PointerTarget for WindowElement { fn gesture_swipe_end( &self, _seat: &Seat, - _data: &mut State, + _state: &mut State, _event: &smithay::input::pointer::GestureSwipeEndEvent, ) { todo!() @@ -450,7 +467,7 @@ impl PointerTarget for WindowElement { fn gesture_pinch_begin( &self, _seat: &Seat, - _data: &mut State, + _state: &mut State, _event: &smithay::input::pointer::GesturePinchBeginEvent, ) { todo!() @@ -459,7 +476,7 @@ impl PointerTarget for WindowElement { fn gesture_pinch_update( &self, _seat: &Seat, - _data: &mut State, + _state: &mut State, _event: &smithay::input::pointer::GesturePinchUpdateEvent, ) { todo!() @@ -468,7 +485,7 @@ impl PointerTarget for WindowElement { fn gesture_pinch_end( &self, _seat: &Seat, - _data: &mut State, + _state: &mut State, _event: &smithay::input::pointer::GesturePinchEndEvent, ) { todo!() @@ -477,7 +494,7 @@ impl PointerTarget for WindowElement { fn gesture_hold_begin( &self, _seat: &Seat, - _data: &mut State, + _state: &mut State, _event: &smithay::input::pointer::GestureHoldBeginEvent, ) { todo!() @@ -486,7 +503,7 @@ impl PointerTarget for WindowElement { fn gesture_hold_end( &self, _seat: &Seat, - _data: &mut State, + _state: &mut State, _event: &smithay::input::pointer::GestureHoldEndEvent, ) { todo!() @@ -497,26 +514,26 @@ impl KeyboardTarget for WindowElement { fn enter( &self, seat: &Seat, - data: &mut State, + state: &mut State, keys: Vec>, serial: Serial, ) { match self { WindowElement::Wayland(window) => { - KeyboardTarget::enter(window, seat, data, keys, serial); + KeyboardTarget::enter(window, seat, state, keys, serial); } WindowElement::X11(surface) | WindowElement::X11OverrideRedirect(surface) => { - KeyboardTarget::enter(surface, seat, data, keys, serial) + KeyboardTarget::enter(surface, seat, state, keys, serial) } _ => unreachable!(), } } - fn leave(&self, seat: &Seat, data: &mut State, serial: Serial) { + fn leave(&self, seat: &Seat, state: &mut State, serial: Serial) { match self { - WindowElement::Wayland(window) => KeyboardTarget::leave(window, seat, data, serial), + WindowElement::Wayland(window) => KeyboardTarget::leave(window, seat, state, serial), WindowElement::X11(surface) | WindowElement::X11OverrideRedirect(surface) => { - KeyboardTarget::leave(surface, seat, data, serial) + KeyboardTarget::leave(surface, seat, state, serial) } _ => unreachable!(), } @@ -525,18 +542,18 @@ impl KeyboardTarget for WindowElement { fn key( &self, seat: &Seat, - data: &mut State, + state: &mut State, key: KeysymHandle<'_>, - state: KeyState, + key_state: KeyState, serial: Serial, time: u32, ) { match self { WindowElement::Wayland(window) => { - KeyboardTarget::key(window, seat, data, key, state, serial, time); + KeyboardTarget::key(window, seat, state, key, key_state, serial, time); } WindowElement::X11(surface) | WindowElement::X11OverrideRedirect(surface) => { - KeyboardTarget::key(surface, seat, data, key, state, serial, time); + KeyboardTarget::key(surface, seat, state, key, key_state, serial, time); } _ => unreachable!(), } @@ -545,16 +562,16 @@ impl KeyboardTarget for WindowElement { fn modifiers( &self, seat: &Seat, - data: &mut State, + state: &mut State, modifiers: ModifiersState, serial: Serial, ) { match self { WindowElement::Wayland(window) => { - KeyboardTarget::modifiers(window, seat, data, modifiers, serial); + KeyboardTarget::modifiers(window, seat, state, modifiers, serial); } WindowElement::X11(surface) | WindowElement::X11OverrideRedirect(surface) => { - KeyboardTarget::modifiers(surface, seat, data, modifiers, serial); + KeyboardTarget::modifiers(surface, seat, state, modifiers, serial); } _ => unreachable!(), } diff --git a/src/window/window_state.rs b/src/window/window_state.rs index 69a3f52..e9db349 100644 --- a/src/window/window_state.rs +++ b/src/window/window_state.rs @@ -17,20 +17,14 @@ use super::WindowElement; /// A unique identifier for each window. #[derive(Debug, Hash, Clone, Copy, PartialEq, Eq, serde::Serialize, serde::Deserialize)] -pub enum WindowId { - /// A config API returned an invalid window. It should be using this variant. - None, - /// A valid window id. - #[serde(untagged)] - Some(u32), -} +pub struct WindowId(pub u32); static WINDOW_ID_COUNTER: AtomicU32 = AtomicU32::new(0); impl WindowId { /// Get the next available window id. This always starts at 0. pub fn next() -> Self { - Self::Some(WINDOW_ID_COUNTER.fetch_add(1, Ordering::Relaxed)) + Self(WINDOW_ID_COUNTER.fetch_add(1, Ordering::Relaxed)) } /// Get the window that has this WindowId. From 6b2b7066ac82f5c1539449d36b474b911e01f55d Mon Sep 17 00:00:00 2001 From: Ottatop Date: Wed, 21 Feb 2024 23:30:28 -0600 Subject: [PATCH 04/11] Make `connect_for_all` use signals --- api/lua/pinnacle/output.lua | 8 +++----- api/lua/pinnacle/signal.lua | 2 +- .../pinnacle/output/v0alpha1/output.proto | 6 ------ .../pinnacle/signal/v0alpha1/signal.proto | 2 -- src/api.rs | 18 +----------------- src/backend/udev.rs | 11 +---------- src/config.rs | 6 ++---- 7 files changed, 8 insertions(+), 45 deletions(-) diff --git a/api/lua/pinnacle/output.lua b/api/lua/pinnacle/output.lua index ff0e90b..6e802b8 100644 --- a/api/lua/pinnacle/output.lua +++ b/api/lua/pinnacle/output.lua @@ -159,11 +159,9 @@ function output.connect_for_all(callback) callback(handle) end - client.server_streaming_request(build_grpc_request_params("ConnectForAll", {}), function(response) - local output_name = response.output_name - local handle = output_handle.new(output_name) - callback(handle) - end) + output.connect_signal({ + connect = callback, + }) end ---@class OutputSignal diff --git a/api/lua/pinnacle/signal.lua b/api/lua/pinnacle/signal.lua index 4ed7110..c876ae1 100644 --- a/api/lua/pinnacle/signal.lua +++ b/api/lua/pinnacle/signal.lua @@ -186,7 +186,7 @@ end ---@param callback fun(response: table) function signal.connect(request, callback) local stream = client.bidirectional_streaming_request( - build_grpc_request_params("Layout", { + build_grpc_request_params(request, { control = stream_control.READY, }), function(response) diff --git a/api/protocol/pinnacle/output/v0alpha1/output.proto b/api/protocol/pinnacle/output/v0alpha1/output.proto index 620551b..8f2bc49 100644 --- a/api/protocol/pinnacle/output/v0alpha1/output.proto +++ b/api/protocol/pinnacle/output/v0alpha1/output.proto @@ -10,11 +10,6 @@ message SetLocationRequest { optional int32 y = 3; } -message ConnectForAllRequest {} -message ConnectForAllResponse { - optional string output_name = 1; -} - message GetRequest {} message GetResponse { repeated string output_names = 1; @@ -41,7 +36,6 @@ message GetPropertiesResponse { service OutputService { rpc SetLocation(SetLocationRequest) returns (google.protobuf.Empty); - rpc ConnectForAll(ConnectForAllRequest) returns (stream ConnectForAllResponse); rpc Get(GetRequest) returns (GetResponse); rpc GetProperties(GetPropertiesRequest) returns (GetPropertiesResponse); } diff --git a/api/protocol/pinnacle/signal/v0alpha1/signal.proto b/api/protocol/pinnacle/signal/v0alpha1/signal.proto index c1da727..4a0150a 100644 --- a/api/protocol/pinnacle/signal/v0alpha1/signal.proto +++ b/api/protocol/pinnacle/signal/v0alpha1/signal.proto @@ -2,8 +2,6 @@ syntax = "proto2"; package pinnacle.signal.v0alpha1; -import "google/protobuf/empty.proto"; - enum StreamControl { STREAM_CONTROL_UNSPECIFIED = 0; // The client is ready to receive the next signal. diff --git a/src/api.rs b/src/api.rs index d91f610..7ed2142 100644 --- a/src/api.rs +++ b/src/api.rs @@ -12,9 +12,7 @@ use pinnacle_api_defs::pinnacle::{ }, output::{ self, - v0alpha1::{ - output_service_server, ConnectForAllRequest, ConnectForAllResponse, SetLocationRequest, - }, + v0alpha1::{output_service_server, SetLocationRequest}, }, process::v0alpha1::{process_service_server, SetEnvRequest, SpawnRequest, SpawnResponse}, tag::{ @@ -944,8 +942,6 @@ impl OutputService { #[tonic::async_trait] impl output_service_server::OutputService for OutputService { - type ConnectForAllStream = ResponseStream; - async fn set_location( &self, request: Request, @@ -997,18 +993,6 @@ impl output_service_server::OutputService for OutputService { .await } - // TODO: remove this and integrate it into a signal/event system - async fn connect_for_all( - &self, - _request: Request, - ) -> Result, Status> { - tracing::trace!("OutputService.connect_for_all"); - - run_server_streaming(&self.sender, move |state, sender| { - state.config.output_callback_senders.push(sender); - }) - } - async fn get( &self, _request: Request, diff --git a/src/backend/udev.rs b/src/backend/udev.rs index e4339ab..d014ab4 100644 --- a/src/backend/udev.rs +++ b/src/backend/udev.rs @@ -8,9 +8,7 @@ use std::{ }; use anyhow::Context; -use pinnacle_api_defs::pinnacle::{ - output::v0alpha1::ConnectForAllResponse, signal::v0alpha1::OutputConnectResponse, -}; +use pinnacle_api_defs::pinnacle::signal::v0alpha1::OutputConnectResponse; use smithay::{ backend::{ allocator::{ @@ -985,13 +983,6 @@ impl State { output.with_state(|state| state.tags = tags.clone()); } else { - // Run any output callbacks - for sender in self.config.output_callback_senders.iter() { - let _ = sender.send(Ok(ConnectForAllResponse { - output_name: Some(output.name()), - })); - } - self.signal_state.output_connect.signal(|buffer| { buffer.push_back(OutputConnectResponse { output_name: Some(output.name()), diff --git a/src/config.rs b/src/config.rs index ebedc1c..99a6a7d 100644 --- a/src/config.rs +++ b/src/config.rs @@ -17,7 +17,7 @@ use std::{ use anyhow::Context; use pinnacle_api_defs::pinnacle::{ input::v0alpha1::input_service_server::InputServiceServer, - output::v0alpha1::{output_service_server::OutputServiceServer, ConnectForAllResponse}, + output::v0alpha1::output_service_server::OutputServiceServer, process::v0alpha1::process_service_server::ProcessServiceServer, signal::v0alpha1::signal_service_server::SignalServiceServer, tag::v0alpha1::tag_service_server::TagServiceServer, @@ -30,7 +30,7 @@ use smithay::{ utils::{Logical, Point}, }; use sysinfo::ProcessRefreshKind; -use tokio::{sync::mpsc::UnboundedSender, task::JoinHandle}; +use tokio::task::JoinHandle; use toml::Table; use xdg::BaseDirectories; @@ -166,7 +166,6 @@ pub enum Key { pub struct Config { /// Window rules and conditions on when those rules should apply pub window_rules: Vec<(WindowRuleCondition, WindowRule)>, - pub output_callback_senders: Vec>>, /// Saved states when outputs are disconnected pub connector_saved_states: HashMap, @@ -177,7 +176,6 @@ pub struct Config { impl Config { pub fn clear(&mut self, loop_handle: &LoopHandle) { self.window_rules.clear(); - self.output_callback_senders.clear(); self.connector_saved_states.clear(); if let Some(join_handle) = self.config_join_handle.take() { join_handle.abort(); From d76777a633eae38a1e0a0eda78187266b744db56 Mon Sep 17 00:00:00 2001 From: Ottatop Date: Wed, 21 Feb 2024 23:56:19 -0600 Subject: [PATCH 05/11] Add window pointer enter and leave signals --- api/lua/pinnacle/output.lua | 14 ++++++++------ api/lua/pinnacle/signal.lua | 22 ++++++++++++++++++++-- api/lua/pinnacle/tag.lua | 14 ++++++++------ api/lua/pinnacle/window.lua | 27 +++++++++++++++++++++++++++ src/api/signal.rs | 9 +++++++++ src/config.rs | 2 ++ 6 files changed, 74 insertions(+), 14 deletions(-) diff --git a/api/lua/pinnacle/output.lua b/api/lua/pinnacle/output.lua index 6e802b8..2e58cf6 100644 --- a/api/lua/pinnacle/output.lua +++ b/api/lua/pinnacle/output.lua @@ -164,6 +164,10 @@ function output.connect_for_all(callback) }) end +local signal_name_to_SignalName = { + connect = "OutputConnect", +} + ---@class OutputSignal ---@field connect fun(output: OutputHandle)? @@ -174,12 +178,10 @@ function output.connect_signal(signals) local handles = require("pinnacle.signal").handles.new({}) for signal, callback in pairs(signals) do - if signal == "connect" then - require("pinnacle.signal").add_callback("OutputConnect", callback) - ---@diagnostic disable-next-line: invisible - local handle = require("pinnacle.signal").handle.new("OutputConnect", callback) - handles[signal] = handle - end + require("pinnacle.signal").add_callback(signal_name_to_SignalName[signal], callback) + ---@diagnostic disable-next-line: invisible + local handle = require("pinnacle.signal").handle.new(signal_name_to_SignalName[signal], callback) + handles[signal] = handle end return handles diff --git a/api/lua/pinnacle/signal.lua b/api/lua/pinnacle/signal.lua index c876ae1..e0985cb 100644 --- a/api/lua/pinnacle/signal.lua +++ b/api/lua/pinnacle/signal.lua @@ -70,7 +70,7 @@ local signals = { WindowPointerEnter = { ---@type H2Stream? sender = nil, - ---@type (fun(output: OutputHandle))[] + ---@type (fun(window: WindowHandle))[] callbacks = {}, ---@type fun(response: table) on_response = nil, @@ -78,7 +78,7 @@ local signals = { WindowPointerLeave = { ---@type H2Stream? sender = nil, - ---@type (fun(output: OutputHandle))[] + ---@type (fun(window: WindowHandle))[] callbacks = {}, ---@type fun(response: table) on_response = nil, @@ -104,6 +104,24 @@ signals.Layout.on_response = function(response) end end +signals.WindowPointerEnter.on_response = function(response) + ---@diagnostic disable-next-line: invisible + local window_handle = require("pinnacle.window").handle.new(response.window_id) + + for _, callback in ipairs(signals.WindowPointerEnter.callbacks) do + callback(window_handle) + end +end + +signals.WindowPointerLeave.on_response = function(response) + ---@diagnostic disable-next-line: invisible + local window_handle = require("pinnacle.window").handle.new(response.window_id) + + for _, callback in ipairs(signals.WindowPointerLeave.callbacks) do + callback(window_handle) + end +end + ----------------------------------------------------------------------------- ---@class SignalHandleModule diff --git a/api/lua/pinnacle/tag.lua b/api/lua/pinnacle/tag.lua index 7e2fbd7..fb4efbd 100644 --- a/api/lua/pinnacle/tag.lua +++ b/api/lua/pinnacle/tag.lua @@ -319,6 +319,10 @@ function tag.new_layout_cycler(layouts) } end +local signal_name_to_SignalName = { + layout = "Layout", +} + ---@class TagSignal ---@field layout fun(tag: TagHandle, windows: WindowHandle[])? @@ -329,12 +333,10 @@ function tag.connect_signal(signals) local handles = require("pinnacle.signal").handles.new({}) for signal, callback in pairs(signals) do - if signal == "layout" then - require("pinnacle.signal").add_callback("Layout", callback) - ---@diagnostic disable-next-line: invisible - local handle = require("pinnacle.signal").handle.new("Layout", callback) - handles[signal] = handle - end + require("pinnacle.signal").add_callback(signal_name_to_SignalName[signal], callback) + ---@diagnostic disable-next-line: invisible + local handle = require("pinnacle.signal").handle.new(signal_name_to_SignalName[signal], callback) + handles[signal] = handle end return handles diff --git a/api/lua/pinnacle/window.lua b/api/lua/pinnacle/window.lua index 7a7668c..266cf78 100644 --- a/api/lua/pinnacle/window.lua +++ b/api/lua/pinnacle/window.lua @@ -311,6 +311,33 @@ function window.add_window_rule(rule) })) end +local signal_name_to_SignalName = { + pointer_enter = "WindowPointerEnter", + pointer_leave = "WindowPointerLeave", +} + +---@class WindowSignal +---@field pointer_enter fun(window: WindowHandle)? +---@field pointer_leave fun(window: WindowHandle)? + +---@param signals WindowSignal +---@return SignalHandles +function window.connect_signal(signals) + ---@diagnostic disable-next-line: invisible + local handles = require("pinnacle.signal").handles.new({}) + + for signal, callback in pairs(signals) do + require("pinnacle.signal").add_callback(signal_name_to_SignalName[signal], callback) + ---@diagnostic disable-next-line: invisible + local handle = require("pinnacle.signal").handle.new(signal_name_to_SignalName[signal], callback) + handles[signal] = handle + end + + return handles +end + +------------------------------------------------------------------------ + ---Send a close request to this window. --- ---### Example diff --git a/src/api/signal.rs b/src/api/signal.rs index 0bfeaaa..46beaf8 100644 --- a/src/api/signal.rs +++ b/src/api/signal.rs @@ -22,6 +22,15 @@ pub struct SignalState { SignalData>, } +impl SignalState { + pub fn clear(&mut self) { + self.output_connect.disconnect(); + self.layout.disconnect(); + self.window_pointer_enter.disconnect(); + self.window_pointer_leave.disconnect(); + } +} + #[derive(Debug, Default)] #[allow(private_bounds)] pub struct SignalData> { diff --git a/src/config.rs b/src/config.rs index 99a6a7d..e3b1ce3 100644 --- a/src/config.rs +++ b/src/config.rs @@ -265,6 +265,8 @@ impl State { self.config.clear(&self.loop_handle); + self.signal_state.clear(); + // Because the grpc server is implemented to only start once, // any updates to `socket_dir` won't be applied until restart. if self.grpc_server_join_handle.is_none() { From 2e0a8b940aeedd9b4bc64471411d835f8561a279 Mon Sep 17 00:00:00 2001 From: Ottatop Date: Fri, 23 Feb 2024 16:24:43 -0600 Subject: [PATCH 06/11] ong I don't remember --- Cargo.lock | 1 + Cargo.toml | 3 +- api/rust/Cargo.toml | 1 + api/rust/examples/default_config/main.rs | 4 +- api/rust/src/input.rs | 7 +- api/rust/src/lib.rs | 32 +- api/rust/src/output.rs | 107 +++---- api/rust/src/process.rs | 5 +- api/rust/src/signal.rs | 372 +++++++++++++++++++++++ api/rust/src/tag.rs | 96 +++--- api/rust/src/window.rs | 52 ++-- pinnacle-api-defs/src/lib.rs | 30 ++ src/api/signal.rs | 27 +- 13 files changed, 564 insertions(+), 173 deletions(-) create mode 100644 api/rust/src/signal.rs diff --git a/Cargo.lock b/Cargo.lock index c9ca093..62a28f4 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1622,6 +1622,7 @@ dependencies = [ "pinnacle-api-defs", "pinnacle-api-macros", "tokio", + "tokio-stream", "tonic", "tower", "xkbcommon", diff --git a/Cargo.toml b/Cargo.toml index 28cbeb8..ebc15eb 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -8,6 +8,7 @@ repository = "https://github.com/pinnacle-comp/pinnacle/" [workspace.dependencies] tokio = { version = "1.36.0", features = ["macros", "rt-multi-thread"]} +tokio-stream = { version = "0.1.14", features = ["net"] } prost = "0.12.3" tonic = "0.11.0" @@ -61,7 +62,7 @@ tonic = { workspace = true } tonic-reflection = { workspace = true } tokio = { workspace = true, features = ["process", "io-util", "signal"] } -tokio-stream = { version = "0.1.14", features = ["net"] } +tokio-stream = { workspace = true } bitflags = "2.4.2" pinnacle-api-defs = { workspace = true } diff --git a/api/rust/Cargo.toml b/api/rust/Cargo.toml index f5baa05..f964845 100644 --- a/api/rust/Cargo.toml +++ b/api/rust/Cargo.toml @@ -13,6 +13,7 @@ categories = ["api-bindings", "config"] pinnacle-api-defs = { workspace = true } pinnacle-api-macros = { path = "./pinnacle-api-macros" } tokio = { workspace = true, features = ["net"] } +tokio-stream = { workspace = true } tonic = { workspace = true } tower = { version = "0.4.13", features = ["util"] } futures = "0.3.30" diff --git a/api/rust/examples/default_config/main.rs b/api/rust/examples/default_config/main.rs index 7462eb5..1980f49 100644 --- a/api/rust/examples/default_config/main.rs +++ b/api/rust/examples/default_config/main.rs @@ -83,10 +83,10 @@ async fn main() { // Setup all monitors with tags "1" through "5" output.connect_for_all(move |op| { - let mut tags = tag.add(&op, tag_names); + let tags = tag.add(op, tag_names); // Be sure to set a tag to active or windows won't display - tags.next().unwrap().set_active(true); + tags.first().unwrap().set_active(true); }); process.spawn_once([terminal]); diff --git a/api/rust/src/input.rs b/api/rust/src/input.rs index 2605a36..43a784a 100644 --- a/api/rust/src/input.rs +++ b/api/rust/src/input.rs @@ -8,7 +8,7 @@ //! methods for setting key- and mousebinds, changing xkeyboard settings, and more. //! View the struct's documentation for more information. -use futures::{channel::mpsc::UnboundedSender, future::BoxFuture, FutureExt, StreamExt}; +use futures::{future::BoxFuture, FutureExt, StreamExt}; use num_enum::TryFromPrimitive; use pinnacle_api_defs::pinnacle::input::{ self, @@ -19,6 +19,7 @@ use pinnacle_api_defs::pinnacle::input::{ SetXkbConfigRequest, }, }; +use tokio::sync::mpsc::UnboundedSender; use tonic::transport::Channel; use xkbcommon::xkb::Keysym; @@ -162,7 +163,7 @@ impl Input { let modifiers = mods.into_iter().map(|modif| modif as i32).collect(); self.fut_sender - .unbounded_send( + .send( async move { let mut stream = client .set_keybind(SetKeybindRequest { @@ -218,7 +219,7 @@ impl Input { let modifiers = mods.into_iter().map(|modif| modif as i32).collect(); self.fut_sender - .unbounded_send( + .send( async move { let mut stream = client .set_mousebind(SetMousebindRequest { diff --git a/api/rust/src/lib.rs b/api/rust/src/lib.rs index 68a2edf..f9a1b66 100644 --- a/api/rust/src/lib.rs +++ b/api/rust/src/lib.rs @@ -81,15 +81,18 @@ use std::sync::OnceLock; -use futures::{ - channel::mpsc::UnboundedReceiver, future::BoxFuture, stream::FuturesUnordered, Future, - StreamExt, -}; +use futures::{future::BoxFuture, stream::FuturesUnordered, Future, StreamExt}; use input::Input; use output::Output; use pinnacle::Pinnacle; use process::Process; +use signal::SignalState; use tag::Tag; +use tokio::sync::{ + mpsc::{unbounded_channel, UnboundedReceiver}, + RwLock, +}; +use tokio_stream::wrappers::UnboundedReceiverStream; use tonic::transport::{Endpoint, Uri}; use tower::service_fn; use window::Window; @@ -98,6 +101,7 @@ pub mod input; pub mod output; pub mod pinnacle; pub mod process; +pub mod signal; pub mod tag; pub mod util; pub mod window; @@ -112,6 +116,7 @@ static WINDOW: OnceLock = OnceLock::new(); static INPUT: OnceLock = OnceLock::new(); static OUTPUT: OnceLock = OnceLock::new(); static TAG: OnceLock = OnceLock::new(); +static SIGNAL: OnceLock> = OnceLock::new(); /// A struct containing static references to all of the configuration structs. #[derive(Debug, Clone, Copy)] @@ -145,16 +150,21 @@ pub async fn connect( })) .await?; - let (fut_sender, fut_recv) = futures::channel::mpsc::unbounded::>(); - - let output = Output::new(channel.clone(), fut_sender.clone()); + let (fut_sender, fut_recv) = unbounded_channel::>(); let pinnacle = PINNACLE.get_or_init(|| Pinnacle::new(channel.clone())); let process = PROCESS.get_or_init(|| Process::new(channel.clone(), fut_sender.clone())); let window = WINDOW.get_or_init(|| Window::new(channel.clone())); let input = INPUT.get_or_init(|| Input::new(channel.clone(), fut_sender.clone())); - let tag = TAG.get_or_init(|| Tag::new(channel.clone(), fut_sender.clone())); - let output = OUTPUT.get_or_init(|| output); + let tag = TAG.get_or_init(|| Tag::new(channel.clone())); + let output = OUTPUT.get_or_init(|| Output::new(channel.clone())); + + SIGNAL + .set(RwLock::new(SignalState::new( + channel.clone(), + fut_sender.clone(), + ))) + .map_err(|_| "failed to create SIGNAL")?; let modules = ApiModules { pinnacle, @@ -176,10 +186,12 @@ pub async fn connect( /// This function is inserted at the end of your config through the [`config`] macro. /// You should use the macro instead of this function directly. pub async fn listen(fut_recv: UnboundedReceiver>) { + let fut_recv = UnboundedReceiverStream::new(fut_recv); + let mut future_set = FuturesUnordered::< BoxFuture<( Option>, - Option>>, + Option>>, )>, >::new(); diff --git a/api/rust/src/output.rs b/api/rust/src/output.rs index d8b27e3..8d7bad8 100644 --- a/api/rust/src/output.rs +++ b/api/rust/src/output.rs @@ -9,39 +9,40 @@ //! This module provides [`Output`], which allows you to get [`OutputHandle`]s for different //! connected monitors and set them up. -use futures::{channel::mpsc::UnboundedSender, future::BoxFuture, FutureExt, StreamExt}; -use pinnacle_api_defs::pinnacle::{ - output::{ - self, - v0alpha1::{ - output_service_client::OutputServiceClient, ConnectForAllRequest, SetLocationRequest, - }, - }, - tag::v0alpha1::tag_service_client::TagServiceClient, +use futures::FutureExt; +use pinnacle_api_defs::pinnacle::output::{ + self, + v0alpha1::{output_service_client::OutputServiceClient, SetLocationRequest}, }; use tonic::transport::Channel; -use crate::{block_on_tokio, tag::TagHandle, util::Batch}; +use crate::{ + block_on_tokio, + signal::{OutputSignal, SignalHandle}, + tag::TagHandle, + util::Batch, + SIGNAL, TAG, +}; /// A struct that allows you to get handles to connected outputs and set them up. /// /// See [`OutputHandle`] for more information. #[derive(Debug, Clone)] pub struct Output { - fut_sender: UnboundedSender>, output_client: OutputServiceClient, - tag_client: TagServiceClient, } impl Output { - pub(crate) fn new( - channel: Channel, - fut_sender: UnboundedSender>, - ) -> Self { + pub(crate) fn new(channel: Channel) -> Self { Self { output_client: OutputServiceClient::new(channel.clone()), - tag_client: TagServiceClient::new(channel), - fut_sender, + } + } + + pub(crate) fn new_handle(&self, name: impl Into) -> OutputHandle { + OutputHandle { + name: name.into(), + output_client: self.output_client.clone(), } } @@ -52,14 +53,14 @@ impl Output { /// ``` /// let outputs = output.get_all(); /// ``` - pub fn get_all(&self) -> impl Iterator { + pub fn get_all(&self) -> Vec { block_on_tokio(self.get_all_async()) } /// The async version of [`Output::get_all`]. - pub async fn get_all_async(&self) -> impl Iterator { + pub async fn get_all_async(&self) -> Vec { let mut client = self.output_client.clone(); - let tag_client = self.tag_client.clone(); + client .get(output::v0alpha1::GetRequest {}) .await @@ -67,11 +68,8 @@ impl Output { .into_inner() .output_names .into_iter() - .map(move |name| OutputHandle { - output_client: client.clone(), - tag_client: tag_client.clone(), - name, - }) + .map(move |name| self.new_handle(name)) + .collect() } /// Get a handle to the output with the given name. @@ -93,6 +91,7 @@ impl Output { let name: String = name.into(); self.get_all_async() .await + .into_iter() .find(|output| output.name == name) } @@ -107,6 +106,7 @@ impl Output { /// ``` pub fn get_focused(&self) -> Option { self.get_all() + .into_iter() .find(|output| matches!(output.props().focused, Some(true))) } @@ -137,40 +137,26 @@ impl Output { /// tags.next().unwrap().set_active(true); /// }); /// ``` - pub fn connect_for_all(&self, mut for_all: impl FnMut(OutputHandle) + Send + 'static) { + pub fn connect_for_all(&self, mut for_all: impl FnMut(&OutputHandle) + Send + 'static) { for output in self.get_all() { - for_all(output); + for_all(&output); } - let mut client = self.output_client.clone(); - let tag_client = self.tag_client.clone(); + let mut signal_state = block_on_tokio(SIGNAL.get().expect("SIGNAL doesn't exist").write()); + signal_state.output_connect.add_callback(Box::new(for_all)); + } - self.fut_sender - .unbounded_send( - async move { - let mut stream = client - .connect_for_all(ConnectForAllRequest {}) - .await - .unwrap() - .into_inner(); + /// Connect to an output signal. + /// + /// The compositor will fire off signals that your config can listen for and act upon. + /// You can pass in an [`OutputSignal`] along with a callback and it will get run + /// with the necessary arguments every time a signal of that type is received. + pub fn connect_signal(&self, signal: OutputSignal) -> SignalHandle { + let mut signal_state = block_on_tokio(SIGNAL.get().expect("SIGNAL doesn't exist").write()); - while let Some(Ok(response)) = stream.next().await { - let Some(output_name) = response.output_name else { - continue; - }; - - let output = OutputHandle { - output_client: client.clone(), - tag_client: tag_client.clone(), - name: output_name, - }; - - for_all(output); - } - } - .boxed(), - ) - .unwrap(); + match signal { + OutputSignal::Connect(f) => signal_state.output_connect.add_callback(f), + } } } @@ -179,9 +165,8 @@ impl Output { /// This allows you to manipulate outputs and get their properties. #[derive(Clone, Debug)] pub struct OutputHandle { - pub(crate) output_client: OutputServiceClient, - pub(crate) tag_client: TagServiceClient, pub(crate) name: String, + output_client: OutputServiceClient, } impl PartialEq for OutputHandle { @@ -407,6 +392,8 @@ impl OutputHandle { .unwrap() .into_inner(); + let tag = TAG.get().expect("TAG doesn't exist"); + OutputProperties { make: response.make, model: response.model, @@ -421,11 +408,7 @@ impl OutputHandle { tags: response .tag_ids .into_iter() - .map(|id| TagHandle { - tag_client: self.tag_client.clone(), - output_client: self.output_client.clone(), - id, - }) + .map(|id| tag.new_handle(id)) .collect(), } } diff --git a/api/rust/src/process.rs b/api/rust/src/process.rs index c93a063..3192aaf 100644 --- a/api/rust/src/process.rs +++ b/api/rust/src/process.rs @@ -7,10 +7,11 @@ //! This module provides [`Process`], which allows you to spawn processes and set environment //! variables. -use futures::{channel::mpsc::UnboundedSender, future::BoxFuture, FutureExt, StreamExt}; +use futures::{future::BoxFuture, FutureExt, StreamExt}; use pinnacle_api_defs::pinnacle::process::v0alpha1::{ process_service_client::ProcessServiceClient, SetEnvRequest, SpawnRequest, }; +use tokio::sync::mpsc::UnboundedSender; use tonic::transport::Channel; use crate::block_on_tokio; @@ -133,7 +134,7 @@ impl Process { }; self.fut_sender - .unbounded_send( + .send( async move { let mut stream = client.spawn(request).await.unwrap().into_inner(); let Some(mut callbacks) = callbacks else { return }; diff --git a/api/rust/src/signal.rs b/api/rust/src/signal.rs new file mode 100644 index 0000000..24e7630 --- /dev/null +++ b/api/rust/src/signal.rs @@ -0,0 +1,372 @@ +//! Signals TODO: + +use std::{ + collections::{btree_map, BTreeMap}, + sync::{ + atomic::{AtomicU32, Ordering}, + Arc, + }, +}; + +use futures::{future::BoxFuture, pin_mut, FutureExt}; +use pinnacle_api_defs::pinnacle::signal::v0alpha1::{ + signal_service_client::SignalServiceClient, SignalRequest, StreamControl, +}; +use tokio::sync::{ + mpsc::{unbounded_channel, UnboundedSender}, + oneshot, +}; +use tokio_stream::{wrappers::UnboundedReceiverStream, StreamExt}; +use tonic::{transport::Channel, Streaming}; + +use crate::{ + block_on_tokio, output::OutputHandle, tag::TagHandle, window::WindowHandle, OUTPUT, TAG, WINDOW, +}; + +pub(crate) trait Signal { + type Callback; +} + +macro_rules! signals { + ( $( + $( #[$cfg_enum:meta] )* $enum:ident => { + $( + $( #[$cfg:meta] )* $name:ident = { + enum_name = $renamed:ident, + callback_type = $cb:ty, + client_request = $req:ident, + on_response = $on_resp:expr, + } + )* + } + )* ) => {$( + $( + $( #[$cfg] )* + pub(crate) struct $name; + + impl $crate::signal::Signal for $name { + type Callback = $cb; + } + + impl SignalData<$name> { + pub(crate) fn add_callback(&mut self, callback: <$name as Signal>::Callback) -> SignalHandle { + if self.callback_count.load(::std::sync::atomic::Ordering::SeqCst) == 0 { + self.connect() + } + + let Some(callback_sender) = self.callback_sender.as_ref() else { + unreachable!("signal should already be connected here"); + }; + + let Some(remove_callback_sender) = self.remove_callback_sender.clone() else { + unreachable!("signal should already be connected here"); + }; + + callback_sender + .send((self.current_id, callback)) + .expect("failed to send callback"); + + let handle = SignalHandle::new(self.current_id, remove_callback_sender); + + self.current_id.0 += 1; + + handle + } + + fn reset(&mut self) { + self.callback_sender.take(); + self.dc_pinger.take(); + self.remove_callback_sender.take(); + self.callback_count.store(0, Ordering::SeqCst); + self.current_id = SignalConnId::default(); + } + + fn connect(&mut self) { + self.reset(); + + let channels = connect_signal::<_, _, <$name as Signal>::Callback, _, _>( + &self.fut_sender, + self.callback_count.clone(), + |out| { + block_on_tokio(self.client.$req(out)) + .expect("TODO") + .into_inner() + }, + $on_resp, + ); + + self.callback_sender.replace(channels.callback_sender); + self.dc_pinger.replace(channels.dc_pinger); + self.remove_callback_sender + .replace(channels.remove_callback_sender); + } + } + )* + + $( #[$cfg_enum] )* + pub enum $enum { + $( $( #[$cfg] )* $renamed($cb),)* + } + )*}; +} + +signals! { + /// Signals relating to tag events. + TagSignal => { + /// The compositor requested that the given windows be laid out. + Layout = { + enum_name = Layout, + callback_type = LayoutFn, + client_request = layout, + on_response = |response, callbacks| { + if let Some(tag_id) = response.tag_id { + let tag = TAG.get().expect("TAG doesn't exist"); + let window = WINDOW.get().expect("WINDOW doesn't exist"); + let tag = tag.new_handle(tag_id); + + let windows = response + .window_ids + .into_iter() + .map(|id| window.new_handle(id)) + .collect::>(); + + for callback in callbacks { + callback(&tag, windows.as_slice()); + } + } + }, + } + } + /// Signals relating to output events. + OutputSignal => { + /// An output was connected. + OutputConnect = { + enum_name = Connect, + callback_type = SingleOutputFn, + client_request = output_connect, + on_response = |response, callbacks| { + if let Some(output_name) = response.output_name { + let output = OUTPUT.get().expect("OUTPUT doesn't exist"); + let handle = output.new_handle(output_name); + + for callback in callbacks { + callback(&handle); + } + } + }, + } + } + /// Signals relating to window events. + WindowSignal => { + /// The pointer entered a window. + WindowPointerEnter = { + enum_name = PointerEnter, + callback_type = SingleWindowFn, + client_request = window_pointer_enter, + on_response = |response, callbacks| { + if let Some(window_id) = response.window_id { + let window = WINDOW.get().expect("WINDOW doesn't exist"); + let handle = window.new_handle(window_id); + + for callback in callbacks { + callback(&handle); + } + } + }, + } + /// The pointer left a window. + WindowPointerLeave = { + enum_name = PointerLeave, + callback_type = SingleWindowFn, + client_request = window_pointer_leave, + on_response = |response, callbacks| { + if let Some(window_id) = response.window_id { + let window = WINDOW.get().expect("WINDOW doesn't exist"); + let handle = window.new_handle(window_id); + + for callback in callbacks { + callback(&handle); + } + } + }, + } + } +} + +pub(crate) type LayoutFn = Box; +pub(crate) type SingleOutputFn = Box; +pub(crate) type SingleWindowFn = Box; + +pub(crate) struct SignalState { + pub(crate) layout: SignalData, + pub(crate) output_connect: SignalData, + pub(crate) window_pointer_enter: SignalData, + pub(crate) window_pointer_leave: SignalData, +} + +impl SignalState { + pub(crate) fn new( + channel: Channel, + fut_sender: UnboundedSender>, + ) -> Self { + let client = SignalServiceClient::new(channel); + Self { + layout: SignalData::new(client.clone(), fut_sender.clone()), + output_connect: SignalData::new(client.clone(), fut_sender.clone()), + window_pointer_enter: SignalData::new(client.clone(), fut_sender.clone()), + window_pointer_leave: SignalData::new(client.clone(), fut_sender.clone()), + } + } +} + +#[derive(Default, Clone, Copy, Debug, Hash, PartialEq, Eq, PartialOrd, Ord)] +pub(crate) struct SignalConnId(pub(crate) u32); + +pub(crate) struct SignalData { + client: SignalServiceClient, + fut_sender: UnboundedSender>, + callback_sender: Option>, + remove_callback_sender: Option>, + dc_pinger: Option>, + current_id: SignalConnId, + callback_count: Arc, +} + +impl SignalData { + fn new( + client: SignalServiceClient, + fut_sender: UnboundedSender>, + ) -> Self { + Self { + client, + fut_sender, + callback_sender: Default::default(), + remove_callback_sender: Default::default(), + dc_pinger: Default::default(), + current_id: Default::default(), + callback_count: Default::default(), + } + } +} + +struct ConnectSignalChannels { + callback_sender: UnboundedSender<(SignalConnId, F)>, + dc_pinger: oneshot::Sender<()>, + remove_callback_sender: UnboundedSender, +} + +fn connect_signal( + fut_sender: &UnboundedSender>, + callback_count: Arc, + to_in_stream: T, + mut on_response: O, +) -> ConnectSignalChannels +where + Req: SignalRequest + Send + 'static, + Resp: 'static, + F: Send + 'static, + T: FnOnce(UnboundedReceiverStream) -> Streaming, + O: FnMut(Resp, btree_map::ValuesMut<'_, SignalConnId, F>) + Send + 'static, +{ + let (control_sender, recv) = unbounded_channel::(); + let out_stream = UnboundedReceiverStream::new(recv); + + let mut in_stream = to_in_stream(out_stream); + + let (callback_sender, mut callback_recv) = unbounded_channel::<(SignalConnId, F)>(); + let (remove_callback_sender, mut remove_callback_recv) = unbounded_channel::(); + let (dc_pinger, mut dc_ping_recv) = oneshot::channel::<()>(); + + let signal_future = async move { + let mut callbacks = BTreeMap::::new(); + + control_sender + .send(Req::from_control(StreamControl::Ready)) + .expect("send failed"); + + loop { + let in_stream_next = in_stream.next().fuse(); + pin_mut!(in_stream_next); + let callback_recv_recv = callback_recv.recv().fuse(); + pin_mut!(callback_recv_recv); + let remove_callback_recv_recv = remove_callback_recv.recv().fuse(); + pin_mut!(remove_callback_recv_recv); + let mut dc_ping_recv_fuse = (&mut dc_ping_recv).fuse(); + + futures::select! { + response = in_stream_next => { + let Some(response) = response else { continue }; + + match response { + Ok(response) => { + on_response(response, callbacks.values_mut()); + + control_sender + .send(Req::from_control(StreamControl::Ready)) + .expect("send failed"); + } + Err(status) => eprintln!("Error in recv: {status}"), + } + } + callback = callback_recv_recv => { + if let Some((id, callback)) = callback { + callbacks.insert(id, callback); + callback_count.fetch_add(1, Ordering::SeqCst); + } + } + remove = remove_callback_recv_recv => { + if let Some(id) = remove { + if callbacks.remove(&id).is_some() { + assert!(callback_count.fetch_sub(1, Ordering::SeqCst) > 0); + } + if callbacks.is_empty() { + assert!(callback_count.load(Ordering::SeqCst) == 0); + control_sender.send(Req::from_control(StreamControl::Disconnect)).expect("send failed"); + break; + } + } + } + _dc = dc_ping_recv_fuse => { + println!("dc"); + control_sender.send(Req::from_control(StreamControl::Disconnect)).expect("send failed"); + break; + } + } + } + }; + + fut_sender.send(signal_future.boxed()).expect("send failed"); + + ConnectSignalChannels { + callback_sender, + dc_pinger, + remove_callback_sender, + } +} + +/// A handle that can be used to disconnect from a signal connection. +/// +/// This will remove the connected callback. +pub struct SignalHandle { + id: SignalConnId, + remove_callback_sender: UnboundedSender, +} + +impl SignalHandle { + pub(crate) fn new( + id: SignalConnId, + remove_callback_sender: UnboundedSender, + ) -> Self { + Self { + id, + remove_callback_sender, + } + } + + /// Disconnect this signal connection. + pub fn disconnect(self) { + self.remove_callback_sender + .send(self.id) + .expect("failed to disconnect signal"); + } +} diff --git a/api/rust/src/tag.rs b/api/rust/src/tag.rs index e760896..7aa9806 100644 --- a/api/rust/src/tag.rs +++ b/api/rust/src/tag.rs @@ -34,45 +34,42 @@ use std::{ sync::{Arc, Mutex}, }; -use futures::{channel::mpsc::UnboundedSender, future::BoxFuture, FutureExt}; +use futures::FutureExt; use num_enum::TryFromPrimitive; -use pinnacle_api_defs::pinnacle::{ - output::v0alpha1::output_service_client::OutputServiceClient, - tag::{ - self, - v0alpha1::{ - tag_service_client::TagServiceClient, AddRequest, RemoveRequest, SetActiveRequest, - SetLayoutRequest, SwitchToRequest, - }, +use pinnacle_api_defs::pinnacle::tag::{ + self, + v0alpha1::{ + tag_service_client::TagServiceClient, AddRequest, RemoveRequest, SetActiveRequest, + SetLayoutRequest, SwitchToRequest, }, }; use tonic::transport::Channel; use crate::{ block_on_tokio, - output::{Output, OutputHandle}, + output::OutputHandle, + signal::{SignalHandle, TagSignal}, util::Batch, + OUTPUT, SIGNAL, }; /// A struct that allows you to add and remove tags and get [`TagHandle`]s. #[derive(Clone, Debug)] pub struct Tag { - channel: Channel, - fut_sender: UnboundedSender>, tag_client: TagServiceClient, - output_client: OutputServiceClient, } impl Tag { - pub(crate) fn new( - channel: Channel, - fut_sender: UnboundedSender>, - ) -> Self { + pub(crate) fn new(channel: Channel) -> Self { Self { tag_client: TagServiceClient::new(channel.clone()), - output_client: OutputServiceClient::new(channel.clone()), - channel, - fut_sender, + } + } + + pub(crate) fn new_handle(&self, id: u32) -> TagHandle { + TagHandle { + id, + tag_client: self.tag_client.clone(), } } @@ -93,7 +90,7 @@ impl Tag { &self, output: &OutputHandle, tag_names: impl IntoIterator>, - ) -> impl Iterator { + ) -> Vec { block_on_tokio(self.add_async(output, tag_names)) } @@ -102,9 +99,8 @@ impl Tag { &self, output: &OutputHandle, tag_names: impl IntoIterator>, - ) -> impl Iterator { + ) -> Vec { let mut client = self.tag_client.clone(); - let output_client = self.output_client.clone(); let tag_names = tag_names.into_iter().map(Into::into).collect(); @@ -117,11 +113,11 @@ impl Tag { .unwrap() .into_inner(); - response.tag_ids.into_iter().map(move |id| TagHandle { - tag_client: client.clone(), - output_client: output_client.clone(), - id, - }) + response + .tag_ids + .into_iter() + .map(move |id| self.new_handle(id)) + .collect() } /// Get handles to all tags across all outputs. @@ -131,14 +127,13 @@ impl Tag { /// ``` /// let all_tags = tag.get_all(); /// ``` - pub fn get_all(&self) -> impl Iterator { + pub fn get_all(&self) -> Vec { block_on_tokio(self.get_all_async()) } /// The async version of [`Tag::get_all`]. - pub async fn get_all_async(&self) -> impl Iterator { + pub async fn get_all_async(&self) -> Vec { let mut client = self.tag_client.clone(); - let output_client = self.output_client.clone(); let response = client .get(tag::v0alpha1::GetRequest {}) @@ -146,11 +141,11 @@ impl Tag { .unwrap() .into_inner(); - response.tag_ids.into_iter().map(move |id| TagHandle { - tag_client: client.clone(), - output_client: output_client.clone(), - id, - }) + response + .tag_ids + .into_iter() + .map(move |id| self.new_handle(id)) + .collect() } /// Get a handle to the first tag with the given name on the focused output. @@ -170,7 +165,7 @@ impl Tag { /// The async version of [`Tag::get`]. pub async fn get_async(&self, name: impl Into) -> Option { let name = name.into(); - let output_module = Output::new(self.channel.clone(), self.fut_sender.clone()); + let output_module = OUTPUT.get().expect("OUTPUT doesn't exist"); let focused_output = output_module.get_focused(); if let Some(output) = focused_output { @@ -280,7 +275,7 @@ impl Tag { let layouts_clone = layouts.clone(); let len = layouts.len(); - let output_module = Output::new(self.channel.clone(), self.fut_sender.clone()); + let output_module = OUTPUT.get().expect("OUTPUT doesn't exist"); let output_module_clone = output_module.clone(); let next = move |output: Option<&OutputHandle>| { @@ -343,6 +338,19 @@ impl Tag { next: Box::new(next), } } + + /// Connect to a tag signal. + /// + /// The compositor will fire off signals that your config can listen for and act upon. + /// You can pass in a [`TagSignal`] along with a callback and it will get run + /// with the necessary arguments every time a signal of that type is received. + pub fn connect_signal(&self, signal: TagSignal) -> SignalHandle { + let mut signal_state = block_on_tokio(SIGNAL.get().expect("SIGNAL doesn't exist").write()); + + match signal { + TagSignal::Layout(layout_fn) => signal_state.layout.add_callback(layout_fn), + } + } } /// A layout cycler that keeps track of tags and their layouts and provides functions to cycle @@ -361,8 +369,7 @@ pub struct LayoutCycler { #[derive(Debug, Clone)] pub struct TagHandle { pub(crate) id: u32, - pub(crate) tag_client: TagServiceClient, - pub(crate) output_client: OutputServiceClient, + tag_client: TagServiceClient, } impl PartialEq for TagHandle { @@ -543,7 +550,6 @@ impl TagHandle { /// The async version of [`TagHandle::props`]. pub async fn props_async(&self) -> TagProperties { let mut client = self.tag_client.clone(); - let output_client = self.output_client.clone(); let response = client .get_properties(tag::v0alpha1::GetPropertiesRequest { @@ -553,14 +559,12 @@ impl TagHandle { .unwrap() .into_inner(); + let output = OUTPUT.get().expect("OUTPUT doesn't exist"); + TagProperties { active: response.active, name: response.name, - output: response.output_name.map(|name| OutputHandle { - output_client, - tag_client: client, - name, - }), + output: response.output_name.map(|name| output.new_handle(name)), } } diff --git a/api/rust/src/window.rs b/api/rust/src/window.rs index f9a84bb..6816080 100644 --- a/api/rust/src/window.rs +++ b/api/rust/src/window.rs @@ -15,8 +15,6 @@ use futures::FutureExt; use num_enum::TryFromPrimitive; use pinnacle_api_defs::pinnacle::{ - output::v0alpha1::output_service_client::OutputServiceClient, - tag::v0alpha1::tag_service_client::TagServiceClient, window::v0alpha1::{ window_service_client::WindowServiceClient, AddWindowRuleRequest, CloseRequest, MoveToTagRequest, SetTagRequest, @@ -34,8 +32,10 @@ use tonic::transport::Channel; use crate::{ block_on_tokio, input::MouseButton, + signal::{SignalHandle, WindowSignal}, tag::TagHandle, util::{Batch, Geometry}, + SIGNAL, TAG, }; use self::rules::{WindowRule, WindowRuleCondition}; @@ -48,16 +48,19 @@ pub mod rules; #[derive(Debug, Clone)] pub struct Window { window_client: WindowServiceClient, - tag_client: TagServiceClient, - output_client: OutputServiceClient, } impl Window { pub(crate) fn new(channel: Channel) -> Self { Self { window_client: WindowServiceClient::new(channel.clone()), - tag_client: TagServiceClient::new(channel.clone()), - output_client: OutputServiceClient::new(channel), + } + } + + pub(crate) fn new_handle(&self, id: u32) -> WindowHandle { + WindowHandle { + id, + window_client: self.window_client.clone(), } } @@ -126,8 +129,6 @@ impl Window { /// The async version of [`Window::get_all`]. pub async fn get_all_async(&self) -> impl Iterator { let mut client = self.window_client.clone(); - let tag_client = self.tag_client.clone(); - let output_client = self.output_client.clone(); client .get(GetRequest {}) .await @@ -135,12 +136,10 @@ impl Window { .into_inner() .window_ids .into_iter() - .map(move |id| WindowHandle { - window_client: client.clone(), - id, - tag_client: tag_client.clone(), - output_client: output_client.clone(), - }) + .map(move |id| self.new_handle(id)) + .collect::>() + // TODO: consider changing return type to Vec to avoid this into_iter + .into_iter() } /// Get the currently focused window. @@ -177,6 +176,20 @@ impl Window { })) .unwrap(); } + + /// Connect to a window signal. + /// + /// The compositor will fire off signals that your config can listen for and act upon. + /// You can pass in a [`WindowSignal`] along with a callback and it will get run + /// with the necessary arguments every time a signal of that type is received. + pub fn connect_signal(&self, signal: WindowSignal) -> SignalHandle { + let mut signal_state = block_on_tokio(SIGNAL.get().expect("SIGNAL doesn't exist").write()); + + match signal { + WindowSignal::PointerEnter(f) => signal_state.window_pointer_enter.add_callback(f), + WindowSignal::PointerLeave(f) => signal_state.window_pointer_leave.add_callback(f), + } + } } /// A handle to a window. @@ -186,8 +199,6 @@ impl Window { pub struct WindowHandle { id: u32, window_client: WindowServiceClient, - tag_client: TagServiceClient, - output_client: OutputServiceClient, } impl PartialEq for WindowHandle { @@ -476,7 +487,6 @@ impl WindowHandle { /// The async version of [`props`][Self::props]. pub async fn props_async(&self) -> WindowProperties { let mut client = self.window_client.clone(); - let tag_client = self.tag_client.clone(); let response = match client .get_properties(window::v0alpha1::GetPropertiesRequest { @@ -504,6 +514,8 @@ impl WindowHandle { height: geo.height() as u32, }); + let tag = TAG.get().expect("TAG doesn't exist"); + WindowProperties { geometry, class: response.class, @@ -514,11 +526,7 @@ impl WindowHandle { tags: response .tag_ids .into_iter() - .map(|id| TagHandle { - tag_client: tag_client.clone(), - output_client: self.output_client.clone(), - id, - }) + .map(|id| tag.new_handle(id)) .collect(), } } diff --git a/pinnacle-api-defs/src/lib.rs b/pinnacle-api-defs/src/lib.rs index 4f1b000..67c01f1 100644 --- a/pinnacle-api-defs/src/lib.rs +++ b/pinnacle-api-defs/src/lib.rs @@ -37,6 +37,36 @@ pub mod pinnacle { pub mod signal { pub mod v0alpha1 { tonic::include_proto!("pinnacle.signal.v0alpha1"); + + pub trait SignalRequest { + fn from_control(control: StreamControl) -> Self; + fn control(&self) -> StreamControl; + } + + macro_rules! impl_signal_request { + ( $( $request:ident ),* ) => { + $( + impl SignalRequest for $request { + fn from_control(control: StreamControl) -> Self { + $request { + control: Some(control as i32), + } + } + + fn control(&self) -> StreamControl { + self.control() + } + } + )* + }; + } + + impl_signal_request!( + OutputConnectRequest, + LayoutRequest, + WindowPointerEnterRequest, + WindowPointerLeaveRequest + ); } } } diff --git a/src/api/signal.rs b/src/api/signal.rs index 46beaf8..0d7c9a2 100644 --- a/src/api/signal.rs +++ b/src/api/signal.rs @@ -2,8 +2,8 @@ use std::collections::VecDeque; use pinnacle_api_defs::pinnacle::signal::v0alpha1::{ signal_service_server, LayoutRequest, LayoutResponse, OutputConnectRequest, - OutputConnectResponse, StreamControl, WindowPointerEnterRequest, WindowPointerEnterResponse, - WindowPointerLeaveRequest, WindowPointerLeaveResponse, + OutputConnectResponse, SignalRequest, StreamControl, WindowPointerEnterRequest, + WindowPointerEnterResponse, WindowPointerLeaveRequest, WindowPointerLeaveResponse, }; use tokio::{sync::mpsc::UnboundedSender, task::JoinHandle}; use tonic::{Request, Response, Status, Streaming}; @@ -118,29 +118,6 @@ impl> SignalData { } } -trait SignalRequest { - fn control(&self) -> StreamControl; -} - -macro_rules! impl_signal_request { - ( $( $request:ident ),* ) => { - $( - impl SignalRequest for $request { - fn control(&self) -> StreamControl { - self.control() - } - } - )* - }; -} - -impl_signal_request!( - OutputConnectRequest, - LayoutRequest, - WindowPointerEnterRequest, - WindowPointerLeaveRequest -); - fn start_signal_stream( sender: StateFnSender, in_stream: Streaming, From 2427ba620ef841ee0631553060ce0562929c2faa Mon Sep 17 00:00:00 2001 From: Ottatop Date: Fri, 23 Feb 2024 16:51:56 -0600 Subject: [PATCH 07/11] Yield after running signal callbacks --- api/rust/src/signal.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/api/rust/src/signal.rs b/api/rust/src/signal.rs index 24e7630..00e53c1 100644 --- a/api/rust/src/signal.rs +++ b/api/rust/src/signal.rs @@ -263,7 +263,7 @@ fn connect_signal( ) -> ConnectSignalChannels where Req: SignalRequest + Send + 'static, - Resp: 'static, + Resp: Send + 'static, F: Send + 'static, T: FnOnce(UnboundedReceiverStream) -> Streaming, O: FnMut(Resp, btree_map::ValuesMut<'_, SignalConnId, F>) + Send + 'static, @@ -300,6 +300,7 @@ where match response { Ok(response) => { on_response(response, callbacks.values_mut()); + tokio::task::yield_now().await; control_sender .send(Req::from_control(StreamControl::Ready)) From 1abc17b5b41a9162ef93f4e4b7443d6b1e4174d1 Mon Sep 17 00:00:00 2001 From: Ottatop Date: Fri, 23 Feb 2024 17:15:55 -0600 Subject: [PATCH 08/11] Add docs --- api/rust/src/signal.rs | 27 ++++++++++++++++++++++++--- api/rust/src/window.rs | 8 +++----- 2 files changed, 27 insertions(+), 8 deletions(-) diff --git a/api/rust/src/signal.rs b/api/rust/src/signal.rs index 00e53c1..dff4035 100644 --- a/api/rust/src/signal.rs +++ b/api/rust/src/signal.rs @@ -1,4 +1,10 @@ -//! Signals TODO: +//! Compositor signals. +//! +//! Your config can connect to various compositor signals that allow you to, for example, do +//! something when an output is connected or when the pointer enters a window. +//! +//! Some of the other modules have a `connect_signal` method that will allow you to pass in +//! callbacks to run on each signal. Use them to connect to the signals defined here. use std::{ collections::{btree_map, BTreeMap}, @@ -89,7 +95,7 @@ macro_rules! signals { self.callback_count.clone(), |out| { block_on_tokio(self.client.$req(out)) - .expect("TODO") + .expect("failed to request signal connection") .into_inner() }, $on_resp, @@ -114,6 +120,11 @@ signals! { /// Signals relating to tag events. TagSignal => { /// The compositor requested that the given windows be laid out. + /// + /// Callbacks receive the tag that is being laid out and the windows being laid out. + /// + /// Note: if multiple tags are active, only the first will be received, but all windows on those + /// active tags will be received. Layout = { enum_name = Layout, callback_type = LayoutFn, @@ -140,6 +151,11 @@ signals! { /// Signals relating to output events. OutputSignal => { /// An output was connected. + /// + /// Callbacks receive the newly connected output. + /// + /// FIXME: This will not run on outputs that have been previously connected. + /// | Tell the dev to fix this in the compositor. OutputConnect = { enum_name = Connect, callback_type = SingleOutputFn, @@ -159,6 +175,8 @@ signals! { /// Signals relating to window events. WindowSignal => { /// The pointer entered a window. + /// + /// Callbacks receive the window the pointer entered. WindowPointerEnter = { enum_name = PointerEnter, callback_type = SingleWindowFn, @@ -175,6 +193,8 @@ signals! { }, } /// The pointer left a window. + /// + /// Callbacks receive the window the pointer left. WindowPointerLeave = { enum_name = PointerLeave, callback_type = SingleWindowFn, @@ -300,11 +320,12 @@ where match response { Ok(response) => { on_response(response, callbacks.values_mut()); - tokio::task::yield_now().await; control_sender .send(Req::from_control(StreamControl::Ready)) .expect("send failed"); + + tokio::task::yield_now().await; } Err(status) => eprintln!("Error in recv: {status}"), } diff --git a/api/rust/src/window.rs b/api/rust/src/window.rs index 6816080..910f55d 100644 --- a/api/rust/src/window.rs +++ b/api/rust/src/window.rs @@ -122,12 +122,12 @@ impl Window { /// ``` /// let windows = window.get_all(); /// ``` - pub fn get_all(&self) -> impl Iterator { + pub fn get_all(&self) -> Vec { block_on_tokio(self.get_all_async()) } /// The async version of [`Window::get_all`]. - pub async fn get_all_async(&self) -> impl Iterator { + pub async fn get_all_async(&self) -> Vec { let mut client = self.window_client.clone(); client .get(GetRequest {}) @@ -138,8 +138,6 @@ impl Window { .into_iter() .map(move |id| self.new_handle(id)) .collect::>() - // TODO: consider changing return type to Vec to avoid this into_iter - .into_iter() } /// Get the currently focused window. @@ -166,7 +164,7 @@ impl Window { /// A window rule is a set of criteria that a window must open with. /// For it to apply, a [`WindowRuleCondition`] must evaluate to true for the window in question. /// - /// TODO: + /// See the [`rules`] module for more information. pub fn add_window_rule(&self, cond: WindowRuleCondition, rule: WindowRule) { let mut client = self.window_client.clone(); From d1e7c92774310afd593536c5f4a5aa284bb65767 Mon Sep 17 00:00:00 2001 From: Ottatop Date: Fri, 23 Feb 2024 17:36:55 -0600 Subject: [PATCH 09/11] Add Lua docs --- api/lua/pinnacle/grpc/client.lua | 2 +- api/lua/pinnacle/grpc/protobuf.lua | 1 + api/lua/pinnacle/output.lua | 28 ++++++++++++++++++++++++---- api/lua/pinnacle/signal.lua | 28 ++++++++++++++++++++++++++++ api/lua/pinnacle/tag.lua | 30 +++++++++++++++++++++++++----- api/lua/pinnacle/window.lua | 30 +++++++++++++++++++++++++----- 6 files changed, 104 insertions(+), 15 deletions(-) diff --git a/api/lua/pinnacle/grpc/client.lua b/api/lua/pinnacle/grpc/client.lua index 5382fd7..14f79bb 100644 --- a/api/lua/pinnacle/grpc/client.lua +++ b/api/lua/pinnacle/grpc/client.lua @@ -8,7 +8,6 @@ local h2_connection = require("http.h2_connection") local protobuf = require("pinnacle.grpc.protobuf") local pb = require("pb") ----@nodoc ---Create appropriate headers for a gRPC request. ---@param service string The desired service ---@param method string The desired method within the service @@ -160,6 +159,7 @@ function client.server_streaming_request(grpc_request_params, callback) end) end +---@nodoc ---@param grpc_request_params GrpcRequestParams ---@param callback fun(response: table) --- diff --git a/api/lua/pinnacle/grpc/protobuf.lua b/api/lua/pinnacle/grpc/protobuf.lua index 7c2c359..456dbf5 100644 --- a/api/lua/pinnacle/grpc/protobuf.lua +++ b/api/lua/pinnacle/grpc/protobuf.lua @@ -39,6 +39,7 @@ function protobuf.build_protos() pb.option("enum_as_value") end +---@nodoc ---Encode the given `data` as the protobuf `type`. ---@param type string The absolute protobuf type ---@param data table The table of data, conforming to its protobuf definition diff --git a/api/lua/pinnacle/output.lua b/api/lua/pinnacle/output.lua index 2e58cf6..409397f 100644 --- a/api/lua/pinnacle/output.lua +++ b/api/lua/pinnacle/output.lua @@ -168,11 +168,31 @@ local signal_name_to_SignalName = { connect = "OutputConnect", } ----@class OutputSignal ----@field connect fun(output: OutputHandle)? +---@class OutputSignal Signals related to output events. +---@field connect fun(output: OutputHandle)? An output was connected. FIXME: This currently does not fire for outputs that have been previously connected and disconnected. ----@param signals OutputSignal ----@return SignalHandles +---Connect to an output signal. +--- +---The compositor sends signals about various events. Use this function to run a callback when +---some output signal occurs. +--- +---This function returns a table of signal handles with each handle stored at the same key used +---to connect to the signal. See `SignalHandles` for more information. +--- +---# Example +---```lua +---Output.connect_signal({ +--- connect = function(output) +--- print("New output connected:", output.name) +--- end +---}) +---``` +--- +---@param signals OutputSignal The signal you want to connect to +--- +---@return SignalHandles signal_handles Handles to every signal you connected to wrapped in a table, with keys being the same as the connected signal. +--- +---@see SignalHandles.disconnect_all - To disconnect from these signals function output.connect_signal(signals) ---@diagnostic disable-next-line: invisible local handles = require("pinnacle.signal").handles.new({}) diff --git a/api/lua/pinnacle/signal.lua b/api/lua/pinnacle/signal.lua index e0985cb..ea62015 100644 --- a/api/lua/pinnacle/signal.lua +++ b/api/lua/pinnacle/signal.lua @@ -49,37 +49,51 @@ local stream_control = { DISCONNECT = 2, } +-- TODO: rewrite ldoc_gen so you don't have to stick @nodoc everywhere + ---@type table local signals = { OutputConnect = { + ---@nodoc ---@type H2Stream? sender = nil, + ---@nodoc ---@type (fun(output: OutputHandle))[] callbacks = {}, + ---@nodoc ---@type fun(response: table) on_response = nil, }, Layout = { + ---@nodoc ---@type H2Stream? sender = nil, + ---@nodoc ---@type (fun(tag: TagHandle, windows: WindowHandle[]))[] callbacks = {}, + ---@nodoc ---@type fun(response: table) on_response = nil, }, WindowPointerEnter = { + ---@nodoc ---@type H2Stream? sender = nil, + ---@nodoc ---@type (fun(window: WindowHandle))[] callbacks = {}, + ---@nodoc ---@type fun(response: table) on_response = nil, }, WindowPointerLeave = { + ---@nodoc ---@type H2Stream? sender = nil, + ---@nodoc ---@type (fun(window: WindowHandle))[] callbacks = {}, + ---@nodoc ---@type fun(response: table) on_response = nil, }, @@ -124,20 +138,26 @@ end ----------------------------------------------------------------------------- +---@nodoc ---@class SignalHandleModule local signal_handle = {} +---A handle to a connected signal that can be used to disconnect the provided callback. +--- ---@class SignalHandle ---@field signal SignalServiceMethod ---@field callback function The callback you connected local SignalHandle = {} +---@nodoc ---@class SignalHandlesModule local signal_handles = {} +---@nodoc ---@class SignalHandles local SignalHandles = {} +---@nodoc ---@class Signal ---@field private handle SignalHandleModule ---@field private handles SignalHandlesModule @@ -145,6 +165,7 @@ local signal = {} signal.handle = signal_handle signal.handles = signal_handles +---@nodoc function SignalHandle:disconnect() local cb_index = nil for i, cb in ipairs(signals[self.signal].callbacks) do @@ -163,6 +184,7 @@ function SignalHandle:disconnect() end end +---@nodoc ---@return SignalHandle function signal_handle.new(request, callback) ---@type SignalHandle @@ -174,6 +196,8 @@ function signal_handle.new(request, callback) return self end +---Disconnect the callbacks from all the signal connections that are stored in this handle collection. +--- ---@param self table function SignalHandles:disconnect_all() for _, sig in pairs(self) do @@ -181,6 +205,7 @@ function SignalHandles:disconnect_all() end end +---@nodoc ---@param signal_hdls table ---@return SignalHandles function signal_handles.new(signal_hdls) @@ -190,6 +215,7 @@ function signal_handles.new(signal_hdls) return self end +---@nodoc ---@param request SignalServiceMethod ---@param callback function function signal.add_callback(request, callback) @@ -200,6 +226,7 @@ function signal.add_callback(request, callback) table.insert(signals[request].callbacks, callback) end +---@nodoc ---@param request SignalServiceMethod ---@param callback fun(response: table) function signal.connect(request, callback) @@ -228,6 +255,7 @@ function signal.connect(request, callback) signals[request].sender = stream end +---@nodoc ---This should only be called when call callbacks for the signal are removed ---@param request SignalServiceMethod function signal.disconnect(request) diff --git a/api/lua/pinnacle/tag.lua b/api/lua/pinnacle/tag.lua index fb4efbd..e707eed 100644 --- a/api/lua/pinnacle/tag.lua +++ b/api/lua/pinnacle/tag.lua @@ -234,8 +234,8 @@ end ---layout_cycler.next() -- Layout is now "dwindle" ---layout_cycler.next() -- Layout is now "corner_top_left" ---layout_cycler.next() -- Layout is now "corner_top_right" +---layout_cycler.next() -- Layout is now "master_stack" ---layout_cycler.next() -- Layout is now "dwindle" ----layout_cycler.next() -- Layout is now "corner_top_right" --- --- -- Cycling on another output ---layout_cycler.next(Output.get_by_name("eDP-1")) @@ -323,11 +323,31 @@ local signal_name_to_SignalName = { layout = "Layout", } ----@class TagSignal ----@field layout fun(tag: TagHandle, windows: WindowHandle[])? +---@class TagSignal Signals related to tag events. +---@field layout fun(tag: TagHandle, windows: WindowHandle[])? The compositor requested a layout of the given tiled windows. You'll also receive the first active tag. ----@param signals TagSignal ----@return SignalHandles +---Connect to a tag signal. +--- +---The compositor sends signals about various events. Use this function to run a callback when +---some tag signal occurs. +--- +---This function returns a table of signal handles with each handle stored at the same key used +---to connect to the signal. See `SignalHandles` for more information. +--- +---# Example +---```lua +---Tag.connect_signal({ +--- layout = function(tag, windows) +--- print("Compositor requested a layout") +--- end +---}) +---``` +--- +---@param signals TagSignal The signal you want to connect to +--- +---@return SignalHandles signal_handles Handles to every signal you connected to wrapped in a table, with keys being the same as the connected signal. +--- +---@see SignalHandles.disconnect_all - To disconnect from these signals function tag.connect_signal(signals) ---@diagnostic disable-next-line: invisible local handles = require("pinnacle.signal").handles.new({}) diff --git a/api/lua/pinnacle/window.lua b/api/lua/pinnacle/window.lua index 266cf78..0cf5fff 100644 --- a/api/lua/pinnacle/window.lua +++ b/api/lua/pinnacle/window.lua @@ -316,12 +316,32 @@ local signal_name_to_SignalName = { pointer_leave = "WindowPointerLeave", } ----@class WindowSignal ----@field pointer_enter fun(window: WindowHandle)? ----@field pointer_leave fun(window: WindowHandle)? +---@class WindowSignal Signals related to compositor events. +---@field pointer_enter fun(window: WindowHandle)? The pointer entered a window. +---@field pointer_leave fun(window: WindowHandle)? The pointer left a window. ----@param signals WindowSignal ----@return SignalHandles +---Connect to a window signal. +--- +---The compositor sends signals about various events. Use this function to run a callback when +---some window signal occurs. +--- +---This function returns a table of signal handles with each handle stored at the same key used +---to connect to the signal. See `SignalHandles` for more information. +--- +---# Example +---```lua +---Window.connect_signal({ +--- pointer_enter = function(window) +--- print("Pointer entered", window:class()) +--- end +---}) +---``` +--- +---@param signals WindowSignal The signal you want to connect to +--- +---@return SignalHandles signal_handles Handles to every signal you connected to wrapped in a table, with keys being the same as the connected signal. +--- +---@see SignalHandles.disconnect_all - To disconnect from these signals function window.connect_signal(signals) ---@diagnostic disable-next-line: invisible local handles = require("pinnacle.signal").handles.new({}) From 3959a870199e613d9a2ee8b3d4975714fbd15840 Mon Sep 17 00:00:00 2001 From: Ottatop Date: Fri, 23 Feb 2024 17:49:09 -0600 Subject: [PATCH 10/11] Fixup Lua docs --- api/lua/pinnacle/grpc/client.lua | 1 + api/lua/pinnacle/signal.lua | 12 +++++++----- 2 files changed, 8 insertions(+), 5 deletions(-) diff --git a/api/lua/pinnacle/grpc/client.lua b/api/lua/pinnacle/grpc/client.lua index 14f79bb..b700913 100644 --- a/api/lua/pinnacle/grpc/client.lua +++ b/api/lua/pinnacle/grpc/client.lua @@ -8,6 +8,7 @@ local h2_connection = require("http.h2_connection") local protobuf = require("pinnacle.grpc.protobuf") local pb = require("pb") +---@nodoc ---Create appropriate headers for a gRPC request. ---@param service string The desired service ---@param method string The desired method within the service diff --git a/api/lua/pinnacle/signal.lua b/api/lua/pinnacle/signal.lua index ea62015..1cef10d 100644 --- a/api/lua/pinnacle/signal.lua +++ b/api/lua/pinnacle/signal.lua @@ -51,6 +51,7 @@ local stream_control = { -- TODO: rewrite ldoc_gen so you don't have to stick @nodoc everywhere +---@nodoc ---@type table local signals = { OutputConnect = { @@ -142,19 +143,20 @@ end ---@class SignalHandleModule local signal_handle = {} +---@classmod ---A handle to a connected signal that can be used to disconnect the provided callback. --- ---@class SignalHandle ----@field signal SignalServiceMethod ----@field callback function The callback you connected +---@field private signal SignalServiceMethod +---@field private callback function The callback you connected local SignalHandle = {} ---@nodoc ---@class SignalHandlesModule local signal_handles = {} ----@nodoc ----@class SignalHandles +---@classmod +---@class SignalHandles A collection of `SignalHandle`s retreived through a `connect_signal` function. local SignalHandles = {} ---@nodoc @@ -165,7 +167,7 @@ local signal = {} signal.handle = signal_handle signal.handles = signal_handles ----@nodoc +---Disconnect the provided callback from this signal. function SignalHandle:disconnect() local cb_index = nil for i, cb in ipairs(signals[self.signal].callbacks) do From b7c60a9e79e791581da6e12de079284dad31a76a Mon Sep 17 00:00:00 2001 From: Ottatop Date: Fri, 23 Feb 2024 17:54:53 -0600 Subject: [PATCH 11/11] Fixup Lua docs again --- api/lua/pinnacle/signal.lua | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/api/lua/pinnacle/signal.lua b/api/lua/pinnacle/signal.lua index 1cef10d..550cb63 100644 --- a/api/lua/pinnacle/signal.lua +++ b/api/lua/pinnacle/signal.lua @@ -155,8 +155,9 @@ local SignalHandle = {} ---@class SignalHandlesModule local signal_handles = {} +---A collection of `SignalHandle`s retreived through a `connect_signal` function. ---@classmod ----@class SignalHandles A collection of `SignalHandle`s retreived through a `connect_signal` function. +---@class SignalHandles local SignalHandles = {} ---@nodoc