Switch to tracing

This commit is contained in:
Jan Trefil 2023-09-09 11:07:43 +02:00
parent 032ad6fe22
commit 59d5b6b6d7
14 changed files with 303 additions and 147 deletions

139
Cargo.lock generated
View file

@ -582,6 +582,15 @@ dependencies = [
"cfg-if",
]
[[package]]
name = "matchers"
version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8263075bb86c5a1b1427b5ae862e8889656f126e9f77c484496e8b47cf5c5558"
dependencies = [
"regex-automata",
]
[[package]]
name = "memchr"
version = "2.5.0"
@ -616,6 +625,16 @@ dependencies = [
"minimal-lexical",
]
[[package]]
name = "nu-ansi-term"
version = "0.46.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "77a8165726e8236064dbb45459242600304b42a5ea24ee2948e18e023bf7ba84"
dependencies = [
"overload",
"winapi",
]
[[package]]
name = "num_cpus"
version = "1.15.0"
@ -632,6 +651,12 @@ version = "1.17.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b7e5500299e16ebb147ae15a00a942af264cf3688f47923b8fc2cd5858f23ad3"
[[package]]
name = "overload"
version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b15813163c1d831bf4a13c3610c05c0d03b39feb07f7e09fa234dac9b15aaf39"
[[package]]
name = "peeking_take_while"
version = "0.1.2"
@ -737,9 +762,24 @@ checksum = "af83e617f331cc6ae2da5443c602dfa5af81e517212d9d611a5b3ba1777b5370"
dependencies = [
"aho-corasick",
"memchr",
"regex-syntax",
"regex-syntax 0.7.1",
]
[[package]]
name = "regex-automata"
version = "0.1.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6c230d73fb8d8c1b9c0b3135c5142a8acee3a0558fb8db5cf1cb65f8d7862132"
dependencies = [
"regex-syntax 0.6.29",
]
[[package]]
name = "regex-syntax"
version = "0.6.29"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f162c6dd7b008981e4d40210aca20b4bd0f9b60ca9271061b07f78537722f2e1"
[[package]]
name = "regex-syntax"
version = "0.7.1"
@ -776,7 +816,6 @@ version = "0.4.1"
dependencies = [
"clap",
"env_logger",
"log",
"rkvm-input",
"rkvm-net",
"rustls-pemfile",
@ -785,6 +824,8 @@ dependencies = [
"tokio",
"tokio-rustls",
"toml",
"tracing",
"tracing-subscriber",
]
[[package]]
@ -796,12 +837,12 @@ dependencies = [
"futures",
"inotify",
"libc",
"log",
"pkg-config",
"serde",
"smallvec",
"thiserror",
"tokio",
"tracing",
]
[[package]]
@ -811,7 +852,6 @@ dependencies = [
"async-trait",
"bincode",
"hmac",
"log",
"rand",
"rkvm-input",
"serde",
@ -819,6 +859,7 @@ dependencies = [
"socket2 0.5.3",
"thiserror",
"tokio",
"tracing",
]
[[package]]
@ -827,7 +868,6 @@ version = "0.4.1"
dependencies = [
"clap",
"env_logger",
"log",
"rand",
"rkvm-input",
"rkvm-net",
@ -838,6 +878,8 @@ dependencies = [
"tokio",
"tokio-rustls",
"toml",
"tracing",
"tracing-subscriber",
]
[[package]]
@ -932,6 +974,15 @@ dependencies = [
"digest",
]
[[package]]
name = "sharded-slab"
version = "0.1.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "900fba806f70c630b0a382d0d825e17a0f19fcd059a2ade1ff237bcddf446b31"
dependencies = [
"lazy_static",
]
[[package]]
name = "shlex"
version = "1.1.0"
@ -1056,6 +1107,16 @@ dependencies = [
"syn",
]
[[package]]
name = "thread_local"
version = "1.1.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3fdd6f064ccff2d6567adcb3873ca630700f00b5ad3f060c25b5dcfd9a4ce152"
dependencies = [
"cfg-if",
"once_cell",
]
[[package]]
name = "tokio"
version = "1.28.1"
@ -1104,6 +1165,68 @@ dependencies = [
"serde",
]
[[package]]
name = "tracing"
version = "0.1.37"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8ce8c33a8d48bd45d624a6e523445fd21ec13d3653cd51f681abf67418f54eb8"
dependencies = [
"cfg-if",
"pin-project-lite",
"tracing-attributes",
"tracing-core",
]
[[package]]
name = "tracing-attributes"
version = "0.1.26"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5f4f31f56159e98206da9efd823404b79b6ef3143b4a7ab76e67b1751b25a4ab"
dependencies = [
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "tracing-core"
version = "0.1.31"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0955b8137a1df6f1a2e9a37d8a6656291ff0297c1a97c24e0d8425fe2312f79a"
dependencies = [
"once_cell",
"valuable",
]
[[package]]
name = "tracing-log"
version = "0.1.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "78ddad33d2d10b1ed7eb9d1f518a5674713876e97e5bb9b7345a7984fbb4f922"
dependencies = [
"lazy_static",
"log",
"tracing-core",
]
[[package]]
name = "tracing-subscriber"
version = "0.3.17"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "30a651bc37f915e81f087d86e62a18eec5f79550c7faff886f7090b4ea757c77"
dependencies = [
"matchers",
"nu-ansi-term",
"once_cell",
"regex",
"sharded-slab",
"smallvec",
"thread_local",
"tracing",
"tracing-core",
"tracing-log",
]
[[package]]
name = "typenum"
version = "1.16.0"
@ -1128,6 +1251,12 @@ version = "0.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "711b9620af191e0cdc7468a8d14e709c3dcdb115b36f838e601583af800a370a"
[[package]]
name = "valuable"
version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "830b7e5d4d90034032940e4ace0d9a9a057e7a45cd94e6c007832e39edb82f6d"
[[package]]
name = "version_check"
version = "0.9.4"

View file

@ -13,12 +13,13 @@ rkvm-input = { path = "../rkvm-input" }
rkvm-net = { path = "../rkvm-net" }
serde = { version = "1.0.117", features = ["derive"] }
toml = "0.5.7"
log = "0.4.11"
env_logger = "0.8.1"
clap = { version = "4.2.2", features = ["derive"] }
thiserror = "1.0.40"
tokio-rustls = "0.24.0"
rustls-pemfile = "1.0.2"
tracing = "0.1.37"
tracing-subscriber = { version = "0.3.17", features = ["env-filter"] }
[package.metadata.rpm]
package = "rkvm-client"

View file

@ -42,14 +42,14 @@ pub async fn run(
socket::configure(&stream).map_err(Error::Network)?;
log::info!("Connected to server");
tracing::info!("Connected to server");
let stream = connector
.connect(hostname.clone(), stream)
.await
.map_err(Error::Network)?;
log::info!("TLS connected");
tracing::info!("TLS connected");
let mut stream = BufStream::with_capacity(1024, 1024, stream);
@ -80,7 +80,7 @@ pub async fn run(
AuthStatus::Failed => return Err(Error::Auth),
}
log::info!("Authenticated successfully");
tracing::info!("Authenticated successfully");
let mut writers = HashMap::new();
@ -121,7 +121,7 @@ pub async fn run(
entry.or_insert(writer);
log::info!(
tracing::info!(
"Created new device {} (name {:?}, vendor {}, product {}, version {})",
id,
name,
@ -138,7 +138,7 @@ pub async fn run(
)));
}
log::info!("Destroyed device {}", id);
tracing::info!("Destroyed device {}", id);
}
Update::Event { id, event } => {
let writer = writers.get_mut(&id).ok_or_else(|| {
@ -150,7 +150,7 @@ pub async fn run(
writer.write(&event).await.map_err(Error::Input)?;
log::trace!("Wrote an event to device {}", id);
tracing::trace!("Wrote an event to device {}", id);
}
}
}

View file

@ -4,10 +4,13 @@ mod tls;
use clap::Parser;
use config::Config;
use log::LevelFilter;
use std::path::PathBuf;
use std::process::ExitCode;
use tokio::{fs, signal};
use tracing::subscriber;
use tracing_subscriber::filter::{EnvFilter, LevelFilter};
use tracing_subscriber::fmt;
use tracing_subscriber::prelude::*;
#[derive(Parser)]
#[structopt(name = "rkvm-client", about = "The rkvm client application")]
@ -18,17 +21,21 @@ struct Args {
#[tokio::main]
async fn main() -> ExitCode {
env_logger::builder()
.format_timestamp(None)
.filter(None, LevelFilter::Info)
.parse_default_env()
.init();
let filter = EnvFilter::builder()
.with_default_directive(LevelFilter::INFO.into())
.from_env_lossy();
let registry = tracing_subscriber::registry()
.with(filter)
.with(fmt::layer().without_time());
subscriber::set_global_default(registry).unwrap();
let args = Args::parse();
let config = match fs::read_to_string(&args.config_path).await {
Ok(config) => config,
Err(err) => {
log::error!("Error reading config: {}", err);
tracing::error!("Error reading config: {}", err);
return ExitCode::FAILURE;
}
};
@ -36,7 +43,7 @@ async fn main() -> ExitCode {
let config = match toml::from_str::<Config>(&config) {
Ok(config) => config,
Err(err) => {
log::error!("Error parsing config: {}", err);
tracing::error!("Error parsing config: {}", err);
return ExitCode::FAILURE;
}
};
@ -44,7 +51,7 @@ async fn main() -> ExitCode {
let connector = match tls::configure(&config.certificate).await {
Ok(connector) => connector,
Err(err) => {
log::error!("Error configuring TLS: {}", err);
tracing::error!("Error configuring TLS: {}", err);
return ExitCode::FAILURE;
}
};
@ -52,18 +59,18 @@ async fn main() -> ExitCode {
tokio::select! {
result = client::run(&config.server.hostname, config.server.port, connector, &config.password) => {
if let Err(err) = result {
log::error!("Error: {}", err);
tracing::error!("Error: {}", err);
return ExitCode::FAILURE;
}
}
// This is needed to properly clean libevdev stuff up.
result = signal::ctrl_c() => {
if let Err(err) = result {
log::error!("Error setting up signal handler: {}", err);
tracing::error!("Error setting up signal handler: {}", err);
return ExitCode::FAILURE;
}
log::info!("Exiting on signal");
tracing::info!("Exiting on signal");
}
}

View file

@ -13,8 +13,8 @@ smallvec = { version = "1.10.0", features = ["serde"] }
inotify = "0.10.0"
tokio = { version = "1.0.1", features = ["fs", "io-util", "net", "sync", "rt", "time", "macros"] }
libc = "0.2.77"
log = "0.4.11"
thiserror = "1.0.40"
tracing = "0.1.37"
[build-dependencies]
bindgen = "0.65.1"

View file

@ -15,9 +15,9 @@ use crate::writer::Writer;
use std::collections::VecDeque;
use std::ffi::CStr;
use std::fs;
use std::io::{Error, ErrorKind};
use std::mem::MaybeUninit;
use std::os::unix::fs::MetadataExt;
use std::path::Path;
use thiserror::Error;
@ -34,9 +34,10 @@ pub struct Interceptor {
}
impl Interceptor {
#[tracing::instrument(fields(path = ?self.writer.path()), skip(self))]
pub async fn read(&mut self) -> Result<Event, Error> {
if let Some((r#type, code, value)) = self.writing {
log::trace!("Resuming interrupted write");
tracing::trace!("Resuming interrupted write");
self.writer.write_raw(r#type, code, value).await?;
self.writing = None;
@ -72,7 +73,7 @@ impl Interceptor {
Some(Event::Sync(SyncEvent::All))
}
glue::SYN_DROPPED => {
log::warn!(
tracing::warn!(
"Dropped {} event{}",
self.events.len(),
if self.events.len() == 1 { "" } else { "s" }
@ -175,15 +176,13 @@ impl Interceptor {
}
}
#[tracing::instrument(skip(registry))]
pub(crate) async fn open(path: &Path, registry: &Registry) -> Result<Self, OpenError> {
let evdev = Evdev::open(path).await?;
let metadata = evdev.file().unwrap().get_ref().metadata()?;
let reader_handle = registry
.register(Entry {
device: metadata.dev(),
inode: metadata.ino(),
})
.register(Entry::from_metadata(&metadata))
.ok_or(OpenError::NotAppliable)?;
// "Upon binding to a device or resuming from suspend, a driver must report
@ -209,7 +208,7 @@ impl Interceptor {
let max = abs_info.maximum;
if (min != 0 || max != 0) && max < min {
log::warn!(
tracing::warn!(
"Detected nonsense min ({}) and max ({}) values for absolute axis {}, disabling it",
min,
max,
@ -237,10 +236,13 @@ impl Interceptor {
}
let writer = Writer::from_evdev(&evdev).await?;
let entry = writer.entry()?;
let path = writer
.path()
.ok_or_else(|| Error::new(ErrorKind::Other, "No syspath for writer"))?;
let metadata = fs::metadata(path)?;
let writer_handle = registry
.register(entry)
.register(Entry::from_metadata(&metadata))
.ok_or_else(|| Error::new(ErrorKind::Other, "Writer already registered"))?;
Ok(Self {

View file

@ -7,7 +7,7 @@ use std::ffi::OsStr;
use std::io::{Error, ErrorKind};
use std::path::Path;
use tokio::fs;
use tokio::sync::mpsc::{self, Receiver};
use tokio::sync::mpsc::{self, Receiver, Sender};
const EVENT_PATH: &str = "/dev/input";
@ -18,69 +18,7 @@ pub struct Monitor {
impl Monitor {
pub fn new() -> Self {
let (sender, receiver) = mpsc::channel(1);
tokio::spawn(async move {
let run = async {
let registry = Registry::new();
let mut read_dir = fs::read_dir(EVENT_PATH).await?;
let mut inotify = Inotify::init()?;
inotify.add_watch(EVENT_PATH, WatchMask::CREATE)?;
// This buffer size should be OK, since we don't expect a lot of devices
// to be plugged in frequently.
let mut stream = inotify.event_stream([0; 512])?;
loop {
let path = match read_dir.next_entry().await? {
Some(entry) => entry.path(),
None => match stream.next().await {
Some(event) => {
let event = event?;
let name = match event.name {
Some(name) => name,
None => continue,
};
Path::new(EVENT_PATH).join(&name)
}
None => break,
},
};
if !path
.file_name()
.and_then(OsStr::to_str)
.map_or(false, |name| name.starts_with("event"))
{
continue;
}
let interceptor = match Interceptor::open(&path, &registry).await {
Ok(interceptor) => interceptor,
Err(OpenError::Io(err)) => return Err(err),
Err(OpenError::NotAppliable) => continue,
};
if sender.send(Ok(interceptor)).await.is_err() {
return Ok(());
}
}
Ok(())
};
tokio::select! {
result = run => match result {
Ok(_) => {},
Err(err) => {
let _ = sender.send(Err(err)).await;
}
},
_ = sender.closed() => {}
}
});
tokio::spawn(monitor(sender));
Self { receiver }
}
@ -92,3 +30,67 @@ impl Monitor {
.ok_or_else(|| Error::new(ErrorKind::BrokenPipe, "Monitor task exited"))?
}
}
async fn monitor(sender: Sender<Result<Interceptor, Error>>) {
let run = async {
let registry = Registry::new();
let mut read_dir = fs::read_dir(EVENT_PATH).await?;
let mut inotify = Inotify::init()?;
inotify.add_watch(EVENT_PATH, WatchMask::CREATE)?;
// This buffer size should be OK, since we don't expect a lot of devices
// to be plugged in frequently.
let mut stream = inotify.event_stream([0; 512])?;
loop {
let path = match read_dir.next_entry().await? {
Some(entry) => entry.path(),
None => match stream.next().await {
Some(event) => {
let event = event?;
let name = match event.name {
Some(name) => name,
None => continue,
};
Path::new(EVENT_PATH).join(&name)
}
None => break,
},
};
if !path
.file_name()
.and_then(OsStr::to_str)
.map_or(false, |name| name.starts_with("event"))
{
tracing::debug!("Skipping non event file {:?}", path);
continue;
}
let interceptor = match Interceptor::open(&path, &registry).await {
Ok(interceptor) => interceptor,
Err(OpenError::Io(err)) => return Err(err),
Err(OpenError::NotAppliable) => continue,
};
if sender.send(Ok(interceptor)).await.is_err() {
return Ok(());
}
}
Ok(())
};
tokio::select! {
result = run => match result {
Ok(_) => {},
Err(err) => {
let _ = sender.send(Err(err)).await;
}
},
_ = sender.closed() => {}
}
}

View file

@ -1,4 +1,6 @@
use std::collections::HashSet;
use std::fs::Metadata;
use std::os::unix::fs::MetadataExt;
use std::sync::{Arc, Mutex};
#[derive(Clone, Copy, Eq, PartialEq, Hash, Debug)]
@ -7,6 +9,15 @@ pub struct Entry {
pub inode: u64,
}
impl Entry {
pub fn from_metadata(metadata: &Metadata) -> Self {
Self {
device: metadata.dev(),
inode: metadata.ino(),
}
}
}
#[derive(Clone)]
pub struct Registry {
entries: Arc<Mutex<HashSet<Entry>>>,

View file

@ -4,16 +4,14 @@ use crate::evdev::Evdev;
use crate::event::Event;
use crate::glue::{self, input_absinfo};
use crate::key::{Key, KeyEvent};
use crate::registry::Entry;
use crate::rel::{RelAxis, RelEvent};
use crate::uinput::Uinput;
use std::ffi::{CStr, OsStr};
use std::io::{Error, ErrorKind};
use std::io::Error;
use std::os::unix::ffi::OsStrExt;
use std::os::unix::fs::MetadataExt;
use std::path::Path;
use std::{fs, ptr};
use std::ptr;
pub struct Writer {
uinput: Uinput,
@ -46,22 +44,17 @@ impl Writer {
Ok(())
}
pub(crate) fn entry(&self) -> Result<Entry, Error> {
pub fn path(&self) -> Option<&Path> {
let path = unsafe { glue::libevdev_uinput_get_devnode(self.uinput.as_ptr()) };
if path.is_null() {
return Err(Error::new(ErrorKind::Other, "No syspath for device"));
return None;
}
let path = unsafe { CStr::from_ptr(path) };
let path = OsStr::from_bytes(path.to_bytes());
let path = Path::new(path);
let metadata = fs::metadata(path)?;
Ok(Entry {
device: metadata.dev(),
inode: metadata.ino(),
})
Some(path)
}
pub(crate) async fn from_evdev(evdev: &Evdev) -> Result<Self, Error> {

View file

@ -16,5 +16,5 @@ thiserror = "1.0.40"
hmac = "0.12.1"
sha2 = "0.10.6"
rand = "0.8.5"
log = "0.4.11"
socket2 = { version = "0.5.3", features = ["all"] }
tracing = "0.1.37"

View file

@ -23,7 +23,7 @@ impl<T: DeserializeOwned + Serialize + Sync> Message for T {
.deserialize(&data)
.map_err(|err| Error::new(ErrorKind::InvalidData, err))?;
log::trace!("Read {} bytes", 2 + length);
tracing::trace!("Read {} bytes", 2 + length);
Ok(data)
}
@ -41,7 +41,7 @@ impl<T: DeserializeOwned + Serialize + Sync> Message for T {
stream.write_u16(length).await?;
stream.write_all(&data).await?;
log::trace!("Wrote {} bytes", 2 + data.len());
tracing::trace!("Wrote {} bytes", 2 + data.len());
Ok(())
}

View file

@ -9,11 +9,8 @@ edition = "2021"
[dependencies]
tokio = { version = "1.0.1", features = ["macros", "time", "fs", "net", "signal", "rt-multi-thread", "sync"] }
rkvm-input = { path = "../rkvm-input" }
rkvm-net = { path = "../rkvm-net" }
serde = { version = "1.0.117", features = ["derive"] }
toml = "0.5.7"
log = "0.4.11"
env_logger = "0.8.1"
clap = { version = "4.2.2", features = ["derive"] }
tokio-rustls = "0.24.0"
@ -21,6 +18,10 @@ rustls-pemfile = "1.0.2"
thiserror = "1.0.40"
slab = "0.4.8"
rand = "0.8.5"
tracing-subscriber = { version = "0.3.17", features = ["env-filter"] }
tracing = "0.1.37"
rkvm-net = { path = "../rkvm-net" }
rkvm-input = { path = "../rkvm-input" }
[package.metadata.rpm]
package = "rkvm-server"

View file

@ -4,12 +4,15 @@ mod tls;
use clap::Parser;
use config::Config;
use log::LevelFilter;
use std::future;
use std::path::PathBuf;
use std::process::ExitCode;
use std::time::Duration;
use tokio::{fs, signal, time};
use tracing::subscriber;
use tracing_subscriber::filter::{EnvFilter, LevelFilter};
use tracing_subscriber::fmt;
use tracing_subscriber::prelude::*;
#[derive(Parser)]
#[structopt(name = "rkvm-server", about = "The rkvm server application")]
@ -22,17 +25,21 @@ struct Args {
#[tokio::main]
async fn main() -> ExitCode {
env_logger::builder()
.format_timestamp(None)
.filter(None, LevelFilter::Info)
.parse_default_env()
.init();
let filter = EnvFilter::builder()
.with_default_directive(LevelFilter::INFO.into())
.from_env_lossy();
let registry = tracing_subscriber::registry()
.with(filter)
.with(fmt::layer().without_time());
subscriber::set_global_default(registry).unwrap();
let args = Args::parse();
let config = match fs::read_to_string(&args.config_path).await {
Ok(config) => config,
Err(err) => {
log::error!("Error reading config: {}", err);
tracing::error!("Error reading config: {}", err);
return ExitCode::FAILURE;
}
};
@ -40,7 +47,7 @@ async fn main() -> ExitCode {
let config = match toml::from_str::<Config>(&config) {
Ok(config) => config,
Err(err) => {
log::error!("Error parsing config: {}", err);
tracing::error!("Error parsing config: {}", err);
return ExitCode::FAILURE;
}
};
@ -48,7 +55,7 @@ async fn main() -> ExitCode {
let acceptor = match tls::configure(&config.certificate, &config.key).await {
Ok(acceptor) => acceptor,
Err(err) => {
log::error!("Error configuring TLS: {}", err);
tracing::error!("Error configuring TLS: {}", err);
return ExitCode::FAILURE;
}
};
@ -65,21 +72,21 @@ async fn main() -> ExitCode {
tokio::select! {
result = server::run(config.listen, acceptor, &config.password, &switch_keys) => {
if let Err(err) = result {
log::error!("Error: {}", err);
tracing::error!("Error: {}", err);
return ExitCode::FAILURE;
}
}
// This is needed to properly clean libevdev stuff up.
result = signal::ctrl_c() => {
if let Err(err) = result {
log::error!("Error setting up signal handler: {}", err);
tracing::error!("Error setting up signal handler: {}", err);
return ExitCode::FAILURE;
}
log::info!("Exiting on signal");
tracing::info!("Exiting on signal");
}
_ = shutdown => {
log::info!("Shutting down as requested");
tracing::info!("Shutting down as requested");
}
}

View file

@ -21,6 +21,7 @@ use tokio::sync::mpsc::error::TrySendError;
use tokio::sync::mpsc::{self, Receiver, Sender};
use tokio::time;
use tokio_rustls::TlsAcceptor;
use tracing::Instrument;
#[derive(Error, Debug)]
pub enum Error {
@ -39,7 +40,7 @@ pub async fn run(
switch_keys: &HashSet<Key>,
) -> Result<(), Error> {
let listener = TcpListener::bind(&listen).await.map_err(Error::Network)?;
log::info!("Listening on {}", listen);
tracing::info!("Listening on {}", listen);
let mut monitor = Monitor::new();
let mut devices = Slab::<Device>::new();
@ -83,14 +84,18 @@ pub async fn run(
let (sender, receiver) = mpsc::channel(1);
clients.insert(sender);
tokio::spawn(async move {
log::info!("{}: Connected", addr);
let span = tracing::info_span!("connection", addr = %addr);
tokio::spawn(
async move {
tracing::info!("Connected");
match client(init_updates, receiver, stream, addr, acceptor, &password).await {
Ok(()) => log::info!("{}: Disconnected", addr),
Err(err) => log::error!("{}: Disconnected: {}", addr, err),
match client(init_updates, receiver, stream, acceptor, &password).await {
Ok(()) => tracing::info!("Disconnected"),
Err(err) => tracing::error!("Disconnected: {}", err),
}
}
});
.instrument(span),
);
}
result = monitor.read() => {
let mut interceptor = result.map_err(Error::Input)?;
@ -154,7 +159,7 @@ pub async fn run(
}
}
log::trace!("Wrote an event to device {}", id);
tracing::trace!("Wrote an event to device {}", id);
}
}
}
@ -162,7 +167,7 @@ pub async fn run(
let device = &devices[id];
log::info!(
tracing::info!(
"Registered new device {} (name {:?}, vendor {}, product {}, version {})",
id,
device.name,
@ -202,7 +207,7 @@ pub async fn run(
previous = idx;
changed = true;
log::debug!("Switched to client {}", current);
tracing::debug!("Switched to client {}", current);
} else if changed {
idx = previous;
@ -248,7 +253,7 @@ pub async fn run(
}
devices.remove(id);
log::info!("Destroyed device {}", id);
tracing::info!("Destroyed device {}", id);
}
Err(err) => return Err(Error::Input(err)),
}
@ -283,7 +288,6 @@ async fn client(
mut init_updates: VecDeque<Update>,
mut receiver: Receiver<Update>,
stream: TcpStream,
addr: SocketAddr,
acceptor: TlsAcceptor,
password: &str,
) -> Result<(), ClientError> {
@ -291,7 +295,7 @@ async fn client(
socket::configure(&stream)?;
let stream = acceptor.accept(stream).await?;
log::info!("{}: TLS connected", addr);
tracing::info!("TLS connected");
let mut stream = BufStream::with_capacity(1024, 1024, stream);
@ -324,7 +328,7 @@ async fn client(
return Err(ClientError::Auth);
}
log::info!("{}: Authenticated successfully", addr);
tracing::info!("Authenticated successfully");
Ok(stream)
};
@ -366,9 +370,8 @@ async fn client(
.await
.map_err(|_| io::Error::new(ErrorKind::TimedOut, "Update writing took too long"))??;
log::trace!(
"{}: Wrote {} update{}",
addr,
tracing::trace!(
"Wrote {} update{}",
count,
if count == 1 { "" } else { "s" }
);