Fix SYN events, high res mouse scrolling

This commit is contained in:
Jan Trefil 2023-04-18 20:30:24 +02:00
parent 1bcc9103b4
commit c9cfa4dd69
12 changed files with 260 additions and 207 deletions

14
Cargo.lock generated
View file

@ -522,9 +522,9 @@ checksum = "9a3a5bfb195931eeb336b2a7b4d761daec841b97f947d34394601737a7bba5e4"
[[package]]
name = "inotify"
version = "0.9.6"
version = "0.10.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f8069d3ec154eb856955c1c0fbffefbf5f3c40a104ec912d4797314c1801abff"
checksum = "abf888f9575c290197b2c948dc9e9ff10bd1a39ad1ea8585f734585fa6b9d3f9"
dependencies = [
"bitflags",
"futures-core",
@ -831,6 +831,7 @@ dependencies = [
"libc",
"pkg-config",
"serde",
"smallvec",
"tokio",
"winapi",
]
@ -985,6 +986,15 @@ dependencies = [
"autocfg",
]
[[package]]
name = "smallvec"
version = "1.10.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a507befe795404456341dfab10cef66ead4c041f62b8b11bbb92bffe5d0953e0"
dependencies = [
"serde",
]
[[package]]
name = "socket2"
version = "0.4.9"

View file

@ -1,4 +1,4 @@
use rkvm_input::EventWriter;
use rkvm_input::{EventPack, EventWriter};
use rkvm_net::auth::{AuthChallenge, AuthStatus};
use rkvm_net::message::Message;
use rkvm_net::version::Version;
@ -77,10 +77,15 @@ pub async fn run(
let mut writer = EventWriter::new().await.map_err(Error::Input)?;
loop {
let event = Message::decode(&mut stream).await.map_err(Error::Network)?;
log::trace!("Received event");
let events = EventPack::decode(&mut stream)
.await
.map_err(Error::Network)?;
writer.write(&events).await.map_err(Error::Input)?;
writer.write(event).await.map_err(Error::Input)?;
log::trace!("Wrote event");
log::trace!(
"Wrote {} event{}",
events.len(),
if events.len() == 1 { "" } else { "s" }
);
}
}

View file

@ -9,10 +9,11 @@ edition = "2018"
[dependencies]
serde = { version = "1.0.117", features = ["derive"] }
futures = "0.3.8"
smallvec = { version = "1.10.0", features = ["serde"] }
[target.'cfg(target_os = "linux")'.dependencies]
inotify = "0.9.2"
tokio = { version = "1.0.1", features = ["fs", "io-util", "net", "sync", "rt", "time"] }
inotify = "0.10.0"
tokio = { version = "1.0.1", features = ["fs", "io-util", "net", "sync", "rt", "time", "macros"] }
libc = "0.2.77"
[target.'cfg(target_os = "windows")'.dependencies]

View file

@ -5,10 +5,13 @@ pub use button::Button;
pub use key::Key;
use serde::{Deserialize, Serialize};
use smallvec::SmallVec;
pub type EventPack = SmallVec<[Event; 2]>;
#[derive(Clone, Copy, Debug, Serialize, Deserialize)]
pub enum Event {
MouseScroll { delta: i32 },
MouseScroll { axis: Axis, delta: i32 },
MouseMove { axis: Axis, delta: i32 },
Key { direction: Direction, kind: KeyKind },
}

View file

@ -12,4 +12,4 @@ pub use linux::{EventManager, EventWriter};
#[cfg(target_os = "windows")]
pub use windows::{EventManager, EventWriter};
pub use event::{Axis, Button, Direction, Event, Key, KeyKind};
pub use event::{Axis, Button, Direction, Event, EventPack, Key, KeyKind};

View file

@ -1,76 +1,16 @@
mod button;
mod key;
use crate::event::{Axis, Button, Direction, Event, Key, KeyKind};
use crate::linux::glue::{self, input_event, timeval};
impl Event {
pub(crate) fn to_raw(&self) -> input_event {
let (type_, code, value) = match *self {
Event::MouseScroll { delta } => (glue::EV_REL as _, glue::REL_WHEEL as _, delta),
Event::MouseMove {
axis: Axis::X,
delta,
} => (glue::EV_REL as _, glue::REL_X as _, delta),
Event::MouseMove {
axis: Axis::Y,
delta,
} => (glue::EV_REL as _, glue::REL_Y as _, delta),
Event::Key {
direction: Direction::Up,
kind,
} => (glue::EV_KEY as _, kind.to_raw(), 0),
Event::Key {
direction: Direction::Down,
kind,
} => (glue::EV_KEY as _, kind.to_raw(), 1),
};
input_event {
type_,
code,
value,
time: timeval {
tv_sec: 0,
tv_usec: 0,
},
}
}
pub(crate) fn from_raw(raw: input_event) -> Option<Self> {
let event = match (raw.type_ as _, raw.code as _, raw.value) {
(glue::EV_REL, glue::REL_WHEEL, value) => Event::MouseScroll { delta: value },
(glue::EV_REL, glue::REL_X, value) => Event::MouseMove {
axis: Axis::X,
delta: value,
},
(glue::EV_REL, glue::REL_Y, value) => Event::MouseMove {
axis: Axis::Y,
delta: value,
},
(glue::EV_KEY, code, 0) => Event::Key {
direction: Direction::Up,
kind: KeyKind::from_raw(code as _)?,
},
(glue::EV_KEY, code, 1) => Event::Key {
direction: Direction::Down,
kind: KeyKind::from_raw(code as _)?,
},
_ => return None,
};
Some(event)
}
}
use crate::event::{Button, Key, KeyKind};
impl KeyKind {
pub(crate) fn from_raw(code: u16) -> Option<KeyKind> {
pub(crate) fn from_raw(code: u32) -> Option<KeyKind> {
Key::from_raw(code)
.map(KeyKind::Key)
.or_else(|| Button::from_raw(code).map(KeyKind::Button))
}
pub(crate) fn to_raw(&self) -> u16 {
pub(crate) fn to_raw(&self) -> u32 {
match self {
KeyKind::Key(key) => key.to_raw(),
KeyKind::Button(button) => button.to_raw(),

View file

@ -1,7 +1,7 @@
use crate::event::Button;
impl Button {
pub(crate) fn to_raw(&self) -> u16 {
pub(crate) fn to_raw(&self) -> u32 {
use Button::*;
match *self {
@ -127,7 +127,7 @@ impl Button {
}
}
pub(crate) fn from_raw(code: u16) -> Option<Self> {
pub(crate) fn from_raw(code: u32) -> Option<Self> {
use Button::*;
// This is generated from linux headers, some patterns are unreachable, and we don't care.

View file

@ -1,7 +1,7 @@
use crate::event::Key;
impl Key {
pub(crate) fn to_raw(&self) -> u16 {
pub(crate) fn to_raw(&self) -> u32 {
use Key::*;
match *self {
@ -496,7 +496,7 @@ impl Key {
}
}
pub(crate) fn from_raw(code: u16) -> Option<Self> {
pub(crate) fn from_raw(code: u32) -> Option<Self> {
use Key::*;
// This is generated from linux headers, some patterns are unreachable, and we don't care.

View file

@ -1,28 +1,25 @@
use crate::event::Event;
use crate::event::{Event, EventPack};
use crate::linux::event_reader::{EventReader, OpenError};
use crate::linux::event_writer::EventWriter;
use futures::StreamExt;
use inotify::{Inotify, WatchMask};
use std::io::{Error, ErrorKind};
use std::io::Error;
use std::path::Path;
use std::time::Duration;
use tokio::fs;
use tokio::sync::mpsc::{self, UnboundedReceiver, UnboundedSender};
use tokio::sync::oneshot::{self, Receiver};
use tokio::sync::mpsc::{self, Receiver, Sender};
use tokio::time;
const EVENT_PATH: &str = "/dev/input";
pub struct EventManager {
writer: EventWriter,
event_receiver: UnboundedReceiver<Result<Event, Error>>,
watcher_receiver: Receiver<Error>,
event_writer: EventWriter,
event_receiver: Receiver<Result<EventPack, Error>>,
}
impl EventManager {
pub async fn new() -> Result<Self, Error> {
let (event_sender, event_receiver) = mpsc::unbounded_channel();
// HACK: When rkvm is run from the terminal, a race condition happens where the enter key
// release event is swallowed and the key will remain in a "pressed" state until the user manually presses it again.
// This is presumably due to the event being generated while we're in the process of grabbing
@ -33,50 +30,64 @@ impl EventManager {
// directly from the terminal for the time being until a proper fix is made.
time::sleep(Duration::from_millis(500)).await;
let (event_sender, event_receiver) = mpsc::channel(1);
let mut read_dir = fs::read_dir(EVENT_PATH).await?;
while let Some(entry) = read_dir.next_entry().await? {
spawn_reader(&entry.path(), event_sender.clone()).await?;
}
let writer = EventWriter::new().await?;
let event_writer = EventWriter::new().await?;
// Sleep for a while to give userspace time to register our devices.
time::sleep(Duration::from_secs(1)).await;
let (watcher_sender, watcher_receiver) = oneshot::channel();
tokio::spawn(async {
if let Err(err) = handle_notify(event_sender).await {
let _ = watcher_sender.send(err);
tokio::spawn(async move {
let run = async {
let mut inotify = Inotify::init()?;
inotify.add_watch(EVENT_PATH, WatchMask::CREATE)?;
// This buffer size should be OK, since we don't expect a lot of devices
// to be plugged in frequently.
let mut stream = inotify.event_stream([0u8; 512])?;
while let Some(event) = stream.next().await {
let event = event?;
if let Some(name) = event.name {
let path = Path::new(EVENT_PATH).join(&name);
spawn_reader(&path, event_sender.clone()).await?;
}
}
Ok(())
};
tokio::select! {
result = run => {
if let Err(err) = result {
let _ = event_sender.send(Err(err)).await;
}
}
_ = event_sender.closed() => {}
}
});
Ok(EventManager {
writer,
Ok(Self {
event_writer,
event_receiver,
watcher_receiver,
})
}
pub async fn read(&mut self) -> Result<Event, Error> {
if let Ok(err) = self.watcher_receiver.try_recv() {
return Err(err);
pub async fn read(&mut self) -> Result<EventPack, Error> {
self.event_receiver.recv().await.unwrap()
}
self.event_receiver
.recv()
.await
.ok_or_else(|| Error::new(ErrorKind::Other, "All devices closed"))?
}
pub async fn write(&mut self, event: Event) -> Result<(), Error> {
self.writer.write(event).await
pub async fn write(&mut self, events: &[Event]) -> Result<(), Error> {
self.event_writer.write(events).await
}
}
async fn spawn_reader(
path: &Path,
sender: UnboundedSender<Result<Event, Error>>,
) -> Result<(), Error> {
async fn spawn_reader(path: &Path, sender: Sender<Result<EventPack, Error>>) -> Result<(), Error> {
if path.is_dir() {
return Ok(());
}
@ -91,39 +102,17 @@ async fn spawn_reader(
return Ok(());
}
let reader = match EventReader::open(&path).await {
let mut reader = match EventReader::open(&path).await {
Ok(reader) => reader,
Err(OpenError::Io(err)) => return Err(err),
Err(OpenError::NotAppliable) => return Ok(()),
};
tokio::spawn(handle_events(reader, sender));
Ok(())
}
async fn handle_notify(sender: UnboundedSender<Result<Event, Error>>) -> Result<(), Error> {
let mut inotify = Inotify::init()?;
inotify.add_watch(EVENT_PATH, WatchMask::CREATE)?;
// This buffer size should be OK, since we don't expect a lot of devices
// to be plugged in frequently.
let mut stream = inotify.event_stream([0u8; 512])?;
while let Some(event) = stream.next().await {
let event = event?;
if let Some(name) = event.name {
let path = Path::new(EVENT_PATH).join(&name);
spawn_reader(&path, sender.clone()).await?;
}
}
Ok(())
}
async fn handle_events(mut reader: EventReader, sender: UnboundedSender<Result<Event, Error>>) {
tokio::spawn(async move {
let run = async {
loop {
let result = match reader.read().await {
Ok(event) => sender.send(Ok(event)).is_ok(),
Ok(events) => sender.send(Ok(events)).await.is_ok(),
// This happens if the device is disconnected.
// In that case simply terminate the reading task.
Err(ref err) if err.raw_os_error() == Some(libc::ENODEV) => false,
@ -137,4 +126,13 @@ async fn handle_events(mut reader: EventReader, sender: UnboundedSender<Result<E
break;
}
}
};
tokio::select! {
_ = run => {}
_ = sender.closed() => {}
}
});
Ok(())
}

View file

@ -1,6 +1,9 @@
use crate::event::Event;
use crate::event::{Axis, Direction, Event, EventPack};
use crate::linux::device_id;
use crate::linux::glue::{self, libevdev, libevdev_uinput};
use crate::linux::glue::{input_event, timeval};
use crate::KeyKind;
use std::fs::{File, OpenOptions};
use std::io::Error;
use std::mem::MaybeUninit;
@ -8,6 +11,7 @@ use std::os::unix::fs::OpenOptionsExt;
use std::os::unix::io::AsRawFd;
use std::path::Path;
use tokio::io::unix::AsyncFd;
use tokio::task;
pub(crate) struct EventReader {
file: AsyncFd<File>,
@ -18,7 +22,7 @@ pub(crate) struct EventReader {
impl EventReader {
pub async fn open(path: &Path) -> Result<Self, OpenError> {
let path = path.to_owned();
tokio::task::spawn_blocking(move || Self::open_sync(&path))
task::spawn_blocking(move || Self::open_sync(&path))
.await
.map_err(|err| OpenError::Io(err.into()))?
}
@ -31,6 +35,7 @@ impl EventReader {
.and_then(AsyncFd::new)?;
let mut evdev = MaybeUninit::uninit();
let ret = unsafe { glue::libevdev_new_from_fd(file.as_raw_fd(), evdev.as_mut_ptr()) };
if ret < 0 {
return Err(Error::from_raw_os_error(-ret).into());
@ -49,6 +54,7 @@ impl EventReader {
let vendor = vendor == device_id::VENDOR as _
&& product == device_id::PRODUCT as _
&& version == device_id::VERSION as _;
// "Upon binding to a device or resuming from suspend, a driver must report
// the current switch state. This ensures that the device, kernel, and userspace
// state is in sync."
@ -88,19 +94,79 @@ impl EventReader {
};
if ret < 0 {
unsafe { glue::libevdev_free(evdev) };
unsafe {
glue::libevdev_free(evdev);
}
return Err(Error::from_raw_os_error(-ret).into());
}
let uinput = unsafe { uinput.assume_init() };
Ok(Self {
file,
evdev,
uinput,
uinput: unsafe { uinput.assume_init() },
})
}
pub async fn read(&mut self) -> Result<Event, Error> {
pub async fn read(&mut self) -> Result<EventPack, Error> {
let mut events = EventPack::new();
loop {
let raw = self.read_raw().await?;
let event = match (raw.type_ as _, raw.code as _, raw.value) {
// These should not be propagated, it will result in double scrolling otherwise.
(glue::EV_REL, glue::REL_HWHEEL | glue::REL_WHEEL, _) => continue,
(glue::EV_REL, glue::REL_HWHEEL_HI_RES, value) => Some(Event::MouseScroll {
axis: Axis::X,
delta: value,
}),
(glue::EV_REL, glue::REL_WHEEL_HI_RES, value) => Some(Event::MouseScroll {
axis: Axis::Y,
delta: value,
}),
(glue::EV_REL, glue::REL_X, value) => Some(Event::MouseMove {
axis: Axis::X,
delta: value,
}),
(glue::EV_REL, glue::REL_Y, value) => Some(Event::MouseMove {
axis: Axis::Y,
delta: value,
}),
(glue::EV_KEY, code, 0) => KeyKind::from_raw(code as _).map(|kind| Event::Key {
direction: Direction::Up,
kind,
}),
(glue::EV_KEY, code, 1) => KeyKind::from_raw(code as _).map(|kind| Event::Key {
direction: Direction::Down,
kind,
}),
(glue::EV_SYN, glue::SYN_REPORT, _) => break,
_ => None,
};
if let Some(event) = event {
events.push(event);
continue;
}
self.write_raw(&raw).await?;
}
self.write_raw(&input_event {
type_: glue::EV_SYN as _,
code: glue::SYN_REPORT as _,
value: 0,
time: timeval {
tv_sec: 0,
tv_usec: 0,
},
})
.await?;
Ok(events)
}
async fn read_raw(&mut self) -> Result<input_event, Error> {
loop {
let result = self.file.readable().await?.try_io(|_| {
let mut event = MaybeUninit::uninit();
@ -120,20 +186,19 @@ impl EventReader {
Ok(event)
});
let event = match result {
Ok(Ok(event)) => event,
match result {
Ok(Ok(event)) => return Ok(event),
Ok(Err(err)) => return Err(err),
Err(_) => continue, // This means it would block.
};
if let Some(event) = Event::from_raw(event) {
return Ok(event);
}
}
}
// Not understood, write it back.
async fn write_raw(&mut self, event: &input_event) -> Result<(), Error> {
// TODO: This can block.
let ret = unsafe {
glue::libevdev_uinput_write_event(
self.uinput as *const _,
self.uinput,
event.type_ as _,
event.code as _,
event.value,
@ -141,9 +206,10 @@ impl EventReader {
};
if ret < 0 {
return Err(Error::from_raw_os_error(-ret));
}
return Err(Error::from_raw_os_error(-ret).into());
}
Ok(())
}
}

View file

@ -1,6 +1,7 @@
use crate::event::Event;
use crate::linux::device_id;
use crate::linux::glue::{self, input_event, libevdev, libevdev_uinput};
use crate::linux::glue::{self, libevdev, libevdev_uinput};
use crate::{Axis, Direction};
use std::io::{Error, ErrorKind};
use std::mem::MaybeUninit;
use std::ops::RangeInclusive;
@ -39,35 +40,58 @@ impl EventWriter {
};
if ret < 0 {
unsafe { glue::libevdev_free(evdev) };
unsafe {
glue::libevdev_free(evdev);
}
return Err(Error::from_raw_os_error(-ret));
}
let uinput = unsafe { uinput.assume_init() };
Ok(Self { evdev, uinput })
Ok(Self {
evdev,
uinput: unsafe { uinput.assume_init() },
})
}
pub async fn write(&mut self, event: Event) -> Result<(), Error> {
self.write_raw(event.to_raw())
}
pub async fn write(&mut self, events: &[Event]) -> Result<(), Error> {
let events = events
.iter()
.map(|event| match event {
Event::MouseScroll {
axis: Axis::X,
delta,
} => (glue::EV_REL, glue::REL_HWHEEL_HI_RES as _, *delta),
Event::MouseScroll {
axis: Axis::Y,
delta,
} => (glue::EV_REL, glue::REL_WHEEL_HI_RES as _, *delta),
Event::MouseMove {
axis: Axis::X,
delta,
} => (glue::EV_REL, glue::REL_X as _, *delta),
Event::MouseMove {
axis: Axis::Y,
delta,
} => (glue::EV_REL, glue::REL_Y as _, *delta),
Event::Key {
direction: Direction::Up,
kind,
} => (glue::EV_KEY, kind.to_raw(), 0),
Event::Key {
direction: Direction::Down,
kind,
} => (glue::EV_KEY, kind.to_raw(), 1),
})
.chain(std::iter::once((glue::EV_SYN, glue::SYN_REPORT as _, 0)));
for (r#type, code, value) in events {
// TODO: Not exactly async.
pub(crate) fn write_raw(&mut self, event: input_event) -> Result<(), Error> {
// As far as tokio is concerned, the FD never becomes ready for writing, so just write it normally.
// If an error happens, it will be propagated to caller and the FD is opened in nonblocking mode anyway,
// so it shouldn't be an issue.
let events = [
(event.type_, event.code, event.value),
(glue::EV_SYN as _, glue::SYN_REPORT as _, 0), // Include EV_SYN.
];
for (r#type, code, value) in events.iter().cloned() {
let ret = unsafe {
glue::libevdev_uinput_write_event(
self.uinput as *const _,
r#type as _,
code as _,
value,
)
glue::libevdev_uinput_write_event(self.uinput as *const _, r#type, code, value)
};
if ret < 0 {

View file

@ -1,4 +1,4 @@
use rkvm_input::{Direction, Event, EventManager, Key, KeyKind};
use rkvm_input::{Direction, Event, EventManager, EventPack, Key, KeyKind};
use rkvm_net::auth::{AuthChallenge, AuthResponse, AuthStatus};
use rkvm_net::message::Message;
use rkvm_net::version::Version;
@ -67,18 +67,19 @@ pub async fn run(
});
}
result = manager.read() => {
let event = result.map_err(Error::Input)?;
let events = result.map_err(Error::Input)?;
for event in &events {
if let Event::Key { direction: Direction::Down, kind: KeyKind::Key(key) } = event {
if key == switch_key {
if *key == switch_key {
current = (current + 1) % (clients.len() + 1);
log::info!("Switching to client {}", current);
continue;
}
}
}
if current == 0 || clients[current - 1].send(event).await.is_err() {
if current == 0 || clients[current - 1].send(events.clone()).await.is_err() {
current = 0;
manager.write(event).await.map_err(Error::Input)?;
manager.write(&events).await.map_err(Error::Input)?;
}
}
}
@ -98,7 +99,7 @@ enum ClientError {
}
async fn client(
mut receiver: Receiver<Event>,
mut receiver: Receiver<EventPack>,
stream: TlsStream<TcpStream>,
addr: SocketAddr,
password: &str,
@ -136,11 +137,16 @@ async fn client(
log::info!("{}: Passed auth check", addr);
while let Some(event) = receiver.recv().await {
event.encode(&mut stream).await?;
while let Some(events) = receiver.recv().await {
events.encode(&mut stream).await?;
stream.flush().await?;
log::trace!("{}: Sent event", addr);
log::trace!(
"{}: Sent {} event{}",
addr,
events.len(),
if events.len() == 1 { "" } else { "s" }
);
}
Ok(())