From 893a24a1cc615c57a1dc6d80e9f1ea21ba7c70fa Mon Sep 17 00:00:00 2001 From: Luke Parker Date: Thu, 9 Jan 2025 06:57:12 -0500 Subject: [PATCH] Better document bounds in serai-coordinator-p2p --- coordinator/p2p/src/lib.rs | 17 +++++++++++------ 1 file changed, 11 insertions(+), 6 deletions(-) diff --git a/coordinator/p2p/src/lib.rs b/coordinator/p2p/src/lib.rs index 1f7743cb..d285c8f0 100644 --- a/coordinator/p2p/src/lib.rs +++ b/coordinator/p2p/src/lib.rs @@ -59,7 +59,8 @@ pub trait P2p: Send + Sync + Clone + tributary::P2p + serai_cosign::RequestNotab /// A cancel-safe future for the next heartbeat received over the P2P network. /// /// Yields the validator set its for, the latest block hash observed, and a channel to return the - /// descending blocks. + /// descending blocks. This channel MUST NOT and will not have its receiver dropped before a + /// message is sent. fn heartbeat( &self, ) -> impl Send + Future>)>; @@ -67,6 +68,7 @@ pub trait P2p: Send + Sync + Clone + tributary::P2p + serai_cosign::RequestNotab /// A cancel-safe future for the next request for the notable cosigns of a gloabl session. /// /// Yields the global session the request is for and a channel to return the notable cosigns. + /// This channel MUST NOT and will not have its receiver dropped before a message is sent. fn notable_cosigns_request( &self, ) -> impl Send + Future>)>; @@ -100,8 +102,11 @@ fn handle_heartbeat( while (res.len() < heartbeat::MIN_BLOCKS_PER_BATCH) || (res_size < heartbeat::BATCH_SIZE_LIMIT) { let Some(block_after) = reader.block_after(&latest_block_hash) else { break }; - let block = reader.block(&block_after).unwrap().serialize(); - let commit = reader.commit(&block_after).unwrap(); + // These `break` conditions should only occur under edge cases, such as if we're actively + // deleting this Tributary due to being done with it + let Some(block) = reader.block(&block_after) else { break }; + let block = block.serialize(); + let Some(commit) = reader.commit(&block_after) else { break }; res_size += 8 + block.len() + 8 + commit.len(); res.push(TributaryBlockWithCommit { block, commit }); @@ -132,7 +137,7 @@ pub async fn run( loop { tokio::select! { tributary = add_tributary.recv() => { - let (set, tributary) = tributary.expect("add_tributary send was dropped?"); + let (set, tributary) = tributary.expect("add_tributary send was dropped"); let reader = tributary.reader(); readers.insert(set, reader.clone()); @@ -157,7 +162,7 @@ pub async fn run( }); } set = retire_tributary.recv() => { - let set = set.expect("retire_tributary send was dropped?"); + let set = set.expect("retire_tributary send was dropped"); let Some(reader) = readers.remove(&set) else { continue }; tributaries.remove(&reader.genesis()).expect("tributary reader but no tributary"); heartbeat_tasks.remove(&set).expect("tributary but no heartbeat task"); @@ -187,7 +192,7 @@ pub async fn run( // We don't call `Cosigning::intake_cosign` here as that can only be called from a single // location. We also need to intake the cosigns we produce, which means we need to merge // these streams (signing, network) somehow. That's done with this mpsc channel - send_cosigns.send(cosign).expect("channel receiving cosigns was dropped?"); + send_cosigns.send(cosign).expect("channel receiving cosigns was dropped"); } } }