Only have some nodes respond to latent heartbeats

Also only respond if they're more than 2 blocks behind to minimize redundant
sending of blocks.
This commit is contained in:
Luke Parker
2024-04-17 21:54:10 -04:00
parent dac46c8d7d
commit 6b4df4f2c0
4 changed files with 40 additions and 25 deletions

View File

@@ -3,6 +3,8 @@ use std::sync::Arc;
use rand_core::OsRng;
use ciphersuite::{Ciphersuite, Ristretto};
use tokio::{
sync::{mpsc, broadcast},
time::sleep,
@@ -35,12 +37,17 @@ async fn handle_p2p_test() {
let mut tributary_senders = vec![];
let mut tributary_arcs = vec![];
for (p2p, tributary) in tributaries.drain(..) {
for (i, (p2p, tributary)) in tributaries.drain(..).enumerate() {
let tributary = Arc::new(tributary);
tributary_arcs.push(tributary.clone());
let (new_tributary_send, new_tributary_recv) = broadcast::channel(5);
let (cosign_send, _) = mpsc::unbounded_channel();
tokio::spawn(handle_p2p_task(p2p, cosign_send, new_tributary_recv));
tokio::spawn(handle_p2p_task(
p2p,
cosign_send,
new_tributary_recv,
<Ristretto as Ciphersuite>::generator() * *keys[i],
));
new_tributary_send
.send(TributaryEvent::NewTributary(ActiveTributary { spec: spec.clone(), tributary }))
.map_err(|_| "failed to send ActiveTributary")

View File

@@ -45,12 +45,17 @@ async fn sync_test() {
let mut tributary_senders = vec![];
let mut tributary_arcs = vec![];
let mut p2p_threads = vec![];
for (p2p, tributary) in tributaries.drain(..) {
for (i, (p2p, tributary)) in tributaries.drain(..).enumerate() {
let tributary = Arc::new(tributary);
tributary_arcs.push(tributary.clone());
let (new_tributary_send, new_tributary_recv) = broadcast::channel(5);
let (cosign_send, _) = mpsc::unbounded_channel();
let thread = tokio::spawn(handle_p2p_task(p2p, cosign_send, new_tributary_recv));
let thread = tokio::spawn(handle_p2p_task(
p2p,
cosign_send,
new_tributary_recv,
<Ristretto as Ciphersuite>::generator() * *keys[i],
));
new_tributary_send
.send(TributaryEvent::NewTributary(ActiveTributary { spec: spec.clone(), tributary }))
.map_err(|_| "failed to send ActiveTributary")
@@ -86,7 +91,7 @@ async fn sync_test() {
let syncer_tributary = Arc::new(syncer_tributary);
let (syncer_tributary_send, syncer_tributary_recv) = broadcast::channel(5);
let (cosign_send, _) = mpsc::unbounded_channel();
tokio::spawn(handle_p2p_task(syncer_p2p.clone(), cosign_send, syncer_tributary_recv));
tokio::spawn(handle_p2p_task(syncer_p2p.clone(), cosign_send, syncer_tributary_recv, syncer_key));
syncer_tributary_send
.send(TributaryEvent::NewTributary(ActiveTributary {
spec: spec.clone(),