Update all of serai-coordinator to compile with the new serai-client-serai
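Across these files the migration follows one pattern: `Serai::as_of` becomes async and fallible, events are matched against the plain `abi` enums instead of the per-pallet wrapper types in `serai_client`, external networks are enumerated with `ExternalNetworkId::all()`, and transactions are submitted via `publish_transaction`. The sketch below condenses that call shape; it only assumes the names visible in the hunks that follow, not a published `serai-client-serai` API, and the error handling is illustrative.

```rust
// Sketch of the new call shape, using only names that appear in this diff.
// `serai-client-serai` is assumed to expose these items as shown below.
use serai_client_serai::{
  abi::{self, primitives::network_id::ExternalNetworkId},
  Serai,
};

// `block_hash` as a raw 32-byte hash is an assumption for this sketch.
async fn scan(serai: &Serai, block_hash: [u8; 32]) -> Result<(), String> {
  // `as_of` is now async and returns a Result, so RPC failures surface here
  let temporal_serai = serai.as_of(block_hash).await.map_err(|e| format!("{e}"))?;
  let validator_sets = temporal_serai.validator_sets();

  // Events are plain `abi` enums rather than pallet-specific wrapper types
  for event in validator_sets.set_keys_events().await.map_err(|e| format!("{e:?}"))? {
    let abi::validator_sets::Event::SetKeys { set, key_pair } = &event else { continue };
    let _ = (set, key_pair);
  }

  // External networks are enumerated via `ExternalNetworkId::all()`,
  // replacing the old `EXTERNAL_NETWORKS` constant
  for network in ExternalNetworkId::all() {
    let _ = network;
  }
  Ok(())
}
```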
@@ -24,7 +24,7 @@ borsh = { version = "1", default-features = false, features = ["std", "derive",
dkg = { path = "../../crypto/dkg", default-features = false, features = ["std"] }
serai-client = { path = "../../substrate/client", version = "0.1", default-features = false, features = ["serai"] }
serai-client-serai = { path = "../../substrate/client/serai", default-features = false }
log = { version = "0.4", default-features = false, features = ["std"] }
@@ -3,7 +3,13 @@ use std::sync::Arc;
use futures::stream::{StreamExt, FuturesOrdered};
use serai_client::{validator_sets::primitives::ExternalValidatorSet, Serai};
use serai_client_serai::{
  abi::{
    self,
    primitives::{network_id::ExternalNetworkId, validator_sets::ExternalValidatorSet},
  },
  Serai,
};
use messages::substrate::{InInstructionResult, ExecutedBatch, CoordinatorMessage};

@@ -15,6 +21,7 @@ use serai_cosign::Cosigning;
create_db!(
  CoordinatorSubstrateCanonical {
    NextBlock: () -> u64,
    LastIndexedBatchId: (network: ExternalNetworkId) -> u32,
  }
);
@@ -45,10 +52,10 @@ impl<D: Db> ContinuallyRan for CanonicalEventStream<D> {
// These are all the events which generate canonical messages
struct CanonicalEvents {
  time: u64,
  key_gen_events: Vec<serai_client::validator_sets::ValidatorSetsEvent>,
  set_retired_events: Vec<serai_client::validator_sets::ValidatorSetsEvent>,
  batch_events: Vec<serai_client::in_instructions::InInstructionsEvent>,
  burn_events: Vec<serai_client::coins::CoinsEvent>,
  set_keys_events: Vec<abi::validator_sets::Event>,
  slash_report_events: Vec<abi::validator_sets::Event>,
  batch_events: Vec<abi::in_instructions::Event>,
  burn_events: Vec<abi::coins::Event>,
}
// For a cosigned block, fetch all relevant events
@@ -66,16 +73,16 @@ impl<D: Db> ContinuallyRan for CanonicalEventStream<D> {
}
Err(serai_cosign::Faulted) => return Err("cosigning process faulted".to_string()),
};
let temporal_serai = serai.as_of(block_hash);
let temporal_serai = serai.as_of(block_hash).await.map_err(|e| format!("{e}"))?;
let temporal_serai_validators = temporal_serai.validator_sets();
let temporal_serai_instructions = temporal_serai.in_instructions();
let temporal_serai_coins = temporal_serai.coins();
let (block, key_gen_events, set_retired_events, batch_events, burn_events) =
let (block, set_keys_events, slash_report_events, batch_events, burn_events) =
  tokio::try_join!(
    serai.block(block_hash),
    temporal_serai_validators.key_gen_events(),
    temporal_serai_validators.set_retired_events(),
    temporal_serai_validators.set_keys_events(),
    temporal_serai_validators.slash_report_events(),
    temporal_serai_instructions.batch_events(),
    temporal_serai_coins.burn_with_instruction_events(),
  )
@@ -84,22 +91,14 @@ impl<D: Db> ContinuallyRan for CanonicalEventStream<D> {
Err(format!("Serai node didn't have cosigned block #{block_number}"))?
};
let time = if block_number == 0 {
  block.time().unwrap_or(0)
} else {
  // Serai's block time is in milliseconds
  block
    .time()
    .ok_or_else(|| "non-genesis Serai block didn't have a time".to_string())? /
    1000
};
// We use time in seconds, not milliseconds, here
let time = block.header.unix_time_in_millis() / 1000;
Ok((
  block_number,
  CanonicalEvents {
    time,
    key_gen_events,
    set_retired_events,
    set_keys_events,
    slash_report_events,
    batch_events,
    burn_events,
  },
@@ -131,10 +130,9 @@ impl<D: Db> ContinuallyRan for CanonicalEventStream<D> {
let mut txn = self.db.txn();
for key_gen in block.key_gen_events {
  let serai_client::validator_sets::ValidatorSetsEvent::KeyGen { set, key_pair } = &key_gen
  else {
    panic!("KeyGen event wasn't a KeyGen event: {key_gen:?}");
for set_keys in block.set_keys_events {
  let abi::validator_sets::Event::SetKeys { set, key_pair } = &set_keys else {
    panic!("`SetKeys` event wasn't a `SetKeys` event: {set_keys:?}");
  };
  crate::Canonical::send(
    &mut txn,

@@ -147,12 +145,10 @@ impl<D: Db> ContinuallyRan for CanonicalEventStream<D> {
  );
}
for set_retired in block.set_retired_events {
  let serai_client::validator_sets::ValidatorSetsEvent::SetRetired { set } = &set_retired
  else {
    panic!("SetRetired event wasn't a SetRetired event: {set_retired:?}");
for slash_report in block.slash_report_events {
  let abi::validator_sets::Event::SlashReport { set } = &slash_report else {
    panic!("`SlashReport` event wasn't a `SlashReport` event: {slash_report:?}");
  };
  let Ok(set) = ExternalValidatorSet::try_from(*set) else { continue };
  crate::Canonical::send(
    &mut txn,
    set.network,

@@ -160,10 +156,12 @@ impl<D: Db> ContinuallyRan for CanonicalEventStream<D> {
  );
}
for network in serai_client::primitives::EXTERNAL_NETWORKS {
for network in ExternalNetworkId::all() {
  let mut batch = None;
  for this_batch in &block.batch_events {
    let serai_client::in_instructions::InInstructionsEvent::Batch {
    // Only irrefutable as this is the only member of the enum at this time
    #[expect(irrefutable_let_patterns)]
    let abi::in_instructions::Event::Batch {
      network: batch_network,
      publishing_session,
      id,

@@ -194,14 +192,19 @@ impl<D: Db> ContinuallyRan for CanonicalEventStream<D> {
      })
      .collect(),
    });
    if LastIndexedBatchId::get(&txn, network) != id.checked_sub(1) {
      panic!(
        "next batch from Serai's ID was not an increment of the last indexed batch's ID"
      );
    }
    LastIndexedBatchId::set(&mut txn, network, id);
  }
}
let mut burns = vec![];
for burn in &block.burn_events {
  let serai_client::coins::CoinsEvent::BurnWithInstruction { from: _, instruction } =
    &burn
  else {
  let abi::coins::Event::BurnWithInstruction { from: _, instruction } = &burn else {
    panic!("BurnWithInstruction event wasn't a BurnWithInstruction event: {burn:?}");
  };
  if instruction.balance.coin.network() == network {

@@ -223,3 +226,7 @@ impl<D: Db> ContinuallyRan for CanonicalEventStream<D> {
    }
  }
}

pub(crate) fn last_indexed_batch_id(txn: &impl DbTxn, network: ExternalNetworkId) -> Option<u32> {
  LastIndexedBatchId::get(txn, network)
}
@@ -3,9 +3,14 @@ use std::sync::Arc;
use futures::stream::{StreamExt, FuturesOrdered};
use serai_client::{
  primitives::{SeraiAddress, EmbeddedEllipticCurve},
  validator_sets::primitives::{MAX_KEY_SHARES_PER_SET, ExternalValidatorSet},
use serai_client_serai::{
  abi::primitives::{
    BlockHash,
    crypto::EmbeddedEllipticCurveKeys,
    network_id::ExternalNetworkId,
    validator_sets::{KeyShares, ExternalValidatorSet},
    address::SeraiAddress,
  },
  Serai,
};
@@ -49,10 +54,10 @@ impl<D: Db> ContinuallyRan for EphemeralEventStream<D> {
// These are all the events which generate canonical messages
struct EphemeralEvents {
  block_hash: [u8; 32],
  block_hash: BlockHash,
  time: u64,
  new_set_events: Vec<serai_client::validator_sets::ValidatorSetsEvent>,
  accepted_handover_events: Vec<serai_client::validator_sets::ValidatorSetsEvent>,
  set_decided_events: Vec<serai_client_serai::abi::validator_sets::Event>,
  accepted_handover_events: Vec<serai_client_serai::abi::validator_sets::Event>,
}
// For a cosigned block, fetch all relevant events
@@ -71,11 +76,11 @@ impl<D: Db> ContinuallyRan for EphemeralEventStream<D> {
Err(serai_cosign::Faulted) => return Err("cosigning process faulted".to_string()),
};
let temporal_serai = serai.as_of(block_hash);
let temporal_serai = serai.as_of(block_hash).await.map_err(|e| format!("{e}"))?;
let temporal_serai_validators = temporal_serai.validator_sets();
let (block, new_set_events, accepted_handover_events) = tokio::try_join!(
let (block, set_decided_events, accepted_handover_events) = tokio::try_join!(
  serai.block(block_hash),
  temporal_serai_validators.new_set_events(),
  temporal_serai_validators.set_decided_events(),
  temporal_serai_validators.accepted_handover_events(),
)
.map_err(|e| format!("{e:?}"))?;
@@ -83,19 +88,11 @@ impl<D: Db> ContinuallyRan for EphemeralEventStream<D> {
Err(format!("Serai node didn't have cosigned block #{block_number}"))?
};
let time = if block_number == 0 {
  block.time().unwrap_or(0)
} else {
  // Serai's block time is in milliseconds
  block
    .time()
    .ok_or_else(|| "non-genesis Serai block didn't have a time".to_string())? /
    1000
};
// We use time in seconds, not milliseconds, here
let time = block.header.unix_time_in_millis() / 1000;
Ok((
  block_number,
  EphemeralEvents { block_hash, time, new_set_events, accepted_handover_events },
  EphemeralEvents { block_hash, time, set_decided_events, accepted_handover_events },
))
}
}
@@ -126,48 +123,40 @@ impl<D: Db> ContinuallyRan for EphemeralEventStream<D> {
let mut txn = self.db.txn();
for new_set in block.new_set_events {
  let serai_client::validator_sets::ValidatorSetsEvent::NewSet { set } = &new_set else {
    panic!("NewSet event wasn't a NewSet event: {new_set:?}");
for set_decided in block.set_decided_events {
  let serai_client_serai::abi::validator_sets::Event::SetDecided { set, validators } =
    &set_decided
  else {
    panic!("`SetDecided` event wasn't a `SetDecided` event: {set_decided:?}");
  };
  // We only coordinate over external networks
  let Ok(set) = ExternalValidatorSet::try_from(*set) else { continue };
  let serai = self.serai.as_of(block.block_hash);
  let serai = self.serai.as_of(block.block_hash).await.map_err(|e| format!("{e}"))?;
  let serai = serai.validator_sets();
  let Some(validators) =
    serai.participants(set.network.into()).await.map_err(|e| format!("{e:?}"))?
  else {
    Err(format!(
      "block #{block_number} declared a new set but didn't have the participants"
    ))?
  };
  let validators = validators
    .into_iter()
    .map(|(validator, weight)| (SeraiAddress::from(validator), weight))
    .collect::<Vec<_>>();
  let validators =
    validators.iter().map(|(validator, weight)| (*validator, weight)).collect::<Vec<_>>();
  let in_set = validators.iter().any(|(validator, _)| *validator == self.validator);
  if in_set {
    if u16::try_from(validators.len()).is_err() {
      Err("more than u16::MAX validators sent")?;
    }
    let Ok(validators) = validators
    let validators = validators
      .into_iter()
      .map(|(validator, weight)| u16::try_from(weight).map(|weight| (validator, weight)))
      .collect::<Result<Vec<_>, _>>()
    else {
      Err("validator's weight exceeded u16::MAX".to_string())?
    };
      .map(|(validator, weight)| (validator, weight.0))
      .collect::<Vec<_>>();

    // Do the summation in u32 so we don't risk a u16 overflow
    let total_weight = validators.iter().map(|(_, weight)| u32::from(*weight)).sum::<u32>();
    if total_weight > u32::from(MAX_KEY_SHARES_PER_SET) {
    if total_weight > u32::from(KeyShares::MAX_PER_SET) {
      Err(format!(
        "{set:?} has {total_weight} key shares when the max is {MAX_KEY_SHARES_PER_SET}"
        "{set:?} has {total_weight} key shares when the max is {}",
        KeyShares::MAX_PER_SET
      ))?;
    }
    let total_weight = u16::try_from(total_weight).unwrap();
    let total_weight = u16::try_from(total_weight)
      .expect("value smaller than `u16` constant but doesn't fit in `u16`");

    // Fetch all of the validators' embedded elliptic curve keys
    let mut embedded_elliptic_curve_keys = FuturesOrdered::new();
@@ -175,52 +164,51 @@ impl<D: Db> ContinuallyRan for EphemeralEventStream<D> {
let validator = *validator;
// try_join doesn't return a future so we need to wrap it in this additional async
// block
embedded_elliptic_curve_keys.push_back(async move {
  tokio::try_join!(
    // One future to fetch the substrate embedded key
    serai.embedded_elliptic_curve_key(
      validator.into(),
      EmbeddedEllipticCurve::Embedwards25519
    ),
    // One future to fetch the external embedded key, if there is a distinct curve
    async {
      // `embedded_elliptic_curves` is documented to have the second entry be the
      // network-specific curve (if it exists and is distinct from Embedwards25519)
      if let Some(curve) = set.network.embedded_elliptic_curves().get(1) {
        serai.embedded_elliptic_curve_key(validator.into(), *curve).await.map(Some)
      } else {
        Ok(None)
      }
embedded_elliptic_curve_keys.push_back({
  let serai = serai.clone();
  async move {
    match serai.embedded_elliptic_curve_keys(validator, set.network).await {
      Ok(Some(keys)) => Ok(Some((
        validator,
        match keys {
          EmbeddedEllipticCurveKeys::Bitcoin(substrate, external) => {
            assert_eq!(set.network, ExternalNetworkId::Bitcoin);
            (substrate, external.as_slice().to_vec())
          }
          EmbeddedEllipticCurveKeys::Ethereum(substrate, external) => {
            assert_eq!(set.network, ExternalNetworkId::Ethereum);
            (substrate, external.as_slice().to_vec())
          }
          EmbeddedEllipticCurveKeys::Monero(substrate) => {
            assert_eq!(set.network, ExternalNetworkId::Monero);
            (substrate, substrate.as_slice().to_vec())
          }
        },
      ))),
      Ok(None) => Ok(None),
      Err(e) => Err(e),
    }
  )
  .map(|(substrate_embedded_key, external_embedded_key)| {
    (validator, substrate_embedded_key, external_embedded_key)
  })
  }
});
}
let mut evrf_public_keys = Vec::with_capacity(usize::from(total_weight));
for (validator, weight) in &validators {
  let (future_validator, substrate_embedded_key, external_embedded_key) =
    embedded_elliptic_curve_keys.next().await.unwrap().map_err(|e| format!("{e:?}"))?;
  let Some((future_validator, (substrate_embedded_key, external_embedded_key))) =
    embedded_elliptic_curve_keys.next().await.unwrap().map_err(|e| format!("{e:?}"))?
  else {
    Err("`SetDecided` with validator missing an embedded key".to_string())?
  };
  assert_eq!(*validator, future_validator);
  let external_embedded_key =
    external_embedded_key.unwrap_or(substrate_embedded_key.clone());
  match (substrate_embedded_key, external_embedded_key) {
    (Some(substrate_embedded_key), Some(external_embedded_key)) => {
      let substrate_embedded_key = <[u8; 32]>::try_from(substrate_embedded_key)
        .map_err(|_| "Embedwards25519 key wasn't 32 bytes".to_string())?;
      for _ in 0 .. *weight {
        evrf_public_keys.push((substrate_embedded_key, external_embedded_key.clone()));
      }
    }
    _ => Err("NewSet with validator missing an embedded key".to_string())?,
  for _ in 0 .. *weight {
    evrf_public_keys.push((substrate_embedded_key, external_embedded_key.clone()));
  }
}
let mut new_set = NewSetInformation {
  set,
  serai_block: block.block_hash,
  serai_block: block.block_hash.0,
  declaration_time: block.time,
  // TODO: This should be inlined into the Processor's key gen code
  // It's legacy from when we removed participants from the key gen

@@ -238,7 +226,7 @@ impl<D: Db> ContinuallyRan for EphemeralEventStream<D> {
}
for accepted_handover in block.accepted_handover_events {
  let serai_client::validator_sets::ValidatorSetsEvent::AcceptedHandover { set } =
  let serai_client_serai::abi::validator_sets::Event::AcceptedHandover { set } =
    &accepted_handover
  else {
    panic!("AcceptedHandover event wasn't a AcceptedHandover event: {accepted_handover:?}");
@@ -8,10 +8,14 @@ use borsh::{BorshSerialize, BorshDeserialize};
use dkg::Participant;
use serai_client::{
  primitives::{ExternalNetworkId, SeraiAddress, Signature},
  validator_sets::primitives::{Session, ExternalValidatorSet, KeyPair, SlashReport},
  in_instructions::primitives::SignedBatch,
use serai_client_serai::abi::{
  primitives::{
    network_id::ExternalNetworkId,
    validator_sets::{Session, ExternalValidatorSet, SlashReport},
    crypto::{Signature, KeyPair},
    address::SeraiAddress,
    instructions::SignedBatch,
  },
  Transaction,
};

@@ -19,6 +23,7 @@ use serai_db::*;
mod canonical;
pub use canonical::CanonicalEventStream;
use canonical::last_indexed_batch_id;
mod ephemeral;
pub use ephemeral::EphemeralEventStream;
@@ -37,7 +42,7 @@ pub struct NewSetInformation {
pub set: ExternalValidatorSet,
/// The Serai block which declared it.
pub serai_block: [u8; 32],
/// The time of the block which declared it, in seconds.
/// The time of the block which declared it, in seconds since the epoch.
pub declaration_time: u64,
/// The threshold to use.
pub threshold: u16,

@@ -96,9 +101,9 @@ mod _public_db {
create_db!(
  CoordinatorSubstrate {
    // Keys to set on the Serai network
    Keys: (network: ExternalNetworkId) -> (Session, Vec<u8>),
    Keys: (network: ExternalNetworkId) -> (Session, Transaction),
    // Slash reports to publish onto the Serai network
    SlashReports: (network: ExternalNetworkId) -> (Session, Vec<u8>),
    SlashReports: (network: ExternalNetworkId) -> (Session, Transaction),
  }
);
}
@@ -171,7 +176,7 @@ impl Keys {
}
}
let tx = serai_client::validator_sets::SeraiValidatorSets::set_keys(
let tx = serai_client_serai::ValidatorSets::set_keys(
  set.network,
  key_pair,
  signature_participants,

@@ -192,7 +197,7 @@ pub struct SignedBatches;
impl SignedBatches {
  /// Send a `SignedBatch` to publish onto Serai.
  pub fn send(txn: &mut impl DbTxn, batch: &SignedBatch) {
    _public_db::SignedBatches::send(txn, batch.batch.network, batch);
    _public_db::SignedBatches::send(txn, batch.batch.network(), batch);
  }
  pub(crate) fn try_recv(txn: &mut impl DbTxn, network: ExternalNetworkId) -> Option<SignedBatch> {
    _public_db::SignedBatches::try_recv(txn, network)

@@ -219,11 +224,8 @@ impl SlashReports {
}
}
let tx = serai_client::validator_sets::SeraiValidatorSets::report_slashes(
  set.network,
  slash_report,
  signature,
);
let tx =
  serai_client_serai::ValidatorSets::report_slashes(set.network, slash_report, signature);
_public_db::SlashReports::set(txn, set.network, &(set.session, tx));
}
pub(crate) fn take(
@@ -1,8 +1,10 @@
use core::future::Future;
use std::sync::Arc;
#[rustfmt::skip]
use serai_client::{primitives::ExternalNetworkId, in_instructions::primitives::SignedBatch, SeraiError, Serai};
use serai_client_serai::{
  abi::primitives::{network_id::ExternalNetworkId, instructions::SignedBatch},
  RpcError, Serai,
};
use serai_db::{Get, DbTxn, Db, create_db};
use serai_task::ContinuallyRan;
@@ -31,7 +33,7 @@ impl<D: Db> PublishBatchTask<D> {
}
impl<D: Db> ContinuallyRan for PublishBatchTask<D> {
  type Error = SeraiError;
  type Error = RpcError;
  fn run_iteration(&mut self) -> impl Send + Future<Output = Result<bool, Self::Error>> {
    async move {

@@ -43,8 +45,8 @@ impl<D: Db> ContinuallyRan for PublishBatchTask<D> {
};
// If this is a Batch not yet published, save it into our unordered mapping
if LastPublishedBatch::get(&txn, self.network) < Some(batch.batch.id) {
  BatchesToPublish::set(&mut txn, self.network, batch.batch.id, &batch);
if LastPublishedBatch::get(&txn, self.network) < Some(batch.batch.id()) {
  BatchesToPublish::set(&mut txn, self.network, batch.batch.id(), &batch);
}
txn.commit();
@@ -52,12 +54,8 @@ impl<D: Db> ContinuallyRan for PublishBatchTask<D> {
// Synchronize our last published batch with the Serai network's
let next_to_publish = {
  // This uses the latest finalized block, not the latest cosigned block, which should be
  // fine as in the worst case, the only impact is no longer attempting TX publication
  let serai = self.serai.as_of_latest_finalized_block().await?;
  let last_batch = serai.in_instructions().last_batch_for_network(self.network).await?;
  let mut txn = self.db.txn();
  let last_batch = crate::last_indexed_batch_id(&txn, self.network);
  let mut our_last_batch = LastPublishedBatch::get(&txn, self.network);
  while our_last_batch < last_batch {
    let next_batch = our_last_batch.map(|batch| batch + 1).unwrap_or(0);

@@ -68,6 +66,7 @@ impl<D: Db> ContinuallyRan for PublishBatchTask<D> {
  if let Some(last_batch) = our_last_batch {
    LastPublishedBatch::set(&mut txn, self.network, &last_batch);
  }
  txn.commit();
  last_batch.map(|batch| batch + 1).unwrap_or(0)
};

@@ -75,7 +74,7 @@ impl<D: Db> ContinuallyRan for PublishBatchTask<D> {
if let Some(batch) = BatchesToPublish::get(&self.db, self.network, next_to_publish) {
  self
    .serai
    .publish(&serai_client::in_instructions::SeraiInInstructions::execute_batch(batch))
    .publish_transaction(&serai_client_serai::InInstructions::execute_batch(batch))
    .await?;
  true
} else {
@@ -3,7 +3,10 @@ use std::sync::Arc;
use serai_db::{DbTxn, Db};
use serai_client::{primitives::ExternalNetworkId, validator_sets::primitives::Session, Serai};
use serai_client_serai::{
  abi::primitives::{network_id::ExternalNetworkId, validator_sets::Session},
  Serai,
};
use serai_task::ContinuallyRan;
@@ -36,7 +39,8 @@ impl<D: Db> PublishSlashReportTask<D> {
let serai = self.serai.as_of_latest_finalized_block().await.map_err(|e| format!("{e:?}"))?;
let serai = serai.validator_sets();
let session_after_slash_report = Session(session.0 + 1);
let current_session = serai.session(network.into()).await.map_err(|e| format!("{e:?}"))?;
let current_session =
  serai.current_session(network.into()).await.map_err(|e| format!("{e:?}"))?;
let current_session = current_session.map(|session| session.0);
// Only attempt to publish the slash report for session #n while session #n+1 is still
// active
@@ -55,14 +59,13 @@ impl<D: Db> PublishSlashReportTask<D> {
}
// If this session which should publish a slash report already has, move on
let key_pending_slash_report =
  serai.key_pending_slash_report(network).await.map_err(|e| format!("{e:?}"))?;
if key_pending_slash_report.is_none() {
if !serai.pending_slash_report(network).await.map_err(|e| format!("{e:?}"))? {
  txn.commit();
  return Ok(false);
};
match self.serai.publish(&slash_report).await {
// Since this slash report is still pending, publish it
match self.serai.publish_transaction(&slash_report).await {
  Ok(()) => {
    txn.commit();
    Ok(true)

@@ -84,7 +87,7 @@ impl<D: Db> ContinuallyRan for PublishSlashReportTask<D> {
async move {
  let mut made_progress = false;
  let mut error = None;
  for network in serai_client::primitives::EXTERNAL_NETWORKS {
  for network in ExternalNetworkId::all() {
    let network_res = self.publish(network).await;
    // We made progress if any network successfully published their slash report
    made_progress |= network_res == Ok(true);
@@ -3,7 +3,10 @@ use std::sync::Arc;
use serai_db::{DbTxn, Db};
use serai_client::{validator_sets::primitives::ExternalValidatorSet, Serai};
use serai_client_serai::{
  abi::primitives::{network_id::ExternalNetworkId, validator_sets::ExternalValidatorSet},
  Serai,
};
use serai_task::ContinuallyRan;
@@ -28,7 +31,7 @@ impl<D: Db> ContinuallyRan for SetKeysTask<D> {
fn run_iteration(&mut self) -> impl Send + Future<Output = Result<bool, Self::Error>> {
  async move {
    let mut made_progress = false;
    for network in serai_client::primitives::EXTERNAL_NETWORKS {
    for network in ExternalNetworkId::all() {
      let mut txn = self.db.txn();
      let Some((session, keys)) = Keys::take(&mut txn, network) else {
        // No keys to set
@@ -40,7 +43,8 @@ impl<D: Db> ContinuallyRan for SetKeysTask<D> {
let serai =
  self.serai.as_of_latest_finalized_block().await.map_err(|e| format!("{e:?}"))?;
let serai = serai.validator_sets();
let current_session = serai.session(network.into()).await.map_err(|e| format!("{e:?}"))?;
let current_session =
  serai.current_session(network.into()).await.map_err(|e| format!("{e:?}"))?;
let current_session = current_session.map(|session| session.0);
// Only attempt to set these keys if this isn't a retired session
if Some(session.0) < current_session {
@@ -67,7 +71,7 @@ impl<D: Db> ContinuallyRan for SetKeysTask<D> {
  continue;
};
match self.serai.publish(&keys).await {
match self.serai.publish_transaction(&keys).await {
  Ok(()) => {
    txn.commit();
    made_progress = true;
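The batch, key, and slash-report publishing tasks above end up sharing the same submission loop. A minimal sketch of that shared shape, assuming the `Transaction`, `ExternalNetworkId::all()`, and `publish_transaction` names used in this diff (the `pending` closure stands in for each task's DB lookup):

```rust
use serai_client_serai::{
  abi::{primitives::network_id::ExternalNetworkId, Transaction},
  Serai,
};

// Try to publish one pending transaction per external network, reporting
// whether any publication succeeded. Names mirror the diff above and are not
// guaranteed to match the published crate.
async fn publish_pending(
  serai: &Serai,
  pending: impl Fn(ExternalNetworkId) -> Option<Transaction>,
) -> bool {
  let mut made_progress = false;
  for network in ExternalNetworkId::all() {
    if let Some(tx) = pending(network) {
      made_progress |= serai.publish_transaction(&tx).await.is_ok();
    }
  }
  made_progress
}
```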