Start filling out message handling in SwarmTask

This commit is contained in:
Luke Parker
2025-01-05 01:23:28 -05:00
parent c6d0fb477c
commit 257f691277
4 changed files with 54 additions and 8 deletions

View File

@@ -285,10 +285,10 @@ impl<D: Db> Cosigning<D> {
/// ///
/// If this global session hasn't produced any notable cosigns, this will return the latest /// If this global session hasn't produced any notable cosigns, this will return the latest
/// cosigns for this session. /// cosigns for this session.
pub fn notable_cosigns(&self, global_session: [u8; 32]) -> Vec<SignedCosign> { pub fn notable_cosigns(getter: &impl Get, global_session: [u8; 32]) -> Vec<SignedCosign> {
let mut cosigns = Vec::with_capacity(serai_client::primitives::NETWORKS.len()); let mut cosigns = Vec::with_capacity(serai_client::primitives::NETWORKS.len());
for network in serai_client::primitives::NETWORKS { for network in serai_client::primitives::NETWORKS {
if let Some(cosign) = NetworksLatestCosignedBlock::get(&self.db, global_session, network) { if let Some(cosign) = NetworksLatestCosignedBlock::get(getter, global_session, network) {
cosigns.push(cosign); cosigns.push(cosign);
} }
} }

View File

@@ -10,6 +10,7 @@ use libp2p::gossipsub::{
IdentTopic, MessageId, MessageAuthenticity, ValidationMode, ConfigBuilder, IdentityTransform, IdentTopic, MessageId, MessageAuthenticity, ValidationMode, ConfigBuilder, IdentityTransform,
AllowAllSubscriptionFilter, Behaviour, AllowAllSubscriptionFilter, Behaviour,
}; };
pub use libp2p::gossipsub::Event;
use serai_cosign::SignedCosign; use serai_cosign::SignedCosign;
@@ -27,7 +28,7 @@ fn topic_for_set(set: ValidatorSet) -> IdentTopic {
#[derive(Clone, BorshSerialize, BorshDeserialize)] #[derive(Clone, BorshSerialize, BorshDeserialize)]
pub(crate) enum Message { pub(crate) enum Message {
Tribuary { genesis: [u8; 32], message: Vec<u8> }, Tributary { set: ValidatorSet, message: Vec<u8> },
Cosign(SignedCosign), Cosign(SignedCosign),
} }

View File

@@ -4,10 +4,15 @@ use std::{
time::{Duration, Instant}, time::{Duration, Instant},
}; };
use borsh::BorshDeserialize;
use serai_client::primitives::{NetworkId, PublicKey}; use serai_client::primitives::{NetworkId, PublicKey};
use tokio::sync::{mpsc, RwLock}; use tokio::sync::{mpsc, RwLock};
use serai_db::Db;
use serai_cosign::Cosigning;
use futures_util::StreamExt; use futures_util::StreamExt;
use libp2p::{ use libp2p::{
multihash::Multihash, multihash::Multihash,
@@ -76,7 +81,7 @@ struct Behavior {
gossip: gossip::Behavior, gossip: gossip::Behavior,
} }
struct SwarmTask { struct SwarmTask<D: Db> {
to_dial: mpsc::UnboundedReceiver<DialOpts>, to_dial: mpsc::UnboundedReceiver<DialOpts>,
validators: Arc<RwLock<Validators>>, validators: Arc<RwLock<Validators>>,
@@ -86,10 +91,11 @@ struct SwarmTask {
peers: Peers, peers: Peers,
rebuild_peers_at: Instant, rebuild_peers_at: Instant,
db: D,
swarm: Swarm<Behavior>, swarm: Swarm<Behavior>,
} }
impl SwarmTask { impl<D: Db> SwarmTask<D> {
async fn run(mut self) { async fn run(mut self) {
loop { loop {
let time_till_refresh_validators = let time_till_refresh_validators =
@@ -160,8 +166,42 @@ impl SwarmTask {
// libp2p/0.54.1/libp2p/struct.Swarm.html#impl-Stream-for-Swarm%3CTBehaviour%3E // libp2p/0.54.1/libp2p/struct.Swarm.html#impl-Stream-for-Swarm%3CTBehaviour%3E
let event = event.unwrap(); let event = event.unwrap();
match event { match event {
SwarmEvent::Behaviour(BehaviorEvent::Reqres(event)) => todo!("TODO"), SwarmEvent::Behaviour(BehaviorEvent::Reqres(event)) => match event {
SwarmEvent::Behaviour(BehaviorEvent::Gossip(event)) => todo!("TODO"), reqres::Event::Message { message, .. } => match message {
reqres::Message::Request { request_id: _, request, channel } => {
match request {
// TODO: Send these
reqres::Request::KeepAlive => {},
reqres::Request::Heartbeat { set, latest_block_hash } => todo!("TODO"),
reqres::Request::NotableCosigns { global_session } => {
let cosigns = Cosigning::<D>::notable_cosigns(&self.db, global_session);
let res = reqres::Response::NotableCosigns(cosigns);
let _: Result<_, _> =
self.swarm.behaviour_mut().reqres.send_response(channel, res);
},
}
}
reqres::Message::Response { request_id, response } => todo!("TODO"),
}
reqres::Event::OutboundFailure { request_id, .. } => todo!("TODO"),
reqres::Event::InboundFailure { .. } | reqres::Event::ResponseSent { .. } => {},
},
SwarmEvent::Behaviour(BehaviorEvent::Gossip(event)) => match event {
gossip::Event::Message { message, .. } => {
let Ok(message) = gossip::Message::deserialize(&mut message.data.as_slice()) else {
continue
};
match message {
gossip::Message::Tributary { set, message } => todo!("TODO"),
gossip::Message::Cosign(signed_cosign) => todo!("TODO"),
}
}
gossip::Event::Subscribed { .. } | gossip::Event::Unsubscribed { .. } => {},
gossip::Event::GossipsubNotSupported { peer_id } => {
let _: Result<_, _> = self.swarm.disconnect_peer_id(peer_id);
}
},
// New connection, so update peers // New connection, so update peers
SwarmEvent::ConnectionEstablished { peer_id, .. } => { SwarmEvent::ConnectionEstablished { peer_id, .. } => {
let Some(networks) = let Some(networks) =
@@ -177,6 +217,7 @@ impl SwarmTask {
.insert(peer_id); .insert(peer_id);
} }
}, },
// Connection closed, so update peers // Connection closed, so update peers
SwarmEvent::ConnectionClosed { peer_id, .. } => { SwarmEvent::ConnectionClosed { peer_id, .. } => {
let Some(networks) = let Some(networks) =
@@ -191,7 +232,10 @@ impl SwarmTask {
.or_insert_with(HashSet::new) .or_insert_with(HashSet::new)
.remove(&peer_id); .remove(&peer_id);
} }
// TODO: dial_task.run_now() if haven't in past minute
}, },
// We don't handle any of these
SwarmEvent::IncomingConnection { .. } | SwarmEvent::IncomingConnection { .. } |
SwarmEvent::IncomingConnectionError { .. } | SwarmEvent::IncomingConnectionError { .. } |
SwarmEvent::OutgoingConnectionError { .. } | SwarmEvent::OutgoingConnectionError { .. } |

View File

@@ -8,7 +8,8 @@ use serai_client::validator_sets::primitives::ValidatorSet;
use futures_util::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt}; use futures_util::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};
use libp2p::request_response::{Codec as CodecTrait, Config, Behaviour, ProtocolSupport}; use libp2p::request_response::{self, Codec as CodecTrait, Config, Behaviour, ProtocolSupport};
pub use request_response::{Message, Event};
use serai_cosign::SignedCosign; use serai_cosign::SignedCosign;