diff --git a/coordinator/src/main.rs b/coordinator/src/main.rs index bc91fdd6..a664b49c 100644 --- a/coordinator/src/main.rs +++ b/coordinator/src/main.rs @@ -53,8 +53,8 @@ mod substrate; #[cfg(test)] pub mod tests; -// This is a static to satisfy lifetime expectations lazy_static::lazy_static! { + // This is a static to satisfy lifetime expectations static ref NEW_TRIBUTARIES: RwLock> = RwLock::new(VecDeque::new()); } @@ -271,6 +271,8 @@ pub async fn handle_p2p( loop { let mut msg = p2p.receive().await; match msg.kind { + P2pMessageKind::KeepAlive => {} + P2pMessageKind::Tributary(genesis) => { let tributaries = tributaries.read().await; let Some(tributary) = tributaries.get(&genesis) else { diff --git a/coordinator/src/p2p.rs b/coordinator/src/p2p.rs index 25147d3f..11db03b0 100644 --- a/coordinator/src/p2p.rs +++ b/coordinator/src/p2p.rs @@ -1,5 +1,5 @@ use core::{time::Duration, fmt, task::Poll}; -use std::{sync::Arc, collections::VecDeque, io::Read}; +use std::{sync::Arc, time::Instant, collections::VecDeque, io::Read}; use async_trait::async_trait; @@ -27,6 +27,7 @@ const LIBP2P_TOPIC: &str = "serai-coordinator"; #[derive(Clone, Copy, PartialEq, Eq, Hash, Debug)] pub enum P2pMessageKind { + KeepAlive, Tributary([u8; 32]), Heartbeat([u8; 32]), Block([u8; 32]), @@ -35,21 +36,22 @@ pub enum P2pMessageKind { impl P2pMessageKind { fn serialize(&self) -> Vec { match self { + P2pMessageKind::KeepAlive => vec![0], P2pMessageKind::Tributary(genesis) => { - let mut res = vec![0]; - res.extend(genesis); - res - } - P2pMessageKind::Heartbeat(genesis) => { let mut res = vec![1]; res.extend(genesis); res } - P2pMessageKind::Block(genesis) => { + P2pMessageKind::Heartbeat(genesis) => { let mut res = vec![2]; res.extend(genesis); res } + P2pMessageKind::Block(genesis) => { + let mut res = vec![3]; + res.extend(genesis); + res + } } } @@ -57,17 +59,18 @@ impl P2pMessageKind { let mut kind = [0; 1]; reader.read_exact(&mut kind).ok()?; match kind[0] { - 0 => Some({ + 0 => Some(P2pMessageKind::KeepAlive), + 1 => Some({ let mut genesis = [0; 32]; reader.read_exact(&mut genesis).ok()?; P2pMessageKind::Tributary(genesis) }), - 1 => Some({ + 2 => Some({ let mut genesis = [0; 32]; reader.read_exact(&mut genesis).ok()?; P2pMessageKind::Heartbeat(genesis) }), - 2 => Some({ + 3 => Some({ let mut genesis = [0; 32]; reader.read_exact(&mut genesis).ok()?; P2pMessageKind::Block(genesis) @@ -103,6 +106,7 @@ pub trait P2p: Send + Sync + Clone + fmt::Debug + TributaryP2p { log::trace!( "broadcasting p2p message (kind {})", match kind { + P2pMessageKind::KeepAlive => "KeepAlive".to_string(), P2pMessageKind::Tributary(genesis) => format!("Tributary({})", hex::encode(genesis)), P2pMessageKind::Heartbeat(genesis) => format!("Heartbeat({})", hex::encode(genesis)), P2pMessageKind::Block(genesis) => format!("Block({})", hex::encode(genesis)), @@ -128,6 +132,7 @@ pub trait P2p: Send + Sync + Clone + fmt::Debug + TributaryP2p { log::trace!( "received p2p message (kind {})", match kind { + P2pMessageKind::KeepAlive => "KeepAlive".to_string(), P2pMessageKind::Tributary(genesis) => format!("Tributary({})", hex::encode(genesis)), P2pMessageKind::Heartbeat(genesis) => format!("Heartbeat({})", hex::encode(genesis)), P2pMessageKind::Block(genesis) => format!("Block({})", hex::encode(genesis)), @@ -144,6 +149,10 @@ struct Behavior { mdns: libp2p::mdns::tokio::Behaviour, } +lazy_static::lazy_static! { + static ref TIME_OF_LAST_P2P_MESSAGE: Mutex = Mutex::new(Instant::now()); +} + #[allow(clippy::type_complexity)] #[derive(Clone)] pub struct LibP2p(Arc>>, Arc)>>>); @@ -178,6 +187,8 @@ impl LibP2p { use blake2::{Digest, Blake2s256}; let config = ConfigBuilder::default() .max_transmit_size(MAX_LIBP2P_MESSAGE_SIZE) + // We send KeepAlive after 80s + .idle_timeout(Duration::from_secs(85)) .validation_mode(ValidationMode::Strict) // Uses a content based message ID to avoid duplicates as much as possible .message_id_fn(|msg| { @@ -210,7 +221,7 @@ impl LibP2p { // TODO: We do tests on release binaries as of right now... //#[cfg(debug_assertions)] mdns: { - log::info!("spawning mdns"); + log::info!("creating mdns service"); libp2p::mdns::tokio::Behaviour::new(libp2p::mdns::Config::default(), throwaway_peer_id) .unwrap() }, @@ -227,7 +238,21 @@ impl LibP2p { async move { // Run this task ad-infinitum loop { + // If it's been >80s since we've published a message, publish a KeepAlive since we're + // still an active service + // This is useful when we have no active tributaries and accordingly aren't sending + // heartbeats + // If we are sending heartbeats, we should've sent one after 60s of no finalized blocks + // (where a finalized block only occurs due to network activity), meaning this won't be + // run + let time_since_last = + Instant::now().duration_since(*TIME_OF_LAST_P2P_MESSAGE.lock().await); + if time_since_last > Duration::from_secs(80) { + p2p.broadcast_raw(P2pMessageKind::KeepAlive.serialize()).await; + } + // Maintain this lock until it's out of events + // TODO: Is there a less contentious way to run this poll? let mut p2p_lock = p2p.0.lock().await; loop { match futures::poll!(p2p_lock.next()) { @@ -236,6 +261,7 @@ impl LibP2p { libp2p::mdns::Event::Discovered(list), )))) => { for (peer, mut addr) in list { + // Check the port is as expected to prevent trying to peer with Substrate nodes if addr.pop() == Some(libp2p::multiaddr::Protocol::Tcp(PORT)) { log::info!("found peer via mdns"); p2p_lock.behaviour_mut().gossipsub.add_explicit_peer(&peer); @@ -255,6 +281,7 @@ impl LibP2p { Poll::Ready(Some(SwarmEvent::Behaviour(BehaviorEvent::Gossipsub( GsEvent::Message { propagation_source, message, .. }, )))) => { + *TIME_OF_LAST_P2P_MESSAGE.lock().await = Instant::now(); p2p.1.lock().await.push_back((propagation_source, message.data)); } Poll::Ready(Some(_)) => {} @@ -281,6 +308,9 @@ impl P2p for LibP2p { } async fn broadcast_raw(&self, msg: Vec) { + // Update the time of last message + *TIME_OF_LAST_P2P_MESSAGE.lock().await = Instant::now(); + match self .0 .lock()