#![allow(clippy::needless_pass_by_ref_mut)] // False positives

use std::{
  sync::{OnceLock, Arc},
  time::Duration,
};

use tokio::{
  task::AbortHandle,
  sync::{Mutex as AsyncMutex, mpsc},
};

use rand_core::{RngCore, OsRng};

use zeroize::Zeroizing;

use ciphersuite::{
  group::{ff::PrimeField, GroupEncoding},
  Ciphersuite, Ristretto,
};

use serai_client::{
  primitives::{NetworkId, Signature},
  Serai,
};

use messages::{
  coordinator::{SubstrateSignableId, SubstrateSignId, cosign_block_msg},
  CoordinatorMessage, ProcessorMessage,
};

use serai_message_queue::{Service, Metadata, client::MessageQueue};

use dockertest::{PullPolicy, Image, TestBodySpecification, DockerOperations};

#[cfg(test)]
mod tests;
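
/// Build a `TestBodySpecification` for a coordinator container, keyed to the named validator.
///
/// A minimal usage sketch, assuming a `dockertest::DockerTest` named `test` (the validator name
/// and freshly sampled key are illustrative):
///
/// ```ignore
/// let message_queue_key = <Ristretto as Ciphersuite>::F::random(&mut OsRng);
/// test.provide_container(coordinator_instance("Alice", message_queue_key));
/// ```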
pub fn coordinator_instance(
  name: &str,
  message_queue_key: <Ristretto as Ciphersuite>::F,
) -> TestBodySpecification {
  serai_docker_tests::build("coordinator".to_string());

  TestBodySpecification::with_image(
    Image::with_repository("serai-dev-coordinator").pull_policy(PullPolicy::Never),
  )
  .replace_env(
    [
      ("MESSAGE_QUEUE_KEY".to_string(), hex::encode(message_queue_key.to_repr())),
      ("DB_PATH".to_string(), "./coordinator-db".to_string()),
      ("SERAI_KEY".to_string(), {
        use serai_client::primitives::insecure_pair_from_name;
        hex::encode(&insecure_pair_from_name(name).as_ref().secret.to_bytes()[.. 32])
      }),
      (
        "RUST_LOG".to_string(),
        "serai_coordinator=trace,tributary_chain=trace,tendermint=trace".to_string(),
      ),
    ]
    .into(),
  )
}
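
/// Build a `TestBodySpecification` for a Serai node, using the fast-epoch image when requested.
///
/// A sketch of pairing it with a coordinator, under the same assumptions as above:
///
/// ```ignore
/// test.provide_container(serai_composition("Alice", false));
/// ```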
pub fn serai_composition(name: &str, fast_epoch: bool) -> TestBodySpecification {
  (if fast_epoch {
    serai_docker_tests::build("serai-fast-epoch".to_string());
    TestBodySpecification::with_image(
      Image::with_repository("serai-dev-serai-fast-epoch").pull_policy(PullPolicy::Never),
    )
  } else {
    serai_docker_tests::build("serai".to_string());
    TestBodySpecification::with_image(
      Image::with_repository("serai-dev-serai").pull_policy(PullPolicy::Never),
    )
  })
  .replace_env(
    [("SERAI_NAME".to_string(), name.to_lowercase()), ("KEY".to_string(), " ".to_string())].into(),
  )
  .set_publish_all_ports(true)
}
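
/// Whether this message belongs to the cosigning flow (handled by the background task) rather
/// than being one the tests themselves should observe.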
fn is_cosign_message(msg: &CoordinatorMessage) -> bool {
  matches!(
    msg,
    CoordinatorMessage::Coordinator(
      messages::coordinator::CoordinatorMessage::CosignSubstrateBlock { .. }
    )
  ) || matches!(
    msg,
    CoordinatorMessage::Coordinator(
      messages::coordinator::CoordinatorMessage::SubstratePreprocesses {
        id: SubstrateSignId { id: SubstrateSignableId::CosigningSubstrateBlock(_), .. },
        ..
      }
    ),
  ) || matches!(
    msg,
    CoordinatorMessage::Coordinator(messages::coordinator::CoordinatorMessage::SubstrateShares {
      id: SubstrateSignId { id: SubstrateSignableId::CosigningSubstrateBlock(_), .. },
      ..
    }),
  )
}
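
/// Docker container handles to a validator's Serai node and message queue.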
#[derive(Clone, PartialEq, Eq, Debug)]
pub struct Handles {
  pub(crate) serai: String,
  pub(crate) message_queue: String,
}
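
/// An emulated processor for tests, exchanging messages with a coordinator via its message
/// queue.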
pub struct Processor {
  network: NetworkId,

  serai_rpc: String,
  #[allow(unused)]
  handles: Handles,

  msgs: mpsc::UnboundedReceiver<messages::CoordinatorMessage>,
  queue_for_sending: MessageQueue,
  abort_handle: Option<Arc<AbortHandle>>,

  substrate_key: Arc<AsyncMutex<Option<Zeroizing<<Ristretto as Ciphersuite>::F>>>>,
}

impl Drop for Processor {
  fn drop(&mut self) {
    if let Some(abort_handle) = self.abort_handle.take() {
      abort_handle.abort();
    }
  }
}

impl Processor {
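  /// Construct a new Processor, connected to the containers behind the given handles.
  ///
  /// This waits (for up to 60 seconds) for the Serai node's RPC to respond, then spawns a
  /// background task which services the cosigning flow and forwards every other coordinator
  /// message to `recv_message`.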
  pub async fn new(
    raw_i: u8,
    network: NetworkId,
    ops: &DockerOperations,
    handles: Handles,
    processor_key: <Ristretto as Ciphersuite>::F,
  ) -> Processor {
    let message_queue_rpc = ops.handle(&handles.message_queue).host_port(2287).unwrap();
    let message_queue_rpc = format!("{}:{}", message_queue_rpc.0, message_queue_rpc.1);

    // Sleep until the Substrate RPC starts
    let serai_rpc = ops.handle(&handles.serai).host_port(9944).unwrap();
    let serai_rpc = format!("http://{}:{}", serai_rpc.0, serai_rpc.1);
    // Bound execution to 60 seconds
    for _ in 0 .. 60 {
      tokio::time::sleep(Duration::from_secs(1)).await;
      let Ok(client) = Serai::new(serai_rpc.clone()).await else { continue };
      if client.latest_finalized_block_hash().await.is_err() {
        continue;
      }
      break;
    }

    // The Serai RPC may or may not have started
    // Assume it has and continue; if it's a few seconds late, it's still within tolerance

    // Create the queue, paired with the ID of the next message expected to be received
    let mut queue = (
      0,
      Arc::new(MessageQueue::new(
        Service::Processor(network),
        message_queue_rpc.clone(),
        Zeroizing::new(processor_key),
      )),
    );

    let (msg_send, msg_recv) = mpsc::unbounded_channel();

    let substrate_key = Arc::new(AsyncMutex::new(None));
    let mut res = Processor {
      network,

      serai_rpc,
      handles,

      queue_for_sending: MessageQueue::new(
        Service::Processor(network),
        message_queue_rpc,
        Zeroizing::new(processor_key),
      ),
      msgs: msg_recv,
      abort_handle: None,

      substrate_key: substrate_key.clone(),
    };

    // Spawn a task to handle cosigns and forward messages as appropriate
    let abort_handle = tokio::spawn({
      async move {
        loop {
          // Get new messages
          let (next_recv_id, queue) = &mut queue;
          let msg = queue.next(Service::Coordinator).await;
          assert_eq!(msg.from, Service::Coordinator);
          assert_eq!(msg.id, *next_recv_id);
          queue.ack(Service::Coordinator, msg.id).await;
          *next_recv_id += 1;

          let msg_msg = borsh::from_slice(&msg.msg).unwrap();

          // Remove any BatchReattempts clogging the pipe
          // TODO: Set up a wrapper around serai-client so we aren't throwing these away, and
          // instead leave them for the tests
          if matches!(
            msg_msg,
            messages::CoordinatorMessage::Coordinator(
              messages::coordinator::CoordinatorMessage::BatchReattempt { .. }
            )
          ) {
            continue;
          }

          if !is_cosign_message(&msg_msg) {
            msg_send.send(msg_msg).unwrap();
            continue;
          }
          let msg = msg_msg;

          let send_message = |msg: ProcessorMessage| async move {
            queue
              .queue(
                Metadata {
                  from: Service::Processor(network),
                  to: Service::Coordinator,
                  intent: msg.intent(),
                },
                borsh::to_vec(&msg).unwrap(),
              )
              .await;
          };
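
          // This is a static, not a field, so all Processors spawned by a test share a single
          // view of the current cosign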
          struct CurrentCosign {
            block_number: u64,
            block: [u8; 32],
          }
          static CURRENT_COSIGN: OnceLock<AsyncMutex<Option<CurrentCosign>>> = OnceLock::new();
          let mut current_cosign =
            CURRENT_COSIGN.get_or_init(|| AsyncMutex::new(None)).lock().await;
          match msg {
            // If this is a CosignSubstrateBlock, reset the CurrentCosign
            // While technically, each processor should individually track the current cosign,
            // this is fine for current testing purposes
            CoordinatorMessage::Coordinator(
              messages::coordinator::CoordinatorMessage::CosignSubstrateBlock { id, block_number },
            ) => {
              let SubstrateSignId {
                id: SubstrateSignableId::CosigningSubstrateBlock(block), ..
              } = id
              else {
                panic!("CosignSubstrateBlock didn't have CosigningSubstrateBlock ID")
              };

              let new_cosign = CurrentCosign { block_number, block };
              if current_cosign.is_none() || (current_cosign.as_ref().unwrap().block != block) {
                *current_cosign = Some(new_cosign);
              }
              send_message(
                messages::coordinator::ProcessorMessage::CosignPreprocess {
                  id: id.clone(),
                  preprocesses: vec![[raw_i; 64]],
                }
                .into(),
              )
              .await;
            }
            CoordinatorMessage::Coordinator(
              messages::coordinator::CoordinatorMessage::SubstratePreprocesses { id, .. },
            ) => {
              // TODO: Assert the ID matches CURRENT_COSIGN
              // TODO: Verify the received preprocesses
              send_message(
                messages::coordinator::ProcessorMessage::SubstrateShare {
                  id,
                  shares: vec![[raw_i; 32]],
                }
                .into(),
              )
              .await;
            }
            CoordinatorMessage::Coordinator(
              messages::coordinator::CoordinatorMessage::SubstrateShares { .. },
            ) => {
              // TODO: Assert the ID matches CURRENT_COSIGN
              // TODO: Verify the shares

              let block_number = current_cosign.as_ref().unwrap().block_number;
              let block = current_cosign.as_ref().unwrap().block;

              let substrate_key = substrate_key.lock().await.clone().unwrap();

              // Expand to a key pair as Schnorrkel expects
              // It's the private key + 32-bytes of entropy for nonces + the public key
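              // (the 96-byte secret-then-public layout `Keypair::from_bytes` deserializes)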
              let mut schnorrkel_key_pair = [0; 96];
              schnorrkel_key_pair[.. 32].copy_from_slice(&substrate_key.to_repr());
              OsRng.fill_bytes(&mut schnorrkel_key_pair[32 .. 64]);
              schnorrkel_key_pair[64 ..].copy_from_slice(
                &(<Ristretto as Ciphersuite>::generator() * *substrate_key).to_bytes(),
              );
              let signature = Signature(
                schnorrkel::keys::Keypair::from_bytes(&schnorrkel_key_pair)
                  .unwrap()
                  .sign_simple(b"substrate", &cosign_block_msg(block_number, block))
                  .to_bytes(),
              );

              send_message(
                messages::coordinator::ProcessorMessage::CosignedBlock {
                  block_number,
                  block,
                  signature: signature.0.to_vec(),
                }
                .into(),
              )
              .await;
            }
            _ => panic!("unexpected message passed is_cosign_message"),
          }
        }
      }
    })
    .abort_handle();

    res.abort_handle = Some(Arc::new(abort_handle));

    res
  }
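
  /// Create a fresh client for this Processor's Serai node RPC.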
  pub async fn serai(&self) -> Serai {
    Serai::new(self.serai_rpc.clone()).await.unwrap()
  }

  /// Send a message to the coordinator as a processor.
  pub async fn send_message(&mut self, msg: impl Into<ProcessorMessage>) {
    let msg: ProcessorMessage = msg.into();
    self
      .queue_for_sending
      .queue(
        Metadata {
          from: Service::Processor(self.network),
          to: Service::Coordinator,
          intent: msg.intent(),
        },
        borsh::to_vec(&msg).unwrap(),
      )
      .await;
  }

  /// Receive a message from the coordinator as a processor.
  pub async fn recv_message(&mut self) -> CoordinatorMessage {
    // Use a timeout of 20 minutes to allow effectively any protocol to occur without fear of
    // an arbitrary timeout cutting it short
    tokio::time::timeout(Duration::from_secs(20 * 60), self.msgs.recv()).await.unwrap().unwrap()
  }
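
  /// Set the Substrate key this Processor uses when cosigning blocks.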
  pub async fn set_substrate_key(
    &mut self,
    substrate_key: Zeroizing<<Ristretto as Ciphersuite>::F>,
  ) {
    *self.substrate_key.lock().await = Some(substrate_key);
  }
}