diff --git a/Cargo.lock b/Cargo.lock index aaf0c4b1..7aa653d5 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1309,6 +1309,7 @@ dependencies = [ "blake2", "ciphersuite", "flexible-transcript", + "futures", "lazy_static", "log", "modular-frost", diff --git a/coordinator/Cargo.toml b/coordinator/Cargo.toml index a2620c21..548ba53c 100644 --- a/coordinator/Cargo.toml +++ b/coordinator/Cargo.toml @@ -42,4 +42,5 @@ log = "0.4" tokio = { version = "1", features = ["full"] } [dev-dependencies] +futures = "0.3" tributary = { package = "tributary-chain", path = "./tributary", features = ["tests"] } diff --git a/coordinator/src/main.rs b/coordinator/src/main.rs index 0b079681..8bad36dc 100644 --- a/coordinator/src/main.rs +++ b/coordinator/src/main.rs @@ -52,16 +52,19 @@ async fn run( serai: Serai, ) { let add_new_tributary = |db, spec: TributarySpec| async { + // Save it to the database MainDb(db).add_active_tributary(&spec); + // Add it to the queue + // If we reboot before this is read from the queue, the fact it was saved to the database + // means it'll be handled on reboot NEW_TRIBUTARIES.write().await.push_back(spec); }; + // Handle new Substrate blocks { let mut substrate_db = substrate::SubstrateDb::new(raw_db.clone()); let mut last_substrate_block = substrate_db.last_block(); - let p2p = p2p.clone(); - let key = key.clone(); let mut processor = processor.clone(); tokio::spawn(async move { @@ -70,7 +73,6 @@ async fn run( &mut substrate_db, &key, add_new_tributary, - &p2p, &mut processor, &serai, &mut last_substrate_block, @@ -87,15 +89,14 @@ async fn run( }); } + // Handle the Tributaries { struct ActiveTributary { spec: TributarySpec, tributary: Tributary, } + let tributaries = Arc::new(RwLock::new(HashMap::<[u8; 32], ActiveTributary>::new())); - let mut tributaries = HashMap::<[u8; 32], ActiveTributary>::new(); - - // TODO: Use a db on a distinct volume async fn add_tributary( db: D, key: Zeroizing<::F>, @@ -104,6 +105,7 @@ async fn run( spec: TributarySpec, ) { let tributary = Tributary::<_, Transaction, _>::new( + // TODO: Use a db on a distinct volume db, spec.genesis(), spec.start_time(), @@ -117,40 +119,85 @@ async fn run( tributaries.insert(tributary.genesis(), ActiveTributary { spec, tributary }); } + // Reload active tributaries from the database // TODO: Can MainDb take a borrow? for spec in MainDb(raw_db.clone()).active_tributaries().1 { - add_tributary(raw_db.clone(), key.clone(), p2p.clone(), &mut tributaries, spec).await; + add_tributary( + raw_db.clone(), + key.clone(), + p2p.clone(), + &mut *tributaries.write().await, + spec, + ) + .await; } + // Handle new Tributary blocks let mut tributary_db = tributary::TributaryDb::new(raw_db.clone()); - tokio::spawn(async move { - loop { - // The following handle_new_blocks function may take an arbitrary amount of time - // If registering a new tributary waited for a lock on the tributaries table, the substrate - // scanner may wait on a lock for an arbitrary amount of time - // By instead using the distinct NEW_TRIBUTARIES, there should be minimal - // competition/blocking - { - let mut new_tributaries = NEW_TRIBUTARIES.write().await; - while let Some(spec) = new_tributaries.pop_front() { - add_tributary(raw_db.clone(), key.clone(), p2p.clone(), &mut tributaries, spec).await; + { + let tributaries = tributaries.clone(); + let p2p = p2p.clone(); + tokio::spawn(async move { + loop { + // The following handle_new_blocks function may take an arbitrary amount of time + // If registering a new tributary waited for a lock on the tributaries table, the + // substrate scanner may wait on a lock for an arbitrary amount of time + // By instead using the distinct NEW_TRIBUTARIES, there should be minimal + // competition/blocking + { + let mut new_tributaries = NEW_TRIBUTARIES.write().await; + while let Some(spec) = new_tributaries.pop_front() { + add_tributary( + raw_db.clone(), + key.clone(), + p2p.clone(), + // This is a short-lived write acquisition, which is why it should be fine + &mut *tributaries.write().await, + spec, + ) + .await; + } + } + + // Unknown-length read acquisition. This would risk screwing over the P2P process EXCEPT + // they both use read locks. Accordingly, they can co-exist + for ActiveTributary { spec, tributary } in tributaries.read().await.values() { + tributary::scanner::handle_new_blocks::<_, _, P>( + &mut tributary_db, + &key, + &mut processor, + spec, + tributary, + ) + .await; + } + + sleep(Duration::from_secs(3)).await; + } + }); + } + + // Handle P2P messages + { + tokio::spawn(async move { + loop { + let msg = p2p.receive().await; + match msg.kind { + P2pMessageKind::Tributary(genesis) => { + let tributaries_read = tributaries.read().await; + let Some(tributary) = tributaries_read.get(&genesis) else { + log::debug!("received p2p message for unknown network"); + continue; + }; + + if tributary.tributary.handle_message(&msg.msg).await { + P2p::broadcast(&p2p, msg.kind, msg.msg).await; + } + } } } - - for ActiveTributary { spec, tributary } in tributaries.values() { - tributary::scanner::handle_new_blocks::<_, _, P>( - &mut tributary_db, - &key, - &mut processor, - spec, - tributary, - ) - .await; - } - - sleep(Duration::from_secs(3)).await; - } - }); + }); + } } loop { diff --git a/coordinator/src/p2p.rs b/coordinator/src/p2p.rs index 97c78827..85f1f909 100644 --- a/coordinator/src/p2p.rs +++ b/coordinator/src/p2p.rs @@ -1,6 +1,7 @@ use core::fmt::Debug; use std::{ sync::{Arc, RwLock}, + io::Read, collections::VecDeque, }; @@ -10,33 +11,81 @@ pub use tributary::P2p as TributaryP2p; #[derive(Clone, Copy, PartialEq, Eq, Hash, Debug)] pub enum P2pMessageKind { - Tributary, + Tributary([u8; 32]), } impl P2pMessageKind { - fn to_byte(self) -> u8 { + fn serialize(&self) -> Vec { match self { - P2pMessageKind::Tributary => 0, + P2pMessageKind::Tributary(genesis) => { + let mut res = vec![0]; + res.extend(genesis); + res + } } } - fn from_byte(byte: u8) -> Option { - match byte { - 0 => Some(P2pMessageKind::Tributary), + fn read(reader: &mut R) -> Option { + let mut kind = [0; 1]; + reader.read_exact(&mut kind).ok()?; + match kind[0] { + 0 => Some({ + let mut genesis = [0; 32]; + reader.read_exact(&mut genesis).ok()?; + P2pMessageKind::Tributary(genesis) + }), _ => None, } } } -// TODO -#[async_trait] -pub trait P2p: Send + Sync + Clone + Debug + TributaryP2p { - async fn broadcast(&self, kind: P2pMessageKind, msg: Vec); - async fn receive(&self) -> Option<(P2pMessageKind, Vec)>; +#[derive(Clone, Debug)] +pub struct Message { + pub sender: P::Id, + pub kind: P2pMessageKind, + pub msg: Vec, } +#[async_trait] +pub trait P2p: Send + Sync + Clone + Debug + TributaryP2p { + type Id: Send + Sync + Clone + Debug; + + async fn send_raw(&self, to: Self::Id, msg: Vec); + async fn broadcast_raw(&self, msg: Vec); + async fn receive_raw(&self) -> (Self::Id, Vec); + + async fn send(&self, to: Self::Id, kind: P2pMessageKind, msg: Vec) { + let mut actual_msg = kind.serialize(); + actual_msg.extend(msg); + self.send_raw(to, actual_msg).await; + } + async fn broadcast(&self, kind: P2pMessageKind, msg: Vec) { + let mut actual_msg = kind.serialize(); + actual_msg.extend(msg); + self.broadcast_raw(actual_msg).await; + } + async fn receive(&self) -> Message { + let (sender, kind, msg) = loop { + let (sender, msg) = self.receive_raw().await; + if msg.is_empty() { + log::error!("empty p2p message from {sender:?}"); + continue; + } + + let mut msg_ref = msg.as_ref(); + let Some(kind) = P2pMessageKind::read::<&[u8]>(&mut msg_ref) else { + log::error!("invalid p2p message kind from {sender:?}"); + continue; + }; + break (sender, kind, msg_ref.to_vec()); + }; + Message { sender, kind, msg } + } +} + +#[allow(clippy::type_complexity)] #[derive(Clone, Debug)] -pub struct LocalP2p(usize, Arc>>>>); +pub struct LocalP2p(usize, Arc)>>>>); impl LocalP2p { pub fn new(validators: usize) -> Vec { @@ -51,29 +100,35 @@ impl LocalP2p { #[async_trait] impl P2p for LocalP2p { - async fn broadcast(&self, kind: P2pMessageKind, mut msg: Vec) { - msg.insert(0, kind.to_byte()); + type Id = usize; + + async fn send_raw(&self, to: Self::Id, msg: Vec) { + self.1.write().unwrap()[to].push_back((self.0, msg)); + } + + async fn broadcast_raw(&self, msg: Vec) { for (i, msg_queue) in self.1.write().unwrap().iter_mut().enumerate() { if i == self.0 { continue; } - msg_queue.push_back(msg.clone()); + msg_queue.push_back((self.0, msg.clone())); } } - async fn receive(&self) -> Option<(P2pMessageKind, Vec)> { - let mut msg = self.1.write().unwrap()[self.0].pop_front()?; - if msg.is_empty() { - log::error!("empty p2p message"); - return None; + async fn receive_raw(&self) -> (Self::Id, Vec) { + // This is a cursed way to implement an async read from a Vec + loop { + if let Some(res) = self.1.write().unwrap()[self.0].pop_front() { + return res; + } + tokio::time::sleep(std::time::Duration::from_millis(100)).await; } - Some((P2pMessageKind::from_byte(msg.remove(0))?, msg)) } } #[async_trait] impl TributaryP2p for LocalP2p { - async fn broadcast(&self, msg: Vec) { - ::broadcast(self, P2pMessageKind::Tributary, msg).await + async fn broadcast(&self, genesis: [u8; 32], msg: Vec) { + ::broadcast(self, P2pMessageKind::Tributary(genesis), msg).await } } diff --git a/coordinator/src/substrate/mod.rs b/coordinator/src/substrate/mod.rs index fb9a2791..5ddf4c74 100644 --- a/coordinator/src/substrate/mod.rs +++ b/coordinator/src/substrate/mod.rs @@ -21,7 +21,7 @@ use serai_db::DbTxn; use processor_messages::{SubstrateContext, key_gen::KeyGenId, CoordinatorMessage}; -use crate::{Db, P2p, processor::Processor, tributary::TributarySpec}; +use crate::{Db, processor::Processor, tributary::TributarySpec}; mod db; pub use db::*; @@ -63,7 +63,6 @@ async fn handle_new_set< // We already have a unique event ID based on block, event index (where event index is // the one generated in this handle_block function) // We could use that on this end and the processor end? - // TODO: Should this be handled in the Tributary code? processor .send(CoordinatorMessage::KeyGen( processor_messages::key_gen::CoordinatorMessage::GenerateKey { @@ -212,12 +211,10 @@ async fn handle_block< Fut: Future, ANT: Clone + Fn(D, TributarySpec) -> Fut, Pro: Processor, - P: P2p, >( db: &mut SubstrateDb, key: &Zeroizing<::F>, add_new_tributary: ANT, - p2p: &P, processor: &mut Pro, serai: &Serai, block: Block, @@ -235,7 +232,6 @@ async fn handle_block< // stable) if !SubstrateDb::::handled_event(&db.0, hash, event_id) { if let ValidatorSetsEvent::NewSet { set } = new_set { - // TODO2: Use a DB on a dedicated volume handle_new_set(&db.0, key, add_new_tributary.clone(), processor, serai, &block, set) .await?; } else { @@ -283,12 +279,10 @@ pub async fn handle_new_blocks< Fut: Future, ANT: Clone + Fn(D, TributarySpec) -> Fut, Pro: Processor, - P: P2p, >( db: &mut SubstrateDb, key: &Zeroizing<::F>, add_new_tributary: ANT, - p2p: &P, processor: &mut Pro, serai: &Serai, last_block: &mut u64, @@ -306,7 +300,6 @@ pub async fn handle_new_blocks< db, key, add_new_tributary.clone(), - p2p, processor, serai, if b == latest_number { diff --git a/coordinator/src/tests/tributary/chain.rs b/coordinator/src/tests/tributary/chain.rs index f6fb19cf..e355d265 100644 --- a/coordinator/src/tests/tributary/chain.rs +++ b/coordinator/src/tests/tributary/chain.rs @@ -1,8 +1,8 @@ use std::time::{Duration, SystemTime}; use zeroize::Zeroizing; - use rand_core::{RngCore, CryptoRng, OsRng}; +use futures::{task::Poll, poll}; use ciphersuite::{ group::{ff::Field, GroupEncoding}, @@ -95,11 +95,12 @@ pub async fn run_tributaries( ) { loop { for (p2p, tributary) in tributaries.iter_mut() { - while let Some(msg) = p2p.receive().await { - match msg.0 { - P2pMessageKind::Tributary => { - if tributary.handle_message(&msg.1).await { - p2p.broadcast(msg.0, msg.1).await; + while let Poll::Ready(msg) = poll!(p2p.receive()) { + match msg.kind { + P2pMessageKind::Tributary(genesis) => { + assert_eq!(genesis, tributary.genesis()); + if tributary.handle_message(&msg.msg).await { + p2p.broadcast(msg.kind, msg.msg).await; } } } @@ -163,10 +164,11 @@ async fn tributary_test() { let timeout = SystemTime::now() + Duration::from_secs(65); while (blocks < 10) && (SystemTime::now().duration_since(timeout).is_err()) { for (p2p, tributary) in tributaries.iter_mut() { - while let Some(msg) = p2p.receive().await { - match msg.0 { - P2pMessageKind::Tributary => { - tributary.handle_message(&msg.1).await; + while let Poll::Ready(msg) = poll!(p2p.receive()) { + match msg.kind { + P2pMessageKind::Tributary(genesis) => { + assert_eq!(genesis, tributary.genesis()); + tributary.handle_message(&msg.msg).await; } } } @@ -187,10 +189,11 @@ async fn tributary_test() { // Handle all existing messages for (p2p, tributary) in tributaries.iter_mut() { - while let Some(msg) = p2p.receive().await { - match msg.0 { - P2pMessageKind::Tributary => { - tributary.handle_message(&msg.1).await; + while let Poll::Ready(msg) = poll!(p2p.receive()) { + match msg.kind { + P2pMessageKind::Tributary(genesis) => { + assert_eq!(genesis, tributary.genesis()); + tributary.handle_message(&msg.msg).await; } } } diff --git a/coordinator/src/tests/tributary/dkg.rs b/coordinator/src/tests/tributary/dkg.rs index 2cc3fd3c..2cf07999 100644 --- a/coordinator/src/tests/tributary/dkg.rs +++ b/coordinator/src/tests/tributary/dkg.rs @@ -30,7 +30,7 @@ async fn dkg_test() { let keys = new_keys(&mut OsRng); let spec = new_spec(&mut OsRng, &keys); - let mut tributaries = new_tributaries(&keys, &spec).await; + let tributaries = new_tributaries(&keys, &spec).await; // Run the tributaries in the background tokio::spawn(run_tributaries(tributaries.clone())); diff --git a/coordinator/src/tests/tributary/tx.rs b/coordinator/src/tests/tributary/tx.rs index 9c6ea156..ae3afd0d 100644 --- a/coordinator/src/tests/tributary/tx.rs +++ b/coordinator/src/tests/tributary/tx.rs @@ -19,7 +19,7 @@ async fn tx_test() { let keys = new_keys(&mut OsRng); let spec = new_spec(&mut OsRng, &keys); - let mut tributaries = new_tributaries(&keys, &spec).await; + let tributaries = new_tributaries(&keys, &spec).await; // Run the tributaries in the background tokio::spawn(run_tributaries(tributaries.clone())); diff --git a/coordinator/tributary/src/blockchain.rs b/coordinator/tributary/src/blockchain.rs index c08c6ff6..8038dcd1 100644 --- a/coordinator/tributary/src/blockchain.rs +++ b/coordinator/tributary/src/blockchain.rs @@ -84,10 +84,6 @@ impl Blockchain { res } - pub(crate) fn genesis(&self) -> [u8; 32] { - self.genesis - } - pub(crate) fn tip(&self) -> [u8; 32] { self.tip } diff --git a/coordinator/tributary/src/lib.rs b/coordinator/tributary/src/lib.rs index 7b2516d4..ae47fdd9 100644 --- a/coordinator/tributary/src/lib.rs +++ b/coordinator/tributary/src/lib.rs @@ -20,6 +20,8 @@ use ::tendermint::{ use serai_db::Db; +use tokio::sync::RwLock as AsyncRwLock; + mod merkle; pub(crate) use merkle::*; @@ -72,22 +74,23 @@ pub trait ReadWrite: Sized { #[async_trait] pub trait P2p: 'static + Send + Sync + Clone + Debug { - async fn broadcast(&self, msg: Vec); + async fn broadcast(&self, genesis: [u8; 32], msg: Vec); } #[async_trait] impl P2p for Arc

{ - async fn broadcast(&self, msg: Vec) { - (*self).broadcast(msg).await + async fn broadcast(&self, genesis: [u8; 32], msg: Vec) { + (*self).broadcast(genesis, msg).await } } #[derive(Clone)] pub struct Tributary { + genesis: [u8; 32], network: TendermintNetwork, synced_block: SyncedBlockSender>, - messages: MessageSender>, + messages: Arc>>>, } impl Tributary { @@ -121,7 +124,7 @@ impl Tributary { TendermintMachine::new(network.clone(), block_number, start_time, proposal).await; tokio::task::spawn(machine.run()); - Some(Self { network, synced_block, messages }) + Some(Self { genesis, network, synced_block, messages: Arc::new(AsyncRwLock::new(messages)) }) } pub fn block_time() -> u32 { @@ -129,7 +132,7 @@ impl Tributary { } pub fn genesis(&self) -> [u8; 32] { - self.network.blockchain.read().unwrap().genesis() + self.genesis } pub fn block_number(&self) -> u32 { self.network.blockchain.read().unwrap().block_number() @@ -153,12 +156,14 @@ impl Tributary { } // Returns if the transaction was valid. - pub async fn add_transaction(&mut self, tx: T) -> bool { + // Safe to be &self since the only meaningful usage of self is self.network.blockchain which + // successfully acquires its own write lock. + pub async fn add_transaction(&self, tx: T) -> bool { let mut to_broadcast = vec![TRANSACTION_MESSAGE]; tx.write(&mut to_broadcast).unwrap(); let res = self.network.blockchain.write().unwrap().add_transaction(true, tx); if res { - self.network.p2p.broadcast(to_broadcast).await; + self.network.p2p.broadcast(self.genesis, to_broadcast).await; } res } @@ -189,7 +194,9 @@ impl Tributary { } // Return true if the message should be rebroadcasted. - pub async fn handle_message(&mut self, msg: &[u8]) -> bool { + // Safe to be &self since the only usage of self is on self.network.blockchain and self.messages, + // both which successfully acquire their own write locks and don't rely on each other + pub async fn handle_message(&self, msg: &[u8]) -> bool { match msg.first() { Some(&TRANSACTION_MESSAGE) => { let Ok(tx) = T::read::<&[u8]>(&mut &msg[1 ..]) else { @@ -212,7 +219,7 @@ impl Tributary { return false; }; - self.messages.send(msg).await.unwrap(); + self.messages.write().await.send(msg).await.unwrap(); false } diff --git a/coordinator/tributary/src/tendermint.rs b/coordinator/tributary/src/tendermint.rs index 5145651a..dc97baec 100644 --- a/coordinator/tributary/src/tendermint.rs +++ b/coordinator/tributary/src/tendermint.rs @@ -256,7 +256,7 @@ impl Network for TendermintNetwork { async fn broadcast(&mut self, msg: SignedMessageFor) { let mut to_broadcast = vec![TENDERMINT_MESSAGE]; to_broadcast.extend(msg.encode()); - self.p2p.broadcast(to_broadcast).await + self.p2p.broadcast(self.genesis, to_broadcast).await } async fn slash(&mut self, validator: Self::ValidatorId) { // TODO: Handle this slash diff --git a/coordinator/tributary/tendermint/src/ext.rs b/coordinator/tributary/tendermint/src/ext.rs index 796015f0..1c922c1d 100644 --- a/coordinator/tributary/tendermint/src/ext.rs +++ b/coordinator/tributary/tendermint/src/ext.rs @@ -263,6 +263,7 @@ pub trait Network: Send + Sync { /// Trigger a slash for the validator in question who was definitively malicious. /// /// The exact process of triggering a slash is undefined and left to the network as a whole. + // TODO: We need to provide some evidence for this. async fn slash(&mut self, validator: Self::ValidatorId); /// Validate a block.