Reduce and clarify data flow in Tributary scanner

This commit is contained in:
Luke Parker
2023-09-01 00:59:10 -04:00
parent 3f3f6b2d0c
commit fda90e23c9
4 changed files with 168 additions and 175 deletions

View File

@@ -359,6 +359,8 @@ pub async fn handle_p2p<D: Db, P: P2p>(
} }
// TODO2: Rate limit this per timestamp // TODO2: Rate limit this per timestamp
// And/or slash on Heartbeat which justifies a response, since the node obviously was
// offline and we must now use our bandwidth to compensate for them?
P2pMessageKind::Heartbeat(genesis) => { P2pMessageKind::Heartbeat(genesis) => {
if msg.msg.len() != 40 { if msg.msg.len() != 40 {
log::error!("validator sent invalid heartbeat"); log::error!("validator sent invalid heartbeat");

View File

@@ -8,6 +8,47 @@ use serai_client::validator_sets::primitives::{ValidatorSet, KeyPair};
pub use serai_db::*; pub use serai_db::*;
// Used to determine if an ID is acceptable
#[derive(Clone, Copy, PartialEq, Eq, Debug)]
pub enum Zone {
Dkg,
Batch,
Sign,
}
impl Zone {
fn label(&self) -> &'static str {
match self {
Zone::Dkg => "dkg",
Zone::Batch => "batch",
Zone::Sign => "sign",
}
}
}
// A struct to refer to a piece of data all validators will presumably provide a value for.
#[derive(Clone, Copy, PartialEq, Eq, Debug)]
pub struct DataSpecification {
pub zone: Zone,
pub label: &'static str,
pub id: [u8; 32],
pub attempt: u32,
}
impl DataSpecification {
fn as_key(&self, genesis: [u8; 32]) -> Vec<u8> {
// TODO: Use a proper transcript here to avoid conflicts?
[
genesis.as_ref(),
self.zone.label().as_bytes(),
self.label.as_bytes(),
self.id.as_ref(),
self.attempt.to_le_bytes().as_ref(),
]
.concat()
}
}
#[derive(Debug)] #[derive(Debug)]
pub struct TributaryDb<D: Db>(pub D); pub struct TributaryDb<D: Db>(pub D);
impl<D: Db> TributaryDb<D> { impl<D: Db> TributaryDb<D> {
@@ -97,24 +138,17 @@ impl<D: Db> TributaryDb<D> {
Some(KeyPair::decode(&mut getter.get(Self::key_pair_key(set))?.as_slice()).unwrap()) Some(KeyPair::decode(&mut getter.get(Self::key_pair_key(set))?.as_slice()).unwrap())
} }
fn recognized_id_key(label: &'static str, genesis: [u8; 32], id: [u8; 32]) -> Vec<u8> { fn recognized_id_key(genesis: [u8; 32], zone: Zone, id: [u8; 32]) -> Vec<u8> {
Self::tributary_key(b"recognized", [label.as_bytes(), genesis.as_ref(), id.as_ref()].concat()) Self::tributary_key(
b"recognized",
[genesis.as_ref(), zone.label().as_bytes(), id.as_ref()].concat(),
)
} }
pub fn recognized_id<G: Get>( pub fn recognized_id<G: Get>(getter: &G, genesis: [u8; 32], zone: Zone, id: [u8; 32]) -> bool {
getter: &G, getter.get(Self::recognized_id_key(genesis, zone, id)).is_some()
label: &'static str,
genesis: [u8; 32],
id: [u8; 32],
) -> bool {
getter.get(Self::recognized_id_key(label, genesis, id)).is_some()
} }
pub fn recognize_id( pub fn recognize_id(txn: &mut D::Transaction<'_>, genesis: [u8; 32], zone: Zone, id: [u8; 32]) {
txn: &mut D::Transaction<'_>, txn.put(Self::recognized_id_key(genesis, zone, id), [])
label: &'static str,
genesis: [u8; 32],
id: [u8; 32],
) {
txn.put(Self::recognized_id_key(label, genesis, id), [])
} }
fn attempt_key(genesis: [u8; 32], id: [u8; 32]) -> Vec<u8> { fn attempt_key(genesis: [u8; 32], id: [u8; 32]) -> Vec<u8> {
@@ -127,62 +161,42 @@ impl<D: Db> TributaryDb<D> {
) )
} }
fn data_received_key( // Key for the amount of instances received thus far
label: &'static [u8], fn data_received_key(genesis: [u8; 32], data_spec: &DataSpecification) -> Vec<u8> {
genesis: [u8; 32], Self::tributary_key(b"data_received", data_spec.as_key(genesis))
id: [u8; 32],
attempt: u32,
) -> Vec<u8> {
Self::tributary_key(
b"data_received",
[label, genesis.as_ref(), id.as_ref(), attempt.to_le_bytes().as_ref()].concat(),
)
} }
fn data_key( fn data_key(
label: &'static [u8],
genesis: [u8; 32], genesis: [u8; 32],
id: [u8; 32], data_spec: &DataSpecification,
attempt: u32,
signer: <Ristretto as Ciphersuite>::G, signer: <Ristretto as Ciphersuite>::G,
) -> Vec<u8> { ) -> Vec<u8> {
Self::tributary_key( Self::tributary_key(
b"data", b"data",
[ [data_spec.as_key(genesis).as_slice(), signer.to_bytes().as_ref()].concat(),
label,
genesis.as_ref(),
id.as_ref(),
attempt.to_le_bytes().as_ref(),
signer.to_bytes().as_ref(),
]
.concat(),
) )
} }
pub fn data<G: Get>( pub fn data<G: Get>(
label: &'static [u8],
getter: &G, getter: &G,
genesis: [u8; 32], genesis: [u8; 32],
id: [u8; 32], data_spec: &DataSpecification,
attempt: u32,
signer: <Ristretto as Ciphersuite>::G, signer: <Ristretto as Ciphersuite>::G,
) -> Option<Vec<u8>> { ) -> Option<Vec<u8>> {
getter.get(Self::data_key(label, genesis, id, attempt, signer)) getter.get(Self::data_key(genesis, data_spec, signer))
} }
pub fn set_data( pub fn set_data(
label: &'static [u8],
txn: &mut D::Transaction<'_>, txn: &mut D::Transaction<'_>,
genesis: [u8; 32], genesis: [u8; 32],
id: [u8; 32], data_spec: &DataSpecification,
attempt: u32,
signer: <Ristretto as Ciphersuite>::G, signer: <Ristretto as Ciphersuite>::G,
data: &[u8], data: &[u8],
) -> u16 { ) -> u16 {
let received_key = Self::data_received_key(label, genesis, id, attempt); let received_key = Self::data_received_key(genesis, data_spec);
let mut received = let mut received =
u16::from_le_bytes(txn.get(&received_key).unwrap_or(vec![0; 2]).try_into().unwrap()); u16::from_le_bytes(txn.get(&received_key).unwrap_or(vec![0; 2]).try_into().unwrap());
received += 1; received += 1;
txn.put(received_key, received.to_le_bytes()); txn.put(received_key, received.to_le_bytes());
txn.put(Self::data_key(label, genesis, id, attempt, signer), data); txn.put(Self::data_key(genesis, data_spec, signer), data);
received received
} }

View File

@@ -33,13 +33,18 @@ use processor_messages::{
use serai_db::{Get, Db}; use serai_db::{Get, Db};
use crate::processors::Processors; use crate::{
use super::{Transaction, TributarySpec, TributaryDb, scanner::RecognizedIdType}; processors::Processors,
tributary::{
Transaction, TributarySpec, Zone, DataSpecification, TributaryDb, scanner::RecognizedIdType,
},
};
const DKG_CONFIRMATION_NONCES: &[u8] = b"dkg_confirmation_nonces"; const DKG_CONFIRMATION_NONCES: &str = "confirmation_nonces";
const DKG_CONFIRMATION_SHARES: &[u8] = b"dkg_confirmation_shares"; const DKG_CONFIRMATION_SHARES: &str = "confirmation_shares";
// Instead of maintaing state, this simply re-creates the machine(s) in-full on every call. // Instead of maintaing state, this simply re-creates the machine(s) in-full on every call (which
// should only be once per tributary).
// This simplifies data flow and prevents requiring multiple paths. // This simplifies data flow and prevents requiring multiple paths.
// While more expensive, this only runs an O(n) algorithm, which is tolerable to run multiple // While more expensive, this only runs an O(n) algorithm, which is tolerable to run multiple
// times. // times.
@@ -154,27 +159,18 @@ impl DkgConfirmer {
} }
} }
#[allow(clippy::too_many_arguments)] // TODO
fn read_known_to_exist_data<D: Db, G: Get>( fn read_known_to_exist_data<D: Db, G: Get>(
getter: &G, getter: &G,
spec: &TributarySpec, spec: &TributarySpec,
key: &Zeroizing<<Ristretto as Ciphersuite>::F>, key: &Zeroizing<<Ristretto as Ciphersuite>::F>,
label: &'static [u8], data_spec: &DataSpecification,
id: [u8; 32],
needed: u16, needed: u16,
attempt: u32,
bytes: Vec<u8>,
signed: Option<&Signed>,
) -> Option<HashMap<Participant, Vec<u8>>> { ) -> Option<HashMap<Participant, Vec<u8>>> {
let mut data = HashMap::new(); let mut data = HashMap::new();
for validator in spec.validators().iter().map(|validator| validator.0) { for validator in spec.validators().iter().map(|validator| validator.0) {
data.insert( data.insert(
spec.i(validator).unwrap(), spec.i(validator).unwrap(),
if Some(&validator) == signed.map(|signed| &signed.signer) { if let Some(data) = TributaryDb::<D>::data(getter, spec.genesis(), data_spec, validator) {
bytes.clone()
} else if let Some(data) =
TributaryDb::<D>::data(label, getter, spec.genesis(), id, attempt, validator)
{
data data
} else { } else {
continue; continue;
@@ -220,19 +216,14 @@ pub fn generated_key_pair<D: Db>(
txn, txn,
spec, spec,
key, key,
DKG_CONFIRMATION_NONCES, &DataSpecification { zone: Zone::Dkg, label: DKG_CONFIRMATION_NONCES, id: [0; 32], attempt },
[0; 32],
spec.n(), spec.n(),
attempt,
vec![],
None,
) else { ) else {
panic!("wasn't a participant in confirming a key pair"); panic!("wasn't a participant in confirming a key pair");
}; };
DkgConfirmer::share(spec, key, attempt, preprocesses, key_pair) DkgConfirmer::share(spec, key, attempt, preprocesses, key_pair)
} }
#[allow(clippy::too_many_arguments)] // TODO
pub async fn handle_application_tx< pub async fn handle_application_tx<
D: Db, D: Db,
Pro: Processors, Pro: Processors,
@@ -245,93 +236,68 @@ pub async fn handle_application_tx<
spec: &TributarySpec, spec: &TributarySpec,
processors: &Pro, processors: &Pro,
publish_serai_tx: PST, publish_serai_tx: PST,
genesis: [u8; 32],
key: &Zeroizing<<Ristretto as Ciphersuite>::F>, key: &Zeroizing<<Ristretto as Ciphersuite>::F>,
recognized_id: RID, recognized_id: RID,
txn: &mut <D as Db>::Transaction<'_>, txn: &mut <D as Db>::Transaction<'_>,
) { ) {
// Used to determine if an ID is acceptable let genesis = spec.genesis();
#[derive(Clone, Copy, PartialEq, Eq, Debug)]
enum Zone {
Dkg,
Batch,
Sign,
}
impl Zone { let handle = |txn: &mut _, data_spec: &DataSpecification, bytes: Vec<u8>, signed: &Signed| {
fn label(&self) -> &'static str { if data_spec.zone == Zone::Dkg {
match self { // Since Dkg doesn't have an ID, solely attempts, this should just be [0; 32]
Zone::Dkg => { assert_eq!(data_spec.id, [0; 32], "DKG, which shouldn't have IDs, had a non-0 ID");
panic!("getting the label for dkg despite dkg code paths not needing a label") } else if !TributaryDb::<D>::recognized_id(txn, genesis, data_spec.zone, data_spec.id) {
} // TODO: Full slash
Zone::Batch => "batch", todo!();
Zone::Sign => "sign",
}
} }
}
let handle = // If they've already published a TX for this attempt, slash
|txn: &mut _, zone: Zone, label, needed, id, attempt, bytes: Vec<u8>, signed: &Signed| { if let Some(data) = TributaryDb::<D>::data(txn, genesis, data_spec, signed.signer) {
if zone == Zone::Dkg { if data != bytes {
// Since Dkg doesn't have an ID, solely attempts, this should just be [0; 32]
assert_eq!(id, [0; 32], "DKG, which shouldn't have IDs, had a non-0 ID");
} else if !TributaryDb::<D>::recognized_id(txn, zone.label(), genesis, id) {
// TODO: Full slash // TODO: Full slash
todo!(); todo!();
} }
// If they've already published a TX for this attempt, slash // TODO: Slash
if let Some(data) = TributaryDb::<D>::data(label, txn, genesis, id, attempt, signed.signer) { return None;
if data != bytes { }
// TODO: Full slash
todo!();
}
// TODO: Slash // If the attempt is lesser than the blockchain's, slash
return None; let curr_attempt = TributaryDb::<D>::attempt(txn, genesis, data_spec.id);
} if data_spec.attempt < curr_attempt {
// TODO: Slash for being late
return None;
}
if data_spec.attempt > curr_attempt {
// TODO: Full slash
todo!();
}
// If the attempt is lesser than the blockchain's, slash // TODO: We can also full slash if shares before all commitments, or share before the
let curr_attempt = TributaryDb::<D>::attempt(txn, genesis, id); // necessary preprocesses
if attempt < curr_attempt {
// TODO: Slash for being late
return None;
}
if attempt > curr_attempt {
// TODO: Full slash
todo!();
}
// TODO: We can also full slash if shares before all commitments, or share before the // TODO: If this is shares, we need to check they are part of the selected signing set
// necessary preprocesses
// TODO: If this is shares, we need to check they are part of the selected signing set // Store this data
let received = TributaryDb::<D>::set_data(txn, genesis, data_spec, signed.signer, &bytes);
// Store this data // If we have all the needed commitments/preprocesses/shares, tell the processor
let received = // TODO: This needs to be coded by weight, not by validator count
TributaryDb::<D>::set_data(label, txn, genesis, id, attempt, signed.signer, &bytes); let needed = if data_spec.zone == Zone::Dkg { spec.n() } else { spec.t() };
if received == needed {
// If we have all the needed commitments/preprocesses/shares, tell the processor return Some(read_known_to_exist_data::<D, _>(txn, spec, key, data_spec, needed));
// TODO: This needs to be coded by weight, not by validator count }
if received == needed { None
return Some(read_known_to_exist_data::<D, _>( };
txn,
spec,
key,
label,
id,
needed,
attempt,
bytes,
Some(signed),
));
}
None
};
match tx { match tx {
Transaction::DkgCommitments(attempt, bytes, signed) => { Transaction::DkgCommitments(attempt, bytes, signed) => {
match handle(txn, Zone::Dkg, b"dkg_commitments", spec.n(), [0; 32], attempt, bytes, &signed) { match handle(
txn,
&DataSpecification { zone: Zone::Dkg, label: "commitments", id: [0; 32], attempt },
bytes,
&signed,
) {
Some(Some(commitments)) => { Some(Some(commitments)) => {
log::info!("got all DkgCommitments for {}", hex::encode(genesis)); log::info!("got all DkgCommitments for {}", hex::encode(genesis));
processors processors
@@ -377,15 +343,21 @@ pub async fn handle_application_tx<
let confirmation_nonces = handle( let confirmation_nonces = handle(
txn, txn,
Zone::Dkg, &DataSpecification {
DKG_CONFIRMATION_NONCES, zone: Zone::Dkg,
spec.n(), label: DKG_CONFIRMATION_NONCES,
[0; 32], id: [0; 32],
attempt, attempt,
},
confirmation_nonces.to_vec(), confirmation_nonces.to_vec(),
&signed, &signed,
); );
match handle(txn, Zone::Dkg, b"dkg_shares", spec.n(), [0; 32], attempt, bytes, &signed) { match handle(
txn,
&DataSpecification { zone: Zone::Dkg, label: "shares", id: [0; 32], attempt },
bytes,
&signed,
) {
Some(Some(shares)) => { Some(Some(shares)) => {
log::info!("got all DkgShares for {}", hex::encode(genesis)); log::info!("got all DkgShares for {}", hex::encode(genesis));
assert!(confirmation_nonces.is_some()); assert!(confirmation_nonces.is_some());
@@ -407,11 +379,12 @@ pub async fn handle_application_tx<
Transaction::DkgConfirmed(attempt, shares, signed) => { Transaction::DkgConfirmed(attempt, shares, signed) => {
match handle( match handle(
txn, txn,
Zone::Dkg, &DataSpecification {
DKG_CONFIRMATION_SHARES, zone: Zone::Dkg,
spec.n(), label: DKG_CONFIRMATION_SHARES,
[0; 32], id: [0; 32],
attempt, attempt,
},
shares.to_vec(), shares.to_vec(),
&signed, &signed,
) { ) {
@@ -422,12 +395,13 @@ pub async fn handle_application_tx<
txn, txn,
spec, spec,
key, key,
DKG_CONFIRMATION_NONCES, &DataSpecification {
[0; 32], zone: Zone::Dkg,
label: DKG_CONFIRMATION_NONCES,
id: [0; 32],
attempt,
},
spec.n(), spec.n(),
attempt,
vec![],
None,
) else { ) else {
panic!("wasn't a participant in DKG confirmation nonces"); panic!("wasn't a participant in DKG confirmation nonces");
}; };
@@ -458,7 +432,7 @@ pub async fn handle_application_tx<
Transaction::Batch(_, batch) => { Transaction::Batch(_, batch) => {
// Because this Batch has achieved synchrony, its batch ID should be authorized // Because this Batch has achieved synchrony, its batch ID should be authorized
TributaryDb::<D>::recognize_id(txn, Zone::Batch.label(), genesis, batch); TributaryDb::<D>::recognize_id(txn, genesis, Zone::Batch, batch);
recognized_id(spec.set().network, genesis, RecognizedIdType::Batch, batch).await; recognized_id(spec.set().network, genesis, RecognizedIdType::Batch, batch).await;
} }
@@ -469,7 +443,7 @@ pub async fn handle_application_tx<
); );
for id in plan_ids { for id in plan_ids {
TributaryDb::<D>::recognize_id(txn, Zone::Sign.label(), genesis, id); TributaryDb::<D>::recognize_id(txn, genesis, Zone::Sign, id);
recognized_id(spec.set().network, genesis, RecognizedIdType::Plan, id).await; recognized_id(spec.set().network, genesis, RecognizedIdType::Plan, id).await;
} }
} }
@@ -477,11 +451,12 @@ pub async fn handle_application_tx<
Transaction::BatchPreprocess(data) => { Transaction::BatchPreprocess(data) => {
match handle( match handle(
txn, txn,
Zone::Batch, &DataSpecification {
b"batch_preprocess", zone: Zone::Batch,
spec.t(), label: "preprocess",
data.plan, id: data.plan,
data.attempt, attempt: data.attempt,
},
data.data, data.data,
&data.signed, &data.signed,
) { ) {
@@ -503,11 +478,12 @@ pub async fn handle_application_tx<
Transaction::BatchShare(data) => { Transaction::BatchShare(data) => {
match handle( match handle(
txn, txn,
Zone::Batch, &DataSpecification {
b"batch_share", zone: Zone::Batch,
spec.t(), label: "share",
data.plan, id: data.plan,
data.attempt, attempt: data.attempt,
},
data.data, data.data,
&data.signed, &data.signed,
) { ) {
@@ -534,11 +510,12 @@ pub async fn handle_application_tx<
let key_pair = TributaryDb::<D>::key_pair(txn, spec.set()); let key_pair = TributaryDb::<D>::key_pair(txn, spec.set());
match handle( match handle(
txn, txn,
Zone::Sign, &DataSpecification {
b"sign_preprocess", zone: Zone::Sign,
spec.t(), label: "preprocess",
data.plan, id: data.plan,
data.attempt, attempt: data.attempt,
},
data.data, data.data,
&data.signed, &data.signed,
) { ) {
@@ -568,11 +545,12 @@ pub async fn handle_application_tx<
let key_pair = TributaryDb::<D>::key_pair(txn, spec.set()); let key_pair = TributaryDb::<D>::key_pair(txn, spec.set());
match handle( match handle(
txn, txn,
Zone::Sign, &DataSpecification {
b"sign_share", zone: Zone::Sign,
spec.t(), label: "share",
data.plan, id: data.plan,
data.attempt, attempt: data.attempt,
},
data.data, data.data,
&data.signed, &data.signed,
) { ) {

View File

@@ -84,7 +84,6 @@ async fn handle_block<
spec, spec,
processors, processors,
publish_serai_tx.clone(), publish_serai_tx.clone(),
genesis,
key, key,
recognized_id.clone(), recognized_id.clone(),
&mut txn, &mut txn,