diff --git a/common/task/src/lib.rs b/common/task/src/lib.rs index b5523cda..2a061c10 100644 --- a/common/task/src/lib.rs +++ b/common/task/src/lib.rs @@ -3,27 +3,29 @@ #![deny(missing_docs)] use core::{future::Future, time::Duration}; -use std::sync::Arc; -use tokio::sync::{mpsc, oneshot, Mutex}; - -enum Closed { - NotClosed(Option>), - Closed, -} +use tokio::sync::mpsc; /// A handle for a task. +/// +/// The task will only stop running once all handles for it are dropped. +// +// `run_now` isn't infallible if the task may have been closed. `run_now` on a closed task would +// either need to panic (historic behavior), silently drop the fact the task can't be run, or +// return an error. Instead of having a potential panic, and instead of modeling the error +// behavior, this task can't be closed unless all handles are dropped, ensuring calls to `run_now` +// are infallible. #[derive(Clone)] pub struct TaskHandle { run_now: mpsc::Sender<()>, + #[allow(dead_code)] // This is used to track if all handles have been dropped close: mpsc::Sender<()>, - closed: Arc>, } + /// A task's internal structures. pub struct Task { run_now: mpsc::Receiver<()>, close: mpsc::Receiver<()>, - closed: oneshot::Sender<()>, } impl Task { @@ -34,14 +36,9 @@ impl Task { let (run_now_send, run_now_recv) = mpsc::channel(1); // And any call to close satisfies all calls to close let (close_send, close_recv) = mpsc::channel(1); - let (closed_send, closed_recv) = oneshot::channel(); ( - Self { run_now: run_now_recv, close: close_recv, closed: closed_send }, - TaskHandle { - run_now: run_now_send, - close: close_send, - closed: Arc::new(Mutex::new(Closed::NotClosed(Some(closed_recv)))), - }, + Self { run_now: run_now_recv, close: close_recv }, + TaskHandle { run_now: run_now_send, close: close_send }, ) } } @@ -61,24 +58,6 @@ impl TaskHandle { } } } - - /// Close the task. - /// - /// Returns once the task shuts down after it finishes its current iteration (which may be of - /// unbounded time). - pub async fn close(self) { - // If another instance of the handle called tfhis, don't error - let _ = self.close.send(()).await; - // Wait until we receive the closed message - let mut closed = self.closed.lock().await; - match &mut *closed { - Closed::NotClosed(ref mut recv) => { - assert_eq!(recv.take().unwrap().await, Ok(()), "continually ran task dropped itself?"); - *closed = Closed::Closed; - } - Closed::Closed => {} - } - } } /// A task to be continually ran. @@ -152,8 +131,6 @@ pub trait ContinuallyRan: Sized + Send { }, } } - - task.closed.send(()).unwrap(); } } }