4 Commits

Author  SHA1  Message  Date

Luke Parker  47eb793ce9  Slash upon Tendermint evidence  2025-01-09 06:58:00 -05:00
  Decoding slash evidence requires specifying the instantiated generic
  `TendermintNetwork`. While irrelevant, that generic includes a type satisfying
  `tributary::P2p`. It was only possible to route now that we've redone the P2P
  API. (See the sketch following the changed-files summary below.)
Luke Parker  9b0b5fd1e2  Have serai-cosign index finalized blocks' numbers  2025-01-09 06:57:26 -05:00
Luke Parker  893a24a1cc  Better document bounds in serai-coordinator-p2p  2025-01-09 06:57:12 -05:00
Luke Parker  b101e2211a  Complete serai-coordinator-p2p  2025-01-09 06:23:14 -05:00
13 changed files with 212 additions and 103 deletions
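
The first commit's note in practice: decoding the evidence only depends on the message format, yet the decode function is generic over the full `TendermintNetwork` instantiation, which includes a `tributary::P2p` type. A minimal sketch of the shape of that call, using identifiers from the scan-task diff at the bottom of this page (not a public API; the exact signature of `decode_signed_message` is assumed):

```rust
// Sketch only: `decode_signed_message`, `TendermintNetwork`, `Transaction`, `Db`, and
// `P2p` are internal items of the coordinator/tributary crates changed in this diff.
fn decode_evidence<TD: Db, P: P2p>(first: &[u8], second: Option<&[u8]>) {
  // Naming the instantiated `TendermintNetwork<TD, Transaction, P>` is what drags in
  // the `P2p` type parameter, even though P2P is irrelevant to decoding itself.
  let _first = decode_signed_message::<TendermintNetwork<TD, Transaction, P>>(first).unwrap();
  let _second = second
    .map(|msg| decode_signed_message::<TendermintNetwork<TD, Transaction, P>>(msg).unwrap());
}
```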

Cargo.lock (generated), 24 changed lines
View File

@@ -840,18 +840,6 @@ dependencies = [
"futures-core", "futures-core",
] ]
[[package]]
name = "async-channel"
version = "2.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "89b47800b0be77592da0afd425cc03468052844aff33b84e33cc696f64e77b6a"
dependencies = [
"concurrent-queue",
"event-listener-strategy",
"futures-core",
"pin-project-lite",
]
[[package]] [[package]]
name = "async-io" name = "async-io"
version = "2.4.0" version = "2.4.0"
@@ -7465,7 +7453,7 @@ version = "0.10.0-dev"
source = "git+https://github.com/serai-dex/substrate#6e3f07bf5c98a6a3ec15f2b1a46148aa8c7d737a" source = "git+https://github.com/serai-dex/substrate#6e3f07bf5c98a6a3ec15f2b1a46148aa8c7d737a"
dependencies = [ dependencies = [
"array-bytes", "array-bytes",
"async-channel 1.9.0", "async-channel",
"async-trait", "async-trait",
"asynchronous-codec", "asynchronous-codec",
"bytes", "bytes",
@@ -7506,7 +7494,7 @@ name = "sc-network-bitswap"
version = "0.10.0-dev" version = "0.10.0-dev"
source = "git+https://github.com/serai-dex/substrate#6e3f07bf5c98a6a3ec15f2b1a46148aa8c7d737a" source = "git+https://github.com/serai-dex/substrate#6e3f07bf5c98a6a3ec15f2b1a46148aa8c7d737a"
dependencies = [ dependencies = [
"async-channel 1.9.0", "async-channel",
"cid", "cid",
"futures", "futures",
"libp2p-identity", "libp2p-identity",
@@ -7563,7 +7551,7 @@ version = "0.10.0-dev"
source = "git+https://github.com/serai-dex/substrate#6e3f07bf5c98a6a3ec15f2b1a46148aa8c7d737a" source = "git+https://github.com/serai-dex/substrate#6e3f07bf5c98a6a3ec15f2b1a46148aa8c7d737a"
dependencies = [ dependencies = [
"array-bytes", "array-bytes",
"async-channel 1.9.0", "async-channel",
"futures", "futures",
"libp2p-identity", "libp2p-identity",
"log", "log",
@@ -7584,7 +7572,7 @@ version = "0.10.0-dev"
source = "git+https://github.com/serai-dex/substrate#6e3f07bf5c98a6a3ec15f2b1a46148aa8c7d737a" source = "git+https://github.com/serai-dex/substrate#6e3f07bf5c98a6a3ec15f2b1a46148aa8c7d737a"
dependencies = [ dependencies = [
"array-bytes", "array-bytes",
"async-channel 1.9.0", "async-channel",
"async-trait", "async-trait",
"fork-tree", "fork-tree",
"futures", "futures",
@@ -7958,7 +7946,7 @@ name = "sc-utils"
version = "4.0.0-dev" version = "4.0.0-dev"
source = "git+https://github.com/serai-dex/substrate#6e3f07bf5c98a6a3ec15f2b1a46148aa8c7d737a" source = "git+https://github.com/serai-dex/substrate#6e3f07bf5c98a6a3ec15f2b1a46148aa8c7d737a"
dependencies = [ dependencies = [
"async-channel 1.9.0", "async-channel",
"futures", "futures",
"futures-timer", "futures-timer",
"lazy_static", "lazy_static",
@@ -8384,7 +8372,6 @@ dependencies = [
name = "serai-coordinator-p2p" name = "serai-coordinator-p2p"
version = "0.1.0" version = "0.1.0"
dependencies = [ dependencies = [
"async-channel 2.3.1",
"borsh", "borsh",
"futures-lite", "futures-lite",
"log", "log",
@@ -8392,6 +8379,7 @@ dependencies = [
"serai-cosign", "serai-cosign",
"serai-db", "serai-db",
"serai-task", "serai-task",
"tokio",
"tributary-chain", "tributary-chain",
] ]

View File

@@ -78,7 +78,7 @@ impl<D: Db> ContinuallyRan for CosignIntendTask<D> {
// Check we are indexing a linear chain
if (block_number > 1) &&
(<[u8; 32]>::from(block.header.parent_hash) !=
-SubstrateBlocks::get(&txn, block_number - 1)
+SubstrateBlockHash::get(&txn, block_number - 1)
.expect("indexing a block but haven't indexed its parent"))
{
Err(format!(
@@ -86,14 +86,16 @@ impl<D: Db> ContinuallyRan for CosignIntendTask<D> {
block_number - 1
))?;
}
-SubstrateBlocks::set(&mut txn, block_number, &block.hash());
+let block_hash = block.hash();
+SubstrateBlockHash::set(&mut txn, block_number, &block_hash);
+SubstrateBlockNumber::set(&mut txn, block_hash, &block_number);
let global_session_for_this_block = LatestGlobalSessionIntended::get(&txn);
// If this is notable, it creates a new global session, which we index into the database
// now
if has_events == HasEvents::Notable {
-let serai = self.serai.as_of(block.hash());
+let serai = self.serai.as_of(block_hash);
let sets_and_keys = cosigning_sets(&serai).await?;
let global_session =
GlobalSession::id(sets_and_keys.iter().map(|(set, _key)| *set).collect());
@@ -159,7 +161,7 @@ impl<D: Db> ContinuallyRan for CosignIntendTask<D> {
&CosignIntent {
global_session: global_session_for_this_block,
block_number,
-block_hash: block.hash(),
+block_hash,
notable: has_events == HasEvents::Notable,
},
);

View File

@@ -127,7 +127,8 @@ create_db! {
// The following are populated by the intend task and used throughout the library
// An index of Substrate blocks
-SubstrateBlocks: (block_number: u64) -> [u8; 32],
+SubstrateBlockHash: (block_number: u64) -> [u8; 32],
+SubstrateBlockNumber: (block_hash: [u8; 32]) -> u64,
// A mapping from a global session's ID to its relevant information.
GlobalSessions: (global_session: [u8; 32]) -> GlobalSession,
// The last block to be cosigned by a global session.
@@ -270,17 +271,24 @@ impl<D: Db> Cosigning<D> {
Ok(LatestCosignedBlockNumber::get(getter).unwrap_or(0))
}
-/// Fetch an cosigned Substrate block by its block number.
+/// Fetch a cosigned Substrate block's hash by its block number.
pub fn cosigned_block(getter: &impl Get, block_number: u64) -> Result<Option<[u8; 32]>, Faulted> {
if block_number > Self::latest_cosigned_block_number(getter)? {
return Ok(None);
}
Ok(Some(
-SubstrateBlocks::get(getter, block_number).expect("cosigned block but didn't index it"),
+SubstrateBlockHash::get(getter, block_number).expect("cosigned block but didn't index it"),
))
}
+/// Fetch a finalized block's number by its hash.
+///
+/// This block is not guaranteed to be cosigned.
+pub fn finalized_block_number(getter: &impl Get, block_hash: [u8; 32]) -> Option<u64> {
+SubstrateBlockNumber::get(getter, block_hash)
+}
/// Fetch the notable cosigns for a global session in order to respond to requests.
///
/// If this global session hasn't produced any notable cosigns, this will return the latest
@@ -345,7 +353,7 @@ impl<D: Db> Cosigning<D> {
let network = cosign.cosigner;
// Check our indexed blockchain includes a block with this block number
-let Some(our_block_hash) = SubstrateBlocks::get(&self.db, cosign.block_number) else {
+let Some(our_block_hash) = SubstrateBlockHash::get(&self.db, cosign.block_number) else {
return Ok(true);
};
let faulty = cosign.block_hash != our_block_hash;
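
A short usage sketch of the new two-way index (written against the signatures shown above; the `Db`/`Get` handles and imports are assumed):

```rust
// Sketch: resolve a finalized block's number from its hash, then check whether that
// block has been cosigned yet. `Get`, `Db`, and `Faulted` are serai-db/serai-cosign types.
fn check_block<D: Db>(getter: &impl Get, block_hash: [u8; 32]) {
  if let Some(number) = Cosigning::<D>::finalized_block_number(getter, block_hash) {
    match Cosigning::<D>::cosigned_block(getter, number) {
      // The hash index and the number index should agree
      Ok(Some(hash)) => assert_eq!(hash, block_hash),
      // Indexed as finalized, but above the latest cosigned block number
      Ok(None) => {}
      // The cosigning process has observed a fault
      Err(_faulted) => {}
    }
  }
}
```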

View File

@@ -26,8 +26,8 @@ serai-client = { path = "../../substrate/client", default-features = false, feat
serai-cosign = { path = "../cosign" }
tributary = { package = "tributary-chain", path = "../tributary" }
-async-channel = { version = "2", default-features = false, features = ["std"] }
futures-lite = { version = "2", default-features = false, features = ["std"] }
+tokio = { version = "1", default-features = false, features = ["sync", "macros"] }
log = { version = "0.4", default-features = false, features = ["std"] }
serai-task = { path = "../../common/task", version = "0.1" }

View File

@@ -1,3 +1,3 @@
# Serai Coordinator P2P
-The P2P abstraction used by Serai's coordinator.
+The P2P abstraction used by Serai's coordinator, and tasks over it.

View File

@@ -19,7 +19,7 @@ use serai_client::{
Serai,
};
-use tokio::sync::{mpsc, Mutex, RwLock};
+use tokio::sync::{mpsc, oneshot, Mutex, RwLock};
use serai_task::{Task, ContinuallyRan};
@@ -35,7 +35,7 @@ use libp2p::{
SwarmBuilder,
};
-use serai_coordinator_p2p::{oneshot, Heartbeat, TributaryBlockWithCommit};
+use serai_coordinator_p2p::{Heartbeat, TributaryBlockWithCommit};
/// A struct to sync the validators from the Serai node in order to keep track of them.
mod validators;

View File

@@ -19,7 +19,7 @@ use serai_coordinator_p2p::{Heartbeat, TributaryBlockWithCommit};
/// The maximum message size for the request-response protocol
// This is derived from the heartbeat message size as it's our largest message
pub(crate) const MAX_LIBP2P_REQRES_MESSAGE_SIZE: usize =
-(tributary::BLOCK_SIZE_LIMIT * serai_coordinator_p2p::heartbeat::BLOCKS_PER_BATCH) + 1024;
+1024 + serai_coordinator_p2p::heartbeat::BATCH_SIZE_LIMIT;
const PROTOCOL: &str = "/serai/coordinator/reqres/1.0.0";

View File

@@ -8,7 +8,7 @@ use borsh::BorshDeserialize;
use serai_client::validator_sets::primitives::ValidatorSet;
-use tokio::sync::{mpsc, RwLock};
+use tokio::sync::{mpsc, oneshot, RwLock};
use serai_task::TaskHandle;
@@ -21,7 +21,7 @@ use libp2p::{
swarm::{dial_opts::DialOpts, SwarmEvent, Swarm},
};
-use serai_coordinator_p2p::{oneshot, Heartbeat};
+use serai_coordinator_p2p::Heartbeat;
use crate::{
Peers, BehaviorEvent, Behavior,
@@ -69,11 +69,6 @@ pub(crate) struct SwarmTask {
inbound_request_response_channels: HashMap<RequestId, ResponseChannel<Response>>,
heartbeat_requests: mpsc::UnboundedSender<(RequestId, ValidatorSet, [u8; 32])>,
-/* TODO
-let cosigns = Cosigning::<D>::notable_cosigns(&self.db, global_session);
-let res = reqres::Response::NotableCosigns(cosigns);
-let _: Result<_, _> = self.swarm.behaviour_mut().reqres.send_response(channel, res);
-*/
notable_cosign_requests: mpsc::UnboundedSender<(RequestId, [u8; 32])>,
inbound_request_responses: mpsc::UnboundedReceiver<(RequestId, Response)>,
}

View File

@@ -1,7 +1,7 @@
use core::future::Future;
use std::time::{Duration, SystemTime};
-use serai_client::validator_sets::primitives::ValidatorSet;
+use serai_client::validator_sets::primitives::{MAX_KEY_SHARES_PER_SET, ValidatorSet};
use futures_lite::FutureExt;
@@ -15,19 +15,32 @@ use crate::{Heartbeat, Peer, P2p};
// Amount of blocks in a minute
const BLOCKS_PER_MINUTE: usize = (60 / (tributary::tendermint::TARGET_BLOCK_TIME / 1000)) as usize;
-/// The maximum amount of blocks to include/included within a batch.
-pub const BLOCKS_PER_BATCH: usize = BLOCKS_PER_MINUTE + 1;
+/// The minimum amount of blocks to include/included within a batch, assuming there's blocks to
+/// include in the batch.
+///
+/// This decides the size limit of the Batch (the Block size limit multiplied by the minimum amount
+/// of blocks we'll send). The actual amount of blocks sent will be the amount which fits within
+/// the size limit.
+pub const MIN_BLOCKS_PER_BATCH: usize = BLOCKS_PER_MINUTE + 1;
+/// The size limit for a batch of blocks sent in response to a Heartbeat.
+///
+/// This estimates the size of a commit as `32 + (MAX_VALIDATORS * 128)`. At the time of writing, a
+/// commit is `8 + (validators * 32) + (32 + (validators * 32))` (for the time, list of validators,
+/// and aggregate signature). Accordingly, this should be a safe over-estimate.
+pub const BATCH_SIZE_LIMIT: usize = MIN_BLOCKS_PER_BATCH *
+(tributary::BLOCK_SIZE_LIMIT + 32 + ((MAX_KEY_SHARES_PER_SET as usize) * 128));
/// Sends a heartbeat to other validators on regular intervals informing them of our Tributary's
/// tip.
///
/// If the other validator has more blocks then we do, they're expected to inform us. This forms
/// the sync protocol for our Tributaries.
-pub struct HeartbeatTask<TD: Db, Tx: TransactionTrait, P: P2p> {
-set: ValidatorSet,
-tributary: Tributary<TD, Tx, P>,
-reader: TributaryReader<TD, Tx>,
-p2p: P,
+pub(crate) struct HeartbeatTask<TD: Db, Tx: TransactionTrait, P: P2p> {
+pub(crate) set: ValidatorSet,
+pub(crate) tributary: Tributary<TD, Tx, P>,
+pub(crate) reader: TributaryReader<TD, Tx>,
+pub(crate) p2p: P,
}
impl<TD: Db, Tx: TransactionTrait, P: P2p> ContinuallyRan for HeartbeatTask<TD, Tx, P> {
@@ -80,7 +93,7 @@ impl<TD: Db, Tx: TransactionTrait, P: P2p> ContinuallyRan for HeartbeatTask<TD,
// This is the final batch if it has less than the maximum amount of blocks
// (signifying there weren't more blocks after this to fill the batch with)
-let final_batch = blocks.len() < BLOCKS_PER_BATCH;
+let final_batch = blocks.len() < MIN_BLOCKS_PER_BATCH;
// Sync each block
for block_with_commit in blocks {
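
A worked check of the commit-size estimate in the `BATCH_SIZE_LIMIT` doc comment above (a sketch; `n` is the validator count, bounded by `MAX_KEY_SHARES_PER_SET`):

```rust
// Actual commit size per the doc comment: time, list of validators, aggregate signature.
const fn actual_commit_size(n: usize) -> usize {
  8 + (n * 32) + (32 + (n * 32)) // = 40 + 64n
}
// The per-commit estimate baked into BATCH_SIZE_LIMIT.
const fn estimated_commit_size(n: usize) -> usize {
  32 + (n * 128) // = 32 + 128n
}
// For any n >= 1, 32 + 128n >= 40 + 64n, so the estimate over-covers the real commit,
// and BATCH_SIZE_LIMIT bounds MIN_BLOCKS_PER_BATCH blocks plus their commits.
const _: () = assert!(estimated_commit_size(1) >= actual_commit_size(1));
```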

View File

@@ -3,18 +3,23 @@
#![deny(missing_docs)]
use core::future::Future;
+use std::collections::HashMap;
use borsh::{BorshSerialize, BorshDeserialize};
use serai_client::{primitives::NetworkId, validator_sets::primitives::ValidatorSet};
-use serai_cosign::SignedCosign;
+use serai_db::Db;
+use tributary::{ReadWrite, TransactionTrait, Tributary, TributaryReader};
+use serai_cosign::{SignedCosign, Cosigning};
-/// A oneshot channel.
-pub mod oneshot;
+use tokio::sync::{mpsc, oneshot};
+use serai_task::{Task, ContinuallyRan};
/// The heartbeat task, effecting sync of Tributaries
pub mod heartbeat;
+use crate::heartbeat::HeartbeatTask;
/// A heartbeat for a Tributary.
#[derive(Clone, Copy, BorshSerialize, BorshDeserialize, Debug)]
@@ -54,7 +59,8 @@ pub trait P2p: Send + Sync + Clone + tributary::P2p + serai_cosign::RequestNotab
/// A cancel-safe future for the next heartbeat received over the P2P network.
///
/// Yields the validator set its for, the latest block hash observed, and a channel to return the
-/// descending blocks.
+/// descending blocks. This channel MUST NOT and will not have its receiver dropped before a
+/// message is sent.
fn heartbeat(
&self,
) -> impl Send + Future<Output = (Heartbeat, oneshot::Sender<Vec<TributaryBlockWithCommit>>)>;
@@ -62,6 +68,7 @@ pub trait P2p: Send + Sync + Clone + tributary::P2p + serai_cosign::RequestNotab
/// A cancel-safe future for the next request for the notable cosigns of a gloabl session.
///
/// Yields the global session the request is for and a channel to return the notable cosigns.
+/// This channel MUST NOT and will not have its receiver dropped before a message is sent.
fn notable_cosigns_request(
&self,
) -> impl Send + Future<Output = ([u8; 32], oneshot::Sender<Vec<SignedCosign>>)>;
@@ -74,3 +81,119 @@ pub trait P2p: Send + Sync + Clone + tributary::P2p + serai_cosign::RequestNotab
/// A cancel-safe future for the next cosign received.
fn cosign(&self) -> impl Send + Future<Output = SignedCosign>;
}
fn handle_notable_cosigns_request<D: Db>(
db: &D,
global_session: [u8; 32],
channel: oneshot::Sender<Vec<SignedCosign>>,
) {
let cosigns = Cosigning::<D>::notable_cosigns(db, global_session);
channel.send(cosigns).expect("channel listening for cosign oneshot response was dropped?");
}
fn handle_heartbeat<D: Db, T: TransactionTrait>(
reader: &TributaryReader<D, T>,
mut latest_block_hash: [u8; 32],
channel: oneshot::Sender<Vec<TributaryBlockWithCommit>>,
) {
let mut res_size = 8;
let mut res = vec![];
// This former case should be covered by this latter case
while (res.len() < heartbeat::MIN_BLOCKS_PER_BATCH) || (res_size < heartbeat::BATCH_SIZE_LIMIT) {
let Some(block_after) = reader.block_after(&latest_block_hash) else { break };
// These `break` conditions should only occur under edge cases, such as if we're actively
// deleting this Tributary due to being done with it
let Some(block) = reader.block(&block_after) else { break };
let block = block.serialize();
let Some(commit) = reader.commit(&block_after) else { break };
res_size += 8 + block.len() + 8 + commit.len();
res.push(TributaryBlockWithCommit { block, commit });
latest_block_hash = block_after;
}
channel
.send(res)
.map_err(|_| ())
.expect("channel listening for heartbeat oneshot response was dropped?");
}
/// Run the P2P instance.
///
/// `add_tributary`'s and `retire_tributary's senders, along with `send_cosigns`'s receiver, must
/// never be dropped. `retire_tributary` is not required to only be instructed with added
/// Tributaries.
pub async fn run<TD: Db, Tx: TransactionTrait, P: P2p>(
db: impl Db,
p2p: P,
mut add_tributary: mpsc::UnboundedReceiver<(ValidatorSet, Tributary<TD, Tx, P>)>,
mut retire_tributary: mpsc::UnboundedReceiver<ValidatorSet>,
send_cosigns: mpsc::UnboundedSender<SignedCosign>,
) {
let mut readers = HashMap::<ValidatorSet, TributaryReader<TD, Tx>>::new();
let mut tributaries = HashMap::<[u8; 32], mpsc::UnboundedSender<Vec<u8>>>::new();
let mut heartbeat_tasks = HashMap::<ValidatorSet, _>::new();
loop {
tokio::select! {
tributary = add_tributary.recv() => {
let (set, tributary) = tributary.expect("add_tributary send was dropped");
let reader = tributary.reader();
readers.insert(set, reader.clone());
let (heartbeat_task_def, heartbeat_task) = Task::new();
tokio::spawn(
(HeartbeatTask {
set,
tributary: tributary.clone(),
reader: reader.clone(),
p2p: p2p.clone(),
}).continually_run(heartbeat_task_def, vec![])
);
heartbeat_tasks.insert(set, heartbeat_task);
let (tributary_message_send, mut tributary_message_recv) = mpsc::unbounded_channel();
tributaries.insert(tributary.genesis(), tributary_message_send);
// For as long as this sender exists, handle the messages from it on a dedicated task
tokio::spawn(async move {
while let Some(message) = tributary_message_recv.recv().await {
tributary.handle_message(&message).await;
}
});
}
set = retire_tributary.recv() => {
let set = set.expect("retire_tributary send was dropped");
let Some(reader) = readers.remove(&set) else { continue };
tributaries.remove(&reader.genesis()).expect("tributary reader but no tributary");
heartbeat_tasks.remove(&set).expect("tributary but no heartbeat task");
}
(heartbeat, channel) = p2p.heartbeat() => {
if let Some(reader) = readers.get(&heartbeat.set) {
let reader = reader.clone(); // This is a cheap clone
// We spawn this on a task due to the DB reads needed
tokio::spawn(async move {
handle_heartbeat(&reader, heartbeat.latest_block_hash, channel)
});
}
}
(global_session, channel) = p2p.notable_cosigns_request() => {
tokio::spawn({
let db = db.clone();
async move { handle_notable_cosigns_request(&db, global_session, channel) }
});
}
(tributary, message) = p2p.tributary_message() => {
if let Some(tributary) = tributaries.get(&tributary) {
tributary.send(message).expect("tributary message recv was dropped?");
}
}
cosign = p2p.cosign() => {
// We don't call `Cosigning::intake_cosign` here as that can only be called from a single
// location. We also need to intake the cosigns we produce, which means we need to merge
// these streams (signing, network) somehow. That's done with this mpsc channel
send_cosigns.send(cosign).expect("channel receiving cosigns was dropped");
}
}
}
}
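
A wiring sketch for the new `run` entry point (not part of the diff; the concrete `Db`, `P2p` implementation, and Tributary transaction type are whatever the caller supplies, and the imports mirror the file above):

```rust
// Sketch: create the channels `run` expects, spawn it for the coordinator's lifetime,
// and hand the handles back to the caller. Per `run`'s documentation, the returned
// senders and receiver must never be dropped.
fn spawn_p2p<TD: Db + 'static, Tx: TransactionTrait + 'static, P: P2p + 'static>(
  db: impl Db + 'static,
  p2p: P,
) -> (
  mpsc::UnboundedSender<(ValidatorSet, Tributary<TD, Tx, P>)>,
  mpsc::UnboundedSender<ValidatorSet>,
  mpsc::UnboundedReceiver<SignedCosign>,
) {
  let (add_tributary_send, add_tributary_recv) = mpsc::unbounded_channel();
  let (retire_tributary_send, retire_tributary_recv) = mpsc::unbounded_channel();
  let (send_cosigns_send, send_cosigns_recv) = mpsc::unbounded_channel();
  tokio::spawn(run(db, p2p, add_tributary_recv, retire_tributary_recv, send_cosigns_send));
  (add_tributary_send, retire_tributary_send, send_cosigns_recv)
}
```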

View File

@@ -1,35 +0,0 @@
use core::{
pin::Pin,
task::{Poll, Context},
future::Future,
};
pub use async_channel::{SendError, RecvError};
/// The sender for a oneshot channel.
pub struct Sender<T: Send>(async_channel::Sender<T>);
impl<T: Send> Sender<T> {
/// Send a value down the channel.
///
/// Returns an error if the channel's receiver was dropped.
pub fn send(self, msg: T) -> Result<(), SendError<T>> {
self.0.send_blocking(msg)
}
}
/// The receiver for a oneshot channel.
pub struct Receiver<T: Send>(async_channel::Receiver<T>);
impl<T: Send> Future for Receiver<T> {
type Output = Result<T, RecvError>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let recv = self.0.recv();
futures_lite::pin!(recv);
recv.poll(cx)
}
}
/// Create a new oneshot channel.
pub fn channel<T: Send>() -> (Sender<T>, Receiver<T>) {
let (send, recv) = async_channel::bounded(1);
(Sender(send), Receiver(recv))
}
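
The module above was backed by `async-channel`; per the Cargo.toml and import changes elsewhere in this diff, its role is now filled by `tokio::sync::oneshot`. A minimal sketch of the equivalent flow with tokio:

```rust
use tokio::sync::oneshot;

async fn demo() {
  let (send, recv) = oneshot::channel::<Vec<u8>>();
  // `send` consumes the sender; it only errs (returning the value) if the receiver
  // was already dropped.
  send.send(vec![1, 2, 3]).expect("receiver dropped");
  // The receiver is itself a future, yielding `Err(RecvError)` if the sender dropped.
  assert_eq!(recv.await.expect("sender dropped"), vec![1, 2, 3]);
}
```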

View File

@@ -1,7 +1,7 @@
mod tributary;
mod p2p {
-use serai_coordinator_p2p::*;
+pub use serai_coordinator_p2p::*;
pub use serai_coordinator_libp2p_p2p::Libp2p;
}

View File

@@ -1,4 +1,4 @@
-use core::future::Future;
+use core::{marker::PhantomData, future::Future};
use std::collections::HashMap;
use ciphersuite::group::GroupEncoding;
@@ -22,20 +22,27 @@ use serai_task::ContinuallyRan;
use messages::sign::VariantSignId;
-use crate::tributary::{
-db::*,
-transaction::{SigningProtocolRound, Signed, Transaction},
+use serai_cosign::Cosigning;
+use crate::{
+p2p::P2p,
+tributary::{
+db::*,
+transaction::{SigningProtocolRound, Signed, Transaction},
+},
};
-struct ScanBlock<'a, D: DbTxn, TD: Db> {
-txn: &'a mut D,
+struct ScanBlock<'a, D: Db, DT: DbTxn, TD: Db, P: P2p> {
+_db: PhantomData<D>,
+_p2p: PhantomData<P>,
+txn: &'a mut DT,
set: ValidatorSet,
validators: &'a [SeraiAddress],
total_weight: u64,
validator_weights: &'a HashMap<SeraiAddress, u64>,
tributary: &'a TributaryReader<TD, Transaction>,
}
-impl<'a, D: DbTxn, TD: Db> ScanBlock<'a, D, TD> {
+impl<'a, D: Db, DT: DbTxn, TD: Db, P: P2p> ScanBlock<'a, D, DT, TD, P> {
fn potentially_start_cosign(&mut self) {
// Don't start a new cosigning instance if we're actively running one
if TributaryDb::actively_cosigning(self.txn, self.set) {
@@ -49,7 +56,11 @@ impl<'a, D: DbTxn, TD: Db> ScanBlock<'a, D, TD> {
return;
};
-let substrate_block_number = todo!("TODO");
+let Some(substrate_block_number) =
+Cosigning::<D>::finalized_block_number(self.txn, latest_substrate_block_to_cosign)
+else {
+panic!("cosigning a block our cosigner didn't index")
+};
// Mark us as actively cosigning
TributaryDb::start_cosigning(self.txn, self.set, substrate_block_number);
@@ -320,12 +331,11 @@ impl<'a, D: DbTxn, TD: Db> ScanBlock<'a, D, TD> {
Evidence::ConflictingMessages(first, second) => (first, Some(second)),
Evidence::InvalidPrecommit(first) | Evidence::InvalidValidRound(first) => (first, None),
};
-/* TODO
let msgs = (
-decode_signed_message::<TendermintNetwork<D, Transaction, P>>(&data.0).unwrap(),
+decode_signed_message::<TendermintNetwork<TD, Transaction, P>>(&data.0).unwrap(),
if data.1.is_some() {
Some(
-decode_signed_message::<TendermintNetwork<D, Transaction, P>>(&data.1.unwrap())
+decode_signed_message::<TendermintNetwork<TD, Transaction, P>>(&data.1.unwrap())
.unwrap(),
)
} else {
@@ -336,9 +346,11 @@ impl<'a, D: DbTxn, TD: Db> ScanBlock<'a, D, TD> {
// Since anything with evidence is fundamentally faulty behavior, not just temporal
// errors, mark the node as fatally slashed
TributaryDb::fatal_slash(
-self.txn, msgs.0.msg.sender, &format!("invalid tendermint messages: {msgs:?}"));
-*/
-todo!("TODO")
+self.txn,
+self.set,
+SeraiAddress(msgs.0.msg.sender),
+&format!("invalid tendermint messages: {msgs:?}"),
+);
}
TributaryTransaction::Application(tx) => {
self.handle_application_tx(block_number, tx);
@@ -348,15 +360,16 @@ impl<'a, D: DbTxn, TD: Db> ScanBlock<'a, D, TD> {
}
}
-struct ScanTributaryTask<D: Db, TD: Db> {
+struct ScanTributaryTask<D: Db, TD: Db, P: P2p> {
db: D,
set: ValidatorSet,
validators: Vec<SeraiAddress>,
total_weight: u64,
validator_weights: HashMap<SeraiAddress, u64>,
tributary: TributaryReader<TD, Transaction>,
+_p2p: PhantomData<P>,
}
-impl<D: Db, TD: Db> ContinuallyRan for ScanTributaryTask<D, TD> {
+impl<D: Db, TD: Db, P: P2p> ContinuallyRan for ScanTributaryTask<D, TD, P> {
fn run_iteration(&mut self) -> impl Send + Future<Output = Result<bool, String>> {
async move {
let (mut last_block_number, mut last_block_hash) =
@@ -386,6 +399,8 @@ impl<D: Db, TD: Db> ContinuallyRan for ScanTributaryTask<D, TD> {
let mut txn = self.db.txn();
(ScanBlock {
+_db: PhantomData::<D>,
+_p2p: PhantomData::<P>,
txn: &mut txn,
set: self.set,
validators: &self.validators,
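
The scan types above gain `D` and `P` parameters purely so `Cosigning::<D>` and `TendermintNetwork<TD, Transaction, P>` can be named; a self-contained sketch of that `PhantomData` pattern (illustrative names, not the coordinator's):

```rust
use core::marker::PhantomData;

// Zero-sized markers let a struct carry type parameters it only uses for type-level
// calls, without storing values of those types.
struct Scanner<D, P> {
  _db: PhantomData<D>,
  _p2p: PhantomData<P>,
  block_number: u64,
}

impl<D, P> Scanner<D, P> {
  fn new(block_number: u64) -> Self {
    Scanner { _db: PhantomData, _p2p: PhantomData, block_number }
  }
}

fn main() {
  // Instantiation requires naming concrete types for the markers.
  let scanner = Scanner::<(), ()>::new(0);
  println!("{}", scanner.block_number);
}
```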