10 Commits

Author SHA1 Message Date
Luke Parker
f501d46d44 Correct disabling of Nagle's algorithm 2025-01-11 06:54:43 -05:00
Luke Parker
74106b025f Publish SlashReport onto the Tributary 2025-01-11 06:51:55 -05:00
Luke Parker
e731b546ab Update documentation 2025-01-11 05:13:43 -05:00
Luke Parker
77d60660d2 Move spawn_cosign from main.rs into tributary.rs
Also refines the tasks within tributary.rs a good bit.
2025-01-11 05:12:56 -05:00
Luke Parker
3c664ff05f Re-arrange coordinator/
coordinator/tributary was tributary-chain. This crate has been renamed
tributary-sdk and moved to coordinator/tributary-sdk.

coordinator/src/tributary was our instantion of a Tributary, the Transaction
type and scan task. This has been moved to coordinator/tributary.

The main reason for this was due to coordinator/main.rs becoming untidy. There
is now a collection of clean, independent APIs present in the codebase.
coordinator/main.rs is to compose them. Sometimes, these compositions are a bit
silly (reading from a channel just to forward the message to a distinct
channel). That's more than fine as the code is still readable and the value
from the cleanliness of the APIs composed far exceeds the nits from having
these odd compositions.

This breaks down a bit as we now define a global database, and have some APIs
interact with multiple other APIs.

coordinator/src/tributary was a self-contained, clean API. The recently added
task present in coordinator/tributary/mod.rs, which bound it to the rest of the
Coordinator, wasn't.

Now, coordinator/src is solely the API compositions, and all self-contained
APIs are their own crates.
2025-01-11 04:14:21 -05:00
Luke Parker
c05b0c9eba Handle Canonical, NewSet from serai-coordinator-substrate 2025-01-11 03:07:15 -05:00
Luke Parker
6d5049cab2 Move the task providing transactions onto the Tributary to the Tributary module
Slims down the main file a bit
2025-01-11 02:13:23 -05:00
Luke Parker
1419ba570a Route from tributary scanner to message-queue 2025-01-11 01:55:36 -05:00
Luke Parker
542bf2170a Provide Cosign/CosignIntent for Tributaries 2025-01-11 01:31:28 -05:00
Luke Parker
378d6b90cf Delete old Tributaries on reboot 2025-01-10 20:10:05 -05:00
63 changed files with 2278 additions and 1546 deletions

View File

@@ -173,10 +173,11 @@ jobs:
- name: Run cargo msrv on coordinator
run: |
cargo msrv verify --manifest-path coordinator/tributary/tendermint/Cargo.toml
cargo msrv verify --manifest-path coordinator/tributary/Cargo.toml
cargo msrv verify --manifest-path coordinator/tributary-sdk/tendermint/Cargo.toml
cargo msrv verify --manifest-path coordinator/tributary-sdk/Cargo.toml
cargo msrv verify --manifest-path coordinator/cosign/Cargo.toml
cargo msrv verify --manifest-path coordinator/substrate/Cargo.toml
cargo msrv verify --manifest-path coordinator/tributary/Cargo.toml
cargo msrv verify --manifest-path coordinator/p2p/Cargo.toml
cargo msrv verify --manifest-path coordinator/p2p/libp2p/Cargo.toml
cargo msrv verify --manifest-path coordinator/Cargo.toml

View File

@@ -60,9 +60,10 @@ jobs:
-p serai-ethereum-processor \
-p serai-monero-processor \
-p tendermint-machine \
-p tributary-chain \
-p tributary-sdk \
-p serai-cosign \
-p serai-coordinator-substrate \
-p serai-coordinator-tributary \
-p serai-coordinator-p2p \
-p serai-coordinator-libp2p-p2p \
-p serai-coordinator \

32
Cargo.lock generated
View File

