diff --git a/coordinator/src/main.rs b/coordinator/src/main.rs
index beccfc9f..65c6203e 100644
--- a/coordinator/src/main.rs
+++ b/coordinator/src/main.rs
@@ -61,16 +61,27 @@ pub struct ActiveTributary<D: Db, P: P2p> {
   pub tributary: Arc<Tributary<D, Transaction, P>>,
 }
 
+#[derive(Clone)]
+pub enum TributaryEvent<D: Db, P: P2p> {
+  NewTributary(ActiveTributary<D, P>),
+  TributaryRetired(ValidatorSet),
+}
+
 // Creates a new tributary and sends it to all listeners.
 // TODO: retire_tributary
 async fn add_tributary<D: Db, Pro: Processors, P: P2p>(
   db: D,
   key: Zeroizing<<Ristretto as Ciphersuite>::F>,
+  serai: &Serai,
   processors: &Pro,
   p2p: P,
-  tributaries: &broadcast::Sender<ActiveTributary<D, P>>,
+  tributaries: &broadcast::Sender<TributaryEvent<D, P>>,
   spec: TributarySpec,
 ) {
+  if !is_active_set(serai, spec.set()).await {
+    log::info!("not adding tributary {:?} since it's been retired", spec.set());
+  }
+
   log::info!("adding tributary {:?}", spec.set());
 
   let tributary = Tributary::<_, Transaction, _>::new(
@@ -107,7 +118,7 @@ async fn add_tributary<D: Db, Pro: Processors, P: P2p>(
     .await;
 
   tributaries
-    .send(ActiveTributary { spec, tributary: Arc::new(tributary) })
+    .send(TributaryEvent::NewTributary(ActiveTributary { spec, tributary: Arc::new(tributary) }))
     .map_err(|_| "all ActiveTributary recipients closed")
     .unwrap();
 }
@@ -643,22 +654,30 @@ async fn handle_processor_messages<D: Db, Pro: Processors, P: P2p>(
   serai: Arc<Serai>,
   mut processors: Pro,
   network: NetworkId,
-  mut new_tributary: mpsc::UnboundedReceiver<ActiveTributary<D, P>>,
+  mut tributary_event: mpsc::UnboundedReceiver<TributaryEvent<D, P>>,
 ) {
   let mut tributaries = HashMap::new();
   loop {
-    match new_tributary.try_recv() {
-      Ok(tributary) => {
-        let set = tributary.spec.set();
-        assert_eq!(set.network, network);
-        tributaries.insert(set.session, tributary);
+    match tributary_event.try_recv() {
+      Ok(event) => {
+        match event {
+          TributaryEvent::NewTributary(tributary) => {
+            let set = tributary.spec.set();
+            assert_eq!(set.network, network);
+            tributaries.insert(set.session, tributary);
+          }
+          // TODO
+          TributaryEvent::TributaryRetired(_) => todo!(),
+        }
       }
       Err(mpsc::error::TryRecvError::Empty) => {}
       Err(mpsc::error::TryRecvError::Disconnected) => {
-        panic!("handle_processor_messages new_tributary sender closed")
+        panic!("handle_processor_messages tributary_event sender closed")
      }
    }
 
+    // TODO: Remove the Tributary if it's retired
+
     // TODO: Check this ID is sane (last handled ID or expected next ID)
     let msg = processors.recv(network).await;
 
     if handle_processor_message(&mut db, &key, &serai, &tributaries, network, &msg).await {
@@ -672,7 +691,7 @@ pub async fn handle_processors<D: Db, Pro: Processors, P: P2p>(
   key: Zeroizing<<Ristretto as Ciphersuite>::F>,
   serai: Arc<Serai>,
   processors: Pro,
-  mut new_tributary: broadcast::Receiver<ActiveTributary<D, P>>,
+  mut tributary_event: broadcast::Receiver<TributaryEvent<D, P>>,
 ) {
   let mut channels = HashMap::new();
   for network in serai_client::primitives::NETWORKS {
@@ -693,8 +712,13 @@ pub async fn handle_processors<D: Db, Pro: Processors, P: P2p>(
 
   // Listen to new tributary events
   loop {
-    let tributary = new_tributary.recv().await.unwrap();
-    channels[&tributary.spec.set().network].send(tributary).unwrap();
+    match tributary_event.recv().await.unwrap() {
+      TributaryEvent::NewTributary(tributary) => channels[&tributary.spec.set().network]
+        .send(TributaryEvent::NewTributary(tributary))
+        .unwrap(),
+      // TODO
+      TributaryEvent::TributaryRetired(_) => todo!(),
+    };
   }
 }
@@ -726,16 +750,17 @@ pub async fn run<D: Db, Pro: Processors, P: P2p>(
   // This should be large enough for an entire rotation of all tributaries
   // If it's too small, the coordinator fail to boot, which is a decent sanity check
-  let (new_tributary, mut new_tributary_listener_1) = broadcast::channel(32);
-  let new_tributary_listener_2 = new_tributary.subscribe();
-  let new_tributary_listener_3 = new_tributary.subscribe();
-  let new_tributary_listener_4 = new_tributary.subscribe();
-  let new_tributary_listener_5 = new_tributary.subscribe();
+  let (tributary_event, mut tributary_event_listener_1) = broadcast::channel(32);
+  let tributary_event_listener_2 = tributary_event.subscribe();
+  let tributary_event_listener_3 = tributary_event.subscribe();
+  let tributary_event_listener_4 = tributary_event.subscribe();
+  let tributary_event_listener_5 = tributary_event.subscribe();
 
   // Spawn a task to further add Tributaries as needed
   tokio::spawn({
     let raw_db = raw_db.clone();
     let key = key.clone();
+    let serai = serai.clone();
     let processors = processors.clone();
     let p2p = p2p.clone();
     async move {
@@ -745,11 +770,12 @@ pub async fn run<D: Db, Pro: Processors, P: P2p>(
       tokio::spawn({
         let raw_db = raw_db.clone();
         let key = key.clone();
+        let serai = serai.clone();
        let processors = processors.clone();
         let p2p = p2p.clone();
-        let new_tributary = new_tributary.clone();
+        let tributary_event = tributary_event.clone();
         async move {
-          add_tributary(raw_db, key, &processors, p2p, &new_tributary, spec).await;
+          add_tributary(raw_db, key, &serai, &processors, p2p, &tributary_event, spec).await;
         }
       });
     }
@@ -766,14 +792,16 @@ pub async fn run<D: Db, Pro: Processors, P: P2p>(
       let tributaries = tributaries.clone();
       async move {
         loop {
-          match new_tributary_listener_1.recv().await {
-            Ok(tributary) => {
+          match tributary_event_listener_1.recv().await {
+            Ok(TributaryEvent::NewTributary(tributary)) => {
               tributaries.write().await.insert(tributary.spec.genesis(), tributary.tributary);
             }
+            // TODO
+            Ok(TributaryEvent::TributaryRetired(_)) => todo!(),
             Err(broadcast::error::RecvError::Lagged(_)) => {
-              panic!("recognized_id lagged to handle new_tributary")
+              panic!("recognized_id lagged to handle tributary_event")
             }
-            Err(broadcast::error::RecvError::Closed) => panic!("new_tributary sender closed"),
+            Err(broadcast::error::RecvError::Closed) => panic!("tributary_event sender closed"),
           }
         }
       }
@@ -848,23 +876,23 @@ pub async fn run<D: Db, Pro: Processors, P: P2p>(
       recognized_id,
       processors.clone(),
       serai.clone(),
-      new_tributary_listener_2,
+      tributary_event_listener_2,
     ));
   }
 
   // Spawn the heartbeat task, which will trigger syncing if there hasn't been a Tributary block
   // in a while (presumably because we're behind)
-  tokio::spawn(p2p::heartbeat_tributaries_task(p2p.clone(), new_tributary_listener_3));
+  tokio::spawn(p2p::heartbeat_tributaries_task(p2p.clone(), tributary_event_listener_3));
 
   // Handle P2P messages
   tokio::spawn(p2p::handle_p2p_task(
     Ristretto::generator() * key.deref(),
     p2p,
-    new_tributary_listener_4,
+    tributary_event_listener_4,
   ));
 
   // Handle all messages from processors
-  handle_processors(raw_db, key, serai, processors, new_tributary_listener_5).await;
+  handle_processors(raw_db, key, serai, processors, tributary_event_listener_5).await;
 }
 
 #[tokio::main]
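The `// TODO: retire_tributary` above, together with the `todo!()` arms on `TributaryEvent::TributaryRetired`, leaves the retirement half of the new event unimplemented in this patch. As a rough sketch only, the sending side could mirror how `add_tributary` broadcasts `NewTributary` on the same channel; the `retire_tributary` name and signature here are assumptions, not part of the patch:

// Sketch (not in this patch): broadcast a retirement so every TributaryEvent
// listener can drop its per-set state, mirroring add_tributary's send above.
fn retire_tributary<D: Db, P: P2p>(
  tributaries: &broadcast::Sender<TributaryEvent<D, P>>,
  set: ValidatorSet,
) {
  log::info!("retiring tributary {:?}", set);
  tributaries
    .send(TributaryEvent::TributaryRetired(set))
    .map_err(|_| "all TributaryEvent recipients closed")
    .unwrap();
}
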
diff --git a/coordinator/src/p2p.rs b/coordinator/src/p2p.rs
index 5cd46abf..69720e2a 100644
--- a/coordinator/src/p2p.rs
+++ b/coordinator/src/p2p.rs
@@ -34,7 +34,7 @@ use libp2p::{
 
 pub(crate) use tributary::{ReadWrite, P2p as TributaryP2p};
 
-use crate::{Transaction, Block, Tributary, ActiveTributary};
+use crate::{Transaction, Block, Tributary, ActiveTributary, TributaryEvent};
 
 // TODO: Use distinct topics
 const LIBP2P_TOPIC: &str = "serai-coordinator";
@@ -383,27 +383,30 @@ impl TributaryP2p for LibP2p {
 
 pub async fn heartbeat_tributaries_task<D: Db, P: P2p>(
   p2p: P,
-  mut new_tributary: broadcast::Receiver<ActiveTributary<D, P>>,
+  mut tributary_event: broadcast::Receiver<TributaryEvent<D, P>>,
 ) {
   let ten_blocks_of_time =
     Duration::from_secs((10 * Tributary::<D, Transaction, P>::block_time()).into());
 
-  let mut readers = vec![];
+  let mut readers = HashMap::new();
 
   loop {
-    while let Ok(ActiveTributary { spec: _, tributary }) = {
-      match new_tributary.try_recv() {
-        Ok(tributary) => Ok(tributary),
-        Err(broadcast::error::TryRecvError::Empty) => Err(()),
-        Err(broadcast::error::TryRecvError::Lagged(_)) => {
-          panic!("heartbeat_tributaries lagged to handle new_tributary")
+    loop {
+      match tributary_event.try_recv() {
+        Ok(TributaryEvent::NewTributary(ActiveTributary { spec, tributary })) => {
+          readers.insert(spec.set(), tributary.reader());
         }
-        Err(broadcast::error::TryRecvError::Closed) => panic!("new_tributary sender closed"),
+        Ok(TributaryEvent::TributaryRetired(set)) => {
+          readers.remove(&set);
+        }
+        Err(broadcast::error::TryRecvError::Empty) => break,
+        Err(broadcast::error::TryRecvError::Lagged(_)) => {
+          panic!("heartbeat_tributaries lagged to handle tributary_event")
+        }
+        Err(broadcast::error::TryRecvError::Closed) => panic!("tributary_event sender closed"),
      }
-    } {
-      readers.push(tributary.reader());
     }
 
-    for tributary in &readers {
+    for tributary in readers.values() {
       let tip = tributary.tip();
       let block_time =
         SystemTime::UNIX_EPOCH + Duration::from_secs(tributary.time_of_block(&tip).unwrap_or(0));
@@ -433,7 +436,7 @@ pub async fn heartbeat_tributaries_task<D: Db, P: P2p>(
 pub async fn handle_p2p_task<D: Db, P: P2p>(
   our_key: <Ristretto as Ciphersuite>::G,
   p2p: P,
-  mut new_tributary: broadcast::Receiver<ActiveTributary<D, P>>,
+  mut tributary_event: broadcast::Receiver<TributaryEvent<D, P>>,
 ) {
   let channels = Arc::new(RwLock::new(HashMap::new()));
   tokio::spawn({
@@ -441,111 +444,122 @@ pub async fn handle_p2p_task<D: Db, P: P2p>(
     let channels = channels.clone();
     async move {
       loop {
-        let tributary = new_tributary.recv().await.unwrap();
-        let genesis = tributary.spec.genesis();
+        match tributary_event.recv().await.unwrap() {
+          TributaryEvent::NewTributary(tributary) => {
+            let genesis = tributary.spec.genesis();
 
-        let (send, mut recv) = mpsc::unbounded_channel();
-        channels.write().await.insert(genesis, send);
+            let (send, mut recv) = mpsc::unbounded_channel();
+            channels.write().await.insert(genesis, send);
 
-        tokio::spawn({
-          let p2p = p2p.clone();
-          async move {
-            loop {
-              let mut msg: Message<P> = recv.recv().await.unwrap();
-              match msg.kind {
-                P2pMessageKind::KeepAlive => {}
+            tokio::spawn({
+              let p2p = p2p.clone();
+              async move {
+                loop {
+                  let mut msg: Message<P> = recv.recv().await.unwrap();
+                  match msg.kind {
+                    P2pMessageKind::KeepAlive => {}
 
-                P2pMessageKind::Tributary(msg_genesis) => {
-                  assert_eq!(msg_genesis, genesis);
-                  log::trace!("handling message for tributary {:?}", tributary.spec.set());
-                  if tributary.tributary.handle_message(&msg.msg).await {
-                    P2p::broadcast(&p2p, msg.kind, msg.msg).await;
-                  }
-                }
-
-                // TODO2: Rate limit this per timestamp
-                // And/or slash on Heartbeat which justifies a response, since the node obviously
-                // was offline and we must now use our bandwidth to compensate for them?
-                P2pMessageKind::Heartbeat(msg_genesis) => {
-                  assert_eq!(msg_genesis, genesis);
-                  if msg.msg.len() != 40 {
-                    log::error!("validator sent invalid heartbeat");
-                    continue;
-                  }
-
-                  let p2p = p2p.clone();
-                  let spec = tributary.spec.clone();
-                  let reader = tributary.tributary.reader();
-                  // Spawn a dedicated task as this may require loading large amounts of data from
-                  // disk and take a notable amount of time
-                  tokio::spawn(async move {
-                    /*
-                    // Have sqrt(n) nodes reply with the blocks
-                    let mut responders = (tributary.spec.n() as f32).sqrt().floor() as u64;
-                    // Try to have at least 3 responders
-                    if responders < 3 {
-                      responders = tributary.spec.n().min(3).into();
-                    }
-                    */
-
-                    // Have up to three nodes respond
-                    let responders = u64::from(spec.n().min(3));
-
-                    // Decide which nodes will respond by using the latest block's hash as a
-                    // mutually agreed upon entropy source
-                    // This isn't a secure source of entropy, yet it's fine for this
-                    let entropy = u64::from_le_bytes(reader.tip()[.. 8].try_into().unwrap());
-                    // If n = 10, responders = 3, we want `start` to be 0 ..= 7
-                    // (so the highest is 7, 8, 9)
-                    // entropy % (10 + 1) - 3 = entropy % 8 = 0 ..= 7
-                    let start =
-                      usize::try_from(entropy % (u64::from(spec.n() + 1) - responders)).unwrap();
-                    let mut selected = false;
-                    for validator in
-                      &spec.validators()[start .. (start + usize::try_from(responders).unwrap())]
-                    {
-                      if our_key == validator.0 {
-                        selected = true;
-                        break;
+                    P2pMessageKind::Tributary(msg_genesis) => {
+                      assert_eq!(msg_genesis, genesis);
+                      log::trace!("handling message for tributary {:?}", tributary.spec.set());
+                      if tributary.tributary.handle_message(&msg.msg).await {
+                        P2p::broadcast(&p2p, msg.kind, msg.msg).await;
                      }
                     }
-                    if !selected {
-                      log::debug!("received heartbeat and not selected to respond");
-                      return;
+
+                    // TODO2: Rate limit this per timestamp
+                    // And/or slash on Heartbeat which justifies a response, since the node
+                    // obviously was offline and we must now use our bandwidth to compensate for
+                    // them?
+                    P2pMessageKind::Heartbeat(msg_genesis) => {
+                      assert_eq!(msg_genesis, genesis);
+                      if msg.msg.len() != 40 {
+                        log::error!("validator sent invalid heartbeat");
+                        continue;
+                      }
+
+                      let p2p = p2p.clone();
+                      let spec = tributary.spec.clone();
+                      let reader = tributary.tributary.reader();
+                      // Spawn a dedicated task as this may require loading large amounts of data
+                      // from disk and take a notable amount of time
+                      tokio::spawn(async move {
+                        /*
+                        // Have sqrt(n) nodes reply with the blocks
+                        let mut responders = (tributary.spec.n() as f32).sqrt().floor() as u64;
+                        // Try to have at least 3 responders
+                        if responders < 3 {
+                          responders = tributary.spec.n().min(3).into();
+                        }
+                        */
+
+                        // Have up to three nodes respond
+                        let responders = u64::from(spec.n().min(3));
+
+                        // Decide which nodes will respond by using the latest block's hash as a
+                        // mutually agreed upon entropy source
+                        // This isn't a secure source of entropy, yet it's fine for this
+                        let entropy = u64::from_le_bytes(reader.tip()[.. 8].try_into().unwrap());
+                        // If n = 10, responders = 3, we want `start` to be 0 ..= 7
+                        // (so the highest is 7, 8, 9)
+                        // entropy % (10 + 1) - 3 = entropy % 8 = 0 ..= 7
+                        let start =
+                          usize::try_from(entropy % (u64::from(spec.n() + 1) - responders))
+                            .unwrap();
+                        let mut selected = false;
+                        for validator in &spec.validators()
+                          [start .. (start + usize::try_from(responders).unwrap())]
+                        {
+                          if our_key == validator.0 {
+                            selected = true;
+                            break;
+                          }
+                        }
+                        if !selected {
+                          log::debug!("received heartbeat and not selected to respond");
+                          return;
+                        }
+
+                        log::debug!("received heartbeat and selected to respond");
+
+                        let mut latest = msg.msg[.. 32].try_into().unwrap();
+                        while let Some(next) = reader.block_after(&latest) {
+                          let mut res = reader.block(&next).unwrap().serialize();
+                          res.extend(reader.commit(&next).unwrap());
+                          // Also include the timestamp used within the Heartbeat
+                          res.extend(&msg.msg[32 .. 40]);
+                          p2p.send(msg.sender, P2pMessageKind::Block(spec.genesis()), res).await;
+                          latest = next;
+                        }
+                      });
                    }
 
-                    log::debug!("received heartbeat and selected to respond");
+                    P2pMessageKind::Block(msg_genesis) => {
+                      assert_eq!(msg_genesis, genesis);
+                      let mut msg_ref: &[u8] = msg.msg.as_ref();
+                      let Ok(block) = Block::<Transaction>::read(&mut msg_ref) else {
+                        log::error!("received block message with an invalidly serialized block");
+                        continue;
+                      };
+                      // Get just the commit
+                      msg.msg.drain(.. (msg.msg.len() - msg_ref.len()));
+                      msg.msg.drain((msg.msg.len() - 8) ..);
 
-                    let mut latest = msg.msg[.. 32].try_into().unwrap();
-                    while let Some(next) = reader.block_after(&latest) {
-                      let mut res = reader.block(&next).unwrap().serialize();
-                      res.extend(reader.commit(&next).unwrap());
-                      // Also include the timestamp used within the Heartbeat
-                      res.extend(&msg.msg[32 .. 40]);
-                      p2p.send(msg.sender, P2pMessageKind::Block(spec.genesis()), res).await;
-                      latest = next;
+                      let res = tributary.tributary.sync_block(block, msg.msg).await;
+                      log::debug!(
+                        "received block from {:?}, sync_block returned {}",
+                        msg.sender,
+                        res
+                      );
                    }
-                  });
-                }
-
-                P2pMessageKind::Block(msg_genesis) => {
-                  assert_eq!(msg_genesis, genesis);
-                  let mut msg_ref: &[u8] = msg.msg.as_ref();
-                  let Ok(block) = Block::<Transaction>::read(&mut msg_ref) else {
-                    log::error!("received block message with an invalidly serialized block");
-                    continue;
-                  };
-                  // Get just the commit
-                  msg.msg.drain(.. (msg.msg.len() - msg_ref.len()));
-                  msg.msg.drain((msg.msg.len() - 8) ..);
-
-                  let res = tributary.tributary.sync_block(block, msg.msg).await;
-                  log::debug!("received block from {:?}, sync_block returned {}", msg.sender, res);
+                  }
                }
              }
-            }
+            });
          }
-        });
+          // TODO
+          TributaryEvent::TributaryRetired(_) => todo!(),
+        }
      }
    }
  });
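The responder-selection arithmetic in the Heartbeat handler above is the least obvious part of this hunk, so here is the same calculation restated as a free function, together with the worked `n = 10` case from the code comment. The `responder_window` name is illustrative; the patch keeps this logic inline.

// Restates the heartbeat responder selection: up to three responders form a
// contiguous window of validators, with the window start drawn from shared entropy.
fn responder_window(n: u16, entropy: u64) -> std::ops::Range<usize> {
  let responders = u64::from(n.min(3));
  // start is entropy % (n + 1 - responders), so the window always fits within 0 .. n
  let start = usize::try_from(entropy % (u64::from(n + 1) - responders)).unwrap();
  start .. (start + usize::try_from(responders).unwrap())
}

// With n = 10: responders = 3 and start = entropy % 8, i.e. 0 ..= 7, so the
// furthest window is validators[7 .. 10], matching the comment in the handler.
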
diff --git a/coordinator/src/substrate/mod.rs b/coordinator/src/substrate/mod.rs
index fca1fc03..12673782 100644
--- a/coordinator/src/substrate/mod.rs
+++ b/coordinator/src/substrate/mod.rs
@@ -423,6 +423,17 @@ pub async fn scan_task<D: Db, Pro: Processors>(
       |db: &mut D, spec: TributarySpec| {
         log::info!("creating new tributary for {:?}", spec.set());
 
+        // Check it isn't already present in the DB due to rescanning this block upon reboot
+        for existing_spec in crate::MainDb::<D>::active_tributaries(db).1 {
+          if spec.set() == existing_spec.set() {
+            log::warn!(
+              "already created tributary {:?}, this should only happen on reboot",
+              spec.set()
+            );
+            return;
+          }
+        }
+
         // Save it to the database
         let mut txn = db.txn();
         crate::MainDb::<D>::add_active_tributary(&mut txn, &spec);
@@ -453,9 +464,7 @@ pub async fn is_active_set(serai: &Serai, set: ValidatorSet) -> bool {
   // call, instead of a series of network requests
   let serai = loop {
     let Ok(serai) = serai.with_current_latest_block().await else {
-      log::error!(
-        "couldn't get the latest block hash from serai when checking tributary relevancy"
-      );
+      log::error!("couldn't get the latest block hash from serai when checking if set is active");
       sleep(Duration::from_secs(5)).await;
       continue;
     };
@@ -464,7 +473,7 @@ pub async fn is_active_set(serai: &Serai, set: ValidatorSet) -> bool {
 
   let latest_session = loop {
     let Ok(res) = serai.session(set.network).await else {
-      log::error!("couldn't get the latest session from serai when checking tributary relevancy");
+      log::error!("couldn't get the latest session from serai when checking if set is active");
       sleep(Duration::from_secs(5)).await;
       continue;
     };
@@ -484,7 +493,7 @@ pub async fn is_active_set(serai: &Serai, set: ValidatorSet) -> bool {
     let keys = loop {
       let Ok(res) = serai.keys(set).await else {
         log::error!(
-          "couldn't get the keys for a session from serai when checking tributary relevancy"
+          "couldn't get the keys for a session from serai when checking if set is active"
         );
         sleep(Duration::from_secs(5)).await;
         continue;
diff --git a/coordinator/src/tests/tributary/handle_p2p.rs b/coordinator/src/tests/tributary/handle_p2p.rs
index f69c0b1a..faa77de9 100644
--- a/coordinator/src/tests/tributary/handle_p2p.rs
+++ b/coordinator/src/tests/tributary/handle_p2p.rs
@@ -13,7 +13,7 @@ use tributary::Tributary;
 
 use crate::{
   tributary::Transaction,
-  ActiveTributary,
+  ActiveTributary, TributaryEvent,
   p2p::handle_p2p_task,
   tests::{
     LocalP2p,
@@ -36,7 +36,7 @@ async fn handle_p2p_test() {
     let (new_tributary_send, new_tributary_recv) = broadcast::channel(5);
     tokio::spawn(handle_p2p_task(Ristretto::generator() * *keys[i], p2p, new_tributary_recv));
     new_tributary_send
-      .send(ActiveTributary { spec: spec.clone(), tributary })
+      .send(TributaryEvent::NewTributary(ActiveTributary { spec: spec.clone(), tributary }))
       .map_err(|_| "failed to send ActiveTributary")
       .unwrap();
     tributary_senders.push(new_tributary_send);
diff --git a/coordinator/src/tests/tributary/sync.rs b/coordinator/src/tests/tributary/sync.rs
index 36f8e6d2..7c5a200b 100644
--- a/coordinator/src/tests/tributary/sync.rs
+++ b/coordinator/src/tests/tributary/sync.rs
@@ -13,7 +13,7 @@ use tributary::Tributary;
 
 use crate::{
   tributary::Transaction,
-  ActiveTributary,
+  ActiveTributary, TributaryEvent,
   p2p::{heartbeat_tributaries_task, handle_p2p_task},
   tests::{
     LocalP2p,
@@ -45,7 +45,7 @@ async fn sync_test() {
     let thread =
       tokio::spawn(handle_p2p_task(Ristretto::generator() * *keys[i], p2p, new_tributary_recv));
     new_tributary_send
-      .send(ActiveTributary { spec: spec.clone(), tributary })
+      .send(TributaryEvent::NewTributary(ActiveTributary { spec: spec.clone(), tributary }))
       .map_err(|_| "failed to send ActiveTributary")
       .unwrap();
     tributary_senders.push(new_tributary_send);
@@ -80,7 +80,10 @@ async fn sync_test() {
 
   let (syncer_tributary_send, syncer_tributary_recv) = broadcast::channel(5);
   tokio::spawn(handle_p2p_task(syncer_key, syncer_p2p.clone(), syncer_tributary_recv));
   syncer_tributary_send
-    .send(ActiveTributary { spec: spec.clone(), tributary: syncer_tributary.clone() })
+    .send(TributaryEvent::NewTributary(ActiveTributary {
+      spec: spec.clone(),
+      tributary: syncer_tributary.clone(),
+    }))
     .map_err(|_| "failed to send ActiveTributary to syncer")
     .unwrap();
@@ -98,7 +101,10 @@ async fn sync_test() {
 
   let (syncer_heartbeat_tributary_send, syncer_heartbeat_tributary_recv) = broadcast::channel(5);
   tokio::spawn(heartbeat_tributaries_task(syncer_p2p, syncer_heartbeat_tributary_recv));
   syncer_heartbeat_tributary_send
-    .send(ActiveTributary { spec: spec.clone(), tributary: syncer_tributary.clone() })
+    .send(TributaryEvent::NewTributary(ActiveTributary {
+      spec: spec.clone(),
+      tributary: syncer_tributary.clone(),
+    }))
     .map_err(|_| "failed to send ActiveTributary to heartbeat")
     .unwrap();
diff --git a/coordinator/src/tributary/scanner.rs b/coordinator/src/tributary/scanner.rs
index 25f1a5dc..a12a8dd5 100644
--- a/coordinator/src/tributary/scanner.rs
+++ b/coordinator/src/tributary/scanner.rs
@@ -175,13 +175,13 @@ pub(crate) async fn scan_tributaries_task<
   recognized_id: RID,
   processors: Pro,
   serai: Arc<Serai>,
-  mut new_tributary: broadcast::Receiver<crate::ActiveTributary<D, P>>,
+  mut tributary_event: broadcast::Receiver<crate::TributaryEvent<D, P>>,
 ) {
   log::info!("scanning tributaries");
 
   loop {
-    match new_tributary.recv().await {
-      Ok(crate::ActiveTributary { spec, tributary }) => {
+    match tributary_event.recv().await {
+      Ok(crate::TributaryEvent::NewTributary(crate::ActiveTributary { spec, tributary })) => {
         // For each Tributary, spawn a dedicated scanner task
         tokio::spawn({
           let raw_db = raw_db.clone();
@@ -266,10 +266,12 @@ pub(crate) async fn scan_tributaries_task<
          }
        });
      }
+      // TODO
+      Ok(crate::TributaryEvent::TributaryRetired(_)) => todo!(),
       Err(broadcast::error::RecvError::Lagged(_)) => {
-        panic!("scan_tributaries lagged to handle new_tributary")
+        panic!("scan_tributaries lagged to handle tributary_event")
       }
-      Err(broadcast::error::RecvError::Closed) => panic!("new_tributary sender closed"),
+      Err(broadcast::error::RecvError::Closed) => panic!("tributary_event sender closed"),
     }
   }
 }
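The scanner above also still `todo!()`s on `TributaryRetired`. Since each tributary already gets its own spawned scan loop, one plausible shape is to share a set of retired `ValidatorSet`s that the loop consults before scanning further; this is only a sketch of the idea, and the `scan_loop` helper, the shared `retired` set, and the polling interval are assumptions rather than this patch's design:

use std::{sync::Arc, collections::HashSet};
use tokio::sync::RwLock;
use serai_client::validator_sets::primitives::ValidatorSet;

// Sketch: a per-tributary scan loop that winds down once its set is marked retired.
async fn scan_loop(set: ValidatorSet, retired: Arc<RwLock<HashSet<ValidatorSet>>>) {
  loop {
    if retired.read().await.contains(&set) {
      log::info!("tributary {:?} retired, stopping its scanner", set);
      break;
    }
    // ... scan any newly added blocks for this tributary, as the existing task body does ...
    tokio::time::sleep(core::time::Duration::from_secs(1)).await;
  }
}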