Add a test to the coordinator for running a Tributary

Impls a LocalP2p for testing.

Moves rebroadcasting into Tendermint, since it's what knows if a message is
fully valid + original.

Removes TributarySpec::validators() HashMap, as its non-determinism caused
different instances to have different round robin schedules. It was already
prior moved to a Vec for this issue, so I'm unsure why this remnant existed.

Also renames the GH no-std workflow from the prior commit.
This commit is contained in:
Luke Parker
2023-04-22 10:49:52 -04:00
parent 1e448dec21
commit 8c74576cf0
13 changed files with 259 additions and 69 deletions

View File

@@ -2,7 +2,6 @@ use core::fmt::Debug;
use std::{
sync::{Arc, RwLock},
io,
collections::HashMap,
};
use async_trait::async_trait;
@@ -96,16 +95,16 @@ impl<D: Db, T: Transaction, P: P2p> Tributary<D, T, P> {
genesis: [u8; 32],
start_time: u64,
key: Zeroizing<<Ristretto as Ciphersuite>::F>,
validators: HashMap<<Ristretto as Ciphersuite>::G, u64>,
validators: Vec<(<Ristretto as Ciphersuite>::G, u64)>,
p2p: P,
) -> Option<Self> {
let validators_vec = validators.keys().cloned().collect::<Vec<_>>();
let validators_vec = validators.iter().map(|validator| validator.0).collect::<Vec<_>>();
let signer = Arc::new(Signer::new(genesis, key));
let validators = Arc::new(Validators::new(genesis, validators)?);
let mut blockchain = Blockchain::new(db, genesis, &validators_vec);
let block_number = blockchain.block_number();
let block_number = BlockNumber(blockchain.block_number().into());
let start_time = if let Some(commit) = blockchain.commit(&blockchain.tip()) {
Commit::<Validators>::decode(&mut commit.as_ref()).unwrap().end_time
@@ -117,8 +116,6 @@ impl<D: Db, T: Transaction, P: P2p> Tributary<D, T, P> {
let network = TendermintNetwork { genesis, signer, validators, blockchain, p2p };
// The genesis block is 0, so we're working on block #1
let block_number = BlockNumber((block_number + 1).into());
let TendermintHandle { synced_block, messages, machine } =
TendermintMachine::new(network.clone(), block_number, start_time, proposal).await;
tokio::task::spawn(machine.run());
@@ -129,6 +126,9 @@ impl<D: Db, T: Transaction, P: P2p> Tributary<D, T, P> {
pub fn genesis(&self) -> [u8; 32] {
self.network.blockchain.read().unwrap().genesis()
}
pub fn block_number(&self) -> u32 {
self.network.blockchain.read().unwrap().block_number()
}
pub fn tip(&self) -> [u8; 32] {
self.network.blockchain.read().unwrap().tip()
}
@@ -184,36 +184,31 @@ impl<D: Db, T: Transaction, P: P2p> Tributary<D, T, P> {
}
// Return true if the message should be rebroadcasted.
pub async fn handle_message(&mut self, msg: Vec<u8>) -> bool {
match msg[0] {
TRANSACTION_MESSAGE => {
pub async fn handle_message(&mut self, msg: &[u8]) -> bool {
match msg.first() {
Some(&TRANSACTION_MESSAGE) => {
let Ok(tx) = T::read::<&[u8]>(&mut &msg[1 ..]) else {
log::error!("received invalid transaction message");
return false;
};
// TODO: Sync mempools with fellow peers
// Can we just rebroadcast transactions not included for at least two blocks?
self.network.blockchain.write().unwrap().add_transaction(false, tx)
let res = self.network.blockchain.write().unwrap().add_transaction(false, tx);
log::debug!("received transaction message. valid new transaction: {res}");
res
}
TENDERMINT_MESSAGE => {
Some(&TENDERMINT_MESSAGE) => {
let Ok(msg) = SignedMessageFor::<TendermintNetwork<D, T, P>>::decode::<&[u8]>(
&mut &msg[1 ..]
) else {
log::error!("received invalid tendermint message");
return false;
};
// If this message isn't to form consensus on the next block, ignore it
if msg.block().0 != (self.network.blockchain.read().unwrap().block_number() + 1).into() {
return false;
}
if !msg.verify_signature(&self.network.validators) {
return false;
}
self.messages.send(msg).await.unwrap();
true
false
}
_ => false,

View File

@@ -125,7 +125,7 @@ pub(crate) struct Validators {
impl Validators {
pub(crate) fn new(
genesis: [u8; 32],
validators: HashMap<<Ristretto as Ciphersuite>::G, u64>,
validators: Vec<(<Ristretto as Ciphersuite>::G, u64)>,
) -> Option<Validators> {
let mut total_weight = 0;
let mut weights = HashMap::new();

View File

@@ -6,8 +6,6 @@ use std::{
collections::VecDeque,
};
use log::debug;
use parity_scale_codec::{Encode, Decode};
use futures::{
@@ -109,6 +107,7 @@ impl<V: ValidatorId, B: Block, S: Signature> SignedMessage<V, B, S> {
enum TendermintError<V: ValidatorId> {
Malicious(V),
Temporal,
AlreadyHandled,
}
// Type aliases to abstract over generic hell
@@ -236,7 +235,7 @@ impl<N: Network + 'static> TendermintMachine<N> {
async fn slash(&mut self, validator: N::ValidatorId) {
if !self.block.slashes.contains(&validator) {
debug!(target: "tendermint", "Slashing validator {:?}", validator);
log::info!(target: "tendermint", "Slashing validator {:?}", validator);
self.block.slashes.insert(validator);
self.network.slash(validator).await;
}
@@ -307,7 +306,7 @@ impl<N: Network + 'static> TendermintMachine<N> {
let mut queue_future =
if self.queue.is_empty() { Fuse::terminated() } else { future::ready(()).fuse() };
if let Some((broadcast, msg)) = futures::select_biased! {
if let Some((our_message, msg, mut sig)) = futures::select_biased! {
// Handle a new block occuring externally (an external sync loop)
// Has the highest priority as it makes all other futures here irrelevant
msg = self.synced_block_recv.next() => {
@@ -332,7 +331,7 @@ impl<N: Network + 'static> TendermintMachine<N> {
// Handle our messages
_ = queue_future => {
Some((true, self.queue.pop_front().unwrap()))
Some((true, self.queue.pop_front().unwrap(), None))
},
// Handle any timeouts
@@ -346,7 +345,7 @@ impl<N: Network + 'static> TendermintMachine<N> {
match step {
Step::Propose => {
// Slash the validator for not proposing when they should've
debug!(target: "tendermint", "Validator didn't propose when they should have");
log::debug!(target: "tendermint", "Validator didn't propose when they should have");
self.slash(
self.weights.proposer(self.block.number, self.block.round().number)
).await;
@@ -368,19 +367,27 @@ impl<N: Network + 'static> TendermintMachine<N> {
if !msg.verify_signature(&self.validators) {
continue;
}
Some((false, msg.msg))
Some((false, msg.msg, Some(msg.sig)))
} else {
break;
}
}
} {
let res = self.message(msg.clone()).await;
if res.is_err() && broadcast {
panic!("honest node had invalid behavior");
if res.is_err() && our_message {
panic!("honest node (ourselves) had invalid behavior");
}
match res {
Ok(None) => (),
Ok(None) => {
if let Some(sig) = sig.take() {
// If it's our own message, it shouldn't already be signed
assert!(!our_message);
// Re-broadcast this since it's an original consensus message
self.network.broadcast(SignedMessage { msg: msg.clone(), sig }).await;
}
}
Ok(Some(block)) => {
let mut validators = vec![];
let mut sigs = vec![];
@@ -407,9 +414,11 @@ impl<N: Network + 'static> TendermintMachine<N> {
}
Err(TendermintError::Malicious(validator)) => self.slash(validator).await,
Err(TendermintError::Temporal) => (),
Err(TendermintError::AlreadyHandled) => (),
}
if broadcast {
if our_message {
assert!(sig.is_none());
let sig = self.signer.sign(&msg.encode()).await;
self.network.broadcast(SignedMessage { msg, sig }).await;
}
@@ -433,7 +442,7 @@ impl<N: Network + 'static> TendermintMachine<N> {
// which forces us to calculate every end time
if let Some(end_time) = self.block.end_time.get(&round) {
if !self.validators.verify(sender, &commit_msg(end_time.canonical(), id.as_ref()), sig) {
debug!(target: "tendermint", "Validator produced an invalid commit signature");
log::warn!(target: "tendermint", "Validator produced an invalid commit signature");
Err(TendermintError::Malicious(sender))?;
}
return Ok(true);
@@ -457,13 +466,14 @@ impl<N: Network + 'static> TendermintMachine<N> {
if matches!(msg.data, Data::Proposal(..)) &&
(msg.sender != self.weights.proposer(msg.block, msg.round))
{
debug!(target: "tendermint", "Validator who wasn't the proposer proposed");
log::warn!(target: "tendermint", "Validator who wasn't the proposer proposed");
Err(TendermintError::Malicious(msg.sender))?;
};
if !self.block.log.log(msg.clone())? {
return Ok(None);
return Err(TendermintError::AlreadyHandled);
}
log::debug!(target: "tendermint", "received new tendermint message");
// All functions, except for the finalizer and the jump, are locked to the current round
@@ -481,6 +491,7 @@ impl<N: Network + 'static> TendermintMachine<N> {
msg.round,
Data::Precommit(Some((block.id(), self.signer.sign(&[]).await))),
) {
log::debug!(target: "tendermint", "block {} has consensus", msg.block.0);
return Ok(Some(block.clone()));
}
}
@@ -573,7 +584,7 @@ impl<N: Network + 'static> TendermintMachine<N> {
Ok(_) => (true, Ok(None)),
Err(BlockError::Temporal) => (false, Ok(None)),
Err(BlockError::Fatal) => (false, {
debug!(target: "tendermint", "Validator proposed a fatally invalid block");
log::warn!(target: "tendermint", "Validator proposed a fatally invalid block");
Err(TendermintError::Malicious(proposer))
}),
};
@@ -590,7 +601,7 @@ impl<N: Network + 'static> TendermintMachine<N> {
if let Some(vr) = vr {
// Malformed message
if vr.0 >= self.block.round().number.0 {
debug!(target: "tendermint", "Validator claimed a round from the future was valid");
log::warn!(target: "tendermint", "Validator claimed a round from the future was valid");
Err(TendermintError::Malicious(msg.sender))?;
}
@@ -629,7 +640,7 @@ impl<N: Network + 'static> TendermintMachine<N> {
Ok(_) => (),
Err(BlockError::Temporal) => (),
Err(BlockError::Fatal) => {
debug!(target: "tendermint", "Validator proposed a fatally invalid block");
log::warn!(target: "tendermint", "Validator proposed a fatally invalid block");
Err(TendermintError::Malicious(proposer))?
}
};