Build the swarm

Moves UpdateSharedValidatorsTask to validators.rs. While prior planned to
re-use a validators object across connecting and peer state management, the
current plan is to use an independent validators object for each to minimize
any contention. They should be built infrequently enough, and cheap enough to
update in the majority case (due to quickly checking if an update is needed),
that this is fine.
This commit is contained in:
Luke Parker
2025-01-07 18:09:25 -05:00
parent a731c0005d
commit 419223c54e
6 changed files with 243 additions and 64 deletions

View File

@@ -23,10 +23,11 @@ use crate::p2p::{validators::Validators, peer_id_from_public};
const PROTOCOL: &str = "/serai/coordinator/validators";
struct OnlyValidators {
validators: Arc<RwLock<Validators>>,
serai_key: Zeroizing<Keypair>,
noise_keypair: identity::Keypair,
#[derive(Clone)]
pub(crate) struct OnlyValidators {
pub(crate) validators: Arc<RwLock<Validators>>,
pub(crate) serai_key: Zeroizing<Keypair>,
pub(crate) noise_keypair: identity::Keypair,
}
impl OnlyValidators {

View File

@@ -15,7 +15,7 @@ pub use libp2p::gossipsub::Event;
use serai_cosign::SignedCosign;
// Block size limit + 16 KB of space for signatures/metadata
const MAX_LIBP2P_GOSSIP_MESSAGE_SIZE: usize = tributary::BLOCK_SIZE_LIMIT + 16384;
pub(crate) const MAX_LIBP2P_GOSSIP_MESSAGE_SIZE: usize = tributary::BLOCK_SIZE_LIMIT + 16384;
const KEEP_ALIVE_INTERVAL: Duration = Duration::from_secs(80);

View File

@@ -1,23 +1,36 @@
use core::future::Future;
use std::{
sync::Arc,
collections::{HashSet, HashMap},
};
use serai_client::primitives::{NetworkId, PublicKey};
use zeroize::Zeroizing;
use schnorrkel::Keypair;
use tokio::sync::RwLock;
use serai_client::{
primitives::{NetworkId, PublicKey},
Serai,
};
use serai_task::ContinuallyRan;
use tokio::sync::{mpsc, RwLock};
use libp2p::{multihash::Multihash, identity::PeerId, swarm::NetworkBehaviour};
use serai_task::Task;
use libp2p::{
multihash::Multihash,
identity::{self, PeerId},
tcp::Config as TcpConfig,
yamux,
swarm::NetworkBehaviour,
SwarmBuilder,
};
/// A struct to sync the validators from the Serai node in order to keep track of them.
mod validators;
use validators::{Validators, update_shared_validators};
use validators::{Validators, UpdateValidatorsTask};
/// The authentication protocol upgrade to limit the P2P network to active validators.
mod authenticate;
use authenticate::OnlyValidators;
/// The dial task, to find new peers to connect to
mod dial;
@@ -34,9 +47,18 @@ mod heartbeat;
/// The swarm task, running it and dispatching to/from it
mod swarm;
use swarm::SwarmTask;
const PORT: u16 = 30563; // 5132 ^ (('c' << 8) | 'o')
// usize::max, manually implemented, as max isn't a const fn
const MAX_LIBP2P_MESSAGE_SIZE: usize =
if gossip::MAX_LIBP2P_GOSSIP_MESSAGE_SIZE > reqres::MAX_LIBP2P_REQRES_MESSAGE_SIZE {
gossip::MAX_LIBP2P_GOSSIP_MESSAGE_SIZE
} else {
reqres::MAX_LIBP2P_REQRES_MESSAGE_SIZE
};
fn peer_id_from_public(public: PublicKey) -> PeerId {
// 0 represents the identity Multihash, that no hash was performed
// It's an internal constant so we can't refer to the constant inside libp2p
@@ -76,19 +98,73 @@ struct Behavior {
gossip: gossip::Behavior,
}
struct UpdateSharedValidatorsTask {
validators: Arc<RwLock<Validators>>,
}
pub(crate) fn new(serai_key: &Zeroizing<Keypair>, serai: Serai) -> P2p {
// Define the object we track peers with
let peers = Peers { peers: Arc::new(RwLock::new(HashMap::new())) };
impl ContinuallyRan for UpdateSharedValidatorsTask {
// Only run every minute, not the default of every five seconds
const DELAY_BETWEEN_ITERATIONS: u64 = 60;
const MAX_DELAY_BETWEEN_ITERATIONS: u64 = 5 * 60;
// Define the dial task
let (dial_task_def, dial_task) = Task::new();
let (to_dial_send, to_dial_recv) = mpsc::unbounded_channel();
todo!("TODO: Dial task");
fn run_iteration(&mut self) -> impl Send + Future<Output = Result<bool, String>> {
async move {
update_shared_validators(&self.validators).await.map_err(|e| format!("{e:?}"))?;
Ok(true)
}
}
// Define the Validators object used for validating new connections
let connection_validators = UpdateValidatorsTask::spawn(serai.clone());
let new_only_validators = |noise_keypair: &identity::Keypair| -> Result<_, ()> {
Ok(OnlyValidators {
serai_key: serai_key.clone(),
validators: connection_validators.clone(),
noise_keypair: noise_keypair.clone(),
})
};
let new_yamux = || {
let mut config = yamux::Config::default();
// 1 MiB default + max message size
config.set_max_buffer_size((1024 * 1024) + MAX_LIBP2P_MESSAGE_SIZE);
// 256 KiB default + max message size
config.set_receive_window_size(((256 * 1024) + MAX_LIBP2P_MESSAGE_SIZE).try_into().unwrap());
config
};
let behavior = Behavior { reqres: reqres::new_behavior(), gossip: gossip::new_behavior() };
let mut swarm = SwarmBuilder::with_existing_identity(identity::Keypair::generate_ed25519())
.with_tokio()
.with_tcp(TcpConfig::default().nodelay(false), new_only_validators, new_yamux)
.unwrap()
.with_behaviour(|_| behavior)
.unwrap()
.build();
swarm.listen_on(format!("/ip4/0.0.0.0/tcp/{PORT}").parse().unwrap()).unwrap();
swarm.listen_on(format!("/ip6/::/tcp/{PORT}").parse().unwrap()).unwrap();
let swarm_validators = UpdateValidatorsTask::spawn(serai);
let (gossip_send, gossip_recv) = mpsc::unbounded_channel();
let (signed_cosigns_send, signed_cosigns_recv) = mpsc::unbounded_channel();
let (tributary_gossip_send, tributary_gossip_recv) = mpsc::unbounded_channel();
let (outbound_requests_send, outbound_requests_recv) = mpsc::unbounded_channel();
let (heartbeat_requests_send, heartbeat_requests_recv) = mpsc::unbounded_channel();
let (notable_cosign_requests_send, notable_cosign_requests_recv) = mpsc::unbounded_channel();
let (inbound_request_responses_send, inbound_request_responses_recv) = mpsc::unbounded_channel();
// Create the swarm task
SwarmTask::new(
dial_task,
to_dial_recv,
swarm_validators,
peers,
swarm,
gossip_recv,
signed_cosigns_send,
tributary_gossip_send,
outbound_requests_recv,
heartbeat_requests_send,
notable_cosign_requests_send,
inbound_request_responses_recv,
);
todo!("TODO")
}

View File

@@ -17,7 +17,7 @@ use serai_cosign::SignedCosign;
/// The maximum message size for the request-response protocol
// This is derived from the heartbeat message size as it's our largest message
const MAX_LIBP2P_REQRES_MESSAGE_SIZE: usize =
pub(crate) const MAX_LIBP2P_REQRES_MESSAGE_SIZE: usize =
(tributary::BLOCK_SIZE_LIMIT * crate::p2p::heartbeat::BLOCKS_PER_BATCH) + 1024;
const PROTOCOL: &str = "/serai/coordinator/reqres/1.0.0";
@@ -46,14 +46,14 @@ pub(crate) struct TributaryBlockWithCommit {
/// Responses which can be received via the request-response protocol.
#[derive(Clone, BorshSerialize, BorshDeserialize)]
pub(crate) enum Response {
NoResponse,
None,
Blocks(Vec<TributaryBlockWithCommit>),
NotableCosigns(Vec<SignedCosign>),
}
impl fmt::Debug for Response {
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
Response::NoResponse => fmt.debug_struct("Response::NoResponse").finish(),
Response::None => fmt.debug_struct("Response::None").finish(),
Response::Blocks(_) => fmt.debug_struct("Response::Block").finish_non_exhaustive(),
Response::NotableCosigns(_) => {
fmt.debug_struct("Response::NotableCosigns").finish_non_exhaustive()

View File

@@ -28,6 +28,9 @@ use crate::p2p::{
gossip,
};
const KEEP_ALIVE_INTERVAL: Duration = Duration::from_secs(80);
const TIME_BETWEEN_REBUILD_PEERS: Duration = Duration::from_secs(10 * 60);
/*
`SwarmTask` handles everything we need the `Swarm` object for. The goal is to minimize the
contention on this task. Unfortunately, the `Swarm` object itself is needed for a variety of
@@ -43,7 +46,7 @@ use crate::p2p::{
- Dispatching received requests
- Sending responses
*/
struct SwarmTask {
pub(crate) struct SwarmTask {
dial_task: TaskHandle,
to_dial: mpsc::UnboundedReceiver<DialOpts>,
last_dial_task_run: Instant,
@@ -54,6 +57,8 @@ struct SwarmTask {
swarm: Swarm<Behavior>,
last_message: Instant,
gossip: mpsc::UnboundedReceiver<gossip::Message>,
signed_cosigns: mpsc::UnboundedSender<SignedCosign>,
tributary_gossip: mpsc::UnboundedSender<(ValidatorSet, Vec<u8>)>,
@@ -99,24 +104,21 @@ impl SwarmTask {
fn handle_reqres(&mut self, event: reqres::Event) {
match event {
reqres::Event::Message { message, .. } => match message {
reqres::Message::Request { request_id, request, channel } => {
match request {
// TODO: Send these
reqres::Request::KeepAlive => {
let _: Result<_, _> =
self.swarm.behaviour_mut().reqres.send_response(channel, Response::NoResponse);
}
reqres::Request::Heartbeat { set, latest_block_hash } => {
self.inbound_request_response_channels.insert(request_id, channel);
let _: Result<_, _> =
self.heartbeat_requests.send((request_id, set, latest_block_hash));
}
reqres::Request::NotableCosigns { global_session } => {
self.inbound_request_response_channels.insert(request_id, channel);
let _: Result<_, _> = self.notable_cosign_requests.send((request_id, global_session));
}
reqres::Message::Request { request_id, request, channel } => match request {
reqres::Request::KeepAlive => {
let _: Result<_, _> =
self.swarm.behaviour_mut().reqres.send_response(channel, Response::None);
}
}
reqres::Request::Heartbeat { set, latest_block_hash } => {
self.inbound_request_response_channels.insert(request_id, channel);
let _: Result<_, _> =
self.heartbeat_requests.send((request_id, set, latest_block_hash));
}
reqres::Request::NotableCosigns { global_session } => {
self.inbound_request_response_channels.insert(request_id, channel);
let _: Result<_, _> = self.notable_cosign_requests.send((request_id, global_session));
}
},
reqres::Message::Response { request_id, response } => {
// Send Some(response) as the response for the request
if let Some(channel) = self.outbound_request_responses.remove(&request_id) {
@@ -136,9 +138,19 @@ impl SwarmTask {
async fn run(mut self) {
loop {
let time_till_keep_alive = Instant::now().saturating_duration_since(self.last_message);
let time_till_rebuild_peers = self.rebuild_peers_at.saturating_duration_since(Instant::now());
tokio::select! {
() = tokio::time::sleep(time_till_keep_alive) => {
let peers = self.swarm.connected_peers().copied().collect::<Vec<_>>();
let behavior = self.swarm.behaviour_mut();
for peer in peers {
behavior.reqres.send_request(&peer, Request::KeepAlive);
}
self.last_message = Instant::now();
}
// Dial peers we're instructed to
dial_opts = self.to_dial.recv() => {
let dial_opts = dial_opts.expect("DialTask was closed?");
@@ -156,8 +168,6 @@ impl SwarmTask {
We also use this to disconnect all peers who are no longer active in any network.
*/
() = tokio::time::sleep(time_till_rebuild_peers) => {
const TIME_BETWEEN_REBUILD_PEERS: Duration = Duration::from_secs(10 * 60);
let validators_by_network = self.validators.read().await.by_network().clone();
let connected_peers = self.swarm.connected_peers().copied().collect::<HashSet<_>>();
@@ -253,6 +263,7 @@ impl SwarmTask {
let topic = message.topic();
let message = borsh::to_vec(&message).unwrap();
let _: Result<_, _> = self.swarm.behaviour_mut().gossip.publish(topic, message);
self.last_message = Instant::now();
}
request = self.outbound_requests.recv() => {
@@ -273,4 +284,58 @@ impl SwarmTask {
}
}
}
#[allow(clippy::too_many_arguments)]
pub(crate) fn new(
dial_task: TaskHandle,
to_dial: mpsc::UnboundedReceiver<DialOpts>,
validators: Arc<RwLock<Validators>>,
peers: Peers,
swarm: Swarm<Behavior>,
gossip: mpsc::UnboundedReceiver<gossip::Message>,
signed_cosigns: mpsc::UnboundedSender<SignedCosign>,
tributary_gossip: mpsc::UnboundedSender<(ValidatorSet, Vec<u8>)>,
outbound_requests: mpsc::UnboundedReceiver<(
PeerId,
Request,
oneshot::Sender<Option<Response>>,
)>,
heartbeat_requests: mpsc::UnboundedSender<(RequestId, ValidatorSet, [u8; 32])>,
notable_cosign_requests: mpsc::UnboundedSender<(RequestId, [u8; 32])>,
inbound_request_responses: mpsc::UnboundedReceiver<(RequestId, Response)>,
) {
tokio::spawn(
SwarmTask {
dial_task,
to_dial,
last_dial_task_run: Instant::now(),
validators,
peers,
rebuild_peers_at: Instant::now() + TIME_BETWEEN_REBUILD_PEERS,
swarm,
last_message: Instant::now(),
gossip,
signed_cosigns,
tributary_gossip,
outbound_requests,
outbound_request_responses: HashMap::new(),
inbound_request_response_channels: HashMap::new(),
heartbeat_requests,
notable_cosign_requests,
inbound_request_responses,
}
.run(),
);
}
}

View File

@@ -1,4 +1,4 @@
use core::borrow::Borrow;
use core::{borrow::Borrow, future::Future};
use std::{
sync::Arc,
collections::{HashSet, HashMap},
@@ -6,6 +6,8 @@ use std::{
use serai_client::{primitives::NetworkId, validator_sets::primitives::Session, Serai};
use serai_task::{Task, ContinuallyRan};
use libp2p::PeerId;
use futures_util::stream::{StreamExt, FuturesUnordered};
@@ -103,8 +105,6 @@ impl Validators {
}
/// Update the view of the validators.
///
/// Returns all validators removed from the active validator set.
pub(crate) async fn update(&mut self) -> Result<(), String> {
let session_changes = Self::session_changes(&self.serai, &self.sessions).await?;
self.incorporate_session_changes(session_changes);
@@ -124,19 +124,56 @@ impl Validators {
}
}
/// Update the view of the validators.
/// A task which updates a set of validators.
///
/// This minimizes the time an exclusive lock is held over the validators to minimize the
/// disruption to functioning.
///
/// Returns all validators removed from the active validator set.
pub(crate) async fn update_shared_validators(
validators: &Arc<RwLock<Validators>>,
) -> Result<(), String> {
let session_changes = {
let validators = validators.read().await;
Validators::session_changes(validators.serai.clone(), validators.sessions.clone()).await?
};
validators.write().await.incorporate_session_changes(session_changes);
Ok(())
/// The validators managed by this tak will have their exclusive lock held for a minimal amount of
/// time while the update occurs to minimize the disruption to the services relying on it.
pub(crate) struct UpdateValidatorsTask {
validators: Arc<RwLock<Validators>>,
}
impl UpdateValidatorsTask {
/// Spawn a new instance of the UpdateValidatorsTask.
///
/// This returns a reference to the Validators it updates after spawning itself.
pub(crate) fn spawn(serai: Serai) -> Arc<RwLock<Validators>> {
// The validators which will be updated
let validators = Arc::new(RwLock::new(Validators {
serai,
sessions: HashMap::new(),
by_network: HashMap::new(),
validators: HashMap::new(),
}));
// Define the task
let (update_validators_task, update_validators_task_handle) = Task::new();
// Forget the handle, as dropping the handle would stop the task
core::mem::forget(update_validators_task_handle);
// Spawn the task
tokio::spawn(
(Self { validators: validators.clone() }).continually_run(update_validators_task, vec![]),
);
// Return the validators
validators
}
}
impl ContinuallyRan for UpdateValidatorsTask {
// Only run every minute, not the default of every five seconds
const DELAY_BETWEEN_ITERATIONS: u64 = 60;
const MAX_DELAY_BETWEEN_ITERATIONS: u64 = 5 * 60;
fn run_iteration(&mut self) -> impl Send + Future<Output = Result<bool, String>> {
async move {
let session_changes = {
let validators = self.validators.read().await;
Validators::session_changes(validators.serai.clone(), validators.sessions.clone())
.await
.map_err(|e| format!("{e:?}"))?
};
self.validators.write().await.incorporate_session_changes(session_changes);
Ok(true)
}
}
}