Get the result of block importing

This commit is contained in:
Luke Parker
2022-10-22 01:43:07 -04:00
parent 8ed0f1f1cf
commit 193281e387
5 changed files with 63 additions and 13 deletions

View File

@@ -17,7 +17,7 @@ async-trait = "0.1"
log = "0.4" log = "0.4"
tokio = "1" tokio = { version = "1", features = ["sync", "rt"] }
sp-core = { git = "https://github.com/serai-dex/substrate" } sp-core = { git = "https://github.com/serai-dex/substrate" }
sp-application-crypto = { git = "https://github.com/serai-dex/substrate" } sp-application-crypto = { git = "https://github.com/serai-dex/substrate" }

View File

@@ -13,7 +13,10 @@
use std::{ use std::{
marker::PhantomData, marker::PhantomData,
pin::Pin,
sync::{Arc, RwLock}, sync::{Arc, RwLock},
task::{Poll, Context},
future::Future,
time::Duration, time::Duration,
collections::HashMap, collections::HashMap,
}; };
@@ -22,7 +25,7 @@ use async_trait::async_trait;
use log::warn; use log::warn;
use tokio::sync::RwLock as AsyncRwLock; use tokio::{sync::RwLock as AsyncRwLock, runtime::Handle};
use sp_core::{Encode, Decode}; use sp_core::{Encode, Decode};
use sp_application_crypto::sr25519::Signature; use sp_application_crypto::sr25519::Signature;
@@ -45,6 +48,9 @@ use sc_consensus::{
BlockImport, BlockImport,
JustificationImport, JustificationImport,
import_queue::IncomingBlock, import_queue::IncomingBlock,
BlockImportStatus,
BlockImportError,
Link,
BasicQueue, BasicQueue,
}; };
@@ -82,7 +88,7 @@ struct TendermintImport<
providers: Arc<CIDP>, providers: Arc<CIDP>,
env: Arc<AsyncRwLock<E>>, env: Arc<AsyncRwLock<E>>,
queue: Arc<RwLock<Option<TendermintImportQueue<B, TransactionFor<C, B>>>>>, queue: Arc<AsyncRwLock<Option<TendermintImportQueue<B, TransactionFor<C, B>>>>>,
} }
impl< impl<
@@ -375,6 +381,46 @@ where
} }
} }
// Custom helpers for ImportQueue in order to obtain the result of a block's importing
struct ValidateLink<B: Block>(Option<(B::Hash, bool)>);
impl<B: Block> Link<B> for ValidateLink<B> {
fn blocks_processed(
&mut self,
imported: usize,
count: usize,
results: Vec<(
Result<BlockImportStatus<<B::Header as Header>::Number>, BlockImportError>,
B::Hash,
)>,
) {
assert_eq!(imported, 1);
assert_eq!(count, 1);
self.0 = Some((results[0].1, results[0].0.is_ok()));
}
}
struct ImportFuture<'a, B: Block, T: Send>(B::Hash, RwLock<&'a mut TendermintImportQueue<B, T>>);
impl<'a, B: Block, T: Send> ImportFuture<'a, B, T> {
fn new(hash: B::Hash, queue: &'a mut TendermintImportQueue<B, T>) -> ImportFuture<B, T> {
ImportFuture(hash, RwLock::new(queue))
}
}
impl<'a, B: Block, T: Send> Future for ImportFuture<'a, B, T> {
type Output = bool;
fn poll(self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Self::Output> {
let mut link = ValidateLink(None);
self.1.write().unwrap().poll_actions(ctx, &mut link);
if let Some(res) = link.0 {
assert_eq!(res.0, self.0);
Poll::Ready(res.1)
} else {
Poll::Pending
}
}
}
#[async_trait] #[async_trait]
impl< impl<
B: Block, B: Block,
@@ -410,11 +456,11 @@ where
todo!() todo!()
} }
fn validate(&mut self, block: &B) -> Result<(), BlockError> { async fn validate(&mut self, block: &B) -> Result<(), BlockError> {
let hash = block.hash(); let hash = block.hash();
let (header, body) = block.clone().deconstruct(); let (header, body) = block.clone().deconstruct();
*self.importing_block.write().unwrap() = Some(hash); *self.importing_block.write().unwrap() = Some(hash);
self.queue.write().unwrap().as_mut().unwrap().import_blocks( self.queue.write().await.as_mut().unwrap().import_blocks(
BlockOrigin::NetworkBroadcast, BlockOrigin::NetworkBroadcast,
vec![IncomingBlock { vec![IncomingBlock {
hash, hash,
@@ -430,8 +476,12 @@ where
state: None, state: None,
}], }],
); );
if ImportFuture::new(hash, self.queue.write().await.as_mut().unwrap()).await {
Ok(())
} else {
todo!() todo!()
// self.queue.poll_actions }
} }
async fn add_block(&mut self, block: B, commit: Commit<TendermintSigner>) -> B { async fn add_block(&mut self, block: B, commit: Commit<TendermintSigner>) -> B {
@@ -470,12 +520,12 @@ where
providers, providers,
env: Arc::new(AsyncRwLock::new(env)), env: Arc::new(AsyncRwLock::new(env)),
queue: Arc::new(RwLock::new(None)), queue: Arc::new(AsyncRwLock::new(None)),
}; };
let boxed = Box::new(import.clone()); let boxed = Box::new(import.clone());
let queue = let queue =
|| BasicQueue::new(import.clone(), boxed.clone(), Some(boxed.clone()), spawner, registry); || BasicQueue::new(import.clone(), boxed.clone(), Some(boxed.clone()), spawner, registry);
*import.queue.write().unwrap() = Some(queue()); *Handle::current().block_on(import.queue.write()) = Some(queue());
queue() queue()
} }

