Attempt to resolve #434

Worsens the coordinator tests' ensuring of the validity of the cosigning
protocol, yet should actually properly model it.
This commit is contained in:
Luke Parker
2023-11-17 16:00:35 -05:00
parent db49a63c2b
commit 25066437da
6 changed files with 242 additions and 217 deletions

View File

@@ -1,21 +1,31 @@
#![allow(clippy::needless_pass_by_ref_mut)] // False positives
use std::{
sync::{OnceLock, Mutex},
sync::{OnceLock, Arc, Mutex},
time::Duration,
fs,
};
use tokio::{task::AbortHandle, sync::Mutex as AsyncMutex};
use rand_core::{RngCore, OsRng};
use zeroize::Zeroizing;
use ciphersuite::{group::ff::PrimeField, Ciphersuite, Ristretto};
use ciphersuite::{
group::{ff::PrimeField, GroupEncoding},
Ciphersuite, Ristretto,
};
use serai_client::primitives::NetworkId;
use messages::{CoordinatorMessage, ProcessorMessage};
use messages::{
coordinator::{SubstrateSignableId, SubstrateSignId, cosign_block_msg},
CoordinatorMessage, ProcessorMessage,
};
use serai_message_queue::{Service, Metadata, client::MessageQueue};
use serai_client::Serai;
use serai_client::{primitives::Signature, Serai};
use dockertest::{
PullPolicy, Image, LogAction, LogPolicy, LogSource, LogOptions, StartPolicy,
@@ -143,6 +153,30 @@ pub fn coordinator_stack(
)
}
fn is_cosign_message(msg: &CoordinatorMessage) -> bool {
matches!(
msg,
CoordinatorMessage::Coordinator(
messages::coordinator::CoordinatorMessage::CosignSubstrateBlock { .. }
)
) || matches!(
msg,
CoordinatorMessage::Coordinator(
messages::coordinator::CoordinatorMessage::SubstratePreprocesses {
id: SubstrateSignId { id: SubstrateSignableId::CosigningSubstrateBlock(_), .. },
..
}
),
) || matches!(
msg,
CoordinatorMessage::Coordinator(messages::coordinator::CoordinatorMessage::SubstrateShares {
id: SubstrateSignId { id: SubstrateSignableId::CosigningSubstrateBlock(_), .. },
..
}),
)
}
#[derive(Clone)]
pub struct Processor {
network: NetworkId,
@@ -152,13 +186,23 @@ pub struct Processor {
#[allow(unused)]
coordinator_handle: String,
next_send_id: u64,
next_recv_id: u64,
queue: MessageQueue,
queue: Arc<AsyncMutex<(u64, u64, MessageQueue)>>,
abort_handle: Option<Arc<AbortHandle>>,
substrate_key: Arc<AsyncMutex<Option<Zeroizing<<Ristretto as Ciphersuite>::F>>>>,
}
impl Drop for Processor {
fn drop(&mut self) {
if let Some(abort_handle) = self.abort_handle.take() {
abort_handle.abort();
};
}
}
impl Processor {
pub async fn new(
raw_i: u8,
network: NetworkId,
ops: &DockerOperations,
handles: (String, String, String),
@@ -183,21 +227,147 @@ impl Processor {
// The Serai RPC may or may not be started
// Assume it is and continue, so if it's a few seconds late, it's still within tolerance
Processor {
let mut res = Processor {
network,
serai_rpc,
message_queue_handle: handles.1,
coordinator_handle: handles.2,
next_send_id: 0,
next_recv_id: 0,
queue: MessageQueue::new(
Service::Processor(network),
message_queue_rpc,
Zeroizing::new(processor_key),
),
}
queue: Arc::new(AsyncMutex::new((
0,
0,
MessageQueue::new(
Service::Processor(network),
message_queue_rpc,
Zeroizing::new(processor_key),
),
))),
abort_handle: None,
substrate_key: Arc::new(AsyncMutex::new(None)),
};
// Handle any cosigns which come up
res.abort_handle = Some(Arc::new(
tokio::spawn({
let mut res = res.clone();
async move {
loop {
tokio::task::yield_now().await;
let msg = {
let mut queue_lock = res.queue.lock().await;
let (_, next_recv_id, queue) = &mut *queue_lock;
let Ok(msg) =
tokio::time::timeout(Duration::from_secs(1), queue.next(Service::Coordinator))
.await
else {
continue;
};
assert_eq!(msg.from, Service::Coordinator);
assert_eq!(msg.id, *next_recv_id);
let msg_msg = serde_json::from_slice(&msg.msg).unwrap();
if !is_cosign_message(&msg_msg) {
continue;
}
queue.ack(Service::Coordinator, msg.id).await;
*next_recv_id += 1;
msg_msg
};
struct CurrentCosign {
block_number: u64,
block: [u8; 32],
}
static CURRENT_COSIGN: OnceLock<AsyncMutex<Option<CurrentCosign>>> = OnceLock::new();
let mut current_cosign =
CURRENT_COSIGN.get_or_init(|| AsyncMutex::new(None)).lock().await;
match msg {
// If this is a CosignSubstrateBlock, reset the CurrentCosign
// While technically, each processor should individually track the current cosign,
// this is fine for current testing purposes
CoordinatorMessage::Coordinator(
messages::coordinator::CoordinatorMessage::CosignSubstrateBlock {
id,
block_number,
},
) => {
let block = match id {
SubstrateSignId {
id: SubstrateSignableId::CosigningSubstrateBlock(block),
..
} => block,
_ => panic!("CosignSubstrateBlock didn't have CosigningSubstrateBlock ID"),
};
let new_cosign = CurrentCosign { block_number, block };
if current_cosign.is_none() || (current_cosign.as_ref().unwrap().block != block) {
*current_cosign = Some(new_cosign);
}
res
.send_message(messages::coordinator::ProcessorMessage::CosignPreprocess {
id: id.clone(),
preprocesses: vec![vec![raw_i; 64]],
})
.await;
}
CoordinatorMessage::Coordinator(
messages::coordinator::CoordinatorMessage::SubstratePreprocesses { id, .. },
) => {
// TODO: Assert the ID matches CURRENT_COSIGN
// TODO: Verify the received preprocesses
res
.send_message(messages::coordinator::ProcessorMessage::SubstrateShare {
id,
shares: vec![[raw_i; 32]],
})
.await;
}
CoordinatorMessage::Coordinator(
messages::coordinator::CoordinatorMessage::SubstrateShares { .. },
) => {
// TODO: Assert the ID matches CURRENT_COSIGN
// TODO: Verify the shares
let block_number = current_cosign.as_ref().unwrap().block_number;
let block = current_cosign.as_ref().unwrap().block;
let substrate_key = res.substrate_key.lock().await.clone().unwrap();
// Expand to a key pair as Schnorrkel expects
// It's the private key + 32-bytes of entropy for nonces + the public key
let mut schnorrkel_key_pair = [0; 96];
schnorrkel_key_pair[.. 32].copy_from_slice(&substrate_key.to_repr());
OsRng.fill_bytes(&mut schnorrkel_key_pair[32 .. 64]);
schnorrkel_key_pair[64 ..].copy_from_slice(
&(<Ristretto as Ciphersuite>::generator() * *substrate_key).to_bytes(),
);
let signature = Signature(
schnorrkel::keys::Keypair::from_bytes(&schnorrkel_key_pair)
.unwrap()
.sign_simple(b"substrate", &cosign_block_msg(block_number, block))
.to_bytes(),
);
res
.send_message(messages::coordinator::ProcessorMessage::CosignedBlock {
block_number,
block,
signature: signature.0.to_vec(),
})
.await;
}
_ => panic!("unexpected message passed is_cosign_message"),
}
}
}
})
.abort_handle(),
));
res
}
pub async fn serai(&self) -> Serai {
@@ -207,8 +377,10 @@ impl Processor {
/// Send a message to the coordinator as a processor.
pub async fn send_message(&mut self, msg: impl Into<ProcessorMessage>) {
let msg: ProcessorMessage = msg.into();
self
.queue
let mut queue_lock = self.queue.lock().await;
let (next_send_id, _, queue) = &mut *queue_lock;
queue
.queue(
Metadata {
from: Service::Processor(self.network),
@@ -218,20 +390,39 @@ impl Processor {
serde_json::to_string(&msg).unwrap().into_bytes(),
)
.await;
self.next_send_id += 1;
*next_send_id += 1;
}
/// Receive a message from the coordinator as a processor.
pub async fn recv_message(&mut self) -> CoordinatorMessage {
// Set a timeout of an entire 6 minutes as cosigning may be delayed by up to 5 minutes
let msg =
tokio::time::timeout(Duration::from_secs(6 * 60), self.queue.next(Service::Coordinator))
loop {
tokio::task::yield_now().await;
let mut queue_lock = self.queue.lock().await;
let (_, next_recv_id, queue) = &mut *queue_lock;
// Set a timeout of an entire 6 minutes as cosigning may be delayed by up to 5 minutes
let msg = tokio::time::timeout(Duration::from_secs(6 * 60), queue.next(Service::Coordinator))
.await
.unwrap();
assert_eq!(msg.from, Service::Coordinator);
assert_eq!(msg.id, self.next_recv_id);
self.queue.ack(Service::Coordinator, msg.id).await;
self.next_recv_id += 1;
serde_json::from_slice(&msg.msg).unwrap()
assert_eq!(msg.from, Service::Coordinator);
assert_eq!(msg.id, *next_recv_id);
// If this is a cosign message, let the cosign task handle it
let msg_msg = serde_json::from_slice(&msg.msg).unwrap();
if is_cosign_message(&msg_msg) {
continue;
}
queue.ack(Service::Coordinator, msg.id).await;
*next_recv_id += 1;
return msg_msg;
}
}
pub async fn set_substrate_key(
&mut self,
substrate_key: Zeroizing<<Ristretto as Ciphersuite>::F>,
) {
*self.substrate_key.lock().await = Some(substrate_key);
}
}