mirror of
https://github.com/serai-dex/serai.git
synced 2025-12-11 21:49:26 +00:00
Fix tendermint chain sync (#581)
* fix p2p Reqres protocol * stabilize tributary chain sync * fix pr comments
This commit is contained in:
@@ -9,7 +9,7 @@ use std::{
|
|||||||
use async_trait::async_trait;
|
use async_trait::async_trait;
|
||||||
use rand_core::{RngCore, OsRng};
|
use rand_core::{RngCore, OsRng};
|
||||||
|
|
||||||
use scale::Encode;
|
use scale::{Decode, Encode};
|
||||||
use borsh::{BorshSerialize, BorshDeserialize};
|
use borsh::{BorshSerialize, BorshDeserialize};
|
||||||
use serai_client::{primitives::NetworkId, validator_sets::primitives::ValidatorSet, Serai};
|
use serai_client::{primitives::NetworkId, validator_sets::primitives::ValidatorSet, Serai};
|
||||||
|
|
||||||
@@ -29,7 +29,7 @@ use libp2p::{
|
|||||||
noise, yamux,
|
noise, yamux,
|
||||||
request_response::{
|
request_response::{
|
||||||
Codec as RrCodecTrait, Message as RrMessage, Event as RrEvent, Config as RrConfig,
|
Codec as RrCodecTrait, Message as RrMessage, Event as RrEvent, Config as RrConfig,
|
||||||
Behaviour as RrBehavior,
|
Behaviour as RrBehavior, ProtocolSupport,
|
||||||
},
|
},
|
||||||
gossipsub::{
|
gossipsub::{
|
||||||
IdentTopic, FastMessageId, MessageId, MessageAuthenticity, ValidationMode, ConfigBuilder,
|
IdentTopic, FastMessageId, MessageId, MessageAuthenticity, ValidationMode, ConfigBuilder,
|
||||||
@@ -45,9 +45,20 @@ pub(crate) use tributary::{ReadWrite, P2p as TributaryP2p};
|
|||||||
use crate::{Transaction, Block, Tributary, ActiveTributary, TributaryEvent};
|
use crate::{Transaction, Block, Tributary, ActiveTributary, TributaryEvent};
|
||||||
|
|
||||||
// Block size limit + 1 KB of space for signatures/metadata
|
// Block size limit + 1 KB of space for signatures/metadata
|
||||||
const MAX_LIBP2P_MESSAGE_SIZE: usize = tributary::BLOCK_SIZE_LIMIT + 1024;
|
const MAX_LIBP2P_GOSSIP_MESSAGE_SIZE: usize = tributary::BLOCK_SIZE_LIMIT + 1024;
|
||||||
|
|
||||||
|
const MAX_LIBP2P_REQRES_MESSAGE_SIZE: usize =
|
||||||
|
(tributary::BLOCK_SIZE_LIMIT * BLOCKS_PER_BATCH) + 1024;
|
||||||
|
|
||||||
const LIBP2P_TOPIC: &str = "serai-coordinator";
|
const LIBP2P_TOPIC: &str = "serai-coordinator";
|
||||||
|
|
||||||
|
// Amount of blocks in a minute
|
||||||
|
// We can't use tendermint::TARGET_BLOCK_TIME here to calculate this since that is a u32.
|
||||||
|
const BLOCKS_PER_MINUTE: usize = 10;
|
||||||
|
|
||||||
|
// Maximum amount of blocks to send in a batch
|
||||||
|
const BLOCKS_PER_BATCH: usize = BLOCKS_PER_MINUTE + 1;
|
||||||
|
|
||||||
#[derive(Clone, Copy, PartialEq, Eq, Hash, Debug, BorshSerialize, BorshDeserialize)]
|
#[derive(Clone, Copy, PartialEq, Eq, Hash, Debug, BorshSerialize, BorshDeserialize)]
|
||||||
pub struct CosignedBlock {
|
pub struct CosignedBlock {
|
||||||
pub network: NetworkId,
|
pub network: NetworkId,
|
||||||
@@ -173,6 +184,18 @@ pub struct Message<P: P2p> {
|
|||||||
pub msg: Vec<u8>,
|
pub msg: Vec<u8>,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[derive(Clone, Debug, Encode, Decode)]
|
||||||
|
pub struct BlockCommit {
|
||||||
|
pub block: Vec<u8>,
|
||||||
|
pub commit: Vec<u8>,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Clone, Debug, Encode, Decode)]
|
||||||
|
pub struct HeartbeatBatch {
|
||||||
|
pub blocks: Vec<BlockCommit>,
|
||||||
|
pub timestamp: u64,
|
||||||
|
}
|
||||||
|
|
||||||
#[async_trait]
|
#[async_trait]
|
||||||
pub trait P2p: Send + Sync + Clone + fmt::Debug + TributaryP2p {
|
pub trait P2p: Send + Sync + Clone + fmt::Debug + TributaryP2p {
|
||||||
type Id: Send + Sync + Clone + Copy + fmt::Debug;
|
type Id: Send + Sync + Clone + Copy + fmt::Debug;
|
||||||
@@ -228,8 +251,8 @@ impl RrCodecTrait for RrCodec {
|
|||||||
let mut len = [0; 4];
|
let mut len = [0; 4];
|
||||||
io.read_exact(&mut len).await?;
|
io.read_exact(&mut len).await?;
|
||||||
let len = usize::try_from(u32::from_le_bytes(len)).expect("not a 32-bit platform?");
|
let len = usize::try_from(u32::from_le_bytes(len)).expect("not a 32-bit platform?");
|
||||||
if len > MAX_LIBP2P_MESSAGE_SIZE {
|
if len > MAX_LIBP2P_REQRES_MESSAGE_SIZE {
|
||||||
Err(io::Error::other("request length exceeded MAX_LIBP2P_MESSAGE_SIZE"))?;
|
Err(io::Error::other("request length exceeded MAX_LIBP2P_REQRES_MESSAGE_SIZE"))?;
|
||||||
}
|
}
|
||||||
// This may be a non-trivial allocation easily causable
|
// This may be a non-trivial allocation easily causable
|
||||||
// While we could chunk the read, meaning we only perform the allocation as bandwidth is used,
|
// While we could chunk the read, meaning we only perform the allocation as bandwidth is used,
|
||||||
@@ -297,7 +320,7 @@ impl LibP2p {
|
|||||||
let throwaway_key_pair = Keypair::generate_ed25519();
|
let throwaway_key_pair = Keypair::generate_ed25519();
|
||||||
|
|
||||||
let behavior = Behavior {
|
let behavior = Behavior {
|
||||||
reqres: { RrBehavior::new([], RrConfig::default()) },
|
reqres: { RrBehavior::new([("/coordinator", ProtocolSupport::Full)], RrConfig::default()) },
|
||||||
gossipsub: {
|
gossipsub: {
|
||||||
let heartbeat_interval = tributary::tendermint::LATENCY_TIME / 2;
|
let heartbeat_interval = tributary::tendermint::LATENCY_TIME / 2;
|
||||||
let heartbeats_per_block =
|
let heartbeats_per_block =
|
||||||
@@ -308,7 +331,7 @@ impl LibP2p {
|
|||||||
.heartbeat_interval(Duration::from_millis(heartbeat_interval.into()))
|
.heartbeat_interval(Duration::from_millis(heartbeat_interval.into()))
|
||||||
.history_length(heartbeats_per_block * 2)
|
.history_length(heartbeats_per_block * 2)
|
||||||
.history_gossip(heartbeats_per_block)
|
.history_gossip(heartbeats_per_block)
|
||||||
.max_transmit_size(MAX_LIBP2P_MESSAGE_SIZE)
|
.max_transmit_size(MAX_LIBP2P_GOSSIP_MESSAGE_SIZE)
|
||||||
// We send KeepAlive after 80s
|
// We send KeepAlive after 80s
|
||||||
.idle_timeout(Duration::from_secs(85))
|
.idle_timeout(Duration::from_secs(85))
|
||||||
.validation_mode(ValidationMode::Strict)
|
.validation_mode(ValidationMode::Strict)
|
||||||
@@ -348,10 +371,11 @@ impl LibP2p {
|
|||||||
.with_tcp(TcpConfig::default().nodelay(true), noise::Config::new, || {
|
.with_tcp(TcpConfig::default().nodelay(true), noise::Config::new, || {
|
||||||
let mut config = yamux::Config::default();
|
let mut config = yamux::Config::default();
|
||||||
// 1 MiB default + max message size
|
// 1 MiB default + max message size
|
||||||
config.set_max_buffer_size((1024 * 1024) + MAX_LIBP2P_MESSAGE_SIZE);
|
config.set_max_buffer_size((1024 * 1024) + MAX_LIBP2P_REQRES_MESSAGE_SIZE);
|
||||||
// 256 KiB default + max message size
|
// 256 KiB default + max message size
|
||||||
config
|
config.set_receive_window_size(
|
||||||
.set_receive_window_size(((256 * 1024) + MAX_LIBP2P_MESSAGE_SIZE).try_into().unwrap());
|
((256 * 1024) + MAX_LIBP2P_REQRES_MESSAGE_SIZE).try_into().unwrap(),
|
||||||
|
);
|
||||||
config
|
config
|
||||||
})
|
})
|
||||||
.unwrap()
|
.unwrap()
|
||||||
@@ -868,7 +892,7 @@ pub async fn handle_p2p_task<D: Db, P: P2p>(
|
|||||||
let p2p = p2p.clone();
|
let p2p = p2p.clone();
|
||||||
async move {
|
async move {
|
||||||
loop {
|
loop {
|
||||||
let Some(mut msg) = recv.recv().await else {
|
let Some(msg) = recv.recv().await else {
|
||||||
// Channel closure happens when the tributary retires
|
// Channel closure happens when the tributary retires
|
||||||
break;
|
break;
|
||||||
};
|
};
|
||||||
@@ -913,35 +937,54 @@ pub async fn handle_p2p_task<D: Db, P: P2p>(
|
|||||||
latest = next;
|
latest = next;
|
||||||
}
|
}
|
||||||
if to_send.len() > 3 {
|
if to_send.len() > 3 {
|
||||||
for next in to_send {
|
// prepare the batch to sends
|
||||||
let mut res = reader.block(&next).unwrap().serialize();
|
let mut blocks = vec![];
|
||||||
res.extend(reader.commit(&next).unwrap());
|
for (i, next) in to_send.iter().enumerate() {
|
||||||
// Also include the timestamp used within the Heartbeat
|
if i >= BLOCKS_PER_BATCH {
|
||||||
res.extend(&msg.msg[32 .. 40]);
|
break;
|
||||||
p2p.send(msg.sender, ReqResMessageKind::Block(genesis), res).await;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
blocks.push(BlockCommit {
|
||||||
|
block: reader.block(next).unwrap().serialize(),
|
||||||
|
commit: reader.commit(next).unwrap(),
|
||||||
|
});
|
||||||
|
}
|
||||||
|
let batch = HeartbeatBatch { blocks, timestamp: msg_time };
|
||||||
|
|
||||||
|
p2p
|
||||||
|
.send(msg.sender, ReqResMessageKind::Block(genesis), batch.encode())
|
||||||
|
.await;
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
P2pMessageKind::ReqRes(ReqResMessageKind::Block(msg_genesis)) => {
|
P2pMessageKind::ReqRes(ReqResMessageKind::Block(msg_genesis)) => {
|
||||||
assert_eq!(msg_genesis, genesis);
|
assert_eq!(msg_genesis, genesis);
|
||||||
let mut msg_ref: &[u8] = msg.msg.as_ref();
|
// decode the batch
|
||||||
let Ok(block) = Block::<Transaction>::read(&mut msg_ref) else {
|
let Ok(batch) = HeartbeatBatch::decode(&mut msg.msg.as_ref()) else {
|
||||||
|
log::error!(
|
||||||
|
"received HeartBeatBatch message with an invalidly serialized batch"
|
||||||
|
);
|
||||||
|
continue;
|
||||||
|
};
|
||||||
|
|
||||||
|
// sync blocks
|
||||||
|
for bc in batch.blocks {
|
||||||
|
// TODO: why do we use ReadWrite instead of Encode/Decode for blocks?
|
||||||
|
// Should we use the same for batches so we can read both at the same time?
|
||||||
|
let Ok(block) = Block::<Transaction>::read(&mut bc.block.as_slice()) else {
|
||||||
log::error!("received block message with an invalidly serialized block");
|
log::error!("received block message with an invalidly serialized block");
|
||||||
continue;
|
continue;
|
||||||
};
|
};
|
||||||
// Get just the commit
|
|
||||||
msg.msg.drain(.. (msg.msg.len() - msg_ref.len()));
|
|
||||||
msg.msg.drain((msg.msg.len() - 8) ..);
|
|
||||||
|
|
||||||
let res = tributary.tributary.sync_block(block, msg.msg).await;
|
let res = tributary.tributary.sync_block(block, bc.commit).await;
|
||||||
log::debug!(
|
log::debug!(
|
||||||
"received block from {:?}, sync_block returned {}",
|
"received block from {:?}, sync_block returned {}",
|
||||||
msg.sender,
|
msg.sender,
|
||||||
res
|
res
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
P2pMessageKind::Gossip(GossipMessageKind::Tributary(msg_genesis)) => {
|
P2pMessageKind::Gossip(GossipMessageKind::Tributary(msg_genesis)) => {
|
||||||
assert_eq!(msg_genesis, genesis);
|
assert_eq!(msg_genesis, genesis);
|
||||||
|
|||||||
Reference in New Issue
Block a user