* Route validators for any active set through sc-authority-discovery. Additionally adds an RPC route to retrieve their P2P addresses (sketched below, after this list).
* Have the coordinator get peers from Substrate.
* Have the RPC return one address, not up to 3. Prevents the coordinator from believing it has 3 peers when it has one.
* Add missing feature to serai-client.
* Correct network argument in serai-client for the p2p_validators call.
* Add a test in serai-client to check DHT population, with a much quicker failure than the coordinator tests.
* Update to latest Substrate. Removes distinguishing BABE/AuthorityDiscovery keys, which causes sc_authority_discovery to populate as desired.
* Update to a properly tagged Substrate commit.
* Add all dialed-to peers to GossipSub.
* cargo fmt
* Reduce common code in serai-coordinator-tests with a more involved new_test.
* Use a recursive async function to spawn `n` DockerTests with the necessary networking configuration.
* Merge UNIQUE_ID and ONE_AT_A_TIME.
* Tidy up the new recursive code in tests/coordinator.
* Use a Mutex in CONTEXT to let it be set multiple times.
* Make complementary edits to full-stack tests.
* Augment coordinator P2p connection logs.
* Drop lock acquisitions before recursing.
* Better scope lock acquisitions in full-stack, preventing a deadlock.
* Ensure OUTER_OPS is reset across the test boundary.
* Add a cargo deny allowance for the dockertest fork.
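As a rough illustration of the new RPC route (a sketch for this writeup only: Serai::new, the p2p_validators signature, and its return type are all assumed from the commit messages above, not taken from the actual serai-client API):

// Hypothetical sketch: the constructor, method name, and return type below are
// assumptions based on the commit messages, not verified serai-client API.
use serai_client::{Serai, primitives::NetworkId};

async fn print_p2p_validators() -> Result<(), Box<dyn std::error::Error>> {
  let serai = Serai::new("ws://localhost:9944").await?;
  // Per the commits above, this returns a single P2P address per validator for
  // the given network, preventing a caller from believing one peer is three
  let addresses = serai.p2p_validators(NetworkId::Bitcoin).await?;
  println!("{addresses:?}");
  Ok(())
}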
use core::fmt::Debug;

use std::{
  sync::Arc,
  collections::{VecDeque, HashSet, HashMap},
};

use serai_client::{primitives::NetworkId, validator_sets::primitives::ValidatorSet};

use processor_messages::CoordinatorMessage;

use async_trait::async_trait;

use tokio::sync::RwLock;

use crate::{
  processors::{Message, Processors},
  TributaryP2p, P2pMessageKind, P2p,
};

pub mod tributary;
/// An in-memory Processors implementation which queues the messages sent to each network.
#[derive(Clone)]
pub struct MemProcessors(pub Arc<RwLock<HashMap<NetworkId, VecDeque<CoordinatorMessage>>>>);
impl MemProcessors {
  #[allow(clippy::new_without_default)]
  pub fn new() -> MemProcessors {
    MemProcessors(Arc::new(RwLock::new(HashMap::new())))
  }
}

#[async_trait::async_trait]
impl Processors for MemProcessors {
  async fn send(&self, network: NetworkId, msg: impl Send + Into<CoordinatorMessage>) {
    let mut processors = self.0.write().await;
    let processor = processors.entry(network).or_insert_with(VecDeque::new);
    processor.push_back(msg.into());
  }
  async fn recv(&self, _: NetworkId) -> Message {
    todo!()
  }
  async fn ack(&self, _: Message) {
    todo!()
  }
}

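// Illustrative sketch, not part of the original file: MemProcessors just queues
// messages per network, so a caller can send one and then inspect the queue
// directly. The message is taken as a parameter since CoordinatorMessage's
// variants aren't shown here; constructing one is left to the caller.
#[cfg(test)]
async fn mem_processors_sketch(example_message: CoordinatorMessage) {
  let processors = MemProcessors::new();
  processors.send(NetworkId::Bitcoin, example_message).await;
  let queues = processors.0.read().await;
  assert_eq!(queues[&NetworkId::Bitcoin].len(), 1);
}
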
/// A P2P instance for a single validator, backed by in-memory queues shared across every
/// LocalP2p returned by `new`, for tests which run multiple validators in one process.
#[allow(clippy::type_complexity)]
#[derive(Clone, Debug)]
pub struct LocalP2p(usize, pub Arc<RwLock<(HashSet<Vec<u8>>, Vec<VecDeque<(usize, Vec<u8>)>>)>>);

impl LocalP2p {
  pub fn new(validators: usize) -> Vec<LocalP2p> {
    let shared = Arc::new(RwLock::new((HashSet::new(), vec![VecDeque::new(); validators])));
    let mut res = vec![];
    for i in 0 .. validators {
      res.push(LocalP2p(i, shared.clone()));
    }
    res
  }
}

#[async_trait]
impl P2p for LocalP2p {
  type Id = usize;

  async fn subscribe(&self, _set: ValidatorSet, _genesis: [u8; 32]) {}
  async fn unsubscribe(&self, _set: ValidatorSet, _genesis: [u8; 32]) {}

  async fn send_raw(&self, to: Self::Id, _genesis: Option<[u8; 32]>, msg: Vec<u8>) {
    self.1.write().await.1[to].push_back((self.0, msg));
  }

  async fn broadcast_raw(&self, _genesis: Option<[u8; 32]>, msg: Vec<u8>) {
    // Content-based deduplication: any given sequence of bytes is only broadcast once
    let mut lock = self.1.write().await;
    {
      let already_sent = &mut lock.0;
      if already_sent.contains(&msg) {
        return;
      }
      already_sent.insert(msg.clone());
    }
    let queues = &mut lock.1;

    for (i, msg_queue) in queues.iter_mut().enumerate() {
      // Don't queue the message for the sender
      if i == self.0 {
        continue;
      }
      msg_queue.push_back((self.0, msg.clone()));
    }
  }

  async fn receive_raw(&self) -> (Self::Id, Vec<u8>) {
    // This is a cursed way to implement an async read from a Vec, polling every 100ms
    loop {
      if let Some(res) = self.1.write().await.1[self.0].pop_front() {
        return res;
      }
      tokio::time::sleep(std::time::Duration::from_millis(100)).await;
    }
  }
}

#[async_trait]
impl TributaryP2p for LocalP2p {
  async fn broadcast(&self, genesis: [u8; 32], msg: Vec<u8>) {
    <Self as P2p>::broadcast(self, P2pMessageKind::Tributary(genesis), msg).await
  }
}
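
// An illustrative round-trip sketch, added for this writeup rather than taken
// from the repository. It exercises only the methods defined above, and assumes
// tokio's test macro (the `macros` feature) is available as a dev-dependency.
#[cfg(test)]
mod local_p2p_sketch {
  use super::*;

  #[tokio::test]
  async fn local_p2p_round_trip() {
    let p2ps = LocalP2p::new(3);

    // A direct send is only visible to its recipient
    p2ps[0].send_raw(1, None, b"direct".to_vec()).await;
    assert_eq!(p2ps[1].receive_raw().await, (0, b"direct".to_vec()));

    // A broadcast reaches every peer other than the sender
    p2ps[0].broadcast_raw(None, b"flood".to_vec()).await;
    assert_eq!(p2ps[1].receive_raw().await, (0, b"flood".to_vec()));
    assert_eq!(p2ps[2].receive_raw().await, (0, b"flood".to_vec()));

    // Re-broadcasting the same bytes is dropped by the content-based
    // deduplication, leaving every queue empty
    p2ps[0].broadcast_raw(None, b"flood".to_vec()).await;
    assert!(p2ps[0].1.read().await.1.iter().all(|queue| queue.is_empty()));
  }
}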