Provide Cosign/CosignIntent for Tributaries

Luke Parker
2025-01-11 01:31:28 -05:00
parent 378d6b90cf
commit 542bf2170a
8 changed files with 265 additions and 117 deletions

Cargo.lock (generated, 2 changes)

@@ -8337,8 +8337,6 @@ dependencies = [
"serai-message-queue",
"serai-processor-messages",
"serai-task",
"sp-application-crypto",
"sp-runtime",
"tokio",
"tributary-chain",
"zalloc",

View File

@@ -82,13 +82,13 @@ enum HasEvents {
#[derive(Clone, Copy, PartialEq, Eq, Debug, BorshSerialize, BorshDeserialize)]
pub struct CosignIntent {
/// The global session this cosign is being performed under.
global_session: [u8; 32],
pub global_session: [u8; 32],
/// The number of the block to cosign.
block_number: u64,
pub block_number: u64,
/// The hash of the block to cosign.
block_hash: [u8; 32],
pub block_hash: [u8; 32],
/// If this cosign must be handled before further cosigns are.
notable: bool,
pub notable: bool,
}
/// A cosign.
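With these fields now public, callers such as the coordinator's new PendingCosigns channel (in the file below) can construct and inspect intents directly. A minimal sketch of that usage, using a local stand-in struct with the same fields and derives rather than importing the real serai_cosign type:

```rust
use borsh::{BorshDeserialize, BorshSerialize};

// Local stand-in with the same fields and derives as serai_cosign::CosignIntent
#[derive(Clone, Copy, PartialEq, Eq, Debug, BorshSerialize, BorshDeserialize)]
pub struct CosignIntent {
  pub global_session: [u8; 32],
  pub block_number: u64,
  pub block_hash: [u8; 32],
  pub notable: bool,
}

fn main() {
  // With public fields, a caller can construct and inspect intents directly
  let intent = CosignIntent {
    global_session: [0; 32],
    block_number: 1,
    block_hash: [1; 32],
    notable: false,
  };
  // Borsh round-trip, as the coordinator persists these intents in its database
  let bytes = borsh::to_vec(&intent).unwrap();
  assert_eq!(CosignIntent::try_from_slice(&bytes).unwrap(), intent);
}
```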

coordinator/src/db.rs (new file, 79 lines)

@@ -0,0 +1,79 @@
use std::{path::Path, fs};
pub(crate) use serai_db::{Get, DbTxn, Db as DbTrait};
use serai_db::{create_db, db_channel};
use serai_client::{
primitives::NetworkId,
validator_sets::primitives::{Session, ValidatorSet},
};
use serai_cosign::CosignIntent;
use serai_coordinator_substrate::NewSetInformation;
#[cfg(all(feature = "parity-db", not(feature = "rocksdb")))]
pub(crate) type Db = serai_db::ParityDb;
#[cfg(feature = "rocksdb")]
pub(crate) type Db = serai_db::RocksDB;
#[allow(unused_variables, unreachable_code)]
fn db(path: &str) -> Db {
{
let path: &Path = path.as_ref();
// This may error if this path already exists, which we shouldn't propagate/panic on. If this
// is a problem (such as we don't have the necessary permissions to write to this path), we
// expect the following DB opening to error.
let _: Result<_, _> = fs::create_dir_all(path.parent().unwrap());
}
#[cfg(all(feature = "parity-db", feature = "rocksdb"))]
panic!("built with parity-db and rocksdb");
#[cfg(all(feature = "parity-db", not(feature = "rocksdb")))]
let db = serai_db::new_parity_db(path);
#[cfg(feature = "rocksdb")]
let db = serai_db::new_rocksdb(path);
db
}
pub(crate) fn coordinator_db() -> Db {
let root_path = serai_env::var("DB_PATH").expect("path to DB wasn't specified");
db(&format!("{root_path}/coordinator/db"))
}
fn tributary_db_folder(set: ValidatorSet) -> String {
let root_path = serai_env::var("DB_PATH").expect("path to DB wasn't specified");
let network = match set.network {
NetworkId::Serai => panic!("creating Tributary for the Serai network"),
NetworkId::Bitcoin => "Bitcoin",
NetworkId::Ethereum => "Ethereum",
NetworkId::Monero => "Monero",
};
format!("{root_path}/tributary-{network}-{}", set.session.0)
}
pub(crate) fn tributary_db(set: ValidatorSet) -> Db {
db(&format!("{}/db", tributary_db_folder(set)))
}
pub(crate) fn prune_tributary_db(set: ValidatorSet) {
log::info!("pruning data directory for tributary {set:?}");
let db = tributary_db_folder(set);
if fs::exists(&db).expect("couldn't check if tributary DB exists") {
fs::remove_dir_all(db).unwrap();
}
}
create_db! {
Coordinator {
ActiveTributaries: () -> Vec<NewSetInformation>,
RetiredTributary: (network: NetworkId) -> Session,
}
}
db_channel! {
Coordinator {
TributaryCleanup: () -> ValidatorSet,
PendingCosigns: (set: ValidatorSet) -> CosignIntent,
}
}
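PendingCosigns is the new piece here: a per-validator-set FIFO of the CosignIntents this coordinator has provided to a Tributary but not yet seen cosigned by Serai. A rough in-memory approximation of its send/try_recv semantics (the real db_channel! is transactional, so a try_recv whose txn isn't committed leaves the intent queued):

```rust
use std::collections::{HashMap, VecDeque};

#[derive(Clone, Debug)]
struct CosignIntent {
  block_number: u64,
  notable: bool,
}

// In-memory approximation of the PendingCosigns channel: a FIFO of intents keyed by
// validator set (represented here by just its session number)
struct PendingCosigns(HashMap<u32, VecDeque<CosignIntent>>);

impl PendingCosigns {
  fn send(&mut self, session: u32, intent: CosignIntent) {
    self.0.entry(session).or_default().push_back(intent);
  }
  fn try_recv(&mut self, session: u32) -> Option<CosignIntent> {
    self.0.get_mut(&session)?.pop_front()
  }
}

fn main() {
  let mut pending = PendingCosigns(HashMap::new());
  // The coordinator queues the intents it provided to the Tributary, in order
  pending.send(0, CosignIntent { block_number: 5, notable: false });
  pending.send(0, CosignIntent { block_number: 6, notable: true });
  // Each iteration drains from the front, stopping at the first intent Serai hasn't
  // cosigned yet and remembering whether that intent was notable
  assert_eq!(pending.try_recv(0).unwrap().block_number, 5);
  assert!(pending.try_recv(0).unwrap().notable);
  assert!(pending.try_recv(0).is_none());
}
```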

View File

@@ -1,5 +1,5 @@
use core::{marker::PhantomData, ops::Deref, time::Duration};
use std::{sync::Arc, collections::HashMap, time::Instant, path::Path, fs};
use std::{sync::Arc, collections::HashMap, time::Instant};
use zeroize::{Zeroize, Zeroizing};
use rand_core::{RngCore, OsRng};
@@ -20,31 +20,19 @@ use serai_client::{
};
use message_queue::{Service, client::MessageQueue};
use serai_db::{*, Db as DbTrait};
use ::tributary::Tributary;
use ::tributary::ProvidedError;
use serai_task::{Task, TaskHandle, ContinuallyRan};
use serai_cosign::{SignedCosign, Cosigning};
use serai_coordinator_substrate::{NewSetInformation, CanonicalEventStream, EphemeralEventStream};
mod db;
use db::*;
mod tributary;
use tributary::{Transaction, ScanTributaryTask};
-create_db! {
-Coordinator {
-ActiveTributaries: () -> Vec<NewSetInformation>,
-RetiredTributary: (set: ValidatorSet) -> (),
-}
-}
-db_channel! {
-Coordinator {
-TributaryCleanup: () -> ValidatorSet,
-}
-}
mod p2p {
pub use serai_coordinator_p2p::*;
pub use serai_coordinator_libp2p_p2p::Libp2p;
@@ -58,49 +46,7 @@ mod p2p {
static ALLOCATOR: zalloc::ZeroizingAlloc<std::alloc::System> =
zalloc::ZeroizingAlloc(std::alloc::System);
-#[cfg(all(feature = "parity-db", not(feature = "rocksdb")))]
-type Db = serai_db::ParityDb;
-#[cfg(feature = "rocksdb")]
-type Db = serai_db::RocksDB;
-#[allow(unused_variables, unreachable_code)]
-fn db(path: &str) -> Db {
-{
-let path: &Path = path.as_ref();
-// This may error if this path already exists, which we shouldn't propagate/panic on. If this
-// is a problem (such as we don't have the necessary permissions to write to this path), we
-// expect the following DB opening to error.
-let _: Result<_, _> = fs::create_dir_all(path.parent().unwrap());
-}
-#[cfg(all(feature = "parity-db", feature = "rocksdb"))]
-panic!("built with parity-db and rocksdb");
-#[cfg(all(feature = "parity-db", not(feature = "rocksdb")))]
-let db = serai_db::new_parity_db(path);
-#[cfg(feature = "rocksdb")]
-let db = serai_db::new_rocksdb(path);
-db
-}
-fn coordinator_db() -> Db {
-let root_path = serai_env::var("DB_PATH").expect("path to DB wasn't specified");
-db(&format!("{root_path}/coordinator/db"))
-}
-fn tributary_db_folder(set: ValidatorSet) -> String {
-let root_path = serai_env::var("DB_PATH").expect("path to DB wasn't specified");
-let network = match set.network {
-NetworkId::Serai => panic!("creating Tributary for the Serai network"),
-NetworkId::Bitcoin => "Bitcoin",
-NetworkId::Ethereum => "Ethereum",
-NetworkId::Monero => "Monero",
-};
-format!("{root_path}/tributary-{network}-{}", set.session.0)
-}
-fn tributary_db(set: ValidatorSet) -> Db {
-db(&format!("{}/db", tributary_db_folder(set)))
-}
type Tributary<P> = ::tributary::Tributary<Db, Transaction, P>;
async fn serai() -> Arc<Serai> {
const SERAI_CONNECTION_DELAY: Duration = Duration::from_secs(10);
@@ -124,7 +70,6 @@ async fn serai() -> Arc<Serai> {
}
}
-// TODO: intended_cosigns
fn spawn_cosigning(
db: impl serai_db::Db,
serai: Arc<Serai>,
@@ -167,12 +112,13 @@ fn spawn_cosigning(
async fn spawn_tributary<P: p2p::P2p>(
mut db: Db,
p2p: P,
p2p_add_tributary: &mpsc::UnboundedSender<(ValidatorSet, Tributary<Db, Transaction, P>)>,
p2p_add_tributary: &mpsc::UnboundedSender<(ValidatorSet, Tributary<P>)>,
set: NewSetInformation,
serai_key: Zeroizing<<Ristretto as Ciphersuite>::F>,
) {
// Don't spawn retired Tributaries
if RetiredTributary::get(&db, set.set).is_some() {
if RetiredTributary::get(&db, set.set.network).map(|session| session.0) >= Some(set.set.session.0)
{
return;
}
@@ -216,21 +162,15 @@ async fn spawn_tributary<P: p2p::P2p>(
}
let tributary_db = tributary_db(set.set);
let tributary = Tributary::<_, Transaction, _>::new(
tributary_db.clone(),
genesis,
start_time,
serai_key,
tributary_validators,
p2p,
)
.await
.unwrap();
let mut tributary =
Tributary::new(tributary_db.clone(), genesis, start_time, serai_key, tributary_validators, p2p)
.await
.unwrap();
let reader = tributary.reader();
p2p_add_tributary.send((set.set, tributary)).expect("p2p's add_tributary channel was closed?");
let (scan_tributary_task_def, scan_tributary_task) = Task::new();
let (scan_tributary_task_def, mut scan_tributary_task) = Task::new();
tokio::spawn(
(ScanTributaryTask {
cosign_db: db.clone(),
@@ -246,21 +186,114 @@ async fn spawn_tributary<P: p2p::P2p>(
);
// TODO^ On Tributary block, drain this task's ProcessorMessages
-// Have the tributary scanner run as soon as there's a new block
tokio::spawn(async move {
loop {
-if RetiredTributary::get(&db, set.set).is_some() {
// Break once this Tributary is retired
if RetiredTributary::get(&db, set.set.network).map(|session| session.0) >=
Some(set.set.session.0)
{
break;
}
-tributary
-.next_block_notification()
-.await
-.await
-.map_err(|_| ())
-.expect("tributary was dropped causing notification to error");
-scan_tributary_task.run_now();
let provide = |tributary: Tributary<_>, scan_tributary_task, tx: Transaction| async move {
match tributary.provide_transaction(tx.clone()).await {
// The Tributary uses its own DB, so we may provide this multiple times if we reboot
// before committing the txn which provoked this
Ok(()) | Err(ProvidedError::AlreadyProvided) => {}
Err(ProvidedError::NotProvided) => {
panic!("providing a Transaction which wasn't a Provided transaction?");
}
Err(ProvidedError::InvalidProvided(e)) => {
panic!("providing an invalid Provided transaction: {e:?}")
}
Err(ProvidedError::LocalMismatchesOnChain) => {
// Drop the Tributary and scan Tributary task so we don't continue running them here
drop(tributary);
drop(scan_tributary_task);
loop {
// We're actually only halting the Tributary's scan task (which already only scans
// if all Provided transactions align) as the P2P task is still maintaining a clone
// of the Tributary handle
log::error!(
"Tributary {:?} was supposed to provide {:?} but peers disagree, halting Tributary",
set.set,
tx,
);
// Print this every five minutes as this does need to be handled
tokio::time::sleep(Duration::from_secs(5 * 60)).await;
}
// Declare this unreachable so Rust will let us perform the above drops
unreachable!();
}
}
(tributary, scan_tributary_task)
};
// Check if we produced any cosigns we were supposed to
let mut pending_notable_cosign = false;
loop {
let mut txn = db.txn();
// Fetch the next cosign this tributary should handle
let Some(cosign) = PendingCosigns::try_recv(&mut txn, set.set) else { break };
pending_notable_cosign = cosign.notable;
// If we (Serai) haven't cosigned this block, break as this is still pending
let Ok(latest) = Cosigning::<Db>::latest_cosigned_block_number(&txn) else { break };
if latest < cosign.block_number {
break;
}
// Because we've cosigned it, provide the TX for that
(tributary, scan_tributary_task) = provide(
tributary,
scan_tributary_task,
Transaction::Cosigned { substrate_block_hash: cosign.block_hash },
)
.await;
// Clear pending_notable_cosign since this cosign isn't pending
pending_notable_cosign = false;
// Commit the txn to clear this from PendingCosigns
txn.commit();
}
// If we don't have any notable cosigns pending, provide the next set of cosign intents
if !pending_notable_cosign {
let mut txn = db.txn();
// intended_cosigns will only yield up to and including the next notable cosign
for cosign in Cosigning::<Db>::intended_cosigns(&mut txn, set.set) {
// Flag this cosign as pending
PendingCosigns::send(&mut txn, set.set, &cosign);
// Provide the transaction to queue it for work
(tributary, scan_tributary_task) = provide(
tributary,
scan_tributary_task,
Transaction::Cosign { substrate_block_hash: cosign.block_hash },
)
.await;
}
txn.commit();
}
// Have the tributary scanner run as soon as there's a new block
// This is wrapped in a timeout so we don't go too long without running the above code
match tokio::time::timeout(
Duration::from_millis(::tributary::tendermint::TARGET_BLOCK_TIME.into()),
tributary.next_block_notification().await,
)
.await
{
// Future resolved within the timeout, notification
Ok(Ok(())) => scan_tributary_task.run_now(),
// Future resolved within the timeout, notification failed due to sender being dropped
// unreachable since this owns the tributary object and doesn't drop it
Ok(Err(_)) => panic!("tributary was dropped causing notification to error"),
// Future didn't resolve within the timeout
Err(_) => {}
}
}
});
}
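The provide closure above encodes a specific error-handling policy for Provided transactions. The sketch below models that policy with a stand-in ProvidedError enum (variant names taken from the diff, not the real tributary API surface): re-provision after a reboot is idempotent, malformed provisions are programmer errors, and a local/on-chain mismatch halts this Tributary loudly rather than crashing the whole coordinator.

```rust
use std::time::Duration;

// Stand-in mirroring the ProvidedError variants referenced in the diff
#[allow(dead_code)]
#[derive(Debug)]
enum ProvidedError {
  AlreadyProvided,
  NotProvided,
  InvalidProvided(String),
  LocalMismatchesOnChain,
}

// The policy the provide closure applies to the result of providing a transaction
async fn handle_provide_result(result: Result<(), ProvidedError>) {
  match result {
    // The Tributary keeps its own DB, so replaying a provision after a reboot is expected
    Ok(()) | Err(ProvidedError::AlreadyProvided) => {}
    Err(ProvidedError::NotProvided) => panic!("provided a non-Provided transaction"),
    Err(ProvidedError::InvalidProvided(e)) => panic!("provided an invalid transaction: {e}"),
    Err(ProvidedError::LocalMismatchesOnChain) => loop {
      // Stop progressing this Tributary but keep the process alive, logging periodically so
      // an operator notices and intervenes
      eprintln!("local Provided transactions disagree with peers; halting this Tributary");
      tokio::time::sleep(Duration::from_secs(5 * 60)).await;
    },
  }
}

#[tokio::main]
async fn main() {
  // A replayed provision is silently accepted
  handle_provide_result(Err(ProvidedError::AlreadyProvided)).await;
}
```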
@@ -311,15 +344,17 @@ async fn main() {
// Cleanup all historic Tributaries
while let Some(to_cleanup) = TributaryCleanup::try_recv(&mut txn) {
-// TributaryCleanup may fire this message multiple times so this may fail if we've already
-// performed cleanup
-log::info!("pruning data directory for historic tributary {to_cleanup:?}");
-let _: Result<_, _> = fs::remove_dir_all(tributary_db_folder(to_cleanup));
prune_tributary_db(to_cleanup);
// Drain the cosign intents created for this set
while !Cosigning::<Db>::intended_cosigns(&mut txn, to_cleanup).is_empty() {}
}
// Remove retired Tributaries from ActiveTributaries
let mut active_tributaries = ActiveTributaries::get(&txn).unwrap_or(vec![]);
active_tributaries.retain(|tributary| RetiredTributary::get(&txn, tributary.set).is_none());
active_tributaries.retain(|tributary| {
RetiredTributary::get(&txn, tributary.set.network).map(|session| session.0) <
Some(tributary.set.session.0)
});
ActiveTributaries::set(&mut txn, &active_tributaries);
txn.commit();
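Both the spawn-time retired check and this retain rely on Rust's Option ordering, where None sorts before any Some: a network with no retired session yet never satisfies the >= comparison, so its Tributaries are kept. A quick illustration:

```rust
fn main() {
  // RetiredTributary::get(...).map(|session| session.0) yields an Option<u32>; Rust orders
  // Option so that None sorts before any Some
  let retired: Option<u32> = None;
  // No session retired yet: nothing satisfies the >= check, so every Tributary is kept
  assert!(!(retired >= Some(0)));

  // Once session 1 is retired, sessions 0 and 1 count as retired while session 2 does not
  let retired: Option<u32> = Some(1);
  assert!(retired >= Some(0));
  assert!(retired >= Some(1));
  assert!(!(retired >= Some(2)));
}
```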

View File

@@ -189,8 +189,10 @@ create_db!(
// The latest Substrate block to cosign.
LatestSubstrateBlockToCosign: (set: ValidatorSet) -> [u8; 32],
// If we're actively cosigning or not.
ActivelyCosigning: (set: ValidatorSet) -> (),
// The hash of the block we're actively cosigning.
ActivelyCosigning: (set: ValidatorSet) -> [u8; 32],
// If this block has already been cosigned.
Cosigned: (set: ValidatorSet, substrate_block_hash: [u8; 32]) -> (),
// The weight accumulated for a topic.
AccumulatedWeight: (set: ValidatorSet, topic: Topic) -> u64,
@@ -238,19 +240,20 @@ impl TributaryDb {
) {
LatestSubstrateBlockToCosign::set(txn, set, &substrate_block_hash);
}
pub(crate) fn actively_cosigning(txn: &mut impl DbTxn, set: ValidatorSet) -> bool {
ActivelyCosigning::get(txn, set).is_some()
pub(crate) fn actively_cosigning(txn: &mut impl DbTxn, set: ValidatorSet) -> Option<[u8; 32]> {
ActivelyCosigning::get(txn, set)
}
pub(crate) fn start_cosigning(
txn: &mut impl DbTxn,
set: ValidatorSet,
substrate_block_hash: [u8; 32],
substrate_block_number: u64,
) {
assert!(
ActivelyCosigning::get(txn, set).is_none(),
"starting cosigning while already cosigning"
);
ActivelyCosigning::set(txn, set, &());
ActivelyCosigning::set(txn, set, &substrate_block_hash);
TributaryDb::recognize_topic(
txn,
@@ -265,6 +268,20 @@ impl TributaryDb {
pub(crate) fn finish_cosigning(txn: &mut impl DbTxn, set: ValidatorSet) {
assert!(ActivelyCosigning::take(txn, set).is_some(), "finished cosigning but not cosigning");
}
pub(crate) fn mark_cosigned(
txn: &mut impl DbTxn,
set: ValidatorSet,
substrate_block_hash: [u8; 32],
) {
Cosigned::set(txn, set, substrate_block_hash, &());
}
pub(crate) fn cosigned(
txn: &mut impl DbTxn,
set: ValidatorSet,
substrate_block_hash: [u8; 32],
) -> bool {
Cosigned::get(txn, set, substrate_block_hash).is_some()
}
pub(crate) fn recognize_topic(txn: &mut impl DbTxn, set: ValidatorSet, topic: Topic) {
AccumulatedWeight::set(txn, set, topic, &0);
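ActivelyCosigning now stores the hash being cosigned instead of a unit, which is what lets the scanner (next file) check whether a Cosigned transaction matches the in-flight cosign. A small in-memory model of the three helpers and their assertions, keyed by a bare session number instead of a ValidatorSet:

```rust
use std::collections::HashMap;

// In-memory stand-in for the ActivelyCosigning table: validator set (keyed here by a bare
// session number) to the hash of the block currently being cosigned
struct ActivelyCosigning(HashMap<u32, [u8; 32]>);

impl ActivelyCosigning {
  fn actively_cosigning(&self, set: u32) -> Option<[u8; 32]> {
    self.0.get(&set).copied()
  }
  fn start_cosigning(&mut self, set: u32, block_hash: [u8; 32]) {
    // Mirrors start_cosigning's assert: only one cosign may be in flight per set
    let prior = self.0.insert(set, block_hash);
    assert!(prior.is_none(), "starting cosigning while already cosigning");
  }
  fn finish_cosigning(&mut self, set: u32) {
    // Mirrors finish_cosigning's take: we must have been cosigning something
    assert!(self.0.remove(&set).is_some(), "finished cosigning but not cosigning");
  }
}

fn main() {
  let mut state = ActivelyCosigning(HashMap::new());
  assert_eq!(state.actively_cosigning(0), None);
  state.start_cosigning(0, [1; 32]);
  // Storing the hash (rather than a unit) lets the scanner check whether a Cosigned
  // transaction refers to the block actually being worked on
  assert_eq!(state.actively_cosigning(0), Some([1; 32]));
  state.finish_cosigning(0);
  assert_eq!(state.actively_cosigning(0), None);
}
```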

View File

@@ -45,17 +45,22 @@ struct ScanBlock<'a, CD: Db, TD: Db, TDT: DbTxn, P: P2p> {
impl<'a, CD: Db, TD: Db, TDT: DbTxn, P: P2p> ScanBlock<'a, CD, TD, TDT, P> {
fn potentially_start_cosign(&mut self) {
// Don't start a new cosigning instance if we're actively running one
if TributaryDb::actively_cosigning(self.tributary_txn, self.set) {
if TributaryDb::actively_cosigning(self.tributary_txn, self.set).is_some() {
return;
}
// Start cosigning the latest intended-to-be-cosigned block
// Fetch the latest intended-to-be-cosigned block
let Some(latest_substrate_block_to_cosign) =
TributaryDb::latest_substrate_block_to_cosign(self.tributary_txn, self.set)
else {
return;
};
// If it was already cosigned, return
if TributaryDb::cosigned(self.tributary_txn, self.set, latest_substrate_block_to_cosign) {
return;
}
let Some(substrate_block_number) =
Cosigning::<CD>::finalized_block_number(self.cosign_db, latest_substrate_block_to_cosign)
else {
@@ -65,7 +70,12 @@ impl<'a, CD: Db, TD: Db, TDT: DbTxn, P: P2p> ScanBlock<'a, CD, TD, TDT, P> {
};
// Mark us as actively cosigning
TributaryDb::start_cosigning(self.tributary_txn, self.set, substrate_block_number);
TributaryDb::start_cosigning(
self.tributary_txn,
self.set,
latest_substrate_block_to_cosign,
substrate_block_number,
);
// Send the message for the processor to start signing
TributaryDb::send_message(
self.tributary_txn,
@@ -154,24 +164,31 @@ impl<'a, CD: Db, TD: Db, TDT: DbTxn, P: P2p> ScanBlock<'a, CD, TD, TDT, P> {
self.set,
substrate_block_hash,
);
// Start a new cosign if we weren't already working on one
// Start a new cosign if we aren't already working on one
self.potentially_start_cosign();
}
Transaction::Cosigned { substrate_block_hash } => {
-TributaryDb::finish_cosigning(self.tributary_txn, self.set);
/*
We provide one Cosigned per Cosign transaction, but they have independent orders. This
means we may receive Cosigned before Cosign. In order to ensure we only start work on
not-yet-Cosigned cosigns, we flag all cosigned blocks as cosigned. Then, when we choose
the next block to work on, we skip any block which has already been cosigned.
*/
TributaryDb::mark_cosigned(self.tributary_txn, self.set, substrate_block_hash);
// Fetch the latest intended-to-be-cosigned block
let Some(latest_substrate_block_to_cosign) =
TributaryDb::latest_substrate_block_to_cosign(self.tributary_txn, self.set)
else {
return;
};
// If this is the block we just cosigned, return, preventing us from signing it again
if latest_substrate_block_to_cosign == substrate_block_hash {
// If we aren't actively cosigning this block, return
// This occurs when we have Cosign TXs A, B, C, receive Cosigned for A and start on C,
// and then receive Cosigned for B
if TributaryDb::actively_cosigning(self.tributary_txn, self.set) !=
Some(substrate_block_hash)
{
return;
}
// Since we do have a new cosign to work on, start it
// Since this is the block we were cosigning, mark us as having finished cosigning
TributaryDb::finish_cosigning(self.tributary_txn, self.set);
// Start working on the next cosign
self.potentially_start_cosign();
}
Transaction::SubstrateBlock { hash } => {
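The reordered Cosign/Cosigned handling above can be modelled as a small state machine. The sketch below uses plain in-memory state in place of TributaryDb and walks the exact scenario from the comment: Cosign TXs for A, B, C arrive, Cosigned A moves work to C, and a late Cosigned B is ignored.

```rust
use std::collections::HashSet;

type BlockHash = [u8; 32];

// Plain in-memory state standing in for the TributaryDb entries used by the scanner
#[derive(Default)]
struct CosignState {
  latest_to_cosign: Option<BlockHash>,
  actively_cosigning: Option<BlockHash>,
  cosigned: HashSet<BlockHash>,
}

impl CosignState {
  // Transaction::Cosign: update the latest intended-to-be-cosigned block, maybe start work
  fn on_cosign(&mut self, block: BlockHash) {
    self.latest_to_cosign = Some(block);
    self.potentially_start_cosign();
  }
  // Transaction::Cosigned: flag the block as cosigned, and only finish if it's the block
  // we're actively cosigning
  fn on_cosigned(&mut self, block: BlockHash) {
    self.cosigned.insert(block);
    if self.actively_cosigning != Some(block) {
      return;
    }
    self.actively_cosigning = None;
    self.potentially_start_cosign();
  }
  fn potentially_start_cosign(&mut self) {
    if self.actively_cosigning.is_some() {
      return;
    }
    let Some(latest) = self.latest_to_cosign else { return };
    // Skip blocks already flagged as cosigned (Cosigned may arrive before its Cosign)
    if self.cosigned.contains(&latest) {
      return;
    }
    self.actively_cosigning = Some(latest);
  }
}

fn main() {
  let (a, b, c) = ([1; 32], [2; 32], [3; 32]);
  let mut state = CosignState::default();
  // Cosign TXs for A, B, C arrive; we start on A and stay on it
  state.on_cosign(a);
  state.on_cosign(b);
  state.on_cosign(c);
  assert_eq!(state.actively_cosigning, Some(a));
  // Cosigned A: finish A and jump to the latest target, C (B is skipped)
  state.on_cosigned(a);
  assert_eq!(state.actively_cosigning, Some(c));
  // Cosigned B arrives late: we aren't cosigning B, so nothing is finished
  state.on_cosigned(b);
  assert_eq!(state.actively_cosigning, Some(c));
  // Cosigned C: finish, and don't restart work on an already-cosigned block
  state.on_cosigned(c);
  assert_eq!(state.actively_cosigning, None);
}
```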

View File

@@ -231,9 +231,11 @@ impl TransactionTrait for Transaction {
TransactionKind::Signed((b"DkgConfirmation", attempt).encode(), signed.nonce(1))
}
Transaction::Cosign { .. } => TransactionKind::Provided("CosignSubstrateBlock"),
Transaction::Cosign { .. } => TransactionKind::Provided("Cosign"),
Transaction::Cosigned { .. } => TransactionKind::Provided("Cosigned"),
// TODO: Provide this
Transaction::SubstrateBlock { .. } => TransactionKind::Provided("SubstrateBlock"),
// TODO: Provide this
Transaction::Batch { .. } => TransactionKind::Provided("Batch"),
Transaction::Sign { id, attempt, round, signed, .. } => {
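The rename from "CosignSubstrateBlock" to "Cosign" keeps the Provided order labels aligned with the transaction names. Assuming, per the scanner's comment in the previous file, that each label forms its own independently ordered stream of Provided transactions, here is a stand-in sketch of the mapping (the field names are illustrative, not the real Transaction definition):

```rust
// Stand-ins for the Provided transaction variants and their order labels (not the real
// TransactionKind type)
#[allow(dead_code)]
#[derive(Debug)]
enum Transaction {
  Cosign { substrate_block_hash: [u8; 32] },
  Cosigned { substrate_block_hash: [u8; 32] },
  SubstrateBlock { hash: [u8; 32] },
  Batch { hash: [u8; 32] },
}

// Each label names its own ordered stream of Provided transactions, which is why a Cosigned
// may be scanned before the Cosign it corresponds to
fn provided_order(tx: &Transaction) -> &'static str {
  match tx {
    Transaction::Cosign { .. } => "Cosign",
    Transaction::Cosigned { .. } => "Cosigned",
    Transaction::SubstrateBlock { .. } => "SubstrateBlock",
    Transaction::Batch { .. } => "Batch",
  }
}

fn main() {
  let cosign = Transaction::Cosign { substrate_block_hash: [0; 32] };
  let cosigned = Transaction::Cosigned { substrate_block_hash: [0; 32] };
  // Distinct labels: these two transactions are ordered independently of each other
  assert_ne!(provided_order(&cosign), provided_order(&cosigned));
}
```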

View File

@@ -213,17 +213,17 @@ pub mod substrate {
pub enum CoordinatorMessage {
/// Keys set on the Serai blockchain.
///
/// This is set by the Coordinator's Substrate canonical event stream.
/// This is sent by the Coordinator's Substrate canonical event stream.
SetKeys { serai_time: u64, session: Session, key_pair: KeyPair },
/// Slashes reported on the Serai blockchain OR the process timed out.
///
/// This is the final message for a session.
///
/// This is set by the Coordinator's Substrate canonical event stream.
/// This is sent by the Coordinator's Substrate canonical event stream.
SlashesReported { session: Session },
/// A block from Serai with relevance to this processor.
///
/// This is set by the Coordinator's Substrate canonical event stream.
/// This is sent by the Coordinator's Substrate canonical event stream.
Block {
serai_block_number: u64,
batch: Option<ExecutedBatch>,