mirror of
https://github.com/serai-dex/serai.git
synced 2025-12-13 14:39:25 +00:00
Connect the Tendermint machine to a GossipEngine
This commit is contained in:
@@ -8,7 +8,7 @@ use async_trait::async_trait;
|
||||
|
||||
use log::warn;
|
||||
|
||||
use tokio::sync::RwLock as AsyncRwLock;
|
||||
use tokio::{sync::RwLock as AsyncRwLock, task::yield_now};
|
||||
|
||||
use sp_core::{Encode, Decode, sr25519::Signature};
|
||||
use sp_inherents::{InherentData, InherentDataProvider, CreateInherentDataProviders};
|
||||
@@ -24,6 +24,9 @@ use sc_consensus::{ForkChoiceStrategy, BlockImportParams, import_queue::Incoming
|
||||
|
||||
use sc_service::ImportQueue;
|
||||
use sc_client_api::{BlockBackend, Finalizer};
|
||||
use sc_network_gossip::GossipEngine;
|
||||
|
||||
use substrate_prometheus_endpoint::Registry;
|
||||
|
||||
use tendermint_machine::{
|
||||
ext::{BlockError, BlockNumber, Commit, Network},
|
||||
@@ -35,6 +38,7 @@ use crate::{
|
||||
types::TendermintValidator,
|
||||
validators::TendermintValidators,
|
||||
import_queue::{ImportFuture, TendermintImportQueue},
|
||||
gossip::TendermintGossip,
|
||||
Announce,
|
||||
};
|
||||
|
||||
@@ -43,6 +47,7 @@ pub(crate) struct TendermintImport<T: TendermintValidator> {
|
||||
|
||||
validators: Arc<TendermintValidators<T>>,
|
||||
|
||||
number: Arc<RwLock<u64>>,
|
||||
importing_block: Arc<RwLock<Option<<T::Block as Block>::Hash>>>,
|
||||
pub(crate) machine: Arc<RwLock<Option<TendermintHandle<Self>>>>,
|
||||
|
||||
@@ -57,7 +62,7 @@ pub(crate) struct TendermintImport<T: TendermintValidator> {
|
||||
|
||||
pub struct TendermintAuthority<T: TendermintValidator>(pub(crate) TendermintImport<T>);
|
||||
impl<T: TendermintValidator> TendermintAuthority<T> {
|
||||
pub async fn validate(mut self) {
|
||||
pub async fn validate(mut self, network: T::Network, registry: Option<&Registry>) {
|
||||
let info = self.0.client.info();
|
||||
|
||||
// Header::Number: TryInto<u64> doesn't implement Debug and can't be unwrapped
|
||||
@@ -84,13 +89,55 @@ impl<T: TendermintValidator> TendermintAuthority<T> {
|
||||
.get_proposal(&self.0.client.header(BlockId::Hash(info.best_hash)).unwrap().unwrap())
|
||||
.await;
|
||||
|
||||
*self.0.machine.write().unwrap() = Some(TendermintMachine::new(
|
||||
*self.0.number.write().unwrap() = last_number.0 + 1;
|
||||
let mut gossip = GossipEngine::new(
|
||||
network,
|
||||
"tendermint",
|
||||
Arc::new(TendermintGossip::new(self.0.number.clone(), self.0.validators.clone())),
|
||||
registry,
|
||||
);
|
||||
|
||||
let handle = TendermintMachine::new(
|
||||
self.0.clone(),
|
||||
// TODO
|
||||
0, // ValidatorId
|
||||
(last_number, last_time),
|
||||
proposal,
|
||||
));
|
||||
);
|
||||
|
||||
let mut last_number = last_number.0 + 1;
|
||||
let mut recv = gossip
|
||||
.messages_for(TendermintGossip::<TendermintValidators<T>>::topic::<T::Block>(last_number));
|
||||
loop {
|
||||
match recv.try_next() {
|
||||
Ok(Some(msg)) => handle
|
||||
.messages
|
||||
.send(match SignedMessage::decode(&mut msg.message.as_ref()) {
|
||||
Ok(msg) => msg,
|
||||
Err(e) => {
|
||||
warn!("couldn't decode valid message: {}", e);
|
||||
continue;
|
||||
}
|
||||
})
|
||||
.await
|
||||
.unwrap(),
|
||||
Ok(None) => break,
|
||||
// No messages available
|
||||
Err(_) => {
|
||||
// Check if we the block updated and should be listening on a different topic
|
||||
let curr = *self.0.number.read().unwrap();
|
||||
if last_number != curr {
|
||||
last_number = curr;
|
||||
// TODO: Will this return existing messages on the new height? Or will those have been
|
||||
// ignored and are now gone?
|
||||
recv = gossip.messages_for(TendermintGossip::<TendermintValidators<T>>::topic::<
|
||||
T::Block,
|
||||
>(last_number));
|
||||
}
|
||||
yield_now().await;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -101,6 +148,7 @@ impl<T: TendermintValidator> Clone for TendermintImport<T> {
|
||||
|
||||
validators: self.validators.clone(),
|
||||
|
||||
number: self.number.clone(),
|
||||
importing_block: self.importing_block.clone(),
|
||||
machine: self.machine.clone(),
|
||||
|
||||
@@ -126,6 +174,7 @@ impl<T: TendermintValidator> TendermintImport<T> {
|
||||
|
||||
validators: Arc::new(TendermintValidators::new(client.clone())),
|
||||
|
||||
number: Arc::new(RwLock::new(0)),
|
||||
importing_block: Arc::new(RwLock::new(None)),
|
||||
machine: Arc::new(RwLock::new(None)),
|
||||
|
||||
@@ -388,6 +437,7 @@ impl<T: TendermintValidator> Network for TendermintImport<T> {
|
||||
.finalize_block(BlockId::Hash(hash), Some(justification), true)
|
||||
.map_err(|_| Error::InvalidJustification)
|
||||
.unwrap();
|
||||
*self.number.write().unwrap() += 1;
|
||||
self.announce.announce(hash);
|
||||
|
||||
self.get_proposal(block.header()).await
|
||||
|
||||
Reference in New Issue
Block a user