diff --git a/coordinator/src/p2p/libp2p/gossip.rs b/coordinator/src/p2p/libp2p/gossip.rs index 4d75d9ea..db5af299 100644 --- a/coordinator/src/p2p/libp2p/gossip.rs +++ b/coordinator/src/p2p/libp2p/gossip.rs @@ -31,10 +31,10 @@ pub(crate) enum Message { } impl Message { - pub(crate) fn topic(&self) -> TopicHash { + pub(crate) fn topic(&self) -> IdentTopic { match self { - Message::Tributary { tributary, .. } => topic_for_tributary(*tributary).hash(), - Message::Cosign(_) => IdentTopic::new(BASE_TOPIC).hash(), + Message::Tributary { tributary, .. } => topic_for_tributary(*tributary), + Message::Cosign(_) => IdentTopic::new(BASE_TOPIC), } } } diff --git a/coordinator/src/p2p/libp2p/swarm.rs b/coordinator/src/p2p/libp2p/swarm.rs index 63f8f734..f4c5d7fe 100644 --- a/coordinator/src/p2p/libp2p/swarm.rs +++ b/coordinator/src/p2p/libp2p/swarm.rs @@ -57,8 +57,6 @@ pub(crate) struct SwarmTask { swarm: Swarm, - last_message: Instant, - gossip: mpsc::UnboundedReceiver, signed_cosigns: mpsc::UnboundedSender, tributary_gossip: mpsc::UnboundedSender<([u8; 32], Vec)>, @@ -255,8 +253,31 @@ impl SwarmTask { let message = message.expect("channel for messages to gossip was closed?"); let topic = message.topic(); let message = borsh::to_vec(&message).unwrap(); - let _: Result<_, _> = self.swarm.behaviour_mut().gossip.publish(topic, message); - self.last_message = Instant::now(); + + /* + If we're sending a message for this topic, it's because this topic is relevant to us. + Subscribe to it. + + We create topics roughly weekly, one per validator set/session. Once present in a + topic, we're interested in all messages for it until the validator set/session retires. + Then there should no longer be any messages for the topic as we should drop the + Tributary which creates the messages. + + We use this as an argument to not bother implement unsubscribing from topics. They're + incredibly infrequently created and old topics shouldn't still have messages published + to them. Having the coordinator reboot being our method of unsubscribing is fine. + + Alternatively, we could route an API to determine when a topic is retired, or retire + any topics we haven't sent messages on in the past hour. + */ + let behavior = self.swarm.behaviour_mut(); + let _: Result<_, _> = behavior.gossip.subscribe(&topic); + /* + This may be an error of `InsufficientPeers`. If so, we could ask DialTask to dial more + peers for this network. We don't as we assume DialTask will detect the lack of peers + for this network, and will already successfully handle this. + */ + let _: Result<_, _> = behavior.gossip.publish(topic.hash(), message); } request = self.outbound_requests.recv() => { @@ -310,8 +331,6 @@ impl SwarmTask { swarm, - last_message: Instant::now(), - gossip, signed_cosigns, tributary_gossip,