serai/coordinator/src/substrate/mod.rs
Luke Parker e4e4245ee3 One Round DKG (#589)
* Upstream GBP, divisor, circuit abstraction, and EC gadgets from FCMP++

* Initial eVRF implementation

Not quite done yet. It still needs to communicate the resulting points, and the proofs needed to
extract them from the Pedersen Commitments, in order to return those, and then it needs to be
tested.

* Add the openings of the PCs to the eVRF as necessary

* Add implementation of secq256k1

* Make DKG Encryption a bit more flexible

No longer requires the use of an EncryptionKeyMessage, and allows pre-defined
keys for encryption.

* Make NUM_BITS an argument for the field macro

* Have the eVRF take a Zeroizing private key

* Initial eVRF-based DKG

* Add embedwards25519 curve

* Inline the eVRF into the DKG library

Due to how we're handling share encryption, we'd either need two circuits or to
dedicate this circuit to the DKG. The latter makes sense at this time.

* Add documentation to the eVRF-based DKG

* Add paragraph claiming robustness

* Update to the new eVRF proof

* Finish routing the eVRF functionality

Still needs errors and serialization, along with a few other TODOs.

* Add initial eVRF DKG test

* Improve eVRF DKG

Updates how we calculate verification shares, improves performance when
extracting multiple sets of keys, and adds more to the test for it.

* Start using a proper error for the eVRF DKG

* Resolve various TODOs

Supports recovering multiple key shares from the eVRF DKG.

Inlines two loops to save 2**16 iterations.

Adds support for creating a constant time representation of scalars < NUM_BITS.

* Ban zero ECDH keys, document non-zero requirements

* Implement eVRF traits, all the way up to the DKG, for secp256k1/ed25519

* Add Ristretto eVRF trait impls

* Support participating multiple times in the eVRF DKG

* Only participate once per key, not once per key share

* Rewrite processor key-gen around the eVRF DKG

Still a WIP.

* Finish routing the new key gen in the processor

Doesn't touch the tests, coordinator, nor Substrate yet.
`cargo +nightly fmt && cargo +nightly-2024-07-01 clippy --all-features -p serai-processor`
does pass.

* Deduplicate and better document in processor key_gen

* Update serai-processor tests to the new key gen

* Correct the number of yx coefficients, get processor key gen test to pass

* Add embedded elliptic curve keys to Substrate

* Update processor key gen tests to the eVRF DKG

* Have set_keys take signature_participants, not removed_participants

Now no one is removed from the DKG. Only `t` people publish the key however.

Uses a BitVec for an efficient encoding of the participants.

* Update the coordinator binary for the new DKG

This does not yet update any tests.

* Add sensible Debug to key_gen::[Processor, Coordinator]Message

* Have the DKG explicitly declare how to interpolate its shares

Removes the hack for MuSig where we multiply keys by the inverse of their
Lagrange interpolation factor.

* Replace Interpolation::None with Interpolation::Constant

Allows the MuSig DKG to keep the secret share as the original private key,
enabling deriving FROST nonces consistently regardless of the MuSig context.

* Get coordinator tests to pass

* Update spec to the new DKG

* Get clippy to pass across the repo

* cargo machete

* Add an extra sleep to ensure expected ordering of `Participation`s

* Update orchestration

* Remove bad panic in coordinator

It expected ConfirmationShare to be n-of-n, not t-of-n.

* Improve documentation on functions

* Update TX size limit

We no longer have to support the ridiculous case of having 49 DKG
participations within a 101-of-150 DKG. It does remain quite high due to
needing to _sign_ so many times. It may be optimal for parties with multiple
key shares to independently send their preprocesses/shares (despite the
overhead that'll cause with signatures and the transaction structure).

* Correct error in the Processor spec document

* Update a few comments in the validator-sets pallet

* Send/Recv Participation one at a time

Sending all, then attempting to receive all in an expected order, wasn't working
even with notable delays between sending messages. This points to the mempool
not working as expected...

* Correct ThresholdKeys serialization in modular-frost test

* Update the existing TX size limit test for the new DKG parameters

* Increase time allowed for the DKG on the GH CI

* Correct construction of signature_participants in serai-client tests

Fault identified by akil.

* Further contextualize DkgConfirmer by ValidatorSet

Caught by a safety check that we wouldn't reuse preprocesses across messages. That
raises the question of whether we were previously reusing preprocesses (reusing keys)?
Except that would have caused a variety of signing failures (suggesting we had some
staggered timing avoiding it in practice, but yes, this was possible in theory).

* Add necessary calls to set_embedded_elliptic_curve_key in coordinator set rotation tests

* Correct shimmed setting of a secq256k1 key

* cargo fmt

* Don't use `[0; 32]` for the embedded keys in the coordinator rotation test

The key_gen function expects the random values to have already been decided.

* Big-endian secq256k1 scalars

Also restores the prior, safer, Encryption::register function.
2024-09-19 21:43:26 -04:00


use core::{ops::Deref, time::Duration};
use std::{
sync::Arc,
collections::{HashSet, HashMap},
};
use zeroize::Zeroizing;
use ciphersuite::{group::GroupEncoding, Ciphersuite, Ristretto};
use serai_client::{
SeraiError, Block, Serai, TemporalSerai,
primitives::{BlockHash, EmbeddedEllipticCurve, NetworkId},
validator_sets::{primitives::ValidatorSet, ValidatorSetsEvent},
in_instructions::InInstructionsEvent,
coins::CoinsEvent,
};
use serai_db::DbTxn;
use processor_messages::SubstrateContext;
use tokio::{sync::mpsc, time::sleep};
use crate::{
Db,
processors::Processors,
tributary::{TributarySpec, SeraiDkgCompleted},
};
mod db;
pub use db::*;
mod cosign;
pub use cosign::*;
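// Check if the given key is a participant in the specified validator set, returning None if the
// set doesn't have participants registered as of this TemporalSerai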
async fn in_set(
key: &Zeroizing<<Ristretto as Ciphersuite>::F>,
serai: &TemporalSerai<'_>,
set: ValidatorSet,
) -> Result<Option<bool>, SeraiError> {
let Some(participants) = serai.validator_sets().participants(set.network).await? else {
return Ok(None);
};
let key = (Ristretto::generator() * key.deref()).to_bytes();
Ok(Some(participants.iter().any(|(participant, _)| participant.0 == key)))
}
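// Handle a NewSet event, building a TributarySpec for the set and publishing it to the channel
// if we're one of its participants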
async fn handle_new_set<D: Db>(
txn: &mut D::Transaction<'_>,
key: &Zeroizing<<Ristretto as Ciphersuite>::F>,
new_tributary_spec: &mpsc::UnboundedSender<TributarySpec>,
serai: &Serai,
block: &Block,
set: ValidatorSet,
) -> Result<(), SeraiError> {
if in_set(key, &serai.as_of(block.hash()), set)
.await?
.expect("NewSet for set which doesn't exist")
{
log::info!("present in set {:?}", set);
let validators;
let mut evrf_public_keys = vec![];
{
let serai = serai.as_of(block.hash());
let serai = serai.validator_sets();
let set_participants =
serai.participants(set.network).await?.expect("NewSet for set which doesn't exist");
validators = set_participants
.iter()
.map(|(k, w)| {
(
<Ristretto as Ciphersuite>::read_G::<&[u8]>(&mut k.0.as_ref())
.expect("invalid key registered as participant"),
u16::try_from(*w).unwrap(),
)
})
.collect::<Vec<_>>();
for (validator, _) in set_participants {
// This is only run for external networks which always do a DKG for Serai
let substrate = serai
.embedded_elliptic_curve_key(validator, EmbeddedEllipticCurve::Embedwards25519)
.await?
.expect("Serai called NewSet on a validator without an Embedwards25519 key");
// `embedded_elliptic_curves` is documented to have the second entry be the
// network-specific curve (if it exists and is distinct from Embedwards25519)
let network =
if let Some(embedded_elliptic_curve) = set.network.embedded_elliptic_curves().get(1) {
serai.embedded_elliptic_curve_key(validator, *embedded_elliptic_curve).await?.expect(
"Serai called NewSet on a validator without the embedded key required for the network",
)
} else {
substrate.clone()
};
evrf_public_keys.push((
<[u8; 32]>::try_from(substrate)
.expect("validator-sets pallet accepted a key of an invalid length"),
network,
));
}
};
let time = if let Ok(time) = block.time() {
time
} else {
assert_eq!(block.number(), 0);
// Use the next block's time
loop {
let Ok(Some(res)) = serai.finalized_block_by_number(1).await else {
sleep(Duration::from_secs(5)).await;
continue;
};
break res.time().unwrap();
}
};
// The block time is in milliseconds yet the Tributary is in seconds
let time = time / 1000;
// Since this block is in the past, and Tendermint doesn't play nice with starting chains after
// their start time (though it does eventually work), delay the start time by 120 seconds
// This is meant to handle ~20 blocks of lack of finalization for this first block
const SUBSTRATE_TO_TRIBUTARY_TIME_DELAY: u64 = 120;
let time = time + SUBSTRATE_TO_TRIBUTARY_TIME_DELAY;
let spec = TributarySpec::new(block.hash(), time, set, validators, evrf_public_keys);
log::info!("creating new tributary for {:?}", spec.set());
// Save it to the database now, not on the channel receiver's side, so this is safe against
// reboots
// If this txn finishes, and we reboot, then this'll be reloaded from active Tributaries
// If this txn doesn't finish, this will be re-fired
// If we waited to save to the DB, this txn may be finished, preventing re-firing, yet the
// prior fired event may have not been received yet
crate::ActiveTributaryDb::add_participating_in_tributary(txn, &spec);
new_tributary_spec.send(spec).unwrap();
} else {
log::info!("not present in new set {:?}", set);
}
Ok(())
}
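// Handle the Batch and Burn events within this block, informing each affected network's
// processor of the acknowledged block, its batches, and its burns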
async fn handle_batch_and_burns<Pro: Processors>(
txn: &mut impl DbTxn,
processors: &Pro,
serai: &Serai,
block: &Block,
) -> Result<(), SeraiError> {
// Track which networks had events with a Vec in order to preserve the insertion order
// While that shouldn't be needed, ensuring order never hurts, and may enable design choices
// with regards to Processor <-> Coordinator message passing
let mut networks_with_event = vec![];
let mut network_had_event = |burns: &mut HashMap<_, _>, batches: &mut HashMap<_, _>, network| {
// Don't insert this network multiple times
// A Vec is still used in order to maintain the insertion order
if !networks_with_event.contains(&network) {
networks_with_event.push(network);
burns.insert(network, vec![]);
batches.insert(network, vec![]);
}
};
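// Per-network tracking of the network block acknowledged by its Batch, the Batch IDs, and the
// Burn instructions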
let mut batch_block = HashMap::new();
let mut batches = HashMap::<NetworkId, Vec<u32>>::new();
let mut burns = HashMap::new();
let serai = serai.as_of(block.hash());
for batch in serai.in_instructions().batch_events().await? {
if let InInstructionsEvent::Batch { network, id, block: network_block, instructions_hash } =
batch
{
network_had_event(&mut burns, &mut batches, network);
BatchInstructionsHashDb::set(txn, network, id, &instructions_hash);
// Make sure this is the only Batch event for this network in this Block
assert!(batch_block.insert(network, network_block).is_none());
// Add the batch included by this block
batches.get_mut(&network).unwrap().push(id);
} else {
panic!("Batch event wasn't Batch: {batch:?}");
}
}
for burn in serai.coins().burn_with_instruction_events().await? {
if let CoinsEvent::BurnWithInstruction { from: _, instruction } = burn {
let network = instruction.balance.coin.network();
network_had_event(&mut burns, &mut batches, network);
// network_had_event should register an entry in burns
burns.get_mut(&network).unwrap().push(instruction);
} else {
panic!("Burn event wasn't Burn: {burn:?}");
}
}
assert_eq!(HashSet::<&_>::from_iter(networks_with_event.iter()).len(), networks_with_event.len());
for network in networks_with_event {
let network_latest_finalized_block = if let Some(block) = batch_block.remove(&network) {
block
} else {
// If it's had a batch or a burn, it must have had a block acknowledged
serai
.in_instructions()
.latest_block_for_network(network)
.await?
.expect("network had a batch/burn yet never set a latest block")
};
processors
.send(
network,
processor_messages::substrate::CoordinatorMessage::SubstrateBlock {
context: SubstrateContext {
serai_time: block.time().unwrap() / 1000,
network_latest_finalized_block,
},
block: block.number(),
burns: burns.remove(&network).unwrap(),
batches: batches.remove(&network).unwrap(),
},
)
.await;
}
Ok(())
}
// Handle a specific Substrate block, returning an error when it fails to get data
// (not blocking / holding)
#[allow(clippy::too_many_arguments)]
async fn handle_block<D: Db, Pro: Processors>(
db: &mut D,
key: &Zeroizing<<Ristretto as Ciphersuite>::F>,
new_tributary_spec: &mpsc::UnboundedSender<TributarySpec>,
perform_slash_report: &mpsc::UnboundedSender<ValidatorSet>,
tributary_retired: &mpsc::UnboundedSender<ValidatorSet>,
processors: &Pro,
serai: &Serai,
block: Block,
) -> Result<(), SeraiError> {
let hash = block.hash();
// Define an indexed event ID.
let mut event_id = 0;
// If a new validator set was activated, create tributary/inform processor to do a DKG
for new_set in serai.as_of(hash).validator_sets().new_set_events().await? {
// Individually mark each event as handled so on reboot, we minimize duplicates
// Additionally, if the Serai connection also fails 1/100 times, this means a block with 1000
// events will successfully be incrementally handled
// (though the Serai connection should be stable, making this unnecessary)
let ValidatorSetsEvent::NewSet { set } = new_set else {
panic!("NewSet event wasn't NewSet: {new_set:?}");
};
// If this is Serai, do nothing
// We only coordinate/process external networks
if set.network == NetworkId::Serai {
continue;
}
if HandledEvent::is_unhandled(db, hash, event_id) {
log::info!("found fresh new set event {:?}", new_set);
let mut txn = db.txn();
handle_new_set::<D>(&mut txn, key, new_tributary_spec, serai, &block, set).await?;
HandledEvent::handle_event(&mut txn, hash, event_id);
txn.commit();
}
event_id += 1;
}
// If a key pair was confirmed, inform the processor
for key_gen in serai.as_of(hash).validator_sets().key_gen_events().await? {
if HandledEvent::is_unhandled(db, hash, event_id) {
log::info!("found fresh key gen event {:?}", key_gen);
let ValidatorSetsEvent::KeyGen { set, key_pair } = key_gen else {
panic!("KeyGen event wasn't KeyGen: {key_gen:?}");
};
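// The key pair's first key is the Substrate key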
let substrate_key = key_pair.0 .0;
processors
.send(
set.network,
processor_messages::substrate::CoordinatorMessage::ConfirmKeyPair {
context: SubstrateContext {
serai_time: block.time().unwrap() / 1000,
network_latest_finalized_block: serai
.as_of(block.hash())
.in_instructions()
.latest_block_for_network(set.network)
.await?
// The processor treats this as a magic value which will cause it to find a network
// block which has a time greater than or equal to the Serai time
.unwrap_or(BlockHash([0; 32])),
},
session: set.session,
key_pair,
},
)
.await;
// TODO: If we were in the set, yet were removed, drop the tributary
let mut txn = db.txn();
SeraiDkgCompleted::set(&mut txn, set, &substrate_key);
HandledEvent::handle_event(&mut txn, hash, event_id);
txn.commit();
}
event_id += 1;
}
for accepted_handover in serai.as_of(hash).validator_sets().accepted_handover_events().await? {
let ValidatorSetsEvent::AcceptedHandover { set } = accepted_handover else {
panic!("AcceptedHandover event wasn't AcceptedHandover: {accepted_handover:?}");
};
if set.network == NetworkId::Serai {
continue;
}
if HandledEvent::is_unhandled(db, hash, event_id) {
log::info!("found fresh accepted handover event {:?}", accepted_handover);
// TODO: This isn't atomic with the event handling
// Send a oneshot receiver so we can await the response?
perform_slash_report.send(set).unwrap();
let mut txn = db.txn();
HandledEvent::handle_event(&mut txn, hash, event_id);
txn.commit();
}
event_id += 1;
}
for retired_set in serai.as_of(hash).validator_sets().set_retired_events().await? {
let ValidatorSetsEvent::SetRetired { set } = retired_set else {
panic!("SetRetired event wasn't SetRetired: {retired_set:?}");
};
if set.network == NetworkId::Serai {
continue;
}
if HandledEvent::is_unhandled(db, hash, event_id) {
log::info!("found fresh set retired event {:?}", retired_set);
let mut txn = db.txn();
crate::ActiveTributaryDb::retire_tributary(&mut txn, set);
tributary_retired.send(set).unwrap();
HandledEvent::handle_event(&mut txn, hash, event_id);
txn.commit();
}
event_id += 1;
}
// Finally, tell the processor of acknowledged blocks/burns
// This uses a single event as, unlike prior events which individually executed code, all
// following events share data collection
if HandledEvent::is_unhandled(db, hash, event_id) {
let mut txn = db.txn();
handle_batch_and_burns(&mut txn, processors, serai, &block).await?;
HandledEvent::handle_event(&mut txn, hash, event_id);
txn.commit();
}
Ok(())
}
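// Handle each block from *next_block through the latest cosigned block, advancing the cosign
// protocol and updating NextBlock in the DB as each block is handled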
#[allow(clippy::too_many_arguments)]
async fn handle_new_blocks<D: Db, Pro: Processors>(
db: &mut D,
key: &Zeroizing<<Ristretto as Ciphersuite>::F>,
new_tributary_spec: &mpsc::UnboundedSender<TributarySpec>,
perform_slash_report: &mpsc::UnboundedSender<ValidatorSet>,
tributary_retired: &mpsc::UnboundedSender<ValidatorSet>,
processors: &Pro,
serai: &Serai,
next_block: &mut u64,
) -> Result<(), SeraiError> {
// Check if there's been a new Substrate block
let latest_number = serai.latest_finalized_block().await?.number();
// Advance the cosigning protocol
advance_cosign_protocol(db, key, serai, latest_number).await?;
// Reduce to the latest cosigned block
let latest_number = latest_number.min(LatestCosignedBlock::latest_cosigned_block(db));
if latest_number < *next_block {
return Ok(());
}
for b in *next_block ..= latest_number {
let block = serai
.finalized_block_by_number(b)
.await?
.expect("couldn't get block before the latest finalized block");
log::info!("handling substrate block {b}");
handle_block(
db,
key,
new_tributary_spec,
perform_slash_report,
tributary_retired,
processors,
serai,
block,
)
.await?;
*next_block += 1;
let mut txn = db.txn();
NextBlock::set(&mut txn, next_block);
txn.commit();
log::info!("handled substrate block {b}");
}
Ok(())
}
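/// Scan Substrate for finalized blocks, handling the events within each and notifying the
/// relevant channels/processors.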
pub async fn scan_task<D: Db, Pro: Processors>(
mut db: D,
key: Zeroizing<<Ristretto as Ciphersuite>::F>,
processors: Pro,
serai: Arc<Serai>,
new_tributary_spec: mpsc::UnboundedSender<TributarySpec>,
perform_slash_report: mpsc::UnboundedSender<ValidatorSet>,
tributary_retired: mpsc::UnboundedSender<ValidatorSet>,
) {
log::info!("scanning substrate");
let mut next_substrate_block = NextBlock::get(&db).unwrap_or_default();
/*
let new_substrate_block_notifier = {
let serai = &serai;
move || async move {
loop {
match serai.newly_finalized_block().await {
Ok(sub) => return sub,
Err(e) => {
log::error!("couldn't communicate with serai node: {e}");
sleep(Duration::from_secs(5)).await;
}
}
}
}
};
*/
// TODO: Restore the above subscription-based system
// That would require moving serai-client from HTTP to websockets
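// Until then, poll the latest finalized block until it reaches the specified block number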
let new_substrate_block_notifier = {
let serai = &serai;
move |next_substrate_block| async move {
loop {
match serai.latest_finalized_block().await {
Ok(latest) => {
if latest.header.number >= next_substrate_block {
return latest;
}
sleep(Duration::from_secs(3)).await;
}
Err(e) => {
log::error!("couldn't communicate with serai node: {e}");
sleep(Duration::from_secs(5)).await;
}
}
}
}
};
loop {
// await the next block, yet if our notifier had an error, re-create it
{
let Ok(_) = tokio::time::timeout(
Duration::from_secs(60),
new_substrate_block_notifier(next_substrate_block),
)
.await
else {
// Timed out, which may be because Serai isn't finalizing or may be some issue with the
// notifier
if serai.latest_finalized_block().await.map(|block| block.number()).ok() ==
Some(next_substrate_block.saturating_sub(1))
{
log::info!("serai hasn't finalized a block in the last 60s...");
}
continue;
};
/*
// next_block is a Option<Result>
if next_block.and_then(Result::ok).is_none() {
substrate_block_notifier = new_substrate_block_notifier(next_substrate_block);
continue;
}
*/
}
match handle_new_blocks(
&mut db,
&key,
&new_tributary_spec,
&perform_slash_report,
&tributary_retired,
&processors,
&serai,
&mut next_substrate_block,
)
.await
{
Ok(()) => {}
Err(e) => {
log::error!("couldn't communicate with serai node: {e}");
sleep(Duration::from_secs(5)).await;
}
}
}
}
/// Gets the expected ID for the next Batch.
///
/// On error, will log the error and apply a slight sleep, letting the caller simply retry
/// immediately.
pub(crate) async fn expected_next_batch(
serai: &Serai,
network: NetworkId,
) -> Result<u32, SeraiError> {
async fn expected_next_batch_inner(serai: &Serai, network: NetworkId) -> Result<u32, SeraiError> {
let serai = serai.as_of_latest_finalized_block().await?;
let last = serai.in_instructions().last_batch_for_network(network).await?;
Ok(if let Some(last) = last { last + 1 } else { 0 })
}
match expected_next_batch_inner(serai, network).await {
Ok(next) => Ok(next),
Err(e) => {
log::error!("couldn't get the expected next batch from substrate: {e:?}");
sleep(Duration::from_millis(100)).await;
Err(e)
}
}
}
/// Verifies `Batch`s which have already been indexed from Substrate.
///
/// Spins if a distinct `Batch` is detected on-chain.
///
/// This has a slight malleability in that it doesn't verify *who* published a `Batch` was as
/// expected. This is deemed fine.
pub(crate) async fn verify_published_batches<D: Db>(
txn: &mut D::Transaction<'_>,
network: NetworkId,
optimistic_up_to: u32,
) -> Option<u32> {
// TODO: Localize from MainDb to SubstrateDb
let last = crate::LastVerifiedBatchDb::get(txn, network);
for id in last.map_or(0, |last| last + 1) ..= optimistic_up_to {
let Some(on_chain) = BatchInstructionsHashDb::get(txn, network, id) else {
break;
};
let off_chain = crate::ExpectedBatchDb::get(txn, network, id).unwrap();
if on_chain != off_chain {
// Halt operations on this network and spin, as this is a critical fault
loop {
log::error!(
"{}! network: {:?} id: {} off-chain: {} on-chain: {}",
"on-chain batch doesn't match off-chain",
network,
id,
hex::encode(off_chain),
hex::encode(on_chain),
);
sleep(Duration::from_secs(60)).await;
}
}
crate::LastVerifiedBatchDb::set(txn, network, &id);
}
crate::LastVerifiedBatchDb::get(txn, network)
}