mirror of
https://github.com/serai-dex/serai.git
synced 2025-12-12 14:09:25 +00:00
Merge branch 'develop' into HEAD
This commit is contained in:
@@ -5,7 +5,10 @@ use std::{
|
||||
time::Duration,
|
||||
};
|
||||
|
||||
use tokio::{task::AbortHandle, sync::Mutex as AsyncMutex};
|
||||
use tokio::{
|
||||
task::AbortHandle,
|
||||
sync::{Mutex as AsyncMutex, mpsc},
|
||||
};
|
||||
|
||||
use rand_core::{RngCore, OsRng};
|
||||
|
||||
@@ -58,21 +61,21 @@ pub fn coordinator_instance(
|
||||
}
|
||||
|
||||
pub fn serai_composition(name: &str, fast_epoch: bool) -> TestBodySpecification {
|
||||
if fast_epoch {
|
||||
(if fast_epoch {
|
||||
serai_docker_tests::build("serai-fast-epoch".to_string());
|
||||
TestBodySpecification::with_image(
|
||||
Image::with_repository("serai-dev-serai-fast-epoch").pull_policy(PullPolicy::Never),
|
||||
)
|
||||
.replace_env([("SERAI_NAME".to_string(), name.to_lowercase())].into())
|
||||
.set_publish_all_ports(true)
|
||||
} else {
|
||||
serai_docker_tests::build("serai".to_string());
|
||||
TestBodySpecification::with_image(
|
||||
Image::with_repository("serai-dev-serai").pull_policy(PullPolicy::Never),
|
||||
)
|
||||
.replace_env([("SERAI_NAME".to_string(), name.to_lowercase())].into())
|
||||
.set_publish_all_ports(true)
|
||||
}
|
||||
})
|
||||
.replace_env(
|
||||
[("SERAI_NAME".to_string(), name.to_lowercase()), ("KEY".to_string(), " ".to_string())].into(),
|
||||
)
|
||||
.set_publish_all_ports(true)
|
||||
}
|
||||
|
||||
fn is_cosign_message(msg: &CoordinatorMessage) -> bool {
|
||||
@@ -104,7 +107,6 @@ pub struct Handles {
|
||||
pub(crate) message_queue: String,
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct Processor {
|
||||
network: NetworkId,
|
||||
|
||||
@@ -112,7 +114,8 @@ pub struct Processor {
|
||||
#[allow(unused)]
|
||||
handles: Handles,
|
||||
|
||||
queue: Arc<AsyncMutex<(u64, u64, MessageQueue)>>,
|
||||
msgs: mpsc::UnboundedReceiver<messages::CoordinatorMessage>,
|
||||
queue_for_sending: MessageQueue,
|
||||
abort_handle: Option<Arc<AbortHandle>>,
|
||||
|
||||
substrate_key: Arc<AsyncMutex<Option<Zeroizing<<Ristretto as Ciphersuite>::F>>>>,
|
||||
@@ -153,156 +156,173 @@ impl Processor {
|
||||
// The Serai RPC may or may not be started
|
||||
// Assume it is and continue, so if it's a few seconds late, it's still within tolerance
|
||||
|
||||
// Create the queue
|
||||
let mut queue = (
|
||||
0,
|
||||
Arc::new(MessageQueue::new(
|
||||
Service::Processor(network),
|
||||
message_queue_rpc.clone(),
|
||||
Zeroizing::new(processor_key),
|
||||
)),
|
||||
);
|
||||
|
||||
let (msg_send, msg_recv) = mpsc::unbounded_channel();
|
||||
|
||||
let substrate_key = Arc::new(AsyncMutex::new(None));
|
||||
let mut res = Processor {
|
||||
network,
|
||||
|
||||
serai_rpc,
|
||||
handles,
|
||||
|
||||
queue: Arc::new(AsyncMutex::new((
|
||||
0,
|
||||
0,
|
||||
MessageQueue::new(
|
||||
Service::Processor(network),
|
||||
message_queue_rpc,
|
||||
Zeroizing::new(processor_key),
|
||||
),
|
||||
))),
|
||||
queue_for_sending: MessageQueue::new(
|
||||
Service::Processor(network),
|
||||
message_queue_rpc,
|
||||
Zeroizing::new(processor_key),
|
||||
),
|
||||
msgs: msg_recv,
|
||||
abort_handle: None,
|
||||
|
||||
substrate_key: Arc::new(AsyncMutex::new(None)),
|
||||
substrate_key: substrate_key.clone(),
|
||||
};
|
||||
|
||||
// Handle any cosigns which come up
|
||||
res.abort_handle = Some(Arc::new(
|
||||
tokio::spawn({
|
||||
let mut res = res.clone();
|
||||
async move {
|
||||
loop {
|
||||
tokio::task::yield_now().await;
|
||||
// Spawn a task to handle cosigns and forward messages as appropriate
|
||||
let abort_handle = tokio::spawn({
|
||||
async move {
|
||||
loop {
|
||||
// Get new messages
|
||||
let (next_recv_id, queue) = &mut queue;
|
||||
let msg = queue.next(Service::Coordinator).await;
|
||||
assert_eq!(msg.from, Service::Coordinator);
|
||||
assert_eq!(msg.id, *next_recv_id);
|
||||
queue.ack(Service::Coordinator, msg.id).await;
|
||||
*next_recv_id += 1;
|
||||
|
||||
let msg = {
|
||||
let mut queue_lock = res.queue.lock().await;
|
||||
let (_, next_recv_id, queue) = &mut *queue_lock;
|
||||
let Ok(msg) =
|
||||
tokio::time::timeout(Duration::from_secs(1), queue.next(Service::Coordinator))
|
||||
.await
|
||||
else {
|
||||
continue;
|
||||
};
|
||||
assert_eq!(msg.from, Service::Coordinator);
|
||||
assert_eq!(msg.id, *next_recv_id);
|
||||
let msg_msg = borsh::from_slice(&msg.msg).unwrap();
|
||||
|
||||
let msg_msg = borsh::from_slice(&msg.msg).unwrap();
|
||||
// Remove any BatchReattempts clogging the pipe
|
||||
// TODO: Set up a wrapper around serai-client so we aren't throwing this away yet
|
||||
// leave it for the tests
|
||||
if matches!(
|
||||
msg_msg,
|
||||
messages::CoordinatorMessage::Coordinator(
|
||||
messages::coordinator::CoordinatorMessage::BatchReattempt { .. }
|
||||
)
|
||||
) {
|
||||
queue.ack(Service::Coordinator, msg.id).await;
|
||||
*next_recv_id += 1;
|
||||
continue;
|
||||
}
|
||||
if !is_cosign_message(&msg_msg) {
|
||||
continue;
|
||||
};
|
||||
queue.ack(Service::Coordinator, msg.id).await;
|
||||
*next_recv_id += 1;
|
||||
msg_msg
|
||||
};
|
||||
// Remove any BatchReattempts clogging the pipe
|
||||
// TODO: Set up a wrapper around serai-client so we aren't throwing this away yet
|
||||
// leave it for the tests
|
||||
if matches!(
|
||||
msg_msg,
|
||||
messages::CoordinatorMessage::Coordinator(
|
||||
messages::coordinator::CoordinatorMessage::BatchReattempt { .. }
|
||||
)
|
||||
) {
|
||||
continue;
|
||||
}
|
||||
|
||||
struct CurrentCosign {
|
||||
block_number: u64,
|
||||
block: [u8; 32],
|
||||
}
|
||||
static CURRENT_COSIGN: OnceLock<AsyncMutex<Option<CurrentCosign>>> = OnceLock::new();
|
||||
let mut current_cosign =
|
||||
CURRENT_COSIGN.get_or_init(|| AsyncMutex::new(None)).lock().await;
|
||||
match msg {
|
||||
// If this is a CosignSubstrateBlock, reset the CurrentCosign
|
||||
// While technically, each processor should individually track the current cosign,
|
||||
// this is fine for current testing purposes
|
||||
CoordinatorMessage::Coordinator(
|
||||
messages::coordinator::CoordinatorMessage::CosignSubstrateBlock {
|
||||
id,
|
||||
block_number,
|
||||
if !is_cosign_message(&msg_msg) {
|
||||
msg_send.send(msg_msg).unwrap();
|
||||
continue;
|
||||
}
|
||||
let msg = msg_msg;
|
||||
|
||||
let send_message = |msg: ProcessorMessage| async move {
|
||||
queue
|
||||
.queue(
|
||||
Metadata {
|
||||
from: Service::Processor(network),
|
||||
to: Service::Coordinator,
|
||||
intent: msg.intent(),
|
||||
},
|
||||
) => {
|
||||
let SubstrateSignId {
|
||||
id: SubstrateSignableId::CosigningSubstrateBlock(block), ..
|
||||
} = id
|
||||
else {
|
||||
panic!("CosignSubstrateBlock didn't have CosigningSubstrateBlock ID")
|
||||
};
|
||||
borsh::to_vec(&msg).unwrap(),
|
||||
)
|
||||
.await;
|
||||
};
|
||||
|
||||
let new_cosign = CurrentCosign { block_number, block };
|
||||
if current_cosign.is_none() || (current_cosign.as_ref().unwrap().block != block) {
|
||||
*current_cosign = Some(new_cosign);
|
||||
struct CurrentCosign {
|
||||
block_number: u64,
|
||||
block: [u8; 32],
|
||||
}
|
||||
static CURRENT_COSIGN: OnceLock<AsyncMutex<Option<CurrentCosign>>> = OnceLock::new();
|
||||
let mut current_cosign =
|
||||
CURRENT_COSIGN.get_or_init(|| AsyncMutex::new(None)).lock().await;
|
||||
match msg {
|
||||
// If this is a CosignSubstrateBlock, reset the CurrentCosign
|
||||
// While technically, each processor should individually track the current cosign,
|
||||
// this is fine for current testing purposes
|
||||
CoordinatorMessage::Coordinator(
|
||||
messages::coordinator::CoordinatorMessage::CosignSubstrateBlock { id, block_number },
|
||||
) => {
|
||||
let SubstrateSignId {
|
||||
id: SubstrateSignableId::CosigningSubstrateBlock(block), ..
|
||||
} = id
|
||||
else {
|
||||
panic!("CosignSubstrateBlock didn't have CosigningSubstrateBlock ID")
|
||||
};
|
||||
|
||||
let new_cosign = CurrentCosign { block_number, block };
|
||||
if current_cosign.is_none() || (current_cosign.as_ref().unwrap().block != block) {
|
||||
*current_cosign = Some(new_cosign);
|
||||
}
|
||||
send_message(
|
||||
messages::coordinator::ProcessorMessage::CosignPreprocess {
|
||||
id: id.clone(),
|
||||
preprocesses: vec![[raw_i; 64]],
|
||||
}
|
||||
res
|
||||
.send_message(messages::coordinator::ProcessorMessage::CosignPreprocess {
|
||||
id: id.clone(),
|
||||
preprocesses: vec![[raw_i; 64]],
|
||||
})
|
||||
.await;
|
||||
}
|
||||
CoordinatorMessage::Coordinator(
|
||||
messages::coordinator::CoordinatorMessage::SubstratePreprocesses { id, .. },
|
||||
) => {
|
||||
// TODO: Assert the ID matches CURRENT_COSIGN
|
||||
// TODO: Verify the received preprocesses
|
||||
res
|
||||
.send_message(messages::coordinator::ProcessorMessage::SubstrateShare {
|
||||
id,
|
||||
shares: vec![[raw_i; 32]],
|
||||
})
|
||||
.await;
|
||||
}
|
||||
CoordinatorMessage::Coordinator(
|
||||
messages::coordinator::CoordinatorMessage::SubstrateShares { .. },
|
||||
) => {
|
||||
// TODO: Assert the ID matches CURRENT_COSIGN
|
||||
// TODO: Verify the shares
|
||||
|
||||
let block_number = current_cosign.as_ref().unwrap().block_number;
|
||||
let block = current_cosign.as_ref().unwrap().block;
|
||||
|
||||
let substrate_key = res.substrate_key.lock().await.clone().unwrap();
|
||||
|
||||
// Expand to a key pair as Schnorrkel expects
|
||||
// It's the private key + 32-bytes of entropy for nonces + the public key
|
||||
let mut schnorrkel_key_pair = [0; 96];
|
||||
schnorrkel_key_pair[.. 32].copy_from_slice(&substrate_key.to_repr());
|
||||
OsRng.fill_bytes(&mut schnorrkel_key_pair[32 .. 64]);
|
||||
schnorrkel_key_pair[64 ..].copy_from_slice(
|
||||
&(<Ristretto as Ciphersuite>::generator() * *substrate_key).to_bytes(),
|
||||
);
|
||||
let signature = Signature(
|
||||
schnorrkel::keys::Keypair::from_bytes(&schnorrkel_key_pair)
|
||||
.unwrap()
|
||||
.sign_simple(b"substrate", &cosign_block_msg(block_number, block))
|
||||
.to_bytes(),
|
||||
);
|
||||
|
||||
res
|
||||
.send_message(messages::coordinator::ProcessorMessage::CosignedBlock {
|
||||
block_number,
|
||||
block,
|
||||
signature: signature.0.to_vec(),
|
||||
})
|
||||
.await;
|
||||
}
|
||||
_ => panic!("unexpected message passed is_cosign_message"),
|
||||
.into(),
|
||||
)
|
||||
.await;
|
||||
}
|
||||
CoordinatorMessage::Coordinator(
|
||||
messages::coordinator::CoordinatorMessage::SubstratePreprocesses { id, .. },
|
||||
) => {
|
||||
// TODO: Assert the ID matches CURRENT_COSIGN
|
||||
// TODO: Verify the received preprocesses
|
||||
send_message(
|
||||
messages::coordinator::ProcessorMessage::SubstrateShare {
|
||||
id,
|
||||
shares: vec![[raw_i; 32]],
|
||||
}
|
||||
.into(),
|
||||
)
|
||||
.await;
|
||||
}
|
||||
CoordinatorMessage::Coordinator(
|
||||
messages::coordinator::CoordinatorMessage::SubstrateShares { .. },
|
||||
) => {
|
||||
// TODO: Assert the ID matches CURRENT_COSIGN
|
||||
// TODO: Verify the shares
|
||||
|
||||
let block_number = current_cosign.as_ref().unwrap().block_number;
|
||||
let block = current_cosign.as_ref().unwrap().block;
|
||||
|
||||
let substrate_key = substrate_key.lock().await.clone().unwrap();
|
||||
|
||||
// Expand to a key pair as Schnorrkel expects
|
||||
// It's the private key + 32-bytes of entropy for nonces + the public key
|
||||
let mut schnorrkel_key_pair = [0; 96];
|
||||
schnorrkel_key_pair[.. 32].copy_from_slice(&substrate_key.to_repr());
|
||||
OsRng.fill_bytes(&mut schnorrkel_key_pair[32 .. 64]);
|
||||
schnorrkel_key_pair[64 ..].copy_from_slice(
|
||||
&(<Ristretto as Ciphersuite>::generator() * *substrate_key).to_bytes(),
|
||||
);
|
||||
let signature = Signature(
|
||||
schnorrkel::keys::Keypair::from_bytes(&schnorrkel_key_pair)
|
||||
.unwrap()
|
||||
.sign_simple(b"substrate", &cosign_block_msg(block_number, block))
|
||||
.to_bytes(),
|
||||
);
|
||||
|
||||
send_message(
|
||||
messages::coordinator::ProcessorMessage::CosignedBlock {
|
||||
block_number,
|
||||
block,
|
||||
signature: signature.0.to_vec(),
|
||||
}
|
||||
.into(),
|
||||
)
|
||||
.await;
|
||||
}
|
||||
_ => panic!("unexpected message passed is_cosign_message"),
|
||||
}
|
||||
}
|
||||
})
|
||||
.abort_handle(),
|
||||
));
|
||||
}
|
||||
})
|
||||
.abort_handle();
|
||||
|
||||
res.abort_handle = Some(Arc::new(abort_handle));
|
||||
|
||||
res
|
||||
}
|
||||
@@ -315,9 +335,8 @@ impl Processor {
|
||||
pub async fn send_message(&mut self, msg: impl Into<ProcessorMessage>) {
|
||||
let msg: ProcessorMessage = msg.into();
|
||||
|
||||
let mut queue_lock = self.queue.lock().await;
|
||||
let (next_send_id, _, queue) = &mut *queue_lock;
|
||||
queue
|
||||
self
|
||||
.queue_for_sending
|
||||
.queue(
|
||||
Metadata {
|
||||
from: Service::Processor(self.network),
|
||||
@@ -327,36 +346,13 @@ impl Processor {
|
||||
borsh::to_vec(&msg).unwrap(),
|
||||
)
|
||||
.await;
|
||||
*next_send_id += 1;
|
||||
}
|
||||
|
||||
async fn recv_message_inner(&mut self) -> CoordinatorMessage {
|
||||
loop {
|
||||
tokio::task::yield_now().await;
|
||||
|
||||
let mut queue_lock = self.queue.lock().await;
|
||||
let (_, next_recv_id, queue) = &mut *queue_lock;
|
||||
let msg = queue.next(Service::Coordinator).await;
|
||||
assert_eq!(msg.from, Service::Coordinator);
|
||||
assert_eq!(msg.id, *next_recv_id);
|
||||
|
||||
// If this is a cosign message, let the cosign task handle it
|
||||
let msg_msg = borsh::from_slice(&msg.msg).unwrap();
|
||||
if is_cosign_message(&msg_msg) {
|
||||
continue;
|
||||
}
|
||||
|
||||
queue.ack(Service::Coordinator, msg.id).await;
|
||||
*next_recv_id += 1;
|
||||
return msg_msg;
|
||||
}
|
||||
}
|
||||
|
||||
/// Receive a message from the coordinator as a processor.
|
||||
pub async fn recv_message(&mut self) -> CoordinatorMessage {
|
||||
// Set a timeout of 30 minutes to allow effectively any protocol to occur without a fear of
|
||||
// an arbitrary timeout cutting it short
|
||||
tokio::time::timeout(Duration::from_secs(30 * 60), self.recv_message_inner()).await.unwrap()
|
||||
tokio::time::timeout(Duration::from_secs(20 * 60), self.msgs.recv()).await.unwrap().unwrap()
|
||||
}
|
||||
|
||||
pub async fn set_substrate_key(
|
||||
|
||||
@@ -245,7 +245,7 @@ pub async fn batch(
|
||||
)
|
||||
);
|
||||
|
||||
// Send the ack as expected, though it shouldn't trigger any observable behavior
|
||||
// Send the ack as expected
|
||||
processor
|
||||
.send_message(messages::ProcessorMessage::Coordinator(
|
||||
messages::coordinator::ProcessorMessage::SubstrateBlockAck {
|
||||
|
||||
@@ -137,7 +137,6 @@ pub(crate) async fn new_test(test_body: impl TestBody, fast_epoch: bool) {
|
||||
*OUTER_OPS.get_or_init(|| Mutex::new(None)).lock().await = None;
|
||||
|
||||
// Spawns a coordinator, if one has yet to be spawned, or else runs the test.
|
||||
#[async_recursion::async_recursion]
|
||||
async fn spawn_coordinator_or_run_test(inner_ops: DockerOperations) {
|
||||
// If the outer operations have yet to be set, these *are* the outer operations
|
||||
let outer_ops = OUTER_OPS.get().unwrap();
|
||||
@@ -180,7 +179,10 @@ pub(crate) async fn new_test(test_body: impl TestBody, fast_epoch: bool) {
|
||||
test.provide_container(composition);
|
||||
|
||||
drop(context_lock);
|
||||
test.run_async(spawn_coordinator_or_run_test).await;
|
||||
fn recurse(ops: DockerOperations) -> core::pin::Pin<Box<impl Send + Future<Output = ()>>> {
|
||||
Box::pin(spawn_coordinator_or_run_test(ops))
|
||||
}
|
||||
test.run_async(recurse).await;
|
||||
} else {
|
||||
let outer_ops = outer_ops.lock().await.take().unwrap();
|
||||
|
||||
|
||||
Reference in New Issue
Block a user