Add channels to send requests/recv responses

This commit is contained in:
Luke Parker
2025-01-07 15:51:15 -05:00
parent d9e9887d34
commit f55165e016

View File

@@ -8,7 +8,7 @@ use borsh::BorshDeserialize;
use serai_client::primitives::{NetworkId, PublicKey}; use serai_client::primitives::{NetworkId, PublicKey};
use tokio::sync::{mpsc, RwLock}; use tokio::sync::{mpsc, oneshot, RwLock};
use serai_db::Db; use serai_db::Db;
use serai_task::TaskHandle; use serai_task::TaskHandle;
@@ -19,6 +19,7 @@ use futures_util::StreamExt;
use libp2p::{ use libp2p::{
multihash::Multihash, multihash::Multihash,
identity::PeerId, identity::PeerId,
request_response::RequestId,
swarm::{dial_opts::DialOpts, NetworkBehaviour, SwarmEvent, Swarm}, swarm::{dial_opts::DialOpts, NetworkBehaviour, SwarmEvent, Swarm},
}; };
@@ -97,6 +98,9 @@ struct SwarmTask<D: Db> {
db: D, db: D,
swarm: Swarm<Behavior>, swarm: Swarm<Behavior>,
request_recv: mpsc::UnboundedReceiver<(PeerId, Request, oneshot::Sender<Option<Response>>)>,
request_resp: HashMap<RequestId, oneshot::Sender<Option<Response>>>,
} }
impl<D: Db> SwarmTask<D> { impl<D: Db> SwarmTask<D> {
@@ -163,6 +167,13 @@ impl<D: Db> SwarmTask<D> {
let _: Result<_, _> = self.swarm.dial(dial_opts); let _: Result<_, _> = self.swarm.dial(dial_opts);
} }
request = self.request_recv.recv() => {
let (peer, request, response_channel) =
request.expect("channel for requests was closed?");
let request_id = self.swarm.behaviour_mut().reqres.send_request(&peer, request);
self.request_resp.insert(request_id, response_channel);
}
// Handle swarm events // Handle swarm events
event = self.swarm.next() => { event = self.swarm.next() => {
// `Swarm::next` will never return `Poll::Ready(None)` // `Swarm::next` will never return `Poll::Ready(None)`
@@ -178,6 +189,7 @@ impl<D: Db> SwarmTask<D> {
reqres::Request::KeepAlive => {}, reqres::Request::KeepAlive => {},
reqres::Request::Heartbeat { set, latest_block_hash } => todo!("TODO"), reqres::Request::Heartbeat { set, latest_block_hash } => todo!("TODO"),
reqres::Request::NotableCosigns { global_session } => { reqres::Request::NotableCosigns { global_session } => {
// TODO: Move this out
let cosigns = Cosigning::<D>::notable_cosigns(&self.db, global_session); let cosigns = Cosigning::<D>::notable_cosigns(&self.db, global_session);
let res = reqres::Response::NotableCosigns(cosigns); let res = reqres::Response::NotableCosigns(cosigns);
let _: Result<_, _> = let _: Result<_, _> =
@@ -185,9 +197,19 @@ impl<D: Db> SwarmTask<D> {
}, },
} }
} }
reqres::Message::Response { request_id, response } => todo!("TODO"), reqres::Message::Response { request_id, response } => {
// Send Some(response) as the response for the request
if let Some(channel) = self.request_resp.remove(&request_id) {
let _: Result<_, _> = channel.send(Some(response));
} }
reqres::Event::OutboundFailure { request_id, .. } => todo!("TODO"), },
}
reqres::Event::OutboundFailure { request_id, .. } => {
// Send None as the response for the request
if let Some(channel) = self.request_resp.remove(&request_id) {
let _: Result<_, _> = channel.send(None);
}
},
reqres::Event::InboundFailure { .. } | reqres::Event::ResponseSent { .. } => {}, reqres::Event::InboundFailure { .. } | reqres::Event::ResponseSent { .. } => {},
}, },
SwarmEvent::Behaviour(BehaviorEvent::Gossip(event)) => match event { SwarmEvent::Behaviour(BehaviorEvent::Gossip(event)) => match event {