Redo coordinator's Substrate scanner

This commit is contained in:
Luke Parker
2024-12-31 10:37:19 -05:00
parent 5a42f66dc2
commit 8c9441a1a5
25 changed files with 792 additions and 743 deletions

View File

@@ -0,0 +1,35 @@
[package]
name = "serai-coordinator-substrate"
version = "0.1.0"
description = "Serai Coordinator's Substrate Scanner"
license = "AGPL-3.0-only"
repository = "https://github.com/serai-dex/serai/tree/develop/coordinator/substrate"
authors = ["Luke Parker <lukeparker5132@gmail.com>"]
keywords = []
edition = "2021"
publish = false
rust-version = "1.81"
[package.metadata.docs.rs]
all-features = true
rustdoc-args = ["--cfg", "docsrs"]
[lints]
workspace = true
[dependencies]
scale = { package = "parity-scale-codec", version = "3", default-features = false, features = ["std", "derive"] }
borsh = { version = "1", default-features = false, features = ["std", "derive", "de_strict_order"] }
serai-client = { path = "../../substrate/client", default-features = false, features = ["serai", "borsh"] }
log = { version = "0.4", default-features = false, features = ["std"] }
futures = { version = "0.3", default-features = false, features = ["std"] }
tokio = { version = "1", default-features = false }
serai-db = { version = "0.1.1", path = "../../common/db" }
serai-task = { version = "0.1", path = "../../common/task" }
serai-cosign = { path = "../cosign" }
messages = { package = "serai-processor-messages", path = "../../processor/messages" }

View File

@@ -0,0 +1,15 @@
AGPL-3.0-only license
Copyright (c) 2023-2024 Luke Parker
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU Affero General Public License Version 3 as
published by the Free Software Foundation.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Affero General Public License for more details.
You should have received a copy of the GNU Affero General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.

View File

@@ -0,0 +1,14 @@
# Serai Coordinate Substrate Scanner
This is the scanner of the Serai blockchain for the purposes of Serai's coordinator.
Two event streams are defined:
- Canonical events, which must be handled by every validator, regardless of the sets they're present
in. These are represented by `serai_processor_messages::substrate::CoordinatorMessage`.
- Ephemeral events, which only need to be handled by the validators present within the sets they
relate to. These are represented by two channels, `NewSet` and `SignSlashReport`.
The canonical event stream is available without provision of a validator's public key. The ephemeral
event stream requires provision of a validator's public key. Both are ordered within themselves, yet
there are no ordering guarantees across the two.

View File

