Time code

This commit is contained in:
Luke Parker
2022-10-12 21:36:40 -04:00
parent b56c88468e
commit d081934725

View File

@@ -1,5 +1,7 @@
use std::collections::HashMap; use std::collections::HashMap;
use tokio::{task::{JoinHandle, spawn}, sync::mpsc};
type ValidatorId = u16; type ValidatorId = u16;
const VALIDATORS: ValidatorId = 5; const VALIDATORS: ValidatorId = 5;
@@ -68,11 +70,7 @@ fn proposer(height: u32, round: u32) -> ValidatorId {
ValidatorId::try_from((height + round) % u32::try_from(VALIDATORS).unwrap()).unwrap() ValidatorId::try_from((height + round) % u32::try_from(VALIDATORS).unwrap()).unwrap()
} }
fn broadcast(msg: Message) { #[derive(Debug)]
todo!();
}
#[derive(Clone, PartialEq, Eq, Debug)]
struct TendermintMachine { struct TendermintMachine {
proposer: ValidatorId, proposer: ValidatorId,
personal_proposal: Option<Block>, personal_proposal: Option<Block>,
@@ -86,13 +84,23 @@ struct TendermintMachine {
step: Step, step: Step,
locked: Option<(u32, Block)>, locked: Option<(u32, Block)>,
valid: Option<(u32, Block)>, valid: Option<(u32, Block)>,
timeouts: Arc<RwLock<HashMap<Step, Instant>>>, // TODO: Remove Arc RwLock
}
#[derive(Debug)]
struct TendermintHandle {
block: Arc<RwLock<Option<Block>>>,
messages: mpsc::Sender<Message>,
broadcast: mpsc::Receiver<Message>,
handle: JoinHandle<()>,
} }
impl TendermintMachine { impl TendermintMachine {
fn broadcast(&self, data: Data) -> Option<Block> { fn broadcast(&self, data: Data) -> Option<Block> {
let msg = Message { sender: self.proposer, height: self.height, round: self.round, data }; let msg = Message { sender: self.proposer, height: self.height, round: self.round, data };
let res = self.message(msg).unwrap(); let res = self.message(msg).unwrap();
broadcast(msg); self.broadcast.send(msg).unwrap();
res res
} }
@@ -114,7 +122,7 @@ impl TendermintMachine {
}; };
debug_assert!(self.broadcast(Data::Proposal(round, block)).is_none()); debug_assert!(self.broadcast(Data::Proposal(round, block)).is_none());
} else { } else {
// TODO schedule timeout propose self.timeouts.write().unwrap().insert(Step::Precommit, self.timeout(Step::Precommit));
} }
} }
@@ -126,7 +134,7 @@ impl TendermintMachine {
} }
/// Called whenever a new height occurs /// Called whenever a new height occurs
pub fn propose(&mut self, block: Block) { fn propose(&mut self, block: Block) {
self.personal_proposal = Some(block); self.personal_proposal = Some(block);
self.round_propose(); self.round_propose();
} }
@@ -147,21 +155,63 @@ impl TendermintMachine {
} }
// 10 // 10
pub fn new(proposer: ValidatorId, height: u32) -> TendermintMachine { pub fn new(proposer: ValidatorId, height: u32) -> TendermintHandle {
TendermintMachine { let block = Arc::new(RwLock::new(None));
proposer, let (msg_send, mut msg_recv) = mpsc::channel(100); // Backlog to accept. Currently arbitrary
personal_proposal: None, let (broadcast_send, broadcast_recv) = mpsc::channel(5);
TendermintHandle {
block: block.clone(),
messages: msg_send,
broadcast: broadcast_recv,
handle: tokio::spawn(async {
let machine = TendermintMachine {
proposer,
personal_proposal: None,
height, height,
log_map: HashMap::new(), log_map: HashMap::new(),
precommitted: HashMap::new(), precommitted: HashMap::new(),
locked: None, locked: None,
valid: None, valid: None,
round: 0, round: 0,
step: Step::Propose step: Step::Propose
};
loop {
if self.personal_proposal.is_none() {
let block = block.lock().unwrap();
if block.is_some() {
self.personal_proposal = Some(block.take());
} else {
tokio::yield_now().await;
continue;
}
}
let now = Instant::now();
let (t1, t2, t3) = {
let timeouts = self.timeouts.read().unwrap();
let ready = |step| timeouts.get(step).unwrap_or(now) < now;
(ready(Step::Propose), ready(Step::Prevote), ready(Step::Precommit))
};
if t1 { // Propose timeout
}
if t2 { // Prevote timeout
}
if t3 { // Precommit timeout
}
match recv.try_recv() {
Ok(msg) => machine.message(msg),
Err(TryRecvError::Empty) => tokio::yield_now().await,
Err(TryRecvError::Disconnected) => break
}
}
})
} }
} }
@@ -269,7 +319,7 @@ impl TendermintMachine {
None None
} }
pub fn message(&mut self, msg: Message) -> Result<Option<Block>, TendermintError> { fn message(&mut self, msg: Message) -> Result<Option<Block>, TendermintError> {
if msg.height != self.height { if msg.height != self.height {
Err(TendermintError::Temporal)?; Err(TendermintError::Temporal)?;
} }
@@ -332,7 +382,7 @@ impl TendermintMachine {
let (participation, weight) = self.message_instances(self.round, Data::Prevote(None)); let (participation, weight) = self.message_instances(self.round, Data::Prevote(None));
// 34-35 // 34-35
if (participation > (((VALIDATORS / 3) * 2) + 1).into()) && first { if (participation > (((VALIDATORS / 3) * 2) + 1).into()) && first {
// TODO: Schedule timeout prevote self.timeouts.write().unwrap().insert(Step::Prevote, self.timeout(Step::Prevote));
} }
// 44-46 // 44-46
@@ -344,7 +394,7 @@ impl TendermintMachine {
// 47-48 // 47-48
if self.has_participation(self.round, Step::Precommit) && first { if self.has_participation(self.round, Step::Precommit) && first {
// TODO: Schedule timeout precommit self.timeouts.write().unwrap().insert(Step::Precommit, self.timeout(Step::Precommit));
} }
Ok(None) Ok(None)