mirror of
https://github.com/serai-dex/serai.git
synced 2025-12-08 04:09:23 +00:00
Replace KeepAlive with ping
This is more standard and allows measuring latency.
This commit is contained in:
@@ -55,7 +55,7 @@ env_logger = { version = "0.10", default-features = false, features = ["humantim
|
|||||||
|
|
||||||
futures-util = { version = "0.3", default-features = false, features = ["std"] }
|
futures-util = { version = "0.3", default-features = false, features = ["std"] }
|
||||||
tokio = { version = "1", default-features = false, features = ["rt-multi-thread", "sync", "time", "macros"] }
|
tokio = { version = "1", default-features = false, features = ["rt-multi-thread", "sync", "time", "macros"] }
|
||||||
libp2p = { version = "0.52", default-features = false, features = ["tokio", "tcp", "noise", "yamux", "request-response", "gossipsub", "macros"] }
|
libp2p = { version = "0.52", default-features = false, features = ["tokio", "tcp", "noise", "yamux", "ping", "request-response", "gossipsub", "macros"] }
|
||||||
|
|
||||||
serai-cosign = { path = "./cosign" }
|
serai-cosign = { path = "./cosign" }
|
||||||
|
|
||||||
|
|||||||
@@ -58,7 +58,6 @@ pub(crate) fn new_behavior() -> Behavior {
|
|||||||
.history_gossip(usize::try_from(heartbeats_to_gossip).unwrap())
|
.history_gossip(usize::try_from(heartbeats_to_gossip).unwrap())
|
||||||
.heartbeat_interval(Duration::from_millis(heartbeat_interval.into()))
|
.heartbeat_interval(Duration::from_millis(heartbeat_interval.into()))
|
||||||
.max_transmit_size(MAX_LIBP2P_GOSSIP_MESSAGE_SIZE)
|
.max_transmit_size(MAX_LIBP2P_GOSSIP_MESSAGE_SIZE)
|
||||||
.idle_timeout(KEEP_ALIVE_INTERVAL + Duration::from_secs(5))
|
|
||||||
.duplicate_cache_time(Duration::from_millis((heartbeats_to_keep * heartbeat_interval).into()))
|
.duplicate_cache_time(Duration::from_millis((heartbeats_to_keep * heartbeat_interval).into()))
|
||||||
.validation_mode(ValidationMode::Anonymous)
|
.validation_mode(ValidationMode::Anonymous)
|
||||||
// Uses a content based message ID to avoid duplicates as much as possible
|
// Uses a content based message ID to avoid duplicates as much as possible
|
||||||
|
|||||||
@@ -44,6 +44,9 @@ use authenticate::OnlyValidators;
|
|||||||
mod dial;
|
mod dial;
|
||||||
use dial::DialTask;
|
use dial::DialTask;
|
||||||
|
|
||||||
|
/// The ping behavior, used to ensure connection latency is below the limit
|
||||||
|
mod ping;
|
||||||
|
|
||||||
/// The request-response messages and behavior
|
/// The request-response messages and behavior
|
||||||
mod reqres;
|
mod reqres;
|
||||||
use reqres::{RequestId, Request, Response};
|
use reqres::{RequestId, Request, Response};
|
||||||
@@ -109,6 +112,7 @@ struct Peers {
|
|||||||
|
|
||||||
#[derive(NetworkBehaviour)]
|
#[derive(NetworkBehaviour)]
|
||||||
struct Behavior {
|
struct Behavior {
|
||||||
|
ping: ping::Behavior,
|
||||||
reqres: reqres::Behavior,
|
reqres: reqres::Behavior,
|
||||||
gossip: gossip::Behavior,
|
gossip: gossip::Behavior,
|
||||||
}
|
}
|
||||||
@@ -162,14 +166,19 @@ impl Libp2p {
|
|||||||
config
|
config
|
||||||
};
|
};
|
||||||
|
|
||||||
let behavior = Behavior { reqres: reqres::new_behavior(), gossip: gossip::new_behavior() };
|
|
||||||
|
|
||||||
let mut swarm = SwarmBuilder::with_existing_identity(identity::Keypair::generate_ed25519())
|
let mut swarm = SwarmBuilder::with_existing_identity(identity::Keypair::generate_ed25519())
|
||||||
.with_tokio()
|
.with_tokio()
|
||||||
.with_tcp(TcpConfig::default().nodelay(false), new_only_validators, new_yamux)
|
.with_tcp(TcpConfig::default().nodelay(false), new_only_validators, new_yamux)
|
||||||
.unwrap()
|
.unwrap()
|
||||||
.with_behaviour(|_| behavior)
|
.with_behaviour(|_| Behavior {
|
||||||
|
ping: ping::new_behavior(),
|
||||||
|
reqres: reqres::new_behavior(),
|
||||||
|
gossip: gossip::new_behavior(),
|
||||||
|
})
|
||||||
.unwrap()
|
.unwrap()
|
||||||
|
.with_swarm_config(|config| {
|
||||||
|
config.with_idle_connection_timeout(ping::INTERVAL + ping::TIMEOUT + Duration::from_secs(5))
|
||||||
|
})
|
||||||
.build();
|
.build();
|
||||||
swarm.listen_on(format!("/ip4/0.0.0.0/tcp/{PORT}").parse().unwrap()).unwrap();
|
swarm.listen_on(format!("/ip4/0.0.0.0/tcp/{PORT}").parse().unwrap()).unwrap();
|
||||||
swarm.listen_on(format!("/ip6/::/tcp/{PORT}").parse().unwrap()).unwrap();
|
swarm.listen_on(format!("/ip6/::/tcp/{PORT}").parse().unwrap()).unwrap();
|
||||||
|
|||||||
17
coordinator/src/p2p/libp2p/ping.rs
Normal file
17
coordinator/src/p2p/libp2p/ping.rs
Normal file
@@ -0,0 +1,17 @@
|
|||||||
|
use core::time::Duration;
|
||||||
|
|
||||||
|
use tributary::tendermint::LATENCY_TIME;
|
||||||
|
|
||||||
|
use libp2p::ping::{self, Config, Behaviour};
|
||||||
|
pub use ping::Event;
|
||||||
|
|
||||||
|
pub(crate) const INTERVAL: Duration = Duration::from_secs(30);
|
||||||
|
// LATENCY_TIME represents the maximum latency for message delivery. Sending the ping, and
|
||||||
|
// receiving the pong, each have to occur within this time bound to validate the connection. We
|
||||||
|
// enforce that, as best we can, by requiring the round-trip be within twice the allowed latency.
|
||||||
|
pub(crate) const TIMEOUT: Duration = Duration::from_millis((2 * LATENCY_TIME) as u64);
|
||||||
|
|
||||||
|
pub(crate) type Behavior = Behaviour;
|
||||||
|
pub(crate) fn new_behavior() -> Behavior {
|
||||||
|
Behavior::new(Config::default().with_interval(INTERVAL).with_timeout(TIMEOUT))
|
||||||
|
}
|
||||||
@@ -27,8 +27,6 @@ const PROTOCOL: &str = "/serai/coordinator/reqres/1.0.0";
|
|||||||
/// Requests which can be made via the request-response protocol.
|
/// Requests which can be made via the request-response protocol.
|
||||||
#[derive(Clone, Copy, Debug, BorshSerialize, BorshDeserialize)]
|
#[derive(Clone, Copy, Debug, BorshSerialize, BorshDeserialize)]
|
||||||
pub(crate) enum Request {
|
pub(crate) enum Request {
|
||||||
/// A keep-alive to prevent our connections from being dropped.
|
|
||||||
KeepAlive,
|
|
||||||
/// A heartbeat informing our peers of our latest block, for the specified blockchain, on regular
|
/// A heartbeat informing our peers of our latest block, for the specified blockchain, on regular
|
||||||
/// intervals.
|
/// intervals.
|
||||||
///
|
///
|
||||||
|
|||||||
@@ -24,11 +24,11 @@ use libp2p::{
|
|||||||
use crate::p2p::libp2p::{
|
use crate::p2p::libp2p::{
|
||||||
Peers, BehaviorEvent, Behavior,
|
Peers, BehaviorEvent, Behavior,
|
||||||
validators::Validators,
|
validators::Validators,
|
||||||
|
ping,
|
||||||
reqres::{self, Request, Response},
|
reqres::{self, Request, Response},
|
||||||
gossip,
|
gossip,
|
||||||
};
|
};
|
||||||
|
|
||||||
const KEEP_ALIVE_INTERVAL: Duration = Duration::from_secs(80);
|
|
||||||
const TIME_BETWEEN_REBUILD_PEERS: Duration = Duration::from_secs(10 * 60);
|
const TIME_BETWEEN_REBUILD_PEERS: Duration = Duration::from_secs(10 * 60);
|
||||||
|
|
||||||
/*
|
/*
|
||||||
@@ -106,10 +106,6 @@ impl SwarmTask {
|
|||||||
match event {
|
match event {
|
||||||
reqres::Event::Message { message, .. } => match message {
|
reqres::Event::Message { message, .. } => match message {
|
||||||
reqres::Message::Request { request_id, request, channel } => match request {
|
reqres::Message::Request { request_id, request, channel } => match request {
|
||||||
reqres::Request::KeepAlive => {
|
|
||||||
let _: Result<_, _> =
|
|
||||||
self.swarm.behaviour_mut().reqres.send_response(channel, Response::None);
|
|
||||||
}
|
|
||||||
reqres::Request::Heartbeat { set, latest_block_hash } => {
|
reqres::Request::Heartbeat { set, latest_block_hash } => {
|
||||||
self.inbound_request_response_channels.insert(request_id, channel);
|
self.inbound_request_response_channels.insert(request_id, channel);
|
||||||
let _: Result<_, _> =
|
let _: Result<_, _> =
|
||||||
@@ -138,19 +134,9 @@ impl SwarmTask {
|
|||||||
|
|
||||||
async fn run(mut self) {
|
async fn run(mut self) {
|
||||||
loop {
|
loop {
|
||||||
let time_till_keep_alive = Instant::now().saturating_duration_since(self.last_message);
|
|
||||||
let time_till_rebuild_peers = self.rebuild_peers_at.saturating_duration_since(Instant::now());
|
let time_till_rebuild_peers = self.rebuild_peers_at.saturating_duration_since(Instant::now());
|
||||||
|
|
||||||
tokio::select! {
|
tokio::select! {
|
||||||
() = tokio::time::sleep(time_till_keep_alive) => {
|
|
||||||
let peers = self.swarm.connected_peers().copied().collect::<Vec<_>>();
|
|
||||||
let behavior = self.swarm.behaviour_mut();
|
|
||||||
for peer in peers {
|
|
||||||
behavior.reqres.send_request(&peer, Request::KeepAlive);
|
|
||||||
}
|
|
||||||
self.last_message = Instant::now();
|
|
||||||
}
|
|
||||||
|
|
||||||
// Dial peers we're instructed to
|
// Dial peers we're instructed to
|
||||||
dial_opts = self.to_dial.recv() => {
|
dial_opts = self.to_dial.recv() => {
|
||||||
let dial_opts = dial_opts.expect("DialTask was closed?");
|
let dial_opts = dial_opts.expect("DialTask was closed?");
|
||||||
@@ -239,6 +225,13 @@ impl SwarmTask {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
SwarmEvent::Behaviour(
|
||||||
|
BehaviorEvent::Ping(ping::Event { peer: _, connection, result, })
|
||||||
|
) => {
|
||||||
|
if result.is_err() {
|
||||||
|
self.swarm.close_connection(connection);
|
||||||
|
}
|
||||||
|
}
|
||||||
SwarmEvent::Behaviour(BehaviorEvent::Reqres(event)) => {
|
SwarmEvent::Behaviour(BehaviorEvent::Reqres(event)) => {
|
||||||
self.handle_reqres(event)
|
self.handle_reqres(event)
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user