Perform MuSig signing of generated keys

This commit is contained in:
Luke Parker
2023-08-14 06:08:55 -04:00
parent 6b41c91dc2
commit 5e02f936e4
12 changed files with 653 additions and 175 deletions

View File

@@ -1,7 +1,11 @@
use std::io::Read;
use scale::{Encode, Decode};
use ciphersuite::{group::GroupEncoding, Ciphersuite, Ristretto};
use serai_client::validator_sets::primitives::KeyPair;
pub use serai_db::*;
#[derive(Debug)]
@@ -51,6 +55,22 @@ impl<D: Db> TributaryDb<D> {
})
}
// DB key under which the key pair currently being confirmed is stored, scoped to this
// Tributary's genesis.
fn currently_completing_key_pair_key(genesis: [u8; 32]) -> Vec<u8> {
Self::tributary_key(b"currently_completing_key_pair", genesis)
}
// Save the key pair whose on-chain confirmation is in progress for this genesis.
// Read back via `currently_completing_key_pair`.
pub fn save_currently_completing_key_pair(
txn: &mut D::Transaction<'_>,
genesis: [u8; 32],
key_pair: &KeyPair,
) {
txn.put(Self::currently_completing_key_pair_key(genesis), key_pair.encode())
}
// Fetch the key pair currently being confirmed for this genesis, if one was saved.
// Panics if the stored bytes don't decode as a KeyPair (only `save_currently_completing_key_pair`
// writes this key, so a decode failure is a DB-corruption-level bug).
pub fn currently_completing_key_pair<G: Get>(getter: &G, genesis: [u8; 32]) -> Option<KeyPair> {
getter
.get(Self::currently_completing_key_pair_key(genesis))
.map(|bytes| KeyPair::decode(&mut bytes.as_slice()).unwrap())
}
// DB key recording whether an ID (under the given label) has been recognized on this
// Tributary; the label, genesis, and ID are concatenated into the key's suffix.
fn recognized_id_key(label: &'static str, genesis: [u8; 32], id: [u8; 32]) -> Vec<u8> {
Self::tributary_key(b"recognized", [label.as_bytes(), genesis.as_ref(), id.as_ref()].concat())
}

View File