@@ -0,0 +1,216 @@
use std::future::Future;
use futures::stream::{StreamExt, FuturesOrdered};
use serai_client::Serai;
use messages::substrate::{InInstructionResult, ExecutedBatch, CoordinatorMessage};
use serai_db::*;
use serai_task::ContinuallyRan;
use serai_cosign::Cosigning;
create_db!(
CoordinatorSubstrateCanonical {
NextBlock: () -> u64,
}
);
/// The event stream for canonical events.
pub struct CanonicalEventStream<D: Db> {
db: D,
serai: Serai,
}
impl<D: Db> CanonicalEventStream<D> {
/// Create a new canonical event stream.
///
/// Only one of these may exist over the provided database.
pub fn new(db: D, serai: Serai) -> Self {
Self { db, serai }
}
}
impl<D: Db> ContinuallyRan for CanonicalEventStream<D> {
fn run_iteration(&mut self) -> impl Send + Future<Output = Result<bool, String>> {
async move {
let next_block = NextBlock::get(&self.db).unwrap_or(0);
let latest_finalized_block =
Cosigning::<D>::latest_cosigned_block_number(&self.db).map_err(|e| format!("{e:?}"))?;
// These are all the events which generate canonical messages
struct CanonicalEvents {
time: u64,
key_gen_events: Vec<serai_client::validator_sets::ValidatorSetsEvent>,
set_retired_events: Vec<serai_client::validator_sets::ValidatorSetsEvent>,
batch_events: Vec<serai_client::in_instructions::InInstructionsEvent>,
burn_events: Vec<serai_client::coins::CoinsEvent>,
}
// For a cosigned block, fetch all relevant events
let scan = {
let db = self.db.clone();
let serai = &self.serai;
move |block_number| {
let block_hash = Cosigning::<D>::cosigned_block(&db, block_number);
async move {
let block_hash = match block_hash {
Ok(Some(block_hash)) => block_hash,
Ok(None) => {
panic!("iterating to latest cosigned block but couldn't get cosigned block")
}
Err(serai_cosign::Faulted) => return Err("cosigning process faulted".to_string()),
};
let temporal_serai = serai.as_of(block_hash);
let temporal_serai_validators = temporal_serai.validator_sets();
let temporal_serai_instructions = temporal_serai.in_instructions();
let temporal_serai_coins = temporal_serai.coins();
let (block, key_gen_events, set_retired_events, batch_events, burn_events) =
tokio::try_join!(
serai.block(block_hash),
temporal_serai_validators.key_gen_events(),
temporal_serai_validators.set_retired_events(),
temporal_serai_instructions.batch_events(),
temporal_serai_coins.burn_with_instruction_events(),
)
.map_err(|e| format!("{e:?}"))?;
let Some(block) = block else {
Err(format!("Serai node didn't have cosigned block #{block_number}"))?
};
let time = if block_number == 0 {
block.time().unwrap_or(0)
} else {
// Serai's block time is in milliseconds
block
.time()
.ok_or_else(|| "non-genesis Serai block didn't have a time".to_string())? /
1000
};
Ok((
block_number,
CanonicalEvents {
time,
key_gen_events,
set_retired_events,
batch_events,
burn_events,
},
))
}
}
};
// Sync the next set of upcoming blocks all at once to minimize latency
const BLOCKS_TO_SYNC_AT_ONCE: u64 = 10;
let mut set = FuturesOrdered::new();
for block_number in
next_block ..= latest_finalized_block.min(next_block + BLOCKS_TO_SYNC_AT_ONCE)
{
set.push_back(scan(block_number));
}
for block_number in next_block ..= latest_finalized_block {
// Get the next block in our queue
let (popped_block_number, block) = set.next().await.unwrap()?;
assert_eq!(block_number, popped_block_number);
// Re-populate the queue
if (block_number + BLOCKS_TO_SYNC_AT_ONCE) <= latest_finalized_block {
set.push_back(scan(block_number + BLOCKS_TO_SYNC_AT_ONCE));
}
let mut txn = self.db.txn();
for key_gen in block.key_gen_events {
let serai_client::validator_sets::ValidatorSetsEvent::KeyGen { set, key_pair } = &key_gen
else {
panic!("KeyGen event wasn't a KeyGen event: {key_gen:?}");
};
crate::Canonical::send(
&mut txn,
set.network,
&CoordinatorMessage::SetKeys {
serai_time: block.time,
session: set.session,
key_pair: key_pair.clone(),
},
);
}
for set_retired in block.set_retired_events {
let serai_client::validator_sets::ValidatorSetsEvent::SetRetired { set } = &set_retired
else {
panic!("SetRetired event wasn't a SetRetired event: {set_retired:?}");
};
crate::Canonical::send(
&mut txn,
set.network,
&CoordinatorMessage::SlashesReported { session: set.session },
);
}
for network in serai_client::primitives::NETWORKS {
let mut batch = None;
for this_batch in &block.batch_events {
let serai_client::in_instructions::InInstructionsEvent::Batch {
network: batch_network,
publishing_session,
id,
in_instructions_hash,
in_instruction_results,
} = this_batch
else {
panic!("Batch event wasn't a Batch event: {this_batch:?}");
};
if network == *batch_network {
if batch.is_some() {
Err("Serai block had multiple batches for the same network".to_string())?;
}
batch = Some(ExecutedBatch {
id: *id,
publisher: *publishing_session,
in_instructions_hash: *in_instructions_hash,
in_instruction_results: in_instruction_results
.iter()
.map(|bit| {
if *bit {
InInstructionResult::Succeeded
} else {
InInstructionResult::Failed
}
})
.collect(),
});
}
}
let mut burns = vec![];
for burn in &block.burn_events {
let serai_client::coins::CoinsEvent::BurnWithInstruction { from: _, instruction } =
&burn
else {
panic!("Burn event wasn't a Burn.in event: {burn:?}");
};
if instruction.balance.coin.network() == network {
burns.push(instruction.clone());
}
}
crate::Canonical::send(
&mut txn,
network,
&CoordinatorMessage::Block { serai_block_number: block_number, batch, burns },
);
}
txn.commit();
}
Ok(next_block <= latest_finalized_block)
}
}
}

