5 Commits

Author SHA1 Message Date
Luke Parker
985261574c Add gossip behavior back to the coordinator 2025-01-03 14:00:20 -05:00
Luke Parker
3f3b0255f8 Tweak heartbeat task to run less often if there's no progress to be made 2025-01-03 13:59:14 -05:00
Luke Parker
5fc8500f8d Add task to heartbeat a tributary to the P2P code 2025-01-03 13:04:27 -05:00
Luke Parker
49c221cca2 Restore request-response code to the coordinator 2025-01-03 13:02:50 -05:00
Luke Parker
906e2fb669 Start cosigning on Cosign or Cosigned, not just on Cosigned 2025-01-03 10:30:39 -05:00
10 changed files with 426 additions and 26 deletions

1
Cargo.lock generated
View File

@@ -8328,6 +8328,7 @@ dependencies = [
"rand_core",
"schnorr-signatures",
"serai-client",
"serai-cosign",
"serai-db",
"serai-env",
"serai-message-queue",

View File

@@ -56,6 +56,8 @@ futures-util = { version = "0.3", default-features = false, features = ["std"] }
tokio = { version = "1", default-features = false, features = ["rt-multi-thread", "sync", "time", "macros"] }
libp2p = { version = "0.52", default-features = false, features = ["tokio", "tcp", "noise", "yamux", "request-response", "gossipsub", "macros"] }
serai-cosign = { path = "./cosign" }
[dev-dependencies]
tributary = { package = "tributary-chain", path = "./tributary", features = ["tests"] }
sp-application-crypto = { git = "https://github.com/serai-dex/substrate", default-features = false, features = ["std"] }

View File

@@ -1,4 +1,5 @@
mod tributary;
mod p2p;
fn main() {
todo!("TODO")

View File

@@ -0,0 +1,70 @@
use core::time::Duration;
use blake2::{Digest, Blake2s256};
use scale::Encode;
use borsh::{BorshSerialize, BorshDeserialize};
use serai_client::validator_sets::primitives::ValidatorSet;
use libp2p::gossipsub::{
IdentTopic, MessageId, MessageAuthenticity, ValidationMode, ConfigBuilder, IdentityTransform,
AllowAllSubscriptionFilter, Behaviour,
};
use serai_cosign::SignedCosign;
// Block size limit + 16 KB of space for signatures/metadata
const MAX_LIBP2P_GOSSIP_MESSAGE_SIZE: usize = tributary::BLOCK_SIZE_LIMIT + 16384;
const KEEP_ALIVE_INTERVAL: Duration = Duration::from_secs(80);
const LIBP2P_PROTOCOL: &str = "/serai/coordinator/gossip/1.0.0";
const BASE_TOPIC: &str = "/";
fn topic_for_set(set: ValidatorSet) -> IdentTopic {
IdentTopic::new(format!("/set/{}", hex::encode(set.encode())))
}
#[derive(Clone, BorshSerialize, BorshDeserialize)]
pub(crate) enum Message {
Tribuary { genesis: [u8; 32], message: Vec<u8> },
Cosign(SignedCosign),
}
pub(crate) type Behavior = Behaviour<IdentityTransform, AllowAllSubscriptionFilter>;
pub(crate) fn new_behavior() -> Behavior {
// The latency used by the Tendermint protocol, used here as the gossip epoch duration
// libp2p-rs defaults to 1 second, whereas ours will be ~2
let heartbeat_interval = tributary::tendermint::LATENCY_TIME;
// The amount of heartbeats which will occur within a single Tributary block
let heartbeats_per_block = tributary::tendermint::TARGET_BLOCK_TIME.div_ceil(heartbeat_interval);
// libp2p-rs defaults to 5, whereas ours will be ~8
let heartbeats_to_keep = 2 * heartbeats_per_block;
// libp2p-rs defaults to 3 whereas ours will be ~4
let heartbeats_to_gossip = heartbeats_per_block;
let config = ConfigBuilder::default()
.protocol_id_prefix(LIBP2P_PROTOCOL)
.history_length(usize::try_from(heartbeats_to_keep).unwrap())
.history_gossip(usize::try_from(heartbeats_to_gossip).unwrap())
.heartbeat_interval(Duration::from_millis(heartbeat_interval.into()))
.max_transmit_size(MAX_LIBP2P_GOSSIP_MESSAGE_SIZE)
.idle_timeout(KEEP_ALIVE_INTERVAL + Duration::from_secs(5))
.duplicate_cache_time(Duration::from_millis((heartbeats_to_keep * heartbeat_interval).into()))
.validation_mode(ValidationMode::Anonymous)
// Uses a content based message ID to avoid duplicates as much as possible
.message_id_fn(|msg| {
MessageId::new(&Blake2s256::digest([msg.topic.as_str().as_bytes(), &msg.data].concat()))
})
.build();
// TODO: Don't use IdentityTransform here. Authenticate using validator keys
let mut gossipsub = Behavior::new(MessageAuthenticity::Anonymous, config.unwrap()).unwrap();
// Subscribe to the base topic
let topic = IdentTopic::new(BASE_TOPIC);
let _ = gossipsub.subscribe(&topic);
gossipsub
}

View File

@@ -0,0 +1,127 @@
use core::future::Future;
use std::time::{Duration, SystemTime};
use serai_client::validator_sets::primitives::ValidatorSet;
use tributary::{ReadWrite, Block, Tributary, TributaryReader};
use serai_db::*;
use serai_task::ContinuallyRan;
use crate::{
tributary::Transaction,
p2p::{
reqres::{Request, Response},
P2p,
},
};
// Amount of blocks in a minute
const BLOCKS_PER_MINUTE: usize = (60 / (tributary::tendermint::TARGET_BLOCK_TIME / 1000)) as usize;
// Maximum amount of blocks to send in a batch of blocks
pub const BLOCKS_PER_BATCH: usize = BLOCKS_PER_MINUTE + 1;
/// Sends a heartbeat to other validators on regular intervals informing them of our Tributary's
/// tip.
///
/// If the other validator has more blocks then we do, they're expected to inform us. This forms
/// the sync protocol for our Tributaries.
struct HeartbeatTask<TD: Db> {
set: ValidatorSet,
tributary: Tributary<TD, Transaction, P2p>,
reader: TributaryReader<TD, Transaction>,
p2p: P2p,
}
impl<TD: Db> ContinuallyRan for HeartbeatTask<TD> {
fn run_iteration(&mut self) -> impl Send + Future<Output = Result<bool, String>> {
async move {
// If our blockchain hasn't had a block in the past minute, trigger the heartbeat protocol
const TIME_TO_TRIGGER_SYNCING: Duration = Duration::from_secs(60);
let mut tip = self.reader.tip();
let time_since = {
let block_time = if let Some(time_of_block) = self.reader.time_of_block(&tip) {
SystemTime::UNIX_EPOCH + Duration::from_secs(time_of_block)
} else {
// If we couldn't fetch this block's time, assume it's old
// We don't want to declare its unix time as 0 and claim it's 50+ years old though
log::warn!(
"heartbeat task couldn't fetch the time of a block, flagging it as a minute old"
);
SystemTime::now() - TIME_TO_TRIGGER_SYNCING
};
SystemTime::now().duration_since(block_time).unwrap_or(Duration::ZERO)
};
let mut tip_is_stale = false;
let mut synced_block = false;
if TIME_TO_TRIGGER_SYNCING <= time_since {
log::warn!(
"last known tributary block for {:?} was {} seconds ago",
self.set,
time_since.as_secs()
);
// This requests all peers for this network, without differentiating by session
// This should be fine as most validators should overlap across sessions
'peer: for peer in self.p2p.peers(self.set.network).await {
loop {
// Create the request for blocks
if tip_is_stale {
tip = self.reader.tip();
tip_is_stale = false;
}
let request = Request::Heartbeat { set: self.set, latest_block_hash: tip };
let Ok(Response::Blocks(blocks)) = peer.send(request).await else { continue 'peer };
// This is the final batch if it has less than the maximum amount of blocks
// (signifying there weren't more blocks after this to fill the batch with)
let final_batch = blocks.len() < BLOCKS_PER_BATCH;
// Sync each block
for block_with_commit in blocks {
let Ok(block) = Block::read(&mut block_with_commit.block.as_slice()) else {
// TODO: Disconnect/slash this peer
log::warn!("received invalid Block inside response to heartbeat");
continue 'peer;
};
// Attempt to sync the block
if !self.tributary.sync_block(block, block_with_commit.commit).await {
// The block may be invalid or may simply be stale
continue 'peer;
}
// Because we synced a block, flag the tip as stale
tip_is_stale = true;
// And that we did sync a block
synced_block = true;
}
// If this was the final batch, move on from this peer
// We could assume they were honest and we are done syncing the chain, but this is a
// bit more robust
if final_batch {
continue 'peer;
}
}
}
// This will cause the tak to be run less and less often, ensuring we aren't spamming the
// net if we legitimately aren't making progress
if !synced_block {
Err(format!(
"tried to sync blocks for {:?} since we haven't seen one in {} seconds but didn't",
self.set,
time_since.as_secs(),
))?;
}
}
Ok(synced_block)
}
}
}

View File

@@ -0,0 +1,30 @@
use serai_client::primitives::NetworkId;
mod reqres;
use reqres::{Request, Response};
mod gossip;
mod heartbeat;
struct Peer;
impl Peer {
async fn send(&self, request: Request) -> Result<Response, tokio::time::error::Elapsed> {
(async move { todo!("TODO") }).await
}
}
#[derive(Clone, Debug)]
struct P2p;
impl P2p {
async fn peers(&self, set: NetworkId) -> Vec<Peer> {
(async move { todo!("TODO") }).await
}
}
#[async_trait::async_trait]
impl tributary::P2p for P2p {
async fn broadcast(&self, genesis: [u8; 32], msg: Vec<u8>) {
todo!("TODO")
}
}

View File

@@ -0,0 +1,126 @@
use core::time::Duration;
use std::io::{self, Read};
use async_trait::async_trait;
use borsh::{BorshSerialize, BorshDeserialize};
use serai_client::validator_sets::primitives::ValidatorSet;
use futures_util::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};
use libp2p::request_response::{Codec as CodecTrait, Config, Behaviour, ProtocolSupport};
use serai_cosign::SignedCosign;
/// The maximum message size for the request-response protocol
// This is derived from the heartbeat message size as it's our largest message
const MAX_LIBP2P_REQRES_MESSAGE_SIZE: usize =
(tributary::BLOCK_SIZE_LIMIT * crate::p2p::heartbeat::BLOCKS_PER_BATCH) + 1024;
const PROTOCOL: &str = "/serai/coordinator/reqres/1.0.0";
/// Requests which can be made via the request-response protocol.
#[derive(Clone, Copy, Debug, BorshSerialize, BorshDeserialize)]
pub(crate) enum Request {
/// A keep-alive to prevent our connections from being dropped.
KeepAlive,
/// A heartbeat informing our peers of our latest block, for the specified blockchain, on regular
/// intervals.
///
/// If our peers have more blocks than us, they're expected to respond with those blocks.
Heartbeat { set: ValidatorSet, latest_block_hash: [u8; 32] },
/// A request for the notable cosigns for a global session.
NotableCosigns { global_session: [u8; 32] },
}
/// A tributary block and its commit.
#[derive(Clone, BorshSerialize, BorshDeserialize)]
pub(crate) struct TributaryBlockWithCommit {
pub(crate) block: Vec<u8>,
pub(crate) commit: Vec<u8>,
}
/// Responses which can be received via the request-response protocol.
#[derive(Clone, BorshSerialize, BorshDeserialize)]
pub(crate) enum Response {
Blocks(Vec<TributaryBlockWithCommit>),
NotableCosigns(Vec<SignedCosign>),
}
/// The codec used for the request-response protocol.
///
/// We don't use CBOR or JSON, but use borsh to create `Vec<u8>`s we then length-prefix. While
/// ideally, we'd use borsh directly with the `io` traits defined here, they're async and there
/// isn't an amenable API within borsh for incremental deserialization.
#[derive(Default, Clone, Copy, Debug)]
struct Codec;
impl Codec {
async fn read<M: BorshDeserialize>(io: &mut (impl Unpin + AsyncRead)) -> io::Result<M> {
let mut len = [0; 4];
io.read_exact(&mut len).await?;
let len = usize::try_from(u32::from_le_bytes(len)).expect("not at least a 32-bit platform?");
if len > MAX_LIBP2P_REQRES_MESSAGE_SIZE {
Err(io::Error::other("request length exceeded MAX_LIBP2P_REQRES_MESSAGE_SIZE"))?;
}
// This may be a non-trivial allocation easily causable
// While we could chunk the read, meaning we only perform the allocation as bandwidth is used,
// the max message size should be sufficiently sane
let mut buf = vec![0; len];
io.read_exact(&mut buf).await?;
let mut buf = buf.as_slice();
let res = M::deserialize(&mut buf)?;
if !buf.is_empty() {
Err(io::Error::other("p2p message had extra data appended to it"))?;
}
Ok(res)
}
async fn write(io: &mut (impl Unpin + AsyncWrite), msg: &impl BorshSerialize) -> io::Result<()> {
let msg = borsh::to_vec(msg).unwrap();
io.write_all(&u32::try_from(msg.len()).unwrap().to_le_bytes()).await?;
io.write_all(&msg).await
}
}
#[async_trait]
impl CodecTrait for Codec {
type Protocol = &'static str;
type Request = Request;
type Response = Response;
async fn read_request<R: Send + Unpin + AsyncRead>(
&mut self,
_: &Self::Protocol,
io: &mut R,
) -> io::Result<Request> {
Self::read(io).await
}
async fn read_response<R: Send + Unpin + AsyncRead>(
&mut self,
proto: &Self::Protocol,
io: &mut R,
) -> io::Result<Response> {
Self::read(io).await
}
async fn write_request<W: Send + Unpin + AsyncWrite>(
&mut self,
_: &Self::Protocol,
io: &mut W,
req: Request,
) -> io::Result<()> {
Self::write(io, &req).await
}
async fn write_response<W: Send + Unpin + AsyncWrite>(
&mut self,
proto: &Self::Protocol,
io: &mut W,
res: Response,
) -> io::Result<()> {
Self::write(io, &res).await
}
}
pub(crate) type Behavior = Behaviour<Codec>;
pub(crate) fn new_behavior() -> Behavior {
let mut config = Config::default();
config.set_request_timeout(Duration::from_secs(5));
Behavior::new([(PROTOCOL, ProtocolSupport::Full)], config)
}

View File

@@ -189,6 +189,8 @@ create_db!(
// The latest Substrate block to cosign.
LatestSubstrateBlockToCosign: (set: ValidatorSet) -> [u8; 32],
// If we're actively cosigning or not.
ActivelyCosigning: (set: ValidatorSet) -> (),
// The weight accumulated for a topic.
AccumulatedWeight: (set: ValidatorSet, topic: Topic) -> u64,
@@ -236,6 +238,33 @@ impl TributaryDb {
) {
LatestSubstrateBlockToCosign::set(txn, set, &substrate_block_hash);
}
pub(crate) fn actively_cosigning(txn: &mut impl DbTxn, set: ValidatorSet) -> bool {
ActivelyCosigning::get(txn, set).is_some()
}
pub(crate) fn start_cosigning(
txn: &mut impl DbTxn,
set: ValidatorSet,
substrate_block_number: u64,
) {
assert!(
ActivelyCosigning::get(txn, set).is_none(),
"starting cosigning while already cosigning"
);
ActivelyCosigning::set(txn, set, &());
TributaryDb::recognize_topic(
txn,
set,
Topic::Sign {
id: VariantSignId::Cosign(substrate_block_number),
attempt: 0,
round: SigningProtocolRound::Preprocess,
},
);
}
pub(crate) fn finish_cosigning(txn: &mut impl DbTxn, set: ValidatorSet) {
assert!(ActivelyCosigning::take(txn, set).is_some(), "finished cosigning but not cosigning");
}
pub(crate) fn recognize_topic(txn: &mut impl DbTxn, set: ValidatorSet, topic: Topic) {
AccumulatedWeight::set(txn, set, topic, &0);

View File

@@ -36,6 +36,34 @@ struct ScanBlock<'a, D: DbTxn, TD: Db> {
tributary: &'a TributaryReader<TD, Transaction>,
}
impl<'a, D: DbTxn, TD: Db> ScanBlock<'a, D, TD> {
fn potentially_start_cosign(&mut self) {
// Don't start a new cosigning instance if we're actively running one
if TributaryDb::actively_cosigning(self.txn, self.set) {
return;
}
// Start cosigning the latest intended-to-be-cosigned block
let Some(latest_substrate_block_to_cosign) =
TributaryDb::latest_substrate_block_to_cosign(self.txn, self.set)
else {
return;
};
let substrate_block_number = todo!("TODO");
// Mark us as actively cosigning
TributaryDb::start_cosigning(self.txn, self.set, substrate_block_number);
// Send the message for the processor to start signing
TributaryDb::send_message(
self.txn,
self.set,
messages::coordinator::CoordinatorMessage::CosignSubstrateBlock {
session: self.set.session,
block_number: substrate_block_number,
block: latest_substrate_block_to_cosign,
},
);
}
fn handle_application_tx(&mut self, block_number: u64, tx: Transaction) {
let signer = |signed: Signed| SeraiAddress(signed.signer.to_bytes());
@@ -105,41 +133,25 @@ impl<'a, D: DbTxn, TD: Db> ScanBlock<'a, D, TD> {
Transaction::Cosign { substrate_block_hash } => {
// Update the latest intended-to-be-cosigned Substrate block
TributaryDb::set_latest_substrate_block_to_cosign(self.txn, self.set, substrate_block_hash);
// TODO: If we aren't currently cosigning a block, start cosigning this one
// Start a new cosign if we weren't already working on one
self.potentially_start_cosign();
}
Transaction::Cosigned { substrate_block_hash } => {
// Start cosigning the latest intended-to-be-cosigned block
TributaryDb::finish_cosigning(self.txn, self.set);
// Fetch the latest intended-to-be-cosigned block
let Some(latest_substrate_block_to_cosign) =
TributaryDb::latest_substrate_block_to_cosign(self.txn, self.set)
else {
return;
};
// If this is the block we just cosigned, return
// If this is the block we just cosigned, return, preventing us from signing it again
if latest_substrate_block_to_cosign == substrate_block_hash {
return;
}
let substrate_block_number = todo!("TODO");
// Whitelist the topic
TributaryDb::recognize_topic(
self.txn,
self.set,
Topic::Sign {
id: VariantSignId::Cosign(substrate_block_number),
attempt: 0,
round: SigningProtocolRound::Preprocess,
},
);
// Send the message for the processor to start signing
TributaryDb::send_message(
self.txn,
self.set,
messages::coordinator::CoordinatorMessage::CosignSubstrateBlock {
session: self.set.session,
block_number: substrate_block_number,
block: substrate_block_hash,
},
);
// Since we do have a new cosign to work on, start it
self.potentially_start_cosign();
}
Transaction::SubstrateBlock { hash } => {
// Whitelist all of the IDs this Substrate block causes to be signed

View File

@@ -43,12 +43,14 @@ impl SigningProtocolRound {
}
}
/// `tributary::Signed` without the nonce.
/// `tributary::Signed` but without the nonce.
///
/// All of our nonces are deterministic to the type of transaction and fields within.
#[derive(Clone, Copy, PartialEq, Eq, Debug)]
pub struct Signed {
/// The signer.
pub signer: <Ristretto as Ciphersuite>::G,
/// The signature.
pub signature: SchnorrSignature<Ristretto>,
}