Add a LibP2P instantiation to coordinator

It's largely unoptimized, and not yet exclusive to validators, yet has basic
sanity (using message content for ID instead of sender + index).

Fixes bugs as found. Notably, we used a time in milliseconds where the
Tributary expected  seconds.

Also has Tributary::new jump to the presumed round number. This reduces slashes
when starting new chains (whose times will be before the current time) and was
the only way I was able to observe successful confirmations given current
surrounding infrastructure.
This commit is contained in:
Luke Parker
2023-08-08 15:12:47 -04:00
parent 0dd8aed134
commit f6f945e747
18 changed files with 376 additions and 50 deletions

View File

@@ -1,3 +1,4 @@
use core::fmt::Debug;
use std::{
sync::Arc,
collections::{VecDeque, HashMap},
@@ -7,9 +8,14 @@ use serai_client::primitives::NetworkId;
use processor_messages::CoordinatorMessage;
use async_trait::async_trait;
use tokio::sync::RwLock;
use crate::processors::{Message, Processors};
use crate::{
processors::{Message, Processors},
TributaryP2p, P2pMessageKind, P2p,
};
pub mod tributary;
@@ -36,3 +42,53 @@ impl Processors for MemProcessors {
todo!()
}
}
#[allow(clippy::type_complexity)]
#[derive(Clone, Debug)]
pub struct LocalP2p(usize, pub Arc<RwLock<Vec<VecDeque<(usize, Vec<u8>)>>>>);
impl LocalP2p {
pub fn new(validators: usize) -> Vec<LocalP2p> {
let shared = Arc::new(RwLock::new(vec![VecDeque::new(); validators]));
let mut res = vec![];
for i in 0 .. validators {
res.push(LocalP2p(i, shared.clone()));
}
res
}
}
#[async_trait]
impl P2p for LocalP2p {
type Id = usize;
async fn send_raw(&self, to: Self::Id, msg: Vec<u8>) {
self.1.write().await[to].push_back((self.0, msg));
}
async fn broadcast_raw(&self, msg: Vec<u8>) {
for (i, msg_queue) in self.1.write().await.iter_mut().enumerate() {
if i == self.0 {
continue;
}
msg_queue.push_back((self.0, msg.clone()));
}
}
async fn receive_raw(&self) -> (Self::Id, Vec<u8>) {
// This is a cursed way to implement an async read from a Vec
loop {
if let Some(res) = self.1.write().await[self.0].pop_front() {
return res;
}
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
}
}
}
#[async_trait]
impl TributaryP2p for LocalP2p {
async fn broadcast(&self, genesis: [u8; 32], msg: Vec<u8>) {
<Self as P2p>::broadcast(self, P2pMessageKind::Tributary(genesis), msg).await
}
}