use std::{ sync::Arc, collections::{HashSet, HashMap}, time::{Duration, Instant}, }; use serai_client::primitives::{NetworkId, PublicKey}; use tokio::sync::{mpsc, RwLock}; use futures_util::StreamExt; use libp2p::{ multihash::Multihash, identity::PeerId, swarm::{dial_opts::DialOpts, NetworkBehaviour, SwarmEvent, Swarm}, }; /// A struct to sync the validators from the Serai node in order to keep track of them. mod validators; use validators::Validators; /// The authentication protocol upgrade to limit the P2P network to active validators. mod authenticate; /// The dial task, to find new peers to connect to mod dial; /// The request-response messages and behavior mod reqres; use reqres::{Request, Response}; /// The gossip messages and behavior mod gossip; /// The heartbeat task, effecting sync of Tributaries mod heartbeat; const PORT: u16 = 30563; // 5132 ^ (('c' << 8) | 'o') fn peer_id_from_public(public: PublicKey) -> PeerId { // 0 represents the identity Multihash, that no hash was performed // It's an internal constant so we can't refer to the constant inside libp2p PeerId::from_multihash(Multihash::wrap(0, &public.0).unwrap()).unwrap() } struct Peer; impl Peer { async fn send(&self, request: Request) -> Result { (async move { todo!("TODO") }).await } } #[derive(Clone)] struct Peers { peers: Arc>>>, } #[derive(Clone, Debug)] struct P2p; impl P2p { async fn peers(&self, set: NetworkId) -> Vec { (async move { todo!("TODO") }).await } } #[async_trait::async_trait] impl tributary::P2p for P2p { async fn broadcast(&self, genesis: [u8; 32], msg: Vec) { todo!("TODO") } } #[derive(NetworkBehaviour)] struct Behavior { reqres: reqres::Behavior, gossip: gossip::Behavior, } struct SwarmTask { to_dial: mpsc::UnboundedReceiver, validators: Arc>, last_refreshed_validators: Instant, next_refresh_validators: Instant, peers: Peers, rebuild_peers_at: Instant, swarm: Swarm, } impl SwarmTask { async fn run(mut self) { loop { let time_till_refresh_validators = self.next_refresh_validators.saturating_duration_since(Instant::now()); let time_till_rebuild_peers = self.rebuild_peers_at.saturating_duration_since(Instant::now()); tokio::select! { biased; // Refresh the instance of validators we use to track peers/share with authenticate () = tokio::time::sleep(time_till_refresh_validators) => { const TIME_BETWEEN_REFRESH_VALIDATORS: Duration = Duration::from_secs(60); const MAX_TIME_BETWEEN_REFRESH_VALIDATORS: Duration = Duration::from_secs(5 * 60); let update = self.validators.write().await.update().await; match update { Ok(removed) => { for removed in removed { let _: Result<_, _> = self.swarm.disconnect_peer_id(removed); } self.last_refreshed_validators = Instant::now(); self.next_refresh_validators = Instant::now() + TIME_BETWEEN_REFRESH_VALIDATORS; } Err(e) => { log::warn!("couldn't refresh validators: {e:?}"); // Increase the delay before the next refresh by using the time since the last // refresh. This will be 5 seconds, then 5 seconds, then 10 seconds, then 20... let time_since_last = self .next_refresh_validators .saturating_duration_since(self.last_refreshed_validators); // But limit the delay self.next_refresh_validators = Instant::now() + time_since_last.min(MAX_TIME_BETWEEN_REFRESH_VALIDATORS); }, } } // Rebuild the peers every 10 minutes // // This handles edge cases such as when a validator changes the networks they're present // in, race conditions, or any other edge cases/quirks which would otherwise risk spiraling // out of control () = tokio::time::sleep(time_till_rebuild_peers) => { const TIME_BETWEEN_REBUILD_PEERS: Duration = Duration::from_secs(10 * 60); let validators_by_network = self.validators.read().await.by_network().clone(); let connected = self.swarm.connected_peers().copied().collect::>(); let mut peers = HashMap::new(); for (network, validators) in validators_by_network { peers.insert(network, validators.intersection(&connected).copied().collect()); } *self.peers.peers.write().await = peers; self.rebuild_peers_at = Instant::now() + TIME_BETWEEN_REBUILD_PEERS; } // Dial peers we're instructed to dial_opts = self.to_dial.recv() => { let dial_opts = dial_opts.expect("DialTask was closed?"); let _: Result<_, _> = self.swarm.dial(dial_opts); } // Handle swarm events event = self.swarm.next() => { // `Swarm::next` will never return `Poll::Ready(None)` // https://docs.rs/ // libp2p/0.54.1/libp2p/struct.Swarm.html#impl-Stream-for-Swarm%3CTBehaviour%3E let event = event.unwrap(); match event { SwarmEvent::Behaviour(BehaviorEvent::Reqres(event)) => todo!("TODO"), SwarmEvent::Behaviour(BehaviorEvent::Gossip(event)) => todo!("TODO"), // New connection, so update peers SwarmEvent::ConnectionEstablished { peer_id, .. } => { let Some(networks) = self.validators.read().await.networks(&peer_id).cloned() else { continue }; for network in networks { self .peers .peers .write() .await .entry(network) .or_insert_with(HashSet::new) .insert(peer_id); } }, // Connection closed, so update peers SwarmEvent::ConnectionClosed { peer_id, .. } => { let Some(networks) = self.validators.read().await.networks(&peer_id).cloned() else { continue }; for network in networks { self .peers .peers .write() .await .entry(network) .or_insert_with(HashSet::new) .remove(&peer_id); } }, SwarmEvent::IncomingConnection { .. } | SwarmEvent::IncomingConnectionError { .. } | SwarmEvent::OutgoingConnectionError { .. } | SwarmEvent::NewListenAddr { .. } | SwarmEvent::ExpiredListenAddr { .. } | SwarmEvent::ListenerClosed { .. } | SwarmEvent::ListenerError { .. } | SwarmEvent::Dialing { .. } => {} } } } } } }