Inline broadcast_raw now that it doesn't have multiple callers

This commit is contained in:
Luke Parker
2024-04-23 06:44:21 -04:00
parent c3b6abf020
commit 485e454680

View File

@@ -36,7 +36,7 @@ use libp2p::{
IdentityTransform, AllowAllSubscriptionFilter, Event as GsEvent, PublishError, IdentityTransform, AllowAllSubscriptionFilter, Event as GsEvent, PublishError,
Behaviour as GsBehavior, Behaviour as GsBehavior,
}, },
swarm::{NetworkBehaviour, SwarmEvent, Swarm}, swarm::{NetworkBehaviour, SwarmEvent},
SwarmBuilder, SwarmBuilder,
}; };
@@ -456,32 +456,6 @@ impl LibP2p {
tokio::spawn({ tokio::spawn({
let mut time_of_last_p2p_message = Instant::now(); let mut time_of_last_p2p_message = Instant::now();
#[allow(clippy::needless_pass_by_ref_mut)] // False positive
fn broadcast_raw(
p2p: &mut Swarm<Behavior>,
time_of_last_p2p_message: &mut Instant,
set: Option<ValidatorSet>,
msg: Vec<u8>,
) {
// Update the time of last message
*time_of_last_p2p_message = Instant::now();
let topic =
if let Some(set) = set { topic_for_set(set) } else { IdentTopic::new(LIBP2P_TOPIC) };
match p2p.behaviour_mut().gossipsub.publish(topic, msg.clone()) {
Err(PublishError::SigningError(e)) => panic!("signing error when broadcasting: {e}"),
Err(PublishError::InsufficientPeers) => {
log::warn!("failed to send p2p message due to insufficient peers")
}
Err(PublishError::MessageTooLarge) => {
panic!("tried to send a too large message: {}", hex::encode(msg))
}
Err(PublishError::TransformFailed(e)) => panic!("IdentityTransform failed: {e}"),
Err(PublishError::Duplicate) | Ok(_) => {}
}
}
async move { async move {
let connected_peers = connected_peers.clone(); let connected_peers = connected_peers.clone();
@@ -516,22 +490,41 @@ impl LibP2p {
// Handle any queued outbound messages // Handle any queued outbound messages
msg = broadcast_recv.recv() => { msg = broadcast_recv.recv() => {
// Update the time of last message
time_of_last_p2p_message = Instant::now();
let (kind, msg): (P2pMessageKind, Vec<u8>) = let (kind, msg): (P2pMessageKind, Vec<u8>) =
msg.expect("broadcast_recv closed. are we shutting down?"); msg.expect("broadcast_recv closed. are we shutting down?");
if matches!(kind, P2pMessageKind::ReqRes(_)) { if matches!(kind, P2pMessageKind::ReqRes(_)) {
// Use request/response // Use request/response, yet send to all connected peers
for peer_id in swarm.connected_peers().copied().collect::<Vec<_>>() { for peer_id in swarm.connected_peers().copied().collect::<Vec<_>>() {
swarm.behaviour_mut().reqres.send_request(&peer_id, msg.clone()); swarm.behaviour_mut().reqres.send_request(&peer_id, msg.clone());
} }
} else { } else {
// Use gossipsub // Use gossipsub
let set = kind.genesis().and_then(|genesis| set_for_genesis.get(&genesis).copied());
broadcast_raw( let set =
&mut swarm, kind.genesis().and_then(|genesis| set_for_genesis.get(&genesis).copied());
&mut time_of_last_p2p_message, let topic = if let Some(set) = set {
set, topic_for_set(set)
msg, } else {
); IdentTopic::new(LIBP2P_TOPIC)
};
match swarm.behaviour_mut().gossipsub.publish(topic, msg.clone()) {
Err(PublishError::SigningError(e)) => {
panic!("signing error when broadcasting: {e}")
},
Err(PublishError::InsufficientPeers) => {
log::warn!("failed to send p2p message due to insufficient peers")
}
Err(PublishError::MessageTooLarge) => {
panic!("tried to send a too large message: {}", hex::encode(msg))
}
Err(PublishError::TransformFailed(e)) => panic!("IdentityTransform failed: {e}"),
Err(PublishError::Duplicate) | Ok(_) => {}
}
} }
} }