diff --git a/coordinator/src/p2p/reqres.rs b/coordinator/src/p2p/reqres.rs index ad9075d7..fd6efcbd 100644 --- a/coordinator/src/p2p/reqres.rs +++ b/coordinator/src/p2p/reqres.rs @@ -46,16 +46,19 @@ pub(crate) struct TributaryBlockWithCommit { /// Responses which can be received via the request-response protocol. #[derive(Clone, BorshSerialize, BorshDeserialize)] pub(crate) enum Response { + NoResponse, Blocks(Vec), NotableCosigns(Vec), } impl fmt::Debug for Response { fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { - (match self { - Response::Blocks(_) => fmt.debug_struct("Response::Block"), - Response::NotableCosigns(_) => fmt.debug_struct("Response::NotableCosigns"), - }) - .finish_non_exhaustive() + match self { + Response::NoResponse => fmt.debug_struct("Response::NoResponse").finish(), + Response::Blocks(_) => fmt.debug_struct("Response::Block").finish_non_exhaustive(), + Response::NotableCosigns(_) => { + fmt.debug_struct("Response::NotableCosigns").finish_non_exhaustive() + } + } } } diff --git a/coordinator/src/p2p/swarm.rs b/coordinator/src/p2p/swarm.rs index 8aab3d90..10d14a6d 100644 --- a/coordinator/src/p2p/swarm.rs +++ b/coordinator/src/p2p/swarm.rs @@ -6,17 +6,18 @@ use std::{ use borsh::BorshDeserialize; +use serai_client::validator_sets::primitives::ValidatorSet; + use tokio::sync::{mpsc, oneshot, RwLock}; -use serai_db::Db; use serai_task::TaskHandle; -use serai_cosign::Cosigning; +use serai_cosign::SignedCosign; use futures_util::StreamExt; use libp2p::{ identity::PeerId, - request_response::RequestId, + request_response::{RequestId, ResponseChannel}, swarm::{dial_opts::DialOpts, SwarmEvent, Swarm}, }; @@ -42,59 +43,36 @@ use crate::p2p::{ - Dispatching received requests - Sending responses */ -struct SwarmTask { +struct SwarmTask { dial_task: TaskHandle, to_dial: mpsc::UnboundedReceiver, last_dial_task_run: Instant, validators: Arc>, - peers: Peers, rebuild_peers_at: Instant, - db: D, swarm: Swarm, gossip: mpsc::UnboundedReceiver, + signed_cosigns: mpsc::UnboundedSender, + tributary_gossip: mpsc::UnboundedSender<(ValidatorSet, Vec)>, outbound_requests: mpsc::UnboundedReceiver<(PeerId, Request, oneshot::Sender>)>, - outbound_requests_responses: HashMap>>, + outbound_request_responses: HashMap>>, + + inbound_request_response_channels: HashMap>, + heartbeat_requests: mpsc::UnboundedSender<(RequestId, ValidatorSet, [u8; 32])>, + /* TODO + let cosigns = Cosigning::::notable_cosigns(&self.db, global_session); + let res = reqres::Response::NotableCosigns(cosigns); + let _: Result<_, _> = self.swarm.behaviour_mut().reqres.send_response(channel, res); + */ + notable_cosign_requests: mpsc::UnboundedSender<(RequestId, [u8; 32])>, + inbound_request_responses: mpsc::UnboundedReceiver<(RequestId, Response)>, } -impl SwarmTask { - fn handle_reqres(&mut self, event: reqres::Event) { - match event { - reqres::Event::Message { message, .. } => match message { - reqres::Message::Request { request_id: _, request, channel } => { - match request { - // TODO: Send these - reqres::Request::KeepAlive => {} - reqres::Request::Heartbeat { set, latest_block_hash } => todo!("TODO"), - reqres::Request::NotableCosigns { global_session } => { - // TODO: Move this out - let cosigns = Cosigning::::notable_cosigns(&self.db, global_session); - let res = reqres::Response::NotableCosigns(cosigns); - let _: Result<_, _> = self.swarm.behaviour_mut().reqres.send_response(channel, res); - } - } - } - reqres::Message::Response { request_id, response } => { - // Send Some(response) as the response for the request - if let Some(channel) = self.outbound_requests_responses.remove(&request_id) { - let _: Result<_, _> = channel.send(Some(response)); - } - } - }, - reqres::Event::OutboundFailure { request_id, .. } => { - // Send None as the response for the request - if let Some(channel) = self.outbound_requests_responses.remove(&request_id) { - let _: Result<_, _> = channel.send(None); - } - } - reqres::Event::InboundFailure { .. } | reqres::Event::ResponseSent { .. } => {} - } - } - +impl SwarmTask { fn handle_gossip(&mut self, event: gossip::Event) { match event { gossip::Event::Message { message, .. } => { @@ -103,8 +81,12 @@ impl SwarmTask { return; }; match message { - gossip::Message::Tributary { set, message } => todo!("TODO"), - gossip::Message::Cosign(signed_cosign) => todo!("TODO"), + gossip::Message::Tributary { set, message } => { + let _: Result<_, _> = self.tributary_gossip.send((set, message)); + } + gossip::Message::Cosign(signed_cosign) => { + let _: Result<_, _> = self.signed_cosigns.send(signed_cosign); + } } } gossip::Event::Subscribed { .. } | gossip::Event::Unsubscribed { .. } => {} @@ -114,6 +96,44 @@ impl SwarmTask { } } + fn handle_reqres(&mut self, event: reqres::Event) { + match event { + reqres::Event::Message { message, .. } => match message { + reqres::Message::Request { request_id, request, channel } => { + match request { + // TODO: Send these + reqres::Request::KeepAlive => { + let _: Result<_, _> = + self.swarm.behaviour_mut().reqres.send_response(channel, Response::NoResponse); + } + reqres::Request::Heartbeat { set, latest_block_hash } => { + self.inbound_request_response_channels.insert(request_id, channel); + let _: Result<_, _> = + self.heartbeat_requests.send((request_id, set, latest_block_hash)); + } + reqres::Request::NotableCosigns { global_session } => { + self.inbound_request_response_channels.insert(request_id, channel); + let _: Result<_, _> = self.notable_cosign_requests.send((request_id, global_session)); + } + } + } + reqres::Message::Response { request_id, response } => { + // Send Some(response) as the response for the request + if let Some(channel) = self.outbound_request_responses.remove(&request_id) { + let _: Result<_, _> = channel.send(Some(response)); + } + } + }, + reqres::Event::OutboundFailure { request_id, .. } => { + // Send None as the response for the request + if let Some(channel) = self.outbound_request_responses.remove(&request_id) { + let _: Result<_, _> = channel.send(None); + } + } + reqres::Event::InboundFailure { .. } | reqres::Event::ResponseSent { .. } => {} + } + } + async fn run(mut self) { loop { let time_till_rebuild_peers = self.rebuild_peers_at.saturating_duration_since(Instant::now()); @@ -228,19 +248,28 @@ impl SwarmTask { } } - request = self.outbound_requests.recv() => { - let (peer, request, response_channel) = - request.expect("channel for requests was closed?"); - let request_id = self.swarm.behaviour_mut().reqres.send_request(&peer, request); - self.outbound_requests_responses.insert(request_id, response_channel); - } - message = self.gossip.recv() => { let message = message.expect("channel for messages to gossip was closed?"); let topic = message.topic(); let message = borsh::to_vec(&message).unwrap(); let _: Result<_, _> = self.swarm.behaviour_mut().gossip.publish(topic, message); } + + request = self.outbound_requests.recv() => { + let (peer, request, response_channel) = + request.expect("channel for requests was closed?"); + let request_id = self.swarm.behaviour_mut().reqres.send_request(&peer, request); + self.outbound_request_responses.insert(request_id, response_channel); + } + + response = self.inbound_request_responses.recv() => { + let (request_id, response) = + response.expect("channel for inbound request responses was closed?"); + if let Some(channel) = self.inbound_request_response_channels.remove(&request_id) { + let _: Result<_, _> = + self.swarm.behaviour_mut().reqres.send_response(channel, response); + } + } } } }