diff --git a/coordinator/src/main.rs b/coordinator/src/main.rs
index c5988709..b04b5388 100644
--- a/coordinator/src/main.rs
+++ b/coordinator/src/main.rs
@@ -42,206 +42,222 @@ lazy_static::lazy_static! {
   static ref NEW_TRIBUTARIES: RwLock<VecDeque<TributarySpec>> = RwLock::new(VecDeque::new());
 }
 
-async fn run<D: Db, Pro: Processor, P: P2p>(
+// Specifies a new tributary
+async fn create_new_tributary<D: Db>(db: D, spec: TributarySpec) {
+  // Save it to the database
+  MainDb(db).add_active_tributary(&spec);
+  // Add it to the queue
+  // If we reboot before this is read from the queue, the fact it was saved to the database
+  // means it'll be handled on reboot
+  NEW_TRIBUTARIES.write().await.push_back(spec);
+}
+
+pub struct ActiveTributary<D: Db, P: P2p> {
+  spec: TributarySpec,
+  tributary: Arc<RwLock<Tributary<D, Transaction, P>>>,
+}
+
+// Adds a tributary into the specified HashMap
+async fn add_tributary<D: Db, P: P2p>(
+  db: D,
+  key: Zeroizing<<Ristretto as Ciphersuite>::F>,
+  p2p: P,
+  tributaries: &mut HashMap<[u8; 32], ActiveTributary<D, P>>,
+  spec: TributarySpec,
+) {
+  let tributary = Tributary::<_, Transaction, _>::new(
+    // TODO: Use a db on a distinct volume
+    db,
+    spec.genesis(),
+    spec.start_time(),
+    key,
+    spec.validators(),
+    p2p,
+  )
+  .await
+  .unwrap();
+
+  tributaries.insert(
+    tributary.genesis(),
+    ActiveTributary { spec, tributary: Arc::new(RwLock::new(tributary)) },
+  );
+}
+
+pub async fn scan_substrate<D: Db, Pro: Processor>(
+  db: D,
+  key: Zeroizing<<Ristretto as Ciphersuite>::F>,
+  mut processor: Pro,
+  serai: Serai,
+) {
+  let mut db = substrate::SubstrateDb::new(db);
+  let mut last_substrate_block = db.last_block();
+
+  loop {
+    match substrate::handle_new_blocks(
+      &mut db,
+      &key,
+      create_new_tributary,
+      &mut processor,
+      &serai,
+      &mut last_substrate_block,
+    )
+    .await
+    {
+      // TODO: Should this use a notification system for new blocks?
+      // Right now it's sleeping for half the block time.
+      Ok(()) => sleep(Duration::from_secs(3)).await,
+      Err(e) => {
+        log::error!("couldn't communicate with serai node: {e}");
+        sleep(Duration::from_secs(5)).await;
+      }
+    }
+  }
+}
+
+#[allow(clippy::type_complexity)]
+pub async fn scan_tributaries<D: Db, Pro: Processor, P: P2p>(
   raw_db: D,
   key: Zeroizing<<Ristretto as Ciphersuite>::F>,
   p2p: P,
   mut processor: Pro,
-  serai: Serai,
+  tributaries: Arc<RwLock<HashMap<[u8; 32], ActiveTributary<D, P>>>>,
 ) {
-  let add_new_tributary = |db, spec: TributarySpec| async {
-    // Save it to the database
-    MainDb(db).add_active_tributary(&spec);
-    // Add it to the queue
-    // If we reboot before this is read from the queue, the fact it was saved to the database
-    // means it'll be handled on reboot
-    NEW_TRIBUTARIES.write().await.push_back(spec);
-  };
-
-  // Handle new Substrate blocks
-  {
-    let mut substrate_db = substrate::SubstrateDb::new(raw_db.clone());
-    let mut last_substrate_block = substrate_db.last_block();
-
-    let key = key.clone();
-    let mut processor = processor.clone();
-    tokio::spawn(async move {
-      loop {
-        match substrate::handle_new_blocks(
-          &mut substrate_db,
-          &key,
-          add_new_tributary,
-          &mut processor,
-          &serai,
-          &mut last_substrate_block,
+  // Handle new Tributary blocks
+  let mut tributary_db = tributary::TributaryDb::new(raw_db.clone());
+  loop {
+    // The following handle_new_blocks function may take an arbitrary amount of time
+    // Accordingly, it may take a long time to acquire a write lock on the tributaries table
+    // By definition of NEW_TRIBUTARIES, we allow tributaries to be added almost immediately,
+    // meaning the Substrate scanner won't become blocked on this
+    {
+      let mut new_tributaries = NEW_TRIBUTARIES.write().await;
+      while let Some(spec) = new_tributaries.pop_front() {
+        add_tributary(
+          raw_db.clone(),
+          key.clone(),
+          p2p.clone(),
+          // This is a short-lived write acquisition, which is why it should be fine
+          &mut *tributaries.write().await,
+          spec,
         )
-        .await
-        {
-          // TODO: Should this use a notification system for new blocks?
-          // Right now it's sleeping for half the block time.
-          Ok(()) => sleep(Duration::from_secs(3)).await,
-          Err(e) => {
-            log::error!("couldn't communicate with serai node: {e}");
-            sleep(Duration::from_secs(5)).await;
-          }
-        }
+        .await;
       }
-    });
-  }
-
-  // Handle the Tributaries
-  {
-    struct ActiveTributary<D: Db, P: P2p> {
-      spec: TributarySpec,
-      tributary: Arc<RwLock<Tributary<D, Transaction, P>>>,
     }
 
-    // Arc so this can be shared between the Tributary scanner task and the P2P task
-    // Write locks on this may take a while to acquire
-    let tributaries = Arc::new(RwLock::new(HashMap::<[u8; 32], ActiveTributary<D, P>>::new()));
-
-    async fn add_tributary<D: Db, P: P2p>(
-      db: D,
-      key: Zeroizing<<Ristretto as Ciphersuite>::F>,
-      p2p: P,
-      tributaries: &mut HashMap<[u8; 32], ActiveTributary<D, P>>,
-      spec: TributarySpec,
-    ) {
-      let tributary = Tributary::<_, Transaction, _>::new(
-        // TODO: Use a db on a distinct volume
-        db,
-        spec.genesis(),
-        spec.start_time(),
-        key,
-        spec.validators(),
-        p2p,
-      )
-      .await
-      .unwrap();
-
-      tributaries.insert(
-        tributary.genesis(),
-        ActiveTributary { spec, tributary: Arc::new(RwLock::new(tributary)) },
-      );
-    }
-
-    // Reload active tributaries from the database
-    // TODO: Can MainDb take a borrow?
-    for spec in MainDb(raw_db.clone()).active_tributaries().1 {
-      add_tributary(
-        raw_db.clone(),
-        key.clone(),
-        p2p.clone(),
-        &mut *tributaries.write().await,
+    // TODO: Instead of holding this lock long term, should this take in Arc RwLock and
+    // re-acquire read locks?
+    for ActiveTributary { spec, tributary } in tributaries.read().await.values() {
+      tributary::scanner::handle_new_blocks::<_, _, P>(
+        &mut tributary_db,
+        &key,
+        &mut processor,
         spec,
+        &*tributary.read().await,
       )
       .await;
     }
 
-    // Handle new Tributary blocks
-    let mut tributary_db = tributary::TributaryDb::new(raw_db.clone());
-    {
-      let tributaries = tributaries.clone();
-      let p2p = p2p.clone();
-      tokio::spawn(async move {
-        loop {
-          // The following handle_new_blocks function may take an arbitrary amount of time
-          // Accordingly, it may take a long time to acquire a write lock on the tributaries table
-          // By definition of NEW_TRIBUTARIES, we allow tributaries to be added almost immediately,
-          // meaning the Substrate scanner won't become blocked on this
-          {
-            let mut new_tributaries = NEW_TRIBUTARIES.write().await;
-            while let Some(spec) = new_tributaries.pop_front() {
-              add_tributary(
-                raw_db.clone(),
-                key.clone(),
-                p2p.clone(),
-                // This is a short-lived write acquisition, which is why it should be fine
-                &mut *tributaries.write().await,
-                spec,
-              )
-              .await;
-            }
-          }
+    // Sleep for half the block time
+    // TODO: Should we define a notification system for when a new block occurs?
+    sleep(Duration::from_secs((Tributary::<D, Transaction, P>::block_time() / 2).into())).await;
+  }
+}
 
-          // TODO: Instead of holding this lock long term, should this take in Arc RwLock and
-          // re-acquire read locks?
-          for ActiveTributary { spec, tributary } in tributaries.read().await.values() {
-            tributary::scanner::handle_new_blocks::<_, _, P>(
-              &mut tributary_db,
-              &key,
-              &mut processor,
-              spec,
-              &*tributary.read().await,
-            )
-            .await;
-          }
+#[allow(clippy::type_complexity)]
+pub async fn heartbeat_tributaries<D: Db, P: P2p>(
+  p2p: P,
+  tributaries: Arc<RwLock<HashMap<[u8; 32], ActiveTributary<D, P>>>>,
+) {
+  let ten_blocks_of_time =
+    Duration::from_secs((Tributary::<D, Transaction, P>::block_time() * 10).into());
 
-          // Sleep for half the block time
-          // TODO: Should we define a notification system for when a new block occurs?
-          sleep(Duration::from_secs((Tributary::<D, Transaction, P>::block_time() / 2).into()))
-            .await;
-        }
-      });
+  loop {
+    for ActiveTributary { spec: _, tributary } in tributaries.read().await.values() {
+      let tributary = tributary.read().await;
+      let tip = tributary.tip().await;
+      let block_time = SystemTime::UNIX_EPOCH +
+        Duration::from_secs(tributary.time_of_block(&tip).await.unwrap_or(0));
+
+      // Only trigger syncing if the block is more than a minute behind
+      if SystemTime::now() > (block_time + Duration::from_secs(60)) {
+        log::warn!("last known tributary block was over a minute ago");
+        P2p::broadcast(&p2p, P2pMessageKind::Heartbeat(tributary.genesis()), tip.to_vec()).await;
+      }
     }
 
-    // If a Tributary has fallen behind, trigger syncing
-    {
-      let p2p = p2p.clone();
-      let tributaries = tributaries.clone();
-      tokio::spawn(async move {
-        let ten_blocks_of_time =
-          Duration::from_secs((Tributary::<D, Transaction, P>::block_time() * 10).into());
+    // Only check once every 10 blocks of time
+    sleep(ten_blocks_of_time).await;
+  }
+}
 
-        loop {
-          for ActiveTributary { spec: _, tributary } in tributaries.read().await.values() {
-            let tributary = tributary.read().await;
-            let tip = tributary.tip();
-            let block_time = SystemTime::UNIX_EPOCH +
-              Duration::from_secs(tributary.time_of_block(&tip).unwrap_or(0));
+#[allow(clippy::type_complexity)]
+pub async fn handle_p2p<D: Db, P: P2p>(
+  p2p: P,
+  tributaries: Arc<RwLock<HashMap<[u8; 32], ActiveTributary<D, P>>>>,
+) {
+  loop {
+    let msg = p2p.receive().await;
+    match msg.kind {
+      P2pMessageKind::Tributary(genesis) => {
+        let tributaries_read = tributaries.read().await;
+        let Some(tributary) = tributaries_read.get(&genesis) else {
+          log::debug!("received p2p message for unknown network");
+          continue;
+        };
 
-            // Only trigger syncing if the block is more than a minute behind
-            if SystemTime::now() > (block_time + Duration::from_secs(60)) {
-              log::warn!("last known tributary block was over a minute ago");
-              P2p::broadcast(&p2p, P2pMessageKind::Heartbeat(tributary.genesis()), tip.to_vec())
-                .await;
-            }
-          }
-
-          // Only check once every 10 blocks of time
-          sleep(ten_blocks_of_time).await;
+        // This is misleading being read, as it will mutate the Tributary, yet there's
+        // greater efficiency when it is read
+        // The safety of it is also justified by Tributary::handle_message's documentation
+        if tributary.tributary.read().await.handle_message(&msg.msg).await {
+          P2p::broadcast(&p2p, msg.kind, msg.msg).await;
         }
-      });
-    }
+      }
 
-    // Handle P2P messages
-    {
-      tokio::spawn(async move {
-        loop {
-          let msg = p2p.receive().await;
-          match msg.kind {
-            P2pMessageKind::Tributary(genesis) => {
-              let tributaries_read = tributaries.read().await;
-              let Some(tributary) = tributaries_read.get(&genesis) else {
-                log::debug!("received p2p message for unknown network");
-                continue;
-              };
-
-              // This is misleading being read, as it will mutate the Tributary, yet there's
-              // greater efficiency when it is read
-              // The safety of it is also justified by Tributary::handle_message's documentation
-              if tributary.tributary.read().await.handle_message(&msg.msg).await {
-                P2p::broadcast(&p2p, msg.kind, msg.msg).await;
-              }
-            }
-
-            // TODO: Respond with the missing block, if there are any
-            P2pMessageKind::Heartbeat(genesis) => todo!(),
-          }
-        }
-      });
+      // TODO: Respond with the missing block, if there are any
+      P2pMessageKind::Heartbeat(genesis) => todo!(),
     }
   }
+}
+
+pub async fn run<D: Db, Pro: Processor, P: P2p>(
+  raw_db: D,
+  key: Zeroizing<<Ristretto as Ciphersuite>::F>,
+  p2p: P,
+  processor: Pro,
+  serai: Serai,
+) {
+  // Handle new Substrate blocks
+  tokio::spawn(scan_substrate(raw_db.clone(), key.clone(), processor.clone(), serai.clone()));
+
+  // Handle the Tributaries
+
+  // Arc so this can be shared between the Tributary scanner task and the P2P task
+  // Write locks on this may take a while to acquire
+  let tributaries = Arc::new(RwLock::new(HashMap::<[u8; 32], ActiveTributary<D, P>>::new()));
+
+  // Reload active tributaries from the database
+  // TODO: Can MainDb take a borrow?
+  for spec in MainDb(raw_db.clone()).active_tributaries().1 {
+    add_tributary(raw_db.clone(), key.clone(), p2p.clone(), &mut *tributaries.write().await, spec)
+      .await;
+  }
+
+  // Handle new blocks for each Tributary
+  tokio::spawn(scan_tributaries(
+    raw_db.clone(),
+    key.clone(),
+    p2p.clone(),
+    processor,
+    tributaries.clone(),
+  ));
+
+  // Spawn the heartbeat task, which will trigger syncing if there hasn't been a Tributary block
+  // in a while (presumably because we're behind)
+  tokio::spawn(heartbeat_tributaries(p2p.clone(), tributaries.clone()));
+
+  // Handle P2P messages
+  // TODO: We also have to broadcast blocks once they're added
+  tokio::spawn(handle_p2p(p2p, tributaries));
 
   loop {
     // Handle all messages from processors
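The decomposition above rests on a handoff pattern: `create_new_tributary` (invoked by the Substrate scanner) only touches the `NEW_TRIBUTARIES` queue, while `scan_tributaries` drains that queue with a short-lived write acquisition before starting its long-running scan, so the scanner never waits on the long-held `tributaries` lock. A minimal, self-contained sketch of that pattern follows; the `Spec` type, keys, and timings are illustrative stand-ins, not coordinator code.

```rust
use std::{sync::Arc, collections::{VecDeque, HashMap}};
use tokio::{sync::RwLock, time::{sleep, Duration}};

#[derive(Clone, Debug)]
struct Spec(u8);

#[tokio::main]
async fn main() {
  let queue = Arc::new(RwLock::new(VecDeque::<Spec>::new()));
  let active = Arc::new(RwLock::new(HashMap::<u8, Spec>::new()));

  // Producer: like create_new_tributary, it only touches the queue,
  // so it never waits on the consumer's long-held `active` lock
  let producer = {
    let queue = queue.clone();
    tokio::spawn(async move {
      for i in 0 .. 5 {
        queue.write().await.push_back(Spec(i));
        sleep(Duration::from_millis(10)).await;
      }
    })
  };

  // Consumer: like scan_tributaries, it drains the queue with a
  // short-lived write acquisition before its long-running work
  let consumer = tokio::spawn({
    let (queue, active) = (queue.clone(), active.clone());
    async move {
      for _ in 0 .. 10 {
        {
          let mut queue = queue.write().await;
          while let Some(spec) = queue.pop_front() {
            active.write().await.insert(spec.0, spec);
          }
        }
        // The long-running scan would go here
        sleep(Duration::from_millis(20)).await;
      }
    }
  });

  let _ = tokio::join!(producer, consumer);
  assert_eq!(active.read().await.len(), 5);
}
```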
diff --git a/coordinator/src/p2p.rs b/coordinator/src/p2p.rs
index 3d78d9d4..985860d3 100644
--- a/coordinator/src/p2p.rs
+++ b/coordinator/src/p2p.rs
@@ -1,12 +1,10 @@
 use core::fmt::Debug;
-use std::{
-  sync::{Arc, RwLock},
-  io::Read,
-  collections::VecDeque,
-};
+use std::{sync::Arc, io::Read, collections::VecDeque};
 
 use async_trait::async_trait;
 
+use tokio::sync::RwLock;
+
 pub use tributary::P2p as TributaryP2p;
 
 #[derive(Clone, Copy, PartialEq, Eq, Hash, Debug)]
@@ -94,6 +92,7 @@ pub trait P2p: Send + Sync + Clone + Debug + TributaryP2p {
   }
 }
 
+// TODO: Move this to tests
 #[allow(clippy::type_complexity)]
 #[derive(Clone, Debug)]
 pub struct LocalP2p(usize, Arc<RwLock<Vec<VecDeque<(usize, Vec<u8>)>>>>);
@@ -114,11 +113,11 @@ impl P2p for LocalP2p {
   type Id = usize;
 
   async fn send_raw(&self, to: Self::Id, msg: Vec<u8>) {
-    self.1.write().unwrap()[to].push_back((self.0, msg));
+    self.1.write().await[to].push_back((self.0, msg));
   }
 
   async fn broadcast_raw(&self, msg: Vec<u8>) {
-    for (i, msg_queue) in self.1.write().unwrap().iter_mut().enumerate() {
+    for (i, msg_queue) in self.1.write().await.iter_mut().enumerate() {
       if i == self.0 {
         continue;
       }
@@ -129,7 +128,7 @@ impl P2p for LocalP2p {
   async fn receive_raw(&self) -> (Self::Id, Vec<u8>) {
     // This is a cursed way to implement an async read from a Vec
    loop {
-      if let Some(res) = self.1.write().unwrap()[self.0].pop_front() {
+      if let Some(res) = self.1.write().await[self.0].pop_front() {
        return res;
      }
      tokio::time::sleep(std::time::Duration::from_millis(100)).await;
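For context on the `std::sync::RwLock` → `tokio::sync::RwLock` swap running through these files: a std guard held across an `.await` makes the future `!Send`, so `tokio::spawn` rejects it (and blocking a runtime thread on a std lock risks deadlock), whereas tokio's async guard may be held across await points. A sketch of the distinction, under those assumptions rather than taken from this PR:

```rust
use std::sync::Arc;
use tokio::{sync::RwLock, time::{sleep, Duration}};

async fn held_across_await(lock: Arc<RwLock<Vec<u8>>>) {
  let guard = lock.read().await;
  // With std::sync::RwLock, holding a guard across this await point would
  // make the future !Send and tokio::spawn would reject it; tokio's guard
  // is designed to be held across awaits
  sleep(Duration::from_millis(10)).await;
  println!("len = {}", guard.len());
}

#[tokio::main]
async fn main() {
  let lock = Arc::new(RwLock::new(vec![1, 2, 3]));
  tokio::spawn(held_across_await(lock)).await.unwrap();
}
```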
diff --git a/coordinator/src/processor.rs b/coordinator/src/processor.rs
index e288a2e9..c27f2681 100644
--- a/coordinator/src/processor.rs
+++ b/coordinator/src/processor.rs
@@ -1,7 +1,6 @@
-use std::{
-  sync::{Arc, RwLock},
-  collections::VecDeque,
-};
+use std::{sync::Arc, collections::VecDeque};
+
+use tokio::sync::RwLock;
 
 use processor_messages::{ProcessorMessage, CoordinatorMessage};
 
@@ -31,7 +30,7 @@ impl MemProcessor {
 #[async_trait::async_trait]
 impl Processor for MemProcessor {
   async fn send(&mut self, msg: CoordinatorMessage) {
-    self.0.write().unwrap().push_back(msg)
+    self.0.write().await.push_back(msg)
   }
 
   async fn recv(&mut self) -> Message {
     todo!()
diff --git a/coordinator/src/tests/tributary/chain.rs b/coordinator/src/tests/tributary/chain.rs
index 429351e9..ca0ccda6 100644
--- a/coordinator/src/tests/tributary/chain.rs
+++ b/coordinator/src/tests/tributary/chain.rs
@@ -118,20 +118,20 @@ pub async fn wait_for_tx_inclusion(
   hash: [u8; 32],
 ) -> [u8; 32] {
   loop {
-    let tip = tributary.tip();
+    let tip = tributary.tip().await;
     if tip == last_checked {
       sleep(Duration::from_secs(1)).await;
       continue;
     }
 
-    let mut queue = vec![tributary.block(&tip).unwrap()];
+    let mut queue = vec![tributary.block(&tip).await.unwrap()];
     let mut block = None;
     while {
       let parent = queue.last().unwrap().parent();
       if parent == tributary.genesis() {
         false
       } else {
-        block = Some(tributary.block(&parent).unwrap());
+        block = Some(tributary.block(&parent).await.unwrap());
         block.as_ref().unwrap().hash() != last_checked
       }
     } {
@@ -176,7 +176,7 @@ async fn tributary_test() {
       }
     }
 
-    let tip = tributaries[0].1.tip();
+    let tip = tributaries[0].1.tip().await;
     if tip != last_block {
       last_block = tip;
       blocks += 1;
@@ -202,11 +202,18 @@ async fn tributary_test() {
     }
   }
 
+  // handle_message informed the Tendermint machine, yet it still has to process it
+  // Sleep for a second accordingly
+  // TODO: Is there a better way to handle this?
+  sleep(Duration::from_secs(1)).await;
+
   // All tributaries should agree on the tip
   let mut final_block = None;
   for (_, tributary) in tributaries {
-    final_block = final_block.or_else(|| Some(tributary.tip()));
-    if tributary.tip() != final_block.unwrap() {
+    if final_block.is_none() {
+      final_block = Some(tributary.tip().await);
+    }
+    if tributary.tip().await != final_block.unwrap() {
       panic!("tributary had different tip");
     }
   }
diff --git a/coordinator/src/tests/tributary/dkg.rs b/coordinator/src/tests/tributary/dkg.rs
index 2cf07999..fe000b2d 100644
--- a/coordinator/src/tests/tributary/dkg.rs
+++ b/coordinator/src/tests/tributary/dkg.rs
@@ -47,7 +47,7 @@ async fn dkg_test() {
     txs.push(tx);
   }
 
-  let block_before_tx = tributaries[0].1.tip();
+  let block_before_tx = tributaries[0].1.tip().await;
 
   // Publish all commitments but one
   for (i, tx) in txs.iter().enumerate().skip(1) {
@@ -87,10 +87,10 @@ async fn dkg_test() {
 
   // Instantiate a scanner and verify it has nothing to report
   let (mut scanner_db, mut processor) = new_processor(&keys[0], &spec, &tributaries[0].1).await;
-  assert!(processor.0.read().unwrap().is_empty());
+  assert!(processor.0.read().await.is_empty());
 
   // Publish the last commitment
-  let block_before_tx = tributaries[0].1.tip();
+  let block_before_tx = tributaries[0].1.tip().await;
   assert!(tributaries[0].1.add_transaction(txs[0].clone()).await);
   wait_for_tx_inclusion(&tributaries[0].1, block_before_tx, txs[0].hash()).await;
   sleep(Duration::from_secs(Tributary::<MemDb, Transaction, LocalP2p>::block_time().into())).await;
@@ -98,7 +98,7 @@ async fn dkg_test() {
   // Verify the scanner emits a KeyGen::Commitments message
   handle_new_blocks(&mut scanner_db, &keys[0], &mut processor, &spec, &tributaries[0].1).await;
   {
-    let mut msgs = processor.0.write().unwrap();
+    let mut msgs = processor.0.write().await;
     assert_eq!(msgs.pop_front().unwrap(), expected_commitments);
     assert!(msgs.is_empty());
   }
@@ -106,7 +106,7 @@ async fn dkg_test() {
   // Verify all keys exhibit this scanner behavior
   for (i, key) in keys.iter().enumerate() {
     let (_, processor) = new_processor(key, &spec, &tributaries[i].1).await;
-    let mut msgs = processor.0.write().unwrap();
+    let mut msgs = processor.0.write().await;
     assert_eq!(msgs.pop_front().unwrap(), expected_commitments);
     assert!(msgs.is_empty());
   }
@@ -128,7 +128,7 @@ async fn dkg_test() {
     txs.push(tx);
   }
 
-  let block_before_tx = tributaries[0].1.tip();
+  let block_before_tx = tributaries[0].1.tip().await;
   for (i, tx) in txs.iter().enumerate().skip(1) {
     assert!(tributaries[i].1.add_transaction(tx.clone()).await);
   }
@@ -138,10 +138,10 @@ async fn dkg_test() {
 
   // With just 4 sets of shares, nothing should happen yet
   handle_new_blocks(&mut scanner_db, &keys[0], &mut processor, &spec, &tributaries[0].1).await;
-  assert!(processor.0.write().unwrap().is_empty());
+  assert!(processor.0.write().await.is_empty());
 
   // Publish the final set of shares
-  let block_before_tx = tributaries[0].1.tip();
+  let block_before_tx = tributaries[0].1.tip().await;
   assert!(tributaries[0].1.add_transaction(txs[0].clone()).await);
   wait_for_tx_inclusion(&tributaries[0].1, block_before_tx, txs[0].hash()).await;
   sleep(Duration::from_secs(Tributary::<MemDb, Transaction, LocalP2p>::block_time().into())).await;
@@ -170,7 +170,7 @@ async fn dkg_test() {
   // Any scanner which has handled the prior blocks should only emit the new event
   handle_new_blocks(&mut scanner_db, &keys[0], &mut processor, &spec, &tributaries[0].1).await;
   {
-    let mut msgs = processor.0.write().unwrap();
+    let mut msgs = processor.0.write().await;
     assert_eq!(msgs.pop_front().unwrap(), shares_for(0));
     assert!(msgs.is_empty());
   }
@@ -178,7 +178,7 @@ async fn dkg_test() {
   // Yet new scanners should emit all events
   for (i, key) in keys.iter().enumerate() {
     let (_, processor) = new_processor(key, &spec, &tributaries[i].1).await;
-    let mut msgs = processor.0.write().unwrap();
+    let mut msgs = processor.0.write().await;
     assert_eq!(msgs.pop_front().unwrap(), expected_commitments);
     assert_eq!(msgs.pop_front().unwrap(), shares_for(i));
     assert!(msgs.is_empty());
diff --git a/coordinator/src/tests/tributary/tx.rs b/coordinator/src/tests/tributary/tx.rs
index ae3afd0d..059781f2 100644
--- a/coordinator/src/tests/tributary/tx.rs
+++ b/coordinator/src/tests/tributary/tx.rs
@@ -34,7 +34,7 @@ async fn tx_test() {
   OsRng.fill_bytes(&mut commitments);
 
   // Create the TX with a null signature so we can get its sig hash
-  let block_before_tx = tributaries[sender].1.tip();
+  let block_before_tx = tributaries[sender].1.tip().await;
   let mut tx =
     Transaction::DkgCommitments(attempt, commitments.clone(), Transaction::empty_signed());
   tx.sign(&mut OsRng, spec.genesis(), &key, 0);
@@ -46,7 +46,7 @@ async fn tx_test() {
 
   // All tributaries should have acknowledged this transaction in a block
   for (_, tributary) in tributaries {
-    let block = tributary.block(&included_in).unwrap();
+    let block = tributary.block(&included_in).await.unwrap();
     assert_eq!(block.transactions, vec![tx.clone()]);
   }
 }
diff --git a/coordinator/src/tributary/scanner.rs b/coordinator/src/tributary/scanner.rs
index e9f5fdbf..affdd689 100644
--- a/coordinator/src/tributary/scanner.rs
+++ b/coordinator/src/tributary/scanner.rs
@@ -300,14 +300,14 @@ pub async fn handle_new_blocks<D: Db, Pro: Processor, P: P2p>(
   let last_block = db.last_block(tributary.genesis());
 
   // Check if there's been a new Tributary block
-  let latest = tributary.tip();
+  let latest = tributary.tip().await;
   if latest == last_block {
     return;
   }
 
   let mut blocks = VecDeque::new();
   // This is a new block, as per the prior if check
-  blocks.push_back(tributary.block(&latest).unwrap());
+  blocks.push_back(tributary.block(&latest).await.unwrap());
 
   let mut block = None;
   while {
@@ -317,7 +317,7 @@ pub async fn handle_new_blocks<D: Db, Pro: Processor, P: P2p>(
       false
     } else {
       // Get this block
-      block = Some(tributary.block(&parent).unwrap());
+      block = Some(tributary.block(&parent).await.unwrap());
       // If it's the last block we've scanned, it's the end. Else, push it
       block.as_ref().unwrap().hash() != last_block
     }
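`wait_for_tx_inclusion` and the scanner's `handle_new_blocks` share a traversal: walk parent hashes backwards from the tip until reaching the last block already processed, then handle the collected blocks oldest-first. A toy version of that walk, with `u64` hashes and a `HashMap` parent index standing in for the real `Tributary` types:

```rust
use std::collections::{HashMap, VecDeque};

fn blocks_since(parent_of: &HashMap<u64, u64>, tip: u64, last: u64, genesis: u64) -> Vec<u64> {
  let mut queue = VecDeque::new();
  let mut cursor = tip;
  // Walk backwards until we hit the last block we processed (or genesis)
  while cursor != last && cursor != genesis {
    queue.push_front(cursor);
    cursor = parent_of[&cursor];
  }
  // Oldest-first, ready to be handled in order
  queue.into_iter().collect()
}

fn main() {
  // genesis(0) <- 1 <- 2 <- 3 (tip); block 1 was the last one scanned
  let parent_of = HashMap::from([(1, 0), (2, 1), (3, 2)]);
  assert_eq!(blocks_since(&parent_of, 3, 1, 0), vec![2, 3]);
}
```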
diff --git a/coordinator/tributary/Cargo.toml b/coordinator/tributary/Cargo.toml
index f529abd1..824288c3 100644
--- a/coordinator/tributary/Cargo.toml
+++ b/coordinator/tributary/Cargo.toml
@@ -34,8 +34,5 @@ tendermint = { package = "tendermint-machine", path = "./tendermint" }
 
 tokio = { version = "1", features = ["macros", "sync", "time", "rt"] }
 
-[dev-dependencies]
-zeroize = "^1.5"
-
 [features]
 tests = []
diff --git a/coordinator/tributary/src/lib.rs b/coordinator/tributary/src/lib.rs
index f38a59a5..931cdf56 100644
--- a/coordinator/tributary/src/lib.rs
+++ b/coordinator/tributary/src/lib.rs
@@ -1,8 +1,5 @@
 use core::fmt::Debug;
-use std::{
-  sync::{Arc, RwLock},
-  io,
-};
+use std::{sync::Arc, io};
 
 use async_trait::async_trait;
 
@@ -20,7 +17,7 @@ use ::tendermint::{
 
 use serai_db::Db;
 
-use tokio::sync::RwLock as AsyncRwLock;
+use tokio::sync::RwLock;
 
 mod merkle;
 pub(crate) use merkle::*;
@@ -90,7 +87,7 @@ pub struct Tributary<D: Db, T: Transaction, P: P2p> {
   network: TendermintNetwork<D, T, P>,
 
   synced_block: SyncedBlockSender<TendermintNetwork<D, T, P>>,
-  messages: Arc<AsyncRwLock<MessageSender<TendermintNetwork<D, T, P>>>>,
+  messages: Arc<RwLock<MessageSender<TendermintNetwork<D, T, P>>>>,
 }
 
 impl<D: Db, T: Transaction, P: P2p> Tributary<D, T, P> {
@@ -124,7 +121,7 @@ impl<D: Db, T: Transaction, P: P2p> Tributary<D, T, P> {
       TendermintMachine::new(network.clone(), block_number, start_time, proposal).await;
     tokio::task::spawn(machine.run());
 
-    Some(Self { genesis, network, synced_block, messages: Arc::new(AsyncRwLock::new(messages)) })
+    Some(Self { genesis, network, synced_block, messages: Arc::new(RwLock::new(messages)) })
   }
 
   pub fn block_time() -> u32 {
@@ -134,34 +131,37 @@ impl<D: Db, T: Transaction, P: P2p> Tributary<D, T, P> {
   pub fn genesis(&self) -> [u8; 32] {
     self.genesis
   }
-  pub fn block_number(&self) -> u32 {
-    self.network.blockchain.read().unwrap().block_number()
+
+  // TODO: block, time_of_block, and commit shouldn't require acquiring the read lock
+  // These values can be safely read directly from the database since they're static
+  pub async fn block_number(&self) -> u32 {
+    self.network.blockchain.read().await.block_number()
   }
-  pub fn tip(&self) -> [u8; 32] {
-    self.network.blockchain.read().unwrap().tip()
+  pub async fn tip(&self) -> [u8; 32] {
+    self.network.blockchain.read().await.tip()
   }
-  pub fn block(&self, hash: &[u8; 32]) -> Option<Block<T>> {
-    self.network.blockchain.read().unwrap().block(hash)
+  pub async fn block(&self, hash: &[u8; 32]) -> Option<Block<T>> {
+    self.network.blockchain.read().await.block(hash)
   }
-  pub fn time_of_block(&self, hash: &[u8; 32]) -> Option<u64> {
+  pub async fn time_of_block(&self, hash: &[u8; 32]) -> Option<u64> {
     self
       .network
       .blockchain
       .read()
-      .unwrap()
+      .await
       .commit(hash)
       .map(|commit| Commit::<Validators>::decode(&mut commit.as_ref()).unwrap().end_time)
   }
-  pub fn commit(&self, hash: &[u8; 32]) -> Option<Vec<u8>> {
-    self.network.blockchain.read().unwrap().commit(hash)
+  pub async fn commit(&self, hash: &[u8; 32]) -> Option<Vec<u8>> {
+    self.network.blockchain.read().await.commit(hash)
  }
 
-  pub fn provide_transaction(&self, tx: T) -> Result<(), ProvidedError> {
-    self.network.blockchain.write().unwrap().provide_transaction(tx)
+  pub async fn provide_transaction(&self, tx: T) -> Result<(), ProvidedError> {
+    self.network.blockchain.write().await.provide_transaction(tx)
   }
 
-  pub fn next_nonce(&self, signer: <Ristretto as Ciphersuite>::G) -> Option<u32> {
-    self.network.blockchain.read().unwrap().next_nonce(signer)
+  pub async fn next_nonce(&self, signer: <Ristretto as Ciphersuite>::G) -> Option<u32> {
+    self.network.blockchain.read().await.next_nonce(signer)
   }
 
   // Returns if the transaction was valid.
@@ -170,7 +170,7 @@ impl<D: Db, T: Transaction, P: P2p> Tributary<D, T, P> {
   pub async fn add_transaction(&self, tx: T) -> bool {
     let mut to_broadcast = vec![TRANSACTION_MESSAGE];
     tx.write(&mut to_broadcast).unwrap();
-    let res = self.network.blockchain.write().unwrap().add_transaction(true, tx);
+    let res = self.network.blockchain.write().await.add_transaction(true, tx);
     if res {
       self.network.p2p.broadcast(self.genesis, to_broadcast).await;
     }
@@ -181,7 +181,7 @@ impl<D: Db, T: Transaction, P: P2p> Tributary<D, T, P> {
   // TODO: Since we have a static validator set, we should only need the tail commit?
   pub async fn sync_block(&mut self, block: Block<T>, commit: Vec<u8>) -> bool {
     let (tip, block_number) = {
-      let blockchain = self.network.blockchain.read().unwrap();
+      let blockchain = self.network.blockchain.read().await;
       (blockchain.tip(), blockchain.block_number())
     };
 
@@ -215,7 +215,7 @@ impl<D: Db, T: Transaction, P: P2p> Tributary<D, T, P> {
 
     // TODO: Sync mempools with fellow peers
     // Can we just rebroadcast transactions not included for at least two blocks?
-    let res = self.network.blockchain.write().unwrap().add_transaction(false, tx);
+    let res = self.network.blockchain.write().await.add_transaction(false, tx);
     log::debug!("received transaction message. valid new transaction: {res}");
     res
   }
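One idiom worth calling out in `sync_block` above: `(tip, block_number)` is snapshotted inside a block so the read guard drops before any later `.await`, keeping writers unblocked. A minimal sketch of that guard-scoping idiom; the `State` type and values are hypothetical:

```rust
use std::sync::Arc;
use tokio::{sync::RwLock, time::{sleep, Duration}};

#[derive(Clone, Copy)]
struct State { tip: u64, block_number: u64 }

async fn snapshot_then_work(state: Arc<RwLock<State>>) {
  let (tip, block_number) = {
    let state = state.read().await;
    (state.tip, state.block_number)
  }; // guard dropped here, before the await below
  sleep(Duration::from_millis(5)).await;
  println!("tip {tip} at height {block_number}");
}

#[tokio::main]
async fn main() {
  let state = Arc::new(RwLock::new(State { tip: 7, block_number: 1 }));
  snapshot_then_work(state).await;
}
```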
diff --git a/coordinator/tributary/src/tendermint.rs b/coordinator/tributary/src/tendermint.rs
index 2754d1a2..49c79178 100644
--- a/coordinator/tributary/src/tendermint.rs
+++ b/coordinator/tributary/src/tendermint.rs
@@ -1,8 +1,5 @@
 use core::ops::Deref;
-use std::{
-  sync::{Arc, RwLock},
-  collections::HashMap,
-};
+use std::{sync::Arc, collections::HashMap};
 
 use async_trait::async_trait;
 
@@ -34,7 +31,10 @@ use tendermint::{
   },
 };
 
-use tokio::time::{Duration, sleep};
+use tokio::{
+  sync::RwLock,
+  time::{Duration, sleep},
+};
 
 use crate::{
   TENDERMINT_MESSAGE, ReadWrite, Transaction, BlockHeader, Block, BlockError, Blockchain, P2p,
@@ -273,7 +273,7 @@ impl<D: Db, T: Transaction, P: P2p> Network for TendermintNetwork<D, T, P> {
   async fn validate(&mut self, block: &Self::Block) -> Result<(), TendermintBlockError> {
     let block =
       Block::read::<&[u8]>(&mut block.0.as_ref()).map_err(|_| TendermintBlockError::Fatal)?;
-    self.blockchain.read().unwrap().verify_block(&block).map_err(|e| match e {
+    self.blockchain.read().await.verify_block(&block).map_err(|e| match e {
       BlockError::NonLocalProvided(_) => TendermintBlockError::Temporal,
       _ => TendermintBlockError::Fatal,
     })
@@ -301,7 +301,7 @@ impl<D: Db, T: Transaction, P: P2p> Network for TendermintNetwork<D, T, P> {
     };
 
     loop {
-      let block_res = self.blockchain.write().unwrap().add_block(&block, commit.encode());
+      let block_res = self.blockchain.write().await.add_block(&block, commit.encode());
       match block_res {
         Ok(()) => break,
         Err(BlockError::NonLocalProvided(hash)) => {
@@ -316,6 +316,6 @@ impl<D: Db, T: Transaction, P: P2p> Network for TendermintNetwork<D, T, P> {
       }
     }
 
-    Some(TendermintBlock(self.blockchain.write().unwrap().build_block().serialize()))
+    Some(TendermintBlock(self.blockchain.write().await.build_block().serialize()))
   }
 }
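The retry loop shown last treats `BlockError::NonLocalProvided` as temporal rather than fatal — the elided arm presumably waits for the missing provided transaction to arrive before retrying the commit. A generic sketch of that retry-until-dependency-arrives shape, with stand-in error and helper types (not the `Blockchain` API):

```rust
use tokio::time::{sleep, Duration};

#[derive(Debug)]
enum AddError { MissingDependency(u64), Fatal }

async fn add_with_retries(mut attempts_left: u8) -> Result<(), AddError> {
  loop {
    match try_add(attempts_left).await {
      Ok(()) => return Ok(()),
      // A temporal error: the dependency may simply not have arrived yet
      Err(AddError::MissingDependency(dep)) => {
        println!("waiting on dependency {dep}");
        attempts_left = attempts_left.checked_sub(1).ok_or(AddError::Fatal)?;
        sleep(Duration::from_millis(50)).await;
      }
      Err(e) => return Err(e),
    }
  }
}

// Stand-in: succeeds once "the dependency arrives" (after a couple of attempts)
async fn try_add(attempts_left: u8) -> Result<(), AddError> {
  if attempts_left > 1 { Err(AddError::MissingDependency(42)) } else { Ok(()) }
}

#[tokio::main]
async fn main() {
  add_with_retries(3).await.unwrap();
}
```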