Break handle_processor_messages out to handle_processor_message, move a helper fn to substrate

Luke Parker
2023-10-13 23:36:07 -04:00
parent 80e5ca9328
commit 7275a95907
2 changed files with 606 additions and 591 deletions
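
In outline, the commit splits the old per-network message loop into a per-message handler plus a thin loop around it. A minimal sketch of the resulting shape, with signatures abridged and bodies elided (the real definitions are in the first file's diff below):

// Sketch only: the per-message handler reports whether the message was fully handled,
// and the per-network loop only acks on success, so an unhandled message is redelivered.
async fn handle_processor_message(/* db, key, serai, tributaries, network, msg */) -> bool {
  // Returns false when the relevant Tributary isn't available yet, letting the caller retry
  todo!()
}

async fn handle_processor_messages(/* db, key, serai, processors, network, new_tributary */) {
  loop {
    // 1. Drain newly created Tributaries into the local map
    // 2. Receive the next processor message for this network
    // 3. Only ack the message if handle_processor_message returned true
  }
}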

View File

@@ -18,7 +18,11 @@ use frost::Participant;
 use serai_db::{DbTxn, Db};
 use serai_env as env;
 
-use serai_client::{primitives::NetworkId, validator_sets::primitives::ValidatorSet, Public, Serai};
+use serai_client::{
+  primitives::NetworkId,
+  validator_sets::primitives::{Session, ValidatorSet},
+  Public, Serai,
+};
 
 use message_queue::{Service, client::MessageQueue};
 
@@ -108,19 +112,18 @@ async fn add_tributary<D: Db, Pro: Processors, P: P2p>(
 }
 
 async fn publish_signed_transaction<D: Db, P: P2p>(
-  db: &mut D,
+  txn: &mut D::Transaction<'_>,
   tributary: &Tributary<D, Transaction, P>,
   tx: Transaction,
 ) {
   log::debug!("publishing transaction {}", hex::encode(tx.hash()));
 
-  let mut txn = db.txn();
   let signer = if let TransactionKind::Signed(signed) = tx.kind() {
     let signer = signed.signer;
     // Safe as we should deterministically create transactions, meaning if this is already on-disk,
     // it's what we're saving now
-    MainDb::<D>::save_signed_transaction(&mut txn, signed.nonce, tx);
+    MainDb::<D>::save_signed_transaction(txn, signed.nonce, tx);
     signer
   } else {
@@ -130,7 +133,7 @@ async fn publish_signed_transaction<D: Db, P: P2p>(
   // If we're trying to publish 5, when the last transaction published was 3, this will delay
   // publication until the point in time we publish 4
   while let Some(tx) = MainDb::<D>::take_signed_transaction(
-    &mut txn,
+    txn,
     tributary
       .next_nonce(signer)
       .await
@@ -142,71 +145,20 @@ async fn publish_signed_transaction<D: Db, P: P2p>(
     // Our use case only calls this function sequentially
     assert!(tributary.add_transaction(tx).await, "created an invalid transaction");
   }
-  txn.commit();
 }
 
-/// Verifies `Batch`s which have already been indexed from Substrate.
-async fn verify_published_batches<D: Db>(
-  txn: &mut D::Transaction<'_>,
-  network: NetworkId,
-  optimistic_up_to: u32,
-) -> Option<u32> {
-  let last = MainDb::<D>::last_verified_batch(txn, network);
-  for id in last.map(|last| last + 1).unwrap_or(0) ..= optimistic_up_to {
-    let Some(on_chain) = SubstrateDb::<D>::batch_instructions_hash(txn, network, id) else {
-      break;
-    };
-    let off_chain = MainDb::<D>::expected_batch(txn, network, id).unwrap();
-    if on_chain != off_chain {
-      // Halt operations on this network and spin, as this is a critical fault
-      loop {
-        log::error!(
-          "{}! network: {:?} id: {} off-chain: {} on-chain: {}",
-          "on-chain batch doesn't match off-chain",
-          network,
-          id,
-          hex::encode(off_chain),
-          hex::encode(on_chain),
-        );
-        sleep(Duration::from_secs(60)).await;
-      }
-    }
-    MainDb::<D>::save_last_verified_batch(txn, network, id);
-  }
-
-  MainDb::<D>::last_verified_batch(txn, network)
-}
-
-async fn handle_processor_messages<D: Db, Pro: Processors, P: P2p>(
-  mut db: D,
-  key: Zeroizing<<Ristretto as Ciphersuite>::F>,
-  serai: Arc<Serai>,
-  mut processors: Pro,
-  network: NetworkId,
-  mut new_tributary: mpsc::UnboundedReceiver<ActiveTributary<D, P>>,
-) {
-  let mut db_clone = db.clone(); // Enables cloning the DB while we have a txn
-  let pub_key = Ristretto::generator() * key.deref();
-
-  let mut tributaries = HashMap::new();
-  loop {
-    match new_tributary.try_recv() {
-      Ok(tributary) => {
-        let set = tributary.spec.set();
-        assert_eq!(set.network, network);
-        tributaries.insert(set.session, tributary);
-      }
-      Err(mpsc::error::TryRecvError::Empty) => {}
-      Err(mpsc::error::TryRecvError::Disconnected) => {
-        panic!("handle_processor_messages new_tributary sender closed")
-      }
-    }
-
-    // TODO: Check this ID is sane (last handled ID or expected next ID)
-    let msg = processors.recv(network).await;
-    if !MainDb::<D>::handled_message(&db, msg.network, msg.id) {
-      let mut txn = db.txn();
-
-      let mut relevant_tributary = match &msg.msg {
+async fn handle_processor_message<D: Db, P: P2p>(
+  db: &mut D,
+  key: &Zeroizing<<Ristretto as Ciphersuite>::F>,
+  serai: &Serai,
+  tributaries: &HashMap<Session, ActiveTributary<D, P>>,
+  network: NetworkId,
+  msg: &processors::Message,
+) -> bool {
+  if MainDb::<D>::handled_message(db, msg.network, msg.id) {
+    return true;
+  }
+
+  let mut txn = db.txn();
+
+  let mut relevant_tributary = match &msg.msg {
@@ -299,7 +251,7 @@ async fn handle_processor_messages<D: Db, Pro: Processors, P: P2p>(
         // will be before we get a `SignedBatch`
         // It is, however, incremental
         // When we need a complete version, we use another call, continuously called as-needed
-        verify_published_batches::<D>(&mut txn, msg.network, this_batch_id).await;
+        substrate::verify_published_batches::<D>(&mut txn, msg.network, this_batch_id).await;
 
         None
       }
@@ -317,29 +269,7 @@ async fn handle_processor_messages<D: Db, Pro: Processors, P: P2p>(
         MainDb::<D>::save_batch(&mut txn, batch.clone());
 
         // Get the next-to-execute batch ID
-        async fn get_next(serai: &Serai, network: NetworkId) -> u32 {
-          let mut first = true;
-          loop {
-            if !first {
-              log::error!(
-                "{} {network:?}",
-                "couldn't connect to Serai node to get the next batch ID for",
-              );
-              sleep(Duration::from_secs(5)).await;
-            }
-            first = false;
-            let Ok(latest_block) = serai.get_latest_block().await else {
-              continue;
-            };
-            let Ok(last) = serai.get_last_batch_for_network(latest_block.hash(), network).await
-            else {
-              continue;
-            };
-            break if let Some(last) = last { last + 1 } else { 0 };
-          }
-        }
-
-        let mut next = get_next(&serai, network).await;
+        let mut next = substrate::get_expected_next_batch(serai, network).await;
 
         // Since we have a new batch, publish all batches yet to be published to Serai
         // This handles the edge-case where batch n+1 is signed before batch n is
@@ -353,16 +283,12 @@ async fn handle_processor_messages<D: Db, Pro: Processors, P: P2p>(
         let last_id = batches.back().map(|batch| batch.batch.id);
         while let Some(batch) = batches.pop_front() {
           // If this Batch should no longer be published, continue
-          if get_next(&serai, network).await > batch.batch.id {
+          if substrate::get_expected_next_batch(serai, network).await > batch.batch.id {
             continue;
           }
 
           let tx = Serai::execute_batch(batch.clone());
-          log::debug!(
-            "attempting to publish batch {:?} {}",
-            batch.batch.network,
-            batch.batch.id,
-          );
+          log::debug!("attempting to publish batch {:?} {}", batch.batch.network, batch.batch.id,);
           // This publish may fail if this transactions already exists in the mempool, which is
           // possible, or if this batch was already executed on-chain
           // Either case will have eventual resolution and be handled by the above check on if
@@ -390,7 +316,8 @@ async fn handle_processor_messages<D: Db, Pro: Processors, P: P2p>(
         // Verify the `Batch`s we just published
         if let Some(last_id) = last_id {
           loop {
-            let verified = verify_published_batches::<D>(&mut txn, msg.network, last_id).await;
+            let verified =
+              substrate::verify_published_batches::<D>(&mut txn, msg.network, last_id).await;
             if verified == Some(last_id) {
               break;
             }
@@ -410,6 +337,11 @@ async fn handle_processor_messages<D: Db, Pro: Processors, P: P2p>(
           // time, if we ever actually participate in a handover, we will verify *all*
           // prior `Batch`s, including the ones which would've been explicitly verified
           // then
+          //
+          // We should only declare this session relevant if it's relevant to us
+          // We only set handover `Batch`s when we're trying to produce said `Batch`, so this
+          // would be a `Batch` we were involved in the production of
+          // Accordingly, it's relevant
           relevant = Some(set.session);
         }
       }
@@ -419,11 +351,10 @@ async fn handle_processor_messages<D: Db, Pro: Processors, P: P2p>(
     },
   };
 
-  // If we have a relevant Tributary, check it's actually still relevant and has yet to be
-  // retired
+  // If we have a relevant Tributary, check it's actually still relevant and has yet to be retired
   if let Some(relevant_tributary_value) = relevant_tributary {
     if !is_active_set(
-      &serai,
+      serai,
       ValidatorSet { network: msg.network, session: relevant_tributary_value },
     )
     .await
@@ -436,30 +367,28 @@ async fn handle_processor_messages<D: Db, Pro: Processors, P: P2p>(
   if let Some(relevant_tributary) = relevant_tributary {
     // Make sure we have it
     // Per the reasoning above, we only return a Tributary as relevant if we're a participant
-    // Accordingly, we do *need* to have this Tributary now to handle it UNLESS the Tributary
-    // has already completed and this is simply an old message
+    // Accordingly, we do *need* to have this Tributary now to handle it UNLESS the Tributary has
+    // already completed and this is simply an old message (which we prior checked)
     let Some(ActiveTributary { spec, tributary }) = tributaries.get(&relevant_tributary) else {
-      // Since we don't, sleep for a fraction of a second and move to the next loop iteration
-      // At the start of the loop, we'll check for new tributaries, making this eventually
-      // resolve
+      // Since we don't, sleep for a fraction of a second and return false, signaling we didn't
+      // handle this message
+      // At the start of the loop which calls this function, we'll check for new tributaries, making
+      // this eventually resolve
       sleep(Duration::from_millis(100)).await;
-      continue;
+      return false;
     };
 
     let genesis = spec.genesis();
+    let pub_key = Ristretto::generator() * key.deref();
 
     let txs = match msg.msg.clone() {
       ProcessorMessage::KeyGen(inner_msg) => match inner_msg {
         key_gen::ProcessorMessage::Commitments { id, commitments } => {
-          vec![Transaction::DkgCommitments(
-            id.attempt,
-            commitments,
-            Transaction::empty_signed(),
-          )]
+          vec![Transaction::DkgCommitments(id.attempt, commitments, Transaction::empty_signed())]
         }
         key_gen::ProcessorMessage::Shares { id, mut shares } => {
           // Create a MuSig-based machine to inform Substrate of this key generation
-          let nonces = crate::tributary::dkg_confirmation_nonces(&key, spec, id.attempt);
+          let nonces = crate::tributary::dkg_confirmation_nonces(key, spec, id.attempt);
 
           let mut tx_shares = Vec::with_capacity(shares.len());
           for i in 1 ..= spec.n() {
@@ -471,9 +400,8 @@ async fn handle_processor_messages<D: Db, Pro: Processors, P: P2p>(
             {
               continue;
             }
-            tx_shares.push(
-              shares.remove(&i).expect("processor didn't send share for another validator"),
-            );
+            tx_shares
+              .push(shares.remove(&i).expect("processor didn't send share for another validator"));
           }
 
           vec![Transaction::DkgShares {
@@ -493,7 +421,7 @@ async fn handle_processor_messages<D: Db, Pro: Processors, P: P2p>(
           // Tell the Tributary the key pair, get back the share for the MuSig signature
           let share = crate::tributary::generated_key_pair::<D>(
             &mut txn,
-            &key,
+            key,
             spec,
             &(Public(substrate_key), network_key.try_into().unwrap()),
             id.attempt,
@@ -540,7 +468,7 @@ async fn handle_processor_messages<D: Db, Pro: Processors, P: P2p>(
             first_signer: pub_key,
             signature: SchnorrSignature { R, s: <Ristretto as Ciphersuite>::F::ZERO },
           };
-          let signed = SchnorrSignature::sign(&key, r, tx.sign_completed_challenge());
+          let signed = SchnorrSignature::sign(key, r, tx.sign_completed_challenge());
           match &mut tx {
             Transaction::SignCompleted { signature, .. } => {
               *signature = signed;
@@ -575,8 +503,11 @@ async fn handle_processor_messages<D: Db, Pro: Processors, P: P2p>(
           // Decrease by 1, to get the ID of the Batch prior to this Batch
           let prior_sets_last_batch = last_received - 1;
           loop {
-            let successfully_verified =
-              verify_published_batches::<D>(&mut txn, msg.network, prior_sets_last_batch)
+            let successfully_verified = substrate::verify_published_batches::<D>(
+              &mut txn,
+              msg.network,
+              prior_sets_last_batch,
+            )
             .await;
             if successfully_verified == Some(prior_sets_last_batch) {
               break;
@@ -680,10 +611,10 @@ async fn handle_processor_messages<D: Db, Pro: Processors, P: P2p>(
 
       log::trace!("getting next nonce for Tributary TX in response to processor message");
       let nonce = loop {
-        let Some(nonce) = NonceDecider::<D>::nonce(&txn, genesis, &tx)
-          .expect("signed TX didn't have nonce")
+        let Some(nonce) =
+          NonceDecider::<D>::nonce(&txn, genesis, &tx).expect("signed TX didn't have nonce")
         else {
-          // This can be None if:
+          // This can be None if the following events occur, in order:
           // 1) We scanned the relevant transaction(s) in a Tributary block
           // 2) The processor was sent a message and responded
           // 3) The Tributary TXN has yet to be committed
@@ -693,9 +624,9 @@ async fn handle_processor_messages<D: Db, Pro: Processors, P: P2p>(
        };
        break nonce;
      };
 
-      tx.sign(&mut OsRng, genesis, &key, nonce);
-      publish_signed_transaction(&mut db_clone, tributary, tx).await;
+      tx.sign(&mut OsRng, genesis, key, nonce);
+      publish_signed_transaction(&mut txn, tributary, tx).await;
    }
  }
 }
@@ -703,11 +634,39 @@ async fn handle_processor_messages<D: Db, Pro: Processors, P: P2p>(
   MainDb::<D>::save_handled_message(&mut txn, msg.network, msg.id);
   txn.commit();
 
+  true
 }
 
+async fn handle_processor_messages<D: Db, Pro: Processors, P: P2p>(
+  mut db: D,
+  key: Zeroizing<<Ristretto as Ciphersuite>::F>,
+  serai: Arc<Serai>,
+  mut processors: Pro,
+  network: NetworkId,
+  mut new_tributary: mpsc::UnboundedReceiver<ActiveTributary<D, P>>,
+) {
+  let mut tributaries = HashMap::new();
+  loop {
+    match new_tributary.try_recv() {
+      Ok(tributary) => {
+        let set = tributary.spec.set();
+        assert_eq!(set.network, network);
+        tributaries.insert(set.session, tributary);
+      }
+      Err(mpsc::error::TryRecvError::Empty) => {}
+      Err(mpsc::error::TryRecvError::Disconnected) => {
+        panic!("handle_processor_messages new_tributary sender closed")
+      }
+    }
+
+    // TODO: Check this ID is sane (last handled ID or expected next ID)
+    let msg = processors.recv(network).await;
+    if handle_processor_message(&mut db, &key, &serai, &tributaries, network, &msg).await {
       processors.ack(msg).await;
     }
   }
+}
 
 pub async fn handle_processors<D: Db, Pro: Processors, P: P2p>(
   db: D,
@@ -863,7 +822,9 @@ pub async fn run<D: Db, Pro: Processors, P: P2p>(
             // TODO: This may happen if the task above is simply slow
             panic!("tributary we don't have came to consensus on an Batch");
           };
-          publish_signed_transaction(&mut raw_db, tributary, tx).await;
+          let mut txn = raw_db.txn();
+          publish_signed_transaction(&mut txn, tributary, tx).await;
+          txn.commit();
         }
       }
     };
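
One consequence of the hunks above: publish_signed_transaction no longer opens and commits its own database transaction, so each call site now owns that lifecycle (as the run() hunk directly above shows). An illustrative caller-side pattern follows; the wrapper name publish_with_txn is invented for this sketch, and the types are the ones this file already uses:

// Sketch of the caller-side pattern after this commit (hypothetical helper; not in the diff).
async fn publish_with_txn<D: Db, P: P2p>(
  raw_db: &mut D,
  tributary: &Tributary<D, Transaction, P>,
  tx: Transaction,
) {
  let mut txn = raw_db.txn();
  // publish_signed_transaction only writes into `txn`; nothing persists until the commit below
  publish_signed_transaction(&mut txn, tributary, tx).await;
  txn.commit();
}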

View File

@@ -359,59 +359,6 @@ async fn handle_new_blocks<D: Db, CNT: Clone + Fn(&mut D, TributarySpec), Pro: P
   Ok(())
 }
 
-pub async fn is_active_set(serai: &Serai, set: ValidatorSet) -> bool {
-  // TODO: Track this from the Substrate scanner to reduce our overhead? We'd only have a DB
-  // call, instead of a series of network requests
-  let latest = loop {
-    let Ok(res) = serai.get_latest_block_hash().await else {
-      log::error!(
-        "couldn't get the latest block hash from serai when checking tributary relevancy"
-      );
-      sleep(Duration::from_secs(5)).await;
-      continue;
-    };
-    break res;
-  };
-
-  let latest_session = loop {
-    let Ok(res) = serai.get_session(set.network, latest).await else {
-      log::error!("couldn't get the latest session from serai when checking tributary relevancy");
-      sleep(Duration::from_secs(5)).await;
-      continue;
-    };
-    // If the on-chain Session is None, then this Session is greater and therefore, for the
-    // purposes here, active
-    let Some(res) = res else { return true };
-    break res;
-  };
-
-  if latest_session.0 > set.session.0 {
-    // If we're on the Session after the Session after this Session, then this Session is
-    // definitively completed
-    if latest_session.0 > (set.session.0 + 1) {
-      return false;
-    } else {
-      // Since the next session has started, check its handover status
-      let keys = loop {
-        let Ok(res) = serai.get_keys(set, latest).await else {
-          log::error!(
-            "couldn't get the keys for a session from serai when checking tributary relevancy"
-          );
-          sleep(Duration::from_secs(5)).await;
-          continue;
-        };
-        break res;
-      };
-      // If the keys have been deleted, then this Tributary is retired
-      if keys.is_none() {
-        return false;
-      }
-    }
-  }
-
-  true
-}
-
 pub async fn scan_task<D: Db, Pro: Processors>(
   db: D,
   key: Zeroizing<<Ristretto as Ciphersuite>::F>,
@@ -494,3 +441,110 @@ pub async fn scan_task<D: Db, Pro: Processors>(
     }
   }
 }
+
+/// Returns if a ValidatorSet has yet to be retired.
+pub async fn is_active_set(serai: &Serai, set: ValidatorSet) -> bool {
+  // TODO: Track this from the Substrate scanner to reduce our overhead? We'd only have a DB
+  // call, instead of a series of network requests
+  let latest = loop {
+    let Ok(res) = serai.get_latest_block_hash().await else {
+      log::error!(
+        "couldn't get the latest block hash from serai when checking tributary relevancy"
+      );
+      sleep(Duration::from_secs(5)).await;
+      continue;
+    };
+    break res;
+  };
+
+  let latest_session = loop {
+    let Ok(res) = serai.get_session(set.network, latest).await else {
+      log::error!("couldn't get the latest session from serai when checking tributary relevancy");
+      sleep(Duration::from_secs(5)).await;
+      continue;
+    };
+    // If the on-chain Session is None, then this Session is greater and therefore, for the
+    // purposes here, active
+    let Some(res) = res else { return true };
+    break res;
+  };
+
+  if latest_session.0 > set.session.0 {
+    // If we're on the Session after the Session after this Session, then this Session is
+    // definitively completed
+    if latest_session.0 > (set.session.0 + 1) {
+      return false;
+    } else {
+      // Since the next session has started, check its handover status
+      let keys = loop {
+        let Ok(res) = serai.get_keys(set, latest).await else {
+          log::error!(
+            "couldn't get the keys for a session from serai when checking tributary relevancy"
+          );
+          sleep(Duration::from_secs(5)).await;
+          continue;
+        };
+        break res;
+      };
+      // If the keys have been deleted, then this Tributary is retired
+      if keys.is_none() {
+        return false;
+      }
+    }
+  }
+
+  true
+}
+
+/// Gets the expected ID for the next Batch.
+pub(crate) async fn get_expected_next_batch(serai: &Serai, network: NetworkId) -> u32 {
+  let mut first = true;
+  loop {
+    if !first {
+      log::error!("{} {network:?}", "couldn't connect to Serai node to get the next batch ID for",);
+      sleep(Duration::from_secs(5)).await;
+    }
+    first = false;
+
+    let Ok(latest_block) = serai.get_latest_block().await else {
+      continue;
+    };
+    let Ok(last) = serai.get_last_batch_for_network(latest_block.hash(), network).await else {
+      continue;
+    };
+    break if let Some(last) = last { last + 1 } else { 0 };
+  }
+}
+
+/// Verifies `Batch`s which have already been indexed from Substrate.
+pub(crate) async fn verify_published_batches<D: Db>(
+  txn: &mut D::Transaction<'_>,
+  network: NetworkId,
+  optimistic_up_to: u32,
+) -> Option<u32> {
+  // TODO: Localize from MainDb to SubstrateDb
+  let last = crate::MainDb::<D>::last_verified_batch(txn, network);
+  for id in last.map(|last| last + 1).unwrap_or(0) ..= optimistic_up_to {
+    let Some(on_chain) = SubstrateDb::<D>::batch_instructions_hash(txn, network, id) else {
+      break;
+    };
+    let off_chain = crate::MainDb::<D>::expected_batch(txn, network, id).unwrap();
+    if on_chain != off_chain {
+      // Halt operations on this network and spin, as this is a critical fault
+      loop {
+        log::error!(
+          "{}! network: {:?} id: {} off-chain: {} on-chain: {}",
+          "on-chain batch doesn't match off-chain",
+          network,
+          id,
+          hex::encode(off_chain),
+          hex::encode(on_chain),
+        );
+        sleep(Duration::from_secs(60)).await;
+      }
+    }
+    crate::MainDb::<D>::save_last_verified_batch(txn, network, id);
+  }
+
+  crate::MainDb::<D>::last_verified_batch(txn, network)
+}
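
The retirement rule is_active_set encodes is easy to lose among the retry loops: a set stays active until the following session has both started and finished taking over the keys. A condensed restatement of just that decision follows; still_active and its parameters are illustrative stand-ins for the RPC results the real function fetches (and retries on error):

// Sketch only: pure restatement of the decision made by is_active_set.
fn still_active(set_session: u32, latest_session: Option<u32>, set_keys_still_exist: bool) -> bool {
  // No on-chain session yet: this set's session is still in the future, hence active
  let Some(latest_session) = latest_session else { return true };
  if latest_session > set_session {
    // Two or more sessions ahead: this session is definitively completed
    if latest_session > set_session + 1 {
      return false;
    }
    // Exactly one session ahead: retired once this set's keys have been deleted,
    // i.e. once the handover to the next session completed
    if !set_keys_still_exist {
      return false;
    }
  }
  true
}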