31 Commits

Author SHA1 Message Date
Luke Parker
2ba6d77ee7 Reduce target peers a bit 2024-04-23 12:59:34 -04:00
Luke Parker
67a0ff825b Correct recv to try_recv when exhausting channel 2024-04-23 12:41:10 -04:00
Luke Parker
6518379981 Correct selection of to-try peers to prevent infinite loops when to-try < target 2024-04-23 12:41:00 -04:00
Luke Parker
0c6ab50e35 Use a constant for the target amount of peer 2024-04-23 12:40:50 -04:00
Luke Parker
f73ce37e18 Use a HashSet for which networks to try peer finding for
Prevents a flood of retries from individually failed attempts within a batch of
peer connection attempts.
2024-04-23 12:40:41 -04:00
Luke Parker
973dcf065e Correct port forward in orchestration 2024-04-23 09:47:11 -04:00
Luke Parker
8f5aaa8492 New coordinator port, genesis 2024-04-23 09:32:44 -04:00
Luke Parker
93ba8d840a Remove cbor 2024-04-23 09:31:33 -04:00
Luke Parker
485e454680 Inline broadcast_raw now that it doesn't have multiple callers 2024-04-23 09:31:17 -04:00
Luke Parker
c3b6abf020 Properly diversify ReqResMessageKind/GossipMessageKind 2024-04-23 09:31:09 -04:00
Luke Parker
f3ccf1cab0 Move keep alive, heartbeat, block to request/response 2024-04-23 09:30:58 -04:00
Luke Parker
0deee0ec6b Line for prior commit 2024-04-21 08:55:50 -04:00
Luke Parker
6b428948d4 Comment the insanely aggressive timeout future trace log 2024-04-21 08:55:32 -04:00
Luke Parker
6986257d4f Add missing continue to prevent dialing a node we're connected to 2024-04-21 08:37:06 -04:00
Luke Parker
a3c37cba21 Replace expect with debug log 2024-04-21 08:03:01 -04:00
Luke Parker
b5f2ff1397 Correct boolean NOT on is_fresh_dial 2024-04-21 07:30:18 -04:00
Luke Parker
c84931c6ae Retry if initial dials fail, not just upon disconnect 2024-04-21 07:26:29 -04:00
Luke Parker
63abf2d022 Restart coordinator peer finding upon disconnections 2024-04-21 07:03:03 -04:00
Luke Parker
a62d2d05ad Correct log which didn't work as intended 2024-04-20 19:55:17 -04:00
Luke Parker
967cc16748 Correct log targets in tendermint-machine 2024-04-20 19:55:06 -04:00
Luke Parker
ab4b8cc2d5 Better logs in tendermint-machine 2024-04-20 18:13:57 -04:00
Luke Parker
387ccbad3a Extend time in sync test 2024-04-18 16:39:16 -04:00
Luke Parker
26cdfdd824 fmt 2024-04-18 16:39:03 -04:00
Luke Parker
68e77384ac Don't broadcast added blocks
Online validators should inherently have them. Offline validators will receive
from the sync protocol.

This does somewhat eliminate the class of nodes who would follow the blockchain
(without validating it), yet that's fine for the performance benefit.
2024-04-18 16:38:52 -04:00
Luke Parker
68da88c1f3 Only reply to heartbeats after a certain distance 2024-04-18 16:38:43 -04:00
Luke Parker
2b481ab71e Ensure we don't reply to stale heartbeats 2024-04-18 16:38:21 -04:00
Luke Parker
05e6d81948 Only have some nodes respond to latent heartbeats
Also only respond if they're more than 2 blocks behind to minimize redundant
sending of blocks.
2024-04-18 16:38:16 -04:00
Luke Parker
e426cd00bd Correct protocol ID -> ID 2024-04-11 23:11:23 -04:00
Luke Parker
09e3881b7d Add worksmarter at the last minute 2024-04-11 16:08:14 -04:00
Luke Parker
10124ac4a8 Add Testnet 2 Config
Starts Tuesday, April 16th, with confirmed keys/boot nodes.
2024-04-11 15:49:32 -04:00
Luke Parker
1987983f88 Add bootnode code prior used in testnet-internal
Also performs the devnet/testnet differentation done since the testnet branch.
2024-04-11 14:37:09 -04:00
21 changed files with 849 additions and 345 deletions

1
Cargo.lock generated
View File

