mirror of
https://github.com/serai-dex/serai.git
synced 2025-12-08 04:09:23 +00:00
Compare commits
31 Commits
2ffdd2a01d
...
testnet-2
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
2ba6d77ee7 | ||
|
|
67a0ff825b | ||
|
|
6518379981 | ||
|
|
0c6ab50e35 | ||
|
|
f73ce37e18 | ||
|
|
973dcf065e | ||
|
|
8f5aaa8492 | ||
|
|
93ba8d840a | ||
|
|
485e454680 | ||
|
|
c3b6abf020 | ||
|
|
f3ccf1cab0 | ||
|
|
0deee0ec6b | ||
|
|
6b428948d4 | ||
|
|
6986257d4f | ||
|
|
a3c37cba21 | ||
|
|
b5f2ff1397 | ||
|
|
c84931c6ae | ||
|
|
63abf2d022 | ||
|
|
a62d2d05ad | ||
|
|
967cc16748 | ||
|
|
ab4b8cc2d5 | ||
|
|
387ccbad3a | ||
|
|
26cdfdd824 | ||
|
|
68e77384ac | ||
|
|
68da88c1f3 | ||
|
|
2b481ab71e | ||
|
|
05e6d81948 | ||
|
|
e426cd00bd | ||
|
|
09e3881b7d | ||
|
|
10124ac4a8 | ||
|
|
1987983f88 |
1
Cargo.lock
generated
1
Cargo.lock
generated
@@ -7613,6 +7613,7 @@ dependencies = [
|
||||
"futures-util",
|
||||
"hex",
|
||||
"jsonrpsee",
|
||||
"libp2p",
|
||||
"pallet-transaction-payment-rpc",
|
||||
"rand_core",
|
||||
"sc-authority-discovery",
|
||||
|
||||
@@ -51,7 +51,7 @@ env_logger = { version = "0.10", default-features = false, features = ["humantim
|
||||
|
||||
futures-util = { version = "0.3", default-features = false, features = ["std"] }
|
||||
tokio = { version = "1", default-features = false, features = ["rt-multi-thread", "sync", "time", "macros"] }
|
||||
libp2p = { version = "0.52", default-features = false, features = ["tokio", "tcp", "noise", "yamux", "gossipsub", "macros"] }
|
||||
libp2p = { version = "0.52", default-features = false, features = ["tokio", "tcp", "noise", "yamux", "request-response", "gossipsub", "macros"] }
|
||||
|
||||
[dev-dependencies]
|
||||
tributary = { package = "tributary-chain", path = "./tributary", features = ["tests"] }
|
||||
|
||||
@@ -22,7 +22,7 @@ use serai_db::{Get, DbTxn, Db, create_db};
|
||||
use processor_messages::coordinator::cosign_block_msg;
|
||||
|
||||
use crate::{
|
||||
p2p::{CosignedBlock, P2pMessageKind, P2p},
|
||||
p2p::{CosignedBlock, GossipMessageKind, P2p},
|
||||
substrate::LatestCosignedBlock,
|
||||
};
|
||||
|
||||
@@ -323,7 +323,7 @@ impl<D: Db> CosignEvaluator<D> {
|
||||
for cosign in cosigns {
|
||||
let mut buf = vec![];
|
||||
cosign.serialize(&mut buf).unwrap();
|
||||
P2p::broadcast(&p2p, P2pMessageKind::CosignedBlock, buf).await;
|
||||
P2p::broadcast(&p2p, GossipMessageKind::CosignedBlock, buf).await;
|
||||
}
|
||||
sleep(Duration::from_secs(60)).await;
|
||||
}
|
||||
|
||||
@@ -260,7 +260,7 @@ async fn handle_processor_message<D: Db, P: P2p>(
|
||||
cosign_channel.send(cosigned_block).unwrap();
|
||||
let mut buf = vec![];
|
||||
cosigned_block.serialize(&mut buf).unwrap();
|
||||
P2p::broadcast(p2p, P2pMessageKind::CosignedBlock, buf).await;
|
||||
P2p::broadcast(p2p, GossipMessageKind::CosignedBlock, buf).await;
|
||||
None
|
||||
}
|
||||
// This causes an action on Substrate yet not on any Tributary
|
||||
|
||||
@@ -1,8 +1,8 @@
|
||||
use core::{time::Duration, fmt};
|
||||
use std::{
|
||||
sync::Arc,
|
||||
io::Read,
|
||||
collections::HashMap,
|
||||
io::{self, Read},
|
||||
collections::{HashSet, HashMap},
|
||||
time::{SystemTime, Instant},
|
||||
};
|
||||
|
||||
@@ -15,7 +15,7 @@ use serai_client::{primitives::NetworkId, validator_sets::primitives::ValidatorS
|
||||
|
||||
use serai_db::Db;
|
||||
|
||||
use futures_util::StreamExt;
|
||||
use futures_util::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt, StreamExt};
|
||||
use tokio::{
|
||||
sync::{Mutex, RwLock, mpsc, broadcast},
|
||||
time::sleep,
|
||||
@@ -27,12 +27,16 @@ use libp2p::{
|
||||
PeerId,
|
||||
tcp::Config as TcpConfig,
|
||||
noise, yamux,
|
||||
request_response::{
|
||||
Codec as RrCodecTrait, Message as RrMessage, Event as RrEvent, Config as RrConfig,
|
||||
Behaviour as RrBehavior,
|
||||
},
|
||||
gossipsub::{
|
||||
IdentTopic, FastMessageId, MessageId, MessageAuthenticity, ValidationMode, ConfigBuilder,
|
||||
IdentityTransform, AllowAllSubscriptionFilter, Event as GsEvent, PublishError,
|
||||
Behaviour as GsBehavior,
|
||||
},
|
||||
swarm::{NetworkBehaviour, SwarmEvent, Swarm},
|
||||
swarm::{NetworkBehaviour, SwarmEvent},
|
||||
SwarmBuilder,
|
||||
};
|
||||
|
||||
@@ -40,6 +44,8 @@ pub(crate) use tributary::{ReadWrite, P2p as TributaryP2p};
|
||||
|
||||
use crate::{Transaction, Block, Tributary, ActiveTributary, TributaryEvent};
|
||||
|
||||
// Block size limit + 1 KB of space for signatures/metadata
|
||||
const MAX_LIBP2P_MESSAGE_SIZE: usize = tributary::BLOCK_SIZE_LIMIT + 1024;
|
||||
const LIBP2P_TOPIC: &str = "serai-coordinator";
|
||||
|
||||
#[derive(Clone, Copy, PartialEq, Eq, Hash, Debug, BorshSerialize, BorshDeserialize)]
|
||||
@@ -51,72 +57,113 @@ pub struct CosignedBlock {
|
||||
}
|
||||
|
||||
#[derive(Clone, Copy, PartialEq, Eq, Hash, Debug)]
|
||||
pub enum P2pMessageKind {
|
||||
pub enum ReqResMessageKind {
|
||||
KeepAlive,
|
||||
Tributary([u8; 32]),
|
||||
Heartbeat([u8; 32]),
|
||||
Block([u8; 32]),
|
||||
}
|
||||
|
||||
impl ReqResMessageKind {
|
||||
pub fn read<R: Read>(reader: &mut R) -> Option<ReqResMessageKind> {
|
||||
let mut kind = [0; 1];
|
||||
reader.read_exact(&mut kind).ok()?;
|
||||
match kind[0] {
|
||||
0 => Some(ReqResMessageKind::KeepAlive),
|
||||
1 => Some({
|
||||
let mut genesis = [0; 32];
|
||||
reader.read_exact(&mut genesis).ok()?;
|
||||
ReqResMessageKind::Heartbeat(genesis)
|
||||
}),
|
||||
2 => Some({
|
||||
let mut genesis = [0; 32];
|
||||
reader.read_exact(&mut genesis).ok()?;
|
||||
ReqResMessageKind::Block(genesis)
|
||||
}),
|
||||
_ => None,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn serialize(&self) -> Vec<u8> {
|
||||
match self {
|
||||
ReqResMessageKind::KeepAlive => vec![0],
|
||||
ReqResMessageKind::Heartbeat(genesis) => {
|
||||
let mut res = vec![1];
|
||||
res.extend(genesis);
|
||||
res
|
||||
}
|
||||
ReqResMessageKind::Block(genesis) => {
|
||||
let mut res = vec![2];
|
||||
res.extend(genesis);
|
||||
res
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone, Copy, PartialEq, Eq, Hash, Debug)]
|
||||
pub enum GossipMessageKind {
|
||||
Tributary([u8; 32]),
|
||||
CosignedBlock,
|
||||
}
|
||||
|
||||
impl GossipMessageKind {
|
||||
pub fn read<R: Read>(reader: &mut R) -> Option<GossipMessageKind> {
|
||||
let mut kind = [0; 1];
|
||||
reader.read_exact(&mut kind).ok()?;
|
||||
match kind[0] {
|
||||
0 => Some({
|
||||
let mut genesis = [0; 32];
|
||||
reader.read_exact(&mut genesis).ok()?;
|
||||
GossipMessageKind::Tributary(genesis)
|
||||
}),
|
||||
1 => Some(GossipMessageKind::CosignedBlock),
|
||||
_ => None,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn serialize(&self) -> Vec<u8> {
|
||||
match self {
|
||||
GossipMessageKind::Tributary(genesis) => {
|
||||
let mut res = vec![0];
|
||||
res.extend(genesis);
|
||||
res
|
||||
}
|
||||
GossipMessageKind::CosignedBlock => {
|
||||
vec![1]
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone, Copy, PartialEq, Eq, Hash, Debug)]
|
||||
pub enum P2pMessageKind {
|
||||
ReqRes(ReqResMessageKind),
|
||||
Gossip(GossipMessageKind),
|
||||
}
|
||||
|
||||
impl P2pMessageKind {
|
||||
fn genesis(&self) -> Option<[u8; 32]> {
|
||||
match self {
|
||||
P2pMessageKind::KeepAlive | P2pMessageKind::CosignedBlock => None,
|
||||
P2pMessageKind::Tributary(genesis) |
|
||||
P2pMessageKind::Heartbeat(genesis) |
|
||||
P2pMessageKind::Block(genesis) => Some(*genesis),
|
||||
}
|
||||
}
|
||||
|
||||
fn serialize(&self) -> Vec<u8> {
|
||||
match self {
|
||||
P2pMessageKind::KeepAlive => vec![0],
|
||||
P2pMessageKind::Tributary(genesis) => {
|
||||
let mut res = vec![1];
|
||||
res.extend(genesis);
|
||||
res
|
||||
}
|
||||
P2pMessageKind::Heartbeat(genesis) => {
|
||||
let mut res = vec![2];
|
||||
res.extend(genesis);
|
||||
res
|
||||
}
|
||||
P2pMessageKind::Block(genesis) => {
|
||||
let mut res = vec![3];
|
||||
res.extend(genesis);
|
||||
res
|
||||
}
|
||||
P2pMessageKind::CosignedBlock => {
|
||||
vec![4]
|
||||
P2pMessageKind::ReqRes(ReqResMessageKind::KeepAlive) |
|
||||
P2pMessageKind::Gossip(GossipMessageKind::CosignedBlock) => None,
|
||||
P2pMessageKind::ReqRes(
|
||||
ReqResMessageKind::Heartbeat(genesis) | ReqResMessageKind::Block(genesis),
|
||||
) |
|
||||
P2pMessageKind::Gossip(GossipMessageKind::Tributary(genesis)) => Some(*genesis),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn read<R: Read>(reader: &mut R) -> Option<P2pMessageKind> {
|
||||
let mut kind = [0; 1];
|
||||
reader.read_exact(&mut kind).ok()?;
|
||||
match kind[0] {
|
||||
0 => Some(P2pMessageKind::KeepAlive),
|
||||
1 => Some({
|
||||
let mut genesis = [0; 32];
|
||||
reader.read_exact(&mut genesis).ok()?;
|
||||
P2pMessageKind::Tributary(genesis)
|
||||
}),
|
||||
2 => Some({
|
||||
let mut genesis = [0; 32];
|
||||
reader.read_exact(&mut genesis).ok()?;
|
||||
P2pMessageKind::Heartbeat(genesis)
|
||||
}),
|
||||
3 => Some({
|
||||
let mut genesis = [0; 32];
|
||||
reader.read_exact(&mut genesis).ok()?;
|
||||
P2pMessageKind::Block(genesis)
|
||||
}),
|
||||
4 => Some(P2pMessageKind::CosignedBlock),
|
||||
_ => None,
|
||||
impl From<ReqResMessageKind> for P2pMessageKind {
|
||||
fn from(kind: ReqResMessageKind) -> P2pMessageKind {
|
||||
P2pMessageKind::ReqRes(kind)
|
||||
}
|
||||
}
|
||||
|
||||
impl From<GossipMessageKind> for P2pMessageKind {
|
||||
fn from(kind: GossipMessageKind) -> P2pMessageKind {
|
||||
P2pMessageKind::Gossip(kind)
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug)]
|
||||
@@ -133,17 +180,21 @@ pub trait P2p: Send + Sync + Clone + fmt::Debug + TributaryP2p {
|
||||
async fn subscribe(&self, set: ValidatorSet, genesis: [u8; 32]);
|
||||
async fn unsubscribe(&self, set: ValidatorSet, genesis: [u8; 32]);
|
||||
|
||||
async fn send_raw(&self, to: Self::Id, genesis: Option<[u8; 32]>, msg: Vec<u8>);
|
||||
async fn broadcast_raw(&self, genesis: Option<[u8; 32]>, msg: Vec<u8>);
|
||||
async fn receive_raw(&self) -> (Self::Id, Vec<u8>);
|
||||
async fn send_raw(&self, to: Self::Id, msg: Vec<u8>);
|
||||
async fn broadcast_raw(&self, kind: P2pMessageKind, msg: Vec<u8>);
|
||||
async fn receive(&self) -> Message<Self>;
|
||||
|
||||
async fn send(&self, to: Self::Id, kind: P2pMessageKind, msg: Vec<u8>) {
|
||||
async fn send(&self, to: Self::Id, kind: ReqResMessageKind, msg: Vec<u8>) {
|
||||
let mut actual_msg = kind.serialize();
|
||||
actual_msg.extend(msg);
|
||||
self.send_raw(to, kind.genesis(), actual_msg).await;
|
||||
self.send_raw(to, actual_msg).await;
|
||||
}
|
||||
async fn broadcast(&self, kind: P2pMessageKind, msg: Vec<u8>) {
|
||||
let mut actual_msg = kind.serialize();
|
||||
async fn broadcast(&self, kind: impl Send + Into<P2pMessageKind>, msg: Vec<u8>) {
|
||||
let kind = kind.into();
|
||||
let mut actual_msg = match kind {
|
||||
P2pMessageKind::ReqRes(kind) => kind.serialize(),
|
||||
P2pMessageKind::Gossip(kind) => kind.serialize(),
|
||||
};
|
||||
actual_msg.extend(msg);
|
||||
/*
|
||||
log::trace!(
|
||||
@@ -157,41 +208,70 @@ pub trait P2p: Send + Sync + Clone + fmt::Debug + TributaryP2p {
|
||||
}
|
||||
);
|
||||
*/
|
||||
self.broadcast_raw(kind.genesis(), actual_msg).await;
|
||||
self.broadcast_raw(kind, actual_msg).await;
|
||||
}
|
||||
async fn receive(&self) -> Message<Self> {
|
||||
let (sender, kind, msg) = loop {
|
||||
let (sender, msg) = self.receive_raw().await;
|
||||
if msg.is_empty() {
|
||||
log::error!("empty p2p message from {sender:?}");
|
||||
continue;
|
||||
}
|
||||
|
||||
let mut msg_ref = msg.as_ref();
|
||||
let Some(kind) = P2pMessageKind::read::<&[u8]>(&mut msg_ref) else {
|
||||
log::error!("invalid p2p message kind from {sender:?}");
|
||||
continue;
|
||||
};
|
||||
break (sender, kind, msg_ref.to_vec());
|
||||
};
|
||||
/*
|
||||
log::trace!(
|
||||
"received p2p message (kind {})",
|
||||
match kind {
|
||||
P2pMessageKind::KeepAlive => "KeepAlive".to_string(),
|
||||
P2pMessageKind::Tributary(genesis) => format!("Tributary({})", hex::encode(genesis)),
|
||||
P2pMessageKind::Heartbeat(genesis) => format!("Heartbeat({})", hex::encode(genesis)),
|
||||
P2pMessageKind::Block(genesis) => format!("Block({})", hex::encode(genesis)),
|
||||
P2pMessageKind::CosignedBlock => "CosignedBlock".to_string(),
|
||||
#[derive(Default, Clone, Copy, PartialEq, Eq, Debug)]
|
||||
struct RrCodec;
|
||||
#[async_trait]
|
||||
impl RrCodecTrait for RrCodec {
|
||||
type Protocol = &'static str;
|
||||
type Request = Vec<u8>;
|
||||
type Response = Vec<u8>;
|
||||
|
||||
async fn read_request<R: Send + Unpin + AsyncRead>(
|
||||
&mut self,
|
||||
_: &Self::Protocol,
|
||||
io: &mut R,
|
||||
) -> io::Result<Vec<u8>> {
|
||||
let mut len = [0; 4];
|
||||
io.read_exact(&mut len).await?;
|
||||
let len = usize::try_from(u32::from_le_bytes(len)).expect("not a 32-bit platform?");
|
||||
if len > MAX_LIBP2P_MESSAGE_SIZE {
|
||||
Err(io::Error::other("request length exceeded MAX_LIBP2P_MESSAGE_SIZE"))?;
|
||||
}
|
||||
);
|
||||
*/
|
||||
Message { sender, kind, msg }
|
||||
// This may be a non-trivial allocation easily causable
|
||||
// While we could chunk the read, meaning we only perform the allocation as bandwidth is used,
|
||||
// the max message size should be sufficiently sane
|
||||
let mut buf = vec![0; len];
|
||||
io.read_exact(&mut buf).await?;
|
||||
Ok(buf)
|
||||
}
|
||||
async fn read_response<R: Send + Unpin + AsyncRead>(
|
||||
&mut self,
|
||||
proto: &Self::Protocol,
|
||||
io: &mut R,
|
||||
) -> io::Result<Vec<u8>> {
|
||||
self.read_request(proto, io).await
|
||||
}
|
||||
async fn write_request<W: Send + Unpin + AsyncWrite>(
|
||||
&mut self,
|
||||
_: &Self::Protocol,
|
||||
io: &mut W,
|
||||
req: Vec<u8>,
|
||||
) -> io::Result<()> {
|
||||
io.write_all(
|
||||
&u32::try_from(req.len())
|
||||
.map_err(|_| io::Error::other("request length exceeded 2**32"))?
|
||||
.to_le_bytes(),
|
||||
)
|
||||
.await?;
|
||||
io.write_all(&req).await
|
||||
}
|
||||
async fn write_response<W: Send + Unpin + AsyncWrite>(
|
||||
&mut self,
|
||||
proto: &Self::Protocol,
|
||||
io: &mut W,
|
||||
res: Vec<u8>,
|
||||
) -> io::Result<()> {
|
||||
self.write_request(proto, io, res).await
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(NetworkBehaviour)]
|
||||
struct Behavior {
|
||||
reqres: RrBehavior<RrCodec>,
|
||||
gossipsub: GsBehavior,
|
||||
}
|
||||
|
||||
@@ -199,8 +279,9 @@ struct Behavior {
|
||||
#[derive(Clone)]
|
||||
pub struct LibP2p {
|
||||
subscribe: Arc<Mutex<mpsc::UnboundedSender<(bool, ValidatorSet, [u8; 32])>>>,
|
||||
broadcast: Arc<Mutex<mpsc::UnboundedSender<(Option<[u8; 32]>, Vec<u8>)>>>,
|
||||
receive: Arc<Mutex<mpsc::UnboundedReceiver<(PeerId, Vec<u8>)>>>,
|
||||
send: Arc<Mutex<mpsc::UnboundedSender<(PeerId, Vec<u8>)>>>,
|
||||
broadcast: Arc<Mutex<mpsc::UnboundedSender<(P2pMessageKind, Vec<u8>)>>>,
|
||||
receive: Arc<Mutex<mpsc::UnboundedReceiver<Message<Self>>>>,
|
||||
}
|
||||
impl fmt::Debug for LibP2p {
|
||||
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||
@@ -211,14 +292,12 @@ impl fmt::Debug for LibP2p {
|
||||
impl LibP2p {
|
||||
#[allow(clippy::new_without_default)]
|
||||
pub fn new(serai: Arc<Serai>) -> Self {
|
||||
// Block size limit + 1 KB of space for signatures/metadata
|
||||
const MAX_LIBP2P_MESSAGE_SIZE: usize = tributary::BLOCK_SIZE_LIMIT + 1024;
|
||||
|
||||
log::info!("creating a libp2p instance");
|
||||
|
||||
let throwaway_key_pair = Keypair::generate_ed25519();
|
||||
|
||||
let behavior = Behavior {
|
||||
reqres: { RrBehavior::new([], RrConfig::default()) },
|
||||
gossipsub: {
|
||||
let heartbeat_interval = tributary::tendermint::LATENCY_TIME / 2;
|
||||
let heartbeats_per_block =
|
||||
@@ -279,9 +358,10 @@ impl LibP2p {
|
||||
.with_behaviour(|_| behavior)
|
||||
.unwrap()
|
||||
.build();
|
||||
const PORT: u16 = 30563; // 5132 ^ (('c' << 8) | 'o')
|
||||
const PORT: u16 = 30564; // 5132 ^ (('c' << 8) | 'o')
|
||||
swarm.listen_on(format!("/ip4/0.0.0.0/tcp/{PORT}").parse().unwrap()).unwrap();
|
||||
|
||||
let (send_send, mut send_recv) = mpsc::unbounded_channel();
|
||||
let (broadcast_send, mut broadcast_recv) = mpsc::unbounded_channel();
|
||||
let (receive_send, receive_recv) = mpsc::unbounded_channel();
|
||||
let (subscribe_send, mut subscribe_recv) = mpsc::unbounded_channel();
|
||||
@@ -290,17 +370,31 @@ impl LibP2p {
|
||||
IdentTopic::new(format!("{LIBP2P_TOPIC}-{}", hex::encode(set.encode())))
|
||||
}
|
||||
|
||||
// TODO: If a network has less than TARGET_PEERS, this will cause retries ad infinitum
|
||||
const TARGET_PEERS: usize = 5;
|
||||
|
||||
// The addrs we're currently dialing, and the networks associated with them
|
||||
let dialing_peers = Arc::new(RwLock::new(HashMap::new()));
|
||||
// The peers we're currently connected to, and the networks associated with them
|
||||
let connected_peers = Arc::new(RwLock::new(HashMap::<Multiaddr, HashSet<NetworkId>>::new()));
|
||||
|
||||
// Find and connect to peers
|
||||
let (pending_p2p_connections_send, mut pending_p2p_connections_recv) =
|
||||
let (connect_to_network_send, mut connect_to_network_recv) =
|
||||
tokio::sync::mpsc::unbounded_channel();
|
||||
let (to_dial_send, mut to_dial_recv) = tokio::sync::mpsc::unbounded_channel();
|
||||
tokio::spawn({
|
||||
let pending_p2p_connections_send = pending_p2p_connections_send.clone();
|
||||
let dialing_peers = dialing_peers.clone();
|
||||
let connected_peers = connected_peers.clone();
|
||||
|
||||
let connect_to_network_send = connect_to_network_send.clone();
|
||||
async move {
|
||||
loop {
|
||||
// TODO: Add better peer management logic?
|
||||
{
|
||||
let connect = |addr: Multiaddr| {
|
||||
let connect = |network: NetworkId, addr: Multiaddr| {
|
||||
let dialing_peers = dialing_peers.clone();
|
||||
let connected_peers = connected_peers.clone();
|
||||
let to_dial_send = to_dial_send.clone();
|
||||
let connect_to_network_send = connect_to_network_send.clone();
|
||||
async move {
|
||||
log::info!("found peer from substrate: {addr}");
|
||||
|
||||
let protocols = addr.iter().filter_map(|piece| match piece {
|
||||
@@ -318,46 +412,99 @@ impl LibP2p {
|
||||
let addr = new_addr;
|
||||
log::debug!("transformed found peer: {addr}");
|
||||
|
||||
// TODO: Check this isn't a duplicate
|
||||
to_dial_send.send(addr).unwrap();
|
||||
let (is_fresh_dial, nets) = {
|
||||
let mut dialing_peers = dialing_peers.write().await;
|
||||
let is_fresh_dial = !dialing_peers.contains_key(&addr);
|
||||
if is_fresh_dial {
|
||||
dialing_peers.insert(addr.clone(), HashSet::new());
|
||||
}
|
||||
// Associate this network with this peer
|
||||
dialing_peers.get_mut(&addr).unwrap().insert(network);
|
||||
|
||||
let nets = dialing_peers.get(&addr).unwrap().clone();
|
||||
(is_fresh_dial, nets)
|
||||
};
|
||||
|
||||
// Spawn a task to remove this peer from 'dialing' in sixty seconds, in case dialing
|
||||
// fails
|
||||
// This performs cleanup and bounds the size of the map to whatever growth occurs
|
||||
// within a temporal window
|
||||
tokio::spawn({
|
||||
let dialing_peers = dialing_peers.clone();
|
||||
let connected_peers = connected_peers.clone();
|
||||
let connect_to_network_send = connect_to_network_send.clone();
|
||||
let addr = addr.clone();
|
||||
async move {
|
||||
tokio::time::sleep(core::time::Duration::from_secs(60)).await;
|
||||
let mut dialing_peers = dialing_peers.write().await;
|
||||
if let Some(expected_nets) = dialing_peers.remove(&addr) {
|
||||
log::debug!("removed addr from dialing upon timeout: {addr}");
|
||||
|
||||
// TODO: De-duplicate this below instance
|
||||
// If we failed to dial and haven't gotten enough actual connections, retry
|
||||
let connected_peers = connected_peers.read().await;
|
||||
for net in expected_nets {
|
||||
let mut remaining_peers = 0;
|
||||
for nets in connected_peers.values() {
|
||||
if nets.contains(&net) {
|
||||
remaining_peers += 1;
|
||||
}
|
||||
}
|
||||
// If we do not, start connecting to this network again
|
||||
if remaining_peers < TARGET_PEERS {
|
||||
connect_to_network_send.send(net).expect(
|
||||
"couldn't send net to connect to due to disconnects (receiver dropped?)",
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
if is_fresh_dial {
|
||||
to_dial_send.send((addr, nets)).unwrap();
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
// TODO: We should also connect to random peers from random nets as needed for
|
||||
// cosigning
|
||||
let mut to_retry = vec![];
|
||||
while let Some(network) = pending_p2p_connections_recv.recv().await {
|
||||
|
||||
// Drain the chainnel, de-duplicating any networks in it
|
||||
let mut connect_to_network_networks = HashSet::new();
|
||||
while let Ok(network) = connect_to_network_recv.try_recv() {
|
||||
connect_to_network_networks.insert(network);
|
||||
}
|
||||
for network in connect_to_network_networks {
|
||||
if let Ok(mut nodes) = serai.p2p_validators(network).await {
|
||||
// If there's an insufficient amount of nodes known, connect to all yet add it
|
||||
// back and break
|
||||
if nodes.len() < 3 {
|
||||
if nodes.len() < TARGET_PEERS {
|
||||
log::warn!(
|
||||
"insufficient amount of P2P nodes known for {:?}: {}",
|
||||
network,
|
||||
nodes.len()
|
||||
);
|
||||
to_retry.push(network);
|
||||
// Retry this later
|
||||
connect_to_network_send.send(network).unwrap();
|
||||
for node in nodes {
|
||||
connect(node);
|
||||
connect(network, node).await;
|
||||
}
|
||||
continue;
|
||||
}
|
||||
|
||||
// Randomly select up to 5
|
||||
for _ in 0 .. 5 {
|
||||
// Randomly select up to 150% of the TARGET_PEERS
|
||||
for _ in 0 .. ((3 * TARGET_PEERS) / 2) {
|
||||
if !nodes.is_empty() {
|
||||
let to_connect = nodes.swap_remove(
|
||||
usize::try_from(OsRng.next_u64() % u64::try_from(nodes.len()).unwrap())
|
||||
.unwrap(),
|
||||
);
|
||||
connect(to_connect);
|
||||
connect(network, to_connect).await;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
for to_retry in to_retry {
|
||||
pending_p2p_connections_send.send(to_retry).unwrap();
|
||||
}
|
||||
}
|
||||
// Sleep 60 seconds before moving to the next iteration
|
||||
tokio::time::sleep(core::time::Duration::from_secs(60)).await;
|
||||
}
|
||||
@@ -368,35 +515,10 @@ impl LibP2p {
|
||||
tokio::spawn({
|
||||
let mut time_of_last_p2p_message = Instant::now();
|
||||
|
||||
#[allow(clippy::needless_pass_by_ref_mut)] // False positive
|
||||
fn broadcast_raw(
|
||||
p2p: &mut Swarm<Behavior>,
|
||||
time_of_last_p2p_message: &mut Instant,
|
||||
set: Option<ValidatorSet>,
|
||||
msg: Vec<u8>,
|
||||
) {
|
||||
// Update the time of last message
|
||||
*time_of_last_p2p_message = Instant::now();
|
||||
|
||||
let topic =
|
||||
if let Some(set) = set { topic_for_set(set) } else { IdentTopic::new(LIBP2P_TOPIC) };
|
||||
|
||||
match p2p.behaviour_mut().gossipsub.publish(topic, msg.clone()) {
|
||||
Err(PublishError::SigningError(e)) => panic!("signing error when broadcasting: {e}"),
|
||||
Err(PublishError::InsufficientPeers) => {
|
||||
log::warn!("failed to send p2p message due to insufficient peers")
|
||||
}
|
||||
Err(PublishError::MessageTooLarge) => {
|
||||
panic!("tried to send a too large message: {}", hex::encode(msg))
|
||||
}
|
||||
Err(PublishError::TransformFailed(e)) => panic!("IdentityTransform failed: {e}"),
|
||||
Err(PublishError::Duplicate) | Ok(_) => {}
|
||||
}
|
||||
}
|
||||
|
||||
async move {
|
||||
let connected_peers = connected_peers.clone();
|
||||
|
||||
let mut set_for_genesis = HashMap::new();
|
||||
let mut connected_peers = 0;
|
||||
loop {
|
||||
let time_since_last = Instant::now().duration_since(time_of_last_p2p_message);
|
||||
tokio::select! {
|
||||
@@ -409,7 +531,7 @@ impl LibP2p {
|
||||
let topic = topic_for_set(set);
|
||||
if subscribe {
|
||||
log::info!("subscribing to p2p messages for {set:?}");
|
||||
pending_p2p_connections_send.send(set.network).unwrap();
|
||||
connect_to_network_send.send(set.network).unwrap();
|
||||
set_for_genesis.insert(genesis, set);
|
||||
swarm.behaviour_mut().gossipsub.subscribe(&topic).unwrap();
|
||||
} else {
|
||||
@@ -419,17 +541,50 @@ impl LibP2p {
|
||||
}
|
||||
}
|
||||
|
||||
msg = send_recv.recv() => {
|
||||
let (peer, msg): (PeerId, Vec<u8>) =
|
||||
msg.expect("send_recv closed. are we shutting down?");
|
||||
swarm.behaviour_mut().reqres.send_request(&peer, msg);
|
||||
},
|
||||
|
||||
// Handle any queued outbound messages
|
||||
msg = broadcast_recv.recv() => {
|
||||
let (genesis, msg): (Option<[u8; 32]>, Vec<u8>) =
|
||||
// Update the time of last message
|
||||
time_of_last_p2p_message = Instant::now();
|
||||
|
||||
let (kind, msg): (P2pMessageKind, Vec<u8>) =
|
||||
msg.expect("broadcast_recv closed. are we shutting down?");
|
||||
let set = genesis.and_then(|genesis| set_for_genesis.get(&genesis).copied());
|
||||
broadcast_raw(
|
||||
&mut swarm,
|
||||
&mut time_of_last_p2p_message,
|
||||
set,
|
||||
msg,
|
||||
);
|
||||
|
||||
if matches!(kind, P2pMessageKind::ReqRes(_)) {
|
||||
// Use request/response, yet send to all connected peers
|
||||
for peer_id in swarm.connected_peers().copied().collect::<Vec<_>>() {
|
||||
swarm.behaviour_mut().reqres.send_request(&peer_id, msg.clone());
|
||||
}
|
||||
} else {
|
||||
// Use gossipsub
|
||||
|
||||
let set =
|
||||
kind.genesis().and_then(|genesis| set_for_genesis.get(&genesis).copied());
|
||||
let topic = if let Some(set) = set {
|
||||
topic_for_set(set)
|
||||
} else {
|
||||
IdentTopic::new(LIBP2P_TOPIC)
|
||||
};
|
||||
|
||||
match swarm.behaviour_mut().gossipsub.publish(topic, msg.clone()) {
|
||||
Err(PublishError::SigningError(e)) => {
|
||||
panic!("signing error when broadcasting: {e}")
|
||||
},
|
||||
Err(PublishError::InsufficientPeers) => {
|
||||
log::warn!("failed to send p2p message due to insufficient peers")
|
||||
}
|
||||
Err(PublishError::MessageTooLarge) => {
|
||||
panic!("tried to send a too large message: {}", hex::encode(msg))
|
||||
}
|
||||
Err(PublishError::TransformFailed(e)) => panic!("IdentityTransform failed: {e}"),
|
||||
Err(PublishError::Duplicate) | Ok(_) => {}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Handle new incoming messages
|
||||
@@ -438,42 +593,119 @@ impl LibP2p {
|
||||
Some(SwarmEvent::Dialing { connection_id, .. }) => {
|
||||
log::debug!("dialing to peer in connection ID {}", &connection_id);
|
||||
}
|
||||
Some(SwarmEvent::ConnectionEstablished { peer_id, connection_id, .. }) => {
|
||||
Some(SwarmEvent::ConnectionEstablished {
|
||||
peer_id,
|
||||
connection_id,
|
||||
endpoint,
|
||||
..
|
||||
}) => {
|
||||
if &peer_id == swarm.local_peer_id() {
|
||||
log::warn!("established a libp2p connection to ourselves");
|
||||
swarm.close_connection(connection_id);
|
||||
continue;
|
||||
}
|
||||
|
||||
connected_peers += 1;
|
||||
let addr = endpoint.get_remote_address();
|
||||
let nets = {
|
||||
let mut dialing_peers = dialing_peers.write().await;
|
||||
if let Some(nets) = dialing_peers.remove(addr) {
|
||||
nets
|
||||
} else {
|
||||
log::debug!("connected to a peer who we didn't have within dialing");
|
||||
HashSet::new()
|
||||
}
|
||||
};
|
||||
{
|
||||
let mut connected_peers = connected_peers.write().await;
|
||||
connected_peers.insert(addr.clone(), nets);
|
||||
|
||||
log::debug!(
|
||||
"connection established to peer {} in connection ID {}, connected peers: {}",
|
||||
&peer_id,
|
||||
&connection_id,
|
||||
connected_peers,
|
||||
connected_peers.len(),
|
||||
);
|
||||
}
|
||||
Some(SwarmEvent::ConnectionClosed { peer_id, .. }) => {
|
||||
connected_peers -= 1;
|
||||
}
|
||||
Some(SwarmEvent::ConnectionClosed { peer_id, endpoint, .. }) => {
|
||||
let mut connected_peers = connected_peers.write().await;
|
||||
let Some(nets) = connected_peers.remove(endpoint.get_remote_address()) else {
|
||||
log::debug!("closed connection to peer which wasn't in connected_peers");
|
||||
continue;
|
||||
};
|
||||
// Downgrade to a read lock
|
||||
let connected_peers = connected_peers.downgrade();
|
||||
|
||||
// For each net we lost a peer for, check if we still have sufficient peers
|
||||
// overall
|
||||
for net in nets {
|
||||
let mut remaining_peers = 0;
|
||||
for nets in connected_peers.values() {
|
||||
if nets.contains(&net) {
|
||||
remaining_peers += 1;
|
||||
}
|
||||
}
|
||||
// If we do not, start connecting to this network again
|
||||
if remaining_peers < TARGET_PEERS {
|
||||
connect_to_network_send
|
||||
.send(net)
|
||||
.expect(
|
||||
"couldn't send net to connect to due to disconnects (receiver dropped?)"
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
log::debug!(
|
||||
"connection with peer {peer_id} closed, connected peers: {}",
|
||||
connected_peers,
|
||||
connected_peers.len(),
|
||||
);
|
||||
}
|
||||
Some(SwarmEvent::Behaviour(BehaviorEvent::Reqres(
|
||||
RrEvent::Message { peer, message },
|
||||
))) => {
|
||||
let message = match message {
|
||||
RrMessage::Request { request, .. } => request,
|
||||
RrMessage::Response { response, .. } => response,
|
||||
};
|
||||
|
||||
let mut msg_ref = message.as_slice();
|
||||
let Some(kind) = ReqResMessageKind::read(&mut msg_ref) else { continue };
|
||||
let message = Message {
|
||||
sender: peer,
|
||||
kind: P2pMessageKind::ReqRes(kind),
|
||||
msg: msg_ref.to_vec(),
|
||||
};
|
||||
receive_send.send(message).expect("receive_send closed. are we shutting down?");
|
||||
}
|
||||
Some(SwarmEvent::Behaviour(BehaviorEvent::Gossipsub(
|
||||
GsEvent::Message { propagation_source, message, .. },
|
||||
))) => {
|
||||
receive_send
|
||||
.send((propagation_source, message.data))
|
||||
.expect("receive_send closed. are we shutting down?");
|
||||
let mut msg_ref = message.data.as_slice();
|
||||
let Some(kind) = GossipMessageKind::read(&mut msg_ref) else { continue };
|
||||
let message = Message {
|
||||
sender: propagation_source,
|
||||
kind: P2pMessageKind::Gossip(kind),
|
||||
msg: msg_ref.to_vec(),
|
||||
};
|
||||
receive_send.send(message).expect("receive_send closed. are we shutting down?");
|
||||
}
|
||||
_ => {}
|
||||
}
|
||||
}
|
||||
|
||||
// Handle peers to dial
|
||||
addr = to_dial_recv.recv() => {
|
||||
let addr = addr.expect("received address was None (sender dropped?)");
|
||||
addr_and_nets = to_dial_recv.recv() => {
|
||||
let (addr, nets) =
|
||||
addr_and_nets.expect("received address was None (sender dropped?)");
|
||||
// If we've already dialed and connected to this address, don't further dial them
|
||||
// Just associate these networks with them
|
||||
if let Some(existing_nets) = connected_peers.write().await.get_mut(&addr) {
|
||||
for net in nets {
|
||||
existing_nets.insert(net);
|
||||
}
|
||||
continue;
|
||||
}
|
||||
|
||||
if let Err(e) = swarm.dial(addr) {
|
||||
log::warn!("dialing peer failed: {e:?}");
|
||||
}
|
||||
@@ -487,12 +719,13 @@ impl LibP2p {
|
||||
// (where a finalized block only occurs due to network activity), meaning this won't be
|
||||
// run
|
||||
() = tokio::time::sleep(Duration::from_secs(80).saturating_sub(time_since_last)) => {
|
||||
broadcast_raw(
|
||||
&mut swarm,
|
||||
&mut time_of_last_p2p_message,
|
||||
None,
|
||||
P2pMessageKind::KeepAlive.serialize()
|
||||
);
|
||||
time_of_last_p2p_message = Instant::now();
|
||||
for peer_id in swarm.connected_peers().copied().collect::<Vec<_>>() {
|
||||
swarm
|
||||
.behaviour_mut()
|
||||
.reqres
|
||||
.send_request(&peer_id, ReqResMessageKind::KeepAlive.serialize());
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -501,6 +734,7 @@ impl LibP2p {
|
||||
|
||||
LibP2p {
|
||||
subscribe: Arc::new(Mutex::new(subscribe_send)),
|
||||
send: Arc::new(Mutex::new(send_send)),
|
||||
broadcast: Arc::new(Mutex::new(broadcast_send)),
|
||||
receive: Arc::new(Mutex::new(receive_recv)),
|
||||
}
|
||||
@@ -529,22 +763,22 @@ impl P2p for LibP2p {
|
||||
.expect("subscribe_send closed. are we shutting down?");
|
||||
}
|
||||
|
||||
async fn send_raw(&self, _: Self::Id, genesis: Option<[u8; 32]>, msg: Vec<u8>) {
|
||||
self.broadcast_raw(genesis, msg).await;
|
||||
async fn send_raw(&self, peer: Self::Id, msg: Vec<u8>) {
|
||||
self.send.lock().await.send((peer, msg)).expect("send_send closed. are we shutting down?");
|
||||
}
|
||||
|
||||
async fn broadcast_raw(&self, genesis: Option<[u8; 32]>, msg: Vec<u8>) {
|
||||
async fn broadcast_raw(&self, kind: P2pMessageKind, msg: Vec<u8>) {
|
||||
self
|
||||
.broadcast
|
||||
.lock()
|
||||
.await
|
||||
.send((genesis, msg))
|
||||
.send((kind, msg))
|
||||
.expect("broadcast_send closed. are we shutting down?");
|
||||
}
|
||||
|
||||
// TODO: We only have a single handle call this. Differentiate Send/Recv to remove this constant
|
||||
// lock acquisition?
|
||||
async fn receive_raw(&self) -> (Self::Id, Vec<u8>) {
|
||||
async fn receive(&self) -> Message<Self> {
|
||||
self.receive.lock().await.recv().await.expect("receive_recv closed. are we shutting down?")
|
||||
}
|
||||
}
|
||||
@@ -552,7 +786,7 @@ impl P2p for LibP2p {
|
||||
#[async_trait]
|
||||
impl TributaryP2p for LibP2p {
|
||||
async fn broadcast(&self, genesis: [u8; 32], msg: Vec<u8>) {
|
||||
<Self as P2p>::broadcast(self, P2pMessageKind::Tributary(genesis), msg).await
|
||||
<Self as P2p>::broadcast(self, GossipMessageKind::Tributary(genesis), msg).await
|
||||
}
|
||||
}
|
||||
|
||||
@@ -590,16 +824,12 @@ pub async fn heartbeat_tributaries_task<D: Db, P: P2p>(
|
||||
if SystemTime::now() > (block_time + Duration::from_secs(60)) {
|
||||
log::warn!("last known tributary block was over a minute ago");
|
||||
let mut msg = tip.to_vec();
|
||||
// Also include the timestamp so LibP2p doesn't flag this as an old message re-circulating
|
||||
let timestamp = SystemTime::now()
|
||||
let time: u64 = SystemTime::now()
|
||||
.duration_since(SystemTime::UNIX_EPOCH)
|
||||
.expect("system clock is wrong")
|
||||
.as_secs();
|
||||
// Divide by the block time so if multiple parties send a Heartbeat, they're more likely to
|
||||
// overlap
|
||||
let time_unit = timestamp / u64::from(Tributary::<D, Transaction, P>::block_time());
|
||||
msg.extend(time_unit.to_le_bytes());
|
||||
P2p::broadcast(&p2p, P2pMessageKind::Heartbeat(tributary.genesis()), msg).await;
|
||||
msg.extend(time.to_le_bytes());
|
||||
P2p::broadcast(&p2p, ReqResMessageKind::Heartbeat(tributary.genesis()), msg).await;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -631,6 +861,8 @@ pub async fn handle_p2p_task<D: Db, P: P2p>(
|
||||
// Subscribe to the topic for this tributary
|
||||
p2p.subscribe(tributary.spec.set(), genesis).await;
|
||||
|
||||
let spec_set = tributary.spec.set();
|
||||
|
||||
// Per-Tributary P2P message handler
|
||||
tokio::spawn({
|
||||
let p2p = p2p.clone();
|
||||
@@ -641,91 +873,58 @@ pub async fn handle_p2p_task<D: Db, P: P2p>(
|
||||
break;
|
||||
};
|
||||
match msg.kind {
|
||||
P2pMessageKind::KeepAlive => {}
|
||||
P2pMessageKind::ReqRes(ReqResMessageKind::KeepAlive) => {}
|
||||
|
||||
P2pMessageKind::Tributary(msg_genesis) => {
|
||||
assert_eq!(msg_genesis, genesis);
|
||||
log::trace!("handling message for tributary {:?}", tributary.spec.set());
|
||||
if tributary.tributary.handle_message(&msg.msg).await {
|
||||
P2p::broadcast(&p2p, msg.kind, msg.msg).await;
|
||||
}
|
||||
}
|
||||
|
||||
// TODO2: Rate limit this per timestamp
|
||||
// And/or slash on Heartbeat which justifies a response, since the node
|
||||
// TODO: Slash on Heartbeat which justifies a response, since the node
|
||||
// obviously was offline and we must now use our bandwidth to compensate for
|
||||
// them?
|
||||
P2pMessageKind::Heartbeat(msg_genesis) => {
|
||||
P2pMessageKind::ReqRes(ReqResMessageKind::Heartbeat(msg_genesis)) => {
|
||||
assert_eq!(msg_genesis, genesis);
|
||||
if msg.msg.len() != 40 {
|
||||
log::error!("validator sent invalid heartbeat");
|
||||
continue;
|
||||
}
|
||||
// Only respond to recent heartbeats
|
||||
let msg_time = u64::from_le_bytes(msg.msg[32 .. 40].try_into().expect(
|
||||
"length-checked heartbeat message didn't have 8 bytes for the u64",
|
||||
));
|
||||
if SystemTime::now()
|
||||
.duration_since(SystemTime::UNIX_EPOCH)
|
||||
.expect("system clock is wrong")
|
||||
.as_secs()
|
||||
.saturating_sub(msg_time) >
|
||||
10
|
||||
{
|
||||
continue;
|
||||
}
|
||||
|
||||
log::debug!("received heartbeat with a recent timestamp");
|
||||
|
||||
let reader = tributary.tributary.reader();
|
||||
|
||||
let p2p = p2p.clone();
|
||||
let spec = tributary.spec.clone();
|
||||
let reader = tributary.tributary.reader();
|
||||
// Spawn a dedicated task as this may require loading large amounts of data
|
||||
// from disk and take a notable amount of time
|
||||
tokio::spawn(async move {
|
||||
/*
|
||||
// Have sqrt(n) nodes reply with the blocks
|
||||
let mut responders = (tributary.spec.n() as f32).sqrt().floor() as u64;
|
||||
// Try to have at least 3 responders
|
||||
if responders < 3 {
|
||||
responders = tributary.spec.n().min(3).into();
|
||||
}
|
||||
*/
|
||||
|
||||
/*
|
||||
// Have up to three nodes respond
|
||||
let responders = u64::from(spec.n().min(3));
|
||||
|
||||
// Decide which nodes will respond by using the latest block's hash as a
|
||||
// mutually agreed upon entropy source
|
||||
// This isn't a secure source of entropy, yet it's fine for this
|
||||
let entropy = u64::from_le_bytes(reader.tip()[.. 8].try_into().unwrap());
|
||||
// If n = 10, responders = 3, we want `start` to be 0 ..= 7
|
||||
// (so the highest is 7, 8, 9)
|
||||
// entropy % (10 + 1) - 3 = entropy % 8 = 0 ..= 7
|
||||
let start =
|
||||
usize::try_from(entropy % (u64::from(spec.n() + 1) - responders))
|
||||
.unwrap();
|
||||
let mut selected = false;
|
||||
for validator in &spec.validators()
|
||||
[start .. (start + usize::try_from(responders).unwrap())]
|
||||
{
|
||||
if our_key == validator.0 {
|
||||
selected = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
if !selected {
|
||||
log::debug!("received heartbeat and not selected to respond");
|
||||
return;
|
||||
}
|
||||
|
||||
log::debug!("received heartbeat and selected to respond");
|
||||
*/
|
||||
|
||||
// Have every node respond
|
||||
// While we could only have a subset respond, LibP2P will sync all messages
|
||||
// it isn't aware of
|
||||
// It's cheaper to be aware from our disk than from over the network
|
||||
// TODO: Spawn a dedicated topic for this heartbeat response?
|
||||
let mut latest = msg.msg[.. 32].try_into().unwrap();
|
||||
let mut to_send = vec![];
|
||||
while let Some(next) = reader.block_after(&latest) {
|
||||
to_send.push(next);
|
||||
latest = next;
|
||||
}
|
||||
if to_send.len() > 3 {
|
||||
for next in to_send {
|
||||
let mut res = reader.block(&next).unwrap().serialize();
|
||||
res.extend(reader.commit(&next).unwrap());
|
||||
// Also include the timestamp used within the Heartbeat
|
||||
res.extend(&msg.msg[32 .. 40]);
|
||||
p2p.send(msg.sender, P2pMessageKind::Block(spec.genesis()), res).await;
|
||||
latest = next;
|
||||
p2p.send(msg.sender, ReqResMessageKind::Block(genesis), res).await;
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
P2pMessageKind::Block(msg_genesis) => {
|
||||
P2pMessageKind::ReqRes(ReqResMessageKind::Block(msg_genesis)) => {
|
||||
assert_eq!(msg_genesis, genesis);
|
||||
let mut msg_ref: &[u8] = msg.msg.as_ref();
|
||||
let Ok(block) = Block::<Transaction>::read(&mut msg_ref) else {
|
||||
@@ -744,7 +943,15 @@ pub async fn handle_p2p_task<D: Db, P: P2p>(
|
||||
);
|
||||
}
|
||||
|
||||
P2pMessageKind::CosignedBlock => unreachable!(),
|
||||
P2pMessageKind::Gossip(GossipMessageKind::Tributary(msg_genesis)) => {
|
||||
assert_eq!(msg_genesis, genesis);
|
||||
log::trace!("handling message for tributary {:?}", spec_set);
|
||||
if tributary.tributary.handle_message(&msg.msg).await {
|
||||
P2p::broadcast(&p2p, msg.kind, msg.msg).await;
|
||||
}
|
||||
}
|
||||
|
||||
P2pMessageKind::Gossip(GossipMessageKind::CosignedBlock) => unreachable!(),
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -764,15 +971,16 @@ pub async fn handle_p2p_task<D: Db, P: P2p>(
|
||||
loop {
|
||||
let msg = p2p.receive().await;
|
||||
match msg.kind {
|
||||
P2pMessageKind::KeepAlive => {}
|
||||
P2pMessageKind::Tributary(genesis) |
|
||||
P2pMessageKind::Heartbeat(genesis) |
|
||||
P2pMessageKind::Block(genesis) => {
|
||||
P2pMessageKind::ReqRes(ReqResMessageKind::KeepAlive) => {}
|
||||
P2pMessageKind::Gossip(GossipMessageKind::Tributary(genesis)) |
|
||||
P2pMessageKind::ReqRes(
|
||||
ReqResMessageKind::Heartbeat(genesis) | ReqResMessageKind::Block(genesis),
|
||||
) => {
|
||||
if let Some(channel) = channels.read().await.get(&genesis) {
|
||||
channel.send(msg).unwrap();
|
||||
}
|
||||
}
|
||||
P2pMessageKind::CosignedBlock => {
|
||||
P2pMessageKind::Gossip(GossipMessageKind::CosignedBlock) => {
|
||||
let Ok(msg) = CosignedBlock::deserialize_reader(&mut msg.msg.as_slice()) else {
|
||||
log::error!("received CosignedBlock message with invalidly serialized contents");
|
||||
continue;
|
||||
|
||||
@@ -14,7 +14,7 @@ use tokio::sync::RwLock;
|
||||
|
||||
use crate::{
|
||||
processors::{Message, Processors},
|
||||
TributaryP2p, P2pMessageKind, P2p,
|
||||
TributaryP2p, ReqResMessageKind, GossipMessageKind, P2pMessageKind, Message as P2pMessage, P2p,
|
||||
};
|
||||
|
||||
pub mod tributary;
|
||||
@@ -45,7 +45,10 @@ impl Processors for MemProcessors {
|
||||
|
||||
#[allow(clippy::type_complexity)]
|
||||
#[derive(Clone, Debug)]
|
||||
pub struct LocalP2p(usize, pub Arc<RwLock<(HashSet<Vec<u8>>, Vec<VecDeque<(usize, Vec<u8>)>>)>>);
|
||||
pub struct LocalP2p(
|
||||
usize,
|
||||
pub Arc<RwLock<(HashSet<Vec<u8>>, Vec<VecDeque<(usize, P2pMessageKind, Vec<u8>)>>)>>,
|
||||
);
|
||||
|
||||
impl LocalP2p {
|
||||
pub fn new(validators: usize) -> Vec<LocalP2p> {
|
||||
@@ -65,11 +68,13 @@ impl P2p for LocalP2p {
|
||||
async fn subscribe(&self, _set: ValidatorSet, _genesis: [u8; 32]) {}
|
||||
async fn unsubscribe(&self, _set: ValidatorSet, _genesis: [u8; 32]) {}
|
||||
|
||||
async fn send_raw(&self, to: Self::Id, _genesis: Option<[u8; 32]>, msg: Vec<u8>) {
|
||||
self.1.write().await.1[to].push_back((self.0, msg));
|
||||
async fn send_raw(&self, to: Self::Id, msg: Vec<u8>) {
|
||||
let mut msg_ref = msg.as_slice();
|
||||
let kind = ReqResMessageKind::read(&mut msg_ref).unwrap();
|
||||
self.1.write().await.1[to].push_back((self.0, P2pMessageKind::ReqRes(kind), msg_ref.to_vec()));
|
||||
}
|
||||
|
||||
async fn broadcast_raw(&self, _genesis: Option<[u8; 32]>, msg: Vec<u8>) {
|
||||
async fn broadcast_raw(&self, kind: P2pMessageKind, msg: Vec<u8>) {
|
||||
// Content-based deduplication
|
||||
let mut lock = self.1.write().await;
|
||||
{
|
||||
@@ -81,19 +86,26 @@ impl P2p for LocalP2p {
|
||||
}
|
||||
let queues = &mut lock.1;
|
||||
|
||||
let kind_len = (match kind {
|
||||
P2pMessageKind::ReqRes(kind) => kind.serialize(),
|
||||
P2pMessageKind::Gossip(kind) => kind.serialize(),
|
||||
})
|
||||
.len();
|
||||
let msg = msg[kind_len ..].to_vec();
|
||||
|
||||
for (i, msg_queue) in queues.iter_mut().enumerate() {
|
||||
if i == self.0 {
|
||||
continue;
|
||||
}
|
||||
msg_queue.push_back((self.0, msg.clone()));
|
||||
msg_queue.push_back((self.0, kind, msg.clone()));
|
||||
}
|
||||
}
|
||||
|
||||
async fn receive_raw(&self) -> (Self::Id, Vec<u8>) {
|
||||
async fn receive(&self) -> P2pMessage<Self> {
|
||||
// This is a cursed way to implement an async read from a Vec
|
||||
loop {
|
||||
if let Some(res) = self.1.write().await.1[self.0].pop_front() {
|
||||
return res;
|
||||
if let Some((sender, kind, msg)) = self.1.write().await.1[self.0].pop_front() {
|
||||
return P2pMessage { sender, kind, msg };
|
||||
}
|
||||
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
|
||||
}
|
||||
@@ -103,6 +115,11 @@ impl P2p for LocalP2p {
|
||||
#[async_trait]
|
||||
impl TributaryP2p for LocalP2p {
|
||||
async fn broadcast(&self, genesis: [u8; 32], msg: Vec<u8>) {
|
||||
<Self as P2p>::broadcast(self, P2pMessageKind::Tributary(genesis), msg).await
|
||||
<Self as P2p>::broadcast(
|
||||
self,
|
||||
P2pMessageKind::Gossip(GossipMessageKind::Tributary(genesis)),
|
||||
msg,
|
||||
)
|
||||
.await
|
||||
}
|
||||
}
|
||||
|
||||
@@ -26,7 +26,7 @@ use serai_db::MemDb;
|
||||
use tributary::Tributary;
|
||||
|
||||
use crate::{
|
||||
P2pMessageKind, P2p,
|
||||
GossipMessageKind, P2pMessageKind, P2p,
|
||||
tributary::{Transaction, TributarySpec},
|
||||
tests::LocalP2p,
|
||||
};
|
||||
@@ -98,7 +98,7 @@ pub async fn run_tributaries(
|
||||
for (p2p, tributary) in &mut tributaries {
|
||||
while let Poll::Ready(msg) = poll!(p2p.receive()) {
|
||||
match msg.kind {
|
||||
P2pMessageKind::Tributary(genesis) => {
|
||||
P2pMessageKind::Gossip(GossipMessageKind::Tributary(genesis)) => {
|
||||
assert_eq!(genesis, tributary.genesis());
|
||||
if tributary.handle_message(&msg.msg).await {
|
||||
p2p.broadcast(msg.kind, msg.msg).await;
|
||||
@@ -173,7 +173,7 @@ async fn tributary_test() {
|
||||
for (p2p, tributary) in &mut tributaries {
|
||||
while let Poll::Ready(msg) = poll!(p2p.receive()) {
|
||||
match msg.kind {
|
||||
P2pMessageKind::Tributary(genesis) => {
|
||||
P2pMessageKind::Gossip(GossipMessageKind::Tributary(genesis)) => {
|
||||
assert_eq!(genesis, tributary.genesis());
|
||||
tributary.handle_message(&msg.msg).await;
|
||||
}
|
||||
@@ -199,7 +199,7 @@ async fn tributary_test() {
|
||||
for (p2p, tributary) in &mut tributaries {
|
||||
while let Poll::Ready(msg) = poll!(p2p.receive()) {
|
||||
match msg.kind {
|
||||
P2pMessageKind::Tributary(genesis) => {
|
||||
P2pMessageKind::Gossip(GossipMessageKind::Tributary(genesis)) => {
|
||||
assert_eq!(genesis, tributary.genesis());
|
||||
tributary.handle_message(&msg.msg).await;
|
||||
}
|
||||
|
||||
@@ -116,8 +116,8 @@ async fn sync_test() {
|
||||
.map_err(|_| "failed to send ActiveTributary to heartbeat")
|
||||
.unwrap();
|
||||
|
||||
// The heartbeat is once every 10 blocks
|
||||
sleep(Duration::from_secs(10 * block_time)).await;
|
||||
// The heartbeat is once every 10 blocks, with some limitations
|
||||
sleep(Duration::from_secs(20 * block_time)).await;
|
||||
assert!(syncer_tributary.tip().await != spec.genesis());
|
||||
|
||||
// Verify it synced to the tip
|
||||
|
||||
@@ -74,7 +74,7 @@ impl TributarySpec {
|
||||
|
||||
pub fn genesis(&self) -> [u8; 32] {
|
||||
// Calculate the genesis for this Tributary
|
||||
let mut genesis = RecommendedTranscript::new(b"Serai Tributary Genesis");
|
||||
let mut genesis = RecommendedTranscript::new(b"Serai Tributary Genesis Testnet 2.1");
|
||||
// This locks it to a specific Serai chain
|
||||
genesis.append_message(b"serai_block", self.serai_block);
|
||||
genesis.append_message(b"session", self.set.session.0.to_le_bytes());
|
||||
|
||||
@@ -59,8 +59,7 @@ pub const ACCOUNT_MEMPOOL_LIMIT: u32 = 50;
|
||||
pub const BLOCK_SIZE_LIMIT: usize = 3_001_000;
|
||||
|
||||
pub(crate) const TENDERMINT_MESSAGE: u8 = 0;
|
||||
pub(crate) const BLOCK_MESSAGE: u8 = 1;
|
||||
pub(crate) const TRANSACTION_MESSAGE: u8 = 2;
|
||||
pub(crate) const TRANSACTION_MESSAGE: u8 = 2; // TODO: Normalize to 1
|
||||
|
||||
#[allow(clippy::large_enum_variant)]
|
||||
#[derive(Clone, PartialEq, Eq, Debug)]
|
||||
@@ -336,9 +335,6 @@ impl<D: Db, T: TransactionTrait, P: P2p> Tributary<D, T, P> {
|
||||
|
||||
// Return true if the message should be rebroadcasted.
|
||||
pub async fn handle_message(&self, msg: &[u8]) -> bool {
|
||||
// Acquire the lock now to prevent sync_block from being run at the same time
|
||||
let mut sync_block = self.synced_block_result.write().await;
|
||||
|
||||
match msg.first() {
|
||||
Some(&TRANSACTION_MESSAGE) => {
|
||||
let Ok(tx) = Transaction::read::<&[u8]>(&mut &msg[1 ..]) else {
|
||||
@@ -370,19 +366,6 @@ impl<D: Db, T: TransactionTrait, P: P2p> Tributary<D, T, P> {
|
||||
false
|
||||
}
|
||||
|
||||
Some(&BLOCK_MESSAGE) => {
|
||||
let mut msg_ref = &msg[1 ..];
|
||||
let Ok(block) = Block::<T>::read(&mut msg_ref) else {
|
||||
log::error!("received invalid block message");
|
||||
return false;
|
||||
};
|
||||
let commit = msg[(msg.len() - msg_ref.len()) ..].to_vec();
|
||||
if self.sync_block_internal(block, commit, &mut sync_block).await {
|
||||
log::debug!("synced block over p2p net instead of building the commit ourselves");
|
||||
}
|
||||
false
|
||||
}
|
||||
|
||||
_ => false,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -41,9 +41,8 @@ use tendermint::{
|
||||
use tokio::sync::RwLock;
|
||||
|
||||
use crate::{
|
||||
TENDERMINT_MESSAGE, TRANSACTION_MESSAGE, BLOCK_MESSAGE, ReadWrite,
|
||||
transaction::Transaction as TransactionTrait, Transaction, BlockHeader, Block, BlockError,
|
||||
Blockchain, P2p,
|
||||
TENDERMINT_MESSAGE, TRANSACTION_MESSAGE, ReadWrite, transaction::Transaction as TransactionTrait,
|
||||
Transaction, BlockHeader, Block, BlockError, Blockchain, P2p,
|
||||
};
|
||||
|
||||
pub mod tx;
|
||||
@@ -414,12 +413,7 @@ impl<D: Db, T: TransactionTrait, P: P2p> Network for TendermintNetwork<D, T, P>
|
||||
);
|
||||
match block_res {
|
||||
Ok(()) => {
|
||||
// If we successfully added this block, broadcast it
|
||||
// TODO: Move this under the coordinator once we set up on new block notifications?
|
||||
let mut msg = serialized_block.0;
|
||||
msg.insert(0, BLOCK_MESSAGE);
|
||||
msg.extend(encoded_commit);
|
||||
self.p2p.broadcast(self.genesis, msg).await;
|
||||
// If we successfully added this block, break
|
||||
break;
|
||||
}
|
||||
Err(BlockError::NonLocalProvided(hash)) => {
|
||||
@@ -428,6 +422,7 @@ impl<D: Db, T: TransactionTrait, P: P2p> Network for TendermintNetwork<D, T, P>
|
||||
hex::encode(hash),
|
||||
hex::encode(self.genesis)
|
||||
);
|
||||
tokio::time::sleep(core::time::Duration::from_secs(5)).await;
|
||||
}
|
||||
_ => return invalid_block(),
|
||||
}
|
||||
|
||||
@@ -313,11 +313,16 @@ impl<N: Network + 'static> TendermintMachine<N> {
|
||||
let time_until_round_end = round_end.instant().saturating_duration_since(Instant::now());
|
||||
if time_until_round_end == Duration::ZERO {
|
||||
log::trace!(
|
||||
target: "tendermint",
|
||||
"resetting when prior round ended {}ms ago",
|
||||
Instant::now().saturating_duration_since(round_end.instant()).as_millis(),
|
||||
);
|
||||
}
|
||||
log::trace!("sleeping until round ends in {}ms", time_until_round_end.as_millis());
|
||||
log::trace!(
|
||||
target: "tendermint",
|
||||
"sleeping until round ends in {}ms",
|
||||
time_until_round_end.as_millis(),
|
||||
);
|
||||
sleep(time_until_round_end).await;
|
||||
|
||||
// Clear our outbound message queue
|
||||
@@ -598,7 +603,11 @@ impl<N: Network + 'static> TendermintMachine<N> {
|
||||
);
|
||||
let id = block.id();
|
||||
let proposal = self.network.add_block(block, commit).await;
|
||||
log::trace!("added block {} (produced by machine)", hex::encode(id.as_ref()));
|
||||
log::trace!(
|
||||
target: "tendermint",
|
||||
"added block {} (produced by machine)",
|
||||
hex::encode(id.as_ref()),
|
||||
);
|
||||
self.reset(msg.round, proposal).await;
|
||||
}
|
||||
Err(TendermintError::Malicious(sender, evidence)) => {
|
||||
@@ -692,7 +701,12 @@ impl<N: Network + 'static> TendermintMachine<N> {
|
||||
(msg.round == self.block.round().number) &&
|
||||
(msg.data.step() == Step::Propose)
|
||||
{
|
||||
log::trace!("received Propose for block {}, round {}", msg.block.0, msg.round.0);
|
||||
log::trace!(
|
||||
target: "tendermint",
|
||||
"received Propose for block {}, round {}",
|
||||
msg.block.0,
|
||||
msg.round.0,
|
||||
);
|
||||
}
|
||||
|
||||
// If this is a precommit, verify its signature
|
||||
@@ -710,7 +724,13 @@ impl<N: Network + 'static> TendermintMachine<N> {
|
||||
if !self.block.log.log(signed.clone())? {
|
||||
return Err(TendermintError::AlreadyHandled);
|
||||
}
|
||||
log::debug!(target: "tendermint", "received new tendermint message");
|
||||
log::debug!(
|
||||
target: "tendermint",
|
||||
"received new tendermint message (block: {}, round: {}, step: {:?})",
|
||||
msg.block.0,
|
||||
msg.round.0,
|
||||
msg.data.step(),
|
||||
);
|
||||
|
||||
// All functions, except for the finalizer and the jump, are locked to the current round
|
||||
|
||||
@@ -757,6 +777,13 @@ impl<N: Network + 'static> TendermintMachine<N> {
|
||||
// 55-56
|
||||
// Jump, enabling processing by the below code
|
||||
if self.block.log.round_participation(msg.round) > self.weights.fault_threshold() {
|
||||
log::debug!(
|
||||
target: "tendermint",
|
||||
"jumping from round {} to round {}",
|
||||
self.block.round().number.0,
|
||||
msg.round.0,
|
||||
);
|
||||
|
||||
// Jump to the new round.
|
||||
let proposer = self.round(msg.round, None);
|
||||
|
||||
@@ -814,13 +841,26 @@ impl<N: Network + 'static> TendermintMachine<N> {
|
||||
if (self.block.round().step == Step::Prevote) && matches!(msg.data, Data::Prevote(_)) {
|
||||
let (participation, weight) =
|
||||
self.block.log.message_instances(self.block.round().number, &Data::Prevote(None));
|
||||
let threshold_weight = self.weights.threshold();
|
||||
if participation < threshold_weight {
|
||||
log::trace!(
|
||||
target: "tendermint",
|
||||
"progess towards setting prevote timeout, participation: {}, needed: {}",
|
||||
participation,
|
||||
threshold_weight,
|
||||
);
|
||||
}
|
||||
// 34-35
|
||||
if participation >= self.weights.threshold() {
|
||||
if participation >= threshold_weight {
|
||||
log::trace!(
|
||||
target: "tendermint",
|
||||
"setting timeout for prevote due to sufficient participation",
|
||||
);
|
||||
self.block.round_mut().set_timeout(Step::Prevote);
|
||||
}
|
||||
|
||||
// 44-46
|
||||
if weight >= self.weights.threshold() {
|
||||
if weight >= threshold_weight {
|
||||
self.broadcast(Data::Precommit(None));
|
||||
return Ok(None);
|
||||
}
|
||||
@@ -830,6 +870,10 @@ impl<N: Network + 'static> TendermintMachine<N> {
|
||||
if matches!(msg.data, Data::Precommit(_)) &&
|
||||
self.block.log.has_participation(self.block.round().number, Step::Precommit)
|
||||
{
|
||||
log::trace!(
|
||||
target: "tendermint",
|
||||
"setting timeout for precommit due to sufficient participation",
|
||||
);
|
||||
self.block.round_mut().set_timeout(Step::Precommit);
|
||||
}
|
||||
|
||||
|
||||
@@ -1,6 +1,5 @@
|
||||
use std::{sync::Arc, collections::HashMap};
|
||||
|
||||
use log::debug;
|
||||
use parity_scale_codec::Encode;
|
||||
|
||||
use crate::{ext::*, RoundNumber, Step, DataFor, TendermintError, SignedMessageFor, Evidence};
|
||||
@@ -27,7 +26,7 @@ impl<N: Network> MessageLog<N> {
|
||||
let step = msg.data.step();
|
||||
if let Some(existing) = msgs.get(&step) {
|
||||
if existing.msg.data != msg.data {
|
||||
debug!(
|
||||
log::debug!(
|
||||
target: "tendermint",
|
||||
"Validator sent multiple messages for the same block + round + step"
|
||||
);
|
||||
|
||||
@@ -57,6 +57,7 @@ impl<N: Network> RoundData<N> {
|
||||
|
||||
// Poll all set timeouts, returning the Step whose timeout has just expired
|
||||
pub(crate) async fn timeout_future(&self) -> Step {
|
||||
/*
|
||||
let now = Instant::now();
|
||||
log::trace!(
|
||||
target: "tendermint",
|
||||
@@ -64,6 +65,7 @@ impl<N: Network> RoundData<N> {
|
||||
self.step,
|
||||
self.timeouts.iter().map(|(k, v)| (k, v.duration_since(now))).collect::<HashMap<_, _>>()
|
||||
);
|
||||
*/
|
||||
|
||||
let timeout_future = |step| {
|
||||
let timeout = self.timeouts.get(&step).copied();
|
||||
|
||||
@@ -511,7 +511,7 @@ fn start(network: Network, services: HashSet<String>) {
|
||||
command
|
||||
} else {
|
||||
// Publish the port
|
||||
command.arg("-p").arg("30563:30563")
|
||||
command.arg("-p").arg("30564:30564")
|
||||
}
|
||||
}
|
||||
"serai" => {
|
||||
|
||||
@@ -159,9 +159,11 @@ pub mod pallet {
|
||||
///
|
||||
/// Errors if any amount overflows.
|
||||
pub fn mint(to: Public, balance: Balance) -> Result<(), Error<T, I>> {
|
||||
/*
|
||||
if !T::AllowMint::is_allowed(&balance) {
|
||||
Err(Error::<T, I>::MintNotAllowed)?;
|
||||
}
|
||||
*/
|
||||
|
||||
// update the balance
|
||||
Self::increase_balance_internal(to, balance)?;
|
||||
|
||||
@@ -26,6 +26,8 @@ hex = "0.4"
|
||||
rand_core = "0.6"
|
||||
schnorrkel = "0.11"
|
||||
|
||||
libp2p = "0.52"
|
||||
|
||||
sp-core = { git = "https://github.com/serai-dex/substrate" }
|
||||
sp-keystore = { git = "https://github.com/serai-dex/substrate" }
|
||||
sp-timestamp = { git = "https://github.com/serai-dex/substrate" }
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
use core::marker::PhantomData;
|
||||
use std::collections::HashSet;
|
||||
|
||||
use sp_core::Pair as PairTrait;
|
||||
use sp_core::{Decode, Pair as PairTrait, sr25519::Public};
|
||||
|
||||
use sc_service::ChainType;
|
||||
|
||||
@@ -23,7 +24,7 @@ fn wasm_binary() -> Vec<u8> {
|
||||
WASM_BINARY.ok_or("compiled in wasm not available").unwrap().to_vec()
|
||||
}
|
||||
|
||||
fn testnet_genesis(
|
||||
fn devnet_genesis(
|
||||
wasm_binary: &[u8],
|
||||
validators: &[&'static str],
|
||||
endowed_accounts: Vec<PublicKey>,
|
||||
@@ -72,6 +73,57 @@ fn testnet_genesis(
|
||||
}
|
||||
}
|
||||
|
||||
fn testnet_genesis(wasm_binary: &[u8], validators: Vec<&'static str>) -> RuntimeGenesisConfig {
|
||||
let validators = validators
|
||||
.into_iter()
|
||||
.map(|validator| Public::decode(&mut hex::decode(validator).unwrap().as_slice()).unwrap())
|
||||
.collect::<Vec<_>>();
|
||||
|
||||
assert_eq!(validators.iter().collect::<HashSet<_>>().len(), validators.len());
|
||||
|
||||
RuntimeGenesisConfig {
|
||||
system: SystemConfig { code: wasm_binary.to_vec(), _config: PhantomData },
|
||||
|
||||
transaction_payment: Default::default(),
|
||||
|
||||
coins: CoinsConfig {
|
||||
accounts: validators
|
||||
.iter()
|
||||
.map(|a| (*a, Balance { coin: Coin::Serai, amount: Amount(5_000_000 * 10_u64.pow(8)) }))
|
||||
.collect(),
|
||||
_ignore: Default::default(),
|
||||
},
|
||||
|
||||
dex: DexConfig {
|
||||
pools: vec![Coin::Bitcoin, Coin::Ether, Coin::Dai, Coin::Monero],
|
||||
_ignore: Default::default(),
|
||||
},
|
||||
|
||||
validator_sets: ValidatorSetsConfig {
|
||||
networks: serai_runtime::primitives::NETWORKS
|
||||
.iter()
|
||||
.map(|network| match network {
|
||||
NetworkId::Serai => (NetworkId::Serai, Amount(50_000 * 10_u64.pow(8))),
|
||||
NetworkId::Bitcoin => (NetworkId::Bitcoin, Amount(1_000_000 * 10_u64.pow(8))),
|
||||
NetworkId::Ethereum => (NetworkId::Ethereum, Amount(1_000_000 * 10_u64.pow(8))),
|
||||
NetworkId::Monero => (NetworkId::Monero, Amount(100_000 * 10_u64.pow(8))),
|
||||
})
|
||||
.collect(),
|
||||
participants: validators.clone(),
|
||||
},
|
||||
signals: SignalsConfig::default(),
|
||||
babe: BabeConfig {
|
||||
authorities: validators.iter().map(|validator| ((*validator).into(), 1)).collect(),
|
||||
epoch_config: Some(BABE_GENESIS_EPOCH_CONFIG),
|
||||
_config: PhantomData,
|
||||
},
|
||||
grandpa: GrandpaConfig {
|
||||
authorities: validators.into_iter().map(|validator| (validator.into(), 1)).collect(),
|
||||
_config: PhantomData,
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
pub fn development_config() -> ChainSpec {
|
||||
let wasm_binary = wasm_binary();
|
||||
|
||||
@@ -82,7 +134,7 @@ pub fn development_config() -> ChainSpec {
|
||||
"devnet",
|
||||
ChainType::Development,
|
||||
move || {
|
||||
testnet_genesis(
|
||||
devnet_genesis(
|
||||
&wasm_binary,
|
||||
&["Alice"],
|
||||
vec![
|
||||
@@ -100,7 +152,7 @@ pub fn development_config() -> ChainSpec {
|
||||
// Telemetry
|
||||
None,
|
||||
// Protocol ID
|
||||
Some("serai"),
|
||||
Some("serai-devnet"),
|
||||
// Fork ID
|
||||
None,
|
||||
// Properties
|
||||
@@ -110,7 +162,7 @@ pub fn development_config() -> ChainSpec {
|
||||
)
|
||||
}
|
||||
|
||||
pub fn testnet_config() -> ChainSpec {
|
||||
pub fn local_config() -> ChainSpec {
|
||||
let wasm_binary = wasm_binary();
|
||||
|
||||
ChainSpec::from_genesis(
|
||||
@@ -120,7 +172,7 @@ pub fn testnet_config() -> ChainSpec {
|
||||
"local",
|
||||
ChainType::Local,
|
||||
move || {
|
||||
testnet_genesis(
|
||||
devnet_genesis(
|
||||
&wasm_binary,
|
||||
&["Alice", "Bob", "Charlie", "Dave"],
|
||||
vec![
|
||||
@@ -138,7 +190,7 @@ pub fn testnet_config() -> ChainSpec {
|
||||
// Telemetry
|
||||
None,
|
||||
// Protocol ID
|
||||
Some("serai"),
|
||||
Some("serai-local"),
|
||||
// Fork ID
|
||||
None,
|
||||
// Properties
|
||||
@@ -147,3 +199,137 @@ pub fn testnet_config() -> ChainSpec {
|
||||
None,
|
||||
)
|
||||
}
|
||||
|
||||
pub fn testnet_config() -> ChainSpec {
|
||||
{
|
||||
use std::time::{Duration, SystemTime};
|
||||
let secs_since_epoch = SystemTime::now()
|
||||
.duration_since(SystemTime::UNIX_EPOCH)
|
||||
.expect("current time is before the epoch")
|
||||
.as_secs();
|
||||
let secs_till_start = 1713283200_u64.saturating_sub(secs_since_epoch);
|
||||
std::thread::sleep(Duration::from_secs(secs_till_start));
|
||||
}
|
||||
|
||||
let wasm_binary = wasm_binary();
|
||||
|
||||
ChainSpec::from_genesis(
|
||||
// Name
|
||||
"Test Network 2",
|
||||
// ID
|
||||
"testnet-2",
|
||||
ChainType::Live,
|
||||
move || {
|
||||
testnet_genesis(
|
||||
&wasm_binary,
|
||||
vec![
|
||||
// Kayaba
|
||||
"4cef4080d00c6ff5ad93d61d1ca631cc10f8c9bd733e8c0c873a85b5fbe5c625",
|
||||
// CommunityStaking
|
||||
"587723d333049d9f4e6f027bbd701d603544a422329ea4e1027d60f7947e1074",
|
||||
// SHossain
|
||||
"6e30ec71b331d73992307fa7c53719ff238666d7d895487a1b691cc1e4481344",
|
||||
// StormyCloud
|
||||
"b0ebef6d712b3eb0f01e69a80519e55feff4be8b226fa64d84691e4b3ca2fb38",
|
||||
// Yangu
|
||||
"c692a906f9c63b7e4d12ad3cde204c6715b9a96b5b8ce565794917b7eaaa5f08",
|
||||
// t-900
|
||||
"6a9d5a3ca9422baec670e47238decf4a8515f2de0060b0a566a56dfd72686e52",
|
||||
// tappokone
|
||||
"36acb4be05513bed670ef3b43dc3a0fdfde8dc45339f81c80b53b2289dc3730c",
|
||||
// Sleipnir
|
||||
"0e87d766c9acec45b39445579cd3f40c8a4b42e9a34049bdbef0da83d000410e",
|
||||
"c2f96300a956e949883a5e8952270fb8193154a68533d0dd6b10076224e30167",
|
||||
"7a66312c53dfb153e842456f4e9a38dcda7e1788a3366df3e54125e29821f870",
|
||||
// jberman
|
||||
"b6e23eec7dbdb2bf72a087e335b44464cedfcc11c669033d6e520b3bc8de1650",
|
||||
// krytie
|
||||
"82815723c498d2aaaead050e63b979bb49a94a00c97b971c22340dffeaa36829",
|
||||
// toplel
|
||||
"4243da92918333bfc46f4d17ddeda0c3420d920231627dca1b6049f2f13cac6d",
|
||||
// clamking
|
||||
"941a6efa9e4dee6c3015cc42339fe56f43c2230133787746828befcee957cb1f",
|
||||
// Helios
|
||||
"56a0e89cffe57337e9e232e41dca0ad7306a17fa0ca63fbac048190fdd45d511",
|
||||
// akil
|
||||
"1caffa33b0ea1c7ed95c8450c0baf57baf9e1c1f43af3e28a722ef6d3d4db27e",
|
||||
// Eumaios
|
||||
"9ec7b5edf854f6285205468ed7402e40e5bed8238dc226dd4fd718a40efdce44",
|
||||
// pigeons
|
||||
"66c71ebf040542ab467def0ad935ec30ea693953d4322b3b168f6f4e9fcacb63",
|
||||
// joe_land1
|
||||
"94e25d8247b2f0e718bee169213052c693b78743dd91f403398a8837c34e0e6a",
|
||||
// rlking1255
|
||||
"82592430fe65e353510d3c1018cebc9806290e2d9098a94a1190f120f471c52b",
|
||||
// Seth For Privacy
|
||||
"f8ebbdb8ff2a77527528577bad6fd3297017f7b35a0613ba31d8af8e7e78cd7b",
|
||||
// lemon_respector
|
||||
"ce4a4cd996e4601a0226f3c8d9c9cae84519a1a7277b4822e1694b4a8c3ef10b",
|
||||
// tuxsudo
|
||||
"c6804a561d07d77c2806844a59c24bb9472df16043767721aae0caa20e82391e",
|
||||
// Awakeninghumanity.eth
|
||||
"5046c9f55a65e08df86c132c142f055db0376563fabc190f47a6851e0ff2af2b",
|
||||
// ART3MIS.CLOUD
|
||||
"5c1793880b0c06a5ce232288c7789cf4451ab20a8da49b84c88789965bc67356",
|
||||
// michnovka
|
||||
"98db8174ec40046b1bae39cad69ea0000d67e120524d46bc298d167407410618",
|
||||
// kgminer
|
||||
"8eca72a4bf684d7c4a20a34048003b504a046bce1289d3ae79a3b4422afaf808",
|
||||
// Benny
|
||||
"74b4f2d2347a4426c536e6ba48efa14b989b05f03c0ea9b1c67b23696c1a831d",
|
||||
// Argo
|
||||
"4025bbbe9c9be72769a27e5e6a3749782f4c9b2a47624bdcb0bfbd29f5e2056a",
|
||||
// vdo
|
||||
"1c87bbcd666099abc1ee2ec3f065abd073c237f95c4d0658b945e9d66d67622d",
|
||||
// PotR
|
||||
"b29ffbb4a4c0f14eb8c22fabaaacb43f92a62214ff45f0b4f50b7031c3a61a5a",
|
||||
// Ghalleb
|
||||
"48f903ed592638cee1c7f239a6ac14cbb0224a3153cff0f85eb0873113cf163f",
|
||||
// monerobull
|
||||
"56a2e3b410cb87bdb8125ae19d76a7be042de49693dc27f03e7a0dcc72b42f6c",
|
||||
// Adorid
|
||||
"3430222157262d6187c4537b026bcbaeb133695bbb512a7be8f25cc5a082d933",
|
||||
// KeepKey
|
||||
"a0ce13fb50c3d56548334af703b6ffb9a1b2f66e9dccf4a3688140b77fa58a06",
|
||||
// Username
|
||||
"b0e62f04f625447673a840d9c5f0e5867b355a67b0dee322334dc00925547b71",
|
||||
// R0BC0D3R
|
||||
"7e32cebc21b7979c36e477f0a849df1830cc052c879baf13107888654c0be654",
|
||||
// worksmarter
|
||||
"c4f2f6ffead84fcaa2e3c894d57c342a24c461eab5d1d17cae3d1a9e61d73e46",
|
||||
],
|
||||
)
|
||||
},
|
||||
// Bootnodes
|
||||
vec![],
|
||||
// Telemetry
|
||||
None,
|
||||
// Protocol ID
|
||||
Some("serai-testnet-2"),
|
||||
// Fork ID
|
||||
None,
|
||||
// Properties
|
||||
None,
|
||||
// Extensions
|
||||
None,
|
||||
)
|
||||
}
|
||||
|
||||
pub fn bootnode_multiaddrs(id: &str) -> Vec<libp2p::Multiaddr> {
|
||||
match id {
|
||||
"local" | "devnet" => vec![],
|
||||
"testnet-2" => vec![
|
||||
// Kayaba
|
||||
"/ip4/107.161.20.133/tcp/30333".parse().unwrap(),
|
||||
// lemon_respector
|
||||
"/ip4/188.66.62.11/tcp/30333".parse().unwrap(),
|
||||
// Ghalleb
|
||||
"/ip4/65.21.156.202/tcp/30333".parse().unwrap(),
|
||||
// ART3MIS.CLOUD
|
||||
"/ip4/51.195.60.217/tcp/30333".parse().unwrap(),
|
||||
// worksmarter
|
||||
"/ip4/37.60.255.101/tcp/30333".parse().unwrap(),
|
||||
],
|
||||
_ => panic!("requesting bootnodes for an unrecognized network"),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -40,7 +40,8 @@ impl SubstrateCli for Cli {
|
||||
fn load_spec(&self, id: &str) -> Result<Box<dyn sc_service::ChainSpec>, String> {
|
||||
match id {
|
||||
"dev" | "devnet" => Ok(Box::new(chain_spec::development_config())),
|
||||
"local" => Ok(Box::new(chain_spec::testnet_config())),
|
||||
"local" => Ok(Box::new(chain_spec::local_config())),
|
||||
"testnet" => Ok(Box::new(chain_spec::testnet_config())),
|
||||
_ => panic!("Unknown network ID"),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -19,6 +19,7 @@ pub use sc_rpc_api::DenyUnsafe;
|
||||
use sc_transaction_pool_api::TransactionPool;
|
||||
|
||||
pub struct FullDeps<C, P> {
|
||||
pub id: String,
|
||||
pub client: Arc<C>,
|
||||
pub pool: Arc<P>,
|
||||
pub deny_unsafe: DenyUnsafe,
|
||||
@@ -46,18 +47,19 @@ where
|
||||
use pallet_transaction_payment_rpc::{TransactionPayment, TransactionPaymentApiServer};
|
||||
|
||||
let mut module = RpcModule::new(());
|
||||
let FullDeps { client, pool, deny_unsafe, authority_discovery } = deps;
|
||||
let FullDeps { id, client, pool, deny_unsafe, authority_discovery } = deps;
|
||||
|
||||
module.merge(System::new(client.clone(), pool, deny_unsafe).into_rpc())?;
|
||||
module.merge(TransactionPayment::new(client.clone()).into_rpc())?;
|
||||
|
||||
if let Some(authority_discovery) = authority_discovery {
|
||||
let mut authority_discovery_module = RpcModule::new((client, RwLock::new(authority_discovery)));
|
||||
let mut authority_discovery_module =
|
||||
RpcModule::new((id, client, RwLock::new(authority_discovery)));
|
||||
authority_discovery_module.register_async_method(
|
||||
"p2p_validators",
|
||||
|params, context| async move {
|
||||
let network: NetworkId = params.parse()?;
|
||||
let (client, authority_discovery) = &*context;
|
||||
let (id, client, authority_discovery) = &*context;
|
||||
let latest_block = client.info().best_hash;
|
||||
|
||||
let validators = client.runtime_api().validators(latest_block, network).map_err(|_| {
|
||||
@@ -66,7 +68,9 @@ where
|
||||
"please report this at https://github.com/serai-dex/serai",
|
||||
)))
|
||||
})?;
|
||||
let mut all_p2p_addresses = vec![];
|
||||
// Always return the protocol's bootnodes
|
||||
let mut all_p2p_addresses = crate::chain_spec::bootnode_multiaddrs(id);
|
||||
// Additionally returns validators found over the DHT
|
||||
for validator in validators {
|
||||
let mut returned_addresses = authority_discovery
|
||||
.write()
|
||||
|
||||
@@ -161,7 +161,7 @@ pub fn new_partial(
|
||||
))
|
||||
}
|
||||
|
||||
pub fn new_full(config: Configuration) -> Result<TaskManager, ServiceError> {
|
||||
pub fn new_full(mut config: Configuration) -> Result<TaskManager, ServiceError> {
|
||||
let (
|
||||
sc_service::PartialComponents {
|
||||
client,
|
||||
@@ -176,6 +176,11 @@ pub fn new_full(config: Configuration) -> Result<TaskManager, ServiceError> {
|
||||
keystore_container,
|
||||
) = new_partial(&config)?;
|
||||
|
||||
config.network.node_name = "serai".to_string();
|
||||
config.network.client_version = "0.1.0".to_string();
|
||||
config.network.listen_addresses =
|
||||
vec!["/ip4/0.0.0.0/tcp/30333".parse().unwrap(), "/ip6/::/tcp/30333".parse().unwrap()];
|
||||
|
||||
let mut net_config = sc_network::config::FullNetworkConfiguration::new(&config.network);
|
||||
let grandpa_protocol_name =
|
||||
grandpa::protocol_standard_name(&client.block_hash(0).unwrap().unwrap(), &config.chain_spec);
|
||||
@@ -203,6 +208,59 @@ pub fn new_full(config: Configuration) -> Result<TaskManager, ServiceError> {
|
||||
warp_sync_params: Some(WarpSyncParams::WithProvider(warp_sync)),
|
||||
})?;
|
||||
|
||||
task_manager.spawn_handle().spawn("bootnodes", "bootnodes", {
|
||||
let network = network.clone();
|
||||
let id = config.chain_spec.id().to_string();
|
||||
|
||||
async move {
|
||||
// Transforms the above Multiaddrs into MultiaddrWithPeerIds
|
||||
// While the PeerIds *should* be known in advance and hardcoded, that data wasn't collected in
|
||||
// time and this fine for a testnet
|
||||
let bootnodes = || async {
|
||||
use libp2p::{Transport as TransportTrait, tcp::tokio::Transport, noise::Config};
|
||||
|
||||
let bootnode_multiaddrs = crate::chain_spec::bootnode_multiaddrs(&id);
|
||||
|
||||
let mut tasks = vec![];
|
||||
for multiaddr in bootnode_multiaddrs {
|
||||
tasks.push(tokio::time::timeout(
|
||||
core::time::Duration::from_secs(10),
|
||||
tokio::task::spawn(async move {
|
||||
let Ok(noise) = Config::new(&sc_network::Keypair::generate_ed25519()) else { None? };
|
||||
let mut transport = Transport::default()
|
||||
.upgrade(libp2p::core::upgrade::Version::V1)
|
||||
.authenticate(noise)
|
||||
.multiplex(libp2p::yamux::Config::default());
|
||||
let Ok(transport) = transport.dial(multiaddr.clone()) else { None? };
|
||||
let Ok((peer_id, _)) = transport.await else { None? };
|
||||
Some(sc_network::config::MultiaddrWithPeerId { multiaddr, peer_id })
|
||||
}),
|
||||
));
|
||||
}
|
||||
|
||||
let mut res = vec![];
|
||||
for task in tasks {
|
||||
if let Ok(Ok(Some(bootnode))) = task.await {
|
||||
res.push(bootnode);
|
||||
}
|
||||
}
|
||||
res
|
||||
};
|
||||
|
||||
use sc_network::{NetworkStatusProvider, NetworkPeers};
|
||||
loop {
|
||||
if let Ok(status) = network.status().await {
|
||||
if status.num_connected_peers < 3 {
|
||||
for bootnode in bootnodes().await {
|
||||
let _ = network.add_reserved_peer(bootnode);
|
||||
}
|
||||
}
|
||||
}
|
||||
tokio::time::sleep(core::time::Duration::from_secs(60)).await;
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
if config.offchain_worker.enabled {
|
||||
task_manager.spawn_handle().spawn(
|
||||
"offchain-workers-runner",
|
||||
@@ -258,11 +316,13 @@ pub fn new_full(config: Configuration) -> Result<TaskManager, ServiceError> {
|
||||
};
|
||||
|
||||
let rpc_builder = {
|
||||
let id = config.chain_spec.id().to_string();
|
||||
let client = client.clone();
|
||||
let pool = transaction_pool.clone();
|
||||
|
||||
Box::new(move |deny_unsafe, _| {
|
||||
crate::rpc::create_full(crate::rpc::FullDeps {
|
||||
id: id.clone(),
|
||||
client: client.clone(),
|
||||
pool: pool.clone(),
|
||||
deny_unsafe,
|
||||
|
||||
Reference in New Issue
Block a user