Start work on cleaning up the coordinator's tributary handling

Luke Parker
2025-01-02 09:11:04 -05:00
parent 6272c40561
commit bcd3f14f4f
24 changed files with 582 additions and 5674 deletions

Cargo.lock (generated)

@@ -8332,6 +8332,7 @@ dependencies = [
"serai-env",
"serai-message-queue",
"serai-processor-messages",
"serai-task",
"sp-application-crypto",
"sp-runtime",
"tokio",


@@ -37,6 +37,7 @@ scale = { package = "parity-scale-codec", version = "3", default-features = fals
zalloc = { path = "../common/zalloc" }
serai-db = { path = "../common/db" }
serai-env = { path = "../common/env" }
serai-task = { path = "../common/task", version = "0.1" }
processor-messages = { package = "serai-processor-messages", path = "../processor/messages" }
message-queue = { package = "serai-message-queue", path = "../message-queue" }


@@ -1,134 +0,0 @@
use blake2::{
digest::{consts::U32, Digest},
Blake2b,
};
use scale::Encode;
use borsh::{BorshSerialize, BorshDeserialize};
use serai_client::{
primitives::NetworkId,
validator_sets::primitives::{Session, ValidatorSet},
in_instructions::primitives::{Batch, SignedBatch},
};
pub use serai_db::*;
use ::tributary::ReadWrite;
use crate::tributary::{TributarySpec, Transaction, scanner::RecognizedIdType};
create_db!(
MainDb {
HandledMessageDb: (network: NetworkId) -> u64,
ActiveTributaryDb: () -> Vec<u8>,
RetiredTributaryDb: (set: ValidatorSet) -> (),
FirstPreprocessDb: (
network: NetworkId,
id_type: RecognizedIdType,
id: &[u8]
) -> Vec<Vec<u8>>,
LastReceivedBatchDb: (network: NetworkId) -> u32,
ExpectedBatchDb: (network: NetworkId, id: u32) -> [u8; 32],
BatchDb: (network: NetworkId, id: u32) -> SignedBatch,
LastVerifiedBatchDb: (network: NetworkId) -> u32,
HandoverBatchDb: (set: ValidatorSet) -> u32,
LookupHandoverBatchDb: (network: NetworkId, batch: u32) -> Session,
QueuedBatchesDb: (set: ValidatorSet) -> Vec<u8>
}
);
impl ActiveTributaryDb {
pub fn active_tributaries<G: Get>(getter: &G) -> (Vec<u8>, Vec<TributarySpec>) {
let bytes = Self::get(getter).unwrap_or_default();
let mut bytes_ref: &[u8] = bytes.as_ref();
let mut tributaries = vec![];
while !bytes_ref.is_empty() {
tributaries.push(TributarySpec::deserialize_reader(&mut bytes_ref).unwrap());
}
(bytes, tributaries)
}
pub fn add_participating_in_tributary(txn: &mut impl DbTxn, spec: &TributarySpec) {
let (mut existing_bytes, existing) = ActiveTributaryDb::active_tributaries(txn);
for tributary in &existing {
if tributary == spec {
return;
}
}
spec.serialize(&mut existing_bytes).unwrap();
ActiveTributaryDb::set(txn, &existing_bytes);
}
pub fn retire_tributary(txn: &mut impl DbTxn, set: ValidatorSet) {
let mut active = Self::active_tributaries(txn).1;
for i in 0 .. active.len() {
if active[i].set() == set {
active.remove(i);
break;
}
}
let mut bytes = vec![];
for active in active {
active.serialize(&mut bytes).unwrap();
}
Self::set(txn, &bytes);
RetiredTributaryDb::set(txn, set, &());
}
}
impl FirstPreprocessDb {
pub fn save_first_preprocess(
txn: &mut impl DbTxn,
network: NetworkId,
id_type: RecognizedIdType,
id: &[u8],
preprocess: &Vec<Vec<u8>>,
) {
if let Some(existing) = FirstPreprocessDb::get(txn, network, id_type, id) {
assert_eq!(&existing, preprocess, "saved a distinct first preprocess");
return;
}
FirstPreprocessDb::set(txn, network, id_type, id, preprocess);
}
}
impl ExpectedBatchDb {
pub fn save_expected_batch(txn: &mut impl DbTxn, batch: &Batch) {
LastReceivedBatchDb::set(txn, batch.network, &batch.id);
Self::set(
txn,
batch.network,
batch.id,
&Blake2b::<U32>::digest(batch.instructions.encode()).into(),
);
}
}
impl HandoverBatchDb {
pub fn set_handover_batch(txn: &mut impl DbTxn, set: ValidatorSet, batch: u32) {
Self::set(txn, set, &batch);
LookupHandoverBatchDb::set(txn, set.network, batch, &set.session);
}
}
impl QueuedBatchesDb {
pub fn queue(txn: &mut impl DbTxn, set: ValidatorSet, batch: &Transaction) {
let mut batches = Self::get(txn, set).unwrap_or_default();
batch.write(&mut batches).unwrap();
Self::set(txn, set, &batches);
}
pub fn take(txn: &mut impl DbTxn, set: ValidatorSet) -> Vec<Transaction> {
let batches_vec = Self::get(txn, set).unwrap_or_default();
txn.del(Self::key(set));
let mut batches: &[u8] = &batches_vec;
let mut res = vec![];
while !batches.is_empty() {
res.push(Transaction::read(&mut batches).unwrap());
}
res
}
}
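The removed ActiveTributaryDb stored every TributarySpec as concatenated Borsh serializations under a single key, then read them back by deserializing in a loop until the buffer was empty. A minimal, self-contained sketch of that round-trip pattern, with a stand-in struct in place of the real TributarySpec:

```rust
use borsh::{BorshSerialize, BorshDeserialize};

// Stand-in for TributarySpec, purely for illustration.
#[derive(Debug, PartialEq, BorshSerialize, BorshDeserialize)]
struct Spec {
  genesis: [u8; 32],
  start_time: u64,
}

fn main() {
  let specs = vec![
    Spec { genesis: [0; 32], start_time: 1 },
    Spec { genesis: [1; 32], start_time: 2 },
  ];

  // Append each spec's serialization to one buffer, as add_participating_in_tributary did
  let mut bytes = vec![];
  for spec in &specs {
    spec.serialize(&mut bytes).unwrap();
  }

  // Read them back by repeatedly deserializing until the buffer is empty,
  // as active_tributaries did
  let mut bytes_ref: &[u8] = bytes.as_ref();
  let mut read = vec![];
  while !bytes_ref.is_empty() {
    read.push(Spec::deserialize_reader(&mut bytes_ref).unwrap());
  }
  assert_eq!(read, specs);
}
```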

File diff suppressed because it is too large

File diff suppressed because it is too large


@@ -1,46 +0,0 @@
use std::sync::Arc;
use serai_client::primitives::NetworkId;
use processor_messages::{ProcessorMessage, CoordinatorMessage};
use message_queue::{Service, Metadata, client::MessageQueue};
#[derive(Clone, PartialEq, Eq, Debug)]
pub struct Message {
pub id: u64,
pub network: NetworkId,
pub msg: ProcessorMessage,
}
#[async_trait::async_trait]
pub trait Processors: 'static + Send + Sync + Clone {
async fn send(&self, network: NetworkId, msg: impl Send + Into<CoordinatorMessage>);
async fn recv(&self, network: NetworkId) -> Message;
async fn ack(&self, msg: Message);
}
#[async_trait::async_trait]
impl Processors for Arc<MessageQueue> {
async fn send(&self, network: NetworkId, msg: impl Send + Into<CoordinatorMessage>) {
let msg: CoordinatorMessage = msg.into();
let metadata =
Metadata { from: self.service, to: Service::Processor(network), intent: msg.intent() };
let msg = borsh::to_vec(&msg).unwrap();
self.queue(metadata, msg).await;
}
async fn recv(&self, network: NetworkId) -> Message {
let msg = self.next(Service::Processor(network)).await;
assert_eq!(msg.from, Service::Processor(network));
let id = msg.id;
// Deserialize it into a ProcessorMessage
let msg: ProcessorMessage =
borsh::from_slice(&msg.msg).expect("message wasn't a borsh-encoded ProcessorMessage");
return Message { id, network, msg };
}
async fn ack(&self, msg: Message) {
MessageQueue::ack(self, Service::Processor(msg.network), msg.id).await
}
}
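Callers of this removed trait loop on recv and only ack after handling, presumably so an unacknowledged message can be redelivered after a crash. A hedged sketch of such a driver loop against the trait exactly as declared above (the message handling itself is left as a placeholder):

```rust
use serai_client::primitives::NetworkId;

// Assumes the Processors trait from the removed module above.
async fn drive_processor_messages<P: Processors>(processors: P, network: NetworkId) {
  loop {
    // Blocks until the message queue (or an in-memory test double) yields a message
    let msg = processors.recv(network).await;
    // ... handle msg.msg here (placeholder) ...
    // Only acknowledge once handling is durably complete
    processors.ack(msg).await;
  }
}
```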


@@ -1,125 +0,0 @@
use core::fmt::Debug;
use std::{
sync::Arc,
collections::{VecDeque, HashSet, HashMap},
};
use serai_client::{primitives::NetworkId, validator_sets::primitives::ValidatorSet};
use processor_messages::CoordinatorMessage;
use async_trait::async_trait;
use tokio::sync::RwLock;
use crate::{
processors::{Message, Processors},
TributaryP2p, ReqResMessageKind, GossipMessageKind, P2pMessageKind, Message as P2pMessage, P2p,
};
pub mod tributary;
#[derive(Clone)]
pub struct MemProcessors(pub Arc<RwLock<HashMap<NetworkId, VecDeque<CoordinatorMessage>>>>);
impl MemProcessors {
#[allow(clippy::new_without_default)]
pub fn new() -> MemProcessors {
MemProcessors(Arc::new(RwLock::new(HashMap::new())))
}
}
#[async_trait::async_trait]
impl Processors for MemProcessors {
async fn send(&self, network: NetworkId, msg: impl Send + Into<CoordinatorMessage>) {
let mut processors = self.0.write().await;
let processor = processors.entry(network).or_insert_with(VecDeque::new);
processor.push_back(msg.into());
}
async fn recv(&self, _: NetworkId) -> Message {
todo!()
}
async fn ack(&self, _: Message) {
todo!()
}
}
#[allow(clippy::type_complexity)]
#[derive(Clone, Debug)]
pub struct LocalP2p(
usize,
pub Arc<RwLock<(HashSet<Vec<u8>>, Vec<VecDeque<(usize, P2pMessageKind, Vec<u8>)>>)>>,
);
impl LocalP2p {
pub fn new(validators: usize) -> Vec<LocalP2p> {
let shared = Arc::new(RwLock::new((HashSet::new(), vec![VecDeque::new(); validators])));
let mut res = vec![];
for i in 0 .. validators {
res.push(LocalP2p(i, shared.clone()));
}
res
}
}
#[async_trait]
impl P2p for LocalP2p {
type Id = usize;
async fn subscribe(&self, _set: ValidatorSet, _genesis: [u8; 32]) {}
async fn unsubscribe(&self, _set: ValidatorSet, _genesis: [u8; 32]) {}
async fn send_raw(&self, to: Self::Id, msg: Vec<u8>) {
let mut msg_ref = msg.as_slice();
let kind = ReqResMessageKind::read(&mut msg_ref).unwrap();
self.1.write().await.1[to].push_back((self.0, P2pMessageKind::ReqRes(kind), msg_ref.to_vec()));
}
async fn broadcast_raw(&self, kind: P2pMessageKind, msg: Vec<u8>) {
// Content-based deduplication
let mut lock = self.1.write().await;
{
let already_sent = &mut lock.0;
if already_sent.contains(&msg) {
return;
}
already_sent.insert(msg.clone());
}
let queues = &mut lock.1;
let kind_len = (match kind {
P2pMessageKind::ReqRes(kind) => kind.serialize(),
P2pMessageKind::Gossip(kind) => kind.serialize(),
})
.len();
let msg = msg[kind_len ..].to_vec();
for (i, msg_queue) in queues.iter_mut().enumerate() {
if i == self.0 {
continue;
}
msg_queue.push_back((self.0, kind, msg.clone()));
}
}
async fn receive(&self) -> P2pMessage<Self> {
// This is a cursed way to implement an async read from a Vec
loop {
if let Some((sender, kind, msg)) = self.1.write().await.1[self.0].pop_front() {
return P2pMessage { sender, kind, msg };
}
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
}
}
}
#[async_trait]
impl TributaryP2p for LocalP2p {
async fn broadcast(&self, genesis: [u8; 32], msg: Vec<u8>) {
<Self as P2p>::broadcast(
self,
P2pMessageKind::Gossip(GossipMessageKind::Tributary(genesis)),
msg,
)
.await
}
}
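The receive implementation above busy-polls a shared queue with a 100ms sleep rather than using an async channel. The pattern in isolation (a toy sketch with tokio, not the coordinator's types):

```rust
use std::{sync::Arc, collections::VecDeque, time::Duration};
use tokio::sync::RwLock;

// Toy stand-in for LocalP2p's shared message queue.
async fn recv_next(queue: &Arc<RwLock<VecDeque<u64>>>) -> u64 {
  loop {
    // Take the lock, try to pop, and release it before sleeping
    if let Some(msg) = queue.write().await.pop_front() {
      return msg;
    }
    tokio::time::sleep(Duration::from_millis(100)).await;
  }
}

#[tokio::main]
async fn main() {
  let queue = Arc::new(RwLock::new(VecDeque::from([42u64])));
  assert_eq!(recv_next(&queue).await, 42);
}
```

A tokio mpsc channel would avoid the polling; presumably the shared structure was kept because it lets every test peer clone, inspect, and clear the same queues (as the sync test later does).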


@@ -1,243 +0,0 @@
use std::{
time::{Duration, SystemTime},
collections::HashSet,
};
use zeroize::Zeroizing;
use rand_core::{RngCore, CryptoRng, OsRng};
use futures_util::{task::Poll, poll};
use ciphersuite::{group::ff::Field, Ciphersuite, Ristretto};
use borsh::BorshDeserialize;
use serai_client::{
primitives::NetworkId,
validator_sets::primitives::{Session, ValidatorSet},
};
use tokio::time::sleep;
use serai_db::MemDb;
use tributary::Tributary;
use crate::{
GossipMessageKind, P2pMessageKind, P2p,
tributary::{Transaction, TributarySpec},
tests::LocalP2p,
};
pub fn new_keys<R: RngCore + CryptoRng>(
rng: &mut R,
) -> Vec<Zeroizing<<Ristretto as Ciphersuite>::F>> {
let mut keys = vec![];
for _ in 0 .. 5 {
keys.push(Zeroizing::new(<Ristretto as Ciphersuite>::F::random(&mut *rng)));
}
keys
}
pub fn new_spec<R: RngCore + CryptoRng>(
rng: &mut R,
keys: &[Zeroizing<<Ristretto as Ciphersuite>::F>],
) -> TributarySpec {
let mut serai_block = [0; 32];
rng.fill_bytes(&mut serai_block);
let start_time = SystemTime::now().duration_since(SystemTime::UNIX_EPOCH).unwrap().as_secs();
let set = ValidatorSet { session: Session(0), network: NetworkId::Bitcoin };
let validators = keys
.iter()
.map(|key| ((<Ristretto as Ciphersuite>::generator() * **key), 1))
.collect::<Vec<_>>();
// Generate random eVRF keys as none of these tests rely on them to have any structure
let mut evrf_keys = vec![];
for _ in 0 .. keys.len() {
let mut substrate = [0; 32];
OsRng.fill_bytes(&mut substrate);
let mut network = vec![0; 64];
OsRng.fill_bytes(&mut network);
evrf_keys.push((substrate, network));
}
let res = TributarySpec::new(serai_block, start_time, set, validators, evrf_keys);
assert_eq!(
TributarySpec::deserialize_reader(&mut borsh::to_vec(&res).unwrap().as_slice()).unwrap(),
res,
);
res
}
pub async fn new_tributaries(
keys: &[Zeroizing<<Ristretto as Ciphersuite>::F>],
spec: &TributarySpec,
) -> Vec<(MemDb, LocalP2p, Tributary<MemDb, Transaction, LocalP2p>)> {
let p2p = LocalP2p::new(keys.len());
let mut res = vec![];
for (i, key) in keys.iter().enumerate() {
let db = MemDb::new();
res.push((
db.clone(),
p2p[i].clone(),
Tributary::<_, Transaction, _>::new(
db,
spec.genesis(),
spec.start_time(),
key.clone(),
spec.validators(),
p2p[i].clone(),
)
.await
.unwrap(),
));
}
res
}
pub async fn run_tributaries(
mut tributaries: Vec<(LocalP2p, Tributary<MemDb, Transaction, LocalP2p>)>,
) {
loop {
for (p2p, tributary) in &mut tributaries {
while let Poll::Ready(msg) = poll!(p2p.receive()) {
match msg.kind {
P2pMessageKind::Gossip(GossipMessageKind::Tributary(genesis)) => {
assert_eq!(genesis, tributary.genesis());
if tributary.handle_message(&msg.msg).await {
p2p.broadcast(msg.kind, msg.msg).await;
}
}
_ => panic!("unexpected p2p message found"),
}
}
}
sleep(Duration::from_millis(100)).await;
}
}
pub async fn wait_for_tx_inclusion(
tributary: &Tributary<MemDb, Transaction, LocalP2p>,
mut last_checked: [u8; 32],
hash: [u8; 32],
) -> [u8; 32] {
let reader = tributary.reader();
loop {
let tip = tributary.tip().await;
if tip == last_checked {
sleep(Duration::from_secs(1)).await;
continue;
}
let mut queue = vec![reader.block(&tip).unwrap()];
let mut block = None;
while {
let parent = queue.last().unwrap().parent();
if parent == tributary.genesis() {
false
} else {
block = Some(reader.block(&parent).unwrap());
block.as_ref().unwrap().hash() != last_checked
}
} {
queue.push(block.take().unwrap());
}
while let Some(block) = queue.pop() {
for tx in &block.transactions {
if tx.hash() == hash {
return block.hash();
}
}
}
last_checked = tip;
}
}
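wait_for_tx_inclusion walks backwards from the current tip until it reaches the last-checked block (or genesis), then drains that queue oldest-first looking for the transaction. A toy model of the traversal over an in-memory parent map (hypothetical types, not the tributary reader):

```rust
use std::collections::HashMap;

#[derive(Clone)]
struct Block { hash: u8, parent: u8, txs: Vec<u8> }

// Walk tip -> last_checked (or genesis), then scan oldest-first for `tx`.
fn find_inclusion(
  blocks: &HashMap<u8, Block>,
  genesis: u8,
  last_checked: u8,
  tip: u8,
  tx: u8,
) -> Option<u8> {
  let mut queue = vec![blocks[&tip].clone()];
  loop {
    let parent = queue.last().unwrap().parent;
    if (parent == genesis) || (parent == last_checked) {
      break;
    }
    queue.push(blocks[&parent].clone());
  }
  while let Some(block) = queue.pop() {
    if block.txs.contains(&tx) {
      return Some(block.hash);
    }
  }
  None
}

fn main() {
  let blocks = HashMap::from([
    (1, Block { hash: 1, parent: 0, txs: vec![] }),
    (2, Block { hash: 2, parent: 1, txs: vec![7] }),
    (3, Block { hash: 3, parent: 2, txs: vec![] }),
  ]);
  // Searching from tip 3 back to genesis 0 finds tx 7 in block 2
  assert_eq!(find_inclusion(&blocks, 0, 0, 3, 7), Some(2));
}
```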
#[tokio::test]
async fn tributary_test() {
let keys = new_keys(&mut OsRng);
let spec = new_spec(&mut OsRng, &keys);
let mut tributaries = new_tributaries(&keys, &spec)
.await
.into_iter()
.map(|(_, p2p, tributary)| (p2p, tributary))
.collect::<Vec<_>>();
let mut blocks = 0;
let mut last_block = spec.genesis();
// Doesn't use run_tributaries as we want to wind these down at a certain point
// run_tributaries will run them ad infinitum
let timeout = SystemTime::now() + Duration::from_secs(65);
while (blocks < 10) && (SystemTime::now().duration_since(timeout).is_err()) {
for (p2p, tributary) in &mut tributaries {
while let Poll::Ready(msg) = poll!(p2p.receive()) {
match msg.kind {
P2pMessageKind::Gossip(GossipMessageKind::Tributary(genesis)) => {
assert_eq!(genesis, tributary.genesis());
tributary.handle_message(&msg.msg).await;
}
_ => panic!("unexpected p2p message found"),
}
}
}
let tip = tributaries[0].1.tip().await;
if tip != last_block {
last_block = tip;
blocks += 1;
}
sleep(Duration::from_millis(100)).await;
}
if blocks != 10 {
panic!("tributary chain test hit timeout");
}
// Handle all existing messages
for (p2p, tributary) in &mut tributaries {
while let Poll::Ready(msg) = poll!(p2p.receive()) {
match msg.kind {
P2pMessageKind::Gossip(GossipMessageKind::Tributary(genesis)) => {
assert_eq!(genesis, tributary.genesis());
tributary.handle_message(&msg.msg).await;
}
_ => panic!("unexpected p2p message found"),
}
}
}
// handle_message informed the Tendermint machine, yet it still has to process it
// Sleep for a second accordingly
// TODO: Is there a better way to handle this?
sleep(Duration::from_secs(1)).await;
// All tributaries should agree on the tip, within a block
let mut tips = HashSet::new();
for (_, tributary) in &tributaries {
tips.insert(tributary.tip().await);
}
assert!(tips.len() <= 2);
if tips.len() == 2 {
for tip in &tips {
// Find a Tributary where this isn't the tip
for (_, tributary) in &tributaries {
let Some(after) = tributary.reader().block_after(tip) else { continue };
// Make sure the block after is the other tip
assert!(tips.contains(&after));
return;
}
}
} else {
assert_eq!(tips.len(), 1);
return;
}
panic!("tributary had different tip with a variance exceeding one block");
}
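The loop guard above, `SystemTime::now().duration_since(timeout).is_err()`, leans on the fact that duration_since returns Err while its argument is still in the future, i.e. it reads as "the deadline hasn't passed yet". A standalone illustration of the idiom:

```rust
use std::time::{Duration, SystemTime};

fn main() {
  let deadline = SystemTime::now() + Duration::from_secs(65);
  // duration_since(later_time) is Err until that time has passed,
  // so is_err() means "the deadline has not been reached yet"
  assert!(SystemTime::now().duration_since(deadline).is_err());
}
```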


@@ -1,282 +0,0 @@
use core::time::Duration;
use zeroize::Zeroizing;
use rand_core::{RngCore, OsRng};
use ciphersuite::{group::GroupEncoding, Ciphersuite, Ristretto};
use frost::Participant;
use sp_runtime::traits::Verify;
use serai_client::{
primitives::Signature,
validator_sets::primitives::{ValidatorSet, KeyPair},
};
use tokio::time::sleep;
use serai_db::{Get, DbTxn, Db, MemDb};
use processor_messages::{key_gen, CoordinatorMessage};
use tributary::{TransactionTrait, Tributary};
use crate::{
tributary::{
Transaction, TributarySpec,
scanner::{PublishSeraiTransaction, handle_new_blocks},
},
tests::{
MemProcessors, LocalP2p,
tributary::{new_keys, new_spec, new_tributaries, run_tributaries, wait_for_tx_inclusion},
},
};
#[tokio::test]
async fn dkg_test() {
env_logger::init();
let keys = new_keys(&mut OsRng);
let spec = new_spec(&mut OsRng, &keys);
let full_tributaries = new_tributaries(&keys, &spec).await;
let mut dbs = vec![];
let mut tributaries = vec![];
for (db, p2p, tributary) in full_tributaries {
dbs.push(db);
tributaries.push((p2p, tributary));
}
// Run the tributaries in the background
tokio::spawn(run_tributaries(tributaries.clone()));
let mut txs = vec![];
// Create DKG participation for each key
for key in &keys {
let mut participation = vec![0; 4096];
OsRng.fill_bytes(&mut participation);
let mut tx =
Transaction::DkgParticipation { participation, signed: Transaction::empty_signed() };
tx.sign(&mut OsRng, spec.genesis(), key);
txs.push(tx);
}
let block_before_tx = tributaries[0].1.tip().await;
// Publish t-1 participations
let t = ((keys.len() * 2) / 3) + 1;
for (i, tx) in txs.iter().take(t - 1).enumerate() {
assert_eq!(tributaries[i].1.add_transaction(tx.clone()).await, Ok(true));
wait_for_tx_inclusion(&tributaries[0].1, block_before_tx, tx.hash()).await;
}
let expected_participations = txs
.iter()
.enumerate()
.map(|(i, tx)| {
if let Transaction::DkgParticipation { participation, .. } = tx {
CoordinatorMessage::KeyGen(key_gen::CoordinatorMessage::Participation {
session: spec.set().session,
participant: Participant::new((i + 1).try_into().unwrap()).unwrap(),
participation: participation.clone(),
})
} else {
panic!("txs wasn't a DkgParticipation");
}
})
.collect::<Vec<_>>();
async fn new_processors(
db: &mut MemDb,
key: &Zeroizing<<Ristretto as Ciphersuite>::F>,
spec: &TributarySpec,
tributary: &Tributary<MemDb, Transaction, LocalP2p>,
) -> MemProcessors {
let processors = MemProcessors::new();
handle_new_blocks::<_, _, _, _, _, LocalP2p>(
db,
key,
&|_, _, _, _| async {
panic!("provided TX caused recognized_id to be called in new_processors")
},
&processors,
&(),
&|_| async {
panic!(
"test tried to publish a new Tributary TX from handle_application_tx in new_processors"
)
},
spec,
&tributary.reader(),
)
.await;
processors
}
// Instantiate a scanner and verify it has the first two participations to report (and isn't
// waiting for `t`)
let processors = new_processors(&mut dbs[0], &keys[0], &spec, &tributaries[0].1).await;
assert_eq!(processors.0.read().await.get(&spec.set().network).unwrap().len(), t - 1);
// Publish the rest of the participations
let block_before_tx = tributaries[0].1.tip().await;
for tx in txs.iter().skip(t - 1) {
assert_eq!(tributaries[0].1.add_transaction(tx.clone()).await, Ok(true));
wait_for_tx_inclusion(&tributaries[0].1, block_before_tx, tx.hash()).await;
}
// Verify the scanner emits all KeyGen::Participations messages
handle_new_blocks::<_, _, _, _, _, LocalP2p>(
&mut dbs[0],
&keys[0],
&|_, _, _, _| async {
panic!("provided TX caused recognized_id to be called after DkgParticipation")
},
&processors,
&(),
&|_| async {
panic!(
"test tried to publish a new Tributary TX from handle_application_tx after DkgParticipation"
)
},
&spec,
&tributaries[0].1.reader(),
)
.await;
{
let mut msgs = processors.0.write().await;
let msgs = msgs.get_mut(&spec.set().network).unwrap();
assert_eq!(msgs.len(), keys.len());
for expected in &expected_participations {
assert_eq!(&msgs.pop_front().unwrap(), expected);
}
assert!(msgs.is_empty());
}
// Verify all keys exhibit this scanner behavior
for (i, key) in keys.iter().enumerate().skip(1) {
let processors = new_processors(&mut dbs[i], key, &spec, &tributaries[i].1).await;
let mut msgs = processors.0.write().await;
let msgs = msgs.get_mut(&spec.set().network).unwrap();
assert_eq!(msgs.len(), keys.len());
for expected in &expected_participations {
assert_eq!(&msgs.pop_front().unwrap(), expected);
}
assert!(msgs.is_empty());
}
let mut substrate_key = [0; 32];
OsRng.fill_bytes(&mut substrate_key);
let mut network_key = vec![0; usize::try_from((OsRng.next_u64() % 32) + 32).unwrap()];
OsRng.fill_bytes(&mut network_key);
let key_pair = KeyPair(serai_client::Public(substrate_key), network_key.try_into().unwrap());
let mut txs = vec![];
for (i, key) in keys.iter().enumerate() {
let mut txn = dbs[i].txn();
// Claim we've generated the key pair
crate::tributary::generated_key_pair::<MemDb>(&mut txn, spec.genesis(), &key_pair);
// Publish the nonces
let attempt = 0;
let mut tx = Transaction::DkgConfirmationNonces {
attempt,
confirmation_nonces: crate::tributary::dkg_confirmation_nonces(key, &spec, &mut txn, 0),
signed: Transaction::empty_signed(),
};
txn.commit();
tx.sign(&mut OsRng, spec.genesis(), key);
txs.push(tx);
}
let block_before_tx = tributaries[0].1.tip().await;
for (i, tx) in txs.iter().enumerate() {
assert_eq!(tributaries[i].1.add_transaction(tx.clone()).await, Ok(true));
}
for tx in &txs {
wait_for_tx_inclusion(&tributaries[0].1, block_before_tx, tx.hash()).await;
}
// This should not cause any new processor event as the processor doesn't handle DKG confirming
for (i, key) in keys.iter().enumerate() {
handle_new_blocks::<_, _, _, _, _, LocalP2p>(
&mut dbs[i],
key,
&|_, _, _, _| async {
panic!("provided TX caused recognized_id to be called after DkgConfirmationNonces")
},
&processors,
&(),
// The Tributary handler should publish ConfirmationShare itself after ConfirmationNonces
&|tx| async { assert_eq!(tributaries[i].1.add_transaction(tx).await, Ok(true)) },
&spec,
&tributaries[i].1.reader(),
)
.await;
{
assert!(processors.0.read().await.get(&spec.set().network).unwrap().is_empty());
}
}
// Yet once these TXs are on-chain, the tributary should itself publish the confirmation shares
// This means in the block after the next block, the keys should be set onto Serai
// Sleep twice as long as two blocks, in case there's some stability issue
sleep(Duration::from_secs(
2 * 2 * u64::from(Tributary::<MemDb, Transaction, LocalP2p>::block_time()),
))
.await;
struct CheckPublishSetKeys {
spec: TributarySpec,
key_pair: KeyPair,
}
#[async_trait::async_trait]
impl PublishSeraiTransaction for CheckPublishSetKeys {
async fn publish_set_keys(
&self,
_db: &(impl Sync + Get),
set: ValidatorSet,
key_pair: KeyPair,
signature_participants: bitvec::vec::BitVec<u8, bitvec::order::Lsb0>,
signature: Signature,
) {
assert_eq!(set, self.spec.set());
assert_eq!(self.key_pair, key_pair);
assert!(signature.verify(
&*serai_client::validator_sets::primitives::set_keys_message(&set, &key_pair),
&serai_client::Public(
frost::dkg::musig::musig_key::<Ristretto>(
&serai_client::validator_sets::primitives::musig_context(set),
&self
.spec
.validators()
.into_iter()
.zip(signature_participants)
.filter_map(|((validator, _), included)| included.then_some(validator))
.collect::<Vec<_>>()
)
.unwrap()
.to_bytes()
),
));
}
}
// The scanner should successfully try to publish a transaction with a validly signed signature
handle_new_blocks::<_, _, _, _, _, LocalP2p>(
&mut dbs[0],
&keys[0],
&|_, _, _, _| async {
panic!("provided TX caused recognized_id to be called after DKG confirmation")
},
&processors,
&CheckPublishSetKeys { spec: spec.clone(), key_pair: key_pair.clone() },
&|_| async { panic!("test tried to publish a new Tributary TX from handle_application_tx") },
&spec,
&tributaries[0].1.reader(),
)
.await;
{
assert!(processors.0.read().await.get(&spec.set().network).unwrap().is_empty());
}
}
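The `t` computed near the top of this test is the same 2/3rds-plus-one threshold the new db.rs encodes in required_participation. With the five keys new_keys generates it works out to four, so the test seeds the chain with three participations before instantiating the scanner. A quick check of that arithmetic (not project code):

```rust
fn main() {
  let n: u64 = 5; // new_keys generates five validators
  let t = ((n * 2) / 3) + 1; // 2/3rds + 1, as in the test and required_participation
  assert_eq!(t, 4);
  assert_eq!(t - 1, 3); // participations published before the first scanner run
}
```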


@@ -1,74 +0,0 @@
use core::time::Duration;
use std::sync::Arc;
use rand_core::OsRng;
use tokio::{
sync::{mpsc, broadcast},
time::sleep,
};
use serai_db::MemDb;
use tributary::Tributary;
use crate::{
tributary::Transaction,
ActiveTributary, TributaryEvent,
p2p::handle_p2p_task,
tests::{
LocalP2p,
tributary::{new_keys, new_spec, new_tributaries},
},
};
#[tokio::test]
async fn handle_p2p_test() {
let keys = new_keys(&mut OsRng);
let spec = new_spec(&mut OsRng, &keys);
let mut tributaries = new_tributaries(&keys, &spec)
.await
.into_iter()
.map(|(_, p2p, tributary)| (p2p, tributary))
.collect::<Vec<_>>();
let mut tributary_senders = vec![];
let mut tributary_arcs = vec![];
for (p2p, tributary) in tributaries.drain(..) {
let tributary = Arc::new(tributary);
tributary_arcs.push(tributary.clone());
let (new_tributary_send, new_tributary_recv) = broadcast::channel(5);
let (cosign_send, _) = mpsc::unbounded_channel();
tokio::spawn(handle_p2p_task(p2p, cosign_send, new_tributary_recv));
new_tributary_send
.send(TributaryEvent::NewTributary(ActiveTributary { spec: spec.clone(), tributary }))
.map_err(|_| "failed to send ActiveTributary")
.unwrap();
tributary_senders.push(new_tributary_send);
}
let tributaries = tributary_arcs;
// After two blocks of time, we should have a new block
// We don't wait one block of time as we may have missed the chance for this block
sleep(Duration::from_secs((2 * Tributary::<MemDb, Transaction, LocalP2p>::block_time()).into()))
.await;
let tip = tributaries[0].tip().await;
assert!(tip != spec.genesis());
// Sleep one second to make sure this block propagates
sleep(Duration::from_secs(1)).await;
// Make sure every tributary has it
for tributary in &tributaries {
assert!(tributary.reader().block(&tip).is_some());
}
// Then after another block of time, we should have yet another new block
sleep(Duration::from_secs(Tributary::<MemDb, Transaction, LocalP2p>::block_time().into())).await;
let new_tip = tributaries[0].tip().await;
assert!(new_tip != tip);
sleep(Duration::from_secs(1)).await;
for tributary in tributaries {
assert!(tributary.reader().block(&new_tip).is_some());
}
}


@@ -1,245 +0,0 @@
use core::fmt::Debug;
use rand_core::{RngCore, OsRng};
use ciphersuite::{group::Group, Ciphersuite, Ristretto};
use scale::{Encode, Decode};
use serai_client::{
primitives::Signature,
validator_sets::primitives::{MAX_KEY_SHARES_PER_SET, ValidatorSet, KeyPair},
};
use processor_messages::coordinator::SubstrateSignableId;
use tributary::{ReadWrite, tests::random_signed_with_nonce};
use crate::tributary::{Label, SignData, Transaction, scanner::PublishSeraiTransaction};
mod chain;
pub use chain::*;
mod tx;
mod dkg;
// TODO: Test the other transactions
mod handle_p2p;
mod sync;
#[async_trait::async_trait]
impl PublishSeraiTransaction for () {
async fn publish_set_keys(
&self,
_db: &(impl Sync + serai_db::Get),
_set: ValidatorSet,
_key_pair: KeyPair,
_signature_participants: bitvec::vec::BitVec<u8, bitvec::order::Lsb0>,
_signature: Signature,
) {
panic!("publish_set_keys was called in test")
}
}
fn random_u32<R: RngCore>(rng: &mut R) -> u32 {
u32::try_from(rng.next_u64() >> 32).unwrap()
}
fn random_vec<R: RngCore>(rng: &mut R, limit: usize) -> Vec<u8> {
let len = usize::try_from(rng.next_u64() % u64::try_from(limit).unwrap()).unwrap();
let mut res = vec![0; len];
rng.fill_bytes(&mut res);
res
}
fn random_sign_data<R: RngCore, Id: Clone + PartialEq + Eq + Debug + Encode + Decode>(
rng: &mut R,
plan: Id,
label: Label,
) -> SignData<Id> {
SignData {
plan,
attempt: random_u32(&mut OsRng),
label,
data: {
let mut res = vec![];
for _ in 0 ..= (rng.next_u64() % 255) {
res.push(random_vec(&mut OsRng, 512));
}
res
},
signed: random_signed_with_nonce(&mut OsRng, label.nonce()),
}
}
fn test_read_write<RW: Eq + Debug + ReadWrite>(value: &RW) {
assert_eq!(value, &RW::read::<&[u8]>(&mut value.serialize().as_ref()).unwrap());
}
#[test]
fn tx_size_limit() {
use serai_client::validator_sets::primitives::MAX_KEY_LEN;
use tributary::TRANSACTION_SIZE_LIMIT;
let max_dkg_coefficients = (MAX_KEY_SHARES_PER_SET * 2).div_ceil(3) + 1;
// n coefficients
// 2 ECDH values per recipient, and the encrypted share
let elements_outside_of_proof = max_dkg_coefficients + ((2 + 1) * MAX_KEY_SHARES_PER_SET);
// Then Pedersen Vector Commitments for each DH done, and the associated overhead in the proof
// It's handwaved as one commitment per DH, where we do 2 per coefficient and 1 for the explicit
// ECDHs
let vector_commitments = (2 * max_dkg_coefficients) + (2 * MAX_KEY_SHARES_PER_SET);
// Then we have commitments to the `t` polynomial, of length 2 + (2 * nc), where nc is the number
// of vector commitments
let t_commitments = 2 + (2 * vector_commitments);
// The remainder of the proof should be ~30 elements
let proof_elements = 30;
let handwaved_dkg_size =
((elements_outside_of_proof + vector_commitments + t_commitments + proof_elements) *
MAX_KEY_LEN) +
1024;
// Further scale by two in case of any errors in the above
assert!(u32::try_from(TRANSACTION_SIZE_LIMIT).unwrap() >= (2 * handwaved_dkg_size));
}
#[test]
fn serialize_sign_data() {
fn test_read_write<Id: Clone + PartialEq + Eq + Debug + Encode + Decode>(value: &SignData<Id>) {
let mut buf = vec![];
value.write(&mut buf).unwrap();
assert_eq!(value, &SignData::read(&mut buf.as_slice()).unwrap())
}
let mut plan = [0; 3];
OsRng.fill_bytes(&mut plan);
test_read_write(&random_sign_data::<_, _>(
&mut OsRng,
plan,
if (OsRng.next_u64() % 2) == 0 { Label::Preprocess } else { Label::Share },
));
let mut plan = [0; 5];
OsRng.fill_bytes(&mut plan);
test_read_write(&random_sign_data::<_, _>(
&mut OsRng,
plan,
if (OsRng.next_u64() % 2) == 0 { Label::Preprocess } else { Label::Share },
));
let mut plan = [0; 8];
OsRng.fill_bytes(&mut plan);
test_read_write(&random_sign_data::<_, _>(
&mut OsRng,
plan,
if (OsRng.next_u64() % 2) == 0 { Label::Preprocess } else { Label::Share },
));
let mut plan = [0; 24];
OsRng.fill_bytes(&mut plan);
test_read_write(&random_sign_data::<_, _>(
&mut OsRng,
plan,
if (OsRng.next_u64() % 2) == 0 { Label::Preprocess } else { Label::Share },
));
}
#[test]
fn serialize_transaction() {
test_read_write(&Transaction::RemoveParticipant {
participant: <Ristretto as Ciphersuite>::G::random(&mut OsRng),
signed: random_signed_with_nonce(&mut OsRng, 0),
});
test_read_write(&Transaction::DkgParticipation {
participation: random_vec(&mut OsRng, 4096),
signed: random_signed_with_nonce(&mut OsRng, 0),
});
test_read_write(&Transaction::DkgConfirmationNonces {
attempt: random_u32(&mut OsRng),
confirmation_nonces: {
let mut nonces = [0; 64];
OsRng.fill_bytes(&mut nonces);
nonces
},
signed: random_signed_with_nonce(&mut OsRng, 0),
});
test_read_write(&Transaction::DkgConfirmationShare {
attempt: random_u32(&mut OsRng),
confirmation_share: {
let mut share = [0; 32];
OsRng.fill_bytes(&mut share);
share
},
signed: random_signed_with_nonce(&mut OsRng, 1),
});
{
let mut block = [0; 32];
OsRng.fill_bytes(&mut block);
test_read_write(&Transaction::CosignSubstrateBlock(block));
}
{
let mut block = [0; 32];
OsRng.fill_bytes(&mut block);
let batch = u32::try_from(OsRng.next_u64() >> 32).unwrap();
test_read_write(&Transaction::Batch { block, batch });
}
test_read_write(&Transaction::SubstrateBlock(OsRng.next_u64()));
{
let batch = u32::try_from(OsRng.next_u64() >> 32).unwrap();
test_read_write(&Transaction::SubstrateSign(random_sign_data(
&mut OsRng,
SubstrateSignableId::Batch(batch),
Label::Preprocess,
)));
}
{
let batch = u32::try_from(OsRng.next_u64() >> 32).unwrap();
test_read_write(&Transaction::SubstrateSign(random_sign_data(
&mut OsRng,
SubstrateSignableId::Batch(batch),
Label::Share,
)));
}
{
let mut plan = [0; 32];
OsRng.fill_bytes(&mut plan);
test_read_write(&Transaction::Sign(random_sign_data(&mut OsRng, plan, Label::Preprocess)));
}
{
let mut plan = [0; 32];
OsRng.fill_bytes(&mut plan);
test_read_write(&Transaction::Sign(random_sign_data(&mut OsRng, plan, Label::Share)));
}
{
let mut plan = [0; 32];
OsRng.fill_bytes(&mut plan);
let mut tx_hash = vec![0; (OsRng.next_u64() % 64).try_into().unwrap()];
OsRng.fill_bytes(&mut tx_hash);
test_read_write(&Transaction::SignCompleted {
plan,
tx_hash,
first_signer: random_signed_with_nonce(&mut OsRng, 2).signer,
signature: random_signed_with_nonce(&mut OsRng, 2).signature,
});
}
test_read_write(&Transaction::SlashReport(
{
let amount =
usize::try_from(OsRng.next_u64() % u64::from(MAX_KEY_SHARES_PER_SET - 1)).unwrap();
let mut points = vec![];
for _ in 0 .. amount {
points.push((OsRng.next_u64() >> 32).try_into().unwrap());
}
points
},
random_signed_with_nonce(&mut OsRng, 0),
));
}


@@ -1,165 +0,0 @@
use core::time::Duration;
use std::{sync::Arc, collections::HashSet};
use rand_core::OsRng;
use ciphersuite::{group::GroupEncoding, Ciphersuite, Ristretto};
use tokio::{
sync::{mpsc, broadcast},
time::sleep,
};
use serai_db::MemDb;
use tributary::Tributary;
use crate::{
tributary::Transaction,
ActiveTributary, TributaryEvent,
p2p::{heartbeat_tributaries_task, handle_p2p_task},
tests::{
LocalP2p,
tributary::{new_keys, new_spec, new_tributaries},
},
};
#[tokio::test]
async fn sync_test() {
let mut keys = new_keys(&mut OsRng);
let spec = new_spec(&mut OsRng, &keys);
// Ensure this can have a node fail
assert!(spec.n() > spec.t());
let mut tributaries = new_tributaries(&keys, &spec)
.await
.into_iter()
.map(|(_, p2p, tributary)| (p2p, tributary))
.collect::<Vec<_>>();
// Keep a Tributary back, effectively having it offline
let syncer_key = keys.pop().unwrap();
let (syncer_p2p, syncer_tributary) = tributaries.pop().unwrap();
// Have the rest form a P2P net
let mut tributary_senders = vec![];
let mut tributary_arcs = vec![];
let mut p2p_threads = vec![];
for (p2p, tributary) in tributaries.drain(..) {
let tributary = Arc::new(tributary);
tributary_arcs.push(tributary.clone());
let (new_tributary_send, new_tributary_recv) = broadcast::channel(5);
let (cosign_send, _) = mpsc::unbounded_channel();
let thread = tokio::spawn(handle_p2p_task(p2p, cosign_send, new_tributary_recv));
new_tributary_send
.send(TributaryEvent::NewTributary(ActiveTributary { spec: spec.clone(), tributary }))
.map_err(|_| "failed to send ActiveTributary")
.unwrap();
tributary_senders.push(new_tributary_send);
p2p_threads.push(thread);
}
let tributaries = tributary_arcs;
// After four blocks of time, we should have a new block
// We don't wait one block of time as we may have missed the chance for the first block
// We don't wait two blocks because we may have missed the chance, and then had a failure to
// propose by our 'offline' validator, which would cause the Tendermint round time to increase,
// requiring a longer delay
let block_time = u64::from(Tributary::<MemDb, Transaction, LocalP2p>::block_time());
sleep(Duration::from_secs(4 * block_time)).await;
let tip = tributaries[0].tip().await;
assert!(tip != spec.genesis());
// Sleep one second to make sure this block propagates
sleep(Duration::from_secs(1)).await;
// Make sure every tributary has it
for tributary in &tributaries {
assert!(tributary.reader().block(&tip).is_some());
}
// Now that we've confirmed the other tributaries formed a net without issue, drop the syncer's
// pending P2P messages
syncer_p2p.1.write().await.1.last_mut().unwrap().clear();
// Have it join the net
let syncer_key = Ristretto::generator() * *syncer_key;
let syncer_tributary = Arc::new(syncer_tributary);
let (syncer_tributary_send, syncer_tributary_recv) = broadcast::channel(5);
let (cosign_send, _) = mpsc::unbounded_channel();
tokio::spawn(handle_p2p_task(syncer_p2p.clone(), cosign_send, syncer_tributary_recv));
syncer_tributary_send
.send(TributaryEvent::NewTributary(ActiveTributary {
spec: spec.clone(),
tributary: syncer_tributary.clone(),
}))
.map_err(|_| "failed to send ActiveTributary to syncer")
.unwrap();
// It shouldn't automatically catch up. If it somehow was, our test would be broken
// Sanity check this
let tip = tributaries[0].tip().await;
// Wait until a new block occurs
sleep(Duration::from_secs(3 * block_time)).await;
// Make sure a new block actually occurred
assert!(tributaries[0].tip().await != tip);
// Make sure the new block alone didn't trigger catching up
assert_eq!(syncer_tributary.tip().await, spec.genesis());
// Start the heartbeat protocol
let (syncer_heartbeat_tributary_send, syncer_heartbeat_tributary_recv) = broadcast::channel(5);
tokio::spawn(heartbeat_tributaries_task(syncer_p2p, syncer_heartbeat_tributary_recv));
syncer_heartbeat_tributary_send
.send(TributaryEvent::NewTributary(ActiveTributary {
spec: spec.clone(),
tributary: syncer_tributary.clone(),
}))
.map_err(|_| "failed to send ActiveTributary to heartbeat")
.unwrap();
// The heartbeat is once every 10 blocks, with some limitations
sleep(Duration::from_secs(20 * block_time)).await;
assert!(syncer_tributary.tip().await != spec.genesis());
// Verify it synced to the tip
let syncer_tip = {
let tributary = &tributaries[0];
let tip = tributary.tip().await;
let syncer_tip = syncer_tributary.tip().await;
// Allow a one block tolerance in case of race conditions
assert!(
HashSet::from([tip, tributary.reader().block(&tip).unwrap().parent()]).contains(&syncer_tip)
);
syncer_tip
};
sleep(Duration::from_secs(block_time)).await;
// Verify it's now keeping up
assert!(syncer_tributary.tip().await != syncer_tip);
// Verify it's now participating in consensus
// Because only `t` validators are used in a commit, take n - t nodes offline,
// leaving only `t` nodes, which should force it to participate in the consensus
// of the next blocks.
let spares = usize::from(spec.n() - spec.t());
for thread in p2p_threads.iter().take(spares) {
thread.abort();
}
// wait for a block
sleep(Duration::from_secs(block_time)).await;
if syncer_tributary
.reader()
.parsed_commit(&syncer_tributary.tip().await)
.unwrap()
.validators
.iter()
.any(|signer| signer == &syncer_key.to_bytes())
{
return;
}
panic!("synced tributary didn't start participating in consensus");
}


@@ -1,62 +0,0 @@
use core::time::Duration;
use rand_core::{RngCore, OsRng};
use tokio::time::sleep;
use serai_db::MemDb;
use tributary::{
transaction::Transaction as TransactionTrait, Transaction as TributaryTransaction, Tributary,
};
use crate::{
tributary::Transaction,
tests::{
LocalP2p,
tributary::{new_keys, new_spec, new_tributaries, run_tributaries, wait_for_tx_inclusion},
},
};
#[tokio::test]
async fn tx_test() {
let keys = new_keys(&mut OsRng);
let spec = new_spec(&mut OsRng, &keys);
let tributaries = new_tributaries(&keys, &spec)
.await
.into_iter()
.map(|(_, p2p, tributary)| (p2p, tributary))
.collect::<Vec<_>>();
// Run the tributaries in the background
tokio::spawn(run_tributaries(tributaries.clone()));
// Send a TX from a random Tributary
let sender =
usize::try_from(OsRng.next_u64() % u64::try_from(tributaries.len()).unwrap()).unwrap();
let key = keys[sender].clone();
let block_before_tx = tributaries[sender].1.tip().await;
// Create the TX with a null signature so we can get its sig hash
let mut tx = Transaction::DkgParticipation {
participation: {
let mut participation = vec![0; 4096];
OsRng.fill_bytes(&mut participation);
participation
},
signed: Transaction::empty_signed(),
};
tx.sign(&mut OsRng, spec.genesis(), &key);
assert_eq!(tributaries[sender].1.add_transaction(tx.clone()).await, Ok(true));
let included_in = wait_for_tx_inclusion(&tributaries[sender].1, block_before_tx, tx.hash()).await;
// Also sleep for the block time to ensure the block is synced around before we run checks on it
sleep(Duration::from_secs(Tributary::<MemDb, Transaction, LocalP2p>::block_time().into())).await;
// All tributaries should have acknowledged this transaction in a block
for (_, tributary) in tributaries {
let block = tributary.reader().block(&included_in).unwrap();
assert_eq!(block.transactions, vec![TributaryTransaction::Application(tx.clone())]);
}
}


@@ -3,186 +3,344 @@ use std::collections::HashMap;
use scale::Encode;
use borsh::{BorshSerialize, BorshDeserialize};
use ciphersuite::{group::GroupEncoding, Ciphersuite, Ristretto};
use frost::Participant;
use serai_client::{primitives::SeraiAddress, validator_sets::primitives::ValidatorSet};
use serai_client::validator_sets::primitives::{KeyPair, ValidatorSet};
use processor_messages::sign::VariantSignId;
use processor_messages::coordinator::SubstrateSignableId;
use serai_db::*;
pub use serai_db::*;
use tributary::ReadWrite;
use crate::tributary::{Label, Transaction};
use crate::tributary::transaction::SigningProtocolRound;
/// A topic within the database which the group participates in
#[derive(Clone, Copy, PartialEq, Eq, Debug, Encode, BorshSerialize, BorshDeserialize)]
pub enum Topic {
DkgConfirmation,
SubstrateSign(SubstrateSignableId),
Sign([u8; 32]),
/// Vote to remove a participant
RemoveParticipant { participant: SeraiAddress },
// DkgParticipation isn't represented here as participations are immediately sent to the
// processor, not accumulated within this database
/// Participation in the signing protocol to confirm the DKG results on Substrate
DkgConfirmation { attempt: u32, label: SigningProtocolRound },
/// The local view of the SlashReport, to be aggregated into the final SlashReport
SlashReport,
/// Participation in a signing protocol
Sign { id: VariantSignId, attempt: u32, label: SigningProtocolRound },
}
// A struct to refer to a piece of data all validators will presumably provide a value for.
#[derive(Clone, Copy, PartialEq, Eq, Debug, Encode)]
pub struct DataSpecification {
pub topic: Topic,
pub label: Label,
pub attempt: u32,
enum Participating {
Participated,
Everyone,
}
pub enum DataSet {
Participating(HashMap<Participant, Vec<u8>>),
NotParticipating,
impl Topic {
// The topic used by the next attempt of this protocol
fn next_attempt_topic(self) -> Option<Topic> {
#[allow(clippy::match_same_arms)]
match self {
Topic::RemoveParticipant { .. } => None,
Topic::DkgConfirmation { attempt, label: _ } => Some(Topic::DkgConfirmation {
attempt: attempt + 1,
label: SigningProtocolRound::Preprocess,
}),
Topic::SlashReport { .. } => None,
Topic::Sign { id, attempt, label: _ } => {
Some(Topic::Sign { id, attempt: attempt + 1, label: SigningProtocolRound::Preprocess })
}
}
}
// The topic for the re-attempt to schedule
fn reattempt_topic(self) -> Option<(u32, Topic)> {
#[allow(clippy::match_same_arms)]
match self {
Topic::RemoveParticipant { .. } => None,
Topic::DkgConfirmation { attempt, label } => match label {
SigningProtocolRound::Preprocess => {
let attempt = attempt + 1;
Some((
attempt,
Topic::DkgConfirmation { attempt, label: SigningProtocolRound::Preprocess },
))
}
SigningProtocolRound::Share => None,
},
Topic::SlashReport { .. } => None,
Topic::Sign { id, attempt, label } => match label {
SigningProtocolRound::Preprocess => {
let attempt = attempt + 1;
Some((attempt, Topic::Sign { id, attempt, label: SigningProtocolRound::Preprocess }))
}
SigningProtocolRound::Share => None,
},
}
}
/// The topic which precedes this topic as a prerequisite
///
/// The preceding topic must define this topic as succeeding
fn preceding_topic(self) -> Option<Topic> {
#[allow(clippy::match_same_arms)]
match self {
Topic::RemoveParticipant { .. } => None,
Topic::DkgConfirmation { attempt, label } => match label {
SigningProtocolRound::Preprocess => None,
SigningProtocolRound::Share => {
Some(Topic::DkgConfirmation { attempt, label: SigningProtocolRound::Preprocess })
}
},
Topic::SlashReport { .. } => None,
Topic::Sign { id, attempt, label } => match label {
SigningProtocolRound::Preprocess => None,
SigningProtocolRound::Share => {
Some(Topic::Sign { id, attempt, label: SigningProtocolRound::Preprocess })
}
},
}
}
/// The topic which succeeds this topic, with this topic as a prerequisite
///
/// The succeeding topic must define this topic as preceding
fn succeeding_topic(self) -> Option<Topic> {
#[allow(clippy::match_same_arms)]
match self {
Topic::RemoveParticipant { .. } => None,
Topic::DkgConfirmation { attempt, label } => match label {
SigningProtocolRound::Preprocess => {
Some(Topic::DkgConfirmation { attempt, label: SigningProtocolRound::Share })
}
SigningProtocolRound::Share => None,
},
Topic::SlashReport { .. } => None,
Topic::Sign { id, attempt, label } => match label {
SigningProtocolRound::Preprocess => {
Some(Topic::Sign { id, attempt, label: SigningProtocolRound::Share })
}
SigningProtocolRound::Share => None,
},
}
}
fn requires_whitelisting(&self) -> bool {
#[allow(clippy::match_same_arms)]
match self {
// We don't require whitelisting to remove a participant
Topic::RemoveParticipant { .. } => false,
// We don't require whitelisting for the first attempt, solely the re-attempts
Topic::DkgConfirmation { attempt, .. } => *attempt != 0,
// We don't require whitelisting for the slash report
Topic::SlashReport { .. } => false,
// We do require whitelisting for every sign protocol
Topic::Sign { .. } => true,
}
}
fn required_participation(&self, n: u64) -> u64 {
let _ = self;
// All of our topics require 2/3rds participation
((2 * n) / 3) + 1
}
fn participating(&self) -> Participating {
#[allow(clippy::match_same_arms)]
match self {
Topic::RemoveParticipant { .. } => Participating::Everyone,
Topic::DkgConfirmation { .. } => Participating::Participated,
Topic::SlashReport { .. } => Participating::Everyone,
Topic::Sign { .. } => Participating::Participated,
}
}
}
pub enum Accumulation {
Ready(DataSet),
NotReady,
/// The resulting data set from an accumulation
pub enum DataSet<D: Borshy> {
/// Accumulating this did not produce a data set to act on
/// (non-existent, not ready, prior handled, not participating, etc.)
None,
/// The data set was ready and we are participating in this event
Participating(HashMap<SeraiAddress, D>),
}
// TODO: Move from genesis to set for indexing
trait Borshy: BorshSerialize + BorshDeserialize {}
impl<T: BorshSerialize + BorshDeserialize> Borshy for T {}
create_db!(
Tributary {
SeraiBlockNumber: (hash: [u8; 32]) -> u64,
SeraiDkgCompleted: (set: ValidatorSet) -> [u8; 32],
CoordinatorTributary {
// The last handled tributary block's (number, hash)
LastHandledTributaryBlock: (set: ValidatorSet) -> (u64, [u8; 32]),
TributaryBlockNumber: (block: [u8; 32]) -> u32,
LastHandledBlock: (genesis: [u8; 32]) -> [u8; 32],
// The slash points a validator has accrued, with u64::MAX representing a fatal slash.
SlashPoints: (set: ValidatorSet, validator: SeraiAddress) -> u64,
// TODO: Revisit the point of this
FatalSlashes: (genesis: [u8; 32]) -> Vec<[u8; 32]>,
// TODO: Combine these two
FatallySlashed: (genesis: [u8; 32], account: [u8; 32]) -> (),
SlashPoints: (genesis: [u8; 32], account: [u8; 32]) -> u32,
// The latest Substrate block to cosign.
LatestSubstrateBlockToCosign: (set: ValidatorSet) -> [u8; 32],
VotedToRemove: (genesis: [u8; 32], voter: [u8; 32], to_remove: [u8; 32]) -> (),
VotesToRemove: (genesis: [u8; 32], to_remove: [u8; 32]) -> u16,
// The weight accumulated for a topic.
AccumulatedWeight: (set: ValidatorSet, topic: Topic) -> u64,
// The entries accumulated for a topic, by validator.
Accumulated: <D: Borshy>(set: ValidatorSet, topic: Topic, validator: SeraiAddress) -> D,
AttemptDb: (genesis: [u8; 32], topic: &Topic) -> u32,
ReattemptDb: (genesis: [u8; 32], block: u32) -> Vec<Topic>,
DataReceived: (genesis: [u8; 32], data_spec: &DataSpecification) -> u16,
DataDb: (genesis: [u8; 32], data_spec: &DataSpecification, signer_bytes: &[u8; 32]) -> Vec<u8>,
DkgParticipation: (genesis: [u8; 32], from: u16) -> Vec<u8>,
ConfirmationNonces: (genesis: [u8; 32], attempt: u32) -> HashMap<Participant, Vec<u8>>,
DkgKeyPair: (genesis: [u8; 32]) -> KeyPair,
PlanIds: (genesis: &[u8], block: u64) -> Vec<[u8; 32]>,
SignedTransactionDb: (order: &[u8], nonce: u32) -> Vec<u8>,
SlashReports: (genesis: [u8; 32], signer: [u8; 32]) -> Vec<u32>,
SlashReported: (genesis: [u8; 32]) -> u16,
SlashReportCutOff: (genesis: [u8; 32]) -> u64,
SlashReport: (set: ValidatorSet) -> Vec<([u8; 32], u32)>,
// Topics to be recognized as of a certain block number due to the reattempt protocol.
Reattempt: (set: ValidatorSet, block_number: u64) -> Vec<Topic>,
}
);
impl FatalSlashes {
pub fn get_as_keys(getter: &impl Get, genesis: [u8; 32]) -> Vec<<Ristretto as Ciphersuite>::G> {
FatalSlashes::get(getter, genesis)
.unwrap_or(vec![])
.iter()
.map(|key| <Ristretto as Ciphersuite>::G::from_bytes(key).unwrap())
.collect::<Vec<_>>()
pub struct TributaryDb;
impl TributaryDb {
pub fn last_handled_tributary_block(
getter: &impl Get,
set: ValidatorSet,
) -> Option<(u64, [u8; 32])> {
LastHandledTributaryBlock::get(getter, set)
}
}
impl FatallySlashed {
pub fn set_fatally_slashed(txn: &mut impl DbTxn, genesis: [u8; 32], account: [u8; 32]) {
Self::set(txn, genesis, account, &());
let mut existing = FatalSlashes::get(txn, genesis).unwrap_or_default();
// Don't append if we already have it, which can occur upon multiple faults
if existing.iter().any(|existing| existing == &account) {
return;
}
existing.push(account);
FatalSlashes::set(txn, genesis, &existing);
}
}
impl AttemptDb {
pub fn recognize_topic(txn: &mut impl DbTxn, genesis: [u8; 32], topic: Topic) {
Self::set(txn, genesis, &topic, &0u32);
}
pub fn start_next_attempt(txn: &mut impl DbTxn, genesis: [u8; 32], topic: Topic) -> u32 {
let next =
Self::attempt(txn, genesis, topic).expect("starting next attempt for unknown topic") + 1;
Self::set(txn, genesis, &topic, &next);
next
}
pub fn attempt(getter: &impl Get, genesis: [u8; 32], topic: Topic) -> Option<u32> {
let attempt = Self::get(getter, genesis, &topic);
// Don't require explicit recognition of the DkgConfirmation topic as it starts when the chain
// does
// Don't require explicit recognition of the SlashReport topic as it isn't a DoS risk and it
// should always happen (eventually)
if attempt.is_none() &&
((topic == Topic::DkgConfirmation) ||
(topic == Topic::SubstrateSign(SubstrateSignableId::SlashReport)))
{
return Some(0);
}
attempt
}
}
impl ReattemptDb {
pub fn schedule_reattempt(
pub fn set_last_handled_tributary_block(
txn: &mut impl DbTxn,
genesis: [u8; 32],
current_block_number: u32,
topic: Topic,
set: ValidatorSet,
block_number: u64,
block_hash: [u8; 32],
) {
// 5 minutes
#[cfg(not(feature = "longer-reattempts"))]
const BASE_REATTEMPT_DELAY: u32 = (5 * 60 * 1000) / tributary::tendermint::TARGET_BLOCK_TIME;
// 10 minutes, intended for latent environments like the GitHub CI
#[cfg(feature = "longer-reattempts")]
const BASE_REATTEMPT_DELAY: u32 = (10 * 60 * 1000) / tributary::tendermint::TARGET_BLOCK_TIME;
// 5 minutes for attempts 0 ..= 2, 10 minutes for attempts 3 ..= 5, 15 minutes for attempts > 5
// Assumes no event will take longer than 15 minutes, yet grows the time in case there are
// network bandwidth issues
let reattempt_delay = BASE_REATTEMPT_DELAY *
((AttemptDb::attempt(txn, genesis, topic)
.expect("scheduling re-attempt for unknown topic") /
3) +
1)
.min(3);
let upon_block = current_block_number + reattempt_delay;
let mut reattempts = Self::get(txn, genesis, upon_block).unwrap_or(vec![]);
reattempts.push(topic);
Self::set(txn, genesis, upon_block, &reattempts);
LastHandledTributaryBlock::set(txn, set, &(block_number, block_hash));
}
pub fn take(txn: &mut impl DbTxn, genesis: [u8; 32], block_number: u32) -> Vec<Topic> {
let res = Self::get(txn, genesis, block_number).unwrap_or(vec![]);
if !res.is_empty() {
Self::del(txn, genesis, block_number);
pub fn recognize_topic(txn: &mut impl DbTxn, set: ValidatorSet, topic: Topic) {
AccumulatedWeight::set(txn, set, topic, &0);
}
pub fn start_of_block(txn: &mut impl DbTxn, set: ValidatorSet, block_number: u64) {
for topic in Reattempt::take(txn, set, block_number).unwrap_or(vec![]) {
// TODO: Slash all people who preprocessed but didn't share
Self::recognize_topic(txn, set, topic);
}
res
}
}
impl SignedTransactionDb {
pub fn take_signed_transaction(
pub fn fatal_slash(
txn: &mut impl DbTxn,
order: &[u8],
nonce: u32,
) -> Option<Transaction> {
let res = SignedTransactionDb::get(txn, order, nonce)
.map(|bytes| Transaction::read(&mut bytes.as_slice()).unwrap());
if res.is_some() {
Self::del(txn, order, nonce);
set: ValidatorSet,
validator: SeraiAddress,
reason: &str,
) {
log::warn!("{validator} fatally slashed: {reason}");
SlashPoints::set(txn, set, validator, &u64::MAX);
}
pub fn is_fatally_slashed(getter: &impl Get, set: ValidatorSet, validator: SeraiAddress) -> bool {
SlashPoints::get(getter, set, validator).unwrap_or(0) == u64::MAX
}
#[allow(clippy::too_many_arguments)]
pub fn accumulate<D: Borshy>(
txn: &mut impl DbTxn,
set: ValidatorSet,
validators: &[SeraiAddress],
total_weight: u64,
block_number: u64,
topic: Topic,
validator: SeraiAddress,
validator_weight: u64,
data: &D,
) -> DataSet<D> {
// This function will only be called once for a (validator, topic) tuple due to how we handle
// nonces on transactions (they're deterministic to the topic)
let accumulated_weight = AccumulatedWeight::get(txn, set, topic);
if topic.requires_whitelisting() && accumulated_weight.is_none() {
Self::fatal_slash(txn, set, validator, "participated in unrecognized topic");
return DataSet::None;
}
let mut accumulated_weight = accumulated_weight.unwrap_or(0);
// If there's a preceding topic, check this validator participated in it
let preceding_topic = topic.preceding_topic();
if let Some(preceding_topic) = preceding_topic {
if Accumulated::<D>::get(txn, set, preceding_topic, validator).is_none() {
Self::fatal_slash(
txn,
set,
validator,
"participated in topic without participating in prior",
);
return DataSet::None;
}
}
// The complete lack of validation on the data by these NOPs opens the potential for spam here
// If we've already accumulated past the threshold, NOP
if accumulated_weight >= topic.required_participation(total_weight) {
return DataSet::None;
}
// If this is for an old attempt, NOP
if let Some(next_attempt_topic) = topic.next_attempt_topic() {
if AccumulatedWeight::get(txn, set, next_attempt_topic).is_some() {
return DataSet::None;
}
}
// Accumulate the data
accumulated_weight += validator_weight;
AccumulatedWeight::set(txn, set, topic, &accumulated_weight);
Accumulated::set(txn, set, topic, validator, data);
// Check if we now cross the weight threshold
if accumulated_weight >= topic.required_participation(total_weight) {
// Queue this for re-attempt after enough time passes
if let Some((attempt, reattempt_topic)) = topic.reattempt_topic() {
// 5 minutes
#[cfg(not(feature = "longer-reattempts"))]
const BASE_REATTEMPT_DELAY: u32 =
(5u32 * 60 * 1000).div_ceil(tributary::tendermint::TARGET_BLOCK_TIME);
// 10 minutes, intended for latent environments like the GitHub CI
#[cfg(feature = "longer-reattempts")]
const BASE_REATTEMPT_DELAY: u32 =
(10u32 * 60 * 1000).div_ceil(tributary::tendermint::TARGET_BLOCK_TIME);
// Linearly scale the time for the protocol with the attempt number
let blocks_till_reattempt = u64::from(attempt * BASE_REATTEMPT_DELAY);
let recognize_at = block_number + blocks_till_reattempt;
let mut queued = Reattempt::get(txn, set, recognize_at).unwrap_or(Vec::with_capacity(1));
queued.push(reattempt_topic);
Reattempt::set(txn, set, recognize_at, &queued);
}
// Register the succeeding topic
let succeeding_topic = topic.succeeding_topic();
if let Some(succeeding_topic) = succeeding_topic {
Self::recognize_topic(txn, set, succeeding_topic);
}
// Fetch and return all participations
let mut data_set = HashMap::with_capacity(validators.len());
for validator in validators {
if let Some(data) = Accumulated::<D>::get(txn, set, topic, *validator) {
// Clean this data up if there's not a succeeding topic
// If there is, we wait as the succeeding topic checks our participation in this topic
if succeeding_topic.is_none() {
Accumulated::<D>::del(txn, set, topic, *validator);
}
// If this *was* the succeeding topic, clean up the preceding topic's data
if let Some(preceding_topic) = preceding_topic {
Accumulated::<D>::del(txn, set, preceding_topic, *validator);
}
data_set.insert(*validator, data);
}
}
let participated = data_set.contains_key(&validator);
match topic.participating() {
Participating::Participated => {
if participated {
DataSet::Participating(data_set)
} else {
DataSet::None
}
}
Participating::Everyone => DataSet::Participating(data_set),
}
} else {
DataSet::None
}
res
}
}
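The core of the new accumulate flow is: track accumulated weight per topic, NOP once past the threshold, and hand back the full data set exactly when the threshold is first crossed. A stripped-down, in-memory model of that control flow (no DB, no slashing, no re-attempt scheduling), to make the logic explicit:

```rust
use std::collections::HashMap;

// In-memory stand-in for AccumulatedWeight/Accumulated, keyed by validator.
struct Accumulator {
  required: u64,
  weight: u64,
  data: HashMap<&'static str, Vec<u8>>,
}

impl Accumulator {
  fn new(total_weight: u64) -> Self {
    // Same threshold as Topic::required_participation: 2/3rds + 1
    Accumulator { required: ((2 * total_weight) / 3) + 1, weight: 0, data: HashMap::new() }
  }

  // Returns Some(data set) exactly once, when the threshold is first crossed.
  fn accumulate(
    &mut self,
    validator: &'static str,
    validator_weight: u64,
    data: Vec<u8>,
  ) -> Option<HashMap<&'static str, Vec<u8>>> {
    if self.weight >= self.required {
      return None; // already completed, NOP
    }
    self.weight += validator_weight;
    self.data.insert(validator, data);
    (self.weight >= self.required).then(|| self.data.clone())
  }
}

fn main() {
  let mut acc = Accumulator::new(4); // four equally-weighted validators => threshold 3
  assert!(acc.accumulate("alice", 1, vec![0]).is_none());
  assert!(acc.accumulate("bob", 1, vec![1]).is_none());
  // The third participation crosses 2/3rds + 1 and yields the full data set
  assert_eq!(acc.accumulate("carol", 1, vec![2]).unwrap().len(), 3);
  // Further participations are ignored
  assert!(acc.accumulate("dave", 1, vec![3]).is_none());
}
```

Crossing the threshold at a single well-defined point is what lets the real accumulate also schedule the re-attempt and recognize the succeeding topic exactly once.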


@@ -1,554 +0,0 @@
use core::ops::Deref;
use std::collections::HashMap;
use zeroize::Zeroizing;
use rand_core::OsRng;
use ciphersuite::{group::GroupEncoding, Ciphersuite, Ristretto};
use frost::dkg::Participant;
use scale::{Encode, Decode};
use serai_client::{Signature, validator_sets::primitives::KeyPair};
use tributary::{Signed, TransactionKind, TransactionTrait};
use processor_messages::{
key_gen::self,
coordinator::{self, SubstrateSignableId, SubstrateSignId},
sign::{self, SignId},
};
use serai_db::*;
use crate::{
processors::Processors,
tributary::{
*,
signing_protocol::DkgConfirmer,
scanner::{
RecognizedIdType, RIDTrait, PublishSeraiTransaction, PTTTrait, TributaryBlockHandler,
},
},
P2p,
};
pub fn dkg_confirmation_nonces(
key: &Zeroizing<<Ristretto as Ciphersuite>::F>,
spec: &TributarySpec,
txn: &mut impl DbTxn,
attempt: u32,
) -> [u8; 64] {
DkgConfirmer::new(key, spec, txn, attempt).preprocess()
}
pub fn generated_key_pair<D: Db>(
txn: &mut D::Transaction<'_>,
genesis: [u8; 32],
key_pair: &KeyPair,
) {
DkgKeyPair::set(txn, genesis, key_pair);
}
fn unflatten(spec: &TributarySpec, data: &mut HashMap<Participant, Vec<u8>>) {
for (validator, _) in spec.validators() {
let Some(range) = spec.i(validator) else { continue };
let Some(all_segments) = data.remove(&range.start) else {
continue;
};
let mut data_vec = Vec::<_>::decode(&mut all_segments.as_slice()).unwrap();
for i in u16::from(range.start) .. u16::from(range.end) {
let i = Participant::new(i).unwrap();
data.insert(i, data_vec.remove(0));
}
}
}
impl<
D: Db,
T: DbTxn,
Pro: Processors,
PST: PublishSeraiTransaction,
PTT: PTTTrait,
RID: RIDTrait,
P: P2p,
> TributaryBlockHandler<'_, D, T, Pro, PST, PTT, RID, P>
{
fn accumulate(
&mut self,
data_spec: &DataSpecification,
signer: <Ristretto as Ciphersuite>::G,
data: &Vec<u8>,
) -> Accumulation {
log::debug!("accumulating entry for {:?} attempt #{}", &data_spec.topic, &data_spec.attempt);
let genesis = self.spec.genesis();
if DataDb::get(self.txn, genesis, data_spec, &signer.to_bytes()).is_some() {
panic!("accumulating data for a participant multiple times");
}
let signer_shares = {
let signer_i = self.spec.i(signer).expect("transaction signer wasn't a member of the set");
u16::from(signer_i.end) - u16::from(signer_i.start)
};
let prior_received = DataReceived::get(self.txn, genesis, data_spec).unwrap_or_default();
let now_received = prior_received + signer_shares;
DataReceived::set(self.txn, genesis, data_spec, &now_received);
DataDb::set(self.txn, genesis, data_spec, &signer.to_bytes(), data);
let received_range = (prior_received + 1) ..= now_received;
// If 2/3rds of the network participated in this preprocess, queue it for an automatic
// re-attempt
if (data_spec.label == Label::Preprocess) && received_range.contains(&self.spec.t()) {
// Double check the attempt on this entry, as we don't want to schedule a re-attempt if this
// is an old entry
// This is an assert, not part of the if check, as old data shouldn't be here in the first
// place
assert_eq!(AttemptDb::attempt(self.txn, genesis, data_spec.topic), Some(data_spec.attempt));
ReattemptDb::schedule_reattempt(self.txn, genesis, self.block_number, data_spec.topic);
}
// If we have all the needed commitments/preprocesses/shares, tell the processor
if received_range.contains(&self.spec.t()) {
log::debug!(
"accumulation for entry {:?} attempt #{} is ready",
&data_spec.topic,
&data_spec.attempt
);
let mut data = HashMap::new();
for validator in self.spec.validators().iter().map(|validator| validator.0) {
let Some(i) = self.spec.i(validator) else { continue };
data.insert(
i.start,
if let Some(data) = DataDb::get(self.txn, genesis, data_spec, &validator.to_bytes()) {
data
} else {
continue;
},
);
}
assert_eq!(data.len(), usize::from(self.spec.t()));
// Remove our own piece of data, if we were involved
if let Some(i) = self.spec.i(Ristretto::generator() * self.our_key.deref()) {
if data.remove(&i.start).is_some() {
return Accumulation::Ready(DataSet::Participating(data));
}
}
return Accumulation::Ready(DataSet::NotParticipating);
}
Accumulation::NotReady
}
fn handle_data(
&mut self,
data_spec: &DataSpecification,
bytes: &Vec<u8>,
signed: &Signed,
) -> Accumulation {
let genesis = self.spec.genesis();
let Some(curr_attempt) = AttemptDb::attempt(self.txn, genesis, data_spec.topic) else {
// Premature publication of a valid ID/publication of an invalid ID
self.fatal_slash(signed.signer.to_bytes(), "published data for ID without an attempt");
return Accumulation::NotReady;
};
// If they've already published a TX for this attempt, slash
    // This shouldn't be reachable since nonces are allocated by the coordinator, yet it's a
    // cheap check to leave in for safety
if DataDb::get(self.txn, genesis, data_spec, &signed.signer.to_bytes()).is_some() {
self.fatal_slash(signed.signer.to_bytes(), "published data multiple times");
return Accumulation::NotReady;
}
// If the attempt is lesser than the blockchain's, return
if data_spec.attempt < curr_attempt {
log::debug!(
"dated attempt published onto tributary for topic {:?} (used attempt {}, current {})",
data_spec.topic,
data_spec.attempt,
curr_attempt
);
return Accumulation::NotReady;
}
// If the attempt is greater, this is a premature publication, full slash
if data_spec.attempt > curr_attempt {
self.fatal_slash(
signed.signer.to_bytes(),
"published data with an attempt which hasn't started",
);
return Accumulation::NotReady;
}
// TODO: We can also full slash if shares before all commitments, or share before the
// necessary preprocesses
// TODO: If this is shares, we need to check they are part of the selected signing set
// Accumulate this data
self.accumulate(data_spec, signed.signer, bytes)
}
fn check_sign_data_len(
&mut self,
signer: <Ristretto as Ciphersuite>::G,
len: usize,
) -> Result<(), ()> {
let signer_i = self.spec.i(signer).expect("signer wasn't a member of the set");
if len != usize::from(u16::from(signer_i.end) - u16::from(signer_i.start)) {
self.fatal_slash(
signer.to_bytes(),
"signer published a distinct amount of sign data than they had shares",
);
Err(())?;
}
Ok(())
}
// TODO: Don't call fatal_slash in here, return the party to fatal_slash to ensure no further
// execution occurs
pub(crate) async fn handle_application_tx(&mut self, tx: Transaction) {
let genesis = self.spec.genesis();
// Don't handle transactions from fatally slashed participants
// This prevents removed participants from sabotaging the removal signing sessions and so on
// TODO: Because fatally slashed participants can still publish onto the blockchain, they have
// a notable DoS ability
if let TransactionKind::Signed(_, signed) = tx.kind() {
if FatallySlashed::get(self.txn, genesis, signed.signer.to_bytes()).is_some() {
return;
}
}
match tx {
Transaction::RemoveParticipant { participant, signed } => {
if self.spec.i(participant).is_none() {
self.fatal_slash(participant.to_bytes(), "RemoveParticipant vote for non-validator");
return;
}
let participant = participant.to_bytes();
let signer = signed.signer.to_bytes();
assert!(
VotedToRemove::get(self.txn, genesis, signer, participant).is_none(),
"VotedToRemove multiple times despite a single nonce being allocated",
);
VotedToRemove::set(self.txn, genesis, signer, participant, &());
let prior_votes = VotesToRemove::get(self.txn, genesis, participant).unwrap_or(0);
let signer_votes =
self.spec.i(signed.signer).expect("signer wasn't a validator for this network?");
let new_votes = prior_votes + u16::from(signer_votes.end) - u16::from(signer_votes.start);
VotesToRemove::set(self.txn, genesis, participant, &new_votes);
if ((prior_votes + 1) ..= new_votes).contains(&self.spec.t()) {
self.fatal_slash(participant, "RemoveParticipant vote")
}
}
Transaction::DkgParticipation { participation, signed } => {
// Send the participation to the processor
self
.processors
.send(
self.spec.set().network,
key_gen::CoordinatorMessage::Participation {
session: self.spec.set().session,
participant: self
.spec
.i(signed.signer)
.expect("signer wasn't a validator for this network?")
.start,
participation,
},
)
.await;
}
Transaction::DkgConfirmationNonces { attempt, confirmation_nonces, signed } => {
let data_spec =
DataSpecification { topic: Topic::DkgConfirmation, label: Label::Preprocess, attempt };
match self.handle_data(&data_spec, &confirmation_nonces.to_vec(), &signed) {
Accumulation::Ready(DataSet::Participating(confirmation_nonces)) => {
log::info!(
"got all DkgConfirmationNonces for {}, attempt {attempt}",
hex::encode(genesis)
);
ConfirmationNonces::set(self.txn, genesis, attempt, &confirmation_nonces);
// Send the expected DkgConfirmationShare
// TODO: Slight race condition here due to set, publish tx, then commit txn
let key_pair = DkgKeyPair::get(self.txn, genesis)
.expect("participating in confirming key we don't have");
let mut tx = match DkgConfirmer::new(self.our_key, self.spec, self.txn, attempt)
.share(confirmation_nonces, &key_pair)
{
Ok(confirmation_share) => Transaction::DkgConfirmationShare {
attempt,
confirmation_share,
signed: Transaction::empty_signed(),
},
Err(participant) => Transaction::RemoveParticipant {
participant: self.spec.reverse_lookup_i(participant).unwrap(),
signed: Transaction::empty_signed(),
},
};
tx.sign(&mut OsRng, genesis, self.our_key);
self.publish_tributary_tx.publish_tributary_tx(tx).await;
}
Accumulation::Ready(DataSet::NotParticipating) | Accumulation::NotReady => {}
}
}
Transaction::DkgConfirmationShare { attempt, confirmation_share, signed } => {
let data_spec =
DataSpecification { topic: Topic::DkgConfirmation, label: Label::Share, attempt };
match self.handle_data(&data_spec, &confirmation_share.to_vec(), &signed) {
Accumulation::Ready(DataSet::Participating(shares)) => {
log::info!(
"got all DkgConfirmationShare for {}, attempt {attempt}",
hex::encode(genesis)
);
let preprocesses = ConfirmationNonces::get(self.txn, genesis, attempt).unwrap();
// TODO: This can technically happen under very very very specific timing as the txn
// put happens before DkgConfirmationShare, yet the txn isn't guaranteed to be
// committed
let key_pair = DkgKeyPair::get(self.txn, genesis).expect(
"in DkgConfirmationShare handling, which happens after everyone \
(including us) fires DkgConfirmationShare, yet no confirming key pair",
);
// Determine the bitstring representing who participated before we move `shares`
let validators = self.spec.validators();
let mut signature_participants = bitvec::vec::BitVec::with_capacity(validators.len());
for (participant, _) in validators {
signature_participants.push(
(participant == (<Ristretto as Ciphersuite>::generator() * self.our_key.deref())) ||
shares.contains_key(&self.spec.i(participant).unwrap().start),
);
}
// Produce the final signature
let mut confirmer = DkgConfirmer::new(self.our_key, self.spec, self.txn, attempt);
let sig = match confirmer.complete(preprocesses, &key_pair, shares) {
Ok(sig) => sig,
Err(p) => {
let mut tx = Transaction::RemoveParticipant {
participant: self.spec.reverse_lookup_i(p).unwrap(),
signed: Transaction::empty_signed(),
};
tx.sign(&mut OsRng, genesis, self.our_key);
self.publish_tributary_tx.publish_tributary_tx(tx).await;
return;
}
};
self
.publish_serai_tx
.publish_set_keys(
self.db,
self.spec.set(),
key_pair,
signature_participants,
Signature(sig),
)
.await;
}
Accumulation::Ready(DataSet::NotParticipating) | Accumulation::NotReady => {}
}
}
Transaction::CosignSubstrateBlock(hash) => {
AttemptDb::recognize_topic(
self.txn,
genesis,
Topic::SubstrateSign(SubstrateSignableId::CosigningSubstrateBlock(hash)),
);
let block_number = SeraiBlockNumber::get(self.txn, hash)
.expect("CosignSubstrateBlock yet didn't save Serai block number");
let msg = coordinator::CoordinatorMessage::CosignSubstrateBlock {
id: SubstrateSignId {
session: self.spec.set().session,
id: SubstrateSignableId::CosigningSubstrateBlock(hash),
attempt: 0,
},
block_number,
};
self.processors.send(self.spec.set().network, msg).await;
}
Transaction::Batch { block: _, batch } => {
// Because this Batch has achieved synchrony, its batch ID should be authorized
AttemptDb::recognize_topic(
self.txn,
genesis,
Topic::SubstrateSign(SubstrateSignableId::Batch(batch)),
);
self
.recognized_id
.recognized_id(
self.spec.set(),
genesis,
RecognizedIdType::Batch,
batch.to_le_bytes().to_vec(),
)
.await;
}
Transaction::SubstrateBlock(block) => {
let plan_ids = PlanIds::get(self.txn, &genesis, block).expect(
"synced a tributary block finalizing a substrate block in a provided transaction \
despite us not providing that transaction",
);
for id in plan_ids {
AttemptDb::recognize_topic(self.txn, genesis, Topic::Sign(id));
self
.recognized_id
.recognized_id(self.spec.set(), genesis, RecognizedIdType::Plan, id.to_vec())
.await;
}
}
Transaction::SubstrateSign(data) => {
let signer = data.signed.signer;
let Ok(()) = self.check_sign_data_len(signer, data.data.len()) else {
return;
};
let expected_len = match data.label {
Label::Preprocess => 64,
Label::Share => 32,
};
for data in &data.data {
if data.len() != expected_len {
self.fatal_slash(
signer.to_bytes(),
"unexpected length data for substrate signing protocol",
);
return;
}
}
let data_spec = DataSpecification {
topic: Topic::SubstrateSign(data.plan),
label: data.label,
attempt: data.attempt,
};
let Accumulation::Ready(DataSet::Participating(mut results)) =
self.handle_data(&data_spec, &data.data.encode(), &data.signed)
else {
return;
};
unflatten(self.spec, &mut results);
let id = SubstrateSignId {
session: self.spec.set().session,
id: data.plan,
attempt: data.attempt,
};
let msg = match data.label {
Label::Preprocess => coordinator::CoordinatorMessage::SubstratePreprocesses {
id,
preprocesses: results.into_iter().map(|(v, p)| (v, p.try_into().unwrap())).collect(),
},
Label::Share => coordinator::CoordinatorMessage::SubstrateShares {
id,
shares: results.into_iter().map(|(v, p)| (v, p.try_into().unwrap())).collect(),
},
};
self.processors.send(self.spec.set().network, msg).await;
}
Transaction::Sign(data) => {
let Ok(()) = self.check_sign_data_len(data.signed.signer, data.data.len()) else {
return;
};
let data_spec = DataSpecification {
topic: Topic::Sign(data.plan),
label: data.label,
attempt: data.attempt,
};
if let Accumulation::Ready(DataSet::Participating(mut results)) =
self.handle_data(&data_spec, &data.data.encode(), &data.signed)
{
unflatten(self.spec, &mut results);
let id =
SignId { session: self.spec.set().session, id: data.plan, attempt: data.attempt };
self
.processors
.send(
self.spec.set().network,
match data.label {
Label::Preprocess => {
sign::CoordinatorMessage::Preprocesses { id, preprocesses: results }
}
Label::Share => sign::CoordinatorMessage::Shares { id, shares: results },
},
)
.await;
}
}
Transaction::SignCompleted { plan, tx_hash, first_signer, signature: _ } => {
log::info!(
"on-chain SignCompleted claims {} completes {}",
hex::encode(&tx_hash),
hex::encode(plan)
);
if AttemptDb::attempt(self.txn, genesis, Topic::Sign(plan)).is_none() {
self.fatal_slash(first_signer.to_bytes(), "claimed an unrecognized plan was completed");
return;
};
// TODO: Confirm this signer hasn't prior published a completion
let msg = sign::CoordinatorMessage::Completed {
session: self.spec.set().session,
id: plan,
tx: tx_hash,
};
self.processors.send(self.spec.set().network, msg).await;
}
Transaction::SlashReport(points, signed) => {
let signer_range = self.spec.i(signed.signer).unwrap();
let signer_len = u16::from(signer_range.end) - u16::from(signer_range.start);
if points.len() != (self.spec.validators().len() - 1) {
self.fatal_slash(
signed.signer.to_bytes(),
"submitted a distinct amount of slash points to participants",
);
return;
}
if SlashReports::get(self.txn, genesis, signed.signer.to_bytes()).is_some() {
self.fatal_slash(signed.signer.to_bytes(), "submitted multiple slash points");
return;
}
SlashReports::set(self.txn, genesis, signed.signer.to_bytes(), &points);
let prior_reported = SlashReported::get(self.txn, genesis).unwrap_or(0);
let now_reported = prior_reported + signer_len;
SlashReported::set(self.txn, genesis, &now_reported);
if (prior_reported < self.spec.t()) && (now_reported >= self.spec.t()) {
SlashReportCutOff::set(
self.txn,
genesis,
// 30 minutes into the future
&(u64::from(self.block_number) +
((30 * 60 * 1000) / u64::from(tributary::tendermint::TARGET_BLOCK_TIME))),
);
}
}
}
}
}
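A minimal sketch (not part of this commit) of the threshold check used in `accumulate` above: because a signer may hold multiple key shares, the received count can jump past `t` in one step, so the code checks whether the range of newly credited shares contains `t`, which fires exactly once.
// Illustrative sketch, not from this commit: the range-based threshold crossing.
fn crosses_threshold(prior_received: u16, signer_shares: u16, t: u16) -> bool {
  let now_received = prior_received + signer_shares;
  ((prior_received + 1) ..= now_received).contains(&t)
}
#[test]
fn triggers_exactly_once() {
  let t = 3;
  // Two shares arrive (0 -> 2), then two more (2 -> 4): only the second call crosses t
  assert!(!crosses_threshold(0, 2, t));
  assert!(crosses_threshold(2, 2, t));
  // Later arrivals (4 -> 5) don't re-trigger
  assert!(!crosses_threshold(4, 1, t));
}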

View File

@@ -1,63 +1,6 @@
use tributary::{
ReadWrite,
transaction::{TransactionError, TransactionKind, Transaction as TransactionTrait},
Tributary,
};
mod transaction;
pub use transaction::Transaction;
mod db;
pub use db::*;
mod spec;
pub use spec::TributarySpec;
mod transaction;
pub use transaction::{Label, SignData, Transaction};
mod signing_protocol;
mod handle;
pub use handle::*;
pub mod scanner;
pub async fn publish_signed_transaction<D: Db, P: crate::P2p>(
txn: &mut D::Transaction<'_>,
tributary: &Tributary<D, Transaction, P>,
tx: Transaction,
) {
log::debug!("publishing transaction {}", hex::encode(tx.hash()));
let (order, signer) = if let TransactionKind::Signed(order, signed) = tx.kind() {
let signer = signed.signer;
// Safe as we should deterministically create transactions, meaning if this is already on-disk,
// it's what we're saving now
SignedTransactionDb::set(txn, &order, signed.nonce, &tx.serialize());
(order, signer)
} else {
panic!("non-signed transaction passed to publish_signed_transaction");
};
// If we're trying to publish 5, when the last transaction published was 3, this will delay
// publication until the point in time we publish 4
while let Some(tx) = SignedTransactionDb::take_signed_transaction(
txn,
&order,
tributary
.next_nonce(&signer, &order)
.await
.expect("we don't have a nonce, meaning we aren't a participant on this tributary"),
) {
// We need to return a proper error here to enable that, due to a race condition around
// multiple publications
match tributary.add_transaction(tx.clone()).await {
Ok(_) => {}
      // Some asynchronicity if InvalidNonce, assumed safe due to deterministic nonces
Err(TransactionError::InvalidNonce) => {
log::warn!("publishing TX {tx:?} returned InvalidNonce. was it already added?")
}
Err(e) => panic!("created an invalid transaction: {e:?}"),
}
}
}
mod scan;
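A toy sketch (not part of this commit) of the queue-then-drain pattern `publish_signed_transaction` relies on: signed transactions are stored keyed by nonce and only published as a contiguous run starting from the chain's next nonce. A `BTreeMap` stands in for `SignedTransactionDb` here.
// Illustrative sketch, not from this commit. The BTreeMap is a stand-in for the on-disk
// nonce-indexed store; a transaction with nonce 5 waits until 4 has been published.
use std::collections::BTreeMap;
fn publish_in_order(
  queue: &mut BTreeMap<u32, Vec<u8>>,
  next_nonce: &mut u32,
  published: &mut Vec<Vec<u8>>,
) {
  while let Some(tx) = queue.remove(next_nonce) {
    published.push(tx);
    *next_nonce += 1;
  }
}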

View File

@@ -0,0 +1,203 @@
use core::future::Future;
use std::collections::HashMap;
use ciphersuite::group::GroupEncoding;
use serai_client::{primitives::SeraiAddress, validator_sets::primitives::ValidatorSet};
use tributary::{
Signed as TributarySigned, TransactionError, TransactionKind, TransactionTrait,
Transaction as TributaryTransaction, Block, TributaryReader,
tendermint::{
tx::{TendermintTx, Evidence, decode_signed_message},
TendermintNetwork,
},
};
use serai_db::*;
use serai_task::ContinuallyRan;
use crate::tributary::{
db::*,
transaction::{Signed, Transaction},
};
struct ScanBlock<'a, D: DbTxn, TD: Db> {
txn: &'a mut D,
set: ValidatorSet,
validators: &'a [SeraiAddress],
total_weight: u64,
validator_weights: &'a HashMap<SeraiAddress, u64>,
tributary: &'a TributaryReader<TD, Transaction>,
}
impl<'a, D: DbTxn, TD: Db> ScanBlock<'a, D, TD> {
fn handle_application_tx(&mut self, block_number: u64, tx: Transaction) {
let signer = |signed: Signed| SeraiAddress(signed.signer.to_bytes());
if let TransactionKind::Signed(_, TributarySigned { signer, .. }) = tx.kind() {
// Don't handle transactions from those fatally slashed
// TODO: The fact they can publish these TXs makes this a notable spam vector
if TributaryDb::is_fatally_slashed(self.txn, self.set, SeraiAddress(signer.to_bytes())) {
return;
}
}
match tx {
Transaction::RemoveParticipant { participant, signed } => {
// Accumulate this vote and fatally slash the participant if past the threshold
let signer = signer(signed);
match TributaryDb::accumulate(
self.txn,
self.set,
self.validators,
self.total_weight,
block_number,
Topic::RemoveParticipant { participant },
signer,
self.validator_weights[&signer],
&(),
) {
DataSet::None => {}
DataSet::Participating(_) => {
TributaryDb::fatal_slash(self.txn, self.set, participant, "voted to remove")
}
}
}
Transaction::DkgParticipation { participation, signed } => {
// Send the participation to the processor
todo!("TODO")
}
Transaction::DkgConfirmationPreprocess { attempt, preprocess, signed } => {
// Accumulate the preprocesses into our own FROST attempt manager
todo!("TODO")
}
Transaction::DkgConfirmationShare { attempt, share, signed } => {
// Accumulate the shares into our own FROST attempt manager
todo!("TODO")
}
Transaction::Cosign { substrate_block_hash } => {
// Update the latest intended-to-be-cosigned Substrate block
todo!("TODO")
}
Transaction::Cosigned { substrate_block_hash } => {
// Start cosigning the latest intended-to-be-cosigned block
todo!("TODO")
}
Transaction::SubstrateBlock { hash } => {
// Whitelist all of the IDs this Substrate block causes to be signed
todo!("TODO")
}
Transaction::Batch { hash } => {
// Whitelist the signing of this batch, publishing our own preprocess
todo!("TODO")
}
Transaction::SlashReport { slash_points, signed } => {
// Accumulate, and if past the threshold, calculate *the* slash report and start signing it
todo!("TODO")
}
Transaction::Sign { id, attempt, label, data, signed } => todo!("TODO"),
}
}
fn handle_block(mut self, block_number: u64, block: Block<Transaction>) {
TributaryDb::start_of_block(self.txn, self.set, block_number);
for tx in block.transactions {
match tx {
TributaryTransaction::Tendermint(TendermintTx::SlashEvidence(ev)) => {
// Since the evidence is on the chain, it will have already been validated
// We can just punish the signer
let data = match ev {
Evidence::ConflictingMessages(first, second) => (first, Some(second)),
Evidence::InvalidPrecommit(first) | Evidence::InvalidValidRound(first) => (first, None),
};
/* TODO
let msgs = (
decode_signed_message::<TendermintNetwork<D, Transaction, P>>(&data.0).unwrap(),
if data.1.is_some() {
Some(
decode_signed_message::<TendermintNetwork<D, Transaction, P>>(&data.1.unwrap())
.unwrap(),
)
} else {
None
},
);
// Since anything with evidence is fundamentally faulty behavior, not just temporal
// errors, mark the node as fatally slashed
TributaryDb::fatal_slash(
self.txn, msgs.0.msg.sender, &format!("invalid tendermint messages: {msgs:?}"));
*/
todo!("TODO")
}
TributaryTransaction::Application(tx) => {
self.handle_application_tx(block_number, tx);
}
}
}
}
}
struct ScanTributaryTask<D: Db, TD: Db> {
db: D,
set: ValidatorSet,
validators: Vec<SeraiAddress>,
total_weight: u64,
validator_weights: HashMap<SeraiAddress, u64>,
tributary: TributaryReader<TD, Transaction>,
}
impl<D: Db, TD: Db> ContinuallyRan for ScanTributaryTask<D, TD> {
fn run_iteration(&mut self) -> impl Send + Future<Output = Result<bool, String>> {
async move {
let (mut last_block_number, mut last_block_hash) =
TributaryDb::last_handled_tributary_block(&self.db, self.set)
.unwrap_or((0, self.tributary.genesis()));
      let mut made_progress = false;
while let Some(next) = self.tributary.block_after(&last_block_hash) {
let block = self.tributary.block(&next).unwrap();
let block_number = last_block_number + 1;
let block_hash = block.hash();
// Make sure we have all of the provided transactions for this block
for tx in &block.transactions {
let TransactionKind::Provided(order) = tx.kind() else {
continue;
};
// make sure we have all the provided txs in this block locally
if !self.tributary.locally_provided_txs_in_block(&block_hash, order) {
return Err(format!(
"didn't have the provided Transactions on-chain for set (ephemeral error): {:?}",
self.set
));
}
}
let mut txn = self.db.txn();
(ScanBlock {
txn: &mut txn,
set: self.set,
validators: &self.validators,
total_weight: self.total_weight,
validator_weights: &self.validator_weights,
tributary: &self.tributary,
})
.handle_block(block_number, block);
TributaryDb::set_last_handled_tributary_block(&mut txn, self.set, block_number, block_hash);
last_block_number = block_number;
last_block_hash = block_hash;
txn.commit();
        made_progress = true;
}
      Ok(made_progress)
}
}
}
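A hypothetical driver (not part of this commit, and not serai-task's actual runner) illustrating the `Result<bool, String>` contract of `run_iteration` above: `Ok(true)` means progress was made and the task should be polled again immediately, `Ok(false)` means it idled, and `Err` is treated as ephemeral.
// Illustrative sketch, not from this commit; serai-task presumably provides its own runner.
async fn drive(mut task: impl ContinuallyRan) {
  loop {
    match task.run_iteration().await {
      // Made progress, so immediately check for more work
      Ok(true) => {}
      // Nothing to do right now; back off before polling again
      Ok(false) => tokio::time::sleep(core::time::Duration::from_secs(5)).await,
      // Ephemeral errors (such as missing provided transactions) are logged and retried
      Err(e) => {
        log::warn!("task iteration errored: {e}");
        tokio::time::sleep(core::time::Duration::from_secs(5)).await;
      }
    }
  }
}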

View File

@@ -1,685 +0,0 @@
use core::{marker::PhantomData, future::Future, time::Duration};
use std::sync::Arc;
use zeroize::Zeroizing;
use rand_core::OsRng;
use ciphersuite::{group::GroupEncoding, Ciphersuite, Ristretto};
use tokio::sync::broadcast;
use scale::{Encode, Decode};
use serai_client::{
primitives::Signature,
validator_sets::primitives::{KeyPair, ValidatorSet},
Serai,
};
use serai_db::DbTxn;
use processor_messages::coordinator::{SubstrateSignId, SubstrateSignableId};
use tributary::{
TransactionKind, Transaction as TributaryTransaction, TransactionError, Block, TributaryReader,
tendermint::{
tx::{TendermintTx, Evidence, decode_signed_message},
TendermintNetwork,
},
};
use crate::{Db, processors::Processors, substrate::BatchInstructionsHashDb, tributary::*, P2p};
#[derive(Clone, Copy, PartialEq, Eq, Debug, Encode, Decode)]
pub enum RecognizedIdType {
Batch,
Plan,
}
#[async_trait::async_trait]
pub trait RIDTrait {
async fn recognized_id(
&self,
set: ValidatorSet,
genesis: [u8; 32],
kind: RecognizedIdType,
id: Vec<u8>,
);
}
#[async_trait::async_trait]
impl<
FRid: Send + Future<Output = ()>,
F: Sync + Fn(ValidatorSet, [u8; 32], RecognizedIdType, Vec<u8>) -> FRid,
> RIDTrait for F
{
async fn recognized_id(
&self,
set: ValidatorSet,
genesis: [u8; 32],
kind: RecognizedIdType,
id: Vec<u8>,
) {
(self)(set, genesis, kind, id).await
}
}
#[async_trait::async_trait]
pub trait PublishSeraiTransaction {
async fn publish_set_keys(
&self,
db: &(impl Sync + Get),
set: ValidatorSet,
key_pair: KeyPair,
signature_participants: bitvec::vec::BitVec<u8, bitvec::order::Lsb0>,
signature: Signature,
);
}
mod impl_pst_for_serai {
use super::*;
use serai_client::SeraiValidatorSets;
// Uses a macro because Rust can't resolve the lifetimes/generics around the check function
// check is expected to return true if the effect has already occurred
// The generated publish function will return true if *we* published the transaction
macro_rules! common_pst {
($Meta: ty, $check: ident) => {
async fn publish(
serai: &Serai,
db: &impl Get,
set: ValidatorSet,
tx: serai_client::Transaction,
meta: $Meta,
) -> bool {
loop {
match serai.publish(&tx).await {
Ok(_) => return true,
// This is assumed to be some ephemeral error due to the assumed fault-free
// creation
// TODO2: Differentiate connection errors from invariants
Err(e) => {
// The following block is irrelevant, and can/likely will fail, if we're publishing
// a TX for an old session
// If we're on a newer session, move on
if crate::RetiredTributaryDb::get(db, set).is_some() {
log::warn!("trying to publish a TX relevant to set {set:?} which isn't the latest");
return false;
}
if let Ok(serai) = serai.as_of_latest_finalized_block().await {
let serai = serai.validator_sets();
// Check if someone else published the TX in question
if $check(serai, set, meta).await {
return false;
}
}
log::error!("couldn't connect to Serai node to publish TX: {e:?}");
tokio::time::sleep(core::time::Duration::from_secs(5)).await;
}
}
}
}
};
}
#[async_trait::async_trait]
impl PublishSeraiTransaction for Serai {
async fn publish_set_keys(
&self,
db: &(impl Sync + Get),
set: ValidatorSet,
key_pair: KeyPair,
signature_participants: bitvec::vec::BitVec<u8, bitvec::order::Lsb0>,
signature: Signature,
) {
let tx =
SeraiValidatorSets::set_keys(set.network, key_pair, signature_participants, signature);
async fn check(serai: SeraiValidatorSets<'_>, set: ValidatorSet, (): ()) -> bool {
if matches!(serai.keys(set).await, Ok(Some(_))) {
log::info!("another coordinator set key pair for {:?}", set);
return true;
}
false
}
common_pst!((), check);
if publish(self, db, set, tx, ()).await {
log::info!("published set keys for {set:?}");
}
}
}
}
#[async_trait::async_trait]
pub trait PTTTrait {
async fn publish_tributary_tx(&self, tx: Transaction);
}
#[async_trait::async_trait]
impl<FPtt: Send + Future<Output = ()>, F: Sync + Fn(Transaction) -> FPtt> PTTTrait for F {
async fn publish_tributary_tx(&self, tx: Transaction) {
(self)(tx).await
}
}
pub struct TributaryBlockHandler<
'a,
D: Db,
T: DbTxn,
Pro: Processors,
PST: PublishSeraiTransaction,
PTT: PTTTrait,
RID: RIDTrait,
P: P2p,
> {
pub db: &'a D,
pub txn: &'a mut T,
pub our_key: &'a Zeroizing<<Ristretto as Ciphersuite>::F>,
pub recognized_id: &'a RID,
pub processors: &'a Pro,
pub publish_serai_tx: &'a PST,
pub publish_tributary_tx: &'a PTT,
pub spec: &'a TributarySpec,
block: Block<Transaction>,
pub block_number: u32,
_p2p: PhantomData<P>,
}
impl<
D: Db,
T: DbTxn,
Pro: Processors,
PST: PublishSeraiTransaction,
PTT: PTTTrait,
RID: RIDTrait,
P: P2p,
> TributaryBlockHandler<'_, D, T, Pro, PST, PTT, RID, P>
{
pub fn fatal_slash(&mut self, slashing: [u8; 32], reason: &str) {
let genesis = self.spec.genesis();
log::warn!("fatally slashing {}. reason: {}", hex::encode(slashing), reason);
FatallySlashed::set_fatally_slashed(self.txn, genesis, slashing);
// TODO: disconnect the node from network/ban from further participation in all Tributaries
}
// TODO: Once Substrate confirms a key, we need to rotate our validator set OR form a second
// Tributary post-DKG
// https://github.com/serai-dex/serai/issues/426
async fn handle(mut self) {
log::info!("found block for Tributary {:?}", self.spec.set());
let transactions = self.block.transactions.clone();
for tx in transactions {
match tx {
TributaryTransaction::Tendermint(TendermintTx::SlashEvidence(ev)) => {
// Since the evidence is on the chain, it should already have been validated
// We can just punish the signer
let data = match ev {
Evidence::ConflictingMessages(first, second) => (first, Some(second)),
Evidence::InvalidPrecommit(first) | Evidence::InvalidValidRound(first) => (first, None),
};
let msgs = (
decode_signed_message::<TendermintNetwork<D, Transaction, P>>(&data.0).unwrap(),
if data.1.is_some() {
Some(
decode_signed_message::<TendermintNetwork<D, Transaction, P>>(&data.1.unwrap())
.unwrap(),
)
} else {
None
},
);
// Since anything with evidence is fundamentally faulty behavior, not just temporal
// errors, mark the node as fatally slashed
self.fatal_slash(msgs.0.msg.sender, &format!("invalid tendermint messages: {msgs:?}"));
}
TributaryTransaction::Application(tx) => {
self.handle_application_tx(tx).await;
}
}
}
let genesis = self.spec.genesis();
// Calculate the shares still present, spinning if not enough are
{
// Start with the original n value
let mut present_shares = self.spec.n();
// Remove everyone fatally slashed
let current_fatal_slashes = FatalSlashes::get_as_keys(self.txn, genesis);
for removed in &current_fatal_slashes {
let original_i_for_removed =
self.spec.i(*removed).expect("removed party was never present");
let removed_shares =
u16::from(original_i_for_removed.end) - u16::from(original_i_for_removed.start);
present_shares -= removed_shares;
}
// Spin if the present shares don't satisfy the required threshold
if present_shares < self.spec.t() {
loop {
log::error!(
"fatally slashed so many participants for {:?} we no longer meet the threshold",
self.spec.set()
);
tokio::time::sleep(core::time::Duration::from_secs(60)).await;
}
}
}
for topic in ReattemptDb::take(self.txn, genesis, self.block_number) {
let attempt = AttemptDb::start_next_attempt(self.txn, genesis, topic);
log::info!("potentially re-attempting {topic:?} with attempt {attempt}");
// Slash people who failed to participate as expected in the prior attempt
{
let prior_attempt = attempt - 1;
// TODO: If 67% sent preprocesses, this should be them. Else, this should be vec![]
let expected_participants: Vec<<Ristretto as Ciphersuite>::G> = vec![];
let mut did_not_participate = vec![];
for expected_participant in expected_participants {
if DataDb::get(
self.txn,
genesis,
&DataSpecification {
topic,
// Since we got the preprocesses, we were supposed to get the shares
label: Label::Share,
attempt: prior_attempt,
},
&expected_participant.to_bytes(),
)
.is_none()
{
did_not_participate.push(expected_participant);
}
}
// If a supermajority didn't participate as expected, the protocol was likely aborted due
// to detection of a completion or some larger networking error
// Accordingly, clear did_not_participate
// TODO
// TODO: Increment the slash points of people who didn't preprocess in some expected window
// of time
// Slash everyone who didn't participate as expected
// This may be overzealous as if a minority detects a completion, they'll abort yet the
// supermajority will cause the above allowance to not trigger, causing an honest minority
// to be slashed
// At the end of the protocol, the accumulated slashes are reduced by the amount obtained
// by the worst-performing member of the supermajority, and this is expected to
// sufficiently compensate for slashes which occur under normal operation
// TODO
}
/*
All of these have the same common flow:
1) Check if this re-attempt is actually needed
2) If so, dispatch whatever events as needed
This is because we *always* re-attempt any protocol which had participation. That doesn't
mean we *should* re-attempt this protocol.
The alternatives were:
1) Note on-chain we completed a protocol, halting re-attempts upon 34%.
2) Vote on-chain to re-attempt a protocol.
This schema doesn't have any additional messages upon the success case (whereas
alternative #1 does) and doesn't have overhead (as alternative #2 does, sending votes and
then preprocesses. This only sends preprocesses).
*/
match topic {
Topic::DkgConfirmation => {
if SeraiDkgCompleted::get(self.txn, self.spec.set()).is_none() {
log::info!("re-attempting DKG confirmation with attempt {attempt}");
// Since it wasn't completed, publish our nonces for the next attempt
let confirmation_nonces =
crate::tributary::dkg_confirmation_nonces(self.our_key, self.spec, self.txn, attempt);
let mut tx = Transaction::DkgConfirmationNonces {
attempt,
confirmation_nonces,
signed: Transaction::empty_signed(),
};
tx.sign(&mut OsRng, genesis, self.our_key);
self.publish_tributary_tx.publish_tributary_tx(tx).await;
}
}
Topic::SubstrateSign(inner_id) => {
let id = processor_messages::coordinator::SubstrateSignId {
session: self.spec.set().session,
id: inner_id,
attempt,
};
match inner_id {
SubstrateSignableId::CosigningSubstrateBlock(block) => {
let block_number = SeraiBlockNumber::get(self.txn, block)
.expect("couldn't get the block number for prior attempted cosign");
// Check if the cosigner has a signature from our set for this block/a newer one
let latest_cosign =
crate::cosign_evaluator::LatestCosign::get(self.txn, self.spec.set().network)
.map_or(0, |cosign| cosign.block_number);
if latest_cosign < block_number {
log::info!("re-attempting cosigning {block_number:?} with attempt {attempt}");
// Instruct the processor to start the next attempt
self
.processors
.send(
self.spec.set().network,
processor_messages::coordinator::CoordinatorMessage::CosignSubstrateBlock {
id,
block_number,
},
)
.await;
}
}
SubstrateSignableId::Batch(batch) => {
// If the Batch hasn't appeared on-chain...
if BatchInstructionsHashDb::get(self.txn, self.spec.set().network, batch).is_none() {
log::info!("re-attempting signing batch {batch:?} with attempt {attempt}");
// Instruct the processor to start the next attempt
// The processor won't continue if it's already signed a Batch
            // Checking if the Batch is on-chain beforehand just may stop the non-participating
            // 33% from publishing their re-attempt messages
self
.processors
.send(
self.spec.set().network,
processor_messages::coordinator::CoordinatorMessage::BatchReattempt { id },
)
.await;
}
}
SubstrateSignableId::SlashReport => {
// If this Tributary hasn't been retired...
// (published SlashReport/took too long to do so)
if crate::RetiredTributaryDb::get(self.txn, self.spec.set()).is_none() {
log::info!(
"re-attempting signing slash report for {:?} with attempt {attempt}",
self.spec.set()
);
let report = SlashReport::get(self.txn, self.spec.set())
.expect("re-attempting signing a SlashReport we don't have?");
self
.processors
.send(
self.spec.set().network,
processor_messages::coordinator::CoordinatorMessage::SignSlashReport {
id,
report,
},
)
.await;
}
}
}
}
Topic::Sign(id) => {
// Instruct the processor to start the next attempt
// If it has already noted a completion, it won't send a preprocess and will simply drop
// the re-attempt message
self
.processors
.send(
self.spec.set().network,
processor_messages::sign::CoordinatorMessage::Reattempt {
id: processor_messages::sign::SignId {
session: self.spec.set().session,
id,
attempt,
},
},
)
.await;
}
}
}
if Some(u64::from(self.block_number)) == SlashReportCutOff::get(self.txn, genesis) {
// Grab every slash report
let mut all_reports = vec![];
for (i, (validator, _)) in self.spec.validators().into_iter().enumerate() {
let Some(mut report) = SlashReports::get(self.txn, genesis, validator.to_bytes()) else {
continue;
};
// Assign them 0 points for themselves
report.insert(i, 0);
let signer_i = self.spec.i(validator).unwrap();
let signer_len = u16::from(signer_i.end) - u16::from(signer_i.start);
// Push `n` copies, one for each of their shares
for _ in 0 .. signer_len {
all_reports.push(report.clone());
}
}
// For each participant, grab their median
let mut medians = vec![];
for p in 0 .. self.spec.validators().len() {
let mut median_calc = vec![];
for report in &all_reports {
median_calc.push(report[p]);
}
median_calc.sort_unstable();
medians.push(median_calc[median_calc.len() / 2]);
}
// Grab the points of the last party within the best-performing threshold
// This is done by first expanding the point values by the amount of shares
let mut sorted_medians = vec![];
for (i, (_, shares)) in self.spec.validators().into_iter().enumerate() {
for _ in 0 .. shares {
sorted_medians.push(medians[i]);
}
}
// Then performing the sort
sorted_medians.sort_unstable();
let worst_points_by_party_within_threshold = sorted_medians[usize::from(self.spec.t()) - 1];
// Reduce everyone's points by this value
for median in &mut medians {
*median = median.saturating_sub(worst_points_by_party_within_threshold);
}
// The threshold now has the proper incentive to report this as they no longer suffer
// negative effects
//
// Additionally, if all validators had degraded performance, they don't all get penalized for
// what's likely outside their control (as it occurred universally)
// Mark everyone fatally slashed with u32::MAX
for (i, (validator, _)) in self.spec.validators().into_iter().enumerate() {
if FatallySlashed::get(self.txn, genesis, validator.to_bytes()).is_some() {
medians[i] = u32::MAX;
}
}
let mut report = vec![];
for (i, (validator, _)) in self.spec.validators().into_iter().enumerate() {
if medians[i] != 0 {
report.push((validator.to_bytes(), medians[i]));
}
}
// This does lock in the report, meaning further slash point accumulations won't be reported
// They still have value to be locally tracked due to local decisions made based off
// accumulated slash reports
SlashReport::set(self.txn, self.spec.set(), &report);
// Start a signing protocol for this
self
.processors
.send(
self.spec.set().network,
processor_messages::coordinator::CoordinatorMessage::SignSlashReport {
id: SubstrateSignId {
session: self.spec.set().session,
id: SubstrateSignableId::SlashReport,
attempt: 0,
},
report,
},
)
.await;
}
}
}
#[allow(clippy::too_many_arguments)]
pub(crate) async fn handle_new_blocks<
D: Db,
Pro: Processors,
PST: PublishSeraiTransaction,
PTT: PTTTrait,
RID: RIDTrait,
P: P2p,
>(
db: &mut D,
key: &Zeroizing<<Ristretto as Ciphersuite>::F>,
recognized_id: &RID,
processors: &Pro,
publish_serai_tx: &PST,
publish_tributary_tx: &PTT,
spec: &TributarySpec,
tributary: &TributaryReader<D, Transaction>,
) {
let genesis = tributary.genesis();
let mut last_block = LastHandledBlock::get(db, genesis).unwrap_or(genesis);
let mut block_number = TributaryBlockNumber::get(db, last_block).unwrap_or(0);
while let Some(next) = tributary.block_after(&last_block) {
let block = tributary.block(&next).unwrap();
block_number += 1;
// Make sure we have all of the provided transactions for this block
for tx in &block.transactions {
// Provided TXs will appear first in the Block, so we can break after we hit a non-Provided
let TransactionKind::Provided(order) = tx.kind() else {
break;
};
// make sure we have all the provided txs in this block locally
if !tributary.locally_provided_txs_in_block(&block.hash(), order) {
return;
}
}
let mut db_clone = db.clone();
let mut txn = db_clone.txn();
TributaryBlockNumber::set(&mut txn, next, &block_number);
(TributaryBlockHandler {
db,
txn: &mut txn,
spec,
our_key: key,
recognized_id,
processors,
publish_serai_tx,
publish_tributary_tx,
block,
block_number,
_p2p: PhantomData::<P>,
})
.handle()
.await;
last_block = next;
LastHandledBlock::set(&mut txn, genesis, &next);
txn.commit();
}
}
pub(crate) async fn scan_tributaries_task<
D: Db,
Pro: Processors,
P: P2p,
RID: 'static + Send + Sync + Clone + RIDTrait,
>(
raw_db: D,
key: Zeroizing<<Ristretto as Ciphersuite>::F>,
recognized_id: RID,
processors: Pro,
serai: Arc<Serai>,
mut tributary_event: broadcast::Receiver<crate::TributaryEvent<D, P>>,
) {
log::info!("scanning tributaries");
loop {
match tributary_event.recv().await {
Ok(crate::TributaryEvent::NewTributary(crate::ActiveTributary { spec, tributary })) => {
// For each Tributary, spawn a dedicated scanner task
tokio::spawn({
let raw_db = raw_db.clone();
let key = key.clone();
let recognized_id = recognized_id.clone();
let processors = processors.clone();
let serai = serai.clone();
async move {
let spec = &spec;
let reader = tributary.reader();
let mut tributary_db = raw_db.clone();
loop {
// Check if the set was retired, and if so, don't further operate
if crate::db::RetiredTributaryDb::get(&raw_db, spec.set()).is_some() {
break;
}
// Obtain the next block notification now to prevent obtaining it immediately after
// the next block occurs
let next_block_notification = tributary.next_block_notification().await;
handle_new_blocks::<_, _, _, _, _, P>(
&mut tributary_db,
&key,
&recognized_id,
&processors,
&*serai,
&|tx: Transaction| {
let tributary = tributary.clone();
async move {
match tributary.add_transaction(tx.clone()).await {
Ok(_) => {}
// Can happen as this occurs on a distinct DB TXN
Err(TransactionError::InvalidNonce) => {
log::warn!(
"publishing TX {tx:?} returned InvalidNonce. was it already added?"
)
}
Err(e) => panic!("created an invalid transaction: {e:?}"),
}
}
},
spec,
&reader,
)
.await;
// Run either when the notification fires, or every interval of block_time
let _ = tokio::time::timeout(
Duration::from_secs(tributary::Tributary::<D, Transaction, P>::block_time().into()),
next_block_notification,
)
.await;
}
}
});
}
// The above loop simply checks the DB every few seconds, voiding the need for this event
Ok(crate::TributaryEvent::TributaryRetired(_)) => {}
Err(broadcast::error::RecvError::Lagged(_)) => {
panic!("scan_tributaries lagged to handle tributary_event")
}
Err(broadcast::error::RecvError::Closed) => panic!("tributary_event sender closed"),
}
}
}
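A worked sketch (not part of this commit) of the slash-report normalization above, using four hypothetical single-share validators so n = 4 and t = 3 (share expansion is a no-op in that case). Given per-validator medians [10, 12, 40, 0], the worst member of the best-performing threshold has 12 points, so the published report becomes [0, 0, 28, 0].
// Illustrative sketch, not from this commit: median normalization against the threshold.
fn normalize(mut medians: Vec<u32>, t: usize) -> Vec<u32> {
  // With single-share validators, expanding by share count before sorting changes nothing
  let mut sorted = medians.clone();
  sorted.sort_unstable();
  let worst_within_threshold = sorted[t - 1];
  for median in &mut medians {
    *median = median.saturating_sub(worst_within_threshold);
  }
  medians
}
#[test]
fn normalizes_against_threshold() {
  assert_eq!(normalize(vec![10, 12, 40, 0], 3), vec![0, 0, 28, 0]);
}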

View File

@@ -1,361 +0,0 @@
/*
A MuSig-based signing protocol executed with the validators' keys.
This is used for confirming the results of a DKG on-chain, an operation requiring all validators
which aren't specified as removed while still satisfying a supermajority.
Since we're using the validator's keys, as needed for their being the root of trust, the
coordinator must perform the signing. This is distinct from all other group-signing operations,
as they're all done by the processor.
The MuSig-aggregation achieves on-chain efficiency and enables a more secure design pattern.
  While we could individually track votes, that'd require logic to prevent voting multiple times and
tracking the accumulated votes. MuSig-aggregation simply requires checking the list is sorted and
the list's weight exceeds the threshold.
Instead of maintaining state in memory, a combination of the DB and re-execution are used. This
is deemed acceptable re: performance as:
1) This is only done prior to a DKG being confirmed on Substrate and is assumed infrequent.
2) This is an O(n) algorithm.
3) The size of the validator set is bounded by MAX_KEY_SHARES_PER_SET.
Accordingly, this should be tolerable.
As for safety, it is explicitly unsafe to reuse nonces across signing sessions. This raises
concerns regarding our re-execution which is dependent on fixed nonces. Safety is derived from
the nonces being context-bound under a BFT protocol. The flow is as follows:
1) Decide the nonce.
  2) Publish the nonces' commitments, receiving everyone else's *and potentially the message to be
signed*.
3) Sign and publish the signature share.
In order for nonce re-use to occur, the received nonce commitments (or the message to be signed)
would have to be distinct and sign would have to be called again.
Before we act on any received messages, they're ordered and finalized by a BFT algorithm. The
only way to operate on distinct received messages would be if:
  1) A logical flaw exists, letting new messages overwrite prior messages
2) A reorganization occurred from chain A to chain B, and with it, different messages
Reorganizations are not supported, as BFT is assumed by the presence of a BFT algorithm. While
a significant amount of processes may be byzantine, leading to BFT being broken, that still will
not trigger a reorganization. The only way to move to a distinct chain, with distinct messages,
would be by rebuilding the local process (this time following chain B). Upon any complete
rebuild, we'd re-decide nonces, achieving safety. This does set a bound preventing partial
rebuilds which is accepted.
Additionally, to ensure a rebuilt service isn't flagged as malicious, we have to check the
commitments generated from the decided nonces are in fact its commitments on-chain (TODO).
TODO: We also need to review how we're handling Processor preprocesses and likely implement the
same on-chain-preprocess-matches-presumed-preprocess check before publishing shares.
*/
use core::ops::Deref;
use std::collections::{HashSet, HashMap};
use zeroize::{Zeroize, Zeroizing};
use rand_core::OsRng;
use blake2::{Digest, Blake2s256};
use ciphersuite::{group::ff::PrimeField, Ciphersuite, Ristretto};
use frost::{
FrostError,
dkg::{Participant, musig::musig},
ThresholdKeys,
sign::*,
};
use frost_schnorrkel::Schnorrkel;
use scale::Encode;
#[rustfmt::skip]
use serai_client::validator_sets::primitives::{ValidatorSet, KeyPair, musig_context, set_keys_message};
use serai_db::*;
use crate::tributary::TributarySpec;
create_db!(
SigningProtocolDb {
    CachedPreprocesses: (context: &impl Encode) -> [u8; 32],
DataSignedWith: (context: &impl Encode) -> (Vec<u8>, HashMap<Participant, Vec<u8>>),
}
);
struct SigningProtocol<'a, T: DbTxn, C: Encode> {
pub(crate) key: &'a Zeroizing<<Ristretto as Ciphersuite>::F>,
pub(crate) spec: &'a TributarySpec,
pub(crate) txn: &'a mut T,
pub(crate) context: C,
}
impl<T: DbTxn, C: Encode> SigningProtocol<'_, T, C> {
fn preprocess_internal(
&mut self,
participants: &[<Ristretto as Ciphersuite>::G],
) -> (AlgorithmSignMachine<Ristretto, Schnorrkel>, [u8; 64]) {
// Encrypt the cached preprocess as recovery of it will enable recovering the private key
// While the DB isn't expected to be arbitrarily readable, it isn't a proper secret store and
// shouldn't be trusted as one
let mut encryption_key = {
let mut encryption_key_preimage =
Zeroizing::new(b"Cached Preprocess Encryption Key".to_vec());
encryption_key_preimage.extend(self.context.encode());
let repr = Zeroizing::new(self.key.to_repr());
encryption_key_preimage.extend(repr.deref());
Blake2s256::digest(&encryption_key_preimage)
};
let encryption_key_slice: &mut [u8] = encryption_key.as_mut();
// Create the MuSig keys
let keys: ThresholdKeys<Ristretto> =
musig(&musig_context(self.spec.set()), self.key, participants)
.expect("signing for a set we aren't in/validator present multiple times")
.into();
// Define the algorithm
let algorithm = Schnorrkel::new(b"substrate");
// Check if we've prior preprocessed
if CachedPreprocesses::get(self.txn, &self.context).is_none() {
// If we haven't, we create a machine solely to obtain the preprocess with
let (machine, _) =
AlgorithmMachine::new(algorithm.clone(), keys.clone()).preprocess(&mut OsRng);
// Cache and save the preprocess to disk
let mut cache = machine.cache();
assert_eq!(cache.0.len(), 32);
#[allow(clippy::needless_range_loop)]
for b in 0 .. 32 {
cache.0[b] ^= encryption_key_slice[b];
}
CachedPreprocesses::set(self.txn, &self.context, &cache.0);
}
// We're now guaranteed to have the preprocess, hence why this `unwrap` is safe
let cached = CachedPreprocesses::get(self.txn, &self.context).unwrap();
let mut cached = Zeroizing::new(cached);
#[allow(clippy::needless_range_loop)]
for b in 0 .. 32 {
cached[b] ^= encryption_key_slice[b];
}
encryption_key_slice.zeroize();
// Create the machine from the cached preprocess
let (machine, preprocess) =
AlgorithmSignMachine::from_cache(algorithm, keys, CachedPreprocess(cached));
(machine, preprocess.serialize().try_into().unwrap())
}
fn share_internal(
&mut self,
participants: &[<Ristretto as Ciphersuite>::G],
mut serialized_preprocesses: HashMap<Participant, Vec<u8>>,
msg: &[u8],
) -> Result<(AlgorithmSignatureMachine<Ristretto, Schnorrkel>, [u8; 32]), Participant> {
    // We can't clear the preprocess as we still need it to accumulate all of the shares
    // We do save the message we signed so any future calls with distinct messages panic
    // This assumes the txn deciding this data is committed before the share is broadcast
if let Some((existing_msg, existing_preprocesses)) =
DataSignedWith::get(self.txn, &self.context)
{
assert_eq!(msg, &existing_msg, "obtaining a signature share for a distinct message");
assert_eq!(
&serialized_preprocesses, &existing_preprocesses,
"obtaining a signature share with a distinct set of preprocesses"
);
} else {
DataSignedWith::set(
self.txn,
&self.context,
&(msg.to_vec(), serialized_preprocesses.clone()),
);
}
// Get the preprocessed machine
let (machine, _) = self.preprocess_internal(participants);
// Deserialize all the preprocesses
let mut participants = serialized_preprocesses.keys().copied().collect::<Vec<_>>();
participants.sort();
let mut preprocesses = HashMap::new();
for participant in participants {
preprocesses.insert(
participant,
machine
.read_preprocess(&mut serialized_preprocesses.remove(&participant).unwrap().as_slice())
.map_err(|_| participant)?,
);
}
// Sign the share
let (machine, share) = machine.sign(preprocesses, msg).map_err(|e| match e {
FrostError::InternalError(e) => unreachable!("FrostError::InternalError {e}"),
FrostError::InvalidParticipant(_, _) |
FrostError::InvalidSigningSet(_) |
FrostError::InvalidParticipantQuantity(_, _) |
FrostError::DuplicatedParticipant(_) |
FrostError::MissingParticipant(_) => panic!("unexpected error during sign: {e:?}"),
FrostError::InvalidPreprocess(p) | FrostError::InvalidShare(p) => p,
})?;
Ok((machine, share.serialize().try_into().unwrap()))
}
fn complete_internal(
machine: AlgorithmSignatureMachine<Ristretto, Schnorrkel>,
shares: HashMap<Participant, Vec<u8>>,
) -> Result<[u8; 64], Participant> {
let shares = shares
.into_iter()
.map(|(p, share)| {
machine.read_share(&mut share.as_slice()).map(|share| (p, share)).map_err(|_| p)
})
.collect::<Result<HashMap<_, _>, _>>()?;
let signature = machine.complete(shares).map_err(|e| match e {
FrostError::InternalError(e) => unreachable!("FrostError::InternalError {e}"),
FrostError::InvalidParticipant(_, _) |
FrostError::InvalidSigningSet(_) |
FrostError::InvalidParticipantQuantity(_, _) |
FrostError::DuplicatedParticipant(_) |
FrostError::MissingParticipant(_) => unreachable!("{e:?}"),
FrostError::InvalidPreprocess(p) | FrostError::InvalidShare(p) => p,
})?;
Ok(signature.to_bytes())
}
}
// Get the keys of the participants, noted by their threshold i's, and return a new map indexed by
// their MuSig i's.
fn threshold_i_map_to_keys_and_musig_i_map(
spec: &TributarySpec,
our_key: &Zeroizing<<Ristretto as Ciphersuite>::F>,
mut map: HashMap<Participant, Vec<u8>>,
) -> (Vec<<Ristretto as Ciphersuite>::G>, HashMap<Participant, Vec<u8>>) {
// Insert our own index so calculations aren't offset
let our_threshold_i = spec
.i(<Ristretto as Ciphersuite>::generator() * our_key.deref())
.expect("not in a set we're signing for")
.start;
// Asserts we weren't unexpectedly already present
assert!(map.insert(our_threshold_i, vec![]).is_none());
let spec_validators = spec.validators();
let key_from_threshold_i = |threshold_i| {
for (key, _) in &spec_validators {
if threshold_i == spec.i(*key).expect("validator wasn't in a set they're in").start {
return *key;
}
}
panic!("requested info for threshold i which doesn't exist")
};
let mut sorted = vec![];
let mut threshold_is = map.keys().copied().collect::<Vec<_>>();
threshold_is.sort();
for threshold_i in threshold_is {
sorted.push((
threshold_i,
key_from_threshold_i(threshold_i),
map.remove(&threshold_i).unwrap(),
));
}
  // Now that signers are sorted, with their shares, create a map with the i's needed for MuSig
let mut participants = vec![];
let mut map = HashMap::new();
let mut our_musig_i = None;
for (raw_i, (threshold_i, key, share)) in sorted.into_iter().enumerate() {
let musig_i = Participant::new(u16::try_from(raw_i).unwrap() + 1).unwrap();
if threshold_i == our_threshold_i {
our_musig_i = Some(musig_i);
}
participants.push(key);
map.insert(musig_i, share);
}
map.remove(&our_musig_i.unwrap()).unwrap();
(participants, map)
}
type DkgConfirmerSigningProtocol<'a, T> =
SigningProtocol<'a, T, (&'static [u8; 12], ValidatorSet, u32)>;
pub(crate) struct DkgConfirmer<'a, T: DbTxn> {
key: &'a Zeroizing<<Ristretto as Ciphersuite>::F>,
spec: &'a TributarySpec,
txn: &'a mut T,
attempt: u32,
}
impl<T: DbTxn> DkgConfirmer<'_, T> {
pub(crate) fn new<'a>(
key: &'a Zeroizing<<Ristretto as Ciphersuite>::F>,
spec: &'a TributarySpec,
txn: &'a mut T,
attempt: u32,
) -> DkgConfirmer<'a, T> {
DkgConfirmer { key, spec, txn, attempt }
}
fn signing_protocol(&mut self) -> DkgConfirmerSigningProtocol<'_, T> {
let context = (b"DkgConfirmer", self.spec.set(), self.attempt);
SigningProtocol { key: self.key, spec: self.spec, txn: self.txn, context }
}
fn preprocess_internal(&mut self) -> (AlgorithmSignMachine<Ristretto, Schnorrkel>, [u8; 64]) {
// This preprocesses with just us as we only decide the participants after obtaining
// preprocesses
let participants = vec![<Ristretto as Ciphersuite>::generator() * self.key.deref()];
self.signing_protocol().preprocess_internal(&participants)
}
// Get the preprocess for this confirmation.
pub(crate) fn preprocess(&mut self) -> [u8; 64] {
self.preprocess_internal().1
}
fn share_internal(
&mut self,
preprocesses: HashMap<Participant, Vec<u8>>,
key_pair: &KeyPair,
) -> Result<(AlgorithmSignatureMachine<Ristretto, Schnorrkel>, [u8; 32]), Participant> {
let (participants, preprocesses) =
threshold_i_map_to_keys_and_musig_i_map(self.spec, self.key, preprocesses);
let msg = set_keys_message(&self.spec.set(), key_pair);
self.signing_protocol().share_internal(&participants, preprocesses, &msg)
}
// Get the share for this confirmation, if the preprocesses are valid.
pub(crate) fn share(
&mut self,
preprocesses: HashMap<Participant, Vec<u8>>,
key_pair: &KeyPair,
) -> Result<[u8; 32], Participant> {
self.share_internal(preprocesses, key_pair).map(|(_, share)| share)
}
pub(crate) fn complete(
&mut self,
preprocesses: HashMap<Participant, Vec<u8>>,
key_pair: &KeyPair,
shares: HashMap<Participant, Vec<u8>>,
) -> Result<[u8; 64], Participant> {
assert_eq!(preprocesses.keys().collect::<HashSet<_>>(), shares.keys().collect::<HashSet<_>>());
let shares = threshold_i_map_to_keys_and_musig_i_map(self.spec, self.key, shares).1;
let machine = self
.share_internal(preprocesses, key_pair)
.expect("trying to complete a machine which failed to preprocess")
.0;
DkgConfirmerSigningProtocol::<'_, T>::complete_internal(machine, shares)
}
}
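A small sketch (not part of this commit) of the preprocess-cache masking used in `preprocess_internal` above: the cached preprocess is XORed with a key derived from the validator key and context, and because XOR is its own inverse, applying the same derived key on load recovers the original bytes.
// Illustrative sketch, not from this commit: the XOR mask round-trips.
fn xor_in_place(cache: &mut [u8; 32], key: &[u8; 32]) {
  for b in 0 .. 32 {
    cache[b] ^= key[b];
  }
}
#[test]
fn xor_mask_round_trips() {
  let key = [0xa5u8; 32];
  let original = [7u8; 32];
  let mut cache = original;
  xor_in_place(&mut cache, &key); // what gets written to disk
  xor_in_place(&mut cache, &key); // what's recovered when the cache is read back
  assert_eq!(cache, original);
}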

View File

@@ -1,124 +0,0 @@
use core::{ops::Range, fmt::Debug};
use std::{io, collections::HashMap};
use transcript::{Transcript, RecommendedTranscript};
use ciphersuite::{group::GroupEncoding, Ciphersuite, Ristretto};
use frost::Participant;
use scale::Encode;
use borsh::{BorshSerialize, BorshDeserialize};
use serai_client::validator_sets::primitives::ValidatorSet;
fn borsh_serialize_validators<W: io::Write>(
validators: &Vec<(<Ristretto as Ciphersuite>::G, u16)>,
writer: &mut W,
) -> Result<(), io::Error> {
let len = u16::try_from(validators.len()).unwrap();
BorshSerialize::serialize(&len, writer)?;
for validator in validators {
BorshSerialize::serialize(&validator.0.to_bytes(), writer)?;
BorshSerialize::serialize(&validator.1, writer)?;
}
Ok(())
}
fn borsh_deserialize_validators<R: io::Read>(
reader: &mut R,
) -> Result<Vec<(<Ristretto as Ciphersuite>::G, u16)>, io::Error> {
let len: u16 = BorshDeserialize::deserialize_reader(reader)?;
let mut res = vec![];
for _ in 0 .. len {
let compressed: [u8; 32] = BorshDeserialize::deserialize_reader(reader)?;
let point = Option::from(<Ristretto as Ciphersuite>::G::from_bytes(&compressed))
.ok_or_else(|| io::Error::other("invalid point for validator"))?;
let weight: u16 = BorshDeserialize::deserialize_reader(reader)?;
res.push((point, weight));
}
Ok(res)
}
#[derive(Clone, PartialEq, Eq, Debug, BorshSerialize, BorshDeserialize)]
pub struct TributarySpec {
serai_block: [u8; 32],
start_time: u64,
set: ValidatorSet,
#[borsh(
serialize_with = "borsh_serialize_validators",
deserialize_with = "borsh_deserialize_validators"
)]
validators: Vec<(<Ristretto as Ciphersuite>::G, u16)>,
evrf_public_keys: Vec<([u8; 32], Vec<u8>)>,
}
impl TributarySpec {
pub fn new(
serai_block: [u8; 32],
start_time: u64,
set: ValidatorSet,
validators: Vec<(<Ristretto as Ciphersuite>::G, u16)>,
evrf_public_keys: Vec<([u8; 32], Vec<u8>)>,
) -> TributarySpec {
Self { serai_block, start_time, set, validators, evrf_public_keys }
}
pub fn set(&self) -> ValidatorSet {
self.set
}
pub fn genesis(&self) -> [u8; 32] {
// Calculate the genesis for this Tributary
let mut genesis = RecommendedTranscript::new(b"Serai Tributary Genesis");
// This locks it to a specific Serai chain
genesis.append_message(b"serai_block", self.serai_block);
genesis.append_message(b"session", self.set.session.0.to_le_bytes());
genesis.append_message(b"network", self.set.network.encode());
let genesis = genesis.challenge(b"genesis");
let genesis_ref: &[u8] = genesis.as_ref();
genesis_ref[.. 32].try_into().unwrap()
}
pub fn start_time(&self) -> u64 {
self.start_time
}
pub fn n(&self) -> u16 {
self.validators.iter().map(|(_, weight)| *weight).sum()
}
pub fn t(&self) -> u16 {
((2 * self.n()) / 3) + 1
}
pub fn i(&self, key: <Ristretto as Ciphersuite>::G) -> Option<Range<Participant>> {
let mut all_is = HashMap::new();
let mut i = 1;
for (validator, weight) in &self.validators {
all_is.insert(
*validator,
Range { start: Participant::new(i).unwrap(), end: Participant::new(i + weight).unwrap() },
);
i += weight;
}
Some(all_is.get(&key)?.clone())
}
pub fn reverse_lookup_i(&self, i: Participant) -> Option<<Ristretto as Ciphersuite>::G> {
for (validator, _) in &self.validators {
if self.i(*validator).map_or(false, |range| range.contains(&i)) {
return Some(*validator);
}
}
None
}
pub fn validators(&self) -> Vec<(<Ristretto as Ciphersuite>::G, u64)> {
self.validators.iter().map(|(validator, weight)| (*validator, u64::from(*weight))).collect()
}
pub fn evrf_public_keys(&self) -> Vec<([u8; 32], Vec<u8>)> {
self.evrf_public_keys.clone()
}
}
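The removed `TributarySpec` above packs its index arithmetic into `n()`, `t()`, and `i()`: validator weights sum to `n`, the BFT threshold is `t = 2n/3 + 1`, and each validator receives a contiguous block of participant indices sized by its weight. A minimal standalone sketch of that arithmetic, with hypothetical weights, for reference:

fn main() {
  // Hypothetical weights for three validators (identified by a letter rather than a key).
  let validators = [("a", 2u16), ("b", 1), ("c", 3)];

  // n is the total weight; t is the BFT supermajority threshold computed by `t()` above.
  let n: u16 = validators.iter().map(|(_, weight)| *weight).sum();
  let t = ((2 * n) / 3) + 1;
  assert_eq!((n, t), (6, 5));

  // `i()` above assigns each validator a contiguous range of participant indices, sized by its
  // weight and starting from 1.
  let mut start = 1u16;
  for (name, weight) in validators {
    println!("{name}: participants {start} ..= {}", start + weight - 1);
    start += weight;
  }
  // Prints: a: participants 1 ..= 2, b: participants 3 ..= 3, c: participants 4 ..= 6
}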

View File

@@ -11,10 +11,10 @@ use ciphersuite::{
};
use schnorr::SchnorrSignature;
use scale::{Encode, Decode};
use scale::Encode;
use borsh::{BorshSerialize, BorshDeserialize};
use serai_client::primitives::PublicKey;
use serai_client::primitives::SeraiAddress;
use processor_messages::sign::VariantSignId;
@@ -27,33 +27,22 @@ use tributary::{
/// The label for data from a signing protocol.
#[derive(Clone, Copy, PartialEq, Eq, Debug, Encode, BorshSerialize, BorshDeserialize)]
pub enum Label {
pub enum SigningProtocolRound {
/// A preprocess.
Preprocess,
/// A signature share.
Share,
}
impl Label {
impl SigningProtocolRound {
fn nonce(&self) -> u32 {
match self {
Label::Preprocess => 0,
Label::Share => 1,
SigningProtocolRound::Preprocess => 0,
SigningProtocolRound::Share => 1,
}
}
}
fn borsh_serialize_public<W: io::Write>(
public: &PublicKey,
writer: &mut W,
) -> Result<(), io::Error> {
// This doesn't use `encode_to` as `encode_to` panics if the writer returns an error
writer.write_all(&public.encode())
}
fn borsh_deserialize_public<R: io::Read>(reader: &mut R) -> Result<PublicKey, io::Error> {
Decode::decode(&mut scale::IoReader(reader)).map_err(io::Error::other)
}
/// `tributary::Signed` without the nonce.
///
/// All of our nonces are deterministic to the type of transaction and fields within.
@@ -90,11 +79,7 @@ pub enum Transaction {
/// A vote to remove a participant for invalid behavior
RemoveParticipant {
/// The participant to remove
#[borsh(
serialize_with = "borsh_serialize_public",
deserialize_with = "borsh_deserialize_public"
)]
participant: PublicKey,
participant: SeraiAddress,
/// The transaction's signer and signature
signed: Signed,
},
@@ -119,7 +104,7 @@ pub enum Transaction {
/// The attempt number of this signing protocol
attempt: u32,
// The signature share
confirmation_share: [u8; 32],
share: [u8; 32],
/// The transaction's signer and signature
signed: Signed,
},
@@ -128,11 +113,46 @@ pub enum Transaction {
///
/// When the time comes to start a new co-signing protocol, the most recent Substrate block will
/// be the one selected to be cosigned.
CosignSubstrateBlock {
/// THe hash of the Substrate block to sign
hash: [u8; 32],
Cosign {
/// The hash of the Substrate block to sign
substrate_block_hash: [u8; 32],
},
/// The cosign for a Substrate block
///
/// After producing this cosign, we need to start work on the latest intended-to-be cosigned
/// block. That requires agreement on when this cosign was produced, which we solve by embedding
/// this cosign on chain.
///
/// We ideally don't have this transaction at all. The coordinator, without access to any of the
/// key shares, could observe the FROST signing session and determine a successful completion.
/// Unfortunately, that functionality is not present in modular-frost, so we do need to support
/// *some* asynchronous flow (where the processor or P2P network informs us of the successful
/// completion).
///
/// If we use a `Provided` transaction, that requires everyone observe this cosign.
///
  /// If we use an `Unsigned` transaction, we can't verify the cosign signature inside
  /// `Transaction::verify` unless we embed the full `SignedCosign` on-chain. The issue is that,
  /// since a Tributary is stateless with regard to the on-chain logic (including
  /// `Transaction::verify`), we can't verify the signature against the group's public key unless
  /// we also include that key (but then we open a DoS where arbitrary group keys are specified to
  /// cause inclusion of arbitrary blobs on chain).
///
  /// If we use a `Signed` transaction, we mitigate the DoS risk by having someone to fatally
  /// slash. The performance is horrible though: with 100 validators, all 100 will publish this
  /// transaction.
///
  /// We could use a signed `Unsigned` transaction, which includes a signer and signature yet
  /// isn't technically a Signed transaction. This lets us de-duplicate the transaction based on
  /// its contents.
///
  /// The optimal choice is likely to use a `Provided` transaction. We don't actually need to
  /// observe the produced cosign (which is ephemeral). As long as it's agreed the cosign in
  /// question no longer needs to be produced, which would mean the cosigning protocol at large
  /// has cosigned the block in question, it'd be safe to provide this and move on to the next
  /// cosign.
Cosigned { substrate_block_hash: [u8; 32] },
/// Acknowledge a Substrate block
///
/// This is provided after the block has been cosigned.
@@ -156,21 +176,14 @@ pub enum Transaction {
hash: [u8; 32],
},
/// The local view of slashes observed by the transaction's sender
SlashReport {
/// The slash points accrued by each validator
slash_points: Vec<u32>,
/// The transaction's signer and signature
signed: Signed,
},
/// Data from a signing protocol.
Sign {
/// The ID of the object being signed
id: VariantSignId,
/// The attempt number of this signing protocol
attempt: u32,
/// The label for this data within the signing protocol
label: Label,
label: SigningProtocolRound,
/// The data itself
///
/// There will be `n` blobs of data where `n` is the amount of key shares the validator sending
@@ -179,6 +192,14 @@ pub enum Transaction {
/// The transaction's signer and signature
signed: Signed,
},
/// The local view of slashes observed by the transaction's sender
SlashReport {
/// The slash points accrued by each validator
slash_points: Vec<u32>,
/// The transaction's signer and signature
signed: Signed,
},
}
impl ReadWrite for Transaction {
@@ -208,7 +229,8 @@ impl TransactionTrait for Transaction {
TransactionKind::Signed((b"DkgConfirmation", attempt).encode(), signed.nonce(1))
}
Transaction::CosignSubstrateBlock { .. } => TransactionKind::Provided("CosignSubstrateBlock"),
Transaction::Cosign { .. } => TransactionKind::Provided("CosignSubstrateBlock"),
Transaction::Cosigned { .. } => TransactionKind::Provided("Cosigned"),
Transaction::SubstrateBlock { .. } => TransactionKind::Provided("SubstrateBlock"),
Transaction::Batch { .. } => TransactionKind::Provided("Batch"),
@@ -240,6 +262,8 @@ impl TransactionTrait for Transaction {
impl Transaction {
// Sign a transaction
//
// Panics if signing a transaction type which isn't `TransactionKind::Signed`
pub fn sign<R: RngCore + CryptoRng>(
&mut self,
rng: &mut R,
@@ -254,7 +278,8 @@ impl Transaction {
Transaction::DkgConfirmationPreprocess { ref mut signed, .. } => signed,
Transaction::DkgConfirmationShare { ref mut signed, .. } => signed,
Transaction::CosignSubstrateBlock { .. } => panic!("signing CosignSubstrateBlock"),
Transaction::Cosign { .. } => panic!("signing CosignSubstrateBlock"),
Transaction::Cosigned { .. } => panic!("signing Cosigned"),
Transaction::SubstrateBlock { .. } => panic!("signing SubstrateBlock"),
Transaction::Batch { .. } => panic!("signing Batch"),
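To make the `Provided` versus `Signed` trade-off discussed in the `Cosigned` doc comment concrete, here is a small standalone sketch. It is not how the tributary crate actually implements `Provided` transactions; it only illustrates the de-duplication argument with a content-keyed set.

use std::collections::HashSet;

// An illustrative, content-only stand-in for the `Cosigned` transaction above.
#[derive(Clone, Copy, PartialEq, Eq, Hash)]
struct Cosigned {
  substrate_block_hash: [u8; 32],
}

fn main() {
  let block = [0x11; 32];

  // With a `Signed` transaction, each of 100 validators publishes their own copy.
  let signed_copies = vec![Cosigned { substrate_block_hash: block }; 100];
  assert_eq!(signed_copies.len(), 100);

  // With a `Provided` transaction, inclusion is keyed solely by content, so the 100 proposals
  // collapse to a single on-chain entry.
  let provided: HashSet<Cosigned> = signed_copies.into_iter().collect();
  assert_eq!(provided.len(), 1);
}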

View File

@@ -106,7 +106,7 @@ pub struct Participation<C: Ciphersuite> {
impl<C: Ciphersuite> Participation<C> {
pub fn read<R: Read>(reader: &mut R, n: u16) -> io::Result<Self> {
// TODO: Replace `len` with some calculcation deterministic to the params
// TODO: Replace `len` with some calculation deterministic to the params
let mut len = [0; 4];
reader.read_exact(&mut len)?;
let len = usize::try_from(u32::from_le_bytes(len)).expect("<32-bit platform?");

View File

@@ -83,7 +83,7 @@ pub mod sign {
#[derive(Clone, Copy, PartialEq, Eq, Hash, Encode, Decode, BorshSerialize, BorshDeserialize)]
pub enum VariantSignId {
Cosign(u64),
Batch(u32),
Batch([u8; 32]),
SlashReport,
Transaction([u8; 32]),
}
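This hunk changes batch identification from a sequential `u32` to a 32-byte hash. A short sketch of why the derives matter, using a local mirror of the enum (for illustration only, not the crate's actual code):

use std::collections::HashMap;

// A local mirror of the `VariantSignId` above, solely for illustration.
#[derive(Clone, Copy, PartialEq, Eq, Hash, Debug)]
enum VariantSignId {
  Cosign(u64),
  // Batches are now identified by a 32-byte hash rather than a sequential u32 ID.
  Batch([u8; 32]),
  SlashReport,
  Transaction([u8; 32]),
}

fn main() {
  // The `Hash`/`Eq` derives let the ID key per-protocol state, e.g. attempt counters.
  let mut attempts: HashMap<VariantSignId, u32> = HashMap::new();
  attempts.insert(VariantSignId::Batch([0xaa; 32]), 0);
  *attempts.entry(VariantSignId::Batch([0xaa; 32])).or_insert(0) += 1;
  assert_eq!(attempts[&VariantSignId::Batch([0xaa; 32])], 1);
}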

View File

@@ -52,7 +52,7 @@ pub fn borsh_deserialize_signature<R: borsh::io::Read>(
// TODO: Remove this for solely Public?
#[derive(
Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Debug, Encode, Decode, MaxEncodedLen, TypeInfo,
Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, Debug, Encode, Decode, MaxEncodedLen, TypeInfo,
)]
#[cfg_attr(feature = "std", derive(Zeroize))]
#[cfg_attr(feature = "borsh", derive(BorshSerialize, BorshDeserialize))]
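The final hunk adds `Hash` to the primitive's derive list, presumably so the type can key standard collections. A minimal sketch of that use, assuming a placeholder newtype rather than the real Serai primitive (which carries many more derives and SCALE support):

use std::collections::HashMap;

// Placeholder newtype standing in for the primitive gaining `Hash` above.
#[derive(Clone, Copy, PartialEq, Eq, Hash, Debug)]
struct Address(pub [u8; 32]);

fn main() {
  // With `Hash` derived, the type can directly key maps, e.g. per-validator slash points.
  let mut slash_points: HashMap<Address, u32> = HashMap::new();
  slash_points.insert(Address([1; 32]), 5);
  assert_eq!(slash_points.get(&Address([1; 32])), Some(&5));
}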