diff --git a/coordinator/src/p2p.rs b/coordinator/src/p2p.rs index ce6be688..8fe609a0 100644 --- a/coordinator/src/p2p.rs +++ b/coordinator/src/p2p.rs @@ -290,6 +290,75 @@ impl LibP2p { IdentTopic::new(format!("{LIBP2P_TOPIC}-{}", hex::encode(set.encode()))) } + // Find and connect to peers + let (pending_p2p_connections_send, mut pending_p2p_connections_recv) = + tokio::sync::mpsc::unbounded_channel(); + let (to_dial_send, mut to_dial_recv) = tokio::sync::mpsc::unbounded_channel(); + tokio::spawn({ + let pending_p2p_connections_send = pending_p2p_connections_send.clone(); + async move { + loop { + // TODO: Add better peer management logic? + { + let connect = |addr: Multiaddr| { + log::info!("found peer from substrate: {addr}"); + + let protocols = addr.iter().filter_map(|piece| match piece { + // Drop PeerIds from the Substrate P2p network + Protocol::P2p(_) => None, + // Use our own TCP port + Protocol::Tcp(_) => Some(Protocol::Tcp(PORT)), + other => Some(other), + }); + + let mut new_addr = Multiaddr::empty(); + for protocol in protocols { + new_addr.push(protocol); + } + let addr = new_addr; + log::debug!("transformed found peer: {addr}"); + + // TODO: Check this isn't a duplicate + to_dial_send.send(addr).unwrap(); + }; + + while let Some(network) = pending_p2p_connections_recv.recv().await { + if let Ok(mut nodes) = serai.p2p_validators(network).await { + // If there's an insufficient amount of nodes known, connect to all yet add it + // back and break + if nodes.len() < 3 { + log::warn!( + "insufficient amount of P2P nodes known for {:?}: {}", + network, + nodes.len() + ); + pending_p2p_connections_send.send(network).unwrap(); + for node in nodes { + connect(node); + } + break; + } + + // Randomly select up to 5 + for _ in 0 .. 5 { + if !nodes.is_empty() { + let to_connect = nodes.swap_remove( + usize::try_from(OsRng.next_u64() % u64::try_from(nodes.len()).unwrap()) + .unwrap(), + ); + connect(to_connect); + } + } + } + } + } + // Sleep 60 seconds before moving to the next iteration + tokio::time::sleep(core::time::Duration::from_secs(60)).await; + } + } + }); + + // Manage the actual swarm tokio::spawn({ let mut time_of_last_p2p_message = Instant::now(); @@ -321,66 +390,7 @@ impl LibP2p { async move { let mut set_for_genesis = HashMap::new(); - let mut pending_p2p_connections = vec![]; - // Run this task ad-infinitum loop { - // Handle pending P2P connections - // TODO: Break this out onto its own task with better peer management logic? - { - let mut connect = |addr: Multiaddr| { - log::info!("found peer from substrate: {addr}"); - - let protocols = addr.iter().filter_map(|piece| match piece { - // Drop PeerIds from the Substrate P2p network - Protocol::P2p(_) => None, - // Use our own TCP port - Protocol::Tcp(_) => Some(Protocol::Tcp(PORT)), - other => Some(other), - }); - - let mut new_addr = Multiaddr::empty(); - for protocol in protocols { - new_addr.push(protocol); - } - let addr = new_addr; - log::debug!("transformed found peer: {addr}"); - - if let Err(e) = swarm.dial(addr) { - log::warn!("dialing peer failed: {e:?}"); - } - }; - - while let Some(network) = pending_p2p_connections.pop() { - if let Ok(mut nodes) = serai.p2p_validators(network).await { - // If there's an insufficient amount of nodes known, connect to all yet add it back - // and break - if nodes.len() < 3 { - log::warn!( - "insufficient amount of P2P nodes known for {:?}: {}", - network, - nodes.len() - ); - pending_p2p_connections.push(network); - for node in nodes { - connect(node); - } - break; - } - - // Randomly select up to 5 - for _ in 0 .. 5 { - if !nodes.is_empty() { - let to_connect = nodes.swap_remove( - usize::try_from(OsRng.next_u64() % u64::try_from(nodes.len()).unwrap()) - .unwrap(), - ); - connect(to_connect); - } - } - } - } - } - let time_since_last = Instant::now().duration_since(time_of_last_p2p_message); tokio::select! { biased; @@ -392,7 +402,7 @@ impl LibP2p { let topic = topic_for_set(set); if subscribe { log::info!("subscribing to p2p messages for {set:?}"); - pending_p2p_connections.push(set.network); + pending_p2p_connections_send.send(set.network).unwrap(); set_for_genesis.insert(genesis, set); swarm.behaviour_mut().gossipsub.subscribe(&topic).unwrap(); } else { @@ -440,6 +450,14 @@ impl LibP2p { } } + // Handle peers to dial + addr = to_dial_recv.recv() => { + let addr = addr.expect("received address was None (sender dropped?)"); + if let Err(e) = swarm.dial(addr) { + log::warn!("dialing peer failed: {e:?}"); + } + } + // If it's been >80s since we've published a message, publish a KeepAlive since we're // still an active service // This is useful when we have no active tributaries and accordingly aren't sending