View File

@@ -0,0 +1,240 @@
use std::future::Future;
use futures::stream::{StreamExt, FuturesOrdered};
use serai_client::{
primitives::{PublicKey, NetworkId, EmbeddedEllipticCurve},
validator_sets::primitives::MAX_KEY_SHARES_PER_SET,
Serai,
};
use serai_db::*;
use serai_task::ContinuallyRan;
use serai_cosign::Cosigning;
use crate::NewSetInformation;
create_db!(
CoordinatorSubstrateEphemeral {
NextBlock: () -> u64,
}
);
/// The event stream for ephemeral events.
pub struct EphemeralEventStream<D: Db> {
db: D,
serai: Serai,
validator: PublicKey,
}
impl<D: Db> EphemeralEventStream<D> {
/// Create a new ephemeral event stream.
///
/// Only one of these may exist over the provided database.
pub fn new(db: D, serai: Serai, validator: PublicKey) -> Self {
Self { db, serai, validator }
}
}
impl<D: Db> ContinuallyRan for EphemeralEventStream<D> {
fn run_iteration(&mut self) -> impl Send + Future<Output = Result<bool, String>> {
async move {
let next_block = NextBlock::get(&self.db).unwrap_or(0);
let latest_finalized_block =
Cosigning::<D>::latest_cosigned_block_number(&self.db).map_err(|e| format!("{e:?}"))?;
// These are all the events which generate canonical messages
struct EphemeralEvents {
block_hash: [u8; 32],
time: u64,
new_set_events: Vec<serai_client::validator_sets::ValidatorSetsEvent>,
accepted_handover_events: Vec<serai_client::validator_sets::ValidatorSetsEvent>,
}
// For a cosigned block, fetch all relevant events
let scan = {
let db = self.db.clone();
let serai = &self.serai;
move |block_number| {
let block_hash = Cosigning::<D>::cosigned_block(&db, block_number);
async move {
let block_hash = match block_hash {
Ok(Some(block_hash)) => block_hash,
Ok(None) => {
panic!("iterating to latest cosigned block but couldn't get cosigned block")
}
Err(serai_cosign::Faulted) => return Err("cosigning process faulted".to_string()),
};
let temporal_serai = serai.as_of(block_hash);
let temporal_serai_validators = temporal_serai.validator_sets();
let (block, new_set_events, accepted_handover_events) = tokio::try_join!(
serai.block(block_hash),
temporal_serai_validators.new_set_events(),
temporal_serai_validators.accepted_handover_events(),
)
.map_err(|e| format!("{e:?}"))?;
let Some(block) = block else {
Err(format!("Serai node didn't have cosigned block #{block_number}"))?
};
let time = if block_number == 0 {
block.time().unwrap_or(0)
} else {
// Serai's block time is in milliseconds
block
.time()
.ok_or_else(|| "non-genesis Serai block didn't have a time".to_string())? /
1000
};
Ok((
block_number,
EphemeralEvents { block_hash, time, new_set_events, accepted_handover_events },
))
}
}
};
// Sync the next set of upcoming blocks all at once to minimize latency
const BLOCKS_TO_SYNC_AT_ONCE: u64 = 50;
let mut set = FuturesOrdered::new();
for block_number in
next_block ..= latest_finalized_block.min(next_block + BLOCKS_TO_SYNC_AT_ONCE)
{
set.push_back(scan(block_number));
}
for block_number in next_block ..= latest_finalized_block {
// Get the next block in our queue
let (popped_block_number, block) = set.next().await.unwrap()?;
assert_eq!(block_number, popped_block_number);
// Re-populate the queue
if (block_number + BLOCKS_TO_SYNC_AT_ONCE) <= latest_finalized_block {
set.push_back(scan(block_number + BLOCKS_TO_SYNC_AT_ONCE));
}
let mut txn = self.db.txn();
for new_set in block.new_set_events {
let serai_client::validator_sets::ValidatorSetsEvent::NewSet { set } = &new_set else {
panic!("NewSet event wasn't a NewSet event: {new_set:?}");
};
// We only coordinate over external networks
if set.network == NetworkId::Serai {
continue;
}
let serai = self.serai.as_of(block.block_hash);
let serai = serai.validator_sets();
let Some(validators) =
serai.participants(set.network).await.map_err(|e| format!("{e:?}"))?
else {
Err(format!(
"block #{block_number} declared a new set but didn't have the participants"
))?
};
let in_set = validators.iter().any(|(validator, _)| *validator == self.validator);
if in_set {
if u16::try_from(validators.len()).is_err() {
Err("more than u16::MAX validators sent")?;
}
let Ok(validators) = validators
.into_iter()
.map(|(validator, weight)| u16::try_from(weight).map(|weight| (validator, weight)))
.collect::<Result<Vec<_>, _>>()
else {
Err("validator's weight exceeded u16::MAX".to_string())?
};
let total_weight = validators.iter().map(|(_, weight)| u32::from(*weight)).sum::<u32>();
if total_weight > MAX_KEY_SHARES_PER_SET {
Err(format!(
"{set:?} has {total_weight} key shares when the max is {MAX_KEY_SHARES_PER_SET}"
))?;
}
let total_weight = u16::try_from(total_weight).unwrap();
// Fetch all of the validators' embedded elliptic curve keys
let mut embedded_elliptic_curve_keys = FuturesOrdered::new();
for (validator, _) in &validators {
let validator = *validator;
// try_join doesn't return a future so we need to wrap it in this additional async
// block
embedded_elliptic_curve_keys.push_back(async move {
tokio::try_join!(
// One future to fetch the substrate embedded key
serai
.embedded_elliptic_curve_key(validator, EmbeddedEllipticCurve::Embedwards25519),
// One future to fetch the external embedded key, if there is a distinct curve
async {
// `embedded_elliptic_curves` is documented to have the second entry be the
// network-specific curve (if it exists and is distinct from Embedwards25519)
if let Some(curve) = set.network.embedded_elliptic_curves().get(1) {
serai.embedded_elliptic_curve_key(validator, *curve).await.map(Some)
} else {
Ok(None)
}
}
)
.map(|(substrate_embedded_key, external_embedded_key)| {
(validator, substrate_embedded_key, external_embedded_key)
})
});
}
let mut evrf_public_keys = Vec::with_capacity(usize::from(total_weight));
for (validator, weight) in &validators {
let (future_validator, substrate_embedded_key, external_embedded_key) =
embedded_elliptic_curve_keys.next().await.unwrap().map_err(|e| format!("{e:?}"))?;
assert_eq!(*validator, future_validator);
let external_embedded_key =
external_embedded_key.unwrap_or(substrate_embedded_key.clone());
match (substrate_embedded_key, external_embedded_key) {
(Some(substrate_embedded_key), Some(external_embedded_key)) => {
let substrate_embedded_key = <[u8; 32]>::try_from(substrate_embedded_key)
.map_err(|_| "Embedwards25519 key wasn't 32 bytes".to_string())?;
for _ in 0 .. *weight {
evrf_public_keys.push((substrate_embedded_key, external_embedded_key.clone()));
}
}
_ => Err("NewSet with validator missing an embedded key".to_string())?,
}
}
crate::NewSet::send(
&mut txn,
&NewSetInformation {
set: *set,
serai_block: block.block_hash,
start_time: block.time,
// TODO: Why do we have this as an explicit field here?
// Shouldn't thiis be inlined into the Processor's key gen code, where it's used?
threshold: ((total_weight * 2) / 3) + 1,
validators,
evrf_public_keys,
},
);
}
}
for accepted_handover in block.accepted_handover_events {
let serai_client::validator_sets::ValidatorSetsEvent::AcceptedHandover { set } =
&accepted_handover
else {
panic!("AcceptedHandover event wasn't a AcceptedHandover event: {accepted_handover:?}");
};
crate::SignSlashReport::send(&mut txn, set);
}
txn.commit();
}
Ok(next_block <= latest_finalized_block)
}
}
}

