Remove TaskHandle::close

TaskHandle::close meant run_now may panic if the task was closed. Now, tasks
are only closed when all handles are dropped, causing all handles to point to
running tasks (ensuring run_now won't panic).
This commit is contained in:
Luke Parker
2025-01-07 15:26:41 -05:00
parent 47a4e534ef
commit 052388285b

View File

@@ -3,27 +3,29 @@
#![deny(missing_docs)] #![deny(missing_docs)]
use core::{future::Future, time::Duration}; use core::{future::Future, time::Duration};
use std::sync::Arc;
use tokio::sync::{mpsc, oneshot, Mutex}; use tokio::sync::mpsc;
enum Closed {
NotClosed(Option<oneshot::Receiver<()>>),
Closed,
}
/// A handle for a task. /// A handle for a task.
///
/// The task will only stop running once all handles for it are dropped.
//
// `run_now` isn't infallible if the task may have been closed. `run_now` on a closed task would
// either need to panic (historic behavior), silently drop the fact the task can't be run, or
// return an error. Instead of having a potential panic, and instead of modeling the error
// behavior, this task can't be closed unless all handles are dropped, ensuring calls to `run_now`
// are infallible.
#[derive(Clone)] #[derive(Clone)]
pub struct TaskHandle { pub struct TaskHandle {
run_now: mpsc::Sender<()>, run_now: mpsc::Sender<()>,
#[allow(dead_code)] // This is used to track if all handles have been dropped
close: mpsc::Sender<()>, close: mpsc::Sender<()>,
closed: Arc<Mutex<Closed>>,
} }
/// A task's internal structures. /// A task's internal structures.
pub struct Task { pub struct Task {
run_now: mpsc::Receiver<()>, run_now: mpsc::Receiver<()>,
close: mpsc::Receiver<()>, close: mpsc::Receiver<()>,
closed: oneshot::Sender<()>,
} }
impl Task { impl Task {
@@ -34,14 +36,9 @@ impl Task {
let (run_now_send, run_now_recv) = mpsc::channel(1); let (run_now_send, run_now_recv) = mpsc::channel(1);
// And any call to close satisfies all calls to close // And any call to close satisfies all calls to close
let (close_send, close_recv) = mpsc::channel(1); let (close_send, close_recv) = mpsc::channel(1);
let (closed_send, closed_recv) = oneshot::channel();
( (
Self { run_now: run_now_recv, close: close_recv, closed: closed_send }, Self { run_now: run_now_recv, close: close_recv },
TaskHandle { TaskHandle { run_now: run_now_send, close: close_send },
run_now: run_now_send,
close: close_send,
closed: Arc::new(Mutex::new(Closed::NotClosed(Some(closed_recv)))),
},
) )
} }
} }
@@ -61,24 +58,6 @@ impl TaskHandle {
} }
} }
} }
/// Close the task.
///
/// Returns once the task shuts down after it finishes its current iteration (which may be of
/// unbounded time).
pub async fn close(self) {
// If another instance of the handle called tfhis, don't error
let _ = self.close.send(()).await;
// Wait until we receive the closed message
let mut closed = self.closed.lock().await;
match &mut *closed {
Closed::NotClosed(ref mut recv) => {
assert_eq!(recv.take().unwrap().await, Ok(()), "continually ran task dropped itself?");
*closed = Closed::Closed;
}
Closed::Closed => {}
}
}
} }
/// A task to be continually ran. /// A task to be continually ran.
@@ -152,8 +131,6 @@ pub trait ContinuallyRan: Sized + Send {
}, },
} }
} }
task.closed.send(()).unwrap();
} }
} }
} }