From 6d986741ffc338381123df9a1dd618ba6c87fa22 Mon Sep 17 00:00:00 2001 From: Jan Trefil <8711792+htrefil@users.noreply.github.com> Date: Sun, 21 Jul 2024 17:33:46 +0200 Subject: [PATCH] Clean up client code --- rkvm-client/src/client.rs | 149 +++++++++++++++++++++----------------- 1 file changed, 81 insertions(+), 68 deletions(-) diff --git a/rkvm-client/src/client.rs b/rkvm-client/src/client.rs index 4616541..f24d6c6 100644 --- a/rkvm-client/src/client.rs +++ b/rkvm-client/src/client.rs @@ -21,6 +21,7 @@ use tokio::io::BufWriter; use tokio::net; use tokio::sync::mpsc::{self, Sender}; use tracing::Instrument; +use tracing::Span; #[derive(Error, Debug)] pub enum Error { @@ -136,6 +137,7 @@ pub async fn run( } }; + // TODO: Since this is a datagram, do we really want to wait here? let _ = sender.send(datagram.events.into_owned()).await; continue; } @@ -241,92 +243,103 @@ async fn stream( .await .map_err(NetworkError::from)?; - let id = device_info.id; - let span = tracing::info_span!("device", id = ?device_info.id); + let span = tracing::info_span!("device", id = %device_info.id); + async { + let mut writer = build(&device_info).await.map_err(Error::Input)?; - let mut writer = build(device_info).await.map_err(Error::Input)?; + tracing::info!( + name = ?device_info.name, + vendor = %device_info.vendor, + product = %device_info.product, + version = %device_info.version, + "Created new device" + ); - let (datagram_sender, mut datagram_receiver) = mpsc::channel(1); + let (datagram_sender, mut datagram_receiver) = mpsc::channel(1); + let _ = device_sender + .send(DeviceEvent::Create { + id: device_info.id, + sender: datagram_sender, + }) + .await; - let event = DeviceEvent::Create { - id, - sender: datagram_sender, - }; - - if device_sender.send(event).await.is_err() { - return Ok(()); - } - - let (event_sender, mut event_receiver) = mpsc::channel(1); - tokio::spawn( - async move { - loop { - let event = tokio::select! { - event = Event::decode(&mut read) => event, - _ = event_sender.closed() => break, - }; - - if event.is_err() | event_sender.send(event).await.is_err() { - break; - } - } - } - .instrument(span.clone()), - ); - - let result = async move { - loop { - let event = async { event_receiver.recv().await.unwrap() }; - - tokio::select! { - event = event => { - let event = event.map_err(NetworkError::from)?; - writer.write(&event).await.map_err(Error::Input)?; - } - datagram = datagram_receiver.recv() => { - let datagram = match datagram { - Some(datagram) => datagram, - None => break, + let (event_sender, mut event_receiver) = mpsc::channel(1); + tokio::spawn( + async move { + loop { + let event = tokio::select! { + event = Event::decode(&mut read) => event, + _ = event_sender.closed() => break, }; - for event in datagram { - writer.write(&event).await.map_err(Error::Input)?; + if event.is_err() | event_sender.send(event).await.is_err() { + break; } } } - } + .instrument(Span::current()), + ); - Ok(()) + let result = async { + loop { + let event = async { event_receiver.recv().await.unwrap() }; + + tokio::select! { + event = event => { + let event = event.map_err(NetworkError::from)?; + writer.write(&event).await.map_err(Error::Input)?; + + tracing::trace!("Wrote an event"); + } + datagram = datagram_receiver.recv() => { + let datagram = match datagram { + Some(datagram) => datagram, + None => break, + }; + + let length = datagram.len(); + for event in datagram { + writer.write(&event).await.map_err(Error::Input)?; + } + + tracing::trace!( + "Wrote {} unreliable event{}", + length, + if length == 1 { "" } else { "s" } + ); + } + } + } + + Ok(()) + } + .await; + + let _ = device_sender + .send(DeviceEvent::Destroy { id: device_info.id }) + .await; + + // Drop explicitly to make the log properly ordered. + drop(writer); + tracing::info!("Destroyed device"); + + result } .instrument(span) - .await; - - let _ = device_sender.send(DeviceEvent::Destroy { id }).await; - - result + .await } -async fn build(device_info: DeviceInfo) -> Result { - let writer = Writer::builder()? +async fn build(device_info: &DeviceInfo) -> Result { + Writer::builder()? .name(&device_info.name) .vendor(device_info.vendor) .product(device_info.product) .version(device_info.version) - .rel(device_info.rel)? - .abs(device_info.abs)? - .key(device_info.keys)? + .rel(device_info.rel.iter().copied())? + .abs(device_info.abs.iter().map(|(axis, info)| (*axis, *info)))? + .key(device_info.keys.iter().copied())? .delay(device_info.delay)? .period(device_info.period)? .build() - .await?; - - tracing::info!( - name = ?device_info.name, - vendor = %device_info.vendor, - product = %device_info.product, - version = %device_info.version, - "Created new device" - ); - - Ok(writer) + .await }