From 7312428a44146c457d8b9c39ad7cc813cf3f7f97 Mon Sep 17 00:00:00 2001 From: Luke Parker Date: Mon, 25 Sep 2023 22:58:40 -0400 Subject: [PATCH] P2P task per Tributary, not per message --- coordinator/src/main.rs | 233 ++++++++++++++++++++-------------------- 1 file changed, 119 insertions(+), 114 deletions(-) diff --git a/coordinator/src/main.rs b/coordinator/src/main.rs index 41ffd527..65e6ba47 100644 --- a/coordinator/src/main.rs +++ b/coordinator/src/main.rs @@ -346,134 +346,139 @@ pub async fn handle_p2p( p2p: P, mut new_tributary: broadcast::Receiver>, ) { - let tributaries = Arc::new(RwLock::new(HashMap::new())); - loop { - while let Ok(tributary) = { - match new_tributary.try_recv() { - Ok(tributary) => Ok(tributary), - Err(broadcast::error::TryRecvError::Empty) => Err(()), - Err(broadcast::error::TryRecvError::Lagged(_)) => { - panic!("handle_p2p lagged to handle new_tributary") - } - Err(broadcast::error::TryRecvError::Closed) => panic!("new_tributary sender closed"), - } - } { - // TODO: Because the below maintains a read lock, this will never process until all prior P2P - // messages were handled. That's a notable latency spike - tributaries.write().await.insert(tributary.spec.genesis(), tributary); - } + let channels = Arc::new(RwLock::new(HashMap::new())); + tokio::spawn({ + let p2p = p2p.clone(); + let channels = channels.clone(); + async move { + loop { + let tributary = new_tributary.recv().await.unwrap(); + let genesis = tributary.spec.genesis(); - let mut msg = p2p.receive().await; - // Spawn a dedicated task to handle this message, ensuring any singularly latent message - // doesn't hold everything up - // TODO: Move to one task per tributary (or two. 
One for Tendermint, one for Tributary) - tokio::spawn({ - let p2p = p2p.clone(); - let tributaries = tributaries.clone(); - async move { - match msg.kind { - P2pMessageKind::KeepAlive => {} + let (send, mut recv) = mpsc::unbounded_channel(); + channels.write().await.insert(genesis, send); - P2pMessageKind::Tributary(genesis) => { - let tributaries = tributaries.read().await; - let Some(tributary) = tributaries.get(&genesis) else { - log::debug!("received p2p message for unknown network"); - return; - }; + tokio::spawn({ + let p2p = p2p.clone(); + async move { + let mut msg: Message<P> = recv.recv().await.unwrap(); + match msg.kind { + P2pMessageKind::KeepAlive => {} - log::trace!("handling message for tributary {:?}", tributary.spec.set()); - if tributary.tributary.handle_message(&msg.msg).await { - P2p::broadcast(&p2p, msg.kind, msg.msg).await; - } - } + P2pMessageKind::Tributary(msg_genesis) => { + assert_eq!(msg_genesis, genesis); + log::trace!("handling message for tributary {:?}", tributary.spec.set()); + if tributary.tributary.handle_message(&msg.msg).await { + P2p::broadcast(&p2p, msg.kind, msg.msg).await; + } + } - // TODO2: Rate limit this per timestamp - // And/or slash on Heartbeat which justifies a response, since the node obviously was - // offline and we must now use our bandwidth to compensate for them? - P2pMessageKind::Heartbeat(genesis) => { - if msg.msg.len() != 40 { - log::error!("validator sent invalid heartbeat"); - return; - } + // TODO2: Rate limit this per timestamp + // And/or slash on Heartbeat which justifies a response, since the node obviously was + // offline and we must now use our bandwidth to compensate for them? 
+ // TODO: Dedicated task for heartbeats + P2pMessageKind::Heartbeat(msg_genesis) => { + assert_eq!(msg_genesis, genesis); + if msg.msg.len() != 40 { + log::error!("validator sent invalid heartbeat"); + return; + } - let tributaries = tributaries.read().await; - let Some(tributary) = tributaries.get(&genesis) else { - log::debug!("received heartbeat message for unknown network"); - return; - }; - let tributary_read = &tributary.tributary; + let tributary_read = &tributary.tributary; - /* - // Have sqrt(n) nodes reply with the blocks - let mut responders = (tributary.spec.n() as f32).sqrt().floor() as u64; - // Try to have at least 3 responders - if responders < 3 { - responders = tributary.spec.n().min(3).into(); - } - */ + /* + // Have sqrt(n) nodes reply with the blocks + let mut responders = (tributary.spec.n() as f32).sqrt().floor() as u64; + // Try to have at least 3 responders + if responders < 3 { + responders = tributary.spec.n().min(3).into(); + } + */ - // Have up to three nodes respond - let responders = u64::from(tributary.spec.n().min(3)); + // Have up to three nodes respond + let responders = u64::from(tributary.spec.n().min(3)); - // Decide which nodes will respond by using the latest block's hash as a mutually - // agreed upon entropy source - // This isn't a secure source of entropy, yet it's fine for this - let entropy = u64::from_le_bytes(tributary_read.tip().await[.. 8].try_into().unwrap()); - // If n = 10, responders = 3, we want start to be 0 ..= 7 (so the highest is 7, 8, 9) - // entropy % (10 + 1) - 3 = entropy % 8 = 0 ..= 7 - let start = - usize::try_from(entropy % (u64::from(tributary.spec.n() + 1) - responders)).unwrap(); - let mut selected = false; - for validator in - &tributary.spec.validators()[start .. 
(start + usize::try_from(responders).unwrap())] - { - if our_key == validator.0 { - selected = true; - break; + // Decide which nodes will respond by using the latest block's hash as a mutually + // agreed upon entropy source + // This isn't a secure source of entropy, yet it's fine for this + let entropy = + u64::from_le_bytes(tributary_read.tip().await[.. 8].try_into().unwrap()); + // If n = 10, responders = 3, we want `start` to be 0 ..= 7 + // (so the highest is 7, 8, 9) + // entropy % (10 + 1) - 3 = entropy % 8 = 0 ..= 7 + let start = + usize::try_from(entropy % (u64::from(tributary.spec.n() + 1) - responders)) + .unwrap(); + let mut selected = false; + for validator in &tributary.spec.validators() + [start .. (start + usize::try_from(responders).unwrap())] + { + if our_key == validator.0 { + selected = true; + break; + } + } + if !selected { + log::debug!("received heartbeat and not selected to respond"); + return; + } + + log::debug!("received heartbeat and selected to respond"); + + let reader = tributary_read.reader(); + + let mut latest = msg.msg[.. 32].try_into().unwrap(); + while let Some(next) = reader.block_after(&latest) { + let mut res = reader.block(&next).unwrap().serialize(); + res.extend(reader.commit(&next).unwrap()); + // Also include the timestamp used within the Heartbeat + res.extend(&msg.msg[32 .. 40]); + p2p.send(msg.sender, P2pMessageKind::Block(tributary.spec.genesis()), res).await; + latest = next; + } + } + + P2pMessageKind::Block(msg_genesis) => { + assert_eq!(msg_genesis, genesis); + let mut msg_ref: &[u8] = msg.msg.as_ref(); + let Ok(block) = Block::::read(&mut msg_ref) else { + log::error!("received block message with an invalidly serialized block"); + return; + }; + // Get just the commit + msg.msg.drain(.. 
(msg.msg.len() - msg_ref.len())); + msg.msg.drain((msg.msg.len() - 8) ..); + + let res = tributary.tributary.sync_block(block, msg.msg).await; + log::debug!("received block from {:?}, sync_block returned {}", msg.sender, res); } } - if !selected { - log::debug!("received heartbeat and not selected to respond"); - return; - } - - log::debug!("received heartbeat and selected to respond"); - - let reader = tributary_read.reader(); - - let mut latest = msg.msg[.. 32].try_into().unwrap(); - while let Some(next) = reader.block_after(&latest) { - let mut res = reader.block(&next).unwrap().serialize(); - res.extend(reader.commit(&next).unwrap()); - // Also include the timestamp used within the Heartbeat - res.extend(&msg.msg[32 .. 40]); - p2p.send(msg.sender, P2pMessageKind::Block(tributary.spec.genesis()), res).await; - latest = next; - } } + }); + } + } + }); - P2pMessageKind::Block(genesis) => { - let mut msg_ref: &[u8] = msg.msg.as_ref(); - let Ok(block) = Block::::read(&mut msg_ref) else { - log::error!("received block message with an invalidly serialized block"); - return; - }; - // Get just the commit - msg.msg.drain(.. 
(msg.msg.len() - msg_ref.len())); - msg.msg.drain((msg.msg.len() - 8) ..); - - let tributaries = tributaries.read().await; - let Some(tributary) = tributaries.get(&genesis) else { - log::debug!("received block message for unknown network"); - return; - }; - - let res = tributary.tributary.sync_block(block, msg.msg).await; - log::debug!("received block from {:?}, sync_block returned {}", msg.sender, res); - } + loop { + let msg = p2p.receive().await; + match msg.kind { + P2pMessageKind::KeepAlive => {} + P2pMessageKind::Tributary(genesis) => { + if let Some(channel) = channels.read().await.get(&genesis) { + channel.send(msg).unwrap(); } } - }); + P2pMessageKind::Heartbeat(genesis) => { + if let Some(channel) = channels.read().await.get(&genesis) { + channel.send(msg).unwrap(); + } + } + P2pMessageKind::Block(genesis) => { + if let Some(channel) = channels.read().await.get(&genesis) { + channel.send(msg).unwrap(); + } + } + } } }