stabilize tributary chain sync

akildemir
2024-07-15 16:42:25 +03:00
parent d3d3bdc828
commit 1404446cd7


@@ -9,7 +9,7 @@ use std::{
use async_trait::async_trait;
use rand_core::{RngCore, OsRng};
use scale::Encode;
use scale::{Decode, Encode};
use borsh::{BorshSerialize, BorshDeserialize};
use serai_client::{primitives::NetworkId, validator_sets::primitives::ValidatorSet, Serai};
@@ -48,6 +48,9 @@ use crate::{Transaction, Block, Tributary, ActiveTributary, TributaryEvent};
const MAX_LIBP2P_MESSAGE_SIZE: usize = tributary::BLOCK_SIZE_LIMIT + 1024;
const LIBP2P_TOPIC: &str = "serai-coordinator";
// Number of blocks in a minute
const BLOCKS_PER_MINUTE: u32 = 60 / (tributary::tendermint::TARGET_BLOCK_TIME / 1000);
#[derive(Clone, Copy, PartialEq, Eq, Hash, Debug, BorshSerialize, BorshDeserialize)]
pub struct CosignedBlock {
pub network: NetworkId,
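As a quick sanity check on the new constant, here is a minimal sketch of the arithmetic. The 6-second target block time below is an assumption for illustration only; the real value is whatever `tributary::tendermint::TARGET_BLOCK_TIME` defines, expressed in milliseconds.

```rust
// Sketch only: TARGET_BLOCK_TIME stands in for tributary::tendermint::TARGET_BLOCK_TIME,
// assumed here to be 6_000 ms.
const TARGET_BLOCK_TIME: u32 = 6_000;
const BLOCKS_PER_MINUTE: u32 = 60 / (TARGET_BLOCK_TIME / 1000);

fn main() {
  // 60 seconds / 6 seconds per block = 10 blocks per minute, so a heartbeat
  // batch (introduced below) carries at most BLOCKS_PER_MINUTE + 1 = 11 blocks.
  assert_eq!(BLOCKS_PER_MINUTE, 10);
}
```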
@@ -173,6 +176,18 @@ pub struct Message<P: P2p> {
pub msg: Vec<u8>,
}
#[derive(Clone, Debug, Encode, Decode)]
pub struct BlockCommit {
pub block: Vec<u8>,
pub commit: Vec<u8>,
}
#[derive(Clone, Debug, Encode, Decode)]
pub struct HeartbeatBatch {
pub blocks: Vec<BlockCommit>,
pub timestamp: u64,
}
#[async_trait]
pub trait P2p: Send + Sync + Clone + fmt::Debug + TributaryP2p {
type Id: Send + Sync + Clone + Copy + fmt::Debug;
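For reference, a minimal round-trip sketch of the two new wire types. It assumes `scale` is parity-scale-codec under the rename used elsewhere in the coordinator, and the field values are placeholders.

```rust
use scale::{Decode, Encode};

#[derive(Clone, Debug, Encode, Decode)]
pub struct BlockCommit {
  pub block: Vec<u8>,
  pub commit: Vec<u8>,
}

#[derive(Clone, Debug, Encode, Decode)]
pub struct HeartbeatBatch {
  pub blocks: Vec<BlockCommit>,
  pub timestamp: u64,
}

fn main() {
  let batch = HeartbeatBatch {
    blocks: vec![BlockCommit { block: vec![1, 2, 3], commit: vec![4, 5, 6] }],
    timestamp: 1_721_050_945,
  };
  // Encode to the SCALE byte string sent as a ReqResMessageKind::Block response,
  // then decode it back.
  let bytes = batch.encode();
  let decoded = HeartbeatBatch::decode(&mut bytes.as_slice()).unwrap();
  assert_eq!(decoded.timestamp, batch.timestamp);
  assert_eq!(decoded.blocks.len(), 1);
}
```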
@@ -868,7 +883,7 @@ pub async fn handle_p2p_task<D: Db, P: P2p>(
let p2p = p2p.clone();
async move {
loop {
let Some(mut msg) = recv.recv().await else {
let Some(msg) = recv.recv().await else {
// Channel closure happens when the tributary retires
break;
};
@@ -913,35 +928,55 @@ pub async fn handle_p2p_task<D: Db, P: P2p>(
latest = next;
}
if to_send.len() > 3 {
for next in to_send {
let mut res = reader.block(&next).unwrap().serialize();
res.extend(reader.commit(&next).unwrap());
// Also include the timestamp used within the Heartbeat
res.extend(&msg.msg[32 .. 40]);
p2p.send(msg.sender, ReqResMessageKind::Block(genesis), res).await;
// Prepare the batch to send
let mut blocks = vec![];
for (i, next) in to_send.iter().enumerate() {
// each batch contains at most BLOCKS_PER_MINUTE + 1 blocks
if i >= usize::try_from(BLOCKS_PER_MINUTE).unwrap() + 1 {
break;
}
blocks.push(BlockCommit {
block: reader.block(&next).unwrap().serialize(),
commit: reader.commit(&next).unwrap(),
});
}
let batch = HeartbeatBatch { blocks, timestamp: msg_time };
p2p
.send(msg.sender, ReqResMessageKind::Block(genesis), batch.encode())
.await;
}
});
}
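The capped batch construction above could equivalently be written with iterator combinators. This is only a sketch, assuming `reader`, `to_send`, and `msg_time` are in scope exactly as in the handler:

```rust
// Same capping behaviour as the indexed loop: take at most BLOCKS_PER_MINUTE + 1 blocks.
let blocks: Vec<BlockCommit> = to_send
  .iter()
  .take(usize::try_from(BLOCKS_PER_MINUTE).unwrap() + 1)
  .map(|next| BlockCommit {
    block: reader.block(next).unwrap().serialize(),
    commit: reader.commit(next).unwrap(),
  })
  .collect();
let batch = HeartbeatBatch { blocks, timestamp: msg_time };
```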
P2pMessageKind::ReqRes(ReqResMessageKind::Block(msg_genesis)) => {
assert_eq!(msg_genesis, genesis);
let mut msg_ref: &[u8] = msg.msg.as_ref();
let Ok(block) = Block::<Transaction>::read(&mut msg_ref) else {
// decode the batch
let Ok(batch) = HeartbeatBatch::decode(&mut msg.msg.as_ref()) else {
log::error!(
"received HeartBeatBatch message with an invalidly serialized batch"
);
continue;
};
// sync blocks
for bc in batch.blocks {
// TODO: why do we use ReadWrite instead of Encode/Decode for blocks?
// Should we use the same for batches so we can read both at the same time?
let Ok(block) = Block::<Transaction>::read(&mut bc.block.as_slice()) else {
log::error!("received block message with an invalidly serialized block");
continue;
};
// Get just the commit
msg.msg.drain(.. (msg.msg.len() - msg_ref.len()));
msg.msg.drain((msg.msg.len() - 8) ..);
let res = tributary.tributary.sync_block(block, msg.msg).await;
let res = tributary.tributary.sync_block(block, bc.commit).await;
log::debug!(
"received block from {:?}, sync_block returned {}",
msg.sender,
res
);
}
}
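The body of the sync loop can also be read as a small helper. This is a hypothetical extraction, not part of the commit; it assumes the coordinator's `Tributary<D, Transaction, P>` type parameters and that `sync_block` returns the `bool` logged with `{}` above.

```rust
// Hypothetical helper mirroring the loop body: read the ReadWrite-serialized block,
// then hand block + commit to sync_block. Returns false on a malformed block.
async fn apply_block_commit<D: Db, P: P2p>(
  tributary: &Tributary<D, Transaction, P>,
  bc: BlockCommit,
) -> bool {
  let Ok(block) = Block::<Transaction>::read(&mut bc.block.as_slice()) else {
    log::error!("received block message with an invalidly serialized block");
    return false;
  };
  tributary.sync_block(block, bc.commit).await
}
```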
P2pMessageKind::Gossip(GossipMessageKind::Tributary(msg_genesis)) => {
assert_eq!(msg_genesis, genesis);