Localize Tributary HashMaps, offering flexibility and removing contention

This commit is contained in:
Luke Parker
2023-09-25 19:28:53 -04:00
parent 7120bddc6f
commit 9f3840d1cf
3 changed files with 189 additions and 130 deletions

View File

@@ -28,13 +28,11 @@ use message_queue::{Service, client::MessageQueue};
use futures::stream::StreamExt; use futures::stream::StreamExt;
use tokio::{ use tokio::{
sync::{RwLock, mpsc}, sync::{RwLock, mpsc, broadcast},
time::sleep, time::sleep,
}; };
use ::tributary::{ use ::tributary::{ReadWrite, ProvidedError, TransactionKind, TransactionTrait, Block, Tributary};
ReadWrite, ProvidedError, TransactionKind, TransactionTrait, Block, Tributary, TributaryReader,
};
mod tributary; mod tributary;
use crate::tributary::{ use crate::tributary::{
@@ -57,22 +55,21 @@ mod substrate;
#[cfg(test)] #[cfg(test)]
pub mod tests; pub mod tests;
#[derive(Clone)]
pub struct ActiveTributary<D: Db, P: P2p> { pub struct ActiveTributary<D: Db, P: P2p> {
pub spec: TributarySpec, pub spec: TributarySpec,
pub tributary: Arc<RwLock<Tributary<D, Transaction, P>>>, pub tributary: Arc<Tributary<D, Transaction, P>>,
} }
type Tributaries<D, P> = HashMap<[u8; 32], ActiveTributary<D, P>>;
// Adds a tributary into the specified HashMap // Adds a tributary into the specified HashMap
async fn add_tributary<D: Db, Pro: Processors, P: P2p>( async fn add_tributary<D: Db, Pro: Processors, P: P2p>(
db: D, db: D,
key: Zeroizing<<Ristretto as Ciphersuite>::F>, key: Zeroizing<<Ristretto as Ciphersuite>::F>,
processors: &Pro, processors: &Pro,
p2p: P, p2p: P,
tributaries: &mut Tributaries<D, P>, tributaries: &broadcast::Sender<ActiveTributary<D, P>>,
spec: TributarySpec, spec: TributarySpec,
) -> TributaryReader<D, Transaction> { ) {
log::info!("adding tributary {:?}", spec.set()); log::info!("adding tributary {:?}", spec.set());
let tributary = Tributary::<_, Transaction, _>::new( let tributary = Tributary::<_, Transaction, _>::new(
@@ -110,14 +107,10 @@ async fn add_tributary<D: Db, Pro: Processors, P: P2p>(
) )
.await; .await;
let reader = tributary.reader(); tributaries
.send(ActiveTributary { spec, tributary: Arc::new(tributary) })
tributaries.insert( .map_err(|_| "all ActiveTributary recipients closed")
tributary.genesis(), .unwrap();
ActiveTributary { spec, tributary: Arc::new(RwLock::new(tributary)) },
);
reader
} }
pub async fn scan_substrate<D: Db, Pro: Processors>( pub async fn scan_substrate<D: Db, Pro: Processors>(
@@ -125,7 +118,7 @@ pub async fn scan_substrate<D: Db, Pro: Processors>(
key: Zeroizing<<Ristretto as Ciphersuite>::F>, key: Zeroizing<<Ristretto as Ciphersuite>::F>,
processors: Pro, processors: Pro,
serai: Arc<Serai>, serai: Arc<Serai>,
new_tributary_channel: mpsc::UnboundedSender<TributarySpec>, new_tributary_spec: mpsc::UnboundedSender<TributarySpec>,
) { ) {
log::info!("scanning substrate"); log::info!("scanning substrate");
@@ -185,7 +178,7 @@ pub async fn scan_substrate<D: Db, Pro: Processors>(
// Add it to the queue // Add it to the queue
// If we reboot before this is read from the queue, the fact it was saved to the database // If we reboot before this is read from the queue, the fact it was saved to the database
// means it'll be handled on reboot // means it'll be handled on reboot
new_tributary_channel.send(spec).unwrap(); new_tributary_spec.send(spec).unwrap();
}, },
&processors, &processors,
&serai, &serai,
@@ -202,7 +195,7 @@ pub async fn scan_substrate<D: Db, Pro: Processors>(
} }
} }
#[allow(clippy::too_many_arguments, clippy::type_complexity)] #[allow(clippy::type_complexity)]
pub async fn scan_tributaries< pub async fn scan_tributaries<
D: Db, D: Db,
Pro: Processors, Pro: Processors,
@@ -216,38 +209,25 @@ pub async fn scan_tributaries<
p2p: P, p2p: P,
processors: Pro, processors: Pro,
serai: Arc<Serai>, serai: Arc<Serai>,
tributaries: Arc<RwLock<Tributaries<D, P>>>, mut new_tributary: broadcast::Receiver<ActiveTributary<D, P>>,
mut new_tributary_channel: mpsc::UnboundedReceiver<TributarySpec>,
) { ) {
log::info!("scanning tributaries"); log::info!("scanning tributaries");
let mut tributary_readers = vec![];
for ActiveTributary { spec, tributary } in tributaries.read().await.values() {
tributary_readers.push((spec.clone(), tributary.read().await.reader()));
}
// Handle new Tributary blocks // Handle new Tributary blocks
let mut tributary_readers = vec![];
let mut tributary_db = tributary::TributaryDb::new(raw_db.clone()); let mut tributary_db = tributary::TributaryDb::new(raw_db.clone());
loop { loop {
// The following handle_new_blocks function may take an arbitrary amount of time while let Ok(ActiveTributary { spec, tributary }) = {
// Accordingly, it may take a long time to acquire a write lock on the tributaries table match new_tributary.try_recv() {
// By definition of new_tributary_channel, we allow tributaries to be 'added' almost Ok(tributary) => Ok(tributary),
// immediately, meaning the Substrate scanner won't become blocked on this Err(broadcast::error::TryRecvError::Empty) => Err(()),
{ Err(broadcast::error::TryRecvError::Lagged(_)) => {
while let Ok(spec) = new_tributary_channel.try_recv() { panic!("scan_tributaries lagged to handle new_tributary")
let reader = add_tributary(
raw_db.clone(),
key.clone(),
&processors,
p2p.clone(),
// This is a short-lived write acquisition, which is why it should be fine
&mut *tributaries.write().await,
spec.clone(),
)
.await;
tributary_readers.push((spec, reader));
} }
Err(broadcast::error::TryRecvError::Closed) => panic!("new_tributary sender closed"),
}
} {
tributary_readers.push((spec, tributary.reader()));
} }
for (spec, reader) in &tributary_readers { for (spec, reader) in &tributary_readers {
@@ -296,15 +276,24 @@ pub async fn scan_tributaries<
pub async fn heartbeat_tributaries<D: Db, P: P2p>( pub async fn heartbeat_tributaries<D: Db, P: P2p>(
p2p: P, p2p: P,
tributaries: Arc<RwLock<Tributaries<D, P>>>, mut new_tributary: broadcast::Receiver<ActiveTributary<D, P>>,
) { ) {
let ten_blocks_of_time = let ten_blocks_of_time =
Duration::from_secs((10 * Tributary::<D, Transaction, P>::block_time()).into()); Duration::from_secs((10 * Tributary::<D, Transaction, P>::block_time()).into());
loop {
let mut readers = vec![]; let mut readers = vec![];
for tributary in tributaries.read().await.values() { loop {
readers.push(tributary.tributary.read().await.reader()); while let Ok(ActiveTributary { spec, tributary }) = {
match new_tributary.try_recv() {
Ok(tributary) => Ok(tributary),
Err(broadcast::error::TryRecvError::Empty) => Err(()),
Err(broadcast::error::TryRecvError::Lagged(_)) => {
panic!("heartbeat lagged to handle new_tributary")
}
Err(broadcast::error::TryRecvError::Closed) => panic!("new_tributary sender closed"),
}
} {
readers.push(tributary.reader());
} }
for tributary in &readers { for tributary in &readers {
@@ -337,8 +326,27 @@ pub async fn heartbeat_tributaries<D: Db, P: P2p>(
pub async fn handle_p2p<D: Db, P: P2p>( pub async fn handle_p2p<D: Db, P: P2p>(
our_key: <Ristretto as Ciphersuite>::G, our_key: <Ristretto as Ciphersuite>::G,
p2p: P, p2p: P,
tributaries: Arc<RwLock<Tributaries<D, P>>>, mut new_tributary: broadcast::Receiver<ActiveTributary<D, P>>,
) { ) {
// TODO: Merge this into the below loop. We don't need an extra task here
let tributaries = Arc::new(RwLock::new(HashMap::new()));
tokio::spawn({
let tributaries = tributaries.clone();
async move {
loop {
match new_tributary.recv().await {
Ok(tributary) => {
tributaries.write().await.insert(tributary.spec.genesis(), tributary);
}
Err(broadcast::error::RecvError::Lagged(_)) => {
panic!("handle_p2p lagged to handle new_tributary")
}
Err(broadcast::error::RecvError::Closed) => panic!("new_tributary sender closed"),
}
}
}
});
loop { loop {
let mut msg = p2p.receive().await; let mut msg = p2p.receive().await;
// Spawn a dedicated task to handle this message, ensuring any singularly latent message // Spawn a dedicated task to handle this message, ensuring any singularly latent message
@@ -359,7 +367,7 @@ pub async fn handle_p2p<D: Db, P: P2p>(
}; };
log::trace!("handling message for tributary {:?}", tributary.spec.set()); log::trace!("handling message for tributary {:?}", tributary.spec.set());
if tributary.tributary.read().await.handle_message(&msg.msg).await { if tributary.tributary.handle_message(&msg.msg).await {
P2p::broadcast(&p2p, msg.kind, msg.msg).await; P2p::broadcast(&p2p, msg.kind, msg.msg).await;
} }
} }
@@ -378,7 +386,7 @@ pub async fn handle_p2p<D: Db, P: P2p>(
log::debug!("received heartbeat message for unknown network"); log::debug!("received heartbeat message for unknown network");
return; return;
}; };
let tributary_read = tributary.tributary.read().await; let tributary_read = &tributary.tributary;
/* /*
// Have sqrt(n) nodes reply with the blocks // Have sqrt(n) nodes reply with the blocks
@@ -417,7 +425,6 @@ pub async fn handle_p2p<D: Db, P: P2p>(
log::debug!("received heartbeat and selected to respond"); log::debug!("received heartbeat and selected to respond");
let reader = tributary_read.reader(); let reader = tributary_read.reader();
drop(tributary_read);
let mut latest = msg.msg[.. 32].try_into().unwrap(); let mut latest = msg.msg[.. 32].try_into().unwrap();
while let Some(next) = reader.block_after(&latest) { while let Some(next) = reader.block_after(&latest) {
@@ -446,7 +453,7 @@ pub async fn handle_p2p<D: Db, P: P2p>(
return; return;
}; };
let res = tributary.tributary.read().await.sync_block(block, msg.msg).await; let res = tributary.tributary.sync_block(block, msg.msg).await;
log::debug!("received block from {:?}, sync_block returned {}", msg.sender, res); log::debug!("received block from {:?}, sync_block returned {}", msg.sender, res);
} }
} }
@@ -483,10 +490,29 @@ pub async fn handle_processors<D: Db, Pro: Processors, P: P2p>(
key: Zeroizing<<Ristretto as Ciphersuite>::F>, key: Zeroizing<<Ristretto as Ciphersuite>::F>,
serai: Arc<Serai>, serai: Arc<Serai>,
mut processors: Pro, mut processors: Pro,
tributaries: Arc<RwLock<Tributaries<D, P>>>, mut new_tributary: broadcast::Receiver<ActiveTributary<D, P>>,
) { ) {
let pub_key = Ristretto::generator() * key.deref(); let pub_key = Ristretto::generator() * key.deref();
// TODO: Merge this into the below loop. We don't need an extra task here
let tributaries = Arc::new(RwLock::new(HashMap::new()));
tokio::spawn({
let tributaries = tributaries.clone();
async move {
loop {
match new_tributary.recv().await {
Ok(tributary) => {
tributaries.write().await.insert(tributary.spec.genesis(), tributary);
}
Err(broadcast::error::RecvError::Lagged(_)) => {
panic!("handle_processors lagged to handle new_tributary")
}
Err(broadcast::error::RecvError::Closed) => panic!("new_tributary sender closed"),
}
}
}
});
loop { loop {
// TODO: Dispatch this message to a task dedicated to handling this processor, preventing one // TODO: Dispatch this message to a task dedicated to handling this processor, preventing one
// processor from holding up all the others. This would require a peek method be added to the // processor from holding up all the others. This would require a peek method be added to the
@@ -738,15 +764,13 @@ pub async fn handle_processors<D: Db, Pro: Processors, P: P2p>(
if let Some(mut tx) = tx { if let Some(mut tx) = tx {
log::trace!("processor message effected transaction {}", hex::encode(tx.hash())); log::trace!("processor message effected transaction {}", hex::encode(tx.hash()));
let tributaries = tributaries.read().await; let tributaries = tributaries.read().await;
log::trace!("read global tributaries");
let Some(tributary) = tributaries.get(&genesis) else { let Some(tributary) = tributaries.get(&genesis) else {
// TODO: This can happen since Substrate tells the Processor to generate commitments // TODO: This can happen since Substrate tells the Processor to generate commitments
// at the same time it tells the Tributary to be created // at the same time it tells the Tributary to be created
// There's no guarantee the Tributary will have been created though // There's no guarantee the Tributary will have been created though
panic!("processor is operating on tributary we don't have"); panic!("processor is operating on tributary we don't have");
}; };
let tributary = tributary.tributary.read().await; let tributary = &tributary.tributary;
log::trace!("read specific tributary");
match tx.kind() { match tx.kind() {
TransactionKind::Provided(_) => { TransactionKind::Provided(_) => {
@@ -782,7 +806,7 @@ pub async fn handle_processors<D: Db, Pro: Processors, P: P2p>(
}; };
tx.sign(&mut OsRng, genesis, &key, nonce); tx.sign(&mut OsRng, genesis, &key, nonce);
publish_signed_transaction(&tributary, tx).await; publish_signed_transaction(tributary, tx).await;
} }
} }
} }
@@ -800,7 +824,11 @@ pub async fn run<D: Db, Pro: Processors, P: P2p>(
) { ) {
let serai = Arc::new(serai); let serai = Arc::new(serai);
let (new_tributary_channel_send, new_tributary_channel_recv) = mpsc::unbounded_channel(); let (new_tributary_spec_send, mut new_tributary_spec_recv) = mpsc::unbounded_channel();
// Reload active tributaries from the database
for spec in MainDb::new(&mut raw_db).active_tributaries().1 {
new_tributary_spec_send.send(spec).unwrap();
}
// Handle new Substrate blocks // Handle new Substrate blocks
tokio::spawn(scan_substrate( tokio::spawn(scan_substrate(
@@ -808,33 +836,64 @@ pub async fn run<D: Db, Pro: Processors, P: P2p>(
key.clone(), key.clone(),
processors.clone(), processors.clone(),
serai.clone(), serai.clone(),
new_tributary_channel_send, new_tributary_spec_send,
)); ));
// Handle the Tributaries // Handle the Tributaries
// Arc so this can be shared between the Tributary scanner task and the P2P task // This should be large enough for an entire rotation of all tributaries
// Write locks on this may take a while to acquire // If it's too small, the coordinator fail to boot, which is a decent sanity check
let tributaries = Arc::new(RwLock::new(HashMap::<[u8; 32], ActiveTributary<D, P>>::new())); let (new_tributary, mut new_tributary_listener_1) = broadcast::channel(32);
let new_tributary_listener_2 = new_tributary.subscribe();
let new_tributary_listener_3 = new_tributary.subscribe();
let new_tributary_listener_4 = new_tributary.subscribe();
let new_tributary_listener_5 = new_tributary.subscribe();
// Reload active tributaries from the database // Spawn a task to further add Tributaries as needed
for spec in MainDb::new(&mut raw_db).active_tributaries().1 { tokio::spawn({
let _ = add_tributary( let raw_db = raw_db.clone();
let key = key.clone();
let processors = processors.clone();
let p2p = p2p.clone();
async move {
loop {
let spec = new_tributary_spec_recv.recv().await.unwrap();
add_tributary(
raw_db.clone(), raw_db.clone(),
key.clone(), key.clone(),
&processors, &processors,
p2p.clone(), p2p.clone(),
&mut *tributaries.write().await, &new_tributary,
spec, spec.clone(),
) )
.await; .await;
} }
}
});
// When we reach synchrony on an event requiring signing, send our preprocess for it // When we reach synchrony on an event requiring signing, send our preprocess for it
let recognized_id = { let recognized_id = {
let raw_db = raw_db.clone(); let raw_db = raw_db.clone();
let key = key.clone(); let key = key.clone();
let tributaries = Arc::new(RwLock::new(HashMap::new()));
tokio::spawn({
let tributaries = tributaries.clone(); let tributaries = tributaries.clone();
async move {
loop {
match new_tributary_listener_1.recv().await {
Ok(tributary) => {
tributaries.write().await.insert(tributary.spec.genesis(), tributary);
}
Err(broadcast::error::RecvError::Lagged(_)) => {
panic!("recognized_id lagged to handle new_tributary")
}
Err(broadcast::error::RecvError::Closed) => panic!("new_tributary sender closed"),
}
}
}
});
move |network, genesis, id_type, id, nonce| { move |network, genesis, id_type, id, nonce| {
let raw_db = raw_db.clone(); let raw_db = raw_db.clone();
let key = key.clone(); let key = key.clone();
@@ -876,8 +935,7 @@ pub async fn run<D: Db, Pro: Processors, P: P2p>(
let Some(tributary) = tributaries.get(&genesis) else { let Some(tributary) = tributaries.get(&genesis) else {
panic!("tributary we don't have came to consensus on an Batch"); panic!("tributary we don't have came to consensus on an Batch");
}; };
let tributary = tributary.tributary.read().await; publish_signed_transaction(&tributary.tributary, tx).await;
publish_signed_transaction(&tributary, tx).await;
} }
} }
}; };
@@ -892,20 +950,19 @@ pub async fn run<D: Db, Pro: Processors, P: P2p>(
p2p.clone(), p2p.clone(),
processors.clone(), processors.clone(),
serai.clone(), serai.clone(),
tributaries.clone(), new_tributary_listener_2,
new_tributary_channel_recv,
)); ));
} }
// Spawn the heartbeat task, which will trigger syncing if there hasn't been a Tributary block // Spawn the heartbeat task, which will trigger syncing if there hasn't been a Tributary block
// in a while (presumably because we're behind) // in a while (presumably because we're behind)
tokio::spawn(heartbeat_tributaries(p2p.clone(), tributaries.clone())); tokio::spawn(heartbeat_tributaries(p2p.clone(), new_tributary_listener_3));
// Handle P2P messages // Handle P2P messages
tokio::spawn(handle_p2p(Ristretto::generator() * key.deref(), p2p, tributaries.clone())); tokio::spawn(handle_p2p(Ristretto::generator() * key.deref(), p2p, new_tributary_listener_4));
// Handle all messages from processors // Handle all messages from processors
handle_processors(raw_db, key, serai, processors, tributaries).await; handle_processors(raw_db, key, serai, processors, new_tributary_listener_5).await;
} }
#[tokio::main] #[tokio::main]

View File

@@ -1,11 +1,11 @@
use core::time::Duration; use core::time::Duration;
use std::{sync::Arc, collections::HashMap}; use std::sync::Arc;
use rand_core::OsRng; use rand_core::OsRng;
use ciphersuite::{Ciphersuite, Ristretto}; use ciphersuite::{Ciphersuite, Ristretto};
use tokio::{sync::RwLock, time::sleep}; use tokio::{sync::broadcast, time::sleep};
use serai_db::MemDb; use serai_db::MemDb;
@@ -27,18 +27,18 @@ async fn handle_p2p_test() {
let mut tributaries = new_tributaries(&keys, &spec).await; let mut tributaries = new_tributaries(&keys, &spec).await;
let mut tributary_senders = vec![];
let mut tributary_arcs = vec![]; let mut tributary_arcs = vec![];
for (i, (p2p, tributary)) in tributaries.drain(..).enumerate() { for (i, (p2p, tributary)) in tributaries.drain(..).enumerate() {
let tributary = Arc::new(RwLock::new(tributary)); let tributary = Arc::new(tributary);
tributary_arcs.push(tributary.clone()); tributary_arcs.push(tributary.clone());
tokio::spawn(handle_p2p( let (new_tributary_send, new_tributary_recv) = broadcast::channel(5);
Ristretto::generator() * *keys[i], tokio::spawn(handle_p2p(Ristretto::generator() * *keys[i], p2p, new_tributary_recv));
p2p, new_tributary_send
Arc::new(RwLock::new(HashMap::from([( .send(ActiveTributary { spec: spec.clone(), tributary })
spec.genesis(), .map_err(|_| "failed to send ActiveTributary")
ActiveTributary { spec: spec.clone(), tributary }, .unwrap();
)]))), tributary_senders.push(new_tributary_send);
));
} }
let tributaries = tributary_arcs; let tributaries = tributary_arcs;
@@ -46,22 +46,22 @@ async fn handle_p2p_test() {
// We don't wait one block of time as we may have missed the chance for this block // We don't wait one block of time as we may have missed the chance for this block
sleep(Duration::from_secs((2 * Tributary::<MemDb, Transaction, LocalP2p>::block_time()).into())) sleep(Duration::from_secs((2 * Tributary::<MemDb, Transaction, LocalP2p>::block_time()).into()))
.await; .await;
let tip = tributaries[0].read().await.tip().await; let tip = tributaries[0].tip().await;
assert!(tip != spec.genesis()); assert!(tip != spec.genesis());
// Sleep one second to make sure this block propagates // Sleep one second to make sure this block propagates
sleep(Duration::from_secs(1)).await; sleep(Duration::from_secs(1)).await;
// Make sure every tributary has it // Make sure every tributary has it
for tributary in &tributaries { for tributary in &tributaries {
assert!(tributary.read().await.reader().block(&tip).is_some()); assert!(tributary.reader().block(&tip).is_some());
} }
// Then after another block of time, we should have yet another new block // Then after another block of time, we should have yet another new block
sleep(Duration::from_secs(Tributary::<MemDb, Transaction, LocalP2p>::block_time().into())).await; sleep(Duration::from_secs(Tributary::<MemDb, Transaction, LocalP2p>::block_time().into())).await;
let new_tip = tributaries[0].read().await.tip().await; let new_tip = tributaries[0].tip().await;
assert!(new_tip != tip); assert!(new_tip != tip);
sleep(Duration::from_secs(1)).await; sleep(Duration::from_secs(1)).await;
for tributary in tributaries { for tributary in tributaries {
assert!(tributary.read().await.reader().block(&new_tip).is_some()); assert!(tributary.reader().block(&new_tip).is_some());
} }
} }

View File

@@ -1,14 +1,11 @@
use core::time::Duration; use core::time::Duration;
use std::{ use std::{sync::Arc, collections::HashSet};
sync::Arc,
collections::{HashSet, HashMap},
};
use rand_core::OsRng; use rand_core::OsRng;
use ciphersuite::{group::GroupEncoding, Ciphersuite, Ristretto}; use ciphersuite::{group::GroupEncoding, Ciphersuite, Ristretto};
use tokio::{sync::RwLock, time::sleep}; use tokio::{sync::broadcast, time::sleep};
use serai_db::MemDb; use serai_db::MemDb;
@@ -37,19 +34,20 @@ async fn sync_test() {
let (syncer_p2p, syncer_tributary) = tributaries.pop().unwrap(); let (syncer_p2p, syncer_tributary) = tributaries.pop().unwrap();
// Have the rest form a P2P net // Have the rest form a P2P net
let mut tributary_senders = vec![];
let mut tributary_arcs = vec![]; let mut tributary_arcs = vec![];
let mut p2p_threads = vec![]; let mut p2p_threads = vec![];
for (i, (p2p, tributary)) in tributaries.drain(..).enumerate() { for (i, (p2p, tributary)) in tributaries.drain(..).enumerate() {
let tributary = Arc::new(RwLock::new(tributary)); let tributary = Arc::new(tributary);
tributary_arcs.push(tributary.clone()); tributary_arcs.push(tributary.clone());
let thread = tokio::spawn(handle_p2p( let (new_tributary_send, new_tributary_recv) = broadcast::channel(5);
Ristretto::generator() * *keys[i], let thread =
p2p, tokio::spawn(handle_p2p(Ristretto::generator() * *keys[i], p2p, new_tributary_recv));
Arc::new(RwLock::new(HashMap::from([( new_tributary_send
spec.genesis(), .send(ActiveTributary { spec: spec.clone(), tributary })
ActiveTributary { spec: spec.clone(), tributary }, .map_err(|_| "failed to send ActiveTributary")
)]))), .unwrap();
)); tributary_senders.push(new_tributary_send);
p2p_threads.push(thread); p2p_threads.push(thread);
} }
let tributaries = tributary_arcs; let tributaries = tributary_arcs;
@@ -60,14 +58,14 @@ async fn sync_test() {
// propose by our 'offline' validator // propose by our 'offline' validator
let block_time = u64::from(Tributary::<MemDb, Transaction, LocalP2p>::block_time()); let block_time = u64::from(Tributary::<MemDb, Transaction, LocalP2p>::block_time());
sleep(Duration::from_secs(3 * block_time)).await; sleep(Duration::from_secs(3 * block_time)).await;
let tip = tributaries[0].read().await.tip().await; let tip = tributaries[0].tip().await;
assert!(tip != spec.genesis()); assert!(tip != spec.genesis());
// Sleep one second to make sure this block propagates // Sleep one second to make sure this block propagates
sleep(Duration::from_secs(1)).await; sleep(Duration::from_secs(1)).await;
// Make sure every tributary has it // Make sure every tributary has it
for tributary in &tributaries { for tributary in &tributaries {
assert!(tributary.read().await.reader().block(&tip).is_some()); assert!(tributary.reader().block(&tip).is_some());
} }
// Now that we've confirmed the other tributaries formed a net without issue, drop the syncer's // Now that we've confirmed the other tributaries formed a net without issue, drop the syncer's
@@ -76,31 +74,36 @@ async fn sync_test() {
// Have it join the net // Have it join the net
let syncer_key = Ristretto::generator() * *syncer_key; let syncer_key = Ristretto::generator() * *syncer_key;
let syncer_tributary = Arc::new(RwLock::new(syncer_tributary)); let syncer_tributary = Arc::new(syncer_tributary);
let syncer_tributaries = Arc::new(RwLock::new(HashMap::from([( let (syncer_tributary_send, syncer_tributary_recv) = broadcast::channel(5);
spec.genesis(), tokio::spawn(handle_p2p(syncer_key, syncer_p2p.clone(), syncer_tributary_recv));
ActiveTributary { spec: spec.clone(), tributary: syncer_tributary.clone() }, syncer_tributary_send
)]))); .send(ActiveTributary { spec: spec.clone(), tributary: syncer_tributary.clone() })
tokio::spawn(handle_p2p(syncer_key, syncer_p2p.clone(), syncer_tributaries.clone())); .map_err(|_| "failed to send ActiveTributary to syncer")
.unwrap();
// It shouldn't automatically catch up. If it somehow was, our test would be broken // It shouldn't automatically catch up. If it somehow was, our test would be broken
// Sanity check this // Sanity check this
let tip = tributaries[0].read().await.tip().await; let tip = tributaries[0].tip().await;
sleep(Duration::from_secs(2 * block_time)).await; sleep(Duration::from_secs(2 * block_time)).await;
assert!(tributaries[0].read().await.tip().await != tip); assert!(tributaries[0].tip().await != tip);
assert_eq!(syncer_tributary.read().await.tip().await, spec.genesis()); assert_eq!(syncer_tributary.tip().await, spec.genesis());
// Start the heartbeat protocol // Start the heartbeat protocol
tokio::spawn(heartbeat_tributaries(syncer_p2p, syncer_tributaries)); let (syncer_heartbeat_tributary_send, syncer_heartbeat_tributary_recv) = broadcast::channel(5);
tokio::spawn(heartbeat_tributaries(syncer_p2p, syncer_heartbeat_tributary_recv));
syncer_heartbeat_tributary_send
.send(ActiveTributary { spec: spec.clone(), tributary: syncer_tributary.clone() })
.map_err(|_| "failed to send ActiveTributary to heartbeat")
.unwrap();
// The heartbeat is once every 10 blocks // The heartbeat is once every 10 blocks
sleep(Duration::from_secs(10 * block_time)).await; sleep(Duration::from_secs(10 * block_time)).await;
assert!(syncer_tributary.read().await.tip().await != spec.genesis()); assert!(syncer_tributary.tip().await != spec.genesis());
// Verify it synced to the tip // Verify it synced to the tip
let syncer_tip = { let syncer_tip = {
let tributary = tributaries[0].write().await; let tributary = &tributaries[0];
let syncer_tributary = syncer_tributary.write().await;
let tip = tributary.tip().await; let tip = tributary.tip().await;
let syncer_tip = syncer_tributary.tip().await; let syncer_tip = syncer_tributary.tip().await;
@@ -114,7 +117,7 @@ async fn sync_test() {
sleep(Duration::from_secs(block_time)).await; sleep(Duration::from_secs(block_time)).await;
// Verify it's now keeping up // Verify it's now keeping up
assert!(syncer_tributary.read().await.tip().await != syncer_tip); assert!(syncer_tributary.tip().await != syncer_tip);
// Verify it's now participating in consensus // Verify it's now participating in consensus
// Because only `t` validators are used in a commit, take n - t nodes offline // Because only `t` validators are used in a commit, take n - t nodes offline
@@ -128,7 +131,6 @@ async fn sync_test() {
// wait for a block // wait for a block
sleep(Duration::from_secs(block_time)).await; sleep(Duration::from_secs(block_time)).await;
let syncer_tributary = syncer_tributary.read().await;
if syncer_tributary if syncer_tributary
.reader() .reader()
.parsed_commit(&syncer_tributary.tip().await) .parsed_commit(&syncer_tributary.tip().await)