mirror of
https://github.com/pinnacle-comp/pinnacle.git
synced 2025-01-30 20:34:49 +01:00
Scaffold signals
This commit is contained in:
parent
09e20e3a30
commit
563bf7d92a
8 changed files with 309 additions and 5 deletions
|
@ -162,4 +162,10 @@ function client.server_streaming_request(grpc_request_params, callback)
|
||||||
end)
|
end)
|
||||||
end
|
end
|
||||||
|
|
||||||
|
---@param grpc_request_params GrpcRequestParams
|
||||||
|
---@param callback fun(response: table)
|
||||||
|
---
|
||||||
|
---@return H2Stream
|
||||||
|
function client.bidirectional_streaming_request(grpc_request_params, callback) end
|
||||||
|
|
||||||
return client
|
return client
|
||||||
|
|
53
api/protocol/pinnacle/signal/v0alpha1/signal.proto
Normal file
53
api/protocol/pinnacle/signal/v0alpha1/signal.proto
Normal file
|
@ -0,0 +1,53 @@
|
||||||
|
syntax = "proto2";
|
||||||
|
|
||||||
|
package pinnacle.signal.v0alpha1;
|
||||||
|
|
||||||
|
import "google/protobuf/empty.proto";
|
||||||
|
|
||||||
|
enum StreamControl {
|
||||||
|
STREAM_CONTROL_UNSPECIFIED = 0;
|
||||||
|
// The client is ready to receive the next signal.
|
||||||
|
STREAM_CONTROL_READY = 1;
|
||||||
|
// The client wishes to disconnect a signal connection.
|
||||||
|
STREAM_CONTROL_DISCONNECT = 2;
|
||||||
|
}
|
||||||
|
|
||||||
|
message OutputConnectRequest {
|
||||||
|
optional StreamControl control = 1;
|
||||||
|
}
|
||||||
|
message OutputConnectResponse {
|
||||||
|
optional string output_name = 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
message LayoutRequest {
|
||||||
|
optional StreamControl control = 1;
|
||||||
|
}
|
||||||
|
message LayoutResponse {
|
||||||
|
// The windows that need to be laid out.
|
||||||
|
repeated uint32 window_ids = 1;
|
||||||
|
// The tag that is being laid out.
|
||||||
|
optional uint32 tag_id = 2;
|
||||||
|
}
|
||||||
|
|
||||||
|
message WindowPointerEnterRequest {
|
||||||
|
optional StreamControl control = 1;
|
||||||
|
}
|
||||||
|
message WindowPointerEnterResponse {
|
||||||
|
// The window that the pointer entered.
|
||||||
|
optional uint32 window_id = 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
message WindowPointerLeaveRequest {
|
||||||
|
optional StreamControl control = 1;
|
||||||
|
}
|
||||||
|
message WindowPointerLeaveResponse {
|
||||||
|
// The window that the pointer left.
|
||||||
|
optional uint32 window_id = 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
service SignalService {
|
||||||
|
rpc OutputConnect(stream OutputConnectRequest) returns (stream OutputConnectResponse);
|
||||||
|
rpc Layout(stream LayoutRequest) returns (stream LayoutResponse);
|
||||||
|
rpc WindowPointerEnter(stream WindowPointerEnterRequest) returns (stream WindowPointerEnterResponse);
|
||||||
|
rpc WindowPointerLeave(stream WindowPointerLeaveRequest) returns (stream WindowPointerLeaveResponse);
|
||||||
|
}
|
|
@ -13,6 +13,7 @@ fn main() {
|
||||||
formatcp!("../api/protocol/pinnacle/process/{VERSION}/process.proto"),
|
formatcp!("../api/protocol/pinnacle/process/{VERSION}/process.proto"),
|
||||||
formatcp!("../api/protocol/pinnacle/tag/{VERSION}/tag.proto"),
|
formatcp!("../api/protocol/pinnacle/tag/{VERSION}/tag.proto"),
|
||||||
formatcp!("../api/protocol/pinnacle/window/{VERSION}/window.proto"),
|
formatcp!("../api/protocol/pinnacle/window/{VERSION}/window.proto"),
|
||||||
|
formatcp!("../api/protocol/pinnacle/signal/{VERSION}/signal.proto"),
|
||||||
];
|
];
|
||||||
|
|
||||||
let descriptor_path = PathBuf::from(std::env::var("OUT_DIR").unwrap()).join("pinnacle.bin");
|
let descriptor_path = PathBuf::from(std::env::var("OUT_DIR").unwrap()).join("pinnacle.bin");
|
||||||
|
|
|
@ -33,6 +33,12 @@ pub mod pinnacle {
|
||||||
tonic::include_proto!("pinnacle.process.v0alpha1");
|
tonic::include_proto!("pinnacle.process.v0alpha1");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub mod signal {
|
||||||
|
pub mod v0alpha1 {
|
||||||
|
tonic::include_proto!("pinnacle.signal.v0alpha1");
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub const FILE_DESCRIPTOR_SET: &[u8] = tonic::include_file_descriptor_set!("pinnacle");
|
pub const FILE_DESCRIPTOR_SET: &[u8] = tonic::include_file_descriptor_set!("pinnacle");
|
||||||
|
|
46
src/api.rs
46
src/api.rs
|
@ -1,3 +1,5 @@
|
||||||
|
pub mod signal;
|
||||||
|
|
||||||
use std::{ffi::OsString, num::NonZeroU32, pin::Pin, process::Stdio};
|
use std::{ffi::OsString, num::NonZeroU32, pin::Pin, process::Stdio};
|
||||||
|
|
||||||
use pinnacle_api_defs::pinnacle::{
|
use pinnacle_api_defs::pinnacle::{
|
||||||
|
@ -45,8 +47,8 @@ use tokio::{
|
||||||
io::AsyncBufReadExt,
|
io::AsyncBufReadExt,
|
||||||
sync::mpsc::{unbounded_channel, UnboundedSender},
|
sync::mpsc::{unbounded_channel, UnboundedSender},
|
||||||
};
|
};
|
||||||
use tokio_stream::Stream;
|
use tokio_stream::{Stream, StreamExt};
|
||||||
use tonic::{Request, Response, Status};
|
use tonic::{Request, Response, Status, Streaming};
|
||||||
|
|
||||||
use crate::{
|
use crate::{
|
||||||
config::ConnectorSavedState,
|
config::ConnectorSavedState,
|
||||||
|
@ -58,6 +60,8 @@ use crate::{
|
||||||
window::{window_state::WindowId, WindowElement},
|
window::{window_state::WindowId, WindowElement},
|
||||||
};
|
};
|
||||||
|
|
||||||
|
use self::signal::SignalData;
|
||||||
|
|
||||||
type ResponseStream<T> = Pin<Box<dyn Stream<Item = Result<T, Status>> + Send>>;
|
type ResponseStream<T> = Pin<Box<dyn Stream<Item = Result<T, Status>> + Send>>;
|
||||||
pub type StateFnSender = calloop::channel::Sender<Box<dyn FnOnce(&mut State) + Send>>;
|
pub type StateFnSender = calloop::channel::Sender<Box<dyn FnOnce(&mut State) + Send>>;
|
||||||
|
|
||||||
|
@ -122,6 +126,44 @@ where
|
||||||
Ok(Response::new(Box::pin(receiver_stream)))
|
Ok(Response::new(Box::pin(receiver_stream)))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn run_bidirectional_streaming<F1, F2, I, O>(
|
||||||
|
fn_sender: StateFnSender,
|
||||||
|
mut in_stream: Streaming<I>,
|
||||||
|
with_client_item: F1,
|
||||||
|
with_out_stream: F2,
|
||||||
|
) -> Result<Response<ResponseStream<O>>, Status>
|
||||||
|
where
|
||||||
|
F1: Fn(&mut State, Result<I, Status>) + Clone + Send + 'static,
|
||||||
|
F2: FnOnce(&mut State, UnboundedSender<Result<O, Status>>) + Send + 'static,
|
||||||
|
I: Send + 'static,
|
||||||
|
O: Send + 'static,
|
||||||
|
{
|
||||||
|
let (sender, receiver) = unbounded_channel::<Result<O, Status>>();
|
||||||
|
|
||||||
|
let with_out_stream = Box::new(|state: &mut State| {
|
||||||
|
with_out_stream(state, sender);
|
||||||
|
});
|
||||||
|
|
||||||
|
fn_sender
|
||||||
|
.send(with_out_stream)
|
||||||
|
.map_err(|_| Status::internal("failed to execute request"))?;
|
||||||
|
|
||||||
|
let with_in_stream = async move {
|
||||||
|
while let Some(t) = in_stream.next().await {
|
||||||
|
let with_client_item = with_client_item.clone();
|
||||||
|
// TODO: handle error
|
||||||
|
let _ = fn_sender.send(Box::new(move |state: &mut State| {
|
||||||
|
with_client_item(state, t);
|
||||||
|
}));
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
tokio::spawn(with_in_stream);
|
||||||
|
|
||||||
|
let receiver_stream = tokio_stream::wrappers::UnboundedReceiverStream::new(receiver);
|
||||||
|
Ok(Response::new(Box::pin(receiver_stream)))
|
||||||
|
}
|
||||||
|
|
||||||
pub struct PinnacleService {
|
pub struct PinnacleService {
|
||||||
sender: StateFnSender,
|
sender: StateFnSender,
|
||||||
}
|
}
|
||||||
|
|
188
src/api/signal.rs
Normal file
188
src/api/signal.rs
Normal file
|
@ -0,0 +1,188 @@
|
||||||
|
use pinnacle_api_defs::pinnacle::signal::v0alpha1::{
|
||||||
|
signal_service_server, LayoutRequest, LayoutResponse, OutputConnectRequest,
|
||||||
|
OutputConnectResponse, StreamControl, WindowPointerEnterRequest, WindowPointerEnterResponse,
|
||||||
|
WindowPointerLeaveRequest, WindowPointerLeaveResponse,
|
||||||
|
};
|
||||||
|
use tokio::sync::mpsc::UnboundedSender;
|
||||||
|
use tonic::{Request, Response, Status, Streaming};
|
||||||
|
|
||||||
|
use crate::state::State;
|
||||||
|
|
||||||
|
use super::{run_bidirectional_streaming, ResponseStream, StateFnSender};
|
||||||
|
|
||||||
|
#[derive(Debug, Default)]
|
||||||
|
pub struct SignalState {
|
||||||
|
pub output_connect: SignalData<OutputConnectResponse>,
|
||||||
|
pub layout: SignalData<LayoutResponse>,
|
||||||
|
pub window_pointer_enter: SignalData<WindowPointerEnterResponse>,
|
||||||
|
pub window_pointer_leave: SignalData<WindowPointerLeaveResponse>,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Default)]
|
||||||
|
pub struct SignalData<T> {
|
||||||
|
sender: Option<UnboundedSender<Result<T, Status>>>,
|
||||||
|
ready: bool,
|
||||||
|
value: Option<T>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<T> SignalData<T> {
|
||||||
|
pub fn signal(&mut self, with_data: impl FnOnce(Option<T>) -> T) {
|
||||||
|
let Some(sender) = self.sender.as_ref() else {
|
||||||
|
return;
|
||||||
|
};
|
||||||
|
|
||||||
|
if self.ready {
|
||||||
|
sender
|
||||||
|
.send(Ok(with_data(self.value.take())))
|
||||||
|
.expect("failed to send signal");
|
||||||
|
self.ready = false;
|
||||||
|
} else {
|
||||||
|
self.value = Some(with_data(self.value.take()));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn connect(&mut self, sender: UnboundedSender<Result<T, Status>>) {
|
||||||
|
self.sender.replace(sender);
|
||||||
|
}
|
||||||
|
|
||||||
|
fn disconnect(&mut self) {
|
||||||
|
self.sender.take();
|
||||||
|
self.ready = false;
|
||||||
|
self.value.take();
|
||||||
|
}
|
||||||
|
|
||||||
|
fn ready(&mut self) {
|
||||||
|
let Some(sender) = self.sender.as_ref() else {
|
||||||
|
return;
|
||||||
|
};
|
||||||
|
|
||||||
|
if let Some(value) = self.value.take() {
|
||||||
|
sender.send(Ok(value)).expect("failed to send signal");
|
||||||
|
self.ready = false;
|
||||||
|
} else {
|
||||||
|
self.ready = true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
trait SignalRequest {
|
||||||
|
fn control(&self) -> StreamControl;
|
||||||
|
}
|
||||||
|
|
||||||
|
macro_rules! impl_signal_request {
|
||||||
|
( $( $request:ident ),* ) => {
|
||||||
|
$(
|
||||||
|
impl SignalRequest for $request {
|
||||||
|
fn control(&self) -> StreamControl {
|
||||||
|
self.control()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
)*
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
impl_signal_request!(
|
||||||
|
OutputConnectRequest,
|
||||||
|
LayoutRequest,
|
||||||
|
WindowPointerEnterRequest,
|
||||||
|
WindowPointerLeaveRequest
|
||||||
|
);
|
||||||
|
|
||||||
|
fn start_signal_stream<I: SignalRequest, O>(
|
||||||
|
sender: StateFnSender,
|
||||||
|
in_stream: Streaming<I>,
|
||||||
|
signal: impl Fn(&mut State) -> &mut SignalData<O> + Clone + Send + 'static,
|
||||||
|
) -> Result<Response<ResponseStream<O>>, Status>
|
||||||
|
where
|
||||||
|
I: Send + 'static,
|
||||||
|
O: Send + 'static,
|
||||||
|
{
|
||||||
|
let signal_clone = signal.clone();
|
||||||
|
|
||||||
|
run_bidirectional_streaming(
|
||||||
|
sender,
|
||||||
|
in_stream,
|
||||||
|
move |state, request| {
|
||||||
|
let request = match request {
|
||||||
|
Ok(request) => request,
|
||||||
|
Err(status) => {
|
||||||
|
tracing::error!("Error in output_connect signal in stream: {status}");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
let signal = signal(state);
|
||||||
|
match request.control() {
|
||||||
|
StreamControl::Ready => signal.ready(),
|
||||||
|
StreamControl::Disconnect => signal.disconnect(),
|
||||||
|
StreamControl::Unspecified => tracing::warn!("Received unspecified stream control"),
|
||||||
|
}
|
||||||
|
},
|
||||||
|
move |state, sender| {
|
||||||
|
let signal = signal_clone(state);
|
||||||
|
signal.connect(sender);
|
||||||
|
},
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
|
pub struct SignalService {
|
||||||
|
sender: StateFnSender,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl SignalService {
|
||||||
|
pub fn new(sender: StateFnSender) -> Self {
|
||||||
|
Self { sender }
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tonic::async_trait]
|
||||||
|
impl signal_service_server::SignalService for SignalService {
|
||||||
|
type OutputConnectStream = ResponseStream<OutputConnectResponse>;
|
||||||
|
type LayoutStream = ResponseStream<LayoutResponse>;
|
||||||
|
type WindowPointerEnterStream = ResponseStream<WindowPointerEnterResponse>;
|
||||||
|
type WindowPointerLeaveStream = ResponseStream<WindowPointerLeaveResponse>;
|
||||||
|
|
||||||
|
async fn output_connect(
|
||||||
|
&self,
|
||||||
|
request: Request<Streaming<OutputConnectRequest>>,
|
||||||
|
) -> Result<Response<Self::OutputConnectStream>, Status> {
|
||||||
|
let in_stream = request.into_inner();
|
||||||
|
|
||||||
|
start_signal_stream(self.sender.clone(), in_stream, |state| {
|
||||||
|
&mut state.signal_state.output_connect
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn layout(
|
||||||
|
&self,
|
||||||
|
request: Request<Streaming<LayoutRequest>>,
|
||||||
|
) -> Result<Response<Self::LayoutStream>, Status> {
|
||||||
|
let in_stream = request.into_inner();
|
||||||
|
|
||||||
|
start_signal_stream(self.sender.clone(), in_stream, |state| {
|
||||||
|
&mut state.signal_state.layout
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn window_pointer_enter(
|
||||||
|
&self,
|
||||||
|
request: Request<Streaming<WindowPointerEnterRequest>>,
|
||||||
|
) -> Result<Response<Self::WindowPointerEnterStream>, Status> {
|
||||||
|
let in_stream = request.into_inner();
|
||||||
|
|
||||||
|
start_signal_stream(self.sender.clone(), in_stream, |state| {
|
||||||
|
&mut state.signal_state.window_pointer_enter
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn window_pointer_leave(
|
||||||
|
&self,
|
||||||
|
request: Request<Streaming<WindowPointerLeaveRequest>>,
|
||||||
|
) -> Result<Response<Self::WindowPointerLeaveStream>, Status> {
|
||||||
|
let in_stream = request.into_inner();
|
||||||
|
|
||||||
|
start_signal_stream(self.sender.clone(), in_stream, |state| {
|
||||||
|
&mut state.signal_state.window_pointer_leave
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
|
@ -1,6 +1,7 @@
|
||||||
use crate::{
|
use crate::{
|
||||||
api::{
|
api::{
|
||||||
InputService, OutputService, PinnacleService, ProcessService, TagService, WindowService,
|
signal::SignalService, InputService, OutputService, PinnacleService, ProcessService,
|
||||||
|
TagService, WindowService,
|
||||||
},
|
},
|
||||||
input::ModifierMask,
|
input::ModifierMask,
|
||||||
output::OutputName,
|
output::OutputName,
|
||||||
|
@ -18,6 +19,7 @@ use pinnacle_api_defs::pinnacle::{
|
||||||
input::v0alpha1::input_service_server::InputServiceServer,
|
input::v0alpha1::input_service_server::InputServiceServer,
|
||||||
output::v0alpha1::{output_service_server::OutputServiceServer, ConnectForAllResponse},
|
output::v0alpha1::{output_service_server::OutputServiceServer, ConnectForAllResponse},
|
||||||
process::v0alpha1::process_service_server::ProcessServiceServer,
|
process::v0alpha1::process_service_server::ProcessServiceServer,
|
||||||
|
signal::v0alpha1::signal_service_server::SignalServiceServer,
|
||||||
tag::v0alpha1::tag_service_server::TagServiceServer,
|
tag::v0alpha1::tag_service_server::TagServiceServer,
|
||||||
v0alpha1::pinnacle_service_server::PinnacleServiceServer,
|
v0alpha1::pinnacle_service_server::PinnacleServiceServer,
|
||||||
window::v0alpha1::window_service_server::WindowServiceServer,
|
window::v0alpha1::window_service_server::WindowServiceServer,
|
||||||
|
@ -447,6 +449,7 @@ impl State {
|
||||||
let tag_service = TagService::new(grpc_sender.clone());
|
let tag_service = TagService::new(grpc_sender.clone());
|
||||||
let output_service = OutputService::new(grpc_sender.clone());
|
let output_service = OutputService::new(grpc_sender.clone());
|
||||||
let window_service = WindowService::new(grpc_sender.clone());
|
let window_service = WindowService::new(grpc_sender.clone());
|
||||||
|
let signal_service = SignalService::new(grpc_sender.clone());
|
||||||
|
|
||||||
let refl_service = tonic_reflection::server::Builder::configure()
|
let refl_service = tonic_reflection::server::Builder::configure()
|
||||||
.register_encoded_file_descriptor_set(pinnacle_api_defs::FILE_DESCRIPTOR_SET)
|
.register_encoded_file_descriptor_set(pinnacle_api_defs::FILE_DESCRIPTOR_SET)
|
||||||
|
@ -464,7 +467,8 @@ impl State {
|
||||||
.add_service(ProcessServiceServer::new(process_service))
|
.add_service(ProcessServiceServer::new(process_service))
|
||||||
.add_service(TagServiceServer::new(tag_service))
|
.add_service(TagServiceServer::new(tag_service))
|
||||||
.add_service(OutputServiceServer::new(output_service))
|
.add_service(OutputServiceServer::new(output_service))
|
||||||
.add_service(WindowServiceServer::new(window_service));
|
.add_service(WindowServiceServer::new(window_service))
|
||||||
|
.add_service(SignalServiceServer::new(signal_service));
|
||||||
|
|
||||||
match self.xdisplay.as_ref() {
|
match self.xdisplay.as_ref() {
|
||||||
Some(_) => {
|
Some(_) => {
|
||||||
|
|
|
@ -1,7 +1,7 @@
|
||||||
// SPDX-License-Identifier: GPL-3.0-or-later
|
// SPDX-License-Identifier: GPL-3.0-or-later
|
||||||
|
|
||||||
use crate::{
|
use crate::{
|
||||||
backend::Backend, config::Config, cursor::Cursor, focus::FocusState,
|
api::signal::SignalState, backend::Backend, config::Config, cursor::Cursor, focus::FocusState,
|
||||||
grab::resize_grab::ResizeSurfaceState, window::WindowElement,
|
grab::resize_grab::ResizeSurfaceState, window::WindowElement,
|
||||||
};
|
};
|
||||||
use anyhow::Context;
|
use anyhow::Context;
|
||||||
|
@ -93,6 +93,8 @@ pub struct State {
|
||||||
pub grpc_server_join_handle: Option<tokio::task::JoinHandle<()>>,
|
pub grpc_server_join_handle: Option<tokio::task::JoinHandle<()>>,
|
||||||
|
|
||||||
pub xdg_base_dirs: BaseDirectories,
|
pub xdg_base_dirs: BaseDirectories,
|
||||||
|
|
||||||
|
pub signal_state: SignalState,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl State {
|
impl State {
|
||||||
|
@ -269,6 +271,8 @@ impl State {
|
||||||
|
|
||||||
xdg_base_dirs: BaseDirectories::with_prefix("pinnacle")
|
xdg_base_dirs: BaseDirectories::with_prefix("pinnacle")
|
||||||
.context("couldn't create xdg BaseDirectories")?,
|
.context("couldn't create xdg BaseDirectories")?,
|
||||||
|
|
||||||
|
signal_state: SignalState::default(),
|
||||||
};
|
};
|
||||||
|
|
||||||
Ok(state)
|
Ok(state)
|
||||||
|
|
Loading…
Add table
Reference in a new issue