@@ -8331,16 +8331,15 @@ dependencies = [
"serai-coordinator-libp2p-p2p",
"serai-coordinator-p2p",
"serai-coordinator-substrate",
"serai-coordinator-tributary",
"serai-cosign",
"serai-db",
"serai-env",
"serai-message-queue",
"serai-processor-messages",
"serai-task",
"sp-application-crypto",
"sp-runtime",
"tokio",
"tributary-chain",
"tributary-sdk",
"zalloc",
"zeroize",
]
@@ -8363,7 +8362,7 @@ dependencies = [
"serai-cosign",
"serai-task",
"tokio",
"tributary-chain",
"tributary-sdk",
"void",
"zeroize",
]
@@ -8380,7 +8379,7 @@ dependencies = [
"serai-db",
"serai-task",
"tokio",
"tributary-chain",
"tributary-sdk",
]
[[package]]
@@ -8424,6 +8423,27 @@ dependencies = [
"zeroize",
]
[[package]]
name = "serai-coordinator-tributary"
version = "0.1.0"
dependencies = [
"blake2",
"borsh",
"ciphersuite",
"log",
"parity-scale-codec",
"rand_core",
"schnorr-signatures",
"serai-client",
"serai-coordinator-substrate",
"serai-cosign",
"serai-db",
"serai-processor-messages",
"serai-task",
"tributary-sdk",
"zeroize",
]
[[package]]
name = "serai-cosign"
version = "0.1.0"
@@ -10977,7 +10997,7 @@ dependencies = [
]
[[package]]
name = "tributary-chain"
name = "tributary-sdk"
version = "0.1.0"
dependencies = [
"blake2",

View File

@@ -96,10 +96,11 @@ members = [
"processor/ethereum",
"processor/monero",
"coordinator/tributary/tendermint",
"coordinator/tributary",
"coordinator/tributary-sdk/tendermint",
"coordinator/tributary-sdk",
"coordinator/cosign",
"coordinator/substrate",
"coordinator/tributary",
"coordinator/p2p",
"coordinator/p2p/libp2p",
"coordinator",

View File

@@ -39,7 +39,7 @@ serai-task = { path = "../common/task", version = "0.1" }
messages = { package = "serai-processor-messages", path = "../processor/messages" }
message-queue = { package = "serai-message-queue", path = "../message-queue" }
tributary = { package = "tributary-chain", path = "./tributary" }
tributary-sdk = { path = "./tributary-sdk" }
serai-client = { path = "../substrate/client", default-features = false, features = ["serai", "borsh"] }
@@ -53,10 +53,11 @@ tokio = { version = "1", default-features = false, features = ["time", "sync", "
serai-cosign = { path = "./cosign" }
serai-coordinator-substrate = { path = "./substrate" }
serai-coordinator-tributary = { path = "./tributary" }
serai-coordinator-p2p = { path = "./p2p" }
serai-coordinator-libp2p-p2p = { path = "./p2p/libp2p" }
[features]
longer-reattempts = [] # TODO
longer-reattempts = ["serai-coordinator-tributary/longer-reattempts"]
parity-db = ["serai-db/parity-db"]
rocksdb = ["serai-db/rocksdb"]

View File

@@ -1,19 +1,29 @@
# Coordinator
- [`tendermint`](/tributary/tendermint) is an implementation of the Tendermint BFT algorithm.
- [`tendermint`](/tributary/tendermint) is an implementation of the Tendermint
BFT algorithm.
- [`tributary`](./tributary) is a micro-blockchain framework. Instead of a producing a blockchain
daemon like the Polkadot SDK or Cosmos SDK intend to, `tributary` is solely intended to be an
embedded asynchronous task within an application.
- [`tributary-sdk`](./tributary-sdk) is a micro-blockchain framework. Instead
of a producing a blockchain daemon like the Polkadot SDK or Cosmos SDK intend
to, `tributary` is solely intended to be an embedded asynchronous task within
an application.
The Serai coordinator spawns a tributary for each validator set it's coordinating. This allows
the participating validators to communicate in a byzantine-fault-tolerant manner (relying on
Tendermint for consensus).
The Serai coordinator spawns a tributary for each validator set it's
coordinating. This allows the participating validators to communicate in a
byzantine-fault-tolerant manner (relying on Tendermint for consensus).
- [`cosign`](./cosign) contains a library to decide which Substrate blocks should be cosigned and
to evaluate cosigns.
- [`cosign`](./cosign) contains a library to decide which Substrate blocks
should be cosigned and to evaluate cosigns.
- [`substrate`](./substrate) contains a library to index the Substrate blockchain and handle its
events.
- [`substrate`](./substrate) contains a library to index the Substrate
blockchain and handle its events.
- [`tributary`](./tributary) is our instantiation of the Tributary SDK for the
Serai processor. It includes the `Transaction` definition and deferred
execution logic.
- [`p2p`](./p2p) is our abstract P2P API to service the Coordinator.
- [`libp2p`](./p2p/libp2p) is our libp2p-backed implementation of the P2P API.
- [`src`](./src) contains the source code for the Coordinator binary itself.

View File

@@ -82,13 +82,13 @@ enum HasEvents {
#[derive(Clone, Copy, PartialEq, Eq, Debug, BorshSerialize, BorshDeserialize)]
pub struct CosignIntent {
/// The global session this cosign is being performed under.
global_session: [u8; 32],
pub global_session: [u8; 32],
/// The number of the block to cosign.
block_number: u64,
pub block_number: u64,
/// The hash of the block to cosign.
block_hash: [u8; 32],
pub block_hash: [u8; 32],
/// If this cosign must be handled before further cosigns are.
notable: bool,
pub notable: bool,
}
/// A cosign.

View File

@@ -24,7 +24,7 @@ serai-db = { path = "../../common/db", version = "0.1" }
serai-client = { path = "../../substrate/client", default-features = false, features = ["serai", "borsh"] }
serai-cosign = { path = "../cosign" }
tributary = { package = "tributary-chain", path = "../tributary" }
tributary-sdk = { path = "../tributary-sdk" }
futures-lite = { version = "2", default-features = false, features = ["std"] }
tokio = { version = "1", default-features = false, features = ["sync", "macros"] }

View File

@@ -31,7 +31,7 @@ borsh = { version = "1", default-features = false, features = ["std", "derive",
serai-client = { path = "../../../substrate/client", default-features = false, features = ["serai", "borsh"] }
serai-cosign = { path = "../../cosign" }
tributary = { package = "tributary-chain", path = "../../tributary" }
tributary-sdk = { path = "../../tributary-sdk" }
void = { version = "1", default-features = false }
futures-util = { version = "0.3", default-features = false, features = ["std"] }

View File

@@ -13,7 +13,7 @@ pub use libp2p::gossipsub::Event;
use serai_cosign::SignedCosign;
// Block size limit + 16 KB of space for signatures/metadata
pub(crate) const MAX_LIBP2P_GOSSIP_MESSAGE_SIZE: usize = tributary::BLOCK_SIZE_LIMIT + 16384;
pub(crate) const MAX_LIBP2P_GOSSIP_MESSAGE_SIZE: usize = tributary_sdk::BLOCK_SIZE_LIMIT + 16384;
const LIBP2P_PROTOCOL: &str = "/serai/coordinator/gossip/1.0.0";
const BASE_TOPIC: &str = "/";
@@ -42,9 +42,10 @@ pub(crate) type Behavior = Behaviour<IdentityTransform, AllowAllSubscriptionFilt
pub(crate) fn new_behavior() -> Behavior {
// The latency used by the Tendermint protocol, used here as the gossip epoch duration
// libp2p-rs defaults to 1 second, whereas ours will be ~2
let heartbeat_interval = tributary::tendermint::LATENCY_TIME;
let heartbeat_interval = tributary_sdk::tendermint::LATENCY_TIME;
// The amount of heartbeats which will occur within a single Tributary block
let heartbeats_per_block = tributary::tendermint::TARGET_BLOCK_TIME.div_ceil(heartbeat_interval);
let heartbeats_per_block =
tributary_sdk::tendermint::TARGET_BLOCK_TIME.div_ceil(heartbeat_interval);
// libp2p-rs defaults to 5, whereas ours will be ~8
let heartbeats_to_keep = 2 * heartbeats_per_block;
// libp2p-rs defaults to 3 whereas ours will be ~4

View File

@@ -188,7 +188,7 @@ impl Libp2p {
let mut swarm = SwarmBuilder::with_existing_identity(identity::Keypair::generate_ed25519())
.with_tokio()
.with_tcp(TcpConfig::default().nodelay(false), new_only_validators, new_yamux)
.with_tcp(TcpConfig::default().nodelay(true), new_only_validators, new_yamux)
.unwrap()
.with_behaviour(|_| Behavior {
allow_list: allow_block_list::Behaviour::default(),
@@ -259,7 +259,7 @@ impl Libp2p {
}
}
impl tributary::P2p for Libp2p {
impl tributary_sdk::P2p for Libp2p {
fn broadcast(&self, tributary: [u8; 32], message: Vec<u8>) -> impl Send + Future<Output = ()> {
async move {
self

View File

@@ -1,6 +1,6 @@
use core::time::Duration;
use tributary::tendermint::LATENCY_TIME;
use tributary_sdk::tendermint::LATENCY_TIME;
use libp2p::ping::{self, Config, Behaviour};
pub use ping::Event;

View File

@@ -5,7 +5,7 @@ use serai_client::validator_sets::primitives::{MAX_KEY_SHARES_PER_SET, Validator
use futures_lite::FutureExt;
use tributary::{ReadWrite, TransactionTrait, Block, Tributary, TributaryReader};
use tributary_sdk::{ReadWrite, TransactionTrait, Block, Tributary, TributaryReader};
use serai_db::*;
use serai_task::ContinuallyRan;
@@ -13,7 +13,8 @@ use serai_task::ContinuallyRan;
use crate::{Heartbeat, Peer, P2p};
// Amount of blocks in a minute
const BLOCKS_PER_MINUTE: usize = (60 / (tributary::tendermint::TARGET_BLOCK_TIME / 1000)) as usize;
const BLOCKS_PER_MINUTE: usize =
(60 / (tributary_sdk::tendermint::TARGET_BLOCK_TIME / 1000)) as usize;
/// The minimum amount of blocks to include/included within a batch, assuming there's blocks to
/// include in the batch.
@@ -29,7 +30,7 @@ pub const MIN_BLOCKS_PER_BATCH: usize = BLOCKS_PER_MINUTE + 1;
/// commit is `8 + (validators * 32) + (32 + (validators * 32))` (for the time, list of validators,
/// and aggregate signature). Accordingly, this should be a safe over-estimate.
pub const BATCH_SIZE_LIMIT: usize = MIN_BLOCKS_PER_BATCH *
(tributary::BLOCK_SIZE_LIMIT + 32 + ((MAX_KEY_SHARES_PER_SET as usize) * 128));
(tributary_sdk::BLOCK_SIZE_LIMIT + 32 + ((MAX_KEY_SHARES_PER_SET as usize) * 128));
/// Sends a heartbeat to other validators on regular intervals informing them of our Tributary's
/// tip.

View File

@@ -10,7 +10,7 @@ use borsh::{BorshSerialize, BorshDeserialize};
use serai_client::{primitives::NetworkId, validator_sets::primitives::ValidatorSet};
use serai_db::Db;
use tributary::{ReadWrite, TransactionTrait, Tributary, TributaryReader};
use tributary_sdk::{ReadWrite, TransactionTrait, Tributary, TributaryReader};
use serai_cosign::{SignedCosign, Cosigning};
use tokio::sync::{mpsc, oneshot};
@@ -49,7 +49,9 @@ pub trait Peer<'a>: Send {
}
/// The representation of the P2P network.
pub trait P2p: Send + Sync + Clone + tributary::P2p + serai_cosign::RequestNotableCosigns {
pub trait P2p:
Send + Sync + Clone + tributary_sdk::P2p + serai_cosign::RequestNotableCosigns
{
/// The representation of a peer.
type Peer<'a>: Peer<'a>;

79
coordinator/src/db.rs Normal file
View File

@@ -0,0 +1,79 @@
use std::{path::Path, fs};
pub(crate) use serai_db::{Get, DbTxn, Db as DbTrait};
use serai_db::{create_db, db_channel};
use serai_client::{
primitives::NetworkId,
validator_sets::primitives::{Session, ValidatorSet},
};
use serai_cosign::CosignIntent;
use serai_coordinator_substrate::NewSetInformation;
#[cfg(all(feature = "parity-db", not(feature = "rocksdb")))]
pub(crate) type Db = serai_db::ParityDb;
#[cfg(feature = "rocksdb")]
pub(crate) type Db = serai_db::RocksDB;
#[allow(unused_variables, unreachable_code)]
fn db(path: &str) -> Db {
{
let path: &Path = path.as_ref();
// This may error if this path already exists, which we shouldn't propagate/panic on. If this
// is a problem (such as we don't have the necessary permissions to write to this path), we
// expect the following DB opening to error.
let _: Result<_, _> = fs::create_dir_all(path.parent().unwrap());
}
#[cfg(all(feature = "parity-db", feature = "rocksdb"))]
panic!("built with parity-db and rocksdb");
#[cfg(all(feature = "parity-db", not(feature = "rocksdb")))]
let db = serai_db::new_parity_db(path);
#[cfg(feature = "rocksdb")]
let db = serai_db::new_rocksdb(path);
db
}
pub(crate) fn coordinator_db() -> Db {
let root_path = serai_env::var("DB_PATH").expect("path to DB wasn't specified");
db(&format!("{root_path}/coordinator/db"))
}
fn tributary_db_folder(set: ValidatorSet) -> String {
let root_path = serai_env::var("DB_PATH").expect("path to DB wasn't specified");
let network = match set.network {
NetworkId::Serai => panic!("creating Tributary for the Serai network"),
NetworkId::Bitcoin => "Bitcoin",
NetworkId::Ethereum => "Ethereum",
NetworkId::Monero => "Monero",
};
format!("{root_path}/tributary-{network}-{}", set.session.0)
}
pub(crate) fn tributary_db(set: ValidatorSet) -> Db {
db(&format!("{}/db", tributary_db_folder(set)))
}
pub(crate) fn prune_tributary_db(set: ValidatorSet) {
log::info!("pruning data directory for tributary {set:?}");
let db = tributary_db_folder(set);
if fs::exists(&db).expect("couldn't check if tributary DB exists") {
fs::remove_dir_all(db).unwrap();
}
}
create_db! {
Coordinator {
ActiveTributaries: () -> Vec<NewSetInformation>,
RetiredTributary: (network: NetworkId) -> Session,
}
}
db_channel! {
Coordinator {
TributaryCleanup: () -> ValidatorSet,
PendingCosigns: (set: ValidatorSet) -> CosignIntent,
}
}

View File

@@ -1,10 +1,9 @@
use core::{marker::PhantomData, ops::Deref, time::Duration};
use std::{sync::Arc, time::Instant, collections::HashMap};
use core::{ops::Deref, time::Duration};
use std::{sync::Arc, time::Instant};
use zeroize::{Zeroize, Zeroizing};
use rand_core::{RngCore, OsRng};
use blake2::{digest::typenum::U32, Digest, Blake2s};
use ciphersuite::{
group::{ff::PrimeField, GroupEncoding},
Ciphersuite, Ristretto,
@@ -12,23 +11,22 @@ use ciphersuite::{
use tokio::sync::mpsc;
use scale::Encode;
use serai_client::{
primitives::{NetworkId, PublicKey, SeraiAddress},
validator_sets::primitives::ValidatorSet,
Serai,
};
use serai_client::{primitives::PublicKey, Serai};
use message_queue::{Service, client::MessageQueue};
use ::tributary::Tributary;
use serai_task::{Task, TaskHandle, ContinuallyRan};
use serai_cosign::{SignedCosign, Cosigning};
use serai_coordinator_substrate::{NewSetInformation, CanonicalEventStream, EphemeralEventStream};
use serai_coordinator_substrate::{CanonicalEventStream, EphemeralEventStream, SignSlashReport};
use serai_coordinator_tributary::Transaction;
mod db;
use db::*;
mod tributary;
use tributary::{Transaction, ScanTributaryTask};
mod substrate;
use substrate::SubstrateTask;
mod p2p {
pub use serai_coordinator_p2p::*;
@@ -43,38 +41,6 @@ mod p2p {
static ALLOCATOR: zalloc::ZeroizingAlloc<std::alloc::System> =
zalloc::ZeroizingAlloc(std::alloc::System);
#[cfg(all(feature = "parity-db", not(feature = "rocksdb")))]
type Db = serai_db::ParityDb;
#[cfg(feature = "rocksdb")]
type Db = serai_db::RocksDB;
#[allow(unused_variables, unreachable_code)]
fn db(path: &str) -> Db {
#[cfg(all(feature = "parity-db", feature = "rocksdb"))]
panic!("built with parity-db and rocksdb");
#[cfg(all(feature = "parity-db", not(feature = "rocksdb")))]
let db = serai_db::new_parity_db(path);
#[cfg(feature = "rocksdb")]
let db = serai_db::new_rocksdb(path);
db
}
fn coordinator_db() -> Db {
let root_path = serai_env::var("DB_PATH").expect("path to DB wasn't specified");
db(&format!("{root_path}/coordinator"))
}
fn tributary_db(set: ValidatorSet) -> Db {
let root_path = serai_env::var("DB_PATH").expect("path to DB wasn't specified");
let network = match set.network {
NetworkId::Serai => panic!("creating Tributary for the Serai network"),
NetworkId::Bitcoin => "Bitcoin",
NetworkId::Ethereum => "Ethereum",
NetworkId::Monero => "Monero",
};
db(&format!("{root_path}/tributary-{network}-{}", set.session.0))
}
async fn serai() -> Arc<Serai> {
const SERAI_CONNECTION_DELAY: Duration = Duration::from_secs(10);
const MAX_SERAI_CONNECTION_DELAY: Duration = Duration::from_secs(300);
@@ -97,7 +63,6 @@ async fn serai() -> Arc<Serai> {
}
}
// TODO: intended_cosigns
fn spawn_cosigning(
db: impl serai_db::Db,
serai: Arc<Serai>,
@@ -134,87 +99,6 @@ fn spawn_cosigning(
});
}
/// Spawn an existing Tributary.
///
/// This will spawn the Tributary, the Tributary scanning task, and inform the P2P network.
async fn spawn_tributary<P: p2p::P2p>(
db: Db,
p2p: P,
p2p_add_tributary: mpsc::UnboundedSender<Tributary<Db, Transaction, P>>,
set: NewSetInformation,
serai_key: Zeroizing<<Ristretto as Ciphersuite>::F>,
) {
let genesis = <[u8; 32]>::from(Blake2s::<U32>::digest((set.serai_block, set.set).encode()));
// Since the Serai block will be finalized, then cosigned, before we handle this, this time will
// be a couple of minutes stale. While the Tributary will still function with a start time in the
// past, the Tributary will immediately incur round timeouts. We reduce these by adding a
// constant delay of a couple of minutes.
const TRIBUTARY_START_TIME_DELAY: u64 = 120;
let start_time = set.declaration_time + TRIBUTARY_START_TIME_DELAY;
let mut tributary_validators = Vec::with_capacity(set.validators.len());
let mut validators = Vec::with_capacity(set.validators.len());
let mut total_weight = 0;
let mut validator_weights = HashMap::with_capacity(set.validators.len());
for (validator, weight) in set.validators {
let validator_key = <Ristretto as Ciphersuite>::read_G(&mut validator.0.as_slice())
.expect("Serai validator had an invalid public key");
let validator = SeraiAddress::from(validator);
let weight = u64::from(weight);
tributary_validators.push((validator_key, weight));
validators.push(validator);
total_weight += weight;
validator_weights.insert(validator, weight);
}
let tributary_db = tributary_db(set.set);
let tributary = Tributary::<_, Transaction, _>::new(
tributary_db.clone(),
genesis,
start_time,
serai_key,
tributary_validators,
p2p,
)
.await
.unwrap();
let reader = tributary.reader();
p2p_add_tributary.send(tributary).expect("p2p's add_tributary channel was closed?");
let (scan_tributary_task_def, scan_tributary_task) = Task::new();
tokio::spawn(
(ScanTributaryTask {
cosign_db: db,
tributary_db,
set: set.set,
validators,
total_weight,
validator_weights,
tributary: reader,
_p2p: PhantomData::<P>,
})
.continually_run(scan_tributary_task_def, vec![todo!("TODO")]),
);
// TODO^ On Tributary block, drain this task's ProcessorMessages
// Have the tributary scanner run as soon as there's a new block
// TODO: Implement retiry, this will hold the tributary/handle indefinitely
tokio::spawn(async move {
loop {
tributary
.next_block_notification()
.await
.await
.map_err(|_| ())
// unreachable since this owns the tributary object and doesn't drop it
.expect("tributary was dropped causing notification to error");
scan_tributary_task.run_now();
}
});
}
#[tokio::main]
async fn main() {
// Override the panic handler with one which will panic if any tokio task panics
@@ -254,10 +138,35 @@ async fn main() {
};
// Open the database
let db = coordinator_db();
let mut db = coordinator_db();
let existing_tributaries_at_boot = {
let mut txn = db.txn();
// Cleanup all historic Tributaries
while let Some(to_cleanup) = TributaryCleanup::try_recv(&mut txn) {
prune_tributary_db(to_cleanup);
// Drain the cosign intents created for this set
while !Cosigning::<Db>::intended_cosigns(&mut txn, to_cleanup).is_empty() {}
// Remove the SignSlashReport notification
SignSlashReport::try_recv(&mut txn, to_cleanup);
}
// Remove retired Tributaries from ActiveTributaries
let mut active_tributaries = ActiveTributaries::get(&txn).unwrap_or(vec![]);
active_tributaries.retain(|tributary| {
RetiredTributary::get(&txn, tributary.set.network).map(|session| session.0) <
Some(tributary.set.session.0)
});
ActiveTributaries::set(&mut txn, &active_tributaries);
txn.commit();
active_tributaries
};
// Connect to the message-queue
let message_queue = MessageQueue::from_env(Service::Coordinator);
let message_queue = Arc::new(MessageQueue::from_env(Service::Coordinator));
// Connect to the Serai node
let serai = serai().await;
@@ -290,14 +199,12 @@ async fn main() {
p2p
};
// TODO: p2p_add_tributary_send, p2p_retire_tributary_send
// Spawn the Substrate scanners
// TODO: Canonical, NewSet, SignSlashReport
let (substrate_task_def, substrate_task) = Task::new();
let (substrate_canonical_task_def, substrate_canonical_task) = Task::new();
tokio::spawn(
CanonicalEventStream::new(db.clone(), serai.clone())
.continually_run(substrate_canonical_task_def, todo!("TODO")),
.continually_run(substrate_canonical_task_def, vec![substrate_task.clone()]),
);
let (substrate_ephemeral_task_def, substrate_ephemeral_task) = Task::new();
tokio::spawn(
@@ -306,7 +213,7 @@ async fn main() {
serai.clone(),
PublicKey::from_raw((<Ristretto as Ciphersuite>::generator() * serai_key.deref()).to_bytes()),
)
.continually_run(substrate_ephemeral_task_def, todo!("TODO")),
.continually_run(substrate_ephemeral_task_def, vec![substrate_task]),
);
// Spawn the cosign handler
@@ -321,9 +228,33 @@ async fn main() {
signed_cosigns_recv,
);
// TODO: Reload tributaries from disk, handle processor messages
// Spawn all Tributaries on-disk
for tributary in existing_tributaries_at_boot {
crate::tributary::spawn_tributary(
db.clone(),
message_queue.clone(),
p2p.clone(),
&p2p_add_tributary_send,
tributary,
serai_key.clone(),
)
.await;
}
// TODO: On NewSet, save to DB, send KeyGen, spawn tributary task, inform P2P network
// Handle the events from the Substrate scanner
tokio::spawn(
(SubstrateTask {
serai_key: serai_key.clone(),
db: db.clone(),
message_queue: message_queue.clone(),
p2p: p2p.clone(),
p2p_add_tributary: p2p_add_tributary_send.clone(),
p2p_retire_tributary: p2p_retire_tributary_send.clone(),
})
.continually_run(substrate_task_def, vec![]),
);
// TODO: Handle processor messages
todo!("TODO")
}

View File

@@ -0,0 +1,160 @@
use core::future::Future;
use std::sync::Arc;
use zeroize::Zeroizing;
use ciphersuite::{Ciphersuite, Ristretto};
use tokio::sync::mpsc;
use serai_db::{DbTxn, Db as DbTrait};
use serai_client::validator_sets::primitives::{Session, ValidatorSet};
use message_queue::{Service, Metadata, client::MessageQueue};
use tributary_sdk::Tributary;
use serai_task::ContinuallyRan;
use serai_coordinator_tributary::Transaction;
use serai_coordinator_p2p::P2p;
use crate::Db;
pub(crate) struct SubstrateTask<P: P2p> {
pub(crate) serai_key: Zeroizing<<Ristretto as Ciphersuite>::F>,
pub(crate) db: Db,
pub(crate) message_queue: Arc<MessageQueue>,
pub(crate) p2p: P,
pub(crate) p2p_add_tributary:
mpsc::UnboundedSender<(ValidatorSet, Tributary<Db, Transaction, P>)>,
pub(crate) p2p_retire_tributary: mpsc::UnboundedSender<ValidatorSet>,
}
impl<P: P2p> ContinuallyRan for SubstrateTask<P> {
fn run_iteration(&mut self) -> impl Send + Future<Output = Result<bool, String>> {
async move {
let mut made_progress = false;
// Handle the Canonical events
for network in serai_client::primitives::NETWORKS {
loop {
let mut txn = self.db.txn();
let Some(msg) = serai_coordinator_substrate::Canonical::try_recv(&mut txn, network)
else {
break;
};
match msg {
// TODO: Stop trying to confirm the DKG
messages::substrate::CoordinatorMessage::SetKeys { .. } => todo!("TODO"),
messages::substrate::CoordinatorMessage::SlashesReported { session } => {
let prior_retired = crate::db::RetiredTributary::get(&txn, network);
let next_to_be_retired =
prior_retired.map(|session| Session(session.0 + 1)).unwrap_or(Session(0));
assert_eq!(session, next_to_be_retired);
crate::db::RetiredTributary::set(&mut txn, network, &session);
self
.p2p_retire_tributary
.send(ValidatorSet { network, session })
.expect("p2p retire_tributary channel dropped?");
}
messages::substrate::CoordinatorMessage::Block { .. } => {}
}
let msg = messages::CoordinatorMessage::from(msg);
let metadata = Metadata {
from: Service::Coordinator,
to: Service::Processor(network),
intent: msg.intent(),
};
let msg = borsh::to_vec(&msg).unwrap();
// TODO: Make this fallible
self.message_queue.queue(metadata, msg).await;
txn.commit();
made_progress = true;
}
}
// Handle the NewSet events
loop {
let mut txn = self.db.txn();
let Some(new_set) = serai_coordinator_substrate::NewSet::try_recv(&mut txn) else { break };
if let Some(historic_session) = new_set.set.session.0.checked_sub(2) {
// We should have retired this session if we're here
if crate::db::RetiredTributary::get(&txn, new_set.set.network).map(|session| session.0) <
Some(historic_session)
{
/*
If we haven't, it's because we're processing the NewSet event before the retiry
event from the Canonical event stream. This happens if the Canonical event, and
then the NewSet event, is fired while we're already iterating over NewSet events.
We break, dropping the txn, restoring this NewSet to the database, so we'll only
handle it once a future iteration of this loop handles the retiry event.
*/
break;
}
/*
Queue this historical Tributary for deletion.
We explicitly don't queue this upon Tributary retire, instead here, to give time to
investigate retired Tributaries if questions are raised post-retiry. This gives a
week (the duration of the following session) after the Tributary has been retired to
make a backup of the data directory for any investigations.
*/
crate::db::TributaryCleanup::send(
&mut txn,
&ValidatorSet { network: new_set.set.network, session: Session(historic_session) },
);
}
// Save this Tributary as active to the database
{
let mut active_tributaries =
crate::db::ActiveTributaries::get(&txn).unwrap_or(Vec::with_capacity(1));
active_tributaries.push(new_set.clone());
crate::db::ActiveTributaries::set(&mut txn, &active_tributaries);
}
// Send GenerateKey to the processor
let msg = messages::key_gen::CoordinatorMessage::GenerateKey {
session: new_set.set.session,
threshold: new_set.threshold,
evrf_public_keys: new_set.evrf_public_keys.clone(),
};
let msg = messages::CoordinatorMessage::from(msg);
let metadata = Metadata {
from: Service::Coordinator,
to: Service::Processor(new_set.set.network),
intent: msg.intent(),
};
let msg = borsh::to_vec(&msg).unwrap();
// TODO: Make this fallible
self.message_queue.queue(metadata, msg).await;
// Commit the transaction for all of this
txn.commit();
// Now spawn the Tributary
// If we reboot after committing the txn, but before this is called, this will be called
// on boot
crate::tributary::spawn_tributary(
self.db.clone(),
self.message_queue.clone(),
self.p2p.clone(),
&self.p2p_add_tributary,
new_set,
self.serai_key.clone(),
)
.await;
made_progress = true;
}
Ok(made_progress)
}
}
}

View File

@@ -0,0 +1,347 @@
use core::{future::Future, time::Duration};
use std::sync::Arc;
use zeroize::Zeroizing;
use rand_core::OsRng;
use blake2::{digest::typenum::U32, Digest, Blake2s};
use ciphersuite::{Ciphersuite, Ristretto};
use tokio::sync::mpsc;
use serai_db::{DbTxn, Db as DbTrait};
use scale::Encode;
use serai_client::validator_sets::primitives::ValidatorSet;
use tributary_sdk::{TransactionError, ProvidedError, Tributary};
use serai_task::{Task, TaskHandle, ContinuallyRan};
use message_queue::{Service, Metadata, client::MessageQueue};
use serai_cosign::Cosigning;
use serai_coordinator_substrate::{NewSetInformation, SignSlashReport};
use serai_coordinator_tributary::{Transaction, ProcessorMessages, ScanTributaryTask};
use serai_coordinator_p2p::P2p;
use crate::Db;
/// Provides Cosign/Cosigned Transactions onto the Tributary.
pub(crate) struct ProvideCosignCosignedTransactionsTask<CD: DbTrait, TD: DbTrait, P: P2p> {
db: CD,
set: NewSetInformation,
tributary: Tributary<TD, Transaction, P>,
}
impl<CD: DbTrait, TD: DbTrait, P: P2p> ContinuallyRan
for ProvideCosignCosignedTransactionsTask<CD, TD, P>
{
fn run_iteration(&mut self) -> impl Send + Future<Output = Result<bool, String>> {
/// Provide a Provided Transaction to the Tributary.
///
/// This is not a well-designed function. This is specific to the context in which its called,
/// within this file. It should only be considered an internal helper for this domain alone.
async fn provide_transaction<TD: DbTrait, P: P2p>(
set: ValidatorSet,
tributary: &Tributary<TD, Transaction, P>,
tx: Transaction,
) {
match tributary.provide_transaction(tx.clone()).await {
// The Tributary uses its own DB, so we may provide this multiple times if we reboot before
// committing the txn which provoked this
Ok(()) | Err(ProvidedError::AlreadyProvided) => {}
Err(ProvidedError::NotProvided) => {
panic!("providing a Transaction which wasn't a Provided transaction: {tx:?}");
}
Err(ProvidedError::InvalidProvided(e)) => {
panic!("providing an invalid Provided transaction, tx: {tx:?}, error: {e:?}")
}
Err(ProvidedError::LocalMismatchesOnChain) => loop {
// The Tributary's scan task won't advance if we don't have the Provided transactions
// present on-chain, and this enters an infinite loop to block the calling task from
// advancing
log::error!(
"Tributary {:?} was supposed to provide {:?} but peers disagree, halting Tributary",
set,
tx,
);
// Print this every five minutes as this does need to be handled
tokio::time::sleep(Duration::from_secs(5 * 60)).await;
},
}
}
async move {
let mut made_progress = false;
// Check if we produced any cosigns we were supposed to
let mut pending_notable_cosign = false;
loop {
let mut txn = self.db.txn();
// Fetch the next cosign this tributary should handle
let Some(cosign) = crate::PendingCosigns::try_recv(&mut txn, self.set.set) else { break };
pending_notable_cosign = cosign.notable;
// If we (Serai) haven't cosigned this block, break as this is still pending
let Ok(latest) = Cosigning::<CD>::latest_cosigned_block_number(&txn) else { break };
if latest < cosign.block_number {
break;
}
// Because we've cosigned it, provide the TX for that
provide_transaction(
self.set.set,
&self.tributary,
Transaction::Cosigned { substrate_block_hash: cosign.block_hash },
)
.await;
// Clear pending_notable_cosign since this cosign isn't pending
pending_notable_cosign = false;
// Commit the txn to clear this from PendingCosigns
txn.commit();
made_progress = true;
}
// If we don't have any notable cosigns pending, provide the next set of cosign intents
if !pending_notable_cosign {
let mut txn = self.db.txn();
// intended_cosigns will only yield up to and including the next notable cosign
for cosign in Cosigning::<CD>::intended_cosigns(&mut txn, self.set.set) {
// Flag this cosign as pending
crate::PendingCosigns::send(&mut txn, self.set.set, &cosign);
// Provide the transaction to queue it for work
provide_transaction(
self.set.set,
&self.tributary,
Transaction::Cosign { substrate_block_hash: cosign.block_hash },
)
.await;
}
txn.commit();
made_progress = true;
}
Ok(made_progress)
}
}
}
/// Takes the messages from ScanTributaryTask and publishes them to the message-queue.
pub(crate) struct TributaryProcessorMessagesTask<TD: DbTrait> {
tributary_db: TD,
set: ValidatorSet,
message_queue: Arc<MessageQueue>,
}
impl<TD: DbTrait> ContinuallyRan for TributaryProcessorMessagesTask<TD> {
fn run_iteration(&mut self) -> impl Send + Future<Output = Result<bool, String>> {
async move {
let mut made_progress = false;
loop {
let mut txn = self.tributary_db.txn();
let Some(msg) = ProcessorMessages::try_recv(&mut txn, self.set) else { break };
let metadata = Metadata {
from: Service::Coordinator,
to: Service::Processor(self.set.network),
intent: msg.intent(),
};
let msg = borsh::to_vec(&msg).unwrap();
// TODO: Make this fallible
self.message_queue.queue(metadata, msg).await;
txn.commit();
made_progress = true;
}
Ok(made_progress)
}
}
}
/// Checks for the notification to sign a slash report and does so if present.
pub(crate) struct SignSlashReportTask<CD: DbTrait, TD: DbTrait, P: P2p> {
db: CD,
tributary_db: TD,
tributary: Tributary<TD, Transaction, P>,
set: NewSetInformation,
key: Zeroizing<<Ristretto as Ciphersuite>::F>,
}
impl<CD: DbTrait, TD: DbTrait, P: P2p> ContinuallyRan for SignSlashReportTask<CD, TD, P> {
fn run_iteration(&mut self) -> impl Send + Future<Output = Result<bool, String>> {
async move {
let mut txn = self.db.txn();
let Some(()) = SignSlashReport::try_recv(&mut txn, self.set.set) else { return Ok(false) };
// Fetch the slash report for this Tributary
let mut tx =
serai_coordinator_tributary::slash_report_transaction(&self.tributary_db, &self.set);
tx.sign(&mut OsRng, self.tributary.genesis(), &self.key);
let res = self.tributary.add_transaction(tx.clone()).await;
match &res {
// Fresh publication, already published
Ok(true | false) => {}
Err(
TransactionError::TooLargeTransaction |
TransactionError::InvalidSigner |
TransactionError::InvalidNonce |
TransactionError::InvalidSignature |
TransactionError::InvalidContent,
) => {
panic!("created an invalid SlashReport transaction, tx: {tx:?}, err: {res:?}");
}
// We've published too many transactions recently
// Drop this txn to try to publish it again later on a future iteration
Err(TransactionError::TooManyInMempool) => return Ok(false),
// This isn't a Provided transaction so this should never be hit
Err(TransactionError::ProvidedAddedToMempool) => unreachable!(),
}
txn.commit();
Ok(true)
}
}
}
/// Run the scan task whenever the Tributary adds a new block.
async fn scan_on_new_block<CD: DbTrait, TD: DbTrait, P: P2p>(
db: CD,
set: ValidatorSet,
tributary: Tributary<TD, Transaction, P>,
scan_tributary_task: TaskHandle,
tasks_to_keep_alive: Vec<TaskHandle>,
) {
loop {
// Break once this Tributary is retired
if crate::RetiredTributary::get(&db, set.network).map(|session| session.0) >=
Some(set.session.0)
{
drop(tasks_to_keep_alive);
break;
}
// Have the tributary scanner run as soon as there's a new block
match tributary.next_block_notification().await.await {
Ok(()) => scan_tributary_task.run_now(),
// unreachable since this owns the tributary object and doesn't drop it
Err(_) => panic!("tributary was dropped causing notification to error"),
}
}
}
/// Spawn a Tributary.
///
/// This will:
/// - Spawn the Tributary
/// - Inform the P2P network of the Tributary
/// - Spawn the ScanTributaryTask
/// - Spawn the ProvideCosignCosignedTransactionsTask
/// - Spawn the TributaryProcessorMessagesTask
/// - Spawn the SignSlashReportTask
/// - Iterate the scan task whenever a new block occurs (not just on the standard interval)
pub(crate) async fn spawn_tributary<P: P2p>(
db: Db,
message_queue: Arc<MessageQueue>,
p2p: P,
p2p_add_tributary: &mpsc::UnboundedSender<(ValidatorSet, Tributary<Db, Transaction, P>)>,
set: NewSetInformation,
serai_key: Zeroizing<<Ristretto as Ciphersuite>::F>,
) {
// Don't spawn retired Tributaries
if crate::db::RetiredTributary::get(&db, set.set.network).map(|session| session.0) >=
Some(set.set.session.0)
{
return;
}
let genesis = <[u8; 32]>::from(Blake2s::<U32>::digest((set.serai_block, set.set).encode()));
// Since the Serai block will be finalized, then cosigned, before we handle this, this time will
// be a couple of minutes stale. While the Tributary will still function with a start time in the
// past, the Tributary will immediately incur round timeouts. We reduce these by adding a
// constant delay of a couple of minutes.
const TRIBUTARY_START_TIME_DELAY: u64 = 120;
let start_time = set.declaration_time + TRIBUTARY_START_TIME_DELAY;
let mut tributary_validators = Vec::with_capacity(set.validators.len());
for (validator, weight) in set.validators.iter().copied() {
let validator_key = <Ristretto as Ciphersuite>::read_G(&mut validator.0.as_slice())
.expect("Serai validator had an invalid public key");
let weight = u64::from(weight);
tributary_validators.push((validator_key, weight));
}
// Spawn the Tributary
let tributary_db = crate::db::tributary_db(set.set);
let tributary = Tributary::new(
tributary_db.clone(),
genesis,
start_time,
serai_key.clone(),
tributary_validators,
p2p,
)
.await
.unwrap();
let reader = tributary.reader();
// Inform the P2P network
p2p_add_tributary
.send((set.set, tributary.clone()))
.expect("p2p's add_tributary channel was closed?");
// Spawn the task to provide Cosign/Cosigned transactions onto the Tributary
let (provide_cosign_cosigned_transactions_task_def, provide_cosign_cosigned_transactions_task) =
Task::new();
tokio::spawn(
(ProvideCosignCosignedTransactionsTask {
db: db.clone(),
set: set.clone(),
tributary: tributary.clone(),
})
.continually_run(provide_cosign_cosigned_transactions_task_def, vec![]),
);
// Spawn the task to send all messages from the Tributary scanner to the message-queue
let (scan_tributary_messages_task_def, scan_tributary_messages_task) = Task::new();
tokio::spawn(
(TributaryProcessorMessagesTask {
tributary_db: tributary_db.clone(),
set: set.set,
message_queue,
})
.continually_run(scan_tributary_messages_task_def, vec![]),
);
// Spawn the scan task
let (scan_tributary_task_def, scan_tributary_task) = Task::new();
tokio::spawn(
ScanTributaryTask::<_, _, P>::new(db.clone(), tributary_db.clone(), &set, reader)
// This is the only handle for this TributaryProcessorMessagesTask, so when this task is
// dropped, it will be too
.continually_run(scan_tributary_task_def, vec![scan_tributary_messages_task]),
);
// Spawn the sign slash report task
let (sign_slash_report_task_def, sign_slash_report_task) = Task::new();
tokio::spawn(
(SignSlashReportTask {
db: db.clone(),
tributary_db,
tributary: tributary.clone(),
set: set.clone(),
key: serai_key,
})
.continually_run(sign_slash_report_task_def, vec![]),
);
// Whenever a new block occurs, immediately run the scan task
// This function also preserves the ProvideCosignCosignedTransactionsTask handle until the
// Tributary is retired, ensuring it isn't dropped prematurely and that the task don't run ad
// infinitum
tokio::spawn(scan_on_new_block(
db,
set.set,
tributary,
scan_tributary_task,
vec![provide_cosign_cosigned_transactions_task, sign_slash_report_task],
));
}

View File

@@ -1,7 +0,0 @@
mod transaction;
pub use transaction::Transaction;
mod db;
mod scan;
pub(crate) use scan::ScanTributaryTask;

View File

@@ -1,449 +0,0 @@
use core::{marker::PhantomData, future::Future};
use std::collections::HashMap;
use ciphersuite::group::GroupEncoding;
use serai_client::{
primitives::SeraiAddress,
validator_sets::primitives::{ValidatorSet, Slash},
};
use tributary::{
Signed as TributarySigned, TransactionKind, TransactionTrait,
Transaction as TributaryTransaction, Block, TributaryReader,
tendermint::{
tx::{TendermintTx, Evidence, decode_signed_message},
TendermintNetwork,
},
};
use serai_db::*;
use serai_task::ContinuallyRan;
use messages::sign::VariantSignId;
use serai_cosign::Cosigning;
use crate::{
p2p::P2p,
tributary::{
db::*,
transaction::{SigningProtocolRound, Signed, Transaction},
},
};
struct ScanBlock<'a, CD: Db, TD: Db, TDT: DbTxn, P: P2p> {
_p2p: PhantomData<P>,
cosign_db: &'a CD,
tributary_txn: &'a mut TDT,
set: ValidatorSet,
validators: &'a [SeraiAddress],
total_weight: u64,
validator_weights: &'a HashMap<SeraiAddress, u64>,
tributary: &'a TributaryReader<TD, Transaction>,
}
impl<'a, CD: Db, TD: Db, TDT: DbTxn, P: P2p> ScanBlock<'a, CD, TD, TDT, P> {
fn potentially_start_cosign(&mut self) {
// Don't start a new cosigning instance if we're actively running one
if TributaryDb::actively_cosigning(self.tributary_txn, self.set) {
return;
}
// Start cosigning the latest intended-to-be-cosigned block
let Some(latest_substrate_block_to_cosign) =
TributaryDb::latest_substrate_block_to_cosign(self.tributary_txn, self.set)
else {
return;
};
let Some(substrate_block_number) =
Cosigning::<CD>::finalized_block_number(self.cosign_db, latest_substrate_block_to_cosign)
else {
// This is a valid panic as we shouldn't be scanning this block if we didn't provide all
// Provided transactions within it, and the block to cosign is a Provided transaction
panic!("cosigning a block our cosigner didn't index")
};
// Mark us as actively cosigning
TributaryDb::start_cosigning(self.tributary_txn, self.set, substrate_block_number);
// Send the message for the processor to start signing
TributaryDb::send_message(
self.tributary_txn,
self.set,
messages::coordinator::CoordinatorMessage::CosignSubstrateBlock {
session: self.set.session,
block_number: substrate_block_number,
block: latest_substrate_block_to_cosign,
},
);
}
fn handle_application_tx(&mut self, block_number: u64, tx: Transaction) {
let signer = |signed: Signed| SeraiAddress(signed.signer.to_bytes());
if let TransactionKind::Signed(_, TributarySigned { signer, .. }) = tx.kind() {
// Don't handle transactions from those fatally slashed
// TODO: The fact they can publish these TXs makes this a notable spam vector
if TributaryDb::is_fatally_slashed(
self.tributary_txn,
self.set,
SeraiAddress(signer.to_bytes()),
) {
return;
}
}
match tx {
// Accumulate this vote and fatally slash the participant if past the threshold
Transaction::RemoveParticipant { participant, signed } => {
let signer = signer(signed);
// Check the participant voted to be removed actually exists
if !self.validators.iter().any(|validator| *validator == participant) {
TributaryDb::fatal_slash(
self.tributary_txn,
self.set,
signer,
"voted to remove non-existent participant",
);
return;
}
match TributaryDb::accumulate(
self.tributary_txn,
self.set,
self.validators,
self.total_weight,
block_number,
Topic::RemoveParticipant { participant },
signer,
self.validator_weights[&signer],
&(),
) {
DataSet::None => {}
DataSet::Participating(_) => {
TributaryDb::fatal_slash(self.tributary_txn, self.set, participant, "voted to remove");
}
};
}
// Send the participation to the processor
Transaction::DkgParticipation { participation, signed } => {
TributaryDb::send_message(
self.tributary_txn,
self.set,
messages::key_gen::CoordinatorMessage::Participation {
session: self.set.session,
participant: todo!("TODO"),
participation,
},
);
}
Transaction::DkgConfirmationPreprocess { attempt, preprocess, signed } => {
// Accumulate the preprocesses into our own FROST attempt manager
todo!("TODO")
}
Transaction::DkgConfirmationShare { attempt, share, signed } => {
// Accumulate the shares into our own FROST attempt manager
todo!("TODO")
}
Transaction::Cosign { substrate_block_hash } => {
// Update the latest intended-to-be-cosigned Substrate block
TributaryDb::set_latest_substrate_block_to_cosign(
self.tributary_txn,
self.set,
substrate_block_hash,
);
// Start a new cosign if we weren't already working on one
self.potentially_start_cosign();
}
Transaction::Cosigned { substrate_block_hash } => {
TributaryDb::finish_cosigning(self.tributary_txn, self.set);
// Fetch the latest intended-to-be-cosigned block
let Some(latest_substrate_block_to_cosign) =
TributaryDb::latest_substrate_block_to_cosign(self.tributary_txn, self.set)
else {
return;
};
// If this is the block we just cosigned, return, preventing us from signing it again
if latest_substrate_block_to_cosign == substrate_block_hash {
return;
}
// Since we do have a new cosign to work on, start it
self.potentially_start_cosign();
}
Transaction::SubstrateBlock { hash } => {
// Whitelist all of the IDs this Substrate block causes to be signed
todo!("TODO")
}
Transaction::Batch { hash } => {
// Whitelist the signing of this batch, publishing our own preprocess
todo!("TODO")
}
Transaction::SlashReport { slash_points, signed } => {
let signer = signer(signed);
if slash_points.len() != self.validators.len() {
TributaryDb::fatal_slash(
self.tributary_txn,
self.set,
signer,
"slash report was for a distinct amount of signers",
);
return;
}
// Accumulate, and if past the threshold, calculate *the* slash report and start signing it
match TributaryDb::accumulate(
self.tributary_txn,
self.set,
self.validators,
self.total_weight,
block_number,
Topic::SlashReport,
signer,
self.validator_weights[&signer],
&slash_points,
) {
DataSet::None => {}
DataSet::Participating(data_set) => {
// Find the median reported slashes for this validator
/*
TODO: This lets 34% perform a fatal slash. That shouldn't be allowed. We need
to accept slash reports for a period past the threshold, and only fatally slash if we
have a supermajority agree the slash should be fatal. If there isn't a supermajority,
but the median believe the slash should be fatal, we need to fallback to a large
constant.
Also, TODO, each slash point should probably be considered as
`MAX_KEY_SHARES_PER_SET * BLOCK_TIME` seconds of downtime. As this time crosses
various thresholds (1 day, 3 days, etc), a multiplier should be attached.
*/
let mut median_slash_report = Vec::with_capacity(self.validators.len());
for i in 0 .. self.validators.len() {
let mut this_validator =
data_set.values().map(|report| report[i]).collect::<Vec<_>>();
this_validator.sort_unstable();
// Choose the median, where if there are two median values, the lower one is chosen
let median_index = if (this_validator.len() % 2) == 1 {
this_validator.len() / 2
} else {
(this_validator.len() / 2) - 1
};
median_slash_report.push(this_validator[median_index]);
}
// We only publish slashes for the `f` worst performers to:
// 1) Effect amnesty if there were network disruptions which affected everyone
// 2) Ensure the signing threshold doesn't have a disincentive to do their job
// Find the worst performer within the signing threshold's slash points
let f = (self.validators.len() - 1) / 3;
let worst_validator_in_supermajority_slash_points = {
let mut sorted_slash_points = median_slash_report.clone();
sorted_slash_points.sort_unstable();
// This won't be a valid index if `f == 0`, which means we don't have any validators
// to slash
let index_of_first_validator_to_slash = self.validators.len() - f;
let index_of_worst_validator_in_supermajority = index_of_first_validator_to_slash - 1;
sorted_slash_points[index_of_worst_validator_in_supermajority]
};
// Perform the amortization
for slash_points in &mut median_slash_report {
*slash_points =
slash_points.saturating_sub(worst_validator_in_supermajority_slash_points)
}
let amortized_slash_report = median_slash_report;
// Create the resulting slash report
let mut slash_report = vec![];
for (validator, points) in self.validators.iter().copied().zip(amortized_slash_report) {
if points != 0 {
slash_report.push(Slash { key: validator.into(), points });
}
}
assert!(slash_report.len() <= f);
// Recognize the topic for signing the slash report
TributaryDb::recognize_topic(
self.tributary_txn,
self.set,
Topic::Sign {
id: VariantSignId::SlashReport,
attempt: 0,
round: SigningProtocolRound::Preprocess,
},
);
// Send the message for the processor to start signing
TributaryDb::send_message(
self.tributary_txn,
self.set,
messages::coordinator::CoordinatorMessage::SignSlashReport {
session: self.set.session,
report: slash_report,
},
);
}
};
}
Transaction::Sign { id, attempt, round, data, signed } => {
let topic = Topic::Sign { id, attempt, round };
let signer = signer(signed);
if u64::try_from(data.len()).unwrap() != self.validator_weights[&signer] {
TributaryDb::fatal_slash(
self.tributary_txn,
self.set,
signer,
"signer signed with a distinct amount of key shares than they had key shares",
);
return;
}
match TributaryDb::accumulate(
self.tributary_txn,
self.set,
self.validators,
self.total_weight,
block_number,
topic,
signer,
self.validator_weights[&signer],
&data,
) {
DataSet::None => {}
DataSet::Participating(data_set) => {
let id = topic.sign_id(self.set).expect("Topic::Sign didn't have SignId");
let flatten_data_set = |data_set| todo!("TODO");
let data_set = flatten_data_set(data_set);
TributaryDb::send_message(
self.tributary_txn,
self.set,
match round {
SigningProtocolRound::Preprocess => {
messages::sign::CoordinatorMessage::Preprocesses { id, preprocesses: data_set }
}
SigningProtocolRound::Share => {
messages::sign::CoordinatorMessage::Shares { id, shares: data_set }
}
},
)
}
};
}
}
}
fn handle_block(mut self, block_number: u64, block: Block<Transaction>) {
TributaryDb::start_of_block(self.tributary_txn, self.set, block_number);
for tx in block.transactions {
match tx {
TributaryTransaction::Tendermint(TendermintTx::SlashEvidence(ev)) => {
// Since the evidence is on the chain, it will have already been validated
// We can just punish the signer
let data = match ev {
Evidence::ConflictingMessages(first, second) => (first, Some(second)),
Evidence::InvalidPrecommit(first) | Evidence::InvalidValidRound(first) => (first, None),
};
let msgs = (
decode_signed_message::<TendermintNetwork<TD, Transaction, P>>(&data.0).unwrap(),
if data.1.is_some() {
Some(
decode_signed_message::<TendermintNetwork<TD, Transaction, P>>(&data.1.unwrap())
.unwrap(),
)
} else {
None
},
);
// Since anything with evidence is fundamentally faulty behavior, not just temporal
// errors, mark the node as fatally slashed
TributaryDb::fatal_slash(
self.tributary_txn,
self.set,
SeraiAddress(msgs.0.msg.sender),
&format!("invalid tendermint messages: {msgs:?}"),
);
}
TributaryTransaction::Application(tx) => {
self.handle_application_tx(block_number, tx);
}
}
}
}
}
pub(crate) struct ScanTributaryTask<CD: Db, TD: Db, P: P2p> {
pub(crate) cosign_db: CD,
pub(crate) tributary_db: TD,
pub(crate) set: ValidatorSet,
pub(crate) validators: Vec<SeraiAddress>,
pub(crate) total_weight: u64,
pub(crate) validator_weights: HashMap<SeraiAddress, u64>,
pub(crate) tributary: TributaryReader<TD, Transaction>,
pub(crate) _p2p: PhantomData<P>,
}
impl<CD: Db, TD: Db, P: P2p> ContinuallyRan for ScanTributaryTask<CD, TD, P> {
fn run_iteration(&mut self) -> impl Send + Future<Output = Result<bool, String>> {
async move {
let (mut last_block_number, mut last_block_hash) =
TributaryDb::last_handled_tributary_block(&self.tributary_db, self.set)
.unwrap_or((0, self.tributary.genesis()));
let mut made_progess = false;
while let Some(next) = self.tributary.block_after(&last_block_hash) {
let block = self.tributary.block(&next).unwrap();
let block_number = last_block_number + 1;
let block_hash = block.hash();
// Make sure we have all of the provided transactions for this block
for tx in &block.transactions {
let TransactionKind::Provided(order) = tx.kind() else {
continue;
};
// make sure we have all the provided txs in this block locally
if !self.tributary.locally_provided_txs_in_block(&block_hash, order) {
return Err(format!(
"didn't have the provided Transactions on-chain for set (ephemeral error): {:?}",
self.set
));
}
}
let mut tributary_txn = self.tributary_db.txn();
(ScanBlock {
_p2p: PhantomData::<P>,
cosign_db: &self.cosign_db,
tributary_txn: &mut tributary_txn,
set: self.set,
validators: &self.validators,
total_weight: self.total_weight,
validator_weights: &self.validator_weights,
tributary: &self.tributary,
})
.handle_block(block_number, block);
TributaryDb::set_last_handled_tributary_block(
&mut tributary_txn,
self.set,
block_number,
block_hash,
);
last_block_number = block_number;
last_block_hash = block_hash;
tributary_txn.commit();
made_progess = true;
}
Ok(made_progess)
}
}
}

View File

@@ -1,338 +0,0 @@
use core::{ops::Deref, fmt::Debug};
use std::io;
use zeroize::Zeroizing;
use rand_core::{RngCore, CryptoRng};
use blake2::{digest::typenum::U32, Digest, Blake2b};
use ciphersuite::{
group::{ff::Field, GroupEncoding},
Ciphersuite, Ristretto,
};
use schnorr::SchnorrSignature;
use scale::Encode;
use borsh::{BorshSerialize, BorshDeserialize};
use serai_client::{primitives::SeraiAddress, validator_sets::primitives::MAX_KEY_SHARES_PER_SET};
use messages::sign::VariantSignId;
use tributary::{
ReadWrite,
transaction::{
Signed as TributarySigned, TransactionError, TransactionKind, Transaction as TransactionTrait,
},
};
/// The round this data is for, within a signing protocol.
#[derive(Clone, Copy, PartialEq, Eq, Debug, Encode, BorshSerialize, BorshDeserialize)]
pub enum SigningProtocolRound {
/// A preprocess.
Preprocess,
/// A signature share.
Share,
}
impl SigningProtocolRound {
fn nonce(&self) -> u32 {
match self {
SigningProtocolRound::Preprocess => 0,
SigningProtocolRound::Share => 1,
}
}
}
/// `tributary::Signed` but without the nonce.
///
/// All of our nonces are deterministic to the type of transaction and fields within.
#[derive(Clone, Copy, PartialEq, Eq, Debug)]
pub struct Signed {
/// The signer.
pub signer: <Ristretto as Ciphersuite>::G,
/// The signature.
pub signature: SchnorrSignature<Ristretto>,
}
impl BorshSerialize for Signed {
fn serialize<W: io::Write>(&self, writer: &mut W) -> Result<(), io::Error> {
writer.write_all(self.signer.to_bytes().as_ref())?;
self.signature.write(writer)
}
}
impl BorshDeserialize for Signed {
fn deserialize_reader<R: io::Read>(reader: &mut R) -> Result<Self, io::Error> {
let signer = Ristretto::read_G(reader)?;
let signature = SchnorrSignature::read(reader)?;
Ok(Self { signer, signature })
}
}
impl Signed {
/// Provide a nonce to convert a `Signed` into a `tributary::Signed`.
fn nonce(&self, nonce: u32) -> TributarySigned {
TributarySigned { signer: self.signer, nonce, signature: self.signature }
}
}
/// The Tributary transaction definition used by Serai
#[derive(Clone, PartialEq, Eq, Debug, BorshSerialize, BorshDeserialize)]
pub enum Transaction {
/// A vote to remove a participant for invalid behavior
RemoveParticipant {
/// The participant to remove
participant: SeraiAddress,
/// The transaction's signer and signature
signed: Signed,
},
/// A participation in the DKG
DkgParticipation {
participation: Vec<u8>,
/// The transaction's signer and signature
signed: Signed,
},
/// The preprocess to confirm the DKG results on-chain
DkgConfirmationPreprocess {
/// The attempt number of this signing protocol
attempt: u32,
// The preprocess
preprocess: [u8; 64],
/// The transaction's signer and signature
signed: Signed,
},
/// The signature share to confirm the DKG results on-chain
DkgConfirmationShare {
/// The attempt number of this signing protocol
attempt: u32,
// The signature share
share: [u8; 32],
/// The transaction's signer and signature
signed: Signed,
},
/// Intend to co-sign a finalized Substrate block
///
/// When the time comes to start a new co-signing protocol, the most recent Substrate block will
/// be the one selected to be cosigned.
Cosign {
/// The hash of the Substrate block to sign
substrate_block_hash: [u8; 32],
},
/// The cosign for a Substrate block
///
/// After producing this cosign, we need to start work on the latest intended-to-be cosigned
/// block. That requires agreement on when this cosign was produced, which we solve by embedding
/// this cosign on chain.
///
/// We ideally don't have this transaction at all. The coordinator, without access to any of the
/// key shares, could observe the FROST signing session and determine a successful completion.
/// Unfortunately, that functionality is not present in modular-frost, so we do need to support
/// *some* asynchronous flow (where the processor or P2P network informs us of the successful
/// completion).
///
/// If we use a `Provided` transaction, that requires everyone observe this cosign.
///
/// If we use an `Unsigned` transaction, we can't verify the cosign signature inside
/// `Transaction::verify` unless we embedded the full `SignedCosign` on-chain. The issue is since
/// a Tributary is stateless with regards to the on-chain logic, including `Transaction::verify`,
/// we can't verify the signature against the group's public key unless we also include that (but
/// then we open a DoS where arbitrary group keys are specified to cause inclusion of arbitrary
/// blobs on chain).
///
/// If we use a `Signed` transaction, we mitigate the DoS risk by having someone to fatally
/// slash. We have horrible performance though as for 100 validators, all 100 will publish this
/// transaction.
///
/// We could use a signed `Unsigned` transaction, where it includes a signer and signature but
/// isn't technically a Signed transaction. This lets us de-duplicate the transaction premised on
/// its contents.
///
/// The optimal choice is likely to use a `Provided` transaction. We don't actually need to
/// observe the produced cosign (which is ephemeral). As long as it's agreed the cosign in
/// question no longer needs to produced, which would mean the cosigning protocol at-large
/// cosigning the block in question, it'd be safe to provide this and move on to the next cosign.
Cosigned { substrate_block_hash: [u8; 32] },
/// Acknowledge a Substrate block
///
/// This is provided after the block has been cosigned.
///
/// With the acknowledgement of a Substrate block, we can whitelist all the `VariantSignId`s
/// resulting from its handling.
SubstrateBlock {
/// The hash of the Substrate block
hash: [u8; 32],
},
/// Acknowledge a Batch
///
/// Once everyone has acknowledged the Batch, we can begin signing it.
Batch {
/// The hash of the Batch's serialization.
///
/// Generally, we refer to a Batch by its ID/the hash of its instructions. Here, we want to
/// ensure consensus on the Batch, and achieving consensus on its hash is the most effective
/// way to do that.
hash: [u8; 32],
},
/// Data from a signing protocol.
Sign {
/// The ID of the object being signed
id: VariantSignId,
/// The attempt number of this signing protocol
attempt: u32,
/// The round this data is for, within the signing protocol
round: SigningProtocolRound,
/// The data itself
///
/// There will be `n` blobs of data where `n` is the amount of key shares the validator sending
/// this transaction has.
data: Vec<Vec<u8>>,
/// The transaction's signer and signature
signed: Signed,
},
/// The local view of slashes observed by the transaction's sender
SlashReport {
/// The slash points accrued by each validator
slash_points: Vec<u32>,
/// The transaction's signer and signature
signed: Signed,
},
}
impl ReadWrite for Transaction {
fn read<R: io::Read>(reader: &mut R) -> io::Result<Self> {
borsh::from_reader(reader)
}
fn write<W: io::Write>(&self, writer: &mut W) -> io::Result<()> {
borsh::to_writer(writer, self)
}
}
impl TransactionTrait for Transaction {
fn kind(&self) -> TransactionKind {
match self {
Transaction::RemoveParticipant { participant, signed } => {
TransactionKind::Signed((b"RemoveParticipant", participant).encode(), signed.nonce(0))
}
Transaction::DkgParticipation { signed, .. } => {
TransactionKind::Signed(b"DkgParticipation".encode(), signed.nonce(0))
}
Transaction::DkgConfirmationPreprocess { attempt, signed, .. } => {
TransactionKind::Signed((b"DkgConfirmation", attempt).encode(), signed.nonce(0))
}
Transaction::DkgConfirmationShare { attempt, signed, .. } => {
TransactionKind::Signed((b"DkgConfirmation", attempt).encode(), signed.nonce(1))
}
Transaction::Cosign { .. } => TransactionKind::Provided("CosignSubstrateBlock"),
Transaction::Cosigned { .. } => TransactionKind::Provided("Cosigned"),
Transaction::SubstrateBlock { .. } => TransactionKind::Provided("SubstrateBlock"),
Transaction::Batch { .. } => TransactionKind::Provided("Batch"),
Transaction::Sign { id, attempt, round, signed, .. } => {
TransactionKind::Signed((b"Sign", id, attempt).encode(), signed.nonce(round.nonce()))
}
Transaction::SlashReport { signed, .. } => {
TransactionKind::Signed(b"SlashReport".encode(), signed.nonce(0))
}
}
}
fn hash(&self) -> [u8; 32] {
let mut tx = ReadWrite::serialize(self);
if let TransactionKind::Signed(_, signed) = self.kind() {
// Make sure the part we're cutting off is the signature
assert_eq!(tx.drain((tx.len() - 64) ..).collect::<Vec<_>>(), signed.signature.serialize());
}
Blake2b::<U32>::digest(&tx).into()
}
// This is a stateless verification which we use to enforce some size limits.
fn verify(&self) -> Result<(), TransactionError> {
#[allow(clippy::match_same_arms)]
match self {
// Fixed-length TX
Transaction::RemoveParticipant { .. } => {}
// TODO: MAX_DKG_PARTICIPATION_LEN
Transaction::DkgParticipation { .. } => {}
// These are fixed-length TXs
Transaction::DkgConfirmationPreprocess { .. } | Transaction::DkgConfirmationShare { .. } => {}
// Provided TXs
Transaction::Cosign { .. } |
Transaction::Cosigned { .. } |
Transaction::SubstrateBlock { .. } |
Transaction::Batch { .. } => {}
Transaction::Sign { data, .. } => {
if data.len() > usize::try_from(MAX_KEY_SHARES_PER_SET).unwrap() {
Err(TransactionError::InvalidContent)?
}
// TODO: MAX_SIGN_LEN
}
Transaction::SlashReport { slash_points, .. } => {
if slash_points.len() > usize::try_from(MAX_KEY_SHARES_PER_SET).unwrap() {
Err(TransactionError::InvalidContent)?
}
}
};
Ok(())
}
}
impl Transaction {
// Sign a transaction
//
// Panics if signing a transaction type which isn't `TransactionKind::Signed`
pub fn sign<R: RngCore + CryptoRng>(
&mut self,
rng: &mut R,
genesis: [u8; 32],
key: &Zeroizing<<Ristretto as Ciphersuite>::F>,
) {
fn signed(tx: &mut Transaction) -> &mut Signed {
#[allow(clippy::match_same_arms)] // This doesn't make semantic sense here
match tx {
Transaction::RemoveParticipant { ref mut signed, .. } |
Transaction::DkgParticipation { ref mut signed, .. } |
Transaction::DkgConfirmationPreprocess { ref mut signed, .. } => signed,
Transaction::DkgConfirmationShare { ref mut signed, .. } => signed,
Transaction::Cosign { .. } => panic!("signing CosignSubstrateBlock"),
Transaction::Cosigned { .. } => panic!("signing Cosigned"),
Transaction::SubstrateBlock { .. } => panic!("signing SubstrateBlock"),
Transaction::Batch { .. } => panic!("signing Batch"),
Transaction::Sign { ref mut signed, .. } => signed,
Transaction::SlashReport { ref mut signed, .. } => signed,
}
}
// Decide the nonce to sign with
let sig_nonce = Zeroizing::new(<Ristretto as Ciphersuite>::F::random(rng));
{
// Set the signer and the nonce
let signed = signed(self);
signed.signer = Ristretto::generator() * key.deref();
signed.signature.R = <Ristretto as Ciphersuite>::generator() * sig_nonce.deref();
}
// Get the signature hash (which now includes `R || A` making it valid as the challenge)
let sig_hash = self.sig_hash(genesis);
// Sign the signature
signed(self).signature = SchnorrSignature::<Ristretto>::sign(key, sig_nonce, sig_hash);
}
}

View File

@@ -234,7 +234,7 @@ impl<D: Db> ContinuallyRan for EphemeralEventStream<D> {
else {
panic!("AcceptedHandover event wasn't a AcceptedHandover event: {accepted_handover:?}");
};
crate::SignSlashReport::send(&mut txn, set);
crate::SignSlashReport::send(&mut txn, *set);
}
txn.commit();

View File

@@ -32,7 +32,7 @@ fn borsh_deserialize_validators<R: io::Read>(
}
/// The information for a new set.
#[derive(Debug, BorshSerialize, BorshDeserialize)]
#[derive(Clone, Debug, BorshSerialize, BorshDeserialize)]
pub struct NewSetInformation {
/// The set.
pub set: ValidatorSet,
@@ -66,8 +66,8 @@ mod _public_db {
// Relevant new set, from an ephemeral event stream
NewSet: () -> NewSetInformation,
// Relevant sign slash report, from an ephemeral event stream
SignSlashReport: () -> ValidatorSet,
// Potentially relevant sign slash report, from an ephemeral event stream
SignSlashReport: (set: ValidatorSet) -> (),
}
);
}
@@ -109,12 +109,12 @@ impl NewSet {
/// notifications for all relevant validator sets will be included.
pub struct SignSlashReport;
impl SignSlashReport {
pub(crate) fn send(txn: &mut impl DbTxn, set: &ValidatorSet) {
_public_db::SignSlashReport::send(txn, set);
pub(crate) fn send(txn: &mut impl DbTxn, set: ValidatorSet) {
_public_db::SignSlashReport::send(txn, set, &());
}
/// Try to receive a notification to sign a slash report, returning `None` if there is none to
/// receive.
pub fn try_recv(txn: &mut impl DbTxn) -> Option<ValidatorSet> {
_public_db::SignSlashReport::try_recv(txn)
pub fn try_recv(txn: &mut impl DbTxn, set: ValidatorSet) -> Option<()> {
_public_db::SignSlashReport::try_recv(txn, set)
}
}

View File

@@ -0,0 +1,49 @@
[package]
name = "tributary-sdk"
version = "0.1.0"
description = "A micro-blockchain to provide consensus and ordering to P2P communication"
license = "AGPL-3.0-only"
repository = "https://github.com/serai-dex/serai/tree/develop/coordinator/tributary-sdk"
authors = ["Luke Parker <lukeparker5132@gmail.com>"]
edition = "2021"
rust-version = "1.81"
[package.metadata.docs.rs]
all-features = true
rustdoc-args = ["--cfg", "docsrs"]
[lints]
workspace = true
[dependencies]
thiserror = { version = "2", default-features = false, features = ["std"] }
subtle = { version = "^2", default-features = false, features = ["std"] }
zeroize = { version = "^1.5", default-features = false, features = ["std"] }
rand = { version = "0.8", default-features = false, features = ["std"] }
rand_chacha = { version = "0.3", default-features = false, features = ["std"] }
blake2 = { version = "0.10", default-features = false, features = ["std"] }
transcript = { package = "flexible-transcript", path = "../../crypto/transcript", version = "0.3", default-features = false, features = ["std", "recommended"] }
ciphersuite = { package = "ciphersuite", path = "../../crypto/ciphersuite", version = "0.4", default-features = false, features = ["std", "ristretto"] }
schnorr = { package = "schnorr-signatures", path = "../../crypto/schnorr", version = "0.5", default-features = false, features = ["std"] }
hex = { version = "0.4", default-features = false, features = ["std"] }
log = { version = "0.4", default-features = false, features = ["std"] }
serai-db = { path = "../../common/db", version = "0.1" }
scale = { package = "parity-scale-codec", version = "3", default-features = false, features = ["std", "derive"] }
futures-util = { version = "0.3", default-features = false, features = ["std", "sink", "channel"] }
futures-channel = { version = "0.3", default-features = false, features = ["std", "sink"] }
tendermint = { package = "tendermint-machine", path = "./tendermint", version = "0.2" }
tokio = { version = "1", default-features = false, features = ["sync", "time", "rt"] }
[dev-dependencies]
tokio = { version = "1", features = ["macros"] }
[features]
tests = []

View File

@@ -0,0 +1,15 @@
AGPL-3.0-only license
Copyright (c) 2023 Luke Parker
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU Affero General Public License Version 3 as
published by the Free Software Foundation.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Affero General Public License for more details.
You should have received a copy of the GNU Affero General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.

View File

@@ -0,0 +1,3 @@
# Tributary
A verifiable, ordered broadcast layer implemented as a BFT micro-blockchain.

View File

@@ -0,0 +1,388 @@
use core::{marker::PhantomData, fmt::Debug, future::Future};
use std::{sync::Arc, io};
use zeroize::Zeroizing;
use ciphersuite::{Ciphersuite, Ristretto};
use scale::Decode;
use futures_channel::mpsc::UnboundedReceiver;
use futures_util::{StreamExt, SinkExt};
use ::tendermint::{
ext::{BlockNumber, Commit, Block as BlockTrait, Network},
SignedMessageFor, SyncedBlock, SyncedBlockSender, SyncedBlockResultReceiver, MessageSender,
TendermintMachine, TendermintHandle,
};
pub use ::tendermint::Evidence;
use serai_db::Db;
use tokio::sync::RwLock;
mod merkle;
pub(crate) use merkle::*;
pub mod transaction;
pub use transaction::{TransactionError, Signed, TransactionKind, Transaction as TransactionTrait};
use crate::tendermint::tx::TendermintTx;
mod provided;
pub(crate) use provided::*;
pub use provided::ProvidedError;
mod block;
pub use block::*;
mod blockchain;
pub(crate) use blockchain::*;
mod mempool;
pub(crate) use mempool::*;
pub mod tendermint;
pub(crate) use crate::tendermint::*;
#[cfg(any(test, feature = "tests"))]
pub mod tests;
/// Size limit for an individual transaction.
// This needs to be big enough to participate in a 101-of-150 eVRF DKG with each element taking
// `MAX_KEY_LEN`. This also needs to be big enough to pariticpate in signing 520 Bitcoin inputs
// with 49 key shares, and signing 120 Monero inputs with 49 key shares.
// TODO: Add a test for these properties
pub const TRANSACTION_SIZE_LIMIT: usize = 2_000_000;
/// Amount of transactions a single account may have in the mempool.
pub const ACCOUNT_MEMPOOL_LIMIT: u32 = 50;
/// Block size limit.
// This targets a growth limit of roughly 30 GB a day, under load, in order to prevent a malicious
// participant from flooding disks and causing out of space errors in order processes.
pub const BLOCK_SIZE_LIMIT: usize = 2_001_000;
pub(crate) const TENDERMINT_MESSAGE: u8 = 0;
pub(crate) const TRANSACTION_MESSAGE: u8 = 1;
#[allow(clippy::large_enum_variant)]
#[derive(Clone, PartialEq, Eq, Debug)]
pub enum Transaction<T: TransactionTrait> {
Tendermint(TendermintTx),
Application(T),
}
impl<T: TransactionTrait> ReadWrite for Transaction<T> {
fn read<R: io::Read>(reader: &mut R) -> io::Result<Self> {
let mut kind = [0];
reader.read_exact(&mut kind)?;
match kind[0] {
0 => {
let tx = TendermintTx::read(reader)?;
Ok(Transaction::Tendermint(tx))
}
1 => {
let tx = T::read(reader)?;
Ok(Transaction::Application(tx))
}
_ => Err(io::Error::other("invalid transaction type")),
}
}
fn write<W: io::Write>(&self, writer: &mut W) -> io::Result<()> {
match self {
Transaction::Tendermint(tx) => {
writer.write_all(&[0])?;
tx.write(writer)
}
Transaction::Application(tx) => {
writer.write_all(&[1])?;
tx.write(writer)
}
}
}
}
impl<T: TransactionTrait> Transaction<T> {
pub fn hash(&self) -> [u8; 32] {
match self {
Transaction::Tendermint(tx) => tx.hash(),
Transaction::Application(tx) => tx.hash(),
}
}
pub fn kind(&self) -> TransactionKind {
match self {
Transaction::Tendermint(tx) => tx.kind(),
Transaction::Application(tx) => tx.kind(),
}
}
}
/// An item which can be read and written.
pub trait ReadWrite: Sized {
fn read<R: io::Read>(reader: &mut R) -> io::Result<Self>;
fn write<W: io::Write>(&self, writer: &mut W) -> io::Result<()>;
fn serialize(&self) -> Vec<u8> {
// BlockHeader is 64 bytes and likely the smallest item in this system
let mut buf = Vec::with_capacity(64);
self.write(&mut buf).unwrap();
buf
}
}
pub trait P2p: 'static + Send + Sync + Clone {
/// Broadcast a message to all other members of the Tributary with the specified genesis.
///
/// The Tributary will re-broadcast consensus messages on a fixed interval to ensure they aren't
/// prematurely dropped from the P2P layer. THe P2P layer SHOULD perform content-based
/// deduplication to ensure a sane amount of load.
fn broadcast(&self, genesis: [u8; 32], msg: Vec<u8>) -> impl Send + Future<Output = ()>;
}
impl<P: P2p> P2p for Arc<P> {
fn broadcast(&self, genesis: [u8; 32], msg: Vec<u8>) -> impl Send + Future<Output = ()> {
P::broadcast(self, genesis, msg)
}
}
#[derive(Clone)]
pub struct Tributary<D: Db, T: TransactionTrait, P: P2p> {
db: D,
genesis: [u8; 32],
network: TendermintNetwork<D, T, P>,
synced_block: Arc<RwLock<SyncedBlockSender<TendermintNetwork<D, T, P>>>>,
synced_block_result: Arc<RwLock<SyncedBlockResultReceiver>>,
messages: Arc<RwLock<MessageSender<TendermintNetwork<D, T, P>>>>,
}
impl<D: Db, T: TransactionTrait, P: P2p> Tributary<D, T, P> {
pub async fn new(
db: D,
genesis: [u8; 32],
start_time: u64,
key: Zeroizing<<Ristretto as Ciphersuite>::F>,
validators: Vec<(<Ristretto as Ciphersuite>::G, u64)>,
p2p: P,
) -> Option<Self> {
log::info!("new Tributary with genesis {}", hex::encode(genesis));
let validators_vec = validators.iter().map(|validator| validator.0).collect::<Vec<_>>();
let signer = Arc::new(Signer::new(genesis, key));
let validators = Arc::new(Validators::new(genesis, validators)?);
let mut blockchain = Blockchain::new(db.clone(), genesis, &validators_vec);
let block_number = BlockNumber(blockchain.block_number());
let start_time = if let Some(commit) = blockchain.commit(&blockchain.tip()) {
Commit::<Validators>::decode(&mut commit.as_ref()).unwrap().end_time
} else {
start_time
};
let proposal = TendermintBlock(
blockchain.build_block::<TendermintNetwork<D, T, P>>(&validators).serialize(),
);
let blockchain = Arc::new(RwLock::new(blockchain));
let network = TendermintNetwork { genesis, signer, validators, blockchain, p2p };
let TendermintHandle { synced_block, synced_block_result, messages, machine } =
TendermintMachine::new(
db.clone(),
network.clone(),
genesis,
block_number,
start_time,
proposal,
)
.await;
tokio::spawn(machine.run());
Some(Self {
db,
genesis,
network,
synced_block: Arc::new(RwLock::new(synced_block)),
synced_block_result: Arc::new(RwLock::new(synced_block_result)),
messages: Arc::new(RwLock::new(messages)),
})
}
pub fn block_time() -> u32 {
TendermintNetwork::<D, T, P>::block_time()
}
pub fn genesis(&self) -> [u8; 32] {
self.genesis
}
pub async fn block_number(&self) -> u64 {
self.network.blockchain.read().await.block_number()
}
pub async fn tip(&self) -> [u8; 32] {
self.network.blockchain.read().await.tip()
}
pub fn reader(&self) -> TributaryReader<D, T> {
TributaryReader(self.db.clone(), self.genesis, PhantomData)
}
pub async fn provide_transaction(&self, tx: T) -> Result<(), ProvidedError> {
self.network.blockchain.write().await.provide_transaction(tx)
}
pub async fn next_nonce(
&self,
signer: &<Ristretto as Ciphersuite>::G,
order: &[u8],
) -> Option<u32> {
self.network.blockchain.read().await.next_nonce(signer, order)
}
// Returns Ok(true) if new, Ok(false) if an already present unsigned, or the error.
// Safe to be &self since the only meaningful usage of self is self.network.blockchain which
// successfully acquires its own write lock
pub async fn add_transaction(&self, tx: T) -> Result<bool, TransactionError> {
let tx = Transaction::Application(tx);
let mut to_broadcast = vec![TRANSACTION_MESSAGE];
tx.write(&mut to_broadcast).unwrap();
let res = self.network.blockchain.write().await.add_transaction::<TendermintNetwork<D, T, P>>(
true,
tx,
&self.network.signature_scheme(),
);
if res == Ok(true) {
self.network.p2p.broadcast(self.genesis, to_broadcast).await;
}
res
}
async fn sync_block_internal(
&self,
block: Block<T>,
commit: Vec<u8>,
result: &mut UnboundedReceiver<bool>,
) -> bool {
let (tip, block_number) = {
let blockchain = self.network.blockchain.read().await;
(blockchain.tip(), blockchain.block_number())
};
if block.header.parent != tip {
log::debug!("told to sync a block whose parent wasn't our tip");
return false;
}
let block = TendermintBlock(block.serialize());
let mut commit_ref = commit.as_ref();
let Ok(commit) = Commit::<Arc<Validators>>::decode(&mut commit_ref) else {
log::error!("sent an invalidly serialized commit");
return false;
};
// Storage DoS vector. We *could* truncate to solely the relevant portion, trying to save this,
// yet then we'd have to test the truncation was performed correctly.
if !commit_ref.is_empty() {
log::error!("sent an commit with additional data after it");
return false;
}
if !self.network.verify_commit(block.id(), &commit) {
log::error!("sent an invalid commit");
return false;
}
let number = BlockNumber(block_number + 1);
self.synced_block.write().await.send(SyncedBlock { number, block, commit }).await.unwrap();
result.next().await.unwrap()
}
// Sync a block.
// TODO: Since we have a static validator set, we should only need the tail commit?
pub async fn sync_block(&self, block: Block<T>, commit: Vec<u8>) -> bool {
let mut result = self.synced_block_result.write().await;
self.sync_block_internal(block, commit, &mut result).await
}
// Return true if the message should be rebroadcasted.
pub async fn handle_message(&self, msg: &[u8]) -> bool {
match msg.first() {
Some(&TRANSACTION_MESSAGE) => {
let Ok(tx) = Transaction::read::<&[u8]>(&mut &msg[1 ..]) else {
log::error!("received invalid transaction message");
return false;
};
// TODO: Sync mempools with fellow peers
// Can we just rebroadcast transactions not included for at least two blocks?
let res =
self.network.blockchain.write().await.add_transaction::<TendermintNetwork<D, T, P>>(
false,
tx,
&self.network.signature_scheme(),
);
log::debug!("received transaction message. valid new transaction: {res:?}");
res == Ok(true)
}
Some(&TENDERMINT_MESSAGE) => {
let Ok(msg) =
SignedMessageFor::<TendermintNetwork<D, T, P>>::decode::<&[u8]>(&mut &msg[1 ..])
else {
log::error!("received invalid tendermint message");
return false;
};
self.messages.write().await.send(msg).await.unwrap();
false
}
_ => false,
}
}
/// Get a Future which will resolve once the next block has been added.
pub async fn next_block_notification(
&self,
) -> impl Send + Sync + core::future::Future<Output = Result<(), impl Send + Sync>> {
let (tx, rx) = tokio::sync::oneshot::channel();
self.network.blockchain.write().await.next_block_notifications.push_back(tx);
rx
}
}
#[derive(Clone)]
pub struct TributaryReader<D: Db, T: TransactionTrait>(D, [u8; 32], PhantomData<T>);
impl<D: Db, T: TransactionTrait> TributaryReader<D, T> {
pub fn genesis(&self) -> [u8; 32] {
self.1
}
// Since these values are static once set, they can be safely read from the database without lock
// acquisition
pub fn block(&self, hash: &[u8; 32]) -> Option<Block<T>> {
Blockchain::<D, T>::block_from_db(&self.0, self.1, hash)
}
pub fn commit(&self, hash: &[u8; 32]) -> Option<Vec<u8>> {
Blockchain::<D, T>::commit_from_db(&self.0, self.1, hash)
}
pub fn parsed_commit(&self, hash: &[u8; 32]) -> Option<Commit<Validators>> {
self.commit(hash).map(|commit| Commit::<Validators>::decode(&mut commit.as_ref()).unwrap())
}
pub fn block_after(&self, hash: &[u8; 32]) -> Option<[u8; 32]> {
Blockchain::<D, T>::block_after(&self.0, self.1, hash)
}
pub fn time_of_block(&self, hash: &[u8; 32]) -> Option<u64> {
self
.commit(hash)
.map(|commit| Commit::<Validators>::decode(&mut commit.as_ref()).unwrap().end_time)
}
pub fn locally_provided_txs_in_block(&self, hash: &[u8; 32], order: &str) -> bool {
Blockchain::<D, T>::locally_provided_txs_in_block(&self.0, &self.1, hash, order)
}
// This isn't static, yet can be read with only minor discrepancy risks
pub fn tip(&self) -> [u8; 32] {
Blockchain::<D, T>::tip_from_db(&self.0, self.1)
}
}

View File

@@ -0,0 +1,218 @@
use core::fmt::Debug;
use std::io;
use zeroize::Zeroize;
use thiserror::Error;
use blake2::{Digest, Blake2b512};
use ciphersuite::{
group::{Group, GroupEncoding},
Ciphersuite, Ristretto,
};
use schnorr::SchnorrSignature;
use crate::{TRANSACTION_SIZE_LIMIT, ReadWrite};
#[derive(Clone, PartialEq, Eq, Debug, Error)]
pub enum TransactionError {
/// Transaction exceeded the size limit.
#[error("transaction is too large")]
TooLargeTransaction,
/// Transaction's signer isn't a participant.
#[error("invalid signer")]
InvalidSigner,
/// Transaction's nonce isn't the prior nonce plus one.
#[error("invalid nonce")]
InvalidNonce,
/// Transaction's signature is invalid.
#[error("invalid signature")]
InvalidSignature,
/// Transaction's content is invalid.
#[error("transaction content is invalid")]
InvalidContent,
/// Transaction's signer has too many transactions in the mempool.
#[error("signer has too many transactions in the mempool")]
TooManyInMempool,
/// Provided Transaction added to mempool.
#[error("provided transaction added to mempool")]
ProvidedAddedToMempool,
}
/// Data for a signed transaction.
#[derive(Clone, PartialEq, Eq, Debug)]
pub struct Signed {
pub signer: <Ristretto as Ciphersuite>::G,
pub nonce: u32,
pub signature: SchnorrSignature<Ristretto>,
}
impl ReadWrite for Signed {
fn read<R: io::Read>(reader: &mut R) -> io::Result<Self> {
let signer = Ristretto::read_G(reader)?;
let mut nonce = [0; 4];
reader.read_exact(&mut nonce)?;
let nonce = u32::from_le_bytes(nonce);
if nonce >= (u32::MAX - 1) {
Err(io::Error::other("nonce exceeded limit"))?;
}
let mut signature = SchnorrSignature::<Ristretto>::read(reader)?;
if signature.R.is_identity().into() {
// Anyone malicious could remove this and try to find zero signatures
// We should never produce zero signatures though meaning this should never come up
// If it does somehow come up, this is a decent courtesy
signature.zeroize();
Err(io::Error::other("signature nonce was identity"))?;
}
Ok(Signed { signer, nonce, signature })
}
fn write<W: io::Write>(&self, writer: &mut W) -> io::Result<()> {
// This is either an invalid signature or a private key leak
if self.signature.R.is_identity().into() {
Err(io::Error::other("signature nonce was identity"))?;
}
writer.write_all(&self.signer.to_bytes())?;
writer.write_all(&self.nonce.to_le_bytes())?;
self.signature.write(writer)
}
}
impl Signed {
pub fn read_without_nonce<R: io::Read>(reader: &mut R, nonce: u32) -> io::Result<Self> {
let signer = Ristretto::read_G(reader)?;
let mut signature = SchnorrSignature::<Ristretto>::read(reader)?;
if signature.R.is_identity().into() {
// Anyone malicious could remove this and try to find zero signatures
// We should never produce zero signatures though meaning this should never come up
// If it does somehow come up, this is a decent courtesy
signature.zeroize();
Err(io::Error::other("signature nonce was identity"))?;
}
Ok(Signed { signer, nonce, signature })
}
pub fn write_without_nonce<W: io::Write>(&self, writer: &mut W) -> io::Result<()> {
// This is either an invalid signature or a private key leak
if self.signature.R.is_identity().into() {
Err(io::Error::other("signature nonce was identity"))?;
}
writer.write_all(&self.signer.to_bytes())?;
self.signature.write(writer)
}
}
#[allow(clippy::large_enum_variant)]
#[derive(Clone, PartialEq, Eq, Debug)]
pub enum TransactionKind {
/// This transaction should be provided by every validator, in an exact order.
///
/// The contained static string names the orderer to use. This allows two distinct provided
/// transaction kinds, without a synchronized order, to be ordered within their own kind without
/// requiring ordering with each other.
///
/// The only malleability is in when this transaction appears on chain. The block producer will
/// include it when they have it. Block verification will fail for validators without it.
///
/// If a supermajority of validators produce a commit for a block with a provided transaction
/// which isn't locally held, the block will be added to the local chain. When the transaction is
/// locally provided, it will be compared for correctness to the on-chain version
///
/// In order to ensure TXs aren't accidentally provided multiple times, all provided transactions
/// must have a unique hash which is also unique to all Unsigned transactions.
Provided(&'static str),
/// An unsigned transaction, only able to be included by the block producer.
///
/// Once an Unsigned transaction is included on-chain, it may not be included again. In order to
/// have multiple Unsigned transactions with the same values included on-chain, some distinct
/// nonce must be included in order to cause a distinct hash.
///
/// The hash must also be unique with all Provided transactions.
Unsigned,
/// A signed transaction.
Signed(Vec<u8>, Signed),
}
// TODO: Should this be renamed TransactionTrait now that a literal Transaction exists?
// Or should the literal Transaction be renamed to Event?
pub trait Transaction: 'static + Send + Sync + Clone + Eq + Debug + ReadWrite {
/// Return what type of transaction this is.
fn kind(&self) -> TransactionKind;
/// Return the hash of this transaction.
///
/// The hash must NOT commit to the signature.
fn hash(&self) -> [u8; 32];
/// Perform transaction-specific verification.
fn verify(&self) -> Result<(), TransactionError>;
/// Obtain the challenge for this transaction's signature.
///
/// Do not override this unless you know what you're doing.
///
/// Panics if called on non-signed transactions.
fn sig_hash(&self, genesis: [u8; 32]) -> <Ristretto as Ciphersuite>::F {
match self.kind() {
TransactionKind::Signed(order, Signed { signature, .. }) => {
<Ristretto as Ciphersuite>::F::from_bytes_mod_order_wide(
&Blake2b512::digest(
[
b"Tributary Signed Transaction",
genesis.as_ref(),
&self.hash(),
order.as_ref(),
signature.R.to_bytes().as_ref(),
]
.concat(),
)
.into(),
)
}
_ => panic!("sig_hash called on non-signed transaction"),
}
}
}
pub trait GAIN: FnMut(&<Ristretto as Ciphersuite>::G, &[u8]) -> Option<u32> {}
impl<F: FnMut(&<Ristretto as Ciphersuite>::G, &[u8]) -> Option<u32>> GAIN for F {}
pub(crate) fn verify_transaction<F: GAIN, T: Transaction>(
tx: &T,
genesis: [u8; 32],
get_and_increment_nonce: &mut F,
) -> Result<(), TransactionError> {
if tx.serialize().len() > TRANSACTION_SIZE_LIMIT {
Err(TransactionError::TooLargeTransaction)?;
}
tx.verify()?;
match tx.kind() {
TransactionKind::Provided(_) | TransactionKind::Unsigned => {}
TransactionKind::Signed(order, Signed { signer, nonce, signature }) => {
if let Some(next_nonce) = get_and_increment_nonce(&signer, &order) {
if nonce != next_nonce {
Err(TransactionError::InvalidNonce)?;
}
} else {
// Not a participant
Err(TransactionError::InvalidSigner)?;
}
// TODO: Use a batch verification here
if !signature.verify(signer, tx.sig_hash(genesis)) {
Err(TransactionError::InvalidSignature)?;
}
}
}
Ok(())
}

View File

@@ -1,11 +1,13 @@
[package]
name = "tributary-chain"
name = "serai-coordinator-tributary"
version = "0.1.0"
description = "A micro-blockchain to provide consensus and ordering to P2P communication"
description = "The Tributary used by the Serai Coordinator"
license = "AGPL-3.0-only"
repository = "https://github.com/serai-dex/serai/tree/develop/coordinator/tributary"
authors = ["Luke Parker <lukeparker5132@gmail.com>"]
keywords = []
edition = "2021"
publish = false
rust-version = "1.81"
[package.metadata.docs.rs]
@@ -16,34 +18,29 @@ rustdoc-args = ["--cfg", "docsrs"]
workspace = true
[dependencies]
thiserror = { version = "2", default-features = false, features = ["std"] }
subtle = { version = "^2", default-features = false, features = ["std"] }
zeroize = { version = "^1.5", default-features = false, features = ["std"] }
rand = { version = "0.8", default-features = false, features = ["std"] }
rand_chacha = { version = "0.3", default-features = false, features = ["std"] }
rand_core = { version = "0.6", default-features = false, features = ["std"] }
blake2 = { version = "0.10", default-features = false, features = ["std"] }
transcript = { package = "flexible-transcript", path = "../../crypto/transcript", default-features = false, features = ["std", "recommended"] }
ciphersuite = { package = "ciphersuite", path = "../../crypto/ciphersuite", default-features = false, features = ["std", "ristretto"] }
ciphersuite = { path = "../../crypto/ciphersuite", default-features = false, features = ["std"] }
schnorr = { package = "schnorr-signatures", path = "../../crypto/schnorr", default-features = false, features = ["std"] }
hex = { version = "0.4", default-features = false, features = ["std"] }
log = { version = "0.4", default-features = false, features = ["std"] }
scale = { package = "parity-scale-codec", version = "3", default-features = false, features = ["std", "derive"] }
borsh = { version = "1", default-features = false, features = ["std", "derive", "de_strict_order"] }
serai-client = { path = "../../substrate/client", default-features = false, features = ["serai", "borsh"] }
serai-db = { path = "../../common/db" }
serai-task = { path = "../../common/task", version = "0.1" }
scale = { package = "parity-scale-codec", version = "3", default-features = false, features = ["std", "derive"] }
futures-util = { version = "0.3", default-features = false, features = ["std", "sink", "channel"] }
futures-channel = { version = "0.3", default-features = false, features = ["std", "sink"] }
tendermint = { package = "tendermint-machine", path = "./tendermint" }
tributary-sdk = { path = "../tributary-sdk" }
tokio = { version = "1", default-features = false, features = ["sync", "time", "rt"] }
serai-cosign = { path = "../cosign" }
serai-coordinator-substrate = { path = "../substrate" }
[dev-dependencies]
tokio = { version = "1", features = ["macros"] }
messages = { package = "serai-processor-messages", path = "../../processor/messages" }
log = { version = "0.4", default-features = false, features = ["std"] }
[features]
tests = []
longer-reattempts = []

View File

@@ -1,6 +1,6 @@
AGPL-3.0-only license
Copyright (c) 2023 Luke Parker
Copyright (c) 2023-2025 Luke Parker
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU Affero General Public License Version 3 as

View File

@@ -1,3 +1,4 @@
# Tributary
# Serai Coordinator Tributary
A verifiable, ordered broadcast layer implemented as a BFT micro-blockchain.
The Tributary used by the Serai Coordinator. This includes the `Transaction`
definition and the code to handle blocks added on-chain.

View File

@@ -9,7 +9,7 @@ use messages::sign::{VariantSignId, SignId};
use serai_db::*;
use crate::tributary::transaction::SigningProtocolRound;
use crate::transaction::SigningProtocolRound;
/// A topic within the database which the group participates in
#[derive(Clone, Copy, PartialEq, Eq, Debug, Encode, BorshSerialize, BorshDeserialize)]
@@ -167,6 +167,9 @@ impl Topic {
}
}
pub(crate) trait Borshy: BorshSerialize + BorshDeserialize {}
impl<T: BorshSerialize + BorshDeserialize> Borshy for T {}
/// The resulting data set from an accumulation
pub(crate) enum DataSet<D: Borshy> {
/// Accumulating this did not produce a data set to act on
@@ -176,21 +179,20 @@ pub(crate) enum DataSet<D: Borshy> {
Participating(HashMap<SeraiAddress, D>),
}
trait Borshy: BorshSerialize + BorshDeserialize {}
impl<T: BorshSerialize + BorshDeserialize> Borshy for T {}
create_db!(
CoordinatorTributary {
// The last handled tributary block's (number, hash)
LastHandledTributaryBlock: (set: ValidatorSet) -> (u64, [u8; 32]),
// The slash points a validator has accrued, with u64::MAX representing a fatal slash.
SlashPoints: (set: ValidatorSet, validator: SeraiAddress) -> u64,
// The slash points a validator has accrued, with u32::MAX representing a fatal slash.
SlashPoints: (set: ValidatorSet, validator: SeraiAddress) -> u32,
// The latest Substrate block to cosign.
LatestSubstrateBlockToCosign: (set: ValidatorSet) -> [u8; 32],
// If we're actively cosigning or not.
ActivelyCosigning: (set: ValidatorSet) -> (),
// The hash of the block we're actively cosigning.
ActivelyCosigning: (set: ValidatorSet) -> [u8; 32],
// If this block has already been cosigned.
Cosigned: (set: ValidatorSet, substrate_block_hash: [u8; 32]) -> (),
// The weight accumulated for a topic.
AccumulatedWeight: (set: ValidatorSet, topic: Topic) -> u64,
@@ -238,19 +240,20 @@ impl TributaryDb {
) {
LatestSubstrateBlockToCosign::set(txn, set, &substrate_block_hash);
}
pub(crate) fn actively_cosigning(txn: &mut impl DbTxn, set: ValidatorSet) -> bool {
ActivelyCosigning::get(txn, set).is_some()
pub(crate) fn actively_cosigning(txn: &mut impl DbTxn, set: ValidatorSet) -> Option<[u8; 32]> {
ActivelyCosigning::get(txn, set)
}
pub(crate) fn start_cosigning(
txn: &mut impl DbTxn,
set: ValidatorSet,
substrate_block_hash: [u8; 32],
substrate_block_number: u64,
) {
assert!(
ActivelyCosigning::get(txn, set).is_none(),
"starting cosigning while already cosigning"
);
ActivelyCosigning::set(txn, set, &());
ActivelyCosigning::set(txn, set, &substrate_block_hash);
TributaryDb::recognize_topic(
txn,
@@ -265,6 +268,20 @@ impl TributaryDb {
pub(crate) fn finish_cosigning(txn: &mut impl DbTxn, set: ValidatorSet) {
assert!(ActivelyCosigning::take(txn, set).is_some(), "finished cosigning but not cosigning");
}
pub(crate) fn mark_cosigned(
txn: &mut impl DbTxn,
set: ValidatorSet,
substrate_block_hash: [u8; 32],
) {
Cosigned::set(txn, set, substrate_block_hash, &());
}
pub(crate) fn cosigned(
txn: &mut impl DbTxn,
set: ValidatorSet,
substrate_block_hash: [u8; 32],
) -> bool {
Cosigned::get(txn, set, substrate_block_hash).is_some()
}
pub(crate) fn recognize_topic(txn: &mut impl DbTxn, set: ValidatorSet, topic: Topic) {
AccumulatedWeight::set(txn, set, topic, &0);
@@ -299,7 +316,7 @@ impl TributaryDb {
reason: &str,
) {
log::warn!("{validator} fatally slashed: {reason}");
SlashPoints::set(txn, set, validator, &u64::MAX);
SlashPoints::set(txn, set, validator, &u32::MAX);
}
pub(crate) fn is_fatally_slashed(
@@ -307,7 +324,7 @@ impl TributaryDb {
set: ValidatorSet,
validator: SeraiAddress,
) -> bool {
SlashPoints::get(getter, set, validator).unwrap_or(0) == u64::MAX
SlashPoints::get(getter, set, validator).unwrap_or(0) == u32::MAX
}
#[allow(clippy::too_many_arguments)]
@@ -372,12 +389,12 @@ impl TributaryDb {
// 5 minutes
#[cfg(not(feature = "longer-reattempts"))]
const BASE_REATTEMPT_DELAY: u32 =
(5u32 * 60 * 1000).div_ceil(tributary::tendermint::TARGET_BLOCK_TIME);
(5u32 * 60 * 1000).div_ceil(tributary_sdk::tendermint::TARGET_BLOCK_TIME);
// 10 minutes, intended for latent environments like the GitHub CI
#[cfg(feature = "longer-reattempts")]
const BASE_REATTEMPT_DELAY: u32 =
(10u32 * 60 * 1000).div_ceil(tributary::tendermint::TARGET_BLOCK_TIME);
(10u32 * 60 * 1000).div_ceil(tributary_sdk::tendermint::TARGET_BLOCK_TIME);
// Linearly scale the time for the protocol with the attempt number
let blocks_till_reattempt = u64::from(attempt * BASE_REATTEMPT_DELAY);

View File

@@ -1,388 +1,523 @@
use core::{marker::PhantomData, fmt::Debug, future::Future};
use std::{sync::Arc, io};
#![cfg_attr(docsrs, feature(doc_auto_cfg))]
#![doc = include_str!("../README.md")]
#![deny(missing_docs)]
use zeroize::Zeroizing;
use core::{marker::PhantomData, future::Future};
use std::collections::HashMap;
use ciphersuite::{Ciphersuite, Ristretto};
use ciphersuite::group::GroupEncoding;
use scale::Decode;
use futures_channel::mpsc::UnboundedReceiver;
use futures_util::{StreamExt, SinkExt};
use ::tendermint::{
ext::{BlockNumber, Commit, Block as BlockTrait, Network},
SignedMessageFor, SyncedBlock, SyncedBlockSender, SyncedBlockResultReceiver, MessageSender,
TendermintMachine, TendermintHandle,
use serai_client::{
primitives::SeraiAddress,
validator_sets::primitives::{ValidatorSet, Slash},
};
pub use ::tendermint::Evidence;
use serai_db::*;
use serai_task::ContinuallyRan;
use serai_db::Db;
use tributary_sdk::{
tendermint::{
tx::{TendermintTx, Evidence, decode_signed_message},
TendermintNetwork,
},
Signed as TributarySigned, TransactionKind, TransactionTrait,
Transaction as TributaryTransaction, Block, TributaryReader, P2p,
};
use tokio::sync::RwLock;
use serai_cosign::Cosigning;
use serai_coordinator_substrate::NewSetInformation;
mod merkle;
pub(crate) use merkle::*;
use messages::sign::VariantSignId;
pub mod transaction;
pub use transaction::{TransactionError, Signed, TransactionKind, Transaction as TransactionTrait};
mod transaction;
pub(crate) use transaction::{SigningProtocolRound, Signed};
pub use transaction::Transaction;
use crate::tendermint::tx::TendermintTx;
mod db;
use db::*;
mod provided;
pub(crate) use provided::*;
pub use provided::ProvidedError;
mod block;
pub use block::*;
mod blockchain;
pub(crate) use blockchain::*;
mod mempool;
pub(crate) use mempool::*;
pub mod tendermint;
pub(crate) use crate::tendermint::*;
#[cfg(any(test, feature = "tests"))]
pub mod tests;
/// Size limit for an individual transaction.
// This needs to be big enough to participate in a 101-of-150 eVRF DKG with each element taking
// `MAX_KEY_LEN`. This also needs to be big enough to pariticpate in signing 520 Bitcoin inputs
// with 49 key shares, and signing 120 Monero inputs with 49 key shares.
// TODO: Add a test for these properties
pub const TRANSACTION_SIZE_LIMIT: usize = 2_000_000;
/// Amount of transactions a single account may have in the mempool.
pub const ACCOUNT_MEMPOOL_LIMIT: u32 = 50;
/// Block size limit.
// This targets a growth limit of roughly 30 GB a day, under load, in order to prevent a malicious
// participant from flooding disks and causing out of space errors in order processes.
pub const BLOCK_SIZE_LIMIT: usize = 2_001_000;
pub(crate) const TENDERMINT_MESSAGE: u8 = 0;
pub(crate) const TRANSACTION_MESSAGE: u8 = 1;
#[allow(clippy::large_enum_variant)]
#[derive(Clone, PartialEq, Eq, Debug)]
pub enum Transaction<T: TransactionTrait> {
Tendermint(TendermintTx),
Application(T),
/// Messages to send to the Processors.
pub struct ProcessorMessages;
impl ProcessorMessages {
/// Try to receive a message to send to a Processor.
pub fn try_recv(txn: &mut impl DbTxn, set: ValidatorSet) -> Option<messages::CoordinatorMessage> {
db::ProcessorMessages::try_recv(txn, set)
}
}
impl<T: TransactionTrait> ReadWrite for Transaction<T> {
fn read<R: io::Read>(reader: &mut R) -> io::Result<Self> {
let mut kind = [0];
reader.read_exact(&mut kind)?;
match kind[0] {
0 => {
let tx = TendermintTx::read(reader)?;
Ok(Transaction::Tendermint(tx))
}
1 => {
let tx = T::read(reader)?;
Ok(Transaction::Application(tx))
}
_ => Err(io::Error::other("invalid transaction type")),
struct ScanBlock<'a, CD: Db, TD: Db, TDT: DbTxn, P: P2p> {
_td: PhantomData<TD>,
_p2p: PhantomData<P>,
cosign_db: &'a CD,
tributary_txn: &'a mut TDT,
set: ValidatorSet,
validators: &'a [SeraiAddress],
total_weight: u64,
validator_weights: &'a HashMap<SeraiAddress, u64>,
}
impl<'a, CD: Db, TD: Db, TDT: DbTxn, P: P2p> ScanBlock<'a, CD, TD, TDT, P> {
fn potentially_start_cosign(&mut self) {
// Don't start a new cosigning instance if we're actively running one
if TributaryDb::actively_cosigning(self.tributary_txn, self.set).is_some() {
return;
}
}
fn write<W: io::Write>(&self, writer: &mut W) -> io::Result<()> {
match self {
Transaction::Tendermint(tx) => {
writer.write_all(&[0])?;
tx.write(writer)
}
Transaction::Application(tx) => {
writer.write_all(&[1])?;
tx.write(writer)
}
}
}
}
impl<T: TransactionTrait> Transaction<T> {
pub fn hash(&self) -> [u8; 32] {
match self {
Transaction::Tendermint(tx) => tx.hash(),
Transaction::Application(tx) => tx.hash(),
}
}
pub fn kind(&self) -> TransactionKind {
match self {
Transaction::Tendermint(tx) => tx.kind(),
Transaction::Application(tx) => tx.kind(),
}
}
}
/// An item which can be read and written.
pub trait ReadWrite: Sized {
fn read<R: io::Read>(reader: &mut R) -> io::Result<Self>;
fn write<W: io::Write>(&self, writer: &mut W) -> io::Result<()>;
fn serialize(&self) -> Vec<u8> {
// BlockHeader is 64 bytes and likely the smallest item in this system
let mut buf = Vec::with_capacity(64);
self.write(&mut buf).unwrap();
buf
}
}
pub trait P2p: 'static + Send + Sync + Clone {
/// Broadcast a message to all other members of the Tributary with the specified genesis.
///
/// The Tributary will re-broadcast consensus messages on a fixed interval to ensure they aren't
/// prematurely dropped from the P2P layer. THe P2P layer SHOULD perform content-based
/// deduplication to ensure a sane amount of load.
fn broadcast(&self, genesis: [u8; 32], msg: Vec<u8>) -> impl Send + Future<Output = ()>;
}
impl<P: P2p> P2p for Arc<P> {
fn broadcast(&self, genesis: [u8; 32], msg: Vec<u8>) -> impl Send + Future<Output = ()> {
P::broadcast(self, genesis, msg)
}
}
#[derive(Clone)]
pub struct Tributary<D: Db, T: TransactionTrait, P: P2p> {
db: D,
genesis: [u8; 32],
network: TendermintNetwork<D, T, P>,
synced_block: Arc<RwLock<SyncedBlockSender<TendermintNetwork<D, T, P>>>>,
synced_block_result: Arc<RwLock<SyncedBlockResultReceiver>>,
messages: Arc<RwLock<MessageSender<TendermintNetwork<D, T, P>>>>,
}
impl<D: Db, T: TransactionTrait, P: P2p> Tributary<D, T, P> {
pub async fn new(
db: D,
genesis: [u8; 32],
start_time: u64,
key: Zeroizing<<Ristretto as Ciphersuite>::F>,
validators: Vec<(<Ristretto as Ciphersuite>::G, u64)>,
p2p: P,
) -> Option<Self> {
log::info!("new Tributary with genesis {}", hex::encode(genesis));
let validators_vec = validators.iter().map(|validator| validator.0).collect::<Vec<_>>();
let signer = Arc::new(Signer::new(genesis, key));
let validators = Arc::new(Validators::new(genesis, validators)?);
let mut blockchain = Blockchain::new(db.clone(), genesis, &validators_vec);
let block_number = BlockNumber(blockchain.block_number());
let start_time = if let Some(commit) = blockchain.commit(&blockchain.tip()) {
Commit::<Validators>::decode(&mut commit.as_ref()).unwrap().end_time
} else {
start_time
// Fetch the latest intended-to-be-cosigned block
let Some(latest_substrate_block_to_cosign) =
TributaryDb::latest_substrate_block_to_cosign(self.tributary_txn, self.set)
else {
return;
};
let proposal = TendermintBlock(
blockchain.build_block::<TendermintNetwork<D, T, P>>(&validators).serialize(),
// If it was already cosigned, return
if TributaryDb::cosigned(self.tributary_txn, self.set, latest_substrate_block_to_cosign) {
return;
}
let Some(substrate_block_number) =
Cosigning::<CD>::finalized_block_number(self.cosign_db, latest_substrate_block_to_cosign)
else {
// This is a valid panic as we shouldn't be scanning this block if we didn't provide all
// Provided transactions within it, and the block to cosign is a Provided transaction
panic!("cosigning a block our cosigner didn't index")
};
// Mark us as actively cosigning
TributaryDb::start_cosigning(
self.tributary_txn,
self.set,
latest_substrate_block_to_cosign,
substrate_block_number,
);
let blockchain = Arc::new(RwLock::new(blockchain));
let network = TendermintNetwork { genesis, signer, validators, blockchain, p2p };
let TendermintHandle { synced_block, synced_block_result, messages, machine } =
TendermintMachine::new(
db.clone(),
network.clone(),
genesis,
block_number,
start_time,
proposal,
)
.await;
tokio::spawn(machine.run());
Some(Self {
db,
genesis,
network,
synced_block: Arc::new(RwLock::new(synced_block)),
synced_block_result: Arc::new(RwLock::new(synced_block_result)),
messages: Arc::new(RwLock::new(messages)),
})
}
pub fn block_time() -> u32 {
TendermintNetwork::<D, T, P>::block_time()
}
pub fn genesis(&self) -> [u8; 32] {
self.genesis
}
pub async fn block_number(&self) -> u64 {
self.network.blockchain.read().await.block_number()
}
pub async fn tip(&self) -> [u8; 32] {
self.network.blockchain.read().await.tip()
}
pub fn reader(&self) -> TributaryReader<D, T> {
TributaryReader(self.db.clone(), self.genesis, PhantomData)
}
pub async fn provide_transaction(&self, tx: T) -> Result<(), ProvidedError> {
self.network.blockchain.write().await.provide_transaction(tx)
}
pub async fn next_nonce(
&self,
signer: &<Ristretto as Ciphersuite>::G,
order: &[u8],
) -> Option<u32> {
self.network.blockchain.read().await.next_nonce(signer, order)
}
// Returns Ok(true) if new, Ok(false) if an already present unsigned, or the error.
// Safe to be &self since the only meaningful usage of self is self.network.blockchain which
// successfully acquires its own write lock
pub async fn add_transaction(&self, tx: T) -> Result<bool, TransactionError> {
let tx = Transaction::Application(tx);
let mut to_broadcast = vec![TRANSACTION_MESSAGE];
tx.write(&mut to_broadcast).unwrap();
let res = self.network.blockchain.write().await.add_transaction::<TendermintNetwork<D, T, P>>(
true,
tx,
&self.network.signature_scheme(),
// Send the message for the processor to start signing
TributaryDb::send_message(
self.tributary_txn,
self.set,
messages::coordinator::CoordinatorMessage::CosignSubstrateBlock {
session: self.set.session,
block_number: substrate_block_number,
block: latest_substrate_block_to_cosign,
},
);
if res == Ok(true) {
self.network.p2p.broadcast(self.genesis, to_broadcast).await;
}
res
}
fn handle_application_tx(&mut self, block_number: u64, tx: Transaction) {
let signer = |signed: Signed| SeraiAddress(signed.signer().to_bytes());
async fn sync_block_internal(
&self,
block: Block<T>,
commit: Vec<u8>,
result: &mut UnboundedReceiver<bool>,
) -> bool {
let (tip, block_number) = {
let blockchain = self.network.blockchain.read().await;
(blockchain.tip(), blockchain.block_number())
};
if block.header.parent != tip {
log::debug!("told to sync a block whose parent wasn't our tip");
return false;
if let TransactionKind::Signed(_, TributarySigned { signer, .. }) = tx.kind() {
// Don't handle transactions from those fatally slashed
// TODO: The fact they can publish these TXs makes this a notable spam vector
if TributaryDb::is_fatally_slashed(
self.tributary_txn,
self.set,
SeraiAddress(signer.to_bytes()),
) {
return;
}
}
let block = TendermintBlock(block.serialize());
let mut commit_ref = commit.as_ref();
let Ok(commit) = Commit::<Arc<Validators>>::decode(&mut commit_ref) else {
log::error!("sent an invalidly serialized commit");
return false;
};
// Storage DoS vector. We *could* truncate to solely the relevant portion, trying to save this,
// yet then we'd have to test the truncation was performed correctly.
if !commit_ref.is_empty() {
log::error!("sent an commit with additional data after it");
return false;
}
if !self.network.verify_commit(block.id(), &commit) {
log::error!("sent an invalid commit");
return false;
}
match tx {
// Accumulate this vote and fatally slash the participant if past the threshold
Transaction::RemoveParticipant { participant, signed } => {
let signer = signer(signed);
let number = BlockNumber(block_number + 1);
self.synced_block.write().await.send(SyncedBlock { number, block, commit }).await.unwrap();
result.next().await.unwrap()
}
// Sync a block.
// TODO: Since we have a static validator set, we should only need the tail commit?
pub async fn sync_block(&self, block: Block<T>, commit: Vec<u8>) -> bool {
let mut result = self.synced_block_result.write().await;
self.sync_block_internal(block, commit, &mut result).await
}
// Return true if the message should be rebroadcasted.
pub async fn handle_message(&self, msg: &[u8]) -> bool {
match msg.first() {
Some(&TRANSACTION_MESSAGE) => {
let Ok(tx) = Transaction::read::<&[u8]>(&mut &msg[1 ..]) else {
log::error!("received invalid transaction message");
return false;
};
// TODO: Sync mempools with fellow peers
// Can we just rebroadcast transactions not included for at least two blocks?
let res =
self.network.blockchain.write().await.add_transaction::<TendermintNetwork<D, T, P>>(
false,
tx,
&self.network.signature_scheme(),
// Check the participant voted to be removed actually exists
if !self.validators.iter().any(|validator| *validator == participant) {
TributaryDb::fatal_slash(
self.tributary_txn,
self.set,
signer,
"voted to remove non-existent participant",
);
log::debug!("received transaction message. valid new transaction: {res:?}");
res == Ok(true)
}
return;
}
Some(&TENDERMINT_MESSAGE) => {
let Ok(msg) =
SignedMessageFor::<TendermintNetwork<D, T, P>>::decode::<&[u8]>(&mut &msg[1 ..])
else {
log::error!("received invalid tendermint message");
return false;
match TributaryDb::accumulate(
self.tributary_txn,
self.set,
self.validators,
self.total_weight,
block_number,
Topic::RemoveParticipant { participant },
signer,
self.validator_weights[&signer],
&(),
) {
DataSet::None => {}
DataSet::Participating(_) => {
TributaryDb::fatal_slash(self.tributary_txn, self.set, participant, "voted to remove");
}
};
self.messages.write().await.send(msg).await.unwrap();
false
}
_ => false,
// Send the participation to the processor
Transaction::DkgParticipation { participation, signed } => {
TributaryDb::send_message(
self.tributary_txn,
self.set,
messages::key_gen::CoordinatorMessage::Participation {
session: self.set.session,
participant: todo!("TODO"),
participation,
},
);
}
Transaction::DkgConfirmationPreprocess { attempt, preprocess, signed } => {
// Accumulate the preprocesses into our own FROST attempt manager
todo!("TODO")
}
Transaction::DkgConfirmationShare { attempt, share, signed } => {
// Accumulate the shares into our own FROST attempt manager
todo!("TODO")
}
Transaction::Cosign { substrate_block_hash } => {
// Update the latest intended-to-be-cosigned Substrate block
TributaryDb::set_latest_substrate_block_to_cosign(
self.tributary_txn,
self.set,
substrate_block_hash,
);
// Start a new cosign if we aren't already working on one
self.potentially_start_cosign();
}
Transaction::Cosigned { substrate_block_hash } => {
/*
We provide one Cosigned per Cosign transaction, but they have independent orders. This
means we may receive Cosigned before Cosign. In order to ensure we only start work on
not-yet-Cosigned cosigns, we flag all cosigned blocks as cosigned. Then, when we choose
the next block to work on, we won't if it's already been cosigned.
*/
TributaryDb::mark_cosigned(self.tributary_txn, self.set, substrate_block_hash);
// If we aren't actively cosigning this block, return
// This occurs when we have Cosign TXs A, B, C, we received Cosigned for A and start on C,
// and then receive Cosigned for B
if TributaryDb::actively_cosigning(self.tributary_txn, self.set) !=
Some(substrate_block_hash)
{
return;
}
// Since this is the block we were cosigning, mark us as having finished cosigning
TributaryDb::finish_cosigning(self.tributary_txn, self.set);
// Start working on the next cosign
self.potentially_start_cosign();
}
Transaction::SubstrateBlock { hash } => {
// Whitelist all of the IDs this Substrate block causes to be signed
todo!("TODO")
}
Transaction::Batch { hash } => {
// Whitelist the signing of this batch, publishing our own preprocess
todo!("TODO")
}
Transaction::SlashReport { slash_points, signed } => {
let signer = signer(signed);
if slash_points.len() != self.validators.len() {
TributaryDb::fatal_slash(
self.tributary_txn,
self.set,
signer,
"slash report was for a distinct amount of signers",
);
return;
}
// Accumulate, and if past the threshold, calculate *the* slash report and start signing it
match TributaryDb::accumulate(
self.tributary_txn,
self.set,
self.validators,
self.total_weight,
block_number,
Topic::SlashReport,
signer,
self.validator_weights[&signer],
&slash_points,
) {
DataSet::None => {}
DataSet::Participating(data_set) => {
// Find the median reported slashes for this validator
/*
TODO: This lets 34% perform a fatal slash. That shouldn't be allowed. We need
to accept slash reports for a period past the threshold, and only fatally slash if we
have a supermajority agree the slash should be fatal. If there isn't a supermajority,
but the median believe the slash should be fatal, we need to fallback to a large
constant.
Also, TODO, each slash point should probably be considered as
`MAX_KEY_SHARES_PER_SET * BLOCK_TIME` seconds of downtime. As this time crosses
various thresholds (1 day, 3 days, etc), a multiplier should be attached.
*/
let mut median_slash_report = Vec::with_capacity(self.validators.len());
for i in 0 .. self.validators.len() {
let mut this_validator =
data_set.values().map(|report| report[i]).collect::<Vec<_>>();
this_validator.sort_unstable();
// Choose the median, where if there are two median values, the lower one is chosen
let median_index = if (this_validator.len() % 2) == 1 {
this_validator.len() / 2
} else {
(this_validator.len() / 2) - 1
};
median_slash_report.push(this_validator[median_index]);
}
// We only publish slashes for the `f` worst performers to:
// 1) Effect amnesty if there were network disruptions which affected everyone
// 2) Ensure the signing threshold doesn't have a disincentive to do their job
// Find the worst performer within the signing threshold's slash points
let f = (self.validators.len() - 1) / 3;
let worst_validator_in_supermajority_slash_points = {
let mut sorted_slash_points = median_slash_report.clone();
sorted_slash_points.sort_unstable();
// This won't be a valid index if `f == 0`, which means we don't have any validators
// to slash
let index_of_first_validator_to_slash = self.validators.len() - f;
let index_of_worst_validator_in_supermajority = index_of_first_validator_to_slash - 1;
sorted_slash_points[index_of_worst_validator_in_supermajority]
};
// Perform the amortization
for slash_points in &mut median_slash_report {
*slash_points =
slash_points.saturating_sub(worst_validator_in_supermajority_slash_points)
}
let amortized_slash_report = median_slash_report;
// Create the resulting slash report
let mut slash_report = vec![];
for (validator, points) in self.validators.iter().copied().zip(amortized_slash_report) {
if points != 0 {
slash_report.push(Slash { key: validator.into(), points });
}
}
assert!(slash_report.len() <= f);
// Recognize the topic for signing the slash report
TributaryDb::recognize_topic(
self.tributary_txn,
self.set,
Topic::Sign {
id: VariantSignId::SlashReport,
attempt: 0,
round: SigningProtocolRound::Preprocess,
},
);
// Send the message for the processor to start signing
TributaryDb::send_message(
self.tributary_txn,
self.set,
messages::coordinator::CoordinatorMessage::SignSlashReport {
session: self.set.session,
report: slash_report,
},
);
}
};
}
Transaction::Sign { id, attempt, round, data, signed } => {
let topic = Topic::Sign { id, attempt, round };
let signer = signer(signed);
if u64::try_from(data.len()).unwrap() != self.validator_weights[&signer] {
TributaryDb::fatal_slash(
self.tributary_txn,
self.set,
signer,
"signer signed with a distinct amount of key shares than they had key shares",
);
return;
}
match TributaryDb::accumulate(
self.tributary_txn,
self.set,
self.validators,
self.total_weight,
block_number,
topic,
signer,
self.validator_weights[&signer],
&data,
) {
DataSet::None => {}
DataSet::Participating(data_set) => {
let id = topic.sign_id(self.set).expect("Topic::Sign didn't have SignId");
let flatten_data_set = |data_set| todo!("TODO");
let data_set = flatten_data_set(data_set);
TributaryDb::send_message(
self.tributary_txn,
self.set,
match round {
SigningProtocolRound::Preprocess => {
messages::sign::CoordinatorMessage::Preprocesses { id, preprocesses: data_set }
}
SigningProtocolRound::Share => {
messages::sign::CoordinatorMessage::Shares { id, shares: data_set }
}
},
)
}
};
}
}
}
/// Get a Future which will resolve once the next block has been added.
pub async fn next_block_notification(
&self,
) -> impl Send + Sync + core::future::Future<Output = Result<(), impl Send + Sync>> {
let (tx, rx) = tokio::sync::oneshot::channel();
self.network.blockchain.write().await.next_block_notifications.push_back(tx);
rx
fn handle_block(mut self, block_number: u64, block: Block<Transaction>) {
TributaryDb::start_of_block(self.tributary_txn, self.set, block_number);
for tx in block.transactions {
match tx {
TributaryTransaction::Tendermint(TendermintTx::SlashEvidence(ev)) => {
// Since the evidence is on the chain, it will have already been validated
// We can just punish the signer
let data = match ev {
Evidence::ConflictingMessages(first, second) => (first, Some(second)),
Evidence::InvalidPrecommit(first) | Evidence::InvalidValidRound(first) => (first, None),
};
let msgs = (
decode_signed_message::<TendermintNetwork<TD, Transaction, P>>(&data.0).unwrap(),
if data.1.is_some() {
Some(
decode_signed_message::<TendermintNetwork<TD, Transaction, P>>(&data.1.unwrap())
.unwrap(),
)
} else {
None
},
);
// Since anything with evidence is fundamentally faulty behavior, not just temporal
// errors, mark the node as fatally slashed
TributaryDb::fatal_slash(
self.tributary_txn,
self.set,
SeraiAddress(msgs.0.msg.sender),
&format!("invalid tendermint messages: {msgs:?}"),
);
}
TributaryTransaction::Application(tx) => {
self.handle_application_tx(block_number, tx);
}
}
}
}
}
#[derive(Clone)]
pub struct TributaryReader<D: Db, T: TransactionTrait>(D, [u8; 32], PhantomData<T>);
impl<D: Db, T: TransactionTrait> TributaryReader<D, T> {
pub fn genesis(&self) -> [u8; 32] {
self.1
}
/// The task to scan the Tributary, populating `ProcessorMessages`.
pub struct ScanTributaryTask<CD: Db, TD: Db, P: P2p> {
cosign_db: CD,
tributary_db: TD,
set: ValidatorSet,
validators: Vec<SeraiAddress>,
total_weight: u64,
validator_weights: HashMap<SeraiAddress, u64>,
tributary: TributaryReader<TD, Transaction>,
_p2p: PhantomData<P>,
}
// Since these values are static once set, they can be safely read from the database without lock
// acquisition
pub fn block(&self, hash: &[u8; 32]) -> Option<Block<T>> {
Blockchain::<D, T>::block_from_db(&self.0, self.1, hash)
}
pub fn commit(&self, hash: &[u8; 32]) -> Option<Vec<u8>> {
Blockchain::<D, T>::commit_from_db(&self.0, self.1, hash)
}
pub fn parsed_commit(&self, hash: &[u8; 32]) -> Option<Commit<Validators>> {
self.commit(hash).map(|commit| Commit::<Validators>::decode(&mut commit.as_ref()).unwrap())
}
pub fn block_after(&self, hash: &[u8; 32]) -> Option<[u8; 32]> {
Blockchain::<D, T>::block_after(&self.0, self.1, hash)
}
pub fn time_of_block(&self, hash: &[u8; 32]) -> Option<u64> {
self
.commit(hash)
.map(|commit| Commit::<Validators>::decode(&mut commit.as_ref()).unwrap().end_time)
}
impl<CD: Db, TD: Db, P: P2p> ScanTributaryTask<CD, TD, P> {
/// Create a new instance of this task.
pub fn new(
cosign_db: CD,
tributary_db: TD,
new_set: &NewSetInformation,
tributary: TributaryReader<TD, Transaction>,
) -> Self {
let mut validators = Vec::with_capacity(new_set.validators.len());
let mut total_weight = 0;
let mut validator_weights = HashMap::with_capacity(new_set.validators.len());
for (validator, weight) in new_set.validators.iter().copied() {
let validator = SeraiAddress::from(validator);
let weight = u64::from(weight);
validators.push(validator);
total_weight += weight;
validator_weights.insert(validator, weight);
}
pub fn locally_provided_txs_in_block(&self, hash: &[u8; 32], order: &str) -> bool {
Blockchain::<D, T>::locally_provided_txs_in_block(&self.0, &self.1, hash, order)
}
// This isn't static, yet can be read with only minor discrepancy risks
pub fn tip(&self) -> [u8; 32] {
Blockchain::<D, T>::tip_from_db(&self.0, self.1)
ScanTributaryTask {
cosign_db,
tributary_db,
set: new_set.set,
validators,
total_weight,
validator_weights,
tributary,
_p2p: PhantomData,
}
}
}
impl<CD: Db, TD: Db, P: P2p> ContinuallyRan for ScanTributaryTask<CD, TD, P> {
fn run_iteration(&mut self) -> impl Send + Future<Output = Result<bool, String>> {
async move {
let (mut last_block_number, mut last_block_hash) =
TributaryDb::last_handled_tributary_block(&self.tributary_db, self.set)
.unwrap_or((0, self.tributary.genesis()));
let mut made_progress = false;
while let Some(next) = self.tributary.block_after(&last_block_hash) {
let block = self.tributary.block(&next).unwrap();
let block_number = last_block_number + 1;
let block_hash = block.hash();
// Make sure we have all of the provided transactions for this block
for tx in &block.transactions {
let TransactionKind::Provided(order) = tx.kind() else {
continue;
};
// make sure we have all the provided txs in this block locally
if !self.tributary.locally_provided_txs_in_block(&block_hash, order) {
return Err(format!(
"didn't have the provided Transactions on-chain for set (ephemeral error): {:?}",
self.set
));
}
}
let mut tributary_txn = self.tributary_db.txn();
(ScanBlock {
_td: PhantomData::<TD>,
_p2p: PhantomData::<P>,
cosign_db: &self.cosign_db,
tributary_txn: &mut tributary_txn,
set: self.set,
validators: &self.validators,
total_weight: self.total_weight,
validator_weights: &self.validator_weights,
})
.handle_block(block_number, block);
TributaryDb::set_last_handled_tributary_block(
&mut tributary_txn,
self.set,
block_number,
block_hash,
);
last_block_number = block_number;
last_block_hash = block_hash;
tributary_txn.commit();
made_progress = true;
}
Ok(made_progress)
}
}
}
/// Create the Transaction::SlashReport to publish per the local view.
pub fn slash_report_transaction(getter: &impl Get, set: &NewSetInformation) -> Transaction {
let mut slash_points = Vec::with_capacity(set.validators.len());
for (validator, _weight) in set.validators.iter().copied() {
let validator = SeraiAddress::from(validator);
slash_points.push(SlashPoints::get(getter, set.set, validator).unwrap_or(0));
}
Transaction::SlashReport { slash_points, signed: Signed::default() }
}

View File

@@ -1,218 +1,365 @@
use core::fmt::Debug;
use core::{ops::Deref, fmt::Debug};
use std::io;
use zeroize::Zeroize;
use thiserror::Error;
use blake2::{Digest, Blake2b512};
use zeroize::Zeroizing;
use rand_core::{RngCore, CryptoRng};
use blake2::{digest::typenum::U32, Digest, Blake2b};
use ciphersuite::{
group::{Group, GroupEncoding},
group::{ff::Field, Group, GroupEncoding},
Ciphersuite, Ristretto,
};
use schnorr::SchnorrSignature;
use crate::{TRANSACTION_SIZE_LIMIT, ReadWrite};
use scale::Encode;
use borsh::{BorshSerialize, BorshDeserialize};
#[derive(Clone, PartialEq, Eq, Debug, Error)]
pub enum TransactionError {
/// Transaction exceeded the size limit.
#[error("transaction is too large")]
TooLargeTransaction,
/// Transaction's signer isn't a participant.
#[error("invalid signer")]
InvalidSigner,
/// Transaction's nonce isn't the prior nonce plus one.
#[error("invalid nonce")]
InvalidNonce,
/// Transaction's signature is invalid.
#[error("invalid signature")]
InvalidSignature,
/// Transaction's content is invalid.
#[error("transaction content is invalid")]
InvalidContent,
/// Transaction's signer has too many transactions in the mempool.
#[error("signer has too many transactions in the mempool")]
TooManyInMempool,
/// Provided Transaction added to mempool.
#[error("provided transaction added to mempool")]
ProvidedAddedToMempool,
use serai_client::{primitives::SeraiAddress, validator_sets::primitives::MAX_KEY_SHARES_PER_SET};
use messages::sign::VariantSignId;
use tributary_sdk::{
ReadWrite,
transaction::{
Signed as TributarySigned, TransactionError, TransactionKind, Transaction as TransactionTrait,
},
};
/// The round this data is for, within a signing protocol.
#[derive(Clone, Copy, PartialEq, Eq, Debug, Encode, BorshSerialize, BorshDeserialize)]
pub enum SigningProtocolRound {
/// A preprocess.
Preprocess,
/// A signature share.
Share,
}
/// Data for a signed transaction.
#[derive(Clone, PartialEq, Eq, Debug)]
pub struct Signed {
pub signer: <Ristretto as Ciphersuite>::G,
pub nonce: u32,
pub signature: SchnorrSignature<Ristretto>,
}
impl ReadWrite for Signed {
fn read<R: io::Read>(reader: &mut R) -> io::Result<Self> {
let signer = Ristretto::read_G(reader)?;
let mut nonce = [0; 4];
reader.read_exact(&mut nonce)?;
let nonce = u32::from_le_bytes(nonce);
if nonce >= (u32::MAX - 1) {
Err(io::Error::other("nonce exceeded limit"))?;
impl SigningProtocolRound {
fn nonce(&self) -> u32 {
match self {
SigningProtocolRound::Preprocess => 0,
SigningProtocolRound::Share => 1,
}
let mut signature = SchnorrSignature::<Ristretto>::read(reader)?;
if signature.R.is_identity().into() {
// Anyone malicious could remove this and try to find zero signatures
// We should never produce zero signatures though meaning this should never come up
// If it does somehow come up, this is a decent courtesy
signature.zeroize();
Err(io::Error::other("signature nonce was identity"))?;
}
Ok(Signed { signer, nonce, signature })
}
}
fn write<W: io::Write>(&self, writer: &mut W) -> io::Result<()> {
// This is either an invalid signature or a private key leak
if self.signature.R.is_identity().into() {
Err(io::Error::other("signature nonce was identity"))?;
}
writer.write_all(&self.signer.to_bytes())?;
writer.write_all(&self.nonce.to_le_bytes())?;
/// `tributary::Signed` but without the nonce.
///
/// All of our nonces are deterministic to the type of transaction and fields within.
#[derive(Clone, Copy, PartialEq, Eq, Debug)]
pub struct Signed {
/// The signer.
signer: <Ristretto as Ciphersuite>::G,
/// The signature.
signature: SchnorrSignature<Ristretto>,
}
impl BorshSerialize for Signed {
fn serialize<W: io::Write>(&self, writer: &mut W) -> Result<(), io::Error> {
writer.write_all(self.signer.to_bytes().as_ref())?;
self.signature.write(writer)
}
}
impl BorshDeserialize for Signed {
fn deserialize_reader<R: io::Read>(reader: &mut R) -> Result<Self, io::Error> {
let signer = Ristretto::read_G(reader)?;
let signature = SchnorrSignature::read(reader)?;
Ok(Self { signer, signature })
}
}
impl Signed {
pub fn read_without_nonce<R: io::Read>(reader: &mut R, nonce: u32) -> io::Result<Self> {
let signer = Ristretto::read_G(reader)?;
let mut signature = SchnorrSignature::<Ristretto>::read(reader)?;
if signature.R.is_identity().into() {
// Anyone malicious could remove this and try to find zero signatures
// We should never produce zero signatures though meaning this should never come up
// If it does somehow come up, this is a decent courtesy
signature.zeroize();
Err(io::Error::other("signature nonce was identity"))?;
}
Ok(Signed { signer, nonce, signature })
/// Fetch the signer.
pub(crate) fn signer(&self) -> <Ristretto as Ciphersuite>::G {
self.signer
}
pub fn write_without_nonce<W: io::Write>(&self, writer: &mut W) -> io::Result<()> {
// This is either an invalid signature or a private key leak
if self.signature.R.is_identity().into() {
Err(io::Error::other("signature nonce was identity"))?;
}
writer.write_all(&self.signer.to_bytes())?;
self.signature.write(writer)
/// Provide a nonce to convert a `Signed` into a `tributary::Signed`.
fn to_tributary_signed(self, nonce: u32) -> TributarySigned {
TributarySigned { signer: self.signer, nonce, signature: self.signature }
}
}
#[allow(clippy::large_enum_variant)]
#[derive(Clone, PartialEq, Eq, Debug)]
pub enum TransactionKind {
/// This transaction should be provided by every validator, in an exact order.
///
/// The contained static string names the orderer to use. This allows two distinct provided
/// transaction kinds, without a synchronized order, to be ordered within their own kind without
/// requiring ordering with each other.
///
/// The only malleability is in when this transaction appears on chain. The block producer will
/// include it when they have it. Block verification will fail for validators without it.
///
/// If a supermajority of validators produce a commit for a block with a provided transaction
/// which isn't locally held, the block will be added to the local chain. When the transaction is
/// locally provided, it will be compared for correctness to the on-chain version
///
/// In order to ensure TXs aren't accidentally provided multiple times, all provided transactions
/// must have a unique hash which is also unique to all Unsigned transactions.
Provided(&'static str),
/// An unsigned transaction, only able to be included by the block producer.
///
/// Once an Unsigned transaction is included on-chain, it may not be included again. In order to
/// have multiple Unsigned transactions with the same values included on-chain, some distinct
/// nonce must be included in order to cause a distinct hash.
///
/// The hash must also be unique with all Provided transactions.
Unsigned,
/// A signed transaction.
Signed(Vec<u8>, Signed),
impl Default for Signed {
fn default() -> Self {
Self {
signer: <Ristretto as Ciphersuite>::G::identity(),
signature: SchnorrSignature {
R: <Ristretto as Ciphersuite>::G::identity(),
s: <Ristretto as Ciphersuite>::F::ZERO,
},
}
}
}
// TODO: Should this be renamed TransactionTrait now that a literal Transaction exists?
// Or should the literal Transaction be renamed to Event?
pub trait Transaction: 'static + Send + Sync + Clone + Eq + Debug + ReadWrite {
/// Return what type of transaction this is.
fn kind(&self) -> TransactionKind;
/// The Tributary transaction definition used by Serai
#[derive(Clone, PartialEq, Eq, Debug, BorshSerialize, BorshDeserialize)]
pub enum Transaction {
/// A vote to remove a participant for invalid behavior
RemoveParticipant {
/// The participant to remove
participant: SeraiAddress,
/// The transaction's signer and signature
signed: Signed,
},
/// Return the hash of this transaction.
///
/// The hash must NOT commit to the signature.
fn hash(&self) -> [u8; 32];
/// A participation in the DKG
DkgParticipation {
/// The serialized participation
participation: Vec<u8>,
/// The transaction's signer and signature
signed: Signed,
},
/// The preprocess to confirm the DKG results on-chain
DkgConfirmationPreprocess {
/// The attempt number of this signing protocol
attempt: u32,
/// The preprocess
preprocess: [u8; 64],
/// The transaction's signer and signature
signed: Signed,
},
/// The signature share to confirm the DKG results on-chain
DkgConfirmationShare {
/// The attempt number of this signing protocol
attempt: u32,
/// The signature share
share: [u8; 32],
/// The transaction's signer and signature
signed: Signed,
},
/// Perform transaction-specific verification.
fn verify(&self) -> Result<(), TransactionError>;
/// Intend to cosign a finalized Substrate block
///
/// When the time comes to start a new cosigning protocol, the most recent Substrate block will
/// be the one selected to be cosigned.
Cosign {
/// The hash of the Substrate block to cosign
substrate_block_hash: [u8; 32],
},
/// Obtain the challenge for this transaction's signature.
/// Note an intended-to-be-cosigned Substrate block as cosigned
///
/// Do not override this unless you know what you're doing.
/// After producing this cosign, we need to start work on the latest intended-to-be cosigned
/// block. That requires agreement on when this cosign was produced, which we solve by noting
/// this cosign on-chain.
///
/// Panics if called on non-signed transactions.
fn sig_hash(&self, genesis: [u8; 32]) -> <Ristretto as Ciphersuite>::F {
match self.kind() {
TransactionKind::Signed(order, Signed { signature, .. }) => {
<Ristretto as Ciphersuite>::F::from_bytes_mod_order_wide(
&Blake2b512::digest(
[
b"Tributary Signed Transaction",
genesis.as_ref(),
&self.hash(),
order.as_ref(),
signature.R.to_bytes().as_ref(),
]
.concat(),
)
.into(),
)
/// We ideally don't have this transaction at all. The coordinator, without access to any of the
/// key shares, could observe the FROST signing session and determine a successful completion.
/// Unfortunately, that functionality is not present in modular-frost, so we do need to support
/// *some* asynchronous flow (where the processor or P2P network informs us of the successful
/// completion).
///
/// If we use a `Provided` transaction, that requires everyone observe this cosign.
///
/// If we use an `Unsigned` transaction, we can't verify the cosign signature inside
/// `Transaction::verify` unless we embedded the full `SignedCosign` on-chain. The issue is since
/// a Tributary is stateless with regards to the on-chain logic, including `Transaction::verify`,
/// we can't verify the signature against the group's public key unless we also include that (but
/// then we open a DoS where arbitrary group keys are specified to cause inclusion of arbitrary
/// blobs on chain).
///
/// If we use a `Signed` transaction, we mitigate the DoS risk by having someone to fatally
/// slash. We have horrible performance though as for 100 validators, all 100 will publish this
/// transaction.
///
/// We could use a signed `Unsigned` transaction, where it includes a signer and signature but
/// isn't technically a Signed transaction. This lets us de-duplicate the transaction premised on
/// its contents.
///
/// The optimal choice is likely to use a `Provided` transaction. We don't actually need to
/// observe the produced cosign (which is ephemeral). As long as it's agreed the cosign in
/// question no longer needs to produced, which would mean the cosigning protocol at-large
/// cosigning the block in question, it'd be safe to provide this and move on to the next cosign.
Cosigned {
/// The hash of the Substrate block which was cosigned
substrate_block_hash: [u8; 32],
},
/// Acknowledge a Substrate block
///
/// This is provided after the block has been cosigned.
///
/// With the acknowledgement of a Substrate block, we can whitelist all the `VariantSignId`s
/// resulting from its handling.
SubstrateBlock {
/// The hash of the Substrate block
hash: [u8; 32],
},
/// Acknowledge a Batch
///
/// Once everyone has acknowledged the Batch, we can begin signing it.
Batch {
/// The hash of the Batch's serialization.
///
/// Generally, we refer to a Batch by its ID/the hash of its instructions. Here, we want to
/// ensure consensus on the Batch, and achieving consensus on its hash is the most effective
/// way to do that.
hash: [u8; 32],
},
/// Data from a signing protocol.
Sign {
/// The ID of the object being signed
id: VariantSignId,
/// The attempt number of this signing protocol
attempt: u32,
/// The round this data is for, within the signing protocol
round: SigningProtocolRound,
/// The data itself
///
/// There will be `n` blobs of data where `n` is the amount of key shares the validator sending
/// this transaction has.
data: Vec<Vec<u8>>,
/// The transaction's signer and signature
signed: Signed,
},
/// The local view of slashes observed by the transaction's sender
SlashReport {
/// The slash points accrued by each validator
slash_points: Vec<u32>,
/// The transaction's signer and signature
signed: Signed,
},
}
impl ReadWrite for Transaction {
fn read<R: io::Read>(reader: &mut R) -> io::Result<Self> {
borsh::from_reader(reader)
}
fn write<W: io::Write>(&self, writer: &mut W) -> io::Result<()> {
borsh::to_writer(writer, self)
}
}
impl TransactionTrait for Transaction {
fn kind(&self) -> TransactionKind {
match self {
Transaction::RemoveParticipant { participant, signed } => TransactionKind::Signed(
(b"RemoveParticipant", participant).encode(),
signed.to_tributary_signed(0),
),
Transaction::DkgParticipation { signed, .. } => {
TransactionKind::Signed(b"DkgParticipation".encode(), signed.to_tributary_signed(0))
}
Transaction::DkgConfirmationPreprocess { attempt, signed, .. } => TransactionKind::Signed(
(b"DkgConfirmation", attempt).encode(),
signed.to_tributary_signed(0),
),
Transaction::DkgConfirmationShare { attempt, signed, .. } => TransactionKind::Signed(
(b"DkgConfirmation", attempt).encode(),
signed.to_tributary_signed(1),
),
Transaction::Cosign { .. } => TransactionKind::Provided("Cosign"),
Transaction::Cosigned { .. } => TransactionKind::Provided("Cosigned"),
// TODO: Provide this
Transaction::SubstrateBlock { .. } => TransactionKind::Provided("SubstrateBlock"),
// TODO: Provide this
Transaction::Batch { .. } => TransactionKind::Provided("Batch"),
Transaction::Sign { id, attempt, round, signed, .. } => TransactionKind::Signed(
(b"Sign", id, attempt).encode(),
signed.to_tributary_signed(round.nonce()),
),
Transaction::SlashReport { signed, .. } => {
TransactionKind::Signed(b"SlashReport".encode(), signed.to_tributary_signed(0))
}
_ => panic!("sig_hash called on non-signed transaction"),
}
}
}
pub trait GAIN: FnMut(&<Ristretto as Ciphersuite>::G, &[u8]) -> Option<u32> {}
impl<F: FnMut(&<Ristretto as Ciphersuite>::G, &[u8]) -> Option<u32>> GAIN for F {}
pub(crate) fn verify_transaction<F: GAIN, T: Transaction>(
tx: &T,
genesis: [u8; 32],
get_and_increment_nonce: &mut F,
) -> Result<(), TransactionError> {
if tx.serialize().len() > TRANSACTION_SIZE_LIMIT {
Err(TransactionError::TooLargeTransaction)?;
fn hash(&self) -> [u8; 32] {
let mut tx = ReadWrite::serialize(self);
if let TransactionKind::Signed(_, signed) = self.kind() {
// Make sure the part we're cutting off is the signature
assert_eq!(tx.drain((tx.len() - 64) ..).collect::<Vec<_>>(), signed.signature.serialize());
}
Blake2b::<U32>::digest(&tx).into()
}
tx.verify()?;
// This is a stateless verification which we use to enforce some size limits.
fn verify(&self) -> Result<(), TransactionError> {
#[allow(clippy::match_same_arms)]
match self {
// Fixed-length TX
Transaction::RemoveParticipant { .. } => {}
match tx.kind() {
TransactionKind::Provided(_) | TransactionKind::Unsigned => {}
TransactionKind::Signed(order, Signed { signer, nonce, signature }) => {
if let Some(next_nonce) = get_and_increment_nonce(&signer, &order) {
if nonce != next_nonce {
Err(TransactionError::InvalidNonce)?;
// TODO: MAX_DKG_PARTICIPATION_LEN
Transaction::DkgParticipation { .. } => {}
// These are fixed-length TXs
Transaction::DkgConfirmationPreprocess { .. } | Transaction::DkgConfirmationShare { .. } => {}
// Provided TXs
Transaction::Cosign { .. } |
Transaction::Cosigned { .. } |
Transaction::SubstrateBlock { .. } |
Transaction::Batch { .. } => {}
Transaction::Sign { data, .. } => {
if data.len() > usize::try_from(MAX_KEY_SHARES_PER_SET).unwrap() {
Err(TransactionError::InvalidContent)?
}
} else {
// Not a participant
Err(TransactionError::InvalidSigner)?;
// TODO: MAX_SIGN_LEN
}
// TODO: Use a batch verification here
if !signature.verify(signer, tx.sig_hash(genesis)) {
Err(TransactionError::InvalidSignature)?;
Transaction::SlashReport { slash_points, .. } => {
if slash_points.len() > usize::try_from(MAX_KEY_SHARES_PER_SET).unwrap() {
Err(TransactionError::InvalidContent)?
}
}
};
Ok(())
}
}
impl Transaction {
/// Sign a transaction.
///
/// Panics if signing a transaction whose type isn't `TransactionKind::Signed`.
pub fn sign<R: RngCore + CryptoRng>(
&mut self,
rng: &mut R,
genesis: [u8; 32],
key: &Zeroizing<<Ristretto as Ciphersuite>::F>,
) {
fn signed(tx: &mut Transaction) -> &mut Signed {
#[allow(clippy::match_same_arms)] // This doesn't make semantic sense here
match tx {
Transaction::RemoveParticipant { ref mut signed, .. } |
Transaction::DkgParticipation { ref mut signed, .. } |
Transaction::DkgConfirmationPreprocess { ref mut signed, .. } => signed,
Transaction::DkgConfirmationShare { ref mut signed, .. } => signed,
Transaction::Cosign { .. } => panic!("signing CosignSubstrateBlock"),
Transaction::Cosigned { .. } => panic!("signing Cosigned"),
Transaction::SubstrateBlock { .. } => panic!("signing SubstrateBlock"),
Transaction::Batch { .. } => panic!("signing Batch"),
Transaction::Sign { ref mut signed, .. } => signed,
Transaction::SlashReport { ref mut signed, .. } => signed,
}
}
}
Ok(())
// Decide the nonce to sign with
let sig_nonce = Zeroizing::new(<Ristretto as Ciphersuite>::F::random(rng));
{
// Set the signer and the nonce
let signed = signed(self);
signed.signer = Ristretto::generator() * key.deref();
signed.signature.R = <Ristretto as Ciphersuite>::generator() * sig_nonce.deref();
}
// Get the signature hash (which now includes `R || A` making it valid as the challenge)
let sig_hash = self.sig_hash(genesis);
// Sign the signature
signed(self).signature = SchnorrSignature::<Ristretto>::sign(key, sig_nonce, sig_hash);
}
}

View File

@@ -72,9 +72,10 @@ exceptions = [
{ allow = ["AGPL-3.0"], name = "serai-ethereum-processor" },
{ allow = ["AGPL-3.0"], name = "serai-monero-processor" },
{ allow = ["AGPL-3.0"], name = "tributary-chain" },
{ allow = ["AGPL-3.0"], name = "tributary-sdk" },
{ allow = ["AGPL-3.0"], name = "serai-cosign" },
{ allow = ["AGPL-3.0"], name = "serai-coordinator-substrate" },
{ allow = ["AGPL-3.0"], name = "serai-coordinator-tributary" },
{ allow = ["AGPL-3.0"], name = "serai-coordinator-p2p" },
{ allow = ["AGPL-3.0"], name = "serai-coordinator-libp2p-p2p" },
{ allow = ["AGPL-3.0"], name = "serai-coordinator" },

View File

@@ -24,7 +24,7 @@ pub mod key_gen {
pub enum CoordinatorMessage {
/// Instructs the Processor to begin the key generation process.
///
/// This is sent by the Coordinator when it creates the Tributary (TODO).
/// This is sent by the Coordinator when it creates the Tributary.
GenerateKey { session: Session, threshold: u16, evrf_public_keys: Vec<([u8; 32], Vec<u8>)> },
/// Received participations for the specified key generation protocol.
///
@@ -213,17 +213,17 @@ pub mod substrate {
pub enum CoordinatorMessage {
/// Keys set on the Serai blockchain.
///
/// This is set by the Coordinator's Substrate canonical event stream.
/// This is sent by the Coordinator's Substrate canonical event stream.
SetKeys { serai_time: u64, session: Session, key_pair: KeyPair },
/// Slashes reported on the Serai blockchain OR the process timed out.
///
/// This is the final message for a session,
///
/// This is set by the Coordinator's Substrate canonical event stream.
/// This is sent by the Coordinator's Substrate canonical event stream.
SlashesReported { session: Session },
/// A block from Serai with relevance to this processor.
///
/// This is set by the Coordinator's Substrate canonical event stream.
/// This is sent by the Coordinator's Substrate canonical event stream.
Block {
serai_block_number: u64,
batch: Option<ExecutedBatch>,