diff --git a/.github/workflows/msrv.yml b/.github/workflows/msrv.yml index e1636482..acf0eb32 100644 --- a/.github/workflows/msrv.yml +++ b/.github/workflows/msrv.yml @@ -173,10 +173,11 @@ jobs: - name: Run cargo msrv on coordinator run: | - cargo msrv verify --manifest-path coordinator/tributary/tendermint/Cargo.toml - cargo msrv verify --manifest-path coordinator/tributary/Cargo.toml + cargo msrv verify --manifest-path coordinator/tributary-sdk/tendermint/Cargo.toml + cargo msrv verify --manifest-path coordinator/tributary-sdk/Cargo.toml cargo msrv verify --manifest-path coordinator/cosign/Cargo.toml cargo msrv verify --manifest-path coordinator/substrate/Cargo.toml + cargo msrv verify --manifest-path coordinator/tributary/Cargo.toml cargo msrv verify --manifest-path coordinator/p2p/Cargo.toml cargo msrv verify --manifest-path coordinator/p2p/libp2p/Cargo.toml cargo msrv verify --manifest-path coordinator/Cargo.toml diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml index 0c311b99..af93154e 100644 --- a/.github/workflows/tests.yml +++ b/.github/workflows/tests.yml @@ -60,9 +60,10 @@ jobs: -p serai-ethereum-processor \ -p serai-monero-processor \ -p tendermint-machine \ - -p tributary-chain \ + -p tributary-sdk \ -p serai-cosign \ -p serai-coordinator-substrate \ + -p serai-coordinator-tributary \ -p serai-coordinator-p2p \ -p serai-coordinator-libp2p-p2p \ -p serai-coordinator \ diff --git a/Cargo.lock b/Cargo.lock index ede4518f..f93ff6c1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -8331,6 +8331,7 @@ dependencies = [ "serai-coordinator-libp2p-p2p", "serai-coordinator-p2p", "serai-coordinator-substrate", + "serai-coordinator-tributary", "serai-cosign", "serai-db", "serai-env", @@ -8338,7 +8339,7 @@ dependencies = [ "serai-processor-messages", "serai-task", "tokio", - "tributary-chain", + "tributary-sdk", "zalloc", "zeroize", ] @@ -8361,7 +8362,7 @@ dependencies = [ "serai-cosign", "serai-task", "tokio", - "tributary-chain", + "tributary-sdk", "void", "zeroize", ] @@ -8378,7 +8379,7 @@ dependencies = [ "serai-db", "serai-task", "tokio", - "tributary-chain", + "tributary-sdk", ] [[package]] @@ -8422,6 +8423,27 @@ dependencies = [ "zeroize", ] +[[package]] +name = "serai-coordinator-tributary" +version = "0.1.0" +dependencies = [ + "blake2", + "borsh", + "ciphersuite", + "log", + "parity-scale-codec", + "rand_core", + "schnorr-signatures", + "serai-client", + "serai-coordinator-substrate", + "serai-cosign", + "serai-db", + "serai-processor-messages", + "serai-task", + "tributary-sdk", + "zeroize", +] + [[package]] name = "serai-cosign" version = "0.1.0" @@ -10975,7 +10997,7 @@ dependencies = [ ] [[package]] -name = "tributary-chain" +name = "tributary-sdk" version = "0.1.0" dependencies = [ "blake2", diff --git a/Cargo.toml b/Cargo.toml index 39507b16..f11d5644 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -96,10 +96,11 @@ members = [ "processor/ethereum", "processor/monero", - "coordinator/tributary/tendermint", - "coordinator/tributary", + "coordinator/tributary-sdk/tendermint", + "coordinator/tributary-sdk", "coordinator/cosign", "coordinator/substrate", + "coordinator/tributary", "coordinator/p2p", "coordinator/p2p/libp2p", "coordinator", diff --git a/coordinator/Cargo.toml b/coordinator/Cargo.toml index 38adbf15..2eec60c8 100644 --- a/coordinator/Cargo.toml +++ b/coordinator/Cargo.toml @@ -39,7 +39,7 @@ serai-task = { path = "../common/task", version = "0.1" } messages = { package = "serai-processor-messages", path = "../processor/messages" } message-queue = { package = "serai-message-queue", path = "../message-queue" } -tributary = { package = "tributary-chain", path = "./tributary" } +tributary-sdk = { path = "./tributary-sdk" } serai-client = { path = "../substrate/client", default-features = false, features = ["serai", "borsh"] } @@ -53,10 +53,11 @@ tokio = { version = "1", default-features = false, features = ["time", "sync", " serai-cosign = { path = "./cosign" } serai-coordinator-substrate = { path = "./substrate" } +serai-coordinator-tributary = { path = "./tributary" } serai-coordinator-p2p = { path = "./p2p" } serai-coordinator-libp2p-p2p = { path = "./p2p/libp2p" } [features] -longer-reattempts = [] # TODO +longer-reattempts = ["serai-coordinator-tributary/longer-reattempts"] parity-db = ["serai-db/parity-db"] rocksdb = ["serai-db/rocksdb"] diff --git a/coordinator/p2p/Cargo.toml b/coordinator/p2p/Cargo.toml index 7b7c055c..0e55e8e6 100644 --- a/coordinator/p2p/Cargo.toml +++ b/coordinator/p2p/Cargo.toml @@ -24,7 +24,7 @@ serai-db = { path = "../../common/db", version = "0.1" } serai-client = { path = "../../substrate/client", default-features = false, features = ["serai", "borsh"] } serai-cosign = { path = "../cosign" } -tributary = { package = "tributary-chain", path = "../tributary" } +tributary-sdk = { path = "../tributary-sdk" } futures-lite = { version = "2", default-features = false, features = ["std"] } tokio = { version = "1", default-features = false, features = ["sync", "macros"] } diff --git a/coordinator/p2p/libp2p/Cargo.toml b/coordinator/p2p/libp2p/Cargo.toml index 8916d961..7a393588 100644 --- a/coordinator/p2p/libp2p/Cargo.toml +++ b/coordinator/p2p/libp2p/Cargo.toml @@ -31,7 +31,7 @@ borsh = { version = "1", default-features = false, features = ["std", "derive", serai-client = { path = "../../../substrate/client", default-features = false, features = ["serai", "borsh"] } serai-cosign = { path = "../../cosign" } -tributary = { package = "tributary-chain", path = "../../tributary" } +tributary-sdk = { path = "../../tributary-sdk" } void = { version = "1", default-features = false } futures-util = { version = "0.3", default-features = false, features = ["std"] } diff --git a/coordinator/p2p/libp2p/src/gossip.rs b/coordinator/p2p/libp2p/src/gossip.rs index f48c1c4e..f4ec666b 100644 --- a/coordinator/p2p/libp2p/src/gossip.rs +++ b/coordinator/p2p/libp2p/src/gossip.rs @@ -13,7 +13,7 @@ pub use libp2p::gossipsub::Event; use serai_cosign::SignedCosign; // Block size limit + 16 KB of space for signatures/metadata -pub(crate) const MAX_LIBP2P_GOSSIP_MESSAGE_SIZE: usize = tributary::BLOCK_SIZE_LIMIT + 16384; +pub(crate) const MAX_LIBP2P_GOSSIP_MESSAGE_SIZE: usize = tributary_sdk::BLOCK_SIZE_LIMIT + 16384; const LIBP2P_PROTOCOL: &str = "/serai/coordinator/gossip/1.0.0"; const BASE_TOPIC: &str = "/"; @@ -42,9 +42,10 @@ pub(crate) type Behavior = Behaviour Behavior { // The latency used by the Tendermint protocol, used here as the gossip epoch duration // libp2p-rs defaults to 1 second, whereas ours will be ~2 - let heartbeat_interval = tributary::tendermint::LATENCY_TIME; + let heartbeat_interval = tributary_sdk::tendermint::LATENCY_TIME; // The amount of heartbeats which will occur within a single Tributary block - let heartbeats_per_block = tributary::tendermint::TARGET_BLOCK_TIME.div_ceil(heartbeat_interval); + let heartbeats_per_block = + tributary_sdk::tendermint::TARGET_BLOCK_TIME.div_ceil(heartbeat_interval); // libp2p-rs defaults to 5, whereas ours will be ~8 let heartbeats_to_keep = 2 * heartbeats_per_block; // libp2p-rs defaults to 3 whereas ours will be ~4 diff --git a/coordinator/p2p/libp2p/src/lib.rs b/coordinator/p2p/libp2p/src/lib.rs index d3f09e61..d92eae42 100644 --- a/coordinator/p2p/libp2p/src/lib.rs +++ b/coordinator/p2p/libp2p/src/lib.rs @@ -259,7 +259,7 @@ impl Libp2p { } } -impl tributary::P2p for Libp2p { +impl tributary_sdk::P2p for Libp2p { fn broadcast(&self, tributary: [u8; 32], message: Vec) -> impl Send + Future { async move { self diff --git a/coordinator/p2p/libp2p/src/ping.rs b/coordinator/p2p/libp2p/src/ping.rs index d579af05..2b9afa41 100644 --- a/coordinator/p2p/libp2p/src/ping.rs +++ b/coordinator/p2p/libp2p/src/ping.rs @@ -1,6 +1,6 @@ use core::time::Duration; -use tributary::tendermint::LATENCY_TIME; +use tributary_sdk::tendermint::LATENCY_TIME; use libp2p::ping::{self, Config, Behaviour}; pub use ping::Event; diff --git a/coordinator/p2p/src/heartbeat.rs b/coordinator/p2p/src/heartbeat.rs index 76d160ea..8a2f3220 100644 --- a/coordinator/p2p/src/heartbeat.rs +++ b/coordinator/p2p/src/heartbeat.rs @@ -5,7 +5,7 @@ use serai_client::validator_sets::primitives::{MAX_KEY_SHARES_PER_SET, Validator use futures_lite::FutureExt; -use tributary::{ReadWrite, TransactionTrait, Block, Tributary, TributaryReader}; +use tributary_sdk::{ReadWrite, TransactionTrait, Block, Tributary, TributaryReader}; use serai_db::*; use serai_task::ContinuallyRan; @@ -13,7 +13,8 @@ use serai_task::ContinuallyRan; use crate::{Heartbeat, Peer, P2p}; // Amount of blocks in a minute -const BLOCKS_PER_MINUTE: usize = (60 / (tributary::tendermint::TARGET_BLOCK_TIME / 1000)) as usize; +const BLOCKS_PER_MINUTE: usize = + (60 / (tributary_sdk::tendermint::TARGET_BLOCK_TIME / 1000)) as usize; /// The minimum amount of blocks to include/included within a batch, assuming there's blocks to /// include in the batch. @@ -29,7 +30,7 @@ pub const MIN_BLOCKS_PER_BATCH: usize = BLOCKS_PER_MINUTE + 1; /// commit is `8 + (validators * 32) + (32 + (validators * 32))` (for the time, list of validators, /// and aggregate signature). Accordingly, this should be a safe over-estimate. pub const BATCH_SIZE_LIMIT: usize = MIN_BLOCKS_PER_BATCH * - (tributary::BLOCK_SIZE_LIMIT + 32 + ((MAX_KEY_SHARES_PER_SET as usize) * 128)); + (tributary_sdk::BLOCK_SIZE_LIMIT + 32 + ((MAX_KEY_SHARES_PER_SET as usize) * 128)); /// Sends a heartbeat to other validators on regular intervals informing them of our Tributary's /// tip. diff --git a/coordinator/p2p/src/lib.rs b/coordinator/p2p/src/lib.rs index 71eb8f2c..9bf245ca 100644 --- a/coordinator/p2p/src/lib.rs +++ b/coordinator/p2p/src/lib.rs @@ -10,7 +10,7 @@ use borsh::{BorshSerialize, BorshDeserialize}; use serai_client::{primitives::NetworkId, validator_sets::primitives::ValidatorSet}; use serai_db::Db; -use tributary::{ReadWrite, TransactionTrait, Tributary, TributaryReader}; +use tributary_sdk::{ReadWrite, TransactionTrait, Tributary, TributaryReader}; use serai_cosign::{SignedCosign, Cosigning}; use tokio::sync::{mpsc, oneshot}; @@ -49,7 +49,9 @@ pub trait Peer<'a>: Send { } /// The representation of the P2P network. -pub trait P2p: Send + Sync + Clone + tributary::P2p + serai_cosign::RequestNotableCosigns { +pub trait P2p: + Send + Sync + Clone + tributary_sdk::P2p + serai_cosign::RequestNotableCosigns +{ /// The representation of a peer. type Peer<'a>: Peer<'a>; diff --git a/coordinator/src/main.rs b/coordinator/src/main.rs index 71f73d65..0e2db23c 100644 --- a/coordinator/src/main.rs +++ b/coordinator/src/main.rs @@ -1,5 +1,5 @@ -use core::{marker::PhantomData, ops::Deref, future::Future, time::Duration}; -use std::{sync::Arc, collections::HashMap, time::Instant}; +use core::{ops::Deref, time::Duration}; +use std::{sync::Arc, time::Instant}; use zeroize::{Zeroize, Zeroizing}; use rand_core::{RngCore, OsRng}; @@ -13,23 +13,25 @@ use ciphersuite::{ use tokio::sync::mpsc; use scale::Encode; -use serai_client::{ - primitives::{PublicKey, SeraiAddress}, - validator_sets::primitives::{Session, ValidatorSet}, - Serai, -}; -use message_queue::{Service, Metadata, client::MessageQueue}; +use serai_client::{primitives::PublicKey, validator_sets::primitives::ValidatorSet, Serai}; +use message_queue::{Service, client::MessageQueue}; + +use tributary_sdk::Tributary; use serai_task::{Task, TaskHandle, ContinuallyRan}; use serai_cosign::{SignedCosign, Cosigning}; use serai_coordinator_substrate::{NewSetInformation, CanonicalEventStream, EphemeralEventStream}; +use serai_coordinator_tributary::{Transaction, ScanTributaryTask}; mod db; use db::*; mod tributary; -use tributary::{Transaction, ScanTributaryTask, ScanTributaryMessagesTask}; +use tributary::ScanTributaryMessagesTask; + +mod substrate; +use substrate::SubstrateTask; mod p2p { pub use serai_coordinator_p2p::*; @@ -44,8 +46,6 @@ mod p2p { static ALLOCATOR: zalloc::ZeroizingAlloc = zalloc::ZeroizingAlloc(std::alloc::System); -type Tributary

= ::tributary::Tributary; - async fn serai() -> Arc { const SERAI_CONNECTION_DELAY: Duration = Duration::from_secs(10); const MAX_SERAI_CONNECTION_DELAY: Duration = Duration::from_secs(300); @@ -111,7 +111,7 @@ async fn spawn_tributary( db: Db, message_queue: Arc, p2p: P, - p2p_add_tributary: &mpsc::UnboundedSender<(ValidatorSet, Tributary

)>, + p2p_add_tributary: &mpsc::UnboundedSender<(ValidatorSet, Tributary)>, set: NewSetInformation, serai_key: Zeroizing<::F>, ) { @@ -131,18 +131,11 @@ async fn spawn_tributary( let start_time = set.declaration_time + TRIBUTARY_START_TIME_DELAY; let mut tributary_validators = Vec::with_capacity(set.validators.len()); - let mut validators = Vec::with_capacity(set.validators.len()); - let mut total_weight = 0; - let mut validator_weights = HashMap::with_capacity(set.validators.len()); for (validator, weight) in set.validators.iter().copied() { let validator_key = ::read_G(&mut validator.0.as_slice()) .expect("Serai validator had an invalid public key"); - let validator = SeraiAddress::from(validator); let weight = u64::from(weight); tributary_validators.push((validator_key, weight)); - validators.push(validator); - total_weight += weight; - validator_weights.insert(validator, weight); } let tributary_db = tributary_db(set.set); @@ -165,161 +158,15 @@ async fn spawn_tributary( let (scan_tributary_task_def, scan_tributary_task) = Task::new(); tokio::spawn( - (ScanTributaryTask { - cosign_db: db.clone(), - tributary_db, - set: set.set, - validators, - total_weight, - validator_weights, - tributary: reader, - _p2p: PhantomData::

, - }) - // This is the only handle for this ScanTributaryMessagesTask, so when this task is dropped, it - // will be too - .continually_run(scan_tributary_task_def, vec![scan_tributary_messages_task]), + ScanTributaryTask::<_, _, P>::new(db.clone(), tributary_db, &set, reader) + // This is the only handle for this ScanTributaryMessagesTask, so when this task is dropped, + // it will be too + .continually_run(scan_tributary_task_def, vec![scan_tributary_messages_task]), ); tokio::spawn(tributary::run(db, set, tributary, scan_tributary_task)); } -struct SubstrateTask { - serai_key: Zeroizing<::F>, - db: Db, - message_queue: Arc, - p2p: P, - p2p_add_tributary: mpsc::UnboundedSender<(ValidatorSet, Tributary

)>, - p2p_retire_tributary: mpsc::UnboundedSender, -} - -impl ContinuallyRan for SubstrateTask

{ - fn run_iteration(&mut self) -> impl Send + Future> { - async move { - let mut made_progress = false; - - // Handle the Canonical events - for network in serai_client::primitives::NETWORKS { - loop { - let mut txn = self.db.txn(); - let Some(msg) = serai_coordinator_substrate::Canonical::try_recv(&mut txn, network) - else { - break; - }; - - match msg { - // TODO: Stop trying to confirm the DKG - messages::substrate::CoordinatorMessage::SetKeys { .. } => todo!("TODO"), - messages::substrate::CoordinatorMessage::SlashesReported { session } => { - let prior_retired = RetiredTributary::get(&txn, network); - let next_to_be_retired = - prior_retired.map(|session| Session(session.0 + 1)).unwrap_or(Session(0)); - assert_eq!(session, next_to_be_retired); - RetiredTributary::set(&mut txn, network, &session); - self - .p2p_retire_tributary - .send(ValidatorSet { network, session }) - .expect("p2p retire_tributary channel dropped?"); - } - messages::substrate::CoordinatorMessage::Block { .. } => {} - } - - let msg = messages::CoordinatorMessage::from(msg); - let metadata = Metadata { - from: Service::Coordinator, - to: Service::Processor(network), - intent: msg.intent(), - }; - let msg = borsh::to_vec(&msg).unwrap(); - // TODO: Make this fallible - self.message_queue.queue(metadata, msg).await; - txn.commit(); - made_progress = true; - } - } - - // Handle the NewSet events - loop { - let mut txn = self.db.txn(); - let Some(new_set) = serai_coordinator_substrate::NewSet::try_recv(&mut txn) else { break }; - - if let Some(historic_session) = new_set.set.session.0.checked_sub(2) { - // We should have retired this session if we're here - if RetiredTributary::get(&txn, new_set.set.network).map(|session| session.0) < - Some(historic_session) - { - /* - If we haven't, it's because we're processing the NewSet event before the retiry - event from the Canonical event stream. This happens if the Canonical event, and - then the NewSet event, is fired while we're already iterating over NewSet events. - - We break, dropping the txn, restoring this NewSet to the database, so we'll only - handle it once a future iteration of this loop handles the retiry event. - */ - break; - } - - /* - Queue this historical Tributary for deletion. - - We explicitly don't queue this upon Tributary retire, instead here, to give time to - investigate retired Tributaries if questions are raised post-retiry. This gives a - week (the duration of the following session) after the Tributary has been retired to - make a backup of the data directory for any investigations. - */ - TributaryCleanup::send( - &mut txn, - &ValidatorSet { network: new_set.set.network, session: Session(historic_session) }, - ); - } - - // Save this Tributary as active to the database - { - let mut active_tributaries = - ActiveTributaries::get(&txn).unwrap_or(Vec::with_capacity(1)); - active_tributaries.push(new_set.clone()); - ActiveTributaries::set(&mut txn, &active_tributaries); - } - - // Send GenerateKey to the processor - let msg = messages::key_gen::CoordinatorMessage::GenerateKey { - session: new_set.set.session, - threshold: new_set.threshold, - evrf_public_keys: new_set.evrf_public_keys.clone(), - }; - let msg = messages::CoordinatorMessage::from(msg); - let metadata = Metadata { - from: Service::Coordinator, - to: Service::Processor(new_set.set.network), - intent: msg.intent(), - }; - let msg = borsh::to_vec(&msg).unwrap(); - // TODO: Make this fallible - self.message_queue.queue(metadata, msg).await; - - // Commit the transaction for all of this - txn.commit(); - - // Now spawn the Tributary - // If we reboot after committing the txn, but before this is called, this will be called - // on boot - spawn_tributary( - self.db.clone(), - self.message_queue.clone(), - self.p2p.clone(), - &self.p2p_add_tributary, - new_set, - self.serai_key.clone(), - ) - .await; - - made_progress = true; - } - - Ok(made_progress) - } - } -} - #[tokio::main] async fn main() { // Override the panic handler with one which will panic if any tokio task panics diff --git a/coordinator/src/substrate.rs b/coordinator/src/substrate.rs new file mode 100644 index 00000000..7ea5c257 --- /dev/null +++ b/coordinator/src/substrate.rs @@ -0,0 +1,160 @@ +use core::future::Future; +use std::sync::Arc; + +use zeroize::Zeroizing; + +use ciphersuite::{Ciphersuite, Ristretto}; + +use tokio::sync::mpsc; + +use serai_db::{DbTxn, Db as DbTrait}; + +use serai_client::validator_sets::primitives::{Session, ValidatorSet}; +use message_queue::{Service, Metadata, client::MessageQueue}; + +use tributary_sdk::Tributary; + +use serai_task::ContinuallyRan; + +use serai_coordinator_tributary::Transaction; +use serai_coordinator_p2p::P2p; + +use crate::Db; + +pub(crate) struct SubstrateTask { + pub(crate) serai_key: Zeroizing<::F>, + pub(crate) db: Db, + pub(crate) message_queue: Arc, + pub(crate) p2p: P, + pub(crate) p2p_add_tributary: + mpsc::UnboundedSender<(ValidatorSet, Tributary)>, + pub(crate) p2p_retire_tributary: mpsc::UnboundedSender, +} + +impl ContinuallyRan for SubstrateTask

{ + fn run_iteration(&mut self) -> impl Send + Future> { + async move { + let mut made_progress = false; + + // Handle the Canonical events + for network in serai_client::primitives::NETWORKS { + loop { + let mut txn = self.db.txn(); + let Some(msg) = serai_coordinator_substrate::Canonical::try_recv(&mut txn, network) + else { + break; + }; + + match msg { + // TODO: Stop trying to confirm the DKG + messages::substrate::CoordinatorMessage::SetKeys { .. } => todo!("TODO"), + messages::substrate::CoordinatorMessage::SlashesReported { session } => { + let prior_retired = crate::db::RetiredTributary::get(&txn, network); + let next_to_be_retired = + prior_retired.map(|session| Session(session.0 + 1)).unwrap_or(Session(0)); + assert_eq!(session, next_to_be_retired); + crate::db::RetiredTributary::set(&mut txn, network, &session); + self + .p2p_retire_tributary + .send(ValidatorSet { network, session }) + .expect("p2p retire_tributary channel dropped?"); + } + messages::substrate::CoordinatorMessage::Block { .. } => {} + } + + let msg = messages::CoordinatorMessage::from(msg); + let metadata = Metadata { + from: Service::Coordinator, + to: Service::Processor(network), + intent: msg.intent(), + }; + let msg = borsh::to_vec(&msg).unwrap(); + // TODO: Make this fallible + self.message_queue.queue(metadata, msg).await; + txn.commit(); + made_progress = true; + } + } + + // Handle the NewSet events + loop { + let mut txn = self.db.txn(); + let Some(new_set) = serai_coordinator_substrate::NewSet::try_recv(&mut txn) else { break }; + + if let Some(historic_session) = new_set.set.session.0.checked_sub(2) { + // We should have retired this session if we're here + if crate::db::RetiredTributary::get(&txn, new_set.set.network).map(|session| session.0) < + Some(historic_session) + { + /* + If we haven't, it's because we're processing the NewSet event before the retiry + event from the Canonical event stream. This happens if the Canonical event, and + then the NewSet event, is fired while we're already iterating over NewSet events. + + We break, dropping the txn, restoring this NewSet to the database, so we'll only + handle it once a future iteration of this loop handles the retiry event. + */ + break; + } + + /* + Queue this historical Tributary for deletion. + + We explicitly don't queue this upon Tributary retire, instead here, to give time to + investigate retired Tributaries if questions are raised post-retiry. This gives a + week (the duration of the following session) after the Tributary has been retired to + make a backup of the data directory for any investigations. + */ + crate::db::TributaryCleanup::send( + &mut txn, + &ValidatorSet { network: new_set.set.network, session: Session(historic_session) }, + ); + } + + // Save this Tributary as active to the database + { + let mut active_tributaries = + crate::db::ActiveTributaries::get(&txn).unwrap_or(Vec::with_capacity(1)); + active_tributaries.push(new_set.clone()); + crate::db::ActiveTributaries::set(&mut txn, &active_tributaries); + } + + // Send GenerateKey to the processor + let msg = messages::key_gen::CoordinatorMessage::GenerateKey { + session: new_set.set.session, + threshold: new_set.threshold, + evrf_public_keys: new_set.evrf_public_keys.clone(), + }; + let msg = messages::CoordinatorMessage::from(msg); + let metadata = Metadata { + from: Service::Coordinator, + to: Service::Processor(new_set.set.network), + intent: msg.intent(), + }; + let msg = borsh::to_vec(&msg).unwrap(); + // TODO: Make this fallible + self.message_queue.queue(metadata, msg).await; + + // Commit the transaction for all of this + txn.commit(); + + // Now spawn the Tributary + // If we reboot after committing the txn, but before this is called, this will be called + // on boot + crate::spawn_tributary( + self.db.clone(), + self.message_queue.clone(), + self.p2p.clone(), + &self.p2p_add_tributary, + new_set, + self.serai_key.clone(), + ) + .await; + + made_progress = true; + } + + Ok(made_progress) + } + } +} diff --git a/coordinator/src/tributary/mod.rs b/coordinator/src/tributary.rs similarity index 94% rename from coordinator/src/tributary/mod.rs rename to coordinator/src/tributary.rs index 4ca3cbbe..4fb193b3 100644 --- a/coordinator/src/tributary/mod.rs +++ b/coordinator/src/tributary.rs @@ -5,7 +5,7 @@ use serai_db::{DbTxn, Db}; use serai_client::validator_sets::primitives::ValidatorSet; -use ::tributary::{ProvidedError, Tributary}; +use tributary_sdk::{ProvidedError, Tributary}; use serai_task::{TaskHandle, ContinuallyRan}; @@ -13,16 +13,9 @@ use message_queue::{Service, Metadata, client::MessageQueue}; use serai_cosign::Cosigning; use serai_coordinator_substrate::NewSetInformation; +use serai_coordinator_tributary::{Transaction, ProcessorMessages}; use serai_coordinator_p2p::P2p; -mod transaction; -pub use transaction::Transaction; - -mod db; - -mod scan; -pub(crate) use scan::ScanTributaryTask; - pub(crate) struct ScanTributaryMessagesTask { pub(crate) tributary_db: TD, pub(crate) set: ValidatorSet, @@ -35,7 +28,7 @@ impl ContinuallyRan for ScanTributaryMessagesTask { let mut made_progress = false; loop { let mut txn = self.tributary_db.txn(); - let Some(msg) = db::TributaryDb::try_recv_message(&mut txn, self.set) else { break }; + let Some(msg) = ProcessorMessages::try_recv(&mut txn, self.set) else { break }; let metadata = Metadata { from: Service::Coordinator, to: Service::Processor(self.set.network), @@ -152,7 +145,7 @@ pub(crate) async fn run( // Have the tributary scanner run as soon as there's a new block // This is wrapped in a timeout so we don't go too long without running the above code match tokio::time::timeout( - Duration::from_millis(::tributary::tendermint::TARGET_BLOCK_TIME.into()), + Duration::from_millis(tributary_sdk::tendermint::TARGET_BLOCK_TIME.into()), tributary.next_block_notification().await, ) .await diff --git a/coordinator/src/tributary/scan.rs b/coordinator/src/tributary/scan.rs deleted file mode 100644 index ac7fd43b..00000000 --- a/coordinator/src/tributary/scan.rs +++ /dev/null @@ -1,466 +0,0 @@ -use core::{marker::PhantomData, future::Future}; -use std::collections::HashMap; - -use ciphersuite::group::GroupEncoding; - -use serai_client::{ - primitives::SeraiAddress, - validator_sets::primitives::{ValidatorSet, Slash}, -}; - -use tributary::{ - Signed as TributarySigned, TransactionKind, TransactionTrait, - Transaction as TributaryTransaction, Block, TributaryReader, - tendermint::{ - tx::{TendermintTx, Evidence, decode_signed_message}, - TendermintNetwork, - }, -}; - -use serai_db::*; -use serai_task::ContinuallyRan; - -use messages::sign::VariantSignId; - -use serai_cosign::Cosigning; - -use crate::{ - p2p::P2p, - tributary::{ - db::*, - transaction::{SigningProtocolRound, Signed, Transaction}, - }, -}; - -struct ScanBlock<'a, CD: Db, TD: Db, TDT: DbTxn, P: P2p> { - _p2p: PhantomData

, - cosign_db: &'a CD, - tributary_txn: &'a mut TDT, - set: ValidatorSet, - validators: &'a [SeraiAddress], - total_weight: u64, - validator_weights: &'a HashMap, - tributary: &'a TributaryReader, -} -impl<'a, CD: Db, TD: Db, TDT: DbTxn, P: P2p> ScanBlock<'a, CD, TD, TDT, P> { - fn potentially_start_cosign(&mut self) { - // Don't start a new cosigning instance if we're actively running one - if TributaryDb::actively_cosigning(self.tributary_txn, self.set).is_some() { - return; - } - - // Fetch the latest intended-to-be-cosigned block - let Some(latest_substrate_block_to_cosign) = - TributaryDb::latest_substrate_block_to_cosign(self.tributary_txn, self.set) - else { - return; - }; - - // If it was already cosigned, return - if TributaryDb::cosigned(self.tributary_txn, self.set, latest_substrate_block_to_cosign) { - return; - } - - let Some(substrate_block_number) = - Cosigning::::finalized_block_number(self.cosign_db, latest_substrate_block_to_cosign) - else { - // This is a valid panic as we shouldn't be scanning this block if we didn't provide all - // Provided transactions within it, and the block to cosign is a Provided transaction - panic!("cosigning a block our cosigner didn't index") - }; - - // Mark us as actively cosigning - TributaryDb::start_cosigning( - self.tributary_txn, - self.set, - latest_substrate_block_to_cosign, - substrate_block_number, - ); - // Send the message for the processor to start signing - TributaryDb::send_message( - self.tributary_txn, - self.set, - messages::coordinator::CoordinatorMessage::CosignSubstrateBlock { - session: self.set.session, - block_number: substrate_block_number, - block: latest_substrate_block_to_cosign, - }, - ); - } - fn handle_application_tx(&mut self, block_number: u64, tx: Transaction) { - let signer = |signed: Signed| SeraiAddress(signed.signer.to_bytes()); - - if let TransactionKind::Signed(_, TributarySigned { signer, .. }) = tx.kind() { - // Don't handle transactions from those fatally slashed - // TODO: The fact they can publish these TXs makes this a notable spam vector - if TributaryDb::is_fatally_slashed( - self.tributary_txn, - self.set, - SeraiAddress(signer.to_bytes()), - ) { - return; - } - } - - match tx { - // Accumulate this vote and fatally slash the participant if past the threshold - Transaction::RemoveParticipant { participant, signed } => { - let signer = signer(signed); - - // Check the participant voted to be removed actually exists - if !self.validators.iter().any(|validator| *validator == participant) { - TributaryDb::fatal_slash( - self.tributary_txn, - self.set, - signer, - "voted to remove non-existent participant", - ); - return; - } - - match TributaryDb::accumulate( - self.tributary_txn, - self.set, - self.validators, - self.total_weight, - block_number, - Topic::RemoveParticipant { participant }, - signer, - self.validator_weights[&signer], - &(), - ) { - DataSet::None => {} - DataSet::Participating(_) => { - TributaryDb::fatal_slash(self.tributary_txn, self.set, participant, "voted to remove"); - } - }; - } - - // Send the participation to the processor - Transaction::DkgParticipation { participation, signed } => { - TributaryDb::send_message( - self.tributary_txn, - self.set, - messages::key_gen::CoordinatorMessage::Participation { - session: self.set.session, - participant: todo!("TODO"), - participation, - }, - ); - } - Transaction::DkgConfirmationPreprocess { attempt, preprocess, signed } => { - // Accumulate the preprocesses into our own FROST attempt manager - todo!("TODO") - } - Transaction::DkgConfirmationShare { attempt, share, signed } => { - // Accumulate the shares into our own FROST attempt manager - todo!("TODO") - } - - Transaction::Cosign { substrate_block_hash } => { - // Update the latest intended-to-be-cosigned Substrate block - TributaryDb::set_latest_substrate_block_to_cosign( - self.tributary_txn, - self.set, - substrate_block_hash, - ); - // Start a new cosign if we aren't already working on one - self.potentially_start_cosign(); - } - Transaction::Cosigned { substrate_block_hash } => { - /* - We provide one Cosigned per Cosign transaction, but they have independent orders. This - means we may receive Cosigned before Cosign. In order to ensure we only start work on - not-yet-Cosigned cosigns, we flag all cosigned blocks as cosigned. Then, when we choose - the next block to work on, we won't if it's already been cosigned. - */ - TributaryDb::mark_cosigned(self.tributary_txn, self.set, substrate_block_hash); - - // If we aren't actively cosigning this block, return - // This occurs when we have Cosign TXs A, B, C, we received Cosigned for A and start on C, - // and then receive Cosigned for B - if TributaryDb::actively_cosigning(self.tributary_txn, self.set) != - Some(substrate_block_hash) - { - return; - } - - // Since this is the block we were cosigning, mark us as having finished cosigning - TributaryDb::finish_cosigning(self.tributary_txn, self.set); - - // Start working on the next cosign - self.potentially_start_cosign(); - } - Transaction::SubstrateBlock { hash } => { - // Whitelist all of the IDs this Substrate block causes to be signed - todo!("TODO") - } - Transaction::Batch { hash } => { - // Whitelist the signing of this batch, publishing our own preprocess - todo!("TODO") - } - - Transaction::SlashReport { slash_points, signed } => { - let signer = signer(signed); - - if slash_points.len() != self.validators.len() { - TributaryDb::fatal_slash( - self.tributary_txn, - self.set, - signer, - "slash report was for a distinct amount of signers", - ); - return; - } - - // Accumulate, and if past the threshold, calculate *the* slash report and start signing it - match TributaryDb::accumulate( - self.tributary_txn, - self.set, - self.validators, - self.total_weight, - block_number, - Topic::SlashReport, - signer, - self.validator_weights[&signer], - &slash_points, - ) { - DataSet::None => {} - DataSet::Participating(data_set) => { - // Find the median reported slashes for this validator - /* - TODO: This lets 34% perform a fatal slash. That shouldn't be allowed. We need - to accept slash reports for a period past the threshold, and only fatally slash if we - have a supermajority agree the slash should be fatal. If there isn't a supermajority, - but the median believe the slash should be fatal, we need to fallback to a large - constant. - - Also, TODO, each slash point should probably be considered as - `MAX_KEY_SHARES_PER_SET * BLOCK_TIME` seconds of downtime. As this time crosses - various thresholds (1 day, 3 days, etc), a multiplier should be attached. - */ - let mut median_slash_report = Vec::with_capacity(self.validators.len()); - for i in 0 .. self.validators.len() { - let mut this_validator = - data_set.values().map(|report| report[i]).collect::>(); - this_validator.sort_unstable(); - // Choose the median, where if there are two median values, the lower one is chosen - let median_index = if (this_validator.len() % 2) == 1 { - this_validator.len() / 2 - } else { - (this_validator.len() / 2) - 1 - }; - median_slash_report.push(this_validator[median_index]); - } - - // We only publish slashes for the `f` worst performers to: - // 1) Effect amnesty if there were network disruptions which affected everyone - // 2) Ensure the signing threshold doesn't have a disincentive to do their job - - // Find the worst performer within the signing threshold's slash points - let f = (self.validators.len() - 1) / 3; - let worst_validator_in_supermajority_slash_points = { - let mut sorted_slash_points = median_slash_report.clone(); - sorted_slash_points.sort_unstable(); - // This won't be a valid index if `f == 0`, which means we don't have any validators - // to slash - let index_of_first_validator_to_slash = self.validators.len() - f; - let index_of_worst_validator_in_supermajority = index_of_first_validator_to_slash - 1; - sorted_slash_points[index_of_worst_validator_in_supermajority] - }; - - // Perform the amortization - for slash_points in &mut median_slash_report { - *slash_points = - slash_points.saturating_sub(worst_validator_in_supermajority_slash_points) - } - let amortized_slash_report = median_slash_report; - - // Create the resulting slash report - let mut slash_report = vec![]; - for (validator, points) in self.validators.iter().copied().zip(amortized_slash_report) { - if points != 0 { - slash_report.push(Slash { key: validator.into(), points }); - } - } - assert!(slash_report.len() <= f); - - // Recognize the topic for signing the slash report - TributaryDb::recognize_topic( - self.tributary_txn, - self.set, - Topic::Sign { - id: VariantSignId::SlashReport, - attempt: 0, - round: SigningProtocolRound::Preprocess, - }, - ); - // Send the message for the processor to start signing - TributaryDb::send_message( - self.tributary_txn, - self.set, - messages::coordinator::CoordinatorMessage::SignSlashReport { - session: self.set.session, - report: slash_report, - }, - ); - } - }; - } - - Transaction::Sign { id, attempt, round, data, signed } => { - let topic = Topic::Sign { id, attempt, round }; - let signer = signer(signed); - - if u64::try_from(data.len()).unwrap() != self.validator_weights[&signer] { - TributaryDb::fatal_slash( - self.tributary_txn, - self.set, - signer, - "signer signed with a distinct amount of key shares than they had key shares", - ); - return; - } - - match TributaryDb::accumulate( - self.tributary_txn, - self.set, - self.validators, - self.total_weight, - block_number, - topic, - signer, - self.validator_weights[&signer], - &data, - ) { - DataSet::None => {} - DataSet::Participating(data_set) => { - let id = topic.sign_id(self.set).expect("Topic::Sign didn't have SignId"); - let flatten_data_set = |data_set| todo!("TODO"); - let data_set = flatten_data_set(data_set); - TributaryDb::send_message( - self.tributary_txn, - self.set, - match round { - SigningProtocolRound::Preprocess => { - messages::sign::CoordinatorMessage::Preprocesses { id, preprocesses: data_set } - } - SigningProtocolRound::Share => { - messages::sign::CoordinatorMessage::Shares { id, shares: data_set } - } - }, - ) - } - }; - } - } - } - - fn handle_block(mut self, block_number: u64, block: Block) { - TributaryDb::start_of_block(self.tributary_txn, self.set, block_number); - - for tx in block.transactions { - match tx { - TributaryTransaction::Tendermint(TendermintTx::SlashEvidence(ev)) => { - // Since the evidence is on the chain, it will have already been validated - // We can just punish the signer - let data = match ev { - Evidence::ConflictingMessages(first, second) => (first, Some(second)), - Evidence::InvalidPrecommit(first) | Evidence::InvalidValidRound(first) => (first, None), - }; - let msgs = ( - decode_signed_message::>(&data.0).unwrap(), - if data.1.is_some() { - Some( - decode_signed_message::>(&data.1.unwrap()) - .unwrap(), - ) - } else { - None - }, - ); - - // Since anything with evidence is fundamentally faulty behavior, not just temporal - // errors, mark the node as fatally slashed - TributaryDb::fatal_slash( - self.tributary_txn, - self.set, - SeraiAddress(msgs.0.msg.sender), - &format!("invalid tendermint messages: {msgs:?}"), - ); - } - TributaryTransaction::Application(tx) => { - self.handle_application_tx(block_number, tx); - } - } - } - } -} - -pub(crate) struct ScanTributaryTask { - pub(crate) cosign_db: CD, - pub(crate) tributary_db: TD, - pub(crate) set: ValidatorSet, - pub(crate) validators: Vec, - pub(crate) total_weight: u64, - pub(crate) validator_weights: HashMap, - pub(crate) tributary: TributaryReader, - pub(crate) _p2p: PhantomData

, -} -impl ContinuallyRan for ScanTributaryTask { - fn run_iteration(&mut self) -> impl Send + Future> { - async move { - let (mut last_block_number, mut last_block_hash) = - TributaryDb::last_handled_tributary_block(&self.tributary_db, self.set) - .unwrap_or((0, self.tributary.genesis())); - - let mut made_progress = false; - while let Some(next) = self.tributary.block_after(&last_block_hash) { - let block = self.tributary.block(&next).unwrap(); - let block_number = last_block_number + 1; - let block_hash = block.hash(); - - // Make sure we have all of the provided transactions for this block - for tx in &block.transactions { - let TransactionKind::Provided(order) = tx.kind() else { - continue; - }; - - // make sure we have all the provided txs in this block locally - if !self.tributary.locally_provided_txs_in_block(&block_hash, order) { - return Err(format!( - "didn't have the provided Transactions on-chain for set (ephemeral error): {:?}", - self.set - )); - } - } - - let mut tributary_txn = self.tributary_db.txn(); - (ScanBlock { - _p2p: PhantomData::

, - cosign_db: &self.cosign_db, - tributary_txn: &mut tributary_txn, - set: self.set, - validators: &self.validators, - total_weight: self.total_weight, - validator_weights: &self.validator_weights, - tributary: &self.tributary, - }) - .handle_block(block_number, block); - TributaryDb::set_last_handled_tributary_block( - &mut tributary_txn, - self.set, - block_number, - block_hash, - ); - last_block_number = block_number; - last_block_hash = block_hash; - tributary_txn.commit(); - - made_progress = true; - } - - Ok(made_progress) - } - } -} diff --git a/coordinator/src/tributary/transaction.rs b/coordinator/src/tributary/transaction.rs deleted file mode 100644 index 34528cb9..00000000 --- a/coordinator/src/tributary/transaction.rs +++ /dev/null @@ -1,340 +0,0 @@ -use core::{ops::Deref, fmt::Debug}; -use std::io; - -use zeroize::Zeroizing; -use rand_core::{RngCore, CryptoRng}; - -use blake2::{digest::typenum::U32, Digest, Blake2b}; -use ciphersuite::{ - group::{ff::Field, GroupEncoding}, - Ciphersuite, Ristretto, -}; -use schnorr::SchnorrSignature; - -use scale::Encode; -use borsh::{BorshSerialize, BorshDeserialize}; - -use serai_client::{primitives::SeraiAddress, validator_sets::primitives::MAX_KEY_SHARES_PER_SET}; - -use messages::sign::VariantSignId; - -use tributary::{ - ReadWrite, - transaction::{ - Signed as TributarySigned, TransactionError, TransactionKind, Transaction as TransactionTrait, - }, -}; - -/// The round this data is for, within a signing protocol. -#[derive(Clone, Copy, PartialEq, Eq, Debug, Encode, BorshSerialize, BorshDeserialize)] -pub enum SigningProtocolRound { - /// A preprocess. - Preprocess, - /// A signature share. - Share, -} - -impl SigningProtocolRound { - fn nonce(&self) -> u32 { - match self { - SigningProtocolRound::Preprocess => 0, - SigningProtocolRound::Share => 1, - } - } -} - -/// `tributary::Signed` but without the nonce. -/// -/// All of our nonces are deterministic to the type of transaction and fields within. -#[derive(Clone, Copy, PartialEq, Eq, Debug)] -pub struct Signed { - /// The signer. - pub signer: ::G, - /// The signature. - pub signature: SchnorrSignature, -} - -impl BorshSerialize for Signed { - fn serialize(&self, writer: &mut W) -> Result<(), io::Error> { - writer.write_all(self.signer.to_bytes().as_ref())?; - self.signature.write(writer) - } -} -impl BorshDeserialize for Signed { - fn deserialize_reader(reader: &mut R) -> Result { - let signer = Ristretto::read_G(reader)?; - let signature = SchnorrSignature::read(reader)?; - Ok(Self { signer, signature }) - } -} - -impl Signed { - /// Provide a nonce to convert a `Signed` into a `tributary::Signed`. - fn nonce(&self, nonce: u32) -> TributarySigned { - TributarySigned { signer: self.signer, nonce, signature: self.signature } - } -} - -/// The Tributary transaction definition used by Serai -#[derive(Clone, PartialEq, Eq, Debug, BorshSerialize, BorshDeserialize)] -pub enum Transaction { - /// A vote to remove a participant for invalid behavior - RemoveParticipant { - /// The participant to remove - participant: SeraiAddress, - /// The transaction's signer and signature - signed: Signed, - }, - - /// A participation in the DKG - DkgParticipation { - participation: Vec, - /// The transaction's signer and signature - signed: Signed, - }, - /// The preprocess to confirm the DKG results on-chain - DkgConfirmationPreprocess { - /// The attempt number of this signing protocol - attempt: u32, - // The preprocess - preprocess: [u8; 64], - /// The transaction's signer and signature - signed: Signed, - }, - /// The signature share to confirm the DKG results on-chain - DkgConfirmationShare { - /// The attempt number of this signing protocol - attempt: u32, - // The signature share - share: [u8; 32], - /// The transaction's signer and signature - signed: Signed, - }, - - /// Intend to co-sign a finalized Substrate block - /// - /// When the time comes to start a new co-signing protocol, the most recent Substrate block will - /// be the one selected to be cosigned. - Cosign { - /// The hash of the Substrate block to sign - substrate_block_hash: [u8; 32], - }, - - /// The cosign for a Substrate block - /// - /// After producing this cosign, we need to start work on the latest intended-to-be cosigned - /// block. That requires agreement on when this cosign was produced, which we solve by embedding - /// this cosign on chain. - /// - /// We ideally don't have this transaction at all. The coordinator, without access to any of the - /// key shares, could observe the FROST signing session and determine a successful completion. - /// Unfortunately, that functionality is not present in modular-frost, so we do need to support - /// *some* asynchronous flow (where the processor or P2P network informs us of the successful - /// completion). - /// - /// If we use a `Provided` transaction, that requires everyone observe this cosign. - /// - /// If we use an `Unsigned` transaction, we can't verify the cosign signature inside - /// `Transaction::verify` unless we embedded the full `SignedCosign` on-chain. The issue is since - /// a Tributary is stateless with regards to the on-chain logic, including `Transaction::verify`, - /// we can't verify the signature against the group's public key unless we also include that (but - /// then we open a DoS where arbitrary group keys are specified to cause inclusion of arbitrary - /// blobs on chain). - /// - /// If we use a `Signed` transaction, we mitigate the DoS risk by having someone to fatally - /// slash. We have horrible performance though as for 100 validators, all 100 will publish this - /// transaction. - /// - /// We could use a signed `Unsigned` transaction, where it includes a signer and signature but - /// isn't technically a Signed transaction. This lets us de-duplicate the transaction premised on - /// its contents. - /// - /// The optimal choice is likely to use a `Provided` transaction. We don't actually need to - /// observe the produced cosign (which is ephemeral). As long as it's agreed the cosign in - /// question no longer needs to produced, which would mean the cosigning protocol at-large - /// cosigning the block in question, it'd be safe to provide this and move on to the next cosign. - Cosigned { substrate_block_hash: [u8; 32] }, - - /// Acknowledge a Substrate block - /// - /// This is provided after the block has been cosigned. - /// - /// With the acknowledgement of a Substrate block, we can whitelist all the `VariantSignId`s - /// resulting from its handling. - SubstrateBlock { - /// The hash of the Substrate block - hash: [u8; 32], - }, - - /// Acknowledge a Batch - /// - /// Once everyone has acknowledged the Batch, we can begin signing it. - Batch { - /// The hash of the Batch's serialization. - /// - /// Generally, we refer to a Batch by its ID/the hash of its instructions. Here, we want to - /// ensure consensus on the Batch, and achieving consensus on its hash is the most effective - /// way to do that. - hash: [u8; 32], - }, - - /// Data from a signing protocol. - Sign { - /// The ID of the object being signed - id: VariantSignId, - /// The attempt number of this signing protocol - attempt: u32, - /// The round this data is for, within the signing protocol - round: SigningProtocolRound, - /// The data itself - /// - /// There will be `n` blobs of data where `n` is the amount of key shares the validator sending - /// this transaction has. - data: Vec>, - /// The transaction's signer and signature - signed: Signed, - }, - - /// The local view of slashes observed by the transaction's sender - SlashReport { - /// The slash points accrued by each validator - slash_points: Vec, - /// The transaction's signer and signature - signed: Signed, - }, -} - -impl ReadWrite for Transaction { - fn read(reader: &mut R) -> io::Result { - borsh::from_reader(reader) - } - - fn write(&self, writer: &mut W) -> io::Result<()> { - borsh::to_writer(writer, self) - } -} - -impl TransactionTrait for Transaction { - fn kind(&self) -> TransactionKind { - match self { - Transaction::RemoveParticipant { participant, signed } => { - TransactionKind::Signed((b"RemoveParticipant", participant).encode(), signed.nonce(0)) - } - - Transaction::DkgParticipation { signed, .. } => { - TransactionKind::Signed(b"DkgParticipation".encode(), signed.nonce(0)) - } - Transaction::DkgConfirmationPreprocess { attempt, signed, .. } => { - TransactionKind::Signed((b"DkgConfirmation", attempt).encode(), signed.nonce(0)) - } - Transaction::DkgConfirmationShare { attempt, signed, .. } => { - TransactionKind::Signed((b"DkgConfirmation", attempt).encode(), signed.nonce(1)) - } - - Transaction::Cosign { .. } => TransactionKind::Provided("Cosign"), - Transaction::Cosigned { .. } => TransactionKind::Provided("Cosigned"), - // TODO: Provide this - Transaction::SubstrateBlock { .. } => TransactionKind::Provided("SubstrateBlock"), - // TODO: Provide this - Transaction::Batch { .. } => TransactionKind::Provided("Batch"), - - Transaction::Sign { id, attempt, round, signed, .. } => { - TransactionKind::Signed((b"Sign", id, attempt).encode(), signed.nonce(round.nonce())) - } - - Transaction::SlashReport { signed, .. } => { - TransactionKind::Signed(b"SlashReport".encode(), signed.nonce(0)) - } - } - } - - fn hash(&self) -> [u8; 32] { - let mut tx = ReadWrite::serialize(self); - if let TransactionKind::Signed(_, signed) = self.kind() { - // Make sure the part we're cutting off is the signature - assert_eq!(tx.drain((tx.len() - 64) ..).collect::>(), signed.signature.serialize()); - } - Blake2b::::digest(&tx).into() - } - - // This is a stateless verification which we use to enforce some size limits. - fn verify(&self) -> Result<(), TransactionError> { - #[allow(clippy::match_same_arms)] - match self { - // Fixed-length TX - Transaction::RemoveParticipant { .. } => {} - - // TODO: MAX_DKG_PARTICIPATION_LEN - Transaction::DkgParticipation { .. } => {} - // These are fixed-length TXs - Transaction::DkgConfirmationPreprocess { .. } | Transaction::DkgConfirmationShare { .. } => {} - - // Provided TXs - Transaction::Cosign { .. } | - Transaction::Cosigned { .. } | - Transaction::SubstrateBlock { .. } | - Transaction::Batch { .. } => {} - - Transaction::Sign { data, .. } => { - if data.len() > usize::try_from(MAX_KEY_SHARES_PER_SET).unwrap() { - Err(TransactionError::InvalidContent)? - } - // TODO: MAX_SIGN_LEN - } - - Transaction::SlashReport { slash_points, .. } => { - if slash_points.len() > usize::try_from(MAX_KEY_SHARES_PER_SET).unwrap() { - Err(TransactionError::InvalidContent)? - } - } - }; - Ok(()) - } -} - -impl Transaction { - // Sign a transaction - // - // Panics if signing a transaction type which isn't `TransactionKind::Signed` - pub fn sign( - &mut self, - rng: &mut R, - genesis: [u8; 32], - key: &Zeroizing<::F>, - ) { - fn signed(tx: &mut Transaction) -> &mut Signed { - #[allow(clippy::match_same_arms)] // This doesn't make semantic sense here - match tx { - Transaction::RemoveParticipant { ref mut signed, .. } | - Transaction::DkgParticipation { ref mut signed, .. } | - Transaction::DkgConfirmationPreprocess { ref mut signed, .. } => signed, - Transaction::DkgConfirmationShare { ref mut signed, .. } => signed, - - Transaction::Cosign { .. } => panic!("signing CosignSubstrateBlock"), - Transaction::Cosigned { .. } => panic!("signing Cosigned"), - Transaction::SubstrateBlock { .. } => panic!("signing SubstrateBlock"), - Transaction::Batch { .. } => panic!("signing Batch"), - - Transaction::Sign { ref mut signed, .. } => signed, - - Transaction::SlashReport { ref mut signed, .. } => signed, - } - } - - // Decide the nonce to sign with - let sig_nonce = Zeroizing::new(::F::random(rng)); - - { - // Set the signer and the nonce - let signed = signed(self); - signed.signer = Ristretto::generator() * key.deref(); - signed.signature.R = ::generator() * sig_nonce.deref(); - } - - // Get the signature hash (which now includes `R || A` making it valid as the challenge) - let sig_hash = self.sig_hash(genesis); - - // Sign the signature - signed(self).signature = SchnorrSignature::::sign(key, sig_nonce, sig_hash); - } -} diff --git a/coordinator/tributary-sdk/Cargo.toml b/coordinator/tributary-sdk/Cargo.toml new file mode 100644 index 00000000..be72ff0c --- /dev/null +++ b/coordinator/tributary-sdk/Cargo.toml @@ -0,0 +1,49 @@ +[package] +name = "tributary-sdk" +version = "0.1.0" +description = "A micro-blockchain to provide consensus and ordering to P2P communication" +license = "AGPL-3.0-only" +repository = "https://github.com/serai-dex/serai/tree/develop/coordinator/tributary-sdk" +authors = ["Luke Parker "] +edition = "2021" +rust-version = "1.81" + +[package.metadata.docs.rs] +all-features = true +rustdoc-args = ["--cfg", "docsrs"] + +[lints] +workspace = true + +[dependencies] +thiserror = { version = "2", default-features = false, features = ["std"] } + +subtle = { version = "^2", default-features = false, features = ["std"] } +zeroize = { version = "^1.5", default-features = false, features = ["std"] } + +rand = { version = "0.8", default-features = false, features = ["std"] } +rand_chacha = { version = "0.3", default-features = false, features = ["std"] } + +blake2 = { version = "0.10", default-features = false, features = ["std"] } +transcript = { package = "flexible-transcript", path = "../../crypto/transcript", default-features = false, features = ["std", "recommended"] } + +ciphersuite = { package = "ciphersuite", path = "../../crypto/ciphersuite", default-features = false, features = ["std", "ristretto"] } +schnorr = { package = "schnorr-signatures", path = "../../crypto/schnorr", default-features = false, features = ["std"] } + +hex = { version = "0.4", default-features = false, features = ["std"] } +log = { version = "0.4", default-features = false, features = ["std"] } + +serai-db = { path = "../../common/db" } + +scale = { package = "parity-scale-codec", version = "3", default-features = false, features = ["std", "derive"] } +futures-util = { version = "0.3", default-features = false, features = ["std", "sink", "channel"] } +futures-channel = { version = "0.3", default-features = false, features = ["std", "sink"] } +tendermint = { package = "tendermint-machine", path = "./tendermint" } + +tokio = { version = "1", default-features = false, features = ["sync", "time", "rt"] } + +[dev-dependencies] +tokio = { version = "1", features = ["macros"] } + +[features] +tests = [] diff --git a/coordinator/tributary-sdk/LICENSE b/coordinator/tributary-sdk/LICENSE new file mode 100644 index 00000000..f684d027 --- /dev/null +++ b/coordinator/tributary-sdk/LICENSE @@ -0,0 +1,15 @@ +AGPL-3.0-only license + +Copyright (c) 2023 Luke Parker + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU Affero General Public License Version 3 as +published by the Free Software Foundation. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU Affero General Public License for more details. + +You should have received a copy of the GNU Affero General Public License +along with this program. If not, see . diff --git a/coordinator/tributary-sdk/README.md b/coordinator/tributary-sdk/README.md new file mode 100644 index 00000000..6fce976e --- /dev/null +++ b/coordinator/tributary-sdk/README.md @@ -0,0 +1,3 @@ +# Tributary + +A verifiable, ordered broadcast layer implemented as a BFT micro-blockchain. diff --git a/coordinator/tributary/src/block.rs b/coordinator/tributary-sdk/src/block.rs similarity index 100% rename from coordinator/tributary/src/block.rs rename to coordinator/tributary-sdk/src/block.rs diff --git a/coordinator/tributary/src/blockchain.rs b/coordinator/tributary-sdk/src/blockchain.rs similarity index 100% rename from coordinator/tributary/src/blockchain.rs rename to coordinator/tributary-sdk/src/blockchain.rs diff --git a/coordinator/tributary-sdk/src/lib.rs b/coordinator/tributary-sdk/src/lib.rs new file mode 100644 index 00000000..2e4a6115 --- /dev/null +++ b/coordinator/tributary-sdk/src/lib.rs @@ -0,0 +1,388 @@ +use core::{marker::PhantomData, fmt::Debug, future::Future}; +use std::{sync::Arc, io}; + +use zeroize::Zeroizing; + +use ciphersuite::{Ciphersuite, Ristretto}; + +use scale::Decode; +use futures_channel::mpsc::UnboundedReceiver; +use futures_util::{StreamExt, SinkExt}; +use ::tendermint::{ + ext::{BlockNumber, Commit, Block as BlockTrait, Network}, + SignedMessageFor, SyncedBlock, SyncedBlockSender, SyncedBlockResultReceiver, MessageSender, + TendermintMachine, TendermintHandle, +}; + +pub use ::tendermint::Evidence; + +use serai_db::Db; + +use tokio::sync::RwLock; + +mod merkle; +pub(crate) use merkle::*; + +pub mod transaction; +pub use transaction::{TransactionError, Signed, TransactionKind, Transaction as TransactionTrait}; + +use crate::tendermint::tx::TendermintTx; + +mod provided; +pub(crate) use provided::*; +pub use provided::ProvidedError; + +mod block; +pub use block::*; + +mod blockchain; +pub(crate) use blockchain::*; + +mod mempool; +pub(crate) use mempool::*; + +pub mod tendermint; +pub(crate) use crate::tendermint::*; + +#[cfg(any(test, feature = "tests"))] +pub mod tests; + +/// Size limit for an individual transaction. +// This needs to be big enough to participate in a 101-of-150 eVRF DKG with each element taking +// `MAX_KEY_LEN`. This also needs to be big enough to pariticpate in signing 520 Bitcoin inputs +// with 49 key shares, and signing 120 Monero inputs with 49 key shares. +// TODO: Add a test for these properties +pub const TRANSACTION_SIZE_LIMIT: usize = 2_000_000; +/// Amount of transactions a single account may have in the mempool. +pub const ACCOUNT_MEMPOOL_LIMIT: u32 = 50; +/// Block size limit. +// This targets a growth limit of roughly 30 GB a day, under load, in order to prevent a malicious +// participant from flooding disks and causing out of space errors in order processes. +pub const BLOCK_SIZE_LIMIT: usize = 2_001_000; + +pub(crate) const TENDERMINT_MESSAGE: u8 = 0; +pub(crate) const TRANSACTION_MESSAGE: u8 = 1; + +#[allow(clippy::large_enum_variant)] +#[derive(Clone, PartialEq, Eq, Debug)] +pub enum Transaction { + Tendermint(TendermintTx), + Application(T), +} + +impl ReadWrite for Transaction { + fn read(reader: &mut R) -> io::Result { + let mut kind = [0]; + reader.read_exact(&mut kind)?; + match kind[0] { + 0 => { + let tx = TendermintTx::read(reader)?; + Ok(Transaction::Tendermint(tx)) + } + 1 => { + let tx = T::read(reader)?; + Ok(Transaction::Application(tx)) + } + _ => Err(io::Error::other("invalid transaction type")), + } + } + fn write(&self, writer: &mut W) -> io::Result<()> { + match self { + Transaction::Tendermint(tx) => { + writer.write_all(&[0])?; + tx.write(writer) + } + Transaction::Application(tx) => { + writer.write_all(&[1])?; + tx.write(writer) + } + } + } +} + +impl Transaction { + pub fn hash(&self) -> [u8; 32] { + match self { + Transaction::Tendermint(tx) => tx.hash(), + Transaction::Application(tx) => tx.hash(), + } + } + + pub fn kind(&self) -> TransactionKind { + match self { + Transaction::Tendermint(tx) => tx.kind(), + Transaction::Application(tx) => tx.kind(), + } + } +} + +/// An item which can be read and written. +pub trait ReadWrite: Sized { + fn read(reader: &mut R) -> io::Result; + fn write(&self, writer: &mut W) -> io::Result<()>; + + fn serialize(&self) -> Vec { + // BlockHeader is 64 bytes and likely the smallest item in this system + let mut buf = Vec::with_capacity(64); + self.write(&mut buf).unwrap(); + buf + } +} + +pub trait P2p: 'static + Send + Sync + Clone { + /// Broadcast a message to all other members of the Tributary with the specified genesis. + /// + /// The Tributary will re-broadcast consensus messages on a fixed interval to ensure they aren't + /// prematurely dropped from the P2P layer. THe P2P layer SHOULD perform content-based + /// deduplication to ensure a sane amount of load. + fn broadcast(&self, genesis: [u8; 32], msg: Vec) -> impl Send + Future; +} + +impl P2p for Arc

{ + fn broadcast(&self, genesis: [u8; 32], msg: Vec) -> impl Send + Future { + P::broadcast(self, genesis, msg) + } +} + +#[derive(Clone)] +pub struct Tributary { + db: D, + + genesis: [u8; 32], + network: TendermintNetwork, + + synced_block: Arc>>>, + synced_block_result: Arc>, + messages: Arc>>>, +} + +impl Tributary { + pub async fn new( + db: D, + genesis: [u8; 32], + start_time: u64, + key: Zeroizing<::F>, + validators: Vec<(::G, u64)>, + p2p: P, + ) -> Option { + log::info!("new Tributary with genesis {}", hex::encode(genesis)); + + let validators_vec = validators.iter().map(|validator| validator.0).collect::>(); + + let signer = Arc::new(Signer::new(genesis, key)); + let validators = Arc::new(Validators::new(genesis, validators)?); + + let mut blockchain = Blockchain::new(db.clone(), genesis, &validators_vec); + let block_number = BlockNumber(blockchain.block_number()); + + let start_time = if let Some(commit) = blockchain.commit(&blockchain.tip()) { + Commit::::decode(&mut commit.as_ref()).unwrap().end_time + } else { + start_time + }; + let proposal = TendermintBlock( + blockchain.build_block::>(&validators).serialize(), + ); + let blockchain = Arc::new(RwLock::new(blockchain)); + + let network = TendermintNetwork { genesis, signer, validators, blockchain, p2p }; + + let TendermintHandle { synced_block, synced_block_result, messages, machine } = + TendermintMachine::new( + db.clone(), + network.clone(), + genesis, + block_number, + start_time, + proposal, + ) + .await; + tokio::spawn(machine.run()); + + Some(Self { + db, + genesis, + network, + synced_block: Arc::new(RwLock::new(synced_block)), + synced_block_result: Arc::new(RwLock::new(synced_block_result)), + messages: Arc::new(RwLock::new(messages)), + }) + } + + pub fn block_time() -> u32 { + TendermintNetwork::::block_time() + } + + pub fn genesis(&self) -> [u8; 32] { + self.genesis + } + + pub async fn block_number(&self) -> u64 { + self.network.blockchain.read().await.block_number() + } + pub async fn tip(&self) -> [u8; 32] { + self.network.blockchain.read().await.tip() + } + + pub fn reader(&self) -> TributaryReader { + TributaryReader(self.db.clone(), self.genesis, PhantomData) + } + + pub async fn provide_transaction(&self, tx: T) -> Result<(), ProvidedError> { + self.network.blockchain.write().await.provide_transaction(tx) + } + + pub async fn next_nonce( + &self, + signer: &::G, + order: &[u8], + ) -> Option { + self.network.blockchain.read().await.next_nonce(signer, order) + } + + // Returns Ok(true) if new, Ok(false) if an already present unsigned, or the error. + // Safe to be &self since the only meaningful usage of self is self.network.blockchain which + // successfully acquires its own write lock + pub async fn add_transaction(&self, tx: T) -> Result { + let tx = Transaction::Application(tx); + let mut to_broadcast = vec![TRANSACTION_MESSAGE]; + tx.write(&mut to_broadcast).unwrap(); + let res = self.network.blockchain.write().await.add_transaction::>( + true, + tx, + &self.network.signature_scheme(), + ); + if res == Ok(true) { + self.network.p2p.broadcast(self.genesis, to_broadcast).await; + } + res + } + + async fn sync_block_internal( + &self, + block: Block, + commit: Vec, + result: &mut UnboundedReceiver, + ) -> bool { + let (tip, block_number) = { + let blockchain = self.network.blockchain.read().await; + (blockchain.tip(), blockchain.block_number()) + }; + + if block.header.parent != tip { + log::debug!("told to sync a block whose parent wasn't our tip"); + return false; + } + + let block = TendermintBlock(block.serialize()); + let mut commit_ref = commit.as_ref(); + let Ok(commit) = Commit::>::decode(&mut commit_ref) else { + log::error!("sent an invalidly serialized commit"); + return false; + }; + // Storage DoS vector. We *could* truncate to solely the relevant portion, trying to save this, + // yet then we'd have to test the truncation was performed correctly. + if !commit_ref.is_empty() { + log::error!("sent an commit with additional data after it"); + return false; + } + if !self.network.verify_commit(block.id(), &commit) { + log::error!("sent an invalid commit"); + return false; + } + + let number = BlockNumber(block_number + 1); + self.synced_block.write().await.send(SyncedBlock { number, block, commit }).await.unwrap(); + result.next().await.unwrap() + } + + // Sync a block. + // TODO: Since we have a static validator set, we should only need the tail commit? + pub async fn sync_block(&self, block: Block, commit: Vec) -> bool { + let mut result = self.synced_block_result.write().await; + self.sync_block_internal(block, commit, &mut result).await + } + + // Return true if the message should be rebroadcasted. + pub async fn handle_message(&self, msg: &[u8]) -> bool { + match msg.first() { + Some(&TRANSACTION_MESSAGE) => { + let Ok(tx) = Transaction::read::<&[u8]>(&mut &msg[1 ..]) else { + log::error!("received invalid transaction message"); + return false; + }; + + // TODO: Sync mempools with fellow peers + // Can we just rebroadcast transactions not included for at least two blocks? + let res = + self.network.blockchain.write().await.add_transaction::>( + false, + tx, + &self.network.signature_scheme(), + ); + log::debug!("received transaction message. valid new transaction: {res:?}"); + res == Ok(true) + } + + Some(&TENDERMINT_MESSAGE) => { + let Ok(msg) = + SignedMessageFor::>::decode::<&[u8]>(&mut &msg[1 ..]) + else { + log::error!("received invalid tendermint message"); + return false; + }; + + self.messages.write().await.send(msg).await.unwrap(); + false + } + + _ => false, + } + } + + /// Get a Future which will resolve once the next block has been added. + pub async fn next_block_notification( + &self, + ) -> impl Send + Sync + core::future::Future> { + let (tx, rx) = tokio::sync::oneshot::channel(); + self.network.blockchain.write().await.next_block_notifications.push_back(tx); + rx + } +} + +#[derive(Clone)] +pub struct TributaryReader(D, [u8; 32], PhantomData); +impl TributaryReader { + pub fn genesis(&self) -> [u8; 32] { + self.1 + } + + // Since these values are static once set, they can be safely read from the database without lock + // acquisition + pub fn block(&self, hash: &[u8; 32]) -> Option> { + Blockchain::::block_from_db(&self.0, self.1, hash) + } + pub fn commit(&self, hash: &[u8; 32]) -> Option> { + Blockchain::::commit_from_db(&self.0, self.1, hash) + } + pub fn parsed_commit(&self, hash: &[u8; 32]) -> Option> { + self.commit(hash).map(|commit| Commit::::decode(&mut commit.as_ref()).unwrap()) + } + pub fn block_after(&self, hash: &[u8; 32]) -> Option<[u8; 32]> { + Blockchain::::block_after(&self.0, self.1, hash) + } + pub fn time_of_block(&self, hash: &[u8; 32]) -> Option { + self + .commit(hash) + .map(|commit| Commit::::decode(&mut commit.as_ref()).unwrap().end_time) + } + + pub fn locally_provided_txs_in_block(&self, hash: &[u8; 32], order: &str) -> bool { + Blockchain::::locally_provided_txs_in_block(&self.0, &self.1, hash, order) + } + + // This isn't static, yet can be read with only minor discrepancy risks + pub fn tip(&self) -> [u8; 32] { + Blockchain::::tip_from_db(&self.0, self.1) + } +} diff --git a/coordinator/tributary/src/mempool.rs b/coordinator/tributary-sdk/src/mempool.rs similarity index 100% rename from coordinator/tributary/src/mempool.rs rename to coordinator/tributary-sdk/src/mempool.rs diff --git a/coordinator/tributary/src/merkle.rs b/coordinator/tributary-sdk/src/merkle.rs similarity index 100% rename from coordinator/tributary/src/merkle.rs rename to coordinator/tributary-sdk/src/merkle.rs diff --git a/coordinator/tributary/src/provided.rs b/coordinator/tributary-sdk/src/provided.rs similarity index 100% rename from coordinator/tributary/src/provided.rs rename to coordinator/tributary-sdk/src/provided.rs diff --git a/coordinator/tributary/src/tendermint/mod.rs b/coordinator/tributary-sdk/src/tendermint/mod.rs similarity index 100% rename from coordinator/tributary/src/tendermint/mod.rs rename to coordinator/tributary-sdk/src/tendermint/mod.rs diff --git a/coordinator/tributary/src/tendermint/tx.rs b/coordinator/tributary-sdk/src/tendermint/tx.rs similarity index 100% rename from coordinator/tributary/src/tendermint/tx.rs rename to coordinator/tributary-sdk/src/tendermint/tx.rs diff --git a/coordinator/tributary/src/tests/block.rs b/coordinator/tributary-sdk/src/tests/block.rs similarity index 100% rename from coordinator/tributary/src/tests/block.rs rename to coordinator/tributary-sdk/src/tests/block.rs diff --git a/coordinator/tributary/src/tests/blockchain.rs b/coordinator/tributary-sdk/src/tests/blockchain.rs similarity index 100% rename from coordinator/tributary/src/tests/blockchain.rs rename to coordinator/tributary-sdk/src/tests/blockchain.rs diff --git a/coordinator/tributary/src/tests/mempool.rs b/coordinator/tributary-sdk/src/tests/mempool.rs similarity index 100% rename from coordinator/tributary/src/tests/mempool.rs rename to coordinator/tributary-sdk/src/tests/mempool.rs diff --git a/coordinator/tributary/src/tests/merkle.rs b/coordinator/tributary-sdk/src/tests/merkle.rs similarity index 100% rename from coordinator/tributary/src/tests/merkle.rs rename to coordinator/tributary-sdk/src/tests/merkle.rs diff --git a/coordinator/tributary/src/tests/mod.rs b/coordinator/tributary-sdk/src/tests/mod.rs similarity index 100% rename from coordinator/tributary/src/tests/mod.rs rename to coordinator/tributary-sdk/src/tests/mod.rs diff --git a/coordinator/tributary/src/tests/p2p.rs b/coordinator/tributary-sdk/src/tests/p2p.rs similarity index 100% rename from coordinator/tributary/src/tests/p2p.rs rename to coordinator/tributary-sdk/src/tests/p2p.rs diff --git a/coordinator/tributary/src/tests/tendermint.rs b/coordinator/tributary-sdk/src/tests/tendermint.rs similarity index 100% rename from coordinator/tributary/src/tests/tendermint.rs rename to coordinator/tributary-sdk/src/tests/tendermint.rs diff --git a/coordinator/tributary/src/tests/transaction/mod.rs b/coordinator/tributary-sdk/src/tests/transaction/mod.rs similarity index 100% rename from coordinator/tributary/src/tests/transaction/mod.rs rename to coordinator/tributary-sdk/src/tests/transaction/mod.rs diff --git a/coordinator/tributary/src/tests/transaction/signed.rs b/coordinator/tributary-sdk/src/tests/transaction/signed.rs similarity index 100% rename from coordinator/tributary/src/tests/transaction/signed.rs rename to coordinator/tributary-sdk/src/tests/transaction/signed.rs diff --git a/coordinator/tributary/src/tests/transaction/tendermint.rs b/coordinator/tributary-sdk/src/tests/transaction/tendermint.rs similarity index 100% rename from coordinator/tributary/src/tests/transaction/tendermint.rs rename to coordinator/tributary-sdk/src/tests/transaction/tendermint.rs diff --git a/coordinator/tributary-sdk/src/transaction.rs b/coordinator/tributary-sdk/src/transaction.rs new file mode 100644 index 00000000..d7ff4092 --- /dev/null +++ b/coordinator/tributary-sdk/src/transaction.rs @@ -0,0 +1,218 @@ +use core::fmt::Debug; +use std::io; + +use zeroize::Zeroize; +use thiserror::Error; + +use blake2::{Digest, Blake2b512}; + +use ciphersuite::{ + group::{Group, GroupEncoding}, + Ciphersuite, Ristretto, +}; +use schnorr::SchnorrSignature; + +use crate::{TRANSACTION_SIZE_LIMIT, ReadWrite}; + +#[derive(Clone, PartialEq, Eq, Debug, Error)] +pub enum TransactionError { + /// Transaction exceeded the size limit. + #[error("transaction is too large")] + TooLargeTransaction, + /// Transaction's signer isn't a participant. + #[error("invalid signer")] + InvalidSigner, + /// Transaction's nonce isn't the prior nonce plus one. + #[error("invalid nonce")] + InvalidNonce, + /// Transaction's signature is invalid. + #[error("invalid signature")] + InvalidSignature, + /// Transaction's content is invalid. + #[error("transaction content is invalid")] + InvalidContent, + /// Transaction's signer has too many transactions in the mempool. + #[error("signer has too many transactions in the mempool")] + TooManyInMempool, + /// Provided Transaction added to mempool. + #[error("provided transaction added to mempool")] + ProvidedAddedToMempool, +} + +/// Data for a signed transaction. +#[derive(Clone, PartialEq, Eq, Debug)] +pub struct Signed { + pub signer: ::G, + pub nonce: u32, + pub signature: SchnorrSignature, +} + +impl ReadWrite for Signed { + fn read(reader: &mut R) -> io::Result { + let signer = Ristretto::read_G(reader)?; + + let mut nonce = [0; 4]; + reader.read_exact(&mut nonce)?; + let nonce = u32::from_le_bytes(nonce); + if nonce >= (u32::MAX - 1) { + Err(io::Error::other("nonce exceeded limit"))?; + } + + let mut signature = SchnorrSignature::::read(reader)?; + if signature.R.is_identity().into() { + // Anyone malicious could remove this and try to find zero signatures + // We should never produce zero signatures though meaning this should never come up + // If it does somehow come up, this is a decent courtesy + signature.zeroize(); + Err(io::Error::other("signature nonce was identity"))?; + } + + Ok(Signed { signer, nonce, signature }) + } + + fn write(&self, writer: &mut W) -> io::Result<()> { + // This is either an invalid signature or a private key leak + if self.signature.R.is_identity().into() { + Err(io::Error::other("signature nonce was identity"))?; + } + writer.write_all(&self.signer.to_bytes())?; + writer.write_all(&self.nonce.to_le_bytes())?; + self.signature.write(writer) + } +} + +impl Signed { + pub fn read_without_nonce(reader: &mut R, nonce: u32) -> io::Result { + let signer = Ristretto::read_G(reader)?; + + let mut signature = SchnorrSignature::::read(reader)?; + if signature.R.is_identity().into() { + // Anyone malicious could remove this and try to find zero signatures + // We should never produce zero signatures though meaning this should never come up + // If it does somehow come up, this is a decent courtesy + signature.zeroize(); + Err(io::Error::other("signature nonce was identity"))?; + } + + Ok(Signed { signer, nonce, signature }) + } + + pub fn write_without_nonce(&self, writer: &mut W) -> io::Result<()> { + // This is either an invalid signature or a private key leak + if self.signature.R.is_identity().into() { + Err(io::Error::other("signature nonce was identity"))?; + } + writer.write_all(&self.signer.to_bytes())?; + self.signature.write(writer) + } +} + +#[allow(clippy::large_enum_variant)] +#[derive(Clone, PartialEq, Eq, Debug)] +pub enum TransactionKind { + /// This transaction should be provided by every validator, in an exact order. + /// + /// The contained static string names the orderer to use. This allows two distinct provided + /// transaction kinds, without a synchronized order, to be ordered within their own kind without + /// requiring ordering with each other. + /// + /// The only malleability is in when this transaction appears on chain. The block producer will + /// include it when they have it. Block verification will fail for validators without it. + /// + /// If a supermajority of validators produce a commit for a block with a provided transaction + /// which isn't locally held, the block will be added to the local chain. When the transaction is + /// locally provided, it will be compared for correctness to the on-chain version + /// + /// In order to ensure TXs aren't accidentally provided multiple times, all provided transactions + /// must have a unique hash which is also unique to all Unsigned transactions. + Provided(&'static str), + + /// An unsigned transaction, only able to be included by the block producer. + /// + /// Once an Unsigned transaction is included on-chain, it may not be included again. In order to + /// have multiple Unsigned transactions with the same values included on-chain, some distinct + /// nonce must be included in order to cause a distinct hash. + /// + /// The hash must also be unique with all Provided transactions. + Unsigned, + + /// A signed transaction. + Signed(Vec, Signed), +} + +// TODO: Should this be renamed TransactionTrait now that a literal Transaction exists? +// Or should the literal Transaction be renamed to Event? +pub trait Transaction: 'static + Send + Sync + Clone + Eq + Debug + ReadWrite { + /// Return what type of transaction this is. + fn kind(&self) -> TransactionKind; + + /// Return the hash of this transaction. + /// + /// The hash must NOT commit to the signature. + fn hash(&self) -> [u8; 32]; + + /// Perform transaction-specific verification. + fn verify(&self) -> Result<(), TransactionError>; + + /// Obtain the challenge for this transaction's signature. + /// + /// Do not override this unless you know what you're doing. + /// + /// Panics if called on non-signed transactions. + fn sig_hash(&self, genesis: [u8; 32]) -> ::F { + match self.kind() { + TransactionKind::Signed(order, Signed { signature, .. }) => { + ::F::from_bytes_mod_order_wide( + &Blake2b512::digest( + [ + b"Tributary Signed Transaction", + genesis.as_ref(), + &self.hash(), + order.as_ref(), + signature.R.to_bytes().as_ref(), + ] + .concat(), + ) + .into(), + ) + } + _ => panic!("sig_hash called on non-signed transaction"), + } + } +} + +pub trait GAIN: FnMut(&::G, &[u8]) -> Option {} +impl::G, &[u8]) -> Option> GAIN for F {} + +pub(crate) fn verify_transaction( + tx: &T, + genesis: [u8; 32], + get_and_increment_nonce: &mut F, +) -> Result<(), TransactionError> { + if tx.serialize().len() > TRANSACTION_SIZE_LIMIT { + Err(TransactionError::TooLargeTransaction)?; + } + + tx.verify()?; + + match tx.kind() { + TransactionKind::Provided(_) | TransactionKind::Unsigned => {} + TransactionKind::Signed(order, Signed { signer, nonce, signature }) => { + if let Some(next_nonce) = get_and_increment_nonce(&signer, &order) { + if nonce != next_nonce { + Err(TransactionError::InvalidNonce)?; + } + } else { + // Not a participant + Err(TransactionError::InvalidSigner)?; + } + + // TODO: Use a batch verification here + if !signature.verify(signer, tx.sig_hash(genesis)) { + Err(TransactionError::InvalidSignature)?; + } + } + } + + Ok(()) +} diff --git a/coordinator/tributary/tendermint/Cargo.toml b/coordinator/tributary-sdk/tendermint/Cargo.toml similarity index 100% rename from coordinator/tributary/tendermint/Cargo.toml rename to coordinator/tributary-sdk/tendermint/Cargo.toml diff --git a/coordinator/tributary/tendermint/LICENSE b/coordinator/tributary-sdk/tendermint/LICENSE similarity index 100% rename from coordinator/tributary/tendermint/LICENSE rename to coordinator/tributary-sdk/tendermint/LICENSE diff --git a/coordinator/tributary/tendermint/README.md b/coordinator/tributary-sdk/tendermint/README.md similarity index 100% rename from coordinator/tributary/tendermint/README.md rename to coordinator/tributary-sdk/tendermint/README.md diff --git a/coordinator/tributary/tendermint/src/block.rs b/coordinator/tributary-sdk/tendermint/src/block.rs similarity index 100% rename from coordinator/tributary/tendermint/src/block.rs rename to coordinator/tributary-sdk/tendermint/src/block.rs diff --git a/coordinator/tributary/tendermint/src/ext.rs b/coordinator/tributary-sdk/tendermint/src/ext.rs similarity index 100% rename from coordinator/tributary/tendermint/src/ext.rs rename to coordinator/tributary-sdk/tendermint/src/ext.rs diff --git a/coordinator/tributary/tendermint/src/lib.rs b/coordinator/tributary-sdk/tendermint/src/lib.rs similarity index 100% rename from coordinator/tributary/tendermint/src/lib.rs rename to coordinator/tributary-sdk/tendermint/src/lib.rs diff --git a/coordinator/tributary/tendermint/src/message_log.rs b/coordinator/tributary-sdk/tendermint/src/message_log.rs similarity index 100% rename from coordinator/tributary/tendermint/src/message_log.rs rename to coordinator/tributary-sdk/tendermint/src/message_log.rs diff --git a/coordinator/tributary/tendermint/src/round.rs b/coordinator/tributary-sdk/tendermint/src/round.rs similarity index 100% rename from coordinator/tributary/tendermint/src/round.rs rename to coordinator/tributary-sdk/tendermint/src/round.rs diff --git a/coordinator/tributary/tendermint/src/time.rs b/coordinator/tributary-sdk/tendermint/src/time.rs similarity index 100% rename from coordinator/tributary/tendermint/src/time.rs rename to coordinator/tributary-sdk/tendermint/src/time.rs diff --git a/coordinator/tributary/tendermint/tests/ext.rs b/coordinator/tributary-sdk/tendermint/tests/ext.rs similarity index 100% rename from coordinator/tributary/tendermint/tests/ext.rs rename to coordinator/tributary-sdk/tendermint/tests/ext.rs diff --git a/coordinator/tributary/Cargo.toml b/coordinator/tributary/Cargo.toml index d88c3b33..3e374bc0 100644 --- a/coordinator/tributary/Cargo.toml +++ b/coordinator/tributary/Cargo.toml @@ -1,11 +1,13 @@ [package] -name = "tributary-chain" +name = "serai-coordinator-tributary" version = "0.1.0" -description = "A micro-blockchain to provide consensus and ordering to P2P communication" +description = "The Tributary used by the Serai Coordinator" license = "AGPL-3.0-only" repository = "https://github.com/serai-dex/serai/tree/develop/coordinator/tributary" authors = ["Luke Parker "] +keywords = [] edition = "2021" +publish = false rust-version = "1.81" [package.metadata.docs.rs] @@ -16,34 +18,29 @@ rustdoc-args = ["--cfg", "docsrs"] workspace = true [dependencies] -thiserror = { version = "2", default-features = false, features = ["std"] } - -subtle = { version = "^2", default-features = false, features = ["std"] } zeroize = { version = "^1.5", default-features = false, features = ["std"] } - -rand = { version = "0.8", default-features = false, features = ["std"] } -rand_chacha = { version = "0.3", default-features = false, features = ["std"] } +rand_core = { version = "0.6", default-features = false, features = ["std"] } blake2 = { version = "0.10", default-features = false, features = ["std"] } -transcript = { package = "flexible-transcript", path = "../../crypto/transcript", default-features = false, features = ["std", "recommended"] } - -ciphersuite = { package = "ciphersuite", path = "../../crypto/ciphersuite", default-features = false, features = ["std", "ristretto"] } +ciphersuite = { path = "../../crypto/ciphersuite", default-features = false, features = ["std"] } schnorr = { package = "schnorr-signatures", path = "../../crypto/schnorr", default-features = false, features = ["std"] } -hex = { version = "0.4", default-features = false, features = ["std"] } -log = { version = "0.4", default-features = false, features = ["std"] } +scale = { package = "parity-scale-codec", version = "3", default-features = false, features = ["std", "derive"] } +borsh = { version = "1", default-features = false, features = ["std", "derive", "de_strict_order"] } + +serai-client = { path = "../../substrate/client", default-features = false, features = ["serai", "borsh"] } serai-db = { path = "../../common/db" } +serai-task = { path = "../../common/task", version = "0.1" } -scale = { package = "parity-scale-codec", version = "3", default-features = false, features = ["std", "derive"] } -futures-util = { version = "0.3", default-features = false, features = ["std", "sink", "channel"] } -futures-channel = { version = "0.3", default-features = false, features = ["std", "sink"] } -tendermint = { package = "tendermint-machine", path = "./tendermint" } +tributary-sdk = { path = "../tributary-sdk" } -tokio = { version = "1", default-features = false, features = ["sync", "time", "rt"] } +serai-cosign = { path = "../cosign" } +serai-coordinator-substrate = { path = "../substrate" } -[dev-dependencies] -tokio = { version = "1", features = ["macros"] } +messages = { package = "serai-processor-messages", path = "../../processor/messages" } + +log = { version = "0.4", default-features = false, features = ["std"] } [features] -tests = [] +longer-reattempts = [] diff --git a/coordinator/tributary/LICENSE b/coordinator/tributary/LICENSE index f684d027..621233a9 100644 --- a/coordinator/tributary/LICENSE +++ b/coordinator/tributary/LICENSE @@ -1,6 +1,6 @@ AGPL-3.0-only license -Copyright (c) 2023 Luke Parker +Copyright (c) 2023-2025 Luke Parker This program is free software: you can redistribute it and/or modify it under the terms of the GNU Affero General Public License Version 3 as diff --git a/coordinator/tributary/README.md b/coordinator/tributary/README.md index 6fce976e..384b8f97 100644 --- a/coordinator/tributary/README.md +++ b/coordinator/tributary/README.md @@ -1,3 +1,4 @@ -# Tributary +# Serai Coordinator Tributary -A verifiable, ordered broadcast layer implemented as a BFT micro-blockchain. +The Tributary used by the Serai Coordinator. This includes the `Transaction` +definition and the code to handle blocks added on-chain. diff --git a/coordinator/src/tributary/db.rs b/coordinator/tributary/src/db.rs similarity index 97% rename from coordinator/src/tributary/db.rs rename to coordinator/tributary/src/db.rs index 99fbe69a..87567846 100644 --- a/coordinator/src/tributary/db.rs +++ b/coordinator/tributary/src/db.rs @@ -9,7 +9,7 @@ use messages::sign::{VariantSignId, SignId}; use serai_db::*; -use crate::tributary::transaction::SigningProtocolRound; +use crate::transaction::SigningProtocolRound; /// A topic within the database which the group participates in #[derive(Clone, Copy, PartialEq, Eq, Debug, Encode, BorshSerialize, BorshDeserialize)] @@ -167,6 +167,9 @@ impl Topic { } } +pub(crate) trait Borshy: BorshSerialize + BorshDeserialize {} +impl Borshy for T {} + /// The resulting data set from an accumulation pub(crate) enum DataSet { /// Accumulating this did not produce a data set to act on @@ -176,9 +179,6 @@ pub(crate) enum DataSet { Participating(HashMap), } -trait Borshy: BorshSerialize + BorshDeserialize {} -impl Borshy for T {} - create_db!( CoordinatorTributary { // The last handled tributary block's (number, hash) @@ -389,12 +389,12 @@ impl TributaryDb { // 5 minutes #[cfg(not(feature = "longer-reattempts"))] const BASE_REATTEMPT_DELAY: u32 = - (5u32 * 60 * 1000).div_ceil(tributary::tendermint::TARGET_BLOCK_TIME); + (5u32 * 60 * 1000).div_ceil(tributary_sdk::tendermint::TARGET_BLOCK_TIME); // 10 minutes, intended for latent environments like the GitHub CI #[cfg(feature = "longer-reattempts")] const BASE_REATTEMPT_DELAY: u32 = - (10u32 * 60 * 1000).div_ceil(tributary::tendermint::TARGET_BLOCK_TIME); + (10u32 * 60 * 1000).div_ceil(tributary_sdk::tendermint::TARGET_BLOCK_TIME); // Linearly scale the time for the protocol with the attempt number let blocks_till_reattempt = u64::from(attempt * BASE_REATTEMPT_DELAY); @@ -446,11 +446,4 @@ impl TributaryDb { ) { ProcessorMessages::send(txn, set, &message.into()); } - - pub(crate) fn try_recv_message( - txn: &mut impl DbTxn, - set: ValidatorSet, - ) -> Option { - ProcessorMessages::try_recv(txn, set) - } } diff --git a/coordinator/tributary/src/lib.rs b/coordinator/tributary/src/lib.rs index 2e4a6115..9b059820 100644 --- a/coordinator/tributary/src/lib.rs +++ b/coordinator/tributary/src/lib.rs @@ -1,388 +1,513 @@ -use core::{marker::PhantomData, fmt::Debug, future::Future}; -use std::{sync::Arc, io}; +#![cfg_attr(docsrs, feature(doc_auto_cfg))] +#![doc = include_str!("../README.md")] +#![deny(missing_docs)] -use zeroize::Zeroizing; +use core::{marker::PhantomData, future::Future}; +use std::collections::HashMap; -use ciphersuite::{Ciphersuite, Ristretto}; +use ciphersuite::group::GroupEncoding; -use scale::Decode; -use futures_channel::mpsc::UnboundedReceiver; -use futures_util::{StreamExt, SinkExt}; -use ::tendermint::{ - ext::{BlockNumber, Commit, Block as BlockTrait, Network}, - SignedMessageFor, SyncedBlock, SyncedBlockSender, SyncedBlockResultReceiver, MessageSender, - TendermintMachine, TendermintHandle, +use serai_client::{ + primitives::SeraiAddress, + validator_sets::primitives::{ValidatorSet, Slash}, }; -pub use ::tendermint::Evidence; +use serai_db::*; +use serai_task::ContinuallyRan; -use serai_db::Db; +use tributary_sdk::{ + tendermint::{ + tx::{TendermintTx, Evidence, decode_signed_message}, + TendermintNetwork, + }, + Signed as TributarySigned, TransactionKind, TransactionTrait, + Transaction as TributaryTransaction, Block, TributaryReader, P2p, +}; -use tokio::sync::RwLock; +use serai_cosign::Cosigning; +use serai_coordinator_substrate::NewSetInformation; -mod merkle; -pub(crate) use merkle::*; +use messages::sign::VariantSignId; -pub mod transaction; -pub use transaction::{TransactionError, Signed, TransactionKind, Transaction as TransactionTrait}; +mod transaction; +pub(crate) use transaction::{SigningProtocolRound, Signed}; +pub use transaction::Transaction; -use crate::tendermint::tx::TendermintTx; +mod db; +use db::*; -mod provided; -pub(crate) use provided::*; -pub use provided::ProvidedError; - -mod block; -pub use block::*; - -mod blockchain; -pub(crate) use blockchain::*; - -mod mempool; -pub(crate) use mempool::*; - -pub mod tendermint; -pub(crate) use crate::tendermint::*; - -#[cfg(any(test, feature = "tests"))] -pub mod tests; - -/// Size limit for an individual transaction. -// This needs to be big enough to participate in a 101-of-150 eVRF DKG with each element taking -// `MAX_KEY_LEN`. This also needs to be big enough to pariticpate in signing 520 Bitcoin inputs -// with 49 key shares, and signing 120 Monero inputs with 49 key shares. -// TODO: Add a test for these properties -pub const TRANSACTION_SIZE_LIMIT: usize = 2_000_000; -/// Amount of transactions a single account may have in the mempool. -pub const ACCOUNT_MEMPOOL_LIMIT: u32 = 50; -/// Block size limit. -// This targets a growth limit of roughly 30 GB a day, under load, in order to prevent a malicious -// participant from flooding disks and causing out of space errors in order processes. -pub const BLOCK_SIZE_LIMIT: usize = 2_001_000; - -pub(crate) const TENDERMINT_MESSAGE: u8 = 0; -pub(crate) const TRANSACTION_MESSAGE: u8 = 1; - -#[allow(clippy::large_enum_variant)] -#[derive(Clone, PartialEq, Eq, Debug)] -pub enum Transaction { - Tendermint(TendermintTx), - Application(T), +/// Messages to send to the Processors. +pub struct ProcessorMessages; +impl ProcessorMessages { + /// Try to receive a message to send to a Processor. + pub fn try_recv(txn: &mut impl DbTxn, set: ValidatorSet) -> Option { + db::ProcessorMessages::try_recv(txn, set) + } } -impl ReadWrite for Transaction { - fn read(reader: &mut R) -> io::Result { - let mut kind = [0]; - reader.read_exact(&mut kind)?; - match kind[0] { - 0 => { - let tx = TendermintTx::read(reader)?; - Ok(Transaction::Tendermint(tx)) - } - 1 => { - let tx = T::read(reader)?; - Ok(Transaction::Application(tx)) - } - _ => Err(io::Error::other("invalid transaction type")), +struct ScanBlock<'a, CD: Db, TD: Db, TDT: DbTxn, P: P2p> { + _td: PhantomData, + _p2p: PhantomData

, + cosign_db: &'a CD, + tributary_txn: &'a mut TDT, + set: ValidatorSet, + validators: &'a [SeraiAddress], + total_weight: u64, + validator_weights: &'a HashMap, +} +impl<'a, CD: Db, TD: Db, TDT: DbTxn, P: P2p> ScanBlock<'a, CD, TD, TDT, P> { + fn potentially_start_cosign(&mut self) { + // Don't start a new cosigning instance if we're actively running one + if TributaryDb::actively_cosigning(self.tributary_txn, self.set).is_some() { + return; } - } - fn write(&self, writer: &mut W) -> io::Result<()> { - match self { - Transaction::Tendermint(tx) => { - writer.write_all(&[0])?; - tx.write(writer) - } - Transaction::Application(tx) => { - writer.write_all(&[1])?; - tx.write(writer) - } - } - } -} -impl Transaction { - pub fn hash(&self) -> [u8; 32] { - match self { - Transaction::Tendermint(tx) => tx.hash(), - Transaction::Application(tx) => tx.hash(), - } - } - - pub fn kind(&self) -> TransactionKind { - match self { - Transaction::Tendermint(tx) => tx.kind(), - Transaction::Application(tx) => tx.kind(), - } - } -} - -/// An item which can be read and written. -pub trait ReadWrite: Sized { - fn read(reader: &mut R) -> io::Result; - fn write(&self, writer: &mut W) -> io::Result<()>; - - fn serialize(&self) -> Vec { - // BlockHeader is 64 bytes and likely the smallest item in this system - let mut buf = Vec::with_capacity(64); - self.write(&mut buf).unwrap(); - buf - } -} - -pub trait P2p: 'static + Send + Sync + Clone { - /// Broadcast a message to all other members of the Tributary with the specified genesis. - /// - /// The Tributary will re-broadcast consensus messages on a fixed interval to ensure they aren't - /// prematurely dropped from the P2P layer. THe P2P layer SHOULD perform content-based - /// deduplication to ensure a sane amount of load. - fn broadcast(&self, genesis: [u8; 32], msg: Vec) -> impl Send + Future; -} - -impl P2p for Arc

{ - fn broadcast(&self, genesis: [u8; 32], msg: Vec) -> impl Send + Future { - P::broadcast(self, genesis, msg) - } -} - -#[derive(Clone)] -pub struct Tributary { - db: D, - - genesis: [u8; 32], - network: TendermintNetwork, - - synced_block: Arc>>>, - synced_block_result: Arc>, - messages: Arc>>>, -} - -impl Tributary { - pub async fn new( - db: D, - genesis: [u8; 32], - start_time: u64, - key: Zeroizing<::F>, - validators: Vec<(::G, u64)>, - p2p: P, - ) -> Option { - log::info!("new Tributary with genesis {}", hex::encode(genesis)); - - let validators_vec = validators.iter().map(|validator| validator.0).collect::>(); - - let signer = Arc::new(Signer::new(genesis, key)); - let validators = Arc::new(Validators::new(genesis, validators)?); - - let mut blockchain = Blockchain::new(db.clone(), genesis, &validators_vec); - let block_number = BlockNumber(blockchain.block_number()); - - let start_time = if let Some(commit) = blockchain.commit(&blockchain.tip()) { - Commit::::decode(&mut commit.as_ref()).unwrap().end_time - } else { - start_time + // Fetch the latest intended-to-be-cosigned block + let Some(latest_substrate_block_to_cosign) = + TributaryDb::latest_substrate_block_to_cosign(self.tributary_txn, self.set) + else { + return; }; - let proposal = TendermintBlock( - blockchain.build_block::>(&validators).serialize(), + + // If it was already cosigned, return + if TributaryDb::cosigned(self.tributary_txn, self.set, latest_substrate_block_to_cosign) { + return; + } + + let Some(substrate_block_number) = + Cosigning::::finalized_block_number(self.cosign_db, latest_substrate_block_to_cosign) + else { + // This is a valid panic as we shouldn't be scanning this block if we didn't provide all + // Provided transactions within it, and the block to cosign is a Provided transaction + panic!("cosigning a block our cosigner didn't index") + }; + + // Mark us as actively cosigning + TributaryDb::start_cosigning( + self.tributary_txn, + self.set, + latest_substrate_block_to_cosign, + substrate_block_number, ); - let blockchain = Arc::new(RwLock::new(blockchain)); - - let network = TendermintNetwork { genesis, signer, validators, blockchain, p2p }; - - let TendermintHandle { synced_block, synced_block_result, messages, machine } = - TendermintMachine::new( - db.clone(), - network.clone(), - genesis, - block_number, - start_time, - proposal, - ) - .await; - tokio::spawn(machine.run()); - - Some(Self { - db, - genesis, - network, - synced_block: Arc::new(RwLock::new(synced_block)), - synced_block_result: Arc::new(RwLock::new(synced_block_result)), - messages: Arc::new(RwLock::new(messages)), - }) - } - - pub fn block_time() -> u32 { - TendermintNetwork::::block_time() - } - - pub fn genesis(&self) -> [u8; 32] { - self.genesis - } - - pub async fn block_number(&self) -> u64 { - self.network.blockchain.read().await.block_number() - } - pub async fn tip(&self) -> [u8; 32] { - self.network.blockchain.read().await.tip() - } - - pub fn reader(&self) -> TributaryReader { - TributaryReader(self.db.clone(), self.genesis, PhantomData) - } - - pub async fn provide_transaction(&self, tx: T) -> Result<(), ProvidedError> { - self.network.blockchain.write().await.provide_transaction(tx) - } - - pub async fn next_nonce( - &self, - signer: &::G, - order: &[u8], - ) -> Option { - self.network.blockchain.read().await.next_nonce(signer, order) - } - - // Returns Ok(true) if new, Ok(false) if an already present unsigned, or the error. - // Safe to be &self since the only meaningful usage of self is self.network.blockchain which - // successfully acquires its own write lock - pub async fn add_transaction(&self, tx: T) -> Result { - let tx = Transaction::Application(tx); - let mut to_broadcast = vec![TRANSACTION_MESSAGE]; - tx.write(&mut to_broadcast).unwrap(); - let res = self.network.blockchain.write().await.add_transaction::>( - true, - tx, - &self.network.signature_scheme(), + // Send the message for the processor to start signing + TributaryDb::send_message( + self.tributary_txn, + self.set, + messages::coordinator::CoordinatorMessage::CosignSubstrateBlock { + session: self.set.session, + block_number: substrate_block_number, + block: latest_substrate_block_to_cosign, + }, ); - if res == Ok(true) { - self.network.p2p.broadcast(self.genesis, to_broadcast).await; - } - res } + fn handle_application_tx(&mut self, block_number: u64, tx: Transaction) { + let signer = |signed: Signed| SeraiAddress(signed.signer().to_bytes()); - async fn sync_block_internal( - &self, - block: Block, - commit: Vec, - result: &mut UnboundedReceiver, - ) -> bool { - let (tip, block_number) = { - let blockchain = self.network.blockchain.read().await; - (blockchain.tip(), blockchain.block_number()) - }; - - if block.header.parent != tip { - log::debug!("told to sync a block whose parent wasn't our tip"); - return false; + if let TransactionKind::Signed(_, TributarySigned { signer, .. }) = tx.kind() { + // Don't handle transactions from those fatally slashed + // TODO: The fact they can publish these TXs makes this a notable spam vector + if TributaryDb::is_fatally_slashed( + self.tributary_txn, + self.set, + SeraiAddress(signer.to_bytes()), + ) { + return; + } } - let block = TendermintBlock(block.serialize()); - let mut commit_ref = commit.as_ref(); - let Ok(commit) = Commit::>::decode(&mut commit_ref) else { - log::error!("sent an invalidly serialized commit"); - return false; - }; - // Storage DoS vector. We *could* truncate to solely the relevant portion, trying to save this, - // yet then we'd have to test the truncation was performed correctly. - if !commit_ref.is_empty() { - log::error!("sent an commit with additional data after it"); - return false; - } - if !self.network.verify_commit(block.id(), &commit) { - log::error!("sent an invalid commit"); - return false; - } + match tx { + // Accumulate this vote and fatally slash the participant if past the threshold + Transaction::RemoveParticipant { participant, signed } => { + let signer = signer(signed); - let number = BlockNumber(block_number + 1); - self.synced_block.write().await.send(SyncedBlock { number, block, commit }).await.unwrap(); - result.next().await.unwrap() - } - - // Sync a block. - // TODO: Since we have a static validator set, we should only need the tail commit? - pub async fn sync_block(&self, block: Block, commit: Vec) -> bool { - let mut result = self.synced_block_result.write().await; - self.sync_block_internal(block, commit, &mut result).await - } - - // Return true if the message should be rebroadcasted. - pub async fn handle_message(&self, msg: &[u8]) -> bool { - match msg.first() { - Some(&TRANSACTION_MESSAGE) => { - let Ok(tx) = Transaction::read::<&[u8]>(&mut &msg[1 ..]) else { - log::error!("received invalid transaction message"); - return false; - }; - - // TODO: Sync mempools with fellow peers - // Can we just rebroadcast transactions not included for at least two blocks? - let res = - self.network.blockchain.write().await.add_transaction::>( - false, - tx, - &self.network.signature_scheme(), + // Check the participant voted to be removed actually exists + if !self.validators.iter().any(|validator| *validator == participant) { + TributaryDb::fatal_slash( + self.tributary_txn, + self.set, + signer, + "voted to remove non-existent participant", ); - log::debug!("received transaction message. valid new transaction: {res:?}"); - res == Ok(true) - } + return; + } - Some(&TENDERMINT_MESSAGE) => { - let Ok(msg) = - SignedMessageFor::>::decode::<&[u8]>(&mut &msg[1 ..]) - else { - log::error!("received invalid tendermint message"); - return false; + match TributaryDb::accumulate( + self.tributary_txn, + self.set, + self.validators, + self.total_weight, + block_number, + Topic::RemoveParticipant { participant }, + signer, + self.validator_weights[&signer], + &(), + ) { + DataSet::None => {} + DataSet::Participating(_) => { + TributaryDb::fatal_slash(self.tributary_txn, self.set, participant, "voted to remove"); + } }; - - self.messages.write().await.send(msg).await.unwrap(); - false } - _ => false, + // Send the participation to the processor + Transaction::DkgParticipation { participation, signed } => { + TributaryDb::send_message( + self.tributary_txn, + self.set, + messages::key_gen::CoordinatorMessage::Participation { + session: self.set.session, + participant: todo!("TODO"), + participation, + }, + ); + } + Transaction::DkgConfirmationPreprocess { attempt, preprocess, signed } => { + // Accumulate the preprocesses into our own FROST attempt manager + todo!("TODO") + } + Transaction::DkgConfirmationShare { attempt, share, signed } => { + // Accumulate the shares into our own FROST attempt manager + todo!("TODO") + } + + Transaction::Cosign { substrate_block_hash } => { + // Update the latest intended-to-be-cosigned Substrate block + TributaryDb::set_latest_substrate_block_to_cosign( + self.tributary_txn, + self.set, + substrate_block_hash, + ); + // Start a new cosign if we aren't already working on one + self.potentially_start_cosign(); + } + Transaction::Cosigned { substrate_block_hash } => { + /* + We provide one Cosigned per Cosign transaction, but they have independent orders. This + means we may receive Cosigned before Cosign. In order to ensure we only start work on + not-yet-Cosigned cosigns, we flag all cosigned blocks as cosigned. Then, when we choose + the next block to work on, we won't if it's already been cosigned. + */ + TributaryDb::mark_cosigned(self.tributary_txn, self.set, substrate_block_hash); + + // If we aren't actively cosigning this block, return + // This occurs when we have Cosign TXs A, B, C, we received Cosigned for A and start on C, + // and then receive Cosigned for B + if TributaryDb::actively_cosigning(self.tributary_txn, self.set) != + Some(substrate_block_hash) + { + return; + } + + // Since this is the block we were cosigning, mark us as having finished cosigning + TributaryDb::finish_cosigning(self.tributary_txn, self.set); + + // Start working on the next cosign + self.potentially_start_cosign(); + } + Transaction::SubstrateBlock { hash } => { + // Whitelist all of the IDs this Substrate block causes to be signed + todo!("TODO") + } + Transaction::Batch { hash } => { + // Whitelist the signing of this batch, publishing our own preprocess + todo!("TODO") + } + + Transaction::SlashReport { slash_points, signed } => { + let signer = signer(signed); + + if slash_points.len() != self.validators.len() { + TributaryDb::fatal_slash( + self.tributary_txn, + self.set, + signer, + "slash report was for a distinct amount of signers", + ); + return; + } + + // Accumulate, and if past the threshold, calculate *the* slash report and start signing it + match TributaryDb::accumulate( + self.tributary_txn, + self.set, + self.validators, + self.total_weight, + block_number, + Topic::SlashReport, + signer, + self.validator_weights[&signer], + &slash_points, + ) { + DataSet::None => {} + DataSet::Participating(data_set) => { + // Find the median reported slashes for this validator + /* + TODO: This lets 34% perform a fatal slash. That shouldn't be allowed. We need + to accept slash reports for a period past the threshold, and only fatally slash if we + have a supermajority agree the slash should be fatal. If there isn't a supermajority, + but the median believe the slash should be fatal, we need to fallback to a large + constant. + + Also, TODO, each slash point should probably be considered as + `MAX_KEY_SHARES_PER_SET * BLOCK_TIME` seconds of downtime. As this time crosses + various thresholds (1 day, 3 days, etc), a multiplier should be attached. + */ + let mut median_slash_report = Vec::with_capacity(self.validators.len()); + for i in 0 .. self.validators.len() { + let mut this_validator = + data_set.values().map(|report| report[i]).collect::>(); + this_validator.sort_unstable(); + // Choose the median, where if there are two median values, the lower one is chosen + let median_index = if (this_validator.len() % 2) == 1 { + this_validator.len() / 2 + } else { + (this_validator.len() / 2) - 1 + }; + median_slash_report.push(this_validator[median_index]); + } + + // We only publish slashes for the `f` worst performers to: + // 1) Effect amnesty if there were network disruptions which affected everyone + // 2) Ensure the signing threshold doesn't have a disincentive to do their job + + // Find the worst performer within the signing threshold's slash points + let f = (self.validators.len() - 1) / 3; + let worst_validator_in_supermajority_slash_points = { + let mut sorted_slash_points = median_slash_report.clone(); + sorted_slash_points.sort_unstable(); + // This won't be a valid index if `f == 0`, which means we don't have any validators + // to slash + let index_of_first_validator_to_slash = self.validators.len() - f; + let index_of_worst_validator_in_supermajority = index_of_first_validator_to_slash - 1; + sorted_slash_points[index_of_worst_validator_in_supermajority] + }; + + // Perform the amortization + for slash_points in &mut median_slash_report { + *slash_points = + slash_points.saturating_sub(worst_validator_in_supermajority_slash_points) + } + let amortized_slash_report = median_slash_report; + + // Create the resulting slash report + let mut slash_report = vec![]; + for (validator, points) in self.validators.iter().copied().zip(amortized_slash_report) { + if points != 0 { + slash_report.push(Slash { key: validator.into(), points }); + } + } + assert!(slash_report.len() <= f); + + // Recognize the topic for signing the slash report + TributaryDb::recognize_topic( + self.tributary_txn, + self.set, + Topic::Sign { + id: VariantSignId::SlashReport, + attempt: 0, + round: SigningProtocolRound::Preprocess, + }, + ); + // Send the message for the processor to start signing + TributaryDb::send_message( + self.tributary_txn, + self.set, + messages::coordinator::CoordinatorMessage::SignSlashReport { + session: self.set.session, + report: slash_report, + }, + ); + } + }; + } + + Transaction::Sign { id, attempt, round, data, signed } => { + let topic = Topic::Sign { id, attempt, round }; + let signer = signer(signed); + + if u64::try_from(data.len()).unwrap() != self.validator_weights[&signer] { + TributaryDb::fatal_slash( + self.tributary_txn, + self.set, + signer, + "signer signed with a distinct amount of key shares than they had key shares", + ); + return; + } + + match TributaryDb::accumulate( + self.tributary_txn, + self.set, + self.validators, + self.total_weight, + block_number, + topic, + signer, + self.validator_weights[&signer], + &data, + ) { + DataSet::None => {} + DataSet::Participating(data_set) => { + let id = topic.sign_id(self.set).expect("Topic::Sign didn't have SignId"); + let flatten_data_set = |data_set| todo!("TODO"); + let data_set = flatten_data_set(data_set); + TributaryDb::send_message( + self.tributary_txn, + self.set, + match round { + SigningProtocolRound::Preprocess => { + messages::sign::CoordinatorMessage::Preprocesses { id, preprocesses: data_set } + } + SigningProtocolRound::Share => { + messages::sign::CoordinatorMessage::Shares { id, shares: data_set } + } + }, + ) + } + }; + } } } - /// Get a Future which will resolve once the next block has been added. - pub async fn next_block_notification( - &self, - ) -> impl Send + Sync + core::future::Future> { - let (tx, rx) = tokio::sync::oneshot::channel(); - self.network.blockchain.write().await.next_block_notifications.push_back(tx); - rx + fn handle_block(mut self, block_number: u64, block: Block) { + TributaryDb::start_of_block(self.tributary_txn, self.set, block_number); + + for tx in block.transactions { + match tx { + TributaryTransaction::Tendermint(TendermintTx::SlashEvidence(ev)) => { + // Since the evidence is on the chain, it will have already been validated + // We can just punish the signer + let data = match ev { + Evidence::ConflictingMessages(first, second) => (first, Some(second)), + Evidence::InvalidPrecommit(first) | Evidence::InvalidValidRound(first) => (first, None), + }; + let msgs = ( + decode_signed_message::>(&data.0).unwrap(), + if data.1.is_some() { + Some( + decode_signed_message::>(&data.1.unwrap()) + .unwrap(), + ) + } else { + None + }, + ); + + // Since anything with evidence is fundamentally faulty behavior, not just temporal + // errors, mark the node as fatally slashed + TributaryDb::fatal_slash( + self.tributary_txn, + self.set, + SeraiAddress(msgs.0.msg.sender), + &format!("invalid tendermint messages: {msgs:?}"), + ); + } + TributaryTransaction::Application(tx) => { + self.handle_application_tx(block_number, tx); + } + } + } } } -#[derive(Clone)] -pub struct TributaryReader(D, [u8; 32], PhantomData); -impl TributaryReader { - pub fn genesis(&self) -> [u8; 32] { - self.1 - } +/// The task to scan the Tributary, populating `ProcessorMessages`. +pub struct ScanTributaryTask { + cosign_db: CD, + tributary_db: TD, + set: ValidatorSet, + validators: Vec, + total_weight: u64, + validator_weights: HashMap, + tributary: TributaryReader, + _p2p: PhantomData

, +} - // Since these values are static once set, they can be safely read from the database without lock - // acquisition - pub fn block(&self, hash: &[u8; 32]) -> Option> { - Blockchain::::block_from_db(&self.0, self.1, hash) - } - pub fn commit(&self, hash: &[u8; 32]) -> Option> { - Blockchain::::commit_from_db(&self.0, self.1, hash) - } - pub fn parsed_commit(&self, hash: &[u8; 32]) -> Option> { - self.commit(hash).map(|commit| Commit::::decode(&mut commit.as_ref()).unwrap()) - } - pub fn block_after(&self, hash: &[u8; 32]) -> Option<[u8; 32]> { - Blockchain::::block_after(&self.0, self.1, hash) - } - pub fn time_of_block(&self, hash: &[u8; 32]) -> Option { - self - .commit(hash) - .map(|commit| Commit::::decode(&mut commit.as_ref()).unwrap().end_time) - } +impl ScanTributaryTask { + /// Create a new instance of this task. + pub fn new( + cosign_db: CD, + tributary_db: TD, + new_set: &NewSetInformation, + tributary: TributaryReader, + ) -> Self { + let mut validators = Vec::with_capacity(new_set.validators.len()); + let mut total_weight = 0; + let mut validator_weights = HashMap::with_capacity(new_set.validators.len()); + for (validator, weight) in new_set.validators.iter().copied() { + let validator = SeraiAddress::from(validator); + let weight = u64::from(weight); + validators.push(validator); + total_weight += weight; + validator_weights.insert(validator, weight); + } - pub fn locally_provided_txs_in_block(&self, hash: &[u8; 32], order: &str) -> bool { - Blockchain::::locally_provided_txs_in_block(&self.0, &self.1, hash, order) - } - - // This isn't static, yet can be read with only minor discrepancy risks - pub fn tip(&self) -> [u8; 32] { - Blockchain::::tip_from_db(&self.0, self.1) + ScanTributaryTask { + cosign_db, + tributary_db, + set: new_set.set, + validators, + total_weight, + validator_weights, + tributary, + _p2p: PhantomData, + } + } +} + +impl ContinuallyRan for ScanTributaryTask { + fn run_iteration(&mut self) -> impl Send + Future> { + async move { + let (mut last_block_number, mut last_block_hash) = + TributaryDb::last_handled_tributary_block(&self.tributary_db, self.set) + .unwrap_or((0, self.tributary.genesis())); + + let mut made_progress = false; + while let Some(next) = self.tributary.block_after(&last_block_hash) { + let block = self.tributary.block(&next).unwrap(); + let block_number = last_block_number + 1; + let block_hash = block.hash(); + + // Make sure we have all of the provided transactions for this block + for tx in &block.transactions { + let TransactionKind::Provided(order) = tx.kind() else { + continue; + }; + + // make sure we have all the provided txs in this block locally + if !self.tributary.locally_provided_txs_in_block(&block_hash, order) { + return Err(format!( + "didn't have the provided Transactions on-chain for set (ephemeral error): {:?}", + self.set + )); + } + } + + let mut tributary_txn = self.tributary_db.txn(); + (ScanBlock { + _td: PhantomData::, + _p2p: PhantomData::

, + cosign_db: &self.cosign_db, + tributary_txn: &mut tributary_txn, + set: self.set, + validators: &self.validators, + total_weight: self.total_weight, + validator_weights: &self.validator_weights, + }) + .handle_block(block_number, block); + TributaryDb::set_last_handled_tributary_block( + &mut tributary_txn, + self.set, + block_number, + block_hash, + ); + last_block_number = block_number; + last_block_hash = block_hash; + tributary_txn.commit(); + + made_progress = true; + } + + Ok(made_progress) + } } } diff --git a/coordinator/tributary/src/transaction.rs b/coordinator/tributary/src/transaction.rs index d7ff4092..f9fd016d 100644 --- a/coordinator/tributary/src/transaction.rs +++ b/coordinator/tributary/src/transaction.rs @@ -1,218 +1,353 @@ -use core::fmt::Debug; +use core::{ops::Deref, fmt::Debug}; use std::io; -use zeroize::Zeroize; -use thiserror::Error; - -use blake2::{Digest, Blake2b512}; +use zeroize::Zeroizing; +use rand_core::{RngCore, CryptoRng}; +use blake2::{digest::typenum::U32, Digest, Blake2b}; use ciphersuite::{ - group::{Group, GroupEncoding}, + group::{ff::Field, GroupEncoding}, Ciphersuite, Ristretto, }; use schnorr::SchnorrSignature; -use crate::{TRANSACTION_SIZE_LIMIT, ReadWrite}; +use scale::Encode; +use borsh::{BorshSerialize, BorshDeserialize}; -#[derive(Clone, PartialEq, Eq, Debug, Error)] -pub enum TransactionError { - /// Transaction exceeded the size limit. - #[error("transaction is too large")] - TooLargeTransaction, - /// Transaction's signer isn't a participant. - #[error("invalid signer")] - InvalidSigner, - /// Transaction's nonce isn't the prior nonce plus one. - #[error("invalid nonce")] - InvalidNonce, - /// Transaction's signature is invalid. - #[error("invalid signature")] - InvalidSignature, - /// Transaction's content is invalid. - #[error("transaction content is invalid")] - InvalidContent, - /// Transaction's signer has too many transactions in the mempool. - #[error("signer has too many transactions in the mempool")] - TooManyInMempool, - /// Provided Transaction added to mempool. - #[error("provided transaction added to mempool")] - ProvidedAddedToMempool, +use serai_client::{primitives::SeraiAddress, validator_sets::primitives::MAX_KEY_SHARES_PER_SET}; + +use messages::sign::VariantSignId; + +use tributary_sdk::{ + ReadWrite, + transaction::{ + Signed as TributarySigned, TransactionError, TransactionKind, Transaction as TransactionTrait, + }, +}; + +/// The round this data is for, within a signing protocol. +#[derive(Clone, Copy, PartialEq, Eq, Debug, Encode, BorshSerialize, BorshDeserialize)] +pub enum SigningProtocolRound { + /// A preprocess. + Preprocess, + /// A signature share. + Share, } -/// Data for a signed transaction. -#[derive(Clone, PartialEq, Eq, Debug)] -pub struct Signed { - pub signer: ::G, - pub nonce: u32, - pub signature: SchnorrSignature, -} - -impl ReadWrite for Signed { - fn read(reader: &mut R) -> io::Result { - let signer = Ristretto::read_G(reader)?; - - let mut nonce = [0; 4]; - reader.read_exact(&mut nonce)?; - let nonce = u32::from_le_bytes(nonce); - if nonce >= (u32::MAX - 1) { - Err(io::Error::other("nonce exceeded limit"))?; +impl SigningProtocolRound { + fn nonce(&self) -> u32 { + match self { + SigningProtocolRound::Preprocess => 0, + SigningProtocolRound::Share => 1, } - - let mut signature = SchnorrSignature::::read(reader)?; - if signature.R.is_identity().into() { - // Anyone malicious could remove this and try to find zero signatures - // We should never produce zero signatures though meaning this should never come up - // If it does somehow come up, this is a decent courtesy - signature.zeroize(); - Err(io::Error::other("signature nonce was identity"))?; - } - - Ok(Signed { signer, nonce, signature }) } +} - fn write(&self, writer: &mut W) -> io::Result<()> { - // This is either an invalid signature or a private key leak - if self.signature.R.is_identity().into() { - Err(io::Error::other("signature nonce was identity"))?; - } - writer.write_all(&self.signer.to_bytes())?; - writer.write_all(&self.nonce.to_le_bytes())?; +/// `tributary::Signed` but without the nonce. +/// +/// All of our nonces are deterministic to the type of transaction and fields within. +#[derive(Clone, Copy, PartialEq, Eq, Debug)] +pub struct Signed { + /// The signer. + signer: ::G, + /// The signature. + signature: SchnorrSignature, +} + +impl BorshSerialize for Signed { + fn serialize(&self, writer: &mut W) -> Result<(), io::Error> { + writer.write_all(self.signer.to_bytes().as_ref())?; self.signature.write(writer) } } +impl BorshDeserialize for Signed { + fn deserialize_reader(reader: &mut R) -> Result { + let signer = Ristretto::read_G(reader)?; + let signature = SchnorrSignature::read(reader)?; + Ok(Self { signer, signature }) + } +} impl Signed { - pub fn read_without_nonce(reader: &mut R, nonce: u32) -> io::Result { - let signer = Ristretto::read_G(reader)?; - - let mut signature = SchnorrSignature::::read(reader)?; - if signature.R.is_identity().into() { - // Anyone malicious could remove this and try to find zero signatures - // We should never produce zero signatures though meaning this should never come up - // If it does somehow come up, this is a decent courtesy - signature.zeroize(); - Err(io::Error::other("signature nonce was identity"))?; - } - - Ok(Signed { signer, nonce, signature }) + /// Fetch the signer. + pub(crate) fn signer(&self) -> ::G { + self.signer } - pub fn write_without_nonce(&self, writer: &mut W) -> io::Result<()> { - // This is either an invalid signature or a private key leak - if self.signature.R.is_identity().into() { - Err(io::Error::other("signature nonce was identity"))?; - } - writer.write_all(&self.signer.to_bytes())?; - self.signature.write(writer) + /// Provide a nonce to convert a `Signed` into a `tributary::Signed`. + fn to_tributary_signed(self, nonce: u32) -> TributarySigned { + TributarySigned { signer: self.signer, nonce, signature: self.signature } } } -#[allow(clippy::large_enum_variant)] -#[derive(Clone, PartialEq, Eq, Debug)] -pub enum TransactionKind { - /// This transaction should be provided by every validator, in an exact order. - /// - /// The contained static string names the orderer to use. This allows two distinct provided - /// transaction kinds, without a synchronized order, to be ordered within their own kind without - /// requiring ordering with each other. - /// - /// The only malleability is in when this transaction appears on chain. The block producer will - /// include it when they have it. Block verification will fail for validators without it. - /// - /// If a supermajority of validators produce a commit for a block with a provided transaction - /// which isn't locally held, the block will be added to the local chain. When the transaction is - /// locally provided, it will be compared for correctness to the on-chain version - /// - /// In order to ensure TXs aren't accidentally provided multiple times, all provided transactions - /// must have a unique hash which is also unique to all Unsigned transactions. - Provided(&'static str), +/// The Tributary transaction definition used by Serai +#[derive(Clone, PartialEq, Eq, Debug, BorshSerialize, BorshDeserialize)] +pub enum Transaction { + /// A vote to remove a participant for invalid behavior + RemoveParticipant { + /// The participant to remove + participant: SeraiAddress, + /// The transaction's signer and signature + signed: Signed, + }, - /// An unsigned transaction, only able to be included by the block producer. - /// - /// Once an Unsigned transaction is included on-chain, it may not be included again. In order to - /// have multiple Unsigned transactions with the same values included on-chain, some distinct - /// nonce must be included in order to cause a distinct hash. - /// - /// The hash must also be unique with all Provided transactions. - Unsigned, + /// A participation in the DKG + DkgParticipation { + /// The serialized participation + participation: Vec, + /// The transaction's signer and signature + signed: Signed, + }, + /// The preprocess to confirm the DKG results on-chain + DkgConfirmationPreprocess { + /// The attempt number of this signing protocol + attempt: u32, + /// The preprocess + preprocess: [u8; 64], + /// The transaction's signer and signature + signed: Signed, + }, + /// The signature share to confirm the DKG results on-chain + DkgConfirmationShare { + /// The attempt number of this signing protocol + attempt: u32, + /// The signature share + share: [u8; 32], + /// The transaction's signer and signature + signed: Signed, + }, - /// A signed transaction. - Signed(Vec, Signed), + /// Intend to cosign a finalized Substrate block + /// + /// When the time comes to start a new cosigning protocol, the most recent Substrate block will + /// be the one selected to be cosigned. + Cosign { + /// The hash of the Substrate block to cosign + substrate_block_hash: [u8; 32], + }, + + /// Note an intended-to-be-cosigned Substrate block as cosigned + /// + /// After producing this cosign, we need to start work on the latest intended-to-be cosigned + /// block. That requires agreement on when this cosign was produced, which we solve by noting + /// this cosign on-chain. + /// + /// We ideally don't have this transaction at all. The coordinator, without access to any of the + /// key shares, could observe the FROST signing session and determine a successful completion. + /// Unfortunately, that functionality is not present in modular-frost, so we do need to support + /// *some* asynchronous flow (where the processor or P2P network informs us of the successful + /// completion). + /// + /// If we use a `Provided` transaction, that requires everyone observe this cosign. + /// + /// If we use an `Unsigned` transaction, we can't verify the cosign signature inside + /// `Transaction::verify` unless we embedded the full `SignedCosign` on-chain. The issue is since + /// a Tributary is stateless with regards to the on-chain logic, including `Transaction::verify`, + /// we can't verify the signature against the group's public key unless we also include that (but + /// then we open a DoS where arbitrary group keys are specified to cause inclusion of arbitrary + /// blobs on chain). + /// + /// If we use a `Signed` transaction, we mitigate the DoS risk by having someone to fatally + /// slash. We have horrible performance though as for 100 validators, all 100 will publish this + /// transaction. + /// + /// We could use a signed `Unsigned` transaction, where it includes a signer and signature but + /// isn't technically a Signed transaction. This lets us de-duplicate the transaction premised on + /// its contents. + /// + /// The optimal choice is likely to use a `Provided` transaction. We don't actually need to + /// observe the produced cosign (which is ephemeral). As long as it's agreed the cosign in + /// question no longer needs to produced, which would mean the cosigning protocol at-large + /// cosigning the block in question, it'd be safe to provide this and move on to the next cosign. + Cosigned { + /// The hash of the Substrate block which was cosigned + substrate_block_hash: [u8; 32], + }, + + /// Acknowledge a Substrate block + /// + /// This is provided after the block has been cosigned. + /// + /// With the acknowledgement of a Substrate block, we can whitelist all the `VariantSignId`s + /// resulting from its handling. + SubstrateBlock { + /// The hash of the Substrate block + hash: [u8; 32], + }, + + /// Acknowledge a Batch + /// + /// Once everyone has acknowledged the Batch, we can begin signing it. + Batch { + /// The hash of the Batch's serialization. + /// + /// Generally, we refer to a Batch by its ID/the hash of its instructions. Here, we want to + /// ensure consensus on the Batch, and achieving consensus on its hash is the most effective + /// way to do that. + hash: [u8; 32], + }, + + /// Data from a signing protocol. + Sign { + /// The ID of the object being signed + id: VariantSignId, + /// The attempt number of this signing protocol + attempt: u32, + /// The round this data is for, within the signing protocol + round: SigningProtocolRound, + /// The data itself + /// + /// There will be `n` blobs of data where `n` is the amount of key shares the validator sending + /// this transaction has. + data: Vec>, + /// The transaction's signer and signature + signed: Signed, + }, + + /// The local view of slashes observed by the transaction's sender + SlashReport { + /// The slash points accrued by each validator + slash_points: Vec, + /// The transaction's signer and signature + signed: Signed, + }, } -// TODO: Should this be renamed TransactionTrait now that a literal Transaction exists? -// Or should the literal Transaction be renamed to Event? -pub trait Transaction: 'static + Send + Sync + Clone + Eq + Debug + ReadWrite { - /// Return what type of transaction this is. - fn kind(&self) -> TransactionKind; +impl ReadWrite for Transaction { + fn read(reader: &mut R) -> io::Result { + borsh::from_reader(reader) + } - /// Return the hash of this transaction. - /// - /// The hash must NOT commit to the signature. - fn hash(&self) -> [u8; 32]; + fn write(&self, writer: &mut W) -> io::Result<()> { + borsh::to_writer(writer, self) + } +} - /// Perform transaction-specific verification. - fn verify(&self) -> Result<(), TransactionError>; +impl TransactionTrait for Transaction { + fn kind(&self) -> TransactionKind { + match self { + Transaction::RemoveParticipant { participant, signed } => TransactionKind::Signed( + (b"RemoveParticipant", participant).encode(), + signed.to_tributary_signed(0), + ), - /// Obtain the challenge for this transaction's signature. - /// - /// Do not override this unless you know what you're doing. - /// - /// Panics if called on non-signed transactions. - fn sig_hash(&self, genesis: [u8; 32]) -> ::F { - match self.kind() { - TransactionKind::Signed(order, Signed { signature, .. }) => { - ::F::from_bytes_mod_order_wide( - &Blake2b512::digest( - [ - b"Tributary Signed Transaction", - genesis.as_ref(), - &self.hash(), - order.as_ref(), - signature.R.to_bytes().as_ref(), - ] - .concat(), - ) - .into(), - ) + Transaction::DkgParticipation { signed, .. } => { + TransactionKind::Signed(b"DkgParticipation".encode(), signed.to_tributary_signed(0)) + } + Transaction::DkgConfirmationPreprocess { attempt, signed, .. } => TransactionKind::Signed( + (b"DkgConfirmation", attempt).encode(), + signed.to_tributary_signed(0), + ), + Transaction::DkgConfirmationShare { attempt, signed, .. } => TransactionKind::Signed( + (b"DkgConfirmation", attempt).encode(), + signed.to_tributary_signed(1), + ), + + Transaction::Cosign { .. } => TransactionKind::Provided("Cosign"), + Transaction::Cosigned { .. } => TransactionKind::Provided("Cosigned"), + // TODO: Provide this + Transaction::SubstrateBlock { .. } => TransactionKind::Provided("SubstrateBlock"), + // TODO: Provide this + Transaction::Batch { .. } => TransactionKind::Provided("Batch"), + + Transaction::Sign { id, attempt, round, signed, .. } => TransactionKind::Signed( + (b"Sign", id, attempt).encode(), + signed.to_tributary_signed(round.nonce()), + ), + + Transaction::SlashReport { signed, .. } => { + TransactionKind::Signed(b"SlashReport".encode(), signed.to_tributary_signed(0)) } - _ => panic!("sig_hash called on non-signed transaction"), } } -} -pub trait GAIN: FnMut(&::G, &[u8]) -> Option {} -impl::G, &[u8]) -> Option> GAIN for F {} - -pub(crate) fn verify_transaction( - tx: &T, - genesis: [u8; 32], - get_and_increment_nonce: &mut F, -) -> Result<(), TransactionError> { - if tx.serialize().len() > TRANSACTION_SIZE_LIMIT { - Err(TransactionError::TooLargeTransaction)?; + fn hash(&self) -> [u8; 32] { + let mut tx = ReadWrite::serialize(self); + if let TransactionKind::Signed(_, signed) = self.kind() { + // Make sure the part we're cutting off is the signature + assert_eq!(tx.drain((tx.len() - 64) ..).collect::>(), signed.signature.serialize()); + } + Blake2b::::digest(&tx).into() } - tx.verify()?; + // This is a stateless verification which we use to enforce some size limits. + fn verify(&self) -> Result<(), TransactionError> { + #[allow(clippy::match_same_arms)] + match self { + // Fixed-length TX + Transaction::RemoveParticipant { .. } => {} - match tx.kind() { - TransactionKind::Provided(_) | TransactionKind::Unsigned => {} - TransactionKind::Signed(order, Signed { signer, nonce, signature }) => { - if let Some(next_nonce) = get_and_increment_nonce(&signer, &order) { - if nonce != next_nonce { - Err(TransactionError::InvalidNonce)?; + // TODO: MAX_DKG_PARTICIPATION_LEN + Transaction::DkgParticipation { .. } => {} + // These are fixed-length TXs + Transaction::DkgConfirmationPreprocess { .. } | Transaction::DkgConfirmationShare { .. } => {} + + // Provided TXs + Transaction::Cosign { .. } | + Transaction::Cosigned { .. } | + Transaction::SubstrateBlock { .. } | + Transaction::Batch { .. } => {} + + Transaction::Sign { data, .. } => { + if data.len() > usize::try_from(MAX_KEY_SHARES_PER_SET).unwrap() { + Err(TransactionError::InvalidContent)? } - } else { - // Not a participant - Err(TransactionError::InvalidSigner)?; + // TODO: MAX_SIGN_LEN } - // TODO: Use a batch verification here - if !signature.verify(signer, tx.sig_hash(genesis)) { - Err(TransactionError::InvalidSignature)?; + Transaction::SlashReport { slash_points, .. } => { + if slash_points.len() > usize::try_from(MAX_KEY_SHARES_PER_SET).unwrap() { + Err(TransactionError::InvalidContent)? + } + } + }; + Ok(()) + } +} + +impl Transaction { + /// Sign a transaction. + /// + /// Panics if signing a transaction whose type isn't `TransactionKind::Signed`. + pub fn sign( + &mut self, + rng: &mut R, + genesis: [u8; 32], + key: &Zeroizing<::F>, + ) { + fn signed(tx: &mut Transaction) -> &mut Signed { + #[allow(clippy::match_same_arms)] // This doesn't make semantic sense here + match tx { + Transaction::RemoveParticipant { ref mut signed, .. } | + Transaction::DkgParticipation { ref mut signed, .. } | + Transaction::DkgConfirmationPreprocess { ref mut signed, .. } => signed, + Transaction::DkgConfirmationShare { ref mut signed, .. } => signed, + + Transaction::Cosign { .. } => panic!("signing CosignSubstrateBlock"), + Transaction::Cosigned { .. } => panic!("signing Cosigned"), + Transaction::SubstrateBlock { .. } => panic!("signing SubstrateBlock"), + Transaction::Batch { .. } => panic!("signing Batch"), + + Transaction::Sign { ref mut signed, .. } => signed, + + Transaction::SlashReport { ref mut signed, .. } => signed, } } - } - Ok(()) + // Decide the nonce to sign with + let sig_nonce = Zeroizing::new(::F::random(rng)); + + { + // Set the signer and the nonce + let signed = signed(self); + signed.signer = Ristretto::generator() * key.deref(); + signed.signature.R = ::generator() * sig_nonce.deref(); + } + + // Get the signature hash (which now includes `R || A` making it valid as the challenge) + let sig_hash = self.sig_hash(genesis); + + // Sign the signature + signed(self).signature = SchnorrSignature::::sign(key, sig_nonce, sig_hash); + } } diff --git a/deny.toml b/deny.toml index f530b6a2..23ddd386 100644 --- a/deny.toml +++ b/deny.toml @@ -72,9 +72,10 @@ exceptions = [ { allow = ["AGPL-3.0"], name = "serai-ethereum-processor" }, { allow = ["AGPL-3.0"], name = "serai-monero-processor" }, - { allow = ["AGPL-3.0"], name = "tributary-chain" }, + { allow = ["AGPL-3.0"], name = "tributary-sdk" }, { allow = ["AGPL-3.0"], name = "serai-cosign" }, { allow = ["AGPL-3.0"], name = "serai-coordinator-substrate" }, + { allow = ["AGPL-3.0"], name = "serai-coordinator-tributary" }, { allow = ["AGPL-3.0"], name = "serai-coordinator-p2p" }, { allow = ["AGPL-3.0"], name = "serai-coordinator-libp2p-p2p" }, { allow = ["AGPL-3.0"], name = "serai-coordinator" },