mirror of
https://github.com/serai-dex/serai.git
synced 2025-12-11 13:39:25 +00:00
Compare commits
10 Commits
b2bd5d3a44
...
201a444e89
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
201a444e89 | ||
|
|
9833911e06 | ||
|
|
465e8498c4 | ||
|
|
adf20773ac | ||
|
|
295c1bd044 | ||
|
|
dda6e3e899 | ||
|
|
75a00f2a1a | ||
|
|
6cde2bb6ef | ||
|
|
20326bba73 | ||
|
|
ce83b41712 |
2
.github/workflows/msrv.yml
vendored
2
.github/workflows/msrv.yml
vendored
@@ -177,6 +177,8 @@ jobs:
|
||||
cargo msrv verify --manifest-path coordinator/tributary/Cargo.toml
|
||||
cargo msrv verify --manifest-path coordinator/cosign/Cargo.toml
|
||||
cargo msrv verify --manifest-path coordinator/substrate/Cargo.toml
|
||||
cargo msrv verify --manifest-path coordinator/p2p/Cargo.toml
|
||||
cargo msrv verify --manifest-path coordinator/p2p/libp2p/Cargo.toml
|
||||
cargo msrv verify --manifest-path coordinator/Cargo.toml
|
||||
|
||||
msrv-substrate:
|
||||
|
||||
2
.github/workflows/tests.yml
vendored
2
.github/workflows/tests.yml
vendored
@@ -63,6 +63,8 @@ jobs:
|
||||
-p tributary-chain \
|
||||
-p serai-cosign \
|
||||
-p serai-coordinator-substrate \
|
||||
-p serai-coordinator-p2p \
|
||||
-p serai-coordinator-libp2p-p2p \
|
||||
-p serai-coordinator \
|
||||
-p serai-orchestrator \
|
||||
-p serai-docker-tests
|
||||
|
||||
82
Cargo.lock
generated
82
Cargo.lock
generated
@@ -840,6 +840,18 @@ dependencies = [
|
||||
"futures-core",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "async-channel"
|
||||
version = "2.3.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "89b47800b0be77592da0afd425cc03468052844aff33b84e33cc696f64e77b6a"
|
||||
dependencies = [
|
||||
"concurrent-queue",
|
||||
"event-listener-strategy",
|
||||
"futures-core",
|
||||
"pin-project-lite",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "async-io"
|
||||
version = "2.4.0"
|
||||
@@ -2528,7 +2540,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "33d852cb9b869c2a9b3df2f71a3074817f01e1844f839a144f5fcef059a4eb5d"
|
||||
dependencies = [
|
||||
"libc",
|
||||
"windows-sys 0.52.0",
|
||||
"windows-sys 0.59.0",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
@@ -3047,7 +3059,10 @@ version = "2.5.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "cef40d21ae2c515b51041df9ed313ed21e572df340ea58a922a0aefe7e8891a1"
|
||||
dependencies = [
|
||||
"fastrand",
|
||||
"futures-core",
|
||||
"futures-io",
|
||||
"parking",
|
||||
"pin-project-lite",
|
||||
]
|
||||
|
||||
@@ -3548,7 +3563,7 @@ dependencies = [
|
||||
"httpdate",
|
||||
"itoa",
|
||||
"pin-project-lite",
|
||||
"socket2 0.4.10",
|
||||
"socket2 0.5.8",
|
||||
"tokio",
|
||||
"tower-service",
|
||||
"tracing",
|
||||
@@ -4112,7 +4127,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "fc2f4eb4bc735547cfed7c0a4922cbd04a4655978c09b54f1f7b228750664c34"
|
||||
dependencies = [
|
||||
"cfg-if",
|
||||
"windows-targets 0.48.5",
|
||||
"windows-targets 0.52.6",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
@@ -6907,7 +6922,7 @@ dependencies = [
|
||||
"errno",
|
||||
"libc",
|
||||
"linux-raw-sys",
|
||||
"windows-sys 0.52.0",
|
||||
"windows-sys 0.59.0",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
@@ -7450,7 +7465,7 @@ version = "0.10.0-dev"
|
||||
source = "git+https://github.com/serai-dex/substrate#6e3f07bf5c98a6a3ec15f2b1a46148aa8c7d737a"
|
||||
dependencies = [
|
||||
"array-bytes",
|
||||
"async-channel",
|
||||
"async-channel 1.9.0",
|
||||
"async-trait",
|
||||
"asynchronous-codec",
|
||||
"bytes",
|
||||
@@ -7491,7 +7506,7 @@ name = "sc-network-bitswap"
|
||||
version = "0.10.0-dev"
|
||||
source = "git+https://github.com/serai-dex/substrate#6e3f07bf5c98a6a3ec15f2b1a46148aa8c7d737a"
|
||||
dependencies = [
|
||||
"async-channel",
|
||||
"async-channel 1.9.0",
|
||||
"cid",
|
||||
"futures",
|
||||
"libp2p-identity",
|
||||
@@ -7548,7 +7563,7 @@ version = "0.10.0-dev"
|
||||
source = "git+https://github.com/serai-dex/substrate#6e3f07bf5c98a6a3ec15f2b1a46148aa8c7d737a"
|
||||
dependencies = [
|
||||
"array-bytes",
|
||||
"async-channel",
|
||||
"async-channel 1.9.0",
|
||||
"futures",
|
||||
"libp2p-identity",
|
||||
"log",
|
||||
@@ -7569,7 +7584,7 @@ version = "0.10.0-dev"
|
||||
source = "git+https://github.com/serai-dex/substrate#6e3f07bf5c98a6a3ec15f2b1a46148aa8c7d737a"
|
||||
dependencies = [
|
||||
"array-bytes",
|
||||
"async-channel",
|
||||
"async-channel 1.9.0",
|
||||
"async-trait",
|
||||
"fork-tree",
|
||||
"futures",
|
||||
@@ -7943,7 +7958,7 @@ name = "sc-utils"
|
||||
version = "4.0.0-dev"
|
||||
source = "git+https://github.com/serai-dex/substrate#6e3f07bf5c98a6a3ec15f2b1a46148aa8c7d737a"
|
||||
dependencies = [
|
||||
"async-channel",
|
||||
"async-channel 1.9.0",
|
||||
"futures",
|
||||
"futures-timer",
|
||||
"lazy_static",
|
||||
@@ -8311,7 +8326,6 @@ dependencies = [
|
||||
name = "serai-coordinator"
|
||||
version = "0.1.0"
|
||||
dependencies = [
|
||||
"async-trait",
|
||||
"bitvec",
|
||||
"blake2",
|
||||
"borsh",
|
||||
@@ -8319,9 +8333,7 @@ dependencies = [
|
||||
"env_logger",
|
||||
"flexible-transcript",
|
||||
"frost-schnorrkel",
|
||||
"futures-util",
|
||||
"hex",
|
||||
"libp2p",
|
||||
"log",
|
||||
"modular-frost",
|
||||
"parity-scale-codec",
|
||||
@@ -8329,6 +8341,9 @@ dependencies = [
|
||||
"schnorr-signatures",
|
||||
"schnorrkel",
|
||||
"serai-client",
|
||||
"serai-coordinator-libp2p-p2p",
|
||||
"serai-coordinator-p2p",
|
||||
"serai-coordinator-substrate",
|
||||
"serai-cosign",
|
||||
"serai-db",
|
||||
"serai-env",
|
||||
@@ -8337,12 +8352,49 @@ dependencies = [
|
||||
"serai-task",
|
||||
"sp-application-crypto",
|
||||
"sp-runtime",
|
||||
"tokio",
|
||||
"tributary-chain",
|
||||
"zalloc",
|
||||
"zeroize",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "serai-coordinator-libp2p-p2p"
|
||||
version = "0.1.0"
|
||||
dependencies = [
|
||||
"async-trait",
|
||||
"blake2",
|
||||
"borsh",
|
||||
"futures-util",
|
||||
"hex",
|
||||
"libp2p",
|
||||
"log",
|
||||
"rand_core",
|
||||
"schnorrkel",
|
||||
"serai-client",
|
||||
"serai-coordinator-p2p",
|
||||
"serai-cosign",
|
||||
"serai-task",
|
||||
"tokio",
|
||||
"tributary-chain",
|
||||
"void",
|
||||
"zeroize",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "serai-coordinator-p2p"
|
||||
version = "0.1.0"
|
||||
dependencies = [
|
||||
"async-channel 2.3.1",
|
||||
"borsh",
|
||||
"futures-lite",
|
||||
"log",
|
||||
"serai-client",
|
||||
"serai-cosign",
|
||||
"serai-db",
|
||||
"serai-task",
|
||||
"tributary-chain",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "serai-coordinator-substrate"
|
||||
version = "0.1.0"
|
||||
@@ -10491,7 +10543,7 @@ dependencies = [
|
||||
"getrandom",
|
||||
"once_cell",
|
||||
"rustix",
|
||||
"windows-sys 0.52.0",
|
||||
"windows-sys 0.59.0",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
@@ -11715,7 +11767,7 @@ version = "0.1.9"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "cf221c93e13a30d793f7645a0e7762c55d169dbb0a49671918a2319d289b10bb"
|
||||
dependencies = [
|
||||
"windows-sys 0.48.0",
|
||||
"windows-sys 0.59.0",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
|
||||
@@ -100,6 +100,8 @@ members = [
|
||||
"coordinator/tributary",
|
||||
"coordinator/cosign",
|
||||
"coordinator/substrate",
|
||||
"coordinator/p2p",
|
||||
"coordinator/p2p/libp2p",
|
||||
"coordinator",
|
||||
|
||||
"substrate/primitives",
|
||||
|
||||
@@ -18,8 +18,6 @@ rustdoc-args = ["--cfg", "docsrs"]
|
||||
workspace = true
|
||||
|
||||
[dependencies]
|
||||
async-trait = { version = "0.1", default-features = false }
|
||||
|
||||
zeroize = { version = "^1.5", default-features = false, features = ["std"] }
|
||||
bitvec = { version = "1", default-features = false, features = ["std"] }
|
||||
rand_core = { version = "0.6", default-features = false, features = ["std"] }
|
||||
@@ -53,11 +51,10 @@ borsh = { version = "1", default-features = false, features = ["std", "derive",
|
||||
log = { version = "0.4", default-features = false, features = ["std"] }
|
||||
env_logger = { version = "0.10", default-features = false, features = ["humantime"] }
|
||||
|
||||
futures-util = { version = "0.3", default-features = false, features = ["std"] }
|
||||
tokio = { version = "1", default-features = false, features = ["rt-multi-thread", "sync", "time", "macros"] }
|
||||
libp2p = { version = "0.52", default-features = false, features = ["tokio", "tcp", "noise", "yamux", "request-response", "gossipsub", "macros"] }
|
||||
|
||||
serai-cosign = { path = "./cosign" }
|
||||
serai-coordinator-substrate = { path = "./substrate" }
|
||||
serai-coordinator-p2p = { path = "./p2p" }
|
||||
serai-coordinator-libp2p-p2p = { path = "./p2p/libp2p" }
|
||||
|
||||
[dev-dependencies]
|
||||
tributary = { package = "tributary-chain", path = "./tributary", features = ["tests"] }
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
AGPL-3.0-only license
|
||||
|
||||
Copyright (c) 2023-2024 Luke Parker
|
||||
Copyright (c) 2023-2025 Luke Parker
|
||||
|
||||
This program is free software: you can redistribute it and/or modify
|
||||
it under the terms of the GNU Affero General Public License Version 3 as
|
||||
|
||||
33
coordinator/p2p/Cargo.toml
Normal file
33
coordinator/p2p/Cargo.toml
Normal file
@@ -0,0 +1,33 @@
|
||||
[package]
|
||||
name = "serai-coordinator-p2p"
|
||||
version = "0.1.0"
|
||||
description = "Serai coordinator's P2P abstraction"
|
||||
license = "AGPL-3.0-only"
|
||||
repository = "https://github.com/serai-dex/serai/tree/develop/coordinator/p2p"
|
||||
authors = ["Luke Parker <lukeparker5132@gmail.com>"]
|
||||
keywords = []
|
||||
edition = "2021"
|
||||
publish = false
|
||||
rust-version = "1.81"
|
||||
|
||||
[package.metadata.docs.rs]
|
||||
all-features = true
|
||||
rustdoc-args = ["--cfg", "docsrs"]
|
||||
|
||||
[lints]
|
||||
workspace = true
|
||||
|
||||
[dependencies]
|
||||
borsh = { version = "1", default-features = false, features = ["std", "derive", "de_strict_order"] }
|
||||
|
||||
serai-db = { path = "../../common/db", version = "0.1" }
|
||||
|
||||
serai-client = { path = "../../substrate/client", default-features = false, features = ["serai", "borsh"] }
|
||||
serai-cosign = { path = "../cosign" }
|
||||
tributary = { package = "tributary-chain", path = "../tributary" }
|
||||
|
||||
async-channel = { version = "2", default-features = false, features = ["std"] }
|
||||
futures-lite = { version = "2", default-features = false, features = ["std"] }
|
||||
|
||||
log = { version = "0.4", default-features = false, features = ["std"] }
|
||||
serai-task = { path = "../../common/task", version = "0.1" }
|
||||
15
coordinator/p2p/LICENSE
Normal file
15
coordinator/p2p/LICENSE
Normal file
@@ -0,0 +1,15 @@
|
||||
AGPL-3.0-only license
|
||||
|
||||
Copyright (c) 2023-2025 Luke Parker
|
||||
|
||||
This program is free software: you can redistribute it and/or modify
|
||||
it under the terms of the GNU Affero General Public License Version 3 as
|
||||
published by the Free Software Foundation.
|
||||
|
||||
This program is distributed in the hope that it will be useful,
|
||||
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
GNU Affero General Public License for more details.
|
||||
|
||||
You should have received a copy of the GNU Affero General Public License
|
||||
along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
3
coordinator/p2p/README.md
Normal file
3
coordinator/p2p/README.md
Normal file
@@ -0,0 +1,3 @@
|
||||
# Serai Coordinator P2P
|
||||
|
||||
The P2P abstraction used by Serai's coordinator.
|
||||
43
coordinator/p2p/libp2p/Cargo.toml
Normal file
43
coordinator/p2p/libp2p/Cargo.toml
Normal file
@@ -0,0 +1,43 @@
|
||||
[package]
|
||||
name = "serai-coordinator-libp2p-p2p"
|
||||
version = "0.1.0"
|
||||
description = "Serai coordinator's libp2p-based P2P backend"
|
||||
license = "AGPL-3.0-only"
|
||||
repository = "https://github.com/serai-dex/serai/tree/develop/coordinator/p2p/libp2p"
|
||||
authors = ["Luke Parker <lukeparker5132@gmail.com>"]
|
||||
keywords = []
|
||||
edition = "2021"
|
||||
publish = false
|
||||
rust-version = "1.81"
|
||||
|
||||
[package.metadata.docs.rs]
|
||||
all-features = true
|
||||
rustdoc-args = ["--cfg", "docsrs"]
|
||||
|
||||
[lints]
|
||||
workspace = true
|
||||
|
||||
[dependencies]
|
||||
async-trait = { version = "0.1", default-features = false }
|
||||
|
||||
rand_core = { version = "0.6", default-features = false, features = ["std"] }
|
||||
|
||||
zeroize = { version = "^1.5", default-features = false, features = ["std"] }
|
||||
blake2 = { version = "0.10", default-features = false, features = ["std"] }
|
||||
schnorrkel = { version = "0.11", default-features = false, features = ["std"] }
|
||||
|
||||
hex = { version = "0.4", default-features = false, features = ["std"] }
|
||||
borsh = { version = "1", default-features = false, features = ["std", "derive", "de_strict_order"] }
|
||||
|
||||
serai-client = { path = "../../../substrate/client", default-features = false, features = ["serai", "borsh"] }
|
||||
serai-cosign = { path = "../../cosign" }
|
||||
tributary = { package = "tributary-chain", path = "../../tributary" }
|
||||
|
||||
void = { version = "1", default-features = false }
|
||||
futures-util = { version = "0.3", default-features = false, features = ["std"] }
|
||||
tokio = { version = "1", default-features = false, features = ["sync"] }
|
||||
libp2p = { version = "0.52", default-features = false, features = ["tokio", "tcp", "noise", "yamux", "ping", "request-response", "gossipsub", "macros"] }
|
||||
|
||||
log = { version = "0.4", default-features = false, features = ["std"] }
|
||||
serai-task = { path = "../../../common/task", version = "0.1" }
|
||||
serai-coordinator-p2p = { path = "../" }
|
||||
15
coordinator/p2p/libp2p/LICENSE
Normal file
15
coordinator/p2p/libp2p/LICENSE
Normal file
@@ -0,0 +1,15 @@
|
||||
AGPL-3.0-only license
|
||||
|
||||
Copyright (c) 2023-2025 Luke Parker
|
||||
|
||||
This program is free software: you can redistribute it and/or modify
|
||||
it under the terms of the GNU Affero General Public License Version 3 as
|
||||
published by the Free Software Foundation.
|
||||
|
||||
This program is distributed in the hope that it will be useful,
|
||||
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
GNU Affero General Public License for more details.
|
||||
|
||||
You should have received a copy of the GNU Affero General Public License
|
||||
along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
14
coordinator/p2p/libp2p/README.md
Normal file
14
coordinator/p2p/libp2p/README.md
Normal file
@@ -0,0 +1,14 @@
|
||||
# Serai Coordinator libp2p P2P
|
||||
|
||||
A libp2p-backed P2P instantiation for Serai's coordinator.
|
||||
|
||||
The libp2p swarm is limited to validators from the Serai network. The swarm
|
||||
does not maintain any of its own peer finding/routing infrastructure, instead
|
||||
relying on the Serai network's connection information to dial peers. This does
|
||||
limit the listening peers to only the peers immediately reachable via the same
|
||||
IP address (despite the two distinct services), not hidden behind a NAT, yet is
|
||||
also quite simple and gives full control of who to connect to to us.
|
||||
|
||||
Peers are decided via the internal `DialTask` which aims to maintain a target
|
||||
amount of peers for each external network. This ensures cosigns are able to
|
||||
propagate across the external networks which sign them.
|
||||
@@ -1,5 +1,5 @@
|
||||
use core::{pin::Pin, future::Future};
|
||||
use std::{sync::Arc, io};
|
||||
use std::io;
|
||||
|
||||
use zeroize::Zeroizing;
|
||||
use rand_core::{RngCore, OsRng};
|
||||
@@ -9,8 +9,6 @@ use schnorrkel::{Keypair, PublicKey, Signature};
|
||||
|
||||
use serai_client::primitives::PublicKey as Public;
|
||||
|
||||
use tokio::sync::RwLock;
|
||||
|
||||
use futures_util::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};
|
||||
use libp2p::{
|
||||
core::UpgradeInfo,
|
||||
@@ -19,13 +17,12 @@ use libp2p::{
|
||||
noise,
|
||||
};
|
||||
|
||||
use crate::p2p::libp2p::{validators::Validators, peer_id_from_public};
|
||||
use crate::peer_id_from_public;
|
||||
|
||||
const PROTOCOL: &str = "/serai/coordinator/validators";
|
||||
|
||||
#[derive(Clone)]
|
||||
pub(crate) struct OnlyValidators {
|
||||
pub(crate) validators: Arc<RwLock<Validators>>,
|
||||
pub(crate) serai_key: Zeroizing<Keypair>,
|
||||
pub(crate) noise_keypair: identity::Keypair,
|
||||
}
|
||||
@@ -108,12 +105,7 @@ impl OnlyValidators {
|
||||
.verify_simple(PROTOCOL.as_bytes(), &msg, &sig)
|
||||
.map_err(|_| io::Error::other("invalid signature"))?;
|
||||
|
||||
let peer_id = peer_id_from_public(Public::from_raw(public_key.to_bytes()));
|
||||
if !self.validators.read().await.contains(&peer_id) {
|
||||
Err(io::Error::other("peer which tried to connect isn't a known active validator"))?;
|
||||
}
|
||||
|
||||
Ok(peer_id)
|
||||
Ok(peer_id_from_public(Public::from_raw(public_key.to_bytes())))
|
||||
}
|
||||
}
|
||||
|
||||
@@ -14,7 +14,7 @@ use libp2p::{
|
||||
|
||||
use serai_task::ContinuallyRan;
|
||||
|
||||
use crate::p2p::libp2p::{PORT, Peers, validators::Validators};
|
||||
use crate::{PORT, Peers, validators::Validators};
|
||||
|
||||
const TARGET_PEERS_PER_NETWORK: usize = 5;
|
||||
/*
|
||||
@@ -37,7 +37,7 @@ pub(crate) struct DialTask {
|
||||
|
||||
impl DialTask {
|
||||
pub(crate) fn new(serai: Serai, peers: Peers, to_dial: mpsc::UnboundedSender<DialOpts>) -> Self {
|
||||
DialTask { serai: serai.clone(), validators: Validators::new(serai), peers, to_dial }
|
||||
DialTask { serai: serai.clone(), validators: Validators::new(serai).0, peers, to_dial }
|
||||
}
|
||||
}
|
||||
|
||||
@@ -2,13 +2,11 @@ use core::time::Duration;
|
||||
|
||||
use blake2::{Digest, Blake2s256};
|
||||
|
||||
use scale::Encode;
|
||||
use borsh::{BorshSerialize, BorshDeserialize};
|
||||
use serai_client::validator_sets::primitives::ValidatorSet;
|
||||
|
||||
use libp2p::gossipsub::{
|
||||
TopicHash, IdentTopic, MessageId, MessageAuthenticity, ValidationMode, ConfigBuilder,
|
||||
IdentityTransform, AllowAllSubscriptionFilter, Behaviour,
|
||||
IdentTopic, MessageId, MessageAuthenticity, ValidationMode, ConfigBuilder, IdentityTransform,
|
||||
AllowAllSubscriptionFilter, Behaviour,
|
||||
};
|
||||
pub use libp2p::gossipsub::Event;
|
||||
|
||||
@@ -17,26 +15,24 @@ use serai_cosign::SignedCosign;
|
||||
// Block size limit + 16 KB of space for signatures/metadata
|
||||
pub(crate) const MAX_LIBP2P_GOSSIP_MESSAGE_SIZE: usize = tributary::BLOCK_SIZE_LIMIT + 16384;
|
||||
|
||||
const KEEP_ALIVE_INTERVAL: Duration = Duration::from_secs(80);
|
||||
|
||||
const LIBP2P_PROTOCOL: &str = "/serai/coordinator/gossip/1.0.0";
|
||||
const BASE_TOPIC: &str = "/";
|
||||
|
||||
fn topic_for_set(set: ValidatorSet) -> IdentTopic {
|
||||
IdentTopic::new(format!("/set/{}", hex::encode(set.encode())))
|
||||
fn topic_for_tributary(tributary: [u8; 32]) -> IdentTopic {
|
||||
IdentTopic::new(format!("/tributary/{}", hex::encode(tributary)))
|
||||
}
|
||||
|
||||
#[derive(Clone, BorshSerialize, BorshDeserialize)]
|
||||
pub(crate) enum Message {
|
||||
Tributary { set: ValidatorSet, message: Vec<u8> },
|
||||
Tributary { tributary: [u8; 32], message: Vec<u8> },
|
||||
Cosign(SignedCosign),
|
||||
}
|
||||
|
||||
impl Message {
|
||||
pub(crate) fn topic(&self) -> TopicHash {
|
||||
pub(crate) fn topic(&self) -> IdentTopic {
|
||||
match self {
|
||||
Message::Tributary { set, .. } => topic_for_set(*set).hash(),
|
||||
Message::Cosign(_) => IdentTopic::new(BASE_TOPIC).hash(),
|
||||
Message::Tributary { tributary, .. } => topic_for_tributary(*tributary),
|
||||
Message::Cosign(_) => IdentTopic::new(BASE_TOPIC),
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -60,7 +56,6 @@ pub(crate) fn new_behavior() -> Behavior {
|
||||
.history_gossip(usize::try_from(heartbeats_to_gossip).unwrap())
|
||||
.heartbeat_interval(Duration::from_millis(heartbeat_interval.into()))
|
||||
.max_transmit_size(MAX_LIBP2P_GOSSIP_MESSAGE_SIZE)
|
||||
.idle_timeout(KEEP_ALIVE_INTERVAL + Duration::from_secs(5))
|
||||
.duplicate_cache_time(Duration::from_millis((heartbeats_to_keep * heartbeat_interval).into()))
|
||||
.validation_mode(ValidationMode::Anonymous)
|
||||
// Uses a content based message ID to avoid duplicates as much as possible
|
||||
419
coordinator/p2p/libp2p/src/lib.rs
Normal file
419
coordinator/p2p/libp2p/src/lib.rs
Normal file
@@ -0,0 +1,419 @@
|
||||
#![cfg_attr(docsrs, feature(doc_auto_cfg))]
|
||||
#![doc = include_str!("../README.md")]
|
||||
#![deny(missing_docs)]
|
||||
|
||||
use core::{future::Future, time::Duration};
|
||||
use std::{
|
||||
sync::Arc,
|
||||
collections::{HashSet, HashMap},
|
||||
};
|
||||
|
||||
use rand_core::{RngCore, OsRng};
|
||||
|
||||
use zeroize::Zeroizing;
|
||||
use schnorrkel::Keypair;
|
||||
|
||||
use serai_client::{
|
||||
primitives::{NetworkId, PublicKey},
|
||||
validator_sets::primitives::ValidatorSet,
|
||||
Serai,
|
||||
};
|
||||
|
||||
use tokio::sync::{mpsc, Mutex, RwLock};
|
||||
|
||||
use serai_task::{Task, ContinuallyRan};
|
||||
|
||||
use serai_cosign::SignedCosign;
|
||||
|
||||
use libp2p::{
|
||||
multihash::Multihash,
|
||||
identity::{self, PeerId},
|
||||
tcp::Config as TcpConfig,
|
||||
yamux, allow_block_list,
|
||||
connection_limits::{self, ConnectionLimits},
|
||||
swarm::NetworkBehaviour,
|
||||
SwarmBuilder,
|
||||
};
|
||||
|
||||
use serai_coordinator_p2p::{oneshot, Heartbeat, TributaryBlockWithCommit};
|
||||
|
||||
/// A struct to sync the validators from the Serai node in order to keep track of them.
|
||||
mod validators;
|
||||
use validators::UpdateValidatorsTask;
|
||||
|
||||
/// The authentication protocol upgrade to limit the P2P network to active validators.
|
||||
mod authenticate;
|
||||
use authenticate::OnlyValidators;
|
||||
|
||||
/// The ping behavior, used to ensure connection latency is below the limit
|
||||
mod ping;
|
||||
|
||||
/// The request-response messages and behavior
|
||||
mod reqres;
|
||||
use reqres::{RequestId, Request, Response};
|
||||
|
||||
/// The gossip messages and behavior
|
||||
mod gossip;
|
||||
use gossip::Message;
|
||||
|
||||
/// The swarm task, running it and dispatching to/from it
|
||||
mod swarm;
|
||||
use swarm::SwarmTask;
|
||||
|
||||
/// The dial task, to find new peers to connect to
|
||||
mod dial;
|
||||
use dial::DialTask;
|
||||
|
||||
const PORT: u16 = 30563; // 5132 ^ (('c' << 8) | 'o')
|
||||
|
||||
// usize::max, manually implemented, as max isn't a const fn
|
||||
const MAX_LIBP2P_MESSAGE_SIZE: usize =
|
||||
if gossip::MAX_LIBP2P_GOSSIP_MESSAGE_SIZE > reqres::MAX_LIBP2P_REQRES_MESSAGE_SIZE {
|
||||
gossip::MAX_LIBP2P_GOSSIP_MESSAGE_SIZE
|
||||
} else {
|
||||
reqres::MAX_LIBP2P_REQRES_MESSAGE_SIZE
|
||||
};
|
||||
|
||||
fn peer_id_from_public(public: PublicKey) -> PeerId {
|
||||
// 0 represents the identity Multihash, that no hash was performed
|
||||
// It's an internal constant so we can't refer to the constant inside libp2p
|
||||
PeerId::from_multihash(Multihash::wrap(0, &public.0).unwrap()).unwrap()
|
||||
}
|
||||
|
||||
/// The representation of a peer.
|
||||
pub struct Peer<'a> {
|
||||
outbound_requests: &'a mpsc::UnboundedSender<(PeerId, Request, oneshot::Sender<Response>)>,
|
||||
id: PeerId,
|
||||
}
|
||||
impl serai_coordinator_p2p::Peer<'_> for Peer<'_> {
|
||||
fn send_heartbeat(
|
||||
&self,
|
||||
heartbeat: Heartbeat,
|
||||
) -> impl Send + Future<Output = Option<Vec<TributaryBlockWithCommit>>> {
|
||||
async move {
|
||||
const HEARTBEAT_TIMEOUT: Duration = Duration::from_secs(5);
|
||||
|
||||
let request = Request::Heartbeat(heartbeat);
|
||||
let (sender, receiver) = oneshot::channel();
|
||||
self
|
||||
.outbound_requests
|
||||
.send((self.id, request, sender))
|
||||
.expect("outbound requests recv channel was dropped?");
|
||||
if let Ok(Ok(Response::Blocks(blocks))) =
|
||||
tokio::time::timeout(HEARTBEAT_TIMEOUT, receiver).await
|
||||
{
|
||||
Some(blocks)
|
||||
} else {
|
||||
None
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
struct Peers {
|
||||
peers: Arc<RwLock<HashMap<NetworkId, HashSet<PeerId>>>>,
|
||||
}
|
||||
|
||||
// Consider adding identify/kad/autonat/rendevous/(relay + dcutr). While we currently use the Serai
|
||||
// network for peers, we could use it solely for bootstrapping/as a fallback.
|
||||
#[derive(NetworkBehaviour)]
|
||||
struct Behavior {
|
||||
// Used to only allow Serai validators as peers
|
||||
allow_list: allow_block_list::Behaviour<allow_block_list::AllowedPeers>,
|
||||
// Used to limit each peer to a single connection
|
||||
connection_limits: connection_limits::Behaviour,
|
||||
// Used to ensure connection latency is within tolerances
|
||||
ping: ping::Behavior,
|
||||
// Used to request data from specific peers
|
||||
reqres: reqres::Behavior,
|
||||
// Used to broadcast messages to all other peers subscribed to a topic
|
||||
gossip: gossip::Behavior,
|
||||
}
|
||||
|
||||
/// The libp2p-backed P2P implementation.
|
||||
///
|
||||
/// The P2p trait implementation does not support backpressure and is expected to be fully
|
||||
/// utilized. Failure to poll the entire API will cause unbounded memory growth.
|
||||
#[allow(clippy::type_complexity)]
|
||||
#[derive(Clone)]
|
||||
pub struct Libp2p {
|
||||
peers: Peers,
|
||||
|
||||
gossip: mpsc::UnboundedSender<Message>,
|
||||
outbound_requests: mpsc::UnboundedSender<(PeerId, Request, oneshot::Sender<Response>)>,
|
||||
|
||||
tributary_gossip: Arc<Mutex<mpsc::UnboundedReceiver<([u8; 32], Vec<u8>)>>>,
|
||||
|
||||
signed_cosigns: Arc<Mutex<mpsc::UnboundedReceiver<SignedCosign>>>,
|
||||
signed_cosigns_send: mpsc::UnboundedSender<SignedCosign>,
|
||||
|
||||
heartbeat_requests: Arc<Mutex<mpsc::UnboundedReceiver<(RequestId, ValidatorSet, [u8; 32])>>>,
|
||||
notable_cosign_requests: Arc<Mutex<mpsc::UnboundedReceiver<(RequestId, [u8; 32])>>>,
|
||||
inbound_request_responses: mpsc::UnboundedSender<(RequestId, Response)>,
|
||||
}
|
||||
|
||||
impl Libp2p {
|
||||
/// Create a new libp2p-backed P2P instance.
|
||||
///
|
||||
/// This will spawn all of the internal tasks necessary for functioning.
|
||||
pub fn new(serai_key: &Zeroizing<Keypair>, serai: Serai) -> Libp2p {
|
||||
// Define the object we track peers with
|
||||
let peers = Peers { peers: Arc::new(RwLock::new(HashMap::new())) };
|
||||
|
||||
// Define the dial task
|
||||
let (dial_task_def, dial_task) = Task::new();
|
||||
let (to_dial_send, to_dial_recv) = mpsc::unbounded_channel();
|
||||
tokio::spawn(
|
||||
DialTask::new(serai.clone(), peers.clone(), to_dial_send)
|
||||
.continually_run(dial_task_def, vec![]),
|
||||
);
|
||||
|
||||
let swarm = {
|
||||
let new_only_validators = |noise_keypair: &identity::Keypair| -> Result<_, ()> {
|
||||
Ok(OnlyValidators { serai_key: serai_key.clone(), noise_keypair: noise_keypair.clone() })
|
||||
};
|
||||
|
||||
let new_yamux = || {
|
||||
let mut config = yamux::Config::default();
|
||||
// 1 MiB default + max message size
|
||||
config.set_max_buffer_size((1024 * 1024) + MAX_LIBP2P_MESSAGE_SIZE);
|
||||
// 256 KiB default + max message size
|
||||
config
|
||||
.set_receive_window_size(((256 * 1024) + MAX_LIBP2P_MESSAGE_SIZE).try_into().unwrap());
|
||||
config
|
||||
};
|
||||
|
||||
let mut swarm = SwarmBuilder::with_existing_identity(identity::Keypair::generate_ed25519())
|
||||
.with_tokio()
|
||||
.with_tcp(TcpConfig::default().nodelay(false), new_only_validators, new_yamux)
|
||||
.unwrap()
|
||||
.with_behaviour(|_| Behavior {
|
||||
allow_list: allow_block_list::Behaviour::default(),
|
||||
// Limit each per to a single connection
|
||||
connection_limits: connection_limits::Behaviour::new(
|
||||
ConnectionLimits::default().with_max_established_per_peer(Some(1)),
|
||||
),
|
||||
ping: ping::new_behavior(),
|
||||
reqres: reqres::new_behavior(),
|
||||
gossip: gossip::new_behavior(),
|
||||
})
|
||||
.unwrap()
|
||||
.with_swarm_config(|config| {
|
||||
config
|
||||
.with_idle_connection_timeout(ping::INTERVAL + ping::TIMEOUT + Duration::from_secs(5))
|
||||
})
|
||||
.build();
|
||||
swarm.listen_on(format!("/ip4/0.0.0.0/tcp/{PORT}").parse().unwrap()).unwrap();
|
||||
swarm.listen_on(format!("/ip6/::/tcp/{PORT}").parse().unwrap()).unwrap();
|
||||
swarm
|
||||
};
|
||||
|
||||
let (swarm_validators, validator_changes) = UpdateValidatorsTask::spawn(serai);
|
||||
|
||||
let (gossip_send, gossip_recv) = mpsc::unbounded_channel();
|
||||
let (signed_cosigns_send, signed_cosigns_recv) = mpsc::unbounded_channel();
|
||||
let (tributary_gossip_send, tributary_gossip_recv) = mpsc::unbounded_channel();
|
||||
|
||||
let (outbound_requests_send, outbound_requests_recv) = mpsc::unbounded_channel();
|
||||
|
||||
let (heartbeat_requests_send, heartbeat_requests_recv) = mpsc::unbounded_channel();
|
||||
let (notable_cosign_requests_send, notable_cosign_requests_recv) = mpsc::unbounded_channel();
|
||||
let (inbound_request_responses_send, inbound_request_responses_recv) =
|
||||
mpsc::unbounded_channel();
|
||||
|
||||
// Create the swarm task
|
||||
SwarmTask::spawn(
|
||||
dial_task,
|
||||
to_dial_recv,
|
||||
swarm_validators,
|
||||
validator_changes,
|
||||
peers.clone(),
|
||||
swarm,
|
||||
gossip_recv,
|
||||
signed_cosigns_send.clone(),
|
||||
tributary_gossip_send,
|
||||
outbound_requests_recv,
|
||||
heartbeat_requests_send,
|
||||
notable_cosign_requests_send,
|
||||
inbound_request_responses_recv,
|
||||
);
|
||||
|
||||
Libp2p {
|
||||
peers,
|
||||
|
||||
gossip: gossip_send,
|
||||
outbound_requests: outbound_requests_send,
|
||||
|
||||
tributary_gossip: Arc::new(Mutex::new(tributary_gossip_recv)),
|
||||
|
||||
signed_cosigns: Arc::new(Mutex::new(signed_cosigns_recv)),
|
||||
signed_cosigns_send,
|
||||
|
||||
heartbeat_requests: Arc::new(Mutex::new(heartbeat_requests_recv)),
|
||||
notable_cosign_requests: Arc::new(Mutex::new(notable_cosign_requests_recv)),
|
||||
inbound_request_responses: inbound_request_responses_send,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl tributary::P2p for Libp2p {
|
||||
fn broadcast(&self, tributary: [u8; 32], message: Vec<u8>) -> impl Send + Future<Output = ()> {
|
||||
async move {
|
||||
self
|
||||
.gossip
|
||||
.send(Message::Tributary { tributary, message })
|
||||
.expect("gossip recv channel was dropped?");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl serai_cosign::RequestNotableCosigns for Libp2p {
|
||||
type Error = ();
|
||||
|
||||
fn request_notable_cosigns(
|
||||
&self,
|
||||
global_session: [u8; 32],
|
||||
) -> impl Send + Future<Output = Result<(), Self::Error>> {
|
||||
async move {
|
||||
const AMOUNT_OF_PEERS_TO_REQUEST_FROM: usize = 3;
|
||||
const NOTABLE_COSIGNS_TIMEOUT: Duration = Duration::from_secs(5);
|
||||
|
||||
let request = Request::NotableCosigns { global_session };
|
||||
|
||||
let peers = self.peers.peers.read().await.clone();
|
||||
// HashSet of all peers
|
||||
let peers = peers.into_values().flat_map(<_>::into_iter).collect::<HashSet<_>>();
|
||||
// Vec of all peers
|
||||
let mut peers = peers.into_iter().collect::<Vec<_>>();
|
||||
|
||||
let mut channels = Vec::with_capacity(AMOUNT_OF_PEERS_TO_REQUEST_FROM);
|
||||
for _ in 0 .. AMOUNT_OF_PEERS_TO_REQUEST_FROM {
|
||||
if peers.is_empty() {
|
||||
break;
|
||||
}
|
||||
let i = usize::try_from(OsRng.next_u64() % u64::try_from(peers.len()).unwrap()).unwrap();
|
||||
let peer = peers.swap_remove(i);
|
||||
|
||||
let (sender, receiver) = oneshot::channel();
|
||||
self
|
||||
.outbound_requests
|
||||
.send((peer, request, sender))
|
||||
.expect("outbound requests recv channel was dropped?");
|
||||
channels.push(receiver);
|
||||
}
|
||||
|
||||
// We could reduce our latency by using FuturesUnordered here but the latency isn't a concern
|
||||
for channel in channels {
|
||||
if let Ok(Ok(Response::NotableCosigns(cosigns))) =
|
||||
tokio::time::timeout(NOTABLE_COSIGNS_TIMEOUT, channel).await
|
||||
{
|
||||
for cosign in cosigns {
|
||||
self
|
||||
.signed_cosigns_send
|
||||
.send(cosign)
|
||||
.expect("signed_cosigns recv in this object was dropped?");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl serai_coordinator_p2p::P2p for Libp2p {
|
||||
type Peer<'a> = Peer<'a>;
|
||||
|
||||
fn peers(&self, network: NetworkId) -> impl Send + Future<Output = Vec<Self::Peer<'_>>> {
|
||||
async move {
|
||||
let Some(peer_ids) = self.peers.peers.read().await.get(&network).cloned() else {
|
||||
return vec![];
|
||||
};
|
||||
let mut res = vec![];
|
||||
for id in peer_ids {
|
||||
res.push(Peer { outbound_requests: &self.outbound_requests, id });
|
||||
}
|
||||
res
|
||||
}
|
||||
}
|
||||
|
||||
fn heartbeat(
|
||||
&self,
|
||||
) -> impl Send + Future<Output = (Heartbeat, oneshot::Sender<Vec<TributaryBlockWithCommit>>)> {
|
||||
async move {
|
||||
let (request_id, set, latest_block_hash) = self
|
||||
.heartbeat_requests
|
||||
.lock()
|
||||
.await
|
||||
.recv()
|
||||
.await
|
||||
.expect("heartbeat_requests_send was dropped?");
|
||||
let (sender, receiver) = oneshot::channel();
|
||||
tokio::spawn({
|
||||
let respond = self.inbound_request_responses.clone();
|
||||
async move {
|
||||
// The swarm task expects us to respond to every request. If the caller drops this
|
||||
// channel, we'll receive `Err` and respond with `vec![]`, safely satisfying that bound
|
||||
// without requiring the caller send a value down this channel
|
||||
let response = if let Ok(blocks) = receiver.await {
|
||||
Response::Blocks(blocks)
|
||||
} else {
|
||||
Response::Blocks(vec![])
|
||||
};
|
||||
respond
|
||||
.send((request_id, response))
|
||||
.expect("inbound_request_responses_recv was dropped?");
|
||||
}
|
||||
});
|
||||
(Heartbeat { set, latest_block_hash }, sender)
|
||||
}
|
||||
}
|
||||
|
||||
fn notable_cosigns_request(
|
||||
&self,
|
||||
) -> impl Send + Future<Output = ([u8; 32], oneshot::Sender<Vec<SignedCosign>>)> {
|
||||
async move {
|
||||
let (request_id, global_session) = self
|
||||
.notable_cosign_requests
|
||||
.lock()
|
||||
.await
|
||||
.recv()
|
||||
.await
|
||||
.expect("notable_cosign_requests_send was dropped?");
|
||||
let (sender, receiver) = oneshot::channel();
|
||||
tokio::spawn({
|
||||
let respond = self.inbound_request_responses.clone();
|
||||
async move {
|
||||
let response = if let Ok(notable_cosigns) = receiver.await {
|
||||
Response::NotableCosigns(notable_cosigns)
|
||||
} else {
|
||||
Response::NotableCosigns(vec![])
|
||||
};
|
||||
respond
|
||||
.send((request_id, response))
|
||||
.expect("inbound_request_responses_recv was dropped?");
|
||||
}
|
||||
});
|
||||
(global_session, sender)
|
||||
}
|
||||
}
|
||||
|
||||
fn tributary_message(&self) -> impl Send + Future<Output = ([u8; 32], Vec<u8>)> {
|
||||
async move {
|
||||
self.tributary_gossip.lock().await.recv().await.expect("tributary_gossip send was dropped?")
|
||||
}
|
||||
}
|
||||
|
||||
fn cosign(&self) -> impl Send + Future<Output = SignedCosign> {
|
||||
async move {
|
||||
self
|
||||
.signed_cosigns
|
||||
.lock()
|
||||
.await
|
||||
.recv()
|
||||
.await
|
||||
.expect("signed_cosigns couldn't recv despite send in same object?")
|
||||
}
|
||||
}
|
||||
}
|
||||
17
coordinator/p2p/libp2p/src/ping.rs
Normal file
17
coordinator/p2p/libp2p/src/ping.rs
Normal file
@@ -0,0 +1,17 @@
|
||||
use core::time::Duration;
|
||||
|
||||
use tributary::tendermint::LATENCY_TIME;
|
||||
|
||||
use libp2p::ping::{self, Config, Behaviour};
|
||||
pub use ping::Event;
|
||||
|
||||
pub(crate) const INTERVAL: Duration = Duration::from_secs(30);
|
||||
// LATENCY_TIME represents the maximum latency for message delivery. Sending the ping, and
|
||||
// receiving the pong, each have to occur within this time bound to validate the connection. We
|
||||
// enforce that, as best we can, by requiring the round-trip be within twice the allowed latency.
|
||||
pub(crate) const TIMEOUT: Duration = Duration::from_millis((2 * LATENCY_TIME) as u64);
|
||||
|
||||
pub(crate) type Behavior = Behaviour;
|
||||
pub(crate) fn new_behavior() -> Behavior {
|
||||
Behavior::new(Config::default().with_interval(INTERVAL).with_timeout(TIMEOUT))
|
||||
}
|
||||
@@ -4,36 +4,33 @@ use std::io;
|
||||
use async_trait::async_trait;
|
||||
|
||||
use borsh::{BorshSerialize, BorshDeserialize};
|
||||
use serai_client::validator_sets::primitives::ValidatorSet;
|
||||
|
||||
use futures_util::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};
|
||||
|
||||
use libp2p::request_response::{
|
||||
self, Codec as CodecTrait, Event as GenericEvent, Config, Behaviour, ProtocolSupport,
|
||||
};
|
||||
pub use request_response::Message;
|
||||
pub use request_response::{RequestId, Message};
|
||||
|
||||
use serai_cosign::SignedCosign;
|
||||
|
||||
use crate::p2p::TributaryBlockWithCommit;
|
||||
use serai_coordinator_p2p::{Heartbeat, TributaryBlockWithCommit};
|
||||
|
||||
/// The maximum message size for the request-response protocol
|
||||
// This is derived from the heartbeat message size as it's our largest message
|
||||
pub(crate) const MAX_LIBP2P_REQRES_MESSAGE_SIZE: usize =
|
||||
(tributary::BLOCK_SIZE_LIMIT * crate::p2p::heartbeat::BLOCKS_PER_BATCH) + 1024;
|
||||
(tributary::BLOCK_SIZE_LIMIT * serai_coordinator_p2p::heartbeat::BLOCKS_PER_BATCH) + 1024;
|
||||
|
||||
const PROTOCOL: &str = "/serai/coordinator/reqres/1.0.0";
|
||||
|
||||
/// Requests which can be made via the request-response protocol.
|
||||
#[derive(Clone, Copy, Debug, BorshSerialize, BorshDeserialize)]
|
||||
pub(crate) enum Request {
|
||||
/// A keep-alive to prevent our connections from being dropped.
|
||||
KeepAlive,
|
||||
/// A heartbeat informing our peers of our latest block, for the specified blockchain, on regular
|
||||
/// intervals.
|
||||
///
|
||||
/// If our peers have more blocks than us, they're expected to respond with those blocks.
|
||||
Heartbeat { set: ValidatorSet, latest_block_hash: [u8; 32] },
|
||||
Heartbeat(Heartbeat),
|
||||
/// A request for the notable cosigns for a global session.
|
||||
NotableCosigns { global_session: [u8; 32] },
|
||||
}
|
||||
@@ -105,7 +102,7 @@ impl CodecTrait for Codec {
|
||||
}
|
||||
async fn read_response<R: Send + Unpin + AsyncRead>(
|
||||
&mut self,
|
||||
proto: &Self::Protocol,
|
||||
_: &Self::Protocol,
|
||||
io: &mut R,
|
||||
) -> io::Result<Response> {
|
||||
Self::read(io).await
|
||||
@@ -120,7 +117,7 @@ impl CodecTrait for Codec {
|
||||
}
|
||||
async fn write_response<W: Send + Unpin + AsyncWrite>(
|
||||
&mut self,
|
||||
proto: &Self::Protocol,
|
||||
_: &Self::Protocol,
|
||||
io: &mut W,
|
||||
res: Response,
|
||||
) -> io::Result<()> {
|
||||
@@ -8,7 +8,7 @@ use borsh::BorshDeserialize;
|
||||
|
||||
use serai_client::validator_sets::primitives::ValidatorSet;
|
||||
|
||||
use tokio::sync::{mpsc, oneshot, RwLock};
|
||||
use tokio::sync::{mpsc, RwLock};
|
||||
|
||||
use serai_task::TaskHandle;
|
||||
|
||||
@@ -21,14 +21,16 @@ use libp2p::{
|
||||
swarm::{dial_opts::DialOpts, SwarmEvent, Swarm},
|
||||
};
|
||||
|
||||
use crate::p2p::libp2p::{
|
||||
use serai_coordinator_p2p::{oneshot, Heartbeat};
|
||||
|
||||
use crate::{
|
||||
Peers, BehaviorEvent, Behavior,
|
||||
validators::Validators,
|
||||
validators::{self, Validators},
|
||||
ping,
|
||||
reqres::{self, Request, Response},
|
||||
gossip,
|
||||
};
|
||||
|
||||
const KEEP_ALIVE_INTERVAL: Duration = Duration::from_secs(80);
|
||||
const TIME_BETWEEN_REBUILD_PEERS: Duration = Duration::from_secs(10 * 60);
|
||||
|
||||
/*
|
||||
@@ -52,16 +54,15 @@ pub(crate) struct SwarmTask {
|
||||
last_dial_task_run: Instant,
|
||||
|
||||
validators: Arc<RwLock<Validators>>,
|
||||
validator_changes: mpsc::UnboundedReceiver<validators::Changes>,
|
||||
peers: Peers,
|
||||
rebuild_peers_at: Instant,
|
||||
|
||||
swarm: Swarm<Behavior>,
|
||||
|
||||
last_message: Instant,
|
||||
|
||||
gossip: mpsc::UnboundedReceiver<gossip::Message>,
|
||||
signed_cosigns: mpsc::UnboundedSender<SignedCosign>,
|
||||
tributary_gossip: mpsc::UnboundedSender<(ValidatorSet, Vec<u8>)>,
|
||||
tributary_gossip: mpsc::UnboundedSender<([u8; 32], Vec<u8>)>,
|
||||
|
||||
outbound_requests: mpsc::UnboundedReceiver<(PeerId, Request, oneshot::Sender<Response>)>,
|
||||
outbound_request_responses: HashMap<RequestId, oneshot::Sender<Response>>,
|
||||
@@ -82,12 +83,13 @@ impl SwarmTask {
|
||||
match event {
|
||||
gossip::Event::Message { message, .. } => {
|
||||
let Ok(message) = gossip::Message::deserialize(&mut message.data.as_slice()) else {
|
||||
// TODO: Penalize the PeerId which sent this message
|
||||
// TODO: Penalize the PeerId which created this message, which requires authenticating
|
||||
// each message OR moving to explicit acknowledgement before re-gossiping
|
||||
return;
|
||||
};
|
||||
match message {
|
||||
gossip::Message::Tributary { set, message } => {
|
||||
let _: Result<_, _> = self.tributary_gossip.send((set, message));
|
||||
gossip::Message::Tributary { tributary, message } => {
|
||||
let _: Result<_, _> = self.tributary_gossip.send((tributary, message));
|
||||
}
|
||||
gossip::Message::Cosign(signed_cosign) => {
|
||||
let _: Result<_, _> = self.signed_cosigns.send(signed_cosign);
|
||||
@@ -105,11 +107,7 @@ impl SwarmTask {
|
||||
match event {
|
||||
reqres::Event::Message { message, .. } => match message {
|
||||
reqres::Message::Request { request_id, request, channel } => match request {
|
||||
reqres::Request::KeepAlive => {
|
||||
let _: Result<_, _> =
|
||||
self.swarm.behaviour_mut().reqres.send_response(channel, Response::None);
|
||||
}
|
||||
reqres::Request::Heartbeat { set, latest_block_hash } => {
|
||||
reqres::Request::Heartbeat(Heartbeat { set, latest_block_hash }) => {
|
||||
self.inbound_request_response_channels.insert(request_id, channel);
|
||||
let _: Result<_, _> =
|
||||
self.heartbeat_requests.send((request_id, set, latest_block_hash));
|
||||
@@ -137,17 +135,19 @@ impl SwarmTask {
|
||||
|
||||
async fn run(mut self) {
|
||||
loop {
|
||||
let time_till_keep_alive = Instant::now().saturating_duration_since(self.last_message);
|
||||
let time_till_rebuild_peers = self.rebuild_peers_at.saturating_duration_since(Instant::now());
|
||||
|
||||
tokio::select! {
|
||||
() = tokio::time::sleep(time_till_keep_alive) => {
|
||||
let peers = self.swarm.connected_peers().copied().collect::<Vec<_>>();
|
||||
let behavior = self.swarm.behaviour_mut();
|
||||
for peer in peers {
|
||||
behavior.reqres.send_request(&peer, Request::KeepAlive);
|
||||
// If the validators have changed, update the allow list
|
||||
validator_changes = self.validator_changes.recv() => {
|
||||
let validator_changes = validator_changes.expect("validators update task shut down?");
|
||||
let behavior = &mut self.swarm.behaviour_mut().allow_list;
|
||||
for removed in validator_changes.removed {
|
||||
behavior.disallow_peer(removed);
|
||||
}
|
||||
for added in validator_changes.added {
|
||||
behavior.allow_peer(added);
|
||||
}
|
||||
self.last_message = Instant::now();
|
||||
}
|
||||
|
||||
// Dial peers we're instructed to
|
||||
@@ -170,26 +170,15 @@ impl SwarmTask {
|
||||
let validators_by_network = self.validators.read().await.by_network().clone();
|
||||
let connected_peers = self.swarm.connected_peers().copied().collect::<HashSet<_>>();
|
||||
|
||||
// We initially populate the list of peers to disconnect with all peers
|
||||
let mut to_disconnect = connected_peers.clone();
|
||||
|
||||
// Build the new peers object
|
||||
let mut peers = HashMap::new();
|
||||
for (network, validators) in validators_by_network {
|
||||
peers.insert(network, validators.intersection(&connected_peers).copied().collect());
|
||||
|
||||
// If this peer is in this validator set, don't keep it flagged for disconnection
|
||||
to_disconnect.retain(|peer| !validators.contains(peer));
|
||||
}
|
||||
|
||||
// Write the new peers object
|
||||
*self.peers.peers.write().await = peers;
|
||||
self.rebuild_peers_at = Instant::now() + TIME_BETWEEN_REBUILD_PEERS;
|
||||
|
||||
// Disconnect all peers marked for disconnection
|
||||
for peer in to_disconnect {
|
||||
let _: Result<_, _> = self.swarm.disconnect_peer_id(peer);
|
||||
}
|
||||
}
|
||||
|
||||
// Handle swarm events
|
||||
@@ -238,6 +227,19 @@ impl SwarmTask {
|
||||
}
|
||||
}
|
||||
|
||||
SwarmEvent::Behaviour(
|
||||
BehaviorEvent::AllowList(event) | BehaviorEvent::ConnectionLimits(event)
|
||||
) => {
|
||||
// Ensure these are unreachable cases, not actual events
|
||||
let _: void::Void = event;
|
||||
}
|
||||
SwarmEvent::Behaviour(
|
||||
BehaviorEvent::Ping(ping::Event { peer: _, connection, result, })
|
||||
) => {
|
||||
if result.is_err() {
|
||||
self.swarm.close_connection(connection);
|
||||
}
|
||||
}
|
||||
SwarmEvent::Behaviour(BehaviorEvent::Reqres(event)) => {
|
||||
self.handle_reqres(event)
|
||||
}
|
||||
@@ -261,8 +263,31 @@ impl SwarmTask {
|
||||
let message = message.expect("channel for messages to gossip was closed?");
|
||||
let topic = message.topic();
|
||||
let message = borsh::to_vec(&message).unwrap();
|
||||
let _: Result<_, _> = self.swarm.behaviour_mut().gossip.publish(topic, message);
|
||||
self.last_message = Instant::now();
|
||||
|
||||
/*
|
||||
If we're sending a message for this topic, it's because this topic is relevant to us.
|
||||
Subscribe to it.
|
||||
|
||||
We create topics roughly weekly, one per validator set/session. Once present in a
|
||||
topic, we're interested in all messages for it until the validator set/session retires.
|
||||
Then there should no longer be any messages for the topic as we should drop the
|
||||
Tributary which creates the messages.
|
||||
|
||||
We use this as an argument to not bother implement unsubscribing from topics. They're
|
||||
incredibly infrequently created and old topics shouldn't still have messages published
|
||||
to them. Having the coordinator reboot being our method of unsubscribing is fine.
|
||||
|
||||
Alternatively, we could route an API to determine when a topic is retired, or retire
|
||||
any topics we haven't sent messages on in the past hour.
|
||||
*/
|
||||
let behavior = self.swarm.behaviour_mut();
|
||||
let _: Result<_, _> = behavior.gossip.subscribe(&topic);
|
||||
/*
|
||||
This may be an error of `InsufficientPeers`. If so, we could ask DialTask to dial more
|
||||
peers for this network. We don't as we assume DialTask will detect the lack of peers
|
||||
for this network, and will already successfully handle this.
|
||||
*/
|
||||
let _: Result<_, _> = behavior.gossip.publish(topic.hash(), message);
|
||||
}
|
||||
|
||||
request = self.outbound_requests.recv() => {
|
||||
@@ -290,13 +315,14 @@ impl SwarmTask {
|
||||
to_dial: mpsc::UnboundedReceiver<DialOpts>,
|
||||
|
||||
validators: Arc<RwLock<Validators>>,
|
||||
validator_changes: mpsc::UnboundedReceiver<validators::Changes>,
|
||||
peers: Peers,
|
||||
|
||||
swarm: Swarm<Behavior>,
|
||||
|
||||
gossip: mpsc::UnboundedReceiver<gossip::Message>,
|
||||
signed_cosigns: mpsc::UnboundedSender<SignedCosign>,
|
||||
tributary_gossip: mpsc::UnboundedSender<(ValidatorSet, Vec<u8>)>,
|
||||
tributary_gossip: mpsc::UnboundedSender<([u8; 32], Vec<u8>)>,
|
||||
|
||||
outbound_requests: mpsc::UnboundedReceiver<(PeerId, Request, oneshot::Sender<Response>)>,
|
||||
|
||||
@@ -311,13 +337,12 @@ impl SwarmTask {
|
||||
last_dial_task_run: Instant::now(),
|
||||
|
||||
validators,
|
||||
validator_changes,
|
||||
peers,
|
||||
rebuild_peers_at: Instant::now() + TIME_BETWEEN_REBUILD_PEERS,
|
||||
|
||||
swarm,
|
||||
|
||||
last_message: Instant::now(),
|
||||
|
||||
gossip,
|
||||
signed_cosigns,
|
||||
tributary_gossip,
|
||||
@@ -11,9 +11,14 @@ use serai_task::{Task, ContinuallyRan};
|
||||
use libp2p::PeerId;
|
||||
|
||||
use futures_util::stream::{StreamExt, FuturesUnordered};
|
||||
use tokio::sync::RwLock;
|
||||
use tokio::sync::{mpsc, RwLock};
|
||||
|
||||
use crate::p2p::libp2p::peer_id_from_public;
|
||||
use crate::peer_id_from_public;
|
||||
|
||||
pub(crate) struct Changes {
|
||||
pub(crate) removed: HashSet<PeerId>,
|
||||
pub(crate) added: HashSet<PeerId>,
|
||||
}
|
||||
|
||||
pub(crate) struct Validators {
|
||||
serai: Serai,
|
||||
@@ -24,16 +29,22 @@ pub(crate) struct Validators {
|
||||
by_network: HashMap<NetworkId, HashSet<PeerId>>,
|
||||
// The validators and their networks
|
||||
validators: HashMap<PeerId, HashSet<NetworkId>>,
|
||||
|
||||
// The channel to send the changes down
|
||||
changes: mpsc::UnboundedSender<Changes>,
|
||||
}
|
||||
|
||||
impl Validators {
|
||||
pub(crate) fn new(serai: Serai) -> Self {
|
||||
Validators {
|
||||
pub(crate) fn new(serai: Serai) -> (Self, mpsc::UnboundedReceiver<Changes>) {
|
||||
let (send, recv) = mpsc::unbounded_channel();
|
||||
let validators = Validators {
|
||||
serai,
|
||||
sessions: HashMap::new(),
|
||||
by_network: HashMap::new(),
|
||||
validators: HashMap::new(),
|
||||
}
|
||||
changes: send,
|
||||
};
|
||||
(validators, recv)
|
||||
}
|
||||
|
||||
async fn session_changes(
|
||||
@@ -89,6 +100,9 @@ impl Validators {
|
||||
&mut self,
|
||||
session_changes: Vec<(NetworkId, Session, HashSet<PeerId>)>,
|
||||
) {
|
||||
let mut removed = HashSet::new();
|
||||
let mut added = HashSet::new();
|
||||
|
||||
for (network, session, validators) in session_changes {
|
||||
// Remove the existing validators
|
||||
for validator in self.by_network.remove(&network).unwrap_or_else(HashSet::new) {
|
||||
@@ -96,21 +110,40 @@ impl Validators {
|
||||
let mut networks = self.validators.remove(&validator).unwrap();
|
||||
// Remove this one
|
||||
networks.remove(&network);
|
||||
// Insert the networks back if the validator was present in other networks
|
||||
if !networks.is_empty() {
|
||||
// Insert the networks back if the validator was present in other networks
|
||||
self.validators.insert(validator, networks);
|
||||
} else {
|
||||
// Because this validator is no longer present in any network, mark them as removed
|
||||
/*
|
||||
This isn't accurate. The validator isn't present in the latest session for this
|
||||
network. The validator was present in the prior session which has yet to retire. Our
|
||||
lack of explicit inclusion for both the prior session and the current session causes
|
||||
only the validators mutually present in both sessions to be responsible for all actions
|
||||
still ongoing as the prior validator set retires.
|
||||
|
||||
TODO: Fix this
|
||||
*/
|
||||
removed.insert(validator);
|
||||
}
|
||||
}
|
||||
|
||||
// Add the new validators
|
||||
for validator in validators.iter().copied() {
|
||||
self.validators.entry(validator).or_insert_with(HashSet::new).insert(network);
|
||||
added.insert(validator);
|
||||
}
|
||||
self.by_network.insert(network, validators);
|
||||
|
||||
// Update the session we have populated
|
||||
self.sessions.insert(network, session);
|
||||
}
|
||||
|
||||
// Only flag validators for removal if they weren't simultaneously added by these changes
|
||||
removed.retain(|validator| !added.contains(validator));
|
||||
// Send the changes, dropping the error
|
||||
// This lets the caller opt-out of change notifications by dropping the receiver
|
||||
let _: Result<_, _> = self.changes.send(Changes { removed, added });
|
||||
}
|
||||
|
||||
/// Update the view of the validators.
|
||||
@@ -124,10 +157,6 @@ impl Validators {
|
||||
&self.by_network
|
||||
}
|
||||
|
||||
pub(crate) fn contains(&self, peer_id: &PeerId) -> bool {
|
||||
self.validators.contains_key(peer_id)
|
||||
}
|
||||
|
||||
pub(crate) fn networks(&self, peer_id: &PeerId) -> Option<&HashSet<NetworkId>> {
|
||||
self.validators.get(peer_id)
|
||||
}
|
||||
@@ -145,9 +174,10 @@ impl UpdateValidatorsTask {
|
||||
/// Spawn a new instance of the UpdateValidatorsTask.
|
||||
///
|
||||
/// This returns a reference to the Validators it updates after spawning itself.
|
||||
pub(crate) fn spawn(serai: Serai) -> Arc<RwLock<Validators>> {
|
||||
pub(crate) fn spawn(serai: Serai) -> (Arc<RwLock<Validators>>, mpsc::UnboundedReceiver<Changes>) {
|
||||
// The validators which will be updated
|
||||
let validators = Arc::new(RwLock::new(Validators::new(serai)));
|
||||
let (validators, changes) = Validators::new(serai);
|
||||
let validators = Arc::new(RwLock::new(validators));
|
||||
|
||||
// Define the task
|
||||
let (update_validators_task, update_validators_task_handle) = Task::new();
|
||||
@@ -159,7 +189,7 @@ impl UpdateValidatorsTask {
|
||||
);
|
||||
|
||||
// Return the validators
|
||||
validators
|
||||
(validators, changes)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -3,22 +3,19 @@ use std::time::{Duration, SystemTime};
|
||||
|
||||
use serai_client::validator_sets::primitives::ValidatorSet;
|
||||
|
||||
use futures_util::FutureExt;
|
||||
use futures_lite::FutureExt;
|
||||
|
||||
use tributary::{ReadWrite, Block, Tributary, TributaryReader};
|
||||
use tributary::{ReadWrite, TransactionTrait, Block, Tributary, TributaryReader};
|
||||
|
||||
use serai_db::*;
|
||||
use serai_task::ContinuallyRan;
|
||||
|
||||
use crate::{
|
||||
tributary::Transaction,
|
||||
p2p::{Peer, P2p},
|
||||
};
|
||||
use crate::{Heartbeat, Peer, P2p};
|
||||
|
||||
// Amount of blocks in a minute
|
||||
const BLOCKS_PER_MINUTE: usize = (60 / (tributary::tendermint::TARGET_BLOCK_TIME / 1000)) as usize;
|
||||
|
||||
// Maximum amount of blocks to send in a batch of blocks
|
||||
/// The maximum amount of blocks to include/included within a batch.
|
||||
pub const BLOCKS_PER_BATCH: usize = BLOCKS_PER_MINUTE + 1;
|
||||
|
||||
/// Sends a heartbeat to other validators on regular intervals informing them of our Tributary's
|
||||
@@ -26,14 +23,14 @@ pub const BLOCKS_PER_BATCH: usize = BLOCKS_PER_MINUTE + 1;
|
||||
///
|
||||
/// If the other validator has more blocks then we do, they're expected to inform us. This forms
|
||||
/// the sync protocol for our Tributaries.
|
||||
struct HeartbeatTask<TD: Db, P: P2p> {
|
||||
pub struct HeartbeatTask<TD: Db, Tx: TransactionTrait, P: P2p> {
|
||||
set: ValidatorSet,
|
||||
tributary: Tributary<TD, Transaction, P>,
|
||||
reader: TributaryReader<TD, Transaction>,
|
||||
tributary: Tributary<TD, Tx, P>,
|
||||
reader: TributaryReader<TD, Tx>,
|
||||
p2p: P,
|
||||
}
|
||||
|
||||
impl<TD: Db, P: P2p> ContinuallyRan for HeartbeatTask<TD, P> {
|
||||
impl<TD: Db, Tx: TransactionTrait, P: P2p> ContinuallyRan for HeartbeatTask<TD, Tx, P> {
|
||||
fn run_iteration(&mut self) -> impl Send + Future<Output = Result<bool, String>> {
|
||||
async move {
|
||||
// If our blockchain hasn't had a block in the past minute, trigger the heartbeat protocol
|
||||
@@ -73,7 +70,11 @@ impl<TD: Db, P: P2p> ContinuallyRan for HeartbeatTask<TD, P> {
|
||||
tip_is_stale = false;
|
||||
}
|
||||
// Necessary due to https://github.com/rust-lang/rust/issues/100013
|
||||
let Some(blocks) = peer.send_heartbeat(self.set, tip).boxed().await else {
|
||||
let Some(blocks) = peer
|
||||
.send_heartbeat(Heartbeat { set: self.set, latest_block_hash: tip })
|
||||
.boxed()
|
||||
.await
|
||||
else {
|
||||
continue 'peer;
|
||||
};
|
||||
|
||||
@@ -91,7 +92,14 @@ impl<TD: Db, P: P2p> ContinuallyRan for HeartbeatTask<TD, P> {
|
||||
|
||||
// Attempt to sync the block
|
||||
if !self.tributary.sync_block(block, block_with_commit.commit).await {
|
||||
// The block may be invalid or may simply be stale
|
||||
// The block may be invalid or stale if we added a block elsewhere
|
||||
if (!tip_is_stale) && (tip != self.reader.tip()) {
|
||||
// Since the Tributary's tip advanced on its own, return
|
||||
return Ok(false);
|
||||
}
|
||||
|
||||
// Since this block was invalid or stale in a way non-trivial to detect, try to
|
||||
// sync with the next peer
|
||||
continue 'peer;
|
||||
}
|
||||
|
||||
76
coordinator/p2p/src/lib.rs
Normal file
76
coordinator/p2p/src/lib.rs
Normal file
@@ -0,0 +1,76 @@
|
||||
#![cfg_attr(docsrs, feature(doc_auto_cfg))]
|
||||
#![doc = include_str!("../README.md")]
|
||||
#![deny(missing_docs)]
|
||||
|
||||
use core::future::Future;
|
||||
|
||||
use borsh::{BorshSerialize, BorshDeserialize};
|
||||
|
||||
use serai_client::{primitives::NetworkId, validator_sets::primitives::ValidatorSet};
|
||||
|
||||
use serai_cosign::SignedCosign;
|
||||
|
||||
/// A oneshot channel.
|
||||
pub mod oneshot;
|
||||
|
||||
/// The heartbeat task, effecting sync of Tributaries
|
||||
pub mod heartbeat;
|
||||
|
||||
/// A heartbeat for a Tributary.
|
||||
#[derive(Clone, Copy, BorshSerialize, BorshDeserialize, Debug)]
|
||||
pub struct Heartbeat {
|
||||
/// The Tributary this is the heartbeat of.
|
||||
pub set: ValidatorSet,
|
||||
/// The hash of the latest block added to the Tributary.
|
||||
pub latest_block_hash: [u8; 32],
|
||||
}
|
||||
|
||||
/// A tributary block and its commit.
|
||||
#[derive(Clone, BorshSerialize, BorshDeserialize)]
|
||||
pub struct TributaryBlockWithCommit {
|
||||
/// The serialized block.
|
||||
pub block: Vec<u8>,
|
||||
/// The serialized commit.
|
||||
pub commit: Vec<u8>,
|
||||
}
|
||||
|
||||
/// A representation of a peer.
|
||||
pub trait Peer<'a>: Send {
|
||||
/// Send a heartbeat to this peer.
|
||||
fn send_heartbeat(
|
||||
&self,
|
||||
heartbeat: Heartbeat,
|
||||
) -> impl Send + Future<Output = Option<Vec<TributaryBlockWithCommit>>>;
|
||||
}
|
||||
|
||||
/// The representation of the P2P network.
|
||||
pub trait P2p: Send + Sync + Clone + tributary::P2p + serai_cosign::RequestNotableCosigns {
|
||||
/// The representation of a peer.
|
||||
type Peer<'a>: Peer<'a>;
|
||||
|
||||
/// Fetch the peers for this network.
|
||||
fn peers(&self, network: NetworkId) -> impl Send + Future<Output = Vec<Self::Peer<'_>>>;
|
||||
|
||||
/// A cancel-safe future for the next heartbeat received over the P2P network.
|
||||
///
|
||||
/// Yields the validator set its for, the latest block hash observed, and a channel to return the
|
||||
/// descending blocks.
|
||||
fn heartbeat(
|
||||
&self,
|
||||
) -> impl Send + Future<Output = (Heartbeat, oneshot::Sender<Vec<TributaryBlockWithCommit>>)>;
|
||||
|
||||
/// A cancel-safe future for the next request for the notable cosigns of a gloabl session.
|
||||
///
|
||||
/// Yields the global session the request is for and a channel to return the notable cosigns.
|
||||
fn notable_cosigns_request(
|
||||
&self,
|
||||
) -> impl Send + Future<Output = ([u8; 32], oneshot::Sender<Vec<SignedCosign>>)>;
|
||||
|
||||
/// A cancel-safe future for the next message regarding a Tributary.
|
||||
///
|
||||
/// Yields the message's Tributary's genesis block hash and the message.
|
||||
fn tributary_message(&self) -> impl Send + Future<Output = ([u8; 32], Vec<u8>)>;
|
||||
|
||||
/// A cancel-safe future for the next cosign received.
|
||||
fn cosign(&self) -> impl Send + Future<Output = SignedCosign>;
|
||||
}
|
||||
35
coordinator/p2p/src/oneshot.rs
Normal file
35
coordinator/p2p/src/oneshot.rs
Normal file
@@ -0,0 +1,35 @@
|
||||
use core::{
|
||||
pin::Pin,
|
||||
task::{Poll, Context},
|
||||
future::Future,
|
||||
};
|
||||
|
||||
pub use async_channel::{SendError, RecvError};
|
||||
|
||||
/// The sender for a oneshot channel.
|
||||
pub struct Sender<T: Send>(async_channel::Sender<T>);
|
||||
impl<T: Send> Sender<T> {
|
||||
/// Send a value down the channel.
|
||||
///
|
||||
/// Returns an error if the channel's receiver was dropped.
|
||||
pub fn send(self, msg: T) -> Result<(), SendError<T>> {
|
||||
self.0.send_blocking(msg)
|
||||
}
|
||||
}
|
||||
|
||||
/// The receiver for a oneshot channel.
|
||||
pub struct Receiver<T: Send>(async_channel::Receiver<T>);
|
||||
impl<T: Send> Future for Receiver<T> {
|
||||
type Output = Result<T, RecvError>;
|
||||
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
|
||||
let recv = self.0.recv();
|
||||
futures_lite::pin!(recv);
|
||||
recv.poll(cx)
|
||||
}
|
||||
}
|
||||
|
||||
/// Create a new oneshot channel.
|
||||
pub fn channel<T: Send>() -> (Sender<T>, Receiver<T>) {
|
||||
let (send, recv) = async_channel::bounded(1);
|
||||
(Sender(send), Receiver(recv))
|
||||
}
|
||||
@@ -1,5 +1,9 @@
|
||||
mod tributary;
|
||||
mod p2p;
|
||||
|
||||
mod p2p {
|
||||
use serai_coordinator_p2p::*;
|
||||
pub use serai_coordinator_libp2p_p2p::Libp2p;
|
||||
}
|
||||
|
||||
fn main() {
|
||||
todo!("TODO")
|
||||
|
||||
@@ -1,214 +0,0 @@
|
||||
use core::{future::Future, time::Duration};
|
||||
use std::{
|
||||
sync::Arc,
|
||||
collections::{HashSet, HashMap},
|
||||
};
|
||||
|
||||
use zeroize::Zeroizing;
|
||||
use schnorrkel::Keypair;
|
||||
|
||||
use serai_client::{
|
||||
primitives::{NetworkId, PublicKey},
|
||||
validator_sets::primitives::ValidatorSet,
|
||||
Serai,
|
||||
};
|
||||
|
||||
use tokio::sync::{mpsc, oneshot, RwLock};
|
||||
|
||||
use serai_task::{Task, ContinuallyRan};
|
||||
|
||||
use libp2p::{
|
||||
multihash::Multihash,
|
||||
identity::{self, PeerId},
|
||||
tcp::Config as TcpConfig,
|
||||
yamux,
|
||||
swarm::NetworkBehaviour,
|
||||
SwarmBuilder,
|
||||
};
|
||||
|
||||
use crate::p2p::TributaryBlockWithCommit;
|
||||
|
||||
/// A struct to sync the validators from the Serai node in order to keep track of them.
|
||||
mod validators;
|
||||
use validators::UpdateValidatorsTask;
|
||||
|
||||
/// The authentication protocol upgrade to limit the P2P network to active validators.
|
||||
mod authenticate;
|
||||
use authenticate::OnlyValidators;
|
||||
|
||||
/// The dial task, to find new peers to connect to
|
||||
mod dial;
|
||||
use dial::DialTask;
|
||||
|
||||
/// The request-response messages and behavior
|
||||
mod reqres;
|
||||
use reqres::{Request, Response};
|
||||
|
||||
/// The gossip messages and behavior
|
||||
mod gossip;
|
||||
|
||||
/// The swarm task, running it and dispatching to/from it
|
||||
mod swarm;
|
||||
use swarm::SwarmTask;
|
||||
|
||||
const PORT: u16 = 30563; // 5132 ^ (('c' << 8) | 'o')
|
||||
|
||||
// usize::max, manually implemented, as max isn't a const fn
|
||||
const MAX_LIBP2P_MESSAGE_SIZE: usize =
|
||||
if gossip::MAX_LIBP2P_GOSSIP_MESSAGE_SIZE > reqres::MAX_LIBP2P_REQRES_MESSAGE_SIZE {
|
||||
gossip::MAX_LIBP2P_GOSSIP_MESSAGE_SIZE
|
||||
} else {
|
||||
reqres::MAX_LIBP2P_REQRES_MESSAGE_SIZE
|
||||
};
|
||||
|
||||
fn peer_id_from_public(public: PublicKey) -> PeerId {
|
||||
// 0 represents the identity Multihash, that no hash was performed
|
||||
// It's an internal constant so we can't refer to the constant inside libp2p
|
||||
PeerId::from_multihash(Multihash::wrap(0, &public.0).unwrap()).unwrap()
|
||||
}
|
||||
|
||||
struct Peer<'a> {
|
||||
outbound_requests: &'a mpsc::UnboundedSender<(PeerId, Request, oneshot::Sender<Response>)>,
|
||||
id: PeerId,
|
||||
}
|
||||
impl crate::p2p::Peer<'_> for Peer<'_> {
|
||||
fn send_heartbeat(
|
||||
&self,
|
||||
set: ValidatorSet,
|
||||
latest_block_hash: [u8; 32],
|
||||
) -> impl Send + Future<Output = Option<Vec<TributaryBlockWithCommit>>> {
|
||||
const HEARBEAT_TIMEOUT: Duration = Duration::from_secs(5);
|
||||
async move {
|
||||
let request = Request::Heartbeat { set, latest_block_hash };
|
||||
let (sender, receiver) = oneshot::channel();
|
||||
self
|
||||
.outbound_requests
|
||||
.send((self.id, request, sender))
|
||||
.expect("outbound requests recv channel was dropped?");
|
||||
match tokio::time::timeout(HEARBEAT_TIMEOUT, receiver).await.ok()?.ok()? {
|
||||
Response::None => Some(vec![]),
|
||||
Response::Blocks(blocks) => Some(blocks),
|
||||
// TODO: Disconnect this peer
|
||||
Response::NotableCosigns(_) => None,
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
struct Peers {
|
||||
peers: Arc<RwLock<HashMap<NetworkId, HashSet<PeerId>>>>,
|
||||
}
|
||||
|
||||
#[derive(NetworkBehaviour)]
|
||||
struct Behavior {
|
||||
reqres: reqres::Behavior,
|
||||
gossip: gossip::Behavior,
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
struct Libp2p {
|
||||
peers: Peers,
|
||||
outbound_requests: mpsc::UnboundedSender<(PeerId, Request, oneshot::Sender<Response>)>,
|
||||
}
|
||||
|
||||
impl Libp2p {
|
||||
pub(crate) fn new(serai_key: &Zeroizing<Keypair>, serai: Serai) -> Libp2p {
|
||||
// Define the object we track peers with
|
||||
let peers = Peers { peers: Arc::new(RwLock::new(HashMap::new())) };
|
||||
|
||||
// Define the dial task
|
||||
let (dial_task_def, dial_task) = Task::new();
|
||||
let (to_dial_send, to_dial_recv) = mpsc::unbounded_channel();
|
||||
tokio::spawn(
|
||||
DialTask::new(serai.clone(), peers.clone(), to_dial_send)
|
||||
.continually_run(dial_task_def, vec![]),
|
||||
);
|
||||
|
||||
// Define the Validators object used for validating new connections
|
||||
let connection_validators = UpdateValidatorsTask::spawn(serai.clone());
|
||||
let new_only_validators = |noise_keypair: &identity::Keypair| -> Result<_, ()> {
|
||||
Ok(OnlyValidators {
|
||||
serai_key: serai_key.clone(),
|
||||
validators: connection_validators.clone(),
|
||||
noise_keypair: noise_keypair.clone(),
|
||||
})
|
||||
};
|
||||
|
||||
let new_yamux = || {
|
||||
let mut config = yamux::Config::default();
|
||||
// 1 MiB default + max message size
|
||||
config.set_max_buffer_size((1024 * 1024) + MAX_LIBP2P_MESSAGE_SIZE);
|
||||
// 256 KiB default + max message size
|
||||
config.set_receive_window_size(((256 * 1024) + MAX_LIBP2P_MESSAGE_SIZE).try_into().unwrap());
|
||||
config
|
||||
};
|
||||
|
||||
let behavior = Behavior { reqres: reqres::new_behavior(), gossip: gossip::new_behavior() };
|
||||
|
||||
let mut swarm = SwarmBuilder::with_existing_identity(identity::Keypair::generate_ed25519())
|
||||
.with_tokio()
|
||||
.with_tcp(TcpConfig::default().nodelay(false), new_only_validators, new_yamux)
|
||||
.unwrap()
|
||||
.with_behaviour(|_| behavior)
|
||||
.unwrap()
|
||||
.build();
|
||||
swarm.listen_on(format!("/ip4/0.0.0.0/tcp/{PORT}").parse().unwrap()).unwrap();
|
||||
swarm.listen_on(format!("/ip6/::/tcp/{PORT}").parse().unwrap()).unwrap();
|
||||
|
||||
let swarm_validators = UpdateValidatorsTask::spawn(serai);
|
||||
|
||||
let (gossip_send, gossip_recv) = mpsc::unbounded_channel();
|
||||
let (signed_cosigns_send, signed_cosigns_recv) = mpsc::unbounded_channel();
|
||||
let (tributary_gossip_send, tributary_gossip_recv) = mpsc::unbounded_channel();
|
||||
|
||||
let (outbound_requests_send, outbound_requests_recv) = mpsc::unbounded_channel();
|
||||
|
||||
let (heartbeat_requests_send, heartbeat_requests_recv) = mpsc::unbounded_channel();
|
||||
let (notable_cosign_requests_send, notable_cosign_requests_recv) = mpsc::unbounded_channel();
|
||||
let (inbound_request_responses_send, inbound_request_responses_recv) =
|
||||
mpsc::unbounded_channel();
|
||||
|
||||
// Create the swarm task
|
||||
SwarmTask::spawn(
|
||||
dial_task,
|
||||
to_dial_recv,
|
||||
swarm_validators,
|
||||
peers,
|
||||
swarm,
|
||||
gossip_recv,
|
||||
signed_cosigns_send,
|
||||
tributary_gossip_send,
|
||||
outbound_requests_recv,
|
||||
heartbeat_requests_send,
|
||||
notable_cosign_requests_send,
|
||||
inbound_request_responses_recv,
|
||||
);
|
||||
|
||||
// gossip_send, signed_cosigns_recv, tributary_gossip_recv, outbound_requests_send,
|
||||
// heartbeat_requests_recv, notable_cosign_requests_recv, inbound_request_responses_send
|
||||
todo!("TODO");
|
||||
}
|
||||
}
|
||||
|
||||
impl tributary::P2p for Libp2p {
|
||||
fn broadcast(&self, genesis: [u8; 32], msg: Vec<u8>) -> impl Send + Future<Output = ()> {
|
||||
async move { todo!("TODO") }
|
||||
}
|
||||
}
|
||||
|
||||
impl crate::p2p::P2p for Libp2p {
|
||||
type Peer<'a> = Peer<'a>;
|
||||
fn peers(&self, network: NetworkId) -> impl Send + Future<Output = Vec<Self::Peer<'_>>> {
|
||||
async move {
|
||||
let Some(peer_ids) = self.peers.peers.read().await.get(&network).cloned() else {
|
||||
return vec![];
|
||||
};
|
||||
let mut res = vec![];
|
||||
for id in peer_ids {
|
||||
res.push(Peer { outbound_requests: &self.outbound_requests, id });
|
||||
}
|
||||
res
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -1,31 +0,0 @@
|
||||
use core::future::Future;
|
||||
|
||||
use borsh::{BorshSerialize, BorshDeserialize};
|
||||
|
||||
use serai_client::{primitives::NetworkId, validator_sets::primitives::ValidatorSet};
|
||||
|
||||
/// The libp2p-backed P2p network
|
||||
mod libp2p;
|
||||
|
||||
/// The heartbeat task, effecting sync of Tributaries
|
||||
mod heartbeat;
|
||||
|
||||
/// A tributary block and its commit.
|
||||
#[derive(Clone, BorshSerialize, BorshDeserialize)]
|
||||
pub(crate) struct TributaryBlockWithCommit {
|
||||
pub(crate) block: Vec<u8>,
|
||||
pub(crate) commit: Vec<u8>,
|
||||
}
|
||||
|
||||
trait Peer<'a>: Send {
|
||||
fn send_heartbeat(
|
||||
&self,
|
||||
set: ValidatorSet,
|
||||
latest_block_hash: [u8; 32],
|
||||
) -> impl Send + Future<Output = Option<Vec<TributaryBlockWithCommit>>>;
|
||||
}
|
||||
|
||||
trait P2p: Send + Sync + tributary::P2p {
|
||||
type Peer<'a>: Peer<'a>;
|
||||
fn peers(&self, network: NetworkId) -> impl Send + Future<Output = Vec<Self::Peer<'_>>>;
|
||||
}
|
||||
@@ -75,6 +75,8 @@ exceptions = [
|
||||
{ allow = ["AGPL-3.0"], name = "tributary-chain" },
|
||||
{ allow = ["AGPL-3.0"], name = "serai-cosign" },
|
||||
{ allow = ["AGPL-3.0"], name = "serai-coordinator-substrate" },
|
||||
{ allow = ["AGPL-3.0"], name = "serai-coordinator-p2p" },
|
||||
{ allow = ["AGPL-3.0"], name = "serai-coordinator-libp2p-p2p" },
|
||||
{ allow = ["AGPL-3.0"], name = "serai-coordinator" },
|
||||
|
||||
{ allow = ["AGPL-3.0"], name = "serai-coins-pallet" },
|
||||
|
||||
Reference in New Issue
Block a user