Add keep-alive timeout to coordinator

The Heartbeat was meant to serve for this, yet no Heartbeats are fired when we
don't have active tributaries.

libp2p does offer an explicit KeepAlive protocol, yet it's not recommended in
prod. While this likely has the same pit falls as LibP2p's KeepAlive protocol,
it's at least tailored to our timing.
This commit is contained in:
Luke Parker
2023-08-21 02:36:03 -04:00
parent 45ea805620
commit dc88b29b92
2 changed files with 44 additions and 12 deletions

View File

@@ -53,8 +53,8 @@ mod substrate;
#[cfg(test)] #[cfg(test)]
pub mod tests; pub mod tests;
// This is a static to satisfy lifetime expectations
lazy_static::lazy_static! { lazy_static::lazy_static! {
// This is a static to satisfy lifetime expectations
static ref NEW_TRIBUTARIES: RwLock<VecDeque<TributarySpec>> = RwLock::new(VecDeque::new()); static ref NEW_TRIBUTARIES: RwLock<VecDeque<TributarySpec>> = RwLock::new(VecDeque::new());
} }
@@ -271,6 +271,8 @@ pub async fn handle_p2p<D: Db, P: P2p>(
loop { loop {
let mut msg = p2p.receive().await; let mut msg = p2p.receive().await;
match msg.kind { match msg.kind {
P2pMessageKind::KeepAlive => {}
P2pMessageKind::Tributary(genesis) => { P2pMessageKind::Tributary(genesis) => {
let tributaries = tributaries.read().await; let tributaries = tributaries.read().await;
let Some(tributary) = tributaries.get(&genesis) else { let Some(tributary) = tributaries.get(&genesis) else {

View File

@@ -1,5 +1,5 @@
use core::{time::Duration, fmt, task::Poll}; use core::{time::Duration, fmt, task::Poll};
use std::{sync::Arc, collections::VecDeque, io::Read}; use std::{sync::Arc, time::Instant, collections::VecDeque, io::Read};
use async_trait::async_trait; use async_trait::async_trait;
@@ -27,6 +27,7 @@ const LIBP2P_TOPIC: &str = "serai-coordinator";
#[derive(Clone, Copy, PartialEq, Eq, Hash, Debug)] #[derive(Clone, Copy, PartialEq, Eq, Hash, Debug)]
pub enum P2pMessageKind { pub enum P2pMessageKind {
KeepAlive,
Tributary([u8; 32]), Tributary([u8; 32]),
Heartbeat([u8; 32]), Heartbeat([u8; 32]),
Block([u8; 32]), Block([u8; 32]),
@@ -35,21 +36,22 @@ pub enum P2pMessageKind {
impl P2pMessageKind { impl P2pMessageKind {
fn serialize(&self) -> Vec<u8> { fn serialize(&self) -> Vec<u8> {
match self { match self {
P2pMessageKind::KeepAlive => vec![0],
P2pMessageKind::Tributary(genesis) => { P2pMessageKind::Tributary(genesis) => {
let mut res = vec![0];
res.extend(genesis);
res
}
P2pMessageKind::Heartbeat(genesis) => {
let mut res = vec![1]; let mut res = vec![1];
res.extend(genesis); res.extend(genesis);
res res
} }
P2pMessageKind::Block(genesis) => { P2pMessageKind::Heartbeat(genesis) => {
let mut res = vec![2]; let mut res = vec![2];
res.extend(genesis); res.extend(genesis);
res res
} }
P2pMessageKind::Block(genesis) => {
let mut res = vec![3];
res.extend(genesis);
res
}
} }
} }
@@ -57,17 +59,18 @@ impl P2pMessageKind {
let mut kind = [0; 1]; let mut kind = [0; 1];
reader.read_exact(&mut kind).ok()?; reader.read_exact(&mut kind).ok()?;
match kind[0] { match kind[0] {
0 => Some({ 0 => Some(P2pMessageKind::KeepAlive),
1 => Some({
let mut genesis = [0; 32]; let mut genesis = [0; 32];
reader.read_exact(&mut genesis).ok()?; reader.read_exact(&mut genesis).ok()?;
P2pMessageKind::Tributary(genesis) P2pMessageKind::Tributary(genesis)
}), }),
1 => Some({ 2 => Some({
let mut genesis = [0; 32]; let mut genesis = [0; 32];
reader.read_exact(&mut genesis).ok()?; reader.read_exact(&mut genesis).ok()?;
P2pMessageKind::Heartbeat(genesis) P2pMessageKind::Heartbeat(genesis)
}), }),
2 => Some({ 3 => Some({
let mut genesis = [0; 32]; let mut genesis = [0; 32];
reader.read_exact(&mut genesis).ok()?; reader.read_exact(&mut genesis).ok()?;
P2pMessageKind::Block(genesis) P2pMessageKind::Block(genesis)
@@ -103,6 +106,7 @@ pub trait P2p: Send + Sync + Clone + fmt::Debug + TributaryP2p {
log::trace!( log::trace!(
"broadcasting p2p message (kind {})", "broadcasting p2p message (kind {})",
match kind { match kind {
P2pMessageKind::KeepAlive => "KeepAlive".to_string(),
P2pMessageKind::Tributary(genesis) => format!("Tributary({})", hex::encode(genesis)), P2pMessageKind::Tributary(genesis) => format!("Tributary({})", hex::encode(genesis)),
P2pMessageKind::Heartbeat(genesis) => format!("Heartbeat({})", hex::encode(genesis)), P2pMessageKind::Heartbeat(genesis) => format!("Heartbeat({})", hex::encode(genesis)),
P2pMessageKind::Block(genesis) => format!("Block({})", hex::encode(genesis)), P2pMessageKind::Block(genesis) => format!("Block({})", hex::encode(genesis)),
@@ -128,6 +132,7 @@ pub trait P2p: Send + Sync + Clone + fmt::Debug + TributaryP2p {
log::trace!( log::trace!(
"received p2p message (kind {})", "received p2p message (kind {})",
match kind { match kind {
P2pMessageKind::KeepAlive => "KeepAlive".to_string(),
P2pMessageKind::Tributary(genesis) => format!("Tributary({})", hex::encode(genesis)), P2pMessageKind::Tributary(genesis) => format!("Tributary({})", hex::encode(genesis)),
P2pMessageKind::Heartbeat(genesis) => format!("Heartbeat({})", hex::encode(genesis)), P2pMessageKind::Heartbeat(genesis) => format!("Heartbeat({})", hex::encode(genesis)),
P2pMessageKind::Block(genesis) => format!("Block({})", hex::encode(genesis)), P2pMessageKind::Block(genesis) => format!("Block({})", hex::encode(genesis)),
@@ -144,6 +149,10 @@ struct Behavior {
mdns: libp2p::mdns::tokio::Behaviour, mdns: libp2p::mdns::tokio::Behaviour,
} }
lazy_static::lazy_static! {
static ref TIME_OF_LAST_P2P_MESSAGE: Mutex<Instant> = Mutex::new(Instant::now());
}
#[allow(clippy::type_complexity)] #[allow(clippy::type_complexity)]
#[derive(Clone)] #[derive(Clone)]
pub struct LibP2p(Arc<Mutex<Swarm<Behavior>>>, Arc<Mutex<VecDeque<(PeerId, Vec<u8>)>>>); pub struct LibP2p(Arc<Mutex<Swarm<Behavior>>>, Arc<Mutex<VecDeque<(PeerId, Vec<u8>)>>>);
@@ -178,6 +187,8 @@ impl LibP2p {
use blake2::{Digest, Blake2s256}; use blake2::{Digest, Blake2s256};
let config = ConfigBuilder::default() let config = ConfigBuilder::default()
.max_transmit_size(MAX_LIBP2P_MESSAGE_SIZE) .max_transmit_size(MAX_LIBP2P_MESSAGE_SIZE)
// We send KeepAlive after 80s
.idle_timeout(Duration::from_secs(85))
.validation_mode(ValidationMode::Strict) .validation_mode(ValidationMode::Strict)
// Uses a content based message ID to avoid duplicates as much as possible // Uses a content based message ID to avoid duplicates as much as possible
.message_id_fn(|msg| { .message_id_fn(|msg| {
@@ -210,7 +221,7 @@ impl LibP2p {
// TODO: We do tests on release binaries as of right now... // TODO: We do tests on release binaries as of right now...
//#[cfg(debug_assertions)] //#[cfg(debug_assertions)]
mdns: { mdns: {
log::info!("spawning mdns"); log::info!("creating mdns service");
libp2p::mdns::tokio::Behaviour::new(libp2p::mdns::Config::default(), throwaway_peer_id) libp2p::mdns::tokio::Behaviour::new(libp2p::mdns::Config::default(), throwaway_peer_id)
.unwrap() .unwrap()
}, },
@@ -227,7 +238,21 @@ impl LibP2p {
async move { async move {
// Run this task ad-infinitum // Run this task ad-infinitum
loop { loop {
// If it's been >80s since we've published a message, publish a KeepAlive since we're
// still an active service
// This is useful when we have no active tributaries and accordingly aren't sending
// heartbeats
// If we are sending heartbeats, we should've sent one after 60s of no finalized blocks
// (where a finalized block only occurs due to network activity), meaning this won't be
// run
let time_since_last =
Instant::now().duration_since(*TIME_OF_LAST_P2P_MESSAGE.lock().await);
if time_since_last > Duration::from_secs(80) {
p2p.broadcast_raw(P2pMessageKind::KeepAlive.serialize()).await;
}
// Maintain this lock until it's out of events // Maintain this lock until it's out of events
// TODO: Is there a less contentious way to run this poll?
let mut p2p_lock = p2p.0.lock().await; let mut p2p_lock = p2p.0.lock().await;
loop { loop {
match futures::poll!(p2p_lock.next()) { match futures::poll!(p2p_lock.next()) {
@@ -236,6 +261,7 @@ impl LibP2p {
libp2p::mdns::Event::Discovered(list), libp2p::mdns::Event::Discovered(list),
)))) => { )))) => {
for (peer, mut addr) in list { for (peer, mut addr) in list {
// Check the port is as expected to prevent trying to peer with Substrate nodes
if addr.pop() == Some(libp2p::multiaddr::Protocol::Tcp(PORT)) { if addr.pop() == Some(libp2p::multiaddr::Protocol::Tcp(PORT)) {
log::info!("found peer via mdns"); log::info!("found peer via mdns");
p2p_lock.behaviour_mut().gossipsub.add_explicit_peer(&peer); p2p_lock.behaviour_mut().gossipsub.add_explicit_peer(&peer);
@@ -255,6 +281,7 @@ impl LibP2p {
Poll::Ready(Some(SwarmEvent::Behaviour(BehaviorEvent::Gossipsub( Poll::Ready(Some(SwarmEvent::Behaviour(BehaviorEvent::Gossipsub(
GsEvent::Message { propagation_source, message, .. }, GsEvent::Message { propagation_source, message, .. },
)))) => { )))) => {
*TIME_OF_LAST_P2P_MESSAGE.lock().await = Instant::now();
p2p.1.lock().await.push_back((propagation_source, message.data)); p2p.1.lock().await.push_back((propagation_source, message.data));
} }
Poll::Ready(Some(_)) => {} Poll::Ready(Some(_)) => {}
@@ -281,6 +308,9 @@ impl P2p for LibP2p {
} }
async fn broadcast_raw(&self, msg: Vec<u8>) { async fn broadcast_raw(&self, msg: Vec<u8>) {
// Update the time of last message
*TIME_OF_LAST_P2P_MESSAGE.lock().await = Instant::now();
match self match self
.0 .0
.lock() .lock()