diff --git a/coordinator/src/main.rs b/coordinator/src/main.rs index 54fbbb9e..7a3451fd 100644 --- a/coordinator/src/main.rs +++ b/coordinator/src/main.rs @@ -67,8 +67,6 @@ pub enum TributaryEvent { TributaryRetired(ValidatorSet), } -// TODO: Clean up the actual underlying Tributary/Tendermint tasks - // Creates a new tributary and sends it to all listeners. async fn add_tributary( db: D, @@ -86,6 +84,7 @@ async fn add_tributary( let tributary = Tributary::<_, Transaction, _>::new( // TODO2: Use a db on a distinct volume to protect against DoS attacks + // TODO2: Delete said db once the Tributary is dropped db, spec.genesis(), spec.start_time(), diff --git a/coordinator/tributary/src/lib.rs b/coordinator/tributary/src/lib.rs index d9f50097..0d4d96ce 100644 --- a/coordinator/tributary/src/lib.rs +++ b/coordinator/tributary/src/lib.rs @@ -152,6 +152,14 @@ pub struct Tributary { synced_block: Arc>>>, synced_block_result: Arc>, messages: Arc>>>, + + p2p_meta_task_handle: Arc, +} + +impl Drop for Tributary { + fn drop(&mut self) { + self.p2p_meta_task_handle.abort(); + } } impl Tributary { @@ -186,26 +194,29 @@ impl Tributary { let to_rebroadcast = Arc::new(RwLock::new(vec![])); // Actively rebroadcast consensus messages to ensure they aren't prematurely dropped from the // P2P layer - tokio::spawn({ - let to_rebroadcast = to_rebroadcast.clone(); - let p2p = p2p.clone(); - async move { - loop { - let to_rebroadcast = to_rebroadcast.read().await.clone(); - for msg in to_rebroadcast { - p2p.broadcast(genesis, msg).await; + let p2p_meta_task_handle = Arc::new( + tokio::spawn({ + let to_rebroadcast = to_rebroadcast.clone(); + let p2p = p2p.clone(); + async move { + loop { + let to_rebroadcast = to_rebroadcast.read().await.clone(); + for msg in to_rebroadcast { + p2p.broadcast(genesis, msg).await; + } + tokio::time::sleep(core::time::Duration::from_secs(1)).await; } - tokio::time::sleep(core::time::Duration::from_secs(1)).await; } - } - }); + }) + .abort_handle(), + ); let network = TendermintNetwork { genesis, signer, validators, blockchain, to_rebroadcast, p2p }; let TendermintHandle { synced_block, synced_block_result, messages, machine } = TendermintMachine::new(network.clone(), block_number, start_time, proposal).await; - tokio::task::spawn(machine.run()); + tokio::spawn(machine.run()); Some(Self { db, @@ -214,6 +225,7 @@ impl Tributary { synced_block: Arc::new(RwLock::new(synced_block)), synced_block_result: Arc::new(RwLock::new(synced_block_result)), messages: Arc::new(RwLock::new(messages)), + p2p_meta_task_handle, }) } diff --git a/coordinator/tributary/tendermint/tests/ext.rs b/coordinator/tributary/tendermint/tests/ext.rs index aa87ee29..8fd6242d 100644 --- a/coordinator/tributary/tendermint/tests/ext.rs +++ b/coordinator/tributary/tendermint/tests/ext.rs @@ -176,7 +176,7 @@ impl TestNetwork { TestBlock { id: 1u32.to_le_bytes(), valid: Ok(()) }, ) .await; - tokio::task::spawn(machine.run()); + tokio::spawn(machine.run()); write.push((messages, synced_block, synced_block_result)); } }