mirror of
https://github.com/serai-dex/serai.git
synced 2025-12-08 12:19:24 +00:00
Inline broadcast_raw now that it doesn't have multiple callers
This commit is contained in:
@@ -36,7 +36,7 @@ use libp2p::{
|
|||||||
IdentityTransform, AllowAllSubscriptionFilter, Event as GsEvent, PublishError,
|
IdentityTransform, AllowAllSubscriptionFilter, Event as GsEvent, PublishError,
|
||||||
Behaviour as GsBehavior,
|
Behaviour as GsBehavior,
|
||||||
},
|
},
|
||||||
swarm::{NetworkBehaviour, SwarmEvent, Swarm},
|
swarm::{NetworkBehaviour, SwarmEvent},
|
||||||
SwarmBuilder,
|
SwarmBuilder,
|
||||||
};
|
};
|
||||||
|
|
||||||
@@ -456,32 +456,6 @@ impl LibP2p {
|
|||||||
tokio::spawn({
|
tokio::spawn({
|
||||||
let mut time_of_last_p2p_message = Instant::now();
|
let mut time_of_last_p2p_message = Instant::now();
|
||||||
|
|
||||||
#[allow(clippy::needless_pass_by_ref_mut)] // False positive
|
|
||||||
fn broadcast_raw(
|
|
||||||
p2p: &mut Swarm<Behavior>,
|
|
||||||
time_of_last_p2p_message: &mut Instant,
|
|
||||||
set: Option<ValidatorSet>,
|
|
||||||
msg: Vec<u8>,
|
|
||||||
) {
|
|
||||||
// Update the time of last message
|
|
||||||
*time_of_last_p2p_message = Instant::now();
|
|
||||||
|
|
||||||
let topic =
|
|
||||||
if let Some(set) = set { topic_for_set(set) } else { IdentTopic::new(LIBP2P_TOPIC) };
|
|
||||||
|
|
||||||
match p2p.behaviour_mut().gossipsub.publish(topic, msg.clone()) {
|
|
||||||
Err(PublishError::SigningError(e)) => panic!("signing error when broadcasting: {e}"),
|
|
||||||
Err(PublishError::InsufficientPeers) => {
|
|
||||||
log::warn!("failed to send p2p message due to insufficient peers")
|
|
||||||
}
|
|
||||||
Err(PublishError::MessageTooLarge) => {
|
|
||||||
panic!("tried to send a too large message: {}", hex::encode(msg))
|
|
||||||
}
|
|
||||||
Err(PublishError::TransformFailed(e)) => panic!("IdentityTransform failed: {e}"),
|
|
||||||
Err(PublishError::Duplicate) | Ok(_) => {}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
async move {
|
async move {
|
||||||
let connected_peers = connected_peers.clone();
|
let connected_peers = connected_peers.clone();
|
||||||
|
|
||||||
@@ -516,22 +490,41 @@ impl LibP2p {
|
|||||||
|
|
||||||
// Handle any queued outbound messages
|
// Handle any queued outbound messages
|
||||||
msg = broadcast_recv.recv() => {
|
msg = broadcast_recv.recv() => {
|
||||||
|
// Update the time of last message
|
||||||
|
time_of_last_p2p_message = Instant::now();
|
||||||
|
|
||||||
let (kind, msg): (P2pMessageKind, Vec<u8>) =
|
let (kind, msg): (P2pMessageKind, Vec<u8>) =
|
||||||
msg.expect("broadcast_recv closed. are we shutting down?");
|
msg.expect("broadcast_recv closed. are we shutting down?");
|
||||||
|
|
||||||
if matches!(kind, P2pMessageKind::ReqRes(_)) {
|
if matches!(kind, P2pMessageKind::ReqRes(_)) {
|
||||||
// Use request/response
|
// Use request/response, yet send to all connected peers
|
||||||
for peer_id in swarm.connected_peers().copied().collect::<Vec<_>>() {
|
for peer_id in swarm.connected_peers().copied().collect::<Vec<_>>() {
|
||||||
swarm.behaviour_mut().reqres.send_request(&peer_id, msg.clone());
|
swarm.behaviour_mut().reqres.send_request(&peer_id, msg.clone());
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
// Use gossipsub
|
// Use gossipsub
|
||||||
let set = kind.genesis().and_then(|genesis| set_for_genesis.get(&genesis).copied());
|
|
||||||
broadcast_raw(
|
let set =
|
||||||
&mut swarm,
|
kind.genesis().and_then(|genesis| set_for_genesis.get(&genesis).copied());
|
||||||
&mut time_of_last_p2p_message,
|
let topic = if let Some(set) = set {
|
||||||
set,
|
topic_for_set(set)
|
||||||
msg,
|
} else {
|
||||||
);
|
IdentTopic::new(LIBP2P_TOPIC)
|
||||||
|
};
|
||||||
|
|
||||||
|
match swarm.behaviour_mut().gossipsub.publish(topic, msg.clone()) {
|
||||||
|
Err(PublishError::SigningError(e)) => {
|
||||||
|
panic!("signing error when broadcasting: {e}")
|
||||||
|
},
|
||||||
|
Err(PublishError::InsufficientPeers) => {
|
||||||
|
log::warn!("failed to send p2p message due to insufficient peers")
|
||||||
|
}
|
||||||
|
Err(PublishError::MessageTooLarge) => {
|
||||||
|
panic!("tried to send a too large message: {}", hex::encode(msg))
|
||||||
|
}
|
||||||
|
Err(PublishError::TransformFailed(e)) => panic!("IdentityTransform failed: {e}"),
|
||||||
|
Err(PublishError::Duplicate) | Ok(_) => {}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user