Make connect_for_all use signals

This commit is contained in:
Ottatop 2024-02-21 23:30:28 -06:00
parent 7d94fc4362
commit 6b2b7066ac
7 changed files with 8 additions and 45 deletions

View file

@ -159,11 +159,9 @@ function output.connect_for_all(callback)
callback(handle) callback(handle)
end end
client.server_streaming_request(build_grpc_request_params("ConnectForAll", {}), function(response) output.connect_signal({
local output_name = response.output_name connect = callback,
local handle = output_handle.new(output_name) })
callback(handle)
end)
end end
---@class OutputSignal ---@class OutputSignal

View file

@ -186,7 +186,7 @@ end
---@param callback fun(response: table) ---@param callback fun(response: table)
function signal.connect(request, callback) function signal.connect(request, callback)
local stream = client.bidirectional_streaming_request( local stream = client.bidirectional_streaming_request(
build_grpc_request_params("Layout", { build_grpc_request_params(request, {
control = stream_control.READY, control = stream_control.READY,
}), }),
function(response) function(response)

View file

@ -10,11 +10,6 @@ message SetLocationRequest {
optional int32 y = 3; optional int32 y = 3;
} }
message ConnectForAllRequest {}
message ConnectForAllResponse {
optional string output_name = 1;
}
message GetRequest {} message GetRequest {}
message GetResponse { message GetResponse {
repeated string output_names = 1; repeated string output_names = 1;
@ -41,7 +36,6 @@ message GetPropertiesResponse {
service OutputService { service OutputService {
rpc SetLocation(SetLocationRequest) returns (google.protobuf.Empty); rpc SetLocation(SetLocationRequest) returns (google.protobuf.Empty);
rpc ConnectForAll(ConnectForAllRequest) returns (stream ConnectForAllResponse);
rpc Get(GetRequest) returns (GetResponse); rpc Get(GetRequest) returns (GetResponse);
rpc GetProperties(GetPropertiesRequest) returns (GetPropertiesResponse); rpc GetProperties(GetPropertiesRequest) returns (GetPropertiesResponse);
} }

View file

@ -2,8 +2,6 @@ syntax = "proto2";
package pinnacle.signal.v0alpha1; package pinnacle.signal.v0alpha1;
import "google/protobuf/empty.proto";
enum StreamControl { enum StreamControl {
STREAM_CONTROL_UNSPECIFIED = 0; STREAM_CONTROL_UNSPECIFIED = 0;
// The client is ready to receive the next signal. // The client is ready to receive the next signal.

View file

@ -12,9 +12,7 @@ use pinnacle_api_defs::pinnacle::{
}, },
output::{ output::{
self, self,
v0alpha1::{ v0alpha1::{output_service_server, SetLocationRequest},
output_service_server, ConnectForAllRequest, ConnectForAllResponse, SetLocationRequest,
},
}, },
process::v0alpha1::{process_service_server, SetEnvRequest, SpawnRequest, SpawnResponse}, process::v0alpha1::{process_service_server, SetEnvRequest, SpawnRequest, SpawnResponse},
tag::{ tag::{
@ -944,8 +942,6 @@ impl OutputService {
#[tonic::async_trait] #[tonic::async_trait]
impl output_service_server::OutputService for OutputService { impl output_service_server::OutputService for OutputService {
type ConnectForAllStream = ResponseStream<ConnectForAllResponse>;
async fn set_location( async fn set_location(
&self, &self,
request: Request<SetLocationRequest>, request: Request<SetLocationRequest>,
@ -997,18 +993,6 @@ impl output_service_server::OutputService for OutputService {
.await .await
} }
// TODO: remove this and integrate it into a signal/event system
async fn connect_for_all(
&self,
_request: Request<ConnectForAllRequest>,
) -> Result<Response<Self::ConnectForAllStream>, Status> {
tracing::trace!("OutputService.connect_for_all");
run_server_streaming(&self.sender, move |state, sender| {
state.config.output_callback_senders.push(sender);
})
}
async fn get( async fn get(
&self, &self,
_request: Request<output::v0alpha1::GetRequest>, _request: Request<output::v0alpha1::GetRequest>,

View file

@ -8,9 +8,7 @@ use std::{
}; };
use anyhow::Context; use anyhow::Context;
use pinnacle_api_defs::pinnacle::{ use pinnacle_api_defs::pinnacle::signal::v0alpha1::OutputConnectResponse;
output::v0alpha1::ConnectForAllResponse, signal::v0alpha1::OutputConnectResponse,
};
use smithay::{ use smithay::{
backend::{ backend::{
allocator::{ allocator::{
@ -985,13 +983,6 @@ impl State {
output.with_state(|state| state.tags = tags.clone()); output.with_state(|state| state.tags = tags.clone());
} else { } else {
// Run any output callbacks
for sender in self.config.output_callback_senders.iter() {
let _ = sender.send(Ok(ConnectForAllResponse {
output_name: Some(output.name()),
}));
}
self.signal_state.output_connect.signal(|buffer| { self.signal_state.output_connect.signal(|buffer| {
buffer.push_back(OutputConnectResponse { buffer.push_back(OutputConnectResponse {
output_name: Some(output.name()), output_name: Some(output.name()),

View file

@ -17,7 +17,7 @@ use std::{
use anyhow::Context; use anyhow::Context;
use pinnacle_api_defs::pinnacle::{ use pinnacle_api_defs::pinnacle::{
input::v0alpha1::input_service_server::InputServiceServer, input::v0alpha1::input_service_server::InputServiceServer,
output::v0alpha1::{output_service_server::OutputServiceServer, ConnectForAllResponse}, output::v0alpha1::output_service_server::OutputServiceServer,
process::v0alpha1::process_service_server::ProcessServiceServer, process::v0alpha1::process_service_server::ProcessServiceServer,
signal::v0alpha1::signal_service_server::SignalServiceServer, signal::v0alpha1::signal_service_server::SignalServiceServer,
tag::v0alpha1::tag_service_server::TagServiceServer, tag::v0alpha1::tag_service_server::TagServiceServer,
@ -30,7 +30,7 @@ use smithay::{
utils::{Logical, Point}, utils::{Logical, Point},
}; };
use sysinfo::ProcessRefreshKind; use sysinfo::ProcessRefreshKind;
use tokio::{sync::mpsc::UnboundedSender, task::JoinHandle}; use tokio::task::JoinHandle;
use toml::Table; use toml::Table;
use xdg::BaseDirectories; use xdg::BaseDirectories;
@ -166,7 +166,6 @@ pub enum Key {
pub struct Config { pub struct Config {
/// Window rules and conditions on when those rules should apply /// Window rules and conditions on when those rules should apply
pub window_rules: Vec<(WindowRuleCondition, WindowRule)>, pub window_rules: Vec<(WindowRuleCondition, WindowRule)>,
pub output_callback_senders: Vec<UnboundedSender<Result<ConnectForAllResponse, tonic::Status>>>,
/// Saved states when outputs are disconnected /// Saved states when outputs are disconnected
pub connector_saved_states: HashMap<OutputName, ConnectorSavedState>, pub connector_saved_states: HashMap<OutputName, ConnectorSavedState>,
@ -177,7 +176,6 @@ pub struct Config {
impl Config { impl Config {
pub fn clear(&mut self, loop_handle: &LoopHandle<State>) { pub fn clear(&mut self, loop_handle: &LoopHandle<State>) {
self.window_rules.clear(); self.window_rules.clear();
self.output_callback_senders.clear();
self.connector_saved_states.clear(); self.connector_saved_states.clear();
if let Some(join_handle) = self.config_join_handle.take() { if let Some(join_handle) = self.config_join_handle.take() {
join_handle.abort(); join_handle.abort();