Add a cosigning protocol to ensure finalizations are unique (#433)

* Add a function to deterministically decide which Serai blocks should be co-signed

There is a 5-minute latency between co-signs, which is also used as the maximal
latency before a co-sign is started.
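
Concretely, with the 6-second Substrate block time, the 5-minute window maps to a
block distance, per the constant introduced later in this diff:

// A block is co-signed if it has events and it's been at least COSIGN_DISTANCE
// blocks since the last co-sign, if it's COSIGN_DISTANCE blocks after a skipped
// block which had events, or if it performs a key gen (changing the cosigners)
const COSIGN_DISTANCE: u64 = 5 * 60 / 6; // 5 minutes, expressed in blocks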

* Get all active tributaries we're in at a specific block

* Add and route CosignSubstrateBlock, a new provided TX

* Split queued cosigns per network

* Rename BatchSignId to SubstrateSignId

* Add SubstrateSignableId, a meta-type for either Batch or Block, and modularize around it

* Handle the CosignSubstrateBlock provided TX

* Revert substrate_signer.rs to develop (and patch to still work)

Due to SubstrateSigner moving when the prior multisig closes, yet cosigning
occurring with the most recent key, a single SubstrateSigner can be reused.
We could manage multiple SubstrateSigners, yet considering the much lower
specifications for cosigning, I'd rather treat it distinctly.

* Route cosigning through the processor

* Add note to rename SubstrateSigner post-PR

I don't want to do so now in order to preserve the diff's clarity.

* Implement cosign evaluation into the coordinator

* Get tests to compile

* Bug fixes, mark blocks without cosigners available as cosigned

* Correct the ID Batch preprocesses are saved under, add log statements

* Create a dedicated function to handle cosigns

* Correct the flow around Batch verification/queueing

Verifying `Batch`s could stall when a `Batch` was signed before its
predecessors/before the block it's contained in was cosigned (the latter being
inevitable as we can't sign a block containing a signed batch before signing
the batch).

Now, Batch verification happens on a distinct async task in order to not block
the handling of processor messages. This task is the sole caller of verify in
order to ensure last_verified_batch isn't unexpectedly mutated.

When the processor message handler needs to access it, or needs to queue a
Batch, it associates the DB TXN with a lock preventing the other task from
doing so.
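
Concretely, the lock lands in this diff as a process-wide tokio Mutex whose guard
is held while the TXN is open:

// TODO: Find a better pattern for this
static HANDOVER_VERIFY_QUEUE_LOCK: OnceLock<Mutex<()>> = OnceLock::new();

// Both tasks do this before opening their TXN
let _hvq_lock = HANDOVER_VERIFY_QUEUE_LOCK.get_or_init(|| Mutex::new(())).lock().await;
let mut txn = db.txn();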

This lock, as currently implemented, is a poor and inefficient design. It
should be modified to the pattern used for cosign management. Additionally, a
new primitive of a DB-backed channel may be immensely valuable.

Fixes a standing potential deadlock and a deadlock introduced with the
cosigning protocol.
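
A minimal sketch of what such a DB-backed channel could look like, assuming a plain
key-value store (the HashMap stands in for serai_db's actual types, and every name
here is hypothetical):

use std::collections::HashMap;

// Stand-in for a transactional key-value DB. In practice, send/recv would take a
// DbTxn, so queueing a message commits atomically with the work that produced it
// and survives restarts.
type Db = HashMap<Vec<u8>, Vec<u8>>;

struct DbChannel {
  prefix: &'static [u8],
}

impl DbChannel {
  fn counter_key(&self, name: &[u8]) -> Vec<u8> {
    let mut key = self.prefix.to_vec();
    key.extend(name);
    key
  }
  fn counter(&self, db: &Db, name: &[u8]) -> u64 {
    db.get(&self.counter_key(name))
      .map(|bytes| u64::from_le_bytes(bytes.as_slice().try_into().unwrap()))
      .unwrap_or(0)
  }
  fn message_key(&self, index: u64) -> Vec<u8> {
    let mut key = self.prefix.to_vec();
    key.extend(index.to_le_bytes());
    key
  }
  // Append a message to the queue as part of the producer's transaction
  fn send(&self, db: &mut Db, msg: Vec<u8>) {
    let index = self.counter(db, b"send");
    db.insert(self.message_key(index), msg);
    db.insert(self.counter_key(b"send"), (index + 1).to_le_bytes().to_vec());
  }
  // Pop the next message, if any, as part of the consumer's transaction
  fn recv(&self, db: &mut Db) -> Option<Vec<u8>> {
    let index = self.counter(db, b"recv");
    let msg = db.remove(&self.message_key(index))?;
    db.insert(self.counter_key(b"recv"), (index + 1).to_le_bytes().to_vec());
    Some(msg)
  }
}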

* Working full-stack tests

After the last commit, this only required extending a timeout.

* Replace "co-sign" with "cosign" to make finding text easier

* Update the coordinator tests to support cosigning

* Inline prior_batch calculation to prevent panic on rotation

Noticed when doing a final review of the branch.
Authored by Luke Parker on 2023-11-15 16:57:21 -05:00; committed by GitHub.
parent 79e4cce2f6 / commit 96f1d26f7a
29 changed files with 1900 additions and 348 deletions


@@ -0,0 +1,209 @@
use core::time::Duration;
use std::{
sync::{Arc, Mutex, RwLock},
collections::{HashSet, HashMap},
};
use tokio::{sync::mpsc, time::sleep};
use scale::Encode;
use sp_application_crypto::RuntimePublic;
use serai_client::{
primitives::{NETWORKS, NetworkId, Signature},
validator_sets::primitives::{Session, ValidatorSet},
SeraiError, Serai,
};
use serai_db::{DbTxn, Db};
use processor_messages::coordinator::cosign_block_msg;
use crate::{
p2p::{CosignedBlock, P2pMessageKind, P2p},
substrate::SubstrateDb,
};
pub struct CosignEvaluator<D: Db> {
db: Mutex<D>,
serai: Arc<Serai>,
stakes: RwLock<Option<HashMap<NetworkId, u64>>>,
latest_cosigns: RwLock<HashMap<NetworkId, (u64, CosignedBlock)>>,
}
impl<D: Db> CosignEvaluator<D> {
fn update_latest_cosign(&self) {
let stakes_lock = self.stakes.read().unwrap();
// If we haven't gotten the stake data yet, return
let Some(stakes) = stakes_lock.as_ref() else { return };
let total_stake = stakes.values().cloned().sum::<u64>();
let latest_cosigns = self.latest_cosigns.read().unwrap();
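// Find the highest block number such that the networks whose latest cosign is at
// least that block hold a supermajority (> 2/3) of the total allocated stake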
let mut highest_block = 0;
for (block_num, _) in latest_cosigns.values() {
let mut networks = HashSet::new();
for (network, (sub_block_num, _)) in &*latest_cosigns {
if sub_block_num >= block_num {
networks.insert(network);
}
}
let sum_stake =
networks.into_iter().map(|network| stakes.get(network).unwrap_or(&0)).sum::<u64>();
let needed_stake = ((total_stake * 2) / 3) + 1;
if (total_stake == 0) || (sum_stake > needed_stake) {
highest_block = highest_block.max(*block_num);
}
}
let mut db_lock = self.db.lock().unwrap();
let mut txn = db_lock.txn();
if highest_block > SubstrateDb::<D>::latest_cosigned_block(&txn) {
log::info!("setting latest cosigned block to {}", highest_block);
SubstrateDb::<D>::set_latest_cosigned_block(&mut txn, highest_block);
}
txn.commit();
}
async fn update_stakes(&self) -> Result<(), SeraiError> {
let serai = self.serai.as_of(self.serai.latest_block_hash().await?);
let mut stakes = HashMap::new();
for network in NETWORKS {
// Use whether this network has published a Batch as a short-circuit for whether they've ever set a key
let set_key = serai.in_instructions().last_batch_for_network(network).await?.is_some();
if set_key {
stakes.insert(
network,
serai
.validator_sets()
.total_allocated_stake(network)
.await?
.expect("network which published a batch didn't have a stake set")
.0,
);
}
}
// Since we've successfully built stakes, set it
*self.stakes.write().unwrap() = Some(stakes);
self.update_latest_cosign();
Ok(())
}
// Uses Err to signify a message should be retried
async fn handle_new_cosign(&self, cosign: CosignedBlock) -> Result<(), SeraiError> {
let Some(block) = self.serai.block(cosign.block).await? else {
log::warn!("received cosign for an unknown block");
return Ok(());
};
// If this is an old cosign, don't bother handling it
if block.number() <
self.latest_cosigns.read().unwrap().get(&cosign.network).map(|cosign| cosign.0).unwrap_or(0)
{
log::debug!("received old cosign from {:?}", cosign.network);
return Ok(());
}
// Get the key for this network as of the prior block
let serai = self.serai.as_of(block.header().parent_hash.into());
let Some(latest_session) = serai.validator_sets().session(cosign.network).await? else {
log::warn!("received cosign from {:?}, which doesn't yet have a session", cosign.network);
return Ok(());
};
let prior_session = Session(latest_session.0.saturating_sub(1));
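// Prefer the prior session's keys if they're still set: until its handover
// completes (clearing its keys), the prior set is the expected cosigner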
let set_with_keys = if serai
.validator_sets()
.keys(ValidatorSet { network: cosign.network, session: prior_session })
.await?
.is_some()
{
ValidatorSet { network: cosign.network, session: prior_session }
} else {
ValidatorSet { network: cosign.network, session: latest_session }
};
let Some(keys) = serai.validator_sets().keys(set_with_keys).await? else {
log::warn!("received cosign for a block we didn't have keys for");
return Ok(());
};
if !keys.0.verify(&cosign_block_msg(cosign.block), &Signature(cosign.signature)) {
log::warn!("received cosigned block with an invalid signature");
return Ok(());
}
log::info!("received cosign for block {} by {:?}", block.number(), cosign.network);
self.latest_cosigns.write().unwrap().insert(cosign.network, (block.number(), cosign));
self.update_latest_cosign();
Ok(())
}
#[allow(clippy::new_ret_no_self)]
pub fn new<P: P2p>(db: D, p2p: P, serai: Arc<Serai>) -> mpsc::UnboundedSender<CosignedBlock> {
let evaluator = Arc::new(Self {
db: Mutex::new(db),
serai,
stakes: RwLock::new(None),
latest_cosigns: RwLock::new(HashMap::new()),
});
// Spawn a task to update stakes regularly
tokio::spawn({
let evaluator = evaluator.clone();
async move {
loop {
// Run this until it passes
while evaluator.update_stakes().await.is_err() {
log::warn!("couldn't update stakes in the cosign evaluator");
// Try again in 10 seconds
sleep(Duration::from_secs(10)).await;
}
// Run it every 10 minutes as we don't need the exact stake data for this to be valid
sleep(Duration::from_secs(10 * 60)).await;
}
}
});
// Spawn a task to receive cosigns and handle them
let (send, mut recv) = mpsc::unbounded_channel();
tokio::spawn({
let evaluator = evaluator.clone();
async move {
while let Some(msg) = recv.recv().await {
while evaluator.handle_new_cosign(msg).await.is_err() {
// Try again in 10 seconds
sleep(Duration::from_secs(10)).await;
}
}
}
});
// Spawn a task to rebroadcast the most recent cosigns
tokio::spawn({
async move {
loop {
let cosigns = evaluator
.latest_cosigns
.read()
.unwrap()
.values()
.map(|cosign| cosign.1)
.collect::<Vec<_>>();
for cosign in cosigns {
P2p::broadcast(&p2p, P2pMessageKind::CosignedBlock, cosign.encode()).await;
}
sleep(Duration::from_secs(60)).await;
}
}
});
// Return the channel to send cosigns
send
}
}


@@ -1,6 +1,6 @@
use core::ops::Deref;
use std::{
sync::Arc,
sync::{OnceLock, Arc},
time::Duration,
collections::{VecDeque, HashSet, HashMap},
};
@@ -18,6 +18,7 @@ use frost::Participant;
use serai_db::{DbTxn, Db};
use serai_env as env;
use scale::Encode;
use serai_client::{
primitives::NetworkId,
validator_sets::primitives::{Session, ValidatorSet},
@@ -27,7 +28,7 @@ use serai_client::{
use message_queue::{Service, client::MessageQueue};
use tokio::{
sync::{RwLock, mpsc, broadcast},
sync::{Mutex, RwLock, mpsc, broadcast},
time::sleep,
};
@@ -46,13 +47,20 @@ use db::MainDb;
mod p2p;
pub use p2p::*;
use processor_messages::{key_gen, sign, coordinator, ProcessorMessage};
use processor_messages::{
key_gen, sign,
coordinator::{self, SubstrateSignableId},
ProcessorMessage,
};
pub mod processors;
use processors::Processors;
mod substrate;
use substrate::SubstrateDb;
use substrate::{CosignTransactions, SubstrateDb};
mod cosign_evaluator;
use cosign_evaluator::CosignEvaluator;
#[cfg(test)]
pub mod tests;
@@ -162,10 +170,16 @@ async fn publish_signed_transaction<D: Db, P: P2p>(
}
}
// TODO: Find a better pattern for this
static HANDOVER_VERIFY_QUEUE_LOCK: OnceLock<Mutex<()>> = OnceLock::new();
#[allow(clippy::too_many_arguments)]
async fn handle_processor_message<D: Db, P: P2p>(
db: &mut D,
key: &Zeroizing<<Ristretto as Ciphersuite>::F>,
serai: &Serai,
p2p: &P,
cosign_channel: &mpsc::UnboundedSender<CosignedBlock>,
tributaries: &HashMap<Session, ActiveTributary<D, P>>,
network: NetworkId,
msg: &processors::Message,
@@ -174,6 +188,7 @@ async fn handle_processor_message<D: Db, P: P2p>(
return true;
}
let _hvq_lock = HANDOVER_VERIFY_QUEUE_LOCK.get_or_init(|| Mutex::new(())).lock().await;
let mut txn = db.txn();
let mut relevant_tributary = match &msg.msg {
@@ -270,12 +285,29 @@ async fn handle_processor_message<D: Db, P: P2p>(
coordinator::ProcessorMessage::InvalidParticipant { id, .. } => {
Some(SubstrateDb::<D>::session_for_key(&txn, &id.key).unwrap())
}
coordinator::ProcessorMessage::CosignPreprocess { id, .. } => {
Some(SubstrateDb::<D>::session_for_key(&txn, &id.key).unwrap())
}
coordinator::ProcessorMessage::BatchPreprocess { id, .. } => {
Some(SubstrateDb::<D>::session_for_key(&txn, &id.key).unwrap())
}
coordinator::ProcessorMessage::BatchShare { id, .. } => {
coordinator::ProcessorMessage::SubstrateShare { id, .. } => {
Some(SubstrateDb::<D>::session_for_key(&txn, &id.key).unwrap())
}
coordinator::ProcessorMessage::CosignedBlock { block, signature } => {
let cosigned_block = CosignedBlock {
network,
block: *block,
signature: {
let mut arr = [0; 64];
arr.copy_from_slice(signature);
arr
},
};
cosign_channel.send(cosigned_block).unwrap();
P2p::broadcast(p2p, P2pMessageKind::CosignedBlock, cosigned_block.encode()).await;
None
}
},
// These don't return a relevant Tributary as there's no Tributary with action expected
ProcessorMessage::Substrate(inner_msg) => match inner_msg {
@@ -284,20 +316,7 @@ async fn handle_processor_message<D: Db, P: P2p>(
batch.network, msg.network,
"processor sent us a batch for a different network than it was for",
);
let this_batch_id = batch.id;
MainDb::<D>::save_expected_batch(&mut txn, batch);
// Re-define batch
// We can't drop it, yet it shouldn't be accidentally used in the following block
#[allow(clippy::let_unit_value, unused_variables)]
let batch = ();
// This won't be complete, as this call is when a `Batch` message is received, which
// will be before we get a `SignedBatch`
// It is, however, incremental
// When we need a complete version, we use another call, continuously called as-needed
substrate::verify_published_batches::<D>(&mut txn, msg.network, this_batch_id).await;
None
}
// If this is a new Batch, immediately publish it (if we can)
@@ -323,8 +342,6 @@ async fn handle_processor_message<D: Db, P: P2p>(
next += 1;
}
let start_id = batches.front().map(|batch| batch.batch.id);
let last_id = batches.back().map(|batch| batch.batch.id);
while let Some(batch) = batches.pop_front() {
// If this Batch should no longer be published, continue
if substrate::get_expected_next_batch(serai, network).await > batch.batch.id {
@@ -357,40 +374,8 @@ async fn handle_processor_message<D: Db, P: P2p>(
sleep(Duration::from_secs(5)).await;
}
}
// Verify the `Batch`s we just published
if let Some(last_id) = last_id {
loop {
let verified =
substrate::verify_published_batches::<D>(&mut txn, msg.network, last_id).await;
if verified == Some(last_id) {
break;
}
}
}
// Check if any of these `Batch`s were a handover `Batch`
// If so, we need to publish any delayed `Batch` provided transactions
let mut relevant = None;
if let Some(start_id) = start_id {
let last_id = last_id.unwrap();
for batch in start_id .. last_id {
if let Some(set) = MainDb::<D>::is_handover_batch(&txn, msg.network, batch) {
// relevant may already be Some. This is a safe over-write, as we don't need to
// be concerned for handovers of Tributaries which have completed their handovers
// While this does bypass the checks that Tributary would've performed at the
// time, if we ever actually participate in a handover, we will verify *all*
// prior `Batch`s, including the ones which would've been explicitly verified
// then
//
// We should only declare this session relevant if it's relevant to us
// We only set handover `Batch`s when we're trying to produce said `Batch`, so this
// would be a `Batch` we were involved in the production of
// Accordingly, it's relevant
relevant = Some(set.session);
}
}
}
relevant
None
}
},
};
@@ -598,10 +583,18 @@ async fn handle_processor_message<D: Db, P: P2p>(
// slash) and censor transactions (yet don't explicitly ban)
vec![]
}
coordinator::ProcessorMessage::CosignPreprocess { id, preprocesses } => {
vec![Transaction::SubstratePreprocess(SignData {
plan: id.id,
attempt: id.attempt,
data: preprocesses,
signed: Transaction::empty_signed(),
})]
}
coordinator::ProcessorMessage::BatchPreprocess { id, block, preprocesses } => {
log::info!(
"informed of batch (sign ID {}, attempt {}) for block {}",
hex::encode(id.id),
hex::encode(id.id.encode()),
id.attempt,
hex::encode(block),
);
@@ -613,69 +606,79 @@ async fn handle_processor_message<D: Db, P: P2p>(
&mut txn,
spec.set().network,
RecognizedIdType::Batch,
&id.id,
&{
let SubstrateSignableId::Batch(id) = id.id else {
panic!("BatchPreprocess SubstrateSignableId wasn't Batch")
};
id.encode()
},
preprocesses,
);
let intended = Transaction::Batch(
block.0,
match id.id {
SubstrateSignableId::Batch(id) => id,
_ => panic!("BatchPreprocess did not contain Batch ID"),
},
);
// If this is the new key's first Batch, only create this TX once we verify all
// prior published `Batch`s
// TODO: This assumes BatchPreprocess is immediately after Batch
// Ensure that assumption
let last_received = MainDb::<D>::last_received_batch(&txn, msg.network).unwrap();
let handover_batch = MainDb::<D>::handover_batch(&txn, spec.set());
if handover_batch.is_none() {
MainDb::<D>::set_handover_batch(&mut txn, spec.set(), last_received);
if last_received != 0 {
// Decrease by 1, to get the ID of the Batch prior to this Batch
let prior_sets_last_batch = last_received - 1;
// TODO: If we're looping here, we're not handling the messages we need to in order
// to create the Batch we're looking for
// Don't have the processor yield the handover batch until the batch before is
// acknowledged on-chain?
loop {
let successfully_verified = substrate::verify_published_batches::<D>(
&mut txn,
msg.network,
prior_sets_last_batch,
)
.await;
if successfully_verified == Some(prior_sets_last_batch) {
break;
let mut queue = false;
if let Some(handover_batch) = handover_batch {
// There is a race condition here. We may verify all `Batch`s from the prior set,
// start signing the handover `Batch` `n`, start signing `n+1`, have `n+1` signed
// before `n` (or at the same time), yet then the prior set forges a malicious
// `Batch` `n`.
//
// The malicious `Batch` `n` would be publishable to Serai, as Serai can't
// distinguish what's intended to be a handover `Batch`, yet then anyone could
// publish the new set's `n+1`, causing their acceptance of the handover.
//
// To fix this, if this is after the handover `Batch` and we have yet to verify
// publication of the handover `Batch`, don't yet yield the provided.
if last_received > handover_batch {
if let Some(last_verified) = MainDb::<D>::last_verified_batch(&txn, msg.network) {
if last_verified < handover_batch {
queue = true;
}
sleep(Duration::from_secs(5)).await;
} else {
queue = true;
}
}
} else {
MainDb::<D>::set_handover_batch(&mut txn, spec.set(), last_received);
// If this isn't the first batch, meaning we do have to verify all prior batches, and
// the prior Batch hasn't been verified yet...
if (last_received != 0) &&
MainDb::<D>::last_verified_batch(&txn, msg.network)
.map(|last_verified| last_verified < (last_received - 1))
.unwrap_or(true)
{
// Withhold this TX until we verify all prior `Batch`s
queue = true;
}
}
// There is a race condition here. We may verify all `Batch`s from the prior set,
// start signing the handover `Batch` `n`, start signing `n+1`, have `n+1` signed
// before `n` (or at the same time), yet then the prior set forges a malicious
// `Batch` `n`.
//
// The malicious `Batch` `n` would be publishable to Serai, as Serai can't
// distinguish what's intended to be a handover `Batch`, yet then anyone could
// publish the new set's `n+1`, causing their acceptance of the handover.
//
// To fix this, if this is after the handover `Batch` and we have yet to verify
// publication of the handover `Batch`, don't yet yield the provided.
let handover_batch = MainDb::<D>::handover_batch(&txn, spec.set()).unwrap();
let intended = Transaction::Batch(block.0, id.id);
let mut res = vec![intended.clone()];
if last_received > handover_batch {
if let Some(last_verified) = MainDb::<D>::last_verified_batch(&txn, msg.network) {
if last_verified < handover_batch {
res = vec![];
}
} else {
res = vec![];
}
}
if res.is_empty() {
if queue {
MainDb::<D>::queue_batch(&mut txn, spec.set(), intended);
vec![]
} else {
// Because this is post-verification of the handover batch, take all queued `Batch`s
// now to ensure we don't provide this before an already queued Batch
// This *may* be an unreachable case due to how last_verified_batch is set, yet it
// doesn't hurt to have as a defensive pattern
let mut res = MainDb::<D>::take_queued_batches(&mut txn, spec.set());
res.push(intended);
res
}
res
} else {
vec![Transaction::BatchPreprocess(SignData {
vec![Transaction::SubstratePreprocess(SignData {
plan: id.id,
attempt: id.attempt,
data: preprocesses,
@@ -683,24 +686,19 @@ async fn handle_processor_message<D: Db, P: P2p>(
})]
}
}
coordinator::ProcessorMessage::BatchShare { id, shares } => {
vec![Transaction::BatchShare(SignData {
coordinator::ProcessorMessage::SubstrateShare { id, shares } => {
vec![Transaction::SubstrateShare(SignData {
plan: id.id,
attempt: id.attempt,
data: shares.into_iter().map(|share| share.to_vec()).collect(),
signed: Transaction::empty_signed(),
})]
}
coordinator::ProcessorMessage::CosignedBlock { .. } => unreachable!(),
},
ProcessorMessage::Substrate(inner_msg) => match inner_msg {
processor_messages::substrate::ProcessorMessage::Batch { .. } => unreachable!(),
processor_messages::substrate::ProcessorMessage::SignedBatch { .. } => {
// We only reach here if this SignedBatch triggered the publication of a handover
// Batch
// Since the handover `Batch` was successfully published and verified, we no longer
// have to worry about the above n+1 attack
MainDb::<D>::take_queued_batches(&mut txn, spec.set())
}
processor_messages::substrate::ProcessorMessage::SignedBatch { .. } => unreachable!(),
},
};
@@ -766,11 +764,14 @@ async fn handle_processor_message<D: Db, P: P2p>(
true
}
#[allow(clippy::too_many_arguments)]
async fn handle_processor_messages<D: Db, Pro: Processors, P: P2p>(
mut db: D,
key: Zeroizing<<Ristretto as Ciphersuite>::F>,
serai: Arc<Serai>,
mut processors: Pro,
p2p: P,
cosign_channel: mpsc::UnboundedSender<CosignedBlock>,
network: NetworkId,
mut tributary_event: mpsc::UnboundedReceiver<TributaryEvent<D, P>>,
) {
@@ -794,10 +795,154 @@ async fn handle_processor_messages<D: Db, Pro: Processors, P: P2p>(
}
// TODO: Check this ID is sane (last handled ID or expected next ID)
let msg = processors.recv(network).await;
if handle_processor_message(&mut db, &key, &serai, &tributaries, network, &msg).await {
let Ok(msg) = tokio::time::timeout(Duration::from_secs(1), processors.recv(network)).await
else {
continue;
};
log::trace!("entering handle_processor_message for {:?}", network);
if handle_processor_message(
&mut db,
&key,
&serai,
&p2p,
&cosign_channel,
&tributaries,
network,
&msg,
)
.await
{
processors.ack(msg).await;
}
log::trace!("exited handle_processor_message for {:?}", network);
}
}
#[allow(clippy::too_many_arguments)]
async fn handle_cosigns_and_batch_publication<D: Db, P: P2p>(
mut db: D,
network: NetworkId,
mut tributary_event: mpsc::UnboundedReceiver<TributaryEvent<D, P>>,
) {
let mut tributaries = HashMap::new();
'outer: loop {
// TODO: Create a better async flow for this, as this does still hammer this task
tokio::task::yield_now().await;
match tributary_event.try_recv() {
Ok(event) => match event {
TributaryEvent::NewTributary(tributary) => {
let set = tributary.spec.set();
assert_eq!(set.network, network);
tributaries.insert(set.session, tributary);
}
TributaryEvent::TributaryRetired(set) => {
tributaries.remove(&set.session);
}
},
Err(mpsc::error::TryRecvError::Empty) => {}
Err(mpsc::error::TryRecvError::Disconnected) => {
panic!("handle_processor_messages tributary_event sender closed")
}
}
// Handle pending cosigns
while let Some((session, block, hash)) = CosignTransactions::peek_cosign(&db, network) {
let Some(ActiveTributary { spec, tributary }) = tributaries.get(&session) else {
log::warn!("didn't yet have tributary we're supposed to cosign with");
break;
};
log::info!(
"{network:?} {session:?} cosigning block #{block} (hash {}...)",
hex::encode(&hash[.. 8])
);
let tx = Transaction::CosignSubstrateBlock(hash);
let res = tributary.provide_transaction(tx.clone()).await;
if !(res.is_ok() || (res == Err(ProvidedError::AlreadyProvided))) {
if res == Err(ProvidedError::LocalMismatchesOnChain) {
// Spin, since this is a crit for this Tributary
loop {
log::error!(
"{}. tributary: {}, provided: {:?}",
"tributary added distinct CosignSubstrateBlock",
hex::encode(spec.genesis()),
&tx,
);
sleep(Duration::from_secs(60)).await;
}
}
panic!("provided an invalid CosignSubstrateBlock: {res:?}");
}
CosignTransactions::take_cosign(db.txn(), network);
}
// Verify any published `Batch`s
{
let _hvq_lock = HANDOVER_VERIFY_QUEUE_LOCK.get_or_init(|| Mutex::new(())).lock().await;
let mut txn = db.txn();
let mut to_publish = vec![];
let start_id = MainDb::<D>::last_verified_batch(&txn, network)
.map(|already_verified| already_verified + 1)
.unwrap_or(0);
if let Some(last_id) =
substrate::verify_published_batches::<D>(&mut txn, network, u32::MAX).await
{
// Check if any of these `Batch`s were a handover `Batch` or the `Batch` before a handover
// `Batch`
// If so, we need to publish queued provided `Batch` transactions
for batch in start_id ..= last_id {
let is_pre_handover = MainDb::<D>::is_handover_batch(&txn, network, batch + 1);
if let Some(set) = is_pre_handover {
let mut queued = MainDb::<D>::take_queued_batches(&mut txn, set);
// is_handover_batch is only set for handover `Batch`s we're participating in, making
// this safe
if queued.is_empty() {
panic!("knew the next Batch was a handover yet didn't queue it");
}
// Only publish the handover Batch
to_publish.push((set.session, queued.remove(0)));
// Re-queue the remaining batches
for remaining in queued {
MainDb::<D>::queue_batch(&mut txn, set, remaining);
}
}
let is_handover = MainDb::<D>::is_handover_batch(&txn, network, batch);
if let Some(set) = is_handover {
for queued in MainDb::<D>::take_queued_batches(&mut txn, set) {
to_publish.push((set.session, queued));
}
}
}
}
for (session, tx) in to_publish {
let Some(ActiveTributary { spec, tributary }) = tributaries.get(&session) else {
log::warn!("didn't yet have tributary we're supposed to provide a queued Batch for");
// Safe since this will drop the txn updating the most recently queued batch
continue 'outer;
};
let res = tributary.provide_transaction(tx.clone()).await;
if !(res.is_ok() || (res == Err(ProvidedError::AlreadyProvided))) {
if res == Err(ProvidedError::LocalMismatchesOnChain) {
// Spin, since this is a crit for this Tributary
loop {
log::error!(
"{}. tributary: {}, provided: {:?}",
"tributary added distinct Batch",
hex::encode(spec.genesis()),
&tx,
);
sleep(Duration::from_secs(60)).await;
}
}
panic!("provided an invalid Batch: {res:?}");
}
}
txn.commit();
}
}
}
@@ -806,6 +951,8 @@ pub async fn handle_processors<D: Db, Pro: Processors, P: P2p>(
key: Zeroizing<<Ristretto as Ciphersuite>::F>,
serai: Arc<Serai>,
processors: Pro,
p2p: P,
cosign_channel: mpsc::UnboundedSender<CosignedBlock>,
mut tributary_event: broadcast::Receiver<TributaryEvent<D, P>>,
) {
let mut channels = HashMap::new();
@@ -813,26 +960,34 @@ pub async fn handle_processors<D: Db, Pro: Processors, P: P2p>(
if network == NetworkId::Serai {
continue;
}
let (send, recv) = mpsc::unbounded_channel();
let (processor_send, processor_recv) = mpsc::unbounded_channel();
tokio::spawn(handle_processor_messages(
db.clone(),
key.clone(),
serai.clone(),
processors.clone(),
p2p.clone(),
cosign_channel.clone(),
network,
recv,
processor_recv,
));
channels.insert(network, send);
let (cosign_send, cosign_recv) = mpsc::unbounded_channel();
tokio::spawn(handle_cosigns_and_batch_publication(db.clone(), network, cosign_recv));
channels.insert(network, (processor_send, cosign_send));
}
// Listen to new tributary events
loop {
match tributary_event.recv().await.unwrap() {
TributaryEvent::NewTributary(tributary) => channels[&tributary.spec.set().network]
.send(TributaryEvent::NewTributary(tributary))
.unwrap(),
TributaryEvent::NewTributary(tributary) => {
let (c1, c2) = &channels[&tributary.spec.set().network];
c1.send(TributaryEvent::NewTributary(tributary.clone())).unwrap();
c2.send(TributaryEvent::NewTributary(tributary)).unwrap();
}
TributaryEvent::TributaryRetired(set) => {
channels[&set.network].send(TributaryEvent::TributaryRetired(set)).unwrap()
let (c1, c2) = &channels[&set.network];
c1.send(TributaryEvent::TributaryRetired(set)).unwrap();
c2.send(TributaryEvent::TributaryRetired(set)).unwrap();
}
};
}
@@ -944,6 +1099,7 @@ pub async fn run<D: Db, Pro: Processors, P: P2p>(
});
move |set: ValidatorSet, genesis, id_type, id: Vec<u8>, nonce| {
log::debug!("recognized ID {:?} {}", id_type, hex::encode(&id));
let mut raw_db = raw_db.clone();
let key = key.clone();
let tributaries = tributaries.clone();
@@ -956,6 +1112,7 @@ pub async fn run<D: Db, Pro: Processors, P: P2p>(
loop {
let Some(preprocess) = MainDb::<D>::first_preprocess(raw_db, set.network, id_type, id)
else {
log::warn!("waiting for preprocess for recognized ID");
sleep(Duration::from_millis(100)).await;
continue;
};
@@ -964,9 +1121,9 @@ pub async fn run<D: Db, Pro: Processors, P: P2p>(
};
let mut tx = match id_type {
RecognizedIdType::Batch => Transaction::BatchPreprocess(SignData {
RecognizedIdType::Batch => Transaction::SubstratePreprocess(SignData {
data: get_preprocess(&raw_db, id_type, &id).await,
plan: id.try_into().unwrap(),
plan: SubstrateSignableId::Batch(id.as_slice().try_into().unwrap()),
attempt: 0,
signed: Transaction::empty_signed(),
}),
@@ -1029,11 +1186,27 @@ pub async fn run<D: Db, Pro: Processors, P: P2p>(
// in a while (presumably because we're behind)
tokio::spawn(p2p::heartbeat_tributaries_task(p2p.clone(), tributary_event_listener_3));
// Create the Cosign evaluator
let cosign_channel = CosignEvaluator::new(raw_db.clone(), p2p.clone(), serai.clone());
// Handle P2P messages
tokio::spawn(p2p::handle_p2p_task(p2p, tributary_event_listener_4));
tokio::spawn(p2p::handle_p2p_task(
p2p.clone(),
cosign_channel.clone(),
tributary_event_listener_4,
));
// Handle all messages from processors
handle_processors(raw_db, key, serai, processors, tributary_event_listener_5).await;
handle_processors(
raw_db,
key,
serai,
processors,
p2p,
cosign_channel,
tributary_event_listener_5,
)
.await;
}
#[tokio::main]


@@ -8,6 +8,9 @@ use std::{
use async_trait::async_trait;
use scale::{Encode, Decode};
use serai_client::primitives::NetworkId;
use serai_db::Db;
use tokio::{
@@ -37,12 +40,20 @@ use crate::{Transaction, Block, Tributary, ActiveTributary, TributaryEvent};
// TODO: Use distinct topics
const LIBP2P_TOPIC: &str = "serai-coordinator";
#[derive(Clone, Copy, PartialEq, Eq, Hash, Debug, Encode, Decode)]
pub struct CosignedBlock {
pub network: NetworkId,
pub block: [u8; 32],
pub signature: [u8; 64],
}
#[derive(Clone, Copy, PartialEq, Eq, Hash, Debug)]
pub enum P2pMessageKind {
KeepAlive,
Tributary([u8; 32]),
Heartbeat([u8; 32]),
Block([u8; 32]),
CosignedBlock,
}
impl P2pMessageKind {
@@ -64,6 +75,9 @@ impl P2pMessageKind {
res.extend(genesis);
res
}
P2pMessageKind::CosignedBlock => {
vec![4]
}
}
}
@@ -87,6 +101,7 @@ impl P2pMessageKind {
reader.read_exact(&mut genesis).ok()?;
P2pMessageKind::Block(genesis)
}),
4 => Some(P2pMessageKind::CosignedBlock),
_ => None,
}
}
@@ -122,6 +137,7 @@ pub trait P2p: Send + Sync + Clone + fmt::Debug + TributaryP2p {
P2pMessageKind::Tributary(genesis) => format!("Tributary({})", hex::encode(genesis)),
P2pMessageKind::Heartbeat(genesis) => format!("Heartbeat({})", hex::encode(genesis)),
P2pMessageKind::Block(genesis) => format!("Block({})", hex::encode(genesis)),
P2pMessageKind::CosignedBlock => "CosignedBlock".to_string(),
}
);
self.broadcast_raw(actual_msg).await;
@@ -148,6 +164,7 @@ pub trait P2p: Send + Sync + Clone + fmt::Debug + TributaryP2p {
P2pMessageKind::Tributary(genesis) => format!("Tributary({})", hex::encode(genesis)),
P2pMessageKind::Heartbeat(genesis) => format!("Heartbeat({})", hex::encode(genesis)),
P2pMessageKind::Block(genesis) => format!("Block({})", hex::encode(genesis)),
P2pMessageKind::CosignedBlock => "CosignedBlock".to_string(),
}
);
Message { sender, kind, msg }
@@ -433,6 +450,7 @@ pub async fn heartbeat_tributaries_task<D: Db, P: P2p>(
pub async fn handle_p2p_task<D: Db, P: P2p>(
p2p: P,
cosign_channel: mpsc::UnboundedSender<CosignedBlock>,
mut tributary_event: broadcast::Receiver<TributaryEvent<D, P>>,
) {
let channels = Arc::new(RwLock::new(HashMap::<_, mpsc::UnboundedSender<Message<P>>>::new()));
@@ -562,6 +580,8 @@ pub async fn handle_p2p_task<D: Db, P: P2p>(
res
);
}
P2pMessageKind::CosignedBlock => unreachable!(),
}
}
}
@@ -596,6 +616,14 @@ pub async fn handle_p2p_task<D: Db, P: P2p>(
channel.send(msg).unwrap();
}
}
P2pMessageKind::CosignedBlock => {
let mut msg_ref: &[u8] = msg.msg.as_ref();
let Ok(msg) = CosignedBlock::decode(&mut scale::IoReader(&mut msg_ref)) else {
log::error!("received CosignedBlock message with invalidly serialized contents");
continue;
};
cosign_channel.send(msg).unwrap();
}
}
}
}


@@ -1,12 +1,79 @@
use std::sync::{OnceLock, MutexGuard, Mutex};
use scale::{Encode, Decode};
pub use serai_db::*;
use serai_client::{
primitives::NetworkId,
validator_sets::primitives::{Session, KeyPair},
validator_sets::primitives::{Session, ValidatorSet, KeyPair},
};
create_db! {
NewSubstrateDb {
CosignTriggered: () -> (),
IntendedCosign: () -> (u64, Option<u64>),
BlockHasEvents: (block: u64) -> u8,
CosignTransactions: (network: NetworkId) -> Vec<(Session, u64, [u8; 32])>
}
}
impl IntendedCosign {
pub fn set_intended_cosign(txn: &mut impl DbTxn, intended: u64) {
Self::set(txn, &(intended, None::<u64>));
}
pub fn set_skipped_cosign(txn: &mut impl DbTxn, skipped: u64) {
let (intended, prior_skipped) = Self::get(txn).unwrap();
assert!(prior_skipped.is_none());
Self::set(txn, &(intended, Some(skipped)));
}
}
// This lock guarantees:
// 1) Appended transactions are actually appended (not lost to a concurrent take)
// 2) Taking cosigns does not clear any TXs which weren't taken
// 3) Taking does actually clear the set
static COSIGN_LOCK: OnceLock<Mutex<()>> = OnceLock::new();
pub struct CosignTxn<T: DbTxn>(T, MutexGuard<'static, ()>);
impl<T: DbTxn> CosignTxn<T> {
pub fn new(txn: T) -> Self {
Self(txn, COSIGN_LOCK.get_or_init(|| Mutex::new(())).lock().unwrap())
}
pub fn commit(self) {
self.0.commit();
}
}
impl CosignTransactions {
// Append a cosign transaction.
pub fn append_cosign<T: DbTxn>(
txn: &mut CosignTxn<T>,
set: ValidatorSet,
number: u64,
hash: [u8; 32],
) {
#[allow(clippy::unwrap_or_default)]
let mut txs = CosignTransactions::get(&txn.0, set.network).unwrap_or(vec![]);
txs.push((set.session, number, hash));
CosignTransactions::set(&mut txn.0, set.network, &txs);
}
// Peek at the next cosign transaction.
pub fn peek_cosign(getter: &impl Get, network: NetworkId) -> Option<(Session, u64, [u8; 32])> {
let mut to_cosign = CosignTransactions::get(getter, network)?;
if to_cosign.is_empty() {
None?
}
Some(to_cosign.swap_remove(0))
}
// Take the next transaction, panicking if it doesn't exist.
pub fn take_cosign(mut txn: impl DbTxn, network: NetworkId) {
let _lock = COSIGN_LOCK.get_or_init(|| Mutex::new(())).lock().unwrap();
let mut txs = CosignTransactions::get(&txn, network).unwrap();
txs.remove(0);
CosignTransactions::set(&mut txn, network, &txs);
txn.commit();
}
}
#[derive(Debug)]
pub struct SubstrateDb<D: Db>(pub D);
impl<D: Db> SubstrateDb<D> {
@@ -18,16 +85,30 @@ impl<D: Db> SubstrateDb<D> {
D::key(b"coordinator_substrate", dst, key)
}
fn block_key() -> Vec<u8> {
Self::substrate_key(b"block", [])
fn next_block_key() -> Vec<u8> {
Self::substrate_key(b"next_block", [])
}
pub fn set_next_block(&mut self, block: u64) {
let mut txn = self.0.txn();
txn.put(Self::block_key(), block.to_le_bytes());
txn.put(Self::next_block_key(), block.to_le_bytes());
txn.commit();
}
pub fn next_block(&self) -> u64 {
u64::from_le_bytes(self.0.get(Self::block_key()).unwrap_or(vec![0; 8]).try_into().unwrap())
u64::from_le_bytes(self.0.get(Self::next_block_key()).unwrap_or(vec![0; 8]).try_into().unwrap())
}
fn latest_cosigned_block_key() -> Vec<u8> {
Self::substrate_key(b"latest_cosigned_block", [])
}
pub fn set_latest_cosigned_block(txn: &mut D::Transaction<'_>, latest_cosigned_block: u64) {
txn.put(Self::latest_cosigned_block_key(), latest_cosigned_block.to_le_bytes());
}
pub fn latest_cosigned_block<G: Get>(getter: &G) -> u64 {
let db = u64::from_le_bytes(
getter.get(Self::latest_cosigned_block_key()).unwrap_or(vec![0; 8]).try_into().unwrap(),
);
// Mark the genesis as cosigned
db.max(1)
}
fn event_key(id: &[u8], index: u32) -> Vec<u8> {


@@ -8,11 +8,12 @@ use zeroize::Zeroizing;
use ciphersuite::{group::GroupEncoding, Ciphersuite, Ristretto};
use scale::{Encode, Decode};
use serai_client::{
SeraiError, Block, Serai, TemporalSerai,
primitives::{BlockHash, NetworkId},
validator_sets::{
primitives::{ValidatorSet, KeyPair, amortize_excess_key_shares},
primitives::{Session, ValidatorSet, KeyPair, amortize_excess_key_shares},
ValidatorSetsEvent,
},
in_instructions::InInstructionsEvent,
@@ -363,12 +364,191 @@ async fn handle_new_blocks<D: Db, Pro: Processors>(
next_block: &mut u64,
) -> Result<(), SeraiError> {
// Check if there's been a new Substrate block
let latest = serai.latest_block().await?;
let latest_number = latest.number();
let latest_number = serai.latest_block().await?.number();
// TODO: If this block directly builds off a cosigned block *and* doesn't contain events, mark
// it as cosigned
// TODO: Can we remove any of these events while maintaining security?
{
// If:
// A) This block has events and it's been at least X blocks since the last cosign or
// B) This block doesn't have events but it's been X blocks since a skipped block which did
// have events or
// C) This block key gens (which changes who the cosigners are)
// cosign this block.
const COSIGN_DISTANCE: u64 = 5 * 60 / 6; // 5 minutes, expressed in blocks
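// For example: if block 1000 was the last intended cosign and block 1020 has
// events, block 1020 is within the distance and is only flagged as skipped; if no
// later block has events or a key gen, block 1070 (1020 + COSIGN_DISTANCE) is
// cosigned regardless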
#[derive(Clone, Copy, PartialEq, Eq, Debug, Encode, Decode)]
enum HasEvents {
KeyGen,
Yes,
No,
}
async fn block_has_events(
txn: &mut impl DbTxn,
serai: &Serai,
block: u64,
) -> Result<HasEvents, SeraiError> {
let cached = BlockHasEvents::get(txn, block);
match cached {
None => {
let serai = serai.as_of(
serai
.block_by_number(block)
.await?
.expect("couldn't get block which should've been finalized")
.hash(),
);
if !serai.validator_sets().key_gen_events().await?.is_empty() {
return Ok(HasEvents::KeyGen);
}
let has_no_events = serai.coins().burn_with_instruction_events().await?.is_empty() &&
serai.in_instructions().batch_events().await?.is_empty() &&
serai.validator_sets().new_set_events().await?.is_empty() &&
serai.validator_sets().set_retired_events().await?.is_empty();
let has_events = if has_no_events { HasEvents::No } else { HasEvents::Yes };
let encoded = has_events.encode();
assert_eq!(encoded.len(), 1);
BlockHasEvents::set(txn, block, &encoded[0]);
Ok(has_events)
}
Some(code) => Ok(HasEvents::decode(&mut [code].as_slice()).unwrap()),
}
}
let mut txn = db.0.txn();
let Some((last_intended_to_cosign_block, mut skipped_block)) = IntendedCosign::get(&txn) else {
IntendedCosign::set_intended_cosign(&mut txn, 1);
txn.commit();
return Ok(());
};
// If we haven't flagged skipped, and a block within the distance had events, flag the first
// such block as skipped
let mut distance_end_exclusive = last_intended_to_cosign_block + COSIGN_DISTANCE;
// If we've never triggered a cosign, don't skip any cosigns
if CosignTriggered::get(&txn).is_none() {
distance_end_exclusive = 0;
}
if skipped_block.is_none() {
for b in (last_intended_to_cosign_block + 1) .. distance_end_exclusive {
if b > latest_number {
break;
}
if block_has_events(&mut txn, serai, b).await? == HasEvents::Yes {
skipped_block = Some(b);
log::debug!("skipping cosigning {b} due to proximity to prior cosign");
IntendedCosign::set_skipped_cosign(&mut txn, b);
break;
}
}
}
let mut has_no_cosigners = None;
let mut cosign = vec![];
// Block we should cosign no matter what if no prior blocks qualified for cosigning
let maximally_latent_cosign_block =
skipped_block.map(|skipped_block| skipped_block + COSIGN_DISTANCE);
for block in (last_intended_to_cosign_block + 1) ..= latest_number {
let mut set = false;
let block_has_events = block_has_events(&mut txn, serai, block).await?;
// If this block is within the distance,
if block < distance_end_exclusive {
// and set a key, cosign it
if block_has_events == HasEvents::KeyGen {
IntendedCosign::set_intended_cosign(&mut txn, block);
set = true;
// Carry skipped if it isn't included by cosigning this block
if let Some(skipped) = skipped_block {
if skipped > block {
IntendedCosign::set_skipped_cosign(&mut txn, block);
}
}
}
} else if (Some(block) == maximally_latent_cosign_block) ||
(block_has_events != HasEvents::No)
{
// Since this block was outside the distance and had events/was maximally latent, cosign it
IntendedCosign::set_intended_cosign(&mut txn, block);
set = true;
}
if set {
// Get the keys as of the prior block
// That means if this block is setting new keys (which won't lock in until we process this
// block), we won't freeze up waiting for the yet-to-be-processed keys to sign this block
let actual_block = serai
.block_by_number(block)
.await?
.expect("couldn't get block which should've been finalized");
let serai = serai.as_of(actual_block.header().parent_hash.into());
has_no_cosigners = Some(actual_block.clone());
for network in serai_client::primitives::NETWORKS {
// Get the latest session to have set keys
let Some(latest_session) = serai.validator_sets().session(network).await? else {
continue;
};
let prior_session = Session(latest_session.0.saturating_sub(1));
let set_with_keys = if serai
.validator_sets()
.keys(ValidatorSet { network, session: prior_session })
.await?
.is_some()
{
ValidatorSet { network, session: prior_session }
} else {
let set = ValidatorSet { network, session: latest_session };
if serai.validator_sets().keys(set).await?.is_none() {
continue;
}
set
};
// Since this is a valid cosigner, don't flag this block as having no cosigners
has_no_cosigners = None;
log::debug!("{:?} will be cosigning {block}", set_with_keys.network);
if in_set(key, &serai, set_with_keys).await?.unwrap() {
cosign.push((set_with_keys, block, actual_block.hash()));
}
}
break;
}
}
// If this block doesn't have cosigners, yet does have events, automatically mark it as
// cosigned
if let Some(has_no_cosigners) = has_no_cosigners {
log::debug!("{} had no cosigners available, marking as cosigned", has_no_cosigners.number());
SubstrateDb::<D>::set_latest_cosigned_block(&mut txn, has_no_cosigners.number());
txn.commit();
} else {
CosignTriggered::set(&mut txn, &());
let mut txn = CosignTxn::new(txn);
for (set, block, hash) in cosign {
log::debug!("cosigning {block} with {:?} {:?}", set.network, set.session);
CosignTransactions::append_cosign(&mut txn, set, block, hash);
}
txn.commit();
}
}
// Reduce to the latest cosigned block
let latest_number = latest_number.min(SubstrateDb::<D>::latest_cosigned_block(&db.0));
if latest_number < *next_block {
return Ok(());
}
let mut latest = Some(latest);
for b in *next_block ..= latest_number {
log::info!("found substrate block {b}");
@@ -379,14 +559,10 @@ async fn handle_new_blocks<D: Db, Pro: Processors>(
tributary_retired,
processors,
serai,
if b == latest_number {
latest.take().unwrap()
} else {
serai
.block_by_number(b)
.await?
.expect("couldn't get block before the latest finalized block")
},
serai
.block_by_number(b)
.await?
.expect("couldn't get block before the latest finalized block"),
)
.await?;
*next_block += 1;
@@ -495,7 +671,9 @@ pub(crate) async fn get_expected_next_batch(serai: &Serai, network: NetworkId) -
/// Verifies `Batch`s which have already been indexed from Substrate.
///
/// This has a slight malleability in that it doesn't verify that *who* published a Batch is as expected.
/// Spins if a distinct `Batch` is detected on-chain.
///
/// This has a slight malleability in that it doesn't verify that *who* published a `Batch` is as expected.
/// This is deemed fine.
pub(crate) async fn verify_published_batches<D: Db>(
txn: &mut D::Transaction<'_>,


@@ -3,7 +3,10 @@ use std::sync::Arc;
use rand_core::OsRng;
use tokio::{sync::broadcast, time::sleep};
use tokio::{
sync::{mpsc, broadcast},
time::sleep,
};
use serai_db::MemDb;
@@ -32,7 +35,8 @@ async fn handle_p2p_test() {
let tributary = Arc::new(tributary);
tributary_arcs.push(tributary.clone());
let (new_tributary_send, new_tributary_recv) = broadcast::channel(5);
tokio::spawn(handle_p2p_task(p2p, new_tributary_recv));
let (cosign_send, _) = mpsc::unbounded_channel();
tokio::spawn(handle_p2p_task(p2p, cosign_send, new_tributary_recv));
new_tributary_send
.send(TributaryEvent::NewTributary(ActiveTributary { spec: spec.clone(), tributary }))
.map_err(|_| "failed to send ActiveTributary")


@@ -2,6 +2,9 @@ use core::fmt::Debug;
use rand_core::{RngCore, OsRng};
use scale::{Encode, Decode};
use processor_messages::coordinator::SubstrateSignableId;
use tributary::{ReadWrite, tests::random_signed};
use crate::tributary::{SignData, Transaction};
@@ -28,10 +31,10 @@ fn random_vec<R: RngCore>(rng: &mut R, limit: usize) -> Vec<u8> {
res
}
fn random_sign_data<R: RngCore, const N: usize>(rng: &mut R) -> SignData<N> {
let mut plan = [0; N];
rng.fill_bytes(&mut plan);
fn random_sign_data<R: RngCore, Id: Clone + PartialEq + Eq + Debug + Encode + Decode>(
rng: &mut R,
plan: Id,
) -> SignData<Id> {
SignData {
plan,
attempt: random_u32(&mut OsRng),
@@ -80,10 +83,18 @@ fn tx_size_limit() {
#[test]
fn serialize_sign_data() {
test_read_write(random_sign_data::<_, 3>(&mut OsRng));
test_read_write(random_sign_data::<_, 8>(&mut OsRng));
test_read_write(random_sign_data::<_, 16>(&mut OsRng));
test_read_write(random_sign_data::<_, 24>(&mut OsRng));
let mut plan = [0; 3];
OsRng.fill_bytes(&mut plan);
test_read_write(random_sign_data::<_, _>(&mut OsRng, plan));
let mut plan = [0; 5];
OsRng.fill_bytes(&mut plan);
test_read_write(random_sign_data::<_, _>(&mut OsRng, plan));
let mut plan = [0; 8];
OsRng.fill_bytes(&mut plan);
test_read_write(random_sign_data::<_, _>(&mut OsRng, plan));
let mut plan = [0; 24];
OsRng.fill_bytes(&mut plan);
test_read_write(random_sign_data::<_, _>(&mut OsRng, plan));
}
#[test]
@@ -168,6 +179,12 @@ fn serialize_transaction() {
random_signed(&mut OsRng),
));
{
let mut block = [0; 32];
OsRng.fill_bytes(&mut block);
test_read_write(Transaction::CosignSubstrateBlock(block));
}
{
let mut block = [0; 32];
OsRng.fill_bytes(&mut block);
@@ -177,11 +194,33 @@ fn serialize_transaction() {
}
test_read_write(Transaction::SubstrateBlock(OsRng.next_u64()));
test_read_write(Transaction::BatchPreprocess(random_sign_data(&mut OsRng)));
test_read_write(Transaction::BatchShare(random_sign_data(&mut OsRng)));
{
let mut plan = [0; 5];
OsRng.fill_bytes(&mut plan);
test_read_write(Transaction::SubstratePreprocess(random_sign_data(
&mut OsRng,
SubstrateSignableId::Batch(plan),
)));
}
{
let mut plan = [0; 5];
OsRng.fill_bytes(&mut plan);
test_read_write(Transaction::SubstrateShare(random_sign_data(
&mut OsRng,
SubstrateSignableId::Batch(plan),
)));
}
test_read_write(Transaction::SignPreprocess(random_sign_data(&mut OsRng)));
test_read_write(Transaction::SignShare(random_sign_data(&mut OsRng)));
{
let mut plan = [0; 32];
OsRng.fill_bytes(&mut plan);
test_read_write(Transaction::SignPreprocess(random_sign_data(&mut OsRng, plan)));
}
{
let mut plan = [0; 32];
OsRng.fill_bytes(&mut plan);
test_read_write(Transaction::SignShare(random_sign_data(&mut OsRng, plan)));
}
{
let mut plan = [0; 32];


@@ -5,7 +5,10 @@ use rand_core::OsRng;
use ciphersuite::{group::GroupEncoding, Ciphersuite, Ristretto};
use tokio::{sync::broadcast, time::sleep};
use tokio::{
sync::{mpsc, broadcast},
time::sleep,
};
use serai_db::MemDb;
@@ -42,7 +45,8 @@ async fn sync_test() {
let tributary = Arc::new(tributary);
tributary_arcs.push(tributary.clone());
let (new_tributary_send, new_tributary_recv) = broadcast::channel(5);
let thread = tokio::spawn(handle_p2p_task(p2p, new_tributary_recv));
let (cosign_send, _) = mpsc::unbounded_channel();
let thread = tokio::spawn(handle_p2p_task(p2p, cosign_send, new_tributary_recv));
new_tributary_send
.send(TributaryEvent::NewTributary(ActiveTributary { spec: spec.clone(), tributary }))
.map_err(|_| "failed to send ActiveTributary")
@@ -77,7 +81,8 @@ async fn sync_test() {
let syncer_key = Ristretto::generator() * *syncer_key;
let syncer_tributary = Arc::new(syncer_tributary);
let (syncer_tributary_send, syncer_tributary_recv) = broadcast::channel(5);
tokio::spawn(handle_p2p_task(syncer_p2p.clone(), syncer_tributary_recv));
let (cosign_send, _) = mpsc::unbounded_channel();
tokio::spawn(handle_p2p_task(syncer_p2p.clone(), cosign_send, syncer_tributary_recv));
syncer_tributary_send
.send(TributaryEvent::NewTributary(ActiveTributary {
spec: spec.clone(),


@@ -9,6 +9,8 @@ use frost::Participant;
use serai_client::validator_sets::primitives::{ValidatorSet, KeyPair};
use processor_messages::coordinator::SubstrateSignableId;
pub use serai_db::*;
use crate::tributary::TributarySpec;
@@ -16,16 +18,21 @@ use crate::tributary::TributarySpec;
#[derive(Clone, Copy, PartialEq, Eq, Debug)]
pub enum Topic {
Dkg,
Batch([u8; 5]),
SubstrateSign(SubstrateSignableId),
Sign([u8; 32]),
}
impl Topic {
fn as_key(&self, genesis: [u8; 32]) -> Vec<u8> {
let mut res = genesis.to_vec();
#[allow(unused_assignments)] // False positive
let mut id_buf = vec![];
let (label, id) = match self {
Topic::Dkg => (b"dkg".as_slice(), [].as_slice()),
Topic::Batch(id) => (b"batch".as_slice(), id.as_slice()),
Topic::SubstrateSign(id) => {
id_buf = id.encode();
(b"substrate_sign".as_slice(), id_buf.as_slice())
}
Topic::Sign(id) => (b"sign".as_slice(), id.as_slice()),
};
res.push(u8::try_from(label.len()).unwrap());


@@ -18,7 +18,7 @@ use tributary::{Signed, TransactionKind, TransactionTrait};
use processor_messages::{
key_gen::{self, KeyGenId},
coordinator::{self, BatchSignId},
coordinator::{self, SubstrateSignableId, SubstrateSignId},
sign::{self, SignId},
};
@@ -498,10 +498,50 @@ pub(crate) async fn handle_application_tx<
}
}
Transaction::CosignSubstrateBlock(hash) => {
TributaryDb::<D>::recognize_topic(
txn,
genesis,
Topic::SubstrateSign(SubstrateSignableId::CosigningSubstrateBlock(hash)),
);
NonceDecider::handle_substrate_signable(
txn,
genesis,
SubstrateSignableId::CosigningSubstrateBlock(hash),
);
let key = loop {
let Some(key_pair) = TributaryDb::<D>::key_pair(txn, spec.set()) else {
// This can happen based on a timing condition
log::warn!("CosignSubstrateBlock yet keys weren't set yet");
tokio::time::sleep(core::time::Duration::from_secs(1)).await;
continue;
};
break key_pair.0.into();
};
processors
.send(
spec.set().network,
coordinator::CoordinatorMessage::CosignSubstrateBlock {
id: SubstrateSignId {
key,
id: SubstrateSignableId::CosigningSubstrateBlock(hash),
attempt: 0,
},
},
)
.await;
}
Transaction::Batch(_, batch) => {
// Because this Batch has achieved synchrony, its batch ID should be authorized
TributaryDb::<D>::recognize_topic(txn, genesis, Topic::Batch(batch));
let nonce = NonceDecider::handle_batch(txn, genesis, batch);
TributaryDb::<D>::recognize_topic(
txn,
genesis,
Topic::SubstrateSign(SubstrateSignableId::Batch(batch)),
);
let nonce =
NonceDecider::handle_substrate_signable(txn, genesis, SubstrateSignableId::Batch(batch));
recognized_id(spec.set(), genesis, RecognizedIdType::Batch, batch.to_vec(), nonce).await;
}
@@ -518,14 +558,14 @@ pub(crate) async fn handle_application_tx<
}
}
Transaction::BatchPreprocess(data) => {
Transaction::SubstratePreprocess(data) => {
let Ok(_) = check_sign_data_len::<D>(txn, spec, data.signed.signer, data.data.len()) else {
return;
};
match handle(
txn,
&DataSpecification {
topic: Topic::Batch(data.plan),
topic: Topic::SubstrateSign(data.plan),
label: BATCH_PREPROCESS,
attempt: data.attempt,
},
@@ -534,13 +574,13 @@ pub(crate) async fn handle_application_tx<
) {
Accumulation::Ready(DataSet::Participating(mut preprocesses)) => {
unflatten(spec, &mut preprocesses);
NonceDecider::selected_for_signing_batch(txn, genesis, data.plan);
NonceDecider::selected_for_signing_substrate(txn, genesis, data.plan);
let key = TributaryDb::<D>::key_pair(txn, spec.set()).unwrap().0 .0;
processors
.send(
spec.set().network,
coordinator::CoordinatorMessage::BatchPreprocesses {
id: BatchSignId { key, id: data.plan, attempt: data.attempt },
coordinator::CoordinatorMessage::SubstratePreprocesses {
id: SubstrateSignId { key, id: data.plan, attempt: data.attempt },
preprocesses,
},
)
@@ -550,14 +590,14 @@ pub(crate) async fn handle_application_tx<
Accumulation::NotReady => {}
}
}
Transaction::BatchShare(data) => {
Transaction::SubstrateShare(data) => {
let Ok(_) = check_sign_data_len::<D>(txn, spec, data.signed.signer, data.data.len()) else {
return;
};
match handle(
txn,
&DataSpecification {
topic: Topic::Batch(data.plan),
topic: Topic::SubstrateSign(data.plan),
label: BATCH_SHARE,
attempt: data.attempt,
},
@@ -570,8 +610,8 @@ pub(crate) async fn handle_application_tx<
processors
.send(
spec.set().network,
coordinator::CoordinatorMessage::BatchShares {
id: BatchSignId { key, id: data.plan, attempt: data.attempt },
coordinator::CoordinatorMessage::SubstrateShares {
id: SubstrateSignId { key, id: data.plan, attempt: data.attempt },
shares: shares
.into_iter()
.map(|(validator, share)| (validator, share.try_into().unwrap()))


@@ -1,4 +1,7 @@
use core::ops::{Deref, Range};
use core::{
ops::{Deref, Range},
fmt::Debug,
};
use std::io::{self, Read, Write};
use zeroize::Zeroizing;
@@ -15,6 +18,7 @@ use schnorr::SchnorrSignature;
use frost::Participant;
use scale::{Encode, Decode};
use processor_messages::coordinator::SubstrateSignableId;
use serai_client::{
primitives::{NetworkId, PublicKey},
@@ -167,8 +171,8 @@ impl TributarySpec {
}
#[derive(Clone, PartialEq, Eq, Debug)]
pub struct SignData<const N: usize> {
pub plan: [u8; N],
pub struct SignData<Id: Clone + PartialEq + Eq + Debug + Encode + Decode> {
pub plan: Id,
pub attempt: u32,
pub data: Vec<Vec<u8>>,
@@ -176,10 +180,10 @@ pub struct SignData<const N: usize> {
pub signed: Signed,
}
impl<const N: usize> ReadWrite for SignData<N> {
impl<Id: Clone + PartialEq + Eq + Debug + Encode + Decode> ReadWrite for SignData<Id> {
fn read<R: io::Read>(reader: &mut R) -> io::Result<Self> {
let mut plan = [0; N];
reader.read_exact(&mut plan)?;
let plan = Id::decode(&mut scale::IoReader(&mut *reader))
.map_err(|_| io::Error::new(io::ErrorKind::Other, "invalid plan in SignData"))?;
let mut attempt = [0; 4];
reader.read_exact(&mut attempt)?;
@@ -208,7 +212,7 @@ impl<const N: usize> ReadWrite for SignData<N> {
}
fn write<W: io::Write>(&self, writer: &mut W) -> io::Result<()> {
writer.write_all(&self.plan)?;
writer.write_all(&self.plan.encode())?;
writer.write_all(&self.attempt.to_le_bytes())?;
writer.write_all(&[u8::try_from(self.data.len()).unwrap()])?;
@@ -253,6 +257,9 @@ pub enum Transaction {
},
DkgConfirmed(u32, [u8; 32], Signed),
// Co-sign a Substrate block.
CosignSubstrateBlock([u8; 32]),
// When we have synchrony on a batch, we can allow signing it
// TODO (never?): This is less efficient compared to an ExternalBlock provided transaction,
// which would be binding over the block hash and automatically achieve synchrony on all
@@ -263,11 +270,11 @@ pub enum Transaction {
// IDs
SubstrateBlock(u64),
BatchPreprocess(SignData<5>),
BatchShare(SignData<5>),
SubstratePreprocess(SignData<SubstrateSignableId>),
SubstrateShare(SignData<SubstrateSignableId>),
SignPreprocess(SignData<32>),
SignShare(SignData<32>),
SignPreprocess(SignData<[u8; 32]>),
SignShare(SignData<[u8; 32]>),
// This is defined as an Unsigned transaction in order to de-duplicate SignCompleted amongst
// reporters (who should all report the same thing)
// We do still track the signer in order to prevent a single signer from publishing arbitrarily
@@ -415,6 +422,12 @@ impl ReadWrite for Transaction {
}
5 => {
let mut block = [0; 32];
reader.read_exact(&mut block)?;
Ok(Transaction::CosignSubstrateBlock(block))
}
6 => {
let mut block = [0; 32];
reader.read_exact(&mut block)?;
let mut batch = [0; 5];
@@ -422,19 +435,19 @@ impl ReadWrite for Transaction {
Ok(Transaction::Batch(block, batch))
}
6 => {
7 => {
let mut block = [0; 8];
reader.read_exact(&mut block)?;
Ok(Transaction::SubstrateBlock(u64::from_le_bytes(block)))
}
7 => SignData::read(reader).map(Transaction::BatchPreprocess),
8 => SignData::read(reader).map(Transaction::BatchShare),
8 => SignData::read(reader).map(Transaction::SubstratePreprocess),
9 => SignData::read(reader).map(Transaction::SubstrateShare),
9 => SignData::read(reader).map(Transaction::SignPreprocess),
10 => SignData::read(reader).map(Transaction::SignShare),
10 => SignData::read(reader).map(Transaction::SignPreprocess),
11 => SignData::read(reader).map(Transaction::SignShare),
11 => {
12 => {
let mut plan = [0; 32];
reader.read_exact(&mut plan)?;
@@ -534,36 +547,41 @@ impl ReadWrite for Transaction {
signed.write(writer)
}
Transaction::Batch(block, batch) => {
Transaction::CosignSubstrateBlock(block) => {
writer.write_all(&[5])?;
writer.write_all(block)
}
Transaction::Batch(block, batch) => {
writer.write_all(&[6])?;
writer.write_all(block)?;
writer.write_all(batch)
}
Transaction::SubstrateBlock(block) => {
writer.write_all(&[6])?;
writer.write_all(&[7])?;
writer.write_all(&block.to_le_bytes())
}
Transaction::BatchPreprocess(data) => {
writer.write_all(&[7])?;
Transaction::SubstratePreprocess(data) => {
writer.write_all(&[8])?;
data.write(writer)
}
Transaction::BatchShare(data) => {
writer.write_all(&[8])?;
Transaction::SubstrateShare(data) => {
writer.write_all(&[9])?;
data.write(writer)
}
Transaction::SignPreprocess(data) => {
writer.write_all(&[9])?;
data.write(writer)
}
Transaction::SignShare(data) => {
writer.write_all(&[10])?;
data.write(writer)
}
Transaction::SignCompleted { plan, tx_hash, first_signer, signature } => {
Transaction::SignShare(data) => {
writer.write_all(&[11])?;
data.write(writer)
}
Transaction::SignCompleted { plan, tx_hash, first_signer, signature } => {
writer.write_all(&[12])?;
writer.write_all(plan)?;
writer
.write_all(&[u8::try_from(tx_hash.len()).expect("tx hash length exceeds 255 bytes")])?;
@@ -585,11 +603,13 @@ impl TransactionTrait for Transaction {
Transaction::InvalidDkgShare { signed, .. } => TransactionKind::Signed(signed),
Transaction::DkgConfirmed(_, _, signed) => TransactionKind::Signed(signed),
Transaction::CosignSubstrateBlock(_) => TransactionKind::Provided("cosign"),
Transaction::Batch(_, _) => TransactionKind::Provided("batch"),
Transaction::SubstrateBlock(_) => TransactionKind::Provided("serai"),
Transaction::BatchPreprocess(data) => TransactionKind::Signed(&data.signed),
Transaction::BatchShare(data) => TransactionKind::Signed(&data.signed),
Transaction::SubstratePreprocess(data) => TransactionKind::Signed(&data.signed),
Transaction::SubstrateShare(data) => TransactionKind::Signed(&data.signed),
Transaction::SignPreprocess(data) => TransactionKind::Signed(&data.signed),
Transaction::SignShare(data) => TransactionKind::Signed(&data.signed),
@@ -607,7 +627,7 @@ impl TransactionTrait for Transaction {
}
fn verify(&self) -> Result<(), TransactionError> {
if let Transaction::BatchShare(data) = self {
if let Transaction::SubstrateShare(data) = self {
for data in &data.data {
if data.len() != 32 {
Err(TransactionError::InvalidContent)?;
@@ -655,11 +675,13 @@ impl Transaction {
Transaction::InvalidDkgShare { ref mut signed, .. } => signed,
Transaction::DkgConfirmed(_, _, ref mut signed) => signed,
Transaction::CosignSubstrateBlock(_) => panic!("signing CosignSubstrateBlock"),
Transaction::Batch(_, _) => panic!("signing Batch"),
Transaction::SubstrateBlock(_) => panic!("signing SubstrateBlock"),
Transaction::BatchPreprocess(ref mut data) => &mut data.signed,
Transaction::BatchShare(ref mut data) => &mut data.signed,
Transaction::SubstratePreprocess(ref mut data) => &mut data.signed,
Transaction::SubstrateShare(ref mut data) => &mut data.signed,
Transaction::SignPreprocess(ref mut data) => &mut data.signed,
Transaction::SignShare(ref mut data) => &mut data.signed,


@@ -1,11 +1,13 @@
use serai_db::{Get, DbTxn, create_db};
use processor_messages::coordinator::SubstrateSignableId;
use crate::tributary::Transaction;
use scale::Encode;
const BATCH_CODE: u8 = 0;
const BATCH_SIGNING_CODE: u8 = 1;
const SUBSTRATE_CODE: u8 = 0;
const SUBSTRATE_SIGNING_CODE: u8 = 1;
const PLAN_CODE: u8 = 2;
const PLAN_SIGNING_CODE: u8 = 3;
@@ -30,9 +32,13 @@ impl NextNonceDb {
/// transactions in response. Enables rebooting/rebuilding validators with full safety.
pub struct NonceDecider;
impl NonceDecider {
pub fn handle_batch(txn: &mut impl DbTxn, genesis: [u8; 32], batch: [u8; 5]) -> u32 {
pub fn handle_substrate_signable(
txn: &mut impl DbTxn,
genesis: [u8; 32],
id: SubstrateSignableId,
) -> u32 {
let nonce_for = NextNonceDb::allocate_nonce(txn, genesis);
ItemNonceDb::set(txn, genesis, BATCH_CODE, &batch, &nonce_for);
ItemNonceDb::set(txn, genesis, SUBSTRATE_CODE, &id.encode(), &nonce_for);
nonce_for
}
@@ -53,12 +59,16 @@ impl NonceDecider {
// TODO: The processor won't yield shares for this if the signing protocol aborts. We need to
// detect when we're expecting shares for an aborted protocol and insert a dummy transaction
// there.
pub fn selected_for_signing_batch(txn: &mut impl DbTxn, genesis: [u8; 32], batch: [u8; 5]) {
pub fn selected_for_signing_substrate(
txn: &mut impl DbTxn,
genesis: [u8; 32],
id: SubstrateSignableId,
) {
let nonce_for = NextNonceDb::allocate_nonce(txn, genesis);
ItemNonceDb::set(txn, genesis, BATCH_SIGNING_CODE, &batch, &nonce_for);
ItemNonceDb::set(txn, genesis, SUBSTRATE_SIGNING_CODE, &id.encode(), &nonce_for);
}
// TODO: Same TODO as selected_for_signing_batch
// TODO: Same TODO as selected_for_signing_substrate
pub fn selected_for_signing_plan(txn: &mut impl DbTxn, genesis: [u8; 32], plan: [u8; 32]) {
let nonce_for = NextNonceDb::allocate_nonce(txn, genesis);
ItemNonceDb::set(txn, genesis, PLAN_SIGNING_CODE, &plan, &nonce_for);
@@ -86,23 +96,26 @@ impl NonceDecider {
assert_eq!(*attempt, 0);
Some(Some(2))
}
Transaction::CosignSubstrateBlock(_) => None,
Transaction::Batch(_, _) => None,
Transaction::SubstrateBlock(_) => None,
Transaction::BatchPreprocess(data) => {
Transaction::SubstratePreprocess(data) => {
assert_eq!(data.attempt, 0);
Some(ItemNonceDb::get(getter, genesis, BATCH_CODE, &data.plan))
Some(ItemNonceDb::get(getter, genesis, SUBSTRATE_CODE, &data.plan.encode()))
}
Transaction::BatchShare(data) => {
Transaction::SubstrateShare(data) => {
assert_eq!(data.attempt, 0);
Some(ItemNonceDb::get(getter, genesis, BATCH_SIGNING_CODE, &data.plan))
Some(ItemNonceDb::get(getter, genesis, SUBSTRATE_SIGNING_CODE, &data.plan.encode()))
}
Transaction::SignPreprocess(data) => {
assert_eq!(data.attempt, 0);
Some(ItemNonceDb::get(getter, genesis, PLAN_CODE, &data.plan))
Some(ItemNonceDb::get(getter, genesis, PLAN_CODE, &data.plan.encode()))
}
Transaction::SignShare(data) => {
assert_eq!(data.attempt, 0);
Some(ItemNonceDb::get(getter, genesis, PLAN_SIGNING_CODE, &data.plan))
Some(ItemNonceDb::get(getter, genesis, PLAN_SIGNING_CODE, &data.plan.encode()))
}
Transaction::SignCompleted { .. } => None,
}