mirror of
https://github.com/serai-dex/serai.git
synced 2025-12-08 12:19:24 +00:00
Better document bounds in serai-coordinator-p2p
This commit is contained in:
@@ -59,7 +59,8 @@ pub trait P2p: Send + Sync + Clone + tributary::P2p + serai_cosign::RequestNotab
|
|||||||
/// A cancel-safe future for the next heartbeat received over the P2P network.
|
/// A cancel-safe future for the next heartbeat received over the P2P network.
|
||||||
///
|
///
|
||||||
/// Yields the validator set its for, the latest block hash observed, and a channel to return the
|
/// Yields the validator set its for, the latest block hash observed, and a channel to return the
|
||||||
/// descending blocks.
|
/// descending blocks. This channel MUST NOT and will not have its receiver dropped before a
|
||||||
|
/// message is sent.
|
||||||
fn heartbeat(
|
fn heartbeat(
|
||||||
&self,
|
&self,
|
||||||
) -> impl Send + Future<Output = (Heartbeat, oneshot::Sender<Vec<TributaryBlockWithCommit>>)>;
|
) -> impl Send + Future<Output = (Heartbeat, oneshot::Sender<Vec<TributaryBlockWithCommit>>)>;
|
||||||
@@ -67,6 +68,7 @@ pub trait P2p: Send + Sync + Clone + tributary::P2p + serai_cosign::RequestNotab
|
|||||||
/// A cancel-safe future for the next request for the notable cosigns of a gloabl session.
|
/// A cancel-safe future for the next request for the notable cosigns of a gloabl session.
|
||||||
///
|
///
|
||||||
/// Yields the global session the request is for and a channel to return the notable cosigns.
|
/// Yields the global session the request is for and a channel to return the notable cosigns.
|
||||||
|
/// This channel MUST NOT and will not have its receiver dropped before a message is sent.
|
||||||
fn notable_cosigns_request(
|
fn notable_cosigns_request(
|
||||||
&self,
|
&self,
|
||||||
) -> impl Send + Future<Output = ([u8; 32], oneshot::Sender<Vec<SignedCosign>>)>;
|
) -> impl Send + Future<Output = ([u8; 32], oneshot::Sender<Vec<SignedCosign>>)>;
|
||||||
@@ -100,8 +102,11 @@ fn handle_heartbeat<D: Db, T: TransactionTrait>(
|
|||||||
while (res.len() < heartbeat::MIN_BLOCKS_PER_BATCH) || (res_size < heartbeat::BATCH_SIZE_LIMIT) {
|
while (res.len() < heartbeat::MIN_BLOCKS_PER_BATCH) || (res_size < heartbeat::BATCH_SIZE_LIMIT) {
|
||||||
let Some(block_after) = reader.block_after(&latest_block_hash) else { break };
|
let Some(block_after) = reader.block_after(&latest_block_hash) else { break };
|
||||||
|
|
||||||
let block = reader.block(&block_after).unwrap().serialize();
|
// These `break` conditions should only occur under edge cases, such as if we're actively
|
||||||
let commit = reader.commit(&block_after).unwrap();
|
// deleting this Tributary due to being done with it
|
||||||
|
let Some(block) = reader.block(&block_after) else { break };
|
||||||
|
let block = block.serialize();
|
||||||
|
let Some(commit) = reader.commit(&block_after) else { break };
|
||||||
res_size += 8 + block.len() + 8 + commit.len();
|
res_size += 8 + block.len() + 8 + commit.len();
|
||||||
res.push(TributaryBlockWithCommit { block, commit });
|
res.push(TributaryBlockWithCommit { block, commit });
|
||||||
|
|
||||||
@@ -132,7 +137,7 @@ pub async fn run<TD: Db, Tx: TransactionTrait, P: P2p>(
|
|||||||
loop {
|
loop {
|
||||||
tokio::select! {
|
tokio::select! {
|
||||||
tributary = add_tributary.recv() => {
|
tributary = add_tributary.recv() => {
|
||||||
let (set, tributary) = tributary.expect("add_tributary send was dropped?");
|
let (set, tributary) = tributary.expect("add_tributary send was dropped");
|
||||||
let reader = tributary.reader();
|
let reader = tributary.reader();
|
||||||
readers.insert(set, reader.clone());
|
readers.insert(set, reader.clone());
|
||||||
|
|
||||||
@@ -157,7 +162,7 @@ pub async fn run<TD: Db, Tx: TransactionTrait, P: P2p>(
|
|||||||
});
|
});
|
||||||
}
|
}
|
||||||
set = retire_tributary.recv() => {
|
set = retire_tributary.recv() => {
|
||||||
let set = set.expect("retire_tributary send was dropped?");
|
let set = set.expect("retire_tributary send was dropped");
|
||||||
let Some(reader) = readers.remove(&set) else { continue };
|
let Some(reader) = readers.remove(&set) else { continue };
|
||||||
tributaries.remove(&reader.genesis()).expect("tributary reader but no tributary");
|
tributaries.remove(&reader.genesis()).expect("tributary reader but no tributary");
|
||||||
heartbeat_tasks.remove(&set).expect("tributary but no heartbeat task");
|
heartbeat_tasks.remove(&set).expect("tributary but no heartbeat task");
|
||||||
@@ -187,7 +192,7 @@ pub async fn run<TD: Db, Tx: TransactionTrait, P: P2p>(
|
|||||||
// We don't call `Cosigning::intake_cosign` here as that can only be called from a single
|
// We don't call `Cosigning::intake_cosign` here as that can only be called from a single
|
||||||
// location. We also need to intake the cosigns we produce, which means we need to merge
|
// location. We also need to intake the cosigns we produce, which means we need to merge
|
||||||
// these streams (signing, network) somehow. That's done with this mpsc channel
|
// these streams (signing, network) somehow. That's done with this mpsc channel
|
||||||
send_cosigns.send(cosign).expect("channel receiving cosigns was dropped?");
|
send_cosigns.send(cosign).expect("channel receiving cosigns was dropped");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user