Add a TributaryReader which doesn't require a borrow to operate

Reduces lock contention.

Additionally changes block_key to include the genesis. While not technically
needed, the lack of genesis introduced a side effect where any Tributary on the
the database could return the block of any other Tributary. While that wasn't a
security issue, returning it suggested it was on-chain when it wasn't. This may
have been usable to create issues.
This commit is contained in:
Luke Parker
2023-04-24 06:50:40 -04:00
parent e0820759c0
commit e74b4ab94f
10 changed files with 178 additions and 154 deletions

View File

@@ -19,7 +19,7 @@ use serai_client::Serai;
use tokio::{sync::RwLock, time::sleep};
use ::tributary::{ReadWrite, Block, Tributary};
use ::tributary::{ReadWrite, Block, Tributary, TributaryReader};
mod tributary;
use crate::tributary::{TributarySpec, Transaction};
@@ -65,7 +65,7 @@ async fn add_tributary<D: Db, P: P2p>(
p2p: P,
tributaries: &mut HashMap<[u8; 32], ActiveTributary<D, P>>,
spec: TributarySpec,
) {
) -> TributaryReader<D, Transaction> {
let tributary = Tributary::<_, Transaction, _>::new(
// TODO: Use a db on a distinct volume
db,
@@ -78,10 +78,14 @@ async fn add_tributary<D: Db, P: P2p>(
.await
.unwrap();
let reader = tributary.reader();
tributaries.insert(
tributary.genesis(),
ActiveTributary { spec, tributary: Arc::new(RwLock::new(tributary)) },
);
reader
}
pub async fn scan_substrate<D: Db, Pro: Processor>(
@@ -123,6 +127,11 @@ pub async fn scan_tributaries<D: Db, Pro: Processor, P: P2p>(
mut processor: Pro,
tributaries: Arc<RwLock<HashMap<[u8; 32], ActiveTributary<D, P>>>>,
) {
let mut tributary_readers = vec![];
for ActiveTributary { spec, tributary } in tributaries.read().await.values() {
tributary_readers.push((spec.clone(), tributary.read().await.reader()));
}
// Handle new Tributary blocks
let mut tributary_db = tributary::TributaryDb::new(raw_db.clone());
loop {
@@ -133,28 +142,27 @@ pub async fn scan_tributaries<D: Db, Pro: Processor, P: P2p>(
{
let mut new_tributaries = NEW_TRIBUTARIES.write().await;
while let Some(spec) = new_tributaries.pop_front() {
add_tributary(
let reader = add_tributary(
raw_db.clone(),
key.clone(),
p2p.clone(),
// This is a short-lived write acquisition, which is why it should be fine
&mut *tributaries.write().await,
spec,
spec.clone(),
)
.await;
tributary_readers.push((spec, reader));
}
}
// TODO: Make a TributaryReader which only requires a DB handle and safely doesn't require
// locks
// Use that here
for ActiveTributary { spec, tributary } in tributaries.read().await.values() {
tributary::scanner::handle_new_blocks::<_, _, P>(
for (spec, reader) in &tributary_readers {
tributary::scanner::handle_new_blocks::<_, _>(
&mut tributary_db,
&key,
&mut processor,
spec,
&*tributary.read().await,
reader,
)
.await;
}
@@ -177,8 +185,8 @@ pub async fn heartbeat_tributaries<D: Db, P: P2p>(
for ActiveTributary { spec: _, tributary } in tributaries.read().await.values() {
let tributary = tributary.read().await;
let tip = tributary.tip().await;
let block_time =
SystemTime::UNIX_EPOCH + Duration::from_secs(tributary.time_of_block(&tip).unwrap_or(0));
let block_time = SystemTime::UNIX_EPOCH +
Duration::from_secs(tributary.reader().time_of_block(&tip).unwrap_or(0));
// Only trigger syncing if the block is more than a minute behind
if SystemTime::now() > (block_time + Duration::from_secs(60)) {
@@ -202,8 +210,8 @@ pub async fn handle_p2p<D: Db, P: P2p>(
let mut msg = p2p.receive().await;
match msg.kind {
P2pMessageKind::Tributary(genesis) => {
let tributaries_read = tributaries.read().await;
let Some(tributary) = tributaries_read.get(&genesis) else {
let tributaries = tributaries.read().await;
let Some(tributary) = tributaries.get(&genesis) else {
log::debug!("received p2p message for unknown network");
continue;
};
@@ -215,8 +223,8 @@ pub async fn handle_p2p<D: Db, P: P2p>(
// TODO: Rate limit this
P2pMessageKind::Heartbeat(genesis) => {
let tributaries_read = tributaries.read().await;
let Some(tributary) = tributaries_read.get(&genesis) else {
let tributaries = tributaries.read().await;
let Some(tributary) = tributaries.get(&genesis) else {
log::debug!("received heartbeat message for unknown network");
continue;
};
@@ -264,12 +272,13 @@ pub async fn handle_p2p<D: Db, P: P2p>(
log::debug!("received heartbeat and selected to respond");
let reader = tributary_read.reader();
drop(tributary_read);
let mut latest = msg.msg.try_into().unwrap();
// TODO: All of these calls don't *actually* need a read lock, just access to a DB handle
// We can reduce lock contention accordingly
while let Some(next) = tributary_read.block_after(&latest) {
let mut res = tributary_read.block(&next).unwrap().serialize();
res.extend(tributary_read.commit(&next).unwrap());
while let Some(next) = reader.block_after(&latest) {
let mut res = reader.block(&next).unwrap().serialize();
res.extend(reader.commit(&next).unwrap());
p2p.send(msg.sender, P2pMessageKind::Block(tributary.spec.genesis()), res).await;
latest = next;
}
@@ -320,8 +329,14 @@ pub async fn run<D: Db, Pro: Processor, P: P2p>(
// Reload active tributaries from the database
// TODO: Can MainDb take a borrow?
for spec in MainDb(raw_db.clone()).active_tributaries().1 {
add_tributary(raw_db.clone(), key.clone(), p2p.clone(), &mut *tributaries.write().await, spec)
.await;
let _ = add_tributary(
raw_db.clone(),
key.clone(),
p2p.clone(),
&mut *tributaries.write().await,
spec,
)
.await;
}
// Handle new blocks for each Tributary