From 20326bba733ea87c1c6e15767c4507190325e2ab Mon Sep 17 00:00:00 2001 From: Luke Parker Date: Wed, 8 Jan 2025 23:01:09 -0500 Subject: [PATCH] Replace KeepAlive with ping This is more standard and allows measuring latency. --- coordinator/Cargo.toml | 2 +- coordinator/src/p2p/libp2p/gossip.rs | 1 - coordinator/src/p2p/libp2p/mod.rs | 15 ++++++++++++--- coordinator/src/p2p/libp2p/ping.rs | 17 +++++++++++++++++ coordinator/src/p2p/libp2p/reqres.rs | 2 -- coordinator/src/p2p/libp2p/swarm.rs | 23 ++++++++--------------- 6 files changed, 38 insertions(+), 22 deletions(-) create mode 100644 coordinator/src/p2p/libp2p/ping.rs diff --git a/coordinator/Cargo.toml b/coordinator/Cargo.toml index d0f8cb24..2fc373aa 100644 --- a/coordinator/Cargo.toml +++ b/coordinator/Cargo.toml @@ -55,7 +55,7 @@ env_logger = { version = "0.10", default-features = false, features = ["humantim futures-util = { version = "0.3", default-features = false, features = ["std"] } tokio = { version = "1", default-features = false, features = ["rt-multi-thread", "sync", "time", "macros"] } -libp2p = { version = "0.52", default-features = false, features = ["tokio", "tcp", "noise", "yamux", "request-response", "gossipsub", "macros"] } +libp2p = { version = "0.52", default-features = false, features = ["tokio", "tcp", "noise", "yamux", "ping", "request-response", "gossipsub", "macros"] } serai-cosign = { path = "./cosign" } diff --git a/coordinator/src/p2p/libp2p/gossip.rs b/coordinator/src/p2p/libp2p/gossip.rs index 66a0b24a..4d75d9ea 100644 --- a/coordinator/src/p2p/libp2p/gossip.rs +++ b/coordinator/src/p2p/libp2p/gossip.rs @@ -58,7 +58,6 @@ pub(crate) fn new_behavior() -> Behavior { .history_gossip(usize::try_from(heartbeats_to_gossip).unwrap()) .heartbeat_interval(Duration::from_millis(heartbeat_interval.into())) .max_transmit_size(MAX_LIBP2P_GOSSIP_MESSAGE_SIZE) - .idle_timeout(KEEP_ALIVE_INTERVAL + Duration::from_secs(5)) .duplicate_cache_time(Duration::from_millis((heartbeats_to_keep * heartbeat_interval).into())) .validation_mode(ValidationMode::Anonymous) // Uses a content based message ID to avoid duplicates as much as possible diff --git a/coordinator/src/p2p/libp2p/mod.rs b/coordinator/src/p2p/libp2p/mod.rs index 93db7c88..ce60d285 100644 --- a/coordinator/src/p2p/libp2p/mod.rs +++ b/coordinator/src/p2p/libp2p/mod.rs @@ -44,6 +44,9 @@ use authenticate::OnlyValidators; mod dial; use dial::DialTask; +/// The ping behavior, used to ensure connection latency is below the limit +mod ping; + /// The request-response messages and behavior mod reqres; use reqres::{RequestId, Request, Response}; @@ -109,6 +112,7 @@ struct Peers { #[derive(NetworkBehaviour)] struct Behavior { + ping: ping::Behavior, reqres: reqres::Behavior, gossip: gossip::Behavior, } @@ -162,14 +166,19 @@ impl Libp2p { config }; - let behavior = Behavior { reqres: reqres::new_behavior(), gossip: gossip::new_behavior() }; - let mut swarm = SwarmBuilder::with_existing_identity(identity::Keypair::generate_ed25519()) .with_tokio() .with_tcp(TcpConfig::default().nodelay(false), new_only_validators, new_yamux) .unwrap() - .with_behaviour(|_| behavior) + .with_behaviour(|_| Behavior { + ping: ping::new_behavior(), + reqres: reqres::new_behavior(), + gossip: gossip::new_behavior(), + }) .unwrap() + .with_swarm_config(|config| { + config.with_idle_connection_timeout(ping::INTERVAL + ping::TIMEOUT + Duration::from_secs(5)) + }) .build(); swarm.listen_on(format!("/ip4/0.0.0.0/tcp/{PORT}").parse().unwrap()).unwrap(); swarm.listen_on(format!("/ip6/::/tcp/{PORT}").parse().unwrap()).unwrap(); diff --git a/coordinator/src/p2p/libp2p/ping.rs b/coordinator/src/p2p/libp2p/ping.rs new file mode 100644 index 00000000..d579af05 --- /dev/null +++ b/coordinator/src/p2p/libp2p/ping.rs @@ -0,0 +1,17 @@ +use core::time::Duration; + +use tributary::tendermint::LATENCY_TIME; + +use libp2p::ping::{self, Config, Behaviour}; +pub use ping::Event; + +pub(crate) const INTERVAL: Duration = Duration::from_secs(30); +// LATENCY_TIME represents the maximum latency for message delivery. Sending the ping, and +// receiving the pong, each have to occur within this time bound to validate the connection. We +// enforce that, as best we can, by requiring the round-trip be within twice the allowed latency. +pub(crate) const TIMEOUT: Duration = Duration::from_millis((2 * LATENCY_TIME) as u64); + +pub(crate) type Behavior = Behaviour; +pub(crate) fn new_behavior() -> Behavior { + Behavior::new(Config::default().with_interval(INTERVAL).with_timeout(TIMEOUT)) +} diff --git a/coordinator/src/p2p/libp2p/reqres.rs b/coordinator/src/p2p/libp2p/reqres.rs index f58abc8b..8fe02c30 100644 --- a/coordinator/src/p2p/libp2p/reqres.rs +++ b/coordinator/src/p2p/libp2p/reqres.rs @@ -27,8 +27,6 @@ const PROTOCOL: &str = "/serai/coordinator/reqres/1.0.0"; /// Requests which can be made via the request-response protocol. #[derive(Clone, Copy, Debug, BorshSerialize, BorshDeserialize)] pub(crate) enum Request { - /// A keep-alive to prevent our connections from being dropped. - KeepAlive, /// A heartbeat informing our peers of our latest block, for the specified blockchain, on regular /// intervals. /// diff --git a/coordinator/src/p2p/libp2p/swarm.rs b/coordinator/src/p2p/libp2p/swarm.rs index 148e615f..63f8f734 100644 --- a/coordinator/src/p2p/libp2p/swarm.rs +++ b/coordinator/src/p2p/libp2p/swarm.rs @@ -24,11 +24,11 @@ use libp2p::{ use crate::p2p::libp2p::{ Peers, BehaviorEvent, Behavior, validators::Validators, + ping, reqres::{self, Request, Response}, gossip, }; -const KEEP_ALIVE_INTERVAL: Duration = Duration::from_secs(80); const TIME_BETWEEN_REBUILD_PEERS: Duration = Duration::from_secs(10 * 60); /* @@ -106,10 +106,6 @@ impl SwarmTask { match event { reqres::Event::Message { message, .. } => match message { reqres::Message::Request { request_id, request, channel } => match request { - reqres::Request::KeepAlive => { - let _: Result<_, _> = - self.swarm.behaviour_mut().reqres.send_response(channel, Response::None); - } reqres::Request::Heartbeat { set, latest_block_hash } => { self.inbound_request_response_channels.insert(request_id, channel); let _: Result<_, _> = @@ -138,19 +134,9 @@ impl SwarmTask { async fn run(mut self) { loop { - let time_till_keep_alive = Instant::now().saturating_duration_since(self.last_message); let time_till_rebuild_peers = self.rebuild_peers_at.saturating_duration_since(Instant::now()); tokio::select! { - () = tokio::time::sleep(time_till_keep_alive) => { - let peers = self.swarm.connected_peers().copied().collect::>(); - let behavior = self.swarm.behaviour_mut(); - for peer in peers { - behavior.reqres.send_request(&peer, Request::KeepAlive); - } - self.last_message = Instant::now(); - } - // Dial peers we're instructed to dial_opts = self.to_dial.recv() => { let dial_opts = dial_opts.expect("DialTask was closed?"); @@ -239,6 +225,13 @@ impl SwarmTask { } } + SwarmEvent::Behaviour( + BehaviorEvent::Ping(ping::Event { peer: _, connection, result, }) + ) => { + if result.is_err() { + self.swarm.close_connection(connection); + } + } SwarmEvent::Behaviour(BehaviorEvent::Reqres(event)) => { self.handle_reqres(event) }