mirror of
https://github.com/serai-dex/serai.git
synced 2025-12-08 12:19:24 +00:00
Verify all Batchs published by the prior set
The new set publishing a `Batch` completes the handover protocol. The new set should only publish a `Batch` once it believes the old set has completed all of its on-external-chain activity, marking it honest and finite. With the handover comes the acceptance of liability, hence the requirement for all of the on-Serai-chain activity also needing verification. While most activity would be verified in-real-time (upon ::Batch messages), the new set will now explicitly verify the complete set of `Batch`s before beginning its preprocess for its own `Batch` (the one accepting the handover).
This commit is contained in:
@@ -8,6 +8,7 @@ use blake2::{
|
|||||||
use scale::{Encode, Decode};
|
use scale::{Encode, Decode};
|
||||||
use serai_client::{
|
use serai_client::{
|
||||||
primitives::NetworkId,
|
primitives::NetworkId,
|
||||||
|
validator_sets::primitives::ValidatorSet,
|
||||||
in_instructions::primitives::{Batch, SignedBatch},
|
in_instructions::primitives::{Batch, SignedBatch},
|
||||||
};
|
};
|
||||||
|
|
||||||
@@ -95,15 +96,24 @@ impl<D: Db> MainDb<D> {
|
|||||||
getter.get(Self::first_preprocess_key(network, id))
|
getter.get(Self::first_preprocess_key(network, id))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn last_received_batch_key(network: NetworkId) -> Vec<u8> {
|
||||||
|
Self::main_key(b"last_received_batch", network.encode())
|
||||||
|
}
|
||||||
fn expected_batch_key(network: NetworkId, id: u32) -> Vec<u8> {
|
fn expected_batch_key(network: NetworkId, id: u32) -> Vec<u8> {
|
||||||
Self::main_key(b"expected_batch", (network, id).encode())
|
Self::main_key(b"expected_batch", (network, id).encode())
|
||||||
}
|
}
|
||||||
pub fn save_expected_batch(txn: &mut D::Transaction<'_>, batch: &Batch) {
|
pub fn save_expected_batch(txn: &mut D::Transaction<'_>, batch: &Batch) {
|
||||||
|
txn.put(Self::last_received_batch_key(batch.network), batch.id.to_le_bytes());
|
||||||
txn.put(
|
txn.put(
|
||||||
Self::expected_batch_key(batch.network, batch.id),
|
Self::expected_batch_key(batch.network, batch.id),
|
||||||
Blake2b::<U32>::digest(batch.instructions.encode()),
|
Blake2b::<U32>::digest(batch.instructions.encode()),
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
pub fn last_received_batch<G: Get>(getter: &G, network: NetworkId) -> Option<u32> {
|
||||||
|
getter
|
||||||
|
.get(Self::last_received_batch_key(network))
|
||||||
|
.map(|id| u32::from_le_bytes(id.try_into().unwrap()))
|
||||||
|
}
|
||||||
pub fn expected_batch<G: Get>(getter: &G, network: NetworkId, id: u32) -> Option<[u8; 32]> {
|
pub fn expected_batch<G: Get>(getter: &G, network: NetworkId, id: u32) -> Option<[u8; 32]> {
|
||||||
getter.get(Self::expected_batch_key(network, id)).map(|batch| batch.try_into().unwrap())
|
getter.get(Self::expected_batch_key(network, id)).map(|batch| batch.try_into().unwrap())
|
||||||
}
|
}
|
||||||
@@ -131,4 +141,14 @@ impl<D: Db> MainDb<D> {
|
|||||||
.get(Self::last_verified_batch_key(network))
|
.get(Self::last_verified_batch_key(network))
|
||||||
.map(|id| u32::from_le_bytes(id.try_into().unwrap()))
|
.map(|id| u32::from_le_bytes(id.try_into().unwrap()))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn did_handover_key(set: ValidatorSet) -> Vec<u8> {
|
||||||
|
Self::main_key(b"did_handover", set.encode())
|
||||||
|
}
|
||||||
|
pub fn set_did_handover(txn: &mut D::Transaction<'_>, set: ValidatorSet) {
|
||||||
|
txn.put(Self::did_handover_key(set), []);
|
||||||
|
}
|
||||||
|
pub fn did_handover<G: Get>(getter: &G, set: ValidatorSet) -> bool {
|
||||||
|
getter.get(Self::did_handover_key(set)).is_some()
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -283,7 +283,7 @@ pub(crate) async fn scan_tributaries<
|
|||||||
"couldn't connect to Serai node to publish set_keys TX: {:?}",
|
"couldn't connect to Serai node to publish set_keys TX: {:?}",
|
||||||
e
|
e
|
||||||
);
|
);
|
||||||
tokio::time::sleep(Duration::from_secs(10)).await;
|
sleep(Duration::from_secs(10)).await;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -540,6 +540,38 @@ async fn publish_signed_transaction<D: Db, P: P2p>(
|
|||||||
txn.commit();
|
txn.commit();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Verifies `Batch`s which have already been indexed from Substrate.
|
||||||
|
async fn verify_published_batches<D: Db>(
|
||||||
|
txn: &mut D::Transaction<'_>,
|
||||||
|
network: NetworkId,
|
||||||
|
optimistic_up_to: u32,
|
||||||
|
) -> Option<u32> {
|
||||||
|
let last = MainDb::<D>::last_verified_batch(txn, network);
|
||||||
|
for id in last.map(|last| last + 1).unwrap_or(0) ..= optimistic_up_to {
|
||||||
|
let Some(on_chain) = SubstrateDb::<D>::batch_instructions_hash(txn, network, id) else {
|
||||||
|
break;
|
||||||
|
};
|
||||||
|
let off_chain = MainDb::<D>::expected_batch(txn, network, id).unwrap();
|
||||||
|
if on_chain != off_chain {
|
||||||
|
// Halt operations on this network and spin, as this is a critical fault
|
||||||
|
loop {
|
||||||
|
log::error!(
|
||||||
|
"{}! network: {:?} id: {} off-chain: {} on-chain: {}",
|
||||||
|
"on-chain batch doesn't match off-chain",
|
||||||
|
network,
|
||||||
|
id,
|
||||||
|
hex::encode(off_chain),
|
||||||
|
hex::encode(on_chain),
|
||||||
|
);
|
||||||
|
sleep(Duration::from_secs(60)).await;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
MainDb::<D>::save_last_verified_batch(txn, network, id);
|
||||||
|
}
|
||||||
|
|
||||||
|
MainDb::<D>::last_verified_batch(txn, network)
|
||||||
|
}
|
||||||
|
|
||||||
async fn handle_processor_messages<D: Db, Pro: Processors, P: P2p>(
|
async fn handle_processor_messages<D: Db, Pro: Processors, P: P2p>(
|
||||||
mut db: D,
|
mut db: D,
|
||||||
key: Zeroizing<<Ristretto as Ciphersuite>::F>,
|
key: Zeroizing<<Ristretto as Ciphersuite>::F>,
|
||||||
@@ -569,8 +601,6 @@ async fn handle_processor_messages<D: Db, Pro: Processors, P: P2p>(
|
|||||||
// TODO: Check this ID is sane (last handled ID or expected next ID)
|
// TODO: Check this ID is sane (last handled ID or expected next ID)
|
||||||
let msg = processors.recv(network).await;
|
let msg = processors.recv(network).await;
|
||||||
|
|
||||||
// TODO: We need to verify the Batches published to Substrate
|
|
||||||
|
|
||||||
if !MainDb::<D>::handled_message(&db, msg.network, msg.id) {
|
if !MainDb::<D>::handled_message(&db, msg.network, msg.id) {
|
||||||
let mut txn = db.txn();
|
let mut txn = db.txn();
|
||||||
|
|
||||||
@@ -648,38 +678,11 @@ async fn handle_processor_messages<D: Db, Pro: Processors, P: P2p>(
|
|||||||
#[allow(clippy::let_unit_value, unused_variables)]
|
#[allow(clippy::let_unit_value, unused_variables)]
|
||||||
let batch = ();
|
let batch = ();
|
||||||
|
|
||||||
// Verify all `Batch`s which we've already indexed from Substrate
|
// This won't be complete, as this call is when a `Batch` message is received, which
|
||||||
// This won't be complete, as it only runs when a `Batch` message is received, which
|
// will be before we get a `SignedBatch`
|
||||||
// will be before we get a `SignedBatch`. It is, however, incremental. We can use a
|
// It is, however, incremental
|
||||||
// complete version to finish the last section when we need a complete version.
|
// When we need a complete version, we use another call, continuously called as-needed
|
||||||
let last = MainDb::<D>::last_verified_batch(&txn, msg.network);
|
verify_published_batches::<D>(&mut txn, msg.network, this_batch_id).await;
|
||||||
// This variable exists so Rust can verify Send/Sync properties
|
|
||||||
let mut faulty = None;
|
|
||||||
for id in last.map(|last| last + 1).unwrap_or(0) ..= this_batch_id {
|
|
||||||
if let Some(on_chain) = SubstrateDb::<D>::batch_instructions_hash(&txn, network, id) {
|
|
||||||
let off_chain = MainDb::<D>::expected_batch(&txn, network, id).unwrap();
|
|
||||||
if on_chain != off_chain {
|
|
||||||
faulty = Some((id, off_chain, on_chain));
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
MainDb::<D>::save_last_verified_batch(&mut txn, msg.network, id);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if let Some((id, off_chain, on_chain)) = faulty {
|
|
||||||
// Halt operations on this network and spin, as this is a critical fault
|
|
||||||
loop {
|
|
||||||
log::error!(
|
|
||||||
"{}! network: {:?} id: {} off-chain: {} on-chain: {}",
|
|
||||||
"on-chain batch doesn't match off-chain",
|
|
||||||
network,
|
|
||||||
id,
|
|
||||||
hex::encode(off_chain),
|
|
||||||
hex::encode(on_chain),
|
|
||||||
);
|
|
||||||
sleep(Duration::from_secs(60)).await;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
None
|
None
|
||||||
}
|
}
|
||||||
@@ -705,7 +708,7 @@ async fn handle_processor_messages<D: Db, Pro: Processors, P: P2p>(
|
|||||||
"{} {network:?}",
|
"{} {network:?}",
|
||||||
"couldn't connect to Serai node to get the next batch ID for",
|
"couldn't connect to Serai node to get the next batch ID for",
|
||||||
);
|
);
|
||||||
tokio::time::sleep(Duration::from_secs(5)).await;
|
sleep(Duration::from_secs(5)).await;
|
||||||
}
|
}
|
||||||
first = false;
|
first = false;
|
||||||
|
|
||||||
@@ -762,7 +765,7 @@ async fn handle_processor_messages<D: Db, Pro: Processors, P: P2p>(
|
|||||||
// If we failed to publish it, restore it
|
// If we failed to publish it, restore it
|
||||||
batches.push_front(batch);
|
batches.push_front(batch);
|
||||||
// Sleep for a few seconds before retrying to prevent hammering the node
|
// Sleep for a few seconds before retrying to prevent hammering the node
|
||||||
tokio::time::sleep(Duration::from_secs(5)).await;
|
sleep(Duration::from_secs(5)).await;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -895,13 +898,32 @@ async fn handle_processor_messages<D: Db, Pro: Processors, P: P2p>(
|
|||||||
id.attempt,
|
id.attempt,
|
||||||
hex::encode(block),
|
hex::encode(block),
|
||||||
);
|
);
|
||||||
// If this is the first attempt instance, wait until we synchronize around
|
|
||||||
// the batch first
|
// If this is the first attempt instance, wait until we synchronize around the batch
|
||||||
|
// first
|
||||||
if id.attempt == 0 {
|
if id.attempt == 0 {
|
||||||
MainDb::<D>::save_first_preprocess(&mut txn, spec.set().network, id.id, preprocess);
|
MainDb::<D>::save_first_preprocess(&mut txn, spec.set().network, id.id, preprocess);
|
||||||
|
|
||||||
// TODO: If this is the new key's first Batch, only create this TX once we verify
|
// If this is the new key's first Batch, only create this TX once we verify all
|
||||||
// all prior published `Batch`s
|
// all prior published `Batch`s
|
||||||
|
if (spec.set().session.0 != 0) && (!MainDb::<D>::did_handover(&txn, spec.set())) {
|
||||||
|
let last_received = MainDb::<D>::last_received_batch(&txn, msg.network);
|
||||||
|
if let Some(last_received) = last_received {
|
||||||
|
// Decrease by 1, to get the ID of the Batch prior to this Batch
|
||||||
|
let prior_sets_last_batch = last_received - 1;
|
||||||
|
loop {
|
||||||
|
let successfully_verified =
|
||||||
|
verify_published_batches::<D>(&mut txn, msg.network, prior_sets_last_batch)
|
||||||
|
.await;
|
||||||
|
if successfully_verified == Some(prior_sets_last_batch) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
sleep(Duration::from_secs(5)).await;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
MainDb::<D>::set_did_handover(&mut txn, spec.set());
|
||||||
|
}
|
||||||
|
|
||||||
Some(Transaction::Batch(block.0, id.id))
|
Some(Transaction::Batch(block.0, id.id))
|
||||||
} else {
|
} else {
|
||||||
Some(Transaction::BatchPreprocess(SignData {
|
Some(Transaction::BatchPreprocess(SignData {
|
||||||
|
|||||||
@@ -54,7 +54,7 @@ pub fn new_spec<R: RngCore + CryptoRng>(
|
|||||||
|
|
||||||
let set_participants = keys
|
let set_participants = keys
|
||||||
.iter()
|
.iter()
|
||||||
.map(|key| sr25519::Public((<Ristretto as Ciphersuite>::generator() * **key).to_bytes()))
|
.map(|key| (sr25519::Public((<Ristretto as Ciphersuite>::generator() * **key).to_bytes()), 1))
|
||||||
.collect::<Vec<_>>();
|
.collect::<Vec<_>>();
|
||||||
|
|
||||||
let res = TributarySpec::new(serai_block, start_time, set, set_participants);
|
let res = TributarySpec::new(serai_block, start_time, set, set_participants);
|
||||||
|
|||||||
@@ -61,7 +61,9 @@ impl Serai {
|
|||||||
key: Public,
|
key: Public,
|
||||||
at_hash: [u8; 32],
|
at_hash: [u8; 32],
|
||||||
) -> Result<Option<Amount>, SeraiError> {
|
) -> Result<Option<Amount>, SeraiError> {
|
||||||
self.storage(PALLET, "Allocations", Some(vec![scale_value(network), scale_value(key)]), at_hash).await
|
self
|
||||||
|
.storage(PALLET, "Allocations", Some(vec![scale_value(network), scale_value(key)]), at_hash)
|
||||||
|
.await
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn get_validator_set_musig_key(
|
pub async fn get_validator_set_musig_key(
|
||||||
|
|||||||
Reference in New Issue
Block a user