View File

@@ -0,0 +1,109 @@
#![cfg_attr(docsrs, feature(doc_auto_cfg))]
#![doc = include_str!("../README.md")]
#![deny(missing_docs)]
use scale::{Encode, Decode};
use borsh::{io, BorshSerialize, BorshDeserialize};
use serai_client::{
primitives::{PublicKey, NetworkId},
validator_sets::primitives::ValidatorSet,
};
use serai_db::*;
mod canonical;
mod ephemeral;
fn borsh_serialize_validators<W: io::Write>(
validators: &Vec<(PublicKey, u16)>,
writer: &mut W,
) -> Result<(), io::Error> {
// This doesn't use `encode_to` as `encode_to` panics if the writer returns an error
writer.write_all(&validators.encode())
}
fn borsh_deserialize_validators<R: io::Read>(
reader: &mut R,
) -> Result<Vec<(PublicKey, u16)>, io::Error> {
Decode::decode(&mut scale::IoReader(reader)).map_err(io::Error::other)
}
/// The information for a new set.
#[derive(Debug, BorshSerialize, BorshDeserialize)]
pub struct NewSetInformation {
set: ValidatorSet,
serai_block: [u8; 32],
start_time: u64,
threshold: u16,
#[borsh(
serialize_with = "borsh_serialize_validators",
deserialize_with = "borsh_deserialize_validators"
)]
validators: Vec<(PublicKey, u16)>,
evrf_public_keys: Vec<([u8; 32], Vec<u8>)>,
}
mod _public_db {
use serai_client::{primitives::NetworkId, validator_sets::primitives::ValidatorSet};
use serai_db::*;
use crate::NewSetInformation;
db_channel!(
CoordinatorSubstrate {
// Canonical messages to send to the processor
Canonical: (network: NetworkId) -> messages::substrate::CoordinatorMessage,
// Relevant new set, from an ephemeral event stream
NewSet: () -> NewSetInformation,
// Relevant sign slash report, from an ephemeral event stream
SignSlashReport: () -> ValidatorSet,
}
);
}
/// The canonical event stream.
pub struct Canonical;
impl Canonical {
pub(crate) fn send(
txn: &mut impl DbTxn,
network: NetworkId,
msg: &messages::substrate::CoordinatorMessage,
) {
_public_db::Canonical::send(txn, network, msg);
}
/// Try to receive a canonical event, returning `None` if there is none to receive.
pub fn try_recv(
txn: &mut impl DbTxn,
network: NetworkId,
) -> Option<messages::substrate::CoordinatorMessage> {
_public_db::Canonical::try_recv(txn, network)
}
}
/// The channel for new set events emitted by an ephemeral event stream.
pub struct NewSet;
impl NewSet {
pub(crate) fn send(txn: &mut impl DbTxn, msg: &NewSetInformation) {
_public_db::NewSet::send(txn, msg);
}
/// Try to receive a new set's information, returning `None` if there is none to receive.
pub fn try_recv(txn: &mut impl DbTxn) -> Option<NewSetInformation> {
_public_db::NewSet::try_recv(txn)
}
}
/// The channel for notifications to sign a slash report, as emitted by an ephemeral event stream.
pub struct SignSlashReport;
impl SignSlashReport {
pub(crate) fn send(txn: &mut impl DbTxn, set: &ValidatorSet) {
_public_db::SignSlashReport::send(txn, set);
}
/// Try to receive a notification to sign a slash report, returning `None` if there is none to
/// receive.
pub fn try_recv(txn: &mut impl DbTxn) -> Option<ValidatorSet> {
_public_db::SignSlashReport::try_recv(txn)
}
}