@@ -7613,6 +7613,7 @@ dependencies = [
"futures-util", "futures-util",
"hex", "hex",
"jsonrpsee", "jsonrpsee",
"libp2p",
"pallet-transaction-payment-rpc", "pallet-transaction-payment-rpc",
"rand_core", "rand_core",
"sc-authority-discovery", "sc-authority-discovery",

View File

@@ -51,7 +51,7 @@ env_logger = { version = "0.10", default-features = false, features = ["humantim
futures-util = { version = "0.3", default-features = false, features = ["std"] } futures-util = { version = "0.3", default-features = false, features = ["std"] }
tokio = { version = "1", default-features = false, features = ["rt-multi-thread", "sync", "time", "macros"] } tokio = { version = "1", default-features = false, features = ["rt-multi-thread", "sync", "time", "macros"] }
libp2p = { version = "0.52", default-features = false, features = ["tokio", "tcp", "noise", "yamux", "gossipsub", "macros"] } libp2p = { version = "0.52", default-features = false, features = ["tokio", "tcp", "noise", "yamux", "request-response", "gossipsub", "macros"] }
[dev-dependencies] [dev-dependencies]
tributary = { package = "tributary-chain", path = "./tributary", features = ["tests"] } tributary = { package = "tributary-chain", path = "./tributary", features = ["tests"] }

View File

@@ -22,7 +22,7 @@ use serai_db::{Get, DbTxn, Db, create_db};
use processor_messages::coordinator::cosign_block_msg; use processor_messages::coordinator::cosign_block_msg;
use crate::{ use crate::{
p2p::{CosignedBlock, P2pMessageKind, P2p}, p2p::{CosignedBlock, GossipMessageKind, P2p},
substrate::LatestCosignedBlock, substrate::LatestCosignedBlock,
}; };
@@ -323,7 +323,7 @@ impl<D: Db> CosignEvaluator<D> {
for cosign in cosigns { for cosign in cosigns {
let mut buf = vec![]; let mut buf = vec![];
cosign.serialize(&mut buf).unwrap(); cosign.serialize(&mut buf).unwrap();
P2p::broadcast(&p2p, P2pMessageKind::CosignedBlock, buf).await; P2p::broadcast(&p2p, GossipMessageKind::CosignedBlock, buf).await;
} }
sleep(Duration::from_secs(60)).await; sleep(Duration::from_secs(60)).await;
} }

View File

@@ -260,7 +260,7 @@ async fn handle_processor_message<D: Db, P: P2p>(
cosign_channel.send(cosigned_block).unwrap(); cosign_channel.send(cosigned_block).unwrap();
let mut buf = vec![]; let mut buf = vec![];
cosigned_block.serialize(&mut buf).unwrap(); cosigned_block.serialize(&mut buf).unwrap();
P2p::broadcast(p2p, P2pMessageKind::CosignedBlock, buf).await; P2p::broadcast(p2p, GossipMessageKind::CosignedBlock, buf).await;
None None
} }
// This causes an action on Substrate yet not on any Tributary // This causes an action on Substrate yet not on any Tributary

View File

@@ -1,8 +1,8 @@
use core::{time::Duration, fmt}; use core::{time::Duration, fmt};
use std::{ use std::{
sync::Arc, sync::Arc,
io::Read, io::{self, Read},
collections::HashMap, collections::{HashSet, HashMap},
time::{SystemTime, Instant}, time::{SystemTime, Instant},
}; };
@@ -15,7 +15,7 @@ use serai_client::{primitives::NetworkId, validator_sets::primitives::ValidatorS
use serai_db::Db; use serai_db::Db;
use futures_util::StreamExt; use futures_util::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt, StreamExt};
use tokio::{ use tokio::{
sync::{Mutex, RwLock, mpsc, broadcast}, sync::{Mutex, RwLock, mpsc, broadcast},
time::sleep, time::sleep,
@@ -27,12 +27,16 @@ use libp2p::{
PeerId, PeerId,
tcp::Config as TcpConfig, tcp::Config as TcpConfig,
noise, yamux, noise, yamux,
request_response::{
Codec as RrCodecTrait, Message as RrMessage, Event as RrEvent, Config as RrConfig,
Behaviour as RrBehavior,
},
gossipsub::{ gossipsub::{
IdentTopic, FastMessageId, MessageId, MessageAuthenticity, ValidationMode, ConfigBuilder, IdentTopic, FastMessageId, MessageId, MessageAuthenticity, ValidationMode, ConfigBuilder,
IdentityTransform, AllowAllSubscriptionFilter, Event as GsEvent, PublishError, IdentityTransform, AllowAllSubscriptionFilter, Event as GsEvent, PublishError,
Behaviour as GsBehavior, Behaviour as GsBehavior,
}, },
swarm::{NetworkBehaviour, SwarmEvent, Swarm}, swarm::{NetworkBehaviour, SwarmEvent},
SwarmBuilder, SwarmBuilder,
}; };
@@ -40,6 +44,8 @@ pub(crate) use tributary::{ReadWrite, P2p as TributaryP2p};
use crate::{Transaction, Block, Tributary, ActiveTributary, TributaryEvent}; use crate::{Transaction, Block, Tributary, ActiveTributary, TributaryEvent};
// Block size limit + 1 KB of space for signatures/metadata
const MAX_LIBP2P_MESSAGE_SIZE: usize = tributary::BLOCK_SIZE_LIMIT + 1024;
const LIBP2P_TOPIC: &str = "serai-coordinator"; const LIBP2P_TOPIC: &str = "serai-coordinator";
#[derive(Clone, Copy, PartialEq, Eq, Hash, Debug, BorshSerialize, BorshDeserialize)] #[derive(Clone, Copy, PartialEq, Eq, Hash, Debug, BorshSerialize, BorshDeserialize)]
@@ -51,72 +57,113 @@ pub struct CosignedBlock {
} }
#[derive(Clone, Copy, PartialEq, Eq, Hash, Debug)] #[derive(Clone, Copy, PartialEq, Eq, Hash, Debug)]
pub enum P2pMessageKind { pub enum ReqResMessageKind {
KeepAlive, KeepAlive,
Tributary([u8; 32]),
Heartbeat([u8; 32]), Heartbeat([u8; 32]),
Block([u8; 32]), Block([u8; 32]),
}
impl ReqResMessageKind {
pub fn read<R: Read>(reader: &mut R) -> Option<ReqResMessageKind> {
let mut kind = [0; 1];
reader.read_exact(&mut kind).ok()?;
match kind[0] {
0 => Some(ReqResMessageKind::KeepAlive),
1 => Some({
let mut genesis = [0; 32];
reader.read_exact(&mut genesis).ok()?;
ReqResMessageKind::Heartbeat(genesis)
}),
2 => Some({
let mut genesis = [0; 32];
reader.read_exact(&mut genesis).ok()?;
ReqResMessageKind::Block(genesis)
}),
_ => None,
}
}
pub fn serialize(&self) -> Vec<u8> {
match self {
ReqResMessageKind::KeepAlive => vec![0],
ReqResMessageKind::Heartbeat(genesis) => {
let mut res = vec![1];
res.extend(genesis);
res
}
ReqResMessageKind::Block(genesis) => {
let mut res = vec![2];
res.extend(genesis);
res
}
}
}
}
#[derive(Clone, Copy, PartialEq, Eq, Hash, Debug)]
pub enum GossipMessageKind {
Tributary([u8; 32]),
CosignedBlock, CosignedBlock,
} }
impl GossipMessageKind {
pub fn read<R: Read>(reader: &mut R) -> Option<GossipMessageKind> {
let mut kind = [0; 1];
reader.read_exact(&mut kind).ok()?;
match kind[0] {
0 => Some({
let mut genesis = [0; 32];
reader.read_exact(&mut genesis).ok()?;
GossipMessageKind::Tributary(genesis)
}),
1 => Some(GossipMessageKind::CosignedBlock),
_ => None,
}
}
pub fn serialize(&self) -> Vec<u8> {
match self {
GossipMessageKind::Tributary(genesis) => {
let mut res = vec![0];
res.extend(genesis);
res
}
GossipMessageKind::CosignedBlock => {
vec![1]
}
}
}
}
#[derive(Clone, Copy, PartialEq, Eq, Hash, Debug)]
pub enum P2pMessageKind {
ReqRes(ReqResMessageKind),
Gossip(GossipMessageKind),
}
impl P2pMessageKind { impl P2pMessageKind {
fn genesis(&self) -> Option<[u8; 32]> { fn genesis(&self) -> Option<[u8; 32]> {
match self { match self {
P2pMessageKind::KeepAlive | P2pMessageKind::CosignedBlock => None, P2pMessageKind::ReqRes(ReqResMessageKind::KeepAlive) |
P2pMessageKind::Tributary(genesis) | P2pMessageKind::Gossip(GossipMessageKind::CosignedBlock) => None,
P2pMessageKind::Heartbeat(genesis) | P2pMessageKind::ReqRes(
P2pMessageKind::Block(genesis) => Some(*genesis), ReqResMessageKind::Heartbeat(genesis) | ReqResMessageKind::Block(genesis),
} ) |
} P2pMessageKind::Gossip(GossipMessageKind::Tributary(genesis)) => Some(*genesis),
fn serialize(&self) -> Vec<u8> {
match self {
P2pMessageKind::KeepAlive => vec![0],
P2pMessageKind::Tributary(genesis) => {
let mut res = vec![1];
res.extend(genesis);
res
}
P2pMessageKind::Heartbeat(genesis) => {
let mut res = vec![2];
res.extend(genesis);
res
}
P2pMessageKind::Block(genesis) => {
let mut res = vec![3];
res.extend(genesis);
res
}
P2pMessageKind::CosignedBlock => {
vec![4]
} }
} }
} }
fn read<R: Read>(reader: &mut R) -> Option<P2pMessageKind> { impl From<ReqResMessageKind> for P2pMessageKind {
let mut kind = [0; 1]; fn from(kind: ReqResMessageKind) -> P2pMessageKind {
reader.read_exact(&mut kind).ok()?; P2pMessageKind::ReqRes(kind)
match kind[0] {
0 => Some(P2pMessageKind::KeepAlive),
1 => Some({
let mut genesis = [0; 32];
reader.read_exact(&mut genesis).ok()?;
P2pMessageKind::Tributary(genesis)
}),
2 => Some({
let mut genesis = [0; 32];
reader.read_exact(&mut genesis).ok()?;
P2pMessageKind::Heartbeat(genesis)
}),
3 => Some({
let mut genesis = [0; 32];
reader.read_exact(&mut genesis).ok()?;
P2pMessageKind::Block(genesis)
}),
4 => Some(P2pMessageKind::CosignedBlock),
_ => None,
} }
} }
impl From<GossipMessageKind> for P2pMessageKind {
fn from(kind: GossipMessageKind) -> P2pMessageKind {
P2pMessageKind::Gossip(kind)
}
} }
#[derive(Clone, Debug)] #[derive(Clone, Debug)]
@@ -133,17 +180,21 @@ pub trait P2p: Send + Sync + Clone + fmt::Debug + TributaryP2p {
async fn subscribe(&self, set: ValidatorSet, genesis: [u8; 32]); async fn subscribe(&self, set: ValidatorSet, genesis: [u8; 32]);
async fn unsubscribe(&self, set: ValidatorSet, genesis: [u8; 32]); async fn unsubscribe(&self, set: ValidatorSet, genesis: [u8; 32]);
async fn send_raw(&self, to: Self::Id, genesis: Option<[u8; 32]>, msg: Vec<u8>); async fn send_raw(&self, to: Self::Id, msg: Vec<u8>);
async fn broadcast_raw(&self, genesis: Option<[u8; 32]>, msg: Vec<u8>); async fn broadcast_raw(&self, kind: P2pMessageKind, msg: Vec<u8>);
async fn receive_raw(&self) -> (Self::Id, Vec<u8>); async fn receive(&self) -> Message<Self>;
async fn send(&self, to: Self::Id, kind: P2pMessageKind, msg: Vec<u8>) { async fn send(&self, to: Self::Id, kind: ReqResMessageKind, msg: Vec<u8>) {
let mut actual_msg = kind.serialize(); let mut actual_msg = kind.serialize();
actual_msg.extend(msg); actual_msg.extend(msg);
self.send_raw(to, kind.genesis(), actual_msg).await; self.send_raw(to, actual_msg).await;
} }
async fn broadcast(&self, kind: P2pMessageKind, msg: Vec<u8>) { async fn broadcast(&self, kind: impl Send + Into<P2pMessageKind>, msg: Vec<u8>) {
let mut actual_msg = kind.serialize(); let kind = kind.into();
let mut actual_msg = match kind {
P2pMessageKind::ReqRes(kind) => kind.serialize(),
P2pMessageKind::Gossip(kind) => kind.serialize(),
};
actual_msg.extend(msg); actual_msg.extend(msg);
/* /*
log::trace!( log::trace!(
@@ -157,41 +208,70 @@ pub trait P2p: Send + Sync + Clone + fmt::Debug + TributaryP2p {
} }
); );
*/ */
self.broadcast_raw(kind.genesis(), actual_msg).await; self.broadcast_raw(kind, actual_msg).await;
} }
async fn receive(&self) -> Message<Self> {
let (sender, kind, msg) = loop {
let (sender, msg) = self.receive_raw().await;
if msg.is_empty() {
log::error!("empty p2p message from {sender:?}");
continue;
} }
let mut msg_ref = msg.as_ref(); #[derive(Default, Clone, Copy, PartialEq, Eq, Debug)]
let Some(kind) = P2pMessageKind::read::<&[u8]>(&mut msg_ref) else { struct RrCodec;
log::error!("invalid p2p message kind from {sender:?}"); #[async_trait]
continue; impl RrCodecTrait for RrCodec {
}; type Protocol = &'static str;
break (sender, kind, msg_ref.to_vec()); type Request = Vec<u8>;
}; type Response = Vec<u8>;
/*
log::trace!( async fn read_request<R: Send + Unpin + AsyncRead>(
"received p2p message (kind {})", &mut self,
match kind { _: &Self::Protocol,
P2pMessageKind::KeepAlive => "KeepAlive".to_string(), io: &mut R,
P2pMessageKind::Tributary(genesis) => format!("Tributary({})", hex::encode(genesis)), ) -> io::Result<Vec<u8>> {
P2pMessageKind::Heartbeat(genesis) => format!("Heartbeat({})", hex::encode(genesis)), let mut len = [0; 4];
P2pMessageKind::Block(genesis) => format!("Block({})", hex::encode(genesis)), io.read_exact(&mut len).await?;
P2pMessageKind::CosignedBlock => "CosignedBlock".to_string(), let len = usize::try_from(u32::from_le_bytes(len)).expect("not a 32-bit platform?");
if len > MAX_LIBP2P_MESSAGE_SIZE {
Err(io::Error::other("request length exceeded MAX_LIBP2P_MESSAGE_SIZE"))?;
} }
); // This may be a non-trivial allocation easily causable
*/ // While we could chunk the read, meaning we only perform the allocation as bandwidth is used,
Message { sender, kind, msg } // the max message size should be sufficiently sane
let mut buf = vec![0; len];
io.read_exact(&mut buf).await?;
Ok(buf)
}
async fn read_response<R: Send + Unpin + AsyncRead>(
&mut self,
proto: &Self::Protocol,
io: &mut R,
) -> io::Result<Vec<u8>> {
self.read_request(proto, io).await
}
async fn write_request<W: Send + Unpin + AsyncWrite>(
&mut self,
_: &Self::Protocol,
io: &mut W,
req: Vec<u8>,
) -> io::Result<()> {
io.write_all(
&u32::try_from(req.len())
.map_err(|_| io::Error::other("request length exceeded 2**32"))?
.to_le_bytes(),
)
.await?;
io.write_all(&req).await
}
async fn write_response<W: Send + Unpin + AsyncWrite>(
&mut self,
proto: &Self::Protocol,
io: &mut W,
res: Vec<u8>,
) -> io::Result<()> {
self.write_request(proto, io, res).await
} }
} }
#[derive(NetworkBehaviour)] #[derive(NetworkBehaviour)]
struct Behavior { struct Behavior {
reqres: RrBehavior<RrCodec>,
gossipsub: GsBehavior, gossipsub: GsBehavior,
} }
@@ -199,8 +279,9 @@ struct Behavior {
#[derive(Clone)] #[derive(Clone)]
pub struct LibP2p { pub struct LibP2p {
subscribe: Arc<Mutex<mpsc::UnboundedSender<(bool, ValidatorSet, [u8; 32])>>>, subscribe: Arc<Mutex<mpsc::UnboundedSender<(bool, ValidatorSet, [u8; 32])>>>,
broadcast: Arc<Mutex<mpsc::UnboundedSender<(Option<[u8; 32]>, Vec<u8>)>>>, send: Arc<Mutex<mpsc::UnboundedSender<(PeerId, Vec<u8>)>>>,
receive: Arc<Mutex<mpsc::UnboundedReceiver<(PeerId, Vec<u8>)>>>, broadcast: Arc<Mutex<mpsc::UnboundedSender<(P2pMessageKind, Vec<u8>)>>>,
receive: Arc<Mutex<mpsc::UnboundedReceiver<Message<Self>>>>,
} }
impl fmt::Debug for LibP2p { impl fmt::Debug for LibP2p {
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
@@ -211,14 +292,12 @@ impl fmt::Debug for LibP2p {
impl LibP2p { impl LibP2p {
#[allow(clippy::new_without_default)] #[allow(clippy::new_without_default)]
pub fn new(serai: Arc<Serai>) -> Self { pub fn new(serai: Arc<Serai>) -> Self {
// Block size limit + 1 KB of space for signatures/metadata
const MAX_LIBP2P_MESSAGE_SIZE: usize = tributary::BLOCK_SIZE_LIMIT + 1024;
log::info!("creating a libp2p instance"); log::info!("creating a libp2p instance");
let throwaway_key_pair = Keypair::generate_ed25519(); let throwaway_key_pair = Keypair::generate_ed25519();
let behavior = Behavior { let behavior = Behavior {
reqres: { RrBehavior::new([], RrConfig::default()) },
gossipsub: { gossipsub: {
let heartbeat_interval = tributary::tendermint::LATENCY_TIME / 2; let heartbeat_interval = tributary::tendermint::LATENCY_TIME / 2;
let heartbeats_per_block = let heartbeats_per_block =
@@ -279,9 +358,10 @@ impl LibP2p {
.with_behaviour(|_| behavior) .with_behaviour(|_| behavior)
.unwrap() .unwrap()
.build(); .build();
const PORT: u16 = 30563; // 5132 ^ (('c' << 8) | 'o') const PORT: u16 = 30564; // 5132 ^ (('c' << 8) | 'o')
swarm.listen_on(format!("/ip4/0.0.0.0/tcp/{PORT}").parse().unwrap()).unwrap(); swarm.listen_on(format!("/ip4/0.0.0.0/tcp/{PORT}").parse().unwrap()).unwrap();
let (send_send, mut send_recv) = mpsc::unbounded_channel();
let (broadcast_send, mut broadcast_recv) = mpsc::unbounded_channel(); let (broadcast_send, mut broadcast_recv) = mpsc::unbounded_channel();
let (receive_send, receive_recv) = mpsc::unbounded_channel(); let (receive_send, receive_recv) = mpsc::unbounded_channel();
let (subscribe_send, mut subscribe_recv) = mpsc::unbounded_channel(); let (subscribe_send, mut subscribe_recv) = mpsc::unbounded_channel();
@@ -290,17 +370,31 @@ impl LibP2p {
IdentTopic::new(format!("{LIBP2P_TOPIC}-{}", hex::encode(set.encode()))) IdentTopic::new(format!("{LIBP2P_TOPIC}-{}", hex::encode(set.encode())))
} }
// TODO: If a network has less than TARGET_PEERS, this will cause retries ad infinitum
const TARGET_PEERS: usize = 5;
// The addrs we're currently dialing, and the networks associated with them
let dialing_peers = Arc::new(RwLock::new(HashMap::new()));
// The peers we're currently connected to, and the networks associated with them
let connected_peers = Arc::new(RwLock::new(HashMap::<Multiaddr, HashSet<NetworkId>>::new()));
// Find and connect to peers // Find and connect to peers
let (pending_p2p_connections_send, mut pending_p2p_connections_recv) = let (connect_to_network_send, mut connect_to_network_recv) =
tokio::sync::mpsc::unbounded_channel(); tokio::sync::mpsc::unbounded_channel();
let (to_dial_send, mut to_dial_recv) = tokio::sync::mpsc::unbounded_channel(); let (to_dial_send, mut to_dial_recv) = tokio::sync::mpsc::unbounded_channel();
tokio::spawn({ tokio::spawn({
let pending_p2p_connections_send = pending_p2p_connections_send.clone(); let dialing_peers = dialing_peers.clone();
let connected_peers = connected_peers.clone();
let connect_to_network_send = connect_to_network_send.clone();
async move { async move {
loop { loop {
// TODO: Add better peer management logic? let connect = |network: NetworkId, addr: Multiaddr| {
{ let dialing_peers = dialing_peers.clone();
let connect = |addr: Multiaddr| { let connected_peers = connected_peers.clone();
let to_dial_send = to_dial_send.clone();
let connect_to_network_send = connect_to_network_send.clone();
async move {
log::info!("found peer from substrate: {addr}"); log::info!("found peer from substrate: {addr}");
let protocols = addr.iter().filter_map(|piece| match piece { let protocols = addr.iter().filter_map(|piece| match piece {
@@ -318,46 +412,99 @@ impl LibP2p {
let addr = new_addr; let addr = new_addr;
log::debug!("transformed found peer: {addr}"); log::debug!("transformed found peer: {addr}");
// TODO: Check this isn't a duplicate let (is_fresh_dial, nets) = {
to_dial_send.send(addr).unwrap(); let mut dialing_peers = dialing_peers.write().await;
let is_fresh_dial = !dialing_peers.contains_key(&addr);
if is_fresh_dial {
dialing_peers.insert(addr.clone(), HashSet::new());
}
// Associate this network with this peer
dialing_peers.get_mut(&addr).unwrap().insert(network);
let nets = dialing_peers.get(&addr).unwrap().clone();
(is_fresh_dial, nets)
};
// Spawn a task to remove this peer from 'dialing' in sixty seconds, in case dialing
// fails
// This performs cleanup and bounds the size of the map to whatever growth occurs
// within a temporal window
tokio::spawn({
let dialing_peers = dialing_peers.clone();
let connected_peers = connected_peers.clone();
let connect_to_network_send = connect_to_network_send.clone();
let addr = addr.clone();
async move {
tokio::time::sleep(core::time::Duration::from_secs(60)).await;
let mut dialing_peers = dialing_peers.write().await;
if let Some(expected_nets) = dialing_peers.remove(&addr) {
log::debug!("removed addr from dialing upon timeout: {addr}");
// TODO: De-duplicate this below instance
// If we failed to dial and haven't gotten enough actual connections, retry
let connected_peers = connected_peers.read().await;
for net in expected_nets {
let mut remaining_peers = 0;
for nets in connected_peers.values() {
if nets.contains(&net) {
remaining_peers += 1;
}
}
// If we do not, start connecting to this network again
if remaining_peers < TARGET_PEERS {
connect_to_network_send.send(net).expect(
"couldn't send net to connect to due to disconnects (receiver dropped?)",
);
}
}
}
}
});
if is_fresh_dial {
to_dial_send.send((addr, nets)).unwrap();
}
}
}; };
// TODO: We should also connect to random peers from random nets as needed for // TODO: We should also connect to random peers from random nets as needed for
// cosigning // cosigning
let mut to_retry = vec![];
while let Some(network) = pending_p2p_connections_recv.recv().await { // Drain the chainnel, de-duplicating any networks in it
let mut connect_to_network_networks = HashSet::new();
while let Ok(network) = connect_to_network_recv.try_recv() {
connect_to_network_networks.insert(network);
}
for network in connect_to_network_networks {
if let Ok(mut nodes) = serai.p2p_validators(network).await { if let Ok(mut nodes) = serai.p2p_validators(network).await {
// If there's an insufficient amount of nodes known, connect to all yet add it // If there's an insufficient amount of nodes known, connect to all yet add it
// back and break // back and break
if nodes.len() < 3 { if nodes.len() < TARGET_PEERS {
log::warn!( log::warn!(
"insufficient amount of P2P nodes known for {:?}: {}", "insufficient amount of P2P nodes known for {:?}: {}",
network, network,
nodes.len() nodes.len()
); );
to_retry.push(network); // Retry this later
connect_to_network_send.send(network).unwrap();
for node in nodes { for node in nodes {
connect(node); connect(network, node).await;
} }
continue; continue;
} }
// Randomly select up to 5 // Randomly select up to 150% of the TARGET_PEERS
for _ in 0 .. 5 { for _ in 0 .. ((3 * TARGET_PEERS) / 2) {
if !nodes.is_empty() { if !nodes.is_empty() {
let to_connect = nodes.swap_remove( let to_connect = nodes.swap_remove(
usize::try_from(OsRng.next_u64() % u64::try_from(nodes.len()).unwrap()) usize::try_from(OsRng.next_u64() % u64::try_from(nodes.len()).unwrap())
.unwrap(), .unwrap(),
); );
connect(to_connect); connect(network, to_connect).await;
} }
} }
} }
} }
for to_retry in to_retry {
pending_p2p_connections_send.send(to_retry).unwrap();
}
}
// Sleep 60 seconds before moving to the next iteration // Sleep 60 seconds before moving to the next iteration
tokio::time::sleep(core::time::Duration::from_secs(60)).await; tokio::time::sleep(core::time::Duration::from_secs(60)).await;
} }
@@ -368,35 +515,10 @@ impl LibP2p {
tokio::spawn({ tokio::spawn({
let mut time_of_last_p2p_message = Instant::now(); let mut time_of_last_p2p_message = Instant::now();
#[allow(clippy::needless_pass_by_ref_mut)] // False positive
fn broadcast_raw(
p2p: &mut Swarm<Behavior>,
time_of_last_p2p_message: &mut Instant,
set: Option<ValidatorSet>,
msg: Vec<u8>,
) {
// Update the time of last message
*time_of_last_p2p_message = Instant::now();
let topic =
if let Some(set) = set { topic_for_set(set) } else { IdentTopic::new(LIBP2P_TOPIC) };
match p2p.behaviour_mut().gossipsub.publish(topic, msg.clone()) {
Err(PublishError::SigningError(e)) => panic!("signing error when broadcasting: {e}"),
Err(PublishError::InsufficientPeers) => {
log::warn!("failed to send p2p message due to insufficient peers")
}
Err(PublishError::MessageTooLarge) => {
panic!("tried to send a too large message: {}", hex::encode(msg))
}
Err(PublishError::TransformFailed(e)) => panic!("IdentityTransform failed: {e}"),
Err(PublishError::Duplicate) | Ok(_) => {}
}
}
async move { async move {
let connected_peers = connected_peers.clone();
let mut set_for_genesis = HashMap::new(); let mut set_for_genesis = HashMap::new();
let mut connected_peers = 0;
loop { loop {
let time_since_last = Instant::now().duration_since(time_of_last_p2p_message); let time_since_last = Instant::now().duration_since(time_of_last_p2p_message);
tokio::select! { tokio::select! {
@@ -409,7 +531,7 @@ impl LibP2p {
let topic = topic_for_set(set); let topic = topic_for_set(set);
if subscribe { if subscribe {
log::info!("subscribing to p2p messages for {set:?}"); log::info!("subscribing to p2p messages for {set:?}");
pending_p2p_connections_send.send(set.network).unwrap(); connect_to_network_send.send(set.network).unwrap();
set_for_genesis.insert(genesis, set); set_for_genesis.insert(genesis, set);
swarm.behaviour_mut().gossipsub.subscribe(&topic).unwrap(); swarm.behaviour_mut().gossipsub.subscribe(&topic).unwrap();
} else { } else {
@@ -419,17 +541,50 @@ impl LibP2p {
} }
} }
msg = send_recv.recv() => {
let (peer, msg): (PeerId, Vec<u8>) =
msg.expect("send_recv closed. are we shutting down?");
swarm.behaviour_mut().reqres.send_request(&peer, msg);
},
// Handle any queued outbound messages // Handle any queued outbound messages
msg = broadcast_recv.recv() => { msg = broadcast_recv.recv() => {
let (genesis, msg): (Option<[u8; 32]>, Vec<u8>) = // Update the time of last message
time_of_last_p2p_message = Instant::now();
let (kind, msg): (P2pMessageKind, Vec<u8>) =
msg.expect("broadcast_recv closed. are we shutting down?"); msg.expect("broadcast_recv closed. are we shutting down?");
let set = genesis.and_then(|genesis| set_for_genesis.get(&genesis).copied());
broadcast_raw( if matches!(kind, P2pMessageKind::ReqRes(_)) {
&mut swarm, // Use request/response, yet send to all connected peers
&mut time_of_last_p2p_message, for peer_id in swarm.connected_peers().copied().collect::<Vec<_>>() {
set, swarm.behaviour_mut().reqres.send_request(&peer_id, msg.clone());
msg, }
); } else {
// Use gossipsub
let set =
kind.genesis().and_then(|genesis| set_for_genesis.get(&genesis).copied());
let topic = if let Some(set) = set {
topic_for_set(set)
} else {
IdentTopic::new(LIBP2P_TOPIC)
};
match swarm.behaviour_mut().gossipsub.publish(topic, msg.clone()) {
Err(PublishError::SigningError(e)) => {
panic!("signing error when broadcasting: {e}")
},
Err(PublishError::InsufficientPeers) => {
log::warn!("failed to send p2p message due to insufficient peers")
}
Err(PublishError::MessageTooLarge) => {
panic!("tried to send a too large message: {}", hex::encode(msg))
}
Err(PublishError::TransformFailed(e)) => panic!("IdentityTransform failed: {e}"),
Err(PublishError::Duplicate) | Ok(_) => {}
}
}
} }
// Handle new incoming messages // Handle new incoming messages
@@ -438,42 +593,119 @@ impl LibP2p {
Some(SwarmEvent::Dialing { connection_id, .. }) => { Some(SwarmEvent::Dialing { connection_id, .. }) => {
log::debug!("dialing to peer in connection ID {}", &connection_id); log::debug!("dialing to peer in connection ID {}", &connection_id);
} }
Some(SwarmEvent::ConnectionEstablished { peer_id, connection_id, .. }) => { Some(SwarmEvent::ConnectionEstablished {
peer_id,
connection_id,
endpoint,
..
}) => {
if &peer_id == swarm.local_peer_id() { if &peer_id == swarm.local_peer_id() {
log::warn!("established a libp2p connection to ourselves"); log::warn!("established a libp2p connection to ourselves");
swarm.close_connection(connection_id); swarm.close_connection(connection_id);
continue; continue;
} }
connected_peers += 1; let addr = endpoint.get_remote_address();
let nets = {
let mut dialing_peers = dialing_peers.write().await;
if let Some(nets) = dialing_peers.remove(addr) {
nets
} else {
log::debug!("connected to a peer who we didn't have within dialing");
HashSet::new()
}
};
{
let mut connected_peers = connected_peers.write().await;
connected_peers.insert(addr.clone(), nets);
log::debug!( log::debug!(
"connection established to peer {} in connection ID {}, connected peers: {}", "connection established to peer {} in connection ID {}, connected peers: {}",
&peer_id, &peer_id,
&connection_id, &connection_id,
connected_peers, connected_peers.len(),
); );
} }
Some(SwarmEvent::ConnectionClosed { peer_id, .. }) => { }
connected_peers -= 1; Some(SwarmEvent::ConnectionClosed { peer_id, endpoint, .. }) => {
let mut connected_peers = connected_peers.write().await;
let Some(nets) = connected_peers.remove(endpoint.get_remote_address()) else {
log::debug!("closed connection to peer which wasn't in connected_peers");
continue;
};
// Downgrade to a read lock
let connected_peers = connected_peers.downgrade();
// For each net we lost a peer for, check if we still have sufficient peers
// overall
for net in nets {
let mut remaining_peers = 0;
for nets in connected_peers.values() {
if nets.contains(&net) {
remaining_peers += 1;
}
}
// If we do not, start connecting to this network again
if remaining_peers < TARGET_PEERS {
connect_to_network_send
.send(net)
.expect(
"couldn't send net to connect to due to disconnects (receiver dropped?)"
);
}
}
log::debug!( log::debug!(
"connection with peer {peer_id} closed, connected peers: {}", "connection with peer {peer_id} closed, connected peers: {}",
connected_peers, connected_peers.len(),
); );
} }
Some(SwarmEvent::Behaviour(BehaviorEvent::Reqres(
RrEvent::Message { peer, message },
))) => {
let message = match message {
RrMessage::Request { request, .. } => request,
RrMessage::Response { response, .. } => response,
};
let mut msg_ref = message.as_slice();
let Some(kind) = ReqResMessageKind::read(&mut msg_ref) else { continue };
let message = Message {
sender: peer,
kind: P2pMessageKind::ReqRes(kind),
msg: msg_ref.to_vec(),
};
receive_send.send(message).expect("receive_send closed. are we shutting down?");
}
Some(SwarmEvent::Behaviour(BehaviorEvent::Gossipsub( Some(SwarmEvent::Behaviour(BehaviorEvent::Gossipsub(
GsEvent::Message { propagation_source, message, .. }, GsEvent::Message { propagation_source, message, .. },
))) => { ))) => {
receive_send let mut msg_ref = message.data.as_slice();
.send((propagation_source, message.data)) let Some(kind) = GossipMessageKind::read(&mut msg_ref) else { continue };
.expect("receive_send closed. are we shutting down?"); let message = Message {
sender: propagation_source,
kind: P2pMessageKind::Gossip(kind),
msg: msg_ref.to_vec(),
};
receive_send.send(message).expect("receive_send closed. are we shutting down?");
} }
_ => {} _ => {}
} }
} }
// Handle peers to dial // Handle peers to dial
addr = to_dial_recv.recv() => { addr_and_nets = to_dial_recv.recv() => {
let addr = addr.expect("received address was None (sender dropped?)"); let (addr, nets) =
addr_and_nets.expect("received address was None (sender dropped?)");
// If we've already dialed and connected to this address, don't further dial them
// Just associate these networks with them
if let Some(existing_nets) = connected_peers.write().await.get_mut(&addr) {
for net in nets {
existing_nets.insert(net);
}
continue;
}
if let Err(e) = swarm.dial(addr) { if let Err(e) = swarm.dial(addr) {
log::warn!("dialing peer failed: {e:?}"); log::warn!("dialing peer failed: {e:?}");
} }
@@ -487,12 +719,13 @@ impl LibP2p {
// (where a finalized block only occurs due to network activity), meaning this won't be // (where a finalized block only occurs due to network activity), meaning this won't be
// run // run
() = tokio::time::sleep(Duration::from_secs(80).saturating_sub(time_since_last)) => { () = tokio::time::sleep(Duration::from_secs(80).saturating_sub(time_since_last)) => {
broadcast_raw( time_of_last_p2p_message = Instant::now();
&mut swarm, for peer_id in swarm.connected_peers().copied().collect::<Vec<_>>() {
&mut time_of_last_p2p_message, swarm
None, .behaviour_mut()
P2pMessageKind::KeepAlive.serialize() .reqres
); .send_request(&peer_id, ReqResMessageKind::KeepAlive.serialize());
}
} }
} }
} }
@@ -501,6 +734,7 @@ impl LibP2p {
LibP2p { LibP2p {
subscribe: Arc::new(Mutex::new(subscribe_send)), subscribe: Arc::new(Mutex::new(subscribe_send)),
send: Arc::new(Mutex::new(send_send)),
broadcast: Arc::new(Mutex::new(broadcast_send)), broadcast: Arc::new(Mutex::new(broadcast_send)),
receive: Arc::new(Mutex::new(receive_recv)), receive: Arc::new(Mutex::new(receive_recv)),
} }
@@ -529,22 +763,22 @@ impl P2p for LibP2p {
.expect("subscribe_send closed. are we shutting down?"); .expect("subscribe_send closed. are we shutting down?");
} }
async fn send_raw(&self, _: Self::Id, genesis: Option<[u8; 32]>, msg: Vec<u8>) { async fn send_raw(&self, peer: Self::Id, msg: Vec<u8>) {
self.broadcast_raw(genesis, msg).await; self.send.lock().await.send((peer, msg)).expect("send_send closed. are we shutting down?");
} }
async fn broadcast_raw(&self, genesis: Option<[u8; 32]>, msg: Vec<u8>) { async fn broadcast_raw(&self, kind: P2pMessageKind, msg: Vec<u8>) {
self self
.broadcast .broadcast
.lock() .lock()
.await .await
.send((genesis, msg)) .send((kind, msg))
.expect("broadcast_send closed. are we shutting down?"); .expect("broadcast_send closed. are we shutting down?");
} }
// TODO: We only have a single handle call this. Differentiate Send/Recv to remove this constant // TODO: We only have a single handle call this. Differentiate Send/Recv to remove this constant
// lock acquisition? // lock acquisition?
async fn receive_raw(&self) -> (Self::Id, Vec<u8>) { async fn receive(&self) -> Message<Self> {
self.receive.lock().await.recv().await.expect("receive_recv closed. are we shutting down?") self.receive.lock().await.recv().await.expect("receive_recv closed. are we shutting down?")
} }
} }
@@ -552,7 +786,7 @@ impl P2p for LibP2p {
#[async_trait] #[async_trait]
impl TributaryP2p for LibP2p { impl TributaryP2p for LibP2p {
async fn broadcast(&self, genesis: [u8; 32], msg: Vec<u8>) { async fn broadcast(&self, genesis: [u8; 32], msg: Vec<u8>) {
<Self as P2p>::broadcast(self, P2pMessageKind::Tributary(genesis), msg).await <Self as P2p>::broadcast(self, GossipMessageKind::Tributary(genesis), msg).await
} }
} }
@@ -590,16 +824,12 @@ pub async fn heartbeat_tributaries_task<D: Db, P: P2p>(
if SystemTime::now() > (block_time + Duration::from_secs(60)) { if SystemTime::now() > (block_time + Duration::from_secs(60)) {
log::warn!("last known tributary block was over a minute ago"); log::warn!("last known tributary block was over a minute ago");
let mut msg = tip.to_vec(); let mut msg = tip.to_vec();
// Also include the timestamp so LibP2p doesn't flag this as an old message re-circulating let time: u64 = SystemTime::now()
let timestamp = SystemTime::now()
.duration_since(SystemTime::UNIX_EPOCH) .duration_since(SystemTime::UNIX_EPOCH)
.expect("system clock is wrong") .expect("system clock is wrong")
.as_secs(); .as_secs();
// Divide by the block time so if multiple parties send a Heartbeat, they're more likely to msg.extend(time.to_le_bytes());
// overlap P2p::broadcast(&p2p, ReqResMessageKind::Heartbeat(tributary.genesis()), msg).await;
let time_unit = timestamp / u64::from(Tributary::<D, Transaction, P>::block_time());
msg.extend(time_unit.to_le_bytes());
P2p::broadcast(&p2p, P2pMessageKind::Heartbeat(tributary.genesis()), msg).await;
} }
} }
@@ -631,6 +861,8 @@ pub async fn handle_p2p_task<D: Db, P: P2p>(
// Subscribe to the topic for this tributary // Subscribe to the topic for this tributary
p2p.subscribe(tributary.spec.set(), genesis).await; p2p.subscribe(tributary.spec.set(), genesis).await;
let spec_set = tributary.spec.set();
// Per-Tributary P2P message handler // Per-Tributary P2P message handler
tokio::spawn({ tokio::spawn({
let p2p = p2p.clone(); let p2p = p2p.clone();
@@ -641,91 +873,58 @@ pub async fn handle_p2p_task<D: Db, P: P2p>(
break; break;
}; };
match msg.kind { match msg.kind {
P2pMessageKind::KeepAlive => {} P2pMessageKind::ReqRes(ReqResMessageKind::KeepAlive) => {}
P2pMessageKind::Tributary(msg_genesis) => { // TODO: Slash on Heartbeat which justifies a response, since the node
assert_eq!(msg_genesis, genesis);
log::trace!("handling message for tributary {:?}", tributary.spec.set());
if tributary.tributary.handle_message(&msg.msg).await {
P2p::broadcast(&p2p, msg.kind, msg.msg).await;
}
}
// TODO2: Rate limit this per timestamp
// And/or slash on Heartbeat which justifies a response, since the node
// obviously was offline and we must now use our bandwidth to compensate for // obviously was offline and we must now use our bandwidth to compensate for
// them? // them?
P2pMessageKind::Heartbeat(msg_genesis) => { P2pMessageKind::ReqRes(ReqResMessageKind::Heartbeat(msg_genesis)) => {
assert_eq!(msg_genesis, genesis); assert_eq!(msg_genesis, genesis);
if msg.msg.len() != 40 { if msg.msg.len() != 40 {
log::error!("validator sent invalid heartbeat"); log::error!("validator sent invalid heartbeat");
continue; continue;
} }
// Only respond to recent heartbeats
let msg_time = u64::from_le_bytes(msg.msg[32 .. 40].try_into().expect(
"length-checked heartbeat message didn't have 8 bytes for the u64",
));
if SystemTime::now()
.duration_since(SystemTime::UNIX_EPOCH)
.expect("system clock is wrong")
.as_secs()
.saturating_sub(msg_time) >
10
{
continue;
}
log::debug!("received heartbeat with a recent timestamp");
let reader = tributary.tributary.reader();
let p2p = p2p.clone(); let p2p = p2p.clone();
let spec = tributary.spec.clone();
let reader = tributary.tributary.reader();
// Spawn a dedicated task as this may require loading large amounts of data // Spawn a dedicated task as this may require loading large amounts of data
// from disk and take a notable amount of time // from disk and take a notable amount of time
tokio::spawn(async move { tokio::spawn(async move {
/*
// Have sqrt(n) nodes reply with the blocks
let mut responders = (tributary.spec.n() as f32).sqrt().floor() as u64;
// Try to have at least 3 responders
if responders < 3 {
responders = tributary.spec.n().min(3).into();
}
*/
/*
// Have up to three nodes respond
let responders = u64::from(spec.n().min(3));
// Decide which nodes will respond by using the latest block's hash as a
// mutually agreed upon entropy source
// This isn't a secure source of entropy, yet it's fine for this
let entropy = u64::from_le_bytes(reader.tip()[.. 8].try_into().unwrap());
// If n = 10, responders = 3, we want `start` to be 0 ..= 7
// (so the highest is 7, 8, 9)
// entropy % (10 + 1) - 3 = entropy % 8 = 0 ..= 7
let start =
usize::try_from(entropy % (u64::from(spec.n() + 1) - responders))
.unwrap();
let mut selected = false;
for validator in &spec.validators()
[start .. (start + usize::try_from(responders).unwrap())]
{
if our_key == validator.0 {
selected = true;
break;
}
}
if !selected {
log::debug!("received heartbeat and not selected to respond");
return;
}
log::debug!("received heartbeat and selected to respond");
*/
// Have every node respond
// While we could only have a subset respond, LibP2P will sync all messages
// it isn't aware of
// It's cheaper to be aware from our disk than from over the network
// TODO: Spawn a dedicated topic for this heartbeat response?
let mut latest = msg.msg[.. 32].try_into().unwrap(); let mut latest = msg.msg[.. 32].try_into().unwrap();
let mut to_send = vec![];
while let Some(next) = reader.block_after(&latest) { while let Some(next) = reader.block_after(&latest) {
to_send.push(next);
latest = next;
}
if to_send.len() > 3 {
for next in to_send {
let mut res = reader.block(&next).unwrap().serialize(); let mut res = reader.block(&next).unwrap().serialize();
res.extend(reader.commit(&next).unwrap()); res.extend(reader.commit(&next).unwrap());
// Also include the timestamp used within the Heartbeat // Also include the timestamp used within the Heartbeat
res.extend(&msg.msg[32 .. 40]); res.extend(&msg.msg[32 .. 40]);
p2p.send(msg.sender, P2pMessageKind::Block(spec.genesis()), res).await; p2p.send(msg.sender, ReqResMessageKind::Block(genesis), res).await;
latest = next; }
} }
}); });
} }
P2pMessageKind::Block(msg_genesis) => { P2pMessageKind::ReqRes(ReqResMessageKind::Block(msg_genesis)) => {
assert_eq!(msg_genesis, genesis); assert_eq!(msg_genesis, genesis);
let mut msg_ref: &[u8] = msg.msg.as_ref(); let mut msg_ref: &[u8] = msg.msg.as_ref();
let Ok(block) = Block::<Transaction>::read(&mut msg_ref) else { let Ok(block) = Block::<Transaction>::read(&mut msg_ref) else {
@@ -744,7 +943,15 @@ pub async fn handle_p2p_task<D: Db, P: P2p>(
); );
} }
P2pMessageKind::CosignedBlock => unreachable!(), P2pMessageKind::Gossip(GossipMessageKind::Tributary(msg_genesis)) => {
assert_eq!(msg_genesis, genesis);
log::trace!("handling message for tributary {:?}", spec_set);
if tributary.tributary.handle_message(&msg.msg).await {
P2p::broadcast(&p2p, msg.kind, msg.msg).await;
}
}
P2pMessageKind::Gossip(GossipMessageKind::CosignedBlock) => unreachable!(),
} }
} }
} }
@@ -764,15 +971,16 @@ pub async fn handle_p2p_task<D: Db, P: P2p>(
loop { loop {
let msg = p2p.receive().await; let msg = p2p.receive().await;
match msg.kind { match msg.kind {
P2pMessageKind::KeepAlive => {} P2pMessageKind::ReqRes(ReqResMessageKind::KeepAlive) => {}
P2pMessageKind::Tributary(genesis) | P2pMessageKind::Gossip(GossipMessageKind::Tributary(genesis)) |
P2pMessageKind::Heartbeat(genesis) | P2pMessageKind::ReqRes(
P2pMessageKind::Block(genesis) => { ReqResMessageKind::Heartbeat(genesis) | ReqResMessageKind::Block(genesis),
) => {
if let Some(channel) = channels.read().await.get(&genesis) { if let Some(channel) = channels.read().await.get(&genesis) {
channel.send(msg).unwrap(); channel.send(msg).unwrap();
} }
} }
P2pMessageKind::CosignedBlock => { P2pMessageKind::Gossip(GossipMessageKind::CosignedBlock) => {
let Ok(msg) = CosignedBlock::deserialize_reader(&mut msg.msg.as_slice()) else { let Ok(msg) = CosignedBlock::deserialize_reader(&mut msg.msg.as_slice()) else {
log::error!("received CosignedBlock message with invalidly serialized contents"); log::error!("received CosignedBlock message with invalidly serialized contents");
continue; continue;

View File

@@ -14,7 +14,7 @@ use tokio::sync::RwLock;
use crate::{ use crate::{
processors::{Message, Processors}, processors::{Message, Processors},
TributaryP2p, P2pMessageKind, P2p, TributaryP2p, ReqResMessageKind, GossipMessageKind, P2pMessageKind, Message as P2pMessage, P2p,
}; };
pub mod tributary; pub mod tributary;
@@ -45,7 +45,10 @@ impl Processors for MemProcessors {
#[allow(clippy::type_complexity)] #[allow(clippy::type_complexity)]
#[derive(Clone, Debug)] #[derive(Clone, Debug)]
pub struct LocalP2p(usize, pub Arc<RwLock<(HashSet<Vec<u8>>, Vec<VecDeque<(usize, Vec<u8>)>>)>>); pub struct LocalP2p(
usize,
pub Arc<RwLock<(HashSet<Vec<u8>>, Vec<VecDeque<(usize, P2pMessageKind, Vec<u8>)>>)>>,
);
impl LocalP2p { impl LocalP2p {
pub fn new(validators: usize) -> Vec<LocalP2p> { pub fn new(validators: usize) -> Vec<LocalP2p> {
@@ -65,11 +68,13 @@ impl P2p for LocalP2p {
async fn subscribe(&self, _set: ValidatorSet, _genesis: [u8; 32]) {} async fn subscribe(&self, _set: ValidatorSet, _genesis: [u8; 32]) {}
async fn unsubscribe(&self, _set: ValidatorSet, _genesis: [u8; 32]) {} async fn unsubscribe(&self, _set: ValidatorSet, _genesis: [u8; 32]) {}
async fn send_raw(&self, to: Self::Id, _genesis: Option<[u8; 32]>, msg: Vec<u8>) { async fn send_raw(&self, to: Self::Id, msg: Vec<u8>) {
self.1.write().await.1[to].push_back((self.0, msg)); let mut msg_ref = msg.as_slice();
let kind = ReqResMessageKind::read(&mut msg_ref).unwrap();
self.1.write().await.1[to].push_back((self.0, P2pMessageKind::ReqRes(kind), msg_ref.to_vec()));
} }
async fn broadcast_raw(&self, _genesis: Option<[u8; 32]>, msg: Vec<u8>) { async fn broadcast_raw(&self, kind: P2pMessageKind, msg: Vec<u8>) {
// Content-based deduplication // Content-based deduplication
let mut lock = self.1.write().await; let mut lock = self.1.write().await;
{ {
@@ -81,19 +86,26 @@ impl P2p for LocalP2p {
} }
let queues = &mut lock.1; let queues = &mut lock.1;
let kind_len = (match kind {
P2pMessageKind::ReqRes(kind) => kind.serialize(),
P2pMessageKind::Gossip(kind) => kind.serialize(),
})
.len();
let msg = msg[kind_len ..].to_vec();
for (i, msg_queue) in queues.iter_mut().enumerate() { for (i, msg_queue) in queues.iter_mut().enumerate() {
if i == self.0 { if i == self.0 {
continue; continue;
} }
msg_queue.push_back((self.0, msg.clone())); msg_queue.push_back((self.0, kind, msg.clone()));
} }
} }
async fn receive_raw(&self) -> (Self::Id, Vec<u8>) { async fn receive(&self) -> P2pMessage<Self> {
// This is a cursed way to implement an async read from a Vec // This is a cursed way to implement an async read from a Vec
loop { loop {
if let Some(res) = self.1.write().await.1[self.0].pop_front() { if let Some((sender, kind, msg)) = self.1.write().await.1[self.0].pop_front() {
return res; return P2pMessage { sender, kind, msg };
} }
tokio::time::sleep(std::time::Duration::from_millis(100)).await; tokio::time::sleep(std::time::Duration::from_millis(100)).await;
} }
@@ -103,6 +115,11 @@ impl P2p for LocalP2p {
#[async_trait] #[async_trait]
impl TributaryP2p for LocalP2p { impl TributaryP2p for LocalP2p {
async fn broadcast(&self, genesis: [u8; 32], msg: Vec<u8>) { async fn broadcast(&self, genesis: [u8; 32], msg: Vec<u8>) {
<Self as P2p>::broadcast(self, P2pMessageKind::Tributary(genesis), msg).await <Self as P2p>::broadcast(
self,
P2pMessageKind::Gossip(GossipMessageKind::Tributary(genesis)),
msg,
)
.await
} }
} }

View File

@@ -26,7 +26,7 @@ use serai_db::MemDb;
use tributary::Tributary; use tributary::Tributary;
use crate::{ use crate::{
P2pMessageKind, P2p, GossipMessageKind, P2pMessageKind, P2p,
tributary::{Transaction, TributarySpec}, tributary::{Transaction, TributarySpec},
tests::LocalP2p, tests::LocalP2p,
}; };
@@ -98,7 +98,7 @@ pub async fn run_tributaries(
for (p2p, tributary) in &mut tributaries { for (p2p, tributary) in &mut tributaries {
while let Poll::Ready(msg) = poll!(p2p.receive()) { while let Poll::Ready(msg) = poll!(p2p.receive()) {
match msg.kind { match msg.kind {
P2pMessageKind::Tributary(genesis) => { P2pMessageKind::Gossip(GossipMessageKind::Tributary(genesis)) => {
assert_eq!(genesis, tributary.genesis()); assert_eq!(genesis, tributary.genesis());
if tributary.handle_message(&msg.msg).await { if tributary.handle_message(&msg.msg).await {
p2p.broadcast(msg.kind, msg.msg).await; p2p.broadcast(msg.kind, msg.msg).await;
@@ -173,7 +173,7 @@ async fn tributary_test() {
for (p2p, tributary) in &mut tributaries { for (p2p, tributary) in &mut tributaries {
while let Poll::Ready(msg) = poll!(p2p.receive()) { while let Poll::Ready(msg) = poll!(p2p.receive()) {
match msg.kind { match msg.kind {
P2pMessageKind::Tributary(genesis) => { P2pMessageKind::Gossip(GossipMessageKind::Tributary(genesis)) => {
assert_eq!(genesis, tributary.genesis()); assert_eq!(genesis, tributary.genesis());
tributary.handle_message(&msg.msg).await; tributary.handle_message(&msg.msg).await;
} }
@@ -199,7 +199,7 @@ async fn tributary_test() {
for (p2p, tributary) in &mut tributaries { for (p2p, tributary) in &mut tributaries {
while let Poll::Ready(msg) = poll!(p2p.receive()) { while let Poll::Ready(msg) = poll!(p2p.receive()) {
match msg.kind { match msg.kind {
P2pMessageKind::Tributary(genesis) => { P2pMessageKind::Gossip(GossipMessageKind::Tributary(genesis)) => {
assert_eq!(genesis, tributary.genesis()); assert_eq!(genesis, tributary.genesis());
tributary.handle_message(&msg.msg).await; tributary.handle_message(&msg.msg).await;
} }

View File

@@ -116,8 +116,8 @@ async fn sync_test() {
.map_err(|_| "failed to send ActiveTributary to heartbeat") .map_err(|_| "failed to send ActiveTributary to heartbeat")
.unwrap(); .unwrap();
// The heartbeat is once every 10 blocks // The heartbeat is once every 10 blocks, with some limitations
sleep(Duration::from_secs(10 * block_time)).await; sleep(Duration::from_secs(20 * block_time)).await;
assert!(syncer_tributary.tip().await != spec.genesis()); assert!(syncer_tributary.tip().await != spec.genesis());
// Verify it synced to the tip // Verify it synced to the tip

View File

@@ -74,7 +74,7 @@ impl TributarySpec {
pub fn genesis(&self) -> [u8; 32] { pub fn genesis(&self) -> [u8; 32] {
// Calculate the genesis for this Tributary // Calculate the genesis for this Tributary
let mut genesis = RecommendedTranscript::new(b"Serai Tributary Genesis"); let mut genesis = RecommendedTranscript::new(b"Serai Tributary Genesis Testnet 2.1");
// This locks it to a specific Serai chain // This locks it to a specific Serai chain
genesis.append_message(b"serai_block", self.serai_block); genesis.append_message(b"serai_block", self.serai_block);
genesis.append_message(b"session", self.set.session.0.to_le_bytes()); genesis.append_message(b"session", self.set.session.0.to_le_bytes());

View File

@@ -59,8 +59,7 @@ pub const ACCOUNT_MEMPOOL_LIMIT: u32 = 50;
pub const BLOCK_SIZE_LIMIT: usize = 3_001_000; pub const BLOCK_SIZE_LIMIT: usize = 3_001_000;
pub(crate) const TENDERMINT_MESSAGE: u8 = 0; pub(crate) const TENDERMINT_MESSAGE: u8 = 0;
pub(crate) const BLOCK_MESSAGE: u8 = 1; pub(crate) const TRANSACTION_MESSAGE: u8 = 2; // TODO: Normalize to 1
pub(crate) const TRANSACTION_MESSAGE: u8 = 2;
#[allow(clippy::large_enum_variant)] #[allow(clippy::large_enum_variant)]
#[derive(Clone, PartialEq, Eq, Debug)] #[derive(Clone, PartialEq, Eq, Debug)]
@@ -336,9 +335,6 @@ impl<D: Db, T: TransactionTrait, P: P2p> Tributary<D, T, P> {
// Return true if the message should be rebroadcasted. // Return true if the message should be rebroadcasted.
pub async fn handle_message(&self, msg: &[u8]) -> bool { pub async fn handle_message(&self, msg: &[u8]) -> bool {
// Acquire the lock now to prevent sync_block from being run at the same time
let mut sync_block = self.synced_block_result.write().await;
match msg.first() { match msg.first() {
Some(&TRANSACTION_MESSAGE) => { Some(&TRANSACTION_MESSAGE) => {
let Ok(tx) = Transaction::read::<&[u8]>(&mut &msg[1 ..]) else { let Ok(tx) = Transaction::read::<&[u8]>(&mut &msg[1 ..]) else {
@@ -370,19 +366,6 @@ impl<D: Db, T: TransactionTrait, P: P2p> Tributary<D, T, P> {
false false
} }
Some(&BLOCK_MESSAGE) => {
let mut msg_ref = &msg[1 ..];
let Ok(block) = Block::<T>::read(&mut msg_ref) else {
log::error!("received invalid block message");
return false;
};
let commit = msg[(msg.len() - msg_ref.len()) ..].to_vec();
if self.sync_block_internal(block, commit, &mut sync_block).await {
log::debug!("synced block over p2p net instead of building the commit ourselves");
}
false
}
_ => false, _ => false,
} }
} }

View File

@@ -41,9 +41,8 @@ use tendermint::{
use tokio::sync::RwLock; use tokio::sync::RwLock;
use crate::{ use crate::{
TENDERMINT_MESSAGE, TRANSACTION_MESSAGE, BLOCK_MESSAGE, ReadWrite, TENDERMINT_MESSAGE, TRANSACTION_MESSAGE, ReadWrite, transaction::Transaction as TransactionTrait,
transaction::Transaction as TransactionTrait, Transaction, BlockHeader, Block, BlockError, Transaction, BlockHeader, Block, BlockError, Blockchain, P2p,
Blockchain, P2p,
}; };
pub mod tx; pub mod tx;
@@ -414,12 +413,7 @@ impl<D: Db, T: TransactionTrait, P: P2p> Network for TendermintNetwork<D, T, P>
); );
match block_res { match block_res {
Ok(()) => { Ok(()) => {
// If we successfully added this block, broadcast it // If we successfully added this block, break
// TODO: Move this under the coordinator once we set up on new block notifications?
let mut msg = serialized_block.0;
msg.insert(0, BLOCK_MESSAGE);
msg.extend(encoded_commit);
self.p2p.broadcast(self.genesis, msg).await;
break; break;
} }
Err(BlockError::NonLocalProvided(hash)) => { Err(BlockError::NonLocalProvided(hash)) => {
@@ -428,6 +422,7 @@ impl<D: Db, T: TransactionTrait, P: P2p> Network for TendermintNetwork<D, T, P>
hex::encode(hash), hex::encode(hash),
hex::encode(self.genesis) hex::encode(self.genesis)
); );
tokio::time::sleep(core::time::Duration::from_secs(5)).await;
} }
_ => return invalid_block(), _ => return invalid_block(),
} }

View File

@@ -313,11 +313,16 @@ impl<N: Network + 'static> TendermintMachine<N> {
let time_until_round_end = round_end.instant().saturating_duration_since(Instant::now()); let time_until_round_end = round_end.instant().saturating_duration_since(Instant::now());
if time_until_round_end == Duration::ZERO { if time_until_round_end == Duration::ZERO {
log::trace!( log::trace!(
target: "tendermint",
"resetting when prior round ended {}ms ago", "resetting when prior round ended {}ms ago",
Instant::now().saturating_duration_since(round_end.instant()).as_millis(), Instant::now().saturating_duration_since(round_end.instant()).as_millis(),
); );
} }
log::trace!("sleeping until round ends in {}ms", time_until_round_end.as_millis()); log::trace!(
target: "tendermint",
"sleeping until round ends in {}ms",
time_until_round_end.as_millis(),
);
sleep(time_until_round_end).await; sleep(time_until_round_end).await;
// Clear our outbound message queue // Clear our outbound message queue
@@ -598,7 +603,11 @@ impl<N: Network + 'static> TendermintMachine<N> {
); );
let id = block.id(); let id = block.id();
let proposal = self.network.add_block(block, commit).await; let proposal = self.network.add_block(block, commit).await;
log::trace!("added block {} (produced by machine)", hex::encode(id.as_ref())); log::trace!(
target: "tendermint",
"added block {} (produced by machine)",
hex::encode(id.as_ref()),
);
self.reset(msg.round, proposal).await; self.reset(msg.round, proposal).await;
} }
Err(TendermintError::Malicious(sender, evidence)) => { Err(TendermintError::Malicious(sender, evidence)) => {
@@ -692,7 +701,12 @@ impl<N: Network + 'static> TendermintMachine<N> {
(msg.round == self.block.round().number) && (msg.round == self.block.round().number) &&
(msg.data.step() == Step::Propose) (msg.data.step() == Step::Propose)
{ {
log::trace!("received Propose for block {}, round {}", msg.block.0, msg.round.0); log::trace!(
target: "tendermint",
"received Propose for block {}, round {}",
msg.block.0,
msg.round.0,
);
} }
// If this is a precommit, verify its signature // If this is a precommit, verify its signature
@@ -710,7 +724,13 @@ impl<N: Network + 'static> TendermintMachine<N> {
if !self.block.log.log(signed.clone())? { if !self.block.log.log(signed.clone())? {
return Err(TendermintError::AlreadyHandled); return Err(TendermintError::AlreadyHandled);
} }
log::debug!(target: "tendermint", "received new tendermint message"); log::debug!(
target: "tendermint",
"received new tendermint message (block: {}, round: {}, step: {:?})",
msg.block.0,
msg.round.0,
msg.data.step(),
);
// All functions, except for the finalizer and the jump, are locked to the current round // All functions, except for the finalizer and the jump, are locked to the current round
@@ -757,6 +777,13 @@ impl<N: Network + 'static> TendermintMachine<N> {
// 55-56 // 55-56
// Jump, enabling processing by the below code // Jump, enabling processing by the below code
if self.block.log.round_participation(msg.round) > self.weights.fault_threshold() { if self.block.log.round_participation(msg.round) > self.weights.fault_threshold() {
log::debug!(
target: "tendermint",
"jumping from round {} to round {}",
self.block.round().number.0,
msg.round.0,
);
// Jump to the new round. // Jump to the new round.
let proposer = self.round(msg.round, None); let proposer = self.round(msg.round, None);
@@ -814,13 +841,26 @@ impl<N: Network + 'static> TendermintMachine<N> {
if (self.block.round().step == Step::Prevote) && matches!(msg.data, Data::Prevote(_)) { if (self.block.round().step == Step::Prevote) && matches!(msg.data, Data::Prevote(_)) {
let (participation, weight) = let (participation, weight) =
self.block.log.message_instances(self.block.round().number, &Data::Prevote(None)); self.block.log.message_instances(self.block.round().number, &Data::Prevote(None));
let threshold_weight = self.weights.threshold();
if participation < threshold_weight {
log::trace!(
target: "tendermint",
"progess towards setting prevote timeout, participation: {}, needed: {}",
participation,
threshold_weight,
);
}
// 34-35 // 34-35
if participation >= self.weights.threshold() { if participation >= threshold_weight {
log::trace!(
target: "tendermint",
"setting timeout for prevote due to sufficient participation",
);
self.block.round_mut().set_timeout(Step::Prevote); self.block.round_mut().set_timeout(Step::Prevote);
} }
// 44-46 // 44-46
if weight >= self.weights.threshold() { if weight >= threshold_weight {
self.broadcast(Data::Precommit(None)); self.broadcast(Data::Precommit(None));
return Ok(None); return Ok(None);
} }
@@ -830,6 +870,10 @@ impl<N: Network + 'static> TendermintMachine<N> {
if matches!(msg.data, Data::Precommit(_)) && if matches!(msg.data, Data::Precommit(_)) &&
self.block.log.has_participation(self.block.round().number, Step::Precommit) self.block.log.has_participation(self.block.round().number, Step::Precommit)
{ {
log::trace!(
target: "tendermint",
"setting timeout for precommit due to sufficient participation",
);
self.block.round_mut().set_timeout(Step::Precommit); self.block.round_mut().set_timeout(Step::Precommit);
} }

View File

@@ -1,6 +1,5 @@
use std::{sync::Arc, collections::HashMap}; use std::{sync::Arc, collections::HashMap};
use log::debug;
use parity_scale_codec::Encode; use parity_scale_codec::Encode;
use crate::{ext::*, RoundNumber, Step, DataFor, TendermintError, SignedMessageFor, Evidence}; use crate::{ext::*, RoundNumber, Step, DataFor, TendermintError, SignedMessageFor, Evidence};
@@ -27,7 +26,7 @@ impl<N: Network> MessageLog<N> {
let step = msg.data.step(); let step = msg.data.step();
if let Some(existing) = msgs.get(&step) { if let Some(existing) = msgs.get(&step) {
if existing.msg.data != msg.data { if existing.msg.data != msg.data {
debug!( log::debug!(
target: "tendermint", target: "tendermint",
"Validator sent multiple messages for the same block + round + step" "Validator sent multiple messages for the same block + round + step"
); );

View File

@@ -57,6 +57,7 @@ impl<N: Network> RoundData<N> {
// Poll all set timeouts, returning the Step whose timeout has just expired // Poll all set timeouts, returning the Step whose timeout has just expired
pub(crate) async fn timeout_future(&self) -> Step { pub(crate) async fn timeout_future(&self) -> Step {
/*
let now = Instant::now(); let now = Instant::now();
log::trace!( log::trace!(
target: "tendermint", target: "tendermint",
@@ -64,6 +65,7 @@ impl<N: Network> RoundData<N> {
self.step, self.step,
self.timeouts.iter().map(|(k, v)| (k, v.duration_since(now))).collect::<HashMap<_, _>>() self.timeouts.iter().map(|(k, v)| (k, v.duration_since(now))).collect::<HashMap<_, _>>()
); );
*/
let timeout_future = |step| { let timeout_future = |step| {
let timeout = self.timeouts.get(&step).copied(); let timeout = self.timeouts.get(&step).copied();

View File

@@ -511,7 +511,7 @@ fn start(network: Network, services: HashSet<String>) {
command command
} else { } else {
// Publish the port // Publish the port
command.arg("-p").arg("30563:30563") command.arg("-p").arg("30564:30564")
} }
} }
"serai" => { "serai" => {

View File

@@ -159,9 +159,11 @@ pub mod pallet {
/// ///
/// Errors if any amount overflows. /// Errors if any amount overflows.
pub fn mint(to: Public, balance: Balance) -> Result<(), Error<T, I>> { pub fn mint(to: Public, balance: Balance) -> Result<(), Error<T, I>> {
/*
if !T::AllowMint::is_allowed(&balance) { if !T::AllowMint::is_allowed(&balance) {
Err(Error::<T, I>::MintNotAllowed)?; Err(Error::<T, I>::MintNotAllowed)?;
} }
*/
// update the balance // update the balance
Self::increase_balance_internal(to, balance)?; Self::increase_balance_internal(to, balance)?;

View File

@@ -26,6 +26,8 @@ hex = "0.4"
rand_core = "0.6" rand_core = "0.6"
schnorrkel = "0.11" schnorrkel = "0.11"
libp2p = "0.52"
sp-core = { git = "https://github.com/serai-dex/substrate" } sp-core = { git = "https://github.com/serai-dex/substrate" }
sp-keystore = { git = "https://github.com/serai-dex/substrate" } sp-keystore = { git = "https://github.com/serai-dex/substrate" }
sp-timestamp = { git = "https://github.com/serai-dex/substrate" } sp-timestamp = { git = "https://github.com/serai-dex/substrate" }

View File

@@ -1,6 +1,7 @@
use core::marker::PhantomData; use core::marker::PhantomData;
use std::collections::HashSet;
use sp_core::Pair as PairTrait; use sp_core::{Decode, Pair as PairTrait, sr25519::Public};
use sc_service::ChainType; use sc_service::ChainType;
@@ -23,7 +24,7 @@ fn wasm_binary() -> Vec<u8> {
WASM_BINARY.ok_or("compiled in wasm not available").unwrap().to_vec() WASM_BINARY.ok_or("compiled in wasm not available").unwrap().to_vec()
} }
fn testnet_genesis( fn devnet_genesis(
wasm_binary: &[u8], wasm_binary: &[u8],
validators: &[&'static str], validators: &[&'static str],
endowed_accounts: Vec<PublicKey>, endowed_accounts: Vec<PublicKey>,
@@ -72,6 +73,57 @@ fn testnet_genesis(
} }
} }
fn testnet_genesis(wasm_binary: &[u8], validators: Vec<&'static str>) -> RuntimeGenesisConfig {
let validators = validators
.into_iter()
.map(|validator| Public::decode(&mut hex::decode(validator).unwrap().as_slice()).unwrap())
.collect::<Vec<_>>();
assert_eq!(validators.iter().collect::<HashSet<_>>().len(), validators.len());
RuntimeGenesisConfig {
system: SystemConfig { code: wasm_binary.to_vec(), _config: PhantomData },
transaction_payment: Default::default(),
coins: CoinsConfig {
accounts: validators
.iter()
.map(|a| (*a, Balance { coin: Coin::Serai, amount: Amount(5_000_000 * 10_u64.pow(8)) }))
.collect(),
_ignore: Default::default(),
},
dex: DexConfig {
pools: vec![Coin::Bitcoin, Coin::Ether, Coin::Dai, Coin::Monero],
_ignore: Default::default(),
},
validator_sets: ValidatorSetsConfig {
networks: serai_runtime::primitives::NETWORKS
.iter()
.map(|network| match network {
NetworkId::Serai => (NetworkId::Serai, Amount(50_000 * 10_u64.pow(8))),
NetworkId::Bitcoin => (NetworkId::Bitcoin, Amount(1_000_000 * 10_u64.pow(8))),
NetworkId::Ethereum => (NetworkId::Ethereum, Amount(1_000_000 * 10_u64.pow(8))),
NetworkId::Monero => (NetworkId::Monero, Amount(100_000 * 10_u64.pow(8))),
})
.collect(),
participants: validators.clone(),
},
signals: SignalsConfig::default(),
babe: BabeConfig {
authorities: validators.iter().map(|validator| ((*validator).into(), 1)).collect(),
epoch_config: Some(BABE_GENESIS_EPOCH_CONFIG),
_config: PhantomData,
},
grandpa: GrandpaConfig {
authorities: validators.into_iter().map(|validator| (validator.into(), 1)).collect(),
_config: PhantomData,
},
}
}
pub fn development_config() -> ChainSpec { pub fn development_config() -> ChainSpec {
let wasm_binary = wasm_binary(); let wasm_binary = wasm_binary();
@@ -82,7 +134,7 @@ pub fn development_config() -> ChainSpec {
"devnet", "devnet",
ChainType::Development, ChainType::Development,
move || { move || {
testnet_genesis( devnet_genesis(
&wasm_binary, &wasm_binary,
&["Alice"], &["Alice"],
vec![ vec![
@@ -100,7 +152,7 @@ pub fn development_config() -> ChainSpec {
// Telemetry // Telemetry
None, None,
// Protocol ID // Protocol ID
Some("serai"), Some("serai-devnet"),
// Fork ID // Fork ID
None, None,
// Properties // Properties
@@ -110,7 +162,7 @@ pub fn development_config() -> ChainSpec {
) )
} }
pub fn testnet_config() -> ChainSpec { pub fn local_config() -> ChainSpec {
let wasm_binary = wasm_binary(); let wasm_binary = wasm_binary();
ChainSpec::from_genesis( ChainSpec::from_genesis(
@@ -120,7 +172,7 @@ pub fn testnet_config() -> ChainSpec {
"local", "local",
ChainType::Local, ChainType::Local,
move || { move || {
testnet_genesis( devnet_genesis(
&wasm_binary, &wasm_binary,
&["Alice", "Bob", "Charlie", "Dave"], &["Alice", "Bob", "Charlie", "Dave"],
vec![ vec![
@@ -138,7 +190,7 @@ pub fn testnet_config() -> ChainSpec {
// Telemetry // Telemetry
None, None,
// Protocol ID // Protocol ID
Some("serai"), Some("serai-local"),
// Fork ID // Fork ID
None, None,
// Properties // Properties
@@ -147,3 +199,137 @@ pub fn testnet_config() -> ChainSpec {
None, None,
) )
} }
pub fn testnet_config() -> ChainSpec {
{
use std::time::{Duration, SystemTime};
let secs_since_epoch = SystemTime::now()
.duration_since(SystemTime::UNIX_EPOCH)
.expect("current time is before the epoch")
.as_secs();
let secs_till_start = 1713283200_u64.saturating_sub(secs_since_epoch);
std::thread::sleep(Duration::from_secs(secs_till_start));
}
let wasm_binary = wasm_binary();
ChainSpec::from_genesis(
// Name
"Test Network 2",
// ID
"testnet-2",
ChainType::Live,
move || {
testnet_genesis(
&wasm_binary,
vec![
// Kayaba
"4cef4080d00c6ff5ad93d61d1ca631cc10f8c9bd733e8c0c873a85b5fbe5c625",
// CommunityStaking
"587723d333049d9f4e6f027bbd701d603544a422329ea4e1027d60f7947e1074",
// SHossain
"6e30ec71b331d73992307fa7c53719ff238666d7d895487a1b691cc1e4481344",
// StormyCloud
"b0ebef6d712b3eb0f01e69a80519e55feff4be8b226fa64d84691e4b3ca2fb38",
// Yangu
"c692a906f9c63b7e4d12ad3cde204c6715b9a96b5b8ce565794917b7eaaa5f08",
// t-900
"6a9d5a3ca9422baec670e47238decf4a8515f2de0060b0a566a56dfd72686e52",
// tappokone
"36acb4be05513bed670ef3b43dc3a0fdfde8dc45339f81c80b53b2289dc3730c",
// Sleipnir
"0e87d766c9acec45b39445579cd3f40c8a4b42e9a34049bdbef0da83d000410e",
"c2f96300a956e949883a5e8952270fb8193154a68533d0dd6b10076224e30167",
"7a66312c53dfb153e842456f4e9a38dcda7e1788a3366df3e54125e29821f870",
// jberman
"b6e23eec7dbdb2bf72a087e335b44464cedfcc11c669033d6e520b3bc8de1650",
// krytie
"82815723c498d2aaaead050e63b979bb49a94a00c97b971c22340dffeaa36829",
// toplel
"4243da92918333bfc46f4d17ddeda0c3420d920231627dca1b6049f2f13cac6d",
// clamking
"941a6efa9e4dee6c3015cc42339fe56f43c2230133787746828befcee957cb1f",
// Helios
"56a0e89cffe57337e9e232e41dca0ad7306a17fa0ca63fbac048190fdd45d511",
// akil
"1caffa33b0ea1c7ed95c8450c0baf57baf9e1c1f43af3e28a722ef6d3d4db27e",
// Eumaios
"9ec7b5edf854f6285205468ed7402e40e5bed8238dc226dd4fd718a40efdce44",
// pigeons
"66c71ebf040542ab467def0ad935ec30ea693953d4322b3b168f6f4e9fcacb63",
// joe_land1
"94e25d8247b2f0e718bee169213052c693b78743dd91f403398a8837c34e0e6a",
// rlking1255
"82592430fe65e353510d3c1018cebc9806290e2d9098a94a1190f120f471c52b",
// Seth For Privacy
"f8ebbdb8ff2a77527528577bad6fd3297017f7b35a0613ba31d8af8e7e78cd7b",
// lemon_respector
"ce4a4cd996e4601a0226f3c8d9c9cae84519a1a7277b4822e1694b4a8c3ef10b",
// tuxsudo
"c6804a561d07d77c2806844a59c24bb9472df16043767721aae0caa20e82391e",
// Awakeninghumanity.eth
"5046c9f55a65e08df86c132c142f055db0376563fabc190f47a6851e0ff2af2b",
// ART3MIS.CLOUD
"5c1793880b0c06a5ce232288c7789cf4451ab20a8da49b84c88789965bc67356",
// michnovka
"98db8174ec40046b1bae39cad69ea0000d67e120524d46bc298d167407410618",
// kgminer
"8eca72a4bf684d7c4a20a34048003b504a046bce1289d3ae79a3b4422afaf808",
// Benny
"74b4f2d2347a4426c536e6ba48efa14b989b05f03c0ea9b1c67b23696c1a831d",
// Argo
"4025bbbe9c9be72769a27e5e6a3749782f4c9b2a47624bdcb0bfbd29f5e2056a",
// vdo
"1c87bbcd666099abc1ee2ec3f065abd073c237f95c4d0658b945e9d66d67622d",
// PotR
"b29ffbb4a4c0f14eb8c22fabaaacb43f92a62214ff45f0b4f50b7031c3a61a5a",
// Ghalleb
"48f903ed592638cee1c7f239a6ac14cbb0224a3153cff0f85eb0873113cf163f",
// monerobull
"56a2e3b410cb87bdb8125ae19d76a7be042de49693dc27f03e7a0dcc72b42f6c",
// Adorid
"3430222157262d6187c4537b026bcbaeb133695bbb512a7be8f25cc5a082d933",
// KeepKey
"a0ce13fb50c3d56548334af703b6ffb9a1b2f66e9dccf4a3688140b77fa58a06",
// Username
"b0e62f04f625447673a840d9c5f0e5867b355a67b0dee322334dc00925547b71",
// R0BC0D3R
"7e32cebc21b7979c36e477f0a849df1830cc052c879baf13107888654c0be654",
// worksmarter
"c4f2f6ffead84fcaa2e3c894d57c342a24c461eab5d1d17cae3d1a9e61d73e46",
],
)
},
// Bootnodes
vec![],
// Telemetry
None,
// Protocol ID
Some("serai-testnet-2"),
// Fork ID
None,
// Properties
None,
// Extensions
None,
)
}
pub fn bootnode_multiaddrs(id: &str) -> Vec<libp2p::Multiaddr> {
match id {
"local" | "devnet" => vec![],
"testnet-2" => vec![
// Kayaba
"/ip4/107.161.20.133/tcp/30333".parse().unwrap(),
// lemon_respector
"/ip4/188.66.62.11/tcp/30333".parse().unwrap(),
// Ghalleb
"/ip4/65.21.156.202/tcp/30333".parse().unwrap(),
// ART3MIS.CLOUD
"/ip4/51.195.60.217/tcp/30333".parse().unwrap(),
// worksmarter
"/ip4/37.60.255.101/tcp/30333".parse().unwrap(),
],
_ => panic!("requesting bootnodes for an unrecognized network"),
}
}

View File

@@ -40,7 +40,8 @@ impl SubstrateCli for Cli {
fn load_spec(&self, id: &str) -> Result<Box<dyn sc_service::ChainSpec>, String> { fn load_spec(&self, id: &str) -> Result<Box<dyn sc_service::ChainSpec>, String> {
match id { match id {
"dev" | "devnet" => Ok(Box::new(chain_spec::development_config())), "dev" | "devnet" => Ok(Box::new(chain_spec::development_config())),
"local" => Ok(Box::new(chain_spec::testnet_config())), "local" => Ok(Box::new(chain_spec::local_config())),
"testnet" => Ok(Box::new(chain_spec::testnet_config())),
_ => panic!("Unknown network ID"), _ => panic!("Unknown network ID"),
} }
} }

View File

@@ -19,6 +19,7 @@ pub use sc_rpc_api::DenyUnsafe;
use sc_transaction_pool_api::TransactionPool; use sc_transaction_pool_api::TransactionPool;
pub struct FullDeps<C, P> { pub struct FullDeps<C, P> {
pub id: String,
pub client: Arc<C>, pub client: Arc<C>,
pub pool: Arc<P>, pub pool: Arc<P>,
pub deny_unsafe: DenyUnsafe, pub deny_unsafe: DenyUnsafe,
@@ -46,18 +47,19 @@ where
use pallet_transaction_payment_rpc::{TransactionPayment, TransactionPaymentApiServer}; use pallet_transaction_payment_rpc::{TransactionPayment, TransactionPaymentApiServer};
let mut module = RpcModule::new(()); let mut module = RpcModule::new(());
let FullDeps { client, pool, deny_unsafe, authority_discovery } = deps; let FullDeps { id, client, pool, deny_unsafe, authority_discovery } = deps;
module.merge(System::new(client.clone(), pool, deny_unsafe).into_rpc())?; module.merge(System::new(client.clone(), pool, deny_unsafe).into_rpc())?;
module.merge(TransactionPayment::new(client.clone()).into_rpc())?; module.merge(TransactionPayment::new(client.clone()).into_rpc())?;
if let Some(authority_discovery) = authority_discovery { if let Some(authority_discovery) = authority_discovery {
let mut authority_discovery_module = RpcModule::new((client, RwLock::new(authority_discovery))); let mut authority_discovery_module =
RpcModule::new((id, client, RwLock::new(authority_discovery)));
authority_discovery_module.register_async_method( authority_discovery_module.register_async_method(
"p2p_validators", "p2p_validators",
|params, context| async move { |params, context| async move {
let network: NetworkId = params.parse()?; let network: NetworkId = params.parse()?;
let (client, authority_discovery) = &*context; let (id, client, authority_discovery) = &*context;
let latest_block = client.info().best_hash; let latest_block = client.info().best_hash;
let validators = client.runtime_api().validators(latest_block, network).map_err(|_| { let validators = client.runtime_api().validators(latest_block, network).map_err(|_| {
@@ -66,7 +68,9 @@ where
"please report this at https://github.com/serai-dex/serai", "please report this at https://github.com/serai-dex/serai",
))) )))
})?; })?;
let mut all_p2p_addresses = vec![]; // Always return the protocol's bootnodes
let mut all_p2p_addresses = crate::chain_spec::bootnode_multiaddrs(id);
// Additionally returns validators found over the DHT
for validator in validators { for validator in validators {
let mut returned_addresses = authority_discovery let mut returned_addresses = authority_discovery
.write() .write()

View File

@@ -161,7 +161,7 @@ pub fn new_partial(
)) ))
} }
pub fn new_full(config: Configuration) -> Result<TaskManager, ServiceError> { pub fn new_full(mut config: Configuration) -> Result<TaskManager, ServiceError> {
let ( let (
sc_service::PartialComponents { sc_service::PartialComponents {
client, client,
@@ -176,6 +176,11 @@ pub fn new_full(config: Configuration) -> Result<TaskManager, ServiceError> {
keystore_container, keystore_container,
) = new_partial(&config)?; ) = new_partial(&config)?;
config.network.node_name = "serai".to_string();
config.network.client_version = "0.1.0".to_string();
config.network.listen_addresses =
vec!["/ip4/0.0.0.0/tcp/30333".parse().unwrap(), "/ip6/::/tcp/30333".parse().unwrap()];
let mut net_config = sc_network::config::FullNetworkConfiguration::new(&config.network); let mut net_config = sc_network::config::FullNetworkConfiguration::new(&config.network);
let grandpa_protocol_name = let grandpa_protocol_name =
grandpa::protocol_standard_name(&client.block_hash(0).unwrap().unwrap(), &config.chain_spec); grandpa::protocol_standard_name(&client.block_hash(0).unwrap().unwrap(), &config.chain_spec);
@@ -203,6 +208,59 @@ pub fn new_full(config: Configuration) -> Result<TaskManager, ServiceError> {
warp_sync_params: Some(WarpSyncParams::WithProvider(warp_sync)), warp_sync_params: Some(WarpSyncParams::WithProvider(warp_sync)),
})?; })?;
task_manager.spawn_handle().spawn("bootnodes", "bootnodes", {
let network = network.clone();
let id = config.chain_spec.id().to_string();
async move {
// Transforms the above Multiaddrs into MultiaddrWithPeerIds
// While the PeerIds *should* be known in advance and hardcoded, that data wasn't collected in
// time and this fine for a testnet
let bootnodes = || async {
use libp2p::{Transport as TransportTrait, tcp::tokio::Transport, noise::Config};
let bootnode_multiaddrs = crate::chain_spec::bootnode_multiaddrs(&id);
let mut tasks = vec![];
for multiaddr in bootnode_multiaddrs {
tasks.push(tokio::time::timeout(
core::time::Duration::from_secs(10),
tokio::task::spawn(async move {
let Ok(noise) = Config::new(&sc_network::Keypair::generate_ed25519()) else { None? };
let mut transport = Transport::default()
.upgrade(libp2p::core::upgrade::Version::V1)
.authenticate(noise)
.multiplex(libp2p::yamux::Config::default());
let Ok(transport) = transport.dial(multiaddr.clone()) else { None? };
let Ok((peer_id, _)) = transport.await else { None? };
Some(sc_network::config::MultiaddrWithPeerId { multiaddr, peer_id })
}),
));
}
let mut res = vec![];
for task in tasks {
if let Ok(Ok(Some(bootnode))) = task.await {
res.push(bootnode);
}
}
res
};
use sc_network::{NetworkStatusProvider, NetworkPeers};
loop {
if let Ok(status) = network.status().await {
if status.num_connected_peers < 3 {
for bootnode in bootnodes().await {
let _ = network.add_reserved_peer(bootnode);
}
}
}
tokio::time::sleep(core::time::Duration::from_secs(60)).await;
}
}
});
if config.offchain_worker.enabled { if config.offchain_worker.enabled {
task_manager.spawn_handle().spawn( task_manager.spawn_handle().spawn(
"offchain-workers-runner", "offchain-workers-runner",
@@ -258,11 +316,13 @@ pub fn new_full(config: Configuration) -> Result<TaskManager, ServiceError> {
}; };
let rpc_builder = { let rpc_builder = {
let id = config.chain_spec.id().to_string();
let client = client.clone(); let client = client.clone();
let pool = transaction_pool.clone(); let pool = transaction_pool.clone();
Box::new(move |deny_unsafe, _| { Box::new(move |deny_unsafe, _| {
crate::rpc::create_full(crate::rpc::FullDeps { crate::rpc::create_full(crate::rpc::FullDeps {
id: id.clone(),
client: client.clone(), client: client.clone(),
pool: pool.clone(), pool: pool.clone(),
deny_unsafe, deny_unsafe,