Mirror of https://github.com/serai-dex/serai.git
Synced 2025-12-12 14:09:25 +00:00

Compare commits: 8 commits, c6d0fb477c ... a731c0005d
Commits in this range (author and date columns were empty in the extraction):
a731c0005d
f27e4e3202
f55165e016
d9e9887d34
82e753db30
052388285b
47a4e534ef
257f691277
Cargo.lock (generated)
@@ -8996,6 +8996,7 @@ dependencies = [
 name = "serai-processor-signers"
 version = "0.1.0"
 dependencies = [
+ "blake2",
  "borsh",
  "ciphersuite",
  "frost-schnorrkel",
@@ -3,27 +3,29 @@
 #![deny(missing_docs)]

 use core::{future::Future, time::Duration};
-use std::sync::Arc;

-use tokio::sync::{mpsc, oneshot, Mutex};
+use tokio::sync::mpsc;

-enum Closed {
-  NotClosed(Option<oneshot::Receiver<()>>),
-  Closed,
-}
-
 /// A handle for a task.
+///
+/// The task will only stop running once all handles for it are dropped.
+//
+// `run_now` isn't infallible if the task may have been closed. `run_now` on a closed task would
+// either need to panic (historic behavior), silently drop the fact the task can't be run, or
+// return an error. Instead of having a potential panic, and instead of modeling the error
+// behavior, this task can't be closed unless all handles are dropped, ensuring calls to `run_now`
+// are infallible.
 #[derive(Clone)]
 pub struct TaskHandle {
   run_now: mpsc::Sender<()>,
+  #[allow(dead_code)] // This is used to track if all handles have been dropped
   close: mpsc::Sender<()>,
-  closed: Arc<Mutex<Closed>>,
 }

 /// A task's internal structures.
 pub struct Task {
   run_now: mpsc::Receiver<()>,
   close: mpsc::Receiver<()>,
-  closed: oneshot::Sender<()>,
 }

 impl Task {
@@ -34,14 +36,9 @@ impl Task {
     let (run_now_send, run_now_recv) = mpsc::channel(1);
     // And any call to close satisfies all calls to close
     let (close_send, close_recv) = mpsc::channel(1);
-    let (closed_send, closed_recv) = oneshot::channel();
     (
-      Self { run_now: run_now_recv, close: close_recv, closed: closed_send },
-      TaskHandle {
-        run_now: run_now_send,
-        close: close_send,
-        closed: Arc::new(Mutex::new(Closed::NotClosed(Some(closed_recv)))),
-      },
+      Self { run_now: run_now_recv, close: close_recv },
+      TaskHandle { run_now: run_now_send, close: close_send },
     )
   }
 }
@@ -61,24 +58,6 @@ impl TaskHandle {
       }
     }
   }
-
-  /// Close the task.
-  ///
-  /// Returns once the task shuts down after it finishes its current iteration (which may be of
-  /// unbounded time).
-  pub async fn close(self) {
-    // If another instance of the handle called tfhis, don't error
-    let _ = self.close.send(()).await;
-    // Wait until we receive the closed message
-    let mut closed = self.closed.lock().await;
-    match &mut *closed {
-      Closed::NotClosed(ref mut recv) => {
-        assert_eq!(recv.take().unwrap().await, Ok(()), "continually ran task dropped itself?");
-        *closed = Closed::Closed;
-      }
-      Closed::Closed => {}
-    }
-  }
 }

 /// A task to be continually ran.
@@ -152,8 +131,6 @@ pub trait ContinuallyRan: Sized + Send {
         },
       }
     }
-
-    task.closed.send(()).unwrap();
   }
 }
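The hunk above replaces the explicit `close()` flow with drop-based closure. A minimal, self-contained sketch (not the crate's code; names are illustrative) of the tokio mpsc semantics this relies on: once every `Sender` clone is dropped, the `Receiver` observes the channel as closed, so a task can treat "all handles dropped" as its shutdown signal without a dedicated oneshot.

use tokio::sync::mpsc;

#[tokio::main]
async fn main() {
  // Stands in for TaskHandle's `close` sender; every handle clone holds one
  let (close_send, mut close_recv) = mpsc::channel::<()>(1);
  let handle_a = close_send.clone();
  let handle_b = close_send;

  let task = tokio::spawn(async move {
    // `recv` yields `None` only once all senders (i.e. all handles) are dropped
    while close_recv.recv().await.is_some() {}
    println!("all handles dropped; task may now stop");
  });

  drop(handle_a);
  drop(handle_b);
  task.await.unwrap();
}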
@@ -285,10 +285,10 @@ impl<D: Db> Cosigning<D> {
   ///
   /// If this global session hasn't produced any notable cosigns, this will return the latest
   /// cosigns for this session.
-  pub fn notable_cosigns(&self, global_session: [u8; 32]) -> Vec<SignedCosign> {
+  pub fn notable_cosigns(getter: &impl Get, global_session: [u8; 32]) -> Vec<SignedCosign> {
     let mut cosigns = Vec::with_capacity(serai_client::primitives::NETWORKS.len());
     for network in serai_client::primitives::NETWORKS {
-      if let Some(cosign) = NetworksLatestCosignedBlock::get(&self.db, global_session, network) {
+      if let Some(cosign) = NetworksLatestCosignedBlock::get(getter, global_session, network) {
         cosigns.push(cosign);
       }
     }
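For context on the `notable_cosigns` change: taking `getter: &impl Get` instead of `&self` lets any holder of database access fetch notable cosigns without a full `Cosigning` instance, which is how the coordinator's request-response path (see the swarm.rs TODO below) intends to call it. A sketch under the assumptions that `Cosigning` lives in `serai_cosign` and that `Db` implements `Get`, as elsewhere in this diff:

use serai_db::Db;
use serai_cosign::{Cosigning, SignedCosign};

// Hypothetical handler: it only has the db, not a `Cosigning` value
fn notable_cosigns_for_request<D: Db>(db: &D, global_session: [u8; 32]) -> Vec<SignedCosign> {
  // Previously this required `&self`; now any `impl Get` (such as `&D`) suffices
  Cosigning::<D>::notable_cosigns(db, global_session)
}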
@@ -17,6 +17,16 @@ use serai_task::ContinuallyRan;
 use crate::p2p::{PORT, Peers, validators::Validators};

 const TARGET_PEERS_PER_NETWORK: usize = 5;
+/*
+  If we only tracked the target amount of peers per network, we'd risk being eclipsed by an
+  adversary who immediately connects to us with their array of validators upon our boot. Their
+  array would satisfy our target amount of peers, so we'd never seek more, enabling the adversary
+  to be the only entity we peered with.
+
+  We solve this by additionally requiring an explicit amount of peers we dialed. That means we
+  randomly chose to connect to these peers.
+*/
+// TODO const TARGET_DIALED_PEERS_PER_NETWORK: usize = 3;

 struct DialTask {
   serai: Serai,
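The comment added above motivates a second counter beyond `TARGET_PEERS_PER_NETWORK`: inbound connections alone must never satisfy our peering targets, since an adversary controls who connects to us but not whom we randomly chose to dial. A sketch of that idea (the dialed-peer constant is still a TODO in the diff; all names here are illustrative):

use std::{hash::Hash, collections::HashSet};

const TARGET_PEERS_PER_NETWORK: usize = 5;
// Mirrors the TODO'd constant above
const TARGET_DIALED_PEERS_PER_NETWORK: usize = 3;

struct PeerTracker<Id: Hash + Eq> {
  connected: HashSet<Id>,
  dialed_by_us: HashSet<Id>,
}

impl<Id: Hash + Eq> PeerTracker<Id> {
  fn should_seek_peers(&self) -> bool {
    // Seek peers while under the overall target, or while too few were chosen by us
    (self.connected.len() < TARGET_PEERS_PER_NETWORK) ||
      (self.dialed_by_us.len() < TARGET_DIALED_PEERS_PER_NETWORK)
  }
}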
@@ -7,9 +7,10 @@ use borsh::{BorshSerialize, BorshDeserialize};
 use serai_client::validator_sets::primitives::ValidatorSet;

 use libp2p::gossipsub::{
-  IdentTopic, MessageId, MessageAuthenticity, ValidationMode, ConfigBuilder, IdentityTransform,
-  AllowAllSubscriptionFilter, Behaviour,
+  TopicHash, IdentTopic, MessageId, MessageAuthenticity, ValidationMode, ConfigBuilder,
+  IdentityTransform, AllowAllSubscriptionFilter, Behaviour,
 };
+pub use libp2p::gossipsub::Event;

 use serai_cosign::SignedCosign;

@@ -27,10 +28,19 @@ fn topic_for_set(set: ValidatorSet) -> IdentTopic {

 #[derive(Clone, BorshSerialize, BorshDeserialize)]
 pub(crate) enum Message {
-  Tribuary { genesis: [u8; 32], message: Vec<u8> },
+  Tributary { set: ValidatorSet, message: Vec<u8> },
   Cosign(SignedCosign),
 }

+impl Message {
+  pub(crate) fn topic(&self) -> TopicHash {
+    match self {
+      Message::Tributary { set, .. } => topic_for_set(*set).hash(),
+      Message::Cosign(_) => IdentTopic::new(BASE_TOPIC).hash(),
+    }
+  }
+}
+
 pub(crate) type Behavior = Behaviour<IdentityTransform, AllowAllSubscriptionFilter>;

 pub(crate) fn new_behavior() -> Behavior {
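A self-contained sketch of the routing rule the new `Message::topic` encodes: Tributary messages go to a per-validator-set topic (so validators only subscribe to sets they participate in), while cosigns go to the shared base topic everyone subscribes to. The topic naming scheme and `BASE_TOPIC` value below are illustrative assumptions, not the crate's actual values.

const BASE_TOPIC: &str = "serai";

struct ValidatorSet {
  session: u32,
}

enum Message {
  Tributary { set: ValidatorSet, message: Vec<u8> },
  Cosign(Vec<u8>),
}

fn topic(message: &Message) -> String {
  match message {
    // One topic per set keeps Tributary traffic scoped to its participants
    Message::Tributary { set, .. } => format!("{BASE_TOPIC}/tributary/{}", set.session),
    // Cosigns are globally relevant, so they use the base topic
    Message::Cosign(_) => BASE_TOPIC.to_string(),
  }
}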
@@ -1,19 +1,16 @@
+use core::future::Future;
 use std::{
   sync::Arc,
   collections::{HashSet, HashMap},
-  time::{Duration, Instant},
 };

 use serai_client::primitives::{NetworkId, PublicKey};

-use tokio::sync::{mpsc, RwLock};
+use tokio::sync::RwLock;

-use futures_util::StreamExt;
-use libp2p::{
-  multihash::Multihash,
-  identity::PeerId,
-  swarm::{dial_opts::DialOpts, NetworkBehaviour, SwarmEvent, Swarm},
-};
+use serai_task::ContinuallyRan;
+
+use libp2p::{multihash::Multihash, identity::PeerId, swarm::NetworkBehaviour};

 /// A struct to sync the validators from the Serai node in order to keep track of them.
 mod validators;
@@ -35,6 +32,9 @@ mod gossip;
 /// The heartbeat task, effecting sync of Tributaries
 mod heartbeat;

+/// The swarm task, running it and dispatching to/from it
+mod swarm;
+
 const PORT: u16 = 30563; // 5132 ^ (('c' << 8) | 'o')

 fn peer_id_from_public(public: PublicKey) -> PeerId {
@@ -76,133 +76,19 @@ struct Behavior {
   gossip: gossip::Behavior,
 }

-struct SwarmTask {
-  to_dial: mpsc::UnboundedReceiver<DialOpts>,
-
+struct UpdateSharedValidatorsTask {
   validators: Arc<RwLock<Validators>>,
-  last_refreshed_validators: Instant,
-  next_refresh_validators: Instant,
-
-  peers: Peers,
-  rebuild_peers_at: Instant,
-
-  swarm: Swarm<Behavior>,
 }

-impl SwarmTask {
-  async fn run(mut self) {
-    loop {
-      let time_till_refresh_validators =
-        self.next_refresh_validators.saturating_duration_since(Instant::now());
-      let time_till_rebuild_peers = self.rebuild_peers_at.saturating_duration_since(Instant::now());
-
-      tokio::select! {
-        biased;
-
-        // Refresh the instance of validators we use to track peers/share with authenticate
-        // TODO: Move this to a task
-        () = tokio::time::sleep(time_till_refresh_validators) => {
-          const TIME_BETWEEN_REFRESH_VALIDATORS: Duration = Duration::from_secs(60);
-          const MAX_TIME_BETWEEN_REFRESH_VALIDATORS: Duration = Duration::from_secs(5 * 60);
-
-          let update = update_shared_validators(&self.validators).await;
-          match update {
-            Ok(removed) => {
-              for removed in removed {
-                let _: Result<_, _> = self.swarm.disconnect_peer_id(removed);
-              }
-              self.last_refreshed_validators = Instant::now();
-              self.next_refresh_validators = Instant::now() + TIME_BETWEEN_REFRESH_VALIDATORS;
-            }
-            Err(e) => {
-              log::warn!("couldn't refresh validators: {e:?}");
-              // Increase the delay before the next refresh by using the time since the last
-              // refresh. This will be 5 seconds, then 5 seconds, then 10 seconds, then 20...
-              let time_since_last = self
-                .next_refresh_validators
-                .saturating_duration_since(self.last_refreshed_validators);
-              // But limit the delay
-              self.next_refresh_validators =
-                Instant::now() + time_since_last.min(MAX_TIME_BETWEEN_REFRESH_VALIDATORS);
-            },
-          }
-        }
-
-        // Rebuild the peers every 10 minutes
-        //
-        // This handles edge cases such as when a validator changes the networks they're present
-        // in, race conditions, or any other edge cases/quirks which would otherwise risk spiraling
-        // out of control
-        () = tokio::time::sleep(time_till_rebuild_peers) => {
-          const TIME_BETWEEN_REBUILD_PEERS: Duration = Duration::from_secs(10 * 60);
-
-          let validators_by_network = self.validators.read().await.by_network().clone();
-          let connected = self.swarm.connected_peers().copied().collect::<HashSet<_>>();
-          let mut peers = HashMap::new();
-          for (network, validators) in validators_by_network {
-            peers.insert(network, validators.intersection(&connected).copied().collect());
-          }
-          *self.peers.peers.write().await = peers;
-
-          self.rebuild_peers_at = Instant::now() + TIME_BETWEEN_REBUILD_PEERS;
-        }
-
-        // Dial peers we're instructed to
-        dial_opts = self.to_dial.recv() => {
-          let dial_opts = dial_opts.expect("DialTask was closed?");
-          let _: Result<_, _> = self.swarm.dial(dial_opts);
-        }
-
-        // Handle swarm events
-        event = self.swarm.next() => {
-          // `Swarm::next` will never return `Poll::Ready(None)`
-          // https://docs.rs/
-          // libp2p/0.54.1/libp2p/struct.Swarm.html#impl-Stream-for-Swarm%3CTBehaviour%3E
-          let event = event.unwrap();
-          match event {
-            SwarmEvent::Behaviour(BehaviorEvent::Reqres(event)) => todo!("TODO"),
-            SwarmEvent::Behaviour(BehaviorEvent::Gossip(event)) => todo!("TODO"),
-            // New connection, so update peers
-            SwarmEvent::ConnectionEstablished { peer_id, .. } => {
-              let Some(networks) =
-                self.validators.read().await.networks(&peer_id).cloned() else { continue };
-              for network in networks {
-                self
-                  .peers
-                  .peers
-                  .write()
-                  .await
-                  .entry(network)
-                  .or_insert_with(HashSet::new)
-                  .insert(peer_id);
-              }
-            },
-            // Connection closed, so update peers
-            SwarmEvent::ConnectionClosed { peer_id, .. } => {
-              let Some(networks) =
-                self.validators.read().await.networks(&peer_id).cloned() else { continue };
-              for network in networks {
-                self
-                  .peers
-                  .peers
-                  .write()
-                  .await
-                  .entry(network)
-                  .or_insert_with(HashSet::new)
-                  .remove(&peer_id);
-              }
-            },
-            SwarmEvent::IncomingConnection { .. } |
-            SwarmEvent::IncomingConnectionError { .. } |
-            SwarmEvent::OutgoingConnectionError { .. } |
-            SwarmEvent::NewListenAddr { .. } |
-            SwarmEvent::ExpiredListenAddr { .. } |
-            SwarmEvent::ListenerClosed { .. } |
-            SwarmEvent::ListenerError { .. } |
-            SwarmEvent::Dialing { .. } => {}
-          }
-        }
-      }
+impl ContinuallyRan for UpdateSharedValidatorsTask {
+  // Only run every minute, not the default of every five seconds
+  const DELAY_BETWEEN_ITERATIONS: u64 = 60;
+  const MAX_DELAY_BETWEEN_ITERATIONS: u64 = 5 * 60;
+
+  fn run_iteration(&mut self) -> impl Send + Future<Output = Result<bool, String>> {
+    async move {
+      update_shared_validators(&self.validators).await.map_err(|e| format!("{e:?}"))?;
+      Ok(true)
+    }
   }
 }
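The new `UpdateSharedValidatorsTask` expresses its schedule via `DELAY_BETWEEN_ITERATIONS` and `MAX_DELAY_BETWEEN_ITERATIONS` instead of hand-rolled timers. A sketch of the scheduling contract those constants imply, matching the removed comment's "5 seconds, then 10 seconds, then 20..." progression; this loop is an assumption about how `ContinuallyRan`'s driver behaves (simplified to a synchronous closure), not its actual code:

use core::time::Duration;

async fn drive_task(
  mut run_iteration: impl FnMut() -> Result<bool, String>,
  delay_between_iterations: u64,
  max_delay_between_iterations: u64,
) {
  let mut delay = delay_between_iterations;
  loop {
    match run_iteration() {
      // On success, return to the base cadence
      Ok(_made_progress) => delay = delay_between_iterations,
      // On error, double the delay, capped at the maximum
      Err(e) => {
        eprintln!("task errored: {e:?}");
        delay = (delay * 2).min(max_delay_between_iterations);
      }
    }
    tokio::time::sleep(Duration::from_secs(delay)).await;
  }
}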
@@ -8,7 +8,10 @@ use serai_client::validator_sets::primitives::ValidatorSet;

 use futures_util::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};

-use libp2p::request_response::{Codec as CodecTrait, Config, Behaviour, ProtocolSupport};
+use libp2p::request_response::{
+  self, Codec as CodecTrait, Event as GenericEvent, Config, Behaviour, ProtocolSupport,
+};
+pub use request_response::Message;

 use serai_cosign::SignedCosign;

@@ -43,16 +46,19 @@ pub(crate) struct TributaryBlockWithCommit {
 /// Responses which can be received via the request-response protocol.
 #[derive(Clone, BorshSerialize, BorshDeserialize)]
 pub(crate) enum Response {
+  NoResponse,
   Blocks(Vec<TributaryBlockWithCommit>),
   NotableCosigns(Vec<SignedCosign>),
 }
 impl fmt::Debug for Response {
   fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
-    (match self {
-      Response::Blocks(_) => fmt.debug_struct("Response::Block"),
-      Response::NotableCosigns(_) => fmt.debug_struct("Response::NotableCosigns"),
-    })
-    .finish_non_exhaustive()
+    match self {
+      Response::NoResponse => fmt.debug_struct("Response::NoResponse").finish(),
+      Response::Blocks(_) => fmt.debug_struct("Response::Block").finish_non_exhaustive(),
+      Response::NotableCosigns(_) => {
+        fmt.debug_struct("Response::NotableCosigns").finish_non_exhaustive()
+      }
+    }
   }
 }

@@ -127,6 +133,8 @@ impl CodecTrait for Codec {
   }
 }

+pub(crate) type Event = GenericEvent<Request, Response>;
+
 pub(crate) type Behavior = Behaviour<Codec>;
 pub(crate) fn new_behavior() -> Behavior {
   let mut config = Config::default();
coordinator/src/p2p/swarm.rs (new file, 276 lines)
@@ -0,0 +1,276 @@
+use std::{
+  sync::Arc,
+  collections::{HashSet, HashMap},
+  time::{Duration, Instant},
+};
+
+use borsh::BorshDeserialize;
+
+use serai_client::validator_sets::primitives::ValidatorSet;
+
+use tokio::sync::{mpsc, oneshot, RwLock};
+
+use serai_task::TaskHandle;
+
+use serai_cosign::SignedCosign;
+
+use futures_util::StreamExt;
+use libp2p::{
+  identity::PeerId,
+  request_response::{RequestId, ResponseChannel},
+  swarm::{dial_opts::DialOpts, SwarmEvent, Swarm},
+};
+
+use crate::p2p::{
+  Peers, BehaviorEvent, Behavior,
+  validators::Validators,
+  reqres::{self, Request, Response},
+  gossip,
+};
+
+/*
+  `SwarmTask` handles everything we need the `Swarm` object for. The goal is to minimize the
+  contention on this task. Unfortunately, the `Swarm` object itself is needed for a variety of
+  purposes making this a rather large task.
+
+  Responsibilities include:
+  - Actually dialing new peers (the selection process occurs in another task)
+  - Maintaining the peers structure (as we need the Swarm object to see who our peers are)
+  - Gossiping messages
+  - Dispatching gossiped messages
+  - Sending requests
+  - Dispatching responses to requests
+  - Dispatching received requests
+  - Sending responses
+*/
+struct SwarmTask {
+  dial_task: TaskHandle,
+  to_dial: mpsc::UnboundedReceiver<DialOpts>,
+  last_dial_task_run: Instant,
+
+  validators: Arc<RwLock<Validators>>,
+  peers: Peers,
+  rebuild_peers_at: Instant,
+
+  swarm: Swarm<Behavior>,
+
+  gossip: mpsc::UnboundedReceiver<gossip::Message>,
+  signed_cosigns: mpsc::UnboundedSender<SignedCosign>,
+  tributary_gossip: mpsc::UnboundedSender<(ValidatorSet, Vec<u8>)>,
+
+  outbound_requests: mpsc::UnboundedReceiver<(PeerId, Request, oneshot::Sender<Option<Response>>)>,
+  outbound_request_responses: HashMap<RequestId, oneshot::Sender<Option<Response>>>,
+
+  inbound_request_response_channels: HashMap<RequestId, ResponseChannel<Response>>,
+  heartbeat_requests: mpsc::UnboundedSender<(RequestId, ValidatorSet, [u8; 32])>,
+  /* TODO
+    let cosigns = Cosigning::<D>::notable_cosigns(&self.db, global_session);
+    let res = reqres::Response::NotableCosigns(cosigns);
+    let _: Result<_, _> = self.swarm.behaviour_mut().reqres.send_response(channel, res);
+  */
+  notable_cosign_requests: mpsc::UnboundedSender<(RequestId, [u8; 32])>,
+  inbound_request_responses: mpsc::UnboundedReceiver<(RequestId, Response)>,
+}
+
+impl SwarmTask {
+  fn handle_gossip(&mut self, event: gossip::Event) {
+    match event {
+      gossip::Event::Message { message, .. } => {
+        let Ok(message) = gossip::Message::deserialize(&mut message.data.as_slice()) else {
+          // TODO: Penalize the PeerId which sent this message
+          return;
+        };
+        match message {
+          gossip::Message::Tributary { set, message } => {
+            let _: Result<_, _> = self.tributary_gossip.send((set, message));
+          }
+          gossip::Message::Cosign(signed_cosign) => {
+            let _: Result<_, _> = self.signed_cosigns.send(signed_cosign);
+          }
+        }
+      }
+      gossip::Event::Subscribed { .. } | gossip::Event::Unsubscribed { .. } => {}
+      gossip::Event::GossipsubNotSupported { peer_id } => {
+        let _: Result<_, _> = self.swarm.disconnect_peer_id(peer_id);
+      }
+    }
+  }
+
+  fn handle_reqres(&mut self, event: reqres::Event) {
+    match event {
+      reqres::Event::Message { message, .. } => match message {
+        reqres::Message::Request { request_id, request, channel } => {
+          match request {
+            // TODO: Send these
+            reqres::Request::KeepAlive => {
+              let _: Result<_, _> =
+                self.swarm.behaviour_mut().reqres.send_response(channel, Response::NoResponse);
+            }
+            reqres::Request::Heartbeat { set, latest_block_hash } => {
+              self.inbound_request_response_channels.insert(request_id, channel);
+              let _: Result<_, _> =
+                self.heartbeat_requests.send((request_id, set, latest_block_hash));
+            }
+            reqres::Request::NotableCosigns { global_session } => {
+              self.inbound_request_response_channels.insert(request_id, channel);
+              let _: Result<_, _> = self.notable_cosign_requests.send((request_id, global_session));
+            }
+          }
+        }
+        reqres::Message::Response { request_id, response } => {
+          // Send Some(response) as the response for the request
+          if let Some(channel) = self.outbound_request_responses.remove(&request_id) {
+            let _: Result<_, _> = channel.send(Some(response));
+          }
+        }
+      },
+      reqres::Event::OutboundFailure { request_id, .. } => {
+        // Send None as the response for the request
+        if let Some(channel) = self.outbound_request_responses.remove(&request_id) {
+          let _: Result<_, _> = channel.send(None);
+        }
+      }
+      reqres::Event::InboundFailure { .. } | reqres::Event::ResponseSent { .. } => {}
+    }
+  }
+
+  async fn run(mut self) {
+    loop {
+      let time_till_rebuild_peers = self.rebuild_peers_at.saturating_duration_since(Instant::now());
+
+      tokio::select! {
+        // Dial peers we're instructed to
+        dial_opts = self.to_dial.recv() => {
+          let dial_opts = dial_opts.expect("DialTask was closed?");
+          let _: Result<_, _> = self.swarm.dial(dial_opts);
+        }
+
+        /*
+          Rebuild the peers every 10 minutes.
+
+          This protects against any race conditions/edge cases we have in our logic to track peers,
+          along with unrepresented behavior such as when a peer changes the networks they're active
+          in. This lets the peer tracking logic simply be 'good enough' to not become horribly
+          corrupt over the span of `TIME_BETWEEN_REBUILD_PEERS`.
+
+          We also use this to disconnect all peers who are no longer active in any network.
+        */
+        () = tokio::time::sleep(time_till_rebuild_peers) => {
+          const TIME_BETWEEN_REBUILD_PEERS: Duration = Duration::from_secs(10 * 60);
+
+          let validators_by_network = self.validators.read().await.by_network().clone();
+          let connected_peers = self.swarm.connected_peers().copied().collect::<HashSet<_>>();
+
+          // We initially populate the list of peers to disconnect with all peers
+          let mut to_disconnect = connected_peers.clone();
+
+          // Build the new peers object
+          let mut peers = HashMap::new();
+          for (network, validators) in validators_by_network {
+            peers.insert(network, validators.intersection(&connected_peers).copied().collect());

+            // If this peer is in this validator set, don't keep it flagged for disconnection
+            to_disconnect.retain(|peer| !validators.contains(peer));
+          }
+
+          // Write the new peers object
+          *self.peers.peers.write().await = peers;
+          self.rebuild_peers_at = Instant::now() + TIME_BETWEEN_REBUILD_PEERS;
+
+          // Disconnect all peers marked for disconnection
+          for peer in to_disconnect {
+            let _: Result<_, _> = self.swarm.disconnect_peer_id(peer);
+          }
+        }
+
+        // Handle swarm events
+        event = self.swarm.next() => {
+          // `Swarm::next` will never return `Poll::Ready(None)`
+          // https://docs.rs/
+          // libp2p/0.54.1/libp2p/struct.Swarm.html#impl-Stream-for-Swarm%3CTBehaviour%3E
+          let event = event.unwrap();
+          match event {
+            // New connection, so update peers
+            SwarmEvent::ConnectionEstablished { peer_id, .. } => {
+              let Some(networks) =
+                self.validators.read().await.networks(&peer_id).cloned() else { continue };
+              let mut peers = self.peers.peers.write().await;
+              for network in networks {
+                peers.entry(network).or_insert_with(HashSet::new).insert(peer_id);
+              }
+            }
+
+            // Connection closed, so update peers
+            SwarmEvent::ConnectionClosed { peer_id, .. } => {
+              let Some(networks) =
+                self.validators.read().await.networks(&peer_id).cloned() else { continue };
+              let mut peers = self.peers.peers.write().await;
+              for network in networks {
+                peers.entry(network).or_insert_with(HashSet::new).remove(&peer_id);
+              }
+
+              /*
+                We want to re-run the dial task, since we lost a peer, in case we should find new
+                peers. This opens a DoS where a validator repeatedly opens/closes connections to
+                force iterations of the dial task. We prevent this by setting a minimum distance
+                since the last explicit iteration.
+
+                This is suboptimal. If we have several disconnects in immediate proximity, we'll
+                trigger the dial task upon the first (where we may still have enough peers we
+                shouldn't dial more) but not the last (where we may have so few peers left we
+                should dial more). This is accepted as the dial task will eventually run on its
+                natural timer.
+              */
+              const MINIMUM_TIME_SINCE_LAST_EXPLICIT_DIAL: Duration = Duration::from_secs(60);
+              let now = Instant::now();
+              if (self.last_dial_task_run + MINIMUM_TIME_SINCE_LAST_EXPLICIT_DIAL) < now {
+                self.dial_task.run_now();
+                self.last_dial_task_run = now;
+              }
+            }
+
+            SwarmEvent::Behaviour(BehaviorEvent::Reqres(event)) => {
+              self.handle_reqres(event)
+            }
+            SwarmEvent::Behaviour(BehaviorEvent::Gossip(event)) => {
+              self.handle_gossip(event)
+            }
+
+            // We don't handle any of these
+            SwarmEvent::IncomingConnection { .. } |
+            SwarmEvent::IncomingConnectionError { .. } |
+            SwarmEvent::OutgoingConnectionError { .. } |
+            SwarmEvent::NewListenAddr { .. } |
+            SwarmEvent::ExpiredListenAddr { .. } |
+            SwarmEvent::ListenerClosed { .. } |
+            SwarmEvent::ListenerError { .. } |
+            SwarmEvent::Dialing { .. } => {}
+          }
+        }
+
+        message = self.gossip.recv() => {
+          let message = message.expect("channel for messages to gossip was closed?");
+          let topic = message.topic();
+          let message = borsh::to_vec(&message).unwrap();
+          let _: Result<_, _> = self.swarm.behaviour_mut().gossip.publish(topic, message);
+        }
+
+        request = self.outbound_requests.recv() => {
+          let (peer, request, response_channel) =
+            request.expect("channel for requests was closed?");
+          let request_id = self.swarm.behaviour_mut().reqres.send_request(&peer, request);
+          self.outbound_request_responses.insert(request_id, response_channel);
+        }
+
+        response = self.inbound_request_responses.recv() => {
+          let (request_id, response) =
+            response.expect("channel for inbound request responses was closed?");
+          if let Some(channel) = self.inbound_request_response_channels.remove(&request_id) {
+            let _: Result<_, _> =
+              self.swarm.behaviour_mut().reqres.send_response(channel, response);
+          }
+        }
+      }
+    }
+  }
+}
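A self-contained sketch of the correlation pattern `SwarmTask` uses for outbound requests: callers hand over a oneshot sender, the task maps the libp2p `RequestId` to it, and fulfills it with `Some(response)` on success or `None` on `OutboundFailure`. The `Correlator` type below is illustrative, not the coordinator's code.

use std::collections::HashMap;
use tokio::sync::oneshot;

struct Correlator<Id: std::hash::Hash + Eq, Resp> {
  pending: HashMap<Id, oneshot::Sender<Option<Resp>>>,
}

impl<Id: std::hash::Hash + Eq, Resp> Correlator<Id, Resp> {
  fn on_sent(&mut self, id: Id, channel: oneshot::Sender<Option<Resp>>) {
    self.pending.insert(id, channel);
  }
  fn on_response(&mut self, id: &Id, response: Resp) {
    // The caller may have dropped their receiver, so ignore send errors
    if let Some(channel) = self.pending.remove(id) {
      let _ = channel.send(Some(response));
    }
  }
  fn on_failure(&mut self, id: &Id) {
    if let Some(channel) = self.pending.remove(id) {
      let _ = channel.send(None);
    }
  }
}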
@@ -77,17 +77,16 @@ impl Validators {
   fn incorporate_session_changes(
     &mut self,
     session_changes: Vec<(NetworkId, Session, HashSet<PeerId>)>,
-  ) -> HashSet<PeerId> {
-    let mut removed = HashSet::new();
-
+  ) {
     for (network, session, validators) in session_changes {
       // Remove the existing validators
       for validator in self.by_network.remove(&network).unwrap_or_else(HashSet::new) {
+        // Get all networks this validator is in
         let mut networks = self.validators.remove(&validator).unwrap();
+        // Remove this one
         networks.remove(&network);
-        if networks.is_empty() {
-          removed.insert(validator);
-        } else {
+        // Insert the networks back if the validator was present in other networks
+        if !networks.is_empty() {
           self.validators.insert(validator, networks);
         }
       }
@@ -101,16 +100,15 @@ impl Validators {
       // Update the session we have populated
       self.sessions.insert(network, session);
     }
-
-    removed
   }

   /// Update the view of the validators.
   ///
   /// Returns all validators removed from the active validator set.
-  pub(crate) async fn update(&mut self) -> Result<HashSet<PeerId>, String> {
+  pub(crate) async fn update(&mut self) -> Result<(), String> {
     let session_changes = Self::session_changes(&self.serai, &self.sessions).await?;
-    Ok(self.incorporate_session_changes(session_changes))
+    self.incorporate_session_changes(session_changes);
+    Ok(())
   }

   pub(crate) fn by_network(&self) -> &HashMap<NetworkId, HashSet<PeerId>> {
@@ -128,13 +126,17 @@ impl Validators {

 /// Update the view of the validators.
 ///
+/// This minimizes the time an exclusive lock is held over the validators to minimize the
+/// disruption to functioning.
+///
 /// Returns all validators removed from the active validator set.
 pub(crate) async fn update_shared_validators(
   validators: &Arc<RwLock<Validators>>,
-) -> Result<HashSet<PeerId>, String> {
+) -> Result<(), String> {
   let session_changes = {
     let validators = validators.read().await;
     Validators::session_changes(validators.serai.clone(), validators.sessions.clone()).await?
   };
-  Ok(validators.write().await.incorporate_session_changes(session_changes))
+  validators.write().await.incorporate_session_changes(session_changes);
+  Ok(())
 }
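The `update_shared_validators` split above follows a general pattern: perform the slow, fallible fetch while holding only a read lock, then take the write lock just long enough to apply the result. A minimal sketch with stand-in types:

use std::sync::Arc;
use tokio::sync::RwLock;

async fn refresh(shared: &Arc<RwLock<Vec<u64>>>) -> Result<(), String> {
  // Slow part: snapshot what's needed under a read lock, so readers stay unblocked
  let fetched = {
    let _current = shared.read().await;
    // ... await a network call using the snapshot here ...
    vec![1, 2, 3]
  };
  // Fast part: hold the exclusive lock only to swap the new data in
  *shared.write().await = fetched;
  Ok(())
}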
@@ -24,6 +24,7 @@ workspace = true
 rand_core = { version = "0.6", default-features = false }
 zeroize = { version = "1", default-features = false, features = ["std"] }

+blake2 = { version = "0.10", default-features = false, features = ["std"] }
 ciphersuite = { path = "../../crypto/ciphersuite", default-features = false, features = ["std"] }
 frost = { package = "modular-frost", path = "../../crypto/frost", default-features = false }
 frost-schnorrkel = { path = "../../crypto/schnorrkel", default-features = false }
@@ -5,8 +5,9 @@ use serai_db::{Get, DbTxn, create_db};

 create_db! {
   SignersBatch {
-    ActiveSigningProtocols: (session: Session) -> Vec<u32>,
-    Batches: (id: u32) -> Batch,
+    ActiveSigningProtocols: (session: Session) -> Vec<[u8; 32]>,
+    BatchHash: (id: u32) -> [u8; 32],
+    Batches: (hash: [u8; 32]) -> Batch,
     SignedBatches: (id: u32) -> SignedBatch,
     LastAcknowledgedBatch: () -> u32,
   }
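The schema change above re-keys `Batches` by a content hash, with `BatchHash` mapping the sequential batch id to that hash. Per the signer hunks below, the hash is the Blake2b-256 digest of the batch's SCALE encoding; a sketch of just that computation (the `encoded_batch` parameter stands in for `batch.encode()` from the diff):

use blake2::{digest::typenum::U32, Digest, Blake2b};

fn batch_hash(encoded_batch: &[u8]) -> [u8; 32] {
  // Identical to the diff's `<[u8; 32]>::from(Blake2b::<U32>::digest(batch.encode()))`
  <[u8; 32]>::from(Blake2b::<U32>::digest(encoded_batch))
}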
@@ -1,9 +1,12 @@
 use core::future::Future;
 use std::collections::HashSet;

+use blake2::{digest::typenum::U32, Digest, Blake2b};
 use ciphersuite::{group::GroupEncoding, Ristretto};
 use frost::dkg::ThresholdKeys;

+use scale::Encode;
+
 use serai_validator_sets_primitives::Session;
 use serai_in_instructions_primitives::{SignedBatch, batch_message};

@@ -40,7 +43,7 @@ pub(crate) struct BatchSignerTask<D: Db, E: GroupEncoding> {
   external_key: E,
   keys: Vec<ThresholdKeys<Ristretto>>,

-  active_signing_protocols: HashSet<u32>,
+  active_signing_protocols: HashSet<[u8; 32]>,
   attempt_manager: AttemptManager<D, WrappedSchnorrkelMachine>,
 }
@@ -63,7 +66,6 @@ impl<D: Db, E: GroupEncoding> BatchSignerTask<D, E> {
       active_signing_protocols.insert(id);

       let batch = Batches::get(&db, id).unwrap();
-      assert_eq!(batch.id, id);

       let mut machines = Vec::with_capacity(keys.len());
       for keys in &keys {
@@ -90,19 +92,21 @@ impl<D: Db, E: Send + GroupEncoding> ContinuallyRan for BatchSignerTask<D, E> {
        iterated = true;

        // Save this to the database as a transaction to sign
-        self.active_signing_protocols.insert(batch.id);
+        let batch_hash = <[u8; 32]>::from(Blake2b::<U32>::digest(batch.encode()));
+        self.active_signing_protocols.insert(batch_hash);
        ActiveSigningProtocols::set(
          &mut txn,
          self.session,
          &self.active_signing_protocols.iter().copied().collect(),
        );
-        Batches::set(&mut txn, batch.id, &batch);
+        BatchHash::set(&mut txn, batch.id, &batch_hash);
+        Batches::set(&mut txn, batch_hash, &batch);

        let mut machines = Vec::with_capacity(self.keys.len());
        for keys in &self.keys {
          machines.push(WrappedSchnorrkelMachine::new(keys.clone(), batch_message(&batch)));
        }
-        for msg in self.attempt_manager.register(VariantSignId::Batch(batch.id), machines) {
+        for msg in self.attempt_manager.register(VariantSignId::Batch(batch_hash), machines) {
          BatchSignerToCoordinatorMessages::send(&mut txn, self.session, &msg);
        }
@@ -112,48 +116,57 @@ impl<D: Db, E: Send + GroupEncoding> ContinuallyRan for BatchSignerTask<D, E> {
      // Check for acknowledged Batches (meaning we should no longer sign for these Batches)
      loop {
        let mut txn = self.db.txn();
-        let Some(id) = AcknowledgedBatches::try_recv(&mut txn, &self.external_key) else {
-          break;
-        };
+        let batch_hash = {
+          let Some(batch_id) = AcknowledgedBatches::try_recv(&mut txn, &self.external_key) else {
+            break;
+          };
+
+          /*
+            We may have yet to register this signing protocol.
+
+            While `BatchesToSign` is populated before `AcknowledgedBatches`, we could theoretically
+            have `BatchesToSign` populated with a new batch _while iterating over
+            `AcknowledgedBatches`_, and then have `AcknowledgedBatched` populated. In that edge
+            case, we will see the acknowledgement notification before we see the transaction.
+
+            In such a case, we break (dropping the txn, re-queueing the acknowledgement
+            notification). On the task's next iteration, we'll process the Batch from
+            `BatchesToSign` and be able to make progress.
+          */
+          let Some(batch_hash) = BatchHash::take(&mut txn, batch_id) else {
+            drop(txn);
+            break;
+          };
+          batch_hash
+        };
+        let batch =
+          Batches::take(&mut txn, batch_hash).expect("BatchHash populated but not Batches");
+
+        iterated = true;
+
+        // Update the last acknowledged Batch
        {
          let last_acknowledged = LastAcknowledgedBatch::get(&txn);
-          if Some(id) > last_acknowledged {
-            LastAcknowledgedBatch::set(&mut txn, &id);
+          if Some(batch.id) > last_acknowledged {
+            LastAcknowledgedBatch::set(&mut txn, &batch.id);
          }
        }

-        /*
-          We may have yet to register this signing protocol.
-
-          While `BatchesToSign` is populated before `AcknowledgedBatches`, we could theoretically
-          have `BatchesToSign` populated with a new batch _while iterating over
-          `AcknowledgedBatches`_, and then have `AcknowledgedBatched` populated. In that edge case,
-          we will see the acknowledgement notification before we see the transaction.
-
-          In such a case, we break (dropping the txn, re-queueing the acknowledgement notification).
-          On the task's next iteration, we'll process the Batch from `BatchesToSign` and be
-          able to make progress.
-        */
-        if !self.active_signing_protocols.remove(&id) {
-          break;
-        }
-        iterated = true;
-
-        // Since it was, remove this as an active signing protocol
+        // Remove this as an active signing protocol
+        assert!(self.active_signing_protocols.remove(&batch_hash));
        ActiveSigningProtocols::set(
          &mut txn,
          self.session,
          &self.active_signing_protocols.iter().copied().collect(),
        );
-        // Clean up the database
-        Batches::del(&mut txn, id);
-        SignedBatches::del(&mut txn, id);
+        // Clean up SignedBatches
+        SignedBatches::del(&mut txn, batch.id);

        // We retire with a txn so we either successfully flag this Batch as acknowledged, and
        // won't re-register it (making this retire safe), or we don't flag it, meaning we will
        // re-register it, yet that's safe as we have yet to retire it
-        self.attempt_manager.retire(&mut txn, VariantSignId::Batch(id));
+        self.attempt_manager.retire(&mut txn, VariantSignId::Batch(batch_hash));

        txn.commit();
      }
@@ -143,7 +143,7 @@ impl<D: Db, C: Coordinator> ContinuallyRan for CoordinatorTask<D, C> {
      // the prior Batch(es) (and accordingly didn't publish them)
      let last_batch =
        crate::batch::last_acknowledged_batch(&txn).max(db::LastPublishedBatch::get(&txn));
-      let mut next_batch = last_batch.map_or(0, |id| id + 1);
+      let mut next_batch = last_batch.map(|id| id + 1).unwrap_or(0);
      while let Some(batch) = crate::batch::signed_batch(&txn, next_batch) {
        iterated = true;
        db::LastPublishedBatch::set(&mut txn, &batch.batch.id);
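The one-line change above is behavior-preserving: `map_or(0, |id| id + 1)` and `map(|id| id + 1).unwrap_or(0)` are equivalent. A small sketch of the resume logic it belongs to, with stand-in values (`next_to_publish` is an illustrative name, not a function in the codebase):

fn next_to_publish(last_acknowledged: Option<u32>, last_published: Option<u32>) -> u32 {
  // `Option<u32>` orders `None` below any `Some`, so a fresh database resumes at batch 0
  let last_batch = last_acknowledged.max(last_published);
  last_batch.map(|id| id + 1).unwrap_or(0)
}

fn main() {
  assert_eq!(next_to_publish(None, None), 0);
  assert_eq!(next_to_publish(Some(4), Some(2)), 5);
}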