mirror of
https://github.com/serai-dex/serai.git
synced 2025-12-14 23:19:24 +00:00
Compare commits
5 Commits
ce676efb1f
...
985261574c
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
985261574c | ||
|
|
3f3b0255f8 | ||
|
|
5fc8500f8d | ||
|
|
49c221cca2 | ||
|
|
906e2fb669 |
1
Cargo.lock
generated
1
Cargo.lock
generated
@@ -8328,6 +8328,7 @@ dependencies = [
|
|||||||
"rand_core",
|
"rand_core",
|
||||||
"schnorr-signatures",
|
"schnorr-signatures",
|
||||||
"serai-client",
|
"serai-client",
|
||||||
|
"serai-cosign",
|
||||||
"serai-db",
|
"serai-db",
|
||||||
"serai-env",
|
"serai-env",
|
||||||
"serai-message-queue",
|
"serai-message-queue",
|
||||||
|
|||||||
@@ -56,6 +56,8 @@ futures-util = { version = "0.3", default-features = false, features = ["std"] }
|
|||||||
tokio = { version = "1", default-features = false, features = ["rt-multi-thread", "sync", "time", "macros"] }
|
tokio = { version = "1", default-features = false, features = ["rt-multi-thread", "sync", "time", "macros"] }
|
||||||
libp2p = { version = "0.52", default-features = false, features = ["tokio", "tcp", "noise", "yamux", "request-response", "gossipsub", "macros"] }
|
libp2p = { version = "0.52", default-features = false, features = ["tokio", "tcp", "noise", "yamux", "request-response", "gossipsub", "macros"] }
|
||||||
|
|
||||||
|
serai-cosign = { path = "./cosign" }
|
||||||
|
|
||||||
[dev-dependencies]
|
[dev-dependencies]
|
||||||
tributary = { package = "tributary-chain", path = "./tributary", features = ["tests"] }
|
tributary = { package = "tributary-chain", path = "./tributary", features = ["tests"] }
|
||||||
sp-application-crypto = { git = "https://github.com/serai-dex/substrate", default-features = false, features = ["std"] }
|
sp-application-crypto = { git = "https://github.com/serai-dex/substrate", default-features = false, features = ["std"] }
|
||||||
|
|||||||
@@ -1,4 +1,5 @@
|
|||||||
mod tributary;
|
mod tributary;
|
||||||
|
mod p2p;
|
||||||
|
|
||||||
fn main() {
|
fn main() {
|
||||||
todo!("TODO")
|
todo!("TODO")
|
||||||
|
|||||||
70
coordinator/src/p2p/gossip.rs
Normal file
70
coordinator/src/p2p/gossip.rs
Normal file
@@ -0,0 +1,70 @@
|
|||||||
|
use core::time::Duration;
|
||||||
|
|
||||||
|
use blake2::{Digest, Blake2s256};
|
||||||
|
|
||||||
|
use scale::Encode;
|
||||||
|
use borsh::{BorshSerialize, BorshDeserialize};
|
||||||
|
use serai_client::validator_sets::primitives::ValidatorSet;
|
||||||
|
|
||||||
|
use libp2p::gossipsub::{
|
||||||
|
IdentTopic, MessageId, MessageAuthenticity, ValidationMode, ConfigBuilder, IdentityTransform,
|
||||||
|
AllowAllSubscriptionFilter, Behaviour,
|
||||||
|
};
|
||||||
|
|
||||||
|
use serai_cosign::SignedCosign;
|
||||||
|
|
||||||
|
// Block size limit + 16 KB of space for signatures/metadata
|
||||||
|
const MAX_LIBP2P_GOSSIP_MESSAGE_SIZE: usize = tributary::BLOCK_SIZE_LIMIT + 16384;
|
||||||
|
|
||||||
|
const KEEP_ALIVE_INTERVAL: Duration = Duration::from_secs(80);
|
||||||
|
|
||||||
|
const LIBP2P_PROTOCOL: &str = "/serai/coordinator/gossip/1.0.0";
|
||||||
|
const BASE_TOPIC: &str = "/";
|
||||||
|
|
||||||
|
fn topic_for_set(set: ValidatorSet) -> IdentTopic {
|
||||||
|
IdentTopic::new(format!("/set/{}", hex::encode(set.encode())))
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Clone, BorshSerialize, BorshDeserialize)]
|
||||||
|
pub(crate) enum Message {
|
||||||
|
Tribuary { genesis: [u8; 32], message: Vec<u8> },
|
||||||
|
Cosign(SignedCosign),
|
||||||
|
}
|
||||||
|
|
||||||
|
pub(crate) type Behavior = Behaviour<IdentityTransform, AllowAllSubscriptionFilter>;
|
||||||
|
|
||||||
|
pub(crate) fn new_behavior() -> Behavior {
|
||||||
|
// The latency used by the Tendermint protocol, used here as the gossip epoch duration
|
||||||
|
// libp2p-rs defaults to 1 second, whereas ours will be ~2
|
||||||
|
let heartbeat_interval = tributary::tendermint::LATENCY_TIME;
|
||||||
|
// The amount of heartbeats which will occur within a single Tributary block
|
||||||
|
let heartbeats_per_block = tributary::tendermint::TARGET_BLOCK_TIME.div_ceil(heartbeat_interval);
|
||||||
|
// libp2p-rs defaults to 5, whereas ours will be ~8
|
||||||
|
let heartbeats_to_keep = 2 * heartbeats_per_block;
|
||||||
|
// libp2p-rs defaults to 3 whereas ours will be ~4
|
||||||
|
let heartbeats_to_gossip = heartbeats_per_block;
|
||||||
|
|
||||||
|
let config = ConfigBuilder::default()
|
||||||
|
.protocol_id_prefix(LIBP2P_PROTOCOL)
|
||||||
|
.history_length(usize::try_from(heartbeats_to_keep).unwrap())
|
||||||
|
.history_gossip(usize::try_from(heartbeats_to_gossip).unwrap())
|
||||||
|
.heartbeat_interval(Duration::from_millis(heartbeat_interval.into()))
|
||||||
|
.max_transmit_size(MAX_LIBP2P_GOSSIP_MESSAGE_SIZE)
|
||||||
|
.idle_timeout(KEEP_ALIVE_INTERVAL + Duration::from_secs(5))
|
||||||
|
.duplicate_cache_time(Duration::from_millis((heartbeats_to_keep * heartbeat_interval).into()))
|
||||||
|
.validation_mode(ValidationMode::Anonymous)
|
||||||
|
// Uses a content based message ID to avoid duplicates as much as possible
|
||||||
|
.message_id_fn(|msg| {
|
||||||
|
MessageId::new(&Blake2s256::digest([msg.topic.as_str().as_bytes(), &msg.data].concat()))
|
||||||
|
})
|
||||||
|
.build();
|
||||||
|
|
||||||
|
// TODO: Don't use IdentityTransform here. Authenticate using validator keys
|
||||||
|
let mut gossipsub = Behavior::new(MessageAuthenticity::Anonymous, config.unwrap()).unwrap();
|
||||||
|
|
||||||
|
// Subscribe to the base topic
|
||||||
|
let topic = IdentTopic::new(BASE_TOPIC);
|
||||||
|
let _ = gossipsub.subscribe(&topic);
|
||||||
|
|
||||||
|
gossipsub
|
||||||
|
}
|
||||||
127
coordinator/src/p2p/heartbeat.rs
Normal file
127
coordinator/src/p2p/heartbeat.rs
Normal file
@@ -0,0 +1,127 @@
|
|||||||
|
use core::future::Future;
|
||||||
|
|
||||||
|
use std::time::{Duration, SystemTime};
|
||||||
|
|
||||||
|
use serai_client::validator_sets::primitives::ValidatorSet;
|
||||||
|
|
||||||
|
use tributary::{ReadWrite, Block, Tributary, TributaryReader};
|
||||||
|
|
||||||
|
use serai_db::*;
|
||||||
|
use serai_task::ContinuallyRan;
|
||||||
|
|
||||||
|
use crate::{
|
||||||
|
tributary::Transaction,
|
||||||
|
p2p::{
|
||||||
|
reqres::{Request, Response},
|
||||||
|
P2p,
|
||||||
|
},
|
||||||
|
};
|
||||||
|
|
||||||
|
// Amount of blocks in a minute
|
||||||
|
const BLOCKS_PER_MINUTE: usize = (60 / (tributary::tendermint::TARGET_BLOCK_TIME / 1000)) as usize;
|
||||||
|
|
||||||
|
// Maximum amount of blocks to send in a batch of blocks
|
||||||
|
pub const BLOCKS_PER_BATCH: usize = BLOCKS_PER_MINUTE + 1;
|
||||||
|
|
||||||
|
/// Sends a heartbeat to other validators on regular intervals informing them of our Tributary's
|
||||||
|
/// tip.
|
||||||
|
///
|
||||||
|
/// If the other validator has more blocks then we do, they're expected to inform us. This forms
|
||||||
|
/// the sync protocol for our Tributaries.
|
||||||
|
struct HeartbeatTask<TD: Db> {
|
||||||
|
set: ValidatorSet,
|
||||||
|
tributary: Tributary<TD, Transaction, P2p>,
|
||||||
|
reader: TributaryReader<TD, Transaction>,
|
||||||
|
p2p: P2p,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<TD: Db> ContinuallyRan for HeartbeatTask<TD> {
|
||||||
|
fn run_iteration(&mut self) -> impl Send + Future<Output = Result<bool, String>> {
|
||||||
|
async move {
|
||||||
|
// If our blockchain hasn't had a block in the past minute, trigger the heartbeat protocol
|
||||||
|
const TIME_TO_TRIGGER_SYNCING: Duration = Duration::from_secs(60);
|
||||||
|
|
||||||
|
let mut tip = self.reader.tip();
|
||||||
|
let time_since = {
|
||||||
|
let block_time = if let Some(time_of_block) = self.reader.time_of_block(&tip) {
|
||||||
|
SystemTime::UNIX_EPOCH + Duration::from_secs(time_of_block)
|
||||||
|
} else {
|
||||||
|
// If we couldn't fetch this block's time, assume it's old
|
||||||
|
// We don't want to declare its unix time as 0 and claim it's 50+ years old though
|
||||||
|
log::warn!(
|
||||||
|
"heartbeat task couldn't fetch the time of a block, flagging it as a minute old"
|
||||||
|
);
|
||||||
|
SystemTime::now() - TIME_TO_TRIGGER_SYNCING
|
||||||
|
};
|
||||||
|
SystemTime::now().duration_since(block_time).unwrap_or(Duration::ZERO)
|
||||||
|
};
|
||||||
|
let mut tip_is_stale = false;
|
||||||
|
|
||||||
|
let mut synced_block = false;
|
||||||
|
if TIME_TO_TRIGGER_SYNCING <= time_since {
|
||||||
|
log::warn!(
|
||||||
|
"last known tributary block for {:?} was {} seconds ago",
|
||||||
|
self.set,
|
||||||
|
time_since.as_secs()
|
||||||
|
);
|
||||||
|
|
||||||
|
// This requests all peers for this network, without differentiating by session
|
||||||
|
// This should be fine as most validators should overlap across sessions
|
||||||
|
'peer: for peer in self.p2p.peers(self.set.network).await {
|
||||||
|
loop {
|
||||||
|
// Create the request for blocks
|
||||||
|
if tip_is_stale {
|
||||||
|
tip = self.reader.tip();
|
||||||
|
tip_is_stale = false;
|
||||||
|
}
|
||||||
|
let request = Request::Heartbeat { set: self.set, latest_block_hash: tip };
|
||||||
|
let Ok(Response::Blocks(blocks)) = peer.send(request).await else { continue 'peer };
|
||||||
|
|
||||||
|
// This is the final batch if it has less than the maximum amount of blocks
|
||||||
|
// (signifying there weren't more blocks after this to fill the batch with)
|
||||||
|
let final_batch = blocks.len() < BLOCKS_PER_BATCH;
|
||||||
|
|
||||||
|
// Sync each block
|
||||||
|
for block_with_commit in blocks {
|
||||||
|
let Ok(block) = Block::read(&mut block_with_commit.block.as_slice()) else {
|
||||||
|
// TODO: Disconnect/slash this peer
|
||||||
|
log::warn!("received invalid Block inside response to heartbeat");
|
||||||
|
continue 'peer;
|
||||||
|
};
|
||||||
|
|
||||||
|
// Attempt to sync the block
|
||||||
|
if !self.tributary.sync_block(block, block_with_commit.commit).await {
|
||||||
|
// The block may be invalid or may simply be stale
|
||||||
|
continue 'peer;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Because we synced a block, flag the tip as stale
|
||||||
|
tip_is_stale = true;
|
||||||
|
// And that we did sync a block
|
||||||
|
synced_block = true;
|
||||||
|
}
|
||||||
|
|
||||||
|
// If this was the final batch, move on from this peer
|
||||||
|
// We could assume they were honest and we are done syncing the chain, but this is a
|
||||||
|
// bit more robust
|
||||||
|
if final_batch {
|
||||||
|
continue 'peer;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// This will cause the tak to be run less and less often, ensuring we aren't spamming the
|
||||||
|
// net if we legitimately aren't making progress
|
||||||
|
if !synced_block {
|
||||||
|
Err(format!(
|
||||||
|
"tried to sync blocks for {:?} since we haven't seen one in {} seconds but didn't",
|
||||||
|
self.set,
|
||||||
|
time_since.as_secs(),
|
||||||
|
))?;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(synced_block)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
30
coordinator/src/p2p/mod.rs
Normal file
30
coordinator/src/p2p/mod.rs
Normal file
@@ -0,0 +1,30 @@
|
|||||||
|
use serai_client::primitives::NetworkId;
|
||||||
|
|
||||||
|
mod reqres;
|
||||||
|
use reqres::{Request, Response};
|
||||||
|
|
||||||
|
mod gossip;
|
||||||
|
|
||||||
|
mod heartbeat;
|
||||||
|
|
||||||
|
struct Peer;
|
||||||
|
impl Peer {
|
||||||
|
async fn send(&self, request: Request) -> Result<Response, tokio::time::error::Elapsed> {
|
||||||
|
(async move { todo!("TODO") }).await
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Clone, Debug)]
|
||||||
|
struct P2p;
|
||||||
|
impl P2p {
|
||||||
|
async fn peers(&self, set: NetworkId) -> Vec<Peer> {
|
||||||
|
(async move { todo!("TODO") }).await
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[async_trait::async_trait]
|
||||||
|
impl tributary::P2p for P2p {
|
||||||
|
async fn broadcast(&self, genesis: [u8; 32], msg: Vec<u8>) {
|
||||||
|
todo!("TODO")
|
||||||
|
}
|
||||||
|
}
|
||||||
126
coordinator/src/p2p/reqres.rs
Normal file
126
coordinator/src/p2p/reqres.rs
Normal file
@@ -0,0 +1,126 @@
|
|||||||
|
use core::time::Duration;
|
||||||
|
use std::io::{self, Read};
|
||||||
|
|
||||||
|
use async_trait::async_trait;
|
||||||
|
|
||||||
|
use borsh::{BorshSerialize, BorshDeserialize};
|
||||||
|
use serai_client::validator_sets::primitives::ValidatorSet;
|
||||||
|
|
||||||
|
use futures_util::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};
|
||||||
|
|
||||||
|
use libp2p::request_response::{Codec as CodecTrait, Config, Behaviour, ProtocolSupport};
|
||||||
|
|
||||||
|
use serai_cosign::SignedCosign;
|
||||||
|
|
||||||
|
/// The maximum message size for the request-response protocol
|
||||||
|
// This is derived from the heartbeat message size as it's our largest message
|
||||||
|
const MAX_LIBP2P_REQRES_MESSAGE_SIZE: usize =
|
||||||
|
(tributary::BLOCK_SIZE_LIMIT * crate::p2p::heartbeat::BLOCKS_PER_BATCH) + 1024;
|
||||||
|
|
||||||
|
const PROTOCOL: &str = "/serai/coordinator/reqres/1.0.0";
|
||||||
|
|
||||||
|
/// Requests which can be made via the request-response protocol.
|
||||||
|
#[derive(Clone, Copy, Debug, BorshSerialize, BorshDeserialize)]
|
||||||
|
pub(crate) enum Request {
|
||||||
|
/// A keep-alive to prevent our connections from being dropped.
|
||||||
|
KeepAlive,
|
||||||
|
/// A heartbeat informing our peers of our latest block, for the specified blockchain, on regular
|
||||||
|
/// intervals.
|
||||||
|
///
|
||||||
|
/// If our peers have more blocks than us, they're expected to respond with those blocks.
|
||||||
|
Heartbeat { set: ValidatorSet, latest_block_hash: [u8; 32] },
|
||||||
|
/// A request for the notable cosigns for a global session.
|
||||||
|
NotableCosigns { global_session: [u8; 32] },
|
||||||
|
}
|
||||||
|
|
||||||
|
/// A tributary block and its commit.
|
||||||
|
#[derive(Clone, BorshSerialize, BorshDeserialize)]
|
||||||
|
pub(crate) struct TributaryBlockWithCommit {
|
||||||
|
pub(crate) block: Vec<u8>,
|
||||||
|
pub(crate) commit: Vec<u8>,
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Responses which can be received via the request-response protocol.
|
||||||
|
#[derive(Clone, BorshSerialize, BorshDeserialize)]
|
||||||
|
pub(crate) enum Response {
|
||||||
|
Blocks(Vec<TributaryBlockWithCommit>),
|
||||||
|
NotableCosigns(Vec<SignedCosign>),
|
||||||
|
}
|
||||||
|
|
||||||
|
/// The codec used for the request-response protocol.
|
||||||
|
///
|
||||||
|
/// We don't use CBOR or JSON, but use borsh to create `Vec<u8>`s we then length-prefix. While
|
||||||
|
/// ideally, we'd use borsh directly with the `io` traits defined here, they're async and there
|
||||||
|
/// isn't an amenable API within borsh for incremental deserialization.
|
||||||
|
#[derive(Default, Clone, Copy, Debug)]
|
||||||
|
struct Codec;
|
||||||
|
impl Codec {
|
||||||
|
async fn read<M: BorshDeserialize>(io: &mut (impl Unpin + AsyncRead)) -> io::Result<M> {
|
||||||
|
let mut len = [0; 4];
|
||||||
|
io.read_exact(&mut len).await?;
|
||||||
|
let len = usize::try_from(u32::from_le_bytes(len)).expect("not at least a 32-bit platform?");
|
||||||
|
if len > MAX_LIBP2P_REQRES_MESSAGE_SIZE {
|
||||||
|
Err(io::Error::other("request length exceeded MAX_LIBP2P_REQRES_MESSAGE_SIZE"))?;
|
||||||
|
}
|
||||||
|
// This may be a non-trivial allocation easily causable
|
||||||
|
// While we could chunk the read, meaning we only perform the allocation as bandwidth is used,
|
||||||
|
// the max message size should be sufficiently sane
|
||||||
|
let mut buf = vec![0; len];
|
||||||
|
io.read_exact(&mut buf).await?;
|
||||||
|
let mut buf = buf.as_slice();
|
||||||
|
let res = M::deserialize(&mut buf)?;
|
||||||
|
if !buf.is_empty() {
|
||||||
|
Err(io::Error::other("p2p message had extra data appended to it"))?;
|
||||||
|
}
|
||||||
|
Ok(res)
|
||||||
|
}
|
||||||
|
async fn write(io: &mut (impl Unpin + AsyncWrite), msg: &impl BorshSerialize) -> io::Result<()> {
|
||||||
|
let msg = borsh::to_vec(msg).unwrap();
|
||||||
|
io.write_all(&u32::try_from(msg.len()).unwrap().to_le_bytes()).await?;
|
||||||
|
io.write_all(&msg).await
|
||||||
|
}
|
||||||
|
}
|
||||||
|
#[async_trait]
|
||||||
|
impl CodecTrait for Codec {
|
||||||
|
type Protocol = &'static str;
|
||||||
|
type Request = Request;
|
||||||
|
type Response = Response;
|
||||||
|
|
||||||
|
async fn read_request<R: Send + Unpin + AsyncRead>(
|
||||||
|
&mut self,
|
||||||
|
_: &Self::Protocol,
|
||||||
|
io: &mut R,
|
||||||
|
) -> io::Result<Request> {
|
||||||
|
Self::read(io).await
|
||||||
|
}
|
||||||
|
async fn read_response<R: Send + Unpin + AsyncRead>(
|
||||||
|
&mut self,
|
||||||
|
proto: &Self::Protocol,
|
||||||
|
io: &mut R,
|
||||||
|
) -> io::Result<Response> {
|
||||||
|
Self::read(io).await
|
||||||
|
}
|
||||||
|
async fn write_request<W: Send + Unpin + AsyncWrite>(
|
||||||
|
&mut self,
|
||||||
|
_: &Self::Protocol,
|
||||||
|
io: &mut W,
|
||||||
|
req: Request,
|
||||||
|
) -> io::Result<()> {
|
||||||
|
Self::write(io, &req).await
|
||||||
|
}
|
||||||
|
async fn write_response<W: Send + Unpin + AsyncWrite>(
|
||||||
|
&mut self,
|
||||||
|
proto: &Self::Protocol,
|
||||||
|
io: &mut W,
|
||||||
|
res: Response,
|
||||||
|
) -> io::Result<()> {
|
||||||
|
Self::write(io, &res).await
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub(crate) type Behavior = Behaviour<Codec>;
|
||||||
|
pub(crate) fn new_behavior() -> Behavior {
|
||||||
|
let mut config = Config::default();
|
||||||
|
config.set_request_timeout(Duration::from_secs(5));
|
||||||
|
Behavior::new([(PROTOCOL, ProtocolSupport::Full)], config)
|
||||||
|
}
|
||||||
@@ -189,6 +189,8 @@ create_db!(
|
|||||||
|
|
||||||
// The latest Substrate block to cosign.
|
// The latest Substrate block to cosign.
|
||||||
LatestSubstrateBlockToCosign: (set: ValidatorSet) -> [u8; 32],
|
LatestSubstrateBlockToCosign: (set: ValidatorSet) -> [u8; 32],
|
||||||
|
// If we're actively cosigning or not.
|
||||||
|
ActivelyCosigning: (set: ValidatorSet) -> (),
|
||||||
|
|
||||||
// The weight accumulated for a topic.
|
// The weight accumulated for a topic.
|
||||||
AccumulatedWeight: (set: ValidatorSet, topic: Topic) -> u64,
|
AccumulatedWeight: (set: ValidatorSet, topic: Topic) -> u64,
|
||||||
@@ -236,6 +238,33 @@ impl TributaryDb {
|
|||||||
) {
|
) {
|
||||||
LatestSubstrateBlockToCosign::set(txn, set, &substrate_block_hash);
|
LatestSubstrateBlockToCosign::set(txn, set, &substrate_block_hash);
|
||||||
}
|
}
|
||||||
|
pub(crate) fn actively_cosigning(txn: &mut impl DbTxn, set: ValidatorSet) -> bool {
|
||||||
|
ActivelyCosigning::get(txn, set).is_some()
|
||||||
|
}
|
||||||
|
pub(crate) fn start_cosigning(
|
||||||
|
txn: &mut impl DbTxn,
|
||||||
|
set: ValidatorSet,
|
||||||
|
substrate_block_number: u64,
|
||||||
|
) {
|
||||||
|
assert!(
|
||||||
|
ActivelyCosigning::get(txn, set).is_none(),
|
||||||
|
"starting cosigning while already cosigning"
|
||||||
|
);
|
||||||
|
ActivelyCosigning::set(txn, set, &());
|
||||||
|
|
||||||
|
TributaryDb::recognize_topic(
|
||||||
|
txn,
|
||||||
|
set,
|
||||||
|
Topic::Sign {
|
||||||
|
id: VariantSignId::Cosign(substrate_block_number),
|
||||||
|
attempt: 0,
|
||||||
|
round: SigningProtocolRound::Preprocess,
|
||||||
|
},
|
||||||
|
);
|
||||||
|
}
|
||||||
|
pub(crate) fn finish_cosigning(txn: &mut impl DbTxn, set: ValidatorSet) {
|
||||||
|
assert!(ActivelyCosigning::take(txn, set).is_some(), "finished cosigning but not cosigning");
|
||||||
|
}
|
||||||
|
|
||||||
pub(crate) fn recognize_topic(txn: &mut impl DbTxn, set: ValidatorSet, topic: Topic) {
|
pub(crate) fn recognize_topic(txn: &mut impl DbTxn, set: ValidatorSet, topic: Topic) {
|
||||||
AccumulatedWeight::set(txn, set, topic, &0);
|
AccumulatedWeight::set(txn, set, topic, &0);
|
||||||
|
|||||||
@@ -36,6 +36,34 @@ struct ScanBlock<'a, D: DbTxn, TD: Db> {
|
|||||||
tributary: &'a TributaryReader<TD, Transaction>,
|
tributary: &'a TributaryReader<TD, Transaction>,
|
||||||
}
|
}
|
||||||
impl<'a, D: DbTxn, TD: Db> ScanBlock<'a, D, TD> {
|
impl<'a, D: DbTxn, TD: Db> ScanBlock<'a, D, TD> {
|
||||||
|
fn potentially_start_cosign(&mut self) {
|
||||||
|
// Don't start a new cosigning instance if we're actively running one
|
||||||
|
if TributaryDb::actively_cosigning(self.txn, self.set) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Start cosigning the latest intended-to-be-cosigned block
|
||||||
|
let Some(latest_substrate_block_to_cosign) =
|
||||||
|
TributaryDb::latest_substrate_block_to_cosign(self.txn, self.set)
|
||||||
|
else {
|
||||||
|
return;
|
||||||
|
};
|
||||||
|
|
||||||
|
let substrate_block_number = todo!("TODO");
|
||||||
|
|
||||||
|
// Mark us as actively cosigning
|
||||||
|
TributaryDb::start_cosigning(self.txn, self.set, substrate_block_number);
|
||||||
|
// Send the message for the processor to start signing
|
||||||
|
TributaryDb::send_message(
|
||||||
|
self.txn,
|
||||||
|
self.set,
|
||||||
|
messages::coordinator::CoordinatorMessage::CosignSubstrateBlock {
|
||||||
|
session: self.set.session,
|
||||||
|
block_number: substrate_block_number,
|
||||||
|
block: latest_substrate_block_to_cosign,
|
||||||
|
},
|
||||||
|
);
|
||||||
|
}
|
||||||
fn handle_application_tx(&mut self, block_number: u64, tx: Transaction) {
|
fn handle_application_tx(&mut self, block_number: u64, tx: Transaction) {
|
||||||
let signer = |signed: Signed| SeraiAddress(signed.signer.to_bytes());
|
let signer = |signed: Signed| SeraiAddress(signed.signer.to_bytes());
|
||||||
|
|
||||||
@@ -105,41 +133,25 @@ impl<'a, D: DbTxn, TD: Db> ScanBlock<'a, D, TD> {
|
|||||||
Transaction::Cosign { substrate_block_hash } => {
|
Transaction::Cosign { substrate_block_hash } => {
|
||||||
// Update the latest intended-to-be-cosigned Substrate block
|
// Update the latest intended-to-be-cosigned Substrate block
|
||||||
TributaryDb::set_latest_substrate_block_to_cosign(self.txn, self.set, substrate_block_hash);
|
TributaryDb::set_latest_substrate_block_to_cosign(self.txn, self.set, substrate_block_hash);
|
||||||
|
// Start a new cosign if we weren't already working on one
|
||||||
// TODO: If we aren't currently cosigning a block, start cosigning this one
|
self.potentially_start_cosign();
|
||||||
}
|
}
|
||||||
Transaction::Cosigned { substrate_block_hash } => {
|
Transaction::Cosigned { substrate_block_hash } => {
|
||||||
// Start cosigning the latest intended-to-be-cosigned block
|
TributaryDb::finish_cosigning(self.txn, self.set);
|
||||||
|
|
||||||
|
// Fetch the latest intended-to-be-cosigned block
|
||||||
let Some(latest_substrate_block_to_cosign) =
|
let Some(latest_substrate_block_to_cosign) =
|
||||||
TributaryDb::latest_substrate_block_to_cosign(self.txn, self.set)
|
TributaryDb::latest_substrate_block_to_cosign(self.txn, self.set)
|
||||||
else {
|
else {
|
||||||
return;
|
return;
|
||||||
};
|
};
|
||||||
// If this is the block we just cosigned, return
|
// If this is the block we just cosigned, return, preventing us from signing it again
|
||||||
if latest_substrate_block_to_cosign == substrate_block_hash {
|
if latest_substrate_block_to_cosign == substrate_block_hash {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
let substrate_block_number = todo!("TODO");
|
|
||||||
// Whitelist the topic
|
// Since we do have a new cosign to work on, start it
|
||||||
TributaryDb::recognize_topic(
|
self.potentially_start_cosign();
|
||||||
self.txn,
|
|
||||||
self.set,
|
|
||||||
Topic::Sign {
|
|
||||||
id: VariantSignId::Cosign(substrate_block_number),
|
|
||||||
attempt: 0,
|
|
||||||
round: SigningProtocolRound::Preprocess,
|
|
||||||
},
|
|
||||||
);
|
|
||||||
// Send the message for the processor to start signing
|
|
||||||
TributaryDb::send_message(
|
|
||||||
self.txn,
|
|
||||||
self.set,
|
|
||||||
messages::coordinator::CoordinatorMessage::CosignSubstrateBlock {
|
|
||||||
session: self.set.session,
|
|
||||||
block_number: substrate_block_number,
|
|
||||||
block: substrate_block_hash,
|
|
||||||
},
|
|
||||||
);
|
|
||||||
}
|
}
|
||||||
Transaction::SubstrateBlock { hash } => {
|
Transaction::SubstrateBlock { hash } => {
|
||||||
// Whitelist all of the IDs this Substrate block causes to be signed
|
// Whitelist all of the IDs this Substrate block causes to be signed
|
||||||
|
|||||||
@@ -43,12 +43,14 @@ impl SigningProtocolRound {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// `tributary::Signed` without the nonce.
|
/// `tributary::Signed` but without the nonce.
|
||||||
///
|
///
|
||||||
/// All of our nonces are deterministic to the type of transaction and fields within.
|
/// All of our nonces are deterministic to the type of transaction and fields within.
|
||||||
#[derive(Clone, Copy, PartialEq, Eq, Debug)]
|
#[derive(Clone, Copy, PartialEq, Eq, Debug)]
|
||||||
pub struct Signed {
|
pub struct Signed {
|
||||||
|
/// The signer.
|
||||||
pub signer: <Ristretto as Ciphersuite>::G,
|
pub signer: <Ristretto as Ciphersuite>::G,
|
||||||
|
/// The signature.
|
||||||
pub signature: SchnorrSignature<Ristretto>,
|
pub signature: SchnorrSignature<Ristretto>,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user