From 96c397caa0932431f8bef910365033c7e083d68e Mon Sep 17 00:00:00 2001 From: Luke Parker Date: Fri, 13 Oct 2023 19:47:58 -0400 Subject: [PATCH] Add content-based deduplication to the tests' shimmed P2P The tests have recently had their timing stilted, causing failures. The tests are... fine. They're fragile, as obvious, yet they're logical. The simplest fix is to unstilt their timing rather to make them non-fragile. The recent change, which presumably caused said stilting, was the the rebroadcasting added. This de-duplication prevents most of the impact of rebroadcasting. While there's still the async task, and the lock acquisition on attempt to rebroadcast, this hopefully is enough. --- coordinator/src/tests/mod.rs | 23 +++++++++++++++++------ coordinator/src/tests/tributary/sync.rs | 2 +- 2 files changed, 18 insertions(+), 7 deletions(-) diff --git a/coordinator/src/tests/mod.rs b/coordinator/src/tests/mod.rs index 6aaa907a..9a01e122 100644 --- a/coordinator/src/tests/mod.rs +++ b/coordinator/src/tests/mod.rs @@ -1,7 +1,7 @@ use core::fmt::Debug; use std::{ sync::Arc, - collections::{VecDeque, HashMap}, + collections::{VecDeque, HashSet, HashMap}, }; use serai_client::primitives::NetworkId; @@ -45,11 +45,11 @@ impl Processors for MemProcessors { #[allow(clippy::type_complexity)] #[derive(Clone, Debug)] -pub struct LocalP2p(usize, pub Arc)>>>>); +pub struct LocalP2p(usize, pub Arc>, Vec)>>)>>); impl LocalP2p { pub fn new(validators: usize) -> Vec { - let shared = Arc::new(RwLock::new(vec![VecDeque::new(); validators])); + let shared = Arc::new(RwLock::new((HashSet::new(), vec![VecDeque::new(); validators]))); let mut res = vec![]; for i in 0 .. validators { res.push(LocalP2p(i, shared.clone())); @@ -63,11 +63,22 @@ impl P2p for LocalP2p { type Id = usize; async fn send_raw(&self, to: Self::Id, msg: Vec) { - self.1.write().await[to].push_back((self.0, msg)); + self.1.write().await.1[to].push_back((self.0, msg)); } async fn broadcast_raw(&self, msg: Vec) { - for (i, msg_queue) in self.1.write().await.iter_mut().enumerate() { + // Content-based deduplication + let mut lock = self.1.write().await; + { + let already_sent = &mut lock.0; + if already_sent.contains(&msg) { + return; + } + already_sent.insert(msg.clone()); + } + let queues = &mut lock.1; + + for (i, msg_queue) in queues.iter_mut().enumerate() { if i == self.0 { continue; } @@ -78,7 +89,7 @@ impl P2p for LocalP2p { async fn receive_raw(&self) -> (Self::Id, Vec) { // This is a cursed way to implement an async read from a Vec loop { - if let Some(res) = self.1.write().await[self.0].pop_front() { + if let Some(res) = self.1.write().await.1[self.0].pop_front() { return res; } tokio::time::sleep(std::time::Duration::from_millis(100)).await; diff --git a/coordinator/src/tests/tributary/sync.rs b/coordinator/src/tests/tributary/sync.rs index 3dfc3757..bf13a735 100644 --- a/coordinator/src/tests/tributary/sync.rs +++ b/coordinator/src/tests/tributary/sync.rs @@ -70,7 +70,7 @@ async fn sync_test() { // Now that we've confirmed the other tributaries formed a net without issue, drop the syncer's // pending P2P messages - syncer_p2p.1.write().await.last_mut().unwrap().clear(); + syncer_p2p.1.write().await.1.last_mut().unwrap().clear(); // Have it join the net let syncer_key = Ristretto::generator() * *syncer_key;