4 Commits

Author SHA1 Message Date
Luke Parker
cbe83956aa Flesh out Coordinator main
Lot of TODOs as the APIs are all being routed together.
2025-01-10 02:24:24 -05:00
Luke Parker
091d485fd8 Have the Tributary scanner DB be distinct from the cosign DB
Allows deleting the entire Tributary scanner DB upon retiry.
2025-01-10 02:22:58 -05:00
Luke Parker
2a3eaf4d7e Wrap the entire Libp2p object in an Arc
Makes `Clone` calls significantly cheaper as now only the outer Arc is cloned
(the inner ones have been removed). Also wraps uses of Serai in an Arc as we
shouldn't actually need/want multiple caller connection pools.
2025-01-10 01:26:07 -05:00
Luke Parker
23122712cb Document validator jailing upon participation failures and slash report determination
These are TODOs. I just wanted to ensure this was written down and each seemed
too small for GH issues.
2025-01-09 19:50:39 -05:00
15 changed files with 488 additions and 103 deletions

2
Cargo.lock generated
View File

@@ -8319,7 +8319,6 @@ dependencies = [
"borsh",
"ciphersuite",
"env_logger",
"flexible-transcript",
"frost-schnorrkel",
"hex",
"log",
@@ -8340,6 +8339,7 @@ dependencies = [
"serai-task",
"sp-application-crypto",
"sp-runtime",
"tokio",
"tributary-chain",
"zalloc",
"zeroize",

View File

@@ -25,7 +25,6 @@ rand_core = { version = "0.6", default-features = false, features = ["std"] }
blake2 = { version = "0.10", default-features = false, features = ["std"] }
schnorrkel = { version = "0.11", default-features = false, features = ["std"] }
transcript = { package = "flexible-transcript", path = "../crypto/transcript", default-features = false, features = ["std", "recommended"] }
ciphersuite = { path = "../crypto/ciphersuite", default-features = false, features = ["std"] }
schnorr = { package = "schnorr-signatures", path = "../crypto/schnorr", default-features = false, features = ["std"] }
frost = { package = "modular-frost", path = "../crypto/frost" }
@@ -42,7 +41,6 @@ messages = { package = "serai-processor-messages", path = "../processor/messages
message-queue = { package = "serai-message-queue", path = "../message-queue" }
tributary = { package = "tributary-chain", path = "./tributary" }
sp-application-crypto = { git = "https://github.com/serai-dex/substrate", default-features = false, features = ["std"] }
serai-client = { path = "../substrate/client", default-features = false, features = ["serai", "borsh"] }
hex = { version = "0.4", default-features = false, features = ["std"] }
@@ -51,17 +49,14 @@ borsh = { version = "1", default-features = false, features = ["std", "derive",
log = { version = "0.4", default-features = false, features = ["std"] }
env_logger = { version = "0.10", default-features = false, features = ["humantime"] }
tokio = { version = "1", default-features = false, features = ["time", "sync", "macros", "rt-multi-thread"] }
serai-cosign = { path = "./cosign" }
serai-coordinator-substrate = { path = "./substrate" }
serai-coordinator-p2p = { path = "./p2p" }
serai-coordinator-libp2p-p2p = { path = "./p2p/libp2p" }
[dev-dependencies]
tributary = { package = "tributary-chain", path = "./tributary", features = ["tests"] }
sp-application-crypto = { git = "https://github.com/serai-dex/substrate", default-features = false, features = ["std"] }
sp-runtime = { git = "https://github.com/serai-dex/substrate", default-features = false, features = ["std"] }
[features]
longer-reattempts = []
longer-reattempts = [] # TODO
parity-db = ["serai-db/parity-db"]
rocksdb = ["serai-db/rocksdb"]

View File

@@ -1,5 +1,5 @@
use core::future::Future;
use std::collections::HashMap;
use std::{sync::Arc, collections::HashMap};
use serai_client::{
primitives::{SeraiAddress, Amount},
@@ -57,7 +57,7 @@ async fn block_has_events_justifying_a_cosign(
/// A task to determine which blocks we should intend to cosign.
pub(crate) struct CosignIntendTask<D: Db> {
pub(crate) db: D,
pub(crate) serai: Serai,
pub(crate) serai: Arc<Serai>,
}
impl<D: Db> ContinuallyRan for CosignIntendTask<D> {

View File

@@ -3,7 +3,7 @@
#![deny(missing_docs)]
use core::{fmt::Debug, future::Future};
use std::collections::HashMap;
use std::{sync::Arc, collections::HashMap};
use blake2::{Digest, Blake2s256};
@@ -240,7 +240,7 @@ impl<D: Db> Cosigning<D> {
/// only used once at any given time.
pub fn spawn<R: RequestNotableCosigns>(
db: D,
serai: Serai,
serai: Arc<Serai>,
request: R,
tasks_to_run_upon_cosigning: Vec<TaskHandle>,
) -> Self {
@@ -334,10 +334,9 @@ impl<D: Db> Cosigning<D> {
}
}
/// Intake a cosign from the Serai network.
/// Intake a cosign.
///
/// - Returns Err(_) if there was an error trying to validate the cosign and it should be retired
/// later.
/// - Returns Err(_) if there was an error trying to validate the cosign.
/// - Returns Ok(true) if the cosign was successfully handled or could not be handled at this
/// time.
/// - Returns Ok(false) if the cosign was invalid.

View File

@@ -1,5 +1,5 @@
use core::future::Future;
use std::collections::HashSet;
use std::{sync::Arc, collections::HashSet};
use rand_core::{RngCore, OsRng};
@@ -29,14 +29,18 @@ const TARGET_PEERS_PER_NETWORK: usize = 5;
// TODO const TARGET_DIALED_PEERS_PER_NETWORK: usize = 3;
pub(crate) struct DialTask {
serai: Serai,
serai: Arc<Serai>,
validators: Validators,
peers: Peers,
to_dial: mpsc::UnboundedSender<DialOpts>,
}
impl DialTask {
pub(crate) fn new(serai: Serai, peers: Peers, to_dial: mpsc::UnboundedSender<DialOpts>) -> Self {
pub(crate) fn new(
serai: Arc<Serai>,
peers: Peers,
to_dial: mpsc::UnboundedSender<DialOpts>,
) -> Self {
DialTask { serai: serai.clone(), validators: Validators::new(serai).0, peers, to_dial }
}
}

View File

@@ -131,33 +131,35 @@ struct Behavior {
gossip: gossip::Behavior,
}
/// The libp2p-backed P2P implementation.
///
/// The P2p trait implementation does not support backpressure and is expected to be fully
/// utilized. Failure to poll the entire API will cause unbounded memory growth.
#[allow(clippy::type_complexity)]
#[derive(Clone)]
pub struct Libp2p {
struct Libp2pInner {
peers: Peers,
gossip: mpsc::UnboundedSender<Message>,
outbound_requests: mpsc::UnboundedSender<(PeerId, Request, oneshot::Sender<Response>)>,
tributary_gossip: Arc<Mutex<mpsc::UnboundedReceiver<([u8; 32], Vec<u8>)>>>,
tributary_gossip: Mutex<mpsc::UnboundedReceiver<([u8; 32], Vec<u8>)>>,
signed_cosigns: Arc<Mutex<mpsc::UnboundedReceiver<SignedCosign>>>,
signed_cosigns: Mutex<mpsc::UnboundedReceiver<SignedCosign>>,
signed_cosigns_send: mpsc::UnboundedSender<SignedCosign>,
heartbeat_requests: Arc<Mutex<mpsc::UnboundedReceiver<(RequestId, ValidatorSet, [u8; 32])>>>,
notable_cosign_requests: Arc<Mutex<mpsc::UnboundedReceiver<(RequestId, [u8; 32])>>>,
heartbeat_requests: Mutex<mpsc::UnboundedReceiver<(RequestId, ValidatorSet, [u8; 32])>>,
notable_cosign_requests: Mutex<mpsc::UnboundedReceiver<(RequestId, [u8; 32])>>,
inbound_request_responses: mpsc::UnboundedSender<(RequestId, Response)>,
}
/// The libp2p-backed P2P implementation.
///
/// The P2p trait implementation does not support backpressure and is expected to be fully
/// utilized. Failure to poll the entire API will cause unbounded memory growth.
#[derive(Clone)]
pub struct Libp2p(Arc<Libp2pInner>);
impl Libp2p {
/// Create a new libp2p-backed P2P instance.
///
/// This will spawn all of the internal tasks necessary for functioning.
pub fn new(serai_key: &Zeroizing<Keypair>, serai: Serai) -> Libp2p {
pub fn new(serai_key: &Zeroizing<Keypair>, serai: Arc<Serai>) -> Libp2p {
// Define the object we track peers with
let peers = Peers { peers: Arc::new(RwLock::new(HashMap::new())) };
@@ -239,21 +241,21 @@ impl Libp2p {
inbound_request_responses_recv,
);
Libp2p {
Libp2p(Arc::new(Libp2pInner {
peers,
gossip: gossip_send,
outbound_requests: outbound_requests_send,
tributary_gossip: Arc::new(Mutex::new(tributary_gossip_recv)),
tributary_gossip: Mutex::new(tributary_gossip_recv),
signed_cosigns: Arc::new(Mutex::new(signed_cosigns_recv)),
signed_cosigns: Mutex::new(signed_cosigns_recv),
signed_cosigns_send,
heartbeat_requests: Arc::new(Mutex::new(heartbeat_requests_recv)),
notable_cosign_requests: Arc::new(Mutex::new(notable_cosign_requests_recv)),
heartbeat_requests: Mutex::new(heartbeat_requests_recv),
notable_cosign_requests: Mutex::new(notable_cosign_requests_recv),
inbound_request_responses: inbound_request_responses_send,
}
}))
}
}
@@ -261,6 +263,7 @@ impl tributary::P2p for Libp2p {
fn broadcast(&self, tributary: [u8; 32], message: Vec<u8>) -> impl Send + Future<Output = ()> {
async move {
self
.0
.gossip
.send(Message::Tributary { tributary, message })
.expect("gossip recv channel was dropped?");
@@ -281,7 +284,7 @@ impl serai_cosign::RequestNotableCosigns for Libp2p {
let request = Request::NotableCosigns { global_session };
let peers = self.peers.peers.read().await.clone();
let peers = self.0.peers.peers.read().await.clone();
// HashSet of all peers
let peers = peers.into_values().flat_map(<_>::into_iter).collect::<HashSet<_>>();
// Vec of all peers
@@ -297,6 +300,7 @@ impl serai_cosign::RequestNotableCosigns for Libp2p {
let (sender, receiver) = oneshot::channel();
self
.0
.outbound_requests
.send((peer, request, sender))
.expect("outbound requests recv channel was dropped?");
@@ -310,6 +314,7 @@ impl serai_cosign::RequestNotableCosigns for Libp2p {
{
for cosign in cosigns {
self
.0
.signed_cosigns_send
.send(cosign)
.expect("signed_cosigns recv in this object was dropped?");
@@ -327,22 +332,29 @@ impl serai_coordinator_p2p::P2p for Libp2p {
fn peers(&self, network: NetworkId) -> impl Send + Future<Output = Vec<Self::Peer<'_>>> {
async move {
let Some(peer_ids) = self.peers.peers.read().await.get(&network).cloned() else {
let Some(peer_ids) = self.0.peers.peers.read().await.get(&network).cloned() else {
return vec![];
};
let mut res = vec![];
for id in peer_ids {
res.push(Peer { outbound_requests: &self.outbound_requests, id });
res.push(Peer { outbound_requests: &self.0.outbound_requests, id });
}
res
}
}
fn publish_cosign(&self, cosign: SignedCosign) -> impl Send + Future<Output = ()> {
async move {
self.0.gossip.send(Message::Cosign(cosign)).expect("gossip recv channel was dropped?");
}
}
fn heartbeat(
&self,
) -> impl Send + Future<Output = (Heartbeat, oneshot::Sender<Vec<TributaryBlockWithCommit>>)> {
async move {
let (request_id, set, latest_block_hash) = self
.0
.heartbeat_requests
.lock()
.await
@@ -351,7 +363,7 @@ impl serai_coordinator_p2p::P2p for Libp2p {
.expect("heartbeat_requests_send was dropped?");
let (sender, receiver) = oneshot::channel();
tokio::spawn({
let respond = self.inbound_request_responses.clone();
let respond = self.0.inbound_request_responses.clone();
async move {
// The swarm task expects us to respond to every request. If the caller drops this
// channel, we'll receive `Err` and respond with `vec![]`, safely satisfying that bound
@@ -375,6 +387,7 @@ impl serai_coordinator_p2p::P2p for Libp2p {
) -> impl Send + Future<Output = ([u8; 32], oneshot::Sender<Vec<SignedCosign>>)> {
async move {
let (request_id, global_session) = self
.0
.notable_cosign_requests
.lock()
.await
@@ -383,7 +396,7 @@ impl serai_coordinator_p2p::P2p for Libp2p {
.expect("notable_cosign_requests_send was dropped?");
let (sender, receiver) = oneshot::channel();
tokio::spawn({
let respond = self.inbound_request_responses.clone();
let respond = self.0.inbound_request_responses.clone();
async move {
let response = if let Ok(notable_cosigns) = receiver.await {
Response::NotableCosigns(notable_cosigns)
@@ -401,13 +414,14 @@ impl serai_coordinator_p2p::P2p for Libp2p {
fn tributary_message(&self) -> impl Send + Future<Output = ([u8; 32], Vec<u8>)> {
async move {
self.tributary_gossip.lock().await.recv().await.expect("tributary_gossip send was dropped?")
self.0.tributary_gossip.lock().await.recv().await.expect("tributary_gossip send was dropped?")
}
}
fn cosign(&self) -> impl Send + Future<Output = SignedCosign> {
async move {
self
.0
.signed_cosigns
.lock()
.await

View File

@@ -21,7 +21,7 @@ pub(crate) struct Changes {
}
pub(crate) struct Validators {
serai: Serai,
serai: Arc<Serai>,
// A cache for which session we're populated with the validators of
sessions: HashMap<NetworkId, Session>,
@@ -35,7 +35,7 @@ pub(crate) struct Validators {
}
impl Validators {
pub(crate) fn new(serai: Serai) -> (Self, mpsc::UnboundedReceiver<Changes>) {
pub(crate) fn new(serai: Arc<Serai>) -> (Self, mpsc::UnboundedReceiver<Changes>) {
let (send, recv) = mpsc::unbounded_channel();
let validators = Validators {
serai,
@@ -148,7 +148,7 @@ impl Validators {
/// Update the view of the validators.
pub(crate) async fn update(&mut self) -> Result<(), String> {
let session_changes = Self::session_changes(&self.serai, &self.sessions).await?;
let session_changes = Self::session_changes(&*self.serai, &self.sessions).await?;
self.incorporate_session_changes(session_changes);
Ok(())
}
@@ -174,7 +174,9 @@ impl UpdateValidatorsTask {
/// Spawn a new instance of the UpdateValidatorsTask.
///
/// This returns a reference to the Validators it updates after spawning itself.
pub(crate) fn spawn(serai: Serai) -> (Arc<RwLock<Validators>>, mpsc::UnboundedReceiver<Changes>) {
pub(crate) fn spawn(
serai: Arc<Serai>,
) -> (Arc<RwLock<Validators>>, mpsc::UnboundedReceiver<Changes>) {
// The validators which will be updated
let (validators, changes) = Validators::new(serai);
let validators = Arc::new(RwLock::new(validators));

View File

@@ -56,6 +56,9 @@ pub trait P2p: Send + Sync + Clone + tributary::P2p + serai_cosign::RequestNotab
/// Fetch the peers for this network.
fn peers(&self, network: NetworkId) -> impl Send + Future<Output = Vec<Self::Peer<'_>>>;
/// Broadcast a cosign.
fn publish_cosign(&self, cosign: SignedCosign) -> impl Send + Future<Output = ()>;
/// A cancel-safe future for the next heartbeat received over the P2P network.
///
/// Yields the validator set its for, the latest block hash observed, and a channel to return the

View File

@@ -1,10 +1,329 @@
use core::{marker::PhantomData, ops::Deref, time::Duration};
use std::{sync::Arc, time::Instant, collections::HashMap};
use zeroize::{Zeroize, Zeroizing};
use rand_core::{RngCore, OsRng};
use blake2::{digest::typenum::U32, Digest, Blake2s};
use ciphersuite::{
group::{ff::PrimeField, GroupEncoding},
Ciphersuite, Ristretto,
};
use tokio::sync::mpsc;
use scale::Encode;
use serai_client::{
primitives::{NetworkId, PublicKey, SeraiAddress},
validator_sets::primitives::ValidatorSet,
Serai,
};
use message_queue::{Service, client::MessageQueue};
use ::tributary::Tributary;
use serai_task::{Task, TaskHandle, ContinuallyRan};
use serai_cosign::{SignedCosign, Cosigning};
use serai_coordinator_substrate::{NewSetInformation, CanonicalEventStream, EphemeralEventStream};
mod tributary;
use tributary::{Transaction, ScanTributaryTask};
mod p2p {
pub use serai_coordinator_p2p::*;
pub use serai_coordinator_libp2p_p2p::Libp2p;
}
fn main() {
// Use a zeroizing allocator for this entire application
// While secrets should already be zeroized, the presence of secret keys in a networked application
// (at increased risk of OOB reads) justifies the performance hit in case any secrets weren't
// already
#[global_allocator]
static ALLOCATOR: zalloc::ZeroizingAlloc<std::alloc::System> =
zalloc::ZeroizingAlloc(std::alloc::System);
#[cfg(all(feature = "parity-db", not(feature = "rocksdb")))]
type Db = serai_db::ParityDb;
#[cfg(feature = "rocksdb")]
type Db = serai_db::RocksDB;
#[allow(unused_variables, unreachable_code)]
fn db(path: &str) -> Db {
#[cfg(all(feature = "parity-db", feature = "rocksdb"))]
panic!("built with parity-db and rocksdb");
#[cfg(all(feature = "parity-db", not(feature = "rocksdb")))]
let db = serai_db::new_parity_db(path);
#[cfg(feature = "rocksdb")]
let db = serai_db::new_rocksdb(path);
db
}
fn coordinator_db() -> Db {
let root_path = serai_env::var("DB_PATH").expect("path to DB wasn't specified");
db(&format!("{root_path}/coordinator"))
}
fn tributary_db(set: ValidatorSet) -> Db {
let root_path = serai_env::var("DB_PATH").expect("path to DB wasn't specified");
let network = match set.network {
NetworkId::Serai => panic!("creating Tributary for the Serai network"),
NetworkId::Bitcoin => "Bitcoin",
NetworkId::Ethereum => "Ethereum",
NetworkId::Monero => "Monero",
};
db(&format!("{root_path}/tributary-{network}-{}", set.session.0))
}
async fn serai() -> Arc<Serai> {
const SERAI_CONNECTION_DELAY: Duration = Duration::from_secs(10);
const MAX_SERAI_CONNECTION_DELAY: Duration = Duration::from_secs(300);
let mut delay = SERAI_CONNECTION_DELAY;
loop {
let Ok(serai) = Serai::new(format!(
"http://{}:9944",
serai_env::var("SERAI_HOSTNAME").expect("Serai hostname wasn't provided")
))
.await
else {
log::error!("couldn't connect to the Serai node");
tokio::time::sleep(delay).await;
delay = (delay + SERAI_CONNECTION_DELAY).min(MAX_SERAI_CONNECTION_DELAY);
continue;
};
log::info!("made initial connection to Serai node");
return Arc::new(serai);
}
}
// TODO: intended_cosigns
fn spawn_cosigning(
db: impl serai_db::Db,
serai: Arc<Serai>,
p2p: impl p2p::P2p,
tasks_to_run_upon_cosigning: Vec<TaskHandle>,
mut p2p_cosigns: mpsc::UnboundedReceiver<SignedCosign>,
mut signed_cosigns: mpsc::UnboundedReceiver<SignedCosign>,
) {
let mut cosigning = Cosigning::spawn(db, serai, p2p.clone(), tasks_to_run_upon_cosigning);
tokio::spawn(async move {
let last_cosign_rebroadcast = Instant::now();
loop {
let time_till_cosign_rebroadcast = (last_cosign_rebroadcast +
serai_cosign::BROADCAST_FREQUENCY)
.saturating_duration_since(Instant::now());
tokio::select! {
() = tokio::time::sleep(time_till_cosign_rebroadcast) => {
for cosign in cosigning.cosigns_to_rebroadcast() {
p2p.publish_cosign(cosign).await;
}
}
cosign = p2p_cosigns.recv() => {
let cosign = cosign.expect("p2p cosigns channel was dropped?");
let _: Result<_, _> = cosigning.intake_cosign(&cosign);
}
cosign = signed_cosigns.recv() => {
let cosign = cosign.expect("signed cosigns channel was dropped?");
// TODO: Handle this error
let _: Result<_, _> = cosigning.intake_cosign(&cosign);
p2p.publish_cosign(cosign).await;
}
}
}
});
}
/// Spawn an existing Tributary.
///
/// This will spawn the Tributary, the Tributary scanning task, and inform the P2P network.
async fn spawn_tributary<P: p2p::P2p>(
db: Db,
p2p: P,
p2p_add_tributary: mpsc::UnboundedSender<Tributary<Db, Transaction, P>>,
set: NewSetInformation,
serai_key: Zeroizing<<Ristretto as Ciphersuite>::F>,
) {
let genesis = <[u8; 32]>::from(Blake2s::<U32>::digest((set.serai_block, set.set).encode()));
// Since the Serai block will be finalized, then cosigned, before we handle this, this time will
// be a couple of minutes stale. While the Tributary will still function with a start time in the
// past, the Tributary will immediately incur round timeouts. We reduce these by adding a
// constant delay of a couple of minutes.
const TRIBUTARY_START_TIME_DELAY: u64 = 120;
let start_time = set.declaration_time + TRIBUTARY_START_TIME_DELAY;
let mut tributary_validators = Vec::with_capacity(set.validators.len());
let mut validators = Vec::with_capacity(set.validators.len());
let mut total_weight = 0;
let mut validator_weights = HashMap::with_capacity(set.validators.len());
for (validator, weight) in set.validators {
let validator_key = <Ristretto as Ciphersuite>::read_G(&mut validator.0.as_slice())
.expect("Serai validator had an invalid public key");
let validator = SeraiAddress::from(validator);
let weight = u64::from(weight);
tributary_validators.push((validator_key, weight));
validators.push(validator);
total_weight += weight;
validator_weights.insert(validator, weight);
}
let tributary_db = tributary_db(set.set);
let tributary = Tributary::<_, Transaction, _>::new(
tributary_db.clone(),
genesis,
start_time,
serai_key,
tributary_validators,
p2p,
)
.await
.unwrap();
let reader = tributary.reader();
p2p_add_tributary.send(tributary).expect("p2p's add_tributary channel was closed?");
let (scan_tributary_task_def, scan_tributary_task) = Task::new();
tokio::spawn(
(ScanTributaryTask {
cosign_db: db,
tributary_db,
set: set.set,
validators,
total_weight,
validator_weights,
tributary: reader,
_p2p: PhantomData::<P>,
})
.continually_run(scan_tributary_task_def, vec![todo!("TODO")]),
);
// TODO^ On Tributary block, drain this task's ProcessorMessages
// Have the tributary scanner run as soon as there's a new block
// TODO: Implement retiry, this will hold the tributary/handle indefinitely
tokio::spawn(async move {
loop {
tributary
.next_block_notification()
.await
.await
.map_err(|_| ())
// unreachable since this owns the tributary object and doesn't drop it
.expect("tributary was dropped causing notification to error");
scan_tributary_task.run_now();
}
});
}
#[tokio::main]
async fn main() {
// Override the panic handler with one which will panic if any tokio task panics
{
let existing = std::panic::take_hook();
std::panic::set_hook(Box::new(move |panic| {
existing(panic);
const MSG: &str = "exiting the process due to a task panicking";
println!("{MSG}");
log::error!("{MSG}");
std::process::exit(1);
}));
}
// Initialize the logger
if std::env::var("RUST_LOG").is_err() {
std::env::set_var("RUST_LOG", serai_env::var("RUST_LOG").unwrap_or_else(|| "info".to_string()));
}
env_logger::init();
log::info!("starting coordinator service...");
// Read the Serai key from the env
let serai_key = {
let mut key_hex = serai_env::var("SERAI_KEY").expect("Serai key wasn't provided");
let mut key_vec = hex::decode(&key_hex).map_err(|_| ()).expect("Serai key wasn't hex-encoded");
key_hex.zeroize();
if key_vec.len() != 32 {
key_vec.zeroize();
panic!("Serai key had an invalid length");
}
let mut key_bytes = [0; 32];
key_bytes.copy_from_slice(&key_vec);
key_vec.zeroize();
let key = Zeroizing::new(<Ristretto as Ciphersuite>::F::from_repr(key_bytes).unwrap());
key_bytes.zeroize();
key
};
// Open the database
let db = coordinator_db();
// Connect to the message-queue
let message_queue = MessageQueue::from_env(Service::Coordinator);
// Connect to the Serai node
let serai = serai().await;
let (p2p_add_tributary_send, p2p_add_tributary_recv) = mpsc::unbounded_channel();
let (p2p_retire_tributary_send, p2p_retire_tributary_recv) = mpsc::unbounded_channel();
let (p2p_cosigns_send, p2p_cosigns_recv) = mpsc::unbounded_channel();
// Spawn the P2P network
let p2p = {
let serai_keypair = {
let mut key_bytes = serai_key.to_bytes();
// Schnorrkel SecretKey is the key followed by 32 bytes of entropy for nonces
let mut expanded_key = Zeroizing::new([0; 64]);
expanded_key.as_mut_slice()[.. 32].copy_from_slice(&key_bytes);
OsRng.fill_bytes(&mut expanded_key.as_mut_slice()[32 ..]);
key_bytes.zeroize();
Zeroizing::new(
schnorrkel::SecretKey::from_bytes(expanded_key.as_slice()).unwrap().to_keypair(),
)
};
let p2p = p2p::Libp2p::new(&serai_keypair, serai.clone());
tokio::spawn(p2p::run::<Db, Transaction, _>(
db.clone(),
p2p.clone(),
p2p_add_tributary_recv,
p2p_retire_tributary_recv,
p2p_cosigns_send,
));
p2p
};
// TODO: p2p_add_tributary_send, p2p_retire_tributary_send
// Spawn the Substrate scanners
// TODO: Canonical, NewSet, SignSlashReport
let (substrate_canonical_task_def, substrate_canonical_task) = Task::new();
tokio::spawn(
CanonicalEventStream::new(db.clone(), serai.clone())
.continually_run(substrate_canonical_task_def, todo!("TODO")),
);
let (substrate_ephemeral_task_def, substrate_ephemeral_task) = Task::new();
tokio::spawn(
EphemeralEventStream::new(
db.clone(),
serai.clone(),
PublicKey::from_raw((<Ristretto as Ciphersuite>::generator() * serai_key.deref()).to_bytes()),
)
.continually_run(substrate_ephemeral_task_def, todo!("TODO")),
);
// Spawn the cosign handler
let (signed_cosigns_send, signed_cosigns_recv) = mpsc::unbounded_channel();
spawn_cosigning(
db.clone(),
serai.clone(),
p2p.clone(),
// Run the Substrate scanners once we cosign new blocks
vec![substrate_canonical_task, substrate_ephemeral_task],
p2p_cosigns_recv,
signed_cosigns_recv,
);
// TODO: Reload tributaries from disk, handle processor messages
// TODO: On NewSet, save to DB, send KeyGen, spawn tributary task, inform P2P network
todo!("TODO")
}

View File

@@ -272,7 +272,19 @@ impl TributaryDb {
pub(crate) fn start_of_block(txn: &mut impl DbTxn, set: ValidatorSet, block_number: u64) {
for topic in Reattempt::take(txn, set, block_number).unwrap_or(vec![]) {
// TODO: Slash all people who preprocessed but didn't share
/*
TODO: Slash all people who preprocessed but didn't share, and add a delay to their
participations in future protocols. When we call accumulate, if the participant has no
delay, their accumulation occurs immediately. Else, the accumulation occurs after the
specified delay.
This means even if faulty validators are first to preprocess, they won't be selected for
the signing set unless there's a lack of less faulty validators available.
We need to decrease this delay upon successful partipations, and set it to the maximum upon
`f + 1` validators voting to fatally slash the validator in question. This won't issue the
fatal slash but should still be effective.
*/
Self::recognize_topic(txn, set, topic);
if let Some(id) = topic.sign_id(set) {
Self::send_message(txn, set, messages::sign::CoordinatorMessage::Reattempt { id });

View File

@@ -4,3 +4,4 @@ pub use transaction::Transaction;
mod db;
mod scan;
pub(crate) use scan::ScanTributaryTask;

View File

@@ -32,41 +32,43 @@ use crate::{
},
};
struct ScanBlock<'a, D: Db, DT: DbTxn, TD: Db, P: P2p> {
_db: PhantomData<D>,
struct ScanBlock<'a, CD: Db, TD: Db, TDT: DbTxn, P: P2p> {
_p2p: PhantomData<P>,
txn: &'a mut DT,
cosign_db: &'a CD,
tributary_txn: &'a mut TDT,
set: ValidatorSet,
validators: &'a [SeraiAddress],
total_weight: u64,
validator_weights: &'a HashMap<SeraiAddress, u64>,
tributary: &'a TributaryReader<TD, Transaction>,
}
impl<'a, D: Db, DT: DbTxn, TD: Db, P: P2p> ScanBlock<'a, D, DT, TD, P> {
impl<'a, CD: Db, TD: Db, TDT: DbTxn, P: P2p> ScanBlock<'a, CD, TD, TDT, P> {
fn potentially_start_cosign(&mut self) {
// Don't start a new cosigning instance if we're actively running one
if TributaryDb::actively_cosigning(self.txn, self.set) {
if TributaryDb::actively_cosigning(self.tributary_txn, self.set) {
return;
}
// Start cosigning the latest intended-to-be-cosigned block
let Some(latest_substrate_block_to_cosign) =
TributaryDb::latest_substrate_block_to_cosign(self.txn, self.set)
TributaryDb::latest_substrate_block_to_cosign(self.tributary_txn, self.set)
else {
return;
};
let Some(substrate_block_number) =
Cosigning::<D>::finalized_block_number(self.txn, latest_substrate_block_to_cosign)
Cosigning::<CD>::finalized_block_number(self.cosign_db, latest_substrate_block_to_cosign)
else {
// This is a valid panic as we shouldn't be scanning this block if we didn't provide all
// Provided transactions within it, and the block to cosign is a Provided transaction
panic!("cosigning a block our cosigner didn't index")
};
// Mark us as actively cosigning
TributaryDb::start_cosigning(self.txn, self.set, substrate_block_number);
TributaryDb::start_cosigning(self.tributary_txn, self.set, substrate_block_number);
// Send the message for the processor to start signing
TributaryDb::send_message(
self.txn,
self.tributary_txn,
self.set,
messages::coordinator::CoordinatorMessage::CosignSubstrateBlock {
session: self.set.session,
@@ -81,7 +83,11 @@ impl<'a, D: Db, DT: DbTxn, TD: Db, P: P2p> ScanBlock<'a, D, DT, TD, P> {
if let TransactionKind::Signed(_, TributarySigned { signer, .. }) = tx.kind() {
// Don't handle transactions from those fatally slashed
// TODO: The fact they can publish these TXs makes this a notable spam vector
if TributaryDb::is_fatally_slashed(self.txn, self.set, SeraiAddress(signer.to_bytes())) {
if TributaryDb::is_fatally_slashed(
self.tributary_txn,
self.set,
SeraiAddress(signer.to_bytes()),
) {
return;
}
}
@@ -94,7 +100,7 @@ impl<'a, D: Db, DT: DbTxn, TD: Db, P: P2p> ScanBlock<'a, D, DT, TD, P> {
// Check the participant voted to be removed actually exists
if !self.validators.iter().any(|validator| *validator == participant) {
TributaryDb::fatal_slash(
self.txn,
self.tributary_txn,
self.set,
signer,
"voted to remove non-existent participant",
@@ -103,7 +109,7 @@ impl<'a, D: Db, DT: DbTxn, TD: Db, P: P2p> ScanBlock<'a, D, DT, TD, P> {
}
match TributaryDb::accumulate(
self.txn,
self.tributary_txn,
self.set,
self.validators,
self.total_weight,
@@ -115,7 +121,7 @@ impl<'a, D: Db, DT: DbTxn, TD: Db, P: P2p> ScanBlock<'a, D, DT, TD, P> {
) {
DataSet::None => {}
DataSet::Participating(_) => {
TributaryDb::fatal_slash(self.txn, self.set, participant, "voted to remove");
TributaryDb::fatal_slash(self.tributary_txn, self.set, participant, "voted to remove");
}
};
}
@@ -123,7 +129,7 @@ impl<'a, D: Db, DT: DbTxn, TD: Db, P: P2p> ScanBlock<'a, D, DT, TD, P> {
// Send the participation to the processor
Transaction::DkgParticipation { participation, signed } => {
TributaryDb::send_message(
self.txn,
self.tributary_txn,
self.set,
messages::key_gen::CoordinatorMessage::Participation {
session: self.set.session,
@@ -143,16 +149,20 @@ impl<'a, D: Db, DT: DbTxn, TD: Db, P: P2p> ScanBlock<'a, D, DT, TD, P> {
Transaction::Cosign { substrate_block_hash } => {
// Update the latest intended-to-be-cosigned Substrate block
TributaryDb::set_latest_substrate_block_to_cosign(self.txn, self.set, substrate_block_hash);
TributaryDb::set_latest_substrate_block_to_cosign(
self.tributary_txn,
self.set,
substrate_block_hash,
);
// Start a new cosign if we weren't already working on one
self.potentially_start_cosign();
}
Transaction::Cosigned { substrate_block_hash } => {
TributaryDb::finish_cosigning(self.txn, self.set);
TributaryDb::finish_cosigning(self.tributary_txn, self.set);
// Fetch the latest intended-to-be-cosigned block
let Some(latest_substrate_block_to_cosign) =
TributaryDb::latest_substrate_block_to_cosign(self.txn, self.set)
TributaryDb::latest_substrate_block_to_cosign(self.tributary_txn, self.set)
else {
return;
};
@@ -178,7 +188,7 @@ impl<'a, D: Db, DT: DbTxn, TD: Db, P: P2p> ScanBlock<'a, D, DT, TD, P> {
if slash_points.len() != self.validators.len() {
TributaryDb::fatal_slash(
self.txn,
self.tributary_txn,
self.set,
signer,
"slash report was for a distinct amount of signers",
@@ -188,7 +198,7 @@ impl<'a, D: Db, DT: DbTxn, TD: Db, P: P2p> ScanBlock<'a, D, DT, TD, P> {
// Accumulate, and if past the threshold, calculate *the* slash report and start signing it
match TributaryDb::accumulate(
self.txn,
self.tributary_txn,
self.set,
self.validators,
self.total_weight,
@@ -201,7 +211,17 @@ impl<'a, D: Db, DT: DbTxn, TD: Db, P: P2p> ScanBlock<'a, D, DT, TD, P> {
DataSet::None => {}
DataSet::Participating(data_set) => {
// Find the median reported slashes for this validator
// TODO: This lets 34% perform a fatal slash. Should that be allowed?
/*
TODO: This lets 34% perform a fatal slash. That shouldn't be allowed. We need
to accept slash reports for a period past the threshold, and only fatally slash if we
have a supermajority agree the slash should be fatal. If there isn't a supermajority,
but the median believe the slash should be fatal, we need to fallback to a large
constant.
Also, TODO, each slash point should probably be considered as
`MAX_KEY_SHARES_PER_SET * BLOCK_TIME` seconds of downtime. As this time crosses
various thresholds (1 day, 3 days, etc), a multiplier should be attached.
*/
let mut median_slash_report = Vec::with_capacity(self.validators.len());
for i in 0 .. self.validators.len() {
let mut this_validator =
@@ -250,7 +270,7 @@ impl<'a, D: Db, DT: DbTxn, TD: Db, P: P2p> ScanBlock<'a, D, DT, TD, P> {
// Recognize the topic for signing the slash report
TributaryDb::recognize_topic(
self.txn,
self.tributary_txn,
self.set,
Topic::Sign {
id: VariantSignId::SlashReport,
@@ -260,7 +280,7 @@ impl<'a, D: Db, DT: DbTxn, TD: Db, P: P2p> ScanBlock<'a, D, DT, TD, P> {
);
// Send the message for the processor to start signing
TributaryDb::send_message(
self.txn,
self.tributary_txn,
self.set,
messages::coordinator::CoordinatorMessage::SignSlashReport {
session: self.set.session,
@@ -277,7 +297,7 @@ impl<'a, D: Db, DT: DbTxn, TD: Db, P: P2p> ScanBlock<'a, D, DT, TD, P> {
if u64::try_from(data.len()).unwrap() != self.validator_weights[&signer] {
TributaryDb::fatal_slash(
self.txn,
self.tributary_txn,
self.set,
signer,
"signer signed with a distinct amount of key shares than they had key shares",
@@ -286,7 +306,7 @@ impl<'a, D: Db, DT: DbTxn, TD: Db, P: P2p> ScanBlock<'a, D, DT, TD, P> {
}
match TributaryDb::accumulate(
self.txn,
self.tributary_txn,
self.set,
self.validators,
self.total_weight,
@@ -302,7 +322,7 @@ impl<'a, D: Db, DT: DbTxn, TD: Db, P: P2p> ScanBlock<'a, D, DT, TD, P> {
let flatten_data_set = |data_set| todo!("TODO");
let data_set = flatten_data_set(data_set);
TributaryDb::send_message(
self.txn,
self.tributary_txn,
self.set,
match round {
SigningProtocolRound::Preprocess => {
@@ -320,7 +340,7 @@ impl<'a, D: Db, DT: DbTxn, TD: Db, P: P2p> ScanBlock<'a, D, DT, TD, P> {
}
fn handle_block(mut self, block_number: u64, block: Block<Transaction>) {
TributaryDb::start_of_block(self.txn, self.set, block_number);
TributaryDb::start_of_block(self.tributary_txn, self.set, block_number);
for tx in block.transactions {
match tx {
@@ -346,7 +366,7 @@ impl<'a, D: Db, DT: DbTxn, TD: Db, P: P2p> ScanBlock<'a, D, DT, TD, P> {
// Since anything with evidence is fundamentally faulty behavior, not just temporal
// errors, mark the node as fatally slashed
TributaryDb::fatal_slash(
self.txn,
self.tributary_txn,
self.set,
SeraiAddress(msgs.0.msg.sender),
&format!("invalid tendermint messages: {msgs:?}"),
@@ -360,20 +380,21 @@ impl<'a, D: Db, DT: DbTxn, TD: Db, P: P2p> ScanBlock<'a, D, DT, TD, P> {
}
}
struct ScanTributaryTask<D: Db, TD: Db, P: P2p> {
db: D,
set: ValidatorSet,
validators: Vec<SeraiAddress>,
total_weight: u64,
validator_weights: HashMap<SeraiAddress, u64>,
tributary: TributaryReader<TD, Transaction>,
_p2p: PhantomData<P>,
pub(crate) struct ScanTributaryTask<CD: Db, TD: Db, P: P2p> {
pub(crate) cosign_db: CD,
pub(crate) tributary_db: TD,
pub(crate) set: ValidatorSet,
pub(crate) validators: Vec<SeraiAddress>,
pub(crate) total_weight: u64,
pub(crate) validator_weights: HashMap<SeraiAddress, u64>,
pub(crate) tributary: TributaryReader<TD, Transaction>,
pub(crate) _p2p: PhantomData<P>,
}
impl<D: Db, TD: Db, P: P2p> ContinuallyRan for ScanTributaryTask<D, TD, P> {
impl<CD: Db, TD: Db, P: P2p> ContinuallyRan for ScanTributaryTask<CD, TD, P> {
fn run_iteration(&mut self) -> impl Send + Future<Output = Result<bool, String>> {
async move {
let (mut last_block_number, mut last_block_hash) =
TributaryDb::last_handled_tributary_block(&self.db, self.set)
TributaryDb::last_handled_tributary_block(&self.tributary_db, self.set)
.unwrap_or((0, self.tributary.genesis()));
let mut made_progess = false;
@@ -397,11 +418,11 @@ impl<D: Db, TD: Db, P: P2p> ContinuallyRan for ScanTributaryTask<D, TD, P> {
}
}
let mut txn = self.db.txn();
let mut tributary_txn = self.tributary_db.txn();
(ScanBlock {
_db: PhantomData::<D>,
_p2p: PhantomData::<P>,
txn: &mut txn,
cosign_db: &self.cosign_db,
tributary_txn: &mut tributary_txn,
set: self.set,
validators: &self.validators,
total_weight: self.total_weight,
@@ -409,10 +430,15 @@ impl<D: Db, TD: Db, P: P2p> ContinuallyRan for ScanTributaryTask<D, TD, P> {
tributary: &self.tributary,
})
.handle_block(block_number, block);
TributaryDb::set_last_handled_tributary_block(&mut txn, self.set, block_number, block_hash);
TributaryDb::set_last_handled_tributary_block(
&mut tributary_txn,
self.set,
block_number,
block_hash,
);
last_block_number = block_number;
last_block_hash = block_hash;
txn.commit();
tributary_txn.commit();
made_progess = true;
}

View File

@@ -1,4 +1,5 @@
use std::future::Future;
use core::future::Future;
use std::sync::Arc;
use futures::stream::{StreamExt, FuturesOrdered};
@@ -20,14 +21,14 @@ create_db!(
/// The event stream for canonical events.
pub struct CanonicalEventStream<D: Db> {
db: D,
serai: Serai,
serai: Arc<Serai>,
}
impl<D: Db> CanonicalEventStream<D> {
/// Create a new canonical event stream.
///
/// Only one of these may exist over the provided database.
pub fn new(db: D, serai: Serai) -> Self {
pub fn new(db: D, serai: Arc<Serai>) -> Self {
Self { db, serai }
}
}

View File

@@ -1,4 +1,5 @@
use std::future::Future;
use core::future::Future;
use std::sync::Arc;
use futures::stream::{StreamExt, FuturesOrdered};
@@ -24,7 +25,7 @@ create_db!(
/// The event stream for ephemeral events.
pub struct EphemeralEventStream<D: Db> {
db: D,
serai: Serai,
serai: Arc<Serai>,
validator: PublicKey,
}
@@ -32,7 +33,7 @@ impl<D: Db> EphemeralEventStream<D> {
/// Create a new ephemeral event stream.
///
/// Only one of these may exist over the provided database.
pub fn new(db: D, serai: Serai, validator: PublicKey) -> Self {
pub fn new(db: D, serai: Arc<Serai>, validator: PublicKey) -> Self {
Self { db, serai, validator }
}
}
@@ -216,7 +217,7 @@ impl<D: Db> ContinuallyRan for EphemeralEventStream<D> {
&NewSetInformation {
set: *set,
serai_block: block.block_hash,
start_time: block.time,
declaration_time: block.time,
// TODO: Why do we have this as an explicit field here?
// Shouldn't thiis be inlined into the Processor's key gen code, where it's used?
threshold: ((total_weight * 2) / 3) + 1,

View File

@@ -13,7 +13,9 @@ use serai_client::{
use serai_db::*;
mod canonical;
pub use canonical::CanonicalEventStream;
mod ephemeral;
pub use ephemeral::EphemeralEventStream;
fn borsh_serialize_validators<W: io::Write>(
validators: &Vec<(PublicKey, u16)>,
@@ -32,16 +34,22 @@ fn borsh_deserialize_validators<R: io::Read>(
/// The information for a new set.
#[derive(Debug, BorshSerialize, BorshDeserialize)]
pub struct NewSetInformation {
set: ValidatorSet,
serai_block: [u8; 32],
start_time: u64,
threshold: u16,
/// The set.
pub set: ValidatorSet,
/// The Serai block which declared it.
pub serai_block: [u8; 32],
/// The time of the block which declared it, in seconds.
pub declaration_time: u64,
/// The threshold to use.
pub threshold: u16,
/// The validators, with the amount of key shares they have.
#[borsh(
serialize_with = "borsh_serialize_validators",
deserialize_with = "borsh_deserialize_validators"
)]
validators: Vec<(PublicKey, u16)>,
evrf_public_keys: Vec<([u8; 32], Vec<u8>)>,
pub validators: Vec<(PublicKey, u16)>,
/// The eVRF public keys.
pub evrf_public_keys: Vec<([u8; 32], Vec<u8>)>,
}
mod _public_db {