From f55165e01607f6ea71047b7118342d7ce0a10558 Mon Sep 17 00:00:00 2001 From: Luke Parker Date: Tue, 7 Jan 2025 15:51:15 -0500 Subject: [PATCH] Add channels to send requests/recv responses --- coordinator/src/p2p/mod.rs | 28 +++++++++++++++++++++++++--- 1 file changed, 25 insertions(+), 3 deletions(-) diff --git a/coordinator/src/p2p/mod.rs b/coordinator/src/p2p/mod.rs index 55c14cdb..cedaed3e 100644 --- a/coordinator/src/p2p/mod.rs +++ b/coordinator/src/p2p/mod.rs @@ -8,7 +8,7 @@ use borsh::BorshDeserialize; use serai_client::primitives::{NetworkId, PublicKey}; -use tokio::sync::{mpsc, RwLock}; +use tokio::sync::{mpsc, oneshot, RwLock}; use serai_db::Db; use serai_task::TaskHandle; @@ -19,6 +19,7 @@ use futures_util::StreamExt; use libp2p::{ multihash::Multihash, identity::PeerId, + request_response::RequestId, swarm::{dial_opts::DialOpts, NetworkBehaviour, SwarmEvent, Swarm}, }; @@ -97,6 +98,9 @@ struct SwarmTask { db: D, swarm: Swarm, + + request_recv: mpsc::UnboundedReceiver<(PeerId, Request, oneshot::Sender>)>, + request_resp: HashMap>>, } impl SwarmTask { @@ -163,6 +167,13 @@ impl SwarmTask { let _: Result<_, _> = self.swarm.dial(dial_opts); } + request = self.request_recv.recv() => { + let (peer, request, response_channel) = + request.expect("channel for requests was closed?"); + let request_id = self.swarm.behaviour_mut().reqres.send_request(&peer, request); + self.request_resp.insert(request_id, response_channel); + } + // Handle swarm events event = self.swarm.next() => { // `Swarm::next` will never return `Poll::Ready(None)` @@ -178,6 +189,7 @@ impl SwarmTask { reqres::Request::KeepAlive => {}, reqres::Request::Heartbeat { set, latest_block_hash } => todo!("TODO"), reqres::Request::NotableCosigns { global_session } => { + // TODO: Move this out let cosigns = Cosigning::::notable_cosigns(&self.db, global_session); let res = reqres::Response::NotableCosigns(cosigns); let _: Result<_, _> = @@ -185,9 +197,19 @@ impl SwarmTask { }, } } - reqres::Message::Response { request_id, response } => todo!("TODO"), + reqres::Message::Response { request_id, response } => { + // Send Some(response) as the response for the request + if let Some(channel) = self.request_resp.remove(&request_id) { + let _: Result<_, _> = channel.send(Some(response)); + } + }, } - reqres::Event::OutboundFailure { request_id, .. } => todo!("TODO"), + reqres::Event::OutboundFailure { request_id, .. } => { + // Send None as the response for the request + if let Some(channel) = self.request_resp.remove(&request_id) { + let _: Result<_, _> = channel.send(None); + } + }, reqres::Event::InboundFailure { .. } | reqres::Event::ResponseSent { .. } => {}, }, SwarmEvent::Behaviour(BehaviorEvent::Gossip(event)) => match event {