@@ -218,7 +218,14 @@ impl ReadWrite for SignData {
pub enum Transaction {
// Once this completes successfully, no more instances should be created.
DkgCommitments(u32, Vec<u8>, Signed),
DkgShares(u32, Participant, HashMap<Participant, Vec<u8>>, Signed),
DkgShares {
attempt: u32,
sender_i: Participant,
shares: HashMap<Participant, Vec<u8>>,
confirmation_nonces: [u8; 64],
signed: Signed,
},
DkgConfirmed(u32, [u8; 32], Signed),
// When an external block is finalized, we can allow the associated batch IDs
// Commits to the full block so eclipsed nodes don't continue on their eclipsed state
@@ -289,36 +296,53 @@ impl ReadWrite for Transaction {
shares
};
let mut confirmation_nonces = [0; 64];
reader.read_exact(&mut confirmation_nonces)?;
let signed = Signed::read(reader)?;
Ok(Transaction::DkgShares(
Ok(Transaction::DkgShares {
attempt,
Participant::new(sender_i)
sender_i: Participant::new(sender_i)
.ok_or_else(|| io::Error::new(io::ErrorKind::Other, "invalid sender participant"))?,
shares,
confirmation_nonces,
signed,
))
})
}
2 => {
let mut attempt = [0; 4];
reader.read_exact(&mut attempt)?;
let attempt = u32::from_le_bytes(attempt);
let mut confirmation_share = [0; 32];
reader.read_exact(&mut confirmation_share)?;
let signed = Signed::read(reader)?;
Ok(Transaction::DkgConfirmed(attempt, confirmation_share, signed))
}
3 => {
let mut block = [0; 32];
reader.read_exact(&mut block)?;
Ok(Transaction::ExternalBlock(block))
}
3 => {
4 => {
let mut block = [0; 8];
reader.read_exact(&mut block)?;
Ok(Transaction::SubstrateBlock(u64::from_le_bytes(block)))
}
4 => SignData::read(reader).map(Transaction::BatchPreprocess),
5 => SignData::read(reader).map(Transaction::BatchShare),
5 => SignData::read(reader).map(Transaction::BatchPreprocess),
6 => SignData::read(reader).map(Transaction::BatchShare),
6 => SignData::read(reader).map(Transaction::SignPreprocess),
7 => SignData::read(reader).map(Transaction::SignShare),
7 => SignData::read(reader).map(Transaction::SignPreprocess),
8 => SignData::read(reader).map(Transaction::SignShare),
8 => {
9 => {
let mut plan = [0; 32];
reader.read_exact(&mut plan)?;
@@ -349,7 +373,7 @@ impl ReadWrite for Transaction {
signed.write(writer)
}
Transaction::DkgShares(attempt, sender_i, shares, signed) => {
Transaction::DkgShares { attempt, sender_i, shares, confirmation_nonces, signed } => {
writer.write_all(&[1])?;
writer.write_all(&attempt.to_le_bytes())?;
@@ -389,38 +413,46 @@ impl ReadWrite for Transaction {
writer.write_all(share)?;
}
writer.write_all(confirmation_nonces)?;
signed.write(writer)
}
Transaction::DkgConfirmed(attempt, share, signed) => {
writer.write_all(&[2])?;
writer.write_all(&attempt.to_le_bytes())?;
writer.write_all(share)?;
signed.write(writer)
}
Transaction::ExternalBlock(block) => {
writer.write_all(&[2])?;
writer.write_all(&[3])?;
writer.write_all(block)
}
Transaction::SubstrateBlock(block) => {
writer.write_all(&[3])?;
writer.write_all(&[4])?;
writer.write_all(&block.to_le_bytes())
}
Transaction::BatchPreprocess(data) => {
writer.write_all(&[4])?;
writer.write_all(&[5])?;
data.write(writer)
}
Transaction::BatchShare(data) => {
writer.write_all(&[5])?;
writer.write_all(&[6])?;
data.write(writer)
}
Transaction::SignPreprocess(data) => {
writer.write_all(&[6])?;
data.write(writer)
}
Transaction::SignShare(data) => {
writer.write_all(&[7])?;
data.write(writer)
}
Transaction::SignCompleted(plan, tx, signed) => {
Transaction::SignShare(data) => {
writer.write_all(&[8])?;
data.write(writer)
}
Transaction::SignCompleted(plan, tx, signed) => {
writer.write_all(&[9])?;
writer.write_all(plan)?;
writer.write_all(&[u8::try_from(tx.len()).expect("tx hash length exceed 255 bytes")])?;
writer.write_all(tx)?;
@@ -434,7 +466,8 @@ impl TransactionTrait for Transaction {
fn kind(&self) -> TransactionKind<'_> {
match self {
Transaction::DkgCommitments(_, _, signed) => TransactionKind::Signed(signed),
Transaction::DkgShares(_, _, _, signed) => TransactionKind::Signed(signed),
Transaction::DkgShares { signed, .. } => TransactionKind::Signed(signed),
Transaction::DkgConfirmed(_, _, signed) => TransactionKind::Signed(signed),
Transaction::ExternalBlock(_) => TransactionKind::Provided("external"),
Transaction::SubstrateBlock(_) => TransactionKind::Provided("serai"),
@@ -492,7 +525,8 @@ impl Transaction {
fn signed(tx: &mut Transaction) -> &mut Signed {
match tx {
Transaction::DkgCommitments(_, _, ref mut signed) => signed,
Transaction::DkgShares(_, _, _, ref mut signed) => signed,
Transaction::DkgShares { ref mut signed, .. } => signed,
Transaction::DkgConfirmed(_, _, ref mut signed) => signed,
Transaction::ExternalBlock(_) => panic!("signing ExternalBlock"),
Transaction::SubstrateBlock(_) => panic!("signing SubstrateBlock"),

View File

@@ -1,9 +1,26 @@
use core::ops::Deref;
use core::{ops::Deref, future::Future};
use std::collections::HashMap;
use zeroize::Zeroizing;
use rand_core::SeedableRng;
use rand_chacha::ChaCha20Rng;
use transcript::{Transcript, RecommendedTranscript};
use ciphersuite::{Ciphersuite, Ristretto};
use frost::{
FrostError,
dkg::{Participant, musig::musig},
sign::*,
};
use frost_schnorrkel::Schnorrkel;
use serai_client::{
Signature,
validator_sets::primitives::{ValidatorSet, KeyPair, musig_context, set_keys_message},
subxt::utils::Encoded,
Serai,
};
use tokio::sync::mpsc::UnboundedSender;
@@ -15,7 +32,7 @@ use processor_messages::{
coordinator, CoordinatorMessage,
};
use serai_db::DbTxn;
use serai_db::{Get, DbTxn};
use crate::{
Db,
@@ -23,6 +40,188 @@ use crate::{
tributary::{TributaryDb, TributarySpec, Transaction},
};
// DB labels under which the two rounds of DKG-confirmation signing data (first nonces,
// then shares) are stored and read back.
const DKG_CONFIRMATION_NONCES: &[u8] = b"dkg_confirmation_nonces";
const DKG_CONFIRMATION_SHARES: &[u8] = b"dkg_confirmation_shares";
// Instead of maintaining state, this simply re-creates the machine(s) in-full on every call.
// This simplifies data flow and prevents requiring multiple paths.
// While more expensive, this only runs an O(n) algorithm, which is tolerable to run multiple
// times.
struct DkgConfirmer;
impl DkgConfirmer {
// Build the MuSig FROST machine used to confirm the DKG and run its preprocess step.
// Returns the machine (consumed by the later signing rounds) and the serialized
// 64-byte preprocess.
fn preprocess_internal(
spec: &TributarySpec,
key: &Zeroizing<<Ristretto as Ciphersuite>::F>,
) -> (AlgorithmSignMachine<Ristretto, Schnorrkel>, [u8; 64]) {
// TODO: Does Substrate already have a validator-uniqueness check?
let validators = spec.validators().iter().map(|val| val.0).collect::<Vec<_>>();
let context = musig_context(spec.set());
// Deterministic RNG seeded from the spec and our key, so re-creating the machine on a
// later call reproduces the same preprocess
let mut chacha = ChaCha20Rng::from_seed({
let mut entropy_transcript = RecommendedTranscript::new(b"DkgConfirmer Entropy");
entropy_transcript.append_message(b"spec", spec.serialize());
entropy_transcript.append_message(b"key", Zeroizing::new(key.to_bytes()));
// TODO: This is incredibly insecure unless message-bound (or bound via the attempt)
Zeroizing::new(entropy_transcript).rng_seed(b"preprocess")
});
let (machine, preprocess) = AlgorithmMachine::new(
Schnorrkel::new(b"substrate"),
musig(&context, key, &validators)
.expect("confirming the DKG for a set we aren't in/validator present multiple times")
.into(),
)
.preprocess(&mut chacha);
(machine, preprocess.serialize().try_into().unwrap())
}
// Get the preprocess for this confirmation.
fn preprocess(spec: &TributarySpec, key: &Zeroizing<<Ristretto as Ciphersuite>::F>) -> [u8; 64] {
Self::preprocess_internal(spec, key).1
}
// Re-create the machine, ingest everyone's preprocesses, and sign the set_keys message,
// yielding the signature machine plus our serialized 32-byte share. If a preprocess
// fails to read or is invalid, returns that Participant as the error.
fn share_internal(
spec: &TributarySpec,
key: &Zeroizing<<Ristretto as Ciphersuite>::F>,
preprocesses: HashMap<Participant, Vec<u8>>,
key_pair: &KeyPair,
) -> Result<(AlgorithmSignatureMachine<Ristretto, Schnorrkel>, [u8; 32]), Participant> {
let machine = Self::preprocess_internal(spec, key).0;
let preprocesses = preprocesses
.into_iter()
.map(|(p, preprocess)| {
machine
.read_preprocess(&mut preprocess.as_slice())
.map(|preprocess| (p, preprocess))
.map_err(|_| p)
})
.collect::<Result<HashMap<_, _>, _>>()?;
let (machine, share) = machine
.sign(preprocesses, &set_keys_message(&spec.set(), key_pair))
.map_err(|e| match e {
FrostError::InternalError(e) => unreachable!("FrostError::InternalError {e}"),
// MuSig uses the full validator set, so set/participant errors can't occur here
FrostError::InvalidParticipant(_, _) |
FrostError::InvalidSigningSet(_) |
FrostError::InvalidParticipantQuantity(_, _) |
FrostError::DuplicatedParticipant(_) |
FrostError::MissingParticipant(_) => unreachable!("{e:?}"),
FrostError::InvalidPreprocess(p) | FrostError::InvalidShare(p) => p,
})?;
Ok((machine, share.serialize().try_into().unwrap()))
}
// Get the share for this confirmation, if the preprocesses are valid.
fn share(
spec: &TributarySpec,
key: &Zeroizing<<Ristretto as Ciphersuite>::F>,
preprocesses: HashMap<Participant, Vec<u8>>,
key_pair: &KeyPair,
) -> Result<[u8; 64], Participant> {
Self::share_internal(spec, key, preprocesses, key_pair).map(|(_, share)| share)
}
// Complete the signature from everyone's shares, yielding the 64-byte signature to
// publish. If a share fails to read or is invalid, returns that Participant as the
// error. Panics if our own preprocesses/share fail, as `share` must have succeeded
// before this is called.
fn complete(
spec: &TributarySpec,
key: &Zeroizing<<Ristretto as Ciphersuite>::F>,
preprocesses: HashMap<Participant, Vec<u8>>,
key_pair: &KeyPair,
shares: HashMap<Participant, Vec<u8>>,
) -> Result<[u8; 64], Participant> {
let machine = Self::share_internal(spec, key, preprocesses, key_pair)
.expect("trying to complete a machine which failed to preprocess")
.0;
let shares = shares
.into_iter()
.map(|(p, share)| {
machine.read_share(&mut share.as_slice()).map(|share| (p, share)).map_err(|_| p)
})
.collect::<Result<HashMap<_, _>, _>>()?;
let signature = machine.complete(shares).map_err(|e| match e {
FrostError::InternalError(e) => unreachable!("FrostError::InternalError {e}"),
FrostError::InvalidParticipant(_, _) |
FrostError::InvalidSigningSet(_) |
FrostError::InvalidParticipantQuantity(_, _) |
FrostError::DuplicatedParticipant(_) |
FrostError::MissingParticipant(_) => unreachable!("{e:?}"),
FrostError::InvalidPreprocess(p) | FrostError::InvalidShare(p) => p,
})?;
Ok(signature.to_bytes())
}
}
// Collect, keyed by participant index, the piece of data every validator provided for
// `label`/`id`/`attempt`. If `signed` names a validator, `bytes` is used as that
// validator's piece instead of a DB read (it was just received and may not be stored
// yet). Asserts exactly `needed` pieces were found, then removes and discards our own
// piece, returning only the others'.
#[allow(clippy::too_many_arguments)] // TODO
fn read_known_to_exist_data<D: Db, G: Get>(
getter: &G,
spec: &TributarySpec,
key: &Zeroizing<<Ristretto as Ciphersuite>::F>,
label: &'static [u8],
id: [u8; 32],
needed: u16,
attempt: u32,
bytes: Vec<u8>,
signed: Option<&Signed>,
) -> HashMap<Participant, Vec<u8>> {
let mut data = HashMap::new();
for validator in spec.validators().iter().map(|validator| validator.0) {
data.insert(
spec.i(validator).unwrap(),
if Some(&validator) == signed.map(|signed| &signed.signer) {
bytes.clone()
} else if let Some(data) =
TributaryDb::<D>::data(label, getter, spec.genesis(), id, attempt, validator)
{
data
} else {
// Validators with no stored data simply aren't inserted; the assert below
// catches any shortfall
continue;
},
);
}
assert_eq!(data.len(), usize::from(needed));
// Remove our own piece of data
assert!(data
.remove(
&spec
.i(Ristretto::generator() * key.deref())
.expect("handling a message for a Tributary we aren't part of")
)
.is_some());
data
}
// Produce our preprocess (nonce commitments) for the DKG confirmation signature.
pub fn dkg_confirmation_nonces(
key: &Zeroizing<<Ristretto as Ciphersuite>::F>,
spec: &TributarySpec,
) -> [u8; 64] {
DkgConfirmer::preprocess(spec, key)
}
// Handle the processor reporting the generated key pair: save it as the key pair
// currently being confirmed, read the already-present confirmation nonces out of the
// DB, and produce our 32-byte confirmation share. Returns the faulty Participant if a
// preprocess was invalid.
#[allow(clippy::needless_pass_by_ref_mut)]
pub fn generated_key_pair<D: Db>(
txn: &mut D::Transaction<'_>,
key: &Zeroizing<<Ristretto as Ciphersuite>::F>,
spec: &TributarySpec,
key_pair: &KeyPair,
) -> Result<[u8; 32], Participant> {
TributaryDb::<D>::save_currently_completing_key_pair(txn, spec.genesis(), key_pair);
let attempt = 0; // TODO
// The DKG has no ID, so [0; 32] is used; all of spec.n() validators must have posted
// nonces before this runs
let preprocesses = read_known_to_exist_data::<D, _>(
txn,
spec,
key,
DKG_CONFIRMATION_NONCES,
[0; 32],
spec.n(),
attempt,
vec![],
None,
);
DkgConfirmer::share(spec, key, preprocesses, key_pair)
}
#[derive(Clone, Copy, PartialEq, Eq, Debug)]
pub enum RecognizedIdType {
Block,
@@ -31,11 +230,17 @@ pub enum RecognizedIdType {
// Handle a specific Tributary block
#[allow(clippy::needless_pass_by_ref_mut)] // False positive?
async fn handle_block<D: Db, Pro: Processors>(
async fn handle_block<
D: Db,
Pro: Processors,
F: Future<Output = ()>,
PST: Fn(ValidatorSet, Encoded) -> F,
>(
db: &mut TributaryDb<D>,
key: &Zeroizing<<Ristretto as Ciphersuite>::F>,
recognized_id: &UnboundedSender<([u8; 32], RecognizedIdType, [u8; 32])>,
processors: &Pro,
publish_serai_tx: PST,
spec: &TributarySpec,
block: Block<Transaction>,
) {
@@ -70,92 +275,79 @@ async fn handle_block<D: Db, Pro: Processors>(
}
}
let mut handle = |zone: Zone,
label,
needed,
id,
attempt,
mut bytes: Vec<u8>,
signed: Signed| {
if zone == Zone::Dkg {
// Since Dkg doesn't have an ID, solely attempts, this should just be [0; 32]
assert_eq!(id, [0; 32], "DKG, which shouldn't have IDs, had a non-0 ID");
} else if !TributaryDb::<D>::recognized_id(&txn, zone.label(), genesis, id) {
// TODO: Full slash
todo!();
}
// If they've already published a TX for this attempt, slash
if let Some(data) = TributaryDb::<D>::data(label, &txn, genesis, id, attempt, signed.signer)
{
if data != bytes {
let handle =
|txn: &mut _, zone: Zone, label, needed, id, attempt, bytes: Vec<u8>, signed: &Signed| {
if zone == Zone::Dkg {
// Since Dkg doesn't have an ID, solely attempts, this should just be [0; 32]
assert_eq!(id, [0; 32], "DKG, which shouldn't have IDs, had a non-0 ID");
} else if !TributaryDb::<D>::recognized_id(txn, zone.label(), genesis, id) {
// TODO: Full slash
todo!();
}
// TODO: Slash
return None;
}
// If they've already published a TX for this attempt, slash
if let Some(data) =
TributaryDb::<D>::data(label, txn, genesis, id, attempt, signed.signer)
{
if data != bytes {
// TODO: Full slash
todo!();
}
// If the attempt is lesser than the blockchain's, slash
let curr_attempt = TributaryDb::<D>::attempt(&txn, genesis, id);
if attempt < curr_attempt {
// TODO: Slash for being late
return None;
}
if attempt > curr_attempt {
// TODO: Full slash
todo!();
}
// TODO: We can also full slash if shares before all commitments, or share before the
// necessary preprocesses
// TODO: If this is shares, we need to check they are part of the selected signing set
// Store this data
let received =
TributaryDb::<D>::set_data(label, &mut txn, genesis, id, attempt, signed.signer, &bytes);
// If we have all the needed commitments/preprocesses/shares, tell the processor
// TODO: This needs to be coded by weight, not by validator count
if received == needed {
let mut data = HashMap::new();
for validator in spec.validators().iter().map(|validator| validator.0) {
data.insert(
spec.i(validator).unwrap(),
if validator == signed.signer {
bytes.split_off(0)
} else if let Some(data) =
TributaryDb::<D>::data(label, &txn, genesis, id, attempt, validator)
{
data
} else {
continue;
},
);
// TODO: Slash
return None;
}
assert_eq!(data.len(), usize::from(needed));
// Remove our own piece of data
assert!(data
.remove(
&spec
.i(Ristretto::generator() * key.deref())
.expect("handling a message for a Tributary we aren't part of")
)
.is_some());
// If the attempt is lesser than the blockchain's, slash
let curr_attempt = TributaryDb::<D>::attempt(txn, genesis, id);
if attempt < curr_attempt {
// TODO: Slash for being late
return None;
}
if attempt > curr_attempt {
// TODO: Full slash
todo!();
}
return Some(data);
}
None
};
// TODO: We can also full slash if shares before all commitments, or share before the
// necessary preprocesses
// TODO: If this is shares, we need to check they are part of the selected signing set
// Store this data
let received =
TributaryDb::<D>::set_data(label, txn, genesis, id, attempt, signed.signer, &bytes);
// If we have all the needed commitments/preprocesses/shares, tell the processor
// TODO: This needs to be coded by weight, not by validator count
if received == needed {
return Some(read_known_to_exist_data::<D, _>(
txn,
spec,
key,
label,
id,
needed,
attempt,
bytes,
Some(signed),
));
}
None
};
match tx {
Transaction::DkgCommitments(attempt, bytes, signed) => {
if let Some(commitments) =
handle(Zone::Dkg, b"dkg_commitments", spec.n(), [0; 32], attempt, bytes, signed)
{
if let Some(commitments) = handle(
&mut txn,
Zone::Dkg,
b"dkg_commitments",
spec.n(),
[0; 32],
attempt,
bytes,
&signed,
) {
log::info!("got all DkgCommitments for {}", hex::encode(genesis));
processors
.send(
@@ -169,7 +361,7 @@ async fn handle_block<D: Db, Pro: Processors>(
}
}
Transaction::DkgShares(attempt, sender_i, mut shares, signed) => {
Transaction::DkgShares { attempt, sender_i, mut shares, confirmation_nonces, signed } => {
if sender_i !=
spec
.i(signed.signer)
@@ -192,10 +384,21 @@ async fn handle_block<D: Db, Pro: Processors>(
// within the valid range will be the sender's i
let bytes = if sender_i == our_i { vec![] } else { shares.remove(&our_i).unwrap() };
let confirmation_nonces = handle(
&mut txn,
Zone::Dkg,
DKG_CONFIRMATION_NONCES,
spec.n(),
[0; 32],
attempt,
confirmation_nonces.to_vec(),
&signed,
);
if let Some(shares) =
handle(Zone::Dkg, b"dkg_shares", spec.n(), [0; 32], attempt, bytes, signed)
handle(&mut txn, Zone::Dkg, b"dkg_shares", spec.n(), [0; 32], attempt, bytes, &signed)
{
log::info!("got all DkgShares for {}", hex::encode(genesis));
assert!(confirmation_nonces.is_some());
processors
.send(
spec.set().network,
@@ -205,6 +408,53 @@ async fn handle_block<D: Db, Pro: Processors>(
}),
)
.await;
} else {
assert!(confirmation_nonces.is_none());
}
}
Transaction::DkgConfirmed(attempt, shares, signed) => {
if let Some(shares) = handle(
&mut txn,
Zone::Dkg,
DKG_CONFIRMATION_SHARES,
spec.n(),
[0; 32],
attempt,
shares.to_vec(),
&signed,
) {
log::info!("got all DkgConfirmed for {}", hex::encode(genesis));
let preprocesses = read_known_to_exist_data::<D, _>(
&txn,
spec,
key,
DKG_CONFIRMATION_NONCES,
[0; 32],
spec.n(),
attempt,
vec![],
None,
);
let key_pair = TributaryDb::<D>::currently_completing_key_pair(&txn, genesis)
.unwrap_or_else(|| {
panic!(
"in DkgConfirmed handling, which happens after everyone {}",
"(including us) fires DkgConfirmed, yet no confirming key pair"
)
});
let Ok(sig) = DkgConfirmer::complete(spec, key, preprocesses, &key_pair, shares) else {
// TODO: Full slash
todo!();
};
publish_serai_tx(
spec.set(),
Serai::set_validator_set_keys(spec.set().network, key_pair, Signature(sig)),
)
.await;
}
}
@@ -232,13 +482,14 @@ async fn handle_block<D: Db, Pro: Processors>(
Transaction::BatchPreprocess(data) => {
if let Some(preprocesses) = handle(
&mut txn,
Zone::Batch,
b"batch_preprocess",
spec.t(),
data.plan,
data.attempt,
data.data,
data.signed,
&data.signed,
) {
processors
.send(
@@ -255,13 +506,14 @@ async fn handle_block<D: Db, Pro: Processors>(
}
Transaction::BatchShare(data) => {
if let Some(shares) = handle(
&mut txn,
Zone::Batch,
b"batch_share",
spec.t(),
data.plan,
data.attempt,
data.data,
data.signed,
&data.signed,
) {
processors
.send(
@@ -280,13 +532,14 @@ async fn handle_block<D: Db, Pro: Processors>(
Transaction::SignPreprocess(data) => {
if let Some(preprocesses) = handle(
&mut txn,
Zone::Sign,
b"sign_preprocess",
spec.t(),
data.plan,
data.attempt,
data.data,
data.signed,
&data.signed,
) {
processors
.send(
@@ -301,13 +554,14 @@ async fn handle_block<D: Db, Pro: Processors>(
}
Transaction::SignShare(data) => {
if let Some(shares) = handle(
&mut txn,
Zone::Sign,
b"sign_share",
spec.t(),
data.plan,
data.attempt,
data.data,
data.signed,
&data.signed,
) {
processors
.send(
@@ -332,11 +586,17 @@ async fn handle_block<D: Db, Pro: Processors>(
// TODO: Trigger any necessary re-attempts
}
pub async fn handle_new_blocks<D: Db, Pro: Processors>(
pub async fn handle_new_blocks<
D: Db,
Pro: Processors,
F: Future<Output = ()>,
PST: Clone + Fn(ValidatorSet, Encoded) -> F,
>(
db: &mut TributaryDb<D>,
key: &Zeroizing<<Ristretto as Ciphersuite>::F>,
recognized_id: &UnboundedSender<([u8; 32], RecognizedIdType, [u8; 32])>,
processors: &Pro,
publish_serai_tx: PST,
spec: &TributarySpec,
tributary: &TributaryReader<D, Transaction>,
) {
@@ -344,7 +604,7 @@ pub async fn handle_new_blocks<D: Db, Pro: Processors>(
let mut last_block = db.last_block(genesis);
while let Some(next) = tributary.block_after(&last_block) {
let block = tributary.block(&next).unwrap();
handle_block(db, key, recognized_id, processors, spec, block).await;
handle_block(db, key, recognized_id, processors, publish_serai_tx.clone(), spec, block).await;
last_block = next;
db.set_last_block(genesis, next);
}