diff --git a/coordinator/src/tests/mod.rs b/coordinator/src/tests/mod.rs index 6aaa907a..9a01e122 100644 --- a/coordinator/src/tests/mod.rs +++ b/coordinator/src/tests/mod.rs @@ -1,7 +1,7 @@ use core::fmt::Debug; use std::{ sync::Arc, - collections::{VecDeque, HashMap}, + collections::{VecDeque, HashSet, HashMap}, }; use serai_client::primitives::NetworkId; @@ -45,11 +45,11 @@ impl Processors for MemProcessors { #[allow(clippy::type_complexity)] #[derive(Clone, Debug)] -pub struct LocalP2p(usize, pub Arc)>>>>); +pub struct LocalP2p(usize, pub Arc>, Vec)>>)>>); impl LocalP2p { pub fn new(validators: usize) -> Vec { - let shared = Arc::new(RwLock::new(vec![VecDeque::new(); validators])); + let shared = Arc::new(RwLock::new((HashSet::new(), vec![VecDeque::new(); validators]))); let mut res = vec![]; for i in 0 .. validators { res.push(LocalP2p(i, shared.clone())); @@ -63,11 +63,22 @@ impl P2p for LocalP2p { type Id = usize; async fn send_raw(&self, to: Self::Id, msg: Vec) { - self.1.write().await[to].push_back((self.0, msg)); + self.1.write().await.1[to].push_back((self.0, msg)); } async fn broadcast_raw(&self, msg: Vec) { - for (i, msg_queue) in self.1.write().await.iter_mut().enumerate() { + // Content-based deduplication + let mut lock = self.1.write().await; + { + let already_sent = &mut lock.0; + if already_sent.contains(&msg) { + return; + } + already_sent.insert(msg.clone()); + } + let queues = &mut lock.1; + + for (i, msg_queue) in queues.iter_mut().enumerate() { if i == self.0 { continue; } @@ -78,7 +89,7 @@ impl P2p for LocalP2p { async fn receive_raw(&self) -> (Self::Id, Vec) { // This is a cursed way to implement an async read from a Vec loop { - if let Some(res) = self.1.write().await[self.0].pop_front() { + if let Some(res) = self.1.write().await.1[self.0].pop_front() { return res; } tokio::time::sleep(std::time::Duration::from_millis(100)).await; diff --git a/coordinator/src/tests/tributary/sync.rs b/coordinator/src/tests/tributary/sync.rs index 3dfc3757..bf13a735 100644 --- a/coordinator/src/tests/tributary/sync.rs +++ b/coordinator/src/tests/tributary/sync.rs @@ -70,7 +70,7 @@ async fn sync_test() { // Now that we've confirmed the other tributaries formed a net without issue, drop the syncer's // pending P2P messages - syncer_p2p.1.write().await.last_mut().unwrap().clear(); + syncer_p2p.1.write().await.1.last_mut().unwrap().clear(); // Have it join the net let syncer_key = Ristretto::generator() * *syncer_key;