Make publish_signed_transaction safe for out-of-order publications

Out-of-order publication is possible under the new deterministic nonce scheme.

While there is a concern we might allocate a nonce yet never create the
transaction for it, blocking every later publication, we should always create
the transactions we allocate nonces for. We'll always publish preprocesses, and
while we'll only publish shares if everyone else does, we also only allocate a
nonce for shares once everyone else does.
Author: Luke Parker
Date:   2023-09-27 00:44:31 -04:00
Parent: db8dc1e864
Commit: 64d370ac11

2 changed files with 50 additions and 19 deletions
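To make the failure mode concrete, here is a minimal sketch (hypothetical stand-in types, not code from this diff) of a chain that, like a Tributary, only accepts a signed transaction whose nonce equals the signer's next on-chain nonce, so a transaction whose signing flow finishes early can't be published until its predecessors are:

// Minimal sketch: a hypothetical stand-in for a Tributary's nonce check.
struct Chain {
  next_nonce: u32,
}

impl Chain {
  // Accepts a transaction only if its nonce is exactly the next expected one
  fn add_transaction(&mut self, nonce: u32) -> bool {
    if nonce != self.next_nonce {
      return false;
    }
    self.next_nonce += 1;
    true
  }
}

fn main() {
  let mut chain = Chain { next_nonce: 0 };
  // The transaction with nonce 1 finished signing first; publishing it now fails
  assert!(!chain.add_transaction(1));
  // Once nonce 0 is published, nonce 1 becomes publishable
  assert!(chain.add_transaction(0));
  assert!(chain.add_transaction(1));
}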

@@ -5,7 +5,8 @@ use serai_client::{primitives::NetworkId, in_instructions::primitives::SignedBatch
 pub use serai_db::*;
 
-use crate::tributary::TributarySpec;
+use ::tributary::ReadWrite;
+use crate::tributary::{TributarySpec, Transaction};
 
 #[derive(Debug)]
 pub struct MainDb<D: Db>(PhantomData<D>);
@@ -51,6 +52,21 @@ impl<D: Db> MainDb<D> {
     txn.put(key, existing_bytes);
   }
 
+  fn signed_transaction_key(nonce: u32) -> Vec<u8> {
+    Self::main_key(b"signed_transaction", nonce.to_le_bytes())
+  }
+  pub fn save_signed_transaction(txn: &mut D::Transaction<'_>, nonce: u32, tx: Transaction) {
+    txn.put(Self::signed_transaction_key(nonce), tx.serialize());
+  }
+  pub fn take_signed_transaction(txn: &mut D::Transaction<'_>, nonce: u32) -> Option<Transaction> {
+    let key = Self::signed_transaction_key(nonce);
+    let res = txn.get(&key).map(|bytes| Transaction::read(&mut bytes.as_slice()).unwrap());
+    if res.is_some() {
+      txn.del(&key);
+    }
+    res
+  }
+
   fn first_preprocess_key(id: [u8; 32]) -> Vec<u8> {
     Self::main_key(b"first_preprocess", id)
   }
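The take helper deletes as it reads, giving consume-once semantics per nonce. As a rough standalone analogue (a HashMap standing in for the DB transaction, raw bytes for a serialized Transaction; everything here is illustrative, not the repo's API):

use std::collections::HashMap;

// Standalone analogue: a HashMap stands in for the DB transaction, and raw
// bytes stand in for a serialized Transaction.
fn signed_transaction_key(nonce: u32) -> Vec<u8> {
  // Mirrors the role of main_key: a domain prefix followed by the little-endian nonce
  [b"signed_transaction".as_slice(), nonce.to_le_bytes().as_slice()].concat()
}

fn save_signed_transaction(txn: &mut HashMap<Vec<u8>, Vec<u8>>, nonce: u32, tx: Vec<u8>) {
  txn.insert(signed_transaction_key(nonce), tx);
}

fn take_signed_transaction(txn: &mut HashMap<Vec<u8>, Vec<u8>>, nonce: u32) -> Option<Vec<u8>> {
  // remove reads and deletes in one step, matching take's get-then-del
  txn.remove(&signed_transaction_key(nonce))
}

fn main() {
  let mut txn = HashMap::new();
  save_signed_transaction(&mut txn, 4, b"tx with nonce 4".to_vec());
  assert!(take_signed_transaction(&mut txn, 4).is_some());
  // Consume-once: a second take finds nothing
  assert!(take_signed_transaction(&mut txn, 4).is_none());
}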

@@ -490,27 +490,42 @@ pub async fn handle_p2p<D: Db, P: P2p>(
   }
 }
 
-pub async fn publish_signed_transaction<D: Db, P: P2p>(
+async fn publish_signed_transaction<D: Db, P: P2p>(
+  db: &mut D,
   tributary: &Tributary<D, Transaction, P>,
   tx: Transaction,
 ) {
   log::debug!("publishing transaction {}", hex::encode(tx.hash()));
-  if let TransactionKind::Signed(signed) = tx.kind() {
-    // TODO: What if we try to publish TX with a nonce of 5 when the blockchain only has 3?
-    if tributary
-      .next_nonce(signed.signer)
-      .await
-      .expect("we don't have a nonce, meaning we aren't a participant on this tributary") >
-      signed.nonce
-    {
-      log::warn!("we've already published this transaction. this should only appear on reboot");
-    } else {
-      // We should've created a valid transaction
-      assert!(tributary.add_transaction(tx).await, "created an invalid transaction");
-    }
+
+  let mut txn = db.txn();
+  let signer = if let TransactionKind::Signed(signed) = tx.kind() {
+    let signer = signed.signer;
+
+    // Safe as we should deterministically create transactions, meaning if this is already on-disk,
+    // it's what we're saving now
+    MainDb::<D>::save_signed_transaction(&mut txn, signed.nonce, tx);
+
+    signer
   } else {
     panic!("non-signed transaction passed to publish_signed_transaction");
-  }
+  };
+
+  // If we're trying to publish 5, when the last transaction published was 3, this will delay
+  // publication until the point in time we publish 4
+  while let Some(tx) = MainDb::<D>::take_signed_transaction(
+    &mut txn,
+    tributary
+      .next_nonce(signer)
+      .await
+      .expect("we don't have a nonce, meaning we aren't a participant on this tributary"),
+  ) {
+    // We should've created a valid transaction
+    // This does assume publish_signed_transaction hasn't been called twice with the same
+    // transaction, which risks a race condition on the validity of this assert
+    // Our use case only calls this function sequentially
+    assert!(tributary.add_transaction(tx).await, "created an invalid transaction");
+  }
+  txn.commit();
 }
 
 async fn handle_processor_messages<D: Db, Pro: Processors, P: P2p>(
@@ -521,7 +536,7 @@ async fn handle_processor_messages<D: Db, Pro: Processors, P: P2p>(
   tributary: ActiveTributary<D, P>,
   mut recv: mpsc::UnboundedReceiver<processors::Message>,
 ) {
-  let db_clone = db.clone(); // Enables cloning the DB while we have a txn
+  let mut db_clone = db.clone(); // Enables cloning the DB while we have a txn
   let pub_key = Ristretto::generator() * key.deref();
 
   let ActiveTributary { spec, tributary } = tributary;
@@ -799,7 +814,7 @@ async fn handle_processor_messages<D: Db, Pro: Processors, P: P2p>(
       };
 
       tx.sign(&mut OsRng, genesis, &key, nonce);
-      publish_signed_transaction(&tributary, tx).await;
+      publish_signed_transaction(&mut db_clone, &tributary, tx).await;
     }
   }
 }
@@ -953,7 +968,7 @@ pub async fn run<D: Db, Pro: Processors, P: P2p>(
   });
 
   move |network, genesis, id_type, id, nonce| {
-    let raw_db = raw_db.clone();
+    let mut raw_db = raw_db.clone();
     let key = key.clone();
     let tributaries = tributaries.clone();
     async move {
@@ -994,7 +1009,7 @@ pub async fn run<D: Db, Pro: Processors, P: P2p>(
        // TODO: This may happen if the task above is simply slow
        panic!("tributary we don't have came to consensus on an Batch");
      };
-     publish_signed_transaction(tributary, tx).await;
+     publish_signed_transaction(&mut raw_db, tributary, tx).await;
    }
  }
};
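Taken together, publish_signed_transaction now persists every signed transaction under its nonce and only publishes the contiguous prefix the chain will accept. A self-contained, synchronous simulation of that drain loop (a BTreeMap in place of MainDb's store, a plain counter in place of Tributary::next_nonce; names hypothetical):

use std::collections::BTreeMap;

// Synchronous simulation: `pending` plays MainDb's signed-transaction store and
// `next_nonce` plays Tributary::next_nonce for our signer.
fn publish(
  pending: &mut BTreeMap<u32, &'static str>,
  next_nonce: &mut u32,
  nonce: u32,
  tx: &'static str,
) {
  // save_signed_transaction: always persist, even if the nonce is ahead of the chain
  pending.insert(nonce, tx);
  // take_signed_transaction in a loop: publish only while the next nonce is buffered
  while let Some(ready) = pending.remove(next_nonce) {
    println!("published {ready}");
    *next_nonce += 1;
  }
}

fn main() {
  let mut pending = BTreeMap::new();
  let mut next_nonce = 0;
  publish(&mut pending, &mut next_nonce, 0, "A"); // published immediately
  publish(&mut pending, &mut next_nonce, 2, "C"); // buffered; nonce 1 hasn't been published
  assert_eq!(next_nonce, 1);
  publish(&mut pending, &mut next_nonce, 1, "B"); // publishes B, then drains C
  assert_eq!(next_nonce, 3);
}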