Remove reliance on a blockchain read lock from block/commit

This commit is contained in:
Luke Parker
2023-04-23 23:37:40 -04:00
parent c476f9b640
commit 215155f84b
6 changed files with 31 additions and 32 deletions

View File

@@ -175,8 +175,8 @@ pub async fn heartbeat_tributaries<D: Db, P: P2p>(
for ActiveTributary { spec: _, tributary } in tributaries.read().await.values() { for ActiveTributary { spec: _, tributary } in tributaries.read().await.values() {
let tributary = tributary.read().await; let tributary = tributary.read().await;
let tip = tributary.tip().await; let tip = tributary.tip().await;
let block_time = SystemTime::UNIX_EPOCH + let block_time =
Duration::from_secs(tributary.time_of_block(&tip).await.unwrap_or(0)); SystemTime::UNIX_EPOCH + Duration::from_secs(tributary.time_of_block(&tip).unwrap_or(0));
// Only trigger syncing if the block is more than a minute behind // Only trigger syncing if the block is more than a minute behind
if SystemTime::now() > (block_time + Duration::from_secs(60)) { if SystemTime::now() > (block_time + Duration::from_secs(60)) {

View File

@@ -124,14 +124,14 @@ pub async fn wait_for_tx_inclusion(
continue; continue;
} }
let mut queue = vec![tributary.block(&tip).await.unwrap()]; let mut queue = vec![tributary.block(&tip).unwrap()];
let mut block = None; let mut block = None;
while { while {
let parent = queue.last().unwrap().parent(); let parent = queue.last().unwrap().parent();
if parent == tributary.genesis() { if parent == tributary.genesis() {
false false
} else { } else {
block = Some(tributary.block(&parent).await.unwrap()); block = Some(tributary.block(&parent).unwrap());
block.as_ref().unwrap().hash() != last_checked block.as_ref().unwrap().hash() != last_checked
} }
} { } {

View File

@@ -46,7 +46,7 @@ async fn tx_test() {
// All tributaries should have acknowledged this transaction in a block // All tributaries should have acknowledged this transaction in a block
for (_, tributary) in tributaries { for (_, tributary) in tributaries {
let block = tributary.block(&included_in).await.unwrap(); let block = tributary.block(&included_in).unwrap();
assert_eq!(block.transactions, vec![tx.clone()]); assert_eq!(block.transactions, vec![tx.clone()]);
} }
} }

View File

@@ -307,7 +307,7 @@ pub async fn handle_new_blocks<D: Db, Pro: Processor, P: P2p>(
let mut blocks = VecDeque::new(); let mut blocks = VecDeque::new();
// This is a new block, as per the prior if check // This is a new block, as per the prior if check
blocks.push_back(tributary.block(&latest).await.unwrap()); blocks.push_back(tributary.block(&latest).unwrap());
let mut block = None; let mut block = None;
while { while {
@@ -317,7 +317,7 @@ pub async fn handle_new_blocks<D: Db, Pro: Processor, P: P2p>(
false false
} else { } else {
// Get this block // Get this block
block = Some(tributary.block(&parent).await.unwrap()); block = Some(tributary.block(&parent).unwrap());
// If it's the last block we've scanned, it's the end. Else, push it // If it's the last block we've scanned, it's the end. Else, push it
block.as_ref().unwrap().hash() != last_block block.as_ref().unwrap().hash() != last_block
} }

View File

@@ -29,12 +29,12 @@ impl<D: Db, T: Transaction> Blockchain<D, T> {
fn block_number_key(&self) -> Vec<u8> { fn block_number_key(&self) -> Vec<u8> {
D::key(b"tributary_blockchain", b"block_number", self.genesis) D::key(b"tributary_blockchain", b"block_number", self.genesis)
} }
fn block_key(&self, hash: &[u8; 32]) -> Vec<u8> { fn block_key(hash: &[u8; 32]) -> Vec<u8> {
// Since block hashes incorporate their parent, and the first parent is the genesis, this is // Since block hashes incorporate their parent, and the first parent is the genesis, this is
// fine not incorporating the hash unless there's a hash collision // fine not incorporating the hash unless there's a hash collision
D::key(b"tributary_blockchain", b"block", hash) D::key(b"tributary_blockchain", b"block", hash)
} }
fn commit_key(&self, hash: &[u8; 32]) -> Vec<u8> { fn commit_key(hash: &[u8; 32]) -> Vec<u8> {
D::key(b"tributary_blockchain", b"commit", hash) D::key(b"tributary_blockchain", b"commit", hash)
} }
fn next_nonce_key(&self, signer: &<Ristretto as Ciphersuite>::G) -> Vec<u8> { fn next_nonce_key(&self, signer: &<Ristretto as Ciphersuite>::G) -> Vec<u8> {
@@ -92,17 +92,17 @@ impl<D: Db, T: Transaction> Blockchain<D, T> {
self.block_number self.block_number
} }
pub(crate) fn block(&self, block: &[u8; 32]) -> Option<Block<T>> { pub(crate) fn block_from_db(db: &D, block: &[u8; 32]) -> Option<Block<T>> {
self db.get(Self::block_key(block))
.db
.as_ref()
.unwrap()
.get(self.block_key(block))
.map(|bytes| Block::<T>::read::<&[u8]>(&mut bytes.as_ref()).unwrap()) .map(|bytes| Block::<T>::read::<&[u8]>(&mut bytes.as_ref()).unwrap())
} }
pub(crate) fn commit_from_db(db: &D, block: &[u8; 32]) -> Option<Vec<u8>> {
db.get(Self::commit_key(block))
}
pub(crate) fn commit(&self, block: &[u8; 32]) -> Option<Vec<u8>> { pub(crate) fn commit(&self, block: &[u8; 32]) -> Option<Vec<u8>> {
self.db.as_ref().unwrap().get(self.commit_key(block)) Self::commit_from_db(self.db.as_ref().unwrap(), block)
} }
pub(crate) fn add_transaction(&mut self, internal: bool, tx: T) -> bool { pub(crate) fn add_transaction(&mut self, internal: bool, tx: T) -> bool {
@@ -155,8 +155,8 @@ impl<D: Db, T: Transaction> Blockchain<D, T> {
self.block_number += 1; self.block_number += 1;
txn.put(self.block_number_key(), self.block_number.to_le_bytes()); txn.put(self.block_number_key(), self.block_number.to_le_bytes());
txn.put(self.block_key(&self.tip), block.serialize()); txn.put(Self::block_key(&self.tip), block.serialize());
txn.put(self.commit_key(&self.tip), commit); txn.put(Self::commit_key(&self.tip), commit);
for tx in &block.transactions { for tx in &block.transactions {
match tx.kind() { match tx.kind() {

View File

@@ -83,6 +83,8 @@ impl<P: P2p> P2p for Arc<P> {
#[derive(Clone)] #[derive(Clone)]
pub struct Tributary<D: Db, T: Transaction, P: P2p> { pub struct Tributary<D: Db, T: Transaction, P: P2p> {
db: D,
genesis: [u8; 32], genesis: [u8; 32],
network: TendermintNetwork<D, T, P>, network: TendermintNetwork<D, T, P>,
@@ -104,7 +106,7 @@ impl<D: Db, T: Transaction, P: P2p> Tributary<D, T, P> {
let signer = Arc::new(Signer::new(genesis, key)); let signer = Arc::new(Signer::new(genesis, key));
let validators = Arc::new(Validators::new(genesis, validators)?); let validators = Arc::new(Validators::new(genesis, validators)?);
let mut blockchain = Blockchain::new(db, genesis, &validators_vec); let mut blockchain = Blockchain::new(db.clone(), genesis, &validators_vec);
let block_number = BlockNumber(blockchain.block_number().into()); let block_number = BlockNumber(blockchain.block_number().into());
let start_time = if let Some(commit) = blockchain.commit(&blockchain.tip()) { let start_time = if let Some(commit) = blockchain.commit(&blockchain.tip()) {
@@ -121,7 +123,7 @@ impl<D: Db, T: Transaction, P: P2p> Tributary<D, T, P> {
TendermintMachine::new(network.clone(), block_number, start_time, proposal).await; TendermintMachine::new(network.clone(), block_number, start_time, proposal).await;
tokio::task::spawn(machine.run()); tokio::task::spawn(machine.run());
Some(Self { genesis, network, synced_block, messages: Arc::new(RwLock::new(messages)) }) Some(Self { db, genesis, network, synced_block, messages: Arc::new(RwLock::new(messages)) })
} }
pub fn block_time() -> u32 { pub fn block_time() -> u32 {
@@ -132,29 +134,26 @@ impl<D: Db, T: Transaction, P: P2p> Tributary<D, T, P> {
self.genesis self.genesis
} }
// TODO: block, time_of_block, and commit shouldn't require acquiring the read lock
// These values can be safely read directly from the database since they're static
pub async fn block_number(&self) -> u32 { pub async fn block_number(&self) -> u32 {
self.network.blockchain.read().await.block_number() self.network.blockchain.read().await.block_number()
} }
pub async fn tip(&self) -> [u8; 32] { pub async fn tip(&self) -> [u8; 32] {
self.network.blockchain.read().await.tip() self.network.blockchain.read().await.tip()
} }
pub async fn block(&self, hash: &[u8; 32]) -> Option<Block<T>> {
self.network.blockchain.read().await.block(hash) // Since these values are static, they can be safely read from the database without lock
// acquisition
pub fn block(&self, hash: &[u8; 32]) -> Option<Block<T>> {
Blockchain::<D, T>::block_from_db(&self.db, hash)
} }
pub async fn time_of_block(&self, hash: &[u8; 32]) -> Option<u64> { pub fn commit(&self, hash: &[u8; 32]) -> Option<Vec<u8>> {
Blockchain::<D, T>::commit_from_db(&self.db, hash)
}
pub fn time_of_block(&self, hash: &[u8; 32]) -> Option<u64> {
self self
.network
.blockchain
.read()
.await
.commit(hash) .commit(hash)
.map(|commit| Commit::<Validators>::decode(&mut commit.as_ref()).unwrap().end_time) .map(|commit| Commit::<Validators>::decode(&mut commit.as_ref()).unwrap().end_time)
} }
pub async fn commit(&self, hash: &[u8; 32]) -> Option<Vec<u8>> {
self.network.blockchain.read().await.commit(hash)
}
pub async fn provide_transaction(&self, tx: T) -> Result<(), ProvidedError> { pub async fn provide_transaction(&self, tx: T) -> Result<(), ProvidedError> {
self.network.blockchain.write().await.provide_transaction(tx) self.network.blockchain.write().await.provide_transaction(tx)