mirror of
https://github.com/serai-dex/serai.git
synced 2025-12-08 12:19:24 +00:00
Validator DHT (#494)
* Route validators for any active set through sc-authority-discovery Additionally adds an RPC route to retrieve their P2P addresses. * Have the coordinator get peers from substrate * Have the RPC return one address, not up to 3 Prevents the coordinator from believing it has 3 peers when it has one. * Add missing feature to serai-client * Correct network argument in serai-client for p2p_validators call * Add a test in serai-client to check DHT population with a much quicker failure than the coordinator tests * Update to latest Substrate Removes distinguishing BABE/AuthorityDiscovery keys which causes sc_authority_discovery to populate as desired. * Update to a properly tagged substrate commit * Add all dialed to peers to GossipSub * cargo fmt * Reduce common code in serai-coordinator-tests with amore involved new_test * Use a recursive async function to spawn `n` DockerTests with the necessary networking configuration * Merge UNIQUE_ID and ONE_AT_A_TIME * Tidy up the new recursive code in tests/coordinator * Use a Mutex in CONTEXT to let it be set multiple times * Make complimentary edits to full-stack tests * Augment coordinator P2p connection logs * Drop lock acquisitions before recursing * Better scope lock acquisitions in full-stack, preventing a deadlock * Ensure OUTER_OPS is reset across the test boundary * Add cargo deny allowance for dockertest fork
This commit is contained in:
@@ -951,10 +951,8 @@ pub async fn run<D: Db, Pro: Processors, P: P2p>(
|
||||
key: Zeroizing<<Ristretto as Ciphersuite>::F>,
|
||||
p2p: P,
|
||||
processors: Pro,
|
||||
serai: Serai,
|
||||
serai: Arc<Serai>,
|
||||
) {
|
||||
let serai = Arc::new(serai);
|
||||
|
||||
let (new_tributary_spec_send, mut new_tributary_spec_recv) = mpsc::unbounded_channel();
|
||||
// Reload active tributaries from the database
|
||||
for spec in ActiveTributaryDb::active_tributaries(&raw_db).1 {
|
||||
@@ -1212,11 +1210,10 @@ async fn main() {
|
||||
key_bytes.zeroize();
|
||||
key
|
||||
};
|
||||
let p2p = LibP2p::new();
|
||||
|
||||
let processors = Arc::new(MessageQueue::from_env(Service::Coordinator));
|
||||
|
||||
let serai = || async {
|
||||
let serai = (async {
|
||||
loop {
|
||||
let Ok(serai) = Serai::new(format!(
|
||||
"http://{}:9944",
|
||||
@@ -1229,8 +1226,10 @@ async fn main() {
|
||||
continue;
|
||||
};
|
||||
log::info!("made initial connection to Serai node");
|
||||
return serai;
|
||||
return Arc::new(serai);
|
||||
}
|
||||
};
|
||||
run(db, key, p2p, processors, serai().await).await
|
||||
})
|
||||
.await;
|
||||
let p2p = LibP2p::new(serai.clone());
|
||||
run(db, key, p2p, processors, serai).await
|
||||
}
|
||||
|
||||
@@ -7,9 +7,11 @@ use std::{
|
||||
};
|
||||
|
||||
use async_trait::async_trait;
|
||||
use rand_core::{RngCore, OsRng};
|
||||
|
||||
use scale::Encode;
|
||||
use borsh::{BorshSerialize, BorshDeserialize};
|
||||
use serai_client::primitives::NetworkId;
|
||||
use serai_client::{primitives::NetworkId, validator_sets::primitives::ValidatorSet, Serai};
|
||||
|
||||
use serai_db::Db;
|
||||
|
||||
@@ -20,6 +22,7 @@ use tokio::{
|
||||
};
|
||||
|
||||
use libp2p::{
|
||||
core::multiaddr::{Protocol, Multiaddr},
|
||||
identity::Keypair,
|
||||
PeerId,
|
||||
tcp::Config as TcpConfig,
|
||||
@@ -127,8 +130,8 @@ pub struct Message<P: P2p> {
|
||||
pub trait P2p: Send + Sync + Clone + fmt::Debug + TributaryP2p {
|
||||
type Id: Send + Sync + Clone + Copy + fmt::Debug;
|
||||
|
||||
async fn subscribe(&self, genesis: [u8; 32]);
|
||||
async fn unsubscribe(&self, genesis: [u8; 32]);
|
||||
async fn subscribe(&self, set: ValidatorSet, genesis: [u8; 32]);
|
||||
async fn unsubscribe(&self, set: ValidatorSet, genesis: [u8; 32]);
|
||||
|
||||
async fn send_raw(&self, to: Self::Id, genesis: Option<[u8; 32]>, msg: Vec<u8>);
|
||||
async fn broadcast_raw(&self, genesis: Option<[u8; 32]>, msg: Vec<u8>);
|
||||
@@ -190,14 +193,12 @@ pub trait P2p: Send + Sync + Clone + fmt::Debug + TributaryP2p {
|
||||
#[derive(NetworkBehaviour)]
|
||||
struct Behavior {
|
||||
gossipsub: GsBehavior,
|
||||
#[cfg(debug_assertions)]
|
||||
mdns: libp2p::mdns::tokio::Behaviour,
|
||||
}
|
||||
|
||||
#[allow(clippy::type_complexity)]
|
||||
#[derive(Clone)]
|
||||
pub struct LibP2p {
|
||||
subscribe: Arc<Mutex<mpsc::UnboundedSender<(bool, [u8; 32])>>>,
|
||||
subscribe: Arc<Mutex<mpsc::UnboundedSender<(bool, ValidatorSet, [u8; 32])>>>,
|
||||
broadcast: Arc<Mutex<mpsc::UnboundedSender<(Option<[u8; 32]>, Vec<u8>)>>>,
|
||||
receive: Arc<Mutex<mpsc::UnboundedReceiver<(PeerId, Vec<u8>)>>>,
|
||||
}
|
||||
@@ -209,14 +210,13 @@ impl fmt::Debug for LibP2p {
|
||||
|
||||
impl LibP2p {
|
||||
#[allow(clippy::new_without_default)]
|
||||
pub fn new() -> Self {
|
||||
pub fn new(serai: Arc<Serai>) -> Self {
|
||||
// Block size limit + 1 KB of space for signatures/metadata
|
||||
const MAX_LIBP2P_MESSAGE_SIZE: usize = tributary::BLOCK_SIZE_LIMIT + 1024;
|
||||
|
||||
log::info!("creating a libp2p instance");
|
||||
|
||||
let throwaway_key_pair = Keypair::generate_ed25519();
|
||||
let throwaway_peer_id = PeerId::from(throwaway_key_pair.public());
|
||||
|
||||
let behavior = Behavior {
|
||||
gossipsub: {
|
||||
@@ -258,14 +258,6 @@ impl LibP2p {
|
||||
|
||||
gossipsub
|
||||
},
|
||||
|
||||
// Only use MDNS in debug environments, as it should have no value in a release build
|
||||
#[cfg(debug_assertions)]
|
||||
mdns: {
|
||||
log::info!("creating mdns service");
|
||||
libp2p::mdns::tokio::Behaviour::new(libp2p::mdns::Config::default(), throwaway_peer_id)
|
||||
.unwrap()
|
||||
},
|
||||
};
|
||||
|
||||
// Uses noise for authentication, yamux for multiplexing
|
||||
@@ -294,8 +286,8 @@ impl LibP2p {
|
||||
let (receive_send, receive_recv) = mpsc::unbounded_channel();
|
||||
let (subscribe_send, mut subscribe_recv) = mpsc::unbounded_channel();
|
||||
|
||||
fn topic_for_genesis(genesis: [u8; 32]) -> IdentTopic {
|
||||
IdentTopic::new(format!("{LIBP2P_TOPIC}-{}", hex::encode(genesis)))
|
||||
fn topic_for_set(set: ValidatorSet) -> IdentTopic {
|
||||
IdentTopic::new(format!("{LIBP2P_TOPIC}-{}", hex::encode(set.encode())))
|
||||
}
|
||||
|
||||
tokio::spawn({
|
||||
@@ -305,17 +297,14 @@ impl LibP2p {
|
||||
fn broadcast_raw(
|
||||
p2p: &mut Swarm<Behavior>,
|
||||
time_of_last_p2p_message: &mut Instant,
|
||||
genesis: Option<[u8; 32]>,
|
||||
set: Option<ValidatorSet>,
|
||||
msg: Vec<u8>,
|
||||
) {
|
||||
// Update the time of last message
|
||||
*time_of_last_p2p_message = Instant::now();
|
||||
|
||||
let topic = if let Some(genesis) = genesis {
|
||||
topic_for_genesis(genesis)
|
||||
} else {
|
||||
IdentTopic::new(LIBP2P_TOPIC)
|
||||
};
|
||||
let topic =
|
||||
if let Some(set) = set { topic_for_set(set) } else { IdentTopic::new(LIBP2P_TOPIC) };
|
||||
|
||||
match p2p.behaviour_mut().gossipsub.publish(topic, msg.clone()) {
|
||||
Err(PublishError::SigningError(e)) => panic!("signing error when broadcasting: {e}"),
|
||||
@@ -331,37 +320,97 @@ impl LibP2p {
|
||||
}
|
||||
|
||||
async move {
|
||||
let mut set_for_genesis = HashMap::new();
|
||||
let mut pending_p2p_connections = vec![];
|
||||
// Run this task ad-infinitum
|
||||
loop {
|
||||
// Handle pending P2P connections
|
||||
// TODO: Break this out onto its own task with better peer management logic?
|
||||
{
|
||||
let mut connect = |addr: Multiaddr| {
|
||||
log::info!("found peer from substrate: {addr}");
|
||||
|
||||
let protocols = addr.iter().filter_map(|piece| match piece {
|
||||
// Drop PeerIds from the Substrate P2p network
|
||||
Protocol::P2p(_) => None,
|
||||
// Use our own TCP port
|
||||
Protocol::Tcp(_) => Some(Protocol::Tcp(PORT)),
|
||||
other => Some(other),
|
||||
});
|
||||
|
||||
let mut new_addr = Multiaddr::empty();
|
||||
for protocol in protocols {
|
||||
new_addr.push(protocol);
|
||||
}
|
||||
let addr = new_addr;
|
||||
log::debug!("transformed found peer: {addr}");
|
||||
|
||||
if let Err(e) = swarm.dial(addr) {
|
||||
log::warn!("dialing peer failed: {e:?}");
|
||||
}
|
||||
};
|
||||
|
||||
while let Some(network) = pending_p2p_connections.pop() {
|
||||
if let Ok(mut nodes) = serai.p2p_validators(network).await {
|
||||
// If there's an insufficient amount of nodes known, connect to all yet add it back
|
||||
// and break
|
||||
if nodes.len() < 3 {
|
||||
log::warn!(
|
||||
"insufficient amount of P2P nodes known for {:?}: {}",
|
||||
network,
|
||||
nodes.len()
|
||||
);
|
||||
pending_p2p_connections.push(network);
|
||||
for node in nodes {
|
||||
connect(node);
|
||||
}
|
||||
break;
|
||||
}
|
||||
|
||||
// Randomly select up to 5
|
||||
for _ in 0 .. 5 {
|
||||
if !nodes.is_empty() {
|
||||
let to_connect = nodes.swap_remove(
|
||||
usize::try_from(OsRng.next_u64() % u64::try_from(nodes.len()).unwrap())
|
||||
.unwrap(),
|
||||
);
|
||||
connect(to_connect);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
let time_since_last = Instant::now().duration_since(time_of_last_p2p_message);
|
||||
tokio::select! {
|
||||
biased;
|
||||
|
||||
// Subscribe to any new topics
|
||||
topic = subscribe_recv.recv() => {
|
||||
let (subscribe, topic) = topic.expect("subscribe_recv closed. are we shutting down?");
|
||||
set = subscribe_recv.recv() => {
|
||||
let (subscribe, set, genesis): (_, ValidatorSet, [u8; 32]) =
|
||||
set.expect("subscribe_recv closed. are we shutting down?");
|
||||
let topic = topic_for_set(set);
|
||||
if subscribe {
|
||||
swarm
|
||||
.behaviour_mut()
|
||||
.gossipsub
|
||||
.subscribe(&topic_for_genesis(topic))
|
||||
.unwrap();
|
||||
log::info!("subscribing to p2p messages for {set:?}");
|
||||
pending_p2p_connections.push(set.network);
|
||||
set_for_genesis.insert(genesis, set);
|
||||
swarm.behaviour_mut().gossipsub.subscribe(&topic).unwrap();
|
||||
} else {
|
||||
swarm
|
||||
.behaviour_mut()
|
||||
.gossipsub
|
||||
.unsubscribe(&topic_for_genesis(topic))
|
||||
.unwrap();
|
||||
log::info!("unsubscribing to p2p messages for {set:?}");
|
||||
set_for_genesis.remove(&genesis);
|
||||
swarm.behaviour_mut().gossipsub.unsubscribe(&topic).unwrap();
|
||||
}
|
||||
}
|
||||
|
||||
// Handle any queued outbound messages
|
||||
msg = broadcast_recv.recv() => {
|
||||
let (genesis, msg) = msg.expect("broadcast_recv closed. are we shutting down?");
|
||||
let (genesis, msg): (Option<[u8; 32]>, Vec<u8>) =
|
||||
msg.expect("broadcast_recv closed. are we shutting down?");
|
||||
let set = genesis.and_then(|genesis| set_for_genesis.get(&genesis).copied());
|
||||
broadcast_raw(
|
||||
&mut swarm,
|
||||
&mut time_of_last_p2p_message,
|
||||
genesis,
|
||||
set,
|
||||
msg,
|
||||
);
|
||||
}
|
||||
@@ -369,28 +418,17 @@ impl LibP2p {
|
||||
// Handle new incoming messages
|
||||
event = swarm.next() => {
|
||||
match event {
|
||||
#[cfg(debug_assertions)]
|
||||
Some(SwarmEvent::Behaviour(BehaviorEvent::Mdns(
|
||||
libp2p::mdns::Event::Discovered(list),
|
||||
))) => {
|
||||
for (peer, mut addr) in list {
|
||||
// Check the port is as expected to prevent trying to peer with Substrate nodes
|
||||
if addr.pop() == Some(libp2p::multiaddr::Protocol::Tcp(PORT)) {
|
||||
log::info!("found peer via mdns");
|
||||
swarm.behaviour_mut().gossipsub.add_explicit_peer(&peer);
|
||||
}
|
||||
}
|
||||
Some(SwarmEvent::Dialing { connection_id, .. }) => {
|
||||
log::debug!("dialing to peer in connection ID {}", &connection_id);
|
||||
}
|
||||
#[cfg(debug_assertions)]
|
||||
Some(SwarmEvent::Behaviour(BehaviorEvent::Mdns(
|
||||
libp2p::mdns::Event::Expired(list),
|
||||
))) => {
|
||||
for (peer, _) in list {
|
||||
log::info!("disconnecting peer due to mdns");
|
||||
swarm.behaviour_mut().gossipsub.remove_explicit_peer(&peer);
|
||||
Some(SwarmEvent::ConnectionEstablished { peer_id, connection_id, .. }) => {
|
||||
log::debug!(
|
||||
"connection established to peer {} in connection ID {}",
|
||||
&peer_id,
|
||||
&connection_id,
|
||||
);
|
||||
swarm.behaviour_mut().gossipsub.add_explicit_peer(&peer_id)
|
||||
}
|
||||
}
|
||||
|
||||
Some(SwarmEvent::Behaviour(BehaviorEvent::Gossipsub(
|
||||
GsEvent::Message { propagation_source, message, .. },
|
||||
))) => {
|
||||
@@ -434,21 +472,21 @@ impl LibP2p {
|
||||
impl P2p for LibP2p {
|
||||
type Id = PeerId;
|
||||
|
||||
async fn subscribe(&self, genesis: [u8; 32]) {
|
||||
async fn subscribe(&self, set: ValidatorSet, genesis: [u8; 32]) {
|
||||
self
|
||||
.subscribe
|
||||
.lock()
|
||||
.await
|
||||
.send((true, genesis))
|
||||
.send((true, set, genesis))
|
||||
.expect("subscribe_send closed. are we shutting down?");
|
||||
}
|
||||
|
||||
async fn unsubscribe(&self, genesis: [u8; 32]) {
|
||||
async fn unsubscribe(&self, set: ValidatorSet, genesis: [u8; 32]) {
|
||||
self
|
||||
.subscribe
|
||||
.lock()
|
||||
.await
|
||||
.send((false, genesis))
|
||||
.send((false, set, genesis))
|
||||
.expect("subscribe_send closed. are we shutting down?");
|
||||
}
|
||||
|
||||
@@ -552,7 +590,7 @@ pub async fn handle_p2p_task<D: Db, P: P2p>(
|
||||
channels.write().await.insert(genesis, send);
|
||||
|
||||
// Subscribe to the topic for this tributary
|
||||
p2p.subscribe(genesis).await;
|
||||
p2p.subscribe(tributary.spec.set(), genesis).await;
|
||||
|
||||
// Per-Tributary P2P message handler
|
||||
tokio::spawn({
|
||||
@@ -675,8 +713,8 @@ pub async fn handle_p2p_task<D: Db, P: P2p>(
|
||||
}
|
||||
TributaryEvent::TributaryRetired(set) => {
|
||||
if let Some(genesis) = set_to_genesis.remove(&set) {
|
||||
p2p.unsubscribe(set, genesis).await;
|
||||
channels.write().await.remove(&genesis);
|
||||
p2p.unsubscribe(genesis).await;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -4,7 +4,7 @@ use std::{
|
||||
collections::{VecDeque, HashSet, HashMap},
|
||||
};
|
||||
|
||||
use serai_client::primitives::NetworkId;
|
||||
use serai_client::{primitives::NetworkId, validator_sets::primitives::ValidatorSet};
|
||||
|
||||
use processor_messages::CoordinatorMessage;
|
||||
|
||||
@@ -62,8 +62,8 @@ impl LocalP2p {
|
||||
impl P2p for LocalP2p {
|
||||
type Id = usize;
|
||||
|
||||
async fn subscribe(&self, _genesis: [u8; 32]) {}
|
||||
async fn unsubscribe(&self, _genesis: [u8; 32]) {}
|
||||
async fn subscribe(&self, _set: ValidatorSet, _genesis: [u8; 32]) {}
|
||||
async fn unsubscribe(&self, _set: ValidatorSet, _genesis: [u8; 32]) {}
|
||||
|
||||
async fn send_raw(&self, to: Self::Id, _genesis: Option<[u8; 32]>, msg: Vec<u8>) {
|
||||
self.1.write().await.1[to].push_back((self.0, msg));
|
||||
|
||||
Reference in New Issue
Block a user