Clean up client code

This commit is contained in:
Jan Trefil 2024-07-21 17:33:46 +02:00
parent 20fb8efb7e
commit 6d986741ff

View file

@ -21,6 +21,7 @@ use tokio::io::BufWriter;
use tokio::net; use tokio::net;
use tokio::sync::mpsc::{self, Sender}; use tokio::sync::mpsc::{self, Sender};
use tracing::Instrument; use tracing::Instrument;
use tracing::Span;
#[derive(Error, Debug)] #[derive(Error, Debug)]
pub enum Error { pub enum Error {
@ -136,6 +137,7 @@ pub async fn run(
} }
}; };
// TODO: Since this is a datagram, do we really want to wait here?
let _ = sender.send(datagram.events.into_owned()).await; let _ = sender.send(datagram.events.into_owned()).await;
continue; continue;
} }
@ -241,92 +243,103 @@ async fn stream<T: AsyncRead + Send + Unpin + 'static>(
.await .await
.map_err(NetworkError::from)?; .map_err(NetworkError::from)?;
let id = device_info.id; let span = tracing::info_span!("device", id = %device_info.id);
let span = tracing::info_span!("device", id = ?device_info.id); async {
let mut writer = build(&device_info).await.map_err(Error::Input)?;
let mut writer = build(device_info).await.map_err(Error::Input)?; tracing::info!(
name = ?device_info.name,
vendor = %device_info.vendor,
product = %device_info.product,
version = %device_info.version,
"Created new device"
);
let (datagram_sender, mut datagram_receiver) = mpsc::channel(1); let (datagram_sender, mut datagram_receiver) = mpsc::channel(1);
let _ = device_sender
.send(DeviceEvent::Create {
id: device_info.id,
sender: datagram_sender,
})
.await;
let event = DeviceEvent::Create { let (event_sender, mut event_receiver) = mpsc::channel(1);
id, tokio::spawn(
sender: datagram_sender, async move {
}; loop {
let event = tokio::select! {
if device_sender.send(event).await.is_err() { event = Event::decode(&mut read) => event,
return Ok(()); _ = event_sender.closed() => break,
}
let (event_sender, mut event_receiver) = mpsc::channel(1);
tokio::spawn(
async move {
loop {
let event = tokio::select! {
event = Event::decode(&mut read) => event,
_ = event_sender.closed() => break,
};
if event.is_err() | event_sender.send(event).await.is_err() {
break;
}
}
}
.instrument(span.clone()),
);
let result = async move {
loop {
let event = async { event_receiver.recv().await.unwrap() };
tokio::select! {
event = event => {
let event = event.map_err(NetworkError::from)?;
writer.write(&event).await.map_err(Error::Input)?;
}
datagram = datagram_receiver.recv() => {
let datagram = match datagram {
Some(datagram) => datagram,
None => break,
}; };
for event in datagram { if event.is_err() | event_sender.send(event).await.is_err() {
writer.write(&event).await.map_err(Error::Input)?; break;
} }
} }
} }
} .instrument(Span::current()),
);
Ok(()) let result = async {
loop {
let event = async { event_receiver.recv().await.unwrap() };
tokio::select! {
event = event => {
let event = event.map_err(NetworkError::from)?;
writer.write(&event).await.map_err(Error::Input)?;
tracing::trace!("Wrote an event");
}
datagram = datagram_receiver.recv() => {
let datagram = match datagram {
Some(datagram) => datagram,
None => break,
};
let length = datagram.len();
for event in datagram {
writer.write(&event).await.map_err(Error::Input)?;
}
tracing::trace!(
"Wrote {} unreliable event{}",
length,
if length == 1 { "" } else { "s" }
);
}
}
}
Ok(())
}
.await;
let _ = device_sender
.send(DeviceEvent::Destroy { id: device_info.id })
.await;
// Drop explicitly to make the log properly ordered.
drop(writer);
tracing::info!("Destroyed device");
result
} }
.instrument(span) .instrument(span)
.await; .await
let _ = device_sender.send(DeviceEvent::Destroy { id }).await;
result
} }
async fn build(device_info: DeviceInfo) -> Result<Writer, io::Error> { async fn build(device_info: &DeviceInfo) -> Result<Writer, io::Error> {
let writer = Writer::builder()? Writer::builder()?
.name(&device_info.name) .name(&device_info.name)
.vendor(device_info.vendor) .vendor(device_info.vendor)
.product(device_info.product) .product(device_info.product)
.version(device_info.version) .version(device_info.version)
.rel(device_info.rel)? .rel(device_info.rel.iter().copied())?
.abs(device_info.abs)? .abs(device_info.abs.iter().map(|(axis, info)| (*axis, *info)))?
.key(device_info.keys)? .key(device_info.keys.iter().copied())?
.delay(device_info.delay)? .delay(device_info.delay)?
.period(device_info.period)? .period(device_info.period)?
.build() .build()
.await?; .await
tracing::info!(
name = ?device_info.name,
vendor = %device_info.vendor,
product = %device_info.product,
version = %device_info.version,
"Created new device"
);
Ok(writer)
} }