mirror of
https://github.com/htrefil/rkvm.git
synced 2025-01-13 20:01:29 +01:00
Prevent datagram races by never reusing device IDs
This commit is contained in:
parent
c4bad45532
commit
f1c11f0dc1
1 changed files with 10 additions and 7 deletions
|
@ -64,7 +64,8 @@ pub async fn run(
|
||||||
});
|
});
|
||||||
|
|
||||||
let mut monitor = Monitor::new();
|
let mut monitor = Monitor::new();
|
||||||
let mut devices = Slab::<Device>::new();
|
let mut id = 0usize;
|
||||||
|
let mut devices = HashMap::<usize, Device>::new();
|
||||||
let mut clients = Slab::<(Sender<_>, SocketAddr)>::new();
|
let mut clients = Slab::<(Sender<_>, SocketAddr)>::new();
|
||||||
let mut current = 0;
|
let mut current = 0;
|
||||||
let mut previous = 0;
|
let mut previous = 0;
|
||||||
|
@ -94,7 +95,7 @@ pub async fn run(
|
||||||
let init_updates = devices
|
let init_updates = devices
|
||||||
.iter()
|
.iter()
|
||||||
.map(|(id, device)| Update::CreateDevice {
|
.map(|(id, device)| Update::CreateDevice {
|
||||||
id,
|
id: *id,
|
||||||
name: device.name.clone(),
|
name: device.name.clone(),
|
||||||
version: device.version,
|
version: device.version,
|
||||||
vendor: device.vendor,
|
vendor: device.vendor,
|
||||||
|
@ -127,8 +128,9 @@ pub async fn run(
|
||||||
interceptor = monitor.read() => {
|
interceptor = monitor.read() => {
|
||||||
let mut interceptor = interceptor.map_err(Error::Input)?;
|
let mut interceptor = interceptor.map_err(Error::Input)?;
|
||||||
|
|
||||||
|
id = id.checked_add(1).unwrap();
|
||||||
|
|
||||||
let name = interceptor.name().to_owned();
|
let name = interceptor.name().to_owned();
|
||||||
let id = devices.vacant_key();
|
|
||||||
let version = interceptor.version();
|
let version = interceptor.version();
|
||||||
let vendor = interceptor.vendor();
|
let vendor = interceptor.vendor();
|
||||||
let product = interceptor.product();
|
let product = interceptor.product();
|
||||||
|
@ -155,7 +157,7 @@ pub async fn run(
|
||||||
}
|
}
|
||||||
|
|
||||||
let (interceptor_sender, mut interceptor_receiver) = mpsc::channel(32);
|
let (interceptor_sender, mut interceptor_receiver) = mpsc::channel(32);
|
||||||
devices.insert(Device {
|
devices.insert(id, Device {
|
||||||
name,
|
name,
|
||||||
version,
|
version,
|
||||||
vendor,
|
vendor,
|
||||||
|
@ -197,7 +199,7 @@ pub async fn run(
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
let device = &devices[id];
|
let device = &devices[&id];
|
||||||
|
|
||||||
tracing::info!(
|
tracing::info!(
|
||||||
id = %id,
|
id = %id,
|
||||||
|
@ -267,8 +269,9 @@ pub async fn run(
|
||||||
// In this scenario, the interceptor task is sending events to the main task,
|
// In this scenario, the interceptor task is sending events to the main task,
|
||||||
// while the main task is simultaneously sending events back to the interceptor.
|
// while the main task is simultaneously sending events back to the interceptor.
|
||||||
// This creates a classic deadlock situation where both tasks are waiting for each other.
|
// This creates a classic deadlock situation where both tasks are waiting for each other.
|
||||||
|
let sender = &devices[&id].sender;
|
||||||
for event in events {
|
for event in events {
|
||||||
match devices[id].sender.try_send(event) {
|
match sender.try_send(event) {
|
||||||
Ok(()) | Err(TrySendError::Closed(_)) => {},
|
Ok(()) | Err(TrySendError::Closed(_)) => {},
|
||||||
Err(TrySendError::Full(_)) => return Err(Error::Overflow),
|
Err(TrySendError::Full(_)) => return Err(Error::Overflow),
|
||||||
}
|
}
|
||||||
|
@ -291,7 +294,7 @@ pub async fn run(
|
||||||
for (_, (sender, _)) in &clients {
|
for (_, (sender, _)) in &clients {
|
||||||
let _ = sender.send(Update::DestroyDevice { id }).await;
|
let _ = sender.send(Update::DestroyDevice { id }).await;
|
||||||
}
|
}
|
||||||
devices.remove(id);
|
devices.remove(&id);
|
||||||
|
|
||||||
tracing::info!(id = %id, "Destroyed device");
|
tracing::info!(id = %id, "Destroyed device");
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue