Redo new_tributary from being over ActiveTributary to TributaryEvent

TributaryEvent also allows broadcasting a retiry event.
This commit is contained in:
Luke Parker
2023-10-14 14:56:02 -04:00
parent 5c5c097da9
commit f414735be5
6 changed files with 209 additions and 150 deletions

View File

@@ -175,13 +175,13 @@ pub(crate) async fn scan_tributaries_task<
recognized_id: RID,
processors: Pro,
serai: Arc<Serai>,
mut new_tributary: broadcast::Receiver<crate::ActiveTributary<D, P>>,
mut tributary_event: broadcast::Receiver<crate::TributaryEvent<D, P>>,
) {
log::info!("scanning tributaries");
loop {
match new_tributary.recv().await {
Ok(crate::ActiveTributary { spec, tributary }) => {
match tributary_event.recv().await {
Ok(crate::TributaryEvent::NewTributary(crate::ActiveTributary { spec, tributary })) => {
// For each Tributary, spawn a dedicated scanner task
tokio::spawn({
let raw_db = raw_db.clone();
@@ -266,10 +266,12 @@ pub(crate) async fn scan_tributaries_task<
}
});
}
// TODO
Ok(crate::TributaryEvent::TributaryRetired(_)) => todo!(),
Err(broadcast::error::RecvError::Lagged(_)) => {
panic!("scan_tributaries lagged to handle new_tributary")
panic!("scan_tributaries lagged to handle tributary_event")
}
Err(broadcast::error::RecvError::Closed) => panic!("new_tributary sender closed"),
Err(broadcast::error::RecvError::Closed) => panic!("tributary_event sender closed"),
}
}
}