diff --git a/rkvm-client/src/client.rs b/rkvm-client/src/client.rs index 6ea5a35..4fd3aa0 100644 --- a/rkvm-client/src/client.rs +++ b/rkvm-client/src/client.rs @@ -140,22 +140,17 @@ pub async fn run( log::info!("Destroyed device {}", id); } - Update::EventBatch { id, events } => { + Update::Event { id, event } => { let writer = writers.get_mut(&id).ok_or_else(|| { Error::Network(io::Error::new( io::ErrorKind::InvalidData, - "Server sent events to a nonexistent device", + "Server sent an event to a nonexistent device", )) })?; - writer.write(&events).await.map_err(Error::Input)?; + writer.write(&event).await.map_err(Error::Input)?; - log::trace!( - "Wrote {} event{} to device {}", - events.len(), - if events.len() == 1 { "" } else { "s" }, - id - ); + log::trace!("Wrote an event to device {}", id); } } } diff --git a/rkvm-client/src/main.rs b/rkvm-client/src/main.rs index 499e4bd..bdc5da9 100644 --- a/rkvm-client/src/main.rs +++ b/rkvm-client/src/main.rs @@ -7,8 +7,7 @@ use config::Config; use log::LevelFilter; use std::path::PathBuf; use std::process::ExitCode; -use tokio::fs; -use tokio::signal; +use tokio::{fs, signal}; #[derive(Parser)] #[structopt(name = "rkvm-client", about = "The rkvm client application")] diff --git a/rkvm-client/src/tls.rs b/rkvm-client/src/tls.rs index 725a8e0..a32832a 100644 --- a/rkvm-client/src/tls.rs +++ b/rkvm-client/src/tls.rs @@ -1,5 +1,6 @@ +use std::io; +use std::path::Path; use std::sync::Arc; -use std::{io, path::Path}; use thiserror::Error; use tokio::fs; use tokio_rustls::rustls::{self, Certificate, ClientConfig, RootCertStore}; diff --git a/rkvm-input/src/abs.rs b/rkvm-input/src/abs.rs index 9f184b5..a42f323 100644 --- a/rkvm-input/src/abs.rs +++ b/rkvm-input/src/abs.rs @@ -2,25 +2,12 @@ use crate::glue; use serde::{Deserialize, Serialize}; -// #define ABS_MT_SLOT 0x2f /* MT slot being modified */ -// #define ABS_MT_TOUCH_MAJOR 0x30 /* Major axis of touching ellipse */ -// #define ABS_MT_TOUCH_MINOR 0x31 /* Minor axis (omit if circular) */ -// #define ABS_MT_WIDTH_MAJOR 0x32 /* Major axis of approaching ellipse */ -// #define ABS_MT_WIDTH_MINOR 0x33 /* Minor axis (omit if circular) */ -// #define ABS_MT_ORIENTATION 0x34 /* Ellipse orientation */ -// #define ABS_MT_POSITION_X 0x35 /* Center X touch position */ -// #define ABS_MT_POSITION_Y 0x36 /* Center Y touch position */ -// #define ABS_MT_TOOL_TYPE 0x37 /* Type of touching device */ -// #define ABS_MT_BLOB_ID 0x38 /* Group a set of packets as a blob */ -// #define ABS_MT_TRACKING_ID 0x39 /* Unique ID of initiated contact */ -// #define ABS_MT_PRESSURE 0x3a /* Pressure on contact area */ -// #define ABS_MT_DISTANCE 0x3b /* Contact hover distance */ -// #define ABS_MT_TOOL_X 0x3c /* Center X tool position */ -// #define ABS_MT_TOOL_Y 0x3d /* Center Y tool position */ #[derive(Clone, Copy, Debug, Serialize, Deserialize)] -pub struct AbsEvent { - pub axis: AbsAxis, - pub value: i32, +pub enum AbsEvent { + Axis { axis: AbsAxis, value: i32 }, + MtToolType { value: ToolType }, + // TODO: This might actually belong to the Axis variant. + MtBlobId { value: i32 }, } #[derive(Clone, Copy, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)] @@ -52,6 +39,19 @@ pub enum AbsAxis { Volume, Profile, Misc, + MtSlot, + MtTouchMajor, + MtTouchMinor, + MtWidthMajor, + MtWidthMinor, + MtOrientation, + MtPositionX, + MtPositionY, + MtTrackingId, + MtPressure, + MtDistance, + MtToolX, + MtToolY, } impl AbsAxis { @@ -84,6 +84,19 @@ impl AbsAxis { glue::ABS_VOLUME => Self::Volume, glue::ABS_PROFILE => Self::Profile, glue::ABS_MISC => Self::Misc, + glue::ABS_MT_SLOT => Self::MtSlot, + glue::ABS_MT_TOUCH_MAJOR => Self::MtTouchMajor, + glue::ABS_MT_TOUCH_MINOR => Self::MtTouchMinor, + glue::ABS_MT_WIDTH_MAJOR => Self::MtWidthMajor, + glue::ABS_MT_WIDTH_MINOR => Self::MtWidthMinor, + glue::ABS_MT_ORIENTATION => Self::MtOrientation, + glue::ABS_MT_POSITION_X => Self::MtPositionX, + glue::ABS_MT_POSITION_Y => Self::MtPositionY, + glue::ABS_MT_TRACKING_ID => Self::MtTrackingId, + glue::ABS_MT_PRESSURE => Self::MtPressure, + glue::ABS_MT_DISTANCE => Self::MtDistance, + glue::ABS_MT_TOOL_X => Self::MtToolX, + glue::ABS_MT_TOOL_Y => Self::MtToolY, _ => return None, }; @@ -119,6 +132,19 @@ impl AbsAxis { Self::Volume => glue::ABS_VOLUME, Self::Profile => glue::ABS_PROFILE, Self::Misc => glue::ABS_MISC, + Self::MtSlot => glue::ABS_MT_SLOT, + Self::MtTouchMajor => glue::ABS_MT_TOUCH_MAJOR, + Self::MtTouchMinor => glue::ABS_MT_TOUCH_MINOR, + Self::MtWidthMajor => glue::ABS_MT_WIDTH_MAJOR, + Self::MtWidthMinor => glue::ABS_MT_WIDTH_MINOR, + Self::MtOrientation => glue::ABS_MT_ORIENTATION, + Self::MtPositionX => glue::ABS_MT_POSITION_X, + Self::MtPositionY => glue::ABS_MT_POSITION_Y, + Self::MtTrackingId => glue::ABS_MT_TRACKING_ID, + Self::MtPressure => glue::ABS_MT_PRESSURE, + Self::MtDistance => glue::ABS_MT_DISTANCE, + Self::MtToolX => glue::ABS_MT_TOOL_X, + Self::MtToolY => glue::ABS_MT_TOOL_Y, }; code as _ @@ -135,10 +161,35 @@ pub struct AbsInfo { pub resolution: i32, } -#[derive(Serialize, Deserialize, Debug)] +#[derive(Clone, Copy, Serialize, Deserialize, Debug)] pub enum ToolType { Finger, Pen, Palm, Dial, } + +impl ToolType { + pub(crate) fn from_raw(value: i32) -> Option { + let value = match value as _ { + glue::MT_TOOL_FINGER => Self::Finger, + glue::MT_TOOL_PEN => Self::Pen, + glue::MT_TOOL_PALM => Self::Palm, + glue::MT_TOOL_DIAL => Self::Dial, + _ => return None, + }; + + Some(value) + } + + pub(crate) fn to_raw(&self) -> i32 { + let value = match self { + Self::Finger => glue::MT_TOOL_FINGER, + Self::Pen => glue::MT_TOOL_PEN, + Self::Palm => glue::MT_TOOL_PALM, + Self::Dial => glue::MT_TOOL_DIAL, + }; + + value as _ + } +} diff --git a/rkvm-input/src/event.rs b/rkvm-input/src/event.rs index 0e6c602..38744a6 100644 --- a/rkvm-input/src/event.rs +++ b/rkvm-input/src/event.rs @@ -1,15 +1,14 @@ use crate::abs::AbsEvent; use crate::key::KeyEvent; use crate::rel::RelEvent; +use crate::sync::SyncEvent; use serde::{Deserialize, Serialize}; -use smallvec::SmallVec; - -pub type Packet = SmallVec<[Event; 2]>; #[derive(Debug, Serialize, Deserialize)] pub enum Event { Rel(RelEvent), Abs(AbsEvent), Key(KeyEvent), + Sync(SyncEvent), } diff --git a/rkvm-input/src/interceptor.rs b/rkvm-input/src/interceptor.rs index a406028..db77c50 100644 --- a/rkvm-input/src/interceptor.rs +++ b/rkvm-input/src/interceptor.rs @@ -2,17 +2,18 @@ mod caps; pub use caps::{AbsCaps, KeyCaps, RelCaps}; -use crate::abs::{AbsAxis, AbsEvent}; -use crate::event::{Event, Packet}; +use crate::abs::{AbsAxis, AbsEvent, ToolType}; +use crate::event::Event; use crate::glue::{self, libevdev}; use crate::key::{Key, KeyEvent}; use crate::rel::{RelAxis, RelEvent}; +use crate::sync::SyncEvent; use crate::writer::Writer; +use std::collections::VecDeque; use std::ffi::CStr; use std::fs::{File, OpenOptions}; use std::io::{Error, ErrorKind}; -use std::mem; use std::mem::MaybeUninit; use std::os::fd::AsRawFd; use std::os::unix::prelude::OpenOptionsExt; @@ -27,113 +28,82 @@ pub struct Interceptor { evdev: NonNull, writer: Writer, // The state of `read` is stored here to make it cancel safe. - events: Packet, - wrote: bool, + events: VecDeque, + writing: Option<(u16, u16, i32)>, dropped: bool, - writing: Option, } impl Interceptor { - pub async fn read(&mut self) -> Result { - if let Some(writing) = self.writing { - let (r#type, code, value) = match writing { - Writing::Event { - r#type, - code, - value, - } => (r#type, code, value), - Writing::Sync => (glue::EV_SYN as _, glue::SYN_REPORT as _, 0), - }; + pub async fn read(&mut self) -> Result { + if let Some((r#type, code, value)) = self.writing { + log::trace!("Resuming interrupted write"); self.writer.write_raw(r#type, code, value).await?; self.writing = None; } - loop { - loop { - let (r#type, code, value) = self.read_raw().await?; - let event = match r#type as _ { - glue::EV_REL if !self.dropped => { - RelAxis::from_raw(code).map(|axis| Event::Rel(RelEvent { axis, value })) + while !matches!(self.events.back(), Some(Event::Sync(SyncEvent::All))) { + let (r#type, code, value) = self.read_raw().await?; + let event = match r#type as _ { + glue::EV_REL if !self.dropped => { + RelAxis::from_raw(code).map(|axis| Event::Rel(RelEvent { axis, value })) + } + glue::EV_ABS if !self.dropped => match code as _ { + glue::ABS_MT_TOOL_TYPE => { + ToolType::from_raw(value).map(|value| AbsEvent::MtToolType { value }) } - glue::EV_ABS if !self.dropped => { - AbsAxis::from_raw(code).map(|axis| Event::Abs(AbsEvent { axis, value })) - } - glue::EV_KEY if !self.dropped && (value == 0 || value == 1) => { - Key::from_raw(code).map(|key| { - Event::Key(KeyEvent { - key, - down: value == 1, - }) + glue::ABS_MT_BLOB_ID => Some(AbsEvent::MtBlobId { value }), + _ => AbsAxis::from_raw(code).map(|axis| AbsEvent::Axis { axis, value }), + } + .map(Event::Abs), + glue::EV_KEY if !self.dropped && (value == 0 || value == 1) => Key::from_raw(code) + .map(|key| { + Event::Key(KeyEvent { + key, + down: value == 1, }) - } - glue::EV_SYN => match code as _ { - glue::SYN_REPORT => { - if self.dropped { - self.dropped = false; - continue; - } - - break; - } - glue::SYN_DROPPED => { - log::warn!( - "Dropped {} event{}", - self.events.len(), - if self.events.len() == 1 { "" } else { "s" } - ); - - self.events.clear(); - self.dropped = true; + }), + glue::EV_SYN => match code as _ { + glue::SYN_REPORT => { + if self.dropped { + self.dropped = false; continue; } - _ => continue, - }, - _ => None, - }; - if let Some(event) = event { - self.events.push(event); - continue; - } + Some(Event::Sync(SyncEvent::All)) + } + glue::SYN_DROPPED => { + log::warn!( + "Dropped {} event{}", + self.events.len(), + if self.events.len() == 1 { "" } else { "s" } + ); - log::trace!( - "Writing back unknown event (type {}, code {}, value {})", - r#type, - code, - value - ); + self.events.clear(); + self.dropped = true; + continue; + } + glue::SYN_MT_REPORT if !self.dropped => Some(Event::Sync(SyncEvent::Mt)), + _ => continue, + }, + _ => None, + }; - self.writing = Some(Writing::Event { - r#type, - code, - value, - }); - self.writer.write_raw(r#type, code, value).await?; - self.writing = None; - self.wrote = true; + if let Some(event) = event { + self.events.push_back(event); + continue; } - // Write an EV_SYN only if we actually wrote something back. - if self.wrote { - self.writing = Some(Writing::Sync); - self.writer - .write_raw(glue::EV_SYN as _, glue::SYN_REPORT as _, 0) - .await?; - self.writing = None; - self.wrote = false; - } - - if !self.events.is_empty() { - return Ok(mem::take(&mut self.events)); - } - - // At this point, we received an EV_SYN, but no actual events useful to us, so try again. + self.writing = Some((r#type, code, value)); + self.writer.write_raw(r#type, code, value).await?; + self.writing = None; } + + Ok(self.events.pop_front().unwrap()) } - pub async fn write(&mut self, events: &[Event]) -> Result<(), Error> { - self.writer.write(events).await + pub async fn write(&mut self, event: &Event) -> Result<(), Error> { + self.writer.write(event).await } pub fn name(&self) -> &CStr { @@ -276,9 +246,7 @@ impl Interceptor { file, evdev, writer, - - events: Packet::new(), - wrote: false, + events: VecDeque::new(), dropped: false, writing: None, }) @@ -295,12 +263,6 @@ impl Drop for Interceptor { unsafe impl Send for Interceptor {} -#[derive(Clone, Copy)] -enum Writing { - Event { r#type: u16, code: u16, value: i32 }, - Sync, -} - #[derive(Error, Debug)] pub(crate) enum OpenError { #[error("Not appliable")] diff --git a/rkvm-input/src/lib.rs b/rkvm-input/src/lib.rs index 11c5110..38d3b69 100644 --- a/rkvm-input/src/lib.rs +++ b/rkvm-input/src/lib.rs @@ -4,10 +4,7 @@ pub mod interceptor; pub mod key; pub mod monitor; pub mod rel; +pub mod sync; pub mod writer; mod glue; - -pub use event::{Event, Packet}; -pub use interceptor::Interceptor; -pub use monitor::Monitor; diff --git a/rkvm-input/src/sync.rs b/rkvm-input/src/sync.rs new file mode 100644 index 0000000..5477852 --- /dev/null +++ b/rkvm-input/src/sync.rs @@ -0,0 +1,20 @@ +use crate::glue; + +use serde::{Deserialize, Serialize}; + +#[derive(Clone, Copy, Debug, Serialize, Deserialize)] +pub enum SyncEvent { + All, + Mt, +} + +impl SyncEvent { + pub(crate) fn to_raw(&self) -> u16 { + let code = match self { + Self::All => glue::SYN_REPORT, + Self::Mt => glue::SYN_MT_REPORT, + }; + + code as _ + } +} diff --git a/rkvm-input/src/writer.rs b/rkvm-input/src/writer.rs index b35c82a..70ed3d8 100644 --- a/rkvm-input/src/writer.rs +++ b/rkvm-input/src/writer.rs @@ -1,7 +1,7 @@ use crate::abs::{AbsAxis, AbsEvent, AbsInfo}; use crate::event::Event; use crate::glue::{self, input_absinfo, libevdev, libevdev_uinput}; -use crate::key::{Key, KeyEvent, Keyboard}; +use crate::key::{Key, KeyEvent}; use crate::rel::{RelAxis, RelEvent}; use std::ffi::{CStr, OsStr}; @@ -12,8 +12,8 @@ use std::os::fd::AsRawFd; use std::os::unix::ffi::OsStrExt; use std::os::unix::prelude::OpenOptionsExt; use std::path::Path; +use std::ptr; use std::ptr::NonNull; -use std::{iter, ptr}; use tokio::io::unix::AsyncFd; pub struct Writer { @@ -26,20 +26,21 @@ impl Writer { WriterBuilder::new() } - pub async fn write(&mut self, events: &[Event]) -> Result<(), Error> { - let events = events - .iter() - .map(|event| match event { - Event::Rel(RelEvent { axis, value }) => (glue::EV_REL, axis.to_raw(), *value), - Event::Abs(AbsEvent { axis, value }) => (glue::EV_ABS, axis.to_raw(), *value), - Event::Key(KeyEvent { down, key }) => (glue::EV_KEY, key.to_raw(), *down as _), - }) - .chain(iter::once((glue::EV_SYN, glue::SYN_REPORT as _, 0))); - - for (r#type, code, value) in events { - self.write_raw(r#type as _, code, value).await?; - } + pub async fn write(&mut self, event: &Event) -> Result<(), Error> { + let (r#type, code, value) = match event { + Event::Rel(RelEvent { axis, value }) => (glue::EV_REL, axis.to_raw(), *value), + Event::Abs(event) => match event { + AbsEvent::Axis { axis, value } => (glue::EV_ABS, axis.to_raw(), *value), + AbsEvent::MtToolType { value } => { + (glue::EV_ABS, glue::ABS_MT_TOOL_TYPE as _, value.to_raw()) + } + AbsEvent::MtBlobId { value } => (glue::EV_ABS, glue::ABS_MT_BLOB_ID as _, *value), + }, + Event::Key(KeyEvent { down, key }) => (glue::EV_KEY, key.to_raw(), *down as _), + Event::Sync(event) => (glue::EV_SYN, event.to_raw(), 0), + }; + self.write_raw(r#type as _, code, value).await?; Ok(()) } @@ -184,6 +185,19 @@ impl WriterBuilder { &mut self, items: T, ) -> Result<&mut Self, Error> { + let ret = unsafe { + glue::libevdev_enable_event_code( + self.evdev.as_ptr(), + glue::EV_SYN, + glue::SYN_MT_REPORT, + ptr::null(), + ) + }; + + if ret < 0 { + return Err(Error::from_raw_os_error(-ret)); + } + for (axis, info) in items { let info = input_absinfo { value: info.min, diff --git a/rkvm-net/src/auth.rs b/rkvm-net/src/auth.rs index c3d9de3..335fcf6 100644 --- a/rkvm-net/src/auth.rs +++ b/rkvm-net/src/auth.rs @@ -1,7 +1,6 @@ use hmac::{Hmac, Mac}; use rand::rngs::OsRng; -use rand::Error; -use rand::Rng; +use rand::{Error, Rng}; use serde::{Deserialize, Serialize}; use sha2::Sha256; use tokio::task; diff --git a/rkvm-net/src/lib.rs b/rkvm-net/src/lib.rs index 9e00ca1..32d339e 100644 --- a/rkvm-net/src/lib.rs +++ b/rkvm-net/src/lib.rs @@ -3,9 +3,9 @@ pub mod message; pub mod version; use rkvm_input::abs::{AbsAxis, AbsInfo}; +use rkvm_input::event::Event; use rkvm_input::key::Key; use rkvm_input::rel::RelAxis; -use rkvm_input::Packet; use serde::{Deserialize, Serialize}; use std::collections::{HashMap, HashSet}; use std::ffi::CString; @@ -21,13 +21,12 @@ pub enum Update { rel: HashSet, abs: HashMap, keys: HashSet, - // Multitouch events intentionally omitted. }, DestroyDevice { id: usize, }, - EventBatch { + Event { id: usize, - events: Packet, + event: Event, }, } diff --git a/rkvm-server/src/main.rs b/rkvm-server/src/main.rs index 4da77f9..df70ccc 100644 --- a/rkvm-server/src/main.rs +++ b/rkvm-server/src/main.rs @@ -9,9 +9,7 @@ use std::future; use std::path::PathBuf; use std::process::ExitCode; use std::time::Duration; -use tokio::fs; -use tokio::signal; -use tokio::time; +use tokio::{fs, signal, time}; #[derive(Parser)] #[structopt(name = "rkvm-server", about = "The rkvm server application")] diff --git a/rkvm-server/src/server.rs b/rkvm-server/src/server.rs index fe801a7..5e105e6 100644 --- a/rkvm-server/src/server.rs +++ b/rkvm-server/src/server.rs @@ -1,13 +1,14 @@ use rkvm_input::abs::{AbsAxis, AbsInfo}; +use rkvm_input::event::Event; use rkvm_input::key::{Key, KeyEvent, Keyboard}; +use rkvm_input::monitor::Monitor; use rkvm_input::rel::RelAxis; -use rkvm_input::{Event, Interceptor, Monitor, Packet}; use rkvm_net::auth::{AuthChallenge, AuthResponse, AuthStatus}; use rkvm_net::message::Message; use rkvm_net::version::Version; use rkvm_net::Update; use slab::Slab; -use std::collections::{HashMap, HashSet}; +use std::collections::{HashMap, HashSet, VecDeque}; use std::ffi::CString; use std::io::{self, ErrorKind}; use std::net::SocketAddr; @@ -15,6 +16,7 @@ use std::time::Duration; use thiserror::Error; use tokio::io::{AsyncWriteExt, BufStream}; use tokio::net::{TcpListener, TcpStream}; +use tokio::sync::mpsc::error::TrySendError; use tokio::sync::mpsc::{self, Receiver, Sender}; use tokio::time; use tokio_rustls::TlsAcceptor; @@ -25,6 +27,8 @@ pub enum Error { Network(io::Error), #[error("Input error: {0}")] Input(io::Error), + #[error("Event queue overflow")] + Overflow, } pub async fn run( @@ -59,9 +63,9 @@ pub async fn run( current = 0; } - let (sender, receiver) = mpsc::channel(devices.len()); - for (id, device) in &devices { - let update = Update::CreateDevice { + let init_updates = devices + .iter() + .map(|(id, device)| Update::CreateDevice { id, name: device.name.clone(), version: device.version, @@ -70,17 +74,16 @@ pub async fn run( rel: device.rel.clone(), abs: device.abs.clone(), keys: device.keys.clone(), - }; - - sender.try_send(update).unwrap(); - } + }) + .collect(); + let (sender, receiver) = mpsc::channel(1); clients.insert(sender); tokio::spawn(async move { log::info!("{}: Connected", addr); - match client(receiver, stream, addr, acceptor, &password).await { + match client(init_updates, receiver, stream, addr, acceptor, &password).await { Ok(()) => log::info!("{}: Disconnected", addr), Err(err) => log::error!("{}: Disconnected: {}", addr, err), } @@ -113,7 +116,7 @@ pub async fn run( let _ = sender.send(update).await; } - let (interceptor_sender, mut interceptor_receiver) = mpsc::channel::(1); + let (interceptor_sender, mut interceptor_receiver) = mpsc::channel(32); devices.insert(Device { name, version, @@ -129,18 +132,18 @@ pub async fn run( tokio::spawn(async move { loop { tokio::select! { - events = interceptor.read() => { - if events.is_err() | events_sender.send((id, events)).await.is_err() { + event = interceptor.read() => { + if event.is_err() | events_sender.send((id, event)).await.is_err() { break; } } - events = interceptor_receiver.recv() => { - let events = match events { - Some(events) => events, + event = interceptor_receiver.recv() => { + let event = match event { + Some(event) => event, None => break, }; - match interceptor.write(&events).await { + match interceptor.write(&event).await { Ok(()) => {}, Err(err) => { let _ = events_sender.send((id, Err(err))).await; @@ -148,11 +151,7 @@ pub async fn run( } } - log::trace!( - "Wrote {} event{}", - events.len(), - if events.len() == 1 { "" } else { "s" } - ); + log::trace!("Wrote an event to device {}", id); } } } @@ -170,25 +169,18 @@ pub async fn run( ); } (id, result) = event => match result { - Ok(events) => { + Ok(event) => { let mut changed = false; - for event in &events { - let (key, down) = match event { - Event::Key(KeyEvent { key: Key::Key(key), down }) => (key, down), - _ => continue, - }; + if let Event::Key(KeyEvent { key: Key::Key(key), down }) = event { + if switch_keys.contains(&key) { + changed = true; - if !switch_keys.contains(key) { - continue; + match down { + true => pressed_keys.insert(key), + false => pressed_keys.remove(&key), + }; } - - changed = true; - - match down { - true => pressed_keys.insert(*key), - false => pressed_keys.remove(key), - }; } // Who to send this batch of events to. @@ -202,14 +194,23 @@ pub async fn run( break; } } + + log::debug!("Switched to client {}", current); } + // Index 0 - special case to keep the modular arithmetic above working. if idx == 0 { - let _ = devices[id].sender.send(events).await; - continue; + // We do a try_send() here rather than a "blocking" send in order to prevent deadlocks. + // In this scenario, the interceptor task is sending events to the main task, + // while the main task is simultaneously sending events back to the interceptor. + // This creates a classic deadlock situation where both tasks are waiting for each other. + match devices[id].sender.try_send(event) { + Ok(()) | Err(TrySendError::Closed(_)) => continue, + Err(TrySendError::Full(_)) => return Err(Error::Overflow), + } } - if clients[idx - 1].send(Update::EventBatch { id, events }).await.is_err() { + if clients[idx - 1].send(Update::Event { id, event }).await.is_err() { clients.remove(idx - 1); if current == idx { @@ -239,7 +240,7 @@ struct Device { rel: HashSet, abs: HashMap, keys: HashSet, - sender: Sender, + sender: Sender, } #[derive(Error, Debug)] @@ -255,6 +256,7 @@ enum ClientError { } async fn client( + mut init_updates: VecDeque, mut receiver: Receiver, stream: TcpStream, addr: SocketAddr, @@ -308,9 +310,32 @@ async fn client( .await .map_err(|_| io::Error::new(ErrorKind::TimedOut, "Negotiation took too long"))??; - while let Some(update) = receiver.recv().await { + loop { + let recv = async { + match init_updates.pop_front() { + Some(update) => Some((update, false)), + None => receiver.recv().await.map(|update| (update, true)), + } + }; + + let (update, more) = match recv.await { + Some(update) => update, + None => break, + }; + + let mut count = 1; + let write = async { update.encode(&mut stream).await?; + + // Coalesce multiple updates into one chunk. + if more { + while let Ok(update) = receiver.try_recv() { + update.encode(&mut stream).await?; + count += 1; + } + } + stream.flush().await }; @@ -318,7 +343,12 @@ async fn client( .await .map_err(|_| io::Error::new(ErrorKind::TimedOut, "Update writing took too long"))??; - log::trace!("{}: Wrote an update", addr); + log::trace!( + "{}: Wrote {} update{}", + addr, + count, + if count == 1 { "" } else { "s" } + ); } Ok(())