View File

@@ -180,7 +180,7 @@ pub trait Network: Send + Sync {
async fn slash(&mut self, validator: Self::ValidatorId); async fn slash(&mut self, validator: Self::ValidatorId);
/// Validate a block. /// Validate a block.
fn validate(&mut self, block: &Self::Block) -> Result<(), BlockError>; async fn validate(&mut self, block: &Self::Block) -> Result<(), BlockError>;
/// Add a block, returning the proposal for the next one. It's possible a block, which was never /// Add a block, returning the proposal for the next one. It's possible a block, which was never
/// validated or even failed validation, may be passed here if a supermajority of validators did /// validated or even failed validation, may be passed here if a supermajority of validators did
/// consider it valid and created a commit for it. This deviates from the paper which will have a /// consider it valid and created a commit for it. This deviates from the paper which will have a

View File

@@ -436,7 +436,7 @@ impl<N: Network + 'static> TendermintMachine<N> {
// 22-33 // 22-33
if self.step == Step::Propose { if self.step == Step::Propose {
// Delay error handling (triggering a slash) until after we vote. // Delay error handling (triggering a slash) until after we vote.
let (valid, err) = match self.network.write().await.validate(block) { let (valid, err) = match self.network.write().await.validate(block).await {
Ok(_) => (true, Ok(None)), Ok(_) => (true, Ok(None)),
Err(BlockError::Temporal) => (false, Ok(None)), Err(BlockError::Temporal) => (false, Ok(None)),
Err(BlockError::Fatal) => (false, Err(TendermintError::Malicious(proposer))), Err(BlockError::Fatal) => (false, Err(TendermintError::Malicious(proposer))),
@@ -478,7 +478,7 @@ impl<N: Network + 'static> TendermintMachine<N> {
// being set, or only being set historically, means this has yet to be run // being set, or only being set historically, means this has yet to be run
if self.log.has_consensus(self.round, Data::Prevote(Some(block.id()))) { if self.log.has_consensus(self.round, Data::Prevote(Some(block.id()))) {
match self.network.write().await.validate(block) { match self.network.write().await.validate(block).await {
Ok(_) => (), Ok(_) => (),
Err(BlockError::Temporal) => (), Err(BlockError::Temporal) => (),
Err(BlockError::Fatal) => Err(TendermintError::Malicious(proposer))?, Err(BlockError::Fatal) => Err(TendermintError::Malicious(proposer))?,

View File

@@ -109,7 +109,7 @@ impl Network for TestNetwork {
todo!() todo!()
} }
fn validate(&mut self, block: &TestBlock) -> Result<(), BlockError> { async fn validate(&mut self, block: &TestBlock) -> Result<(), BlockError> {
block.valid block.valid
} }