fixes multiple calls to recv for closed channels

This commit is contained in:
Keagan McClelland
2022-01-24 12:12:40 -07:00
committed by Aiden McClelland
parent 21fed0c0be
commit 259d186263
2 changed files with 17 additions and 10 deletions

View File

@@ -1,5 +1,6 @@
use tokio::sync::mpsc::{self, UnboundedReceiver}; use tokio::sync::mpsc::{self, UnboundedReceiver};
use tokio::sync::oneshot; use tokio::sync::oneshot;
use tokio::sync::oneshot::error::TryRecvError;
use super::{LockInfo, Request}; use super::{LockInfo, Request};
@@ -73,19 +74,27 @@ impl ActionMux {
let mut actions = Vec::new(); let mut actions = Vec::new();
// find all serviceable lock releases // find all serviceable lock releases
for mut r in std::mem::take(&mut self.unlock_receivers) { for mut r in std::mem::take(&mut self.unlock_receivers) {
if let Ok(lock_info) = r.try_recv() { match r.try_recv() {
actions.push(Action::HandleRelease(lock_info)); Ok(lock_info) => {
} else { actions.push(Action::HandleRelease(lock_info));
self.unlock_receivers.push(r); }
Err(TryRecvError::Empty) => {
self.unlock_receivers.push(r);
}
Err(TryRecvError::Closed) => (),
} }
} }
// find all serviceable lock cancellations // find all serviceable lock cancellations
for mut r in std::mem::take(&mut self.cancellation_receivers) { for mut r in std::mem::take(&mut self.cancellation_receivers) {
if let Ok(lock_info) = r.try_recv() { match r.try_recv() {
actions.push(Action::HandleCancel(lock_info)); Ok(lock_info) => {
} else { actions.push(Action::HandleCancel(lock_info));
self.cancellation_receivers.push(r); }
Err(TryRecvError::Empty) => {
self.cancellation_receivers.push(r);
}
Err(TryRecvError::Closed) => (),
} }
} }

View File

@@ -56,8 +56,6 @@ impl Locker {
#[cfg(feature = "tracing")] #[cfg(feature = "tracing")]
debug!("New lock release: {}", &lock_info); debug!("New lock release: {}", &lock_info);
println!("Release Called {}", &lock_info);
let new_unlock_receivers = lock_server.ret(&lock_info); let new_unlock_receivers = lock_server.ret(&lock_info);
action_mux.push_unlock_receivers(new_unlock_receivers); action_mux.push_unlock_receivers(new_unlock_receivers);
} }