Use a timeout in case the JSON-RPC notifications have unexpected behavior

This commit is contained in:
Luke Parker
2023-08-30 17:57:33 -04:00
parent d5a19eca8c
commit 493a222421

View File

@@ -122,12 +122,23 @@ pub async fn scan_substrate<D: Db, Pro: Processors>(
loop { loop {
// await the next block, yet if our notifier had an error, re-create it // await the next block, yet if our notifier had an error, re-create it
{ {
if substrate_block_notifier let Ok(next_block) =
.next() tokio::time::timeout(Duration::from_secs(60), substrate_block_notifier.next()).await
.await else {
.and_then(|result| if result.is_err() { None } else { Some(()) }) // Timed out, which may be because Serai isn't finalizing or may be some issue with the
.is_none() // notifier
if serai.get_latest_block().await.map(|block| block.number()).ok() ==
Some(next_substrate_block.saturating_sub(1))
{ {
log::info!("serai hasn't finalized a block in the last 60s...");
} else {
substrate_block_notifier = new_substrate_block_notifier().await;
}
continue;
};
// next_block is a Option<Result>
if next_block.and_then(Result::ok).is_none() {
substrate_block_notifier = new_substrate_block_notifier().await; substrate_block_notifier = new_substrate_block_notifier().await;
continue; continue;
} }
@@ -249,11 +260,11 @@ pub async fn scan_tributaries<
} }
// This is assumed to be some ephemeral error due to the assumed fault-free // This is assumed to be some ephemeral error due to the assumed fault-free
// creation // creation
// TODO: Differentiate connection errors from invariants // TODO2: Differentiate connection errors from invariants
Err(e) => { Err(e) => {
// Check if this failed because the keys were already set by someone else // Check if this failed because the keys were already set by someone else
if matches!(serai.get_keys(spec.set()).await, Ok(Some(_))) { if matches!(serai.get_keys(spec.set()).await, Ok(Some(_))) {
log::info!("other party set key pair for {:?}", set); log::info!("another coordinator set key pair for {:?}", set);
break; break;
} }
@@ -271,7 +282,7 @@ pub async fn scan_tributaries<
} }
// Sleep for half the block time // Sleep for half the block time
// TODO2: Should we define a notification system for when a new block occurs? // TODO2: Define a notification system for when a new block occurs
sleep(Duration::from_secs((Tributary::<D, Transaction, P>::block_time() / 2).into())).await; sleep(Duration::from_secs((Tributary::<D, Transaction, P>::block_time() / 2).into())).await;
} }
} }
@@ -627,7 +638,7 @@ pub async fn handle_processors<D: Db, Pro: Processors, P: P2p>(
"processor sent us a batch for a different network than it was for", "processor sent us a batch for a different network than it was for",
); );
// TODO: Check this key's key pair's substrate key is authorized to publish batches // TODO: Check this key's key pair's substrate key is authorized to publish batches
// TODO: Check the batch ID is an atomic increment // TODO: Handle the fact batch n+1 can be signed before batch n
let tx = Serai::execute_batch(batch.clone()); let tx = Serai::execute_batch(batch.clone());
loop { loop {