Remove subxt (#460)

* Remove subxt

Removes ~20 crates from our Cargo.lock.

Removes downloading the metadata and enables removing the getMetadata RPC route
(relevant to #379).

Moves forward #337.

Done now due to distinctions in the subxt 0.32 API surface which make it
justifiable to not update.

* fmt, update due to deny triggering on a yanked crate

* Correct the handling of substrate_block_notifier now that it's ephemeral, not long-lived

* Correct URL in tests/coordinator from ws to http
This commit is contained in:
Luke Parker
2023-11-28 02:29:50 -05:00
committed by GitHub
parent 571195bfda
commit 695d1f0ecf
30 changed files with 473 additions and 718 deletions

View File

@@ -24,7 +24,6 @@ use serai_db::DbTxn;
use processor_messages::SubstrateContext;
use futures::stream::StreamExt;
use tokio::{sync::mpsc, time::sleep};
use crate::{
@@ -81,7 +80,7 @@ async fn handle_new_set<D: Db>(
assert_eq!(block.number(), 0);
// Use the next block's time
loop {
let Ok(Some(res)) = serai.block_by_number(1).await else {
let Ok(Some(res)) = serai.finalized_block_by_number(1).await else {
sleep(Duration::from_secs(5)).await;
continue;
};
@@ -340,7 +339,7 @@ async fn handle_new_blocks<D: Db, Pro: Processors>(
next_block: &mut u64,
) -> Result<(), SeraiError> {
// Check if there's been a new Substrate block
let latest_number = serai.latest_block().await?.number();
let latest_number = serai.latest_finalized_block().await?.number();
// TODO: If this block directly builds off a cosigned block *and* doesn't contain events, mark
// cosigned,
@@ -369,7 +368,7 @@ async fn handle_new_blocks<D: Db, Pro: Processors>(
None => {
let serai = serai.as_of(
serai
.block_by_number(block)
.finalized_block_by_number(block)
.await?
.expect("couldn't get block which should've been finalized")
.hash(),
@@ -432,7 +431,7 @@ async fn handle_new_blocks<D: Db, Pro: Processors>(
skipped_block.map(|skipped_block| skipped_block + COSIGN_DISTANCE);
for block in (last_intended_to_cosign_block + 1) ..= latest_number {
let actual_block = serai
.block_by_number(block)
.finalized_block_by_number(block)
.await?
.expect("couldn't get block which should've been finalized");
SeraiBlockNumber::set(&mut txn, actual_block.hash(), &block);
@@ -535,7 +534,7 @@ async fn handle_new_blocks<D: Db, Pro: Processors>(
processors,
serai,
serai
.block_by_number(b)
.finalized_block_by_number(b)
.await?
.expect("couldn't get block before the latest finalized block"),
)
@@ -561,6 +560,7 @@ pub async fn scan_task<D: Db, Pro: Processors>(
let mut db = SubstrateDb::new(db);
let mut next_substrate_block = db.next_block();
/*
let new_substrate_block_notifier = {
let serai = &serai;
move || async move {
@@ -575,31 +575,55 @@ pub async fn scan_task<D: Db, Pro: Processors>(
}
}
};
let mut substrate_block_notifier = new_substrate_block_notifier().await;
*/
// TODO: Restore the above subscription-based system
let new_substrate_block_notifier = {
let serai = &serai;
move |next_substrate_block| async move {
loop {
match serai.latest_finalized_block().await {
Ok(latest) => {
if latest.header().number >= next_substrate_block {
return latest;
} else {
sleep(Duration::from_secs(3)).await;
}
}
Err(e) => {
log::error!("couldn't communicate with serai node: {e}");
sleep(Duration::from_secs(5)).await;
}
}
}
}
};
loop {
// await the next block, yet if our notifier had an error, re-create it
{
let Ok(next_block) =
tokio::time::timeout(Duration::from_secs(60), substrate_block_notifier.next()).await
let Ok(_) = tokio::time::timeout(
Duration::from_secs(60),
new_substrate_block_notifier(next_substrate_block),
)
.await
else {
// Timed out, which may be because Serai isn't finalizing or may be some issue with the
// notifier
if serai.latest_block().await.map(|block| block.number()).ok() ==
if serai.latest_finalized_block().await.map(|block| block.number()).ok() ==
Some(next_substrate_block.saturating_sub(1))
{
log::info!("serai hasn't finalized a block in the last 60s...");
} else {
substrate_block_notifier = new_substrate_block_notifier().await;
}
continue;
};
/*
// next_block is a Option<Result>
if next_block.and_then(Result::ok).is_none() {
substrate_block_notifier = new_substrate_block_notifier().await;
substrate_block_notifier = new_substrate_block_notifier(next_substrate_block);
continue;
}
*/
}
match handle_new_blocks(
@@ -632,12 +656,10 @@ pub(crate) async fn get_expected_next_batch(serai: &Serai, network: NetworkId) -
}
first = false;
let Ok(latest_block) = serai.latest_block().await else {
let Ok(serai) = serai.as_of_latest_finalized_block().await else {
continue;
};
let Ok(last) =
serai.as_of(latest_block.hash()).in_instructions().last_batch_for_network(network).await
else {
let Ok(last) = serai.in_instructions().last_batch_for_network(network).await else {
continue;
};
break if let Some(last) = last { last + 1 } else { 0 };