From 193281e387bbddfd2093fc5cb52dc968e9180bb8 Mon Sep 17 00:00:00 2001 From: Luke Parker Date: Sat, 22 Oct 2022 01:43:07 -0400 Subject: [PATCH] Get the result of block importing --- substrate/consensus/Cargo.toml | 2 +- substrate/consensus/src/import_queue.rs | 66 ++++++++++++++++++++++--- substrate/tendermint/src/ext.rs | 2 +- substrate/tendermint/src/lib.rs | 4 +- substrate/tendermint/tests/ext.rs | 2 +- 5 files changed, 63 insertions(+), 13 deletions(-) diff --git a/substrate/consensus/Cargo.toml b/substrate/consensus/Cargo.toml index 756ac11d..0de507e4 100644 --- a/substrate/consensus/Cargo.toml +++ b/substrate/consensus/Cargo.toml @@ -17,7 +17,7 @@ async-trait = "0.1" log = "0.4" -tokio = "1" +tokio = { version = "1", features = ["sync", "rt"] } sp-core = { git = "https://github.com/serai-dex/substrate" } sp-application-crypto = { git = "https://github.com/serai-dex/substrate" } diff --git a/substrate/consensus/src/import_queue.rs b/substrate/consensus/src/import_queue.rs index 2be10e4c..9b4f24eb 100644 --- a/substrate/consensus/src/import_queue.rs +++ b/substrate/consensus/src/import_queue.rs @@ -13,7 +13,10 @@ use std::{ marker::PhantomData, + pin::Pin, sync::{Arc, RwLock}, + task::{Poll, Context}, + future::Future, time::Duration, collections::HashMap, }; @@ -22,7 +25,7 @@ use async_trait::async_trait; use log::warn; -use tokio::sync::RwLock as AsyncRwLock; +use tokio::{sync::RwLock as AsyncRwLock, runtime::Handle}; use sp_core::{Encode, Decode}; use sp_application_crypto::sr25519::Signature; @@ -45,6 +48,9 @@ use sc_consensus::{ BlockImport, JustificationImport, import_queue::IncomingBlock, + BlockImportStatus, + BlockImportError, + Link, BasicQueue, }; @@ -82,7 +88,7 @@ struct TendermintImport< providers: Arc, env: Arc>, - queue: Arc>>>>, + queue: Arc>>>>, } impl< @@ -375,6 +381,46 @@ where } } +// Custom helpers for ImportQueue in order to obtain the result of a block's importing +struct ValidateLink(Option<(B::Hash, bool)>); +impl Link for ValidateLink { + fn blocks_processed( + &mut self, + imported: usize, + count: usize, + results: Vec<( + Result::Number>, BlockImportError>, + B::Hash, + )>, + ) { + assert_eq!(imported, 1); + assert_eq!(count, 1); + self.0 = Some((results[0].1, results[0].0.is_ok())); + } +} + +struct ImportFuture<'a, B: Block, T: Send>(B::Hash, RwLock<&'a mut TendermintImportQueue>); +impl<'a, B: Block, T: Send> ImportFuture<'a, B, T> { + fn new(hash: B::Hash, queue: &'a mut TendermintImportQueue) -> ImportFuture { + ImportFuture(hash, RwLock::new(queue)) + } +} + +impl<'a, B: Block, T: Send> Future for ImportFuture<'a, B, T> { + type Output = bool; + + fn poll(self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll { + let mut link = ValidateLink(None); + self.1.write().unwrap().poll_actions(ctx, &mut link); + if let Some(res) = link.0 { + assert_eq!(res.0, self.0); + Poll::Ready(res.1) + } else { + Poll::Pending + } + } +} + #[async_trait] impl< B: Block, @@ -410,11 +456,11 @@ where todo!() } - fn validate(&mut self, block: &B) -> Result<(), BlockError> { + async fn validate(&mut self, block: &B) -> Result<(), BlockError> { let hash = block.hash(); let (header, body) = block.clone().deconstruct(); *self.importing_block.write().unwrap() = Some(hash); - self.queue.write().unwrap().as_mut().unwrap().import_blocks( + self.queue.write().await.as_mut().unwrap().import_blocks( BlockOrigin::NetworkBroadcast, vec![IncomingBlock { hash, @@ -430,8 +476,12 @@ where state: None, }], ); - todo!() - // self.queue.poll_actions + + if ImportFuture::new(hash, self.queue.write().await.as_mut().unwrap()).await { + Ok(()) + } else { + todo!() + } } async fn add_block(&mut self, block: B, commit: Commit) -> B { @@ -470,12 +520,12 @@ where providers, env: Arc::new(AsyncRwLock::new(env)), - queue: Arc::new(RwLock::new(None)), + queue: Arc::new(AsyncRwLock::new(None)), }; let boxed = Box::new(import.clone()); let queue = || BasicQueue::new(import.clone(), boxed.clone(), Some(boxed.clone()), spawner, registry); - *import.queue.write().unwrap() = Some(queue()); + *Handle::current().block_on(import.queue.write()) = Some(queue()); queue() } diff --git a/substrate/tendermint/src/ext.rs b/substrate/tendermint/src/ext.rs index e015eee0..96536e12 100644 --- a/substrate/tendermint/src/ext.rs +++ b/substrate/tendermint/src/ext.rs @@ -180,7 +180,7 @@ pub trait Network: Send + Sync { async fn slash(&mut self, validator: Self::ValidatorId); /// Validate a block. - fn validate(&mut self, block: &Self::Block) -> Result<(), BlockError>; + async fn validate(&mut self, block: &Self::Block) -> Result<(), BlockError>; /// Add a block, returning the proposal for the next one. It's possible a block, which was never /// validated or even failed validation, may be passed here if a supermajority of validators did /// consider it valid and created a commit for it. This deviates from the paper which will have a diff --git a/substrate/tendermint/src/lib.rs b/substrate/tendermint/src/lib.rs index b420f5dc..56dea0c3 100644 --- a/substrate/tendermint/src/lib.rs +++ b/substrate/tendermint/src/lib.rs @@ -436,7 +436,7 @@ impl TendermintMachine { // 22-33 if self.step == Step::Propose { // Delay error handling (triggering a slash) until after we vote. - let (valid, err) = match self.network.write().await.validate(block) { + let (valid, err) = match self.network.write().await.validate(block).await { Ok(_) => (true, Ok(None)), Err(BlockError::Temporal) => (false, Ok(None)), Err(BlockError::Fatal) => (false, Err(TendermintError::Malicious(proposer))), @@ -478,7 +478,7 @@ impl TendermintMachine { // being set, or only being set historically, means this has yet to be run if self.log.has_consensus(self.round, Data::Prevote(Some(block.id()))) { - match self.network.write().await.validate(block) { + match self.network.write().await.validate(block).await { Ok(_) => (), Err(BlockError::Temporal) => (), Err(BlockError::Fatal) => Err(TendermintError::Malicious(proposer))?, diff --git a/substrate/tendermint/tests/ext.rs b/substrate/tendermint/tests/ext.rs index 6122c939..9361d8da 100644 --- a/substrate/tendermint/tests/ext.rs +++ b/substrate/tendermint/tests/ext.rs @@ -109,7 +109,7 @@ impl Network for TestNetwork { todo!() } - fn validate(&mut self, block: &TestBlock) -> Result<(), BlockError> { + async fn validate(&mut self, block: &TestBlock) -> Result<(), BlockError> { block.valid }