From 80e5ca932835e05dcb392f6eda129cccf9e6e233 Mon Sep 17 00:00:00 2001 From: Luke Parker Date: Fri, 13 Oct 2023 22:40:11 -0400 Subject: [PATCH] Move heartbeat_tributaries and handle_p2p to p2p.rs --- coordinator/src/main.rs | 204 +---------------- coordinator/src/p2p.rs | 212 +++++++++++++++++- coordinator/src/tests/tributary/handle_p2p.rs | 5 +- coordinator/src/tests/tributary/sync.rs | 9 +- 4 files changed, 225 insertions(+), 205 deletions(-) diff --git a/coordinator/src/main.rs b/coordinator/src/main.rs index e07faf9a..853e2ca1 100644 --- a/coordinator/src/main.rs +++ b/coordinator/src/main.rs @@ -1,7 +1,7 @@ use core::ops::Deref; use std::{ sync::Arc, - time::{SystemTime, Duration}, + time::Duration, collections::{VecDeque, HashMap}, }; @@ -27,7 +27,7 @@ use tokio::{ time::sleep, }; -use ::tributary::{ReadWrite, ProvidedError, TransactionKind, TransactionTrait, Block, Tributary}; +use ::tributary::{ProvidedError, TransactionKind, TransactionTrait, Block, Tributary}; mod tributary; use crate::tributary::{ @@ -107,198 +107,6 @@ async fn add_tributary( .unwrap(); } -pub async fn heartbeat_tributaries( - p2p: P, - mut new_tributary: broadcast::Receiver>, -) { - let ten_blocks_of_time = - Duration::from_secs((10 * Tributary::::block_time()).into()); - - let mut readers = vec![]; - loop { - while let Ok(ActiveTributary { spec: _, tributary }) = { - match new_tributary.try_recv() { - Ok(tributary) => Ok(tributary), - Err(broadcast::error::TryRecvError::Empty) => Err(()), - Err(broadcast::error::TryRecvError::Lagged(_)) => { - panic!("heartbeat_tributaries lagged to handle new_tributary") - } - Err(broadcast::error::TryRecvError::Closed) => panic!("new_tributary sender closed"), - } - } { - readers.push(tributary.reader()); - } - - for tributary in &readers { - let tip = tributary.tip(); - let block_time = - SystemTime::UNIX_EPOCH + Duration::from_secs(tributary.time_of_block(&tip).unwrap_or(0)); - - // Only trigger syncing if the block is more than a minute behind - if SystemTime::now() > (block_time + Duration::from_secs(60)) { - log::warn!("last known tributary block was over a minute ago"); - let mut msg = tip.to_vec(); - // Also include the timestamp so LibP2p doesn't flag this as an old message re-circulating - let timestamp = SystemTime::now() - .duration_since(SystemTime::UNIX_EPOCH) - .expect("system clock is wrong") - .as_secs(); - // Divide by the block time so if multiple parties send a Heartbeat, they're more likely to - // overlap - let time_unit = timestamp / u64::from(Tributary::::block_time()); - msg.extend(time_unit.to_le_bytes()); - P2p::broadcast(&p2p, P2pMessageKind::Heartbeat(tributary.genesis()), msg).await; - } - } - - // Only check once every 10 blocks of time - sleep(ten_blocks_of_time).await; - } -} - -pub async fn handle_p2p( - our_key: ::G, - p2p: P, - mut new_tributary: broadcast::Receiver>, -) { - let channels = Arc::new(RwLock::new(HashMap::new())); - tokio::spawn({ - let p2p = p2p.clone(); - let channels = channels.clone(); - async move { - loop { - let tributary = new_tributary.recv().await.unwrap(); - let genesis = tributary.spec.genesis(); - - let (send, mut recv) = mpsc::unbounded_channel(); - channels.write().await.insert(genesis, send); - - tokio::spawn({ - let p2p = p2p.clone(); - async move { - loop { - let mut msg: Message
<P>
= recv.recv().await.unwrap(); - match msg.kind { - P2pMessageKind::KeepAlive => {} - - P2pMessageKind::Tributary(msg_genesis) => { - assert_eq!(msg_genesis, genesis); - log::trace!("handling message for tributary {:?}", tributary.spec.set()); - if tributary.tributary.handle_message(&msg.msg).await { - P2p::broadcast(&p2p, msg.kind, msg.msg).await; - } - } - - // TODO2: Rate limit this per timestamp - // And/or slash on Heartbeat which justifies a response, since the node obviously - // was offline and we must now use our bandwidth to compensate for them? - P2pMessageKind::Heartbeat(msg_genesis) => { - assert_eq!(msg_genesis, genesis); - if msg.msg.len() != 40 { - log::error!("validator sent invalid heartbeat"); - continue; - } - - let p2p = p2p.clone(); - let spec = tributary.spec.clone(); - let reader = tributary.tributary.reader(); - // Spawn a dedicated task as this may require loading large amounts of data from - // disk and take a notable amount of time - tokio::spawn(async move { - /* - // Have sqrt(n) nodes reply with the blocks - let mut responders = (tributary.spec.n() as f32).sqrt().floor() as u64; - // Try to have at least 3 responders - if responders < 3 { - responders = tributary.spec.n().min(3).into(); - } - */ - - // Have up to three nodes respond - let responders = u64::from(spec.n().min(3)); - - // Decide which nodes will respond by using the latest block's hash as a - // mutually agreed upon entropy source - // This isn't a secure source of entropy, yet it's fine for this - let entropy = u64::from_le_bytes(reader.tip()[.. 8].try_into().unwrap()); - // If n = 10, responders = 3, we want `start` to be 0 ..= 7 - // (so the highest is 7, 8, 9) - // entropy % (10 + 1) - 3 = entropy % 8 = 0 ..= 7 - let start = - usize::try_from(entropy % (u64::from(spec.n() + 1) - responders)).unwrap(); - let mut selected = false; - for validator in - &spec.validators()[start .. (start + usize::try_from(responders).unwrap())] - { - if our_key == validator.0 { - selected = true; - break; - } - } - if !selected { - log::debug!("received heartbeat and not selected to respond"); - return; - } - - log::debug!("received heartbeat and selected to respond"); - - let mut latest = msg.msg[.. 32].try_into().unwrap(); - while let Some(next) = reader.block_after(&latest) { - let mut res = reader.block(&next).unwrap().serialize(); - res.extend(reader.commit(&next).unwrap()); - // Also include the timestamp used within the Heartbeat - res.extend(&msg.msg[32 .. 40]); - p2p.send(msg.sender, P2pMessageKind::Block(spec.genesis()), res).await; - latest = next; - } - }); - } - - P2pMessageKind::Block(msg_genesis) => { - assert_eq!(msg_genesis, genesis); - let mut msg_ref: &[u8] = msg.msg.as_ref(); - let Ok(block) = Block::::read(&mut msg_ref) else { - log::error!("received block message with an invalidly serialized block"); - continue; - }; - // Get just the commit - msg.msg.drain(.. 
(msg.msg.len() - msg_ref.len())); - msg.msg.drain((msg.msg.len() - 8) ..); - - let res = tributary.tributary.sync_block(block, msg.msg).await; - log::debug!("received block from {:?}, sync_block returned {}", msg.sender, res); - } - } - } - } - }); - } - } - }); - - loop { - let msg = p2p.receive().await; - match msg.kind { - P2pMessageKind::KeepAlive => {} - P2pMessageKind::Tributary(genesis) => { - if let Some(channel) = channels.read().await.get(&genesis) { - channel.send(msg).unwrap(); - } - } - P2pMessageKind::Heartbeat(genesis) => { - if let Some(channel) = channels.read().await.get(&genesis) { - channel.send(msg).unwrap(); - } - } - P2pMessageKind::Block(genesis) => { - if let Some(channel) = channels.read().await.get(&genesis) { - channel.send(msg).unwrap(); - } - } - } - } -} - async fn publish_signed_transaction( db: &mut D, tributary: &Tributary, @@ -1075,10 +883,14 @@ pub async fn run( // Spawn the heartbeat task, which will trigger syncing if there hasn't been a Tributary block // in a while (presumably because we're behind) - tokio::spawn(heartbeat_tributaries(p2p.clone(), new_tributary_listener_3)); + tokio::spawn(p2p::heartbeat_tributaries_task(p2p.clone(), new_tributary_listener_3)); // Handle P2P messages - tokio::spawn(handle_p2p(Ristretto::generator() * key.deref(), p2p, new_tributary_listener_4)); + tokio::spawn(p2p::handle_p2p_task( + Ristretto::generator() * key.deref(), + p2p, + new_tributary_listener_4, + )); // Handle all messages from processors handle_processors(raw_db, key, serai, processors, new_tributary_listener_5).await; diff --git a/coordinator/src/p2p.rs b/coordinator/src/p2p.rs index bc252d50..5cd46abf 100644 --- a/coordinator/src/p2p.rs +++ b/coordinator/src/p2p.rs @@ -1,9 +1,21 @@ use core::{time::Duration, fmt}; -use std::{sync::Arc, time::Instant, io::Read}; +use std::{ + sync::Arc, + io::Read, + collections::HashMap, + time::{SystemTime, Instant}, +}; use async_trait::async_trait; -use tokio::sync::{mpsc, Mutex}; +use ciphersuite::{Ciphersuite, Ristretto}; + +use serai_db::Db; + +use tokio::{ + sync::{Mutex, RwLock, mpsc, broadcast}, + time::sleep, +}; use libp2p::{ futures::StreamExt, @@ -20,7 +32,9 @@ use libp2p::{ swarm::{NetworkBehaviour, SwarmBuilder, SwarmEvent, Swarm}, }; -pub use tributary::P2p as TributaryP2p; +pub(crate) use tributary::{ReadWrite, P2p as TributaryP2p}; + +use crate::{Transaction, Block, Tributary, ActiveTributary}; // TODO: Use distinct topics const LIBP2P_TOPIC: &str = "serai-coordinator"; @@ -366,3 +380,195 @@ impl TributaryP2p for LibP2p { ::broadcast(self, P2pMessageKind::Tributary(genesis), msg).await } } + +pub async fn heartbeat_tributaries_task( + p2p: P, + mut new_tributary: broadcast::Receiver>, +) { + let ten_blocks_of_time = + Duration::from_secs((10 * Tributary::::block_time()).into()); + + let mut readers = vec![]; + loop { + while let Ok(ActiveTributary { spec: _, tributary }) = { + match new_tributary.try_recv() { + Ok(tributary) => Ok(tributary), + Err(broadcast::error::TryRecvError::Empty) => Err(()), + Err(broadcast::error::TryRecvError::Lagged(_)) => { + panic!("heartbeat_tributaries lagged to handle new_tributary") + } + Err(broadcast::error::TryRecvError::Closed) => panic!("new_tributary sender closed"), + } + } { + readers.push(tributary.reader()); + } + + for tributary in &readers { + let tip = tributary.tip(); + let block_time = + SystemTime::UNIX_EPOCH + Duration::from_secs(tributary.time_of_block(&tip).unwrap_or(0)); + + // Only trigger syncing if the block is more than a minute behind + if 
SystemTime::now() > (block_time + Duration::from_secs(60)) { + log::warn!("last known tributary block was over a minute ago"); + let mut msg = tip.to_vec(); + // Also include the timestamp so LibP2p doesn't flag this as an old message re-circulating + let timestamp = SystemTime::now() + .duration_since(SystemTime::UNIX_EPOCH) + .expect("system clock is wrong") + .as_secs(); + // Divide by the block time so if multiple parties send a Heartbeat, they're more likely to + // overlap + let time_unit = timestamp / u64::from(Tributary::::block_time()); + msg.extend(time_unit.to_le_bytes()); + P2p::broadcast(&p2p, P2pMessageKind::Heartbeat(tributary.genesis()), msg).await; + } + } + + // Only check once every 10 blocks of time + sleep(ten_blocks_of_time).await; + } +} + +pub async fn handle_p2p_task( + our_key: ::G, + p2p: P, + mut new_tributary: broadcast::Receiver>, +) { + let channels = Arc::new(RwLock::new(HashMap::new())); + tokio::spawn({ + let p2p = p2p.clone(); + let channels = channels.clone(); + async move { + loop { + let tributary = new_tributary.recv().await.unwrap(); + let genesis = tributary.spec.genesis(); + + let (send, mut recv) = mpsc::unbounded_channel(); + channels.write().await.insert(genesis, send); + + tokio::spawn({ + let p2p = p2p.clone(); + async move { + loop { + let mut msg: Message
<P>
= recv.recv().await.unwrap(); + match msg.kind { + P2pMessageKind::KeepAlive => {} + + P2pMessageKind::Tributary(msg_genesis) => { + assert_eq!(msg_genesis, genesis); + log::trace!("handling message for tributary {:?}", tributary.spec.set()); + if tributary.tributary.handle_message(&msg.msg).await { + P2p::broadcast(&p2p, msg.kind, msg.msg).await; + } + } + + // TODO2: Rate limit this per timestamp + // And/or slash on Heartbeat which justifies a response, since the node obviously + // was offline and we must now use our bandwidth to compensate for them? + P2pMessageKind::Heartbeat(msg_genesis) => { + assert_eq!(msg_genesis, genesis); + if msg.msg.len() != 40 { + log::error!("validator sent invalid heartbeat"); + continue; + } + + let p2p = p2p.clone(); + let spec = tributary.spec.clone(); + let reader = tributary.tributary.reader(); + // Spawn a dedicated task as this may require loading large amounts of data from + // disk and take a notable amount of time + tokio::spawn(async move { + /* + // Have sqrt(n) nodes reply with the blocks + let mut responders = (tributary.spec.n() as f32).sqrt().floor() as u64; + // Try to have at least 3 responders + if responders < 3 { + responders = tributary.spec.n().min(3).into(); + } + */ + + // Have up to three nodes respond + let responders = u64::from(spec.n().min(3)); + + // Decide which nodes will respond by using the latest block's hash as a + // mutually agreed upon entropy source + // This isn't a secure source of entropy, yet it's fine for this + let entropy = u64::from_le_bytes(reader.tip()[.. 8].try_into().unwrap()); + // If n = 10, responders = 3, we want `start` to be 0 ..= 7 + // (so the highest is 7, 8, 9) + // entropy % (10 + 1) - 3 = entropy % 8 = 0 ..= 7 + let start = + usize::try_from(entropy % (u64::from(spec.n() + 1) - responders)).unwrap(); + let mut selected = false; + for validator in + &spec.validators()[start .. (start + usize::try_from(responders).unwrap())] + { + if our_key == validator.0 { + selected = true; + break; + } + } + if !selected { + log::debug!("received heartbeat and not selected to respond"); + return; + } + + log::debug!("received heartbeat and selected to respond"); + + let mut latest = msg.msg[.. 32].try_into().unwrap(); + while let Some(next) = reader.block_after(&latest) { + let mut res = reader.block(&next).unwrap().serialize(); + res.extend(reader.commit(&next).unwrap()); + // Also include the timestamp used within the Heartbeat + res.extend(&msg.msg[32 .. 40]); + p2p.send(msg.sender, P2pMessageKind::Block(spec.genesis()), res).await; + latest = next; + } + }); + } + + P2pMessageKind::Block(msg_genesis) => { + assert_eq!(msg_genesis, genesis); + let mut msg_ref: &[u8] = msg.msg.as_ref(); + let Ok(block) = Block::::read(&mut msg_ref) else { + log::error!("received block message with an invalidly serialized block"); + continue; + }; + // Get just the commit + msg.msg.drain(.. 
(msg.msg.len() - msg_ref.len())); + msg.msg.drain((msg.msg.len() - 8) ..); + + let res = tributary.tributary.sync_block(block, msg.msg).await; + log::debug!("received block from {:?}, sync_block returned {}", msg.sender, res); + } + } + } + } + }); + } + } + }); + + loop { + let msg = p2p.receive().await; + match msg.kind { + P2pMessageKind::KeepAlive => {} + P2pMessageKind::Tributary(genesis) => { + if let Some(channel) = channels.read().await.get(&genesis) { + channel.send(msg).unwrap(); + } + } + P2pMessageKind::Heartbeat(genesis) => { + if let Some(channel) = channels.read().await.get(&genesis) { + channel.send(msg).unwrap(); + } + } + P2pMessageKind::Block(genesis) => { + if let Some(channel) = channels.read().await.get(&genesis) { + channel.send(msg).unwrap(); + } + } + } + } +} diff --git a/coordinator/src/tests/tributary/handle_p2p.rs b/coordinator/src/tests/tributary/handle_p2p.rs index 87576dd8..f69c0b1a 100644 --- a/coordinator/src/tests/tributary/handle_p2p.rs +++ b/coordinator/src/tests/tributary/handle_p2p.rs @@ -13,7 +13,8 @@ use tributary::Tributary; use crate::{ tributary::Transaction, - ActiveTributary, handle_p2p, + ActiveTributary, + p2p::handle_p2p_task, tests::{ LocalP2p, tributary::{new_keys, new_spec, new_tributaries}, @@ -33,7 +34,7 @@ async fn handle_p2p_test() { let tributary = Arc::new(tributary); tributary_arcs.push(tributary.clone()); let (new_tributary_send, new_tributary_recv) = broadcast::channel(5); - tokio::spawn(handle_p2p(Ristretto::generator() * *keys[i], p2p, new_tributary_recv)); + tokio::spawn(handle_p2p_task(Ristretto::generator() * *keys[i], p2p, new_tributary_recv)); new_tributary_send .send(ActiveTributary { spec: spec.clone(), tributary }) .map_err(|_| "failed to send ActiveTributary") diff --git a/coordinator/src/tests/tributary/sync.rs b/coordinator/src/tests/tributary/sync.rs index 3cc7f2c6..36f8e6d2 100644 --- a/coordinator/src/tests/tributary/sync.rs +++ b/coordinator/src/tests/tributary/sync.rs @@ -13,7 +13,8 @@ use tributary::Tributary; use crate::{ tributary::Transaction, - ActiveTributary, handle_p2p, heartbeat_tributaries, + ActiveTributary, + p2p::{heartbeat_tributaries_task, handle_p2p_task}, tests::{ LocalP2p, tributary::{new_keys, new_spec, new_tributaries}, @@ -42,7 +43,7 @@ async fn sync_test() { tributary_arcs.push(tributary.clone()); let (new_tributary_send, new_tributary_recv) = broadcast::channel(5); let thread = - tokio::spawn(handle_p2p(Ristretto::generator() * *keys[i], p2p, new_tributary_recv)); + tokio::spawn(handle_p2p_task(Ristretto::generator() * *keys[i], p2p, new_tributary_recv)); new_tributary_send .send(ActiveTributary { spec: spec.clone(), tributary }) .map_err(|_| "failed to send ActiveTributary") @@ -77,7 +78,7 @@ async fn sync_test() { let syncer_key = Ristretto::generator() * *syncer_key; let syncer_tributary = Arc::new(syncer_tributary); let (syncer_tributary_send, syncer_tributary_recv) = broadcast::channel(5); - tokio::spawn(handle_p2p(syncer_key, syncer_p2p.clone(), syncer_tributary_recv)); + tokio::spawn(handle_p2p_task(syncer_key, syncer_p2p.clone(), syncer_tributary_recv)); syncer_tributary_send .send(ActiveTributary { spec: spec.clone(), tributary: syncer_tributary.clone() }) .map_err(|_| "failed to send ActiveTributary to syncer") @@ -95,7 +96,7 @@ async fn sync_test() { // Start the heartbeat protocol let (syncer_heartbeat_tributary_send, syncer_heartbeat_tributary_recv) = broadcast::channel(5); - tokio::spawn(heartbeat_tributaries(syncer_p2p, syncer_heartbeat_tributary_recv)); + 
tokio::spawn(heartbeat_tributaries_task(syncer_p2p, syncer_heartbeat_tributary_recv)); syncer_heartbeat_tributary_send .send(ActiveTributary { spec: spec.clone(), tributary: syncer_tributary.clone() }) .map_err(|_| "failed to send ActiveTributary to heartbeat")