From 257f69127787401fba13dc5a2ecb4757251df978 Mon Sep 17 00:00:00 2001 From: Luke Parker Date: Sun, 5 Jan 2025 01:23:28 -0500 Subject: [PATCH] Start filling out message handling in SwarmTask --- coordinator/cosign/src/lib.rs | 4 +-- coordinator/src/p2p/gossip.rs | 3 +- coordinator/src/p2p/mod.rs | 52 ++++++++++++++++++++++++++++++++--- coordinator/src/p2p/reqres.rs | 3 +- 4 files changed, 54 insertions(+), 8 deletions(-) diff --git a/coordinator/cosign/src/lib.rs b/coordinator/cosign/src/lib.rs index 3d2ab5ab..29420d56 100644 --- a/coordinator/cosign/src/lib.rs +++ b/coordinator/cosign/src/lib.rs @@ -285,10 +285,10 @@ impl Cosigning { /// /// If this global session hasn't produced any notable cosigns, this will return the latest /// cosigns for this session. - pub fn notable_cosigns(&self, global_session: [u8; 32]) -> Vec { + pub fn notable_cosigns(getter: &impl Get, global_session: [u8; 32]) -> Vec { let mut cosigns = Vec::with_capacity(serai_client::primitives::NETWORKS.len()); for network in serai_client::primitives::NETWORKS { - if let Some(cosign) = NetworksLatestCosignedBlock::get(&self.db, global_session, network) { + if let Some(cosign) = NetworksLatestCosignedBlock::get(getter, global_session, network) { cosigns.push(cosign); } } diff --git a/coordinator/src/p2p/gossip.rs b/coordinator/src/p2p/gossip.rs index 8e32180b..dc6a5849 100644 --- a/coordinator/src/p2p/gossip.rs +++ b/coordinator/src/p2p/gossip.rs @@ -10,6 +10,7 @@ use libp2p::gossipsub::{ IdentTopic, MessageId, MessageAuthenticity, ValidationMode, ConfigBuilder, IdentityTransform, AllowAllSubscriptionFilter, Behaviour, }; +pub use libp2p::gossipsub::Event; use serai_cosign::SignedCosign; @@ -27,7 +28,7 @@ fn topic_for_set(set: ValidatorSet) -> IdentTopic { #[derive(Clone, BorshSerialize, BorshDeserialize)] pub(crate) enum Message { - Tribuary { genesis: [u8; 32], message: Vec }, + Tributary { set: ValidatorSet, message: Vec }, Cosign(SignedCosign), } diff --git a/coordinator/src/p2p/mod.rs b/coordinator/src/p2p/mod.rs index 71984b8c..55cc311b 100644 --- a/coordinator/src/p2p/mod.rs +++ b/coordinator/src/p2p/mod.rs @@ -4,10 +4,15 @@ use std::{ time::{Duration, Instant}, }; +use borsh::BorshDeserialize; + use serai_client::primitives::{NetworkId, PublicKey}; use tokio::sync::{mpsc, RwLock}; +use serai_db::Db; +use serai_cosign::Cosigning; + use futures_util::StreamExt; use libp2p::{ multihash::Multihash, @@ -76,7 +81,7 @@ struct Behavior { gossip: gossip::Behavior, } -struct SwarmTask { +struct SwarmTask { to_dial: mpsc::UnboundedReceiver, validators: Arc>, @@ -86,10 +91,11 @@ struct SwarmTask { peers: Peers, rebuild_peers_at: Instant, + db: D, swarm: Swarm, } -impl SwarmTask { +impl SwarmTask { async fn run(mut self) { loop { let time_till_refresh_validators = @@ -160,8 +166,42 @@ impl SwarmTask { // libp2p/0.54.1/libp2p/struct.Swarm.html#impl-Stream-for-Swarm%3CTBehaviour%3E let event = event.unwrap(); match event { - SwarmEvent::Behaviour(BehaviorEvent::Reqres(event)) => todo!("TODO"), - SwarmEvent::Behaviour(BehaviorEvent::Gossip(event)) => todo!("TODO"), + SwarmEvent::Behaviour(BehaviorEvent::Reqres(event)) => match event { + reqres::Event::Message { message, .. } => match message { + reqres::Message::Request { request_id: _, request, channel } => { + match request { + // TODO: Send these + reqres::Request::KeepAlive => {}, + reqres::Request::Heartbeat { set, latest_block_hash } => todo!("TODO"), + reqres::Request::NotableCosigns { global_session } => { + let cosigns = Cosigning::::notable_cosigns(&self.db, global_session); + let res = reqres::Response::NotableCosigns(cosigns); + let _: Result<_, _> = + self.swarm.behaviour_mut().reqres.send_response(channel, res); + }, + } + } + reqres::Message::Response { request_id, response } => todo!("TODO"), + } + reqres::Event::OutboundFailure { request_id, .. } => todo!("TODO"), + reqres::Event::InboundFailure { .. } | reqres::Event::ResponseSent { .. } => {}, + }, + SwarmEvent::Behaviour(BehaviorEvent::Gossip(event)) => match event { + gossip::Event::Message { message, .. } => { + let Ok(message) = gossip::Message::deserialize(&mut message.data.as_slice()) else { + continue + }; + match message { + gossip::Message::Tributary { set, message } => todo!("TODO"), + gossip::Message::Cosign(signed_cosign) => todo!("TODO"), + } + } + gossip::Event::Subscribed { .. } | gossip::Event::Unsubscribed { .. } => {}, + gossip::Event::GossipsubNotSupported { peer_id } => { + let _: Result<_, _> = self.swarm.disconnect_peer_id(peer_id); + } + }, + // New connection, so update peers SwarmEvent::ConnectionEstablished { peer_id, .. } => { let Some(networks) = @@ -177,6 +217,7 @@ impl SwarmTask { .insert(peer_id); } }, + // Connection closed, so update peers SwarmEvent::ConnectionClosed { peer_id, .. } => { let Some(networks) = @@ -191,7 +232,10 @@ impl SwarmTask { .or_insert_with(HashSet::new) .remove(&peer_id); } + // TODO: dial_task.run_now() if haven't in past minute }, + + // We don't handle any of these SwarmEvent::IncomingConnection { .. } | SwarmEvent::IncomingConnectionError { .. } | SwarmEvent::OutgoingConnectionError { .. } | diff --git a/coordinator/src/p2p/reqres.rs b/coordinator/src/p2p/reqres.rs index 7faf2f8b..0793e839 100644 --- a/coordinator/src/p2p/reqres.rs +++ b/coordinator/src/p2p/reqres.rs @@ -8,7 +8,8 @@ use serai_client::validator_sets::primitives::ValidatorSet; use futures_util::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt}; -use libp2p::request_response::{Codec as CodecTrait, Config, Behaviour, ProtocolSupport}; +use libp2p::request_response::{self, Codec as CodecTrait, Config, Behaviour, ProtocolSupport}; +pub use request_response::{Message, Event}; use serai_cosign::SignedCosign;