use std::{ sync::{Arc, RwLock}, time::{UNIX_EPOCH, SystemTime, Duration}, }; use async_trait::async_trait; use log::warn; use tokio::task::yield_now; use sp_core::{Encode, Decode}; use sp_keystore::CryptoStore; use sp_runtime::{ traits::{Header, Block}, Digest, }; use sp_blockchain::HeaderBackend; use sp_api::BlockId; use sp_consensus::{Error, BlockOrigin, Proposer, Environment}; use sc_consensus::import_queue::IncomingBlock; use sc_service::ImportQueue; use sc_client_api::{BlockBackend, Finalizer}; use sc_network::NetworkBlock; use sc_network_gossip::GossipEngine; use substrate_prometheus_endpoint::Registry; use tendermint_machine::{ ext::{BlockError, BlockNumber, Commit, SignatureScheme, Network}, SignedMessage, TendermintMachine, }; use crate::{ CONSENSUS_ID, PROTOCOL_NAME, TendermintValidator, validators::{TendermintSigner, TendermintValidators}, tendermint::TendermintImport, }; mod gossip; use gossip::TendermintGossip; mod import_future; use import_future::ImportFuture; // Data for an active validator // This is distinct as even when we aren't an authority, we still create stubbed Authority objects // as it's only Authority which implements tendermint_machine::ext::Network. Network has // verify_commit provided, and even non-authorities have to verify commits struct ActiveAuthority { signer: TendermintSigner, // Block whose gossip is being tracked number: Arc>, // Outgoing message queue, placed here as the GossipEngine itself can't be gossip_queue: Arc< RwLock< Vec as SignatureScheme>::Signature>>, >, >, // Block producer env: T::Environment, announce: T::Network, } /// Tendermint Authority. Participates in the block proposal and voting process. pub struct TendermintAuthority { import: TendermintImport, active: Option>, } impl TendermintAuthority { /// Create a new TendermintAuthority. pub fn new(import: TendermintImport) -> Self { Self { import, active: None } } fn get_last(&self) -> (::Hash, (BlockNumber, u64)) { let info = self.import.client.info(); // TODO: Genesis start time + BLOCK_TIME let mut fake_genesis = SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_secs(); // Round up to the nearest 5s increment fake_genesis += 5 - (fake_genesis % 5); ( info.finalized_hash, ( // Header::Number: TryInto doesn't implement Debug and can't be unwrapped match info.finalized_number.try_into() { Ok(best) => BlockNumber(best), Err(_) => panic!("BlockNumber exceeded u64"), }, // Get the last time by grabbing the last block's justification and reading the time from // that Commit::>::decode( &mut self .import .client .justifications(&BlockId::Hash(info.finalized_hash)) .unwrap() .map(|justifications| justifications.get(CONSENSUS_ID).cloned().unwrap()) .unwrap_or_default() .as_ref(), ) .map(|commit| commit.end_time) .unwrap_or(fake_genesis), ), ) } pub(crate) async fn get_proposal(&mut self, header: &::Header) -> T::Block { let parent = *header.parent_hash(); let proposer = self .active .as_mut() .unwrap() .env .init(header) .await .expect("Failed to create a proposer for the new block"); proposer .propose( self.import.inherent_data(parent).await, Digest::default(), // TODO: Production time, size limit Duration::from_secs(1), None, ) .await .expect("Failed to crate a new block proposal") .block } /// Act as a network authority, proposing and voting on blocks. This should be spawned on a task /// as it will not return until the P2P stack shuts down. pub async fn authority( mut self, keys: Arc, providers: T::CIDP, env: T::Environment, network: T::Network, registry: Option<&Registry>, ) { let (best_hash, last) = self.get_last(); let mut last_number = last.0 .0 + 1; // Shared references between us and the Tendermint machine (and its actions via its Network // trait) let number = Arc::new(RwLock::new(last_number)); let gossip_queue = Arc::new(RwLock::new(vec![])); // Create the gossip network let mut gossip = GossipEngine::new( network.clone(), PROTOCOL_NAME, Arc::new(TendermintGossip::new(number.clone(), self.import.validators.clone())), registry, ); // Create the Tendermint machine let handle = { // Set this struct as active *self.import.providers.write().await = Some(providers); self.active = Some(ActiveAuthority { signer: TendermintSigner(keys, self.import.validators.clone()), number: number.clone(), gossip_queue: gossip_queue.clone(), env, announce: network, }); let proposal = self .get_proposal(&self.import.client.header(BlockId::Hash(best_hash)).unwrap().unwrap()) .await; // We no longer need self, so let TendermintMachine become its owner TendermintMachine::new(self, last, proposal) }; // Start receiving messages about the Tendermint process for this block let mut recv = gossip.messages_for(TendermintGossip::::topic(last_number)); 'outer: loop { // Send out any queued messages let mut queue = gossip_queue.write().unwrap().drain(..).collect::>(); for msg in queue.drain(..) { gossip.gossip_message(TendermintGossip::::topic(msg.number().0), msg.encode(), false); } // Handle any received messages // This inner loop enables handling all pending messages before acquiring the out-queue lock // again // TODO: Move to a select model. The disadvantage of this is we'll more frequently acquire // the above lock, despite lack of reason to do so let _ = futures::poll!(&mut gossip); 'inner: loop { match recv.try_next() { Ok(Some(msg)) => handle .messages .send(match SignedMessage::decode(&mut msg.message.as_ref()) { Ok(msg) => msg, Err(e) => { warn!(target: "tendermint", "Couldn't decode valid message: {}", e); continue; } }) .await .unwrap(), // Ok(None) IS NOT when there aren't messages available. It's when the channel is closed // If we're no longer receiving messages from the network, it must no longer be running // We should no longer be accordingly Ok(None) => break 'outer, // No messages available Err(_) => { // Check if we the block updated and should be listening on a different topic let curr = *number.read().unwrap(); if last_number != curr { last_number = curr; // TODO: Will this return existing messages on the new height? Or will those have // been ignored and are now gone? recv = gossip.messages_for(TendermintGossip::::topic(last_number)); } // If there are no messages available, yield to not hog the thread, then return to the // outer loop yield_now().await; break 'inner; } } } } } } #[async_trait] impl Network for TendermintAuthority { type ValidatorId = u16; type SignatureScheme = TendermintValidators; type Weights = TendermintValidators; type Block = T::Block; const BLOCK_TIME: u32 = T::BLOCK_TIME_IN_SECONDS; fn signer(&self) -> TendermintSigner { self.active.as_ref().unwrap().signer.clone() } fn signature_scheme(&self) -> TendermintValidators { self.import.validators.clone() } fn weights(&self) -> TendermintValidators { self.import.validators.clone() } async fn broadcast( &mut self, msg: SignedMessage as SignatureScheme>::Signature>, ) { self.active.as_mut().unwrap().gossip_queue.write().unwrap().push(msg); } async fn slash(&mut self, _validator: u16) { todo!() } // The Tendermint machine will call add_block for any block which is committed to, regardless of // validity. To determine validity, it expects a validate function, which Substrate doesn't // directly offer, and an add function. In order to comply with Serai's modified view of inherent // transactions, validate MUST check inherents, yet add_block must not. // // In order to acquire a validate function, any block proposed by a legitimate proposer is // imported. This performs full validation and makes the block available as a tip. While this // would be incredibly unsafe thanks to the unchecked inherents, it's defined as a tip with less // work, despite being a child of some parent. This means it won't be moved to nor operated on by // the node. // // When Tendermint completes, the block is finalized, setting it as the tip regardless of work. async fn validate(&mut self, block: &T::Block) -> Result<(), BlockError> { let hash = block.hash(); let (header, body) = block.clone().deconstruct(); let parent = *header.parent_hash(); let number = *header.number(); let mut queue_write = self.import.queue.write().await; *self.import.importing_block.write().unwrap() = Some(hash); queue_write.as_mut().unwrap().import_blocks( // We do not want this block, which hasn't been confirmed, to be broadcast over the net // Substrate will generate notifications unless it's Genesis, which this isn't, InitialSync, // which changes telemtry behavior, or File, which is... close enough BlockOrigin::File, vec![IncomingBlock { hash, header: Some(header), body: Some(body), indexed_body: None, justifications: None, origin: None, allow_missing_state: false, skip_execution: false, // TODO: Only set to true if block was rejected due to its inherents import_existing: true, state: None, }], ); ImportFuture::new(hash, queue_write.as_mut().unwrap()).await?; // Sanity checks that a child block can have less work than its parent { let info = self.import.client.info(); assert_eq!(info.best_hash, parent); assert_eq!(info.finalized_hash, parent); assert_eq!(info.best_number, number - 1u8.into()); assert_eq!(info.finalized_number, number - 1u8.into()); } Ok(()) } async fn add_block( &mut self, block: T::Block, commit: Commit>, ) -> T::Block { let hash = block.hash(); let justification = (CONSENSUS_ID, commit.encode()); debug_assert!(self.import.verify_justification(hash, &justification).is_ok()); self .import .client .finalize_block(BlockId::Hash(hash), Some(justification), true) .map_err(|_| Error::InvalidJustification) .unwrap(); *self.active.as_mut().unwrap().number.write().unwrap() += 1; self.active.as_ref().unwrap().announce.announce_block(hash, None); self.get_proposal(block.header()).await } }