fixes multiple calls to recv for closed channels

This commit is contained in:
Keagan McClelland
2022-01-24 12:12:40 -07:00
parent 21fed0c0be
commit ca724b2340
2 changed files with 17 additions and 10 deletions

View File

@@ -1,5 +1,6 @@
use tokio::sync::mpsc::{self, UnboundedReceiver};
use tokio::sync::oneshot;
use tokio::sync::oneshot::error::TryRecvError;
use super::{LockInfo, Request};
@@ -73,20 +74,28 @@ impl ActionMux {
let mut actions = Vec::new();
// find all serviceable lock releases
for mut r in std::mem::take(&mut self.unlock_receivers) {
if let Ok(lock_info) = r.try_recv() {
match r.try_recv() {
Ok(lock_info) => {
actions.push(Action::HandleRelease(lock_info));
} else {
}
Err(TryRecvError::Empty) => {
self.unlock_receivers.push(r);
}
Err(TryRecvError::Closed) => (),
}
}
// find all serviceable lock cancellations
for mut r in std::mem::take(&mut self.cancellation_receivers) {
if let Ok(lock_info) = r.try_recv() {
match r.try_recv() {
Ok(lock_info) => {
actions.push(Action::HandleCancel(lock_info));
} else {
}
Err(TryRecvError::Empty) => {
self.cancellation_receivers.push(r);
}
Err(TryRecvError::Closed) => (),
}
}
// finally add the action that started it all

View File

@@ -56,8 +56,6 @@ impl Locker {
#[cfg(feature = "tracing")]
debug!("New lock release: {}", &lock_info);
println!("Release Called {}", &lock_info);
let new_unlock_receivers = lock_server.ret(&lock_info);
action_mux.push_unlock_receivers(new_unlock_receivers);
}