8 Commits

Author SHA1 Message Date
Luke Parker
a731c0005d Finish routing our own channel abstraction around the Swarm event stream 2025-01-07 16:51:56 -05:00
Luke Parker
f27e4e3202 Move the WIP SwarmTask to its own file 2025-01-07 16:34:19 -05:00
Luke Parker
f55165e016 Add channels to send requests/recv responses 2025-01-07 15:51:15 -05:00
Luke Parker
d9e9887d34 Run the dial task whenever we have a peer disconnect 2025-01-07 15:36:42 -05:00
Luke Parker
82e753db30 Document risk of eclipse in the dial task 2025-01-07 15:35:34 -05:00
Luke Parker
052388285b Remove TaskHandle::close
TaskHandle::close meant run_now may panic if the task was closed. Now, tasks
are only closed when all handles are dropped, causing all handles to point to
running tasks (ensuring run_now won't panic).
2025-01-07 15:26:41 -05:00
Luke Parker
47a4e534ef Update serai-processor-signers to VariantSignId::Batch([u8; 32]) 2025-01-07 15:26:23 -05:00
Luke Parker
257f691277 Start filling out message handling in SwarmTask 2025-01-05 01:23:28 -05:00
13 changed files with 410 additions and 225 deletions

1
Cargo.lock generated
View File

@@ -8996,6 +8996,7 @@ dependencies = [
name = "serai-processor-signers"
version = "0.1.0"
dependencies = [
"blake2",
"borsh",
"ciphersuite",
"frost-schnorrkel",

View File

@@ -3,27 +3,29 @@
#![deny(missing_docs)]
use core::{future::Future, time::Duration};
use std::sync::Arc;
use tokio::sync::{mpsc, oneshot, Mutex};
enum Closed {
NotClosed(Option<oneshot::Receiver<()>>),
Closed,
}
use tokio::sync::mpsc;
/// A handle for a task.
///
/// The task will only stop running once all handles for it are dropped.
//
// `run_now` isn't infallible if the task may have been closed. `run_now` on a closed task would
// either need to panic (historic behavior), silently drop the fact the task can't be run, or
// return an error. Instead of having a potential panic, and instead of modeling the error
// behavior, this task can't be closed unless all handles are dropped, ensuring calls to `run_now`
// are infallible.
#[derive(Clone)]
pub struct TaskHandle {
run_now: mpsc::Sender<()>,
#[allow(dead_code)] // This is used to track if all handles have been dropped
close: mpsc::Sender<()>,
closed: Arc<Mutex<Closed>>,
}
/// A task's internal structures.
pub struct Task {
run_now: mpsc::Receiver<()>,
close: mpsc::Receiver<()>,
closed: oneshot::Sender<()>,
}
impl Task {
@@ -34,14 +36,9 @@ impl Task {
let (run_now_send, run_now_recv) = mpsc::channel(1);
// And any call to close satisfies all calls to close
let (close_send, close_recv) = mpsc::channel(1);
let (closed_send, closed_recv) = oneshot::channel();
(
Self { run_now: run_now_recv, close: close_recv, closed: closed_send },
TaskHandle {
run_now: run_now_send,
close: close_send,
closed: Arc::new(Mutex::new(Closed::NotClosed(Some(closed_recv)))),
},
Self { run_now: run_now_recv, close: close_recv },
TaskHandle { run_now: run_now_send, close: close_send },
)
}
}
@@ -61,24 +58,6 @@ impl TaskHandle {
}
}
}
/// Close the task.
///
/// Returns once the task shuts down after it finishes its current iteration (which may be of
/// unbounded time).
pub async fn close(self) {
// If another instance of the handle called this, don't error
let _ = self.close.send(()).await;
// Wait until we receive the closed message
let mut closed = self.closed.lock().await;
match &mut *closed {
Closed::NotClosed(ref mut recv) => {
assert_eq!(recv.take().unwrap().await, Ok(()), "continually ran task dropped itself?");
*closed = Closed::Closed;
}
Closed::Closed => {}
}
}
}
/// A task to be continually ran.
@@ -152,8 +131,6 @@ pub trait ContinuallyRan: Sized + Send {
},
}
}
task.closed.send(()).unwrap();
}
}
}
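
With this change, a task is closed only implicitly: every `TaskHandle` holds an `mpsc::Sender`, and once all handles are dropped the task's receiver yields `None`, which the run loop treats as the close signal. Below is a minimal sketch of that drop-detection pattern, assuming tokio channels as in the diff; the loop is illustrative and is not the crate's actual `ContinuallyRan` driver.

use core::time::Duration;
use tokio::sync::mpsc;

struct Task { run_now: mpsc::Receiver<()>, close: mpsc::Receiver<()> }

#[derive(Clone)]
struct TaskHandle { run_now: mpsc::Sender<()>, close: mpsc::Sender<()> }

async fn run_loop(mut task: Task) {
  loop {
    tokio::select! {
      biased;
      // Nothing is ever sent on `close`; recv() only resolves (to None) once every
      // TaskHandle, and with it every `close` Sender, has been dropped
      msg = task.close.recv() => if msg.is_none() { break },
      // A handle requested an immediate iteration
      _ = task.run_now.recv() => { /* run an iteration now */ },
      // Otherwise, iterate on a timer
      () = tokio::time::sleep(Duration::from_secs(5)) => { /* run an iteration */ },
    }
  }
}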

View File

@@ -285,10 +285,10 @@ impl<D: Db> Cosigning<D> {
///
/// If this global session hasn't produced any notable cosigns, this will return the latest
/// cosigns for this session.
pub fn notable_cosigns(&self, global_session: [u8; 32]) -> Vec<SignedCosign> {
pub fn notable_cosigns(getter: &impl Get, global_session: [u8; 32]) -> Vec<SignedCosign> {
let mut cosigns = Vec::with_capacity(serai_client::primitives::NETWORKS.len());
for network in serai_client::primitives::NETWORKS {
if let Some(cosign) = NetworksLatestCosignedBlock::get(&self.db, global_session, network) {
if let Some(cosign) = NetworksLatestCosignedBlock::get(getter, global_session, network) {
cosigns.push(cosign);
}
}
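
Taking `getter: &impl Get` rather than `&self` lets code that only holds a database handle serve these cosigns, which is presumably what the swarm task needs when answering `NotableCosigns` requests (see the TODO carried in that file below). A sketch of such a call site, assuming `db`, `global_session`, `channel`, and `swarm` bindings as that TODO suggests:

// Hypothetical handler for an inbound NotableCosigns request
let cosigns = Cosigning::<D>::notable_cosigns(&db, global_session);
let res = reqres::Response::NotableCosigns(cosigns);
let _: Result<_, _> = swarm.behaviour_mut().reqres.send_response(channel, res);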

View File

@@ -17,6 +17,16 @@ use serai_task::ContinuallyRan;
use crate::p2p::{PORT, Peers, validators::Validators};
const TARGET_PEERS_PER_NETWORK: usize = 5;
/*
If we only tracked the target amount of peers per network, we'd risk being eclipsed by an
adversary who immediately connects to us with their array of validators upon our boot. Their
array would satisfy our target amount of peers, so we'd never seek more, enabling the adversary
to be the only entity we peered with.
We solve this by additionally requiring a minimum number of peers we dialed ourselves, meaning
we randomly chose to connect to those peers.
*/
// TODO const TARGET_DIALED_PEERS_PER_NETWORK: usize = 3;
struct DialTask {
serai: Serai,

View File

@@ -7,9 +7,10 @@ use borsh::{BorshSerialize, BorshDeserialize};
use serai_client::validator_sets::primitives::ValidatorSet;
use libp2p::gossipsub::{
IdentTopic, MessageId, MessageAuthenticity, ValidationMode, ConfigBuilder, IdentityTransform,
AllowAllSubscriptionFilter, Behaviour,
TopicHash, IdentTopic, MessageId, MessageAuthenticity, ValidationMode, ConfigBuilder,
IdentityTransform, AllowAllSubscriptionFilter, Behaviour,
};
pub use libp2p::gossipsub::Event;
use serai_cosign::SignedCosign;
@@ -27,10 +28,19 @@ fn topic_for_set(set: ValidatorSet) -> IdentTopic {
#[derive(Clone, BorshSerialize, BorshDeserialize)]
pub(crate) enum Message {
Tribuary { genesis: [u8; 32], message: Vec<u8> },
Tributary { set: ValidatorSet, message: Vec<u8> },
Cosign(SignedCosign),
}
impl Message {
pub(crate) fn topic(&self) -> TopicHash {
match self {
Message::Tributary { set, .. } => topic_for_set(*set).hash(),
Message::Cosign(_) => IdentTopic::new(BASE_TOPIC).hash(),
}
}
}
pub(crate) type Behavior = Behaviour<IdentityTransform, AllowAllSubscriptionFilter>;
pub(crate) fn new_behavior() -> Behavior {

View File

@@ -1,19 +1,16 @@
use core::future::Future;
use std::{
sync::Arc,
collections::{HashSet, HashMap},
time::{Duration, Instant},
};
use serai_client::primitives::{NetworkId, PublicKey};
use tokio::sync::{mpsc, RwLock};
use tokio::sync::RwLock;
use futures_util::StreamExt;
use libp2p::{
multihash::Multihash,
identity::PeerId,
swarm::{dial_opts::DialOpts, NetworkBehaviour, SwarmEvent, Swarm},
};
use serai_task::ContinuallyRan;
use libp2p::{multihash::Multihash, identity::PeerId, swarm::NetworkBehaviour};
/// A struct to sync the validators from the Serai node in order to keep track of them.
mod validators;
@@ -35,6 +32,9 @@ mod gossip;
/// The heartbeat task, effecting sync of Tributaries
mod heartbeat;
/// The swarm task, running it and dispatching to/from it
mod swarm;
const PORT: u16 = 30563; // 5132 ^ (('c' << 8) | 'o')
fn peer_id_from_public(public: PublicKey) -> PeerId {
@@ -76,133 +76,19 @@ struct Behavior {
gossip: gossip::Behavior,
}
struct SwarmTask {
to_dial: mpsc::UnboundedReceiver<DialOpts>,
struct UpdateSharedValidatorsTask {
validators: Arc<RwLock<Validators>>,
last_refreshed_validators: Instant,
next_refresh_validators: Instant,
peers: Peers,
rebuild_peers_at: Instant,
swarm: Swarm<Behavior>,
}
impl SwarmTask {
async fn run(mut self) {
loop {
let time_till_refresh_validators =
self.next_refresh_validators.saturating_duration_since(Instant::now());
let time_till_rebuild_peers = self.rebuild_peers_at.saturating_duration_since(Instant::now());
impl ContinuallyRan for UpdateSharedValidatorsTask {
// Only run every minute, not the default of every five seconds
const DELAY_BETWEEN_ITERATIONS: u64 = 60;
const MAX_DELAY_BETWEEN_ITERATIONS: u64 = 5 * 60;
tokio::select! {
biased;
// Refresh the instance of validators we use to track peers/share with authenticate
// TODO: Move this to a task
() = tokio::time::sleep(time_till_refresh_validators) => {
const TIME_BETWEEN_REFRESH_VALIDATORS: Duration = Duration::from_secs(60);
const MAX_TIME_BETWEEN_REFRESH_VALIDATORS: Duration = Duration::from_secs(5 * 60);
let update = update_shared_validators(&self.validators).await;
match update {
Ok(removed) => {
for removed in removed {
let _: Result<_, _> = self.swarm.disconnect_peer_id(removed);
}
self.last_refreshed_validators = Instant::now();
self.next_refresh_validators = Instant::now() + TIME_BETWEEN_REFRESH_VALIDATORS;
}
Err(e) => {
log::warn!("couldn't refresh validators: {e:?}");
// Increase the delay before the next refresh by using the time since the last
// refresh. This will be 5 seconds, then 5 seconds, then 10 seconds, then 20...
let time_since_last = self
.next_refresh_validators
.saturating_duration_since(self.last_refreshed_validators);
// But limit the delay
self.next_refresh_validators =
Instant::now() + time_since_last.min(MAX_TIME_BETWEEN_REFRESH_VALIDATORS);
},
}
}
// Rebuild the peers every 10 minutes
//
// This handles edge cases such as when a validator changes the networks they're present
// in, race conditions, or any other edge cases/quirks which would otherwise risk spiraling
// out of control
() = tokio::time::sleep(time_till_rebuild_peers) => {
const TIME_BETWEEN_REBUILD_PEERS: Duration = Duration::from_secs(10 * 60);
let validators_by_network = self.validators.read().await.by_network().clone();
let connected = self.swarm.connected_peers().copied().collect::<HashSet<_>>();
let mut peers = HashMap::new();
for (network, validators) in validators_by_network {
peers.insert(network, validators.intersection(&connected).copied().collect());
}
*self.peers.peers.write().await = peers;
self.rebuild_peers_at = Instant::now() + TIME_BETWEEN_REBUILD_PEERS;
}
// Dial peers we're instructed to
dial_opts = self.to_dial.recv() => {
let dial_opts = dial_opts.expect("DialTask was closed?");
let _: Result<_, _> = self.swarm.dial(dial_opts);
}
// Handle swarm events
event = self.swarm.next() => {
// `Swarm::next` will never return `Poll::Ready(None)`
// https://docs.rs/
// libp2p/0.54.1/libp2p/struct.Swarm.html#impl-Stream-for-Swarm%3CTBehaviour%3E
let event = event.unwrap();
match event {
SwarmEvent::Behaviour(BehaviorEvent::Reqres(event)) => todo!("TODO"),
SwarmEvent::Behaviour(BehaviorEvent::Gossip(event)) => todo!("TODO"),
// New connection, so update peers
SwarmEvent::ConnectionEstablished { peer_id, .. } => {
let Some(networks) =
self.validators.read().await.networks(&peer_id).cloned() else { continue };
for network in networks {
self
.peers
.peers
.write()
.await
.entry(network)
.or_insert_with(HashSet::new)
.insert(peer_id);
}
},
// Connection closed, so update peers
SwarmEvent::ConnectionClosed { peer_id, .. } => {
let Some(networks) =
self.validators.read().await.networks(&peer_id).cloned() else { continue };
for network in networks {
self
.peers
.peers
.write()
.await
.entry(network)
.or_insert_with(HashSet::new)
.remove(&peer_id);
}
},
SwarmEvent::IncomingConnection { .. } |
SwarmEvent::IncomingConnectionError { .. } |
SwarmEvent::OutgoingConnectionError { .. } |
SwarmEvent::NewListenAddr { .. } |
SwarmEvent::ExpiredListenAddr { .. } |
SwarmEvent::ListenerClosed { .. } |
SwarmEvent::ListenerError { .. } |
SwarmEvent::Dialing { .. } => {}
}
}
}
fn run_iteration(&mut self) -> impl Send + Future<Output = Result<bool, String>> {
async move {
update_shared_validators(&self.validators).await.map_err(|e| format!("{e:?}"))?;
Ok(true)
}
}
}

View File

@@ -8,7 +8,10 @@ use serai_client::validator_sets::primitives::ValidatorSet;
use futures_util::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};
use libp2p::request_response::{Codec as CodecTrait, Config, Behaviour, ProtocolSupport};
use libp2p::request_response::{
self, Codec as CodecTrait, Event as GenericEvent, Config, Behaviour, ProtocolSupport,
};
pub use request_response::Message;
use serai_cosign::SignedCosign;
@@ -43,16 +46,19 @@ pub(crate) struct TributaryBlockWithCommit {
/// Responses which can be received via the request-response protocol.
#[derive(Clone, BorshSerialize, BorshDeserialize)]
pub(crate) enum Response {
NoResponse,
Blocks(Vec<TributaryBlockWithCommit>),
NotableCosigns(Vec<SignedCosign>),
}
impl fmt::Debug for Response {
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
(match self {
Response::Blocks(_) => fmt.debug_struct("Response::Block"),
Response::NotableCosigns(_) => fmt.debug_struct("Response::NotableCosigns"),
})
.finish_non_exhaustive()
match self {
Response::NoResponse => fmt.debug_struct("Response::NoResponse").finish(),
Response::Blocks(_) => fmt.debug_struct("Response::Block").finish_non_exhaustive(),
Response::NotableCosigns(_) => {
fmt.debug_struct("Response::NotableCosigns").finish_non_exhaustive()
}
}
}
}
@@ -127,6 +133,8 @@ impl CodecTrait for Codec {
}
}
pub(crate) type Event = GenericEvent<Request, Response>;
pub(crate) type Behavior = Behaviour<Codec>;
pub(crate) fn new_behavior() -> Behavior {
let mut config = Config::default();

View File

@@ -0,0 +1,276 @@
use std::{
sync::Arc,
collections::{HashSet, HashMap},
time::{Duration, Instant},
};
use borsh::BorshDeserialize;
use serai_client::validator_sets::primitives::ValidatorSet;
use tokio::sync::{mpsc, oneshot, RwLock};
use serai_task::TaskHandle;
use serai_cosign::SignedCosign;
use futures_util::StreamExt;
use libp2p::{
identity::PeerId,
request_response::{RequestId, ResponseChannel},
swarm::{dial_opts::DialOpts, SwarmEvent, Swarm},
};
use crate::p2p::{
Peers, BehaviorEvent, Behavior,
validators::Validators,
reqres::{self, Request, Response},
gossip,
};
/*
`SwarmTask` handles everything we need the `Swarm` object for. The goal is to minimize the
contention on this task. Unfortunately, the `Swarm` object itself is needed for a variety of
purposes making this a rather large task.
Responsibilities include:
- Actually dialing new peers (the selection process occurs in another task)
- Maintaining the peers structure (as we need the Swarm object to see who our peers are)
- Gossiping messages
- Dispatching gossiped messages
- Sending requests
- Dispatching responses to requests
- Dispatching received requests
- Sending responses
*/
struct SwarmTask {
dial_task: TaskHandle,
to_dial: mpsc::UnboundedReceiver<DialOpts>,
last_dial_task_run: Instant,
validators: Arc<RwLock<Validators>>,
peers: Peers,
rebuild_peers_at: Instant,
swarm: Swarm<Behavior>,
gossip: mpsc::UnboundedReceiver<gossip::Message>,
signed_cosigns: mpsc::UnboundedSender<SignedCosign>,
tributary_gossip: mpsc::UnboundedSender<(ValidatorSet, Vec<u8>)>,
outbound_requests: mpsc::UnboundedReceiver<(PeerId, Request, oneshot::Sender<Option<Response>>)>,
outbound_request_responses: HashMap<RequestId, oneshot::Sender<Option<Response>>>,
inbound_request_response_channels: HashMap<RequestId, ResponseChannel<Response>>,
heartbeat_requests: mpsc::UnboundedSender<(RequestId, ValidatorSet, [u8; 32])>,
/* TODO
let cosigns = Cosigning::<D>::notable_cosigns(&self.db, global_session);
let res = reqres::Response::NotableCosigns(cosigns);
let _: Result<_, _> = self.swarm.behaviour_mut().reqres.send_response(channel, res);
*/
notable_cosign_requests: mpsc::UnboundedSender<(RequestId, [u8; 32])>,
inbound_request_responses: mpsc::UnboundedReceiver<(RequestId, Response)>,
}
impl SwarmTask {
fn handle_gossip(&mut self, event: gossip::Event) {
match event {
gossip::Event::Message { message, .. } => {
let Ok(message) = gossip::Message::deserialize(&mut message.data.as_slice()) else {
// TODO: Penalize the PeerId which sent this message
return;
};
match message {
gossip::Message::Tributary { set, message } => {
let _: Result<_, _> = self.tributary_gossip.send((set, message));
}
gossip::Message::Cosign(signed_cosign) => {
let _: Result<_, _> = self.signed_cosigns.send(signed_cosign);
}
}
}
gossip::Event::Subscribed { .. } | gossip::Event::Unsubscribed { .. } => {}
gossip::Event::GossipsubNotSupported { peer_id } => {
let _: Result<_, _> = self.swarm.disconnect_peer_id(peer_id);
}
}
}
fn handle_reqres(&mut self, event: reqres::Event) {
match event {
reqres::Event::Message { message, .. } => match message {
reqres::Message::Request { request_id, request, channel } => {
match request {
// TODO: Send these
reqres::Request::KeepAlive => {
let _: Result<_, _> =
self.swarm.behaviour_mut().reqres.send_response(channel, Response::NoResponse);
}
reqres::Request::Heartbeat { set, latest_block_hash } => {
self.inbound_request_response_channels.insert(request_id, channel);
let _: Result<_, _> =
self.heartbeat_requests.send((request_id, set, latest_block_hash));
}
reqres::Request::NotableCosigns { global_session } => {
self.inbound_request_response_channels.insert(request_id, channel);
let _: Result<_, _> = self.notable_cosign_requests.send((request_id, global_session));
}
}
}
reqres::Message::Response { request_id, response } => {
// Send Some(response) as the response for the request
if let Some(channel) = self.outbound_request_responses.remove(&request_id) {
let _: Result<_, _> = channel.send(Some(response));
}
}
},
reqres::Event::OutboundFailure { request_id, .. } => {
// Send None as the response for the request
if let Some(channel) = self.outbound_request_responses.remove(&request_id) {
let _: Result<_, _> = channel.send(None);
}
}
reqres::Event::InboundFailure { .. } | reqres::Event::ResponseSent { .. } => {}
}
}
async fn run(mut self) {
loop {
let time_till_rebuild_peers = self.rebuild_peers_at.saturating_duration_since(Instant::now());
tokio::select! {
// Dial peers we're instructed to
dial_opts = self.to_dial.recv() => {
let dial_opts = dial_opts.expect("DialTask was closed?");
let _: Result<_, _> = self.swarm.dial(dial_opts);
}
/*
Rebuild the peers every 10 minutes.
This protects against any race conditions/edge cases we have in our logic to track peers,
along with unrepresented behavior such as when a peer changes the networks they're active
in. This lets the peer tracking logic simply be 'good enough' to not become horribly
corrupt over the span of `TIME_BETWEEN_REBUILD_PEERS`.
We also use this to disconnect all peers who are no longer active in any network.
*/
() = tokio::time::sleep(time_till_rebuild_peers) => {
const TIME_BETWEEN_REBUILD_PEERS: Duration = Duration::from_secs(10 * 60);
let validators_by_network = self.validators.read().await.by_network().clone();
let connected_peers = self.swarm.connected_peers().copied().collect::<HashSet<_>>();
// We initially populate the list of peers to disconnect with all peers
let mut to_disconnect = connected_peers.clone();
// Build the new peers object
let mut peers = HashMap::new();
for (network, validators) in validators_by_network {
peers.insert(network, validators.intersection(&connected_peers).copied().collect());
// If this peer is in this validator set, don't keep it flagged for disconnection
to_disconnect.retain(|peer| !validators.contains(peer));
}
// Write the new peers object
*self.peers.peers.write().await = peers;
self.rebuild_peers_at = Instant::now() + TIME_BETWEEN_REBUILD_PEERS;
// Disconnect all peers marked for disconnection
for peer in to_disconnect {
let _: Result<_, _> = self.swarm.disconnect_peer_id(peer);
}
}
// Handle swarm events
event = self.swarm.next() => {
// `Swarm::next` will never return `Poll::Ready(None)`
// https://docs.rs/
// libp2p/0.54.1/libp2p/struct.Swarm.html#impl-Stream-for-Swarm%3CTBehaviour%3E
let event = event.unwrap();
match event {
// New connection, so update peers
SwarmEvent::ConnectionEstablished { peer_id, .. } => {
let Some(networks) =
self.validators.read().await.networks(&peer_id).cloned() else { continue };
let mut peers = self.peers.peers.write().await;
for network in networks {
peers.entry(network).or_insert_with(HashSet::new).insert(peer_id);
}
}
// Connection closed, so update peers
SwarmEvent::ConnectionClosed { peer_id, .. } => {
let Some(networks) =
self.validators.read().await.networks(&peer_id).cloned() else { continue };
let mut peers = self.peers.peers.write().await;
for network in networks {
peers.entry(network).or_insert_with(HashSet::new).remove(&peer_id);
}
/*
We want to re-run the dial task, since we lost a peer, in case we should find new
peers. This opens a DoS where a validator repeatedly opens/closes connections to
force iterations of the dial task. We prevent this by setting a minimum distance
since the last explicit iteration.
This is suboptimal. If we have several disconnects in immediate proximity, we'll
trigger the dial task upon the first (where we may still have enough peers we
shouldn't dial more) but not the last (where we may have so few peers left we
should dial more). This is accepted as the dial task will eventually run on its
natural timer.
*/
const MINIMUM_TIME_SINCE_LAST_EXPLICIT_DIAL: Duration = Duration::from_secs(60);
let now = Instant::now();
if (self.last_dial_task_run + MINIMUM_TIME_SINCE_LAST_EXPLICIT_DIAL) < now {
self.dial_task.run_now();
self.last_dial_task_run = now;
}
}
SwarmEvent::Behaviour(BehaviorEvent::Reqres(event)) => {
self.handle_reqres(event)
}
SwarmEvent::Behaviour(BehaviorEvent::Gossip(event)) => {
self.handle_gossip(event)
}
// We don't handle any of these
SwarmEvent::IncomingConnection { .. } |
SwarmEvent::IncomingConnectionError { .. } |
SwarmEvent::OutgoingConnectionError { .. } |
SwarmEvent::NewListenAddr { .. } |
SwarmEvent::ExpiredListenAddr { .. } |
SwarmEvent::ListenerClosed { .. } |
SwarmEvent::ListenerError { .. } |
SwarmEvent::Dialing { .. } => {}
}
}
message = self.gossip.recv() => {
let message = message.expect("channel for messages to gossip was closed?");
let topic = message.topic();
let message = borsh::to_vec(&message).unwrap();
let _: Result<_, _> = self.swarm.behaviour_mut().gossip.publish(topic, message);
}
request = self.outbound_requests.recv() => {
let (peer, request, response_channel) =
request.expect("channel for requests was closed?");
let request_id = self.swarm.behaviour_mut().reqres.send_request(&peer, request);
self.outbound_request_responses.insert(request_id, response_channel);
}
response = self.inbound_request_responses.recv() => {
let (request_id, response) =
response.expect("channel for inbound request responses was closed?");
if let Some(channel) = self.inbound_request_response_channels.remove(&request_id) {
let _: Result<_, _> =
self.swarm.behaviour_mut().reqres.send_response(channel, response);
}
}
}
}
}
}
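
The channel abstraction finished in this commit series keeps all direct `Swarm` access inside `SwarmTask`: a caller sends `(PeerId, Request, oneshot::Sender<Option<Response>>)` over `outbound_requests` and awaits the oneshot, receiving `None` on an outbound failure. A hedged caller-side sketch follows; how the caller obtains the sender half is not part of this diff, so the function below is an assumption:

// Hypothetical caller holding the send side of `outbound_requests`
async fn request_notable_cosigns(
  outbound_requests: &mpsc::UnboundedSender<(PeerId, Request, oneshot::Sender<Option<Response>>)>,
  peer: PeerId,
  global_session: [u8; 32],
) -> Option<Response> {
  let (send, recv) = oneshot::channel();
  outbound_requests
    .send((peer, Request::NotableCosigns { global_session }, send))
    .expect("SwarmTask was closed?");
  // None means the request failed (reqres::Event::OutboundFailure)
  recv.await.ok().flatten()
}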

View File

@@ -77,17 +77,16 @@ impl Validators {
fn incorporate_session_changes(
&mut self,
session_changes: Vec<(NetworkId, Session, HashSet<PeerId>)>,
) -> HashSet<PeerId> {
let mut removed = HashSet::new();
) {
for (network, session, validators) in session_changes {
// Remove the existing validators
for validator in self.by_network.remove(&network).unwrap_or_else(HashSet::new) {
// Get all networks this validator is in
let mut networks = self.validators.remove(&validator).unwrap();
// Remove this one
networks.remove(&network);
if networks.is_empty() {
removed.insert(validator);
} else {
// Insert the networks back if the validator was present in other networks
if !networks.is_empty() {
self.validators.insert(validator, networks);
}
}
@@ -101,16 +100,15 @@ impl Validators {
// Update the session we have populated
self.sessions.insert(network, session);
}
removed
}
/// Update the view of the validators.
///
/// Returns all validators removed from the active validator set.
pub(crate) async fn update(&mut self) -> Result<HashSet<PeerId>, String> {
pub(crate) async fn update(&mut self) -> Result<(), String> {
let session_changes = Self::session_changes(&self.serai, &self.sessions).await?;
Ok(self.incorporate_session_changes(session_changes))
self.incorporate_session_changes(session_changes);
Ok(())
}
pub(crate) fn by_network(&self) -> &HashMap<NetworkId, HashSet<PeerId>> {
@@ -128,13 +126,17 @@ impl Validators {
/// Update the view of the validators.
///
/// This minimizes the time an exclusive lock is held over the validators to minimize the
/// disruption to functioning.
///
/// Returns all validators removed from the active validator set.
pub(crate) async fn update_shared_validators(
validators: &Arc<RwLock<Validators>>,
) -> Result<HashSet<PeerId>, String> {
) -> Result<(), String> {
let session_changes = {
let validators = validators.read().await;
Validators::session_changes(validators.serai.clone(), validators.sessions.clone()).await?
};
Ok(validators.write().await.incorporate_session_changes(session_changes))
validators.write().await.incorporate_session_changes(session_changes);
Ok(())
}

View File

@@ -24,6 +24,7 @@ workspace = true
rand_core = { version = "0.6", default-features = false }
zeroize = { version = "1", default-features = false, features = ["std"] }
blake2 = { version = "0.10", default-features = false, features = ["std"] }
ciphersuite = { path = "../../crypto/ciphersuite", default-features = false, features = ["std"] }
frost = { package = "modular-frost", path = "../../crypto/frost", default-features = false }
frost-schnorrkel = { path = "../../crypto/schnorrkel", default-features = false }

View File

@@ -5,8 +5,9 @@ use serai_db::{Get, DbTxn, create_db};
create_db! {
SignersBatch {
ActiveSigningProtocols: (session: Session) -> Vec<u32>,
Batches: (id: u32) -> Batch,
ActiveSigningProtocols: (session: Session) -> Vec<[u8; 32]>,
BatchHash: (id: u32) -> [u8; 32],
Batches: (hash: [u8; 32]) -> Batch,
SignedBatches: (id: u32) -> SignedBatch,
LastAcknowledgedBatch: () -> u32,
}
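
Batches are now stored under their Blake2b-256 hash, with `BatchHash` mapping the on-chain `u32` id to that hash. A short sketch of the id-to-hash indirection, assuming `txn`, `batch`, and `batch_id` bindings as in the signer task below (which is where the hash is actually computed):

// Store: key the Batch by its hash, and remember id -> hash
let batch_hash = <[u8; 32]>::from(Blake2b::<U32>::digest(batch.encode()));
BatchHash::set(&mut txn, batch.id, &batch_hash);
Batches::set(&mut txn, batch_hash, &batch);

// Load, given only the acknowledged id (the task below handles the None case by retrying later)
if let Some(hash) = BatchHash::take(&mut txn, batch_id) {
  let batch = Batches::take(&mut txn, hash).expect("BatchHash populated but not Batches");
  // ... retire the signing protocol for `hash`
}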

View File

@@ -1,9 +1,12 @@
use core::future::Future;
use std::collections::HashSet;
use blake2::{digest::typenum::U32, Digest, Blake2b};
use ciphersuite::{group::GroupEncoding, Ristretto};
use frost::dkg::ThresholdKeys;
use scale::Encode;
use serai_validator_sets_primitives::Session;
use serai_in_instructions_primitives::{SignedBatch, batch_message};
@@ -40,7 +43,7 @@ pub(crate) struct BatchSignerTask<D: Db, E: GroupEncoding> {
external_key: E,
keys: Vec<ThresholdKeys<Ristretto>>,
active_signing_protocols: HashSet<u32>,
active_signing_protocols: HashSet<[u8; 32]>,
attempt_manager: AttemptManager<D, WrappedSchnorrkelMachine>,
}
@@ -63,7 +66,6 @@ impl<D: Db, E: GroupEncoding> BatchSignerTask<D, E> {
active_signing_protocols.insert(id);
let batch = Batches::get(&db, id).unwrap();
assert_eq!(batch.id, id);
let mut machines = Vec::with_capacity(keys.len());
for keys in &keys {
@@ -90,19 +92,21 @@ impl<D: Db, E: Send + GroupEncoding> ContinuallyRan for BatchSignerTask<D, E> {
iterated = true;
// Save this to the database as a transaction to sign
self.active_signing_protocols.insert(batch.id);
let batch_hash = <[u8; 32]>::from(Blake2b::<U32>::digest(batch.encode()));
self.active_signing_protocols.insert(batch_hash);
ActiveSigningProtocols::set(
&mut txn,
self.session,
&self.active_signing_protocols.iter().copied().collect(),
);
Batches::set(&mut txn, batch.id, &batch);
BatchHash::set(&mut txn, batch.id, &batch_hash);
Batches::set(&mut txn, batch_hash, &batch);
let mut machines = Vec::with_capacity(self.keys.len());
for keys in &self.keys {
machines.push(WrappedSchnorrkelMachine::new(keys.clone(), batch_message(&batch)));
}
for msg in self.attempt_manager.register(VariantSignId::Batch(batch.id), machines) {
for msg in self.attempt_manager.register(VariantSignId::Batch(batch_hash), machines) {
BatchSignerToCoordinatorMessages::send(&mut txn, self.session, &msg);
}
@@ -112,48 +116,57 @@ impl<D: Db, E: Send + GroupEncoding> ContinuallyRan for BatchSignerTask<D, E> {
// Check for acknowledged Batches (meaning we should no longer sign for these Batches)
loop {
let mut txn = self.db.txn();
let Some(id) = AcknowledgedBatches::try_recv(&mut txn, &self.external_key) else {
break;
};
let batch_hash = {
let Some(batch_id) = AcknowledgedBatches::try_recv(&mut txn, &self.external_key) else {
break;
};
/*
We may have yet to register this signing protocol.
While `BatchesToSign` is populated before `AcknowledgedBatches`, we could theoretically
have `BatchesToSign` populated with a new batch _while iterating over
`AcknowledgedBatches`_, and then have `AcknowledgedBatches` populated. In that edge
case, we will see the acknowledgement notification before we see the transaction.
In such a case, we break (dropping the txn, re-queueing the acknowledgement
notification). On the task's next iteration, we'll process the Batch from
`BatchesToSign` and be able to make progress.
*/
let Some(batch_hash) = BatchHash::take(&mut txn, batch_id) else {
drop(txn);
break;
};
batch_hash
};
let batch =
Batches::take(&mut txn, batch_hash).expect("BatchHash populated but not Batches");
iterated = true;
// Update the last acknowledged Batch
{
let last_acknowledged = LastAcknowledgedBatch::get(&txn);
if Some(id) > last_acknowledged {
LastAcknowledgedBatch::set(&mut txn, &id);
if Some(batch.id) > last_acknowledged {
LastAcknowledgedBatch::set(&mut txn, &batch.id);
}
}
/*
We may have yet to register this signing protocol.
While `BatchesToSign` is populated before `AcknowledgedBatches`, we could theoretically
have `BatchesToSign` populated with a new batch _while iterating over
`AcknowledgedBatches`_, and then have `AcknowledgedBatches` populated. In that edge case,
we will see the acknowledgement notification before we see the transaction.
In such a case, we break (dropping the txn, re-queueing the acknowledgement notification).
On the task's next iteration, we'll process the Batch from `BatchesToSign` and be
able to make progress.
*/
if !self.active_signing_protocols.remove(&id) {
break;
}
iterated = true;
// Since it was, remove this as an active signing protocol
// Remove this as an active signing protocol
assert!(self.active_signing_protocols.remove(&batch_hash));
ActiveSigningProtocols::set(
&mut txn,
self.session,
&self.active_signing_protocols.iter().copied().collect(),
);
// Clean up the database
Batches::del(&mut txn, id);
SignedBatches::del(&mut txn, id);
// Clean up SignedBatches
SignedBatches::del(&mut txn, batch.id);
// We retire with a txn so we either successfully flag this Batch as acknowledged, and
// won't re-register it (making this retire safe), or we don't flag it, meaning we will
// re-register it, yet that's safe as we have yet to retire it
self.attempt_manager.retire(&mut txn, VariantSignId::Batch(id));
self.attempt_manager.retire(&mut txn, VariantSignId::Batch(batch_hash));
txn.commit();
}

View File

@@ -143,7 +143,7 @@ impl<D: Db, C: Coordinator> ContinuallyRan for CoordinatorTask<D, C> {
// the prior Batch(es) (and accordingly didn't publish them)
let last_batch =
crate::batch::last_acknowledged_batch(&txn).max(db::LastPublishedBatch::get(&txn));
let mut next_batch = last_batch.map_or(0, |id| id + 1);
let mut next_batch = last_batch.map(|id| id + 1).unwrap_or(0);
while let Some(batch) = crate::batch::signed_batch(&txn, next_batch) {
iterated = true;
db::LastPublishedBatch::set(&mut txn, &batch.batch.id);