Improve batch handling (#316)

* restrict batch size to ~25 KB (sketched below)

* add batch size check to node

* rate-limit batches to 1 per Serai block

* add support for multiple batches per block

* address review comments

* Misc fixes

Doesn't yet update tests/processor until the data flow is inspected.

* Move the block from SignId to ProcessorMessage::BatchPreprocess

* Misc cleanup

---------

Co-authored-by: Luke Parker <lukeparker5132@gmail.com>
Author: akildemir
Date: 2023-08-14 18:57:38 +03:00
Committed by: GitHub
Parent: a3441a6871
Commit: e680eabb62
17 changed files with 234 additions and 155 deletions
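
As a rough illustration of the batching scheme the diff below introduces: instructions for a block accumulate into a batch until its SCALE encoding exceeds MAX_BATCH_SIZE, at which point the overflowing instruction seeds a new batch under the next ID. The following minimal sketch mirrors that logic; the types, the 25 000-byte constant, the batches_for_block helper, and the use of parity-scale-codec in place of the tree's scale re-export are simplifications for illustration, not the project's actual definitions.

use parity_scale_codec::Encode;

// Assumed cap, mirroring the ~25 KB limit referenced above
const MAX_BATCH_SIZE: usize = 25_000;

// Simplified stand-ins for the real InInstructionWithBalance/Batch types
#[derive(Encode)]
struct InInstruction(Vec<u8>);

#[derive(Encode)]
struct Batch {
  network: u8,
  id: u32,
  block: [u8; 32],
  instructions: Vec<InInstruction>,
}

// Accumulate a block's instructions into batches, starting a new batch (under the next ID)
// whenever pushing another instruction takes the SCALE encoding over the size cap
fn batches_for_block(
  network: u8,
  mut id: u32,
  block: [u8; 32],
  instructions: Vec<InInstruction>,
) -> Vec<Batch> {
  let mut batches = vec![Batch { network, id, block, instructions: vec![] }];
  for instruction in instructions {
    let batch = batches.last_mut().unwrap();
    batch.instructions.push(instruction);
    if batch.encode().len() > MAX_BATCH_SIZE {
      // Pop the overflowing instruction back out and seed a fresh batch with it
      let instruction = batch.instructions.pop().unwrap();
      id += 1;
      batches.push(Batch { network, id, block, instructions: vec![instruction] });
    }
  }
  batches
}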

View File

@@ -12,13 +12,13 @@ use frost::{curve::Ciphersuite, ThresholdKeys};
use log::{info, warn, error};
use tokio::time::sleep;
use scale::Decode;
use scale::{Encode, Decode};
use serai_client::{
primitives::{MAX_DATA_LEN, BlockHash, NetworkId},
tokens::primitives::{OutInstruction, OutInstructionWithBalance},
in_instructions::primitives::{
Shorthand, RefundableInInstruction, InInstructionWithBalance, Batch,
Shorthand, RefundableInInstruction, InInstructionWithBalance, Batch, MAX_BATCH_SIZE,
},
};
@@ -396,6 +396,7 @@ async fn handle_coordinator_msg<D: Db, N: Network, Co: Coordinator>(
block,
key: key_vec,
burns,
batches,
} => {
assert_eq!(network_id, N::NETWORK, "coordinator sent us data for another network");
@@ -405,12 +406,11 @@ async fn handle_coordinator_msg<D: Db, N: Network, Co: Coordinator>(
let key = <N::Curve as Ciphersuite>::read_G::<&[u8]>(&mut key_vec.as_ref()).unwrap();
// We now have to acknowledge every block for this key up to the acknowledged block
let (blocks, outputs) =
substrate_mutable.scanner.ack_up_to_block(txn, key, block_id).await;
let outputs = substrate_mutable.scanner.ack_up_to_block(txn, key, block_id).await;
// Since this block was acknowledged, we no longer have to sign the batch for it
for block in blocks {
for batch_id in batches {
for (_, signer) in tributary_mutable.substrate_signers.iter_mut() {
signer.batch_signed(txn, block);
signer.batch_signed(txn, batch_id);
}
}
@@ -665,51 +665,84 @@ async fn run<N: Network, D: Db, Co: Coordinator>(mut raw_db: D, network: N, mut
let mut txn = raw_db.txn();
match msg.unwrap() {
ScannerEvent::Block { key, block, batch, outputs } => {
ScannerEvent::Block { key, block, outputs } => {
let mut block_hash = [0; 32];
block_hash.copy_from_slice(block.as_ref());
// TODO: Move this out from Scanner now that the Scanner no longer handles batches
let mut batch_id = substrate_mutable.scanner.next_batch_id(&txn);
let batch = Batch {
// start with empty batch
let mut batches = vec![Batch {
network: N::NETWORK,
id: batch,
id: batch_id,
block: BlockHash(block_hash),
instructions: outputs.iter().filter_map(|output| {
// If these aren't externally received funds, don't handle it as an instruction
if output.kind() != OutputType::External {
return None;
}
instructions: vec![],
}];
for output in outputs {
// If these aren't externally received funds, don't handle it as an instruction
if output.kind() != OutputType::External {
continue;
}
let mut data = output.data();
let max_data_len = MAX_DATA_LEN.try_into().unwrap();
if data.len() > max_data_len {
error!(
"data in output {} exceeded MAX_DATA_LEN ({MAX_DATA_LEN}): {}",
hex::encode(output.id()),
data.len(),
);
data = &data[.. max_data_len];
}
let mut data = output.data();
let max_data_len = usize::try_from(MAX_DATA_LEN).unwrap();
// TODO: Should we drop this, instead of truncating?
// A truncating message likely doesn't have value yet has increased data load and is
// corrupt vs a NOP. The former seems more likely to cause problems
if data.len() > max_data_len {
error!(
"data in output {} exceeded MAX_DATA_LEN ({MAX_DATA_LEN}): {}. truncating",
hex::encode(output.id()),
data.len(),
);
data = &data[.. max_data_len];
}
let shorthand = Shorthand::decode(&mut data).ok()?;
let instruction = RefundableInInstruction::try_from(shorthand).ok()?;
// TODO2: Set instruction.origin if not set (and handle refunds in general)
Some(InInstructionWithBalance {
instruction: instruction.instruction,
balance: output.balance(),
})
}).collect()
};
let Ok(shorthand) = Shorthand::decode(&mut data) else { continue };
let Ok(instruction) = RefundableInInstruction::try_from(shorthand) else { continue };
info!("created batch {} ({} instructions)", batch.id, batch.instructions.len());
// TODO2: Set instruction.origin if not set (and handle refunds in general)
let instruction = InInstructionWithBalance {
instruction: instruction.instruction,
balance: output.balance(),
};
let batch = batches.last_mut().unwrap();
batch.instructions.push(instruction);
// check if batch is over-size
if batch.encode().len() > MAX_BATCH_SIZE {
// pop the last instruction so it's back in size
let instruction = batch.instructions.pop().unwrap();
// bump the id for the new batch
batch_id += 1;
// make a new batch with this instruction included
batches.push(Batch {
network: N::NETWORK,
id: batch_id,
block: BlockHash(block_hash),
instructions: vec![instruction],
});
}
}
// Save the next batch ID
substrate_mutable.scanner.set_next_batch_id(&mut txn, batch_id + 1);
// Start signing this batch
// TODO: Don't reload both sets of keys in full just to get the Substrate public key
tributary_mutable
.substrate_signers
.get_mut(tributary_mutable.key_gen.keys(&key).0.group_key().to_bytes().as_slice())
.unwrap()
.sign(&mut txn, batch)
.await;
for batch in batches {
info!("created batch {} ({} instructions)", batch.id, batch.instructions.len());
// TODO: Don't reload both sets of keys in full just to get the Substrate public key
tributary_mutable
.substrate_signers
.get_mut(tributary_mutable.key_gen.keys(&key).0.group_key().to_bytes().as_slice())
.unwrap()
.sign(&mut txn, batch)
.await;
}
},
ScannerEvent::Completed(id, tx) => {

View File

@@ -14,8 +14,6 @@ use tokio::{
time::sleep,
};
use serai_client::primitives::BlockHash;
use crate::{
Get, DbTxn, Db,
networks::{Output, Transaction, EventualitiesTracker, Block, Network},
@@ -27,7 +25,6 @@ pub enum ScannerEvent<N: Network> {
Block {
key: <N::Curve as Ciphersuite>::G,
block: <N::Block as Block<N>>::Id,
batch: u32,
outputs: Vec<N::Output>,
},
// Eventuality completion found on-chain
@@ -115,9 +112,6 @@ impl<N: Network, D: Db> ScannerDb<N, D> {
fn next_batch_key() -> Vec<u8> {
Self::scanner_key(b"next_batch", [])
}
fn batch_key(key: &<N::Curve as Ciphersuite>::G, block: &<N::Block as Block<N>>::Id) -> Vec<u8> {
Self::scanner_key(b"batch", [key.to_bytes().as_ref(), block.as_ref()].concat())
}
fn outputs_key(
key: &<N::Curve as Ciphersuite>::G,
block: &<N::Block as Block<N>>::Id,
@@ -129,12 +123,7 @@ impl<N: Network, D: Db> ScannerDb<N, D> {
key: &<N::Curve as Ciphersuite>::G,
block: &<N::Block as Block<N>>::Id,
outputs: &[N::Output],
) -> u32 {
let batch_key = Self::batch_key(key, block);
if let Some(batch) = txn.get(batch_key) {
return u32::from_le_bytes(batch.try_into().unwrap());
}
) {
let mut bytes = Vec::with_capacity(outputs.len() * 64);
for output in outputs {
output.write(&mut bytes).unwrap();
@@ -150,13 +139,6 @@ impl<N: Network, D: Db> ScannerDb<N, D> {
// 0a, 1a, 2a, 3a, 4a, 5a, 4b, 5b
// when it should be
// 0a, 1a, 2a, 3a, 4a, 4b, 5a, 5b
// Because it's a new set of outputs, allocate a batch ID for it
let next_bytes = txn.get(Self::next_batch_key()).unwrap_or(vec![0; 4]).try_into().unwrap();
let next = u32::from_le_bytes(next_bytes);
txn.put(Self::next_batch_key(), (next + 1).to_le_bytes());
txn.put(Self::batch_key(key, block), next_bytes);
next
}
fn outputs(
txn: &D::Transaction<'_>,
@@ -182,7 +164,7 @@ impl<N: Network, D: Db> ScannerDb<N, D> {
txn: &mut D::Transaction<'_>,
key: &<N::Curve as Ciphersuite>::G,
block: usize,
) -> (Option<<N::Block as Block<N>>::Id>, Vec<N::Output>) {
) -> Vec<N::Output> {
let id = Self::block(txn, block); // It may be None for the first key rotated to
let outputs = if let Some(id) = id.as_ref() {
Self::outputs(txn, key, id).unwrap_or(vec![])
@@ -198,7 +180,7 @@ impl<N: Network, D: Db> ScannerDb<N, D> {
txn.put(Self::scanned_block_key(key), u64::try_from(block).unwrap().to_le_bytes());
// Return this block's outputs so they can be pruned from the RAM cache
(id, outputs)
outputs
}
fn latest_scanned_block<G: Get>(getter: &G, key: <N::Curve as Ciphersuite>::G) -> usize {
let bytes = getter
@@ -280,7 +262,7 @@ impl<N: Network, D: Db> ScannerHandle<N, D> {
info!("Rotating scanner to key {} at {activation_number}", hex::encode(key.to_bytes()));
let (_, outputs) = ScannerDb::<N, D>::save_scanned_block(txn, &key, activation_number);
let outputs = ScannerDb::<N, D>::save_scanned_block(txn, &key, activation_number);
scanner.ram_scanned.insert(key.to_bytes().as_ref().to_vec(), activation_number);
assert!(outputs.is_empty());
@@ -295,13 +277,25 @@ impl<N: Network, D: Db> ScannerHandle<N, D> {
ScannerDb::<N, D>::block_number(&self.scanner.read().await.db, id)
}
// Set the next batch ID to use
pub fn set_next_batch_id(&self, txn: &mut D::Transaction<'_>, batch: u32) {
txn.put(ScannerDb::<N, D>::next_batch_key(), batch.to_le_bytes());
}
// Get the next batch ID
pub fn next_batch_id(&self, txn: &D::Transaction<'_>) -> u32 {
txn
.get(ScannerDb::<N, D>::next_batch_key())
.map_or(0, |v| u32::from_le_bytes(v.try_into().unwrap()))
}
/// Acknowledge having handled a block for a key.
pub async fn ack_up_to_block(
&mut self,
txn: &mut D::Transaction<'_>,
key: <N::Curve as Ciphersuite>::G,
id: <N::Block as Block<N>>::Id,
) -> (Vec<BlockHash>, Vec<N::Output>) {
) -> Vec<N::Output> {
let mut scanner = self.scanner.write().await;
debug!("Block {} acknowledged", hex::encode(&id));
@@ -311,21 +305,16 @@ impl<N: Network, D: Db> ScannerHandle<N, D> {
// Get the number of the last block we acknowledged
let prior = ScannerDb::<N, D>::latest_scanned_block(txn, key);
let mut blocks = vec![];
let mut outputs = vec![];
for number in (prior + 1) ..= number {
let (block, these_outputs) = ScannerDb::<N, D>::save_scanned_block(txn, &key, number);
let block = BlockHash(block.unwrap().as_ref().try_into().unwrap());
blocks.push(block);
outputs.extend(these_outputs);
outputs.extend(ScannerDb::<N, D>::save_scanned_block(txn, &key, number));
}
assert_eq!(blocks.last().unwrap().as_ref(), id.as_ref());
for output in &outputs {
assert!(scanner.ram_outputs.remove(output.id().as_ref()));
}
(blocks, outputs)
outputs
}
}
@@ -514,11 +503,11 @@ impl<N: Network, D: Db> Scanner<N, D> {
// Save the outputs to disk
let mut txn = scanner.db.txn();
let batch = ScannerDb::<N, D>::save_outputs(&mut txn, &key, &block_id, &outputs);
ScannerDb::<N, D>::save_outputs(&mut txn, &key, &block_id, &outputs);
txn.commit();
// Send all outputs
if !scanner.emit(ScannerEvent::Block { key, block: block_id, batch, outputs }) {
if !scanner.emit(ScannerEvent::Block { key, block: block_id, outputs }) {
return;
}
// Write this number as scanned so we won't re-fire these outputs
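
The scanner now only persists a monotonically increasing next-batch-ID counter; allocating IDs per block moved into the main processor loop above. A minimal sketch of that bookkeeping, assuming a plain HashMap in place of the Db/transaction types and an illustrative key name:

use std::collections::HashMap;

// Illustrative key; the real code derives scanner-prefixed keys
const NEXT_BATCH_KEY: &[u8] = b"scanner_next_batch";

// Default to 0 when the counter hasn't been set yet, matching next_batch_id above
fn next_batch_id(db: &HashMap<Vec<u8>, Vec<u8>>) -> u32 {
  db.get(NEXT_BATCH_KEY)
    .map_or(0, |bytes| u32::from_le_bytes(bytes.as_slice().try_into().unwrap()))
}

// Store the counter as little-endian bytes, matching set_next_batch_id above
fn set_next_batch_id(db: &mut HashMap<Vec<u8>, Vec<u8>>, batch: u32) {
  db.insert(NEXT_BATCH_KEY.to_vec(), batch.to_le_bytes().to_vec());
}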

View File

@@ -18,10 +18,7 @@ use frost_schnorrkel::Schnorrkel;
use log::{info, debug, warn};
use serai_client::{
primitives::BlockHash,
in_instructions::primitives::{Batch, SignedBatch, batch_message},
};
use serai_client::in_instructions::primitives::{Batch, SignedBatch, batch_message};
use messages::{sign::SignId, coordinator::*};
use crate::{Get, DbTxn, Db};
@@ -149,7 +146,9 @@ impl<D: Db> SubstrateSigner<D> {
}
// Start this attempt
if !self.signable.contains_key(&id) {
let block = if let Some(batch) = self.signable.get(&id) {
batch.block
} else {
warn!("told to attempt signing a batch we aren't currently signing for");
return;
};
@@ -162,7 +161,7 @@ impl<D: Db> SubstrateSigner<D> {
self.attempt.insert(id, attempt);
let id = SignId { key: self.keys.group_key().to_bytes().to_vec(), id, attempt };
info!("signing batch {}, attempt #{}", hex::encode(id.id), id.attempt);
info!("signing batch {} #{}", hex::encode(id.id), id.attempt);
// If we reboot mid-sign, the current design has us abort all signs and wait for latter
// attempts/new signing protocols
@@ -199,19 +198,21 @@ impl<D: Db> SubstrateSigner<D> {
// Broadcast our preprocess
self.events.push_back(SubstrateSignerEvent::ProcessorMessage(
ProcessorMessage::BatchPreprocess { id, preprocess: preprocess.serialize() },
ProcessorMessage::BatchPreprocess { id, block, preprocess: preprocess.serialize() },
));
}
pub async fn sign(&mut self, txn: &mut D::Transaction<'_>, batch: Batch) {
if SubstrateSignerDb::<D>::completed(txn, batch.block.0) {
// Use the batch id as the ID
let mut id = [0u8; 32];
id[.. 4].copy_from_slice(&batch.id.to_le_bytes());
if SubstrateSignerDb::<D>::completed(txn, id) {
debug!("Sign batch order for ID we've already completed signing");
// See batch_signed for commentary on why this simply returns
return;
}
// Use the block hash as the ID
let id = batch.block.0;
self.signable.insert(id, batch);
self.attempt(txn, id, 0).await;
}
@@ -335,14 +336,19 @@ impl<D: Db> SubstrateSigner<D> {
}
}
pub fn batch_signed(&mut self, txn: &mut D::Transaction<'_>, block: BlockHash) {
// Stop trying to sign for this batch
SubstrateSignerDb::<D>::complete(txn, block.0);
pub fn batch_signed(&mut self, txn: &mut D::Transaction<'_>, batch_id: u32) {
// Convert into 32-byte ID
// TODO: Add a BatchSignId so we don't have this inefficiency
let mut id = [0u8; 32];
id[.. 4].copy_from_slice(&batch_id.to_le_bytes());
self.signable.remove(&block.0);
self.attempt.remove(&block.0);
self.preprocessing.remove(&block.0);
self.signing.remove(&block.0);
// Stop trying to sign for this batch
SubstrateSignerDb::<D>::complete(txn, id);
self.signable.remove(&id);
self.attempt.remove(&id);
self.preprocessing.remove(&id);
self.signing.remove(&id);
// This doesn't emit SignedBatch because it doesn't have access to the SignedBatch
// This function is expected to only be called once Substrate acknowledges this block,
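
Until the BatchSignId mentioned in the TODO exists, the signer widens the u32 batch ID into the 32-byte ID that SignId expects: the first four bytes carry the little-endian batch ID and the rest are zero. A standalone sketch of that mapping (the helper name is hypothetical; the diff inlines it):

fn sign_id_for_batch(batch_id: u32) -> [u8; 32] {
  // Little-endian batch ID in the first four bytes, zero padding elsewhere
  let mut id = [0u8; 32];
  id[.. 4].copy_from_slice(&batch_id.to_le_bytes());
  id
}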

View File

@@ -20,7 +20,6 @@ async fn spend<N: Network, D: Db>(
network: &N,
keys: &HashMap<Participant, ThresholdKeys<N::Curve>>,
scanner: &mut ScannerHandle<N, D>,
batch: u32,
outputs: Vec<N::Output>,
) -> Vec<N::Output> {
let key = keys[&Participant::new(1).unwrap()].group_key();
@@ -52,9 +51,8 @@ async fn spend<N: Network, D: Db>(
network.mine_block().await;
}
match timeout(Duration::from_secs(30), scanner.events.recv()).await.unwrap().unwrap() {
ScannerEvent::Block { key: this_key, block: _, batch: this_batch, outputs } => {
ScannerEvent::Block { key: this_key, block: _, outputs } => {
assert_eq!(this_key, key);
assert_eq!(this_batch, batch);
assert_eq!(outputs.len(), 1);
// Make sure this is actually a change output
assert_eq!(outputs[0].kind(), OutputType::Change);
@@ -91,10 +89,9 @@ pub async fn test_addresses<N: Network>(network: N) {
// Verify the Scanner picked them up
let outputs =
match timeout(Duration::from_secs(30), scanner.events.recv()).await.unwrap().unwrap() {
ScannerEvent::Block { key: this_key, block, batch, outputs } => {
ScannerEvent::Block { key: this_key, block, outputs } => {
assert_eq!(this_key, key);
assert_eq!(block, block_id);
assert_eq!(batch, 0);
assert_eq!(outputs.len(), 1);
assert_eq!(outputs[0].kind(), OutputType::Branch);
outputs
@@ -105,7 +102,7 @@ pub async fn test_addresses<N: Network>(network: N) {
};
// Spend the branch output, creating a change output and ensuring we actually get change
let outputs = spend(&network, &keys, &mut scanner, 1, outputs).await;
let outputs = spend(&network, &keys, &mut scanner, outputs).await;
// Also test spending the change output
spend(&network, &keys, &mut scanner, 2, outputs).await;
spend(&network, &keys, &mut scanner, outputs).await;
}

View File

@@ -55,10 +55,9 @@ pub async fn test_scanner<N: Network>(network: N) {
let verify_event = |mut scanner: ScannerHandle<N, MemDb>| async {
let outputs =
match timeout(Duration::from_secs(30), scanner.events.recv()).await.unwrap().unwrap() {
ScannerEvent::Block { key, block, batch, outputs } => {
ScannerEvent::Block { key, block, outputs } => {
assert_eq!(key, keys.group_key());
assert_eq!(block, block_id);
assert_eq!(batch, 0);
assert_eq!(outputs.len(), 1);
assert_eq!(outputs[0].kind(), OutputType::External);
outputs
@@ -90,10 +89,7 @@ pub async fn test_scanner<N: Network>(network: N) {
let mut cloned_db = db.clone();
let mut txn = cloned_db.txn();
assert_eq!(
scanner.ack_up_to_block(&mut txn, keys.group_key(), block_id).await,
(blocks, outputs)
);
assert_eq!(scanner.ack_up_to_block(&mut txn, keys.group_key(), block_id).await, outputs);
txn.commit();
// There should be no more events

View File

@@ -24,13 +24,16 @@ async fn test_substrate_signer() {
let participant_one = Participant::new(1).unwrap();
let id: u32 = 5;
let mut id_arr = [0u8; 32];
id_arr[.. 4].copy_from_slice(&id.to_le_bytes());
let block = BlockHash([0xaa; 32]);
let actual_id =
SignId { key: keys[&participant_one].group_key().to_bytes().to_vec(), id: block.0, attempt: 0 };
SignId { key: keys[&participant_one].group_key().to_bytes().to_vec(), id: id_arr, attempt: 0 };
let batch = Batch {
network: NetworkId::Monero,
id: 5,
id,
block,
instructions: vec![
InInstructionWithBalance {
@@ -81,10 +84,12 @@ async fn test_substrate_signer() {
let i = Participant::new(u16::try_from(i).unwrap()).unwrap();
if let SubstrateSignerEvent::ProcessorMessage(ProcessorMessage::BatchPreprocess {
id,
block: batch_block,
preprocess,
}) = signers.get_mut(&i).unwrap().events.pop_front().unwrap()
{
assert_eq!(id, actual_id);
assert_eq!(batch_block, block);
if signing_set.contains(&i) {
preprocesses.insert(i, preprocess);
}

View File

@@ -36,10 +36,9 @@ pub async fn test_wallet<N: Network>(network: N) {
let block_id = block.id();
match timeout(Duration::from_secs(30), scanner.events.recv()).await.unwrap().unwrap() {
ScannerEvent::Block { key: this_key, block, batch, outputs } => {
ScannerEvent::Block { key: this_key, block, outputs } => {
assert_eq!(this_key, key);
assert_eq!(block, block_id);
assert_eq!(batch, 0);
assert_eq!(outputs.len(), 1);
(block_id, outputs)
}
@@ -110,10 +109,9 @@ pub async fn test_wallet<N: Network>(network: N) {
}
match timeout(Duration::from_secs(30), scanner.events.recv()).await.unwrap().unwrap() {
ScannerEvent::Block { key: this_key, block: block_id, batch, outputs: these_outputs } => {
ScannerEvent::Block { key: this_key, block: block_id, outputs: these_outputs } => {
assert_eq!(this_key, key);
assert_eq!(block_id, block.id());
assert_eq!(batch, 1);
assert_eq!(these_outputs, outputs);
}
ScannerEvent::Completed(_, _) => {
@@ -124,7 +122,7 @@ pub async fn test_wallet<N: Network>(network: N) {
// Check the Scanner DB can reload the outputs
let mut txn = db.txn();
assert_eq!(
scanner.ack_up_to_block(&mut txn, key, block.id()).await.1,
scanner.ack_up_to_block(&mut txn, key, block.id()).await,
[first_outputs, outputs].concat().to_vec()
);
txn.commit();