Update processor message flow around the new SignedBatch flow

Luke Parker
2023-04-10 02:51:15 -04:00
parent ccec529cee
commit b9f38fb354
10 changed files with 213 additions and 98 deletions
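For orientation: the processor now reports scanned blocks as a signed batch instead of a raw key/block/instructions tuple. The shapes below are a sketch inferred from the main.rs hunk in this commit (the field names, the placeholder batch id of 0, and the sr25519 signature all appear there); the canonical definitions live in serai_client::in_instructions::primitives and may differ.

    // Sketch only; shapes inferred from how this commit constructs these values.
    use serai_client::{
      primitives::{NetworkId, BlockHash},
      in_instructions::primitives::InInstructionWithBalance,
    };

    pub struct Batch {
      pub network: NetworkId,                        // C::NETWORK, newly added to the Coin trait
      pub id: u32,                                   // assumed width; the diff only passes a 0 placeholder (TODO)
      pub block: BlockHash,                          // hash of the scanned coin block
      pub instructions: Vec<InInstructionWithBalance>,
    }

    pub struct SignedBatch {
      pub batch: Batch,
      pub signature: sp_application_crypto::sr25519::Signature, // [0; 64] placeholder for now
    }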

View File

@@ -37,7 +37,7 @@ use bitcoin_serai::bitcoin::{
};
use serai_client::{
primitives::{MAX_DATA_LEN, BITCOIN, Amount, Balance},
primitives::{MAX_DATA_LEN, BITCOIN, BITCOIN_NET_ID, NetworkId, Amount, Balance},
coins::bitcoin::Address,
};
@@ -286,6 +286,7 @@ impl Coin for Bitcoin {
type Address = Address;
const NETWORK: NetworkId = BITCOIN_NET_ID;
const ID: &'static str = "Bitcoin";
const CONFIRMATIONS: usize = 3;

View File

@@ -10,7 +10,7 @@ use frost::{
sign::PreprocessMachine,
};
use serai_client::primitives::Balance;
use serai_client::primitives::{NetworkId, Balance};
#[cfg(feature = "bitcoin")]
pub mod bitcoin;
@@ -157,6 +157,7 @@ impl<E: Eventuality> Default for EventualitiesTracker<E> {
}
pub trait Block<C: Coin>: Send + Sync + Sized + Clone + Debug {
// This is currently bounded to being 32-bytes.
type Id: 'static + Id;
fn id(&self) -> Self::Id;
fn median_fee(&self) -> C::Fee;
@@ -249,6 +250,8 @@ pub trait Coin: 'static + Send + Sync + Clone + PartialEq + Eq + Debug {
+ TryInto<Vec<u8>>
+ TryFrom<Vec<u8>>;
/// Network ID for this coin.
const NETWORK: NetworkId;
/// String ID for this coin.
const ID: &'static str;
/// The amount of confirmations required to consider a block 'final'.

View File

@@ -26,7 +26,7 @@ use monero_serai::{
use tokio::time::sleep;
pub use serai_client::{
primitives::{MAX_DATA_LEN, MONERO, Amount, Balance},
primitives::{MAX_DATA_LEN, MONERO, MONERO_NET_ID, NetworkId, Amount, Balance},
coins::monero::Address,
};
@@ -217,6 +217,7 @@ impl Coin for Monero {
type Address = Address;
const NETWORK: NetworkId = MONERO_NET_ID;
const ID: &'static str = "Monero";
const CONFIRMATIONS: usize = 10;

View File

@@ -15,7 +15,7 @@ use frost::{
use log::info;
use serai_client::validator_sets::primitives::ValidatorSet;
use serai_client::{primitives::BlockHash, validator_sets::primitives::ValidatorSet};
use messages::key_gen::*;
use crate::{DbTxn, Db, coins::Coin};
@@ -23,7 +23,7 @@ use crate::{DbTxn, Db, coins::Coin};
#[derive(Debug)]
pub enum KeyGenEvent<C: Ciphersuite> {
KeyConfirmed {
activation_number: usize,
activation_block: BlockHash,
substrate_keys: ThresholdKeys<Ristretto>,
coin_keys: ThresholdKeys<C>,
},
@@ -374,7 +374,7 @@ impl<C: Coin, D: Db> KeyGen<C, D> {
);
KeyGenEvent::KeyConfirmed {
activation_number: context.coin_latest_block_number.try_into().unwrap(),
activation_block: context.coin_latest_finalized_block,
substrate_keys,
coin_keys,
}

View File

@@ -19,12 +19,14 @@ use tokio::time::sleep;
use scale::Decode;
use serai_client::{
primitives::MAX_DATA_LEN,
primitives::{MAX_DATA_LEN, BlockHash},
tokens::primitives::{OutInstruction, OutInstructionWithBalance},
in_instructions::primitives::{Shorthand, RefundableInInstruction, InInstructionWithBalance},
in_instructions::primitives::{
Shorthand, RefundableInInstruction, InInstructionWithBalance, Batch, SignedBatch,
},
};
use messages::{SubstrateContext, sign, substrate, CoordinatorMessage, ProcessorMessage};
use messages::{SubstrateContext, CoordinatorMessage, ProcessorMessage};
mod plan;
pub use plan::*;
@@ -135,8 +137,15 @@ async fn sign_plans<C: Coin, D: Db>(
plans: Vec<Plan<C>>,
) {
let mut plans = VecDeque::from(plans);
let start = SystemTime::UNIX_EPOCH.checked_add(Duration::from_secs(context.time)).unwrap();
let block_number = context.coin_latest_block_number.try_into().unwrap();
let mut block_hash = <C::Block as Block<C>>::Id::default();
block_hash.as_mut().copy_from_slice(&context.coin_latest_finalized_block.0);
let block_number = scanner
.block_number(&block_hash)
.await
.expect("told to sign_plans on a context we're not synced to");
let fee = get_fee(coin, block_number).await;
@@ -145,7 +154,7 @@ async fn sign_plans<C: Coin, D: Db>(
info!("preparing plan {}: {:?}", hex::encode(id), plan);
let key = plan.key.to_bytes();
db.save_signing(key.as_ref(), context.coin_latest_block_number, context.time, &plan);
db.save_signing(key.as_ref(), block_number.try_into().unwrap(), context.time, &plan);
let (tx, branches) = prepare_send(coin, &signers[key.as_ref()], block_number, fee, plan).await;
// TODO: If we reboot mid-sign_plans, for a DB-backed scheduler, these may be partially
@@ -253,92 +262,76 @@ async fn run<C: Coin, D: Db, Co: Coordinator>(raw_db: D, coin: C, mut coordinato
// If this message expects a higher block number than we have, halt until synced
async fn wait<C: Coin, D: Db>(
coin: &C,
scanner: &ScannerHandle<C, D>,
context: &SubstrateContext
block_hash: &BlockHash
) {
let needed = usize::try_from(context.coin_latest_block_number).unwrap();
let mut needed_hash = <C::Block as Block<C>>::Id::default();
needed_hash.as_mut().copy_from_slice(&block_hash.0);
let block_number;
loop {
let Ok(actual) = coin.get_latest_block_number().await else {
error!("couldn't get the latest block number");
// Sleep for a minute as node errors should be incredibly uncommon yet take multiple
// seconds to resolve
sleep(Duration::from_secs(60)).await;
// Ensure our scanner has scanned this block, which means our daemon has this block at
// a sufficient depth
let Some(block_number_inner) = scanner.block_number(&needed_hash).await else {
warn!(
"node is desynced. we haven't scanned {} which should happen after {} confirms",
hex::encode(&needed_hash),
C::CONFIRMATIONS,
);
sleep(Duration::from_secs(10)).await;
continue;
};
// Check our daemon has this block
// CONFIRMATIONS - 1 since any block's TXs have one confirmation (the block itself)
let confirmed = actual.saturating_sub(C::CONFIRMATIONS - 1);
if needed > confirmed {
// This may occur within some natural latency window
warn!(
"node is desynced. need block {}, have {}",
// Print the block needed for the needed block to be confirmed
needed + (C::CONFIRMATIONS - 1),
actual,
);
// Sleep for one second per needed block
// If the node is disconnected from the network, this will be faster than it should
// be, yet presumably it just needs a moment to sync up
sleep(Duration::from_secs((needed - confirmed).try_into().unwrap())).await;
}
// Check our scanner has scanned it
// This check does void the need for the last one, yet it provides a bit better
// debugging
let ram_scanned = scanner.ram_scanned().await;
if ram_scanned < needed {
warn!("scanner is behind. need block {}, scanned up to {}", needed, ram_scanned);
sleep(Duration::from_secs((needed - ram_scanned).try_into().unwrap())).await;
}
// TODO: Sanity check we got an AckBlock (or this is the AckBlock) for the block in
// question
/*
let synced = |context: &SubstrateContext, key| -> Result<(), ()> {
// Check that we've synced this block and can actually operate on it ourselves
let latest = scanner.latest_scanned(key);
if usize::try_from(context.coin_latest_block_number).unwrap() < latest {
log::warn!(
"coin node disconnected/desynced from rest of the network. \
our block: {latest:?}, network's acknowledged: {}",
context.coin_latest_block_number
);
Err(())?;
}
Ok(())
};
*/
block_number = block_number_inner;
break;
}
// While the scanner has cemented this block, that doesn't mean it's been scanned for all
// keys
// ram_scanned will return the lowest scanned block number out of all keys
while scanner.ram_scanned().await < block_number {
sleep(Duration::from_secs(1)).await;
}
// TODO: Sanity check we got an AckBlock (or this is the AckBlock) for the block in
// question
/*
let synced = |context: &SubstrateContext, key| -> Result<(), ()> {
// Check that we've synced this block and can actually operate on it ourselves
let latest = scanner.latest_scanned(key);
if usize::try_from(context.coin_latest_block_number).unwrap() < latest {
log::warn!(
"coin node disconnected/desynced from rest of the network. \
our block: {latest:?}, network's acknowledged: {}",
context.coin_latest_block_number
);
Err(())?;
}
Ok(())
};
*/
}
match &msg.msg {
CoordinatorMessage::KeyGen(_) => {},
CoordinatorMessage::Sign(_) => {},
CoordinatorMessage::Substrate(msg) => {
match msg {
substrate::CoordinatorMessage::BlockAcknowledged { context, .. } => {
wait(&coin, &scanner, context).await;
},
substrate::CoordinatorMessage::Burns { context, .. } => {
wait(&coin, &scanner, context).await;
},
}
},
if let Some(required) = msg.msg.required_block() {
wait(&scanner, &required).await;
}
match msg.msg.clone() {
CoordinatorMessage::KeyGen(msg) => {
match key_gen.handle(msg).await {
// TODO: Handle substrate_keys
KeyGenEvent::KeyConfirmed { activation_number, substrate_keys: _, coin_keys } => {
KeyGenEvent::KeyConfirmed { activation_block, substrate_keys: _, coin_keys } => {
let keys = coin_keys;
let key = keys.group_key();
let mut activation_block_hash = <C::Block as Block<C>>::Id::default();
activation_block_hash.as_mut().copy_from_slice(&activation_block.0);
let activation_number =
scanner
.block_number(&activation_block_hash)
.await
.expect("KeyConfirmed from context we haven't synced");
scanner.rotate_key(activation_number, key).await;
schedulers.insert(key.to_bytes().as_ref().to_vec(), Scheduler::<C>::new(key));
signers.insert(
@@ -358,13 +351,19 @@ async fn run<C: Coin, D: Db, Co: Coordinator>(raw_db: D, coin: C, mut coordinato
signers[msg.key()].handle(msg).await;
}
CoordinatorMessage::Coordinator(_) => todo!(),
CoordinatorMessage::Substrate(msg) => {
match msg {
substrate::CoordinatorMessage::BlockAcknowledged { context, key: key_vec, block } => {
messages::substrate::CoordinatorMessage::BlockAcknowledged {
context,
key: key_vec,
block
} => {
let key =
<C::Curve as Ciphersuite>::read_G::<&[u8]>(&mut key_vec.as_ref()).unwrap();
let mut block_id = <C::Block as Block<C>>::Id::default();
block_id.as_mut().copy_from_slice(&block);
block_id.as_mut().copy_from_slice(&block.0);
let plans = schedulers
.get_mut(&key_vec)
@@ -381,7 +380,7 @@ async fn run<C: Coin, D: Db, Co: Coordinator>(raw_db: D, coin: C, mut coordinato
).await;
}
substrate::CoordinatorMessage::Burns { context, burns } => {
messages::substrate::CoordinatorMessage::Burns { context, burns } => {
// TODO2: Rewrite rotation documentation
let schedule_key = active_keys.last().expect("burn event despite no keys");
let scheduler = schedulers.get_mut(schedule_key.to_bytes().as_ref()).unwrap();
@@ -424,9 +423,18 @@ async fn run<C: Coin, D: Db, Co: Coordinator>(raw_db: D, coin: C, mut coordinato
// TODO
match msg.unwrap() {
ScannerEvent::Outputs(key, block, outputs) => {
coordinator.send(ProcessorMessage::Substrate(substrate::ProcessorMessage::Update {
key: key.to_bytes().as_ref().to_vec(),
block: block.as_ref().to_vec(),
let key = key.to_bytes().as_ref().to_vec();
let mut block_hash = [0; 32];
block_hash.copy_from_slice(block.as_ref());
// TODO
let id = 0;
let batch = Batch {
network: C::NETWORK,
id,
block: BlockHash(block_hash),
instructions: outputs.iter().filter_map(|output| {
// If these aren't externally received funds, don't handle it as an instruction
if output.kind() != OutputType::External {
@@ -451,8 +459,18 @@ async fn run<C: Coin, D: Db, Co: Coordinator>(raw_db: D, coin: C, mut coordinato
instruction: instruction.instruction,
balance: output.balance(),
})
}).collect(),
})).await;
}).collect()
};
coordinator.send(ProcessorMessage::Substrate(
messages::substrate::ProcessorMessage::Update {
key,
batch: SignedBatch {
batch,
signature: sp_application_crypto::sr25519::Signature([0; 64]),
},
}
)).await;
},
}
},
@@ -462,7 +480,7 @@ async fn run<C: Coin, D: Db, Co: Coordinator>(raw_db: D, coin: C, mut coordinato
SignerEvent::SignedTransaction { id, tx } => {
main_db.finish_signing(&key, id);
coordinator
.send(ProcessorMessage::Sign(sign::ProcessorMessage::Completed {
.send(ProcessorMessage::Sign(messages::sign::ProcessorMessage::Completed {
key,
id,
tx: tx.as_ref().to_vec()

View File

@@ -232,6 +232,10 @@ impl<C: Coin, D: Db> ScannerHandle<C, D> {
scanner.keys.push(key);
}
pub async fn block_number(&self, id: &<C::Block as Block<C>>::Id) -> Option<usize> {
self.scanner.read().await.db.block_number(id)
}
/// Acknowledge having handled a block for a key.
pub async fn ack_block(
&self,
@@ -341,7 +345,7 @@ impl<C: Coin, D: Db> Scanner<C, D> {
if let Some(id) = scanner.db.block(i) {
// TODO2: Also check this block builds off the previous block
if id != block.id() {
if id != block_id {
panic!("{} reorg'd from {id:?} to {:?}", C::ID, hex::encode(block_id));
}
} else {

View File

@@ -8,7 +8,7 @@ use group::GroupEncoding;
use frost::{Participant, ThresholdParams, tests::clone_without};
use serai_client::{
primitives::MONERO_NET_ID,
primitives::{MONERO_NET_ID, BlockHash},
validator_sets::primitives::{Session, ValidatorSet},
};
@@ -119,14 +119,14 @@ pub async fn test_key_gen<C: Coin>() {
for i in 1 ..= 5 {
let key_gen = key_gens.get_mut(&i).unwrap();
if let KeyGenEvent::KeyConfirmed { activation_number, substrate_keys, coin_keys } = key_gen
if let KeyGenEvent::KeyConfirmed { activation_block, substrate_keys, coin_keys } = key_gen
.handle(CoordinatorMessage::ConfirmKeyPair {
context: SubstrateContext { time: 0, coin_latest_block_number: 111 },
context: SubstrateContext { time: 0, coin_latest_finalized_block: BlockHash([0x11; 32]) },
id: ID,
})
.await
{
assert_eq!(activation_number, 111);
assert_eq!(activation_block, BlockHash([0x11; 32]));
let params =
ThresholdParams::new(3, 5, Participant::new(u16::try_from(i).unwrap()).unwrap()).unwrap();
assert_eq!(substrate_keys.params(), params);