From c84931c6ae84c61ea3b9a0a384c4461f15fd51ba Mon Sep 17 00:00:00 2001 From: Luke Parker Date: Sun, 21 Apr 2024 07:26:16 -0400 Subject: [PATCH] Retry if initial dials fail, not just upon disconnect --- coordinator/src/p2p.rs | 59 ++++++++++++++++++++++++++++++++---------- 1 file changed, 45 insertions(+), 14 deletions(-) diff --git a/coordinator/src/p2p.rs b/coordinator/src/p2p.rs index 8e88f6ee..b69da990 100644 --- a/coordinator/src/p2p.rs +++ b/coordinator/src/p2p.rs @@ -294,6 +294,8 @@ impl LibP2p { // The addrs we're currently dialing, and the networks associated with them let dialing_peers = Arc::new(RwLock::new(HashMap::new())); + // The peers we're currently connected to, and the networks associated with them + let connected_peers = Arc::new(RwLock::new(HashMap::<Multiaddr, HashSet<NetworkId>>::new())); // Find and connect to peers let (connect_to_network_send, mut connect_to_network_recv) = @@ -301,12 +303,16 @@ impl LibP2p { let (to_dial_send, mut to_dial_recv) = tokio::sync::mpsc::unbounded_channel(); tokio::spawn({ let dialing_peers = dialing_peers.clone(); + let connected_peers = connected_peers.clone(); + let connect_to_network_send = connect_to_network_send.clone(); async move { loop { let connect = |network: NetworkId, addr: Multiaddr| { let dialing_peers = dialing_peers.clone(); + let connected_peers = connected_peers.clone(); let to_dial_send = to_dial_send.clone(); + let connect_to_network_send = connect_to_network_send.clone(); async move { log::info!("found peer from substrate: {addr}"); @@ -344,15 +350,37 @@ impl LibP2p { // within a temporal window tokio::spawn({ let dialing_peers = dialing_peers.clone(); + let connected_peers = connected_peers.clone(); + let connect_to_network_send = connect_to_network_send.clone(); let addr = addr.clone(); async move { tokio::time::sleep(core::time::Duration::from_secs(60)).await; let mut dialing_peers = dialing_peers.write().await; - dialing_peers.remove(&addr); + if let Some(expected_nets) = dialing_peers.remove(&addr) { 
+ log::debug!("removed addr from dialing upon timeout: {addr}"); + + // TODO: De-duplicate this below instance + // If we failed to dial and haven't gotten enough actual connections, retry + let connected_peers = connected_peers.read().await; + for net in expected_nets { + let mut remaining_peers = 0; + for nets in connected_peers.values() { + if nets.contains(&net) { + remaining_peers += 1; + } + } + // If we do not, start connecting to this network again + if remaining_peers < 3 { + connect_to_network_send.send(net).expect( + "couldn't send net to connect to due to disconnects (receiver dropped?)", + ); + } + } + } } }); - if !is_fresh_dial { + if is_fresh_dial { to_dial_send.send((addr, nets)).unwrap(); } } @@ -374,8 +402,6 @@ impl LibP2p { network, nodes.len() ); - // TODO: We weren't retry so long as we're told of sufficient nodes - // We should stop retrying when we actually connect to sufficient nodes to_retry.push(network); for node in nodes { connect(network, node).await; @@ -435,8 +461,7 @@ impl LibP2p { } async move { - // The peers we're currently connected to, and the networks associated with them - let mut connected_peers = HashMap::new(); + let connected_peers = connected_peers.clone(); let mut set_for_genesis = HashMap::new(); loop { @@ -502,20 +527,26 @@ impl LibP2p { HashSet::new() } }; - connected_peers.insert(addr.clone(), nets); + { + let mut connected_peers = connected_peers.write().await; + connected_peers.insert(addr.clone(), nets); - log::debug!( - "connection established to peer {} in connection ID {}, connected peers: {}", - &peer_id, - &connection_id, - connected_peers.len(), - ); + log::debug!( + "connection established to peer {} in connection ID {}, connected peers: {}", + &peer_id, + &connection_id, + connected_peers.len(), + ); + } } Some(SwarmEvent::ConnectionClosed { peer_id, endpoint, .. 
}) => { + let mut connected_peers = connected_peers.write().await; let nets = connected_peers .remove(endpoint.get_remote_address()) .expect("closed connection to peer which never connected"); + // Downgrade to a read lock + let connected_peers = connected_peers.downgrade(); // For each net we lost a peer for, check if we still have sufficient peers // overall @@ -558,7 +589,7 @@ impl LibP2p { addr_and_nets.expect("received address was None (sender dropped?)"); // If we've already dialed and connected to this address, don't further dial them // Just associate these networks with them - if let Some(existing_nets) = connected_peers.get_mut(&addr) { + if let Some(existing_nets) = connected_peers.write().await.get_mut(&addr) { for net in nets { existing_nets.insert(net); }